"""
Synchronise Google Directory with a local LDAP directory.

"""
import crypt
import dataclasses
import itertools
import logging
import numbers
import re
import secrets
import time
import typing

from googleapiclient import discovery, errors

from . import config
from . import gapiauth
from . import gapidomain
from . import gapiutil
from . import ldap
from . import limits
from . import naming

# Module-level logger for this sync module.
LOG = logging.getLogger(__name__)

# Scopes required to perform read-only actions.
READ_ONLY_SCOPES = [
    'https://www.googleapis.com/auth/admin.directory.user.readonly',
]

# Scopes required *in addition to READ_ONLY_SCOPES* to perform a full update.
WRITE_SCOPES = [
    'https://www.googleapis.com/auth/admin.directory.user',
]


@dataclasses.dataclass
class Configuration(config.ConfigurationDataclassMixin):
    """
    Settings for the "sync" section of the configuration, controlling how the
    synchronisation between Google and LDAP is performed.

    """
    # A regular expression which is used to match the organization unit path for Google users who
    # should be excluded from the list returned by Google. Those users do not exist for the
    # purposes of the rest of the sync and so if they appear in the list of managed users this
    # script will attempt to re-add them and fail in the process. Use this setting for users who
    # are managed completely outside of this script.
    ignore_google_org_unit_path_regex: typing.Union[str, None] = None

    # The organization unit path in which new accounts are placed
    new_user_org_unit_path: str = '/'

    # Inter-batch delay in seconds. This is useful to avoid hitting Google rate limits.
    inter_batch_delay: numbers.Real = 5

    # Batch size for Google API calls. Google supports batching requests together into one API
    # call.
    batch_size: int = 50

    # Number of times to retry HTTP requests if a 503 "Service Unavailable" received
    http_retries: int = 2

    # Delay in seconds between HTTP 503 response retries
    http_retry_delay: numbers.Real = 5


