Source: stream.js

'use strict'

/** @module stream */

// NOTE: Although implemented in a separate module, streaming shares internal
// responsibilities with both Client and HTTP interfaces, such as updating last
// seen transaction timestamp. Therefore, this implementation
// sometimes breaks encapsulation and calls internal getters and methods. As a
// general rule: it's okay to call internal methods. You can interpret this
// as calling for a package visible method in languages with fine-grained
// visibility control. However, DO NOT change any internal state from outside of
// its context as it'd most certainly lead to errors.

var { AbortController } = require('node-abort-controller')
var RequestResult = require('./RequestResult')
var errors = require('./errors')
var json = require('./_json')
var http = require('./_http')
var q = require('./query')
var util = require('./_util')

var DefaultEvents = ['start', 'error', 'version', 'history_rewrite', 'set']
var DocumentStreamEvents = DefaultEvents.concat(['snapshot'])

 * The internal stream client interface. This class handles the network side of
 * a stream subscription.
 * @constructor
 * @param {Client} client The FaunaDB client.
 * @param {module:query~ExprArg} expression The FQL expression you are subscribing to.
 * @param {module:stream~Options} options The stream options.
 * @param {function} onEvent The stream event's callback function.
 * @private
function StreamClient(client, expression, options, onEvent) {
  options = util.applyDefaults(options, {
    fields: null,

  this._client = client
  this._onEvent = onEvent
  this._query = q.wrap(expression)
  this._urlParams = options.fields ? { fields: options.fields.join(',') } : null
  this._abort = new AbortController()
  this._state = 'idle'

 * Takes a snapshot of the current query. Assumes the subscribed query returns a
 * reference.
StreamClient.prototype.snapshot = function() {
  var self = this
    .then(function(doc) {
        type: 'snapshot',
        event: doc,
    .catch(function(error) {
        type: 'error',
        event: error,

/** Initiates the stream subscription.  */
StreamClient.prototype.subscribe = function() {
  var self = this

  if (self._state === 'idle') {
    self._state = 'open'
  } else {
    throw new Error(
      'Subscription#start should not be called several times, ' +
        'consider instantiating a new stream instead.'

  var body = JSON.stringify(self._query)
  var startTime =
  var buffer = ''

  function onResponse(response) {
    var endTime =
    var parsed

    try {
      parsed = json.parseJSON(response.body)
    } catch (_) {
      parsed = response.body

    var result = new RequestResult(

    self._client._handleRequestResult(response, result)

  function onData(data) {
    var result = json.parseJSONStreaming(buffer + data)

    buffer = result.buffer

    result.values.forEach(function(event) {
      if (event.txn !== undefined) {

      if (event.event === 'error') {
        onError(new errors.StreamErrorEvent(event))
      } else {

  function onError(error) {
    // AbortError is triggered as result of calling
    // close() on a Subscription. There's no need to relay this event back up.
    if (error instanceof http.AbortError) {

      type: 'error',
      event: error,

      method: 'POST',
      path: 'stream',
      body: body,
      query: self._urlParams,
      signal: this._abort.signal,
      streamConsumer: {
        onError: onError,
        onData: onData,

/** Closes the stream subscription by aborting its underlying http request. */
StreamClient.prototype.close = function() {
  if (this._state !== 'closed') {
    this._state = 'closed'

 * Event dispatch interface for stream subscription.
 * @constructor
 * @param {string[]} allowedEvents List of allowed events.
 * @private
function EventDispatcher(allowedEvents) {
  this._allowedEvents = allowedEvents
  this._listeners = {}

/** Subscribe to an event
 * @param {string} type The event type.
 * @param {module:stream~Subscription~eventCalllback} callback
 *   The event's callback.
EventDispatcher.prototype.on = function(type, callback) {
  if (this._allowedEvents.indexOf(type) === -1) {
    throw new Error('Unknown event type: ' + type)
  if (this._listeners[type] === undefined) {
    this._listeners[type] = []

 * Dispatch the given event to the appropriate listeners.
 * @param {Object} event The event.
EventDispatcher.prototype.dispatch = function(event) {
  var listeners = this._listeners[event.type]
  if (!listeners) {

  for (var i = 0; i < listeners.length; i++) {
    listeners[i].call(null, event.event, event)

 * Stream's start event. A stream subscription always begins with a start event.
 * Upcoming events are guaranteed to have transaction timestamps equal to or greater than
 * the stream's start timestamp.
 * @event module:stream~Subscription#start
 * @type {object}
 * @property {string} type='start'
 *   The event type.
 * @property {number} txn
 *   The event's transaction timestamp.
 * @property {module:number} event
 *   The stream start timestamp.

 * A version event occurs upon any modifications to the current state of the
 * subscribed document.
 * @event module:stream~Subscription#version
 * @type {object}
 * @property {string} type='version'
 *   The event type.
 * @property {number} txn
 *   The event's transaction timestamp.
 * @property {object} event
 *   The event's data.

 * A history rewrite event occurs upon any modifications to the history of the
 * subscribed document.
 * @event module:stream~Subscription#history_rewrite
 * @type {object}
 * @property {string} type='history_rewrite'
 *   The event type.
 * @property {number} txn
 *   The event's transaction timestamp.
 * @property {object} event
 *   The event's data.

 * A snapshot event. A snapshot event is fired once the `document` stream helper
 * finishes loading the subscribed document's snapshot data. See {@link
 * Client#stream} for more details on the `document` stream helper.
 * @event module:stream~Subscription#snapshot
 * @type {object}
 * @property {string} type='snapshot'
 *   The event type.
 * @property {number} txn
 *   The event's transaction timestamp.
 * @property {object} event
 *   The event's data.

 * An error event is fired both for client and server errors that may occur as
 * a result of a subscription.
 * @event module:stream~Subscription#error
 * @type {object}
 * @property {string} type='error'
 *   The event type.
 * @property {?number} txn
 *   The event's transaction timestamp.
 * @property {Error} event
 *   The underlying error.

 * @typedef {Object} Options
 * @property {string[]} [fields=['action', 'document', 'diff', 'prev', 'index']]
 *   The fields event fields to opt-in during stream subscription. Possible
 *   options:
 *   * 'action': The action type
 *   * 'document': The document's data
 *   * 'diff': The difference between 'document' and 'prev'
 *   * 'prev': The event's previous data
 *   * 'index': The event's source index, if a set event

 * The callback to be executed when an new event occurs.
 * @callback module:stream~Subscription~eventCalllback
 * @param {any} data The event's data field.
 * @param {object} event The event's entire object.

 * A stream subscription which dispatches events received to the registered
 * listener functions. This class must be constructed via {@link Client#stream}
 * method.
 * @constructor
 * @param {StreamClient} client
 *   Internal stream client interface.
 * @param {EventDispatcher} dispatcher
 *   Internal event dispatcher interface.
function Subscription(client, dispatcher) {
  this._client = client
  this._dispatcher = dispatcher

 * Subscribes to an event type.
 * @param {string} event
 *   The event's type.
 * @param {module:stream~Subscription~eventCalllback} callback
 *   A callback function.
 * @returns {module:stream~Subscription} This instance.
Subscription.prototype.on = function(type, callback) {
  this._dispatcher.on(type, callback)
  return this

 * Initiates the underlying subscription network calls.
 * @returns {module:stream~Subscription} This instance.
Subscription.prototype.start = function() {
  return this

 * Stops the current subscription and closes the underlying network connection.
Subscription.prototype.close = function() {

 * Stream API factory function. See {@link Client#stream} for details on how to
 * use stream's public interface.
 * @private
function StreamAPI(client) {
  var api = function(expression, options) {
    var dispatcher = new EventDispatcher(DefaultEvents)
    var streamClient = new StreamClient(client, expression, options, function(
    ) {
    return new Subscription(streamClient, dispatcher)

  api.document = function(expression, options) {
    var buffer = []
    var buffering = true
    var dispatcher = new EventDispatcher(DocumentStreamEvents)
    var streamClient = new StreamClient(client, expression, options, onEvent)

    function onEvent(event) {
      switch (event.type) {
        case 'start':
        case 'snapshot':
        case 'error':
          if (buffering) {
          } else {

    function resume(snapshotEvent) {
      for (var i = 0; i < buffer.length; i++) {
        var bufferedEvent = buffer[i]
        if (bufferedEvent.txn > snapshotEvent.event.ts) {
      buffering = false
      buffer = null

    return new Subscription(streamClient, dispatcher)

  return api

module.exports = {
  StreamAPI: StreamAPI,