FAQ | This is a LIVE service | Changelog

Skip to content
Snippets Groups Projects
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
ProcessorServer.py 5.50 KiB
#ProcessorServer.py
'''Functions to communicate with server sites for download and upload.'''

import datetime
import logging
import os
import tarfile
from string import Template
from typing import List

from ProcessorUtils import (
        open_and_check_config,
        subprocess_and_log,
        endScript,
        add_filters_to_sublogger
)

# Module logger. The dotted name makes it a child of the 'Processor' logger.
logger = logging.getLogger('Processor.Server')
# Project helper from ProcessorUtils; presumably attaches the shared log
# filters used by the other Processor sub-loggers — TODO confirm.
add_filters_to_sublogger(logger)

def process_pre_job_server_download(input_args: dict) -> bool:
    '''Check that each config's input tarball is available before the job runs.

    This is set up for environmental suitability v2.0 and deposition.

    For every config in ``input_args['config_paths']`` the archive
    ``<ServerPathTemplate>/<InputFileTemplate>.tar.gz`` (templates taken from
    the ``component`` section, with ``StartString`` set to the start date) is
    looked for. It is checked on the local filesystem when the config's
    ``ServerName`` is empty, otherwise on that host over ssh. In both cases it
    must also be a readable tar archive.

    Args:
        input_args: mapping with keys ``'config_paths'`` (list of config file
            paths), ``'start_date'`` (substituted as ``StartString``) and
            ``'component'`` (config section holding the path templates and
            ``TimeExpectedAvailable`` in ``%H%M`` form, compared against UTC).

    Returns:
        True if the data was found for every config. False if some data is
        missing more than 5 hours after its ``TimeExpectedAvailable``. When
        data is missing but that deadline has not passed,
        ``endScript(premature=True)`` is called instead.
    '''

    logger.info('started process_pre_job_server_download()')

    # Check if there is a file available on the server
    logger.debug('Checking for file(s) on remote server')

    config_paths: List[str] = input_args['config_paths']
    start_date: str = input_args['start_date']
    component: str = input_args['component']

    for i, config_path in enumerate(config_paths):

        config = open_and_check_config(config_path)

        config['StartString'] = start_date

        file_path = Template(config[component]['ServerPathTemplate']).substitute(**config)
        file_name = Template(config[component]['InputFileTemplate']).substitute(**config)
        full_file_path = f"{file_path}/{file_name}.tar.gz"
        logger.info(f"Checking for existence of {full_file_path}")

        timenow = datetime.datetime.now(tz=datetime.timezone.utc).time()

        server_name: str = config['ServerName']

        # Check the incoming data tar exists and is a valid tar file, on
        # either the local or the remote machine.
        if server_name == "":  # means the file is local
            data_is_ready = os.path.exists(full_file_path) and tarfile.is_tarfile(full_file_path)
        else:
            # `tar -tzf` lists the gzipped archive, so exit status 0 means the
            # file exists and can be read as a .tar.gz.
            cmd_check_file = ["ssh", "-i", config['ServerKey'], "-o", "StrictHostKeyChecking=no", server_name,
                              f"test -f {full_file_path} && tar -tzf {full_file_path} >/dev/null"]
            run_in_shell: bool = False

            description_short = 'subprocess_ssh'
            description_long = f"Checking for existence of {full_file_path}"

            status = subprocess_and_log(cmd_check_file, description_short, description_long, check=False,
                                        shell=run_in_shell)
            data_is_ready = status.returncode == 0

        if data_is_ready:
            logger.info(f"Data is available for config {i+1} of {len(config_paths)}, calculation shall proceed")
            continue

        # Data is missing: judge how late it is from the current UTC time of
        # day. If it's late, raise a warning; if very late, raise an error.
        time_expected = datetime.datetime.strptime(config[component]['TimeExpectedAvailable'], '%H%M')

        time_until_warn = datetime.timedelta(hours=4)
        time_until_error = datetime.timedelta(hours=5)

        time_warn = (time_expected + time_until_warn).time()
        time_error = (time_expected + time_until_error).time()

        # NOTE(review): only times of day are compared, so a
        # TimeExpectedAvailable of 19:00 or later makes time_error (and from
        # 20:00, time_warn) wrap past midnight, and the checks below misfire.
        # Confirm configs never use such late expected times.
        message = (f"Data not yet available for config {i+1} of {len(config_paths)}, "
                   f"expected between {time_expected.time()} and {time_warn} "
                   f"and long before {time_error}")

        if timenow > time_error:
            # Very late: the job is not able to proceed.
            logger.warning(message)
            return False

        if timenow > time_warn:
            # Late, but not yet past the error deadline.
            logger.warning(message)
        else:
            # Still within the expected window; the data may simply not have
            # arrived yet.
            logger.info(message)

        # Either way this job is not ready to proceed.
        endScript(premature=True)

    return True

