"""
Utility functions which should have been part of the Google API client.
"""
import logging
from time import sleep

from googleapiclient.errors import HttpError

LOG = logging.getLogger(__name__)


def list_all(list_cb, *, page_size=500, retries=2, retry_delay=5, items_key='items', **kwargs):
    """
    Simple wrapper for Google Client SDK list()-style callables. Repeatedly fetches pages of
    results, merging all the responses together. Returns the merged "items" arrays from the
    responses. The key used to get the "items" array from each response may be overridden via
    the items_key argument.
    """
    # Loop until a response arrives with no nextPageToken
    page_token = None
    resources = []
    while True:
        try:
            list_response = list_cb(
                pageToken=page_token, maxResults=page_size, **kwargs).execute()
        except HttpError as err:
            if err.resp.status >= 400 and retries > 0:
                retries -= 1
                LOG.warning(
                    'Error response: %s %s - retrying', err.resp.status, err.resp.reason)
                sleep(retry_delay)
                continue
            LOG.error(
                'Error response: %s %s - retry count exceeded', err.resp.status,
                err.resp.reason)
            LOG.error('Error content: %r', err.content)
            raise
        resources.extend(list_response.get(items_key, []))
        # Get the token for the next page
        page_token = list_response.get('nextPageToken')
        if page_token is None:
            break
    return resources
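

# A minimal usage sketch (not part of the original module) showing list_all()
# against the Admin SDK Directory API. It assumes "directory_service" was built
# with googleapiclient.discovery.build('admin', 'directory_v1', ...) and is
# suitably authorised; "my_customer" is the API's alias for the caller's own
# account. users().list returns its items under the "users" key, so items_key
# is overridden accordingly.
def _example_list_all_users(directory_service):
    """Return every user in the domain as a list of user resources."""
    return list_all(
        directory_service.users().list, customer='my_customer', items_key='users')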


def list_all_in_list(directory_service, list_cb, *, item_ids=(), id_key='key', batch_size=1000,
                     page_size=500, retries=2, retry_delay=5, items_key='items', **kwargs):
    """
    Wrapper for Google Client SDK list()-style callables, operating on a list of items. Invokes
    the "list_cb" Google API method for each item in the "item_ids" list, repeatedly fetching
    pages of results for each item and merging them together. The key used to identify the
    original items in Google is specified by the "id_key" argument. Returns a dictionary mapping
    the original item IDs to the merged "items" arrays from the responses for each item.

    This is equivalent to calling list_all() for each item in the "item_ids" list, and collecting
    all the results in a dictionary, except that it uses the Google batch processing API to
    reduce the number of API calls. The arguments from "page_size" onwards work in the same way
    as for list_all().
    """
    # Invoke list_cb for each item in the item_ids list, using the batch processing API. The
    # Directory API supports a maximum batch size of 1000. See:
    # https://developers.google.com/admin-sdk/directory/v1/guides/batch
    resources = {item_id: [] for item_id in item_ids}
    for i in range(0, len(item_ids), batch_size):
        batch_item_ids = item_ids[i:i + batch_size]
        # New requests needed to get the next page of results for any item in this batch
        new_requests = {}

        # Batch response handler
        def handle_batch_response(item_id, response, exception):
            if exception:
                raise exception
            resources[item_id].extend(response.get(items_key, []))
            # Build a new request for the next page, if there is one
            page_token = response.get('nextPageToken')
            if page_token:
                request = list_cb(
                    pageToken=page_token, maxResults=page_size, **kwargs, **{id_key: item_id})
                new_requests[item_id] = request

        # Form the initial batch request, for the first page of results for each item
        batch = directory_service.new_batch_http_request()
        for item_id in batch_item_ids:
            request = list_cb(maxResults=page_size, **kwargs, **{id_key: item_id})
            batch.add(request, request_id=item_id, callback=handle_batch_response)
        # Loop while there are additional pages to be returned for any item in the batch
        while True:
            # Process the batch request, retrying if the service is temporarily unavailable
            while True:
                try:
                    batch.execute()
                except HttpError as err:
                    if err.resp.status == 503 and retries > 0:
                        retries -= 1
                        LOG.warning('503: Service unavailable - retrying')
                        sleep(retry_delay)
                        continue
                    if err.resp.status == 503:
                        LOG.error('503: Service unavailable - retry count exceeded')
                    raise
                break
            # Form a new batch for the next page of results for each item, if any
            if new_requests:
                batch = directory_service.new_batch_http_request()
                for item_id, request in new_requests.items():
                    batch.add(request, request_id=item_id, callback=handle_batch_response)
                new_requests.clear()
            else:
                break
    return resources
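

# A usage sketch for list_all_in_list(), again illustrative rather than
# definitive: the Directory API's members().list call identifies each group
# via its "groupKey" parameter and returns members under the "members" key.
# members().list accepts at most 200 results per page, hence the reduced
# page_size.
def _example_list_members_of_groups(directory_service, group_keys):
    """Return a dict mapping each group key to its merged list of members."""
    return list_all_in_list(
        directory_service, directory_service.members().list, item_ids=group_keys,
        id_key='groupKey', page_size=200, items_key='members')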


def get_all_in_list(directory_service, get_cb, *, item_ids=(), id_key='key', batch_size=1000,
                    retries=2, retry_delay=5, **kwargs):
    """
    Wrapper for Google Client SDK get()-style callables, operating on a list of items. Invokes
    the "get_cb" Google API method for each item in the "item_ids" list, returning a list of the
    fetched resources. The key used to identify the items in Google is specified by the "id_key"
    argument.

    This is equivalent to calling "get_cb" for each item in the "item_ids" list, and collecting
    all the results in a list, except that it uses the Google batch processing API to reduce the
    number of API calls.
    """
    # Invoke get_cb for each item in the item_ids list, using the batch processing API. The
    # Directory API supports a maximum batch size of 1000. See:
    # https://developers.google.com/admin-sdk/directory/v1/guides/batch
    resources = []

    # Batch response handler
    def handle_batch_response(request_id, response, exception):
        if exception:
            raise exception
        resources.append(response)

    for i in range(0, len(item_ids), batch_size):
        batch_item_ids = item_ids[i:i + batch_size]
        # Form the batch request
        batch = directory_service.new_batch_http_request()
        for item_id in batch_item_ids:
            request = get_cb(**kwargs, **{id_key: item_id})
            batch.add(request, callback=handle_batch_response)
        # Process the batch request, retrying on error responses
        while True:
            try:
                batch.execute()
            except HttpError as err:
                if err.resp.status >= 400 and retries > 0:
                    retries -= 1
                    LOG.warning(
                        'Error response: %s %s - retrying', err.resp.status, err.resp.reason)
                    sleep(retry_delay)
                    continue
                LOG.error(
                    'Error response: %s %s - retry count exceeded', err.resp.status,
                    err.resp.reason)
                LOG.error('Error content: %r', err.content)
                raise
            break
    return resources
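

# A final sketch for get_all_in_list(): the Directory API's users().get call
# identifies each user via the "userKey" parameter, which accepts a primary
# email address or a user ID. The helper below is illustrative only.
def _example_get_users(directory_service, user_emails):
    """Return the full user resource for each of the given email addresses."""
    return get_all_in_list(
        directory_service, directory_service.users().get, item_ids=user_emails,
        id_key='userKey')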