Just a second...

Specifying a reconnection strategy

Reconnection behavior can be configured using custom reconnection strategies.

The reconnection behavior of a client session can be configured using reconnection strategies. A reconnection strategy is applied when the session enters the RECOVERING_RECONNECT state, enabling the session to attempt to reconnect and recover its previous state.

Reconnection can only succeed if the client session is still available on the Diffusion™ server. The maximum time that the Diffusion server keeps client sessions in the DISCONNECTED state before closing them can be configured using the Connectors.xml configuration file. For more information, see the Connectors.xml reference documentation.

Individual client sessions can request a shorter reconnection timeout for their sessions or request to disable reconnection when they first connect to the Diffusion server.

Examples

JavaScript
// When establishing a session, it is possible to specify whether reconnection
// should be attempted in the event of an unexpected disconnection. This allows
// the session to recover its previous state.

// Set the maximum amount of time we'll try and reconnect for to 10 minutes
const maximumTimeoutDuration = 1000 * 60 * 10;

// Set the maximum interval between reconnect attempts to 60 seconds
const maximumAttemptInterval = 1000 * 60;

// Set an upper limit to the number of times we'll try to reconnect for
const maximumAttempts = 25;

// Count the number of reconnection attempts we've made
let attempts = 0;

// Create a reconnection strategy that applies an exponential back-off.
// The strategy will be called with two arguments, start & abort. Both
// of these are functions, which allow the strategy to either start a
// reconnection attempt, or to abort reconnection (which will close the session).
//
// The wait before each attempt starts at 100 ms and doubles with every
// attempt, capped at maximumAttemptInterval.
const reconnectionStrategy = (start, abort) => {
    // `attempts` is the number of attempts already started, so once it has
    // reached the limit no further attempt may be made. (Comparing with `>`
    // here would allow maximumAttempts + 1 attempts.)
    if (attempts >= maximumAttempts) {
        abort();
        return;
    }

    const wait = Math.min(Math.pow(2, attempts++) * 100, maximumAttemptInterval);

    // Wait the specified time period, and then start the reconnection attempt
    setTimeout(start, wait);
};

// Describe where the server is, who we are, and how the session should
// behave when the connection is lost unexpectedly.
const connectionOptions = {
    host : 'diffusion.example.com',
    port : 443,
    secure : true,
    principal : 'control',
    credentials : 'password',
    reconnect : {
        timeout : maximumTimeoutDuration,
        strategy : reconnectionStrategy
    }
};

// Connect to the server.
const session = await diffusion.connect(connectionOptions);

// Dispatched when the connection is lost. Nothing needs to be done here:
// the reconnection strategy supplied above is called automatically when
// this event is dispatched.
const onDisconnect = () => {};

// Dispatched when the session manages to reconnect within the reconnect
// timeout, meaning normal operations may resume. The attempt counter is
// reset so that the next disconnection starts counting from zero again.
const onReconnect = () => {
    attempts = 0;
};

// Dispatched when the session is closed normally, or when it was unable
// to reconnect. The session is no longer operational after this event.
const onClose = () => {};