def upload(config,FilesToSend,component):
    '''Copy files into this run's output directory on the server.

    The destination is ``<ServerPath>/<subdir>`` where ``subdir`` is
    ``SURVEYDATA_<StartString>_0000/`` for the 'Survey' component and
    ``<StartString>_0000/`` for the other known components. The directory is
    created with ``mkdir -p`` and the files are then copied with ``scp -r``.

    When ``ServerKey`` is empty, both steps act on this machine: mkdir runs
    through a shell, and scp copies to a local path. Otherwise both steps go
    over ssh to ``ServerName`` using that key.

    Args:
        config: configuration mapping; reads 'StartString', 'ServerPath',
            'ServerName' and 'ServerKey'.
        FilesToSend: sequence of local paths to copy.
        component: one of 'Environment', 'Deposition', 'Epidemiology',
            'Survey', 'Scraper', 'Advisory'.

    Raises:
        KeyError: if ``component`` is not one of the names above.
        IndexError: if ``FilesToSend`` is empty.
    '''

    usual_path = f"{config['StartString']}_0000/"

    component_path = {
            'Environment' : usual_path,
            'Deposition' : usual_path,
            'Epidemiology' : usual_path,
            'Survey' : f"SURVEYDATA_{config['StartString']}_0000/",
            'Scraper' : usual_path,
            'Advisory' : usual_path }

    # TODO: make path discern Daily or Weekly sub-directory

    OutputServerPath = f"{config['ServerPath']}/{component_path[component]}"

    logger.info(f"Trying upload to {config['ServerName']}:{OutputServerPath}")

    logger.info(f"File(s) that will be put on remote server: {FilesToSend}")

    if len(FilesToSend) == 0:
        logger.warning('No files to send, so skipping this task')
        # Same exception type as before, but now it says why it was raised.
        raise IndexError(f"No files to upload for component '{component}'")

    logger.debug("Making path directory on remote server if it doesn't already exist")

    server_key = config['ServerKey']
    run_in_shell: bool
    if server_key == "":
        # NOTE(review): the path goes through a shell unquoted. Expansion such
        # as '~' applies, but spaces or shell metacharacters in ServerPath
        # would break this. Confirm ServerPath is trusted config.
        ssh_cmd = f"mkdir -p {OutputServerPath}"
        run_in_shell = True
    else:
        ssh_cmd = ["ssh", "-i", server_key, "-o", "StrictHostKeyChecking=no", config['ServerName'],
                   f"mkdir -p {OutputServerPath}"]
        run_in_shell = False

    description_short = 'upload ssh'
    description_long = 'make remote directory'
    subprocess_and_log(ssh_cmd, description_short, description_long, shell=run_in_shell)

    logger.debug('Sending file(s) to remote server')

    if server_key == "":
        scp_cmd = ["scp", "-r", *FilesToSend, OutputServerPath]
    else:
        # "-ri <key>" is "-r" (recursive) plus "-i <key>" (identity file).
        scp_cmd = ["scp", "-ri", server_key, "-o", "StrictHostKeyChecking=no", *FilesToSend,
                   f"{config['ServerName']}:{OutputServerPath}"]

    description_short = 'upload scp'
    description_long = 'scp files to remote directory'
    subprocess_and_log(scp_cmd, description_short, description_long)