FAQ | This is a LIVE service | Changelog

Skip to content
Snippets Groups Projects
gapiutil.py 7.17 KiB
Newer Older
Dr Rich Wareham's avatar
Dr Rich Wareham committed
"""
Utility functions which should have been part of the Google API client.

"""
Robin Goodall's avatar
Robin Goodall committed
import logging
from googleapiclient.errors import HttpError
from time import sleep
Robin Goodall's avatar
Robin Goodall committed
LOG = logging.getLogger(__name__)
Robin Goodall's avatar
Robin Goodall committed

def list_all(list_cb, *, page_size=500, retries=2, retry_delay=5, items_key='items',  **kwargs):
Dr Rich Wareham's avatar
Dr Rich Wareham committed
    """
    Simple wrapper for Google Client SDK list()-style callables. Repeatedly fetches pages of
    results merging all the responses together. Returns the merged "items" arrays from the
    responses. The key used to get the "items" array from the response may be overridden via the
    items_key argument.

    """
    # Loop while we wait for nextPageToken to be "none"
    page_token = None
    resources = []
    while True:
Robin Goodall's avatar
Robin Goodall committed
        try:
            list_response = list_cb(pageToken=page_token, maxResults=page_size, **kwargs).execute()
        except HttpError as err:
            if (err.resp.status >= 400 and retries > 0):
Robin Goodall's avatar
Robin Goodall committed
                retries -= 1
                LOG.warn('Error response: %s %s - retrying', err.resp.status, err.resp.reason)
Robin Goodall's avatar
Robin Goodall committed
                sleep(retry_delay)
                continue
            if retries == 0:
                LOG.error(
                    'Error response: %s %s - retry count exceeded', err.resp.status,
                    err.resp.reason
                )
                LOG.error('Error content: %r', err.content)
Robin Goodall's avatar
Robin Goodall committed
            raise
Dr Rich Wareham's avatar
Dr Rich Wareham committed
        resources.extend(list_response.get(items_key, []))

        # Get the token for the next page
        page_token = list_response.get('nextPageToken')
        if page_token is None:
            break

    return resources


def list_all_in_list(directory_service, list_cb, *, item_ids=[], id_key='key', batch_size=1000,
                     page_size=500, retries=2, retry_delay=5, items_key='items',  **kwargs):
    """
    Wrapper for Google Client SDK list()-style callables, operating on a list of items. Invokes
    the "list_cb" Google API method for each item in the "item_ids" list, repeatedly fetching
    pages of results for each item and merging them together. The key used to identify the
    original items in Google is specified by the "id_key" argument. Returns a dictionary mapping
    the orginal item IDs to the merged "items" arrays from the responses for each item.

    This is equivalent to calling list_all() for each item in the "item_ids" list, and collecting
    all the results in a dictionary, except that it uses the Google batch processing API to reduce
    the number of API calls. The arguments from "page_size" onwards work in the same way as
    list_all().

    """
    # Invoke list_cb for each item in the item_ids list, using the batch processing API. The
    # Directory API supports a maximum batch size of 1000. See:
    # https://developers.google.com/admin-sdk/directory/v1/guides/batch
    resources = {item_id: [] for item_id in item_ids}
    for i in range(0, len(item_ids), batch_size):
        batch_item_ids = item_ids[i:i+batch_size]

        # New requests needed to get the next page of results for any item in this batch
        new_requests = {}

        # Batch response handler
        def handle_batch_response(item_id, response, exception):
            if exception:
                raise exception
            resources[item_id].extend(response.get(items_key, []))

            # Build a new request for the next page, if there is one
            page_token = response.get('nextPageToken')
            if page_token:
                request = list_cb(
                    pageToken=page_token, maxResults=page_size, **kwargs, **{id_key: item_id}
                )
                new_requests[item_id] = request

        # Form the initial batch request, for the first page of results for each item
        batch = directory_service.new_batch_http_request()
        for item_id in batch_item_ids:
            request = list_cb(maxResults=page_size, **kwargs, **{id_key: item_id})
            batch.add(request, request_id=item_id, callback=handle_batch_response)

        # Loop while there are additional pages to be returned for any item in the batch
        while True:
            # Process the batch request
            while True:
                try:
                    batch.execute()
                except HttpError as err:
                    if (err.resp.status == 503 and retries > 0):
                        retries -= 1
                        LOG.warn('503: Service unavailable - retrying')
                        sleep(retry_delay)
                        continue
                    if retries == 0:
                        LOG.error('503: Service unavailable - retry count exceeded')
                    raise
                break

            # Form a new batch for the next page of results for each item, if any
            if new_requests:
                batch = directory_service.new_batch_http_request()
                for item_id, request in new_requests.items():
                    batch.add(request, request_id=item_id, callback=handle_batch_response)
                new_requests.clear()
            else:
                break

    return resources


def get_all_in_list(directory_service, get_cb, *, item_ids=[], id_key='key', batch_size=1000,
                    retries=2, retry_delay=5, **kwargs):
    """
    Wrapper for Google Client SDK get()-style callables, operating on a list of items. Invokes the
    "get_cb" Google API method for each item in the "item_ids" list, returning a list resources.
    The key used to identify the items in Google is specified by the "id_key" argument.

    This is equivalent to calling "get_cb" for each item in the "item_ids" list, and collecting all
    the results in a list, except that it uses the Google batch processing API to reduce the number
    of API calls.

    """
    # Invoke get_cb for each item in the item_ids list, using the batch processing API. The
    # Directory API supports a maximum batch size of 1000. See:
    # https://developers.google.com/admin-sdk/directory/v1/guides/batch
    resources = []

    # Batch response handler
    def handle_batch_response(request_id, response, exception):
        if exception:
            raise exception
        resources.append(response)

    for i in range(0, len(item_ids), batch_size):
        batch_item_ids = item_ids[i:i+batch_size]

        # Form the batch request
        batch = directory_service.new_batch_http_request()
        for item_id in batch_item_ids:
            request = get_cb(**kwargs, **{id_key: item_id})
            batch.add(request, callback=handle_batch_response)

        # Process the batch request
        while True:
            try:
                batch.execute()
            except HttpError as err:
                if (err.resp.status >= 400 and retries > 0):
                    retries -= 1
                    LOG.warn('Error response: %s %s - retrying', err.resp.status, err.resp.reason)
                    sleep(retry_delay)
                    continue
                if retries == 0:
                    LOG.error(
                        'Error response: %s %s - retry count exceeded', err.resp.status,
                        err.resp.reason
                    )
                    LOG.error('Error content: %r', err.content)
                raise
            break

    return resources