Source code for oktalib.oktalib

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# File: oktalib.py
#
# Copyright 2018 Costas Tyfoxylos
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
#  of this software and associated documentation files (the "Software"), to
#  deal in the Software without restriction, including without limitation the
#  rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
#  sell copies of the Software, and to permit persons to whom the Software is
#  furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
#  all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
#  IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
#  FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
#  AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
#  LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
#  FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
#  DEALINGS IN THE SOFTWARE.
#

"""
Main code for oktalib.

.. _Google Python Style Guide:
   https://google.github.io/styleguide/pyguide.html

"""

import json
import logging

import backoff
from requests import Session

from .entities import (Group,
                       User,
                       Application,
                       AdminRole)
from .oktalibexceptions import (AuthFailed,
                                InvalidGroup,
                                InvalidApplication,
                                ApiLimitReached,
                                ServerError)

__author__ = '''Costas Tyfoxylos <ctyfoxylos@schubergphilis.com>'''
__docformat__ = '''google'''
__date__ = '''2018-01-08'''
__copyright__ = '''Copyright 2018, Costas Tyfoxylos'''
__credits__ = ["Costas Tyfoxylos"]
__license__ = '''MIT'''
__maintainer__ = '''Costas Tyfoxylos'''
__email__ = '''<ctyfoxylos@schubergphilis.com>'''
__status__ = '''Development'''  # "Prototype", "Development", "Production".

# This is the main prefix used for logging
LOGGER_BASENAME = '''oktalib'''
LOGGER = logging.getLogger(LOGGER_BASENAME)
LOGGER.addHandler(logging.NullHandler())


