FAQ | This is a LIVE service | Changelog

Skip to content
Snippets Groups Projects
Commit 47c254fb authored by L. Bower's avatar L. Bower
Browse files

removing met-resample code as now in it's own repo...

removing met-resample code as now in it's own repo https://gitlab.developers.cam.ac.uk/gilligan-epid/wheat-rusts/met-resampling
parent abdd9aea
No related branches found
No related tags found
No related merge requests found
"""Functions to process NAME met data for the wheat blast model of Mauricio
Fernandes, Felipe de Vargas, and team. This works a lot like the met extractor,
but gathers met data across many input folders, and performs time resampling.
CAUTION: This is considered a temporary 'product', so it will run in the
operational system but will not be fully documented or tested. Expect it to be
removed or superceded.
# import copy
# import datetime
# import logging
# import os
# from zipfile import ZipFile, ZIP_DEFLATED
# from iris import load_cube
# from pandas import DataFrame, infer_freq, read_csv, to_datetime
# from EpiModel.EpiPrep import lister, loader, prep
# from EpiModel.EpiUtils import (
# datetime_range,
# parse_template_string)
# from coordinator.processor_base import ProcessorBase
# from coordinator.utils.processor_utils import calc_epi_date_range, short_name, open_and_check_config
# logger = logging.getLogger(__name__)
# class ProcessorMetResample(ProcessorBase):
# def process_pre_job(self, args) -> bool:
# logger.debug('Performing process_pre_job()')
# # If it will work on a single forecast, get a dedicated download
# #return process_pre_job_server_download(args)
# # return query_past_successes(args, 'Environment')
# return True
# def process_in_job(self, jobPath, status, configjson, component) -> object:
# return self.process_in_job_met_resample(jobPath, status, configjson, component)
# def process_post_job(self, jobPath, configjson):
# pass
# def __init__(self) -> None:
# super().__init__()
# def gather_data(
# self,
# config,
# start_date,
# end_date,
# component='MetResample',
# lister_kwargs={}):
# """Based on a copy from
# /home/jws52/projects/SouthAsia/blast-fernandes-adaptation/code/prep_met.py
# Originally from coordinator ProcessEpidemiology.py."""
# # TODO: Replace calls to this function with calls to
# # EpiModel.EpiPrep.prep.gather_dependent_models()
# # TODO: Simplify the set of required arguments . Check if config is necessary.
# # Use config-defined file lister
# listerfunc = config[component]['FileListerFunction']
# if isinstance(listerfunc,str):
# try:
# # See if the function is available in this file
# file_lister = globals().get(config[component]['FileListerFunction'])
# except:
# file_lister = getattr(
# lister,
# config[component]['FileListerFunction'])
# elif callable(listerfunc):
# file_lister = listerfunc
# file_loader = getattr(
# loader,
# config[component]['FileLoaderFunction'])
# config_for_lister = config.copy()
# #lister_kwargs['reference_date']=config['ReferenceTime']
# lister_kwargs['is_met']=config[component].get('is_met',False)
# loader_kwargs= {}
# loader_kwargs['VariableName']= config_for_lister[component].get('VariableName')
# #loader_kwargs['VariableNameAlternative']= config_for_lister['Deposition'].get('VariableNameAlternative')
# file_of_origins = prep.prep_input(config_for_lister,start_date,end_date,
# component=component,
# file_lister=file_lister,
# file_loader=file_loader,
# lister_kwargs=lister_kwargs,
# **loader_kwargs)
# assert os.path.isfile(config[component]['FileNamePrepared'])
# return file_of_origins
# def load_prepared_file(self,fn: str) -> DataFrame:
# """Based on a copy from
# /home/jws52/projects/SouthAsia/blast-fernandes-adaptation/code/prep_met.py
# """
# df = read_csv(
# fn,
# header=[0,1],
# index_col=0)
# df.index = to_datetime(df.index,format='%Y%m%d%H%M')
# return df
# def resample_data(self,config,component,resample_scale='1H'):
# """Increases time resolution of data files by filling in missing time
# points with next available time point. i.e. assumes each time point is an
# average over the period since the last time point, as is the case for NAME
# output.
# Based on a copy from
# /home/jws52/projects/SouthAsia/blast-fernandes-adaptation/code/prep_met.py
# """
# # load dataframe from config
# file_name_prepared = config[component]['FileNamePrepared']
# df_original = self.load_prepared_file(file_name_prepared)
# time_resolution = df_original.index[1] - df_original.index[0]
# input_freq = infer_freq(df_original.index)
# print(f"Resampling {component} from {input_freq} to {resample_scale}")
# # TODO: Work out how to get lister to extend the time range of first or last
# # day for the instantaneous met data.
# # TODO: Check whether to reapply this for precipitation, the only field
# # where the first time point may be missing.
# # In order for backfill to work for the earliest time point, we need to
# # temporarily provide a prior timepoint
# #dfm1 = df_original.iloc[[0],:]
# #dfm1.index = dfm1.index - time_resolution
# #dfm1[:] = nan
# #df_original = concat([dfm1,df_original])
# resampler = df_original.resample(resample_scale)
# resample_method = config[component].get('resampling','backfill')
# if resample_method == 'backfill':
# print('Upsampling by backfilling')
# df_out = resampler.bfill()
# elif resample_method == 'interpolate':
# print('Upsampling by interpolating linearly')
# df_out = resampler.interpolate(method='linear')
# else:
# raise Exception('Unknown resampling method in config.')
# # remove the temporary prior timepoint
# #df_out = df_out.drop(df_out.index[0])
# # save the result
# save_path = file_name_prepared.replace('.csv',f"_{resample_scale}.csv")
# df_out.to_csv(save_path)
# return df_out, save_path
# def gather_and_resample(
# self,
# config_met,
# reference_date_str,
# calculation_span_days):
# """Based on a copy from
# /home/jws52/projects/SouthAsia/blast-fernandes-adaptation/code/prep_met.py"""
# components = config_met['components']
# start_time, end_time = calc_epi_date_range(
# reference_date_str,
# calculation_span_days)
# paths = []
# origins = []
# for component in components:
# logger.info(f"Working on {component}")
# # parse the output file path
# fnp = parse_template_string(
# config_met[component]['FileNamePrepared'],
# config_met,
# f"{component} FileNamePrepared")
# config_met[component]['FileNamePrepared'] = fnp
# logger.debug('Making the output directory')
# output_dir = os.path.dirname(config_met[component]['FileNamePrepared'])
# os.makedirs(output_dir,exist_ok=True)
# logger.debug('Gathering data')
# file_of_origins = self.gather_data(
# copy.deepcopy(config_met),
# start_time,#datetime.datetime(2023,6,7,3)
# end_time,#datetime.datetime(2023,6,25,0),
# component=component,
# )
# origins += [file_of_origins]
# logger.debug('Performing resampling')
# _, path = self.resample_data(copy.deepcopy(config_met), component)
# paths += [path]
# logger.info('Finished gather_and_resample()')
# return origins, paths
# def process_in_job_met_resample(self, jobPath, status, config, component):
# """
# """
# logger.info('started process_in_job_met_resample()')
# # Initialise output variable
# proc_out = {}
# # Output files available for upload
# proc_out['output'] = []
# # Processing files available for clearing
# proc_out['clearup'] = []
# # Obtain a dedicated copy of the latest forecast data
# #data_result = get_data_from_server(jobPath,config,component)
# #
# #if data_result['output'] is not None:
# # proc_out['output'] = data_result['output']
# #
# #proc_out['clearup'] += data_result['clearup']
# config_i = config[component].copy()
# config_i['jobPath'] = jobPath
# # provide top-level arguments to component's config
# for k,v in config.items():
# if k not in short_name.keys():
# config_i[k]=v
# # Get run config
# # path to some analysis met reruns provided by Will. These mirror the usual weekly EWS analysis jobs,
# # but on the extended EastAfrica grid that includes Zambia.
# ANJOBDIR2 = '${WorkspacePath2}/WR_EnvSuit_Met_Ethiopia_${DateString}/'
# # set up the required inputs
# #"Host" : {
# # #"Bounds" : [38.4,7.8,40.2,8.8] # Roughly box around Arsi/East Shewa in Ethiopia
# # #"Bounds" : [38.8,7.1,39.1,7.3] # Tiny box of 4 gridpoints in West Arsi in Ethiopia
# # "Bounds" : [28.0,-15.0,29.0,-14.0] # 1deg x 1deg box in centre of Zambia wheat producing areas, north of Lusaka
# #},
# config_met_path = config[component].get('RunConfig')
# config_met = open_and_check_config(config_met_path)
# config_met['jobPath'] = jobPath
# # provide top-level arguments to component's config
# for k,v in config.items():
# if k not in short_name.keys():
# config_met[k]=v
# logger.info('Calling gather_and_resample()')
# origins, paths_out = self.gather_and_resample(
# config_met,
# reference_date_str = config['StartString'],
# calculation_span_days = config[component]['CalculationSpanDays']
# )
# # zip the files that will be provided to collaborators
# files_to_zip = origins + paths_out
# zip_path = f"{jobPath}/data_met_prep_{config['StartString']}.zip"
# with ZipFile(zip_path, 'w') as zipf:
# for file_to_zip in files_to_zip:
# filename_in_archive = os.path.basename(file_to_zip)
# zipf.write(
# file_to_zip,
# arcname=filename_in_archive,
# compress_type=ZIP_DEFLATED)
# # 1H met files to upload
# proc_out['output'] += [zip_path]
# proc_out['clearup'] += files_to_zip
# return proc_out
# def has_times(cube_filename,desired_times,constraint=None):
# """Checks the times listed in desired_times are within the date range
# available in the single cube loaded from cube_filename.
# Based on a copy from
# /home/jws52/projects/SouthAsia/blast-fernandes-adaptation/code/prep_met.py"""
# cube = load_cube(cube_filename,constraint=constraint)
# tc = cube.coord('time')
# tc_dates = tc.units.num2date(tc.points)
# try:
# for dt in desired_times:
# # Check it is within range
# assert dt >= tc_dates[0], f"desired_time {dt} is out of range of cube from {cube_filename}"
# assert dt <= tc_dates[-1], f"desired_time {dt} is out of range of cube from {cube_filename}"
# # Check exact time is available
# #assert sum([dt == tcdi for tcdi in tc_dates])==1
# return True
# except AssertionError:
# return False
# def list_files_historical_daily(
# config,
# startTime,
# endTime,
# component,
# is_met=False,
# **kwargs):
# '''Expecting time points are from multiple files, using the first day
# of values at the date of that file.
# If the files are calculated with forecast data, this means all env suit
# is based on 1-day forecasts.
# If the data are based on analysis, this keeps an even chunking.
# One file for all time points in forecast.
# is_met (default False) is False for spore deposition, and True for met
# variables extracted on a consistent grid (including _M_ met variables which
# NAME has already performed some backfilling to provide timepoints available
# for _I_ data but missing for _M_ data). For non-met variables, the first
# available time point is assumed to be at 03:00 representing 00:01-03:00
# since they are typically averages of the time period since the timestamp of
# the last datapoint. Met variables are generally instantaneous so the first
# available datapoint is at 00:00. The start time is set by the startTime
# parameter (i.e. start hour should be 3 for non-met data, and 0 for met
# data). For the last day in the range, met data may also exist for
# 00:00+1day. This parameter determines whether to look for that last
# instantaneous time point.
# Returns list of files and timespan between startTime and endTime that each
# file should be considered for.
# Based on a copy from
# /home/jws52/projects/SouthAsia/blast-fernandes-adaptation/code/prep_met.py
# Copied from ews-epidemiology.EpiPrep.lister.py'''
# # TODO: Update EpiPrep.lister with this version
# config_copy = config.copy()
# desiredTimes = datetime_range(startTime,endTime+datetime.timedelta(seconds=1),days=1)
# files = []
# timespans = []
# lastfile = ''
# lastaltfile = ''
# for desiredTime in desiredTimes:
# initialTime = desiredTime # since startTime is 0300 , otherwise use + datetime.timedelta(seconds=1)
# nextDayTime = desiredTime + datetime.timedelta(hours=23,minutes=59)
# timespan = [initialTime,nextDayTime]
# config_copy['DateString'] = desiredTime.strftime('%Y%m%d')
# # determine configuration's file path. Note the use of variables from
# # first level of config
# FilePath = parse_template_string(
# config[component]['PathTemplate'],
# config_copy,
# f"{component} PathTemplate")
# AltFilePath = None
# if 'PathTemplateAlternative' in config[component]:
# AltFilePath = parse_template_string(
# config[component]['PathTemplateAlternative'],
# config_copy,
# f"{component} PathTemplate")
# FileName = parse_template_string(
# config[component]['FileNameTemplate'],
# config_copy,
# f"{component} FileNameTemplate")
# file = f"{FilePath}{FileName}"
# altfile = f"{AltFilePath}{FileName}"
# logging.debug(f"Trying file {file}")
# if AltFilePath is not None:
# logging.debug(f"Otherwise trying {altfile}")
# file_has_times = False
# if os.path.isfile(file):
# file_has_times = has_times(file,timespan)
# altfile_has_times = False
# if os.path.isfile(altfile):
# altfile_has_times = has_times(altfile,timespan)
# lastfile_has_times = False
# if lastfile is not None:
# if os.path.isfile(lastfile):
# lastfile_has_times = has_times(lastfile,timespan)
# lastaltfile_has_times = False
# if lastaltfile is not None:
# if os.path.isfile(lastaltfile):
# lastaltfile_has_times = has_times(lastaltfile,timespan)
# # 1st preference: use FileTemplate (e.g. analysis)
# if os.path.isfile(file) & file_has_times:
# files += [file]
# timespans += [timespan]
# lastfile = file
# # 2nd preference: continuation of first FileTemplate (e.g. analysis)
# elif os.path.isfile(lastfile) & lastfile_has_times:
# logging.warning(f"No {component} path for day of {desiredTime}, extending timespan of previous file")
# timespan = [lastInitialTime,nextDayTime]
# timespans[-1] = timespan
# # 3rd preference: use alternative FileTemplate (e.g. latest forecast)
# elif os.path.isfile(altfile) & altfile_has_times:
# files += [altfile]
# timespans += [timespan]
# lastaltfile = altfile
# # 4th preference: continuation of alternative FileTemplate (e.g. forecast)
# elif os.path.isfile(lastaltfile) & lastaltfile_has_times:
# logging.warning(f"No {component} path for day of {desiredTime}, extending timespan of previous file")
# timespan = [lastInitialTime,nextDayTime]
# timespans[-1] = timespan
# else:
# logging.error(f"No {component} path for day of {desiredTime}")
# raise Exception
# lastInitialTime = timespan[0]
# # Extend reach of final day, just in case the file exists.
# # Useful for the case of instantaneous met variables, where it does exist
# if is_met is True:
# timespans[-1][1] += datetime.timedelta(hours=3)
# logging.info('Each timespan and file is:')
# for timespan, filen in zip(timespans,files): logging.info(f"{timespan}: {filen}")
# return files, timespans
# if __name__ == '__main__':
# processor = ProcessorMetResample()
# processor.run_processor("MetResample")
\ No newline at end of file
## get path of this script (to point to files within the same git repo)
#proc_path="$(dirname "$(readlink -f "$0")")"
#source "${proc_path}"/run_utils.sh
#python "${proc_path}"/../coordinator/extra/ProcessorMetResample.py "$@"
#exit $exit_code
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment