Just a second...

Example: Subscribe other clients to topics

The following examples use the SubscriptionControl feature in the Diffusion™ API to subscribe other client sessions to topics.

.NET
/**
 * Copyright © 2021 Push Technology Ltd.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

using System;
using System.Threading;
using System.Threading.Tasks;
using PushTechnology.ClientInterface.Client.Factories;
using PushTechnology.ClientInterface.Client.Features.Control.Clients;
using PushTechnology.ClientInterface.Client.Features.Control.Topics;
using PushTechnology.ClientInterface.Client.Session;
using PushTechnology.ClientInterface.Client.Topics;
using static System.Console;

namespace PushTechnology.ClientInterface.Example {
    /// <summary>
    /// Client implementation that subscribes another session to topics.
    /// </summary>
    public sealed class SubscriptionControlSubscribe
    {
        /// <summary>
        /// Runs the example: opens a control session, adds a string topic, opens a second
        /// (client) session, then uses the control session's subscription control feature to
        /// subscribe and unsubscribe the client session. Both sessions are closed before the
        /// constructor returns, including when a step fails.
        /// </summary>
        /// <param name="serverUrl">The URL of the Diffusion server to connect to.</param>
        public SubscriptionControlSubscribe(string serverUrl)
        {
            const string topic = "topic/example";
            const string topicSelector = "?topic//";

            var controlSession = Diffusion.Sessions.Principal("control").Password("password")
                .CertificateValidation((cert, chain, errors) => CertificateValidationResult.ACCEPT)
                .Open(serverUrl);

            try
            {
                try
                {
                    // A constructor cannot be async, so the add-topic task is waited on
                    // synchronously. GetAwaiter().GetResult() rethrows the original exception
                    // rather than wrapping it in an AggregateException.
                    controlSession.TopicControl
                        .AddTopicAsync(topic, TopicType.STRING, CancellationToken.None)
                        .GetAwaiter()
                        .GetResult();
                }
                catch (Exception ex)
                {
                    WriteLine($"Failed to add topic '{topic}' : {ex}.");
                    return;
                }

                ISession session = Diffusion.Sessions.Principal("client")
                    .Credentials(Diffusion.Credentials.Password("password"))
                    .CertificateValidation((cert, chain, errors) => CertificateValidationResult.ACCEPT)
                    .NoReconnection()
                    .Open(serverUrl);

                try
                {
                    WriteLine($"Session with id '{session.SessionId}' created.");

                    var subscriptionControl = controlSession.SubscriptionControl;
                    var subscriptionCallback = new SubscriptionCallback();

                    try
                    {
                        subscriptionControl.Subscribe(session.SessionId, topicSelector, subscriptionCallback);

                        WriteLine($"Session with id '{session.SessionId}' is subscribed to '{topicSelector}'.");
                    }
                    catch (Exception ex)
                    {
                        WriteLine($"Failed to subscribe to '{topicSelector}' : {ex}.");
                        return;
                    }

                    try
                    {
                        subscriptionControl.Unsubscribe(session.SessionId, topicSelector, subscriptionCallback);

                        WriteLine($"Session with id '{session.SessionId}' is unsubscribed to '{topicSelector}'.");
                    }
                    catch (Exception ex)
                    {
                        WriteLine($"Failed to unsubscribe to '{topicSelector}' : {ex}.");
                    }
                }
                finally
                {
                    // Close the client session on every exit path.
                    session.Close();
                }
            }
            finally
            {
                // Close the control session last, on every exit path.
                controlSession.Close();
            }
        }

        /// <summary>
        /// The callback for subscription operations.
        /// </summary>
        private sealed class SubscriptionCallback : ISubscriptionCallback
        {
            /// <summary>
            /// Indicates that the session was closed.
            /// </summary>
            public void OnDiscard()
            {
                WriteLine("The session was closed.");
            }

            /// <summary>
            /// Indicates that a requested operation has been handled by the server.
            /// </summary>
            public void OnComplete()
            {
                WriteLine("Subscription complete.");
            }
        }
    }
}
Java and Android
package com.pushtechnology.diffusion.examples;

import java.util.concurrent.CompletableFuture;

import com.pushtechnology.diffusion.client.Diffusion;
import com.pushtechnology.diffusion.client.features.control.topics.SubscriptionControl;
import com.pushtechnology.diffusion.client.session.Session;
import com.pushtechnology.diffusion.client.session.SessionId;

/**
 * This demonstrates using a client to subscribe and unsubscribe other clients
 * to topics.
 * <P>
 * This uses the 'SubscriptionControl' feature.
 *
 * @author Push Technology Limited
 * @since 5.0
 */
