Warning:
Fauna is decommissioning FQL v4 on June 30, 2025.

This driver is not compatible with FQL v10, the latest version. Fauna accounts created after August 21, 2024 must use FQL v10.
Ensure you migrate existing projects to the official v10 driver by the v4 EOL date: https://github.com/fauna/fauna-python.

For more information, see the v4 end of life (EOL) announcement and related FAQ.

Module faunadb.streams.client

Expand source code
from time import time

try:
    #python2
    from urllib import urlencode
except ImportError:
    #python3
    from urllib.parse import urlencode

from hyper import HTTP20Connection
from faunadb._json import to_json, parse_json_or_none
from faunadb.request_result import RequestResult
from .events import parse_stream_request_result_or_none, Error
from .errors import StreamError

VALID_FIELDS = {"diff", "prev", "document", "action"}


class Connection(object):
    """
    The internal stream client connection interface.
    This class handles the network side of a stream
    subscription.

    Current limitations:
    Python requests module uses HTTP1; hyper is used for HTTP/2
    """
    def __init__(self, client, expression, options):
        self._client = client
        self.options = options
        self.conn = None
        self._fields = None
        if isinstance(self.options, dict):
            self._fields = self.options.get("fields", None)
        elif hasattr(self.options, "fields"):
            self._fields = self.options.field
        if isinstance(self._fields, list):
            union = set(self._fields).union(VALID_FIELDS)
            if union != VALID_FIELDS:
                raise Exception("Valid fields options are %s, provided %s."%(VALID_FIELDS, self._fields))
        self._state = "idle"
        self._query = expression
        self._data = to_json(expression).encode()
        try:
            self.conn = HTTP20Connection(
                    self._client.domain, port=self._client.port, enable_push=True)
        except Exception as e:
            raise StreamError(e)

    def close(self):
        """
        Closes the stream subscription by aborting its underlying http request.
        """
        if self.conn is None:
            raise StreamError('Cannot close inactive stream subscription.')
        self.conn.close()
        self._state = 'closed'

    def subscribe(self, on_event):
        """Initiates the stream subscription."""
        if self._state != "idle":
            raise StreamError('Stream subscription already started.')
        try:
            self._state = 'connecting'
            headers = self._client.session.headers
            headers["Authorization"] = self._client._auth_header()
            if self._client._query_timeout_ms is not None:
                    headers["X-Query-Timeout"] = str(self._client._query_timeout_ms)
            headers["X-Last-Seen-Txn"] = str(self._client.get_last_txn_time())
            start_time = time()
            url_params = ''
            if isinstance(self._fields, list):
                url_params= "?%s"%(urlencode({'fields': ",".join(self._fields)}))
            id = self.conn.request("POST", "/stream%s"%(url_params), body=self._data, headers=headers)
            self._state = 'open'
            self._event_loop(id, on_event, start_time)
        except Exception as e:
            if callable(on_event):
                on_event(Error(e), None)

    def _event_loop(self, stream_id, on_event, start_time):
        """ Event loop for the stream. """
        response = self.conn.get_response(stream_id)
        if 'x-txn-time' in response.headers:
            self._client.sync_last_txn_time(int(response.headers['x-txn-time'][0].decode()))
        try:
            for push in response.read_chunked():  # all pushes promised before response headers
                    raw = push.decode()
                    request_result = self._stream_chunk_to_request_result(response, raw, start_time, time())
                    event = parse_stream_request_result_or_none(request_result)
                    if event is not None and hasattr(event, 'txn'):
                        self._client.sync_last_txn_time(int(event.txn))
                    on_event(event, request_result)
                    if self._client.observer is not None:
                        self._client.observer(request_result)
        except Exception as e:
            self.error = e
            self.close()
            on_event(Error(e), None)

    def _stream_chunk_to_request_result(self, response, raw_chunk, start_time, end_time):
        """ Converts a stream chunk to a RequestResult. """
        response_content = parse_json_or_none(raw_chunk)
        return RequestResult(
            "POST", "/stream", self._query, self._data,
            raw_chunk, response_content, None, response.headers,
            start_time, end_time)

Classes

class Connection (client, expression, options)

The internal stream client connection interface. This class handles the network side of a stream subscription.

Current limitations: Python requests module uses HTTP1; hyper is used for HTTP/2

