FAQ | This is a LIVE service | Changelog

Skip to content
Snippets Groups Projects
sync.py 15.3 KiB
Newer Older
Dr Rich Wareham's avatar
Dr Rich Wareham committed
"""
Synchronise Google Directory with a local LDAP directory.

"""
import crypt
import dataclasses
import itertools
import logging
import numbers
import re
import secrets
import time
import typing

from googleapiclient import discovery

from . import config
from . import gapiauth
from . import gapidomain
from . import gapiutil
from . import ldap
from . import limits
from . import naming

LOG = logging.getLogger(__name__)

# Google API scopes needed to perform read-only actions on Directory users.
READ_ONLY_SCOPES = ['https://www.googleapis.com/auth/admin.directory.user.readonly']

# Scopes required *in addition to READ_ONLY_SCOPES* in order to perform a full (write) update.
WRITE_SCOPES = ['https://www.googleapis.com/auth/admin.directory.user']


@dataclasses.dataclass
class Configuration(config.ConfigurationDataclassMixin):
    """
    Settings which control the synchronisation itself. sync() builds this from the ``sync``
    entry of the configuration dictionary via ``Configuration.from_dict``.

    """
    # A regular expression which is used to match the organization unit path for Google users who
    # should be excluded from the list returned by Google. Those users do not exist for the
    # purposes of the rest of the sync and so if they appear in the list of managed users this
    # script will attempt to re-add them and fail in the process. Use this setting for users who
    # are managed completely outside of this script. The pattern is applied with re.match(), so
    # it is anchored at the start of the path but not at the end. None disables the filter.
    ignore_google_org_unit_path_regex: typing.Union[str, None] = None

    # Inter-batch delay in seconds. This is useful to avoid hitting Google rate limits. It is only
    # used when the sync is not running in read-only mode.
    inter_batch_delay: numbers.Real = 5

    # Batch size for Google API calls, i.e. the number of requests grouped into each batch HTTP
    # request. Google supports batching requests together into one API call; the Directory API
    # accepts at most 1000 calls per batch.
    batch_size: int = 50

Dr Rich Wareham's avatar
Dr Rich Wareham committed

