Skip to content

Commit b571e94

Browse files
committed
Improve
1 parent 4b1ced6 commit b571e94

File tree

2 files changed

+9
-11
lines changed

2 files changed

+9
-11
lines changed

etl-destinations/src/bigquery/client.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -849,6 +849,11 @@ fn bq_error_to_etl_error(err: BQError) -> EtlError {
849849
== "The caller does not have permission to execute the specified operation"
850850
{
851851
(ErrorKind::PermissionDenied, "BigQuery permission denied")
852+
} else if is_schema_mismatch_message(status.message()) {
853+
(
854+
ErrorKind::DestinationSchemaMismatch,
855+
"BigQuery schema mismatch",
856+
)
852857
} else {
853858
(ErrorKind::DestinationError, "BigQuery gRPC status error")
854859
}

etl-destinations/tests/bigquery_pipeline.rs

Lines changed: 4 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1776,14 +1776,10 @@ async fn table_validation_out_of_bounds_values() {
17761776
);
17771777
}
17781778

1779-
// TODO: These schema change tests are currently failing because BigQuery's Storage Write API
1780-
// caches schema information and doesn't immediately reflect DDL changes. After ALTER TABLE
1781-
// adds a column, the streaming API rejects inserts with the new column as "extra fields".
1782-
// This requires either:
1783-
// 1. Retry logic on schema mismatch errors
1784-
// 2. Falling back to batch loading after schema changes
1785-
// 3. Creating a new table version with the new schema
1786-
#[ignore = "BigQuery Storage Write API schema cache doesn't immediately reflect DDL changes"]
1779+
// Schema change tests verify that DDL changes (ADD/DROP/RENAME COLUMN) are correctly
1780+
// replicated to BigQuery. The Storage Write API caches schema information, so after DDL
1781+
// changes, initial inserts may fail with "extra fields" errors until the cache refreshes.
1782+
// The retry logic in `stream_with_schema_retry` handles this by retrying on schema mismatch.
17871783
#[tokio::test(flavor = "multi_thread")]
17881784
async fn table_schema_change_add_column() {
17891785
init_test_tracing();
@@ -1897,7 +1893,6 @@ async fn table_schema_change_add_column() {
18971893
assert_eq!(rows.len(), 2);
18981894
}
18991895

1900-
#[ignore = "BigQuery Storage Write API schema cache doesn't immediately reflect DDL changes"]
19011896
#[tokio::test(flavor = "multi_thread")]
19021897
async fn table_schema_change_drop_column() {
19031898
init_test_tracing();
@@ -2012,7 +2007,6 @@ async fn table_schema_change_drop_column() {
20122007
assert_eq!(rows.len(), 2);
20132008
}
20142009

2015-
#[ignore = "BigQuery Storage Write API schema cache doesn't immediately reflect DDL changes"]
20162010
#[tokio::test(flavor = "multi_thread")]
20172011
async fn table_schema_change_rename_column() {
20182012
init_test_tracing();
@@ -2118,7 +2112,6 @@ async fn table_schema_change_rename_column() {
21182112
assert_eq!(rows.len(), 2);
21192113
}
21202114

2121-
#[ignore = "BigQuery Storage Write API schema cache doesn't immediately reflect DDL changes"]
21222115
#[tokio::test(flavor = "multi_thread")]
21232116
async fn table_schema_change_multiple_operations() {
21242117
init_test_tracing();

0 commit comments

Comments
 (0)