import copy import os from integration.partial.integration_test_utils import IntegrationTestUtils from integration.test_suites.depo_test_suite import BaseDepoTestSuite class FullTestDeposition(BaseDepoTestSuite.DepoTestSuite): def set_expected_values(self): self.EA_CSV_COUNT = 9 self.ETH_CSV_COUNT = 9 self.EA_PNG_COUNT = 3 self.ETH_PNG_COUNT = 3 def setUp(self) -> None: self.set_expected_values() if IntegrationTestUtils.TEST_OUT_PATH is None \ or not os.path.isdir(IntegrationTestUtils.TEST_OUT_PATH): FullTestDeposition.write_temp_run_config_file() FullTestDeposition.run_depo_pipeline() else: print(f"output in {IntegrationTestUtils.TEST_OUT_PATH} already written, skipping rerun") @staticmethod def write_temp_run_config_file(): default_config = IntegrationTestUtils.DEFAULT_CONFIG_FILE_PATH default_config_dict: dict = IntegrationTestUtils.load_json_file(default_config) run_dict: dict = copy.deepcopy(default_config_dict) # BaseDepoTestSuite.DepoTestSuite.TEST_OUT_PATH = IntegrationTestUtils.TEST_WORKSPACE_PATH + prefix + os.sep run_dict['WorkspacePathout'] = IntegrationTestUtils.TEST_OUT_PATH run_dict['WorkspacePath'] = IntegrationTestUtils.TEST_OUT_PATH os.makedirs(IntegrationTestUtils.TEST_OUT_PATH, exist_ok = True) IntegrationTestUtils.TEMP_CONFIG_FILE_NAME = IntegrationTestUtils.TEST_OUT_PATH + "temp_config.json" IntegrationTestUtils.write_json_file(run_dict, IntegrationTestUtils.TEMP_CONFIG_FILE_NAME) @staticmethod def run_depo_pipeline(): IntegrationTestUtils.run_external_pipeline(BaseDepoTestSuite.DepoTestSuite.DEPO_COMPONENT_NAME, IntegrationTestUtils.TEST_START_DATE, IntegrationTestUtils.EMAIL_CRED_PATH) if __name__ == '__main__': _success: bool = IntegrationTestUtils.run_full_integration_test_pipeline(FullTestDeposition, test_prefix = "deposition", processor_dir = BaseDepoTestSuite.DepoTestSuite.DEPO_PROCESSOR_DIR) if not _success: exit(1)