Just a second...

Example: Subscribe to a topic

The following examples use the Unified API to subscribe to topics and assign handlers to topics to receive the topic content.

JavaScript
// Connects to the Diffusion server and, once the session is open, demonstrates
// subscribing, unsubscribing, topic selectors, value transformers and
// metadata-driven parsing. A failed connection is reported through the second
// callback passed to then().
diffusion.connect({
    host   : 'diffusion.example.com',
    port   : 443,
    secure : true
}).then(function(session) {

    // 1. Subscriptions are how sessions receive streams of data from the server.

    // When subscribing, a topic selector is used to select which topics to subscribe to. Topics do not need to exist
    // at the time of subscription - the server dynamically resolves subscriptions as topics are added or removed.

    // Subscribe to the "foo" topic with an inline callback function
    var subscription = session.subscribe('foo', function(update) {
        // Log the new value whenever the 'foo' topic is updated
        // By default, we get a Buffer object which preserves binary
        // data.
        console.log(update);
    });

    // Callbacks can also be registered after the subscription has occurred
    subscription.on({
        update : function(value, topic) {
            console.log('Update for topic: ' + topic, value);
        },
        subscribe : function(details, topic) {
            console.log('Subscribed to topic: ' + topic);
        },
        unsubscribe : function(reason, topic) {
            console.log('Unsubscribed from topic:' + topic);
            // Close the subscription stream once the topic has been unsubscribed
            subscription.close();
        }
    });

    // 2. Sessions may unsubscribe from any topic to stop receiving data

    // Unsubscribe from the "foo" topic. Sessions do not need to have previously been subscribed to the topics they are
    // unsubscribing from. Unsubscribing from a topic will result in the 'unsubscribe' callback registered above being
    // called.
    session.unsubscribe('foo');

    // 3. Subscriptions / Unsubscriptions can select multiple topics using Topic Selectors

    // Topic Selectors provide regex-like capabilities for subscribing to topics. These are resolved dynamically, much
    // like subscribing to a single topic.
    var subscription2 = session.subscribe('?foo/.*/[a-z]');

    // 4. Subscriptions can use transformers to convert update values

    // Subscribe to a topic and then convert all received values to JSON. Transforming a subscription creates a new
    // subscription stream, rather than modifying the original. This assumes that the topic is a single value topic
    // receiving stringified JSON and is not a JSON topic.
    session.subscribe('bar').transform(JSON.parse).on('update', function(value, topic) {
        console.log('Got JSON update for topic: ' + topic, value);
    });

    // 5. Metadata can be used within transformers to parse data

    // Create a simple metadata instance
    var meta = new diffusion.metadata.RecordContent();

    // Add a single record/field
    meta.addRecord('record', {
        'field' : meta.string('some-value')
    });

    // Subscribe to a topic and transform with the metadata
    session.subscribe('baz').transform(meta).on('update', function(value) {
        console.log('Field value: ', value.get('record').get('field'));
    });
}, function(error) {
    // Without this rejection handler a failed connection attempt would go
    // unreported and none of the code above would ever run.
    console.log('Failed to connect to Diffusion: ', error);
});
Apple
@import Diffusion;

// Named category declaring that SubscribeUnsubscribeExample adopts the
// PTDiffusionTopicStreamDelegate protocol. The delegate callbacks themselves
// are implemented in the matching category @implementation further down.
@interface SubscribeUnsubscribeExample (PTDiffusionTopicStreamDelegate) <PTDiffusionTopicStreamDelegate>
@end

@implementation SubscribeUnsubscribeExample {
    // Holds the opened session strongly so that it stays alive.
    PTDiffusionSession* _session;
}

// Selector expression used for both the subscribe and unsubscribe requests.
static NSString *const _TopicSelectorExpression = @"*Assets//";

// Opens a session against the given URL. On success the session is retained,
// self is registered as the fallback topic stream delegate and the first
// subscribe request is scheduled five seconds later.
-(void)startWithURL:(NSURL*)url {
    NSLog(@"Connecting...");

    [PTDiffusionSession openWithURL:url
                  completionHandler:^(PTDiffusionSession *openedSession, NSError *openError)
    {
        if (openedSession) {
            // A connected session is available from here on.
            NSLog(@"Connected.");

            // Keep the session alive for the lifetime of this object.
            self->_session = openedSession;

            // Updates for any topic are delivered to self via the fallback stream.
            [openedSession.topics addFallbackTopicStreamWithDelegate:self];

            // Kick off the subscribe/unsubscribe cycle after a 5 second pause.
            [self performSelector:@selector(subscribe:) withObject:openedSession afterDelay:5.0];
        } else {
            NSLog(@"Failed to open session: %@", openError);
        }
    }];
}

