'use strict'
/** @module stream */
// NOTE: Although implemented in a separate module, streaming shares internal
// responsibilities with both Client and HTTP interfaces, such as updating last
// seen transaction timestamp. Therefore, this implementation
// sometimes breaks encapsulation and calls internal getters and methods. As a
// general rule: it's okay to call internal methods. You can interpret this
// as calling for a package visible method in languages with fine-grained
// visibility control. However, DO NOT change any internal state from outside of
// its context as it'd most certainly lead to errors.
var { AbortController } = require('node-abort-controller')
var RequestResult = require('./RequestResult')
var errors = require('./errors')
var json = require('./_json')
var http = require('./_http')
var q = require('./query')
var util = require('./_util')
var DefaultEvents = ['start', 'error', 'version', 'history_rewrite', 'set']
var DocumentStreamEvents = DefaultEvents.concat(['snapshot'])
/**
* The internal stream client interface. This class handles the network side of
* a stream subscription.
*
* @constructor
* @param {Client} client The FaunaDB client.
* @param {module:query~ExprArg} expression The FQL expression you are subscribing to.
* @param {module:stream~Options} options The stream options.
* @param {function} onEvent The stream event's callback function.
* @private
*/
function StreamClient(client, expression, options, onEvent) {
options = util.applyDefaults(options, {
fields: null,
})
this._client = client
this._onEvent = onEvent
this._query = q.wrap(expression)
this._urlParams = options.fields ? { fields: options.fields.join(',') } : null
this._abort = new AbortController()
this._state = 'idle'
}
/**
* Takes a snapshot of the current query. Assumes the subscribed query returns a
* reference.
*/
StreamClient.prototype.snapshot = function() {
var self = this
self._client
.query(q.Get(self._query))
.then(function(doc) {
self._onEvent({
type: 'snapshot',
event: doc,
})
})
.catch(function(error) {
self._onEvent({
type: 'error',
event: error,
})
})
}
/** Initiates the stream subscription. */
StreamClient.prototype.subscribe = function() {
var self = this
if (self._state === 'idle') {
self._state = 'open'
} else {
throw new Error(
'Subscription#start should not be called several times, ' +
'consider instantiating a new stream instead.'
)
}
var body = JSON.stringify(self._query)
var startTime = Date.now()
var buffer = ''
function onResponse(response) {
var endTime = Date.now()
var parsed
try {
parsed = json.parseJSON(response.body)
} catch (_) {
parsed = response.body
}
var result = new RequestResult(
'POST',
'stream',
self._urlParams,
body,
self._query,
response.body,
parsed,
response.status,
response.headers,
startTime,
endTime
)
self._client._handleRequestResult(response, result)
}
function onData(data) {
var result = json.parseJSONStreaming(buffer + data)
buffer = result.buffer
result.values.forEach(function(event) {
if (event.txn !== undefined) {
self._client.syncLastTxnTime(event.txn)
}
if (event.event === 'error') {
onError(new errors.StreamErrorEvent(event))
} else {
self._onEvent(event)
}
})
}
function onError(error) {
// AbortError is triggered as result of calling
// close() on a Subscription. There's no need to relay this event back up.
if (error instanceof http.AbortError) {
return
}
self._onEvent({
type: 'error',
event: error,
})
}
self._client._http
.execute({
method: 'POST',
path: 'stream',
body: body,
query: self._urlParams,
signal: this._abort.signal,
streamConsumer: {
onError: onError,
onData: onData,
},
})
.then(onResponse)
.catch(onError)
}
/** Closes the stream subscription by aborting its underlying http request. */
StreamClient.prototype.close = function() {
if (this._state !== 'closed') {
this._state = 'closed'
this._abort.abort()
}
}
/**
* Event dispatch interface for stream subscription.
*
* @constructor
* @param {string[]} allowedEvents List of allowed events.
* @private
*/
function EventDispatcher(allowedEvents) {
this._allowedEvents = allowedEvents
this._listeners = {}
}
/** Subscribe to an event
*
* @param {string} type The event type.
* @param {module:stream~Subscription~eventCalllback} callback
* The event's callback.
*/
EventDispatcher.prototype.on = function(type, callback) {
if (this._allowedEvents.indexOf(type) === -1) {
throw new Error('Unknown event type: ' + type)
}
if (this._listeners[type] === undefined) {
this._listeners[type] = []
}
this._listeners[type].push(callback)
}
/**
* Dispatch the given event to the appropriate listeners.
*
* @param {Object} event The event.
*/
EventDispatcher.prototype.dispatch = function(event) {
var listeners = this._listeners[event.type]
if (!listeners) {
return
}
for (var i = 0; i < listeners.length; i++) {
listeners[i].call(null, event.event, event)
}
}
/**
* Stream's start event. A stream subscription always begins with a start event.
* Upcoming events are guaranteed to have transaction timestamps equal to or greater than
* the stream's start timestamp.
*
* @event module:stream~Subscription#start
* @type {object}
* @property {string} type='start'
* The event type.
* @property {number} txn
* The event's transaction timestamp.
* @property {module:number} event
* The stream start timestamp.
*/
/**
* A version event occurs upon any modifications to the current state of the
* subscribed document.
*
* @event module:stream~Subscription#version
* @type {object}
* @property {string} type='version'
* The event type.
* @property {number} txn
* The event's transaction timestamp.
* @property {object} event
* The event's data.
*/
/**
* A history rewrite event occurs upon any modifications to the history of the
* subscribed document.
*
* @event module:stream~Subscription#history_rewrite
* @type {object}
* @property {string} type='history_rewrite'
* The event type.
* @property {number} txn
* The event's transaction timestamp.
* @property {object} event
* The event's data.
*/
/**
* A snapshot event. A snapshot event is fired once the `document` stream helper
* finishes loading the subscribed document's snapshot data. See {@link
* Client#stream} for more details on the `document` stream helper.
*
* @event module:stream~Subscription#snapshot
* @type {object}
* @property {string} type='snapshot'
* The event type.
* @property {number} txn
* The event's transaction timestamp.
* @property {object} event
* The event's data.
*/
/**
* An error event is fired both for client and server errors that may occur as
* a result of a subscription.
*
* @event module:stream~Subscription#error
* @type {object}
* @property {string} type='error'
* The event type.
* @property {?number} txn
* The event's transaction timestamp.
* @property {Error} event
* The underlying error.
*/
/**
* @typedef {Object} Options
* @property {string[]} [fields=['action', 'document', 'diff', 'prev', 'index']]
* The fields event fields to opt-in during stream subscription. Possible
* options:
* * 'action': The action type
* * 'document': The document's data
* * 'diff': The difference between 'document' and 'prev'
* * 'prev': The event's previous data
* * 'index': The event's source index, if a set event
*/
/**
* The callback to be executed when an new event occurs.
*
* @callback module:stream~Subscription~eventCalllback
* @param {any} data The event's data field.
* @param {object} event The event's entire object.
*/
/**
* A stream subscription which dispatches events received to the registered
* listener functions. This class must be constructed via {@link Client#stream}
* method.
*
* @constructor
* @param {StreamClient} client
* Internal stream client interface.
* @param {EventDispatcher} dispatcher
* Internal event dispatcher interface.
*/
function Subscription(client, dispatcher) {
this._client = client
this._dispatcher = dispatcher
}
/**
* Subscribes to an event type.
*
* @param {string} event
* The event's type.
* @param {module:stream~Subscription~eventCalllback} callback
* A callback function.
*
* @returns {module:stream~Subscription} This instance.
*/
Subscription.prototype.on = function(type, callback) {
this._dispatcher.on(type, callback)
return this
}
/**
* Initiates the underlying subscription network calls.
* @returns {module:stream~Subscription} This instance.
*/
Subscription.prototype.start = function() {
this._client.subscribe()
return this
}
/**
* Stops the current subscription and closes the underlying network connection.
*/
Subscription.prototype.close = function() {
this._client.close()
}
/**
* Stream API factory function. See {@link Client#stream} for details on how to
* use stream's public interface.
* @private
*/
function StreamAPI(client) {
var api = function(expression, options) {
var dispatcher = new EventDispatcher(DefaultEvents)
var streamClient = new StreamClient(client, expression, options, function(
event
) {
dispatcher.dispatch(event)
})
return new Subscription(streamClient, dispatcher)
}
api.document = function(expression, options) {
var buffer = []
var buffering = true
var dispatcher = new EventDispatcher(DocumentStreamEvents)
var streamClient = new StreamClient(client, expression, options, onEvent)
function onEvent(event) {
switch (event.type) {
case 'start':
dispatcher.dispatch(event)
streamClient.snapshot()
break
case 'snapshot':
resume(event)
break
case 'error':
dispatcher.dispatch(event)
break
default:
if (buffering) {
buffer.push(event)
} else {
dispatcher.dispatch(event)
}
}
}
function resume(snapshotEvent) {
dispatcher.dispatch(snapshotEvent)
for (var i = 0; i < buffer.length; i++) {
var bufferedEvent = buffer[i]
if (bufferedEvent.txn > snapshotEvent.event.ts) {
dispatcher.dispatch(bufferedEvent)
}
}
buffering = false
buffer = null
}
return new Subscription(streamClient, dispatcher)
}
return api
}
module.exports = {
StreamAPI: StreamAPI,
}