diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt
index bf24ae32e..a3e71aecb 100644
--- a/src/iceberg/CMakeLists.txt
+++ b/src/iceberg/CMakeLists.txt
@@ -80,6 +80,7 @@ set(ICEBERG_SOURCES
transform_function.cc
type.cc
update/pending_update.cc
+ update/snapshot_update.cc
update/update_partition_spec.cc
update/update_properties.cc
update/update_sort_order.cc
diff --git a/src/iceberg/table.h b/src/iceberg/table.h
index 30ad14c1b..c9f28d406 100644
--- a/src/iceberg/table.h
+++ b/src/iceberg/table.h
@@ -50,7 +50,7 @@ class ICEBERG_EXPORT Table : public std::enable_shared_from_this
{
virtual ~Table();
- /// \brief Return the identifier of this table
+ /// \brief Returns the identifier of this table
const TableIdentifier& name() const { return identifier_; }
/// \brief Returns the UUID of the table
@@ -59,40 +59,40 @@ class ICEBERG_EXPORT Table : public std::enable_shared_from_this {
/// \brief Return the schema for this table, return NotFoundError if not found
Result> schema() const;
- /// \brief Return a map of schema for this table
+ /// \brief Returns a map of schema for this table
Result<
std::reference_wrapper>>>
schemas() const;
- /// \brief Return the partition spec for this table, return NotFoundError if not found
+ /// \brief Returns the partition spec for this table, return NotFoundError if not found
Result> spec() const;
- /// \brief Return a map of partition specs for this table
+ /// \brief Returns a map of partition specs for this table
Result>>>
specs() const;
- /// \brief Return the sort order for this table, return NotFoundError if not found
+ /// \brief Returns the sort order for this table, return NotFoundError if not found
Result> sort_order() const;
- /// \brief Return a map of sort order IDs to sort orders for this table
+ /// \brief Returns a map of sort order IDs to sort orders for this table
Result>>>
sort_orders() const;
- /// \brief Return a map of string properties for this table
+ /// \brief Returns the properties of this table
const TableProperties& properties() const;
- /// \brief Return the table's metadata file location
+ /// \brief Returns the table's metadata file location
std::string_view metadata_file_location() const;
- /// \brief Return the table's base location
+ /// \brief Returns the table's base location
std::string_view location() const;
/// \brief Returns the time when this table was last updated
TimePointMs last_updated_ms() const;
- /// \brief Return the table's current snapshot, return NotFoundError if not found
+ /// \brief Returns the table's current snapshot, return NotFoundError if not found
Result> current_snapshot() const;
/// \brief Get the snapshot of this table with the given id
diff --git a/src/iceberg/table_metadata.cc b/src/iceberg/table_metadata.cc
index 851048b30..386c265ba 100644
--- a/src/iceberg/table_metadata.cc
+++ b/src/iceberg/table_metadata.cc
@@ -591,6 +591,8 @@ class TableMetadataBuilder::Impl {
Status RemoveSchemas(const std::unordered_set& schema_ids);
Result AddSchema(const Schema& schema, int32_t new_last_column_id);
void SetLocation(std::string_view location);
+ Status AddSnapshot(const Snapshot& snapshot);
+ Status SetBranchSnapshot(int64_t snapshot_id, const std::string& branch);
Result> Build();
@@ -1207,12 +1209,14 @@ TableMetadataBuilder& TableMetadataBuilder::AddSortOrder(
TableMetadataBuilder& TableMetadataBuilder::AddSnapshot(
std::shared_ptr snapshot) {
- throw IcebergError(std::format("{} not implemented", __FUNCTION__));
+ ICEBERG_BUILDER_RETURN_IF_ERROR(impl_->AddSnapshot(*snapshot));
+ return *this;
}
TableMetadataBuilder& TableMetadataBuilder::SetBranchSnapshot(int64_t snapshot_id,
const std::string& branch) {
- throw IcebergError(std::format("{} not implemented", __FUNCTION__));
+ ICEBERG_BUILDER_RETURN_IF_ERROR(impl_->SetBranchSnapshot(snapshot_id, branch));
+ return *this;
}
TableMetadataBuilder& TableMetadataBuilder::SetRef(const std::string& name,
diff --git a/src/iceberg/table_properties.h b/src/iceberg/table_properties.h
index feb4a2001..e9599bcdd 100644
--- a/src/iceberg/table_properties.h
+++ b/src/iceberg/table_properties.h
@@ -244,6 +244,9 @@ class ICEBERG_EXPORT TableProperties : public ConfigBase {
inline static Entry kDeleteTargetFileSizeBytes{
"write.delete.target-file-size-bytes", int64_t{64} * 1024 * 1024}; // 64 MB
+ inline static Entry kSnapshotIdInheritanceEnabled{
+ "compatibility.snapshot-id-inheritance.enabled", false};
+
// Garbage collection properties
inline static Entry kGcEnabled{"gc.enabled", true};
diff --git a/src/iceberg/table_update.cc b/src/iceberg/table_update.cc
index 91578977c..f1f0957f6 100644
--- a/src/iceberg/table_update.cc
+++ b/src/iceberg/table_update.cc
@@ -134,7 +134,7 @@ void SetDefaultSortOrder::GenerateRequirements(TableUpdateContext& context) cons
// AddSnapshot
void AddSnapshot::ApplyTo(TableMetadataBuilder& builder) const {
- throw IcebergError(std::format("{} not implemented", __FUNCTION__));
+ builder.AddSnapshot(snapshot_);
}
void AddSnapshot::GenerateRequirements(TableUpdateContext& context) const {
diff --git a/src/iceberg/transaction.cc b/src/iceberg/transaction.cc
index c8446e8bb..4ee593827 100644
--- a/src/iceberg/transaction.cc
+++ b/src/iceberg/transaction.cc
@@ -105,6 +105,11 @@ Status Transaction::Apply(PendingUpdate& update) {
metadata_builder_->AddPartitionSpec(std::move(result.spec));
}
} break;
+ case PendingUpdate::Kind::kUpdateSnapshot: {
+ auto& update_snapshot = internal::checked_cast(update);
+ ICEBERG_ASSIGN_OR_RAISE(auto result, update_snapshot.Apply());
+ metadata_builder_->AddSnapshot(std::move(result.snapshot));
+ } break;
default:
return NotSupported("Unsupported pending update: {}",
static_cast(update.kind()));
diff --git a/src/iceberg/transaction.h b/src/iceberg/transaction.h
index 87a2139b1..6559783ed 100644
--- a/src/iceberg/transaction.h
+++ b/src/iceberg/transaction.h
@@ -77,7 +77,9 @@ class ICEBERG_EXPORT Transaction : public std::enable_shared_from_this
+#include
+#include
+
+#include "iceberg/file_io.h"
+#include "iceberg/manifest/manifest_entry.h"
+#include "iceberg/manifest/manifest_list.h"
+#include "iceberg/manifest/manifest_writer.h"
+#include "iceberg/manifest/rolling_manifest_writer.h"
+#include "iceberg/snapshot.h"
+#include "iceberg/table.h"
+#include "iceberg/util/macros.h"
+#include "iceberg/util/snapshot_util_internal.h"
+#include "iceberg/util/uuid.h"
+
+namespace iceberg {
+
+SnapshotUpdate::SnapshotUpdate(std::shared_ptr transaction)
+ : PendingUpdate(std::move(transaction)) {
+ target_manifest_size_bytes_ =
+ transaction_->current().properties.Get(TableProperties::kManifestTargetSizeBytes);
+
+ // For format version 1, check if snapshot ID inheritance is enabled
+ if (transaction_->current().format_version == 1) {
+ can_inherit_snapshot_id_ = transaction_->current().properties.Get(
+ TableProperties::kSnapshotIdInheritanceEnabled);
+ }
+
+ // Generate commit UUID
+ commit_uuid_ = Uuid::GenerateV7().ToString();
+
+ // Initialize delete function if not set
+ if (!delete_func_) {
+ delete_func_ = [this](const std::string& path) {
+ return transaction_->table()->io()->DeleteFile(path);
+ };
+ }
+}
+
+Result> SnapshotUpdate::WriteDataManifests(
+ const std::vector& data_files, const PartitionSpec& spec) {
+ if (data_files.empty()) {
+ return std::vector{};
+ }
+
+ ICEBERG_ASSIGN_OR_RAISE(auto current_schema,
+ transaction_->table()->metadata()->Schema());
+
+ std::shared_ptr spec_ptr = std::make_shared(spec);
+ int8_t format_version = transaction_->table()->metadata()->format_version;
+ std::optional snapshot_id =
+ snapshot_id_ ? std::make_optional(*snapshot_id_) : std::nullopt;
+
+ // Create factory function for rolling manifest writer
+ RollingManifestWriter::ManifestWriterFactory factory =
+ [this, spec_ptr, current_schema, format_version,
+ snapshot_id]() -> Result> {
+ std::string manifest_path = ManifestPath();
+
+ if (format_version == 1) {
+ return ManifestWriter::MakeV1Writer(snapshot_id, manifest_path,
+ transaction_->table()->io(), spec_ptr,
+ current_schema);
+ } else if (format_version == 2) {
+ return ManifestWriter::MakeV2Writer(snapshot_id, manifest_path,
+ transaction_->table()->io(), spec_ptr,
+ current_schema, ManifestContent::kData);
+ } else { // format_version == 3
+ std::optional first_row_id =
+ transaction_->table()->metadata()->next_row_id;
+ return ManifestWriter::MakeV3Writer(snapshot_id, first_row_id, manifest_path,
+ transaction_->table()->io(), spec_ptr,
+ current_schema, ManifestContent::kData);
+ }
+ };
+
+ // Create rolling manifest writer
+ RollingManifestWriter rolling_writer(factory, target_manifest_size_bytes_);
+
+ // Write all files
+ for (const auto& file : data_files) {
+ ICEBERG_RETURN_UNEXPECTED(
+ rolling_writer.WriteAddedEntry(std::make_shared(file)));
+ }
+
+ // Close the rolling writer
+ ICEBERG_RETURN_UNEXPECTED(rolling_writer.Close());
+
+ // Get all manifest files
+ ICEBERG_ASSIGN_OR_RAISE(auto manifest_files, rolling_writer.ToManifestFiles());
+
+ return manifest_files;
+}
+
+Result> SnapshotUpdate::WriteDeleteManifests(
+ const std::vector& delete_files, const PartitionSpec& spec) {
+ if (delete_files.empty()) {
+ return std::vector{};
+ }
+
+ int8_t format_version = transaction_->current().format_version;
+ if (format_version < 2) {
+ // Delete manifests are only supported in format version 2+
+ return std::vector{};
+ }
+
+ ICEBERG_ASSIGN_OR_RAISE(auto current_schema, transaction_->current().Schema());
+
+ std::shared_ptr spec_ptr = std::make_shared(spec);
+ std::optional snapshot_id =
+ snapshot_id_ ? std::make_optional(*snapshot_id_) : std::nullopt;
+
+ // Create factory function for rolling manifest writer
+ RollingManifestWriter::ManifestWriterFactory factory =
+ [this, spec_ptr, current_schema, format_version,
+ snapshot_id]() -> Result> {
+ std::string manifest_path = ManifestPath();
+
+ if (format_version == 2) {
+ return ManifestWriter::MakeV2Writer(snapshot_id, manifest_path,
+ transaction_->table()->io(), spec_ptr,
+ current_schema, ManifestContent::kDeletes);
+ } else { // format_version == 3
+ std::optional first_row_id =
+ transaction_->table()->metadata()->next_row_id;
+ return ManifestWriter::MakeV3Writer(snapshot_id, first_row_id, manifest_path,
+ transaction_->table()->io(), spec_ptr,
+ current_schema, ManifestContent::kDeletes);
+ }
+ };
+
+ // Create rolling manifest writer
+ RollingManifestWriter rolling_writer(factory, target_manifest_size_bytes_);
+
+ // Write all delete files
+ for (const auto& file : delete_files) {
+ ICEBERG_RETURN_UNEXPECTED(
+ rolling_writer.WriteAddedEntry(std::make_shared(file)));
+ }
+
+ // Close the rolling writer
+ ICEBERG_RETURN_UNEXPECTED(rolling_writer.Close());
+
+ // Get all manifest files
+ ICEBERG_ASSIGN_OR_RAISE(auto manifest_files, rolling_writer.ToManifestFiles());
+
+ return manifest_files;
+}
+
+Result SnapshotUpdate::Apply() {
+ ICEBERG_RETURN_UNEXPECTED(CheckErrors());
+ // Refresh metadata to get latest state
+ // Note: In Java, refresh() is called here, but in C++ transaction_->current()
+ // already provides the current state, so we don't need to refresh explicitly
+
+ // Get the latest snapshot for the target branch
+ ICEBERG_ASSIGN_OR_RAISE(
+ auto parent_snapshot,
+ SnapshotUtil::LatestSnapshot(transaction_->current(), target_branch_));
+
+ // Generate snapshot ID (cache it if not already set)
+ // Check if snapshot ID already exists in current metadata and regenerate if needed
+ if (!snapshot_id_.has_value()) {
+ snapshot_id_ = SnapshotUtil::GenerateSnapshotId(*transaction_->table());
+ }
+ // Check against current metadata (like Java does with
+ // ops.current().snapshot(snapshotId))
+ while (true) {
+ auto existing_snapshot_result = transaction_->current().SnapshotById(*snapshot_id_);
+ if (!existing_snapshot_result.has_value()) {
+ break; // Snapshot ID doesn't exist, we can use it
+ }
+ // Snapshot ID already exists, generate a new one
+ snapshot_id_ = SnapshotUtil::GenerateSnapshotId(*transaction_->table());
+ }
+ int64_t new_snapshot_id = *snapshot_id_;
+ std::optional parent_snapshot_id =
+ parent_snapshot ? std::make_optional(parent_snapshot->snapshot_id) : std::nullopt;
+
+ // Validate the update
+ ICEBERG_RETURN_UNEXPECTED(Validate(transaction_->current(), parent_snapshot));
+
+ // Get sequence number
+ int64_t sequence_number = transaction_->current().last_sequence_number + 1;
+
+ // Apply changes to get manifest files
+ std::vector manifests = Apply(transaction_->current(), parent_snapshot);
+
+ // Write manifest list
+ std::string manifest_list_path = ManifestListPath();
+ manifest_lists_.push_back(manifest_list_path);
+
+ // Create manifest list writer based on format version
+ int8_t format_version = transaction_->current().format_version;
+ std::unique_ptr writer;
+
+ if (format_version == 1) {
+ ICEBERG_ASSIGN_OR_RAISE(writer, ManifestListWriter::MakeV1Writer(
+ new_snapshot_id, parent_snapshot_id,
+ manifest_list_path, transaction_->table()->io()));
+ } else if (format_version == 2) {
+ ICEBERG_ASSIGN_OR_RAISE(
+ writer, ManifestListWriter::MakeV2Writer(new_snapshot_id, parent_snapshot_id,
+ sequence_number, manifest_list_path,
+ transaction_->table()->io()));
+ } else { // format_version == 3
+ int64_t first_row_id = transaction_->current().next_row_id;
+ ICEBERG_ASSIGN_OR_RAISE(
+ writer, ManifestListWriter::MakeV3Writer(
+ new_snapshot_id, parent_snapshot_id, sequence_number, first_row_id,
+ manifest_list_path, transaction_->table()->io()));
+ }
+
+ ICEBERG_RETURN_UNEXPECTED(writer->AddAll(manifests));
+ ICEBERG_RETURN_UNEXPECTED(writer->Close());
+
+ // Get nextRowId and assignedRows for format version 3
+ std::optional next_row_id;
+ std::optional assigned_rows;
+ if (format_version >= 3) {
+ next_row_id = transaction_->current().next_row_id;
+ if (writer->next_row_id().has_value() && next_row_id.has_value()) {
+ assigned_rows = writer->next_row_id().value() - next_row_id.value();
+ }
+ }
+
+ // Compute summary
+ std::unordered_map summary =
+ ComputeSummary(transaction_->table()->metadata());
+
+ // Validate REPLACE operation if applicable
+ std::string op = operation();
+ if (!op.empty() && op == DataOperation::kReplace) {
+ auto added_records_it = summary.find(SnapshotSummaryFields::kAddedRecords);
+ auto replaced_records_it = summary.find(SnapshotSummaryFields::kDeletedRecords);
+ if (added_records_it != summary.end() && replaced_records_it != summary.end()) {
+ int64_t added_records = 0;
+ int64_t replaced_records = 0;
+ auto [_, ec1] = std::from_chars(
+ added_records_it->second.data(),
+ added_records_it->second.data() + added_records_it->second.size(),
+ added_records);
+ auto [__, ec2] = std::from_chars(
+ replaced_records_it->second.data(),
+ replaced_records_it->second.data() + replaced_records_it->second.size(),
+ replaced_records);
+ if (ec1 == std::errc() && ec2 == std::errc() && added_records > replaced_records) {
+ return InvalidArgument(
+ "Invalid REPLACE operation: {} added records > {} replaced records",
+ added_records, replaced_records);
+ }
+ }
+ }
+
+ // Get current time
+ auto now = std::chrono::system_clock::now();
+ auto duration_since_epoch = now.time_since_epoch();
+ TimePointMs timestamp_ms = std::chrono::time_point_cast(
+ std::chrono::system_clock::time_point(duration_since_epoch));
+
+ // Get schema ID
+ std::optional schema_id = transaction_->current().current_schema_id;
+
+ // Create snapshot
+ staged_snapshot_ =
+ std::make_shared(Snapshot{.snapshot_id = new_snapshot_id,
+ .parent_snapshot_id = parent_snapshot_id,
+ .sequence_number = sequence_number,
+ .timestamp_ms = timestamp_ms,
+ .manifest_list = manifest_list_path,
+ .summary = std::move(summary),
+ .schema_id = schema_id});
+
+ // Build metadata update
+ auto builder = TableMetadataBuilder::BuildFrom(&transaction_->current());
+
+ // Check if this is a rollback (snapshot already exists in current metadata)
+ auto existing_snapshot_result =
+ transaction_->current().SnapshotById(staged_snapshot_->snapshot_id);
+ if (existing_snapshot_result.has_value()) {
+ // Rollback operation - set branch snapshot to existing snapshot ID
+ builder->SetBranchSnapshot(staged_snapshot_->snapshot_id, target_branch_);
+ } else if (stage_only_) {
+ // Stage only - add snapshot but don't set as current
+ builder->AddSnapshot(staged_snapshot_);
+ } else {
+ // Normal commit - add snapshot first, then set as branch snapshot
+ builder->AddSnapshot(staged_snapshot_);
+ builder->SetBranchSnapshot(staged_snapshot_->snapshot_id, target_branch_);
+ }
+
+ // Build updated metadata
+ ICEBERG_ASSIGN_OR_RAISE(auto updated_metadata, builder->Build());
+
+ // Return the staged snapshot
+ return ApplyResult{.snapshot = staged_snapshot_, .target_branch = target_branch_};
+}
+
+Status SnapshotUpdate::Commit() {
+ auto status = transaction_->Apply(*this);
+
+ // Cleanup after successful commit
+ if (status.has_value() && staged_snapshot_ && cleanup_after_commit()) {
+ // CleanUncommitted();//TODO: Implement
+ // Clean up unused manifest lists
+ for (const auto& manifest_list : manifest_lists_) {
+ if (manifest_list != staged_snapshot_->manifest_list) {
+ DeleteFile(manifest_list);
+ }
+ }
+ }
+
+ return status;
+}
+
+void SnapshotUpdate::SetTargetBranch(const std::string& branch) {
+ if (branch.empty()) {
+ AddError(ErrorKind::kInvalidArgument, "Invalid branch name: empty");
+ return;
+ }
+
+ auto ref_it = transaction_->current().refs.find(branch);
+ if (ref_it != transaction_->current().refs.end()) {
+ if (ref_it->second->type() != SnapshotRefType::kBranch) {
+ AddError(
+ ErrorKind::kInvalidArgument,
+ "{} is a tag, not a branch. Tags cannot be targets for producing snapshots",
+ branch);
+ return;
+ }
+ }
+
+ target_branch_ = branch;
+}
+
+std::unordered_map SnapshotUpdate::ComputeSummary(
+ const std::shared_ptr& previous) {
+ std::unordered_map summary = Summary();
+
+ // Get previous summary from the target branch
+ std::unordered_map previous_summary;
+ if (auto ref_it = transaction_->current().refs.find(target_branch_);
+ ref_it != transaction_->current().refs.end()) {
+ auto snapshot_result =
+ transaction_->current().SnapshotById(ref_it->second->snapshot_id);
+ if (snapshot_result.has_value() && (*snapshot_result)->summary.size() > 0) {
+ previous_summary = (*snapshot_result)->summary;
+ }
+ }
+
+ // If no previous summary, initialize with zeros
+ if (previous_summary.empty()) {
+ previous_summary[SnapshotSummaryFields::kTotalRecords] = "0";
+ previous_summary[SnapshotSummaryFields::kTotalFileSize] = "0";
+ previous_summary[SnapshotSummaryFields::kTotalDataFiles] = "0";
+ previous_summary[SnapshotSummaryFields::kTotalDeleteFiles] = "0";
+ previous_summary[SnapshotSummaryFields::kTotalPosDeletes] = "0";
+ previous_summary[SnapshotSummaryFields::kTotalEqDeletes] = "0";
+ }
+
+ // Copy all summary properties from the implementation
+ for (const auto& [key, value] : summary_properties_) {
+ summary[key] = value;
+ }
+
+ // Update totals
+ UpdateTotal(summary, previous_summary, SnapshotSummaryFields::kTotalRecords,
+ SnapshotSummaryFields::kAddedRecords,
+ SnapshotSummaryFields::kDeletedRecords);
+ UpdateTotal(summary, previous_summary, SnapshotSummaryFields::kTotalFileSize,
+ SnapshotSummaryFields::kAddedFileSize,
+ SnapshotSummaryFields::kRemovedFileSize);
+ UpdateTotal(summary, previous_summary, SnapshotSummaryFields::kTotalDataFiles,
+ SnapshotSummaryFields::kAddedDataFiles,
+ SnapshotSummaryFields::kDeletedDataFiles);
+ UpdateTotal(summary, previous_summary, SnapshotSummaryFields::kTotalDeleteFiles,
+ SnapshotSummaryFields::kAddedDeleteFiles,
+ SnapshotSummaryFields::kRemovedDeleteFiles);
+ UpdateTotal(summary, previous_summary, SnapshotSummaryFields::kTotalPosDeletes,
+ SnapshotSummaryFields::kAddedPosDeletes,
+ SnapshotSummaryFields::kRemovedPosDeletes);
+ UpdateTotal(summary, previous_summary, SnapshotSummaryFields::kTotalEqDeletes,
+ SnapshotSummaryFields::kAddedEqDeletes,
+ SnapshotSummaryFields::kRemovedEqDeletes);
+
+ return summary;
+}
+
+void SnapshotUpdate::CleanAll() {
+ for (const auto& manifest_list : manifest_lists_) {
+ DeleteFile(manifest_list);
+ }
+ manifest_lists_.clear();
+ // Pass empty set - subclasses will implement CleanUncommitted
+ CleanUncommitted(std::unordered_set{});
+}
+
+Status SnapshotUpdate::DeleteFile(const std::string& path) { return delete_func_(path); }
+
+std::string SnapshotUpdate::ManifestListPath() {
+ // Generate manifest list path
+ // Format: {metadata_location}/snap-{snapshot_id}-{attempt}-{uuid}.avro
+ int64_t snapshot_id = snapshot_id_
+ ? *snapshot_id_
+ : SnapshotUtil::GenerateSnapshotId(*transaction_->table());
+ std::string filename = std::format("snap-{}-{}-{}.avro", snapshot_id,
+ attempt_.fetch_add(1) + 1, commit_uuid_);
+ return std::format("{}/metadata/{}", transaction_->table()->location(), filename);
+}
+
+std::string SnapshotUpdate::ManifestPath() {
+ // Generate manifest path
+ // Format: {metadata_location}/{uuid}-m{manifest_count}.avro
+ int32_t count = manifest_count_.fetch_add(1);
+ std::string filename = std::format("{}-m{}.avro", commit_uuid_, count);
+ return std::format("{}/metadata/{}", transaction_->table()->location(), filename);
+}
+
+void SnapshotUpdate::UpdateTotal(
+ std::unordered_map& summary,
+ const std::unordered_map& previous_summary,
+ const std::string& total_property, const std::string& added_property,
+ const std::string& deleted_property) {
+ auto total_it = previous_summary.find(total_property);
+ if (total_it != previous_summary.end()) {
+ int64_t new_total;
+ auto [_, ec] =
+ std::from_chars(total_it->second.data(),
+ total_it->second.data() + total_it->second.size(), new_total);
+ if (ec != std::errc()) [[unlikely]] {
+ // Ignore and do not add total
+ return;
+ }
+
+ auto added_it = summary.find(added_property);
+ if (new_total >= 0 && added_it != summary.end()) {
+ int64_t added_value;
+ auto [_, ec] =
+ std::from_chars(added_it->second.data(),
+ added_it->second.data() + added_it->second.size(), added_value);
+ if (ec == std::errc()) [[unlikely]] {
+ new_total += added_value;
+ }
+ }
+
+ auto deleted_it = summary.find(deleted_property);
+ if (new_total >= 0 && deleted_it != summary.end()) {
+ int64_t deleted_value;
+ auto [_, ec] = std::from_chars(
+ deleted_it->second.data(),
+ deleted_it->second.data() + deleted_it->second.size(), deleted_value);
+ if (ec == std::errc()) [[unlikely]] {
+ new_total -= deleted_value;
+ }
+ }
+
+ if (new_total >= 0) {
+ summary[total_property] = std::to_string(new_total);
+ }
+ }
+}
+
+} // namespace iceberg
diff --git a/src/iceberg/update/snapshot_update.h b/src/iceberg/update/snapshot_update.h
new file mode 100644
index 000000000..63683f773
--- /dev/null
+++ b/src/iceberg/update/snapshot_update.h
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+
+#include "iceberg/catalog.h"
+#include "iceberg/iceberg_export.h"
+#include "iceberg/manifest/manifest_list.h"
+#include "iceberg/result.h"
+#include "iceberg/snapshot.h"
+#include "iceberg/table_metadata.h"
+#include "iceberg/transaction.h"
+#include "iceberg/type_fwd.h"
+#include "iceberg/update/pending_update.h"
+
+namespace iceberg {
+
+/// \brief Base class for operations that produce snapshots.
+///
+/// This class provides common functionality for creating new snapshots,
+/// including manifest list writing, commit retries, and cleanup.
+///
+class ICEBERG_EXPORT SnapshotUpdate : public PendingUpdate {
+ public:
+ /// \brief Result of applying a snapshot update
+ struct ApplyResult {
+ /// \brief The new snapshot
+ std::shared_ptr snapshot;
+ std::string target_branch;
+ };
+
+ ~SnapshotUpdate() override = default;
+
+ /// \brief Set a summary property in the snapshot produced by this update.
+ ///
+ /// \param property A string property name
+ /// \param value A string property value
+ /// \return Reference to this for method chaining
+ /// \tparam T The concrete subclass type
+ template
+ requires std::is_base_of_v
+ T& Set(const std::string& property, const std::string& value) {
+ summary_properties_[property] = value;
+ return *this;
+ }
+
+ /// \brief Set a callback to delete files instead of the table's default.
+ ///
+ /// \param delete_func A function used to delete file locations
+ /// \return Reference to this for method chaining
+ /// \tparam T The concrete subclass type
+ template
+ requires std::is_base_of_v
+ T& DeleteWith(std::function delete_func) {
+ delete_func_ = std::move(delete_func);
+ return static_cast(*this);
+ }
+
+ /// \brief Stage a snapshot in table metadata, but not update the current snapshot id.
+ ///
+ /// \return Reference to this for method chaining
+ /// \tparam T The concrete subclass type
+ template
+ requires std::is_base_of_v
+ T& StageOnly() {
+ stage_only_ = true;
+ return static_cast(*this);
+ }
+
+ /// \brief Apply the update's changes to create a new snapshot.
+ ///
+ /// This method validates the changes, applies them to the metadata,
+ /// and creates a new snapshot without committing it. The snapshot
+ /// is stored internally and can be accessed after Apply() succeeds.
+ ///
+ /// \return A result containing the new snapshot, or an error
+ Result Apply();
+
+ /// \brief Commits the snapshot changes to the table.
+ ///
+ /// This method applies the changes and commits them through the catalog.
+ /// It handles retries and cleanup of uncommitted files.
+ ///
+ /// \return Status::OK if the commit was successful, or an error
+ Status Commit() override;
+
+ protected:
+ explicit SnapshotUpdate(std::shared_ptr transaction);
+
+ /// \brief Write data manifests for the given data files
+ ///
+ /// \param data_files The data files to write
+ /// \param spec The partition spec to use
+ /// \return A vector of manifest files
+ Result> WriteDataManifests(
+ const std::vector& data_files, const PartitionSpec& spec);
+
+ /// \brief Write delete manifests for the given delete files
+ ///
+ /// \param delete_files The delete files to write
+ /// \param spec The partition spec to use
+ /// \return A vector of manifest files
+ Result> WriteDeleteManifests(
+ const std::vector& delete_files, const PartitionSpec& spec);
+
+ /// \brief Get the target branch name
+ const std::string& target_branch() const { return target_branch_; }
+
+ /// \brief Set the target branch name
+ void SetTargetBranch(const std::string& branch);
+
+ /// \brief Check if snapshot ID inheritance is enabled
+ bool can_inherit_snapshot_id() const { return can_inherit_snapshot_id_; }
+
+ /// \brief Get the target manifest size in bytes
+ int64_t target_manifest_size_bytes() const { return target_manifest_size_bytes_; }
+
+ /// \brief Get the commit UUID
+ const std::string& commit_uuid() const { return commit_uuid_; }
+
+ /// \brief Get the attempt number
+ int32_t attempt() const { return attempt_.load(); }
+
+ /// \brief Get the manifest count
+ int32_t manifest_count() const { return manifest_count_.load(); }
+
+ /// \brief Clean up any uncommitted manifests that were created.
+ ///
+ /// Manifests may not be committed if apply is called multiple times
+ /// because a commit conflict has occurred. Implementations may keep
+ /// around manifests because the same changes will be made by both
+ /// apply calls. This method instructs the implementation to clean up
+ /// those manifests and passes the paths of the manifests that were
+ /// actually committed.
+ ///
+ /// \param committed A set of manifest paths that were actually committed
+ virtual void CleanUncommitted(const std::unordered_set& committed) = 0;
+
+ /// \brief A string that describes the action that produced the new snapshot.
+ ///
+ /// \return A string operation name
+ virtual std::string operation() = 0;
+
+ /// \brief Validate the current metadata.
+ ///
+ /// Child operations can override this to add custom validation.
+ ///
+ /// \param current_metadata Current table metadata to validate
+ /// \param snapshot Ending snapshot on the lineage which is being validated
+ virtual Status Validate(const TableMetadata& current_metadata,
+ const std::shared_ptr& snapshot) = 0;
+
+ /// \brief Apply the update's changes to the given metadata and snapshot.
+ /// Return the new manifest list.
+ ///
+ /// \param metadata_to_update The base table metadata to apply changes to
+ /// \param snapshot Snapshot to apply the changes to
+ /// \return A vector of manifest files for the new snapshot
+ virtual std::vector Apply(const TableMetadata& metadata_to_update,
+ const std::shared_ptr& snapshot) = 0;
+
+ /// \brief Get the summary map for this operation.
+ ///
+ /// \return A map of summary properties
+ virtual std::unordered_map Summary() = 0;
+
+ /// \brief Check if cleanup should happen after commit
+ ///
+ /// \return True if cleanup should happen after commit
+ virtual bool cleanup_after_commit() const { return true; }
+
+ private:
+ /// \brief Compute the final summary including totals
+ std::unordered_map ComputeSummary(
+ const std::shared_ptr& previous);
+
+ /// \brief Clean up all uncommitted files
+ void CleanAll();
+
+ /// \brief Delete a file using the configured delete function
+ Status DeleteFile(const std::string& path);
+
+ /// \brief Get the path for a manifest list file
+ std::string ManifestListPath();
+
+ /// \brief Get the path for a manifest file
+ std::string ManifestPath();
+
+ /// \brief Update a total property in the summary
+ void UpdateTotal(std::unordered_map& summary,
+ const std::unordered_map& previous_summary,
+ const std::string& total_property, const std::string& added_property,
+ const std::string& deleted_property);
+
+ int32_t format_version_;
+ std::shared_ptr spec_;
+ std::shared_ptr schema_;
+
+ std::unordered_map summary_properties_;
+ std::function delete_func_;
+ bool stage_only_ = false;
+ std::string target_branch_ = std::string(SnapshotRef::kMainBranch);
+
+ std::optional snapshot_id_{std::nullopt};
+ std::atomic attempt_{0};
+ std::atomic manifest_count_{0};
+ std::vector manifest_lists_;
+ std::string commit_uuid_;
+ std::shared_ptr staged_snapshot_;
+
+ int64_t target_manifest_size_bytes_;
+ // For format version > 1, inheritance is enabled by default
+ bool can_inherit_snapshot_id_{true};
+};
+
+} // namespace iceberg
diff --git a/src/iceberg/util/snapshot_util.cc b/src/iceberg/util/snapshot_util.cc
index e76426f3c..3391d4b93 100644
--- a/src/iceberg/util/snapshot_util.cc
+++ b/src/iceberg/util/snapshot_util.cc
@@ -26,6 +26,7 @@
#include "iceberg/util/macros.h"
#include "iceberg/util/snapshot_util_internal.h"
#include "iceberg/util/timepoint.h"
+#include "iceberg/util/uuid.h"
namespace iceberg {
@@ -320,4 +321,17 @@ Result> SnapshotUtil::LatestSnapshot(
return metadata.SnapshotById(it->second->snapshot_id);
}
+int64_t SnapshotUtil::GenerateSnapshotId() {
+ auto uuid = Uuid::GenerateV7();
+ return (uuid.highbits() ^ uuid.lowbits()) & std::numeric_limits::max();
+}
+
+int64_t SnapshotUtil::GenerateSnapshotId(const Table& table) {
+ auto snapshot_id = GenerateSnapshotId();
+ while (table.SnapshotById(snapshot_id).has_value()) {
+ snapshot_id = GenerateSnapshotId();
+ }
+ return snapshot_id;
+}
+
} // namespace iceberg
diff --git a/src/iceberg/util/snapshot_util_internal.h b/src/iceberg/util/snapshot_util_internal.h
index e0d8830ff..53b4ba967 100644
--- a/src/iceberg/util/snapshot_util_internal.h
+++ b/src/iceberg/util/snapshot_util_internal.h
@@ -247,6 +247,15 @@ class ICEBERG_EXPORT SnapshotUtil {
static Result> LatestSnapshot(const TableMetadata& metadata,
const std::string& branch);
+ /// \brief Generate a new snapshot ID.
+ static int64_t GenerateSnapshotId();
+
+ /// \brief Generate a new snapshot ID for the given table.
+ ///
+ /// \param table The table
+ /// \return A new snapshot ID
+ static int64_t GenerateSnapshotId(const Table& table);
+
private:
/// \brief Helper function to traverse ancestors of a snapshot.
///
diff --git a/src/iceberg/util/uuid.cc b/src/iceberg/util/uuid.cc
index 9322deb93..8fea859c2 100644
--- a/src/iceberg/util/uuid.cc
+++ b/src/iceberg/util/uuid.cc
@@ -217,4 +217,16 @@ std::string Uuid::ToString() const {
data_[15]);
}
+int64_t Uuid::highbits() const {
+ int64_t result;
+ std::memcpy(&result, data_.data(), 8);
+ return result;
+}
+
+int64_t Uuid::lowbits() const {
+ int64_t result;
+ std::memcpy(&result, data_.data() + 8, 8);
+ return result;
+}
+
} // namespace iceberg
diff --git a/src/iceberg/util/uuid.h b/src/iceberg/util/uuid.h
index 64db7c5d6..923736ecc 100644
--- a/src/iceberg/util/uuid.h
+++ b/src/iceberg/util/uuid.h
@@ -78,6 +78,9 @@ class ICEBERG_EXPORT Uuid : public util::Formattable {
return lhs.data_ == rhs.data_;
}
+ int64_t highbits() const;
+ int64_t lowbits() const;
+
private:
std::array data_;
};