Commit 4c0fd6c9 authored by Dr T. Mona

Add EnvSuitPipeline, to handle the met extraction pipeline with a mini-coordinator script.

parent 350b7f5a
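For orientation, here is a minimal driver sketch showing how the new module is meant to be invoked, mirroring the coordinator change in the diff below; the config filename, region name and date string are hypothetical stand-ins.

# Hypothetical driver: load a per-region coordinator config (like the JSON
# fragments further down) and hand its "Environment" block to the pipeline.
import json

import EnvSuitPipeline as esp  # the module added by this commit

with open('config_Ethiopia.json') as f:  # hypothetical config filename
    config = json.load(f)

# The date is a YYYYMMDD string (config["StartString"] in the coordinator);
# extracted=False runs the met extraction and rechunking steps before the
# per-strain suitability runs.
esp.run_pipeline(config["Environment"], 'Ethiopia', '20200301', extracted=False)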
@@ -23,7 +23,7 @@ my $debugTimeString = getTimestampNow();
 my $debugOutput = 1;
 my $debugNoFTPDownload = 0;
-my $debugNoUpload = 0;
+my $debugNoUpload = 1;
 my $debugNoPlot = 0;
 # read in project configuration from input argument
...
@@ -37,6 +37,7 @@ import rasterio as rio
 from flagdir import jobStatus # gitlab project created by jws52
 import NAMEPreProcessor as npp # script in EWS-Coordinator project
+import EnvSuitPipeline as esp # script in EWS-Coordinator project
 print('Preparing for operational!')
 print("make sure to `conda activate py3EWSepi` environment!")
@@ -455,14 +456,14 @@ def process_in_job(jobPath,config):
     for region in regions:

         logger.info(f"Calling environmental suitability 2.0 for {region} so wait for output to appear")

-        cmd_envsuit = ["/storage/app/EWS/General/EWS-met_extractor/operational.sh",config["StartString"],config["RegionName"],region]
-        description_short = 'env2.0'
-        description_long = f"pipeline of environmental suitability v2.0 for {region}"
-        subprocess_and_log(cmd_envsuit,description_short,description_long)
+        pipeline_config = config["Environment"]
+        try:
+            esp.run_pipeline(pipeline_config, region, config["StartString"], extracted=False)
+        except Exception:
+            logger.exception("Some failure when running EnvSuitPipeline.py")
+            raise

     logger.info('Finished running environmental suitability 2.0')
     # TODO: Check that the output appears as expected

     return
...
import sys
import os
import shutil
import json
import pandas as pd
import logging
import subprocess
import numpy as np
import datetime as dt

logging.basicConfig(level=logging.DEBUG)

logger = logging.getLogger('EnvProcessor.pipeline')

###############################################################################
def loadConfig(configFile):
    '''Load a JSON configuration file into a dict.'''

    logger.info(f"Loading config file: {configFile}")

    try:
        with open(configFile) as config_file:
            config = json.load(config_file)
    except Exception:
        logger.exception(f"Failure opening config file {configFile}")
        raise

    return config
def getParameter(config, parameter):
    '''Look up a required parameter in the config, failing loudly if absent.'''

    logger.info(f"Getting {parameter} from the config")

    try:
        result = config[parameter]
    except KeyError:
        logger.exception(f"No {parameter} in the config file!")
        raise

    return result
def generate_temporal_points(file, datestr, timeresolution, nDaysForecast):
    '''Write a time-point file, one "%Y%m%d%H%M" timestamp per line, covering
    nDaysForecast days from the YYYYMMDD datestr at timeresolution-hour steps.'''

    datenow = dt.datetime(int(datestr[:4]), int(datestr[4:6]), int(datestr[6:8]), 0, 0)

    starttime = datenow + dt.timedelta(hours=timeresolution)
    endtime = datenow + dt.timedelta(days=nDaysForecast) + dt.timedelta(hours=timeresolution)
    timediff = dt.timedelta(hours=timeresolution)

    curr = starttime

    with open(file, "w") as outfile:
        # Generate the time series excluding the end time.
        while curr < endtime:
            outDateString = "{}\n".format(curr.strftime("%Y%m%d%H%M"))
            outfile.write(outDateString)
            curr += timediff
def run_multi_processor(multiprocPath, command, check=True):
    '''Run the multi-processor wrapper script on the given command.'''

    multiproc = multiprocPath + 'run_multi_processor.sh'
    try:
        process = subprocess.run([multiproc, command], check=check)
    except Exception:
        logger.exception(f"Some failure when running multi processor on {command}")
        raise

    return process
def pipeline_subprocess(binPath, workPath, command, multi=True):
    '''Run a pipeline step from workPath, in multi-process mode by default.'''

    os.chdir(workPath)
    logger.info(f"Changed work directory to {workPath}")

    process = None
    try:
        if multi:
            logger.info(f"Running {command} in multi process mode.")
            process = run_multi_processor(binPath, command)
        else:
            logger.info(f"Running {command} in single process mode.")
            pass # TODO: add single-process mode
    except Exception:
        logger.exception(f"Some failure when running {command}")
        raise

    return process
#######################################
def run_pipeline(pipeline_config, region, dateString, extracted=False):
    '''Mini-coordinator for the met extraction pipeline: builds a run config
    from a template, runs extraction and rechunking once, then runs the
    per-strain suitability post-processing and copies results to outPath.'''

    # Get parameters from the config
    templatePath = getParameter(pipeline_config, 'TEMPLATE_PATH')
    workPath = getParameter(pipeline_config, 'WORK_PATH')
    inPath = getParameter(pipeline_config, 'INPUT_PATH')
    outPath = getParameter(pipeline_config, 'OUTPUT_PATH')

    runType = getParameter(pipeline_config, 'RUN_TYPE')
    nDayExtraction = getParameter(pipeline_config, 'EXTRACTION_DAYS')
    nDayForecast = getParameter(pipeline_config, 'FORECAST_DAYS')

    templateName = 'template_' + runType + '_config.json'
    template_configFile = templatePath + templateName
    config = loadConfig(template_configFile)

    run_configName = 'run_config.json'
    run_configFile = workPath + run_configName

    # Get spatial points file for the region
    region_spatial_points_file = templatePath + 'input_spatial_points_' + region + '.csv'
    input_spatial_points_file = workPath + 'input_spatial_points.csv'
    shutil.copy(region_spatial_points_file, input_spatial_points_file)
    spatial_points = pd.read_csv(input_spatial_points_file)
    spatial_dim = spatial_points.shape[0]

    # Generate input (extraction) and output temporal points files
    timeresolution = 3 # hours

    input_temporal_points_file = workPath + 'input_temporal_points.csv'
    try:
        logger.info(f"Generating input temporal points file: {input_temporal_points_file}")
        generate_temporal_points(input_temporal_points_file, dateString, timeresolution, nDayExtraction)
    except Exception:
        logger.exception(f"Some failure when generating {input_temporal_points_file}")
        raise

    output_temporal_points_file = workPath + 'output_temporal_points.csv'
    try:
        logger.info(f"Generating output temporal points file: {output_temporal_points_file}")
        generate_temporal_points(output_temporal_points_file, dateString, timeresolution, nDayForecast)
    except Exception:
        logger.exception(f"Some failure when generating {output_temporal_points_file}")
        raise

    temporal_points = pd.read_csv(output_temporal_points_file)
    temporal_dim = temporal_points.shape[0]

    # Modify run_config
    config['POST_PROCESSING']['PROCESSORS'][0]['TIMEPOINTS_FILE_PATH'] = output_temporal_points_file
    config['TIMEPOINTS_FILE_PATH'] = input_temporal_points_file
    config['OUTPUT_DIR'] = workPath
    config['SPATIAL_POINTS_FILE_PATH'] = input_spatial_points_file
    config['FIELD_NAME_CONSTANTS_PATH'] = workPath + 'field_name_constants.csv'
    config['NCDF_DIR_PATH'] = inPath + 'ENVIRONMENT_2.0_' + dateString + '/NAME_Met_as_netcdf/'

    strains = getParameter(pipeline_config, 'STRAINS')

    ## START RUNS ####################
    os.chdir(workPath)

    if not extracted:
        # Clear any output left over from a previous run
        clean = workPath + 'clean.sh'
        try:
            subprocess.run(clean, check=True)
            logger.info('Work directory cleaned')
        except Exception:
            logger.exception(f"Some failure when running {clean}")
            raise

    for strain in strains:
        # Modify strain specific suitability parts of the config
        if pipeline_config['PARAMS'][strain]['future_steps'] > 0:
            for i in range(len(config['POST_PROCESSING']['PROCESSORS'][0]['FUTURE_FIELDS'])):
                config['POST_PROCESSING']['PROCESSORS'][0]['FUTURE_FIELDS'][i]['DURATION'] = pipeline_config['PARAMS'][strain]['future_steps']
        else:
            for i in range(len(config['POST_PROCESSING']['PROCESSORS'][0]['FUTURE_FIELDS'])):
                config['POST_PROCESSING']['PROCESSORS'][0]['FUTURE_FIELDS'][i]['ENABLED'] = "FALSE"

        config['POST_PROCESSING']['PROCESSORS'][0]['PARAMS']['suitability_modules'] = pipeline_config['PARAMS'][strain]['suitability_modules']
        config['POST_PROCESSING']['PROCESSORS'][0]['PARAMS']['thresholds'] = pipeline_config['PARAMS'][strain]['thresholds']

        # Write run_json
        os.chdir(workPath)
        with open(run_configFile, 'w') as run_config:
            json.dump(config, run_config, indent=4)

        # Run generate_all.sh
        generate_all = workPath + 'generate_all.sh'
        try:
            subprocess.run(generate_all, check=True)
        except Exception:
            logger.exception(f"Some failure when running {generate_all}")
            raise

        if not extracted:
            # Regrid and rechunk the met data once; later strains reuse it
            extractionPath = workPath + 'extraction/'
            pipeline_subprocess(workPath, extractionPath, 'run_regridding.sh')

            chunksPath = workPath + 'chunks/'
            pipeline_subprocess(workPath, chunksPath, 'run_rechunk.sh')

            extracted = True
            logger.info('Data extracted and chunked')

        logger.info(f"Starting {strain} suitability ---------------------------------")
        envSuitPath = workPath + 'post_processing/RIE/'
        pipeline_subprocess(workPath, envSuitPath, 'run_post_processing.sh')

        merge = envSuitPath + 'run_merge_results.sh'
        try:
            subprocess.run(merge, check=True)
        except Exception:
            logger.exception(f"Some failure when running {merge}")
            raise

        resultFile = envSuitPath + 'RIE.csv'
        strain_outPath = os.path.join(outPath, 'ENVIRONMENT_2.0_' + dateString, 'processed', region, strain)
        strain_outFile = strain_outPath + '/RIE_value.csv'

        # Check results dimension; + 6 because the result file carries six
        # extra (non-timepoint) columns
        result = pd.read_csv(resultFile)
        result_dims = result.shape

        if (result_dims[0] != spatial_dim) or (result_dims[1] != (temporal_dim + 6)):
            logger.error(f"Result dimension {result_dims} does not match the expected ({spatial_dim}, {temporal_dim + 6})")
            raise IndexError

        if not os.path.exists(strain_outPath):
            os.makedirs(strain_outPath)

        shutil.copy(resultFile, strain_outFile)
        logger.info(f"{strain} result successfully created and moved to {strain_outPath}/")

    logger.info('SUCCESSFULLY FINISHED')
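As a quick sanity check on the dimension test at the end of run_pipeline: generate_temporal_points writes one timestamp every timeresolution hours and excludes the end time, so at the hard-coded 3-hour resolution each day contributes 8 points. With the EXTRACTION_DAYS and FORECAST_DAYS values from the configs below, the expected shapes work out as in this sketch.

# Worked example of the expected dimensions, using the config values below
# (EXTRACTION_DAYS = 7, FORECAST_DAYS = 6) and the 3-hour resolution above.
points_per_day = 24 // 3                   # 8 timestamps per day
input_points = 7 * points_per_day          # 56 extraction time points
output_points = 6 * points_per_day         # 48 forecast time points

# RIE.csv is checked against one row per spatial point and the forecast time
# points plus 6 extra (non-timepoint) columns:
expected_cols = output_points + 6          # 54
print(input_points, output_points, expected_cols)   # 56 48 54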
@@ -11,6 +11,47 @@
     "ServerKey" : "/storage/app/EWS/General/EWS-Coordinator/ssh_key_willow",
     "Environment" : {
         "ServerPathTemplate" : "/storage/sftp/metofficeupload/upload/Ethiopia/fromMO/daily_name/",
         "InputFileTemplate" : "WR_EnvSuit_Met_Ethiopia_${StartString}",
+        "TEMPLATE_PATH": "/storage/app/EWS/General/EWS-met_extractor/configs/",
+        "WORK_PATH": "/storage/app/EWS/General/EWS-met_extractor/",
+        "INPUT_PATH": "/storage/app/EWS/Ethiopia/Workspace/",
+        "OUTPUT_PATH": "/storage/app/EWS/Ethiopia/Workspace/",
+        "RUN_TYPE": "operational",
+        "EXTRACTION_DAYS": 7,
+        "FORECAST_DAYS": 6,
+        "STRAINS": ["LeafRust", "StemRust", "StripeRust"],
+        "PARAMS": {
+            "LeafRust": {
+                "suitability_modules": ["semibool_dewperiod"],
+                "past_steps": 0,
+                "future_steps": 7,
+                "thresholds": {
+                    "temperature": [2, 15, 20, 30],
+                    "precipitation": 0,
+                    "relative_humidity": 90
+                }
+            },
+            "StemRust": {
+                "suitability_modules": ["semibool_dewperiod"],
+                "past_steps": 0,
+                "future_steps": 7,
+                "thresholds": {
+                    "temperature": [2, 15, 24, 30],
+                    "precipitation": 0,
+                    "relative_humidity": 90
+                }
+            },
+            "StripeRust": {
+                "suitability_modules": ["v1_dewperiod"],
+                "past_steps": 0,
+                "future_steps": 7,
+                "thresholds": {
+                    "temperature": [2.37, 19.8],
+                    "precipitation": 0,
+                    "relative_humidity": 90
+                }
+            }
+        },
         "EWS-Plotting" : {
             "ShellScripts" : [
                 "/storage/app/EWS/General/EWS-Plotting-Dev/python/ethiopia/run_eth_env_plotting_pine.sh"
...
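The PARAMS block above is exactly what run_pipeline splices into the template run config on each pass of the strain loop; here is a minimal sketch of that mapping, using the names from the code above (LeafRust chosen arbitrarily).

# Sketch of the per-strain splice done inside run_pipeline's strain loop.
params = pipeline_config['PARAMS']['LeafRust']   # here: future_steps = 7
proc = config['POST_PROCESSING']['PROCESSORS'][0]

if params['future_steps'] > 0:
    # presumably 7 steps x 3 h = 21 h of look-ahead at this resolution
    for field in proc['FUTURE_FIELDS']:
        field['DURATION'] = params['future_steps']
else:
    for field in proc['FUTURE_FIELDS']:
        field['ENABLED'] = "FALSE"

proc['PARAMS']['suitability_modules'] = params['suitability_modules']
proc['PARAMS']['thresholds'] = params['thresholds']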
@@ -11,6 +11,47 @@
     "ServerKey" : "/storage/app/EWS/General/EWS-Coordinator/ssh_key_willow",
     "Environment" : {
         "ServerPathTemplate" : "/storage/sftp/metofficeupload/upload/SouthAsia/fromMO/daily_name/",
         "InputFileTemplate" : "WR_EnvSuit_Met_SouthAsia_${StartString}",
+        "TEMPLATE_PATH": "/storage/app/EWS/General/EWS-met_extractor/configs/",
+        "WORK_PATH": "/storage/app/EWS/General/EWS-met_extractor/",
+        "INPUT_PATH": "/storage/app/EWS/SouthAsia/Workspace/",
+        "OUTPUT_PATH": "/storage/app/EWS/SouthAsia/Workspace/",
+        "RUN_TYPE": "operational",
+        "EXTRACTION_DAYS": 7,
+        "FORECAST_DAYS": 6,
+        "STRAINS": ["LeafRust", "StemRust", "StripeRust"],
+        "PARAMS": {
+            "LeafRust": {
+                "suitability_modules": ["semibool_dewperiod"],
+                "past_steps": 0,
+                "future_steps": 7,
+                "thresholds": {
+                    "temperature": [2, 15, 20, 30],
+                    "precipitation": 0,
+                    "relative_humidity": 90
+                }
+            },
+            "StemRust": {
+                "suitability_modules": ["semibool_dewperiod"],
+                "past_steps": 0,
+                "future_steps": 7,
+                "thresholds": {
+                    "temperature": [2, 15, 24, 30],
+                    "precipitation": 0,
+                    "relative_humidity": 90
+                }
+            },
+            "StripeRust": {
+                "suitability_modules": ["v1_dewperiod"],
+                "past_steps": 0,
+                "future_steps": 7,
+                "thresholds": {
+                    "temperature": [2.37, 19.8],
+                    "precipitation": 0,
+                    "relative_humidity": 90
+                }
+            }
+        },
         "EWS-Plotting" : {
             "ShellScripts" : [
                 "/storage/app/EWS/General/EWS-Plotting-Dev/python/bangladesh/run_bang_env_plotting_pine.sh",
...