Just a second...

Example: Create a JSON topic

The following examples create a JSON topic and receive a stream of values from the topic.

JavaScript
diffusion.connect({
    host: 'diffusion.example.com',
    port: 443,
    secure: true,
    principal: 'control',
    credentials: 'password'
}).then(function(session) {
    // Every operation in this example targets the same topic.
    var topicPath = 'topic/json';

    // 1. The JSON data type is reached through the top-level diffusion namespace;
    // holding it in a local keeps the calls below short.
    var jsonType = diffusion.datatypes.json();

    // Value handler for the stream registered in step 4. It receives both the new and
    // the previous value; oldValue is undefined for the first value received for a topic.
    // For JSON topics value#get returns a JavaScript object, for Binary topics a Buffer.
    function logUpdate(topic, specification, newValue, oldValue) {
        console.log("Update for " + topic, newValue.get());
    }

    // 2. Create the topic. Data types are currently provided for JSON and Binary topic types.
    session.topics.add(topicPath, diffusion.topics.TopicType.JSON);

    // 3. Build a value directly from the data type and publish it through the
    // standard update mechanism.
    var initialValue = jsonType.from({ foo: 'bar' });
    session.topics.update(topicPath, initialValue);

    // Subscribing works the same way as for any other topic.
    session.select(topicPath);

    // 4. Passing the data type specialises the stream so that handlers receive
    // values of that type.
    session.addStream(topicPath, jsonType).on('value', logUpdate);

    // 5. A raw value of an appropriate type is accepted as well: here a plain
    // object is used to update the JSON topic.
    session.topics.update(topicPath, {
        foo: 'baz',
        numbers: [1, 2, 3]
    });
});
                    
