Module faunadb.client

Expand source code
import os
import platform
import sys
import threading
# pylint: disable=redefined-builtin
from builtins import object
from time import time

from requests import Request, Session, get
from requests.adapters import HTTPAdapter
from requests.auth import AuthBase

from faunadb import __api_version__ as api_version
from faunadb import __version__ as pkg_version
from faunadb._json import parse_json_or_none, to_json
from faunadb.errors import FaunaError, UnexpectedError, _get_or_raise
from faunadb.query import _wrap
from faunadb.request_result import RequestResult
from faunadb.streams import Subscription


class HTTPBearerAuth(AuthBase):
    """Creates a bearer base auth object"""

    def auth_header(self):
        return "Bearer {}".format(self.secret)

    def __init__(self, secret):
        self.secret = secret

    def __eq__(self, other):
        return self.secret == getattr(other, 'secret', None)

    def __ne__(self, other):
        return not self == other

    def __call__(self, r):
        r.headers['Authorization'] = self.auth_header()
        return r


class RuntimeEnvHeader:
    def __init__(self):
        self.pythonVersion = "{0}.{1}.{2}-{3}".format(*sys.version_info)
        self.driverVersion = pkg_version
        self.env = self.getRuntimeEnv()
        self.os = "{0}-{1}".format(platform.system(), platform.release())

    def getRuntimeEnv(self):
        env = [
            {
                "name": "Netlify",
                "check": lambda: "NETLIFY_IMAGES_CDN_DOMAIN" in os.environ
            },
            {
                "name": "Vercel",
                "check": lambda: "VERCEL" in os.environ
            },
            {
                "name": "Heroku",
                "check": lambda: "PATH" in os.environ and ".heroku" in os.environ["PATH"]
            },
            {
                "name": "AWS Lambda",
                "check": lambda: "AWS_LAMBDA_FUNCTION_VERSION" in os.environ
            },
            {
                "name": "GCP Cloud Functions",
                "check": lambda: "_" in os.environ and "google" in os.environ["_"]
            },
            {
                "name": "GCP Compute Instances",
                "check": lambda: "GOOGLE_CLOUD_PROJECT" in os.environ
            },
            {
                "name": "Azure Cloud Functions",
                "check": lambda: "WEBSITE_FUNCTIONS_AZUREMONITOR_CATEGORIES" in os.environ
            },
            {
                "name": "Azure Compute",
                "check": lambda: "ORYX_ENV_TYPE" in os.environ and "WEBSITE_INSTANCE_ID" in os.environ and os.environ["ORYX_ENV_TYPE"] == "AppService"
            }
        ]

        try:
            recognized = next(e for e in env if e.get("check")())
            if recognized is not None:
                return recognized.get("name")
        except:
            return "Unknown"

    def __str__(self):
        return "driver=python-{0}; runtime=python-{1} env={2}; os={3}".format(
            self.driverVersion, self.pythonVersion,
            self.env, self.os
        ).lower()


class _LastTxnTime(object):
    """Wraps tracking the last transaction time supplied from the database."""

    def __init__(self):
        self._lock = threading.Lock()
        self._time = None

    @property
    def time(self):
        """Produces the last transaction time, or, None if not yet updated."""
        with self._lock:
            return self._time

    @property
    def request_header(self):
        """Produces a dictionary with a non-zero `X-Last-Txn-Time` header; or,
        if one has not yet been set, the empty header dictionary."""
        t = self.time
        if t is None:
            return {}
        return {"X-Last-Txn-Time": str(t)}

    def update_txn_time(self, new_txn_time):
        """Updates the internal transaction time.
        In order to maintain a monotonically-increasing value, `newTxnTime`
        is discarded if it is behind the current timestamp."""
        with self._lock:
            if self._time is None:
                self._time = new_txn_time
            else:
                self._time = max(self._time, new_txn_time)


