Example: Create a topic
The following examples use the TopicControl feature in the Diffusion™ API to create topics.
JavaScript
// Connect to the server. Change these options to suit your own environment.
// Node.js does not accept self-signed certificates by default. If you have
// one of these, set the environment variable NODE_TLS_REJECT_UNAUTHORIZED=0
// before running this example.
const session = await diffusion.connect({
    host : 'host_name',
    port : 443,
    secure : true,
    principal : 'control',
    credentials : 'password'
});

// 1. Topics can be created with a specified topic path and value. If the path
// contains multiple levels, any intermediary topic paths that do not already
// have topics remain unchanged.

// Create a topic with string values, and an initial value of "abc".
await session.topics.add('topic/string', diffusion.topics.TopicType.STRING);
await session.topicUpdate.set('topic/string', diffusion.datatypes.string(), 'abc');

// Create a topic with integer values, and an initial value of 123.
await session.topics.add('topic/integer', diffusion.topics.TopicType.INT64);
await session.topicUpdate.set('topic/integer', diffusion.datatypes.int64(), 123);

// Create a topic with double-precision floating point values, and an initial
// value of 1.23.
await session.topics.add('topic/decimal', diffusion.topics.TopicType.DOUBLE);
await session.topicUpdate.set('topic/decimal', diffusion.datatypes.double(), 1.23);

// 2. Adding a topic returns a result, which allows us to handle when the
// operation has either completed successfully or encountered an error.
try {
    const result = await session.topics.add('topic/result', diffusion.topics.TopicType.STRING);

    console.log('Added topic: ' + result.topic);
} catch(err) {
    console.log('Failed to add topic: ', err);
}

// Adding a topic that already exists will succeed, so long as it has the same
// value type.
const result = await session.topics.add('topic/result', diffusion.topics.TopicType.STRING);

