Just a second...

Example: Handle messages and send messages to sessions

The following examples use the MessagingControl feature in the Unified API to handle messages sent to topic paths and to send messages to one or more clients.

JavaScript
var diffusion = require('diffusion');

// Connect to the server. Change these options to suit your own environment.
// Node.js will not accept self-signed certificates by default. If you have
// one of these, set the environment variable NODE_TLS_REJECT_UNAUTHORIZED=0
// before running this example.
diffusion.connect({
    host   : 'diffusion.example.com',
    port   : 443,
    secure : true,
    principal : 'control',
    credentials : 'password'
}).then(function(session) {
    // 1. Messages can be sent & received between sessions.

    // Create a stream of the messages received by this session on the path 'foo'.
    session.messages.listen('foo').on('message', function(msg) {
        console.log('Received message: ' + msg.content);
    });

    // Send a message to another session. It is the application's responsibility to find the SessionID of the intended
    // recipient.
    session.messages.send('foo', 'Hello world', 'another-session-id');

    // 2. Messages can also be sent without a recipient, in which case they will be dispatched to any Message Handlers
    // that have been registered for the same path. If multiple handlers are registered to the same path, any given
    // message will only be dispatched to one handler.

    // Register the handler to receive messages at or below the given path.
    session.messages.addHandler('foo', {
        // Called once the handler is active.
        onActive : function() {
            console.log('Handler registered');
        },
        // Called when the handler is closed.
        onClose : function() {
            console.log('Handler closed');
        },
        // Called for each message dispatched to this handler; session properties are printed when present.
        onMessage : function(msg) {
            console.log('Received message:' + msg.content + ' from Session: ' + msg.session);
            if (msg.properties) {
                console.log('with properties:', msg.properties);
            }
        }
    }).then(function() {
        console.log('Registered handler');
    }, function(e) {
        console.log('Failed to register handler: ', e);
    });

    // Send a message at a lower path, without an explicit recipient - this will be received by the Handler.
    session.messages.send('foo/bar', 'Another message');
}, function(err) {
    // Without a rejection callback a failed connection (wrong host, bad credentials, rejected certificate)
    // would go unreported and the example would appear to do nothing.
    console.log('Failed to connect: ', err);
});
Java and Android
import org.json.JSONException;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.pushtechnology.diffusion.client.Diffusion;
import com.pushtechnology.diffusion.client.content.Content;
import com.pushtechnology.diffusion.client.features.Messaging;
import com.pushtechnology.diffusion.client.features.control.topics.MessagingControl;
import com.pushtechnology.diffusion.client.features.control.topics.MessagingControl.MessageHandler;
import com.pushtechnology.diffusion.client.features.control.topics.MessagingControl.SendCallback;
import com.pushtechnology.diffusion.client.session.Session;
import com.pushtechnology.diffusion.client.session.SessionId;
import com.pushtechnology.diffusion.client.types.ReceiveContext;
import com.pushtechnology.diffusion.datatype.json.JSON;

/**
 * This is an example of a control client using the 'MessagingControl' feature
 * to receive messages from clients and also send messages to clients.
 * <P>
 * It is a trivial example that simply responds to all messages on a particular
 * branch of the topic tree by echoing them back to the client exactly as they
 * are, complete with headers.
 * <P>
 * Two sessions are opened: an "echoing" session that registers the message
 * handler, and a "sending" session used by {@link #sendHelloWorld()}.
 *
 * @author Push Technology Limited
 * @since 5.0
 */
public class ControlClientReceivingMessages {

    private static final Logger LOG =
        LoggerFactory.getLogger(ControlClientReceivingMessages.class);

    /** Server URL used by both sessions; change to suit your environment. */
    private static final String SERVER_URL = "ws://diffusion.example.com:80";

    private final Session echoingSession;
    private final Session sendingSession;
    private final MessagingControl echoingSessionMessagingControl;
    private final MessagingControl sendingSessionMessagingControl;
    private final SendCallback sendCallback;

    /**
     * Constructor. Opens both sessions and registers the echo handler on the
     * "foo" branch.
     *
     * @param callback for result of sends
     */
    public ControlClientReceivingMessages(SendCallback callback) {

        sendCallback = callback;

        final Session echoing = openControlSession();
        final Session sending;
        try {
            sending = openControlSession();
        }
        catch (RuntimeException e) {
            // Do not leak the first session if the second cannot be opened.
            echoing.close();
            throw e;
        }
        echoingSession = echoing;
        sendingSession = sending;

        echoingSessionMessagingControl = echoingSession.feature(MessagingControl.class);
        sendingSessionMessagingControl = sendingSession.feature(MessagingControl.class);

        // Register to receive all messages sent by clients on the "foo" branch
        // To do this, the client session must have the 'register_handler' permission.
        echoingSessionMessagingControl.addMessageHandler("foo", new EchoHandler());
    }

