Topics

class

diffusion.components.topics.Topics(session)

Bases
diffusion.components.Component

Topics component.

Parameters
  • session (Session) The active Session to operate on.
Attributes
  • topics (dict) Internal storage for registered topics.
Methods
  • add_fallback_stream(topic_type, update, **kwargs) Registers a fallback stream handler for a topic type.
  • add_value_stream(topic_selector, topic_type, update, **kwargs) Registers a value stream handler for a topic selector and type.
  • subscribe(topic_selector) Register the session to receive updates for the given topic.
  • unsubscribe(topic_selector) Unregister the session to stop receiving updates for the given topic.
method

add_value_stream(topic_selector, topic_type, update, **kwargs)

Registers a value stream handler for a topic selector and type.

A value stream is a series of events associated with a registered topic. This method makes it possible to register callbacks for each of those events.

Parameters
  • topic_selector (str) The handler will react to the updates to all topics matching this selector.
  • topic_type (Union(int, str, type of datatype)) The data type of the topic being streamed.
  • update (callable) The callback to be executed on the value update event.
  • **kwargs (callable) The callbacks to be executed on other value stream events. The following events are currently supported: * subscribe * unsubscribe * error * close
method

add_fallback_stream(topic_type, update, **kwargs)

Registers a fallback stream handler for a topic type.

A value stream is a series of events associated with a registered topic. This method makes it possible to register callbacks for each of those events.

Parameters
  • topic_type (Union(int, str, type of datatype)) The data type of the topic being streamed.
  • update (callable) The callback to be executed on the value update event.
  • **kwargs (callable) The callbacks to be executed on other value stream events. The following events are currently supported: * subscribe * unsubscribe * error * close
method

subscribe(topic_selector)

Register the session to receive updates for the given topic.

Parameters
  • topic_selector (str) The selector for topics to subscribe to.
method

unsubscribe(topic_selector)

Unregister the session to stop receiving updates for the given topic.

Parameters
  • topic_selector (str) The selector for topics to unsubscribe from.
class

diffusion.internal.services.topics.Topic(id, path, type, properties={}, binary_value=None, streams=NOTHING)

A Diffusion topic.

Parameters
  • id (int) Internal ID of the topic on this session.
  • path (str) The topic path.
  • type Type of the topic.
  • properties (optional) A mapping of topic properties.
  • binary_value (bytes, optional) The current value of the property. None by default.
  • streams (optional) A mapping of streams available for various events.
Attributes
  • value Returns the current value for the topic.
Methods
  • apply_delta(original, delta) (bytes) Applies a binary delta value to an original binary value.
  • handle(event, **kwargs) Runs registered stream handlers for the topic and event.
  • parse_delta(delta) (iterator of bytes or slice) Parses a binary delta value, yielding insert and match values.
  • update(value, is_delta) Updates the binary value of the topic.
  • update_streams(handlers) Updates the collection of registered stream handlers for the topic.
method

update_streams(handlers)

Updates the collection of registered stream handlers for the topic.

First it tries to locate any registered handlers with selectors that match the topic's path and type. If none are available, it selects the fallback stream handlers which match the topic's type.

Parameters
  • handlers (HandlersMapping) The Session.handlers mapping containing all the registered handlers.
method

handle(event, **kwargs)

Runs registered stream handlers for the topic and event.

Parameters
  • event (str) Textual identifier for the event: update, subscribe etc.
  • kwargs Various parameters. The topic's path and current value are injected at runtime.
method

update(value, is_delta=False)

Updates the binary value of the topic.

Parameters
  • value (bytes) The new binary value to apply.
  • is_delta (optional) If True, the new binary value is a binary delta to be applied to the current value. If False, the current value is replaced by the new value.
classmethod

apply_delta(original, delta) → bytes

Applies a binary delta value to an original binary value.

Parameters
  • original (bytes) The original binary value.
  • delta (bytes) The binary delta value to apply. If this value is the CBOR null value, the original value is left unchanged.
classmethod

parse_delta(delta) → iterator of bytes or slice

Parses a binary delta value, yielding insert and match values.

The yielded values are either bytes to apply, or slices to be extracted from the original value.

Parameters
  • delta (bytes) The binary delta to parse.