From 8fa1cc2bdf0264b8ce92068fc44a2b06b33f5015 Mon Sep 17 00:00:00 2001
From: lb584 <lb584@cam.ac.uk>
Date: Wed, 24 Aug 2022 13:56:55 +0100
Subject: [PATCH] untarring NAME data with Python, reading NAME.nc files rather
 than NAME.txt files (#19)

---
 NAMEPreProcessor.py    |  6 ++++-
 Processor.py           |  2 +-
 ProcessorComponents.py | 56 +++++++++++++++++++-----------------------
 ProcessorUtils.py      | 13 ++++++++++
 4 files changed, 44 insertions(+), 33 deletions(-)

diff --git a/NAMEPreProcessor.py b/NAMEPreProcessor.py
index 2900cea..bb74ac6 100644
--- a/NAMEPreProcessor.py
+++ b/NAMEPreProcessor.py
@@ -5,6 +5,10 @@ import cftime
 import iris.util
 from iris.cube import CubeList, Cube
 
+"""
+THIS IS NO LONGER USED NOW THAT WE ARE PROCESSING THE .NC FILES SUPPLIED BY THE MET OFFICE
+DELETE AFTER A "COOLING OFF" PERIOD
+"""
 
 def save_cube(cube: iris.cube.Cube,
               sanitised_field_name: str,
@@ -82,7 +86,7 @@ if __name__ == '__main__':
 
     """
     example usage:
-    /media/scratch/lb584_scratch/projects/ews/input/eth/MET_20200707/WR_EnvSuit_Met_Ethiopia_20200707/Met_Data_*.txt
+    /media/scratch/lb584_scratch/projects/ews/input/eth/MET_20200707/WR_EnvSuit_Met_Ethiopia_20200707/*.nc
     /media/scratch/lb584_scratch/projects/ews/output/name_processing/another_dir
     """
 
diff --git a/Processor.py b/Processor.py
index 777553d..4d224ee 100755
--- a/Processor.py
+++ b/Processor.py
@@ -415,7 +415,7 @@ def run_Process():
     # lock job directory
     with jobStatus(jobPath) as status:
 
-        #lawrence check in/out
+        #lawrence comment in/out
         # check for a status file in job directory
         if status.had_initial_status:
             logger.info(f"Job path already exists and has status {status.status}")
diff --git a/ProcessorComponents.py b/ProcessorComponents.py
index 55183b1..883cd57 100644
--- a/ProcessorComponents.py
+++ b/ProcessorComponents.py
@@ -8,17 +8,20 @@ import datetime
 from distutils.dir_util import copy_tree
 from glob import glob
 import json
-from locale import normalize
+import tarfile
 import logging
 import os
 from pathlib import Path
 import re
+
+import iris
 import requests
 import shutil
 from shutil import copyfile
 import subprocess
 from string import Template
 
+from iris.cube import CubeList, Cube
 from numpy import all as np_all
 from numpy import argmax, unique
 from pandas import read_csv, Series, DataFrame, concat, to_datetime, json_normalize
@@ -43,7 +46,9 @@ from plotting.common.utils import EnvSuitDiseaseInfo
 from plotting.common.plotting_coordinator.ews_env_disease_plotting_coordinator import EWSPlottingEnvSuitBase
 from plotting.common.plotting_coordinator.ews_depo_disease_plotting_coordinator import EWSPlottingDepoBase
 from plotting.common.plotting_coordinator.ews_epi_disease_plotting_coordinator import EWSPlottingEPIBase
-from ProcessorUtils import open_and_check_config, get_only_existing_globs, subprocess_and_log, endScript, endJob, add_filters_to_sublogger
+from ProcessorUtils import open_and_check_config, get_only_existing_globs, subprocess_and_log, endScript, endJob, \
+    add_filters_to_sublogger, remove_path_from_tar_members
+
 
 # TODO: Replace subprocess scp and ssh commands with paramiko.SSHClient() instance
 
@@ -1294,40 +1299,29 @@ def process_in_job_env2_0(jobPath,status,config,component):
     description_short = 'env2 scp'
     description_long = 'Copying file from remote server to job directory'
 
-    #lawrence comment in/out
+    # lawrence comment in/out
     subprocess_and_log(cmd_scp,description_short, description_long)
 
     logger.info('untarring the input file')
 
-    # TODO: untar file in python (with tarfile module) instead of subprocess
-    cmd_tar = ["tar","-xzf",f"{jobPath}/{file_name}.tar.gz","-C",jobPath]
-    description_short = 'env2 tar'
-    description_long = 'Untarring the input file'
-
-    #lawrence comment in/out
-    subprocess_and_log(cmd_tar,description_short, description_long)
-
-    # basic check that contents are as expected for 7-day forecast
-    # 57 files of NAME .txt timesteps and some number of log files
-    if len(glob(f"{jobPath}/{file_name}/Met_Data_C1_T*.txt")) != 57:
-        msg = f"Insufficient contents of untarred file in directory {jobPath}/{file_name}"
-        logger.error(msg)
-        raise RuntimeError(msg)
-
-    # convert format of met data so that met extraction pipeline can work with it
-    logger.info('Convert .txt input files to .nc.tar.gz')
-
-    input_files_glob = f"{jobPath}/{file_name}/Met_Data_C1_T*.txt"
-
+    # untar incoming NAME data
     output_directory = f"{jobPath}/NAME_Met_as_netcdf"
-
-    #lawrence comment in/out
-    try:
-        npp.process_met_office_NAME(input_files_glob,output_directory)
-    except:
-        logger.exception(f"Some failure when converting NAME data from .txt to nc.tar.gz")
-
-    # TODO: check that process_met_office_NAME() produced output as expected
+    Path(output_directory).mkdir(parents=True, exist_ok=True)
+    tarfile_name = f"{jobPath}/{file_name}.tar.gz"
+    with tarfile.open(tarfile_name) as tar:
+        members = remove_path_from_tar_members(tar)
+        tar.extractall(output_directory, members = members)
+
+    # basic check that contents are as expected for 7-day forecast (57 timepoints in all files)
+    cube_wildcard = f"{output_directory}/*.nc"
+    cubes: CubeList = iris.load(cube_wildcard)
+    for cube in cubes:
+        coord = cube.coord("time")
+        timepoint_count = coord.shape[0]
+        if timepoint_count != 57:
+            msg = f"Unexpected number of timepoints ({timepoint_count}) in cube {cube.name()}"
+            logger.error(msg)
+            raise RuntimeError(msg)
 
     region = config['RegionName']
 
diff --git a/ProcessorUtils.py b/ProcessorUtils.py
index 58b58ef..4e0dee3 100644
--- a/ProcessorUtils.py
+++ b/ProcessorUtils.py
@@ -4,8 +4,11 @@
 import glob
 import json
 import logging
+import os
 import re
 import subprocess
+import tarfile
+
 import sys
 
 # define logging filter to obscure ODK passwords
@@ -166,3 +169,13 @@ def endJob(status,ignore_inprogress=False,**kwargs):
         status.reset('ERROR')
 
     endScript(**kwargs)
+
+
+def remove_path_from_tar_members(tf: tarfile.TarFile):
+    """
+    Strips the parent path from files within a tar file before untarring. Means untarred files will not have subdirs.
+    Only used with the tarfile package
+    """
+    for member in tf.getmembers():
+        member.path = os.path.basename(member.path)
+        yield member
-- 
GitLab