// Requests subscription to the shared selector expression. `object` is the
// PTDiffusionSession handed over by performSelector:withObject:afterDelay:.
// When the request succeeds an unsubscribe is scheduled five seconds later.
-(void)subscribe:(const id)object {
    PTDiffusionSession *const activeSession = object;

    NSLog(@"Subscribing...");
    [activeSession.topics subscribeWithTopicSelectorExpression:_TopicSelectorExpression
                                             completionHandler:^(NSError * const error)
    {
        if (error) {
            NSLog(@"Subscribe request failed. Error: %@", error);
            return;
        }

        NSLog(@"Subscribe request succeeded.");

        // Pause for 5 seconds, then unsubscribe again.
        [self performSelector:@selector(unsubscribe:) withObject:activeSession afterDelay:5.0];
    }];
}

// Requests unsubscription from the shared selector expression. `object` is the
// PTDiffusionSession handed over by performSelector:withObject:afterDelay:.
// When the request succeeds a new subscribe is scheduled five seconds later.
-(void)unsubscribe:(const id)object {
    PTDiffusionSession *const activeSession = object;

    NSLog(@"Unsubscribing...");
    [activeSession.topics unsubscribeFromTopicSelectorExpression:_TopicSelectorExpression
                                               completionHandler:^(NSError * const error)
    {
        if (error) {
            NSLog(@"Unsubscribe request failed. Error: %@", error);
            return;
        }

        NSLog(@"Unsubscribe request succeeded.");

        // Pause for 5 seconds, then subscribe again.
        [self performSelector:@selector(subscribe:) withObject:activeSession afterDelay:5.0];
    }];
}

@end

@implementation SubscribeUnsubscribeExample (PTDiffusionTopicStreamDelegate)

#pragma mark - PTDiffusionTopicStreamDelegate

// Topic update: decodes the content bytes as UTF-8 and logs them next to the
// topic path.
-(void)diffusionStream:(PTDiffusionStream * const)stream
    didUpdateTopicPath:(NSString * const)topicPath
               content:(PTDiffusionContent * const)content
               context:(PTDiffusionUpdateContext * const)context {
    NSData *const payload = content.data;
    NSString *const text = [[NSString alloc] initWithData:payload
                                                 encoding:NSUTF8StringEncoding];
    NSLog(@"\t%@ = \"%@\"", topicPath, text);
}

// Subscription notification: logs the topic path together with its details.
-(void)diffusionStream:(PTDiffusionStream * const)stream
didSubscribeToTopicPath:(NSString * const)topicPath
               details:(PTDiffusionTopicDetails * const)details {
    NSLog(@"Subscribed: \"%@\" (%@)", topicPath, details);
}

// Unsubscription notification: logs the topic path and the reason rendered as
// a string.
-(void)diffusionStream:(PTDiffusionStream *const)stream
didUnsubscribeFromTopicPath:(NSString *const)topicPath
                reason:(const PTDiffusionTopicUnsubscriptionReason)reason {
    NSLog(@"Unsubscribed: \"%@\" [Reason: %@]",
          topicPath,
          PTDiffusionTopicUnsubscriptionReasonToString(reason));
}

@end
Java and Android
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.pushtechnology.diffusion.client.Diffusion;
import com.pushtechnology.diffusion.client.content.Content;
import com.pushtechnology.diffusion.client.features.Topics;
import com.pushtechnology.diffusion.client.features.Topics.ValueStream;
import com.pushtechnology.diffusion.client.session.Session;
import com.pushtechnology.diffusion.client.topics.details.TopicSpecification;
import com.pushtechnology.diffusion.datatype.json.JSON;

/**
 * The simplest and most common client scenario: open a session, subscribe to
 * a handful of topics and attach a stream to each so that their content is
 * received.
 * <P>
 * Only the 'Topics' feature is used.
 * <P>
 * The client session needs the 'select_topic' and 'read_topic' permissions
 * for the relevant branch of the topic tree in order to subscribe.
 *
 * @author Push Technology Limited
 * @since 5.0
 */
public final class ClientSimpleSubscriber {

    private static final Logger LOG =
        LoggerFactory.getLogger(ClientSimpleSubscriber.class);

    /** The session opened by the constructor and released by {@link #close()}. */
    private final Session session;

