From ce91e2767d26a87403443f14606efbb522e1a50a Mon Sep 17 00:00:00 2001 From: lb584 <lb584@cam.ac.uk> Date: Mon, 13 May 2024 15:37:03 +0100 Subject: [PATCH] big restructure and renaming of coordinator package file names use snake case consolidation of wnv suit pipeline code --- coordinator/ENVDataPostProcessor.py | 121 ------ coordinator/EnvSuitPipeline.py | 298 -------------- coordinator/NAMEPreProcessor.py | 96 ----- coordinator/ProcessorComponents.py | 48 --- coordinator/ProcessorEnvironment.py | 202 ---------- coordinator/ProcessorServer.py | 77 ---- coordinator/logs/.gitignore | 4 - ...essorAdvisory.py => processor_advisory.py} | 7 +- coordinator/processor_base.py | 34 +- ...rDeposition.py => processor_deposition.py} | 22 +- coordinator/processor_environment.py | 371 ++++++++++++++++++ ...demiology.py => processor_epidemiology.py} | 19 +- ...tResample.py => processor_met_resample.py} | 8 +- ...ocessorScraper.py => processor_scraper.py} | 7 - ...ocessorSurveys.py => processor_surveys.py} | 27 +- coordinator/survey_servers/__init__.py | 0 .../processor_survey_utils.py} | 0 .../processor_surveys_kobotoolbox.py} | 6 +- .../processor_surveys_new_odk.py} | 6 +- .../processor_surveys_new_odk2.py} | 8 +- .../processor_surveys_odk.py} | 6 +- .../processor_surveys_odk_sa.py} | 6 +- .../processor_surveys_odk_v2.py} | 6 +- .../processor_surveys_wrsis.py} | 6 +- .../processor_surveys_wrt.py} | 6 +- coordinator/utils/__init__.py | 0 .../buffering_smtp_handler.py} | 0 .../processor_utils.py} | 259 ++++-------- tests/integration/full/full_test_advisory.py | 8 +- .../integration/full/full_test_deposition.py | 2 +- tests/integration/full/full_test_env_suit.py | 2 +- tests/integration/full/full_test_epi.py | 6 +- tests/integration/full/full_test_survey.py | 2 +- tests/integration/partial/test_advisory.py | 2 +- tests/integration/partial/test_deposition.py | 2 +- tests/integration/partial/test_env_suit.py | 2 +- tests/integration/partial/test_epi.py | 2 +- tests/integration/partial/test_survey.py | 2 +- 38 files changed, 529 insertions(+), 1151 deletions(-) delete mode 100755 coordinator/ENVDataPostProcessor.py delete mode 100644 coordinator/EnvSuitPipeline.py delete mode 100644 coordinator/NAMEPreProcessor.py delete mode 100644 coordinator/ProcessorComponents.py delete mode 100644 coordinator/ProcessorEnvironment.py delete mode 100644 coordinator/ProcessorServer.py delete mode 100644 coordinator/logs/.gitignore rename coordinator/{ProcessorAdvisory.py => processor_advisory.py} (91%) rename coordinator/{ProcessorDeposition.py => processor_deposition.py} (90%) create mode 100644 coordinator/processor_environment.py rename coordinator/{ProcessorEpidemiology.py => processor_epidemiology.py} (98%) rename coordinator/{extra/ProcessorMetResample.py => processor_met_resample.py} (98%) rename coordinator/{ProcessorScraper.py => processor_scraper.py} (98%) rename coordinator/{ProcessorSurveys.py => processor_surveys.py} (94%) create mode 100644 coordinator/survey_servers/__init__.py rename coordinator/{ProcessorSurveyUtils.py => survey_servers/processor_survey_utils.py} (100%) rename coordinator/{ProcessorSurveyskobotoolbox.py => survey_servers/processor_surveys_kobotoolbox.py} (98%) rename coordinator/{ProcessorSurveysnewODK.py => survey_servers/processor_surveys_new_odk.py} (98%) rename coordinator/{ProcessorSurveysnewODK2.py => survey_servers/processor_surveys_new_odk2.py} (98%) rename coordinator/{ProcessorSurveysODK.py => survey_servers/processor_surveys_odk.py} (98%) rename 
coordinator/{ProcessorSurveysODKSA.py => survey_servers/processor_surveys_odk_sa.py} (98%) rename coordinator/{ProcessorSurveysODKv2.py => survey_servers/processor_surveys_odk_v2.py} (99%) rename coordinator/{ProcessorSurveysWRSIS.py => survey_servers/processor_surveys_wrsis.py} (98%) rename coordinator/{ProcessorSurveysWRT.py => survey_servers/processor_surveys_wrt.py} (98%) create mode 100644 coordinator/utils/__init__.py rename coordinator/{BufferingSMTPHandler.py => utils/buffering_smtp_handler.py} (100%) rename coordinator/{ProcessorUtils.py => utils/processor_utils.py} (68%) diff --git a/coordinator/ENVDataPostProcessor.py b/coordinator/ENVDataPostProcessor.py deleted file mode 100755 index 218ee4f..0000000 --- a/coordinator/ENVDataPostProcessor.py +++ /dev/null @@ -1,121 +0,0 @@ -#ENVDataPostProcessor.py -'''Gathers RIE_value data from any parts into one file, for EPI model to run. - -This will be needed if any more parts-based calculations of environmental -suitability are run without the ews_plotting routine. - -Note that gather_RIE_values_v1_1() is only needed for dates missing from -/storage/app/EWS/Ethiopia/Plotting/output . - -gather_RIE_values_v1_3() is used in operational. -''' -import argparse -from glob import glob -import os -from pathlib import Path - -import numpy as np -import pandas as pd - -# Create the parser -my_parser = argparse.ArgumentParser(description='Different optional arguments') - -# Add the arguments -my_parser.add_argument( - '-inpath','--inpath', - metavar = 'path', - type = str, - required = True, - default = '/home/jws52/projects/py-coordination/ENVIRONMENT_20200115/env_suit_wheatrust_20200115/StripeRustOutput/', - help = 'the directory path to all of the separate part calcs') - -my_parser.add_argument( - '-outpath','--outpath', - metavar = 'path', - type = str, - required = True, - default = '/home/jws52/projects/py-coordination/ENVIRONMENT_20200115/EPI/ENV', - help = 'the directory path to gather separate part calcs') - -my_parser.add_argument( - '-rusttype','--rusttype', - choices = ['stripe','stem','leaf'], - type = str, - required = True, - default = 'stripe', - help = 'the directory path to gather separate part calcs') - -args = my_parser.parse_args() - -print(f"input arguments are:\n{args}") - -assert os.path.exists(args.inpath) - -# make output directory -outdir = f'{args.outpath}/{args.rusttype.title()}Rust' - -Path(outdir).mkdir(parents=True, exist_ok=True) - -def gather_RIE_values_v1_1(envpath=args.inpath,outpath=args.outpath,rusttype=args.rusttype): - - envparts = sorted(glob(envpath+'part_*/*/RIE_value.csv')) - - pdparts = [pd.read_csv(fn) for fn in envparts] - - pdall = pd.concat( - pdparts, - ignore_index=False, - keys=['part1','part2','part3','part4','part5','part6']) - - pdall.rename({'Unnamed: 0':'X'},axis=1,inplace=True) - - pdall.to_csv(f'{outpath}/{rusttype}/test_RIE_value.csv',index=False) - - #df2 = read_csv('test_RIE_value.csv') - - return pdall - -def gather_RIE_values_v1_3(envpath=args.inpath,outpath=args.outpath,rusttype=args.rusttype): - - envparts = sorted(glob(envpath+'Region*/*0000/RIE_value.csv')) - - pdparts = [pd.read_csv(fn) for fn in envparts] - - pdall = pd.concat( - pdparts, - ignore_index=False, - keys=['part1','part2','part3','part4','part5','part6']) - - pdall.set_index('Unnamed: 0',inplace=True) - - print(pdall) - - pdall.index.name = None - - pdall.to_csv(f'{outdir}/RIE_value.csv') - - #df2 = read_csv('test_RIE_value.csv') - - return pdall - - -def test_case(): - inpath_default = 
'/home/jws52/projects/py-coordination/ENVIRONMENT_20200115/env_suit_wheatrust_20200115/StripeRustOutput/' - outpath_default = '/home/jws52/projects/py-coordination/ENVIRONMENT_20200115/EPI/ENV' - rusttype_default = 'stripe' - - df2 = gather_RIE_values(inpath_default,outpath_default,rusttype_default) - - # example case to reproduce - fn1 = '/storage/app/EWS/Ethiopia/Workspace/ENVIRONMENT_20200115/EPI/ENVIRONMENT/Stripe/RIE_value.csv' - df1 = pd.read_csv(fn1) - - print('testing') - assert np.allclose(df1,df2) - -if __name__ == '__main__': - - df2 = gather_RIE_values_v1_3(args.inpath,args.outpath,args.rusttype) - -print("Finished!") - diff --git a/coordinator/EnvSuitPipeline.py b/coordinator/EnvSuitPipeline.py deleted file mode 100644 index eb830f8..0000000 --- a/coordinator/EnvSuitPipeline.py +++ /dev/null @@ -1,298 +0,0 @@ -"""Managing the :class:`met_processing` pipeline for the environmental suitability run.""" - -import datetime as dt -import json -import logging -import os -import shutil -from string import Template - -import pandas as pd - -from met_processing.common.params_file_parser import ParamsFileParser -from met_processing.runner.common import job_runner - -MAX_WORKERS: int = 10 - -logging.basicConfig(level=logging.DEBUG) -logger = logging.getLogger('Processor.pipeline') - -############################################################################### - -def loadConfig(configFile): - '''Load a json config file.''' - - logger.info(f"Loading config file: {configFile}") - - try: - with open(configFile) as config_file: - config = json.load(config_file) - except: - logger.exception(f"Failure opening config file {configFile}") - raise - - return config - - -def getParameter(config, parameter): - '''Get a parameter from a config.''' - - logger.info(f"Getting {parameter} from the config") - - try: - result = config[parameter] - except: - logger.exception(f"No {parameter} in the config file!") - raise - - return result - - -def generate_temporal_points(file, datestr, timeresolution, nDaysForecast): - '''Create the time series excluding the end time.''' - - datenow = dt.datetime(int(datestr[:4]), int(datestr[4:6]), int(datestr[6:8]), 0, 0) - - starttime = datenow + dt.timedelta(hours=timeresolution) - endtime = datenow + dt.timedelta(days=nDaysForecast) + dt.timedelta(hours=timeresolution) - timediff = dt.timedelta(hours=timeresolution) - - timeseries = [] - curr = starttime - - outfile = open(file, "w") - # Generate the time series excluding the end time. 
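The loop below (retained verbatim in the new `processor_environment.py`) writes one `YYYYmmddHHMM` timestamp per line, stepping in `timeresolution`-hour increments and stopping before the end time. A minimal standalone sketch of the same logic, with illustrative arguments (the 3-hourly resolution and 7-day forecast length are values used elsewhere in this patch):

```python
# Standalone sketch of the temporal-points generation; the date string and step
# size are illustrative, matching the 3-hourly resolution used by run_pipeline().
import datetime as dt

def temporal_points(datestr: str, timeresolution: int, n_days: int) -> list:
    """Timestamps from datestr + timeresolution up to, but excluding, datestr + n_days + timeresolution."""
    base = dt.datetime.strptime(datestr, "%Y%m%d")
    start = base + dt.timedelta(hours=timeresolution)
    end = base + dt.timedelta(days=n_days, hours=timeresolution)
    step = dt.timedelta(hours=timeresolution)
    points = []
    curr = start
    while curr < end:  # end time excluded, as in the loop below
        points.append(curr.strftime("%Y%m%d%H%M"))
        curr += step
    return points

pts = temporal_points("20200115", 3, 7)
print(pts[0], pts[-1], len(pts))  # 202001150300 202001220000 56
```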
- while curr < endtime: - timeseries.append(curr) - outDateString = "{}\n".format(curr.strftime("%Y%m%d%H%M")) - outfile.write(outDateString) - curr += timediff - - outfile.close() - return outfile - - -def clean(workPath): - '''Clean temporary files and folders from the working directory.''' - - try: - logger.info(f"Clean temporary files and folders from {workPath}") - shutil.rmtree(workPath + 'extraction', ignore_errors=True) - shutil.rmtree(workPath + 'chunks', ignore_errors=True) - shutil.rmtree(workPath + 'post_processing', ignore_errors=True) - except: - logger.exception(f"Some failure when cleaning work directory", exc_info=True) - raise - - return - - -# def generate_all(sys_config, run_config): -# # Write run_config.json -# workPath = getParameter(run_config,'OUTPUT_DIR') -# run_configName = 'run_config.json' -# run_configFile = workPath + run_configName -# -# with open(run_configFile, 'w') as run_configJson: -# json.dump(run_config, run_configJson, indent=4) -# -# run_configJson.close() -# -# # Run all generate -# try: -# job_runner.generate_all_jobs(run_config, sys_config) -# except Exception: -# logger.exception(f"Some failure when running one of the generate job", exc_info=True) -# raise -# -# return - - -def run_extraction(run_params: dict, sys_params: dict): - '''Run weather data extraction with the :class:`met_processing.runner.common.job_runner` package.''' - - logger.info(f"Running regridding in multi process mode.") - job_runner.run_extraction(run_params, sys_params) - logger.info('Data extracted and chunked') - - -def run_post_processing(run_params: dict, sys_params: dict, processor_name: str): - '''Run post processing with the :class:`met_processing.runner.common.job_runner` package.''' - - logger.info(f"Running post-processing.") - job_runner.run_post_processing(run_params, sys_params, processor_name) - logger.info('Data post processing is completed') - - -# def run_merger(run_params: dict, sys_params: dict, processor_name: str): -# try: -# job_runner.run_merge_post_processing(run_params, sys_params, processor_name) -# except Exception: -# logger.exception(f"Some failure when running merge RIE", exc_info=True) -# raise - - -####################################### -#lawrence coment back to original (prevent_overwrite=True) -def run_pipeline(pipeline_config, region, dateString, extracted = False, prevent_overwrite = True): - ''' - Run the whole :class:`met_processing` pipeline for environmental suitability. - ''' - # The prevent_overwrite parameter can be set to False if you want to re-run a job in-place. 
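This module-level entry point is consolidated into a static method on `ProcessorEnvironment` later in this patch, and the unused `extracted` flag is dropped from the call. A hedged before/after sketch of the call site (the arguments are placeholders for the values built in `process_in_job_env2_0()`; it assumes the coordinator package is importable and is not runnable without a real pipeline config):

```python
# Illustration only: both call forms appear in the hunks of this patch; the
# surrounding job and config plumbing is omitted.

def run_env_suit_before(pipeline_config: dict, region: str, start_string: str) -> None:
    from coordinator import EnvSuitPipeline  # module deleted by this patch
    EnvSuitPipeline.run_pipeline(pipeline_config, region, start_string, extracted=False)

def run_env_suit_after(pipeline_config: dict, region: str, start_string: str) -> None:
    from coordinator.processor_environment import ProcessorEnvironment  # added by this patch
    ProcessorEnvironment.run_pipeline(pipeline_config, region, start_string)
```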
- - # Get parameters from the config - resourcesPath = getParameter(pipeline_config,'RESOURCES_PATH') - workPath = getParameter(pipeline_config,'WORK_PATH') + 'ENVIRONMENT_2.0_' + dateString + '/' - if not os.path.exists(workPath): - os.makedirs(workPath) - - inPath = getParameter(pipeline_config,'INPUT_PATH') - outPath = getParameter(pipeline_config,'OUTPUT_PATH') - - runType = getParameter(pipeline_config,'RUN_TYPE') - nDayExtraction = getParameter(pipeline_config,'EXTRACTION_DAYS') - nDayForecast = getParameter(pipeline_config,'FORECAST_DAYS') - - sys_config_file = getParameter(pipeline_config,'SYS_CONFIG') - sys_config = loadConfig(sys_config_file) - - templateName = 'template_' + runType + '_config.json' - template_configFile = resourcesPath + 'configs/' + templateName - config = loadConfig(template_configFile) - - # Before writing any files, check the output path doesn't exist already - # We might expect outPath to exist already, but not the processed subfolder - region_outPath = os.path.join(outPath,'ENVIRONMENT_2.0_'+dateString,'processed',region) - if prevent_overwrite: assert not os.path.exists(region_outPath) - - # Generate extraction (input) and output temporal points files - timeresolution = 3 # hours - - extraction_temporal_points_file = workPath + 'extraction_temporal_points.csv' - try: - logger.info(f"Generate extraction temporal points to: {extraction_temporal_points_file}") - if prevent_overwrite: assert not os.path.exists(extraction_temporal_points_file) - generate_temporal_points(extraction_temporal_points_file, dateString, timeresolution, nDayExtraction) - except: - logger.exception(f"Some failure when generate {extraction_temporal_points_file}", exc_info=True) - # extraction_temporal_points = pd.read_csv(extraction_temporal_points_file) - - output_temporal_points_file = workPath + 'output_temporal_points.csv' - try: - logger.info(f"Generate output temporal points to: {output_temporal_points_file}") - if prevent_overwrite: assert not os.path.exists(output_temporal_points_file) - generate_temporal_points(output_temporal_points_file, dateString, timeresolution, nDayForecast) - except: - logger.exception(f"Some failure when generate {output_temporal_points_file}", exc_info=True) - output_temporal_points = pd.read_csv(output_temporal_points_file) - temporal_dim = output_temporal_points.shape[0] - - # Modify run_config - config[ParamsFileParser.TIMEPOINTS_FILE_KEY] = extraction_temporal_points_file - config[ParamsFileParser.OUTPUT_DIR_KEY] = workPath - # config[ParamsFileParser.SPATIAL_POINTS_FILE_KEY] = input_spatial_points_file - # note that this field will only get used by the ewa5_met_data_extraction code, which uses a multi-processor module - config[ParamsFileParser.MAX_PROCESSORS_KEY] = MAX_WORKERS - - config['FIELD_NAME_CONSTANTS_PATH'] = getParameter(pipeline_config,'FIELD_NAME_CONSTANTS') - - if (runType == 'operational'): - config['NCDF_DIR_PATH'] = inPath + 'ENVIRONMENT_2.0_' + dateString + '/NAME_Met_as_netcdf/' - else: - config['NCDF_DIR_PATH'] = inPath - - ## START RUNS #################### - # os.chdir(workPath) - - N_processor = range(len(config['POST_PROCESSING']['PROCESSORS'])) - logger.info(f"Find {N_processor} processor") - - for p in N_processor: - processor_name = config['POST_PROCESSING']['PROCESSORS'][p]['PROCESSOR_NAME'] - if processor_name != 'RIE': - config['POST_PROCESSING']['PROCESSORS'][p]['TIMEPOINTS_FILE_PATH'] = extraction_temporal_points_file - - # Clean if extraction is not done - # if (extracted == False): - # clean(workPath) - - # 
generate_all(sys_config, config) - - # Extract - # if (extracted == False): - # run_extraction(config, sys_config) - # extracted = True - - logger.info(f"Starting {processor_name} post processor ---------------------------------") - run_post_processing(config, sys_config, processor_name) - - # run_merger(config, sys_config, processor_name) - else: - strains = getParameter(pipeline_config, 'STRAINS') - - config['POST_PROCESSING']['PROCESSORS'][p]['TIMEPOINTS_FILE_PATH'] = output_temporal_points_file - - for strain in strains: - # Modify strain specific suitability parts of the config - if pipeline_config['PARAMS'][strain]['future_steps'] > 0: - for i in range(len(config['POST_PROCESSING']['PROCESSORS'][p]['FUTURE_FIELDS'])): - config['POST_PROCESSING']['PROCESSORS'][p]['FUTURE_FIELDS'][i]['DURATION'] = pipeline_config['PARAMS'][strain]['future_steps'] - else: - for i in range(len(config['POST_PROCESSING']['PROCESSORS'][p]['FUTURE_FIELDS'])): - config['POST_PROCESSING']['PROCESSORS'][p]['FUTURE_FIELDS'][i]['ENABLED'] = "FALSE" - - config['POST_PROCESSING']['PROCESSORS'][p]['PARAMS']['suitability_modules'] = pipeline_config['PARAMS'][strain]['suitability_modules'] - config['POST_PROCESSING']['PROCESSORS'][p]['PARAMS']['thresholds'] = pipeline_config['PARAMS'][strain]['thresholds'] - - # Clean if extraction is not done - # if (extracted == False): - # clean(workPath) - - # generate_all(sys_config, config) - - # Extract - # if (extracted == False): - # run_extraction(config, sys_config) - # extracted = True - - logger.info(f"Starting {strain} suitability ---------------------------------") - envSuitPath = workPath + 'post_processing/RIE/' - run_post_processing(config, sys_config, processor_name) - - # run_merger(config, sys_config, processor_name) - - resultFile = envSuitPath + 'RIE.nc' - strain_outPath = os.path.join(region_outPath,strain) - strain_outFile = strain_outPath + '/RIE_value.nc' - - # Check results dimension - # result = pd.read_csv(resultFile) - # result_dims = result.shape - - """ - read in the input_spatial_points.csv file and get the number of spatial points - this is used for - sense-checking the dimensions of the output - """ - # todo we could swap the spatial points file for a file specifying the expected dimensions - much smaller - region_spatial_points_file = resourcesPath + 'assets/' + 'input_spatial_points_' + region + '.csv' - spatial_points = pd.read_csv(region_spatial_points_file) - spatial_dim = spatial_points.shape[0] - - # if ((result_dims[0] != spatial_dim) or (result_dims[1] != (temporal_dim + 4))): # + 4 required because there are extra columns in the result file - # logger.error(f"Result dimension {result_dims} does not match with the expected: ({spatial_dim}, {temporal_dim + 4})") - # raise IndexError - - if not os.path.exists(strain_outPath): - os.makedirs(strain_outPath) - - shutil.copy(resultFile,strain_outFile) - - # todo - Add a flag to this part of the code to enable/disable csv writing as an option - # resultCSVFile = envSuitPath + 'RIE.csv' - # if os.path.isfile(resultCSVFile): - # strain_outFile = strain_outPath + '/RIE_value.csv' - # shutil.copy(resultCSVFile,strain_outFile) - - logger.info(f"{strain} result successfully created and moved to {strain_outPath}/") - - logger.info('SUCCESSFULLY FINISHED') diff --git a/coordinator/NAMEPreProcessor.py b/coordinator/NAMEPreProcessor.py deleted file mode 100644 index bb74ac6..0000000 --- a/coordinator/NAMEPreProcessor.py +++ /dev/null @@ -1,96 +0,0 @@ -import os -import sys - -import cftime -import iris.util 
-from iris.cube import CubeList, Cube - -""" -THIS IS NO LONGER USED NOW WE ARE PROCESSING THE .NC FILES SUPPLIED BY THE MET OFFICE -DELETE AFTER A "COOLING OFF" PERIOD -""" - -def save_cube(cube: iris.cube.Cube, - sanitised_field_name: str, - year_string: str, - month_string: str, - output_path: str): - - os.makedirs(output_path, exist_ok = True) - - file_name = "{}-no_part-{}_{}.nc".format(sanitised_field_name, year_string, month_string) - nc_file_path = os.path.join(output_path, file_name) - - print("writing {}".format(nc_file_path)) - iris.save(cube, nc_file_path) - - # nc_zip_file_path = "{}.zip".format(nc_file_path) - # print("zipping to {}".format(nc_zip_file_path)) - # with ZipFile(nc_zip_file_path, 'w', zipfile.ZIP_DEFLATED) as zip: - # zip.write(nc_file_path, basename(nc_file_path)) - # - # print("deleting {}".format(nc_file_path)) - # os.remove(nc_file_path) - - -def process_met_office_NAME(name_file_wildcard: str, out_path: str): - - cube_list = iris.load_raw(name_file_wildcard) - - # get all the field names from the cubes as a unique set - field_names: set = set() - - for cube in cube_list: - field_names.add(cube.name()) - - # write a cube for each calendar month for each field type - for field_name in field_names: - - field_name: str = field_name - print(field_name) - cube: Cube = cube_list.extract(iris.Constraint(name = field_name)) - - for month in range(1, 13): - - month_constraint = iris.Constraint(time = lambda cell: cell.point.month == month) - - cubes_for_month: CubeList = cube.extract(month_constraint) - - if len(cubes_for_month) != 0: - cube_for_month: Cube = cubes_for_month.merge_cube() - - # hack, the 'cell_methods' field is NonType and this gets the save method of iris upset, - # so setting to an empty Tuple - cube_for_month.cell_methods = () - - time_coord = cube_for_month.coord('time') - sample_date = time_coord.units.num2date(time_coord.points[0]) - year_string = cftime.datetime.strftime(sample_date, "%Y") - month_string = cftime.datetime.strftime(sample_date, "%m") - # year_string = cftime.real_datetime.strftime(sample_date, "%Y") - # month_string = cftime.real_datetime.strftime(sample_date, "%m") - - # get rid of funny characters, for filename - sanitised_field_name = field_name.replace("(", "") - sanitised_field_name = sanitised_field_name.replace(")", "") - sanitised_field_name = sanitised_field_name.replace("/", "_") - - save_cube(cube_for_month, - sanitised_field_name, - year_string, - month_string, - out_path) - - -if __name__ == '__main__': - - """ - example usage: - /media/scratch/lb584_scratch/projects/ews/input/eth/MET_20200707/WR_EnvSuit_Met_Ethiopia_20200707/*.nc - /media/scratch/lb584_scratch/projects/ews/output/name_processing/another_dir - """ - - _name_file_wildcard = sys.argv[1] - _out_path = sys.argv[2] - - process_met_office_NAME(_name_file_wildcard, _out_path) diff --git a/coordinator/ProcessorComponents.py b/coordinator/ProcessorComponents.py deleted file mode 100644 index 268a6bd..0000000 --- a/coordinator/ProcessorComponents.py +++ /dev/null @@ -1,48 +0,0 @@ -#ProcessorComponents.py -'''Contains imports of all the specific functions to process survey data, -environmental suitability, spore deposition and epidemiology. These functions -are handled by Processor.py .''' - -import logging -import os -from typing import List - -# submodules of this project -# All of the process_* functions are callable from config files for the three -# coordinator stages: pre, in (during) and plotting. 
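For the record, the NAME pre-processor removed above grouped raw cubes by field name and wrote one NetCDF file per calendar month. A condensed sketch of that iris extraction, assuming iris is installed as the deleted module did; the output naming here is simplified and the paths are placeholders:

```python
# Condensed sketch of the monthly split performed by the deleted NAMEPreProcessor.py.
import os
import iris
from iris.cube import CubeList

def split_by_month(name_file_wildcard: str, out_path: str) -> None:
    cube_list = iris.load_raw(name_file_wildcard)
    field_names = {cube.name() for cube in cube_list}
    os.makedirs(out_path, exist_ok=True)
    for field_name in field_names:
        cubes = cube_list.extract(iris.Constraint(name=field_name))
        for month in range(1, 13):
            monthly: CubeList = cubes.extract(iris.Constraint(time=lambda cell: cell.point.month == month))
            if len(monthly) == 0:
                continue
            cube = monthly.merge_cube()
            cube.cell_methods = ()  # iris save chokes on None cell_methods, as noted in the original
            iris.save(cube, os.path.join(out_path, f"{field_name.replace('/', '_')}_{month:02d}.nc"))
```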
- - -#from ProcessorServer import ( - # process_pre_job_server_download, - #upload -#) - -from ProcessorUtils import ( - add_filters_to_sublogger, - # query_past_successes -) - -# TODO: Replace subprocess scp and ssh commands with paramiko.SSHClient() instance - -logger = logging.getLogger(__name__) - -# get path to this script -script_path = os.path.dirname(__file__)+'/' - -coordinator_path = script_path - -def do_nothing(*args, **kwargs) -> List: - '''Dummy function''' - - logger.info('Called do_nothing(). Nothing to do here') - - pass - return [] - -def do_nothing_return_true(*args, **kwargs) -> bool: - '''Dummy function''' - - logger.info('Called do_nothing_return_true(). Nothing to do here') - - pass - return True \ No newline at end of file diff --git a/coordinator/ProcessorEnvironment.py b/coordinator/ProcessorEnvironment.py deleted file mode 100644 index 0a26b95..0000000 --- a/coordinator/ProcessorEnvironment.py +++ /dev/null @@ -1,202 +0,0 @@ -#ProcessorEnvironment.py -'''Functions to process the environment component.''' - -import logging -import os -from distutils.dir_util import copy_tree -from glob import glob -from pathlib import Path - -from coordinator import EnvSuitPipeline -from coordinator.ProcessorServer import ( - get_data_from_server, -) -from coordinator.ProcessorUtils import add_filters_to_sublogger, short_name, get_only_existing_globs -from coordinator.processor_base import ProcessorBase -from ews_postprocessing.environmental_suitability.env_suit_post_processor import EnvSuitPostProcessor -from ews_postprocessing.utils.disease_info import EnvSuitDiseaseInfo - - -logger = logging.getLogger(__name__) - -class ProcessorEnvironment(ProcessorBase): - - def set_component_logger(self): - logger = logging.getLogger('Processor.Environment') - add_filters_to_sublogger(logger) - - def process_pre_job(self, args): - # return process_pre_job_server_download(args) - return True - - def process_in_job(self, jobPath, status, configjson, component) -> object: - return self.process_in_job_env2_0(jobPath, status, configjson, component) - - - def process_post_job(self, jobPath, configjson) -> [str]: - return self.process_EWS_plotting_env2_0(jobPath, configjson) - - - def __init__(self) -> None: - super().__init__() - - def process_in_job_env2_0(self, jobPath,status,config,component) -> dict: - '''Download met data from remote, prepare data, and run :class:`EnvSuitPipeline` pipeline.''' - - logger.info('started process_in_job_env2_0()') - - # Initialise output variable - proc_out = {} - # Output files available for upload - proc_out['output'] = None - # Processing files available for clearing - proc_out['clearup'] = [] - - logger.info('Copying file from remote server to job directory') - - data_result = get_data_from_server(jobPath,config,component) - - if data_result['output'] is not None: - proc_out['output'] = data_result['output'] - - proc_out['clearup'] += data_result['clearup'] - - region = config['RegionName'] - - logger.info(f"Calling environmental suitability 2.0 for {region} so wait for output to appear") - - pipeline_config = config["Environment"] - try: - #todo lawrence comment this back to original (extracted=False) - EnvSuitPipeline.run_pipeline(pipeline_config, region, config["StartString"], extracted=False) - except: - logger.exception(f"Some failure when running EnvSuitPipeline.py") - raise - - logger.info('Finished running environmental suitability 2.0') - - # TODO: Check that the output appears as expected - - return proc_out - - def 
process_copy_past_job_env2_0(self, jobPath,status,config,component): - '''For when we want to skip process_in_job() to test the other components of - this script. Currently hard-wired.''' - - # TODO: remove this hard-wired assumption - jobPath_to_copy = f"{jobPath}/../{short_name['Environment']}_{config['StartString']}_bak/" - - assert os.path.exists(jobPath_to_copy) - - dir_src = f"{jobPath_to_copy}/processed/" - - dir_dst = f"{jobPath}/processed/" - - logger.info(f"Copying from {dir_src}") - - logger.info(f"to {dir_dst}") - - copy_tree(dir_src,dir_dst) - - logger.info('Copying complete') - - proc_out = {} - # Output files available for upload - proc_out['output'] = None - # Processing files available for clearing - proc_out['clearup'] = None - - return proc_out - - '''class EWSPlottingEnvSuit(EWSPlottingEnvSuitBase): - - def set_custom_params(self, - sys_params_dict: dict, - chart_params_dict: dict, - run_params_dict: dict, - disease_csv_template_arg: str, - diseases: List[EnvSuitDiseaseInfo]): - # this is unique to the asia/east africa env suit, as we are not filtering within country boundaries - run_params_dict[RUN_PARAMS.FILTER_FOR_COUNTRY_KEY] = "False"''' - - #TODO test if this works - def process_EWS_plotting_env2_0(self, jobPath,config) -> [str]: - '''Configures the plotting arguments and calls EWS-plotting as a python module. - Returns a list of output files for transfer.''' - - logger.info('started process_EWS_plotting_env2_0()') - - main_region = config['RegionName'] - - input_dir = f"{jobPath}/processed/{main_region}" - - subregions = config['SubRegionNames'] - - ews_plotting_output_globs = [] - - # work on each region - for region in subregions: - - output_dir = f"{jobPath}/plotting/{region.lower()}" - rie_template_dir = input_dir + "/{DISEASE_DIR}/RIE_value.nc" - - Path(output_dir).mkdir(parents=True, exist_ok=True) - - sys_config = config['Environment']['EWS-Plotting']['SysConfig'] - run_config = config['Environment']['EWS-Plotting']['RunConfig'] - chart_config = config['Environment']['EWS-Plotting'][region]['ChartConfig'] - filter_for_country = config['Environment']['EWS-Plotting'][region]['FilterForCountry'] - - # Note that this runs all disease types available - - logger.info(f"Running EWS-Plotting with the following configs:\n{sys_config}\n{run_config}\n{chart_config}") - - env_suit_processor = EnvSuitPostProcessor() - env_suit_processor.set_param_config_files(sys_params_file_arg = sys_config, - chart_params_file_arg = chart_config, - run_params_file_arg = run_config, - es_output_dir_arg = output_dir, - issue_date_arg = config['StartString'], - disease_nc_template_arg = rie_template_dir) - - env_suit_processor.run_params.FILTER_FOR_COUNTRY = (filter_for_country.upper() == "TRUE") - - # Include further diseases in plotting. In this case the irrigated suitabilite for the rusts. 
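The list just below registers three extra "temp-only" disease variants with the post-processor. One entry shown standalone for reference; the meaning of each argument is inferred from how it is used in this hunk, so treat the annotations as assumptions:

```python
# Illustration of a single extra-disease entry (assumes ews_postprocessing is importable).
from ews_postprocessing.utils.disease_info import EnvSuitDiseaseInfo

rie_template_dir = "/path/to/processed/Region/{DISEASE_DIR}/RIE_value.nc"  # placeholder path
extra = EnvSuitDiseaseInfo(
    "Stem rust temp-only",   # display name
    "stem_rust_temponly",    # identifier (assumed: used for output naming)
    "20200115",              # issue date, i.e. config['StartString']
    "StemRust_TempOnly",     # assumed: substituted for {DISEASE_DIR} in the template path
    rie_template_dir,
)
```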
- # TODO: move this part out into a config - extra_diseases = [ - EnvSuitDiseaseInfo("Stem rust temp-only", "stem_rust_temponly", config['StartString'], "StemRust_TempOnly", rie_template_dir), - EnvSuitDiseaseInfo("Leaf rust temp-only", "leaf_rust_temponly", config['StartString'], "LeafRust_TempOnly", rie_template_dir), - EnvSuitDiseaseInfo("Stripe rust temp-only", "stripe_temponly", config['StartString'], "StripeRust_TempOnly", rie_template_dir), - ] - - env_suit_processor.add_diseases(diseases=extra_diseases) - - env_suit_processor.process() - - # check the output - ews_plotting_output_dir = f"{output_dir}/images/" - #EWSPlottingOutputGlobs += [ - # # daily plots - # f"{EWSPlottingOutputDir}Daily/suitability_{region.lower()}_*_rust_daily_20*.png", - # # weekly plots - # f"{EWSPlottingOutputDir}Weekly/suitability_{region.lower()}_*_rust_total_20*.png"] - - ews_plotting_output_globs = [f"{ews_plotting_output_dir}*"] - - # check the output - ews_plotting_output_globs = get_only_existing_globs(ews_plotting_output_globs,inplace=False) - - # check there is some output from EWS-plotting - if not ews_plotting_output_globs: - logger.error('EWS-Plotting did not produce any output') - raise RuntimeError - - # provide list for transfer - ews_plotting_outputs: [str] = sorted([file for glob_str in ews_plotting_output_globs for file in glob(glob_str)]) - - return ews_plotting_outputs - - -if __name__ == '__main__': - processor = ProcessorEnvironment() - processor.run_processor("Environment") diff --git a/coordinator/ProcessorServer.py b/coordinator/ProcessorServer.py deleted file mode 100644 index a59fc1e..0000000 --- a/coordinator/ProcessorServer.py +++ /dev/null @@ -1,77 +0,0 @@ -#ProcessorServer.py -'''Functions to communicate with server sites for download and upload.''' - -import logging -import tarfile -from pathlib import Path -from string import Template - -from iris import load -from iris.cube import CubeList - -from coordinator.ProcessorUtils import subprocess_and_log, remove_path_from_tar_members - - -logger = logging.getLogger(__name__) - - -def get_data_from_server(jobPath,config,component): - - logger.info('Copying file from remote server to job directory') - - # Initialise output variable - proc_out = {} - # Output files available for upload - proc_out['output'] = None - # Processing files available for clearing - proc_out['clearup'] = [] - - file_path = Template(config[component]['ServerPathTemplate']).substitute(**config) - file_name = Template(config[component]['InputFileTemplate']).substitute(**config) - - #TODO: check if file exists already (may be the case for multiple configs in one) - - # TODO: perform ssh file transfer in python instead of subprocess - server_name: str = config['ServerName'] - if server_name == "": - cmd_scp: list = ["scp", f"{file_path}/{file_name}.tar.gz", jobPath] - else: - cmd_scp: list = ["scp", "-i", config['ServerKey'], "-o", "StrictHostKeyChecking=no", - f"{server_name}:{file_path}/{file_name}.tar.gz", jobPath] - - description_short = 'env2 scp' - description_long = 'Copying file from remote server to job directory' - # lawrence comment in/out - subprocess_and_log(cmd_scp,description_short, description_long) - - logger.info('untarring the input file') - - # untar incoming name data - output_directory = f"{jobPath}/NAME_Met_as_netcdf" - Path(output_directory).mkdir(parents=True, exist_ok=True) - tarfile_name = f"{jobPath}/{file_name}.tar.gz" - with tarfile.open(tarfile_name) as tar: - members = remove_path_from_tar_members(tar) - 
tar.extractall(output_directory, members = members) - - # basic check that contents are as expected for 7-day forecast (57 timepoints in all files) - cube_wildcard = f"{output_directory}/*.nc" - cubes: CubeList = load(cube_wildcard) - - # land_fraction and topography will only have a single timepoint (as these dont change over time), so we can ignore - # these when sense-checking the expected number of timepoints - ignore_list = ["LAND_FRACTION", "TOPOGRAPHY"] - - for cube in cubes: - var_name = cube.name() - coord = cube.coord("time") - timepoint_count = coord.shape[0] - if timepoint_count != 57 and var_name not in ignore_list: - msg = f"Unexpected number of timepoints ({timepoint_count}) in cube {cube.name()}" - logger.error(msg) - raise RuntimeError(msg) - - # tar file can be removed if required - proc_out['clearup'] += [f"{jobPath}/{file_name}.tar.gz",] - - return proc_out diff --git a/coordinator/logs/.gitignore b/coordinator/logs/.gitignore deleted file mode 100644 index 28db78b..0000000 --- a/coordinator/logs/.gitignore +++ /dev/null @@ -1,4 +0,0 @@ -# Ignore everything in this directory -.gitignore -# Except this file -!.gitignore diff --git a/coordinator/ProcessorAdvisory.py b/coordinator/processor_advisory.py similarity index 91% rename from coordinator/ProcessorAdvisory.py rename to coordinator/processor_advisory.py index 5ec7c1b..816cca2 100644 --- a/coordinator/ProcessorAdvisory.py +++ b/coordinator/processor_advisory.py @@ -6,17 +6,12 @@ import logging # gitlab projects # TODO: Package these projects so they are robust for importing from AdvisoryBuilder import DataGatherer # created by jws52 -from coordinator.ProcessorUtils import add_filters_to_sublogger, short_name +from coordinator.utils.processor_utils import short_name from coordinator.processor_base import ProcessorBase class ProcessorAdvisory(ProcessorBase): - def set_component_logger(self): - logger = logging.getLogger('Processor.Advisory') - add_filters_to_sublogger(logger) - - def process_pre_job(self, args) -> bool: # return self.process_pre_job_advisory(args) return True diff --git a/coordinator/processor_base.py b/coordinator/processor_base.py index 345c07d..f099b95 100755 --- a/coordinator/processor_base.py +++ b/coordinator/processor_base.py @@ -17,10 +17,10 @@ or:: $ ./run_Processor.sh -p Deposition --islive --config config_Nepal_template.json --noupload -s 20200715 ''' from abc import abstractmethod, ABCMeta -from typing import List, Union, Any, Dict +from typing import List -from coordinator import ProcessorUtils -from coordinator.ProcessorUtils import short_name, open_and_check_config, endScript, endJob, append_item_to_list, \ +from coordinator.utils import processor_utils +from coordinator.utils.processor_utils import short_name, open_and_check_config, end_script, end_job, append_item_to_list, \ clear_up @@ -179,7 +179,7 @@ class ProcessorBase: except: logger.exception(f"Failure in opening or checking config {configFile}") - endScript(premature = True) + end_script(premature = True) for key in keys: @@ -196,7 +196,7 @@ class ProcessorBase: if len(universal_config[key]) > 1: logger.error(f"Config files point to multiple {key} but this script can only handle one.") - endScript(premature = True) + end_script(premature = True) universal_config[key] = universal_config[key].pop() @@ -244,7 +244,7 @@ class ProcessorBase: """ Now we have the job_dir defined, we can set up the logging """ - ProcessorUtils.setup_logging(log_file_path, is_live, args['log_level']) + processor_utils.setup_logging(log_file_path, 
is_live, args['log_level']) logger.info("==========") logger.info(f"Logging started at {datetime.datetime.now().strftime('%Y %b %d %H:%M:%S')}") @@ -268,7 +268,7 @@ class ProcessorBase: if status.had_initial_status: logger.info(f"Job path already exists and has status {status.status}") - endScript(premature = status.status not in ['SUCCESS', 'INPROGRESS']) + end_script(premature = status.status not in ['SUCCESS', 'INPROGRESS']) logger.info(f"Current status of job directory is {status.status}") @@ -278,7 +278,7 @@ class ProcessorBase: f"Process_pre_job raised an error so making a record in the job file. For details, see earlier warnings log") status.reset('ERROR') - endJob(status, ignore_inprogress = True, premature = False) + end_job(status, ignore_inprogress = True, premature = False) # files and directories that will be earmarked for removal after a # successful job @@ -292,7 +292,7 @@ class ProcessorBase: logger.info(f'Working on config {configIndex + 1} of {config_paths_length}') try: - configjson = open_and_check_config(configtemplate) + configjson: dict = open_and_check_config(configtemplate) except: logger.exception(f"Failure in opening or checking config {configtemplate}") # TODO: This case should test flagdir.jobStatus.__exit__() @@ -320,7 +320,7 @@ class ProcessorBase: except: logger.exception(f"Error in process_in_job") status.reset('ERROR') - endJob(status, premature = True) + end_job(status, premature = True) # Set default case # This would be improved by implementing a class structure @@ -344,7 +344,7 @@ class ProcessorBase: except: logger.exception(f"Error in {proc_description}()") status.reset('ERROR') - endJob(status, premature = True) + end_job(status, premature = True) logger.info('Finished with EWS-Plotting, appending images to list for transfer') @@ -362,7 +362,7 @@ class ProcessorBase: clear_up(paths_to_clear, clearup_dest = clearup_dest_dir) - endScript(premature = False) + end_script(premature = False) @abstractmethod def process_pre_job(self, args) -> bool: @@ -373,15 +373,7 @@ class ProcessorBase: raise NotImplementedError @abstractmethod - def process_post_job(self, jobPath, configjson) -> [str]: - raise NotImplementedError - - @abstractmethod - def set_component_logger(self): - """ - overridden in sub classes to set component-specific loggers - :return: - """ + def process_post_job(self, jobPath, configjson): raise NotImplementedError def run_processor(self, component: str): diff --git a/coordinator/ProcessorDeposition.py b/coordinator/processor_deposition.py similarity index 90% rename from coordinator/ProcessorDeposition.py rename to coordinator/processor_deposition.py index cd0bb8d..e042c8d 100644 --- a/coordinator/ProcessorDeposition.py +++ b/coordinator/processor_deposition.py @@ -10,8 +10,8 @@ from string import Template import iris from iris.cube import CubeList -from coordinator import ProcessorUtils -from coordinator.ProcessorUtils import subprocess_and_log, get_only_existing_globs +from coordinator.utils import processor_utils +from coordinator.utils.processor_utils import subprocess_and_log, get_only_existing_globs from coordinator.processor_base import ProcessorBase from ews_postprocessing.deposition.deposition_post_processor import DepositionPostProcessor @@ -31,8 +31,8 @@ class ProcessorDeposition(ProcessorBase): return self.process_in_job_dep(jobPath, status, configjson, component) - def process_post_job(self, jobPath, configjson) -> [str]: - return self.process_EWS_plotting_dep(jobPath, configjson) + def process_post_job(self, jobPath, 
configjson): + self.process_EWS_plotting_dep(jobPath, configjson) def __init__(self) -> None: @@ -42,12 +42,6 @@ class ProcessorDeposition(ProcessorBase): """ LIFECYCLE FUNCTIONS INHERITED FROM PROCESSOR.PY """ - def set_component_logger(self): - # logger = logging.getLogger('Processor.Deposition') - # add_filters_to_sublogger(logger) - pass - - def process_in_job_dep(self, jobPath, status, config, component): logger.info('started process_in_job_dep()') @@ -166,12 +160,6 @@ class ProcessorDeposition(ProcessorBase): # check the output ews_plotting_output_dir = f"{output_dir}/images/" - #EWSPlottingOutputGlobs += [ - # # daily plots - # f"{EWSPlottingOutputDir}Daily/deposition_{region.lower()}_*_daily_20*.png", - # # weekly plots - # f"{EWSPlottingOutputDir}Weekly/deposition_{region.lower()}_*_total_20*.png"] - ews_plotting_output_globs += [f"{ews_plotting_output_dir}*"] ews_plotting_output_globs = get_only_existing_globs(ews_plotting_output_globs, inplace = False) @@ -189,7 +177,7 @@ class ProcessorDeposition(ProcessorBase): if __name__ == '__main__': print(__name__) - print(ProcessorUtils.__name__) + print(processor_utils.__name__) processor = ProcessorDeposition() processor.run_processor("Deposition") diff --git a/coordinator/processor_environment.py b/coordinator/processor_environment.py new file mode 100644 index 0000000..bb842bb --- /dev/null +++ b/coordinator/processor_environment.py @@ -0,0 +1,371 @@ +#ProcessorEnvironment.py +'''Functions to process the environment component.''' + +import datetime as dt +import json +import logging +import os +import shutil +from glob import glob +from pathlib import Path + +import pandas as pd + +# from coordinator.env_suit import env_suit_pipeline +from coordinator.processor_base import ProcessorBase +from coordinator.utils.processor_utils import get_only_existing_globs, get_input_data +from ews_postprocessing.environmental_suitability.env_suit_post_processor import EnvSuitPostProcessor +from ews_postprocessing.utils.disease_info import EnvSuitDiseaseInfo +from met_processing.common.params_file_parser import ParamsFileParser + + +logger = logging.getLogger(__name__) + +class ProcessorEnvironment(ProcessorBase): + + def process_pre_job(self, args) -> bool: + return True + + def process_in_job(self, job_path, status, configjson, component) -> object: + return self.process_in_job_env2_0(job_path, status, configjson, component) + + + def process_post_job(self, job_path, configjson): + self.process_ews_plotting(job_path, configjson) + + + def __init__(self) -> None: + super().__init__() + + def process_in_job_env2_0(self, jobPath,status,config,component) -> dict: + '''Download met data from remote, prepare data, and run :class:`EnvSuitPipeline` pipeline.''' + + logger.info('started process_in_job_env2_0()') + + # Initialise output variable + proc_out = {} + # Output files available for upload + proc_out['output'] = None + # Processing files available for clearing + proc_out['clearup'] = [] + + logger.info('Copying file from remote server to job directory') + + data_result = get_input_data(jobPath, config, component) + + if data_result['output'] is not None: + proc_out['output'] = data_result['output'] + + proc_out['clearup'] += data_result['clearup'] + + region = config['RegionName'] + + logger.info(f"Calling environmental suitability 2.0 for {region} so wait for output to appear") + + pipeline_config = config["Environment"] + try: + #todo lawrence comment this back to original (extracted=False) + self.run_pipeline(pipeline_config, region, 
config["StartString"]) + except: + logger.exception(f"Some failure when running EnvSuitPipeline.py") + raise + + logger.info('Finished running environmental suitability 2.0') + + # TODO: Check that the output appears as expected + + return proc_out + + + @staticmethod + def loadConfig(configFile): + '''Load a json config file.''' + + logger.info(f"Loading config file: {configFile}") + + try: + with open(configFile) as config_file: + config = json.load(config_file) + except: + logger.exception(f"Failure opening config file {configFile}") + raise + + return config + + + @staticmethod + def getParameter(config, parameter): + '''Get a parameter from a config.''' + + logger.info(f"Getting {parameter} from the config") + + try: + result = config[parameter] + except: + logger.exception(f"No {parameter} in the config file!") + raise + + return result + + + @staticmethod + def generate_temporal_points(file, datestr, timeresolution, nDaysForecast): + '''Create the time series excluding the end time.''' + + datenow = dt.datetime(int(datestr[:4]), int(datestr[4:6]), int(datestr[6:8]), 0, 0) + + starttime = datenow + dt.timedelta(hours = timeresolution) + endtime = datenow + dt.timedelta(days = nDaysForecast) + dt.timedelta(hours = timeresolution) + timediff = dt.timedelta(hours = timeresolution) + + timeseries = [] + curr = starttime + + outfile = open(file, "w") + # Generate the time series excluding the end time. + while curr < endtime: + timeseries.append(curr) + outDateString = "{}\n".format(curr.strftime("%Y%m%d%H%M")) + outfile.write(outDateString) + curr += timediff + + outfile.close() + return outfile + + + @staticmethod + def clean(workPath): + '''Clean temporary files and folders from the working directory.''' + + try: + logger.info(f"Clean temporary files and folders from {workPath}") + shutil.rmtree(workPath + 'extraction', ignore_errors = True) + shutil.rmtree(workPath + 'chunks', ignore_errors = True) + shutil.rmtree(workPath + 'post_processing', ignore_errors = True) + except: + logger.exception(f"Some failure when cleaning work directory", exc_info = True) + raise + + return + + + @staticmethod + def run_post_processing(run_params: dict, sys_params: dict, processor_name: str): + '''Run post processing with the :class:`met_processing.runner.common.job_runner` package.''' + + logger.info(f"Running post-processing.") + from met_processing.runner.common import job_runner + job_runner.run_post_processing(run_params, sys_params, processor_name) + logger.info('Data post processing is completed') + + + @staticmethod + def run_pipeline(pipeline_config, region, dateString, prevent_overwrite = True): + ''' + Run the whole :class:`met_processing` pipeline for environmental suitability. + ''' + # The prevent_overwrite parameter can be set to False if you want to re-run a job in-place. 
+ + # Get parameters from the config + resourcesPath = ProcessorEnvironment.getParameter(pipeline_config, 'RESOURCES_PATH') + workPath = ProcessorEnvironment.getParameter(pipeline_config, 'WORK_PATH') + 'ENVIRONMENT_2.0_' + dateString + '/' + if not os.path.exists(workPath): + os.makedirs(workPath) + + inPath = ProcessorEnvironment.getParameter(pipeline_config, 'INPUT_PATH') + outPath = ProcessorEnvironment.getParameter(pipeline_config, 'OUTPUT_PATH') + + runType = ProcessorEnvironment.getParameter(pipeline_config, 'RUN_TYPE') + nDayExtraction = ProcessorEnvironment.getParameter(pipeline_config, 'EXTRACTION_DAYS') + nDayForecast = ProcessorEnvironment.getParameter(pipeline_config, 'FORECAST_DAYS') + + sys_config_file = ProcessorEnvironment.getParameter(pipeline_config, 'SYS_CONFIG') + sys_config = ProcessorEnvironment.loadConfig(sys_config_file) + + templateName = 'template_' + runType + '_config.json' + template_configFile = resourcesPath + 'configs/' + templateName + config = ProcessorEnvironment.loadConfig(template_configFile) + + # Before writing any files, check the output path doesn't exist already + # We might expect outPath to exist already, but not the processed subfolder + region_outPath = os.path.join(outPath, 'ENVIRONMENT_2.0_' + dateString, 'processed', region) + if prevent_overwrite: assert not os.path.exists(region_outPath) + + # Generate extraction (input) and output temporal points files + timeresolution = 3 # hours + + extraction_temporal_points_file = workPath + 'extraction_temporal_points.csv' + try: + logger.info(f"Generate extraction temporal points to: {extraction_temporal_points_file}") + if prevent_overwrite: assert not os.path.exists(extraction_temporal_points_file) + ProcessorEnvironment.generate_temporal_points(extraction_temporal_points_file, dateString, timeresolution, nDayExtraction) + except: + logger.exception(f"Some failure when generate {extraction_temporal_points_file}", exc_info = True) + # extraction_temporal_points = pd.read_csv(extraction_temporal_points_file) + + output_temporal_points_file = workPath + 'output_temporal_points.csv' + try: + logger.info(f"Generate output temporal points to: {output_temporal_points_file}") + if prevent_overwrite: assert not os.path.exists(output_temporal_points_file) + ProcessorEnvironment.generate_temporal_points(output_temporal_points_file, dateString, timeresolution, nDayForecast) + except: + logger.exception(f"Some failure when generate {output_temporal_points_file}", exc_info = True) + output_temporal_points = pd.read_csv(output_temporal_points_file) + temporal_dim = output_temporal_points.shape[0] + + # Modify run_config + config[ParamsFileParser.TIMEPOINTS_FILE_KEY] = extraction_temporal_points_file + config[ParamsFileParser.OUTPUT_DIR_KEY] = workPath + # config[ParamsFileParser.SPATIAL_POINTS_FILE_KEY] = input_spatial_points_file + + config['FIELD_NAME_CONSTANTS_PATH'] = ProcessorEnvironment.getParameter(pipeline_config, 'FIELD_NAME_CONSTANTS') + + if (runType == 'operational'): + config['NCDF_DIR_PATH'] = inPath + 'ENVIRONMENT_2.0_' + dateString + '/NAME_Met_as_netcdf/' + else: + config['NCDF_DIR_PATH'] = inPath + + ## START RUNS #################### + + N_processor = range(len(config['POST_PROCESSING']['PROCESSORS'])) + logger.info(f"Find {N_processor} processor") + + for p in N_processor: + processor_name = config['POST_PROCESSING']['PROCESSORS'][p]['PROCESSOR_NAME'] + if processor_name != 'RIE': + config['POST_PROCESSING']['PROCESSORS'][p]['TIMEPOINTS_FILE_PATH'] = extraction_temporal_points_file + + 
logger.info(f"Starting {processor_name} post processor ---------------------------------") + ProcessorEnvironment.run_post_processing(config, sys_config, processor_name) + + # run_merger(config, sys_config, processor_name) + else: + strains = ProcessorEnvironment.getParameter(pipeline_config, 'STRAINS') + + config['POST_PROCESSING']['PROCESSORS'][p]['TIMEPOINTS_FILE_PATH'] = output_temporal_points_file + + for strain in strains: + # Modify strain specific suitability parts of the config + if pipeline_config['PARAMS'][strain]['future_steps'] > 0: + for i in range(len(config['POST_PROCESSING']['PROCESSORS'][p]['FUTURE_FIELDS'])): + config['POST_PROCESSING']['PROCESSORS'][p]['FUTURE_FIELDS'][i]['DURATION'] = \ + pipeline_config['PARAMS'][strain]['future_steps'] + else: + for i in range(len(config['POST_PROCESSING']['PROCESSORS'][p]['FUTURE_FIELDS'])): + config['POST_PROCESSING']['PROCESSORS'][p]['FUTURE_FIELDS'][i]['ENABLED'] = "FALSE" + + config['POST_PROCESSING']['PROCESSORS'][p]['PARAMS']['suitability_modules'] = \ + pipeline_config['PARAMS'][strain]['suitability_modules'] + config['POST_PROCESSING']['PROCESSORS'][p]['PARAMS']['thresholds'] = \ + pipeline_config['PARAMS'][strain]['thresholds'] + + logger.info(f"Starting {strain} suitability ---------------------------------") + envSuitPath = workPath + 'post_processing/RIE/' + ProcessorEnvironment.run_post_processing(config, sys_config, processor_name) + + # run_merger(config, sys_config, processor_name) + + resultFile = envSuitPath + 'RIE.nc' + strain_outPath = os.path.join(region_outPath, strain) + strain_outFile = strain_outPath + '/RIE_value.nc' + + """ + read in the input_spatial_points.csv file and get the number of spatial points - this is used for + sense-checking the dimensions of the output + """ + # todo we could swap the spatial points file for a file specifying the expected dimensions - much smaller + region_spatial_points_file = resourcesPath + 'assets/' + 'input_spatial_points_' + region + '.csv' + spatial_points = pd.read_csv(region_spatial_points_file) + spatial_dim = spatial_points.shape[0] + + if not os.path.exists(strain_outPath): + os.makedirs(strain_outPath) + + shutil.copy(resultFile, strain_outFile) + + # todo - Add a flag to this part of the code to enable/disable csv writing as an option + # resultCSVFile = envSuitPath + 'RIE.csv' + # if os.path.isfile(resultCSVFile): + # strain_outFile = strain_outPath + '/RIE_value.csv' + # shutil.copy(resultCSVFile,strain_outFile) + + logger.info(f"{strain} result successfully created and moved to {strain_outPath}/") + + logger.info('SUCCESSFULLY FINISHED') + + @staticmethod + def process_ews_plotting(jobPath, config) -> [str]: + '''Configures the plotting arguments and calls EWS-plotting as a python module. 
+ Returns a list of output files for transfer.''' + + logger.info('started process_EWS_plotting_env2_0()') + + main_region = config['RegionName'] + + input_dir = f"{jobPath}/processed/{main_region}" + + subregions = config['SubRegionNames'] + + ews_plotting_output_globs = [] + + # work on each region + for region in subregions: + + output_dir = f"{jobPath}/plotting/{region.lower()}" + rie_template_dir = input_dir + "/{DISEASE_DIR}/RIE_value.nc" + + Path(output_dir).mkdir(parents=True, exist_ok=True) + + sys_config = config['Environment']['EWS-Plotting']['SysConfig'] + run_config = config['Environment']['EWS-Plotting']['RunConfig'] + chart_config = config['Environment']['EWS-Plotting'][region]['ChartConfig'] + filter_for_country = config['Environment']['EWS-Plotting'][region]['FilterForCountry'] + + # Note that this runs all disease types available + + logger.info(f"Running EWS-Plotting with the following configs:\n{sys_config}\n{run_config}\n{chart_config}") + + env_suit_processor = EnvSuitPostProcessor() + env_suit_processor.set_param_config_files(sys_params_file_arg = sys_config, + chart_params_file_arg = chart_config, + run_params_file_arg = run_config, + es_output_dir_arg = output_dir, + issue_date_arg = config['StartString'], + disease_nc_template_arg = rie_template_dir) + + env_suit_processor.run_params.FILTER_FOR_COUNTRY = (filter_for_country.upper() == "TRUE") + + # Include further diseases in plotting. In this case the irrigated suitabilite for the rusts. + # TODO: move this part out into a config + extra_diseases = [ + EnvSuitDiseaseInfo("Stem rust temp-only", "stem_rust_temponly", config['StartString'], "StemRust_TempOnly", rie_template_dir), + EnvSuitDiseaseInfo("Leaf rust temp-only", "leaf_rust_temponly", config['StartString'], "LeafRust_TempOnly", rie_template_dir), + EnvSuitDiseaseInfo("Stripe rust temp-only", "stripe_temponly", config['StartString'], "StripeRust_TempOnly", rie_template_dir), + ] + + env_suit_processor.add_diseases(diseases=extra_diseases) + + env_suit_processor.process() + + # check the output + ews_plotting_output_dir = f"{output_dir}/images/" + + ews_plotting_output_globs = [f"{ews_plotting_output_dir}*"] + + # check the output + ews_plotting_output_globs = get_only_existing_globs(ews_plotting_output_globs,inplace=False) + + # check there is some output from EWS-plotting + if not ews_plotting_output_globs: + logger.error('EWS-Plotting did not produce any output') + raise RuntimeError + + # provide list for transfer + ews_plotting_outputs: [str] = sorted([file for glob_str in ews_plotting_output_globs for file in glob(glob_str)]) + + return ews_plotting_outputs + + +if __name__ == '__main__': + processor = ProcessorEnvironment() + processor.run_processor("Environment") diff --git a/coordinator/ProcessorEpidemiology.py b/coordinator/processor_epidemiology.py similarity index 98% rename from coordinator/ProcessorEpidemiology.py rename to coordinator/processor_epidemiology.py index 2333de8..69038c1 100644 --- a/coordinator/ProcessorEpidemiology.py +++ b/coordinator/processor_epidemiology.py @@ -25,12 +25,11 @@ from EpiModel.EpiPrep import prep from coordinator.processor_base import ProcessorBase from ews_postprocessing.epi.epi_post_processor import EPIPostPostProcessor -from coordinator.ProcessorUtils import ( +from coordinator.utils.processor_utils import ( calc_epi_date_range, open_and_check_config, get_only_existing_globs, - endJob, - add_filters_to_sublogger, + end_job, short_name, disease_latin_name_dict ) @@ -38,10 +37,6 @@ logger = 
logging.getLogger(__name__) class ProcessorEpidemiology(ProcessorBase): - def set_component_logger(self): - logger = logging.getLogger('Processor.Epi') - add_filters_to_sublogger(logger) - def process_pre_job(self, args) -> bool: return self.process_pre_job_epi(args) @@ -50,8 +45,8 @@ class ProcessorEpidemiology(ProcessorBase): return self.process_in_job_epi(jobPath, status, configjson, component) - def process_post_job(self, jobPath, configjson) -> [str]: - return self.process_EWS_plotting_epi(jobPath, configjson) + def process_post_job(self, jobPath, configjson): + self.process_EWS_plotting_epi(jobPath, configjson) def __init__(self) -> None: @@ -384,7 +379,7 @@ class ProcessorEpidemiology(ProcessorBase): logger.exception(f"Unexpected error in {component} data preparation") status.reset('ERROR') - endJob(status,premature=True) + end_job(status, premature=True) # configure filename of prepared environment data @@ -408,7 +403,7 @@ class ProcessorEpidemiology(ProcessorBase): logger.exception(f"Unexpected error in {component} data preparation") status.reset('ERROR') - endJob(status,premature=True) + end_job(status, premature=True) # prepare a copy of the host data @@ -486,7 +481,7 @@ class ProcessorEpidemiology(ProcessorBase): logger.exception(f"Unexpected error in {model_name} data preparation") status.reset('ERROR') - endJob(status,premature=True) + end_job(status, premature=True) # provide fundamental config elements to config_epi diff --git a/coordinator/extra/ProcessorMetResample.py b/coordinator/processor_met_resample.py similarity index 98% rename from coordinator/extra/ProcessorMetResample.py rename to coordinator/processor_met_resample.py index fb7d748..81e8f4f 100644 --- a/coordinator/extra/ProcessorMetResample.py +++ b/coordinator/processor_met_resample.py @@ -21,8 +21,9 @@ from EpiModel.EpiPrep import lister, loader, prep from EpiModel.EpiUtils import ( datetime_range, parse_template_string) -from coordinator.ProcessorUtils import add_filters_to_sublogger, calc_epi_date_range, short_name, open_and_check_config from coordinator.processor_base import ProcessorBase +from coordinator.utils.processor_utils import calc_epi_date_range, short_name, open_and_check_config + logger = logging.getLogger(__name__) @@ -46,11 +47,6 @@ class ProcessorMetResample(ProcessorBase): def __init__(self) -> None: super().__init__() - def set_component_logger(self): - logger = logging.getLogger('Processor.Extra.MetResample') - add_filters_to_sublogger(logger) - - def gather_data( self, config, diff --git a/coordinator/ProcessorScraper.py b/coordinator/processor_scraper.py similarity index 98% rename from coordinator/ProcessorScraper.py rename to coordinator/processor_scraper.py index 5e89c07..23ab96b 100644 --- a/coordinator/ProcessorScraper.py +++ b/coordinator/processor_scraper.py @@ -10,7 +10,6 @@ Asif Al Faisal (CIMMYT-Bangladesh).''' import datetime import json -import logging import os from pathlib import Path import requests @@ -27,18 +26,12 @@ from processor_base import ProcessorBase # TODO: Package these projects so they are robust for importing from flagdir import jobStatus # created by jws52 -from ProcessorUtils import add_filters_to_sublogger class ProcessorScraper(ProcessorBase): """ LIFECYCLE FUNCTIONS INHERITED FROM PROCESSOR.PY """ - - def set_component_logger(self): - logger = logging.getLogger('Processor.Scraper') - add_filters_to_sublogger(logger) - def process_pre_job(self, args): return True diff --git a/coordinator/ProcessorSurveys.py b/coordinator/processor_surveys.py similarity 
index 94% rename from coordinator/ProcessorSurveys.py rename to coordinator/processor_surveys.py index 6d81774..ce0d53f 100644 --- a/coordinator/ProcessorSurveys.py +++ b/coordinator/processor_surveys.py @@ -34,14 +34,14 @@ from numpy import all as np_all from numpy import any as np_any from pandas import read_csv, concat -from coordinator.ProcessorSurveysODK import get_ODK_form_as_csv -from coordinator.ProcessorSurveysODKSA import get_ODK_SA_form_as_csv -from coordinator.ProcessorSurveysODKv2 import get_ODKv2_form_as_csv -from coordinator.ProcessorSurveysWRSIS import get_WRSIS_form_as_csv -from coordinator.ProcessorSurveysWRT import get_WRT_form_as_csv -from coordinator.ProcessorSurveyskobotoolbox import get_kobotoolbox_form_as_csv -from coordinator.ProcessorSurveysnewODK import get_newODK_form_as_csv -from coordinator.ProcessorSurveysnewODK2 import get_newODK2_form_as_csv +from coordinator.survey_servers.processor_surveys_odk import get_ODK_form_as_csv +from coordinator.survey_servers.processor_surveys_odk_sa import get_ODK_SA_form_as_csv +from coordinator.survey_servers.processor_surveys_odk_v2 import get_ODKv2_form_as_csv +from coordinator.survey_servers.processor_surveys_wrsis import get_WRSIS_form_as_csv +from coordinator.survey_servers.processor_surveys_wrt import get_WRT_form_as_csv +from coordinator.survey_servers.processor_surveys_kobotoolbox import get_kobotoolbox_form_as_csv +from coordinator.survey_servers.processor_surveys_new_odk import get_newODK_form_as_csv +from coordinator.survey_servers.processor_surveys_new_odk2 import get_newODK2_form_as_csv from coordinator.processor_base import ProcessorBase from source_gen.clustering import run_case @@ -50,11 +50,6 @@ logger = logging.getLogger(__name__) class ProcessorSurveys(ProcessorBase): - def set_component_logger(self): - # logger = logging.getLogger('Processor.Surveys') - # add_filters_to_sublogger(logger) - pass - def process_pre_job(self, args): # return self.process_pre_job_survey(args) return True @@ -82,12 +77,6 @@ class ProcessorSurveys(ProcessorBase): 'newODK2' : get_newODK2_form_as_csv, } - # def process_pre_job_survey(self, input_args): - # '''Returns a boolean as to whether the job is ready for full processing.''' - # logger.info('started process_pre_job_survey(), nothing to do') - # - # return True - def process_in_job_survey(self, jobPath,status,config,component): logger.info('started process_in_job_survey()') diff --git a/coordinator/survey_servers/__init__.py b/coordinator/survey_servers/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/coordinator/ProcessorSurveyUtils.py b/coordinator/survey_servers/processor_survey_utils.py similarity index 100% rename from coordinator/ProcessorSurveyUtils.py rename to coordinator/survey_servers/processor_survey_utils.py diff --git a/coordinator/ProcessorSurveyskobotoolbox.py b/coordinator/survey_servers/processor_surveys_kobotoolbox.py similarity index 98% rename from coordinator/ProcessorSurveyskobotoolbox.py rename to coordinator/survey_servers/processor_surveys_kobotoolbox.py index 2b39a9b..131b873 100644 --- a/coordinator/ProcessorSurveyskobotoolbox.py +++ b/coordinator/survey_servers/processor_surveys_kobotoolbox.py @@ -11,8 +11,8 @@ import requests from shutil import copyfile from pandas import DataFrame -from coordinator.ProcessorSurveyUtils import parse_columns -from coordinator.ProcessorUtils import endJob +from coordinator.surveys.processor_survey_utils import parse_columns +from coordinator.utils.processor_utils import end_job logger = 
logging.getLogger(__name__) @@ -226,7 +226,7 @@ def get_kobotoolbox_form_as_csv(form_credentials: dict, jobPath: str, config: di if not copy_success: logger.error(f"Failed get a suitable copy of survey data.") status.reset('ERROR') - endJob(status,premature=True) + end_job(status, premature=True) logger.warning(f"Using download from {past_jobPath}.") diff --git a/coordinator/ProcessorSurveysnewODK.py b/coordinator/survey_servers/processor_surveys_new_odk.py similarity index 98% rename from coordinator/ProcessorSurveysnewODK.py rename to coordinator/survey_servers/processor_surveys_new_odk.py index 040b46b..8374bc7 100644 --- a/coordinator/ProcessorSurveysnewODK.py +++ b/coordinator/survey_servers/processor_surveys_new_odk.py @@ -11,8 +11,8 @@ import requests from shutil import copyfile from pandas import DataFrame -from coordinator.ProcessorSurveyUtils import parse_columns -from coordinator.ProcessorUtils import endJob +from coordinator.surveys.processor_survey_utils import parse_columns +from coordinator.utils.processor_utils import end_job logger = logging.getLogger(__name__) @@ -252,7 +252,7 @@ def get_newODK_form_as_csv(form_credentials: dict, jobPath: str, config: dict, s if not copy_success: logger.error(f"Failed get a suitable copy of survey data.") status.reset('ERROR') - endJob(status,premature=True) + end_job(status, premature=True) logger.warning(f"Using download from {past_jobPath}.") diff --git a/coordinator/ProcessorSurveysnewODK2.py b/coordinator/survey_servers/processor_surveys_new_odk2.py similarity index 98% rename from coordinator/ProcessorSurveysnewODK2.py rename to coordinator/survey_servers/processor_surveys_new_odk2.py index c29b0c8..af46f68 100644 --- a/coordinator/ProcessorSurveysnewODK2.py +++ b/coordinator/survey_servers/processor_surveys_new_odk2.py @@ -10,14 +10,14 @@ from shutil import copyfile import requests -from coordinator.ProcessorSurveyUtils import parse_columns -from coordinator.ProcessorSurveysnewODK import ( +from coordinator.surveys.processor_survey_utils import parse_columns +from coordinator.surveys.processor_surveys_new_odk import ( cases_incident, cases_severity, get_from_kobotoolbox, build_dataframe ) -from coordinator.ProcessorUtils import endJob +from coordinator.utils.processor_utils import end_job logger = logging.getLogger(__name__) @@ -266,7 +266,7 @@ def get_newODK2_form_as_csv(form_credentials: dict, jobPath: str, config: dict, if not copy_success: logger.error(f"Failed get a suitable copy of survey data.") status.reset('ERROR') - endJob(status,premature=True) + end_job(status, premature=True) logger.warning(f"Using download from {past_jobPath}.") diff --git a/coordinator/ProcessorSurveysODK.py b/coordinator/survey_servers/processor_surveys_odk.py similarity index 98% rename from coordinator/ProcessorSurveysODK.py rename to coordinator/survey_servers/processor_surveys_odk.py index 74dec59..34780f6 100644 --- a/coordinator/ProcessorSurveysODK.py +++ b/coordinator/survey_servers/processor_surveys_odk.py @@ -9,9 +9,9 @@ from pathlib import Path from shutil import copyfile from string import Template -from coordinator.ProcessorUtils import ( +from coordinator.utils.processor_utils import ( subprocess_and_log, - endJob, + end_job, ) @@ -135,7 +135,7 @@ def get_ODK_form_as_csv(form_credentials: dict, jobPath: str, config: dict, stat if not ODK_copy_success: logger.error(f"Failed get a suitable copy of survey data.") status.reset('ERROR') - endJob(status,premature=True) + end_job(status, premature=True) logger.warning(f"Using ODK download from 
{past_jobPath}.") diff --git a/coordinator/ProcessorSurveysODKSA.py b/coordinator/survey_servers/processor_surveys_odk_sa.py similarity index 98% rename from coordinator/ProcessorSurveysODKSA.py rename to coordinator/survey_servers/processor_surveys_odk_sa.py index e79b525..8038c68 100644 --- a/coordinator/ProcessorSurveysODKSA.py +++ b/coordinator/survey_servers/processor_surveys_odk_sa.py @@ -13,8 +13,8 @@ from string import Template from pandas import read_csv -from coordinator.ProcessorSurveyUtils import parse_columns -from coordinator.ProcessorUtils import subprocess_and_log, endJob +from coordinator.surveys.processor_survey_utils import parse_columns +from coordinator.utils.processor_utils import subprocess_and_log, end_job logger = logging.getLogger(__name__) @@ -247,7 +247,7 @@ def get_ODK_SA_form_as_csv(form_credentials: dict, jobPath: str, config: dict, s if not ODK_copy_success: logger.error(f"Failed get a suitable copy of survey data.") status.reset('ERROR') - endJob(status,premature=True) + end_job(status, premature=True) logger.warning(f"Using ODK download from {past_jobPath}.") diff --git a/coordinator/ProcessorSurveysODKv2.py b/coordinator/survey_servers/processor_surveys_odk_v2.py similarity index 99% rename from coordinator/ProcessorSurveysODKv2.py rename to coordinator/survey_servers/processor_surveys_odk_v2.py index bb1c677..a6cd2e9 100644 --- a/coordinator/ProcessorSurveysODKv2.py +++ b/coordinator/survey_servers/processor_surveys_odk_v2.py @@ -14,8 +14,8 @@ from pathlib import Path from pandas import read_csv from shutil import copyfile -from coordinator.ProcessorSurveyUtils import parse_columns -from coordinator.ProcessorUtils import endJob +from coordinator.surveys.processor_survey_utils import parse_columns +from coordinator.utils.processor_utils import end_job logger = logging.getLogger(__name__) @@ -423,7 +423,7 @@ def get_ODKv2_form_as_csv(form_credentials: dict, jobPath: str, config: dict, st if not copy_success: logger.error(f"Failed get a suitable copy of survey data.") status.reset('ERROR') - endJob(status,premature=True) + end_job(status, premature=True) logger.warning(f"Using download from {past_jobPath}.") diff --git a/coordinator/ProcessorSurveysWRSIS.py b/coordinator/survey_servers/processor_surveys_wrsis.py similarity index 98% rename from coordinator/ProcessorSurveysWRSIS.py rename to coordinator/survey_servers/processor_surveys_wrsis.py index 2266757..fd60063 100644 --- a/coordinator/ProcessorSurveysWRSIS.py +++ b/coordinator/survey_servers/processor_surveys_wrsis.py @@ -11,8 +11,8 @@ import requests from shutil import copyfile from pandas import json_normalize -from coordinator.ProcessorSurveyUtils import parse_columns -from coordinator.ProcessorUtils import endJob +from coordinator.surveys.processor_survey_utils import parse_columns +from coordinator.utils.processor_utils import end_job logger = logging.getLogger(__name__) @@ -356,7 +356,7 @@ def get_WRSIS_form_as_csv(form_credentials: dict, jobPath: str, config: dict, st if not copy_success: logger.error(f"Failed get a suitable copy of survey data.") status.reset('ERROR') - endJob(status,premature=True) + end_job(status, premature=True) logger.warning(f"Using download from {past_jobPath}.") diff --git a/coordinator/ProcessorSurveysWRT.py b/coordinator/survey_servers/processor_surveys_wrt.py similarity index 98% rename from coordinator/ProcessorSurveysWRT.py rename to coordinator/survey_servers/processor_surveys_wrt.py index 984de3b..dd45eb2 100644 --- a/coordinator/ProcessorSurveysWRT.py +++ 
b/coordinator/survey_servers/processor_surveys_wrt.py @@ -11,8 +11,8 @@ import requests from shutil import copyfile from pandas import json_normalize -from coordinator.ProcessorSurveyUtils import parse_columns -from coordinator.ProcessorUtils import endJob +from coordinator.surveys.processor_survey_utils import parse_columns +from coordinator.utils.processor_utils import end_job logger = logging.getLogger(__name__) @@ -336,7 +336,7 @@ def get_WRT_form_as_csv(form_credentials: dict, jobPath: str, config: dict, stat if not copy_success: logger.error(f"Failed get a suitable copy of survey data.") status.reset('ERROR') - endJob(status,premature=True) + end_job(status, premature=True) logger.warning(f"Using download from {past_jobPath}.") diff --git a/coordinator/utils/__init__.py b/coordinator/utils/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/coordinator/BufferingSMTPHandler.py b/coordinator/utils/buffering_smtp_handler.py similarity index 100% rename from coordinator/BufferingSMTPHandler.py rename to coordinator/utils/buffering_smtp_handler.py diff --git a/coordinator/ProcessorUtils.py b/coordinator/utils/processor_utils.py similarity index 68% rename from coordinator/ProcessorUtils.py rename to coordinator/utils/processor_utils.py index da7e13a..202570f 100644 --- a/coordinator/ProcessorUtils.py +++ b/coordinator/utils/processor_utils.py @@ -19,6 +19,8 @@ import netCDF4 as nc import numpy as np import pandas as pd import cf_units +from iris import load +from iris.cube import CubeList from flagdir import jobStatus # created by jws52 @@ -104,7 +106,7 @@ def add_filters_to_sublogger(logger_case): h.addFilter(PasswordODKFilter()) -def open_and_check_config(configFile): +def open_and_check_config(configFile) -> dict: logger.info("Loading config") try: @@ -188,7 +190,7 @@ def append_item_to_list( else: logger.error(f"Misunderstood output of {description}()") status.reset('ERROR') - endJob(status,premature=True) + end_job(status, premature=True) return @@ -214,9 +216,7 @@ def clear_up( return -def endScript( - premature: bool = True, - ): +def end_script(premature: bool = True): if not premature: @@ -230,7 +230,7 @@ def endScript( logger.info(f'--------') sys.exit(exit_code) -def endJob(status,ignore_inprogress=False,**kwargs): +def end_job(status, ignore_inprogress=False, **kwargs): # get status file fresh, and ensure one exists status.status = status.get() @@ -243,7 +243,7 @@ def endJob(status,ignore_inprogress=False,**kwargs): status.reset('ERROR') - endScript(**kwargs) + end_script(**kwargs) def calc_epi_date_range(init_str, span_days = [0, 6]): '''Date range is determined relative to init_date. @@ -303,7 +303,7 @@ def query_proceed(necessary_file,description): logger.info(f"Failed to find:\n{necessary_file}\nso {description} job has not yet succeeded for this date, so cannot run this job.") - endScript(premature=True) + end_script(premature=True) return False @@ -321,179 +321,13 @@ def query_component_success(config_i,job_run: str, job_to_check: str): if 'AlternativeSuccessFileTemplate' not in config_i[job_run][job_to_check]: logger.warning(f"No AlternativeSuccessFileTemplate to check for") - endScript(premature=True) + end_script(premature=True) status_file_alt = Template(config_i[job_run][job_to_check]['AlternativeSuccessFileTemplate']).substitute(**config_i) query_proceed(status_file_alt,job_to_check.lower()) return True -def dataframe_to_series(df: pd.DataFrame) -> pd.Series: - """Reformate a pandas Dataframe into a pandas Series. 
- - The dataframe should have the following structure: - - first two columns: longitude and latitude - - following columns: timestamps in YYYYMMDDhhmm format (e.g. 202311211500) - - The resulting Series will have the following structure: - - indexes: time, longitude, latitude.""" - - # longitude and latitude as index - df=df.set_index(["longitude","latitude"]) - - # convert dataframe to Series by stacking - s=df.stack() - - # rename indexes - s.index.names=["longitude","latitude","time"] - - # reorder index - s=s.reorder_levels(["time","latitude","longitude"]) - - # sort by timestamps - s=s.sort_index(level=0) - logger.info("Dataframe reformated to Series") - - return s - -def create_bounds(coord): - """Create bounds for a coordinate based on the difference between two adjacent values. - Works for non equidistant coordinates as well.""" - - # calculate the difference between two adjacent values and divide by 2 - diff = np.diff(coord)/2 - - # insert the first and last value at the beginning and end of the array - diff_lower = np.insert(diff, 0, diff[0]) - diff_upper = np.append(diff, diff[-1]) - - # bounds are the coordinate values minus/plus the difference - bounds = np.zeros((len(coord), 2)) - bounds[:, 0] = coord - diff_lower - bounds[:, 1] = coord + diff_upper - - return bounds - -def check_cf(units: str) -> str: - """Check if units are CF convention compliant.""" - try: cf_units.Unit(units) - except: raise ValueError(f"Units \"{units}\" not CF convention compliant!") - return units - -def create_nc_file(s: pd.Series, filename: str, data_attr: dict, compression_type="zlib", complevel=9, least_significant_digit=None): - """Create a netcdf file from a pandas Series. - - The Series should have the following indexes: - - time: str - - latitude: float - - longitude: float - - Path to the netcdf file should be filename: str - - Dictionary data_attr should have the following structure: - { "short_name": str, - "long_name": str - "units": str, - "description": str} - - Optional arguments: - - compression_type: str, default="zlib" - - least_significant_digit: int, default=5 - """ - - ## Get coordinates ######################### - times=s.index.levels[0].values - # convert times to hours since 1970-01-01 00:00:00 - times=pd.to_datetime(times).astype(np.int64)//((10**9)*60*60) - - latitudes=s.index.levels[1].unique().values - longitudes=s.index.levels[2].unique().values - - ## Create bounds ########################### - lat_bnds=create_bounds(latitudes) - lon_bnds=create_bounds(longitudes) - - ## Create netcdf file ###################### - logger.info("Creating netcdf file") - # just to be safe, make sure dataset is not already open. 
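# A quick worked example of the create_bounds() helper above (removed by this
# patch): for a non-equidistant coordinate, each cell's bounds sit halfway
# between neighbouring points, so adjacent cells share an edge. Minimal,
# self-contained sketch; the coordinate values are illustrative only.
import numpy as np

coord = np.array([0.0, 0.5, 1.5])            # non-equidistant grid
diff = np.diff(coord) / 2                    # [0.25, 0.5]
lower = coord - np.insert(diff, 0, diff[0])  # [-0.25, 0.25, 1.0]
upper = coord + np.append(diff, diff[-1])    # [ 0.25, 1.0,  2.0]
bounds = np.stack([lower, upper], axis=1)    # same shape and values as create_bounds(coord) above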
- try: nc.Dataset(filename,"w",format="NETCDF4").close() - except: pass - - ncFile=nc.Dataset(filename,"w",format="NETCDF4") - - # create dimensions - ncFile.createDimension("time", None) - ncFile.createDimension("bnds",2) - ncFile.createDimension("latitude",len(latitudes)) - ncFile.createDimension("longitude",len(longitudes)) - - # create global attributes - ncFile.description=data_attr["description"] - ncFile.convension="CF-1.10" - - # create variables - time=ncFile.createVariable("time",np.int32,("time",)) - time.axis="T" - time.units="hours since 1970-01-01 00:00:00" - time.long_name="time" - time.calendar="gregorian" - - latitude_bnds=ncFile.createVariable("latitude_bnds",np.float32,("latitude","bnds",)) - latitude_bnds.long_name="latitude boundaries" - latitude_bnds.units="degrees_north" - - latitude=ncFile.createVariable("latitude",np.float32,("latitude",)) - latitude.axis="Y" - latitude.bounds="latitude_bnds" - latitude.units="degrees_north" - latitude.long_name="latitude" - latitude.standard_name="grid_latitude" - latitude.valid_min=latitudes.min().astype(np.float32) - latitude.valid_max=latitudes.max().astype(np.float32) - - longitude_bnds=ncFile.createVariable("longitude_bnds",np.float32,("longitude","bnds",)) - longitude_bnds.long_name="longitude boundaries" - longitude_bnds.units="degrees_east" - - longitude=ncFile.createVariable("longitude",np.float32,("longitude",)) - longitude.axis="X" - longitude.bounds="longitude_bnds" - longitude.units="degrees_east" - longitude.long_name="longitude" - longitude.standard_name="grid_longitude" - longitude.valid_min=longitudes.min().astype(np.float32) - longitude.valid_max=longitudes.max().astype(np.float32) - - latitude_longitude=ncFile.createVariable("latitude_longitude",np.int32) - latitude_longitude.grid_mapping_name="latitude_longitude" - latitude_longitude.longitude_of_prime_meridian=0.0 - latitude_longitude.earth_radius=6371229.0 - - - # create DATA variables - logger.info("Creating data variable") - DATA=ncFile.createVariable(data_attr["short_name"],np.float32,("time","latitude","longitude",), compression=compression_type, complevel=complevel, least_significant_digit=least_significant_digit) - DATA.short_name=data_attr["short_name"] - DATA.long_name=data_attr["long_name"] - DATA.units=check_cf(data_attr["units"]) - DATA.description=data_attr["description"] - DATA.grid_mapping="latitude_longitude" - DATA.coordinates="time latitude longitude" - - # write data to variables - time[:]=times - latitude_bnds[:]=lat_bnds - latitude[:]=latitudes - longitude_bnds[:]=lon_bnds - longitude[:]=longitudes - - # map data to DATA variable - DATA[:]=s.values.reshape(len(times),len(latitudes),len(longitudes)) - - ncFile.close() - logger.info(f"{filename} netCDF4 file created.") - return True - def setup_logging(job_file_path: str, is_live: bool, log_level: str): """ @@ -522,7 +356,8 @@ def setup_logging(job_file_path: str, is_live: bool, log_level: str): # if there is no value set for the LOGGING_CONFIG file, set a default path. 
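# The setup_logging() block below resolves the logging config path. The same idea
# in isolation: read the LOGGING_CONFIG environment variable, fall back to a
# packaged default, and hand the parsed JSON to logging.config.dictConfig. This is
# only a sketch; the default path here is a placeholder, not the project's layout.
import json
import logging.config
import os

def _load_log_config(default_path: str) -> dict:
    config_path = os.environ.get("LOGGING_CONFIG", default_path)
    with open(config_path) as fh:
        return json.load(fh)

# Example use (path is hypothetical):
# logging.config.dictConfig(_load_log_config("configs/logger/template_log_config.json"))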
if not 'LOGGING_CONFIG' in os.environ: - log_config_path_project = os.path.join(os.path.dirname(__file__), "../configs", "logger", "template_log_config.json") + log_config_path_project = os.path.join(os.path.dirname(__file__), + "../../../../../../media/scratch/lb584_scratch/projects/ews_local_prod/regions/EastAfrica/resources", "configs", "logger", "template_log_config.json") print(f"ENVIRONMENT VARIABLE 'LOGGING_CONFIG' IS NOT SET, UISING DEFAULT FILE - {log_config_path_project}") else: log_config_path_project = os.environ['LOGGING_CONFIG'] @@ -544,7 +379,8 @@ def setup_logging(job_file_path: str, is_live: bool, log_level: str): # if there is no value set for the central project logger in the json file, set a default path. if not log_config_dict['handlers']['handler_project']['filename']: # todo how does this work when there are several processors running at once, i.o. errors? - log_path_project = os.path.join(os.path.dirname(__file__), "logs", "log.txt") + log_path_project = os.path.join(os.path.dirname(__file__), + "../../../../../../media/scratch/lb584_scratch/projects/ews_local_prod/regions/EastAfrica/resources", "configs", "logger", "log.txt") print(f"'HANDLER_PROJECT' HAS NO 'FILENAME' SET TO SETTING TO {log_path_project}") log_config_dict['handlers']['handler_project']['filename'] = log_path_project @@ -569,3 +405,72 @@ def setup_logging(job_file_path: str, is_live: bool, log_level: str): logging.config.dictConfig(log_config_dict) logger.info("Logging setup complete") + + +def get_input_data(job_path, config, component): + + """ + copies input files to job dir, using file name templates defined in the config file + :param job_path: + :param config: + :param component: + :return: + """ + + logger.info('Copying file from remote server to job directory') + + # Initialise output variable + proc_out = {} + # Output files available for upload + proc_out['output'] = None + # Processing files available for clearing + proc_out['clearup'] = [] + + file_path = Template(config[component]['ServerPathTemplate']).substitute(**config) + file_name = Template(config[component]['InputFileTemplate']).substitute(**config) + + #TODO: check if file exists already (may be the case for multiple configs in one) + + # TODO: perform ssh file transfer in python instead of subprocess + server_name: str = config['ServerName'] + if server_name == "": + cmd_scp: list = ["scp", f"{file_path}/{file_name}.tar.gz", job_path] + else: + cmd_scp: list = ["scp", "-i", config['ServerKey'], "-o", "StrictHostKeyChecking=no", + f"{server_name}:{file_path}/{file_name}.tar.gz", job_path] + + description_short = 'env2 scp' + description_long = 'Copying file from remote server to job directory' + subprocess_and_log(cmd_scp,description_short, description_long) + + logger.info('untarring the input file') + + # untar incoming name data + output_directory = f"{job_path}/NAME_Met_as_netcdf" + Path(output_directory).mkdir(parents=True, exist_ok=True) + tarfile_name = f"{job_path}/{file_name}.tar.gz" + with tarfile.open(tarfile_name) as tar: + members = remove_path_from_tar_members(tar) + tar.extractall(output_directory, members = members) + + # basic check that contents are as expected for 7-day forecast (57 timepoints in all files) + cube_wildcard = f"{output_directory}/*.nc" + cubes: CubeList = load(cube_wildcard) + + # land_fraction and topography will only have a single timepoint (as these dont change over time), so we can ignore + # these when sense-checking the expected number of timepoints + ignore_list = ["LAND_FRACTION", 
"TOPOGRAPHY"] + + for cube in cubes: + var_name = cube.name() + coord = cube.coord("time") + timepoint_count = coord.shape[0] + if timepoint_count != 57 and var_name not in ignore_list: + msg = f"Unexpected number of timepoints ({timepoint_count}) in cube {cube.name()}" + logger.error(msg) + raise RuntimeError(msg) + + # tar file can be removed if required + proc_out['clearup'] += [f"{job_path}/{file_name}.tar.gz", ] + + return proc_out diff --git a/tests/integration/full/full_test_advisory.py b/tests/integration/full/full_test_advisory.py index c7809d8..2337747 100644 --- a/tests/integration/full/full_test_advisory.py +++ b/tests/integration/full/full_test_advisory.py @@ -2,10 +2,10 @@ import copy import os import sys -from coordinator.ProcessorAdvisory import ProcessorAdvisory -from coordinator.ProcessorDeposition import ProcessorDeposition -from coordinator.ProcessorEnvironment import ProcessorEnvironment -from coordinator.ProcessorSurveys import ProcessorSurveys +from coordinator.processor_advisory import ProcessorAdvisory +from coordinator.processor_deposition import ProcessorDeposition +from coordinator.processor_environment import ProcessorEnvironment +from coordinator.processor_surveys import ProcessorSurveys from integration.partial.integration_test_utils import IntegrationTestUtils from integration.test_suites.advisory_test_suite import BaseAdvisoryTestSuite diff --git a/tests/integration/full/full_test_deposition.py b/tests/integration/full/full_test_deposition.py index 0bb5a31..c07e3af 100644 --- a/tests/integration/full/full_test_deposition.py +++ b/tests/integration/full/full_test_deposition.py @@ -2,7 +2,7 @@ import copy import os import sys -from coordinator.ProcessorDeposition import ProcessorDeposition +from coordinator.processor_deposition import ProcessorDeposition from integration.partial.integration_test_utils import IntegrationTestUtils from integration.test_suites.depo_test_suite import BaseDepoTestSuite diff --git a/tests/integration/full/full_test_env_suit.py b/tests/integration/full/full_test_env_suit.py index 229168f..8d1989f 100644 --- a/tests/integration/full/full_test_env_suit.py +++ b/tests/integration/full/full_test_env_suit.py @@ -2,7 +2,7 @@ import copy import os import sys -from coordinator.ProcessorEnvironment import ProcessorEnvironment +from coordinator.processor_environment import ProcessorEnvironment from integration.partial.integration_test_utils import IntegrationTestUtils from integration.test_suites.env_suit_test_suite import BaseEnvSuitTestSuite diff --git a/tests/integration/full/full_test_epi.py b/tests/integration/full/full_test_epi.py index 17569d1..8027cde 100644 --- a/tests/integration/full/full_test_epi.py +++ b/tests/integration/full/full_test_epi.py @@ -2,9 +2,9 @@ import copy import os import sys -from coordinator.ProcessorDeposition import ProcessorDeposition -from coordinator.ProcessorEnvironment import ProcessorEnvironment -from coordinator.ProcessorEpidemiology import ProcessorEpidemiology +from coordinator.processor_deposition import ProcessorDeposition +from coordinator.processor_environment import ProcessorEnvironment +from coordinator.processor_epidemiology import ProcessorEpidemiology from integration.partial.integration_test_utils import IntegrationTestUtils from integration.test_suites.epi_test_suite import BaseEpiTestSuite diff --git a/tests/integration/full/full_test_survey.py b/tests/integration/full/full_test_survey.py index 2c1b3cc..cff7c65 100644 --- a/tests/integration/full/full_test_survey.py +++ 
b/tests/integration/full/full_test_survey.py @@ -2,7 +2,7 @@ import copy import os import sys -from coordinator.ProcessorSurveys import ProcessorSurveys +from coordinator.processor_surveys import ProcessorSurveys from integration.partial.integration_test_utils import IntegrationTestUtils from integration.test_suites.survey_test_suite import BaseSurveyTestSuite diff --git a/tests/integration/partial/test_advisory.py b/tests/integration/partial/test_advisory.py index a5df57a..d0b01ce 100644 --- a/tests/integration/partial/test_advisory.py +++ b/tests/integration/partial/test_advisory.py @@ -2,7 +2,7 @@ import copy import os import unittest -from coordinator.ProcessorAdvisory import ProcessorAdvisory +from coordinator.processor_advisory import ProcessorAdvisory from integration.partial.integration_test_utils import IntegrationTestUtils from integration.test_suites.advisory_test_suite import BaseAdvisoryTestSuite diff --git a/tests/integration/partial/test_deposition.py b/tests/integration/partial/test_deposition.py index 66a6bda..28cd5d4 100644 --- a/tests/integration/partial/test_deposition.py +++ b/tests/integration/partial/test_deposition.py @@ -2,7 +2,7 @@ import copy import os import unittest -from coordinator.ProcessorDeposition import ProcessorDeposition +from coordinator.processor_deposition import ProcessorDeposition from integration.partial.integration_test_utils import IntegrationTestUtils from integration.test_suites.depo_test_suite import BaseDepoTestSuite diff --git a/tests/integration/partial/test_env_suit.py b/tests/integration/partial/test_env_suit.py index 3bb44c6..3ba4286 100644 --- a/tests/integration/partial/test_env_suit.py +++ b/tests/integration/partial/test_env_suit.py @@ -2,7 +2,7 @@ import copy import os import unittest -from coordinator.ProcessorEnvironment import ProcessorEnvironment +from coordinator.processor_environment import ProcessorEnvironment from integration.partial.integration_test_utils import IntegrationTestUtils from integration.test_suites.env_suit_test_suite import BaseEnvSuitTestSuite diff --git a/tests/integration/partial/test_epi.py b/tests/integration/partial/test_epi.py index 8f46c15..d2ec40a 100644 --- a/tests/integration/partial/test_epi.py +++ b/tests/integration/partial/test_epi.py @@ -2,7 +2,7 @@ import copy import os import unittest -from coordinator.ProcessorEpidemiology import ProcessorEpidemiology +from coordinator.processor_epidemiology import ProcessorEpidemiology from integration.partial.integration_test_utils import IntegrationTestUtils from integration.test_suites.epi_test_suite import BaseEpiTestSuite diff --git a/tests/integration/partial/test_survey.py b/tests/integration/partial/test_survey.py index ab45b08..2a48e16 100644 --- a/tests/integration/partial/test_survey.py +++ b/tests/integration/partial/test_survey.py @@ -2,7 +2,7 @@ import copy import os import unittest -from coordinator.ProcessorSurveys import ProcessorSurveys +from coordinator.processor_surveys import ProcessorSurveys from integration.partial.integration_test_utils import IntegrationTestUtils from integration.test_suites.survey_test_suite import BaseSurveyTestSuite -- GitLab
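Given how many import paths change in this restructure (including the survey modules, which are moved under coordinator/survey_servers/ but, in the hunks above, still import parse_columns from coordinator.surveys.processor_survey_utils), a small smoke check that the renamed modules resolve can catch a stale path early. A minimal sketch, assuming the repository root is on sys.path; the module list is taken from the hunks above and is not exhaustive:

    import importlib

    renamed_modules = [
        "coordinator.processor_advisory",
        "coordinator.processor_deposition",
        "coordinator.processor_environment",
        "coordinator.processor_epidemiology",
        "coordinator.processor_surveys",
        "coordinator.utils.processor_utils",
        "coordinator.survey_servers.processor_survey_utils",
        "coordinator.survey_servers.processor_surveys_kobotoolbox",
    ]

    for name in renamed_modules:
        # raises ImportError on any path that no longer matches the new layout
        importlib.import_module(name)
        print(f"ok: {name}")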