Module faunadb.client
Expand source code
from time import time
# pylint: disable=redefined-builtin
from builtins import object
import threading
from requests import Request, Session
from requests.auth import HTTPBasicAuth
from requests.adapters import HTTPAdapter
from faunadb.errors import _get_or_raise, FaunaError, UnexpectedError
from faunadb.query import _wrap
from faunadb.request_result import RequestResult
from faunadb._json import parse_json_or_none, to_json
from faunadb.streams import Subscription
API_VERSION = "4"
class _LastTxnTime(object):
"""Wraps tracking the last transaction time supplied from the database."""
def __init__(self):
self._lock = threading.Lock()
self._time = None
@property
def time(self):
"""Produces the last transaction time, or, None if not yet updated."""
with self._lock:
return self._time
@property
def request_header(self):
"""Produces a dictionary with a non-zero `X-Last-Txn-Time` header; or,
if one has not yet been set, the empty header dictionary."""
t = self.time
if t is None:
return {}
return { "X-Last-Txn-Time" : str(t) }
def update_txn_time(self, new_txn_time):
"""Updates the internal transaction time.
In order to maintain a monotonically-increasing value, `newTxnTime`
is discarded if it is behind the current timestamp."""
with self._lock:
if self._time is None:
self._time = new_txn_time
else:
self._time = max(self._time, new_txn_time)
class _Counter(object):
def __init__(self, init_value=0):
self.lock = threading.Lock()
self.counter = init_value
def __str__(self):
return "Counter(%s)" % self.counter
def get_and_increment(self):
with self.lock:
counter = self.counter
self.counter += 1
return counter
def decrement(self):
with self.lock:
self.counter -= 1
return self.counter
class FaunaClient(object):
"""
Directly communicates with FaunaDB via JSON.
For data sent to the server, the ``to_fauna_json`` method will be called on any values.
It is encouraged to pass e.g. :any:`Ref` objects instead of raw JSON data.
All methods return a converted JSON response.
This is a dict containing lists, ints, floats, strings, and other dicts.
Any :any:`Ref`, :any:`SetRef`, :any:`FaunaTime`, or :class:`datetime.date`
values in it will also be parsed.
(So instead of ``{"@ref": {"id": "frogs", "class": {"@ref": {"id": "classes"}}}}``,
you will get ``Ref("frogs", Native.CLASSES)``.)
"""
# pylint: disable=too-many-arguments, too-many-instance-attributes
def __init__(
self,
secret,
domain="db.fauna.com",
scheme="https",
port=None,
timeout=60,
observer=None,
pool_connections=10,
pool_maxsize=10,
**kwargs):
"""
:param secret:
Auth token for the FaunaDB server.
:param domain:
Base URL for the FaunaDB server.
:param scheme:
``"http"`` or ``"https"``.
:param port:
Port of the FaunaDB server.
:param timeout:
Read timeout in seconds.
:param observer:
Callback that will be passed a :any:`RequestResult` after every completed request.
:param pool_connections:
The number of connection pools to cache.
:param pool_maxsize:
The maximum number of connections to save in the pool.
"""
self.domain = domain
self.scheme = scheme
self.port = (443 if scheme == "https" else 80) if port is None else port
self.auth = HTTPBasicAuth(secret, "")
self.base_url = "%s://%s:%s" % (self.scheme, self.domain, self.port)
self.observer = observer
self.pool_connections = pool_connections
self.pool_maxsize = pool_maxsize
self._last_txn_time = kwargs.get('last_txn_time') or _LastTxnTime()
self._query_timeout_ms = kwargs.get('query_timeout_ms')
if self._query_timeout_ms is not None:
self._query_timeout_ms = int(self._query_timeout_ms)
if ('session' not in kwargs) or ('counter' not in kwargs):
self.session = Session()
self.session.mount('https://', HTTPAdapter(pool_connections=pool_connections,
pool_maxsize=pool_maxsize))
self.session.mount('http://', HTTPAdapter(pool_connections=pool_connections,
pool_maxsize=pool_maxsize))
self.counter = _Counter(1)
self.session.headers.update({
"Accept-Encoding": "gzip",
"Content-Type": "application/json;charset=utf-8",
"X-Fauna-Driver": "python",
"X-FaunaDB-API-Version": API_VERSION
})
if self._query_timeout_ms is not None:
self.session.headers["X-Query-Timeout"] = str(self._query_timeout_ms)
self.session.timeout = timeout
else:
self.session = kwargs['session']
self.counter = kwargs['counter']
def sync_last_txn_time(self, new_txn_time):
"""
Sync the freshest timestamp seen by this client.
This has no effect if staler than currently stored timestamp.
WARNING: This should be used only when coordinating timestamps across
multiple clients. Moving the timestamp arbitrarily forward into
the future will cause transactions to stall.
:param new_txn_time: the new seen transaction time.
"""
self._last_txn_time.update_txn_time(new_txn_time)
def get_last_txn_time(self):
"""
Get the freshest timestamp reported to this client.
:return:
"""
return self._last_txn_time.time
def get_query_timeout(self):
"""
Get the query timeout for all queries.
"""
return self._query_timeout_ms
def __del__(self):
if self.counter.decrement() == 0:
self.session.close()
def query(self, expression, timeout_millis=None):
"""
Use the FaunaDB query API.
:param expression: A query. See :doc:`query` for information on queries.
:param timeout_millis: Query timeout in milliseconds.
:return: Converted JSON response.
"""
return self._execute("POST", "", _wrap(expression), with_txn_time=True, query_timeout_ms=timeout_millis)
def stream(self, expression, options=None, on_start=None, on_error=None, on_version=None, on_history=None):
"""
Creates a stream Subscription to the result of the given read-only expression. When
executed.
The subscription returned by this method does not issue any requests until
the subscription's start method is called. Make sure to
subscribe to the events of interest, otherwise the received events are simply
ignored.
:param expression: A read-only expression.
:param options: Object that configures the stream subscription. E.g set fields to return
:param on_start: Callback for the stream's start event.
:param on_error: Callback for the stream's error event.
:param on_version: Callback for the stream's version events.
:param on_history: Callback for the stream's history_rewrite events.
"""
subscription = Subscription(self, expression, options)
subscription.on('start', on_start)
subscription.on('error', on_error)
subscription.on('version', on_version)
subscription.on('history_rewrite', on_history)
return subscription
def ping(self, scope=None, timeout=None):
"""
Ping FaunaDB.
"""
return self._execute("GET", "ping", query={"scope": scope, "timeout": timeout})
def new_session_client(self, secret, observer=None):
"""
Create a new client from the existing config with a given secret.
The returned client share its parent underlying resources.
:param secret:
Credentials to use when sending requests.
:param observer:
Callback that will be passed a :any:`RequestResult` after every completed request.
:return:
"""
if self.counter.get_and_increment() > 0:
return FaunaClient(secret=secret,
domain=self.domain,
scheme=self.scheme,
port=self.port,
timeout=self.session.timeout,
observer=observer or self.observer,
session=self.session,
counter=self.counter,
pool_connections=self.pool_connections,
pool_maxsize=self.pool_maxsize,
last_txn_time=self._last_txn_time,
query_timeout_ms=self._query_timeout_ms)
else:
raise UnexpectedError("Cannnot create a session client from a closed session", None)
def _execute(self, action, path, data=None, query=None, with_txn_time=False, query_timeout_ms=None):
"""Performs an HTTP action, logs it, and looks for errors."""
if query is not None:
query = {k: v for k, v in query.items() if v is not None}
headers = {}
if query_timeout_ms is not None:
headers["X-Query-Timeout"] = str(query_timeout_ms)
if with_txn_time:
headers.update(self._last_txn_time.request_header)
start_time = time()
response = self._perform_request(action, path, data, query, headers)
end_time = time()
if with_txn_time:
if "X-Txn-Time" in response.headers:
new_txn_time = int(response.headers["X-Txn-Time"])
self.sync_last_txn_time(new_txn_time)
response_raw = response.text
response_content = parse_json_or_none(response_raw)
request_result = RequestResult(
action, path, query, data,
response_raw, response_content, response.status_code, response.headers,
start_time, end_time)
if self.observer is not None:
self.observer(request_result)
if response_content is None:
raise UnexpectedError("Invalid JSON.", request_result)
FaunaError.raise_for_status_code(request_result)
return _get_or_raise(request_result, response_content, "resource")
def _perform_request(self, action, path, data, query, headers):
"""Performs an HTTP action."""
url = self.base_url + "/" + path
req = Request(action, url, params=query, data=to_json(data), auth=self.auth, headers=headers)
return self.session.send(self.session.prepare_request(req))
def _auth_header(self):
"""Returns the HTTP authentication header"""
return "Bearer {}".format(self.auth.username)
Classes
class FaunaClient (secret, domain='db.fauna.com', scheme='https', port=None, timeout=60, observer=None, pool_connections=10, pool_maxsize=10, **kwargs)
-
Directly communicates with FaunaDB via JSON.
For data sent to the server, the
to_fauna_json
method will be called on any values. It is encouraged to pass e.g. :any:Ref
objects instead of raw JSON data.All methods return a converted JSON response. This is a dict containing lists, ints, floats, strings, and other dicts. Any :any:
Ref
, :any:SetRef
, :any:FaunaTime
, or :class:datetime.date
values in it will also be parsed. (So instead of{"@ref": {"id": "frogs", "class": {"@ref": {"id": "classes"}}}}
, you will getRef("frogs", Native.CLASSES)
.):param secret: Auth token for the FaunaDB server. :param domain: Base URL for the FaunaDB server. :param scheme:
"http"
or"https"
. :param port: Port of the FaunaDB server. :param timeout: Read timeout in seconds. :param observer: Callback that will be passed a :any:RequestResult
after every completed request. :param pool_connections: The number of connection pools to cache. :param pool_maxsize: The maximum number of connections to save in the pool.Expand source code
class FaunaClient(object): """ Directly communicates with FaunaDB via JSON. For data sent to the server, the ``to_fauna_json`` method will be called on any values. It is encouraged to pass e.g. :any:`Ref` objects instead of raw JSON data. All methods return a converted JSON response. This is a dict containing lists, ints, floats, strings, and other dicts. Any :any:`Ref`, :any:`SetRef`, :any:`FaunaTime`, or :class:`datetime.date` values in it will also be parsed. (So instead of ``{"@ref": {"id": "frogs", "class": {"@ref": {"id": "classes"}}}}``, you will get ``Ref("frogs", Native.CLASSES)``.) """ # pylint: disable=too-many-arguments, too-many-instance-attributes def __init__( self, secret, domain="db.fauna.com", scheme="https", port=None, timeout=60, observer=None, pool_connections=10, pool_maxsize=10, **kwargs): """ :param secret: Auth token for the FaunaDB server. :param domain: Base URL for the FaunaDB server. :param scheme: ``"http"`` or ``"https"``. :param port: Port of the FaunaDB server. :param timeout: Read timeout in seconds. :param observer: Callback that will be passed a :any:`RequestResult` after every completed request. :param pool_connections: The number of connection pools to cache. :param pool_maxsize: The maximum number of connections to save in the pool. """ self.domain = domain self.scheme = scheme self.port = (443 if scheme == "https" else 80) if port is None else port self.auth = HTTPBasicAuth(secret, "") self.base_url = "%s://%s:%s" % (self.scheme, self.domain, self.port) self.observer = observer self.pool_connections = pool_connections self.pool_maxsize = pool_maxsize self._last_txn_time = kwargs.get('last_txn_time') or _LastTxnTime() self._query_timeout_ms = kwargs.get('query_timeout_ms') if self._query_timeout_ms is not None: self._query_timeout_ms = int(self._query_timeout_ms) if ('session' not in kwargs) or ('counter' not in kwargs): self.session = Session() self.session.mount('https://', HTTPAdapter(pool_connections=pool_connections, pool_maxsize=pool_maxsize)) self.session.mount('http://', HTTPAdapter(pool_connections=pool_connections, pool_maxsize=pool_maxsize)) self.counter = _Counter(1) self.session.headers.update({ "Accept-Encoding": "gzip", "Content-Type": "application/json;charset=utf-8", "X-Fauna-Driver": "python", "X-FaunaDB-API-Version": API_VERSION }) if self._query_timeout_ms is not None: self.session.headers["X-Query-Timeout"] = str(self._query_timeout_ms) self.session.timeout = timeout else: self.session = kwargs['session'] self.counter = kwargs['counter'] def sync_last_txn_time(self, new_txn_time): """ Sync the freshest timestamp seen by this client. This has no effect if staler than currently stored timestamp. WARNING: This should be used only when coordinating timestamps across multiple clients. Moving the timestamp arbitrarily forward into the future will cause transactions to stall. :param new_txn_time: the new seen transaction time. """ self._last_txn_time.update_txn_time(new_txn_time) def get_last_txn_time(self): """ Get the freshest timestamp reported to this client. :return: """ return self._last_txn_time.time def get_query_timeout(self): """ Get the query timeout for all queries. """ return self._query_timeout_ms def __del__(self): if self.counter.decrement() == 0: self.session.close() def query(self, expression, timeout_millis=None): """ Use the FaunaDB query API. :param expression: A query. See :doc:`query` for information on queries. :param timeout_millis: Query timeout in milliseconds. :return: Converted JSON response. """ return self._execute("POST", "", _wrap(expression), with_txn_time=True, query_timeout_ms=timeout_millis) def stream(self, expression, options=None, on_start=None, on_error=None, on_version=None, on_history=None): """ Creates a stream Subscription to the result of the given read-only expression. When executed. The subscription returned by this method does not issue any requests until the subscription's start method is called. Make sure to subscribe to the events of interest, otherwise the received events are simply ignored. :param expression: A read-only expression. :param options: Object that configures the stream subscription. E.g set fields to return :param on_start: Callback for the stream's start event. :param on_error: Callback for the stream's error event. :param on_version: Callback for the stream's version events. :param on_history: Callback for the stream's history_rewrite events. """ subscription = Subscription(self, expression, options) subscription.on('start', on_start) subscription.on('error', on_error) subscription.on('version', on_version) subscription.on('history_rewrite', on_history) return subscription def ping(self, scope=None, timeout=None): """ Ping FaunaDB. """ return self._execute("GET", "ping", query={"scope": scope, "timeout": timeout}) def new_session_client(self, secret, observer=None): """ Create a new client from the existing config with a given secret. The returned client share its parent underlying resources. :param secret: Credentials to use when sending requests. :param observer: Callback that will be passed a :any:`RequestResult` after every completed request. :return: """ if self.counter.get_and_increment() > 0: return FaunaClient(secret=secret, domain=self.domain, scheme=self.scheme, port=self.port, timeout=self.session.timeout, observer=observer or self.observer, session=self.session, counter=self.counter, pool_connections=self.pool_connections, pool_maxsize=self.pool_maxsize, last_txn_time=self._last_txn_time, query_timeout_ms=self._query_timeout_ms) else: raise UnexpectedError("Cannnot create a session client from a closed session", None) def _execute(self, action, path, data=None, query=None, with_txn_time=False, query_timeout_ms=None): """Performs an HTTP action, logs it, and looks for errors.""" if query is not None: query = {k: v for k, v in query.items() if v is not None} headers = {} if query_timeout_ms is not None: headers["X-Query-Timeout"] = str(query_timeout_ms) if with_txn_time: headers.update(self._last_txn_time.request_header) start_time = time() response = self._perform_request(action, path, data, query, headers) end_time = time() if with_txn_time: if "X-Txn-Time" in response.headers: new_txn_time = int(response.headers["X-Txn-Time"]) self.sync_last_txn_time(new_txn_time) response_raw = response.text response_content = parse_json_or_none(response_raw) request_result = RequestResult( action, path, query, data, response_raw, response_content, response.status_code, response.headers, start_time, end_time) if self.observer is not None: self.observer(request_result) if response_content is None: raise UnexpectedError("Invalid JSON.", request_result) FaunaError.raise_for_status_code(request_result) return _get_or_raise(request_result, response_content, "resource") def _perform_request(self, action, path, data, query, headers): """Performs an HTTP action.""" url = self.base_url + "/" + path req = Request(action, url, params=query, data=to_json(data), auth=self.auth, headers=headers) return self.session.send(self.session.prepare_request(req)) def _auth_header(self): """Returns the HTTP authentication header""" return "Bearer {}".format(self.auth.username)
Methods
def get_last_txn_time(self)
-
Get the freshest timestamp reported to this client. :return:
Expand source code
def get_last_txn_time(self): """ Get the freshest timestamp reported to this client. :return: """ return self._last_txn_time.time
def get_query_timeout(self)
-
Get the query timeout for all queries.
Expand source code
def get_query_timeout(self): """ Get the query timeout for all queries. """ return self._query_timeout_ms
def new_session_client(self, secret, observer=None)
-
Create a new client from the existing config with a given secret. The returned client share its parent underlying resources.
:param secret: Credentials to use when sending requests. :param observer: Callback that will be passed a :any:
RequestResult
after every completed request. :return:Expand source code
def new_session_client(self, secret, observer=None): """ Create a new client from the existing config with a given secret. The returned client share its parent underlying resources. :param secret: Credentials to use when sending requests. :param observer: Callback that will be passed a :any:`RequestResult` after every completed request. :return: """ if self.counter.get_and_increment() > 0: return FaunaClient(secret=secret, domain=self.domain, scheme=self.scheme, port=self.port, timeout=self.session.timeout, observer=observer or self.observer, session=self.session, counter=self.counter, pool_connections=self.pool_connections, pool_maxsize=self.pool_maxsize, last_txn_time=self._last_txn_time, query_timeout_ms=self._query_timeout_ms) else: raise UnexpectedError("Cannnot create a session client from a closed session", None)
def ping(self, scope=None, timeout=None)
-
Ping FaunaDB.
Expand source code
def ping(self, scope=None, timeout=None): """ Ping FaunaDB. """ return self._execute("GET", "ping", query={"scope": scope, "timeout": timeout})
def query(self, expression, timeout_millis=None)
-
Use the FaunaDB query API.
:param expression: A query. See :doc:
query
for information on queries. :param timeout_millis: Query timeout in milliseconds. :return: Converted JSON response.Expand source code
def query(self, expression, timeout_millis=None): """ Use the FaunaDB query API. :param expression: A query. See :doc:`query` for information on queries. :param timeout_millis: Query timeout in milliseconds. :return: Converted JSON response. """ return self._execute("POST", "", _wrap(expression), with_txn_time=True, query_timeout_ms=timeout_millis)
def stream(self, expression, options=None, on_start=None, on_error=None, on_version=None, on_history=None)
-
Creates a stream Subscription to the result of the given read-only expression. When executed.
The subscription returned by this method does not issue any requests until the subscription's start method is called. Make sure to subscribe to the events of interest, otherwise the received events are simply ignored.
:param expression: A read-only expression. :param options: Object that configures the stream subscription. E.g set fields to return :param on_start: Callback for the stream's start event. :param on_error: Callback for the stream's error event. :param on_version: Callback for the stream's version events. :param on_history: Callback for the stream's history_rewrite events.
Expand source code
def stream(self, expression, options=None, on_start=None, on_error=None, on_version=None, on_history=None): """ Creates a stream Subscription to the result of the given read-only expression. When executed. The subscription returned by this method does not issue any requests until the subscription's start method is called. Make sure to subscribe to the events of interest, otherwise the received events are simply ignored. :param expression: A read-only expression. :param options: Object that configures the stream subscription. E.g set fields to return :param on_start: Callback for the stream's start event. :param on_error: Callback for the stream's error event. :param on_version: Callback for the stream's version events. :param on_history: Callback for the stream's history_rewrite events. """ subscription = Subscription(self, expression, options) subscription.on('start', on_start) subscription.on('error', on_error) subscription.on('version', on_version) subscription.on('history_rewrite', on_history) return subscription
def sync_last_txn_time(self, new_txn_time)
-
Sync the freshest timestamp seen by this client.
This has no effect if staler than currently stored timestamp. WARNING: This should be used only when coordinating timestamps across multiple clients. Moving the timestamp arbitrarily forward into the future will cause transactions to stall.
:param new_txn_time: the new seen transaction time.
Expand source code
def sync_last_txn_time(self, new_txn_time): """ Sync the freshest timestamp seen by this client. This has no effect if staler than currently stored timestamp. WARNING: This should be used only when coordinating timestamps across multiple clients. Moving the timestamp arbitrarily forward into the future will cause transactions to stall. :param new_txn_time: the new seen transaction time. """ self._last_txn_time.update_txn_time(new_txn_time)