Warning:
Fauna is decommissioning FQL v4 on June 30, 2025.

This driver is not compatible with FQL v10, the latest version. Fauna accounts created after August 21, 2024 must use FQL v10.
Ensure you migrate existing projects to the official v10 driver by the v4 EOL date: https://github.com/fauna/fauna-python.

For more information, see the v4 end of life (EOL) announcement and related FAQ.

Module faunadb.streams.client

Expand source code
from time import time

try:
    # python2
    from urllib import urlencode
except ImportError:
    # python3
    from urllib.parse import urlencode

from faunadb._json import parse_json_or_none, stream_content_to_json, to_json
from faunadb.request_result import RequestResult
import httpx

from .errors import StreamError
from .events import Error, parse_stream_request_result_or_none

VALID_FIELDS = {"diff", "prev", "document", "action"}


class Connection(object):
    """
    The internal stream client connection interface.
    This class handles the network side of a stream
    subscription.

    Current limitations:
    Python requests module uses HTTP1; httpx is used for HTTP/2
    """

    def __init__(self, client, expression, options):
        self._client = client
        self.options = options
        self._fields = None
        if isinstance(self.options, dict):
            self._fields = self.options.get("fields", None)
        elif hasattr(self.options, "fields"):
            self._fields = self.options.field
        if isinstance(self._fields, list):
            union = set(self._fields).union(VALID_FIELDS)
            if union != VALID_FIELDS:
                raise Exception("Valid fields options are %s, provided %s." % (
                    VALID_FIELDS, self._fields))
        self._state = "idle"
        self._query = expression
        self._data = to_json(expression).encode()

    def close(self):
        """
        Closes the stream subscription by aborting its underlying http request.
        """
        if self._state == 'closed':
            raise StreamError('Cannot close inactive stream subscription.')

        self._state = 'closed'

    def subscribe(self, on_event):
        """Initiates the stream subscription."""
        if self._state != "idle":
            raise StreamError('Stream subscription already started.')

        try:
            base_url = f"{self._client.scheme}://{self._client.domain}:{self._client.port}"
            timeout = httpx.Timeout(connect=None, read=None, write=None, pool=10)
            conn = httpx.Client(http2=True, http1=False, base_url=base_url, timeout=timeout)
        except Exception as error_msg:
            raise StreamError(error_msg)

        try:
            self._state = 'connecting'
            headers = self._client.session.headers
            headers["Authorization"] = self._client.auth.auth_header()
            if self._client._query_timeout_ms is not None:
                headers["X-Query-Timeout"] = str(
                    self._client._query_timeout_ms)
            headers["X-Last-Seen-Txn"] = str(self._client.get_last_txn_time())
            start_time = time()
            url_params = ''
            if isinstance(self._fields, list):
                url_params = "?%s" % (
                    urlencode({'fields': ",".join(self._fields)}))

            stream_id = conn.stream("POST", "/stream%s" %
                                  (url_params), content=self._data, headers=dict(headers))
            self._state = 'open'
            self._event_loop(stream_id, on_event, start_time)
        except Exception as error_msg:
            if callable(on_event):
                on_event(Error(error_msg), None)
        finally:
            conn.close()


    def _event_loop(self, stream_id, on_event, start_time):
        """ Event loop for the stream. """
        with stream_id as response:
            if 'x-txn-time' in response.headers:
                self._client.sync_last_txn_time(int(response.headers['x-txn-time']))
            try:
                buffer = ''
                for push in response.iter_bytes():

                    try:
                        chunk = push.decode()
                        buffer += chunk
                    except:
                        continue

                    result = stream_content_to_json(buffer)
                    buffer = result["buffer"]

                    for value in result["values"]:
                        request_result = self._stream_chunk_to_request_result(
                            response, value["raw"], value["content"], start_time, time())
                        event = parse_stream_request_result_or_none(request_result)

                        if event is not None and hasattr(event, 'txn'):
                            self._client.sync_last_txn_time(int(event.txn))
                        on_event(event, request_result)
                        if self._client.observer is not None:
                            self._client.observer(request_result)

                    if self._state == 'closed':
                        break
            except Exception as error_msg:
                self.error = error_msg
                on_event(Error(error_msg), None)

    def _stream_chunk_to_request_result(self, response, raw, content, start_time, end_time):
        """ Converts a stream chunk to a RequestResult. """
        return RequestResult(
            "POST", "/stream", self._query, self._data,
            raw, content, None, response.headers,
            start_time, end_time)

Classes

class Connection (client, expression, options)

The internal stream client connection interface. This class handles the network side of a stream subscription.

Current limitations: Python requests module uses HTTP1; httpx is used for HTTP/2

