From 033f384ecb3662399c8e38129e61d33bbf82cf6e Mon Sep 17 00:00:00 2001
From: lb584 <lb584@cam.ac.uk>
Date: Tue, 16 Aug 2022 10:56:10 +0100
Subject: [PATCH] making generate_all get called from coordinator

---
 EnvSuitPipeline.py     | 25 ++++++-------------------
 Processor.py           |  9 +++++----
 ProcessorComponents.py | 15 +++++++++------
 3 files changed, 20 insertions(+), 29 deletions(-)

diff --git a/EnvSuitPipeline.py b/EnvSuitPipeline.py
index 3e231d1..c15ad57 100644
--- a/EnvSuitPipeline.py
+++ b/EnvSuitPipeline.py
@@ -4,18 +4,13 @@ import logging
 import os
 import shutil
 import subprocess
-import sys
+from typing import List

-import numpy as np
 import pandas as pd
-from typing import List
-
-from met_processing.slurm.chunk import generate_chunk_jobs
-from met_processing.slurm.common.generate_run_all_slurm_jobs import generate_run_all_slurm_script
-from met_processing.slurm.extraction import generate_extraction_jobs
-from met_processing.slurm.post_processing import generate_processor_jobs
 from met_processing.common import processor_pool
+from met_processing.slurm.common.generate_all_jobs import generate_all_jobs
+

 logging.basicConfig(level=logging.DEBUG)
 logger = logging.getLogger('Processor.pipeline')

@@ -119,15 +114,8 @@ def generate_all(sys_config, run_config):

     # Run all generate
     try:
-        generate_extraction_jobs.generate_jobs(run_config, sys_config, slurm_mode)
-
-        if slurm_mode:
-            generate_chunk_jobs.generate_jobs(run_config, sys_config)
-
-        generate_processor_jobs.generate_jobs(run_config, sys_config, slurm_mode)
-        if slurm_mode:
-            generate_run_all_slurm_script(run_config, sys_config)
+        generate_all_jobs(run_config, sys_config, slurm_mode)
     except:
         logger.exception(f"Some failure when running one of the generate job", exc_info=True)

@@ -161,12 +149,11 @@ def run_merger(workPath):

 #######################################

-def run_pipeline(pipeline_config, region, dateString, extracted = False, prevent_overwrite = True):
+def run_pipeline(pipeline_config, region, dateString, extracted = False, prevent_overwrite = False):
     '''
     The prevent_overwrite parameter can be set to False if you want to re-run
     a job in-place.
     '''
-    #test commit

     # Get parameters from the config
     resourcesPath = getParameter(pipeline_config,'RESOURCES_PATH')
     workPath = getParameter(pipeline_config,'WORK_PATH') + 'ENVIRONMENT_2.0_' + dateString + '/'

@@ -286,7 +273,7 @@ def run_pipeline(pipeline_config, region, dateString, extracted = False, prevent

     # Extract
     if (extracted == False):
-        run_extraction(workPath)
+        # run_extraction(workPath)
         extracted = True

     logger.info(f"Starting {strain} suitability ---------------------------------")

diff --git a/Processor.py b/Processor.py
index c00450d..6be77cb 100755
--- a/Processor.py
+++ b/Processor.py
@@ -416,10 +416,11 @@ def run_Process():

     with jobStatus(jobPath) as status:

         # check for a status file in job directory
-        if status.had_initial_status:
-            logger.info(f"Job path already exists and has status {status.status}")
-
-            endScript(premature = status.status not in ['SUCCESS','INPROGRESS'])
+        ##todo lawrence comment this back in
+        # if status.had_initial_status:
+        #     logger.info(f"Job path already exists and has status {status.status}")
+        #
+        #     endScript(premature = status.status not in ['SUCCESS','INPROGRESS'])

         logger.info(f"Current status of job directory is {status.status}")

diff --git a/ProcessorComponents.py b/ProcessorComponents.py
index 6ea3727..15d3bf5 100644
--- a/ProcessorComponents.py
+++ b/ProcessorComponents.py
@@ -1285,7 +1285,8 @@ def process_in_job_env2_0(jobPath,status,config,component):
     description_short = 'env2 scp'
     description_long = 'Copying file from remote server to job directory'

-    subprocess_and_log(cmd_scp,description_short, description_long)
+    #todo lawrence comment this back in
+    # subprocess_and_log(cmd_scp,description_short, description_long)

     logger.info('untarring the input file')

@@ -1294,7 +1295,8 @@ def process_in_job_env2_0(jobPath,status,config,component):
     description_short = 'env2 tar'
     description_long = 'Untarring the input file'

-    subprocess_and_log(cmd_tar,description_short, description_long)
+    #todo lawrence comment this back in
+    # subprocess_and_log(cmd_tar,description_short, description_long)

     # basic check that contents are as expected for 7-day forecast
     # 57 files of NAME .txt timesteps and some number of log files

@@ -1310,10 +1312,11 @@ def process_in_job_env2_0(jobPath,status,config,component):

     output_directory = f"{jobPath}/NAME_Met_as_netcdf"

-    try:
-        npp.process_met_office_NAME(input_files_glob,output_directory)
-    except:
-        logger.exception(f"Some failure when converting NAME data from .txt to nc.tar.gz")
+    #todo lawrence comment this back in
+    # try:
+    #     npp.process_met_office_NAME(input_files_glob,output_directory)
+    # except:
+    #     logger.exception(f"Some failure when converting NAME data from .txt to nc.tar.gz")

     # TODO: check that process_met_office_NAME() produced output as expected
--
GitLab
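
Note: the new met_processing.slurm.common.generate_all_jobs module is not itself part of this patch. As a reading aid, below is a minimal sketch of what generate_all_jobs presumably wraps, reconstructed only from the four calls this commit removes from generate_all() in EnvSuitPipeline.py; the actual body in met_processing may differ.

# Hypothetical sketch of met_processing/slurm/common/generate_all_jobs.py,
# mirroring the call sequence removed from EnvSuitPipeline.generate_all().
from met_processing.slurm.chunk import generate_chunk_jobs
from met_processing.slurm.common.generate_run_all_slurm_jobs import generate_run_all_slurm_script
from met_processing.slurm.extraction import generate_extraction_jobs
from met_processing.slurm.post_processing import generate_processor_jobs


def generate_all_jobs(run_config, sys_config, slurm_mode):
    # Extraction job generation runs in both local and slurm mode.
    generate_extraction_jobs.generate_jobs(run_config, sys_config, slurm_mode)

    # Chunked jobs only exist when work is split across slurm workers.
    if slurm_mode:
        generate_chunk_jobs.generate_jobs(run_config, sys_config)

    # Post-processing job generation also runs in both modes.
    generate_processor_jobs.generate_jobs(run_config, sys_config, slurm_mode)

    # The run-all submission script is slurm-only.
    if slurm_mode:
        generate_run_all_slurm_script(run_config, sys_config)

Moving this sequence behind a single entry point keeps the slurm_mode branching inside met_processing, so the coordinator makes one call instead of four, which is what the commit subject describes.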