Skip to content

Commit e5c0cd7

Browse files
committed
fix: extend shared memory
1 parent 08b5d13 commit e5c0cd7

File tree

3 files changed

+31
-5
lines changed

3 files changed

+31
-5
lines changed

executor/src/server.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -583,6 +583,9 @@ fn start_data_flow(conn: &mut Connection, storage: &mut Storage) -> Result<TaskR
583583
let ctx = state.task_ctx();
584584

585585
let conn_id = conn.id;
586+
let client_pid_val = conn
587+
.client_pid
588+
.load(std::sync::atomic::Ordering::Relaxed); // snapshot pid
586589
let handle = tokio::spawn(async move {
587590
// Execute a single partition; PgScanExec uses a single stream
588591
match plan.execute(0, ctx) {
@@ -602,7 +605,13 @@ fn start_data_flow(conn: &mut Connection, storage: &mut Storage) -> Result<TaskR
602605
if pg_attrs.is_empty() {
603606
continue;
604607
}
605-
let _ = encode_and_write_rows(&batch, &pg_attrs, &mut ring);
608+
let wrote = encode_and_write_rows(&batch, &pg_attrs, &mut ring);
609+
if wrote > 0 {
610+
let pid = client_pid_val;
611+
if pid > 0 && pid != i32::MAX {
612+
unsafe { libc::kill(pid as libc::pid_t, libc::SIGUSR1) };
613+
}
614+
}
606615
}
607616
}
608617
}

postgres/src/lib.rs

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,10 +89,27 @@ unsafe extern "C-unwind" fn pg_fusion_shmem_request_hook() {
8989
.layout
9090
.size();
9191

92+
// Slot blocks (per-connection heap page buffers)
93+
let blksz = pgrx::pg_sys::BLCKSZ as usize;
94+
let slot_blocks_layout = executor::layout::slot_blocks_layout(
95+
worker::SLOTS_PER_CONN,
96+
blksz,
97+
worker::BLOCKS_PER_SLOT,
98+
)
99+
.expect("slot_blocks_layout");
100+
let slot_blocks_sz = slot_blocks_layout.layout.size() * num;
101+
102+
// Per-connection result ring buffers
103+
let result_ring_layout =
104+
executor::layout::result_ring_layout(worker::RESULT_RING_CAP).expect("result_ring_layout");
105+
let result_ring_sz = result_ring_layout.layout.size() * num;
106+
92107
let total = flags_sz
93108
.saturating_add(conns_sz)
94109
.saturating_add(stack_sz)
95-
.saturating_add(pid_sz);
110+
.saturating_add(pid_sz)
111+
.saturating_add(slot_blocks_sz)
112+
.saturating_add(result_ring_sz);
96113

97114
pgrx::pg_sys::RequestAddinShmemSpace(total);
98115
}

postgres/src/worker.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,9 @@ const TOKIO_THREAD_NUMBER: usize = 1;
1818
pub(crate) const RECV_CAP: usize = 8192;
1919
pub(crate) const SEND_CAP: usize = 8192;
2020
// Heap block buffer configuration per connection
21-
const SLOTS_PER_CONN: usize = 2; // two logical slots per connection
22-
const BLOCKS_PER_SLOT: usize = 2; // double buffering inside each slot
23-
const RESULT_RING_CAP: usize = 64 * 1024; // bytes per-connection for result rows
21+
pub(crate) const SLOTS_PER_CONN: usize = 2; // two logical slots per connection
22+
pub(crate) const BLOCKS_PER_SLOT: usize = 2; // double buffering inside each slot
23+
pub(crate) const RESULT_RING_CAP: usize = 64 * 1024; // bytes per-connection for result rows
2424

2525
#[pg_guard]
2626
pub(crate) unsafe extern "C-unwind" fn init_datafusion_worker() {

0 commit comments

Comments
 (0)