Just a second...

Sending request messages to a session

A client session can send a request message containing typed data directly to a client session. The receiving client session can then send a response message containing typed data. The request and response messages are addressed through the same message path.

A control client session on the left. Diffusion in the centre. Another client session on the right. An arrow representing the request message goes from the control client session through a shape representing the message path inside the Diffusion server and continues to the other client session. An arrow representing the response message goes from the receiving client session back through the message path on the Diffusion server to the control client session.
When a request message is sent to a specific client session and that session responds, the following events occur:
  1. A control client session sends a request message to a client session, specifying the message path to send the message through and the session ID of the client session to send the request message to.
  2. The client session receives the request message through a request stream.
  3. The client session uses a responder to send a response to the request message.
  4. The control client session receives the response.

Both the request message and the response message contain typed values. The messages can contain data of one of the following types: JSON, binary, string, 64-bit integer, or double. The response message is not required to be the same data type as the request it responds to.

Sending a request to a session

Required permissions: send_to_session permission for the specified message path and register_handler permission

Usually, it is a control client session in your organization's backend that sends messages directly to other sessions.

Send the request message specifying the following information:
  • The session ID of the client session to send the request to
  • The message path to send the request and receive the response through
  • The request message
  • The datatype of the request message
  • The datatype of the response message
JavaScript
control.messages.sendRequest('foo', 'Hello client', session_id, diffusion.datatypes.json(), diffusion.datatypes.json())
                    
