Skip to content

Commit 7c68756

Browse files
committed
get table lvt
1 parent 793b082 commit 7c68756

File tree

9 files changed

+115
-17
lines changed

9 files changed

+115
-17
lines changed

src/meta/api/src/schema_api_test_suite.rs

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1453,6 +1453,7 @@ impl SchemaApiTestSuite {
14531453

14541454
info!("--- test lvt");
14551455
{
1456+
let time_zero = DateTime::<Utc>::from_timestamp(0, 0).unwrap();
14561457
let time_small = DateTime::<Utc>::from_timestamp(102, 0).unwrap();
14571458
let time_big = DateTime::<Utc>::from_timestamp(1024, 0).unwrap();
14581459
let time_bigger = DateTime::<Utc>::from_timestamp(1025, 0).unwrap();
@@ -1463,25 +1464,25 @@ impl SchemaApiTestSuite {
14631464

14641465
let lvt_name_ident = LeastVisibleTimeIdent::new(&tenant, table_id);
14651466

1466-
let res = mt.get_pb(&lvt_name_ident).await?;
1467-
assert!(res.is_none());
1467+
let res = mt.get_table_lvt(&lvt_name_ident).await?;
1468+
assert_eq!(res.time, time_zero);
14681469

14691470
let res = mt.set_table_lvt(&lvt_name_ident, &lvt_big).await?;
14701471
assert_eq!(res.time, time_big);
1471-
let res = mt.get_pb(&lvt_name_ident).await?;
1472-
assert_eq!(res.unwrap().data.time, time_big);
1472+
let res = mt.get_table_lvt(&lvt_name_ident).await?;
1473+
assert_eq!(res.time, time_big);
14731474

14741475
// test lvt never fall back
14751476

14761477
let res = mt.set_table_lvt(&lvt_name_ident, &lvt_small).await?;
14771478
assert_eq!(res.time, time_big);
1478-
let res = mt.get_pb(&lvt_name_ident).await?;
1479-
assert_eq!(res.unwrap().data.time, time_big);
1479+
let res = mt.get_table_lvt(&lvt_name_ident).await?;
1480+
assert_eq!(res.time, time_big);
14801481

14811482
let res = mt.set_table_lvt(&lvt_name_ident, &lvt_bigger).await?;
14821483
assert_eq!(res.time, time_bigger);
1483-
let res = mt.get_pb(&lvt_name_ident).await?;
1484-
assert_eq!(res.unwrap().data.time, time_bigger);
1484+
let res = mt.get_table_lvt(&lvt_name_ident).await?;
1485+
assert_eq!(res.time, time_bigger);
14851486
}
14861487

14871488
Ok(())

src/meta/api/src/table_api.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1895,6 +1895,18 @@ where
18951895

18961896
return Ok(transition.unwrap().result.into_value().unwrap_or_default());
18971897
}
1898+
1899+
#[logcall::logcall]
1900+
#[fastrace::trace]
1901+
async fn get_table_lvt(
1902+
&self,
1903+
name_ident: &LeastVisibleTimeIdent,
1904+
) -> Result<LeastVisibleTime, KVAppError> {
1905+
debug!(req :? =(&name_ident); "TableApi: {}", func_name!());
1906+
1907+
let seq_v = self.get_pb(name_ident).await?;
1908+
Ok(seq_v.map(|v| v.data).unwrap_or_default())
1909+
}
18981910
}
18991911

19001912
#[async_trait::async_trait]

src/query/catalog/src/catalog/interface.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -619,6 +619,10 @@ pub trait Catalog: DynClone + Send + Sync + Debug {
619619
unimplemented!()
620620
}
621621

622+
async fn get_table_lvt(&self, _name_ident: &LeastVisibleTimeIdent) -> Result<LeastVisibleTime> {
623+
unimplemented!()
624+
}
625+
622626
async fn rename_dictionary(&self, req: RenameDictionaryReq) -> Result<()>;
623627

