Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,11 @@ Copyright (c) 2021-2026 Claudio Satriano <satriano@ipgp.fr>

## [unreleased]

- Optional parallel processing for `scan_templates`: time chunks are
dispatched to worker processes, spreading the cross-correlation across
cores and overlapping waveform downloads. On by default with automatic
worker-count detection; configurable with the `template_scan_nprocs`
config option or the `--nprocs` command-line option (`1` disables it).
- New interactive curses pager for all ``print_`` commands
(``print_catalog``, ``print_pairs``, ``print_families``).
Automatically activated when output is a terminal; use ``--no-pager``
Expand Down
3 changes: 3 additions & 0 deletions requake/config/configspec.conf
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,9 @@ template_end_time = string(default=2021-08-24T00:00:00)
time_chunk = float(default=3600)
## Overlap between time chunks (in seconds)
time_chunk_overlap = float(default=60)
## Number of worker processes for the template scan.
## 0 means automatic selection; 1 disables parallelism.
template_scan_nprocs = integer(min=0, default=0)
## Minimum ratio between cross-correlation (cc) and median absolute deviation
## (MAD) of cross-correlation (cc_mad). A detection is declared when:
## cc/cc_mad > min_cc_mad_ratio
Expand Down
7 changes: 7 additions & 0 deletions requake/config/parse_arguments.py
Original file line number Diff line number Diff line change
Expand Up @@ -505,6 +505,13 @@ def parse_arguments(progname='requake'):
'File must be in SAC format, with a P pick in the '
'"A" header field'
)
scan_templates.add_argument(
'--nprocs',
type=_nonnegative_int,
default=None,
help='number of worker processes for scan_templates '
'(0: auto, 1: disable parallelism)'
)
# ---
# --- wfcache
wfcache = subparser.add_parser(
Expand Down
179 changes: 179 additions & 0 deletions requake/scan/scan_templates.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
GNU General Public License v3.0 or later
(https://www.gnu.org/licenses/gpl-3.0-standalone.html)
"""
import io
import logging
import os
import sys
Expand Down Expand Up @@ -149,6 +150,179 @@ def _read_templates():
return templates


# ---------------------------------------------------------------------------
# Optional parallel scan over time chunks.
#
# Time chunks are independent units of work: each chunk scans all templates
# over one time window. Workers run in separate processes so the CPU-bound
# cross-correlation is spread across cores and the FDSN download I/O of
# different chunks overlaps. The config singleton is rebuilt inside each
# worker from a pickle-safe snapshot, because the network clients it holds
# are not picklable. Detections produced twice in the overlap between
# adjacent chunks are deduplicated by the database UNIQUE constraint on
# (family_number, trace_id, evid), exactly as in the serial scan.
# ---------------------------------------------------------------------------
# Templates for the current worker process: assigned once per worker by
# _scan_templates_worker_initializer() and read by _scan_chunk_worker().
_worker_templates = []


def _template_time_chunks():
    """
    Build the list of (t0, t1) windows scanned by the template scan.

    The windows match the serial scan exactly: a new window starts every
    ``time_chunk`` seconds and spans ``time_chunk + time_chunk_overlap``.
    Window starts run from ``template_start_time`` up to and including
    ``template_end_time``; an empty list is returned when the start time is
    later than the end time.

    :return: list of (starttime, endtime) tuples
    :rtype: list
    :raises ValueError: if ``time_chunk`` is not strictly positive
    """
    time_chunk = config.time_chunk
    overlap = config.time_chunk_overlap
    end_time = config.template_end_time
    # A zero or negative step would never move past the end time and the
    # list below would grow without bound: fail early with a clear message.
    if time_chunk <= 0:
        raise ValueError(
            f'time_chunk must be strictly positive, got {time_chunk}'
        )
    chunks = []
    chunk_start = config.template_start_time
    while chunk_start <= end_time:
        chunks.append((chunk_start, chunk_start + time_chunk + overlap))
        # Rebind to a new object instead of mutating in place, so the tuples
        # already stored keep their own start time.
        chunk_start = chunk_start + time_chunk
    return chunks


def _resolve_template_scan_nprocs(nchunks):
    """
    Resolve the effective number of worker processes for the template scan.

    The value comes from the ``--nprocs`` command-line option, falling back
    to the ``template_scan_nprocs`` config parameter. A value of 0 (or a
    missing/negative value) selects the number of CPUs usable by this process
    (minus one when more than one is usable) and a value of 1 disables
    parallelism. The result is capped by ``nchunks``.

    :param nchunks: number of time chunks to process
    :type nchunks: int
    :return: effective number of worker processes (at least 1)
    :rtype: int
    """
    cli_nprocs = getattr(config.args, 'nprocs', None)
    config_nprocs = getattr(config, 'template_scan_nprocs', 0)
    # The command line wins over the config file when it is given.
    requested = cli_nprocs if cli_nprocs is not None else config_nprocs
    if requested is None or requested < 0:
        requested = 0
    if requested == 0:
        try:
            # CPUs this process is actually allowed to run on: honours the
            # affinity mask, unlike the system-wide CPU count.
            base_nprocs = len(os.sched_getaffinity(0))
        except AttributeError:
            # sched_getaffinity() is not available on every platform;
            # cpu_count() may return None when the count is undetermined.
            base_nprocs = os.cpu_count() or 1
        if base_nprocs > 1:
            # Leave one CPU free for the parent process.
            base_nprocs -= 1
    else:
        base_nprocs = requested
    # Never fewer than one worker, never more workers than chunks.
    return min(max(1, base_nprocs), max(1, nchunks))


def _scan_templates_worker_initializer(cfg_dict, templates):
    """
    Set up a freshly started worker process for the parallel template scan.

    Steps, in order: console logging is silenced, SIGINT is ignored, the
    module-level config singleton is emptied and refilled from the
    pickle-safe snapshot, the worker clients are connected, and the templates
    are stored in a module-level variable so that every chunk handled by
    this worker can reuse them.

    The client-connection and logging helpers are imported from the catalog
    scan worker module, so both scans share one definition.

    :param cfg_dict: pickle-safe config snapshot from
        :func:`requake.config.to_picklable_config_dict`
    :type cfg_dict: dict
    :param templates: list of template traces
    :type templates: list
    """
    global _worker_templates
    import signal
    from ..config import from_picklable_config_dict
    from .scan_catalog_workers import _connect_worker_clients
    from .scan_catalog_workers import _silence_worker_console_logging

    _silence_worker_console_logging()
    # Ignore Ctrl-C in the worker (presumably so that only the parent
    # process reacts to it).
    signal.signal(signal.SIGINT, signal.SIG_IGN)
    # Rebuild the snapshot first, then swap it into the shared singleton.
    worker_cfg = from_picklable_config_dict(cfg_dict)
    config.clear()
    config.update(worker_cfg)
    _connect_worker_clients()
    _worker_templates = templates


def _scan_chunk_worker(time_range):
    """
    Worker process entry point: scan every template over one time chunk.

    Each template stored for this worker is passed to
    :func:`_scan_family_template` together with the chunk limits. Templates
    for which no waveform is available are skipped, as are scans returning
    ``None``. Anything written to stdout during the scan is captured and
    discarded, so that only the parent process prints progress. The trace
    cache is cleared once the chunk is done.

    :param time_range: (starttime, endtime) of the chunk
    :type time_range: tuple
    :return: list of detection tuples (family_number, trace_id, event, cc_max)
    :rtype: list
    """
    from contextlib import redirect_stdout
    starttime, endtime = time_range
    found = []
    discarded_output = io.StringIO()
    with redirect_stdout(discarded_output):
        for tmpl in _worker_templates:
            try:
                result = _scan_family_template(tmpl, starttime, endtime)
            except NoWaveformError:
                # No data for this template in this window: try the next one
                continue
            if result is None:
                continue
            found.append(result)
    # Cached traces belong to this chunk only
    trace_cache.clear()
    return found


def _scan_templates_parallel(templates, nprocs):
    """
    Scan templates over continuous data using a pool of worker processes.

    Each time chunk is dispatched to a worker that scans all templates over
    that window. Detections are collected in the parent process and written
    to the database with a single call once all chunks are done. Duplicate
    detections from the overlap between adjacent chunks are removed by the
    database UNIQUE constraint.

    If a chunk fails or the scan is interrupted, chunks that have not started
    yet are cancelled and the exception is re-raised; nothing is written to
    the database in that case.

    :param templates: list of template traces
    :type templates: list
    :param nprocs: number of worker processes
    :type nprocs: int
    """
    from concurrent.futures import ProcessPoolExecutor, as_completed
    from ..config import to_picklable_config_dict
    chunks = _template_time_chunks()
    nchunks = len(chunks)
    # The live config cannot be sent to the workers as is: pass a
    # pickle-safe snapshot that each worker restores in its initializer.
    cfg_dict = to_picklable_config_dict(config)
    logger.info(
        'Parallel template scan: %d time chunks, %d worker processes',
        nchunks, nprocs
    )
    # Results are stored by chunk index, since chunks complete in any order.
    ordered = [None] * nchunks
    with ProcessPoolExecutor(
        max_workers=nprocs,
        initializer=_scan_templates_worker_initializer,
        initargs=(cfg_dict, templates),
    ) as executor:
        futures = {
            executor.submit(_scan_chunk_worker, chunk): idx
            for idx, chunk in enumerate(chunks)
        }
        try:
            for done, future in enumerate(as_completed(futures), start=1):
                ordered[futures[future]] = future.result()
                sys.stdout.write(f'Scanned {done}/{nchunks} time chunks\r')
                # '\r' does not trigger a flush on a line-buffered stream:
                # flush explicitly so the progress line is actually shown.
                sys.stdout.flush()
        except BaseException:
            # Leaving the ``with`` block waits for all submitted work.
            # Workers ignore SIGINT, so without cancelling, every queued
            # chunk would still be scanned before the error (or Ctrl-C)
            # reaches the caller. Already running chunks cannot be
            # cancelled and are still waited for.
            for pending in futures:
                pending.cancel()
            raise
        sys.stdout.write('\n')
    # Flatten in chunk order so that, for detections seen twice in the overlap
    # between two chunks, the later chunk wins the database REPLACE, matching
    # the serial scan order.
    detections = [
        detection for chunk_detections in ordered
        for detection in chunk_detections
    ]
    if detections:
        write_template_detections(detections, append=True)


def scan_templates():
"""Scan a continuous waveform stream using one or more templates."""
try:
Expand All @@ -165,6 +339,11 @@ def scan_templates():
logger.info('Scan aborted by user; previous detections kept')
rq_exit(0)
clear_template_detections()
nchunks = len(_template_time_chunks())
nprocs = _resolve_template_scan_nprocs(nchunks)
if nprocs > 1:
_scan_templates_parallel(templates, nprocs)
return
time = config.template_start_time
time_chunk = config.time_chunk
overlap = config.time_chunk_overlap
Expand Down
Loading