From ee80d2a19b61f9810cb4f0df85f85983bcbc3ae0 Mon Sep 17 00:00:00 2001 From: lb584 <lb584@cam.ac.uk> Date: Thu, 11 Jul 2024 12:20:51 +0100 Subject: [PATCH] removing processor_scraper.py, media scraper now lives in its own gitlab repo https://gitlab.developers.cam.ac.uk/gilligan-epid/wheat-rusts/media-scraper --- ews/coordinator/processor_scraper.py | 468 --------------------------- 1 file changed, 468 deletions(-) delete mode 100644 ews/coordinator/processor_scraper.py diff --git a/ews/coordinator/processor_scraper.py b/ews/coordinator/processor_scraper.py deleted file mode 100644 index 41f8243..0000000 --- a/ews/coordinator/processor_scraper.py +++ /dev/null @@ -1,468 +0,0 @@ -#ProcessorScraper.py -'''Functions to process the media scraper reports available from Asif Al -Faisal's dashboard component. - -Downloads a csv file of news reports and reformats them as survey records to -provide to the wheat rust early warning system. - -The format and content of the csv of news reports is based on ARRCC work by -Asif Al Faisal (CIMMYT-Bangladesh).''' - -import datetime -import json -import os -from pathlib import Path -import requests -import smtplib -import ssl -import subprocess - -import certifi -from numpy import where -from pandas import concat, DataFrame, read_csv, Series, set_option - -from processor_base import ProcessorBase -# gitlab projects -# TODO: Package these projects so they are robust for importing - -from flagdir import jobStatus # created by jws52 - - -class ProcessorScraper(ProcessorBase): - - """ LIFECYCLE FUNCTIONS INHERITED FROM PROCESSOR.PY """ - - def process_pre_job(self, args): - return True - - def process_in_job(self, jobPath, status, configjson, component) -> object: - return self.process_in_job_media_scraper(jobPath, status, configjson, component) - - def process_post_job(self, jobPath, configjson): - pass - - def __init__(self) -> None: - super().__init__() - - - """ LIFECYCLE FUNCTIONS INHERITED FROM PROCESSOR.PY """ - - # date format conforms to format used in SURVEYDATA_MANUAL/LIVE_SURVEYDATA_TOUSE.csv - # # ODK v1.11.2: - # FMT = '%d-%b-%Y' - # FMT_SHORT = '%d-%b-%Y' - # ODK v1.18.0: - FMT = '%b %d, %Y %H:%m:%S %p' - FMT_SHORT = '%b %d, %Y' - - # url location of latest news report search - URL_DEFAULT = 'http://arrcc-viewer.herokuapp.com/assets/sample_data/data.zip' - - def get_news_reports_from_url(self, job_dir: str, url = URL_DEFAULT) -> None: - '''Downloads the news report data available on the ARRCC media report - dashboard, into the provided directory. - - Does not return anything''' - - assert os.path.exists(job_dir) - - # Get the zip file from the url and immediately write a local copy - fn_zip = f"{job_dir}/data.zip" - with open(fn_zip,'wb') as zipfile: - zipfile.write(requests.get(url).content) - - # unzip it - dir_unzip = f"{job_dir}/data/" - Path(dir_unzip).mkdir(parents=True, exist_ok=False) - cmd_unzip = ['unzip',fn_zip,'-d',dir_unzip] - subprocess.run(cmd_unzip) - - return - - def read_news_reports(self, job_dir: str) -> DataFrame: - '''Opens the news reports in the provided directory. - - Returns a pandas dataframe.''' - - fn = f"{job_dir}/data/NEWS REPORTS.csv" - - dateparse = lambda x: datetime.datetime.strptime(x, '%d-%m-%y') - - df = read_csv( - fn, - index_col=0, - header=0, - parse_dates=['Date'], - date_parser=dateparse) - - return df - - def estimate_rust( - self, - description: Series, - disease: str, - return_type: str) -> Series: - '''Works with pandas series''' - - # check for alternative naming - if disease in ['yellow','stripe']: - - any_disease = description.str.contains('yellow') | description.str.contains('stripe') - - else: - any_disease = description.str.contains(disease) - - return_dict = { - 'incidence':'medium', - 'severity':'30' - } - - prevalence = where(any_disease,return_dict[return_type],'na') - - return prevalence - - def guess_stage( - self, - date: Series, - country: Series) -> Series: - - #TODO: provide typical phenology dates per country - - # all of the country - # based on Moin's estimates from vegetative to ripening & maturing - # note his Start Date and End Dates are always 30 days apart - # so, this is sticking to the middle of the range - stage_start_dates_bangladesh = { - 'tillering':'5 Dec', # ~26 days { combined 30 days to complete - 'boot' :'31 Dec', # ~4 days { - 'heading' :'4 Jan', # 20 days - 'flowering':'24 Jan', # 10 days - 'milk' :'3 Feb', # 10 days - 'dough' :'13 Feb', # ~15 days { combined 25 days to complete - 'maturity' :'28 Mar', # ~10 days { - 'NA' :'9 Mar'} # total ~95 days - - # main season wheat in Terai - # based on Madan's estimates of min and max duration, taking the mean - # from vegetative to ripening & maturing - stage_start_dates_nepal_terai = { - 'tillering':'24 Dec', # ~ 56 days { combined 66 days to complete - 'boot' :'18 Feb', # ~10 days { - 'heading' :'28 Feb', # 10 days - 'flowering':'9 Mar', # 5 days - 'milk' :'14 Mar', # ~12 days { combined 27 days to complete - 'dough' :'26 Mar', # ~15 days { - 'maturity' :'10 Apr', # 10 days - 'NA' :'20 Apr'} # total ~118 days - - # TODO: Less important: implement main season wheat in mid-hills from Madan's estimates - # and determine how to distinguish Terai from mid-hills in news report - stage_start_dates_nepal_midhills = { - 'tillering':'', - 'boot' :'', - 'heading' :'', - 'flowering':'', - 'milk' :'', - 'dough' :'', - 'maturity' :'', - 'NA' :''} - - - # mainly for the north - # assume the same as Nepal Terai as for last year. - # TODO: get estimates specific to Pakistan - stage_start_dates_pakistan = stage_start_dates_nepal_terai - - # mainly for Haryana district - # assume the same as Nepal Terai as for last year. - # TODO: get estimates specific to India NW districts - stage_start_dates_india = stage_start_dates_nepal_terai - - stage_start_dates_by_country = { - 'Bangladesh' : stage_start_dates_bangladesh, - 'Nepal' : stage_start_dates_nepal_terai, - 'India' : stage_start_dates_india, - 'Pakistan' : stage_start_dates_pakistan} - - df = DataFrame({'date':date,'country':country}) - dates_by_entry = country.apply(lambda val: stage_start_dates_by_country[val]) - df2 = DataFrame.from_records(dates_by_entry.values) - df2.index = df.index - df3 = concat([df,df2],axis='columns') - - # handle Dec-Jan crossover (is there a neater way of doing this?) - - df3['year'] = df3['date'].apply(lambda di: di.year) - df3['lastyear'] = df3['date'].apply(lambda di: di.year-1) - - stages_thisyear = {coln:df3.apply(lambda row: datetime.datetime.strptime(f"{row[coln]} {row['year']}",'%d %b %Y'),axis='columns') for coln in stage_start_dates_bangladesh.keys()} - stages_lastyear = {coln:df3.apply(lambda row: datetime.datetime.strptime(f"{row[coln]} {row['lastyear']}",'%d %b %Y'),axis='columns') for coln in stage_start_dates_bangladesh.keys()} - df3_thisyear = DataFrame.from_records(stages_thisyear) - df3_lastyear = DataFrame.from_records(stages_lastyear) - - # Use knowledge of order of phenological stages to determine which dates are from last year - stage_order = ['tillering','boot','heading','flowering','milk','dough','maturity','NA'] - - df4 = df3_thisyear[stage_order] - - # check each stage in turn - for i,stage in enumerate(stage_order[:-1]): - stage_ip1 = stage_order[i+1] - # if the earlier stage has a later date, use last year's dates - df4[stage] = where(df4[stage]<df4[stage_ip1],df4[stage],df3_lastyear[stage]) - - # find out which stages start earlier than the survey date - date_compare = df4.le(date,axis='rows') - - # get name of latest valid stage - stage_series = date_compare.apply(lambda row: row.iloc[::-1].idxmax(),axis='columns') - - return stage_series - - def reformat_news_reports(self, df_in: DataFrame) -> DataFrame: - '''Reformats a dataframe of news reports to match BGRI wheat rust survey - data entries (making assumptions where necessary). First checks input is as - expected.''' - - #Check contents are as expected - cols = df_in.columns - expected_cols = ['Lon','Lat','Date','Type','Link','Country','State','District'] - - for expected_col in expected_cols: - assert expected_col in cols - - # re-order dataframe, with newest entry last - df = df_in.copy() - df.sort_values('Date',ascending=True,inplace=True) - - assumption_dict = { - 'field_area' : 1} - - output_dict = { - 'SubmissionDate' : df['Date'], - 'start' : df['Date'], - 'end' : df['Date'], - 'today' : df['Date'], - 'deviceid' : 999, - 'subscriberid' : 999, - 'imei' : 999, - 'phonenumber' : 999, - 'username' : 999, - 'country_list' : df['Country'], - 'blast_rust' : 'Rust', - 'surveyor_name' : 'News report', - 'institution' : 'na', - 'mobile_num' : 999, - 'site_information-survey_site' : 'Farmer field', - 'site_information-crop' : 'NA', - 'site_information-field_area' : assumption_dict['field_area'], - 'site_information-unit_m2' : 999, - 'site_information-field_size' : 999, - 'site_information-variety' : 'NA', - 'site_information-growth_stage' : self.guess_stage(df['Date'],df['Country']), - 'survey_infromation-location_name' : 999, - 'survey_infromation-location_blast' : 999, - 'survey_infromation-sampColor' : 999, - 'survey_infromation-dateRange' : 999, - 'survey_infromation-fieldNumber' : 999, - 'survey_infromation-diseaseIncidencePercentage' : 999, - 'survey_infromation-severityPercentage' : 999, - 'survey_infromation-survey_date' : df['Date'].apply(lambda cell: cell.strftime(self.FMT_SHORT)), - 'survey_infromation-site_name' : '"'+df['District'].astype(str)+', '+df['State'].astype(str)+', '+df['Country'].astype(str)+'"', - 'survey_infromation-location-Latitude' : df['Lat'], - 'survey_infromation-location-Longitude' : df['Lon'], - 'survey_infromation-location-Altitude' : -999, - 'survey_infromation-location-Accuracy' : -999, - 'stem_rust-stemrust_incidence' : self.estimate_rust(df['Type'],'stem','incidence'), - 'stem_rust-Stemrust_severity' : self.estimate_rust(df['Type'],'stem','severity'), - 'stem_rust-stemrust_host_plant_reaction' : 'na', - 'leaf_rust-leafrust_incidence' : self.estimate_rust(df['Type'],'leaf','incidence'), - 'leaf_rust-leafrust_severity' : self.estimate_rust(df['Type'],'leaf','severity'), - 'leaf_rust-leafrust_host_plant_reaction' : 'na', - 'yellow_rust-yellowrust_incidence' : self.estimate_rust(df['Type'],'yellow','incidence'), - 'yellow_rust-yellowrust_severity' : self.estimate_rust(df['Type'],'yellow','severity'), - 'yellow_rust-yellowrust_host_plant_reaction' : 'na', - 'septoria-septoria_incidence' : 'na', - 'septoria-septoria_severity' : 'na', - 'other_diseases_group-other_diseases' : -999, - 'score_diseases_count' : -999, - 'SET-OF-score_diseases' : -999, - 'samples_collected' : -999, - 'samples_type' : -999, - 'sample_size-number_stemrust_live' : -999, - 'sample_size-number_stemrust_dead_dna' : -999, - 'sample_size-number_yellowrust_live' : -999, - 'sample_size-number_yellowrust_dead' : -999, - 'sample_size-number_leafrust_live' : -999, - 'sample_size-using_barcode' : -999, - 'live_stemrust_samples_count' : -999, - 'SET-OF-live_stemrust_samples' : -999, - 'dead_stemrust_samples_count' : -999, - 'SET-OF-dead_stemrust_samples' : -999, - 'live_yellowrust_samples_count' : -999, - 'SET-OF-live_yellowrust_samples' : -999, - 'dead_yellowrust_samples_count' : -999, - 'SET-OF-dead_yellowrust_samples' : -999, - 'live_leafrust_samples_count' : -999, - 'SET-OF-live_leafrust_samples' : -999, - 'comment' : df['Link'], - 'meta-instanceID' : -999, - 'meta-instanceName' : -999, - 'KEY' : -999} - - df_out = DataFrame(output_dict) - - return df_out - - EMAIL_MSG = """Subject: ARRCC latest scraped media reports - - Here is an update of what is on the ARRCC media scraper platform. - - The latest entry is below. The full set for this season, auto-formatted for input to NAME source calcs in a basic way, is available at: - {0} - - Check all new webpages for validity and extra info (e.g. field area, variety), then edit and copy any relevant entries to: - /storage/app/EWS_prod/regions/SouthAsia/resources/coordinator/assets/SURVEYDATA_MANUAL/LIVE_SURVEYDATA_TOUSE.csv - - Then, check that the survey data processor succeeds with these new entries. - - Thanks, Jake - - {1} - """ - - def send_email( - self, - output_fn: str, - data_str: str, - email_credential_fn: str, - ) -> None: - - - msg = self.EMAIL_MSG.format(output_fn,data_str) - - with open(email_credential_fn,'r') as f: - gmail_config = json.load(f) - - maintainers = gmail_config['toaddrs'] - - # Create a secure SSL context - context = ssl.create_default_context(cafile=certifi.where()) - - # It is indicated that gmail requires port 465 for SMTP_SSL, otherwise port - # 587 with .starttls() from - # https://realpython.com/python-send-email/#sending-a-plain-text-email I - # think port 587 is meant to make sense for the typical python logging - # smtphandler but that doesn't apply here - port = 465 # gmail_config['port'] - - with smtplib.SMTP_SSL(gmail_config['host'], port, context=context) as server: - - server.login(gmail_config['user'], gmail_config['pass']) - - server.sendmail(gmail_config['user'], maintainers, msg) - - self.logger.info('Message sent!') - - return - - def process_in_job_media_scraper( - self, - jobPath: str, - status: jobStatus, - config: dict, - component: str = 'Scraper' - ) -> None: - """ - 1) Get a latest copy of the news report data from dashboard URL. - 2) TODO: Reformat to match BGRI wheat rust survey data entries . - 3) Filter to latest news reports. - 4) Output to csv. - 5) email any new reports to maintainers, so they can copy into - SURVEYDATA_MANUAL/LIVE_SURVEYDATA_TOUSE.csv where appropriate. - """ - - config_scraper = config['Scraper'].copy() - - dateString = config['StartString'] - - self.logger.info("1) Getting a latest copy of the news report data from dashboard URL") - - url = config['Scraper']['URL'] - - self.get_news_reports_from_url(jobPath,url) - - reports_in = self.read_news_reports(jobPath) - - # TODO: Reformat - self.logger.info("2) Reformat to match BGRI wheat rust survey data entries") - - # (making assumptions where necessary) - reports = self.reformat_news_reports(reports_in) - - self.logger.info("3) Filter to latest news reports") - - this_season_starts_str = config['Scraper']['seasonStartString'] # '20201201' - this_season_starts = datetime.datetime.strptime(this_season_starts_str,'%Y%m%d') - - latest_reports = reports[reports['SubmissionDate']>=this_season_starts] - - # TODO: Low priority: Determine differences from last reformatted set of news reports - - self.logger.info("4) Output to csv") - - output_fn = f"{jobPath}/latest_reports_as_proxy_surveys.csv" - - # date format conforms to format used in SURVEYDATA_MANUAL/LIVE_SURVEYDATA_TOUSE.csv - latest_reports.to_csv( - output_fn, - index=False, - date_format=self.FMT) - - is_sending_email = config['Scraper'].get('SendEmail',True) - - if is_sending_email == True: - - self.logger.info("5) email any new reports to maintainers, so they can copy into") - self.logger.info("SURVEYDATA_MANUAL/LIVE_SURVEYDATA_TOUSE.csv where appropriate") - - selected_columns = [ - 'SubmissionDate', - 'site_information-field_area', - 'site_information-growth_stage', - 'survey_infromation-site_name', - 'country_list', - 'stem_rust-stemrust_incidence', - 'stem_rust-Stemrust_severity', - 'yellow_rust-yellowrust_incidence', - 'yellow_rust-yellowrust_severity', - 'leaf_rust-leafrust_incidence', - 'leaf_rust-leafrust_severity', - 'comment'] - - # remove pandas display limitation, so full web address is shown from comment - set_option('display.max_colwidth', None) - latest_report_selection = latest_reports.iloc[-1,:].loc[selected_columns].__str__() - - # get the email credentials file path from the environment variables - assert 'EMAIL_CRED' in os.environ - email_credential_fn = os.environ['EMAIL_CRED'] - assert os.path.exists(email_credential_fn) - - self.send_email( - output_fn, - latest_report_selection, - email_credential_fn = email_credential_fn) - - proc_out = {} - # Output files available for upload - proc_out['output'] = None - # Processing files available for clearing - proc_out['clearup'] = None - - return proc_out - - -if __name__ == '__main__': - processor = ProcessorScraper() - processor.run_processor("Scraper", "SCRAPER") -- GitLab