Commit 844665c

Improve
1 parent 2292357 commit 844665c

3 files changed: +42 -26 lines changed

Cargo.toml

Lines changed: 1 addition & 1 deletion
@@ -46,7 +46,7 @@ const-oid = { version = "0.9.6", default-features = false }
 constant_time_eq = { version = "0.4.2" }
 fail = { version = "0.5.1", default-features = false }
 futures = { version = "0.3.31", default-features = false }
-gcp-bigquery-client = { git = "https://github.com/iambriccardo/gcp-bigquery-client", default-features = false, rev = "4759f728b9083f2288d44bec9338207d8d54e5ec" }
+gcp-bigquery-client = { git = "https://github.com/iambriccardo/gcp-bigquery-client", default-features = false, rev = "a1cc7895afce36c0c86cd71bab94253fef04f05c" }
 iceberg = { version = "0.7.0", default-features = false }
 iceberg-catalog-rest = { version = "0.7.0", default-features = false }
 insta = { version = "1.43.1", default-features = false }

etl-destinations/src/bigquery/client.rs

Lines changed: 16 additions & 11 deletions
@@ -394,12 +394,17 @@ impl BigQueryClient {
     /// which can be processed concurrently.
     /// If ordering guarantees are needed, all data for a given table must be included
     /// in a single batch.
-    pub async fn stream_table_batches_concurrent(
+    pub async fn stream_table_batches_concurrent<I>(
         &self,
-        table_batches: Arc<[TableBatch<BigQueryTableRow>]>,
+        table_batches: I,
         max_concurrent_streams: usize,
-    ) -> EtlResult<(usize, usize)> {
-        if table_batches.is_empty() {
+    ) -> EtlResult<(usize, usize)>
+    where
+        I: IntoIterator<Item = Arc<TableBatch<BigQueryTableRow>>>,
+        I::IntoIter: ExactSizeIterator,
+    {
+        let table_batches = table_batches.into_iter();
+        if table_batches.len() == 0 {
             return Ok((0, 0));
         }
 
@@ -462,14 +467,14 @@ impl BigQueryClient {
     /// Creates a TableBatch for a specific table with validated rows.
     ///
     /// Converts TableRow instances to BigQueryTableRow and creates a properly configured
-    /// TableBatch with the appropriate stream name and table descriptor.
+    /// TableBatch wrapped in Arc for efficient sharing and retry operations.
     pub fn create_table_batch(
         &self,
         dataset_id: &BigQueryDatasetId,
         table_id: &BigQueryTableId,
-        table_descriptor: Arc<TableDescriptor>,
+        table_descriptor: TableDescriptor,
         rows: Vec<TableRow>,
-    ) -> EtlResult<TableBatch<BigQueryTableRow>> {
+    ) -> EtlResult<Arc<TableBatch<BigQueryTableRow>>> {
         let validated_rows = rows
             .into_iter()
             .map(BigQueryTableRow::try_from)
@@ -484,11 +489,11 @@ impl BigQueryClient {
             table_id.to_string(),
         );
 
-        Ok(TableBatch::new(
-            Arc::new(stream_name),
+        Ok(Arc::new(TableBatch::new(
+            stream_name,
             table_descriptor,
-            validated_rows.into(),
-        ))
+            validated_rows,
+        )))
     }
 
     /// Executes a BigQuery SQL query and returns the result set.
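As an aside, a minimal sketch of how a caller can drive the reworked API above. It assumes the code sits inside the crate's bigquery module so the types from the diff are in scope; the function and variable names are illustrative and not part of the commit.

// Hypothetical call site (not part of this commit) showing the new shapes:
// `create_table_batch` now returns an Arc'd batch, and any iterator of
// `Arc<TableBatch<BigQueryTableRow>>` whose IntoIter is ExactSizeIterator
// satisfies the generic bound on `stream_table_batches_concurrent`.
async fn stream_single_batch(
    client: &BigQueryClient,
    dataset_id: &BigQueryDatasetId,
    table_id: &BigQueryTableId,
    table_descriptor: TableDescriptor,
    rows: Vec<TableRow>,
    max_concurrent_streams: usize,
) -> EtlResult<(usize, usize)> {
    let batch = client.create_table_batch(dataset_id, table_id, table_descriptor, rows)?;
    let batches: Vec<Arc<TableBatch<BigQueryTableRow>>> = vec![batch];

    // `iter().cloned()` hands out Arc clones (reference-count bumps only) and is
    // an ExactSizeIterator, so the same slice can be streamed again on a retry.
    client
        .stream_table_batches_concurrent(batches.iter().cloned(), max_concurrent_streams)
        .await
}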

etl-destinations/src/bigquery/core.rs

Lines changed: 25 additions & 14 deletions
@@ -328,7 +328,7 @@ where
         &self,
         replicated_table_schema: &ReplicatedTableSchema,
         use_cdc_sequence_column: bool,
-    ) -> EtlResult<(SequencedBigQueryTableId, Arc<TableDescriptor>)> {
+    ) -> EtlResult<(SequencedBigQueryTableId, TableDescriptor)> {
         // We hold the lock for the entire preparation to avoid race conditions since the consistency
         // of this code path is critical.
         let mut inner = self.inner.lock().await;
@@ -388,12 +388,17 @@ where
         )
         .await?;
 
+        // Note: We return TableDescriptor by value for simplicity, which means callers clone it
+        // when creating multiple batches. This is acceptable because the descriptor is small
+        // (one String per column) and the cost is negligible compared to network I/O. If profiling
+        // shows this is a bottleneck, we could wrap it in Arc here and use Arc::unwrap_or_clone
+        // at the call site to avoid redundant clones.
         let table_descriptor = BigQueryClient::column_schemas_to_table_descriptor(
             replicated_table_schema,
             use_cdc_sequence_column,
         );
 
-        Ok((sequenced_bigquery_table_id, Arc::new(table_descriptor)))
+        Ok((sequenced_bigquery_table_id, table_descriptor))
     }
 
     /// Adds a table to the creation cache to avoid redundant existence checks.
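For reference, a small sketch of the Arc-based alternative that the new note above mentions but does not adopt. It is illustrative only and relies on std's Arc::unwrap_or_clone; the commit keeps the simpler by-value return.

// Hypothetical alternative (not what this commit does): produce the descriptor
// behind an Arc so callers can share it cheaply...
let shared_descriptor = Arc::new(BigQueryClient::column_schemas_to_table_descriptor(
    replicated_table_schema,
    use_cdc_sequence_column,
));

// ...and, at a call site that needs ownership, take the value back out.
// Arc::unwrap_or_clone moves the inner TableDescriptor out when this is the
// last reference and clones it only if other references are still alive.
let owned_descriptor: TableDescriptor = Arc::unwrap_or_clone(shared_descriptor);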
@@ -516,7 +521,7 @@ where
         }
 
         // Stream with schema mismatch retry.
-        let (bytes_sent, bytes_received) = self.stream_with_schema_retry(table_batches).await?;
+        let (bytes_sent, bytes_received) = self.stream_with_schema_retry(&table_batches).await?;
 
         if bytes_sent > 0 {
             // Logs with egress_metric = true can be used to identify egress logs.
@@ -753,22 +758,28 @@ where
     /// may cache stale schema information and reject inserts with "extra field" errors.
     /// This method retries streaming operations when such errors are detected, allowing
     /// time for the schema cache to refresh.
-    async fn stream_with_schema_retry<T>(&self, table_batches: T) -> EtlResult<(usize, usize)>
-    where
-        T: Into<Arc<[TableBatch<BigQueryTableRow>]>>,
-    {
-        let retry_delay = Duration::from_millis(SCHEMA_MISMATCH_RETRY_DELAY_MS);
-        let mut attempts = 0;
-
-        let table_batches = table_batches.into();
+    ///
+    /// Takes a slice of `Arc<TableBatch>` to enable efficient retries - on each attempt,
+    /// we iterate over the slice and clone the `Arc`s (O(1) per batch) rather than
+    /// recreating the batches.
+    async fn stream_with_schema_retry(
+        &self,
+        table_batches: &[Arc<TableBatch<BigQueryTableRow>>],
+    ) -> EtlResult<(usize, usize)> {
         if table_batches.is_empty() {
             return Ok((0, 0));
         }
 
+        let retry_delay = Duration::from_millis(SCHEMA_MISMATCH_RETRY_DELAY_MS);
+        let mut attempts = 0;
+
         loop {
+            // Clone the Arc references (O(1) per batch) to create an iterator for this attempt.
+            let batches_iter = table_batches.iter().cloned();
+
             match self
                 .client
-                .stream_table_batches_concurrent(table_batches.clone(), self.max_concurrent_streams)
+                .stream_table_batches_concurrent(batches_iter, self.max_concurrent_streams)
                 .await
             {
                 Ok(result) => return Ok(result),
@@ -878,7 +889,7 @@ where
         if !table_id_to_data.is_empty() {
             // Prepare batch metadata for all tables before streaming.
             // This collects (sequenced_table_id, table_descriptor, rows) for retry support.
-            let mut prepared_data: Vec<(String, Arc<TableDescriptor>, Vec<TableRow>)> =
+            let mut prepared_data: Vec<(String, TableDescriptor, Vec<TableRow>)> =
                 Vec::with_capacity(table_id_to_data.len());
 
             for (_, (replicated_table_schema, table_rows)) in table_id_to_data {
@@ -907,7 +918,7 @@ where
 
             // Stream with schema mismatch retry.
             let (bytes_sent, bytes_received) =
-                self.stream_with_schema_retry(table_batches).await?;
+                self.stream_with_schema_retry(&table_batches).await?;
 
             if bytes_sent > 0 {
                 // Logs with egress_metric = true can be used to identify egress logs.
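To make the retry pattern concrete outside the crate, here is a self-contained, synchronous sketch of the approach the commit adopts: batches live behind Arcs in a slice, and every attempt streams Arc clones instead of rebuilding the collection. Batch, send_batches, and send_with_retry are stand-ins invented for illustration, not the crate's real types or functions, and the real method also sleeps between attempts.

use std::sync::Arc;

// Stand-in for a prepared table batch.
struct Batch {
    rows: Vec<String>,
}

// Mirrors the generic bound on stream_table_batches_concurrent: any iterator of
// Arc'd batches whose IntoIter reports an exact length.
fn send_batches<I>(batches: I) -> Result<usize, &'static str>
where
    I: IntoIterator<Item = Arc<Batch>>,
    I::IntoIter: ExactSizeIterator,
{
    let batches = batches.into_iter();
    if batches.len() == 0 {
        return Ok(0);
    }
    // Pretend to stream each batch and report how many rows were sent.
    Ok(batches.map(|b| b.rows.len()).sum())
}

fn send_with_retry(batches: &[Arc<Batch>], max_attempts: usize) -> Result<usize, &'static str> {
    let mut attempts = 0;
    loop {
        // Cloning an Arc only bumps a reference count, so every attempt reuses
        // the same underlying batches at O(1) cost per batch.
        match send_batches(batches.iter().cloned()) {
            Ok(sent) => return Ok(sent),
            Err(_) if attempts < max_attempts => attempts += 1,
            Err(err) => return Err(err),
        }
    }
}

fn main() {
    let batches = vec![Arc::new(Batch {
        rows: vec!["a".into(), "b".into()],
    })];
    println!("{:?}", send_with_retry(&batches, 3));
}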
