Just a second...

Session locks

Session locks are a way to ensure that only one session at a time can access a particular resource. For example, you can use a session lock to ensure that only one session is allowed to update a certain topic.

Session locks are a mechanism managed by the Diffusion™ server to coordinate access to shared resources among multiple sessions.

A session can acquire a lock, identified by a lock name (chosen by you to suit your application). Once a session acquires a lock, no other session can acquire the same lock.

Acquiring a lock does not automatically change anything else about a session. Locks are not linked to topics or permissions, except through your application's logic. It is up to you to design a suitable locking scheme and ensure your application implements it. For example, if you want to implement exclusive updating of a topic using a session lock, you must make sure that each session always acquires the lock and uses a lock constraint created from the lock when updating the topic.

By default, a lock is released when the session owning it closes. Alternatively, when acquiring a lock, a session can specify that the lock will be released if connection to the server is lost. This is done using a scope parameter.

A session can also explicitly release a lock.

JavaScript
const lock = await session.lock('lock_name', diffusion.SessionLockScope.UNLOCK_ON_CONNECTION_LOSS);
console.log('Acquired session lock');

setTimeout(async () => {
    await lock.unlock();
    console.log('Lock has been released');
}, 1000);
.NET
/**
 * Copyright © 2021 - 2023 DiffusionData Ltd.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using PushTechnology.ClientInterface.Client.Callbacks;
using PushTechnology.ClientInterface.Client.Factories;
using PushTechnology.ClientInterface.Client.Features;
using PushTechnology.ClientInterface.Client.Features.Topics;
using PushTechnology.ClientInterface.Client.Session;
using PushTechnology.ClientInterface.Client.Topics;
using PushTechnology.ClientInterface.Client.Topics.Details;
using static System.Console;

namespace PushTechnology.ClientInterface.Example {
    /// <summary>
    /// Client implementation that demonstrates session locks.
    /// </summary>
    public sealed class SessionLocks
    {
        private ISession session1, session2;
        private ISessionLock sessionLock1, sessionLock2;

        private static string LOCK_NAME = "lockA";

        public SessionLocks(string serverUrl)
        {
            session1 = Diffusion.Sessions.Principal("control").Password("password")
                .CertificateValidation((cert, chain, errors) => CertificateValidationResult.ACCEPT)
                .Open(serverUrl);

            session2 = Diffusion.Sessions.Principal("control").Password("password")
                .CertificateValidation((cert, chain, errors) => CertificateValidationResult.ACCEPT)
                .Open(serverUrl);

            WriteLine("Sessions 1 and 2 have been created.");

            AcquireLockSession1();
        }

        private async void AcquireLockSession1()
        {
            try
            {
                WriteLine("Requesting lock 1...");

                sessionLock1 = await session1.LockAsync(LOCK_NAME, SessionLockScope.UNLOCK_ON_CONNECTION_LOSS);

                WriteLine("Lock 1 has been acquired.");

                AcquireLockSession2();

                Thread.Sleep(1000);

                ReleaseLock1();
            }
            catch (Exception ex)
            {
                WriteLine($"Failed to get lock 1 : {ex}.");
                session1.Close();
                session2.Close();
            }
        }

        private async void AcquireLockSession2()
        {
            try
            {
                WriteLine("Requesting lock 2...");

                sessionLock2 = await session2.LockAsync(LOCK_NAME, SessionLockScope.UNLOCK_ON_CONNECTION_LOSS);

                WriteLine("Lock 2 has been acquired.");

                Thread.Sleep(1000);

                ReleaseLock2();
            }
            catch (Exception ex)
            {
                WriteLine($"Failed to get lock 2 : {ex}.");
                session1.Close();
                session2.Close();
            }
        }

        private async void ReleaseLock1()
        {
            try
            {
                WriteLine("Requesting lock 1 release...");

                await sessionLock1.UnlockAsync();

                WriteLine("Lock 1 has been released.");
            }
            catch (Exception ex)
            {
                WriteLine($"Failed to release lock 1 : {ex}.");
                session1.Close();
                session2.Close();
            }
        }

        private async void ReleaseLock2()
        {
            try
            {
                WriteLine("Requesting lock 2 release...");

                await sessionLock2.UnlockAsync();

                WriteLine("Lock 2 has been released.");
            }
            catch (Exception ex)
            {
                WriteLine($"Failed to release lock 2 : {ex}.");
            }
            finally
            {
                session1.Close();
                session2.Close();
            }
        }
    }
}
Java and Android
/*******************************************************************************
 * Copyright (C) 2018, 2023 DiffusionData Ltd.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *******************************************************************************/
