Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 50 additions & 18 deletions src/dlstbx/services/archiver.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,13 @@
import os.path
import xml.etree.ElementTree as ET
from datetime import datetime
from pathlib import Path

import workflows.recipe
from workflows.services.common_service import CommonService

from dlstbx.util import INDUSTRIAL_CODES


class Dropfile:
"""A class encapsulating the XML dropfile tree as it is built up."""
Expand Down Expand Up @@ -132,6 +135,16 @@ def rangifier(numbers):
group = list(group)
yield group[0][1], group[-1][1]

def visit_is_archivable(
    self, visit: str, allowed_industrial_visit_codes: tuple[str, ...]
) -> bool:
    """Tell whether data from the given visit may be archived.

    A visit whose name starts with one of the prefixes in INDUSTRIAL_CODES
    is archivable only when it also starts with one of the prefixes in
    ``allowed_industrial_visit_codes``. Any other visit is archivable.
    """
    # Visits without an industrial prefix are never blocked.
    if not visit.startswith(tuple(INDUSTRIAL_CODES)):
        return True
    # Industrial visit: permitted only if its prefix is explicitly allowed.
    # str.startswith takes a tuple of candidate prefixes; an empty tuple
    # never matches, so no industrial visit passes when nothing is allowed.
    return visit.startswith(allowed_industrial_visit_codes)

def archive_dcid(self, rw, header, message):
"""Archive collected datafiles connected to a data collection."""

Expand All @@ -141,6 +154,21 @@ def archive_dcid(self, rw, header, message):

# Extract parameters from the recipe
params = rw.recipe_step["parameters"]

allowed_industrial_visit_codes = tuple(
self.config.storage.get(
"zocalo.archiver.allowed_industrial_visit_codes", []
)
)
if not self.visit_is_archivable(
params["visit"], allowed_industrial_visit_codes
):
self.log.info(
f"Skipping archiving of {params['pattern']} because it is from a forbidden visit"
)
self._transport.transaction_commit(txn)
return

self.log.info("Attempting to archive %s", params["pattern"])

settings = params.copy()
Expand All @@ -151,10 +179,12 @@ def archive_dcid(self, rw, header, message):

file_range_limit = int(settings.get("limit-files", 0))

filepaths = params["pattern"].split("/")
_, _, beamline, _, _, visit_id = filepaths[0:6]
filepath = Path(params["pattern"])
dataset_name = Path(*filepath.parts[6:-1]).as_posix() or "topdir"
beamline = params["beamline"]
visit_id = params["visit"]

df = Dropfile(visit_id.upper(), beamline, "/".join(filepaths[6:-1]) or "topdir")
df = Dropfile(visit_id.upper(), beamline, dataset_name)

message_out = {"success": 0, "failed": 0}
files_not_found = []
Expand Down Expand Up @@ -287,26 +317,28 @@ def archive_filelist(self, rw, header, message):
)
file_range_limit = int(params.get("limit-files", 0))

filepaths = filelist[0].split("/")
beamline = "unknown"
visit_id = "unknown"
try:
if filepaths[1:3] == ["dls", "mx"]:
beamline = "i02-2" # VMXi currently only beamline with new visit path structure
else:
beamline = filepaths[2]
visit_id = filepaths[5]
except IndexError:
pass
visit_id = params.get("visit", visit_id)
beamline = params.get("beamline", beamline)

# Conditionally acknowledge receipt of the message
txn = self._transport.transaction_begin(subscription_id=header["subscription"])
self._transport.ack(header, transaction=txn)

visit_id = params["visit"]
beamline = params["beamline"]
allowed_industrial_visit_codes = tuple(
self.config.storage.get(
"zocalo.archiver.allowed_industrial_visit_codes", []
)
)
if not self.visit_is_archivable(visit_id, allowed_industrial_visit_codes):
self.log.info(
f"Skipping archiving of {filelist} because it is from a forbidden visit"
)
self._transport.transaction_commit(txn)
return

filepath = Path(filelist[0])
dataset_name = Path(*filepath.parts[6:-1]).as_posix() or "topdir"
# Archive files
df = Dropfile(visit_id.upper(), beamline, "/".join(filepaths[6:-1]) or "topdir")
df = Dropfile(visit_id.upper(), beamline, dataset_name)

message_out = {"success": 0, "failed": 0}
files_not_found = []
Expand Down
Loading