Just a second...

Receiving topic notifications

Receive topic notifications using topic selectors. This enables a client to receive updates when topics are added or removed, without the topic values.

Note: Topic notifications are supported by the Android™ API, Java™ API, JavaScript® API, .NET API, C API and Apple API, as shown by the examples below.

The client must register a listener object to receive notifications about selected topics. Use a topic selector to specify the topics.

For more details about topic notifications, see Topic notifications.

Required permissions: select_topic and read_topic permissions for the specified topics.

Receiving topic notifications

A client can register to receive notifications about a set of topics via a listener object.

JavaScript
// Listener object whose callbacks receive topic notification events.
const listener = {
    onTopicNotification(topicPath, topicSpecification, type) {
        // notification for the topic
    },
    onDescendantNotification(topicPath, type) {
        // notification for an immediate descendant
    },
    onError(error) {
        // an error occurred
    },
    onClose() {
        // the handler is closed
    }
};

// Add the listener, then select the topics to be notified about.
const registration = await session.notifications.addListener(listener);
registration.select('foo');

.NET
// Add a new Listener instance and await the resulting registration.
registration = await notifications.AddListenerAsync(new Listener());


...


/// <summary>
/// Topic notification listener that writes a line for every callback it receives.
/// </summary>
private class Listener : ITopicNotificationListener {
    /// <summary>
    /// Called with a notification for a selected topic.
    /// </summary>
    public void OnTopicNotification(string topicPath, ITopicSpecification specification,
                                    NotificationType type)
        => WriteLine($"Topic '{topicPath}' has been {type}.");

    /// <summary>
    /// Called with a notification for an immediate descendant of a selected topic path.
    /// </summary>
    public void OnDescendantNotification(string topicPath, NotificationType type)
        => WriteLine($"Descendant topic '{topicPath}' has been {type}.");

    /// <summary>
    /// Called when the callback receives an error.
    /// </summary>
    public void OnError(ErrorReason errorReason)
        => WriteLine($"The listener received the error: '{errorReason}'.");

    /// <summary>
    /// Called when the listener has been closed.
    /// </summary>
    public void OnClose()
        => WriteLine("The listener was closed.");
}
Java and Android
// Open a session and obtain the topic notifications feature.
final Session session = Diffusion.sessions().open("ws://localhost:8080");
final TopicNotifications notifications = session.feature(TopicNotifications.class);

// Listener whose callbacks receive the notifications.
final TopicNotificationListener listener = new TopicNotificationListener() {
    @Override
    public void onDescendantNotification(String topicPath, NotificationType type) {
        // Handle notifications for immediate descendants
    }

    @Override
    public void onTopicNotification(
        String topicPath,
        TopicSpecification specification,
        NotificationType type) {
        // Handle notifications for selected/deselected topics
    }

    @Override
    public void onError(ErrorReason error) {
        // The listener has encountered an error
    }

    @Override
    public void onClose() {
        // The listener has been closed
    }
};

// Add the listener and block until the registration is available.
final NotificationRegistration registration = notifications.addListener(listener).join();

// Select the topics to receive notifications for.
registration.select("foo");
C
/**
 * Copyright © 2021 - 2023 DiffusionData Ltd.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <stdio.h>
#include <stdlib.h>
#ifndef WIN32
        #include <unistd.h>
#else
        // Map sleep(seconds) onto Sleep(milliseconds). The argument is
        // parenthesized so that an expression such as sleep(a + b) is
        // multiplied as a whole rather than expanding to 1000 * a + b.
        #define sleep(x) Sleep(1000 * (x))
#endif

#include "diffusion.h"

// Copy of the listener registration, stored by on_listener_registered();
// main() passes it to the select call and frees it before exiting.
DIFFUSION_REGISTRATION_T *g_registration;

/*
 * Callback for when the topic notification listener has been registered.
 * Keeps a duplicate of the supplied registration in g_registration.
 */
static int on_listener_registered(
        const DIFFUSION_REGISTRATION_T *registration,
        void *context)
{
        (void) context; // unused

        DIFFUSION_REGISTRATION_T *copy = diffusion_registration_dup(registration);
        g_registration = copy;

        return HANDLER_SUCCESS;
}


/*
 * Callback for a topic notification concerning a descendant of `topic_path`.
 * This sample takes no action and reports success.
 */
static int on_descendant_notification(
        const char *topic_path,
        const DIFFUSION_TOPIC_NOTIFICATION_TYPE_T type,
        void *context)
{
        // unused in this sample
        (void) topic_path;
        (void) type;
        (void) context;

        return HANDLER_SUCCESS;
}


/*
 * Callback for a topic notification concerning `topic_path` itself.
 * This sample takes no action and reports success.
 */
