Using streams for subscription

Register a stream against a set of topics to access values published to those topics. For a registered stream to access the value of a topic, the topic type must match the stream type and the client must be subscribed to the topic. Diffusion has two types of streams: value streams and fallback streams.

Subscribing to a topic causes the value of the topic to be sent from Diffusion™ Cloud to the client. Registering a stream that matches the topic enables the client to access these values. For more information, see Subscribing to topics.

Value streams

Value streams are typed. Register value streams against a set of topics by using a topic selector. A value stream receives updates for any subscribed topics that match the value stream's type and the topic selector used when registering the value stream.

A value stream can have one of the following types:
JSON
JSON topics are routed to this type of stream.
Binary
Binary topics are routed to this type of stream.
String
String topics are routed to this type of stream.
Int64
Int64 topics are routed to this type of stream.
Double
Double topics are routed to this type of stream.
RecordV2
RecordV2 topics are routed to this type of stream.
Content
JSON, binary, string, int64, double, recordV2 and single value topics are routed to this type of stream.
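
The routing rule described above can be sketched as a small standalone model. This is only an illustration of the three conditions (subscribed, type match, selector match), not the Diffusion client API: `ACCEPTED_TOPIC_TYPES`, `streamReceives` and `matchesSelector` are hypothetical names, and the prefix check stands in for real topic selector evaluation.

```javascript
// Conceptual model only: none of these names belong to the Diffusion client API.

// Topic types that a stream of each type accepts, following the list above.
const ACCEPTED_TOPIC_TYPES = {
    json: ['json'],
    binary: ['binary'],
    string: ['string'],
    int64: ['int64'],
    double: ['double'],
    recordV2: ['recordV2'],
    content: ['json', 'binary', 'string', 'int64', 'double', 'recordV2', 'single value']
};

// Returns true when the stream should see an update from the given topic.
// `matchesSelector` is a hypothetical stand-in for topic selector evaluation.
function streamReceives(stream, topic, subscribedPaths) {
    return subscribedPaths.has(topic.path)
        && ACCEPTED_TOPIC_TYPES[stream.type].includes(topic.type)
        && stream.matchesSelector(topic.path);
}

const jsonStream = {
    type: 'json',
    matchesSelector: (path) => path.startsWith('public/')
};
const subscribed = new Set(['public/prices', 'public/image']);

console.log(streamReceives(jsonStream, { path: 'public/prices', type: 'json' }, subscribed));  // true
console.log(streamReceives(jsonStream, { path: 'public/image', type: 'binary' }, subscribed)); // false: type mismatch
console.log(streamReceives(jsonStream, { path: 'public/other', type: 'json' }, subscribed));   // false: not subscribed
```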

If a value stream receives a delta update, this delta is automatically applied to a locally cached value so that the stream always receives full values.
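
As a rough illustration of why a stream always sees full values, the sketch below keeps the last full value per topic and merges each delta into it before calling the listener. It is a conceptual model under stated assumptions: `CachingStream` is a hypothetical class, and the delta is modelled as an object holding only the changed fields, which is not the format the client library uses internally.

```javascript
// Conceptual model only; the real client library applies deltas internally.
class CachingStream {
    constructor(onValue) {
        this.cache = new Map();   // topic path -> last full value
        this.onValue = onValue;
    }

    // A full value replaces the cached one.
    receiveValue(path, value) {
        this.deliver(path, value);
    }

    // Hypothetical delta format: an object holding only the changed fields.
    receiveDelta(path, delta) {
        this.deliver(path, { ...this.cache.get(path), ...delta });
    }

    // Updates the cache, then hands the listener the full new and old values.
    deliver(path, newValue) {
        const oldValue = this.cache.get(path);
        this.cache.set(path, newValue);
        this.onValue(path, newValue, oldValue);
    }
}

const received = [];
const stream = new CachingStream((path, newValue) => received.push(newValue));
stream.receiveValue('public/prices', { bid: 10, ask: 12 });
stream.receiveDelta('public/prices', { bid: 11 });
console.log(received[1]); // { bid: 11, ask: 12 }
```

The listener never sees the delta itself, only the full value rebuilt from the cache.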

Using a value stream

Register the typed stream against the topic or topics that you want the stream to receive updates from:

JavaScript
// Register a JSON value stream
session.addStream('topic_selector', diffusion.datatypes.json())
    .on('value', (path, specification, newValue, oldValue) => {
        // Action to take when update is received
    });

// Register a binary value stream
session.addStream('topic_selector', diffusion.datatypes.binary())
    .on('value', (path, specification, newValue, oldValue) => {
        // Action to take when update is received
    });
.NET
var session = Diffusion.Sessions
                    .Principal("admin")
                    .Credentials(Diffusion.Credentials.Password("password"))
                    .Open(serverUrl);

// Register a JSON value stream
session.Topics.AddStream("topic_selector", new DefaultValueStream<IJSON>() );

// Register a binary value stream
session.Topics.AddStream("topic_selector", new DefaultValueStream<IBinary>() );
Java and Android
final Session session = Diffusion.sessions().open("ws://localhost:8080");
final Topics topics = session.feature(Topics.class);

// Register a JSON value stream
topics.addStream(">my/topic/path", JSON.class,
    new Topics.ValueStream.Default<>());

// Register a binary value stream
topics.addStream(">my/topic/path", Binary.class,
    new Topics.ValueStream.Default<>());
C
// Register a JSON value stream
VALUE_STREAM_T json_value_stream = {
        .datatype = DATATYPE_JSON,
        .on_subscription = on_subscribe,
        .on_unsubscription = on_unsubscribe,
        .on_value = on_json_value
};
add_stream(session, topic_path, &json_value_stream);

// Register a binary value stream
VALUE_STREAM_T binary_value_stream = {
        .datatype = DATATYPE_BINARY,
        .on_subscription = on_subscribe,
        .on_unsubscription = on_unsubscribe,
        .on_value = on_binary_value
};
add_stream(session, topic_path, &binary_value_stream);
Apple
// register a JSON value stream
let json_value_stream = PTDiffusionJSON.valueStream(with: self)
do {
    try session.topics.add(json_value_stream,
                           withSelectorExpression: ">public/json//",
                           error: ())
} catch {
    print("An error occurred: \(error)")
}

// register a binary value stream
let binary_value_stream = PTDiffusionBinary.valueStream(with: self)
do {
    try session.topics.add(binary_value_stream,
                           withSelectorExpression: ">public/binary//",
                           error: ())
} catch {
    print("An error occurred: \(error)")
}

Use topic selectors to register the stream against multiple topics. For more information, see Topic selectors.

The examples above show how to register a default or no-op value stream against a set of topics. The stream receives values from any topic in the set whose topic data type matches the stream data type.

To make use of the values sent to your client, implement a value stream that takes the required action when an update is received from a subscribed topic that matches the type of the stream:

JavaScript
session.addStream('topic_selector', diffusion.datatypes.json())
    .on({
        value : (topic, specification, newValue, oldValue) => {
            console.log('Update from: ', topic, newValue.get());
        },
        subscribe : (topic, specification) => {
            console.log('Subscribed to: ', topic);
        },
        unsubscribe : (topic, specification, reason) => {
            console.log('Unsubscribed from: ', topic);
        }
    });
.NET
/// <summary>
/// Basic implementation of the IValueStream for JSON topics.
/// </summary>
private sealed class JSONStream : IValueStream<IJSON>
{
    /// <summary>
    /// Notification of stream being closed normally.
    /// </summary>
    public void OnClose()
        => WriteLine("The subscription stream is now closed.");

    /// <summary>
    /// Notification of a contextual error related to this callback.
    /// </summary>
    /// <param name="errorReason">Error reason.</param>
    public void OnError(ErrorReason errorReason)
        => WriteLine($"An error has occurred : {errorReason}.");

    /// <summary>
    /// Notification of a successful subscription.
    /// </summary>
    /// <param name="topicPath">Topic path.</param>
    /// <param name="specification">Topic specification.</param>
    public void OnSubscription(string topicPath, ITopicSpecification specification)
        => WriteLine($"Client subscribed to topic '{topicPath}'.");

    /// <summary>
    /// Notification of a successful unsubscription.
    /// </summary>
    /// <param name="topicPath">Topic path.</param>
    /// <param name="specification">Topic specification.</param>
    /// <param name="reason">Reason for the unsubscription.</param>
    public void OnUnsubscription(string topicPath, ITopicSpecification specification,
                                 TopicUnsubscribeReason reason)
        => WriteLine($"Client unsubscribed from topic '{topicPath}' with reason '{reason}'.");

    /// <summary>
    /// Topic update received.
    /// </summary>
    /// <param name="topicPath">Topic path.</param>
    /// <param name="specification">Topic specification.</param>
    /// <param name="oldValue">Value prior to update.</param>
    /// <param name="newValue">Value after update.</param>
    public void OnValue(string topicPath, ITopicSpecification specification, IJSON oldValue,
                        IJSON newValue)
        => WriteLine($"New value of topic '{topicPath}' is {newValue.ToJSONString()}.");
}
Java and Android
private static class JSONStream extends ValueStream.Default<JSON> {
    @Override
    public void onValue(
        String topicPath,
        TopicSpecification specification,
        JSON oldValue,
        JSON newValue) {
        System.out.println(newValue.toJsonString());
    }
}
C
static int on_subscribe(
        const char *const topic_path,
        const TOPIC_SPECIFICATION_T *const specification,
        void *context)
{
        // subscribed to `topic_path`
        return HANDLER_SUCCESS;
}


static int on_unsubscribe(
        const char *const topic_path,
        const TOPIC_SPECIFICATION_T *const specification,
        NOTIFY_UNSUBSCRIPTION_REASON_T reason,
        void *context)
{
        // unsubscribed from `topic_path` due to `reason`
        return HANDLER_SUCCESS;
}


static int on_json_value(
        const char *const topic_path,
        const TOPIC_SPECIFICATION_T *const specification,
        DIFFUSION_DATATYPE datatype,
        const DIFFUSION_VALUE_T *const old_value,
        const DIFFUSION_VALUE_T *const new_value,
        void *context)
{
        // handle json topic update
        return HANDLER_SUCCESS;
}
Apple
class RegisterStreams: PTDiffusionJSONValueStreamDelegate, PTDiffusionBinaryValueStreamDelegate {

    func diffusionStream(_ stream: PTDiffusionValueStream,
                         didUpdateTopicPath topicPath: String,
                         specification: PTDiffusionTopicSpecification,
                         oldBinary: PTDiffusionBinary?,
                         newBinary: PTDiffusionBinary) {
        // oldBinary is nil for the first value received, so do not force-unwrap it
        print("Binary topic \(topicPath) update: \(String(describing: oldBinary)) --> \(newBinary)")
    }


    func diffusionStream(_ stream: PTDiffusionValueStream,
                         didUpdateTopicPath topicPath: String,
                         specification: PTDiffusionTopicSpecification,
                         oldJSON oldJson: PTDiffusionJSON?,
                         newJSON newJson: PTDiffusionJSON) {
        // oldJson is nil for the first value received, so do not force-unwrap it
        print("JSON topic \(topicPath) update: \(String(describing: oldJson)) --> \(newJson)")
    }


    func diffusionStream(_ stream: PTDiffusionStream,
                         didSubscribeToTopicPath topicPath: String,
                         specification: PTDiffusionTopicSpecification) {
        print("Subscribed to topic \(topicPath)")
    }


    func diffusionStream(_ stream: PTDiffusionStream,
                         didUnsubscribeFromTopicPath topicPath: String,
                         specification: PTDiffusionTopicSpecification,
                         reason: PTDiffusionTopicUnsubscriptionReason) {
        print("Unsubscribed from topic: \(topicPath). Reason: \(reason)")
    }


    func diffusionStream(_ stream: PTDiffusionStream,
                         didFailWithError error: Error) {
        print("Failed with error: \(error.localizedDescription)")
    }


    func diffusionDidClose(_ stream: PTDiffusionStream) {
        print("Value stream closed")
    }
}

Registering a fallback stream

You can register one or more fallback streams to receive updates to subscribed topics that do not have a value stream registered against them:

JavaScript
session.addFallbackStream(diffusion.datatypes.json())
    .on('value', (topic, specification, newValue, oldValue) => {
        // Do something
    });
.NET
var session = Diffusion.Sessions
                    .Principal("admin")
                    .Credentials(Diffusion.Credentials.Password("password"))
                    .Open(serverUrl);

// Register a json fallback stream
session.Topics.AddFallbackStream<IJSON>( new DefaultValueStream<IJSON>() );
Java and Android
topics.addFallbackStream(JSON.class, new Topics.ValueStream.Default<>());
C
VALUE_STREAM_T value_stream = {
        .datatype = DATATYPE_JSON,
        .on_subscription = on_subscribe,
        .on_unsubscription = on_unsubscribe,
        .on_value = on_json_value
};
add_fallback_stream(session, &value_stream);
Apple
// register self as the fallback handler for JSON value updates
let value_stream = PTDiffusionJSON.valueStream(with: self)

do {
    try session.topics.addFallbackStream(value_stream, error: ())
} catch {
    print("An error occurred: \(error)")
}

A fallback value stream receives all updates for topics of the matching type that do not have a stream already registered against them.
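
The relationship between value streams and fallback streams can be sketched with a small standalone model. This is not the Diffusion client API: `route`, `matchesSelector` and the stream objects are hypothetical. The sketch also assumes that a value stream counts as registered against a topic only when both its selector and its type match that topic.

```javascript
// Conceptual model only: these names are not part of the Diffusion client API.

// Returns the names of the streams that receive an update.
// Assumption: a value stream counts as registered against a topic only when
// both its selector and its type match the topic.
function route(update, valueStreams, fallbackStreams) {
    const matching = valueStreams.filter(
        (s) => s.type === update.type && s.matchesSelector(update.path));
    // Fallback streams of the matching type are used only when no value stream matched.
    const targets = matching.length > 0
        ? matching
        : fallbackStreams.filter((s) => s.type === update.type);
    return targets.map((s) => s.name);
}

const valueStreams = [
    { name: 'prices', type: 'json', matchesSelector: (p) => p.startsWith('public/prices') }
];
const fallbackStreams = [
    { name: 'jsonFallback', type: 'json' }
];

console.log(route({ path: 'public/prices/gbp', type: 'json' }, valueStreams, fallbackStreams)); // [ 'prices' ]
console.log(route({ path: 'public/news', type: 'json' }, valueStreams, fallbackStreams));       // [ 'jsonFallback' ]
console.log(route({ path: 'public/image', type: 'binary' }, valueStreams, fallbackStreams));    // []
```

In this model the binary update reaches no stream at all, because neither the value stream nor the fallback stream has a matching type.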