Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
277 changes: 189 additions & 88 deletions src/murfey/client/contexts/fib.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@
import re
import threading
import xml.etree.ElementTree as ET
from dataclasses import dataclass
from datetime import datetime
from dataclasses import dataclass, field
from pathlib import Path
from typing import Callable, Type, TypeVar

Expand All @@ -25,12 +24,6 @@
lock = threading.Lock()


@dataclass
class MillingImage:
    """A single drift-correction image captured during milling of a site."""

    # Path to the image file on the storage system
    file: Path
    # POSIX timestamp parsed from the image file name; used for chronological sorting
    timestamp: float


def _number_from_name(name: str) -> int:
"""
In the AutoTEM and Maps workflows for the FIB, the sites and images are
Expand Down Expand Up @@ -170,10 +163,10 @@ def _parse_boolean(text: str):
# Map class attribute to element name
# Paths are relative to the "Site" node
"preparation": "PreparationSiteLocation/StagePosition/StagePosition",
"chunk_coincidence": "Parameters/ChunkCoincidenceStagePosition/StagePosition",
"chunk": "ChunkSiteLocation/StagePosition/StagePosition",
"thinning_1": "Parameters/ThinningStagePosition/StagePosition",
"thinning_2": "ThinningSiteLocation/StagePosition/StagePosition",
"thinning_1": "ThinningSiteLocation/StagePosition/StagePosition",
"chunk_coincidence": "Parameters/ChunkCoincidenceStagePosition/StagePosition",
"thinning_2": "Parameters/ThinningStagePosition/StagePosition",
}


Expand Down Expand Up @@ -233,6 +226,13 @@ def _file_transferred_to(
return destination


@dataclass
class FIBImage:
    """Accumulated drift-correction images for one lamella site, plus GIF-submission state."""

    # Destination paths of all drift correction images seen so far for this site
    images: list[Path] = field(default_factory=list)
    # Where the assembled GIF should be written; None until enough site metadata exists
    output_file: Path | None = None
    # True once a GIF request covering the current image list has been posted to the backend
    is_submitted: bool = False


class FIBContext(Context):
def __init__(
self,
Expand All @@ -245,7 +245,7 @@ def __init__(
self._basepath = basepath
self._machine_config = machine_config
self._site_info: dict[int, LamellaSiteInfo] = {}
self._drift_correction_images: dict[int, list[MillingImage]] = {}
self._drift_correction_images: dict[int, FIBImage] = {}

def post_transfer(
self,
Expand All @@ -262,7 +262,6 @@ def post_transfer(
# AutoTEM
# -----------------------------------------------------------------------------
if self._acquisition_software == "autotem":
parts = transferred_file.parts
if transferred_file.name == "ProjectData.dat":
logger.info(f"Found metadata file {transferred_file} for parsing")

Expand All @@ -289,82 +288,32 @@ def post_transfer(
# Update existing dict
self._site_info[site_num] = site_info_new
logger.info(f"Updating metadata for site {site_num}")
return None

elif "DCImages" in parts and transferred_file.suffix == ".png":
lamella_name = parts[parts.index("Sites") + 1]
lamella_number = _number_from_name(lamella_name)
time_from_name = transferred_file.name.split("-")[:6]
timestamp = datetime.timestamp(
datetime(
year=int(time_from_name[0]),
month=int(time_from_name[1]),
day=int(time_from_name[2]),
hour=int(time_from_name[3]),
minute=int(time_from_name[4]),
second=int(time_from_name[5]),
)
)
if not (source := _get_source(transferred_file, environment)):
logger.warning(f"No source found for file {transferred_file}")
return
if not (
destination_file := _file_transferred_to(
environment=environment,
source=source,
file_path=transferred_file,
rsync_basepath=Path(
self._machine_config.get("rsync_basepath", "")
),
)
):
logger.warning(
f"File {transferred_file.name!r} not found on storage system"
)
return
if not self._drift_correction_images.get(lamella_number):
self._drift_correction_images[lamella_number] = [
MillingImage(
timestamp=timestamp,
file=destination_file,
)
]
else:
self._drift_correction_images[lamella_number].append(
MillingImage(
timestamp=timestamp,
file=destination_file,
)
)
gif_list = [
l.file
for l in sorted(
self._drift_correction_images[lamella_number],
key=lambda x: x.timestamp,
)
]
raw_directory = Path(
environment.default_destinations[self._basepath]
).name
# Submit job to backend to construct a GIF
capture_post(
base_url=str(environment.url.geturl()),
router_name="workflow.correlative_router",
function_name="make_gif",
token=self._token,
instrument_name=environment.instrument_name,
data={
"lamella_number": lamella_number,
"images": [str(file) for file in gif_list],
"raw_directory": raw_directory,
},
# Endpoint kwargs
year=datetime.now().year,
visit_name=environment.visit,
session_id=environment.murfey_session,
)
# Post drift correction GIF request if it hasn't already been done
if (
(fib_image := self._drift_correction_images.get(site_num, None))
is not None
and not fib_image.is_submitted
and fib_image.output_file is not None
):
if self._make_gif(
environment=environment,
lamella_number=site_num,
images=sorted(fib_image.images),
output_file=fib_image.output_file,
):
with lock:
self._drift_correction_images[
site_num
].is_submitted = True
return None

elif (
"DCImages" in transferred_file.parts
and transferred_file.suffix == ".png"
):
self._make_drift_correction_gif(transferred_file, environment)

# -----------------------------------------------------------------------------
# Maps
# -----------------------------------------------------------------------------
Expand Down Expand Up @@ -491,9 +440,9 @@ def _parse_autotem_metadata(self, file: Path):
)

# Iteratively update fields in the MillingSteps model it's not None
for field, path, func in ACTIVITY_FIELD_MAP:
for field_name, path, func in ACTIVITY_FIELD_MAP:
if (value := _parse_xml_text(activity, path, func)) is not None:
step_info.__setattr__(field, value)
step_info.__setattr__(field_name, value)

# Add info for current step to the site info model
site_info.steps.__setattr__(
Expand All @@ -506,6 +455,158 @@ def _parse_autotem_metadata(self, file: Path):
logger.info(f"Successfully extracted AutoTEM metadata from file {file}")
return all_site_info

def _make_drift_correction_gif(
    self,
    file: Path,
    environment: MurfeyInstanceEnvironment,
) -> None:
    """
    Helper function to create GIFs using the drift correction images seen by the
    FIBContext class.

    Records the transferred drift correction image against its lamella site,
    works out (once) where the resulting GIF should be written using the parsed
    site metadata, and submits a GIF-creation request to the backend via
    ``_make_gif``. Exits early (returning None) whenever required metadata is
    not yet available; a later call can then retry with more information.
    """
    parts = file.parts
    try:
        # Site directory name follows the "Sites" path component, e.g. .../Sites/<name>/...
        lamella_name = parts[parts.index("Sites") + 1]
        lamella_number = _number_from_name(lamella_name)
    except Exception:
        logger.warning(
            f"Could not extract metadata from file {file}", exc_info=True
        )
        return None
    if not (source := _get_source(file, environment)):
        logger.warning(f"No source found for file {file}")
        return
    if not (
        destination_file := _file_transferred_to(
            environment=environment,
            source=source,
            file_path=file,
            rsync_basepath=Path(self._machine_config.get("rsync_basepath", "")),
        )
    ):
        logger.warning(f"File {file.name!r} not found on storage system")
        return

    # Create FIBImage instance for this lamella site, or update existing one
    if not self._drift_correction_images.get(lamella_number):
        with lock:
            self._drift_correction_images[lamella_number] = FIBImage(
                images=[destination_file]
            )
    else:
        with lock:
            self._drift_correction_images[lamella_number].images.append(
                destination_file
            )
            # New image invalidates any previous submission; a fresh GIF is needed
            self._drift_correction_images[lamella_number].is_submitted = False

    # Determine the output directory to save the milling image to
    output_file = self._drift_correction_images[lamella_number].output_file
    if output_file is None:
        # Early exits if data for creating output image path is absent
        # No site info
        if (site_info := self._site_info.get(lamella_number)) is None:
            logger.debug(f"No metadata found for site {lamella_number} yet")
            return None
        # No project name
        if (project_name := site_info.project_name) is None:
            logger.warning(f"No project name associated with site {lamella_number}")
            return None
        # No stage position information
        if all(
            getattr(site_info.stage_info, stage_name, None) is None
            for stage_name in STAGE_POSITION_NAMES.keys()
        ):
            logger.warning(
                f"No stage position information associated with site {lamella_number}"
            )
            return None
        # Determine the slot number
        # NOTE(review): iterates stage positions in reverse declaration order and
        # takes the first slot number found — presumably latest workflow stage wins
        slot_number: int | None = None
        for stage_name in reversed(STAGE_POSITION_NAMES.keys()):
            if (
                stage_info := getattr(site_info.stage_info, stage_name, None)
            ) is None:
                continue
            if stage_info.slot_number is None:
                continue
            else:
                slot_number = stage_info.slot_number
                break
        # Early exit if no slot number
        if slot_number is None:
            logger.warning(
                f"Could not determine slot number of site {lamella_number}"
            )
            return None
        # Determine the path to save the GIF to
        try:
            # Locate the visit directory within the destination path, then build
            # processed/<project>/grid_<slot>/drift_correction/lamella_<n>.gif under it
            visit_index = destination_file.parts.index(environment.visit)
            visit_dir = list(reversed(destination_file.parents))[visit_index]
            output_file = (
                visit_dir
                / "processed"
                / project_name
                / f"grid_{slot_number}"
                / "drift_correction"
                / f"lamella_{lamella_number}.gif"
            )
            with lock:
                self._drift_correction_images[
                    lamella_number
                ].output_file = output_file
        except Exception:
            logger.error(
                f"Could not construct drift correction GIF output path for site {lamella_number}"
            )
            return None

    # Submit job to backend to construct a GIF
    if self._make_gif(
        environment=environment,
        lamella_number=lamella_number,
        images=sorted(self._drift_correction_images[lamella_number].images),
        output_file=output_file,
    ):
        # Mark this dataset as having been submitted
        with lock:
            self._drift_correction_images[lamella_number].is_submitted = True
        logger.info(
            f"Submitted request to create drift correction GIF for site {lamella_number}"
        )
    return None

def _make_gif(
    self,
    environment: MurfeyInstanceEnvironment,
    lamella_number: int,
    images: list[Path],
    output_file: Path,
) -> bool:
    """
    Submits a POST request to the backend server to create a GIF from the
    given drift correction images.

    The JSON payload carries the lamella number, the image paths, and the
    path the assembled GIF should be written to. Returns True when the
    request was dispatched without raising, False otherwise.
    """
    # Assemble the JSON body separately from the request call for clarity
    payload = {
        "lamella_number": lamella_number,
        "images": [str(file) for file in images],
        "output_file": str(output_file),
    }
    try:
        capture_post(
            base_url=str(environment.url.geturl()),
            router_name="workflow_fib.router",
            function_name="make_gif",
            token=self._token,
            instrument_name=environment.instrument_name,
            data=payload,
            # Endpoint kwargs
            session_id=environment.murfey_session,
        )
    except Exception:
        logger.error(f"Could not submit GIF for site {lamella_number}")
        return False
    return True

def _register_atlas(self, file: Path, environment: MurfeyInstanceEnvironment):
"""
Constructs the URL and dictionary to be posted to the server, which then triggers
Expand Down
Loading
Loading