diff --git a/ipsframework/services.py b/ipsframework/services.py index 711dfff5..02367cfa 100644 --- a/ipsframework/services.py +++ b/ipsframework/services.py @@ -62,97 +62,97 @@ class RunningTask(NamedTuple): args: list[str] -def launch(binary: Any, task_name: str, working_dir: Union[str, os.PathLike], *args, **keywords): - """This is used by - :meth:`TaskPool.submit_dask_tasks` as the +def launch(executable: Any, + task_name: str, + working_dir: Union[str, os.PathLike], + *args, **kwargs): + """ This is used by :meth:`TaskPool.submit_dask_tasks` as the input to :meth:`dask.distributed.Client.submit`. - Valid keywords: - * `worker_event_logfile` - where JSON log messages are written - * `logfile` - where the task output is written; if not specified, STDOUT used - * `errfile` - where the task error output is written; if not specified, STDOUT used + Valid kwargs: + * `logfile` - where the task output is written; if not specified, + STDOUT used + * `errfile` - where the task error output is written; if not specified, + STDOUT used * `task_env` - A dictionary of environment variables to set * `timeout` - The timeout in seconds for the task to complete. - * `cpus_per_proc` - The number of cpus per process to use for the task. This implies that the DVMPlugin has set up a DVM daemon for this node. - * `oversubscribe` - If `True`, then the number of processes can exceed the number of cores on the node. Default is `False`. + * `cpus_per_proc` - The number of cpus per process to use for the task. + This implies that the DVMPlugin has set up a DVM daemon for this node. + * `oversubscribe` - If `True`, then the number of processes can exceed the + number of cores on the node. Default is `False`. If the worker has the attribute `dvm_uri_file`, then we are running with a DVM (Distributed Virtual Machine) so the `binary` needs a `prun` prepended pointing to that. - If the worker doesn't have a `lock` attribute, then we create one by - assigning a threading lock to it. This is used to ensure that - the worker's event log is written to in a thread-safe manner. - - :param binary: The binary to launch. Either a string or a class. + :param executable: The binary to launch. Either a string or a class. :param task_name: The name of the task. :param working_dir: The working directory in which to run this task :returns: The task name and the return value from running the binary. """ + import logging from dask.distributed import get_worker # pylint: disable=import-outside-toplevel - worker = get_worker() - if not hasattr(worker, 'lock'): - worker.lock = threading.Lock() + # Later, we use Client.forward_logging() to handle these log messages. + log = logging.getLogger('launch') - worker_name = ''.join(c for c in worker.name if c.isalnum()) + worker = get_worker() + task_key = worker.get_current_task() - worker.logger.info(f'Launching task {task_name} with worker {worker_name} in {working_dir}') + log.info(f'Launching task {task_name} with id {task_key!s} and ' + f'worker {worker.name!s} in {working_dir}') start_time = time.time() os.chdir(working_dir) - worker_event_log = sys.stdout - try: - event_logfile = keywords['worker_event_logfile'].format(worker_name) - except (KeyError, AttributeError): - worker.logger.warning('No worker_event_logfile specified, using stdout for logging') - else: - worker_event_log = open(event_logfile, 'a') - worker.logger.info(f'Worker event log file: {event_logfile}') - ret_val = None - if isinstance(binary, str): - task_stdout = sys.stdout + if isinstance(executable, str): + # This is presumably an external binary executable to be executed + # via a subprocess.Popen() + + # Do we write the Popen stdout to sys.stdout or to a file? + subprocess_stdout = sys.stdout try: - log_filename = keywords['logfile'] + log_filename = kwargs['logfile'] except KeyError: - worker.logger.info('No logfile specified, using stdout for task output') + log.info('No logfile specified, using stdout for task output') else: - task_stdout = open(log_filename, 'w') - worker.logger.info(f'Task output log file: {log_filename}') + subprocess_stdout = open(log_filename, 'w') + log.info(f'Task output log file: {log_filename}') + # Repeat the same for stderr task_stderr = subprocess.STDOUT try: - err_filename = keywords['errfile'] + subprocess_stderr = kwargs['errfile'] except KeyError: - worker.logger.info('No errfile specified, using STDOUT for task errors') + log.info('No errfile specified, using STDOUT for task errors') else: try: - task_stderr = open(err_filename, 'w') + task_stderr = open(subprocess_stderr, 'w') except OSError: - worker.logger.info(f'Could not open errfile {err_filename}, using STDOUT for task errors') + log.info(f'Could not open errfile {subprocess_stderr}, ' + f'using STDOUT for task errors') task_stderr = subprocess.STDOUT else: - worker.logger.info(f'Task error log file: {err_filename}') + log.info(f'Task error log file: {subprocess_stderr}') - task_env = keywords.get('task_env', {}) + task_env = kwargs.get('task_env', {}) new_env = os.environ.copy() new_env.update(task_env) if 'HWLOC_XMLFILE' in new_env: - worker.logger.debug('Removing HWLOC_XMLFILE from task environment') + log.debug('Removing HWLOC_XMLFILE from task environment') del new_env['HWLOC_XMLFILE'] # Check that the DVM environment variables are set. if hasattr(worker, 'dvm_uri_file'): dvm_uri_file = Path(worker.dvm_uri_file) if not dvm_uri_file.exists(): - worker.logger.error(f'DVM URI file {dvm_uri_file} does not exist') - print(f'DVM URI file {dvm_uri_file} does not exist', flush=True) + log.error(f'DVM URI file {dvm_uri_file} does not exist') + # print(f'DVM URI file {dvm_uri_file} does not exist', flush=True) else: - worker.logger.debug(f'Using DVM URI file: {dvm_uri_file}') - print(f'Using DVM URI file: {dvm_uri_file}', flush=True) + log.debug(f'Using DVM URI file: {dvm_uri_file}') + # print(f'Using DVM URI file: {dvm_uri_file}', flush=True) # PMIX_SERVER_URI41 is used by prun to figure out how to talk to the DVM # It can be defined in `task_env` or in `os.environ`, so we look in @@ -160,119 +160,144 @@ def launch(binary: Any, task_name: str, working_dir: Union[str, os.PathLike], *a # in some HPC environments to ensure the output appears in the logs. if task_env is not None and task_env != {}: if 'PMIX_SERVER_URI41' in task_env: - worker.logger.debug(f"DVM environment variable PMIX_SERVER_URI41 " + log.debug(f"DVM environment variable PMIX_SERVER_URI41 " f"set in task_env to " f"{task_env['PMIX_SERVER_URI41']}") - print(f'DVM environment variable PMIX_SERVER_URI41 set in task_' - f'env to {task_env["PMIX_SERVER_URI41"]}', flush=True) + # print(f'DVM environment variable PMIX_SERVER_URI41 set in task_' + # f'env to {task_env["PMIX_SERVER_URI41"]}', flush=True) if 'PMIX_SERVER_URI41' in os.environ: - worker.logger.debug(f"DVM environment variable PMIX_SERVER_URI41 set " + log.debug(f"DVM environment variable PMIX_SERVER_URI41 set " f"in os.environ to " f"{os.environ['PMIX_SERVER_URI41']}") - print(f'DVM environment variable PMIX_SERVER_URI41 set in os.environ ' - f'to {os.environ["PMIX_SERVER_URI41"]}', flush=True) + # print(f'DVM environment variable PMIX_SERVER_URI41 set in os.environ ' + # f'to {os.environ["PMIX_SERVER_URI41"]}', flush=True) - timeout = float(keywords.get('timeout', 1.0e9)) + timeout = float(kwargs.get('timeout', 1.0e9)) - cmd = f'{binary} {" ".join(map(str, args))}' + cmd = f'{executable} {" ".join(map(str, args))}' - worker.logger.debug(f'Launching task {task_name} with command: {cmd}') + log.debug(f'Launching task {task_name} with command: {cmd}') - with worker.lock: - print( - json.dumps({'eventType': 'IPS_LAUNCH_DASK_TASK', 'event_time': time.time(), 'comment': f'task_name = {task_name}, Target = {cmd}'}), - file=worker_event_log, - ) + worker.log_event('ips', + { + 'eventType' : 'IPS_LAUNCH_DASK_TASK', + 'event_time': start_time, + 'state' : 'Running', + 'comment' : f'task_name = {task_name}, ' + f'Task key = {task_key!s}, ' + f'Target = {cmd}' + }) cmd_lst = cmd.split() try: - process = subprocess.Popen(cmd_lst, stdout=task_stdout, + process = subprocess.Popen(cmd_lst, stdout=subprocess_stdout, stderr=task_stderr, cwd=working_dir, preexec_fn=os.setsid, env=new_env) # noqa: PLW1509 (TODO: look into this to potentially avoid deadlocks) except Exception as e: - with worker.lock: - print( - json.dumps( - { - 'eventType': 'IPS_TASK_END', - 'event_time': time.time(), - 'comment': f'task_name = {task_name} Exception when calling {binary!s}: {e}', - 'operation': ' '.join(map(str, args)), - } - ), - file=worker_event_log, - ) - worker.logger.error(f'Failed to launch task {task_name} with command {cmd}: {e}') + worker.log_event('ips', + { + 'eventType' : 'IPS_DASK_TASK_END', + 'event_time': time.time(), + 'state' : 'Failed', + 'comment' : f'task_name = {task_name} ' + f'Exception when calling ' + f'{executable!s}: {e!s}', + 'operation' : ' '.join(map(str, args)), + }) + log.error(f'Failed to launch task {task_name} with ' + f'command {cmd}: {e}') raise try: ret_val = process.wait(timeout) finish_time = time.time() - with worker.lock: - print( - json.dumps( - { - 'eventType': 'IPS_TASK_END', - 'event_time': finish_time, - 'comment': f'task_name = {task_name}, elapsed time = {finish_time - start_time:.2f}s', - 'start_time': start_time, - 'elapsed_time': finish_time - start_time, - 'target': binary, - 'operation': ' '.join(map(str, args)), - } - ), - file=worker_event_log, - ) + worker.log_event('ips', + { + 'eventType' : 'IPS_DASK_TASK_END', + 'event_time' : finish_time, + 'state' : 'Succeeded', + 'comment' : f'task_name = ' + f'{task_name},' + f' elapsed time = ' + f'{finish_time - start_time:.2f}s', + 'start_time' : start_time, + 'elapsed_time': finish_time - start_time, + 'target' : executable, + 'operation' : ' '.join(map(str, args)), + }) + except subprocess.TimeoutExpired: - with worker.lock: - print( - json.dumps({'eventType': 'IPS_TASK_END', 'event_time': time.time(), 'comment': f'task_name = {task_name}, timed-out after {timeout}s'}), - file=worker_event_log, - ) - os.killpg(process.pid, signal.SIGKILL) - worker.logger.error(f'Task {task_name} with command {cmd} timed out after {timeout}s') + worker.log_event('ips', + { + 'eventType' : 'IPS_DASK_TASK_END', + 'event_time': time.time(), + 'state' : 'Timed out', + 'comment' : f'task_name = {task_name}, ' + f'timed-out after ' + f'{timeout}s'}) + process.kill() + log.error(f'Task {task_name} with command {cmd} timed out ' + f'after {timeout}s') ret_val = -1 except Exception as e: - with worker.lock: - print( - json.dumps( - {'eventType': 'IPS_TASK_END', 'event_time': time.time(), 'comment': f'task_name = {task_name} Exception when calling {binary!s}: {e}'} - ), - ) - worker.logger.error(f'Task {task_name} with command {cmd} failed with {e}') + worker.log_event('ips', + { + 'eventType' : 'IPS_DASK_TASK_END', + 'event_time': time.time(), + 'state' : 'Failed', + 'comment' : f'task_name = {task_name} ' + f'Exception when calling ' + f'{executable!s}: {e!s}'}) + log.error(f'Task {task_name} with command {cmd} failed with {e!s}') + elif isinstance(executable, Callable): + # binary not a string, but is a python callable, so we call it directly + # with the given *args + worker.log_event('ips', + { + 'eventType' : 'IPS_LAUNCH_DASK_TASK', + 'event_time': time.time(), + 'state' : 'Running', + 'comment' : f'task_name = {task_name}, ' + f'Target = {executable.__name__}(' + f'{",".join(map(str, args))})', + }) + + try: + ret_val = executable(*args) + + finish_time = time.time() + + worker.log_event('ips', + { + 'eventType' : 'IPS_DASK_TASK_END', + 'event_time' : finish_time, + 'state' : 'Succeeded', + 'comment' : f'task_name = {task_name}, ' + f'elapsed time = ' + f'{finish_time - start_time:.2f}s', + 'start_time' : start_time, + 'elapsed_time': finish_time - start_time, + 'target' : executable.__name__, + 'return_value': ret_val, + 'operation' : f'({",".join(map(str, args))})', + }) + except Exception as e: + worker.log_event('ips', + { + 'eventType' : 'IPS_DASK_TASK_END', + 'event_time': time.time(), + 'state' : 'Failed', + 'comment' : f'task_name = {task_name} ' + f'Exception when calling ' + f'{executable!s}: {e!s}'}) + log.error(f'Task {task_name} with callable {executable!s} failed ' + f'with {e!s}') else: - with worker.lock: - print( - json.dumps( - { - 'eventType': 'IPS_LAUNCH_DASK_TASK', - 'event_time': time.time(), - 'comment': f'task_name = {task_name}, Target = {binary.__name__}({",".join(map(str, args))})', - } - ), - file=worker_event_log, - ) - ret_val = binary(*args) - finish_time = time.time() - with worker.lock: - print( - json.dumps( - { - 'eventType': 'IPS_TASK_END', - 'event_time': finish_time, - 'comment': f'task_name = {task_name}, elapsed time = {finish_time - start_time:.2f}s', - 'start_time': start_time, - 'elapsed_time': finish_time - start_time, - 'target': binary.__name__, - 'return_value': ret_val, - 'operation': f'({",".join(map(str, args))})', - } - ), - file=worker_event_log, - ) + raise RuntimeError(f'Binary argument {executable!s} is not a string or ' + f'callable, cannot launch task {task_name}') - worker.logger.info(f'Task {task_name} finished with return value: {ret_val}') + log.info(f'Task {task_name} finished with return value: {ret_val}') return task_name, ret_val @@ -2940,6 +2965,32 @@ def add_task(self, task_name: str, nproc: int, working_dir: str, binary: str, *a self.serial_pool = self.serial_pool and (nproc == 1) self.queued_tasks[task_name] = Task(task_name, nproc, working_dir, binary_fullpath, *args, **keywords['keywords']) + def _process_dask_event(self, event): + """ This will create an IPS monitor event from a Dask event + + These events will have been created in `launch()`. As they are + created, this callback will be invoked to send the corresponding + IPS monitor event. + + This callback is registered in `submit_dask_tasks()`. + + :param event: Dask event tuple of (timestamp, dict) + """ + timestamp, message = event + + self.services.debug(f'Processing dask event: {message!s}, ' + f'timestamp: {timestamp!s}') + + if 'worker' in message: + # Sneaky Dask will surreptitiously add 'worker', which is + # not mentioned in the API documentation. If we don't remove + # this, _send_monitor_event() will fail because it doesn't expect + # this argument. + del message['worker'] + + self.services._send_monitor_event(**message) + + def submit_dask_tasks( self, block=True, @@ -3081,7 +3132,7 @@ def _make_worker_args(num_workers: int, num_threads: int, use_shifter: bool, shi '--port', '0', ] - self.services.info(f'Scheduler args: {' '.join(args)}') + self.services.info(f'Scheduler args: {" ".join(args)}') self.dask_sched_popen = subprocess.Popen(args) self.dask_sched_pid = self.dask_sched_popen.pid self.services.info(f'Scheduler pid: ' @@ -3169,6 +3220,10 @@ def _make_worker_args(num_workers: int, num_threads: int, use_shifter: bool, shi # logger so that it can be captured by the services. self.dask_client.forward_logging() + # Register callback to handle 'ips' events sent by + # launch() that will be converted to IPS monitor events. + self.dask_client.subscribe_topic('ips', self._process_dask_event) + if dask_worker_plugin is not None: # TODO But what if there is more than one worker plugin? # TODO And what about scheduler plugins? @@ -3315,6 +3370,7 @@ def _shutdown_dask(self): if self.dask_client is not None: # Shutdown handles ending client, scheduler, and workers + self.dask_client.unsubscribe_topic('ips') # unregister handler self.dask_client.shutdown() # TODO a more gentle way to shutdown: @@ -3404,36 +3460,6 @@ def get_dask_finished_tasks_status(self): self._shutdown_dask() self.services.debug(f'get_dask_finished_tasks_status: after _shutdown_dask()') - if self.worker_event_logfile is not None: - self.services.debug(f'get_dask_finished_tasks_status: worker_event_logfile: ' - f'{self.worker_event_logfile!s}') - try: - events = [] - for worker in worker_names: - filename = self.worker_event_logfile.format(worker) - try: - lines = open(filename).readlines() - except IOError: - self.services.exception('Error opening dask worker log file: %s', filename) - else: - # convert to dict and sort by event_time - for line in lines: - try: - events.append(json.loads(line.strip())) - except json.decoder.JSONDecodeError: - self.services.exception('Error reading line %s from dask worker log file: %s', line.strip(), filename) - - events.sort(key=itemgetter('event_time')) - for event in events: - self.services._send_monitor_event(**event) - except Exception as e: - # If it fails for any other reason, make sure we can continue - self.services.exception('Error while reading dask worker log files: %s', str(e)) - else: - for worker in worker_names: - if os.path.isfile(self.worker_event_logfile.format(worker)): - os.remove(self.worker_event_logfile.format(worker)) - self.finished_tasks = {} self.active_tasks = {} self.services.wait_task(self.dask_workers_tid)