Something went wrong on our end
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
full_test_deposition.py 2.44 KiB
import copy
import os
import sys
from coordinator.ProcessorDeposition import ProcessorDeposition
from integration.partial.integration_test_utils import IntegrationTestUtils
from integration.test_suites.depo_test_suite import BaseDepoTestSuite
class FullTestDeposition(BaseDepoTestSuite.DepoTestSuite):
def set_expected_values(self):
self.EA_CSV_COUNT = 9
self.ETH_CSV_COUNT = 9
self.EA_PNG_COUNT = 3
self.ETH_PNG_COUNT = 3
def setUp(self) -> None:
self.set_expected_values()
path = IntegrationTestUtils.TEST_OUT_PATH
if path is None \
or not os.listdir(IntegrationTestUtils.TEST_OUT_PATH):
FullTestDeposition.write_temp_run_config_file()
FullTestDeposition.run_depo_pipeline()
else:
print(f"output in {IntegrationTestUtils.TEST_OUT_PATH} already written, skipping rerun")
@staticmethod
def write_temp_run_config_file():
default_config = IntegrationTestUtils.DEFAULT_CONFIG_FILE_PATH
default_config_dict: dict = IntegrationTestUtils.load_json_file(default_config)
run_dict: dict = copy.deepcopy(default_config_dict)
# BaseDepoTestSuite.DepoTestSuite.TEST_OUT_PATH = IntegrationTestUtils.TEST_WORKSPACE_PATH + prefix + os.sep
run_dict['WorkspacePathout'] = IntegrationTestUtils.TEST_OUT_PATH
run_dict['WorkspacePath'] = IntegrationTestUtils.TEST_OUT_PATH
os.makedirs(IntegrationTestUtils.TEST_OUT_PATH, exist_ok = True)
IntegrationTestUtils.TEMP_CONFIG_FILE_NAME = IntegrationTestUtils.TEST_OUT_PATH + "temp_config.json"
IntegrationTestUtils.write_json_file(run_dict, IntegrationTestUtils.TEMP_CONFIG_FILE_NAME)
@staticmethod
def run_depo_pipeline():
depo_processor = ProcessorDeposition()
IntegrationTestUtils.run_external_pipeline(BaseDepoTestSuite.DepoTestSuite.DEPO_COMPONENT_NAME,
IntegrationTestUtils.TEST_START_DATE,
depo_processor)
if __name__ == '__main__':
_success: bool = IntegrationTestUtils.run_full_integration_test_pipeline(FullTestDeposition,
test_prefix = "deposition",
processor_dir = BaseDepoTestSuite.DepoTestSuite.DEPO_PROCESSOR_DIR)
if not _success:
sys.exit(1)