FAQ | This is a LIVE service | Changelog

Skip to content
Snippets Groups Projects
Commit 62506376 authored by Dr T. Mona's avatar Dr T. Mona
Browse files

feat: Grouped surveys and source gen.

Adding the functionality to group surveys and run source gen on each group. Grouping is only run if it is defined in the config (otherwise all available surveys are used), and it is based on the 'Origin' column of the surveys. Each unique 'Origin' value creates its own group, alongside the groups defined in the config. Groups can be ignored by defining 'GroupsToIgnore' in the config (this has no effect on 'Origin' types).
parent fc936201
No related branches found
No related tags found
No related merge requests found
......@@ -218,33 +218,11 @@ def process_in_job_survey(jobPath,status,config,component):
date = datetime.datetime.now()
# creating grouped survey files
group_directory = f"{jobPath}/Groups"
Path(group_directory).mkdir(parents=True, exist_ok=True)
origins_list = df_join["Origin"].unique()
groups = {i:[i] for i in origins_list}
assert not np_any([k in origins_list for k in config['Survey']['Groups'].keys()])
groups.update(config['Survey']['Groups'])
for group_name,group_content in groups.items():
logger.info(f"Creating survey group {group_name} which includes {group_content}")
df_group = df_join.loc[df_join["Origin"].isin(group_content)]
group_surveys_filename = f"group_{group_name}.csv"
group_surveys_filepath = f"{group_directory}/{group_surveys_filename}"
df_group.to_csv(group_surveys_filepath, index=False, quoting=csv.QUOTE_MINIMAL)
# prepare environment for clustering calc
call_R = False
output_directory = f"{jobPath}/SURVEYDATA_{config['StartString']}_0000"
Path(output_directory).mkdir(parents=True, exist_ok=True)
upload_directory = f"{jobPath}/upload"
Path(upload_directory).mkdir(parents=True, exist_ok=True)
if call_R:
......@@ -334,8 +312,66 @@ def process_in_job_survey(jobPath,status,config,component):
sources_path = clustering_output_path_list[0]
elif 'Groups' in config['Survey']:
# if 'Groups' is defined in the config, create grouped survey files and run python version
logger.debug('Preparing grouped survey files')
group_directory = f"{jobPath}/Groups"
Path(group_directory).mkdir(parents=True, exist_ok=True)
origins_list = df_join["Origin"].unique()
groups = {i:[i] for i in origins_list}
assert not np_any([k in origins_list for k in config['Survey']['Groups'].keys()])
groups.update(config['Survey']['Groups'])
# remove groups that are listed in GroupsToIgnore
if 'GroupsToIgnore' in config['Survey']:
for group_name in config['Survey']['GroupsToIgnore']:
if group_name in groups:
logger.info(f"Removing group {group_name} from list of groups")
del groups[group_name]
for group_name,group_content in groups.items():
logger.info(f"Creating survey group {group_name} which includes {group_content}")
df_group = df_join.loc[df_join["Origin"].isin(group_content)]
group_surveys_filename = f"surveys_{group_name}.csv"
group_surveys_filepath = f"{group_directory}/{group_surveys_filename}"
df_group.to_csv(group_surveys_filepath, index=False, quoting=csv.QUOTE_MINIMAL)
output_directory = f"{jobPath}/source_gen/{group_name}"
Path(output_directory).mkdir(parents=True, exist_ok=True)
sources_path = run_case(
config_path = config['Survey']['pySourcesConfigFilename'],
survey_path = group_surveys_filepath,
reference_date = config['StartString'],
# Day 0 (current day) is always included
# Days -2,-1 and 0 are needed to initialise spores in NAME
# days 1-7 are forecast days
# days 8 and 9 are buffers in case NAME needs to run with the
# previous day's source job
day_offsets = [-2,9],
output_dir = output_directory)
logger.debug('Placing copy of result in job directory with conventional name')
output_filename = f"sources_{group_name}_{config['StartString']}.csv"
output_path = f"{jobPath}/upload/{output_filename}"
logger.debug(f"as {output_path}")
copyfile(sources_path, output_path)
else:
# run python version
# run python version without grouping surveys
output_directory = f"{jobPath}/source_gen"
Path(output_directory).mkdir(parents=True, exist_ok=True)
sources_path = run_case(
config_path = config['Survey']['pySourcesConfigFilename'],
......@@ -349,18 +385,25 @@ def process_in_job_survey(jobPath,status,config,component):
day_offsets = [-2,9],
output_dir = output_directory)
logger.debug('Placing copy of result in job directory with conventional name')
logger.debug('Placing copy of result in job directory with conventional name')
output_filename = f"sources_{config['StartString']}.csv"
output_path = f"{jobPath}/upload/{output_filename}"
logger.debug(f"as {output_path}")
copyfile(sources_path, output_path)
output_filename = f"sources_{config['StartString']}.csv"
output_path = f"{output_directory}/{output_filename}"
logger.debug(f"as {output_path}")
upload_filenames = f"sources_*{config['StartString']}.csv"
upload_path = f"{jobPath}/upload/{upload_filenames}"
copyfile(sources_path, output_path)
# glob list of output files
upload_path_list = glob(upload_path)
proc_out = {}
# Output files available for upload
proc_out['output'] = [output_path]
proc_out['output'] = upload_path_list
# Processing files available for clearing
proc_out['clearup'] = None
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment