Just a second...

Example: Use session property filters with messaging

The following examples use the MessagingControl feature in the Unified API to receive session property values along with messages sent to topic paths, and to send messages to one or more clients selected by the values of their session properties.

JavaScript
var diffusion = require('../../js-uci-client/src/diffusion');

// Connect to the server. Change these options to suit your own environment.
// Node.js will not accept self-signed certificates by default. If you have
// one of these, set the environment variable NODE_TLS_REJECT_UNAUTHORIZED=0
// before running this example.
diffusion.connect({
    host        : 'diffusion.example.com',
    port        : 443,
    secure      : true,
    principal   : 'control',
    credentials : 'password'
}).then(function(session) {

    // Create a listener for a stream of messages on a specific path.
    session.messages.listen('foo').on('message', function(msg) {
        console.log('Received message: ' + msg.content);
    });

    // Send a message to every session whose session properties match the
    // filter expression. Here the filter selects sessions authenticated as
    // the 'control' principal, which includes this session, so the listener
    // registered above receives the message.
    session.messages.send('foo', 'Hello world', '$Principal is "control"');
}, function(error) {
    // Without a rejection handler a failed connection (wrong host, rejected
    // credentials, untrusted certificate) would go unreported.
    console.error('Failed to connect to Diffusion:', error);
});
Java and Android
package com.pushtechnology.diffusion.examples;

import com.pushtechnology.diffusion.client.Diffusion;
import com.pushtechnology.diffusion.client.content.Content;
import com.pushtechnology.diffusion.client.features.control.topics.MessagingControl;
import com.pushtechnology.diffusion.client.features.control.topics.MessagingControl.MessageHandler;
import com.pushtechnology.diffusion.client.features.control.topics.MessagingControl.SendToFilterCallback;
import com.pushtechnology.diffusion.client.session.Session;
import com.pushtechnology.diffusion.client.session.SessionId;
import com.pushtechnology.diffusion.client.types.ReceiveContext;

/**
 * This is an example of a control client using the 'MessagingControl' feature
 * to send messages to clients using message filters. It also demonstrates the
 * ability to register a message handler with an interest in session property
 * values.
 *
 * @author Push Technology Limited
 * @since 5.5
 */
public final class ControlClientUsingFiltersAndProperties {

    private final Session session;
    private final MessagingControl messagingControl;
    private final SendToFilterCallback sendToFilterCallback;

    /**
     * Opens a session as the "control" principal and registers a message
     * handler on the "foo" branch that asks for the "JobTitle" session
     * property of each sender.
     *
     * @param callback notified of the result of every filtered send issued
     *        by the handler
     */
    public ControlClientUsingFiltersAndProperties(SendToFilterCallback callback) {

        sendToFilterCallback = callback;

        session = Diffusion.sessions()
            .principal("control")
            .password("password")
            .open("ws://diffusion.example.com:80");
        messagingControl = session.feature(MessagingControl.class);

        // Receive every message that clients send on the "foo" branch, with
        // the sender's "JobTitle" session property attached to each one.
        // The session needs the 'register_handler' permission for this.
        final MessageHandler handler = new BroadcastHandler();
        messagingControl.addMessageHandler("foo", handler, "JobTitle");
    }

    /**
     * Closes the underlying session.
     */
    public void close() {
        session.close();
    }

    /**
     * Relays messages from managers to staff: a message is forwarded to all
     * sessions whose "JobTitle" property is "Staff", but only when the
     * sending session's "JobTitle" is "Manager". Anything else is dropped.
     */
    private class BroadcastHandler extends MessageHandler.Default {
        @Override
        public void onMessage(
            SessionId sessionId,
            String topicPath,
            Content content,
            ReceiveContext context) {

            final boolean fromManager =
                "Manager".equals(context.getSessionProperties().get("JobTitle"));
            if (!fromManager) {
                return;
            }

            // Forward on the same path, preserving the original headers.
            messagingControl.sendToFilter(
                "JobTitle is 'Staff'",
                topicPath,
                content,
                messagingControl.sendOptionsBuilder()
                    .headers(context.getHeaderList())
                    .build(),
                sendToFilterCallback);
        }
    }

}
.NET
using System.Linq;
using PushTechnology.ClientInterface.Client.Content;
using PushTechnology.ClientInterface.Client.Factories;
using PushTechnology.ClientInterface.Client.Features;
using PushTechnology.ClientInterface.Client.Features.Control.Topics;
using PushTechnology.ClientInterface.Client.Session;
using PushTechnology.ClientInterface.Client.Types;

