Just a second...

Example: Receive missing topic notifications

The following examples use the TopicControl feature in the Diffusion™ API to register a missing topic notification handler.

JavaScript
var diffusion = require('diffusion');

// Connect to the server. Change these options to suit your own environment.
// Node.js will not accept self-signed certificates by default. If you have
// one of these, set the environment variable NODE_TLS_REJECT_UNAUTHORIZED=0
// before running this example.
diffusion.connect({
    host   : 'diffusion.example.com',
    port   : 443,
    secure : true
}).then(function(session) {

	// Register a missing topic handler on the "example" root topic.
	// Any subscriptions to missing topics along this path will invoke this handler.
	session.topics.addMissingTopicHandler("example", {
		// Called when a handler is successfully registered
		onRegister : function(path, close) {
			console.log("Registered missing topic handler on path: " + path);
			// Once we've registered the handler, we initiate a subscription with the selector "?example/topic/.*"
			// This will invoke the handler.
			session.select("?example/topic/.*");
			// The stream must be built from the `diffusion` module imported above; a
			// misspelt identifier here throws a ReferenceError before the stream is added.
			session.addStream("?example/topic/.*", diffusion.datatypes.string()).on('subscribe', function(path) {
				console.log("Subscribed to topic: " + path);
			});
		},
		// Called when the handler is closed
		onClose : function(path) {
			console.log("Missing topic handler on path '" + path + "' has been closed");
		},
		// Called if there is an error on the handler. The error is included in
		// the output so that the cause of the failure is not lost.
		onError : function(path, error) {
			console.log("Error on missing topic handler on path '" + path + "': " + error);
		},
		// Called when we've received a missing topic notification on our registered handler path
		onMissingTopic : function(notification) {
			console.log("Received missing topic notification with selector: " + notification.selector);
			// Once we've received the missing topic notification initiated from subscribing to "?example/topic/.*",
			// we add a topic that will match the selector

			var topic = "example/topic/foo";

			session.topics.add(topic, diffusion.topics.TopicType.STRING).then(function(result) {
				console.log("Topic add success: " + topic);
				// If the topic addition is successful, we proceed() with the session's subscription.
				// The client will now be subscribed to the topic
				notification.proceed();
			}, function(reason) {
				console.log("Topic add failed: " + reason);
				// If the topic addition fails, we cancel() the session's subscription request.
				notification.cancel();
			});
		}
	});
}, function(error) {
	// Without this handler a failed connection would surface only as an
	// unhandled promise rejection.
	console.log("Failed to connect: " + error);
});
                    
C
/*
 * This example shows how to register a missing topic notification
 * handler and return a missing topic notification response - calling
 * missing_topic_proceed() once we've created the topic.
 */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <unistd.h>

#include <apr.h>
#include <apr_thread_mutex.h>
#include <apr_thread_cond.h>

#include "diffusion.h"
#include "args.h"

/*
 * Command-line options understood by this example. The table is passed
 * to parse_cmdline() and show_usage() in main().
 *
 * Each entry appears to be: short flag, long name, help text,
 * optional/required marker, value marker and a final string that is
 * presumably the default value - TODO confirm the field order against
 * the ARG_OPTS_T declaration in args.h.
 */
ARG_OPTS_T arg_opts[] = {
        ARG_OPTS_HELP,
        {'u', "url", "Diffusion server URL", ARG_OPTIONAL, ARG_HAS_VALUE, "ws://localhost:8080"},
        {'p', "principal", "Principal (username) for the connection", ARG_OPTIONAL, ARG_HAS_VALUE, NULL},
        {'c', "credentials", "Credentials (password) for the connection", ARG_OPTIONAL, ARG_HAS_VALUE, NULL},
        {'r', "topic_root", "Topic root to process missing topic notifications on", ARG_OPTIONAL, ARG_HAS_VALUE, "foo"},
        END_OF_ARG_OPTS
};

/*
 * Installed as the on_topic_added callback of the add-topic request.
 * Prints a confirmation line; none of the arguments are inspected.
 */
static int
on_topic_added(SESSION_T *session, const SVC_ADD_TOPIC_RESPONSE_T *response, void *context)
{
        (void) session;
        (void) response;
        (void) context;

        puts("Topic added");

        return HANDLER_SUCCESS;
}

