Skip to content

Commit 9aa8a8e

Browse files
committed
Improve
1 parent b4a83df commit 9aa8a8e

File tree

7 files changed

+136
-73
lines changed

7 files changed

+136
-73
lines changed

etl-destinations/src/bigquery/core.rs

Lines changed: 24 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use etl::destination::Destination;
22
use etl::error::{ErrorKind, EtlError, EtlResult};
33
use etl::store::schema::SchemaStore;
4-
use etl::store::state::{DestinationSchemaState, DestinationSchemaStateType, StateStore};
4+
use etl::store::state::{DestinationSchemaState, StateStore};
55
use etl::types::{
66
Cell, Event, ReplicatedTableSchema, SchemaDiff, TableId, TableName, TableRow,
77
generate_sequence_number,
@@ -346,6 +346,18 @@ where
346346
// Note that if the table is deleted outside ETL and the cache marks it as created, the
347347
// inserts will fail because the table will be missing and won't be created.
348348
if !inner.created_tables.contains(&sequenced_bigquery_table_id) {
349+
let snapshot_id = replicated_table_schema.get_inner().snapshot_id;
350+
let replication_mask = replicated_table_schema.replication_mask().clone();
351+
352+
// Mark the state as applying BEFORE creating the table. This ensures that if the
353+
// process crashes during table creation, we'll detect the incomplete state on restart.
354+
self.state_store
355+
.store_destination_schema_state(
356+
table_id,
357+
DestinationSchemaState::applying(snapshot_id, replication_mask.clone()),
358+
)
359+
.await?;
360+
349361
self.client
350362
.create_table_if_missing(
351363
&self.dataset_id,
@@ -356,17 +368,11 @@ where
356368
)
357369
.await?;
358370

359-
// Store the initial destination schema state with the current snapshot and mask.
360-
// This is needed so that when schema changes occur, we can reconstruct the old
361-
// ReplicatedTableSchema for accurate diffing.
371+
// Mark as applied after successful table creation.
362372
self.state_store
363373
.store_destination_schema_state(
364374
table_id,
365-
DestinationSchemaState {
366-
state: DestinationSchemaStateType::Applied,
367-
snapshot_id: replicated_table_schema.get_inner().snapshot_id,
368-
replication_mask: replicated_table_schema.replication_mask().clone(),
369-
},
375+
DestinationSchemaState::applied(snapshot_id, replication_mask),
370376
)
371377
.await?;
372378

@@ -572,7 +578,7 @@ where
572578
)
573579
);
574580
}
575-
Some(state) if state.state == DestinationSchemaStateType::Applying => {
581+
Some(state) if state.is_applying() => {
576582
// A previous schema change was interrupted, require manual intervention.
577583
// The previous valid snapshot_id can be derived from table_schemas table
578584
// by finding the second-highest snapshot_id for this table.
@@ -587,7 +593,7 @@ where
587593
)
588594
);
589595
}
590-
Some(state) if state.state == DestinationSchemaStateType::Applied => {
596+
Some(state) if state.is_applied() => {
591597
let current_snapshot_id = state.snapshot_id;
592598
let current_replication_mask = state.replication_mask.clone();
593599

@@ -626,15 +632,16 @@ where
626632
let old_schema =
627633
ReplicatedTableSchema::from_mask(old_table_schema, current_replication_mask);
628634

635+
let new_replication_mask = new_schema.replication_mask().clone();
636+
629637
// Mark as applying before making changes (with the NEW snapshot_id and mask).
630638
self.state_store
631639
.store_destination_schema_state(
632640
table_id,
633-
DestinationSchemaState {
634-
state: DestinationSchemaStateType::Applying,
635-
snapshot_id: new_snapshot_id,
636-
replication_mask: new_schema.replication_mask().clone(),
637-
},
641+
DestinationSchemaState::applying(
642+
new_snapshot_id,
643+
new_replication_mask.clone(),
644+
),
638645
)
639646
.await?;
640647

@@ -652,11 +659,7 @@ where
652659
self.state_store
653660
.store_destination_schema_state(
654661
table_id,
655-
DestinationSchemaState {
656-
state: DestinationSchemaStateType::Applied,
657-
snapshot_id: new_snapshot_id,
658-
replication_mask: new_schema.replication_mask().clone(),
659-
},
662+
DestinationSchemaState::applied(new_snapshot_id, new_replication_mask),
660663
)
661664
.await?;
662665

etl-destinations/tests/bigquery_pipeline.rs

Lines changed: 31 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ use chrono::{DateTime, NaiveDate, NaiveDateTime, NaiveTime, Utc};
44
use etl::config::BatchConfig;
55
use etl::error::ErrorKind;
66
use etl::state::table::TableReplicationPhaseType;
7+
use etl::store::state::StateStore;
78
use etl::test_utils::database::{spawn_source_database, test_table_name};
89
use etl::test_utils::notify::NotifyingStore;
910
use etl::test_utils::pipeline::{create_pipeline, create_pipeline_with};
@@ -1776,13 +1777,6 @@ async fn table_validation_out_of_bounds_values() {
17761777
);
17771778
}
17781779

1779-
/// Tests schema change support (ADD COLUMN, DROP COLUMN, RENAME COLUMN) in BigQuery.
1780-
///
1781-
/// This test verifies that DDL changes are correctly replicated to BigQuery by applying
1782-
/// multiple schema changes (rename, drop, add) and verifying the final schema matches.
1783-
/// The Storage Write API caches schema information, so after DDL changes, initial inserts
1784-
/// may fail with "extra fields" errors until the cache refreshes. The retry logic in
1785-
/// `stream_with_schema_retry` handles this by retrying on schema mismatch.
17861780
#[tokio::test(flavor = "multi_thread")]
17871781
async fn table_schema_change() {
17881782
init_test_tracing();
@@ -1830,7 +1824,7 @@ async fn table_schema_change() {
18301824
pipeline.start().await.unwrap();
18311825
table_sync_done.notified().await;
18321826

1833-
// Insert initial row.
1827+
// Insert the initial row.
18341828
let event_notify = destination
18351829
.wait_for_events_count(vec![(EventType::Insert, 1)])
18361830
.await;
@@ -1853,9 +1847,21 @@ async fn table_schema_change() {
18531847
.unwrap();
18541848
initial_schema.assert_columns(&["id", "name", "age", "status"]);
18551849

1850+
// Verify destination schema state is applied after initial table creation.
1851+
let initial_state = store
1852+
.get_destination_schema_state(&table_id)
1853+
.await
1854+
.unwrap()
1855+
.expect("destination schema state should exist after table creation");
1856+
assert!(
1857+
initial_state.is_applied(),
1858+
"initial destination schema state should be applied"
1859+
);
1860+
let initial_snapshot_id = initial_state.snapshot_id;
1861+
18561862
// Apply multiple schema changes:
18571863
// 1. Rename name -> full_name
1858-
// 2. Drop status column
1864+
// 2. Drop the status column
18591865
// 3. Add email column
18601866
//
18611867
// Note: Each DDL change is captured via the DDL event trigger and stored in the schema
@@ -1913,7 +1919,7 @@ async fn table_schema_change() {
19131919

19141920
pipeline.shutdown_and_wait().await.unwrap();
19151921

1916-
// Verify final schema:
1922+
// Verify the final schema:
19171923
// - name should be renamed to full_name
19181924
// - status should be dropped
19191925
// - email should be added
@@ -1925,6 +1931,21 @@ async fn table_schema_change() {
19251931
final_schema.assert_no_column("name");
19261932
final_schema.assert_no_column("status");
19271933

1934+
// Verify destination schema state is applied after schema changes.
1935+
let final_state = store
1936+
.get_destination_schema_state(&table_id)
1937+
.await
1938+
.unwrap()
1939+
.expect("destination schema state should exist after schema change");
1940+
assert!(
1941+
final_state.is_applied(),
1942+
"final destination schema state should be applied"
1943+
);
1944+
assert!(
1945+
final_state.snapshot_id > initial_snapshot_id,
1946+
"snapshot_id should have increased after schema change"
1947+
);
1948+
19281949
// Verify data was inserted correctly.
19291950
let rows = bigquery_database.query_table(table_name).await.unwrap();
19301951
assert_eq!(rows.len(), 2);

etl-destinations/tests/support/bigquery.rs

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -166,10 +166,7 @@ impl BigQueryDatabase {
166166
/// Returns the column names and data types from INFORMATION_SCHEMA.COLUMNS.
167167
/// The table name pattern matches using REGEXP_CONTAINS to match the sequenced
168168
/// table name format: `{table_id}_{sequence_number}`.
169-
pub async fn query_table_schema(
170-
&self,
171-
table_name: TableName,
172-
) -> Option<BigQueryTableSchema> {
169+
pub async fn query_table_schema(&self, table_name: TableName) -> Option<BigQueryTableSchema> {
173170
let client = self.client().unwrap();
174171

175172
let project_id = self.project_id();
@@ -1129,8 +1126,7 @@ impl BigQueryTableSchema {
11291126

11301127
assert_eq!(
11311128
actual_sorted, expected_sorted,
1132-
"schema columns mismatch. Expected: {:?}, Actual: {:?}",
1133-
expected_sorted, actual_sorted
1129+
"schema columns mismatch. Expected: {expected_sorted:?}, Actual: {actual_sorted:?}"
11341130
);
11351131
}
11361132

etl-postgres/src/types/schema.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -252,7 +252,7 @@ impl TableSchema {
252252
/// Each element is either 0 (not replicated) or 1 (replicated), with indices
253253
/// corresponding to the columns in the table schema. Wrapped in [`Arc`] for
254254
/// efficient sharing across multiple events.
255-
#[derive(Debug, Clone)]
255+
#[derive(Debug, Clone, PartialEq, Eq)]
256256
pub struct ReplicationMask(Arc<Vec<u8>>);
257257

258258
impl ReplicationMask {

etl/src/store/state/base.rs

Lines changed: 2 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -1,43 +1,10 @@
1-
use etl_postgres::types::{ReplicationMask, SnapshotId, TableId};
1+
use etl_postgres::types::TableId;
22
use std::{collections::HashMap, future::Future};
33

44
use crate::error::EtlResult;
55
use crate::state::table::TableReplicationPhase;
66

7-
/// The state of a schema change operation.
8-
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
9-
pub enum DestinationSchemaStateType {
10-
/// A schema change is currently being applied.
11-
///
12-
/// If the system restarts and finds this state, it indicates that a previous
13-
/// schema change was interrupted and manual intervention may be required.
14-
/// The previous valid snapshot_id can be derived from table_schemas table.
15-
Applying,
16-
/// The schema has been successfully applied.
17-
Applied,
18-
}
19-
20-
/// Represents the state of the schema at a destination.
21-
///
22-
/// Used to track which schema version is currently applied at a destination
23-
/// and to detect interrupted schema changes that require recovery.
24-
///
25-
/// This structure tracks both the snapshot_id and the replication mask, which
26-
/// is needed to correctly reconstruct the [`ReplicatedTableSchema`] for diffing
27-
/// when schema changes occur.
28-
#[derive(Debug, Clone)]
29-
pub struct DestinationSchemaState {
30-
/// The current state of the schema change operation.
31-
pub state: DestinationSchemaStateType,
32-
/// The current snapshot_id at the destination.
33-
pub snapshot_id: SnapshotId,
34-
/// The replication mask indicating which columns are replicated.
35-
///
36-
/// This is stored alongside the snapshot_id so that when a schema change
37-
/// occurs, we can reconstruct the old [`ReplicatedTableSchema`] with the
38-
/// correct mask for accurate diffing.
39-
pub replication_mask: ReplicationMask,
40-
}
7+
use super::DestinationSchemaState;
418

429
/// Trait for storing and retrieving table replication state and mapping information.
4310
///
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
use etl_postgres::types::{ReplicationMask, SnapshotId};
2+
3+
/// The state of a schema change operation.
4+
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
5+
pub enum DestinationSchemaStateType {
6+
/// A schema change is currently being applied.
7+
///
8+
/// If the system restarts and finds this state, it indicates that a previous
9+
/// schema change was interrupted and manual intervention may be required.
10+
/// The previous valid snapshot_id can be derived from table_schemas table.
11+
Applying,
12+
/// The schema has been successfully applied.
13+
Applied,
14+
}
15+
16+
/// Represents the state of the schema at a destination.
17+
///
18+
/// Used to track which schema version is currently applied at a destination
19+
/// and to detect interrupted schema changes that require recovery.
20+
///
21+
/// This structure tracks both the snapshot_id and the replication mask, which
22+
/// is needed to correctly reconstruct the [`ReplicatedTableSchema`] for diffing
23+
/// when schema changes occur.
24+
#[derive(Debug, Clone, PartialEq, Eq)]
25+
pub struct DestinationSchemaState {
26+
/// The current state of the schema change operation.
27+
pub state: DestinationSchemaStateType,
28+
/// The current snapshot_id at the destination.
29+
pub snapshot_id: SnapshotId,
30+
/// The replication mask indicating which columns are replicated.
31+
///
32+
/// This is stored alongside the snapshot_id so that when a schema change
33+
/// occurs, we can reconstruct the old [`ReplicatedTableSchema`] with the
34+
/// correct mask for accurate diffing.
35+
pub replication_mask: ReplicationMask,
36+
}
37+
38+
impl DestinationSchemaState {
39+
/// Creates a new state indicating a schema change is being applied.
40+
pub fn applying(snapshot_id: SnapshotId, replication_mask: ReplicationMask) -> Self {
41+
Self {
42+
state: DestinationSchemaStateType::Applying,
43+
snapshot_id,
44+
replication_mask,
45+
}
46+
}
47+
48+
/// Creates a new state indicating a schema has been successfully applied.
49+
pub fn applied(snapshot_id: SnapshotId, replication_mask: ReplicationMask) -> Self {
50+
Self {
51+
state: DestinationSchemaStateType::Applied,
52+
snapshot_id,
53+
replication_mask,
54+
}
55+
}
56+
57+
/// Returns true if the state indicates a schema change is in progress.
58+
pub fn is_applying(&self) -> bool {
59+
self.state == DestinationSchemaStateType::Applying
60+
}
61+
62+
/// Returns true if the state indicates the schema has been applied.
63+
pub fn is_applied(&self) -> bool {
64+
self.state == DestinationSchemaStateType::Applied
65+
}
66+
67+
/// Transitions this state to applied, keeping the same snapshot_id and mask.
68+
pub fn to_applied(self) -> Self {
69+
Self {
70+
state: DestinationSchemaStateType::Applied,
71+
..self
72+
}
73+
}
74+
}

etl/src/store/state/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
11
mod base;
2+
mod destination_state;
23

34
pub use base::*;
5+
pub use destination_state::*;

0 commit comments

Comments
 (0)