Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions apps/api/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,18 @@ dependencies = [
# ControlTimeoutError; `p4p.client.asyncio.Disconnected` ->
# ControlNotConnectedError; `RemoteError` -> ControlWriteRejectedError.
"p4p>=4,<5",
# globus-sdk (TransferPort arc): production Globus Transfer client used by
# `GlobusTransferPort` at `cora/operation/adapters/`, the first real
# `TransferPort` substrate. The adapter takes an already-authorized
# `TransferClient` by injection (the OAuth2 dance is a composition-root
# concern, not the adapter's), builds a `TransferData` payload, and maps
# Globus task status (ACTIVE / INACTIVE / SUCCEEDED / FAILED) into
# `TransferState`; INACTIVE -> Suspended (credential expiry), a FAILED task
# carrying subtasks_failed > 0 -> the partial signal. Error mapping:
# `GlobusAPIError` dispatched on `.http_status` -> Rejected / AccessDenied /
# EndpointUnreachable; `NetworkError` / connection / timeout -> the
# transport families. Pin <4 to flag a future major SDK shift in CI.
"globus-sdk>=3,<4",
]

[dependency-groups]
Expand Down
176 changes: 176 additions & 0 deletions apps/api/src/cora/api/_distribution_materializer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
"""DistributionMaterializer: leg B of stage-then-reconstruct.

Sequences the three acts that turn "bytes copied to another tier" into a fact
CORA trusts: (1) move the bytes over a `TransferPort`, (2) on success record a
new `Distribution` of the SAME raw Dataset at the analysis-tier Storage Supply
(`register_distribution`), (3) record a checksum `Attestation` over the landed
bytes (`record_attestation`), whose Match flips the Distribution to Verified in
the Data BC projection. That Verified-at-tier fact is exactly what leg C's
start_run gate reads ([[project_run_input_dependency_design]]).

This is an orchestration concern, so it lives in `cora.api` (the only module
that may reach both `cora.operation.ports` and the Data BC handlers). It is the
materialize edge job the EdgeConductor will drive; here it is a self-contained
unit with injected collaborators, exercised against fakes.

## What it owns vs trusts

It OWNS the sequence and the transfer gate: it begins the transfer, observes to
a terminal (waiting through a non-terminal `Suspended`), and registers ONLY if
the transfer Succeeded. It TRUSTS the caller's `RegisterDistribution`: the
caller (which holds the parent Dataset) builds it with the byte-identical-copy
fields (checksum / byte_size / media_type equal to the parent Dataset); the
Data BC decider enforces that equality. The `RecordAttestation` is built here
from the registration's `dataset_id` plus the new Distribution id.

## Eventual-consistency note

`record_attestation` returns an attestation id for Match, Mismatch, AND
Unreachable; the Distribution's flip to Verified (Match) or Stale (Mismatch) is
a projection-only update the Data BC applies from `AttestationRecorded`. So a
successful `materialize` means "moved + registered + attested", not "Verified";
the Verified status is the projection's eventual flip, which leg C's gate reads
later. This mirrors the safety-envelope eventual-consistency window.

## Deferred

A long-running continuous sync would complete via a later signal rather than a
synchronous observe-loop; this unit polls to a terminal with a runaway bound.
Real-adapter poll backoff and the signal-driven variant are deferred.
"""

from __future__ import annotations

from dataclasses import dataclass
from typing import TYPE_CHECKING

from cora.data.features.record_attestation.command import RecordAttestation
from cora.infrastructure.routing import NIL_SENTINEL_ID
from cora.operation.ports.transfer_port import TransferState

if TYPE_CHECKING:
from uuid import UUID

from cora.data.features.record_attestation.handler import Handler as RecordAttestationHandler
from cora.data.features.register_distribution.command import RegisterDistribution
from cora.data.features.register_distribution.handler import (
Handler as RegisterDistributionHandler,
)
from cora.operation.ports.transfer_port import (
TransferHandle,
TransferPort,
TransferProgress,
TransferRequest,
)

# Upper limit on how many times `DistributionMaterializer._observe_to_terminal`
# calls `observe` for one handle before it gives up with a RuntimeError.
_MAX_OBSERVATIONS = 10_000
"""Runaway bound on the observe-loop. A real transfer reaches a terminal in far
fewer polls; this is a backstop against a substrate that never terminates, not a
tuning knob. Exceeding it raises rather than spinning forever."""

# Passed as `kind` on the `RecordAttestation` command that
# `DistributionMaterializer.materialize` builds after a successful registration.
_CHECKSUM_VERIFIED_KIND = "ChecksumVerified"
"""The AttestationKind wire value the materialize edge job requests. Mirrored as
a literal because `cora.api` may not import `cora.data.aggregates` (tach); the
record_attestation handler re-validates it against the closed AttestationKind."""