    /**
     * Opens a session to {@link #SERVER_URL} as the "control" principal.
     *
     * @return the newly opened session
     */
    private static Session openControlSession() {
        return Diffusion.sessions().principal("control").password("password")
            .open(SERVER_URL);
    }

    /**
     * Close both the echoing and the sending session.
     */
    public void close() {
        echoingSession.close();
        sendingSession.close();
    }

    /**
     * Handler that echoes messages back to the originating client complete with
     * original headers.
     */
    private class EchoHandler extends MessageHandler.Default {
        @Override
        public void onMessage(
            SessionId sessionId,
            String topicPath,
            Content content,
            ReceiveContext context) {

            try {
                final JSONObject jsonObject = new JSONObject(content.asString());
                // getString reports a missing or non-string 'hello' member as a
                // JSONException; a cast of get() would instead throw an uncaught
                // ClassCastException and prevent the echo below.
                final String value = jsonObject.getString("hello");
                LOG.info("JSON content with key: 'hello' and value: '{}'", value);
            }
            catch (JSONException ignored) {
                // Not JSON, or no usable 'hello' member: there is nothing to
                // log, so just carry on and echo the message.
            }

            // To send a message to a client, this client session must have
            // the 'view_session' and 'send_to_session' permissions.
            echoingSessionMessagingControl.send(
                sessionId,
                topicPath,
                content,
                echoingSessionMessagingControl.sendOptionsBuilder()
                    .headers(context.getHeaderList())
                    .build(),
                sendCallback);

        }
    }

    /**
     * Add a message stream to the sending session, on the "foo" path, to
     * observe echoed messages.
     *
     * @param stream stream to be added
     */
    public void addSendingSessionMessageStream(Messaging.MessageStream stream) {
        sendingSession.feature(Messaging.class).addMessageStream("foo", stream);
    }

    /**
     * Sends messages "hello:world" and "{"hello":"world"}" from the sending
     * session to the echoing session on the "foo" path. The outcome of each
     * send is reported to the callback supplied at construction.
     */
    public void sendHelloWorld() {
        final Content helloWorldContent = Diffusion.content().newContent("hello:world");
        final JSON helloWorldJson = Diffusion.dataTypes().json().fromJsonString("{\"hello\":\"world\"}");

        //To do this, the client session must have the 'view_session' and 'send_to_session' permissions.
        sendingSessionMessagingControl.send(echoingSession.getSessionId(), "foo", helloWorldContent, sendCallback);
        sendingSessionMessagingControl.send(echoingSession.getSessionId(), "foo", helloWorldJson, sendCallback);
    }

}
.NET
using System.Linq;
using PushTechnology.ClientInterface.Client.Content;
using PushTechnology.ClientInterface.Client.Factories;
using PushTechnology.ClientInterface.Client.Features;
using PushTechnology.ClientInterface.Client.Features.Control.Topics;
using PushTechnology.ClientInterface.Client.Session;
using PushTechnology.ClientInterface.Client.Types;

namespace Examples {
    /// <summary>
    /// This is an example of a control client using the <see cref="IMessagingControl"/> feature to receive messages
    /// from clients and also send messages to clients.
    ///
    /// It is a trivial example that simply responds to all messages on a particular branch of the topic tree by
    /// echoing them back to the client exactly as they are, complete with headers.
    /// </summary>
    public class ControlClientReceivingMessages {
        private readonly ISession session;

        /// <summary>
        /// Opens a session as the "control" principal and registers an echo handler on the "foo" branch.
        /// </summary>
        /// <param name="callback">The callback to receive the result of message sending.</param>
        public ControlClientReceivingMessages( ISendCallback callback ) {
            var sessionFactory = Diffusion.Sessions.Principal( "control" ).Password( "password" );
            session = sessionFactory.Open( "ws://diffusion.example.com:80" );

            var control = session.GetMessagingControlFeature();

            // Receive every message sent by clients on the "foo" branch.
            // The client session needs the REGISTER_HANDLER permission for this.
            var echoHandler = new EchoHandler( control, callback );
            control.AddMessageHandler( "foo", echoHandler );
        }

