diff --git a/EnvSuitPipeline.py b/EnvSuitPipeline.py
index b3768577766a629f0637bb749c2ba6006dbe05fd..6981a4b155cc45763057032e7a673d98cdd0422e 100644
--- a/EnvSuitPipeline.py
+++ b/EnvSuitPipeline.py
@@ -3,19 +3,14 @@ import json
 import logging
 import os
 import shutil
-import subprocess
-import sys
 
-import numpy as np
 import pandas as pd
-from typing import List
+from met_processing.runner.common import job_runner
+from met_processing.runner.common.generate_all_jobs import generate_all_jobs
 
-from met_processing.slurm.chunk import generate_chunk_jobs
-from met_processing.slurm.common.generate_run_all_slurm_jobs import generate_run_all_slurm_script
-from met_processing.slurm.extraction import generate_extraction_jobs
-from met_processing.slurm.post_processing import generate_processor_jobs
-from met_processing.common import processor_pool
+
+MAX_WORKERS: int = 3
 
 logging.basicConfig(level=logging.DEBUG)
 logger = logging.getLogger('Processor.pipeline')
@@ -68,27 +63,6 @@ def generate_temporal_points(file, datestr, timeresolution, nDaysForecast):
     outfile.close()
     return outfile
 
-### Met extractor ############
-
-def pipeline_subprocess(workPath, command, multi=True):
-    logger.info(f"Change work directory to {workPath}")
-    os.chdir(workPath)
-
-    try:
-        if (multi == True):
-            commands: List[str] = command.split(",")
-
-            logger.info(f"Run {command} in multi process mode.")
-            processor_pool.main(workPath, commands, max_workers=10, chunk_size=1)
-        else:
-            logger.info(f"Run {command} in single process mode.")
-            pass # TODO add not multi proc mode
-    except:
-        logger.exception(f"Some failure when running {command}", exc_info=True)
-        raise
-
-    return
-
 
 def clean(workPath): # Clean temporary files and folders from the working directory
     try:
@@ -119,45 +93,33 @@ def generate_all(sys_config, run_config):
 
     # Run all generate
     try:
-        generate_extraction_jobs.generate_jobs(run_config, sys_config, slurm_mode)
-
-        if slurm_mode:
-            generate_chunk_jobs.generate_jobs(run_config, sys_config)
-
-        generate_processor_jobs.generate_jobs(run_config, sys_config, slurm_mode)
-
-        if slurm_mode:
-            generate_run_all_slurm_script(run_config, sys_config)
-
-    except:
+        generate_all_jobs(run_config, sys_config, slurm_mode)
+    except Exception:
         logger.exception(f"Some failure when running one of the generate job", exc_info=True)
         raise
 
     return
 
-def run_extraction(workPath):
-    extractionPath = workPath + 'extraction/'
-    extraction_process = pipeline_subprocess(extractionPath, 'run_regridding.sh')
+def run_extraction(work_path):
+    logger.info(f"Running regridding in multi process mode.")
+    job_runner.run_extraction(work_path, **{"MAX_WORKERS": MAX_WORKERS})
+    logger.info('Data extracted and chunked')
 
-    chunksPath = workPath + 'chunks/'
-    extraction_process = pipeline_subprocess(chunksPath, 'run_rechunk.sh')
+def run_post_processing(work_path):
+    logger.info(f"Running post-processing in multi process mode.")
+    job_runner.run_post_processing(work_path, **{"MAX_WORKERS": MAX_WORKERS})
     logger.info('Data extracted and chunked')
 
-    return extraction_process
-
-def run_merger(workPath):
-    merge = workPath + 'run_merge_results.sh'
+def run_merger(work_path):
     try:
-        merge_process = subprocess.run(merge, check=True)
-    except:
-        logger.exception(f"Some failure when running {merge}", exc_info=True)
+        job_runner.run_merge_post_processing(work_path)
+    except Exception:
+        logger.exception(f"Some failure when running merge RIE", exc_info=True)
         raise
 
-    return merge_process
-
 #######################################
@@ -257,7 +219,7 @@ def run_pipeline(pipeline_config, region, dateString, extracted = False, prevent
             logger.info(f"Starting {processor_name} post processor ---------------------------------")
 
             processorPath = workPath + 'post_processing/' + processor_name + '/'
-            pipeline_subprocess(processorPath, 'run_post_processing.sh')
+            run_post_processing(processorPath)
 
             run_merger(processorPath)
         else:
@@ -290,7 +252,7 @@ def run_pipeline(pipeline_config, region, dateString, extracted = False, prevent
         logger.info(f"Starting {strain} suitability ---------------------------------")
 
         envSuitPath = workPath + 'post_processing/RIE/'
-        pipeline_subprocess(envSuitPath, 'run_post_processing.sh')
+        run_post_processing(envSuitPath)
 
         run_merger(envSuitPath)
 
diff --git a/Processor.py b/Processor.py
index c00450d1f5c61656a86ca71c4d2bde7ff9e8803c..6be77cb721d334a6eb8453b6624b0ff56c6f988e 100755
--- a/Processor.py
+++ b/Processor.py
@@ -416,10 +416,11 @@ def run_Process():
     with jobStatus(jobPath) as status:
 
         # check for a status file in job directory
-        if status.had_initial_status:
-            logger.info(f"Job path already exists and has status {status.status}")
-
-            endScript(premature = status.status not in ['SUCCESS','INPROGRESS'])
+        ##todo lawrence comment this back in
+        # if status.had_initial_status:
+        #     logger.info(f"Job path already exists and has status {status.status}")
+        #
+        #     endScript(premature = status.status not in ['SUCCESS','INPROGRESS'])
 
         logger.info(f"Current status of job directory is {status.status}")
 
diff --git a/ProcessorComponents.py b/ProcessorComponents.py
index 9e25b26baae8c930a21ce98919f7d6c2b23a3035..ba3015742c34563fe8edc118cc7e77b34c94150f 100644
--- a/ProcessorComponents.py
+++ b/ProcessorComponents.py
@@ -1332,6 +1332,7 @@ def process_in_job_env2_0(jobPath,status,config,component):
 
     pipeline_config = config["Environment"]
     try:
+        #todo lawrence comment this back to original
         esp.run_pipeline(pipeline_config, region, config["StartString"], extracted=False)
     except:
         logger.exception(f"Some failure when running EnvSuitPipeline.py")
diff --git a/configs/docker/run/launchDocker_template.sh b/configs/docker/run/launchDocker_template.sh
index c5cb519798da6ff365cc96a7d9a95fcadb1327e2..4ec8bf0253be39cfbc2839646af4d84d7fb9b2cd 100755
--- a/configs/docker/run/launchDocker_template.sh
+++ b/configs/docker/run/launchDocker_template.sh
@@ -1 +1 @@
-sudo docker run -it -v "<install_location>/code:/storage/app/EWS_prod/code" -v "<install_location>/regions:/storage/app/EWS_prod/regions" -v "<install_location>/envs/credentials:/storage/app/EWS_prod/envs/credentials" -w "/storage/app/EWS_prod/code" ews_coordinator
+sudo docker run -it -v "<install_location>/code:/storage/app/EWS_prod/code" -v "<install_location>/regions:/storage/app/EWS_prod/regions" -v "<install_location>/envs/credentials:/storage/app/EWS_prod/envs/credentials" -w "/storage/app/EWS_prod/code" lb584/ews_coordinator