.NET
/**
 * Copyright © 2021 Push Technology Ltd.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

using System;
using System.Threading;
using System.Threading.Tasks;
using PushTechnology.ClientInterface.Client.Factories;
using PushTechnology.ClientInterface.Client.Session;
using PushTechnology.ClientInterface.Client.Topics;
using static System.Console;

namespace PushTechnology.ClientInterface.Example
{
    /// <summary>
    /// Client implementation that adds and updates a JSON topic.
    /// </summary>
    /// <remarks>
    /// Constructing an instance runs the whole example: it opens a session, adds the topic
    /// 'random/JSON', sets one value on it, removes the topic again and closes the session.
    /// </remarks>
    public sealed class JSONTopicsManager {

        /// <summary>
        /// Runs the example against the given server and returns once it has finished.
        /// </summary>
        /// <param name="serverUrl">URL of the server to connect to; passed to <c>Open</c>.</param>
        public JSONTopicsManager(string serverUrl) {
            // A constructor cannot be declared async, so the awaited calls live in RunAsync.
            // The work is started through Task.Run so that this blocking wait cannot deadlock
            // on a synchronization context captured by the awaits.
            Task.Run(() => RunAsync(serverUrl)).GetAwaiter().GetResult();
        }

        /// <summary>
        /// Adds, updates and removes the example topic using a session opened on <paramref name="serverUrl"/>.
        /// </summary>
        /// <param name="serverUrl">URL of the server to connect to.</param>
        /// <returns>A task that completes when the session has been closed.</returns>
        private static async Task RunAsync(string serverUrl) {
            // NOTE: the validation callback returns ACCEPT unconditionally, i.e. every server
            // certificate is accepted. Acceptable for an example; review before reusing elsewhere.
            var session = Diffusion.Sessions.Principal("control").Password("password")
                .CertificateValidation((cert, chain, errors)
                    => CertificateValidationResult.ACCEPT)
                .Open(serverUrl);

            try {
                var topicControl = session.TopicControl;
                var topicUpdate = session.TopicUpdate;

                // Create a JSON topic 'random/JSON'
                string topic = "random/JSON";

                try {
                    await topicControl.AddTopicAsync(topic, TopicType.JSON);
                    WriteLine($"Topic '{topic}' added successfully.");
                }
                catch (Exception ex) {
                    // Without the topic there is nothing to update or remove.
                    WriteLine( $"Failed to add topic '{topic}' : {ex}." );
                    return;
                }

                WriteLine($"Updating topic '{topic}' with a new value:");

                // The value is a JSON object holding the current date and time as strings.
                var newValue = Diffusion.DataTypes.JSON.FromJSONString(
                    "{\"date\":\"" + DateTime.Today.Date.ToString("D") + "\"," +
                    "\"time\":\"" + DateTime.Now.TimeOfDay.ToString("g") + "\"}");

                try {
                    await topicUpdate.SetAsync(topic, newValue);

                    // Short pause before the topic is removed below - presumably to give
                    // subscribers time to receive the value (TODO confirm intent).
                    await Task.Delay(TimeSpan.FromMilliseconds(300));
                }
                catch (Exception ex) {
                    WriteLine($"Topic {topic} could not be updated : {ex}.");
                }

                // Remove the JSON topic 'random/JSON'
                try {
                    await topicControl.RemoveTopicsAsync(topic);
                }
                catch (Exception ex) {
                    WriteLine( $"Failed to remove topic '{topic}' : {ex}." );
                }
            }
            finally {
                // Close the session on every path, including unexpected exceptions.
                session.Close();
            }
        }
    }

    /// <summary>
    /// Client implementation that subscribes to a JSON topic and consumes the data it receives.
    /// </summary>
    /// <remarks>
    /// Constructing an instance runs the whole example: it opens an anonymous session, registers
    /// a value stream for 'random/JSON', subscribes, waits briefly and then closes the session.
    /// </remarks>
    public sealed class JSONTopicsConsumer
    {
        /// <summary>
        /// Runs the example against the given server and returns once it has finished.
        /// </summary>
        /// <param name="serverUrl">URL of the server to connect to; passed to <c>Open</c>.</param>
        public JSONTopicsConsumer(string serverUrl)
        {
            // A constructor cannot be declared async, so the awaited calls live in RunAsync.
            // The work is started through Task.Run so that this blocking wait cannot deadlock
            // on a synchronization context captured by the awaits.
            Task.Run(() => RunAsync(serverUrl)).GetAwaiter().GetResult();
        }

        /// <summary>
        /// Subscribes to the example topic using a session opened on <paramref name="serverUrl"/>.
        /// </summary>
        /// <param name="serverUrl">URL of the server to connect to.</param>
        /// <returns>A task that completes when the session has been closed.</returns>
        private static async Task RunAsync(string serverUrl)
        {
            // Connect anonymously
            var session = Diffusion.Sessions.Open(serverUrl);

            try
            {
                // Get the Topics feature to subscribe to topics
                var topics = session.Topics;
                string topic = "random/JSON";

                // Add a topic stream for 'random/JSON'
                var jsonStream = new JSONStream();
                topics.AddStream(topic, jsonStream);

                try
                {
                    // Subscribe to 'random/JSON' topic
                    await topics.SubscribeAsync(topic);

                    // Keep the session open for a short while so that the stream
                    // callbacks have a chance to run before everything is torn down.
                    await Task.Delay(TimeSpan.FromMilliseconds(300));
                }
                catch (Exception ex)
                {
                    WriteLine($"Failed to subscribe to topic '{topic}' : {ex}.");
                }
                finally
                {
                    topics.RemoveStream(jsonStream);
                }
            }
            finally
            {
                // Note that closing the session will automatically unsubscribe from all topics
                // the client is subscribed to.
                session.Close();
            }
        }

        /// <summary>
        /// Basic implementation of the IValueStream for JSON topics.
        /// Every callback writes a single line to the console.
        /// </summary>
        private sealed class JSONStream : IValueStream<IJSON>
        {
            /// <summary>
            /// Notification of stream being closed normally.
            /// </summary>
            public void OnClose()
                => WriteLine("The subscrption stream is now closed.");

            /// <summary>
            /// Notification of a contextual error related to this callback.
            /// </summary>
            /// <param name="errorReason">Error reason.</param>
            public void OnError(ErrorReason errorReason)
                => WriteLine($"An error has occured : {errorReason}.");

            /// <summary>
            /// Notification of a successful subscription.
            /// </summary>
            /// <param name="topicPath">Topic path.</param>
            /// <param name="specification">Topic specification.</param>
            public void OnSubscription(string topicPath, ITopicSpecification specification)
                => WriteLine($"Client subscribed to topic '{topicPath}'.");

            /// <summary>
            /// Notification of a successful unsubscription.
            /// </summary>
            /// <param name="topicPath">Topic path.</param>
            /// <param name="specification">Topic specification.</param>
            /// <param name="reason">Reason for the unsubscription.</param>
            public void OnUnsubscription(string topicPath, ITopicSpecification specification, 
                                         TopicUnsubscribeReason reason)
                => WriteLine($"Client unsubscribed from topic '{topicPath}' with reason '{reason}'.");

            /// <summary>
            /// Topic update received.
            /// </summary>
            /// <param name="topicPath">Topic path.</param>
            /// <param name="specification">Topic specification.</param>
            /// <param name="oldValue">Value prior to update.</param>
            /// <param name="newValue">Value after update.</param>
            public void OnValue(string topicPath, ITopicSpecification specification, IJSON oldValue, 
                                IJSON newValue)
                => WriteLine($"New value of topic '{topicPath}' is {newValue.ToJSONString()}.");
        }
    }
}
Java and Android
package com.pushtechnology.diffusion.examples;

import static java.util.Objects.requireNonNull;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Map;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.cbor.CBORFactory;
import com.fasterxml.jackson.dataformat.cbor.CBORGenerator;
import com.pushtechnology.diffusion.client.Diffusion;
import com.pushtechnology.diffusion.client.callbacks.Registration;
import com.pushtechnology.diffusion.client.callbacks.TopicTreeHandler;
import com.pushtechnology.diffusion.client.features.control.topics.TopicControl;
import com.pushtechnology.diffusion.client.features.control.topics.TopicControl.AddContextCallback;
import com.pushtechnology.diffusion.client.features.control.topics.TopicControl.RemovalContextCallback;
import com.pushtechnology.diffusion.client.features.control.topics.TopicUpdateControl;
import com.pushtechnology.diffusion.client.features.control.topics.TopicUpdateControl.UpdateSource;
import com.pushtechnology.diffusion.client.features.control.topics.TopicUpdateControl.Updater;
import com.pushtechnology.diffusion.client.features.control.topics.TopicUpdateControl.Updater.UpdateContextCallback;
import com.pushtechnology.diffusion.client.session.Session;
import com.pushtechnology.diffusion.client.session.SessionClosedException;
import com.pushtechnology.diffusion.client.topics.details.TopicType;
import com.pushtechnology.diffusion.datatype.json.JSON;
import com.pushtechnology.diffusion.datatype.json.JSONDataType;

/**
 * This example shows a control client creating a JSON topic and sending updates
 * to it.
 * <P>
 * There will be a topic for each currency for which rates are provided. The
 * topic will be created under the FX topic - so, for example FX/GBP will
 * contain a map of all rate conversions from the base GBP currency. The rates
 * are represented as string decimal values (e.g. "12.457").
 * <P>
 * The {@code addRates} method shows how to create a new rates topic, specifying
 * its initial map of values.
 * <P>
 * The {@code changeRates} method which takes a map shows how to completely
 * replace the set of rates for a currency with a new map of rates.
 * <P>
 * The {@code changeRates} method which takes a string shows an alternative
 * mechanism where the new rates are simply supplied as a JSON string.
 * <P>
 * Either of the changeRates methods could be used and after the first usage for
 * any topic the value is cached, and so subsequent set calls can compare with
 * the last value and send only the differences to the server.
 *
 * @author Push Technology Limited
 * @since 5.7
 * @see ClientConsumingJSONTopics
 */
