Something went wrong on our end
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
ProcessorServer.py 5.50 KiB
#ProcessorServer.py
'''Functions to communicate with server sites for download and upload.'''
import datetime
import logging
import os
import tarfile
from string import Template
from typing import List
from ProcessorUtils import (
open_and_check_config,
subprocess_and_log,
endScript,
add_filters_to_sublogger
)
# Module-level logger shared by every function in this file; its name is
# 'Processor.Server'.
logger = logging.getLogger('Processor.Server')
# Hand the logger to the ProcessorUtils helper.
# NOTE(review): by its name this presumably attaches the project's log
# filters to the sub-logger - confirm in ProcessorUtils.
add_filters_to_sublogger(logger)
def process_pre_job_server_download(input_args: dict):
    '''Check that the input tar archive of every config is available.

    This is set up for environmental suitability v2.0 and deposition.

    For each path in ``input_args['config_paths']`` the config is loaded,
    its ``StartString`` is set to ``input_args['start_date']`` and the
    archive ``<ServerPathTemplate>/<InputFileTemplate>.tar.gz`` (both
    templates are read from the ``input_args['component']`` section and
    filled in from the config) is tested. When ``ServerName`` is an empty
    string the test runs on the local filesystem, otherwise it runs over
    ssh on that server using ``ServerKey``.

    Returns:
        True if every archive exists and can be read as a tar file.
        False if an archive is missing and the current UTC time of day is
        later than ``TimeExpectedAvailable`` plus five hours.

    If an archive is missing but that deadline has not passed,
    ``endScript(premature=True)`` is called.
    NOTE(review): endScript presumably terminates the process; if it ever
    returns, the loop simply carries on with the next config.
    '''

    logger.info('started process_pre_job_server_download()')

    # Check if there is a file available on the server
    logger.debug('Checking for file(s) on remote server')

    config_paths: List[str] = input_args['config_paths']
    start_date: str = input_args['start_date']
    component: str = input_args['component']

    # Grace periods after TimeExpectedAvailable: past the first a missing
    # archive is logged as a warning, past the second the function gives up
    # and returns False.
    time_until_warn = datetime.timedelta(hours=4)
    time_until_error = datetime.timedelta(hours=5)

    n_configs = len(config_paths)

    for i, config_path in enumerate(config_paths):

        config = open_and_check_config(config_path)

        # StartString is made available to the path templates below
        config['StartString'] = start_date

        file_path = Template(config[component]['ServerPathTemplate']).substitute(**config)
        file_name = Template(config[component]['InputFileTemplate']).substitute(**config)
        full_file_path = f"{file_path}/{file_name}.tar.gz"

        logger.info(f"Checking for existence of {full_file_path}")

        # .time() drops the tzinfo, leaving a naive UTC time of day that can
        # be compared with the naive thresholds computed further down
        timenow = datetime.datetime.now(tz=datetime.timezone.utc).time()

        server_name: str = config['ServerName']

        # check the incoming met data tar exists and is a valid tar file
        # (on either remote or local machine)
        if server_name == "":
            # an empty server name means the file is local
            data_is_ready = os.path.exists(full_file_path) and tarfile.is_tarfile(full_file_path)
        else:
            # the last element is a command line for the remote shell: the
            # file must exist and `tar -tzf` must be able to list it
            cmd_check_file = [
                "ssh", "-i", config['ServerKey'], "-o", "StrictHostKeyChecking=no", server_name,
                f"test -f {full_file_path} && tar -tzf {full_file_path} >/dev/null"]

            # check=False: a non-zero exit status is an expected outcome
            # here and is read from the returned status instead
            status = subprocess_and_log(
                cmd_check_file,
                'subprocess_ssh',
                f"Checking for existence of {full_file_path}",
                check=False,
                shell=False)
            data_is_ready = status.returncode == 0

        if data_is_ready:
            logger.info(f"Data is available for config {i+1} of {n_configs}, calculation shall proceed")
            continue

        # The data is missing: decide how to react from a time check in UTC.
        # strptime with only '%H%M' yields a datetime on a dummy date, and
        # .time() keeps only the time of day, so a threshold that runs past
        # midnight wraps round to a small value.
        # NOTE(review): with a late TimeExpectedAvailable these time-of-day
        # comparisons will misfire - confirm expected times stay early enough.
        time_0 = datetime.datetime.strptime(config[component]['TimeExpectedAvailable'], '%H%M')
        time_warn = (time_0 + time_until_warn).time()
        time_error = (time_0 + time_until_error).time()

        message = (
            f"Data not yet available for config {i+1} of {n_configs}, "
            f"expected between {time_0.time()} and {time_warn} and long before {time_error}")

        if timenow > time_error:
            # very late: report that the job is not able to proceed
            # NOTE(review): logged at warning level, as before - confirm
            # whether this case should be an error.
            logger.warning(message)
            return False

        if timenow > time_warn:
            # late: job is not ready to proceed
            logger.warning(message)
        else:
            # not late yet: the data is simply not expected to be there
            logger.info(message)

        endScript(premature=True)

    return True
