Just a second...

Specifying a reconnection strategy

Reconnection behavior can be configured using custom reconnection strategies.

The reconnection behavior of a client session can be configured using reconnection strategies. A reconnection strategy is applied when the session enters the RECOVERING_RECONNECT state, enabling the session to attempt to reconnect and recover its previous state.

Reconnection can only succeed if the client session is still available on the Diffusion™ server. The maximum time that the Diffusion server keeps client sessions in the DISCONNECTED state before closing them can be configured using the Connectors.xml configuration file. For more information, see Configuring connectors.

Individual client sessions can request a shorter reconnection timeout for their sessions or request to disable reconnection when they first connect to the Diffusion server

Examples

JavaScript
// When establishing a session, it is possible to specify whether reconnection
// should be attempted in the event of an unexpected disconnection. This allows
// the session to recover its previous state.

// Set the maximum amount of time we'll try and reconnect for to 10 minutes
var maximumTimeoutDuration = 1000 * 60 * 10;

// Set the maximum interval between reconnect attempts to 60 seconds
var maximumAttemptInterval = 1000 * 60;

// Set an upper limit to the number of times we'll try to reconnect for
var maximumAttempts = 25;

// Count the number of reconnection attempts we've made
var attempts = 0;

// Create a reconnection strategy that applies an exponential back-off
// The strategy will be called with two arguments, start & abort. Both
// of these are functions, which allow the strategy to either start a
// reconnection attempt, or to abort reconnection (which will close the session)
var reconnectionStrategy = function(start, abort) {
    if (attempts > maximumAttempts) {
        abort();
    } else {
        var wait = Math.min(Math.pow(2, attempts++) * 100, maximumAttemptInterval);

        // Wait the specified time period, and then start the reconnection attempt
        setTimeout(start, wait);
    }
};

// Connect to the server.
diffusion.connect({
    host : 'diffusion.example.com',
    port : 443,
    secure : true,
    principal : 'control',
    credentials : 'password',
    reconnect : {
        timeout : maximumTimeoutDuration,
        strategy : reconnectionStrategy
    }
}).then(function(session) {

    session.on('disconnect', function() {
        // This will be called when we lose connection. Because we've specified the
        // reconnection strategy, it will be called automatically when this event
        // is dispatched
    });

    session.on('reconnect', function() {
        // If the session is able to reconnect within the reconnect timeout, this
        // event will be dispatched to notify that normal operations may resume
        attempts = 0;
    });

    session.on('close', function() {
        // If the session is closed normally, or the session is unable to reconnect,
        // this event will be dispatched to notify that the session is no longer
        // operational.
    });
});
                    
