Skip to content

Commit dc4e3ab

Browse files
nuno-fariamartin-galamb
authored
feat: Implement the statistics_cache function (#19054)
## Which issue does this PR close? <!-- We generally require a GitHub issue to be filed for all bug fixes and enhancements and this helps us generate change logs for our releases. You can link an issue to this PR using the GitHub syntax. For example `Closes #123` indicates that this PR will close issue #123. --> - Closes #18953. ## Rationale for this change Allow a way to check the contents of the file statistics cache. <!-- Why are you proposing this change? If this is already explained clearly in the issue then this section is not needed. Explaining clearly why changes are proposed helps reviewers understand your changes and offer better suggestions for fixes. --> ## What changes are included in this PR? <!-- There is no need to duplicate the description in the issue here but it is sometimes worth providing a summary of the individual changes in this PR. --> - Added the `statistics_cache` function to `datafusion-cli`. - Converted `FileStatisticsCache` to a trait and implemented the `list_entries` method. - Added unit tests. ## Are these changes tested? <!-- We typically require tests for all PRs in order to: 1. Prevent the code from being accidentally broken by subsequent changes 2. Serve as another way to document the expected behavior of the code If tests are not included in your PR, please explain why (for example, are they covered by existing tests)? --> Yes. ## Are there any user-facing changes? <!-- If there are user-facing changes then we may require documentation to be updated before approving the PR. --> Yes, `FileStatisticsCache` has been changed to a trait. Previous implementations need to implement the `list_entries` method. <!-- If there are any breaking changes to public APIs, please add the `api change` label. --> --------- Co-authored-by: Martin Grigorov <[email protected]> Co-authored-by: Andrew Lamb <[email protected]>
1 parent e8a0829 commit dc4e3ab

File tree

6 files changed

+339
-13
lines changed

6 files changed

+339
-13
lines changed

datafusion-cli/src/functions.rs

Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -581,3 +581,119 @@ impl TableFunctionImpl for MetadataCacheFunc {
581581
Ok(Arc::new(metadata_cache))
582582
}
583583
}
584+
585+
/// Table provider returned by the STATISTICS_CACHE table function.
///
/// Holds a single pre-built `RecordBatch` together with the schema it was
/// built with; `scan` serves that batch from memory.
586+
#[derive(Debug)]
587+
struct StatisticsCacheTable {
588+
/// Schema returned by `TableProvider::schema` and passed to the scan.
schema: SchemaRef,
589+
/// The rows served by `scan`.
batch: RecordBatch,
590+
}
591+
592+
#[async_trait]
593+
impl TableProvider for StatisticsCacheTable {
594+
/// Returns `self` as `Any`.
fn as_any(&self) -> &dyn std::any::Any {
595+
self
596+
}
597+
598+
/// Returns a clone of the stored schema handle.
fn schema(&self) -> arrow::datatypes::SchemaRef {
599+
self.schema.clone()
600+
}
601+
602+
/// Always reports `TableType::Base`.
fn table_type(&self) -> datafusion::logical_expr::TableType {
603+
datafusion::logical_expr::TableType::Base
604+
}
605+
606+
/// Builds an execution plan over the stored batch via
/// `MemorySourceConfig::try_new_exec`.
///
/// Only `projection` is forwarded; the session state, filters and limit
/// arguments are not used by this implementation.
async fn scan(
607+
&self,
608+
_state: &dyn Session,
609+
projection: Option<&Vec<usize>>,
610+
_filters: &[Expr],
611+
_limit: Option<usize>,
612+
) -> Result<Arc<dyn ExecutionPlan>> {
613+
Ok(MemorySourceConfig::try_new_exec(
614+
// a single partition containing the single stored batch
&[vec![self.batch.clone()]],
615+
TableProvider::schema(self),
616+
projection.cloned(),
617+
)?)
618+
}
619+
}
620+
621+
/// Table function that lists the entries of the file statistics cache
/// obtained from a `CacheManager`.
#[derive(Debug)]
622+
pub struct StatisticsCacheFunc {
623+
/// Cache manager queried for the file statistics cache on every call.
cache_manager: Arc<CacheManager>,
624+
}
625+
626+
impl StatisticsCacheFunc {
627+
/// Creates the table function over the given `cache_manager`.
pub fn new(cache_manager: Arc<CacheManager>) -> Self {
628+
Self { cache_manager }
629+
}
630+
}
631+
632+
impl TableFunctionImpl for StatisticsCacheFunc {
633+
/// Builds a table with one row per entry of the file statistics cache.
///
/// Returns a plan error if any argument is passed. When the cache manager
/// has no file statistics cache, the resulting table has zero rows.
fn call(&self, exprs: &[Expr]) -> Result<Arc<dyn TableProvider>> {
634+
if !exprs.is_empty() {
635+
return plan_err!("statistics_cache should have no arguments");
636+
}
637+
638+
// `num_rows` and `table_size_bytes` are Utf8 because they carry the
// `to_string()` rendering of the entry's `Precision` values (see below).
let schema = Arc::new(Schema::new(vec![
639+
Field::new("path", DataType::Utf8, false),
640+
Field::new(
641+
"file_modified",
642+
DataType::Timestamp(TimeUnit::Millisecond, None),
643+
false,
644+
),
645+
Field::new("file_size_bytes", DataType::UInt64, false),
646+
Field::new("e_tag", DataType::Utf8, true),
647+
Field::new("version", DataType::Utf8, true),
648+
Field::new("num_rows", DataType::Utf8, false),
649+
Field::new("num_columns", DataType::UInt64, false),
650+
Field::new("table_size_bytes", DataType::Utf8, false),
651+
Field::new("statistics_size_bytes", DataType::UInt64, false),
652+
]));
653+
654+
// construct record batch from the statistics cache entries:
// one vector per output column, filled in schema order
655+
let mut path_arr = vec![];
656+
let mut file_modified_arr = vec![];
657+
let mut file_size_bytes_arr = vec![];
658+
let mut e_tag_arr = vec![];
659+
let mut version_arr = vec![];
660+
let mut num_rows_arr = vec![];
661+
let mut num_columns_arr = vec![];
662+
let mut table_size_bytes_arr = vec![];
663+
let mut statistics_size_bytes_arr = vec![];
664+
665+
// No cache configured => the vectors stay empty and the table has no rows.
if let Some(file_statistics_cache) = self.cache_manager.get_file_statistic_cache()
666+
{
667+
for (path, entry) in file_statistics_cache.list_entries() {
668+
path_arr.push(path.to_string());
669+
file_modified_arr
670+
.push(Some(entry.object_meta.last_modified.timestamp_millis()));
671+
file_size_bytes_arr.push(entry.object_meta.size);
672+
e_tag_arr.push(entry.object_meta.e_tag);
673+
version_arr.push(entry.object_meta.version);
674+
// stringified rather than unwrapped to a number
num_rows_arr.push(entry.num_rows.to_string());
675+
num_columns_arr.push(entry.num_columns as u64);
676+
table_size_bytes_arr.push(entry.table_size_bytes.to_string());
677+
statistics_size_bytes_arr.push(entry.statistics_size_bytes as u64);
678+
}
679+
}
680+
681+
// Column order here must match the field order of `schema` above.
let batch = RecordBatch::try_new(
682+
schema.clone(),
683+
vec![
684+
Arc::new(StringArray::from(path_arr)),
685+
Arc::new(TimestampMillisecondArray::from(file_modified_arr)),
686+
Arc::new(UInt64Array::from(file_size_bytes_arr)),
687+
Arc::new(StringArray::from(e_tag_arr)),
688+
Arc::new(StringArray::from(version_arr)),
689+
Arc::new(StringArray::from(num_rows_arr)),
690+
Arc::new(UInt64Array::from(num_columns_arr)),
691+
Arc::new(StringArray::from(table_size_bytes_arr)),
692+
Arc::new(UInt64Array::from(statistics_size_bytes_arr)),
693+
],
694+
)?;
695+
696+
let statistics_cache = StatisticsCacheTable { schema, batch };
697+
Ok(Arc::new(statistics_cache))
698+
}
699+
}