def upload(config, FilesToSend, component):
    '''Copy files into the component's dated output directory using scp.

    The destination is ``<config['ServerPath']>/<sub-directory>``, where the
    sub-directory is ``<StartString>_0000/`` for every component except
    'Survey', which uses ``SURVEYDATA_<StartString>_0000/``. The directory
    is created first with ``mkdir -p``. When ``config['ServerKey']`` is an
    empty string both steps run against the local machine; otherwise they
    go through ssh/scp to ``config['ServerName']`` using that key.

    Args:
        config: mapping providing 'StartString', 'ServerPath', 'ServerName'
            and 'ServerKey'.
        FilesToSend: sequence of paths handed to ``scp -r``.
        component: one of 'Environment', 'Deposition', 'Epidemiology',
            'Survey', 'Scraper' or 'Advisory'.

    Raises:
        KeyError: if ``component`` is not one of the names above.
        IndexError: if ``FilesToSend`` is empty.
    '''

    usual_path = f"{config['StartString']}_0000/"

    component_path = {
        'Environment': usual_path,
        'Deposition': usual_path,
        'Epidemiology': usual_path,
        'Survey': f"SURVEYDATA_{config['StartString']}_0000/",
        'Scraper': usual_path,
        'Advisory': usual_path,
    }

    # TODO: make path discern Daily or Weekly sub-directory

    output_server_path = f"{config['ServerPath']}/{component_path[component]}"
    server_name = config['ServerName']

    logger.info(f"Trying upload to {server_name}:{output_server_path}")
    logger.info(f"File(s) that will be put on remote server: {FilesToSend}")

    if len(FilesToSend) == 0:
        logger.warning('No files to send, so skipping this task')
        raise IndexError('No files to send, so skipping this task')

    logger.debug("Making path directory on remote server if it doesn't already exist")

    # NOTE(review): local mode is keyed on an empty ServerKey here, whereas
    # process_pre_job_server_download keys it on an empty ServerName -
    # confirm the two settings are always empty together.
    server_key = config['ServerKey']
    is_local = server_key == ""

    if is_local:
        # Passed as an argument list (no shell) so that a path containing
        # spaces or shell metacharacters is created verbatim, matching how
        # the same path is handed to scp below.
        mkdir_cmd = ["mkdir", "-p", output_server_path]
    else:
        mkdir_cmd = [
            "ssh", "-i", server_key, "-o", "StrictHostKeyChecking=no", server_name,
            f"mkdir -p {output_server_path}"]

    subprocess_and_log(mkdir_cmd, 'upload ssh', 'make remote directory', shell=False)

    logger.debug('Sending file(s) to remote server')

    if is_local:
        scp_cmd = ["scp", "-r", *FilesToSend, output_server_path]
    else:
        scp_cmd = [
            "scp", "-r", "-i", server_key, "-o", "StrictHostKeyChecking=no", *FilesToSend,
            f"{server_name}:{output_server_path}"]

    subprocess_and_log(scp_cmd, 'upload scp', 'scp files to remote directory')