Just a second...

Specifying a reconnection strategy

Reconnection behavior can be configured using custom reconnection strategies.

The reconnection behavior of a client session can be configured using reconnection strategies. A reconnection strategy is applied when the session enters the RECOVERING_RECONNECT state, enabling the session to attempt to reconnect and recover its previous state.

Reconnection can only succeed if the client session is still available on Diffusion™ Cloud. The maximum time that Diffusion Cloud keeps client sessions available for reconnection is 60 seconds.

Individual client sessions can request a shorter reconnection timeout for their sessions or request to disable reconnection when they first connect to Diffusion Cloud

Examples

JavaScript
// When establishing a session, it is possible to specify whether reconnection
// should be attempted in the event of an unexpected disconnection. This allows
// the session to recover its previous state.

// Set the maximum amount of time we'll try and reconnect for to 10 minutes
var maximumTimeoutDuration = 1000 * 60 * 10;

// Set the maximum interval between reconnect attempts to 60 seconds
var maximumAttemptInterval = 1000 * 60;

// Set an upper limit to the number of times we'll try to reconnect for
var maximumAttempts = 25;

// Count the number of reconnection attempts we've made
var attempts = 0;

// Create a reconnection strategy that applies an exponential back-off
// The strategy will be called with two arguments, start & abort. Both
// of these are functions, which allow the strategy to either start a
// reconnection attempt, or to abort reconnection (which will close the session)
var reconnectionStrategy = function(start, abort) {
    if (attempts > maximumAttempts) {
        abort();
    } else {
        var wait = Math.min(Math.pow(2, attempts++) * 100, maximumAttemptInterval);

        // Wait the specified time period, and then start the reconnection attempt
        setTimeout(start, wait);
    }
};

// Connect to the server.
diffusion.connect({
    host : 'diffusion.example.com',
    port : 443,
    secure : true,
    principal : 'control',
    credentials : 'password',
    reconnect : {
        timeout : maximumTimeoutDuration,
        strategy : reconnectionStrategy
    }
}).then(function(session) {

    session.on('disconnect', function() {
        // This will be called when we lose connection. Because we've specified the 
        // reconnection strategy, it will be called automatically when this event
        // is dispatched
    });

    session.on('reconnect', function() {
        // If the session is able to reconnect within the reconnect timeout, this
        // event will be dispatched to notify that normal operations may resume
        attempts = 0;
    });

    session.on('close', function() {
        // If the session is closed normally, or the session is unable to reconnect,
        // this event will be dispatched to notify that the session is no longer
        // operational.
    });
});
Apple
@import Diffusion;

@interface ExponentialBackoffReconnectionStrategy : NSObject <PTDiffusionSessionReconnectionStrategy>
@end

@implementation CustomReconnectionStrategyExample {
    PTDiffusionSession* _session;
}

-(void)startWithURL:(NSURL*)url {
    NSLog(@"Connecting...");

    PTDiffusionMutableSessionConfiguration *const sessionConfiguration =
        [PTDiffusionMutableSessionConfiguration new];

    // Set the maximum amount of time we'll try and reconnect for to 10 minutes.
    sessionConfiguration.reconnectionTimeout = @(10.0 * 60.0); // seconds

    // Set the reconnection strategy to be used.
    sessionConfiguration.reconnectionStrategy = [ExponentialBackoffReconnectionStrategy new];

    // Start connecting asynchronously.
    [PTDiffusionSession openWithURL:url
                      configuration:sessionConfiguration
                  completionHandler:^(PTDiffusionSession *session, NSError *error)
    {
        if (!session) {
            NSLog(@"Failed to open session: %@", error);
            return;
        }

        // At this point we now have a connected session.
        NSLog(@"Connected.");

        // Set ivar to maintain a strong reference to the session.
        _session = session;
    }];
}

@end

@implementation ExponentialBackoffReconnectionStrategy {
    NSUInteger _attemptCount;
}

