Newer
Older
from unittest import TestSuite, TestLoader, TestCase
from HtmlTestRunner import HTMLTestRunner
# Default location of the email credentials JSON. rune() replaces it with the value of the
# --email_cred argument, and run_unittest_pipeline() exports it as the EMAIL_CRED
# environment variable before importing Processor.
EMAIL_CRED_PATH: str = "../../test_data/test_deployment/envs/Cred_gmail.json"
# Default coordinator config file. rune() replaces it with the value of the --config argument.
DEFAULT_CONFIG_FILE_PATH: str = "../../test_data/test_deployment/regions/EastAfrica/resources/coordinator/configs/config_EastAfrica_fc_live.json"
L. Bower
committed
# Relative path of the workspace directory inside the EastAfrica test deployment.
TEST_WORKSPACE_PATH: str = "../../test_data/test_deployment/regions/EastAfrica/workspace/"
# Config file passed to the pipeline as the single 'config_paths' entry by the run_*_pipeline
# helpers. It is not assigned anywhere in the code shown here - presumably each test sets it
# before running a pipeline; TODO confirm.
TEMP_CONFIG_FILE_NAME: str = None
L. Bower
committed
# Relative path of the directory holding the zipped example assets listed below.
TEST_ASSETS_PATH: str = "../../test_data/test_deployment/regions/EastAfrica/resources/coordinator/assets/"
# Zip archives inside TEST_ASSETS_PATH, one per example run (survey, depo, env suit).
EXAMPLE_SURVEY_FILE_PATH: str = TEST_ASSETS_PATH + "example_survey_run.zip"
EXAMPLE_DEPO_FILE_PATH: str = TEST_ASSETS_PATH + "example_depo_run.zip"
EXAMPLE_ENV_SUIT_FILE_PATH: str = TEST_ASSETS_PATH + "example_env_suit_run.zip"
# The three values below stay None until rune() fills them in from the command line:
# TEST_OUT_PATH   - <outdir> + "temp_<job_prefix>_<timestamp>" + os.sep
# TEST_START_DATE - the run date returned by generate_run_date()
# TEST_JOB_DIR    - TEST_OUT_PATH joined with "<job_prefix>_<TEST_START_DATE>"
TEST_OUT_PATH: str = None
TEST_START_DATE: str = None
TEST_JOB_DIR: str = None
@staticmethod
def build_arg_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser()
parser.add_argument('--config', required = True)
parser.add_argument('--outdir', required = True)
parser.add_argument('--email_cred', required = True)
parser.add_argument('--test_report_dir', required = False)
parser.add_argument('--run_date_type', required = False)
parser.add_argument('--custom_run_date', required = False)
parser.add_argument('unittest_args', nargs='*')
return parser
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
@staticmethod
def rune(test_case: [TestCase], job_prefix: str):
    """
    Parse the command line, store the run settings on IntegrationTestUtils and
    run every test of *test_case* with an HTML report.

    test_case:  the TestCase class handed to TestLoader.loadTestsFromTestCase
    job_prefix: used in the name of the temporary output directory
                ("temp_<job_prefix>_<timestamp>") and of the job directory
                ("<job_prefix>_<run date>")

    The report is written to --test_report_dir when given, otherwise to the job
    directory. Nothing is returned.
    """
    _parser = IntegrationTestUtils.build_arg_parser()
    _args = _parser.parse_args()

    _config_file: str = _args.config
    _outdir: str = _args.outdir
    _email_cred_path: str = _args.email_cred
    _test_report_dir: str = _args.test_report_dir
    _run_date_type: str = _args.run_date_type
    _custom_run_date: str = _args.custom_run_date

    IntegrationTestUtils.DEFAULT_CONFIG_FILE_PATH = _config_file

    nowstring: str = IntegrationTestUtils.get_now_string()
    prefix: str = f"temp_{job_prefix}_" + nowstring

    # Join rather than concatenate: an --outdir given without a trailing separator must
    # still produce a directory inside it, not a sibling named "<outdir>temp_...".
    # The trailing separator is kept, so the value is unchanged for an --outdir that
    # already ends with one.
    IntegrationTestUtils.TEST_OUT_PATH = os.path.join(_outdir, prefix) + os.sep
    IntegrationTestUtils.EMAIL_CRED_PATH = _email_cred_path

    IntegrationTestUtils.TEST_START_DATE = IntegrationTestUtils.generate_run_date(_run_date_type,
                                                                                  _custom_run_date)

    IntegrationTestUtils.TEST_JOB_DIR = os.path.join(IntegrationTestUtils.TEST_OUT_PATH,
                                                     f"{job_prefix}_" +
                                                     IntegrationTestUtils.TEST_START_DATE)

    # Now set the sys.argv to the unittest_args (leaving sys.argv[0] alone)
    sys.argv[1:] = _args.unittest_args

    tests: TestSuite = TestLoader().loadTestsFromTestCase(test_case)

    if _test_report_dir is None:
        _test_report_dir = IntegrationTestUtils.TEST_JOB_DIR

    runner = HTMLTestRunner(output = _test_report_dir, combine_reports = True)
    runner.run(tests)
