Just a second...

Example: Make exclusive updates to a topic

The following examples use the Diffusion™ API to register as the update source of a topic and to update that topic with content. A client that updates a topic using this method locks the topic and prevents other clients from updating the topic.

JavaScript
// Connect as the 'control' principal and, once the session is open, register it as the exclusive update
// source for the 'exclusive/topic' branch.
diffusion.connect({
    host        : 'diffusion.example.com',
    port        : 443,
    secure      : true,
    principal   : 'control',
    credentials : 'password'
}).then(function(session) {
    // While this session's registration is active, it is the only session allowed to update topics at or
    // below the registered path.

    // Called when the update issued from 'handleActive' succeeds.
    function handleUpdateSuccess() {
        // The update was successful.
    }

    // Called when the update issued from 'handleActive' fails.
    function handleUpdateFailure(err) {
        // There was an error updating the topic
    }

    // Receives a 'deregister' function that removes this registration, which lets other sessions update
    // topics under the registered path again.
    function handleRegister(topic, deregister) {
    }

    // Receives an updater that may be used for any topic at or below the registered path.
    function handleActive(topic, updater) {
        var pendingUpdate = updater.update('exclusive/topic/bar', 123);
        pendingUpdate.then(handleUpdateSuccess, handleUpdateFailure);
    }

    // Called when another update source already holds the same topic path. The server keeps this
    // registration on standby and calls 'onActive' if the earlier registration is closed later on.
    function handleStandBy(topic) {
    }

    // Called once the registration is closed, either because the session closed or because the
    // 'deregister' function was called.
    function handleClose(topic, err) {
    }

    var updateSourceHandler = {
        onRegister : handleRegister,
        onActive   : handleActive,
        onStandBy  : handleStandBy,
        onClose    : handleClose
    };

    session.topics.registerUpdateSource('exclusive/topic', updateSourceHandler);
});
                    