    /**
     * Opens the session, registers a stream for 'Foo' and one for everything
     * beneath 'Bar', then requests subscription to both.
     */
    public ClientSimpleSubscriber() {

        session = Diffusion.sessions()
            .principal("client")
            .password("password")
            .open("ws://diffusion.example.com:80");

        final Topics topicsFeature = session.feature(Topics.class);

        // Streams are registered first so they are in place when values arrive.
        topicsFeature.addStream(">Foo", Content.class, new FooStream());
        topicsFeature.addStream(">Bar//", JSON.class, new BarStream());

        // A single request covering both selectors.
        topicsFeature.subscribe(
            Diffusion.topicSelectors().anyOf("Foo", "Bar//"),
            new Topics.CompletionCallback.Default());
    }

    /**
     * Closes the underlying session.
     */
    public void close() {
        session.close();
    }

    /**
     * Receives values for the 'Foo' topic and logs each new value as a string.
     */
    private static final class FooStream extends ValueStream.Default<Content> {
        @Override
        public void onValue(
            String topicPath,
            TopicSpecification specification,
            Content oldValue,
            Content newValue) {

            LOG.info(newValue.asString());
        }
    }

    /**
     * Receives values for the JSON topics under 'Bar' and logs each new value
     * as a JSON string.
     */
    private static final class BarStream extends ValueStream.Default<JSON> {
        @Override
        public void onValue(
            String topicPath,
            TopicSpecification specification,
            JSON oldValue,
            JSON newValue) {

            LOG.info(newValue.toJsonString());
        }
    }
}
.NET
using System;
using System.Threading;
using PushTechnology.ClientInterface.Client.Callbacks;
using PushTechnology.ClientInterface.Client.Content;
using PushTechnology.ClientInterface.Client.Factories;
using PushTechnology.ClientInterface.Client.Features;
using PushTechnology.ClientInterface.Client.Features.Topics;
using PushTechnology.ClientInterface.Client.Topics.Details;

namespace PushTechnology.ClientInterface.GettingStarted {
    /// <summary>
    /// Console client that subscribes to the topic 'foo/counter', stays
    /// connected for one minute and then closes its session.
    /// </summary>
    public sealed class SubscribingClient {
        public static void Main( string[] args ) {
            // Selector shared by the stream registration and the subscription request
            const string counterSelector = ">foo/counter";

            // Anonymous connection to the server
            var session = Diffusion.Sessions.Open( "ws://localhost:8080" );

            // Subscriptions are managed through the Topics feature
            var topicsFeature = session.GetTopicsFeature();

            // Register the stream, then ask the server to subscribe this session
            topicsFeature.AddStream( counterSelector, new CounterTopicStream() );
            topicsFeature.Subscribe( counterSelector, new TopicsCompletionCallbackDefault() );

            // Keep the session open for 1 minute so that updates can arrive
            Thread.Sleep( TimeSpan.FromMinutes( 1 ) );

            session.Close();
        }
    }

    /// <summary>
    /// A simple IValueStream implementation that reports every stream event
    /// on the console.
    /// </summary>
    internal sealed class CounterTopicStream : IValueStream<IContent> {
        /// <summary>
        /// Notification of stream being closed normally.
        /// </summary>
        public void OnClose() {
            Console.WriteLine( "The subscription stream is now closed." );
        }

        /// <summary>
        /// Notification of a contextual error related to this callback.
        /// </summary>
        /// <remarks>
        /// Situations in which <code>OnError</code> is called include the session being closed, a communication
        /// timeout, or a problem with the provided parameters. No further calls will be made to this callback.
        /// </remarks>
        /// <param name="errorReason">the reason for the error, written to the console</param>
        public void OnError( ErrorReason errorReason ) {
            Console.WriteLine( "An error has occurred  : {0}", errorReason );
        }

        /// <summary>
        /// Notification of a successful subscription.
        /// </summary>
        /// <param name="topicPath">path of the topic that was subscribed to</param>
        /// <param name="specification">the specification of the topic (not used here)</param>
        public void OnSubscription( string topicPath, ITopicSpecification specification ) {
            Console.WriteLine( "Client subscribed to {0} ", topicPath );
        }

        /// <summary>
        /// Notification of a successful unsubscription.
        /// </summary>
        /// <param name="topicPath">path of the topic that was unsubscribed from</param>
        /// <param name="specification">the specification of the topic (not used here)</param>
        /// <param name="reason">the reason for the unsubscription</param>
        public void OnUnsubscription( string topicPath, ITopicSpecification specification, TopicUnsubscribeReason reason ) {
            Console.WriteLine( "Client unsubscribed from {0} : {1}", topicPath, reason );
        }