class _Counter(object):
    def __init__(self, init_value=0):
        self.lock = threading.Lock()
        self.counter = init_value

    def __str__(self):
        return "Counter(%s)" % self.counter

    def get_and_increment(self):
        with self.lock:
            counter = self.counter
            self.counter += 1
            return counter

    def decrement(self):
        with self.lock:
            self.counter -= 1
            return self.counter


class FaunaClient(object):
    """
    Directly communicates with FaunaDB via JSON.

    For data sent to the server, the ``to_fauna_json`` method will be called on any values.
    It is encouraged to pass e.g. :any:`Ref` objects instead of raw JSON data.

    All methods return a converted JSON response.
    This is a dict containing lists, ints, floats, strings, and other dicts.
    Any :any:`Ref`, :any:`SetRef`, :any:`FaunaTime`, or :class:`datetime.date`
    values in it will also be parsed.
    (So instead of ``{"@ref": {"id": "frogs", "class": {"@ref": {"id": "classes"}}}}``,
    you will get ``Ref("frogs", Native.CLASSES)``.)
    """

    # pylint: disable=too-many-arguments, too-many-instance-attributes
    def __init__(
            self,
            secret,
            domain="db.fauna.com",
            scheme="https",
            port=None,
            timeout=60,
            observer=None,
            pool_connections=10,
            pool_maxsize=10,
            **kwargs):
        """
        :param secret:
          Auth token for the FaunaDB server.
        :param domain:
          Base URL for the FaunaDB server.
        :param scheme:
          ``"http"`` or ``"https"``.
        :param port:
          Port of the FaunaDB server.
        :param timeout:
          Read timeout in seconds.
        :param observer:
          Callback that will be passed a :any:`RequestResult` after every completed request.
        :param pool_connections:
          The number of connection pools to cache.
        :param pool_maxsize:
          The maximum number of connections to save in the pool.
        """

        self.check_new_version()

        self.domain = domain
        self.scheme = scheme
        self.port = (443 if scheme ==
                     "https" else 80) if port is None else port

        self.auth = HTTPBearerAuth(secret)
        self.base_url = "%s://%s:%s" % (self.scheme, self.domain, self.port)
        self.observer = observer

        self.pool_connections = pool_connections
        self.pool_maxsize = pool_maxsize

        self._last_txn_time = kwargs.get('last_txn_time') or _LastTxnTime()
        self._query_timeout_ms = kwargs.get('query_timeout_ms')
        if self._query_timeout_ms is not None:
            self._query_timeout_ms = int(self._query_timeout_ms)

        if ('session' not in kwargs) or ('counter' not in kwargs):
            self.session = Session()
            self.session.mount('https://', HTTPAdapter(pool_connections=pool_connections,
                                                       pool_maxsize=pool_maxsize))
            self.session.mount('http://', HTTPAdapter(pool_connections=pool_connections,
                                                      pool_maxsize=pool_maxsize))
            self.counter = _Counter(1)

            self.session.headers.update({
                "Accept-Encoding": "gzip",
                "Content-Type": "application/json;charset=utf-8",
                "X-Fauna-Driver": "python",
                "X-FaunaDB-API-Version": api_version,
                "X-Driver-Env": str(RuntimeEnvHeader())
            })
            if self._query_timeout_ms is not None:
                self.session.headers["X-Query-Timeout"] = str(
                    self._query_timeout_ms)
            self.session.timeout = timeout
        else:
            self.session = kwargs['session']
            self.counter = kwargs['counter']

    def check_new_version(self):
        response = get('https://pypi.org/pypi/faunadb/json')
        latest_version = response.json().get('info').get('version')

        if latest_version > pkg_version:
            msg1 = "New fauna version available {} => {}".format(
                pkg_version, latest_version)
            msg2 = "Changelog: https://github.com/fauna/faunadb-python/blob/main/CHANGELOG.md"
            width = 80
            print('+' + '-' * width + '+')
            print('| ' + msg1 + ' ' * (width - len(msg1) - 1) + '|')
            print('| ' + msg2 + ' ' * (width - len(msg2) - 1) + '|')
            print('+' + '-' * width + '+')

    def sync_last_txn_time(self, new_txn_time):
        """
        Sync the freshest timestamp seen by this client.

        This has no effect if staler than currently stored timestamp.
        WARNING: This should be used only when coordinating timestamps across
                multiple clients. Moving the timestamp arbitrarily forward into
                the future will cause transactions to stall.

        :param new_txn_time: the new seen transaction time.
        """
        self._last_txn_time.update_txn_time(new_txn_time)

    def get_last_txn_time(self):
        """
        Get the freshest timestamp reported to this client.
        :return:
        """
        return self._last_txn_time.time

    def get_query_timeout(self):
        """
        Get the query timeout for all queries.
        """
        return self._query_timeout_ms

    def __del__(self):
        if self.counter.decrement() == 0:
            self.session.close()

    def query(self, expression, timeout_millis=None):
        """
        Use the FaunaDB query API.

        :param expression: A query. See :doc:`query` for information on queries.
        :param timeout_millis: Query timeout in milliseconds.
        :return: Converted JSON response.
        """
        return self._execute("POST", "", _wrap(expression), with_txn_time=True, query_timeout_ms=timeout_millis)

    def stream(self, expression, options=None, on_start=None, on_error=None, on_version=None, on_history=None):
        """
        Creates a stream Subscription to the result of the given read-only expression. When
        executed.

        The subscription returned by this method does not issue any requests until
        the subscription's start method is called. Make sure to
        subscribe to the events of interest, otherwise the received events are simply
        ignored.

        :param expression:   A read-only expression.
        :param    options:   Object that configures the stream subscription. E.g set fields to return
        :param   on_start:   Callback for the stream's start event.
        :param   on_error:   Callback for the stream's error event.
        :param   on_version: Callback for the stream's version events.
        :param   on_history: Callback for the stream's history_rewrite events.
        """
        subscription = Subscription(self, expression, options)
        subscription.on('start', on_start)
        subscription.on('error', on_error)
        subscription.on('version', on_version)
        subscription.on('history_rewrite', on_history)
        return subscription

    def ping(self, scope=None, timeout=None):
        """
        Ping FaunaDB.
        """
        return self._execute("GET", "ping", query={"scope": scope, "timeout": timeout})

    def new_session_client(self, secret, observer=None):
        """
        Create a new client from the existing config with a given secret.
        The returned client share its parent underlying resources.

        :param secret:
          Credentials to use when sending requests.
        :param observer:
          Callback that will be passed a :any:`RequestResult` after every completed request.
        :return:
        """
        if self.counter.get_and_increment() > 0:
            return FaunaClient(secret=secret,
                               domain=self.domain,
                               scheme=self.scheme,
                               port=self.port,
                               timeout=self.session.timeout,
                               observer=observer or self.observer,
                               session=self.session,
                               counter=self.counter,
                               pool_connections=self.pool_connections,
                               pool_maxsize=self.pool_maxsize,
                               last_txn_time=self._last_txn_time,
                               query_timeout_ms=self._query_timeout_ms)
        else:
            raise UnexpectedError(
                "Cannnot create a session client from a closed session", None)

    def _execute(self, action, path, data=None, query=None, with_txn_time=False, query_timeout_ms=None):
        """Performs an HTTP action, logs it, and looks for errors."""
        if query is not None:
            query = {k: v for k, v in query.items() if v is not None}

        headers = {}

        if query_timeout_ms is not None:
            headers["X-Query-Timeout"] = str(query_timeout_ms)

        if with_txn_time:
            headers.update(self._last_txn_time.request_header)

        start_time = time()
        response = self._perform_request(action, path, data, query, headers)
        end_time = time()

        if with_txn_time:
            if "X-Txn-Time" in response.headers:
                new_txn_time = int(response.headers["X-Txn-Time"])
                self.sync_last_txn_time(new_txn_time)

        response_raw = response.text
        response_content = parse_json_or_none(response_raw)

        request_result = RequestResult(
            action, path, query, data,
            response_raw, response_content, response.status_code, response.headers,
            start_time, end_time)

        if self.observer is not None:
            self.observer(request_result)

        if response_content is None:
            raise UnexpectedError("Invalid JSON.", request_result)

        FaunaError.raise_for_status_code(request_result)
        return _get_or_raise(request_result, response_content, "resource")

    def _perform_request(self, action, path, data, query, headers):
        """Performs an HTTP action."""
        url = self.base_url + "/" + path
        req = Request(action, url, params=query, data=to_json(
            data), auth=self.auth, headers=headers)
        return self.session.send(self.session.prepare_request(req))

