diff --git a/coordinator/ProcessorAdvisory.py b/coordinator/ProcessorAdvisory.py
index 2d4ed9694ffbf4da8efa2e048b33ee2a80b0e1ba..5ec7c1b803ceb99588e092c2c7c6485a8e2377c5 100644
--- a/coordinator/ProcessorAdvisory.py
+++ b/coordinator/ProcessorAdvisory.py
@@ -2,19 +2,12 @@
 '''Functions to process the advisory component.'''
 
 import logging
-from typing import List
 
 # gitlab projects
 # TODO: Package these projects so they are robust for importing
-from AdvisoryBuilder import DataGatherer # created by jws52
-from processor_base import ProcessorBase
-
-from ProcessorUtils import (
-    add_filters_to_sublogger,
-    endScript,
-    open_and_check_config,
-    short_name
-)
+from AdvisoryBuilder import DataGatherer # created by jws52
+from coordinator.ProcessorUtils import add_filters_to_sublogger, short_name
+from coordinator.processor_base import ProcessorBase
 
 
 class ProcessorAdvisory(ProcessorBase):
diff --git a/coordinator/ProcessorDeposition.py b/coordinator/ProcessorDeposition.py
index 3e47683dd6f5eb46e1722c7ee0d832c63e0c41a5..cd0bb8d6a18ebab2ff335c216c1ea6b0375843c1 100644
--- a/coordinator/ProcessorDeposition.py
+++ b/coordinator/ProcessorDeposition.py
@@ -1,24 +1,21 @@
 #ProcessorDeposition.py
 '''Functions to process the deposition component.'''
 
-from glob import glob
 import logging
-from pathlib import Path
 import os
+from glob import glob
+from pathlib import Path
 from string import Template
 
 import iris
 from iris.cube import CubeList
 
-from ProcessorUtils import (
-    get_only_existing_globs,
-    subprocess_and_log,
-    add_filters_to_sublogger,
-)
 from coordinator import ProcessorUtils
+from coordinator.ProcessorUtils import subprocess_and_log, get_only_existing_globs
 from coordinator.processor_base import ProcessorBase
 from ews_postprocessing.deposition.deposition_post_processor import DepositionPostProcessor
 
+
 logger = logging.getLogger(__name__)
 
 
 class ProcessorDeposition(ProcessorBase):
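The two files above show the pattern repeated throughout this patch: implicit, path-dependent imports (`from ProcessorUtils import ...`) become package-qualified ones (`from coordinator.ProcessorUtils import ...`). A minimal sketch of the difference, assuming the repository root (the directory containing `coordinator/`) is what ends up on `sys.path`:

```python
# Bare import: only resolves when coordinator/ itself is on sys.path,
# e.g. when a script is launched from inside that directory.
#   from ProcessorUtils import short_name

# Package-qualified import: resolves from any working directory, provided
# the repository root is importable (pip install -e . or PYTHONPATH=<root>).
from coordinator.ProcessorUtils import short_name

print(short_name)  # the component short-name mapping referenced elsewhere in the patch
```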
diff --git a/coordinator/ProcessorEnvironment.py b/coordinator/ProcessorEnvironment.py
index 1ce3a203d09fd0b896760528c05ef3f92257e78a..7615931aa3c5ba41ae2245529014efe4628ae5d1 100644
--- a/coordinator/ProcessorEnvironment.py
+++ b/coordinator/ProcessorEnvironment.py
@@ -7,14 +7,14 @@
 import logging
 from pathlib import Path
 import os
-from processor_base import ProcessorBase
-from ProcessorServer import (
+from coordinator.processor_base import ProcessorBase
+from coordinator.ProcessorServer import (
     get_data_from_server,
 )
 from ews_postprocessing.environmental_suitability.env_suit_post_processor import EnvSuitPostProcessor
 from ews_postprocessing.utils.disease_info import EnvSuitDiseaseInfo
 
-import EnvSuitPipeline as esp
+import coordinator.EnvSuitPipeline as esp
 from ProcessorUtils import (
     get_only_existing_globs,
     short_name,
     add_filters_to_sublogger
diff --git a/coordinator/ProcessorEpidemiology.py b/coordinator/ProcessorEpidemiology.py
index 6fa843d41e80cfc77323e0012d312cbc5f026e2f..2333de804c31bbc33490521a9bdf00026ecc6135 100644
--- a/coordinator/ProcessorEpidemiology.py
+++ b/coordinator/ProcessorEpidemiology.py
@@ -22,18 +22,19 @@ from EpiModel import ( # created by rs481
     plotRaster
 )
 from EpiModel.EpiPrep import prep
-from processor_base import ProcessorBase
+from coordinator.processor_base import ProcessorBase
 from ews_postprocessing.epi.epi_post_processor import EPIPostPostProcessor
 
-from ProcessorUtils import (
-    calc_epi_date_range,
-    open_and_check_config,
-    get_only_existing_globs,
-    endJob,
-    add_filters_to_sublogger,
-    short_name,
-    disease_latin_name_dict
+from coordinator.ProcessorUtils import (
+    calc_epi_date_range,
+    open_and_check_config,
+    get_only_existing_globs,
+    endJob,
+    add_filters_to_sublogger,
+    short_name,
+    disease_latin_name_dict
 )
+logger = logging.getLogger(__name__)
 
 
 class ProcessorEpidemiology(ProcessorBase):
@@ -59,7 +60,7 @@ class ProcessorEpidemiology(ProcessorBase):
     def process_pre_job_epi(self, input_args: dict):
         '''Returns a boolean as to whether the job is ready for full processing.'''
 
-        self.logger.info('started process_pre_job_epi()')
+        logger.info('started process_pre_job_epi()')
 
         # check pre-requisite jobs are complete
         # query_past_successes(input_args, 'Deposition')
@@ -83,7 +84,7 @@ class ProcessorEpidemiology(ProcessorBase):
 
         # warn if it is a long timespan
         date_diff = end_time - start_time
         if date_diff.days > 100:
-            self.logger.warning("More than 100 days will be calculated over, likely longer than any single season")
+            logger.warning("More than 100 days will be calculated over, likely longer than any single season")
 
         return True
@@ -113,10 +114,10 @@ class ProcessorEpidemiology(ProcessorBase):
 
         config_filename = f"{configName_withoutEpi}_{epiString}"
 
-        self.logger.debug(f"length of config filename is {len(config_filename)}.")
+        logger.debug(f"length of config filename is {len(config_filename)}.")
 
         if len(config_filename) > 254:
-            self.logger.info(f"filename length is too long, it will raise an OSError, using a short form instead")
+            logger.info(f"filename length is too long, it will raise an OSError, using a short form instead")
 
             # epi cases are not described in filename, an interested user
             # must look in the json file for details.
@@ -274,7 +275,7 @@ class ProcessorEpidemiology(ProcessorBase):
         return dfm3
 
     def process_in_job_epi(self, jobPath,status,config,component):
-        self.logger.info('started process_in_job_epi()')
+        logger.info('started process_in_job_epi()')
 
         # TODO: Some of this is modifying config before epi model is run. Determine
         # how to account for that
@@ -348,7 +349,7 @@ class ProcessorEpidemiology(ProcessorBase):
             case_specific_path = f"{jobPath}/{region}/{disease}/"
             Path(case_specific_path).mkdir(parents=True, exist_ok=True)
 
-            self.logger.info(f"Preparing for epidemiology calc of {disease} in {region}")
+            logger.info(f"Preparing for epidemiology calc of {disease} in {region}")
 
             # create config_filename to describe job configuration
             config_filename = self.create_epi_config_string(config,case_specific_path,start_string,end_string)
@@ -381,7 +382,7 @@ class ProcessorEpidemiology(ProcessorBase):
                         component='Deposition')
 
             except:
-                self.logger.exception(f"Unexpected error in {component} data preparation")
+                logger.exception(f"Unexpected error in {component} data preparation")
                 status.reset('ERROR')
                 endJob(status,premature=True)
 
@@ -389,7 +390,7 @@ class ProcessorEpidemiology(ProcessorBase):
 
         if 'Environment' in config_epi:
 
-            self.logger.info('Preparing environmental suitability data')
+            logger.info('Preparing environmental suitability data')
 
             try:
                 prep.gather_dependent_models(
@@ -405,14 +406,14 @@ class ProcessorEpidemiology(ProcessorBase):
 
            except:
-                self.logger.exception(f"Unexpected error in {component} data preparation")
+                logger.exception(f"Unexpected error in {component} data preparation")
                 status.reset('ERROR')
                 endJob(status,premature=True)
 
        # prepare a copy of the host data
 
-        self.logger.info('Preparing a copy of the host raster data')
+        logger.info('Preparing a copy of the host raster data')
 
        # TargetRaster defines the grid that the epi model works on.
        assert 'TargetRaster' in config_epi['Host']
 
@@ -446,7 +447,7 @@ class ProcessorEpidemiology(ProcessorBase):
             shutil.copyfile(src_host,dst_host)
 
             config_epi['Host']['TargetRaster'] = dst_host
 
-            self.logger.info('Preparing a copy of the host data as csv')
+            logger.info('Preparing a copy of the host data as csv')
 
             dst_host_csv = dst_host.replace('.tif','.csv')
@@ -460,7 +461,7 @@ class ProcessorEpidemiology(ProcessorBase):
 
         # Preparing any continue-run files
         if is_continue is True:
 
-            self.logger.debug('This is a continue run.')
+            logger.debug('This is a continue run.')
 
             for ci in config_epi['Epi']:
@@ -468,7 +469,7 @@ class ProcessorEpidemiology(ProcessorBase):
 
                 # Get results of last day and prepare as input
 
-                self.logger.info(f"Getting a copy of the {model_name} model data to continue from")
+                logger.info(f"Getting a copy of the {model_name} model data to continue from")
 
                 try:
                     prep.gather_dependent_models(
@@ -483,7 +484,7 @@ class ProcessorEpidemiology(ProcessorBase):
                         component=model_name)
 
                 except:
-                    self.logger.exception(f"Unexpected error in {model_name} data preparation")
+                    logger.exception(f"Unexpected error in {model_name} data preparation")
                     status.reset('ERROR')
                     endJob(status,premature=True)
@@ -500,8 +501,8 @@ class ProcessorEpidemiology(ProcessorBase):
             config_epi['StartTimeShort'] = continue_start_date.strftime('%Y%m%d%H%M')
 
         def print_item(item):
-            self.logger.debug(f"Item {item}")
-            self.logger.debug(json.dumps(item,indent=2))
+            logger.debug(f"Item {item}")
+            logger.debug(json.dumps(item,indent=2))
         def iterate(items):
             for item in items.items():
                 if hasattr(item,'items'):
@@ -510,12 +511,12 @@ class ProcessorEpidemiology(ProcessorBase):
                 else:
                     print_item(item)
 
-        self.logger.debug('Incremental configuration looks like:')
+        logger.debug('Incremental configuration looks like:')
 
         iterate(config_epi)
 
-        self.logger.debug('Complete configuration looks like:')
-        self.logger.debug(json.dumps(config_epi,indent=2))
+        logger.debug('Complete configuration looks like:')
+        logger.debug(json.dumps(config_epi,indent=2))
 
         # write the complete configuration file to job directory
         with open(f"{case_specific_path}/{config_filename}.json",'w') as write_file:
@@ -523,13 +524,13 @@ class ProcessorEpidemiology(ProcessorBase):
 
         # run epi model
 
-        self.logger.info('About to run the epi model.')
+        logger.info('About to run the epi model.')
 
         try:
             model.run_epi_model(f"{case_specific_path}/{config_filename}.json")
 
         except:
-            self.logger.exception('Unexpected error in EpiModel')
+            logger.exception('Unexpected error in EpiModel')
             raise
 
         # perform calc on output
@@ -557,11 +558,11 @@ class ProcessorEpidemiology(ProcessorBase):
 
             analysis_desc, analysis_value = analysis_func(infection)
 
-            self.logger.info(f"For case {outfile}")
-            self.logger.info('Infection {:s} is {:.2e}'.format( analysis_desc, analysis_value))
+            logger.info(f"For case {outfile}")
+            logger.info('Infection {:s} is {:.2e}'.format( analysis_desc, analysis_value))
 
             # to save tif as png for easy viewing
-            self.logger.debug('Saving tif output as png for easier viewing')
+            logger.debug('Saving tif output as png for easier viewing')
             plotRaster.save_raster_as_png(outfile)
 
             # comparison figure
@@ -569,7 +570,7 @@ class ProcessorEpidemiology(ProcessorBase):
             # TODO: make this plot configurable? with function or args?
             #logger.info('Plotting epi output alongside contributing components')
             # figure_func = getattr(EpiAnalysis,'plot_compare_host_env_dep_infection')
-            self.logger.info('Plotting composite image of epi formulations')
+            logger.info('Plotting composite image of epi formulations')
             figure_func = getattr(EpiAnalysis,'plot_compare_epi_cases')
 
             # isolate the config for this function, in case of modifications
@@ -666,7 +667,7 @@ class ProcessorEpidemiology(ProcessorBase):
     def process_EWS_plotting_epi(self, jobPath,config) -> [str]:
         '''Returns a list of output files for transfer.'''
 
-        self.logger.info('started process_EWS_plotting_epi()')
+        logger.info('started process_EWS_plotting_epi()')
 
         # initalise necessary variables from config
 
@@ -678,7 +679,7 @@ class ProcessorEpidemiology(ProcessorBase):
         epi_case_operational = config['Epidemiology']['EWS-Plotting']['EpiCase']
 
         if epi_case_operational == 'none':
-            self.logger.info('Config specifies not to call to EWS-Plotting')
+            logger.info('Config specifies not to call to EWS-Plotting')
             return []
 
         diseases = config['Epidemiology']['DiseaseNames']
@@ -734,7 +735,7 @@ class ProcessorEpidemiology(ProcessorBase):
             # only run season so far (i.e. historic dates) if they exist
             if (seasonsofar_run_config is not None) & os.path.exists(epi_seasonsofar_fn):
 
-                self.logger.info(f"Running EWS-Plotting with the following configs:\n{sys_config}\n{seasonsofar_run_config}\n{chart_config}")
+                logger.info(f"Running EWS-Plotting with the following configs:\n{sys_config}\n{seasonsofar_run_config}\n{chart_config}")
 
                 epi_processor_1 = EPIPostPostProcessor()
                 epi_processor_1.set_param_config_files(sys_params_file_arg=sys_config,
@@ -754,7 +755,7 @@ class ProcessorEpidemiology(ProcessorBase):
 
             run_config = config['Epidemiology']['EWS-Plotting']['RunConfig_seasonplusforecast']
 
-            self.logger.info(f"Running EWS-Plotting with the following configs:\n{sys_config}\n{run_config}\n{chart_config}")
+            logger.info(f"Running EWS-Plotting with the following configs:\n{sys_config}\n{run_config}\n{chart_config}")
 
             epi_processor_2 = EPIPostPostProcessor()
             epi_processor_2.set_param_config_files(sys_params_file_arg=sys_config,
@@ -781,7 +782,7 @@ class ProcessorEpidemiology(ProcessorBase):
 
         # check there is some output from EWS-plotting
         if not ews_plotting_output_globs:
-            self.logger.error('EWS-Plotting did not produce any output')
+            logger.error('EWS-Plotting did not produce any output')
             raise RuntimeError
 
         # provide to list for transfer
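Every hunk in ProcessorEpidemiology.py replaces per-instance `self.logger` calls with a module-level `logger`, matching the `logger = logging.getLogger(__name__)` line added near the top of the module. This is the stock `logging` idiom: one logger per module, named after the module, so configuration applied to the `coordinator` hierarchy reaches it and plain functions (with no `self`) can log too. A minimal, self-contained sketch of the idiom:

```python
import logging

# Named after the module, so it slots into the logging hierarchy
# ("coordinator.ProcessorEpidemiology" when imported from the package).
logger = logging.getLogger(__name__)


class Demo:
    def run(self):
        # No logger state on the instance; methods and module-level
        # functions share the same logger.
        logger.info('started run()')
        try:
            raise ValueError('boom')
        except ValueError:
            # logger.exception logs at ERROR level and appends the current
            # traceback, which is why the except blocks above use it.
            logger.exception('Unexpected error in Demo')


if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO)
    Demo().run()
```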
diff --git a/coordinator/ProcessorServer.py b/coordinator/ProcessorServer.py
index 82a3b39e5149f764a497ba8571cb08091ee91e1f..a59fc1ef32957a9d2a54ca70647142ca85eb78f4 100644
--- a/coordinator/ProcessorServer.py
+++ b/coordinator/ProcessorServer.py
@@ -1,165 +1,19 @@
 #ProcessorServer.py
 '''Functions to communicate with server sites for download and upload.'''
 
-import datetime
 import logging
-import os
 import tarfile
 from pathlib import Path
 from string import Template
-from typing import List
 
 from iris import load
 from iris.cube import CubeList
 
-from ProcessorUtils import (
-    add_filters_to_sublogger,
-    endScript,
-    open_and_check_config,
-    remove_path_from_tar_members,
-    subprocess_and_log
-)
+from coordinator.ProcessorUtils import subprocess_and_log, remove_path_from_tar_members
+
 
 logger = logging.getLogger(__name__)
 
-# def process_pre_job_server_download(input_args: dict):
-#     '''This is set up for environmental suitability v2.0 and deposition.
-#     Returns a boolean as to whether the job is ready for full processing.'''
-#
-#     logger.info('started process_pre_job_willow_download()')
-#
-#     # Check if there is a file available on willow
-#     logger.debug('Checking for file(s) on remote server')
-#
-#     config_paths: List[str] = input_args['config_paths']
-#     start_date: str = input_args['start_date']
-#     component: str = input_args['component']
-#
-#     for i,config_path in enumerate(config_paths):
-#
-#         config = open_and_check_config(config_path)
-#
-#         config['StartString'] = start_date
-#
-#         file_path = Template(config[component]['ServerPathTemplate']).substitute(**config)
-#         file_name = Template(config[component]['InputFileTemplate']).substitute(**config)
-#         logger.info(f"Checking for existence of {file_path}/{file_name}.tar.gz")
-#
-#         timenow = datetime.datetime.now(tz=datetime.timezone.utc).time()
-#
-#         server_name: str = config['ServerName']
-#         full_file_path = f"{file_path}/{file_name}.tar.gz"
-#
-#         """
-#         check the incoming met data tar exists and is a valid tar file (on either remote or local machine)
-#         """
-#         if server_name == "": # means the file is local
-#             data_is_ready = os.path.exists(full_file_path) and tarfile.is_tarfile(full_file_path)
-#         else:
-#             cmd_check_file = ["ssh", "-i", config['ServerKey'], "-o", "StrictHostKeyChecking=no", server_name,
-#                               f"test -f {full_file_path} && tar -tzf {full_file_path} >/dev/null"]
-#             run_in_shell: bool = False
-#
-#             description_short = 'subprocess_ssh'
-#             description_long = f"Checking for existence of {file_path}/{file_name}.tar.gz"
-#
-#             status = subprocess_and_log(cmd_check_file, description_short, description_long, check = False,
-#                                         shell = run_in_shell)
-#             data_is_ready = status.returncode == 0
-#
-#         if not data_is_ready:
-#
-#             # a time check in UTC. If it's late, raise warning, if very late, raise error
-#
-#             time_0 = config[component]['TimeExpectedAvailable']
-#             time_0 = datetime.datetime.strptime(time_0,'%H%M')
-#
-#             time_until_warn = datetime.timedelta(hours=4)
-#             time_until_error = datetime.timedelta(hours=5)
-#
-#             time_warn = (time_0 + time_until_warn).time()
-#             time_error = (time_0 + time_until_error).time()
-#
-#             message = f"Data not yet available for config {i+1} of {len(config_paths)}, expected between {time_0.time()} and {time_error} and long before {time_error}"
-#
-#             if timenow > time_error:
-#                 # job is not able to proceed
-#
-#                 logger.warning(message)
-#
-#                 return False
-#
-#             elif timenow > time_warn:
-#                 # job is not ready to proceed
-#
-#                 logger.warning(message)
-#                 endScript(premature=True)
-#
-#             else:
-#                 # some other problem with the job
-#
-#                 logger.info(message)
-#                 endScript(premature=True)
-#
-#         elif data_is_ready:
-#             logger.info(f"Data is available for config {i+1} of {len(config_paths)}, calculation shall proceed")
-#
-#     return True
-
-# def upload(config,FilesToSend,component):
-#
-#     usual_path = f"{config['StartString']}_0000/"
-#
-#     component_path = {
-#         'Environment' : usual_path,
-#         'Deposition' : usual_path,
-#         'Epidemiology' : usual_path,
-#         'Survey' : f"SURVEYDATA_{config['StartString']}_0000/",
-#         'Scraper' : usual_path,
-#         'MetResample' : f"MET_RESAMPLE_{config['StartString']}_0000/",
-#         'Advisory' : usual_path }
-#
-#
-#     # TODO: make path discern Daily or Weekly sub-directory
-#
-#     OutputServerPath = f"{config['ServerPath']}/{component_path[component]}"
-#
-#     logger.info(f"Trying upload to {config['ServerName']}:{OutputServerPath}")
-#
-#     logger.info(f"File(s) that will be put on remote server: {FilesToSend}")
-#
-#     if len(FilesToSend) == 0:
-#         logger.warning('No files to send, so skipping this task')
-#         raise IndexError
-#
-#     logger.debug("Making path directory on remote server if it doesn't already exist")
-#
-#     server_key = config['ServerKey']
-#     if server_key == "":
-#         ssh_cmd = f"mkdir -p {OutputServerPath}"
-#         run_in_shell: bool = True
-#     else:
-#         ssh_cmd = ["ssh", "-i", server_key, "-o", "StrictHostKeyChecking=no", config['ServerName'],
-#                    f"mkdir -p {OutputServerPath}"]
-#         run_in_shell: bool = False
-#
-#     description_short = 'upload ssh'
-#     description_long = 'make remote directory'
-#     subprocess_and_log(ssh_cmd, description_short, description_long, shell=run_in_shell)
-#
-#     logger.debug('Sending file(s) to remote server')
-#
-#     if server_key == "":
-#         scp_cmd = ["scp", "-r", *FilesToSend, OutputServerPath]
-#     else:
-#         scp_cmd = ["scp", "-ri", server_key, "-o", "StrictHostKeyChecking=no", *FilesToSend,
-#                    f"{config['ServerName']}:{OutputServerPath}"]
-#
-#     description_short = 'upload scp'
-#     description_long = 'scp files to remote directory'
-#     subprocess_and_log(scp_cmd, description_short, description_long)
-#
-#     return
 
 
 def get_data_from_server(jobPath,config,component):
diff --git a/coordinator/ProcessorSurveys.py b/coordinator/ProcessorSurveys.py
index 558fb7768dd68b5a21e81e2962f7d5a6e23e8333..99fffd1be3ddd6dc04b17402527f6c20818ea658 100644
--- a/coordinator/ProcessorSurveys.py
+++ b/coordinator/ProcessorSurveys.py
@@ -34,15 +34,15 @@ from numpy import all as np_all
 from numpy import any as np_any
 from pandas import read_csv, concat
 
-from ProcessorSurveysODK import get_ODK_form_as_csv
-from ProcessorSurveysODKSA import get_ODK_SA_form_as_csv
-from ProcessorSurveysODKv2 import get_ODKv2_form_as_csv
-from ProcessorSurveysWRSIS import get_WRSIS_form_as_csv
-from ProcessorSurveysWRT import get_WRT_form_as_csv
-from ProcessorSurveyskobotoolbox import get_kobotoolbox_form_as_csv
-from ProcessorSurveysnewODK import get_newODK_form_as_csv
-from ProcessorSurveysnewODK2 import get_newODK2_form_as_csv
-from processor_base import ProcessorBase
+from coordinator.ProcessorSurveysODK import get_ODK_form_as_csv
+from coordinator.ProcessorSurveysODKSA import get_ODK_SA_form_as_csv
+from coordinator.ProcessorSurveysODKv2 import get_ODKv2_form_as_csv
+from coordinator.ProcessorSurveysWRSIS import get_WRSIS_form_as_csv
+from coordinator.ProcessorSurveysWRT import get_WRT_form_as_csv
+from coordinator.ProcessorSurveyskobotoolbox import get_kobotoolbox_form_as_csv
+from coordinator.ProcessorSurveysnewODK import get_newODK_form_as_csv
+from coordinator.ProcessorSurveysnewODK2 import get_newODK2_form_as_csv
+from coordinator.processor_base import ProcessorBase
 
 from source_gen.clustering import run_case, logit
 
diff --git a/coordinator/ProcessorSurveysODK.py b/coordinator/ProcessorSurveysODK.py
index d81777ca5f782985d8a2f3a5b872a24fa7f3ce8b..74dec59871295b59e055dd65972c271d6cc9719f 100644
--- a/coordinator/ProcessorSurveysODK.py
+++ b/coordinator/ProcessorSurveysODK.py
@@ -4,18 +4,17 @@
 import datetime
 import logging
 import os
-from pathlib import Path
-from string import Template
 import subprocess
-
+from pathlib import Path
 from shutil import copyfile
+from string import Template
 
-from ProcessorUtils import (
-    subprocess_and_log,
-    endJob,
-    add_filters_to_sublogger,
+from coordinator.ProcessorUtils import (
+    subprocess_and_log,
+    endJob,
 )
 
+
 logger = logging.getLogger(__name__)
 
 def get_ODK_form_as_csv(form_credentials: dict, jobPath: str, config: dict, status):
diff --git a/coordinator/ProcessorSurveysODKSA.py b/coordinator/ProcessorSurveysODKSA.py
index 2d2f9e5803c6496a3a62cd9a20b04d213b789218..e79b525ece013a0fe46045699a7e772abc62a41f 100644
--- a/coordinator/ProcessorSurveysODKSA.py
+++ b/coordinator/ProcessorSurveysODKSA.py
@@ -5,20 +5,17 @@ formatting."""
 import datetime
 import logging
 import os
+import subprocess
 from csv import QUOTE_MINIMAL
 from pathlib import Path
+from shutil import copyfile
 from string import Template
-import subprocess
 
 from pandas import read_csv
-from shutil import copyfile
 
-from ProcessorSurveyUtils import parse_columns
-from ProcessorUtils import (
-    subprocess_and_log,
-    endJob,
-    add_filters_to_sublogger,
-)
+from coordinator.ProcessorSurveyUtils import parse_columns
+from coordinator.ProcessorUtils import subprocess_and_log, endJob
 
+
 logger = logging.getLogger(__name__)
 
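ProcessorSurveys.py imports one `get_*_form_as_csv` fetcher per survey source. Purely as an illustration of how such a family of fetchers is commonly wired up (this dispatch table is hypothetical, not code from the patch), and assuming the other fetchers share the `(form_credentials, jobPath, config, status)` signature shown for `get_ODK_form_as_csv` above:

```python
from coordinator.ProcessorSurveysODK import get_ODK_form_as_csv
from coordinator.ProcessorSurveyskobotoolbox import get_kobotoolbox_form_as_csv

# Hypothetical dispatch table: pick a fetcher from a source label in the
# job config instead of an if/elif chain over source types.
FORM_FETCHERS = {
    'ODK': get_ODK_form_as_csv,
    'kobotoolbox': get_kobotoolbox_form_as_csv,
}

def fetch_form_as_csv(source_type, credentials, job_path, config, status):
    return FORM_FETCHERS[source_type](credentials, job_path, config, status)
```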
diff --git a/coordinator/ProcessorSurveysODKv2.py b/coordinator/ProcessorSurveysODKv2.py
index fb1ba8ec1ba941b57667b579268a48f18fdbea52..bb1c677874af95e70b06d0b61cf9eb7d9cb2efe6 100644
--- a/coordinator/ProcessorSurveysODKv2.py
+++ b/coordinator/ProcessorSurveysODKv2.py
@@ -14,11 +14,9 @@ from pathlib import Path
 from pandas import read_csv
 from shutil import copyfile
 
-from ProcessorSurveyUtils import parse_columns
-from ProcessorUtils import (
-    endJob,
-    add_filters_to_sublogger,
-)
+from coordinator.ProcessorSurveyUtils import parse_columns
+from coordinator.ProcessorUtils import endJob
+
 
 logger = logging.getLogger(__name__)
 
diff --git a/coordinator/ProcessorSurveyskobotoolbox.py b/coordinator/ProcessorSurveyskobotoolbox.py
index 182e2232d119efad248f005dc64c6df4f1667963..2b39a9bf5ebd23c92560dde939118bce111fb65f 100644
--- a/coordinator/ProcessorSurveyskobotoolbox.py
+++ b/coordinator/ProcessorSurveyskobotoolbox.py
@@ -11,11 +11,9 @@ import requests
 from shutil import copyfile
 from pandas import DataFrame
 
-from ProcessorSurveyUtils import parse_columns
-from ProcessorUtils import (
-    endJob,
-    add_filters_to_sublogger,
-)
+from coordinator.ProcessorSurveyUtils import parse_columns
+from coordinator.ProcessorUtils import endJob
+
 
 logger = logging.getLogger(__name__)
 
diff --git a/coordinator/ProcessorSurveysnewODK.py b/coordinator/ProcessorSurveysnewODK.py
index a275a396b520c1a15eb51424e936db235a219304..040b46bd4feb0aeb26abcfa770371d378eb1cfa7 100644
--- a/coordinator/ProcessorSurveysnewODK.py
+++ b/coordinator/ProcessorSurveysnewODK.py
@@ -11,11 +11,9 @@ import requests
 from shutil import copyfile
 from pandas import DataFrame
 
-from ProcessorSurveyUtils import parse_columns
-from ProcessorUtils import (
-    endJob,
-    add_filters_to_sublogger,
-)
+from coordinator.ProcessorSurveyUtils import parse_columns
+from coordinator.ProcessorUtils import endJob
+
 
 logger = logging.getLogger(__name__)
 
diff --git a/coordinator/ProcessorSurveysnewODK2.py b/coordinator/ProcessorSurveysnewODK2.py
index afb015125f3bb7e73a76eb4a64845f3057ccd675..c29b0c8848d1027a3114a428044360bc3609f6b1 100644
--- a/coordinator/ProcessorSurveysnewODK2.py
+++ b/coordinator/ProcessorSurveysnewODK2.py
@@ -10,17 +10,14 @@ from shutil import copyfile
 
 import requests
 
-from ProcessorSurveyUtils import parse_columns
-from ProcessorSurveysnewODK import (
+from coordinator.ProcessorSurveyUtils import parse_columns
+from coordinator.ProcessorSurveysnewODK import (
     cases_incident,
     cases_severity,
     get_from_kobotoolbox,
     build_dataframe
 )
-from ProcessorUtils import (
-    endJob,
-)
-
+from coordinator.ProcessorUtils import endJob
 
 logger = logging.getLogger(__name__)
 
diff --git a/coordinator/ProcessorUtils.py b/coordinator/ProcessorUtils.py
index 17bb499af782ec0c0950cf7a67748f1a113e479c..fee2ea72e5fabc211e7568f60db6d600690bef5f 100644
--- a/coordinator/ProcessorUtils.py
+++ b/coordinator/ProcessorUtils.py
@@ -553,7 +553,9 @@ def setup_logging(job_file_path: str, is_live: bool, log_level: str):
         print(f"is_live = False, so removing email handler from logging config")
         log_config_dict['handlers'].pop('handler_buffered_email')
         for logger_key in log_config_dict['loggers'].keys():
-            log_config_dict['loggers'][logger_key]['handlers'].remove('handler_buffered_email')
+            handlers = log_config_dict['loggers'][logger_key]['handlers']
+            if 'handler_buffered_email' in handlers:
+                handlers.remove('handler_buffered_email')
 
     assert log_level in loglevels
     for logger_key in log_config_dict['loggers'].keys():
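The ProcessorUtils change above makes stripping the email handler idempotent: `list.remove()` raises `ValueError` when the value is absent, so any logger in the config that never listed `handler_buffered_email` would previously have crashed `setup_logging` in non-live runs. A standalone sketch of the failure mode and the guard:

```python
handlers = ['handler_console']

try:
    handlers.remove('handler_buffered_email')
except ValueError:
    print('list.remove raises when the value is missing')

# The guarded form from the patch is a no-op when the handler is absent.
if 'handler_buffered_email' in handlers:
    handlers.remove('handler_buffered_email')
```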
diff --git a/coordinator/processor_base.py b/coordinator/processor_base.py
index 98051fbc9d25ea772d576099dda0be5051c4c420..345c07d114465fcc67d98d3626b659c57871daed 100755
--- a/coordinator/processor_base.py
+++ b/coordinator/processor_base.py
@@ -20,6 +20,8 @@ from abc import abstractmethod, ABCMeta
 from typing import List, Union, Any, Dict
 
 from coordinator import ProcessorUtils
+from coordinator.ProcessorUtils import short_name, open_and_check_config, endScript, endJob, append_item_to_list, \
+    clear_up
 
 print("Make sure to `conda activate py3EWSepi` environment!")
 
@@ -37,18 +39,7 @@ import sys
 from flagdir import jobStatus # created by jws52
 
 # submodules of this project
-from ProcessorUtils import (
-    append_item_to_list,
-    clear_up,
-    endScript,
-    endJob,
-    open_and_check_config,
-    short_name,
-)
-
-"""
-Default logger - will be overridden by the Processor.setup_logging() method when called from the command line
-"""
+
 logger = logging.getLogger(__name__)
 
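A small style note on the new import in processor_base.py: it wraps the long line with a backslash continuation, while PEP 8 prefers parentheses for multi-line imports (and parenthesized lists are what the rest of the patch uses). An equivalent form would be:

```python
from coordinator.ProcessorUtils import (
    short_name, open_and_check_config, endScript, endJob,
    append_item_to_list, clear_up,
)
```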
diff --git a/tests/integration/partial/integration_test_utils.py b/tests/integration/partial/integration_test_utils.py
index 3ec85fe0ded48e6df10bd0983d8ad5c7b03787cc..d9fb9f77e4ce7d95f5ca20282d44fb112ffa7882 100644
--- a/tests/integration/partial/integration_test_utils.py
+++ b/tests/integration/partial/integration_test_utils.py
@@ -10,13 +10,13 @@ from zipfile import ZipFile
 
 from HTMLTestRunner import HTMLTestRunner
 
-import ProcessorUtils
-from Processor import Processor
+from coordinator.processor_base import ProcessorBase
 
 
 class IntegrationTestUtils:
 
     EMAIL_CRED_PATH: str = "../../test_data/test_deployment/envs/Cred_gmail.json"
+    LOGGING_CONFIG_PATH: str = "../../test_data/test_deployment/envs/test_log_config.json"
     DEFAULT_CONFIG_FILE_PATH: str = "../../test_data/test_deployment/regions/EastAfrica/resources/coordinator/configs/config_EastAfrica_fc_live.json"
     TEST_WORKSPACE_PATH: str = "../../test_data/test_deployment/regions/EastAfrica/workspace/"
     TEMP_CONFIG_FILE_NAME: str = None
@@ -174,7 +174,7 @@ class IntegrationTestUtils:
     @staticmethod
     def run_partial_integration_test_pipeline(component: str,
                                               start_date: str,
-                                              processor: Processor,
+                                              processor: ProcessorBase,
                                               **kwargs):
 
         """
@@ -204,17 +204,10 @@ class IntegrationTestUtils:
         for key, value in kwargs.items():
             args_dict[key] = value
 
-        # universal_config: dict = processor.build_universal_config(config_paths, component)
-        # workspacePath = universal_config['WorkspacePathout']
-        #
-        # job_path: str = f'{workspacePath}{ProcessorUtils.short_name[component]}_{start_date}'
-        # logPath = f"{job_path}/log.txt"
-        #
-        # is_live: bool = args_dict['live']
-        # processor.setup_logging(logPath, is_live)
-        #
-        # log_level = args_dict['log_level']
-        # processor.set_log_level(log_level)
+        # need EMAIL_CRED in the environment before we run a Processor
+        os.environ["EMAIL_CRED"] = IntegrationTestUtils.EMAIL_CRED_PATH
+        # need LOGGING_CONFIG_PATH in the environment before we run a Processor
+        os.environ["LOGGING_CONFIG"] = IntegrationTestUtils.LOGGING_CONFIG_PATH
 
         try:
             processor.run_process(args_dict)
@@ -225,7 +218,7 @@ class IntegrationTestUtils:
     @staticmethod
     def run_external_pipeline(component: str,
                               start_date: str,
-                              processor: Processor,
+                              processor: ProcessorBase,
                               **kwargs):
 
         args_dict: dict = {}
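integration_test_utils.py now publishes the credential and logging-config paths through environment variables immediately before `processor.run_process(...)`, replacing both the commented-out per-test logging setup and the per-test `EMAIL_CRED` assignment that test_advisory.py below no longer needs. A minimal sketch of the pattern, with a hypothetical `FakeProcessor` standing in for a real `ProcessorBase` subclass:

```python
import os


class FakeProcessor:
    def run_process(self, args):
        # A real processor reads these from the environment at startup.
        print(os.environ['EMAIL_CRED'], os.environ['LOGGING_CONFIG'], args)


# Publish configuration via the environment, then run; every test that goes
# through this helper gets the same credentials and logging setup.
os.environ['EMAIL_CRED'] = '../../test_data/test_deployment/envs/Cred_gmail.json'
os.environ['LOGGING_CONFIG'] = '../../test_data/test_deployment/envs/test_log_config.json'
FakeProcessor().run_process({'component': 'Advisory'})
```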
diff --git a/tests/integration/partial/test_advisory.py b/tests/integration/partial/test_advisory.py
index d8558cd7459895a0e0508f9fbc671c9225b5861c..a5df57a3a577e359f53047744f6d281ffd37231c 100644
--- a/tests/integration/partial/test_advisory.py
+++ b/tests/integration/partial/test_advisory.py
@@ -2,7 +2,7 @@ import copy
 import os
 import unittest
 
-from ProcessorAdvisory import ProcessorAdvisory
+from coordinator.ProcessorAdvisory import ProcessorAdvisory
 
 from integration.partial.integration_test_utils import IntegrationTestUtils
 from integration.test_suites.advisory_test_suite import BaseAdvisoryTestSuite
@@ -69,8 +69,6 @@ class TestAdvisory(BaseAdvisoryTestSuite.AdvisoryTestSuite):
     @staticmethod
     def run_advisory_pipeline():
         component = 'Advisory'
-        # need EMAIL_CRED in the environment before we create a Processor
-        os.environ["EMAIL_CRED"] = IntegrationTestUtils.EMAIL_CRED_PATH
         advisory_processor = ProcessorAdvisory()
         IntegrationTestUtils.run_partial_integration_test_pipeline(component,
                                                                    IntegrationTestUtils.TEST_START_DATE,
diff --git a/tests/integration/partial/test_deposition.py b/tests/integration/partial/test_deposition.py
index 96268cb9510e7a6d5fc71c4f0474a43b622d139f..66a6bdaaf5ed56c49cb81e592eecc9f089fa19c2 100644
--- a/tests/integration/partial/test_deposition.py
+++ b/tests/integration/partial/test_deposition.py
@@ -2,7 +2,7 @@ import copy
 import os
 import unittest
 
-from ProcessorDeposition import ProcessorDeposition
+from coordinator.ProcessorDeposition import ProcessorDeposition
 
 from integration.partial.integration_test_utils import IntegrationTestUtils
 from integration.test_suites.depo_test_suite import BaseDepoTestSuite
diff --git a/tests/integration/partial/test_env_suit.py b/tests/integration/partial/test_env_suit.py
index 843556a46f285d9c7ea66d71ed99b1a94bf6be38..3bb44c659a079d5122231d4be2c52bf8ff6c35e0 100644
--- a/tests/integration/partial/test_env_suit.py
+++ b/tests/integration/partial/test_env_suit.py
@@ -2,7 +2,7 @@ import copy
 import os
 import unittest
 
-from ProcessorEnvironment import ProcessorEnvironment
+from coordinator.ProcessorEnvironment import ProcessorEnvironment
 
 from integration.partial.integration_test_utils import IntegrationTestUtils
 from integration.test_suites.env_suit_test_suite import BaseEnvSuitTestSuite
diff --git a/tests/integration/partial/test_epi.py b/tests/integration/partial/test_epi.py
index a5433b9a0842265cd62a8d8437f58888cc563ed7..8f46c15f064d277d9789f7b7cdc4301d27cfeb4a 100644
--- a/tests/integration/partial/test_epi.py
+++ b/tests/integration/partial/test_epi.py
@@ -2,7 +2,7 @@ import copy
 import os
 import unittest
 
-from ProcessorEpidemiology import ProcessorEpidemiology
+from coordinator.ProcessorEpidemiology import ProcessorEpidemiology
 
 from integration.partial.integration_test_utils import IntegrationTestUtils
 from integration.test_suites.epi_test_suite import BaseEpiTestSuite
diff --git a/tests/integration/partial/test_survey.py b/tests/integration/partial/test_survey.py
index 0026f5206ed6771858d74b04ef95f4888b1fda53..816fd2e504995ff511effc43abce89a36effdb41 100644
--- a/tests/integration/partial/test_survey.py
+++ b/tests/integration/partial/test_survey.py
@@ -2,7 +2,7 @@ import copy
 import os
 import unittest
 
-from ProcessorSurveys import ProcessorSurveys
+from coordinator.ProcessorSurveys import ProcessorSurveys
 
 from integration.partial.integration_test_utils import IntegrationTestUtils
 from integration.test_suites.survey_test_suite import BaseSurveyTestSuite