Skip to content

Commit 156e5dc

Browse files
committed
table lvt
1 parent 793b082 commit 156e5dc

File tree

19 files changed

+299
-30
lines changed

19 files changed

+299
-30
lines changed

src/common/exception/src/exception_code.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -511,7 +511,7 @@ build_exceptions! {
511511
IllegalUser(2218),
512512
}
513513

514-
// Database and Catalog Management Errors [2301-2317, 2321-2323]
514+
// Database and Catalog Management Errors [2301-2317, 2321-2327]
515515
build_exceptions! {
516516
/// Database already exists
517517
DatabaseAlreadyExists(2301),
@@ -551,6 +551,8 @@ build_exceptions! {
551551
RowAccessPolicyAlreadyExists(2324),
552552
/// General failures met while garbage collecting database meta
553553
GeneralDbGcFailure(2325),
554+
/// Table snapshot is expired
555+
TableSnapshotExpired(2327),
554556
}
555557

556558
// Stage and Connection Errors [2501-2505, 2510-2512]

src/meta/api/src/schema_api_test_suite.rs

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2642,6 +2642,7 @@ impl SchemaApiTestSuite {
26422642
mt: &MT,
26432643
) -> anyhow::Result<()> {
26442644
let tenant_name = "tenant1";
2645+
let tenant = Tenant::new_or_err(tenant_name, func_name!())?;
26452646
let db_name = "db1";
26462647
let tbl_name = "tb2";
26472648

@@ -2713,10 +2714,12 @@ impl SchemaApiTestSuite {
27132714
let table_id = table.ident.table_id;
27142715
let table_version = table.ident.seq;
27152716
let req = UpdateTableMetaReq {
2717+
tenant: tenant.clone(),
27162718
table_id,
27172719
seq: MatchSeq::Exact(table_version),
27182720
new_table_meta: new_table_meta.clone(),
27192721
base_snapshot_location: None,
2722+
snapshot_ts: None,
27202723
};
27212724

27222725
mt.update_multi_table_meta(UpdateMultiTableMetaReq {
@@ -2738,10 +2741,12 @@ impl SchemaApiTestSuite {
27382741
let table_id = table.ident.table_id;
27392742
let table_version = table.ident.seq;
27402743
let req = UpdateTableMetaReq {
2744+
tenant: tenant.clone(),
27412745
table_id,
27422746
seq: MatchSeq::Exact(table_version + 1),
27432747
new_table_meta: new_table_meta.clone(),
27442748
base_snapshot_location: None,
2749+
snapshot_ts: None,
27452750
};
27462751
let res = mt
27472752
.update_multi_table_meta(UpdateMultiTableMetaReq {
@@ -2764,10 +2769,12 @@ impl SchemaApiTestSuite {
27642769
let table_id = table.ident.table_id;
27652770
let table_version = table.ident.seq;
27662771
let req = UpdateTableMetaReq {
2772+
tenant: tenant.clone(),
27672773
table_id,
27682774
seq: MatchSeq::Exact(table_version),
27692775
new_table_meta: new_table_meta.clone(),
27702776
base_snapshot_location: None,
2777+
snapshot_ts: None,
27712778
};
27722779
let res = mt
27732780
.update_multi_table_meta_with_sender(
@@ -2845,10 +2852,12 @@ impl SchemaApiTestSuite {
28452852
};
28462853

28472854
let req = UpdateTableMetaReq {
2855+
tenant: tenant.clone(),
28482856
table_id,
28492857
seq: MatchSeq::Exact(table_version),
28502858
new_table_meta: new_table_meta.clone(),
28512859
base_snapshot_location: None,
2860+
snapshot_ts: None,
28522861
};
28532862
mt.update_multi_table_meta(UpdateMultiTableMetaReq {
28542863
update_table_metas: vec![(req, table.as_ref().clone())],
@@ -2895,10 +2904,12 @@ impl SchemaApiTestSuite {
28952904
insert_if_not_exists: true,
28962905
};
28972906
let req = UpdateTableMetaReq {
2907+
tenant: tenant.clone(),
28982908
table_id,
28992909
seq: MatchSeq::Exact(table_version),
29002910
new_table_meta: new_table_meta.clone(),
29012911
base_snapshot_location: None,
2912+
snapshot_ts: None,
29022913
};
29032914
mt.update_multi_table_meta(UpdateMultiTableMetaReq {
29042915
update_table_metas: vec![(req, table.as_ref().clone())],
@@ -2945,10 +2956,12 @@ impl SchemaApiTestSuite {
29452956
insert_if_not_exists: true,
29462957
};
29472958
let req = UpdateTableMetaReq {
2959+
tenant: tenant.clone(),
29482960
table_id,
29492961
seq: MatchSeq::Exact(table_version),
29502962
new_table_meta: new_table_meta.clone(),
29512963
base_snapshot_location: None,
2964+
snapshot_ts: None,
29522965
};
29532966
let result = mt
29542967
.update_multi_table_meta(UpdateMultiTableMetaReq {
@@ -2961,6 +2974,63 @@ impl SchemaApiTestSuite {
29612974
let err = ErrorCode::from(err);
29622975
assert_eq!(ErrorCode::DUPLICATED_UPSERT_FILES, err.code());
29632976
}
2977+
2978+
info!("--- update table meta, snapshot_ts must respect LVT");
2979+
{
2980+
let table = util.get_table().await.unwrap();
2981+
let table_id = table.ident.table_id;
2982+
let lvt_ident = LeastVisibleTimeIdent::new(&tenant, table_id);
2983+
let lvt_time = DateTime::<Utc>::from_timestamp(2_000, 0).unwrap();
2984+
mt.set_table_lvt(&lvt_ident, &LeastVisibleTime::new(lvt_time))
2985+
.await?;
2986+
2987+
// Snapshot older than LVT should be rejected.
2988+
let mut new_table_meta = table.meta.clone();
2989+
new_table_meta.comment = "lvt guard should fail".to_string();
2990+
let bad_snapshot_ts = DateTime::<Utc>::from_timestamp(1_000, 0).unwrap();
2991+
let req = UpdateTableMetaReq {
2992+
tenant: tenant.clone(),
2993+
table_id,
2994+
seq: MatchSeq::Exact(table.ident.seq),
2995+
new_table_meta: new_table_meta.clone(),
2996+
base_snapshot_location: None,
2997+
snapshot_ts: Some(bad_snapshot_ts),
2998+
};
2999+
let err = mt
3000+
.update_multi_table_meta(UpdateMultiTableMetaReq {
3001+
update_table_metas: vec![(req, table.as_ref().clone())],
3002+
..Default::default()
3003+
})
3004+
.await
3005+
.unwrap_err();
3006+
assert_eq!(
3007+
ErrorCode::TABLE_SNAPSHOT_EXPIRED,
3008+
ErrorCode::from(err).code()
3009+
);
3010+
3011+
// Snapshot newer than LVT should succeed.
3012+
let table = util.get_table().await.unwrap();
3013+
let mut ok_table_meta = table.meta.clone();
3014+
ok_table_meta.comment = "lvt guard success".to_string();
3015+
let ok_snapshot_ts = DateTime::<Utc>::from_timestamp(2_001, 0).unwrap();
3016+
let req = UpdateTableMetaReq {
3017+
tenant,
3018+
table_id,
3019+
seq: MatchSeq::Exact(table.ident.seq),
3020+
new_table_meta: ok_table_meta.clone(),
3021+
base_snapshot_location: None,
3022+
snapshot_ts: Some(ok_snapshot_ts),
3023+
};
3024+
mt.update_multi_table_meta(UpdateMultiTableMetaReq {
3025+
update_table_metas: vec![(req, table.as_ref().clone())],
3026+
..Default::default()
3027+
})
3028+
.await?
3029+
.unwrap();
3030+
3031+
let updated = util.get_table().await.unwrap();
3032+
assert_eq!(updated.meta.comment, "lvt guard success");
3033+
}
29643034
}
29653035
Ok(())
29663036
}
@@ -4296,10 +4366,12 @@ impl SchemaApiTestSuite {
42964366
};
42974367

42984368
let req = UpdateTableMetaReq {
4369+
tenant: tenant.clone(),
42994370
table_id,
43004371
seq: MatchSeq::Any,
43014372
new_table_meta: table_meta.clone(),
43024373
base_snapshot_location: None,
4374+
snapshot_ts: None,
43034375
};
43044376

43054377
let table = mt
@@ -4436,10 +4508,12 @@ impl SchemaApiTestSuite {
44364508
};
44374509

44384510
let req = UpdateTableMetaReq {
4511+
tenant: tenant.clone(),
44394512
table_id,
44404513
seq: MatchSeq::Any,
44414514
new_table_meta: create_table_meta.clone(),
44424515
base_snapshot_location: None,
4516+
snapshot_ts: None,
44434517
};
44444518

44454519
let table = mt
@@ -6178,6 +6252,7 @@ impl SchemaApiTestSuite {
61786252
mt: &MT,
61796253
) -> anyhow::Result<()> {
61806254
let tenant_name = "tenant1";
6255+
let tenant = Tenant::new_or_err(tenant_name, func_name!())?;
61816256

61826257
let db_name = "db1";
61836258
let tbl_name = "tb2";
@@ -6233,10 +6308,12 @@ impl SchemaApiTestSuite {
62336308
};
62346309

62356310
let req = UpdateTableMetaReq {
6311+
tenant: tenant.clone(),
62366312
table_id,
62376313
seq: MatchSeq::Any,
62386314
new_table_meta: table_meta(created_on),
62396315
base_snapshot_location: None,
6316+
snapshot_ts: None,
62406317
};
62416318

62426319
let table = mt
@@ -6284,10 +6361,12 @@ impl SchemaApiTestSuite {
62846361
};
62856362

62866363
let req = UpdateTableMetaReq {
6364+
tenant,
62876365
table_id,
62886366
seq: MatchSeq::Any,
62896367
new_table_meta: table_meta(created_on),
62906368
base_snapshot_location: None,
6369+
snapshot_ts: None,
62916370
};
62926371

62936372
let table = mt
@@ -7740,6 +7819,7 @@ impl SchemaApiTestSuite {
77407819
mt: &MT,
77417820
) -> anyhow::Result<()> {
77427821
let tenant_name = "tenant1";
7822+
let tenant = Tenant::new_or_err(tenant_name, func_name!())?;
77437823

77447824
let db_name = "db1";
77457825
let tbl_name = "tb2";
@@ -7797,10 +7877,12 @@ impl SchemaApiTestSuite {
77977877
};
77987878

77997879
let req = UpdateTableMetaReq {
7880+
tenant: tenant.clone(),
78007881
table_id,
78017882
seq: MatchSeq::Any,
78027883
new_table_meta: table_meta(created_on),
78037884
base_snapshot_location: None,
7885+
snapshot_ts: None,
78047886
};
78057887

78067888
let table = mt
@@ -7856,10 +7938,12 @@ impl SchemaApiTestSuite {
78567938
};
78577939

78587940
let req = UpdateTableMetaReq {
7941+
tenant: tenant.clone(),
78597942
table_id,
78607943
seq: MatchSeq::Any,
78617944
new_table_meta: table_meta(created_on),
78627945
base_snapshot_location: None,
7946+
snapshot_ts: None,
78637947
};
78647948

78657949
let table = mt
@@ -7912,10 +7996,12 @@ impl SchemaApiTestSuite {
79127996
};
79137997

79147998
let req = UpdateTableMetaReq {
7999+
tenant,
79158000
table_id,
79168001
seq: MatchSeq::Any,
79178002
new_table_meta: table_meta(created_on),
79188003
base_snapshot_location: None,
8004+
snapshot_ts: None,
79198005
};
79208006

79218007
let table = mt
@@ -8348,6 +8434,9 @@ where MT: SchemaApi + kvapi::KVApi<Error = MetaError>
83488434
&self,
83498435
n: usize,
83508436
) -> anyhow::Result<BTreeMap<String, TableCopiedFileInfo>> {
8437+
let tenant_name = "tenant1";
8438+
let tenant = Tenant::new_or_err(tenant_name, func_name!())?;
8439+
83518440
let mut file_infos = BTreeMap::new();
83528441

83538442
for i in 0..n {
@@ -8366,10 +8455,12 @@ where MT: SchemaApi + kvapi::KVApi<Error = MetaError>
83668455
};
83678456

83688457
let req = UpdateTableMetaReq {
8458+
tenant,
83698459
table_id: self.table_id,
83708460
seq: MatchSeq::Any,
83718461
new_table_meta: self.table_meta(),
83728462
base_snapshot_location: None,
8463+
snapshot_ts: None,
83738464
};
83748465

83758466
let req = UpdateMultiTableMetaReq {

src/meta/api/src/table_api.rs

Lines changed: 39 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ use databend_common_meta_app::app_error::MultiStmtTxnCommitFailed;
3131
use databend_common_meta_app::app_error::StreamAlreadyExists;
3232
use databend_common_meta_app::app_error::StreamVersionMismatched;
3333
use databend_common_meta_app::app_error::TableAlreadyExists;
34+
use databend_common_meta_app::app_error::TableSnapshotExpired;
3435
use databend_common_meta_app::app_error::TableVersionMismatched;
3536
use databend_common_meta_app::app_error::UndropTableHasNoHistory;
3637
use databend_common_meta_app::app_error::UndropTableWithNoDropTime;
@@ -1188,29 +1189,56 @@ where
11881189
let mut tbl_seqs = HashMap::new();
11891190
let mut txn = TxnRequest::default();
11901191
let mut mismatched_tbs = vec![];
1191-
let tid_vec = update_table_metas
1192+
let (tid_vec, lvt_vec): (Vec<_>, Vec<_>) = update_table_metas
11921193
.iter()
1193-
.map(|req| {
1194-
TableId {
1195-
table_id: req.0.table_id,
1194+
.map(|(req, _)| {
1195+
let tid = TableId {
1196+
table_id: req.table_id,
11961197
}
1197-
.to_string_key()
1198+
.to_string_key();
1199+
let lvt = LeastVisibleTimeIdent::new(&req.tenant, req.table_id);
1200+
(tid, lvt)
11981201
})
1199-
.collect::<Vec<_>>();
1202+
.unzip();
12001203
let mut tb_meta_vec: Vec<(u64, Option<TableMeta>)> = mget_pb_values(self, &tid_vec).await?;
1201-
for (req, (tb_meta_seq, table_meta)) in
1202-
update_table_metas.iter().zip(tb_meta_vec.iter_mut())
1204+
let need_lvt_check = update_table_metas
1205+
.iter()
1206+
.any(|(req, _)| req.snapshot_ts.is_some());
1207+
let tb_lvt_vec = if need_lvt_check {
1208+
self.get_pb_values_vec(lvt_vec).await?
1209+
} else {
1210+
vec![None; lvt_vec.len()]
1211+
};
1212+
1213+
for (((req, _), (tb_meta_seq, table_meta)), lvt) in update_table_metas
1214+
.iter()
1215+
.zip(tb_meta_vec.iter_mut())
1216+
.zip(tb_lvt_vec.iter())
12031217
{
1204-
let req_seq = req.0.seq;
1218+
let req_seq = req.seq;
12051219

12061220
if *tb_meta_seq == 0 || table_meta.is_none() {
12071221
return Err(KVAppError::AppError(AppError::UnknownTableId(
1208-
UnknownTableId::new(req.0.table_id, "update_multi_table_meta"),
1222+
UnknownTableId::new(req.table_id, "update_multi_table_meta"),
12091223
)));
12101224
}
1225+
if let Some(lvt) = lvt {
1226+
if let Some(ts) = req.snapshot_ts {
1227+
// req.snapshot_ts records the timestamp of the snapshot that will become
1228+
// visible after commit. Vacuum may be purging snapshots at the same time,
1229+
// so rejecting snapshots older than LVT prevents them from being cleaned
1230+
// up immediately after commit.
1231+
let lvt = lvt.data.time;
1232+
if lvt > ts {
1233+
return Err(KVAppError::AppError(AppError::TableSnapshotExpired(
1234+
TableSnapshotExpired::new(req.table_id, ts, lvt),
1235+
)));
1236+
}
1237+
}
1238+
}
12111239
if req_seq.match_seq(tb_meta_seq).is_err() {
12121240
mismatched_tbs.push((
1213-
req.0.table_id,
1241+
req.table_id,
12141242
*tb_meta_seq,
12151243
std::mem::take(table_meta).unwrap(),
12161244
));

0 commit comments

Comments
 (0)