feat: add FileWriter base interface for data file writers #446
Open: shangxinli wants to merge 5 commits into apache:main from shangxinli:data_file_writers
+353 −0
657f22f feat: add FileWriter base interface for data file writers (shangxinli)
5cae029 fix: apply clang-format and use default member initializers (shangxinli)
994e7da Update src/iceberg/data/writer.h (shangxinli)
ba6d0a0 Merge branch 'main' into data_file_writers (shangxinli)
0b5f880 build: move data_writer_test to ICEBERG_BUILD_BUNDLE block (shangxinli)
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,18 @@ | ||
| # Licensed to the Apache Software Foundation (ASF) under one | ||
| # or more contributor license agreements. See the NOTICE file | ||
| # distributed with this work for additional information | ||
| # regarding copyright ownership. The ASF licenses this file | ||
| # to you under the Apache License, Version 2.0 (the | ||
| # "License"); you may not use this file except in compliance | ||
| # with the License. You may obtain a copy of the License at | ||
| # | ||
| # http://www.apache.org/licenses/LICENSE-2.0 | ||
| # | ||
| # Unless required by applicable law or agreed to in writing, | ||
| # software distributed under the License is distributed on an | ||
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| # KIND, either express or implied. See the License for the | ||
| # specific language governing permissions and limitations | ||
| # under the License. | ||
|
|
||
| iceberg_install_all_headers(iceberg/data) |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,27 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, | ||
| * software distributed under the License is distributed on an | ||
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| * KIND, either express or implied. See the License for the | ||
| * specific language governing permissions and limitations | ||
| * under the License. | ||
| */ | ||
|
|
||
| #include "iceberg/data/writer.h" | ||
|
|
||
| namespace iceberg { | ||
|
|
||
| // FileWriter is a pure virtual interface class. | ||
| // Implementations will be provided in subsequent tasks. | ||
|
|
||
| } // namespace iceberg |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,86 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, | ||
| * software distributed under the License is distributed on an | ||
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| * KIND, either express or implied. See the License for the | ||
| * specific language governing permissions and limitations | ||
| * under the License. | ||
| */ | ||
|
|
||
| #pragma once | ||
|
|
||
| /// \file iceberg/data/writer.h | ||
| /// Base interface for Iceberg data file writers. | ||
|
|
||
| #include <cstdint> | ||
| #include <memory> | ||
| #include <vector> | ||
|
|
||
| #include "iceberg/arrow_c_data.h" | ||
| #include "iceberg/iceberg_export.h" | ||
| #include "iceberg/manifest/manifest_entry.h" | ||
| #include "iceberg/result.h" | ||
|
|
||
| namespace iceberg { | ||
|
|
||
| /// \brief Base interface for data file writers. | ||
| /// | ||
| /// This interface defines the common operations for writing Iceberg data files, | ||
| /// including data files, equality delete files, and position delete files. | ||
| /// | ||
| /// Typical usage: | ||
| /// 1. Create a writer instance (via concrete implementation) | ||
| /// 2. Call Write() one or more times to write data | ||
| /// 3. Call Close() to finalize the file | ||
| /// 4. Call Metadata() to get file metadata (only valid after Close()) | ||
| /// | ||
| /// \note This interface is not thread-safe. Concurrent calls to Write() | ||
| /// from multiple threads on the same instance are not supported. | ||
| class ICEBERG_EXPORT FileWriter { | ||
| public: | ||
| virtual ~FileWriter() = default; | ||
|
|
||
| /// \brief Write a batch of records. | ||
| /// | ||
| /// \param data Arrow array containing the records to write. | ||
| /// \return Status indicating success or failure. | ||
| virtual Status Write(ArrowArray* data) = 0; | ||
|
|
||
| /// \brief Get the current number of bytes written. | ||
| /// | ||
| /// \return Result containing the number of bytes written or an error. | ||
| virtual Result<int64_t> Length() const = 0; | ||
|
|
||
| /// \brief Close the writer and finalize the file. | ||
| /// | ||
| /// \return Status indicating success or failure. | ||
| virtual Status Close() = 0; | ||
|
|
||
| /// \brief File metadata for all files produced by the writer. | ||
| struct ICEBERG_EXPORT WriteResult { | ||
| /// Usually a writer produces a single data or delete file. | ||
| /// Position delete writer may produce multiple file-scoped delete files. | ||
| /// In the future, multiple files can be produced if file rolling is supported. | ||
| std::vector<std::shared_ptr<DataFile>> data_files; | ||
| }; | ||
|
|
||
| /// \brief Get file metadata for all files produced by this writer. | ||
| /// | ||
| /// This method should be called after Close() to retrieve the metadata | ||
| /// for all files written by this writer. | ||
| /// | ||
| /// \return Result containing the write result or an error. | ||
| virtual Result<WriteResult> Metadata() = 0; | ||
| }; | ||
|
|
||
| } // namespace iceberg |
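
The four-step lifecycle in the `FileWriter` doc comment (create, `Write()`, `Close()`, `Metadata()`) can be sketched as below. This is a minimal, standalone illustration and not code from this PR: the `sketch` namespace, the simplified `Status`, `Result`, `ArrowArray`, and `DataFile` stand-ins, the `CountingWriter` class, the `WriteBatches` helper, and the file path are all assumptions made so the example compiles without iceberg-cpp.

```cpp
#include <cassert>
#include <cstdint>
#include <memory>
#include <string>
#include <utility>
#include <vector>

namespace sketch {

// Stand-in types (assumptions): the real ones live in iceberg-cpp and Arrow.
struct ArrowArray {
  int64_t length = 0;
};

struct DataFile {
  std::string file_path;
  int64_t record_count = 0;
  int64_t file_size_in_bytes = 0;
};

// An empty error string means success.
struct Status {
  std::string error;
  bool ok() const { return error.empty(); }
};

template <typename T>
struct Result {
  T value{};
  std::string error;
  bool ok() const { return error.empty(); }
};

// Same shape as the interface in this diff, minus ICEBERG_EXPORT.
class FileWriter {
 public:
  virtual ~FileWriter() = default;
  virtual Status Write(ArrowArray* data) = 0;
  virtual Result<int64_t> Length() const = 0;
  virtual Status Close() = 0;
  struct WriteResult {
    std::vector<std::shared_ptr<DataFile>> data_files;
  };
  virtual Result<WriteResult> Metadata() = 0;
};

// Hypothetical writer that only counts records; it touches no real file.
class CountingWriter : public FileWriter {
 public:
  Status Write(ArrowArray* data) override {
    if (closed_) return {"Writer is closed"};
    if (data == nullptr) return {"Null data provided"};
    records_ += data->length;
    bytes_ += data->length * 8;  // pretend each record takes 8 bytes
    return {};
  }

  Result<int64_t> Length() const override { return {bytes_, ""}; }

  Status Close() override {
    if (closed_) return {"Writer already closed"};
    closed_ = true;
    return {};
  }

  Result<WriteResult> Metadata() override {
    if (!closed_) {
      return {WriteResult{}, "Writer must be closed before getting metadata"};
    }
    auto file = std::make_shared<DataFile>();
    file->file_path = "/tmp/sketch.parquet";  // made-up path
    file->record_count = records_;
    file->file_size_in_bytes = bytes_;
    WriteResult result;
    result.data_files.push_back(std::move(file));
    return {std::move(result), ""};
  }

 private:
  int64_t records_ = 0;
  int64_t bytes_ = 0;
  bool closed_ = false;
};

// Runs Write() per batch, then Close(), then Metadata(). Returns the total
// record count across all produced files, or -1 if any step fails.
int64_t WriteBatches(FileWriter& writer, std::vector<ArrowArray>& batches) {
  for (auto& batch : batches) {
    if (!writer.Write(&batch).ok()) return -1;
  }
  if (!writer.Close().ok()) return -1;
  auto metadata = writer.Metadata();
  if (!metadata.ok()) return -1;
  int64_t total = 0;
  for (const auto& file : metadata.value.data_files) total += file->record_count;
  return total;
}

}  // namespace sketch
```

Summing over `data_files` rather than reading a single entry reflects the `WriteResult` comment that a writer may produce more than one file. With three batches of 10, 20, and 30 records, `WriteBatches` returns 60.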
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,218 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, | ||
| * software distributed under the License is distributed on an | ||
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| * KIND, either express or implied. See the License for the | ||
| * specific language governing permissions and limitations | ||
| * under the License. | ||
| */ | ||
|
|
||
| #include <memory> | ||
| #include <vector> | ||
|
|
||
| #include <gmock/gmock.h> | ||
| #include <gtest/gtest.h> | ||
|
|
||
| #include "iceberg/arrow_c_data.h" | ||
| #include "iceberg/data/writer.h" | ||
| #include "iceberg/manifest/manifest_entry.h" | ||
| #include "iceberg/result.h" | ||
| #include "iceberg/test/matchers.h" | ||
|
|
||
| namespace iceberg { | ||
|
|
||
| // Mock implementation of FileWriter for testing | ||
| class MockFileWriter : public FileWriter { | ||
| public: | ||
| MockFileWriter() = default; | ||
|
|
||
| Status Write(ArrowArray* data) override { | ||
| if (is_closed_) { | ||
| return Invalid("Writer is closed"); | ||
| } | ||
| if (data == nullptr) { | ||
| return Invalid("Null data provided"); | ||
| } | ||
| write_count_++; | ||
| // Simulate writing some bytes | ||
| bytes_written_ += 1024; | ||
| return {}; | ||
| } | ||
|
|
||
| Result<int64_t> Length() const override { return bytes_written_; } | ||
|
|
||
| Status Close() override { | ||
| if (is_closed_) { | ||
| return Invalid("Writer already closed"); | ||
| } | ||
| is_closed_ = true; | ||
| return {}; | ||
| } | ||
|
|
||
| Result<WriteResult> Metadata() override { | ||
| if (!is_closed_) { | ||
| return Invalid("Writer must be closed before getting metadata"); | ||
| } | ||
|
|
||
| WriteResult result; | ||
| auto data_file = std::make_shared<DataFile>(); | ||
| data_file->file_path = "/test/data/file.parquet"; | ||
| data_file->file_format = FileFormatType::kParquet; | ||
| data_file->record_count = write_count_ * 100; | ||
| data_file->file_size_in_bytes = bytes_written_; | ||
| result.data_files.push_back(data_file); | ||
|
|
||
| return result; | ||
| } | ||
|
|
||
| bool is_closed() const { return is_closed_; } | ||
| int32_t write_count() const { return write_count_; } | ||
|
|
||
| private: | ||
| int64_t bytes_written_ = 0; | ||
| bool is_closed_ = false; | ||
| int32_t write_count_ = 0; | ||
| }; | ||
|
|
||
| TEST(FileWriterTest, BasicWriteOperation) { | ||
| MockFileWriter writer; | ||
|
|
||
| // Create a dummy ArrowArray (normally this would contain actual data) | ||
| ArrowArray dummy_array = {}; | ||
|
|
||
| ASSERT_THAT(writer.Write(&dummy_array), IsOk()); | ||
| ASSERT_EQ(writer.write_count(), 1); | ||
|
|
||
| auto length_result = writer.Length(); | ||
| ASSERT_THAT(length_result, IsOk()); | ||
| ASSERT_EQ(*length_result, 1024); | ||
| } | ||
|
|
||
| TEST(FileWriterTest, MultipleWrites) { | ||
| MockFileWriter writer; | ||
| ArrowArray dummy_array = {}; | ||
|
|
||
| // Write multiple times | ||
| for (int i = 0; i < 5; i++) { | ||
| ASSERT_THAT(writer.Write(&dummy_array), IsOk()); | ||
| } | ||
|
|
||
| ASSERT_EQ(writer.write_count(), 5); | ||
|
|
||
| auto length_result = writer.Length(); | ||
| ASSERT_THAT(length_result, IsOk()); | ||
| ASSERT_EQ(*length_result, 5120); // 5 * 1024 | ||
| } | ||
|
|
||
| TEST(FileWriterTest, WriteNullData) { | ||
| MockFileWriter writer; | ||
|
|
||
| auto status = writer.Write(nullptr); | ||
| ASSERT_THAT(status, HasErrorMessage("Null data provided")); | ||
| } | ||
|
|
||
| TEST(FileWriterTest, CloseWriter) { | ||
| MockFileWriter writer; | ||
| ArrowArray dummy_array = {}; | ||
|
|
||
| ASSERT_THAT(writer.Write(&dummy_array), IsOk()); | ||
| ASSERT_FALSE(writer.is_closed()); | ||
|
|
||
| ASSERT_THAT(writer.Close(), IsOk()); | ||
| ASSERT_TRUE(writer.is_closed()); | ||
| } | ||
|
|
||
| TEST(FileWriterTest, DoubleClose) { | ||
| MockFileWriter writer; | ||
|
|
||
| ASSERT_THAT(writer.Close(), IsOk()); | ||
| auto status = writer.Close(); | ||
| ASSERT_THAT(status, HasErrorMessage("Writer already closed")); | ||
| } | ||
|
|
||
| TEST(FileWriterTest, WriteAfterClose) { | ||
| MockFileWriter writer; | ||
| ArrowArray dummy_array = {}; | ||
|
|
||
| ASSERT_THAT(writer.Close(), IsOk()); | ||
|
|
||
| auto status = writer.Write(&dummy_array); | ||
| ASSERT_THAT(status, HasErrorMessage("Writer is closed")); | ||
| } | ||
|
|
||
| TEST(FileWriterTest, MetadataBeforeClose) { | ||
| MockFileWriter writer; | ||
| ArrowArray dummy_array = {}; | ||
|
|
||
| ASSERT_THAT(writer.Write(&dummy_array), IsOk()); | ||
|
|
||
| auto metadata_result = writer.Metadata(); | ||
| ASSERT_THAT(metadata_result, | ||
| HasErrorMessage("Writer must be closed before getting metadata")); | ||
| } | ||
|
|
||
| TEST(FileWriterTest, MetadataAfterClose) { | ||
| MockFileWriter writer; | ||
| ArrowArray dummy_array = {}; | ||
|
|
||
| // Write some data | ||
| ASSERT_THAT(writer.Write(&dummy_array), IsOk()); | ||
| ASSERT_THAT(writer.Write(&dummy_array), IsOk()); | ||
| ASSERT_THAT(writer.Write(&dummy_array), IsOk()); | ||
|
|
||
| // Close the writer | ||
| ASSERT_THAT(writer.Close(), IsOk()); | ||
|
|
||
| // Get metadata | ||
| auto metadata_result = writer.Metadata(); | ||
| ASSERT_THAT(metadata_result, IsOk()); | ||
|
|
||
| const auto& result = *metadata_result; | ||
| ASSERT_EQ(result.data_files.size(), 1); | ||
|
|
||
| const auto& data_file = result.data_files[0]; | ||
| ASSERT_EQ(data_file->file_path, "/test/data/file.parquet"); | ||
| ASSERT_EQ(data_file->file_format, FileFormatType::kParquet); | ||
| ASSERT_EQ(data_file->record_count, 300); // 3 writes * 100 records | ||
| ASSERT_EQ(data_file->file_size_in_bytes, 3072); // 3 * 1024 | ||
| } | ||
|
|
||
| TEST(FileWriterTest, WriteResultStructure) { | ||
| FileWriter::WriteResult result; | ||
|
|
||
| // Test that WriteResult can hold multiple data files | ||
| auto data_file1 = std::make_shared<DataFile>(); | ||
| data_file1->file_path = "/test/data/file1.parquet"; | ||
| data_file1->record_count = 100; | ||
|
|
||
| auto data_file2 = std::make_shared<DataFile>(); | ||
| data_file2->file_path = "/test/data/file2.parquet"; | ||
| data_file2->record_count = 200; | ||
|
|
||
| result.data_files.push_back(data_file1); | ||
| result.data_files.push_back(data_file2); | ||
|
|
||
| ASSERT_EQ(result.data_files.size(), 2); | ||
| ASSERT_EQ(result.data_files[0]->file_path, "/test/data/file1.parquet"); | ||
| ASSERT_EQ(result.data_files[0]->record_count, 100); | ||
| ASSERT_EQ(result.data_files[1]->file_path, "/test/data/file2.parquet"); | ||
| ASSERT_EQ(result.data_files[1]->record_count, 200); | ||
| } | ||
|
|
||
| TEST(FileWriterTest, EmptyWriteResult) { | ||
| FileWriter::WriteResult result; | ||
| ASSERT_EQ(result.data_files.size(), 0); | ||
| ASSERT_TRUE(result.data_files.empty()); | ||
| } | ||
|
|
||
| } // namespace iceberg | ||
IMHO, we don't need to add test cases for pure interface classes. We can keep them for now and then remove all these cases once we have implemented DataWriter.
Sure