FAQ | This is a LIVE service | Changelog

Skip to content
Snippets Groups Projects
Commit 3bad0558 authored by J.W. Smith's avatar J.W. Smith
Browse files

env suit v2.0 running. Also more reliable calls to subprocess. No plotting yet.

parent 0854b215
No related branches found
No related tags found
No related merge requests found
......@@ -11,9 +11,9 @@ This runs all dates of a single forecast.
Example usage::
$ conda activate py3EWSepi
$ python3 EnvProcessor.py --date 20200324 --islive --config config_Nepal_template_stripe.json
$ python3 EnvProcessor.py --start-date 20200324 --islive --config config_Nepal_template_stripe.json
or::
$ ./run_EnvProcessor.sh --date 20200324 --islive --config config_Nepal_template_stripe.json
$ ./run_EnvProcessor.sh --start-date 20200324 --islive --config config_Nepal_template_stripe.json
'''
import sys
import os
......@@ -36,6 +36,7 @@ import pandas
import rasterio as rio
from flagdir import jobStatus # gitlab project created by jws52
import NAMEPreProcessor as npp # script in EWS-Coordinator project
print('Preparing for operational!')
print("make sure to `conda activate py3EWSepi` environment!")
......@@ -132,7 +133,7 @@ logConfigDict = {
}
},
'loggers' : {
'EnvProcessor' : { # this is activated with logging.getLogger('Process.')
script_name : { # this is activated with logging.getLogger('Process.')
'level' : 'DEBUG',
'handlers' : ['handler_rot_file','handler_file','handler_email'],
'propagate' : True,
......@@ -150,7 +151,7 @@ logging.config.dictConfig(logConfigDict)
print(__name__)
# create a logger named according to how the file is called
#logger = logging.getLogger(__name__)
logger = logging.getLogger('EnvProcessor')
logger = logging.getLogger(script_name)
def move_default_logfile_handler(dstPathName,srcPathName=log_path_default,FileHandlerName='handler_file',):
'''For on-the-fly move of logging from default file to another. Copies the
......@@ -268,7 +269,7 @@ def parse_and_check_args(todayString):
logger.info(f"Command-line options are:\n{args}")
if not args.live:
# do not email maintainers
# remove the log handler that would send emails
logger.handlers = [h for h in logger.handlers if not isinstance(h,logging.handlers.SMTPHandler)]
if not isinstance(args.config_paths,list):
......@@ -325,6 +326,30 @@ def open_and_check_config(configFile):
return config
def subprocess_and_log(cmd,description_short,description_long,check=True):
    '''Run a shell command and send its combined stdout/stderr to the logfile,
    one log record per output line, raising any failure after logging it.

    Parameters
    ----------
    cmd : list of str
        Program and arguments, passed to subprocess.run (shell=False).
    description_short : str
        Short tag prefixed to every logged output line.
    description_long : str
        Human-readable description of the command, used in the failure message.
    check : bool, optional
        If True (default), a non-zero exit status raises
        subprocess.CalledProcessError; the error output is logged and the
        exception re-raised so the caller can abort the job.

    Returns
    -------
    subprocess.CompletedProcess
        The completed process, so callers can inspect returncode
        (as process_pre_job does with check=False).
    '''
    try:
        # stderr is merged into stdout so the log preserves the interleaved
        # order of the command's output.
        process = subprocess.run(
                cmd,
                check=check,
                stdout = subprocess.PIPE,
                stderr = subprocess.STDOUT)

        # splitlines() splits on real newlines; the previous split(r'\n')
        # matched the two-character literal backslash-n, so multi-line
        # output was logged as a single record.
        for line in process.stdout.decode('utf-8').splitlines():
            logger.info(f"{description_short} : " + line)

    except subprocess.CalledProcessError as e:

        # Log whatever the failed command printed before re-raising.
        for line in e.stdout.decode('utf-8').splitlines():
            logger.info(f"{description_short} : " + line)

        logger.exception(f"Some failure when running {description_long}", exc_info=True)
        raise

    return process
