Source code for nbiatoolkit.auth

import requests
import time
from typing import Union, Tuple
from .utils import NBIA_ENDPOINTS, NBIA_BASE_URLS
from cryptography.fernet import Fernet


def encrypt_credentials(key: bytes, username: str, password: str) -> Tuple[str, str]:
    """
    Encrypt a username/password pair with a Fernet key.

    Each value is UTF-8 encoded, encrypted with ``Fernet(key)``, and the
    resulting token is decoded back to ``str``.

    Args:
        key (bytes): The Fernet key used for encryption.
        username (str): The plaintext username.
        password (str): The plaintext password.

    Returns:
        Tuple[str, str]: ``(encrypted_username, encrypted_password)``.
    """
    fernet = Fernet(key=key)
    # The password is processed first, then the username; the returned tuple
    # is nevertheless ordered (username, password).
    password_token, username_token = (
        fernet.encrypt(secret.encode()).decode() for secret in (password, username)
    )
    return username_token, password_token
def decrypt_credentials(
    key: bytes, encrypted_username: str, encrypted_password: str
) -> tuple[str, str]:
    """
    Decrypt a username/password pair with a Fernet key.

    Each token is encoded to bytes, decrypted with ``Fernet(key)``, and the
    plaintext is decoded back to ``str``.

    Args:
        key (bytes): The Fernet key used to decrypt the credentials.
        encrypted_username (str): The encrypted username token.
        encrypted_password (str): The encrypted password token.

    Returns:
        tuple[str, str]: ``(username, password)`` in plaintext.
    """
    fernet = Fernet(key=key)
    # Username is decrypted first, then password, and they are returned in
    # that same order.
    username_plain, password_plain = (
        fernet.decrypt(token.encode()).decode()
        for token in (encrypted_username, encrypted_password)
    )
    return username_plain, password_plain
