Source: stream.js

'use strict'

/** @module stream */

// NOTE: Although implemented in a separate module, streaming shares internal
// responsibilities with both Client and HTTP interfaces, such as updating last
// seen transaction timestamp. Therefore, this implementation
// sometimes breaks encapsulation and calls internal getters and methods. As a
// general rule: it's okay to call internal methods. You can interpret this
// as calling for a package visible method in languages with fine-grained
// visibility control. However, DO NOT change any internal state from outside of
// its context as it'd most certainly lead to errors.

var { AbortController } = require('node-abort-controller')
var RequestResult = require('./RequestResult')
var errors = require('./errors')
var json = require('./_json')
var http = require('./_http')
var q = require('./query')
var util = require('./_util')

var DefaultEvents = ['start', 'error', 'version', 'history_rewrite', 'set']
var DocumentStreamEvents = DefaultEvents.concat(['snapshot'])

/**
 * The internal stream client interface. This class handles the network side of
 * a stream subscription.
 *
 * @constructor
 * @param {Client} client The FaunaDB client.
 * @param {module:query~ExprArg} expression The FQL expression you are subscribing to.
 * @param {module:stream~Options} options The stream options.
 * @param {function} onEvent The stream event's callback function.
 * @private
 */
function StreamClient(client, expression, options, onEvent) {
  options = util.applyDefaults(options, {
    fields: null,
  })

  this._client = client
  this._onEvent = onEvent
  this._query = q.wrap(expression)
  this._urlParams = options.fields ? { fields: options.fields.join(',') } : null
  this._abort = new AbortController()
  this._state = 'idle'
}

/**
 * Takes a snapshot of the current query. Assumes the subscribed query returns a
 * reference.
 */
StreamClient.prototype.snapshot = function() {
  var self = this
  self._client
    .query(q.Get(self._query))
    .then(function(doc) {
      self._onEvent({
        type: 'snapshot',
        event: doc,
      })
    })
    .catch(function(error) {
      self._onEvent({
        type: 'error',
        event: error,
      })
    })
}

/** Initiates the stream subscription.  */
StreamClient.prototype.subscribe = function() {
  var self = this

  if (self._state === 'idle') {
    self._state = 'open'
  } else {
    throw new Error(
      'Subscription#start should not be called several times, ' +
        'consider instantiating a new stream instead.'
    )
  }

  var body = JSON.stringify(self._query)
  var startTime = Date.now()
  var buffer = ''

  function onResponse(response) {
    var endTime = Date.now()
    var parsed

    try {
      parsed = json.parseJSON(response.body)
    } catch (_) {
      parsed = response.body
    }

    var result = new RequestResult(
      'POST',
      'stream',
      self._urlParams,
      body,
      self._query,
      response.body,
      parsed,
      response.status,
      response.headers,
      startTime,
      endTime
    )

    self._client._handleRequestResult(response, result)
  }

  function onData(data) {
    var result = json.parseJSONStreaming(buffer + data)

    buffer = result.buffer

    result.values.forEach(function(event) {
      if (event.txn !== undefined) {
        self._client.syncLastTxnTime(event.txn)
      }

      if (event.event === 'error') {
        onError(new errors.StreamErrorEvent(event))
      } else {
        self._onEvent(event)
      }
    })
  }

  function onError(error) {
    // AbortError is triggered as result of calling
    // close() on a Subscription. There's no need to relay this event back up.
    if (error instanceof http.AbortError) {
      return
    }

    self._onEvent({
      type: 'error',
      event: error,
    })
  }

  self._client._http
    .execute({
      method: 'POST',
      path: 'stream',
      body: body,
      query: self._urlParams,
      signal: this._abort.signal,
      streamConsumer: {
        onError: onError,
        onData: onData,
      },
    })
    .then(onResponse)
    .catch(onError)
}

/** Closes the stream subscription by aborting its underlying http request. */
StreamClient.prototype.close = function() {
  if (this._state !== 'closed') {
    this._state = 'closed'
    this._abort.abort()
  }
}

/**
 * Event dispatch interface for stream subscription.
 *
 * @constructor
 * @param {string[]} allowedEvents List of allowed events.
 * @private
 */
function EventDispatcher(allowedEvents) {
  this._allowedEvents = allowedEvents
  this._listeners = {}
}

/** Subscribe to an event
 *
 * @param {string} type The event type.
 * @param {module:stream~Subscription~eventCalllback} callback
 *   The event's callback.
 */
EventDispatcher.prototype.on = function(type, callback) {
  if (this._allowedEvents.indexOf(type) === -1) {
    throw new Error('Unknown event type: ' + type)
  }
  if (this._listeners[type] === undefined) {
    this._listeners[type] = []
  }
  this._listeners[type].push(callback)
}

/**
 * Dispatch the given event to the appropriate listeners.
 *
 * @param {Object} event The event.
 */
EventDispatcher.prototype.dispatch = function(event) {
  var listeners = this._listeners[event.type]
  if (!listeners) {
    return
  }

  for (var i = 0; i < listeners.length; i++) {
    listeners[i].call(null, event.event, event)
  }
}

