Example: Send a message to a topic path
The following examples use the Unified API to send a message to a topic path. The message is received by a handler that has registered to receive messages on that topic path.
JavaScript
var diffusion = require('diffusion');

// Connect to the server. Change these options to suit your own environment.
// Node.js will not accept self-signed certificates by default. If you have
// one of these, set the environment variable NODE_TLS_REJECT_UNAUTHORIZED=0
// before running this example.
diffusion.connect({
    host : 'diffusion.example.com',
    port : 443,
    secure : true,
    principal : 'control',
    credentials : 'password'
}).then(function(session) {

    // 1. Messages can be sent & received between sessions.

    // Create a stream of received messages for a specific path
    session.messages.listen('foo').on('message', function(msg) {
        console.log('Received message: ' + msg.content);
    });

    // Send a message to another session. It is the application's
    // responsibility to find the SessionID of the intended recipient.
    session.messages.send('foo', 'Hello world', 'another-session-id');

    // 2. Messages can also be sent without a recipient, in which case they
    // will be dispatched to any Message Handlers that have been registered for
    // the same path. If multiple handlers are registered to the same path, any
    // given message will only be dispatched to one handler.

    // Register the handler to receive messages at or below the given path.
    session.messages.addHandler('foo', {
        onActive : function() {
            console.log('Handler registered');
        },
        onClose : function() {
            console.log('Handler closed');
        },
        onMessage : function(msg) {
            console.log('Received message:' + msg.content + ' from Session: ' + msg.session);

            // Properties are optional, so only log them when present.
            if (msg.properties) {
                console.log('with properties:', msg.properties);
            }
        }
    }).then(function() {
        console.log('Registered handler');
    }, function(e) {
        console.log('Failed to register handler: ', e);
    });

    // Send a message at a lower path, without an explicit recipient - this
    // will be received by the Handler.
    session.messages.send('foo/bar', 'Another message');
}, function(e) {
    // Without a rejection callback a failed connection attempt would go
    // unreported; this mirrors the failure handling used for addHandler above.
    console.log('Failed to connect: ', e);
});
Apple
@import Diffusion;

/**
 * Example that opens a Diffusion session and then sends an incrementing
 * counter value to the topic path "foo/bar" once a second.
 */
@implementation MessagingSendExample {
    PTDiffusionSession *_session;
    NSUInteger _nextValue;
}

/**
 * Opens a session against the given server URL and, once it is open,
 * schedules a repeating one-second timer that calls -sendMessage:.
 *
 * @param url The URL of the Diffusion server to connect to.
 */
- (void)startWithURL:(NSURL *)url {
    NSLog(@"Connecting...");

    [PTDiffusionSession openWithURL:url
                  completionHandler:^(PTDiffusionSession *session, NSError *error)
    {
        if (!session) {
            NSLog(@"Failed to open session: %@", error);
            return;
        }

        // At this point we now have a connected session.
        NSLog(@"Connected.");

        // Set ivar to maintain a strong reference to the session.
        // The ivar is reached through an explicit self-> so the strong capture
        // of self by this block is visible rather than implicit.
        self->_session = session;

        // Create a timer to send a message once a second.
        // NOTE: a repeating NSTimer retains its target, so this object stays
        // alive for as long as the timer is scheduled.
        NSTimer *const timer = [NSTimer timerWithTimeInterval:1.0
                                                       target:self
                                                     selector:@selector(sendMessage:)
                                                     userInfo:session
                                                      repeats:YES];
        [[NSRunLoop currentRunLoop] addTimer:timer forMode:NSDefaultRunLoopMode];
    }];
}

/**
 * Timer callback: sends the next counter value, encoded as a UTF-8 decimal
 * string, to the topic path "foo/bar" and logs the outcome.
 *
 * @param timer The firing timer; its userInfo carries the session to send on.
 */
- (void)sendMessage:(NSTimer *const)timer {
    PTDiffusionSession *const session = timer.userInfo;

    const NSUInteger value = _nextValue++;

    // NSUInteger is unsigned, so it is cast to unsigned long to match %lu.
    NSString *const text = [NSString stringWithFormat:@"%lu", (unsigned long)value];
    NSData *const data = [text dataUsingEncoding:NSUTF8StringEncoding];
    PTDiffusionContent *const content = [[PTDiffusionContent alloc] initWithData:data];

    NSLog(@"Sending %lu...", (unsigned long)value);

    [session.messaging sendWithTopicPath:@"foo/bar"
                                   value:content
                                 options:[[PTDiffusionSendOptions alloc] init]
                       completionHandler:^(NSError *const error)
    {
        if (error) {
            NSLog(@"Failed to send. Error: %@", error);
        } else {
            NSLog(@"Sent");
        }
    }];
}

@end
Java and Android
package com.pushtechnology.diffusion.examples;

import java.util.List;

