FAQ | This is a LIVE service | Changelog

Skip to content
Snippets Groups Projects
full_test_deposition.py 2.55 KiB
Newer Older
import copy
import os
import sys
L. Bower's avatar
L. Bower committed
from ews.coordinator.processor_deposition import ProcessorDeposition
from integration.partial.integration_test_utils import IntegrationTestUtils
from integration.test_suites.depo_test_suite import BaseDepoTestSuite
class FullTestDeposition(BaseDepoTestSuite.DepoTestSuite):
L. Bower's avatar
L. Bower committed
    def set_expected_values(self):
        """Set the expected-count attributes and make sure pipeline output exists.

        Assigns the EA/ETH CSV and PNG count attributes on this instance
        (presumably compared against the generated files by the base suite --
        confirm there). The deposition pipeline is then run only when no output
        path is configured or the configured directory is empty; otherwise the
        existing output is reused and a note is printed.
        """
        self.EA_CSV_COUNT = 9
        self.ETH_CSV_COUNT = 9
        self.EA_PNG_COUNT = 3
        self.ETH_PNG_COUNT = 3

        path = IntegrationTestUtils.TEST_OUT_PATH
        if path is None or not os.listdir(path):
            FullTestDeposition.write_temp_run_config_file()
            FullTestDeposition.run_depo_pipeline()
        else:
            # Report a skipped rerun only when the pipeline really was skipped.
            print(f"output in {path} already written, skipping rerun")


    @staticmethod
    def write_temp_run_config_file():
        """Write a temporary run config that targets the test output directory.

        Loads the default config JSON, overrides ``WorkspacePathout`` and
        ``WorkspacePath`` with ``IntegrationTestUtils.TEST_OUT_PATH``, creates
        that directory if it does not exist, and writes the result to
        ``temp_config.json`` inside it. The path of the written file is stored
        on ``IntegrationTestUtils.TEMP_CONFIG_FILE_NAME``.
        """
        default_config = IntegrationTestUtils.DEFAULT_CONFIG_FILE_PATH
        default_config_dict: dict = IntegrationTestUtils.load_json_file(default_config)

        # Work on a copy so the loaded default config object is left untouched.
        run_dict: dict = copy.deepcopy(default_config_dict)
        out_path = IntegrationTestUtils.TEST_OUT_PATH
        run_dict['WorkspacePathout'] = out_path
        run_dict['WorkspacePath'] = out_path

        os.makedirs(out_path, exist_ok=True)
        # os.path.join yields the same path as plain concatenation when out_path
        # ends with a separator, and still lands inside the directory when it
        # does not.
        IntegrationTestUtils.TEMP_CONFIG_FILE_NAME = os.path.join(out_path, "temp_config.json")
        IntegrationTestUtils.write_json_file(run_dict, IntegrationTestUtils.TEMP_CONFIG_FILE_NAME)


    @staticmethod
    def run_depo_pipeline():
        depo_processor = ProcessorDeposition()
        IntegrationTestUtils.run_external_pipeline(BaseDepoTestSuite.DepoTestSuite.DEPO_COMPONENT_NAME,
L. Bower's avatar
L. Bower committed
                                                   BaseDepoTestSuite.DepoTestSuite.DEPO_PROCESSOR_DIR,
                                                   IntegrationTestUtils.TEST_START_DATE,


if __name__ == '__main__':
    # Script entry point: run the full deposition integration suite and turn a
    # failed run into a non-zero exit status for the calling shell/CI job.
    _success: bool = IntegrationTestUtils.run_full_integration_test_pipeline(
        FullTestDeposition,
        test_prefix="deposition",
        processor_dir=BaseDepoTestSuite.DepoTestSuite.DEPO_PROCESSOR_DIR,
    )
    if not _success:
        sys.exit(1)