Just a second...

Example: Publish a time series

The following example uses the Diffusion™ API to create and update a time series topic.

This example creates a time series topic at foo/timeseries. It demonstrates how to append and edit values.

JavaScript
const TopicSpecification = diffusion.topics.TopicSpecification;
const TopicType = diffusion.topics.TopicType;

// 1. Create a time series topic specification with events
// of type double
const specification = new TopicSpecification(TopicType.TIME_SERIES, {
    TIME_SERIES_EVENT_VALUE_TYPE : 'double'
});

// 2. Create a time series topic
await session.topics.add('topic/timeseries', specification);

// 3. Append a value to the time series topic
session.timeseries.append('topic/timeseries', 1.23, diffusion.datatypes.double());

// 4. Edit the last value in a time series topic.
try {
    // perform a range query to obtain the last value only
    const queryResult = await session.timeseries.rangeQuery()
        .fromLast(1)
        .as(diffusion.datatypes.double())
        .selectFrom('topic/timeseries');

    // edit the event
    await Promise.all(queryResult.events.map((event) =>
        session.timeseries.edit('topic/timeseries', event.sequence, 4.56, diffusion.datatypes.double())
    ));
} catch (err) {
    console.log('Error obtaining the range query.', err);
}

.NET
/**
 * Copyright © 2021 - 2023 DiffusionData Ltd.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

using System;
using System.Threading;
using System.Threading.Tasks;
using PushTechnology.ClientInterface.Client.Factories;
using PushTechnology.ClientInterface.Client.Session;
using PushTechnology.ClientInterface.Client.Topics;
using PushTechnology.ClientInterface.Client.Topics.Details;
using static System.Console;

namespace PushTechnology.ClientInterface.Example {
    /// <summary>
    /// Client implementation that adds a time series topic and updates its value.
    /// </summary>
    public sealed class TimeSeriesPublish {

        public async Task TimeSeriesPublishExample(string serverUrl) {
            string TOPIC_PREFIX = "time-series";

            var session = Diffusion.Sessions.Principal( "control" ).Password( "password" )
                .CertificateValidation((cert, chain, errors) => CertificateValidationResult.ACCEPT)
                .Open( serverUrl );

            // Create a time series topic with string values.
            var typeName = Diffusion.DataTypes.Get<string>().TypeName;
            var topic = $"{TOPIC_PREFIX}/{typeName}/{DateTime.Now.ToFileTimeUtc()}";
            var specification = session.TopicControl.NewSpecification(TopicType.TIME_SERIES)
                .WithProperty(TopicSpecificationProperty.TimeSeriesEventValueType, typeName);

            // Add a time series topic.
            try
            {
                await session.TopicControl.AddTopicAsync(topic, specification);
            }
            catch (Exception ex)
            {
                WriteLine($"Failed to add topic '{topic}' : {ex}.");
                session.Close();
                return;
            }

            // Append a value to the time series topic using a custom timestamp.
            try
            {
                await session.TimeSeries.AppendAsync<string>(topic, "Value1", 
                    DateTimeOffset.FromUnixTimeMilliseconds(322));
            }
            catch (Exception ex)
            {
                WriteLine($"Topic {topic} value could not be appended : {ex}.");
                session.Close();
                return;
            }

            // Append a value to the time series topic. The timestamp will be set to the current server time.
            try
            {
                await session.TimeSeries.AppendAsync<string>(topic, "Value 1");
                await session.TimeSeries.AppendAsync<string>(topic, "Value 2");
                await session.TimeSeries.AppendAsync<string>(topic, "Value 3");
                await session.TimeSeries.AppendAsync<string>(topic, "Value 4");
            }
            catch (Exception ex)
            {
                WriteLine($"Topic {topic} value could not be appended : {ex}.");
                session.Close();
                return;
            }

            // Edit a value of the time series topic.
            try
            {
                // Edits the time series topic with sequence number 1 and value 'Value1'.
                await session.TimeSeries.EditAsync<string>(topic, 1, "Value 1a");
            }
            catch (Exception ex)
            {
                WriteLine($"Topic {topic} value could not be edited : {ex}.");
                session.Close();
                return;
            }

            // Update the time series topic using the standard topic update method.
            try
            {
                var newValue = "Last Value";

                await session.TopicUpdate.SetAsync(topic, newValue);
            }
            catch (Exception ex)
            {
                WriteLine($"Topic {topic} could not be updated : {ex}.");
                session.Close();
                return;
            }

            // Remove the topic.
            try
            {
                await session.TopicControl.RemoveTopicsAsync( topic );
            } 
            catch(Exception ex) 
            {
                WriteLine( $"Failed to remove topic '{topic}' : {ex}." );
            }

            // Close the session.
            session.Close();
        }
    }
}
Java and Android
/*******************************************************************************
 * Copyright (C) 2017, 2023 DiffusionData Ltd.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *******************************************************************************/
