#ProcessorScraper.py '''Functions to process the media scraper reports available from Asif Al Faisal's dashboard component. Downloads a csv file of news reports and reformats them as survey records to provide to the wheat rust early warning system. The format and content of the csv of news reports is based on ARRCC work by Asif Al Faisal (CIMMYT-Bangladesh).''' import datetime import json import logging import os from pathlib import Path import requests import smtplib import ssl import subprocess import certifi from numpy import where from pandas import concat, DataFrame, read_csv, Series, set_option # gitlab projects # TODO: Package these projects so they are robust for importing from flagdir import jobStatus # created by jws52 from ProcessorUtils import add_filters_to_sublogger logger = logging.getLogger('Processor.Scraper') add_filters_to_sublogger(logger) # date format conforms to format used in SURVEYDATA_MANUAL/LIVE_SURVEYDATA_TOUSE.csv # # ODK v1.11.2: # FMT = '%d-%b-%Y' # FMT_SHORT = '%d-%b-%Y' # ODK v1.18.0: FMT = '%b %d, %Y %H:%m:%S %p' FMT_SHORT = '%b %d, %Y' # url location of latest news report search URL_DEFAULT = 'http://arrcc-viewer.herokuapp.com/assets/sample_data/data.zip' def get_news_reports_from_url(job_dir: str, url = URL_DEFAULT) -> None: '''Downloads the news report data available on the ARRCC media report dashboard, into the provided directory. Does not return anything''' assert os.path.exists(job_dir) # Get the zip file from the url and immediately write a local copy fn_zip = f"{job_dir}/data.zip" with open(fn_zip,'wb') as zipfile: zipfile.write(requests.get(url).content) # unzip it dir_unzip = f"{job_dir}/data/" Path(dir_unzip).mkdir(parents=True, exist_ok=False) cmd_unzip = ['unzip',fn_zip,'-d',dir_unzip] subprocess.run(cmd_unzip) return def read_news_reports(job_dir: str) -> DataFrame: '''Opens the news reports in the provided directory. Returns a pandas dataframe.''' fn = f"{job_dir}/data/NEWS REPORTS.csv" dateparse = lambda x: datetime.datetime.strptime(x, '%d-%m-%y') df = read_csv( fn, index_col=0, header=0, parse_dates=['Date'], date_parser=dateparse) return df def estimate_rust( description: Series, disease: str, return_type: str) -> Series: '''Works with pandas series''' # check for alternative naming if disease in ['yellow','stripe']: any_disease = description.str.contains('yellow') | description.str.contains('stripe') else: any_disease = description.str.contains(disease) return_dict = { 'incidence':'medium', 'severity':'30' } prevalence = where(any_disease,return_dict[return_type],'na') return prevalence def guess_stage( date: Series, country: Series) -> Series: #TODO: provide typical phenology dates per country # all of the country # based on Moin's estimates from vegetative to ripening & maturing # note his Start Date and End Dates are always 30 days apart # so, this is sticking to the middle of the range stage_start_dates_bangladesh = { 'tillering':'5 Dec', # ~26 days { combined 30 days to complete 'boot' :'31 Dec', # ~4 days { 'heading' :'4 Jan', # 20 days 'flowering':'24 Jan', # 10 days 'milk' :'3 Feb', # 10 days 'dough' :'13 Feb', # ~15 days { combined 25 days to complete 'maturity' :'28 Mar', # ~10 days { 'NA' :'9 Mar'} # total ~95 days # main season wheat in Terai # based on Madan's estimates of min and max duration, taking the mean # from vegetative to ripening & maturing stage_start_dates_nepal_terai = { 'tillering':'24 Dec', # ~ 56 days { combined 66 days to complete 'boot' :'18 Feb', # ~10 days { 'heading' :'28 Feb', # 10 days 'flowering':'9 Mar', # 5 days 'milk' :'14 Mar', # ~12 days { combined 27 days to complete 'dough' :'26 Mar', # ~15 days { 'maturity' :'10 Apr', # 10 days 'NA' :'20 Apr'} # total ~118 days # TODO: Less important: implement main season wheat in mid-hills from Madan's estimates # and determine how to distinguish Terai from mid-hills in news report stage_start_dates_nepal_midhills = { 'tillering':'', 'boot' :'', 'heading' :'', 'flowering':'', 'milk' :'', 'dough' :'', 'maturity' :'', 'NA' :''} # mainly for the north # assume the same as Nepal Terai as for last year. # TODO: get estimates specific to Pakistan stage_start_dates_pakistan = stage_start_dates_nepal_terai # mainly for Haryana district # assume the same as Nepal Terai as for last year. # TODO: get estimates specific to India NW districts stage_start_dates_india = stage_start_dates_nepal_terai stage_start_dates_by_country = { 'Bangladesh' : stage_start_dates_bangladesh, 'Nepal' : stage_start_dates_nepal_terai, 'India' : stage_start_dates_india, 'Pakistan' : stage_start_dates_pakistan} df = DataFrame({'date':date,'country':country}) dates_by_entry = country.apply(lambda val: stage_start_dates_by_country[val]) df2 = DataFrame.from_records(dates_by_entry.values) df2.index = df.index df3 = concat([df,df2],axis='columns') # handle Dec-Jan crossover (is there a neater way of doing this?) df3['year'] = df3['date'].apply(lambda di: di.year) df3['lastyear'] = df3['date'].apply(lambda di: di.year-1) stages_thisyear = {coln:df3.apply(lambda row: datetime.datetime.strptime(f"{row[coln]} {row['year']}",'%d %b %Y'),axis='columns') for coln in stage_start_dates_bangladesh.keys()} stages_lastyear = {coln:df3.apply(lambda row: datetime.datetime.strptime(f"{row[coln]} {row['lastyear']}",'%d %b %Y'),axis='columns') for coln in stage_start_dates_bangladesh.keys()} df3_thisyear = DataFrame.from_records(stages_thisyear) df3_lastyear = DataFrame.from_records(stages_lastyear) # Use knowledge of order of phenological stages to determine which dates are from last year stage_order = ['tillering','boot','heading','flowering','milk','dough','maturity','NA'] df4 = df3_thisyear[stage_order] # check each stage in turn for i,stage in enumerate(stage_order[:-1]): stage_ip1 = stage_order[i+1] # if the earlier stage has a later date, use last year's dates df4[stage] = where(df4[stage]<df4[stage_ip1],df4[stage],df3_lastyear[stage]) # find out which stages start earlier than the survey date date_compare = df4.le(date,axis='rows') # get name of latest valid stage stage_series = date_compare.apply(lambda row: row.iloc[::-1].idxmax(),axis='columns') return stage_series def reformat_news_reports(df_in: DataFrame) -> DataFrame: '''Reformats a dataframe of news reports to match BGRI wheat rust survey data entries (making assumptions where necessary). First checks input is as expected.''' #Check contents are as expected cols = df_in.columns expected_cols = ['Lon','Lat','Date','Type','Link','Country','State','District'] for expected_col in expected_cols: assert expected_col in cols # re-order dataframe, with newest entry last df = df_in.copy() df.sort_values('Date',ascending=True,inplace=True) assumption_dict = { 'field_area' : 1} output_dict = { 'SubmissionDate' : df['Date'], 'start' : df['Date'], 'end' : df['Date'], 'today' : df['Date'], 'deviceid' : 999, 'subscriberid' : 999, 'imei' : 999, 'phonenumber' : 999, 'username' : 999, 'country_list' : df['Country'], 'blast_rust' : 'Rust', 'surveyor_name' : 'News report', 'institution' : 'na', 'mobile_num' : 999, 'site_information-survey_site' : 'Farmer field', 'site_information-crop' : 'NA', 'site_information-field_area' : assumption_dict['field_area'], 'site_information-unit_m2' : 999, 'site_information-field_size' : 999, 'site_information-variety' : 'NA', 'site_information-growth_stage' : guess_stage(df['Date'],df['Country']), 'survey_infromation-location_name' : 999, 'survey_infromation-location_blast' : 999, 'survey_infromation-sampColor' : 999, 'survey_infromation-dateRange' : 999, 'survey_infromation-fieldNumber' : 999, 'survey_infromation-diseaseIncidencePercentage' : 999, 'survey_infromation-severityPercentage' : 999, 'survey_infromation-survey_date' : df['Date'].apply(lambda cell: cell.strftime(FMT_SHORT)), 'survey_infromation-site_name' : '"'+df['District'].astype(str)+', '+df['State'].astype(str)+', '+df['Country'].astype(str)+'"', 'survey_infromation-location-Latitude' : df['Lat'], 'survey_infromation-location-Longitude' : df['Lon'], 'survey_infromation-location-Altitude' : -999, 'survey_infromation-location-Accuracy' : -999, 'stem_rust-stemrust_incidence' : estimate_rust(df['Type'],'stem','incidence'), 'stem_rust-Stemrust_severity' : estimate_rust(df['Type'],'stem','severity'), 'stem_rust-stemrust_host_plant_reaction' : 'na', 'leaf_rust-leafrust_incidence' : estimate_rust(df['Type'],'leaf','incidence'), 'leaf_rust-leafrust_severity' : estimate_rust(df['Type'],'leaf','severity'), 'leaf_rust-leafrust_host_plant_reaction' : 'na', 'yellow_rust-yellowrust_incidence' : estimate_rust(df['Type'],'yellow','incidence'), 'yellow_rust-yellowrust_severity' : estimate_rust(df['Type'],'yellow','severity'), 'yellow_rust-yellowrust_host_plant_reaction' : 'na', 'septoria-septoria_incidence' : 'na', 'septoria-septoria_severity' : 'na', 'other_diseases_group-other_diseases' : -999, 'score_diseases_count' : -999, 'SET-OF-score_diseases' : -999, 'samples_collected' : -999, 'samples_type' : -999, 'sample_size-number_stemrust_live' : -999, 'sample_size-number_stemrust_dead_dna' : -999, 'sample_size-number_yellowrust_live' : -999, 'sample_size-number_yellowrust_dead' : -999, 'sample_size-number_leafrust_live' : -999, 'sample_size-using_barcode' : -999, 'live_stemrust_samples_count' : -999, 'SET-OF-live_stemrust_samples' : -999, 'dead_stemrust_samples_count' : -999, 'SET-OF-dead_stemrust_samples' : -999, 'live_yellowrust_samples_count' : -999, 'SET-OF-live_yellowrust_samples' : -999, 'dead_yellowrust_samples_count' : -999, 'SET-OF-dead_yellowrust_samples' : -999, 'live_leafrust_samples_count' : -999, 'SET-OF-live_leafrust_samples' : -999, 'comment' : df['Link'], 'meta-instanceID' : -999, 'meta-instanceName' : -999, 'KEY' : -999} df_out = DataFrame(output_dict) return df_out EMAIL_MSG = """Subject: ARRCC latest scraped media reports Here is an update of what is on the ARRCC media scraper platform. The latest entry is below. The full set for this season, auto-formatted for input to NAME source calcs in a basic way, is available at: {0} Check all new webpages for validity and extra info (e.g. field area, variety), then edit and copy any relevant entries to: /storage/app/EWS_prod/regions/SouthAsia/resources/coordinator/assets/SURVEYDATA_MANUAL/LIVE_SURVEYDATA_TOUSE.csv Then, check that the survey data processor succeeds with these new entries. Thanks, Jake {1} """ def send_email( output_fn: str, data_str: str, email_credential_fn: str, ) -> None: msg = EMAIL_MSG.format(output_fn,data_str) with open(email_credential_fn,'r') as f: gmail_config = json.load(f) maintainers = gmail_config['toaddrs'] # Create a secure SSL context context = ssl.create_default_context(cafile=certifi.where()) # It is indicated that gmail requires port 465 for SMTP_SSL, otherwise port # 587 with .starttls() from # https://realpython.com/python-send-email/#sending-a-plain-text-email I # think port 587 is meant to make sense for the typical python logging # smtphandler but that doesn't apply here port = 465 # gmail_config['port'] with smtplib.SMTP_SSL(gmail_config['host'], port, context=context) as server: server.login(gmail_config['user'], gmail_config['pass']) server.sendmail(gmail_config['user'], maintainers, msg) logger.info('Message sent!') return def process_in_job_media_scraper( jobPath: str, status: jobStatus, config: dict, component: str = 'Scraper' ) -> None: """ 1) Get a latest copy of the news report data from dashboard URL. 2) TODO: Reformat to match BGRI wheat rust survey data entries . 3) Filter to latest news reports. 4) Output to csv. 5) email any new reports to maintainers, so they can copy into SURVEYDATA_MANUAL/LIVE_SURVEYDATA_TOUSE.csv where appropriate. """ config_scraper = config['Scraper'].copy() dateString = config['StartString'] logger.info("1) Getting a latest copy of the news report data from dashboard URL") url = config['Scraper']['URL'] get_news_reports_from_url(jobPath,url) reports_in = read_news_reports(jobPath) # TODO: Reformat logger.info("2) Reformat to match BGRI wheat rust survey data entries") # (making assumptions where necessary) reports = reformat_news_reports(reports_in) logger.info("3) Filter to latest news reports") this_season_starts_str = config['Scraper']['seasonStartString'] # '20201201' this_season_starts = datetime.datetime.strptime(this_season_starts_str,'%Y%m%d') latest_reports = reports[reports['SubmissionDate']>=this_season_starts] # TODO: Low priority: Determine differences from last reformatted set of news reports logger.info("4) Output to csv") output_fn = f"{jobPath}/latest_reports_as_proxy_surveys.csv" # date format conforms to format used in SURVEYDATA_MANUAL/LIVE_SURVEYDATA_TOUSE.csv latest_reports.to_csv( output_fn, index=False, date_format=FMT) is_sending_email = config['Scraper'].get('SendEmail',True) if is_sending_email == True: logger.info("5) email any new reports to maintainers, so they can copy into") logger.info("SURVEYDATA_MANUAL/LIVE_SURVEYDATA_TOUSE.csv where appropriate") selected_columns = [ 'SubmissionDate', 'site_information-field_area', 'site_information-growth_stage', 'survey_infromation-site_name', 'country_list', 'stem_rust-stemrust_incidence', 'stem_rust-Stemrust_severity', 'yellow_rust-yellowrust_incidence', 'yellow_rust-yellowrust_severity', 'leaf_rust-leafrust_incidence', 'leaf_rust-leafrust_severity', 'comment'] # remove pandas display limitation, so full web address is shown from comment set_option('display.max_colwidth', None) latest_report_selection = latest_reports.iloc[-1,:].loc[selected_columns].__str__() # get the email credentials file path from the environment variables assert 'EMAIL_CRED' in os.environ email_credential_fn = os.environ['EMAIL_CRED'] assert os.path.exists(email_credential_fn) send_email( output_fn, latest_report_selection, email_credential_fn = email_credential_fn) proc_out = {} # Output files available for upload proc_out['output'] = None # Processing files available for clearing proc_out['clearup'] = None return proc_out