Conversation
- Introduced new data models for grid session snapshots and per-row validation results in `grid_query.py`.
- Updated the asynchronous job mixin to include new endpoints for grid CSV import, download, and synchronization.
- Refactored import paths in integration and unit tests to align with the new grid models.
grid = grid.import_csv(path="path/to/metadata.csv", schema=schema)

# Download grid data as a local CSV file
file_path = grid.download_csv(download_location="/tmp")
This needs to change; it was flagged by the security scan:
Test results:
Issue: [B108:hardcoded_tmp_directory] Probable insecure usage of temp file/directory.
Severity: Medium Confidence: Medium
CWE: CWE-377 (https://cwe.mitre.org/data/definitions/377.html)
More Info: https://bandit.readthedocs.io/en/0.0.0/plugins/b108_hardcoded_tmp_directory.html
Location: ./docs/guides/extensions/curator/scripts/grid_session_operations.py:58:48
57 # Download grid data as a local CSV file
58 file_path = grid.download_csv(download_location="/tmp")
59 print(f"Downloaded grid data to: {file_path}")
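A hedged fix for the B108 finding: instead of the hardcoded, world-writable `/tmp`, create a unique directory with the standard library's `tempfile.mkdtemp()` (or accept the destination as a parameter). Only the standard-library part is runnable here; the `grid.download_csv` call from the docs script is shown in a comment.

```python
import os
import tempfile

# mkdtemp() creates a unique directory with owner-only permissions,
# avoiding the shared hardcoded /tmp path flagged as CWE-377.
download_dir = tempfile.mkdtemp(prefix="grid_download_")

# In the docs script, this directory then replaces the literal "/tmp":
#   file_path = grid.download_csv(download_location=download_dir)

print(os.path.isdir(download_dir))  # True
```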
async with websockets.connect(
    presigned_url,
    close_timeout=10,
    open_timeout=self.connect_timeout,
) as ws:
    try:
        async with asyncio.timeout(self.connect_timeout):
            # Wait for initial messages until sync complete
            async for raw_message in ws:
                message = self._parse_message(raw_message)
                if message is None:
                    continue

                msg_type = message[0]

                if msg_type == JSONRX_NOTIFICATION:
                    method = message[1] if len(message) > 1 else None
                    if method == "connected":
                        logger.debug("Grid WebSocket connected")
                        # Send clock sync with empty clock
                        sync_msg = json.dumps(
                            [
                                JSONRX_REQUEST_COMPLETE,
                                1,
                                "synchronize-clock",
                                [],
                            ]
                        )
                        await ws.send(sync_msg)
                    elif method == "ping":
                        pass  # Ignore keep-alive pings

                elif msg_type == JSONRX_RESPONSE_DATA:
                    payload = message[2] if len(message) > 2 else None
                    if isinstance(payload, dict):
                        payload_type = payload.get("type")
                        if payload_type == "snapshot":
                            snapshot_url = payload.get("body")
                            logger.debug("Received snapshot URL")
                        elif payload_type == "patch":
                            patches.append(payload.get("body"))
                    elif payload is not None:
                        # Raw patch data
                        patches.append(payload)

                elif msg_type == JSONRX_RESPONSE_COMPLETE:
                    logger.debug("Grid sync complete")
                    break

                # Safety: don't loop forever
                if len(patches) > 10000:
                    logger.warning(
                        "Received >10000 patches without sync "
                        "complete, stopping"
                    )
                    break

    except TimeoutError:
        logger.warning(
            "Grid WebSocket timed out after %.1fs waiting for "
            "sync complete signal",
            self.connect_timeout,
        )
    except websockets.exceptions.ConnectionClosed:
        logger.debug("WebSocket connection closed during sync")
Note: there is an issue in this logic that is NOT resolved. As a result, fetching the snapshot of data from the Grid session over WebSocket is not working: it appears to hang forever and only times out.
To recreate this, I set a project_id and folder_id and used a hard-coded path to the metadata.csv:
"""
Script: Complete programmatic CSV upload and validation workflow.
Demonstrates the full end-to-end flow for power users who work
entirely through the Python client without the grid UI.
"""
from synapseclient import Synapse
from synapseclient.extensions.curator import (
create_record_based_metadata_task,
query_schema_registry,
)
from synapseclient.models import Grid
syn = Synapse()
syn.login()
# 1. Find schema and create curation task
schema_uri = query_schema_registry(
synapse_client=syn, dcc="ad", datatype="IndividualAnimalMetadataTemplate"
)
record_set, curation_task, _ = create_record_based_metadata_task(
synapse_client=syn,
project_id="syn123456789",
folder_id="syn987654321",
record_set_name="StudyMetadata",
record_set_description="Animal study metadata",
curation_task_name="StudyMetadata_Curation",
upsert_keys=["individualID"],
instructions="Complete all required fields.",
schema_uri=schema_uri,
bind_schema_to_record_set=True,
)
# 2. Import CSV data into a grid session
# Column schema is auto-derived from the CSV header and the
# JSON schema bound to the grid.
grid = Grid(record_set_id=record_set.id).create()
grid = grid.import_csv(path="metadata.csv")
print(f"Imported {grid.csv_import_total_count} rows")
# 3. Check validation before committing
snapshot = grid.get_snapshot()
summary = snapshot.validation_summary
print(f"Validation: {summary['valid']}/{summary['total']} valid")
if summary["invalid"] > 0:
print("Validation errors found:")
for row in snapshot.rows:
if row.validation and not row.validation.is_valid:
print(f" Row {row.row_id}: " f"{row.validation.validation_error_message}")
# Fix errors and re-import if needed...
# 4. Commit when ready
grid = grid.export_to_record_set()
print(f"Exported to RecordSet version {grid.record_set_version_number}")
# 5. Clean up
grid.delete()
metadata.csv:
individualID,species,sex,genotype,modelSystemName,ageDeath,ageDeathUnit,brainWeight,tissueWeight,bedding,waterpH,lightCycle,roomTemperature,roomHumidity
IND-001,Mus musculus,male,5XFAD/WT,5XFAD,6,months,0.42,0.38,corn cob,7.0,12/12,22,50
IND-002,Mus musculus,female,5XFAD/WT,5XFAD,6,months,0.41,0.37,corn cob,7.0,12/12,22,50
IND-003,Mus musculus,male,WT/WT,wildtype,6,months,0.44,0.40,corn cob,7.0,12/12,22,50
IND-004,Mus musculus,female,WT/WT,wildtype,6,months,0.43,0.39,corn cob,7.0,12/12,22,50
IND-005,Mus musculus,male,5XFAD/WT,5XFAD,12,months,0.40,0.36,corn cob,7.0,12/12,22,52
IND-006,Mus musculus,female,5XFAD/WT,5XFAD,12,months,0.39,0.35,corn cob,7.0,12/12,22,52
IND-007,Mus musculus,male,WT/WT,wildtype,12,months,0.45,0.41,corn cob,7.0,12/12,22,52
IND-008,Mus musculus,female,WT/WT,wildtype,12,months,0.44,0.40,corn cob,7.0,12/12,22,52
WebSocket Timeout / Hang Issues — Areas for Review
1. grid_websocket.py:110-112 — Loop doesn't break after receiving snapshot URL
Once the snapshot URL arrives in a type-4 message, the loop keeps running until
type-5 (JSONRX_RESPONSE_COMPLETE) arrives — or the timeout fires. If the server
never sends type-5, the full connect_timeout budget is wasted. Adding a break
after snapshot_url = payload.get("body") would short-circuit immediately.
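The suggested short-circuit can be sketched against a simulated message stream. The type values 4 and 5 follow the review notes above; the function is an illustration of the control-flow change, not the client's actual code:

```python
JSONRX_RESPONSE_DATA = 4      # type-4: response data (per the notes above)
JSONRX_RESPONSE_COMPLETE = 5  # type-5: response complete

def collect_until_snapshot(messages):
    """Collect patches, but stop as soon as the snapshot URL arrives
    instead of waiting for a type-5 that may never be sent."""
    snapshot_url, patches = None, []
    for message in messages:
        if message[0] == JSONRX_RESPONSE_DATA:
            payload = message[2] if len(message) > 2 else None
            if isinstance(payload, dict):
                if payload.get("type") == "snapshot":
                    snapshot_url = payload.get("body")
                    break  # short-circuit immediately
                if payload.get("type") == "patch":
                    patches.append(payload.get("body"))
        elif message[0] == JSONRX_RESPONSE_COMPLETE:
            break
    return snapshot_url, patches

stream = [
    [4, 1, {"type": "patch", "body": "p1"}],
    [4, 1, {"type": "snapshot", "body": "https://example/snapshot"}],
    # no [5, 1] follows; without the break, the loop would wait
    # on the websocket until the timeout fired
]
url, patches = collect_until_snapshot(stream)
print(url)  # https://example/snapshot
```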
2. grid_websocket.py:77 vs 80 — connect_timeout is double-counted
open_timeout=self.connect_timeout (line 77) and
asyncio.timeout(self.connect_timeout) (line 80) run sequentially from the same
value. If the handshake consumes 25s of a 30s budget, the receive loop only has 5s.
These should be separate parameters (e.g., open_timeout + read_timeout).
3. grid_websocket.py:74-78 — WebSocket close adds up to 10s after timeout fires
close_timeout=10 means after asyncio.timeout fires, the context manager attempts
a graceful close handshake for up to another 10s. Worst-case wall time is
connect_timeout + close_timeout (40s at defaults), not just connect_timeout.
4. grid_websocket.py:94-99 — synchronize-clock [] payload is unverified protocol
Sending an empty clock tells the server "send me everything from the start." If the
server responds with patches instead of a snapshot URL (possible for a newly-created
session), the payload_type == "snapshot" branch is never hit and type-5 may never
arrive. The fallback silently returns an empty GridSnapshot(). Needs to be verified
against the Synapse Grid WebSocket API docs.
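Pending that verification, one hedged mitigation is to treat a patches-only response as data rather than silently returning an empty snapshot. A pure-Python sketch of the fallback decision (function name and return shape are illustrative only):

```python
def resolve_sync_result(snapshot_url, patches):
    # Prefer the snapshot URL; fall back to accumulated patches;
    # only report "empty" when truly nothing arrived.
    if snapshot_url is not None:
        return ("snapshot", snapshot_url)
    if patches:
        return ("patches", patches)
    return ("empty", None)

# A newly-created session might send patches but never a snapshot URL:
print(resolve_sync_result(None, [{"op": "add"}]))
```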
5. grid_websocket.py:163-164 — S3 fetch timeout is hardcoded and outside caller control
httpx.AsyncClient().get(snapshot_url, timeout=30.0) uses a fixed 30s independent
of connect_timeout. The caller's timeout parameter has no effect on this step.
6. grid.py:1364-1381 — REST calls before WebSocket have no timeout
create_grid_replica(...) and get_grid_presigned_url(...) are called with no
explicit timeout. If either hangs, connect_timeout never fires — control never
reaches the WebSocket code.
# Column schema is auto-derived from the CSV header and the
# JSON schema bound to the grid.
grid = Grid(record_set_id=record_set.id).create()
grid = grid.import_csv(path="metadata.csv")
The way these files are passed into this script should be updated: define them at the top of the file as script constants so everything is set up in a single spot.
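A sketch of that restructuring, with the IDs and path from the reproduction script above hoisted into constants (values copied from that script; `build_task_kwargs` is a hypothetical helper for illustration):

```python
# All script inputs configured in one spot at the top of the file.
PROJECT_ID = "syn123456789"
FOLDER_ID = "syn987654321"
METADATA_CSV_PATH = "metadata.csv"

def build_task_kwargs():
    # The workflow calls below then reference the constants instead
    # of scattering literals through the script.
    return {"project_id": PROJECT_ID, "folder_id": FOLDER_ID}

print(build_task_kwargs()["project_id"])  # syn123456789
```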
Problem
The Synapse Python Client's Curator functionality lacks several capabilities that teams need for programmatic metadata curation workflows:
No CSV import/export for Grid sessions. Users cannot programmatically push CSV data into an active grid session or download grid data as CSV. The only way to populate a grid is through the web UI.
No grid synchronization. There is no way to programmatically apply grid session changes back to the data source.
No pre-commit validation. Validation statistics are only available after exporting a grid to a RecordSet (which commits a new version). Users cannot check whether their data passes schema validation before committing. The web UI achieves this via a WebSocket + CRDT protocol that is not exposed through any REST endpoint.
No task cleanup on delete. Deleting a file-based curation task leaves the associated EntityView orphaned.
Grid code coupled to CurationTask. All Grid and CurationTask classes live in a single `curation.py` file, making the module large and hard to navigate.

These gaps were identified in a brainstorming session with HTAN, NF, and AD teams who need fully programmatic curation pipelines without the grid UI.
Solution
Architecture
File extraction: Grid classes extracted from `curation.py` into a new `grid.py` module. CurationTask stays in `curation.py`.

REST async jobs (Features 1–3): Three new `AsynchronousCommunicator` subclasses following the existing start/poll/get pattern:

- `GridCsvImportRequest` → `POST /grid/import/csv/async`
- `DownloadFromGridRequest` → `POST /grid/download/csv/async`
- `SynchronizeGridRequest` → `POST /grid/synchronize/async`

Each has a corresponding `Grid` method (`import_csv`, `download_csv`, `synchronize`).

`import_csv` auto-derives the column schema. The backend's CSV import endpoint requires a `List<ColumnModel>` with `name` and `columnType` for each column. Rather than forcing users to construct this manually, when `schema` is omitted column types are derived from the JSON schema bound to the grid (`grid_json_schema_id`) using the existing `_get_column_type_from_js_property()` mapper from `file_based_metadata_task.py`, with `STRING` as the fallback. Users can still pass an explicit `schema=List[Column]` to override.

Pre-commit validation (Feature 3): The web UI gets per-row validation through a WebSocket connection to the grid session's CRDT model. Validation results are embedded at `rows[n].metadata.rowValidation` in the CRDT snapshot. No REST endpoint exposes this data — the only REST path (`GridQueryRequestHandler`) is agent/Bedrock-only.

The implementation adds:
- `grid_services.py`: REST calls for `create_grid_replica` and `get_grid_presigned_url`.
- `grid_crdt_decoder.py`: Python port of json-joy's indexed binary CRDT snapshot decoder. Handles the clock table, all 7 CRDT node types (CON, VAL, OBJ, VEC, STR, BIN, ARR), and tombstone filtering in ARR nodes. Uses a custom `_cbor_item_byte_length()` function to track exact CBOR byte consumption when multiple CBOR values are packed sequentially in a node buffer.
- `grid_websocket.py`: Read-only WebSocket client implementing the JSON-Rx protocol (connect → receive `[8, 'connected']` → send `synchronize-clock` → receive snapshot/patches → `[5, subId]` complete). Fetches the CBOR snapshot from its S3 presigned URL via httpx.
- `grid_query.py`: `GridSnapshot`, `GridRow`, `GridRowValidation` dataclasses with computed properties (`total_rows`, `valid_rows`, `invalid_rows`, `pending_rows`, `validation_summary`).

`Grid.get_snapshot()` / `Grid.get_validation()` orchestrate the full flow: create replica → get presigned URL → connect WebSocket → decode snapshot → return `GridSnapshot`.

Task cleanup (Feature 4):
`CurationTask.delete(delete_file_view=True)` fetches the task to get the `FileBasedMetadataTaskProperties.file_view_id`, deletes the task, then deletes the EntityView.

Dependencies: `websockets>=12.0` and `cbor2>=5.0` added to `setup.cfg`.

Key design decisions

- Tombstoned ARR elements are filtered to match json-joy's `ArrNode.view()` behavior.
- `import_csv` accepts `path`, `dataframe`, or `file_handle_id` — the upload is handled internally via `multipart_upload_file_async` / `multipart_upload_dataframe_async`, the same internal functions used by table upsert.

Testing
- Unit tests cover the new public surface (`Grid`, `GridSnapshot`, `GridRow`, `GridRowValidation`, `create_grid_replica`, `get_grid_presigned_url`, `GridSnapshotDecoder`, `GridWebSocketClient`).
- Decoder tests confirm `_cbor_item_byte_length()` returns exact byte counts and that `_read_cbor_value()` advances the reader position correctly without consuming trailing data.
- A validation-summary test asserts `{'total': 3, 'valid': 1, 'invalid': 1, 'pending': 1}`.
- Mock patch targets updated from `synapseclient.models.curation.delete_grid_session` to `synapseclient.models.grid.delete_grid_session` (and `list_grid_sessions`) to match the new module structure.
- Code snippets in `metadata_curation.md` are programmatically validated against their source script files to confirm start/end lines are within bounds and point to the correct content.
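The asserted summary shape can be illustrated with a stripped-down stand-in for the `grid_query.py` dataclasses. Field names follow the PR description; the real classes carry more state, so this is a sketch, not the actual implementation:

```python
from dataclasses import dataclass, field
from typing import List, Optional

@dataclass
class GridRowValidation:
    is_valid: Optional[bool] = None  # None = validation still pending
    validation_error_message: Optional[str] = None

@dataclass
class GridRow:
    row_id: int
    validation: Optional[GridRowValidation] = None

@dataclass
class GridSnapshot:
    rows: List[GridRow] = field(default_factory=list)

    @property
    def validation_summary(self):
        # Rows with no validation result yet count as "pending".
        valid = sum(1 for r in self.rows
                    if r.validation and r.validation.is_valid is True)
        invalid = sum(1 for r in self.rows
                      if r.validation and r.validation.is_valid is False)
        total = len(self.rows)
        return {"total": total, "valid": valid, "invalid": invalid,
                "pending": total - valid - invalid}

snap = GridSnapshot(rows=[
    GridRow(1, GridRowValidation(is_valid=True)),
    GridRow(2, GridRowValidation(is_valid=False,
                                 validation_error_message="missing sex")),
    GridRow(3),  # no validation yet -> pending
])
print(snap.validation_summary)
# {'total': 3, 'valid': 1, 'invalid': 1, 'pending': 1}
```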