Skip to content

Commit

Permalink
feat: add reference_file_schema_uri to LoadJobConfig, ExternalConfig (#1399)

Browse files Browse the repository at this point in the history

* feat: add 'reference_file_schema_uri' to LoadJobConfig and ExternalConfig
  • Loading branch information
aribray authored Nov 14, 2022
1 parent 5d3e5d3 commit 931285f
Show file tree
Hide file tree
Showing 7 changed files with 258 additions and 5 deletions.
14 changes: 14 additions & 0 deletions google/cloud/bigquery/external_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -756,6 +756,20 @@ def hive_partitioning(self, value):
prop = value.to_api_repr() if value is not None else None
self._properties["hivePartitioningOptions"] = prop

@property
def reference_file_schema_uri(self):
    """Optional[str]: URI of a reference file providing the table schema.

    Used when creating an external table; supported for the AVRO,
    PARQUET, and ORC source formats.
    """
    resource = self._properties
    return resource.get("referenceFileSchemaUri")

@reference_file_schema_uri.setter
def reference_file_schema_uri(self, value):
    # Written straight into the API representation of the resource.
    self._properties["referenceFileSchemaUri"] = value

@property
def ignore_unknown_values(self):
"""bool: If :data:`True`, extra values that are not represented in the
Expand Down
21 changes: 21 additions & 0 deletions google/cloud/bigquery/job/load.py
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,20 @@ def range_partitioning(self, value):
)
self._set_sub_prop("rangePartitioning", resource)

@property
def reference_file_schema_uri(self):
    """Optional[str]: URI of a reference file providing the table schema.

    When creating an external table, the user can provide a reference
    file with the table schema. Supported for the AVRO, PARQUET, and
    ORC formats.
    """
    return self._get_sub_prop("referenceFileSchemaUri")

@reference_file_schema_uri.setter
def reference_file_schema_uri(self, value):
    # NOTE: property setters' return values are discarded by the descriptor
    # protocol, so do not return anything (matches the sibling setters).
    self._set_sub_prop("referenceFileSchemaUri", value)

@property
def schema(self):
"""Optional[Sequence[Union[ \
Expand Down Expand Up @@ -651,6 +665,13 @@ def quote_character(self):
"""
return self._configuration.quote_character

@property
def reference_file_schema_uri(self):
    """See
    :attr:`google.cloud.bigquery.job.LoadJobConfig.reference_file_schema_uri`.
    """
    config = self._configuration
    return config.reference_file_schema_uri

@property
def skip_leading_rows(self):
"""See
Expand Down
2 changes: 1 addition & 1 deletion testing/constraints-3.7.txt
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,4 @@ python-dateutil==2.7.3
requests==2.21.0
Shapely==1.6.4.post2
six==1.13.0
tqdm==4.7.4
tqdm==4.7.4
203 changes: 203 additions & 0 deletions tests/system/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,20 @@
),
]

SOURCE_URIS_AVRO = [
"gs://cloud-samples-data/bigquery/federated-formats-reference-file-schema/a-twitter.avro",
"gs://cloud-samples-data/bigquery/federated-formats-reference-file-schema/b-twitter.avro",
"gs://cloud-samples-data/bigquery/federated-formats-reference-file-schema/c-twitter.avro",
]
SOURCE_URIS_PARQUET = [
"gs://cloud-samples-data/bigquery/federated-formats-reference-file-schema/a-twitter.parquet",
"gs://cloud-samples-data/bigquery/federated-formats-reference-file-schema/b-twitter.parquet",
"gs://cloud-samples-data/bigquery/federated-formats-reference-file-schema/c-twitter.parquet",
]
REFERENCE_FILE_SCHEMA_URI_AVRO = "gs://cloud-samples-data/bigquery/federated-formats-reference-file-schema/a-twitter.avro"
REFERENCE_FILE_SCHEMA_URI_PARQUET = "gs://cloud-samples-data/bigquery/federated-formats-reference-file-schema/a-twitter.parquet"


# The VPC-SC team maintains a mirror of the GCS bucket used for code
# samples. The public bucket crosses the configured security boundary.
# See: https://github.com/googleapis/google-cloud-python/issues/8550
Expand Down Expand Up @@ -1052,6 +1066,195 @@ def test_load_table_from_file_w_explicit_location(self):
table_ref, "gs://{}/letters-us.csv".format(bucket_name), location="US"
).result()

def test_create_external_table_with_reference_file_schema_uri_avro(self):
    """Create an AVRO external table whose schema comes from a reference file."""
    client = Config.CLIENT
    dataset_id = _make_dataset_id("external_reference_file_avro")
    self.temp_dataset(dataset_id)
    dataset_ref = bigquery.DatasetReference(client.project, dataset_id)
    table_id = "test_ref_file_avro"
    table_ref = bigquery.TableReference(dataset_ref=dataset_ref, table_id=table_id)

    expected_schema = [
        bigquery.SchemaField("username", "STRING", mode="NULLABLE"),
        bigquery.SchemaField("tweet", "STRING", mode="NULLABLE"),
        bigquery.SchemaField("timestamp", "STRING", mode="NULLABLE"),
        bigquery.SchemaField("likes", "INTEGER", mode="NULLABLE"),
    ]

    # Without a reference file, the table would get the c-twitter schema
    # because it is lexicographically last in `SOURCE_URIS_AVRO`:
    #   a-twitter schema: (username, tweet, timestamp, likes)
    #   b-twitter schema: (username, tweet, timestamp)
    #   c-twitter schema: (username, tweet)
    # Because `referenceFileSchemaUri` points at a-twitter, the table must
    # end up with the a-twitter schema.

    # Create external data configuration.
    external_config = bigquery.ExternalConfig(bigquery.ExternalSourceFormat.AVRO)
    external_config.source_uris = SOURCE_URIS_AVRO
    external_config.reference_file_schema_uri = REFERENCE_FILE_SCHEMA_URI_AVRO

    table = bigquery.Table(table_ref)
    table.external_data_configuration = external_config

    table = client.create_table(table)

    # Fetch the table as materialized by the create_table API call.
    generated_table = client.get_table(table_ref)

    self.assertEqual(generated_table.schema, expected_schema)
    # Assert via the public accessor instead of the private `_properties` dict.
    self.assertEqual(
        generated_table.external_data_configuration.reference_file_schema_uri,
        REFERENCE_FILE_SCHEMA_URI_AVRO,
    )

    # Schedule clean-up.
    self.to_delete.insert(0, generated_table)

def test_load_table_from_uri_with_reference_file_schema_uri_avro(self):
    """Load AVRO files into a table using a reference schema file."""
    dataset_id = _make_dataset_id("test_reference_file_avro")
    self.temp_dataset(dataset_id)
    client = Config.CLIENT
    dataset_ref = bigquery.DatasetReference(client.project, dataset_id)
    table_id = "test_ref_file_avro"
    table_ref = bigquery.TableReference(dataset_ref=dataset_ref, table_id=table_id)

    expected_schema = [
        bigquery.SchemaField("username", "STRING", mode="NULLABLE"),
        bigquery.SchemaField("tweet", "STRING", mode="NULLABLE"),
        bigquery.SchemaField("timestamp", "STRING", mode="NULLABLE"),
        bigquery.SchemaField("likes", "INTEGER", mode="NULLABLE"),
    ]

    # Without a reference file, the table would get the c-twitter schema
    # because it is lexicographically last in `SOURCE_URIS_AVRO`:
    #   a-twitter schema: (username, tweet, timestamp, likes)
    #   b-twitter schema: (username, tweet, timestamp)
    #   c-twitter schema: (username, tweet)
    # Because `referenceFileSchemaUri` points at a-twitter, the table must
    # end up with the a-twitter schema.

    # Create load job configuration.
    load_job_config = bigquery.LoadJobConfig(source_format=bigquery.SourceFormat.AVRO)
    load_job_config.reference_file_schema_uri = REFERENCE_FILE_SCHEMA_URI_AVRO

    load_job = client.load_table_from_uri(
        source_uris=SOURCE_URIS_AVRO,
        destination=table_ref,
        job_config=load_job_config,
    )
    # Wait for the load job to complete.
    result = load_job.result()

    # Fetch the table created by the load job.
    generated_table = client.get_table(table_ref)
    self.assertEqual(generated_table.schema, expected_schema)
    # Assert via the public LoadJob property instead of the private
    # `_properties` resource dict.
    self.assertEqual(
        result.reference_file_schema_uri,
        REFERENCE_FILE_SCHEMA_URI_AVRO,
    )

    # Schedule clean-up.
    self.to_delete.insert(0, generated_table)

def test_create_external_table_with_reference_file_schema_uri_parquet(self):
    """Create a PARQUET external table whose schema comes from a reference file."""
    client = Config.CLIENT
    dataset_id = _make_dataset_id("external_table_ref_file_parquet")
    self.temp_dataset(dataset_id)
    dataset_ref = bigquery.DatasetReference(client.project, dataset_id)
    table_id = "test_ref_file_parquet"
    table_ref = bigquery.TableReference(dataset_ref=dataset_ref, table_id=table_id)

    expected_schema = [
        bigquery.SchemaField("username", "STRING", mode="NULLABLE"),
        bigquery.SchemaField("tweet", "STRING", mode="NULLABLE"),
        bigquery.SchemaField("timestamp", "STRING", mode="NULLABLE"),
        bigquery.SchemaField("likes", "INTEGER", mode="NULLABLE"),
    ]

    # Without a reference file, the table would get the c-twitter schema
    # because it is lexicographically last in `SOURCE_URIS_PARQUET`:
    #   a-twitter schema: (username, tweet, timestamp, likes)
    #   b-twitter schema: (username, tweet, timestamp)
    #   c-twitter schema: (username, tweet)
    # Because `referenceFileSchemaUri` points at a-twitter, the table must
    # end up with the a-twitter schema.

    # Create external data configuration.
    external_config = bigquery.ExternalConfig(bigquery.ExternalSourceFormat.PARQUET)
    external_config.source_uris = SOURCE_URIS_PARQUET
    external_config.reference_file_schema_uri = REFERENCE_FILE_SCHEMA_URI_PARQUET

    table = bigquery.Table(table_ref)
    table.external_data_configuration = external_config

    table = client.create_table(table)

    # Fetch the table as materialized by the create_table API call.
    generated_table = client.get_table(table_ref)
    self.assertEqual(generated_table.schema, expected_schema)
    # Assert via the public accessor instead of the private `_properties` dict.
    self.assertEqual(
        generated_table.external_data_configuration.reference_file_schema_uri,
        REFERENCE_FILE_SCHEMA_URI_PARQUET,
    )

    # Schedule clean-up.
    self.to_delete.insert(0, generated_table)

def test_load_table_from_uri_with_reference_file_schema_uri_parquet(self):
    """Load PARQUET files into a table using a reference schema file."""
    dataset_id = _make_dataset_id("test_reference_file_parquet")
    self.temp_dataset(dataset_id)
    client = Config.CLIENT
    dataset_ref = bigquery.DatasetReference(client.project, dataset_id)
    table_id = "test_ref_file_parquet"
    table_ref = bigquery.TableReference(dataset_ref=dataset_ref, table_id=table_id)

    expected_schema = [
        bigquery.SchemaField("username", "STRING", mode="NULLABLE"),
        bigquery.SchemaField("tweet", "STRING", mode="NULLABLE"),
        bigquery.SchemaField("timestamp", "STRING", mode="NULLABLE"),
        bigquery.SchemaField("likes", "INTEGER", mode="NULLABLE"),
    ]

    # Without a reference file, the table would get the c-twitter schema
    # because it is lexicographically last in `SOURCE_URIS_PARQUET`:
    #   a-twitter schema: (username, tweet, timestamp, likes)
    #   b-twitter schema: (username, tweet, timestamp)
    #   c-twitter schema: (username, tweet)
    # Because `referenceFileSchemaUri` points at a-twitter, the table must
    # end up with the a-twitter schema.

    # Create load job configuration.
    load_job_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.PARQUET
    )
    load_job_config.reference_file_schema_uri = REFERENCE_FILE_SCHEMA_URI_PARQUET

    load_job = client.load_table_from_uri(
        source_uris=SOURCE_URIS_PARQUET,
        destination=table_ref,
        job_config=load_job_config,
    )
    # Wait for the load job to complete.
    result = load_job.result()

    # Fetch the table created by the load job.
    generated_table = client.get_table(table_ref)
    self.assertEqual(generated_table.schema, expected_schema)
    # Assert via the public LoadJob property instead of the private
    # `_properties` resource dict.
    self.assertEqual(
        result.reference_file_schema_uri,
        REFERENCE_FILE_SCHEMA_URI_PARQUET,
    )

    # Schedule clean-up.
    self.to_delete.insert(0, generated_table)

