Example: Publish a time series
The following example uses the Diffusion™ API to create and update a time series topic.
The Java example creates a time series topic at foo/timeseries; the .NET example creates one under the time-series/ prefix. Both examples demonstrate how to append and edit values.
.NET
/**
 * Copyright © 2021 Push Technology Ltd.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

using System;
using System.Threading;
using System.Threading.Tasks;
using PushTechnology.ClientInterface.Client.Factories;
using PushTechnology.ClientInterface.Client.Session;
using PushTechnology.ClientInterface.Client.Topics;
using PushTechnology.ClientInterface.Client.Topics.Details;
using static System.Console;

namespace PushTechnology.ClientInterface.Example {
    /// <summary>
    /// Client implementation that adds a time series topic and updates its value.
    /// </summary>
    /// <remarks>
    /// Constructing an instance runs the whole example: it opens a session, creates a
    /// time series topic, appends and edits events, sets a final value, removes the
    /// topic and closes the session.
    /// </remarks>
    public sealed class TimeSeriesPublish {
        /// <summary>
        /// First path segment of the topic created by this example.
        /// </summary>
        private const string TopicPrefix = "time-series";

        /// <summary>
        /// Runs the example against the given server and returns once it has finished.
        /// </summary>
        /// <param name="serverUrl">The URL of the Diffusion server to connect to.</param>
        public TimeSeriesPublish(string serverUrl) {
            // A constructor cannot be async, so the asynchronous workflow lives in RunAsync
            // and is driven to completion here. Task.Run starts it on a thread-pool thread
            // so that blocking on the result cannot deadlock on a captured synchronization
            // context. GetResult() rethrows the original exception (not an AggregateException).
            Task.Run(() => RunAsync(serverUrl)).GetAwaiter().GetResult();
        }

        /// <summary>
        /// Performs the example workflow. Each step logs its failure to the console and
        /// abandons the remaining steps; the session is closed on every exit path.
        /// </summary>
        /// <param name="serverUrl">The URL of the Diffusion server to connect to.</param>
        /// <returns>A task that completes when the example has finished.</returns>
        private static async Task RunAsync(string serverUrl) {
            // NOTE(review): the validation callback accepts every server certificate.
            // That is convenient for a demo but must not be copied into production code.
            var session = Diffusion.Sessions.Principal("control").Password("password")
                .CertificateValidation((cert, chain, errors) => CertificateValidationResult.ACCEPT)
                .Open(serverUrl);

            try {
                // Create a time series topic with string values.
                var typeName = Diffusion.DataTypes.Get<string>().TypeName;
                var topic = $"{TopicPrefix}/{typeName}/{DateTime.Now.ToFileTimeUtc()}";

                var specification = session.TopicControl.NewSpecification(TopicType.TIME_SERIES)
                    .WithProperty(TopicSpecificationProperty.TimeSeriesEventValueType, typeName);

                // Add a time series topic.
                try {
                    await session.TopicControl.AddTopicAsync(topic, specification);
                } catch (Exception ex) {
                    WriteLine($"Failed to add topic '{topic}' : {ex}.");
                    return;
                }

                // Append a value to the time series topic using a custom timestamp
                // (322 milliseconds after the Unix epoch).
                try {
                    await session.TimeSeries.AppendAsync<string>(
                        topic, "Value1", DateTimeOffset.FromUnixTimeMilliseconds(322));
                } catch (Exception ex) {
                    WriteLine($"Topic {topic} value could not be appended : {ex}.");
                    return;
                }

                // Append values to the time series topic without supplying a timestamp.
                // The timestamp will be set to the current server time.
                try {
                    await session.TimeSeries.AppendAsync<string>(topic, "Value 1");
                    await session.TimeSeries.AppendAsync<string>(topic, "Value 2");
                    await session.TimeSeries.AppendAsync<string>(topic, "Value 3");
                    await session.TimeSeries.AppendAsync<string>(topic, "Value 4");
                } catch (Exception ex) {
                    WriteLine($"Topic {topic} value could not be appended : {ex}.");
                    return;
                }

                // Edit a value of the time series topic.
                try {
                    // Replaces the value of the event with sequence number 1 by 'Value 1a'.
                    // NOTE(review): if sequence numbers start at 0, sequence 1 is the
                    // 'Value 1' event appended above, not the initial 'Value1' - confirm.
                    await session.TimeSeries.EditAsync<string>(topic, 1, "Value 1a");
                } catch (Exception ex) {
                    WriteLine($"Topic {topic} value could not be edited : {ex}.");
                    return;
                }

                // Update the time series topic using the standard topic update method.
                try {
                    var newValue = "Last Value";
                    await session.TopicUpdate.SetAsync(topic, newValue);
                } catch (Exception ex) {
                    WriteLine($"Topic {topic} could not be updated : {ex}.");
                    return;
                }

                // Remove the topic. A failure here is only reported; the session is
                // still closed below.
                try {
                    await session.TopicControl.RemoveTopicsAsync(topic);
                } catch (Exception ex) {
                    WriteLine($"Failed to remove topic '{topic}' : {ex}.");
                }
            } finally {
                // Close the session on every exit path, including unexpected exceptions.
                session.Close();
            }
        }
    }
}
Java and Android
/*******************************************************************************
 * Copyright (C) 2017, 2020 Push Technology Ltd.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *******************************************************************************/
package com.pushtechnology.diffusion.examples;

import static com.pushtechnology.diffusion.client.Diffusion.newTopicSpecification;
import static com.pushtechnology.diffusion.datatype.DataTypes.INT64_DATATYPE_NAME;

