Just a second...

Example: Subscribe to a time series

The following example uses the Diffusion™ API to subscribe to a time series topic.

These examples demonstrate subscribing to a time series topic, for example at foo/timeseries; each language snippet defines its own topic path or topic selector.

JavaScript
// Time series data type built from the int64 data type.
const int64TimeSeries = diffusion.datatypes.timeseries(diffusion.datatypes.int64());

// Register the value stream for the path, then select the same path on the session.
const stream = session.addStream('topic/timeseries', int64TimeSeries);
session.select('topic/timeseries');

// Logs the value, sequence and timestamp of each event passed to the 'value' listener.
function logEvent(topic, spec, event) {
    console.log(`received update ${event.value}: sequence=${event.sequence} timestamp=${event.timestamp}`);
}

stream.on('value', logEvent);
.NET
/**
 * Copyright © 2021 - 2023 DiffusionData Ltd.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

using System.Threading;
using System.Threading.Tasks;
using PushTechnology.ClientInterface.Client.Callbacks;
using PushTechnology.ClientInterface.Client.Factories;
using PushTechnology.ClientInterface.Client.Features;
using PushTechnology.ClientInterface.Client.Features.Topics;
using PushTechnology.ClientInterface.Client.Topics.Details;
using static System.Console;
using System;
using PushTechnology.ClientInterface.Client.Features.TimeSeries;

namespace PushTechnology.ClientInterface.Example {
    /// <summary>
    /// Client implementation which subscribes to a string time series topic and
    /// consumes the data it receives.
    /// </summary>
    public sealed class TimeSeriesTopics {

        /// <summary>
        /// Opens a session, registers a time series value stream, subscribes to the
        /// topics matched by the selector, waits briefly for notifications and then
        /// removes the stream and closes the session.
        /// </summary>
        /// <param name="serverUrl">URL of the server to open the session against.</param>
        /// <returns>A task that completes once the session has been closed.</returns>
        public async Task TimeSeriesTopicsExample(string serverUrl) {
            const string TopicPrefix = "time-series";

            var session = Diffusion.Sessions.Principal("client").Password("password").Open(serverUrl);

            var typeName = Diffusion.DataTypes.Get<string>().TypeName;
            var topic = $"?{TopicPrefix}/{typeName}//";

            // Add a value stream
            var stringStream = new StringStream();
            session.Topics.AddTimeSeriesStream<string>(topic, stringStream);

            // Subscribe to the topic.
            try
            {
                await session.Topics.SubscribeAsync(topic);

                // Give the stream a short window to receive notifications. Without this
                // pause the finally block removes the stream and closes the session as
                // soon as SubscribeAsync completes.
                await Task.Delay(TimeSpan.FromMilliseconds(300));
            }
            catch (Exception ex)
            {
                WriteLine($"Failed to subscribe to topic '{topic}' : {ex}.");
            }
            finally
            {
                // Note that closing the session, will automatically unsubscribe from all topics 
                // the client is subscribed to.
                session.Topics.RemoveStream(stringStream);
                session.Close();
            }
        }

        /// <summary>
        /// Basic implementation of the IValueStream for time series string topics.
        /// Every notification is written to the console.
        /// </summary>
        private sealed class StringStream : IValueStream<IEvent<string>> {
            /// <summary>
            /// Notification of the stream being closed normally.
            /// </summary>
            public void OnClose()
                => WriteLine( "The subscription stream is now closed." );

            /// <summary>
            /// Notification of a contextual error related to this callback.
            /// </summary>
            /// <param name="errorReason">Error reason.</param>
            public void OnError( ErrorReason errorReason )
                => WriteLine( $"An error has occured : {errorReason}." );

            /// <summary>
            /// Notification of a successful subscription.
            /// </summary>
            /// <param name="topicPath">Topic path.</param>
            /// <param name="specification">Topic specification.</param>
            public void OnSubscription( string topicPath, ITopicSpecification specification )
                => WriteLine( $"Client subscribed to {topicPath}." );

            /// <summary>
            /// Notification of a successful unsubscription.
            /// </summary>
            /// <param name="topicPath">Topic path.</param>
            /// <param name="specification">Topic specification.</param>
            /// <param name="reason">Reason for the unsubscription.</param>
            public void OnUnsubscription( string topicPath, ITopicSpecification specification, 
                                          TopicUnsubscribeReason reason )
                => WriteLine( $"Client unsubscribed from {topicPath} : {reason}." );

            /// <summary>
            /// Topic update received.
            /// </summary>
            /// <param name="topicPath">Topic path.</param>
            /// <param name="specification">Topic specification.</param>
            /// <param name="oldValue">Time series event prior to the update.</param>
            /// <param name="newValue">Time series event after the update.</param>
            public void OnValue( string topicPath, ITopicSpecification specification, 
                                 IEvent<string> oldValue, IEvent<string> newValue )
                => WriteLine( $"New value of {topicPath} is {newValue}." );
        }
    }
}
Java and Android
/*******************************************************************************
 * Copyright (C) 2017, 2023 DiffusionData Ltd.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *******************************************************************************/
