Example: Subscribe to a topic
The following examples use the Unified API to subscribe to topics and assign handlers to topics to receive the topic content.
JavaScript
// Connects to the Diffusion server and demonstrates subscribing, unsubscribing,
// topic selectors and subscription transformers.
diffusion.connect({
    host : 'diffusion.example.com',
    port : 443,
    secure : true
}).then(function(session) {

    // 1. Subscriptions are how sessions receive streams of data from the server.

    // When subscribing, a topic selector is used to select which topics to subscribe to. Topics do not need to exist
    // at the time of subscription - the server dynamically resolves subscriptions as topics are added or removed.

    // Subscribe to the "foo" topic with an inline callback function
    var subscription = session.subscribe('foo', function(update) {
        // Log the new value whenever the 'foo' topic is updated
        // By default, we get a Buffer object which preserves binary
        // data.
        console.log(update);
    });

    // Callbacks can also be registered after the subscription has occurred
    subscription.on({
        update : function(value, topic) {
            console.log('Update for topic: ' + topic, value);
        },
        subscribe : function(details, topic) {
            console.log('Subscribed to topic: ' + topic);
        },
        unsubscribe : function(reason, topic) {
            console.log('Unsubscribed from topic:' + topic);
            // Close the subscription stream once the session is unsubscribed.
            subscription.close();
        }
    });

    // 2. Sessions may unsubscribe from any topic to stop receiving data

    // Unsubscribe from the "foo" topic. Sessions do not need to have previously been subscribed to the topics they are
    // unsubscribing from. Unsubscribing from a topic will result in the 'unsubscribe' callback registered above being
    // called.
    session.unsubscribe('foo');

    // 3. Subscriptions / Unsubscriptions can select multiple topics using Topic Selectors

    // Topic Selectors provide regex-like capabilities for subscribing to topics. These are resolved dynamically, much
    // like subscribing to a single topic.
    var subscription2 = session.subscribe('?foo/.*/[a-z]');

    // 4. Subscriptions can use transformers to convert update values

    // Subscribe to a topic and then convert all received values to JSON. Transforming a subscription creates a new
    // subscription stream, rather than modifying the original. This assumes that the topic is a single value topic
    // receiving stringified JSON and is not a JSON topic.
    session.subscribe('bar').transform(JSON.parse).on('update', function(value, topic) {
        console.log('Got JSON update for topic: ' + topic, value);
    });

    // 5. Metadata can be used within transformers to parse data

    // Create a simple metadata instance
    var meta = new diffusion.metadata.RecordContent();

    // Add a single record/field
    meta.addRecord('record', { 'field' : meta.string('some-value') });

    // Subscribe to a topic and transform with the metadata
    session.subscribe('baz').transform(meta).on('update', function(value) {
        console.log('Field value: ', value.get('record').get('field'));
    });
}, function(error) {
    // Report connection failures; without this handler a rejected connection
    // attempt would pass unnoticed and none of the code above would run.
    console.error('Failed to connect to Diffusion: ', error);
});
Apple
@import Diffusion;

// Category through which the example adopts the topic stream delegate protocol.
@interface SubscribeUnsubscribeExample (PTDiffusionTopicStreamDelegate) <PTDiffusionTopicStreamDelegate>
@end

@implementation SubscribeUnsubscribeExample {
    // Strong reference that keeps the opened session alive.
    PTDiffusionSession* _session;
}

// Opens a session with the server at `url`, registers this object as the
// fallback topic stream delegate and schedules the first subscribe request.
-(void)startWithURL:(NSURL*)url {
    NSLog(@"Connecting...");

    [PTDiffusionSession openWithURL:url
                  completionHandler:^(PTDiffusionSession *session, NSError *error)
    {
        if (!session) {
            NSLog(@"Failed to open session: %@", error);
            return;
        }

        // The session is connected from here on.
        NSLog(@"Connected.");

        // Hold on to the session through the ivar so that it is not released.
        self->_session = session;

        // Topic updates with no dedicated stream are delivered to self.
        [session.topics addFallbackTopicStreamWithDelegate:self];

        // Subscribe after a 5 second pause.
        [self performSelector:@selector(subscribe:)
                   withObject:session
                   afterDelay:5.0];
    }];
}

// Topic selector expression shared by the subscribe and unsubscribe requests.
static NSString *const _TopicSelectorExpression = @"*Assets//";

// Sends a subscribe request using the session passed as `object`; on success
// an unsubscribe request is scheduled 5 seconds later.
-(void)subscribe:(const id)object {
    PTDiffusionSession *const activeSession = object;

    NSLog(@"Subscribing...");

    [activeSession.topics subscribeWithTopicSelectorExpression:_TopicSelectorExpression
                                             completionHandler:^(NSError * const error)
    {
        if (error) {
            NSLog(@"Subscribe request failed. Error: %@", error);
            return;
        }

        NSLog(@"Subscribe request succeeded.");

        // Unsubscribe after a 5 second pause.
        [self performSelector:@selector(unsubscribe:)
                   withObject:activeSession
                   afterDelay:5.0];
    }];
}

