Skip to content

Commit 0c25493

Browse files
committed
fix: hanging queries
1 parent e5c0cd7 commit 0c25493

File tree

8 files changed

+577
-189
lines changed

8 files changed

+577
-189
lines changed

executor/src/ipc.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,11 +30,17 @@ pub async fn signal_listener(state: Arc<SharedState<'_>>) {
3030
let mut signal = signal(SignalKind::user_defined1()).expect("failed to create signal");
3131
while signal.recv().await.is_some() {
3232
// Notify affected sockets that the signal has been received.
33+
let mut woke = 0usize;
3334
for i in 0..state.size {
3435
if state.flags[i].load(Ordering::Acquire) {
3536
state.wakers[i].wake();
37+
woke += 1;
38+
tracing::trace!(target = "executor::ipc", conn_id = i, "signal_listener: woke socket");
3639
}
3740
}
41+
if woke == 0 {
42+
tracing::trace!(target = "executor::ipc", "signal_listener: signal received but no flags set");
43+
}
3844
}
3945
}
4046

executor/src/pgscan.rs

Lines changed: 36 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ pub struct HeapBlock {
5151
#[derive(Debug, Default)]
5252
pub struct ScanRegistry {
5353
inner: Mutex<HashMap<ScanId, Entry>>,
54+
conn_id: usize,
5455
}
5556

5657
#[derive(Debug)]
@@ -60,9 +61,9 @@ struct Entry {
6061
}
6162

6263
impl ScanRegistry {
63-
pub fn new() -> Self {
64-
Self::default()
65-
}
64+
pub fn new() -> Self { Self { inner: Mutex::new(HashMap::new()), conn_id: 0 } }
65+
66+
pub fn with_conn(conn_id: usize) -> Self { Self { inner: Mutex::new(HashMap::new()), conn_id } }
6667

6768
#[inline]
6869
fn lock(&self) -> MutexGuard<HashMap<ScanId, Entry>> {
@@ -111,6 +112,9 @@ impl ScanRegistry {
111112
let mut map = self.lock();
112113
let _ = map.remove(&scan_id);
113114
}
115+
116+
#[inline]
117+
pub fn conn_id(&self) -> usize { self.conn_id }
114118
}
115119

116120
// No global registry; registries are per-connection and owned by the server Storage.
@@ -249,7 +253,9 @@ impl ExecutionPlan for PgScanExec {
249253
_ctx: Arc<TaskContext>,
250254
) -> DFResult<SendableRecordBatchStream> {
251255
let rx = self.registry.take_receiver(self.scan_id);
252-
let stream = PgScanStream::new(Arc::clone(&self.schema), Arc::clone(&self.attrs), rx);
256+
// Use connection id stored in registry to address per-connection slot buffers
257+
let conn_id = self.registry.conn_id();
258+
let stream = PgScanStream::new(Arc::clone(&self.schema), Arc::clone(&self.attrs), rx, conn_id);
253259
Ok(Box::pin(stream))
254260
}
255261

@@ -262,15 +268,17 @@ pub struct PgScanStream {
262268
schema: SchemaRef,
263269
attrs: Arc<Vec<PgAttrMeta>>,
264270
rx: Option<mpsc::Receiver<HeapBlock>>,
271+
conn_id: usize,
265272
}
266273

267274
impl PgScanStream {
268275
pub fn new(
269276
schema: SchemaRef,
270277
attrs: Arc<Vec<PgAttrMeta>>,
271278
rx: Option<mpsc::Receiver<HeapBlock>>,
279+
conn_id: usize,
272280
) -> Self {
273-
Self { schema, attrs, rx }
281+
Self { schema, attrs, rx, conn_id }
274282
}
275283
}
276284

@@ -285,9 +293,9 @@ impl Stream for PgScanStream {
285293
match rx.poll_recv(cx) {
286294
Poll::Ready(Some(block)) => {
287295
// Borrow page and (optional) visibility bitmap from shared memory (no copy)
288-
let page = unsafe { shm::block_slice(block.slot_id as usize) };
296+
let page = unsafe { shm::block_slice(this.conn_id, block.slot_id as usize) };
289297
let _vis = if block.vis_len > 0 {
290-
Some(unsafe { shm::vis_slice(block.slot_id as usize, block.vis_len as usize) })
298+
Some(unsafe { shm::vis_slice(this.conn_id, block.slot_id as usize, block.vis_len as usize) })
291299
} else {
292300
None
293301
};
@@ -309,9 +317,23 @@ impl Stream for PgScanStream {
309317
.map_err(|e| datafusion::error::DataFusionError::Execution(format!("{e}")))?;
310318
// Use tuples_by_offset to iterate LP_NORMAL tuples in page order
311319
let mut pairs: Vec<(u16, u16)> = Vec::new();
320+
// Pre-scan to populate pairs and log LP_NORMAL count
321+
{
322+
let _ = hp.tuples_by_offset(None, std::ptr::null_mut(), &mut pairs);
323+
}
324+
let pairs_len = pairs.len();
325+
// Create iterator borrowing the filled pairs slice
312326
let mut it = hp.tuples_by_offset(None, std::ptr::null_mut(), &mut pairs);
327+
tracing::trace!(
328+
target = "executor::server",
329+
blkno = block.blkno,
330+
num_offsets = block.num_offsets,
331+
lp_normal = pairs_len,
332+
"pgscan: tuples_by_offset summary"
333+
);
313334
let page_hdr = unsafe { &*(page.as_ptr() as *const pg_sys::PageHeaderData) }
314335
as *const pg_sys::PageHeaderData;
336+
let mut decoded_rows = 0usize;
315337
while let Some(tup) = it.next() {
316338
// Decode projected columns for tuple using iterator over all columns
317339
let iter =
@@ -326,6 +348,7 @@ impl Stream for PgScanStream {
326348
None => append_null(&mut builders[col_idx]),
327349
}
328350
}
351+
decoded_rows += 1;
329352
}
330353

331354
// Build Arrow arrays and RecordBatch
@@ -335,6 +358,12 @@ impl Stream for PgScanStream {
335358
}
336359
let rb = RecordBatch::try_new(Arc::clone(&this.schema), arrs)
337360
.map_err(|e| datafusion::error::DataFusionError::Execution(format!("{e}")))?;
361+
tracing::trace!(
362+
target = "executor::server",
363+
rows = decoded_rows,
364+
blkno = block.blkno,
365+
"pgscan: decoded rows"
366+
);
338367
Poll::Ready(Some(Ok(rb)))
339368
}
340369
Poll::Ready(None) => Poll::Ready(None),

executor/src/server.rs

Lines changed: 70 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,10 @@ impl Storage {
5757
*self = Self::default();
5858
self.registry = registry;
5959
}
60+
61+
pub fn set_registry_conn(&mut self, conn_id: usize) {
62+
self.registry = Arc::new(crate::pgscan::ScanRegistry::with_conn(conn_id));
63+
}
6064
}
6165

6266
pub struct Connection<'bytes> {
@@ -87,6 +91,11 @@ impl<'bytes> Connection<'bytes> {
8791
}
8892

8993
pub async fn poll(&mut self) -> Result<()> {
94+
let pending = self.recv_socket.buffer.len();
95+
if pending > 0 {
96+
trace!("poll: data already available: {} bytes", pending);
97+
return Ok(());
98+
}
9099
trace!("poll: waiting for socket signal");
91100
(&mut self.recv_socket).await?;
92101
trace!(
@@ -135,6 +144,13 @@ impl<'bytes> Connection<'bytes> {
135144
}
136145

137146
pub async fn process_message(&mut self, storage: &mut Storage) -> Result<()> {
147+
// Guard against spurious wakeups where no bytes are available yet.
148+
let pending = self.recv_socket.buffer.len();
149+
if pending < 6 {
150+
// Header is at least 6 bytes (3 fixints + u16)
151+
trace!("process_message: woke but no header bytes available",);
152+
return Ok(());
153+
}
138154
let header = consume_header(&mut self.recv_socket.buffer)?;
139155
debug!(
140156
direction = ?header.direction,
@@ -152,6 +168,8 @@ impl<'bytes> Connection<'bytes> {
152168
// Column layout arrives during planning/prepare; store it for later encoding use.
153169
if header.tag == ControlPacket::ColumnLayout as u8 {
154170
let attrs = consume_column_layout(&mut self.recv_socket.buffer)?;
171+
let len = attrs.len();
172+
tracing::trace!(target = "executor::server", pg_attrs = len, "column layout received");
155173
storage.pg_attrs = Some(attrs);
156174
return Ok(());
157175
}
@@ -162,6 +180,16 @@ impl<'bytes> Connection<'bytes> {
162180
// Read metadata; we expect the visibility bitmap to be stored in shared memory.
163181
let meta =
164182
read_heap_block_bitmap_meta(&mut self.recv_socket.buffer, header.length)?;
183+
trace!(
184+
target = "executor::server",
185+
scan_id = meta.scan_id,
186+
slot_id = meta.slot_id,
187+
table_oid = meta.table_oid,
188+
blkno = meta.blkno,
189+
num_offsets = meta.num_offsets,
190+
bitmap_inline_len = meta.bitmap_len,
191+
"heap bitmap meta received"
192+
);
165193
// If payload included inline bitmap bytes (legacy), drain them without allocation.
166194
let mut remaining = meta.bitmap_len as usize;
167195
while remaining > 0 {
@@ -172,6 +200,13 @@ impl<'bytes> Connection<'bytes> {
172200
}
173201
// Trailing u16 carries the visibility bitmap length in shared memory
174202
let vis_len = read_u16_msgpack(&mut self.recv_socket.buffer)?;
203+
trace!(
204+
target = "executor::server",
205+
scan_id = meta.scan_id,
206+
slot_id = meta.slot_id,
207+
vis_len,
208+
"heap bitmap vis_len (shm) received"
209+
);
175210
if let Some(tx) = storage.registry.sender(meta.scan_id as u64) {
176211
let block = crate::pgscan::HeapBlock {
177212
scan_id: meta.scan_id as u64,
@@ -190,6 +225,13 @@ impl<'bytes> Connection<'bytes> {
190225
);
191226
}
192227
// Request the next heap block for this scan using the same slot
228+
trace!(
229+
target = "executor::server",
230+
scan_id = meta.scan_id,
231+
table_oid = meta.table_oid,
232+
slot_id = meta.slot_id,
233+
"requesting next heap block"
234+
);
193235
if let Err(e) = request_heap_block(
194236
&mut self.send_buffer,
195237
meta.scan_id,
@@ -411,6 +453,13 @@ fn encode_and_write_rows(
411453
let rows = batch.num_rows();
412454
let cols = std::cmp::min(batch.num_columns(), attrs.len());
413455
if cols == 0 || rows == 0 {
456+
tracing::trace!(
457+
target = "executor::server",
458+
rows,
459+
cols,
460+
attrs = attrs.len(),
461+
"execution: empty batch or no attrs; skipping write"
462+
);
414463
return 0;
415464
}
416465
// Working buffers reused across rows to minimize allocations
@@ -554,9 +603,11 @@ fn open_data_flow(_conn: &mut Connection, storage: &mut Storage) -> Result<TaskR
554603
let phys = storage.physical_plan.as_ref().expect("checked above");
555604
// Reserve capacity to avoid HashMap reallocation while registering
556605
let scan_count = count_scans(phys) as usize;
606+
trace!(scan_count, "open_data_flow: reserving scan registry capacity");
557607
storage.registry.reserve(scan_count);
558608
// Register channels per scan in the connection-local registry; no response payload
559-
let _ = for_each_scan::<_, Error>(phys, |id, _table_oid| {
609+
let _ = for_each_scan::<_, Error>(phys, |id, table_oid| {
610+
trace!(scan_id = id, table_oid, "open_data_flow: registering scan channel");
560611
let _ = storage.registry.register(id, 16);
561612
Ok(())
562613
});
@@ -578,6 +629,7 @@ fn start_data_flow(conn: &mut Connection, storage: &mut Storage) -> Result<TaskR
578629

579630
let plan = Arc::clone(storage.physical_plan.as_ref().expect("checked above"));
580631
let pg_attrs = storage.pg_attrs.clone().unwrap_or_default();
632+
tracing::trace!(target = "executor::server", pg_attrs = pg_attrs.len(), "start_data_flow: attrs snapshot");
581633
// Build a fresh TaskContext for execution
582634
let state = SessionStateBuilder::new().build();
583635
let ctx = state.task_ctx();
@@ -586,6 +638,7 @@ fn start_data_flow(conn: &mut Connection, storage: &mut Storage) -> Result<TaskR
586638
let client_pid_val = conn
587639
.client_pid
588640
.load(std::sync::atomic::Ordering::Relaxed); // snapshot pid
641+
trace!(conn_id, "start_data_flow: spawning execution task");
589642
let handle = tokio::spawn(async move {
590643
// Execute a single partition; PgScanExec uses a single stream
591644
match plan.execute(0, ctx) {
@@ -606,6 +659,12 @@ fn start_data_flow(conn: &mut Connection, storage: &mut Storage) -> Result<TaskR
606659
continue;
607660
}
608661
let wrote = encode_and_write_rows(&batch, &pg_attrs, &mut ring);
662+
tracing::trace!(
663+
target = "executor::server",
664+
rows = wrote,
665+
cols = batch.num_columns(),
666+
"execution: batch encoded and written"
667+
);
609668
if wrote > 0 {
610669
let pid = client_pid_val;
611670
if pid > 0 && pid != i32::MAX {
@@ -618,6 +677,11 @@ fn start_data_flow(conn: &mut Connection, storage: &mut Storage) -> Result<TaskR
618677
tracing::trace!(target = "pg_fusion::server", "execution stream completed");
619678
// Write EOF sentinel row to result ring to unblock backend
620679
let _ = protocol::result::write_eof(&mut ring);
680+
// Nudge the backend in case it is waiting on latch
681+
let pid = client_pid_val;
682+
if pid > 0 && pid != i32::MAX {
683+
unsafe { libc::kill(pid as libc::pid_t, libc::SIGUSR1) };
684+
}
621685
}
622686
Err(e) => {
623687
tracing::error!(
@@ -629,11 +693,13 @@ fn start_data_flow(conn: &mut Connection, storage: &mut Storage) -> Result<TaskR
629693
});
630694
storage.exec_task = Some(handle);
631695
// Send a tiny ack to the backend indicating execution is ready
696+
trace!("start_data_flow: sending ExecReady to backend");
632697
protocol::exec::prepare_exec_ready(&mut conn.send_buffer)?;
633698
// No demo row: result rows will be written by the execution task
634699
// Kick off initial heap block requests for each scan using slot 0
635700
if let Some(phys) = storage.physical_plan.as_ref() {
636701
let _ = for_each_scan::<_, Error>(phys, |id, table_oid| {
702+
trace!(scan_id = id, table_oid, slot_id = 0, "start_data_flow: initial heap request");
637703
request_heap_block(&mut conn.send_buffer, id, table_oid, 0)?;
638704
Ok(())
639705
});
@@ -642,12 +708,9 @@ fn start_data_flow(conn: &mut Connection, storage: &mut Storage) -> Result<TaskR
642708
}
643709

644710
fn end_data_flow(storage: &mut Storage) -> Result<TaskResult> {
645-
// Cancel running execution task if present
646-
if let Some(handle) = storage.exec_task.take() {
647-
handle.abort();
648-
}
649-
// Close and clear all registered scan channels
650-
storage.registry.close_and_clear();
711+
// Use unified reset to ensure state machine returns to Initialized and
712+
// any in-flight execution task is aborted, while preserving registry object.
713+
storage.flush();
651714
Ok(TaskResult::Noop)
652715
}
653716

executor/src/shm.rs

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -41,20 +41,24 @@ pub fn copy_vis(slot: usize, vis_len: usize) -> Vec<u8> {
4141
unsafe { std::slice::from_raw_parts(ptr, len) }.to_vec()
4242
}
4343

44-
/// Borrow the heap page bytes from shared memory for the given `slot`.
44+
/// Borrow the heap page bytes from shared memory for the given connection and `slot`.
4545
/// Lifetime is 'static since the region lives for the process lifetime; caller
4646
/// must ensure the producer won't mutate the slot while reading.
47-
pub unsafe fn block_slice(slot: usize) -> &'static [u8] {
47+
pub unsafe fn block_slice(conn_offset: usize, slot: usize) -> &'static [u8] {
4848
let (base, layout) = get();
49-
let ptr = slot_ptr_calc(base, layout, slot, 0);
49+
let per = layout.layout.size();
50+
let conn_base = base.add(conn_offset * per);
51+
let ptr = slot_ptr_calc(conn_base, layout, slot, 0);
5052
std::slice::from_raw_parts(ptr, layout.block_len)
5153
}
5254

5355
/// Borrow the visibility bitmap bytes from shared memory for the given `slot`.
5456
/// Lifetime is 'static; see notes in `block_slice`.
55-
pub unsafe fn vis_slice(slot: usize, vis_len: usize) -> &'static [u8] {
57+
pub unsafe fn vis_slice(conn_offset: usize, slot: usize, vis_len: usize) -> &'static [u8] {
5658
let (base, layout) = get();
57-
let ptr = vis_ptr_calc(base, layout, slot, 0);
59+
let per = layout.layout.size();
60+
let conn_base = base.add(conn_offset * per);
61+
let ptr = vis_ptr_calc(conn_base, layout, slot, 0);
5862
let len = vis_len.min(layout.vis_bytes_per_block);
5963
std::slice::from_raw_parts(ptr, len)
6064
}

0 commit comments

Comments
 (0)