Just a second...

Example: Receive missing topic notifications

The following examples use the TopicControl feature in the Diffusion™ API to register a missing topic notification handler.

JavaScript
// Connect to the server. Change these options to suit your own environment.
// Node.js will not accept self-signed certificates by default. If you have
// one of these, set the environment variable NODE_TLS_REJECT_UNAUTHORIZED=0
// before running this example.
const session = await diffusion.connect({
    host   : 'diffusion.example.com',
    port   : 443,
    secure : true
});

// Register a missing topic handler on the 'example' root topic
// Any subscriptions to missing topics along this path will invoke this handler
session.topics.addMissingTopicHandler('example', {
    // Called when a handler is successfully registered
    onRegister : (path, close) => {
        console.log('Registered missing topic handler on path: ' + path);
        // Once we've registered the handler, we initiate a subscription with the selector '?example/topic/.*'
        // This will invoke the handler.
        session.select('?example/topic/.*');
        session.addStream('?example/topic/.*', diffusion.datatypes.string()).on('subscribe', (path) => {
            console.log('Subscribed to topic: ' + path);
        });
    },
    // Called when the handler is closed
    onClose : (path) => {
        console.log(`Missing topic handler on path '${path}' has been closed`);
    },
    // Called if there is an error on the handler
    onError : (path, error) => {
        console.log('Error on missing topic handler');
    },
    // Called when we've received a missing topic notification on our registered handler path
    onMissingTopic : (notification) => {
        console.log('Received missing topic notification with selector: ' + notification.selector);
        // Once we've received the missing topic notification initiated from subscribing to '?example/topic/.*',
        // we add a topic that will match the selector

        const topic = 'example/topic/foo';

        session.topics.add(topic, diffusion.topics.TopicType.STRING).then((result) => {
            console.log('Topic add success: ' + topic);
        }, (reason) => {
            console.log('Topic add failed: ' + reason);
        });
    }
});
.NET
/**
 * Copyright © 2021 - 2023 DiffusionData Ltd.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

using System;
using System.Threading;
using System.Threading.Tasks;
using PushTechnology.ClientInterface.Client.Callbacks;
using PushTechnology.ClientInterface.Client.Factories;
using PushTechnology.ClientInterface.Client.Features.Control.Topics;
using PushTechnology.ClientInterface.Client.Session;
using PushTechnology.ClientInterface.Client.Topics;
using static System.Console;

namespace PushTechnology.ClientInterface.Example {
    /// <summary>
    /// Implementation of a missing topic handler.
    /// </summary>
    public sealed class AddMissingTopicHandler
    {
        public async Task AddMissingTopicHandlerExample(string serverUrl)
        {
            var controlSession = Diffusion.Sessions.Principal("control").Password("password")
                .CertificateValidation((cert, chain, errors) => CertificateValidationResult.ACCEPT)
                .Open(serverUrl);

            var clientSession = Diffusion.Sessions.Principal("client").Password("password")
                .CertificateValidation((cert, chain, errors) => CertificateValidationResult.ACCEPT)
                .Open(serverUrl);

            string selector = "?Example/Some Topic//";
            string topicPath = "Example/Some Topic";

            WriteLine($"Adding missing topic handler for topic '{topicPath}'.");

            var registration = 
                await controlSession.TopicControl.AddMissingTopicHandlerAsync(
                        topicPath, 
                        new MissingTopicNotificationStream(controlSession));
            WriteLine($"Subscribing to topic '{topicPath}'.");
            
            await clientSession.Topics.SubscribeAsync("?Example/Some Topic//");

            await Task.Delay(TimeSpan.FromSeconds(1));
            
            // Clean up
            await controlSession.TopicControl.RemoveTopicsAsync(topicPath);

            WriteLine($"Topic '{topicPath}' removed.");

            await clientSession.Topics.UnsubscribeAsync(selector, CancellationToken.None);

            WriteLine($"Unsubscribing to topic '{topicPath}'.");

            await registration.CloseAsync();
            
            clientSession.Close();
            controlSession.Close();
        }

        /// <summary>
        /// Basic implementation of the stream that will be called when a session subscribes using
        /// a topic selector that matches no topics.
        /// </summary>
        private sealed class MissingTopicNotificationStream : IMissingTopicNotificationStream
        {
            private ISession session;

            public MissingTopicNotificationStream(ISession session) => this.session = session;

            public void OnClose() => WriteLine("Handler is removed.");

            public void OnError(ErrorReason errorReason) 
                        => WriteLine($"An error has occured : {errorReason}.");

            public void OnMissingTopic(IMissingTopicNotification notification)
            {
                WriteLine($"Topic '{notification.TopicPath}' does not exist.");

                session.TopicControl.AddTopic(
                        notification.TopicPath, 
                        session.TopicControl.NewSpecification(TopicType.STRING), 
                        new TopicControlAddCallback(notification));
            }
        }

        /// <summary>
        /// Implementation of a callback interface for adding topics.
        /// </summary>
        private sealed class TopicControlAddCallback : ITopicControlAddCallback
        {
            IMissingTopicNotification notification;

            public TopicControlAddCallback(IMissingTopicNotification notification) 
                        => this.notification = notification;

            public void OnDiscard() => WriteLine("The stream is now closed.");

            public void OnTopicAdded(string topicPath) => WriteLine($"Topic '{topicPath}' added.");

            public void OnTopicAddFailed(string topicPath, TopicAddFailReason reason) 
                        => WriteLine($"The topic '{topicPath}' could not be added - reason: {reason}.");
        }    
    }
}
Java and Android
/*******************************************************************************
 * Copyright (C) 2014, 2023 DiffusionData Ltd.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *******************************************************************************/