// Sends an unsubscribe request using the session passed as `object`; on
// success a new subscribe request is scheduled 5 seconds later.
-(void)unsubscribe:(const id)object {
    PTDiffusionSession *const activeSession = object;

    NSLog(@"Unsubscribing...");

    [activeSession.topics unsubscribeFromTopicSelectorExpression:_TopicSelectorExpression
                                               completionHandler:^(NSError * const error)
    {
        if (error) {
            NSLog(@"Unsubscribe request failed. Error: %@", error);
            return;
        }

        NSLog(@"Unsubscribe request succeeded.");

        // Subscribe again after a 5 second pause.
        [self performSelector:@selector(subscribe:)
                   withObject:activeSession
                   afterDelay:5.0];
    }];
}

@end

@implementation SubscribeUnsubscribeExample (PTDiffusionTopicStreamDelegate)

// Logs the topic path together with the update content decoded as UTF-8 text.
-(void)diffusionStream:(PTDiffusionStream * const)stream
    didUpdateTopicPath:(NSString * const)topicPath
               content:(PTDiffusionContent * const)content
               context:(PTDiffusionUpdateContext * const)context {
    NSData *const payload = content.data;
    NSString *const text = [[NSString alloc] initWithData:payload
                                                 encoding:NSUTF8StringEncoding];
    NSLog(@"\t%@ = \"%@\"", topicPath, text);
}

// Logs the path and details of a topic the session has been subscribed to.
-(void)     diffusionStream:(PTDiffusionStream * const)stream
    didSubscribeToTopicPath:(NSString * const)topicPath
                    details:(PTDiffusionTopicDetails * const)details {
    NSLog(@"Subscribed: \"%@\" (%@)", topicPath, details);
}

// Logs the path of a topic the session has been unsubscribed from, and why.
-(void)         diffusionStream:(PTDiffusionStream *const)stream
    didUnsubscribeFromTopicPath:(NSString *const)topicPath
                         reason:(const PTDiffusionTopicUnsubscriptionReason)reason {
    NSString *const reasonText = PTDiffusionTopicUnsubscriptionReasonToString(reason);
    NSLog(@"Unsubscribed: \"%@\" [Reason: %@]", topicPath, reasonText);
}