// result.added will be false, as the topic already existed.
console.log('Added topic: ' + result.topic, result.added);
.NET
/**
 * Copyright © 2023 DiffusionData Ltd.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

using System;
using System.Threading.Tasks;
using PushTechnology.ClientInterface.Client.Callbacks;
using PushTechnology.ClientInterface.Client.Factories;
using PushTechnology.ClientInterface.Client.Features;
using PushTechnology.ClientInterface.Client.Features.Topics;
using PushTechnology.ClientInterface.Client.Session;
using PushTechnology.ClientInterface.Client.Topics;
using PushTechnology.ClientInterface.Client.Topics.Details;
using static System.Console;

namespace PushTechnology.ClientInterface.Example {
    /// <summary>
    /// Client implementation that adds and updates a string topic.
    /// </summary>
    public sealed class StringTopicsManager {
        /// <summary>
        /// Opens a control session, adds the string topic 'random/string', sets a value on it,
        /// removes the topic again and closes the session.
        /// </summary>
        /// <param name="serverUrl">URL of the server to connect to.</param>
        /// <returns>A task that completes once the session has been closed.</returns>
        public async Task StringTopicsManagerExample(string serverUrl) {
            var session = Diffusion.Sessions.Principal("control").Password("password")
                .CertificateValidation((cert, chain, errors) => CertificateValidationResult.ACCEPT)
                .Open(serverUrl);

            // The finally block guarantees the session is closed on every exit path,
            // including the early return when the topic cannot be added.
            try {
                var topicControl = session.TopicControl;
                var topicUpdate = session.TopicUpdate;

                // Create a string topic 'random/string'
                string topic = "random/string";

                try {
                    await topicControl.AddTopicAsync(topic, TopicType.STRING);

                    WriteLine($"Topic '{topic}' added successfully.");
                } catch (Exception ex) {
                    WriteLine($"Failed to add topic '{topic}' : {ex}.");
                    return;
                }

                WriteLine($"Updating topic '{topic}' with a new value:");

                string newValue = $"date:{DateTime.Today:D},time:{DateTime.Now.TimeOfDay:g}";

                try {
                    await topicUpdate.SetAsync(topic, newValue);

                    await Task.Delay(TimeSpan.FromMilliseconds(300));
                } catch (Exception ex) {
                    WriteLine($"Topic {topic} could not be updated : {ex}.");
                }

                // Remove the string topic 'random/string'
                try {
                    await topicControl.RemoveTopicsAsync(topic);
                } catch (Exception ex) {
                    WriteLine($"Failed to remove topic '{topic}' : {ex}.");
                }
            } finally {
                // Close the session
                session.Close();
            }
        }
    }

    /// <summary>
    /// Client implementation that subscribes to a string topic and consumes the data it receives.
    /// </summary>
    public sealed class StringTopicsConsumer {
        /// <summary>
        /// Opens a session, registers a value stream for 'random/string', subscribes to the topic
        /// for a short time, then unsubscribes, removes the stream and closes the session.
        /// </summary>
        /// <param name="serverUrl">URL of the server to connect to.</param>
        /// <returns>A task that completes once the session has been closed.</returns>
        public async Task StringTopicsConsumerExample(string serverUrl) {
            var session = Diffusion.Sessions.Open(serverUrl);

            try {
                // Get the Topics feature to subscribe to topics
                var topics = session.Topics;
                string topic = "random/string";

                // Add a topic stream for 'random/string'
                var stringStream = new StringStream();
                topics.AddStream(topic, stringStream);

                try {
                    // Subscribe to 'random/string' topic
                    await topics.SubscribeAsync(topic);

                    await Task.Delay(TimeSpan.FromMilliseconds(300));
                } catch (Exception ex) {
                    WriteLine($"Failed to subscribe to topic '{topic}' : {ex}.");
                } finally {
                    // Nested so that a failing unsubscription cannot skip the stream removal.
                    try {
                        await topics.UnsubscribeAsync(topic);
                    } finally {
                        topics.RemoveStream(stringStream);
                    }
                }
            } finally {
                // Always close the session, even if the clean-up above throws.
                session.Close();
            }
        }

        /// <summary>
        /// Basic implementation of the IValueStream for string topics.
        /// Each notification is written to the console.
        /// </summary>
        private sealed class StringStream : IValueStream<string> {
            /// <summary>
            /// Notification of stream being closed normally.
            /// </summary>
            public void OnClose()
                => WriteLine("The subscrption stream is now closed.");

            /// <summary>
            /// Notification of a contextual error related to this callback.
            /// </summary>
            /// <param name="errorReason">Error reason.</param>
            public void OnError(ErrorReason errorReason)
                => WriteLine($"An error has occured : {errorReason}.");

            /// <summary>
            /// Notification of a successful subscription.
            /// </summary>
            /// <param name="topicPath">Topic path.</param>
            /// <param name="specification">Topic specification.</param>
            public void OnSubscription(string topicPath, ITopicSpecification specification)
                => WriteLine($"Client subscribed to topic '{topicPath}'.");

            /// <summary>
            /// Notification of a successful unsubscription.
            /// </summary>
            /// <param name="topicPath">Topic path.</param>
            /// <param name="specification">Topic specification.</param>
            /// <param name="reason">Reason for the unsubscription.</param>
            public void OnUnsubscription(string topicPath, ITopicSpecification specification, TopicUnsubscribeReason reason)
                => WriteLine($"Client unsubscribed from topic '{topicPath}' with reason '{reason}'.");

            /// <summary>
            /// Topic update received.
            /// </summary>
            /// <param name="topicPath">Topic path.</param>
            /// <param name="specification">Topic specification.</param>
            /// <param name="oldValue">Value prior to update.</param>
            /// <param name="newValue">Value after update.</param>
            public void OnValue(string topicPath, ITopicSpecification specification, string oldValue, string newValue)
                => WriteLine($"New value of topic '{topicPath}' is {newValue}.");
        }
    }
}
Java and Android
/******************************************************************************* * Copyright (C) 2016, 2023 DiffusionData Ltd. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. *******************************************************************************/ package com.pushtechnology.diffusion.manual; import static com.pushtechnology.diffusion.client.Diffusion.newTopicSpecification; import com.pushtechnology.diffusion.client.Diffusion; import com.pushtechnology.diffusion.client.features.control.topics.TopicControl; import com.pushtechnology.diffusion.client.features.control.topics.TopicControl.AddTopicResult; import com.pushtechnology.diffusion.client.features.control.topics.TopicControl.TopicRemovalResult; import com.pushtechnology.diffusion.client.session.Session; import com.pushtechnology.diffusion.client.topics.details.TopicSpecification; import com.pushtechnology.diffusion.client.topics.details.TopicType; /** * An example of using a control client to add and remove JSON topics. 
* * This uses the 'TopicControl' feature * * @author DiffusionData Limited */ public final class ControlClientUpdatingJSONTopics { public static void main(String[] args) { final Session session = Diffusion.sessions() .principal("admin") .password("password") .open("ws://localhost:8080"); final TopicControl topicControl = session.feature(TopicControl.class); // add a JSON topic AddTopicResult addResult = topicControl .addTopic("my/topic/path/hello", TopicType.JSON).join(); System.out.println("add result: " + addResult); // add a JSON topic with a specification final TopicSpecification specification = newTopicSpecification(TopicType.JSON); addResult = topicControl.addTopic("my/topic/path/world", specification).join(); System.out.println("add result: " + addResult); // remove the topics using a topic selector TopicRemovalResult removeResult = topicControl.removeTopics("?my/topic/path//").join(); System.out.println("topics removed = " + removeResult.getRemovedCount()); session.close(); } }
C
/** * Copyright © 2021 - 2023 DiffusionData Ltd. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include <stdio.h> #include <stdlib.h> #ifndef WIN32 #include <unistd.h> #else #define sleep(x) Sleep(1000 * x) #endif #include "diffusion.h" static int on_topic_added( SESSION_T *session, TOPIC_ADD_RESULT_CODE result_code, void *context) { // topic was added return HANDLER_SUCCESS; } static int on_topic_removed( SESSION_T *session, const DIFFUSION_TOPIC_REMOVAL_RESULT_T *result, void *context) { // topic was removed return HANDLER_SUCCESS; } int main(int argc, char** argv) { const char *url = "ws://localhost:8080"; const char *principal = "control"; const char *password = "password"; const char *topic_path = "my/topic/path"; CREDENTIALS_T *credentials = credentials_create_password(password); SESSION_T *session; DIFFUSION_ERROR_T error = { 0 }; // Create a session, synchronously session = session_create(url, principal, credentials, NULL, NULL, &error); if(session == NULL) { printf("Failed to create session: %s\n", error.message); free(error.message); credentials_free(credentials); return EXIT_FAILURE; } // create JSON topic TOPIC_SPECIFICATION_T *specification = topic_specification_init(TOPIC_TYPE_JSON); ADD_TOPIC_CALLBACK_T add_topic_callback = { .on_topic_added_with_specification = on_topic_added, }; add_topic_from_specification(session, topic_path, specification, add_topic_callback); // Sleep for a while sleep(5); // remove JSON topic TOPIC_REMOVAL_PARAMS_T params = { 
.topic_selector = topic_path, .on_removed = on_topic_removed }; topic_removal(session, params); // Sleep for a while sleep(5); // Close the session, and release resources and memory session_close(session, NULL); session_free(session); credentials_free(credentials); topic_specification_free(specification); return EXIT_SUCCESS; }
Apple
//  Copyright (C) 2021 Push Technology Ltd.
//
//  Licensed under the Apache License, Version 2.0 (the "License");
//  you may not use this file except in compliance with the License.
//  You may obtain a copy of the License at
//  http://www.apache.org/licenses/LICENSE-2.0
//
//  Unless required by applicable law or agreed to in writing, software
//  distributed under the License is distributed on an "AS IS" BASIS,
//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//  See the License for the specific language governing permissions and
//  limitations under the License.

import Foundation
import Diffusion

/// Examples of adding and removing topics through the session's topic control feature.
class TopicControl {

    /// Requests creation of a JSON topic and prints the outcome.
    ///
    /// - Parameters:
    ///   - session: The session used to send the request.
    ///   - topic_path: The path at which to create the topic.
    func add_json_topic(session: PTDiffusionSession, topic_path: String) {

        session.topicControl.addTopic(withPath: topic_path,
                                      type: PTDiffusionTopicType.JSON) { (result, error) in
            // Swift's print() does not expand printf-style "%@" placeholders,
            // so the messages are built with string interpolation.
            if let error = error {
                print("An error has occurred while adding topic '\(topic_path)': \(error.localizedDescription)")
            }
            else if (result == PTDiffusionAddTopicResult.exists()) {
                print("Topic '\(topic_path)' already existed")
            }
            else {
                print("Topic '\(topic_path)' was created")
            }
        }
    }


    /// Requests removal of the topics selected by the given expression and prints the outcome.
    ///
    /// - Parameters:
    ///   - session: The session used to send the request.
    ///   - topic_path: The topic selector expression passed to the removal request.
    func remove_topic(session: PTDiffusionSession, topic_path: String) {

        session.topicControl.removeTopics(withTopicSelectorExpression: topic_path) { (result, error) in
            if let error = error {
                print("An error has occurred while removing topic '\(topic_path)': \(error.localizedDescription)")
            }
            else {
                print("Topic '\(topic_path)' has been successfully removed")
            }
        }
    }
}
Change the URL from that provided in the example to the URL of Diffusion Cloud. Diffusion Cloud service URLs end in diffusion.cloud.