Just a second...

Example: Subscribe other clients to topics via session filter

The following examples use the SubscriptionControl feature in the Diffusion™ API to subscribe other client sessions to topics.

Subscribe other clients to topics via session filter

JavaScript
// Connect the control session; it performs the topic creation and the
// subscription of the other session.
const controlSession = await diffusion.connect({
    host   : 'host_name',
    port   : 443,
    secure : true,
    principal : 'control',
    credentials : 'password'
});

// Wait for the topic to be added before continuing, so that a failure to
// create it is raised here rather than surfacing as an unhandled rejection.
const topic = 'topic/example';
await controlSession.topics.add(topic, diffusion.topics.TopicType.STRING);

// Connect the session that is going to be subscribed by the control session.
const clientSession = await diffusion.connect({
    host   : 'host_name',
    port   : 443,
    secure : true
});

// Session filter that matches only the client session opened above.
const filter = `$SessionId is "${clientSession.sessionId}"`;
const topicSelector = '?topic//';

// The awaited result is logged below as the number of subscribed sessions.
const numSubscribed = await controlSession.clients.subscribe(filter, topicSelector);

console.log(`${numSubscribed} sessions satisfying filter "${filter}" have been subscribed to topic "${topicSelector}".`);
.NET
/**
 * Copyright © 2021 - 2023 DiffusionData Ltd.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using PushTechnology.ClientInterface.Client.Callbacks;
using PushTechnology.ClientInterface.Client.Factories;
using PushTechnology.ClientInterface.Client.Features.Control.Clients;
using PushTechnology.ClientInterface.Client.Features.Control.Topics;
using PushTechnology.ClientInterface.Client.Session;
using PushTechnology.ClientInterface.Client.Types;
using static System.Console;

namespace PushTechnology.ClientInterface.Example {
    /// <summary>
    /// Control client example that subscribes another session to topics, and then unsubscribes it,
    /// selecting the target session with a session filter.
    /// </summary>
    public sealed class SubscriptionControlSubscribeFilter
    {
        /// <summary>
        /// Runs the example: opens a control session and a client session, subscribes the client session
        /// to the topic selector by session filter, unsubscribes it again and closes both sessions.
        /// </summary>
        /// <param name="serverUrl">The URL of the Diffusion server to connect to.</param>
        public SubscriptionControlSubscribeFilter(string serverUrl)
        {
            var topic = "?topic-example//";

            var controlSession = Diffusion.Sessions.Principal("control").Password("password")
                .CertificateValidation((cert, chain, errors) => CertificateValidationResult.ACCEPT)
                .Open(serverUrl);

            try
            {
                var clientSession = Diffusion.Sessions.Principal("client").Password("password")
                    .CertificateValidation((cert, chain, errors) => CertificateValidationResult.ACCEPT)
                    .Open(serverUrl);

                try
                {
                    var subscriptionControl = controlSession.SubscriptionControl;

                    var filterCallback = new SubscriptionByFilterCallback();

                    // Filter that matches only the client session opened above.
                    var filter = $"$SessionId is \"{clientSession.SessionId}\"";

                    try
                    {
                        subscriptionControl.SubscribeByFilter(filter, topic, filterCallback);

                        WriteLine($"Sessions satisfying filter '{filter}' are subscribed to '{topic}'.");

                        // Pause before continuing; presumably this gives the callback time to report the outcome.
                        Thread.Sleep(300);
                    }
                    catch (Exception ex)
                    {
                        WriteLine($"Failed to subscribe by filter '{filter}' : {ex}.");

                        // Both sessions are closed by the enclosing finally blocks.
                        return;
                    }

                    try
                    {
                        subscriptionControl.UnsubscribeByFilter(filter, topic, filterCallback);

                        WriteLine($"Sessions satisfying filter '{filter}' are unsubscribed to '{topic}'.");

                        // Same pause as after the subscription request.
                        Thread.Sleep(300);
                    }
                    catch (Exception ex)
                    {
                        WriteLine($"Failed to unsubscribe by filter '{filter}' : {ex}.");
                    }
                }
                finally
                {
                    // Always release the client session, including on the early return above.
                    clientSession.Close();
                }
            }
            finally
            {
                // Always release the control session, even if opening the client session failed.
                controlSession.Close();
            }
        }

        /// <summary>
        /// The callback for filtered subscriptions and unsubscriptions. Each notification is written to the console.
        /// </summary>
        private class SubscriptionByFilterCallback : ISubscriptionByFilterCallback
        {
            /// <summary>
            /// Indicates successful processing of the request at the server.
            /// </summary>
            /// <param name="numberSelected">Indicates the number of sessions that satisfied the filter and which qualified
            /// for subscription/unsubscription.</param>
            public void OnComplete(int numberSelected)
            {
                WriteLine($"The number of sessions that qualified for subscription/unsubscription is: {numberSelected}.");
            }

            /// <summary>
            /// The filter was rejected. No sessions were subscribed/unsubscribed.
            /// </summary>
            /// <param name="errors">Errors found.</param>
            public void OnRejected(ICollection<IErrorReport> errors)
            {
                WriteLine($"The following errors occured:");

                foreach(var error in errors)
                {
                    WriteLine($"{error}.");
                }
            }

            /// <summary>
            /// Notification of a contextual error related to this callback.
            /// </summary>
            /// <param name="errorReason">Error reason provided.</param>
            public void OnError(ErrorReason errorReason)
            {
                WriteLine($"An error has occured : {errorReason}.");
            }
        }
    }
}
Java and Android
/*******************************************************************************
 * Copyright (C) 2023 DiffusionData Ltd.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *******************************************************************************/
package com.pushtechnology.diffusion.manual;

import static com.pushtechnology.diffusion.client.Diffusion.newTopicSpecification;

import com.pushtechnology.diffusion.client.Diffusion;
import com.pushtechnology.diffusion.client.callbacks.ErrorReason;
import com.pushtechnology.diffusion.client.features.TopicUpdate;
import com.pushtechnology.diffusion.client.features.Topics;
import com.pushtechnology.diffusion.client.features.control.topics.SubscriptionControl;
import com.pushtechnology.diffusion.client.session.Session;
import com.pushtechnology.diffusion.client.topics.details.TopicSpecification;
import com.pushtechnology.diffusion.client.topics.details.TopicType;

/**
 * This demonstrates using a client to subscribe and unsubscribe other clients
 * to topics using a session filter.
 * <P>
 * This uses the 'SubscriptionControl' feature.
 *
 * @author DiffusionData Limited
 */
public final class SubscriptionControlSubscribeFilter {

    /**
     * Runs the example: opens a control session and a client session,
     * subscribes the client session to a topic by session filter, creates and
     * updates the topic, unsubscribes the client session and closes both
     * sessions.
     *
     * @param args command line arguments (not used)
     * @throws Exception if any step of the example fails
     */
    public static void main(String[] args) throws Exception {

        final String myTopicPath = "my/topic/path";

        // session filter used for both the subscription and the unsubscription
        final String sessionFilter = "$Principal is 'client'";

        // establish control and client sessions
        final Session controlSession = Diffusion.sessions()
            .principal("control")
            .password("password")
            .open("ws://localhost:8080");

        try {
            final Session clientSession = Diffusion.sessions()
                .principal("client")
                .password("password")
                .open("ws://localhost:8080");

            try {
                // initialise SubscriptionControl and TopicUpdate features
                final SubscriptionControl subscriptionControl = controlSession
                    .feature(SubscriptionControl.class);
                final TopicUpdate topicUpdate =
                    controlSession.feature(TopicUpdate.class);

                // register a stream to receive topic events
                clientSession.feature(Topics.class)
                    .addStream(myTopicPath, String.class, new MyValueStream());

                // subscribe the client session to the topic using a session filter
                subscriptionControl.subscribeByFilter(sessionFilter, myTopicPath)
                    .join();

                // add and set a topic with an initial value
                topicUpdate.addAndSet(
                    myTopicPath,
                    newTopicSpecification(TopicType.STRING),
                    String.class,
                    "myValue"
                ).join();

                // set a new topic value
                topicUpdate.set(myTopicPath, String.class, "myNewValue").join();

                // unsubscribe the client session from the topic using the session
                // filter; join so that a failed request is raised here instead of
                // being silently dropped
                subscriptionControl.unsubscribeByFilter(sessionFilter, myTopicPath)
                    .join();

                // pause before closing; presumably this lets the client stream
                // receive the unsubscription notification
                Thread.sleep(1000);
            }
            finally {
                // always release the client session, even if a step above failed
                clientSession.close();
            }
        }
        finally {
            // always release the control session, even if a step above failed
            controlSession.close();
        }
    }

    /**
     * Value stream registered by the client session; prints value,
     * subscription and unsubscription events to standard output.
     */
    private static class MyValueStream implements Topics.ValueStream<String> {
        @Override
        public void onValue(String topicPath,
            TopicSpecification topicSpecification,
            String oldValue, String newValue) {
            System.out.printf("%s set to %s\n", topicPath, newValue);
        }

        @Override
        public void onSubscription(String topicPath,
            TopicSpecification topicSpecification) {
            System.out.printf("subscribed to %s\n", topicPath);
        }

        @Override
        public void onUnsubscription(String topicPath,
            TopicSpecification topicSpecification,
            Topics.UnsubscribeReason unsubscribeReason) {
            System.out.printf("unsubscribed from %s\n", topicPath);
        }

        // close and error notifications are deliberately ignored in this example
        @Override public void onClose() { }

        @Override public void onError(ErrorReason errorReason) { }
    }
}
C
/**
 * Copyright © 2021 - 2023 DiffusionData Ltd.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

#include <stdio.h>
#include <stdlib.h>

#ifndef WIN32
        #include <unistd.h>
#else
        #define sleep(x) Sleep(1000 * x)
#endif

#include "diffusion.h"


/*
 * Handler registered as `on_subscribe_by_filter` for the subscribe-by-filter
 * request made in main(). It performs no work and reports success.
 *
 * number_selected: presumably the number of sessions that matched the
 *                  filter - confirm against the Diffusion C API documentation.
 * context:         user context pointer; not used by this example.
 */
static int on_subscription_by_filter_complete(
        const int number_selected,
        void *context)
{
        // neither argument is needed by this example
        (void) number_selected;
        (void) context;

        // clients that matched `session_filter` have been successfully subscribed to `topic_path`
        return HANDLER_SUCCESS;
}


/*
 * Handler registered as `on_unsubscribe_by_filter` for the
 * unsubscribe-by-filter request made in main(). It performs no work and
 * reports success.
 *
 * number_selected: presumably the number of sessions that matched the
 *                  filter - confirm against the Diffusion C API documentation.
 * context:         user context pointer; not used by this example.
 */
static int on_unsubscription_by_filter_complete(
        const int number_selected,
        void *context)
{
        // neither argument is needed by this example
        (void) number_selected;
        (void) context;

        // clients that matched `session_filter` have been successfully unsubscribed from `topic_path`
        return HANDLER_SUCCESS;
}


/*
 * Example entry point: connects a control session, subscribes every session
 * matching `session_filter` to `topic_path`, unsubscribes them again and then
 * closes the session.
 *
 * Returns EXIT_SUCCESS, or EXIT_FAILURE if the session could not be created.
 */
