From 6d84b27feda3b93cf1883d9dba6efb8e33314ac0 Mon Sep 17 00:00:00 2001
From: lb584 <lb584@cam.ac.uk>
Date: Tue, 16 Aug 2022 15:45:50 +0100
Subject: [PATCH] Move the calls that run the env suit pipeline out of
 EnvSuitPipeline and into the extractor package
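
Replace the shell-script subprocess calls (run_regridding.sh,
run_rechunk.sh, run_merge_results.sh and run_post_processing.sh) with
direct calls into the met_processing job_runner API:

    job_runner.run_extraction(work_path, MAX_WORKERS=MAX_WORKERS)
    job_runner.run_post_processing(work_path, MAX_WORKERS=MAX_WORKERS)
    job_runner.run_merge_post_processing(work_path)

The worker pool is capped at MAX_WORKERS = 3 (processor_pool previously
ran with max_workers=10). process_in_job_env2_0 now calls
esp.run_pipeline with extracted=True, so the pipeline no longer re-runs
the extraction step itself.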

---
 EnvSuitPipeline.py     | 81 +++++++++++++++++++++--------------------
 ProcessorComponents.py |  7 ++++---
 2 files changed, 45 insertions(+), 43 deletions(-)

diff --git a/EnvSuitPipeline.py b/EnvSuitPipeline.py
index c8b9104..8499667 100644
--- a/EnvSuitPipeline.py
+++ b/EnvSuitPipeline.py
@@ -9,8 +9,10 @@ from typing import List
 import pandas as pd
 
 from met_processing.common import processor_pool
+from met_processing.runner.common import job_runner
 from met_processing.runner.common.generate_all_jobs import generate_all_jobs
 
+MAX_WORKERS: int = 3
 
 logging.basicConfig(level=logging.DEBUG)
 logger = logging.getLogger('Processor.pipeline')
@@ -65,24 +67,25 @@ def generate_temporal_points(file, datestr, timeresolution, nDaysForecast):
 
 ### Met extractor ############
 
-def pipeline_subprocess(workPath, command, multi=True):
-    logger.info(f"Change work directory to {workPath}")
-    os.chdir(workPath)
-
-    try:
-        if (multi == True):
-            commands: List[str] = command.split(",")
-
-            logger.info(f"Run {command} in multi process mode.")
-            processor_pool.main(workPath, commands, max_workers=10, chunk_size=1)
-        else:
-            logger.info(f"Run {command} in single process mode.")
-            pass # TODO add not multi proc mode
-    except:
-        logger.exception(f"Some failure when running {command}", exc_info=True)
-        raise
-
-    return
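+# NOTE: superseded by the job_runner-based helpers below (run_extraction, run_post_processing).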
+# def pipeline_subprocess(workPath, command, multi=True):
+#     logger.info(f"Change work directory to {workPath}")
+#     os.chdir(workPath)
+#
+#     try:
+#         if (multi == True):
+#             commands: List[str] = command.split(",")
+#
+#             logger.info(f"Run {command} in multi process mode.")
+#             processor_pool.main(workPath, commands, max_workers=10, chunk_size=1)
+#         else:
+#             logger.info(f"Run {command} in single process mode.")
+#             pass # TODO add not multi proc mode
+#     except:
+#         logger.exception(f"Some failure when running {command}", exc_info=True)
+#         raise
+#
+#     return
 
 
 def clean(workPath): # Clean temporary files and folders from the working directory
@@ -114,38 +117,36 @@ def generate_all(sys_config, run_config):
 
     # Run all generate
     try:
-
         generate_all_jobs(run_config, sys_config, slurm_mode)
-
-    except:
-        logger.exception(f"Some failure when running one of the generate job", exc_info=True)
+    except Exception:
+        logger.exception("Some failure when running one of the generate jobs")
         raise
 
     return
 
 
-def run_extraction(workPath):
-    extractionPath = workPath + 'extraction/'
-    extraction_process = pipeline_subprocess(extractionPath, 'run_regridding.sh')
+def run_extraction(work_path):
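+    """Run the regridding extraction jobs for work_path via the job_runner package."""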
+    logger.info(f"Running regridding in multi process mode.")
+    job_runner.run_extraction(work_path, **{"MAX_WORKERS": MAX_WORKERS})
+    logger.info('Data extracted and chunked')
 
-    chunksPath = workPath + 'chunks/'
-    extraction_process = pipeline_subprocess(chunksPath, 'run_rechunk.sh')
 
+def run_post_processing(work_path):
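+    """Run the post-processing jobs for work_path via the job_runner package."""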
+    logger.info(f"Running post-processing in multi process mode.")
+    job_runner.run_post_processing(work_path, **{"MAX_WORKERS": MAX_WORKERS})
-    logger.info('Data extracted and chunked')
+    logger.info('Post-processing complete')
 
-    return extraction_process
-
 
-def run_merger(workPath):
-    merge = workPath + 'run_merge_results.sh'
+def run_merger(work_path):
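+    """Merge the post-processing results for work_path via the job_runner package."""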
     try:
-        merge_process = subprocess.run(merge, check=True)
-    except:
-        logger.exception(f"Some failure when running {merge}", exc_info=True)
+        job_runner.run_merge_post_processing(work_path)
+    except Exception:
+        logger.exception("Some failure when running the merge job")
         raise
 
-    return merge_process
-
 
 #######################################
 
@@ -245,7 +246,7 @@ def run_pipeline(pipeline_config, region, dateString, extracted = False, prevent
 
             logger.info(f"Starting {processor_name} post processor ---------------------------------")
             processorPath = workPath + 'post_processing/' + processor_name + '/'
-            pipeline_subprocess(processorPath, 'run_post_processing.sh')
+            run_post_processing(processorPath)
 
             run_merger(processorPath)
         else:
@@ -273,12 +274,12 @@ def run_pipeline(pipeline_config, region, dateString, extracted = False, prevent
 
                 # Extract
                 if (extracted == False):
-                    # run_extraction(workPath)
+                    run_extraction(workPath)
                     extracted = True
 
                 logger.info(f"Starting {strain} suitability ---------------------------------")
                 envSuitPath = workPath + 'post_processing/RIE/'
-                pipeline_subprocess(envSuitPath, 'run_post_processing.sh')
+                run_post_processing(envSuitPath)
 
                 run_merger(envSuitPath)
 
diff --git a/ProcessorComponents.py b/ProcessorComponents.py
index 15d3bf5..747ba2a 100644
--- a/ProcessorComponents.py
+++ b/ProcessorComponents.py
@@ -1326,7 +1326,8 @@ def process_in_job_env2_0(jobPath,status,config,component):
 
     pipeline_config = config["Environment"]
     try:
-        esp.run_pipeline(pipeline_config, region, config["StartString"], extracted=False)
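+        # extraction is now handled by the extractor package, so tell run_pipeline to skip it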
+        esp.run_pipeline(pipeline_config, region, config["StartString"], extracted=True)
-    except:
-        logger.exception(f"Some failure when running EnvSuitPipeline.py")
+    except Exception:
+        logger.exception("Some failure when running EnvSuitPipeline.py")
         raise
-- 
GitLab