package com.pushtechnology.diffusion.manual;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.pushtechnology.diffusion.client.Diffusion;
import com.pushtechnology.diffusion.client.features.Topics;
import com.pushtechnology.diffusion.client.features.Topics.UnsubscribeReason;
import com.pushtechnology.diffusion.client.features.Topics.ValueStream;
import com.pushtechnology.diffusion.client.session.Session;
import com.pushtechnology.diffusion.client.session.Session.SessionLock;
import com.pushtechnology.diffusion.client.session.Session.SessionLockScope;
import com.pushtechnology.diffusion.client.topics.details.TopicSpecification;
import com.pushtechnology.diffusion.client.types.PathPermission;

/**
 * An example of a client that uses session locks to coordinate actions with
 * other sessions.
 *
 * <p>
 * In this example, a single session receives and processes updates from the
 * topic {@code topicA}. Each client instance running this code creates a
 * session and competes for the session lock {@code lockA}. The session that is
 * assigned the session lock will subscribe to the topic and log updates.
 *
 * <p>
 * {@link SessionLockScope#UNLOCK_ON_CONNECTION_LOSS UNLOCK_ON_CONNECTION_LOSS}
 * session locks are used. If the session that owns the session lock loses its
 * connection to the server, the server will reassign the lock to another
 * session. This example uses a session listener to independently detect the
 * connection loss, unsubscribe, unregister the stream listening for updates,
 * and compete for the lock again.
 *
 * <p>
 * The locking protocol has races documented under {@link SessionLock}. In the
 * context of this example, the consequences are:
 * <ul>
 * <li>There may be a transient period where two sessions are subscribed to the
 * topic, and both process the same update.
 * <li>A session acquiring a lock may miss one or more updates that were not
 * processed by the session that previously held the lock.
 * </ul>
 *
 * <h2>Security note</h2>
 *
 * <p>
 * To run this example, the "client" principal must be granted
 * {@link PathPermission#ACQUIRE_LOCK ACQUIRE_LOCK} permission to
 * {@code lockA}.
 *
 * @author DiffusionData Limited
 * @since 6.1
 */
public class ClientUsingSessionLocks {

    private static final Logger LOG =
        LoggerFactory.getLogger(ClientUsingSessionLocks.class);

    private static final String LOCK_NAME = "lockA";
    private static final String TOPIC_PATH = "topicA";

    private final Session session;

    private final ValueStream<String> stream = new LogUpdates();

    /**
     * Construct a request handling application.
     *
     * @param serverURL url of the server to connect to
     */
    public ClientUsingSessionLocks(String serverURL) {

        // The "client" principal must have ACQUIRE_LOCK permission, see note in
        // class Javadoc.
        session = Diffusion.sessions().principal("client").password("password")
            .open(serverURL);
    }

    /**
     * Start competing for the lock.
     */
    public void start() {
        session.addListener((s, oldState, newState) -> {
            if (newState.isClosed()) {
                onLockLost();
            }
        });

        requestLock();
    }

    private void requestLock() {
        session.lock(LOCK_NAME, SessionLockScope.UNLOCK_ON_CONNECTION_LOSS)
            .thenAccept(lock -> onLockAcquired());
    }

    private void onLockAcquired() {
        final Topics topics = session.feature(Topics.class);
        topics.subscribe(TOPIC_PATH);
        topics.addStream(TOPIC_PATH,  String.class, stream);
    }

    private void onLockLost() {
        final Topics topics = session.feature(Topics.class);

        // Remove the stream from the local registry. This will prevent
        // processing of updates that may already be queued for this session and
        // will be delivered on reconnection.
        topics.removeStream(stream);

        // Unsubscribe from the topic. This will not take effect until this
        // session has reconnected to the server.
        topics.unsubscribe(TOPIC_PATH);

        // Compete for the lock again. This will not take effect until this
        // session has reconnected to the server, and will be processed after
        // the unsubscription.
        requestLock();
    }

    /**
     * Close the session. If the session owned the lock, the server is free to
     * reassign it to another session.
     */
    public void close() {
        session.close();
    }

    /**
     * Log updates received for a topic.
     */
    private static class LogUpdates extends Topics.ValueStream.Default<String> {

        @Override
        public void onSubscription(
            String topicPath,
            TopicSpecification specification) {

            LOG.info("onSubscription({})", topicPath);
        }

        @Override
        public void onUnsubscription(String topicPath,
            TopicSpecification specification, UnsubscribeReason reason) {

            LOG.info("onUnsubscription({})", topicPath);
        }