.NET
/**
 * Copyright © 2021 Push Technology Ltd.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

using System;
using System.Threading;
using PushTechnology.ClientInterface.Client.Callbacks;
using PushTechnology.ClientInterface.Client.Factories;
using PushTechnology.ClientInterface.Client.Features;
using PushTechnology.ClientInterface.Client.Session;
using static System.Console;

namespace PushTechnology.ClientInterface.Example {
    /// <summary>
    /// Client implementation that sends a request message to a filter, 
    /// then sends another request directly to the session, and displays the response.
    /// </summary>
    public sealed class SendingSessionRequestMessages {
        private readonly string messagingPath = ">random/requestResponse";

        public SendingSessionRequestMessages(string serverUrl) {
            string serverUrl = args[ 0 ];
            var session = Diffusion.Sessions.Principal( "control" ).Password( "password" )
                .CertificateValidation((cert, chain, errors) => CertificateValidationResult.ACCEPT)
                .Open(serverUrl);

            var messaging = session.Messaging;
            var requestCallback = new RequestCallback();

            // Filter messaging is used to get the session ID for this example
            int requestsSent = await messaging.SendRequestToFilterAsync(
                "$Principal EQ 'client'",
                messagingPath,
                "Hello?",
                requestCallback);

            Thread.Sleep( 1000 );

            // Send message to a session using obtained session ID
            string response = await messaging.SendRequestAsync<string, string>(
                requestCallback.SessionId, messagingPath, "Time");
            WriteLine($"Received response: '{response}'.");
            
            // Close the session
            session.Close();
        }

        /// <summary>
        /// A simple IFilteredRequestCallback implementation that prints confirmation of the actions completed.
        /// </summary>
        private class RequestCallback : IFilteredRequestCallback<string> {
            public ISessionId SessionId { get; private set; }

            /// <summary>
            /// Indicates that the stream was closed.
            /// </summary>
            public void OnClose()
                => WriteLine( "A request handler was closed." );

            /// <summary>
            /// Indicates error received by the callback.
            /// </summary>
            public void OnError( ErrorReason errorReason )
                => WriteLine( $"A request handler has received error: '{errorReason}'." );

            /// <summary>
            /// Indicates that a response message was received.
            /// </summary>
            public void OnResponse(ISessionId sessionId, string response) => SessionId = sessionId;

            /// <summary>
            /// Indicates that a error response message was received.
            /// </summary>
            public void OnResponseError( ISessionId sessionId, Exception exception )
                => WriteLine( $"Response error received from session {sessionId}: '{exception}'." );
        }
    }
}
Java and Android
//Establish client session and control session
final Session control = Diffusion.sessions().principal("control").password("password").open("ws://localhost:8080");
final Session client = Diffusion.sessions().principal("client").password("password").open("ws://localhost:8080");

//Obtain the Messaging and MessagingControl features
final MessagingControl messagingControl = control.feature("MessagingControl.class");
final Messaging messaging = client.feature(Messaging.class);

//Create a JSON object to send as a request
final JSON request = Diffusion.dataTypes().json().fromJsonString("\"hello\"");

//Create a local request stream for the client to receive direct requests from the control session
messaging.setRequestStream("foo", JSON.class, JSON.class, requestStream);

//Send the request to a message path "foo" and wait for (at most) 5 seconds until the response is received.
final JSON response = messagingControl.sendRequest(client.getSessionId(), "foo", request, JSON.class, JSON.class).get(5, TimeUnit.SECONDS);
                    
Python
# Sending the request and receiving the response.
print(f"Sending request: '{request}' to session {session_id}...")
try:
    response = await session.messaging.send_request_to_session(
        path=path, session_id=session_id, request=request_type(request)
    )
except diffusion.DiffusionError as ex:
    print(f"ERROR: {ex}")
else:
    print(f"... received response '{response}'")
Apple
[session.messagingControl sendRequest:[PTDiffusionPrimitive requestWithLongLong:42]
                          toSessionId:sessionId
                                 path:message_path
         int64NumberCompletionHandler:^(NSNumber *response, NSError* error)
{
    if (error) {
        NSLog(@"Failed to send to %@. Error: %@", message_path, error);
    } else {
        NSLog(@"Received response: %@", response);
    }
}];
                    

Responding to messages sent to a session

Required permissions: send_to_message_handler for the specified message path

Define a request stream to receive and respond to request messages that have a specific data type.

JavaScript
var handler = {
    onRequest : function(request, context, responder) {
        ....
        responder.respond(response);
    },
    onError : function(error) {},
    onClose : function() {}
}
                    
.NET
/**
 * Copyright © 2021 Push Technology Ltd.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

using System;
using System.Threading;
using PushTechnology.ClientInterface.Client.Callbacks;
using PushTechnology.ClientInterface.Client.Factories;
using PushTechnology.ClientInterface.Client.Features;
using static System.Console;

namespace PushTechnology.ClientInterface.Example {
    /// <summary>
    /// A simple IRequestStream implementation that prints confirmation of the actions completed.
    /// </summary>
    internal class SimpleRequestStream : IRequestStream<string, string> {
        /// <summary>
        /// Indicates that the request stream was closed.
        /// </summary>
        public void OnClose()
            => WriteLine( "A request handler was closed." );

        /// <summary>
        /// Indicates that the request stream has received error.
        /// </summary>
        public void OnError( ErrorReason errorReason )
            => WriteLine( $"A request handler has received error: {errorReason}." );

        /// <summary>
        /// Indicates that a request was received and responds to it.
        /// </summary>
        /// <remarks>On invalid request you would call: <see cref="IResponder{TResponse}.Reject(string)"/>.</remarks>
        public void OnRequest( string path, string request, IResponder<string> responder ) {
            if ( request == "Hello?" ) {    // message to the filter to obtain the session ID
                responder.Respond( "Yes" );
            } else {
                WriteLine( $"Received request: '{request}'." );
                responder.Respond( DateTime.UtcNow.ToLongTimeString() );
            }
        }
    }
}
Java and Android
private final class JSONRequestStream implements Messaging.RequestStream<JSON, JSON> {

    @Override
    public void onClose() {
        ....
    }

    @Override
    public void onError(ErrorReason errorReason) {
        ....
    }

    @Override
    public void onRequest(String path, JSON request, Responder<JSON> responder) {
        ....
    }
}
                    
Python
def callback(request: str, **kwargs) -> str:
    return f"Hello there, {request}!"
Apple
@interface NumberRequestStreamDelegate : NSObject<PTDiffusionNumberRequestStreamDelegate>
@end

@implementation NumberRequestStreamDelegate
- (void)    diffusionStream:(nonnull PTDiffusionStream *)stream
didReceiveRequestWithNumber:(nullable NSNumber *)number
                  responder:(nonnull PTDiffusionResponder *)responder
{
    // Do something when a request is received.
}

- (void)diffusionStream:(nonnull PTDiffusionStream *)stream
       didFailWithError:(nonnull NSError *)error
{
    // Do something if the stream fails.
}

- (void)diffusionDidCloseStream:(nonnull PTDiffusionStream *)stream
{
    // Do something if the stream closes.
}
                    

Add the request stream against a message path. You can only add one request stream for each message path.

JavaScript
control.messages.setRequestStream("foo", diffusion.datatypes.json(), diffusion.datatypes.json(), request_stream);
                    
.NET
/**
 * Copyright © 2021 Push Technology Ltd.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

using System;
using System.Threading;
using PushTechnology.ClientInterface.Client.Callbacks;
using PushTechnology.ClientInterface.Client.Factories;
using PushTechnology.ClientInterface.Client.Features;

namespace PushTechnology.ClientInterface.Example {
    /// <summary>
    /// Client implementation that registers a handler to listen for messages on a path.
    /// </summary>
    public sealed class ReceivingSessionRequestMessages {

        public ReceivingSessionRequestMessages(string serverUrl) {

            var session = Diffusion.Sessions.Principal( "client" ).Password( "password" ).Open( serverUrl );
            var messaging = session.Messaging;
            string messagingPath = ">random/requestResponse";

            var requestStream = new SimpleRequestStream();
            messaging.SetRequestStream( messagingPath, requestStream );

            try 
            {
                Thread.Sleep( 60000 );//wait for messages...
            } 
            finally 
            {
                // Close session
                messaging.RemoveRequestStream( messagingPath );
                session.Close();
            }
        }
    }
}
Java and Android
messaging.setRequestStream("foo", JSON.class, JSON.class, requestStream);
                    
Python
# Register handler to receive the request
handler = RequestHandler(
    callback,
    request_type=request_type,
    response_type=request_type
)
session.messaging.add_stream_handler(path, handler=handler, addressed=True)
Apple
// Ensure to maintain a strong reference to your request stream as it
// is referenced weakly by the Diffusion client library.
NumberRequestStreamDelegate *delegate = [NumberRequestStreamDelegate new];
PTDiffusionRequestStream *requestStream = [PTDiffusionPrimitive int64RequestStreamWithDelegate:delegate];
[session.messaging setRequestStream:requestStream forPath:message_path];