@dataclass(frozen=True)
class MaterializationOutcome:
    """Immutable report of a single `materialize` call.

    Always carries the terminal `transfer_state` that ended the observe-loop,
    and optionally the `transfer_detail` that came with it, so a caller can
    record why a move that did not succeed stopped. The two ids are populated
    only on the success path; on any other terminal they keep their `None`
    defaults because nothing was registered or attested. Use `materialized`
    as the single success check instead of inspecting the ids directly.
    """

    # Terminal state reported by the transfer port.
    transfer_state: TransferState
    # Id of the newly registered Distribution; None when none was registered.
    distribution_id: UUID | None = None
    # Id of the recorded Attestation; None when none was recorded.
    attestation_id: UUID | None = None
    # Free-text detail from the terminal observation, if any.
    transfer_detail: str | None = None

    @property
    def materialized(self) -> bool:
        """Report whether this outcome carries a registered Distribution id."""
        registered = self.distribution_id
        return registered is not None


@dataclass
class DistributionMaterializer:
    """Orchestrates one byte move followed by its Data BC bookkeeping.

    The three collaborators are injected: a `TransferPort` that moves and
    reports on the bytes, and the two awaitable handlers that register a
    Distribution and record an Attestation. `materialize` is the per-move entry
    point; it registers and attests only after the transfer has reported
    `TransferState.SUCCEEDED`.
    """

    # Substrate that begins the transfer and is polled for its progress.
    transfer_port: TransferPort
    # Awaitable handler invoked with the caller-built RegisterDistribution.
    register_distribution: RegisterDistributionHandler
    # Awaitable handler invoked with the RecordAttestation built in `materialize`.
    record_attestation: RecordAttestationHandler

    async def materialize(
        self,
        transfer: TransferRequest,
        registration: RegisterDistribution,
        *,
        principal_id: UUID,
        correlation_id: UUID,
        causation_id: UUID | None = None,
        surface_id: UUID = NIL_SENTINEL_ID,
    ) -> MaterializationOutcome:
        """Run the transfer and, if it succeeded, register then attest the copy.

        `transfer` is handed to the port's `begin`; `registration` is forwarded
        untouched to the register handler. The four keyword-only ids are passed
        through unchanged to both handlers.

        Returns a `MaterializationOutcome`. When the terminal state is anything
        other than `TransferState.SUCCEEDED`, neither handler is called and the
        outcome carries only the terminal state and detail. No exception is
        caught here, so whatever the port or the handlers raise reaches the
        caller as-is.
        """
        started = await self.transfer_port.begin(transfer)
        final = await self._observe_to_terminal(started)

        # Gate: anything short of a successful move records nothing.
        if final.state is not TransferState.SUCCEEDED:
            return MaterializationOutcome(
                transfer_state=final.state,
                transfer_detail=final.detail,
            )

        new_distribution_id = await self.register_distribution(
            registration,
            principal_id=principal_id,
            correlation_id=correlation_id,
            causation_id=causation_id,
            surface_id=surface_id,
        )

        # The attestation targets the same Dataset as the registration and the
        # Distribution id the register handler just returned.
        attest_command = RecordAttestation(
            dataset_id=registration.dataset_id,
            distribution_id=new_distribution_id,
            kind=_CHECKSUM_VERIFIED_KIND,
        )
        new_attestation_id = await self.record_attestation(
            attest_command,
            principal_id=principal_id,
            correlation_id=correlation_id,
            causation_id=causation_id,
            surface_id=surface_id,
        )

        return MaterializationOutcome(
            transfer_state=TransferState.SUCCEEDED,
            distribution_id=new_distribution_id,
            attestation_id=new_attestation_id,
            transfer_detail=final.detail,
        )

    async def _observe_to_terminal(self, handle: TransferHandle) -> TransferProgress:
        """Call `observe` repeatedly and return the first terminal progress.

        A progress whose `state.is_terminal` is false is discarded and the
        handle is observed again, with no delay between polls. At most
        `_MAX_OBSERVATIONS` polls are made; if none is terminal a
        `RuntimeError` naming the handle is raised.
        """
        polls_left = _MAX_OBSERVATIONS
        while polls_left > 0:
            snapshot = await self.transfer_port.observe(handle)
            if snapshot.state.is_terminal:
                return snapshot
            polls_left -= 1

        msg = (
            f"transfer {handle!r} did not reach a terminal within {_MAX_OBSERVATIONS} observations"
        )
        raise RuntimeError(msg)


