5454 "null" : None ,
5555}
5656
57- SCATTER_JOB_PATTERN = re .compile (r"^(.+)_\d+$" )
57+ SCATTER_JOB_PATTERN = re .compile (r"^(.+)_( \d+) $" )
5858
5959CWLPROV_NONE = "https://w3id.org/cwl/prov#None"
6060
@@ -196,6 +196,7 @@ def __init__(self, root, workflow_name=None, license=None, readme=None,
196196 # map source files to destination files
197197 self .file_map = {}
198198 self .remap_names = remap_names
199+ self .data_root = "data"
199200
200201 @staticmethod
201202 def _get_step_maps (cwl_defs ):
@@ -213,11 +214,13 @@ def _get_step_maps(cwl_defs):
213214 def _resolve_plan (self , activity ):
214215 job_qname = activity .plan ()
215216 plan = activity .provenance .entity (job_qname )
217+ scatter_id = None
216218 if not plan :
217219 m = SCATTER_JOB_PATTERN .match (str (job_qname ))
218220 if m :
219221 plan = activity .provenance .entity (m .groups ()[0 ])
220- return plan
222+ scatter_id = m .groups ()[1 ]
223+ return plan , scatter_id
221224
222225 def _get_hash (self , prov_param ):
223226 k = prov_param .id .localpart
@@ -436,9 +439,11 @@ def add_action(self, crate, activity, parent_instrument=None):
436439 "@type" : "CreateAction" ,
437440 "name" : activity .label ,
438441 }))
439- plan = self ._resolve_plan (activity )
442+ plan , scatter_id = self ._resolve_plan (activity )
440443 plan_tag = plan .id .localpart
444+ dest_base = Path (self .data_root )
441445 if plan_tag == "main" :
446+ dest_base = dest_base / "main"
442447 assert str (activity .type ) == "wfprov:WorkflowRun"
443448 instrument = workflow
444449 self .roc_engine_run ["result" ] = action
@@ -453,6 +458,7 @@ def to_wf_p(k):
453458 if parts [0 ] == "main" :
454459 parts [0 ] = parent_instrument_fragment
455460 plan_tag = "/" .join (parts )
461+ dest_base = dest_base / (f"{ plan_tag } _{ scatter_id } " if scatter_id else f"{ plan_tag } " )
456462 tool_name = self .step_maps [parent_instrument_fragment ][plan_tag ]["tool" ]
457463 instrument = crate .dereference (f"{ workflow .id } #{ tool_name } " )
458464 control_action = self .control_actions .get (plan_tag )
@@ -476,12 +482,14 @@ def to_wf_p(k):
476482 action ["instrument" ] = instrument
477483 action ["startTime" ] = activity .start ().time .isoformat ()
478484 action ["endTime" ] = activity .end ().time .isoformat ()
479- action ["object" ] = self .add_action_params (crate , activity , to_wf_p , "usage" )
480- action ["result" ] = self .add_action_params (crate , activity , to_wf_p , "generation" )
485+ action ["object" ] = self .add_action_params (crate , activity , to_wf_p , "usage" ,
486+ dest_base / "in" if self .remap_names else "" )
487+ action ["result" ] = self .add_action_params (crate , activity , to_wf_p , "generation" ,
488+ dest_base / "out" if self .remap_names else "" )
481489 for job in activity .steps ():
482490 self .add_action (crate , job , parent_instrument = instrument )
483491
484- def add_action_params (self , crate , activity , to_wf_p , ptype = "usage" ):
492+ def add_action_params (self , crate , activity , to_wf_p , ptype = "usage" , dest_base = "" ):
485493 action_params = []
486494 all_roles = set ()
487495 for rel in getattr (activity , ptype )():
@@ -501,7 +509,7 @@ def add_action_params(self, crate, activity, to_wf_p, ptype="usage"):
501509 wf_p = crate .dereference (to_wf_p (k ))
502510 k = get_fragment (k )
503511 v = rel .entity ()
504- value = self .convert_param (v , crate )
512+ value = self .convert_param (v , crate , dest_base = dest_base )
505513 if value is None :
506514 continue # param is optional with no default and was not set
507515 if {"ro:Folder" , "wf4ever:File" } & set (str (_ ) for _ in v .types ()):
@@ -538,7 +546,7 @@ def _set_alternate_name(prov_param, action_p, parent=None):
538546 if "alternateName" in parent :
539547 action_p ["alternateName" ] = (Path (parent ["alternateName" ]) / basename ).as_posix ()
540548
541- def convert_param (self , prov_param , crate , convert_secondary = True , parent = None ):
549+ def convert_param (self , prov_param , crate , convert_secondary = True , parent = None , dest_base = "" ):
542550 type_names = frozenset (str (_ ) for _ in prov_param .types ())
543551 secondary_files = [_ .generated_entity () for _ in prov_param .derivations ()
544552 if str (_ .type ) == "cwlprov:SecondaryFile" ]
@@ -562,7 +570,7 @@ def convert_param(self, prov_param, crate, convert_secondary=True, parent=None):
562570 basename = getattr (prov_param , "basename" , hash_ )
563571 else :
564572 basename = hash_
565- dest = Path (parent .id if parent else "" ) / basename
573+ dest = Path (parent .id if parent else dest_base ) / basename
566574 action_p = crate .dereference (dest .as_posix ())
567575 if not action_p :
568576 source = self .root / Path ("data" ) / hash_ [:2 ] / hash_
@@ -583,7 +591,7 @@ def convert_param(self, prov_param, crate, convert_secondary=True, parent=None):
583591 basename = getattr (prov_param , "basename" , hash_ )
584592 else :
585593 basename = hash_
586- dest = Path (parent .id if parent else "" ) / basename
594+ dest = Path (parent .id if parent else dest_base ) / basename
587595 action_p = crate .dereference (dest .as_posix ())
588596 if not action_p :
589597 action_p = crate .add_directory (dest_path = dest )
0 commit comments