/*
 * Installed as the on_topic_add_failed callback of the add-topic
 * request. Reports the failure and the reason code carried by the
 * response on standard output.
 */
static int
on_topic_add_failed(SESSION_T *session, const SVC_ADD_TOPIC_RESPONSE_T *response, void *context)
{
        (void) session;
        (void) context;

        puts("Topic add failed");
        printf("Reason code: %d\n",
               response->reason);

        return HANDLER_SUCCESS;
}

/*
 * Installed as the on_discard callback of the add-topic request.
 * Prints a notice; neither argument is inspected.
 */
static int
on_topic_add_discard(SESSION_T *session, void *context)
{
        (void) session;
        (void) context;

        puts("Topic add discarded");

        return HANDLER_SUCCESS;
}

/*
 * A request has been made for a topic that doesn't exist; create it
 * and inform Diffusion that the client's subscription request can
 * proceed.
 *
 * The topic path is the notification's selector with its first
 * character skipped (presumably the selector-type prefix - TODO
 * confirm against the selector format). A selector that is NULL or
 * empty has nothing after that character, so no topic is added for it
 * and the request is simply allowed to proceed.
 *
 * NOTE(review): add_topic() is issued and missing_topic_proceed() is
 * called straight afterwards; this handler does not wait for the
 * on_topic_added callback.
 *
 * Always returns HANDLER_SUCCESS.
 */
static int
on_missing_topic(SESSION_T *session, const SVC_MISSING_TOPIC_REQUEST_T *request, void *context)
{
        const char *selector = request->topic_selector;
        char *topic_path = NULL;

        if(selector == NULL || selector[0] == '\0') {
                // Skipping the first character of an empty string would
                // read past its terminator.
                fputs("Missing topic notification has no usable selector\n", stderr);
        }
        else {
                printf("Missing topic: %s\n", selector);

                topic_path = strdup(selector + 1);
                if(topic_path == NULL) {
                        fputs("Out of memory while copying the topic path\n", stderr);
                }
        }

        if(topic_path != NULL) {
                BUF_T *sample_data_buf = buf_create();
                buf_write_string(sample_data_buf, "Hello, world");

                // Add the missing topic.
                // NOTE(review): topic_path and sample_data_buf are handed to
                // the API and not released here; confirm whether add_topic()
                // and content_create() take ownership or copy.
                ADD_TOPIC_PARAMS_T topic_params = {
                        .on_topic_added = on_topic_added,
                        .on_topic_add_failed = on_topic_add_failed,
                        .on_discard = on_topic_add_discard,
                        .topic_path = topic_path,
                        .details = create_topic_details_single_value(M_DATA_TYPE_STRING),
                        .content = content_create(CONTENT_ENCODING_NONE, sample_data_buf)
                };

                add_topic(session, topic_params);
        }

        // Proceed with the client's subscription to the topic
        missing_topic_proceed(session, (SVC_MISSING_TOPIC_REQUEST_T *) request);

        return HANDLER_SUCCESS;
}

/*
 * Entry point for the example.
 */
int
main(int argc, char **argv)
{
        /*
         * Standard command-line parsing.
         */
        HASH_T *options = parse_cmdline(argc, argv, arg_opts);
        if(options == NULL || hash_get(options, "help") != NULL) {
                show_usage(argc, argv, arg_opts);
                return EXIT_FAILURE;
        }

        const char *url = hash_get(options, "url");
        const char *principal = hash_get(options, "principal");
        const char *topic_root = hash_get(options, "topic_root");

        CREDENTIALS_T *credentials = NULL;
        const char *password = hash_get(options, "credentials");
        if(password != NULL) {
                credentials = credentials_create_password(password);
        }

        SESSION_T *session;
        DIFFUSION_ERROR_T error = { 0 };

        session = session_create(url, principal, credentials, NULL, NULL, &error);
        if(session != NULL) {
                printf("Session created (state=%d, id=%s)\n",
                       session_state_get(session),
                       session_id_to_string(session->id));
        }
        else {
                printf("Failed to create session: %s\n", error.message);
                free(error.message);
                return EXIT_FAILURE;
        }

        /*
         * Register the missing topic handler
         */
        MISSING_TOPIC_PARAMS_T handler = {
                .on_missing_topic = on_missing_topic,
                .topic_path = topic_root,
                .context = NULL
        };

        missing_topic_register_handler(session, handler);

        /*
         * Run for 5 minutes.
         */
        sleep(5 * 60);

        /*
         * Close session and clean up.
         */
        session_close(session, NULL);
        session_free(session);

        hash_free(options, NULL, free);

        return EXIT_SUCCESS;
}
                    
