 use std::any::Any;
 use std::collections::HashMap;
 use std::pin::Pin;
+use std::sync::atomic::{AtomicU16, AtomicU64, Ordering};
 use std::sync::{Arc, Mutex, MutexGuard};
-use std::sync::atomic::{AtomicU64, Ordering, AtomicU16};
 use std::task::{Context, Poll};
 
 use crate::shm;
@@ -66,11 +66,21 @@ struct Entry {
 
 impl ScanRegistry {
     pub fn new() -> Self {
-        Self { inner: Mutex::new(HashMap::new()), conn_id: 0, next_id: AtomicU64::new(1), next_slot: AtomicU16::new(0) }
+        Self {
+            inner: Mutex::new(HashMap::new()),
+            conn_id: 0,
+            next_id: AtomicU64::new(1),
+            next_slot: AtomicU16::new(0),
+        }
     }
 
     pub fn with_conn(conn_id: usize) -> Self {
-        Self { inner: Mutex::new(HashMap::new()), conn_id, next_id: AtomicU64::new(1), next_slot: AtomicU16::new(0) }
+        Self {
+            inner: Mutex::new(HashMap::new()),
+            conn_id,
+            next_id: AtomicU64::new(1),
+            next_slot: AtomicU16::new(0),
+        }
     }
 
     #[inline]
@@ -136,7 +146,9 @@ impl ScanRegistry {
     }
 
     #[inline]
-    pub fn conn_id(&self) -> usize { self.conn_id }
+    pub fn conn_id(&self) -> usize {
+        self.conn_id
+    }
 }
 
 // No global registry; registries are per-connection and owned by the server Storage.
@@ -204,7 +216,7 @@ impl TableProvider for PgTableProvider {
             scan_id,
             self.table_oid,
             Arc::clone(&self.schema), // full schema for decoding
-            proj_schema, // physical output schema
+            proj_schema,              // physical output schema
             proj_indices,
             Arc::clone(&self.registry),
         );
@@ -215,7 +227,7 @@ impl TableProvider for PgTableProvider {
 #[derive(Debug)]
 pub struct PgScanExec {
     // Full table schema (all columns) used to compute attribute metadata for decoding
-    full_schema: SchemaRef,
+    _full_schema: SchemaRef,
     // Projected schema exposed by this plan node
     proj_schema: SchemaRef,
     // Postgres table OID for this scan
@@ -248,7 +260,7 @@ impl PgScanExec {
         );
         let attrs_full = Arc::new(attrs_from_schema(&full_schema));
         Self {
-            full_schema,
+            _full_schema: full_schema,
             proj_schema,
             table_oid,
             scan_id,
@@ -345,7 +357,13 @@ impl PgScanStream {
         rx: Option<mpsc::Receiver<HeapBlock>>,
         conn_id: usize,
     ) -> Self {
-        Self { proj_schema, attrs_full, proj_indices, rx, conn_id }
+        Self {
+            proj_schema,
+            attrs_full,
+            proj_indices,
+            rx,
+            conn_id,
+        }
     }
 }
 
@@ -362,7 +380,9 @@ impl Stream for PgScanStream {
                 // Borrow page and (optional) visibility bitmap from shared memory (no copy)
                 let page = unsafe { shm::block_slice(this.conn_id, block.slot_id as usize) };
                 let _vis = if block.vis_len > 0 {
-                    Some(unsafe { shm::vis_slice(this.conn_id, block.slot_id as usize, block.vis_len as usize) })
+                    Some(unsafe {
+                        shm::vis_slice(this.conn_id, block.slot_id as usize, block.vis_len as usize)
+                    })
                 } else {
                     None
                 };
@@ -390,7 +410,7 @@ impl Stream for PgScanStream {
                 }
                 let pairs_len = pairs.len();
                 // Create iterator borrowing the filled pairs slice
-                let mut it = hp.tuples_by_offset(None, std::ptr::null_mut(), &mut pairs);
+                let it = hp.tuples_by_offset(None, std::ptr::null_mut(), &mut pairs);
                 tracing::trace!(
                     target = "executor::server",
                     blkno = block.blkno,
@@ -401,7 +421,7 @@ impl Stream for PgScanStream {
                 let page_hdr = unsafe { &*(page.as_ptr() as *const pg_sys::PageHeaderData) }
                     as *const pg_sys::PageHeaderData;
                 let mut decoded_rows = 0usize;
-                while let Some(tup) = it.next() {
+                for tup in it {
                     // Decode projected columns for tuple using iterator over requested projection
                     let iter = unsafe {
                         decode_tuple_project(
@@ -415,11 +435,11 @@ impl Stream for PgScanStream {
                         continue;
                     };
                     // Iterate over projected columns in order
-                    for col_idx in 0..total_cols {
+                    for b in builders.iter_mut().take(total_cols) {
                         match iter.next() {
-                            Some(Ok(v)) => append_scalar(&mut builders[col_idx], v),
-                            Some(Err(_e)) => append_null(&mut builders[col_idx]),
-                            None => append_null(&mut builders[col_idx]),
+                            Some(Ok(v)) => append_scalar(b, v),
+                            Some(Err(_e)) => append_null(b),
+                            None => append_null(b),
                         }
                     }
                     decoded_rows += 1;
@@ -434,10 +454,13 @@ impl Stream for PgScanStream {
                     // Special case: empty projection — use row_count to communicate the number of rows
                     let opts = RecordBatchOptions::new().with_row_count(Some(decoded_rows));
                     RecordBatch::try_new_with_options(Arc::clone(&this.proj_schema), vec![], &opts)
-                        .map_err(|e| datafusion::error::DataFusionError::Execution(format!("{e}")))?
+                        .map_err(|e| {
+                            datafusion::error::DataFusionError::Execution(format!("{e}"))
+                        })?
                 } else {
-                    RecordBatch::try_new(Arc::clone(&this.proj_schema), arrs)
-                        .map_err(|e| datafusion::error::DataFusionError::Execution(format!("{e}")))?
+                    RecordBatch::try_new(Arc::clone(&this.proj_schema), arrs).map_err(|e| {
+                        datafusion::error::DataFusionError::Execution(format!("{e}"))
+                    })?
                 };
                 tracing::trace!(
                     target = "executor::server",
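
Note on the empty-projection path in the last hunk: an Arrow RecordBatch with zero columns has no arrays from which to infer its length, so the row count must be supplied explicitly through RecordBatchOptions. A minimal sketch of that mechanism, assuming the arrow crate is in scope; the helper name below is hypothetical and not part of this codebase:

use std::sync::Arc;

use arrow::datatypes::Schema;
use arrow::error::Result;
use arrow::record_batch::{RecordBatch, RecordBatchOptions};

// Hypothetical helper: build a column-less batch whose length is carried by row_count.
fn empty_projection_batch(rows: usize) -> Result<RecordBatch> {
    let opts = RecordBatchOptions::new().with_row_count(Some(rows));
    RecordBatch::try_new_with_options(Arc::new(Schema::empty()), vec![], &opts)
}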