Classes

class FaunaClient (secret, domain='db.fauna.com', scheme='https', port=None, timeout=60, observer=None, pool_connections=10, pool_maxsize=10, **kwargs)

Directly communicates with FaunaDB via JSON.

For data sent to the server, the to_fauna_json method will be called on any values. It is encouraged to pass e.g. :any:Ref objects instead of raw JSON data.

All methods return a converted JSON response. This is a dict containing lists, ints, floats, strings, and other dicts. Any :any:Ref, :any:SetRef, :any:FaunaTime, or :class:datetime.date values in it will also be parsed. (So instead of {"@ref": {"id": "frogs", "class": {"@ref": {"id": "classes"}}}}, you will get Ref("frogs", Native.CLASSES).)

:param secret: Auth token for the FaunaDB server. :param domain: Base URL for the FaunaDB server. :param scheme: "http" or "https". :param port: Port of the FaunaDB server. :param timeout: Read timeout in seconds. :param observer: Callback that will be passed a :any:RequestResult after every completed request. :param pool_connections: The number of connection pools to cache. :param pool_maxsize: The maximum number of connections to save in the pool.

Expand source code
class FaunaClient(object):
    """
    Directly communicates with FaunaDB via JSON.

    For data sent to the server, the ``to_fauna_json`` method will be called on any values.
    It is encouraged to pass e.g. :any:`Ref` objects instead of raw JSON data.

    All methods return a converted JSON response.
    This is a dict containing lists, ints, floats, strings, and other dicts.
    Any :any:`Ref`, :any:`SetRef`, :any:`FaunaTime`, or :class:`datetime.date`
    values in it will also be parsed.
    (So instead of ``{"@ref": {"id": "frogs", "class": {"@ref": {"id": "classes"}}}}``,
    you will get ``Ref("frogs", Native.CLASSES)``.)
    """

    # pylint: disable=too-many-arguments, too-many-instance-attributes
    def __init__(
            self,
            secret,
            domain="db.fauna.com",
            scheme="https",
            port=None,
            timeout=60,
            observer=None,
            pool_connections=10,
            pool_maxsize=10,
            **kwargs):
        """
        :param secret:
          Auth token for the FaunaDB server.
        :param domain:
          Base URL for the FaunaDB server.
        :param scheme:
          ``"http"`` or ``"https"``.
        :param port:
          Port of the FaunaDB server.
        :param timeout:
          Read timeout in seconds.
        :param observer:
          Callback that will be passed a :any:`RequestResult` after every completed request.
        :param pool_connections:
          The number of connection pools to cache.
        :param pool_maxsize:
          The maximum number of connections to save in the pool.
        """

        self.check_new_version()

        self.domain = domain
        self.scheme = scheme
        self.port = (443 if scheme ==
                     "https" else 80) if port is None else port

        self.auth = HTTPBearerAuth(secret)
        self.base_url = "%s://%s:%s" % (self.scheme, self.domain, self.port)
        self.observer = observer

        self.pool_connections = pool_connections
        self.pool_maxsize = pool_maxsize

        self._last_txn_time = kwargs.get('last_txn_time') or _LastTxnTime()
        self._query_timeout_ms = kwargs.get('query_timeout_ms')
        if self._query_timeout_ms is not None:
            self._query_timeout_ms = int(self._query_timeout_ms)

        if ('session' not in kwargs) or ('counter' not in kwargs):
            self.session = Session()
            self.session.mount('https://', HTTPAdapter(pool_connections=pool_connections,
                                                       pool_maxsize=pool_maxsize))
            self.session.mount('http://', HTTPAdapter(pool_connections=pool_connections,
                                                      pool_maxsize=pool_maxsize))
            self.counter = _Counter(1)

            self.session.headers.update({
                "Accept-Encoding": "gzip",
                "Content-Type": "application/json;charset=utf-8",
                "X-Fauna-Driver": "python",
                "X-FaunaDB-API-Version": api_version,
                "X-Driver-Env": str(RuntimeEnvHeader())
            })
            if self._query_timeout_ms is not None:
                self.session.headers["X-Query-Timeout"] = str(
                    self._query_timeout_ms)
            self.session.timeout = timeout
        else:
            self.session = kwargs['session']
            self.counter = kwargs['counter']

    def check_new_version(self):
        response = get('https://pypi.org/pypi/faunadb/json')
        latest_version = response.json().get('info').get('version')

        if latest_version > pkg_version:
            msg1 = "New fauna version available {} => {}".format(
                pkg_version, latest_version)
            msg2 = "Changelog: https://github.com/fauna/faunadb-python/blob/main/CHANGELOG.md"
            width = 80
            print('+' + '-' * width + '+')
            print('| ' + msg1 + ' ' * (width - len(msg1) - 1) + '|')
            print('| ' + msg2 + ' ' * (width - len(msg2) - 1) + '|')
            print('+' + '-' * width + '+')

    def sync_last_txn_time(self, new_txn_time):
        """
        Sync the freshest timestamp seen by this client.

        This has no effect if staler than currently stored timestamp.
        WARNING: This should be used only when coordinating timestamps across
                multiple clients. Moving the timestamp arbitrarily forward into
                the future will cause transactions to stall.

        :param new_txn_time: the new seen transaction time.
        """
        self._last_txn_time.update_txn_time(new_txn_time)

    def get_last_txn_time(self):
        """
        Get the freshest timestamp reported to this client.
        :return:
        """
        return self._last_txn_time.time

    def get_query_timeout(self):
        """
        Get the query timeout for all queries.
        """
        return self._query_timeout_ms

    def __del__(self):
        if self.counter.decrement() == 0:
            self.session.close()

    def query(self, expression, timeout_millis=None):
        """
        Use the FaunaDB query API.

        :param expression: A query. See :doc:`query` for information on queries.
        :param timeout_millis: Query timeout in milliseconds.
        :return: Converted JSON response.
        """
        return self._execute("POST", "", _wrap(expression), with_txn_time=True, query_timeout_ms=timeout_millis)

    def stream(self, expression, options=None, on_start=None, on_error=None, on_version=None, on_history=None):
        """
        Creates a stream Subscription to the result of the given read-only expression. When
        executed.

        The subscription returned by this method does not issue any requests until
        the subscription's start method is called. Make sure to
        subscribe to the events of interest, otherwise the received events are simply
        ignored.

        :param expression:   A read-only expression.
        :param    options:   Object that configures the stream subscription. E.g set fields to return
        :param   on_start:   Callback for the stream's start event.
        :param   on_error:   Callback for the stream's error event.
        :param   on_version: Callback for the stream's version events.
        :param   on_history: Callback for the stream's history_rewrite events.
        """
        subscription = Subscription(self, expression, options)
        subscription.on('start', on_start)
        subscription.on('error', on_error)
        subscription.on('version', on_version)
        subscription.on('history_rewrite', on_history)
        return subscription

    def ping(self, scope=None, timeout=None):
        """
        Ping FaunaDB.
        """
        return self._execute("GET", "ping", query={"scope": scope, "timeout": timeout})

    def new_session_client(self, secret, observer=None):
        """
        Create a new client from the existing config with a given secret.
        The returned client share its parent underlying resources.

        :param secret:
          Credentials to use when sending requests.
        :param observer:
          Callback that will be passed a :any:`RequestResult` after every completed request.
        :return:
        """
        if self.counter.get_and_increment() > 0:
            return FaunaClient(secret=secret,
                               domain=self.domain,
                               scheme=self.scheme,
                               port=self.port,
                               timeout=self.session.timeout,
                               observer=observer or self.observer,
                               session=self.session,
                               counter=self.counter,
                               pool_connections=self.pool_connections,
                               pool_maxsize=self.pool_maxsize,
                               last_txn_time=self._last_txn_time,
                               query_timeout_ms=self._query_timeout_ms)
        else:
            raise UnexpectedError(
                "Cannnot create a session client from a closed session", None)

    def _execute(self, action, path, data=None, query=None, with_txn_time=False, query_timeout_ms=None):
        """Performs an HTTP action, logs it, and looks for errors."""
        if query is not None:
            query = {k: v for k, v in query.items() if v is not None}

        headers = {}

        if query_timeout_ms is not None:
            headers["X-Query-Timeout"] = str(query_timeout_ms)

        if with_txn_time:
            headers.update(self._last_txn_time.request_header)

        start_time = time()
        response = self._perform_request(action, path, data, query, headers)
        end_time = time()

        if with_txn_time:
            if "X-Txn-Time" in response.headers:
                new_txn_time = int(response.headers["X-Txn-Time"])
                self.sync_last_txn_time(new_txn_time)

        response_raw = response.text
        response_content = parse_json_or_none(response_raw)

        request_result = RequestResult(
            action, path, query, data,
            response_raw, response_content, response.status_code, response.headers,
            start_time, end_time)

        if self.observer is not None:
            self.observer(request_result)

        if response_content is None:
            raise UnexpectedError("Invalid JSON.", request_result)

        FaunaError.raise_for_status_code(request_result)
        return _get_or_raise(request_result, response_content, "resource")

    def _perform_request(self, action, path, data, query, headers):
        """Performs an HTTP action."""
        url = self.base_url + "/" + path
        req = Request(action, url, params=query, data=to_json(
            data), auth=self.auth, headers=headers)
        return self.session.send(self.session.prepare_request(req))