.NET
/**
 * Copyright © 2023 DiffusionData Ltd.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

using System;
using System.Threading.Tasks;
using PushTechnology.ClientInterface.Client.Callbacks;
using PushTechnology.ClientInterface.Client.Factories;
using PushTechnology.ClientInterface.Client.Features;
using PushTechnology.ClientInterface.Client.Features.Topics;
using PushTechnology.ClientInterface.Client.Session;
using PushTechnology.ClientInterface.Client.Topics;
using PushTechnology.ClientInterface.Client.Topics.Details;
using PushTechnology.ClientInterface.Data.JSON;
using static System.Console;

namespace PushTechnology.ClientInterface.Example {
    /// <summary>
    /// Client implementation that adds and updates a JSON topic exclusively.
    /// </summary>
    /// <remarks>
    /// The update is made under a constraint built from a session lock, so the lock must be held by
    /// this session for the update call to be issued with that constraint.
    /// </remarks>
    public sealed class JSONTopicsExclusiveUpdate {

        /// <summary>
        /// Adds the topic 'random/JSON', acquires a session lock (the topic path is passed as the lock name),
        /// sets the topic value under a constraint created from that lock, releases the lock, removes the
        /// topic and closes the session.
        /// </summary>
        /// <param name="serverUrl">URL passed to the session factory when opening the session.</param>
        /// <returns>A task that completes when the example has finished.</returns>
        public async Task JSONTopicsExclusiveUpdateExample(string serverUrl) {
            // NOTE(review): this callback accepts every server certificate. That is convenient for an
            // example but should be replaced by real validation outside of test environments.
            var session = Diffusion.Sessions.Principal("control").Password("password")
                .CertificateValidation((cert, chain, errors)
                    => CertificateValidationResult.ACCEPT)
                .Open(serverUrl);

            // Everything after the session is opened runs inside try/finally so the session is closed on
            // every path, including when acquiring or releasing the lock throws.
            try {
                var topicControl = session.TopicControl;
                var topicUpdate = session.TopicUpdate;

                // Create a JSON topic 'random/JSON'
                string topic = "random/JSON";

                try {
                    await topicControl.AddTopicAsync(topic, TopicType.JSON);
                    WriteLine($"Topic '{topic}' added successfully.");
                }
                catch (Exception ex) {
                    WriteLine( $"Failed to add topic '{topic}' : {ex}." );
                    return;
                }

                try {
                    WriteLine($"Updating topic '{topic}' with a new value:");

                    var newValue = Diffusion.DataTypes.JSON.FromJSONString(
                        "{\"date\":\"" + DateTime.Today.Date.ToString("D") + "\"," +
                        "\"time\":\"" + DateTime.Now.TimeOfDay.ToString("g") + "\"}");

                    var sessionLock = await session.LockAsync(topic);

                    try {
                        var constraint = Diffusion.UpdateConstraints.Locked(sessionLock);
                        await topicUpdate.SetAsync(topic, newValue, constraint);

                        await Task.Delay(TimeSpan.FromMilliseconds(300));
                    }
                    catch (Exception ex) {
                        WriteLine($"Topic {topic} could not be updated : {ex}.");
                    }
                    finally {
                        // Release the lock whether or not the update succeeded.
                        await sessionLock.UnlockAsync();
                    }
                }
                finally {
                    // Remove the JSON topic 'random/JSON', even when the lock could not be acquired
                    // or released, so the example does not leave the topic behind.
                    try {
                        await topicControl.RemoveTopicsAsync(topic);
                    }
                    catch (Exception ex) {
                        WriteLine( $"Failed to remove topic '{topic}' : {ex}." );
                    }
                }
            }
            finally {
                // Close the session
                session.Close();
            }
        }
    }

    /// <summary>
    /// Client implementation that subscribes to a JSON topic and consumes the data it receives.
    /// </summary>
    public sealed class JSONTopics
    {
        /// <summary>
        /// Opens an anonymous session, registers a value stream for 'random/JSON', subscribes to that
        /// topic, waits briefly, then removes the stream and closes the session.
        /// </summary>
        /// <param name="serverUrl">URL passed to the session factory when opening the session.</param>
        /// <returns>A task that completes when the example has finished.</returns>
        public async Task JSONTopicsConsumerExample(string serverUrl)
        {
            // Connect anonymously
            var session = Diffusion.Sessions.Open(serverUrl);

            // Get the Topics feature to subscribe to topics
            var topics = session.Topics;
            string topic = "random/JSON";

            // Add a topic stream for 'random/JSON'
            var jsonStream = new JSONStream();
            topics.AddStream(topic, jsonStream);

            try
            {
                // Subscribe to 'random/JSON' topic
                await topics.SubscribeAsync(topic);

                // Give the stream a moment to report what it receives before tearing down.
                await Task.Delay(TimeSpan.FromMilliseconds(300));
            }
            catch (Exception ex)
            {
                WriteLine($"Failed to subscribe to topic '{topic}' : {ex}.");
            }
            finally
            {
                // Note that closing the session will automatically unsubscribe from all topics
                // the client is subscribed to.
                topics.RemoveStream(jsonStream);
                session.Close();
            }
        }

        /// <summary>
        /// Basic implementation of the IValueStream for JSON topics. Each notification is written
        /// to the console.
        /// </summary>
        private sealed class JSONStream : IValueStream<IJSON>
        {
            /// <summary>
            /// Notification of stream being closed normally.
            /// </summary>
            public void OnClose()
                => WriteLine("The subscription stream is now closed.");

            /// <summary>
            /// Notification of a contextual error related to this callback.
            /// </summary>
            /// <param name="errorReason">Error reason.</param>
            public void OnError(ErrorReason errorReason)
                => WriteLine($"An error has occurred : {errorReason}.");

            /// <summary>
            /// Notification of a successful subscription.
            /// </summary>
            /// <param name="topicPath">Topic path.</param>
            /// <param name="specification">Topic specification.</param>
            public void OnSubscription(string topicPath, ITopicSpecification specification)
                => WriteLine($"Client subscribed to topic '{topicPath}'.");

            /// <summary>
            /// Notification of a successful unsubscription.
            /// </summary>
            /// <param name="topicPath">Topic path.</param>
            /// <param name="specification">Topic specification.</param>
            /// <param name="reason">Reason for the unsubscription.</param>
            public void OnUnsubscription(string topicPath, ITopicSpecification specification,
                                         TopicUnsubscribeReason reason)
                => WriteLine($"Client unsubscribed from topic '{topicPath}' with reason '{reason}'.");

            /// <summary>
            /// Topic update received.
            /// </summary>
            /// <param name="topicPath">Topic path.</param>
            /// <param name="specification">Topic specification.</param>
            /// <param name="oldValue">Value prior to update.</param>
            /// <param name="newValue">Value after update.</param>
            public void OnValue(string topicPath, ITopicSpecification specification, IJSON oldValue,
                                IJSON newValue)
                => WriteLine($"New value of topic '{topicPath}' is {newValue.ToJSONString()}.");
        }
    }
}
C
/**
 * Copyright © 2021 - 2023 DiffusionData Ltd.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <stdio.h>
#include <stdlib.h>
#ifndef WIN32
        #include <unistd.h>
#else
        #define sleep(x) Sleep(1000 * x)
#endif

#include "diffusion.h"


// Duplicate of the acquired session lock, shared between the callbacks and main():
// assigned in on_lock_acquired, freed and reset to NULL in on_unlock.
DIFFUSION_SESSION_LOCK_T *g_session_lock;


/*
 * Callback registered as `.on_topic_update` in main(); called once the topic
 * has been updated. It takes no action and reports success.
 */
