"""
Synchronise Google Directory with a local LDAP directory.
"""
import crypt
import dataclasses
import itertools
import logging
import numbers
import re
import secrets
import time
import typing

from googleapiclient import discovery, errors

from . import config
from . import gapiauth
from . import gapidomain
from . import gapiutil
from . import ldap
from . import limits
from . import naming
LOG = logging.getLogger(__name__)
# Scopes required to perform read-only actions.
READ_ONLY_SCOPES = [
'https://www.googleapis.com/auth/admin.directory.user.readonly',
'https://www.googleapis.com/auth/admin.directory.group.readonly',
'https://www.googleapis.com/auth/admin.directory.group.member.readonly',
    'https://www.googleapis.com/auth/apps.groups.settings'
]
# Scopes *in addition to READ_ONLY_SCOPES* required to perform a full update.
WRITE_SCOPES = [
'https://www.googleapis.com/auth/admin.directory.user',
'https://www.googleapis.com/auth/admin.directory.group',
'https://www.googleapis.com/auth/admin.directory.group.member'
]
@dataclasses.dataclass
class Configuration(config.ConfigurationDataclassMixin):
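    """
    Synchronisation-specific options, loaded from the "sync" section of the configuration.
    """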
    # A regular expression matched against the organization unit path of Google users; matching
    # users are excluded from the list returned by Google. Such users are treated as non-existent
    # by the rest of the sync, so if they appear in the list of managed users this script will
    # attempt to re-add them and fail in the process. Use this setting for users who are managed
    # entirely outside of this script.
ignore_google_org_unit_path_regex: typing.Union[str, None] = None
# The organization unit path in which new accounts are placed
new_user_org_unit_path: str = '/'
# Suffix appended to the names of groups created in Google. The Google group name will be
# "{groupName}{group_name_suffix}", where {groupName} is the Lookup group name.
group_name_suffix: str = ' from lookup.cam.ac.uk'
# Settings to be applied to groups in Google. These settings are applied to both new and
# existing groups imported from Lookup.
# See https://developers.google.com/admin-sdk/groups-settings/v1/reference/groups#json
group_settings: dict = dataclasses.field(default_factory=lambda: {
'whoCanJoin': 'INVITED_CAN_JOIN',
'whoCanViewMembership': 'ALL_IN_DOMAIN_CAN_VIEW',
'whoCanViewGroup': 'ALL_MEMBERS_CAN_VIEW',
'whoCanPostMessage': 'ALL_IN_DOMAIN_CAN_POST',
'allowWebPosting': 'false',
'messageModerationLevel': 'MODERATE_ALL_MESSAGES',
'includeInGlobalAddressList': 'false',
'whoCanLeaveGroup': 'NONE_CAN_LEAVE',
'whoCanContactOwner': 'ALL_MANAGERS_CAN_CONTACT',
'whoCanModerateMembers': 'OWNERS_ONLY',
'whoCanDiscoverGroup': 'ALL_IN_DOMAIN_CAN_DISCOVER',
})
# Inter-batch delay in seconds. This is useful to avoid hitting Google rate limits.
inter_batch_delay: numbers.Real = 5
# Batch size for Google API calls. Google supports batching requests together into one API
# call.
batch_size: int = 50
    # Number of times to retry HTTP requests if a 503 "Service Unavailable" response is received
http_retries: int = 2
# Delay in seconds between HTTP 503 response retries
http_retry_delay: numbers.Real = 5
def sync(configuration, *, read_only=True):
"""Perform sync given configuration dictionary."""
if read_only:
LOG.info('Performing synchronisation in READ ONLY mode.')
else:
LOG.info('Performing synchronisation in WRITE mode.')
# Parse configuration
sync_config = Configuration.from_dict(configuration.get('sync', {}))
gapi_auth_config = gapiauth.Configuration.from_dict(
configuration.get('google_api', {}).get('auth', {}))
gapi_domain_config = gapidomain.Configuration.from_dict(
configuration.get('google_domain', {}))
ldap_config = ldap.Configuration.from_dict(configuration.get('ldap', {}))
limits_config = limits.Configuration.from_dict(configuration.get('limits', {}))
# Load appropriate Google credentials.
creds = (
gapi_auth_config.load_credentials(read_only=read_only)
.with_scopes(READ_ONLY_SCOPES + ([] if read_only else WRITE_SCOPES))
.with_subject(gapi_domain_config.admin_user)
)
# Functions to translate the unique identifiers of users and groups in Lookup (uids and
# groupIDs) to and from the unique identifiers used in Google (email addresses).
#
# For users: {uid} <-> {uid}@{domain}
# For groups: {groupID} <-> {groupID}@{groups_domain}
#
# Additionally, valid uids (CRSids) match the regex [a-z][a-z0-9]{3,7}, and valid groupIDs
# match the regex [0-9]{6,8}.
user_email_regex = re.compile('^[a-z][a-z0-9]{3,7}@.*$')
group_email_regex = re.compile('^[0-9]{6,8}@.*$')
groups_domain = (
gapi_domain_config.groups_domain
if gapi_domain_config.groups_domain is not None
else gapi_domain_config.name
)
def uid_to_email(uid):
return f'{uid}@{gapi_domain_config.name}'
def email_to_uid(email):
return email.split('@')[0] if user_email_regex.match(email) else None
def groupID_to_email(groupID):
return f'{groupID}@{groups_domain}'
def email_to_groupID(email):
return email.split('@')[0] if group_email_regex.match(email) else None
# --------------------------------------------------------------------------------------------
# Load current user and group data from Lookup.
# --------------------------------------------------------------------------------------------
# Get a set containing all CRSids. These are all the people who are eligible to be in our
# GSuite instance. If a user is in GSuite and is *not* present in this list then they are
# suspended.
LOG.info('Reading eligible user entries from LDAP')
eligible_uids = ldap_config.get_eligible_uids()
LOG.info('Total LDAP user entries: %s', len(eligible_uids))
# Get a set containing all groupIDs. These are all the groups that are eligible to be in our
# GSuite instance. If a group is in GSuite and is *not* present in this list then it is
# deleted.
LOG.info('Reading eligible group entries from LDAP')
eligible_groupIDs = ldap_config.get_eligible_groupIDs()
LOG.info('Total LDAP group entries: %s', len(eligible_groupIDs))
# Get a list of managed users. These are all the people who match the "managed_user_filter" in
# the LDAP settings.
LOG.info('Reading managed user entries from LDAP')
managed_user_entries = ldap_config.get_managed_user_entries()
# Form a mapping from uid to managed user.
managed_user_entries_by_uid = {u.uid: u for u in managed_user_entries}
# Form a set of all *managed user* uids
managed_user_uids = set(managed_user_entries_by_uid.keys())
LOG.info('Total managed user entries: %s', len(managed_user_uids))
# Sanity check: the managed users should be a subset of the eligible ones.
if len(managed_user_uids - eligible_uids) != 0:
raise RuntimeError('Sanity check failed: some managed uids were not in the eligible set')
# Get a list of managed groups. These are all the groups that match the "managed_group_filter"
# in the LDAP settings.
LOG.info('Reading managed group entries from LDAP')
managed_group_entries = ldap_config.get_managed_group_entries()
# Form a mapping from groupID to managed group.
managed_group_entries_by_groupID = {g.groupID: g for g in managed_group_entries}
# Form a set of all *managed group* groupIDs
managed_group_groupIDs = set(managed_group_entries_by_groupID.keys())
LOG.info('Total managed group entries: %s', len(managed_group_groupIDs))
LOG.info('Total managed group members: %s', sum([len(g.uids) for g in managed_group_entries]))
# Sanity check: the managed groups should be a subset of the eligible ones.
if len(managed_group_groupIDs - eligible_groupIDs) != 0:
raise RuntimeError(
'Sanity check failed: some managed groupIDs were not in the eligible set'
)
# --------------------------------------------------------------------------------------------
# Load current user and group data from Google.
# --------------------------------------------------------------------------------------------
# Build the directory service using Google API discovery.
directory_service = discovery.build('admin', 'directory_v1', credentials=creds)
# Also build the groupssettings service, which is a parallel API to manage group settings
groupssettings_service = discovery.build('groupssettings', 'v1', credentials=creds)
# Retrieve information on all users excluding domain admins.
LOG.info('Getting information on Google domain users')
fields = [
'id', 'isAdmin', 'orgUnitPath', 'primaryEmail', 'suspended', 'suspensionReason',
'name(givenName, familyName)',
]
all_google_users = gapiutil.list_all(
directory_service.users().list, items_key='users', domain=gapi_domain_config.name,
query='isAdmin=false', fields='nextPageToken,users(' + ','.join(fields) + ')',
        retries=sync_config.http_retries, retry_delay=sync_config.http_retry_delay,
    )
# Retrieve information on all groups
LOG.info('Getting information on Google domain groups')
fields = ['id', 'email', 'name', 'description']
all_google_groups = gapiutil.list_all(
directory_service.groups().list, items_key='groups', domain=groups_domain,
fields='nextPageToken,groups(' + ','.join(fields) + ')',
retries=sync_config.http_retries, retry_delay=sync_config.http_retry_delay,
)
# Strip any "to be ignored" users out of the results.
if sync_config.ignore_google_org_unit_path_regex is not None:
LOG.info(
'Ignoring users whose organization unit path matches %r',
sync_config.ignore_google_org_unit_path_regex)
# Check that all users have an orgUnitPath
missing_org = [
u for u in all_google_users if 'orgUnitPath' not in u
]
if len(missing_org) != 0:
LOG.error('User entries missing orgUnitPath: %s (starting with %s)',
len(missing_org),
missing_org[0]['primaryEmail'] if 'primaryEmail' in missing_org[0]
else 'user with blank email')
raise RuntimeError('Sanity check failed: at least one user is missing orgUnitPath')
# Remove users matching regex
regex = re.compile(sync_config.ignore_google_org_unit_path_regex)
all_google_users = [
u for u in all_google_users if not regex.match(u['orgUnitPath'])
]
# Strip out any users with uids (extracted from the local-part of the email address) that
# aren't valid CRSids. These users can't have come from Lookup, and so should not be managed
# (suspended) by this script.
all_google_users = [u for u in all_google_users if email_to_uid(u['primaryEmail'])]
# Strip out any groups whose email addresses don't match the pattern for groups created from
# Lookup groupIDs, and which therefore should not be managed (deleted) by this script.
all_google_groups = [g for g in all_google_groups if email_to_groupID(g['email'])]
# Sanity check. There should be no admins in the returned results.
if any(u.get('isAdmin', False) for u in all_google_users):
raise RuntimeError('Sanity check failed: admin users in user list')
# Form mappings from uid/groupID to Google user/group.
all_google_users_by_uid = {email_to_uid(u['primaryEmail']): u for u in all_google_users}
all_google_groups_by_groupID = {email_to_groupID(g['email']): g for g in all_google_groups}
# Form sets of all Google-side uids and groupIDs. The all_google_uids set is all users
# including the suspended ones and the suspended_google_uids set is only the suspended users.
    # Non-suspended users are therefore all_google_uids - suspended_google_uids. Groups do not have
# any concept of being suspended.
all_google_uids = set(all_google_users_by_uid.keys())
all_google_groupIDs = set(all_google_groups_by_groupID.keys())
suspended_google_uids = {uid for uid, u in all_google_users_by_uid.items() if u['suspended']}
# Sanity check. We should not have lost anything. (I.e. the uids and groupIDs should be
# unique.)
if len(all_google_uids) != len(all_google_users):
raise RuntimeError('Sanity check failed: user list changed length')
if len(all_google_groupIDs) != len(all_google_groups):
raise RuntimeError('Sanity check failed: group list changed length')
# Retrieve all Google group settings.
fields = ['email', *[k for k in sync_config.group_settings.keys()]]
all_google_group_settings = gapiutil.get_all_in_list(
groupssettings_service, groupssettings_service.groups().get,
item_ids=[g['email'] for g in all_google_groups], id_key='groupUniqueId',
batch_size=sync_config.batch_size, fields=','.join(fields),
retries=sync_config.http_retries, retry_delay=sync_config.http_retry_delay,
)
# Form a mapping from groupID to Google group settings.
all_google_group_settings_by_groupID = {
email_to_groupID(g['email']): g for g in all_google_group_settings
}
    # Sanity check. We should have settings for each managed group.
if len(all_google_group_settings_by_groupID) != len(all_google_groups):
raise RuntimeError('Sanity check failed: group settings list does not match group list')
# Retrieve all Google group memberships. This is a mapping from internal Google group ids to
# lists of member resources.
fields = ['id', 'email']
all_google_members = gapiutil.list_all_in_list(
directory_service, directory_service.members().list,
item_ids=[g['id'] for g in all_google_groups], id_key='groupKey',
batch_size=sync_config.batch_size, items_key='members',
fields='nextPageToken,members(' + ','.join(fields) + ')',
retries=sync_config.http_retries, retry_delay=sync_config.http_retry_delay,
)
    # Sanity check. We should have a group members list for each managed group.
if len(all_google_members) != len(all_google_groups):
raise RuntimeError('Sanity check failed: groups in members map do not match group list')
# Log some stats.
LOG.info('Total Google users: %s', len(all_google_uids))
LOG.info('Total Google groups: %s', len(all_google_groupIDs))
LOG.info(
'Suspended Google users: %s', sum(1 if u['suspended'] else 0 for u in all_google_users))
LOG.info(
'Total Google group members: %s', sum([len(m) for g, m in all_google_members.items()])
)
# --------------------------------------------------------------------------------------------
# Compute differences between the Lookup and Google data.
# --------------------------------------------------------------------------------------------
    # For each eligible managed user, determine whether the corresponding Google user needs
    # updating or creating. If so, record a patch/insert for the user.
LOG.info('Calculating updates...')
google_user_updates = {}
google_user_creations = {}
for uid, managed_user_entry in managed_user_entries_by_uid.items():
# Heuristically determine the given and family names.
names = naming.get_names(
uid=uid, display_name=managed_user_entry.displayName, cn=managed_user_entry.cn,
sn=managed_user_entry.sn)
# Form expected user resource fields.
expected_google_user = {
'name': {
'givenName': names.given_name,
'familyName': names.family_name,
},
}
# Find existing Google user (if any).
existing_google_user = all_google_users_by_uid.get(uid)
if existing_google_user is not None:
# See if we need to change the existing user
# Unless anything needs changing, the patch is empty.
patch = {}
# Determine how to patch user's name.
google_user_name = existing_google_user.get('name', {})
patch_name = {}
if google_user_name.get('givenName') != expected_google_user['name']['givenName']:
patch_name['givenName'] = names.given_name
if google_user_name.get('familyName') != expected_google_user['name']['familyName']:
patch_name['familyName'] = names.family_name
if len(patch_name) > 0:
patch['name'] = patch_name
# Only record non-empty patches.
if len(patch) > 0:
google_user_updates[uid] = patch
else:
# No existing Google user. Record the new resource. Generate a new user password and
# send Google the hash. It doesn't matter what this password is since we never have the
# user log in with it. For password-only applications the user can make use of an
# application-specific password.
new_user = {
**expected_google_user,
}
google_user_creations[uid] = new_user
    # For each eligible managed group, determine whether the corresponding Google group needs
    # updating or creating. If so, record a patch/insert for the group.
google_group_updates = {}
google_group_creations = {}
for groupID, managed_group_entry in managed_group_entries_by_groupID.items():
# Form expected group resource fields. The 2 Google APIs we use here to update groups in
# Google each have different maximum lengths for group names and descriptions, and
# empirically the APIs don't function properly if either limit is exceeded, so we use the
# minimum of the 2 documented maximum field lengths (73 characters for names and 300
# characters for descriptions).
expected_google_group = {
'name': _trim_text(
managed_group_entry.groupName, maxlen=73, suffix=sync_config.group_name_suffix
),
'description': _trim_text(managed_group_entry.description, maxlen=300)
}
# Find existing Google group (if any).
existing_google_group = all_google_groups_by_groupID.get(groupID)
if existing_google_group is not None:
# See if we need to change the existing group
# Unless anything needs changing, the patch is empty.
patch = {}
if existing_google_group.get('name') != expected_google_group['name']:
patch['name'] = expected_google_group['name']
if existing_google_group.get('description') != expected_google_group['description']:
patch['description'] = expected_google_group['description']
# Only record non-empty patches.
if len(patch) > 0:
google_group_updates[groupID] = patch
else:
# No existing Google group, so create one.
google_group_creations[groupID] = {
'email': groupID_to_email(groupID),
**expected_google_group
}
# Form a set of all the uids which need patching.
uids_to_update = set(google_user_updates.keys())
LOG.info('Number of existing users to update: %s', len(uids_to_update))
# Form a set of all the groupIDs which need patching.
groupIDs_to_update = set(google_group_updates.keys())
LOG.info('Number of existing groups to update: %s', len(groupIDs_to_update))
# Form a set of all the uids which need adding.
uids_to_add = set(google_user_creations.keys())
LOG.info('Number of users to add: %s', len(uids_to_add))
# Form a set of all the groupIDs which need adding.
groupIDs_to_add = set(google_group_creations.keys())
LOG.info('Number of groups to add: %s', len(groupIDs_to_add))
    # Form a set of all uids which need reactivating. We reactivate users who are in both the
    # managed user list *and* the suspended user list.
uids_to_reactivate = suspended_google_uids & managed_user_uids
LOG.info('Number of users to reactivate: %s', len(uids_to_reactivate))
# Form a set of all uids which should be suspended. This is all the unsuspended Google uids
# which do not appear in our eligible user list.
uids_to_suspend = (all_google_uids - suspended_google_uids) - eligible_uids
LOG.info('Number of users to suspend: %s', len(uids_to_suspend))
# Form a set of all groupIDs which need deleting.
groupIDs_to_delete = all_google_groupIDs - eligible_groupIDs
LOG.info('Number of groups to delete: %s', len(groupIDs_to_delete))
# For each managed group, determine which members to insert or delete. These are lists of
# (groupID, uid) tuples.
members_to_insert = []
members_to_delete = []
for groupID, managed_group_entry in managed_group_entries_by_groupID.items():
# Find the existing Google group members.
existing_google_group = all_google_groups_by_groupID.get(groupID)
if existing_google_group:
existing_members = all_google_members[existing_google_group['id']]
existing_member_uids = set([email_to_uid(m['email']) for m in existing_members])
else:
existing_member_uids = set()
# Members to insert. This is restricted to the managed user set, so that we don't attempt
# to insert a member resource for a non-existent user.
insert_uids = (
(managed_group_entry.uids - existing_member_uids).intersection(managed_user_uids)
)
members_to_insert.extend([(groupID, uid) for uid in insert_uids])
# Members to delete. This is restricted to the eligible user set, so that we don't bother
# to delete a member resource when the user is suspended (and so we won't need to re-add
# it if the user is reactivated).
delete_uids = (
(existing_member_uids - managed_group_entry.uids).intersection(eligible_uids)
)
members_to_delete.extend([(groupID, uid) for uid in delete_uids])
LOG.info('Number of group members to insert: %s', len(members_to_insert))
LOG.info('Number of group members to delete: %s', len(members_to_delete))
# --------------------------------------------------------------------------------------------
# Enforce limits on how much data to change in Google.
# --------------------------------------------------------------------------------------------
# Calculate percentage change to users, groups and group members.
user_change_percentage = 100. * (
len(uids_to_add | uids_to_update | uids_to_reactivate | uids_to_suspend)
/
max(1, len(all_google_uids))
)
LOG.info('Configuration will modify %.2f%% of users', user_change_percentage)
group_change_percentage = 100. * (
len(groupIDs_to_add | groupIDs_to_update | groupIDs_to_delete)
/
max(1, len(all_google_groupIDs))
)
LOG.info('Configuration will modify %.2f%% of groups', group_change_percentage)
member_change_percentage = 100. * (
(len(members_to_insert) + len(members_to_delete))
/
max(1, sum([len(m) for g, m in all_google_members.items()]))
)
LOG.info('Configuration will modify %.2f%% of group members', member_change_percentage)
# Enforce percentage change sanity checks.
if (limits_config.abort_user_change_percentage is not None and
user_change_percentage > limits_config.abort_user_change_percentage):
LOG.error(
'Modification of %.2f%% of users is greater than limit of %.2f%%. Aborting.',
user_change_percentage, limits_config.abort_user_change_percentage
)
raise RuntimeError('Aborting due to large user change percentage')
if (limits_config.abort_group_change_percentage is not None and
group_change_percentage > limits_config.abort_group_change_percentage):
LOG.error(
'Modification of %.2f%% of groups is greater than limit of %.2f%%. Aborting.',
group_change_percentage, limits_config.abort_group_change_percentage
)
raise RuntimeError('Aborting due to large group change percentage')
if (limits_config.abort_member_change_percentage is not None and
member_change_percentage > limits_config.abort_member_change_percentage):
LOG.error(
'Modification of %.2f%% of group members is greater than limit of %.2f%%. Aborting.',
member_change_percentage, limits_config.abort_member_change_percentage
)
raise RuntimeError('Aborting due to large group member change percentage')
# Cap maximum size of various operations.
if limits_config.max_new_users is not None and len(uids_to_add) > limits_config.max_new_users:
# Ensure that we do not attempt to insert a group member for any of the users not added as
# a result of this cap, since these users won't exist in Google
capped_uids_to_add = _limit(uids_to_add, limits_config.max_new_users)
uids_not_added = uids_to_add - capped_uids_to_add
members_to_insert = [(g, u) for g, u in members_to_insert if u not in uids_not_added]
uids_to_add = capped_uids_to_add
LOG.info('Capped number of new users to %s', len(uids_to_add))
if (limits_config.max_new_groups is not None and
len(groupIDs_to_add) > limits_config.max_new_groups):
# Ensure that we do not attempt to insert a group member for any of the groups not added
# as a result of this cap, since these groups won't exist in Google
capped_groupIDs_to_add = _limit(groupIDs_to_add, limits_config.max_new_groups)
groupIDs_not_added = groupIDs_to_add - capped_groupIDs_to_add
members_to_insert = [(g, u) for g, u in members_to_insert if g not in groupIDs_not_added]
groupIDs_to_add = capped_groupIDs_to_add
LOG.info('Capped number of new groups to %s', len(groupIDs_to_add))
if (limits_config.max_suspended_users is not None and
len(uids_to_suspend) > limits_config.max_suspended_users):
uids_to_suspend = _limit(uids_to_suspend, limits_config.max_suspended_users)
LOG.info('Capped number of users to suspend to %s', len(uids_to_suspend))
if (limits_config.max_deleted_groups is not None and
len(groupIDs_to_delete) > limits_config.max_deleted_groups):
groupIDs_to_delete = _limit(groupIDs_to_delete, limits_config.max_deleted_groups)
LOG.info('Capped number of groups to delete to %s', len(groupIDs_to_delete))
if (limits_config.max_reactivated_users is not None and
len(uids_to_reactivate) > limits_config.max_reactivated_users):
uids_to_reactivate = _limit(uids_to_reactivate, limits_config.max_reactivated_users)
LOG.info('Capped number of users to reactivate to %s', len(uids_to_reactivate))
if (limits_config.max_updated_users is not None and
len(uids_to_update) > limits_config.max_updated_users):
uids_to_update = _limit(uids_to_update, limits_config.max_updated_users)
LOG.info('Capped number of users to update to %s', len(uids_to_update))
if (limits_config.max_updated_groups is not None and
len(groupIDs_to_update) > limits_config.max_updated_groups):
groupIDs_to_update = _limit(groupIDs_to_update, limits_config.max_updated_groups)
LOG.info('Capped number of groups to update to %s', len(groupIDs_to_update))
if (limits_config.max_inserted_members is not None and
len(members_to_insert) > limits_config.max_inserted_members):
members_to_insert = members_to_insert[0:limits_config.max_inserted_members]
LOG.info('Capped number of group members to insert to %s', len(members_to_insert))
if (limits_config.max_deleted_members is not None and
len(members_to_delete) > limits_config.max_deleted_members):
members_to_delete = members_to_delete[0:limits_config.max_deleted_members]
LOG.info('Capped number of group members to delete to %s', len(members_to_delete))
# --------------------------------------------------------------------------------------------
# Finally, perform the actual updates in Google.
# --------------------------------------------------------------------------------------------
# A generator which will generate patch() and insert() calls to the directory service to
# perform the actions required to update users
def user_api_requests():
# Update existing users.
user_updates = {uid: google_user_updates[uid] for uid in uids_to_update}
for uid, update in user_updates.items():
google_id = all_google_users_by_uid[uid]['id']
LOG.info('Update user "%s": "%r"', uid, update)
yield directory_service.users().patch(userKey=google_id, body=update)
# Suspend old users
for uid in uids_to_suspend:
google_id = all_google_users_by_uid[uid]['id']
LOG.info('Suspending user: "%s"', uid)
yield directory_service.users().patch(userKey=google_id, body={'suspended': True})
# Reactivate returning users
for uid in uids_to_reactivate:
google_id = all_google_users_by_uid[uid]['id']
LOG.info('Reactivating user: "%s"', uid)
yield directory_service.users().patch(userKey=google_id, body={'suspended': False})
# Create new users
for uid in uids_to_add:
# Generate a random password which is thrown away.
new_user = {**{
'hashFunction': 'crypt',
'password': crypt.crypt(secrets.token_urlsafe(), crypt.METHOD_SHA512),
'orgUnitPath': sync_config.new_user_org_unit_path,
}, **google_user_creations[uid]}
redacted_user = {**new_user, **{'password': 'REDACTED'}}
LOG.info('Adding user "%s": %s', uid, redacted_user)
yield directory_service.users().insert(body=new_user)
# A generator which will generate patch(), insert() and delete() calls to the directory
# service to perform the actions required to update groups
def group_api_requests():
# Update existing groups
group_updates = {groupID: google_group_updates[groupID] for groupID in groupIDs_to_update}
for groupID, update in group_updates.items():
google_id = all_google_groups_by_groupID[groupID]['id']
LOG.info('Update group "%s": "%r"', groupID, update)
yield directory_service.groups().patch(groupKey=google_id, body=update)
# Delete cancelled groups
for groupID in groupIDs_to_delete:
google_id = all_google_groups_by_groupID[groupID]['id']
LOG.info('Deleting group: "%s"', groupID)
yield directory_service.groups().delete(groupKey=google_id)
# Create new groups
for groupID in groupIDs_to_add:
new_group = google_group_creations[groupID]
LOG.info('Adding group "%s": %s', groupID, new_group)
yield directory_service.groups().insert(body=new_group)
# A generator which will generate patch() calls to the groupssettings service to set or
# update the required group settings.
def group_settings_api_requests():
# Apply all settings to new groups.
for groupID in groupIDs_to_add:
email = groupID_to_email(groupID)
settings = sync_config.group_settings
LOG.info('Updating settings for new group "%s": %s', groupID, settings)
yield groupssettings_service.groups().patch(groupUniqueId=email, body=settings)
# Apply any settings that differ to pre-existing groups.
for groupID, settings in all_google_group_settings_by_groupID.items():
patch = {k: v for k, v in sync_config.group_settings.items() if settings.get(k) != v}
if patch:
email = groupID_to_email(groupID)
LOG.info('Updating settings for existing group "%s": %s', groupID, patch)
yield groupssettings_service.groups().patch(groupUniqueId=email, body=patch)
# A generator which will generate insert() and delete() calls to the directory service to
# perform the actions required to update group members
def member_api_requests():
# Insert new members
for groupID, uid in members_to_insert:
group_key = groupID_to_email(groupID)
user_key = uid_to_email(uid)
LOG.info('Adding user "%s" to group "%s"', user_key, group_key)
yield directory_service.members().insert(groupKey=group_key, body={'email': user_key})
# Delete removed members
for groupID, uid in members_to_delete:
group_key = groupID_to_email(groupID)
user_key = uid_to_email(uid)
LOG.info('Removing user "%s" from group "%s"', user_key, group_key)
yield directory_service.members().delete(groupKey=group_key, memberKey=user_key)
    # Process an iterable of requests to the specified Google service in batches. These APIs
# support a maximum batch size of 1000. See:
# https://developers.google.com/admin-sdk/directory/v1/guides/batch
def process_requests(service, requests):
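        """
        Submit *requests* (an iterable of API call objects) to *service* in batches of at most
        sync_config.batch_size, pausing for sync_config.inter_batch_delay seconds before each
        batch and retrying a batch on HTTP 503 responses. In read-only mode the batches are not
        executed; the fact that they would have been issued is simply logged.
        """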
for request_batch in _grouper(requests, n=sync_config.batch_size):
# Form batch request.
batch = service.new_batch_http_request()
for request in request_batch:
batch.add(request, callback=_handle_batch_response)
# Execute the batch request if not in read only mode. Otherwise log that we would
# have.
if not read_only:
LOG.info('Issuing batch request to Google.')
time.sleep(sync_config.inter_batch_delay)
retries = sync_config.http_retries
while True:
try:
batch.execute()
except errors.HttpError as err:
if (err.resp.status == 503 and retries > 0):
retries -= 1
                            LOG.warning('503: Service unavailable - retrying')
time.sleep(sync_config.http_retry_delay)
continue
                        if err.resp.status == 503 and retries == 0:
LOG.error('503: Service unavailable - retry count exceeded')
raise
break
else:
LOG.info('Not issuing batch request in read-only mode.')
# Process all the user, group and group member updates
process_requests(directory_service, user_api_requests())
process_requests(directory_service, group_api_requests())
process_requests(groupssettings_service, group_settings_api_requests())
process_requests(directory_service, member_api_requests())
def _handle_batch_response(request_id, response, exception):
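    """
    Callback invoked for each response within a batched Google API request. Any error is logged
    together with the associated response; it does not abort processing of the rest of the batch.
    """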
if exception is not None:
LOG.error('Error performing request: %s', exception)
LOG.error('Response: %r', response)
def _limit(s, limit):
"""
Given a set, s, and a numeric limit, return a set which has no more than *limit* elements. The
exact set of elements retained is not specified.
>>> s = set('ABCDEFGHIJKLMNOPQ')
>>> len(s) > 5
True
>>> len(_limit(s, 5)) == 5
True
>>> len(_limit(s, 500)) == len(s)
True
    All elements of the returned set are taken from the input set.
>>> s_prime = _limit(s, 5)
>>> s_prime - s
set()
"""
return {e for _, e in itertools.takewhile(lambda p: p[0] < limit, enumerate(s))}
def _grouper(iterable, *, n):
"""
Group an iterable into chunks of at most *n* elements. A generator which yields iterables
representing slices of *iterable*.
>>> [list(i) for i in _grouper('ABCDEFGH', n=3)]
[['A', 'B', 'C'], ['D', 'E', 'F'], ['G', 'H']]
>>> def generator(stop):
... for x in range(stop):
... yield x
>>> [list(i) for i in _grouper(generator(10), n=3)]
[[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]]
>>> [list(i) for i in _grouper(generator(12), n=3)]
[[0, 1, 2], [3, 4, 5], [6, 7, 8], [9, 10, 11]]
The implementation of this function attempts to be efficient; the chunks are iterables which
are generated on demand rather than being constructed first. Hence this function can deal with
iterables which would fill memory if intermediate chunks were stored.
>>> i = _grouper(generator(100000000000000000000), n=1000000000000000)
>>> next(next(i))
0
"""
it = iter(iterable)
while True:
next_chunk_it = itertools.islice(it, n)
try:
first = next(next_chunk_it)
except StopIteration:
return
yield itertools.chain((first,), next_chunk_it)
def _trim_text(text, *, maxlen, cont='...', suffix=''):
"""
Trim text to be no more than "maxlen" characters long, terminating it with "cont" if it had
to be truncated. If supplied, "suffix" is appended to the string after truncating, and the
    truncation point adjusted so that the total length still does not exceed "maxlen".
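    For example:
    >>> _trim_text('abcdefghij', maxlen=20)
    'abcdefghij'
    >>> _trim_text('abcdefghij', maxlen=8)
    'abcde...'
    >>> _trim_text('abcdefghij', maxlen=8, suffix='!')
    'abcd...!'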
"""
return (
text[0:maxlen-len(cont)-len(suffix)]+cont+suffix
if len(text)+len(suffix) > maxlen else text+suffix
)