import sys
import os
import shutil
import json
import pandas as pd
import logging
import subprocess
import numpy as np
import datetime as dt

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger('EnvProcessor.pipeline')

###############################################################################

def loadConfig(configFile):
    '''Load a json config file and return its contents as a dict.

    Logs and re-raises any failure (missing file, invalid json).'''

    logger.info(f"Loading config file: {configFile}")

    try:
        with open(configFile) as config_file:
            config = json.load(config_file)
    except Exception:
        # narrowed from a bare except: so KeyboardInterrupt/SystemExit
        # are not swallowed into the log
        logger.exception(f"Failure opening config file {configFile}")
        raise

    return config


def getParameter(config, parameter):
    '''Return config[parameter], logging and re-raising (KeyError) if absent.'''

    logger.info(f"Getting {parameter} from the config")

    try:
        result = config[parameter]
    except Exception:
        logger.exception(f"No {parameter} in the config file!")
        raise

    return result


def generate_temporal_points(file, datestr, timeresolution, nDaysForecast):
    '''Write a time-point file with one "%Y%m%d%H%M" timestamp per line.

    Timestamps run from datestr + timeresolution hours up to, and excluding,
    datestr + nDaysForecast days + timeresolution hours, stepping by
    timeresolution hours (i.e. nDaysForecast * 24 / timeresolution lines).

    file: output path.
    datestr: date string beginning "YYYYMMDD".
    timeresolution: step in hours.
    nDaysForecast: span in days.
    Returns the (closed) file object, as the original did; callers in this
    module ignore the return value.'''

    datenow = dt.datetime(int(datestr[:4]), int(datestr[4:6]), int(datestr[6:8]), 0, 0)

    starttime = datenow + dt.timedelta(hours=timeresolution)
    endtime = datenow + dt.timedelta(days=nDaysForecast) + dt.timedelta(hours=timeresolution)
    timediff = dt.timedelta(hours=timeresolution)

    timeseries = []
    curr = starttime

    # with-statement guarantees the handle is closed even if a write fails
    # (the original leaked it on error)
    with open(file, "w") as outfile:
        # Generate the time series excluding the end time.
        while curr < endtime:
            timeseries.append(curr)
            outDateString = "{}\n".format(curr.strftime("%Y%m%d%H%M"))
            outfile.write(outDateString)
            curr += timediff

    return outfile


def run_multi_processor(multiprocPath,command,check=True):
    '''Run <multiprocPath>/run_multi_processor.sh with command as its argument.

    check=True makes subprocess.run raise CalledProcessError on non-zero exit.
    Returns the CompletedProcess.'''

    multiproc = multiprocPath + 'run_multi_processor.sh'
    try:
        process = subprocess.run([multiproc,command],check=check)
    except Exception:
        logger.exception(f"Some failure when running multi processor on {command}")
        raise

    return process


def pipeline_subprocess(binPath, workPath, command, multi=True):
    '''chdir to workPath and run command through the multi-processor wrapper.

    Returns the CompletedProcess, or None in single-process mode (which is
    still unimplemented).  NOTE: the original raised NameError when
    multi=False because the returned variable was never bound; it now
    returns None explicitly.'''

    os.chdir(workPath)
    logger.info(f"Change work directory to {workPath}")

    process = None

    try:
        if multi:
            logger.info(f"Run {command} in multi process mode.")
            process = run_multi_processor(binPath, command)
        else:
            logger.info(f"Run {command} in single process mode.")
            # TODO add not multi proc mode
    except Exception:
        logger.exception(f"Some failure when running {command}", exc_info=True)
        raise

    return process


#######################################