/**
 * Stream's start event. A stream subscription always begins with a start event.
 * Upcoming events are guaranteed to have transaction timestamps equal to or greater than
 * the stream's start timestamp.
 *
 * @event module:stream~Subscription#start
 * @type {object}
 * @property {string} type='start'
 *   The event type.
 * @property {number} txn
 *   The event's transaction timestamp.
 * @property {module:number} event
 *   The stream start timestamp.
 */

/**
 * A version event occurs upon any modifications to the current state of the
 * subscribed document.
 *
 * @event module:stream~Subscription#version
 * @type {object}
 * @property {string} type='version'
 *   The event type.
 * @property {number} txn
 *   The event's transaction timestamp.
 * @property {object} event
 *   The event's data.
 */

/**
 * A history rewrite event occurs upon any modifications to the history of the
 * subscribed document.
 *
 * @event module:stream~Subscription#history_rewrite
 * @type {object}
 * @property {string} type='history_rewrite'
 *   The event type.
 * @property {number} txn
 *   The event's transaction timestamp.
 * @property {object} event
 *   The event's data.
 */

/**
 * A snapshot event. A snapshot event is fired once the `document` stream helper
 * finishes loading the subscribed document's snapshot data. See {@link
 * Client#stream} for more details on the `document` stream helper.
 *
 * @event module:stream~Subscription#snapshot
 * @type {object}
 * @property {string} type='snapshot'
 *   The event type.
 * @property {number} txn
 *   The event's transaction timestamp.
 * @property {object} event
 *   The event's data.
 */

/**
 * An error event is fired both for client and server errors that may occur as
 * a result of a subscription.
 *
 * @event module:stream~Subscription#error
 * @type {object}
 * @property {string} type='error'
 *   The event type.
 * @property {?number} txn
 *   The event's transaction timestamp.
 * @property {Error} event
 *   The underlying error.
 */

/**
 * @typedef {Object} Options
 * @property {string[]} [fields=['action', 'document', 'diff', 'prev', 'index']]
 *   The fields event fields to opt-in during stream subscription. Possible
 *   options:
 *   * 'action': The action type
 *   * 'document': The document's data
 *   * 'diff': The difference between 'document' and 'prev'
 *   * 'prev': The event's previous data
 *   * 'index': The event's source index, if a set event
 */

/**
 * The callback to be executed when an new event occurs.
 *
 * @callback module:stream~Subscription~eventCalllback
 * @param {any} data The event's data field.
 * @param {object} event The event's entire object.
 */

/**
 * A stream subscription which dispatches events received to the registered
 * listener functions. This class must be constructed via {@link Client#stream}
 * method.
 *
 * @constructor
 * @param {StreamClient} client
 *   Internal stream client interface.
 * @param {EventDispatcher} dispatcher
 *   Internal event dispatcher interface.
 */
function Subscription(client, dispatcher) {
  this._client = client
  this._dispatcher = dispatcher
}

/**
 * Subscribes to an event type.
 *
 * @param {string} event
 *   The event's type.
 * @param {module:stream~Subscription~eventCalllback} callback
 *   A callback function.
 *
 * @returns {module:stream~Subscription} This instance.
 */
Subscription.prototype.on = function(type, callback) {
  this._dispatcher.on(type, callback)
  return this
}

/**
 * Initiates the underlying subscription network calls.
 * @returns {module:stream~Subscription} This instance.
 */
Subscription.prototype.start = function() {
  this._client.subscribe()
  return this
}

/**
 * Stops the current subscription and closes the underlying network connection.
 */
Subscription.prototype.close = function() {
  this._client.close()
}

/**
 * Stream API factory function. See {@link Client#stream} for details on how to
 * use stream's public interface.
 * @private
 */
function StreamAPI(client) {
  var api = function(expression, options) {
    var dispatcher = new EventDispatcher(DefaultEvents)
    var streamClient = new StreamClient(client, expression, options, function(
      event
    ) {
      dispatcher.dispatch(event)
    })
    return new Subscription(streamClient, dispatcher)
  }

  api.document = function(expression, options) {
    var buffer = []
    var buffering = true
    var dispatcher = new EventDispatcher(DocumentStreamEvents)
    var streamClient = new StreamClient(client, expression, options, onEvent)

    function onEvent(event) {
      switch (event.type) {
        case 'start':
          dispatcher.dispatch(event)
          streamClient.snapshot()
          break
        case 'snapshot':
          resume(event)
          break
        case 'error':
          dispatcher.dispatch(event)
          break
        default:
          if (buffering) {
            buffer.push(event)
          } else {
            dispatcher.dispatch(event)
          }
      }
    }

    function resume(snapshotEvent) {
      dispatcher.dispatch(snapshotEvent)
      for (var i = 0; i < buffer.length; i++) {
        var bufferedEvent = buffer[i]
        if (bufferedEvent.txn > snapshotEvent.event.ts) {
          dispatcher.dispatch(bufferedEvent)
        }
      }
      buffering = false
      buffer = null
    }

    return new Subscription(streamClient, dispatcher)
  }

  return api
}

module.exports = {
  StreamAPI: StreamAPI,
}