static int on_topic_update(void *context)
{
        (void) context; /* unused */

        return HANDLER_SUCCESS;
}


/*
 * Callback registered as `.on_topic_added_with_specification` in main();
 * called once the topic has been added. It takes no action and reports success.
 */
static int on_topic_added(
        SESSION_T *session,
        TOPIC_ADD_RESULT_CODE result_code,
        void *context)
{
        /* None of the arguments are needed by this example. */
        (void) session;
        (void) result_code;
        (void) context;

        return HANDLER_SUCCESS;
}


/*
 * Callback registered as `.on_lock_acquired` in main(); called once the
 * session lock has been acquired. It stores a duplicate of the lock in
 * g_session_lock and reports success.
 */
static int on_lock_acquired(
        const DIFFUSION_SESSION_LOCK_T *session_lock,
        void *context)
{
        (void) context; /* unused */

        /* Keep our own copy rather than the pointer handed to the callback. */
        DIFFUSION_SESSION_LOCK_T *lock_copy = diffusion_session_lock_dup(session_lock);
        g_session_lock = lock_copy;

        return HANDLER_SUCCESS;
}


static int on_unlock(
        bool lock_owned,
        void *context)
{
        // lock has been released
        diffusion_session_lock_free(g_session_lock);
        g_session_lock = NULL;

        return HANDLER_SUCCESS;
}


/*
 * Example entry point: creates a session, adds a JSON topic, acquires the
 * session lock "lock_a", updates the topic under a constraint that requires
 * that lock, releases the lock and closes the session.
 *
 * The asynchronous calls are given time to complete with fixed sleeps; their
 * results arrive through the callbacks defined earlier in this file.
 *
 * Returns EXIT_SUCCESS on completion, or EXIT_FAILURE if the session could not
 * be created or the lock was not acquired within the wait.
 */
int main(int argc, char **argv)
{
        const char *url = "ws://localhost:8080";
        const char *principal = "control";
        const char *password = "password";
        const char *topic_path = "my/topic/path";

        CREDENTIALS_T *credentials = credentials_create_password(password);

        SESSION_T *session;
        DIFFUSION_ERROR_T error = { 0 };

        // Create a session, synchronously
        session = session_create(url, principal, credentials, NULL, NULL, &error);
        if(session == NULL) {
                printf("Failed to create session: %s\n", error.message);
                free(error.message);
                credentials_free(credentials);
                return EXIT_FAILURE;
        }

        // Create a JSON topic
        TOPIC_SPECIFICATION_T *specification = topic_specification_init(TOPIC_TYPE_JSON);

        ADD_TOPIC_CALLBACK_T add_topic_callback = {
                .on_topic_added_with_specification = on_topic_added
        };
        add_topic_from_specification(session, topic_path, specification, add_topic_callback);

        // Sleep for a while
        sleep(5);

        // acquire a session lock
        DIFFUSION_SESSION_LOCK_PARAMS_T lock_params = {
                .on_lock_acquired = on_lock_acquired
        };
        diffusion_session_lock(session, "lock_a",  lock_params);

        // Sleep for a while
        sleep(5);

        // g_session_lock is only set by on_lock_acquired. If that callback has
        // not run, there is no lock to build a constraint from, so stop here
        // instead of passing NULL on.
        if(g_session_lock == NULL) {
                printf("Failed to acquire session lock\n");
                session_close(session, NULL);
                session_free(session);
                credentials_free(credentials);
                topic_specification_free(specification);
                return EXIT_FAILURE;
        }

        // create update constraint that requires session lock to update topic
        DIFFUSION_TOPIC_UPDATE_CONSTRAINT_T *update_constraint =
                diffusion_topic_update_constraint_locked(g_session_lock);

        // Update the topic with a JSON value
        BUF_T *value_buf = buf_create();
        write_diffusion_json_value("{\"hello\": \"world\"}", value_buf);

        DIFFUSION_TOPIC_UPDATE_SET_PARAMS_T update_params = {
                .topic_path = topic_path,
                .datatype = DATATYPE_JSON,
                .update = value_buf,
                .on_topic_update = on_topic_update
        };

        diffusion_topic_update_set_with_constraint(session, update_constraint, update_params);

        // Sleep for a while
        sleep(5);

        // release the session lock
        DIFFUSION_SESSION_LOCK_UNLOCK_PARAMS_T unlock_params = {
                .on_unlock = on_unlock
        };
        diffusion_session_lock_unlock(session, g_session_lock, unlock_params);

        // Sleep for a while
        sleep(5);

        // Close the session, and release resources and memory
        session_close(session, NULL);
        session_free(session);

        // on_unlock normally frees the duplicated lock. If that callback never
        // ran, free the duplicate here so it does not leak.
        if(g_session_lock != NULL) {
                diffusion_session_lock_free(g_session_lock);
                g_session_lock = NULL;
        }

        diffusion_topic_update_constraint_free(update_constraint);
        buf_free(value_buf);
        credentials_free(credentials);
        topic_specification_free(specification);

        return EXIT_SUCCESS;
}
Apple
//  Copyright (C) 2021 Push Technology Ltd.
//
//  Licensed under the Apache License, Version 2.0 (the "License");
//  you may not use this file except in compliance with the License.
//  You may obtain a copy of the License at
//  http://www.apache.org/licenses/LICENSE-2.0
//
//  Unless required by applicable law or agreed to in writing, software
//  distributed under the License is distributed on an "AS IS" BASIS,
//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//  See the License for the specific language governing permissions and
//  limitations under the License.