def sync(configuration, *, read_only=True):
    """
    Synchronise the users of a Google domain with a local LDAP directory.

    *configuration* is a dictionary. Its ``sync``, ``google_api.auth``, ``google_domain``,
    ``ldap`` and ``limits`` entries are parsed into the corresponding Configuration objects; any
    missing entry is treated as an empty dictionary.

    In outline, the sync:

    1. reads the eligible uids and the managed user entries from LDAP;
    2. lists all non-admin Google users, dropping any whose organization unit path matches
       ``ignore_google_org_unit_path_regex``;
    3. decides which Google users to create, update (currently only their names), reactivate
       and suspend;
    4. aborts if too large a percentage of users would change, and caps each kind of change
       according to the ``limits`` configuration; and
    5. sends the resulting Directory API requests in batches.

    If *read_only* is True (the default) write scopes are not requested and the batches are built
    and logged but never executed.

    Raises RuntimeError if any sanity check fails or if the percentage of users which would be
    modified exceeds ``abort_user_change_percentage``.

    """
    if read_only:
        LOG.info('Performing synchronisation in READ ONLY mode.')
    else:
        LOG.info('Performing synchronisation in WRITE mode.')

    # Parse configuration
    sync_config = Configuration.from_dict(configuration.get('sync', {}))
    gapi_auth_config = gapiauth.Configuration.from_dict(
        configuration.get('google_api', {}).get('auth', {}))
    gapi_domain_config = gapidomain.Configuration.from_dict(
        configuration.get('google_domain', {}))
    ldap_config = ldap.Configuration.from_dict(configuration.get('ldap', {}))
    limits_config = limits.Configuration.from_dict(configuration.get('limits', {}))

    # Load appropriate Google credentials. Write scopes are only requested when we intend to make
    # changes.
    creds = (
        gapi_auth_config.load_credentials(read_only=read_only)
        .with_scopes(READ_ONLY_SCOPES + ([] if read_only else WRITE_SCOPES))
        .with_subject(gapi_domain_config.admin_user)
    )

    # Get a set containing all CRSids. These are all the people who are eligible to be in our
    # GSuite instance. If a user is in GSuite and is *not* present in this set then they are
    # suspended.
    LOG.info('Reading eligible user entries from LDAP')
    eligible_uids = ldap_config.get_eligible_uids()
    LOG.info('Total LDAP entries: %s', len(eligible_uids))

    # Get a list of managed users. These are all the people who match the "managed_user_filter"
    # in the LDAP settings.
    LOG.info('Reading managed user entries from LDAP')
    managed_user_entries = ldap_config.get_managed_user_entries()

    # Form a mapping from uid to managed user and the set of all *managed user* uids.
    managed_user_entries_by_uid = {u.uid: u for u in managed_user_entries}
    managed_user_uids = set(managed_user_entries_by_uid.keys())
    LOG.info('Total managed user entries: %s', len(managed_user_uids))

    # Sanity check: the managed users should be a subset of the eligible ones.
    if managed_user_uids - eligible_uids:
        raise RuntimeError('Sanity check failed: some managed uids were not in the eligible set')

    # Build the directory service using Google API discovery.
    directory_service = discovery.build('admin', 'directory_v1', credentials=creds)

    # Retrieve information on all users excluding domain admins.
    LOG.info('Getting information on Google domain users')
    fields = [
        'id', 'isAdmin', 'orgUnitPath', 'primaryEmail', 'suspended', 'suspensionReason',
        'name(givenName, familyName)',
    ]
    all_google_users = gapiutil.list_all(
        directory_service.users().list, items_key='users', domain=gapi_domain_config.name,
        query='isAdmin=false', fields='nextPageToken,users(' + ','.join(fields) + ')',
    )

    # Strip any "to be ignored" users out of the results. re.match() anchors the pattern at the
    # start of the organization unit path only.
    if sync_config.ignore_google_org_unit_path_regex is not None:
        LOG.info(
            'Ignoring users whose organization unit path matches %r',
            sync_config.ignore_google_org_unit_path_regex)
        regex = re.compile(sync_config.ignore_google_org_unit_path_regex)
        all_google_users = [
            u for u in all_google_users if not regex.match(u['orgUnitPath'])
        ]

    # Sanity check. There should be no admins in the returned results.
    if any(u.get('isAdmin', False) for u in all_google_users):
        raise RuntimeError('Sanity check failed: admin users in user list')

    # Form a mapping from uid to Google user. We form the uid by splitting out the local-part of
    # the email address.
    all_google_users_by_uid = {u['primaryEmail'].split('@')[0]: u for u in all_google_users}

    # Form a set of all Google-side uids. The all_google_uids set is all users including the
    # suspended ones and the suspended_google_uids set is only the suspended users. Non suspended
    # users are therefore all_google_uids - suspended_google_uids. A missing "suspended" field is
    # treated as "not suspended", matching how "isAdmin" is read above.
    all_google_uids = set(all_google_users_by_uid.keys())
    suspended_google_uids = {
        uid for uid, u in all_google_users_by_uid.items() if u.get('suspended', False)
    }

    # Sanity check. We should not have lost anyone. (I.e. the uid should be unique.)
    if len(all_google_uids) != len(all_google_users):
        raise RuntimeError('Sanity check failed: user list changed length')

    # Log some stats. Uids are unique (checked above) so counting suspended uids counts users.
    LOG.info('Total Google users: %s', len(all_google_uids))
    LOG.info('Suspended Google users: %s', len(suspended_google_uids))

    # For each managed user, determine if they need updating/creating. If so, record a
    # patch/insert for the user.
    LOG.info('Calculating updates...')
    google_user_updates = {}
    google_user_creations = {}
    for uid, managed_user_entry in managed_user_entries_by_uid.items():
        # Heuristically determine the given and family names.
        names = naming.get_names(
            uid=uid, display_name=managed_user_entry.displayName, cn=managed_user_entry.cn,
            sn=managed_user_entry.sn)

        # Form expected user resource fields.
        expected_google_user = {
            'name': {
                'givenName': names.given_name,
                'familyName': names.family_name,
            },
        }

        # Find existing Google user (if any).
        existing_google_user = all_google_users_by_uid.get(uid)

        if existing_google_user is None:
            # No existing Google user. Record the new resource; the password is added when the
            # insert request is generated.
            google_user_creations[uid] = {
                'primaryEmail': f'{uid}@{gapi_domain_config.name}',
                **expected_google_user,
            }
            continue

        # See if we need to change the existing user. Unless anything needs changing, the patch
        # is empty.
        patch = {}

        # Patch only those name components which differ from the expected ones.
        google_user_name = existing_google_user.get('name', {})
        patch_name = {
            key: value for key, value in expected_google_user['name'].items()
            if google_user_name.get(key) != value
        }
        if patch_name:
            patch['name'] = patch_name

        # Only record non-empty patches.
        if patch:
            google_user_updates[uid] = patch

    # Form a set of all the uids which need patching.
    uids_to_update = set(google_user_updates.keys())
    LOG.info('Number of existing users to update: %s', len(uids_to_update))

    # Form a set of all the uids which need adding.
    uids_to_add = set(google_user_creations.keys())
    LOG.info('Number of users to add: %s', len(uids_to_add))

    # Form a set of all uids which need reactivating. We reactivate users who are in the managed
    # user list *and* the suspended user list.
    uids_to_reactivate = suspended_google_uids & managed_user_uids
    LOG.info('Number of users to reactivate: %s', len(uids_to_reactivate))

    # Form a set of all uids which should be suspended. This is all the unsuspended Google uids
    # which do not appear in our eligible user list.
    uids_to_suspend = (all_google_uids - suspended_google_uids) - eligible_uids
    LOG.info('Number of users to suspend: %s', len(uids_to_suspend))

    # Calculate percentage change. A user subject to several kinds of change is counted once.
    user_change_percentage = 100. * (
        len(uids_to_add | uids_to_update | uids_to_reactivate | uids_to_suspend)
        /
        max(1, len(all_google_uids))
    )
    LOG.info('Configuration will modify %.2f%% of users', user_change_percentage)

    # Enforce percentage change sanity check.
    if (limits_config.abort_user_change_percentage is not None and
            user_change_percentage > limits_config.abort_user_change_percentage):
        LOG.error(
            'Modification of %.2f%% of users is greater than limit of %.2f%%. Aborting.',
            user_change_percentage, limits_config.abort_user_change_percentage
        )
        raise RuntimeError('Aborting due to large user change percentage')

    # Cap maximum size of various operations.
    uids_to_add = _cap_uids(uids_to_add, limits_config.max_new_users, 'new users')
    uids_to_suspend = _cap_uids(
        uids_to_suspend, limits_config.max_suspended_users, 'users to suspend')
    uids_to_reactivate = _cap_uids(
        uids_to_reactivate, limits_config.max_reactivated_users, 'users to reactivate')
    uids_to_update = _cap_uids(
        uids_to_update, limits_config.max_updated_users, 'users to update')

    # A generator which will generate patch() and insert() calls to the directory service to
    # perform the actions required.
    def api_requests():
        # Update existing users.
        for uid in uids_to_update:
            update = google_user_updates[uid]
            google_id = all_google_users_by_uid[uid]['id']
            LOG.info('Update user "%s": "%r"', uid, update)
            yield directory_service.users().patch(userKey=google_id, body=update)

        # Suspend old users
        for uid in uids_to_suspend:
            google_id = all_google_users_by_uid[uid]['id']
            LOG.info('Suspending user: "%s"', uid)
            yield directory_service.users().patch(userKey=google_id, body={'suspended': True})

        # Reactivate returning users
        for uid in uids_to_reactivate:
            google_id = all_google_users_by_uid[uid]['id']
            LOG.info('Reactivating user: "%s"', uid)
            yield directory_service.users().patch(userKey=google_id, body={'suspended': False})

        # Create new users
        for uid in uids_to_add:
            # Generate a new random password and send Google only its hash. It doesn't matter
            # what this password is since we never have the user log in with it. For
            # password-only applications the user can make use of an application-specific
            # password.
            # NOTE: the crypt module is deprecated since Python 3.11 (PEP 594) and removed in
            # Python 3.13; this needs replacing before moving to a newer interpreter.
            new_user = {
                'hashFunction': 'crypt',
                'password': crypt.crypt(secrets.token_urlsafe(), crypt.METHOD_SHA512),
                **google_user_creations[uid],
            }
            redacted_user = {**new_user, 'password': 'REDACTED'}
            LOG.info('Adding user "%s": %s', uid, redacted_user)
            yield directory_service.users().insert(body=new_user)

    # Make a chunked iterator of requests to the directory API. The Directory API supports a
    # maximum batch size of 1000. See:
    # https://developers.google.com/admin-sdk/directory/v1/guides/batch
    issued_a_batch = False
    for request_batch in _grouper(api_requests(), n=sync_config.batch_size):
        # Form batch request.
        batch = directory_service.new_batch_http_request()
        for request in request_batch:
            batch.add(request, callback=_handle_batch_response)

        # Execute the batch request if not in read only mode. Otherwise log that we would have.
        if read_only:
            LOG.info('Not issuing batch request in read-only mode.')
            continue

        # Pause *between* batches to avoid Google rate limits; there is nothing to wait for
        # before the first one.
        if issued_a_batch:
            time.sleep(sync_config.inter_batch_delay)
        LOG.info('Issuing batch request to Google.')
        batch.execute()
        issued_a_batch = True


