Just a second...

Example: Receive missing topic notifications

The following examples use the TopicControl feature in the Unified API to register a missing topic notification handler.

JavaScript
var diffusion = require('diffusion');

// Connect to the server. Change these options to suit your own environment.
// Node.js will not accept self-signed certificates by default. If you have
// one of these, set the environment variable NODE_TLS_REJECT_UNAUTHORIZED=0
// before running this example.
diffusion.connect({
    host   : 'diffusion.example.com',
    port   : 443,
    secure : true
}).then(function(session) {

    // Register a missing topic handler on the "example" root topic.
    // Any subscription to a missing topic along this path will invoke this handler.
    session.topics.addMissingTopicHandler("example", {
        // Called when the handler is successfully registered
        onRegister : function(path, close) {
            console.log("Registered missing topic handler on path: " + path);
            // Once we've registered the handler, we initiate a subscription with the selector "?example/topic/.*"
            // This will invoke the handler.
            session.subscribe("?example/topic/.*").on('subscribe', function(type, path) {
                console.log("Subscribed to topic: " + path);
            });
        },
        // Called when the handler is closed
        onClose : function(path) {
            console.log("Missing topic handler on path '" + path + "' has been closed");
        },
        // Called if there is an error on the handler; report the error
        // itself so the failure can be diagnosed.
        onError : function(path, error) {
            console.log("Error on missing topic handler: " + error);
        },
        // Called when we've received a missing topic notification on our registered handler path
        onMissingTopic : function(notification) {
            console.log("Received missing topic notification with selector: " + notification.selector);
            // Once we've received the missing topic notification initiated from subscribing to "?example/topic/.*",
            // we add a topic that will match the selector

            var topic = "example/topic/foo";

            session.topics.add(topic).then(function(result) {
                console.log("Topic add success: " + topic);
                // If the topic addition is successful, we proceed() with the session's subscription.
                // The client will now be subscribed to the topic
                notification.proceed();
            }, function(reason) {
                console.log("Topic add failed: " + reason);
                // If the topic addition fails, we cancel() the session's subscription request.
                notification.cancel();
            });
        }
    });

}, function(error) {
    // Without a rejection handler a failed connection attempt would go
    // unreported and the example would appear to do nothing.
    console.log("Failed to connect: " + error);
});
Apple
@import Diffusion;

// Category declaring that MissingTopicHandlerExample adopts the
// PTDiffusionMissingTopicHandler protocol. The category @implementation of
// the same name, further down this file, supplies the handler method.
// The primary @interface for MissingTopicHandlerExample is not part of this
// listing.
@interface MissingTopicHandlerExample (PTDiffusionMissingTopicHandler) <PTDiffusionMissingTopicHandler>
@end

@implementation MissingTopicHandlerExample {
    // Strong reference that keeps the opened session alive.
    PTDiffusionSession* _session;
}

// Opens a session to the server at the given URL as principal "control" and,
// once connected, registers this object as a missing topic handler.
-(void)startWithURL:(NSURL*)url {
    PTDiffusionCredentials *const passwordCredentials =
        [[PTDiffusionCredentials alloc] initWithPassword:@"password"];

    PTDiffusionSessionConfiguration *const configuration =
        [[PTDiffusionSessionConfiguration alloc] initWithPrincipal:@"control"
                                                       credentials:passwordCredentials];

    NSLog(@"Connecting...");

    [PTDiffusionSession openWithURL:url
                      configuration:configuration
                  completionHandler:^(PTDiffusionSession *openedSession, NSError *error)
    {
        // No session means the open attempt failed; report why and stop.
        if (openedSession == nil) {
            NSLog(@"Failed to open session: %@", error);
            return;
        }

        NSLog(@"Connected.");

        // Hold on to the session for as long as this object lives.
        self->_session = openedSession;

        // Handle missing topic notifications for a branch of the topic tree.
        [self registerAsMissingTopicHandlerForSession:openedSession];
    }];
}

// Registers this object as the missing topic handler for the
// "Example/Control Client Handler" topic path and logs the outcome.
-(void)registerAsMissingTopicHandlerForSession:(PTDiffusionSession *const)session {
    [session.topicControl addMissingTopicHandler:self
                                    forTopicPath:@"Example/Control Client Handler"
                               completionHandler:^(PTDiffusionTopicTreeRegistration *const registration, NSError *const error)
    {
        if (!registration) {
            NSLog(@"Failed to register as missing topic handler. Error: %@", error);
            return;
        }

        NSLog(@"Registered as missing topic handler.");
    }];
}

