# jobstatus.py
'''
Basic flag-file handling within a provided directory.

Running with `conda activate py3EWSepi`.
'''
import os
from glob import glob


class jobStatus:
    '''
    Context manager to determine and change the status of a job, based on a
    single empty flag file named ``STATUS_<status>`` in a given directory.

    Possible status values are 'SUCCESS', 'INPROGRESS', 'WARNING', 'ERROR',
    'HELD', 'SUCCESS_WITH_WARNING' and 'SUCCESS_WITH_ERROR'.

    On entering the context, an existing status file is left alone; if there
    is none, the status is set to 'INPROGRESS'. On leaving the context, a job
    that started without a status and has not reached an acceptable end
    status is switched to 'ERROR'.

    Use jobStatus.reset(status) to change the current status file.

    Use jobStatus.get() to perform a fresh search for the status file.

    If you don't want any status files for this directory anymore, use
    jobStatus.cleanup()

    Suggested usage (in between task commands):
        jobPath = 'testjob'
        with jobStatus(jobPath) as js:
            print(js.status)      # 'INPROGRESS' (if no status file existed)
            print(js.statusPath)  # 'testjob/STATUS_INPROGRESS'
            js.reset('WARNING')
            print(js.status)      # 'WARNING'
            print(js.statusPath)  # 'testjob/STATUS_WARNING'
            js.is_warning()       # True
            status = js.get()
            js.reset('SUCCESS')   # becomes 'SUCCESS_WITH_WARNING'
            #js.cleanup()         # if success file not needed
    '''

    _status_dict = {
        0: 'SUCCESS',
        1: 'INPROGRESS',
        2: 'WARNING',
        3: 'ERROR',
        4: 'HELD',
        5: 'SUCCESS_WITH_WARNING',
        6: 'SUCCESS_WITH_ERROR'
    }

    _acceptable_end_status = ['SUCCESS', 'ERROR', 'HELD', 'SUCCESS_WITH_WARNING', 'SUCCESS_WITH_ERROR']

    _default_end_status = 'ERROR'

    def __init__(self, jobPath):
        '''
        Record the job directory.

        Deliberately does not define self.status or self.statusPath: those
        are only set by __enter__(), get(), set() and cleanup().

        Raises FileNotFoundError if jobPath is not an existing directory.
        '''
        self.jobPath = jobPath

        print(f"job path is {self.jobPath}")

        if not os.path.isdir(self.jobPath):
            raise FileNotFoundError(f"job directory does not exist: {self.jobPath}")

    def __enter__(self):
        '''Pick up an existing status, or initialise it to INPROGRESS.'''
        self.status = self.get()

        # remembered so that __exit__ leaves a pre-existing status alone
        self.had_initial_status = self.has_status()

        if not self.had_initial_status:
            print('No initial status, so setting to INPROGRESS')
            self.set('INPROGRESS')

        return self

    def __exit__(self, type, value, traceback):
        '''
        Finalise the status file. Returns None, so an exception raised inside
        the with-block is never suppressed.
        '''
        status = self.get()

        if self.had_initial_status:
            print('exiting without changing status')
            return

        if status not in self._acceptable_end_status:
            print(f"Changing status from {status} to {self._default_end_status}")
            self.reset(self._default_end_status)

    def _check_status(self, status):
        '''
        Validate a status string.

        Raises AssertionError explicitly (rather than via ``assert``) so the
        check is kept under ``python -O`` while the exception type stays the
        same as before.
        '''
        if not isinstance(status, str):
            raise AssertionError(f"status must be a string, got {type(status).__name__}")
        if status not in self._status_dict.values():
            raise AssertionError(f"unrecognised status: {status!r}")

    def cleanup(self):
        '''Remove any status file(s) from the job directory (best effort).'''
        pattern = f"{self.jobPath}/STATUS_*"

        for f in glob(pattern):
            try:
                print(f'  removing {f}')
                os.remove(f)
            except FileNotFoundError:
                pass  # there is no file to remove
            except OSError as err:
                # still best-effort, but no longer silent
                print(f'  could not remove {f}: {err}')

        if not glob(pattern):
            # keep the object in step with the directory, so that a later
            # reset() creates a fresh file instead of renaming a deleted one
            self.status = None
            self.statusPath = None

        return

    def reset(self, new_status_in):
        '''
        Change the status file to new_status_in.

        If the current status is WARNING or ERROR and the new status is
        neither, the new status becomes '<new>_WITH_<current>' so that the
        earlier problem is remembered.

        Raises AssertionError if new_status_in (or the combined status) is
        not a recognised status, or if no status path has been determined
        yet. All validation happens before the file system is touched.
        '''
        # for now, handle string only
        self._check_status(new_status_in)

        # check statusPath has already been defined
        if not hasattr(self, 'statusPath'):
            raise AssertionError(
                'statusPath is undefined: use the context manager, or call get() or set() first')

        # status may be undefined if only get() has been called so far
        current_status = getattr(self, 'status', None)

        # modify new status if a warning or error once occurred
        new_status = new_status_in
        top_statuses = ('WARNING', 'ERROR')
        if current_status in top_statuses and new_status_in not in top_statuses:
            new_status = f"{new_status_in}_WITH_{current_status}"

        # e.g. 'HELD_WITH_WARNING' is not a recognised status: fail here,
        # before the rename, so the existing status file stays valid
        self._check_status(new_status)

        new_statusPath = f"{self.jobPath}/STATUS_{new_status}"

        if self.statusPath is not None and os.path.exists(self.statusPath):
            print(f'Renaming {self.statusPath} to {new_statusPath}')
            os.rename(self.statusPath, new_statusPath)
        else:
            print(f'Creating {new_statusPath}')

        # creates the file if needed and updates statusPath and status
        self.set(new_status)

        return

    def set(self, status):
        '''
        Create the status file for the given status (if missing) and record
        it on the object. Does not remove any other status file; use reset()
        to change an existing status.

        Raises AssertionError if status is not a recognised status string.
        '''
        # for now, handle string only
        self._check_status(status)

        # set file status
        self.statusPath = f"{self.jobPath}/STATUS_{status}"

        with open(self.statusPath, 'a'):
            pass

        # set object status
        self.status = status
        return

    def get_from_file_path(self, statusPath):
        '''
        Convert path to status string.

        Raises KeyError if the file name does not carry a recognised status.
        '''
        filename = os.path.basename(statusPath)

        statusString = filename.split("STATUS_")[-1]

        if statusString not in self._status_dict.values():
            # Status file descriptor not recognised
            raise KeyError(f"unrecognised status file: {filename}")

        return statusString

    def get_status_path(self):
        '''
        Return the path of the status file in the job directory, or None if
        there is none.

        Raises AssertionError if more than one status file is present.
        '''
        statusFiles = glob(f"{self.jobPath}/STATUS_*")

        if len(statusFiles) > 1:
            print('ERROR: require only one status file in job directory')
            raise AssertionError('require only one status file in job directory')

        if not statusFiles:
            return None

        return statusFiles[0]

    def get(self, statuspath=None):
        '''
        Return the status string for a status file, or None if there is none.

        With no argument, performs a fresh search of the job directory and
        updates self.statusPath. With statuspath given, that path is parsed
        instead and self.statusPath is left untouched.

        Does not modify self.status directly.
        '''
        if statuspath is None:
            self.statusPath = self.get_status_path()
            statuspath = self.statusPath

        if statuspath is None:
            return None

        return self.get_from_file_path(statuspath)

    # logical form of status
    def has_status(self):
        '''True if the object currently holds a recognised status.'''
        return getattr(self, 'status', None) in self._status_dict.values()

    def is_success(self):
        '''True if the status file on disk is SUCCESS.'''
        return self.get() == 'SUCCESS'

    def is_inprogress(self):
        '''True if the status file on disk is INPROGRESS.'''
        return self.get() == 'INPROGRESS'

    def is_warning(self):
        '''True if the status file on disk is WARNING.'''
        return self.get() == 'WARNING'

    def is_error(self):
        '''True if the status file on disk is ERROR.'''
        return self.get() == 'ERROR'

    def is_held(self):
        '''True if the status file on disk is HELD.'''
        return self.get() == 'HELD'
