Just a second...

Example: Subscribe other clients to topics via session ID

The following examples use the SubscriptionControl feature in the Diffusion™ API to subscribe other client sessions to topics.

.NET
/**
 * Copyright © 2021, 2022 Push Technology Ltd.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

using System;
using System.Threading;
using System.Threading.Tasks;
using PushTechnology.ClientInterface.Client.Factories;
using PushTechnology.ClientInterface.Client.Features.Control.Clients;
using PushTechnology.ClientInterface.Client.Features.Control.Topics;
using PushTechnology.ClientInterface.Client.Session;
using PushTechnology.ClientInterface.Client.Topics;
using static System.Console;

namespace PushTechnology.ClientInterface.Example {
    /// <summary>
    /// Client implementation that subscribes another session to topics.
    /// </summary>
    public sealed class SubscriptionControlSubscribe
    {
        public async Task SubscriptionControlSubscribeExample(string serverUrl)
        {
            var topic = $"topic/example";

            var controlSession = Diffusion.Sessions.Principal("control").Password("password")
                .CertificateValidation((cert, chain, errors) => CertificateValidationResult.ACCEPT)
                .Open(serverUrl);

            var subscriptionControl = controlSession.SubscriptionControl;

            try
            {
                await controlSession.TopicControl.AddTopicAsync(topic, TopicType.STRING, CancellationToken.None);
            }
            catch (Exception ex)
            {
                WriteLine($"Failed to add topic '{topic}' : {ex}.");
                controlSession.Close();
                return;
            }

            ISession session = Diffusion.Sessions.Principal("client")
                .Credentials(Diffusion.Credentials.Password("password"))
                .CertificateValidation((cert, chain, errors) => CertificateValidationResult.ACCEPT)
                .NoReconnection()
                .Open(serverUrl);

            WriteLine($"Session with id '{session.SessionId}' created.");

            var subscriptionCallback = new SubscriptionCallback();

            var topicSelector = "?topic//";

            try
            {
                subscriptionControl.Subscribe(session.SessionId, topicSelector, subscriptionCallback);

                WriteLine($"Session with id '{session.SessionId}' is subscribed to '{topicSelector}'.");
            }
            catch (Exception ex)
            {
                WriteLine($"Failed to subscribe to '{topicSelector}' : {ex}.");
                session.Close();
                controlSession.Close();
                return;
            }

            try
            {
                subscriptionControl.Unsubscribe(session.SessionId, topicSelector, subscriptionCallback);

                WriteLine($"Session with id '{session.SessionId}' is unsubscribed to '{topicSelector}'.");
            }
            catch (Exception ex)
            {
                WriteLine($"Failed to unsubscribe to '{topicSelector}' : {ex}.");
            }

            // Close the sessions
            session.Close();
            controlSession.Close();
        }

        /// <summary>
        /// The callback for subscription operations.
        /// </summary>
        private class SubscriptionCallback : ISubscriptionCallback
        {
            /// <summary>
            /// Indicates that the session was closed.
            /// </summary>
            public void OnDiscard()
            {
                WriteLine("The session was closed.");
            }

            /// <summary>
            /// Indicates that a requested operation has been handled by the server.
            /// </summary>
            public void OnComplete()
            {
                WriteLine("Subscription complete.");
            }
        }
    }
}
Java and Android
package com.pushtechnology.diffusion.examples;

import java.util.concurrent.CompletableFuture;

import com.pushtechnology.diffusion.client.Diffusion;
import com.pushtechnology.diffusion.client.features.control.topics.SubscriptionControl;
import com.pushtechnology.diffusion.client.session.Session;
import com.pushtechnology.diffusion.client.session.SessionId;

/**
 * This demonstrates using a client to subscribe and unsubscribe other clients
 * to topics.
 * <P>
 * This uses the 'SubscriptionControl' feature.
 *
 * @author Push Technology Limited
 * @since 5.0
 */
public class ControlClientSubscriptionControl {

    private final Session session;

    private final SubscriptionControl subscriptionControl;

    /**
     * Constructor.
     */
    public ControlClientSubscriptionControl() {

        session =
            Diffusion.sessions().principal("control").password("password")
                .open("ws://diffusion.example.com:80");

        subscriptionControl = session.feature(SubscriptionControl.class);
    }

    /**
     * Subscribe a client to topics.
     *
     * @param sessionId client to subscribe
     * @param topicSelector topic selector expression
     * @param callback for subscription result
     */
    public CompletableFuture<?> subscribe(
        SessionId sessionId,
        String topicSelector) {

        // To subscribe a client to a topic, this client session
        // must have the 'modify_session' permission.
        return subscriptionControl.subscribe(
            sessionId,
            topicSelector);
    }