def sync(configuration, *, read_only=True):
    """
    Synchronise the Google directory with the local LDAP directory.

    *configuration* is a dictionary of settings; the "sync", "google_api",
    "google_domain", "ldap" and "limits" sections are consumed here.

    When *read_only* is True (the default), all required changes are computed
    and logged but no modification requests are issued to Google.

    Raises RuntimeError if a sanity check fails or if the proportion of users
    which would be modified exceeds the configured abort limit.
    """
    if read_only:
        LOG.info('Performing synchronisation in READ ONLY mode.')
    else:
        LOG.info('Performing synchronisation in WRITE mode.')

    # Parse configuration
    sync_config = Configuration.from_dict(configuration.get('sync', {}))
    gapi_auth_config = gapiauth.Configuration.from_dict(
        configuration.get('google_api', {}).get('auth', {}))
    gapi_domain_config = gapidomain.Configuration.from_dict(
        configuration.get('google_domain', {}))
    ldap_config = ldap.Configuration.from_dict(configuration.get('ldap', {}))
    limits_config = limits.Configuration.from_dict(configuration.get('limits', {}))

    # Load appropriate Google credentials. The write scopes are only requested when we actually
    # intend to modify the directory.
    creds = (
        gapi_auth_config.load_credentials(read_only=read_only)
        .with_scopes(READ_ONLY_SCOPES + ([] if read_only else WRITE_SCOPES))
        .with_subject(gapi_domain_config.admin_user)
    )

    # Get a set containing all CRSids. These are all the people who are eligible to be in our
    # GSuite instance. If a user is in GSuite and is *not* present in this list then they are
    # suspended.
    LOG.info('Reading eligible user entries from LDAP')
    eligible_uids = ldap_config.get_eligible_uids()
    LOG.info('Total LDAP entries: %s', len(eligible_uids))

    # Get a list of managed users. These are all the people who match the "managed_user_filter" in
    # the LDAP settings.
    LOG.info('Reading managed user entries from LDAP')
    managed_user_entries = ldap_config.get_managed_user_entries()

    # Form a mapping from uid to managed user.
    managed_user_entries_by_uid = {u.uid: u for u in managed_user_entries}

    # Form a set of all *managed user* uids
    managed_user_uids = set(managed_user_entries_by_uid.keys())
    LOG.info('Total managed user entries: %s', len(managed_user_uids))

    # Sanity check: the managed users should be a subset of the eligible ones.
    if len(managed_user_uids - eligible_uids) != 0:
        raise RuntimeError('Sanity check failed: some managed uids were not in the eligible set')

    # Build the directory service using Google API discovery.
    directory_service = discovery.build('admin', 'directory_v1', credentials=creds)

    # Retrieve information on all users excluding domain admins.
    LOG.info('Getting information on Google domain users')
    fields = [
        'id', 'isAdmin', 'orgUnitPath', 'primaryEmail', 'suspended', 'suspensionReason',
        'name(givenName, familyName)',
    ]
    all_google_users = gapiutil.list_all(
        directory_service.users().list, items_key='users', domain=gapi_domain_config.name,
        query='isAdmin=false', fields='nextPageToken,users(' + ','.join(fields) + ')',
        retries=sync_config.http_retries, retry_delay=sync_config.http_retry_delay,
    )

    # Strip any "to be ignored" users out of the results.
    if sync_config.ignore_google_org_unit_path_regex is not None:
        LOG.info(
            'Ignoring users whose organization unit path matches %r',
            sync_config.ignore_google_org_unit_path_regex)
        # Check that all users have an orgUnitPath
        missing_org = [
            u for u in all_google_users if 'orgUnitPath' not in u
        ]
        if len(missing_org) != 0:
            LOG.error('User entries missing orgUnitPath: %s (starting with %s)',
                      len(missing_org),
                      missing_org[0]['primaryEmail'] if 'primaryEmail' in missing_org[0]
                      else 'user with blank email')
            raise RuntimeError('Sanity check failed: at least one user is missing orgUnitPath')
        # Remove users matching regex
        regex = re.compile(sync_config.ignore_google_org_unit_path_regex)
        all_google_users = [
            u for u in all_google_users if not regex.match(u['orgUnitPath'])
        ]

    # Sanity check. There should be no admins in the returned results.
    if any(u.get('isAdmin', False) for u in all_google_users):
        raise RuntimeError('Sanity check failed: admin users in user list')

    # Form a mapping from uid to Google user. We form the uid by splitting out the local-part of
    # the email address.
    all_google_users_by_uid = {u['primaryEmail'].split('@')[0]: u for u in all_google_users}

    # Form a set of all Google-side uids. The all_google_uids set is all users including the
    # suspended ones and the suspended_google_uids set is only the suspended users. Non suspended
    # users are therefore all_google_uids - suspended_google_uids.
    all_google_uids = set(all_google_users_by_uid.keys())
    suspended_google_uids = {uid for uid, u in all_google_users_by_uid.items() if u['suspended']}

    # Sanity check. We should not have lost anyone. (I.e. the uid should be unique.)
    if len(all_google_uids) != len(all_google_users):
        raise RuntimeError('Sanity check failed: user list changed length')

    # Log some stats.
    LOG.info('Total Google users: %s', len(all_google_uids))
    LOG.info(
        'Suspended Google users: %s', sum(1 if u['suspended'] else 0 for u in all_google_users))

    # For each user which exists in Google or the managed user set which is eligible, determine if
    # they need updating/creating. If so, record a patch/insert for the user.
    LOG.info('Calculating updates...')
    google_user_updates = {}
    google_user_creations = {}
    for uid, managed_user_entry in managed_user_entries_by_uid.items():
        # Heuristically determine the given and family names.
        names = naming.get_names(
            uid=uid, display_name=managed_user_entry.displayName, cn=managed_user_entry.cn,
            sn=managed_user_entry.sn)

        # Form expected user resource fields.
        expected_google_user = {
            'name': {
                'givenName': names.given_name,
                'familyName': names.family_name,
            },
        }

        # Find existing Google user (if any).
        existing_google_user = all_google_users_by_uid.get(uid)

        if existing_google_user is not None:
            # See if we need to change the existing user
            # Unless anything needs changing, the patch is empty.
            patch = {}

            # Determine how to patch user's name.
            google_user_name = existing_google_user.get('name', {})
            patch_name = {}
            if google_user_name.get('givenName') != expected_google_user['name']['givenName']:
                patch_name['givenName'] = names.given_name
            if google_user_name.get('familyName') != expected_google_user['name']['familyName']:
                patch_name['familyName'] = names.family_name
            if len(patch_name) > 0:
                patch['name'] = patch_name

            # Only record non-empty patches.
            if len(patch) > 0:
                google_user_updates[uid] = patch
        else:
            # No existing Google user. Record the new resource. The password is generated (and
            # thrown away) at insert time in api_requests() below; it doesn't matter what the
            # password is since we never have the user log in with it. For password-only
            # applications the user can make use of an application-specific password.
            new_user = {
                'primaryEmail': f'{uid}@{gapi_domain_config.name}',
                **expected_google_user,
            }
            google_user_creations[uid] = new_user

    # Form a set of all the uids which need patching.
    uids_to_update = set(google_user_updates.keys())
    LOG.info('Number of existing users to update: %s', len(uids_to_update))

    # Form a set of all the uids which need adding.
    uids_to_add = set(google_user_creations.keys())
    LOG.info('Number of users to add: %s', len(uids_to_add))

    # Form a set of all uids which need reactivating. We reactivate users who are in the managed
    # user list *and* the suspended user list.
    uids_to_reactivate = suspended_google_uids & managed_user_uids
    LOG.info('Number of users to reactivate: %s', len(uids_to_reactivate))

    # Form a set of all uids which should be suspended. This is all the unsuspended Google uids
    # which do not appear in our eligible user list.
    uids_to_suspend = (all_google_uids - suspended_google_uids) - eligible_uids
    LOG.info('Number of users to suspend: %s', len(uids_to_suspend))

    # Calculate percentage change. The max(1, ...) guards against division by zero on an empty
    # domain.
    user_change_percentage = 100. * (
        len(uids_to_add | uids_to_update | uids_to_reactivate | uids_to_suspend)
        /
        max(1, len(all_google_uids))
    )
    LOG.info('Configuration will modify %.2f%% of users', user_change_percentage)

    # Enforce percentage change sanity check.
    if (limits_config.abort_user_change_percentage is not None and
            user_change_percentage > limits_config.abort_user_change_percentage):
        LOG.error(
            'Modification of %.2f%% of users is greater than limit of %.2f%%. Aborting.',
            user_change_percentage, limits_config.abort_user_change_percentage
        )
        raise RuntimeError('Aborting due to large user change percentage')

    # Cap maximum size of various operations.
    if limits_config.max_new_users is not None and len(uids_to_add) > limits_config.max_new_users:
        uids_to_add = _limit(uids_to_add, limits_config.max_new_users)
        LOG.info('Capped number of new users to %s', len(uids_to_add))
    if (limits_config.max_suspended_users is not None and
            len(uids_to_suspend) > limits_config.max_suspended_users):
        uids_to_suspend = _limit(uids_to_suspend, limits_config.max_suspended_users)
        LOG.info('Capped number of users to suspend to %s', len(uids_to_suspend))
    if (limits_config.max_reactivated_users is not None and
            len(uids_to_reactivate) > limits_config.max_reactivated_users):
        uids_to_reactivate = _limit(uids_to_reactivate, limits_config.max_reactivated_users)
        LOG.info('Capped number of users to reactivate to %s', len(uids_to_reactivate))
    if (limits_config.max_updated_users is not None and
            len(uids_to_update) > limits_config.max_updated_users):
        uids_to_update = _limit(uids_to_update, limits_config.max_updated_users)
        LOG.info('Capped number of users to update to %s', len(uids_to_update))

    # A generator which will generate patch() and insert() calls to the directory service to
    # perform the actions required
    def api_requests():
        # Update existing users.
        user_updates = {uid: google_user_updates[uid] for uid in uids_to_update}
        for uid, update in user_updates.items():
            google_id = all_google_users_by_uid[uid]['id']
            LOG.info('Update user "%s": "%r"', uid, update)
            yield directory_service.users().patch(userKey=google_id, body=update)

        # Suspend old users
        for uid in uids_to_suspend:
            google_id = all_google_users_by_uid[uid]['id']
            LOG.info('Suspending user: "%s"', uid)
            yield directory_service.users().patch(userKey=google_id, body={'suspended': True})

        # Reactivate returning users
        for uid in uids_to_reactivate:
            google_id = all_google_users_by_uid[uid]['id']
            LOG.info('Reactivating user: "%s"', uid)
            yield directory_service.users().patch(userKey=google_id, body={'suspended': False})

        # Create new users
        for uid in uids_to_add:
            # Generate a random password which is thrown away. NOTE(review): the stdlib "crypt"
            # module is deprecated (removed in Python 3.13) - an alternative SHA-512 crypt
            # implementation will be needed to run on newer Pythons.
            new_user = {
                'hashFunction': 'crypt',
                'password': crypt.crypt(secrets.token_urlsafe(), crypt.METHOD_SHA512),
                'orgUnitPath': sync_config.new_user_org_unit_path,
                **google_user_creations[uid],
            }
            # Never log the password, even though it is thrown away.
            redacted_user = {**new_user, 'password': 'REDACTED'}
            LOG.info('Adding user "%s": %s', uid, redacted_user)
            yield directory_service.users().insert(body=new_user)

    # Make an chunked iterator of requests to the directory API. The Directory API supports a
    # maximum batch size of 1000. See:
    # https://developers.google.com/admin-sdk/directory/v1/guides/batch
    for request_batch in _grouper(api_requests(), n=sync_config.batch_size):
        # Form batch request.
        batch = directory_service.new_batch_http_request()
        for request in request_batch:
            batch.add(request, callback=_handle_batch_response)

        # Execute the batch request if not in read only mode. Otherwise log that we would have.
        if not read_only:
            LOG.info('Issuing batch request to Google.')
            time.sleep(sync_config.inter_batch_delay)
            retries = sync_config.http_retries
            while True:
                try:
                    batch.execute()
                except errors.HttpError as err:
                    if (err.resp.status == 503 and retries > 0):
                        retries -= 1
                        LOG.warning('503: Service unavailable - retrying')
                        time.sleep(sync_config.http_retry_delay)
                        continue
                    if retries == 0:
                        LOG.error('503: Service unavailable - retry count exceeded')
                    raise
                break
        else:
            LOG.info('Not issuing batch request in read-only mode.')