package com.pushtechnology.diffusion.manual;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import com.pushtechnology.diffusion.client.Diffusion;
import com.pushtechnology.diffusion.client.features.TimeSeries;
import com.pushtechnology.diffusion.client.features.TimeSeries.Event;
import com.pushtechnology.diffusion.client.features.Topics;
import com.pushtechnology.diffusion.client.features.Topics.ValueStream;
import com.pushtechnology.diffusion.client.session.Session;

/**
 * This demonstrates a client session subscribing to a
 * {@link TimeSeries} topic.
 *
 * @author DiffusionData Limited
 */
public class ClientConsumingTimeSeriesTopics {

    private static final String TOPIC_PATH = "foo/timeseries";
    private static final long SUBSCRIBE_TIMEOUT_SECONDS = 5;
    private final Session session;

    /**
     * Constructor. Opens a session, registers the value stream against the
     * time series topic path and waits for the subscription to complete.
     * <p>
     * If registering the stream or subscribing fails, the session opened here
     * is closed before the exception is propagated, because the caller never
     * receives an instance on which to call {@link #close()}.
     *
     * @param serverUrl for example "ws://diffusion.example.com:80"
     * @param valueStream value stream to receive time series topic events
     * @throws InterruptedException if interrupted while waiting for the subscription
     * @throws ExecutionException if the subscription request fails
     * @throws TimeoutException if the subscription does not complete within the timeout
     */
    public ClientConsumingTimeSeriesTopics(String serverUrl, ValueStream<Event<Long>> valueStream)
        throws InterruptedException, ExecutionException, TimeoutException {
        session = Diffusion.sessions().principal("client").password("password")
            .open(serverUrl);

        try {
            final Topics topics = session.feature(Topics.class);
            topics.addTimeSeriesStream(TOPIC_PATH, Long.class, valueStream);
            topics.subscribe(TOPIC_PATH).get(SUBSCRIBE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
        }
        catch (InterruptedException | ExecutionException | TimeoutException | RuntimeException e) {
            // Construction is about to fail: release the session so it does not leak.
            session.close();
            throw e;
        }
    }

    /**
     * Close the session.
     */
    public void close() {
        session.close();
    }
}
C
/**
 * Copyright © 2021 - 2023 DiffusionData Ltd.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <stdio.h>
#include <stdlib.h>
#ifndef WIN32
        #include <unistd.h>
#else
        #define sleep(x) Sleep(1000 * x)
#endif

#include "diffusion.h"


/*
 * Value stream callback for a subscription notification.
 * This example takes no action and reports success.
 */
static int on_subscription(
        const char* topic_path,
        const TOPIC_SPECIFICATION_T *specification,
        void *context)
{
        /* Nothing to do on subscription; the arguments are deliberately unused. */
        (void) topic_path;
        (void) specification;
        (void) context;

        return HANDLER_SUCCESS;
}


/*
 * Value stream callback for an unsubscription notification.
 * This example takes no action and reports success.
 */
static int on_unsubscription(
        const char* topic_path,
        const TOPIC_SPECIFICATION_T *specification,
        NOTIFY_UNSUBSCRIPTION_REASON_T reason,
        void *context)
{
        /* Nothing to do on unsubscription; the arguments are deliberately unused. */
        (void) topic_path;
        (void) specification;
        (void) reason;
        (void) context;

        return HANDLER_SUCCESS;
}


/*
 * Value stream callback invoked with the old and new value of a topic.
 *
 * Reads the new value as a time series event, converts the event's value to
 * a JSON string and prints it. Everything obtained while doing so is freed
 * before returning.
 *
 * Always returns HANDLER_SUCCESS.
 */
static int on_value(
        const char* topic_path,
        const TOPIC_SPECIFICATION_T *const specification,
        const DIFFUSION_DATATYPE datatype,
        const DIFFUSION_VALUE_T *const old_value,
        const DIFFUSION_VALUE_T *const new_value,
        void *context)
{
        // Start from NULL and check the read: otherwise a failed read would leave
        // `new_event` indeterminate and the calls below would use and free garbage.
        DIFFUSION_TIME_SERIES_EVENT_T *new_event = NULL;
        if(!read_diffusion_time_series_event(new_value, &new_event, NULL) || new_event == NULL) {
                printf("Unable to read time series event for topic %s\n", topic_path);
                // NOTE(review): HANDLER_SUCCESS keeps this callback's original return
                // contract; confirm whether a failure code is preferable here.
                return HANDLER_SUCCESS;
        }

        DIFFUSION_VALUE_T *new_event_value = diffusion_time_series_event_get_value(new_event);

        DIFFUSION_API_ERROR api_error;
        char *result;
        bool success = to_diffusion_json_string(new_event_value, &result, &api_error);

        if(success) {
                printf("Received value: %s\n", result);
                free(result);
        }
        else {
                const char *description = get_diffusion_api_error_description(api_error);
                printf("Error during diffusion value read: %s\n", description);
                diffusion_api_error_free(api_error);
        }

        diffusion_value_free(new_event_value);
        diffusion_time_series_event_free(new_event);

        return HANDLER_SUCCESS;
}


/*
 * Value stream callback registered as `.on_close` in main().
 * Intentionally empty in this example.
 */
static void on_close()
{
        // value stream is now closed
}


int main(int argc, char **argv)
{
        const char *url = "ws://localhost:8080";
        const char *principal = "control";
        const char *password = "password";
        const char *topic_path = "my/topic/path";

        CREDENTIALS_T *credentials = credentials_create_password(password);

        SESSION_T *session;
        DIFFUSION_ERROR_T error = { 0 };

        // Create a session, synchronously
        session = session_create(url, principal, credentials, NULL, NULL, &error);
        if(session == NULL) {
                printf("Failed to create session: %s\n", error.message);
                free(error.message);
                credentials_free(credentials);
                return EXIT_FAILURE;
        }

        // Create a value stream
        VALUE_STREAM_T value_stream = {
                .datatype = DATATYPE_JSON,
                .on_subscription = on_subscription,
                .on_unsubscription = on_unsubscription,
                .on_value = on_value,
                .on_close = on_close
        };
        add_time_series_stream(session, topic_path, &value_stream);

        // Subscribe to topics matching the selector
        SUBSCRIPTION_PARAMS_T params = {
                .topic_selector = topic_path
        };
        subscribe(session, params);

        // Sleep for a while
        sleep(5);

        // Unsubscribe from topics matching the selector
        UNSUBSCRIPTION_PARAMS_T unsub_params = {
                .topic_selector = topic_path
        };
        unsubscribe(session, unsub_params);

        // Sleep for a while
        sleep(5);

        // Close the session, and release resources and memory
        session_close(session, NULL);
        session_free(session);

        credentials_free(credentials);
        return EXIT_SUCCESS;
}
Python
#  Copyright (c) 2022 - 2023 DiffusionData Ltd., All Rights Reserved.
#
#  Use is subject to licence terms.
#
#  NOTICE: All information contained herein is, and remains the
#  property of DiffusionData. The intellectual and technical
#  concepts contained herein are proprietary to DiffusionData and
#  may be covered by U.S. and Foreign Patents, patents in process, and
#  are protected by trade secret or copyright law.
import asyncio
import typing

import diffusion.datatypes

from diffusion.datatypes.timeseries import Event
from diffusion.features.timeseries import TimeSeries
from diffusion.features.topics import ValueStreamHandler
from diffusion.features.topics.details.topic_specification import TopicSpecification
from diffusion.internal.services.topics import UnsubscribeReason
from diffusion.internal.session.exception_handler import ErrorReason


# Diffusion server connection information (URL, principal and credentials);
# adjust as needed for the server used in practice.
server_url = "ws://localhost:8080"
principal = "admin"
credentials = diffusion.Credentials("password")

# First path segment of the topic selector built in main().
TOPIC_PREFIX = "time-series"


async def main():
    """
    Open a session, attach a time series string value stream and subscribe
    to the topics matched by the selector.

    A failure while subscribing is caught and printed rather than propagated.
    """
    session_context = diffusion.Session(
        url=server_url, principal=principal, credentials=credentials
    )
    async with session_context as session:
        # Build the topic selector from the prefix and the string type's name.
        type_name = diffusion.datatypes.STRING.type_name
        topic = f"?{TOPIC_PREFIX}/{type_name}//"