Methods

def check_new_version(self)
Expand source code
def check_new_version(self):
    response = get('https://pypi.org/pypi/faunadb/json')
    latest_version = response.json().get('info').get('version')

    if latest_version > pkg_version:
        msg1 = "New fauna version available {} => {}".format(
            pkg_version, latest_version)
        msg2 = "Changelog: https://github.com/fauna/faunadb-python/blob/main/CHANGELOG.md"
        width = 80
        print('+' + '-' * width + '+')
        print('| ' + msg1 + ' ' * (width - len(msg1) - 1) + '|')
        print('| ' + msg2 + ' ' * (width - len(msg2) - 1) + '|')
        print('+' + '-' * width + '+')
def get_last_txn_time(self)

Get the freshest timestamp reported to this client. :return:

Expand source code
def get_last_txn_time(self):
    """
    Get the freshest timestamp reported to this client.
    :return:
    """
    return self._last_txn_time.time
def get_query_timeout(self)

Get the query timeout for all queries.

Expand source code
def get_query_timeout(self):
    """
    Get the query timeout for all queries.
    """
    return self._query_timeout_ms
def new_session_client(self, secret, observer=None)

Create a new client from the existing config with a given secret. The returned client share its parent underlying resources.

:param secret: Credentials to use when sending requests. :param observer: Callback that will be passed a :any:RequestResult after every completed request. :return:

