diff --git a/NAMEPreProcessor.py b/NAMEPreProcessor.py
index 2900ceafabd65c5093d08d63ec67d748dcece451..bb74ac6943d4f1c56e2a300dcdfb4de6767e1ddb 100644
--- a/NAMEPreProcessor.py
+++ b/NAMEPreProcessor.py
@@ -5,6 +5,10 @@ import cftime
 import iris.util
 from iris.cube import CubeList, Cube
 
+"""
+THIS IS NO LONGER USED NOW WE ARE PROCESSING THE .NC FILES SUPPLIED BY THE MET OFFICE
+DELETE AFTER A "COOLING OFF" PERIOD
+"""
 
 def save_cube(cube: iris.cube.Cube,
               sanitised_field_name: str,
@@ -82,7 +86,7 @@ if __name__ == '__main__':
 
     """
     example usage:
-    /media/scratch/lb584_scratch/projects/ews/input/eth/MET_20200707/WR_EnvSuit_Met_Ethiopia_20200707/Met_Data_*.txt
+    /media/scratch/lb584_scratch/projects/ews/input/eth/MET_20200707/WR_EnvSuit_Met_Ethiopia_20200707/*.nc
     /media/scratch/lb584_scratch/projects/ews/output/name_processing/another_dir
     """
diff --git a/Processor.py b/Processor.py
index 777553d2caf31ac640f3becd52fadb28e8f19090..4d224ee82a14414a11ba06acef722f23428bbc99 100755
--- a/Processor.py
+++ b/Processor.py
@@ -415,7 +415,7 @@ def run_Process():
 
     # lock job directory
     with jobStatus(jobPath) as status:
 
-        #lawrence check in/out
+        #lawrence comment in/out
         # check for a status file in job directory
         if status.had_initial_status:
             logger.info(f"Job path already exists and has status {status.status}")
diff --git a/ProcessorComponents.py b/ProcessorComponents.py
index 55183b1ea49576b774bcd6d37bbdfdb1490cbc8a..883cd57e4271faaaa9a6798ef82dcd6450fa78d5 100644
--- a/ProcessorComponents.py
+++ b/ProcessorComponents.py
@@ -8,17 +8,20 @@ import datetime
 from distutils.dir_util import copy_tree
 from glob import glob
 import json
-from locale import normalize
+import tarfile
 import logging
 import os
 from pathlib import Path
 import re
+
+import iris
 import requests
 import shutil
 from shutil import copyfile
 import subprocess
 from string import Template
+from iris.cube import CubeList, Cube
 from numpy import all as np_all
 from numpy import argmax, unique
 from pandas import read_csv, Series, DataFrame, concat, to_datetime, json_normalize
@@ -43,7 +46,9 @@ from plotting.common.utils import EnvSuitDiseaseInfo
 from plotting.common.plotting_coordinator.ews_env_disease_plotting_coordinator import EWSPlottingEnvSuitBase
 from plotting.common.plotting_coordinator.ews_depo_disease_plotting_coordinator import EWSPlottingDepoBase
 from plotting.common.plotting_coordinator.ews_epi_disease_plotting_coordinator import EWSPlottingEPIBase
-from ProcessorUtils import open_and_check_config, get_only_existing_globs, subprocess_and_log, endScript, endJob, add_filters_to_sublogger
+from ProcessorUtils import open_and_check_config, get_only_existing_globs, subprocess_and_log, endScript, endJob, \
+    add_filters_to_sublogger, remove_path_from_tar_members
+
 
 # TODO: Replace subprocess scp and ssh commands with paramiko.SSHClient() instance
 
@@ -1294,40 +1299,29 @@ def process_in_job_env2_0(jobPath,status,config,component):
     description_short = 'env2 scp'
     description_long = 'Copying file from remote server to job directory'
 
-    #lawrence comment in/out
+    # lawrence comment in/out
     subprocess_and_log(cmd_scp,description_short, description_long)
 
     logger.info('untarring the input file')
 
-    # TODO: untar file in python (with tarfile module) instead of subprocess
-    cmd_tar = ["tar","-xzf",f"{jobPath}/{file_name}.tar.gz","-C",jobPath]
-    description_short = 'env2 tar'
-    description_long = 'Untarring the input file'
-
-    #lawrence comment in/out
-    subprocess_and_log(cmd_tar,description_short, description_long)
-
-    # basic check that contents are as expected for 7-day forecast
-    # 57 files of NAME .txt timesteps and some number of log files
-    if len(glob(f"{jobPath}/{file_name}/Met_Data_C1_T*.txt")) != 57:
-        msg = f"Insufficient contents of untarred file in directory {jobPath}/{file_name}"
-        logger.error(msg)
-        raise RuntimeError(msg)
-
-    # convert format of met data so that met extraction pipeline can work with it
-    logger.info('Convert .txt input files to .nc.tar.gz')
-
-    input_files_glob = f"{jobPath}/{file_name}/Met_Data_C1_T*.txt"
-
+    # untar incoming NAME data
     output_directory = f"{jobPath}/NAME_Met_as_netcdf"
-
-    #lawrence comment in/out
-    try:
-        npp.process_met_office_NAME(input_files_glob,output_directory)
-    except:
-        logger.exception(f"Some failure when converting NAME data from .txt to nc.tar.gz")
-
-    # TODO: check that process_met_office_NAME() produced output as expected
+    Path(output_directory).mkdir(parents=True, exist_ok=True)
+    tarfile_name = f"{jobPath}/{file_name}.tar.gz"
+    with tarfile.open(tarfile_name) as tar:
+        members = remove_path_from_tar_members(tar)
+        tar.extractall(output_directory, members=members)
+
+    # basic check that contents are as expected for the 7-day forecast (57 timepoints in all files)
+    cube_wildcard = f"{output_directory}/*.nc"
+    cubes: CubeList = iris.load(cube_wildcard)
+    for cube in cubes:
+        coord = cube.coord("time")
+        timepoint_count = coord.shape[0]
+        if timepoint_count != 57:
+            msg = f"Unexpected number of timepoints ({timepoint_count}) in cube {cube.name()}"
+            logger.error(msg)
+            raise RuntimeError(msg)
 
     region = config['RegionName']
diff --git a/ProcessorUtils.py b/ProcessorUtils.py
index 58b58efd4ee8f9d0233342dc7c6f688e0c962660..4e0dee38f820b2e67ec87b1c6f8fc02ca1814f76 100644
--- a/ProcessorUtils.py
+++ b/ProcessorUtils.py
@@ -4,8 +4,11 @@ import glob
 import json
 import logging
+import os
 import re
 import subprocess
+import tarfile
+
 import sys
 
 # define logging filter to obscure ODK passwords
@@ -166,3 +169,13 @@ def endJob(status,ignore_inprogress=False,**kwargs):
 
     status.reset('ERROR')
     endScript(**kwargs)
+
+
+def remove_path_from_tar_members(tf: tarfile.TarFile):
+    """
+    Strips the parent path from each member of a tar file before untarring, so that
+    extracted files land directly in the target directory with no subdirectories.
+    Only used with the tarfile package.
+    """
+    for member in tf.getmembers():
+        member.path = os.path.basename(member.path)
+        yield member
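
For reference, below is a minimal, self-contained sketch of the flat-extraction pattern that the new remove_path_from_tar_members() generator enables. It is not part of the diff: the archive name, member path, and directory names are hypothetical stand-ins for the incoming NAME .tar.gz.

# Minimal sketch (not part of the diff): demonstrates the flat extraction
# performed by remove_path_from_tar_members(). The archive and member names
# below are hypothetical.
import io
import os
import tarfile
import tempfile

def remove_path_from_tar_members(tf: tarfile.TarFile):
    # as in ProcessorUtils.py: reduce each member's path to its basename
    for member in tf.getmembers():
        member.path = os.path.basename(member.path)
        yield member

with tempfile.TemporaryDirectory() as tmp:
    # build a .tar.gz whose single member sits under a parent directory,
    # mimicking the layout of the incoming archive
    archive = os.path.join(tmp, "example.tar.gz")
    with tarfile.open(archive, "w:gz") as tar:
        payload = b"dummy netcdf bytes"
        info = tarfile.TarInfo("parent_dir/file_1.nc")
        info.size = len(payload)
        tar.addfile(info, io.BytesIO(payload))

    # extract with flattened member paths: the file lands directly in
    # out_dir rather than in out_dir/parent_dir/
    out_dir = os.path.join(tmp, "out")
    os.makedirs(out_dir)
    with tarfile.open(archive) as tar:
        tar.extractall(out_dir, members=remove_path_from_tar_members(tar))

    assert os.listdir(out_dir) == ["file_1.nc"]

Flattening at extraction time means the subsequent glob over {output_directory}/*.nc does not need to know whatever top-level directory name the supplier happened to use inside the archive.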