Newer
Older
import copy
import os
from integration.partial.integration_test_utils import IntegrationTestUtils
L. Bower
committed
from integration.test_suites.depo_test_suite import BaseDepoTestSuite
L. Bower
committed
class FullTestDeposition(BaseDepoTestSuite.DepoTestSuite):
def set_expected_values(self):
self.EA_CSV_COUNT = 9
self.ETH_CSV_COUNT = 9
self.EA_PNG_COUNT = 3
self.ETH_PNG_COUNT = 3
L. Bower
committed
def setUp(self) -> None:
self.set_expected_values()
if IntegrationTestUtils.TEST_OUT_PATH is None \
or not os.path.isdir(IntegrationTestUtils.TEST_OUT_PATH):
L. Bower
committed
FullTestDeposition.write_temp_run_config_file()
FullTestDeposition.run_depo_pipeline()
else:
print(f"output in {IntegrationTestUtils.TEST_OUT_PATH} already written, skipping rerun")
@staticmethod
def write_temp_run_config_file():
L. Bower
committed
default_config = IntegrationTestUtils.DEFAULT_CONFIG_FILE_PATH
default_config_dict: dict = IntegrationTestUtils.load_json_file(default_config)
run_dict: dict = copy.deepcopy(default_config_dict)
L. Bower
committed
# BaseDepoTestSuite.DepoTestSuite.TEST_OUT_PATH = IntegrationTestUtils.TEST_WORKSPACE_PATH + prefix + os.sep
run_dict['WorkspacePathout'] = IntegrationTestUtils.TEST_OUT_PATH
run_dict['WorkspacePath'] = IntegrationTestUtils.TEST_OUT_PATH
os.makedirs(IntegrationTestUtils.TEST_OUT_PATH, exist_ok = True)
IntegrationTestUtils.TEMP_CONFIG_FILE_NAME = IntegrationTestUtils.TEST_OUT_PATH + "temp_config.json"
L. Bower
committed
IntegrationTestUtils.write_json_file(run_dict, IntegrationTestUtils.TEMP_CONFIG_FILE_NAME)
@staticmethod
def run_depo_pipeline():
IntegrationTestUtils.run_external_pipeline(BaseDepoTestSuite.DepoTestSuite.DEPO_COMPONENT_NAME,
IntegrationTestUtils.EMAIL_CRED_PATH)
if __name__ == '__main__':
L. Bower
committed
_success: bool = IntegrationTestUtils.run_full_integration_test_pipeline(FullTestDeposition,
test_prefix = "deposition",
processor_dir = BaseDepoTestSuite.DepoTestSuite.DEPO_PROCESSOR_DIR)
if not _success:
exit(1)