"""
Synchronise Google Directory with a local LDAP directory.
"""
import crypt
import dataclasses
import itertools
import logging
import numbers
import re
import secrets
import time
import typing
from googleapiclient import discovery, errors
from . import config
from . import gapiauth
from . import gapidomain
from . import gapiutil
from . import ldap
from . import limits
from . import naming
LOG = logging.getLogger(__name__)
# Scopes required to perform read-only actions.
READ_ONLY_SCOPES = [
'https://www.googleapis.com/auth/admin.directory.user.readonly',
]
# Scoped *in addition to READ_ONLY_SCOPES* required to perform a full update.
WRITE_SCOPES = [
'https://www.googleapis.com/auth/admin.directory.user',
]
@dataclasses.dataclass
class Configuration(config.ConfigurationDataclassMixin):
    """
    Settings for the "sync" section of the configuration.

    Instances are built via ``Configuration.from_dict`` (presumably supplied by
    :py:class:`config.ConfigurationDataclassMixin` — defined outside this file);
    keys absent from the input dictionary keep the defaults below.
    """
    # A regular expression which is used to match the organization unit path for Google users who
    # should be excluded from the list returned by Google. Those users do not exist for the
    # purposes of the rest of the sync and so if they appear in the list of managed users this
    # script will attempt to re-add them and fail in the process. Use this setting for users who
    # are managed completely outside of this script.
    ignore_google_org_unit_path_regex: typing.Union[str, None] = None
    # The organization unit path in which new accounts are placed
    new_user_org_unit_path: str = '/'
    # Inter-batch delay in seconds. This is useful to avoid hitting Google rate limits.
    inter_batch_delay: numbers.Real = 5
    # Batch size for Google API calls. Google supports batching requests together into one API
    # call.
    batch_size: int = 50
    # Number of times to retry HTTP requests if a 503 "Service Unavailable" received
    http_retries: int = 2
    # Delay in seconds between HTTP 503 response retries
    http_retry_delay: numbers.Real = 5
