diff --git a/coordinator/ProcessorEpidemiology.py b/coordinator/ProcessorEpidemiology.py index 7b1cb2d819ae1d74ffc8fb80d55625fbfe89d0be..23f193b8bdec60a2e096e6355425bec507cdc381 100644 --- a/coordinator/ProcessorEpidemiology.py +++ b/coordinator/ProcessorEpidemiology.py @@ -5,13 +5,13 @@ import datetime from glob import glob import json import logging -from pathlib import Path import os +from pathlib import Path import shutil -from typing import List +from typing import List, Union -from numpy import all, any, argmax, unique, where -from pandas import read_csv, DataFrame, to_datetime +from numpy import all, any, argmax, unique, allclose +from pandas import MultiIndex, read_csv, DataFrame, to_datetime from rasterio import open as rio_open # gitlab projects @@ -142,12 +142,24 @@ def create_epi_config_string(config,jobPath,startString,endString): return config_filename +def are_indices_close(idx1: MultiIndex, idx2: MultiIndex, atol=2.51e-6) -> bool: + """An absolute tolerance of 2.51e-6 relates to differences between the + grid's of NAME vn7.2 output and met-extractor in 2022.""" + + assert idx1.nlevels == idx2.nlevels + num_levels = idx1.nlevels + + # a stricter check is idx_i.equals(idx_0) + + levels_close = [] + for i in range(num_levels): + close_i = allclose(idx1.get_level_values(i),idx2.get_level_values(i),atol=atol,rtol=0) + levels_close += [close_i] + + return all(levels_close) -def raster_to_csv(raster_fn,csv_fn): +def raster_to_series(raster_fn): - # create a csv version and save in the job directory, - # to compare host raster with dep and env suit - # note this can be time-varying by providing additional rows with rio_open(raster_fn,'r') as host_raster: host_arr = host_raster.read(1) shape = host_raster.shape @@ -161,16 +173,51 @@ def raster_to_csv(raster_fn,csv_fn): # build into a dataframe # (rasters start in the top left, so descending latitude coordinates) host_df = DataFrame(data=host_arr,index=lats[::-1],columns=lons) + host_df.index.name = 'latitude' + host_df.columns.name = 'longitude' # rearrange to ascending latitude corodinates host_df.sort_index(axis='rows',inplace=True) # make spatial coordinates a multi-index, like for dep and env suit csvs host_series = host_df.stack() - # for now, provide a nominal date of validity to enable a time column - # so far, using mapspam which is a static map, so time is irrelevant - host_series.name = '201908150000' - host_df2 = DataFrame(host_series).T - host_df2.to_csv(csv_fn) + return host_series + +def rasters_to_csv( + raster_fns_dict: dict, + csv_fn: str, + ): + """Takes a dictionary of raster files with associated times and saves them + as rows of a single csv. The csv columns and index structure matches model + outputs as expected by the epi model. Used to prepare the host data.""" + + host_serieses = [] + first = True + for date_valid_from, raster_fn in raster_fns_dict.items(): + + host_series = raster_to_series(raster_fn) + + # for now, provide a nominal date of validity to enable a time column + # so far, using mapspam which is a static map, so time is irrelevant + host_series.name = date_valid_from + + # conform indices (handle float differences) + if first: + idx_0 = host_series.index + + if not first: + idx_i = host_series.index + + indices_are_close = are_indices_close(idx_0,idx_i) + assert indices_are_close, (f"Coordinates of host_rasters do not match.\nFailed for {raster_fn}.") + host_series.index = idx_0 + + first = False + + host_serieses += [host_series] + + host_df = DataFrame(host_serieses) + + host_df.to_csv(csv_fn) return @@ -182,55 +229,63 @@ def get_model_divided_by_host_fraction( """when model_infection pressure has units of [ha_infected/ha_cell] we want [ha_infected/ha_wheat] to compare with surveys (because surveys only sample the wheat covered landscape) - so we must load the host raster and divide all model results by it.""" + so we must load the host raster and divide all model results by it. + + TODO: Instead of doing as post-processing in coordinator, best do it within + the ews-epidemiology package. + """ print('Converting units of prediction from ha_infected/ha_cell to ha_infect/ha_wheat') # load host raster host_fn = hostCSV host_df = read_csv(host_fn,index_col=0,header=[0,1]) - num_host_dates = len(host_df.index) - if num_host_dates == 1: - # parse static host map - index_name = host_df.index[0] - host_df.rename(index={index_name:'host_frac'},inplace=True) - - else: - # TODO: parse dynamic host map - raise Exception - - # conform the structure with infection dataframe + host_df.columns = host_df.columns.set_levels([lvl.astype('float') for lvl in host_df.columns.levels]) + host_df.index = to_datetime(host_df.index,format='%Y%m%d%H%M') host_df = host_df.T - host_df['latitude'] = host_df.index.get_level_values(0).astype('float') - host_df['longitude'] = host_df.index.get_level_values(1).astype('float') - host_df = host_df.sort_values(by=['longitude','latitude'],ascending=[True,False],axis='rows') - dfm2 = dfm.merge(host_df,how='left',on=['longitude','latitude']) - - if model_colns is None: - model_colns = [coln for coln in dfm2 if ('model_' in coln) & (coln not in ['model_date','model_date_offset'])] - - print('Applying unit conversion to the following columns:\n'+'\n'.join(model_colns)) - - # loop over each column that needs converting - for coln in model_colns: - - # ensure prediction is zero wherever there is no host - #assert all(dfm2[dfm2['host_frac']==0][coln]==0.) - # divide model results by host raster in-place - # use np where to handle cases where denominator is zero - dfm2[coln] = where(dfm2['host_frac']==0,dfm2['host_frac'],dfm2[coln] / dfm2['host_frac']) - - # check for anomalously large values - where_too_big = dfm2[coln] > 1.00001 - if any(where_too_big): - msg = 'ERROR: Unit conversion failed, host area seems to be smaller than predicted infection area in a cell' - print(msg) - raise Exception - - # clip any values that are above 1 - dfm2[coln].clip(0.,1.,inplace=True) + + # conform the structure with infection dataframe + + # conform indices (coordinates) + host_df.index = host_df.index.reorder_levels(['longitude','latitude']) + host_df.sort_index(level=['longitude','latitude'],ascending=[True,False],inplace=True) + + indices_are_close = are_indices_close(host_df.index,dfm.index) + assert indices_are_close, ('Coordinates of model grid do not match host map.') + host_df.index = dfm.index + + # conform columns (dates) + column_end_dates = dfm.columns.map(lambda x: x[-12:]) + model_dates = to_datetime(column_end_dates, format='%Y%m%d%H%M') - datetime.timedelta(days=1) + dfm2 = dfm.copy() + dfm2.columns = model_dates + + # Set a host value for every model date, based forward-filling dated map to + # next available date + host_df_resampled = host_df.reindex(dfm2.columns,axis='columns',method='ffill') + assert not host_df_resampled.isna().any().any(), ('Dates of host rasters do not cover all model dates') + + # new approach, take advantage of pandas broadcasting + print('Applying unit conversion to all columns in output') + dfm3 = dfm2.divide(host_df_resampled) + # Handle cases of zero division + dfm3[host_df_resampled<=0]=0 + + # check for anomalously large values + where_too_big = dfm3 > 1.00001 + if any(where_too_big): + msg = 'ERROR: Unit conversion failed, host area seems to be smaller than predicted infection area in a cell' + print(msg) + raise Exception + + # clip any values that are above 1 + # (Expect this is not needed, but may help resolve float precision issues) + dfm3.clip(0.,1.,inplace=True) - return dfm2 + # Retain original column names + dfm3.columns = dfm.columns + + return dfm3 def process_in_job_epi(jobPath,status,config,component): logger.info('started process_in_job_epi()') @@ -425,22 +480,49 @@ def process_in_job_epi(jobPath,status,config,component): # prepare a copy of the host data logger.info('Preparing a copy of the host raster data') + + # TargetRaster defines the grid that the epi model works on. + assert 'TargetRaster' in config_epi['Host'] + + # It should have been generated in advance by the user, by reprojecting + # the available host map (e.g. MapSPAM) to the NAME output grid. + # wheat_raster_reprojection.py is available to support this. - src_host = config_epi['Host']['HostRaster'] - fn_host = os.path.basename(src_host) - dst_host = f"{jobDataPath}/{fn_host}" + if 'HostRasters' in config_epi['Host']: + # HostRasters is a dictionary with date: filename entries describing + # different host rasters valid at different times i.e. a simple + # representation of dynamic host, so prepare a host file as is done + # for the Deposition and Environment components. - # copy the tif to the job directory and refer to that instead - shutil.copyfile(src_host,dst_host) - config_epi['Host']['HostRaster'] = dst_host + # All host maps should have the same spatial grid as the TargetRaster - logger.info('Preparing a copy of the host data as csv') + rasters_dict = config_epi['Host']['HostRasters'] - dst_host_csv = dst_host.replace('.tif','.csv') + dst_host_csv = f"{jobDataPath}/data_input_host.csv" + + rasters_to_csv(rasters_dict,dst_host_csv) + + else: + # There is a host raster applicable to all times, i.e. static host - raster_to_csv(dst_host,dst_host_csv) + src_host = config_epi['Host']['TargetRaster'] + fn_host = os.path.basename(src_host) + dst_host = f"{jobDataPath}/{fn_host}" + + # copy the tif to the job directory and refer to that instead + shutil.copyfile(src_host,dst_host) + config_epi['Host']['TargetRaster'] = dst_host + + logger.info('Preparing a copy of the host data as csv') + + dst_host_csv = dst_host.replace('.tif','.csv') + + rasters_to_csv( + {'201001010000': dst_host}, + dst_host_csv) config_epi['Host']['HostCSV'] = dst_host_csv + config_epi['Host']['FileNamePrepared'] = dst_host_csv # provide fundamental config elements to config_epi for k,v in config.items(): @@ -553,10 +635,6 @@ def process_in_job_epi(jobPath,status,config,component): config_epi['Host']['HostCSV'], model_colns = model_colns) - # reshape structure for consistency with original infectionRaster - df_full.set_index(['longitude','latitude'],inplace=True) - df_full.drop('host_frac',axis='columns',inplace=True) - # save to csv outfile_hawheat = f"{epiconf['infectionRasterFileName']}{unit_description}_progression.csv" df_full.to_csv(outfile_hawheat,header=True,index=True) diff --git a/tests/test_data/test_deployment/regions/EastAfrica/resources/coordinator/configs/config_EastAfrica_fc_live.json b/tests/test_data/test_deployment/regions/EastAfrica/resources/coordinator/configs/config_EastAfrica_fc_live.json index 219438084bf03216f519df2fb6a20b45d0529d53..2842f2df9207dc2e6f7ecad76f0156957807587c 100644 --- a/tests/test_data/test_deployment/regions/EastAfrica/resources/coordinator/configs/config_EastAfrica_fc_live.json +++ b/tests/test_data/test_deployment/regions/EastAfrica/resources/coordinator/configs/config_EastAfrica_fc_live.json @@ -165,7 +165,10 @@ "ProcessEWSPlotting" : "process_EWS_plotting_epi", "Host" : { "MaxFieldsPerCell" : "1", - "HostRaster" : "../../test_data/test_deployment/regions/EastAfrica/resources/epimodel/assets/wheat_area_frac_MapSPAM2010_EastAfrica_clipped.tif" + "TargetRaster" : "../../test_data/test_deployment/regions/EastAfrica/resources/epimodel/assets/wheat_area_frac_MapSPAM2010_EastAfrica_clipped.tif", + "HostRasters" : { + "201001010000" : "../../test_data/test_deployment/regions/EastAfrica/resources/epimodel/assets/wheat_area_frac_MapSPAM2010_EastAfrica_clipped.tif" + } }, "Deposition" : { "VariableNames" : ["P_GRAMINIS_DEPOSITION"],