def run_pipeline(pipeline_config, region, dateString, extracted = False):
    '''Drive the environmental suitability 2.0 pipeline for one region/date.

    pipeline_config: dict of pipeline parameters (paths, run type, strains,
        per-strain PARAMS, ...) as found in the job config's "Environment"
        section.
    region: region name, used to select the regional spatial points file.
    dateString: "YYYYMMDD" forecast start date.
    extracted: when True, skip the met-data extraction/rechunk steps
        (they are shared between strains and only need to run once).

    Raises on any failed sub-step, and IndexError when a strain's result
    csv does not have the expected (spatial, temporal + 6) dimensions.'''

    # Get parameters from the config
    templatePath = getParameter(pipeline_config,'TEMPLATE_PATH')
    workPath = getParameter(pipeline_config,'WORK_PATH')
    inPath = getParameter(pipeline_config,'INPUT_PATH')
    outPath = getParameter(pipeline_config,'OUTPUT_PATH')

    runType = getParameter(pipeline_config,'RUN_TYPE')
    nDayExtraction = getParameter(pipeline_config,'EXTRACTION_DAYS')
    nDayForecast = getParameter(pipeline_config,'FORECAST_DAYS')

    templateName = 'template_' + runType + '_config.json'
    template_configFile = templatePath + templateName
    config = loadConfig(template_configFile)

    run_configName = 'run_config.json'
    run_configFile = workPath + run_configName

    # Get spatial points file for the region
    region_spatial_points_file = templatePath + 'input_spatial_points_' + region + '.csv'
    input_spatial_points_file = workPath + 'input_spatial_points.csv'
    shutil.copy(region_spatial_points_file,input_spatial_points_file)
    spatial_points = pd.read_csv(input_spatial_points_file)
    spatial_dim = spatial_points.shape[0]

    # Generate input (extraction) and output temporal points files
    timeresolution = 3 # hours

    input_temporal_points_file = workPath + 'input_temporal_points.csv'
    try:
        logger.info(f"Generate input temporal points to: {input_temporal_points_file}")
        generate_temporal_points(input_temporal_points_file, dateString, timeresolution, nDayExtraction)
    except Exception:
        logger.exception(f"Some failure when generate {input_temporal_points_file}", exc_info=True)
        # was silently swallowed: the rest of the pipeline cannot run
        # without this file, so fail fast instead of crashing later
        raise

    output_temporal_points_file = workPath + 'output_temporal_points.csv'
    try:
        logger.info(f"Generate output temporal points to: {output_temporal_points_file}")
        generate_temporal_points(output_temporal_points_file, dateString, timeresolution, nDayForecast)
    except Exception:
        logger.exception(f"Some failure when generate {output_temporal_points_file}", exc_info=True)
        raise

    temporal_points = pd.read_csv(output_temporal_points_file)
    temporal_dim = temporal_points.shape[0]

    # Modify run_config
    config['POST_PROCESSING']['PROCESSORS'][0]['TIMEPOINTS_FILE_PATH'] = output_temporal_points_file
    config['TIMEPOINTS_FILE_PATH'] = input_temporal_points_file
    config['OUTPUT_DIR'] = workPath
    config['SPATIAL_POINTS_FILE_PATH'] = input_spatial_points_file

    config['FIELD_NAME_CONSTANTS_PATH'] = workPath + 'field_name_constants.csv'
    config['NCDF_DIR_PATH'] = inPath + 'ENVIRONMENT_2.0_' + dateString + '/NAME_Met_as_netcdf/'

    strains = getParameter(pipeline_config, 'STRAINS')