def sync(configuration, *, read_only=True):
    """
    Synchronise users from the local LDAP directory into the Google domain.

    :param configuration: top-level configuration dictionary; the "sync",
        "google_api", "google_domain", "ldap" and "limits" sections are used.
    :param read_only: when True (the default), all mutating Google API calls
        are logged but *not* issued, making the run a dry-run.

    :raises RuntimeError: if a sanity check fails (managed uids not a subset of
        eligible uids, admin users in the user list, users missing an
        orgUnitPath, duplicate uids) or if the proposed change exceeds the
        configured abort percentage.
    """
    if read_only:
        LOG.info('Performing synchronisation in READ ONLY mode.')
    else:
        LOG.info('Performing synchronisation in WRITE mode.')
    # Parse each configuration section into its dataclass.
    sync_config = Configuration.from_dict(configuration.get('sync', {}))
    gapi_auth_config = gapiauth.Configuration.from_dict(
        configuration.get('google_api', {}).get('auth', {}))
    gapi_domain_config = gapidomain.Configuration.from_dict(
        configuration.get('google_domain', {}))
    ldap_config = ldap.Configuration.from_dict(configuration.get('ldap', {}))
    limits_config = limits.Configuration.from_dict(configuration.get('limits', {}))
    # Load appropriate Google credentials. Write scopes are only requested when
    # we actually intend to write.
    creds = (
        gapi_auth_config.load_credentials(read_only=read_only)
        .with_scopes(READ_ONLY_SCOPES + ([] if read_only else WRITE_SCOPES))
        .with_subject(gapi_domain_config.admin_user)
    )
    # Get a set containing all CRSids. These are all the people who are eligible to be in our
    # GSuite instance. If a user is in GSuite and is *not* present in this list then they are
    # suspended.
    LOG.info('Reading eligible user entries from LDAP')
    eligible_uids = ldap_config.get_eligible_uids()
    LOG.info('Total LDAP entries: %s', len(eligible_uids))
    # Get a list of managed users. These are all the people who match the "managed_user_filter" in
    # the LDAP settings.
    LOG.info('Reading managed user entries from LDAP')
    managed_user_entries = ldap_config.get_managed_user_entries()
    # Form a mapping from uid to managed user.
    managed_user_entries_by_uid = {u.uid: u for u in managed_user_entries}
    # Form a set of all *managed user* uids
    managed_user_uids = set(managed_user_entries_by_uid.keys())
    LOG.info('Total managed user entries: %s', len(managed_user_uids))
    # Sanity check: the managed users should be a subset of the eligible ones.
    if len(managed_user_uids - eligible_uids) != 0:
        raise RuntimeError('Sanity check failed: some managed uids were not in the eligible set')
    # Build the directory service using Google API discovery.
    directory_service = discovery.build('admin', 'directory_v1', credentials=creds)
    # Retrieve information on all users excluding domain admins.
    LOG.info('Getting information on Google domain users')
    fields = [
        'id', 'isAdmin', 'orgUnitPath', 'primaryEmail', 'suspended', 'suspensionReason',
        'name(givenName, familyName)',
    ]
    all_google_users = gapiutil.list_all(
        directory_service.users().list, items_key='users', domain=gapi_domain_config.name,
        query='isAdmin=false', fields='nextPageToken,users(' + ','.join(fields) + ')',
        retries=sync_config.http_retries, retry_delay=sync_config.http_retry_delay,
    )
    # Strip any "to be ignored" users out of the results.
    if sync_config.ignore_google_org_unit_path_regex is not None:
        LOG.info(
            'Ignoring users whose organization unit path matches %r',
            sync_config.ignore_google_org_unit_path_regex)
        # Check that all users have an orgUnitPath before matching against it.
        missing_org = [
            u for u in all_google_users if 'orgUnitPath' not in u
        ]
        if len(missing_org) != 0:
            LOG.error('User entries missing orgUnitPath: %s (starting with %s)',
                      len(missing_org),
                      missing_org[0]['primaryEmail'] if 'primaryEmail' in missing_org[0]
                      else 'user with blank email')
            raise RuntimeError('Sanity check failed: at least one user is missing orgUnitPath')
        # Remove users matching regex
        regex = re.compile(sync_config.ignore_google_org_unit_path_regex)
        all_google_users = [
            u for u in all_google_users if not regex.match(u['orgUnitPath'])
        ]
    # Sanity check. There should be no admins in the returned results.
    if any(u.get('isAdmin', False) for u in all_google_users):
        raise RuntimeError('Sanity check failed: admin users in user list')
    # Form a mapping from uid to Google user. We form the uid by splitting out the local-part of
    # the email address.
    all_google_users_by_uid = {u['primaryEmail'].split('@')[0]: u for u in all_google_users}
    # Form a set of all Google-side uids. The all_google_uids set is all users including the
    # suspended ones and the suspended_google_uids set is only the suspended users. Non suspended
    # users are therefore all_google_uids - suspended_google_uids.
    all_google_uids = set(all_google_users_by_uid.keys())
    suspended_google_uids = {uid for uid, u in all_google_users_by_uid.items() if u['suspended']}
    # Sanity check. We should not have lost anyone. (I.e. the uid should be unique.)
    if len(all_google_uids) != len(all_google_users):
        raise RuntimeError('Sanity check failed: user list changed length')
    # Log some stats.
    LOG.info('Total Google users: %s', len(all_google_uids))
    LOG.info(
        'Suspended Google users: %s', sum(1 if u['suspended'] else 0 for u in all_google_users))
    # For each user which exists in Google or the managed user set which is eligible, determine if
    # they need updating/creating. If so, record a patch/insert for the user.
    LOG.info('Calculating updates...')
    google_user_updates = {}
    google_user_creations = {}
    for uid, managed_user_entry in managed_user_entries_by_uid.items():
        # Heuristically determine the given and family names.
        names = naming.get_names(
            uid=uid, display_name=managed_user_entry.displayName, cn=managed_user_entry.cn,
            sn=managed_user_entry.sn)
        # Form expected user resource fields.
        expected_google_user = {
            'name': {
                'givenName': names.given_name,
                'familyName': names.family_name,
            },
        }
        # Find existing Google user (if any).
        existing_google_user = all_google_users_by_uid.get(uid)
        if existing_google_user is not None:
            # See if we need to change the existing user
            # Unless anything needs changing, the patch is empty.
            patch = {}
            # Determine how to patch user's name.
            google_user_name = existing_google_user.get('name', {})
            patch_name = {}
            if google_user_name.get('givenName') != expected_google_user['name']['givenName']:
                patch_name['givenName'] = names.given_name
            if google_user_name.get('familyName') != expected_google_user['name']['familyName']:
                patch_name['familyName'] = names.family_name
            if len(patch_name) > 0:
                patch['name'] = patch_name
            # Only record non-empty patches.
            if len(patch) > 0:
                google_user_updates[uid] = patch
        else:
            # No existing Google user. Record the new resource. Generate a new user password and
            # send Google the hash. It doesn't matter what this password is since we never have the
            # user log in with it. For password-only applications the user can make use of an
            # application-specific password.
            new_user = {
                **{
                    'primaryEmail': f'{uid}@{gapi_domain_config.name}',
                },
                **expected_google_user,
            }
            google_user_creations[uid] = new_user
    # Form a set of all the uids which need patching.
    uids_to_update = set(google_user_updates.keys())
    LOG.info('Number of existing users to update: %s', len(uids_to_update))
    # Form a set of all the uids which need adding.
    uids_to_add = set(google_user_creations.keys())
    LOG.info('Number of users to add: %s', len(uids_to_add))
    # Form a set of all uids which need reactivating. We reactive users who are in the managed user
    # list *and* the suspended user list.
    uids_to_reactivate = suspended_google_uids & managed_user_uids
    LOG.info('Number of users to reactivate: %s', len(uids_to_reactivate))
    # Form a set of all uids which should be suspended. This is all the unsuspended Google uids
    # which do not appear in our eligible user list.
    uids_to_suspend = (all_google_uids - suspended_google_uids) - eligible_uids
    LOG.info('Number of users to suspend: %s', len(uids_to_suspend))
    # Calculate percentage change. max(1, ...) guards against division by zero
    # on an empty domain.
    user_change_percentage = 100. * (
        len(uids_to_add | uids_to_update | uids_to_reactivate | uids_to_suspend)
        /
        max(1, len(all_google_uids))
    )
    LOG.info('Configuration will modify %.2f%% of users', user_change_percentage)
    # Enforce percentage change sanity check.
    if (limits_config.abort_user_change_percentage is not None and
            user_change_percentage > limits_config.abort_user_change_percentage):
        LOG.error(
            'Modification of %.2f%% of users is greater than limit of %.2f%%. Aborting.',
            user_change_percentage, limits_config.abort_user_change_percentage
        )
        raise RuntimeError('Aborting due to large user change percentage')
    # Cap maximum size of various operations.
    if limits_config.max_new_users is not None and len(uids_to_add) > limits_config.max_new_users:
        uids_to_add = _limit(uids_to_add, limits_config.max_new_users)
        LOG.info('Capped number of new users to %s', len(uids_to_add))
    if (limits_config.max_suspended_users is not None and
            len(uids_to_suspend) > limits_config.max_suspended_users):
        uids_to_suspend = _limit(uids_to_suspend, limits_config.max_suspended_users)
        LOG.info('Capped number of users to suspend to %s', len(uids_to_suspend))
    if (limits_config.max_reactivated_users is not None and
            len(uids_to_reactivate) > limits_config.max_reactivated_users):
        uids_to_reactivate = _limit(uids_to_reactivate, limits_config.max_reactivated_users)
        LOG.info('Capped number of users to reactivate to %s', len(uids_to_reactivate))
    if (limits_config.max_updated_users is not None and
            len(uids_to_update) > limits_config.max_updated_users):
        uids_to_update = _limit(uids_to_update, limits_config.max_updated_users)
        LOG.info('Capped number of users to update to %s', len(uids_to_update))

    # A generator which will generate patch() and insert() calls to the directory service to
    # perform the actions required
    def api_requests():
        # Update existing users.
        user_updates = {uid: google_user_updates[uid] for uid in uids_to_update}
        for uid, update in user_updates.items():
            google_id = all_google_users_by_uid[uid]['id']
            LOG.info('Update user "%s": "%r"', uid, update)
            yield directory_service.users().patch(userKey=google_id, body=update)
        # Suspend old users
        for uid in uids_to_suspend:
            google_id = all_google_users_by_uid[uid]['id']
            LOG.info('Suspending user: "%s"', uid)
            yield directory_service.users().patch(userKey=google_id, body={'suspended': True})
        # Reactivate returning users
        for uid in uids_to_reactivate:
            google_id = all_google_users_by_uid[uid]['id']
            LOG.info('Reactivating user: "%s"', uid)
            yield directory_service.users().patch(userKey=google_id, body={'suspended': False})
        # Create new users
        for uid in uids_to_add:
            # Generate a random password which is thrown away.
            # NOTE(review): the crypt module is deprecated since Python 3.11
            # and removed in 3.13 — consider migrating to a supported hashing
            # scheme before upgrading the interpreter.
            new_user = {**{
                'hashFunction': 'crypt',
                'password': crypt.crypt(secrets.token_urlsafe(), crypt.METHOD_SHA512),
                'orgUnitPath': sync_config.new_user_org_unit_path,
            }, **google_user_creations[uid]}
            redacted_user = {**new_user, **{'password': 'REDACTED'}}
            LOG.info('Adding user "%s": %s', uid, redacted_user)
            yield directory_service.users().insert(body=new_user)

    # Make an chunked iterator of requests to the directory API. The Directory API supports a
    # maximum batch size of 1000. See:
    # https://developers.google.com/admin-sdk/directory/v1/guides/batch
    for request_batch in _grouper(api_requests(), n=sync_config.batch_size):
        # Form batch request.
        batch = directory_service.new_batch_http_request()
        for request in request_batch:
            batch.add(request, callback=_handle_batch_response)
        # Execute the batch request if not in read only mode. Otherwise log that we would have.
        if not read_only:
            LOG.info('Issuing batch request to Google.')
            time.sleep(sync_config.inter_batch_delay)
            retries = sync_config.http_retries
            while True:
                try:
                    batch.execute()
                except errors.HttpError as err:
                    # Retry (up to http_retries times) only on 503 "Service
                    # Unavailable"; every other HTTP error propagates.
                    if (err.resp.status == 503 and retries > 0):
                        retries -= 1
                        # warn() is a deprecated alias; use warning().
                        LOG.warning('503: Service unavailable - retrying')
                        time.sleep(sync_config.http_retry_delay)
                        continue
                    # Only report retry exhaustion when the final error really
                    # was a 503 (a different error may arrive after retries).
                    if err.resp.status == 503 and retries == 0:
                        LOG.error('503: Service unavailable - retry count exceeded')
                    raise
                break
        else:
            LOG.info('Not issuing batch request in read-only mode.')