def _write_csv_to_storage(self, bucket_name, blob_name, header_row, data_rows):
from google.cloud._testing import _NamedTemporaryFile

Expand Down
5 changes: 1 addition & 4 deletions tests/unit/job/test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -943,7 +943,6 @@ def test_result_default_wo_state(self):
conn = make_connection(
_make_retriable_exception(),
begun_job_resource,
_make_retriable_exception(),
done_job_resource,
)
client = _make_client(project=self.PROJECT, connection=conn)
Expand All @@ -963,9 +962,7 @@ def test_result_default_wo_state(self):
query_params={"location": "US"},
timeout=None,
)
conn.api_request.assert_has_calls(
[begin_call, begin_call, reload_call, reload_call]
)
conn.api_request.assert_has_calls([begin_call, begin_call, reload_call])

def test_result_w_retry_wo_state(self):
begun_job_resource = _make_job_resource(
Expand Down
12 changes: 12 additions & 0 deletions tests/unit/job/test_load.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ def _setUpConstants(self):
self.INPUT_BYTES = 12345
self.OUTPUT_BYTES = 23456
self.OUTPUT_ROWS = 345
self.REFERENCE_FILE_SCHEMA_URI = "gs://path/to/reference"

def _make_resource(self, started=False, ended=False):
resource = super(TestLoadJob, self)._make_resource(started, ended)
Expand All @@ -47,6 +48,7 @@ def _make_resource(self, started=False, ended=False):
"datasetId": self.DS_ID,
"tableId": self.TABLE_ID,
}
config["referenceFileSchemaUri"] = self.REFERENCE_FILE_SCHEMA_URI

