FAQ | This is a LIVE service | Changelog

Skip to content
Snippets Groups Projects
Commit f1288ca5 authored by L. Bower's avatar L. Bower
Browse files

adding basic survey tests

parent 9cf51cc8
No related branches found
No related tags found
No related merge requests found
......@@ -30,7 +30,6 @@ dependencies:
- conda-content-trust=0.1.1=pyhd3eb1b0_0
- conda-package-handling=1.7.3=py39h27cfd23_1
- contextily=1.0.1=py_0
- coverage=6.5.0=pypi_0
- cryptography=36.0.0=py39h9ce1e76_0
- curl=7.78.0=hea6ffbf_0
- cycler=0.11.0=pyhd8ed1ab_0
......
......@@ -414,14 +414,17 @@ def get_kobotoolbox_form_as_csv(form_credentials: dict, jobPath: str, config: di
download_success = True
try:
skip_download: bool = config['Survey'].get('SkipServerDownload', False)
request = get_from_kobotoolbox(**form_credentials)
if not skip_download:
try:
except requests.exceptions.RequestException as e:
status.reset('WARNING')
request = get_from_kobotoolbox(**form_credentials)
download_success = False
except requests.exceptions.RequestException as e:
status.reset('WARNING')
download_success = False
# define filenames
csv_filename = f"SurveyData_{form_credentials['form_id']}.csv"
......@@ -429,7 +432,7 @@ def get_kobotoolbox_form_as_csv(form_credentials: dict, jobPath: str, config: di
csv_processed_filename = f"SurveyDataProcessed.csv"
csv_processed_path = f"{output_path}/{csv_processed_filename}"
if download_success:
if download_success and not skip_download:
# parse dataframe
dataframe_raw = build_dataframe(request)
......@@ -448,9 +451,9 @@ def get_kobotoolbox_form_as_csv(form_credentials: dict, jobPath: str, config: di
dataframe_processed.to_csv(csv_processed_path,index=False,quoting=csv.QUOTE_MINIMAL)
if not download_success:
if not download_success or skip_download:
logger.info("Because server download failed somewhere, trying to recover by copying recent download")
logger.info("Because server download failed somewhere (or we are skipping downloads), trying to recover by copying recent download")
copy_success = False
......@@ -681,20 +684,23 @@ def get_WRSIS_form_as_csv(form_credentials: dict, jobPath: str, config: dict, st
logger.debug(f'Performing download from WRSIS between {start_date} and {end_date}')
try:
request = get_from_WRSIS(form_credentials,start_date,end_date)
assert "response" in request.json().keys()
skip_download: bool = config['Survey'].get('SkipServerDownload', False)
if not skip_download:
try:
request = get_from_WRSIS(form_credentials,start_date,end_date)
assert "response" in request.json().keys()
except requests.exceptions.RequestException as e:
status.reset('WARNING')
except requests.exceptions.RequestException as e:
status.reset('WARNING')
download_success = False
download_success = False
except AssertionError:
logger.warning(f"WRSIS returned incomplete data:\n{request.json()}")
status.reset('WARNING')
except AssertionError:
logger.warning(f"WRSIS returned incomplete data:\n{request.json()}")
status.reset('WARNING')
download_success = False
download_success = False
# define filenames
csv_filename = f"SurveyData_raw.csv"
......@@ -702,7 +708,7 @@ def get_WRSIS_form_as_csv(form_credentials: dict, jobPath: str, config: dict, st
csv_processed_filename = f"SurveyDataProcessed.csv"
csv_processed_path = f"{output_path}/{csv_processed_filename}"
if download_success:
if download_success and not skip_download:
# parse dataframe
logger.debug('Saving raw csv file')
......@@ -722,9 +728,9 @@ def get_WRSIS_form_as_csv(form_credentials: dict, jobPath: str, config: dict, st
dataframe_processed.to_csv(csv_processed_path,index=False,quoting=csv.QUOTE_MINIMAL)
if not download_success:
if not download_success or skip_download:
logger.info("Because server download failed somewhere, trying to recover by copying recent download")
logger.info("Because server download failed somewhere (or we are skipping downloads), trying to recover by copying recent download")
copy_success = False
......
import glob
import json
import os
from datetime import datetime
from zipfile import ZipFile
......@@ -41,3 +42,8 @@ class IntegrationTestUtils:
def count_files_in_wildcard(wildcard: str) -> int:
    """Return the number of filesystem paths matching the glob pattern *wildcard*."""
    return len(glob.glob(wildcard))
@staticmethod
def check_file_not_empty(file_path: str):
return os.stat(file_path).st_size != 0
......@@ -44,7 +44,6 @@ class TestEnvSuit(unittest.TestCase):
os.environ["EMAIL_CRED"] = "../test_data/test_deployment/envs/Cred_gmail.json"
from Processor import run_Process, set_log_level
args_dict: dict = {}
args_dict['component'] = 'Environment'
args_dict['config_paths'] = [IntegrationTestUtils.TEMP_CONFIG_FILE_PATH]
......
......@@ -5,17 +5,37 @@ import unittest
from integration.integration_test_utils import IntegrationTestUtils
class TestEnvSuit(unittest.TestCase):
class TestSurvey(unittest.TestCase):
def setUp(self) -> None:
super().setUp()
default_config = '../test_data/test_deployment/regions/EastAfrica/resources/coordinator/configs/config_EastAfrica_fc_live.json'
self.default_config_dict: dict = IntegrationTestUtils.load_json_file(default_config)
TEST_OUT_PATH: str = "not_set"
TEST_RUN_DATE: str = '20221001'
TEST_JOB_DIR: str = "SURVEYDATA_" + TEST_RUN_DATE
def test_env_suit_standard_inputs_expected_results1(self):
@classmethod
def setUpClass(cls) -> None:
    # Run the (expensive) survey pipeline once for the whole test class:
    # first materialise the temporary run config, then execute the pipeline.
    # Individual test_* methods only assert on the produced outputs.
    TestSurvey.write_temp_run_config_file()
    TestSurvey.run_survey_pipeline()
@staticmethod
def write_temp_run_config_file():
    """Derive a test run config from the default config and write it out.

    Points the workspace paths at a unique per-run temp directory, forces
    ``Survey.SkipServerDownload`` so no live server is contacted, and writes
    the result to ``IntegrationTestUtils.TEMP_CONFIG_FILE_PATH`` for the
    pipeline run. Also records the output dir in ``TestSurvey.TEST_OUT_PATH``
    so the test methods can locate the results.
    """
    # Timestamped prefix gives each invocation its own workspace, so repeated
    # test runs do not collide with stale outputs.
    nowstring: str = IntegrationTestUtils.get_now_string()
    prefix: str = "temp_survey_" + nowstring
    default_config = IntegrationTestUtils.DEFAULT_CONFIG_FILE_PATH
    default_config_dict: dict = IntegrationTestUtils.load_json_file(default_config)
    # Deep copy so the cached default config dict is never mutated.
    run_dict: dict = copy.deepcopy(default_config_dict)
    TestSurvey.TEST_OUT_PATH = run_dict['WorkspacePathout'] + prefix + os.sep
    run_dict['WorkspacePathout'] = TestSurvey.TEST_OUT_PATH
    run_dict['WorkspacePath'] = TestSurvey.TEST_OUT_PATH
    # Use the pre-packaged example survey data instead of a server download.
    run_dict['Survey']['SkipServerDownload'] = True
    run_dict['ServerName'] = ''  # nothing, as local machine
    IntegrationTestUtils.write_json_file(run_dict, IntegrationTestUtils.TEMP_CONFIG_FILE_PATH)
@staticmethod
def run_survey_pipeline():
os.environ["EMAIL_CRED"] = "../test_data/test_deployment/envs/Cred_gmail.json"
from Processor import run_Process, set_log_level
......@@ -28,19 +48,22 @@ class TestEnvSuit(unittest.TestCase):
args_dict['noupload'] = True
set_log_level(args_dict['log_level'])
run_dict: dict = copy.deepcopy(self.default_config_dict)
test_out_path = run_dict['WorkspacePathout'] + "temp_survey_" + nowstring + os.sep
run_dict['WorkspacePathout'] = test_out_path
run_dict['WorkspacePath'] = test_out_path
run_dict['Survey']['SkipServerDownload'] = True
run_dict['ServerName'] = '' # nothing, as local machine
IntegrationTestUtils.unpack_zip(IntegrationTestUtils.EXAMPLE_SURVEY_FILE_PATH, TestSurvey.TEST_OUT_PATH)
IntegrationTestUtils.write_json_file(run_dict, IntegrationTestUtils.TEMP_CONFIG_FILE_PATH)
run_Process(args_dict)
IntegrationTestUtils.unpack_zip(IntegrationTestUtils.EXAMPLE_SURVEY_FILE_PATH, test_out_path)
def test_standard_run_input_status_success(self):
    """The pipeline run must leave a STATUS_SUCCESS marker file in the job dir."""
    expected_path = os.path.join(TestSurvey.TEST_OUT_PATH,
                                 TestSurvey.TEST_JOB_DIR,
                                 "STATUS_SUCCESS")
    marker_exists: bool = os.path.isfile(expected_path)
    self.assertTrue(marker_exists)
run_Process(args_dict)
self.assertTrue(True)
def test_standard_run_input_merged_survey_created(self):
    """The merged survey CSV must exist in ExportCSV and be non-empty."""
    merged_csv_path = os.path.join(TestSurvey.TEST_OUT_PATH, TestSurvey.TEST_JOB_DIR,
                                   "ExportCSV", "Merged_SurveyData.csv")
    # Evaluate both checks up front (matching the original's evaluation order:
    # a missing file errors out of check_file_not_empty before any assert).
    csv_exists: bool = os.path.isfile(merged_csv_path)
    csv_non_empty: bool = IntegrationTestUtils.check_file_not_empty(merged_csv_path)
    self.assertTrue(csv_exists)
    self.assertTrue(csv_non_empty)
if __name__ == '__main__':
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment