"""
Synchronise Google Directory with a local LDAP directory.

"""
import crypt
import dataclasses
import itertools
import logging
import numbers
import re
import secrets
import time
import typing

from googleapiclient import discovery, errors

from . import config
from . import gapiauth
from . import gapidomain
from . import gapiutil
from . import ldap
from . import limits
from . import naming

LOG = logging.getLogger(__name__)

# Scopes required to perform read-only actions.
READ_ONLY_SCOPES = [
    'https://www.googleapis.com/auth/admin.directory.user.readonly',
    'https://www.googleapis.com/auth/admin.directory.group.readonly',
    'https://www.googleapis.com/auth/admin.directory.group.member.readonly',
    'https://www.googleapis.com/auth/apps.groups.settings',
]

# Scopes *in addition to READ_ONLY_SCOPES* required to perform a full update.
WRITE_SCOPES = [
    'https://www.googleapis.com/auth/admin.directory.user',
    'https://www.googleapis.com/auth/admin.directory.group',
    'https://www.googleapis.com/auth/admin.directory.group.member'
]


@dataclasses.dataclass
class Configuration(config.ConfigurationDataclassMixin):
    # A regular expression which is used to match the organization unit path for Google users who
    # should be excluded from the list returned by Google. Those users do not exist for the
    # purposes of the rest of the sync and so if they appear in the list of managed users this
    # script will attempt to re-add them and fail in the process. Use this setting for users who
    # are managed completely outside of this script.
    ignore_google_org_unit_path_regex: typing.Union[str, None] = None

    # The organization unit path in which new accounts are placed
    new_user_org_unit_path: str = '/'

    # Suffix appended to the names of groups created in Google. The Google group name will be
    # "{groupName}{group_name_suffix}", where {groupName} is the Lookup group name.
    group_name_suffix: str = ' from lookup.cam.ac.uk'

    # Settings to be applied to groups in Google. These settings are applied to both new and
    # existing groups imported from Lookup.
    # See https://developers.google.com/admin-sdk/groups-settings/v1/reference/groups#json
    group_settings: dict = dataclasses.field(default_factory=lambda: {
      'whoCanJoin': 'INVITED_CAN_JOIN',
      'whoCanViewMembership': 'ALL_IN_DOMAIN_CAN_VIEW',
      'whoCanViewGroup': 'ALL_MEMBERS_CAN_VIEW',
      'whoCanPostMessage': 'ALL_IN_DOMAIN_CAN_POST',
      'allowWebPosting': 'false',
      'messageModerationLevel': 'MODERATE_ALL_MESSAGES',
      'includeInGlobalAddressList': 'false',
      'whoCanLeaveGroup': 'NONE_CAN_LEAVE',
      'whoCanContactOwner': 'ALL_MANAGERS_CAN_CONTACT',
      'whoCanModerateMembers': 'OWNERS_ONLY',
      'whoCanDiscoverGroup': 'ALL_IN_DOMAIN_CAN_DISCOVER',
    })

    # Inter-batch delay in seconds. This is useful to avoid hitting Google rate limits.
    inter_batch_delay: numbers.Real = 5

    # Batch size for Google API calls. Google supports batching requests together into one API
    # call.
    batch_size: int = 50

    # Number of times to retry HTTP requests if a 503 "Service Unavailable" response is received
    http_retries: int = 2

    # Delay in seconds between HTTP 503 response retries
    http_retry_delay: numbers.Real = 5


def sync(configuration, *, read_only=True):
    """Perform sync given configuration dictionary."""
    if read_only:
        LOG.info('Performing synchronisation in READ ONLY mode.')
    else:
        LOG.info('Performing synchronisation in WRITE mode.')

    # Parse configuration
    sync_config = Configuration.from_dict(configuration.get('sync', {}))
    gapi_auth_config = gapiauth.Configuration.from_dict(
        configuration.get('google_api', {}).get('auth', {}))
    gapi_domain_config = gapidomain.Configuration.from_dict(
        configuration.get('google_domain', {}))
    ldap_config = ldap.Configuration.from_dict(configuration.get('ldap', {}))
    limits_config = limits.Configuration.from_dict(configuration.get('limits', {}))
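    # For reference, the top-level configuration dictionary passed to this function is
    # therefore expected to have (roughly) the following shape; the contents of each
    # section are elided here and are described by the corresponding Configuration classes:
    #
    #   {
    #       'sync': {...},
    #       'google_api': {'auth': {...}},
    #       'google_domain': {...},
    #       'ldap': {...},
    #       'limits': {...},
    #   }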

    # Load appropriate Google credentials.
    creds = (
        gapi_auth_config.load_credentials(read_only=read_only)
        .with_scopes(READ_ONLY_SCOPES + ([] if read_only else WRITE_SCOPES))
        .with_subject(gapi_domain_config.admin_user)
    )

    # Functions to translate the unique identifiers of users and groups in Lookup (uids and
    # groupIDs) to and from the unique identifiers used in Google (email addresses).
    #
    # For users:   {uid}     <-> {uid}@{domain}
    # For groups:  {groupID} <-> {groupID}@{groups_domain}
    #
    # Additionally, valid uids (CRSids) match the regex [a-z][a-z0-9]{3,7}, and valid groupIDs
    # match the regex [0-9]{6,8}.
    user_email_regex = re.compile('^[a-z][a-z0-9]{3,7}@.*$')
    group_email_regex = re.compile('^[0-9]{6,8}@.*$')

    groups_domain = (
        gapi_domain_config.groups_domain
        if gapi_domain_config.groups_domain is not None
        else gapi_domain_config.name
    )

    def uid_to_email(uid):
        return f'{uid}@{gapi_domain_config.name}'

    def email_to_uid(email):
        return email.split('@')[0] if user_email_regex.match(email) else None

    def groupID_to_email(groupID):
        return f'{groupID}@{groups_domain}'

    def email_to_groupID(email):
        return email.split('@')[0] if group_email_regex.match(email) else None
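
    # A few illustrative round-trips (the domain names below are placeholders, not values
    # taken from any real configuration):
    #
    #   uid_to_email('spqr1')       -> 'spqr1@example.cam.ac.uk'
    #   email_to_uid('spqr1@example.cam.ac.uk')  -> 'spqr1'
    #   email_to_uid('not-a-crsid@example.cam.ac.uk')  -> None
    #   groupID_to_email('123456')  -> '123456@groups.example.cam.ac.uk'
    #   email_to_groupID('123456@groups.example.cam.ac.uk')  -> '123456'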

    # --------------------------------------------------------------------------------------------
    # Load current user and group data from Lookup.
    # --------------------------------------------------------------------------------------------

    # Get a set containing all CRSids. These are all the people who are eligible to be in our
    # GSuite instance. If a user is in GSuite and is *not* present in this list then they are
    # suspended.
    LOG.info('Reading eligible user entries from LDAP')
    eligible_uids = ldap_config.get_eligible_uids()
    LOG.info('Total LDAP user entries: %s', len(eligible_uids))

    # Get a set containing all groupIDs. These are all the groups that are eligible to be in our
    # GSuite instance. If a group is in GSuite and is *not* present in this list then it is
    # deleted.
    LOG.info('Reading eligible group entries from LDAP')
    eligible_groupIDs = ldap_config.get_eligible_groupIDs()
    LOG.info('Total LDAP group entries: %s', len(eligible_groupIDs))

    # Get a list of managed users. These are all the people who match the "managed_user_filter" in
    # the LDAP settings.
    LOG.info('Reading managed user entries from LDAP')
    managed_user_entries = ldap_config.get_managed_user_entries()

    # Form a mapping from uid to managed user.
    managed_user_entries_by_uid = {u.uid: u for u in managed_user_entries}

    # Form a set of all *managed user* uids
    managed_user_uids = set(managed_user_entries_by_uid.keys())
    LOG.info('Total managed user entries: %s', len(managed_user_uids))

    # Sanity check: the managed users should be a subset of the eligible ones.
    if len(managed_user_uids - eligible_uids) != 0:
        raise RuntimeError('Sanity check failed: some managed uids were not in the eligible set')

    # Get a list of managed groups. These are all the groups that match the "managed_group_filter"
    # in the LDAP settings.
    LOG.info('Reading managed group entries from LDAP')
    managed_group_entries = ldap_config.get_managed_group_entries()

    # Form a mapping from groupID to managed group.
    managed_group_entries_by_groupID = {g.groupID: g for g in managed_group_entries}

    # Form a set of all *managed group* groupIDs
    managed_group_groupIDs = set(managed_group_entries_by_groupID.keys())
    LOG.info('Total managed group entries: %s', len(managed_group_groupIDs))
    LOG.info('Total managed group members: %s', sum([len(g.uids) for g in managed_group_entries]))

    # Sanity check: the managed groups should be a subset of the eligible ones.
    if len(managed_group_groupIDs - eligible_groupIDs) != 0:
        raise RuntimeError(
            'Sanity check failed: some managed groupIDs were not in the eligible set'
        )

    # --------------------------------------------------------------------------------------------
    # Load current user and group data from Google.
    # --------------------------------------------------------------------------------------------

    # Build the directory service using Google API discovery.
    directory_service = discovery.build('admin', 'directory_v1', credentials=creds)

    # Also build the groupssettings service, which is a parallel API to manage group settings
    groupssettings_service = discovery.build('groupssettings', 'v1', credentials=creds)

    # Retrieve information on all users excluding domain admins.
    LOG.info('Getting information on Google domain users')
    fields = [
        'id', 'isAdmin', 'orgUnitPath', 'primaryEmail', 'suspended', 'suspensionReason',
        'name(givenName, familyName)',
    ]
    all_google_users = gapiutil.list_all(
        directory_service.users().list, items_key='users', domain=gapi_domain_config.name,
        query='isAdmin=false', fields='nextPageToken,users(' + ','.join(fields) + ')',
        retries=sync_config.http_retries, retry_delay=sync_config.http_retry_delay,
    )

    # Retrieve information on all groups
    LOG.info('Getting information on Google domain groups')
    fields = ['id', 'email', 'name', 'description']
    all_google_groups = gapiutil.list_all(
        directory_service.groups().list, items_key='groups', domain=groups_domain,
        fields='nextPageToken,groups(' + ','.join(fields) + ')',
        retries=sync_config.http_retries, retry_delay=sync_config.http_retry_delay,
    )

    # Strip any "to be ignored" users out of the results.
    if sync_config.ignore_google_org_unit_path_regex is not None:
        LOG.info(
            'Ignoring users whose organization unit path matches %r',
            sync_config.ignore_google_org_unit_path_regex)
        # Check that all users have an orgUnitPath
        missing_org = [
            u for u in all_google_users if 'orgUnitPath' not in u
        ]
        if len(missing_org) != 0:
            LOG.error('User entries missing orgUnitPath: %s (starting with %s)',
                      len(missing_org),
                      missing_org[0]['primaryEmail'] if 'primaryEmail' in missing_org[0]
                      else 'user with blank email')
            raise RuntimeError('Sanity check failed: at least one user is missing orgUnitPath')
        # Remove users matching regex
        regex = re.compile(sync_config.ignore_google_org_unit_path_regex)
        all_google_users = [
            u for u in all_google_users if not regex.match(u['orgUnitPath'])
        ]

    # Strip out any users with uids (extracted from the local-part of the email address) that
    # aren't valid CRSids. These users can't have come from Lookup, and so should not be managed
    # (suspended) by this script.
    all_google_users = [u for u in all_google_users if email_to_uid(u['primaryEmail'])]

    # Strip out any groups whose email addresses don't match the pattern for groups created from
    # Lookup groupIDs, and which therefore should not be managed (deleted) by this script.
    all_google_groups = [g for g in all_google_groups if email_to_groupID(g['email'])]

    # Sanity check. There should be no admins in the returned results.
    if any(u.get('isAdmin', False) for u in all_google_users):
        raise RuntimeError('Sanity check failed: admin users in user list')

    # Form mappings from uid/groupID to Google user/group.
    all_google_users_by_uid = {email_to_uid(u['primaryEmail']): u for u in all_google_users}
    all_google_groups_by_groupID = {email_to_groupID(g['email']): g for g in all_google_groups}
    # Form sets of all Google-side uids and groupIDs. The all_google_uids set is all users
    # including the suspended ones and the suspended_google_uids set is only the suspended users.
    # Non-suspended users are therefore all_google_uids - suspended_google_uids. Groups do not have
    # any concept of being suspended.
    all_google_uids = set(all_google_users_by_uid.keys())
    all_google_groupIDs = set(all_google_groups_by_groupID.keys())
    suspended_google_uids = {uid for uid, u in all_google_users_by_uid.items() if u['suspended']}

    # Sanity check. We should not have lost anything. (I.e. the uids and groupIDs should be
    # unique.)
    if len(all_google_uids) != len(all_google_users):
        raise RuntimeError('Sanity check failed: user list changed length')
    if len(all_google_groupIDs) != len(all_google_groups):
        raise RuntimeError('Sanity check failed: group list changed length')

    # Retrieve all Google group settings.
    fields = ['email', *[k for k in sync_config.group_settings.keys()]]
    all_google_group_settings = gapiutil.get_all_in_list(
        groupssettings_service, groupssettings_service.groups().get,
        item_ids=[g['email'] for g in all_google_groups], id_key='groupUniqueId',
        batch_size=sync_config.batch_size, fields=','.join(fields),
        retries=sync_config.http_retries, retry_delay=sync_config.http_retry_delay,
    )

    # Form a mapping from groupID to Google group settings.
    all_google_group_settings_by_groupID = {
        email_to_groupID(g['email']): g for g in all_google_group_settings
    }

    # Sanity check. We should have settings for each managed group.
    if len(all_google_group_settings_by_groupID) != len(all_google_groups):
        raise RuntimeError('Sanity check failed: group settings list does not match group list')

    # Retrieve all Google group memberships. This is a mapping from internal Google group ids to
    # lists of member resources.
    fields = ['id', 'email']
    all_google_members = gapiutil.list_all_in_list(
        directory_service, directory_service.members().list,
        item_ids=[g['id'] for g in all_google_groups], id_key='groupKey',
        batch_size=sync_config.batch_size, items_key='members',
        fields='nextPageToken,members(' + ','.join(fields) + ')',
        retries=sync_config.http_retries, retry_delay=sync_config.http_retry_delay,
    )

    # Sanity check. We should have a group members list for each managed group.
    if len(all_google_members) != len(all_google_groups):
        raise RuntimeError('Sanity check failed: groups in members map do not match group list')

    # Log some stats.
    LOG.info('Total Google users: %s', len(all_google_uids))
    LOG.info('Total Google groups: %s', len(all_google_groupIDs))
    LOG.info(
        'Suspended Google users: %s', sum(1 if u['suspended'] else 0 for u in all_google_users))
    LOG.info(
        'Total Google group members: %s', sum([len(m) for g, m in all_google_members.items()])
    )

    # --------------------------------------------------------------------------------------------
    # Compute differences between the Lookup and Google data.
    # --------------------------------------------------------------------------------------------

    # For each managed user, determine whether the corresponding Google user needs updating or
    # creating. If so, record a patch/insert for the user.
    LOG.info('Calculating updates...')
    google_user_updates = {}
    google_user_creations = {}
    for uid, managed_user_entry in managed_user_entries_by_uid.items():
        # Heuristically determine the given and family names.
        names = naming.get_names(
            uid=uid, display_name=managed_user_entry.displayName, cn=managed_user_entry.cn,
            sn=managed_user_entry.sn)

        # Form expected user resource fields.
        expected_google_user = {
            'name': {
                'givenName': names.given_name,
                'familyName': names.family_name,
            },
        }

        # Find existing Google user (if any).
        existing_google_user = all_google_users_by_uid.get(uid)

        if existing_google_user is not None:
            # See if we need to change the existing user
            # Unless anything needs changing, the patch is empty.
            patch = {}

            # Determine how to patch user's name.
            google_user_name = existing_google_user.get('name', {})
            patch_name = {}
            if google_user_name.get('givenName') != expected_google_user['name']['givenName']:
                patch_name['givenName'] = names.given_name
            if google_user_name.get('familyName') != expected_google_user['name']['familyName']:
                patch_name['familyName'] = names.family_name
            if len(patch_name) > 0:
                patch['name'] = patch_name

            # Only record non-empty patches.
            if len(patch) > 0:
                google_user_updates[uid] = patch
        else:
            # No existing Google user. Record the new resource. Generate a new user password and
            # send Google the hash. It doesn't matter what this password is since we never have the
            # user log in with it. For password-only applications the user can make use of an
            # application-specific password.
            new_user = {
                'primaryEmail': uid_to_email(uid),
                **expected_google_user,
            }
            google_user_creations[uid] = new_user

    # For each managed group, determine whether the corresponding Google group needs updating or
    # creating. If so, record a patch/insert for the group.
    google_group_updates = {}
    google_group_creations = {}
    for groupID, managed_group_entry in managed_group_entries_by_groupID.items():
        # Form expected group resource fields. The 2 Google APIs we use here to update groups in
        # Google each have different maximum lengths for group names and descriptions, and
        # empirically the APIs don't function properly if either limit is exceeded, so we use the
        # minimum of the 2 documented maximum field lengths (73 characters for names and 300
        # characters for descriptions).
        expected_google_group = {
            'name': _trim_text(
                managed_group_entry.groupName, maxlen=73, suffix=sync_config.group_name_suffix
            ),
            'description': _trim_text(managed_group_entry.description, maxlen=300)
        }

        # Find existing Google group (if any).
        existing_google_group = all_google_groups_by_groupID.get(groupID)

        if existing_google_group is not None:
            # See if we need to change the existing group
            # Unless anything needs changing, the patch is empty.
            patch = {}

            if existing_google_group.get('name') != expected_google_group['name']:
                patch['name'] = expected_google_group['name']
            if existing_google_group.get('description') != expected_google_group['description']:
                patch['description'] = expected_google_group['description']

            # Only record non-empty patches.
            if len(patch) > 0:
                google_group_updates[groupID] = patch
        else:
            # No existing Google group, so create one.
            google_group_creations[groupID] = {
                'email': groupID_to_email(groupID),
                **expected_google_group
            }

    # Form a set of all the uids which need patching.
    uids_to_update = set(google_user_updates.keys())
    LOG.info('Number of existing users to update: %s', len(uids_to_update))

    # Form a set of all the groupIDs which need patching.
    groupIDs_to_update = set(google_group_updates.keys())
    LOG.info('Number of existing groups to update: %s', len(groupIDs_to_update))

    # Form a set of all the uids which need adding.
    uids_to_add = set(google_user_creations.keys())
    LOG.info('Number of users to add: %s', len(uids_to_add))

    # Form a set of all the groupIDs which need adding.
    groupIDs_to_add = set(google_group_creations.keys())
    LOG.info('Number of groups to add: %s', len(groupIDs_to_add))

    # Form a set of all uids which need reactivating. We reactivate users who are in the managed
    # list *and* the suspended user list.
    uids_to_reactivate = suspended_google_uids & managed_user_uids
    LOG.info('Number of users to reactivate: %s', len(uids_to_reactivate))

    # Form a set of all uids which should be suspended. This is all the unsuspended Google uids
    # which do not appear in our eligible user list.
    uids_to_suspend = (all_google_uids - suspended_google_uids) - eligible_uids
    LOG.info('Number of users to suspend: %s', len(uids_to_suspend))

    # Form a set of all groupIDs which need deleting.
    groupIDs_to_delete = all_google_groupIDs - eligible_groupIDs
    LOG.info('Number of groups to delete: %s', len(groupIDs_to_delete))

    # For each managed group, determine which members to insert or delete. These are lists of
    # (groupID, uid) tuples.
    members_to_insert = []
    members_to_delete = []
    for groupID, managed_group_entry in managed_group_entries_by_groupID.items():
        # Find the existing Google group members.
        existing_google_group = all_google_groups_by_groupID.get(groupID)
        if existing_google_group:
            existing_members = all_google_members[existing_google_group['id']]
            existing_member_uids = set([email_to_uid(m['email']) for m in existing_members])
        else:
            existing_member_uids = set()

        # Members to insert. This is restricted to the managed user set, so that we don't attempt
        # to insert a member resource for a non-existent user.
        insert_uids = (
            (managed_group_entry.uids - existing_member_uids).intersection(managed_user_uids)
        )
        members_to_insert.extend([(groupID, uid) for uid in insert_uids])

        # Members to delete. This is restricted to the eligible user set, so that we don't bother
        # to delete a member resource when the user is suspended (and so we won't need to re-add
        # it if the user is reactivated).
        delete_uids = (
            (existing_member_uids - managed_group_entry.uids).intersection(eligible_uids)
        )
        members_to_delete.extend([(groupID, uid) for uid in delete_uids])

    LOG.info('Number of group members to insert: %s', len(members_to_insert))
    LOG.info('Number of group members to delete: %s', len(members_to_delete))

    # --------------------------------------------------------------------------------------------
    # Enforce limits on how much data to change in Google.
    # --------------------------------------------------------------------------------------------

    # Calculate percentage change to users, groups and group members.
    user_change_percentage = 100. * (
        len(uids_to_add | uids_to_update | uids_to_reactivate | uids_to_suspend)
        /
        max(1, len(all_google_uids))
    )
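    # (Illustrative arithmetic: 10 users added, updated, reactivated or suspended out of 400
    # existing Google users would be a 2.5% change.)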
    LOG.info('Configuration will modify %.2f%% of users', user_change_percentage)

    group_change_percentage = 100. * (
        len(groupIDs_to_add | groupIDs_to_update | groupIDs_to_delete)
        /
        max(1, len(all_google_groupIDs))
    )
    LOG.info('Configuration will modify %.2f%% of groups', group_change_percentage)

    member_change_percentage = 100. * (
        (len(members_to_insert) + len(members_to_delete))
        /
        max(1, sum([len(m) for g, m in all_google_members.items()]))
    )
    LOG.info('Configuration will modify %.2f%% of group members', member_change_percentage)

    # Enforce percentage change sanity checks.
    if (limits_config.abort_user_change_percentage is not None and
            user_change_percentage > limits_config.abort_user_change_percentage):
        LOG.error(
            'Modification of %.2f%% of users is greater than limit of %.2f%%. Aborting.',
            user_change_percentage, limits_config.abort_user_change_percentage
        )
        raise RuntimeError('Aborting due to large user change percentage')
    if (limits_config.abort_group_change_percentage is not None and
            group_change_percentage > limits_config.abort_group_change_percentage):
        LOG.error(
            'Modification of %.2f%% of groups is greater than limit of %.2f%%. Aborting.',
            group_change_percentage, limits_config.abort_group_change_percentage
        )
        raise RuntimeError('Aborting due to large group change percentage')
    if (limits_config.abort_member_change_percentage is not None and
            member_change_percentage > limits_config.abort_member_change_percentage):
        LOG.error(
            'Modification of %.2f%% of group members is greater than limit of %.2f%%. Aborting.',
            member_change_percentage, limits_config.abort_member_change_percentage
        )
        raise RuntimeError('Aborting due to large group member change percentage')

    # Cap maximum size of various operations.
    if limits_config.max_new_users is not None and len(uids_to_add) > limits_config.max_new_users:
        # Ensure that we do not attempt to insert a group member for any of the users not added as
        # a result of this cap, since these users won't exist in Google
        capped_uids_to_add = _limit(uids_to_add, limits_config.max_new_users)
        uids_not_added = uids_to_add - capped_uids_to_add
        members_to_insert = [(g, u) for g, u in members_to_insert if u not in uids_not_added]
        uids_to_add = capped_uids_to_add
        LOG.info('Capped number of new users to %s', len(uids_to_add))
    if (limits_config.max_new_groups is not None and
            len(groupIDs_to_add) > limits_config.max_new_groups):
        # Ensure that we do not attempt to insert a group member for any of the groups not added
        # as a result of this cap, since these groups won't exist in Google
        capped_groupIDs_to_add = _limit(groupIDs_to_add, limits_config.max_new_groups)
        groupIDs_not_added = groupIDs_to_add - capped_groupIDs_to_add
        members_to_insert = [(g, u) for g, u in members_to_insert if g not in groupIDs_not_added]
        groupIDs_to_add = capped_groupIDs_to_add
        LOG.info('Capped number of new groups to %s', len(groupIDs_to_add))
    if (limits_config.max_suspended_users is not None and
            len(uids_to_suspend) > limits_config.max_suspended_users):
        uids_to_suspend = _limit(uids_to_suspend, limits_config.max_suspended_users)
        LOG.info('Capped number of users to suspend to %s', len(uids_to_suspend))
    if (limits_config.max_deleted_groups is not None and
            len(groupIDs_to_delete) > limits_config.max_deleted_groups):
        groupIDs_to_delete = _limit(groupIDs_to_delete, limits_config.max_deleted_groups)
        LOG.info('Capped number of groups to delete to %s', len(groupIDs_to_delete))
    if (limits_config.max_reactivated_users is not None and
            len(uids_to_reactivate) > limits_config.max_reactivated_users):
        uids_to_reactivate = _limit(uids_to_reactivate, limits_config.max_reactivated_users)
        LOG.info('Capped number of users to reactivate to %s', len(uids_to_reactivate))
    if (limits_config.max_updated_users is not None and
            len(uids_to_update) > limits_config.max_updated_users):
        uids_to_update = _limit(uids_to_update, limits_config.max_updated_users)
        LOG.info('Capped number of users to update to %s', len(uids_to_update))
    if (limits_config.max_updated_groups is not None and
            len(groupIDs_to_update) > limits_config.max_updated_groups):
        groupIDs_to_update = _limit(groupIDs_to_update, limits_config.max_updated_groups)
        LOG.info('Capped number of groups to update to %s', len(groupIDs_to_update))
    if (limits_config.max_inserted_members is not None and
            len(members_to_insert) > limits_config.max_inserted_members):
        members_to_insert = members_to_insert[0:limits_config.max_inserted_members]
        LOG.info('Capped number of group members to insert to %s', len(members_to_insert))
    if (limits_config.max_deleted_members is not None and
            len(members_to_delete) > limits_config.max_deleted_members):
        members_to_delete = members_to_delete[0:limits_config.max_deleted_members]
        LOG.info('Capped number of group members to delete to %s', len(members_to_delete))

    # --------------------------------------------------------------------------------------------
    # Finally, perform the actual updates in Google.
    # --------------------------------------------------------------------------------------------

    # A generator which will generate patch() and insert() calls to the directory service to
    # perform the actions required to update users
    def user_api_requests():
        # Update existing users.
        user_updates = {uid: google_user_updates[uid] for uid in uids_to_update}
        for uid, update in user_updates.items():
            google_id = all_google_users_by_uid[uid]['id']
            LOG.info('Update user "%s": "%r"', uid, update)
            yield directory_service.users().patch(userKey=google_id, body=update)

        # Suspend old users
        for uid in uids_to_suspend:
            google_id = all_google_users_by_uid[uid]['id']
            LOG.info('Suspending user: "%s"', uid)
            yield directory_service.users().patch(userKey=google_id, body={'suspended': True})

        # Reactivate returning users
        for uid in uids_to_reactivate:
            google_id = all_google_users_by_uid[uid]['id']
            LOG.info('Reactivating user: "%s"', uid)
            yield directory_service.users().patch(userKey=google_id, body={'suspended': False})

        # Create new users
        for uid in uids_to_add:
            # Generate a random password which is thrown away.
            new_user = {**{
                'hashFunction': 'crypt',
                'password': crypt.crypt(secrets.token_urlsafe(), crypt.METHOD_SHA512),
                'orgUnitPath': sync_config.new_user_org_unit_path,
            }, **google_user_creations[uid]}
            redacted_user = {**new_user, **{'password': 'REDACTED'}}
            LOG.info('Adding user "%s": %s', uid, redacted_user)
            yield directory_service.users().insert(body=new_user)

    # A generator which will generate patch(), insert() and delete() calls to the directory
    # service to perform the actions required to update groups
    def group_api_requests():
        # Update existing groups
        group_updates = {groupID: google_group_updates[groupID] for groupID in groupIDs_to_update}
        for groupID, update in group_updates.items():
            google_id = all_google_groups_by_groupID[groupID]['id']
            LOG.info('Update group "%s": "%r"', groupID, update)
            yield directory_service.groups().patch(groupKey=google_id, body=update)

        # Delete cancelled groups
        for groupID in groupIDs_to_delete:
            google_id = all_google_groups_by_groupID[groupID]['id']
            LOG.info('Deleting group: "%s"', groupID)
            yield directory_service.groups().delete(groupKey=google_id)

        # Create new groups
        for groupID in groupIDs_to_add:
            new_group = google_group_creations[groupID]
            LOG.info('Adding group "%s": %s', groupID, new_group)
            yield directory_service.groups().insert(body=new_group)

    # A generator which will generate patch() calls to the groupssettings service to set or
    # update the required group settings.
    def group_settings_api_requests():
        # Apply all settings to new groups.
        for groupID in groupIDs_to_add:
            email = groupID_to_email(groupID)
            settings = sync_config.group_settings
            LOG.info('Updating settings for new group "%s": %s', groupID, settings)
            yield groupssettings_service.groups().patch(groupUniqueId=email, body=settings)

        # Apply any settings that differ to pre-existing groups.
        for groupID, settings in all_google_group_settings_by_groupID.items():
            patch = {k: v for k, v in sync_config.group_settings.items() if settings.get(k) != v}
            if patch:
                email = groupID_to_email(groupID)
                LOG.info('Updating settings for existing group "%s": %s', groupID, patch)
                yield groupssettings_service.groups().patch(groupUniqueId=email, body=patch)

    # A generator which will generate insert() and delete() calls to the directory service to
    # perform the actions required to update group members
    def member_api_requests():
        # Insert new members
        for groupID, uid in members_to_insert:
            group_key = groupID_to_email(groupID)
            user_key = uid_to_email(uid)
            LOG.info('Adding user "%s" to group "%s"', user_key, group_key)
            yield directory_service.members().insert(groupKey=group_key, body={'email': user_key})

        # Delete removed members
        for groupID, uid in members_to_delete:
            group_key = groupID_to_email(groupID)
            user_key = uid_to_email(uid)
            LOG.info('Removing user "%s" from group "%s"', user_key, group_key)
            yield directory_service.members().delete(groupKey=group_key, memberKey=user_key)

    # Process an iterable list of requests to the specified Google service in batches. These APIs
    # support a maximum batch size of 1000. See:
    # https://developers.google.com/admin-sdk/directory/v1/guides/batch
    def process_requests(service, requests):
        for request_batch in _grouper(requests, n=sync_config.batch_size):
            # Form batch request.
            batch = service.new_batch_http_request()
            for request in request_batch:
                batch.add(request, callback=_handle_batch_response)

            # Execute the batch request if not in read only mode. Otherwise log that we would
            # have.
            if not read_only:
                LOG.info('Issuing batch request to Google.')
                time.sleep(sync_config.inter_batch_delay)
                retries = sync_config.http_retries
                while True:
                    try:
                        batch.execute()
                    except errors.HttpError as err:
                        if err.resp.status == 503 and retries > 0:
                            retries -= 1
                            LOG.warning('503: Service unavailable - retrying')
                            time.sleep(sync_config.http_retry_delay)
                            continue
                        if err.resp.status == 503 and retries == 0:
                            LOG.error('503: Service unavailable - retry count exceeded')
                        raise
                    break
            else:
                LOG.info('Not issuing batch request in read-only mode.')

    # Process all the user, group and group member updates
    process_requests(directory_service, user_api_requests())
    process_requests(directory_service, group_api_requests())
    process_requests(groupssettings_service, group_settings_api_requests())
    process_requests(directory_service, member_api_requests())


def _handle_batch_response(request_id, response, exception):
    if exception is not None:
        LOG.error('Error performing request: %s', exception)
        LOG.error('Response: %r', response)


def _limit(s, limit):
    """
    Given a set, s, and a numeric limit, return a set which has no more than *limit* elements. The
    exact set of elements retained is not specified.

    >>> s = set('ABCDEFGHIJKLMNOPQ')
    >>> len(s) > 5
    True
    >>> len(_limit(s, 5)) == 5
    True
    >>> len(_limit(s, 500)) == len(s)
    True

    All elements of the returned set are taken from input set.

    >>> s_prime = _limit(s, 5)
    >>> s_prime - s
    set()

    """
    return {e for _, e in itertools.takewhile(lambda p: p[0] < limit, enumerate(s))}


def _grouper(iterable, *, n):
    """
    Group an iterable into chunks of at most *n* elements. A generator which yields iterables
    representing slices of *iterable*.

    >>> [list(i) for i in _grouper('ABCDEFGH', n=3)]
    [['A', 'B', 'C'], ['D', 'E', 'F'], ['G', 'H']]
    >>> def generator(stop):
    ...     for x in range(stop):
    ...         yield x
    >>> [list(i) for i in _grouper(generator(10), n=3)]
    [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]]
    >>> [list(i) for i in _grouper(generator(12), n=3)]
    [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9, 10, 11]]

    The implementation of this function attempts to be efficient; the chunks are iterables which
    are generated on demand rather than being constructed first. Hence this function can deal with
    iterables which would fill memory if intermediate chunks were stored.

    >>> i = _grouper(generator(100000000000000000000), n=1000000000000000)
    >>> next(next(i))
    0

    """
    it = iter(iterable)
    while True:
        next_chunk_it = itertools.islice(it, n)
        try:
            first = next(next_chunk_it)
        except StopIteration:
            return
        yield itertools.chain((first,), next_chunk_it)


def _trim_text(text, *, maxlen, cont='...', suffix=''):
    """
    Trim text to be no more than "maxlen" characters long, terminating it with "cont" if it had
    to be truncated. If supplied, "suffix" is appended to the string after truncating, and the
    truncation point adjusted so that the total length remains no more than "maxlen".
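
    For example:

    >>> _trim_text('The quick brown fox jumps over the lazy dog', maxlen=20)
    'The quick brown f...'
    >>> _trim_text('The quick brown fox jumps over the lazy dog', maxlen=20, suffix=' [x]')
    'The quick bro... [x]'
    >>> _trim_text('Short text', maxlen=20, suffix=' [x]')
    'Short text [x]'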

    """
    return (
        text[0:maxlen-len(cont)-len(suffix)]+cont+suffix
        if len(text)+len(suffix) > maxlen else text+suffix
    )