624628
fn transform_udtf_as_table_function(

src/query/ee/src/storages/fuse/operations/vacuum_table_v2.rs

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ use databend_storages_common_table_meta::meta::CompactSegmentInfo;
4444
use databend_storages_common_table_meta::meta::TableSnapshot;
4545
use databend_storages_common_table_meta::meta::VACUUM2_OBJECT_KEY_PREFIX;
4646
use log::info;
47+
use log::warn;
4748
use opendal::Entry;
4849
use opendal::ErrorKind;
4950

@@ -612,24 +613,35 @@ async fn process_snapshot_refs(
612613
ctx: &Arc<dyn TableContext>,
613614
) -> Result<RefVacuumInfo> {
614615
let start = std::time::Instant::now();
616+
let table_info = fuse_table.get_table_info();
617+
let mut files_to_gc = match fuse_table.cleanup_orphan_ref_dirs().await {
618+
Ok(paths) => paths,
619+
Err(e) => {
620+
warn!(
621+
"Failed to clean orphan refs for table {}: {}",
622+
table_info.desc, e
623+
);
624+
Vec::new()
625+
}
626+
};
615627
let op = fuse_table.get_operator();
616628
// Refs that expired and should be cleaned up
617629
let mut expired_refs = HashSet::new();
630+
let mut expired_ref_names = Vec::new();
618631
// Ref snapshot paths to be cleaned up
619632
let mut ref_snapshots_to_gc = Vec::new();
620633
let mut ref_gc_roots = Vec::new();
621634
let mut gc_root_meta_ts: Option<DateTime<Utc>> = None;
622635
let mut gc_root_timestamp: Option<DateTime<Utc>> = None;
623-
let mut files_to_gc = Vec::new();
624636

625637
let now = Utc::now();
626638
let (retention_time, num_snapshots_to_keep) =
627639
fuse_table.get_refs_retention_policy(ctx.as_ref(), now)?;
628-
let table_info = fuse_table.get_table_info();
629640
// Process active refs
630641
for (ref_name, snapshot_ref) in table_info.meta.refs.iter() {
631642
if snapshot_ref.expire_at.is_some_and(|v| v < now) {
632-
expired_refs.insert(ref_name);
643+
expired_refs.insert(snapshot_ref.id);
644+
expired_ref_names.push(ref_name);
633645
continue;
634646
}
635647

@@ -701,9 +713,10 @@ async fn process_snapshot_refs(
701713

702714
if !expired_refs.is_empty() {
703715
let start_update = std::time::Instant::now();
704-
files_to_gc = fuse_table
716+
let expired_ref_dirs = fuse_table
705717
.update_table_refs_meta(ctx, &expired_refs)
706718
.await?;
719+
files_to_gc.extend(expired_ref_dirs);
707720
ctx.set_status_info(&format!(
708721
"Updated table meta for table {}, elapsed: {:?}",
709722
table_info.desc,
@@ -717,12 +730,11 @@ async fn process_snapshot_refs(
717730
file_op.remove_file_in_batch(&ref_snapshots_to_gc).await?;
718731
}
719732

720-
let expired_vec = expired_refs.into_iter().collect::<Vec<_>>();
721733
ctx.set_status_info(&format!(
722734
"Processed snapshot refs for table {}, elapsed: {:?}, expire_refs: {}, ref_snapshots_to_gc: {}, ref_gc_root_meta_ts: {:?}",
723735
table_info.desc,
724736
start.elapsed(),
725-
slice_summary(&expired_vec),
737+
slice_summary(&expired_ref_names),
726738
slice_summary(&ref_snapshots_to_gc),
727739
gc_root_meta_ts
728740
));

src/query/service/src/catalogs/default/database_catalog.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -921,6 +921,10 @@ impl Catalog for DatabaseCatalog {
921921
self.mutable_catalog.set_table_lvt(name_ident, value).await
922922
}
923923

924+
async fn get_table_lvt(&self, name_ident: &LeastVisibleTimeIdent) -> Result<LeastVisibleTime> {
925+
self.mutable_catalog.get_table_lvt(name_ident).await
926+
}
927+
924928
async fn rename_dictionary(&self, req: RenameDictionaryReq) -> Result<()> {
925929
self.mutable_catalog.rename_dictionary(req).await
926930
}

src/query/service/src/catalogs/default/mutable_catalog.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -971,6 +971,10 @@ impl Catalog for MutableCatalog {
971971
Ok(self.ctx.meta.set_table_lvt(name_ident, value).await?)
972972
}
973973

974+
async fn get_table_lvt(&self, name_ident: &LeastVisibleTimeIdent) -> Result<LeastVisibleTime> {
975+
Ok(self.ctx.meta.get_table_lvt(name_ident).await?)
976+
}
977+
974978
#[async_backtrace::framed]
975979
async fn rename_dictionary(&self, req: RenameDictionaryReq) -> Result<()> {
976980
let res = self.ctx.meta.rename_dictionary(req).await?;

src/query/service/src/catalogs/default/session_catalog.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -791,6 +791,10 @@ impl Catalog for SessionCatalog {
791791
self.inner.set_table_lvt(name_ident, value).await
792792
}
793793

794+
async fn get_table_lvt(&self, name_ident: &LeastVisibleTimeIdent) -> Result<LeastVisibleTime> {
795+
self.inner.get_table_lvt(name_ident).await
796+
}
797+
794798
async fn rename_dictionary(&self, req: RenameDictionaryReq) -> Result<()> {
795799
self.inner.rename_dictionary(req).await
796800
}

src/query/storages/fuse/src/operations/gc.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -895,6 +895,15 @@ impl FuseTable {
895895
counter: &mut PurgeCounter,
896896
dry_run: bool,
897897
) -> Result<HashSet<Location>> {
898+
if !dry_run {
899+
if let Err(e) = self.cleanup_orphan_ref_dirs().await {
900+
warn!(
901+
"Failed to clean orphan refs for table {} before purge: {}",
902+
self.table_info.desc, e
903+
);
904+
}
905+
}
906+
898907
let now = Utc::now();
899908
let table_info = self.get_table_info();
900909
let op = self.get_operator();
@@ -909,7 +918,7 @@ impl FuseTable {
909918
for (ref_name, snapshot_ref) in table_info.meta.refs.iter() {
910919
// Check if ref is expired
911920
if snapshot_ref.expire_at.is_some_and(|v| v < now) {
912-
expired_refs.insert(ref_name);
921+
expired_refs.insert(snapshot_ref.id);
913922
continue;
914923
}
915924

src/query/storages/fuse/src/operations/vacuum.rs

Lines changed: 50 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -439,7 +439,7 @@ impl FuseTable {
439439
pub async fn update_table_refs_meta(
440440
&self,
441441
ctx: &Arc<dyn TableContext>,
442-
expired_refs: &HashSet<&String>,
442+
expired_refs: &HashSet<u64>,
443443
) -> Result<Vec<String>> {
444444
let catalog = ctx.get_default_catalog()?;
445445

@@ -453,7 +453,7 @@ impl FuseTable {
453453
let mut new_table_meta = latest_table_info.meta.clone();
454454
new_table_meta
455455
.refs
456-
.retain(|ref_name, _| !expired_refs.contains(ref_name));
456+
.retain(|_, val| !expired_refs.contains(&val.id));
457457
let req = UpdateTableMetaReq {
458458
table_id: latest_table_info.ident.table_id,
459459
seq: MatchSeq::Exact(latest_table_info.ident.seq),
@@ -505,6 +505,54 @@ impl FuseTable {
505505
}
506506
Ok(dir_to_gc)
507507
}
508+
509+
pub async fn cleanup_orphan_ref_dirs(&self) -> Result<Vec<String>> {
510+
let prefix = self
511+
.meta_location_generator()
512+
.ref_snapshot_location_prefix();
513+
let op = self.get_operator();
514+
let table_info = self.get_table_info();
515+
let table_seq = table_info.ident.seq;
516+
let active_ids = table_info
517+
.meta
518+
.refs
519+
.values()
520+
.map(|snapshot_ref| snapshot_ref.id)
521+
.collect::<HashSet<_>>();
522+
let mut removed = Vec::new();
523+
let mut lister = op.lister(prefix).await?;
524+
while let Some(entry) = lister.try_next().await? {
525+
if !entry.metadata().is_dir() {
526+
continue;
527+
}
528+
529+
let dir_path = entry.path();
530+
let id_str = dir_path.trim_end_matches('/').rsplit('/').next().unwrap();
531+
let Ok(ref_id) = id_str.parse::<u64>() else {
532+
continue;
533+
};
534+
if table_seq <= ref_id || active_ids.contains(&ref_id) {
535+
continue;
536+
}
537+
538+
match op.remove_all(dir_path).await {
539+
Ok(_) => {
540+
info!(
541+
"Removed orphan ref directory '{}' for table {}",
542+
dir_path, table_info.desc
543+
);
544+
removed.push(dir_path.to_string());
545+
}
546+
Err(e) => {
547+
warn!(
548+
"Failed to remove orphan ref directory '{}' for table {}: {}",
549+
dir_path, table_info.desc, e
550+
);
551+
}
552+
}
553+
}
554+
Ok(removed)
555+
}
508556
}
509557

510558
pub async fn vacuum_tables_from_info(

0 commit comments

Comments
 (0)