.NET
/**
 * Copyright © 2021 Push Technology Ltd.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
using System;
using System.Threading;
using System.Threading.Tasks;
using PushTechnology.ClientInterface.Client.Factories;
using PushTechnology.ClientInterface.Client.Session;
using PushTechnology.ClientInterface.Client.Session.Reconnection;
using static System.Console;

namespace PushTechnology.ClientInterface.Example
{
    /// <summary>
    /// Implementation that demonstrates session reconnection strategy.
    /// </summary>
    public sealed class SessionReconnection
    {
        private int maximumTimeoutDuration = 1000 * 60 * 10;

        private ReconnectionStrategy reconnectionStrategy = new ReconnectionStrategy();

        public SessionReconnection(string serverUrl)
        {
            var factory = Diffusion.Sessions;

            var session = Connect(serverUrl, factory);

            if (session != null)
            {
                WriteLine("The session has been created.");
            }

            Thread.Sleep(60000);

            session.Close();
        }

        public ISession Connect(string url, ISessionFactory initialFactory)
        {
            try
            {
                string principal = "control";
                string password = "password";

                var factory = initialFactory
                    .Principal(principal)
                    .Credentials(Diffusion.Credentials.Password(password))
                    .CertificateValidation((cert, chain, errors) 
                        => CertificateValidationResult.ACCEPT)
                    .ReconnectionTimeout(maximumTimeoutDuration)
                    .ReconnectionStrategy(reconnectionStrategy)
                    .SessionStateChangedHandler(OnSessionStateChanged);

                return factory.Open(url);
            }
            catch (Exception ex)
            {
                WriteLine($"Session connection error : {ex}.");
            }

            return null;
        }

        private void OnSessionStateChanged(object sender, SessionListenerEventArgs e)
        {
            if (e.NewState == SessionState.RECOVERING_RECONNECT)
            {
                // The session has been disconnected, and has entered
                // recovery state. It is during this state that
                // the reconnect strategy will be called
                WriteLine("The session has been disconnected.");
            }

            if (e.NewState == SessionState.CONNECTED_ACTIVE)
            {
                // The session has connected for the first time, or it has
                // been reconnected.
                reconnectionStrategy.Retries = 0;

                WriteLine("The session has connected.");
            }

            if (e.OldState == SessionState.RECOVERING_RECONNECT)
            {
                // The session has left recovery state. It may either be
                // attempting to reconnect, or the attempt has been aborted;
                // this will be reflected in the newState.
            }

            if (e.NewState == SessionState.CLOSED_BY_CLIENT)
            {
                WriteLine("The session has been closed.");
            }
        }

        /// <summary>
        /// A reconnection strategy that gets applied after the connection failure notification.
        /// </summary>
        private class ReconnectionStrategy : IReconnectionStrategy
        {
            public int Retries { get; set; }

            // Set the maximum interval between reconnect attempts to 60 seconds.
            private long maximumAttemptInterval = 1000 * 60;

            public ReconnectionStrategy() => Retries = 0;

            public async Task PerformReconnection(IReconnectionAttempt reconnection)
            {
                long wait = Math.Min((long)Math.Pow(2, Retries++) * 100L, maximumAttemptInterval);

                Thread.Sleep((int)wait);

                WriteLine("Attempting to reconnect...");

                reconnection.Start();
            }
        }
    }
}
Java and Android
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import com.pushtechnology.diffusion.client.Diffusion;
import com.pushtechnology.diffusion.client.session.Session;
import com.pushtechnology.diffusion.client.session.Session.Listener;
import com.pushtechnology.diffusion.client.session.Session.State;
import com.pushtechnology.diffusion.client.session.reconnect.ReconnectionStrategy;


/**
 * This example class demonstrates the ability to set a custom {@link ReconnectionStrategy}
 * when creating sessions.
 *
 * @author Push Technology Limited
 * @since 5.5
 */
public class ClientWithReconnectionStrategy {

    private volatile int retries = 0;
    /**
     * Constructor.
     */
    public ClientWithReconnectionStrategy() {

        // Set the maximum amount of time we'll try and reconnect for to 10 minutes.
        final int maximumTimeoutDuration = 1000 * 60 * 10;

        // Set the maximum interval between reconnect attempts to 60 seconds.
        final long maximumAttemptInterval = 1000 * 60;

        // Create a new reconnection strategy that applies an exponential backoff
        final ReconnectionStrategy reconnectionStrategy = new ReconnectionStrategy() {
            private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);

            @Override
            public void performReconnection(final ReconnectionAttempt reconnection) {
                final long exponentialWaitTime =
                    Math.min((long) Math.pow(2,  retries++) * 100L, maximumAttemptInterval);

                scheduler.schedule(new Runnable() {
                    @Override
                    public void run() {
                        reconnection.start();
                    }
                }, exponentialWaitTime, TimeUnit.MILLISECONDS);
            }
        };

        final Session session = Diffusion.sessions().reconnectionTimeout(maximumTimeoutDuration)
                                                    .reconnectionStrategy(reconnectionStrategy)
                                                    .open("ws://diffusion.example.com:80");
        session.addListener(new Listener() {
            @Override
            public void onSessionStateChanged(Session session, State oldState, State newState) {

                if (newState == State.RECOVERING_RECONNECT) {
                    // The session has been disconnected, and has entered recovery state. It is during this state that
                    // the reconnect strategy will be called
                }

                if (newState == State.CONNECTED_ACTIVE) {
                    // The session has connected for the first time, or it has been reconnected.
                    retries = 0;
                }

                if (oldState == State.RECOVERING_RECONNECT) {
                    // The session has left recovery state. It may either be attempting to reconnect, or the attempt has
                    // been aborted; this will be reflected in the newState.
                }
            }
        });
    }
}
                    