Expand source code
def new_session_client(self, secret, observer=None):
    """
    Create a new client from the existing config with a given secret.
    The returned client share its parent underlying resources.

    :param secret:
      Credentials to use when sending requests.
    :param observer:
      Callback that will be passed a :any:`RequestResult` after every completed request.
    :return:
    """
    if self.counter.get_and_increment() > 0:
        return FaunaClient(secret=secret,
                           domain=self.domain,
                           scheme=self.scheme,
                           port=self.port,
                           timeout=self.session.timeout,
                           observer=observer or self.observer,
                           session=self.session,
                           counter=self.counter,
                           pool_connections=self.pool_connections,
                           pool_maxsize=self.pool_maxsize,
                           last_txn_time=self._last_txn_time,
                           query_timeout_ms=self._query_timeout_ms)
    else:
        raise UnexpectedError(
            "Cannnot create a session client from a closed session", None)
def ping(self, scope=None, timeout=None)

Ping FaunaDB.

Expand source code
def ping(self, scope=None, timeout=None):
    """
    Ping FaunaDB.
    """
    return self._execute("GET", "ping", query={"scope": scope, "timeout": timeout})
def query(self, expression, timeout_millis=None)

Use the FaunaDB query API.

:param expression: A query. See :doc:query for information on queries. :param timeout_millis: Query timeout in milliseconds. :return: Converted JSON response.

