Example: Subscribe to a JSON topic
The following examples subscribe to JSON topics and receive a stream of values from the topics.
JavaScript
diffusion.connect({ host : 'diffusion.example.com', port : 443, secure : true, principal : 'control', credentials : 'password' }).then(function(session) { // 1. Data Types are exposed from the top level Diffusion namespace. It is often easier // to assign these directly to a local variable. var jsonDataType = diffusion.datatypes.json(); // 2. Data Types are currently provided for JSON and Binary topic types. session.topics.add('topic/json', diffusion.topics.TopicType.JSON); // 3. Values can be created directly from the data type. var jsonValue = jsonDataType.from({ "foo" : "bar" }); // Topics are updated using the standard update mechanisms session.topics.update('topic/json', jsonValue); // Subscriptions are performed normally session.subscribe('topic/json'); // 4. Streams can be specialised to provide values from a specific datatype. session.stream('topic/json').asType(jsonDataType).on('value', function(topic, specification, newValue, oldValue) { // When a JSON or Binary topic is updated, any value handlers on a subscription will be called with both the // new value, and the old value. // The oldValue parameter will be undefined if this is the first value received for a topic. // For JSON topics, value#get returns a JavaScript object // For Binary topics, value#get returns a Buffer instance console.log("Update for " + topic, newValue.get()); }); // 5. Raw values of an appropriate type can also be used for JSON and Binary topics. // For example, plain JSON objects can be used to update JSON topics. session.topics.update('topic/json', { "foo" : "baz", "numbers" : [1, 2, 3] }); });
Apple
@import Diffusion; @interface JSONSubscribeExample (PTDiffusionJSONValueStreamDelegate) <PTDiffusionJSONValueStreamDelegate> @end /** This example demonstrates a client consuming JSON topics. It is assumed that under the FX topic there is a JSON topic for each currency which contains a map of conversion rates to each target currency. For example, FX/GBP could contain {"USD":"123.45","HKD":"456.3"}. @note For a topic updater compatible with this example, see the following in our Java examples: ControlClientUpdatingJSONTopics */ @implementation JSONSubscribeExample { PTDiffusionSession* _session; } -(void)startWithURL:(NSURL *const)url { NSLog(@"Connecting..."); [PTDiffusionSession openWithURL:url completionHandler:^(PTDiffusionSession *session, NSError *error) { if (!session) { NSLog(@"Failed to open session: %@", error); return; } // At this point we now have a connected session. NSLog(@"Connected."); // Set ivar to maintain a strong reference to the session. _session = session; // Register self as the fallback handler for JSON value updates. PTDiffusionValueStream *const valueStream = [PTDiffusionJSON valueStreamWithDelegate:self]; [session.topics addFallbackStream:valueStream]; // Subscribe. NSLog(@"Subscribing..."); [session.topics subscribeWithTopicSelectorExpression:@"?FX/" completionHandler:^(NSError * const error) { if (error) { NSLog(@"Subscribe request failed. Error: %@", error); } else { NSLog(@"Subscribe request succeeded."); } }]; }]; } -(NSString *)currencyFromTopicPath:(NSString *const)topicPath { // The currency from which we're converting is the last component of the // topic path - e.g. topic path "FX/GBP" is currency "GBP". return [topicPath lastPathComponent]; } @end @implementation JSONSubscribeExample (PTDiffusionJSONValueStreamDelegate) -(void) diffusionStream:(PTDiffusionStream *const)stream didSubscribeToTopicPath:(NSString *const)topicPath specification:(PTDiffusionTopicSpecification *const)specification { NSString *const currency = [self currencyFromTopicPath:topicPath]; NSLog(@"Subscribed: Rates from %@", currency); } -(void)diffusionStream:(PTDiffusionValueStream *const)stream didUpdateTopicPath:(NSString *const)topicPath specification:(PTDiffusionTopicSpecification *const)specification oldJSON:(PTDiffusionJSON *const)oldJSON newJSON:(PTDiffusionJSON *const)newJSON { NSString *const currency = [self currencyFromTopicPath:topicPath]; // We're assuming that the incoming JSON document is correct as expected, // in that the root element is a map of currencies to which we have // conversion rates. NSError * error; NSDictionary *const map = [newJSON objectWithError:&error]; if (!map) { NSLog(@"Failed to create map from received JSON. Error: %@", error); return; } // For the purposes of a meaningful example, only emit a log line if we // have a rate for GBP to USD. if ([currency isEqualToString:@"GBP"]) { const id rate = map[@"USD"]; if (rate) { NSLog(@"Rate for GBP to USD: %@", rate); } } } -(void) diffusionStream:(PTDiffusionStream *const)stream didUnsubscribeFromTopicPath:(NSString *const)topicPath specification:(PTDiffusionTopicSpecification *const)specification reason:(const PTDiffusionTopicUnsubscriptionReason)reason { NSString *const currency = [self currencyFromTopicPath:topicPath]; NSLog(@"Unsubscribed: Rates from %@", currency); } @end
Java
and Android
package com.pushtechnology.diffusion.examples; import static java.util.Objects.requireNonNull; import java.io.IOException; import java.math.BigDecimal; import java.util.Map; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.dataformat.cbor.CBORFactory; import com.fasterxml.jackson.dataformat.cbor.CBORParser; import com.pushtechnology.diffusion.client.Diffusion; import com.pushtechnology.diffusion.client.features.Topics; import com.pushtechnology.diffusion.client.features.Topics.UnsubscribeReason; import com.pushtechnology.diffusion.client.session.Session; import com.pushtechnology.diffusion.client.topics.details.TopicSpecification; import com.pushtechnology.diffusion.datatype.json.JSON; /** * This demonstrates a client consuming JSON topics. * <P> * It is assumed that under the FX topic there is a JSON topic for each currency * which contains a map of conversion rates to each target currency. For * example, FX/GBP could contain {"USD":"123.45","HKD":"456.3"}. * <P> * All updates will be notified to a listener. * * @author Push Technology Limited * @since 5.7 * @see ControlClientUpdatingJSONTopics */ public final class ClientConsumingJSONTopics { private static final String ROOT_TOPIC = "FX"; private static final String TOPIC_SELECTOR = String.format("?%s/", ROOT_TOPIC); private final RatesListener listener; private final Session session; /** * Constructor. * * @param serverUrl for example "ws://diffusion.example.com:80 */ public ClientConsumingJSONTopics(String serverUrl, RatesListener listener) { this.listener = requireNonNull(listener); session = Diffusion.sessions().principal("client").password("password") .open(serverUrl); // Use the Topics feature to add a topic stream and subscribe to all // topics under the root final Topics topics = session.feature(Topics.class); topics.addStream(TOPIC_SELECTOR, JSON.class, new RatesStream()); topics.subscribe(TOPIC_SELECTOR, new Topics.CompletionCallback.Default()); } /** * Close session. */ public void close() { session.feature(Topics.class).unsubscribe( TOPIC_SELECTOR, new Topics.CompletionCallback.Default() { @Override public void onComplete() { session.close(); } }); } private static String pathToCurrency(String path) { return path.substring(path.indexOf('/') + 1); } /** * A listener for Rates updates. */ public interface RatesListener { /** * Notification of a new rate or rate update. * * @param currency the base currency * @param rates map of rates */ void onNewRates(String currency, Map<String, BigDecimal> rates); /** * Notification of a rate being removed. * * @param currency the base currency */ void onRatesRemoved(String currency); } /** * The value stream. */ private final class RatesStream extends Topics.ValueStream.Default<JSON> { private final CBORFactory factory = new CBORFactory(); private final ObjectMapper mapper = new ObjectMapper(); private final TypeReference<Map<String, BigDecimal>> typeReference = new TypeReference<Map<String, BigDecimal>>() { }; @Override public void onValue( String topicPath, TopicSpecification specification, JSON oldValue, JSON newValue) { try { // Use the third-party Jackson library to parse the newValue's // binary representation and convert to a map final CBORParser parser = factory.createParser(newValue.asInputStream()); final Map<String, BigDecimal> map = mapper.readValue(parser, typeReference); final String currency = pathToCurrency(topicPath); listener.onNewRates(currency, map); } catch (IOException ex) { ex.printStackTrace(); } } @Override public void onUnsubscription( String topicPath, TopicSpecification specification, UnsubscribeReason reason) { final String currency = pathToCurrency(topicPath); listener.onRatesRemoved(currency); } } }
Change the URL from that provided in the example to the URL of the Diffusion™ server.
This page last modified: 2016/04/05