-(void)         diffusionSession:(PTDiffusionSession *const)session
    wishesToReconnectWithAttempt:(PTDiffusionSessionReconnectionAttempt *const)attempt {
    // Limit the maximum time to delay between reconnection attempts to 60 seconds.
    const NSTimeInterval maximumAttemptInterval = 60.0;

    // Compute delay for exponential backoff based on the number of attempts so far.
    const NSTimeInterval delay = MIN(pow(2.0, _attemptCount++) * 0.1, maximumAttemptInterval);

    // Schedule asynchronous execution.
    NSLog(@"Reconnection attempt scheduled for %.2fs", delay);
    dispatch_after(dispatch_time(DISPATCH_TIME_NOW, (int64_t)(delay * NSEC_PER_SEC)),
        dispatch_get_main_queue(), ^
    {
        NSLog(@"Attempting reconnection.");
        [attempt start];
    });
}

@end
                    
Java and Android
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import com.pushtechnology.diffusion.client.Diffusion;
import com.pushtechnology.diffusion.client.session.Session;
import com.pushtechnology.diffusion.client.session.Session.Listener;
import com.pushtechnology.diffusion.client.session.Session.State;
import com.pushtechnology.diffusion.client.session.reconnect.ReconnectionStrategy;


/**
 * This example class demonstrates the ability to set a custom {@link ReconnectionStrategy}
 * when creating sessions.
 *
 * @author Push Technology Limited
 * @since 5.5
 */
public class ClientWithReconnectionStrategy {

    private volatile int retries = 0;
    /**
     * Constructor.
     */
    public ClientWithReconnectionStrategy() {

        // Set the maximum amount of time we'll try and reconnect for to 10 minutes.
        final int maximumTimeoutDuration = 1000 * 60 * 10;

        // Set the maximum interval between reconnect attempts to 60 seconds.
        final long maximumAttemptInterval = 1000 * 60;

        // Create a new reconnection strategy that applies an exponential backoff
        final ReconnectionStrategy reconnectionStrategy = new ReconnectionStrategy() {
            private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);

            @Override
            public void performReconnection(final ReconnectionAttempt reconnection) {
                final long exponentialWaitTime =
                    Math.min((long) Math.pow(2,  retries++) * 100L, maximumAttemptInterval);

                scheduler.schedule(new Runnable() {
                    @Override
                    public void run() {
                        reconnection.start();
                    }
                }, exponentialWaitTime, TimeUnit.MILLISECONDS);
            }
        };

        final Session session = Diffusion.sessions().reconnectionTimeout(maximumTimeoutDuration)
                                                    .reconnectionStrategy(reconnectionStrategy)
                                                    .open("ws://diffusion.example.com:80");
        session.addListener(new Listener() {
            @Override
            public void onSessionStateChanged(Session session, State oldState, State newState) {

                if (newState == State.RECOVERING_RECONNECT) {
                    // The session has been disconnected, and has entered recovery state. It is during this state that
                    // the reconnect strategy will be called
                }

                if (newState == State.CONNECTED_ACTIVE) {
                    // The session has connected for the first time, or it has been reconnected.
                    retries = 0;
                }

                if (oldState == State.RECOVERING_RECONNECT) {
                    // The session has left recovery state. It may either be attempting to reconnect, or the attempt has
                    // been aborted; this will be reflected in the newState.
                }
            }
        });
    }
}
.NET
                        
                    
C
/*
 * This example shows how to make a synchronous connection to
 * Diffusion, with user-provided reconnection logic.
 */
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include <unistd.h>

#include <apr_time.h>

#include "diffusion.h"
#include "args.h"

ARG_OPTS_T arg_opts[] = {
        ARG_OPTS_HELP,
        {'u', "url", "Diffusion server URL", ARG_OPTIONAL, ARG_HAS_VALUE, "ws://localhost:8080"},
        {'p', "principal", "Principal (username) for the connection", ARG_OPTIONAL, ARG_HAS_VALUE, NULL},
        {'c', "credentials", "Credentials (password) for the connection", ARG_OPTIONAL, ARG_HAS_VALUE, NULL},
        {'s', "sleep", "Time to sleep before disconnecting (in seconds).", ARG_OPTIONAL, ARG_HAS_VALUE, "5" },
        END_OF_ARG_OPTS
};

/*
 * This callback is used when the session state changes, e.g. when a session
 * moves from a "connecting" to a "connected" state, or from "connected" to
 * "closed".
 */
