diff --git a/setup.py b/setup.py
index a4d26588e..e5a168660 100644
--- a/setup.py
+++ b/setup.py
@@ -93,6 +93,7 @@
             "topaz3 = dlstbx.wrapper.topaz3_wrapper:Topaz3Wrapper",
             "xia2 = dlstbx.wrapper.xia2:Xia2Wrapper",
             "xia2.multiplex = dlstbx.wrapper.xia2_multiplex:Xia2MultiplexWrapper",
+            "xia2.multiplex_filtering = dlstbx.wrapper.xia2_multiplex_filtering:Xia2MultiplexFilteringWrapper",
             "xia2.strategy = dlstbx.wrapper.xia2_strategy:Xia2StrategyWrapper",
             "xia2.to_shelxcde = dlstbx.wrapper.xia2_to_shelxcde:Xia2toShelxcdeWrapper",
             "xia2.ssx = dlstbx.wrapper.xia2_ssx:Xia2SsxWrapper",
diff --git a/src/dlstbx/services/trigger.py b/src/dlstbx/services/trigger.py
index d74ff5d1e..3c005b53d 100644
--- a/src/dlstbx/services/trigger.py
+++ b/src/dlstbx/services/trigger.py
@@ -74,6 +74,7 @@ class RelatedDCIDs(pydantic.BaseModel):
     dcids: List[int]
     sample_id: Optional[int] = pydantic.Field(gt=0, default=None)
     sample_group_id: Optional[int] = pydantic.Field(gt=0, default=None)
+    name: Optional[str] = None
 
 
 class MetalIdParameters(pydantic.BaseModel):
@@ -233,6 +234,32 @@ class MultiplexParameters(pydantic.BaseModel):
     trigger_every_collection: bool
 
 
+class MultiplexFilteringParameters(pydantic.BaseModel):
+    dcid: int = pydantic.Field(gt=0)
+    multiplex_job: pathlib.Path
+    automatic: Optional[bool] = False
+    comment: Optional[str] = None
+    diffraction_plan_info: Optional[DiffractionPlanInfo] = None
+    recipe: Optional[str] = None
+    beamline: str
+    multiplex_id: Optional[int] = pydantic.Field(default=None, gt=0)
+    cluster_num: Optional[str] = None
+    backoff_delay: float = pydantic.Field(default=8, alias="backoff-delay")
+    backoff_max_try: int = pydantic.Field(default=10, alias="backoff-max-try")
+    backoff_multiplier: float = pydantic.Field(default=2, alias="backoff-multiplier")
+    multiplex_parameters: dict
+    filtering_group_size: Dict[str, int] = pydantic.Field(
+        default={"default": 50}, alias="filtering-group-size"
+    )
+
+    @pydantic.field_validator("multiplex_parameters", mode="before")
+    @classmethod
+    def convert_multiplex_parameters(cls, v):
+        if isinstance(v, ChainMapWithReplacement):
+            return dict(v)
+        return v
+
+
 class Xia2SsxReduceParameters(pydantic.BaseModel):
     dcid: int = pydantic.Field(gt=0)
     related_dcids: List[RelatedDCIDs]
@@ -2168,6 +2195,12 @@ def trigger_multiplex(
                 job_parameters.append(("sample_id", str(group.sample_id)))
             else:
                 job_parameters.append(("sample_group_id", str(group.sample_group_id)))
+
+            # Pass the group's dcids on to xia2.multiplex_filtering so it does
+            # not have to repeat all of the ISPyB queries made above
+            for idx, dcid in enumerate(dcids):
+                job_parameters.append((f"group_dcid_{idx}", str(dcid)))
+
             if parameters.spacegroup:
                 job_parameters.append(("spacegroup", parameters.spacegroup))
             if (
@@ -2206,6 +2239,351 @@
 
         return {"success": True, "return_value": jobids}
 
+    @pydantic.validate_call(config={"arbitrary_types_allowed": True})
+    def trigger_multiplex_filtering(
+        self,
+        rw: RecipeWrapper,
+        *,
+        parameters: MultiplexFilteringParameters,
+        session: sqlalchemy.orm.session.Session,
+        transaction: int,
+        message: Dict,
+        **kwargs,
+    ):
+        """Trigger a xia2.multiplex_filtering job for a given data collection.
+
+        This is triggered from a completed xia2.multiplex job and makes use of
+        its existing file structure. The filtering algorithms identify whether
+        there are any groups of images that do not correlate with the rest of
+        the data. The size of these image groups can be varied.
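+
+        For illustration only: with the default group size of 50, the job
+        parameters assembled below are rendered by the wrapper into a command
+        of roughly this form (the exact invocation is built by the wrapper's
+        construct_commandline):
+
+            xia2.multiplex_filtering input=<multiplex_dir>
+                filtering.method=deltacchalf deltacchalf.stdcutoff=3
+                deltacchalf.mode=image_group deltacchalf.group_size=50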
+
+        Currently, running filtering on clusters is not supported, so it is
+        first verified whether or not the triggered job corresponds to a
+        cluster.
+
+        A check is then run that the given multiplex directory exists, that
+        the job has finished running, and that all the necessary files have
+        finished copying. If they have not, a delay is used as in
+        xia2.multiplex:
+
+            delay = backoff-delay * backoff-multiplier ** ntry
+
+        If any xia2.multiplex jobs are still running (or files still copying)
+        after backoff-max-try iterations, then xia2.multiplex_filtering will
+        not run.
+
+        The filtering parameters are then set (note that future iterations
+        will likely vary these), new ProcessingJob, ProcessingJobImageSweep
+        and ProcessingJobParameter entries are created, and the resulting
+        processingJobId is sent to the `processing_recipe` queue.
+
+        Note that initially this will only be triggered for VMXm. The
+        filtering is done here in the trigger service because the future plan
+        is to roll it out generally to all multiplex jobs.
+
+        Recipe parameters:
+        - target: set this to "multiplex_filtering"
+        - beamline: the beamline as a string
+        - dcid: the dataCollectionId for the given data collection
+        - comment: a comment to be stored in the ProcessingJob.comment field
+        - automatic: boolean value passed to the ProcessingJob.automatic field
+        - multiplex_job: location of the parent multiplex job
+          ({ispyb_working_directory}/xia2.multiplex)
+        - multiplex_id: the processingJobId of the parent multiplex job
+        - cluster_num: the cluster number, if the triggering multiplex job was
+          a cluster
+        - multiplex_parameters: ispyb_processing_parameters of the parent
+          multiplex job
+        - filtering-group-size: mapping from beamline to deltacchalf image
+          group size, with a "default" fallback entry
+        - backoff-delay: base delay (in seconds) for exponential backoff in
+          the event that the parent xia2.multiplex job is still running or its
+          files are still being copied
+        - backoff-max-try: the number of times that a message is re-sent
+          before giving up
+        - backoff-multiplier: the multiplier for exponential backoff
+
+        Example recipe parameters:
+        {
+            "target": "multiplex_filtering",
+            "dcid": "21152574",
+            "recipe": "postprocessing-multiplex-filtering",
+            "comment": "Extra filtering using xia2.multiplex triggered by automatic xia2.multiplex",
+            "automatic": true,
+            "beamline": "i02-2",
+            "multiplex_job": "/dls/mx/data/nt37104/nt37104-202/tmp/zocalo/VMXi-AB7814/well_62/images/image_291258/b8a346a9-c5cb-4eb5-8208-6115eb283910/xia2.multiplex",
+            "multiplex_id": "37941124",
+            "cluster_num": "$cluster_number",
+            "multiplex_parameters": {
+                "data": [
+                    "/dls/mx/data/nt37104/nt37104-202/processed/VMXi-AB7814/well_62/images/image_291252/c8ac49b8-741f-439d-94d7-3ab6f81b0c35/xia2-dials/DataFiles/nt37104v202_ximage291252_SAD_SWEEP1.refl;/dls/mx/data/nt37104/nt37104-202/processed/VMXi-AB7814/well_62/images/image_291252/c8ac49b8-741f-439d-94d7-3ab6f81b0c35/xia2-dials/DataFiles/nt37104v202_ximage291252_SAD_SWEEP1.expt",
+                    "/dls/mx/data/nt37104/nt37104-202/processed/VMXi-AB7814/well_62/images/image_291253/cb1886c8-d3af-4412-ae9c-102859bab79c/xia2-dials/DataFiles/nt37104v202_ximage291253_SAD_SWEEP1.refl;/dls/mx/data/nt37104/nt37104-202/processed/VMXi-AB7814/well_62/images/image_291253/cb1886c8-d3af-4412-ae9c-102859bab79c/xia2-dials/DataFiles/nt37104v202_ximage291253_SAD_SWEEP1.expt",
"/dls/mx/data/nt37104/nt37104-202/processed/VMXi-AB7814/well_62/images/image_291254/4e44c987-5067-40d7-999e-4f89c70c7dc1/xia2-dials/DataFiles/nt37104v202_ximage291254_SAD_SWEEP1.expt;/dls/mx/data/nt37104/nt37104-202/processed/VMXi-AB7814/well_62/images/image_291254/4e44c987-5067-40d7-999e-4f89c70c7dc1/xia2-dials/DataFiles/nt37104v202_ximage291254_SAD_SWEEP1.refl", + "/dls/mx/data/nt37104/nt37104-202/processed/VMXi-AB7814/well_62/images/image_291255/1755523f-69c8-486b-8252-c2e961760488/xia2-dials/DataFiles/nt37104v202_ximage291255_SAD_SWEEP1.expt;/dls/mx/data/nt37104/nt37104-202/processed/VMXi-AB7814/well_62/images/image_291255/1755523f-69c8-486b-8252-c2e961760488/xia2-dials/DataFiles/nt37104v202_ximage291255_SAD_SWEEP1.refl", + "/dls/mx/data/nt37104/nt37104-202/processed/VMXi-AB7814/well_62/images/image_291256/0dba9917-4b69-454c-970c-6fde3cc956bf/xia2-dials/DataFiles/nt37104v202_ximage291256_SAD_SWEEP1.expt;/dls/mx/data/nt37104/nt37104-202/processed/VMXi-AB7814/well_62/images/image_291256/0dba9917-4b69-454c-970c-6fde3cc956bf/xia2-dials/DataFiles/nt37104v202_ximage291256_SAD_SWEEP1.refl", + "/dls/mx/data/nt37104/nt37104-202/processed/VMXi-AB7814/well_62/images/image_291257/2d732791-5c61-4b39-beb3-347506c4534b/xia2-dials/DataFiles/nt37104v202_ximage291257_SAD_SWEEP1.refl;/dls/mx/data/nt37104/nt37104-202/processed/VMXi-AB7814/well_62/images/image_291257/2d732791-5c61-4b39-beb3-347506c4534b/xia2-dials/DataFiles/nt37104v202_ximage291257_SAD_SWEEP1.expt", + "/dls/mx/data/nt37104/nt37104-202/processed/VMXi-AB7814/well_62/images/image_291258/4630060b-e3fd-4b6f-b28e-0a86f560a221/xia2-dials/DataFiles/nt37104v202_ximage291258_SAD_SWEEP1.expt;/dls/mx/data/nt37104/nt37104-202/processed/VMXi-AB7814/well_62/images/image_291258/4630060b-e3fd-4b6f-b28e-0a86f560a221/xia2-dials/DataFiles/nt37104v202_ximage291258_SAD_SWEEP1.refl" + ], + "sample_id": [ + "7087938" + ], + "group_dcid_0": [ + "21152550" + ], + "group_dcid_1": [ + "21152556" + ], + "group_dcid_2": [ + "21152559" + ], + "group_dcid_3": [ + "21152562" + ], + "group_dcid_4": [ + "21152565" + ], + "group_dcid_5": [ + "21152568" + ], + "group_dcid_6": [ + "21152574" + ], + "clustering.method": [ + "coordinate" + ], + "clustering.output_clusters": [ + "true" + ] + }, + "filtering_group_size": { + "default": 50, + "i02-1": 10 + }, + "backoff-delay": "8", + "backoff-max-try": "10", + "backoff-multiplier": "2" + } + """ + + # For now, just trigger for VMXm + if parameters.beamline != "i02-1": + self.log.info( + f"xia2.multiplex_filtering currently not triggered for beamline {parameters.beamline}." + ) + return {"success": True} + + # even though could be multiple different multiplex jobs for a given DCID, only have one here + # Think this is because the multiplex filtering is triggered off the individual multiplex jobs + + if parameters.cluster_num != "None": + is_cluster = True + else: + is_cluster = False + + if is_cluster: + self.log.info( + f"Incoming multiplex is cluster {parameters.cluster_num}. Filtering not currently supported for clusters." + ) + return {"success": True} + else: + multiplex_dir = parameters.multiplex_job + if not multiplex_dir.is_dir(): + self.log.error( + f"Given multiplex directory {multiplex_dir} does not exist. Aborting job." 
+        # Add in a delay in case the parent job has not finished
+        status = {
+            "ntry": 0,
+        }
+        if isinstance(message, dict):
+            status.update(message.get("trigger-status", {}))
+        message_delay = int(
+            parameters.backoff_delay
+            * parameters.backoff_multiplier ** status["ntry"]
+        )
+        status["ntry"] += 1
+        self.log.debug(
+            f"dcid={parameters.dcid}\nmessage_delay={message_delay}\n{status}"
+        )
+
+        min_start_time = datetime.now() - timedelta(hours=24)
+        query = (
+            (
+                session.query(AutoProcProgram, ProcessingJob.dataCollectionId).join(
+                    ProcessingJob,
+                    ProcessingJob.processingJobId
+                    == AutoProcProgram.processingJobId,
+                )
+            )
+            .filter(ProcessingJob.dataCollectionId == parameters.dcid)
+            .filter(ProcessingJob.automatic == True)  # noqa E712
+            .filter(ProcessingJob.recordTimestamp > min_start_time)
+            .filter(
+                AutoProcProgram.processingJobId == parameters.multiplex_id
+            )  # check only the parent multiplex job
+            .filter(
+                or_(
+                    AutoProcProgram.processingStatus == None,  # noqa E711
+                    AutoProcProgram.processingStartTime == None,  # noqa E711
+                )
+            )
+        )
+
+        # If the parent job is still running (or yet to start), checkpoint
+        # with a delay
+        waiting_processing_jobs = query.all()
+
+        # Check whether all the necessary files have finished copying over
+        needed_files = [
+            multiplex_dir / "models_scaled.expt",
+            multiplex_dir / "observations_scaled.refl",
+            multiplex_dir / "scaled.mtz",
+            multiplex_dir / "xia2-multiplex-working.phil",
+            multiplex_dir / "xia2.multiplex.json",
+        ]
+        for mplx_file in needed_files:
+            if not mplx_file.is_file():
+                # Treat missing files like a still-running job so that the
+                # same backoff logic applies
+                waiting_processing_jobs.append(parameters.multiplex_id)
+                self.log.info(
+                    f"Files still copying - {mplx_file} not yet present in {multiplex_dir}."
+                )
+                break
+
+        if len(waiting_processing_jobs) > 0:
+            self.log.info(
+                f"Waiting on xia2.multiplex {parameters.multiplex_job} from processing job id {parameters.multiplex_id} to finish processing (dcid {parameters.dcid})"
+            )
+            if status["ntry"] > parameters.backoff_max_try:
+                # Give up waiting for the parent multiplex job to finish;
+                # filtering will not be run for this data collection
+                self.log.info(
+                    f"max-try exceeded, giving up waiting for related processings for dcid {parameters.dcid}\n"
+                )
+            else:
+                # Send the message back to myself for the next round of
+                # processing
+                rw.checkpoint(
+                    {
+                        "trigger-status": status,
+                    },
+                    delay=message_delay,
+                    transaction=transaction,
+                )
+
+            return {"success": True}
+
+        # Check that the data collection exists (.one() raises otherwise)
+        session.query(DataCollection).filter(
+            DataCollection.dataCollectionId == parameters.dcid
+        ).one()
+
+        # Define the filtering parameters. Use image_group mode because
+        # dataset mode is essentially equivalent to clustering.
+        job_parameters: list[tuple[str, str]] = []
+
+        job_parameters.append(("input", str(multiplex_dir)))
+
+        if (
+            parameters.diffraction_plan_info
+            and parameters.diffraction_plan_info.anomalousScatterer
+        ):
+            job_parameters.extend(
+                [
+                    ("anomalous", "true"),
+                    ("absorption_level", "high"),
+                ]
+            )
+
+        group_size = parameters.filtering_group_size.get(
+            parameters.beamline, parameters.filtering_group_size["default"]
+        )
+
+        job_parameters.extend(
+            [
+                ("filtering.method", "deltacchalf"),
+                ("deltacchalf.stdcutoff", "3"),
+                ("deltacchalf.mode", "image_group"),
+                ("deltacchalf.group_size", str(group_size)),
+            ]
+        )
+
+        # Compile all the dcids actually used in the previous multiplex job.
+        # This saves re-doing all the ISPyB queries that multiplex makes;
+        # those queries do a lot of filtering, e.g. the related dcids stored
+        # in the database can be a mix of grid scans and rotation data which
+        # should not be merged.
+        dcids = []
+        for param in parameters.multiplex_parameters:
+            if "group_dcid" in param:
+                dcids.append(parameters.multiplex_parameters[param][0])
+
+        # Need either sample_id or sample_group_id in the parameters so that
+        # SynchWeb will display the group name,
+        # e.g. "xia2.multiplex_filtering (C7d2_)"
+        if "sample_id" in parameters.multiplex_parameters:
+            sample_id = parameters.multiplex_parameters["sample_id"][0]
+            job_parameters.append(("sample_id", sample_id))
+            self.log.debug(f"Has sample_id {sample_id}")
+        else:
+            sample_group_id = parameters.multiplex_parameters["sample_group_id"][0]
+            job_parameters.append(("sample_group_id", sample_group_id))
+            self.log.debug(f"Has sample_group_id {sample_group_id}")
+
+        # Create the ProcessingJob record in the ISPyB database
+        jp = self.ispyb.mx_processing.get_job_params()
+        jp["automatic"] = parameters.automatic
+        jp["comments"] = parameters.comment
+        jp["datacollectionid"] = parameters.dcid
+        jp["display_name"] = "xia2.multiplex_filtering"
+        jp["recipe"] = "postprocessing-xia2-multiplex-filtering"
+        jobid = self.ispyb.mx_processing.upsert_job(list(jp.values()))
+        self.log.debug(f"xia2.multiplex_filtering trigger: generated JobID {jobid}")
+
+        # Add all related dcids as image sweeps so that SynchWeb has a label
+        # for how many datasets are involved
+        # (e.g. "7x xia2.multiplex_filtering")
+        query = (
+            session.query(DataCollection)
+            .filter(DataCollection.dataCollectionId.in_(dcids))
+            .options(
+                Load(DataCollection).load_only(
+                    DataCollection.dataCollectionId,
+                    DataCollection.wavelength,
+                    DataCollection.startImageNumber,
+                    DataCollection.numberOfImages,
+                    raiseload=True,
+                )
+            )
+        )
+
+        for dc in query.all():
+            jisp = self.ispyb.mx_processing.get_job_image_sweep_params()
+            jisp["datacollectionid"] = dc.dataCollectionId
+            jisp["start_image"] = dc.startImageNumber
+            jisp["end_image"] = dc.startImageNumber + dc.numberOfImages - 1
+            jisp["job_id"] = jobid
+            jispid = self.ispyb.mx_processing.upsert_job_image_sweep(
+                list(jisp.values())
+            )
+            self.log.debug(
+                f"xia2.multiplex_filtering trigger: generated JobImageSweepID {jispid}"
+            )
+
+        for k, v in job_parameters:
+            jpp = self.ispyb.mx_processing.get_job_parameter_params()
+            jpp["job_id"] = jobid
+            jpp["parameter_key"] = k
+            jpp["parameter_value"] = v
+            jppid = self.ispyb.mx_processing.upsert_job_parameter(
+                list(jpp.values())
+            )
+            self.log.debug(
+                f"xia2.multiplex_filtering trigger generated JobParameterID {jppid} with {k}={v}"
+            )
+
+        # Send the final message to the processing_recipe queue
+        message = {"recipes": [], "parameters": {"ispyb_process": jobid}}
+        rw.transport.send("processing_recipe", message)
+
+        self.log.info(
+            f"xia2.multiplex_filtering trigger: Processing job {jobid} triggered"
+        )
+
+        return {"success": True, "return_value": jobid}
+
     @pydantic.validate_call(config={"arbitrary_types_allowed": True})
     def trigger_xia2_ssx_reduce(
         self,
diff --git a/src/dlstbx/wrapper/xia2_multiplex.py b/src/dlstbx/wrapper/xia2_multiplex.py
index 17be9e0b1..164112a31 100644
--- a/src/dlstbx/wrapper/xia2_multiplex.py
+++ b/src/dlstbx/wrapper/xia2_multiplex.py
@@ -151,6 +151,9 @@
         if params.get("ispyb_parameters"):
             ignore = {"sample_id", "sample_group_id"}
+            for param in params["ispyb_parameters"]:
+                if "group_dcid" in param:
+                    ignore.add(param)
             translation = {
                 "d_min": "resolution.d_min",
                 "spacegroup": "symmetry.space_group",
@@ -421,6 +424,8 @@
                 )
                 self.recwrap.environment.update({"dimple_symlink": dimple_symlink})
self.recwrap.environment.update({"cluster_number": cluster_num}) + # Send results to ispyb and trigger downstream recipe steps for this dataset self.log.info( f"Triggering downstream recipe steps for dataset: '{dataset_name}'" diff --git a/src/dlstbx/wrapper/xia2_multiplex_filtering.py b/src/dlstbx/wrapper/xia2_multiplex_filtering.py new file mode 100644 index 000000000..1cf568fb9 --- /dev/null +++ b/src/dlstbx/wrapper/xia2_multiplex_filtering.py @@ -0,0 +1,437 @@ +from __future__ import annotations + +import json +import pathlib +import shutil +import subprocess +import time +from fnmatch import fnmatch + +import iotbx.merging_statistics +from cctbx import uctbx + +import dlstbx.util.symlink +from dlstbx.wrapper import Wrapper + + +def lookup(merging_stats, item, shell): + i_bin = {"innerShell": 0, "outerShell": -1}.get(shell) + if i_bin is not None: + return merging_stats[item][i_bin] + return merging_stats["overall"][item] + + +class Xia2MultiplexFilteringWrapper(Wrapper): + """ + Wrapper for xia2.multiplex_filtering. Largely based off wrapper for xia2.multiplex. + + xia2.multiplex_filtering takes existing multiplex results and performs some + additional filtering to improve data reduction quality. + + """ + + _logger_name = "dlstbx.wrap.xia2.multiplex_filtering" + name = "xia2.multiplex_filtering" + + def send_results_to_ispyb( + self, z, xtriage_results=None, cluster_num=None, attachments=[] + ): + ispyb_command_list = [] + + # Place holder code for future iterations where may run filtering jobs on clusters + + if cluster_num is not None: + register_autoproc_prog = { + "ispyb_command": "register_processing", + "program": "xia2.multiplex_filtering", + "cmdline": "xia2.multiplex_filtering (ap-zoc)", + "environment": {"cluster": cluster_num}, + "store_result": "ispyb_autoprocprogram_id", + } + ispyb_command_list.append(register_autoproc_prog) + + # Step 1: Add new record to AutoProc, keep the AutoProcID + + register_autoproc = { + "ispyb_command": "write_autoproc", + "autoproc_id": None, + "store_result": "ispyb_autoproc_id", + "spacegroup": z["spacegroup"], + "refinedcell_a": z["unit_cell"][0], + "refinedcell_b": z["unit_cell"][1], + "refinedcell_c": z["unit_cell"][2], + "refinedcell_alpha": z["unit_cell"][3], + "refinedcell_beta": z["unit_cell"][4], + "refinedcell_gamma": z["unit_cell"][5], + } + ispyb_command_list.append(register_autoproc) + + # Step 2: Store scaling results, linked to the AutoProcID + # Keep the AutoProcScalingID + + insert_scaling = z["scaling_statistics"] + insert_scaling.update( + { + "ispyb_command": "insert_scaling", + "autoproc_id": "$ispyb_autoproc_id", + "store_result": "ispyb_autoprocscaling_id", + } + ) + ispyb_command_list.append(insert_scaling) + + # Step 3: Store integration result, linked to the ScalingID + # Use pre-registered integration id for 'Filtered' dataset + + if cluster_num is not None: + integration_id = None + else: + integration_id = "$ispyb_integration_id" + + integration = { + "ispyb_command": "upsert_integration", + "scaling_id": "$ispyb_autoprocscaling_id", + "integration_id": integration_id, + "cell_a": z["unit_cell"][0], + "cell_b": z["unit_cell"][1], + "cell_c": z["unit_cell"][2], + "cell_alpha": z["unit_cell"][3], + "cell_beta": z["unit_cell"][4], + "cell_gamma": z["unit_cell"][5], + } + ispyb_command_list.append(integration) + + # Step 4: Upload attachments + + if attachments: + for attachment in attachments: + upload_attachment = { + "ispyb_command": "add_program_attachment", + "program_id": "$ispyb_autoprocprogram_id", + 
"file_name": attachment["file_name"], + "file_path": attachment["file_path"], + "file_type": attachment["file_type"], + "importance_rank": attachment["importance_rank"], + } + ispyb_command_list.append(upload_attachment) + + # Step 5: Register successful processing for cluster jobs (placeholder code) + + if cluster_num is not None: + update_autoproc_prog = { + "ispyb_command": "update_processing_status", + "program_id": "$ispyb_autoprocprogram_id", + "message": "processing successful", + "status": "success", + } + ispyb_command_list.append(update_autoproc_prog) + + if xtriage_results is not None: + for level, messages in xtriage_results.items(): + for message in messages: + if ( + message["text"] + == "The merging statistics indicate that the data may be assigned to the wrong space group." + ): + # this is not a useful warning for multi-crystal + continue + ispyb_command_list.append( + { + "ispyb_command": "add_program_message", + "program_id": "$ispyb_autoprocprogram_id", + "message": message["text"], + "description": message["summary"], + "severity": {0: "INFO", 1: "WARNING", 2: "ERROR"}.get( + message["level"] + ), + } + ) + + self.log.debug(f"Sending {ispyb_command_list}") + self.recwrap.send_to("ispyb", {"ispyb_command_list": ispyb_command_list}) + + def construct_commandline(self, params): + command = ["xia2.multiplex_filtering"] + + # xia2.multiplex_filtering uses previous multiplex job files + # so no specific data files to be input, just add ispyb_parameters to cmdline + + if params.get("ispyb_parameters"): + ignore = {"sample_id", "sample_group_id"} + translation = { + "d_min": "resolution.d_min", + } + for param, value in params["ispyb_parameters"].items(): + if param not in ignore: + command.append(translation.get(param, param) + "=" + value[0]) + + return command + + def run(self): + assert hasattr(self, "recwrap"), "No recipewrapper object found" + + params = self.recwrap.recipe_step["job_parameters"] + + command = self.construct_commandline(params) + + working_directory = pathlib.Path(params["working_directory"]) + results_directory = pathlib.Path(params["results_directory"]) + + # Create working directory with symbolic link + + working_directory.mkdir(parents=True, exist_ok=True) + if params.get("create_symlink"): + dlstbx.util.symlink.create_parent_symlink( + working_directory, params["create_symlink"] + ) + + # run xia2.multiplex_filtering in working directory + + self.log.info(f"command: {' '.join(command)}") + try: + start_time = time.perf_counter() + result = subprocess.run( + command, + capture_output=True, + timeout=params.get("timeout"), + cwd=working_directory, + ) + runtime = time.perf_counter() - start_time + self.log.info(f"xia2.multiplex_filtering took {runtime} seconds") + self._runtime_hist.observe(runtime) + except subprocess.TimeoutExpired as te: + success = False + self.log.warning( + f"xia2.multiplex_filtering timed out: {te.timeout}\n {te.cmd}" + ) + self.log.debug(te.stdout) + self.log.debug(te.stderr) + self._timeout_counter.inc() + else: + if success := not result.returncode: + self.log.info("xia2.multiplex_filtering successful") + else: + self.log.info( + f"xia2.multiplex_filtering failed with exitcode {result.returncode}" + ) + self.log.debug(result.stdout) + self.log.debug(result.stderr) + self.log.info(f"working_directory: {working_directory}") + + filtered_unmerged_mtz = working_directory / "filtered_unmerged.mtz" + multiplex_filtering_json = working_directory / "xia2.multiplex_filtering.json" + + if not (filtered_unmerged_mtz.is_file() and 
+
+    def run(self):
+        assert hasattr(self, "recwrap"), "No recipewrapper object found"
+
+        params = self.recwrap.recipe_step["job_parameters"]
+        command = self.construct_commandline(params)
+
+        working_directory = pathlib.Path(params["working_directory"])
+        results_directory = pathlib.Path(params["results_directory"])
+
+        # Create the working directory, with a symbolic link if requested
+        working_directory.mkdir(parents=True, exist_ok=True)
+        if params.get("create_symlink"):
+            dlstbx.util.symlink.create_parent_symlink(
+                working_directory, params["create_symlink"]
+            )
+
+        # Run xia2.multiplex_filtering in the working directory
+        self.log.info(f"command: {' '.join(command)}")
+        try:
+            start_time = time.perf_counter()
+            result = subprocess.run(
+                command,
+                capture_output=True,
+                timeout=params.get("timeout"),
+                cwd=working_directory,
+            )
+            runtime = time.perf_counter() - start_time
+            self.log.info(f"xia2.multiplex_filtering took {runtime} seconds")
+            self._runtime_hist.observe(runtime)
+        except subprocess.TimeoutExpired as te:
+            success = False
+            self.log.warning(
+                f"xia2.multiplex_filtering timed out: {te.timeout}\n {te.cmd}"
+            )
+            self.log.debug(te.stdout)
+            self.log.debug(te.stderr)
+            self._timeout_counter.inc()
+        else:
+            if success := not result.returncode:
+                self.log.info("xia2.multiplex_filtering successful")
+            else:
+                self.log.info(
+                    f"xia2.multiplex_filtering failed with exitcode {result.returncode}"
+                )
+            self.log.debug(result.stdout)
+            self.log.debug(result.stderr)
+        self.log.info(f"working_directory: {working_directory}")
+
+        filtered_unmerged_mtz = working_directory / "filtered_unmerged.mtz"
+        multiplex_filtering_json = working_directory / "xia2.multiplex_filtering.json"
+        if not (
+            filtered_unmerged_mtz.is_file() and multiplex_filtering_json.is_file()
+        ):
+            success = False
+
+        # Create the results directory
+        results_directory.mkdir(parents=True, exist_ok=True)
+        if params.get("create_symlink"):
+            dlstbx.util.symlink.create_parent_symlink(
+                results_directory, params["create_symlink"]
+            )
+        if pipeline_final_params := params.get("pipeline-final", []):
+            final_directory = pathlib.Path(pipeline_final_params["path"])
+            final_directory.mkdir(parents=True, exist_ok=True)
+            if params.get("create_symlink"):
+                dlstbx.util.symlink.create_parent_symlink(
+                    final_directory, params["create_symlink"]
+                )
+
+        # TODO: maybe move this function elsewhere
+        def is_final_result(final_file: pathlib.Path) -> bool:
+            return any(
+                fnmatch(str(final_file.name), patt)
+                for patt in pipeline_final_params["patterns"]
+            )
+
+        keep_ext = {
+            ".png": None,
+            ".log": "log",
+            ".json": None,
+            ".expt": None,
+            ".refl": None,
+            ".mtz": None,
+            ".html": "log",
+            ".sca": None,
+        }
+        keep = {
+            "filtered.mtz": "result",
+            "filtered_unmerged.mtz": "result",
+            "filtered.expt": "result",
+            "filtered.refl": "result",
+            "filtered.sca": "result",
+            "merging-stats.json": "graph",
+            "xia2.multiplex_filtering.json": "result",
+        }
+
+        # Record these log files first so they appear at the top of the list
+        # of attachments in SynchWeb
+        primary_log_files = [
+            working_directory / "xia2.multiplex_filtering.html",
+            working_directory / "xia2.multiplex_filtering.log",
+        ]
+
+        allfiles = []
+
+        if success:
+            with multiplex_filtering_json.open("r") as fh:
+                d = json.load(fh)
+
+            # Retain placeholder logic from the multiplex wrapper for future
+            # iterations with clusters. Note that "All data" corresponds to
+            # the parent multiplex job; it is also included in the
+            # xia2.multiplex_filtering json so that the html has
+            # user-friendly comparisons, but as it is just a copy of the
+            # results from multiplex it is ignored here.
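+            # The datasets dict is expected to contain "Filtered" (the result
+            # of this filtering job), "All data" (a copy of the parent
+            # multiplex result) and possibly "coordinate cluster N" entries;
+            # only "Filtered" is processed below.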
+            for dataset_name, dataset in d["datasets"].items():
+                if dataset_name == "Filtered":
+                    base_dir = working_directory
+                    dimple_symlink = "dimple-xia2.multiplex_filtering"
+                    cluster_prefix = ""
+                    cluster_num = None
+                elif "coordinate cluster" in dataset_name:
+                    self.log.warning("Not currently applying filtering to clusters")
+                    continue
+                elif dataset_name == "All data":
+                    self.log.warning(
+                        'Results for "All data" already in parent xia2.multiplex run.'
+                    )
+                    continue
+                else:
+                    self.log.warning(
+                        f"Ignoring unrecognised dataset pattern {dataset_name}"
+                    )
+                    continue
+
+                filtered_unmerged_mtz = (
+                    base_dir / f"{cluster_prefix}filtered_unmerged.mtz"
+                )
+                i_obs = iotbx.merging_statistics.select_data(
+                    filtered_unmerged_mtz.as_posix(), data_labels=None
+                )
+                merging_stats = dataset["merging_stats"]
+                merging_stats_anom = dataset["merging_stats_anom"]
+                with (base_dir / f"{cluster_prefix}merging-stats.json").open(
+                    "w"
+                ) as fh:
+                    json.dump(merging_stats, fh)
+
+                ispyb_d = {
+                    "commandline": " ".join(command),
+                    "spacegroup": i_obs.space_group().type().lookup_symbol(),
+                    "unit_cell": list(i_obs.unit_cell().parameters()),
+                    "scaling_statistics": {},
+                }
+                for shell in ("overall", "innerShell", "outerShell"):
+                    ispyb_d["scaling_statistics"][shell] = {
+                        "cc_half": lookup(merging_stats, "cc_one_half", shell),
+                        "completeness": lookup(merging_stats, "completeness", shell),
+                        "mean_i_sig_i": lookup(
+                            merging_stats, "i_over_sigma_mean", shell
+                        ),
+                        "multiplicity": lookup(merging_stats, "multiplicity", shell),
+                        "n_tot_obs": lookup(merging_stats, "n_obs", shell),
+                        "n_tot_unique_obs": lookup(merging_stats, "n_uniq", shell),
+                        "r_merge": lookup(merging_stats, "r_merge", shell),
+                        "res_lim_high": uctbx.d_star_sq_as_d(
+                            lookup(merging_stats, "d_star_sq_min", shell)
+                        ),
+                        "res_lim_low": uctbx.d_star_sq_as_d(
+                            lookup(merging_stats, "d_star_sq_max", shell)
+                        ),
+                        "anom_completeness": lookup(
+                            merging_stats_anom, "anom_completeness", shell
+                        ),
+                        "anom_multiplicity": lookup(
+                            merging_stats_anom, "multiplicity", shell
+                        ),
+                        "cc_anom": lookup(merging_stats_anom, "cc_anom", shell),
+                        "r_meas_all_iplusi_minus": lookup(
+                            merging_stats_anom, "r_meas", shell
+                        ),
+                    }
+                xtriage_results = dataset.get("xtriage")
+                attachments = []
+
+                # dict.fromkeys de-duplicates while preserving order, so the
+                # primary log files stay at the front (a plain set() would
+                # not guarantee this)
+                for filename in dict.fromkeys(
+                    primary_log_files + list(base_dir.iterdir())
+                ):
+                    if not filename.is_file():
+                        continue
+                    filetype = keep_ext.get(filename.suffix)
+                    for file_pattern in keep:
+                        if filename.name.endswith(file_pattern):
+                            filetype = keep[file_pattern]
+                    if filetype is None:
+                        continue
+
+                    destination = results_directory / filename.name
+                    if (
+                        destination.as_posix() in allfiles
+                        and filename not in primary_log_files
+                    ):
+                        destination = (
+                            results_directory / f"{cluster_prefix}{filename.name}"
+                        )
+
+                    if destination.as_posix() not in allfiles:
+                        self.log.debug(f"Copying {filename} to {destination}")
+                        shutil.copy(filename, destination)
+                        allfiles.append(destination.as_posix())
+                        if pipeline_final_params and is_final_result(destination):
+                            destination = final_directory / destination.name
+                            if destination.as_posix() not in allfiles:
+                                self.log.debug(
+                                    f"Copying {filename} to {destination}"
+                                )
+                                shutil.copy(filename, destination)
+                                allfiles.append(destination.as_posix())
+
+                    if filetype:
+                        attachments.append(
+                            {
+                                "file_path": destination.parent.as_posix(),
+                                "file_name": destination.name,
+                                "file_type": filetype,
+                                "importance_rank": (
+                                    1
+                                    if destination.name.endswith(
+                                        (
+                                            "filtered.mtz",
+                                            "xia2.multiplex_filtering.html",
+                                            "xia2.multiplex_filtering.log",
+                                        )
+                                    )
+                                    else 2
+                                ),
+                            }
+                        )
+
+                # Add parameters to the environment to be picked up
+                # downstream by the trigger function
+                self.recwrap.environment.update(
+                    {
+                        "filtered_mtz": (
+                            results_directory / f"{cluster_prefix}filtered.mtz"
+                        ).as_posix()
+                    }
+                )
+                self.recwrap.environment.update(
+                    {
+                        "filtered_unmerged_mtz": (
+                            results_directory
+                            / f"{cluster_prefix}filtered_unmerged.mtz"
+                        ).as_posix()
+                    }
+                )
+                self.recwrap.environment.update({"dimple_symlink": dimple_symlink})
+
+                # Send results to ispyb and trigger downstream recipe steps
+                # for this dataset
+                self.log.info(
+                    f"Triggering downstream recipe steps for dataset: '{dataset_name}'"
+                )
+                self.send_results_to_ispyb(
+                    ispyb_d,
+                    xtriage_results=xtriage_results,
+                    cluster_num=cluster_num,
+                    attachments=attachments,
+                )
+            self._success_counter.inc()
+        else:
+            self._failure_counter.inc()
+        return success