namespace Examples {
    /// <summary>
    /// This is an example of a control client using the 'MessagingControl' feature to send messages to clients using
    /// message filters. It also demonstrates the ability to register a message handler with an interest in session
    /// property values.
    /// </summary>
    public class ControlClientUsingFiltersAndProperties {
        private readonly ISession theSession;
        private readonly IMessagingControl theMessagingControl;
        private readonly ISendToFilterCallback theSendToFilterCallback;

        /// <summary>
        /// Opens a session as the "control" principal and registers a message handler on the "foo" branch.
        /// </summary>
        /// <param name="callback">The callback notified of the result of each filtered send.</param>
        public ControlClientUsingFiltersAndProperties( ISendToFilterCallback callback ) {
            theSendToFilterCallback = callback;

            theSession = Diffusion.Sessions.Principal( "control" ).Password( "password" )
                .Open( "ws://diffusion.example.com:80" );

            theMessagingControl = theSession.GetMessagingControlFeature();

            // Register to receive all messages sent by clients on the "foo" branch and include the "JobTitle" session
            // property value with each message. To do this, the client session must have the "register_handler"
            // permission.
            theMessagingControl.AddMessageHandler(
                "foo",
                new BroadcastHandler( theMessagingControl, theSendToFilterCallback ),
                "JobTitle" );
        }

        /// <summary>
        /// Closes the session.
        /// </summary>
        public void Close() {
            theSession.Close();
        }

        /// <summary>
        /// Handler that passes any message to all sessions that have a "JobTitle" property set to "Staff" if, and
        /// only if, it comes from a session that has a "JobTitle" set to "Manager".
        /// </summary>
        private class BroadcastHandler : IMessageHandler {
            private readonly IMessagingControl theMessagingControl;
            private readonly ISendToFilterCallback theSendToFilterCallback;

            /// <summary>
            /// Constructor.
            /// </summary>
            /// <param name="messagingControl">The messaging control object.</param>
            /// <param name="callback">The filter callback.</param>
            public BroadcastHandler( IMessagingControl messagingControl, ISendToFilterCallback callback ) {
                theMessagingControl = messagingControl;
                theSendToFilterCallback = callback;
            }

            /// <summary>
            /// Called when the handler has been successfully registered with the server.
            ///
            /// A session can register a single handler of each type for a given branch of the topic tree. If there is
            /// already a handler registered for the topic path the operation will fail, <c>registeredHandler</c> will
            /// be closed, and the session error handler will be notified. To change the handler, first close the
            /// previous handler.
            /// </summary>
            /// <param name="topicPath">The path that the handler is active for.</param>
            /// <param name="registeredHandler">Allows the handler to be closed.</param>
            public void OnActive( string topicPath, IRegisteredHandler registeredHandler ) {
            }

            /// <summary>
            /// Called if the handler is closed. This happens if the call to register the handler fails, or the handler
            /// is unregistered.
            /// </summary>
            /// <param name="topicPath">The branch of the topic tree for which the handler was registered.</param>
            public void OnClose( string topicPath ) {
            }