        # Register the value stream against the selector.
        handler = TimeSeriesStringStream()
        session.topics.add_value_stream(topic, handler)

        # Subscribe, then pause briefly before the session context exits.
        try:
            await session.topics.subscribe(topic)
            await asyncio.sleep(0.3)
        except Exception as ex:
            print(f"Failed to subscribe to topic '{topic}' : {ex}.")


# noinspection PyUnusedLocal
class TimeSeriesStringStream(ValueStreamHandler):
    """
    Basic implementation of the ValueStreamHandler for time series string topics.

    Every notification is printed to standard output.
    """
    def __init__(self):
        # Wire each notification kind to the matching coroutine on this instance.
        super().__init__(
            TimeSeries.of(diffusion.datatypes.STRING),
            update=self.update,
            subscribe=self.subscribe,
            unsubscribe=self.unsubscribe,
            error=self.error,
            close=self.close,
        )

    async def close(self):
        """
        Notification of the stream being closed normally.
        """
        print("The subscription stream is now closed.")

    async def error(self, error_reason: ErrorReason):
        """
        Notification of a contextual error related to this callback.

        Situations in which ``error`` is called include
        the session being closed, a communication
        timeout, or a problem with the provided parameters.
        No further calls will be made to this callback.

        Args:
            error_reason: Error reason.
        """
        print(f"An error has occured : {error_reason}.")

