""" Synchronise Google Directory with a local LDAP directory. """ import crypt import dataclasses import itertools import logging import numbers import re import secrets import time import typing from googleapiclient import discovery, errors from . import config from . import gapiauth from . import gapidomain from . import gapiutil from . import ldap from . import limits from . import naming LOG = logging.getLogger(__name__) # Scopes required to perform read-only actions. READ_ONLY_SCOPES = [ 'https://www.googleapis.com/auth/admin.directory.user.readonly', ] # Scoped *in addition to READ_ONLY_SCOPES* required to perform a full update. WRITE_SCOPES = [ 'https://www.googleapis.com/auth/admin.directory.user', ] @dataclasses.dataclass class Configuration(config.ConfigurationDataclassMixin): # A regular expression which is used to match the organization unit path for Google users who # should be excluded from the list returned by Google. Those users do not exist for the # purposes of the rest of the sync and so if they appear in the list of managed users this # script will attempt to re-add them and fail in the process. Use this setting for users who # are managed completely outside of this script. ignore_google_org_unit_path_regex: typing.Union[str, None] = None # The organization unit path in which new accounts are placed new_user_org_unit_path: str = '/' # Inter-batch delay in seconds. This is useful to avoid hitting Google rate limits. inter_batch_delay: numbers.Real = 5 # Batch size for Google API calls. Google supports batching requests together into one API # call. batch_size: int = 50 # Number of times to retry HTTP requests if a 503 "Service Unavailable" received http_retries: int = 2 # Delay in seconds between HTTP 503 response retries http_retry_delay: numbers.Real = 5 def sync(configuration, *, read_only=True): """Perform sync given configuration dictionary.""" if read_only: LOG.info('Performing synchronisation in READ ONLY mode.') else: LOG.info('Performing synchronisation in WRITE mode.') # Parse configuration sync_config = Configuration.from_dict(configuration.get('sync', {})) gapi_auth_config = gapiauth.Configuration.from_dict( configuration.get('google_api', {}).get('auth', {})) gapi_domain_config = gapidomain.Configuration.from_dict( configuration.get('google_domain', {})) ldap_config = ldap.Configuration.from_dict(configuration.get('ldap', {})) limits_config = limits.Configuration.from_dict(configuration.get('limits', {})) # Load appropriate Google credentials. creds = ( gapi_auth_config.load_credentials(read_only=read_only) .with_scopes(READ_ONLY_SCOPES + ([] if read_only else WRITE_SCOPES)) .with_subject(gapi_domain_config.admin_user) ) # Get a set containing all CRSids. These are all the people who are eligible to be in our # GSuite instance. If a user is in GSuite and is *not* present in this list then they are # suspended. LOG.info('Reading eligible user entries from LDAP') eligible_uids = ldap_config.get_eligible_uids() LOG.info('Total LDAP entries: %s', len(eligible_uids)) # Get a list of managed users. These are all the people who match the "managed_user_filter" in # the LDAP settings. LOG.info('Reading managed user entries from LDAP') managed_user_entries = ldap_config.get_managed_user_entries() # Form a mapping from uid to managed user. 
    # Form a mapping from uid to managed user.
    managed_user_entries_by_uid = {u.uid: u for u in managed_user_entries}

    # Form a set of all *managed user* uids.
    managed_user_uids = set(managed_user_entries_by_uid.keys())
    LOG.info('Total managed user entries: %s', len(managed_user_uids))

    # Sanity check: the managed users should be a subset of the eligible ones.
    if len(managed_user_uids - eligible_uids) != 0:
        raise RuntimeError('Sanity check failed: some managed uids were not in the eligible set')

    # Build the directory service using Google API discovery.
    directory_service = discovery.build('admin', 'directory_v1', credentials=creds)

    # Retrieve information on all users excluding domain admins.
    LOG.info('Getting information on Google domain users')
    fields = [
        'id', 'isAdmin', 'orgUnitPath', 'primaryEmail', 'suspended', 'suspensionReason',
        'name(givenName, familyName)',
    ]
    all_google_users = gapiutil.list_all(
        directory_service.users().list, items_key='users', domain=gapi_domain_config.name,
        query='isAdmin=false', fields='nextPageToken,users(' + ','.join(fields) + ')',
        retries=sync_config.http_retries, retry_delay=sync_config.http_retry_delay,
    )

    # Strip any "to be ignored" users out of the results.
    if sync_config.ignore_google_org_unit_path_regex is not None:
        LOG.info(
            'Ignoring users whose organizational unit path matches %r',
            sync_config.ignore_google_org_unit_path_regex)

        # Check that all users have an orgUnitPath.
        missing_org = [u for u in all_google_users if 'orgUnitPath' not in u]
        if len(missing_org) != 0:
            LOG.error(
                'User entries missing orgUnitPath: %s (starting with %s)', len(missing_org),
                missing_org[0].get('primaryEmail', 'user with blank email'))
            raise RuntimeError('Sanity check failed: at least one user is missing orgUnitPath')

        # Remove the users whose organizational unit path matches the regex.
        regex = re.compile(sync_config.ignore_google_org_unit_path_regex)
        all_google_users = [u for u in all_google_users if not regex.match(u['orgUnitPath'])]

    # Sanity check: there should be no admins in the returned results.
    if any(u.get('isAdmin', False) for u in all_google_users):
        raise RuntimeError('Sanity check failed: admin users in user list')

    # Form a mapping from uid to Google user. We form the uid by splitting out the local-part
    # of the email address.
    all_google_users_by_uid = {u['primaryEmail'].split('@')[0]: u for u in all_google_users}

    # Form sets of Google-side uids. The all_google_uids set contains all users, including
    # suspended ones, and the suspended_google_uids set contains only the suspended users.
    # Non-suspended users are therefore all_google_uids - suspended_google_uids.
    all_google_uids = set(all_google_users_by_uid.keys())
    suspended_google_uids = {uid for uid, u in all_google_users_by_uid.items() if u['suspended']}

    # Sanity check: we should not have lost anyone, i.e. the uids should be unique.
    if len(all_google_uids) != len(all_google_users):
        raise RuntimeError('Sanity check failed: user list changed length')

    # Log some stats.
    LOG.info('Total Google users: %s', len(all_google_uids))
    LOG.info(
        'Suspended Google users: %s',
        sum(1 if u['suspended'] else 0 for u in all_google_users))

    # For each managed user, determine whether an existing Google user needs updating or a new
    # Google user needs creating. If so, record a patch/insert for the user.
    LOG.info('Calculating updates...')
    google_user_updates = {}
    google_user_creations = {}
    for uid, managed_user_entry in managed_user_entries_by_uid.items():
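        # Derive the Google user resource we expect this entry to correspond to. As a
        # hypothetical example, an entry with cn='Sam River' and sn='River' would typically
        # produce givenName='Sam' and familyName='River'.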
        # Heuristically determine the given and family names.
        names = naming.get_names(
            uid=uid, display_name=managed_user_entry.displayName, cn=managed_user_entry.cn,
            sn=managed_user_entry.sn)

        # Form the expected user resource fields.
        expected_google_user = {
            'name': {
                'givenName': names.given_name,
                'familyName': names.family_name,
            },
        }

        # Find the existing Google user (if any).
        existing_google_user = all_google_users_by_uid.get(uid)

        if existing_google_user is not None:
            # See if we need to change the existing user. Unless anything needs changing, the
            # patch is empty.
            patch = {}

            # Determine how to patch the user's name.
            google_user_name = existing_google_user.get('name', {})
            patch_name = {}
            if google_user_name.get('givenName') != expected_google_user['name']['givenName']:
                patch_name['givenName'] = names.given_name
            if google_user_name.get('familyName') != expected_google_user['name']['familyName']:
                patch_name['familyName'] = names.family_name
            if len(patch_name) > 0:
                patch['name'] = patch_name

            # Only record non-empty patches.
            if len(patch) > 0:
                google_user_updates[uid] = patch
        else:
            # No existing Google user: record the new resource. A random password is generated
            # and its hash sent to Google when the user is created below. It doesn't matter
            # what this password is since we never have the user log in with it. For
            # password-only applications the user can make use of an application-specific
            # password.
            new_user = {
                'primaryEmail': f'{uid}@{gapi_domain_config.name}',
                **expected_google_user,
            }
            google_user_creations[uid] = new_user

    # Form a set of all the uids which need patching.
    uids_to_update = set(google_user_updates.keys())
    LOG.info('Number of existing users to update: %s', len(uids_to_update))

    # Form a set of all the uids which need adding.
    uids_to_add = set(google_user_creations.keys())
    LOG.info('Number of users to add: %s', len(uids_to_add))

    # Form a set of all uids which need reactivating. We reactivate users who appear in both
    # the managed user list *and* the suspended user list.
    uids_to_reactivate = suspended_google_uids & managed_user_uids
    LOG.info('Number of users to reactivate: %s', len(uids_to_reactivate))

    # Form a set of all uids which should be suspended. This is all the unsuspended Google
    # uids which do not appear in our eligible user list.
    uids_to_suspend = (all_google_uids - suspended_google_uids) - eligible_uids
    LOG.info('Number of users to suspend: %s', len(uids_to_suspend))

    # Calculate the percentage of users which will change.
    user_change_percentage = 100. * (
        len(uids_to_add | uids_to_update | uids_to_reactivate | uids_to_suspend)
        / max(1, len(all_google_uids))
    )
    LOG.info('Configuration will modify %.2f%% of users', user_change_percentage)

    # Enforce the percentage change sanity check.
    if (limits_config.abort_user_change_percentage is not None
            and user_change_percentage > limits_config.abort_user_change_percentage):
        LOG.error(
            'Modification of %.2f%% of users is greater than limit of %.2f%%. Aborting.',
            user_change_percentage, limits_config.abort_user_change_percentage)
        raise RuntimeError('Aborting due to large user change percentage')
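    # Worked example: with 10,000 Google users and 150 distinct uids across the four sets
    # above, the change percentage is 100 * 150 / 10000 = 1.5, which would pass an
    # abort_user_change_percentage limit of, say, 5.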
    # Cap the maximum size of the various operations.
    if limits_config.max_new_users is not None and len(uids_to_add) > limits_config.max_new_users:
        uids_to_add = _limit(uids_to_add, limits_config.max_new_users)
        LOG.info('Capped number of new users to %s', len(uids_to_add))
    if (limits_config.max_suspended_users is not None
            and len(uids_to_suspend) > limits_config.max_suspended_users):
        uids_to_suspend = _limit(uids_to_suspend, limits_config.max_suspended_users)
        LOG.info('Capped number of users to suspend to %s', len(uids_to_suspend))
    if (limits_config.max_reactivated_users is not None
            and len(uids_to_reactivate) > limits_config.max_reactivated_users):
        uids_to_reactivate = _limit(uids_to_reactivate, limits_config.max_reactivated_users)
        LOG.info('Capped number of users to reactivate to %s', len(uids_to_reactivate))
    if (limits_config.max_updated_users is not None
            and len(uids_to_update) > limits_config.max_updated_users):
        uids_to_update = _limit(uids_to_update, limits_config.max_updated_users)
        LOG.info('Capped number of users to update to %s', len(uids_to_update))

    # A generator which yields the patch() and insert() calls to the directory service needed
    # to perform the required actions.
    def api_requests():
        # Update existing users.
        user_updates = {uid: google_user_updates[uid] for uid in uids_to_update}
        for uid, update in user_updates.items():
            google_id = all_google_users_by_uid[uid]['id']
            LOG.info('Update user "%s": "%r"', uid, update)
            yield directory_service.users().patch(userKey=google_id, body=update)

        # Suspend old users.
        for uid in uids_to_suspend:
            google_id = all_google_users_by_uid[uid]['id']
            LOG.info('Suspending user: "%s"', uid)
            yield directory_service.users().patch(userKey=google_id, body={'suspended': True})

        # Reactivate returning users.
        for uid in uids_to_reactivate:
            google_id = all_google_users_by_uid[uid]['id']
            LOG.info('Reactivating user: "%s"', uid)
            yield directory_service.users().patch(userKey=google_id, body={'suspended': False})

        # Create new users.
        for uid in uids_to_add:
            # Generate a random password which is thrown away.
            new_user = {
                'hashFunction': 'crypt',
                'password': crypt.crypt(secrets.token_urlsafe(), crypt.METHOD_SHA512),
                'orgUnitPath': sync_config.new_user_org_unit_path,
                **google_user_creations[uid],
            }
            redacted_user = {**new_user, 'password': 'REDACTED'}
            LOG.info('Adding user "%s": %s', uid, redacted_user)
            yield directory_service.users().insert(body=new_user)

    # Make a chunked iterator of requests to the directory API. The Directory API supports a
    # maximum batch size of 1000. See:
    # https://developers.google.com/admin-sdk/directory/v1/guides/batch
    for request_batch in _grouper(api_requests(), n=sync_config.batch_size):
        # Form the batch request.
        batch = directory_service.new_batch_http_request()
        for request in request_batch:
            batch.add(request, callback=_handle_batch_response)
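        # With the default batch_size of 50, for example, a run creating 120 users issues
        # three batch requests (of 50, 50 and 20 calls respectively).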
        # Execute the batch request if not in read-only mode. Otherwise log that we would
        # have done so.
        if not read_only:
            LOG.info('Issuing batch request to Google.')
            time.sleep(sync_config.inter_batch_delay)
            retries = sync_config.http_retries
            while True:
                try:
                    batch.execute()
                except errors.HttpError as err:
                    if err.resp.status == 503 and retries > 0:
                        retries -= 1
                        LOG.warning('503: Service unavailable - retrying')
                        time.sleep(sync_config.http_retry_delay)
                        continue
                    if err.resp.status == 503:
                        LOG.error('503: Service unavailable - retry count exceeded')
                    raise
                break
        else:
            LOG.info('Not issuing batch request in read-only mode.')


def _handle_batch_response(request_id, response, exception):
    if exception is not None:
        LOG.error('Error performing request: %s', exception)
        LOG.error('Response: %r', response)


def _limit(s, limit):
    """
    Given a set, s, and a numeric limit, return a set which has no more than *limit* elements.
    The exact set of elements retained is not specified.

    >>> s = set('ABCDEFGHIJKLMNOPQ')
    >>> len(s) > 5
    True
    >>> len(_limit(s, 5)) == 5
    True
    >>> len(_limit(s, 500)) == len(s)
    True

    All elements of the returned set are taken from the input set.

    >>> s_prime = _limit(s, 5)
    >>> s_prime - s
    set()

    """
    return {e for _, e in itertools.takewhile(lambda p: p[0] < limit, enumerate(s))}


def _grouper(iterable, *, n):
    """
    Group an iterable into chunks of at most *n* elements. A generator which yields iterables
    representing slices of *iterable*.

    >>> [list(i) for i in _grouper('ABCDEFGH', n=3)]
    [['A', 'B', 'C'], ['D', 'E', 'F'], ['G', 'H']]

    >>> def generator(stop):
    ...     for x in range(stop):
    ...         yield x
    >>> [list(i) for i in _grouper(generator(10), n=3)]
    [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]]
    >>> [list(i) for i in _grouper(generator(12), n=3)]
    [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9, 10, 11]]

    The implementation of this function attempts to be efficient; the chunks are iterables
    which are generated on demand rather than being constructed first. Hence this function can
    deal with iterables which would fill memory if intermediate chunks were stored.

    >>> i = _grouper(generator(100000000000000000000), n=1000000000000000)
    >>> next(next(i))
    0

    """
    it = iter(iterable)
    while True:
        next_chunk_it = itertools.islice(it, n)
        try:
            first = next(next_chunk_it)
        except StopIteration:
            return
        yield itertools.chain((first,), next_chunk_it)
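# For reference, a minimal configuration mapping passed to sync() might look something like
# the following; the nested keys are the ones read in sync() above, the values shown are
# illustrative, and the authoritative schemas live in the config, gapiauth, gapidomain, ldap
# and limits modules:
#
#   sync({
#       'sync': {'batch_size': 50, 'inter_batch_delay': 5},
#       'google_api': {'auth': {...}},
#       'google_domain': {...},
#       'ldap': {...},
#       'limits': {...},
#   }, read_only=True)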