Source code for oktalib.entities.entities

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# File:
# Copyright 2018 Costas Tyfoxylos
# Permission is hereby granted, free of charge, to any person obtaining a copy
#  of this software and associated documentation files (the "Software"), to
#  deal in the Software without restriction, including without limitation the
#  rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
#  sell copies of the Software, and to permit persons to whom the Software is
#  furnished to do so, subject to the following conditions:
# The above copyright notice and this permission notice shall be included in
#  all copies or substantial portions of the Software.
#  pylint: disable=too-many-lines
Main code for entities.

.. _Google Python Style Guide:


import json
import logging

from cachetools import cached, TTLCache

from oktalib.oktalibexceptions import (InvalidApplication,
from .core import Entity

__author__ = '''Costas Tyfoxylos <>'''
__docformat__ = '''google'''
__date__ = '''2018-01-08'''
__copyright__ = '''Copyright 2018, Costas Tyfoxylos'''
__credits__ = ["Costas Tyfoxylos"]
__license__ = '''MIT'''
__maintainer__ = '''Costas Tyfoxylos'''
__email__ = '''<>'''
__status__ = '''Development'''  # "Prototype", "Development", "Production".

# This is the main prefix used for logging
LOGGER_BASENAME = '''entities'''
LOGGER = logging.getLogger(LOGGER_BASENAME)

[docs] class Group(Entity): """Models the group object of okta.""" @property def url(self): """The url of the group. Returns: string: The url of the group """ return f'{self._okta.api}/groups/{}' @property def type(self): """The type of the group. Returns: string: The name of the type of the group """ return self._data.get('type') @property def profile(self): """The profile of the group. Returns: dict: The profile of the group """ return self._data.get('profile') @property def name(self): """The name of the group. Returns: string: The name of the group """ return self._data.get('profile', {}).get('name') @name.setter def name(self, value): url = f'{self._okta.api}/groups/{}' payload = {'profile': {'name': value, 'description': self.description}} response = self._okta.session.put(url, data=json.dumps(payload)) if not response.ok: self._logger.error(f'Setting name failed. Response: {response.text}') else: self._update() @property def description(self): """The description of the group. Returns: string: The description of the group """ return self._data.get('profile', {}).get('description') @description.setter def description(self, value): url = f'{self._okta.api}/groups/{}' payload = {'profile': {'name':, 'description': value}} response = self._okta.session.put(url, data=json.dumps(payload)) if not response.ok: self._logger.error(f'Setting description failed. Response: {response.text}') else: self._update() @property def last_membership_updated_at(self): """The date and time of the group's last membership update. Returns: datetime: The datetime object of when the group's memberships were last updated """ return self._get_date_from_key('lastMembershipUpdated') @property def object_classes(self): """The classes of the group. Returns: tuple: The tuple of the classes of the group """ return tuple(self._data.get('objectClass')) @property def users(self): """The users of the group. Returns: generator: A generator of User objects for the users of the group """ url = self._data.get('_links', {}).get('users', {}).get('href') for data in self._okta._get_paginated_url(url): # pylint: disable=protected-access # noqa yield User(self._okta, data) @property def applications(self): """The applications of the group. Returns: generator: A generator of Application objects for the applications of the group """ url = self._data.get('_links', {}).get('apps', {}).get('href') for data in self._okta._get_paginated_url(url): # pylint: disable=protected-access # noqa yield Application(self._okta, data)
[docs] def delete(self): """Deletes the group from okta. Returns: bool: True on success, False otherwise """ url = f'{self._okta.api}/groups/{}' response = self._okta.session.delete(url) return response.ok
[docs] def add_to_application_with_label(self, application_label): """Adds the group to an application. Args: application_label: The label of the application to add the group to Returns: True on success, False otherwise """ application = self._okta.get_application_by_label(application_label) if not application: raise InvalidApplication(application_label) return application.add_group_by_id(
[docs] def remove_from_application_with_label(self, application_label): """Removes the group from an application. Args: application_label: The label of the application to remove the group from Returns: True on success, False otherwise """ application = self._okta.get_application_by_label(application_label) if not application: raise InvalidApplication(application_label) return application.remove_group_by_id(
[docs] def add_user_by_login(self, login): """Adds a user to the group. Args: login: The login of the user to add Returns: True on success, False otherwise """ user = next((user for user in self._okta.users if user.login.lower() == login.lower()), None) if not user: raise InvalidUser(login) url = f'{self._okta.api}/groups/{}/users/{}' response = self._okta.session.put(url) if not response.ok: self._logger.error(f'Adding user failed. Response: {response.text}') return response.ok
[docs] def remove_user_by_login(self, login): """Removes a user from the group. Args: login: The login of the user to remove Returns: True on success, False otherwise """ user = next((user for user in self._okta.users if user.login == login), None) if not user: raise InvalidUser(login) url = f'{self._okta.api}/groups/{}/users/{}' response = self._okta.session.delete(url) if not response.ok: self._logger.error(f'Removing user failed. Response: {response.text}') return response.ok
[docs] def add_user_by_id(self, id_): """Adds a user to the group. Args: id_: The id of the user to add Returns: True on success, False otherwise """ url = f'{self._okta.api}/groups/{}/users/{id_}' response = self._okta.session.put(url) if not response.ok: self._logger.error(f'Adding user failed. Response: {response.text}') return response.ok
[docs] def remove_user_by_id(self, id_): """Remove a user from the group. Args: id_: The id of the user to remove Returns: True on success, False otherwise """ url = f'{self._okta.api}/groups/{}/users/{id_}' response = self._okta.session.delete(url) if not response.ok: self._logger.error(f'Removing user failed. Response: {response.text}') return response.ok
[docs] class GroupAssignment(Group): """Models the group assignment object of okta for apps.""" def __init__(self, okta_instance, data): self._okta = okta_instance self._group_assignment_data = data group_data = self._get_group_data() Group.__init__(self, okta_instance, group_data) @property def priority(self): """The priority of the group assignment. Returns: int: The priority of the group. """ return self._group_assignment_data.get('priority') def _get_group_data(self): """The group data of the inherited group that the group assignment refers to. Returns: group_data (dict): The group data of the parent group that the group assignment refers to. """ url = self._group_assignment_data.get('_links', {}).get('group', {}).get('href') response = self._okta.session.get(url) if not response.ok: self._logger.error(response.text) return response.json() @property @cached(cache=TTLCache(maxsize=100, ttl=60)) def profile_role(self): """Profile role.""" return self._group_assignment_data.get('profile', {}).get('role') @property @cached(cache=TTLCache(maxsize=100, ttl=60)) def profile_saml_roles(self): """Profile saml roles.""" return self._group_assignment_data.get('profile', {}).get('samlRoles', [])
[docs] class AdminRole(Entity): """Models the admin role object of okta.""" @property def id(self): """The id of the role. Returns: string: The id of the role """ return self._data.get('id') @property def label(self): """The label of the role. Returns: string: The label of the role """ return self._data.get('label') @property def type(self): """The type of the role. Returns: string: The name of the type of the role """ return self._data.get('type') @property def status(self): """The status of the role. Returns: string: The status of the role """ return self._data.get('status') @property def created(self): """The date and time when the role was created. Returns: datetime: The datetime object of when the role was created """ return self._get_date_from_key('created') @property def last_updated(self): """The date and time of the role when it was last updated. Returns: datetime: The datetime object of when the role was last updated """ return self._get_date_from_key('lastUpdated') @property def assignment_type(self): """The assignment type of the role. Returns: string: The assignment type the role """ return self._data.get('assignmentType')
[docs] class User(Entity): """Models the user object of okta.""" @property def url(self): """The url of the user. Returns: string: The url of the user """ return self._data.get('_links', {}).get('self', {}).get('href') @property def status(self): """The status of the user. Returns: string: The status of the user """ return self._data.get('status') @property def activated_at(self): """The date and time of the users's activation. Returns: datetime: The datetime object of when the user was activated """ return self._get_date_from_key('activated') @property def status_changed_at(self): """The date and time of the users's status change. Returns: datetime: The datetime object of when the user had last changed status """ return self._get_date_from_key('statusChanged') @property def last_login_at(self): """The date and time of the users's last login. Returns: datetime: The datetime object of when the user last logged in """ return self._get_date_from_key('lastLogin') @property def password_changed_at(self): """The date and time of the users's last password change. Returns: datetime: The datetime object of when the user last changed password """ return self._get_date_from_key('passwordChanged') @property def first_name(self): """The first name of the user. Returns: string: The first name of the user """ return self._data.get('profile', {}).get('firstName') @first_name.setter def first_name(self, value): """First name setter.""" self._update_profile_attribute({'firstName': value}) @property def last_name(self): """The last name of the user. Returns: string: The last name of the user """ return self._data.get('profile', {}).get('lastName') @last_name.setter def last_name(self, value): """Last name setter.""" self._update_profile_attribute({'lastName': value}) @property def manager(self): """The manager of the user. Returns: string: The manager of the user """ return self._data.get('profile', {}).get('manager') @manager.setter def manager(self, value): """Manager setter.""" self._update_profile_attribute({'manager': value}) @property def display_name(self): """The display name of the user. Returns: string: The display name of the user """ return self._data.get('profile', {}).get('displayName') @display_name.setter def display_name(self, value): """Display name setter.""" self._update_profile_attribute({'displayName': value}) @property def title(self): """The title of the user. Returns: string: The title of the user """ return self._data.get('profile', {}).get('title') @title.setter def title(self, value): """Title setter.""" self._update_profile_attribute({'title': value}) @property def locale(self): """The locale of the user. Returns: string: The locale of the user """ return self._data.get('profile', {}).get('locale') @locale.setter def locale(self, value): """Locale setter.""" self._update_profile_attribute({'locale': value}) @property def employee_number(self): """The employee number of the user. Returns: string: The employee number of the user """ return self._data.get('profile', {}).get('employeeNumber') @employee_number.setter def employee_number(self, value): """Employee number setter.""" self._update_profile_attribute({'employeeNumber': value}) @property def zip_code(self): """The zip code of the user. Returns: string: The zip code of the user """ return self._data.get('profile', {}).get('zipCode') @zip_code.setter def zip_code(self, value): """Zip number setter.""" self._update_profile_attribute({'zipCode': value}) @property def city(self): """The city of the user. Returns: string: The city of the user """ return self._data.get('profile', {}).get('city') @city.setter def city(self, value): """City setter.""" self._update_profile_attribute({'city': value}) @property def street_address(self): """The street address of the user. Returns: string: The street address of the user """ return self._data.get('profile', {}).get('streetAddress') @street_address.setter def street_address(self, value): """Street address setter.""" self._update_profile_attribute({'streetAddress': value}) @property def contry_code(self): """The contry code of the user. Returns: string: The country code of the user """ return self._data.get('profile', {}).get('countryCode') @contry_code.setter def contry_code(self, value): """Country code setter.""" self._update_profile_attribute({'countryCode': value}) @property def organization(self): """The organization of the user. Returns: string: The organization of the user """ return self._data.get('profile', {}).get('organization') @organization.setter def organization(self, value): """Organization setter.""" self._update_profile_attribute({'organization': value}) @property def department(self): """The department of the user. Returns: string: The department of the user """ return self._data.get('profile', {}).get('department') @department.setter def department(self, value): """Department setter.""" self._update_profile_attribute({'department': value}) @property def primary_phone(self): """The primary phone of the user. Returns: string: The primary phone of the user """ return self._data.get('profile', {}).get('primaryPhone') @primary_phone.setter def primary_phone(self, value): """Primary phone setter.""" self._update_profile_attribute({'primaryPhone': value}) @property def mobile_phone(self): """The mobile phone of the user. Returns: string: The mobile phone of the user """ return self._data.get('profile', {}).get('mobilePhone') @mobile_phone.setter def mobile_phone(self, value): """Mobile phone setter.""" self._update_profile_attribute({'mobilePhone': value}) @property def email(self): """The email of the user. Returns: string: The email of the user """ return self._data.get('profile', {}).get('email') @email.setter def email(self, value): """Email setter.""" self._update_profile_attribute({'email': value}) @property def second_email(self): """The second email of the user. Returns: string: The second email of the user """ return self._data.get('profile', {}).get('secondEmail') @second_email.setter def second_email(self, value): """Second email setter.""" self._update_profile_attribute({'secondEmail': value}) @property def login(self): """The login of the user. Returns: string: The login of the user """ return self._data.get('profile', {}).get('login') @login.setter def login(self, value): """Login setter.""" self._update_profile_attribute({'login': value}) def _update_profile_attribute(self, attribute): if not self.update_profile({'profile': attribute}): raise UnableToUpdate(f'Failed to update with payload {attribute}') self._update() @property def credentials(self): """The credentials of the user. Returns: dictionary: The credentials of the user """ return self._data.get('credentials') @property def roles(self): """Lists the admin roles the user has. Returns: generator: A generator of roles objects for which the user is member of """ url = f'{self._okta.api}/users/{}/roles' for data in self._okta._get_paginated_url(url): # pylint: disable=protected-access # noqa yield AdminRole(self._okta, data) @property def groups(self): """Lists the groups the user is a member of. Returns: generator: A generator of Group objects for which the user is member of """ url = f'{self._okta.api}/users/{}/groups' for data in self._okta._get_paginated_url(url): # pylint: disable=protected-access # noqa yield Group(self._okta, data)
[docs] def delete(self): """Deletes the user from okta. Returns: bool: True on success, False otherwise """ # The first request deactivates the user, the second one deletes response = self._okta.session.delete(self.url) if not response.ok: self._logger.error(response.text) else: self._okta.session.delete(self.url) if not response.ok: self._logger.error(response.text) return response.ok
def _post_lifecycle(self, url, message): response = if not response.ok: self._logger.error(f'{message}\nResponse: {response.text}') else: self._update() return response.ok
[docs] def activate(self): """Activate the user. Returns: True on success, False otherwise """ url = f'{self._okta.api}/users/{}/lifecycle/activate?sendEmail=false' return self._post_lifecycle(url, 'Activating user failed')
[docs] def deactivate(self): """Deactivate the user. Returns: True on success, False otherwise """ url = f'{self._okta.api}/users/{}/lifecycle/deactivate' return self._post_lifecycle(url, 'Deactivating user failed')
[docs] def unlock(self): """Unlocks the user. Returns: True on success, False otherwise """ url = f'{self._okta.api}/users/{}/lifecycle/unlock' return self._post_lifecycle(url, 'Unlocking user failed')
[docs] def expire_password(self): """Expires the user's password. Returns: True on success, False otherwise """ url = f'{self._okta.api}/users/{}/lifecycle/expire_password' return self._post_lifecycle(url, "Expiring user's password failed")
[docs] def reset_password(self): """Resets the user's password. Returns: True on success, False otherwise """ url = f'{self._okta.api}/users/{}/lifecycle/reset_password??sendEmail=false' return self._post_lifecycle(url, "Resetting user's password failed")
[docs] def set_temporary_password(self): """Sets a temporary password for the user. Returns: string: Password on success, None otherwise """ url = f'{self._okta.api}/users/{}/lifecycle/expire_password?tempPassword=true' response = if not response.ok: error = f'Setting a temporary password failed\nResponse: {response.text}' self._logger.error(error) else: self._update() return response.json().get('tempPassword', None)
[docs] def suspend(self): """Suspends the user. Returns: True on success, False otherwise """ url = f'{self._okta.api}/users/{}/lifecycle/suspend' return self._post_lifecycle(url, "Suspending user failed")
[docs] def unsuspend(self): """Unsuspends the user. Returns: True on success, False otherwise """ url = f'{self._okta.api}/users/{}/lifecycle/unsuspend' return self._post_lifecycle(url, "Un-suspending user failed")
[docs] def update_password(self, old_password, new_password): """Changes the user's password. Returns: True on success, False otherwise """ url = f'{self._okta.api}/users/{}/credentials/change_password' payload = {'oldPassword': {'value': old_password}, 'newPassword': {'value': new_password}} response =, data=json.dumps(payload)) if not response.ok: self._logger.error(response.text) return response.ok
[docs] def set_password(self, password): """Set a password for the user. Returns: True on success, False otherwise """ url = f'{self._okta.api}/users/{}' payload = {'credentials': {'password': {'value': password}}} response = self._okta.session.put(url, data=json.dumps(payload)) if not response.ok: self._logger.error(response.text) return response.ok
[docs] def update_profile(self, new_profile): """Update a user's profile in okta. Args: new_profile: A object with attributes to change (example: {'profile': {'firstName': 'Test'}}) Returns: Bool: True or False depending on success """ url = f'{self._okta.api}/users/{}' response =, data=json.dumps(new_profile)) if not response.ok: self._logger.error(response.text) return response.ok
[docs] def update_security_question(self, password, question, answer): """Changes the user's security question and answer. Returns: True on success, False otherwise """ url = f'{self._okta.api}/users/{}/credentials/change_recovery_question' payload = {"password": {"value": password}, "recovery_question": {"question": question, "answer": answer}} response =, data=json.dumps(payload)) if not response.ok: self._logger.error(response.text) return response.ok
[docs] class UserAssignment(User): """Models the user assignment object of okta for apps.""" def __init__(self, okta_instance, data): self._okta = okta_instance self._user_assignment_data = data user_data = self._get_user_data() User.__init__(self, okta_instance, user_data) def _get_user_data(self): """The parent user data that the user assignment refers to. Returns: user_data (dict): The parent user data that the user assignment refers to. """ url = self._user_assignment_data.get('_links', {}).get('user', {}).get('href') response = self._okta.session.get(url) if not response.ok: self._logger.error(response.text) return response.json() @property def group(self): """The group that the user assignment refers to. Returns: group (Group): The group that the user assignment refers to. """ url = self._user_assignment_data.get('_links', {}).get('group', {}).get('href') response = self._okta.session.get(url) if not response.ok: self._logger.error(response.text) return Group(self._okta, response.json()) @property def email(self): """The email of the user. Returns: email (str): The email of the user. """ return self._user_assignment_data.get('profile', {}).get('email') @property @cached(cache=TTLCache(maxsize=100, ttl=60)) def profile_role(self): """Profile role.""" return self._user_assignment_data.get('profile', {}).get('role') @property @cached(cache=TTLCache(maxsize=100, ttl=60)) def profile_saml_roles(self): """Profile saml roles.""" return self._user_assignment_data.get('profile', {}).get('samlRoles', [])
[docs] class Application(Entity): """Models the apps in okta.""" @property def url(self): """The url of the application. Returns: string: The url of the application """ return f'{self._okta.api}/apps/{}' @property def name(self): """The name of the application. Returns: basestring: The name of the application """ return self._data.get('name') @property def label(self): """The label of the application. Returns: basestring: The label of the application """ return self._data.get('label') @property def status(self): """The status of the application. Returns: basestring: The status of the application """ return self._data.get('status') @property def accessibility(self): """The accessibility of the application. Returns: dictionary: The accessibility of the application """ return self._data.get('accessibility') @property def visibility(self): """The visibility of the application. Returns: dictionary: The visibility of the application """ return self._data.get('visibility') @property def features(self): """The features of the application. Returns: dictionary: The features of the application """ return self._data.get('features') @property def sign_on_mode(self): """The sign on mode of the application. Returns: basestring: The sign on mode of the application """ return self._data.get('sign_on_mode') @property def credentials(self): """The credentials of the application. Returns: dictionary: The credentials of the application """ return self._data.get('credentials') @property def settings(self): """The settings of the application. Returns: dictionary: The settings of the application """ return self._data.get('settings', {}).get('app') @property def notification_settings(self): """The notification settings of the application. Returns: dictionary: The notification settings of the application """ return self._data.get('settings', {}).get('notifications') @property def sign_on_settings(self): """The sign on settings of the application. Returns: dictionary: The sign on settings of the application """ return self._data.get('settings', {}).get('signOn') @property def users(self): """The users of the application. Returns: generator: A generator of User objects for the users of the application """ url = self._data.get('_links', {}).get('users', {}).get('href') for data in self._okta._get_paginated_url(url): # pylint: disable=protected-access # noqa yield User(self._okta, data) @property def groups(self): """The groups of the application. Returns: generator: A generator of Group objects for the groups of the application """ url = self._data.get('_links', {}).get('groups', {}).get('href') for group in self._okta._get_paginated_url(url): # pylint: disable=protected-access # noqa yield self._okta.get_group_by_id(group.get('id', '')) @property def group_assignments(self): """The group assignments to the application. Returns: generator: A generator of group assignments for application """ url = self._data.get('_links', {}).get('groups', {}).get('href') for data in self._okta._get_paginated_url(url): # pylint: disable=protected-access # noqa yield GroupAssignment(self._okta, data)
[docs] def get_group_assignment_by_group_name(self, name): """Retrieves a group assignment by a group name. Args: name: The name of the group assignment to retrieve. Returns: group_assignment (GroupAssignment) : The matching group assignment if found else None. """ return next((group for group in self.group_assignments if == name), None)
@property def user_assignments(self): """The user assignments to the application. Returns: generator: A generator of user assignments for application """ url = self._data.get('_links', {}).get('users', {}).get('href') for data in self._okta._get_paginated_url(url): # pylint: disable=protected-access # noqa yield UserAssignment(self._okta, data)
[docs] def get_user_assignment_by_email(self, email): """Retrieves a user assignment by a user email. Args: email: The email of the user assignment to retrieve. Returns: user_assignment (UserAssignment) : The matching user assignment if found else None. """ return next((user for user in self.user_assignments if == email.lower()), None)
[docs] def activate(self): """Activates the application. Returns: bool: True on success, False otherwise """ if self.status == 'ACTIVE': return True url = self._data.get('_links', {}).get('activate').get('href') response = if not response.ok: self._logger.error(f'Response: {response.text}') else: self._update() return response.ok
[docs] def deactivate(self): """Deactivates the application. Returns: bool: True on success, False otherwise """ if self.status == 'INACTIVE': return True url = self._data.get('_links', {}).get('deactivate').get('href') response = if not response.ok: self._logger.error(f'Response: {response.text}') else: self._update() return response.ok
[docs] def get_associated_saml_roles(self): """Returns the Saml IAM Roles associated with the application. Returns: list: List of saml iam roles """ url = f'{self._okta.api}/internal/apps/{}/types' response = self._okta.session.get(url) if not response.ok: self._logger.error(f'Response: {response.text}') return [] return response.json().get('SamlIamRole', [])
[docs] def add_group_by_id(self, group_id): """Adds a group to the application. Args: group_id: The id of the group to add Returns: True on success, False otherwise """ url = f'{self._okta.api}/apps/{}/groups/{group_id}' response = self._okta.session.put(url) if not response.ok: self._logger.error(f'Adding group failed. Response: {response.text}') return response.ok
[docs] def add_group_by_name(self, group_name): """Adds a group to the application. Args: group_name: The name of the group to add Returns: True on success, False otherwise """ group = self._okta.get_group_by_name(group_name) if not group: raise InvalidGroup(group_name) url = f'{self._okta.api}/apps/{}/groups/{}' response = self._okta.session.put(url, data=json.dumps({})) if not response.ok: self._logger.error(f'Adding group failed. Response: {response.text}') return response.ok
[docs] def remove_group_by_id(self, group_id): """Removes a group from the application. Args: group_id: The id of the group to remove Returns: True on success, False otherwise """ url = f'{self._okta.api}/apps/{}/groups/{group_id}' response = self._okta.session.delete(url) if not response.ok: self._logger.error(f'Removing group failed. Response: {response.text}') return response.ok
[docs] def remove_group_by_name(self, group_name): """Removes a group from the application. Args: group_name: The name of the group to remove Returns: True on success, False otherwise """ group = self._okta.get_group_by_name(group_name) if not group: raise InvalidGroup(group_name) url = f'{self._okta.api}/apps/{}/groups/{}' response = self._okta.session.delete(url) if not response.ok: self._logger.error(f'Removing group failed. Response: {response.text}') return response.ok
[docs] def assign_group_to_saml_user_roles(self, group_id, role, saml_roles): """Assigns an okta group to an okta application with saml user roles. Args: group_id: The id of the group to be associated role: The aws role that okta uses to assume SAML roles in other accounts saml_roles: the SAML Roles to be assumed Returns: Bool: The status of the assignment( True or False ) """ url = f'{self._okta.api}/apps/{}/groups/{group_id}' payload = {'id': group_id, 'profile': {'role': role, 'samlRoles': saml_roles}} response = self._okta.session.put(url, json=payload) if not response.ok: self._logger.error(f'Assigning group to the saml user roles failed. Response: {response.text}') return response.ok