
Using streams for subscription

Register a stream against a set of topics to access values published to those topics. For a registered stream to access the value of a topic, the topic type must match the stream and the client must be subscribed to the topic.

Subscribing to a topic causes the value of the topic to be sent from Diffusion™ Cloud to the client. Registering a stream that matches the topic enables the client to access these values. For more information, see Subscribing to topics.

Two kinds of stream are provided to receive updates from subscribed topics: value streams and topic streams.

Value streams

Value streams are typed. Register value streams against a set of topics by using a topic selector. A value stream receives updates for any subscribed topics that match the value stream's type and the topic selector used when registering the value stream.

A value stream can have one of the following types:
JSON
JSON topics are routed to this type of stream.
Binary
Binary topics are routed to this type of stream.
String
String topics are routed to this type of stream.
Int64
Int64 topics are routed to this type of stream.
Double
Double topics are routed to this type of stream.
RecordV2
RecordV2 topics are routed to this type of stream.
Content
JSON, binary, string, int64, double, recordV2 and single value topics are routed to this type of stream.
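
The routing rules above can be sketched as a small standalone model. This is not the Diffusion client API: the type labels, the `matches` predicate (a stand-in for topic selector evaluation), and the function names `streamAccepts` and `route` are all hypothetical and exist only to illustrate that a stream is called when the client is subscribed, the selector matches, and the types agree.

```javascript
// Illustrative model only; not the Diffusion client API.
// Topic types accepted by a 'content' stream, per the list above.
// The string labels are hypothetical.
const CONTENT_TYPES = ['json', 'binary', 'string', 'int64', 'double', 'recordV2', 'singleValue'];

// Returns true if a stream of the given type receives topics of the given type.
function streamAccepts(streamType, topicType) {
    if (streamType === 'content') {
        return CONTENT_TYPES.includes(topicType);
    }
    return streamType === topicType;
}

// Returns the streams that are notified of an update to the given topic.
// `matches` stands in for topic selector evaluation (hypothetical predicate).
function route(streams, subscriptions, topicPath, topicType) {
    if (!subscriptions.has(topicPath)) {
        return []; // Not subscribed: no values arrive, so no stream is called.
    }
    return streams.filter(s => s.matches(topicPath) && streamAccepts(s.type, topicType));
}

const streams = [
    { name: 'jsonStream', type: 'json', matches: p => p.startsWith('prices/') },
    { name: 'binaryStream', type: 'binary', matches: p => p.startsWith('prices/') }
];
const subscriptions = new Set(['prices/GBP']);

// Only the JSON stream is selected for a subscribed JSON topic.
console.log(route(streams, subscriptions, 'prices/GBP', 'json').map(s => s.name));
```

In this model, a stream registered against an unsubscribed topic is never called, which mirrors the requirement that the client both subscribes to the topic and registers a matching stream.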

If a value stream receives a delta update, this delta is automatically applied to a locally cached value so that the stream always receives full values.

Using a value stream

Register the typed stream against the topic or topics that you want the stream to receive updates from:

JavaScript
// Register a JSON value stream
session.addStream('topic_selector', diffusion.datatypes.json())
       .on('value', function(path, specification, newValue, oldValue) {
            // Action to take when update is received
       });

// Register a binary value stream
session.addStream('topic_selector', diffusion.datatypes.binary())
       .on('value', function(path, specification, newValue, oldValue) {
            // Action to take when update is received
       });
                    
Apple
// Register a JSON value stream
PTDiffusionValueStream *const jsonValueStream = [PTDiffusionJSON valueStreamWithDelegate:self];
[session.topics addStream:jsonValueStream
             withSelector:topic_selector];

// Register a binary value stream
PTDiffusionValueStream *const binaryValueStream = [PTDiffusionBinary valueStreamWithDelegate:self];
[session.topics addStream:binaryValueStream
             withSelector:topic_selector];
                    
Java and Android
final Topics topics = session.feature(Topics.class);

// Register a JSON value stream
topics.addStream(topic_selector, JSON.class, new Topics.ValueStream.Default<JSON>());

// Register a binary value stream
topics.addStream(topic_selector, Binary.class, new Topics.ValueStream.Default<Binary>());
                    
.NET
var topics = session.Topics;

// Register a JSON value stream
topics.AddStream( "topic_selector", new Topics.DefaultValueStream<IJSON>() );

// Register a binary value stream
topics.AddStream( "topic_selector", new Topics.DefaultValueStream<IBinary>() );
                    

Use topic selectors to register the stream against multiple topics. For more information, see Topic selectors.

The examples above show how to register a default or no-op value stream against a set of topics. The stream receives values from any topic in the set whose topic data type matches the stream data type.

To make use of the values sent to your client, implement a value stream that takes the required action when an update is received from a subscribed topic that matches the type of the stream:

JavaScript
session.addStream('topic_selector', diffusion.datatypes.json())
    .on({
    value : function(topic, specification, newValue, oldValue) {
        console.log('Update from: ', topic, newValue.get());
    },
    subscribe : function(topic, specification) {
        console.log('Subscribed to: ', topic);
    },
    unsubscribe : function(topic, specification, reason) {
        console.log('Unsubscribed from: ', topic);
    }});
                    
Apple
@implementation JSONSubscribeExample (PTDiffusionJSONValueStreamDelegate)

-(void)     diffusionStream:(PTDiffusionStream *const)stream
    didSubscribeToTopicPath:(NSString *const)topicPath
              specification:(PTDiffusionTopicSpecification *const)specification {
    NSLog(@"Subscribed: %@", topicPath);
}

-(void)diffusionStream:(PTDiffusionValueStream *const)stream
    didUpdateTopicPath:(NSString *const)topicPath
         specification:(PTDiffusionTopicSpecification *const)specification
               oldJSON:(PTDiffusionJSON *const)oldJSON
               newJSON:(PTDiffusionJSON *const)newJSON {

    NSError * error;
    NSDictionary *const map = [newJSON objectWithError:&error];
    if (!map) {
        NSLog(@"Failed to create map from received JSON. Error: %@", error);
        return;
    }

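    // For the purposes of a meaningful example, only emit a log line if we
    // have a rate for GBP to USD.
    // Assumption: the currency code is the last component of the topic path.
    NSString *const currency = [topicPath lastPathComponent];
    if ([currency isEqualToString:@"GBP"]) {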
        const id rate = map[@"USD"];
        if (rate) {
            NSLog(@"Rate for GBP to USD: %@", rate);
        }
    }
}

-(void)         diffusionStream:(PTDiffusionStream *const)stream
    didUnsubscribeFromTopicPath:(NSString *const)topicPath
                  specification:(PTDiffusionTopicSpecification *const)specification
                         reason:(const PTDiffusionTopicUnsubscriptionReason)reason {
    NSLog(@"Unsubscribed: %@", topicPath);
}

@end
                    
Java and Android
private class JSONStream extends ValueStream.Default<JSON> {
    @Override
    public void onValue(
        String topicPath,
        TopicSpecification specification,
        JSON oldValue,
        JSON newValue) {
        LOG.info(newValue.toJsonString());
    }
}
                    
.NET
/// Basic implementation of the IValueStream for JSON topics.

private sealed class JSONStream : IValueStream<IJSON> {

/// Notification of stream being closed normally.

public void OnClose()
=> WriteLine( "The subscription stream is now closed." );

/// Notification of a contextual error related to this callback.

/// Situations in which OnError is called include the session being closed, a communication
/// timeout, or a problem with the provided parameters. No further calls will be made to this callback.

public void OnError( ErrorReason errorReason )
=> WriteLine( $"An error has occurred : {errorReason}." );

/// Notification of a successful subscription.

public void OnSubscription( string topicPath, ITopicSpecification specification )
=> WriteLine( $"Client subscribed to {topicPath}." );

/// Notification of a successful unsubscription.

public void OnUnsubscription( string topicPath, ITopicSpecification specification, TopicUnsubscribeReason reason )
=> WriteLine( $"Client unsubscribed from {topicPath} : {reason}." );

/// Topic update received.

public void OnValue( string topicPath, ITopicSpecification specification, IJSON oldValue, IJSON newValue )
=> WriteLine( $"New value of {topicPath} is {newValue.ToJSONString()}." );
}
                    

Topic streams

Note: Where a value stream is available for your topic type, we recommend you use a value stream instead of a topic stream.

Topic streams are not typed and are used to receive value and delta updates for all subscribed topics that match the topic selectors used when registering the topic stream.

This type of stream provides both values and deltas, but relies on the application to apply each delta to a current value that the client maintains. When you use a topic stream with a record topic, register the stream before subscribing to the topic so that the subscribing client receives a full value.
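
As a rough illustration of what maintaining the current value involves, the sketch below keeps the last full value per topic and merges deltas into it. It is a standalone model, not the Diffusion client API: the `CurrentValues` class, the `{ kind, data, changes }` update shape, and the "changed fields" delta format are all hypothetical and are not the Diffusion wire format.

```javascript
// Illustrative model only; the update shape and delta format are hypothetical
// and are not the Diffusion wire format.
class CurrentValues {
    constructor() {
        this.values = new Map(); // topic path -> last known full value
    }

    // Applies an update and returns the new full value, or undefined when a
    // delta arrives before any full value has been seen for the topic.
    apply(topicPath, update) {
        if (update.kind === 'value') {
            this.values.set(topicPath, update.data);
            return update.data;
        }
        const current = this.values.get(topicPath);
        if (current === undefined) {
            return undefined; // No baseline: the delta cannot be applied.
        }
        // Merge the changed fields into a copy of the current value.
        const next = Object.assign({}, current, update.changes);
        this.values.set(topicPath, next);
        return next;
    }
}

const cache = new CurrentValues();
const full = cache.apply('rates/GBP', { kind: 'value', data: { USD: 1.25, EUR: 1.17 } });
const patched = cache.apply('rates/GBP', { kind: 'delta', changes: { USD: 1.26 } });
const orphan = cache.apply('rates/JPY', { kind: 'delta', changes: { USD: 0.007 } });
console.log(full, patched, orphan);
```

The `orphan` case shows why ordering matters in this model: a delta that arrives without a previously received full value has nothing to be applied to.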

Using a topic stream

Register the stream against the topic or topics that you want the stream to receive updates from:

JavaScript
session.stream('topic_selector')
       .on('update', function(update, topic) {
            // Do something
       });
Apple
// Register self as the handler for topic updates on a set of topics.
[session.topics addTopicStreamWithSelector:topic_selector
                                  delegate:self];
Java and Android
Topics topics = session.feature(Topics.class);
// Add a topic stream that you implemented elsewhere
topics.addTopicStream(topic_selector, new myTopicStream(data));
                    
.NET
var topics = session.Topics;

// Add a topic stream that you implemented elsewhere
topics.AddTopicStream( topic_selector, myTopicStream );
                    

Use topic selectors to register the stream against multiple topics. For more information, see Topic selectors.

Registering a fallback stream

You can register one or more fallback streams to receive updates to subscribed topics that do not have a value stream or topic stream registered against them:

JavaScript
session.addFallbackStream(diffusion.datatypes.json())
     .on('value', function(topic, specification, newValue, oldValue) {
         // Do something
     });
                    
Apple
// Register self as the fallback handler for JSON value updates.
PTDiffusionValueStream *const valueStream = [PTDiffusionJSON valueStreamWithDelegate:self];
[session.topics addFallbackStream:valueStream];
                    
Java and Android
final Topics topics = session.feature(Topics.class);

// A fallback stream is registered without a topic selector
topics.addFallbackStream(JSON.class, new Topics.ValueStream.Default<JSON>());
                    
.NET
var topics = session.Topics;

topics.AddFallbackStream<IJSON>( new Topics.DefaultValueStream<IJSON>() );
                    
C
/*
 * Install a global topic handler to capture messages for
 * topics we haven't explicitly subscribed to, and therefore
 * don't have a specific handler for.
 */
session->global_topic_handler = on_unexpected_topic_message;
                    

A fallback value stream receives all updates for topics of the matching type that do not have a stream already registered against them.

A fallback topic stream receives all updates for topics of any type that do not have a stream already registered against them.
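
The fallback rules above can be modeled in a few lines. This is a standalone sketch, not the Diffusion client API: `selectReceivers`, the `matches` predicate, and the use of `type: null` for an untyped fallback topic stream are hypothetical. It also assumes that a stream counts as "registered against" a topic whenever its selector matches the topic path, which is one reading of the text and may differ from the actual client behavior.

```javascript
// Illustrative model only; not the Diffusion client API.
// Assumption: a stream is "registered against" a topic when its selector
// predicate matches the topic path.
function selectReceivers(registered, fallbacks, topicPath, topicType) {
    const direct = registered.filter(s => s.matches(topicPath));
    if (direct.length > 0) {
        return direct.map(s => s.name); // Fallback streams are not consulted.
    }
    // type === null models an untyped fallback topic stream, which takes
    // topics of any type; typed fallbacks take only topics of their type.
    return fallbacks
        .filter(f => f.type === null || f.type === topicType)
        .map(f => f.name);
}

const registered = [
    { name: 'ratesStream', type: 'json', matches: p => p.startsWith('rates/') }
];
const fallbacks = [
    { name: 'jsonFallback', type: 'json' },
    { name: 'topicFallback', type: null }
];

console.log(selectReceivers(registered, fallbacks, 'rates/GBP', 'json'));
console.log(selectReceivers(registered, fallbacks, 'news/today', 'json'));
console.log(selectReceivers(registered, fallbacks, 'news/image', 'binary'));
```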