Topics

class

diffusion.topics.Topics(session)

Bases
diffusion.internal.components.Component

Topics component.

It is not supposed to be instantiated independently; an instance is available on each Session instance as session.topics.

Parameters
  • session (Session) The active Session to operate on.
Attributes
  • services Alias for the internal session's service locator.
  • topics (dict) Internal storage for registered topics.
Methods
  • add_and_set_topic(topic_path, specification, value) (TopicAddResponse) Create a new topic of the given type and properties.
  • add_fallback_stream(stream) Registers a fallback stream handler for a topic type.
  • add_topic(topic_path, specification) (TopicAddResponse) Create a new topic of the given type and properties.
  • add_value_stream(topic_selector, stream) Registers a value stream handler for a topic selector.
  • remove_topic(topic_selector) (int) Remove all the topics that match the given selector.
  • subscribe(topic_selector) Register the session to receive updates for the given topic.
  • unsubscribe(topic_selector) Unregister the session to stop receiving updates for the given topic.
method

add_topic(topic_path, specification) → TopicAddResponse

Create a new topic of the given type and properties.

Parameters
  • topic_path (str) The path to create the topic on.
  • specification (Union(int, str, type of datatype)) Data type of the topic.
method

add_and_set_topic(topic_path, specification, value) → TopicAddResponse

Create a new topic of the given type and properties.

Parameters
  • topic_path (str) The path to create the topic on.
  • specification (Union(int, str, type of datatype)) Data type of the topic.
  • value (any) Value to set when creating the topic. The value needs to be compatible with the topic_type. If the topic already exists, this will be set as its value.
method

remove_topic(topic_selector)

Remove all the topics that match the given selector.

Parameters
  • topic_selector (str) The topics matching this selector will be removed from the server.
Returns (int)

The number of topics removed.

method

add_value_stream(topic_selector, stream)

Registers a value stream handler for a topic selector.

A value stream is a series of events associated with a registered topic. This method adds a ValueStream which can handle those events.

Parameters
  • topic_selector (str) The handler will react to the updates to all topics matching this selector.
  • stream (ValueStreamHandler) A ValueStream instance that handles incoming events of the matching data type.
method

add_fallback_stream(stream)

Registers a fallback stream handler for a topic type.

A value stream is a series of events associated with a registered topic. This method makes it possible to register callbacks for each of those events.

Parameters
  • stream (ValueStreamHandler) A ValueStream instance that handles incoming events of the matching data type.
method

subscribe(topic_selector)

Register the session to receive updates for the given topic.

Parameters
  • topic_selector (str) The selector for topics to subscribe to.
method

unsubscribe(topic_selector)

Unregister the session to stop receiving updates for the given topic.

Parameters
  • topic_selector (str) The selector for topics to unsubscribe from.
class

diffusion.internal.topics.Topic(path, type, properties=NOTHING, id=None, binary_value=None, streams=NOTHING)

A Diffusion topic.

Parameters
  • path (str) The topic path.
  • type Type of the topic.
  • properties (optional) A mapping of topic properties.
  • id (int, optional) Internal ID of the topic on this session.
  • binary_value (bytes, optional) The current value of the property. None by default.
  • streams (set of ValueStreamHandler, optional) A mapping of streams available for various events.
Attributes
  • value Returns the current value for the topic.
Methods
  • apply_delta(original, delta) (bytes) Applies a binary delta value to an original binary value.
  • handle(event, **kwargs) Runs registered stream handlers for the topic and event.
  • parse_delta(delta) (iterator of bytes or slice) Parses a binary delta value, yielding insert and match values.
  • update(value, is_delta) Updates the binary value of the topic.
  • update_streams(handlers) Updates the collection of registered stream handlers for the topic.
method

update_streams(handlers)

Updates the collection of registered stream handlers for the topic.

First it tries to locate any registered handlers with selectors that match the topic's type and path. If none are available, it selects the fallback stream handlers which match the topic's type.

Parameters
  • handlers (HandlersMapping) The Session.handlers mapping containing all the registered handlers.
method

handle(event, **kwargs)

Runs registered stream handlers for the topic and event.

Parameters
  • event (str) Textual identifier for the event: update, subscribe etc.
  • kwargs Additional parameters. The topic's path and current value are injected at runtime.
method

update(value, is_delta=False)

Updates the binary value of the topic.

Parameters
  • value (bytes) The new binary value to apply.
  • is_delta (optional) If True, the new binary value is a binary delta to be applied to the current value. If False, the current value is replaced by the new value.
classmethod

apply_delta(original, delta) → bytes

Applies a binary delta value to an original binary value.

Parameters
  • original (bytes) The original binary value.
  • delta (bytes) The binary delta value to apply. If this value is the CBOR null value, the original value is left unchanged.
classmethod

parse_delta(delta) → iterator of bytes or slice

Parses a binary delta value, yielding insert and match values.

The yielded values are either bytes to apply, or slices to be extracted from the original value.

Parameters
  • delta (bytes) The binary delta to parse.