diff --git a/coordinator/ProcessorAdvisory.py b/coordinator/ProcessorAdvisory.py
index 1a9bd51f4c32f52e19917b3c140b29c3f548824c..8815f3e0cd9707f01154b03833f7bc5917cb9e2e 100644
--- a/coordinator/ProcessorAdvisory.py
+++ b/coordinator/ProcessorAdvisory.py
@@ -2,19 +2,26 @@
 '''Functions to process the advisory component.'''
 
 import logging
+from typing import List
 
 # gitlab projects
 # TODO: Package these projects so they are robust for importing
 from AdvisoryBuilder import DataGatherer # created by jws52
 
 from Processor import Processor
-from ProcessorUtils import add_filters_to_sublogger, short_name
+from ProcessorUtils import (
+        add_filters_to_sublogger,
+        endScript,
+        open_and_check_config,
+        query_past_successes,
+        short_name
+)
 
 
 class ProcessorAdvisory(Processor):
 
     def process_pre_job(self, args):
-        return True
+        return self.process_pre_job_advisory(args)
 
 
     def process_in_job(self, jobPath, status, configjson, component) -> object:
@@ -30,6 +37,37 @@ class ProcessorAdvisory(Processor):
     logger = logging.getLogger('Processor.Advisory')
     add_filters_to_sublogger(logger)
 
+    def process_pre_job_advisory(self,input_args: dict):
+
+        self.logger.info('started process_pre_job_advisory()')
+
+        # check configs can be loaded
+        config_fns: List[str] = input_args['config_paths']
+        for configFile in config_fns:
+            try:
+                config_i = open_and_check_config(configFile)
+            except:
+                self.logger.exception(f"Failure in opening or checking config {configFile}")
+                endScript(premature=True)
+
+        # check pre-requisite jobs are complete
+
+        # Which jobs to check are defined by the config. It may include epi
+        # and surveys.
+
+        # Note that yesterday's surveys should also have succeeded, but that
+        # needs a YesterdayDateString available for the survey part of
+        # advisory config to point to the status file.
+
+        dependent_components = config_i['Advisory'].get(
+                'DependentComponents',
+                ['Deposition','Environment'])
+
+        for dependent_component in dependent_components:
+
+            query_past_successes(input_args, dependent_component)
+
+        return True
 
     def process_in_job_advisory(self, jobPath, status, config, component):
         '''Generates a word processor file containing some basic survey statistics
diff --git a/coordinator/ProcessorEpidemiology.py b/coordinator/ProcessorEpidemiology.py
index 746bfbcd1203527652d61a237639072cf18be603..7044dd8439856141b432b196460edac235c739b9 100644
--- a/coordinator/ProcessorEpidemiology.py
+++ b/coordinator/ProcessorEpidemiology.py
@@ -18,7 +18,7 @@ from rasterio import open as rio_open
 # TODO: Package these projects so they are robust for importing
 from EpiModel import ( # created by rs481
     EpiAnalysis,
-    EpiModel,
+    model,
     plotRaster
 )
 from EpiModel.EpiPrep import prep
@@ -97,7 +97,9 @@ class ProcessorEpidemiology(Processor):
 
         self.logger.info('started process_pre_job_epi()')
 
         # check pre-requisite jobs are complete
-        query_past_successes(input_args)
+        query_past_successes(input_args, 'Deposition')
+
+        query_past_successes(input_args, 'Environment')
 
         config_fns: List[str] = input_args['config_paths']
@@ -560,7 +562,7 @@ class ProcessorEpidemiology(Processor):
 
         try:
 
-            EpiModel.run_epi_model(f"{case_specific_path}/{config_filename}.json")
+            model.run_epi_model(f"{case_specific_path}/{config_filename}.json")
         except:
             self.logger.exception('Unexpected error in EpiModel')
             raise
diff --git a/coordinator/ProcessorUtils.py b/coordinator/ProcessorUtils.py
index cb34994656511af7b04d821c1bfc6f2d88ea4dd4..8e026793b4f634c93502ba7cf05c77d6a7d65753 100644
--- a/coordinator/ProcessorUtils.py
+++ b/coordinator/ProcessorUtils.py
@@ -13,6 +13,10 @@ import subprocess
 import sys
 import tarfile
 from typing import List
+import netCDF4 as nc
+import numpy as np
+import pandas as pd
+import cf_units
 
 
 from flagdir import jobStatus # created by jws52
@@ -265,26 +269,29 @@ def query_component_success(config_i,job_run: str, job_to_check: str):
     it raises an error.'''
 
     # check if deposition data is readily available
-    dep_success_file = Template(config_i[job_run][job_to_check]['SuccessFileTemplate']).substitute(**config_i)
+    status_file = Template(config_i[job_run][job_to_check]['SuccessFileTemplate']).substitute(**config_i)
     try:
-        query_proceed(dep_success_file,'deposition')
+        query_proceed(status_file,job_to_check.lower())
     except:
         if 'AlternativeSuccessFileTemplate' not in config_i[job_run][job_to_check]:
             logger.warning(f"No AlternativeSuccessFileTemplate to check for")
             endScript(premature=True)
 
-        dep_success_file_alt = Template(config_i[job_run][job_to_check]['AlternativeSuccessFileTemplate']).substitute(**config_i)
-        query_proceed(dep_success_file_alt,job_to_check.lower())
+        status_file_alt = Template(config_i[job_run][job_to_check]['AlternativeSuccessFileTemplate']).substitute(**config_i)
+        query_proceed(status_file_alt,job_to_check.lower())
 
     return True
 
 
-def query_past_successes(input_args: dict):
+def query_past_successes(input_args: dict,
+                         component_to_query: str = 'Deposition'):
     '''Checks if deposition and environment jobs are already completed successfully.
     If not, it raises an error.'''
 
     component: str = input_args['component']
 
+    assert component_to_query in ['Deposition','Environment','Epidemiology']
+
     # check configs can be loaded
     config_fns: List[str] = input_args['config_paths']
     for configFile in config_fns:
@@ -297,10 +304,173 @@
 
     # some config initialisation is necessary
     config_i['StartString'] = input_args['start_date']
 
-    # check if deposition data is readily available
-    query_component_success(config_i,component,'Deposition')
+    # check if dependent job is readily available
+    query_component_success(config_i,component,component_to_query)
+
+    return True
+
+def dataframe_to_series(df: pd.DataFrame) -> pd.Series:
+    """Reformat a pandas DataFrame into a pandas Series.
+
+    The dataframe should have the following structure:
+    - first two columns: longitude and latitude
+    - following columns: timestamps in YYYYMMDDhhmm format (e.g. 202311211500)
+
+    The resulting Series will have the following structure:
+    - indexes: time, longitude, latitude."""
+
+    # longitude and latitude as index
+    df=df.set_index(["longitude","latitude"])
+
+    # convert dataframe to Series by stacking
+    s=df.stack()
+
+    # rename indexes
+    s.index.names=["longitude","latitude","time"]
+
+    # reorder index
+    s=s.reorder_levels(["time","latitude","longitude"])
+
+    # sort by timestamps
+    s=s.sort_index(level=0)
+    logger.info("Dataframe reformatted to Series")
+
+    return s
 
-    # check if environment data is readily available
-    query_component_success(config_i,component,'Environment')
+
+def create_bounds(coord):
+    """Create bounds for a coordinate based on the difference between two adjacent values.
+    Works for non-equidistant coordinates as well."""
+
+    # calculate the difference between two adjacent values and divide by 2
+    diff = np.diff(coord)/2
+
+    # insert the first and last value at the beginning and end of the array
+    diff_lower = np.insert(diff, 0, diff[0])
+    diff_upper = np.append(diff, diff[-1])
+
+    # bounds are the coordinate values minus/plus the difference
+    bounds = np.zeros((len(coord), 2))
+    bounds[:, 0] = coord - diff_lower
+    bounds[:, 1] = coord + diff_upper
+
+    return bounds
+
+def check_cf(units: str) -> str:
+    """Check if units are CF convention compliant."""
+    try: cf_units.Unit(units)
+    except: raise ValueError(f"Units \"{units}\" not CF convention compliant!")
+    return units
+
+def create_nc_file(s: pd.Series, filename: str, data_attr: dict, compression_type="zlib", complevel=9, least_significant_digit=None):
+    """Create a netcdf file from a pandas Series.
+
+    The Series should have the following indexes:
+    - time: str
+    - latitude: float
+    - longitude: float
+
+    Path to the netcdf file should be filename: str
+
+    Dictionary data_attr should have the following structure:
+    { "short_name": str,
+      "long_name": str,
+      "units": str,
+      "description": str}
+
+    Optional arguments:
+    - compression_type: str, default="zlib"
+    - complevel: int, default=9
+    - least_significant_digit: int, default=None
+    """
+    ## Get coordinates #########################
+    times=s.index.levels[0].values
+    # convert times to hours since 1970-01-01 00:00:00
+    times=pd.to_datetime(times).astype(np.int64)//((10**9)*60*60)
+
+    latitudes=s.index.levels[1].unique().values
+    longitudes=s.index.levels[2].unique().values
+
+    ## Create bounds ###########################
+    lat_bnds=create_bounds(latitudes)
+    lon_bnds=create_bounds(longitudes)
+
+    ## Create netcdf file ######################
+    logger.info("Creating netcdf file")
+    # just to be safe, make sure dataset is not already open.
+    try: nc.Dataset(filename,"w",format="NETCDF4").close()
+    except: pass
+
+    ncFile=nc.Dataset(filename,"w",format="NETCDF4")
+
+    # create dimensions
+    ncFile.createDimension("time", None)
+    ncFile.createDimension("bnds",2)
+    ncFile.createDimension("latitude",len(latitudes))
+    ncFile.createDimension("longitude",len(longitudes))
+
+    # create global attributes
+    ncFile.description=data_attr["description"]
+    ncFile.Conventions="CF-1.10"
+
+    # create variables
+    time=ncFile.createVariable("time",np.int32,("time",))
+    time.axis="T"
+    time.units="hours since 1970-01-01 00:00:00"
+    time.long_name="time"
+    time.calendar="gregorian"
+
+    latitude_bnds=ncFile.createVariable("latitude_bnds",np.float32,("latitude","bnds",))
+    latitude_bnds.long_name="latitude boundaries"
+    latitude_bnds.units="degrees_north"
+
+    latitude=ncFile.createVariable("latitude",np.float32,("latitude",))
+    latitude.axis="Y"
+    latitude.bounds="latitude_bnds"
+    latitude.units="degrees_north"
+    latitude.long_name="latitude"
+    latitude.standard_name="latitude"
+    latitude.valid_min=latitudes.min().astype(np.float32)
+    latitude.valid_max=latitudes.max().astype(np.float32)
+
+    longitude_bnds=ncFile.createVariable("longitude_bnds",np.float32,("longitude","bnds",))
+    longitude_bnds.long_name="longitude boundaries"
+    longitude_bnds.units="degrees_east"
+
+    longitude=ncFile.createVariable("longitude",np.float32,("longitude",))
+    longitude.axis="X"
+    longitude.bounds="longitude_bnds"
+    longitude.units="degrees_east"
+    longitude.long_name="longitude"
+    longitude.standard_name="longitude"
+    longitude.valid_min=longitudes.min().astype(np.float32)
+    longitude.valid_max=longitudes.max().astype(np.float32)
+
+    latitude_longitude=ncFile.createVariable("latitude_longitude",np.int32)
+    latitude_longitude.grid_mapping_name="latitude_longitude"
+    latitude_longitude.longitude_of_prime_meridian=0.0
+    latitude_longitude.earth_radius=6371229.0
+
+    # create DATA variable
+    logger.info("Creating data variable")
+    DATA=ncFile.createVariable(data_attr["short_name"],np.float32,("time","latitude","longitude",), compression=compression_type, complevel=complevel, least_significant_digit=least_significant_digit)
+    DATA.short_name=data_attr["short_name"]
+    DATA.long_name=data_attr["long_name"]
+    DATA.units=check_cf(data_attr["units"])
+    DATA.description=data_attr["description"]
+    DATA.grid_mapping="latitude_longitude"
+    DATA.coordinates="time latitude longitude"
+
+    # write data to variables
+    time[:]=times
+    latitude_bnds[:]=lat_bnds
+    latitude[:]=latitudes
+    longitude_bnds[:]=lon_bnds
+    longitude[:]=longitudes
+
+    # map data to DATA variable
+    DATA[:]=s.values.reshape(len(times),len(latitudes),len(longitudes))
+
+    ncFile.close()
+    logger.info(f"{filename} netCDF4 file created.")
 
     return True
\ No newline at end of file
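
Usage notes (illustrative sketches only, not part of the patch):

1. Pre-job dependency checks. The advisory pre-job now reads an optional
   'DependentComponents' list from the 'Advisory' section of the config and
   checks each listed component with query_past_successes(). A minimal sketch
   of the calling convention follows; the config path and input_args values
   are hypothetical, only the keys actually read by the patch are real, and
   coordinator/ is assumed to be importable.

    # illustration only: input_args carries the keys used by the patch
    # ('component', 'config_paths', 'start_date'); the path is made up
    from ProcessorUtils import query_past_successes

    input_args = {
        'component': 'Advisory',
        'config_paths': ['configs/advisory_example.json'],
        'start_date': '20231121',
    }

    # config fragment driving the loop in process_pre_job_advisory():
    #   "Advisory": {"DependentComponents": ["Deposition", "Environment"], ...}
    for dependent_component in ['Deposition', 'Environment']:
        query_past_successes(input_args, dependent_component)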
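
2. create_bounds() on a non-equidistant coordinate. Cell bounds meet halfway
   between neighbouring points, and the first and last cells reuse the nearest
   spacing; the sample values below are arbitrary.

    import numpy as np
    from ProcessorUtils import create_bounds

    coord = np.array([0.0, 0.5, 1.5])   # spacing 0.5, then 1.0
    print(create_bounds(coord))
    # -> [[-0.25, 0.25], [0.25, 1.0], [1.0, 2.0]]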
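
3. End-to-end use of the DataFrame-to-netCDF helpers. The column layout
   follows the dataframe_to_series() docstring; the variable name, file name
   and attribute values are made up for the example, and coordinator/ is
   assumed to be on PYTHONPATH.

    import pandas as pd
    from ProcessorUtils import dataframe_to_series, create_nc_file

    # wide format: longitude, latitude, then one column per YYYYMMDDhhmm timestamp
    df = pd.DataFrame({
        "longitude":    [30.0, 30.0, 30.5, 30.5],
        "latitude":     [-1.0, -0.5, -1.0, -0.5],
        "202311210000": [291.2, 290.8, 291.0, 290.6],
        "202311211200": [295.4, 294.9, 295.1, 294.7],
    })

    # reshape to a (time, latitude, longitude) indexed Series
    s = dataframe_to_series(df)

    # write a CF-style netCDF file with the default zlib compression
    create_nc_file(
        s,
        "t2m_example.nc",
        data_attr={
            "short_name": "t2m",
            "long_name": "2 metre temperature",
            "units": "K",
            "description": "example 2 m temperature field",
        },
    )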