package com.pushtechnology.diffusion.manual;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.pushtechnology.diffusion.client.Diffusion;
import com.pushtechnology.diffusion.client.callbacks.ErrorReason;
import com.pushtechnology.diffusion.client.features.control.topics.TopicControl;
import com.pushtechnology.diffusion.client.features.control.topics.TopicControl.MissingTopicNotification;
import com.pushtechnology.diffusion.client.features.control.topics.TopicControl.MissingTopicNotificationStream;
import com.pushtechnology.diffusion.client.session.Session;
import com.pushtechnology.diffusion.client.topics.TopicSelector.Type;
import com.pushtechnology.diffusion.client.topics.details.TopicType;

/**
 * An example of registering a missing topic notification handler and processing
 * notifications using a control client.
 *
 * @author DiffusionData Limited
 */
public final class ControlClientHandlingMissingTopicNotification {

    private static final Logger LOG =
        LoggerFactory.getLogger(ControlClientHandlingMissingTopicNotification.class);

    private final Session session;
    private final TopicControl topicControl;

    /**
     * Constructor.
     */
    public ControlClientHandlingMissingTopicNotification(String serverUrl)
        throws InterruptedException, ExecutionException, TimeoutException {
        // Create a session
        session = Diffusion.sessions().password("password").principal("admin")
            .open(serverUrl);

        topicControl = session.feature(TopicControl.class);

        // Registers a missing topic notification on a topic path
        topicControl.addMissingTopicHandler(
            "Accounts",
            new NotificationStream()).get(5, TimeUnit.SECONDS);

    }