# Unit tests for the jobStatus class.
#
# Basic rules for jobstatus.jobStatus to follow:
# 1) It should be called as a context manager
# 2) If a status already exists, it exits without touching it
import unittest
import tempfile
import shutil
import os

from ews.coordinator.utils import jobstatus


class Test_jobStatus(unittest.TestCase):
    '''Tests of status-file handling inside a temporary job directory.'''

    # TODO: replace this SetUp and tearDown approach with a context manager, for more robust workspace deletion
    def setUp(self):
        # TODO: Is it worth placing this temporary workspace in the project directory?
        self.jobdir = tempfile.mkdtemp()

    def tearDown(self):
        shutil.rmtree(self.jobdir)

    @staticmethod
    def _touch(path):
        '''Create (or truncate) an empty file at the given path.'''
        with open(path, 'w'):
            pass

    def test_basic(self):
        '''Sanity check that the unittest machinery itself runs.'''
        self.assertTrue(True)

    def test_context_call_assigns_status(self):
        with jobstatus.jobStatus(self.jobdir) as js:
            has_attr = hasattr(js, 'status')
            self.assertTrue(has_attr)

    def test_direct_call_does_not_assign_status(self):
        # because it should be used as a context manager
        js = jobstatus.jobStatus(self.jobdir)
        self.assertFalse(hasattr(js, 'status'))

    def test_init_fails_when_path_not_exists(self):
        with self.assertRaises(FileNotFoundError):
            jobstatus.jobStatus(self.jobdir + 'fake')

    def test_get_from_file_path_fails_if_bad_file_status(self):
        # a status file whose suffix is not a recognised status
        statusPath = self.jobdir + '/STATUS_UNKNOWN'
        self._touch(statusPath)

        # the method is called unbound, with the class standing in for self
        with self.assertRaises(KeyError):
            jobstatus.jobStatus.get_from_file_path(jobstatus.jobStatus, statusPath)

    def test_set_can_create_status_file(self):
        js = jobstatus.jobStatus(self.jobdir)
        statusPathExpected = self.jobdir + '/STATUS_SUCCESS'

        existed_before = os.path.isfile(statusPathExpected)
        self.assertFalse(existed_before)

        js.set('SUCCESS')

        exists_after = os.path.isfile(statusPathExpected)
        self.assertTrue(exists_after)

    def test_get_gets_status_if_file_is_present(self):
        # a valid status file is already in the job directory
        self._touch(self.jobdir + '/STATUS_SUCCESS')

        js = jobstatus.jobStatus(self.jobdir)
        found = js.get()

        self.assertIsInstance(found, str)

    def test_cleanup_can_remove_status(self):
        # start from an existing INPROGRESS file
        statusPathStart = self.jobdir + '/STATUS_INPROGRESS'
        self._touch(statusPathStart)
        self.assertTrue(os.path.isfile(statusPathStart))

        with jobstatus.jobStatus(self.jobdir) as js:
            js.cleanup()

        self.assertFalse(os.path.isfile(statusPathStart))

    def test_reset_can_change_status(self):
        # start from an existing INPROGRESS file
        statusPathStart = self.jobdir + '/STATUS_INPROGRESS'
        statusPathEnd = self.jobdir + '/STATUS_SUCCESS'
        self._touch(statusPathStart)

        with jobstatus.jobStatus(self.jobdir) as js:
            js.reset('SUCCESS')

        self.assertFalse(os.path.isfile(statusPathStart))
        self.assertTrue(os.path.isfile(statusPathEnd))

    def test_existing_status_is_untouched(self):
        # start from an existing WARNING file
        statusPathStart = self.jobdir + '/STATUS_WARNING'
        self._touch(statusPathStart)

        # entering and leaving the context must leave the file in place
        with jobstatus.jobStatus(self.jobdir):
            pass

        self.assertTrue(os.path.isfile(statusPathStart))

    def test_existing_warning_status_is_remembered(self):
        # start from an existing WARNING file
        self._touch(self.jobdir + '/STATUS_WARNING')

        with jobstatus.jobStatus(self.jobdir) as js:
            js.reset('SUCCESS')

        statusPathExpected = self.jobdir + '/STATUS_SUCCESS_WITH_WARNING'
        self.assertTrue(os.path.isfile(statusPathExpected))

    def test_existing_error_status_is_remembered(self):
        # start from an existing ERROR file
        self._touch(self.jobdir + '/STATUS_ERROR')

        with jobstatus.jobStatus(self.jobdir) as js:
            js.reset('SUCCESS')

        statusPathExpected = self.jobdir + '/STATUS_SUCCESS_WITH_ERROR'
        self.assertTrue(os.path.isfile(statusPathExpected))