static int on_topic_notification(
        const char *topic_path,
        const TOPIC_SPECIFICATION_T *specification,
        const DIFFUSION_TOPIC_NOTIFICATION_TYPE_T type,
        void *context)
{
        // unused in this sample
        (void) topic_path;
        (void) specification;
        (void) type;
        (void) context;

        return HANDLER_SUCCESS;
}


/*
 * Callback for when the topic notification listener has been closed.
 * This sample takes no action.
 */
static void on_close()
{
        // topic notification listener has been closed
}


/*
 * Callback supplied as `.on_selected` to the registration select call in
 * main(). This sample takes no action and reports success.
 */
static int on_selected(void *context)
{
        (void) context; // unused

        // the selector has been applied to the listener registration
        return HANDLER_SUCCESS;
}


/*
 * Sample entry point: opens a session, adds a topic notification listener,
 * selects a topic path for that listener, then closes the session and
 * releases the resources it created.
 *
 * Returns EXIT_SUCCESS on completion, or EXIT_FAILURE if the session could
 * not be created or the listener registration was not received.
 */
int main(int argc, char **argv)
{
        const char *url = "ws://localhost:8080";
        const char *principal = "control";
        const char *password = "password";
        const char *topic_path = "my/topic/path";

        CREDENTIALS_T *credentials = credentials_create_password(password);

        SESSION_T *session;
        DIFFUSION_ERROR_T error = { 0 };

        // Create a session, synchronously
        session = session_create(url, principal, credentials, NULL, NULL, &error);
        if(session == NULL) {
                printf("Failed to create session: %s\n", error.message);
                free(error.message);
                credentials_free(credentials);
                return EXIT_FAILURE;
        }

        // create a topic notification listener and add it to the session
        DIFFUSION_TOPIC_NOTIFICATION_LISTENER_T listener = {
                .on_registered = on_listener_registered,
                .on_descendant_notification = on_descendant_notification,
                .on_topic_notification = on_topic_notification,
                .on_close = on_close
        };
        diffusion_topic_notification_add_listener(session, listener, NULL);

        // Sleep for a while to give on_listener_registered a chance to run
        sleep(5);

        // g_registration is only set by on_listener_registered; if that
        // callback has not run there is no registration to select with or
        // to free, so stop here instead of passing NULL on.
        if(g_registration == NULL) {
                printf("Topic notification listener was not registered\n");
                session_close(session, NULL);
                session_free(session);
                credentials_free(credentials);
                return EXIT_FAILURE;
        }

        // select the topics the registered listener is notified about
        DIFFUSION_TOPIC_NOTIFICATION_REGISTRATION_PARAMS_T params = {
                .on_selected = on_selected,
                .registration = g_registration,
                .selector = topic_path
        };
        diffusion_topic_notification_registration_select(session, params, NULL);

        // Sleep for a while
        sleep(5);

        // Close the session, and release resources and memory
        session_close(session, NULL);
        session_free(session);

        credentials_free(credentials);
        diffusion_registration_free(g_registration);
        return EXIT_SUCCESS;
}
Apple
/// Topic notification listener whose callbacks are placeholders for
/// application-specific handling.
class TopicNotificationListener: PTDiffusionTopicNotificationListener {

    // MARK: - Notifications

    /// Called with a notification for the topic at `topicPath`.
    func diffusionStream(
        _ stream: PTDiffusionStream,
        didReceiveNotificationWith type: PTDiffusionTopicNotificationType,
        topicPath: String,
        specification: PTDiffusionTopicSpecification
    ) {
        // Handle notifications for selected/deselected topics
    }

    /// Called with a notification for an immediate descendant at `topicPath`.
    func diffusionStream(
        _ stream: PTDiffusionStream,
        didReceiveDescendantNotificationWith type: PTDiffusionTopicNotificationType,
        topicPath: String
    ) {
        // Handle notifications for immediate descendants
    }

    // MARK: - Stream lifecycle

    /// Called when the listener has encountered an error.
    func diffusionStream(
        _ stream: PTDiffusionStream,
        didFailWithError error: Error
    ) {
        // The listener has encountered an error
    }

    /// Called when the listener has been closed.
    func diffusionDidClose(_ stream: PTDiffusionStream) {
        // The listener has been closed
    }
}


/// Adds a `TopicNotificationListener` to the session's topic notifications
/// feature and, once the registration is available, selects the topics
/// matching the selector expression "foo".
///
/// - Parameter session: The session the listener is added to.
func register_topic_notification_listener(session: PTDiffusionSession) {
    let listener = TopicNotificationListener()

    session.topicNotifications.add(listener) { registration, error in
        // Bind the optional instead of force-unwrapping it, and interpolate
        // the description: `print` performs no `%@` format substitution.
        if let error = error {
            print("An error occurred: \(error.localizedDescription)")
            return
        }

        // The completion argument is ignored, as in the original sample.
        // NOTE(review): presumably an optional error — consider reporting it.
        registration?.select(withTopicSelectorExpression: "foo") { _ in }
    }
}