@end

@implementation MissingTopicHandlerExample (PTDiffusionMissingTopicHandler)

// Called when a session requests a topic that does not exist beneath the
// registered topic path.
//
// For a path pattern expression (one prefixed with ">") a stateless topic is
// added at that path and the requester is then allowed to proceed. Every
// code path answers the notification, so the requesting session is never
// left waiting on this handler.
-(void)diffusionTopicTreeRegistration:(PTDiffusionTopicTreeRegistration *const)registration
          hadMissingTopicNotification:(PTDiffusionMissingTopicNotification *const)notification {
    NSString *const expression = notification.topicSelectorExpression;
    NSLog(@"Received Missing Topic Notification: %@", expression);

    // Expect a path pattern expression.
    if (![expression hasPrefix:@">"]) {
        NSLog(@"Topic selector expression is not a path pattern.");

        // Nothing is created for other selector types, but the request must
        // still be answered so that it can complete.
        [notification proceed];
        return;
    }

    // Extract topic path from path pattern expression.
    NSString *const topicPath = [expression substringFromIndex:1];

    // Add a stateless topic at this topic path.
    [_session.topicControl addWithTopicPath:topicPath
                                       type:PTDiffusionTopicType_Stateless
                                      value:nil
                          completionHandler:^(NSError *const error)
    {
        if (error) {
            NSLog(@"Failed to add topic. Error: %@", error);

            // The topic could not be created, so discard the request.
            [notification cancel];
            return;
        }

        // Topic added so allow subscriber to proceed.
        [notification proceed];
    }];
}

@end
Java and Android
package com.pushtechnology.diffusion.examples;

import com.pushtechnology.diffusion.client.Diffusion;
import com.pushtechnology.diffusion.client.features.RegisteredHandler;
import com.pushtechnology.diffusion.client.features.control.topics.TopicAddFailReason;
import com.pushtechnology.diffusion.client.features.control.topics.TopicControl;
import com.pushtechnology.diffusion.client.features.control.topics.TopicControl.MissingTopicHandler;
import com.pushtechnology.diffusion.client.features.control.topics.TopicControl.MissingTopicNotification;
import com.pushtechnology.diffusion.client.session.Session;
import com.pushtechnology.diffusion.client.topics.details.TopicDetails;
import com.pushtechnology.diffusion.client.topics.details.TopicType;

/**
 * An example of registering a missing topic notification handler and processing
 * notifications using a control client.
 * <p>
 * When a notification arrives, a single value topic is added at the notified
 * topic path. The notification is then proceeded if the topic was added, or
 * cancelled if the add failed.
 *
 * @author Push Technology Limited
 */
public final class ControlClientHandlingMissingTopicNotification {

    // UCI features
    private final Session session;
    private final TopicControl topicControl;
    private final TopicDetails details;

    /**
     * Constructor.
     * <p>
     * Opens the session and registers the missing topic handler for the
     * topic path "A".
     */
    public ControlClientHandlingMissingTopicNotification() {
        // Create a session
        session = Diffusion.sessions().password("password").principal("admin").open("ws://diffusion.example.com:8080");

        topicControl = session.feature(TopicControl.class);

        // For details that may be reused many times it is far more efficient to
        // create just once - this creates a default string type details.
        // This must be assigned before the handler is registered, because the
        // handler reads this field and may be notified as soon as it is active.
        details = topicControl.newDetails(TopicType.SINGLE_VALUE);

        // Registers a missing topic notification handler on a topic path
        topicControl.addMissingTopicHandler("A", new MissingTopicNotificationHandler());

    }

    // Private class that implements MissingTopicHandler interface
    private final class MissingTopicNotificationHandler implements
            MissingTopicHandler {
        /**
         * @param topicPath
         *            - the path that the handler is active for
         * @param registeredHandler
         *            - allows the handler to be closed
         */
        @Override
        public void onActive(String topicPath, RegisteredHandler registeredHandler) {
        }

        /**
         * @param topicPath
         *            - the branch of the topic tree for which the handler was
         *            registered
         */
        @Override
        public void onClose(String topicPath) {
        }

        /**
         * @param notification
         *            - the missing topic details
         */
        @Override
        public void onMissingTopic(MissingTopicNotification notification) {
            // Create the topic; the callback calls proceed() or cancel() on
            // the notification once the outcome of the add is known
            topicControl.addTopic(notification.getTopicPath(), details, new AddTopicCallback(notification));
        }
    }

