Sending request messages to a session filter

A client session can send a request message containing typed data directly to each client session in the set of connected client sessions that match a specified session properties filter. The receiving client sessions can then send a response message containing typed data. The request and response messages are addressed through the same message path.

Note: Sending messages to a set of client sessions defined by a filter is not intended for high throughput of data. If you have a lot of data to send or want to send data to a lot of client sessions, use the pub-sub capabilities of Diffusion™. Subscribe the set of client sessions to a topic and publish the data as updates through that topic.

For more information about session properties and how to filter connected client sessions using their properties, see Session properties and Session filtering.

A control client session on the left. Diffusion in the centre. A set of client sessions that match the specified filter on the right. An arrow representing the request message goes from the control client session through a shape representing the message path inside the Diffusion server. This arrow splits into many as Diffusion sends the request on to all client sessions specified by the filter. Arrows representing the response messages go from each of the receiving client sessions back to the Diffusion server, where they are sent on to the control client session.
When a request message is sent to a set of client sessions and those sessions respond, the following events occur:
  1. A control client session sends a request message, specifying the filter that selects the client sessions to receive the request and specifying the message path to send the message through.
  2. The Diffusion server evaluates the query and sends the message on to connected client sessions whose session properties match the filter.
  3. The client sessions in the filtered set each receive the request message through a request stream.
  4. Each client session uses a responder to send a response to the request message.
  5. The control client session receives responses from each of the client sessions specified by the filter.
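
The five steps above can be modelled without a Diffusion server. The following is a minimal in-memory sketch, not the Diffusion API: `ClientSession`, `FakeServer`, and its `send_request_to_filter` method are hypothetical names introduced only for illustration, and the filter is reduced to a single property-equals-value clause, whereas real session filters support a richer query language.

```python
from dataclasses import dataclass, field
from typing import Callable, Dict, List


@dataclass
class ClientSession:
    """Hypothetical stand-in for a connected client session."""
    session_id: str
    properties: Dict[str, str]
    # Maps a message path to a responder callback (the "request stream").
    request_streams: Dict[str, Callable[[object], object]] = field(default_factory=dict)


class FakeServer:
    """Hypothetical in-memory model of the server-side fan-out."""

    def __init__(self) -> None:
        self.sessions: List[ClientSession] = []

    def send_request_to_filter(self, key, value, path, request, on_response):
        # Step 2: select the connected sessions whose property matches.
        matched = [s for s in self.sessions if s.properties.get(key) == value]
        for s in matched:
            # Step 3: deliver the request through the stream on this path.
            responder = s.request_streams.get(path)
            if responder is None:
                continue  # no stream registered on this path, so no response
            # Steps 4 and 5: the session responds and the response is
            # routed back to the sender, tagged with the session ID.
            on_response(s.session_id, responder(request))
        # The sender also learns how many sessions were selected.
        return len(matched)


server = FakeServer()
server.sessions = [
    ClientSession("s1", {"$Principal": "client"}, {"foo": lambda req: f"s1 got {req}"}),
    ClientSession("s2", {"$Principal": "client"}, {"foo": lambda req: 42}),
    ClientSession("s3", {"$Principal": "control"}, {"foo": lambda req: "unused"}),
]

responses = []
# Step 1: the control session sends the request with a filter and a path.
sent = server.send_request_to_filter(
    "$Principal", "client", "foo", "Hello clients",
    lambda session_id, response: responses.append((session_id, response)),
)
print(sent, responses)
```

In this model the count of selected sessions and the individual responses arrive separately, which mirrors how the platform examples later in this section report a number of sessions and deliver each response through a callback.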

The request messages and the response messages contain typed values. The messages can contain data of one of the following types: JSON, binary, string, 64-bit integer, or double. The response messages are not required to be the same data type as the request or as the response messages from other client sessions.
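
To illustrate that each response can carry a different type from the request and from the other responses, here is a small sketch that classifies plain Python values into the five categories listed above. The helper `datatype_name` and the labels it returns are assumptions made for this example and are not part of any Diffusion client library. It shows only one possible mapping, since a string, for instance, could equally be sent as JSON.

```python
import json

# Bounds of a signed 64-bit integer.
INT64_MIN, INT64_MAX = -(2 ** 63), 2 ** 63 - 1


def datatype_name(value):
    """Return an illustrative datatype label for a Python value (hypothetical helper)."""
    if isinstance(value, bool):
        return "json"  # bool is a subclass of int in Python; treat it as JSON
    if isinstance(value, int):
        if not INT64_MIN <= value <= INT64_MAX:
            raise ValueError("integer does not fit in 64 bits")
        return "int64"
    if isinstance(value, float):
        return "double"
    if isinstance(value, str):
        return "string"
    if isinstance(value, (bytes, bytearray)):
        return "binary"
    json.dumps(value)  # raises TypeError if the value is not JSON-serialisable
    return "json"


request = "Time"
# Responses from four different sessions, each of a different type.
responses = [1700000000, 3.5, {"ok": True}, b"\x00\x01"]

request_type = datatype_name(request)
response_types = [datatype_name(r) for r in responses]
print(request_type, response_types)
```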

Sending a request message to a filter

Required permissions: send_to_session permission for the specified message path and register_handler permission

Usually, it is a control client session in your organization's backend that sends messages to a filter. For more information about defining a session filter, see Session filtering.

Send the request message specifying the following information:
  • The filter query that selects the client sessions to send the request to
  • The message path to send the request and receive the responses through
  • The request message
  • The datatype of the request message
  • The datatype of the response message
JavaScript
const handler = {
    onResponse : (sessionID, response) => {
        // response received
    },
    onResponseError : (sessionID, error) => {
        // the response from the session resulted in an error
    },
    onError : (error) => {
        // an error occurred
    }
};

control.messages.sendRequestToFilter(filter, 'foo', 'Hello clients', handler, diffusion.datatypes.json(), diffusion.datatypes.json());
.NET
/**
 * Copyright © 2021, 2022 Push Technology Ltd.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

using System;
using System.Threading;
using System.Threading.Tasks;
using PushTechnology.ClientInterface.Client.Callbacks;
using PushTechnology.ClientInterface.Client.Factories;
using PushTechnology.ClientInterface.Client.Features;
using PushTechnology.ClientInterface.Client.Session;
using static System.Console;

namespace PushTechnology.ClientInterface.Example {
    /// <summary>
    /// Client implementation that sends a request message to a filter and 
    /// displays the response.
    /// </summary>
    public sealed class SendingFilterRequestMessages {
        private readonly string messagingPath = ">random/requestResponse";

        public async Task SendingFilterRequestMessagesExample(string serverUrl) {

            var session = Diffusion.Sessions.Principal( "control" ).Password( "password" )
                .CertificateValidation((cert, chain, errors) => CertificateValidationResult.ACCEPT)
                .Open(serverUrl);

            var messaging = session.Messaging;
            var requestCallback = new RequestCallback();

            int requestsSent = await messaging.SendRequestToFilterAsync(
                "$Principal EQ 'client'",
                messagingPath,
                "Time",
                requestCallback );
            WriteLine( $"Sent request to {requestsSent} session(s) matching the filter." );
                                
            // Close the session
            session.Close();
        }

        /// <summary>
        /// A simple IFilteredRequestCallback implementation that prints confirmation of the actions completed.
        /// </summary>
        private class RequestCallback : IFilteredRequestCallback<string> {
            /// <summary>
            /// Indicates that the stream was closed.
            /// </summary>
            public void OnClose()
                => WriteLine( "A request handler was closed." );

            /// <summary>
            /// Indicates error received by the callback.
            /// </summary>
            public void OnError( ErrorReason errorReason )
                => WriteLine( $"A request handler has received error: '{errorReason}'." );

            /// <summary>
            /// Indicates that a response message was received.
            /// </summary>
            public void OnResponse( ISessionId sessionId, string response )
                => WriteLine( $"Received response: '{response}'." );

            /// <summary>
            /// Indicates that an error response message was received.
            /// </summary>
            public void OnResponseError( ISessionId sessionId, Exception exception )
                => WriteLine( $"Response error received from session {sessionId}: '{exception}'." );
        }
    }
}
Java and Android
//Establish control session
final Session control = Diffusion.sessions().principal("control").password("password").open("ws://localhost:8080");

//Obtain the MessagingControl feature
final MessagingControl messagingControl = control.feature(MessagingControl.class);

//Create a JSON object to send as a request
final JSON request = Diffusion.dataTypes().json().fromJsonString("\"hello\"");

//Send the request on message path "foo" to all sessions that do not have a 'control' principal, and wait (at most 5 seconds) for the server to report the number of sessions the request was sent to.
final int numberOfResponses = messagingControl.sendRequestToFilter("$Principal NE 'control'", "foo", request, JSON.class, JSON.class).get(5, TimeUnit.SECONDS);
                    
C
BUF_T *message_buf = buf_create();
write_diffusion_string_value(message, message_buf);

SEND_REQUEST_TO_FILTER_PARAMS_T params = {
        .path = message_path,
        .filter = session_filter,
        .request_datatype = DATATYPE_STRING,
        .response_datatype = DATATYPE_STRING,
        .request = message_buf,
        .on_response = on_message_response,
        .on_number_sent = on_selected_sessions_received
};

send_request_to_filter(session, params);
buf_free(message_buf);
Python
# sending the request and receiving the number of expected responses
print(f"Sending request: '{request}' to session filter `{session_filter}`...")
try:
    response = await session.messaging.send_request_to_filter(
        session_filter=session_filter,
        path=path,
        request=request_type(request),
    )
except diffusion.DiffusionError as ex:
    print(f"ERROR while sending request to session filter: {ex}")
else:
    print(f"... expecting {response} response(s) ...")
Apple
//  Copyright (C) 2021 Push Technology Ltd.
//
//  Licensed under the Apache License, Version 2.0 (the "License");
//  you may not use this file except in compliance with the License.
//  You may obtain a copy of the License at
//  http://www.apache.org/licenses/LICENSE-2.0
//
//  Unless required by applicable law or agreed to in writing, software
//  distributed under the License is distributed on an "AS IS" BASIS,
//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//  See the License for the specific language governing permissions and
//  limitations under the License.

import Foundation
import Diffusion

class Messaging {

    func to_session(session: PTDiffusionSession, session_id: PTDiffusionSessionId, message_path: String) {
        let json_request = try! PTDiffusionJSON(jsonString: "{\"hello\": \"world\"}").request

        session.messaging.send(json_request,
                               to: session_id,
                               path: message_path,
                               jsonCompletionHandler: { (response: PTDiffusionJSON?, error) in
            if (error != nil) {
                print("Failed to send message to %@. Error: %@", message_path, error!.localizedDescription)
            }
            else {
                print("Received response: %@", response!)
            }
        })
    }


    class JSONRequestStreamDelegate: PTDiffusionJSONRequestStreamDelegate {
        func diffusionStream(_ stream: PTDiffusionStream,
                             didReceiveRequestWith json: PTDiffusionJSON,
                             responder: PTDiffusionResponder) {
            print("Received request %@", json)
            let json_response = try! PTDiffusionJSON(jsonString: "{\"greetings\": \"stranger\"}").response
            responder.respond(with: json_response)
        }

        func diffusionStream(_ stream: PTDiffusionStream, didFailWithError error: Error) {
            print("Request stream failed with error %@", error.localizedDescription)
        }

        func diffusionDidClose(_ stream: PTDiffusionStream) {
            print("Request stream is now closed")
        }
    }


    func session_register_handler(session: PTDiffusionSession, message_path: String) {
        let stream_delegate = JSONRequestStreamDelegate()
        let request_stream = PTDiffusionJSON.requestStream(with: stream_delegate)
        session.messaging.setRequestStream(request_stream, forPath: message_path)
    }


    func to_path(session: PTDiffusionSession, session_id: PTDiffusionSessionId, message_path: String) {
        let json_request = try! PTDiffusionJSON(jsonString: "{\"hello\": \"world\"}").request

        session.messaging.send(json_request, toPath: message_path) { (response: PTDiffusionJSON?, error) in
            if (error != nil) {
                print("Failed to send message to %@. Error: %@", message_path, error!.localizedDescription)
            }
            else {
                print("Received response: %@", response!)
            }
        }
    }


    class JSONRequestHandler: PTDiffusionJSONRequestDelegate {
        func diffusionTopicTreeRegistration(_ registration: PTDiffusionTopicTreeRegistration,
                                            didReceiveRequestWith json: PTDiffusionJSON,
                                            context: PTDiffusionRequestContext,
                                            responder: PTDiffusionResponder) {
            print("Received request: %@", json)
            let json_response = try! PTDiffusionJSON(jsonString: "{\"greetings\": \"stranger\"}").response
            responder.respond(with: json_response)
        }

        func diffusionTopicTreeRegistrationDidClose(_ registration: PTDiffusionTopicTreeRegistration) {
            print("Message path is now closed")
        }

        func diffusionTopicTreeRegistration(_ registration: PTDiffusionTopicTreeRegistration,
                                            didFailWithError error: Error) {
            print("Message path failed with error: %@", error.localizedDescription)
        }
    }


    func path_register_handler(session: PTDiffusionSession, message_path: String) {
        let handler = JSONRequestHandler()
        let request_stream = PTDiffusionJSON.requestHandler(with: handler)
        session.messaging.add(request_stream, forPath: message_path) { (registration, error) in
            if (error != nil) {
                print("An error has occurred while registering the message path %@. Error: %@",
                      message_path, error!.localizedDescription)
            }
            else {
                print("Message path %@ has been successfully registered.", message_path)
            }
        }
    }


    func to_filter(session: PTDiffusionSession,
                   session_id: PTDiffusionSessionId,
                   message_path: String,
                   session_properties_filter: String) {
        class JSONSessionResponseHandler: PTDiffusionJSONSessionResponseStreamDelegate {
            func diffusionStream(_ stream: PTDiffusionStream,
                                 didReceiveResponseWith json: PTDiffusionJSON,
                                 from sessionId: PTDiffusionSessionId) {
                print("Received response from %@: %@", sessionId, json)
            }

            func diffusionStream(_ stream: PTDiffusionStream,
                                 didReceiveError error: Error,
                                 from sessionId: PTDiffusionSessionId) {
                print("Received error from %@: %@", sessionId, error)
            }

            func diffusionStream(_ stream: PTDiffusionStream,
                                 didFailWithError error: Error) {
                print("Stream failed with error: %@", error.localizedDescription)
            }

            func diffusionDidClose(_ stream: PTDiffusionStream) {
                print("Stream is now closed")
            }
        }

        // create the request
        let json_request = try! PTDiffusionJSON(jsonString: "{\"hello\": \"world\"}").request

        // create the handler that will receive the response
        let handler = JSONSessionResponseHandler()
        let response_stream = PTDiffusionJSON.sessionResponseStream(with: handler)

        // send the message request
        session.messaging.send(json_request,
                               toFilter: session_properties_filter,
                               path: message_path,
                               responseStream: response_stream) { (selectedSessions, error) in

            if (error != nil) {
                print("Error occurred while sending message to filter: %@", error!.localizedDescription)
            }
            else {
                print("Message successfully sent to %lu sessions", selectedSessions)
            }
        }
    }


}

Responding to messages sent to a filter

Required permissions: send_to_message_handler for the specified message path

To the receiving client session, a request message sent to a filter is the same as a request message sent directly to the session. The receiving client session responds in the same way.

See Responding to messages sent to a session for details.