Example: Subscribe other clients to topics
The following examples use the SubscriptionControl feature in the Diffusion™ API to subscribe other client sessions to topics.
.NET
/**
 * Copyright © 2021 Push Technology Ltd.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

using System;
using System.Threading;
using System.Threading.Tasks;
using PushTechnology.ClientInterface.Client.Factories;
using PushTechnology.ClientInterface.Client.Features.Control.Clients;
using PushTechnology.ClientInterface.Client.Features.Control.Topics;
using PushTechnology.ClientInterface.Client.Session;
using PushTechnology.ClientInterface.Client.Topics;
using static System.Console;

namespace PushTechnology.ClientInterface.Example
{
    /// <summary>
    /// Client implementation that subscribes another session to topics.
    /// </summary>
    public sealed class SubscriptionControlSubscribe
    {
        /// <summary>
        /// Runs the example: a control session adds a topic, then subscribes a second
        /// session to a topic selector and unsubscribes it again.
        /// </summary>
        /// <param name="serverUrl">The URL of the Diffusion server to connect to.</param>
        public SubscriptionControlSubscribe(string serverUrl)
        {
            // A constructor cannot be async, so the asynchronous workflow lives in
            // RunAsync and is waited on here. GetAwaiter().GetResult() rethrows the
            // original exception instead of wrapping it in an AggregateException.
            RunAsync(serverUrl, CancellationToken.None).GetAwaiter().GetResult();
        }

        /// <summary>
        /// Performs the example workflow and closes every session it opened.
        /// </summary>
        /// <param name="serverUrl">The URL of the Diffusion server to connect to.</param>
        /// <param name="cancellationToken">Token passed to the add-topic request.</param>
        /// <returns>A task that completes when the workflow has finished.</returns>
        private static async Task RunAsync(string serverUrl, CancellationToken cancellationToken)
        {
            const string topic = "topic/example";

            var controlSession = Diffusion.Sessions.Principal("control").Password("password")
                .CertificateValidation((cert, chain, errors) => CertificateValidationResult.ACCEPT)
                .Open(serverUrl);

            try
            {
                var subscriptionControl = controlSession.SubscriptionControl;

                try
                {
                    // ConfigureAwait(false): the constructor blocks on this task, so the
                    // continuation must not need the caller's synchronization context.
                    await controlSession.TopicControl
                        .AddTopicAsync(topic, TopicType.STRING, cancellationToken)
                        .ConfigureAwait(false);
                }
                catch (Exception ex)
                {
                    WriteLine($"Failed to add topic '{topic}' : {ex}.");
                    return;
                }

                ISession session = Diffusion.Sessions.Principal("client")
                    .Credentials(Diffusion.Credentials.Password("password"))
                    .CertificateValidation((cert, chain, errors) => CertificateValidationResult.ACCEPT)
                    .NoReconnection()
                    .Open(serverUrl);

                try
                {
                    WriteLine($"Session with id '{session.SessionId}' created.");

                    var subscriptionCallback = new SubscriptionCallback();
                    var topicSelector = "?topic//";

                    try
                    {
                        // The result of the request is reported through subscriptionCallback.
                        subscriptionControl.Subscribe(session.SessionId, topicSelector, subscriptionCallback);

                        WriteLine($"Session with id '{session.SessionId}' is subscribed to '{topicSelector}'.");
                    }
                    catch (Exception ex)
                    {
                        WriteLine($"Failed to subscribe to '{topicSelector}' : {ex}.");
                        return;
                    }

                    try
                    {
                        subscriptionControl.Unsubscribe(session.SessionId, topicSelector, subscriptionCallback);

                        WriteLine($"Session with id '{session.SessionId}' is unsubscribed to '{topicSelector}'.");
                    }
                    catch (Exception ex)
                    {
                        WriteLine($"Failed to unsubscribe to '{topicSelector}' : {ex}.");
                    }
                }
                finally
                {
                    // Close the subscribed session before the control session.
                    session.Close();
                }
            }
            finally
            {
                // Runs on every path, including a failure to open the second session.
                controlSession.Close();
            }
        }

        /// <summary>
        /// The callback for subscription operations.
        /// </summary>
        private class SubscriptionCallback : ISubscriptionCallback
        {
            /// <summary>
            /// Indicates that the session was closed.
            /// </summary>
            public void OnDiscard()
            {
                WriteLine("The session was closed.");
            }

            /// <summary>
            /// Indicates that a requested operation has been handled by the server.
            /// </summary>
            public void OnComplete()
            {
                WriteLine("Subscription complete.");
            }
        }
    }
}
Java and Android
package com.pushtechnology.diffusion.examples;

import java.util.concurrent.CompletableFuture;

import com.pushtechnology.diffusion.client.Diffusion;
import com.pushtechnology.diffusion.client.features.control.topics.SubscriptionControl;
import com.pushtechnology.diffusion.client.session.Session;
import com.pushtechnology.diffusion.client.session.SessionId;

/**
 * This demonstrates using a client to subscribe and unsubscribe other clients
 * to topics.
 * <P>
 * This uses the 'SubscriptionControl' feature.
 * <P>
 * The class is {@link AutoCloseable}, so it can be used in a
 * try-with-resources statement to make sure the session is closed.
 *
 * @author Push Technology Limited
 * @since 5.0
 */
