From b7a9dc3c47a5e0c2dac61069a8f8a42353405e17 Mon Sep 17 00:00:00 2001 From: jws52 <jws52@cam.ac.uk> Date: Thu, 23 Nov 2023 16:02:09 +0000 Subject: [PATCH] feat: Advisory depends on success of earlier jobs --- coordinator/ProcessorAdvisory.py | 42 ++++++++++++++++++++++++++-- coordinator/ProcessorEpidemiology.py | 4 ++- coordinator/ProcessorUtils.py | 20 ++++++------- 3 files changed, 53 insertions(+), 13 deletions(-) diff --git a/coordinator/ProcessorAdvisory.py b/coordinator/ProcessorAdvisory.py index 1a9bd51..8815f3e 100644 --- a/coordinator/ProcessorAdvisory.py +++ b/coordinator/ProcessorAdvisory.py @@ -2,19 +2,26 @@ '''Functions to process the advisory component.''' import logging +from typing import List # gitlab projects # TODO: Package these projects so they are robust for importing from AdvisoryBuilder import DataGatherer # created by jws52 from Processor import Processor -from ProcessorUtils import add_filters_to_sublogger, short_name +from ProcessorUtils import ( + add_filters_to_sublogger, + endScript, + open_and_check_config, + query_past_successes, + short_name +) class ProcessorAdvisory(Processor): def process_pre_job(self, args): - return True + return self.process_pre_job_advisory(args) def process_in_job(self, jobPath, status, configjson, component) -> object: @@ -30,6 +37,37 @@ class ProcessorAdvisory(Processor): logger = logging.getLogger('Processor.Advisory') add_filters_to_sublogger(logger) + def process_pre_job_advisory(self,input_args: dict): + + self.logger.info('started process_pre_job_advisory()') + + # check configs can be loaded + config_fns: List[str] = input_args['config_paths'] + for configFile in config_fns: + try: + config_i = open_and_check_config(configFile) + except: + self.logger.exception(f"Failure in opening or checking config {configFile}") + endScript(premature=True) + + # check pre-requisite jobs are complete + + # Which jobs to check are defined by the config. It may include epi + # and surveys. + + # Note that yesterday's surveys should also have succeeded, but that + # needs a YesterdayDateString available for the survey part of + # advisory config to point to the status file. + + dependent_components = config_i['Advisory'].get( + 'DependentComponents', + ['Deposition','Environment']) + + for dependent_component in dependent_components: + + query_past_successes(input_args, dependent_component) + + return True def process_in_job_advisory(self, jobPath, status, config, component): '''Generates a word processor file containing some basic survey statistics diff --git a/coordinator/ProcessorEpidemiology.py b/coordinator/ProcessorEpidemiology.py index a462f72..7044dd8 100644 --- a/coordinator/ProcessorEpidemiology.py +++ b/coordinator/ProcessorEpidemiology.py @@ -97,7 +97,9 @@ class ProcessorEpidemiology(Processor): self.logger.info('started process_pre_job_epi()') # check pre-requisite jobs are complete - query_past_successes(input_args) + query_past_successes(input_args, 'Deposition') + + query_past_successes(input_args, 'Environment') config_fns: List[str] = input_args['config_paths'] diff --git a/coordinator/ProcessorUtils.py b/coordinator/ProcessorUtils.py index ce6924f..8e02679 100644 --- a/coordinator/ProcessorUtils.py +++ b/coordinator/ProcessorUtils.py @@ -269,26 +269,29 @@ def query_component_success(config_i,job_run: str, job_to_check: str): it raises an error.''' # check if deposition data is readily available - dep_success_file = Template(config_i[job_run][job_to_check]['SuccessFileTemplate']).substitute(**config_i) + status_file = Template(config_i[job_run][job_to_check]['SuccessFileTemplate']).substitute(**config_i) try: - query_proceed(dep_success_file,'deposition') + query_proceed(status_file,'deposition') except: if 'AlternativeSuccessFileTemplate' not in config_i[job_run][job_to_check]: logger.warning(f"No AlternativeSuccessFileTemplate to check for") endScript(premature=True) - dep_success_file_alt = Template(config_i[job_run][job_to_check]['AlternativeSuccessFileTemplate']).substitute(**config_i) - query_proceed(dep_success_file_alt,job_to_check.lower()) + status_file_alt = Template(config_i[job_run][job_to_check]['AlternativeSuccessFileTemplate']).substitute(**config_i) + query_proceed(status_file_alt,job_to_check.lower()) return True -def query_past_successes(input_args: dict): +def query_past_successes(input_args: dict, + component_to_query: str = 'Deposition'): '''Checks if deposition and environment jobs are already completed successfully. If not, it raises an error.''' component: str = input_args['component'] + assert component_to_query in ['Deposition','Environment','Epidemiology'] + # check configs can be loaded config_fns: List[str] = input_args['config_paths'] for configFile in config_fns: @@ -301,11 +304,8 @@ def query_past_successes(input_args: dict): # some config initialisation is necessary config_i['StartString'] = input_args['start_date'] - # check if deposition data is readily available - query_component_success(config_i,component,'Deposition') - - # check if environment data is readily available - query_component_success(config_i,component,'Environment') + # check if dependent job is readily available + query_component_success(config_i,component,component_to_query) return True -- GitLab