Receiving topic notifications
Receive topic notifications using topic selectors. This enables a client to receive updates when topics are added or removed, without the topic values.
Note: Topic notifications are supported by the Android™
API, Java™
API and JavaScript® API.
The client must register a listener object to receive notifications about selected topics. Use a topic selector to specify the topics.
For more details about topic notifications, see Topic notifications.
Required permissions:
and permissions for the specified topicsReceiving topic notifications
A client can register to receive notifications about a set of topics via a listener object.
JavaScript
var listener = { onDescendantNotification: function(topicPath, type) {}, onTopicNotification: function(topicPath, topicSpecification, type) {}, onClose: function() {}, onError: function(error) {} }; session.notifications.addListener(listener).then(function(reg) { reg.select("foo"); });
.NET
/** * Copyright © 2021 Push Technology Ltd. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ using System; using System.Threading; using System.Threading.Tasks; using PushTechnology.ClientInterface.Client.Callbacks; using PushTechnology.ClientInterface.Client.Factories; using PushTechnology.ClientInterface.Client.Features; using PushTechnology.ClientInterface.Client.Features.Control.Topics; using PushTechnology.ClientInterface.Client.Session; using PushTechnology.ClientInterface.Client.Topics; using PushTechnology.ClientInterface.Client.Topics.Details; using static System.Console; namespace PushTechnology.ClientInterface.Example { /// <summary> /// Client implementation of a listener for topic notifications. /// </summary> public sealed class TopicNotificationListener { private const string TOPIC_PREFIX = "topic-notifications"; public TopicNotificationListener(string serverUrl) { var selector = $"?{TOPIC_PREFIX}//"; var session = Diffusion.Sessions.Principal( "control" ).Password( "password" ) .CertificateValidation((cert, chain, errors) => CertificateValidationResult.ACCEPT) .Open( serverUrl ); var notifications = session.TopicNotifications; INotificationRegistration registration = null; string path = string.Empty; // Register a listener to receive topic notifications. try { registration = await notifications.AddListenerAsync(new Listener()); } catch(Exception ex) { WriteLine($"Failed to add listener : {ex}."); session.Close(); return; } // Start receiving notifications. try { await registration.SelectAsync(selector); } catch (Exception ex) { WriteLine($"Selector '{selector}' registration failed : {ex}."); session.Close(); return; } // Add a topic. try { path = $"{TOPIC_PREFIX}/{DateTime.Now.ToFileTimeUtc()}"; var specification = session.TopicControl.NewSpecification(TopicType.STRING); await session.TopicControl.AddTopicAsync(path, specification); } catch (Exception ex) { WriteLine($"Failed to add topic '{path}' : {ex}."); session.Close(); return; } // Remove the topic. try { await session.TopicControl.RemoveTopicsAsync(path); } catch (Exception ex) { WriteLine($"Failed to remove topic '{path}' : {ex}."); session.Close(); return; } // Stop receiving notifications. try { await registration.DeselectAsync(selector); } catch (Exception ex) { WriteLine($"Deselection failed for selector '{selector}' : {ex}."); session.Close(); return; } // Unregister the listener. try { await registration.CloseAsync(); } catch (Exception ex) { WriteLine($"Failed to unregister the listener : {ex}."); } // Close the session session.Close(); } /// <summary> /// The listener for topic notifications. /// </summary> private class Listener : ITopicNotificationListener { /// <summary> /// Indicates that the stream was closed. /// </summary> public void OnClose() { WriteLine("The listener was closed."); } /// <summary> /// Notification for an immediate descendant of a selected topic path. /// </summary> public void OnDescendantNotification(string topicPath, NotificationType type) { WriteLine($"Descendant topic '{topicPath}' has been {type}."); } /// <summary> /// Indicates an error received by the callback. /// </summary> public void OnError(ErrorReason errorReason) { WriteLine($"The listener received the error: '{errorReason}'."); } /// <summary> /// Notification for a selected topic. /// </summary> public void OnTopicNotification(string topicPath, ITopicSpecification specification, NotificationType type) { WriteLine($"Topic '{topicPath}' has been {type}."); } } } }
Java and Android
final TopicNotifications notifications = session.feature(TopicNotifications.class); final TopicNotificationListener listener = new TopicNotificationListener() { @Override public void onTopicNotification(String topicPath, TopicSpecification specification, NotificationType type) { // Handle notifications for selected/deselected topics } @Override public void onDescendantNotification(String topicPath, NotificationType type) { // Handle notifications for immediate descendants } @Override public void onClose() { // The listener has been closed } @Override public void onError(ErrorReason error) { // The listener has encountered an error } }; final CompletableFuture<NotificationRegistration> future = notifications.addListener(listener); final NotificationRegistration registration = future.get(); registration.select("foo");
Python
# Copyright (c) 2021 Push Technology Ltd., All Rights Reserved. # # Use is subject to license terms. # # NOTICE: All information contained herein is, and remains the # property of Push Technology. The intellectual and technical # concepts contained herein are proprietary to Push Technology and # may be covered by U.S. and Foreign Patents, patents in process, and # are protected by trade secret or copyright law. """ Example of sending a request to a session filter. """ import asyncio import diffusion import diffusion.features.topics as topics # Diffusion server connection information; same for both sessions # adjust as needed for the server used in practice server_url = "ws://localhost:8080" principal = "admin" credentials = diffusion.Credentials("password") # stream callback functions def on_update(*, old_value, topic_path, topic_value, **kwargs): print("Topic:", topic_path) if old_value is None: print(" Initial value:", topic_value) else: print(" Value updated") print(" Old value:", old_value) print(" New value:", topic_value) def on_subscribe(*, topic_path, **kwargs): print(f"Subscribed to {topic_path}") def on_unsubscribe(*, reason, topic_path, **kwargs): print(f"Unsubscribed from {topic_path} because {str(reason)}") # example properties topic_selector = "foo/bar" topic_type = diffusion.datatypes.STRING session_duration = 15 # value stream instance value_stream = topics.ValueStreamHandler( data_type=topic_type, update=on_update, subscribe=on_subscribe, unsubscribe=on_unsubscribe, ) # Because Python SDK for Diffusion is async, all the code needs to be # wrapped inside a coroutine function, and executed using asyncio.run. async def main(): # creating the session async with diffusion.Session( url=server_url, principal=principal, credentials=credentials ) as session: # PUSH_SNIPPET_BEGIN: [topic_notifications.xml/subscribe] print(f"Adding value stream {topic_selector}") session.topics.add_value_stream( topic_selector=topic_selector, stream=value_stream ) print(f"Subscribing to {topic_selector}") await session.topics.subscribe(topic_selector) # PUSH_SNIPPET_END: [topic_notifications.xml/subscribe] await asyncio.sleep(session_duration) print(f"Unsubscribing from {topic_selector}") await session.topics.unsubscribe(topic_selector) await asyncio.sleep(5) # keep alive to display the unsubscription message if __name__ == "__main__": asyncio.run(main())