Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,516 changes: 705 additions & 811 deletions Cargo.lock

Large diffs are not rendered by default.

22 changes: 12 additions & 10 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ semver = { version = "1.0.18", features = ["serde"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = { version = "1.0", features = ["float_roundtrip"] }
serde_yaml = "0.9.34"
sqlparser = "0.58"
sqlparser = "0.59"
tempfile = "3.13.0"
thiserror = "2.0"
tokio = { version = "1.36.0", features = [
Expand All @@ -133,22 +133,24 @@ uuid = { version = "1.11.0", features = ["v7"] }
zstd = "0.13.3"

# Datafusion and Arrow crates
arrow = "56"
arrow-flight = { version = "56", features = ["flight-sql-experimental"] }
datafusion = { version = "50", features = ["serde"] }
datafusion-tracing = { version = "50" }
datafusion-datasource = { version = "50" }
arrow = "57"
arrow-flight = { version = "57", features = ["flight-sql-experimental"] }
datafusion = { version = "52", features = ["serde"] }
datafusion-tracing = { version = "52" }
datafusion-datasource = { version = "52" }
object_store = { version = "0.12", features = ["aws", "gcp", "azure"] }

# Crates that should follow the version used by DataFusion and Arrow
prost = "0.13.3"
prost-build = "0.13.3"
tonic = { version = "0.13", features = [
prost = "0.14.1"
prost-build = "0.14.1"
tonic = { version = "0.14", features = [
"transport",
"gzip",
"tls-native-roots",
] }
tonic-build = "0.13"
tonic-build = "0.14"
tonic-prost = "0.14"
tonic-prost-build = "0.14"

[profile.dev]
# Locally measured to reduce total wall clock time of `cargo build` by 34%
Expand Down
4 changes: 2 additions & 2 deletions crates/arrow-to-postgres/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ version.workspace = true
license-file.workspace = true

[dependencies]
arrow-array = { version = "56", default-features = false }
arrow-schema = { version = "56" }
arrow-array = { version = "57", default-features = false }
arrow-schema = { version = "57" }
bytes = "1.10.1"
enum_dispatch = "0.3.13"
serde_json = "1.0"
Expand Down
4 changes: 2 additions & 2 deletions crates/bin/ampsync/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ default = []
amp-client = { path = "../../clients/flight", features = ["postgres"] }
anyhow.workspace = true
axum.workspace = true
arrow-array = { version = "56", default-features = false }
arrow-schema = { version = "56" }
arrow-array = { version = "57", default-features = false }
arrow-schema = { version = "57" }
arrow-to-postgres = { path = "../../arrow-to-postgres" }
backon.workspace = true
bytes = "1.10.1"
Expand Down
14 changes: 8 additions & 6 deletions crates/core/common/src/catalog/physical.rs
Original file line number Diff line number Diff line change
Expand Up @@ -579,17 +579,19 @@ impl TableProvider for TableSnapshot {

let parquet_file_reader_factory = Arc::clone(&self.reader_factory);
let table_parquet_options = state.table_options().parquet.clone();
let file_source = ParquetSource::new(table_parquet_options)
.with_parquet_file_reader_factory(parquet_file_reader_factory)
.with_predicate(predicate)
.into();
let file_source = Arc::new(
ParquetSource::new(table_schema)
.with_table_parquet_options(table_parquet_options)
.with_parquet_file_reader_factory(parquet_file_reader_factory)
.with_predicate(predicate),
);

let data_source = Arc::new(
FileScanConfigBuilder::new(object_store_url, table_schema, file_source)
FileScanConfigBuilder::new(object_store_url, file_source)
.with_file_groups(file_groups)
.with_limit(limit)
.with_output_ordering(output_ordering)
.with_projection(projection.cloned())
.with_projection_indices(projection.cloned())?
.with_statistics(statistics)
.build(),
);
Expand Down
12 changes: 7 additions & 5 deletions crates/core/common/src/catalog/physical/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use amp_data_store::{CachedParquetData, DataStore};
use bytes::Bytes;
use datafusion::{
arrow::datatypes::SchemaRef,
datasource::physical_plan::{FileMeta, ParquetFileMetrics, ParquetFileReaderFactory},
datasource::physical_plan::{ParquetFileMetrics, ParquetFileReaderFactory},
error::{DataFusionError, Result as DataFusionResult},
parquet::{
arrow::{
Expand All @@ -16,6 +16,7 @@ use datafusion::{
},
physical_plan::metrics::ExecutionPlanMetricsSet,
};
use datafusion_datasource::PartitionedFile;
use futures::future::BoxFuture;
use metadata_db::{LocationId, files::FileId};

Expand All @@ -41,18 +42,19 @@ impl ParquetFileReaderFactory for AmpReaderFactory {
fn create_reader(
&self,
partition_index: usize,
file_meta: FileMeta,
partitioned_file: PartitionedFile,
_metadata_size_hint: Option<usize>,
metrics: &ExecutionPlanMetricsSet,
) -> DataFusionResult<Box<dyn AsyncFileReader + Send>> {
let path = file_meta.location();
let file_meta = &partitioned_file.object_meta;
let path = &file_meta.location;
let file_metrics = ParquetFileMetrics::new(partition_index, path.as_ref(), metrics);
let inner = self
.store
.create_file_reader_from_path(path.clone())
.with_file_size(file_meta.object_meta.size);
.with_file_size(file_meta.size);
let location_id = self.location_id;
let file_id = file_meta
let file_id = partitioned_file
.extensions
.ok_or(DataFusionError::Execution(format!(
"FileMeta missing extensions for location_id: {}",
Expand Down
8 changes: 4 additions & 4 deletions crates/core/common/src/evm/udfs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,7 @@ fn append_sol_value_to_builder(
.append_value(i64::try_from(s)?),
n if n <= DEC_128_MAX_BINARY_PREC => {
let val = i128::try_from(s)?;
validate_decimal_precision(val, DEC128_PREC)?;
validate_decimal_precision(val, DEC128_PREC, 0)?;
let builder = builder
.as_any_mut()
.downcast_mut::<Decimal128Builder>()
Expand All @@ -406,7 +406,7 @@ fn append_sol_value_to_builder(
}
n if n <= DEC_256_MAX_BINARY_PREC => {
let val = i256::from_le_bytes(s.to_le_bytes());
validate_decimal256_precision(val, DEC256_PREC)?;
validate_decimal256_precision(val, DEC256_PREC, 0)?;
let builder = builder
.as_any_mut()
.downcast_mut::<Decimal256Builder>()
Expand Down Expand Up @@ -455,7 +455,7 @@ fn append_sol_value_to_builder(
.append_value(u64::try_from(u)?),
n if n <= DEC_128_MAX_BINARY_PREC => {
let val = i128::try_from(u)?;
validate_decimal_precision(val, DEC128_PREC)?;
validate_decimal_precision(val, DEC128_PREC, 0)?;
let builder = builder
.as_any_mut()
.downcast_mut::<Decimal128Builder>()
Expand All @@ -466,7 +466,7 @@ fn append_sol_value_to_builder(
}
n if n <= DEC_256_MAX_BINARY_PREC => {
let val = i256::from_le_bytes(u.to_le_bytes());
validate_decimal256_precision(val, DEC256_PREC)?;
validate_decimal256_precision(val, DEC256_PREC, 0)?;
let builder = builder
.as_any_mut()
.downcast_mut::<Decimal256Builder>()
Expand Down
6 changes: 6 additions & 0 deletions crates/core/common/src/evm/udfs/eth_call.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,12 @@ impl EthCall {
DataType::Utf8,
]),
volatility: Volatility::Volatile,
parameter_names: Some(vec![
"from".to_string(),
"to".to_string(),
"input_data".to_string(),
"block".to_string(),
]),
},
fields: Fields::from_iter([
Field::new("data", DataType::Binary, true),
Expand Down
2 changes: 1 addition & 1 deletion crates/core/common/src/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -751,7 +751,7 @@ fn all_function_refs(stmt: &Statement) -> Result<Vec<Vec<String>>, AllFunctionNa
};
let stmt = match stmt {
Statement::Statement(statement) => statement,
Statement::CreateExternalTable(_) | Statement::CopyTo(_) => {
Statement::CreateExternalTable(_) | Statement::CopyTo(_) | Statement::Reset(_) => {
return Err(AllFunctionNamesError::DmlNotSupported);
}
Statement::Explain(explain) => match explain.statement.as_ref() {
Expand Down
4 changes: 2 additions & 2 deletions crates/core/dump/src/compaction/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use common::{
},
};
use datafusion::{
datasource::physical_plan::{FileMeta, ParquetFileReaderFactory},
datasource::{listing::PartitionedFile, physical_plan::ParquetFileReaderFactory},
error::DataFusionError,
execution::SendableRecordBatchStream,
physical_plan::{metrics::ExecutionPlanMetricsSet, stream::RecordBatchStreamAdapter},
Expand Down Expand Up @@ -53,7 +53,7 @@ impl CompactionFile {
let file_id = segment.id;
let range = segment.range.clone();

let mut file_meta = FileMeta::from(segment.object.clone());
let mut file_meta = PartitionedFile::from(segment.object.clone());

file_meta.extensions = Some(Arc::new(file_id));

Expand Down
7 changes: 4 additions & 3 deletions crates/core/dump/src/parquet_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@ use common::{
},
},
parquet::{
arrow::AsyncArrowWriter, errors::ParquetError,
file::properties::WriterProperties as ParquetWriterProperties, format::KeyValue,
arrow::AsyncArrowWriter,
errors::ParquetError,
file::{metadata::KeyValue, properties::WriterProperties as ParquetWriterProperties},
},
};
use metadata_db::{
Expand Down Expand Up @@ -143,7 +144,7 @@ impl ParquetFileWriter {
self.filename,
range.start(),
range.end(),
meta.num_rows,
meta.file_metadata().num_rows(),
);

let object_meta = self
Expand Down
2 changes: 2 additions & 0 deletions crates/core/js-runtime/src/convert/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,8 @@ impl ToV8 for ScalarValue {
| ScalarValue::DurationMicrosecond(_)
| ScalarValue::DurationNanosecond(_)
| ScalarValue::Union(_, _, _)
| ScalarValue::Decimal32(_, _, _)
| ScalarValue::Decimal64(_, _, _)
| ScalarValue::Dictionary(_, _) => Err(BoxError::from(format!(
"{} not yet supported in functions",
self.data_type()
Expand Down
1 change: 1 addition & 0 deletions crates/core/js-runtime/src/js_udf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ impl JsUdf {
let signature = Signature {
type_signature: TypeSignature::Exact(input_types),
volatility: Volatility::Immutable,
parameter_names: None,
};

// Create UDF name based on whether schema is provided
Expand Down
3 changes: 2 additions & 1 deletion crates/extractors/firehose/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ serde_json = { workspace = true, optional = true }
thiserror.workspace = true
tokio.workspace = true
tonic.workspace = true
tonic-prost.workspace = true
tracing.workspace = true

[dev-dependencies]
Expand All @@ -31,7 +32,7 @@ serde_json.workspace = true
# These dependencies are only included when the gen_proto cfg flag is enabled
[target.'cfg(gen_proto)'.build-dependencies]
prost-build = { workspace = true }
tonic-build = { workspace = true }
tonic-prost-build = { workspace = true }

[lints.rust]
# Allow the gen_proto cfg flag used for conditional protobuf code generation
Expand Down
4 changes: 2 additions & 2 deletions crates/extractors/firehose/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
// These comments break doc tests, so we disable them.
prost_config.disable_comments(&["google.protobuf.Timestamp", "google.protobuf.Any"]);

let config = tonic_build::configure()
let config = tonic_prost_build::configure()
.build_server(false)
.out_dir("src/proto");

Expand All @@ -22,7 +22,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
.type_attribute(".", "#[derive(serde::Serialize, serde::Deserialize)]")
.emit_rerun_if_changed(false); // See https://github.com/hyperium/tonic/issues/1070

config.compile_protos_with_config(
config.compile_with_config(
prost_config,
&["proto/firehose.proto", "proto/ethereum.proto"],
&[""],
Expand Down
1 change: 0 additions & 1 deletion crates/extractors/firehose/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,6 @@ pub struct AuthInterceptor {
}

impl AuthInterceptor {
#[expect(clippy::result_large_err)]
pub fn new(token: Option<String>) -> Result<Self, Error> {
Ok(AuthInterceptor {
token: token
Expand Down
1 change: 0 additions & 1 deletion crates/extractors/firehose/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ pub mod dataset;
mod dataset_kind;
pub mod evm;
pub mod metrics;
#[expect(clippy::doc_overindented_list_items)]
#[expect(clippy::enum_variant_names)]
mod proto;

Expand Down
17 changes: 8 additions & 9 deletions crates/extractors/firehose/src/proto/google.protobuf.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// This file is @generated by prost-build.
#[derive(serde::Serialize, serde::Deserialize)]
#[derive(Clone, PartialEq, ::prost::Message)]
#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
pub struct Any {
/// A URL/resource name that uniquely identifies the type of the serialized
/// protocol buffer message. This string must contain at least
Expand All @@ -15,13 +15,13 @@ pub struct Any {
/// server that maps type URLs to message definitions as follows:
///
/// * If no scheme is provided, `https` is assumed.
/// * An HTTP GET on the URL must yield a [google.protobuf.Type][]
/// value in binary format, or produce an error.
/// * An HTTP GET on the URL must yield a \[google.protobuf.Type\]\[\]
/// value in binary format, or produce an error.
/// * Applications are allowed to cache lookup results based on the
/// URL, or have them precompiled into a binary to avoid any
/// lookup. Therefore, binary compatibility needs to be preserved
/// on changes to types. (Use versioned type names to manage
/// breaking changes.)
/// URL, or have them precompiled into a binary to avoid any
/// lookup. Therefore, binary compatibility needs to be preserved
/// on changes to types. (Use versioned type names to manage
/// breaking changes.)
///
/// Note: this functionality is not currently available in the official
/// protobuf release, and it is not used for type URLs beginning with
Expand All @@ -30,15 +30,14 @@ pub struct Any {
///
/// Schemes other than `http`, `https` (or the empty scheme) might be
/// used with implementation specific semantics.
///
#[prost(string, tag = "1")]
pub type_url: ::prost::alloc::string::String,
/// Must be a valid serialized protocol buffer of the above specified type.
#[prost(bytes = "vec", tag = "2")]
pub value: ::prost::alloc::vec::Vec<u8>,
}
#[derive(serde::Serialize, serde::Deserialize)]
#[derive(Clone, Copy, PartialEq, ::prost::Message)]
#[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)]
pub struct Timestamp {
/// Represents seconds of UTC time since Unix epoch
/// 1970-01-01T00:00:00Z. Must be from 0001-01-01T00:00:00Z to
Expand Down
Loading