Just a second...

Example: Create a topic

The following examples use the TopicControl feature in the Diffusion™ API to create topics.

JavaScript
// Connect to the server. Change these options to suit your own environment.
// Node.js does not accept self-signed certificates by default. If you have
// one of these, set the environment variable NODE_TLS_REJECT_UNAUTHORIZED=0
// before running this example.
const session = await diffusion.connect({
    host   : 'host_name',
    port   : 443,
    secure : true,
    principal : 'control',
    credentials : 'password'
});

// 1. Topics can be created with a specified topic path and value. If the path contains multiple levels, any
// intermediary topic path that do not already have topics remain unchanged.

// Create a topic with string values, and an initial value of "xyz".
await session.topics.add('topic/string', diffusion.topics.TopicType.STRING);
await session.topicUpdate.set('topic/string', diffusion.datatypes.string(), 'abc');

// Create a topic with integer values, and an initial value of 123.
await session.topics.add('topic/integer', diffusion.topics.TopicType.INT64);
await session.topicUpdate.set('topic/integer', diffusion.datatypes.int64(), 123);

// Create a topic with decimal values, with an implicit scale of 2 and an initial value of 1.23.
await session.topics.add('topic/decimal', diffusion.topics.TopicType.DOUBLE);
await session.topicUpdate.set('topic/decimal', diffusion.datatypes.double(), 1.23);

// 2. Adding a topic returns a result, which allows us to handle when the operation has either
// completed successfully or encountered an error.
try {
    const result = await session.topics.add('topic/result', diffusion.topics.TopicType.STRING);
    console.log('Added topic: ' + result.topic);
} catch(err) {
    console.log('Failed to add topic: ', err);
}

// Adding a topic that already exists will succeed, so long as it has the same value type
const result = await session.topics.add('topic/result', diffusion.topics.TopicType.STRING);
// result.added will be false, as the topic already existed
console.log('Added topic: ' + result.topic, result.added);