session.on('disconnect', onDisconnect);
session.on('reconnect', onReconnect);
session.on('close', onClose);
.NET
/**
 * Copyright © 2021, 2022 Push Technology Ltd.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
using System;
using System.Threading;
using System.Threading.Tasks;
using PushTechnology.ClientInterface.Client.Factories;
using PushTechnology.ClientInterface.Client.Session;
using PushTechnology.ClientInterface.Client.Session.Reconnection;
using static System.Console;

namespace PushTechnology.ClientInterface.Example {
    /// <summary>
    /// Demonstrates opening a session that uses a custom reconnection strategy
    /// with an exponential back-off between reconnection attempts.
    /// </summary>
    public sealed class SessionReconnection
    {
        // Reconnection timeout handed to the session factory: 1000 * 60 * 10,
        // i.e. 10 minutes expressed in milliseconds.
        private readonly int maximumTimeoutDuration = 1000 * 60 * 10;

        private readonly ReconnectionStrategy reconnectionStrategy = new ReconnectionStrategy();

        /// <summary>
        /// Connects to the server, keeps the session open for 60 seconds and then closes it.
        /// </summary>
        /// <param name="serverUrl">The URL of the server to connect to.</param>
        public SessionReconnection(string serverUrl)
        {
            var factory = Diffusion.Sessions;

            var session = Connect(serverUrl, factory);

            if (session == null)
            {
                // Connect has already reported the failure. Without a session
                // there is nothing to wait for and nothing to close.
                return;
            }

            WriteLine("The session has been created.");

            Thread.Sleep(60000);

            session.Close();
        }

        /// <summary>
        /// Opens a session configured with the reconnection timeout, the reconnection
        /// strategy and the session state listener of this example.
        /// </summary>
        /// <param name="url">The URL of the server to connect to.</param>
        /// <param name="initialFactory">The session factory to configure.</param>
        /// <returns>The opened session, or <c>null</c> if opening the session threw an exception.</returns>
        public ISession Connect(string url, ISessionFactory initialFactory)
        {
            try
            {
                string principal = "control";
                string password = "password";

                var factory = initialFactory
                    .Principal(principal)
                    .Credentials(Diffusion.Credentials.Password(password))
                    .CertificateValidation((cert, chain, errors)
                        => CertificateValidationResult.ACCEPT)
                    .ReconnectionTimeout(maximumTimeoutDuration)
                    .ReconnectionStrategy(reconnectionStrategy)
                    .SessionStateChangedHandler(OnSessionStateChanged);

                return factory.Open(url);
            }
            catch (Exception ex)
            {
                WriteLine($"Session connection error : {ex}.");
            }

            return null;
        }

        /// <summary>
        /// Reports session state transitions and resets the retry counter of the
        /// reconnection strategy whenever the session becomes connected.
        /// </summary>
        private void OnSessionStateChanged(object sender, SessionListenerEventArgs e)
        {
            if (e.NewState == SessionState.RECOVERING_RECONNECT)
            {
                // The session has been disconnected, and has entered
                // recovery state. It is during this state that
                // the reconnect strategy will be called
                WriteLine("The session has been disconnected.");
            }

            if (e.NewState == SessionState.CONNECTED_ACTIVE)
            {
                // The session has connected for the first time, or it has
                // been reconnected. Start the next back-off from the beginning.
                reconnectionStrategy.Retries = 0;

                WriteLine("The session has connected.");
            }

            if (e.OldState == SessionState.RECOVERING_RECONNECT)
            {
                // The session has left recovery state. It may either be
                // attempting to reconnect, or the attempt has been aborted;
                // this will be reflected in the newState.
            }

            if (e.NewState == SessionState.CLOSED_BY_CLIENT)
            {
                WriteLine("The session has been closed.");
            }
        }

        /// <summary>
        /// A reconnection strategy that gets applied after the connection failure notification.
        /// It waits 100 ms before the first attempt and doubles the wait for every
        /// further attempt, up to a maximum of 60 seconds.
        /// </summary>
        private class ReconnectionStrategy : IReconnectionStrategy
        {
            /// <summary>
            /// The number of reconnection attempts made since the counter was last reset.
            /// </summary>
            public int Retries { get; set; }

            // Set the maximum interval between reconnect attempts to 60 seconds.
            private readonly long maximumAttemptInterval = 1000 * 60;

            public ReconnectionStrategy() => Retries = 0;

            /// <summary>
            /// Waits for the current back-off interval and then starts the reconnection attempt.
            /// </summary>
            /// <param name="reconnection">The reconnection attempt to start.</param>
            /// <returns>A task that completes once the attempt has been started.</returns>
            public async Task PerformReconnection(IReconnectionAttempt reconnection)
            {
                // Clamp while the value is still a double: converting an unclamped
                // power of two to long and multiplying it overflows for large retry
                // counts and would produce a zero or negative wait.
                double wait = Math.Min(Math.Pow(2, Retries++) * 100.0, maximumAttemptInterval);

                // Wait without blocking the calling thread.
                await Task.Delay((int)wait);

                WriteLine("Attempting to reconnect...");

                reconnection.Start();
            }
        }
    }
}
Java and Android
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import com.pushtechnology.diffusion.client.Diffusion;
import com.pushtechnology.diffusion.client.session.Session;
import com.pushtechnology.diffusion.client.session.Session.Listener;
import com.pushtechnology.diffusion.client.session.Session.State;
import com.pushtechnology.diffusion.client.session.reconnect.ReconnectionStrategy;


/**
 * This example class demonstrates the ability to set a custom {@link ReconnectionStrategy}
 * when creating sessions.
 * <p>
 * The strategy waits 100 ms before the first reconnection attempt and doubles the wait
 * for every further attempt, up to a maximum of 60 seconds. The retry counter is reset
 * whenever the session becomes connected.
 *
 * @author Push Technology Limited
 * @since 5.5
 */