## START RUNS ####################
    os.chdir(workPath)

    if (extracted == False):
        clean = workPath + 'clean.sh'
        try:
            subprocess.run(clean, check=True)
            logger.info('Work directory cleaned')
        except Exception:
            logger.exception(f"Some failure when running {clean}", exc_info=True)
            raise

    for strain in strains:
        # Modify strain specific suitability parts of the config
        if pipeline_config['PARAMS'][strain]['future_steps'] > 0:
            for i in range(len(config['POST_PROCESSING']['PROCESSORS'][0]['FUTURE_FIELDS'])):
                config['POST_PROCESSING']['PROCESSORS'][0]['FUTURE_FIELDS'][i]['DURATION'] = pipeline_config['PARAMS'][strain]['future_steps']
        else:
            for i in range(len(config['POST_PROCESSING']['PROCESSORS'][0]['FUTURE_FIELDS'])):
                config['POST_PROCESSING']['PROCESSORS'][0]['FUTURE_FIELDS'][i]['ENABLED'] = "FALSE"

        config['POST_PROCESSING']['PROCESSORS'][0]['PARAMS']['suitability_modules'] = pipeline_config['PARAMS'][strain]['suitability_modules']
        config['POST_PROCESSING']['PROCESSORS'][0]['PARAMS']['thresholds'] = pipeline_config['PARAMS'][strain]['thresholds']

        # Write run_json
        os.chdir(workPath)

        with open(run_configFile, 'w') as run_config:
            json.dump(config, run_config, indent=4)
        # (redundant explicit close() removed: the with-statement closes it)

        # Run generate_all.sh
        generate_all = workPath + 'generate_all.sh'
        try:
            subprocess.run(generate_all, check=True)
        except Exception:
            logger.exception(f"Some failure when running {generate_all}", exc_info=True)
            raise

        if (extracted == False):
            extractionPath = workPath + 'extraction/'
            pipeline_subprocess(workPath, extractionPath, 'run_regridding.sh')

            chunksPath = workPath + 'chunks/'
            pipeline_subprocess(workPath, chunksPath, 'run_rechunk.sh')

            # extraction/rechunk is shared between strains: only do it once
            extracted = True
            logger.info('Data extracted and chunked')

        logger.info(f"Starting {strain} suitability ---------------------------------")
        envSuitPath = workPath + 'post_processing/RIE/'
        pipeline_subprocess(workPath, envSuitPath, 'run_post_processing.sh')

        merge = envSuitPath + 'run_merge_results.sh'
        try:
            subprocess.run(merge, check=True)
        except Exception:
            logger.exception(f"Some failure when running {merge}", exc_info=True)
            raise

        resultFile = envSuitPath + 'RIE.csv'
        strain_outPath = os.path.join(outPath,'ENVIRONMENT_2.0_'+dateString,'processed',region,strain)
        strain_outFile = strain_outPath + '/RIE_value.csv'

        # Check results dimension
        result = pd.read_csv(resultFile)
        result_dims = result.shape

        # + 6 required because there are extra columns in the result file
        if ((result_dims[0] != spatial_dim) or (result_dims[1] != (temporal_dim + 6))):
            logger.error(f"Result dimension {result_dims} is not match with the expectation ({spatial_dim}, {temporal_dim + 6})")
            raise IndexError

        if not os.path.exists(strain_outPath):
            os.makedirs(strain_outPath)

        shutil.copy(resultFile,strain_outFile)
        logger.info(f"{strain} result successfully created and moved to {strain_outPath}/")

    logger.info('SUCCESSFULLY FINISHED')
process_copy_past_job_env2_0(jobPath,config):
+    '''For when we want to skip process_in_job() to test the other components of
+    this script. Currently hard-wired.'''
+
+    # TODO: remove this hard-wired assumption
+    jobPath_to_copy = f"{jobPath}/../{config['StartString']}_bak/"
+
+    assert os.path.exists(jobPath_to_copy)
+
+    dir_src = f"{jobPath_to_copy}/processed/"
+
+    dir_dst = f"{jobPath}/processed/"
+
+    logger.info(f"Copying from {dir_src}")
+
+    logger.info(f"to {dir_dst}")
+
+    copy_tree(dir_src,dir_dst)
+
+    logger.info('Copying complete')
+
+    return
+
 #TODO
 def process_pre_job_epi(input_args):
     '''Returns a boolean as to whether the job is ready for full processing.'''
@@ -363,7 +393,7 @@ def process_in_job_survey(jobPath,status,config,component):

     return [output_path]

-#TODO
+#TODO test if this works
 def process_in_job_env2_0(jobPath,status,config,component):

     logger.info('started process_in_job_env2_0()')
@@ -372,24 +402,25 @@ def process_in_job_env2_0(jobPath,status,config,component):
     file_path = Template(config[component]['ServerPathTemplate']).substitute(**config)
     file_name = Template(config[component]['InputFileTemplate']).substitute(**config)

+    #TODO: check if file exists already (may be the case for multiple configs in one)
+
     # TODO: perform ssh file transfer in python instead of subprocess
     cmd_scp = ["scp","-i",config['ServerKey'],f"{config['ServerName']}:{file_path}/{file_name}.tar.gz", jobPath]

     description_short = 'env2 scp'
     description_long = 'Copying file from remote server to job directory'
+    subprocess_and_log(cmd_scp,description_short, description_long)