Expand source code
def query(self, expression, timeout_millis=None):
    """
    Use the FaunaDB query API.

    :param expression: A query. See :doc:`query` for information on queries.
    :param timeout_millis: Query timeout in milliseconds.
    :return: Converted JSON response.
    """
    return self._execute("POST", "", _wrap(expression), with_txn_time=True, query_timeout_ms=timeout_millis)
def stream(self, expression, options=None, on_start=None, on_error=None, on_version=None, on_history=None)

Creates a stream Subscription to the result of the given read-only expression. When executed.

The subscription returned by this method does not issue any requests until the subscription's start method is called. Make sure to subscribe to the events of interest, otherwise the received events are simply ignored.

:param expression: A read-only expression. :param options: Object that configures the stream subscription. E.g set fields to return :param on_start: Callback for the stream's start event. :param on_error: Callback for the stream's error event. :param on_version: Callback for the stream's version events. :param on_history: Callback for the stream's history_rewrite events.

Expand source code
def stream(self, expression, options=None, on_start=None, on_error=None, on_version=None, on_history=None):
    """
    Creates a stream Subscription to the result of the given read-only expression. When
    executed.

    The subscription returned by this method does not issue any requests until
    the subscription's start method is called. Make sure to
    subscribe to the events of interest, otherwise the received events are simply
    ignored.

    :param expression:   A read-only expression.
    :param    options:   Object that configures the stream subscription. E.g set fields to return
    :param   on_start:   Callback for the stream's start event.
    :param   on_error:   Callback for the stream's error event.
    :param   on_version: Callback for the stream's version events.
    :param   on_history: Callback for the stream's history_rewrite events.
    """
    subscription = Subscription(self, expression, options)
    subscription.on('start', on_start)
    subscription.on('error', on_error)
    subscription.on('version', on_version)
    subscription.on('history_rewrite', on_history)
    return subscription