        @Override
        public void onValue(
            String topicPath,
            TopicSpecification specification,
            String oldValue,
            String newValue) {

            LOG.info("onValue({}, {})", topicPath, newValue);
        }
    }
}
C
/**
 * Copyright © 2021 - 2023 DiffusionData Ltd.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <stdio.h>
#include <stdlib.h>
#ifndef WIN32
        #include <unistd.h>
#else
        #define sleep(x) Sleep(1000 * x)
#endif

#include "diffusion.h"


DIFFUSION_SESSION_LOCK_T *g_session_lock;


static int on_lock_acquired(
        const DIFFUSION_SESSION_LOCK_T *session_lock,
        void *context)
{
        // lock has been acquired
        g_session_lock = diffusion_session_lock_dup(session_lock);

        return HANDLER_SUCCESS;
}


static int on_unlock(
        bool lock_owned,
        void *context)
{
        // lock has been released
        diffusion_session_lock_free(g_session_lock);
        g_session_lock = NULL;

        return HANDLER_SUCCESS;
}


int main(int argc, char **argv)
{
        const char *url = "ws://localhost:8080";
        const char *principal = "control";
        const char *password = "password";

        CREDENTIALS_T *credentials = credentials_create_password(password);

        SESSION_T *session;
        DIFFUSION_ERROR_T error = { 0 };

        // create a session, synchronously
        session = session_create(url, principal, credentials, NULL, NULL, &error);
        if(session == NULL) {
                printf("Failed to create session: %s\n", error.message);
                free(error.message);
                credentials_free(credentials);
                return EXIT_FAILURE;
        }

        // acquire a session lock
        DIFFUSION_SESSION_LOCK_PARAMS_T lock_params = {
                .on_lock_acquired = on_lock_acquired
        };
        diffusion_session_lock(session, "lock_a",  lock_params);

        // sleep for a while
        sleep(5);

        // release the session lock
        DIFFUSION_SESSION_LOCK_UNLOCK_PARAMS_T unlock_params = {
                .on_unlock = on_unlock
        };
        diffusion_session_lock_unlock(session, g_session_lock, unlock_params);

        // Sleep for a while
        sleep(5);

        // Close the session, and release resources and memory
        session_close(session, NULL);
        session_free(session);

        credentials_free(credentials);

        return EXIT_SUCCESS;
}
Apple
//  Copyright (C) 2021 Push Technology Ltd.
//
//  Licensed under the Apache License, Version 2.0 (the "License");
//  you may not use this file except in compliance with the License.
//  You may obtain a copy of the License at
//  http://www.apache.org/licenses/LICENSE-2.0
//
//  Unless required by applicable law or agreed to in writing, software
//  distributed under the License is distributed on an "AS IS" BASIS,
//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//  See the License for the specific language governing permissions and
//  limitations under the License.

import Foundation
import Diffusion

class SessionLocks {

    func run_example(url: URL, lock_name: String) {

        let credentials = PTDiffusionCredentials(password: "password")
        let configuration = PTDiffusionSessionConfiguration(principal: "control", credentials: credentials)

        // establish a session
        PTDiffusionSession.open(with: url,
                                configuration: configuration) { (session, error) in
            if (error != nil) {
                print("An error has occurred while establishing a session: %@",
                      error!.localizedDescription)
            }
            else {
                // request a lock with name `lock_name`
                session?.lock(withName:lock_name) { (lock, error) in
                    if (error != nil) {
                        print("An error has occurred while attempting to retrieve lock '%@: %@",
                              lock_name,
                              error!.localizedDescription)
                    }
                    else {
                        print("Successfully acquired lock '%@'", lock_name)

                        // wait 5 seconds, then released the lock
                        let dispatch_time = DispatchTime.now() + DispatchTimeInterval.seconds(5)

                        DispatchQueue.main.asyncAfter(deadline: dispatch_time) {
                            print("Releasing lock now")

                            lock!.unlock { (was_owner, error) in
                                if (error != nil) {
                                    print("An error has occurred while releasing the lock: %@",
                                          error!.localizedDescription)
                                }
                                else {
                                    print("Lock has been successfully released")
                                }
                            }
                        }
                    }
                }
            }
        }
    }
}

Acquiring a lock

Required permissions: acquire_lock

Session locks are established on demand. There is no separate operation to create or destroy a named lock.

If a session attempts to acquire a lock that is not assigned, the server assigns it immediately to the session.

If a session attempts to acquire a lock that is already assigned, the server will record that the session is waiting to acquire it. When a lock is released and multiple sessions are waiting to acquire it, the server will arbitrarily assign it to one of the waiting sessions.

A session can request a lock with these parameters:
Lock name
A name for the lock.
Lock scope (optional)

The scope of the lock.

By default, the scope is UNLOCK_ON_SESSION_LOSS, meaning that the lock will be released when the session is closed.

If the scope is set to UNLOCK_ON_CONNECTION_LOSS, the lock will be released when the session loses its current connection to the server.