Example: Receive missing topic notifications
The following examples use the TopicControl feature in the Unified API to register a missing topic notification handler.
JavaScript
var diffusion = require('diffusion'); // Connect to the server. Change these options to suit your own environment. // Node.js will not accept self-signed certificates by default. If you have // one of these, set the environment variable NODE_TLS_REJECT_UNAUTHORIZED=0 // before running this example. diffusion.connect({ host : 'diffusion.example.com', port : 443, secure : true }).then(function(session) { // Register a missing topic handler on the "example" root topic // Any subscriptions to missing topics along this path will invoke this handler session.topics.addMissingTopicHandler("example", { // Called when a handler is successfully registered onRegister : function(path, close) { console.log("Registered missing topic handler on path: " + path); // Once we've registered the handler, we initiate a subscription with the selector "?example/topic/.*" // This will invoke the handler. session.subscribe("?example/topic/.*").on('subscribe', function(type, path) { console.log("Subscribed to topic: " + path); }); }, // Called when the handler is closed onClose : function(path) { console.log("Missing topic handler on path '" + path + "' has been closed"); }, // Called if there is an error on the handler onError : function(path, error) { console.log("Error on missing topic handler"); }, // Called when we've received a missing topic notification on our registered handler path onMissingTopic : function(notification) { console.log("Received missing topic notification with selector: " + notification.selector); // Once we've received the missing topic notification initiated from subscribing to "?example/topic/.*", // we add a topic that will match the selector var topic = "example/topic/foo"; session.topics.add(topic).then(function(result) { console.log("Topic add success: " + topic); // If the topic addition is successful, we proceed() with the session's subscription. // The client will now be subscribed to the topic notification.proceed(); }, function(reason) { console.log("Topic add failed: " + reason); // If the topic addition fails, we cancel() the session's subscription request. notification.cancel(); }); } }); });
Apple
@import Diffusion; @interface MissingTopicHandlerExample (PTDiffusionMissingTopicHandler) <PTDiffusionMissingTopicHandler> @end @implementation MissingTopicHandlerExample { PTDiffusionSession* _session; } -(void)startWithURL:(NSURL*)url { PTDiffusionCredentials *const credentials = [[PTDiffusionCredentials alloc] initWithPassword:@"password"]; PTDiffusionSessionConfiguration *const sessionConfiguration = [[PTDiffusionSessionConfiguration alloc] initWithPrincipal:@"control" credentials:credentials]; NSLog(@"Connecting..."); [PTDiffusionSession openWithURL:url configuration:sessionConfiguration completionHandler:^(PTDiffusionSession *session, NSError *error) { if (!session) { NSLog(@"Failed to open session: %@", error); return; } // At this point we now have a connected session. NSLog(@"Connected."); // Set ivar to maintain a strong reference to the session. _session = session; // Register as missing topic handler for a branch of the topic tree. [self registerAsMissingTopicHandlerForSession:session]; }]; } -(void)registerAsMissingTopicHandlerForSession:(PTDiffusionSession *const)session { [session.topicControl addMissingTopicHandler:self forTopicPath:@"Example/Control Client Handler" completionHandler:^(PTDiffusionTopicTreeRegistration *const registration, NSError *const error) { if (registration) { NSLog(@"Registered as missing topic handler."); } else { NSLog(@"Failed to register as missing topic handler. Error: %@", error); } }]; } @end @implementation MissingTopicHandlerExample (PTDiffusionMissingTopicHandler) -(void)diffusionTopicTreeRegistration:(PTDiffusionTopicTreeRegistration *const)registration hadMissingTopicNotification:(PTDiffusionMissingTopicNotification *const)notification { NSString *const expression = notification.topicSelectorExpression; NSLog(@"Received Missing Topic Notification: %@", expression); // Expect a path pattern expression. if (![expression hasPrefix:@">"]) { NSLog(@"Topic selector expression is not a path pattern."); return; } // Extract topic path from path pattern expression. NSString *const topicPath = [expression substringFromIndex:1]; // Add a stateless topic at this topic path. [_session.topicControl addWithTopicPath:topicPath type:PTDiffusionTopicType_Stateless value:nil completionHandler:^(NSError *const error) { if (error) { NSLog(@"Failed to add topic."); return; } // Topic added so allow subscriber to proceed. [notification proceed]; }]; } @end
Java
and Android
package com.pushtechnology.diffusion.examples; import com.pushtechnology.diffusion.client.Diffusion; import com.pushtechnology.diffusion.client.features.RegisteredHandler; import com.pushtechnology.diffusion.client.features.control.topics.TopicAddFailReason; import com.pushtechnology.diffusion.client.features.control.topics.TopicControl; import com.pushtechnology.diffusion.client.features.control.topics.TopicControl.MissingTopicHandler; import com.pushtechnology.diffusion.client.features.control.topics.TopicControl.MissingTopicNotification; import com.pushtechnology.diffusion.client.session.Session; import com.pushtechnology.diffusion.client.topics.details.TopicDetails; import com.pushtechnology.diffusion.client.topics.details.TopicType; /** * An example of registering a missing topic notification handler and processing * notifications using a control client. * * @author Push Technology Limited */ public final class ControlClientHandlingMissingTopicNotification { // UCI features private final Session session; private final TopicControl topicControl; private final TopicDetails details; /** * Constructor. */ public ControlClientHandlingMissingTopicNotification() { // Create a session session = Diffusion.sessions().password("password").principal("admin").open("ws://diffusion.example.com:8080"); topicControl = session.feature(TopicControl.class); // Registers a missing topic notification on a topic path topicControl.addMissingTopicHandler("A", new MissingTopicNotificationHandler()); // For details that may be reused many times it is far more efficient to // create just once - this creates a default string type details. details = topicControl.newDetails(TopicType.SINGLE_VALUE); } // Private class that implements MissingTopicHandler interface private final class MissingTopicNotificationHandler implements MissingTopicHandler { /** * @param topicPath * - the path that the handler is active for * @param registeredHandler * - allows the handler to be closed */ @Override public void onActive(String topicPath, RegisteredHandler registeredHandler) { } /** * @param topicPath * - the branch of the topic tree for which the handler was * registered */ @Override public void onClose(String topicPath) { } /** * @param notification * - the missing topic details */ @Override public void onMissingTopic(MissingTopicNotification notification) { // Create a topic and do process() in the callback topicControl.addTopic(notification.getTopicPath(), details, new AddTopicCallback(notification)); } } private final class AddTopicCallback implements TopicControl.AddCallback { private final MissingTopicNotification notification; AddTopicCallback(MissingTopicNotification notification) { this.notification = notification; } @Override public void onDiscard() { } /** * @param topicPath * - the topic path as supplied to the add request * @param reason * - the reason for failure */ @Override public void onTopicAddFailed(String topicPath, TopicAddFailReason reason) { // Cancel the notification because the server have failed to notification.cancel(); } /** * @param topicPath * - the full path of the topic that was added */ @Override public void onTopicAdded(String topicPath) { // Proceed the notification notification.proceed(); } } }
.NET
using System.Threading.Tasks; using PushTechnology.ClientInterface.Client.Factories; using PushTechnology.ClientInterface.Client.Features; using PushTechnology.ClientInterface.Client.Features.Control.Topics; using PushTechnology.ClientInterface.Client.Session; namespace Examples { public class ControlClientMissingTopicNotification { private readonly ISession clientSession; private readonly ITopicControl topicControl; public ControlClientMissingTopicNotification() { clientSession = Diffusion.Sessions.Principal( "client" ).Password( "password" ) .Open( "ws://diffusion.example.com:80" ); topicControl = Diffusion.Sessions.Principal( "control" ).Password( "password" ) .Open( "ws://diffusion.example.com:80" ).GetTopicControlFeature(); Subscribe( "some/path10" ); } /// <summary> /// Subscribes to a topic which may or may not be missing. /// </summary> /// <param name="topicPath">The path of the topic to subscribe to.</param> public async void Subscribe( string topicPath ) { var missingTopicHandler = new MissingTopicHandler(); // Add the 'missing topic handler' to the topic control object topicControl.AddMissingTopicHandler( topicPath, missingTopicHandler ); // Wait for the successful registration of the handler var registeredHandler = await missingTopicHandler.OnActiveCalled; var topics = clientSession.GetTopicsFeature(); var topicCompletion = new TaskCompletionSource<bool>(); // Attempt to subscribe to the topic topics.Subscribe( topicPath, new TopicsCompletionCallback( topicCompletion ) ); await topicCompletion.Task; // Wait and see if a missing topic notification is generated var request = await missingTopicHandler.OnMissingTopicCalled; // Cancel the client request on the server request.Cancel(); // Close the registered handler registeredHandler.Close(); // All events in Diffusion are asynchronous, so we must wait for the close to happen await missingTopicHandler.OnCloseCalled; } private class TopicsCompletionCallback : ITopicsCompletionCallback { private readonly TaskCompletionSource<bool> theCompletionSource; public TopicsCompletionCallback( TaskCompletionSource<bool> source ) { theCompletionSource = source; } /// <summary> /// This is called to notify that a call context was closed prematurely, typically due to a timeout or the /// session being closed. No further calls will be made for the context. /// </summary> public void OnDiscard() { theCompletionSource.SetResult( false ); } /// <summary> /// Called to indicate that the requested operation has been processed by the server. /// </summary> public void OnComplete() { theCompletionSource.SetResult( true ); } } /// <summary> /// Asynchronous helper class for handling missing topic notifications. /// </summary> private class MissingTopicHandler : IMissingTopicHandler { private readonly TaskCompletionSource<IRegisteredHandler> onActive = new TaskCompletionSource<IRegisteredHandler>(); private readonly TaskCompletionSource<IMissingTopicNotification> onMissingTopic = new TaskCompletionSource<IMissingTopicNotification>(); private readonly TaskCompletionSource<bool> onClose = new TaskCompletionSource<bool>(); /// <summary> /// Waits for the 'OnActive' event to be called. /// </summary> public Task<IRegisteredHandler> OnActiveCalled { get { return onActive.Task; } } /// <summary> /// Waits for the 'OnMissingTopic' event to be called. /// </summary> public Task<IMissingTopicNotification> OnMissingTopicCalled { get { return onMissingTopic.Task; } } public Task OnCloseCalled { get { return onClose.Task; } } /// <summary> /// Called when a client session requests a topic that does not exist, and the topic path belongs to part of /// the topic tree for which this handler was registered. /// /// The handler implementation should take the appropriate action (for example, create the topic), and then /// call IMissingTopicNotification.Proceed on the supplied notification. This allows the client request to /// continue and successfully resolve against the topic if it was created. /// /// A handler should always call Proceed() otherwise resources will continue to be reserved on the server /// and the client's request will not complete. /// </summary> /// <param name="notification">The client notification object.</param> void IMissingTopicHandler.OnMissingTopic( IMissingTopicNotification notification ) { onMissingTopic.SetResult( notification ); } /// <summary> /// Called when the handler has been successfully registered with the server. /// /// A session can register a single handler of each type for a given branch of the topic tree. If there is /// already a handler registered for the topic path the operation will fail, <c>registeredHandler</c> will /// be closed, and the session error handler will be notified. To change the handler, first close the /// previous handler. /// </summary> /// <param name="topicPath">The path that the handler is active for.</param> /// <param name="registeredHandler">Allows the handler to be closed.</param> void ITopicTreeHandler.OnActive( string topicPath, IRegisteredHandler registeredHandler ) { onActive.SetResult( registeredHandler ); } /// <summary> /// Called if the handler is closed. This happens if the call to register the handler fails, or the handler /// is unregistered. /// </summary> /// <param name="topicPath">The branch of the topic tree for which the handler was registered.</param> void ITopicTreeHandler.OnClose( string topicPath ) { onClose.TrySetResult( false ); } } } }
C
/* * This example shows how to register a missing topic notification * handler and return a missing topic notification response - calling * missing_topic_proceed() once we've created the topic. */ #include <stdio.h> #include <stdlib.h> #include <time.h> #include <unistd.h> #include <apr.h> #include <apr_thread_mutex.h> #include <apr_thread_cond.h> #include "diffusion.h" #include "args.h" ARG_OPTS_T arg_opts[] = { ARG_OPTS_HELP, {'u', "url", "Diffusion server URL", ARG_OPTIONAL, ARG_HAS_VALUE, "ws://localhost:8080"}, {'p', "principal", "Principal (username) for the connection", ARG_OPTIONAL, ARG_HAS_VALUE, NULL}, {'c', "credentials", "Credentials (password) for the connection", ARG_OPTIONAL, ARG_HAS_VALUE, NULL}, {'r', "topic_root", "Topic root to process missing topic notifications on", ARG_OPTIONAL, ARG_HAS_VALUE, "foo"}, END_OF_ARG_OPTS }; static int on_topic_added(SESSION_T *session, const SVC_ADD_TOPIC_RESPONSE_T *response, void *context) { puts("Topic added"); return HANDLER_SUCCESS; } static int on_topic_add_failed(SESSION_T *session, const SVC_ADD_TOPIC_RESPONSE_T *response, void *context) { puts("Topic add failed"); printf("Reason code: %d\n", response->reason); return HANDLER_SUCCESS; } static int on_topic_add_discard(SESSION_T *session, void *context) { puts("Topic add discarded"); return HANDLER_SUCCESS; } /* * A request has been made for a topic that doesn't exist; create it * and inform Diffusion that the client's subcription request can * proceed. */ static int on_missing_topic(SESSION_T *session, const SVC_MISSING_TOPIC_REQUEST_T *request, void *context) { printf("Missing topic: %s\n", request->topic_selector); BUF_T *sample_data_buf = buf_create(); buf_write_string(sample_data_buf, "Hello, world"); // Add the missing topic. ADD_TOPIC_PARAMS_T topic_params = { .on_topic_added = on_topic_added, .on_topic_add_failed = on_topic_add_failed, .on_discard = on_topic_add_discard, .topic_path = strdup(request->topic_selector+1), .details = create_topic_details_single_value(M_DATA_TYPE_STRING), .content = content_create(CONTENT_ENCODING_NONE, sample_data_buf) }; add_topic(session, topic_params); // Proceed with the client's subscription to the topic missing_topic_proceed(session, (SVC_MISSING_TOPIC_REQUEST_T *) request); return HANDLER_SUCCESS; } /* * Entry point for the example. */ int main(int argc, char **argv) { /* * Standard command-line parsing. */ HASH_T *options = parse_cmdline(argc, argv, arg_opts); if(options == NULL || hash_get(options, "help") != NULL) { show_usage(argc, argv, arg_opts); return EXIT_FAILURE; } const char *url = hash_get(options, "url"); const char *principal = hash_get(options, "principal"); const char *topic_root = hash_get(options, "topic_root"); CREDENTIALS_T *credentials = NULL; const char *password = hash_get(options, "credentials"); if(password != NULL) { credentials = credentials_create_password(password); } SESSION_T *session; DIFFUSION_ERROR_T error = { 0 }; session = session_create(url, principal, credentials, NULL, NULL, &error); if(session != NULL) { printf("Session created (state=%d, id=%s)\n", session_state_get(session), session_id_to_string(session->id)); } else { printf("Failed to create session: %s\n", error.message); free(error.message); return EXIT_FAILURE; } /* * Register the missing topic handler */ MISSING_TOPIC_PARAMS_T handler = { .on_missing_topic = on_missing_topic, .topic_path = topic_root, .context = NULL }; missing_topic_register_handler(session, handler); /* * Run for 5 minutes. */ sleep(5 * 60); /* * Close session and clean up. */ session_close(session, NULL); session_free(session); hash_free(options, NULL, free); return EXIT_SUCCESS; }
Change the URL from that provided in the example to the URL of the Diffusion™ server.
This page last modified: 2015/06/18