C
/*
 * This example shows how to make a synchronous connection to
 * Diffusion, with user-provided reconnection logic.
 */
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include <unistd.h>

#include <apr_time.h>

#include "diffusion.h"
#include "args.h"

ARG_OPTS_T arg_opts[] = {
        ARG_OPTS_HELP,
        {'u', "url", "Diffusion server URL", ARG_OPTIONAL, ARG_HAS_VALUE, "ws://localhost:8080"},
        {'p', "principal", "Principal (username) for the connection", ARG_OPTIONAL, ARG_HAS_VALUE, NULL},
        {'c', "credentials", "Credentials (password) for the connection", ARG_OPTIONAL, ARG_HAS_VALUE, NULL},
        {'s', "sleep", "Time to sleep before disconnecting (in seconds).", ARG_OPTIONAL, ARG_HAS_VALUE, "5" },
        END_OF_ARG_OPTS
};

/*
 * This callback is used when the session state changes, e.g. when a session
 * moves from a "connecting" to a "connected" state, or from "connected" to
 * "closed".
 */
static void
on_session_state_changed(SESSION_T *session,
        const SESSION_STATE_T old_state,
        const SESSION_STATE_T new_state)
{
        printf("Session state changed from %s (%d) to %s (%d)\n",
               session_state_as_string(old_state), old_state,
               session_state_as_string(new_state), new_state);
}

typedef struct {
        long current_wait;
        long max_wait;
} BACKOFF_STRATEGY_ARGS_T;

static RECONNECTION_ATTEMPT_ACTION_T
backoff_reconnection_strategy(SESSION_T *session, void *args)
{
        BACKOFF_STRATEGY_ARGS_T *backoff_args = args;

        printf("Waiting for %ld ms\n", backoff_args->current_wait);

        apr_sleep(backoff_args->current_wait * 1000); // µs -> ms

        // But only up to some maximum time.
        if(backoff_args->current_wait > backoff_args->max_wait) {
                backoff_args->current_wait = backoff_args->max_wait;
        }

        return RECONNECTION_ATTEMPT_ACTION_START;
}

static void
backoff_success(SESSION_T *session, void *args)
{
        printf("Reconnection successful\n");

        BACKOFF_STRATEGY_ARGS_T *backoff_args = args;
        backoff_args->current_wait = 0; // Reset wait.
}

static void
backoff_failure(SESSION_T *session, void *args)
{
        printf("Reconnection failed (%s)\n", session_state_as_string(session->state));

        BACKOFF_STRATEGY_ARGS_T *backoff_args = args;

        // Exponential backoff.
        if(backoff_args->current_wait == 0) {
                backoff_args->current_wait = 1;
        }
        else {
                backoff_args->current_wait *= 2;
        }
}

/*
 * Entry point for the example.
 */
