""" Utility functions which should have been part of the Google API client. """ import logging from googleapiclient.errors import HttpError from time import sleep LOG = logging.getLogger(__name__) def list_all(list_cb, *, page_size=500, retries=2, retry_delay=5, items_key='items', **kwargs): """ Simple wrapper for Google Client SDK list()-style callables. Repeatedly fetches pages of results merging all the responses together. Returns the merged "items" arrays from the responses. The key used to get the "items" array from the response may be overridden via the items_key argument. """ # Loop while we wait for nextPageToken to be "none" page_token = None resources = [] while True: try: list_response = list_cb(pageToken=page_token, maxResults=page_size, **kwargs).execute() except HttpError as err: if (err.resp.status >= 400 and retries > 0): retries -= 1 LOG.warn('Error response: %s %s - retrying', err.resp.status, err.resp.reason) sleep(retry_delay) continue if retries == 0: LOG.error( 'Error response: %s %s - retry count exceeded', err.resp.status, err.resp.reason ) LOG.error('Error content: %r', err.content) raise resources.extend(list_response.get(items_key, [])) # Get the token for the next page page_token = list_response.get('nextPageToken') if page_token is None: break return resources def list_all_in_list(directory_service, list_cb, *, item_ids=[], id_key='key', batch_size=1000, page_size=500, retries=2, retry_delay=5, items_key='items', **kwargs): """ Wrapper for Google Client SDK list()-style callables, operating on a list of items. Invokes the "list_cb" Google API method for each item in the "item_ids" list, repeatedly fetching pages of results for each item and merging them together. The key used to identify the original items in Google is specified by the "id_key" argument. Returns a dictionary mapping the orginal item IDs to the merged "items" arrays from the responses for each item. This is equivalent to calling list_all() for each item in the "item_ids" list, and collecting all the results in a dictionary, except that it uses the Google batch processing API to reduce the number of API calls. The arguments from "page_size" onwards work in the same way as list_all(). """ # Invoke list_cb for each item in the item_ids list, using the batch processing API. The # Directory API supports a maximum batch size of 1000. 


def list_all_in_list(directory_service, list_cb, *, item_ids=[], id_key='key', batch_size=1000,
                     page_size=500, retries=2, retry_delay=5, items_key='items', **kwargs):
    """
    Wrapper for Google Client SDK list()-style callables, operating on a list of items.

    Invokes the "list_cb" Google API method for each item in the "item_ids" list, repeatedly
    fetching pages of results for each item and merging them together. The key used to identify
    the original items in Google is specified by the "id_key" argument. Returns a dictionary
    mapping the original item IDs to the merged "items" arrays from the responses for each item.

    This is equivalent to calling list_all() for each item in the "item_ids" list, and collecting
    all the results in a dictionary, except that it uses the Google batch processing API to
    reduce the number of API calls. The arguments from "page_size" onwards work in the same way
    as list_all().

    """
    # Invoke list_cb for each item in the item_ids list, using the batch processing API. The
    # Directory API supports a maximum batch size of 1000. See:
    # https://developers.google.com/admin-sdk/directory/v1/guides/batch
    resources = {item_id: [] for item_id in item_ids}

    for i in range(0, len(item_ids), batch_size):
        batch_item_ids = item_ids[i:i+batch_size]

        # New requests needed to get the next page of results for any item in this batch
        new_requests = {}

        # Batch response handler
        def handle_batch_response(item_id, response, exception):
            if exception:
                raise exception
            resources[item_id].extend(response.get(items_key, []))
            # Build a new request for the next page, if there is one
            page_token = response.get('nextPageToken')
            if page_token:
                request = list_cb(
                    pageToken=page_token, maxResults=page_size, **kwargs, **{id_key: item_id}
                )
                new_requests[item_id] = request

        # Form the initial batch request, for the first page of results for each item
        batch = directory_service.new_batch_http_request()
        for item_id in batch_item_ids:
            request = list_cb(maxResults=page_size, **kwargs, **{id_key: item_id})
            batch.add(request, request_id=item_id, callback=handle_batch_response)

        # Loop while there are additional pages to be returned for any item in the batch
        while True:
            # Process the batch request, retrying on 503 responses
            while True:
                try:
                    batch.execute()
                except HttpError as err:
                    if err.resp.status == 503 and retries > 0:
                        retries -= 1
                        LOG.warning('503: Service unavailable - retrying')
                        sleep(retry_delay)
                        continue
                    if retries == 0:
                        LOG.error('503: Service unavailable - retry count exceeded')
                    raise
                break

            # Form a new batch for the next page of results for each item, if any
            if new_requests:
                batch = directory_service.new_batch_http_request()
                for item_id, request in new_requests.items():
                    batch.add(request, request_id=item_id, callback=handle_batch_response)
                new_requests.clear()
            else:
                break

    return resources


def get_all_in_list(directory_service, get_cb, *, item_ids=[], id_key='key', batch_size=1000,
                    retries=2, retry_delay=5, **kwargs):
    """
    Wrapper for Google Client SDK get()-style callables, operating on a list of items.

    Invokes the "get_cb" Google API method for each item in the "item_ids" list, returning a list
    of resources. The key used to identify the items in Google is specified by the "id_key"
    argument.

    This is equivalent to calling "get_cb" for each item in the "item_ids" list, and collecting
    all the results in a list, except that it uses the Google batch processing API to reduce the
    number of API calls.

    """
    # Invoke get_cb for each item in the item_ids list, using the batch processing API. The
    # Directory API supports a maximum batch size of 1000. See:
    # https://developers.google.com/admin-sdk/directory/v1/guides/batch
    resources = []

    # Batch response handler
    def handle_batch_response(request_id, response, exception):
        if exception:
            raise exception
        resources.append(response)

    for i in range(0, len(item_ids), batch_size):
        batch_item_ids = item_ids[i:i+batch_size]

        # Form the batch request
        batch = directory_service.new_batch_http_request()
        for item_id in batch_item_ids:
            request = get_cb(**kwargs, **{id_key: item_id})
            batch.add(request, callback=handle_batch_response)

        # Process the batch request, retrying on error responses
        while True:
            try:
                batch.execute()
            except HttpError as err:
                if err.resp.status >= 400 and retries > 0:
                    retries -= 1
                    LOG.warning('Error response: %s %s - retrying',
                                err.resp.status, err.resp.reason)
                    sleep(retry_delay)
                    continue
                if retries == 0:
                    LOG.error('Error response: %s %s - retry count exceeded',
                              err.resp.status, err.resp.reason)
                    LOG.error('Error content: %r', err.content)
                raise
            break

    return resources
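

# Example usage for the batch helpers (a sketch: it assumes 'directory_service' is an
# authenticated Admin SDK Directory service, 'group_keys' is a list of group email addresses,
# and that members.list returns its items under the 'members' key):
#
#   members_by_group = list_all_in_list(
#       directory_service, directory_service.members().list,
#       item_ids=group_keys, id_key='groupKey', items_key='members')
#
#   groups = get_all_in_list(
#       directory_service, directory_service.groups().get,
#       item_ids=group_keys, id_key='groupKey')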