if ended:
resource["status"] = {"state": "DONE"}
Expand Down Expand Up @@ -136,6 +138,12 @@ def _verifyResourceProperties(self, job, resource):
self.assertEqual(str(job.skip_leading_rows), config["skipLeadingRows"])
else:
self.assertIsNone(job.skip_leading_rows)
if "referenceFileSchemaUri" in config:
self.assertEqual(
job.reference_file_schema_uri, config["referenceFileSchemaUri"]
)
else:
self.assertIsNone(job.reference_file_schema_uri)

if "destinationEncryptionConfiguration" in config:
self.assertIsNotNone(job.destination_encryption_configuration)
Expand Down Expand Up @@ -186,6 +194,7 @@ def test_ctor(self):
self.assertIsNone(job.use_avro_logical_types)
self.assertIsNone(job.clustering_fields)
self.assertIsNone(job.schema_update_options)
self.assertIsNone(job.reference_file_schema_uri)

def test_ctor_w_config(self):
from google.cloud.bigquery.schema import SchemaField
Expand Down Expand Up @@ -461,6 +470,7 @@ def test_begin_w_bound_client(self):
"datasetId": self.DS_ID,
"tableId": self.TABLE_ID,
},
"referenceFileSchemaUri": self.REFERENCE_FILE_SCHEMA_URI,
}
},
},
Expand Down Expand Up @@ -503,6 +513,7 @@ def test_begin_w_autodetect(self):
"datasetId": self.DS_ID,
"tableId": self.TABLE_ID,
},
"referenceFileSchemaUri": self.REFERENCE_FILE_SCHEMA_URI,
"autodetect": True,
}
},
Expand Down Expand Up @@ -585,6 +596,7 @@ def test_begin_w_alternate_client(self):
config.use_avro_logical_types = True
config.write_disposition = WriteDisposition.WRITE_TRUNCATE
config.schema_update_options = [SchemaUpdateOption.ALLOW_FIELD_ADDITION]
config.reference_file_schema_uri = "gs://path/to/reference"
with mock.patch(
"google.cloud.bigquery.opentelemetry_tracing._get_final_span_attributes"
) as final_attributes:
Expand Down
6 changes: 6 additions & 0 deletions tests/unit/test_external_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,12 @@ def test_connection_id(self):
ec.connection_id = "path/to/connection"
self.assertEqual(ec.connection_id, "path/to/connection")

def test_reference_file_schema_uri(self):
    """Getter/setter round-trip for ``reference_file_schema_uri``."""
    config = external_config.ExternalConfig("")
    # Unset by default.
    self.assertIsNone(config.reference_file_schema_uri)
    config.reference_file_schema_uri = "path/to/reference"
    self.assertEqual(config.reference_file_schema_uri, "path/to/reference")

def test_schema_None(self):
ec = external_config.ExternalConfig("")
ec.schema = None
Expand Down

0 comments on commit 931285f

Please sign in to comment.