package com.pushtechnology.diffusion.manual;

import static com.pushtechnology.diffusion.client.Diffusion.newTopicSpecification;
import static com.pushtechnology.diffusion.datatype.DataTypes.INT64_DATATYPE_NAME;

import java.time.Instant;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.pushtechnology.diffusion.client.Diffusion;
import com.pushtechnology.diffusion.client.features.TimeSeries;
import com.pushtechnology.diffusion.client.features.TimeSeries.EventMetadata;
import com.pushtechnology.diffusion.client.features.TopicUpdate;
import com.pushtechnology.diffusion.client.features.control.topics.TopicControl;
import com.pushtechnology.diffusion.client.session.Session;
import com.pushtechnology.diffusion.client.topics.details.TopicSpecification;
import com.pushtechnology.diffusion.client.topics.details.TopicType;

/**
 * This example shows a control client creating a {@link TimeSeries} topic.
 * Values can be appended to the topic using {@link #appendValue(long)}, and
 * the last value of the topic can be edited using {@link #editLast(long)}.
 * Alternatively, the methods provided by the {@link TopicUpdate} feature can be
 * used.
 *
 * @author DiffusionData Limited
 */
public class ControlClientUpdatingTimeSeriesTopics {

    private static final String TOPIC_PATH = "foo/timeseries";
    private static final Logger LOG =
        LoggerFactory.getLogger(ControlClientUpdatingTimeSeriesTopics.class);

    private final Session session;
    private final TimeSeries timeSeries;
    private final TopicControl topicControl;

    /**
     * Constructor.
     *
     * @param serverUrl server URL to connect to example "ws://diffusion.example.com:80"
     */
    public ControlClientUpdatingTimeSeriesTopics(String serverUrl)
        throws InterruptedException, ExecutionException, TimeoutException {

        session = Diffusion.sessions().principal("control").password("password")
            .open(serverUrl);

        timeSeries = session.feature(TimeSeries.class);
        topicControl = session.feature(TopicControl.class);

        final TopicSpecification spec = newTopicSpecification(TopicType.TIME_SERIES)
            .withProperty(TopicSpecification.TIME_SERIES_EVENT_VALUE_TYPE, INT64_DATATYPE_NAME);

        topicControl.addTopic(TOPIC_PATH, spec)
            .thenAccept(result -> LOG.info("Add topic result: {}", result)).get(5, TimeUnit.SECONDS);
    }

    /**
     * Appends a value to the time series topic.
     *
     * @param value value to append
     * @return the event metadata from the successful append
     */
    public EventMetadata appendValue(long value)
        throws IllegalArgumentException, InterruptedException, ExecutionException, TimeoutException {
        return timeSeries.append(TOPIC_PATH, Long.class, value).get(5, TimeUnit.SECONDS);
    }

    /**
     * Close the session and remove the time series topic.
     */
    public void close()
        throws IllegalArgumentException, InterruptedException, ExecutionException, TimeoutException {
        topicControl.removeTopics("?foo//").get(5, TimeUnit.SECONDS);
        session.close();
    }

    /**
     * Edit the last value in a time series topic.
     *
     * @param value value to edit with
     */
    public void editLast(long value) {
        //Obtain the last value in the time series topic
        timeSeries.rangeQuery().fromLast(1).as(Long.class).selectFrom(TOPIC_PATH)
            .whenComplete((query, ex) -> {
                if (ex != null) {
                    LOG.error("Error obtaining the range query: {}", ex);
                    return;
                }
                //Perform the value edit
                query.stream().forEach(event -> {
                    timeSeries.edit(TOPIC_PATH, event.sequence(), Long.class, value)
                        .whenComplete((metadata, e) -> {
                            if (e != null) {
                                LOG.error("Error editing topic: {}", e);
                                return;
                            }
                            LOG.info("EventMetadata from edit: {}", metadata);
                        });
                });
            });
    }

