diff --git a/coordinator/Processor.py b/coordinator/Processor.py index ee37790cdde64c5e63ab12badcfd67888f6ba199..721d741d489ccc08a114017ef064b07867701118 100755 --- a/coordinator/Processor.py +++ b/coordinator/Processor.py @@ -17,7 +17,7 @@ or:: $ ./run_Processor.sh -p Deposition --islive --config config_Nepal_template.json --noupload -s 20200715 ''' from abc import abstractmethod, ABCMeta -from typing import List, Union, Any +from typing import List, Union, Any, Dict print("Make sure to `conda activate py3EWSepi` environment!") @@ -62,29 +62,33 @@ class Processor: def __init__(self) -> None: super().__init__() - self.setup() + time_now = datetime.datetime.today() + # dateToday = timeNow.date() + self.todayString = time_now.strftime('%Y%m%d') + self.nowString = time_now.strftime('%Y%m%d-%H%M-%S') + # self.setup() - def setup(self): + def setup_logging(self, job_log_file_path: str, is_live: bool): # initialise default values for configuration script_name = 'Processor' - timeNow = datetime.datetime.today() - dateToday = timeNow.date() - self.todayString = timeNow.strftime('%Y%m%d') - self.nowString = timeNow.strftime('%Y%m%d-%H%M-%S') - # get the path to this script script_path = os.path.dirname(__file__) + '/' coordinator_path = script_path # log file for all jobs + ##todo how does this work when there are several processors running at once, i.o. errors? log_path_project = f"{coordinator_path}logs/log.txt" + # make the job_log_file_path parent dirs if it does not exist + job_log_file_parent_dirs = os.path.dirname(job_log_file_path) + Path(job_log_file_parent_dirs).mkdir(parents=True, exist_ok=True) + # job-specific log file will be written here until a job directory exits, when it will be moved there - self.log_path_default = f"{coordinator_path}logs/log_{self.nowString}.txt" + # self.log_path_default = f"{coordinator_path}logs/log_{self.nowString}.txt" # get the email credentials file path from the environment variables assert 'EMAIL_CRED' in os.environ @@ -149,9 +153,16 @@ class Processor: 'level': 'INFO', 'formatter': 'simple', 'filters': ['mask_passwords'], - 'filename': self.log_path_default, + 'filename': job_log_file_path, 'mode': 'a', # 'a' for append }, + # logging for job to stderr (useful for Airflow to catch in its logs) + 'handler_stderr': { + 'class': 'logging.StreamHandler', + 'level': 'INFO', + 'formatter': 'simple', + 'filters': ['mask_passwords'] + }, # to email errors to maintainers 'handler_buffered_email': { 'class': 'BufferingSMTPHandler.BufferingSMTPHandler', @@ -172,7 +183,7 @@ class Processor: # i.e. with logging.getLogger('Process.') script_name: { 'level': 'INFO', - 'handlers': ['handler_rot_file', 'handler_file', 'handler_buffered_email'], + 'handlers': ['handler_rot_file', 'handler_file', 'handler_buffered_email', 'handler_stderr'], 'propagate': True, }, # this is activated when this script is called on the command line @@ -180,7 +191,7 @@ class Processor: # i.e. 
with logging.getLogger(__name__) when name == '__main__' '__main__': { 'level': 'INFO', - 'handlers': ['handler_rot_file', 'handler_file', 'handler_buffered_email'], + 'handlers': ['handler_rot_file', 'handler_file', 'handler_buffered_email', 'handler_stderr'], 'propagate': True, } } @@ -192,6 +203,10 @@ class Processor: # create a logger named according to how the file is called # logger = logging.getLogger(__name__) self.logger = logging.getLogger(script_name) + if not is_live: + # remove the log handler that would send emails + self.logger.handlers = [h for h in self.logger.handlers if + not isinstance(h, BufferingSMTPHandler.BufferingSMTPHandler)] self.loglevels = {'debug': logging.DEBUG, 'info': logging.INFO, @@ -281,7 +296,7 @@ class Processor: my_parser.add_argument( '-l', '--loglevel', action = 'store', - choices = list(self.loglevels.keys()), + choices = list(["debug", "info", "warning", "error", "critical"]), default = 'info', help = 'verbosity of log messaging (debug, info, warning, error, critical)\n default is debug', dest = 'log_level', # this names the attribute that will be parsed @@ -321,10 +336,10 @@ class Processor: # Check the args - self.logger.info(f"Command-line options are:\n{args}") + print(f"Command-line options are:\n{args}") if not isinstance(args.config_paths, list): - self.logger.error('Expecting a list of config paths') + print('Expecting a list of config paths') raise RuntimeError # check the startstring @@ -340,24 +355,23 @@ class Processor: assert provided_start_date <= today_date except (ValueError, AssertionError) as e: - self.logger.exception( + raise Exception( "Provided start date string is formatted incorrectly or out of range, or end date not also defined") - raise dictionary: dict = vars(args) return dictionary def set_log_level(self, log_level: str): - new_log_level = self.loglevels[log_level] + new_log_level_code = self.loglevels[log_level] # modify log level of all self.loggers - self.logger.info(f"logging level being changed to {new_log_level} because of command-line option") + self.logger.info(f"logging level being changed to {new_log_level_code} ({log_level}) because of command-line option") loggers = [logging.getLogger(name) for name in logging.root.manager.loggerDict] - for logger_i in loggers: logger_i.setLevel(new_log_level) + for logger_i in loggers: logger_i.setLevel(new_log_level_code) - def build_universal_config(self, configs: list, component: str, universal_config = None): + def build_universal_config(self, configs: list, component: str, universal_config = None) -> dict: '''This config obtains aspects of each dict in configs that must be common to them all. 
''' @@ -407,57 +421,76 @@ class Processor: return universal_config + def prepare_job_directory(self, job_path): + """ + create job directory or archive if already exists (due to a rerun) + + :param job_path: + :return: + """ + if os.path.exists(job_path): + archived_dir_path = f"{job_path}-ARCHIVE_{self.nowString}" + message = f"Job path {job_path} already exists, so moving it to {archived_dir_path}" + print(message) + os.rename(job_path, archived_dir_path) + + Path(job_path).mkdir(parents = True, exist_ok = False) + - def run_process(self, args: dict): + def run_process(self, + args: dict) -> None: # check initial state of each config file, and gather terms that must apply # across all provided configs - if not args["live"]: - # remove the log handler that would send emails - self.logger.handlers = [h for h in self.logger.handlers if - not isinstance(h, BufferingSMTPHandler.BufferingSMTPHandler)] - config_paths: List[str] = args['config_paths'] component: str = args['component'] start_date: str = args['start_date'] noupload: bool = args['noupload'] clearup: bool = args['clearup'] + # load universal configuration universal_config = self.build_universal_config(config_paths, component) - universal_config['StartString'] = start_date - self.logger.info(f"Universal config is\n{json.dumps(universal_config, indent = 2)}") + # determine job directory + workspacePath = universal_config['WorkspacePathout'] + job_path: str = f'{workspacePath}{short_name[component]}_{start_date}' + self.prepare_job_directory(job_path) + + log_file_path = f"{job_path}/log.txt" + is_live: bool = args["live"] + self.setup_logging(log_file_path, is_live) + + self.set_log_level(args['log_level']) + self.set_component_logger() + self.logger.info("==========") + self.logger.info(f"Logging started at {datetime.datetime.now().strftime('%Y %b %d %H:%M:%S')}") + self.logger.info(f"Universal config is\n{json.dumps(universal_config, indent = 2)}") + self.logger.info(f"Job path will be {job_path}") workspacePath = universal_config['WorkspacePathout'] # process_pre_job = getattr(ProcessorComponents, universal_config['ProcessPreJob']) # # process_in_job = getattr(ProcessorComponents, universal_config['ProcessInJob']) # - # process_EWS_plotting = getattr(ProcessorComponents, universal_config['ProcessEWSPlotting']) - - # determine job directory - jobPath: str = f'{workspacePath}{short_name[component]}_{start_date}' - self.logger.info(f"Job path will be {jobPath}") + # process_EWS_plotting = getattr(ProcessorComponents, universal_config['ProcessEWSPlotting']) # note that file date represents preceding 3 hours, so day's data starts at file timestamp 0300 UTC - startTime = datetime.datetime.strptime(start_date + '03', '%Y%m%d%H') + start_time: datetime = datetime.datetime.strptime(start_date + '03', '%Y%m%d%H') + start_time_string: str = start_time.strftime('%Y-%m-%d-%H%M') # run any checks before creating a job directory # if this fails, then make a note once there is a job directory ready = self.process_pre_job(args) - - if not ready: - self.logger.info(f"Process_pre_job raised an error, continuing to create job directory to make a record of it") - - # create job directory - Path(jobPath).mkdir(parents = True, exist_ok = True) + # + # if not ready: + # self.logger.info(f"Process_pre_job raised an error, continuing to create job directory to make a record of it") # lock job directory status: jobStatus - with jobStatus(jobPath) as status: + with jobStatus(job_path) as status: # lawrence comment in/out # check for a status file in job 
directory @@ -468,10 +501,7 @@ class Processor: self.logger.info(f"Current status of job directory is {status.status}") - # now that we have a useable job directory, move the log file there - logPathJob = f"{jobPath}/log.txt" - - self.move_default_logfile_handler(dstPathName = logPathJob) + # self.move_default_logfile_handler(dstPathName = logPathJob) # make a record if process_pre_job failed if not ready: @@ -482,7 +512,7 @@ class Processor: endJob(status, ignore_inprogress = True, premature = False) # files and directories that will be uploaded to public server - FilesToSend = [] + # files_to_send = [] # files and directories that will be earmarked for removal after a # successful job @@ -504,7 +534,7 @@ class Processor: # provide specific case details to template config - configjson['StartTime'] = startTime.strftime('%Y-%m-%d-%H%M') + configjson['StartTime'] = start_time_string configjson['StartString'] = start_date # from configtemplate create configFileName to describe the specific job @@ -514,13 +544,13 @@ class Processor: configjson['ConfigFilePath'] = configFileName # write the complete configuration file to job directory - with open(f"{jobPath}/{configFileName}.json", 'w') as write_file: + with open(f"{job_path}/{configFileName}.json", 'w') as write_file: json.dump(configjson, write_file, indent = 4) # proc_description = universal_config['ProcessInJob'] proc_description = 'ProcessInJob' try: - proc_out = self.process_in_job(jobPath, status, configjson, component) + proc_out: dict = self.process_in_job(job_path, status, configjson, component) except: self.logger.exception(f"Error in process_in_job") status.reset('ERROR') @@ -533,12 +563,12 @@ class Processor: 'output': None, 'clearup': None} - if 'output' in proc_out.keys(): - append_item_to_list( - proc_out['output'], - FilesToSend, - proc_description, - status) + # if 'output' in proc_out.keys(): + # append_item_to_list( + # proc_out['output'], + # files_to_send, + # proc_description, + # status) if 'clearup' in proc_out.keys(): append_item_to_list( @@ -552,7 +582,7 @@ class Processor: # proc_description = universal_config['ProcessEWSPlotting'] proc_description = 'ProcessEWSPlotting' try: - EWSPlottingOutputs = self.process_post_job(jobPath, configjson) + ews_plotting_outputs = self.process_post_job(job_path, configjson) except: self.logger.exception(f"Error in {proc_description}()") status.reset('ERROR') @@ -560,52 +590,52 @@ class Processor: self.logger.info('Finished with EWS-Plotting, appending images to list for transfer') - if EWSPlottingOutputs: - append_item_to_list( - EWSPlottingOutputs, - FilesToSend, - proc_description, - status) + # if ews_plotting_outputs: + # append_item_to_list( + # ews_plotting_outputs, + # files_to_send, + # proc_description, + # status) self.logger.info(f'Finished with config {configIndex + 1} of {config_paths_length}') - # send results to remote server - - if not noupload: - try: - ProcessorComponents.upload(universal_config, FilesToSend, component) - except IndexError: - status.reset('WARNING') - - except: - self.logger.exception('Failed to upload files to remote server') - status.reset('ERROR') - endJob(status, premature = True) - - # check if there is a second location on willow to provide results - if 'ServerPathExtra' in configjson[component]: - - self.logger.info('There is an extra path to send results to:') - - extra_path = configjson[component]['ServerPathExtra'] - - self.logger.info(extra_path) - - universal_config_extra = universal_config.copy() - universal_config_extra['ServerPath'] 
= extra_path - - try: - ProcessorComponents.upload(universal_config_extra, FilesToSend, component) - except IndexError: - status.reset('WARNING') - - except: - self.logger.exception('Failed to upload files to extra directory on remote server') - status.reset('ERROR') - endJob(status, premature = True) - - else: - self.logger.info('Because noupload argument was present, not sending results to remote server') + # # send results to remote server + # + # if not noupload: + # try: + # ProcessorComponents.upload(universal_config, files_to_send, component) + # except IndexError: + # status.reset('WARNING') + # + # except: + # self.logger.exception('Failed to upload files to remote server') + # status.reset('ERROR') + # endJob(status, premature = True) + # + # # check if there is a second location on willow to provide results + # if 'ServerPathExtra' in configjson[component]: + # + # self.logger.info('There is an extra path to send results to:') + # + # extra_path = configjson[component]['ServerPathExtra'] + # + # self.logger.info(extra_path) + # + # universal_config_extra = universal_config.copy() + # universal_config_extra['ServerPath'] = extra_path + # + # try: + # ProcessorComponents.upload(universal_config_extra, files_to_send, component) + # except IndexError: + # status.reset('WARNING') + # + # except: + # self.logger.exception('Failed to upload files to extra directory on remote server') + # status.reset('ERROR') + # endJob(status, premature = True) + # + # else: + # self.logger.info('Because noupload argument was present, not sending results to remote server') status.reset('SUCCESS') @@ -622,33 +652,38 @@ class Processor: endScript(premature = False) @abstractmethod - def process_pre_job(self, args): + def process_pre_job(self, args) -> bool: raise NotImplementedError @abstractmethod - def process_in_job(self, jobPath, status, configjson, component) -> object: + def process_in_job(self, jobPath, status, configjson, component) -> dict: raise NotImplementedError @abstractmethod - def process_post_job(self, jobPath, configjson): + def process_post_job(self, jobPath, configjson) -> [str]: raise NotImplementedError + @abstractmethod + def set_component_logger(self): + """ + overridden in sub classes to set component-specific loggers + :return: + """ + raise NotImplementedError def run_processor(self, component: str): print("Make sure to `conda activate py3EWSepi` environment!") print("Make sure that flagdir package is available (on PYTHONPATH)") try: - self.logger.info("==========") - self.logger.info(f"Logging started at {datetime.datetime.now().strftime('%Y %b %d %H:%M:%S')}") - # load configurations - args_dict: dict = self.parse_and_check_args() - args_dict["component"] = component - self.set_log_level(args_dict['log_level']) - self.run_process(args_dict) + args: dict = self.parse_and_check_args() + args["component"] = component + + self.run_process(args) + except SystemExit as e: print("caught with code " + str(e.code)) - self.logger.info('run_process() exited') + if self.logger: + self.logger.info('run_process() exited') sys.exit(e.code) except: - self.logger.exception('Uncaught exception in run_Process:') - sys.exit(1) + raise Exception('Uncaught exception in run_Process:') diff --git a/coordinator/ProcessorAdvisory.py b/coordinator/ProcessorAdvisory.py index 8815f3e0cd9707f01154b03833f7bc5917cb9e2e..75f033df07c25c1d35476741ea86de2b7f7aa744 100644 --- a/coordinator/ProcessorAdvisory.py +++ b/coordinator/ProcessorAdvisory.py @@ -13,15 +13,20 @@ from ProcessorUtils import ( 
add_filters_to_sublogger, endScript, open_and_check_config, - query_past_successes, short_name ) class ProcessorAdvisory(Processor): - def process_pre_job(self, args): - return self.process_pre_job_advisory(args) + def set_component_logger(self): + logger = logging.getLogger('Processor.Advisory') + add_filters_to_sublogger(logger) + + + def process_pre_job(self, args) -> bool: + # return self.process_pre_job_advisory(args) + return True def process_in_job(self, jobPath, status, configjson, component) -> object: @@ -34,40 +39,7 @@ class ProcessorAdvisory(Processor): def __init__(self) -> None: super().__init__() - logger = logging.getLogger('Processor.Advisory') - add_filters_to_sublogger(logger) - - def process_pre_job_advisory(self,input_args: dict): - - self.logger.info('started process_pre_job_advisory()') - # check configs can be loaded - config_fns: List[str] = input_args['config_paths'] - for configFile in config_fns: - try: - config_i = open_and_check_config(configFile) - except: - self.logger.exception(f"Failure in opening or checking config {configFile}") - endScript(premature=True) - - # check pre-requisite jobs are complete - - # Which jobs to check are defined by the config. It may include epi - # and surveys. - - # Note that yesterday's surveys should also have succeeded, but that - # needs a YesterdayDateString available for the survey part of - # advisory config to point to the status file. - - dependent_components = config_i['Advisory'].get( - 'DependentComponents', - ['Deposition','Environment']) - - for dependent_component in dependent_components: - - query_past_successes(input_args, dependent_component) - - return True def process_in_job_advisory(self, jobPath, status, config, component): '''Generates a word processor file containing some basic survey statistics @@ -107,4 +79,4 @@ class ProcessorAdvisory(Processor): if __name__ == '__main__': processor = ProcessorAdvisory() - processor.run_processor("Advisory") \ No newline at end of file + processor.run_processor("Advisory") diff --git a/coordinator/ProcessorComponents.py b/coordinator/ProcessorComponents.py index 53a132ead95dff9ed484951dc8d5421aeec03108..aa8b519d4c4dd1d7e9ee381431387f83aac95fb7 100644 --- a/coordinator/ProcessorComponents.py +++ b/coordinator/ProcessorComponents.py @@ -12,14 +12,14 @@ from typing import List # coordinator stages: pre, in (during) and plotting. 
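Annotation on the Processor refactor above: every subclass now supplies set_component_logger() alongside the three job-stage hooks, and the pre-flight checks collapse to a plain readiness flag. A minimal sketch of the resulting subclass shape, assuming the existing Processor and ProcessorUtils modules; the 'Example' component name and the method bodies are placeholders, not code from the repository.

```python
# Hypothetical subclass illustrating the post-refactor lifecycle contract.
import logging

from Processor import Processor
from ProcessorUtils import add_filters_to_sublogger


class ProcessorExample(Processor):

    def set_component_logger(self):
        # component-specific child of the 'Processor' logger, as in ProcessorAdvisory
        logger = logging.getLogger('Processor.Example')
        add_filters_to_sublogger(logger)

    def process_pre_job(self, args) -> bool:
        # dependency/readiness checks are assumed to live in the orchestrator now
        return True

    def process_in_job(self, jobPath, status, configjson, component) -> dict:
        # keys mirror the proc_out structure consumed by run_process()
        return {'output': None, 'clearup': None}

    def process_post_job(self, jobPath, configjson) -> [str]:
        # list of plot files that would previously have been queued for upload
        return []


if __name__ == '__main__':
    ProcessorExample().run_processor("Example")
```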
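Also relating to setup_logging() in Processor.py above: the diff shows the new handler entries piecemeal, so here is a compact, assembled view of a per-job file handler plus the new stderr handler in a logging.config.dictConfig dictionary. The formatter string, the class of the file handler and the log path are assumptions for illustration only.

```python
import logging.config

LOGGING_SKETCH = {
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
        # assumed format; the real 'simple' formatter is defined in unchanged context
        'simple': {'format': '%(asctime)s %(name)s %(levelname)s: %(message)s'},
    },
    'handlers': {
        # job-specific log file written straight into the job directory
        'handler_file': {
            'class': 'logging.FileHandler',  # assumed; the diff only shows filename/mode
            'level': 'INFO',
            'formatter': 'simple',
            'filename': '/tmp/example_job/log.txt',  # illustrative job_log_file_path
            'mode': 'a',
        },
        # mirrored to stderr so Airflow can capture job output in its task logs
        'handler_stderr': {
            'class': 'logging.StreamHandler',
            'level': 'INFO',
            'formatter': 'simple',
        },
    },
    'loggers': {
        'Processor': {
            'level': 'INFO',
            'handlers': ['handler_file', 'handler_stderr'],
            'propagate': True,
        },
    },
}

logging.config.dictConfig(LOGGING_SKETCH)
```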
-from ProcessorServer import ( - process_pre_job_server_download, - upload -) +#from ProcessorServer import ( + # process_pre_job_server_download, + #upload +#) from ProcessorUtils import ( add_filters_to_sublogger, - query_past_successes + # query_past_successes ) # TODO: Replace subprocess scp and ssh commands with paramiko.SSHClient() instance diff --git a/coordinator/ProcessorDeposition.py b/coordinator/ProcessorDeposition.py index 9fcba63baafb000262aec7a91d02c72ff67b16b6..68fe833a65e1321bf95db569b5e8b811fe926e7c 100644 --- a/coordinator/ProcessorDeposition.py +++ b/coordinator/ProcessorDeposition.py @@ -11,7 +11,6 @@ import iris from iris.cube import CubeList from Processor import Processor -from ProcessorServer import process_pre_job_server_download from ProcessorUtils import ( get_only_existing_globs, subprocess_and_log, @@ -24,24 +23,29 @@ class ProcessorDeposition(Processor): """ LIFECYCLE FUNCTIONS INHERITED FROM PROCESSOR.PY """ - def process_pre_job(self, args): - return process_pre_job_server_download(args) + def process_pre_job(self, args) -> bool: + # return process_pre_job_server_download(args) + return True - def process_in_job(self, jobPath, status, configjson, component) -> object: + def process_in_job(self, jobPath, status, configjson, component) -> dict: return self.process_in_job_dep(jobPath, status, configjson, component) - def process_post_job(self, jobPath, configjson): + def process_post_job(self, jobPath, configjson) -> [str]: return self.process_EWS_plotting_dep(jobPath, configjson) def __init__(self) -> None: super().__init__() + + """ LIFECYCLE FUNCTIONS INHERITED FROM PROCESSOR.PY """ + + + def set_component_logger(self): logger = logging.getLogger('Processor.Deposition') add_filters_to_sublogger(logger) - """ LIFECYCLE FUNCTIONS INHERITED FROM PROCESSOR.PY """ def process_in_job_dep(self, jobPath, status, config, component): self.logger.info('started process_in_job_dep()') @@ -104,7 +108,7 @@ class ProcessorDeposition(Processor): return proc_out - def process_EWS_plotting_dep(self, jobPath, config): + def process_EWS_plotting_dep(self, jobPath, config) -> [str]: '''Returns a list of output files for transfer.''' self.logger.info('started process_EWS_plotting_dep()') @@ -121,7 +125,7 @@ class ProcessorDeposition(Processor): deposition_data_file_name = Template(config['Deposition']['DataFileTemplate']).substitute(**config) name_file_wildcard = f"{deposition_path}/{deposition_data_file_name}" - EWSPlottingOutputGlobs = [] + ews_plotting_output_globs = [] for region in regions: @@ -160,28 +164,29 @@ class ProcessorDeposition(Processor): depo_processor.process() # check the output - EWSPlottingOutputDir = f"{output_dir}/images/" + ews_plotting_output_dir = f"{output_dir}/images/" #EWSPlottingOutputGlobs += [ # # daily plots # f"{EWSPlottingOutputDir}Daily/deposition_{region.lower()}_*_daily_20*.png", # # weekly plots # f"{EWSPlottingOutputDir}Weekly/deposition_{region.lower()}_*_total_20*.png"] - EWSPlottingOutputGlobs += [f"{EWSPlottingOutputDir}*"] + ews_plotting_output_globs += [f"{ews_plotting_output_dir}*"] - EWSPlottingOutputGlobs = get_only_existing_globs(EWSPlottingOutputGlobs,inplace=False) + ews_plotting_output_globs = get_only_existing_globs(ews_plotting_output_globs, inplace = False) # check there is some output from EWS-plotting - if not EWSPlottingOutputGlobs: + if not ews_plotting_output_globs: self.logger.error('EWS-Plotting did not produce any output') raise RuntimeError # provide list for transfer - EWSPlottingOutputs = sorted([file for glob_str 
in EWSPlottingOutputGlobs for file in glob(glob_str)]) + ews_plotting_outputs: [str] = sorted([file for glob_str in ews_plotting_output_globs for file in glob(glob_str)]) - return EWSPlottingOutputs + return ews_plotting_outputs if __name__ == '__main__': processor = ProcessorDeposition() processor.run_processor("Deposition") + diff --git a/coordinator/ProcessorEnvironment.py b/coordinator/ProcessorEnvironment.py index 4d1a6468929cdda345365ac9a3c6ecfea03647eb..041ce5dc8841ac14eb370c431cba77f6f75f21cf 100644 --- a/coordinator/ProcessorEnvironment.py +++ b/coordinator/ProcessorEnvironment.py @@ -10,7 +10,6 @@ import os from Processor import Processor from ProcessorServer import ( get_data_from_server, - process_pre_job_server_download ) from ews_postprocessing.environmental_suitability.env_suit_post_processor import EnvSuitPostProcessor from ews_postprocessing.utils.disease_info import EnvSuitDiseaseInfo @@ -23,25 +22,27 @@ from ProcessorUtils import ( class ProcessorEnvironment(Processor): - - def process_pre_job(self, args): - return process_pre_job_server_download(args) + def set_component_logger(self): + logger = logging.getLogger('Processor.Environment') + add_filters_to_sublogger(logger) + + def process_pre_job(self, args): + # return process_pre_job_server_download(args) + return True def process_in_job(self, jobPath, status, configjson, component) -> object: return self.process_in_job_env2_0(jobPath, status, configjson, component) - def process_post_job(self, jobPath, configjson): + def process_post_job(self, jobPath, configjson) -> [str]: return self.process_EWS_plotting_env2_0(jobPath, configjson) def __init__(self) -> None: super().__init__() - logger = logging.getLogger('Processor.Environment') - add_filters_to_sublogger(logger) - def process_in_job_env2_0(self, jobPath,status,config,component): + def process_in_job_env2_0(self, jobPath,status,config,component) -> dict: '''Download met data from remote, prepare data, and run :class:`EnvSuitPipeline` pipeline.''' self.logger.info('started process_in_job_env2_0()') @@ -121,7 +122,7 @@ class ProcessorEnvironment(Processor): run_params_dict[RUN_PARAMS.FILTER_FOR_COUNTRY_KEY] = "False"''' #TODO test if this works - def process_EWS_plotting_env2_0(self, jobPath,config): + def process_EWS_plotting_env2_0(self, jobPath,config) -> [str]: '''Configures the plotting arguments and calls EWS-plotting as a python module. 
Returns a list of output files for transfer.''' @@ -133,7 +134,7 @@ class ProcessorEnvironment(Processor): subregions = config['SubRegionNames'] - EWSPlottingOutputGlobs = [] + ews_plotting_output_globs = [] # work on each region for region in subregions: @@ -175,27 +176,27 @@ class ProcessorEnvironment(Processor): env_suit_processor.process() # check the output - EWSPlottingOutputDir = f"{output_dir}/images/" + ews_plotting_output_dir = f"{output_dir}/images/" #EWSPlottingOutputGlobs += [ # # daily plots # f"{EWSPlottingOutputDir}Daily/suitability_{region.lower()}_*_rust_daily_20*.png", # # weekly plots # f"{EWSPlottingOutputDir}Weekly/suitability_{region.lower()}_*_rust_total_20*.png"] - EWSPlottingOutputGlobs += [f"{EWSPlottingOutputDir}*"] + ews_plotting_output_globs = [f"{ews_plotting_output_dir}*"] # check the output - EWSPlottingOutputGlobs = get_only_existing_globs(EWSPlottingOutputGlobs,inplace=False) + ews_plotting_output_globs = get_only_existing_globs(ews_plotting_output_globs,inplace=False) # check there is some output from EWS-plotting - if not EWSPlottingOutputGlobs: + if not ews_plotting_output_globs: self.logger.error('EWS-Plotting did not produce any output') raise RuntimeError # provide list for transfer - EWSPlottingOutputs = sorted([file for glob_str in EWSPlottingOutputGlobs for file in glob(glob_str)]) + ews_plotting_outputs: [str] = sorted([file for glob_str in ews_plotting_output_globs for file in glob(glob_str)]) - return EWSPlottingOutputs + return ews_plotting_outputs if __name__ == '__main__': diff --git a/coordinator/ProcessorEpidemiology.py b/coordinator/ProcessorEpidemiology.py index 2714e2a4ccca821990ee50a6523e75e7ad879896..7ab4aa180650ddefcddeed5a244b12f8432659a5 100644 --- a/coordinator/ProcessorEpidemiology.py +++ b/coordinator/ProcessorEpidemiology.py @@ -31,28 +31,30 @@ from ProcessorUtils import ( get_only_existing_globs, endJob, add_filters_to_sublogger, - query_past_successes, short_name, disease_latin_name_dict ) class ProcessorEpidemiology(Processor): - def process_pre_job(self, args): + + def set_component_logger(self): + logger = logging.getLogger('Processor.Epi') + add_filters_to_sublogger(logger) + + def process_pre_job(self, args) -> bool: return self.process_pre_job_epi(args) - def process_in_job(self, jobPath, status, configjson, component) -> object: + def process_in_job(self, jobPath, status, configjson, component) -> dict: return self.process_in_job_epi(jobPath, status, configjson, component) - def process_post_job(self, jobPath, configjson): + def process_post_job(self, jobPath, configjson) -> [str]: return self.process_EWS_plotting_epi(jobPath, configjson) def __init__(self) -> None: super().__init__() - logger = logging.getLogger('Processor.Epi') - add_filters_to_sublogger(logger) def process_pre_job_epi(self, input_args: dict): '''Returns a boolean as to whether the job is ready for full processing.''' @@ -60,9 +62,9 @@ class ProcessorEpidemiology(Processor): self.logger.info('started process_pre_job_epi()') # check pre-requisite jobs are complete - query_past_successes(input_args, 'Deposition') - - query_past_successes(input_args, 'Environment') + # query_past_successes(input_args, 'Deposition') + # + # query_past_successes(input_args, 'Environment') config_fns: List[str] = input_args['config_paths'] @@ -661,7 +663,7 @@ class ProcessorEpidemiology(Processor): return proc_out - def process_EWS_plotting_epi(self, jobPath,config): + def process_EWS_plotting_epi(self, jobPath,config) -> [str]: '''Returns a list of output files for 
transfer.''' self.logger.info('started process_EWS_plotting_epi()') @@ -705,7 +707,7 @@ class ProcessorEpidemiology(Processor): Path(ews_plot_dir).mkdir(parents=True, exist_ok=True) # loop over diseases - EWSPlottingOutputGlobs = [] + ews_plotting_output_globs = [] for disease in diseases: disease_short = disease.lower().replace('rust','') @@ -770,21 +772,22 @@ class ProcessorEpidemiology(Processor): epi_processor_2.process() # check the output - EWSPlottingOutputDir = f"{ews_plot_dir}/images/" + ews_plotting_output_dir = f"{ews_plot_dir}/images/" # TODO: Make this smarter, connected to the results of EWSPlottingEPIBase.plot_epi() - EWSPlottingOutputGlobs += [f"{EWSPlottingOutputDir}infection_{plotting_region_name_lower}_*{disease_short}*.png"] + ews_plotting_output_globs: [str] = [] + ews_plotting_output_globs += [f"{ews_plotting_output_dir}infection_{plotting_region_name_lower}_*{disease_short}*.png"] - EWSPlottingOutputGlobs = get_only_existing_globs(EWSPlottingOutputGlobs,inplace=False) + ews_plotting_output_globs = get_only_existing_globs(ews_plotting_output_globs,inplace=False) # check there is some output from EWS-plotting - if not EWSPlottingOutputGlobs: + if not ews_plotting_output_globs: self.logger.error('EWS-Plotting did not produce any output') raise RuntimeError # provide to list for transfer - EWSPlottingOutputs = [item for EWSPlottingOutput in EWSPlottingOutputGlobs for item in glob(EWSPlottingOutput)] + ews_plotting_outputs: [str] = [item for ews_plotting_output in ews_plotting_output_globs for item in glob(ews_plotting_output)] - return EWSPlottingOutputs + return ews_plotting_outputs if __name__ == '__main__': diff --git a/coordinator/ProcessorScraper.py b/coordinator/ProcessorScraper.py index 3d98905a2c51edd1cd6acb2937e402a8fea745ca..1b2d9336a40dcebcebb8afe9ab4dfdfa6f43314e 100644 --- a/coordinator/ProcessorScraper.py +++ b/coordinator/ProcessorScraper.py @@ -34,6 +34,11 @@ class ProcessorScraper(Processor): """ LIFECYCLE FUNCTIONS INHERITED FROM PROCESSOR.PY """ + + def set_component_logger(self): + logger = logging.getLogger('Processor.Scraper') + add_filters_to_sublogger(logger) + def process_pre_job(self, args): return True @@ -45,11 +50,9 @@ class ProcessorScraper(Processor): def __init__(self) -> None: super().__init__() - logger = logging.getLogger('Processor.Scraper') - add_filters_to_sublogger(logger) - """ LIFECYCLE FUNCTIONS INHERITED FROM PROCESSOR.PY """ + """ LIFECYCLE FUNCTIONS INHERITED FROM PROCESSOR.PY """ # date format conforms to format used in SURVEYDATA_MANUAL/LIVE_SURVEYDATA_TOUSE.csv # # ODK v1.11.2: diff --git a/coordinator/ProcessorServer.py b/coordinator/ProcessorServer.py index 685b6cc5436843ea658ced268b644f2897fdda43..2e13b03f531df4a07e753a695be0066052fa0d06 100644 --- a/coordinator/ProcessorServer.py +++ b/coordinator/ProcessorServer.py @@ -23,144 +23,144 @@ from ProcessorUtils import ( logger = logging.getLogger('Processor.Server') add_filters_to_sublogger(logger) -def process_pre_job_server_download(input_args: dict): - '''This is set up for environmental suitability v2.0 and deposition. 
- Returns a boolean as to whether the job is ready for full processing.''' - - logger.info('started process_pre_job_willow_download()') - - # Check if there is a file available on willow - logger.debug('Checking for file(s) on remote server') - - config_paths: List[str] = input_args['config_paths'] - start_date: str = input_args['start_date'] - component: str = input_args['component'] - - for i,config_path in enumerate(config_paths): - - config = open_and_check_config(config_path) - - config['StartString'] = start_date - - file_path = Template(config[component]['ServerPathTemplate']).substitute(**config) - file_name = Template(config[component]['InputFileTemplate']).substitute(**config) - logger.info(f"Checking for existence of {file_path}/{file_name}.tar.gz") - - timenow = datetime.datetime.now(tz=datetime.timezone.utc).time() - - server_name: str = config['ServerName'] - full_file_path = f"{file_path}/{file_name}.tar.gz" - - """ - check the incoming met data tar exists and is a valid tar file (on either remote or local machine) - """ - if server_name == "": # means the file is local - data_is_ready = os.path.exists(full_file_path) and tarfile.is_tarfile(full_file_path) - else: - cmd_check_file = ["ssh", "-i", config['ServerKey'], "-o", "StrictHostKeyChecking=no", server_name, - f"test -f {full_file_path} && tar -tzf {full_file_path} >/dev/null"] - run_in_shell: bool = False - - description_short = 'subprocess_ssh' - description_long = f"Checking for existence of {file_path}/{file_name}.tar.gz" - - status = subprocess_and_log(cmd_check_file, description_short, description_long, check = False, - shell = run_in_shell) - data_is_ready = status.returncode == 0 - - if not data_is_ready: - - # a time check in UTC. If it's late, raise warning, if very late, raise error - - time_0 = config[component]['TimeExpectedAvailable'] - time_0 = datetime.datetime.strptime(time_0,'%H%M') - - time_until_warn = datetime.timedelta(hours=4) - time_until_error = datetime.timedelta(hours=5) - - time_warn = (time_0 + time_until_warn).time() - time_error = (time_0 + time_until_error).time() - - message = f"Data not yet available for config {i+1} of {len(config_paths)}, expected between {time_0.time()} and {time_error} and long before {time_error}" - - if timenow > time_error: - # job is not able to proceed - - logger.warning(message) - - return False - - elif timenow > time_warn: - # job is not ready to proceed - - logger.warning(message) - endScript(premature=True) - - else: - # some other problem with the job - - logger.info(message) - endScript(premature=True) - - elif data_is_ready: - logger.info(f"Data is available for config {i+1} of {len(config_paths)}, calculation shall proceed") - - return True - -def upload(config,FilesToSend,component): - - usual_path = f"{config['StartString']}_0000/" - - component_path = { - 'Environment' : usual_path, - 'Deposition' : usual_path, - 'Epidemiology' : usual_path, - 'Survey' : f"SURVEYDATA_{config['StartString']}_0000/", - 'Scraper' : usual_path, - 'MetResample' : f"MET_RESAMPLE_{config['StartString']}_0000/", - 'Advisory' : usual_path } - - - # TODO: make path discern Daily or Weekly sub-directory - - OutputServerPath = f"{config['ServerPath']}/{component_path[component]}" - - logger.info(f"Trying upload to {config['ServerName']}:{OutputServerPath}") - - logger.info(f"File(s) that will be put on remote server: {FilesToSend}") - - if len(FilesToSend) == 0: - logger.warning('No files to send, so skipping this task') - raise IndexError - - logger.debug("Making path 
directory on remote server if it doesn't already exist") - - server_key = config['ServerKey'] - if server_key == "": - ssh_cmd = f"mkdir -p {OutputServerPath}" - run_in_shell: bool = True - else: - ssh_cmd = ["ssh", "-i", server_key, "-o", "StrictHostKeyChecking=no", config['ServerName'], - f"mkdir -p {OutputServerPath}"] - run_in_shell: bool = False - - description_short = 'upload ssh' - description_long = 'make remote directory' - subprocess_and_log(ssh_cmd, description_short, description_long, shell=run_in_shell) - - logger.debug('Sending file(s) to remote server') - - if server_key == "": - scp_cmd = ["scp", "-r", *FilesToSend, OutputServerPath] - else: - scp_cmd = ["scp", "-ri", server_key, "-o", "StrictHostKeyChecking=no", *FilesToSend, - f"{config['ServerName']}:{OutputServerPath}"] - - description_short = 'upload scp' - description_long = 'scp files to remote directory' - subprocess_and_log(scp_cmd, description_short, description_long) - - return +# def process_pre_job_server_download(input_args: dict): +# '''This is set up for environmental suitability v2.0 and deposition. +# Returns a boolean as to whether the job is ready for full processing.''' +# +# logger.info('started process_pre_job_willow_download()') +# +# # Check if there is a file available on willow +# logger.debug('Checking for file(s) on remote server') +# +# config_paths: List[str] = input_args['config_paths'] +# start_date: str = input_args['start_date'] +# component: str = input_args['component'] +# +# for i,config_path in enumerate(config_paths): +# +# config = open_and_check_config(config_path) +# +# config['StartString'] = start_date +# +# file_path = Template(config[component]['ServerPathTemplate']).substitute(**config) +# file_name = Template(config[component]['InputFileTemplate']).substitute(**config) +# logger.info(f"Checking for existence of {file_path}/{file_name}.tar.gz") +# +# timenow = datetime.datetime.now(tz=datetime.timezone.utc).time() +# +# server_name: str = config['ServerName'] +# full_file_path = f"{file_path}/{file_name}.tar.gz" +# +# """ +# check the incoming met data tar exists and is a valid tar file (on either remote or local machine) +# """ +# if server_name == "": # means the file is local +# data_is_ready = os.path.exists(full_file_path) and tarfile.is_tarfile(full_file_path) +# else: +# cmd_check_file = ["ssh", "-i", config['ServerKey'], "-o", "StrictHostKeyChecking=no", server_name, +# f"test -f {full_file_path} && tar -tzf {full_file_path} >/dev/null"] +# run_in_shell: bool = False +# +# description_short = 'subprocess_ssh' +# description_long = f"Checking for existence of {file_path}/{file_name}.tar.gz" +# +# status = subprocess_and_log(cmd_check_file, description_short, description_long, check = False, +# shell = run_in_shell) +# data_is_ready = status.returncode == 0 +# +# if not data_is_ready: +# +# # a time check in UTC. 
If it's late, raise warning, if very late, raise error +# +# time_0 = config[component]['TimeExpectedAvailable'] +# time_0 = datetime.datetime.strptime(time_0,'%H%M') +# +# time_until_warn = datetime.timedelta(hours=4) +# time_until_error = datetime.timedelta(hours=5) +# +# time_warn = (time_0 + time_until_warn).time() +# time_error = (time_0 + time_until_error).time() +# +# message = f"Data not yet available for config {i+1} of {len(config_paths)}, expected between {time_0.time()} and {time_error} and long before {time_error}" +# +# if timenow > time_error: +# # job is not able to proceed +# +# logger.warning(message) +# +# return False +# +# elif timenow > time_warn: +# # job is not ready to proceed +# +# logger.warning(message) +# endScript(premature=True) +# +# else: +# # some other problem with the job +# +# logger.info(message) +# endScript(premature=True) +# +# elif data_is_ready: +# logger.info(f"Data is available for config {i+1} of {len(config_paths)}, calculation shall proceed") +# +# return True + +# def upload(config,FilesToSend,component): +# +# usual_path = f"{config['StartString']}_0000/" +# +# component_path = { +# 'Environment' : usual_path, +# 'Deposition' : usual_path, +# 'Epidemiology' : usual_path, +# 'Survey' : f"SURVEYDATA_{config['StartString']}_0000/", +# 'Scraper' : usual_path, +# 'MetResample' : f"MET_RESAMPLE_{config['StartString']}_0000/", +# 'Advisory' : usual_path } +# +# +# # TODO: make path discern Daily or Weekly sub-directory +# +# OutputServerPath = f"{config['ServerPath']}/{component_path[component]}" +# +# logger.info(f"Trying upload to {config['ServerName']}:{OutputServerPath}") +# +# logger.info(f"File(s) that will be put on remote server: {FilesToSend}") +# +# if len(FilesToSend) == 0: +# logger.warning('No files to send, so skipping this task') +# raise IndexError +# +# logger.debug("Making path directory on remote server if it doesn't already exist") +# +# server_key = config['ServerKey'] +# if server_key == "": +# ssh_cmd = f"mkdir -p {OutputServerPath}" +# run_in_shell: bool = True +# else: +# ssh_cmd = ["ssh", "-i", server_key, "-o", "StrictHostKeyChecking=no", config['ServerName'], +# f"mkdir -p {OutputServerPath}"] +# run_in_shell: bool = False +# +# description_short = 'upload ssh' +# description_long = 'make remote directory' +# subprocess_and_log(ssh_cmd, description_short, description_long, shell=run_in_shell) +# +# logger.debug('Sending file(s) to remote server') +# +# if server_key == "": +# scp_cmd = ["scp", "-r", *FilesToSend, OutputServerPath] +# else: +# scp_cmd = ["scp", "-ri", server_key, "-o", "StrictHostKeyChecking=no", *FilesToSend, +# f"{config['ServerName']}:{OutputServerPath}"] +# +# description_short = 'upload scp' +# description_long = 'scp files to remote directory' +# subprocess_and_log(scp_cmd, description_short, description_long) +# +# return def get_data_from_server(jobPath,config,component): diff --git a/coordinator/ProcessorSurveys.py b/coordinator/ProcessorSurveys.py index a215b992da1616d768ce179ef2cb120689b61fdf..2cda25ec07a00ed041be7ad8ed395e76048296a6 100644 --- a/coordinator/ProcessorSurveys.py +++ b/coordinator/ProcessorSurveys.py @@ -55,22 +55,25 @@ from ProcessorUtils import ( class ProcessorSurveys(Processor): + def set_component_logger(self): + logger = logging.getLogger('Processor.Surveys') + add_filters_to_sublogger(logger) + def process_pre_job(self, args): - return self.process_pre_job_survey(args) + # return self.process_pre_job_survey(args) + return True def process_in_job(self, jobPath, status, 
configjson, component) -> object: return self.process_in_job_survey(jobPath, status, configjson, component) - def process_post_job(self, jobPath, configjson): + def process_post_job(self, jobPath, configjson) -> [str]: return self.process_EWS_plotting_survey(jobPath, configjson) def __init__(self) -> None: super().__init__() - logger = logging.getLogger('Processor.Surveys') - add_filters_to_sublogger(logger) self.GET_FORM_AS_CSV_DICT = { 'ODK': get_ODK_form_as_csv, @@ -83,11 +86,11 @@ class ProcessorSurveys(Processor): 'newODK2' : get_newODK2_form_as_csv, } - def process_pre_job_survey(self, input_args): - '''Returns a boolean as to whether the job is ready for full processing.''' - self.logger.info('started process_pre_job_survey(), nothing to do') - - return True + # def process_pre_job_survey(self, input_args): + # '''Returns a boolean as to whether the job is ready for full processing.''' + # self.logger.info('started process_pre_job_survey(), nothing to do') + # + # return True def process_in_job_survey(self, jobPath,status,config,component): self.logger.info('started process_in_job_survey()') diff --git a/coordinator/ProcessorUtils.py b/coordinator/ProcessorUtils.py index 4b3fbe2eb935c5f0ac9c56da36eab9f88873557c..cbcede7547f4e191073296017de5b0eac81af1f4 100644 --- a/coordinator/ProcessorUtils.py +++ b/coordinator/ProcessorUtils.py @@ -119,7 +119,8 @@ def open_and_check_config(configFile): return config -def get_only_existing_globs(file_globs,inplace=True): + +def get_only_existing_globs(file_globs: [str], inplace = True): if inplace: for i,fg in enumerate(file_globs): @@ -321,31 +322,31 @@ def query_component_success(config_i,job_run: str, job_to_check: str): return True -def query_past_successes(input_args: dict, - component_to_query: str = 'Deposition'): - '''Checks if deposition and environment jobs are already completed - successfully. If not, it raises an error.''' - - component: str = input_args['component'] - - assert component_to_query in ['Deposition','Environment','Epidemiology'] - - # check configs can be loaded - config_fns: List[str] = input_args['config_paths'] - for configFile in config_fns: - try: - config_i = open_and_check_config(configFile) - except: - logger.exception(f"Failure in opening or checking config {configFile}") - endScript(premature=True) - - # some config initialisation is necessary - config_i['StartString'] = input_args['start_date'] - - # check if dependent job is readily available - query_component_success(config_i,component,component_to_query) - - return True +# def query_past_successes(input_args: dict, +# component_to_query: str = 'Deposition'): +# '''Checks if deposition and environment jobs are already completed +# successfully. If not, it raises an error.''' +# +# component: str = input_args['component'] +# +# assert component_to_query in ['Deposition','Environment','Epidemiology'] +# +# # check configs can be loaded +# config_fns: List[str] = input_args['config_paths'] +# for configFile in config_fns: +# try: +# config_i = open_and_check_config(configFile) +# except: +# logger.exception(f"Failure in opening or checking config {configFile}") +# endScript(premature=True) +# +# # some config initialisation is necessary +# config_i['StartString'] = input_args['start_date'] +# +# # check if dependent job is readily available +# query_component_success(config_i,component,component_to_query) +# +# return True def dataframe_to_series(df: pd.DataFrame) -> pd.Series: """Reformate a pandas Dataframe into a pandas Series. 
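Annotation on the commented-out query_past_successes() above: the pre-job dependency checks appear to move out of the Processors and into the orchestrator. Purely as an illustration (not code from this repository), a recent Airflow 2.x DAG could enforce the old Deposition/Environment before Epidemiology ordering as below; the DAG id, schedule and bash commands are placeholders.

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="ews_coordinator",           # placeholder name
    start_date=datetime(2024, 1, 1),
    schedule="@daily",
    catchup=False,
) as dag:
    deposition = BashOperator(
        task_id="deposition",
        bash_command="./run_Processor.sh -p Deposition --config config.json -s {{ ds_nodash }}",
    )
    environment = BashOperator(
        task_id="environment",
        bash_command="./run_Processor.sh -p Environment --config config.json -s {{ ds_nodash }}",
    )
    epidemiology = BashOperator(
        task_id="epidemiology",
        bash_command="./run_Processor.sh -p Epidemiology --config config.json -s {{ ds_nodash }}",
    )

    # Epidemiology used to call query_past_successes() for these two components;
    # here the scheduler enforces the same ordering instead.
    [deposition, environment] >> epidemiology
```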
diff --git a/coordinator/extra/ProcessorMetResample.py b/coordinator/extra/ProcessorMetResample.py index 64f6942224d350e27544257999be887a7cda311b..c8b1b943f0b837352b363853e93a78625dc9a821 100644 --- a/coordinator/extra/ProcessorMetResample.py +++ b/coordinator/extra/ProcessorMetResample.py @@ -23,25 +23,21 @@ from EpiModel.EpiUtils import ( parse_template_string) from Processor import Processor -from ProcessorServer import ( - process_pre_job_server_download, - get_data_from_server -) from ProcessorUtils import ( add_filters_to_sublogger, calc_epi_date_range, open_and_check_config, - query_past_successes, short_name) class ProcessorMetResample(Processor): - def process_pre_job(self, args): + def process_pre_job(self, args) -> bool: self.logger.debug('Performing process_pre_job()') - + # If it will work on a single forecast, get a dedicated download #return process_pre_job_server_download(args) - return query_past_successes(args, 'Environment') + # return query_past_successes(args, 'Environment') + return True def process_in_job(self, jobPath, status, configjson, component) -> object: return self.process_in_job_met_resample(jobPath, status, configjson, component) @@ -52,9 +48,12 @@ class ProcessorMetResample(Processor): def __init__(self) -> None: super().__init__() + + def set_component_logger(self): logger = logging.getLogger('Processor.Extra.MetResample') add_filters_to_sublogger(logger) - + + def gather_data( self, config, @@ -98,11 +97,11 @@ class ProcessorMetResample(Processor): #loader_kwargs['VariableNameAlternative']= config_for_lister['Deposition'].get('VariableNameAlternative') file_of_origins = prep.prep_input(config_for_lister,start_date,end_date, - component=component, - file_lister=file_lister, - file_loader=file_loader, - lister_kwargs=lister_kwargs, - **loader_kwargs) + component=component, + file_lister=file_lister, + file_loader=file_loader, + lister_kwargs=lister_kwargs, + **loader_kwargs) assert os.path.isfile(config[component]['FileNamePrepared']) @@ -117,7 +116,7 @@ class ProcessorMetResample(Processor): fn, header=[0,1], index_col=0) - + df.index = to_datetime(df.index,format='%Y%m%d%H%M') return df @@ -133,7 +132,7 @@ class ProcessorMetResample(Processor): /home/jws52/projects/SouthAsia/blast-fernandes-adaptation/code/prep_met.py """ - + # load dataframe from config file_name_prepared = config[component]['FileNamePrepared'] @@ -158,7 +157,7 @@ class ProcessorMetResample(Processor): #df_original = concat([dfm1,df_original]) resampler = df_original.resample(resample_scale) - + resample_method = config[component].get('resampling','backfill') if resample_method == 'backfill': print('Upsampling by backfilling') @@ -194,7 +193,7 @@ class ProcessorMetResample(Processor): paths = [] origins = [] - + for component in components: self.logger.info(f"Working on {component}") @@ -217,8 +216,8 @@ class ProcessorMetResample(Processor): start_time,#datetime.datetime(2023,6,7,3) end_time,#datetime.datetime(2023,6,25,0), component=component, - ) - + ) + origins += [file_of_origins] self.logger.debug('Performing resampling') @@ -263,7 +262,7 @@ class ProcessorMetResample(Processor): config_i[k]=v # Get run config - + # path to some analysis met reruns provided by Will. These mirror the usual weekly EWS analysis jobs, # but on the extended EastAfrica grid that includes Zambia. 
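The resampling step in ProcessorMetResample above defaults to upsampling by backfill; a self-contained pandas sketch of that pattern follows (the column name, values and the hourly target frequency are invented for illustration, while the index format matches the '%Y%m%d%H%M' parsing used above).

```python
import pandas as pd

# 3-hourly input indexed the same way as the prepared met data ('%Y%m%d%H%M')
idx = pd.to_datetime(['202306070300', '202306070600'], format='%Y%m%d%H%M')
df_3hourly = pd.DataFrame({'temperature': [12.0, 15.0]}, index=idx)  # invented values

# upsample to hourly and backfill, i.e. each new row takes the next known value
df_hourly = df_3hourly.resample('1H').bfill()
print(df_hourly)
```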
ANJOBDIR2 = '${WorkspacePath2}/WR_EnvSuit_Met_Ethiopia_${DateString}/' @@ -285,12 +284,12 @@ class ProcessorMetResample(Processor): config_met[k]=v self.logger.info('Calling gather_and_resample()') - + origins, paths_out = self.gather_and_resample( config_met, reference_date_str = config['StartString'], calculation_span_days = config[component]['CalculationSpanDays'] - ) + ) # zip the files that will be provided to collaborators files_to_zip = origins + paths_out @@ -302,7 +301,7 @@ class ProcessorMetResample(Processor): for file_to_zip in files_to_zip: filename_in_archive = os.path.basename(file_to_zip) - + zipf.write( file_to_zip, arcname=filename_in_archive, diff --git a/docs/_source/iaas_setup.rst b/docs/_source/iaas_setup.rst index fa90fcb31c47c1fc2a052451910aabc6b1be175a..c9f220113ce516de2947ae15d741cb648e0778c3 100644 --- a/docs/_source/iaas_setup.rst +++ b/docs/_source/iaas_setup.rst @@ -589,6 +589,12 @@ airflow db migrate edit the resulting ${AIRFLOW_HOME}/airflow.cfg file on line 435 to the mysyl database sql_alchemy_conn = mysql+mysqldb://airflow_user:<password_from_above>@localhost:3306/airflow_db +while you are editing the config file, set the timezone to be your local timezone, rather than UTC. As this system works +on a regular, local timezone, daylight saving hours need to be taken into account so it runs at the same local time each day. + +default_timezone = Europe/London +(note that the UI has a dropdown in the top right to set the displayed timezone, but the scheduler will run on the server timezone) + **install the mysql provider** sudo apt install gcc; @@ -617,6 +623,8 @@ airflow users create --role Admin --username admin --email lb584@cam.ac.uk --fi (you can set the password to something more secure when logged in) + + run the scheduler and the webserver airflow scheduler; @@ -626,4 +634,83 @@ can run as demons with the -D flag connect to the webserver http://<server_ip>:8090 +**set up airflow as a service** + +Once you have got airflow running from the command line, you will need to set it up as a service (which will run at startup and in the background) + +1: create a service file for airflow scheduler and webserver: + +sudo touch /etc/systemd/system/airflow-scheduler.service +sudo touch /etc/systemd/system/airflow-webserver.service + +edit the contents of both files to be: + +.. code-block:: bash + + [Unit] + Description=Airflow scheduler daemon + After=network.target mysql.service + Wants=mysql.service + [Service] + EnvironmentFile=/storage/airflow/airflow.cfg + User=ewsmanager + Group=ewsmanager + Type=simple + ExecStart=/usr/bin/bash -c 'export AIRFLOW_HOME=/storage/airflow ; source /storage/airflow/airflow-env/bin/activate ; airflow webserver' + Restart=no + RestartSec=5s + PrivateTmp=true + [Install] + WantedBy=multi-user.target + +.. 
code-block:: bash + + [Unit] + Description=Airflow scheduler daemon + After=network.target mysql.service + Wants=mysql.service + [Service] + EnvironmentFile=/storage/airflow/airflow.cfg + User=ewsmanager + Group=ewsmanager + Type=simple + ExecStart=/usr/bin/bash -c 'export AIRFLOW_HOME=/storage/airflow ; source /storage/airflow/airflow-env/bin/activate ; airflow scheduler' + Restart=no + RestartSec=5s + PrivateTmp=true + [Install] + WantedBy=multi-user.target + +2: reload the service daemon, enable and start the services + +sudo systemctl daemon-reload + +sudo systemctl enable airflow-scheduler.service + +sudo systemctl enable airflow-webserver.service + +sudo systemctl start airflow-scheduler.service + +sudo systemctl start airflow-webserver.service + +sudo systemctl restart airflow-scheduler.service + +sudo systemctl restart airflow-webserver.service + +sudo systemctl stop airflow-scheduler.service + +sudo systemctl stop airflow-webserver.service + +3: check the status of the services + +sudo systemctl status airflow-scheduler.service + +sudo systemctl status airflow-webserver.service + +4: check the logs + +sudo journalctl -r -u airflow-scheduler.service + +sudo journalctl -r -u airflow-webserver.service + diff --git a/docs/api.rst b/docs/api.rst index 92c6af18c4618c7a5de60e983a8304f22acc9f17..3d57073301b307ffbeb4f08da253caf2ac081c6f 100644 --- a/docs/api.rst +++ b/docs/api.rst @@ -6,7 +6,7 @@ API :template: custom-module-template.rst :recursive: - coordinator +.. coordinator plotting ews_postprocessing met_processing diff --git a/tests/integration/full/full_test_deposition.py b/tests/integration/full/full_test_deposition.py index 87ff3eeeb285c744250b0bc0109be3e47be7dfdd..14d1d11d5532e321270897084766af0dbf9f78d7 100644 --- a/tests/integration/full/full_test_deposition.py +++ b/tests/integration/full/full_test_deposition.py @@ -10,8 +10,8 @@ from integration.test_suites.depo_test_suite import BaseDepoTestSuite class FullTestDeposition(BaseDepoTestSuite.DepoTestSuite): def set_expected_values(self): - self.EA_CSV_COUNT = 12 - self.ETH_CSV_COUNT = 12 + self.EA_CSV_COUNT = 9 + self.ETH_CSV_COUNT = 9 self.EA_PNG_COUNT = 3 self.ETH_PNG_COUNT = 3 diff --git a/tests/integration/partial/integration_test_utils.py b/tests/integration/partial/integration_test_utils.py index c52a98a3bfefa7fb723d61356085b2a3c4533d35..3ec85fe0ded48e6df10bd0983d8ad5c7b03787cc 100644 --- a/tests/integration/partial/integration_test_utils.py +++ b/tests/integration/partial/integration_test_utils.py @@ -10,6 +10,7 @@ from zipfile import ZipFile from HTMLTestRunner import HTMLTestRunner +import ProcessorUtils from Processor import Processor @@ -189,20 +190,31 @@ class IntegrationTestUtils: args_dict: dict = {} + config_paths = [IntegrationTestUtils.TEMP_CONFIG_FILE_NAME] + # note, possible to override these values in the kwargs loop below args_dict['live'] = False args_dict['noupload'] = True args_dict['start_date'] = start_date args_dict['component'] = component - args_dict['config_paths'] = [IntegrationTestUtils.TEMP_CONFIG_FILE_NAME] + args_dict['config_paths'] = config_paths args_dict['log_level'] = 'info' args_dict['clearup'] = True for key, value in kwargs.items(): args_dict[key] = value - log_level = args_dict['log_level'] - processor.set_log_level(log_level) + # universal_config: dict = processor.build_universal_config(config_paths, component) + # workspacePath = universal_config['WorkspacePathout'] + # + # job_path: str = f'{workspacePath}{ProcessorUtils.short_name[component]}_{start_date}' + # logPath = 
f"{job_path}/log.txt" + # + # is_live: bool = args_dict['live'] + # processor.setup_logging(logPath, is_live) + # + # log_level = args_dict['log_level'] + # processor.set_log_level(log_level) try: processor.run_process(args_dict) @@ -230,9 +242,6 @@ class IntegrationTestUtils: for key, value in kwargs.items(): args_dict[key] = value - log_level = args_dict['log_level'] - processor.set_log_level(log_level) - try: processor.run_process(args_dict) except SystemExit: diff --git a/tests/test_data/test_deployment/regions/EastAfrica/resources/coordinator/configs/config_EastAfrica_fc_live.json b/tests/test_data/test_deployment/regions/EastAfrica/resources/coordinator/configs/config_EastAfrica_fc_live.json index 1f0e3cbbe79e339096da458f657bf00cbea6de2e..86a99e4134bbd1bc2a8a6de6836af8f7227b18a6 100644 --- a/tests/test_data/test_deployment/regions/EastAfrica/resources/coordinator/configs/config_EastAfrica_fc_live.json +++ b/tests/test_data/test_deployment/regions/EastAfrica/resources/coordinator/configs/config_EastAfrica_fc_live.json @@ -13,7 +13,7 @@ "ProcessPreJob" : "process_pre_job_survey", "ProcessInJob" : "process_in_job_survey", "ProcessEWSPlotting": "process_EWS_plotting_survey", - "AcceptableDowntimeDays": 35, + "AcceptableDowntimeDays": 70, "SeasonStartString" : "20220930", "SkipServerDownload" : true, "ServerCredentialsFile" : "../../test_data/test_deployment/regions/EastAfrica/resources/coordinator/configs/Cred-ODK-EIAR.json", @@ -21,12 +21,15 @@ "FormEdits" : { "wheat_rust_survey_1_0" : { "add" : { - "Origin" : "ODK-server" + "Origin" : "ODK-server", + "PublishedLevel" : "Raw" + } }, "akpyJHvYxkLKPkxFJnPyTW" : { "add" : { - "Origin" : "kobo-server" + "Origin" : "kobo-server", + "PublishedLevel" : "Raw" }, "filter_by_list": { "surveyor_infromation-country" : ["Kenya", "Ethiopia"] @@ -67,7 +70,7 @@ "LeafRust": { "suitability_modules": ["semibool_dewperiod"], "past_steps": 0, - "future_steps": 7, + "future_steps": 2, "thresholds": { "temperature": [2,15,20,30], "precipitation": 0, @@ -77,7 +80,7 @@ "StemRust": { "suitability_modules": ["semibool_dewperiod"], "past_steps": 0, - "future_steps": 7, + "future_steps": 2, "thresholds": { "temperature": [2,15,24,30], "precipitation": 0, @@ -97,7 +100,7 @@ "LeafRust_TempOnly": { "suitability_modules": ["semibool_dewperiod"], "past_steps": 0, - "future_steps": 7, + "future_steps": 2, "thresholds": { "temperature": [2,15,20,30], "precipitation": -1, @@ -107,7 +110,7 @@ "StemRust_TempOnly": { "suitability_modules": ["semibool_dewperiod"], "past_steps": 0, - "future_steps": 7, + "future_steps": 2, "thresholds": { "temperature": [2,15,24,30], "precipitation": -1,