public class ClientWithReconnectionStrategy {

    /** Number of reconnection attempts made since the session was last connected. */
    private volatile int retries = 0;

    /**
     * Constructor. Opens a session that uses the exponential back-off strategy and
     * registers a listener that resets the retry counter on (re)connection.
     */
    public ClientWithReconnectionStrategy() {

        // Set the maximum amount of time we'll try and reconnect for to 10 minutes.
        final int maximumTimeoutDuration = 1000 * 60 * 10;

        // Set the maximum interval between reconnect attempts to 60 seconds.
        final long maximumAttemptInterval = 1000 * 60;

        // Create a new reconnection strategy that applies an exponential backoff
        final ReconnectionStrategy reconnectionStrategy = new ReconnectionStrategy() {
            private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);

            @Override
            public void performReconnection(final ReconnectionAttempt reconnection) {
                // Clamp while the value is still a double. Converting the unclamped
                // power of two to long and then multiplying overflows for large retry
                // counts, giving a negative delay that the scheduler runs immediately.
                final long exponentialWaitTime = (long) Math.min(
                    Math.pow(2, retries++) * 100.0, (double) maximumAttemptInterval);

                scheduler.schedule(new Runnable() {
                    @Override
                    public void run() {
                        reconnection.start();
                    }
                }, exponentialWaitTime, TimeUnit.MILLISECONDS);
            }
        };

        final Session session = Diffusion.sessions().reconnectionTimeout(maximumTimeoutDuration)
                                                    .reconnectionStrategy(reconnectionStrategy)
                                                    .open("ws://diffusion.example.com:80");
        session.addListener(new Listener() {
            @Override
            public void onSessionStateChanged(Session session, State oldState, State newState) {

                if (newState == State.RECOVERING_RECONNECT) {
                    // The session has been disconnected, and has entered recovery state. It is during this state that
                    // the reconnect strategy will be called
                }

                if (newState == State.CONNECTED_ACTIVE) {
                    // The session has connected for the first time, or it has been reconnected.
                    // Start the next back-off from the beginning.
                    retries = 0;
                }

                if (oldState == State.RECOVERING_RECONNECT) {
                    // The session has left recovery state. It may either be attempting to reconnect, or the attempt has
                    // been aborted; this will be reflected in the newState.
                }
            }
        });
    }
}
                    
C
/**
 * Copyright © 2021 - 2022 Push Technology Ltd.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

#include <stdio.h>
#include <stdlib.h>

#ifndef WIN32
        #include <unistd.h>
#else
        #define sleep(x) Sleep(1000 * x)
#endif

#include <time.h>

#include "diffusion.h"

/*
 * Session state listener: invoked whenever the session moves from one state
 * to another (for example from "connecting" to "connected", or from
 * "connected" to "closed"). Prints the name and numeric value of both the
 * old and the new state.
 */
static void on_session_state_changed(SESSION_T *session,
        const SESSION_STATE_T old_state,
        const SESSION_STATE_T new_state)
{
        const char *previous_name = session_state_as_string(old_state);
        const char *current_name = session_state_as_string(new_state);

        printf("Session state changed from %s (%d) to %s (%d)\n",
               previous_name, old_state,
               current_name, new_state);
}


/*
 * State shared between the reconnection strategy callbacks in this file
 * (backoff_reconnection_strategy, backoff_success and backoff_failure).
 *
 * NOTE(review): the unit of these fields is not stated anywhere; main()
 * initialises max_wait to 5000 while the wait is consumed by a seconds-based
 * sleep - confirm the intended unit.
 */
