Commit eb0eba10 authored by J.W. Smith
Updating Env Suit v2.0 calculation. Copied from operational EnvProcessor.py in master branch. Not tested for this commit.
parent 3602f345
import sys
import os
import shutil
import json
import pandas as pd
import logging
import subprocess
import numpy as np
import datetime as dt

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger('EnvProcessor.pipeline')

###############################################################################
def loadConfig(configFile):
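    """Load a JSON config file, logging and re-raising any failure to open or parse it."""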
    logger.info(f"Loading config file: {configFile}")
    try:
        with open(configFile) as config_file:
            config = json.load(config_file)
    except:
        logger.exception(f"Failure opening config file {configFile}")
        raise
    return config
def getParameter(config, parameter):
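    """Return config[parameter], logging and re-raising if the key is missing."""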
    logger.info(f"Getting {parameter} from the config")
    try:
        result = config[parameter]
    except:
        logger.exception(f"No {parameter} in the config file!")
        raise
    return result
def generate_temporal_points(file, datestr, timeresolution, nDaysForecast):
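    """Write a time series of %Y%m%d%H%M timestamps to file, one per line, at
    timeresolution-hour intervals from datestr + timeresolution hours up to but
    excluding datestr + nDaysForecast days + timeresolution hours. For example,
    datestr='20210324', timeresolution=3, nDaysForecast=7 gives 56 rows,
    202103240300 through 202103310000."""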
    datenow = dt.datetime(int(datestr[:4]), int(datestr[4:6]), int(datestr[6:8]), 0, 0)

    starttime = datenow + dt.timedelta(hours=timeresolution)
    endtime = datenow + dt.timedelta(days=nDaysForecast) + dt.timedelta(hours=timeresolution)
    timediff = dt.timedelta(hours=timeresolution)

    timeseries = []
    curr = starttime

    outfile = open(file, "w")
    # Generate the time series excluding the end time.
    while curr < endtime:
        timeseries.append(curr)
        outDateString = "{}\n".format(curr.strftime("%Y%m%d%H%M"))
        outfile.write(outDateString)
        curr += timediff

    outfile.close()
    return outfile
def run_multi_processor(multiprocPath,command,check=True):
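    """Run the run_multi_processor.sh script under multiprocPath with the given
    command as its argument, returning the CompletedProcess."""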
    try:
        multiproc = multiprocPath + 'run_multi_processor.sh'
        process = subprocess.run([multiproc,command],check=check)
    except:
        logger.exception(f"Some failure when running multi processor on {command}")
        raise
    return process
def pipeline_subprocess(binPath, workPath, command, multi=True):
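    """Change into workPath and run command via run_multi_processor() from
    binPath. Single-process mode is not yet implemented."""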
    os.chdir(workPath)
    logger.info(f"Changed work directory to {workPath}")

    multi_process = None # stays None in single-process mode, which is not yet implemented
    try:
        if multi:
            logger.info(f"Run {command} in multi process mode.")
            multi_process = run_multi_processor(binPath, command)
        else:
            logger.info(f"Run {command} in single process mode.")
            pass # TODO add not multi proc mode
    except:
        logger.exception(f"Some failure when running {command}", exc_info=True)
        raise

    return multi_process
#######################################
def run_pipeline(pipeline_config, region, dateString, extracted = False):
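    """Run the environmental suitability v2.0 pipeline for one region and date:
    stage the spatial and temporal points files, render the template run config,
    then for each strain run the extraction, post-processing and merge scripts
    and copy the resulting RIE values to the output path."""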
    # Get parameters from the config
    templatePath = getParameter(pipeline_config,'TEMPLATE_PATH')
    workPath = getParameter(pipeline_config,'WORK_PATH')
    inPath = getParameter(pipeline_config,'INPUT_PATH')
    outPath = getParameter(pipeline_config,'OUTPUT_PATH')

    runType = getParameter(pipeline_config,'RUN_TYPE')
    nDayExtraction = getParameter(pipeline_config,'EXTRACTION_DAYS')
    nDayForecast = getParameter(pipeline_config,'FORECAST_DAYS')

    templateName = 'template_' + runType + '_config.json'
    template_configFile = templatePath + templateName
    config = loadConfig(template_configFile)

    run_configName = 'run_config.json'
    run_configFile = workPath + run_configName

    # Get spatial points file for the region
    region_spatial_points_file = templatePath + 'input_spatial_points_' + region + '.csv'
    input_spatial_points_file = workPath + 'input_spatial_points.csv'
    shutil.copy(region_spatial_points_file,input_spatial_points_file)
    spatial_points = pd.read_csv(input_spatial_points_file)
    spatial_dim = spatial_points.shape[0]

    # Generate input (extraction) and output temporal points files
    timeresolution = 3 # hours

    input_temporal_points_file = workPath + 'input_temporal_points.csv'
    try:
        logger.info(f"Generating input temporal points to: {input_temporal_points_file}")
        generate_temporal_points(input_temporal_points_file, dateString, timeresolution, nDayExtraction)
    except:
        logger.exception(f"Some failure when generating {input_temporal_points_file}", exc_info=True)

    output_temporal_points_file = workPath + 'output_temporal_points.csv'
    try:
        logger.info(f"Generating output temporal points to: {output_temporal_points_file}")
        generate_temporal_points(output_temporal_points_file, dateString, timeresolution, nDayForecast)
    except:
        logger.exception(f"Some failure when generating {output_temporal_points_file}", exc_info=True)

    temporal_points = pd.read_csv(output_temporal_points_file)
    temporal_dim = temporal_points.shape[0]

    # Modify run_config
    config['POST_PROCESSING']['PROCESSORS'][0]['TIMEPOINTS_FILE_PATH'] = output_temporal_points_file
    config['TIMEPOINTS_FILE_PATH'] = input_temporal_points_file
    config['OUTPUT_DIR'] = workPath
    config['SPATIAL_POINTS_FILE_PATH'] = input_spatial_points_file
    config['FIELD_NAME_CONSTANTS_PATH'] = workPath + 'field_name_constants.csv'
    config['NCDF_DIR_PATH'] = inPath + 'ENVIRONMENT_2.0_' + dateString + '/NAME_Met_as_netcdf/'

    strains = getParameter(pipeline_config, 'STRAINS')

    ## START RUNS ####################
    os.chdir(workPath)

    if not extracted:
        clean = workPath + 'clean.sh'
        try:
            subprocess.run(clean, check=True)
            logger.info('Work directory cleaned')
        except:
            logger.exception(f"Some failure when running {clean}", exc_info=True)
            raise

    for strain in strains:
        # Modify strain-specific suitability parts of the config
        if pipeline_config['PARAMS'][strain]['future_steps'] > 0:
            for i in range(len(config['POST_PROCESSING']['PROCESSORS'][0]['FUTURE_FIELDS'])):
                config['POST_PROCESSING']['PROCESSORS'][0]['FUTURE_FIELDS'][i]['DURATION'] = pipeline_config['PARAMS'][strain]['future_steps']
        else:
            for i in range(len(config['POST_PROCESSING']['PROCESSORS'][0]['FUTURE_FIELDS'])):
                config['POST_PROCESSING']['PROCESSORS'][0]['FUTURE_FIELDS'][i]['ENABLED'] = "FALSE"

        config['POST_PROCESSING']['PROCESSORS'][0]['PARAMS']['suitability_modules'] = pipeline_config['PARAMS'][strain]['suitability_modules']
        config['POST_PROCESSING']['PROCESSORS'][0]['PARAMS']['thresholds'] = pipeline_config['PARAMS'][strain]['thresholds']

        # Write run_config.json
        os.chdir(workPath)
        with open(run_configFile, 'w') as run_config:
            json.dump(config, run_config, indent=4)

        # Run generate_all.sh
        generate_all = workPath + 'generate_all.sh'
        try:
            subprocess.run(generate_all, check=True)
        except:
            logger.exception(f"Some failure when running {generate_all}", exc_info=True)
            raise

        if not extracted:
            extractionPath = workPath + 'extraction/'
            pipeline_subprocess(workPath, extractionPath, 'run_regridding.sh')
            chunksPath = workPath + 'chunks/'
            pipeline_subprocess(workPath, chunksPath, 'run_rechunk.sh')
            extracted = True
            logger.info('Data extracted and chunked')

        logger.info(f"Starting {strain} suitability ---------------------------------")
        envSuitPath = workPath + 'post_processing/RIE/'
        pipeline_subprocess(workPath, envSuitPath, 'run_post_processing.sh')

        merge = envSuitPath + 'run_merge_results.sh'
        try:
            subprocess.run(merge, check=True)
        except:
            logger.exception(f"Some failure when running {merge}", exc_info=True)
            raise

        resultFile = envSuitPath + 'RIE.csv'
        strain_outPath = os.path.join(outPath,'ENVIRONMENT_2.0_'+dateString,'processed',region,strain)
        strain_outFile = strain_outPath + '/RIE_value.csv'

        # Check result dimensions; + 6 required because there are extra columns in the result file
        result = pd.read_csv(resultFile)
        result_dims = result.shape
        if (result_dims[0] != spatial_dim) or (result_dims[1] != (temporal_dim + 6)):
            logger.error(f"Result dimensions {result_dims} do not match the expected ({spatial_dim}, {temporal_dim + 6})")
            raise IndexError

        if not os.path.exists(strain_outPath):
            os.makedirs(strain_outPath)
        shutil.copy(resultFile,strain_outFile)
        logger.info(f"{strain} result successfully created and moved to {strain_outPath}/")

    logger.info('SUCCESSFULLY FINISHED')
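For reference, a minimal sketch of how this module is driven, mirroring the esp.run_pipeline() call added to Processor.py below. The config file name and region are placeholders; the "Environment" block corresponds to the component config shown in the JSON diffs further down.

import json

import EnvSuitPipeline as esp

# Placeholder job config whose "Environment" block carries TEMPLATE_PATH,
# WORK_PATH, INPUT_PATH, OUTPUT_PATH, RUN_TYPE, *_DAYS, STRAINS and PARAMS.
with open('job_config.json') as f:
    config = json.load(f)

pipeline_config = config['Environment']

# '20210324' stands in for config['StartString'], a YYYYMMDD run date; the
# region must have a matching input_spatial_points_<region>.csv template.
esp.run_pipeline(pipeline_config, 'Bangladesh', '20210324', extracted=False)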
@@ -12,8 +12,10 @@ from glob import glob
from shutil import copyfile
from pathlib import Path
from string import Template
+from distutils.dir_util import copy_tree

import NAMEPreProcessor as npp # script in EWS-Coordinator project
+import EnvSuitPipeline as esp # script in EWS-Coordinator project

from ProcessorUtils import open_and_check_config, get_only_existing_globs, subprocess_and_log, endScript, endJob, add_filters_to_sublogger
@@ -55,7 +57,11 @@ def process_pre_job_server_download(input_args):

        timenow = datetime.datetime.now(tz=datetime.timezone.utc).time()

-        status = subprocess.run(["ssh","-i",config['ServerKey'],config['ServerName'],f"test -f {file_path}/{file_name}.tar.gz"])
+        cmd_ssh = ["ssh","-i",config['ServerKey'],config['ServerName'],f"test -f {file_path}/{file_name}.tar.gz"]
+        description_short = 'subprocess_ssh'
+        description_long = f"Checking for existence of {file_path}/{file_name}.tar.gz"
+        status = subprocess_and_log(cmd_ssh,description_short,description_long,check=False)

        if status.returncode == 1:

@@ -96,6 +102,30 @@ def process_pre_job_server_download(input_args):

    return True
+# TODO: Test this out
+def process_copy_past_job_env2_0(jobPath,config):
+    '''For when we want to skip process_in_job() to test the other components of
+    this script. Currently hard-wired.'''
+
+    # TODO: remove this hard-wired assumption
+    jobPath_to_copy = f"{jobPath}/../{startstring}_bak/"
+
+    assert os.path.exists(jobPath_to_copy)
+
+    dir_src = f"{jobPath_to_copy}/processed/"
+    dir_dst = f"{jobPath}/processed/"
+
+    logger.info(f"Copying from {dir_src}")
+    logger.info(f"to {dir_dst}")
+
+    copy_tree(dir_src,dir_dst)
+
+    logger.info('Copying complete')
+
+    return
#TODO
def process_pre_job_epi(input_args):
    '''Returns a boolean as to whether the job is ready for full processing.'''
@@ -363,7 +393,7 @@ def process_in_job_survey(jobPath,status,config,component):

    return [output_path]

-#TODO
+#TODO test if this works
def process_in_job_env2_0(jobPath,status,config,component):

    logger.info('started process_in_job_env2_0()')
@@ -372,24 +402,25 @@ def process_in_job_env2_0(jobPath,status,config,component):

    file_path = Template(config[component]['ServerPathTemplate']).substitute(**config)
    file_name = Template(config[component]['InputFileTemplate']).substitute(**config)

    #TODO: check if file exists already (may be the case for multiple configs in one)

    # TODO: perform ssh file transfer in python instead of subprocess
    cmd_scp = ["scp","-i",config['ServerKey'],f"{config['ServerName']}:{file_path}/{file_name}.tar.gz", jobPath]
    description_short = 'env2 scp'
    description_long = 'Copying file from remote server to job directory'
    subprocess_and_log(cmd_scp,description_short, description_long)
    #subprocess.run(cmd_scp)

    logger.info('untarring the input file')

    # TODO: untar file in python (with tarfile module) instead of subprocess
    cmd_tar = ["tar","-xzf",f"{jobPath}/{file_name}.tar.gz","-C",jobPath]
    description_short = 'env2 tar'
    description_long = 'Untarring the input file'
    subprocess_and_log(cmd_tar,description_short, description_long)
    #subprocess.run(cmd_tar)
-    # basic check that contents are as expected
+    # basic check that contents are as expected for 7-day forecast
+    # 57 files of NAME .txt timesteps and one summary png file
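+    # (57 presumably = 56 three-hourly timesteps over the 7 days plus the t=0 analysis step)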
    if len(os.listdir(f"{jobPath}/{file_name}")) != 58:
        msg = f"Insufficient contents of untarred file in directory {jobPath}/{file_name}"
@@ -415,33 +446,15 @@ def process_in_job_env2_0(jobPath,status,config,component):

    for region in regions:

        logger.info(f"Calling environmental suitability 2.0 for {region} so wait for output to appear")

-        cmd_envsuit = ["/storage/app/EWS/General/EWS-met_extractor/operational.sh",config["StartString"],config["RegionName"],region]
-        description_short = 'env2.0'
-        description_long = f"pipeline of environmental suitability v2.0 for {region}"
-        subprocess_and_log(cmd_envsuit,description_short,description_long)
+        pipeline_config = config["Environment"]
+        try:
+            esp.run_pipeline(pipeline_config, region, config["StartString"], extracted=False)
+        except:
+            logger.exception(f"Some failure when running EnvSuitPipeline.py")
+            raise

    logger.info('Finished running environmental suitability 2.0')

-    #try:
-    #    #load env. op. script
-    #    env_op_process = subprocess.run(
-    #        cmd_env2,
-    #        check=True,
-    #        stdout = subprocess.PIPE,
-    #        stderr = subprocess.STDOUT)
-    #    for line in env_op_process.stdout.decode('utf-8').split(r'\n'):
-    #        logger.info('env2.0 : ' + line)
-    #except subprocess.CalledProcessError as e:
-    #    for line in e.stdout.decode('utf-8').split(r'\n'):
-    #        logger.info('env 2.0: ' + line)
-    #    logger.error('Some failure when running env.suit. pipeline.', exc_info=True)
-    #    raise

    # TODO: Check that the output appears as expected

    return
@@ -610,7 +623,9 @@ def process_EWS_plotting_survey(jobPath,config):

#TODO test if this works
def process_EWS_plotting_env2_0(jobPath,config):
-    '''Returns a list of output files for transfer.'''
+    '''Calls EWS-plotting by configuring the plotting arguments and passing
+    them to the underlying script.
+
+    Returns a list of output files for transfer.'''

    logger.info('started process_EWS_plotting_env2_0()')
@@ -622,7 +637,7 @@ def process_EWS_plotting_env2_0(jobPath,config):

        'R_LIBS' : '/home/ewsmanager/R-packages-EWS-plotting/plotting_rlibs/',
    }

-    input_dir = f"{config['WorkspacePath']}ENVIRONMENT_{config['StartString']}/processed/"
+    input_dir = f"{jobPath}/processed/"

    output_dir = f"{jobPath}/plotting/"
@@ -641,7 +656,11 @@ def process_EWS_plotting_env2_0(jobPath,config):

        # Note that this runs all disease types available

-        # TODO: Add logic to determine which diseases to plot
+        # logic to determine which diseases to plot
+        diseases_processed = os.listdir(f"{input_dir}/{region}/")
+        plot_stem = 'StemRust' in diseases_processed
+        plot_stripe = 'StripeRust' in diseases_processed
+        plot_leaf = 'LeafRust' in diseases_processed

        # prepare command
        plot_command = [
@@ -656,9 +675,9 @@ def process_EWS_plotting_env2_0(jobPath,config):

            '--run_config_file',run_config,
            '--plot_daily_images','True',
            '--plot_hourly_images','False',
-            '--plot_stem','True',
-            '--plot_stripe','True',
-            '--plot_leaf','False']
+            '--plot_stem',str(plot_stem),
+            '--plot_stripe',str(plot_stripe),
+            '--plot_leaf',str(plot_leaf)]

        logger.info(f"Running EWS-Plotting command:\n'{' '.join(plot_command)}'")
@@ -668,12 +687,15 @@ def process_EWS_plotting_env2_0(jobPath,config):

        # check the output
        EWSPlottingOutputDir = f"{output_dir}/{region.lower()}/images/"
-        EWSPlottingOutputGlobs += [
-                # daily plots
-                f"{EWSPlottingOutputDir}Daily/suitability_{region.lower()}_*_rust_daily_20*.png",
-                # weekly plots
-                f"{EWSPlottingOutputDir}Weekly/suitability_{region.lower()}_*_rust_total_20*.png"]
+        #EWSPlottingOutputGlobs += [
+        #        # daily plots
+        #        f"{EWSPlottingOutputDir}Daily/suitability_{region.lower()}_*_rust_daily_20*.png",
+        #        # weekly plots
+        #        f"{EWSPlottingOutputDir}Weekly/suitability_{region.lower()}_*_rust_total_20*.png"]
+        EWSPlottingOutputGlobs += [f"{EWSPlottingOutputDir}*"]

    # check the output
    EWSPlottingOutputGlobs = get_only_existing_globs(EWSPlottingOutputGlobs,inplace=False)

    # check there is some output from EWS-plotting
......
@@ -22,6 +22,46 @@
    "ProcessPreJob" : "process_pre_job_server_download",
    "ProcessInJob" : "process_in_job_env2_0",
    "ProcessEWSPlotting" : "process_EWS_plotting_env2_0",
"TEMPLATE_PATH": "/storage/app/EWS/General/EWS-met_extractor/configs/",
"WORK_PATH": "/storage/app/EWS/General/EWS-met_extractor/",
"INPUT_PATH": "/storage/app/EWS/Ethiopia/Workspace/",
"OUTPUT_PATH": "/storage/app/EWS/Ethiopia/Workspace/",
"RUN_TYPE": "operational",
"EXTRACTION_DAYS": 7,
"FORECAST_DAYS": 6,
"STRAINS": ["LeafRust","StemRust", "StripeRust"],
"PARAMS": {
"LeafRust": {
"suitability_modules": ["semibool_dewperiod"],
"past_steps": 0,
"future_steps": 7,
"thresholds": {
"temperature": [2,15,20,30],
"precipitation": 0,
"relative_humidity": 90
}
},
"StemRust": {
"suitability_modules": ["semibool_dewperiod"],
"past_steps": 0,
"future_steps": 7,
"thresholds": {
"temperature": [2,15,24,30],
"precipitation": 0,
"relative_humidity": 90
}
},
"StripeRust": {
"suitability_modules": ["v1_dewperiod"],
"past_steps": 0,
"future_steps": 7,
"thresholds": {
"temperature": [2.37,19.8],
"precipitation": 0,
"relative_humidity": 90
}
}
},
"EWS-Plotting" : {
"Ethiopia" : {
"RunConfig" : "/storage/app/EWS/General/EWS-Plotting/python/data/json_config/RUN_CONFIG_ETH_PINE.json",
......
{
-    "DiseaseName" : "Stripe",
+    "DiseaseName" : "Leaf",
    "RegionName" : "SouthAsia",
    "SubRegionNames" : ["Bangladesh","Nepal"],
    "StartTime" : "?",
@@ -22,6 +22,46 @@
    "ProcessPreJob" : "process_pre_job_server_download",
    "ProcessInJob" : "process_in_job_env2_0",
    "ProcessEWSPlotting" : "process_EWS_plotting_env2_0",
"TEMPLATE_PATH": "/storage/app/EWS/General/EWS-met_extractor/configs/",
"WORK_PATH": "/storage/app/EWS/General/EWS-met_extractor/",
"INPUT_PATH": "/storage/app/EWS/SouthAsia/Workspace/",
"OUTPUT_PATH": "/storage/app/EWS/SouthAsia/Workspace/",
"RUN_TYPE": "operational",
"EXTRACTION_DAYS": 7,
"FORECAST_DAYS": 6,
"STRAINS": ["LeafRust","StemRust", "StripeRust"],
"PARAMS": {
"LeafRust": {
"suitability_modules": ["semibool_dewperiod"],
"past_steps": 0,
"future_steps": 7,
"thresholds": {
"temperature": [2,15,20,30],
"precipitation": 0,
"relative_humidity": 90
}
},
"StemRust": {
"suitability_modules": ["semibool_dewperiod"],
"past_steps": 0,
"future_steps": 7,
"thresholds": {
"temperature": [2,15,24,30],
"precipitation": 0,
"relative_humidity": 90
}
},
"StripeRust": {
"suitability_modules": ["v1_dewperiod"],
"past_steps": 0,
"future_steps": 7,
"thresholds": {
"temperature": [2.37,19.8],
"precipitation": 0,
"relative_humidity": 90
}
}
},
"EWS-Plotting" : {
"Bangladesh" : {
"RunConfig" : "/storage/app/EWS/General/EWS-Plotting-Dev/python/data/json_config/RUN_CONFIG_BANG_PINE.json",
@@ -55,9 +95,65 @@
    "ProcessPreJob" : "process_pre_job_epi",
    "ProcessInJob" : "process_in_job_epi",
    "ProcessEWSPlotting" : "process_EWS_plotting_epi",
"Deposition" : {
"VariableName" : "P_RECONDITA_DEPOSITION",
"PathTemplate" : "${WorkspacePath}DEPOSITION_${StartString}/WR_NAME_SouthAsia_${StartString}/",
"SuccessFileTemplate" : "${WorkspacePath}DEPOSITION_${StartString}/DEPOSITION_SUCCESS.txt",
"FileNameTemplate" : "deposition_srcs_allregions_C1_T${tIndexPlusOneString}_${iTimeString}.txt",
"FileNamePrepared" : "?"
},
"Environment" : {
"PathTemplate" : "${WorkspacePath}ENVIRONMENT_2.0_${StartString}/processed/${SubRegionName}/${DiseaseName}Rust/",
"SuccessFileTemplate" : "${WorkspacePath}ENVIRONMENT_${StartString}/ENVIRONMENT_SUCCESS.txt",
"FileNameTemplate" : "RIE_value.csv",
"FileNamePrepared" : "?"
},
"Epi" : [
{
"model" : "Env",
"modelArguments" : {},
"infectionRasterFileName" : "?",
"description": "env. suitability",
"analysis" : {
"vmin" : 0.0e+0,
"vmax" : 1.5e+1,
"subplot_position" : [0,0],
"cmapString" : "CMRmap_r",
"bounds" : [87.9, 92.7, 20.5, 26.7],
"UTMprojection" : 45
}
},{
"model" : "log10Dep",
"modelArguments" : {},
"infectionRasterFileName" : "?",
"description": "log(spore deposition)",
"analysis" : {
"vmin" : 0.0e+0,
"vmax" : 3.0e+2,
"subplot_position" : [0,1],
"cmapString" : "CMRmap_r",
"bounds" : [87.9, 92.7, 20.5, 26.7],
"UTMprojection" : 45
}
},{
"model" : "log10DepEnv",
"modelArguments" : {},
"infectionRasterFileName" : "?",
"description": "EPI: env.suit $\\times$ log(dep.)",
"analysis" : {
"vmin" : 0.0e+0,
"vmax" : 1.5e+1,
"subplot_position" : [0,2],
"cmapString" : "CMRmap_r",
"bounds" : [87.9, 92.7, 20.5, 26.7],
"UTMprojection" : 45
}
}
],
"EWS-Plotting" : {
"RunConfig" : "?",
"PythonScript" : "?"
"RunConfig" : "/storage/app/EWS/General/EWS-Plotting/python/data/json_config/RUN_CONFIG_BANG_PINE.json",
"PythonScript" : "/storage/app/EWS/General/EWS-Plotting/python/bangladesh/ews_plotting_epi_bang_main.py",
"EpiCase" : "log10DepEnv"
}
}
}
\ No newline at end of file