Expand source code
class Connection(object):
    """
    The internal stream client connection interface.
    This class handles the network side of a stream
    subscription.

    Current limitations:
    Python requests module uses HTTP1; httpx is used for HTTP/2
    """

    def __init__(self, client, expression, options):
        self._client = client
        self.options = options
        self._fields = None
        if isinstance(self.options, dict):
            self._fields = self.options.get("fields", None)
        elif hasattr(self.options, "fields"):
            self._fields = self.options.field
        if isinstance(self._fields, list):
            union = set(self._fields).union(VALID_FIELDS)
            if union != VALID_FIELDS:
                raise Exception("Valid fields options are %s, provided %s." % (
                    VALID_FIELDS, self._fields))
        self._state = "idle"
        self._query = expression
        self._data = to_json(expression).encode()

    def close(self):
        """
        Closes the stream subscription by aborting its underlying http request.
        """
        if self._state == 'closed':
            raise StreamError('Cannot close inactive stream subscription.')

        self._state = 'closed'

    def subscribe(self, on_event):
        """Initiates the stream subscription."""
        if self._state != "idle":
            raise StreamError('Stream subscription already started.')

        try:
            base_url = f"{self._client.scheme}://{self._client.domain}:{self._client.port}"
            timeout = httpx.Timeout(connect=None, read=None, write=None, pool=10)
            conn = httpx.Client(http2=True, http1=False, base_url=base_url, timeout=timeout)
        except Exception as error_msg:
            raise StreamError(error_msg)

        try:
            self._state = 'connecting'
            headers = self._client.session.headers
            headers["Authorization"] = self._client.auth.auth_header()
            if self._client._query_timeout_ms is not None:
                headers["X-Query-Timeout"] = str(
                    self._client._query_timeout_ms)
            headers["X-Last-Seen-Txn"] = str(self._client.get_last_txn_time())
            start_time = time()
            url_params = ''
            if isinstance(self._fields, list):
                url_params = "?%s" % (
                    urlencode({'fields': ",".join(self._fields)}))

            stream_id = conn.stream("POST", "/stream%s" %
                                  (url_params), content=self._data, headers=dict(headers))
            self._state = 'open'
            self._event_loop(stream_id, on_event, start_time)
        except Exception as error_msg:
            if callable(on_event):
                on_event(Error(error_msg), None)
        finally:
            conn.close()


    def _event_loop(self, stream_id, on_event, start_time):
        """ Event loop for the stream. """
        with stream_id as response:
            if 'x-txn-time' in response.headers:
                self._client.sync_last_txn_time(int(response.headers['x-txn-time']))
            try:
                buffer = ''
                for push in response.iter_bytes():

                    try:
                        chunk = push.decode()
                        buffer += chunk
                    except:
                        continue

                    result = stream_content_to_json(buffer)
                    buffer = result["buffer"]

                    for value in result["values"]:
                        request_result = self._stream_chunk_to_request_result(
                            response, value["raw"], value["content"], start_time, time())
                        event = parse_stream_request_result_or_none(request_result)

                        if event is not None and hasattr(event, 'txn'):
                            self._client.sync_last_txn_time(int(event.txn))
                        on_event(event, request_result)
                        if self._client.observer is not None:
                            self._client.observer(request_result)

                    if self._state == 'closed':
                        break
            except Exception as error_msg:
                self.error = error_msg
                on_event(Error(error_msg), None)

    def _stream_chunk_to_request_result(self, response, raw, content, start_time, end_time):
        """ Converts a stream chunk to a RequestResult. """
        return RequestResult(
            "POST", "/stream", self._query, self._data,
            raw, content, None, response.headers,
            start_time, end_time)

Methods

def close(self)

Closes the stream subscription by aborting its underlying http request.

Expand source code
def close(self):
    """
    Closes the stream subscription by aborting its underlying http request.
    """
    if self._state == 'closed':
        raise StreamError('Cannot close inactive stream subscription.')

    self._state = 'closed'
def subscribe(self, on_event)

Initiates the stream subscription.

Expand source code
def subscribe(self, on_event):
    """Initiates the stream subscription."""
    if self._state != "idle":
        raise StreamError('Stream subscription already started.')

    try:
        base_url = f"{self._client.scheme}://{self._client.domain}:{self._client.port}"
        timeout = httpx.Timeout(connect=None, read=None, write=None, pool=10)
        conn = httpx.Client(http2=True, http1=False, base_url=base_url, timeout=timeout)
    except Exception as error_msg:
        raise StreamError(error_msg)

    try:
        self._state = 'connecting'
        headers = self._client.session.headers
        headers["Authorization"] = self._client.auth.auth_header()
        if self._client._query_timeout_ms is not None:
            headers["X-Query-Timeout"] = str(
                self._client._query_timeout_ms)
        headers["X-Last-Seen-Txn"] = str(self._client.get_last_txn_time())
        start_time = time()
        url_params = ''
        if isinstance(self._fields, list):
            url_params = "?%s" % (
                urlencode({'fields': ",".join(self._fields)}))

        stream_id = conn.stream("POST", "/stream%s" %
                              (url_params), content=self._data, headers=dict(headers))
        self._state = 'open'
        self._event_loop(stream_id, on_event, start_time)
    except Exception as error_msg:
        if callable(on_event):
            on_event(Error(error_msg), None)
    finally:
        conn.close()