FAQ | This is a LIVE service | Changelog

Skip to content
Snippets Groups Projects
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
full_test_deposition.py 2.44 KiB
import copy
import os
import sys

from coordinator.ProcessorDeposition import ProcessorDeposition
from integration.partial.integration_test_utils import IntegrationTestUtils
from integration.test_suites.depo_test_suite import BaseDepoTestSuite


class FullTestDeposition(BaseDepoTestSuite.DepoTestSuite):
    """Full integration test case for the deposition component.

    ``setUp`` records the expected output-file counts and, when the test
    output directory holds no content yet, writes a temporary run config and
    runs the deposition pipeline once to produce it.  No test methods are
    defined here; they are presumably inherited from
    ``BaseDepoTestSuite.DepoTestSuite`` (confirm against that class).
    """

    def set_expected_values(self) -> None:
        """Store the expected CSV and PNG output counts on the instance."""
        # presumably compared against the pipeline output by the inherited
        # test methods -- TODO confirm in BaseDepoTestSuite
        self.EA_CSV_COUNT = 9
        self.ETH_CSV_COUNT = 9
        self.EA_PNG_COUNT = 3
        self.ETH_PNG_COUNT = 3

    def setUp(self) -> None:
        """Set expected values and make sure pipeline output is available.

        The pipeline is (re)run when ``IntegrationTestUtils.TEST_OUT_PATH`` is
        ``None``, is not an existing directory, or is an empty directory.
        Otherwise the output already on disk is reused and a message is
        printed.
        """
        self.set_expected_values()

        path = IntegrationTestUtils.TEST_OUT_PATH
        # os.listdir() raises FileNotFoundError for a missing directory, so the
        # existence check must come first; a missing directory simply means
        # the pipeline has not produced any output yet.
        # NOTE(review): when path is None the config writer below will fail in
        # os.makedirs(None) -- confirm TEST_OUT_PATH is always set beforehand.
        if path is None or not os.path.isdir(path) or not os.listdir(path):
            FullTestDeposition.write_temp_run_config_file()
            FullTestDeposition.run_depo_pipeline()
        else:
            print(f"output in {IntegrationTestUtils.TEST_OUT_PATH} already written, skipping rerun")

    @staticmethod
    def write_temp_run_config_file() -> None:
        """Write a temporary run config derived from the default config file.

        Loads the default config JSON, points both ``'WorkspacePathout'`` and
        ``'WorkspacePath'`` at ``IntegrationTestUtils.TEST_OUT_PATH``, creates
        that directory if needed, and writes the result to
        ``temp_config.json`` inside it.  The path of the written file is
        stored in ``IntegrationTestUtils.TEMP_CONFIG_FILE_NAME``.
        """
        default_config = IntegrationTestUtils.DEFAULT_CONFIG_FILE_PATH
        default_config_dict: dict = IntegrationTestUtils.load_json_file(default_config)
        # work on a copy so the loaded default config object is left untouched
        run_dict: dict = copy.deepcopy(default_config_dict)

        out_path = IntegrationTestUtils.TEST_OUT_PATH
        run_dict['WorkspacePathout'] = out_path
        run_dict['WorkspacePath'] = out_path

        os.makedirs(out_path, exist_ok=True)
        # os.path.join places the file inside out_path whether or not the
        # path ends with a separator (plain "+" relied on a trailing one).
        IntegrationTestUtils.TEMP_CONFIG_FILE_NAME = os.path.join(out_path, "temp_config.json")
        IntegrationTestUtils.write_json_file(run_dict, IntegrationTestUtils.TEMP_CONFIG_FILE_NAME)

    @staticmethod
    def run_depo_pipeline() -> None:
        """Run the deposition component through the external pipeline runner.

        Creates a ``ProcessorDeposition`` and passes it, together with the
        deposition component name and the test start date, to
        ``IntegrationTestUtils.run_external_pipeline``.
        """
        depo_processor = ProcessorDeposition()
        IntegrationTestUtils.run_external_pipeline(
            BaseDepoTestSuite.DepoTestSuite.DEPO_COMPONENT_NAME,
            IntegrationTestUtils.TEST_START_DATE,
            depo_processor,
        )


if __name__ == '__main__':
    # Script entry point: run the full integration test pipeline for the
    # deposition suite and report failure through the process exit status.
    _success: bool = IntegrationTestUtils.run_full_integration_test_pipeline(
        FullTestDeposition,
        test_prefix="deposition",
        processor_dir=BaseDepoTestSuite.DepoTestSuite.DEPO_PROCESSOR_DIR,
    )
    # A falsy result exits with status 1; otherwise the script ends normally.
    if not _success:
        sys.exit(1)