diff --git a/coordinator/ProcessorSurveys.py b/coordinator/ProcessorSurveys.py
index bfc5e8a75a331e9be4a4a35dc1d2dcfb62085501..5b874baf84315cf2017b77aea54c3600e78ceea2 100644
--- a/coordinator/ProcessorSurveys.py
+++ b/coordinator/ProcessorSurveys.py
@@ -218,33 +218,11 @@ def process_in_job_survey(jobPath,status,config,component):
 
     date = datetime.datetime.now()
 
-    # creating grouped survey files
-    group_directory = f"{jobPath}/Groups"
-    Path(group_directory).mkdir(parents=True, exist_ok=True)
-
-    origins_list = df_join["Origin"].unique()
-    groups = {i:[i] for i in origins_list}
-
-    assert not np_any([k in origins_list for k in config['Survey']['Groups'].keys()])
-
-    groups.update(config['Survey']['Groups'])
-
-    for group_name,group_content in groups.items():
-
-        logger.info(f"Creating survey group {group_name} which includes {group_content}")
-
-        df_group = df_join.loc[df_join["Origin"].isin(group_content)]
-
-        group_surveys_filename = f"group_{group_name}.csv"
-        group_surveys_filepath = f"{group_directory}/{group_surveys_filename}"
-
-        df_group.to_csv(group_surveys_filepath, index=False, quoting=csv.QUOTE_MINIMAL)
-
     # prepare environment for clustering calc
 
     call_R = False
 
-    output_directory = f"{jobPath}/SURVEYDATA_{config['StartString']}_0000"
-    Path(output_directory).mkdir(parents=True, exist_ok=True)
+    upload_directory = f"{jobPath}/upload"
+    Path(upload_directory).mkdir(parents=True, exist_ok=True)
 
     if call_R:
 
@@ -334,8 +312,66 @@ def process_in_job_survey(jobPath,status,config,component):
 
         sources_path = clustering_output_path_list[0]
 
+    elif 'Groups' in config['Survey']:
+        # if 'Groups' is defined in the config, create grouped survey files and run python version
+
+        logger.debug('Preparing grouped survey files')
+        group_directory = f"{jobPath}/Groups"
+        Path(group_directory).mkdir(parents=True, exist_ok=True)
+
+        origins_list = df_join["Origin"].unique()
+        groups = {i:[i] for i in origins_list}
+
+        assert not np_any([k in origins_list for k in config['Survey']['Groups'].keys()])
+
+        groups.update(config['Survey']['Groups'])
+
+        # remove groups that are listed in GroupsToIgnore
+        if 'GroupsToIgnore' in config['Survey']:
+            for group_name in config['Survey']['GroupsToIgnore']:
+                if group_name in groups:
+                    logger.info(f"Removing group {group_name} from list of groups")
+                    del groups[group_name]
+
+        for group_name,group_content in groups.items():
+
+            logger.info(f"Creating survey group {group_name} which includes {group_content}")
+
+            df_group = df_join.loc[df_join["Origin"].isin(group_content)]
+
+            group_surveys_filename = f"surveys_{group_name}.csv"
+            group_surveys_filepath = f"{group_directory}/{group_surveys_filename}"
+
+            df_group.to_csv(group_surveys_filepath, index=False, quoting=csv.QUOTE_MINIMAL)
+
+            output_directory = f"{jobPath}/source_gen/{group_name}"
+            Path(output_directory).mkdir(parents=True, exist_ok=True)
+
+            sources_path = run_case(
+                    config_path = config['Survey']['pySourcesConfigFilename'],
+                    survey_path = group_surveys_filepath,
+                    reference_date = config['StartString'],
+                    # Day 0 (current day) is always included
+                    # Days -2,-1 and 0 are needed to initialise spores in NAME
+                    # days 1-7 are forecast days
+                    # days 8 and 9 are buffers in case NAME needs to run with the
+                    # previous day's source job
+                    day_offsets = [-2,9],
+                    output_dir = output_directory)
+
+            logger.debug('Placing copy of result in job directory with conventional name')
+
+            output_filename = f"sources_{group_name}_{config['StartString']}.csv"
+            output_path = f"{jobPath}/upload/{output_filename}"
+
+            logger.debug(f"as {output_path}")
+
+            copyfile(sources_path, output_path)
     else:
-        # run python version
+        # run python version without grouping surveys
+
+        output_directory = f"{jobPath}/source_gen"
+        Path(output_directory).mkdir(parents=True, exist_ok=True)
 
         sources_path = run_case(
                 config_path = config['Survey']['pySourcesConfigFilename'],
@@ -349,18 +385,25 @@ def process_in_job_survey(jobPath,status,config,component):
                 day_offsets = [-2,9],
                 output_dir = output_directory)
 
-    logger.debug('Placing copy of result in job directory with conventional name')
+        logger.debug('Placing copy of result in job directory with conventional name')
+
+        output_filename = f"sources_{config['StartString']}.csv"
+        output_path = f"{jobPath}/upload/{output_filename}"
+
+        logger.debug(f"as {output_path}")
+
+        copyfile(sources_path, output_path)
+
 
-    output_filename = f"sources_{config['StartString']}.csv"
-    output_path = f"{output_directory}/{output_filename}"
-
-    logger.debug(f"as {output_path}")
+    upload_filenames = f"sources_*{config['StartString']}.csv"
+    upload_path = f"{jobPath}/upload/{upload_filenames}"
 
-    copyfile(sources_path, output_path)
+    # glob list of output files
+    upload_path_list = glob(upload_path)
 
     proc_out = {}
     # Output files available for upload
-    proc_out['output'] = [output_path]
+    proc_out['output'] = upload_path_list
 
     # Processing files available for clearing
     proc_out['clearup'] = None