Just a second...

Example: Subscribe to a JSON topic

The following examples subscribe to JSON topics and receive a stream of values from the topics.

JavaScript
diffusion.connect({
    host   : 'diffusion.example.com',
    port   : 443,
    secure : true,
    principal : 'control',
    credentials : 'password'
}).then(function(session) {

    // 1. Data Types are exposed from the top level Diffusion namespace. It is often easier
    // to assign these directly to a local variable.
    var jsonDataType = diffusion.datatypes.json();

    // 2. Data Types are currently provided for JSON and Binary topic types.
    session.topics.add('topic/json', diffusion.topics.TopicType.JSON);

    // 3. Values can be created directly from the data type.
    var jsonValue = jsonDataType.from({
        "foo" : "bar"
    });

    // Topics are updated using the standard update mechanisms
    session.topics.update('topic/json', jsonValue);

    // Subscriptions are performed normally
    session.subscribe('topic/json');

    // 4. Streams can be specialised to provide values from a specific datatype.
    session.stream('topic/json').asType(jsonDataType).on('value', function(topic, specification, newValue, oldValue) {
        // When a JSON or Binary topic is updated, any value handlers on a subscription will be called with both the
        // new value, and the old value.
   
        // The oldValue parameter will be undefined if this is the first value received for a topic.

        // For JSON topics, value#get returns a JavaScript object
        // For Binary topics, value#get returns a Buffer instance
        console.log("Update for " + topic, newValue.get());
    });

    // 5. Raw values of an appropriate type can also be used for JSON and Binary topics. 
    // For example, plain JSON objects can be used to update JSON topics.
    session.topics.update('topic/json', {
         "foo" : "baz",
         "numbers" : [1, 2, 3]
    });
});
Apple
@import Diffusion;

@interface JSONSubscribeExample (PTDiffusionJSONValueStreamDelegate) <PTDiffusionJSONValueStreamDelegate>
@end

/**
 This example demonstrates a client consuming JSON topics.

 It is assumed that under the FX topic there is a JSON topic for each currency
 which contains a map of conversion rates to each target currency. For example,
 FX/GBP could contain {"USD":"123.45","HKD":"456.3"}.
 
 @note For a topic updater compatible with this example, see the following
 in our Java examples: ControlClientUpdatingJSONTopics
 */
@implementation JSONSubscribeExample {
    PTDiffusionSession* _session;
}

-(void)startWithURL:(NSURL *const)url {
    NSLog(@"Connecting...");

    [PTDiffusionSession openWithURL:url
                  completionHandler:^(PTDiffusionSession *session, NSError *error)
    {
        if (!session) {
            NSLog(@"Failed to open session: %@", error);
            return;
        }

        // At this point we now have a connected session.
        NSLog(@"Connected.");

        // Set ivar to maintain a strong reference to the session.
        _session = session;

        // Register self as the fallback handler for JSON value updates.
        PTDiffusionValueStream *const valueStream =
            [PTDiffusionJSON valueStreamWithDelegate:self];
        [session.topics addFallbackStream:valueStream];

        // Subscribe.
        NSLog(@"Subscribing...");
        [session.topics subscribeWithTopicSelectorExpression:@"?FX/"
                                           completionHandler:^(NSError * const error)
        {
            if (error) {
                NSLog(@"Subscribe request failed. Error: %@", error);
            } else {
                NSLog(@"Subscribe request succeeded.");
            }
        }];
    }];
}

-(NSString *)currencyFromTopicPath:(NSString *const)topicPath {
    // The currency from which we're converting is the last component of the
    // topic path - e.g. topic path "FX/GBP" is currency "GBP".
    return [topicPath lastPathComponent];
}

@end

@implementation JSONSubscribeExample (PTDiffusionJSONValueStreamDelegate)

-(void)     diffusionStream:(PTDiffusionStream *const)stream
    didSubscribeToTopicPath:(NSString *const)topicPath
              specification:(PTDiffusionTopicSpecification *const)specification {
    NSString *const currency = [self currencyFromTopicPath:topicPath];
    NSLog(@"Subscribed: Rates from %@", currency);
}

-(void)diffusionStream:(PTDiffusionValueStream *const)stream
    didUpdateTopicPath:(NSString *const)topicPath
         specification:(PTDiffusionTopicSpecification *const)specification
               oldJSON:(PTDiffusionJSON *const)oldJSON
               newJSON:(PTDiffusionJSON *const)newJSON {
    NSString *const currency = [self currencyFromTopicPath:topicPath];

    // We're assuming that the incoming JSON document is correct as expected,
    // in that the root element is a map of currencies to which we have
    // conversion rates.
    NSError * error;
    NSDictionary *const map = [newJSON objectWithError:&error];
    if (!map) {
        NSLog(@"Failed to create map from received JSON. Error: %@", error);
        return;
    }

    // For the purposes of a meaningful example, only emit a log line if we
    // have a rate for GBP to USD.
    if ([currency isEqualToString:@"GBP"]) {
        const id rate = map[@"USD"];
        if (rate) {
            NSLog(@"Rate for GBP to USD: %@", rate);
        }
    }
}

