From 6250637649c57563c95374d10227ec1c0945892f Mon Sep 17 00:00:00 2001
From: tm689 <tm689@cam.ac.uk>
Date: Wed, 16 Aug 2023 14:48:59 +0100
Subject: [PATCH] feat: Grouped surveys and source gen.

Add the ability to group surveys and run source gen on each group. Grouping only runs if 'Groups' is defined in the config (otherwise all available surveys are processed together), and it is based on the 'Origin' column of the surveys. Each unique 'Origin' value forms its own group, alongside the groups defined in the config. Groups can be skipped by listing them under 'GroupsToIgnore' in the config (this has no effect on the per-'Origin' groups, which are still created).
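
For illustration, a hypothetical 'Survey' config fragment (the group and
'Origin' names below are invented; only the 'Groups' and 'GroupsToIgnore'
keys are taken from the code):

    config['Survey']['Groups'] = {
        # group name -> list of 'Origin' values it covers; group names
        # must not clash with existing 'Origin' values (the code asserts this)
        'EastAfrica': ['kenya', 'ethiopia'],
    }
    config['Survey']['GroupsToIgnore'] = ['EastAfrica']

With this config, source gen would run once for 'kenya' and once for
'ethiopia' (each unique 'Origin' still forms its own group) while the
'EastAfrica' group is skipped. Each group's result goes to
<jobPath>/source_gen/<group>, and a copy named
sources_<group>_<StartString>.csv is placed in <jobPath>/upload.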
---
 coordinator/ProcessorSurveys.py | 105 ++++++++++++++++++++++----------
 1 file changed, 74 insertions(+), 31 deletions(-)

diff --git a/coordinator/ProcessorSurveys.py b/coordinator/ProcessorSurveys.py
index bfc5e8a..5b874ba 100644
--- a/coordinator/ProcessorSurveys.py
+++ b/coordinator/ProcessorSurveys.py
@@ -218,33 +218,11 @@ def process_in_job_survey(jobPath,status,config,component):
 
     date = datetime.datetime.now()
 
-    # creating grouped survey files
-    group_directory = f"{jobPath}/Groups"
-    Path(group_directory).mkdir(parents=True, exist_ok=True)
-
-    origins_list = df_join["Origin"].unique()
-    groups = {i:[i] for i in origins_list}
-
-    assert not np_any([k in origins_list for k in config['Survey']['Groups'].keys()])
-
-    groups.update(config['Survey']['Groups'])
-    
-    for group_name,group_content in groups.items():
-
-        logger.info(f"Creating survey group {group_name} which includes {group_content}")
-        
-        df_group = df_join.loc[df_join["Origin"].isin(group_content)]
-
-        group_surveys_filename = f"group_{group_name}.csv"
-        group_surveys_filepath = f"{group_directory}/{group_surveys_filename}"
-        
-        df_group.to_csv(group_surveys_filepath, index=False, quoting=csv.QUOTE_MINIMAL)
-
     # prepare environment for clustering calc
     call_R = False
 
-    output_directory = f"{jobPath}/SURVEYDATA_{config['StartString']}_0000"
-    Path(output_directory).mkdir(parents=True, exist_ok=True)
+    upload_directory = f"{jobPath}/upload"
+    Path(upload_directory).mkdir(parents=True, exist_ok=True)
 
     if call_R:
 
@@ -334,8 +312,66 @@ def process_in_job_survey(jobPath,status,config,component):
 
         sources_path = clustering_output_path_list[0]
 
+    elif 'Groups' in config['Survey']:
+        # if 'Groups' is defined in the config, create grouped survey files and run the python version on each group
+
+        logger.debug('Preparing grouped survey files')
+        group_directory = f"{jobPath}/Groups"
+        Path(group_directory).mkdir(parents=True, exist_ok=True)
+
+        origins_list = df_join["Origin"].unique()
+        groups = {i:[i] for i in origins_list}
+
+        assert not np_any([k in origins_list for k in config['Survey']['Groups'].keys()]), "config-defined group names must not clash with 'Origin' values"
+
+        groups.update(config['Survey']['Groups'])
+
+        # remove groups that are listed in GroupsToIgnore
+        if 'GroupsToIgnore' in config['Survey']:
+            for group_name in config['Survey']['GroupsToIgnore']:
+                if group_name in groups:
+                    logger.info(f"Removing group {group_name} from list of groups")
+                    del groups[group_name]
+
+        for group_name,group_content in groups.items():
+
+            logger.info(f"Creating survey group {group_name} which includes {group_content}")
+
+            df_group = df_join.loc[df_join["Origin"].isin(group_content)]
+
+            group_surveys_filename = f"surveys_{group_name}.csv"
+            group_surveys_filepath = f"{group_directory}/{group_surveys_filename}"
+
+            df_group.to_csv(group_surveys_filepath, index=False, quoting=csv.QUOTE_MINIMAL)
+
+            output_directory = f"{jobPath}/source_gen/{group_name}"
+            Path(output_directory).mkdir(parents=True, exist_ok=True)
+
+            sources_path = run_case(
+                    config_path = config['Survey']['pySourcesConfigFilename'],
+                    survey_path = group_surveys_filepath,
+                    reference_date = config['StartString'],
+                    # Day 0 (current day) is always included
+                    # Days -2,-1 and 0 are needed to initialise spores in NAME
+                    # days 1-7 are forecast days
+                    # days 8 and 9 are buffers in case NAME needs to run with the 
+                    # previous day's source job
+                    day_offsets = [-2,9],
+                    output_dir = output_directory)
+
+            logger.debug('Placing copy of result in job directory with conventional name')
+
+            output_filename = f"sources_{group_name}_{config['StartString']}.csv"
+            output_path = f"{jobPath}/upload/{output_filename}"
+
+            logger.debug(f"as {output_path}")
+
+            copyfile(sources_path, output_path)
     else:
-        # run python version
+        # run the python version without grouping surveys
+
+        output_directory = f"{jobPath}/source_gen"
+        Path(output_directory).mkdir(parents=True, exist_ok=True)
 
         sources_path = run_case(
                 config_path = config['Survey']['pySourcesConfigFilename'],
@@ -349,18 +385,25 @@ def process_in_job_survey(jobPath,status,config,component):
                 day_offsets = [-2,9],
                 output_dir = output_directory)
 
-    logger.debug('Placing copy of result in job directory with conventional name')
+        logger.debug('Placing copy of result in job directory with conventional name')
+
+        output_filename = f"sources_{config['StartString']}.csv"
+        output_path = f"{jobPath}/upload/{output_filename}"
+
+        logger.debug(f"as {output_path}")
+
+        copyfile(sources_path, output_path)
 
-    output_filename = f"sources_{config['StartString']}.csv"
-    output_path = f"{output_directory}/{output_filename}"
 
-    logger.debug(f"as {output_path}")
+    upload_pattern = f"sources_*{config['StartString']}.csv"
+    upload_path = f"{jobPath}/upload/{upload_pattern}"
 
-    copyfile(sources_path, output_path)
+    # glob list of output files
+    upload_path_list = glob(upload_path)
 
     proc_out = {}
     # Output files available for upload
-    proc_out['output'] = [output_path]
+    proc_out['output'] = upload_path_list
     # Processing files available for clearing
     proc_out['clearup'] = None
 
-- 
GitLab