diff --git a/ENVDataProcessor.pl b/ENVDataProcessor.pl
index 1de0227edae55cb9cf98bede309a87d2ae8501b9..497d52d8efbe9e3a4535d43d5f113ec2e0b62e7e 100755
--- a/ENVDataProcessor.pl
+++ b/ENVDataProcessor.pl
@@ -23,7 +23,7 @@
 my $debugTimeString = getTimestampNow();
 my $debugOutput = 1;
 my $debugNoFTPDownload = 0;
-my $debugNoUpload = 0;
+my $debugNoUpload = 1;
 my $debugNoPlot = 0;
 
 # read in project configuration from input argument
diff --git a/EnvProcessor.py b/EnvProcessor.py
index a19c959f4689fdec236a3a86d6b2e15e980803d3..6edda84c804674ee3856f1a88eef7f53e7f85cc1 100644
--- a/EnvProcessor.py
+++ b/EnvProcessor.py
@@ -37,6 +37,7 @@
 import rasterio as rio
 from flagdir import jobStatus # gitlab project created by jws52
 import NAMEPreProcessor as npp # script in EWS-Coordinator project
+import EnvSuitPipeline as esp # script in EWS-Coordinator project
 
 print('Preparing for operational!')
 print("make sure to `conda activate py3EWSepi` environment!")
@@ -455,14 +456,14 @@
     for region in regions:
 
         logger.info(f"Calling environmental suitability 2.0 for {region} so wait for output to appear")
 
-        cmd_envsuit = ["/storage/app/EWS/General/EWS-met_extractor/operational.sh",config["StartString"],config["RegionName"],region]
-        description_short = 'env2.0'
-        description_long = f"pipeline of environmental suitability v2.0 for {region}"
-
-        subprocess_and_log(cmd_envsuit,description_short,description_long)
+        pipeline_config = config["Environment"]
+        try:
+            esp.run_pipeline(pipeline_config, region, config["StartString"], extracted=False)
+        except:
+            logger.exception("Some failure when running EnvSuitPipeline.py")
+            raise
 
     logger.info('Finished running environmental suitability 2.0')
-    # TODO: Check that the output appears as expected
 
     return
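
For reference, a minimal sketch of how the new entry point replaces the old operational.sh call, mirroring the call in process_in_job() above. The config filename, region name, and date are illustrative, not taken from the patch:

    import json
    import EnvSuitPipeline as esp

    # Load a filled-in copy of one of the project templates (hypothetical filename).
    with open('config_ENV_Ethiopia.json') as f:
        config = json.load(f)

    # process_in_job() passes the per-project "Environment" block plus the run date.
    esp.run_pipeline(config['Environment'], 'Ethiopia', '20210324', extracted=False)
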
diff --git a/EnvSuitPipeline.py b/EnvSuitPipeline.py
new file mode 100644
index 0000000000000000000000000000000000000000..595484baf9d9959e29e9d57edc8dcbbb762a3c8c
--- /dev/null
+++ b/EnvSuitPipeline.py
@@ -0,0 +1,232 @@
+import sys
+import os
+import shutil
+import json
+import pandas as pd
+import logging
+import subprocess
+import numpy as np
+import datetime as dt
+
+logging.basicConfig(level=logging.DEBUG)
+logger = logging.getLogger('EnvProcessor.pipeline')
+
+###############################################################################
+
+def loadConfig(configFile):
+    logger.info(f"Loading config file: {configFile}")
+
+    try:
+        with open(configFile) as config_file:
+            config = json.load(config_file)
+    except:
+        logger.exception(f"Failure opening config file {configFile}")
+        raise
+
+    return config
+
+
+def getParameter(config, parameter):
+    logger.info(f"Getting {parameter} from the config")
+
+    try:
+        result = config[parameter]
+    except KeyError:
+        logger.exception(f"No {parameter} in the config file!")
+        raise
+
+    return result
+
+
+def generate_temporal_points(file, datestr, timeresolution, nDaysForecast):
+    datenow = dt.datetime(int(datestr[:4]), int(datestr[4:6]), int(datestr[6:8]), 0, 0)
+
+    starttime = datenow + dt.timedelta(hours=timeresolution)
+    endtime = datenow + dt.timedelta(days=nDaysForecast) + dt.timedelta(hours=timeresolution)
+    timediff = dt.timedelta(hours=timeresolution)
+
+    timeseries = []
+    curr = starttime
+
+    outfile = open(file, "w")
+    # Generate the time series, excluding the end time.
+    while curr < endtime:
+        timeseries.append(curr)
+        outDateString = "{}\n".format(curr.strftime("%Y%m%d%H%M"))
+        outfile.write(outDateString)
+        curr += timediff
+
+    outfile.close()
+    return timeseries
+
+
+def run_multi_processor(multiprocPath,command,check=True):
+    try:
+        multiproc = multiprocPath + 'run_multi_processor.sh'
+        process = subprocess.run([multiproc,command],check=check)
+    except:
+        logger.exception(f"Some failure when running multi processor on {command}")
+        raise
+
+    return process
+
+
+def pipeline_subprocess(binPath, workPath, command, multi=True):
+    os.chdir(workPath)
+    logger.info(f"Changed work directory to {workPath}")
+
+    try:
+        if multi:
+            logger.info(f"Running {command} in multi-process mode.")
+            process = run_multi_processor(binPath, command)
+        else:
+            logger.info(f"Running {command} in single-process mode.")
+            raise NotImplementedError("single-process mode not implemented yet") # TODO
+    except:
+        logger.exception(f"Some failure when running {command}")
+        raise
+
+    return process
+
+
+#######################################
+
+def run_pipeline(pipeline_config, region, dateString, extracted=False):
+    # Get parameters from the config
+    templatePath = getParameter(pipeline_config,'TEMPLATE_PATH')
+    workPath = getParameter(pipeline_config,'WORK_PATH')
+    inPath = getParameter(pipeline_config,'INPUT_PATH')
+    outPath = getParameter(pipeline_config,'OUTPUT_PATH')
+
+    runType = getParameter(pipeline_config,'RUN_TYPE')
+    nDayExtraction = getParameter(pipeline_config,'EXTRACTION_DAYS')
+    nDayForecast = getParameter(pipeline_config,'FORECAST_DAYS')
+
+    templateName = 'template_' + runType + '_config.json'
+    template_configFile = templatePath + templateName
+    config = loadConfig(template_configFile)
+
+    run_configName = 'run_config.json'
+    run_configFile = workPath + run_configName
+
+    # Get spatial points file for the region
+    region_spatial_points_file = templatePath + 'input_spatial_points_' + region + '.csv'
+    input_spatial_points_file = workPath + 'input_spatial_points.csv'
+    shutil.copy(region_spatial_points_file,input_spatial_points_file)
+    spatial_points = pd.read_csv(input_spatial_points_file)
+    spatial_dim = spatial_points.shape[0]
+
+    # Generate input (extraction) and output temporal points files
+    timeresolution = 3 # hours
+
+    input_temporal_points_file = workPath + 'input_temporal_points.csv'
+    try:
+        logger.info(f"Generating input temporal points file: {input_temporal_points_file}")
+        generate_temporal_points(input_temporal_points_file, dateString, timeresolution, nDayExtraction)
+    except:
+        logger.exception(f"Some failure when generating {input_temporal_points_file}")
+        raise
+
+    output_temporal_points_file = workPath + 'output_temporal_points.csv'
+    try:
+        logger.info(f"Generating output temporal points file: {output_temporal_points_file}")
+        generate_temporal_points(output_temporal_points_file, dateString, timeresolution, nDayForecast)
+    except:
+        logger.exception(f"Some failure when generating {output_temporal_points_file}")
+        raise
+
+    temporal_points = pd.read_csv(output_temporal_points_file)
+    temporal_dim = temporal_points.shape[0]
+
+    # Modify run_config
+    config['POST_PROCESSING']['PROCESSORS'][0]['TIMEPOINTS_FILE_PATH'] = output_temporal_points_file
+    config['TIMEPOINTS_FILE_PATH'] = input_temporal_points_file
+    config['OUTPUT_DIR'] = workPath
+    config['SPATIAL_POINTS_FILE_PATH'] = input_spatial_points_file
+
+    config['FIELD_NAME_CONSTANTS_PATH'] = workPath + 'field_name_constants.csv'
+    config['NCDF_DIR_PATH'] = inPath + 'ENVIRONMENT_2.0_' + dateString + '/NAME_Met_as_netcdf/'
+
+    strains = getParameter(pipeline_config, 'STRAINS')
+
+## START RUNS ####################
+    os.chdir(workPath)
+
+    if not extracted:
+        clean = workPath + 'clean.sh'
+        try:
+            subprocess.run(clean, check=True)
+            logger.info('Work directory cleaned')
+        except:
+            logger.exception(f"Some failure when running {clean}")
+            raise
+
+    for strain in strains:
+        # Modify strain-specific suitability parts of the config
+        if pipeline_config['PARAMS'][strain]['future_steps'] > 0:
+            for i in range(len(config['POST_PROCESSING']['PROCESSORS'][0]['FUTURE_FIELDS'])):
+                config['POST_PROCESSING']['PROCESSORS'][0]['FUTURE_FIELDS'][i]['DURATION'] = pipeline_config['PARAMS'][strain]['future_steps']
+        else:
+            for i in range(len(config['POST_PROCESSING']['PROCESSORS'][0]['FUTURE_FIELDS'])):
+                config['POST_PROCESSING']['PROCESSORS'][0]['FUTURE_FIELDS'][i]['ENABLED'] = "FALSE"
+
+        config['POST_PROCESSING']['PROCESSORS'][0]['PARAMS']['suitability_modules'] = pipeline_config['PARAMS'][strain]['suitability_modules']
+        config['POST_PROCESSING']['PROCESSORS'][0]['PARAMS']['thresholds'] = pipeline_config['PARAMS'][strain]['thresholds']
+
+        # Write run_config.json
+        os.chdir(workPath)
+
+        with open(run_configFile, 'w') as run_config:
+            json.dump(config, run_config, indent=4)
+
+        # Run generate_all.sh
+        generate_all = workPath + 'generate_all.sh'
+        try:
+            subprocess.run(generate_all, check=True)
+        except:
+            logger.exception(f"Some failure when running {generate_all}")
+            raise
+
+        if not extracted:
+            extractionPath = workPath + 'extraction/'
+            pipeline_subprocess(workPath, extractionPath, 'run_regridding.sh')
+
+            chunksPath = workPath + 'chunks/'
+            pipeline_subprocess(workPath, chunksPath, 'run_rechunk.sh')
+
+            extracted = True
+            logger.info('Data extracted and chunked')
+
+        logger.info(f"Starting {strain} suitability ---------------------------------")
+        envSuitPath = workPath + 'post_processing/RIE/'
+        pipeline_subprocess(workPath, envSuitPath, 'run_post_processing.sh')
+
+        merge = envSuitPath + 'run_merge_results.sh'
+        try:
+            subprocess.run(merge, check=True)
+        except:
+            logger.exception(f"Some failure when running {merge}")
+            raise
+
+        resultFile = envSuitPath + 'RIE.csv'
+        strain_outPath = os.path.join(outPath,'ENVIRONMENT_2.0_'+dateString,'processed',region,strain)
+        strain_outFile = strain_outPath + '/RIE_value.csv'
+
+        # Check result dimensions; + 6 allows for the extra metadata columns in the result file
+        result = pd.read_csv(resultFile)
+        result_dims = result.shape
+
+        if (result_dims[0] != spatial_dim) or (result_dims[1] != (temporal_dim + 6)):
+            logger.error(f"Result dimensions {result_dims} do not match the expected ({spatial_dim}, {temporal_dim + 6})")
+            raise IndexError
+
+        if not os.path.exists(strain_outPath):
+            os.makedirs(strain_outPath)
+
+        shutil.copy(resultFile,strain_outFile)
+        logger.info(f"{strain} result successfully created and moved to {strain_outPath}/")
+
+    logger.info('SUCCESSFULLY FINISHED')
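
A quick worked check of the generate_temporal_points() logic above, as a sketch assuming the template values (3-hourly resolution, FORECAST_DAYS = 6; the run date is illustrative). Because the end time is excluded, the series has days * 24 / resolution entries:

    import datetime as dt

    datenow = dt.datetime(2021, 3, 24)   # illustrative run date
    step = dt.timedelta(hours=3)
    curr = datenow + step
    end = datenow + dt.timedelta(days=6) + step

    points = []
    while curr < end:                    # end time excluded, as in the patch
        points.append(curr.strftime("%Y%m%d%H%M"))
        curr += step

    assert len(points) == 6 * 24 // 3    # 48 output time points
    print(points[0], points[-1])         # 202103240300 202103300000
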
diff --git a/config_ENV_Ethiopia_template.json b/config_ENV_Ethiopia_template.json
index f85ec75ddfe5ac3b19603e65f83d5c5b1023ee8b..ecdc4c1c5641b47a409ada3c83b4ec9a42a34d4f 100644
--- a/config_ENV_Ethiopia_template.json
+++ b/config_ENV_Ethiopia_template.json
@@ -11,6 +11,47 @@
     "ServerKey" : "/storage/app/EWS/General/EWS-Coordinator/ssh_key_willow",
     "Environment" : {
         "ServerPathTemplate" : "/storage/sftp/metofficeupload/upload/Ethiopia/fromMO/daily_name/",
+        "InputFileTemplate" : "WR_EnvSuit_Met_Ethiopia_${StartString}",
+        "TEMPLATE_PATH": "/storage/app/EWS/General/EWS-met_extractor/configs/",
+        "WORK_PATH": "/storage/app/EWS/General/EWS-met_extractor/",
+        "INPUT_PATH": "/storage/app/EWS/Ethiopia/Workspace/",
+        "OUTPUT_PATH": "/storage/app/EWS/Ethiopia/Workspace/",
+        "RUN_TYPE": "operational",
+        "EXTRACTION_DAYS": 7,
+        "FORECAST_DAYS": 6,
+        "STRAINS": ["LeafRust","StemRust","StripeRust"],
+        "PARAMS": {
+            "LeafRust": {
+                "suitability_modules": ["semibool_dewperiod"],
+                "past_steps": 0,
+                "future_steps": 7,
+                "thresholds": {
+                    "temperature": [2,15,20,30],
+                    "precipitation": 0,
+                    "relative_humidity": 90
+                }
+            },
+            "StemRust": {
+                "suitability_modules": ["semibool_dewperiod"],
+                "past_steps": 0,
+                "future_steps": 7,
+                "thresholds": {
+                    "temperature": [2,15,24,30],
+                    "precipitation": 0,
+                    "relative_humidity": 90
+                }
+            },
+            "StripeRust": {
+                "suitability_modules": ["v1_dewperiod"],
+                "past_steps": 0,
+                "future_steps": 7,
+                "thresholds": {
+                    "temperature": [2.37,19.8],
+                    "precipitation": 0,
+                    "relative_humidity": 90
+                }
+            }
+        },
         "EWS-Plotting" : {
             "ShellScripts" : [
                 "/storage/app/EWS/General/EWS-Plotting-Dev/python/ethiopia/run_eth_env_plotting_pine.sh"
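
Since run_pipeline() fetches each of these keys individually through getParameter(), a filled-in template can be sanity-checked up front. A minimal sketch, not part of the patch, with the filename assumed:

    import json

    REQUIRED = ['TEMPLATE_PATH', 'WORK_PATH', 'INPUT_PATH', 'OUTPUT_PATH',
                'RUN_TYPE', 'EXTRACTION_DAYS', 'FORECAST_DAYS', 'STRAINS', 'PARAMS']

    with open('config_ENV_Ethiopia_template.json') as f:
        env = json.load(f)['Environment']

    missing = [key for key in REQUIRED if key not in env]
    assert not missing, f"missing Environment keys: {missing}"
    # Every listed strain needs a matching PARAMS block for the strain loop.
    assert all(strain in env['PARAMS'] for strain in env['STRAINS'])
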
"WR_EnvSuit_Met_Ethiopia_${StartString}", + "TEMPLATE_PATH": "/storage/app/EWS/General/EWS-met_extractor/configs/", + "WORK_PATH": "/storage/app/EWS/General/EWS-met_extractor/", + "INPUT_PATH": "/storage/app/EWS/Ethiopia/Workspace/", + "OUTPUT_PATH": "/storage/app/EWS/Ethiopia/Workspace/", + "RUN_TYPE": "operational", + "EXTRACTION_DAYS": 7, + "FORECAST_DAYS": 6, + "STRAINS": ["LeafRust","StemRust", "StripeRust"], + "PARAMS": { + "LeafRust": { + "suitability_modules": ["semibool_dewperiod"], + "past_steps": 0, + "future_steps": 7, + "thresholds": { + "temperature": [2,15,20,30], + "precipitation": 0, + "relative_humidity": 90 + } + }, + "StemRust": { + "suitability_modules": ["semibool_dewperiod"], + "past_steps": 0, + "future_steps": 7, + "thresholds": { + "temperature": [2,15,24,30], + "precipitation": 0, + "relative_humidity": 90 + } + }, + "StripeRust": { + "suitability_modules": ["v1_dewperiod"], + "past_steps": 0, + "future_steps": 7, + "thresholds": { + "temperature": [2.37,19.8], + "precipitation": 0, + "relative_humidity": 90 + } + } + }, "EWS-Plotting" : { "ShellScripts" : [ "/storage/app/EWS/General/EWS-Plotting-Dev/python/ethiopia/run_eth_env_plotting_pine.sh" diff --git a/config_ENV_SouthAsia_template.json b/config_ENV_SouthAsia_template.json index 6ff9058ee0fdfd7ecf3f8f9fe3979fb72a4350a6..e8072eab705fd404a55a479f5c784b070f76a121 100644 --- a/config_ENV_SouthAsia_template.json +++ b/config_ENV_SouthAsia_template.json @@ -11,6 +11,47 @@ "ServerKey" : "/storage/app/EWS/General/EWS-Coordinator/ssh_key_willow", "Environment" : { "ServerPathTemplate" : "/storage/sftp/metofficeupload/upload/SouthAsia/fromMO/daily_name/", + "InputFileTemplate" : "WR_EnvSuit_Met_SouthAsia_${StartString}", + "TEMPLATE_PATH": "/storage/app/EWS/General/EWS-met_extractor/configs/", + "WORK_PATH": "/storage/app/EWS/General/EWS-met_extractor/", + "INPUT_PATH": "/storage/app/EWS/SouthAsia/Workspace/", + "OUTPUT_PATH": "/storage/app/EWS/SouthAsia/Workspace/", + "RUN_TYPE": "operational", + "EXTRACTION_DAYS": 7, + "FORECAST_DAYS": 6, + "STRAINS": ["LeafRust","StemRust", "StripeRust"], + "PARAMS": { + "LeafRust": { + "suitability_modules": ["semibool_dewperiod"], + "past_steps": 0, + "future_steps": 7, + "thresholds": { + "temperature": [2,15,20,30], + "precipitation": 0, + "relative_humidity": 90 + } + }, + "StemRust": { + "suitability_modules": ["semibool_dewperiod"], + "past_steps": 0, + "future_steps": 7, + "thresholds": { + "temperature": [2,15,24,30], + "precipitation": 0, + "relative_humidity": 90 + } + }, + "StripeRust": { + "suitability_modules": ["v1_dewperiod"], + "past_steps": 0, + "future_steps": 7, + "thresholds": { + "temperature": [2.37,19.8], + "precipitation": 0, + "relative_humidity": 90 + } + } + }, "EWS-Plotting" : { "ShellScripts" : [ "/storage/app/EWS/General/EWS-Plotting-Dev/python/bangladesh/run_bang_env_plotting_pine.sh",