Sending request messages to a session filter
A client session can send a request message containing typed data directly to each client session in the set of connected client sessions that match a specified session properties filter. The receiving client sessions can then send a response message containing typed data. The request and response messages are addressed through the same message path.
For more information about session properties and how to filter connected client sessions using their properties, see Session properties and Session filtering.
- A control client session sends a request message, specifying the filter that selects the client sessions to receive the request and specifying the message path to send the message through.
- Diffusion Cloud evaluates the filter query and sends the message on to the connected client sessions whose session properties match the filter.
- The client sessions in the filtered set each receive the request message through a request stream.
- Each client session uses a responder to send a response to the request message.
- The control client session receives the responses from each of the client sessions selected by the filter.
The request messages and the response messages contain typed values. The messages can contain data of one of the following types: JSON, binary, string, 64-bit integer, or double. The response messages are not required to be the same data type as the request or as the response messages from other client sessions.
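The flow described above can be sketched as a small in-memory model. This is an illustrative sketch only, not the Diffusion client API: the `sessions` array, the `requestStreams` map, and this local `sendRequestToFilter` function are hypothetical names, and the filter is modeled as a plain predicate over session properties rather than a parsed filter query.

```javascript
// Hypothetical in-memory model of request/response messaging to a filter.
// None of these names come from the Diffusion client library.

// Connected client sessions: each has session properties and, per message
// path, an optional request stream that produces a response.
const sessions = [
  { id: 's1', properties: { $Principal: 'control' }, requestStreams: {} },
  { id: 's2', properties: { $Principal: 'alice' },
    requestStreams: { foo: (request) => ({ echo: request }) } },
  { id: 's3', properties: { $Principal: 'bob' },
    requestStreams: { foo: (request) => 'received ' + request } },
  { id: 's4', properties: { $Principal: 'carol' }, requestStreams: {} },
];

// Deliver `request` on `path` to every session whose properties satisfy the
// `matches` predicate, and pass each outcome to `handler`.
// Returns the number of sessions selected by the filter.
function sendRequestToFilter(matches, path, request, handler) {
  const selected = sessions.filter((s) => matches(s.properties));
  for (const s of selected) {
    const stream = s.requestStreams[path];
    if (stream === undefined) {
      // The session matched the filter but has no request stream for the path.
      handler.onResponseError(s.id, new Error('no request stream for ' + path));
    } else {
      handler.onResponse(s.id, stream(request));
    }
  }
  return selected.length;
}

const responses = [];
const errors = [];
const selectedCount = sendRequestToFilter(
  (props) => props.$Principal !== 'control', // stand-in for a filter query
  'foo',
  'hello',
  {
    onResponse: (id, response) => responses.push([id, response]),
    onResponseError: (id, error) => errors.push([id, error.message]),
  }
);
```

In this model the two responding sessions return values of different types (an object and a string), and the session without a request stream is reported through the error callback rather than silently dropped.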
Sending a request message to a filter
Required permissions:
permission for the specified message path and permission
Usually, it is a control client session in your organization's backend that sends messages to a filter. For more information about defining a session filter, see Session filtering.
Send the request message, specifying the following information:
- The query to use to filter which client sessions to send the requests to
- The message path to send the request and receive the responses through
- The request message
- The datatype of the request message
- The datatype of the response message
var handler = {
    onResponse : function(sessionID, response) {},
    onResponseError : function(sessionID, error) {},
    onError : function(error) {}
};

control.messages.sendRequestToFilter(filter, 'foo', 'Hello clients', handler, diffusion.datatypes.json(), diffusion.datatypes.json());
// Establish the control session
final Session control = Diffusion.sessions().principal("control").password("password").open("ws://localhost:8080");

// Obtain the MessagingControl feature
final MessagingControl messagingControl = control.feature(MessagingControl.class);

// Create a JSON value to send as a request
final JSON request = Diffusion.dataTypes().json().fromJsonString("\"hello\"");

// Send the request on message path "foo" to all sessions that do not have a
// 'control' principal, then wait for at most 5 seconds until the number of
// responses is received.
final int numberOfResponses = messagingControl.sendRequestToFilter("$Principal NE 'control'", "foo", request, JSON.class, JSON.class).get(5, TimeUnit.SECONDS);
Responding to messages sent to a filter
Required permissions:
for the specified message path
To the receiving client session, a request message sent to a filter is the same as a request message sent directly to the session. The receiving client session responds in the same way.
See Responding to messages sent to a session for details.
One-way messaging
Diffusion Cloud can also send one-way messages to a filter. A one-way message is not typed, and the receiving sessions cannot respond to it directly. Send the message, specifying the following information:
- The query to use to filter which client sessions to send the message to
- The message path to send the messages through
- The message content
- Any additional options, such as headers or a message priority
session.messages.send('message_path', content, filter_query);
[session.messagingControl sendToFilter:filter
                                  path:message_path
                               message:[[PTDiffusionBytes alloc] initWithData:data]
                     completionHandler:^(NSUInteger count, NSError* error)
{
    if (error) {
        NSLog(@"Failed to send to %@. Error: %@", message_path, error);
    } else {
        NSLog(@"Sent to %lu sessions", (unsigned long)count);
    }
}];
var options = session.MessagingControl.CreateSendOptionsBuilder().SetHeaders( headers ).Build();
session.MessagingControl.SendToFilter( filter_query, message_path, content, options, send_callback );
/*
 * Parameters for send_msg_to_filter() call.
 */
SEND_MSG_TO_FILTER_PARAMS_T params = {
        .topic_path = message_path,
        .filter = filter,
        .content = message_content,
        .options.headers = headers,
        .options.priority = CLIENT_SEND_PRIORITY_NORMAL,
        .on_send = on_send_callback,
        .context = context
};

/*
 * Send the message and wait for the callback to acknowledge delivery.
 */
send_msg_to_filter(session, params);
To the receiving client session, a one-way message sent to a filter is the same as a one-way message sent directly to that session. The receiving client session receives the message in the same way.
To receive a message sent directly to a client session, that client session must add a message stream to receive messages sent through that message path:
// Create with a default listener function
session.messages.listen('message_path', function(message) {
    // Do something with the message
});
@interface MessageStreamDelegate : NSObject <PTDiffusionMessageStreamDelegate>
@end

@implementation MessageStreamDelegate

-(void)          diffusionStream:(PTDiffusionStream *)stream
    didReceiveMessageOnTopicPath:(NSString *)path
                         content:(PTDiffusionContent *)content
                         context:(PTDiffusionReceiveContext *)context {
    // Do something when a message is received.
}

-(void)diffusionDidCloseStream:(PTDiffusionStream *)stream {
    // Do something if the stream closes.
}

-(void)diffusionStream:(PTDiffusionStream *)stream didFailWithError:(NSError *)error {
    // Do something if the stream fails.
}

// ... in later code

// Ensure to maintain a strong reference to your message stream delegate as it
// is referenced weakly by the Diffusion client library.
MessageStreamDelegate *const delegate = [MessageStreamDelegate new];

// Create a locally evaluated topic selector specifying the messaging paths that
// should be captured by the stream.
PTDiffusionTopicSelector *const topicSelector = [PTDiffusionTopicSelector topicSelectorWithExpression:message_path];

// Use the Messaging feature to add a local stream using your delegate against
// the topic selector.
[session.messaging addMessageStreamWithSelector:topicSelector delegate:delegate];
session.feature(Messaging.class).addMessageStream(message_path, stream);
session.Messaging.AddMessageStream( message_path, stream );
/*
 * Register a listener for messages on the given path.
 */
MSG_LISTENER_REGISTRATION_PARAMS_T listener_params = {
        .topic_path = message_path,
        .listener = on_stream_message,
        .context = context
};

register_msg_listener(session, listener_params);
You can also add a fallback message stream to receive messages sent through any message path that does not have a stream added against it:
// Ensure to maintain a strong reference to your message stream delegate as it
// is referenced weakly by the Diffusion client library.
MessageStreamDelegate *const delegate = [MessageStreamDelegate new];

// Use the Messaging feature to add a local fallback stream using your delegate.
[session.messaging addFallbackMessageStreamWithDelegate:delegate];
messaging.addFallbackMessageStream(message_stream);
session.Messaging.AddFallbackMessageStream( stream );
/*
 * Register a listener for any other messages.
 * (.topic_path is NULL).
 */
MSG_LISTENER_REGISTRATION_PARAMS_T global_listener_params = {
        .listener = on_stream_message,
        .context = context
};

register_msg_listener(session, global_listener_params);
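The relationship between path-specific message streams and a fallback stream can be illustrated with a minimal dispatch model. This is a sketch under stated assumptions and not the Diffusion client library: `createMessaging` and `dispatch` are hypothetical names, and paths are matched by exact string equality, whereas the real client libraries match paths with topic selectors.

```javascript
// Hypothetical model of message stream dispatch; not the Diffusion client API.
function createMessaging() {
  const streams = new Map(); // message path -> listeners added for that path
  const fallbacks = [];      // listeners for paths with no stream of their own

  return {
    addMessageStream(path, listener) {
      if (!streams.has(path)) {
        streams.set(path, []);
      }
      streams.get(path).push(listener);
    },
    addFallbackMessageStream(listener) {
      fallbacks.push(listener);
    },
    // Deliver to the streams added for this exact path (an assumption of this
    // model); if there are none, deliver to the fallback streams instead.
    // Returns the number of listeners that received the message.
    dispatch(path, content) {
      const targets = streams.get(path) || fallbacks;
      targets.forEach((listener) => listener(path, content));
      return targets.length;
    },
  };
}

const messaging = createMessaging();
const received = [];

messaging.addMessageStream('message_path', (path, content) => {
  received.push('stream:' + content);
});
messaging.addFallbackMessageStream((path, content) => {
  received.push('fallback:' + path + ':' + content);
});

messaging.dispatch('message_path', 'a'); // handled by the path-specific stream
messaging.dispatch('other_path', 'b');   // no stream for this path: fallback
```

In this model a message reaches the fallback stream only when no stream has been added for its path, so adding a path-specific stream takes that path away from the fallback.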
To respond to this message, the receiving client session sends a separate message to the message path through which the received message was sent. For more information, see One-way messaging to a path. However, if multiple client sessions have added message handlers on this message path, the one-way message sent in response is not guaranteed to be received by the client session that sent the original one-way message.
This page last modified: 2019/11/22