datafusion-cli/src/main.rs

Lines changed: 116 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,9 @@ use datafusion::execution::runtime_env::RuntimeEnvBuilder;
3131
use datafusion::logical_expr::ExplainFormat;
3232
use datafusion::prelude::SessionContext;
3333
use datafusion_cli::catalog::DynamicObjectStoreCatalog;
34-
use datafusion_cli::functions::{MetadataCacheFunc, ParquetMetadataFunc};
34+
use datafusion_cli::functions::{
35+
MetadataCacheFunc, ParquetMetadataFunc, StatisticsCacheFunc,
36+
};
3537
use datafusion_cli::object_storage::instrumented::{
3638
InstrumentedObjectStoreMode, InstrumentedObjectStoreRegistry,
3739
};
@@ -244,6 +246,14 @@ async fn main_inner() -> Result<()> {
244246
)),
245247
);
246248

249+
// register `statistics_cache` table function to get the contents of the file statistics cache
250+
ctx.register_udtf(
251+
"statistics_cache",
252+
Arc::new(StatisticsCacheFunc::new(
253+
ctx.task_ctx().runtime_env().cache_manager.clone(),
254+
)),
255+
);
256+
247257
let mut print_options = PrintOptions {
248258
format: args.format,
249259
quiet: args.quiet,
@@ -423,7 +433,13 @@ pub fn extract_disk_limit(size: &str) -> Result<usize, String> {
423433
#[cfg(test)]
424434
mod tests {
425435
use super::*;
426-
use datafusion::{common::test_util::batches_to_string, prelude::ParquetReadOptions};
436+
use datafusion::{
437+
common::test_util::batches_to_string,
438+
execution::cache::{
439+
cache_manager::CacheManagerConfig, cache_unit::DefaultFileStatisticsCache,
440+
},
441+
prelude::ParquetReadOptions,
442+
};
427443
use insta::assert_snapshot;
428444

429445
fn assert_conversion(input: &str, expected: Result<usize, String>) {
@@ -631,4 +647,102 @@ mod tests {
631647

632648
Ok(())
633649
}
650+
651+
/// Shows that the statistics cache is not enabled by default yet
652+
/// See https://github.com/apache/datafusion/issues/19217
653+
#[tokio::test]
654+
async fn test_statistics_cache_default() -> Result<(), DataFusionError> {
655+
let ctx = SessionContext::new();
656+
657+
// Expose this context's cache manager through `statistics_cache()`
ctx.register_udtf(
658+
"statistics_cache",
659+
Arc::new(StatisticsCacheFunc::new(
660+
ctx.task_ctx().runtime_env().cache_manager.clone(),
661+
)),
662+
);
663+
664+
// Create one external table per parquet test file
for filename in [
665+
"alltypes_plain",
666+
"alltypes_tiny_pages",
667+
"lz4_raw_compressed_larger",
668+
] {
669+
ctx.sql(
670+
format!(
671+
"create external table {filename}
672+
stored as parquet
673+
location '../parquet-testing/data/{filename}.parquet'",
674+
)
675+
.as_str(),
676+
)
677+
.await?
678+
.collect()
679+
.await?;
680+
}
681+
682+
// When the cache manager creates a StatisticsCache by default,
683+
// the contents will show up here
684+
let sql = "SELECT split_part(path, '/', -1) as filename, file_size_bytes, num_rows, num_columns, table_size_bytes from statistics_cache() order by filename";
685+
let df = ctx.sql(sql).await?;
686+
let rbs = df.collect().await?;
687+
// The expected snapshot below is an empty result
assert_snapshot!(batches_to_string(&rbs),@r"
688+
++
689+
++
690+
");
691+
692+
Ok(())
693+
}
694+
695+
// Can be removed when https://github.com/apache/datafusion/issues/19217 is resolved
696+
/// Shows that `statistics_cache()` lists one row per created parquet table
/// when a `DefaultFileStatisticsCache` is installed explicitly and
/// statistics collection is enabled in the session config.
#[tokio::test]
697+
async fn test_statistics_cache_override() -> Result<(), DataFusionError> {
698+
// Install a specific StatisticsCache implementation
699+
let file_statistics_cache = Arc::new(DefaultFileStatisticsCache::default());
700+
let cache_config = CacheManagerConfig::default()
701+
.with_files_statistics_cache(Some(file_statistics_cache.clone()));
702+
let runtime = RuntimeEnvBuilder::new()
703+
.with_cache_manager(cache_config)
704+
.build()?;
705+
let config = SessionConfig::new().with_collect_statistics(true);
706+
let ctx = SessionContext::new_with_config_rt(config, Arc::new(runtime));
707+
708+
// Expose this context's cache manager through `statistics_cache()`
ctx.register_udtf(
709+
"statistics_cache",
710+
Arc::new(StatisticsCacheFunc::new(
711+
ctx.task_ctx().runtime_env().cache_manager.clone(),
712+
)),
713+
);
714+
715+
// Create one external table per parquet test file
for filename in [
716+
"alltypes_plain",
717+
"alltypes_tiny_pages",
718+
"lz4_raw_compressed_larger",
719+
] {
720+
ctx.sql(
721+
format!(
722+
"create external table {filename}
723+
stored as parquet
724+
location '../parquet-testing/data/{filename}.parquet'",
725+
)
726+
.as_str(),
727+
)
728+
.await?
729+
.collect()
730+
.await?;
731+
}
732+
733+
// `split_part(path, '/', -1)` keeps only the last path segment;
// `order by filename` fixes the row order for the snapshot
let sql = "SELECT split_part(path, '/', -1) as filename, file_size_bytes, num_rows, num_columns, table_size_bytes from statistics_cache() order by filename";
734+
let df = ctx.sql(sql).await?;
735+
let rbs = df.collect().await?;
736+
assert_snapshot!(batches_to_string(&rbs),@r"
737+
+-----------------------------------+-----------------+--------------+-------------+------------------+
738+
| filename | file_size_bytes | num_rows | num_columns | table_size_bytes |
739+
+-----------------------------------+-----------------+--------------+-------------+------------------+
740+
| alltypes_plain.parquet | 1851 | Exact(8) | 11 | Absent |
741+
| alltypes_tiny_pages.parquet | 454233 | Exact(7300) | 13 | Absent |
742+
| lz4_raw_compressed_larger.parquet | 380836 | Exact(10000) | 1 | Absent |
743+
+-----------------------------------+-----------------+--------------+-------------+------------------+
744+
");
745+
746+
Ok(())
747+
}
634748
}

datafusion/catalog-listing/src/table.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -178,7 +178,7 @@ pub struct ListingTable {
178178
/// The SQL definition for this table, if any
179179
definition: Option<String>,
180180
/// Cache for collected file statistics
181-
collected_statistics: FileStatisticsCache,
181+
collected_statistics: Arc<dyn FileStatisticsCache>,
182182
/// Constraints applied to this table
183183
constraints: Constraints,
184184
/// Column default expressions for columns that are not physically present in the data files
@@ -255,7 +255,7 @@ impl ListingTable {
255255
/// multiple times in the same session.
256256
///
257257
/// If `None`, creates a new [`DefaultFileStatisticsCache`] scoped to this query.
258-
pub fn with_cache(mut self, cache: Option<FileStatisticsCache>) -> Self {
258+
pub fn with_cache(mut self, cache: Option<Arc<dyn FileStatisticsCache>>) -> Self {
259259
self.collected_statistics =
260260
cache.unwrap_or_else(|| Arc::new(DefaultFileStatisticsCache::default()));
261261
self

datafusion/execution/src/cache/cache_manager.rs

Lines changed: 28 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,8 @@
1616
// under the License.
1717

1818
use crate::cache::CacheAccessor;
19-
use crate::cache::DefaultFilesMetadataCache;
19+
use crate::cache::cache_unit::DefaultFilesMetadataCache;
20+
use datafusion_common::stats::Precision;
2021
use datafusion_common::{Result, Statistics};
2122
use object_store::ObjectMeta;
2223
use object_store::path::Path;
@@ -35,8 +36,27 @@ use super::list_files_cache::DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT;
3536
/// session lifetime.
3637
///
3738
/// See [`crate::runtime_env::RuntimeEnv`] for more details
38-
pub type FileStatisticsCache =
39-
Arc<dyn CacheAccessor<Path, Arc<Statistics>, Extra = ObjectMeta>>;
39+
pub trait FileStatisticsCache:
40+
CacheAccessor<Path, Arc<Statistics>, Extra = ObjectMeta>
41+
{
42+
/// Retrieves the information about the entries currently cached.
///
/// The returned map is keyed by the entry's [`Path`]; each value is a
/// [`FileStatisticsCacheEntry`] describing that entry.
43+
fn list_entries(&self) -> HashMap<Path, FileStatisticsCacheEntry>;
44+
}
45+
46+
/// Represents information about a cached statistics entry.
47+
/// This is used to expose the statistics cache contents to outside modules.
48+
#[derive(Debug, Clone, PartialEq, Eq)]
49+
pub struct FileStatisticsCacheEntry {
50+
/// Object store metadata ([`ObjectMeta`]) recorded for the entry.
pub object_meta: ObjectMeta,
51+
/// Number of table rows.
52+
pub num_rows: Precision<usize>,
53+
/// Number of table columns.
54+
pub num_columns: usize,
55+
/// Total table size, in bytes.
56+
pub table_size_bytes: Precision<usize>,
57+
/// Size of the statistics entry, in bytes.
58+
pub statistics_size_bytes: usize,
59+
}
}
4060

4161
/// Cache for storing the [`ObjectMeta`]s that result from listing a path
4262
///
@@ -116,7 +136,7 @@ pub struct FileMetadataCacheEntry {
116136
pub extra: HashMap<String, String>,
117137
}
118138

119-
impl Debug for dyn CacheAccessor<Path, Arc<Statistics>, Extra = ObjectMeta> {
139+
impl Debug for dyn FileStatisticsCache {
120140
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
121141
write!(f, "Cache name: {} with length: {}", self.name(), self.len())
122142
}
@@ -143,7 +163,7 @@ impl Debug for dyn FileMetadataCache {
143163
/// See [`CacheManagerConfig`] for configuration options.
144164
#[derive(Debug)]
145165
pub struct CacheManager {
146-
file_statistic_cache: Option<FileStatisticsCache>,
166+
file_statistic_cache: Option<Arc<dyn FileStatisticsCache>>,
147167
list_files_cache: Option<Arc<dyn ListFilesCache>>,
148168
file_metadata_cache: Arc<dyn FileMetadataCache>,
149169
}
@@ -174,7 +194,7 @@ impl CacheManager {
174194
}
175195

176196
/// Get the cache of listing files statistics.
177-
pub fn get_file_statistic_cache(&self) -> Option<FileStatisticsCache> {
197+
pub fn get_file_statistic_cache(&self) -> Option<Arc<dyn FileStatisticsCache>> {
178198
self.file_statistic_cache.clone()
179199
}
180200

@@ -213,7 +233,7 @@ pub struct CacheManagerConfig {
213233
/// Enable caching of file statistics when listing files.
214234
/// Enabling the cache avoids repeatedly reading file statistics in a DataFusion session.
215235
/// Default is disabled. Currently only Parquet files are supported.
216-
pub table_files_statistics_cache: Option<FileStatisticsCache>,
236+
pub table_files_statistics_cache: Option<Arc<dyn FileStatisticsCache>>,
217237
/// Enable caching of file metadata when listing files.
218238
/// Enabling the cache avoids repeat list and object metadata fetch operations, which may be
219239
/// expensive in certain situations (e.g. remote object storage), for objects under paths that
@@ -255,7 +275,7 @@ impl CacheManagerConfig {
255275
/// Default is `None` (disabled).
256276
pub fn with_files_statistics_cache(
257277
mut self,
258-
cache: Option<FileStatisticsCache>,
278+
cache: Option<Arc<dyn FileStatisticsCache>>,
259279
) -> Self {
260280
self.table_files_statistics_cache = cache;
261281
self

0 commit comments

Comments
 (0)