Skip to content

Commit 3a8cef0

Browse files
committed
table lvt
1 parent 793b082 commit 3a8cef0

File tree

19 files changed

+208
-21
lines changed

19 files changed

+208
-21
lines changed

src/common/exception/src/exception_code.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -511,7 +511,7 @@ build_exceptions! {
511511
IllegalUser(2218),
512512
}
513513

514-
// Database and Catalog Management Errors [2301-2317, 2321-2323]
514+
// Database and Catalog Management Errors [2301-2317, 2321-2327]
515515
build_exceptions! {
516516
/// Database already exists
517517
DatabaseAlreadyExists(2301),
@@ -551,6 +551,8 @@ build_exceptions! {
551551
RowAccessPolicyAlreadyExists(2324),
552552
/// General failures met while garbage collecting database meta
553553
GeneralDbGcFailure(2325),
554+
/// Table snapshot is expired
555+
TableSnapshotExpired(2327),
554556
}
555557

556558
// Stage and Connection Errors [2501-2505, 2510-2512]

src/meta/api/src/schema_api_test_suite.rs

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2642,6 +2642,7 @@ impl SchemaApiTestSuite {
26422642
mt: &MT,
26432643
) -> anyhow::Result<()> {
26442644
let tenant_name = "tenant1";
2645+
let tenant = Tenant::new_or_err(tenant_name, func_name!())?;
26452646
let db_name = "db1";
26462647
let tbl_name = "tb2";
26472648

@@ -2713,10 +2714,12 @@ impl SchemaApiTestSuite {
27132714
let table_id = table.ident.table_id;
27142715
let table_version = table.ident.seq;
27152716
let req = UpdateTableMetaReq {
2717+
tenant: tenant.clone(),
27162718
table_id,
27172719
seq: MatchSeq::Exact(table_version),
27182720
new_table_meta: new_table_meta.clone(),
27192721
base_snapshot_location: None,
2722+
snapshot_ts: None,
27202723
};
27212724

27222725
mt.update_multi_table_meta(UpdateMultiTableMetaReq {
@@ -2738,10 +2741,12 @@ impl SchemaApiTestSuite {
27382741
let table_id = table.ident.table_id;
27392742
let table_version = table.ident.seq;
27402743
let req = UpdateTableMetaReq {
2744+
tenant: tenant.clone(),
27412745
table_id,
27422746
seq: MatchSeq::Exact(table_version + 1),
27432747
new_table_meta: new_table_meta.clone(),
27442748
base_snapshot_location: None,
2749+
snapshot_ts: None,
27452750
};
27462751
let res = mt
27472752
.update_multi_table_meta(UpdateMultiTableMetaReq {
@@ -2764,10 +2769,12 @@ impl SchemaApiTestSuite {
27642769
let table_id = table.ident.table_id;
27652770
let table_version = table.ident.seq;
27662771
let req = UpdateTableMetaReq {
2772+
tenant: tenant.clone(),
27672773
table_id,
27682774
seq: MatchSeq::Exact(table_version),
27692775
new_table_meta: new_table_meta.clone(),
27702776
base_snapshot_location: None,
2777+
snapshot_ts: None,
27712778
};
27722779
let res = mt
27732780
.update_multi_table_meta_with_sender(
@@ -2845,10 +2852,12 @@ impl SchemaApiTestSuite {
28452852
};
28462853

28472854
let req = UpdateTableMetaReq {
2855+
tenant: tenant.clone(),
28482856
table_id,
28492857
seq: MatchSeq::Exact(table_version),
28502858
new_table_meta: new_table_meta.clone(),
28512859
base_snapshot_location: None,
2860+
snapshot_ts: None,
28522861
};
28532862
mt.update_multi_table_meta(UpdateMultiTableMetaReq {
28542863
update_table_metas: vec![(req, table.as_ref().clone())],
@@ -2895,10 +2904,12 @@ impl SchemaApiTestSuite {
28952904
insert_if_not_exists: true,
28962905
};
28972906
let req = UpdateTableMetaReq {
2907+
tenant: tenant.clone(),
28982908
table_id,
28992909
seq: MatchSeq::Exact(table_version),
29002910
new_table_meta: new_table_meta.clone(),
29012911
base_snapshot_location: None,
2912+
snapshot_ts: None,
29022913
};
29032914
mt.update_multi_table_meta(UpdateMultiTableMetaReq {
29042915
update_table_metas: vec![(req, table.as_ref().clone())],
@@ -2945,10 +2956,12 @@ impl SchemaApiTestSuite {
29452956
insert_if_not_exists: true,
29462957
};
29472958
let req = UpdateTableMetaReq {
2959+
tenant,
29482960
table_id,
29492961
seq: MatchSeq::Exact(table_version),
29502962
new_table_meta: new_table_meta.clone(),
29512963
base_snapshot_location: None,
2964+
snapshot_ts: None,
29522965
};
29532966
let result = mt
29542967
.update_multi_table_meta(UpdateMultiTableMetaReq {
@@ -4296,10 +4309,12 @@ impl SchemaApiTestSuite {
42964309
};
42974310

42984311
let req = UpdateTableMetaReq {
4312+
tenant: tenant.clone(),
42994313
table_id,
43004314
seq: MatchSeq::Any,
43014315
new_table_meta: table_meta.clone(),
43024316
base_snapshot_location: None,
4317+
snapshot_ts: None,
43034318
};
43044319

43054320
let table = mt
@@ -4436,10 +4451,12 @@ impl SchemaApiTestSuite {
44364451
};
44374452

44384453
let req = UpdateTableMetaReq {
4454+
tenant: tenant.clone(),
44394455
table_id,
44404456
seq: MatchSeq::Any,
44414457
new_table_meta: create_table_meta.clone(),
44424458
base_snapshot_location: None,
4459+
snapshot_ts: None,
44434460
};
44444461

44454462
let table = mt
@@ -6178,6 +6195,7 @@ impl SchemaApiTestSuite {
61786195
mt: &MT,
61796196
) -> anyhow::Result<()> {
61806197
let tenant_name = "tenant1";
6198+
let tenant = Tenant::new_or_err(tenant_name, func_name!())?;
61816199

61826200
let db_name = "db1";
61836201
let tbl_name = "tb2";
@@ -6233,10 +6251,12 @@ impl SchemaApiTestSuite {
62336251
};
62346252

62356253
let req = UpdateTableMetaReq {
6254+
tenant: tenant.clone(),
62366255
table_id,
62376256
seq: MatchSeq::Any,
62386257
new_table_meta: table_meta(created_on),
62396258
base_snapshot_location: None,
6259+
snapshot_ts: None,
62406260
};
62416261

62426262
let table = mt
@@ -6284,10 +6304,12 @@ impl SchemaApiTestSuite {
62846304
};
62856305

62866306
let req = UpdateTableMetaReq {
6307+
tenant,
62876308
table_id,
62886309
seq: MatchSeq::Any,
62896310
new_table_meta: table_meta(created_on),
62906311
base_snapshot_location: None,
6312+
snapshot_ts: None,
62916313
};
62926314

62936315
let table = mt
@@ -7740,6 +7762,7 @@ impl SchemaApiTestSuite {
77407762
mt: &MT,
77417763
) -> anyhow::Result<()> {
77427764
let tenant_name = "tenant1";
7765+
let tenant = Tenant::new_or_err(tenant_name, func_name!())?;
77437766

77447767
let db_name = "db1";
77457768
let tbl_name = "tb2";
@@ -7797,10 +7820,12 @@ impl SchemaApiTestSuite {
77977820
};
77987821

77997822
let req = UpdateTableMetaReq {
7823+
tenant: tenant.clone(),
78007824
table_id,
78017825
seq: MatchSeq::Any,
78027826
new_table_meta: table_meta(created_on),
78037827
base_snapshot_location: None,
7828+
snapshot_ts: None,
78047829
};
78057830

78067831
let table = mt
@@ -7856,10 +7881,12 @@ impl SchemaApiTestSuite {
78567881
};
78577882

78587883
let req = UpdateTableMetaReq {
7884+
tenant: tenant.clone(),
78597885
table_id,
78607886
seq: MatchSeq::Any,
78617887
new_table_meta: table_meta(created_on),
78627888
base_snapshot_location: None,
7889+
snapshot_ts: None,
78637890
};
78647891

78657892
let table = mt
@@ -7912,10 +7939,12 @@ impl SchemaApiTestSuite {
79127939
};
79137940

79147941
let req = UpdateTableMetaReq {
7942+
tenant,
79157943
table_id,
79167944
seq: MatchSeq::Any,
79177945
new_table_meta: table_meta(created_on),
79187946
base_snapshot_location: None,
7947+
snapshot_ts: None,
79197948
};
79207949

79217950
let table = mt
@@ -8348,6 +8377,9 @@ where MT: SchemaApi + kvapi::KVApi<Error = MetaError>
83488377
&self,
83498378
n: usize,
83508379
) -> anyhow::Result<BTreeMap<String, TableCopiedFileInfo>> {
8380+
let tenant_name = "tenant1";
8381+
let tenant = Tenant::new_or_err(tenant_name, func_name!())?;
8382+
83518383
let mut file_infos = BTreeMap::new();
83528384

83538385
for i in 0..n {
@@ -8366,10 +8398,12 @@ where MT: SchemaApi + kvapi::KVApi<Error = MetaError>
83668398
};
83678399

83688400
let req = UpdateTableMetaReq {
8401+
tenant,
83698402
table_id: self.table_id,
83708403
seq: MatchSeq::Any,
83718404
new_table_meta: self.table_meta(),
83728405
base_snapshot_location: None,
8406+
snapshot_ts: None,
83738407
};
83748408

83758409
let req = UpdateMultiTableMetaReq {

src/meta/api/src/table_api.rs

Lines changed: 28 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ use databend_common_meta_app::app_error::MultiStmtTxnCommitFailed;
3131
use databend_common_meta_app::app_error::StreamAlreadyExists;
3232
use databend_common_meta_app::app_error::StreamVersionMismatched;
3333
use databend_common_meta_app::app_error::TableAlreadyExists;
34+
use databend_common_meta_app::app_error::TableSnapshotExpired;
3435
use databend_common_meta_app::app_error::TableVersionMismatched;
3536
use databend_common_meta_app::app_error::UndropTableHasNoHistory;
3637
use databend_common_meta_app::app_error::UndropTableWithNoDropTime;
@@ -1188,29 +1189,45 @@ where
11881189
let mut tbl_seqs = HashMap::new();
11891190
let mut txn = TxnRequest::default();
11901191
let mut mismatched_tbs = vec![];
1191-
let tid_vec = update_table_metas
1192+
let (tid_vec, lvt_vec): (Vec<_>, Vec<_>) = update_table_metas
11921193
.iter()
1193-
.map(|req| {
1194-
TableId {
1195-
table_id: req.0.table_id,
1194+
.map(|(req, _)| {
1195+
let tid = TableId {
1196+
table_id: req.table_id,
11961197
}
1197-
.to_string_key()
1198+
.to_string_key();
1199+
let lvt = LeastVisibleTimeIdent::new(&req.tenant, req.table_id);
1200+
(tid, lvt)
11981201
})
1199-
.collect::<Vec<_>>();
1202+
.unzip();
12001203
let mut tb_meta_vec: Vec<(u64, Option<TableMeta>)> = mget_pb_values(self, &tid_vec).await?;
1201-
for (req, (tb_meta_seq, table_meta)) in
1202-
update_table_metas.iter().zip(tb_meta_vec.iter_mut())
1204+
let tb_lvt_vec = self.get_pb_values_vec(lvt_vec).await?;
1205+
1206+
for (((req, _), (tb_meta_seq, table_meta)), lvt) in update_table_metas
1207+
.iter()
1208+
.zip(tb_meta_vec.iter_mut())
1209+
.zip(tb_lvt_vec.iter())
12031210
{
1204-
let req_seq = req.0.seq;
1211+
let req_seq = req.seq;
12051212

12061213
if *tb_meta_seq == 0 || table_meta.is_none() {
12071214
return Err(KVAppError::AppError(AppError::UnknownTableId(
1208-
UnknownTableId::new(req.0.table_id, "update_multi_table_meta"),
1215+
UnknownTableId::new(req.table_id, "update_multi_table_meta"),
12091216
)));
12101217
}
1218+
if let Some(lvt) = lvt {
1219+
if let Some(ts) = req.snapshot_ts {
1220+
let lvt = lvt.data.time;
1221+
if lvt > ts {
1222+
return Err(KVAppError::AppError(AppError::TableSnapshotExpired(
1223+
TableSnapshotExpired::new(req.table_id, ts, lvt),
1224+
)));
1225+
}
1226+
}
1227+
}
12111228
if req_seq.match_seq(tb_meta_seq).is_err() {
12121229
mismatched_tbs.push((
1213-
req.0.table_id,
1230+
req.table_id,
12141231
*tb_meta_seq,
12151232
std::mem::take(table_meta).unwrap(),
12161233
));

src/meta/app/src/app_error.rs

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -863,6 +863,24 @@ impl TableLockExpired {
863863
}
864864
}
865865

866+
#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
867+
#[error("Snapshot timestamp {snapshot_ts} for table {table_id} is older than the table's least visible time {lvt}")]
868+
pub struct TableSnapshotExpired {
869+
table_id: u64,
870+
snapshot_ts: DateTime<Utc>,
871+
lvt: DateTime<Utc>,
872+
}
873+
874+
impl TableSnapshotExpired {
875+
pub fn new(table_id: u64, snapshot_ts: DateTime<Utc>, lvt: DateTime<Utc>) -> Self {
876+
Self {
877+
table_id,
878+
snapshot_ts,
879+
lvt,
880+
}
881+
}
882+
}
883+
866884
#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
867885
#[error(
868886
"CannotShareDatabaseCreatedFromShare: cannot share database {database_name} which created from share while {context}"
@@ -1125,6 +1143,9 @@ pub enum AppError {
11251143
#[error(transparent)]
11261144
TableLockExpired(#[from] TableLockExpired),
11271145

1146+
#[error(transparent)]
1147+
TableSnapshotExpired(#[from] TableSnapshotExpired),
1148+
11281149
#[error(transparent)]
11291150
CannotShareDatabaseCreatedFromShare(#[from] CannotShareDatabaseCreatedFromShare),
11301151

@@ -1472,6 +1493,15 @@ impl AppErrorMessage for TableLockExpired {
14721493
}
14731494
}
14741495

1496+
impl AppErrorMessage for TableSnapshotExpired {
1497+
fn message(&self) -> String {
1498+
format!(
1499+
"Snapshot timestamp {} for table {} is older than the table's least visible time {}",
1500+
self.snapshot_ts, self.table_id, self.lvt,
1501+
)
1502+
}
1503+
}
1504+
14751505
impl AppErrorMessage for CannotShareDatabaseCreatedFromShare {
14761506
fn message(&self) -> String {
14771507
format!(
@@ -1688,6 +1718,7 @@ impl From<AppError> for ErrorCode {
16881718
ErrorCode::UnknownShareEndpointId(err.message())
16891719
}
16901720
AppError::TableLockExpired(err) => ErrorCode::TableLockExpired(err.message()),
1721+
AppError::TableSnapshotExpired(err) => ErrorCode::TableSnapshotExpired(err.message()),
16911722
AppError::CannotShareDatabaseCreatedFromShare(err) => {
16921723
ErrorCode::CannotShareDatabaseCreatedFromShare(err.message())
16931724
}

src/meta/app/src/schema/table/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -811,10 +811,12 @@ pub struct UpdateStreamMetaReq {
811811

812812
#[derive(Clone, Debug, PartialEq, Eq)]
813813
pub struct UpdateTableMetaReq {
814+
pub tenant: Tenant,
814815
pub table_id: u64,
815816
pub seq: MatchSeq,
816817
pub new_table_meta: TableMeta,
817818
pub base_snapshot_location: Option<String>,
819+
pub snapshot_ts: Option<DateTime<Utc>>,
818820
}
819821

820822
#[derive(Clone, Debug, PartialEq, Eq)]

0 commit comments

Comments
 (0)