__all__ = ["DistributionMaterializer", "MaterializationOutcome"]
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
"""DatasetDistributionLookup port: cross-BC query for a Dataset's Distributions.

Used by the Run BC start_run gate (leg C of stage-then-reconstruct) to check
that a reconstruction's input Dataset has a Verified Distribution before the Run
may start ([[project_run_input_dependency_design]]). Cross-BC mirror of
`SupplyLookup` / `ClearanceLookup`: one implementor (Data BC ships the Postgres
adapter reading `proj_data_distribution_summary`), multiple consumers (the Run
start gate first). It lives in `cora.infrastructure.ports` because Run may not
import the Data-internal `cora.data.ports.DistributionLookup` (that one is the
Edition-shaped lowest-id canonical pick, a different need).

## Decider-gates, not port-gates

Returns EVERY non-Discarded Distribution for the Dataset regardless of status,
so the start_run decider can both gate on Verified AND produce a useful
diagnostic ("the input has a Distribution but it is Stale" vs "no Distribution
at all"). This is the `SupplyLookup` posture: the port returns rows, the decider
partitions on `status`. It deliberately does NOT reuse the canonical-pick query,
whose lowest-id row may be Stale while a higher-id Distribution is Verified.

`status` is the `DistributionStatus` value as a plain string (matches the
projection's TEXT column); `supply_id` is carried for the deferred reachability
check (which Storage Supply / tier the copy rests on); `distribution_id` is
carried for diagnostics and the eventual lineage record.
"""

from dataclasses import dataclass
from typing import Protocol
from uuid import UUID


@dataclass(frozen=True)
class DatasetDistributionLookupResult:
    """One row returned by `DatasetDistributionLookup.find_by_dataset`.

    An immutable value describing a single Distribution of a Dataset. The
    lookup hands these rows back without filtering on `status`; interpreting
    the status is left to whoever consumes the row.
    """

    # Identity of the Distribution this row describes.
    distribution_id: UUID
    # The Dataset the Distribution belongs to.
    dataset_id: UUID
    # The Storage Supply the copy rests on.
    supply_id: UUID
    # Distribution status carried as a plain string (e.g. "Verified").
    status: str


class DatasetDistributionLookup(Protocol):
    """Structural interface for fetching the Distributions of one Dataset.

    Any object with a matching async `find_by_dataset` satisfies this port;
    `NoDatasetDistributionsLookup` and `SeededDatasetDistributionLookup` in this
    module are two such implementations intended for tests.
    """

    async def find_by_dataset(
        self, dataset_id: UUID
    ) -> tuple[DatasetDistributionLookupResult, ...]:
        """Look up the non-Discarded Distributions recorded for `dataset_id`.

        Implementations return one result per non-Discarded Distribution,
        whatever its status, and an empty tuple when there are none. Status
        filtering is intentionally left to the caller, which checks for
        `status == "Verified"` and can therefore tell "present but Stale"
        apart from "absent".
        """
        ...


class NoDatasetDistributionsLookup:
    """Test double that reports zero Distributions for every Dataset.

    Intended as the cautious default wherever a test does not seed the input
    gate: whichever Dataset is asked about, the answer is "nothing here".
    """

    async def find_by_dataset(
        self, dataset_id: UUID
    ) -> tuple[DatasetDistributionLookupResult, ...]:
        """Return an empty tuple; `dataset_id` is accepted but never consulted."""
        del dataset_id  # deliberately unused
        return ()


class SeededDatasetDistributionLookup:
    """Test double backed by a fixed `{dataset_id: (result, ...)}` table.

    A Dataset id present in the table yields exactly the tuple stored for it;
    any other id yields an empty tuple. That lets a test arrange a Verified
    row, a Stale-only row, or no row at all for a given Dataset.
    """

    def __init__(self, by_dataset: dict[UUID, tuple[DatasetDistributionLookupResult, ...]]) -> None:
        # Shallow-copy the table so later changes to the caller's dict are not
        # visible through this stub.
        self._by_dataset = dict(by_dataset)

    async def find_by_dataset(
        self, dataset_id: UUID
    ) -> tuple[DatasetDistributionLookupResult, ...]:
        """Return the seeded tuple for `dataset_id`, or `()` if it was not seeded."""
        try:
            return self._by_dataset[dataset_id]
        except KeyError:
            return ()


__all__ = [
"DatasetDistributionLookup",
"DatasetDistributionLookupResult",
"NoDatasetDistributionsLookup",
"SeededDatasetDistributionLookup",
]
Loading
Loading