import com.pushtechnology.diffusion.client.Diffusion;
import com.pushtechnology.diffusion.client.features.Messaging;
import com.pushtechnology.diffusion.client.features.Messaging.SendCallback;
import com.pushtechnology.diffusion.client.features.Messaging.SendContextCallback;
import com.pushtechnology.diffusion.client.session.Session;
import com.pushtechnology.diffusion.datatype.json.JSON;

/**
 * Example client that sends messages to a topic path through the
 * 'Messaging' feature.
 * <p>
 * The client session needs the 'send_to_message_handler' permission in order
 * to send a message on a topic path.
 *
 * @author Push Technology Limited
 * @since 5.0
 */
public final class ClientSendingMessages {

    private final Session session;
    private final Messaging messaging;

    /**
     * Opens a session as principal "client" and obtains its Messaging feature.
     */
    public ClientSendingMessages() {
        this.session =
            Diffusion.sessions()
                .principal("client")
                .password("password")
                .open("ws://diffusion.example.com:80");
        this.messaging = this.session.feature(Messaging.class);
    }

    /**
     * Sends a plain string message to the given topic path.
     * <p>
     * No context accompanies the message, so the outcome is reported through
     * the no-context callback.
     *
     * @param topicPath the topic path
     * @param message the message to send
     * @param callback notifies message sent
     */
    public void send(String topicPath, String message, SendCallback callback) {
        this.messaging.send(topicPath, message, callback);
    }

    /**
     * Sends a JSON object to the given topic path.
     *
     * @param topicPath the topic path
     * @param message the JSON object to send
     * @param callback notifies message sent
     */
    public void send(String topicPath, JSON message, SendCallback callback) {
        this.messaging.send(topicPath, message, callback);
    }

    /**
     * Sends a plain string message to the given topic path together with a
     * context string.
     * <p>
     * The outcome is reported through the contextual callback, which is
     * handed the supplied context string.
     *
     * @param topicPath the topic path
     * @param message the message to send
     * @param context the context string to return with the callback
     * @param callback notifies message sent
     */
    public void send(
        String topicPath,
        String message,
        String context,
        SendContextCallback<String> callback) {

        this.messaging.send(topicPath, message, context, callback);
    }

    /**
     * Sends a string message with headers to the given topic path.
     * <p>
     * No context accompanies the message, so the outcome is reported through
     * the no-context callback.
     *
     * @param topicPath the topic path
     * @param message the message to send
     * @param headers the headers to send with the message
     * @param callback notifies message sent
     */
    public void sendWithHeaders(
        String topicPath,
        String message,
        List<String> headers,
        SendCallback callback) {

        // The string is wrapped as content, and the headers travel in the
        // send options built for this call.
        this.messaging.send(
            topicPath,
            Diffusion.content().newContent(message),
            this.messaging.sendOptionsBuilder().headers(headers).build(),
            callback);
    }

    /**
     * Closes the underlying session.
     */
    public void close() {
        this.session.close();
    }
}
.NET
using System.Collections.Generic;
using PushTechnology.ClientInterface.Client.Factories;
using PushTechnology.ClientInterface.Client.Features;
using PushTechnology.ClientInterface.Client.Session;

namespace Examples {
    /// <summary>
    /// This is a simple example of a client that uses the 'Messaging' feature to send messages on a topic path.
    ///
    /// To send messages on a topic path, the client session requires the
    /// <see cref="TopicPermission.SEND_TO_MESSAGE_HANDLER"/> permission.
    /// </summary>
    public class ClientSendingMessages {
        private readonly ISession session;
        private readonly IMessaging messaging;

        /// <summary>
        /// Constructs a message sending application.
        /// </summary>
        public ClientSendingMessages() {
            session = Diffusion.Sessions.Principal( "client" ).Password( "password" )
                .Open( "ws://diffusion.example.com:80" );

            messaging = session.GetMessagingFeature();
        }

        /// <summary>
        /// Sends a simple string message to a specified topic path.
        ///
        /// There will be no context with the message so callback will be directed to the 'no context' callback.
        /// </summary>
        /// <param name="topicPath">The topic path.</param>
        /// <param name="message">The message to send.</param>
        /// <param name="callback">Notifies that the message was sent.</param>
        public void Send( string topicPath, string message, ISendCallback callback ) {
            messaging.Send( topicPath, Diffusion.Content.NewContent( message ), callback );
        }

        /// <summary>
        /// Sends a simple string message to a specified topic path with context string.
        ///
        /// The callback will be directed to the contextual callback with the string provided.
        /// </summary>
        /// <param name="topicPath">The topic path.</param>
        /// <param name="message">The message to send.</param>
        /// <param name="context">The context string to return with the callback.</param>
        /// <param name="callback">Notifies that the message was sent.</param>
        public void Send( string topicPath, string message, string context, ISendContextCallback<string> callback ) {
            messaging.Send( topicPath, Diffusion.Content.NewContent( message ), context, callback );
        }

