Example: Receive missing topic notifications

The following examples use the TopicControl feature in the Diffusion™ API to register a missing topic notification handler.

var diffusion = require('diffusion');

// Connect to the server. Change these options to suit your own environment.
// Node.js will not accept self-signed certificates by default. If you have
// one of these, set the environment variable NODE_TLS_REJECT_UNAUTHORIZED=0
// before running this example.
    host   : 'diffusion.example.com',
    port   : 443,
    secure : true
}).then(function(session) {

	// Register a missing topic handler on the "example" root topic
	// Any subscriptions to missing topics along this path will invoke this handler
	session.topics.addMissingTopicHandler("example", {
		// Called when a handler is successfully registered
		onRegister : function(path, close) {
			console.log("Registered missing topic handler on path: " + path);
			// Once we've registered the handler, we initiate a subscription with the selector "?example/topic/.*"
			// This will invoke the handler.
			session.subscribe("?example/topic/.*").on('subscribe', function(type, path) {
				console.log("Subscribed to topic: " + path);
		// Called when the handler is closed
		onClose : function(path) {
			console.log("Missing topic handler on path '" + path + "' has been closed");
		// Called if there is an error on the handler
		onError : function(path, error) {
			console.log("Error on missing topic handler");
		// Called when we've received a missing topic notification on our registered handler path
		onMissingTopic : function(notification) {
			console.log("Received missing topic notification with selector: " + notification.selector);
			// Once we've received the missing topic notification initiated from subscribing to "?example/topic/.*", 
			// we add a topic that will match the selector

			var topic = "example/topic/foo";

			session.topics.add(topic).then(function(result) {
				console.log("Topic add success: " + topic);
				// If the topic addition is successful, we proceed() with the session's subscription.
				// The client will now be subscribed to the topic
			}, function(reason) {
				console.log("Topic add failed: " + reason);
				// If the topic addition fails, we cancel() the session's subscription request.

@import Diffusion;

@interface MissingTopicHandlerExample (PTDiffusionMissingTopicHandler) <PTDiffusionMissingTopicHandler>

@implementation MissingTopicHandlerExample {
    PTDiffusionSession* _session;

-(void)startWithURL:(NSURL*)url {
    PTDiffusionCredentials *const credentials =
        [[PTDiffusionCredentials alloc] initWithPassword:@"password"];

    PTDiffusionSessionConfiguration *const sessionConfiguration =
        [[PTDiffusionSessionConfiguration alloc] initWithPrincipal:@"control"


    [PTDiffusionSession openWithURL:url
                  completionHandler:^(PTDiffusionSession *session, NSError *error)
        if (!session) {
            NSLog(@"Failed to open session: %@", error);

        // At this point we now have a connected session.

        // Set ivar to maintain a strong reference to the session.
        _session = session;

        // Register as missing topic handler for a branch of the topic tree.
        [self registerAsMissingTopicHandlerForSession:session];

-(void)registerAsMissingTopicHandlerForSession:(PTDiffusionSession *const)session {
    [session.topicControl addMissingTopicHandler:self
                                    forTopicPath:@"Example/Control Client Handler"
                               completionHandler:^(PTDiffusionTopicTreeRegistration *const registration, NSError *const error)
        if (registration) {
            NSLog(@"Registered as missing topic handler.");
        } else {
            NSLog(@"Failed to register as missing topic handler. Error: %@", error);


@implementation MissingTopicHandlerExample (PTDiffusionMissingTopicHandler)

-(void)diffusionTopicTreeRegistration:(PTDiffusionTopicTreeRegistration *const)registration
          hadMissingTopicNotification:(PTDiffusionMissingTopicNotification *const)notification {
    NSString *const expression = notification.topicSelectorExpression;
    NSLog(@"Received Missing Topic Notification: %@", expression);

    // Expect a path pattern expression.
    if (![expression hasPrefix:@">"]) {
        NSLog(@"Topic selector expression is not a path pattern.");

    // Extract topic path from path pattern expression.
    NSString *const topicPath = [expression substringFromIndex:1];

    // Add a stateless topic at this topic path.
    [_session.topicControl addWithTopicPath:topicPath
                          completionHandler:^(NSError *const error)
        if (error) {
            NSLog(@"Failed to add topic.");

        // Topic added so allow subscriber to proceed.
        [notification proceed];

Java and Android
 * Copyright (C) 2014, 2017 Push Technology Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * http://www.apache.org/licenses/LICENSE-2.0
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * See the License for the specific language governing permissions and
 * limitations under the License.
package com.pushtechnology.diffusion.examples;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import com.pushtechnology.diffusion.client.Diffusion;
import com.pushtechnology.diffusion.client.callbacks.ErrorReason;
import com.pushtechnology.diffusion.client.features.control.topics.TopicControl;
import com.pushtechnology.diffusion.client.features.control.topics.TopicControl.MissingTopicNotification;
import com.pushtechnology.diffusion.client.features.control.topics.TopicControl.MissingTopicNotificationStream;
import com.pushtechnology.diffusion.client.session.Session;
import com.pushtechnology.diffusion.client.topics.details.TopicType;

 * An example of registering a missing topic notification handler and processing
 * notifications using a control client.
 * @author Push Technology Limited
public final class ControlClientHandlingMissingTopicNotification {

    // UCI features
    private final Session session;
    private final TopicControl topicControl;

     * Constructor.
    public ControlClientHandlingMissingTopicNotification(String serverUrl)
        throws InterruptedException, ExecutionException, TimeoutException {
        // Create a session
        session = Diffusion.sessions().password("password").principal("admin")

        topicControl = session.feature(TopicControl.class);

        // Registers a missing topic notification on a topic path
            new NotificationStream()).get(5, TimeUnit.SECONDS);


    private final class NotificationStream implements
        MissingTopicNotificationStream {
        public void onClose() {

        public void onError(ErrorReason errorReason) {

        public void onMissingTopic(MissingTopicNotification notification) {
                TopicType.STRING).whenComplete((result, ex) -> {
                    if (ex == null) {
                    else {

 * This example shows how to register a missing topic notification
 * handler and return a missing topic notification response - calling
 * missing_topic_proceed() once we've created the topic.
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include <unistd.h>

#include <apr.h>
#include <apr_thread_mutex.h>
#include <apr_thread_cond.h>

#include "diffusion.h"
#include "args.h"

ARG_OPTS_T arg_opts[] = {
        {'u', "url", "Diffusion server URL", ARG_OPTIONAL, ARG_HAS_VALUE, "ws://localhost:8080"},
        {'p', "principal", "Principal (username) for the connection", ARG_OPTIONAL, ARG_HAS_VALUE, NULL},
        {'c', "credentials", "Credentials (password) for the connection", ARG_OPTIONAL, ARG_HAS_VALUE, NULL},
        {'r', "topic_root", "Topic root to process missing topic notifications on", ARG_OPTIONAL, ARG_HAS_VALUE, "foo"},

static int
on_topic_added(SESSION_T *session, const SVC_ADD_TOPIC_RESPONSE_T *response, void *context)
        puts("Topic added");
        return HANDLER_SUCCESS;

static int
on_topic_add_failed(SESSION_T *session, const SVC_ADD_TOPIC_RESPONSE_T *response, void *context)
        puts("Topic add failed");
        printf("Reason code: %d\n", response->reason);
        return HANDLER_SUCCESS;

static int
on_topic_add_discard(SESSION_T *session, void *context)
        puts("Topic add discarded");
        return HANDLER_SUCCESS;

 * A request has been made for a topic that doesn't exist; create it
 * and inform Diffusion that the client's subcription request can
 * proceed.
static int
on_missing_topic(SESSION_T *session, const SVC_MISSING_TOPIC_REQUEST_T *request, void *context)
        printf("Missing topic: %s\n", request->topic_selector);

        BUF_T *sample_data_buf = buf_create();
        buf_write_string(sample_data_buf, "Hello, world");

        // Add the missing topic.
        ADD_TOPIC_PARAMS_T topic_params = {
                .on_topic_added = on_topic_added,
                .on_topic_add_failed = on_topic_add_failed,
                .on_discard = on_topic_add_discard,
                .topic_path = strdup(request->topic_selector+1),
                .details = create_topic_details_single_value(M_DATA_TYPE_STRING),
                .content = content_create(CONTENT_ENCODING_NONE, sample_data_buf)

        add_topic(session, topic_params);

        // Proceed with the client's subscription to the topic
        missing_topic_proceed(session, (SVC_MISSING_TOPIC_REQUEST_T *) request);

        return HANDLER_SUCCESS;

 * Entry point for the example.
main(int argc, char **argv)
         * Standard command-line parsing.
        HASH_T *options = parse_cmdline(argc, argv, arg_opts);
        if(options == NULL || hash_get(options, "help") != NULL) {
                show_usage(argc, argv, arg_opts);
                return EXIT_FAILURE;

        const char *url = hash_get(options, "url");
        const char *principal = hash_get(options, "principal");
        const char *topic_root = hash_get(options, "topic_root");

        CREDENTIALS_T *credentials = NULL;
        const char *password = hash_get(options, "credentials");
        if(password != NULL) {
                credentials = credentials_create_password(password);

        SESSION_T *session;
        DIFFUSION_ERROR_T error = { 0 };

        session = session_create(url, principal, credentials, NULL, NULL, &error);
        if(session != NULL) {
                printf("Session created (state=%d, id=%s)\n",
        else {
                printf("Failed to create session: %s\n", error.message);
                return EXIT_FAILURE;

         * Register the missing topic handler
        MISSING_TOPIC_PARAMS_T handler = {
                .on_missing_topic = on_missing_topic,
                .topic_path = topic_root,
                .context = NULL

        missing_topic_register_handler(session, handler);

         * Run for 5 minutes.
        sleep(5 * 60);

         * Close session and clean up.
        session_close(session, NULL);

        hash_free(options, NULL, free);

        return EXIT_SUCCESS;

Change the URL from that provided in the example to the URL of the Diffusion server.