Example: Subscribe to a time series
The following example uses Diffusion™ API to subscribe to a time series topic.
This example demonstrates subscribing to a time series topic at foo/timeseries.
.NET
/** * Copyright © 2021 Push Technology Ltd. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ using System.Threading; using System.Threading.Tasks; using PushTechnology.ClientInterface.Client.Callbacks; using PushTechnology.ClientInterface.Client.Factories; using PushTechnology.ClientInterface.Client.Features; using PushTechnology.ClientInterface.Client.Features.Topics; using PushTechnology.ClientInterface.Client.Topics.Details; using static System.Console; using System; using PushTechnology.ClientInterface.Client.Features.TimeSeries; namespace PushTechnology.ClientInterface.Example { /// <summary> /// Client implementation which subscribes to a string time series topic and /// consumes the data it receives. /// </summary> public sealed class TimeSeriesTopics { public TimeSeriesTopics(string serverUrl) { string TOPIC_PREFIX = "time-series"; var session = Diffusion.Sessions.Principal("client").Password("password").Open(serverUrl); var typeName = Diffusion.DataTypes.Get<String>().TypeName; var topic = $"?{TOPIC_PREFIX}/{typeName}//"; // Add a value stream var stringStream = new StringStream(); session.Topics.AddTimeSeriesStream<string>(topic, stringStream); // Subscribe to the topic. try { await session.Topics.SubscribeAsync(topic); } catch (Exception ex) { WriteLine($"Failed to subscribe to topic '{topic}' : {ex}."); } finally { // Note that closing the session, will automatically unsubscribe from all topics // the client is subscribed to. session.Topics.RemoveStream(stringStream); session.Close(); } } /// <summary> /// Basic implementation of the IValueStream for time series string topics. /// </summary> private sealed class StringStream : IValueStream<IEvent<string>> { /// <summary> /// Notification of the stream being closed normally. /// </summary> public void OnClose() => WriteLine( "The subscription stream is now closed." ); /// <summary> /// Notification of a contextual error related to this callback. /// </summary> /// <param name="errorReason">Error reason.</param> public void OnError( ErrorReason errorReason ) => WriteLine( $"An error has occured : {errorReason}." ); /// <summary> /// Notification of a successful subscription. /// </summary> /// <param name="topicPath">Topic path.</param> /// <param name="specification">Topic specification.</param> public void OnSubscription( string topicPath, ITopicSpecification specification ) => WriteLine( $"Client subscribed to {topicPath}." ); /// <summary> /// Notification of a successful unsubscription. /// </summary> /// <param name="topicPath">Topic path.</param> /// <param name="specification">Topic specification.</param> /// <param name="reason">Error reason.</param> public void OnUnsubscription( string topicPath, ITopicSpecification specification, TopicUnsubscribeReason reason ) => WriteLine( $"Client unsubscribed from {topicPath} : {reason}." ); /// <summary> /// Topic update received. /// </summary> /// <param name="topicPath">Topic path.</param> /// <param name="specification">Topic specification.</param> /// <param name="oldValue">Value prior to update.</param> /// <param name="newValue">Value after update.</param> public void OnValue( string topicPath, ITopicSpecification specification, IEvent<string> oldValue, IEvent<string> newValue ) => WriteLine( $"New value of {topicPath} is {newValue}." ); } } }
Java and Android
/******************************************************************************* * Copyright (C) 2017 Push Technology Ltd. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. *******************************************************************************/ package com.pushtechnology.diffusion.examples; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import com.pushtechnology.diffusion.client.Diffusion; import com.pushtechnology.diffusion.client.features.TimeSeries; import com.pushtechnology.diffusion.client.features.TimeSeries.Event; import com.pushtechnology.diffusion.client.features.Topics; import com.pushtechnology.diffusion.client.features.Topics.ValueStream; import com.pushtechnology.diffusion.client.session.Session; /** * This demonstrates a client session subscribing to a * {@link TimeSeries} topic. * * @author Push Technology Limited * @since 6.0 * @see ControlClientUpdatingTimeSeriesTopics * @see TimeSeriesQueryExample */ public class ClientConsumingTimeSeriesTopics { private static final String TOPIC_PATH = "foo/timeseries"; private Session session; /** * Constructor. * * @param serverUrl for example "ws://diffusion.example.com:80" * @param valueStream value stream to receive time series topic events */ public ClientConsumingTimeSeriesTopics(String serverUrl, ValueStream<Event<Long>> valueStream) throws InterruptedException, ExecutionException, TimeoutException { session = Diffusion.sessions().principal("client").password("password") .open(serverUrl); final Topics topics = session.feature(Topics.class); topics.addTimeSeriesStream(TOPIC_PATH, Long.class, valueStream); topics.subscribe(TOPIC_PATH).get(5, TimeUnit.SECONDS); } /** * Close the session. */ public void close() { session.close(); } }