            /// <summary>
            /// Receives content sent from a session via a topic and, when the sender's "JobTitle" session property is
            /// "Manager", forwards it to every session whose "JobTitle" is "Staff". Messages from any other sender,
            /// including senders that have no "JobTitle" property at all, are ignored.
            /// </summary>
            /// <param name="sessionId">Identifies the client session that sent the content.</param>
            /// <param name="topicPath">The path of the topic that the content was sent on.</param>
            /// <param name="content">The content sent by the client.</param>
            /// <param name="context">The context associated with the content.</param>
            public void OnMessage( SessionId sessionId, string topicPath, IContent content, IReceiveContext context ) {
                // Look the property up with TryGetValue rather than the indexer: a sender without a "JobTitle"
                // property must simply be skipped, not raise KeyNotFoundException inside the handler.
                var properties = context.SessionProperties;
                string jobTitle;
                if ( properties == null || !properties.TryGetValue( "JobTitle", out jobTitle ) ) {
                    return;
                }

                if ( !"Manager".Equals( jobTitle ) ) {
                    return;
                }

                // Forward on the same path, preserving the headers of the original message.
                theMessagingControl.SendToFilter( "JobTitle is 'Staff'", topicPath, content,
                    theMessagingControl.CreateSendOptionsBuilder().SetHeaders( context.HeadersList.ToList() ).Build(),
                    theSendToFilterCallback );
            }
        }
    }
}
C
/*
 * This example shows how a message can be sent to another client via
 * a topic endpoint using a filter expression.
 *
 * See msg-listener.c for an example of how to receive these messages
 * in a client.
 */

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

#include <apr.h>
#include <apr_thread_mutex.h>
#include <apr_thread_cond.h>

#include "diffusion.h"
#include "args.h"

/*
 * APR state shared between main() and the on_send() callback. main()
 * creates the pool and allocates the mutex and condition variable from it;
 * on_send() broadcasts on the condition variable to let main() continue.
 */
apr_pool_t *pool = NULL;
apr_thread_mutex_t *mutex = NULL;
apr_thread_cond_t *cond = NULL;

/*
 * Command-line options, passed to parse_cmdline() and show_usage() in
 * main(). Each entry appears to be: short flag, long name (also the key
 * used with hash_get()), help text, required/optional, whether the option
 * takes a value, and the default value -- confirm the field order against
 * args.h.
 */
ARG_OPTS_T arg_opts[] = {
        ARG_OPTS_HELP,
        {'u', "url", "Diffusion server URL", ARG_OPTIONAL, ARG_HAS_VALUE, "ws://localhost:8080"},
        {'p', "principal", "Principal (username) for the connection", ARG_OPTIONAL, ARG_HAS_VALUE, NULL},
        {'c', "credentials", "Credentials (password) for the connection", ARG_OPTIONAL, ARG_HAS_VALUE, NULL},
        {'t', "topic", "Topic name", ARG_REQUIRED, ARG_HAS_VALUE, "echo"},
        {'f', "filter", "Filter", ARG_REQUIRED, ARG_HAS_VALUE, NULL},
        {'d', "data", "Data to send", ARG_REQUIRED, ARG_HAS_VALUE, NULL},
        END_OF_ARG_OPTS
};

/*
 * Callback registered as the on_send handler of the send_msg_to_filter()
 * request. It prints the user context, the number of clients the message
 * was sent to and any error reports carried by the response, then wakes
 * the main thread, which is waiting on the condition variable.
 */
static int
on_send(SESSION_T *session, const SVC_SEND_MSG_TO_FILTER_RESPONSE_T *response, void *context)
{
        LIST_NODE_T *report = NULL;

        printf("on_send() successful. Context=\"%s\".\n", (char *)context);
        printf("Sent message to %d clients\n", response->sent_count);

        /* A missing list and an empty list both mean "nothing to report". */
        if(response->error_reports != NULL) {
                report = response->error_reports->first;
        }

        if(report == NULL) {
                printf("No errors reported\n");
        }
        else {
                for(; report != NULL; report = report->next) {
                        ERROR_REPORT_T *err = (ERROR_REPORT_T *)report->data;
                        printf("Error: %s at line %d, column %d\n", err->message, err->line, err->column);
                }
        }

        /*
         * Signal the main thread that the response has arrived.
         */
        apr_thread_mutex_lock(mutex);
        apr_thread_cond_broadcast(cond);
        apr_thread_mutex_unlock(mutex);

        return HANDLER_SUCCESS;
}

