Conversation
Pull request overview
Adds an initial CSV-backed Datasource implementation (Arrow schema + record batches) and wires it into the CMake build/test setup, addressing issues #30 and #31.
Changes:
- Introduces `types` (Arrow-backed vectors + `RecordBatch`) and `datasource` libraries, including a CSV datasource + iterator.
- Adds GoogleTest coverage and sample CSV fixtures for schema inference and projection.
- Restructures library CMake targets (replaces `planner` with `logical_plan`) and updates top-level build wiring.
Reviewed changes
Copilot reviewed 21 out of 21 changed files in this pull request and generated 11 comments.
| File | Description |
|---|---|
| `test/datasource_test.cpp` | Adds unit tests for CSV schema inference, provided schema, and projection. |
| `test/data/csv/without_header.csv` | Adds CSV fixture without header. |
| `test/data/csv/with_header.csv` | Adds CSV fixture with header. |
| `test/data/csv/invalid.csv` | Adds invalid CSV fixture for negative-path testing. |
| `test/CMakeLists.txt` | Adds a new `datasource_test` executable and registers it with CTest. |
| `src/main.cpp` | Minor formatting/style adjustments to `main`. |
| `lib/types/types.h` | Introduces Arrow type aliases, `ColumnVector` abstraction, and `RecordBatch`. |
| `lib/types/types.cpp` | Implements `ArrowFieldVector` value accessors. |
| `lib/types/CMakeLists.txt` | Adds `types` library target and links Arrow. |
| `lib/planner/CMakeLists.txt` | Removes old `planner` library target. |
| `lib/logical-plan/planner.h` | Minor formatting adjustment. |
| `lib/logical-plan/planner.cpp` | Reorders includes / formatting. |
| `lib/logical-plan/CMakeLists.txt` | Adds `logical_plan` library target. |
| `lib/datasource/datasource.h` | Defines `Datasource` interface and `CsvDatasource` API. |
| `lib/datasource/csv_datasource_iter.h` | Declares a generic iterator interface and CSV iterator. |
| `lib/datasource/csv_datasource_iter.cpp` | Implements CSV scanning into Arrow arrays / `RecordBatch`. |
| `lib/datasource/csv_datasource.cpp` | Implements schema inference and projection schema construction. |
| `lib/datasource/CMakeLists.txt` | Adds `datasource` library target and links `types`. |
| `cmake/AddGoogleTest.cmake` | Tweaks googletest `FetchContent` configuration (compile options commented out). |
| `CMakeLists.txt` | Adds Arrow dependency and new subdirectories/targets wiring. |
| `.clang-format` | Introduces project formatting configuration. |
```cpp
auto get_path(const std::string &file_name) -> std::string
{
    std::filesystem::path cwd(__FILE__);
    return cwd.parent_path().append("data").append("csv").append(file_name);
```
get_path returns a std::filesystem::path expression, but the function return type is std::string. std::filesystem::path is not implicitly convertible to std::string, so this won’t compile as written; convert explicitly (e.g., via .string()).
```diff
- return cwd.parent_path().append("data").append("csv").append(file_name);
+ return cwd.parent_path().append("data").append("csv").append(file_name).string();
```
```cpp
    this->fields = std::move(fields);
}

auto RowCount() -> int { return fields[0]->Size(); }
```
RecordBatch::RowCount() unconditionally indexes fields[0], which will crash if a RecordBatch is constructed with zero columns (e.g., empty projection / schema inference failure). Handle the empty-fields case explicitly (return 0 or throw) before indexing.
```diff
- auto RowCount() -> int { return fields[0]->Size(); }
+ auto RowCount() -> int
+ {
+     if (fields.empty())
+     {
+         return 0;
+     }
+     return fields[0]->Size();
+ }
```
```cpp
std::any value;
int size;

public:
```
LiteralValueVector’s arrowType, value, and size members are never initialized (no constructor), so any use of this type will be undefined behavior. Add a constructor (or in-class initializers) to fully initialize these members.
```diff
  public:
+     // Default constructor initializes members to a safe "empty" state.
+     LiteralValueVector()
+         : arrowType(arrow::Type::NA), value(), size(0)
+     {
+     }
+     // Convenience constructor to create a literal vector with a given value and size.
+     LiteralValueVector(arrow::Type::type type, std::any literal_value, int literal_size)
+         : arrowType(type), value(std::move(literal_value)), size(literal_size)
+     {
+     }
```
|
|
```cpp
CsvDatasource::CsvDatasource(std::optional<std::shared_ptr<Schema>> schema, const String &file_name)
    : schema_(std::move(schema)), file_(file_name)
{
```
The CSV file stream is opened in the member initializer list, but there’s no check that file_ actually opened successfully. If the file can’t be opened, infer_schema() will read an empty line and downstream code can produce empty schemas / crashes; validate file_.is_open() (and/or file_.good()) and fail fast with a clear error.
```diff
  {
+     if (!file_.is_open() || !file_.good())
+     {
+         throw std::runtime_error("failed to open CSV file for reading");
+     }
```
```cpp
auto CsvDatasource::create_final_schema(const std::vector<String> &projection) -> std::shared_ptr<Schema>
{
    std::vector<std::shared_ptr<arrow::Field>> projected_fields;
    if (!schema_.has_value())
    {
        throw std::runtime_error("error creating final schema");
    }

    auto schema = schema_.value();
    for (int i = 0; i < int(schema->fields().size()); i++)
    {
        auto field = schema->fields()[i];
        if (std::ranges::contains(projection, field->name()))
        {
            auto meta = std::unordered_map<std::string, std::string>{{"idx", std::to_string(i)}};
            auto metadata = std::make_shared<arrow::KeyValueMetadata>(meta);
            auto field_with_meta = field->WithMetadata(metadata);
            projected_fields.push_back(field_with_meta);
        }
    }

    return arrow::schema(projected_fields);
}
```
create_final_schema returns a schema containing only fields present in projection. If projection is empty or contains no matching column names, this returns an empty schema, which later leads to constructing a RecordBatch with zero columns (and RowCount() currently crashes). Decide on a contract (e.g., empty projection means “all columns”, or throw on empty/no-match) and enforce it here.
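One way to make that contract explicit, sketched with plain strings rather than `arrow::Field`s (the helper name `select_columns` is illustrative, not part of the PR): treat an empty projection as "all columns" and reject a projection that matches nothing.

```cpp
#include <algorithm>
#include <stdexcept>
#include <string>
#include <vector>

// Hypothetical helper: resolve a projection against the full column list.
// Contract sketched here: empty projection means "all columns";
// a projection that matches no column is a caller error.
std::vector<std::string> select_columns(const std::vector<std::string> &all_columns,
                                        const std::vector<std::string> &projection)
{
    if (projection.empty())
    {
        return all_columns; // empty projection selects everything
    }
    std::vector<std::string> selected;
    for (const auto &name : all_columns)
    {
        if (std::find(projection.begin(), projection.end(), name) != projection.end())
        {
            selected.push_back(name);
        }
    }
    if (selected.empty())
    {
        throw std::invalid_argument("projection matched no columns");
    }
    return selected;
}
```

Whichever contract is chosen, enforcing it here keeps `RecordBatch` from ever being constructed with zero columns.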
```cpp
arrow::StringBuilder builder;
auto appen_status = builder.AppendValues(items);
std::shared_ptr<arrow::Array> array;
auto stat = builder.Finish().Value(&array);

auto field_vector = std::make_shared<ArrowFieldVector>(array);
field_vectors.push_back(field_vector);
```
The iterator always builds arrow::StringArray columns (arrow::StringBuilder) regardless of the field’s declared Arrow type. This will mismatch provided schemas (e.g., Int16Type) and make GetType()/GetValue() inconsistent with the schema. Build arrays using the field’s type (and parse CSV text into that type), or constrain the datasource to string-only schemas and enforce that contract.
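The real fix would dispatch on the field's Arrow type to the matching builder (e.g. `arrow::Int16Builder` vs. `arrow::StringBuilder`). The parsing side of that idea can be sketched with stdlib stand-ins (`ColumnType`, `Cell`, and `parse_cell` are illustrative, not Arrow API):

```cpp
#include <cstdint>
#include <stdexcept>
#include <string>
#include <variant>

// Stand-ins for a declared column type and a typed parsed cell.
enum class ColumnType { Int16, Utf8 };
using Cell = std::variant<int16_t, std::string>;

// Parse one CSV cell according to the column's declared type,
// instead of unconditionally treating every cell as a string.
Cell parse_cell(ColumnType type, const std::string &text)
{
    switch (type)
    {
    case ColumnType::Int16:
    {
        const int value = std::stoi(text); // throws on non-numeric input
        if (value < INT16_MIN || value > INT16_MAX)
        {
            throw std::out_of_range("value does not fit in int16");
        }
        return static_cast<int16_t>(value);
    }
    case ColumnType::Utf8:
        return text;
    }
    throw std::logic_error("unhandled column type");
}
```

With per-type parsing in place, `GetType()`/`GetValue()` stay consistent with whatever schema the caller provided.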
```cpp
template <typename T>
auto CsvDatasourceIterator<T>::has_next() -> bool
{
    return !file_.eof();
}
```
has_next() uses !file_.eof(), which is a brittle way to detect remaining data (EOF is only set after a failed read, and other stream error states aren’t considered). Prefer checking file_.peek() != EOF, or attempt a read and buffer the next row so has_next() is accurate.
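The `peek()` variant can be sketched on any `std::istream` (the free function `has_more_data` is illustrative; the PR's version would be a member on the iterator):

```cpp
#include <istream>
#include <sstream> // only needed for the string-stream usage example
#include <string>

// Report whether the stream still has data by peeking one character.
// eof() alone is only set *after* a failed read, and ignores other
// stream error states, so good() + peek() is a more reliable check.
bool has_more_data(std::istream &in)
{
    return in.good() && in.peek() != std::char_traits<char>::eof();
}
```

Alternatively, reading ahead and buffering the next row makes `has_next()` exact even for trailing newlines or blank lines.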
```cpp
TEST(Datasource, handle_missing_values) {}

TEST(Datasource, handle_different_column_types) {}
```
These two tests are empty bodies, so they don’t validate any behavior and will silently pass. Either implement them (especially since missing values / type parsing are core CSV datasource behaviors) or remove them until the functionality exists.
```diff
- TEST(Datasource, handle_missing_values) {}
- TEST(Datasource, handle_different_column_types) {}
+ TEST(Datasource, handle_missing_values)
+ {
+     GTEST_SKIP() << "TODO: implement handling of missing values in CSV datasource.";
+ }
+ TEST(Datasource, handle_different_column_types)
+ {
+     GTEST_SKIP() << "TODO: implement handling of different column types in CSV datasource.";
+ }
```
```cpp
for (int i = 0; i < int(row_items.size()); i++)
{
    try
    {
        columns.at(std::to_string(i)).push_back(row_items.at(i));
    } catch (const std::exception &e)
    {
        continue;
    }
```
Missing/short rows are currently handled by catching columns.at(...) and continue, which results in different column vectors having different lengths. That can produce inconsistent RecordBatch semantics (and potential out-of-bounds on later access). Consider explicitly appending null/empty values for missing columns so all columns remain aligned by row.
```diff
- for (int i = 0; i < int(row_items.size()); i++)
- {
-     try
-     {
-         columns.at(std::to_string(i)).push_back(row_items.at(i));
-     } catch (const std::exception &e)
-     {
-         continue;
-     }
+ // Ensure that every schema column receives a value (real or empty)
+ // so that all column vectors remain aligned by row.
+ int num_columns = static_cast<int>(columns.size());
+ for (int i = 0; i < num_columns; ++i)
+ {
+     auto key_it = columns.find(std::to_string(i));
+     if (key_it == columns.end())
+     {
+         // Schema/metadata inconsistency: skip this index rather than throwing.
+         continue;
+     }
+     if (i < static_cast<int>(row_items.size()))
+     {
+         key_it->second.push_back(row_items[static_cast<std::size_t>(i)]);
+     }
+     else
+     {
+         // Short row: append an empty value to keep column lengths aligned.
+         key_it->second.push_back(String{});
+     }
```
```cpp
std::string idx;
auto status = field->metadata()->Get("idx").Value(&idx);

auto const &items = columns.at(idx);

arrow::StringBuilder builder;
auto appen_status = builder.AppendValues(items);
std::shared_ptr<arrow::Array> array;
auto stat = builder.Finish().Value(&array);
```
Arrow builder results/statuses are computed but ignored (AppendValues, Finish, and earlier metadata Get). If any of these fail, the code will proceed with partially-built data and later throw/crash. Check and handle these statuses/results explicitly (propagate an error or throw with context).
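The check-and-throw pattern can be sketched with a stand-in `Status` type (the real code would check `arrow::Status`, which likewise exposes `ok()` and `ToString()`; `throw_if_not_ok` is an illustrative helper, not Arrow API):

```cpp
#include <stdexcept>
#include <string>

// Minimal stand-in for arrow::Status, just enough to show the pattern.
struct Status
{
    bool ok_flag;
    std::string message;
    bool ok() const { return ok_flag; }
    std::string ToString() const { return message; }
};

// Fail fast with context instead of silently dropping a bad status.
void throw_if_not_ok(const Status &status, const std::string &context)
{
    if (!status.ok())
    {
        throw std::runtime_error(context + ": " + status.ToString());
    }
}
```

Applied to the hunk above, each of `AppendValues`, `Finish`, and the metadata `Get` would be checked before the array is handed to `ArrowFieldVector`.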
Overview
closes jobala/cabin#30
closes jobala/cabin#31