    /**
     * Unsubscribe a client from topics.
     *
     * @param sessionId client to unsubscribe
     * @param topicSelector topic selector expression
     * @return a CompletableFuture that completes when a response is received
     *         from the server
     */
    public CompletableFuture<?> unsubscribe(
        SessionId sessionId,
        String topicSelector) {

        // To unsubscribe a client from a topic, this client session
        // must have the 'modify_session' permission.
        return subscriptionControl.unsubscribe(
            sessionId,
            topicSelector);
    }

    /**
     * Close the session.
     */
    public void close() {
        session.close();
    }
}
                    
C
/**
 * Copyright © 2021 Push Technology Ltd.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

#include <stdio.h>
#include <stdlib.h>

#ifndef WIN32
        #include <unistd.h>
#else
        #define sleep(x) Sleep(1000 * x)
#endif

#include "diffusion.h"


char *g_topic_selector = "my/topic/path";


static int on_subscription_complete(
        SESSION_T *session,
        void *context)
{
        // client has been successfully subscribed to a topic
        return HANDLER_SUCCESS;
}


static int on_session_open(
        SESSION_T *session,
        const SESSION_PROPERTIES_EVENT_T *request,
        void *context)
{
        if(session_id_cmp(*session->id, request->session_id) == 0) {
                // It's our own session, ignore.
                return HANDLER_SUCCESS;
        }

        // Subscribe the client session to the topic
        SUBSCRIPTION_CONTROL_PARAMS_T subscribe_params = {
                .session_id = request->session_id,
                .topic_selector = g_topic_selector,
                .on_complete = on_subscription_complete
        };
        subscribe_client(session, subscribe_params);

        return HANDLER_SUCCESS;
}


int main(int argc, char **argv)
{
        const char *url = "ws://localhost:8080";
        const char *principal = "control";
        const char *password = "password";

        CREDENTIALS_T *credentials = credentials_create_password(password);

        // create a session with Diffusion
        DIFFUSION_ERROR_T error = { 0 };
        SESSION_T *session = session_create(url, principal, credentials, NULL, NULL, &error);
        if(session == NULL) {
                printf("Failed to create session: %s\n", error.message);
                free(error.message);
                credentials_free(credentials);
                return EXIT_FAILURE;
        }

        /*
         * Register a session properties listener, so we are notified
         * of new client connections.
         */
        SET_T *required_properties = set_new_string(1);
        set_add(required_properties, PROPERTIES_SELECTOR_ALL_FIXED_PROPERTIES);
        SESSION_PROPERTIES_REGISTRATION_PARAMS_T params = {
                .on_session_open = on_session_open,
                .required_properties = required_properties
        };
        session_properties_listener_register(session, params);
        set_free(required_properties);

        // Sleep for a while
        sleep(5);

        // Close the session, and release resources and memory
        session_close(session, NULL);
        session_free(session);

        credentials_free(credentials);

        return EXIT_SUCCESS;
}
Apple
//  Copyright (C) 2021 Push Technology Ltd.
//
//  Licensed under the Apache License, Version 2.0 (the "License");
//  you may not use this file except in compliance with the License.
//  You may obtain a copy of the License at
//  http://www.apache.org/licenses/LICENSE-2.0
//
//  Unless required by applicable law or agreed to in writing, software
//  distributed under the License is distributed on an "AS IS" BASIS,
//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//  See the License for the specific language governing permissions and
//  limitations under the License.

import Foundation
import Diffusion

class SubscriptionControl {

    func subscribe(session: PTDiffusionSession,
                   sessionId: PTDiffusionSessionId,
                   topic_path: String) {

        session.subscriptionControl.subscribeSessionId(sessionId,
                                                       withTopicSelectorExpression: topic_path) { error in
            if (error != nil) {
                print("An error has occurred while subscribing session '%@' to topic '%@': %@",
                      sessionId,
                      topic_path,
                      error!.localizedDescription)
            }
            else {
                print("Session '%@' has been successfully subscribed to topic '%@'", sessionId, topic_path)
            }
        }
    }


    func unsubscribe(session: PTDiffusionSession,
                     sessionId: PTDiffusionSessionId,
                     topic_path: String) {

        session.subscriptionControl.unsubscribeSessionId(sessionId,
                                                         withTopicSelectorExpression: topic_path) { error in
            if (error != nil) {
                print("An error has occurred while unsubscribing session '%@' from topic '%@': %@",
                      sessionId,
                      topic_path,
                      error!.localizedDescription)
            }
            else {
                print("Session '%@' has been successfully unsubscribed from topic '%@'", sessionId, topic_path)
            }
        }
    }



}

Change the URL from that provided in the example to the URL of the Diffusion server.