/*
 * Parses the command line, connects to Diffusion, sends one message to
 * every session matching the filter expression and waits for the
 * on_send() callback before closing the session.
 *
 * Returns EXIT_SUCCESS once the response has been received, or
 * EXIT_FAILURE if the arguments are unusable or the session cannot be
 * created.
 */
int
main(int argc, char **argv)
{
        /*
         * Standard command-line parsing.
         */
        HASH_T *options = parse_cmdline(argc, argv, arg_opts);
        if(options == NULL || hash_get(options, "help") != NULL) {
                show_usage(argc, argv, arg_opts);
                return EXIT_FAILURE;
        }

        char *url = hash_get(options, "url");

        const char *principal = hash_get(options, "principal");
        CREDENTIALS_T *credentials = NULL;

        const char *password = hash_get(options, "credentials");
        if(password != NULL) {
                credentials = credentials_create_password(password);
        }

        char *topic = hash_get(options, "topic");
        char *filter = hash_get(options, "filter");
        char *data = hash_get(options, "data");

        /*
         * "filter" and "data" have no default value; refuse to continue
         * without them rather than passing NULL to strlen() or to the
         * send request below.
         */
        if(topic == NULL || filter == NULL || data == NULL) {
                show_usage(argc, argv, arg_opts);
                return EXIT_FAILURE;
        }

        /*
         * Setup for condition variable.
         */
        apr_initialize();
        apr_pool_create(&pool, NULL);
        apr_thread_mutex_create(&mutex, APR_THREAD_MUTEX_UNNESTED, pool);
        apr_thread_cond_create(&cond, pool);

        /*
         * Create a session with Diffusion.
         */
        SESSION_T *session = NULL;
        DIFFUSION_ERROR_T error = { 0 };
        session = session_create(url, principal, credentials, NULL, NULL, &error);
        if(session == NULL) {
                fprintf(stderr, "TEST: Failed to create session\n");
                fprintf(stderr, "ERR : %s\n", error.message);

                /*
                 * Release the APR resources acquired above before
                 * bailing out.
                 */
                apr_thread_mutex_destroy(mutex);
                apr_thread_cond_destroy(cond);
                apr_pool_destroy(pool);
                apr_terminate();

                return EXIT_FAILURE;
        }

        /*
         * Create a payload. The buffer is only needed to build the
         * content, so it is freed straight afterwards.
         */
        BUF_T *payload = buf_create();
        buf_write_bytes(payload, data, strlen(data));
        CONTENT_T *content = content_create(CONTENT_ENCODING_NONE, payload);
        buf_free(payload);

        /*
         * Build up some headers to send with the message.
         */
        LIST_T *headers = list_create();
        list_append_last(headers, "apple");
        list_append_last(headers, "train");

        /*
         * Parameters for send_msg_to_filter() call.
         */
        SEND_MSG_TO_FILTER_PARAMS_T params = {
                .topic_path = topic,
                .filter = filter,
                .content = *content,
                .options.headers = headers,
                .options.priority = CLIENT_SEND_PRIORITY_NORMAL,
                .on_send = on_send,
                .context = "FOO"
        };

        /*
         * Send the message and wait for the callback to acknowledge
         * delivery. The mutex is taken before sending so that on_send()
         * cannot broadcast until this thread is actually waiting.
         *
         * NOTE(review): the wait is not guarded by a predicate flag, so a
         * spurious wake-up would let main continue early, and a callback
         * that never fires would block forever -- consider a flag shared
         * with on_send() and a timed wait for production use.
         */
        apr_thread_mutex_lock(mutex);
        send_msg_to_filter(session, params);
        apr_thread_cond_wait(cond, mutex);
        apr_thread_mutex_unlock(mutex);

        /*
         * Close session and clean up.
         */
        session_close(session, NULL);
        session_free(session);

        apr_thread_mutex_destroy(mutex);
        apr_thread_cond_destroy(cond);
        apr_pool_destroy(pool);
        apr_terminate();

        return EXIT_SUCCESS;
}

Change the URL in each example to the URL of your Diffusion™ server.