def sync_last_txn_time(self, new_txn_time)

Sync the freshest timestamp seen by this client.

This has no effect if staler than currently stored timestamp. WARNING: This should be used only when coordinating timestamps across multiple clients. Moving the timestamp arbitrarily forward into the future will cause transactions to stall.

:param new_txn_time: the new seen transaction time.

Expand source code
def sync_last_txn_time(self, new_txn_time):
    """
    Sync the freshest timestamp seen by this client.

    This has no effect if staler than currently stored timestamp.
    WARNING: This should be used only when coordinating timestamps across
            multiple clients. Moving the timestamp arbitrarily forward into
            the future will cause transactions to stall.

    :param new_txn_time: the new seen transaction time.
    """
    self._last_txn_time.update_txn_time(new_txn_time)
class HTTPBearerAuth (secret)

Creates a bearer base auth object

Expand source code
class HTTPBearerAuth(AuthBase):
    """Creates a bearer base auth object"""

    def auth_header(self):
        return "Bearer {}".format(self.secret)

    def __init__(self, secret):
        self.secret = secret

    def __eq__(self, other):
        return self.secret == getattr(other, 'secret', None)

    def __ne__(self, other):
        return not self == other

    def __call__(self, r):
        r.headers['Authorization'] = self.auth_header()
        return r