.NET
/**
 * Copyright © 2023 DiffusionData Ltd.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

using System;
using System.Threading.Tasks;
using PushTechnology.ClientInterface.Client.Callbacks;
using PushTechnology.ClientInterface.Client.Factories;
using PushTechnology.ClientInterface.Client.Features;
using PushTechnology.ClientInterface.Client.Features.Topics;
using PushTechnology.ClientInterface.Client.Session;
using PushTechnology.ClientInterface.Client.Topics;
using PushTechnology.ClientInterface.Client.Topics.Details;
using static System.Console;

namespace PushTechnology.ClientInterface.Example {
    /// <summary>
    /// Client implementation that adds and updates a string topic.
    /// </summary>
    public sealed class StringTopicsManager {

        public async Task StringTopicsManagerExample(string serverUrl) {
            var session = Diffusion.Sessions.Principal("control").Password("password")
                .CertificateValidation((cert, chain, errors) 
                    => CertificateValidationResult.ACCEPT)
                .Open(serverUrl);

            var topicControl = session.TopicControl;
            var topicUpdate = session.TopicUpdate;

            // Create a string topic 'random/string'
            string topic = "random/string";

            try {
                await topicControl.AddTopicAsync(topic, TopicType.STRING);
                WriteLine($"Topic '{topic}' added successfully.");
            }
            catch (Exception ex) 
            {
                WriteLine( $"Failed to add topic '{topic}' : {ex}." );
                session.Close();
                return;
            }

            WriteLine($"Updating topic '{topic}' with a new value:");

            string newValue = string.Format("date:{0},time:{1}", DateTime.Today.Date.ToString("D"), DateTime.Now.TimeOfDay.ToString("g"));

            try
            {
                await topicUpdate.SetAsync(topic, newValue);

                await Task.Delay(TimeSpan.FromMilliseconds(300));
            }
            catch (Exception ex)
            {
                WriteLine($"Topic {topic} could not be updated : {ex}.");
            }

            // Remove the string topic 'random/string'
            try {
                await topicControl.RemoveTopicsAsync(topic);
            } 
            catch (Exception ex) 
            {
                WriteLine( $"Failed to remove topic '{topic}' : {ex}." );
            }

            // Close the session
            session.Close();
        }
    }

    /// <summary>
    /// Client implementation that subscribes to a string topic and consumes the data it receives.
    /// </summary>
    public sealed class StringTopicsConsumer
    {
        public async Task StringTopicsConsumerExample(string serverUrl)
        {
            var session = Diffusion.Sessions.Open(serverUrl);

            // Get the Topics feature to subscribe to topics
            var topics = session.Topics;
            string topic = "random/string";

            // Add a topic stream for 'random/string'
            var stringStream = new StringStream();
            topics.AddStream(topic, stringStream);

            try
            {
                // Subscribe to 'random/string' topic
                await topics.SubscribeAsync(topic);

                await Task.Delay(TimeSpan.FromMilliseconds(300));
            }
            catch (Exception ex)
            {
                WriteLine($"Failed to subscribe to topic '{topic}' : {ex}.");
            }
            finally
            {
                await topics.UnsubscribeAsync(topic);
                topics.RemoveStream(stringStream);
                session.Close();
            }
        }

        /// <summary>
        /// Basic implementation of the IValueStream for string topics.
        /// </summary>
        private sealed class StringStream : IValueStream<string>
        {
            /// <summary>
            /// Notification of stream being closed normally.
            /// </summary>
            public void OnClose()
                => WriteLine("The subscrption stream is now closed.");

            /// <summary>
            /// Notification of a contextual error related to this callback.
            /// </summary>
            /// <param name="errorReason">Error reason.</param>
            public void OnError(ErrorReason errorReason)
                => WriteLine($"An error has occured : {errorReason}.");

            /// <summary>
            /// Notification of a successful subscription.
            /// </summary>
            /// <param name="topicPath">Topic path.</param>
            /// <param name="specification">Topic specification.</param>
            public void OnSubscription(string topicPath, ITopicSpecification specification)
                => WriteLine($"Client subscribed to topic '{topicPath}'.");

            /// <summary>
            /// Notification of a successful unsubscription.
            /// </summary>
            /// <param name="topicPath">Topic path.</param>
            /// <param name="specification">Topic specification.</param>
            /// <param name="reason">Error reason.</param>
            public void OnUnsubscription(string topicPath, ITopicSpecification specification,
                                         TopicUnsubscribeReason reason)
                => WriteLine($"Client unsubscribed from topic '{topicPath}' with reason '{reason}'.");

            /// <summary>
            /// Topic update received.
            /// </summary>
            /// <param name="topicPath">Topic path.</param>
            /// <param name="specification">Topic specification.</param>
            /// <param name="oldValue">Value prior to update.</param>
            /// <param name="newValue">Value after update.</param>
            public void OnValue(string topicPath, ITopicSpecification specification, string oldValue,
                                string newValue)
                => WriteLine($"New value of topic '{topicPath}' is {newValue}.");
        }
    }
}
Java and Android
/*******************************************************************************
 * Copyright (C) 2016, 2023 DiffusionData Ltd.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *******************************************************************************/
package com.pushtechnology.diffusion.manual;

import static com.pushtechnology.diffusion.client.Diffusion.newTopicSpecification;

import com.pushtechnology.diffusion.client.Diffusion;
import com.pushtechnology.diffusion.client.features.control.topics.TopicControl;
import com.pushtechnology.diffusion.client.features.control.topics.TopicControl.AddTopicResult;
import com.pushtechnology.diffusion.client.features.control.topics.TopicControl.TopicRemovalResult;
import com.pushtechnology.diffusion.client.session.Session;
import com.pushtechnology.diffusion.client.topics.details.TopicSpecification;
import com.pushtechnology.diffusion.client.topics.details.TopicType;

/**
 * An example of using a control client to add and remove JSON topics.
 *
 * This uses the 'TopicControl' feature
 *
 * @author DiffusionData Limited
 */
public final class ControlClientUpdatingJSONTopics {