    private final class NotificationStream implements
        MissingTopicNotificationStream {
        @Override
        public void onClose() {
        }

        @Override
        public void onError(ErrorReason errorReason) {
        }

        @Override
        public void onMissingTopic(MissingTopicNotification notification) {
            // This handler will create a missing topic if a path selector
            // requesting a topic starting with "Accounts/" is selected and
            // the requesting session has the principal 'control'.
            if (notification.getTopicSelector().getType() == Type.PATH) {
                final String path = notification.getTopicPath();
                if (path.startsWith("Accounts/") &&
                    "control".equals(
                        notification.getSessionProperties().get(Session.PRINCIPAL))) {

                    topicControl.addTopic(
                        path,
                        TopicType.STRING).whenComplete((result, ex) -> {
                        if (ex == null) {
                            LOG.info("Missing topic " + path + " " + result);
                        }
                        else {
                            LOG.warn("Failed to create missing topic " + path, ex);
                        }
                    });
                }
            }
        }
    }
}
C
/**
 * Copyright © 2021 - 2023 DiffusionData Ltd.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <stdio.h>
#include <stdlib.h>
#ifndef WIN32
        #include <unistd.h>
#else
        #define sleep(x) Sleep(1000 * x)
#endif

#include "diffusion.h"


static int on_missing_topic(
        SESSION_T *session,
        const SVC_MISSING_TOPIC_REQUEST_T *request,
        void *context)
{
        // handle missing topic notification
        return HANDLER_SUCCESS;
}


int main(int argc, char **argv)
{
        const char *url = "ws://localhost:8080";
        const char *principal = "control";
        const char *password = "password";
        const char *topic_path = "my/topic/path";

        CREDENTIALS_T *credentials = credentials_create_password(password);

        SESSION_T *session;
        DIFFUSION_ERROR_T error = { 0 };

        // Create a session, synchronously
        session = session_create(url, principal, credentials, NULL, NULL, &error);
        if(session == NULL) {
                printf("Failed to create session: %s\n", error.message);
                free(error.message);
                credentials_free(credentials);
                return EXIT_FAILURE;
        }

        // register missing topic handler
        MISSING_TOPIC_PARAMS_T params = {
                .on_missing_topic = on_missing_topic,
                .topic_path = topic_path,
                .context = NULL
        };
        missing_topic_register_handler(session, params);

        // Sleep for a while
        sleep(5);

        // Close the session, and release resources and memory
        session_close(session, NULL);
        session_free(session);

        credentials_free(credentials);

        return EXIT_SUCCESS;
}
Apple
//  Copyright (C) 2021 Push Technology Ltd.
//
//  Licensed under the Apache License, Version 2.0 (the "License");
//  you may not use this file except in compliance with the License.
//  You may obtain a copy of the License at
//  http://www.apache.org/licenses/LICENSE-2.0
//
//  Unless required by applicable law or agreed to in writing, software
//  distributed under the License is distributed on an "AS IS" BASIS,
//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//  See the License for the specific language governing permissions and
//  limitations under the License.

import Foundation
import Diffusion

class MissingTopicHandlerExample: PTDiffusionMissingTopicHandler  {
    var session: PTDiffusionSession?

    func startWithURL(url: URL) {
        let credentials = PTDiffusionCredentials(password: "password")

        let sessionConfiguration =
            PTDiffusionSessionConfiguration(principal: "control",
                                            credentials:credentials)

        print("Connecting...")

        PTDiffusionSession.open(with: url,
                                configuration: sessionConfiguration) { (session, error) in

            if (session == nil) {
                print("Failed to open session: %@", error!.localizedDescription)
                return
            }

            // At this point we now have a connected session.
            print("Connected.")

            // To maintain a strong reference to the session.
            self.session = session!

            self.registerAsMissingTopicHandler(session: session!)
        }
    }

    func registerAsMissingTopicHandler(session: PTDiffusionSession) {
        session.topicControl.add(self, forTopicPath: "Example/Control Client Handler") { (registration, error) in

            if (registration != nil) {
                print("Registered as missing topic handler.")
            }
            else {
                print("Failed to register as missing topic handler: %@", error!.localizedDescription)
            }
        }
    }

    func diffusionTopicTreeRegistration(_ registration: PTDiffusionTopicTreeRegistration,
                                        hadMissingTopicNotification notification: PTDiffusionMissingTopicNotification) {
        print("Received Missing Topic Notification: %@", notification);

        let expression: String = notification.topicSelectorExpression

        // Expect a path pattern expression.
        if (!expression.hasPrefix(">")) {
            print("Topic selector expression is not a path pattern.")
            return
        }

        // extract topic path from path pattern expression
        let index = expression.index(expression.startIndex, offsetBy: 1)
        let topicPath = String(expression[index...])

        // Add a stateless topic at this topic path.
        self.session?.topicControl.addTopic(withPath: topicPath,
                                            type: PTDiffusionTopicType.string) { (result, error) in
            if (result == nil) {
                print("Error occurred while creating topic: %@", error!.localizedDescription)
            }
            else {
                print("Topic created.")
            }
        }
    }

    func diffusionTopicTreeRegistrationDidClose(_ registration: PTDiffusionTopicTreeRegistration) {
        print("Registration %@ closed.")
    }

    func diffusionTopicTreeRegistration(_ registration: PTDiffusionTopicTreeRegistration,
                                        didFailWithError error: Error) {
        print("Registration %@ failed: %@", registration, error.localizedDescription)
    }

}

Change the URL from that provided in the example to the URL of the Diffusion server .