import Foundation
import Diffusion

/**
 * An example of using a control client to create and update a topic in exclusive mode.
 *
 * The topic is updated under a constraint that requires the session to hold a named session lock.
 */
class TopicUpdateExclusive {

    /**
     * Opens a session, creates a String topic, acquires a session lock, sets the topic value under a
     * constraint requiring that lock, then releases the lock.
     *
     * Progress and errors are reported with `print`. Messages use string interpolation because `print`
     * does not expand `%@` placeholders.
     *
     * - Parameters:
     *   - url: URL used to open the session.
     *   - topic_path: Path of the String topic to create and update.
     *   - value: String value to set on the topic.
     *   - lock_name: Name of the session lock to acquire.
     */
    func run_example(url: URL, topic_path: String, value: String, lock_name: String) {

        // setup credentials and configuration
        let credentials = PTDiffusionCredentials(password: "password")
        let configuration = PTDiffusionSessionConfiguration(principal: "control", credentials: credentials)

        // establish the session
        PTDiffusionSession.open(with: url,
                                configuration: configuration) { (session, error) in
            guard let session = session, error == nil else {
                let reason = error?.localizedDescription ?? "unknown error"
                print("An error has occurred while establishing a session: \(reason)")
                return
            }

            // create a new String Topic at `topic_path`
            session.topicControl.addTopic(withPath: topic_path,
                                          type: PTDiffusionTopicType.string) { (result, error) in
                if let error = error {
                    print("An error has occurred while creating the topic '\(topic_path)': \(error.localizedDescription)")
                    return
                }

                print("Topic \(topic_path) has been successfully created")

                // acquire the lock `lock_name`
                session.lock(withName: lock_name) { (lock, error) in
                    guard let lock = lock, error == nil else {
                        let reason = error?.localizedDescription ?? "unknown error"
                        print("An error has occurred while attempting to retrieve lock '\(lock_name)': \(reason)")
                        return
                    }

                    print("Successfully acquired lock '\(lock_name)'")

                    // add an update constraint that requires the calling session to hold the
                    // lock `lock_name` to update topic `topic_path`
                    let update_constraint = PTDiffusionUpdateConstraint.locked(with: lock)

                    // update `topic_path` with `value`; a synchronous failure is reported
                    // instead of crashing the process
                    do {
                        try session.topicUpdate.setWithPath(topic_path,
                                                            toStringValue: value,
                                                            constraint: update_constraint) { (error) in
                            if let error = error {
                                print("An error has occurred while setting the topic '\(topic_path)' to '\(value)': \(error.localizedDescription)")
                            }
                            else {
                                print("Topic '\(topic_path)' has been successfully updated to \(value)")
                            }
                        }
                    }
                    catch {
                        print("An error has occurred while setting the topic '\(topic_path)' to '\(value)': \(error.localizedDescription)")
                    }

                    // if there are no more updates to be done by this session, release the lock
                    lock.unlock { (was_owner, error) in
                        if let error = error {
                            print("An error has occurred while releasing the lock: \(error.localizedDescription)")
                        }
                        else {
                            print("Lock has been successfully released")
                        }
                    }
                }
            }
        }
    }
}

Change the URL provided in the example to the URL of your Diffusion server.