    public static void main(String[] args) {

        final Session session = Diffusion.sessions()
            .principal("admin")
            .password("password")
            .open("ws://localhost:8080");

        final TopicControl topicControl = session.feature(TopicControl.class);

        // add a JSON topic
        AddTopicResult addResult = topicControl
            .addTopic("my/topic/path/hello", TopicType.JSON).join();
        System.out.println("add result: " + addResult);

        // add a JSON topic with a specification
        final TopicSpecification specification = newTopicSpecification(TopicType.JSON);
        addResult = topicControl.addTopic("my/topic/path/world", specification).join();
        System.out.println("add result: " + addResult);

        // remove the topics using a topic selector
        TopicRemovalResult removeResult = topicControl.removeTopics("?my/topic/path//").join();
        System.out.println("topics removed = " + removeResult.getRemovedCount());

        session.close();
    }
}
C
/**
 * Copyright © 2021 - 2023 DiffusionData Ltd.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <stdio.h>
#include <stdlib.h>
#ifndef WIN32
        #include <unistd.h>
#else
        #define sleep(x) Sleep(1000 * x)
#endif

#include "diffusion.h"


static int on_topic_added(
        SESSION_T *session,
        TOPIC_ADD_RESULT_CODE result_code,
        void *context)
{
        // topic was added
        return HANDLER_SUCCESS;
}


static int on_topic_removed(
        SESSION_T *session,
        const DIFFUSION_TOPIC_REMOVAL_RESULT_T *result,
        void *context)
{
        // topic was removed
        return HANDLER_SUCCESS;
}


int main(int argc, char** argv)
{
        const char *url = "ws://localhost:8080";
        const char *principal = "control";
        const char *password = "password";
        const char *topic_path = "my/topic/path";

        CREDENTIALS_T *credentials = credentials_create_password(password);

        SESSION_T *session;
        DIFFUSION_ERROR_T error = { 0 };

        // Create a session, synchronously
        session = session_create(url, principal, credentials, NULL, NULL, &error);
        if(session == NULL) {
                printf("Failed to create session: %s\n", error.message);
                free(error.message);
                credentials_free(credentials);
                return EXIT_FAILURE;
        }

        // create JSON topic
        TOPIC_SPECIFICATION_T *specification = topic_specification_init(TOPIC_TYPE_JSON);

        ADD_TOPIC_CALLBACK_T add_topic_callback = {
                .on_topic_added_with_specification = on_topic_added,
        };
        add_topic_from_specification(session, topic_path, specification, add_topic_callback);

        // Sleep for a while
        sleep(5);

        // remove JSON topic
        TOPIC_REMOVAL_PARAMS_T params = {
                .topic_selector = topic_path,
                .on_removed = on_topic_removed
        };
        topic_removal(session, params);

        // Sleep for a while
        sleep(5);

        // Close the session, and release resources and memory
        session_close(session, NULL);
        session_free(session);

        credentials_free(credentials);
        topic_specification_free(specification);
        return EXIT_SUCCESS;
}
Apple
//  Copyright (C) 2021 Push Technology Ltd.
//
//  Licensed under the Apache License, Version 2.0 (the "License");
//  you may not use this file except in compliance with the License.
//  You may obtain a copy of the License at
//  http://www.apache.org/licenses/LICENSE-2.0
//
//  Unless required by applicable law or agreed to in writing, software
//  distributed under the License is distributed on an "AS IS" BASIS,
//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//  See the License for the specific language governing permissions and
//  limitations under the License.

import Foundation
import Diffusion

class TopicControl {

    func add_json_topic(session: PTDiffusionSession, topic_path: String) {

        session.topicControl.addTopic(withPath: topic_path,
                                      type: PTDiffusionTopicType.JSON) { (result, error) in
            if (error != nil) {
                print("An error has occurred while adding topic '%@': %@",
                      topic_path,
                      error!.localizedDescription)
            }
            else if (result == PTDiffusionAddTopicResult.exists()) {

                print("Topic '%@' already existed", topic_path)
            }
            else {

                print("Topic '%@' was created", topic_path)
            }
        }
    }


    func remove_topic(session: PTDiffusionSession, topic_path: String) {

        session.topicControl.removeTopics(withTopicSelectorExpression: topic_path) { (result, error) in
            if (error != nil) {
                print("An error has occurred while removing topic '%@': %@",
                      topic_path,
                      error!.localizedDescription)
            }
            else {

                print("Topic '%@' has been successfully removed", topic_path)
            }
        }

    }

}

Change the URL from that provided in the example to the URL of Diffusion Cloud . Diffusion Cloud service URLs end in diffusion.cloud