        /// <summary>
        /// Sends a string message to a specified topic with headers.
        ///
        /// There will be no context with the message so callback will be directed to the 'no context' callback.
        /// </summary>
        /// <param name="topicPath">The topic path.</param>
        /// <param name="message">The message to send.</param>
        /// <param name="headers">The headers to send with the message.</param>
        /// <param name="callback">Notifies that the message was sent.</param>
        public void SendWithHeaders( string topicPath, string message, List<string> headers, ISendCallback callback ) {
            // The headers are carried in the send options built for this call.
            messaging.Send(
                topicPath,
                Diffusion.Content.NewContent( message ),
                messaging.CreateSendOptionsBuilder().SetHeaders( headers ).Build(),
                callback );
        }

        /// <summary>
        /// Close the session.
        /// </summary>
        public void Close() {
            session.Close();
        }
    }
}
C
/* * This example shows how a message can be sent from a client to a * message handler via a topic path. * * See msg-handler.c for an example of how to receive these messages * in a control client. */ #include <stdio.h> #include <unistd.h> #include <apr.h> #include <apr_thread_mutex.h> #include <apr_thread_cond.h> #include "diffusion.h" #include "args.h" apr_pool_t *pool = NULL; apr_thread_mutex_t *mutex = NULL; apr_thread_cond_t *cond = NULL; ARG_OPTS_T arg_opts[] = { ARG_OPTS_HELP, {'u', "url", "Diffusion server URL", ARG_OPTIONAL, ARG_HAS_VALUE, "ws://localhost:8080"}, {'p', "principal", "Principal (username) for the connection", ARG_OPTIONAL, ARG_HAS_VALUE, NULL}, {'c', "credentials", "Credentials (password) for the connection", ARG_OPTIONAL, ARG_HAS_VALUE, NULL}, {'t', "topic", "Topic name", ARG_REQUIRED, ARG_HAS_VALUE, "echo"}, {'d', "data", "Data to send", ARG_REQUIRED, ARG_HAS_VALUE, NULL}, END_OF_ARG_OPTS }; /* * Callback invoked when/if a message is sent on the topic that the * client is writing to. */ static int on_send(SESSION_T *session, void *context) { printf("on_send() successful. Context=\"%s\".\n", (char *)context); /* * Allow main thread to continue. */ apr_thread_mutex_lock(mutex); apr_thread_cond_broadcast(cond); apr_thread_mutex_unlock(mutex); return HANDLER_SUCCESS; } int main(int argc, char **argv) { /* * Standard command-line parsing. */ HASH_T *options = parse_cmdline(argc, argv, arg_opts); if(options == NULL || hash_get(options, "help") != NULL) { show_usage(argc, argv, arg_opts); return EXIT_FAILURE; } char *url = hash_get(options, "url"); const char *principal = hash_get(options, "principal"); CREDENTIALS_T *credentials = NULL; const char *password = hash_get(options, "credentials"); if(password != NULL) { credentials = credentials_create_password(password); } char *topic = hash_get(options, "topic"); /* * Setup for condition variable. 
*/ apr_initialize(); apr_pool_create(&pool, NULL); apr_thread_mutex_create(&mutex, APR_THREAD_MUTEX_UNNESTED, pool); apr_thread_cond_create(&cond, pool); /* * Create a session with Diffusion. */ SESSION_T *session = NULL; DIFFUSION_ERROR_T error = { 0 }; session = session_create(url, principal, credentials, NULL, NULL, &error); if(session == NULL) { fprintf(stderr, "TEST: Failed to create session\n"); fprintf(stderr, "ERR : %s\n", error.message); return EXIT_FAILURE; } /* * Create a payload. */ char *data = hash_get(options, "data"); BUF_T *payload = buf_create(); buf_write_bytes(payload, data, strlen(data)); /* * Build up a list of some headers to send with the message. */ LIST_T *headers = list_create(); list_append_last(headers, "apple"); list_append_last(headers, "train"); /* * Parameters for send_msg() call. */ SEND_MSG_PARAMS_T params = { .topic_path = topic, .payload = *payload, .headers = headers, .priority = CLIENT_SEND_PRIORITY_NORMAL, .on_send = on_send, .context = "FOO" }; /* * Send the message and wait for the callback to acknowledge * delivery. */ apr_thread_mutex_lock(mutex); send_msg(session, params); apr_thread_cond_wait(cond, mutex); apr_thread_mutex_unlock(mutex); /* * Politely close the client connection and tidy up. */ session_close(session, NULL); session_free(session); apr_thread_mutex_destroy(mutex); apr_thread_cond_destroy(cond); apr_pool_destroy(pool); apr_terminate(); return EXIT_SUCCESS; }
Replace the URL used in the examples with the URL of your Diffusion™ server.
This page last modified: 2015/02/10