typedef struct {
        // Delay applied by backoff_reconnection_strategy() before it starts
        // the next reconnection attempt. Reset to 0 by backoff_success(), set
        // to 0.01 and then doubled by backoff_failure().
        double current_wait;
        // Upper bound that backoff_reconnection_strategy() clamps
        // current_wait to.
        double max_wait;
} BACKOFF_STRATEGY_ARGS_T;


/*
 * Sleeps for the given number of seconds, honouring the fractional part.
 * Non-positive values return immediately.
 */
static void backoff_sleep_seconds(double seconds)
{
        if(seconds <= 0) {
                return;
        }

#ifndef WIN32
        // sleep() only accepts whole seconds, so a sub-second wait would be
        // truncated to no wait at all; nanosleep() keeps the fraction.
        struct timespec delay;
        delay.tv_sec = (time_t) seconds;
        delay.tv_nsec = (long) ((seconds - (double) delay.tv_sec) * 1e9);
        nanosleep(&delay, NULL);
#else
        Sleep((DWORD) (seconds * 1000.0));
#endif
}


/*
 * Reconnection strategy callback: waits for the current backoff period and
 * then asks for a reconnection attempt to be started.
 *
 * args points to the BACKOFF_STRATEGY_ARGS_T registered with the strategy.
 * current_wait is interpreted in seconds, matching the sleep() convention
 * used elsewhere in this file.
 */
static RECONNECTION_ATTEMPT_ACTION_T backoff_reconnection_strategy(
        SESSION_T *session,
        void *args)
{
        BACKOFF_STRATEGY_ARGS_T *backoff_args = args;

        // Apply the cap before waiting, so that no single wait exceeds
        // max_wait.
        if(backoff_args->current_wait > backoff_args->max_wait) {
                backoff_args->current_wait = backoff_args->max_wait;
        }

        printf("Waiting for %f s\n", backoff_args->current_wait);

        backoff_sleep_seconds(backoff_args->current_wait);

        return RECONNECTION_ATTEMPT_ACTION_START;
}


/*
 * Called when a reconnection attempt succeeds: reports the success and
 * clears the accumulated wait held in the BACKOFF_STRATEGY_ARGS_T that
 * args points to.
 */
static void backoff_success(SESSION_T *session, void *args)
{
        BACKOFF_STRATEGY_ARGS_T *strategy_state = args;

        printf("Reconnection successful\n");

        // Start from no wait again for the next disconnection.
        strategy_state->current_wait = 0;
}


/*
 * Called when a reconnection attempt fails: reports the session state and
 * lengthens the wait held in the BACKOFF_STRATEGY_ARGS_T that args points to.
 */
static void backoff_failure(SESSION_T *session, void *args)
{
        BACKOFF_STRATEGY_ARGS_T *strategy_state = args;

        printf("Reconnection failed (%s)\n", session_state_as_string(session->state));

        // Exponential backoff: the first failure sets the wait to 0.01, every
        // later failure doubles it.
        strategy_state->current_wait = (strategy_state->current_wait == 0)
                ? 0.01
                : strategy_state->current_wait * 2;
}


/*
 * Creates a session that reconnects using the exponential backoff strategy
 * defined above, keeps it open for 20 seconds and then closes it.
 *
 * Returns EXIT_SUCCESS once the session has been closed, or EXIT_FAILURE if
 * the session could not be created.
 */