    /**
     * Appends a value to the time series topic. Allows for creation of events
     * with a custom timestamp. This can be used for loading historic or future
     * values.
     *
     * @param value value to append
     * @param timestamp the user supplied timestamp
     * @return the event metadata from the successful append
     */
    public EventMetadata appendValue(long value, Instant timestamp)
        throws IllegalArgumentException, InterruptedException, ExecutionException, TimeoutException {
        return timeSeries.append(TOPIC_PATH, Long.class, value, timestamp).get(5, TimeUnit.SECONDS);
    }
}
C
/**
 * Copyright © 2021 - 2023 DiffusionData Ltd.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <stdio.h>
#include <stdlib.h>
#ifndef WIN32
        #include <unistd.h>
#else
        #define sleep(x) Sleep(1000 * x)
#endif

#include "diffusion.h"


static int on_topic_added_with_specification(
        SESSION_T *session,
        TOPIC_ADD_RESULT_CODE result_code,
        void *context)
{
        // topic has been added
        return HANDLER_SUCCESS;
}


static int on_append(
        const DIFFUSION_TIME_SERIES_EVENT_METADATA_T *event_metadata,
        void *context)
{
        // value has been appended to the topic
        return HANDLER_SUCCESS;
}


int main(int argc, char **argv)
{
        const char *url = "ws://localhost:8080";
        const char *principal = "control";
        const char *password = "password";
        const char *topic_path = "my/topic/path";

        CREDENTIALS_T *credentials = credentials_create_password(password);

        SESSION_T *session;
        DIFFUSION_ERROR_T error = { 0 };

        // Create a session, synchronously
        session = session_create(url, principal, credentials, NULL, NULL, &error);
        if(session == NULL) {
                printf("Failed to create session: %s\n", error.message);
                free(error.message);
                credentials_free(credentials);
                return EXIT_FAILURE;
        }

        // create a time series topic
        ADD_TOPIC_CALLBACK_T add_topic_callback = {
                .on_topic_added_with_specification = on_topic_added_with_specification,
        };

        HASH_T *properties = hash_new(1);
        hash_add(properties, strdup(DIFFUSION_TIME_SERIES_EVENT_VALUE_TYPE), strdup("json"));

        TOPIC_SPECIFICATION_T *specification =
                topic_specification_init_with_properties(TOPIC_TYPE_TIME_SERIES, properties);
        hash_free(properties, free, free);

        add_topic_from_specification(session, topic_path, specification, add_topic_callback);
        topic_specification_free(specification);

        // append value to time series topic
        BUF_T *value_buf = buf_create();
        write_diffusion_json_value("{\"hello\": \"world\"}", value_buf);

        DIFFUSION_TIME_SERIES_APPEND_PARAMS_T params = {
                .on_append = on_append,
                .topic_path = topic_path,
                .datatype = DATATYPE_JSON,
                .value = value_buf
        };
        diffusion_time_series_append(session, params, NULL);
        buf_free(value_buf);

        // Sleep for a while
        sleep(5);

        // Close the session, and release resources and memory
        session_close(session, NULL);
        session_free(session);

        credentials_free(credentials);
        return EXIT_SUCCESS;
}
Python
#  Copyright (c) 2022 - 2023 DiffusionData Ltd., All Rights Reserved.
#
#  Use is subject to licence terms.
#
#  NOTICE: All information contained herein is, and remains the
#  property of DiffusionData. The intellectual and technical
#  concepts contained herein are proprietary to DiffusionData and
#  may be covered by U.S. and Foreign Patents, patents in process, and
#  are protected by trade secret or copyright law.


# Client implementation that adds a time series topic and updates its value.
import datetime
import asyncio
import diffusion
import diffusion.datatypes


# Diffusion server connection information;
# adjust as needed for the server used in practice.
from diffusion.features.timeseries import TimeSeries

server_url = "ws://localhost:8080"
principal = "admin"
credentials = diffusion.Credentials("password")

TOPIC_PREFIX = "time-series"


async def main():
    # Creating the session.
    async with diffusion.Session(
        url=server_url, principal=principal, credentials=credentials
    ) as session:
        # Create a time series topic with string values.
        topic_type = diffusion.datatypes.STRING
        topic = f"{TOPIC_PREFIX}/{topic_type.type_name}/{datetime.datetime.utcnow()}"
        specification = TimeSeries.of(topic_type).with_properties()

        # Add a time series topic.
        try:
            await session.topics.add_topic(topic, specification)
        except Exception as ex:
            print(f"Failed to add topic '{topic}' : {ex}.")
            await session.close()
            return

        # Append a value to the time series topic using a custom timestamp.
        try:
            user_supplied_timestamp = datetime.datetime.utcnow() + datetime.timedelta(
                milliseconds=322
            )
            await session.time_series.append(
                topic,
                "Value1",
                value_type=diffusion.datatypes.STRING,
                timestamp=user_supplied_timestamp,
            )
        except Exception as ex:
            print(f"Topic {topic} value could not be appended : {ex}.")
            return

        # Append a value to the time series topic.
        # The timestamp will be set to the current server time.
        try:
            await session.time_series.append(
                topic, "Value 1", diffusion.datatypes.STRING
            )
            await session.time_series.append(
                topic, "Value 2", diffusion.datatypes.STRING
            )
            await session.time_series.append(
                topic, "Value 3", diffusion.datatypes.STRING
            )
            await session.time_series.append(
                topic, "Value 4", diffusion.datatypes.STRING
            )
        except Exception as ex:
            print(f"Topic {topic} value could not be appended : {ex}.")
            await session.close()
            return

        # Edit a value of the time series topic.
        try:
            #  Edits the time series topic with sequence number 1 and value 'Value1'.
            await session.time_series.edit(topic, 1, "Value 1a", diffusion.datatypes.STRING)
        except Exception as ex:
            print(f"Topic {topic} value could not be edited : {ex}.")
            return

        # Update the time series topic using the standard topic update method.
        try:
            new_value = "Last Value"

            await session.topics.set_topic(topic, new_value, diffusion.datatypes.STRING)
        except Exception as ex:
            print(f"Topic {topic} could not be updated : {ex}.")
            await session.close()
            return

        # Remove the topic.
        try:
            await session.topics.remove_topic(topic)
        except Exception as ex:
            print(f"Failed to remove topic '{topic}' : {ex}.")


if __name__ == "__main__":
    asyncio.run(main())
Apple
//  Copyright (C) 2021 Push Technology Ltd.
//
//  Licensed under the Apache License, Version 2.0 (the "License");
//  you may not use this file except in compliance with the License.
//  You may obtain a copy of the License at
//  http://www.apache.org/licenses/LICENSE-2.0
//
//  Unless required by applicable law or agreed to in writing, software
//  distributed under the License is distributed on an "AS IS" BASIS,
//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//  See the License for the specific language governing permissions and
//  limitations under the License.

import Foundation
import Diffusion

class PublishTimeSeries {

    func run_example(url: URL, topic_path: String) {

        let credentials = PTDiffusionCredentials(password: "password")
        let configuration = PTDiffusionSessionConfiguration(principal: "control",
                                                            credentials: credentials)

        // establish the session
        PTDiffusionSession.open(with: url,
                                configuration: configuration) { (session, error) in
            if (error != nil) {
                print("An error has occurred while establishing a session: %@",
                      error!.localizedDescription)
            }
            else {

                // create a time series JSON topic
                session!.topicControl.addTopic(withPath: topic_path,
                                               type: PTDiffusionTopicType.JSON) { (result, error) in
                    if (error != nil) {
                        print("An error has occurred while adding topic '%@': %@",
                              topic_path,
                              error!.localizedDescription)
                    }
                    else if (result == PTDiffusionAddTopicResult.exists()) {

                        print("Topic '%@' already existed", topic_path)
                    }
                    else {

                        print("Topic '%@' was created", topic_path)
                    }

                    // append a new value to the time series JSON topic
                    let json_value = try! PTDiffusionJSON(jsonString: "{\"hello\"}: {\"world\"}")
                    session!.timeSeries.append(toTopicPath: topic_path, jsonValue: json_value) { (metadata, error) in
                        if (error != nil) {
                            print("An error has occurred while appending to the time series JSON topic: %@",
                                  error!.localizedDescription);
                        }
                        else {
                            print("JSON value has been successfully appended to the time series topic")
                        }
                    }
                }
            }
        }
    }
}