Java and Android
                        /*******************************************************************************
 * Copyright (C) 2014, 2021 Push Technology Ltd.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *******************************************************************************/
package com.pushtechnology.diffusion.examples;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.pushtechnology.diffusion.client.Diffusion;
import com.pushtechnology.diffusion.client.callbacks.ErrorReason;
import com.pushtechnology.diffusion.client.features.control.topics.TopicControl;
import com.pushtechnology.diffusion.client.features.control.topics.TopicControl.MissingTopicNotification;
import com.pushtechnology.diffusion.client.features.control.topics.TopicControl.MissingTopicNotificationStream;
import com.pushtechnology.diffusion.client.session.Session;
import com.pushtechnology.diffusion.client.topics.TopicSelector.Type;
import com.pushtechnology.diffusion.client.topics.details.TopicType;

/**
 * An example of registering a missing topic notification handler and processing
 * notifications using a control client.
 *
 * @author Push Technology Limited
 */
public final class ControlClientHandlingMissingTopicNotification {

    private static final Logger LOG =
        LoggerFactory.getLogger(ControlClientHandlingMissingTopicNotification.class);

    private final Session session;
    private final TopicControl topicControl;

    /**
     * Constructor.
     * <p>
     * Opens a session as principal "admin" and registers a missing topic
     * handler on the "Accounts" path, waiting up to 5 seconds for the
     * registration to complete. If registration fails, the session is closed
     * before the exception is rethrown so that it is not left open.
     *
     * @param serverUrl the URL of the server to connect to
     * @throws InterruptedException if interrupted while waiting for the
     *         registration
     * @throws ExecutionException if the registration fails
     * @throws TimeoutException if the registration does not complete within
     *         5 seconds
     */
    public ControlClientHandlingMissingTopicNotification(String serverUrl)
        throws InterruptedException, ExecutionException, TimeoutException {
        // Create a session
        session = Diffusion.sessions().password("password").principal("admin")
            .open(serverUrl);

        topicControl = session.feature(TopicControl.class);

        try {
            // Registers a missing topic notification on a topic path
            topicControl.addMissingTopicHandler(
                "Accounts",
                new NotificationStream()).get(5, TimeUnit.SECONDS);
        }
        catch (InterruptedException | ExecutionException | TimeoutException |
            RuntimeException e) {
            // The caller never receives this object, so nothing else could
            // close the session.
            session.close();
            throw e;
        }
    }

    /**
     * Close the session opened by the constructor.
     */
    public void close() {
        session.close();
    }

    /**
     * Stream that receives the missing topic notifications for the
     * "Accounts" path.
     */
    private final class NotificationStream implements
        MissingTopicNotificationStream {
        @Override
        public void onClose() {
            LOG.info("Missing topic notification stream closed");
        }

        @Override
        public void onError(ErrorReason errorReason) {
            LOG.warn("Missing topic notification stream error: {}", errorReason);
        }

        @Override
        public void onMissingTopic(MissingTopicNotification notification) {
            // This handler will create a missing topic if a path selector
            // requesting a topic starting with "Accounts/" is selected and
            // the requesting session has the principal 'control'.
            if (notification.getTopicSelector().getType() != Type.PATH) {
                return;
            }

            final String path = notification.getTopicPath();
            final boolean requestedByControl = "control".equals(
                notification.getSessionProperties().get(Session.PRINCIPAL));

            if (!path.startsWith("Accounts/") || !requestedByControl) {
                return;
            }

            topicControl.addTopic(
                path,
                TopicType.STRING).whenComplete((result, ex) -> {
                    if (ex == null) {
                        LOG.info("Missing topic {} {}", path, result);
                    }
                    else {
                        LOG.warn("Failed to create missing topic {}", path, ex);
                    }
                });
        }
    }

}
                    
.NET


                

Change the URL given in the example to the URL of your Diffusion Cloud service. Diffusion Cloud service URLs end in diffusion.cloud.