def process_pre_job(input_args):
'''This is set up for environmental suitability v2.0'''
......@@ -343,13 +368,17 @@ def process_pre_job(input_args):
file_name = Template(config['Environment']['InputFileTemplate']).substitute(**config)
logger.info(f"Checking for existence of {file_path}/{file_name}.tar.gz")
status = subprocess.run(["ssh","-i",config['ServerKey'],config['ServerName'],f"test -f {file_path}/{file_name}.tar.gz"])
cmd_ssh = ["ssh","-i",config['ServerKey'],config['ServerName'],f"test -f {file_path}/{file_name}.tar.gz"]
description_short = 'subprocess_ssh'
description_long = f"Checking for existence of {file_path}/{file_name}.tar.gz"
subprocess_ssh = subprocess_and_log(cmd_ssh,description_short,description_long,check=False)
if status.returncode == 1:
if subprocess_ssh.returncode == 1:
logger.info(f"Data not yet available for config {i}")
endScript(premature=True)
elif status.returncode == 0:
elif subprocess_ssh.returncode == 0:
logger.info(f"Data is available for config {i}, calculation shall proceed")
return
......@@ -362,22 +391,53 @@ def process_in_job(jobPath,config):
file_path = Template(config['Environment']['ServerPathTemplate']).substitute(**config)
file_name = Template(config['Environment']['InputFileTemplate']).substitute(**config)
# TODO: perform ssh file transfer in python instead of subprocess
subprocess.run(["scp","-i",config['ServerKey'],f"{config['ServerName']}:{file_path}/{file_name}.tar.gz", jobPath])
# perform ssh file transfer in python instead of subprocess
cmd_scp = ["scp","-i",config['ServerKey'],f"{config['ServerName']}:{file_path}/{file_name}.tar.gz", jobPath]
description_short = 'subprocess_scp'
description_long = 'copying file from remote server to job directory'
subprocess_and_log(cmd_scp,description_short,description_long)
logger.info('untarring the input file')
# TODO: untar file in python (with tarfile module) instead of subprocess
subprocess.run(["tar","-xzf",f"{jobPath}/{file_name}.tar.gz","-C",jobPath])
# untar file in python (with tarfile module) instead of subprocess
cmd_tar = ["tar","-xzf",f"{jobPath}/{file_name}.tar.gz","-C",jobPath]
description_short = 'subprocess_tar'
description_long = 'untarring the input file'
subprocess_and_log(cmd_tar,description_short,description_long)
# basic check that contents are as expected
# 57 files of NAME .txt timesteps and one summary png file
if len(os.listdir(f"{jobPath}/{file_name}")) != 58:
logger.error(f"Insufficient contents of input tar file")
# TODO: call envsuit v2.0
# convert format of met data so that met extraction pipeline can work with it
logger.info('Converting .txt input files to .nc.tar.gz')
input_files_glob = f"{jobPath}/WR_EnvSuit_Met_Ethiopia_*/Met_Data_C1_T*.txt"
output_directory = f"{jobPath}/NAME_Met_as_netcdf"
try:
npp.process_met_office_NAME(input_files_glob,output_directory)
except:
logger.exception(f"Some failure when converting NAME data from .txt to nc.tar.gz")
# TODO: check that process_met_office_NAME() produced output as expected
# TODO: call EWS-Plotting
logger.info('Calling environmental suitability 2.0 so wait for output to appear')
cmd_envsuit = ["/storage/app/EWS/General/EWS-met_extractor/operational.sh",config["StartString"]]
description_short = 'env2.0'
description_long = 'pipeline of environmental suitability v2.0'
subprocess_and_log(cmd_envsuit,description_short,description_long)
# TODO: Check that the output appears as expected
# TODO: Also modify process_EWS_plotting()
pass
return
......@@ -517,9 +577,9 @@ def run_Process():
# check for a status file in job directory
if status.had_initial_status:
logger.error(f"Job path already exists and has status {status.status}")
logger.warning(f"Job path already exists and has status {status.status}")
endJob(status,premature = status.status != 'SUCCESS')
endScript(premature = status.status not in ['SUCCESS','INPROGRESS'])
logger.info(f"Current status of job directory is {status.status}")
......@@ -557,7 +617,12 @@ def run_Process():
with open(f"{jobPath}/{configFileName}.json",'w') as write_file:
json.dump(configjson,write_file,indent=4)
process_in_job(jobPath,configjson)
try:
process_in_job(jobPath,configjson)
except:
logger.exception('Some error with process_in_job()')
status.reset('ERROR')
endJob(status,premature=True)
# Run EWS-plotting command
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment