Just a second...

Example: Subscribe other clients to topics via session filter

The following examples use the SubscriptionControl feature in the Diffusion™ API to subscribe other client sessions to topics.

Subscribe other clients to topics via session filter

.NET
/**
 * Copyright © 2021, 2022 Push Technology Ltd.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using PushTechnology.ClientInterface.Client.Callbacks;
using PushTechnology.ClientInterface.Client.Factories;
using PushTechnology.ClientInterface.Client.Features.Control.Clients;
using PushTechnology.ClientInterface.Client.Features.Control.Topics;
using PushTechnology.ClientInterface.Client.Session;
using PushTechnology.ClientInterface.Client.Types;
using static System.Console;

namespace PushTechnology.ClientInterface.Example {
    /// <summary>
    /// Client implementation that subscribes to topics with a filter.
    /// </summary>
    public sealed class SubscriptionControlSubscribeFilter
    {
        public SubscriptionControlSubscribeFilter(string serverUrl)
        {
            var topic = $"?topic-example//";

            var controlSession = Diffusion.Sessions.Principal("control").Password("password")
                .CertificateValidation((cert, chain, errors) => CertificateValidationResult.ACCEPT)
                .Open(serverUrl);

            var clientSession = Diffusion.Sessions.Principal("client").Password("password")
                .CertificateValidation((cert, chain, errors) => CertificateValidationResult.ACCEPT)
                .Open($"{serverUrl}");

            var subscriptionControl = controlSession.SubscriptionControl;

            var filterCallback = new SubscriptionByFilterCallback();

            var filter = "$SessionId is \"" + clientSession.SessionId + "\"";

            try
            {
                subscriptionControl.SubscribeByFilter(filter, topic, filterCallback);

                WriteLine($"Sessions satisfying filter '{filter}' are subscribed to '{topic}'.");

                Thread.Sleep(300);
            }
            catch (Exception ex)
            {
                WriteLine($"Failed to subscribe by filter '{filter}' : {ex}.");
                controlSession.Close();
                return;
            }

            try
            {
                subscriptionControl.UnsubscribeByFilter(filter, topic, filterCallback);

                WriteLine($"Sessions satisfying filter '{filter}' are unsubscribed to '{topic}'.");

                Thread.Sleep(300);
            }
            catch (Exception ex)
            {
                WriteLine($"Failed to unsubscribe by filter '{filter}' : {ex}.");
            }

            controlSession.Close();
            clientSession.Close();
        }

        /// <summary>
        /// The callback for filtered subscriptions and unsubscriptions.
        /// </summary>
        private class SubscriptionByFilterCallback : ISubscriptionByFilterCallback
        {
            /// <summary>
            /// Indicates successful processing of the request at the server.
            /// </summary>
            /// <param name="numberSelected">Indicates the number of sessions that satisfied the filter and which qualified
            /// for subscription/unsubscription.</param>
            public void OnComplete(int numberSelected)
            {
                WriteLine($"The number of sessions that qualified for subscription/unsubscription is: {numberSelected}.");
            }

            /// <summary>
            /// The filter was rejected. No sessions were subscribed/unsubscribed.
            /// </summary>
            /// <param name="errors">Errors found.</param>
            public void OnRejected(ICollection<IErrorReport> errors)
            {
                WriteLine($"The following errors occured:");

                foreach(var error in errors)
                {
                    WriteLine($"{error}.");
                }
            }

            /// <summary>
            /// Notification of a contextual error related to this callback.
            /// </summary>
            /// <param name="errorReason">Error reason provided.</param>
            public void OnError(ErrorReason errorReason)
            {
                WriteLine($"An error has occured : {errorReason}.");
            }
        }
    }
}
C
/**
 * Copyright © 2021 Push Technology Ltd.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

#include <stdio.h>
#include <stdlib.h>

#ifndef WIN32
        #include <unistd.h>
#else
        #define sleep(x) Sleep(1000 * x)
#endif

#include "diffusion.h"


static int on_subscription_by_filter_complete(
        const int number_selected,
        void *context)
{
        // clients that matched `session_filter` have been successfully subscribed to `topic_path`
        return HANDLER_SUCCESS;
}


static int on_unsubscription_by_filter_complete(
        const int number_selected,
        void *context)
{
        // clients that matched `session_filter` have been successfully unsubscribed from `topic_path`
        return HANDLER_SUCCESS;
}


int main(int argc, char **argv)
{
        const char *url = "ws://localhost:8080";
        const char *principal = "control";
        const char *password = "password";
        const char *topic_path = "my/topic/path";
        const char *session_filter = "$Principal is 'client'";

        CREDENTIALS_T *credentials = credentials_create_password(password);

        // create a session with Diffusion
        DIFFUSION_ERROR_T error = { 0 };
        SESSION_T *session = session_create(url, principal, credentials, NULL, NULL, &error);
        if(session == NULL) {
                printf("Failed to create session: %s\n", error.message);
                free(error.message);
                credentials_free(credentials);
                return EXIT_FAILURE;
        }

        // subscribe sessions that match `session_filter` to `topic_path`
        DIFFUSION_SUBSCRIBE_BY_FILTER_PARAMS_T sub_params = {
                .topic_selector = topic_path,
                .filter = session_filter,
                .on_subscribe_by_filter = on_subscription_by_filter_complete,
        };
        diffusion_subscribe_by_filter(session, sub_params, NULL);

        // Sleep for a while
        sleep(5);

        // unsubscribe sessions that match `session_filter` from `topic_path`
        DIFFUSION_UNSUBSCRIBE_BY_FILTER_PARAMS_T unsub_params = {
                .topic_selector = topic_path,
                .filter = session_filter,
                .on_unsubscribe_by_filter = on_unsubscription_by_filter_complete,
        };
        diffusion_unsubscribe_by_filter(session, unsub_params, NULL);

        // Close the session, and release resources and memory
        session_close(session, NULL);
        session_free(session);

        credentials_free(credentials);
        return EXIT_SUCCESS;
}
Apple
//  Copyright (C) 2021 Push Technology Ltd.
//
//  Licensed under the Apache License, Version 2.0 (the "License");
//  you may not use this file except in compliance with the License.
//  You may obtain a copy of the License at
//  http://www.apache.org/licenses/LICENSE-2.0
//
//  Unless required by applicable law or agreed to in writing, software
//  distributed under the License is distributed on an "AS IS" BASIS,
//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//  See the License for the specific language governing permissions and
//  limitations under the License.

import Foundation
import Diffusion

class SubscriptionControlFilter {

    func subscribe(session: PTDiffusionSession,
                   filter: String,
                   topic_path: String) {

        session.subscriptionControl.subscribe(withFilter: filter,
                                              topicSelectorExpression: topic_path) { (selected_sessions, error) in
            if (error != nil) {
                print("An error has occurred while subscribing session that match filter '%@' to topic '%@': %@",
                      filter,
                      topic_path,
                      error!.localizedDescription)
            }
            else {
                print("Successfully subscribed %lu matching sessions with filter '%@' to topic '%@'",
                      selected_sessions,
                      filter,
                      topic_path)
            }
        }
    }


    func unsubscribe(session: PTDiffusionSession,
                     filter: String,
                     topic_path: String) {

        session.subscriptionControl.unsubscribe(withFilter: filter,
                                                topicSelectorExpression: topic_path) { (selected_sessions, error) in
            if (error != nil) {
                print("An error has occurred while unsubscribing session that match filter '%@' from topic '%@': %@",
                      filter,
                      topic_path,
                      error!.localizedDescription)
            }
            else {
                print("Successfully unsubscribed %lu matching sessions with filter '%@' from topic '%@'",
                      selected_sessions,
                      filter,
                      topic_path)
            }
        }
    }



}

Change the URL from that provided in the example to the URL of the Diffusion server.