import copy import os import sys from ews.coordinator.processor_deposition import ProcessorDeposition from ews.coordinator.processor_environment import ProcessorEnvironment from ews.coordinator.processor_epidemiology import ProcessorEpidemiology from integration.partial.integration_test_utils import IntegrationTestUtils from integration.test_suites.depo_test_suite import BaseDepoTestSuite from integration.test_suites.env_suit_test_suite import BaseEnvSuitTestSuite from integration.test_suites.epi_test_suite import BaseEpiTestSuite class FullTestEpi(BaseEpiTestSuite.EpiTestSuite): def set_expected_values(self): # nothing to override here pass def setUp(self) -> None: self.set_expected_values() if IntegrationTestUtils.TEST_OUT_PATH is None \ or not os.listdir(IntegrationTestUtils.TEST_OUT_PATH): # if True: FullTestEpi.write_temp_run_config_files() FullTestEpi.run_dependent_pipelines() FullTestEpi.run_epi_pipeline() else: print(f"output in {IntegrationTestUtils.TEST_OUT_PATH} already written, skipping rerun") @staticmethod def write_temp_run_config_files(): default_config = IntegrationTestUtils.DEFAULT_CONFIG_FILE_PATH default_config_dict: dict = IntegrationTestUtils.load_json_file(default_config) run_dict: dict = copy.deepcopy(default_config_dict) run_dict['WorkspacePathout'] = IntegrationTestUtils.TEST_OUT_PATH run_dict['WorkspacePath'] = IntegrationTestUtils.TEST_OUT_PATH # we need to run the env suit pipeline as well run_dict['Environment']['WORK_PATH'] = IntegrationTestUtils.TEST_OUT_PATH run_dict['Environment']['INPUT_PATH'] = IntegrationTestUtils.TEST_OUT_PATH run_dict['Environment']['OUTPUT_PATH'] = IntegrationTestUtils.TEST_OUT_PATH #make sure we activate server download run_dict['Survey']['SkipServerDownload'] = False # change the config for the Epi so it only runs on a single past date. Requires use of a different # FileListerFunction and a different calculation span run_dict['Epidemiology']['CalculationSpanDays'] = [0, 1] run_dict['Epidemiology']['Deposition']['FileListerFunction'] = "list_onefile_operational" run_dict['Epidemiology']['Environment']['FileListerFunction'] = "list_onefile_operational" """ When set to true, continue mode speeds up computation time and reduces data volume by starting from the start of model formulations. This requires each epidemiological model formulation described under 'Epi' to have an inputs specified alongside Deposition and Environment. In this test version, we do not have prior runs to draw from, so setting to false """ run_dict['Epidemiology']['continue'] = False previous_day_string: str = IntegrationTestUtils.get_day_before_as_string(IntegrationTestUtils.TEST_START_DATE) run_dict['Survey']['SeasonStartString'] = previous_day_string # may be reusing a non-timestamped output file during development, so allow extant TEST_OUT_PATH os.makedirs(IntegrationTestUtils.TEST_OUT_PATH, exist_ok = True) IntegrationTestUtils.TEMP_CONFIG_FILE_NAME = IntegrationTestUtils.TEST_OUT_PATH + "temp_config.json" IntegrationTestUtils.write_json_file(run_dict, IntegrationTestUtils.TEMP_CONFIG_FILE_NAME) @staticmethod def run_dependent_pipelines(): depo_processor = ProcessorDeposition() IntegrationTestUtils.run_external_pipeline(BaseDepoTestSuite.DepoTestSuite.DEPO_COMPONENT_NAME, BaseDepoTestSuite.DepoTestSuite.DEPO_PROCESSOR_DIR, IntegrationTestUtils.TEST_START_DATE, depo_processor) env_processor = ProcessorEnvironment() IntegrationTestUtils.run_external_pipeline(BaseEnvSuitTestSuite.EnvSuitTestSuite.ENV_COMPONENT_NAME, BaseEnvSuitTestSuite.EnvSuitTestSuite.ENV_PROCESSOR_DIR, IntegrationTestUtils.TEST_START_DATE, env_processor) pass @staticmethod def run_epi_pipeline(): epi_processor = ProcessorEpidemiology() IntegrationTestUtils.run_external_pipeline(BaseEpiTestSuite.EpiTestSuite.EPI_COMPONENT_NAME, BaseEpiTestSuite.EpiTestSuite.EPI_PROCESSOR_DIR, IntegrationTestUtils.TEST_START_DATE, epi_processor) if __name__ == '__main__': _success: bool = IntegrationTestUtils.run_full_integration_test_pipeline(FullTestEpi, test_prefix = "epi", processor_dir = BaseEpiTestSuite.EpiTestSuite.EPI_PROCESSOR_DIR) if not _success: sys.exit(1)