        /// <summary>
        /// Close the session.
        /// </summary>
        public void Close() {
            session.Close();
        }

        /// <summary>
        /// Base handler whose <see cref="OnMessage"/> does nothing, so subclasses override only what they need.
        /// </summary>
        private class MessageHandlerDefault : TopicTreeHandlerDefault, IMessageHandler {
            /// <summary>
            /// Receives content sent from a session via a topic. This default implementation ignores it.
            /// </summary>
            /// <param name="sessionId">Identifies the client session that sent the content.</param>
            /// <param name="topicPath">The path of the topic that the content was sent on.</param>
            /// <param name="content">The content sent by the client.</param>
            /// <param name="context">The context associated with the content.</param>
            public virtual void OnMessage( SessionId sessionId, string topicPath, IContent content,
                IReceiveContext context ) {
            }
        }

        /// <summary>
        /// Handler that sends each received message straight back to the session it came from.
        /// </summary>
        private class EchoHandler : MessageHandlerDefault {
            private readonly IMessagingControl control;
            private readonly ISendCallback resultCallback;

            public EchoHandler( IMessagingControl messagingControl, ISendCallback sendCallback ) {
                control = messagingControl;
                resultCallback = sendCallback;
            }

            /// <summary>
            /// Echoes the received content back to the sender on the same topic path, with the same headers.
            /// </summary>
            /// <param name="sessionId">Identifies the client session that sent the content.</param>
            /// <param name="topicPath">The path of the topic that the content was sent on.</param>
            /// <param name="content">The content sent by the client.</param>
            /// <param name="context">The context associated with the content.</param>
            public override void OnMessage( SessionId sessionId, string topicPath, IContent content,
                IReceiveContext context ) {
                var optionsBuilder = control.CreateSendOptionsBuilder();
                var echoOptions = optionsBuilder.SetHeaders( context.HeadersList.ToList() ).Build();

                control.Send( sessionId, topicPath, content, echoOptions, resultCallback );
            }
        }
    }
}
C – Receive
/*
 * This example shows how to receive messages, rather than topic
 * updates, as part of MessagingControl.
 *
 * You may register a handler against an endpoint, which will
 * become the only destination for messages to that endpoint (where
 * the control client which is considered "active" is determined by
 * the server).
 *
 * See send-msg.c for an example of how to send messages to an
 * endpoint from a client.
 */

#include <stdio.h>
#include <unistd.h>

#include "diffusion.h"
#include "conversation.h"
#include "args.h"

/*
 * Command-line options accepted by this example. main() hands this table
 * to parse_cmdline() and then looks values up by their long name ("url",
 * "principal", "credentials", "topic").
 * NOTE(review): the last field of each entry looks like the default value
 * used when the option is omitted - confirm against args.h.
 */
ARG_OPTS_T arg_opts[] = {
        ARG_OPTS_HELP,
        {'u', "url", "Diffusion server URL", ARG_OPTIONAL, ARG_HAS_VALUE, "ws://localhost:8080"},
        {'p', "principal", "Principal (username) for the connection", ARG_OPTIONAL, ARG_HAS_VALUE, NULL},
        {'c', "credentials", "Credentials (password) for the connection", ARG_OPTIONAL, ARG_HAS_VALUE, NULL},
        {'t', "topic", "Topic name", ARG_REQUIRED, ARG_HAS_VALUE, "echo"},
        END_OF_ARG_OPTS
};

/*
 * Registration callback: invoked once the message receiver has been
 * registered. It only announces the event on stdout; neither argument
 * is used.
 */
int
on_registered(SESSION_T *session, void *context)
{
        fputs("on_registered()\n", stdout);

        return HANDLER_SUCCESS;
}

/*
 * Message callback: invoked for every message received from a client.
 *
 * The following is written to stdout, in this order:
 *   1. The topic path on which the message was received.
 *   2. A hexdump of the message content.
 *   3. The headers associated with the message.
 *   4. The session properties that were requested when the handler was
 *      registered.
 *   5. The user context, as a string (only when one was supplied).
 */
