"""
Synchronise Google Directory with a local LDAP directory.
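The top-level entry point is sync(), which takes a configuration mapping that
has already been parsed from the deployment's settings. A minimal, illustrative
invocation (read_only=True is the default and performs no writes):

    sync(configuration, read_only=True)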
"""
import crypt
import dataclasses
import itertools
import logging
import numbers
import re
import secrets
import time
import typing
from googleapiclient import discovery, errors
from . import config
from . import gapiauth
from . import gapidomain
from . import gapiutil
from . import ldap
from . import limits
from . import naming
LOG = logging.getLogger(__name__)
# Scopes required to perform read-only actions.
READ_ONLY_SCOPES = [
'https://www.googleapis.com/auth/admin.directory.user.readonly',
]
# Scopes required *in addition to READ_ONLY_SCOPES* to perform a full update.
WRITE_SCOPES = [
'https://www.googleapis.com/auth/admin.directory.user',
]
@dataclasses.dataclass
class Configuration(config.ConfigurationDataclassMixin):
# A regular expression used to match the organization unit path of Google users who should be
# excluded from the list returned by Google. Such users are treated as non-existent for the
# purposes of the rest of the sync, so if they appeared in the list of managed users this
# script would attempt to re-add them and fail in the process. Use this setting for users who
# are managed completely outside of this script.
ignore_google_org_unit_path_regex: typing.Union[str, None] = None
# The organization unit path in which new accounts are placed
new_user_org_unit_path: str = '/'
# Inter-batch delay in seconds. This is useful to avoid hitting Google rate limits.
inter_batch_delay: numbers.Real = 5
# Batch size for Google API calls. Google supports batching requests together into one API
# call.
batch_size: int = 50
# Number of times to retry HTTP requests if a 503 "Service Unavailable" response is received
http_retries: int = 2
# Delay in seconds between HTTP 503 response retries
http_retry_delay: numbers.Real = 5
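# An illustrative mapping for the 'sync' configuration section (as consumed via
# Configuration.from_dict in sync() below). The values shown are simply the
# defaults listed above, not requirements:
#
#     {
#         'ignore_google_org_unit_path_regex': None,
#         'new_user_org_unit_path': '/',
#         'inter_batch_delay': 5,
#         'batch_size': 50,
#         'http_retries': 2,
#         'http_retry_delay': 5,
#     }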
def sync(configuration, *, read_only=True):
"""Perform sync given configuration dictionary."""
if read_only:
LOG.info('Performing synchronisation in READ ONLY mode.')
else:
LOG.info('Performing synchronisation in WRITE mode.')
# Parse configuration
sync_config = Configuration.from_dict(configuration.get('sync', {}))
gapi_auth_config = gapiauth.Configuration.from_dict(
configuration.get('google_api', {}).get('auth', {}))
gapi_domain_config = gapidomain.Configuration.from_dict(
configuration.get('google_domain', {}))
ldap_config = ldap.Configuration.from_dict(configuration.get('ldap', {}))
limits_config = limits.Configuration.from_dict(configuration.get('limits', {}))
# Load appropriate Google credentials.
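# (The with_scopes/with_subject calls below assume service-account credentials with
# domain-wide delegation: the credentials are narrowed to the required scopes and then
# used to impersonate the domain admin user.)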
creds = (
gapi_auth_config.load_credentials(read_only=read_only)
.with_scopes(READ_ONLY_SCOPES + ([] if read_only else WRITE_SCOPES))
.with_subject(gapi_domain_config.admin_user)
)
# Get a set containing all CRSids. These are all the people who are eligible to be in our
# GSuite instance. If a user is in GSuite and is *not* present in this list then they are
# suspended.
LOG.info('Reading eligible user entries from LDAP')
eligible_uids = ldap_config.get_eligible_uids()
LOG.info('Total LDAP entries: %s', len(eligible_uids))
# Get a list of managed users. These are all the people who match the "managed_user_filter" in
# the LDAP settings.
LOG.info('Reading managed user entries from LDAP')
managed_user_entries = ldap_config.get_managed_user_entries()
# Form a mapping from uid to managed user.
managed_user_entries_by_uid = {u.uid: u for u in managed_user_entries}
# Form a set of all *managed user* uids
managed_user_uids = set(managed_user_entries_by_uid.keys())
LOG.info('Total managed user entries: %s', len(managed_user_uids))
# Sanity check: the managed users should be a subset of the eligible ones.
if len(managed_user_uids - eligible_uids) != 0:
raise RuntimeError('Sanity check failed: some managed uids were not in the eligible set')
# Build the directory service using Google API discovery.
directory_service = discovery.build('admin', 'directory_v1', credentials=creds)
# Retrieve information on all users excluding domain admins.
LOG.info('Getting information on Google domain users')
fields = [
'id', 'isAdmin', 'orgUnitPath', 'primaryEmail', 'suspended', 'suspensionReason',
'name(givenName, familyName)',
]
all_google_users = gapiutil.list_all(
directory_service.users().list, items_key='users', domain=gapi_domain_config.name,
query='isAdmin=false', fields='nextPageToken,users(' + ','.join(fields) + ')',
retries=sync_config.http_retries, retry_delay=sync_config.http_retry_delay,
)
# Strip any "to be ignored" users out of the results.
if sync_config.ignore_google_org_unit_path_regex is not None:
LOG.info(
'Ignoring users whose organization unit path matches %r',
sync_config.ignore_google_org_unit_path_regex)
# Check that all users have an orgUnitPath
missing_org = [
u for u in all_google_users if 'orgUnitPath' not in u
]
if len(missing_org) != 0:
LOG.error('User entries missing orgUnitPath: %s (starting with %s)',
len(missing_org),
missing_org[0]['primaryEmail'] if 'primaryEmail' in missing_org[0]
else 'user with blank email')
raise RuntimeError('Sanity check failed: at least one user is missing orgUnitPath')
# Remove users matching regex
regex = re.compile(sync_config.ignore_google_org_unit_path_regex)
all_google_users = [
u for u in all_google_users if not regex.match(u['orgUnitPath'])
]
# Sanity check. There should be no admins in the returned results.
if any(u.get('isAdmin', False) for u in all_google_users):
raise RuntimeError('Sanity check failed: admin users in user list')
# Form a mapping from uid to Google user. We form the uid by splitting out the local-part of
# the email address.
all_google_users_by_uid = {u['primaryEmail'].split('@')[0]: u for u in all_google_users}
# Form a set of all Google-side uids. The all_google_uids set is all users including the
# suspended ones and the suspended_google_uids set is only the suspended users. Non-suspended
# users are therefore all_google_uids - suspended_google_uids.
all_google_uids = set(all_google_users_by_uid.keys())
suspended_google_uids = {uid for uid, u in all_google_users_by_uid.items() if u['suspended']}
# Sanity check. We should not have lost anyone. (I.e. the uid should be unique.)
if len(all_google_uids) != len(all_google_users):
raise RuntimeError('Sanity check failed: user list changed length')
# Log some stats.
LOG.info('Total Google users: %s', len(all_google_uids))
LOG.info(
'Suspended Google users: %s', sum(1 if u['suspended'] else 0 for u in all_google_users))
# For each managed (and hence eligible) user, determine whether their Google account needs
# updating or creating. If so, record a patch or insert for the user.
LOG.info('Calculating updates...')
google_user_updates = {}
google_user_creations = {}
for uid, managed_user_entry in managed_user_entries_by_uid.items():
# Heuristically determine the given and family names.
names = naming.get_names(
uid=uid, display_name=managed_user_entry.displayName, cn=managed_user_entry.cn,
sn=managed_user_entry.sn)
# Form expected user resource fields.
expected_google_user = {
'name': {
'givenName': names.given_name,
'familyName': names.family_name,
},
}
# Find existing Google user (if any).
existing_google_user = all_google_users_by_uid.get(uid)
if existing_google_user is not None:
# See if we need to change the existing user
# Unless anything needs changing, the patch is empty.
patch = {}
# Determine how to patch user's name.
google_user_name = existing_google_user.get('name', {})
patch_name = {}
if google_user_name.get('givenName') != expected_google_user['name']['givenName']:
patch_name['givenName'] = names.given_name
if google_user_name.get('familyName') != expected_google_user['name']['familyName']:
patch_name['familyName'] = names.family_name
if len(patch_name) > 0:
patch['name'] = patch_name
# Only record non-empty patches.
if len(patch) > 0:
google_user_updates[uid] = patch
else:
# No existing Google user. Record the new resource. Generate a new user password and
# send Google the hash. It doesn't matter what this password is since we never have the
# user log in with it. For password-only applications the user can make use of an
# application-specific password.
new_user = {
'primaryEmail': f'{uid}@{gapi_domain_config.name}',
**expected_google_user,
}
google_user_creations[uid] = new_user
# Form a set of all the uids which need patching.
uids_to_update = set(google_user_updates.keys())
LOG.info('Number of existing users to update: %s', len(uids_to_update))
# Form a set of all the uids which need adding.
uids_to_add = set(google_user_creations.keys())
LOG.info('Number of users to add: %s', len(uids_to_add))
# Form a set of all uids which need reactivating. We reactivate users who are in the managed user
# list *and* the suspended user list.
uids_to_reactivate = suspended_google_uids & managed_user_uids
LOG.info('Number of users to reactivate: %s', len(uids_to_reactivate))
# Form a set of all uids which should be suspended. This is all the unsuspended Google uids
# which do not appear in our eligible user list.
uids_to_suspend = (all_google_uids - suspended_google_uids) - eligible_uids
LOG.info('Number of users to suspend: %s', len(uids_to_suspend))
# Calculate percentage change.
user_change_percentage = 100. * (
len(uids_to_add | uids_to_update | uids_to_reactivate | uids_to_suspend)
/
max(1, len(all_google_uids))
)
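# For example (illustrative figures, assuming the four sets above are disjoint): 10 additions,
# 5 updates and 5 suspensions against 1000 Google users gives 100 * 20 / 1000 = 2.00%.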
LOG.info('Configuration will modify %.2f%% of users', user_change_percentage)
# Enforce percentage change sanity check.
if (limits_config.abort_user_change_percentage is not None and
user_change_percentage > limits_config.abort_user_change_percentage):
LOG.error(
'Modification of %.2f%% of users is greater than limit of %.2f%%. Aborting.',
user_change_percentage, limits_config.abort_user_change_percentage
)
raise RuntimeError('Aborting due to large user change percentage')
# Cap maximum size of various operations.
if limits_config.max_new_users is not None and len(uids_to_add) > limits_config.max_new_users:
uids_to_add = _limit(uids_to_add, limits_config.max_new_users)
LOG.info('Capped number of new users to %s', len(uids_to_add))
if (limits_config.max_suspended_users is not None and
len(uids_to_suspend) > limits_config.max_suspended_users):
uids_to_suspend = _limit(uids_to_suspend, limits_config.max_suspended_users)
LOG.info('Capped number of users to suspend to %s', len(uids_to_suspend))
if (limits_config.max_reactivated_users is not None and
len(uids_to_reactivate) > limits_config.max_reactivated_users):
uids_to_reactivate = _limit(uids_to_reactivate, limits_config.max_reactivated_users)
LOG.info('Capped number of users to reactivate to %s', len(uids_to_reactivate))
if (limits_config.max_updated_users is not None and
len(uids_to_update) > limits_config.max_updated_users):
uids_to_update = _limit(uids_to_update, limits_config.max_updated_users)
LOG.info('Capped number of users to update to %s', len(uids_to_update))
# A generator which will generate patch() and insert() calls to the directory service to
# perform the actions required
def api_requests():
# Update existing users.
user_updates = {uid: google_user_updates[uid] for uid in uids_to_update}
for uid, update in user_updates.items():
google_id = all_google_users_by_uid[uid]['id']
LOG.info('Update user "%s": "%r"', uid, update)
yield directory_service.users().patch(userKey=google_id, body=update)
# Suspend old users
for uid in uids_to_suspend:
google_id = all_google_users_by_uid[uid]['id']
LOG.info('Suspending user: "%s"', uid)
yield directory_service.users().patch(userKey=google_id, body={'suspended': True})
# Reactivate returning users
for uid in uids_to_reactivate:
google_id = all_google_users_by_uid[uid]['id']
LOG.info('Reactivating user: "%s"', uid)
yield directory_service.users().patch(userKey=google_id, body={'suspended': False})
# Create new users
for uid in uids_to_add:
# Generate a random password which is thrown away.
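# (crypt.crypt(..., crypt.METHOD_SHA512) produces a standard '$6$<salt>$<hash>' string,
# which is what the 'hashFunction': 'crypt' field below tells the Directory API to expect.)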
new_user = {
'hashFunction': 'crypt',
'password': crypt.crypt(secrets.token_urlsafe(), crypt.METHOD_SHA512),
'orgUnitPath': sync_config.new_user_org_unit_path,
**google_user_creations[uid],
}
redacted_user = {**new_user, 'password': 'REDACTED'}
LOG.info('Adding user "%s": %s', uid, redacted_user)
yield directory_service.users().insert(body=new_user)
# Make a chunked iterator of requests to the directory API. The Directory API supports a
# maximum batch size of 1000. See:
# https://developers.google.com/admin-sdk/directory/v1/guides/batch
for request_batch in _grouper(api_requests(), n=sync_config.batch_size):
# Form batch request.
batch = directory_service.new_batch_http_request()
for request in request_batch:
batch.add(request, callback=_handle_batch_response)
# Execute the batch request if not in read-only mode. Otherwise log that we would have.
if not read_only:
LOG.info('Issuing batch request to Google.')
time.sleep(sync_config.inter_batch_delay)
retries = sync_config.http_retries
while True:
try:
batch.execute()
except errors.HttpError as err:
if (err.resp.status == 503 and retries > 0):
retries -= 1
LOG.warning('503: Service unavailable - retrying')
time.sleep(sync_config.http_retry_delay)
continue
if retries == 0:
LOG.error('503: Service unavailable - retry count exceeded')
raise
break
else:
LOG.info('Not issuing batch request in read-only mode.')
def _handle_batch_response(request_id, response, exception):
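"""Log any error reported for an individual request within a batch."""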
if exception is not None:
LOG.error('Error performing request: %s', exception)
LOG.error('Response: %r', response)
def _limit(s, limit):
"""
Given a set, s, and a numeric limit, return a set which has no more than *limit* elements. The
exact set of elements retained is not specified.
>>> s = set('ABCDEFGHIJKLMNOPQ')
>>> len(s) > 5
True
>>> len(_limit(s, 5)) == 5
True
>>> len(_limit(s, 500)) == len(s)
True
All elements of the returned set are taken from input set.
>>> s_prime = _limit(s, 5)
>>> s_prime - s
set()
"""
return {e for _, e in itertools.takewhile(lambda p: p[0] < limit, enumerate(s))}
def _grouper(iterable, *, n):
"""
Group an iterable into chunks of at most *n* elements. A generator which yields iterables
representing slices of *iterable*.
>>> [list(i) for i in _grouper('ABCDEFGH', n=3)]
[['A', 'B', 'C'], ['D', 'E', 'F'], ['G', 'H']]
>>> def generator(stop):
... for x in range(stop):
... yield x
>>> [list(i) for i in _grouper(generator(10), n=3)]
[[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]]
>>> [list(i) for i in _grouper(generator(12), n=3)]
[[0, 1, 2], [3, 4, 5], [6, 7, 8], [9, 10, 11]]
The implementation of this function attempts to be efficient; the chunks are iterables which
are generated on demand rather than being constructed first. Hence this function can deal with
iterables which would fill memory if intermediate chunks were stored.
>>> i = _grouper(generator(100000000000000000000), n=1000000000000000)
>>> next(next(i))
0
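Because all chunks share the same underlying iterator, each chunk should be consumed before
advancing to the next one; elements left unconsumed in a chunk will show up in the next chunk:
>>> chunks = _grouper('ABCDEF', n=3)
>>> first = next(chunks)
>>> next(first)
'A'
>>> second = next(chunks)
>>> list(second)
['B', 'C', 'D']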
"""
it = iter(iterable)
while True:
next_chunk_it = itertools.islice(it, n)
try:
first = next(next_chunk_it)
except StopIteration:
return
yield itertools.chain((first,), next_chunk_it)