
Commit a667087d authored by J.W. Smith

feat: New component - met data resampler

Supporting ongoing collaboration with Mauricio Fernandes, Felipe de Vargas, and Will Thurston to develop wheat blast and fusarium head blight models driven by UK Met Office met data.

This processor is put in a subdirectory called 'extra' because it is not part of the core products being generated at Cambridge. As such, it will not have substantial documentation or testing until it is made a core product or superseded by work elsewhere.
parent 59faa403
@@ -117,6 +117,7 @@ def upload(config,FilesToSend,component):
     'Epidemiology' : usual_path,
     'Survey' : f"SURVEYDATA_{config['StartString']}_0000/",
     'Scraper' : usual_path,
+    'MetResample' : f"MET_RESAMPLE_{config['StartString']}_0000/",
     'Advisory' : usual_path }
@@ -28,6 +28,7 @@ short_name = {
     'Epidemiology' : 'EPI',
     'Survey' : 'SURVEYDATA',
     'Scraper' : 'SCRAPER',
+    'MetResample' : 'MET_RESAMPLE',
 }
 disease_latin_name_dict = {
# ProcessorMetResample.py
"""Functions to process NAME met data for the wheat blast model of Mauricio
Fernandes, Felipe de Vargas, and team. This works a lot like the met extractor,
but gathers met data across many input folders, and performs time resampling.
CAUTION: This is considered a temporary 'product', so it will run in the
operational system but will not be fully documented or tested. Expect it to be
removed or superseded.
"""
import copy
import datetime
import logging
import os
from iris import load_cube
from pandas import DataFrame, infer_freq, read_csv, to_datetime
from EpiModel.EpiPrep import lister, loader, prep
from EpiModel.EpiUtils import (
datetime_range,
parse_template_string)
from Processor import Processor
from ProcessorServer import (
process_pre_job_server_download,
get_data_from_server
)
from ProcessorUtils import (
add_filters_to_sublogger,
calc_epi_date_range,
open_and_check_config,
query_past_successes,
short_name)
class ProcessorMetResample(Processor):
def process_pre_job(self, args):
self.logger.debug('Performing process_pre_job()')
# If it will work on a single forecast, get a dedicated download
#return process_pre_job_server_download(args)
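        # Otherwise, require that recent runs of the Environment component
        # succeeded (assumed behaviour of query_past_successes)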
return query_past_successes(args, 'Environment')
def process_in_job(self, jobPath, status, configjson, component) -> object:
return self.process_in_job_met_resample(jobPath, status, configjson, component)
def process_post_job(self, jobPath, configjson):
pass
def __init__(self) -> None:
super().__init__()
logger = logging.getLogger('Processor.Extra.MetResample')
add_filters_to_sublogger(logger)
    def gather_data(
            self,
            config,
            start_date,
            end_date,
            component='MetResample',
            lister_kwargs=None):
        """Based on a copy from
        /home/jws52/projects/SouthAsia/blast-fernandes-adaptation/code/prep_met.py
        Originally from coordinator ProcessorEpidemiology.py."""

        # TODO: Replace calls to this function with calls to
        # EpiModel.EpiPrep.prep.gather_dependent_models()
        # TODO: Simplify the set of required arguments. Check if config is necessary.

        # Avoid a mutable default argument
        if lister_kwargs is None:
            lister_kwargs = {}

        # Use the config-defined file lister. Look for the function in this
        # module first, then fall back to the shared lister module.
        # (globals().get() returns None rather than raising, so a try/except
        # is not sufficient here.)
        listerfunc = config[component]['FileListerFunction']
        if isinstance(listerfunc, str):
            file_lister = globals().get(listerfunc)
            if file_lister is None:
                file_lister = getattr(lister, listerfunc)
        elif callable(listerfunc):
            file_lister = listerfunc
        else:
            raise TypeError('FileListerFunction must be a function name or a callable')
file_loader = getattr(
loader,
config[component]['FileLoaderFunction'])
config_for_lister = config.copy()
#lister_kwargs['reference_date']=config['ReferenceTime']
lister_kwargs['is_met']=config[component].get('is_met',False)
loader_kwargs= {}
loader_kwargs['VariableName']= config_for_lister[component].get('VariableName')
#loader_kwargs['VariableNameAlternative']= config_for_lister['Deposition'].get('VariableNameAlternative')
prep.prep_input(config_for_lister,start_date,end_date,
component=component,
file_lister=file_lister,
file_loader=file_loader,
lister_kwargs=lister_kwargs,
**loader_kwargs)
assert os.path.isfile(config[component]['FileNamePrepared'])
return
def load_prepared_file(self,fn: str) -> DataFrame:
"""Based on a copy from
/home/jws52/projects/SouthAsia/blast-fernandes-adaptation/code/prep_met.py
"""
df = read_csv(
fn,
header=[0,1],
index_col=0)
df.index = to_datetime(df.index,format='%Y%m%d%H%M')
return df
    def resample_data(self, config, component, resample_scale='1H'):
        """Increases the time resolution of data files by filling in missing
        time points from the next available time point, i.e. it assumes each
        time point is an average over the period since the previous time
        point, as is the case for NAME output.
        Based on a copy from
        /home/jws52/projects/SouthAsia/blast-fernandes-adaptation/code/prep_met.py
        """
# load dataframe from config
file_name_prepared = config[component]['FileNamePrepared']
df_original = self.load_prepared_file(file_name_prepared)
        time_resolution = df_original.index[1] - df_original.index[0]
        input_freq = infer_freq(df_original.index)
        self.logger.info(f"Resampling {component} from {input_freq} to {resample_scale}")
# TODO: Work out how to get lister to extend the time range of first or last
# day for the instantaneous met data.
# TODO: Check whether to reapply this for precipitation, the only field
# where the first time point may be missing.
# In order for backfill to work for the earliest time point, we need to
# temporarily provide a prior timepoint
#dfm1 = df_original.iloc[[0],:]
#dfm1.index = dfm1.index - time_resolution
#dfm1[:] = nan
#df_original = concat([dfm1,df_original])
        resampler = df_original.resample(resample_scale)

        resample_method = config[component].get('resampling', 'backfill')
        if resample_method == 'backfill':
            self.logger.info('Upsampling by backfilling')
            df_out = resampler.bfill()
        elif resample_method == 'interpolate':
            self.logger.info('Upsampling by interpolating linearly')
            df_out = resampler.interpolate(method='linear')
        else:
            raise Exception(f"Unknown resampling method '{resample_method}' in config.")
# remove the temporary prior timepoint
#df_out = df_out.drop(df_out.index[0])
# save the result
save_path = file_name_prepared.replace('.csv',f"_{resample_scale}.csv")
df_out.to_csv(save_path)
return df_out, save_path
def gather_and_resample(
self,
config_met,
reference_date_str,
calculation_span_days):
"""Based on a copy from
/home/jws52/projects/SouthAsia/blast-fernandes-adaptation/code/prep_met.py"""
components = config_met['components']
start_time, end_time = calc_epi_date_range(
reference_date_str,
calculation_span_days)
paths = []
for component in components:
self.logger.info(f"Working on {component}")
# parse the output file path
fnp = parse_template_string(
config_met[component]['FileNamePrepared'],
config_met,
f"{component} FileNamePrepared")
config_met[component]['FileNamePrepared'] = fnp
self.logger.debug('Making the output directory')
output_dir = os.path.dirname(config_met[component]['FileNamePrepared'])
os.makedirs(output_dir,exist_ok=True)
self.logger.debug('Gathering data')
self.gather_data(
copy.deepcopy(config_met),
start_time,#datetime.datetime(2023,6,7,3)
end_time,#datetime.datetime(2023,6,25,0),
component=component,
)
self.logger.debug('Performing resampling')
_, path = self.resample_data(copy.deepcopy(config_met), component)
paths += [path]
self.logger.info('Finished gather_and_resample()')
return paths
    def process_in_job_met_resample(self, jobPath, status, config, component):
        """Gather met data across the configured input folders, resample it to
        the target time resolution, and return the output files for upload."""
self.logger.info('started process_in_job_met_resample()')
# Initialise output variable
proc_out = {}
# Output files available for upload
proc_out['output'] = []
# Processing files available for clearing
proc_out['clearup'] = []
# Obtain a dedicated copy of the latest forecast data
#data_result = get_data_from_server(jobPath,config,component)
#
#if data_result['output'] is not None:
# proc_out['output'] = data_result['output']
#
#proc_out['clearup'] += data_result['clearup']
config_i = config[component].copy()
config_i['jobPath'] = jobPath
# provide top-level arguments to component's config
for k,v in config.items():
if k not in short_name.keys():
config_i[k]=v
        # Get run config
        # Path to some analysis met reruns provided by Will. These mirror the
        # usual weekly EWS analysis jobs, but on the extended EastAfrica grid
        # that includes Zambia. (ANJOBDIR2 is currently unused here; kept for
        # reference.)
        ANJOBDIR2 = '${WorkspacePath2}/WR_EnvSuit_Met_Ethiopia_${DateString}/'
# set up the required inputs
#"Host" : {
# #"Bounds" : [38.4,7.8,40.2,8.8] # Roughly box around Arsi/East Shewa in Ethiopia
# #"Bounds" : [38.8,7.1,39.1,7.3] # Tiny box of 4 gridpoints in West Arsi in Ethiopia
# "Bounds" : [28.0,-15.0,29.0,-14.0] # 1deg x 1deg box in centre of Zambia wheat producing areas, north of Lusaka
#},
config_met_path = config[component].get('RunConfig')
config_met = open_and_check_config(config_met_path)
config_met['jobPath'] = jobPath
# provide top-level arguments to component's config
for k,v in config.items():
if k not in short_name.keys():
config_met[k]=v
self.logger.info('Calling gather_and_resample()')
paths_out = self.gather_and_resample(
config_met,
reference_date_str = config['StartString'],
calculation_span_days = config[component]['CalculationSpanDays']
)
# 1H met files to upload
proc_out['output'] += paths_out
return proc_out
def has_times(cube_filename,desired_times,constraint=None):
"""Checks the times listed in desired_times are within the date range
available in the single cube loaded from cube_filename.
Based on a copy from
/home/jws52/projects/SouthAsia/blast-fernandes-adaptation/code/prep_met.py"""
cube = load_cube(cube_filename,constraint=constraint)
tc = cube.coord('time')
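    # decode the time coordinate into datetime-like objects for comparison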
tc_dates = tc.units.num2date(tc.points)
try:
for dt in desired_times:
# Check it is within range
assert dt >= tc_dates[0], f"desired_time {dt} is out of range of cube from {cube_filename}"
assert dt <= tc_dates[-1], f"desired_time {dt} is out of range of cube from {cube_filename}"
# Check exact time is available
#assert sum([dt == tcdi for tcdi in tc_dates])==1
return True
except AssertionError:
return False
def list_files_historical_daily(
config,
startTime,
endTime,
component,
is_met=False,
**kwargs):
    '''Expects time points to come from multiple files, using the first day
    of values at the date of each file.

    If the files are calculated with forecast data, this means all env suit
    is based on 1-day forecasts. If the data are based on analysis, this
    keeps an even chunking. One file covers all time points in a forecast.

    is_met (default False) is False for spore deposition and True for met
    variables extracted on a consistent grid (including _M_ met variables,
    for which NAME has already performed some backfilling to provide time
    points that are available for _I_ data but missing for _M_ data). For
    non-met variables, the first available time point is assumed to be at
    03:00, representing 00:01-03:00, since they are typically averages over
    the period since the timestamp of the previous datapoint. Met variables
    are generally instantaneous, so the first available datapoint is at
    00:00. The start time is set by the startTime parameter (i.e. the start
    hour should be 3 for non-met data and 0 for met data). For the last day
    in the range, met data may also exist for 00:00+1day; is_met determines
    whether to look for that final instantaneous time point.

    Returns a list of files and the timespan between startTime and endTime
    that each file should be considered for.

    Based on a copy from
    /home/jws52/projects/SouthAsia/blast-fernandes-adaptation/code/prep_met.py
    Copied from ews-epidemiology.EpiPrep.lister.py'''
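    # Example of the timing convention (illustrative dates): for a 3-hourly
    # averaged non-met variable on 2023-06-07, startTime would be
    # 2023-06-07 03:00; for an instantaneous met variable it would be
    # 2023-06-07 00:00, and with is_met=True the final timespan is extended
    # by 3 hours to catch the 00:00+1day point.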
# TODO: Update EpiPrep.lister with this version
config_copy = config.copy()
desiredTimes = datetime_range(startTime,endTime+datetime.timedelta(seconds=1),days=1)
files = []
timespans = []
lastfile = ''
lastaltfile = ''
    for desiredTime in desiredTimes:

        initialTime = desiredTime  # since startTime is 03:00; otherwise use + datetime.timedelta(seconds=1)
        nextDayTime = desiredTime + datetime.timedelta(hours=23, minutes=59)
        timespan = [initialTime, nextDayTime]
config_copy['DateString'] = desiredTime.strftime('%Y%m%d')
# determine configuration's file path. Note the use of variables from
# first level of config
FilePath = parse_template_string(
config[component]['PathTemplate'],
config_copy,
f"{component} PathTemplate")
        AltFilePath = None
        if 'PathTemplateAlternative' in config[component]:
            AltFilePath = parse_template_string(
                config[component]['PathTemplateAlternative'],
                config_copy,
                f"{component} PathTemplateAlternative")
FileName = parse_template_string(
config[component]['FileNameTemplate'],
config_copy,
f"{component} FileNameTemplate")
file = f"{FilePath}{FileName}"
altfile = f"{AltFilePath}{FileName}"
logging.debug(f"Trying file {file}")
if AltFilePath is not None:
logging.debug(f"Otherwise trying {altfile}")
file_has_times = False
if os.path.isfile(file):
file_has_times = has_times(file,timespan)
altfile_has_times = False
if os.path.isfile(altfile):
altfile_has_times = has_times(altfile,timespan)
lastfile_has_times = False
if lastfile is not None:
if os.path.isfile(lastfile):
lastfile_has_times = has_times(lastfile,timespan)
lastaltfile_has_times = False
if lastaltfile is not None:
if os.path.isfile(lastaltfile):
lastaltfile_has_times = has_times(lastaltfile,timespan)
        # 1st preference: use FileTemplate (e.g. analysis)
        # (the *_has_times flags already imply the file exists)
        if file_has_times:
            files += [file]
            timespans += [timespan]
            lastfile = file
        # 2nd preference: continuation of first FileTemplate (e.g. analysis)
        elif lastfile_has_times:
            logging.warning(f"No {component} path for day of {desiredTime}, extending timespan of previous file")
            timespan = [lastInitialTime, nextDayTime]
            timespans[-1] = timespan
        # 3rd preference: use alternative FileTemplate (e.g. latest forecast)
        elif altfile_has_times:
            files += [altfile]
            timespans += [timespan]
            lastaltfile = altfile
        # 4th preference: continuation of alternative FileTemplate (e.g. forecast)
        elif lastaltfile_has_times:
            logging.warning(f"No {component} path for day of {desiredTime}, extending timespan of previous file")
            timespan = [lastInitialTime, nextDayTime]
            timespans[-1] = timespan
        else:
            logging.error(f"No {component} path for day of {desiredTime}")
            raise Exception(f"No {component} path for day of {desiredTime}")
lastInitialTime = timespan[0]
# Extend reach of final day, just in case the file exists.
# Useful for the case of instantaneous met variables, where it does exist
if is_met is True:
timespans[-1][1] += datetime.timedelta(hours=3)
    logging.info('Each timespan and file is:')
    for timespan, filen in zip(timespans, files):
        logging.info(f"{timespan}: {filen}")
return files, timespans
if __name__ == '__main__':
processor = ProcessorMetResample()
processor.run_processor("MetResample")
#!/bin/bash
# get path of this script (to point to files within the same git repo)
proc_path="$(dirname "$(readlink -f "$0")")"
source "${proc_path}"/run_utils.sh
setup_conda_env
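# run the processor, forwarding all command-line arguments to it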
python "${proc_path}"/../coordinator/extra/ProcessorMetResample.py "$@"
exit_code=$?
teardown_conda_env
exit $exit_code