import java.time.Instant;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.pushtechnology.diffusion.client.Diffusion;
import com.pushtechnology.diffusion.client.features.TimeSeries;
import com.pushtechnology.diffusion.client.features.TimeSeries.EventMetadata;
import com.pushtechnology.diffusion.client.features.TopicUpdate;
import com.pushtechnology.diffusion.client.features.control.topics.TopicControl;
import com.pushtechnology.diffusion.client.session.Session;
import com.pushtechnology.diffusion.client.topics.details.TopicSpecification;
import com.pushtechnology.diffusion.client.topics.details.TopicType;

/**
 * This example shows a control client creating a {@link TimeSeries} topic.
 * Values can be appended to the topic using {@link #appendValue(long)}, and
 * the last value of the topic can be edited using {@link #editLast(long)}.
 * Alternatively, the methods provided by the {@link TopicUpdate} feature can be
 * used. See {@link TopicUpdateExample} for example usages of this API.
 *
 * @author Push Technology Limited
 * @since 6.0
 * @see ClientConsumingTimeSeriesTopics
 * @see TimeSeriesQueryExample
 */
public class ControlClientUpdatingTimeSeriesTopics {

    private static final String TOPIC_PATH = "foo/timeseries";

    /** Maximum time, in seconds, to wait for each blocking server request. */
    private static final long TIMEOUT_SECONDS = 5;

    private static final Logger LOG =
        LoggerFactory.getLogger(ControlClientUpdatingTimeSeriesTopics.class);

    private final Session session;
    private final TimeSeries timeSeries;
    private final TopicControl topicControl;

    /**
     * Constructor. Opens a session and creates the time series topic with
     * 64-bit integer event values.
     *
     * @param serverUrl server URL to connect to example "ws://diffusion.example.com:80"
     * @throws InterruptedException if interrupted while waiting for the topic to be added
     * @throws ExecutionException if adding the topic failed
     * @throws TimeoutException if the topic was not added within the timeout
     */
    public ControlClientUpdatingTimeSeriesTopics(String serverUrl)
        throws InterruptedException, ExecutionException, TimeoutException {

        session = Diffusion.sessions().principal("control").password("password")
            .open(serverUrl);

        timeSeries = session.feature(TimeSeries.class);
        topicControl = session.feature(TopicControl.class);

        final TopicSpecification spec = newTopicSpecification(TopicType.TIME_SERIES)
            .withProperty(TopicSpecification.TIME_SERIES_EVENT_VALUE_TYPE, INT64_DATATYPE_NAME);

        // If the topic cannot be added the constructor throws, and the caller never
        // receives an instance on which to call close(). Close the session here so
        // that it is not leaked.
        boolean topicAdded = false;
        try {
            topicControl.addTopic(TOPIC_PATH, spec)
                .thenAccept(result -> LOG.info("Add topic result: {}", result))
                .get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
            topicAdded = true;
        }
        finally {
            if (!topicAdded) {
                session.close();
            }
        }
    }

    /**
     * Appends a value to the time series topic.
     *
     * @param value value to append
     * @return the event metadata from the successful append
     * @throws InterruptedException if interrupted while waiting for the append
     * @throws ExecutionException if the append failed
     * @throws TimeoutException if the append did not complete within the timeout
     */
    public EventMetadata appendValue(long value)
        throws IllegalArgumentException, InterruptedException, ExecutionException, TimeoutException {
        return timeSeries.append(TOPIC_PATH, Long.class, value)
            .get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
    }

    /**
     * Close the session and remove the time series topic.
     * <p>
     * Only the topic created by this client is removed. The session is closed
     * even if removing the topic fails or times out.
     *
     * @throws InterruptedException if interrupted while waiting for the removal
     * @throws ExecutionException if the removal failed
     * @throws TimeoutException if the removal did not complete within the timeout
     */
    public void close()
        throws IllegalArgumentException, InterruptedException, ExecutionException, TimeoutException {
        try {
            // A plain topic path is used as the selector so that nothing other
            // than the topic this example created is removed.
            topicControl.removeTopics(TOPIC_PATH).get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
        }
        finally {
            session.close();
        }
    }

    /**
     * Edit the last value in a time series topic.
     * <p>
     * This method does not wait for the query or the edit to complete;
     * their outcomes are reported through the logger.
     *
     * @param value value to edit with
     */
    public void editLast(long value) {
        // Obtain the last value in the time series topic
        timeSeries.rangeQuery().fromLast(1).as(Long.class).selectFrom(TOPIC_PATH)
            .whenComplete((query, ex) -> {
                if (ex != null) {
                    // The throwable is passed as the final argument without a
                    // placeholder so that SLF4J logs it with its stack trace.
                    LOG.error("Error obtaining the range query", ex);
                    return;
                }

                // Perform the value edit
                query.stream().forEach(event -> {
                    timeSeries.edit(TOPIC_PATH, event.sequence(), Long.class, value)
                        .whenComplete((metadata, e) -> {
                            if (e != null) {
                                LOG.error("Error editing topic", e);
                                return;
                            }
                            LOG.info("EventMetadata from edit: {}", metadata);
                        });
                });
            });
    }

    /**
     * Appends a value to the time series topic. Allows for creation of events
     * with a custom timestamp. This can be used for loading historic or future
     * values.
     *
     * @param value value to append
     * @param timestamp the user supplied timestamp
     * @return the event metadata from the successful append
     * @throws InterruptedException if interrupted while waiting for the append
     * @throws ExecutionException if the append failed
     * @throws TimeoutException if the append did not complete within the timeout
     */
    public EventMetadata appendValue(long value, Instant timestamp)
        throws IllegalArgumentException, InterruptedException, ExecutionException, TimeoutException {
        return timeSeries.append(TOPIC_PATH, Long.class, value, timestamp)
            .get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
    }
}