int
main(int argc, char **argv)
{
        /*
         * Standard command-line parsing.
         */
        HASH_T *options = parse_cmdline(argc, argv, arg_opts);
        if(options == NULL || hash_get(options, "help") != NULL) {
                show_usage(argc, argv, arg_opts);
                return EXIT_FAILURE;
        }

        const char *url = hash_get(options, "url");
        const char *principal = hash_get(options, "principal");
        CREDENTIALS_T *credentials = NULL;
        const char *password = hash_get(options, "credentials");
        if(password != NULL) {
                credentials = credentials_create_password(password);
        }

        const unsigned int sleep_time = atol(hash_get(options, "sleep"));

        SESSION_T *session;
        DIFFUSION_ERROR_T error = { 0 };

        SESSION_LISTENER_T session_listener = { 0 };
        session_listener.on_state_changed = &on_session_state_changed;

        /*
         * Set the arguments to our exponential backoff strategy.
         */
        BACKOFF_STRATEGY_ARGS_T *backoff_args = calloc(1, sizeof(BACKOFF_STRATEGY_ARGS_T));
        backoff_args->current_wait = 0;
        backoff_args->max_wait = 5000;

        /*
         * Create the backoff strategy.
         */
        RECONNECTION_STRATEGY_T *reconnection_strategy =
                make_reconnection_strategy_user_function(backoff_reconnection_strategy,
                                                         backoff_args,
                                                         backoff_success,
                                                         backoff_failure,
                                                         NULL);

        /*
         * Only ever retry for 30 seconds.
         */
        reconnection_strategy_set_timeout(reconnection_strategy, 30 * 1000);

        /*
         * Create a session, synchronously.
         */
        session = session_create(url, principal, credentials, &session_listener, reconnection_strategy, &error);
        if(session != NULL) {
                char *sid_str = session_id_to_string(session->id);
                printf("Session created (state=%d, id=%s)\n", session_state_get(session), sid_str);
                free(sid_str);
        }
        else {
                printf("Failed to create session: %s\n", error.message);
                free(error.message);
        }

        // With the exception of backoff_args, the reconnection strategy is
        // copied withing session_create() and may be freed now.
        free(reconnection_strategy);

        /*
         * Sleep for a while.
         */
        sleep(sleep_time);

        /*
         * Close the session, and release resources and memory.
         */
        session_close(session, NULL);
        session_free(session);

        free(backoff_args);

        credentials_free(credentials);
        hash_free(options, NULL, free);

        return EXIT_SUCCESS;
}
                    
Apple
@import Diffusion;

@interface ExponentialBackoffReconnectionStrategy : NSObject <PTDiffusionSessionReconnectionStrategy>
@end

@implementation CustomReconnectionStrategyExample {
    PTDiffusionSession* _session;
}

-(void)startWithURL:(NSURL*)url {
    NSLog(@"Connecting...");

    PTDiffusionMutableSessionConfiguration *const sessionConfiguration =
        [PTDiffusionMutableSessionConfiguration new];

    // Set the maximum amount of time we'll try and reconnect for to 10 minutes.
    sessionConfiguration.reconnectionTimeout = @(10.0 * 60.0); // seconds

    // Set the reconnection strategy to be used.
    sessionConfiguration.reconnectionStrategy = [ExponentialBackoffReconnectionStrategy new];

    // Start connecting asynchronously.
    [PTDiffusionSession openWithURL:url
                      configuration:sessionConfiguration
                  completionHandler:^(PTDiffusionSession *session, NSError *error)
    {
        if (!session) {
            NSLog(@"Failed to open session: %@", error);
            return;
        }

        // At this point we now have a connected session.
        NSLog(@"Connected.");

        // Set ivar to maintain a strong reference to the session.
        _session = session;
    }];
}

@end

@implementation ExponentialBackoffReconnectionStrategy {
    NSUInteger _attemptCount;
}

-(void)         diffusionSession:(PTDiffusionSession *const)session
    wishesToReconnectWithAttempt:(PTDiffusionSessionReconnectionAttempt *const)attempt {
    // Limit the maximum time to delay between reconnection attempts to 60 seconds.
    const NSTimeInterval maximumAttemptInterval = 60.0;

    // Compute delay for exponential backoff based on the number of attempts so far.
    const NSTimeInterval delay = MIN(pow(2.0, _attemptCount++) * 0.1, maximumAttemptInterval);

    // Schedule asynchronous execution.
    NSLog(@"Reconnection attempt scheduled for %.2fs", delay);
    dispatch_after(dispatch_time(DISPATCH_TIME_NOW, (int64_t)(delay * NSEC_PER_SEC)),
        dispatch_get_main_queue(), ^
    {
        NSLog(@"Attempting reconnection.");
        [attempt start];
    });
}

@end