public class ControlClientSubscriptionControl implements AutoCloseable {

    /** Server URL used by the no-argument constructor. */
    private static final String DEFAULT_SERVER_URL =
        "ws://diffusion.example.com:80";

    private final Session session;
    private final SubscriptionControl subscriptionControl;

    /**
     * Constructor. Connects to the example server URL.
     */
    public ControlClientSubscriptionControl() {
        this(DEFAULT_SERVER_URL);
    }

    /**
     * Constructor.
     *
     * @param serverUrl URL of the Diffusion server to open the control
     *        session against
     */
    public ControlClientSubscriptionControl(String serverUrl) {
        session =
            Diffusion.sessions().principal("control").password("password")
                .open(serverUrl);

        subscriptionControl = session.feature(SubscriptionControl.class);
    }

    /**
     * Subscribe a client to topics.
     *
     * @param sessionId client to subscribe
     * @param topicSelector topic selector expression
     * @return a CompletableFuture that completes when a response is received
     *         from the server
     */
    public CompletableFuture<?> subscribe(
        SessionId sessionId,
        String topicSelector) {

        // To subscribe a client to a topic, this client session
        // must have the 'modify_session' permission.
        return subscriptionControl.subscribe(
            sessionId,
            topicSelector);
    }

    /**
     * Unsubscribe a client from topics.
     *
     * @param sessionId client to unsubscribe
     * @param topicSelector topic selector expression
     * @return a CompletableFuture that completes when a response is received
     *         from the server
     */
    public CompletableFuture<?> unsubscribe(
        SessionId sessionId,
        String topicSelector) {

        // To unsubscribe a client from a topic, this client session
        // must have the 'modify_session' permission.
        return subscriptionControl.unsubscribe(
            sessionId,
            topicSelector);
    }

    /**
     * Close the session.
     */
    @Override
    public void close() {
        session.close();
    }
}
C
/* * This example waits to be notified of a client connection, and then * subscribes that client to a named topic. */ #include <stdio.h> #include <unistd.h> #include <apr.h> #include <apr_thread_mutex.h> #include <apr_thread_cond.h> #include "diffusion.h" #include "args.h" ARG_OPTS_T arg_opts[] = { ARG_OPTS_HELP, {'u', "url", "Diffusion server URL", ARG_OPTIONAL, ARG_HAS_VALUE, "ws://localhost:8080"}, {'p', "principal", "Principal (username) for the connection", ARG_OPTIONAL, ARG_HAS_VALUE, NULL}, {'c', "credentials", "Credentials (password) for the connection", ARG_OPTIONAL, ARG_HAS_VALUE, NULL}, {'t', "topic_selector", "Topic selector to subscribe/unsubscribe clients from", ARG_OPTIONAL, ARG_HAS_VALUE, ">foo"}, END_OF_ARG_OPTS }; HASH_T *options = NULL; /* * Callback invoked when a client has been successfully subscribed to * a topic. */ static int on_subscription_complete(SESSION_T *session, void *context) { printf("Subscription complete\n"); return HANDLER_SUCCESS; } /* * Callback invoked when a client session has been opened. */ static int on_session_open(SESSION_T *session, const SESSION_PROPERTIES_EVENT_T *request, void *context) { if(session_id_cmp(*session->id, request->session_id) == 0) { // It's our own session, ignore. return HANDLER_SUCCESS; } char *topic_selector = hash_get(options, "topic_selector"); char *sid_str = session_id_to_string(&request->session_id); printf("Subscribing session %s to topic selector %s\n", sid_str, topic_selector); free(sid_str); /* * Subscribe the client session to the topic. */ SUBSCRIPTION_CONTROL_PARAMS_T subscribe_params = { .session_id = request->session_id, .topic_selector = topic_selector, .on_complete = on_subscription_complete }; subscribe_client(session, subscribe_params); return HANDLER_SUCCESS; } int main(int argc, char **argv) { /* * Standard command-line parsing. 
*/ options = parse_cmdline(argc, argv, arg_opts); if(options == NULL || hash_get(options, "help") != NULL) { show_usage(argc, argv, arg_opts); return EXIT_FAILURE; } const char *url = hash_get(options, "url"); const char *principal = hash_get(options, "principal"); CREDENTIALS_T *credentials = NULL; const char *password = hash_get(options, "credentials"); if(password != NULL) { credentials = credentials_create_password(password); } /* * Create a session with Diffusion. */ DIFFUSION_ERROR_T error = { 0 }; SESSION_T *session = session_create(url, principal, credentials, NULL, NULL, &error); if(session == NULL) { fprintf(stderr, "Failed to create session: %s\n", error.message); return EXIT_FAILURE; } /* * Register a session properties listener, so we are notified * of new client connections. * In the callback, we will subscribe the client to topics * according to the topic_selector argument. */ SET_T *required_properties = set_new_string(1); set_add(required_properties, PROPERTIES_SELECTOR_ALL_FIXED_PROPERTIES); SESSION_PROPERTIES_REGISTRATION_PARAMS_T params = { .on_session_open = on_session_open, .required_properties = required_properties }; session_properties_listener_register(session, params); set_free(required_properties); /* * Pretend to do some work. */ sleep(10); /* * Close session and tidy up. */ session_close(session, NULL); session_free(session); return EXIT_SUCCESS; }
Python
#  Copyright (c) 2021 Push Technology Ltd., All Rights Reserved.
#
#  Use is subject to license terms.
#
#  NOTICE: All information contained herein is, and remains the
#  property of Push Technology. The intellectual and technical
#  concepts contained herein are proprietary to Push Technology and
#  may be covered by U.S. and Foreign Patents, patents in process, and
#  are protected by trade secret or copyright law.

