diff --git a/coordinator/processor_met_resample.py b/coordinator/processor_met_resample.py
deleted file mode 100644
index 213e3c6ff80f34d0a85e399ca85419b1d55fa695..0000000000000000000000000000000000000000
--- a/coordinator/processor_met_resample.py
+++ /dev/null
@@ -1,484 +0,0 @@
-#ProcessorMetResample.py
-"""Functions to process NAME met data for the wheat blast model of Mauricio
-Fernandes, Felipe de Vargas, and team. This works a lot like the met extractor,
-but gathers met data across many input folders, and performs time resampling.
-
-CAUTION: This is considered a temporary 'product', so it will run in the
-operational system but will not be fully documented or tested. Expect it to be
-removed or superseded.
-"""
-
-# import copy
-# import datetime
-# import logging
-# import os
-# from zipfile import ZipFile, ZIP_DEFLATED
-#
-# from iris import load_cube
-# from pandas import DataFrame, infer_freq, read_csv, to_datetime
-#
-# from EpiModel.EpiPrep import lister, loader, prep
-# from EpiModel.EpiUtils import (
-#     datetime_range,
-#     parse_template_string)
-# from coordinator.processor_base import ProcessorBase
-# from coordinator.utils.processor_utils import calc_epi_date_range, short_name, open_and_check_config
-#
-#
-# logger = logging.getLogger(__name__)
-#
-# class ProcessorMetResample(ProcessorBase):
-#
-#     def process_pre_job(self, args) -> bool:
-#         logger.debug('Performing process_pre_job()')
-#
-#         # If it will work on a single forecast, get a dedicated download
-#         #return process_pre_job_server_download(args)
-#         # return query_past_successes(args, 'Environment')
-#         return True
-#
-#     def process_in_job(self, jobPath, status, configjson, component) -> object:
-#         return self.process_in_job_met_resample(jobPath, status, configjson, component)
-#
-#
-#     def process_post_job(self, jobPath, configjson):
-#         pass
-#
-#     def __init__(self) -> None:
-#         super().__init__()
-#
-#     def gather_data(
-#             self,
-#             config,
-#             start_date,
-#             end_date,
-#             component='MetResample',
-#             lister_kwargs={}):
-#         """Based on a copy from
-#         /home/jws52/projects/SouthAsia/blast-fernandes-adaptation/code/prep_met.py
-#         Originally from coordinator ProcessEpidemiology.py."""
-#
-#         # TODO: Replace calls to this function with calls to
-#         # EpiModel.EpiPrep.prep.gather_dependent_models()
-#
-#         # TODO: Simplify the set of required arguments. Check if config is necessary.
-#
-#         # Use config-defined file lister
-#         listerfunc = config[component]['FileListerFunction']
-#         if isinstance(listerfunc,str):
-#             try:
-#                 # See if the function is available in this file
-#                 file_lister = globals().get(config[component]['FileListerFunction'])
-#             except:
-#                 file_lister = getattr(
-#                     lister,
-#                     config[component]['FileListerFunction'])
-#         elif callable(listerfunc):
-#             file_lister = listerfunc
-#
-#         file_loader = getattr(
-#             loader,
-#             config[component]['FileLoaderFunction'])
-#
-#         config_for_lister = config.copy()
-#
-#         #lister_kwargs['reference_date']=config['ReferenceTime']
-#         lister_kwargs['is_met']=config[component].get('is_met',False)
-#
-#         loader_kwargs= {}
-#         loader_kwargs['VariableName']= config_for_lister[component].get('VariableName')
-#         #loader_kwargs['VariableNameAlternative']= config_for_lister['Deposition'].get('VariableNameAlternative')
-#
-#         file_of_origins = prep.prep_input(config_for_lister,start_date,end_date,
-#                 component=component,
-#                 file_lister=file_lister,
-#                 file_loader=file_loader,
-#                 lister_kwargs=lister_kwargs,
-#                 **loader_kwargs)
-#
-#         assert os.path.isfile(config[component]['FileNamePrepared'])
-#
-#         return file_of_origins
-#
-#     def load_prepared_file(self,fn: str) -> DataFrame:
-#         """Based on a copy from
-#         /home/jws52/projects/SouthAsia/blast-fernandes-adaptation/code/prep_met.py
-#         """
-#
-#         df = read_csv(
-#             fn,
-#             header=[0,1],
-#             index_col=0)
-#
-#         df.index = to_datetime(df.index,format='%Y%m%d%H%M')
-#
-#         return df
-#
-#
-#     def resample_data(self,config,component,resample_scale='1H'):
-#         """Increases time resolution of data files by filling in missing time
-#         points with the next available time point, i.e. it assumes each time
-#         point is an average over the period since the last time point, as is
-#         the case for NAME output.
-#
-#         Based on a copy from
-#         /home/jws52/projects/SouthAsia/blast-fernandes-adaptation/code/prep_met.py
-#         """
-#
-#
-#         # load dataframe from config
-#         file_name_prepared = config[component]['FileNamePrepared']
-#
-#         df_original = self.load_prepared_file(file_name_prepared)
-#
-#         time_resolution = df_original.index[1] - df_original.index[0]
-#
-#         input_freq = infer_freq(df_original.index)
-#
-#         print(f"Resampling {component} from {input_freq} to {resample_scale}")
-#
-#         # TODO: Work out how to get lister to extend the time range of first or last
-#         # day for the instantaneous met data.
-#
-#         # TODO: Check whether to reapply this for precipitation, the only field
-#         # where the first time point may be missing.
-#         # In order for backfill to work for the earliest time point, we need to
-#         # temporarily provide a prior timepoint
-#         #dfm1 = df_original.iloc[[0],:]
-#         #dfm1.index = dfm1.index - time_resolution
-#         #dfm1[:] = nan
-#         #df_original = concat([dfm1,df_original])
-#
-#         resampler = df_original.resample(resample_scale)
-#
-#         resample_method = config[component].get('resampling','backfill')
-#         if resample_method == 'backfill':
-#             print('Upsampling by backfilling')
-#             df_out = resampler.bfill()
-#         elif resample_method == 'interpolate':
-#             print('Upsampling by interpolating linearly')
-#             df_out = resampler.interpolate(method='linear')
-#         else:
-#             raise Exception('Unknown resampling method in config.')
-#
-#         # remove the temporary prior timepoint
-#         #df_out = df_out.drop(df_out.index[0])
-#
-#         # save the result
-#         save_path = file_name_prepared.replace('.csv',f"_{resample_scale}.csv")
-#         df_out.to_csv(save_path)
-#
-#         return df_out, save_path
-#
-#     def gather_and_resample(
-#             self,
-#             config_met,
-#             reference_date_str,
-#             calculation_span_days):
-#         """Based on a copy from
-#         /home/jws52/projects/SouthAsia/blast-fernandes-adaptation/code/prep_met.py"""
-#
-#         components = config_met['components']
-#
-#         start_time, end_time = calc_epi_date_range(
-#             reference_date_str,
-#             calculation_span_days)
-#
-#         paths = []
-#         origins = []
-#
-#         for component in components:
-#
-#             logger.info(f"Working on {component}")
-#
-#             # parse the output file path
-#             fnp = parse_template_string(
-#                 config_met[component]['FileNamePrepared'],
-#                 config_met,
-#                 f"{component} FileNamePrepared")
-#             config_met[component]['FileNamePrepared'] = fnp
-#
-#             logger.debug('Making the output directory')
-#             output_dir = os.path.dirname(config_met[component]['FileNamePrepared'])
-#             os.makedirs(output_dir,exist_ok=True)
-#
-#             logger.debug('Gathering data')
-#
-#             file_of_origins = self.gather_data(
-#                 copy.deepcopy(config_met),
-#                 start_time,#datetime.datetime(2023,6,7,3)
-#                 end_time,#datetime.datetime(2023,6,25,0),
-#                 component=component,
-#             )
-#
-#             origins += [file_of_origins]
-#
-#             logger.debug('Performing resampling')
-#
-#             _, path = self.resample_data(copy.deepcopy(config_met), component)
-#
-#             paths += [path]
-#
-#         logger.info('Finished gather_and_resample()')
-#
-#         return origins, paths
-#
-#
-#     def process_in_job_met_resample(self, jobPath, status, config, component):
-#         """
-#         """
-#
-#         logger.info('started process_in_job_met_resample()')
-#
-#         # Initialise output variable
-#         proc_out = {}
-#         # Output files available for upload
-#         proc_out['output'] = []
-#         # Processing files available for clearing
-#         proc_out['clearup'] = []
-#
-#         # Obtain a dedicated copy of the latest forecast data
-#         #data_result = get_data_from_server(jobPath,config,component)
-#         #
-#         #if data_result['output'] is not None:
-#         #    proc_out['output'] = data_result['output']
-#         #
-#         #proc_out['clearup'] += data_result['clearup']
-#
-#         config_i = config[component].copy()
-#
-#         config_i['jobPath'] = jobPath
-#
-#         # provide top-level arguments to component's config
-#         for k,v in config.items():
-#             if k not in short_name.keys():
-#                 config_i[k]=v
-#
-#         # Get run config
-#
-#         # path to some analysis met reruns provided by Will. These mirror the usual weekly EWS analysis jobs,
-#         # but on the extended EastAfrica grid that includes Zambia.
-#         ANJOBDIR2 = '${WorkspacePath2}/WR_EnvSuit_Met_Ethiopia_${DateString}/'
-#
-#         # set up the required inputs
-#         #"Host" : {
-#         #    #"Bounds" : [38.4,7.8,40.2,8.8] # Roughly box around Arsi/East Shewa in Ethiopia
-#         #    #"Bounds" : [38.8,7.1,39.1,7.3] # Tiny box of 4 gridpoints in West Arsi in Ethiopia
-#         #    "Bounds" : [28.0,-15.0,29.0,-14.0] # 1deg x 1deg box in centre of Zambia wheat producing areas, north of Lusaka
-#         #},
-#
-#         config_met_path = config[component].get('RunConfig')
-#         config_met = open_and_check_config(config_met_path)
-#         config_met['jobPath'] = jobPath
-#
-#         # provide top-level arguments to component's config
-#         for k,v in config.items():
-#             if k not in short_name.keys():
-#                 config_met[k]=v
-#
-#         logger.info('Calling gather_and_resample()')
-#
-#         origins, paths_out = self.gather_and_resample(
-#             config_met,
-#             reference_date_str = config['StartString'],
-#             calculation_span_days = config[component]['CalculationSpanDays']
-#         )
-#
-#         # zip the files that will be provided to collaborators
-#         files_to_zip = origins + paths_out
-#
-#         zip_path = f"{jobPath}/data_met_prep_{config['StartString']}.zip"
-#
-#         with ZipFile(zip_path, 'w') as zipf:
-#
-#             for file_to_zip in files_to_zip:
-#
-#                 filename_in_archive = os.path.basename(file_to_zip)
-#
-#                 zipf.write(
-#                     file_to_zip,
-#                     arcname=filename_in_archive,
-#                     compress_type=ZIP_DEFLATED)
-#
-#         # 1H met files to upload
-#         proc_out['output'] += [zip_path]
-#         proc_out['clearup'] += files_to_zip
-#
-#         return proc_out
-#
-# def has_times(cube_filename,desired_times,constraint=None):
-#     """Checks the times listed in desired_times are within the date range
-#     available in the single cube loaded from cube_filename.
-#     Based on a copy from
-#     /home/jws52/projects/SouthAsia/blast-fernandes-adaptation/code/prep_met.py"""
-#
-#     cube = load_cube(cube_filename,constraint=constraint)
-#
-#     tc = cube.coord('time')
-#     tc_dates = tc.units.num2date(tc.points)
-#
-#     try:
-#         for dt in desired_times:
-#             # Check it is within range
-#             assert dt >= tc_dates[0], f"desired_time {dt} is out of range of cube from {cube_filename}"
-#             assert dt <= tc_dates[-1], f"desired_time {dt} is out of range of cube from {cube_filename}"
-#
-#             # Check exact time is available
-#             #assert sum([dt == tcdi for tcdi in tc_dates])==1
-#         return True
-#
-#     except AssertionError:
-#         return False
-#
-# def list_files_historical_daily(
-#         config,
-#         startTime,
-#         endTime,
-#         component,
-#         is_met=False,
-#         **kwargs):
-#     '''Expects time points to come from multiple files, using the first day
-#     of values at the date of each file.
-#     If the files are calculated with forecast data, this means all env suit
-#     is based on 1-day forecasts.
-#     If the data are based on analysis, this keeps an even chunking.
-#     One file for all time points in forecast.
-#
-#     is_met (default False) is False for spore deposition, and True for met
-#     variables extracted on a consistent grid (including _M_ met variables which
-#     NAME has already performed some backfilling to provide timepoints available
-#     for _I_ data but missing for _M_ data). For non-met variables, the first
-#     available time point is assumed to be at 03:00 representing 00:01-03:00
-#     since they are typically averages of the time period since the timestamp of
-#     the last datapoint. Met variables are generally instantaneous so the first
-#     available datapoint is at 00:00. The start time is set by the startTime
-#     parameter (i.e. start hour should be 3 for non-met data, and 0 for met
-#     data).
-#     For the last day in the range, met data may also exist for
-#     00:00+1day. This parameter determines whether to look for that last
-#     instantaneous time point.
-#
-#     Returns list of files and timespan between startTime and endTime that each
-#     file should be considered for.
-#
-#     Based on a copy from
-#     /home/jws52/projects/SouthAsia/blast-fernandes-adaptation/code/prep_met.py
-#     Copied from ews-epidemiology.EpiPrep.lister.py'''
-#
-#     # TODO: Update EpiPrep.lister with this version
-#
-#     config_copy = config.copy()
-#
-#     desiredTimes = datetime_range(startTime,endTime+datetime.timedelta(seconds=1),days=1)
-#
-#     files = []
-#     timespans = []
-#     lastfile = ''
-#     lastaltfile = ''
-#     for desiredTime in desiredTimes:
-#
-#         initialTime = desiredTime # since startTime is 0300, otherwise use + datetime.timedelta(seconds=1)
-#         nextDayTime = desiredTime + datetime.timedelta(hours=23,minutes=59)
-#
-#         timespan = [initialTime,nextDayTime]
-#
-#         config_copy['DateString'] = desiredTime.strftime('%Y%m%d')
-#
-#         # determine configuration's file path. Note the use of variables from
-#         # first level of config
-#         FilePath = parse_template_string(
-#             config[component]['PathTemplate'],
-#             config_copy,
-#             f"{component} PathTemplate")
-#
-#         AltFilePath = None
-#         if 'PathTemplateAlternative' in config[component]:
-#             AltFilePath = parse_template_string(
-#                 config[component]['PathTemplateAlternative'],
-#                 config_copy,
-#                 f"{component} PathTemplate")
-#
-#         FileName = parse_template_string(
-#             config[component]['FileNameTemplate'],
-#             config_copy,
-#             f"{component} FileNameTemplate")
-#
-#         file = f"{FilePath}{FileName}"
-#         altfile = f"{AltFilePath}{FileName}"
-#
-#         logging.debug(f"Trying file {file}")
-#         if AltFilePath is not None:
-#             logging.debug(f"Otherwise trying {altfile}")
-#
-#         file_has_times = False
-#         if os.path.isfile(file):
-#             file_has_times = has_times(file,timespan)
-#
-#         altfile_has_times = False
-#         if os.path.isfile(altfile):
-#             altfile_has_times = has_times(altfile,timespan)
-#
-#         lastfile_has_times = False
-#         if lastfile is not None:
-#             if os.path.isfile(lastfile):
-#                 lastfile_has_times = has_times(lastfile,timespan)
-#
-#         lastaltfile_has_times = False
-#         if lastaltfile is not None:
-#             if os.path.isfile(lastaltfile):
-#                 lastaltfile_has_times = has_times(lastaltfile,timespan)
-#
-#
-#         # 1st preference: use FileTemplate (e.g. analysis)
-#         if os.path.isfile(file) & file_has_times:
-#
-#             files += [file]
-#
-#             timespans += [timespan]
-#
-#             lastfile = file
-#
-#         # 2nd preference: continuation of first FileTemplate (e.g. analysis)
-#         elif os.path.isfile(lastfile) & lastfile_has_times:
-#             logging.warning(f"No {component} path for day of {desiredTime}, extending timespan of previous file")
-#
-#             timespan = [lastInitialTime,nextDayTime]
-#
-#             timespans[-1] = timespan
-#
-#         # 3rd preference: use alternative FileTemplate (e.g. latest forecast)
-#         elif os.path.isfile(altfile) & altfile_has_times:
-#
-#             files += [altfile]
-#
-#             timespans += [timespan]
-#
-#             lastaltfile = altfile
-#
-#         # 4th preference: continuation of alternative FileTemplate (e.g. forecast)
-#         elif os.path.isfile(lastaltfile) & lastaltfile_has_times:
-#
-#             logging.warning(f"No {component} path for day of {desiredTime}, extending timespan of previous file")
-#
-#             timespan = [lastInitialTime,nextDayTime]
-#
-#             timespans[-1] = timespan
-#
-#         else:
-#             logging.error(f"No {component} path for day of {desiredTime}")
-#             raise Exception
-#
-#         lastInitialTime = timespan[0]
-#
-#     # Extend reach of final day, just in case the file exists.
-#     # Useful for the case of instantaneous met variables, where it does exist
-#     if is_met is True:
-#         timespans[-1][1] += datetime.timedelta(hours=3)
-#
-#     logging.info('Each timespan and file is:')
-#     for timespan, filen in zip(timespans,files): logging.info(f"{timespan}: {filen}")
-#
-#     return files, timespans
-#
-# if __name__ == '__main__':
-#     processor = ProcessorMetResample()
-#     processor.run_processor("MetResample")
\ No newline at end of file
diff --git a/scripts/run_MetResample_Processor.sh b/scripts/run_MetResample_Processor.sh
deleted file mode 100755
index 4869136e024fb1a887b1071ddaa9b34d89d11da5..0000000000000000000000000000000000000000
--- a/scripts/run_MetResample_Processor.sh
+++ /dev/null
@@ -1,15 +0,0 @@
-##!/bin/bash
-#
-## get path of this script (to point to files within the same git repo)
-#proc_path="$(dirname "$(readlink -f "$0")")"
-#
-#source "${proc_path}"/run_utils.sh
-#
-#setup_virtual_env
-#
-#python "${proc_path}"/../coordinator/extra/ProcessorMetResample.py "$@"
-#exit_code=$?
-#
-#teardown_virtual_env
-#
-#exit $exit_code
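
A few reference notes on the behaviour this diff removes. First, the prepared-file loading and upsampling in load_prepared_file() and resample_data() reduce to a few lines of pandas. This is a minimal sketch, not the removed implementation: the CSV content, column names, and variable names below are made up, using the same layout the module expected (two header rows and a YYYYMMDDHHMM time index).

    from io import StringIO

    from pandas import read_csv, to_datetime

    # Made-up 3-hourly input in the prepared-file layout: two header rows
    # (variable over location) and a YYYYMMDDHHMM time index.
    csv = StringIO(
        "variable,precip\n"
        "location,cell_1\n"
        "202306070300,0.2\n"
        "202306070600,0.5\n"
        "202306070900,0.1\n"
    )
    df = read_csv(csv, header=[0, 1], index_col=0)
    df.index = to_datetime(df.index, format='%Y%m%d%H%M')

    # Upsample to hourly by backfilling. Each NAME value is an average over
    # the period since the previous time point, so the 04:00 and 05:00 slots
    # take the 06:00 value; the 'interpolate' config option mapped to
    # resampler.interpolate(method='linear') instead.
    hourly = df.resample('1H').bfill()
    print(hourly)

Recent pandas versions prefer the lowercase '1h' alias; '1H' matches the removed code.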
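
Second, the time-range check in has_times() is self-contained and may be worth keeping: it loads a single iris cube and confirms every desired time lies within the cube's time coordinate. A condensed version of the deleted function, with the same behaviour minus the assertion messages:

    from iris import load_cube

    def has_times(cube_filename, desired_times, constraint=None):
        """True if every desired time is within the cube's date range."""
        cube = load_cube(cube_filename, constraint=constraint)
        tc = cube.coord('time')
        # Convert the time coordinate's points to datetime-like objects.
        tc_dates = tc.units.num2date(tc.points)
        return all(tc_dates[0] <= dt <= tc_dates[-1] for dt in desired_times)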
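
Last, the daily chunking in list_files_historical_daily() pairs each file with the [00:00, 23:59] span of its own day, then stretches the final span when is_met is true, because instantaneous met variables may carry one extra point at 00:00 of the following day. A sketch of just that timespan arithmetic, with hypothetical start and end dates:

    import datetime

    start = datetime.datetime(2023, 6, 7, 0)  # met data starts at 00:00
    end = datetime.datetime(2023, 6, 9, 0)
    is_met = True

    # One timespan per day: [00:00, 23:59] of that day.
    days = [start + datetime.timedelta(days=i)
            for i in range((end - start).days + 1)]
    timespans = [[d, d + datetime.timedelta(hours=23, minutes=59)]
                 for d in days]

    # Instantaneous met variables may provide one more point at 00:00+1day,
    # so extend the final day's span far enough to catch it.
    if is_met:
        timespans[-1][1] += datetime.timedelta(hours=3)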