Just a second...

Example: Subscribe to a time series

The following example uses the Diffusion™ API to subscribe to a time series topic.

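This example demonstrates subscribing to a time series topic. The Java and Android sample subscribes to the topic at foo/timeseries; the .NET sample subscribes using a topic selector built from the time-series prefix and the string data type name.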

.NET
/**
 * Copyright © 2021 Push Technology Ltd.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

using System.Threading;
using System.Threading.Tasks;
using PushTechnology.ClientInterface.Client.Callbacks;
using PushTechnology.ClientInterface.Client.Factories;
using PushTechnology.ClientInterface.Client.Features;
using PushTechnology.ClientInterface.Client.Features.Topics;
using PushTechnology.ClientInterface.Client.Topics.Details;
using static System.Console;
using System;
using PushTechnology.ClientInterface.Client.Features.TimeSeries;

namespace PushTechnology.ClientInterface.Example {
    /// <summary>
    /// Client implementation which subscribes to a string time series topic and 
    /// consumes the data it receives.
    /// </summary>
    public sealed class TimeSeriesTopics {

        /// <summary>
        /// Opens a session, registers a value stream for string time series events,
        /// subscribes using a topic selector and finally closes the session again.
        /// </summary>
        /// <param name="serverUrl">URL of the server the session is opened against.</param>
        public TimeSeriesTopics(string serverUrl) {
            const string topicPrefix = "time-series";

            var session = Diffusion.Sessions.Principal("client").Password("password").Open(serverUrl);

            // Build the selector expression from the prefix and the string data type name.
            var typeName = Diffusion.DataTypes.Get<string>().TypeName;
            var topic = $"?{topicPrefix}/{typeName}//";

            // Add a value stream
            var stringStream = new StringStream();
            session.Topics.AddTimeSeriesStream<string>(topic, stringStream);

            // Subscribe to the topic.
            try
            {
                // A constructor cannot be declared async, so 'await' is not legal here.
                // Block on the task instead; GetAwaiter().GetResult() rethrows the original
                // exception rather than wrapping it in an AggregateException.
                session.Topics.SubscribeAsync(topic).GetAwaiter().GetResult();
            }
            catch (Exception ex)
            {
                WriteLine($"Failed to subscribe to topic '{topic}' : {ex}.");
            }
            finally
            {
                // Note that closing the session will automatically unsubscribe from all topics
                // the client is subscribed to.
                session.Topics.RemoveStream(stringStream);
                session.Close();
            }
        }

        /// <summary>
        /// Basic implementation of the IValueStream for time series string topics.
        /// Every notification is written to the console.
        /// </summary>
        private sealed class StringStream : IValueStream<IEvent<string>> {
            /// <summary>
            /// Notification of the stream being closed normally.
            /// </summary>
            public void OnClose()
                => WriteLine( "The subscription stream is now closed." );

            /// <summary>
            /// Notification of a contextual error related to this callback.
            /// </summary>
            /// <param name="errorReason">Error reason.</param>
            public void OnError( ErrorReason errorReason )
                => WriteLine( $"An error has occured : {errorReason}." );

            /// <summary>
            /// Notification of a successful subscription.
            /// </summary>
            /// <param name="topicPath">Topic path.</param>
            /// <param name="specification">Topic specification.</param>
            public void OnSubscription( string topicPath, ITopicSpecification specification )
                => WriteLine( $"Client subscribed to {topicPath}." );

            /// <summary>
            /// Notification of a successful unsubscription.
            /// </summary>
            /// <param name="topicPath">Topic path.</param>
            /// <param name="specification">Topic specification.</param>
            /// <param name="reason">Reason for the unsubscription.</param>
            public void OnUnsubscription( string topicPath, ITopicSpecification specification, 
                                          TopicUnsubscribeReason reason )
                => WriteLine( $"Client unsubscribed from {topicPath} : {reason}." );

            /// <summary>
            /// Topic update received. Only the new value is reported; the old value is ignored.
            /// </summary>
            /// <param name="topicPath">Topic path.</param>
            /// <param name="specification">Topic specification.</param>
            /// <param name="oldValue">Value prior to update.</param>
            /// <param name="newValue">Value after update.</param>
            public void OnValue( string topicPath, ITopicSpecification specification, 
                                 IEvent<string> oldValue, IEvent<string> newValue )
                => WriteLine( $"New value of {topicPath} is {newValue}." );
        }
    }
}
Java and Android
/*******************************************************************************
 * Copyright (C) 2017 Push Technology Ltd.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *******************************************************************************/
package com.pushtechnology.diffusion.examples;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import com.pushtechnology.diffusion.client.Diffusion;
import com.pushtechnology.diffusion.client.features.TimeSeries;
import com.pushtechnology.diffusion.client.features.TimeSeries.Event;
import com.pushtechnology.diffusion.client.features.Topics;
import com.pushtechnology.diffusion.client.features.Topics.ValueStream;
import com.pushtechnology.diffusion.client.session.Session;

/**
 * This demonstrates a client session subscribing to a
 * {@link TimeSeries} topic.
 *
 * @author Push Technology Limited
 * @since 6.0
 * @see ControlClientUpdatingTimeSeriesTopics
 * @see TimeSeriesQueryExample
 */
public class ClientConsumingTimeSeriesTopics {

    private static final String TOPIC_PATH = "foo/timeseries";

    // Assigned exactly once in the constructor and released by close().
    private final Session session;

    /**
     * Constructor.
     * <p>
     * Opens a session, registers the value stream against the time series
     * topic path and waits up to five seconds for the subscription to
     * complete. If registration or subscription fails, the session is closed
     * before the exception is propagated, because the caller never receives
     * an instance on which it could call {@link #close()}.
     *
     * @param serverUrl for example "ws://diffusion.example.com:80"
     * @param valueStream value stream to receive time series topic events
     * @throws InterruptedException if the thread is interrupted while waiting
     *         for the subscription to complete
     * @throws ExecutionException if the subscription request fails
     * @throws TimeoutException if the subscription does not complete within
     *         five seconds
     */
    public ClientConsumingTimeSeriesTopics(String serverUrl, ValueStream<Event<Long>> valueStream)
        throws InterruptedException, ExecutionException, TimeoutException {
        session = Diffusion.sessions().principal("client").password("password")
            .open(serverUrl);

        try {
            final Topics topics = session.feature(Topics.class);
            topics.addTimeSeriesStream(TOPIC_PATH, Long.class, valueStream);
            topics.subscribe(TOPIC_PATH).get(5, TimeUnit.SECONDS);
        }
        catch (InterruptedException | ExecutionException | TimeoutException | RuntimeException e) {
            // Construction failed: release the session here, since no caller
            // will ever hold a reference that could close it.
            session.close();
            throw e;
        }
    }

    /**
     * Close the session.
     */
    public void close() {
        session.close();
    }
}