public class ControlClientSubscriptionControl {

    /** Server URL used when no URL is supplied to the constructor. */
    private static final String DEFAULT_SERVER_URL =
        "ws://diffusion.example.com:80";

    private final Session session;

    private final SubscriptionControl subscriptionControl;

    /**
     * Constructor that connects to the default example server URL.
     */
    public ControlClientSubscriptionControl() {
        this(DEFAULT_SERVER_URL);
    }

    /**
     * Constructor that connects to the given server.
     * <P>
     * Opens a session as principal 'control' and obtains its
     * 'SubscriptionControl' feature.
     *
     * @param serverUrl URL of the Diffusion server to connect to
     */
    public ControlClientSubscriptionControl(String serverUrl) {

        session =
            Diffusion.sessions().principal("control").password("password")
                .open(serverUrl);

        subscriptionControl = session.feature(SubscriptionControl.class);
    }

    /**
     * Subscribe a client to topics.
     *
     * @param sessionId client to subscribe
     * @param topicSelector topic selector expression
     * @return the CompletableFuture returned by the subscription control
     *         feature for this request
     */
    public CompletableFuture<?> subscribe(
        SessionId sessionId,
        String topicSelector) {

        // To subscribe a client to a topic, this client session
        // must have the 'modify_session' permission.
        return subscriptionControl.subscribe(
            sessionId,
            topicSelector);
    }

    /**
     * Unsubscribe a client from topics.
     *
     * @param sessionId client to unsubscribe
     * @param topicSelector topic selector expression
     * @return a CompletableFuture that completes when a response is received
     *         from the server
     */
    public CompletableFuture<?> unsubscribe(
        SessionId sessionId,
        String topicSelector) {

        // To unsubscribe a client from a topic, this client session
        // must have the 'modify_session' permission.
        return subscriptionControl.unsubscribe(
            sessionId,
            topicSelector);
    }

    /**
     * Close the session.
     */
    public void close() {
        session.close();
    }
}
                    
C
/*
 * This example waits to be notified of a client connection, and then
 * subscribes that client to a named topic.
 */

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>

#include <apr.h>
#include <apr_thread_mutex.h>
#include <apr_thread_cond.h>

#include "diffusion.h"
#include "args.h"

/*
 * Command-line options for this example; passed to parse_cmdline() and
 * show_usage() in main().
 * NOTE(review): each entry looks like {short flag, long name, help text,
 * optional marker, has-value marker, default value} - confirm against args.h.
 */
ARG_OPTS_T arg_opts[] = {
        ARG_OPTS_HELP,
        {'u', "url", "Diffusion server URL", ARG_OPTIONAL, ARG_HAS_VALUE, "ws://localhost:8080"},
        {'p', "principal", "Principal (username) for the connection", ARG_OPTIONAL, ARG_HAS_VALUE, NULL},
        {'c', "credentials", "Credentials (password) for the connection", ARG_OPTIONAL, ARG_HAS_VALUE, NULL},
        {'t', "topic_selector", "Topic selector to subscribe/unsubscribe clients from", ARG_OPTIONAL, ARG_HAS_VALUE, ">foo"},
        END_OF_ARG_OPTS
};
/*
 * Parsed command-line options. Assigned in main() from parse_cmdline() and
 * read in on_session_open() to look up "topic_selector"; file-scope so the
 * callback can reach it.
 */