    async def subscribe(
        self,
        *,
        topic_path: str,
        topic_spec: TopicSpecification,
        topic_value: typing.Optional[Event[diffusion.datatypes.STRING]] = None,
    ):
        """
        Notification of a successful subscription.

        Args:
            topic_path: Topic path.
            topic_spec: Topic specification.
            topic_value: Topic value.
        """
        print(f"Client subscribed to {topic_path}.")

    async def unsubscribe(
        self,
        *,
        topic_path: str,
        topic_spec: TopicSpecification,
        reason: typing.Optional[UnsubscribeReason] = None,
    ):
        """
        Notification of a successful unsubscription.

        Args:
            topic_path: Topic path.
            topic_spec: Topic specification.
            reason: Reason for the unsubscription.
        """
        print(f"Client unsubscribed from {topic_path} : {reason}.")

    async def update(
        self,
        *,
        topic_path: str,
        topic_spec: TopicSpecification,
        old_value: Event[diffusion.datatypes.STRING],
        topic_value: Event[diffusion.datatypes.STRING],
    ):
        """
        Topic update received.

        Args:
            topic_path: Topic path.
            topic_spec: Topic specification.
            old_value: Value prior to update.
            topic_value: Value after update.
        """
        print(f"New value of {topic_path} is {topic_value}.")
Apple
//  Copyright (C) 2021 Push Technology Ltd.
//
//  Licensed under the Apache License, Version 2.0 (the "License");
//  you may not use this file except in compliance with the License.
//  You may obtain a copy of the License at
//  http://www.apache.org/licenses/LICENSE-2.0
//
//  Unless required by applicable law or agreed to in writing, software
//  distributed under the License is distributed on an "AS IS" BASIS,
//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//  See the License for the specific language governing permissions and
//  limitations under the License.

import Foundation
import Diffusion

/// Example that opens a session, attaches a JSON time series value stream to a
/// topic selector and subscribes to it.
class SubscribeTimeSeries {

