# Source code for kappa_sdk.rest_api

import base64
import os
import requests
from typing import List, Union, Optional, Dict, Any
from .oauth_service import OAuthService
from ._private._configuration import Configuration
from .service_enum import ServiceEnum
import nest_asyncio
from requests import Response
from aiohttp import ClientResponse
import concurrent.futures

# Module-level side effect: runs once when this module is first imported.
# NOTE(review): nest_asyncio is documented as patching asyncio so that an
# already-running event loop can be re-entered (e.g. inside Jupyter) --
# confirm which callers of this SDK still depend on that.
nest_asyncio.apply()


class RestAPI:
    """
    A REST API facade for KAPPA Automate.

    Wraps the ``requests`` calls used to talk to the KAPPA Automate
    microservices: builds the request URL and headers from the configuration,
    forces UTF-8 decoding of the response and raises :class:`ConnectionError`
    whenever a service answers with an error status.

    .. note:: Should not be instantiated directly.
    """

    def __init__(self, configuration: Configuration, oath_service: Optional[OAuthService] = None):
        """
        Parameters
        ----------
        configuration:
            Provides request URLs, proxies, extra headers and the SSL verification flag.
        oath_service:
            Optional OAuth service used to obtain the bearer token. When omitted,
            no ``Authorization`` header is generated by this class.
        """
        self._configuration: Configuration = configuration
        self.__oauth_service: Optional[OAuthService] = oath_service

    def __authorization_header(self) -> Dict[str, str]:
        """Returns the ``Authorization`` header, or an empty dictionary when no token is available."""
        token = self.__oauth_service.get_oauth_token() if self.__oauth_service else None
        # Never emit the literal "Bearer None": without a token the header is
        # left out, which also preserves any Authorization header that comes
        # from the configuration itself.
        return {"Authorization": f"Bearer {token}"} if token else {}

    @property
    def headers(self) -> Dict[str, str]:
        """
        Gets the list of REST API headers that will be used to perform POST, PUT and DELETE operations.
        """
        return {
            **self._configuration.headers,
            "Content-Type": "application/json",
            **self.__authorization_header(),
        }

    @property
    def request_headers(self) -> Dict[str, str]:
        """
        Gets the list of REST API headers that will be used to perform GET operations.
        """
        return {
            **self._configuration.headers,
            "Accept": "application/json",
            **self.__authorization_header(),
        }

    def delete(self, service: ServiceEnum, request_url: str, dto: Optional[Dict[str, Any]] = None) -> Optional[Dict[str, Any]]:
        """
        Performs a DELETE request.

        Returns the decoded JSON response, or ``None`` when the response body is empty.

        Parameters
        ----------
        service:
            The name of the KAPPA Automate microservice.
        request_url:
            The request URL.
        dto:
            The DTO (as a JSON-serializable dictionary). An empty dictionary is sent when omitted.

        Raises
        ------
        ConnectionError
            If the service answers with an error status.
        """
        if dto is None:
            dto = {}
        response = requests.delete(url=self._configuration.get_request_url(str(service.value), request_url),
                                   proxies=self._configuration.proxies,
                                   json=dto,
                                   headers=self.headers,
                                   verify=self._configuration.verify_ssl)
        response.encoding = 'utf-8'
        RestAPI.__verify_response(response, service)
        return response.json() if response.text else None

    def post(self, service: ServiceEnum, request_url: str, dto: Optional[Union[List[Any], Dict[str, Any]]] = None) -> Optional[Dict[str, Any]]:
        """
        Performs a POST request.

        Returns the decoded JSON response, or ``None`` when the response body is empty.

        Parameters
        ----------
        service:
            The name of the KAPPA Automate microservice.
        request_url:
            The request URL.
        dto:
            The DTO (as a JSON-serializable dictionary or list).

        Raises
        ------
        ConnectionError
            If the service answers with an error status.
        """
        response = requests.post(url=self._configuration.get_request_url(str(service.value), request_url),
                                 proxies=self._configuration.proxies,
                                 json=dto,
                                 headers=self.headers,
                                 verify=self._configuration.verify_ssl)
        response.encoding = 'utf-8'
        RestAPI.__verify_response(response, service)
        return response.json() if response.text else None

    def put(self, service: ServiceEnum, request_url: str, dto: Union[List[Any], Dict[str, Any], str]) -> Optional[Dict[str, Any]]:
        """
        Performs a PUT request.

        Returns the decoded JSON response, or ``None`` when the response body is empty.

        Parameters
        ----------
        service:
            The name of the KAPPA Automate microservice.
        request_url:
            The request URL.
        dto:
            The DTO (as a JSON-serializable dictionary, list or string).

        Raises
        ------
        ConnectionError
            If the service answers with an error status.
        """
        response = requests.put(url=self._configuration.get_request_url(str(service.value), request_url),
                                proxies=self._configuration.proxies,
                                json=dto,
                                headers=self.headers,
                                verify=self._configuration.verify_ssl)
        response.encoding = 'utf-8'
        RestAPI.__verify_response(response, service)
        return response.json() if response.text else None

    def get(self, service: ServiceEnum, request_url: str) -> Optional[Union[List[Any], Dict[str, Any]]]:
        """
        Performs a GET request.

        Returns the DTO (as a JSON-serializable dictionary or list), or ``None``
        when the response body is empty.

        Parameters
        ----------
        service:
            The name of the KAPPA Automate microservice.
        request_url:
            The request URL.

        Raises
        ------
        ConnectionError
            If the service answers with an error status.
        """
        response = requests.get(url=self._configuration.get_request_url(str(service.value), request_url),
                                proxies=self._configuration.proxies,
                                headers=self.request_headers,
                                verify=self._configuration.verify_ssl)
        response.encoding = 'utf-8'
        RestAPI.__verify_response(response, service)
        return response.json() if response.text else None

    def async_get_list(self, service: ServiceEnum, request_urls: List[str]) -> List[Dict[str, Any]]:
        """
        Performs a series of GET requests.

        The requests are run concurrently on a thread pool of at most 10 workers.
        Once all of them have completed, returns the list of resulting DTOs
        (as JSON-serializable dictionaries).

        The results are collected in completion order, which is not necessarily
        the order of ``request_urls``. A request that fails is reported on the
        standard output and left out of the result instead of aborting the
        whole batch.

        Parameters
        ----------
        service:
            The name of the KAPPA Automate microservice.
        request_urls:
            A list of request URLs.
        """
        # ThreadPoolExecutor rejects max_workers=0, so an empty batch must not
        # reach the executor at all.
        if not request_urls:
            return []

        results = []
        with concurrent.futures.ThreadPoolExecutor(max_workers=min(10, len(request_urls))) as executor:
            future_to_url = {
                executor.submit(self._get_single_url, service, url): url
                for url in request_urls
            }
            for future in concurrent.futures.as_completed(future_to_url):
                url = future_to_url[future]
                try:
                    results.append(future.result())
                except Exception as e:
                    # Deliberate best effort: one failing URL must not discard the others.
                    print(f"Error for {url}: {e}")
        return results

    def _get_single_url(self, service: ServiceEnum, request_url: str) -> Dict[str, Any]:
        """
        Performs one GET request on behalf of :meth:`async_get_list`.

        Returns the decoded JSON response, or an empty dictionary when the
        response body is empty.

        Raises
        ------
        ConnectionError
            If the service answers with an error status.
        """
        url = self._configuration.get_request_url(str(service.value), request_url)
        response = requests.get(
            url=url,
            proxies=self._configuration.proxies,
            headers=self.request_headers,
            verify=self._configuration.verify_ssl
        )
        # Same decoding as get(), so both code paths read the body identically.
        response.encoding = 'utf-8'
        RestAPI.__verify_response(response, service)
        return response.json() if response.text else {}

    def upload(self, service: ServiceEnum, request_url: str, file_path: str) -> Optional[Dict[str, Any]]:
        """
        Performs a file upload via POST request.

        The file content is streamed as ``application/octet-stream``; the base
        name of the file is sent base64-encoded in the ``Content-Disposition``
        header. Returns the decoded JSON response, or ``None`` when the
        response body is empty.

        Parameters
        ----------
        service:
            The name of the KAPPA Automate microservice.
        request_url:
            The request URL.
        file_path:
            Complete path and name of the file to upload.

        Raises
        ------
        ConnectionError
            If the service answers with an error status.
        """
        file_name = os.path.basename(file_path)
        # UTF-8 yields the same bytes as ASCII for ASCII-only names, but no longer
        # raises UnicodeEncodeError on accented or non-Latin file names.
        # NOTE(review): assumes the service decodes the name as UTF-8 -- confirm.
        encoded_file_name = base64.b64encode(file_name.encode('utf-8')).decode('ascii')
        file_headers = {
            "Content-Type": "application/octet-stream",
            "Content-Disposition": f"attachment; filename=\"{encoded_file_name}\""
        }
        with open(file_path, 'rb') as f:
            # Honour the configured SSL verification like every other request;
            # it used to be unconditionally disabled for uploads.
            response = requests.post(url=self._configuration.get_request_url(str(service.value), request_url),
                                     proxies=self._configuration.proxies,
                                     data=f,
                                     headers={**self.headers, **file_headers},
                                     verify=self._configuration.verify_ssl)
        response.encoding = 'utf-8'
        RestAPI.__verify_response(response, service)
        return response.json() if response.text else None

    def _get_content(self, service: ServiceEnum, request_url: str) -> str:
        """Performs a GET request and returns the raw response text (no JSON decoding)."""
        response = requests.get(url=self._configuration.get_request_url(str(service.value), request_url),
                                proxies=self._configuration.proxies,
                                headers=self.request_headers,
                                verify=self._configuration.verify_ssl)
        response.encoding = 'utf-8'
        RestAPI.__verify_response(response, service)
        return response.text

    def _put_content(self, service: ServiceEnum, request_url: str, dto: Union[List[Any], Dict[str, Any]]) -> str:
        """Performs a PUT request and returns the raw response text (no JSON decoding)."""
        response = requests.put(url=self._configuration.get_request_url(str(service.value), request_url),
                                proxies=self._configuration.proxies,
                                json=dto,
                                headers=self.headers,
                                verify=self._configuration.verify_ssl)
        response.encoding = 'utf-8'
        RestAPI.__verify_response(response, service)
        return response.text

    def _post_content(self, service: ServiceEnum, request_url: str, dto: Union[List[Any], Dict[str, Any]]) -> str:
        """Performs a POST request and returns the raw response text (no JSON decoding)."""
        response = requests.post(url=self._configuration.get_request_url(str(service.value), request_url),
                                 proxies=self._configuration.proxies,
                                 json=dto,
                                 headers=self.headers,
                                 verify=self._configuration.verify_ssl)
        response.encoding = 'utf-8'
        RestAPI.__verify_response(response, service)
        return response.text

    def _get_stream(self, service: ServiceEnum, request_url: str) -> Response:
        """
        Performs a streaming GET request and returns the raw response.

        The response status is not verified here and the body is not consumed;
        both are left to the caller.
        """
        return requests.get(url=self._configuration.get_request_url(str(service.value), request_url),
                            proxies=self._configuration.proxies,
                            stream=True,
                            headers=self.request_headers,
                            verify=self._configuration.verify_ssl)

    @staticmethod
    def __verify_response(response: Union[Response, ClientResponse], service: ServiceEnum) -> None:
        """
        Raises :class:`ConnectionError` when the response carries an error status.

        The message contains the status code, the service, the request method
        and URL and, when available, the response body or reason phrase.
        """
        if response.ok:
            return

        if isinstance(response, Response):
            error = response.text if response.text else response.reason
            status, method, url = response.status_code, response.request.method, response.request.url
        else:
            # On aiohttp, ``text`` is a coroutine method that cannot be awaited
            # from this synchronous helper, so only the reason phrase is reported.
            error = response.reason
            status, method, url = response.status, response.method, response.url

        error = "\r\nResponse: {}".format(error) if error else ""
        message = "Error {} occurred when communicating with the \"{}\" service\r\nRequest: {} {}{}".format(
            status, service, method, url, error)
        raise ConnectionError(message)