class OAuth2:
    """
    OAuth2 class for handling authentication and access token retrieval.

    This class provides methods to authenticate with the NBIA API using OAuth2
    and retrieve the access token required for accessing the API.

    Defaults to using the NBIA Guest for accessing public collections.
    If you have a username and password which has been granted access
    to the collections tagged with "limited access" you can use those
    credentials to access those collections.

    Attributes
    ----------
    client_id : str
        The client ID for authentication.
    username : str
        The username for authentication, stored Fernet-encrypted.
    password : str
        The password for authentication, stored Fernet-encrypted.
    base_url : str
        The base URL that ``"oauth/token"`` is appended to.
    access_token : str or None
        The access token retrieved from the API (fetched or refreshed on
        access when missing or expired).
    api_headers : dict
        The authentication headers containing the access token.
    expiry_time : float or None
        The ``time.time()``-based timestamp at which the access token expires.
    refresh_token : str
        The refresh token for obtaining a new access token ("" when unset).
    refresh_expiry : int or None
        The ``refresh_expires_in`` value reported with the token.
    scope : str or None
        The scope of the access token.

    Methods
    -------
    request_new_access_token()
        Authenticates with the API using the stored username and password.
    refresh_token_or_request_new()
        Uses the refresh token when one is set, otherwise requests a new token.
    logout()
        Calls the logout URL and clears all stored state.

    Example Usage
    -------------
    >>> from nbiatoolkit.auth import OAuth2

    To use the NBIA Guest account:

    >>> oauth = OAuth2()

    To use a custom account:

    >>> oauth = OAuth2(username="my_username", password="my_password")

    Notes
    -----
    This class is mainly for developers looking to add functionality
    to the nbiatoolkit package. If you are a user looking to access the NBIA
    API, you can use the `NBIAClient` class without knowledge of this class.

    As there are many packages for handling OAuth2 authentication, this class
    was for myself to learn how OAuth2 works and to provide a simple way to
    authenticate with the NBIA API. If you have any suggestions for improving
    this class, please open an issue on the GitHub repository.
    """

    def __init__(
        self,
        username: str = "nbia_guest",
        password: str = "",
        client_id: str = "NBIA",
        base_url: str | NBIA_BASE_URLS = NBIA_BASE_URLS.NBIA,
    ) -> None:
        """
        Initialize the OAuth2 class.

        Parameters
        ----------
        username : str, optional
            The username for authentication. Default is "nbia_guest".
        password : str, optional
            The password for authentication. Default is an empty string.
        client_id : str, optional
            The client ID for authentication. Default is "NBIA".
        base_url : str or NBIA_BASE_URLS, optional.
            Default is NBIA_BASE_URLS.NBIA
        """
        self.client_id = client_id

        # A fresh key per instance; credentials are only kept in encrypted form.
        self._fernet_key: bytes = Fernet.generate_key()
        self.username: str
        self.password: str
        # Encrypt exactly once (the original did this twice and discarded the
        # first result).
        self.username, self.password = encrypt_credentials(
            key=self._fernet_key, username=username, password=password
        )

        # Accept either the enum member or a raw URL string.
        if isinstance(base_url, NBIA_BASE_URLS):
            self.base_url = base_url.value
        else:
            self.base_url = base_url

        self._access_token = None
        self.expiry_time = None
        self.refresh_expiry = None
        # Empty string (not None) means "no refresh token".
        self.refresh_token = ""
        self.scope = None

    @property
    def fernet_key(self) -> bytes:
        """The Fernet key used to encrypt the stored credentials."""
        return self._fernet_key

    def is_logged_out(self) -> bool:
        """Return True when every identifying field has been reset to ""."""
        return (
            self._access_token == ""
            and self.username == ""
            and self.password == ""
            and self.client_id == ""
            and self.base_url == ""
        )

    @property
    def access_token(self) -> str | None:
        """
        The current access token, or None when logged out.

        When no token is held yet, or the held token has expired, a token is
        obtained via `refresh_token_or_request_new` before returning.
        """
        if self.is_logged_out():
            return None

        if not self._access_token or self.is_token_expired():
            self.refresh_token_or_request_new()

        return self._access_token

    def is_token_expired(self) -> bool:
        """Return True when an expiry time is set and it lies in the past."""
        return self.expiry_time is not None and time.time() > self.expiry_time

    def refresh_token_or_request_new(self) -> None:
        """Refresh the token when a refresh token is set, else request a new one."""
        if self.refresh_token != "":
            self._refresh_access_token()
        else:
            self.request_new_access_token()

    def _refresh_access_token(self) -> None:
        """
        Obtain a new access token using the stored refresh token.

        Raises
        ------
        AssertionError
            If no refresh token is set.
        requests.exceptions.HTTPError
            If the token endpoint responds with an error status.
        """
        assert self.refresh_token != "", "Refresh token is not set"

        data: dict[str, str] = {
            "refresh_token": self.refresh_token,
            "client_id": self.client_id,
            "grant_type": "refresh_token",
        }
        token_url: str = self.base_url + "oauth/token"

        response = requests.post(token_url, data=data)
        # Propagates HTTPError to the caller on a 4xx/5xx response.
        response.raise_for_status()
        self.set_token_data(response.json())

    def request_new_access_token(self) -> None:
        """
        Obtain a new access token using the stored username and password.

        Raises
        ------
        requests.exceptions.HTTPError
            If the token endpoint responds with an error status.
        """
        # Decrypt once and unpack, rather than decrypting per field.
        username, password = decrypt_credentials(
            key=self._fernet_key,
            encrypted_username=self.username,
            encrypted_password=self.password,
        )
        data: dict[str, str] = {
            "username": username,
            "password": password,
            "client_id": self.client_id,
            "grant_type": "password",
        }
        token_url: str = self.base_url + "oauth/token"

        # Exactly one POST: the original issued the same request twice and
        # threw the first response away.
        response = requests.post(token_url, data=data)
        # Propagates HTTPError to the caller on a 4xx/5xx response.
        response.raise_for_status()
        self.set_token_data(response.json())

    def set_token_data(self, token_data: dict) -> None:
        """
        Store the fields of a token response on the instance.

        Parameters
        ----------
        token_data : dict
            Must contain "access_token" and "refresh_token"; "expires_in",
            "refresh_expires_in" and "scope" are read when present.
        """
        self._access_token = token_data["access_token"]
        # A missing or falsy "expires_in" is treated as 0 seconds.
        self.expiry_time = time.time() + int(token_data.get("expires_in") or 0)
        self.refresh_token: str = token_data["refresh_token"]
        self.refresh_expiry = token_data.get("refresh_expires_in")
        self.scope = token_data.get("scope")

    @property
    def api_headers(self) -> dict[str, str]:
        """Request headers carrying the access token as a Bearer credential."""
        return {
            "Authorization": f"Bearer {self.access_token}",
            "Content-Type": "application/json",
        }

    @property
    def token_expiration_time(self):
        """Alias for `expiry_time`."""
        return self.expiry_time

    @property
    def refresh_expiration_time(self):
        """Alias for `refresh_expiry`."""
        return self.refresh_expiry

    @property
    def token_scope(self):
        """Alias for `scope`."""
        return self.scope

    def __repr__(self) -> str:
        """Return a short description, or "" when the username is empty."""
        # Note: self.username holds the encrypted value, not the plaintext.
        if self.username:
            return f"OAuth2(username={self.username}, client_id={self.client_id})"
        return ""

    def __str__(self) -> str:
        """Same text as `__repr__`."""
        return self.__repr__()

    def logout(self) -> None:
        """
        Logs out the user and revokes the access token.

        This method sends a request to the NBIA API to revoke the access token
        and logs out the user. An HTTP error status from that request is
        printed rather than raised, and the stored state is cleared either way.

        Notes
        -----
        This method is not yet implemented in the NBIA API.
        """
        if not self.access_token:
            return None

        query_url = NBIA_BASE_URLS.LOGOUT_URL.value
        response = requests.get(query_url, headers=self.api_headers)
        try:
            response.raise_for_status()
        except requests.exceptions.HTTPError as err:
            # Best-effort: report the failure but still wipe local state.
            print(err)
        finally:
            # Drop every attribute, then restore the "logged out" sentinels
            # that `is_logged_out` checks for.
            self.__dict__.clear()
            self.username = ""
            self.password = ""
            self.client_id = ""
            self.base_url = ""
            self._access_token = ""
            self.expiry_time = None
            self.refresh_expiry = None
            self.refresh_token = ""
            self.scope = None
            self._fernet_key = b""
        return None