Module faunadb.streams.dispatcher

Expand source code
import logging

class EventDispatcher(object):
    """
    Event dispatch interface for stream subscription.

    Holds at most one callback per event type in ``self.callbacks``;
    registering another callback for the same type replaces the previous one.
    """
    def __init__(self):
        # Maps an event type to the callable invoked for events of that type.
        self.callbacks = {}

    def on(self, event_type, callback):
        """
        Subscribe to an event.

        :param event_type:
            Key the callback is stored under; ``dispatch`` looks callbacks up
            by ``event.type``.
        :param callback:
            Callable invoked with the event. ``None`` is accepted and ignored,
            leaving any existing registration untouched.
        :raises TypeError:
            If ``callback`` is neither ``None`` nor callable.
        """
        if callback is None:
            return
        if not callable(callback):
            # The explicit 1-tuple keeps the message correct when event_type
            # is itself a tuple (a bare value would be unpacked by `%`).
            raise TypeError(
                "Callback for event `%s` is not callable." % (event_type,))
        self.callbacks[event_type] = callback

    def _noop(self, event, request_result):
        """
        Default callback for unregistered event types.

        Only emits a debug log record; returns ``None``.
        """
        # Arguments are passed lazily so the message is only formatted when
        # debug logging is actually enabled.
        logging.debug("Unhandled stream event %s; %s", event, request_result)

    def dispatch(self, event, request_result):
        """
        Dispatch the given event to the appropriate listeners.

        :param event:
            Object whose ``type`` attribute selects the callback.
        :param request_result:
            Forwarded only to the default handler used for unregistered
            event types; registered callbacks receive just ``event``.
        :return:
            Whatever the selected callback returns (``None`` for
            unregistered event types).
        """
        fn = self.callbacks.get(event.type)
        if fn is None:
            return self._noop(event, request_result)
        return fn(event)

Classes

class EventDispatcher

Event dispatch interface for stream subscription.

Expand source code
class EventDispatcher(object):
    """
    Event dispatch interface for stream subscription.

    Holds at most one callback per event type in ``self.callbacks``;
    registering another callback for the same type replaces the previous one.
    """
    def __init__(self):
        # Maps an event type to the callable invoked for events of that type.
        self.callbacks = {}

    def on(self, event_type, callback):
        """
        Subscribe to an event.

        :param event_type:
            Key the callback is stored under; ``dispatch`` looks callbacks up
            by ``event.type``.
        :param callback:
            Callable invoked with the event. ``None`` is accepted and ignored,
            leaving any existing registration untouched.
        :raises TypeError:
            If ``callback`` is neither ``None`` nor callable.
        """
        if callback is None:
            return
        if not callable(callback):
            # The explicit 1-tuple keeps the message correct when event_type
            # is itself a tuple (a bare value would be unpacked by `%`).
            raise TypeError(
                "Callback for event `%s` is not callable." % (event_type,))
        self.callbacks[event_type] = callback

    def _noop(self, event, request_result):
        """
        Default callback for unregistered event types.

        Only emits a debug log record; returns ``None``.
        """
        # Arguments are passed lazily so the message is only formatted when
        # debug logging is actually enabled.
        logging.debug("Unhandled stream event %s; %s", event, request_result)

    def dispatch(self, event, request_result):
        """
        Dispatch the given event to the appropriate listeners.

        :param event:
            Object whose ``type`` attribute selects the callback.
        :param request_result:
            Forwarded only to the default handler used for unregistered
            event types; registered callbacks receive just ``event``.
        :return:
            Whatever the selected callback returns (``None`` for
            unregistered event types).
        """
        fn = self.callbacks.get(event.type)
        if fn is None:
            return self._noop(event, request_result)
        return fn(event)

Methods

def dispatch(self, event, request_result)

Dispatch the given event to the appropriate listeners.

Expand source code
def dispatch(self, event, request_result):
    """
    Route an event to the callback registered for ``event.type``.

    A registered callback is called with ``event`` alone and its return
    value is passed back to the caller. When nothing is registered for the
    type, the call is delegated to ``self._noop(event, request_result)``.
    """
    handler = self.callbacks.get(event.type)
    if handler is not None:
        return handler(event)
    # No listener for this event type: fall back to the default handler.
    return self._noop(event, request_result)
def on(self, event_type, callback)

Subscribe to an event.

Expand source code
def on(self, event_type, callback):
    """
    Subscribe to an event.

    :param event_type:
        Key the callback is stored under in ``self.callbacks``.
    :param callback:
        Callable to register; replaces any callback already stored for
        ``event_type``. ``None`` is accepted and ignored, leaving any
        existing registration untouched.
    :raises TypeError:
        If ``callback`` is neither ``None`` nor callable.
    """
    if callback is None:
        return
    if not callable(callback):
        # The explicit 1-tuple keeps the message correct when event_type is
        # itself a tuple (a bare value would be unpacked by `%`).
        raise TypeError(
            "Callback for event `%s` is not callable." % (event_type,))
    self.callbacks[event_type] = callback