@end
Java and Android
import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.pushtechnology.diffusion.client.Diffusion; import com.pushtechnology.diffusion.client.content.Content; import com.pushtechnology.diffusion.client.features.Topics; import com.pushtechnology.diffusion.client.features.Topics.ValueStream; import com.pushtechnology.diffusion.client.session.Session; import com.pushtechnology.diffusion.client.topics.details.TopicSpecification; import com.pushtechnology.diffusion.datatype.json.JSON; /** * In this simple and commonest case for a client we just subscribe to a few * topics and assign handlers for each to receive content. * <P> * This makes use of the 'Topics' feature only. * <P> * To subscribe to a topic, the client session must have the * 'select_topic' and 'read_topic' permissions for that branch of the * topic tree. * * @author Push Technology Limited * @since 5.0 */ public final class ClientSimpleSubscriber { private static final Logger LOG = LoggerFactory.getLogger(ClientSimpleSubscriber.class); private final Session session; /** * Constructor. */ public ClientSimpleSubscriber() { session = Diffusion.sessions().principal("client").password("password") .open("ws://diffusion.example.com:80"); // Use the Topics feature to add a topic stream for // Foo and all topics under Bar and request subscription to those topics final Topics topics = session.feature(Topics.class); topics.addStream(">Foo", Content.class, new FooStream()); topics.addStream(">Bar//", JSON.class, new BarStream()); topics.subscribe( Diffusion.topicSelectors().anyOf("Foo", "Bar//"), new Topics.CompletionCallback.Default()); } /** * Close session. */ public void close() { session.close(); } /** * The stream for all messages on the 'Foo' topic. 
*/ private class FooStream extends ValueStream.Default<Content> { @Override public void onValue( String topicPath, TopicSpecification specification, Content oldValue, Content newValue) { LOG.info(newValue.asString()); } } /** * The stream for all messages on 'Bar' topics which are JSON topics. */ private class BarStream extends ValueStream.Default<JSON> { @Override public void onValue( String topicPath, TopicSpecification specification, JSON oldValue, JSON newValue) { LOG.info(newValue.toJsonString()); } } }
.NET
using System;
using System.Threading;
using PushTechnology.ClientInterface.Client.Callbacks;
using PushTechnology.ClientInterface.Client.Content;
using PushTechnology.ClientInterface.Client.Factories;
using PushTechnology.ClientInterface.Client.Features;
using PushTechnology.ClientInterface.Client.Features.Topics;
using PushTechnology.ClientInterface.Client.Topics.Details;

namespace PushTechnology.ClientInterface.GettingStarted {
    /// <summary>
    /// A client that subscribes to the topic 'foo/counter'.
    /// </summary>
    public sealed class SubscribingClient {
        /// <summary>
        /// Opens a session, subscribes to 'foo/counter' for one minute and then closes the session.
        /// </summary>
        /// <param name="args">command line arguments (not used)</param>
        public static void Main( string[] args ) {
            // Connect anonymously
            var session = Diffusion.Sessions.Open( "ws://localhost:8080" );

            try {
                // Get the Topics feature to subscribe to topics
                var topics = session.GetTopicsFeature();

                // Add a topic stream for 'foo/counter' and request subscription
                topics.AddStream( ">foo/counter", new CounterTopicStream() );
                topics.Subscribe( ">foo/counter", new TopicsCompletionCallbackDefault() );

                // Stay connected for 1 minute
                Thread.Sleep( TimeSpan.FromMinutes( 1 ) );
            } finally {
                // Close the session even if registering the stream, subscribing or sleeping throws.
                session.Close();
            }
        }
    }

    /// <summary>
    /// A simple IValueStream implementation that writes every notification to the console.
    /// </summary>
    internal sealed class CounterTopicStream : IValueStream<IContent> {
        /// <summary>
        /// Notification of stream being closed normally.
        /// </summary>
        public void OnClose() {
            Console.WriteLine( "The subscription stream is now closed." );
        }

        /// <summary>
        /// Notification of a contextual error related to this callback.
        /// </summary>
        /// <remarks>
        /// Situations in which <code>OnError</code> is called include the session being closed, a communication
        /// timeout, or a problem with the provided parameters. No further calls will be made to this callback.
        /// </remarks>
        /// <param name="errorReason">the reason for the error</param>
        public void OnError( ErrorReason errorReason ) {
            Console.WriteLine( "An error has occured : {0}", errorReason );
        }

        /// <summary>
        /// Notification of a successful subscription.
        /// </summary>
        /// <param name="topicPath">topic</param>
        /// <param name="specification">the specification of the topic</param>
        public void OnSubscription( string topicPath, ITopicSpecification specification ) {
            Console.WriteLine( "Client subscribed to {0} ", topicPath );
        }

        /// <summary>
        /// Notification of a successful unsubscription.
        /// </summary>
        /// <param name="topicPath">topic</param>
        /// <param name="specification">the specification of the topic</param>
        /// <param name="reason">the reason for the unsubscription</param>
        public void OnUnsubscription( string topicPath, ITopicSpecification specification,
            TopicUnsubscribeReason reason ) {
            Console.WriteLine( "Client unsubscribed from {0} : {1}", topicPath, reason );
        }

        /// <summary>
        /// Topic update received.
        /// </summary>
        /// <param name="topicPath">topic</param>
        /// <param name="specification">the specification of the topic</param>
        /// <param name="oldValue">value prior to update</param>
        /// <param name="newValue">value after update</param>
        public void OnValue( string topicPath, ITopicSpecification specification, IContent oldValue,
            IContent newValue ) {
            Console.WriteLine( "New value of {0} is {1}", topicPath, newValue.AsString() );
        }
    }
}
C
/* * This is a sample client which connects to Diffusion and subscribes * to topics using a user-specified selector. Any messages received on * those topics are then displayed to standard output. */ #include <stdio.h> #include <unistd.h> #include "diffusion.h" #include "args.h" ARG_OPTS_T arg_opts[] = { ARG_OPTS_HELP, {'u', "url", "Diffusion server URL", ARG_OPTIONAL, ARG_HAS_VALUE, "ws://localhost:8080"}, {'t', "topic_selector", "Topic selector", ARG_REQUIRED, ARG_HAS_VALUE, NULL}, END_OF_ARG_OPTS }; /* * This callback is used when the session state changes, e.g. when a session * moves from a "connecting" to a "connected" state, or from "connected" to * "closed". */ static void on_session_state_changed(SESSION_T *session, const SESSION_STATE_T old_state, const SESSION_STATE_T new_state) { printf("Session state changed from %s (%d) to %s (%d)\n", session_state_as_string(old_state), old_state, session_state_as_string(new_state), new_state); } /* * When a subscribed message is received, this callback is invoked. */ static int on_topic_message(SESSION_T *session, const TOPIC_MESSAGE_T *msg) { printf("Received message for topic %s\n", msg->name); printf("Payload: (%d bytes) %.*s\n", (int)msg->payload->len, (int)msg->payload->len, msg->payload->data); hexdump_buf(msg->payload); return HANDLER_SUCCESS; } /* * This callback is fired when Diffusion responds to say that a topic * subscription request has been received and processed. */ static int on_subscribe(SESSION_T *session, void *context_data) { printf("on_subscribe\n"); return HANDLER_SUCCESS; } /* * This is callback is for when Diffusion response to an unsubscription * request to a topic, and only indicates that the request has been received. */ static int on_unsubscribe(SESSION_T *session, void *context_data) { printf("on_unsubscribe\n"); return HANDLER_SUCCESS; } /* * Publishers and control clients may choose to subscribe any other client to * a topic of their choice at any time. 
We register this callback to capture * messages from these topics and display them. */ static int on_unexpected_topic_message(SESSION_T *session, const TOPIC_MESSAGE_T *msg) { printf("Received a message for a topic we didn't subscribe to (%s)\n", msg->name); printf("Payload: %.*s\n", (int)msg->payload->len, msg->payload->data); return HANDLER_SUCCESS; } /* * We use this callback when Diffusion notifies us that we've been subscribed * to a topic. Note that this could be called for topics that we haven't * explicitly subscribed to - other control clients or publishers may ask to * subscribe us to a topic. */ static int on_notify_subscription(SESSION_T *session, const SVC_NOTIFY_SUBSCRIPTION_REQUEST_T *request, void *context) { printf("on_notify_subscription: %d: \"%s\"\n", request->topic_info.topic_id, request->topic_info.topic_path); return HANDLER_SUCCESS; } /* * This callback is used when we receive notification that this client has been * unsubscribed from a specific topic. Causes of the unsubscription are the same * as those for subscription. */ static int on_notify_unsubscription(SESSION_T *session, const SVC_NOTIFY_UNSUBSCRIPTION_REQUEST_T *request, void *context) { printf("on_notify_unsubscription: ID: %d, Path: %s, Reason: %d\n", request->topic_id, request->topic_path, request->reason); return HANDLER_SUCCESS; } int main(int argc, char **argv) { /* * Standard command-line parsing */ HASH_T *options = parse_cmdline(argc, argv, arg_opts); if(options == NULL || hash_get(options, "help") != NULL) { show_usage(argc, argv, arg_opts); return EXIT_FAILURE; } char *url = hash_get(options, "url"); char *topic = hash_get(options, "topic_selector"); /* * A SESSION_LISTENER_T holds callbacks to inform the client * about changes to the state. Used here for informational * purposes only. */ SESSION_LISTENER_T session_listener = { 0 }; session_listener.on_state_changed = &on_session_state_changed; /* * Creating a session requires at least a URL. 
Creating a * session initiates a connection with Diffusion. */ DIFFUSION_ERROR_T error = { 0 }; SESSION_T *session = NULL; session = session_create(url, NULL, NULL, &session_listener, NULL, &error); if(session == NULL) { fprintf(stderr, "TEST: Failed to create session\n"); fprintf(stderr, "ERR : %s\n", error.message); return EXIT_FAILURE; } /* * When issuing commands to Diffusion (in this case, subscribe * to a topic), it's typical that more than one message may be * received in response and a handler can be installed for * each message type. In the case of subscription, we can * install handlers for: * 1. The topic message data (on_topic_message). * 2. Notification that the subscription has been received * (on_subscribe). * 3. Topic details (on_topic_details). */ notify_subscription_register(session,(NOTIFY_SUBSCRIPTION_PARAMS_T) { .on_notify_subscription = on_notify_subscription }); notify_unsubscription_register(session, (NOTIFY_UNSUBSCRIPTION_PARAMS_T) { .on_notify_unsubscription = on_notify_unsubscription }); subscribe(session, (SUBSCRIPTION_PARAMS_T) { .topic_selector = topic, .on_topic_message = on_topic_message, .on_subscribe = on_subscribe }); /* * Install a global topic handler to capture messages for * topics we haven't explicitly subscribed to, and therefore * don't have a specific handler for. */ session->global_topic_handler = on_unexpected_topic_message; /* * Receive messages for 5 seconds. */ sleep(5); /* * Unsubscribe from the topic */ unsubscribe(session, (UNSUBSCRIPTION_PARAMS_T) {.topic_selector = topic, .on_unsubscribe = on_unsubscribe} ); /* * Wait for any unsubscription notifications to be received. */ sleep(5); /* * Politely tell Diffusion we're closing down. */ session_close(session, NULL); session_free(session); return EXIT_SUCCESS; }
In each example, replace the example URL with the URL of your Diffusion™ server.
This page last modified: 2017/06/29