    // Completes a missing topic notification according to the outcome of the
    // topic add that was started on its behalf
    private final class AddTopicCallback implements TopicControl.AddCallback {
        private final MissingTopicNotification notification;

        AddTopicCallback(MissingTopicNotification notification) {
            this.notification = notification;
        }

        @Override
        public void onDiscard() {
        }

        /**
         * @param topicPath
         *            - the topic path as supplied to the add request
         * @param reason
         *            - the reason for failure
         */
        @Override
        public void onTopicAddFailed(String topicPath, TopicAddFailReason reason) {
            // Cancel the notification because the server has failed to add
            // the topic
            notification.cancel();
        }

        /**
         * @param topicPath
         *            - the full path of the topic that was added
         */
        @Override
        public void onTopicAdded(String topicPath) {
            // Proceed the notification
            notification.proceed();
        }

    }

}
.NET
using System.Threading.Tasks;
using PushTechnology.ClientInterface.Client.Factories;
using PushTechnology.ClientInterface.Client.Features;
using PushTechnology.ClientInterface.Client.Features.Control.Topics;
using PushTechnology.ClientInterface.Client.Session;

namespace Examples {
    /// <summary>
    /// Registers a missing topic handler through a control session, triggers it by subscribing from a separate
    /// client session, then cancels the resulting request and closes the handler.
    /// </summary>
    public class ControlClientMissingTopicNotification {
        private readonly ISession clientSession;
        private readonly ITopicControl topicControl;

        public ControlClientMissingTopicNotification() {
            clientSession = Diffusion.Sessions.Principal( "client" ).Password( "password" )
                .Open( "ws://diffusion.example.com:80" );

            topicControl = Diffusion.Sessions.Principal( "control" ).Password( "password" )
                .Open( "ws://diffusion.example.com:80" ).GetTopicControlFeature();

            Subscribe( "some/path10" );
        }

        /// <summary>
        /// Subscribes to a topic which may or may not be missing.
        /// </summary>
        /// <param name="topicPath">The path of the topic to subscribe to.</param>
        public async void Subscribe( string topicPath ) {
            var missingTopicHandler = new MissingTopicHandler();

            // Add the 'missing topic handler' to the topic control object
            topicControl.AddMissingTopicHandler( topicPath, missingTopicHandler );

            // Wait for the successful registration of the handler
            var registeredHandler = await missingTopicHandler.OnActiveCalled;

            var topics = clientSession.GetTopicsFeature();

            var topicCompletion = new TaskCompletionSource<bool>();

            // Attempt to subscribe to the topic
            topics.Subscribe( topicPath, new TopicsCompletionCallback( topicCompletion ) );

            await topicCompletion.Task;

            // Wait and see if a missing topic notification is generated
            var request = await missingTopicHandler.OnMissingTopicCalled;

            // Cancel the client request on the server
            request.Cancel();

            // Close the registered handler
            registeredHandler.Close();

            // All events in Diffusion are asynchronous, so we must wait for the close to happen
            await missingTopicHandler.OnCloseCalled;
        }

        private class TopicsCompletionCallback : ITopicsCompletionCallback {
            private readonly TaskCompletionSource<bool> theCompletionSource;

            public TopicsCompletionCallback( TaskCompletionSource<bool> source ) {
                theCompletionSource = source;
            }

            /// <summary>
            /// This is called to notify that a call context was closed prematurely, typically due to a timeout or the
            /// session being closed. No further calls will be made for the context.
            /// </summary>
            public void OnDiscard() {
                theCompletionSource.SetResult( false );
            }

            /// <summary>
            /// Called to indicate that the requested operation has been processed by the server.
            /// </summary>
            public void OnComplete() {
                theCompletionSource.SetResult( true );
            }
        }

        /// <summary>
        /// Asynchronous helper class for handling missing topic notifications.
        /// </summary>
        private class MissingTopicHandler : IMissingTopicHandler {
            private readonly TaskCompletionSource<IRegisteredHandler> onActive =
                new TaskCompletionSource<IRegisteredHandler>();

            private readonly TaskCompletionSource<IMissingTopicNotification> onMissingTopic =
                new TaskCompletionSource<IMissingTopicNotification>();

            private readonly TaskCompletionSource<bool> onClose = new TaskCompletionSource<bool>();

            /// <summary>
            /// Waits for the 'OnActive' event to be called.
            /// </summary>
            public Task<IRegisteredHandler> OnActiveCalled {
                get {
                    return onActive.Task;
                }
            }