        /// <summary>
        /// Topic update received.
        /// </summary>
        /// <param name="topicPath">path of the topic that was updated</param>
        /// <param name="specification">the specification of the topic (not used here)</param>
        /// <param name="oldValue">value prior to update (not used here)</param>
        /// <param name="newValue">value after update, written to the console as a string</param>
        public void OnValue( string topicPath, ITopicSpecification specification, IContent oldValue, IContent newValue ) {
            Console.WriteLine( "New value of {0} is {1}", topicPath, newValue.AsString() );
        }
    }
}
C
/*
 * This is a sample client which connects to Diffusion and subscribes
 * to topics using a user-specified selector. Any messages received on
 * those topics are then displayed to standard output.
 */

#include <stdio.h>
#include <unistd.h>

#include "diffusion.h"
#include "args.h"

/*
 * Command-line options, handed to parse_cmdline() and show_usage() in main():
 *   -u / url             optional, defaults to "ws://localhost:8080"
 *   -t / topic_selector  required, no default
 * ARG_OPTS_HELP presumably contributes the "help" option that main() checks
 * for; END_OF_ARG_OPTS terminates the table.
 */
ARG_OPTS_T arg_opts[] = {
        ARG_OPTS_HELP,
        {'u', "url", "Diffusion server URL", ARG_OPTIONAL, ARG_HAS_VALUE, "ws://localhost:8080"},
        {'t', "topic_selector", "Topic selector", ARG_REQUIRED, ARG_HAS_VALUE, NULL},
        END_OF_ARG_OPTS
};

/*
 * Session listener callback, invoked on every session state transition (for
 * example "connecting" to "connected", or "connected" to "closed"). Prints
 * the name and numeric value of both the old and the new state.
 */
static void
on_session_state_changed(SESSION_T *session,
                         const SESSION_STATE_T old_state,
                         const SESSION_STATE_T new_state)
{
        const char *old_name = session_state_as_string(old_state);
        const char *new_name = session_state_as_string(new_state);

        printf("Session state changed from %s (%d) to %s (%d)\n",
               old_name, old_state,
               new_name, new_state);
}

/*
 * Handler for messages on the topics this client subscribed to. Prints the
 * topic name, then the payload length and text, followed by a hex dump of
 * the payload buffer.
 */
static int
on_topic_message(SESSION_T *session, const TOPIC_MESSAGE_T *msg)
{
        /* The length is needed twice: as the byte count and as the %.*s precision. */
        const int payload_len = (int)msg->payload->len;

        printf("Received message for topic %s\n", msg->name);
        printf("Payload: (%d bytes) %.*s\n",
               payload_len, payload_len, msg->payload->data);

        hexdump_buf(msg->payload);

        return HANDLER_SUCCESS;
}

/*
 * Invoked once Diffusion has replied that the topic subscription request was
 * received and processed. Only prints a marker line.
 */
static int
on_subscribe(SESSION_T *session, void *context_data)
{
        puts("on_subscribe");
        return HANDLER_SUCCESS;
}

/*
 * Invoked when Diffusion responds to a topic unsubscription request. The
 * response only indicates that the request has been received. Only prints a
 * marker line.
 */
static int
on_unsubscribe(SESSION_T *session, void *context_data)
{
        puts("on_unsubscribe");
        return HANDLER_SUCCESS;
}

/*
 * Catch-all topic handler. Publishers and control clients may subscribe any
 * other client to a topic of their choice at any time, so messages can arrive
 * for topics this client never asked for; they are printed here.
 */
static int
on_unexpected_topic_message(SESSION_T *session, const TOPIC_MESSAGE_T *msg)
{
        /* Used as the %.*s precision so the payload need not be NUL-terminated. */
        const int payload_len = (int)msg->payload->len;

        printf("Received a message for a topic we didn't subscribe to (%s)\n", msg->name);
        printf("Payload: %.*s\n", payload_len, msg->payload->data);

        return HANDLER_SUCCESS;
}

/*
 * We use this callback when Diffusion notifies us that we've been subscribed
 * to a topic. Note that this could be called for topics that we haven't
 * explicitly subscribed to - other control clients or publishers may ask to
 * subscribe us to a topic.
 *
 * Prints the topic ID and topic path carried in the request's topic_info and
 * returns HANDLER_SUCCESS. The session and context arguments are not used.
 */