int main(int argc, char **argv)
{
        const char *url = "ws://localhost:8080";
        const char *principal = "control";
        const char *password = "password";
        const char *topic_path = "my/topic/path";
        const char *session_filter = "$Principal is 'client'";

        CREDENTIALS_T *credentials = credentials_create_password(password);

        // create a session with Diffusion
        DIFFUSION_ERROR_T error = { 0 };
        SESSION_T *session = session_create(url, principal, credentials, NULL, NULL, &error);
        if(session == NULL) {
                printf("Failed to create session: %s\n", error.message);
                free(error.message);
                credentials_free(credentials);
                return EXIT_FAILURE;
        }

        // subscribe sessions that match `session_filter` to `topic_path`
        DIFFUSION_SUBSCRIBE_BY_FILTER_PARAMS_T sub_params = {
                .topic_selector = topic_path,
                .filter = session_filter,
                .on_subscribe_by_filter = on_subscription_by_filter_complete,
        };
        diffusion_subscribe_by_filter(session, sub_params, NULL);

        // Sleep for a while
        sleep(5);

        // unsubscribe sessions that match `session_filter` from `topic_path`
        DIFFUSION_UNSUBSCRIBE_BY_FILTER_PARAMS_T unsub_params = {
                .topic_selector = topic_path,
                .filter = session_filter,
                .on_unsubscribe_by_filter = on_unsubscription_by_filter_complete,
        };
        diffusion_unsubscribe_by_filter(session, unsub_params, NULL);

        // Sleep again before closing, mirroring the wait after the subscription
        // request, so the unsubscription is not cut short by closing the session
        sleep(5);

        // Close the session, and release resources and memory
        session_close(session, NULL);
        session_free(session);

        credentials_free(credentials);
        return EXIT_SUCCESS;
}
Apple
//  Copyright (C) 2021 Push Technology Ltd.
//
//  Licensed under the Apache License, Version 2.0 (the "License");
//  you may not use this file except in compliance with the License.
//  You may obtain a copy of the License at
//  http://www.apache.org/licenses/LICENSE-2.0
//
//  Unless required by applicable law or agreed to in writing, software
//  distributed under the License is distributed on an "AS IS" BASIS,
//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//  See the License for the specific language governing permissions and
//  limitations under the License.

import Foundation
import Diffusion

/// Example wrapper around the subscription control feature that subscribes and
/// unsubscribes the sessions matching a session filter, printing the outcome.
class SubscriptionControlFilter {

    /// Subscribes the sessions matching `filter` to the topics selected by `topic_path`.
    ///
    /// - Parameters:
    ///   - session: The session whose subscription control feature issues the request.
    ///   - filter: The session filter selecting the sessions to subscribe.
    ///   - topic_path: The topic selector expression to subscribe the sessions to.
    func subscribe(session: PTDiffusionSession,
                   filter: String,
                   topic_path: String) {

        session.subscriptionControl.subscribe(withFilter: filter,
                                              topicSelectorExpression: topic_path) { (selected_sessions, error) in
            // Swift's print performs no printf-style formatting, so the values are interpolated.
            if let error = error {
                print("An error has occurred while subscribing session that match filter '\(filter)' to topic '\(topic_path)': \(error.localizedDescription)")
            }
            else {
                print("Successfully subscribed \(selected_sessions) matching sessions with filter '\(filter)' to topic '\(topic_path)'")
            }
        }
    }


    /// Unsubscribes the sessions matching `filter` from the topics selected by `topic_path`.
    ///
    /// - Parameters:
    ///   - session: The session whose subscription control feature issues the request.
    ///   - filter: The session filter selecting the sessions to unsubscribe.
    ///   - topic_path: The topic selector expression to unsubscribe the sessions from.
    func unsubscribe(session: PTDiffusionSession,
                     filter: String,
                     topic_path: String) {

        session.subscriptionControl.unsubscribe(withFilter: filter,
                                                topicSelectorExpression: topic_path) { (selected_sessions, error) in
            // Swift's print performs no printf-style formatting, so the values are interpolated.
            if let error = error {
                print("An error has occurred while unsubscribing session that match filter '\(filter)' from topic '\(topic_path)': \(error.localizedDescription)")
            }
            else {
                print("Successfully unsubscribed \(selected_sessions) matching sessions with filter '\(filter)' from topic '\(topic_path)'")
            }
        }
    }



}

Change the URL provided in the example to the URL of your Diffusion server.