Ancestors

  • requests.auth.AuthBase

Methods

def auth_header(self)
Expand source code
def auth_header(self):
    return "Bearer {}".format(self.secret)
class RuntimeEnvHeader
Expand source code
class RuntimeEnvHeader:
    def __init__(self):
        self.pythonVersion = "{0}.{1}.{2}-{3}".format(*sys.version_info)
        self.driverVersion = pkg_version
        self.env = self.getRuntimeEnv()
        self.os = "{0}-{1}".format(platform.system(), platform.release())

    def getRuntimeEnv(self):
        env = [
            {
                "name": "Netlify",
                "check": lambda: "NETLIFY_IMAGES_CDN_DOMAIN" in os.environ
            },
            {
                "name": "Vercel",
                "check": lambda: "VERCEL" in os.environ
            },
            {
                "name": "Heroku",
                "check": lambda: "PATH" in os.environ and ".heroku" in os.environ["PATH"]
            },
            {
                "name": "AWS Lambda",
                "check": lambda: "AWS_LAMBDA_FUNCTION_VERSION" in os.environ
            },
            {
                "name": "GCP Cloud Functions",
                "check": lambda: "_" in os.environ and "google" in os.environ["_"]
            },
            {
                "name": "GCP Compute Instances",
                "check": lambda: "GOOGLE_CLOUD_PROJECT" in os.environ
            },
            {
                "name": "Azure Cloud Functions",
                "check": lambda: "WEBSITE_FUNCTIONS_AZUREMONITOR_CATEGORIES" in os.environ
            },
            {
                "name": "Azure Compute",
                "check": lambda: "ORYX_ENV_TYPE" in os.environ and "WEBSITE_INSTANCE_ID" in os.environ and os.environ["ORYX_ENV_TYPE"] == "AppService"
            }
        ]

        try:
            recognized = next(e for e in env if e.get("check")())
            if recognized is not None:
                return recognized.get("name")
        except:
            return "Unknown"

    def __str__(self):
        return "driver=python-{0}; runtime=python-{1} env={2}; os={3}".format(
            self.driverVersion, self.pythonVersion,
            self.env, self.os
        ).lower()

Methods

def getRuntimeEnv(self)
Expand source code
def getRuntimeEnv(self):
    env = [
        {
            "name": "Netlify",
            "check": lambda: "NETLIFY_IMAGES_CDN_DOMAIN" in os.environ
        },
        {
            "name": "Vercel",
            "check": lambda: "VERCEL" in os.environ
        },
        {
            "name": "Heroku",
            "check": lambda: "PATH" in os.environ and ".heroku" in os.environ["PATH"]
        },
        {
            "name": "AWS Lambda",
            "check": lambda: "AWS_LAMBDA_FUNCTION_VERSION" in os.environ
        },
        {
            "name": "GCP Cloud Functions",
            "check": lambda: "_" in os.environ and "google" in os.environ["_"]
        },
        {
            "name": "GCP Compute Instances",
            "check": lambda: "GOOGLE_CLOUD_PROJECT" in os.environ
        },
        {
            "name": "Azure Cloud Functions",
            "check": lambda: "WEBSITE_FUNCTIONS_AZUREMONITOR_CATEGORIES" in os.environ
        },
        {
            "name": "Azure Compute",
            "check": lambda: "ORYX_ENV_TYPE" in os.environ and "WEBSITE_INSTANCE_ID" in os.environ and os.environ["ORYX_ENV_TYPE"] == "AppService"
        }
    ]

    try:
        recognized = next(e for e in env if e.get("check")())
        if recognized is not None:
            return recognized.get("name")
    except:
        return "Unknown"