static int
on_notify_subscription(SESSION_T *session, const SVC_NOTIFY_SUBSCRIPTION_REQUEST_T *request, void *context)
{
        printf("on_notify_subscription: %d: \"%s\"\n",
               request->topic_info.topic_id,
               request->topic_info.topic_path);
        return HANDLER_SUCCESS;
}

/*
 * This callback is used when we receive notification that this client has been
 * unsubscribed from a specific topic. Causes of the unsubscription are the same
 * as those for subscription.
 *
 * Prints the topic ID, the topic path and the numeric reason code from the
 * request and returns HANDLER_SUCCESS. The session and context arguments are
 * not used.
 */
static int
on_notify_unsubscription(SESSION_T *session, const SVC_NOTIFY_UNSUBSCRIPTION_REQUEST_T *request, void *context)
{
        printf("on_notify_unsubscription: ID: %d, Path: %s, Reason: %d\n",
               request->topic_id,
               request->topic_path,
               request->reason);
        return HANDLER_SUCCESS;
}

/*
 * Entry point. Parses the command line, opens a session to the Diffusion
 * server, subscribes to the user-supplied topic selector for five seconds,
 * unsubscribes, waits a further five seconds for notifications and then
 * closes the session.
 *
 * Returns EXIT_FAILURE when the arguments cannot be parsed, when help is
 * requested, or when the session cannot be created; EXIT_SUCCESS otherwise.
 */
int
main(int argc, char **argv)
{
        /*
         * Standard command-line parsing
         */
        HASH_T *options = parse_cmdline(argc, argv, arg_opts);
        if(options == NULL || hash_get(options, "help") != NULL) {
                show_usage(argc, argv, arg_opts);
                return EXIT_FAILURE;
        }

        char *url = hash_get(options, "url");
        char *topic = hash_get(options, "topic_selector");

        /*
         * A SESSION_LISTENER_T holds callbacks to inform the client
         * about changes to the state. Used here for informational
         * purposes only.
         */
        SESSION_LISTENER_T session_listener = { 0 };
        session_listener.on_state_changed = &on_session_state_changed;

        /*
         * Creating a session requires at least a URL. Creating a
         * session initiates a connection with Diffusion.
         */
        DIFFUSION_ERROR_T error = { 0 };
        SESSION_T *session = NULL;
        session = session_create(url, NULL, NULL, &session_listener, NULL, &error);
        if(session == NULL) {
                fprintf(stderr, "TEST: Failed to create session\n");
                fprintf(stderr, "ERR : %s\n", error.message);
                return EXIT_FAILURE;
        }

        /*
         * Install a global topic handler to capture messages for
         * topics we haven't explicitly subscribed to, and therefore
         * don't have a specific handler for. This is done before the
         * subscription request is sent so that the handler is already
         * in place when the first messages arrive.
         */
        session->global_topic_handler = on_unexpected_topic_message;

        /*
         * Register for notifications that this client has been
         * subscribed to, or unsubscribed from, a topic - whether by
         * its own request or at the instigation of another client.
         */
        notify_subscription_register(session,(NOTIFY_SUBSCRIPTION_PARAMS_T) { .on_notify_subscription = on_notify_subscription });
        notify_unsubscription_register(session, (NOTIFY_UNSUBSCRIPTION_PARAMS_T) { .on_notify_unsubscription = on_notify_unsubscription });

        /*
         * When issuing commands to Diffusion (in this case, subscribe
         * to a topic), it's typical that more than one message may be
         * received in response and a handler can be installed for
         * each message type. For this subscription we install
         * handlers for:
         * 1. The topic message data (on_topic_message).
         * 2. Notification that the subscription has been received
         *    (on_subscribe).
         */
        subscribe(session, (SUBSCRIPTION_PARAMS_T) { .topic_selector = topic, .on_topic_message = on_topic_message, .on_subscribe = on_subscribe });

        /*
         * Receive messages for 5 seconds.
         */
        sleep(5);

        /*
         * Unsubscribe from the topic
         */
        unsubscribe(session, (UNSUBSCRIPTION_PARAMS_T) {.topic_selector = topic, .on_unsubscribe = on_unsubscribe} );

        /*
         * Wait for any unsubscription notifications to be received.
         */
        sleep(5);

        /*
         * Politely tell Diffusion we're closing down.
         */
        session_close(session, NULL);
        session_free(session);

        return EXIT_SUCCESS;
}

Change the URL from that provided in the example to the URL of the Diffusion™ server.