import base64
import os
import requests
from typing import List, Union, Optional, Dict, Any
from .oauth_service import OAuthService
from ._private._configuration import Configuration
from .service_enum import ServiceEnum
import nest_asyncio
from requests import Response
from aiohttp import ClientResponse
import concurrent.futures
# Module-level side effect: patches asyncio (via nest_asyncio) as soon as this module is imported.
# NOTE(review): presumably kept so that nested / already-running event loops (e.g. notebooks) keep
# working; nothing in the visible part of this module drives asyncio directly - confirm it is still needed.
nest_asyncio.apply()
class RestAPI:
    """ A REST API facade for KAPPA Automate.

    Wraps the ``requests`` calls used to talk to the KAPPA Automate microservices: every request
    is built from the shared :class:`Configuration` (URL resolution, proxies, SSL verification,
    extra headers) and, when an OAuth service was supplied, carries a bearer token.

    .. note:: Should not be instantiated directly.
    """

    def __init__(self, configuration: Configuration, oath_service: Optional[OAuthService] = None):
        # NOTE: the parameter name "oath_service" (sic) is kept so keyword callers do not break.
        self._configuration: Configuration = configuration
        self.__oauth_service: Optional[OAuthService] = oath_service

    # ------------------------------------------------------------------ headers

    def _build_headers(self, base: Dict[str, str]) -> Dict[str, str]:
        """ Merges the configured headers with ``base`` and, if a token is available, the bearer token.

        Precedence (lowest to highest): configuration headers, ``base``, ``Authorization``.
        The ``Authorization`` header is only added when the OAuth service yields a token, so a
        missing service no longer produces the bogus value ``"Bearer None"`` (which would also
        clobber an ``Authorization`` header coming from the configuration).
        """
        merged = {**self._configuration.headers, **base}
        token = self.__oauth_service.get_oauth_token() if self.__oauth_service else None
        if token:
            merged["Authorization"] = f"Bearer {token}"
        return merged

    @property
    def headers(self) -> Dict[str, str]:
        """
        Gets the list of REST API headers that will be used to perform POST, PUT and DELETE operations.
        """
        return self._build_headers({"Content-Type": "application/json"})

    @property
    def request_headers(self) -> Dict[str, str]:
        """
        Gets the list of REST API headers that will be used to perform GET operations.
        """
        return self._build_headers({"Accept": "application/json"})

    # ------------------------------------------------------------------ plumbing

    def _send(self, method: str, service: ServiceEnum, request_url: str, *,
              headers: Dict[str, str], check: bool = True, **kwargs: Any) -> Response:
        """ Sends one HTTP request using the shared configuration.

        Parameters
        ----------
        method:
            The HTTP verb ("GET", "POST", "PUT", "DELETE").
        service:
            The name of the KAPPA Automate microservice.
        request_url:
            The request URL (resolved through the configuration).
        headers:
            The headers to send.
        check:
            When True (default) the response is decoded as UTF-8 and verified;
            when False the raw response is returned untouched.
        kwargs:
            Extra keyword arguments forwarded to ``requests.request`` (``json``, ``data``, ``stream``...).

        Raises
        ------
        ConnectionError
            If ``check`` is True and the response status is not OK.
        """
        response = requests.request(
            method,
            url=self._configuration.get_request_url(str(service.value), request_url),
            proxies=self._configuration.proxies,
            headers=headers,
            verify=self._configuration.verify_ssl,
            **kwargs)
        if check:
            response.encoding = 'utf-8'
            RestAPI.__verify_response(response, service)
        return response

    @staticmethod
    def _json_or(response: Response, empty: Any = None) -> Any:
        """ Returns the decoded JSON body of ``response``, or ``empty`` when the body text is empty. """
        return response.json() if response.text else empty

    # ------------------------------------------------------------------ JSON verbs

    def delete(self, service: ServiceEnum, request_url: str, dto: Optional[Dict[str, Any]] = None) -> Optional[Dict[str, Any]]:
        """ Performs a DELETE request.

        Returns the decoded JSON response, or None when the response body is empty.

        Parameters
        ----------
        service:
            The name of the KAPPA Automate microservice.
        request_url:
            The request URL.
        dto:
            The DTO (as a JSON-serializable dictionary). An empty dictionary is sent when omitted.

        Raises
        ------
        ConnectionError
            If the response status is not OK.
        """
        payload = {} if dto is None else dto
        response = self._send("DELETE", service, request_url, headers=self.headers, json=payload)
        return self._json_or(response)

    def post(self, service: ServiceEnum, request_url: str, dto: Optional[Union[List[Any], Dict[str, Any]]] = None) -> Optional[Dict[str, Any]]:
        """ Performs a POST request.

        Returns the decoded JSON response, or None when the response body is empty.

        Parameters
        ----------
        service:
            The name of the KAPPA Automate microservice.
        request_url:
            The request URL.
        dto:
            The DTO (as a JSON-serializable dictionary).

        Raises
        ------
        ConnectionError
            If the response status is not OK.
        """
        response = self._send("POST", service, request_url, headers=self.headers, json=dto)
        return self._json_or(response)

    def put(self, service: ServiceEnum, request_url: str, dto: Union[List[Any], Dict[str, Any], str]) -> Optional[Dict[str, Any]]:
        """ Performs a PUT request.

        Returns the decoded JSON response, or None when the response body is empty.

        Parameters
        ----------
        service:
            The name of the KAPPA Automate microservice.
        request_url:
            The request URL.
        dto:
            The DTO (as a JSON-serializable dictionary).

        Raises
        ------
        ConnectionError
            If the response status is not OK.
        """
        response = self._send("PUT", service, request_url, headers=self.headers, json=dto)
        return self._json_or(response)

    def get(self, service: ServiceEnum, request_url: str) -> Optional[Union[List[Any], Dict[str, Any]]]:
        """ Performs a GET request.

        Returns the DTO (as a JSON-serializable dictionary), or None when the response body is empty.

        Parameters
        ----------
        service:
            The name of the KAPPA Automate microservice.
        request_url:
            The request URL.

        Raises
        ------
        ConnectionError
            If the response status is not OK.
        """
        response = self._send("GET", service, request_url, headers=self.request_headers)
        return self._json_or(response)

    # ------------------------------------------------------------------ batched GET

    def async_get_list(self, service: ServiceEnum, request_urls: List[str]) -> List[Dict[str, Any]]:
        """ Performs a series of GET requests.

        The requests run concurrently on a thread pool (at most 10 workers). Once all of them have
        completed, returns the resulting DTOs (as JSON-serializable dictionaries) in the order of
        ``request_urls``. A request that fails is reported on standard output and left out of the
        result, so the returned list may be shorter than ``request_urls``.

        Parameters
        ----------
        service:
            The name of the KAPPA Automate microservice.
        request_urls:
            A list of request URLs.
        """
        urls = list(request_urls)
        if not urls:
            # ThreadPoolExecutor rejects max_workers=0, so an empty batch is answered directly.
            return []

        results: List[Dict[str, Any]] = []
        with concurrent.futures.ThreadPoolExecutor(max_workers=min(10, len(urls))) as executor:
            futures = [executor.submit(self._get_single_url, service, url) for url in urls]
            # Collect in submission order so the output is deterministic.
            for url, future in zip(urls, futures):
                try:
                    results.append(future.result())
                except Exception as e:
                    # Deliberately best-effort: one failing URL must not abort the whole batch.
                    print(f"Error for {url}: {e}")
        return results

    def _get_single_url(self, service: ServiceEnum, request_url: str) -> Dict[str, Any]:
        """ Performs one GET request; returns the decoded JSON, or an empty dictionary for an empty body.

        Raises ConnectionError if the response status is not OK.
        """
        response = self._send("GET", service, request_url, headers=self.request_headers)
        return self._json_or(response, {})

    # ------------------------------------------------------------------ upload

    def upload(self, service: ServiceEnum, request_url: str, file_path: str) -> Optional[Dict[str, Any]]:
        """ Performs a file upload via POST request.

        The file is sent as an ``application/octet-stream`` body; its base name is passed
        base64-encoded in the ``Content-Disposition`` header. Returns the decoded JSON response,
        or None when the response body is empty.

        Parameters
        ----------
        service:
            The name of the KAPPA Automate microservice.
        request_url:
            The request URL.
        file_path:
            Complete path and name of the file to upload.

        Raises
        ------
        ConnectionError
            If the response status is not OK.
        """
        file_name = os.path.basename(file_path)
        # UTF-8 yields the same bytes as ASCII for ASCII names and no longer fails on other names.
        encoded_file_name = base64.b64encode(file_name.encode('utf-8')).decode('ascii')
        file_headers = {
            "Content-Type": "application/octet-stream",
            "Content-Disposition": f"attachment; filename=\"{encoded_file_name}\""
        }
        with open(file_path, 'rb') as f:
            # SSL verification follows the configuration, like every other request.
            response = self._send("POST", service, request_url,
                                  headers={**self.headers, **file_headers},
                                  data=f)
        return self._json_or(response)

    # ------------------------------------------------------------------ raw-content helpers

    def _get_content(self, service: ServiceEnum, request_url: str) -> str:
        """ Performs a GET request and returns the response body as text (decoded as UTF-8). """
        return self._send("GET", service, request_url, headers=self.request_headers).text

    def _put_content(self, service: ServiceEnum, request_url: str, dto: Union[List[Any], Dict[str, Any]]) -> str:
        """ Performs a PUT request with a JSON body and returns the response body as text (decoded as UTF-8). """
        return self._send("PUT", service, request_url, headers=self.headers, json=dto).text

    def _post_content(self, service: ServiceEnum, request_url: str, dto: Union[List[Any], Dict[str, Any]]) -> str:
        """ Performs a POST request with a JSON body and returns the response body as text (decoded as UTF-8). """
        return self._send("POST", service, request_url, headers=self.headers, json=dto).text

    def _get_stream(self, service: ServiceEnum, request_url: str) -> Response:
        """ Performs a streaming GET request and returns the raw response.

        The response is NOT verified here; checking the status is left to the caller.
        """
        return self._send("GET", service, request_url,
                          headers=self.request_headers, check=False, stream=True)

    # ------------------------------------------------------------------ verification

    @staticmethod
    def __verify_response(response: Union[Response, ClientResponse], service: ServiceEnum) -> None:
        """ Raises ConnectionError when ``response`` does not report an OK status; otherwise does nothing. """
        if response.ok:
            return

        if isinstance(response, Response):
            detail = response.text if response.text else response.reason
            suffix = "\r\nResponse: {}".format(detail) if detail else ""
            message = "Error {} occurred when communicating with the \"{}\" service\r\nRequest: {} {}{}".format(
                response.status_code,
                service,
                response.request.method,
                response.request.url,
                suffix)
        else:
            # On a non-requests response (aiohttp) ``text`` is a method, not the body text,
            # so only the reason phrase can be reported here.
            detail = response.reason
            suffix = "\r\nResponse: {}".format(detail) if detail else ""
            message = "Error occurred when communicating with the \"{}\" service\r\nRequest: {}".format(service, suffix)
        raise ConnectionError(message)