Module faunadb.streams.client
Expand source code
from time import time
try:
# python2
from urllib import urlencode
except ImportError:
# python3
from urllib.parse import urlencode
from faunadb._json import parse_json_or_none, stream_content_to_json, to_json
from faunadb.request_result import RequestResult
import httpx
from .errors import StreamError
from .events import Error, parse_stream_request_result_or_none
VALID_FIELDS = {"diff", "prev", "document", "action"}
class Connection(object):
"""
The internal stream client connection interface.
This class handles the network side of a stream
subscription.
Current limitations:
Python requests module uses HTTP1; httpx is used for HTTP/2
"""
def __init__(self, client, expression, options):
self._client = client
self.options = options
self._fields = None
if isinstance(self.options, dict):
self._fields = self.options.get("fields", None)
elif hasattr(self.options, "fields"):
self._fields = self.options.field
if isinstance(self._fields, list):
union = set(self._fields).union(VALID_FIELDS)
if union != VALID_FIELDS:
raise Exception("Valid fields options are %s, provided %s." % (
VALID_FIELDS, self._fields))
self._state = "idle"
self._query = expression
self._data = to_json(expression).encode()
def close(self):
"""
Closes the stream subscription by aborting its underlying http request.
"""
if self._state == 'closed':
raise StreamError('Cannot close inactive stream subscription.')
self._state = 'closed'
def subscribe(self, on_event):
"""Initiates the stream subscription."""
if self._state != "idle":
raise StreamError('Stream subscription already started.')
try:
base_url = f"{self._client.scheme}://{self._client.domain}:{self._client.port}"
timeout = httpx.Timeout(connect=None, read=None, write=None, pool=10)
conn = httpx.Client(http2=True, http1=False, base_url=base_url, timeout=timeout)
except Exception as error_msg:
raise StreamError(error_msg)
try:
self._state = 'connecting'
headers = self._client.session.headers
headers["Authorization"] = self._client.auth.auth_header()
if self._client._query_timeout_ms is not None:
headers["X-Query-Timeout"] = str(
self._client._query_timeout_ms)
headers["X-Last-Seen-Txn"] = str(self._client.get_last_txn_time())
start_time = time()
url_params = ''
if isinstance(self._fields, list):
url_params = "?%s" % (
urlencode({'fields': ",".join(self._fields)}))
stream_id = conn.stream("POST", "/stream%s" %
(url_params), content=self._data, headers=dict(headers))
self._state = 'open'
self._event_loop(stream_id, on_event, start_time)
except Exception as error_msg:
if callable(on_event):
on_event(Error(error_msg), None)
finally:
conn.close()
def _event_loop(self, stream_id, on_event, start_time):
""" Event loop for the stream. """
with stream_id as response:
if 'x-txn-time' in response.headers:
self._client.sync_last_txn_time(int(response.headers['x-txn-time']))
try:
buffer = ''
for push in response.iter_bytes():
try:
chunk = push.decode()
buffer += chunk
except:
continue
result = stream_content_to_json(buffer)
buffer = result["buffer"]
for value in result["values"]:
request_result = self._stream_chunk_to_request_result(
response, value["raw"], value["content"], start_time, time())
event = parse_stream_request_result_or_none(request_result)
if event is not None and hasattr(event, 'txn'):
self._client.sync_last_txn_time(int(event.txn))
on_event(event, request_result)
if self._client.observer is not None:
self._client.observer(request_result)
if self._state == 'closed':
break
except Exception as error_msg:
self.error = error_msg
on_event(Error(error_msg), None)
def _stream_chunk_to_request_result(self, response, raw, content, start_time, end_time):
""" Converts a stream chunk to a RequestResult. """
return RequestResult(
"POST", "/stream", self._query, self._data,
raw, content, None, response.headers,
start_time, end_time)
Classes
class Connection (client, expression, options)
-
The internal stream client connection interface. This class handles the network side of a stream subscription.
Current limitations: Python requests module uses HTTP1; httpx is used for HTTP/2
Expand source code
class Connection(object): """ The internal stream client connection interface. This class handles the network side of a stream subscription. Current limitations: Python requests module uses HTTP1; httpx is used for HTTP/2 """ def __init__(self, client, expression, options): self._client = client self.options = options self._fields = None if isinstance(self.options, dict): self._fields = self.options.get("fields", None) elif hasattr(self.options, "fields"): self._fields = self.options.field if isinstance(self._fields, list): union = set(self._fields).union(VALID_FIELDS) if union != VALID_FIELDS: raise Exception("Valid fields options are %s, provided %s." % ( VALID_FIELDS, self._fields)) self._state = "idle" self._query = expression self._data = to_json(expression).encode() def close(self): """ Closes the stream subscription by aborting its underlying http request. """ if self._state == 'closed': raise StreamError('Cannot close inactive stream subscription.') self._state = 'closed' def subscribe(self, on_event): """Initiates the stream subscription.""" if self._state != "idle": raise StreamError('Stream subscription already started.') try: base_url = f"{self._client.scheme}://{self._client.domain}:{self._client.port}" timeout = httpx.Timeout(connect=None, read=None, write=None, pool=10) conn = httpx.Client(http2=True, http1=False, base_url=base_url, timeout=timeout) except Exception as error_msg: raise StreamError(error_msg) try: self._state = 'connecting' headers = self._client.session.headers headers["Authorization"] = self._client.auth.auth_header() if self._client._query_timeout_ms is not None: headers["X-Query-Timeout"] = str( self._client._query_timeout_ms) headers["X-Last-Seen-Txn"] = str(self._client.get_last_txn_time()) start_time = time() url_params = '' if isinstance(self._fields, list): url_params = "?%s" % ( urlencode({'fields': ",".join(self._fields)})) stream_id = conn.stream("POST", "/stream%s" % (url_params), content=self._data, headers=dict(headers)) self._state = 'open' self._event_loop(stream_id, on_event, start_time) except Exception as error_msg: if callable(on_event): on_event(Error(error_msg), None) finally: conn.close() def _event_loop(self, stream_id, on_event, start_time): """ Event loop for the stream. """ with stream_id as response: if 'x-txn-time' in response.headers: self._client.sync_last_txn_time(int(response.headers['x-txn-time'])) try: buffer = '' for push in response.iter_bytes(): try: chunk = push.decode() buffer += chunk except: continue result = stream_content_to_json(buffer) buffer = result["buffer"] for value in result["values"]: request_result = self._stream_chunk_to_request_result( response, value["raw"], value["content"], start_time, time()) event = parse_stream_request_result_or_none(request_result) if event is not None and hasattr(event, 'txn'): self._client.sync_last_txn_time(int(event.txn)) on_event(event, request_result) if self._client.observer is not None: self._client.observer(request_result) if self._state == 'closed': break except Exception as error_msg: self.error = error_msg on_event(Error(error_msg), None) def _stream_chunk_to_request_result(self, response, raw, content, start_time, end_time): """ Converts a stream chunk to a RequestResult. """ return RequestResult( "POST", "/stream", self._query, self._data, raw, content, None, response.headers, start_time, end_time)
Methods
def close(self)
-
Closes the stream subscription by aborting its underlying http request.
Expand source code
def close(self): """ Closes the stream subscription by aborting its underlying http request. """ if self._state == 'closed': raise StreamError('Cannot close inactive stream subscription.') self._state = 'closed'
def subscribe(self, on_event)
-
Initiates the stream subscription.
Expand source code
def subscribe(self, on_event): """Initiates the stream subscription.""" if self._state != "idle": raise StreamError('Stream subscription already started.') try: base_url = f"{self._client.scheme}://{self._client.domain}:{self._client.port}" timeout = httpx.Timeout(connect=None, read=None, write=None, pool=10) conn = httpx.Client(http2=True, http1=False, base_url=base_url, timeout=timeout) except Exception as error_msg: raise StreamError(error_msg) try: self._state = 'connecting' headers = self._client.session.headers headers["Authorization"] = self._client.auth.auth_header() if self._client._query_timeout_ms is not None: headers["X-Query-Timeout"] = str( self._client._query_timeout_ms) headers["X-Last-Seen-Txn"] = str(self._client.get_last_txn_time()) start_time = time() url_params = '' if isinstance(self._fields, list): url_params = "?%s" % ( urlencode({'fields': ",".join(self._fields)})) stream_id = conn.stream("POST", "/stream%s" % (url_params), content=self._data, headers=dict(headers)) self._state = 'open' self._event_loop(stream_id, on_event, start_time) except Exception as error_msg: if callable(on_event): on_event(Error(error_msg), None) finally: conn.close()