""" Example of subscribing to a topic and receiving its updates through a value stream. """

import asyncio

import diffusion
import diffusion.features.topics as topics

# Diffusion server connection information;
# adjust as needed for the server used in practice
server_url = "ws://localhost:8080"
principal = "admin"
credentials = diffusion.Credentials("password")


# stream callback functions
def on_update(*, old_value, topic_path, topic_value, **kwargs):
    """Print the topic path and its value; registered as the stream's update callback.

    When old_value is None only the initial value is printed, otherwise
    both the old and the new value are printed.
    """
    print("Topic:", topic_path)
    if old_value is None:
        print("  Initial value:", topic_value)
    else:
        print("  Value updated")
        print("    Old value:", old_value)
        print("    New value:", topic_value)


def on_subscribe(*, topic_path, **kwargs):
    """Print the subscribed topic path; registered as the stream's subscribe callback."""
    print(f"Subscribed to {topic_path}")


def on_unsubscribe(*, reason, topic_path, **kwargs):
    """Print the topic path and the reason; registered as the stream's unsubscribe callback."""
    # str() is kept explicit: formatting the reason directly could render
    # differently if it is an enum type.
    print(f"Unsubscribed from {topic_path} because {str(reason)}")


# example properties
topic_selector = "foo/bar"
topic_type = diffusion.datatypes.STRING
session_duration = 15  # passed to asyncio.sleep while the session stays subscribed

# value stream instance
value_stream = topics.ValueStreamHandler(
    data_type=topic_type,
    update=on_update,
    subscribe=on_subscribe,
    unsubscribe=on_unsubscribe,
)


# Because Python SDK for Diffusion is async, all the code needs to be
# wrapped inside a coroutine function, and executed using asyncio.run.
async def main():
    """Subscribe to the example topic selector, wait, then unsubscribe.

    A value stream is registered for the selector before subscribing, so
    the stream callbacks report the subscription and any updates.
    """
    # time to stay connected after unsubscribing
    unsubscribe_grace = 5

    # creating the session; it is managed by the async with block
    connection = diffusion.Session(
        url=server_url, principal=principal, credentials=credentials
    )
    async with connection as session:
        # PUSH_SNIPPET_BEGIN: [topic_notifications.xml/subscribe]
        print(f"Adding value stream {topic_selector}")
        session.topics.add_value_stream(
            topic_selector=topic_selector, stream=value_stream
        )

        print(f"Subscribing to {topic_selector}")
        await session.topics.subscribe(topic_selector)
        # PUSH_SNIPPET_END: [topic_notifications.xml/subscribe]

        # stay subscribed for the configured duration
        await asyncio.sleep(session_duration)

        print(f"Unsubscribing from {topic_selector}")
        await session.topics.unsubscribe(topic_selector)

        # keep alive to display the unsubscription message
        await asyncio.sleep(unsubscribe_grace)


if __name__ == "__main__":
    asyncio.run(main())
In each example, change the URL provided to the URL of your Diffusion server.