            /// <summary>
            /// Waits for the 'OnMissingTopic' event to be called.
            /// </summary>
            public Task<IMissingTopicNotification> OnMissingTopicCalled {
                get {
                    return onMissingTopic.Task;
                }
            }

            /// <summary>
            /// Waits for the 'OnClose' event to be called.
            /// </summary>
            public Task OnCloseCalled {
                get {
                    return onClose.Task;
                }
            }

            /// <summary>
            /// Called when a client session requests a topic that does not exist, and the topic path belongs to part of
            /// the topic tree for which this handler was registered.
            ///
            /// The handler implementation should take the appropriate action (for example, create the topic), and then
            /// call IMissingTopicNotification.Proceed on the supplied notification. This allows the client request to
            /// continue and successfully resolve against the topic if it was created.
            ///
            /// A handler should always call Proceed() otherwise resources will continue to be reserved on the server
            /// and the client's request will not complete.
            ///
            /// Only the first notification is handed to the awaiting caller. Any further notification is cancelled
            /// here, so that it is answered rather than raising an exception from completing the task a second time.
            /// </summary>
            /// <param name="notification">The client notification object.</param>
            void IMissingTopicHandler.OnMissingTopic( IMissingTopicNotification notification ) {
                if ( !onMissingTopic.TrySetResult( notification ) ) {
                    notification.Cancel();
                }
            }

            /// <summary>
            /// Called when the handler has been successfully registered with the server.
            ///
            /// A session can register a single handler of each type for a given branch of the topic tree. If there is
            /// already a handler registered for the topic path the operation will fail, <c>registeredHandler</c> will
            /// be closed, and the session error handler will be notified. To change the handler, first close the
            /// previous handler.
            /// </summary>
            /// <param name="topicPath">The path that the handler is active for.</param>
            /// <param name="registeredHandler">Allows the handler to be closed.</param>
            void ITopicTreeHandler.OnActive( string topicPath, IRegisteredHandler registeredHandler ) {
                onActive.SetResult( registeredHandler );
            }

            /// <summary>
            /// Called if the handler is closed. This happens if the call to register the handler fails, or the handler
            /// is unregistered.
            /// </summary>
            /// <param name="topicPath">The branch of the topic tree for which the handler was registered.</param>
            void ITopicTreeHandler.OnClose( string topicPath ) {
                onClose.TrySetResult( false );
            }
        }
    }
}
C
/*
 * This example shows how to register a missing topic notification
 * handler and return a missing topic notification response - calling
 * missing_topic_proceed() once we've created the topic.
 */
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include <unistd.h>

#include <apr.h>
#include <apr_thread_mutex.h>
#include <apr_thread_cond.h>

#include "diffusion.h"
#include "args.h"

/*
 * Command-line options accepted by this example; passed to
 * parse_cmdline() and show_usage() in main().
 *
 * Each entry gives a short flag, a long option name and a description.
 * The final field looks like the default value used when the option is
 * not supplied (NULL for none) - confirm against args.h.
 */
ARG_OPTS_T arg_opts[] = {
        ARG_OPTS_HELP,
        {'u', "url", "Diffusion server URL", ARG_OPTIONAL, ARG_HAS_VALUE, "ws://localhost:8080"},
        {'p', "principal", "Principal (username) for the connection", ARG_OPTIONAL, ARG_HAS_VALUE, NULL},
        {'c', "credentials", "Credentials (password) for the connection", ARG_OPTIONAL, ARG_HAS_VALUE, NULL},
        {'r', "topic_root", "Topic root to process missing topic notifications on", ARG_OPTIONAL, ARG_HAS_VALUE, "foo"},
        END_OF_ARG_OPTS
};

/*
 * Callback installed as on_topic_added in the add_topic() parameters:
 * reports that the topic was added.
 */
static int
on_topic_added(SESSION_T *session, const SVC_ADD_TOPIC_RESPONSE_T *response, void *context)
{
        /* None of the arguments are needed to report success. */
        (void) session;
        (void) response;
        (void) context;

        puts("Topic added");

        return HANDLER_SUCCESS;
}

/*
 * Callback installed as on_topic_add_failed in the add_topic()
 * parameters: reports the failure and the reason code taken from the
 * response.
 */
static int
on_topic_add_failed(SESSION_T *session, const SVC_ADD_TOPIC_RESPONSE_T *response, void *context)
{
        /* Only the response is inspected. */
        (void) session;
        (void) context;

        puts("Topic add failed");
        printf("Reason code: %d\n", response->reason);

        return HANDLER_SUCCESS;
}

/*
 * Callback installed as on_discard in the add_topic() parameters:
 * reports that the add request was discarded.
 */