int main(int argc, char **argv)
{
        const char *url = "ws://localhost:8080";
        const char *principal = "control";
        const char *password = "password";

        CREDENTIALS_T *credentials = credentials_create_password(password);

        SESSION_T *session;
        DIFFUSION_ERROR_T error = { 0 };
        SESSION_LISTENER_T session_listener = {
                .on_state_changed = on_session_state_changed
        };

        // Set the arguments to our exponential backoff strategy.
        // NOTE(review): backoff_reconnection_strategy() consumes current_wait
        // with a seconds-based sleep, which makes this cap 5000 seconds; that
        // can never take effect within the 30 second timeout set below.
        // Confirm the intended unit.
        BACKOFF_STRATEGY_ARGS_T backoff_args = {
                .current_wait = 0,
                .max_wait = 5000
        };

        // Create the backoff strategy
        RECONNECTION_STRATEGY_T *reconnection_strategy =
                make_reconnection_strategy_user_function(
                        backoff_reconnection_strategy,
                        &backoff_args,
                        backoff_success,
                        backoff_failure);

        // Only retry for 30 seconds
        reconnection_strategy_set_timeout(reconnection_strategy, 30 * 1000);

        // Create a session, synchronously
        session = session_create(
                url,
                principal,
                credentials,
                &session_listener,
                reconnection_strategy,
                &error);
        if(session == NULL) {
                printf("Failed to create session: %s\n", error.message);
                free(error.message);

                // There is no session to wait on, close or free, so release
                // what was allocated here and report the failure.
                free_reconnection_strategy(reconnection_strategy);
                credentials_free(credentials);

                return EXIT_FAILURE;
        }

        char *sid_str = session_id_to_string(session->id);
        printf("Session created (state=%d, id=%s)\n", session_state_get(session), sid_str);
        free(sid_str);

        // With the exception of backoff_args, the reconnection strategy is
        // copied within session_create() and may be freed now
        free_reconnection_strategy(reconnection_strategy);

        // Sleep for a while
        sleep(20);

        // Close the session, and release resources and memory
        session_close(session, NULL);
        session_free(session);

        credentials_free(credentials);

        return EXIT_SUCCESS;
}
Apple
/// A reconnection strategy that waits for an exponentially growing delay
/// before starting each reconnection attempt.
///
/// The delay is `2^attempt_count * 0.1` seconds, capped at 60 seconds.
/// Note that nothing in this class resets `attempt_count`.
class ExponentialBackoffReconnectionStrategy: NSObject, PTDiffusionSessionReconnectionStrategy {

    /// Number of reconnection attempts requested so far.
    var attempt_count : Int = 0

    /// Schedules the given reconnection attempt to start after the back-off delay.
    func diffusionSession(_ session: PTDiffusionSession,
                          wishesToReconnectWith attempt: PTDiffusionSessionReconnectionAttempt) {

        // limit the maximum delay time between reconnection attempts to 60 seconds
        let maximum_attempt_interval = 60.0

        // compute delay for exponential backoff on the number of attempts so far
        self.attempt_count += 1
        let delay = min(pow(2.0, Double(self.attempt_count)) * 0.1, maximum_attempt_interval)

        // Express the delay in milliseconds: converting it to whole seconds would
        // truncate the first sub-second delays (0.2 s, 0.4 s, 0.8 s) to zero.
        let delay_in_milliseconds = Int((delay * 1000.0).rounded())

        // schedule asynchronous execution
        let dispatch_time = DispatchTime.now() + DispatchTimeInterval.milliseconds(delay_in_milliseconds)
        DispatchQueue.main.asyncAfter(deadline: dispatch_time) {
            print("Attempting to reconnect now")
            attempt.start()
        }
    }
}


/// Opens a session that uses the exponential back-off reconnection strategy
/// and a reconnection timeout of 10 minutes.
func connect() {
    let configuration = PTDiffusionMutableSessionConfiguration()

    // Set the maximum amount of time we'll try and reconnect for to 10 minutes
    configuration.reconnectionTimeout = NSNumber(value: 10.0 * 60.0)

    // Set the reconnection strategy to be used
    configuration.reconnectionStrategy = ExponentialBackoffReconnectionStrategy()

    // Use the configuration to open a new session...
    PTDiffusionSession.open(with: URL(string: "url")!,
                            configuration: configuration) { (session, error) in

        // Check error is `nil`, then use session as required.
        // Ensure to maintain a strong reference to the session beyond the lifetime
        // of this callback, for example by assigning it to an instance variable.

        guard session != nil else {
            // print() does not interpret format specifiers such as %@, so the
            // description is interpolated into the message instead. The error is
            // not force-unwrapped, in case it is missing.
            let reason = error?.localizedDescription ?? "unknown error"
            print("Failed to open session: \(reason)")
            return
        }

        // At this point we now have a connected session.
        print("Connected.")
    }
}