HASH_T *options = NULL;

/*
 * Completion callback for subscribe_client(): reports on stdout that the
 * subscription request has completed. Neither argument is used.
 */
static int
on_subscription_complete(SESSION_T *session, void *context)
{
        (void)session;
        (void)context;

        fputs("Subscription complete\n", stdout);
        return HANDLER_SUCCESS;
}

/*
 * Session-open listener callback. For every opened session other than this
 * program's own session, subscribes that session to the topic selector taken
 * from the command-line options.
 */
static int
on_session_open(SESSION_T *session, const SESSION_PROPERTIES_EVENT_T *request, void *context)
{
        const int is_own_session = (session_id_cmp(*session->id, request->session_id) == 0);

        /* Only act on sessions other than our own. */
        if(!is_own_session) {
                char *selector = hash_get(options, "topic_selector");

                /* The string form of the session id is released right after logging. */
                char *session_id_text = session_id_to_string(&request->session_id);
                printf("Subscribing session %s to topic selector %s\n", session_id_text, selector);
                free(session_id_text);

                /* Ask the server to subscribe the newly opened session. */
                SUBSCRIPTION_CONTROL_PARAMS_T subscription_request = {
                        .session_id = request->session_id,
                        .topic_selector = selector,
                        .on_complete = on_subscription_complete
                };
                subscribe_client(session, subscription_request);
        }

        return HANDLER_SUCCESS;
}

/*
 * Release the resources main() owns besides the session: the credentials
 * (when a password was supplied) and the parsed command-line options.
 */
static void
release_resources(CREDENTIALS_T *credentials)
{
        if(credentials != NULL) {
                credentials_free(credentials);
        }
        if(options != NULL) {
                hash_free(options, NULL, free);
                options = NULL;
        }
}

/*
 * Entry point: parses the command line, opens a session, registers a
 * session properties listener whose on_session_open callback subscribes
 * newly opened client sessions, waits for 10 seconds, then closes the
 * session and releases everything it allocated.
 *
 * Returns EXIT_SUCCESS after a normal run, EXIT_FAILURE when usage is shown
 * or the session cannot be created.
 */
int
main(int argc, char **argv)
{
        /*
         * Standard command-line parsing.
         */
        options = parse_cmdline(argc, argv, arg_opts);
        if(options == NULL || hash_get(options, "help") != NULL) {
                show_usage(argc, argv, arg_opts);
                release_resources(NULL);
                return EXIT_FAILURE;
        }

        const char *url = hash_get(options, "url");
        const char *principal = hash_get(options, "principal");
        CREDENTIALS_T *credentials = NULL;
        const char *password = hash_get(options, "credentials");
        if(password != NULL) {
                credentials = credentials_create_password(password);
        }

        /*
         * Create a session with Diffusion.
         */
        DIFFUSION_ERROR_T error = { 0 };
        SESSION_T *session = session_create(url, principal, credentials, NULL, NULL, &error);
        if(session == NULL) {
                fprintf(stderr, "Failed to create session: %s\n", error.message);
                release_resources(credentials);
                return EXIT_FAILURE;
        }

        /*
         * Register a session properties listener, so we are notified
         * of new client connections.
         * In the callback, we will subscribe the client to topics
         * according to the topic_selector argument.
         */
        SET_T *required_properties = set_new_string(1);
        set_add(required_properties, PROPERTIES_SELECTOR_ALL_FIXED_PROPERTIES);
        SESSION_PROPERTIES_REGISTRATION_PARAMS_T params = {
                .on_session_open = on_session_open,
                .required_properties = required_properties
        };
        session_properties_listener_register(session, params);
        set_free(required_properties);

        /*
         * Pretend to do some work.
         */
        sleep(10);

        /*
         * Close session and tidy up. The options hash is released only
         * after the session is closed, because on_session_open() reads it.
         */
        session_close(session, NULL);
        session_free(session);
        release_resources(credentials);

        return EXIT_SUCCESS;
}
                    