-    #subprocess.run(cmd_scp)
-
     logger.info('untarring the input file')

     # TODO: untar file in python (with tarfile module) instead of subprocess
     cmd_tar = ["tar","-xzf",f"{jobPath}/{file_name}.tar.gz","-C",jobPath]

     description_short = 'env2 tar'
     description_long = 'Untarring the input file'
+    subprocess_and_log(cmd_tar,description_short, description_long)

-    #subprocess.run(cmd_tar)
-
-    # basic check that contents are 
as expected + # basic check that contents are as expected for 7-day forecast # 57 files of NAME .txt timesteps and one summary png file if len(os.listdir(f"{jobPath}/{file_name}")) != 58: msg = f"Insufficient contents of untarred file in directory {jobPath}/{file_name}" @@ -415,33 +446,15 @@ def process_in_job_env2_0(jobPath,status,config,component): for region in regions: logger.info(f"Calling environmental suitability 2.0 for {region} so wait for output to appear") - cmd_envsuit = ["/storage/app/EWS/General/EWS-met_extractor/operational.sh",config["StartString"],config["RegionName"],region] - description_short = 'env2.0' - description_long = f"pipeline of environmental suitability v2.0 for {region}" - - subprocess_and_log(cmd_envsuit,description_short,description_long) + pipeline_config = config["Environment"] + try: + esp.run_pipeline(pipeline_config, region, config["StartString"], extracted=False) + except: + logger.exception(f"Some failure when converting with runnig EnvSuitPipeline.py") + raise logger.info('Finished running environmental suitability 2.0') - #try: - # #load env. op. scrip - # env_op_process = subprocess.run( - # cmd_env2, - # check=True, - # stdout = subprocess.PIPE, - # stderr = subprocess.STDOUT) - - # for line in env_op_process.stdout.decode('utf-8').split(r'\n'): - # logger.info('env2.0 : ' + line) - - #except subprocess.CalledProcessError as e: - - # for line in e.stdout.decode('utf-8').split(r'\n'): - # logger.info('env 2.0: ' + line) - - # logger.error('Some failure when running env.suit. pipeline.', exc_info=True) - # raise - # TODO: Check that the output appears as expected return @@ -610,7 +623,9 @@ def process_EWS_plotting_survey(jobPath,config): #TODO test if this works def process_EWS_plotting_env2_0(jobPath,config): - '''Returns a list of output files for transfer.''' + '''Calls EWS-plotting by configuring the plotting arguments and passing + them to the underlying script. 
+    Returns a list of output files for transfer.'''

     logger.info('started process_EWS_plotting_env2_0()')

@@ -622,7 +637,7 @@ def process_EWS_plotting_env2_0(jobPath,config):
         'R_LIBS' : '/home/ewsmanager/R-packages-EWS-plotting/plotting_rlibs/',
     }

-    input_dir = f"{config['WorkspacePath']}ENVIRONMENT_{config['StartString']}/processed/"
+    input_dir = f"{jobPath}/processed/"

     output_dir = f"{jobPath}/plotting/"

@@ -641,7 +656,11 @@
         # Note that this runs all disease types available

-        # TODO: Add logic to determine which diseases to plot
+        # logic to determine which diseases to plot
+        diseases_processed = os.listdir(f"{input_dir}/{region}/")
+        plot_stem = 'StemRust' in diseases_processed
+        plot_stripe = 'StripeRust' in diseases_processed
+        plot_leaf = 'LeafRust' in diseases_processed

         # prepare command
         plot_command = [
@@ -656,9 +675,9 @@
                 '--run_config_file',run_config,
                 '--plot_daily_images','True',
                 '--plot_hourly_images','False',
-                '--plot_stem','True',
-                '--plot_stripe','True',
-                '--plot_leaf','False']
+                '--plot_stem',str(plot_stem),
+                '--plot_stripe',str(plot_stripe),
+                '--plot_leaf',str(plot_leaf)]

         logger.info(f"Running EWS-Plotting command:\n'{' '.join(plot_command)}'")

@@ -668,12 +687,15 @@
         # check the output
         EWSPlottingOutputDir = f"{output_dir}/{region.lower()}/images/"
-        EWSPlottingOutputGlobs += [
-                # daily plots
-                f"{EWSPlottingOutputDir}Daily/suitability_{region.lower()}_*_rust_daily_20*.png",
-                # weekly plots
-                f"{EWSPlottingOutputDir}Weekly/suitability_{region.lower()}_*_rust_total_20*.png"]
+        #EWSPlottingOutputGlobs += [
+        #        # daily plots
+        #        f"{EWSPlottingOutputDir}Daily/suitability_{region.lower()}_*_rust_daily_20*.png",
+        #        # weekly plots
+        #        f"{EWSPlottingOutputDir}Weekly/suitability_{region.lower()}_*_rust_total_20*.png"]
+
+        EWSPlottingOutputGlobs += [f"{EWSPlottingOutputDir}*"]

+        # check the output
EWSPlottingOutputGlobs = get_only_existing_globs(EWSPlottingOutputGlobs,inplace=False) # check there is some output from EWS-plotting diff --git a/config_Ethiopia_template.json b/config_Ethiopia.json similarity index 77% rename from config_Ethiopia_template.json rename to config_Ethiopia.json index 7eb1d5b5731c6357977054d14d8aed1b632961e3..48e4697834445b0ff2297ef66e6c0070023d1362 100644 --- a/config_Ethiopia_template.json +++ b/config_Ethiopia.json @@ -22,6 +22,46 @@ "ProcessPreJob" : "process_pre_job_server_download", "ProcessInJob" : "process_in_job_env2_0", "ProcessEWSPlotting" : "process_EWS_plotting_env2_0", + "TEMPLATE_PATH": "/storage/app/EWS/General/EWS-met_extractor/configs/", + "WORK_PATH": "/storage/app/EWS/General/EWS-met_extractor/", + "INPUT_PATH": "/storage/app/EWS/Ethiopia/Workspace/", + "OUTPUT_PATH": "/storage/app/EWS/Ethiopia/Workspace/", + "RUN_TYPE": "operational", + "EXTRACTION_DAYS": 7, + "FORECAST_DAYS": 6, + "STRAINS": ["LeafRust","StemRust", "StripeRust"], + "PARAMS": { + "LeafRust": { + "suitability_modules": ["semibool_dewperiod"], + "past_steps": 0, + "future_steps": 7, + "thresholds": { + "temperature": [2,15,20,30], + "precipitation": 0, + "relative_humidity": 90 + } + }, + "StemRust": { + "suitability_modules": ["semibool_dewperiod"], + "past_steps": 0, + "future_steps": 7, + "thresholds": { + "temperature": [2,15,24,30], + "precipitation": 0, + "relative_humidity": 90 + } + }, + "StripeRust": { + "suitability_modules": ["v1_dewperiod"], + "past_steps": 0, + "future_steps": 7, + "thresholds": { + "temperature": [2.37,19.8], + "precipitation": 0, + "relative_humidity": 90 + } + } + }, "EWS-Plotting" : { "Ethiopia" : { "RunConfig" : "/storage/app/EWS/General/EWS-Plotting/python/data/json_config/RUN_CONFIG_ETH_PINE.json", diff --git a/config_SouthAsia.json b/config_SouthAsia.json index 2340f102d1e3849d474424dd7c8c7bd203c18050..e2eb3c770fc047084d272ee55fb3e2f50653560d 100644 --- a/config_SouthAsia.json +++ b/config_SouthAsia.json @@ -1,5 
+1,5 @@ { - "DiseaseName" : "Stripe", + "DiseaseName" : "Leaf", "RegionName" : "SouthAsia", "SubRegionNames" : ["Bangladesh","Nepal"], "StartTime" : "?", @@ -22,6 +22,46 @@ "ProcessPreJob" : "process_pre_job_server_download", "ProcessInJob" : "process_in_job_env2_0", "ProcessEWSPlotting" : "process_EWS_plotting_env2_0", + "TEMPLATE_PATH": "/storage/app/EWS/General/EWS-met_extractor/configs/", + "WORK_PATH": "/storage/app/EWS/General/EWS-met_extractor/", + "INPUT_PATH": "/storage/app/EWS/SouthAsia/Workspace/", + "OUTPUT_PATH": "/storage/app/EWS/SouthAsia/Workspace/", + "RUN_TYPE": "operational", + "EXTRACTION_DAYS": 7, + "FORECAST_DAYS": 6, + "STRAINS": ["LeafRust","StemRust", "StripeRust"], + "PARAMS": { + "LeafRust": { + "suitability_modules": ["semibool_dewperiod"], + "past_steps": 0, + "future_steps": 7, + "thresholds": { + "temperature": [2,15,20,30], + "precipitation": 0, + "relative_humidity": 90 + } + }, + "StemRust": { + "suitability_modules": ["semibool_dewperiod"], + "past_steps": 0, + "future_steps": 7, + "thresholds": { + "temperature": [2,15,24,30], + "precipitation": 0, + "relative_humidity": 90 + } + }, + "StripeRust": { + "suitability_modules": ["v1_dewperiod"], + "past_steps": 0, + "future_steps": 7, + "thresholds": { + "temperature": [2.37,19.8], + "precipitation": 0, + "relative_humidity": 90 + } + } + }, "EWS-Plotting" : { "Bangladesh" : { "RunConfig" : "/storage/app/EWS/General/EWS-Plotting-Dev/python/data/json_config/RUN_CONFIG_BANG_PINE.json", @@ -55,9 +95,65 @@ "ProcessPreJob" : "process_pre_job_epi", "ProcessInJob" : "process_in_job_epi", "ProcessEWSPlotting" : "process_EWS_plotting_epi", + "Deposition" : { + "VariableName" : "P_RECONDITA_DEPOSITION", + "PathTemplate" : "${WorkspacePath}DEPOSITION_${StartString}/WR_NAME_SouthAsia_${StartString}/", + "SuccessFileTemplate" : "${WorkspacePath}DEPOSITION_${StartString}/DEPOSITION_SUCCESS.txt", + "FileNameTemplate" : "deposition_srcs_allregions_C1_T${tIndexPlusOneString}_${iTimeString}.txt", + 
"FileNamePrepared" : "?" + }, + "Environment" : { + "PathTemplate" : "${WorkspacePath}ENVIRONMENT_2.0_${StartString}/processed/${SubRegionName}/${DiseaseName}Rust/", + "SuccessFileTemplate" : "${WorkspacePath}ENVIRONMENT_${StartString}/ENVIRONMENT_SUCCESS.txt", + "FileNameTemplate" : "RIE_value.csv", + "FileNamePrepared" : "?" + }, + "Epi" : [ + { + "model" : "Env", + "modelArguments" : {}, + "infectionRasterFileName" : "?", + "description": "env. suitability", + "analysis" : { + "vmin" : 0.0e+0, + "vmax" : 1.5e+1, + "subplot_position" : [0,0], + "cmapString" : "CMRmap_r", + "bounds" : [87.9, 92.7, 20.5, 26.7], + "UTMprojection" : 45 + } + },{ + "model" : "log10Dep", + "modelArguments" : {}, + "infectionRasterFileName" : "?", + "description": "log(spore deposition)", + "analysis" : { + "vmin" : 0.0e+0, + "vmax" : 3.0e+2, + "subplot_position" : [0,1], + "cmapString" : "CMRmap_r", + "bounds" : [87.9, 92.7, 20.5, 26.7], + "UTMprojection" : 45 + } + },{ + "model" : "log10DepEnv", + "modelArguments" : {}, + "infectionRasterFileName" : "?", + "description": "EPI: env.suit $\\times$ log(dep.)", + "analysis" : { + "vmin" : 0.0e+0, + "vmax" : 1.5e+1, + "subplot_position" : [0,2], + "cmapString" : "CMRmap_r", + "bounds" : [87.9, 92.7, 20.5, 26.7], + "UTMprojection" : 45 + } + } + ], "EWS-Plotting" : { - "RunConfig" : "?", - "PythonScript" : "?" + "RunConfig" : "/storage/app/EWS/General/EWS-Plotting/python/data/json_config/RUN_CONFIG_BANG_PINE.json", + "PythonScript" : "/storage/app/EWS/General/EWS-Plotting/python/bangladesh/ews_plotting_epi_bang_main.py", + "EpiCase" : "log10DepEnv" } } } \ No newline at end of file