static void
on_session_state_changed(SESSION_T *session,
        const SESSION_STATE_T old_state,
        const SESSION_STATE_T new_state)
{
        printf("Session state changed from %s (%d) to %s (%d)\n",
               session_state_as_string(old_state), old_state,
               session_state_as_string(new_state), new_state);
}

typedef struct {
        long current_wait;
        long max_wait;
} BACKOFF_STRATEGY_ARGS_T;

static RECONNECTION_ATTEMPT_ACTION_T
backoff_reconnection_strategy(SESSION_T *session, void *args)
{
        BACKOFF_STRATEGY_ARGS_T *backoff_args = args;

        printf("Waiting for %ld ms\n", backoff_args->current_wait);

        apr_sleep(backoff_args->current_wait * 1000); // µs -> ms

        // But only up to some maximum time.
        if(backoff_args->current_wait > backoff_args->max_wait) {
                backoff_args->current_wait = backoff_args->max_wait;
        }

        return RECONNECTION_ATTEMPT_ACTION_START;
}

static void
backoff_success(SESSION_T *session, void *args)
{
        printf("Reconnection successful\n");

        BACKOFF_STRATEGY_ARGS_T *backoff_args = args;
        backoff_args->current_wait = 0; // Reset wait.
}

static void
backoff_failure(SESSION_T *session, void *args)
{
        printf("Reconnection failed (%s)\n", session_state_as_string(session->state));

        BACKOFF_STRATEGY_ARGS_T *backoff_args = args;

        // Exponential backoff.
        if(backoff_args->current_wait == 0) {
                backoff_args->current_wait = 1;
        }
        else {
                backoff_args->current_wait *= 2;
        }
}

/*
 * Entry point for the example.
 */
int
main(int argc, char **argv)
{
        /*
         * Standard command-line parsing.
         */
        HASH_T *options = parse_cmdline(argc, argv, arg_opts);
        if(options == NULL || hash_get(options, "help") != NULL) {
                show_usage(argc, argv, arg_opts);
                return EXIT_FAILURE;
        }

        const char *url = hash_get(options, "url");
        const char *principal = hash_get(options, "principal");
        CREDENTIALS_T *credentials = NULL;
        const char *password = hash_get(options, "credentials");
        if(password != NULL) {
                credentials = credentials_create_password(password);
        }

        const unsigned int sleep_time = atol(hash_get(options, "sleep"));

        SESSION_T *session;
        DIFFUSION_ERROR_T error = { 0 };

        SESSION_LISTENER_T session_listener = { 0 };
        session_listener.on_state_changed = &on_session_state_changed;

        /*
         * Set the arguments to our exponential backoff strategy.
         */
        BACKOFF_STRATEGY_ARGS_T *backoff_args = calloc(1, sizeof(BACKOFF_STRATEGY_ARGS_T));
        backoff_args->current_wait = 0;
        backoff_args->max_wait = 5000;

        /*
         * Create the backoff strategy.
         */
        RECONNECTION_STRATEGY_T *reconnection_strategy =
                make_reconnection_strategy_user_function(backoff_reconnection_strategy,
                                                         backoff_args,
                                                         backoff_success,
                                                         backoff_failure,
                                                         NULL);

        /*
         * Only ever retry for 30 seconds.
         */
        reconnection_strategy_set_timeout(reconnection_strategy, 30 * 1000);

        /*
         * Create a session, synchronously.
         */
        session = session_create(url, principal, credentials, &session_listener, reconnection_strategy, &error);
        if(session != NULL) {
                char *sid_str = session_id_to_string(session->id);
                printf("Session created (state=%d, id=%s)\n", session_state_get(session), sid_str);
                free(sid_str);
        }
        else {
                printf("Failed to create session: %s\n", error.message);
                free(error.message);
        }

        // With the exception of backoff_args, the reconnection strategy is
        // copied withing session_create() and may be freed now.
        free(reconnection_strategy);

        /*
         * Sleep for a while.
         */
        sleep(sleep_time);

        /*
         * Close the session, and release resources and memory.
         */
        session_close(session, NULL);
        session_free(session);

        free(backoff_args);

        credentials_free(credentials);
        hash_free(options, NULL, free);

        return EXIT_SUCCESS;
}