@staticmethod
def load_json_file(file: str) -> dict:
with open(file) as config_file:
config: dict = json.load(config_file)
return config
@staticmethod
def write_json_file(values: dict, file: str):
with open(file, 'w') as file:
json.dump(values, file, indent = 4)
@staticmethod
def get_now_string() -> str:
L. Bower
committed
nowstring: str = datetime.datetime.today().strftime('%Y-%m-%d_%H%M%S')
@staticmethod
def unpack_zip(zip_to_unpack: str, out_file: str):
with ZipFile(zip_to_unpack) as zf: # open the zip file
for target_file in zf.namelist(): # check if the file exists in the archive
zf.extract(target_file, out_file)
@staticmethod
def count_files_in_wildcard(wildcard: str) -> int:
results = glob.glob(wildcard)
return len(results)
def check_file_not_empty(file_path: str) -> bool:
@staticmethod
def check_file_exists(file_path: str) -> bool:
return os.path.isfile(file_path)
@staticmethod
def check_wildcard_exists_and_not_empty(wildcard: str) -> bool:
    """
    True only when the wildcard matches at least one file and none of the
    matched files is empty; checking stops at the first empty file
    """
    matched_paths = glob.glob(wildcard)
    if not matched_paths:
        # nothing matched the pattern at all
        return False
    return all(IntegrationTestUtils.check_file_not_empty(matched_path)
               for matched_path in matched_paths)
@staticmethod
def check_file_exists_and_not_empty(file_path: str) -> bool:
    """
    Return True when *file_path* is an existing file that is not empty.

    The emptiness check is only attempted once the path is known to be an
    existing file, so a missing path gives False.
    """
    if not IntegrationTestUtils.check_file_exists(file_path):
        return False
    return IntegrationTestUtils.check_file_not_empty(file_path)
@staticmethod
def run_unittest_pipeline(component: str,
                          start_date: str,
                          **kwargs):
    """
    Run one Processor component with the settings stored on IntegrationTestUtils.

    component:  stored under 'component' in the arguments passed to run_Process
    start_date: stored under 'start_date'
    kwargs:     extra arguments; a key that matches a default replaces it

    A SystemExit raised by run_Process is swallowed.
    """
    # need EMAIL_CRED in the environment before we import Processor
    os.environ["EMAIL_CRED"] = IntegrationTestUtils.EMAIL_CRED_PATH

    from Processor import run_Process, set_log_level

    # defaults first; anything supplied through kwargs replaces them
    pipeline_args: dict = {
        'live': False,
        'noupload': True,
        'start_date': start_date,
        'component': component,
        'config_paths': [IntegrationTestUtils.TEMP_CONFIG_FILE_NAME],
        'log_level': 'info',
    }
    pipeline_args.update(kwargs)

    set_log_level(pipeline_args['log_level'])

    try:
        run_Process(pipeline_args)
    except SystemExit:
        # we will eventually want to throw these to the calling class to be dealt with
        pass
@staticmethod
def run_external_pipeline(component: str,
                          start_date: str,
                          email_cred_path: str,
                          **kwargs):
    """
    Run one Processor component using an explicitly supplied credentials file.

    component:       stored under 'component' in the arguments passed to run_Process
    start_date:      stored under 'start_date'
    email_cred_path: exported as the EMAIL_CRED environment variable
    kwargs:          extra arguments; a key that matches a default replaces it

    Processor is reloaded on every call so that it is imported again after
    EMAIL_CRED has been set. A SystemExit raised by run_Process is swallowed.
    """
    # need EMAIL_CRED in the environment before we import Processor
    os.environ["EMAIL_CRED"] = email_cred_path

    import Processor
    reload(Processor)
    from Processor import run_Process, set_log_level

    args_dict: dict = {}

    # note, possible to override these values in the kwargs loop below
    args_dict['live'] = False
    args_dict['noupload'] = True
    args_dict['start_date'] = start_date
    args_dict['component'] = component
    args_dict['config_paths'] = [IntegrationTestUtils.TEMP_CONFIG_FILE_NAME]
    # same default as run_unittest_pipeline; without it the lookup below raised KeyError
    # whenever the caller did not pass log_level
    args_dict['log_level'] = 'info'
    args_dict['clearup'] = True

    for key, value in kwargs.items():
        args_dict[key] = value

    log_level = args_dict['log_level']
    set_log_level(log_level)

    try:
        run_Process(args_dict)
    except SystemExit:
        # we will eventually want to throw these to the calling class to be dealt with
        pass
@staticmethod
def get_day_before_as_string(input_date_string: str) -> str:
date_format = "%Y%m%d"
input_date: datetime = datetime.datetime.strptime(input_date_string, date_format)
yesterday_date = input_date - datetime.timedelta(days = 1)
yesterday_string = yesterday_date.strftime(date_format)
return yesterday_string
@staticmethod
def generate_run_date(run_date_type: str,
custom_run_date: str) -> str:
if run_date_type == "today":
print("today")
today = datetime.date.today()
today_string = today.strftime("%Y%m%d")
result = today_string
elif run_date_type == "yesterday":
print("yesterday")
today = datetime.date.today()
yesterday: datetime = today - datetime.timedelta(days = 1)
yesterday_string = yesterday.strftime("%Y%m%d")
result = yesterday_string
elif run_date_type == "custom":
result = custom_run_date
else:
print("default")
result = "20230126"
return result