def _handle_batch_response(request_id, response, exception):
    """
    Callback for each request within a batched Directory API call.

    A non-None *exception* indicates that the corresponding request failed.
    The failure is logged — including *request_id* so it can be correlated
    with the request inside the batch — but deliberately not re-raised, so
    the remaining requests in the batch still complete.
    """
    if exception is not None:
        # Include the batch request id: without it a failure cannot be traced
        # back to the specific patch/insert that caused it.
        LOG.error('Error performing request %s: %s', request_id, exception)
        LOG.error('Response: %r', response)
def _limit(s, limit):
"""
Given a set, s, and a numeric limit, return a set which has no more than *limit* elements. The
exact set of elements retained is not specified.
>>> s = set('ABCDEFGHIJKLMNOPQ')
>>> len(s) > 5
True
>>> len(_limit(s, 5)) == 5
True
>>> len(_limit(s, 500)) == len(s)
True
All elements of the returned set are taken from input set.
>>> s_prime = _limit(s, 5)
>>> s_prime - s
set()
"""
return {e for _, e in itertools.takewhile(lambda p: p[0] < limit, enumerate(s))}
def _grouper(iterable, *, n):
"""
Group an iterable into chunks of at most *n* elements. A generator which yields iterables
representing slices of *iterable*.
>>> [list(i) for i in _grouper('ABCDEFGH', n=3)]
[['A', 'B', 'C'], ['D', 'E', 'F'], ['G', 'H']]
>>> def generator(stop):
... for x in range(stop):
... yield x
>>> [list(i) for i in _grouper(generator(10), n=3)]
[[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]]
>>> [list(i) for i in _grouper(generator(12), n=3)]
[[0, 1, 2], [3, 4, 5], [6, 7, 8], [9, 10, 11]]
The implementation of this function attempts to be efficient; the chunks are iterables which
are generated on demand rather than being constructed first. Hence this function can deal with
iterables which would fill memory if intermediate chunks were stored.
>>> i = _grouper(generator(100000000000000000000), n=1000000000000000)
>>> next(next(i))
0
"""
it = iter(iterable)
while True:
next_chunk_it = itertools.islice(it, n)
try:
first = next(next_chunk_it)
except StopIteration:
return
yield itertools.chain((first,), next_chunk_it)