Just a second...

Handling client queues

Each client session has a queue on the Diffusion™ server . Messages to be sent to the client are queued here. You can monitor the state of these queues and set client queue behavior.

Receiving notifications of client queue events

Required permissions:view_session, register_handler

A client can register a handler that is notified when outbound client queues at the Diffusion server reach pre-configured thresholds.

.NET
var queueEventHandler = new QueueEventHandler();
session.ClientControl.SetQueueEventHandler(queueEventHandler);
Java and Android
final ClientControl clientControl = session.feature(ClientControl.class);

clientControl.setQueueEventHandler(
    new ClientControl.QueueEventStream() {
        @Override
        public void onUpperThresholdCrossed(
            SessionId sessionId,
            MessageQueuePolicy policy) {
            // The action to perform when the queue upper threshold is crossed.
        }

        @Override
        public void onLowerThresholdCrossed(
            SessionId sessionId,
            MessageQueuePolicy policy) {
            // The action to perform when the queue lower threshold is crossed.
        }

        @Override
        public void onError(ErrorReason errorReason) { }

        @Override
        public void onClose() { }
    });
C
/**
 * Copyright © 2021 - 2023 DiffusionData Ltd.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

#include <stdio.h>
#include <stdlib.h>

#ifndef WIN32
        #include <unistd.h>
#else
        #define sleep(x) Sleep(1000 * x)
#endif

#include "diffusion.h"


static int on_session_closed(void *context) {
        // session has been closed
        return HANDLER_SUCCESS;
}


void close_session(
        SESSION_T *session,
        SESSION_ID_T *session_id)
{
        DIFFUSION_CLIENT_CLOSE_WITH_SESSION_PARAMS_T params = {
                .session_id = session_id,
                .on_closed = on_session_closed,
        };

        diffusion_client_close_with_session(session, params, NULL);
}


static int on_sessions_closed(int number_of_matching_sessions, void *context)
{
        // sessions have been closed
        return HANDLER_SUCCESS;
}


void close_sessions_with_filter(
        SESSION_T *session,
        char *session_filter)
{
        DIFFUSION_CLIENT_CLOSE_WITH_FILTER_PARAMS_T params =  {
                .filter = session_filter,
                .on_clients_closed = on_sessions_closed
        };

        diffusion_client_close_with_filter(session, params, NULL);
}


static int on_roles_changed(void *context)
{
        // roles have been successfully changed
        return HANDLER_SUCCESS;
}


void change_roles(
        SESSION_T *session,
        SESSION_ID_T *session_id,
        SET_T *roles_to_add,
        SET_T *roles_to_remove)
{
        DIFFUSION_CHANGE_ROLES_WITH_SESSION_ID_PARAMS_T params = {
                .session_id = session_id,
                .roles_to_add = roles_to_add,
                .roles_to_remove = roles_to_remove,
                .on_roles_changed = on_roles_changed
        };
        diffusion_change_roles_with_session_id(session, params, NULL);

}


static int on_roles_changed_filter(
        int number_of_matching_sessions,
        void *context)
{
        // roles have been successfully changed
        return HANDLER_SUCCESS;
}


void change_roles_with_filter(
        SESSION_T *session,
        char *session_filter,
        SET_T *roles_to_add,
        SET_T *roles_to_remove)
{
        DIFFUSION_CHANGE_ROLES_WITH_FILTER_PARAMS_T params = {
                .filter = session_filter,
                .roles_to_add = roles_to_add,
                .roles_to_remove = roles_to_remove,
                .on_roles_changed = on_roles_changed_filter
        };
        diffusion_change_roles_with_filter(session, params, NULL);
}


int main(int argc, char **argv)
{
        const char *url = "ws://localhost:8080";
        const char *principal = "control";
        const char *password = "password";

        CREDENTIALS_T *credentials = credentials_create_password(password);
        SESSION_T *session =
                session_create(url, principal, credentials, NULL, NULL, NULL);

        // example of closing a session using session ID
        SESSION_ID_T *session_id =
                session_id_create_from_string("e9cfbd5be9a72622-f000000300000004");

        close_session(session, session_id);

        // example of closing sessions using a session filter
        char *session_filter = "$Principal is 'client'";

        close_sessions_with_filter(session, session_filter);

        // example of changing roles using a session ID
        SET_T *roles_to_add = set_new_string(1);
        set_add(roles_to_add, "AUTHENTICATION_HANDLER");

        SET_T *roles_to_remove = set_new_string(1);
        set_add(roles_to_remove, "TOPIC_CONTROL");

        change_roles(session, session_id, roles_to_add, roles_to_remove);

        // example of changing roles using a session filter
        change_roles_with_filter(session, session_filter, roles_to_add, roles_to_remove);

        // Sleep for a while
        sleep(5);

        // Close the session, and release resources and memory
        session_close(session, NULL);
        session_free(session);

        credentials_free(credentials);
        session_id_free(session_id);
        set_free(roles_to_add);
        set_free(roles_to_remove);

        return EXIT_SUCCESS;
}
Apple
class QueueEventHandler: PTDiffusionClientQueueEventListener {
    func diffusionQueueEventListenerRegistration(_ registration: PTDiffusionRegistration,
                                                 didReportPolicyChange policy: PTDiffusionClientQueuePolicy,
                                                 forSession sessionId: PTDiffusionSessionId) {
        print("Session '%@' changed policy to %@", sessionId, policy)
    }

    func diffusionQueueEventListenerRegistrationDidClose(_ registration: PTDiffusionRegistration) {
        print("Queue event listener has closed")
    }

    func diffusionQueueEventListenerRegistration(_ registration: PTDiffusionRegistration,
                                                 didFailWithError error: Error) {
        print("Queue event listener failed with error: %@", error.localizedDescription)
    }
}

let handler = QueueEventHandler()
session.clientControl.setQueueEventHandler(handler) { (registration, error) in
    if (error != nil) {
        print("An error has occurred while setting the queue event handler: %@", error!.localizedDescription)
    }
    else {
        print("Queue event handler has been successfully registered")
    }
}