-(void)         diffusionStream:(PTDiffusionStream *const)stream
    didUnsubscribeFromTopicPath:(NSString *const)topicPath
                  specification:(PTDiffusionTopicSpecification *const)specification
                         reason:(const PTDiffusionTopicUnsubscriptionReason)reason {
    NSString *const currency = [self currencyFromTopicPath:topicPath];
    NSLog(@"Unsubscribed: Rates from %@", currency);
}

@end
Java and Android
package com.pushtechnology.diffusion.examples;

import static java.util.Objects.requireNonNull;

import java.io.IOException;
import java.math.BigDecimal;
import java.util.Map;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.cbor.CBORFactory;
import com.fasterxml.jackson.dataformat.cbor.CBORParser;
import com.pushtechnology.diffusion.client.Diffusion;
import com.pushtechnology.diffusion.client.features.Topics;
import com.pushtechnology.diffusion.client.features.Topics.UnsubscribeReason;
import com.pushtechnology.diffusion.client.session.Session;
import com.pushtechnology.diffusion.client.topics.details.TopicSpecification;
import com.pushtechnology.diffusion.datatype.json.JSON;

/**
 * This demonstrates a client consuming JSON topics.
 * <P>
 * It is assumed that under the FX topic there is a JSON topic for each currency
 * which contains a map of conversion rates to each target currency. For
 * example, FX/GBP could contain {"USD":"123.45","HKD":"456.3"}.
 * <P>
 * All updates will be notified to a listener.
 *
 * @author Push Technology Limited
 * @since 5.7
 * @see ControlClientUpdatingJSONTopics
 */
public final class ClientConsumingJSONTopics {

    private static final String ROOT_TOPIC = "FX";
    private static final String TOPIC_SELECTOR = String.format("?%s/", ROOT_TOPIC);

    private final RatesListener listener;

    private final Session session;

    /**
     * Constructor.
     *
     * @param serverUrl for example "ws://diffusion.example.com:80
     */
    public ClientConsumingJSONTopics(String serverUrl, RatesListener listener) {

        this.listener = requireNonNull(listener);

        session =
            Diffusion.sessions().principal("client").password("password")
                .open(serverUrl);

        // Use the Topics feature to add a topic stream and subscribe to all
        // topics under the root
        final Topics topics = session.feature(Topics.class);
        topics.addStream(TOPIC_SELECTOR, JSON.class, new RatesStream());
        topics.subscribe(TOPIC_SELECTOR, new Topics.CompletionCallback.Default());
    }

    /**
     * Close session.
     */
    public void close() {
        session.feature(Topics.class).unsubscribe(
            TOPIC_SELECTOR,
            new Topics.CompletionCallback.Default() {
                @Override
                public void onComplete() {
                    session.close();
                }
            });
    }

    private static String pathToCurrency(String path) {
        return path.substring(path.indexOf('/') + 1);
    }

    /**
     * A listener for Rates updates.
     */
    public interface RatesListener {

        /**
         * Notification of a new rate or rate update.
         *
         * @param currency the base currency
         * @param rates map of rates
         */
        void onNewRates(String currency, Map<String, BigDecimal> rates);

        /**
         * Notification of a rate being removed.
         *
         * @param currency the base currency
         */
        void onRatesRemoved(String currency);
    }

    /**
     * The value stream.
     */
    private final class RatesStream extends Topics.ValueStream.Default<JSON> {

        private final CBORFactory factory = new CBORFactory();
        private final ObjectMapper mapper = new ObjectMapper();
        private final TypeReference<Map<String, BigDecimal>> typeReference =
            new TypeReference<Map<String, BigDecimal>>() {
            };

        @Override
        public void onValue(
            String topicPath,
            TopicSpecification specification,
            JSON oldValue,
            JSON newValue) {
            try {
                // Use the third-party Jackson library to parse the newValue's
                // binary representation and convert to a map
                final CBORParser parser =
                    factory.createParser(newValue.asInputStream());
                final Map<String, BigDecimal> map =
                    mapper.readValue(parser, typeReference);
                final String currency = pathToCurrency(topicPath);
                listener.onNewRates(currency, map);
            }
            catch (IOException ex) {
                ex.printStackTrace();
            }
        }

        @Override
        public void onUnsubscription(
            String topicPath,
            TopicSpecification specification,
            UnsubscribeReason reason) {

            final String currency = pathToCurrency(topicPath);
            listener.onRatesRemoved(currency);
        }

    }
}

Change the URL from that provided in the example to the URL of the Diffusion™ server.