[docs] class Okta: """Models the api of okta.""" def __init__(self, host, token): logger_name = f'{LOGGER_BASENAME}.{self.__class__.__name__}' self._logger = logging.getLogger(logger_name) self.host = host self.api = f'{host}/api/v1' self.token = token self.session = self._setup_session() self._monkey_patch_session() def _setup_session(self): session = Session() session.get(self.host) session.headers.update({'accept': 'application/json', 'content-type': 'application/json', 'authorization': f'SSWS {self.token}'}) url = f'{self.api}/users/me/' response = session.get(url) if not response.ok: raise AuthFailed(response.content) return session def _monkey_patch_session(self): """Gets original request method and overrides it with the patched one. Returns: Response: Response instance. """ self.session.original_request = self.session.request self.session.request = self._patched_request @backoff.on_exception(backoff.expo, ApiLimitReached, max_time=60) def _patched_request(self, method, url, **kwargs): """Patch the original request method from requests.Sessions library. Args: method (str): HTTP verb as string. url (str): string. kwargs: keyword arguments. Raises: ApiLimitReached: Raised when the Okta API limit is reached. Returns: Response: Response instance. """ self._logger.debug(f'Using patched request for method {method}, url {url}, kwargs {kwargs}') response = self.session.original_request(method, url, **kwargs) # noqa if response.status_code == 429: self._logger.warning('Api is exhausted for endpoint, backing off.') raise ApiLimitReached return response @property def groups(self): """The groups configured in okta. Returns: generator: The generator of groups configured in okta """ url = f'{self.api}/groups' for data in self._get_paginated_url(url): yield Group(self, data)
[docs] def create_group(self, name, description): """Creates a group in okta. Args: name: The name of the group to create description: The description of the group to create Returns: The created group object on success, None otherwise """ url = f'{self.api}/groups' payload = {'profile': {'name': name, 'description': description}} response = self.session.post(url, data=json.dumps(payload)) if not response.ok: self._logger.error(response.json()) return Group(self, response.json()) if response.ok else None
[docs] def get_group_type_by_name(self, name, group_type='OKTA_GROUP'): """Retrieves the group type of okta by name. Args: group_type: The type of okta group to retrieve name: The name of the group to retrieve Returns: Group: The group if a match is found else None """ group = next((group for group in self.search_groups_by_name(name) if group.type == group_type), None) return group
[docs] def get_group_by_name(self, name): """Retrieves the first group (of any type) by name. Args: name: The name of the group to retrieve Returns: Group: The group if a match is found else None """ return next((group for group in self.search_groups_by_name(name) if group.name == name), None)
[docs] def get_group_by_id(self, group_id): """Retrieves the group (of any type) by id. Args: group_id: The id of the group to retrieve Returns: Group: The group if a match is found else None """ url = f'{self.api}/groups/{group_id}' response = self.session.get(url) if not response.ok: self._logger.error(response.json()) return Group(self, response.json()) if response.ok else None
[docs] def search_groups_by_name(self, name): """Retrieves the groups (of any type) by name. Args: name: The name of the groups to retrieve Returns: list: A list of groups if a match is found else an empty list """ url = f'{self.api}/groups?q={name}' response = self.session.get(url) if not response.ok: self._logger.error(response.json()) return [Group(self, data) for data in response.json()] if response.ok else []
[docs] def delete_group(self, name): """Deletes a group from okta. Args: name: The name of the group to delete Returns: bool: True on success, False otherwise Raises: InvalidGroup: The group provided as argument does not exist. """ group = self.get_group_by_name(name) if not group: raise InvalidGroup(name) return group.delete()
def _get_paginated_url(self, url, result_limit=100): response = self._validate_response(url, {'limit': result_limit}) yield from response.json() next_link = response.links.get('next', {}).get('url') while next_link: response = self._validate_response(url=next_link) yield from response.json() next_link = response.links.get('next', {}).get('url') def _validate_response(self, url, params=None): response = self.session.get(url=url, params=params) if not response.ok: try: error_message = response.json().get('errorSummary') except (ValueError, AttributeError): error_message = response.text raise ServerError(error_message) from None return response @property def users(self): """The users configured in okta. Returns: generator: The generator of users configured in okta """ url = f'{self.api}/users' for data in self._get_paginated_url(url): yield User(self, data)
[docs] def create_user(self, # pylint: disable=too-many-arguments first_name, last_name, email, login, password=None, enabled=True): """Creates a user in okta. Args: first_name: The first name of the user last_name: The last name of the user email: The email of the user login: The login of the user password: The password of the user enabled: A flag whether the user should be enabled or not Defaults to True Returns: User: The created user on success, None otherwise """ enabled = 'true' if enabled else 'false' url = f'{self.api}/users?activate={enabled}' payload = {'profile': {'firstName': first_name, 'lastName': last_name, 'email': email, 'login': login}} if password: payload.update({'credentials': {'password': {'value': password}}}) response = self.session.post(url=url, data=json.dumps(payload)) if not response.ok: self._logger.error(response.json()) return User(self, response.json()) if response.ok else None
[docs] def get_user_by_login(self, login): """Retrieves a user by login. Args: login: The login to match the user with Returns: User: The user if found, None otherwise """ url = f'{self.api}/users?filter=profile.login+eq+"{login}"' response = self.session.get(url) if not response.ok: self._logger.error(response.json()) return None return next((User(self, data) for data in response.json() if data.get('profile', {}).get('login', '') == login), None)
[docs] def search_users(self, value): """Retrieves a list of users by looking into name, last name and email. Args: value: The value to match with Returns: list: The users if found, empty list otherwise """ url = f'{self.api}/users?q={value}' response = self.session.get(url) if not response.ok: self._logger.error(response.json()) return [User(self, data) for data in response.json()]
[docs] def search_users_by_email(self, email): """Retrieves a list of users by email. Args: email: The email to match the user with Returns: list: The users if found, empty list otherwise """ url = f'{self.api}/users?filter=profile.email+eq+"{email}"' response = self.session.get(url) if not response.ok: self._logger.error(response.json()) return [User(self, data) for data in response.json()]
[docs] def get_user_assigned_roles_by_id(self, user_id): """Retrieves if any, admin roles assigned to the user by id. Args: id: The user ID to match the user with Returns: list: A list of the user's roles if found, None otherwise """ url = f'{self.api}/users/{user_id}/roles' response = self.session.get(url) if not response.ok: self._logger.error(response.json()) return None return [AdminRole(self, data) for data in response.json()]
[docs] def assign_role_to_user_by_id(self, user_id, role_name): """Assigns an admin role to a user by id. Args: user_id: The user ID to match the user with role_name: The name of the role to assign Returns: User: The response, None otherwise """ url = f'{self.api}/users/{user_id}/roles' data = {"type": role_name} response = self.session.post(url, json=data) if not response.ok: self._logger.error(response.json()) return None return AdminRole(self, response.json())
[docs] def remove_role_from_user_by_id(self, user_id, role_id): """Remove an admin role from a user by id. Args: user_id: The user ID to match the user with role_id: The id of the role to remove Returns: User: The response, None otherwise """ url = f'{self.api}/users/{user_id}/roles/{role_id}' response = self.session.delete(url) if not response.ok: self._logger.error(response.json()) return False return True
@property def applications(self): """The applications configured in okta. Returns: generator: The generator of applications configured in okta """ url = f'{self.api}/apps' for data in self._get_paginated_url(url): yield Application(self, data)
[docs] def get_application_by_id(self, id_): """Retrieves an application by id. Args: id_: The id of the application to retrieve Returns: Application Object """ app = next((app for app in self.applications if app.id == id_), None) return app
[docs] def get_application_by_label(self, label): """Retrieves an application by label. Args: label: The label of the application to retrieve Returns: Application Object """ app = next((app for app in self.applications if app.label.lower() == label.lower()), None) return app
[docs] def assign_group_to_application(self, application_label, group_name): """Assigns a group to an application. Args: application_label: The label of the application to assign the group to group_name: The group name to assign to the application Returns: True on success, False otherwise """ application = self.get_application_by_label(application_label) if not application: raise InvalidApplication(application_label) group = self.get_group_by_name(group_name) if not group: raise InvalidGroup(group_name) return application.add_group_by_id(group.id)
[docs] def remove_group_from_application(self, application_label, group_name): """Removes a group from an application. Args: application_label: The label of the application to remove the group from group_name: The name of the group to remove from the application Returns: True on success, False otherwise """ application = self.get_application_by_label(application_label) if not application: raise InvalidApplication(application_label) group = self.get_group_by_name(group_name) if not group: raise InvalidGroup(group_name) return application.remove_group_by_id(group.id)