def _handle_batch_response(request_id, response, exception):
    if exception is not None:
        LOG.error('Error performing request: %s', exception)
        LOG.error('Response: %r', response)


def _limit(s, limit):
    """
    Given a set, s, and a numeric limit, return a set which has no more than *limit* elements. The
    exact set of elements retained is not specified.

    >>> s = set('ABCDEFGHIJKLMNOPQ')
    >>> len(s) > 5
    True
    >>> len(_limit(s, 5)) == 5
    True
    >>> len(_limit(s, 500)) == len(s)
    True

    All elements of the returned set are taken from input set.

    >>> s_prime = _limit(s, 5)
    >>> s_prime - s
    set()

    """
    return {e for _, e in itertools.takewhile(lambda p: p[0] < limit, enumerate(s))}


def _grouper(iterable, *, n):
    """
    Group an iterable into chunks of at most *n* elements. A generator which yields iterables
    representing slices of *iterable*.

    >>> [list(i) for i in _grouper('ABCDEFGH', n=3)]
    [['A', 'B', 'C'], ['D', 'E', 'F'], ['G', 'H']]
    >>> def generator(stop):
    ...     for x in range(stop):
    ...         yield x
    >>> [list(i) for i in _grouper(generator(10), n=3)]
    [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]]
    >>> [list(i) for i in _grouper(generator(12), n=3)]
    [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9, 10, 11]]

    The implementation of this function attempts to be efficient; the chunks are iterables which
    are generated on demand rather than being constructed first. Hence this function can deal with
    iterables which would fill memory if intermediate chunks were stored.

    >>> i = _grouper(generator(100000000000000000000), n=1000000000000000)
    >>> next(next(i))
    0

    """
    it = iter(iterable)
    while True:
        next_chunk_it = itertools.islice(it, n)
        try:
            first = next(next_chunk_it)
        except StopIteration:
            return
        yield itertools.chain((first,), next_chunk_it)