Python
#  Copyright (c) 2021 Push Technology Ltd., All Rights Reserved.
#
#  Use is subject to license terms.
#
#  NOTICE: All information contained herein is, and remains the
#  property of Push Technology. The intellectual and technical
#  concepts contained herein are proprietary to Push Technology and
#  may be covered by U.S. and Foreign Patents, patents in process, and
#  are protected by trade secret or copyright law.
""" Example of subscribing a session to a topic and receiving its updates through a value stream. """

import asyncio
import diffusion
import diffusion.features.topics as topics

# Diffusion server connection information used to open the session in main();
# adjust as needed for the server used in practice.
server_url = "ws://localhost:8080"  # URL passed to diffusion.Session
principal = "admin"  # principal passed to diffusion.Session
credentials = diffusion.Credentials("password")  # credentials passed to diffusion.Session


# stream callback functions
def on_update(*, old_value, topic_path, topic_value, **kwargs):
    """Print a report for a value delivered on a topic.

    Always prints the topic path. When ``old_value`` is ``None`` the value is
    reported as the initial value; otherwise both the old and the new value
    are printed. Any additional keyword arguments are accepted and ignored.
    """
    print("Topic:", topic_path)

    is_update = old_value is not None
    if is_update:
        print("  Value updated")
        print("    Old value:", old_value)
        print("    New value:", topic_value)
        return

    print("  Initial value:", topic_value)


def on_subscribe(*, topic_path, **kwargs):
    """Print a notice that ``topic_path`` has been subscribed to.

    Any additional keyword arguments are accepted and ignored.
    """
    message = f"Subscribed to {topic_path}"
    print(message)


def on_unsubscribe(*, reason, topic_path, **kwargs):
    """Print a notice that ``topic_path`` was unsubscribed, with the reason.

    ``reason`` is converted with ``str`` before printing. Any additional
    keyword arguments are accepted and ignored.
    """
    message = f"Unsubscribed from {topic_path} because {str(reason)}"
    print(message)


# example properties
# Selector used in main() when adding the value stream, subscribing and
# unsubscribing.
topic_selector = "foo/bar"
# Data type handed to the value stream handler below.
topic_type = diffusion.datatypes.STRING

# Number of seconds main() sleeps between subscribing and unsubscribing.
session_duration = 15

# value stream instance: wires the three callback functions defined above
# (update, subscribe, unsubscribe) to a handler for topic_type values
value_stream = topics.ValueStreamHandler(
    data_type=topic_type,
    update=on_update,
    subscribe=on_subscribe,
    unsubscribe=on_unsubscribe,
)


# Because Python SDK for Diffusion is async, all the code needs to be
# wrapped inside a coroutine function, and executed using asyncio.run.
async def main():
    """Open a session, subscribe to ``topic_selector``, then unsubscribe.

    Registers ``value_stream`` for the selector, subscribes, sleeps for
    ``session_duration`` seconds, unsubscribes, and sleeps a further
    5 seconds before the session context manager exits.
    """

    # creating the session
    async with diffusion.Session(
        url=server_url, principal=principal, credentials=credentials
    ) as session:

        # PUSH_SNIPPET_BEGIN: [topic_notifications.xml/subscribe]
        # The value stream is added before subscribing.
        print(f"Adding value stream {topic_selector}")
        session.topics.add_value_stream(
            topic_selector=topic_selector, stream=value_stream
        )
        print(f"Subscribing to {topic_selector}")
        await session.topics.subscribe(topic_selector)
        # PUSH_SNIPPET_END: [topic_notifications.xml/subscribe]
        # Stay subscribed for session_duration seconds.
        await asyncio.sleep(session_duration)

        print(f"Unsubscribing from {topic_selector}")
        await session.topics.unsubscribe(topic_selector)

        await asyncio.sleep(5)  # keep alive to display the unsubscription message


# Run the example only when this file is executed as a script.
if __name__ == "__main__":
    asyncio.run(main())

Before running an example, change the server URL it uses to the URL of your Diffusion server.