FAQ | This is a LIVE service | Changelog

Skip to content
Snippets Groups Projects
full_test_survey.py 2.39 KiB
Newer Older
import copy
import os
import sys

L. Bower's avatar
L. Bower committed
from ews.coordinator.processor_surveys import ProcessorSurveys
from integration.partial.integration_test_utils import IntegrationTestUtils
from integration.test_suites.survey_test_suite import BaseSurveyTestSuite
class FullTestSurvey(BaseSurveyTestSuite.SurveyTestSuite):
    def set_expected_values(self):
        #  no values to set
        pass

    def setUp(self) -> None:

        self.set_expected_values()

        if IntegrationTestUtils.TEST_OUT_PATH is None \
                or not os.listdir(IntegrationTestUtils.TEST_OUT_PATH):
            FullTestSurvey.write_temp_run_config_file()
            FullTestSurvey.run_survey_pipeline()
            print(f"output in {IntegrationTestUtils.TEST_OUT_PATH} already written, skipping rerun")


    @staticmethod
    def write_temp_run_config_file():
        default_config = IntegrationTestUtils.DEFAULT_CONFIG_FILE_PATH
        default_config_dict: dict = IntegrationTestUtils.load_json_file(default_config)
        run_dict: dict = copy.deepcopy(default_config_dict)
        run_dict['WorkspacePathout'] = IntegrationTestUtils.TEST_OUT_PATH
        run_dict['WorkspacePath'] = IntegrationTestUtils.TEST_OUT_PATH
        run_dict['Survey']['SkipServerDownload'] = False

        os.makedirs(IntegrationTestUtils.TEST_OUT_PATH, exist_ok = True)
        IntegrationTestUtils.TEMP_CONFIG_FILE_NAME = IntegrationTestUtils.TEST_OUT_PATH + "temp_config.json"
        IntegrationTestUtils.write_json_file(run_dict, IntegrationTestUtils.TEMP_CONFIG_FILE_NAME)

    @staticmethod
    def run_survey_pipeline():
        survey_processor = ProcessorSurveys()
        IntegrationTestUtils.run_external_pipeline(BaseSurveyTestSuite.SurveyTestSuite.SURVEY_COMPONENT_NAME,
L. Bower's avatar
L. Bower committed
                                                    BaseSurveyTestSuite.SurveyTestSuite.SURVEY_PROCESSOR_DIR,
                                                   IntegrationTestUtils.TEST_START_DATE,


if __name__ == '__main__':
    _success: bool = IntegrationTestUtils.run_full_integration_test_pipeline(FullTestSurvey,
                                                                             test_prefix = "survey",
                                                                             processor_dir = BaseSurveyTestSuite.SurveyTestSuite.SURVEY_PROCESSOR_DIR)
    if not _success:
        sys.exit(1)