static int
on_topic_add_discard(SESSION_T *session, void *context)
{
        /* Neither argument is needed to report the discard. */
        (void) session;
        (void) context;

        puts("Topic add discarded");

        return HANDLER_SUCCESS;
}

/*
 * A request has been made for a topic that doesn't exist; create it
 * and inform Diffusion that the client's subscription request can
 * proceed.
 *
 * The topic path is the selector with its first character removed
 * (presumably the '>' prefix of a path selector - confirm). A missing
 * or empty selector cannot yield a topic path, so in that case no topic
 * is added and the request is simply allowed to proceed.
 */
static int
on_missing_topic(SESSION_T *session, const SVC_MISSING_TOPIC_REQUEST_T *request, void *context)
{
        (void) context;

        const char *selector = request->topic_selector;

        // Skipping the first character of a NULL or empty selector would
        // read outside the string, so answer the request without adding
        // a topic.
        if(selector == NULL || selector[0] == '\0') {
                puts("Missing topic notification has no topic selector");
                missing_topic_proceed(session, (SVC_MISSING_TOPIC_REQUEST_T *) request);
                return HANDLER_SUCCESS;
        }

        printf("Missing topic: %s\n", selector);

        BUF_T *sample_data_buf = buf_create();
        buf_write_string(sample_data_buf, "Hello, world");

        // Add the missing topic.
        // NOTE(review): the duplicated path, buffer, details and content
        // are never released here - confirm whether add_topic() takes
        // ownership or copies them before adding any cleanup.
        ADD_TOPIC_PARAMS_T topic_params = {
                .on_topic_added = on_topic_added,
                .on_topic_add_failed = on_topic_add_failed,
                .on_discard = on_topic_add_discard,
                .topic_path = strdup(selector+1),
                .details = create_topic_details_single_value(M_DATA_TYPE_STRING),
                .content = content_create(CONTENT_ENCODING_NONE, sample_data_buf)
        };

        add_topic(session, topic_params);

        // Proceed with the client's subscription to the topic.
        // NOTE(review): this is issued straight after add_topic() rather
        // than from on_topic_added - confirm the add is guaranteed to be
        // processed first.
        missing_topic_proceed(session, (SVC_MISSING_TOPIC_REQUEST_T *) request);

        return HANDLER_SUCCESS;
}

/*
 * Entry point for the example.
 *
 * Parses the command line, creates a session, registers the missing
 * topic handler for the configured topic root and then waits for five
 * minutes before closing the session. Returns EXIT_FAILURE if usage was
 * shown or the session could not be created, EXIT_SUCCESS otherwise.
 */
int
main(int argc, char **argv)
{
        /*
         * Standard command-line parsing.
         */
        HASH_T *options = parse_cmdline(argc, argv, arg_opts);
        if(options == NULL || hash_get(options, "help") != NULL) {
                show_usage(argc, argv, arg_opts);
                /* Options exist when help was requested; release them. */
                if(options != NULL) {
                        hash_free(options, NULL, free);
                }
                return EXIT_FAILURE;
        }

        const char *url = hash_get(options, "url");
        const char *principal = hash_get(options, "principal");
        const char *topic_root = hash_get(options, "topic_root");

        CREDENTIALS_T *credentials = NULL;
        const char *password = hash_get(options, "credentials");
        if(password != NULL) {
                credentials = credentials_create_password(password);
        }

        SESSION_T *session;
        DIFFUSION_ERROR_T error = { 0 };

        session = session_create(url, principal, credentials, NULL, NULL, &error);
        if(session != NULL) {
                printf("Session created (state=%d, id=%s)\n",
                       session_state_get(session),
                       session_id_to_string(session->id));
        }
        else {
                printf("Failed to create session: %s\n", error.message);
                free(error.message);
                /* Release the parsed options on this exit path as well. */
                hash_free(options, NULL, free);
                return EXIT_FAILURE;
        }

        /*
         * Register the missing topic handler
         */
        MISSING_TOPIC_PARAMS_T handler = {
                .on_missing_topic = on_missing_topic,
                .topic_path = topic_root,
                .context = NULL
        };

        missing_topic_register_handler(session, handler);

        /*
         * Run for 5 minutes.
         */
        sleep(5 * 60);

        /*
         * Close session and clean up.
         */
        session_close(session, NULL);
        session_free(session);

        hash_free(options, NULL, free);

        return EXIT_SUCCESS;
}

In each example, change the server address (the URL, or the host and port) from the one provided to that of your Diffusion™ server.