
Curator feature gaps#1332

Draft
BryanFauble wants to merge 2 commits into develop from curator-feature-gaps

Conversation

@BryanFauble
Member

Problem

The Synapse Python Client's Curator functionality lacks several capabilities that teams need for programmatic metadata curation workflows:

  1. No CSV import/export for Grid sessions. Users cannot programmatically push CSV data into an active grid session or download grid data as CSV. The only way to populate a grid is through the web UI.

  2. No grid synchronization. There is no way to programmatically apply grid session changes back to the data source.

  3. No pre-commit validation. Validation statistics are only available after exporting a grid to a RecordSet (which commits a new version). Users cannot check whether their data passes schema validation before committing. The web UI achieves this via a WebSocket + CRDT protocol that is not exposed through any REST endpoint.

  4. No task cleanup on delete. Deleting a file-based curation task leaves the associated EntityView orphaned.

  5. Grid code coupled to CurationTask. All Grid and CurationTask classes live in a single curation.py file, making the module large and hard to navigate.

These gaps were identified in a brainstorming session with HTAN, NF, and AD teams who need fully programmatic curation pipelines without the grid UI.

Solution

Architecture

File extraction: Grid classes extracted from curation.py into a new grid.py module. CurationTask stays in curation.py.

REST async jobs (Features 1–3): Three new AsynchronousCommunicator subclasses following the existing start/poll/get pattern:

  • GridCsvImportRequest — POST /grid/import/csv/async
  • DownloadFromGridRequest — POST /grid/download/csv/async
  • SynchronizeGridRequest — POST /grid/synchronize/async

Each has a corresponding Grid method (import_csv, download_csv, synchronize).

import_csv auto-derives the column schema. The backend's CSV import endpoint requires a List<ColumnModel> with name and columnType for each column. Rather than forcing users to construct this manually, when schema is omitted:

  1. Column names are read from the CSV header or DataFrame columns.
  2. Column types are resolved from the JSON schema bound to the grid session (grid_json_schema_id) using the existing _get_column_type_from_js_property() mapper from file_based_metadata_task.py.
  3. Columns not found in the schema default to STRING.

Users can still pass explicit schema=List[Column] to override.
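As a rough illustration of steps 1–3 (not the client's actual code — `derive_columns`, the type map, and the JSON-schema shape are assumptions; the real client resolves types via `_get_column_type_from_js_property()`), the derivation could look like:

```python
# Illustrative sketch only; names and the type map below are assumptions.
import csv
import io


def derive_columns(csv_text: str, js_properties: dict) -> list:
    """Build a List<ColumnModel>-like structure from a CSV header and the
    properties of the JSON schema bound to the grid session."""
    type_map = {
        "string": "STRING",
        "integer": "INTEGER",
        "number": "DOUBLE",
        "boolean": "BOOLEAN",
    }
    header = next(csv.reader(io.StringIO(csv_text)))  # step 1: column names
    columns = []
    for name in header:
        js_type = js_properties.get(name, {}).get("type")  # step 2: schema lookup
        # step 3: columns absent from the schema default to STRING
        columns.append({"name": name, "columnType": type_map.get(js_type, "STRING")})
    return columns
```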

Pre-commit validation (Feature 3): The web UI gets per-row validation through a WebSocket connection to the grid session's CRDT model. Validation results are embedded at rows[n].metadata.rowValidation in the CRDT snapshot. No REST endpoint exposes this data — the only REST path (GridQueryRequestHandler) is agent/Bedrock-only.

The implementation adds:

  • grid_services.py — REST calls for create_grid_replica and get_grid_presigned_url.
  • grid_crdt_decoder.py — Python port of json-joy's indexed binary CRDT snapshot decoder. Handles the clock table, all 7 CRDT node types (CON, VAL, OBJ, VEC, STR, BIN, ARR), and tombstone filtering in ARR nodes. Uses a custom _cbor_item_byte_length() function to track exact CBOR byte consumption when multiple CBOR values are packed sequentially in a node buffer.
  • grid_websocket.py — Read-only WebSocket client implementing the JSON-Rx protocol (connect → receive [8, 'connected'] → send synchronize-clock → receive snapshot/patches → [5, subId] complete). Fetches the CBOR snapshot from its S3 presigned URL via httpx.
  • grid_query.py — GridSnapshot, GridRow, GridRowValidation dataclasses with computed properties (total_rows, valid_rows, invalid_rows, pending_rows, validation_summary).
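To make the byte-tracking problem concrete, here is a stand-alone sketch covering a small subset of RFC 8949 — not the client's `_cbor_item_byte_length()` — that measures how many bytes one CBOR item occupies so the next packed item can be located without over-reading:

```python
# Sketch only: handles ints, strings, arrays, maps, and simple values.
# Tags and indefinite lengths are out of scope.
def cbor_item_len(buf: bytes, pos: int = 0) -> int:
    """Return the byte length of the CBOR item starting at pos."""
    initial = buf[pos]
    major, ai = initial >> 5, initial & 0x1F
    extra = {24: 1, 25: 2, 26: 4, 27: 8}.get(ai, 0)
    arg = ai if ai < 24 else int.from_bytes(buf[pos + 1 : pos + 1 + extra], "big")
    head = 1 + extra
    if major in (0, 1, 7):            # ints and simple values: header only
        return head
    if major in (2, 3):               # byte/text string: header + payload
        return head + arg
    if major in (4, 5):               # array / map: recurse over children
        count = arg * (2 if major == 5 else 1)
        offset = head
        for _ in range(count):
            offset += cbor_item_len(buf, pos + offset)
        return offset
    raise ValueError("tags/indefinite lengths are out of scope for this sketch")


# Three values packed back-to-back: "row" (4 bytes), 42 (2), {"ok": True} (5)
packed = b"\x63row" + b"\x18\x2a" + b"\xa1\x62ok\xf5"
print([cbor_item_len(packed, p) for p in (0, 4, 6)])  # [4, 2, 5]
```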

Grid.get_snapshot() / Grid.get_validation() orchestrate the full flow: create replica → get presigned URL → connect WebSocket → decode snapshot → return GridSnapshot.
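The four-step flow can be stubbed out as below; every callable here is an injected stand-in for illustration, not the real synapseclient internals.

```python
# Stub sketch of the get_snapshot() orchestration; service functions are
# passed in as a dict so the flow itself stays visible.
import asyncio


async def get_snapshot(session_id: str, services: dict) -> dict:
    replica = await services["create_grid_replica"](session_id)   # 1. replica
    url = await services["get_grid_presigned_url"](replica)       # 2. presigned URL
    snapshot_bytes = await services["websocket_fetch"](url)       # 3. WebSocket + S3
    return services["decode"](snapshot_bytes)                     # 4. CRDT decode
```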

Task cleanup (Feature 4): CurationTask.delete(delete_file_view=True) fetches the task to get the FileBasedMetadataTaskProperties.file_view_id, deletes the task, then deletes the EntityView.

Dependencies: websockets>=12.0 and cbor2>=5.0 added to setup.cfg.

Key design decisions

  • The CRDT decoder is snapshot-only (read-only). It does not implement patch application or conflict resolution. This keeps the implementation small while covering the validation use case.
  • Tombstones in ARR nodes are preserved during decoding but filtered during view generation, matching json-joy's ArrNode.view() behavior.
  • The WebSocket client handles both initialization paths (snapshot-first and patch-first) since the server does not guarantee which it sends.
  • import_csv accepts path, dataframe, or file_handle_id — the upload is handled internally via multipart_upload_file_async / multipart_upload_dataframe_async, the same internal functions used by table upsert.
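The tombstone decision can be sketched like this (data shapes are assumed; json-joy's actual node layout differs):

```python
# Assumed shapes: a decoded ARR keeps tombstoned slots, but the view drops
# them, mirroring the described ArrNode.view() behavior.
from dataclasses import dataclass
from typing import Any


@dataclass
class ArrSlot:
    value: Any
    deleted: bool = False  # tombstone marker, preserved during decoding


def arr_view(slots: list) -> list:
    """Generate the user-facing view: tombstones are filtered here,
    not during decoding."""
    return [slot.value for slot in slots if not slot.deleted]
```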

Testing

  • 134 existing unit tests pass (43 curation model tests + 91 curator extension tests), confirming the Grid extraction and CurationTask changes introduce no regressions.
  • Import verification: All new and modified public exports confirmed importable (Grid, GridSnapshot, GridRow, GridRowValidation, create_grid_replica, get_grid_presigned_url, GridSnapshotDecoder, GridWebSocketClient).
  • CRDT decoder CBOR byte-tracking: Tested sequential decoding of multiple CBOR values packed in a single buffer (string + integer + dict + null + boolean) to verify _cbor_item_byte_length() returns exact byte counts and _read_cbor_value() advances the reader position correctly without consuming trailing data.
  • GridSnapshot validation summary: Verified computed properties with a 3-row fixture (1 valid, 1 invalid, 1 no-validation) producing {'total': 3, 'valid': 1, 'invalid': 1, 'pending': 1}.
  • Mock patch targets updated: Unit test mocks updated from synapseclient.models.curation.delete_grid_session → synapseclient.models.grid.delete_grid_session (and likewise list_grid_sessions) to match the new module structure.
  • Documentation line references: All 17 snippet inclusions in metadata_curation.md programmatically validated against their source script files to confirm start/end lines are within bounds and point to the correct content.
  • Integration tests for CSV import, download, synchronization, WebSocket snapshot validation, and task deletion with cleanup require a live Synapse environment and are not included in unit test runs.
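The 3-row fixture behavior can be reproduced with a minimal sketch of the dataclasses; field names follow this PR's description, not necessarily the shipped grid_query.py.

```python
# Sketch of the summary properties exercised by the 3-row fixture above.
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class GridRowValidation:
    is_valid: bool


@dataclass
class GridRow:
    row_id: int
    validation: Optional[GridRowValidation] = None  # None = not yet validated


@dataclass
class GridSnapshot:
    rows: List[GridRow] = field(default_factory=list)

    @property
    def validation_summary(self) -> dict:
        valid = sum(1 for r in self.rows if r.validation and r.validation.is_valid)
        invalid = sum(1 for r in self.rows if r.validation and not r.validation.is_valid)
        pending = sum(1 for r in self.rows if r.validation is None)
        return {"total": len(self.rows), "valid": valid,
                "invalid": invalid, "pending": pending}
```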

- Introduced new data models for grid session snapshots and per-row validation results in `grid_query.py`.
- Updated asynchronous job mixin to include new endpoints for grid CSV import, download, and synchronization.
- Refactored import paths in integration and unit tests to align with the new grid models.
# Import CSV data with an explicit column schema
grid = grid.import_csv(path="path/to/metadata.csv", schema=schema)

# Download grid data as a local CSV file
file_path = grid.download_csv(download_location="/tmp")
Member Author


This needs to change, as it was flagged by Bandit:

Test results:

Issue: [B108:hardcoded_tmp_directory] Probable insecure usage of temp file/directory.
Severity: Medium Confidence: Medium
CWE: CWE-377 (https://cwe.mitre.org/data/definitions/377.html)
More Info: https://bandit.readthedocs.io/en/0.0.0/plugins/b108_hardcoded_tmp_directory.html
Location: ./docs/guides/extensions/curator/scripts/grid_session_operations.py:58:48
57 # Download grid data as a local CSV file
58 file_path = grid.download_csv(download_location="/tmp")
59 print(f"Downloaded grid data to: {file_path}")

Comment on lines +74 to +138
async with websockets.connect(
presigned_url,
close_timeout=10,
open_timeout=self.connect_timeout,
) as ws:
try:
async with asyncio.timeout(self.connect_timeout):
# Wait for initial messages until sync complete
async for raw_message in ws:
message = self._parse_message(raw_message)
if message is None:
continue

msg_type = message[0]

if msg_type == JSONRX_NOTIFICATION:
method = message[1] if len(message) > 1 else None
if method == "connected":
logger.debug("Grid WebSocket connected")
# Send clock sync with empty clock
sync_msg = json.dumps(
[
JSONRX_REQUEST_COMPLETE,
1,
"synchronize-clock",
[],
]
)
await ws.send(sync_msg)
elif method == "ping":
pass # Ignore keep-alive pings

elif msg_type == JSONRX_RESPONSE_DATA:
payload = message[2] if len(message) > 2 else None
if isinstance(payload, dict):
payload_type = payload.get("type")
if payload_type == "snapshot":
snapshot_url = payload.get("body")
logger.debug("Received snapshot URL")
elif payload_type == "patch":
patches.append(payload.get("body"))
elif payload is not None:
# Raw patch data
patches.append(payload)

elif msg_type == JSONRX_RESPONSE_COMPLETE:
logger.debug("Grid sync complete")
break

# Safety: don't loop forever
if len(patches) > 10000:
logger.warning(
"Received >10000 patches without sync "
"complete, stopping"
)
break

except TimeoutError:
logger.warning(
"Grid WebSocket timed out after %.1fs waiting for "
"sync complete signal",
self.connect_timeout,
)
except websockets.exceptions.ConnectionClosed:
logger.debug("WebSocket connection closed during sync")
Member Author


Note: There is an issue in this logic that is NOT resolved. As a result, getting the snapshot of data from the Grid session over WebSocket is not working: it appears to hang forever and only times out.

To recreate this I set a project_id and folder_id, and used a hard-coded path to the metadata.csv:

"""
Script: Complete programmatic CSV upload and validation workflow.
Demonstrates the full end-to-end flow for power users who work
entirely through the Python client without the grid UI.
"""

from synapseclient import Synapse
from synapseclient.extensions.curator import (
    create_record_based_metadata_task,
    query_schema_registry,
)
from synapseclient.models import Grid

syn = Synapse()
syn.login()

# 1. Find schema and create curation task
schema_uri = query_schema_registry(
    synapse_client=syn, dcc="ad", datatype="IndividualAnimalMetadataTemplate"
)

record_set, curation_task, _ = create_record_based_metadata_task(
    synapse_client=syn,
    project_id="syn123456789",
    folder_id="syn987654321",
    record_set_name="StudyMetadata",
    record_set_description="Animal study metadata",
    curation_task_name="StudyMetadata_Curation",
    upsert_keys=["individualID"],
    instructions="Complete all required fields.",
    schema_uri=schema_uri,
    bind_schema_to_record_set=True,
)

# 2. Import CSV data into a grid session
# Column schema is auto-derived from the CSV header and the
# JSON schema bound to the grid.
grid = Grid(record_set_id=record_set.id).create()
grid = grid.import_csv(path="metadata.csv")
print(f"Imported {grid.csv_import_total_count} rows")

# 3. Check validation before committing
snapshot = grid.get_snapshot()
summary = snapshot.validation_summary
print(f"Validation: {summary['valid']}/{summary['total']} valid")

if summary["invalid"] > 0:
    print("Validation errors found:")
    for row in snapshot.rows:
        if row.validation and not row.validation.is_valid:
            print(f"  Row {row.row_id}: {row.validation.validation_error_message}")
    # Fix errors and re-import if needed...

# 4. Commit when ready
grid = grid.export_to_record_set()
print(f"Exported to RecordSet version {grid.record_set_version_number}")

# 5. Clean up
grid.delete()

metadata.csv:

individualID,species,sex,genotype,modelSystemName,ageDeath,ageDeathUnit,brainWeight,tissueWeight,bedding,waterpH,lightCycle,roomTemperature,roomHumidity
IND-001,Mus musculus,male,5XFAD/WT,5XFAD,6,months,0.42,0.38,corn cob,7.0,12/12,22,50
IND-002,Mus musculus,female,5XFAD/WT,5XFAD,6,months,0.41,0.37,corn cob,7.0,12/12,22,50
IND-003,Mus musculus,male,WT/WT,wildtype,6,months,0.44,0.40,corn cob,7.0,12/12,22,50
IND-004,Mus musculus,female,WT/WT,wildtype,6,months,0.43,0.39,corn cob,7.0,12/12,22,50
IND-005,Mus musculus,male,5XFAD/WT,5XFAD,12,months,0.40,0.36,corn cob,7.0,12/12,22,52
IND-006,Mus musculus,female,5XFAD/WT,5XFAD,12,months,0.39,0.35,corn cob,7.0,12/12,22,52
IND-007,Mus musculus,male,WT/WT,wildtype,12,months,0.45,0.41,corn cob,7.0,12/12,22,52
IND-008,Mus musculus,female,WT/WT,wildtype,12,months,0.44,0.40,corn cob,7.0,12/12,22,52

WebSocket Timeout / Hang Issues — Areas for Review

1. grid_websocket.py:110-112 — Loop doesn't break after receiving snapshot URL

Once the snapshot URL arrives in a type-4 message, the loop keeps running until
type-5 (JSONRX_RESPONSE_COMPLETE) arrives — or the timeout fires. If the server
never sends type-5, the full connect_timeout budget is wasted. Adding a break
after snapshot_url = payload.get("body") would short-circuit immediately.
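A hypothetical sketch of that short-circuit, reduced to pure message handling (the constants follow the JSON-Rx framing described in this PR):

```python
# Sketch only: stop scanning the message stream as soon as a snapshot
# payload arrives, instead of waiting for the type-5 complete frame.
JSONRX_RESPONSE_DATA = 4
JSONRX_RESPONSE_COMPLETE = 5


def find_snapshot_url(messages: list):
    for message in messages:
        if message[0] == JSONRX_RESPONSE_DATA:
            payload = message[2] if len(message) > 2 else None
            if isinstance(payload, dict) and payload.get("type") == "snapshot":
                return payload.get("body")  # break immediately on snapshot
        elif message[0] == JSONRX_RESPONSE_COMPLETE:
            return None  # stream completed without a snapshot
    return None
```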

2. grid_websocket.py:77 vs 80 — connect_timeout is double-counted

open_timeout=self.connect_timeout (line 77) and
asyncio.timeout(self.connect_timeout) (line 80) run sequentially from the same
value. If the handshake consumes 25s of a 30s budget, the receive loop only has 5s.
These should be separate parameters (e.g., open_timeout + read_timeout).

3. grid_websocket.py:74-78 — WebSocket close adds up to 10s after timeout fires

close_timeout=10 means after asyncio.timeout fires, the context manager attempts
a graceful close handshake for up to another 10s. Worst-case wall time is
connect_timeout + close_timeout (40s at defaults), not just connect_timeout.

4. grid_websocket.py:94-99 — synchronize-clock [] payload is unverified protocol

Sending an empty clock tells the server "send me everything from the start." If the
server responds with patches instead of a snapshot URL (possible for a newly-created
session), the payload_type == "snapshot" branch is never hit and type-5 may never
arrive. The fallback silently returns an empty GridSnapshot(). Needs to be verified
against the Synapse Grid WebSocket API docs.

5. grid_websocket.py:163-164 — S3 fetch timeout is hardcoded and outside caller control

httpx.AsyncClient().get(snapshot_url, timeout=30.0) uses a fixed 30s independent
of connect_timeout. The caller's timeout parameter has no effect on this step.

6. grid.py:1364-1381 — REST calls before WebSocket have no timeout

create_grid_replica(...) and get_grid_presigned_url(...) are called with no
explicit timeout. If either hangs, connect_timeout never fires — control never
reaches the WebSocket code.

# Column schema is auto-derived from the CSV header and the
# JSON schema bound to the grid.
grid = Grid(record_set_id=record_set.id).create()
grid = grid.import_csv(path="metadata.csv")
Member Author


How these files are passed into these scripts should be updated: define them at the top of the file as script constants so everything is set up in a single spot.