public final class ControlClientUpdatingJSONTopics {

    private static final String ROOT_TOPIC = "FX";

    private final Session session;
    private final TopicControl topicControl;
    // Both fields are assigned from update source callbacks, hence volatile.
    // They stay null until the corresponding callback has been invoked.
    private volatile TopicUpdateControl.ValueUpdater<JSON> valueUpdater;
    private volatile Registration updateSourceRegistration;
    private final CBORFactory cborFactory = new CBORFactory();
    private final JSONDataType jsonDataType = Diffusion.dataTypes().json();

    /**
     * Constructor.
     * <P>
     * Opens the session and registers an update source for the root topic.
     *
     * @param serverUrl for example "ws://diffusion.example.com:80"
     */
    public ControlClientUpdatingJSONTopics(String serverUrl) {

        // writeObject on the generator needs an object codec.
        cborFactory.setCodec(new ObjectMapper());

        session =
            Diffusion.sessions().principal("control").password("password")
                .open(serverUrl);

        topicControl = session.feature(TopicControl.class);

        // Register as an updater for all topics under the root and request
        // that all topics created are removed when the session closes
        session.feature(TopicUpdateControl.class).registerUpdateSource(
            ROOT_TOPIC,
            new UpdateSource.Default() {
                @Override
                public void onRegistered(
                    String topicPath,
                    Registration registration) {
                    // Kept so that close() can close the registration later.
                    updateSourceRegistration = registration;
                }

                @Override
                public void onActive(String topicPath, Updater updater) {
                    topicControl.removeTopicsWithSession(
                        ROOT_TOPIC,
                        new TopicTreeHandler.Default());
                    valueUpdater = updater.valueUpdater(JSON.class);
                }

                @Override
                public void onClose(String topicPath) {
                    session.close();
                }
            });

    }