int
on_msg(SESSION_T *session, const SVC_SEND_RECEIVER_CLIENT_REQUEST_T *request, void *context)
{
        printf("Received message on topic path %s\n", request->topic_path);
        hexdump_buf(request->content->data);

        /* Walk the header list, or say so when it is empty. */
        printf("Headers:\n");
        LIST_NODE_T *header = request->send_options.headers->first;
        if(header == NULL) {
                printf("  No headers\n");
        }
        while(header != NULL) {
                printf("  Header: %s\n", (char *)header->data);
                header = header->next;
        }

        /* hash_keys() yields a NULL-terminated array of property names. */
        printf("Session properties:\n");
        char **property_names = hash_keys(request->session_properties);
        if(property_names != NULL && *property_names != NULL) {
                char **name = property_names;
                while(*name != NULL) {
                        char *value = hash_get(request->session_properties, *name);
                        printf("  %s=%s\n", *name, value);
                        name++;
                }
        }
        else {
                printf("  No properties\n");
        }
        /* Only the array itself is released here, not the names in it. */
        free(property_names);

        if(context != NULL) {
                printf("Context: %s\n", (char *)context);
        }

        return HANDLER_SUCCESS;
}

/*
 * Entry point: connects to Diffusion, registers a message handler on the
 * requested topic path for 30 seconds, then deregisters and disconnects.
 *
 * Returns EXIT_SUCCESS, or EXIT_FAILURE when the command line is invalid,
 * help was requested, or the session cannot be created.
 */
int
main(int argc, char **argv)
{
        /*
         * Standard command-line parsing.
         */
        HASH_T *options = parse_cmdline(argc, argv, arg_opts);
        if(options == NULL || hash_get(options, "help") != NULL) {
                show_usage(argc, argv, arg_opts);
                return EXIT_FAILURE;
        }

        char *url = hash_get(options, "url");
        const char *principal = hash_get(options, "principal");
        CREDENTIALS_T *credentials = NULL;
        const char *password = hash_get(options, "credentials");
        if(password != NULL) {
                credentials = credentials_create_password(password);
        }
        char *topic = hash_get(options, "topic");

        /*
         * Create a session with Diffusion.
         */
        SESSION_T *session = NULL;
        DIFFUSION_ERROR_T error = { 0 };
        session = session_create(url, principal, credentials, NULL, NULL, &error);
        if(session == NULL) {
                fprintf(stderr, "TEST: Failed to create session\n");
                fprintf(stderr, "ERR : %s\n", error.message);
                return EXIT_FAILURE;
        }

        char *session_id = session_id_to_string(session->id);
        printf("Session created, id=%s\n", session_id);
        free(session_id);

        /*
         * Register a message handler, and for each message ask for
         * the $Principal property to be provided.
         */
        LIST_T *requested_properties = list_create();
        list_append_last(requested_properties, "$Principal");

        MSG_RECEIVER_REGISTRATION_PARAMS_T params = {
                .on_registered = on_registered,
                .topic_path = topic,
                .on_message = on_msg,
                .session_properties = requested_properties
        };

        /*
         * params only holds a pointer to requested_properties, so the
         * list must stay alive while params is in use. It is released
         * once, at the end, and with a NULL element destructor because
         * its elements are string literals that must never be passed
         * to free().
         */
        register_msg_handler(session, params);

        /*
         * Accept messages for a while, then deregister.
         */
        sleep(30);

        deregister_msg_handler(session, params);

        /*
         * Close session and clean up.
         */
        session_close(session, NULL);
        session_free(session);

        list_free(requested_properties, NULL);

        return EXIT_SUCCESS;
}
C – Send
/*
 * This example shows how a message can be sent to another client via
 * a topic endpoint. The session ID of the target client must be
 * known.
 *
 * See msg-listener.c for an example of how to receive these messages
 * in a client.
 */

#include <stdio.h>
#include <unistd.h>

#include <apr.h>
#include <apr_thread_mutex.h>
#include <apr_thread_cond.h>

#include "diffusion.h"
#include "args.h"

/*
 * APR objects shared between main() and on_send(): main() creates them,
 * waits on cond while holding mutex, and on_send() broadcasts on cond to
 * let main() continue.
 */
apr_pool_t *pool = NULL;
apr_thread_mutex_t *mutex = NULL;
apr_thread_cond_t *cond = NULL;

/*
 * Command-line options accepted by this example. main() hands this table
 * to parse_cmdline() and then looks values up by their long name ("url",
 * "principal", "credentials", "topic", "session_id", "data").
 * NOTE(review): the last field of each entry looks like the default value
 * used when the option is omitted - confirm against args.h.
 */
