Skip to content

Module isilon.api.base

View Source
import attr

from isilon.creds import Credentials

from isilon.http import Http

@attr.s(frozen=True)

class BaseAPI:

    API_VERSION = "v1"

    http = attr.ib(type=Http, repr=False)

    url = attr.ib(type=str, validator=attr.validators.instance_of(str), converter=str)

    credentials = attr.ib(

        type=Credentials, validator=attr.validators.instance_of(Credentials)

    )

    async def base_request(self, fn, url, headers: dict = {}, *args, **kwargs):

        headers = await self.get_token(headers)

        response = await fn(url, headers=headers, *args, **kwargs)

        return response

    async def get_token(self, headers: dict) -> dict:

        token = await self.credentials.x_auth_token(self.url)

        headers.update(token)

        return headers

Classes

BaseAPI

class BaseAPI(
    http: isilon.http.Http,
    url,
    credentials: isilon.creds.Credentials
)
View Source
class BaseAPI:

    API_VERSION = "v1"

    http = attr.ib(type=Http, repr=False)

    url = attr.ib(type=str, validator=attr.validators.instance_of(str), converter=str)

    credentials = attr.ib(

        type=Credentials, validator=attr.validators.instance_of(Credentials)

    )

    async def base_request(self, fn, url, headers: dict = {}, *args, **kwargs):

        headers = await self.get_token(headers)

        response = await fn(url, headers=headers, *args, **kwargs)

        return response

    async def get_token(self, headers: dict) -> dict:

        token = await self.credentials.x_auth_token(self.url)

        headers.update(token)

        return headers

Descendants

  • isilon.api.accounts.Accounts
  • isilon.api.containers.Containers
  • isilon.api.discoverability.Discoverability
  • isilon.api.endpoints.Endpoints
  • isilon.api.objects.Objects

Class variables

API_VERSION
credentials
http
url

Methods

base_request
def base_request(
    self,
    fn,
    url,
    headers: dict = {},
    *args,
    **kwargs
)
View Source
    async def base_request(self, fn, url, headers: dict = {}, *args, **kwargs):

        headers = await self.get_token(headers)

        response = await fn(url, headers=headers, *args, **kwargs)

        return response
get_token
def get_token(
    self,
    headers: dict
) -> dict
View Source
    async def get_token(self, headers: dict) -> dict:

        token = await self.credentials.x_auth_token(self.url)

        headers.update(token)

        return headers