    /// Value stream delegate that prints every notification it receives.
    ///
    /// Messages are built with string interpolation: Swift's `print` does not
    /// expand `%@` placeholders, it prints the format string unchanged followed
    /// by the remaining arguments.
    class JSONTimeSeriesValueStreamHandler: PTDiffusionJSONTimeSeriesEventValueStreamDelegate {
        func diffusionStream(_ stream: PTDiffusionValueStream,
                             didUpdateTimeSeriesTopicPath topicPath: String,
                             specification: PTDiffusionTopicSpecification,
                             oldJSONEvent oldJsonEvent: PTDiffusionJSONTimeSeriesEvent?,
                             newJSONEvent newJsonEvent: PTDiffusionJSONTimeSeriesEvent) {
            // The old event is optional; print "NULL" when it is absent.
            let previous = oldJsonEvent.map { String(describing: $0) } ?? "NULL"
            print("Topic '\(topicPath)' has been updated: \(previous) -> \(newJsonEvent)")
        }

        func diffusionStream(_ stream: PTDiffusionStream,
                             didSubscribeToTopicPath topicPath: String,
                             specification: PTDiffusionTopicSpecification) {
            print("You have been subscribed to '\(topicPath)'")
        }

        func diffusionStream(_ stream: PTDiffusionStream,
                             didUnsubscribeFromTopicPath topicPath: String,
                             specification: PTDiffusionTopicSpecification,
                             reason: PTDiffusionTopicUnsubscriptionReason) {
            print("You have been unsubscribed from '\(topicPath)'")
        }

        func diffusionStream(_ stream: PTDiffusionStream,
                             didFailWithError error: Error) {
            print("Value stream failed with error: \(error.localizedDescription)")
        }

        func diffusionDidClose(_ stream: PTDiffusionStream) {
            print("Value stream is now closed")
        }
    }


    /// Opens a session to `url`, adds a JSON time series value stream for
    /// `topic_path` and subscribes to it.
    ///
    /// Failures at any step are printed instead of crashing the process.
    ///
    /// - Parameters:
    ///   - url: URL of the server to open the session against.
    ///   - topic_path: Selector expression used both for the value stream and the subscription.
    func run_example(url: URL, topic_path: String) {

        let credentials = PTDiffusionCredentials(password: "password")
        let configuration = PTDiffusionSessionConfiguration(principal: "control",
                                                            credentials: credentials)

        // establish the session
        PTDiffusionSession.open(with: url,
                                configuration: configuration) { (session, error) in
            if let error = error {
                print("An error has occurred while establishing a session: \(error.localizedDescription)")
                return
            }

            // No error was reported, but avoid force-unwrapping a missing session.
            guard let session = session else {
                print("An error has occurred while establishing a session: no session was returned")
                return
            }

            // create a handler to receive time series topic updates
            let handler = JSONTimeSeriesValueStreamHandler()
            let value_stream = PTDiffusionJSON.timeSeriesEventValueStream(with: handler)

            // link handler to `topic_path`; report a failure instead of trapping via `try!`
            do {
                try session.topics.add(value_stream, withSelectorExpression: topic_path, error: ())
            }
            catch {
                print("An error has occurred while adding a value stream for '\(topic_path)': \(error.localizedDescription)")
                return
            }

            // subscribe to `topic_path`
            session.topics.subscribe(withTopicSelectorExpression: topic_path) { error in
                if let error = error {
                    print("An error has occurred while attempting to subscribe to '\(topic_path)': \(error.localizedDescription)")
                }
                else {
                    print("Successfully subscribed to '\(topic_path)'")
                }
            }
        }
    }
}