ARG_OPTS_T arg_opts[] = {
        ARG_OPTS_HELP,
        {'u', "url", "Diffusion server URL", ARG_OPTIONAL, ARG_HAS_VALUE, "ws://localhost:8080"},
        {'p', "principal", "Principal (username) for the connection", ARG_OPTIONAL, ARG_HAS_VALUE, NULL},
        {'c', "credentials", "Credentials (password) for the connection", ARG_OPTIONAL, ARG_HAS_VALUE, NULL},
        {'t', "topic", "Topic name", ARG_REQUIRED, ARG_HAS_VALUE, "echo"},
        {'s', "session_id", "Session id", ARG_REQUIRED, ARG_HAS_VALUE, NULL},
        {'d', "data", "Data to send", ARG_REQUIRED, ARG_HAS_VALUE, NULL},
        END_OF_ARG_OPTS
};

/*
 * Send callback, installed as params.on_send in main(). It reports the
 * user context that was supplied with the request and then wakes the
 * main thread, which is blocked on the shared condition variable.
 */
static int
on_send(SESSION_T *session, void *context)
{
        const char *user_context = (char *)context;

        printf("on_send() successful. Context=\"%s\".\n", user_context);

        /* Signal under the mutex so main() can resume. */
        apr_thread_mutex_lock(mutex);
        apr_thread_cond_broadcast(cond);
        apr_thread_mutex_unlock(mutex);

        return HANDLER_SUCCESS;
}

int
main(int argc, char **argv)
{
        /*
         * Standard command-line parsing.
         */
        HASH_T *options = parse_cmdline(argc, argv, arg_opts);
        if(options == NULL || hash_get(options, "help") != NULL) {
                show_usage(argc, argv, arg_opts);
                return EXIT_FAILURE;
        }

        char *url = hash_get(options, "url");

        const char *principal = hash_get(options, "principal");
        CREDENTIALS_T *credentials = NULL;

        const char *password = hash_get(options, "credentials");
        if(password != NULL) {
                credentials = credentials_create_password(password);
        }

        char *topic = hash_get(options, "topic");
        char *session_id_str = hash_get(options, "session_id");

        /*
         * Setup for condition variable.
         */
        apr_initialize();
        apr_pool_create(&pool, NULL);
        apr_thread_mutex_create(&mutex, APR_THREAD_MUTEX_UNNESTED, pool);
        apr_thread_cond_create(&cond, pool);

        /*
         * Create a session with Diffusion.
         */
        SESSION_T *session = NULL;
        DIFFUSION_ERROR_T error = { 0 };
        session = session_create(url, principal, credentials, NULL, NULL, &error);
        if(session == NULL) {
                fprintf(stderr, "TEST: Failed to create session\n");
                fprintf(stderr, "ERR : %s\n", error.message);
                return EXIT_FAILURE;
        }

        /*
         * Create a payload.
         */
        char *data = hash_get(options, "data");
        BUF_T *payload = buf_create();
        buf_write_bytes(payload, data, strlen(data));
        CONTENT_T *content = content_create(CONTENT_ENCODING_NONE, payload);
        buf_free(payload);

        /*
         * Build up some headers to send with the message.
         */
        LIST_T *headers = list_create();
        list_append_last(headers, "apple");
        list_append_last(headers, "train");

        /*
         * Parameters for send_msg_to_session() call.
         */
        SESSION_ID_T *session_id = session_id_create_from_string(session_id_str);

        SEND_MSG_TO_SESSION_PARAMS_T params = {
                .topic_path = topic,
                .session_id = *session_id,
                .content = *content,
                .options.headers = headers,
                .options.priority = CLIENT_SEND_PRIORITY_NORMAL,
                .on_send = on_send,
                .context = "FOO"
        };

        /*
         * Send the message and wait for the callback to acknowledge
         * delivery.
         */
        apr_thread_mutex_lock(mutex);
        send_msg_to_session(session, params);
        apr_thread_cond_wait(cond, mutex);
        apr_thread_mutex_unlock(mutex);

        /*
         * Politely close the client connection and clean up.
         */
        session_id_free(session_id);
        session_close(session, NULL);
        session_free(session);

        apr_thread_mutex_destroy(mutex);
        apr_thread_cond_destroy(cond);
        apr_pool_destroy(pool);
        apr_terminate();

        return EXIT_SUCCESS;
}

In each example, replace the URL shown with the URL of your own Diffusion™ server.