    /**
     * Add a new rates topic.
     *
     * @param currency the base currency
     * @param values the full map of initial rates values
     * @param callback reports outcome
     * @throws IOException if unable to convert rates map
     */
    public void addRates(
        String currency,
        Map<String, String> values,
        AddContextCallback<String> callback) throws IOException {

        topicControl.addTopic(
            rateTopicName(currency),
            TopicType.JSON,
            mapToJSON(values),
            currency,
            callback);
    }

    /**
     * Update an existing rates topic, replacing the rates mappings with a new
     * set of mappings.
     *
     * @param currency the base currency
     * @param values the new rates values
     * @param callback reports outcome
     * @throws IOException if unable to convert rates map
     * @throws IllegalStateException if no value updater is available yet
     */
    public void changeRates(
        String currency,
        Map<String, String> values,
        UpdateContextCallback<String> callback) throws IOException {

        // Read the volatile field once so the check and the use see the
        // same reference.
        final TopicUpdateControl.ValueUpdater<JSON> updater = valueUpdater;
        if (updater == null) {
            throw new IllegalStateException("Not registered as updater");
        }

        updater.update(
            rateTopicName(currency),
            mapToJSON(values),
            currency,
            callback);
    }

    /**
     * Update an existing rates topic, replacing the rates mappings with a new
     * set of mappings specified as a JSON string, for example
     * {"USD":"123.45","HKD":"456.3"}.
     *
     * @param currency the base currency
     * @param jsonString a JSON string specifying the map of currency rates
     * @param callback reports the outcome
     * @throws IOException if unable to convert string
     * @throws IllegalStateException if no value updater is available yet
     */
    public void changeRates(
        String currency,
        String jsonString,
        UpdateContextCallback<String> callback) throws SessionClosedException,
        IllegalArgumentException, IOException {

        // Read the volatile field once so the check and the use see the
        // same reference.
        final TopicUpdateControl.ValueUpdater<JSON> updater = valueUpdater;
        if (updater == null) {
            throw new IllegalStateException("Not registered as updater");
        }

        updater.update(
            rateTopicName(currency),
            jsonDataType.fromJsonString(jsonString),
            currency,
            callback);

    }

    /**
     * Convert a given map to a JSON object.
     *
     * @param values the map to convert
     * @return the JSON value read back from the CBOR encoding of the map
     * @throws IOException if the map cannot be written
     */
    private JSON mapToJSON(Map<String, String> values) throws IOException {
        // Use the third-party Jackson library to write out the values map as a
        // CBOR-format binary.
        final ByteArrayOutputStream baos = new ByteArrayOutputStream();

        // The generator is closed before the bytes are read, so that all
        // buffered output has reached the stream and the generator is
        // released even if writing fails.
        try (CBORGenerator generator = cborFactory.createGenerator(baos)) {
            generator.writeObject(values);
        }

        return jsonDataType.readValue(baos.toByteArray());
    }

    /**
     * Remove a rates entry (removes its topic) and clear cached value for the
     * topic.
     *
     * @param currency the currency
     *
     * @param callback reports the outcome
     */
    public void removeRates(
        String currency,
        RemovalContextCallback<String> callback) {

        final String topicName = rateTopicName(currency);

        // The updater may not be available yet; in that case there is no
        // cached value to clear.
        final TopicUpdateControl.ValueUpdater<JSON> updater = valueUpdater;
        if (updater != null) {
            updater.removeCachedValues(topicName);
        }

        topicControl.remove(topicName, currency, callback);
    }

    /**
     * Close the session.
     * <P>
     * When the update source registration is available it is closed, which is
     * expected to lead to the {@code onClose} callback that closes the
     * session. If the registration callback has not been received yet, the
     * session is closed directly.
     */
    public void close() {
        final Registration registration = updateSourceRegistration;
        if (registration != null) {
            registration.close();
        }
        else {
            // onRegistered has not been called, so there is no registration
            // to close; close the session itself instead of failing.
            session.close();
        }
    }

    /**
     * Generate a hierarchical topic name for a rates topic.
     * <P>
     * e.g. for currency=GBP would return "FX/GBP".
     *
     * @param currency the currency
     * @return the topic name
     * @throws NullPointerException if currency is null
     */
    private static String rateTopicName(String currency) {
        return String.format("%s/%s", ROOT_TOPIC, requireNonNull(currency));
    }

}
                    

Replace the URL used in the example with the URL of your Diffusion™ server.