Newer
Older
EMAIL_CRED_PATH: str = "../../test_data/test_deployment/envs/Cred_gmail.json"
DEFAULT_CONFIG_FILE_PATH: str = "../../test_data/test_deployment/regions/EastAfrica/resources/coordinator/configs/config_EastAfrica_fc_live.json"
L. Bower
committed
TEST_WORKSPACE_PATH: str = "../../test_data/test_deployment/regions/EastAfrica/workspace/"
TEMP_CONFIG_FILE_NAME: str = "temp_config.json"
TEST_ASSETS_PATH: str = "../../test_data/test_deployment/regions/EastAfrica/resources/coordinator/assets/"
EXAMPLE_SURVEY_FILE_PATH: str = TEST_ASSETS_PATH + "example_survey_run.zip"
EXAMPLE_DEPO_FILE_PATH: str = TEST_ASSETS_PATH + "example_depo_run.zip"
EXAMPLE_ENV_SUIT_FILE_PATH: str = TEST_ASSETS_PATH + "example_env_suit_run.zip"
@staticmethod
def load_json_file(file: str) -> dict:
with open(file) as config_file:
config: dict = json.load(config_file)
return config
@staticmethod
def write_json_file(values: dict, file: str):
with open(file, 'w') as file:
json.dump(values, file, indent = 4)
@staticmethod
def get_now_string() -> str:
L. Bower
committed
nowstring: str = datetime.datetime.today().strftime('%Y-%m-%d_%H%M%S')
@staticmethod
def unpack_zip(zip_to_unpack: str, out_file: str):
with ZipFile(zip_to_unpack) as zf: # open the zip file
for target_file in zf.namelist(): # check if the file exists in the archive
zf.extract(target_file, out_file)
@staticmethod
def count_files_in_wildcard(wildcard: str) -> int:
results = glob.glob(wildcard)
return len(results)
def check_file_not_empty(file_path: str) -> bool:
@staticmethod
def check_file_exists(file_path: str) -> bool:
return os.path.isfile(file_path)
@staticmethod
def check_wildcard_exists_and_not_empty(wildcard: str) -> bool:
"""
requires at least one file matching the wildcard to exist and not be empty
"""
result = False
files: List[str] = glob.glob(wildcard)
for file in files:
result = IntegrationTestUtils.check_file_not_empty(file)
if result is False:
break
return result
@staticmethod
def check_file_exists_and_not_empty(file_path: str) -> bool:
file_exists = IntegrationTestUtils.check_file_exists(file_path)
file_not_empty = IntegrationTestUtils.check_file_not_empty(file_path)
return file_exists and file_not_empty
@staticmethod
def run_unittest_pipeline(component: str,
start_date: str,
**kwargs):
# need EMAIL_CRED in the environment before we import Processor
os.environ["EMAIL_CRED"] = IntegrationTestUtils.EMAIL_CRED_PATH
from Processor import run_Process, set_log_level
args_dict: dict = {}
# note, possible to override these values in the kwargs loop below
args_dict['live'] = False
args_dict['noupload'] = True
args_dict['start_date'] = start_date
args_dict['component'] = component
L. Bower
committed
args_dict['config_paths'] = [IntegrationTestUtils.TEMP_CONFIG_FILE_NAME]
args_dict['log_level'] = 'info'
for key, value in kwargs.items():
args_dict[key] = value
log_level = args_dict['log_level']
set_log_level(log_level)
try:
run_Process(args_dict)
except SystemExit:
# we will eventually want to throw these to the calling class to be dealt with
pass
@staticmethod
def run_external_pipeline(component: str,
start_date: str,
email_cred_path: str,
**kwargs):
# need EMAIL_CRED in the environment before we import Processor
os.environ["EMAIL_CRED"] = email_cred_path
from Processor import run_Process, set_log_level
args_dict: dict = {}
# note, possible to override these values in the kwargs loop below
args_dict['live'] = False # what is this
args_dict['noupload'] = True # need to work out where to upload the results too when a test
args_dict['start_date'] = start_date
args_dict['component'] = component
L. Bower
committed
args_dict['config_paths'] = [IntegrationTestUtils.TEMP_CONFIG_FILE_NAME]
args_dict['log_level'] = 'info'
args_dict['clearup'] = False # what is this
for key, value in kwargs.items():
args_dict[key] = value
log_level = args_dict['log_level']
set_log_level(log_level)
try:
run_Process(args_dict)
except SystemExit:
# we will eventually want to throw these to the calling class to be dealt with
pass
@staticmethod
def generate_run_date(run_date_type: str,
custom_run_date: str) -> str:
if run_date_type == "today":
print("today")
today = datetime.date.today()
today_string = today.strftime("%Y%m%d")
result = today_string
elif run_date_type == "yesterday":
print("yesterday")
today = datetime.date.today()
yesterday: datetime = today - datetime.timedelta(days = 1)
yesterday_string = yesterday.strftime("%Y%m%d")
result = yesterday_string
elif run_date_type == "custom":
result = custom_run_date
else:
print("default")
result = "20230126"
return result