Expand source code
class Connection(object):
    """
    The internal stream client connection interface.
    This class handles the network side of a stream
    subscription.

    Current limitations:
    Python requests module uses HTTP1; hyper is used for HTTP/2
    """
    def __init__(self, client, expression, options):
        self._client = client
        self.options = options
        self.conn = None
        self._fields = None
        if isinstance(self.options, dict):
            self._fields = self.options.get("fields", None)
        elif hasattr(self.options, "fields"):
            self._fields = self.options.field
        if isinstance(self._fields, list):
            union = set(self._fields).union(VALID_FIELDS)
            if union != VALID_FIELDS:
                raise Exception("Valid fields options are %s, provided %s."%(VALID_FIELDS, self._fields))
        self._state = "idle"
        self._query = expression
        self._data = to_json(expression).encode()
        try:
            self.conn = HTTP20Connection(
                    self._client.domain, port=self._client.port, enable_push=True)
        except Exception as e:
            raise StreamError(e)

    def close(self):
        """
        Closes the stream subscription by aborting its underlying http request.
        """
        if self.conn is None:
            raise StreamError('Cannot close inactive stream subscription.')
        self.conn.close()
        self._state = 'closed'

    def subscribe(self, on_event):
        """Initiates the stream subscription."""
        if self._state != "idle":
            raise StreamError('Stream subscription already started.')
        try:
            self._state = 'connecting'
            headers = self._client.session.headers
            headers["Authorization"] = self._client._auth_header()
            if self._client._query_timeout_ms is not None:
                    headers["X-Query-Timeout"] = str(self._client._query_timeout_ms)
            headers["X-Last-Seen-Txn"] = str(self._client.get_last_txn_time())
            start_time = time()
            url_params = ''
            if isinstance(self._fields, list):
                url_params= "?%s"%(urlencode({'fields': ",".join(self._fields)}))
            id = self.conn.request("POST", "/stream%s"%(url_params), body=self._data, headers=headers)
            self._state = 'open'
            self._event_loop(id, on_event, start_time)
        except Exception as e:
            if callable(on_event):
                on_event(Error(e), None)

    def _event_loop(self, stream_id, on_event, start_time):
        """ Event loop for the stream. """
        response = self.conn.get_response(stream_id)
        if 'x-txn-time' in response.headers:
            self._client.sync_last_txn_time(int(response.headers['x-txn-time'][0].decode()))
        try:
            for push in response.read_chunked():  # all pushes promised before response headers
                    raw = push.decode()
                    request_result = self._stream_chunk_to_request_result(response, raw, start_time, time())
                    event = parse_stream_request_result_or_none(request_result)
                    if event is not None and hasattr(event, 'txn'):
                        self._client.sync_last_txn_time(int(event.txn))
                    on_event(event, request_result)
                    if self._client.observer is not None:
                        self._client.observer(request_result)
        except Exception as e:
            self.error = e
            self.close()
            on_event(Error(e), None)

    def _stream_chunk_to_request_result(self, response, raw_chunk, start_time, end_time):
        """ Converts a stream chunk to a RequestResult. """
        response_content = parse_json_or_none(raw_chunk)
        return RequestResult(
            "POST", "/stream", self._query, self._data,
            raw_chunk, response_content, None, response.headers,
            start_time, end_time)

Methods

def close(self)

Closes the stream subscription by aborting its underlying http request.

Expand source code
def close(self):
    """
    Closes the stream subscription by aborting its underlying http request.
    """
    if self.conn is None:
        raise StreamError('Cannot close inactive stream subscription.')
    self.conn.close()
    self._state = 'closed'
def subscribe(self, on_event)

Initiates the stream subscription.

Expand source code
def subscribe(self, on_event):
    """Initiates the stream subscription."""
    if self._state != "idle":
        raise StreamError('Stream subscription already started.')
    try:
        self._state = 'connecting'
        headers = self._client.session.headers
        headers["Authorization"] = self._client._auth_header()
        if self._client._query_timeout_ms is not None:
                headers["X-Query-Timeout"] = str(self._client._query_timeout_ms)
        headers["X-Last-Seen-Txn"] = str(self._client.get_last_txn_time())
        start_time = time()
        url_params = ''
        if isinstance(self._fields, list):
            url_params= "?%s"%(urlencode({'fields': ",".join(self._fields)}))
        id = self.conn.request("POST", "/stream%s"%(url_params), body=self._data, headers=headers)
        self._state = 'open'
        self._event_loop(id, on_event, start_time)
    except Exception as e:
        if callable(on_event):
            on_event(Error(e), None)