diff --git a/coordinator/processor_met_resample.py b/coordinator/processor_met_resample.py index 81e8f4fcd481902f21ba07eba88b4c90fe5657de..213e3c6ff80f34d0a85e399ca85419b1d55fa695 100644 --- a/coordinator/processor_met_resample.py +++ b/coordinator/processor_met_resample.py @@ -8,477 +8,477 @@ operational system but will not be fully documented or tested. Expect it to be removed or superceded. """ -import copy -import datetime -import logging -import os -from zipfile import ZipFile, ZIP_DEFLATED - -from iris import load_cube -from pandas import DataFrame, infer_freq, read_csv, to_datetime - -from EpiModel.EpiPrep import lister, loader, prep -from EpiModel.EpiUtils import ( - datetime_range, - parse_template_string) -from coordinator.processor_base import ProcessorBase -from coordinator.utils.processor_utils import calc_epi_date_range, short_name, open_and_check_config - - -logger = logging.getLogger(__name__) - -class ProcessorMetResample(ProcessorBase): - - def process_pre_job(self, args) -> bool: - logger.debug('Performing process_pre_job()') - - # If it will work on a single forecast, get a dedicated download - #return process_pre_job_server_download(args) - # return query_past_successes(args, 'Environment') - return True - - def process_in_job(self, jobPath, status, configjson, component) -> object: - return self.process_in_job_met_resample(jobPath, status, configjson, component) - - - def process_post_job(self, jobPath, configjson): - pass - - def __init__(self) -> None: - super().__init__() - - def gather_data( - self, - config, - start_date, - end_date, - component='MetResample', - lister_kwargs={}): - """Based on a copy from - /home/jws52/projects/SouthAsia/blast-fernandes-adaptation/code/prep_met.py - Originally from coordinator ProcessEpidemiology.py.""" - - # TODO: Replace calls to this function with calls to - # EpiModel.EpiPrep.prep.gather_dependent_models() - - # TODO: Simplify the set of required arguments . Check if config is necessary. 
- - # Use config-defined file lister - listerfunc = config[component]['FileListerFunction'] - if isinstance(listerfunc,str): - try: - # See if the function is available in this file - file_lister = globals().get(config[component]['FileListerFunction']) - except: - file_lister = getattr( - lister, - config[component]['FileListerFunction']) - elif callable(listerfunc): - file_lister = listerfunc - - file_loader = getattr( - loader, - config[component]['FileLoaderFunction']) - - config_for_lister = config.copy() - - #lister_kwargs['reference_date']=config['ReferenceTime'] - lister_kwargs['is_met']=config[component].get('is_met',False) - - loader_kwargs= {} - loader_kwargs['VariableName']= config_for_lister[component].get('VariableName') - #loader_kwargs['VariableNameAlternative']= config_for_lister['Deposition'].get('VariableNameAlternative') - - file_of_origins = prep.prep_input(config_for_lister,start_date,end_date, - component=component, - file_lister=file_lister, - file_loader=file_loader, - lister_kwargs=lister_kwargs, - **loader_kwargs) - - assert os.path.isfile(config[component]['FileNamePrepared']) - - return file_of_origins - - def load_prepared_file(self,fn: str) -> DataFrame: - """Based on a copy from - /home/jws52/projects/SouthAsia/blast-fernandes-adaptation/code/prep_met.py - """ - - df = read_csv( - fn, - header=[0,1], - index_col=0) - - df.index = to_datetime(df.index,format='%Y%m%d%H%M') - - return df - - - def resample_data(self,config,component,resample_scale='1H'): - """Increases time resolution of data files by filling in missing time - points with next available time point. i.e. assumes each time point is an - average over the period since the last time point, as is the case for NAME - output. - - Based on a copy from - /home/jws52/projects/SouthAsia/blast-fernandes-adaptation/code/prep_met.py - """ - - - # load dataframe from config - file_name_prepared = config[component]['FileNamePrepared'] - - df_original = self.load_prepared_file(file_name_prepared) - - time_resolution = df_original.index[1] - df_original.index[0] - - input_freq = infer_freq(df_original.index) - - print(f"Resampling {component} from {input_freq} to {resample_scale}") - - # TODO: Work out how to get lister to extend the time range of first or last - # day for the instantaneous met data. - - # TODO: Check whether to reapply this for precipitation, the only field - # where the first time point may be missing. 
- # In order for backfill to work for the earliest time point, we need to - # temporarily provide a prior timepoint - #dfm1 = df_original.iloc[[0],:] - #dfm1.index = dfm1.index - time_resolution - #dfm1[:] = nan - #df_original = concat([dfm1,df_original]) - - resampler = df_original.resample(resample_scale) - - resample_method = config[component].get('resampling','backfill') - if resample_method == 'backfill': - print('Upsampling by backfilling') - df_out = resampler.bfill() - elif resample_method == 'interpolate': - print('Upsampling by interpolating linearly') - df_out = resampler.interpolate(method='linear') - else: - raise Exception('Unknown resampling method in config.') - - # remove the temporary prior timepoint - #df_out = df_out.drop(df_out.index[0]) - - # save the result - save_path = file_name_prepared.replace('.csv',f"_{resample_scale}.csv") - df_out.to_csv(save_path) - - return df_out, save_path - - def gather_and_resample( - self, - config_met, - reference_date_str, - calculation_span_days): - """Based on a copy from - /home/jws52/projects/SouthAsia/blast-fernandes-adaptation/code/prep_met.py""" - - components = config_met['components'] - - start_time, end_time = calc_epi_date_range( - reference_date_str, - calculation_span_days) - - paths = [] - origins = [] - - for component in components: - - logger.info(f"Working on {component}") - - # parse the output file path - fnp = parse_template_string( - config_met[component]['FileNamePrepared'], - config_met, - f"{component} FileNamePrepared") - config_met[component]['FileNamePrepared'] = fnp - - logger.debug('Making the output directory') - output_dir = os.path.dirname(config_met[component]['FileNamePrepared']) - os.makedirs(output_dir,exist_ok=True) - - logger.debug('Gathering data') - - file_of_origins = self.gather_data( - copy.deepcopy(config_met), - start_time,#datetime.datetime(2023,6,7,3) - end_time,#datetime.datetime(2023,6,25,0), - component=component, - ) - - origins += [file_of_origins] - - logger.debug('Performing resampling') - - _, path = self.resample_data(copy.deepcopy(config_met), component) - - paths += [path] - - logger.info('Finished gather_and_resample()') - - return origins, paths - - - def process_in_job_met_resample(self, jobPath, status, config, component): - """ - """ - - logger.info('started process_in_job_met_resample()') - - # Initialise output variable - proc_out = {} - # Output files available for upload - proc_out['output'] = [] - # Processing files available for clearing - proc_out['clearup'] = [] - - # Obtain a dedicated copy of the latest forecast data - #data_result = get_data_from_server(jobPath,config,component) - # - #if data_result['output'] is not None: - # proc_out['output'] = data_result['output'] - # - #proc_out['clearup'] += data_result['clearup'] - - config_i = config[component].copy() - - config_i['jobPath'] = jobPath - - # provide top-level arguments to component's config - for k,v in config.items(): - if k not in short_name.keys(): - config_i[k]=v - - # Get run config - - # path to some analysis met reruns provided by Will. These mirror the usual weekly EWS analysis jobs, - # but on the extended EastAfrica grid that includes Zambia. 
- ANJOBDIR2 = '${WorkspacePath2}/WR_EnvSuit_Met_Ethiopia_${DateString}/' - - # set up the required inputs - #"Host" : { - # #"Bounds" : [38.4,7.8,40.2,8.8] # Roughly box around Arsi/East Shewa in Ethiopia - # #"Bounds" : [38.8,7.1,39.1,7.3] # Tiny box of 4 gridpoints in West Arsi in Ethiopia - # "Bounds" : [28.0,-15.0,29.0,-14.0] # 1deg x 1deg box in centre of Zambia wheat producing areas, north of Lusaka - #}, - - config_met_path = config[component].get('RunConfig') - config_met = open_and_check_config(config_met_path) - config_met['jobPath'] = jobPath - - # provide top-level arguments to component's config - for k,v in config.items(): - if k not in short_name.keys(): - config_met[k]=v - - logger.info('Calling gather_and_resample()') - - origins, paths_out = self.gather_and_resample( - config_met, - reference_date_str = config['StartString'], - calculation_span_days = config[component]['CalculationSpanDays'] - ) - - # zip the files that will be provided to collaborators - files_to_zip = origins + paths_out - - zip_path = f"{jobPath}/data_met_prep_{config['StartString']}.zip" - - with ZipFile(zip_path, 'w') as zipf: - - for file_to_zip in files_to_zip: - - filename_in_archive = os.path.basename(file_to_zip) - - zipf.write( - file_to_zip, - arcname=filename_in_archive, - compress_type=ZIP_DEFLATED) - - # 1H met files to upload - proc_out['output'] += [zip_path] - proc_out['clearup'] += files_to_zip - - return proc_out - -def has_times(cube_filename,desired_times,constraint=None): - """Checks the times listed in desired_times are within the date range - available in the single cube loaded from cube_filename. - Based on a copy from - /home/jws52/projects/SouthAsia/blast-fernandes-adaptation/code/prep_met.py""" - - cube = load_cube(cube_filename,constraint=constraint) - - tc = cube.coord('time') - tc_dates = tc.units.num2date(tc.points) - - try: - for dt in desired_times: - # Check it is within range - assert dt >= tc_dates[0], f"desired_time {dt} is out of range of cube from {cube_filename}" - assert dt <= tc_dates[-1], f"desired_time {dt} is out of range of cube from {cube_filename}" - - # Check exact time is available - #assert sum([dt == tcdi for tcdi in tc_dates])==1 - return True - - except AssertionError: - return False - -def list_files_historical_daily( - config, - startTime, - endTime, - component, - is_met=False, - **kwargs): - '''Expecting time points are from multiple files, using the first day - of values at the date of that file. - If the files are calculated with forecast data, this means all env suit - is based on 1-day forecasts. - If the data are based on analysis, this keeps an even chunking. - One file for all time points in forecast. - - is_met (default False) is False for spore deposition, and True for met - variables extracted on a consistent grid (including _M_ met variables which - NAME has already performed some backfilling to provide timepoints available - for _I_ data but missing for _M_ data). For non-met variables, the first - available time point is assumed to be at 03:00 representing 00:01-03:00 - since they are typically averages of the time period since the timestamp of - the last datapoint. Met variables are generally instantaneous so the first - available datapoint is at 00:00. The start time is set by the startTime - parameter (i.e. start hour should be 3 for non-met data, and 0 for met - data). For the last day in the range, met data may also exist for - 00:00+1day. This parameter determines whether to look for that last - instantaneous time point. 
- - Returns list of files and timespan between startTime and endTime that each - file should be considered for. - - Based on a copy from - /home/jws52/projects/SouthAsia/blast-fernandes-adaptation/code/prep_met.py - Copied from ews-epidemiology.EpiPrep.lister.py''' - - # TODO: Update EpiPrep.lister with this version - - config_copy = config.copy() - - desiredTimes = datetime_range(startTime,endTime+datetime.timedelta(seconds=1),days=1) - - files = [] - timespans = [] - lastfile = '' - lastaltfile = '' - for desiredTime in desiredTimes: - - initialTime = desiredTime # since startTime is 0300 , otherwise use + datetime.timedelta(seconds=1) - nextDayTime = desiredTime + datetime.timedelta(hours=23,minutes=59) - - timespan = [initialTime,nextDayTime] - - config_copy['DateString'] = desiredTime.strftime('%Y%m%d') - - # determine configuration's file path. Note the use of variables from - # first level of config - FilePath = parse_template_string( - config[component]['PathTemplate'], - config_copy, - f"{component} PathTemplate") - - AltFilePath = None - if 'PathTemplateAlternative' in config[component]: - AltFilePath = parse_template_string( - config[component]['PathTemplateAlternative'], - config_copy, - f"{component} PathTemplate") - - FileName = parse_template_string( - config[component]['FileNameTemplate'], - config_copy, - f"{component} FileNameTemplate") - - file = f"{FilePath}{FileName}" - altfile = f"{AltFilePath}{FileName}" - - logging.debug(f"Trying file {file}") - if AltFilePath is not None: - logging.debug(f"Otherwise trying {altfile}") - - file_has_times = False - if os.path.isfile(file): - file_has_times = has_times(file,timespan) - - altfile_has_times = False - if os.path.isfile(altfile): - altfile_has_times = has_times(altfile,timespan) - - lastfile_has_times = False - if lastfile is not None: - if os.path.isfile(lastfile): - lastfile_has_times = has_times(lastfile,timespan) - - lastaltfile_has_times = False - if lastaltfile is not None: - if os.path.isfile(lastaltfile): - lastaltfile_has_times = has_times(lastaltfile,timespan) - - - # 1st preference: use FileTemplate (e.g. analysis) - if os.path.isfile(file) & file_has_times: - - files += [file] - - timespans += [timespan] - - lastfile = file - - # 2nd preference: continuation of first FileTemplate (e.g. analysis) - elif os.path.isfile(lastfile) & lastfile_has_times: - logging.warning(f"No {component} path for day of {desiredTime}, extending timespan of previous file") - - timespan = [lastInitialTime,nextDayTime] - - timespans[-1] = timespan - - # 3rd preference: use alternative FileTemplate (e.g. latest forecast) - elif os.path.isfile(altfile) & altfile_has_times: - - files += [altfile] - - timespans += [timespan] - - lastaltfile = altfile - - # 4th preference: continuation of alternative FileTemplate (e.g. forecast) - elif os.path.isfile(lastaltfile) & lastaltfile_has_times: - - logging.warning(f"No {component} path for day of {desiredTime}, extending timespan of previous file") - - timespan = [lastInitialTime,nextDayTime] - - timespans[-1] = timespan - - else: - logging.error(f"No {component} path for day of {desiredTime}") - raise Exception - - lastInitialTime = timespan[0] - - # Extend reach of final day, just in case the file exists. 
- # Useful for the case of instantaneous met variables, where it does exist - if is_met is True: - timespans[-1][1] += datetime.timedelta(hours=3) - - logging.info('Each timespan and file is:') - for timespan, filen in zip(timespans,files): logging.info(f"{timespan}: {filen}") - - return files, timespans - -if __name__ == '__main__': - processor = ProcessorMetResample() - processor.run_processor("MetResample") \ No newline at end of file +# import copy +# import datetime +# import logging +# import os +# from zipfile import ZipFile, ZIP_DEFLATED +# +# from iris import load_cube +# from pandas import DataFrame, infer_freq, read_csv, to_datetime +# +# from EpiModel.EpiPrep import lister, loader, prep +# from EpiModel.EpiUtils import ( +# datetime_range, +# parse_template_string) +# from coordinator.processor_base import ProcessorBase +# from coordinator.utils.processor_utils import calc_epi_date_range, short_name, open_and_check_config +# +# +# logger = logging.getLogger(__name__) +# +# class ProcessorMetResample(ProcessorBase): +# +# def process_pre_job(self, args) -> bool: +# logger.debug('Performing process_pre_job()') +# +# # If it will work on a single forecast, get a dedicated download +# #return process_pre_job_server_download(args) +# # return query_past_successes(args, 'Environment') +# return True +# +# def process_in_job(self, jobPath, status, configjson, component) -> object: +# return self.process_in_job_met_resample(jobPath, status, configjson, component) +# +# +# def process_post_job(self, jobPath, configjson): +# pass +# +# def __init__(self) -> None: +# super().__init__() +# +# def gather_data( +# self, +# config, +# start_date, +# end_date, +# component='MetResample', +# lister_kwargs={}): +# """Based on a copy from +# /home/jws52/projects/SouthAsia/blast-fernandes-adaptation/code/prep_met.py +# Originally from coordinator ProcessEpidemiology.py.""" +# +# # TODO: Replace calls to this function with calls to +# # EpiModel.EpiPrep.prep.gather_dependent_models() +# +# # TODO: Simplify the set of required arguments . Check if config is necessary. 
+# +# # Use config-defined file lister +# listerfunc = config[component]['FileListerFunction'] +# if isinstance(listerfunc,str): +# try: +# # See if the function is available in this file +# file_lister = globals().get(config[component]['FileListerFunction']) +# except: +# file_lister = getattr( +# lister, +# config[component]['FileListerFunction']) +# elif callable(listerfunc): +# file_lister = listerfunc +# +# file_loader = getattr( +# loader, +# config[component]['FileLoaderFunction']) +# +# config_for_lister = config.copy() +# +# #lister_kwargs['reference_date']=config['ReferenceTime'] +# lister_kwargs['is_met']=config[component].get('is_met',False) +# +# loader_kwargs= {} +# loader_kwargs['VariableName']= config_for_lister[component].get('VariableName') +# #loader_kwargs['VariableNameAlternative']= config_for_lister['Deposition'].get('VariableNameAlternative') +# +# file_of_origins = prep.prep_input(config_for_lister,start_date,end_date, +# component=component, +# file_lister=file_lister, +# file_loader=file_loader, +# lister_kwargs=lister_kwargs, +# **loader_kwargs) +# +# assert os.path.isfile(config[component]['FileNamePrepared']) +# +# return file_of_origins +# +# def load_prepared_file(self,fn: str) -> DataFrame: +# """Based on a copy from +# /home/jws52/projects/SouthAsia/blast-fernandes-adaptation/code/prep_met.py +# """ +# +# df = read_csv( +# fn, +# header=[0,1], +# index_col=0) +# +# df.index = to_datetime(df.index,format='%Y%m%d%H%M') +# +# return df +# +# +# def resample_data(self,config,component,resample_scale='1H'): +# """Increases time resolution of data files by filling in missing time +# points with next available time point. i.e. assumes each time point is an +# average over the period since the last time point, as is the case for NAME +# output. +# +# Based on a copy from +# /home/jws52/projects/SouthAsia/blast-fernandes-adaptation/code/prep_met.py +# """ +# +# +# # load dataframe from config +# file_name_prepared = config[component]['FileNamePrepared'] +# +# df_original = self.load_prepared_file(file_name_prepared) +# +# time_resolution = df_original.index[1] - df_original.index[0] +# +# input_freq = infer_freq(df_original.index) +# +# print(f"Resampling {component} from {input_freq} to {resample_scale}") +# +# # TODO: Work out how to get lister to extend the time range of first or last +# # day for the instantaneous met data. +# +# # TODO: Check whether to reapply this for precipitation, the only field +# # where the first time point may be missing. 
+# # In order for backfill to work for the earliest time point, we need to +# # temporarily provide a prior timepoint +# #dfm1 = df_original.iloc[[0],:] +# #dfm1.index = dfm1.index - time_resolution +# #dfm1[:] = nan +# #df_original = concat([dfm1,df_original]) +# +# resampler = df_original.resample(resample_scale) +# +# resample_method = config[component].get('resampling','backfill') +# if resample_method == 'backfill': +# print('Upsampling by backfilling') +# df_out = resampler.bfill() +# elif resample_method == 'interpolate': +# print('Upsampling by interpolating linearly') +# df_out = resampler.interpolate(method='linear') +# else: +# raise Exception('Unknown resampling method in config.') +# +# # remove the temporary prior timepoint +# #df_out = df_out.drop(df_out.index[0]) +# +# # save the result +# save_path = file_name_prepared.replace('.csv',f"_{resample_scale}.csv") +# df_out.to_csv(save_path) +# +# return df_out, save_path +# +# def gather_and_resample( +# self, +# config_met, +# reference_date_str, +# calculation_span_days): +# """Based on a copy from +# /home/jws52/projects/SouthAsia/blast-fernandes-adaptation/code/prep_met.py""" +# +# components = config_met['components'] +# +# start_time, end_time = calc_epi_date_range( +# reference_date_str, +# calculation_span_days) +# +# paths = [] +# origins = [] +# +# for component in components: +# +# logger.info(f"Working on {component}") +# +# # parse the output file path +# fnp = parse_template_string( +# config_met[component]['FileNamePrepared'], +# config_met, +# f"{component} FileNamePrepared") +# config_met[component]['FileNamePrepared'] = fnp +# +# logger.debug('Making the output directory') +# output_dir = os.path.dirname(config_met[component]['FileNamePrepared']) +# os.makedirs(output_dir,exist_ok=True) +# +# logger.debug('Gathering data') +# +# file_of_origins = self.gather_data( +# copy.deepcopy(config_met), +# start_time,#datetime.datetime(2023,6,7,3) +# end_time,#datetime.datetime(2023,6,25,0), +# component=component, +# ) +# +# origins += [file_of_origins] +# +# logger.debug('Performing resampling') +# +# _, path = self.resample_data(copy.deepcopy(config_met), component) +# +# paths += [path] +# +# logger.info('Finished gather_and_resample()') +# +# return origins, paths +# +# +# def process_in_job_met_resample(self, jobPath, status, config, component): +# """ +# """ +# +# logger.info('started process_in_job_met_resample()') +# +# # Initialise output variable +# proc_out = {} +# # Output files available for upload +# proc_out['output'] = [] +# # Processing files available for clearing +# proc_out['clearup'] = [] +# +# # Obtain a dedicated copy of the latest forecast data +# #data_result = get_data_from_server(jobPath,config,component) +# # +# #if data_result['output'] is not None: +# # proc_out['output'] = data_result['output'] +# # +# #proc_out['clearup'] += data_result['clearup'] +# +# config_i = config[component].copy() +# +# config_i['jobPath'] = jobPath +# +# # provide top-level arguments to component's config +# for k,v in config.items(): +# if k not in short_name.keys(): +# config_i[k]=v +# +# # Get run config +# +# # path to some analysis met reruns provided by Will. These mirror the usual weekly EWS analysis jobs, +# # but on the extended EastAfrica grid that includes Zambia. 
+# ANJOBDIR2 = '${WorkspacePath2}/WR_EnvSuit_Met_Ethiopia_${DateString}/' +# +# # set up the required inputs +# #"Host" : { +# # #"Bounds" : [38.4,7.8,40.2,8.8] # Roughly box around Arsi/East Shewa in Ethiopia +# # #"Bounds" : [38.8,7.1,39.1,7.3] # Tiny box of 4 gridpoints in West Arsi in Ethiopia +# # "Bounds" : [28.0,-15.0,29.0,-14.0] # 1deg x 1deg box in centre of Zambia wheat producing areas, north of Lusaka +# #}, +# +# config_met_path = config[component].get('RunConfig') +# config_met = open_and_check_config(config_met_path) +# config_met['jobPath'] = jobPath +# +# # provide top-level arguments to component's config +# for k,v in config.items(): +# if k not in short_name.keys(): +# config_met[k]=v +# +# logger.info('Calling gather_and_resample()') +# +# origins, paths_out = self.gather_and_resample( +# config_met, +# reference_date_str = config['StartString'], +# calculation_span_days = config[component]['CalculationSpanDays'] +# ) +# +# # zip the files that will be provided to collaborators +# files_to_zip = origins + paths_out +# +# zip_path = f"{jobPath}/data_met_prep_{config['StartString']}.zip" +# +# with ZipFile(zip_path, 'w') as zipf: +# +# for file_to_zip in files_to_zip: +# +# filename_in_archive = os.path.basename(file_to_zip) +# +# zipf.write( +# file_to_zip, +# arcname=filename_in_archive, +# compress_type=ZIP_DEFLATED) +# +# # 1H met files to upload +# proc_out['output'] += [zip_path] +# proc_out['clearup'] += files_to_zip +# +# return proc_out +# +# def has_times(cube_filename,desired_times,constraint=None): +# """Checks the times listed in desired_times are within the date range +# available in the single cube loaded from cube_filename. +# Based on a copy from +# /home/jws52/projects/SouthAsia/blast-fernandes-adaptation/code/prep_met.py""" +# +# cube = load_cube(cube_filename,constraint=constraint) +# +# tc = cube.coord('time') +# tc_dates = tc.units.num2date(tc.points) +# +# try: +# for dt in desired_times: +# # Check it is within range +# assert dt >= tc_dates[0], f"desired_time {dt} is out of range of cube from {cube_filename}" +# assert dt <= tc_dates[-1], f"desired_time {dt} is out of range of cube from {cube_filename}" +# +# # Check exact time is available +# #assert sum([dt == tcdi for tcdi in tc_dates])==1 +# return True +# +# except AssertionError: +# return False +# +# def list_files_historical_daily( +# config, +# startTime, +# endTime, +# component, +# is_met=False, +# **kwargs): +# '''Expecting time points are from multiple files, using the first day +# of values at the date of that file. +# If the files are calculated with forecast data, this means all env suit +# is based on 1-day forecasts. +# If the data are based on analysis, this keeps an even chunking. +# One file for all time points in forecast. +# +# is_met (default False) is False for spore deposition, and True for met +# variables extracted on a consistent grid (including _M_ met variables which +# NAME has already performed some backfilling to provide timepoints available +# for _I_ data but missing for _M_ data). For non-met variables, the first +# available time point is assumed to be at 03:00 representing 00:01-03:00 +# since they are typically averages of the time period since the timestamp of +# the last datapoint. Met variables are generally instantaneous so the first +# available datapoint is at 00:00. The start time is set by the startTime +# parameter (i.e. start hour should be 3 for non-met data, and 0 for met +# data). 
For the last day in the range, met data may also exist for +# 00:00+1day. This parameter determines whether to look for that last +# instantaneous time point. +# +# Returns list of files and timespan between startTime and endTime that each +# file should be considered for. +# +# Based on a copy from +# /home/jws52/projects/SouthAsia/blast-fernandes-adaptation/code/prep_met.py +# Copied from ews-epidemiology.EpiPrep.lister.py''' +# +# # TODO: Update EpiPrep.lister with this version +# +# config_copy = config.copy() +# +# desiredTimes = datetime_range(startTime,endTime+datetime.timedelta(seconds=1),days=1) +# +# files = [] +# timespans = [] +# lastfile = '' +# lastaltfile = '' +# for desiredTime in desiredTimes: +# +# initialTime = desiredTime # since startTime is 0300 , otherwise use + datetime.timedelta(seconds=1) +# nextDayTime = desiredTime + datetime.timedelta(hours=23,minutes=59) +# +# timespan = [initialTime,nextDayTime] +# +# config_copy['DateString'] = desiredTime.strftime('%Y%m%d') +# +# # determine configuration's file path. Note the use of variables from +# # first level of config +# FilePath = parse_template_string( +# config[component]['PathTemplate'], +# config_copy, +# f"{component} PathTemplate") +# +# AltFilePath = None +# if 'PathTemplateAlternative' in config[component]: +# AltFilePath = parse_template_string( +# config[component]['PathTemplateAlternative'], +# config_copy, +# f"{component} PathTemplate") +# +# FileName = parse_template_string( +# config[component]['FileNameTemplate'], +# config_copy, +# f"{component} FileNameTemplate") +# +# file = f"{FilePath}{FileName}" +# altfile = f"{AltFilePath}{FileName}" +# +# logging.debug(f"Trying file {file}") +# if AltFilePath is not None: +# logging.debug(f"Otherwise trying {altfile}") +# +# file_has_times = False +# if os.path.isfile(file): +# file_has_times = has_times(file,timespan) +# +# altfile_has_times = False +# if os.path.isfile(altfile): +# altfile_has_times = has_times(altfile,timespan) +# +# lastfile_has_times = False +# if lastfile is not None: +# if os.path.isfile(lastfile): +# lastfile_has_times = has_times(lastfile,timespan) +# +# lastaltfile_has_times = False +# if lastaltfile is not None: +# if os.path.isfile(lastaltfile): +# lastaltfile_has_times = has_times(lastaltfile,timespan) +# +# +# # 1st preference: use FileTemplate (e.g. analysis) +# if os.path.isfile(file) & file_has_times: +# +# files += [file] +# +# timespans += [timespan] +# +# lastfile = file +# +# # 2nd preference: continuation of first FileTemplate (e.g. analysis) +# elif os.path.isfile(lastfile) & lastfile_has_times: +# logging.warning(f"No {component} path for day of {desiredTime}, extending timespan of previous file") +# +# timespan = [lastInitialTime,nextDayTime] +# +# timespans[-1] = timespan +# +# # 3rd preference: use alternative FileTemplate (e.g. latest forecast) +# elif os.path.isfile(altfile) & altfile_has_times: +# +# files += [altfile] +# +# timespans += [timespan] +# +# lastaltfile = altfile +# +# # 4th preference: continuation of alternative FileTemplate (e.g. forecast) +# elif os.path.isfile(lastaltfile) & lastaltfile_has_times: +# +# logging.warning(f"No {component} path for day of {desiredTime}, extending timespan of previous file") +# +# timespan = [lastInitialTime,nextDayTime] +# +# timespans[-1] = timespan +# +# else: +# logging.error(f"No {component} path for day of {desiredTime}") +# raise Exception +# +# lastInitialTime = timespan[0] +# +# # Extend reach of final day, just in case the file exists. 
+# # Useful for the case of instantaneous met variables, where it does exist +# if is_met is True: +# timespans[-1][1] += datetime.timedelta(hours=3) +# +# logging.info('Each timespan and file is:') +# for timespan, filen in zip(timespans,files): logging.info(f"{timespan}: {filen}") +# +# return files, timespans +# +# if __name__ == '__main__': +# processor = ProcessorMetResample() +# processor.run_processor("MetResample") \ No newline at end of file diff --git a/scripts/run_Advisory_Processor.sh b/scripts/run_Advisory_Processor.sh index 8113b725af8f24a8540539c9376f607b6dac9498..d34a1fe0f544bed95f3c5ef7dc4cef31d3cdf539 100755 --- a/scripts/run_Advisory_Processor.sh +++ b/scripts/run_Advisory_Processor.sh @@ -5,11 +5,11 @@ proc_path="$(dirname "$(readlink -f "$0")")" source "${proc_path}"/run_utils.sh -setup_conda_env +setup_virtual_env python "${proc_path}"/../coordinator/ProcessorAdvisory.py "$@" exit_code=$? -teardown_conda_env +teardown_virtual_env exit $exit_code diff --git a/scripts/run_Deposition_Processor.sh b/scripts/run_Deposition_Processor.sh index bccfa0eb172c1e6e2300b80a01519b1c9b821a4e..67c4b7aa8ad4914fa07b5bec9e7f7e6c012048b2 100755 --- a/scripts/run_Deposition_Processor.sh +++ b/scripts/run_Deposition_Processor.sh @@ -5,13 +5,13 @@ proc_path="$(dirname "$(readlink -f "$0")")" source "${proc_path}"/run_utils.sh -setup_conda_env +setup_virtual_env echo "Running ProcessorDeposition.py with path: " pwd; python "${proc_path}"/../coordinator/ProcessorDeposition.py "$@" exit_code=$? -teardown_conda_env +teardown_virtual_env exit $exit_code diff --git a/scripts/run_Environment_Processor.sh b/scripts/run_Environment_Processor.sh index 6e17d0b004a3fb499db8ae239dea43f7243075bc..cb7046c7da9d8d3f20de3f3ffb57076997f09d27 100755 --- a/scripts/run_Environment_Processor.sh +++ b/scripts/run_Environment_Processor.sh @@ -5,11 +5,11 @@ proc_path="$(dirname "$(readlink -f "$0")")" source "${proc_path}"/run_utils.sh -setup_conda_env +setup_virtual_env python "${proc_path}"/../coordinator/ProcessorEnvironment.py "$@" exit_code=$? -teardown_conda_env +teardown_virtual_env exit $exit_code diff --git a/scripts/run_Epidemiology_Processor.sh b/scripts/run_Epidemiology_Processor.sh index 1ee5570ffca53453dd9cfedddfc2a18c293a8cd4..3606c5bcd6b0c8e07d6c64ddc97048ce6849c3d2 100755 --- a/scripts/run_Epidemiology_Processor.sh +++ b/scripts/run_Epidemiology_Processor.sh @@ -5,11 +5,11 @@ proc_path="$(dirname "$(readlink -f "$0")")" source "${proc_path}"/run_utils.sh -setup_conda_env +setup_virtual_env python "${proc_path}"/../coordinator/ProcessorEpidemiology.py "$@" exit_code=$? -teardown_conda_env +teardown_virtual_env exit $exit_code diff --git a/scripts/run_MetResample_Processor.sh b/scripts/run_MetResample_Processor.sh index 574e36cbf9aabb37a4637671b946075c6fa9383b..4869136e024fb1a887b1071ddaa9b34d89d11da5 100755 --- a/scripts/run_MetResample_Processor.sh +++ b/scripts/run_MetResample_Processor.sh @@ -1,15 +1,15 @@ -#!/bin/bash - -# get path of this script (to point to files within the same git repo) -proc_path="$(dirname "$(readlink -f "$0")")" - -source "${proc_path}"/run_utils.sh - -setup_conda_env - -python "${proc_path}"/../coordinator/extra/ProcessorMetResample.py "$@" -exit_code=$? 
- -teardown_conda_env - -exit $exit_code +##!/bin/bash +# +## get path of this script (to point to files within the same git repo) +#proc_path="$(dirname "$(readlink -f "$0")")" +# +#source "${proc_path}"/run_utils.sh +# +#setup_virtual_env +# +#python "${proc_path}"/../coordinator/extra/ProcessorMetResample.py "$@" +#exit_code=$? +# +#teardown_virtual_env +# +#exit $exit_code diff --git a/scripts/run_Processor.sh b/scripts/run_Processor.sh index 8e1fd2fd68a2221845a664294a8097460d2d17db..95f4ee65d33543af69ed681bafab09ad2b6489b7 100755 --- a/scripts/run_Processor.sh +++ b/scripts/run_Processor.sh @@ -1,89 +1,91 @@ -#!/bin/bash - -original_args=("$@") - -SHORT=p:h -OPTS=$(getopt -a --options $SHORT -- "$@") -echo $OPTS -eval set -- "$OPTS" - -while : -do - case "$1" in - -p ) - component="$2" - shift 2 - ;; - -h | --help) - "Runs the appropriate Processor component using the -p flag, all other args are passed through" - exit 2 - ;; - --) - shift; - break - ;; - *) - echo "Unexpected option: $1" - ;; - esac -done - -printf "component is %s" "$component" - -if [ "$component" = "Environment" ]; then - processor_class="ProcessorEnvironment.py" -elif [ "$component" = "Deposition" ];then - processor_class="ProcessorDeposition.py" -elif [ "$component" = "Survey" ];then - processor_class="ProcessorSurveys.py" -elif [ "$component" = "Advisory" ];then - processor_class="ProcessorAdvisory.py" -elif [ "$component" = "Scraper" ];then - processor_class="ProcessorScraper.py" -elif [ "$component" = "Epidemiology" ];then - processor_class="ProcessorEpidemiology.py" -else - printf "component '%s' not recognised" "$component" -fi - - -# directory containing all environment -envs=/storage/app/EWS_prod/envs - -# directory containing all custom python packages -bin=/storage/app/EWS_prod/code/ - -# provide custom python packages so they can be imported -flagdir=${bin}/flagdir/ -epimodel=${bin}/epimodel/ -advisory=${bin}/advisory_builder/ -met_processing=${bin}/met_extractor_v2/ -met_processor=${bin}/environmental_suitability/ -plotting=${bin}/plotting/ -post_processing=${bin}/post_processing/ -source_gen=${bin}/source_gen/ - -export PYTHONPATH=$PYTHONPATH:$flagdir:$epimodel:$advisory:$met_processing:$met_processor:$plotting:$source_gen:$post_processing - -# provide path to email credentials for logging - -export EMAIL_CRED=${envs}/credentials/Cred_gmail.json - -# activate conda environment of python modules so they can be imported -#TODO: Move conda_env from bin to envs -conda_env=${envs}/conda/py3EWS -source /storage/app/miniconda3/bin/activate ${conda_env} - -# get path of this script (to point to files within the same git repo) -proc_path="$( dirname "$(readlink -f "$0" )" )" - -# run the processor with all arguments -processor=${proc_path}/../coordinator/${processor_class} -printf "processor is %s\n\n" "$processor" -python "${processor}" "${original_args[@]}" -exit_code=$?; - -# deactivate conda environment -source /storage/app/miniconda3/bin/deactivate ${conda_env} - -exit $exit_code; +#REDUNDANT NOW WE CALL PROCESSOR-SPECIFIC SCRIPTS TO RUN PROCESSORS + +##!/bin/bash +# +#original_args=("$@") +# +#SHORT=p:h +#OPTS=$(getopt -a --options $SHORT -- "$@") +#echo $OPTS +#eval set -- "$OPTS" +# +#while : +#do +# case "$1" in +# -p ) +# component="$2" +# shift 2 +# ;; +# -h | --help) +# "Runs the appropriate Processor component using the -p flag, all other args are passed through" +# exit 2 +# ;; +# --) +# shift; +# break +# ;; +# *) +# echo "Unexpected option: $1" +# ;; +# esac +#done +# +#printf "component is %s" 
"$component" +# +#if [ "$component" = "Environment" ]; then +# processor_class="ProcessorEnvironment.py" +#elif [ "$component" = "Deposition" ];then +# processor_class="ProcessorDeposition.py" +#elif [ "$component" = "Survey" ];then +# processor_class="ProcessorSurveys.py" +#elif [ "$component" = "Advisory" ];then +# processor_class="ProcessorAdvisory.py" +#elif [ "$component" = "Scraper" ];then +# processor_class="ProcessorScraper.py" +#elif [ "$component" = "Epidemiology" ];then +# processor_class="ProcessorEpidemiology.py" +#else +# printf "component '%s' not recognised" "$component" +#fi +# +# +## directory containing all environment +#envs=/storage/app/EWS_prod/envs +# +## directory containing all custom python packages +#bin=/storage/app/EWS_prod/code/ +# +## provide custom python packages so they can be imported +#flagdir=${bin}/flagdir/ +#epimodel=${bin}/epimodel/ +#advisory=${bin}/advisory_builder/ +#met_processing=${bin}/met_extractor_v2/ +#met_processor=${bin}/environmental_suitability/ +#plotting=${bin}/plotting/ +#post_processing=${bin}/post_processing/ +#source_gen=${bin}/source_gen/ +# +#export PYTHONPATH=$PYTHONPATH:$flagdir:$epimodel:$advisory:$met_processing:$met_processor:$plotting:$source_gen:$post_processing +# +## provide path to email credentials for logging +# +#export EMAIL_CRED=${envs}/credentials/Cred_gmail.json +# +## activate conda environment of python modules so they can be imported +##TODO: Move conda_env from bin to envs +#conda_env=${envs}/conda/py3EWS +#source /storage/app/miniconda3/bin/activate ${conda_env} +# +## get path of this script (to point to files within the same git repo) +#proc_path="$( dirname "$(readlink -f "$0" )" )" +# +## run the processor with all arguments +#processor=${proc_path}/../coordinator/${processor_class} +#printf "processor is %s\n\n" "$processor" +#python "${processor}" "${original_args[@]}" +#exit_code=$?; +# +## deactivate conda environment +#source /storage/app/miniconda3/bin/deactivate ${conda_env} +# +#exit $exit_code; diff --git a/scripts/run_Scraper_Processor.sh b/scripts/run_Scraper_Processor.sh index b9c17950c2722a7e6c210e933a1db92c86bf873f..631b288c885a41b1618f3596775b0c09ff132471 100755 --- a/scripts/run_Scraper_Processor.sh +++ b/scripts/run_Scraper_Processor.sh @@ -5,11 +5,11 @@ proc_path="$(dirname "$(readlink -f "$0")")" source "${proc_path}"/run_utils.sh -setup_conda_env +setup_virtual_env python "${proc_path}"/../coordinator/ProcessorScraper.py "$@" exit_code=$? -teardown_conda_env +teardown_virtual_env exit $exit_code diff --git a/scripts/run_Survey_Processor.sh b/scripts/run_Survey_Processor.sh index f616bdaa69afd3990fcb92182bf186dcdac25fd8..628d66f510ca05c227f9c1a8895186aa970ddde4 100755 --- a/scripts/run_Survey_Processor.sh +++ b/scripts/run_Survey_Processor.sh @@ -5,11 +5,11 @@ proc_path="$(dirname "$(readlink -f "$0")")" source "${proc_path}"/run_utils.sh -setup_conda_env +setup_virtual_env python "${proc_path}"/../coordinator/ProcessorSurveys.py "$@" exit_code=$? 
-teardown_conda_env +teardown_virtual_env exit $exit_code diff --git a/scripts/run_utils.sh b/scripts/run_utils.sh index f1d875e43ff8a1776f8be101d5d64e717fdc651f..15c2adf3a15dcc3d4643445b22c0ca4fcbb4367a 100755 --- a/scripts/run_utils.sh +++ b/scripts/run_utils.sh @@ -1,6 +1,6 @@ #!/bin/bash -function setup_conda_env() { +function setup_virtual_env() { # directory containing all environment envs=/storage/app/EWS_prod/envs @@ -30,9 +30,8 @@ function setup_conda_env() { source ${venv}/ews_prod/bin/activate; } -function teardown_conda_env() { +function teardown_virtual_env() { # deactivate conda environment deactivate; - }
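
Reference note (not part of the patch): since this change retires rather than documents `resample_data()`, the following is a minimal, self-contained sketch of the upsampling technique its docstring describes — backfill for NAME-style output where each value is an average over the period since the previous timestamp, with linear interpolation as the configurable alternative for instantaneous variables. The column name, values, and dates are invented for illustration; the real method reads its prepared CSV (two-level header, `%Y%m%d%H%M` index) and writes the result with a `_1H` suffix, which this sketch omits.

```python
# Illustrative only: toy stand-in for the retired resample_data() upsampling step.
# The column name and values below are invented, not taken from EWS configuration.
import pandas as pd

# 3-hourly input, as for NAME post-processed fields where each value represents
# an average over the 3 hours ending at its timestamp.
idx = pd.date_range("2023-06-07 03:00", periods=4, freq="3H")
df = pd.DataFrame({"precip_mm": [0.6, 0.0, 1.2, 0.3]}, index=idx)

# Backfill: each new hourly point takes the next available 3-hourly value,
# consistent with "average over the period since the last time point".
hourly_bfill = df.resample("1H").bfill()

# Linear interpolation: smoother, the alternative the config key 'resampling'
# selected for instantaneous met variables.
hourly_interp = df.resample("1H").interpolate(method="linear")

print(hourly_bfill.head())
print(hourly_interp.head())
```

Trailing commentary like this is ignored by `git apply`, so the patch above remains applicable as-is.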