def _cap_uids(uids, maximum, description):
    """
    Return *uids* unchanged if *maximum* is None or is not exceeded. Otherwise return at most
    *maximum* of its elements (as chosen by _limit) and log the new size. *description* names the
    kind of change in the log message, e.g. "users to suspend".

    """
    if maximum is None or len(uids) <= maximum:
        return uids
    capped = _limit(uids, maximum)
    LOG.info('Capped number of %s to %s', description, len(capped))
    return capped


def _handle_batch_response(request_id, response, exception):
    if exception is not None:
        LOG.error('Error performing request: %s', exception)
        LOG.error('Response: %r', response)


def _limit(s, limit):
    """
    Given a set, s, and a numeric limit, return a set which has no more than *limit* elements. The
    exact set of elements retained is not specified.

    >>> s = set('ABCDEFGHIJKLMNOPQ')
    >>> len(s) > 5
    True
    >>> len(_limit(s, 5)) == 5
    True
    >>> len(_limit(s, 500)) == len(s)
    True

    All elements of the returned set are taken from input set.

    >>> s_prime = _limit(s, 5)
    >>> s_prime - s
    set()

    """
    return {e for _, e in itertools.takewhile(lambda p: p[0] < limit, enumerate(s))}


def _grouper(iterable, *, n):
    """
    Group an iterable into chunks of at most *n* elements. A generator which yields iterables
    representing slices of *iterable*.

    >>> [list(i) for i in _grouper('ABCDEFGH', n=3)]
    [['A', 'B', 'C'], ['D', 'E', 'F'], ['G', 'H']]
    >>> def generator(stop):
    ...     for x in range(stop):
    ...         yield x
    >>> [list(i) for i in _grouper(generator(10), n=3)]
    [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]]
    >>> [list(i) for i in _grouper(generator(12), n=3)]
    [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9, 10, 11]]

    The implementation of this function attempts to be efficient; the chunks are iterables which
    are generated on demand rather than being constructed first. Hence this function can deal with
    iterables which would fill memory if intermediate chunks were stored.

    >>> i = _grouper(generator(100000000000000000000), n=1000000000000000)
    >>> next(next(i))
    0

    """
    it = iter(iterable)
    while True:
        next_chunk_it = itertools.islice(it, n)
        try:
            first = next(next_chunk_it)
        except StopIteration:
            return
        yield itertools.chain((first,), next_chunk_it)