Just a second...

Sending request messages to a message path

A client session can send a request message containing typed data to a message path. One or more client sessions can register to handle messages sent to that message path. The handling client session can then send a response message containing typed data. The response message is sent to the requesting client session directly, through the same message path.

A client session on the left. Diffusion in the centre. A control client session that has registered a handler for the message path on the right. An arrow representing the request message goes from the client session through a shape representing the message path inside the Diffusion server and continues to the handling client session. An arrow representing the response message goes from the handling client session back through the message path on the Diffusion server to the requesting client session.
When a request message is sent to a message path and a client session that handles that message path responds, the following events occur:
  1. A client session sends a request message to a message path.
  2. The control client session receives the request message through a request handler.
  3. The control client session sends a response to the request message.
  4. The client session receives the response.

Both the request message and the response message contain typed values. The messages can contain data of one of the following types: JSON, binary, string, 64-bit integer, or double. The response message is not required to be the same data type as the request it responds to.

Sending to a message path

Required permissions: send_to_handler permission for the specified message path

Send the request message specifying the following information:
  • The message path to send the request to and receive the response through
  • The request message
  • The datatype of the request message
  • The datatype of the response message
JavaScript
// Example using the JSON data type.
const jsonType = diffusion.datatypes.json();
// Create a JSON value to send as the request.
const requestJson = jsonType.from('hello');

try {
    // Send the request to the message path 'foo' and wait for the response.
    const response = await control.messages.sendRequest('foo', requestJson, jsonType);
    console.log(response.get());
} catch (err) {
    // Report the failure reason instead of discarding it.
    console.error('An error occurred while sending the request:', err);
}

.NET
/// <summary>
/// Client implementation that sends request messages to a path and
/// displays the response.
/// </summary>
public sealed class SendingPathRequestMessages {

    /// <summary>
    /// Opens a session to <paramref name="serverUrl"/>, sends two string requests to the
    /// messaging path, writes each response, and closes the session.
    /// </summary>
    /// <param name="serverUrl">The URL passed to <c>Open</c> when creating the session.</param>
    public async Task SendingPathRequestMessagesExample(string serverUrl) {

        var session = Diffusion.Sessions.Principal("control").Password( "password" )
            .CertificateValidation((cert, chain, errors) => CertificateValidationResult.ACCEPT)
            .Open(serverUrl);

        var messaging = session.Messaging;
        string messagingPath = ">random/requestResponse";

        try {
            string response = await messaging.SendRequestAsync<string, string>(
                messagingPath, "Starting chat..." );
            WriteLine( $"Received response: '{response}'." );

            // Pause between requests without blocking the calling thread.
            await Task.Delay( 1000 );

            response = await messaging.SendRequestAsync<string, string>(
                messagingPath, "Hello!" );
            WriteLine( $"Received response: '{response}'." );
        }
        catch ( Exception e )
        {
            WriteLine( $"Got exception: '{e.Message}'." );
        }
        finally
        {
            // Close the session on every exit path.
            session.Close();
        }
    }
}
Java and Android
//Establish client sesssion
final Session session = Diffusion.sessions().principal("client").password("password").open("ws://localhost:8080");

//Obtain the Messaging feature
final Messaging messaging = session.feature(Messaging.class);

//Create a JSON object to send as a request
final JSON request = Diffusion.dataTypes().json().fromJsonString("\"hello\"");

//Send the request to a message path "foo" and wait for (at most) 5 seconds until the response is received.
final JSON response = messaging.sendRequest("foo", request, JSON.class, JSON.class).get(5, TimeUnit.SECONDS)
                    
C
BUF_T *message_buf = buf_create();
write_diffusion_string_value(message, message_buf);

SEND_REQUEST_PARAMS_T params = {
        .path = message_path,
        .request = message_buf,
        .on_response = on_message_response,
        .request_datatype = DATATYPE_STRING,
        .response_datatype = DATATYPE_STRING
};
send_request(session, params);

buf_free(message_buf);
Python
# Sending the request and receiving the response.
print(f"Sending request: '{request}' to path '{path}'...")
try:
    response = await session.messaging.send_request_to_path(
        path=path, request=request_type(request)
    )
except diffusion.DiffusionError as ex:
    print(f"ERROR: {ex}")
else:
    print(f"... received response '{response}'")
Apple
//  Copyright (C) 2021 Push Technology Ltd.
//
//  Licensed under the Apache License, Version 2.0 (the "License");
//  you may not use this file except in compliance with the License.
//  You may obtain a copy of the License at
//  http://www.apache.org/licenses/LICENSE-2.0
//
//  Unless required by applicable law or agreed to in writing, software
//  distributed under the License is distributed on an "AS IS" BASIS,
//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//  See the License for the specific language governing permissions and
//  limitations under the License.

import Foundation
import Diffusion

/// Request-response messaging examples: sending JSON requests to a session,
/// a message path, or a session filter, and registering the handlers that
/// answer them.
class Messaging {

    /// Sends a JSON request to the session `session_id` on `message_path`
    /// and prints the response or the failure.
    func to_session(session: PTDiffusionSession, session_id: PTDiffusionSessionId, message_path: String) {
        // The JSON text is a fixed literal, so a parse failure would be a programmer error.
        let json_request = try! PTDiffusionJSON(jsonString: "{\"hello\": \"world\"}").request

        session.messaging.send(json_request,
                               to: session_id,
                               path: message_path,
                               jsonCompletionHandler: { (response: PTDiffusionJSON?, error) in
            // `print` does not expand `%@` placeholders, so the values are interpolated.
            if let error = error {
                print("Failed to send message to \(message_path). Error: \(error.localizedDescription)")
            }
            else if let response = response {
                print("Received response: \(response)")
            }
        })
    }


    /// Delegate for a JSON request stream: answers every request with a fixed
    /// JSON response and prints stream failures and closure.
    class JSONRequestStreamDelegate: PTDiffusionJSONRequestStreamDelegate {
        func diffusionStream(_ stream: PTDiffusionStream,
                             didReceiveRequestWith json: PTDiffusionJSON,
                             responder: PTDiffusionResponder) {
            print("Received request \(json)")
            let json_response = try! PTDiffusionJSON(jsonString: "{\"greetings\": \"stranger\"}").response
            responder.respond(with: json_response)
        }

        func diffusionStream(_ stream: PTDiffusionStream, didFailWithError error: Error) {
            print("Request stream failed with error \(error.localizedDescription)")
        }

        func diffusionDidClose(_ stream: PTDiffusionStream) {
            print("Request stream is now closed")
        }
    }


    /// Sets a JSON request stream for `message_path` on the session.
    func session_register_handler(session: PTDiffusionSession, message_path: String) {
        let stream_delegate = JSONRequestStreamDelegate()
        let request_stream = PTDiffusionJSON.requestStream(with: stream_delegate)
        session.messaging.setRequestStream(request_stream, forPath: message_path)
    }


    /// Sends a JSON request to `message_path` and prints the response or the failure.
    /// `session_id` is not used by this method.
    func to_path(session: PTDiffusionSession, session_id: PTDiffusionSessionId, message_path: String) {
        let json_request = try! PTDiffusionJSON(jsonString: "{\"hello\": \"world\"}").request

        session.messaging.send(json_request, toPath: message_path) { (response: PTDiffusionJSON?, error) in
            if let error = error {
                print("Failed to send message to \(message_path). Error: \(error.localizedDescription)")
            }
            else if let response = response {
                print("Received response: \(response)")
            }
        }
    }


    /// Delegate for a JSON request handler registered against a message path:
    /// answers every request with a fixed JSON response.
    class JSONRequestHandler: PTDiffusionJSONRequestDelegate {
        func diffusionTopicTreeRegistration(_ registration: PTDiffusionTopicTreeRegistration,
                                            didReceiveRequestWith json: PTDiffusionJSON,
                                            context: PTDiffusionRequestContext,
                                            responder: PTDiffusionResponder) {
            print("Received request: \(json)")
            let json_response = try! PTDiffusionJSON(jsonString: "{\"greetings\": \"stranger\"}").response
            responder.respond(with: json_response)
        }

        func diffusionTopicTreeRegistrationDidClose(_ registration: PTDiffusionTopicTreeRegistration) {
            print("Message path is now closed")
        }

        func diffusionTopicTreeRegistration(_ registration: PTDiffusionTopicTreeRegistration,
                                            didFailWithError error: Error) {
            print("Message path failed with error: \(error.localizedDescription)")
        }
    }


    /// Adds a JSON request handler for `message_path` and prints whether the
    /// registration succeeded.
    func path_register_handler(session: PTDiffusionSession, message_path: String) {
        let handler = JSONRequestHandler()
        let request_stream = PTDiffusionJSON.requestHandler(with: handler)
        session.messaging.add(request_stream, forPath: message_path) { (registration, error) in
            if let error = error {
                print("An error has occurred while registering the message path \(message_path). " +
                      "Error: \(error.localizedDescription)")
            }
            else {
                print("Message path \(message_path) has been successfully registered.")
            }
        }
    }


    /// Sends a JSON request on `message_path` to the sessions selected by
    /// `session_properties_filter` and prints each response or error.
    /// `session_id` is not used by this method.
    func to_filter(session: PTDiffusionSession,
                   session_id: PTDiffusionSessionId,
                   message_path: String,
                   session_properties_filter: String) {
        /// Prints every per-session response or error, plus stream failure and closure.
        class JSONSessionResponseHandler: PTDiffusionJSONSessionResponseStreamDelegate {
            func diffusionStream(_ stream: PTDiffusionStream,
                                 didReceiveResponseWith json: PTDiffusionJSON,
                                 from sessionId: PTDiffusionSessionId) {
                print("Received response from \(sessionId): \(json)")
            }

            func diffusionStream(_ stream: PTDiffusionStream,
                                 didReceiveError error: Error,
                                 from sessionId: PTDiffusionSessionId) {
                print("Received error from \(sessionId): \(error)")
            }

            func diffusionStream(_ stream: PTDiffusionStream,
                                 didFailWithError error: Error) {
                print("Stream failed with error: \(error.localizedDescription)")
            }

            func diffusionDidClose(_ stream: PTDiffusionStream) {
                print("Stream is now closed")
            }
        }

        // create the request
        let json_request = try! PTDiffusionJSON(jsonString: "{\"hello\": \"world\"}").request

        // create the handler that will receive the response
        let handler = JSONSessionResponseHandler()
        let response_stream = PTDiffusionJSON.sessionResponseStream(with: handler)

        // send the message request
        session.messaging.send(json_request,
                               toFilter: session_properties_filter,
                               path: message_path,
                               responseStream: response_stream) { (selectedSessions, error) in

            if let error = error {
                print("Error occurred while sending message to filter: \(error.localizedDescription)")
            }
            else {
                print("Message successfully sent to \(selectedSessions) sessions")
            }
        }
    }


}

Responding to request messages sent to a message path

Required permissions: send_to_session permission for the specified message path, register_handler permission, and view_session permission to register to receive session property values with the request message

Define a request handler to receive and respond to request messages that have a specific data type.

JavaScript
// Data type and canned response used by the request handler below.
const jsonType = diffusion.datatypes.json();
const responseJson = jsonType.from({ "ying": "yang"});

// Request handler for the JSON data type: answers every request with `responseJson`.
const handler = {
    onRequest(request, context, responder) {
        responder.respond(responseJson, jsonType);
    },
    onError() {
        // an error occurred
    },
    onClose() {
        // the handler is closed
    }
};

.NET
/// <summary>
/// Basic request handler for string requests and string responses that writes
/// a line for every callback it receives.
/// </summary>
internal class SimpleRequestHandler : IRequestHandler<string, string> {
    /// <summary>
    /// Writes the received request and responds with the current UTC time.
    /// </summary>
    /// <remarks>On invalid request you would call: <see cref="IResponder{TResponse}.Reject(string)"/>.</remarks>
    public void OnRequest( string request, IRequestContext context, IResponder<string> responder ) {
        WriteLine( $"Received request: '{request}'." );

        var reply = DateTime.UtcNow.ToLongTimeString();
        responder.Respond( reply );
    }

    /// <summary>
    /// Writes the error reason reported to the request handler.
    /// </summary>
    public void OnError( ErrorReason errorReason ) {
        WriteLine( $"A request handler has received error: '{errorReason}'." );
    }

    /// <summary>
    /// Writes a notice that the request handler was closed.
    /// </summary>
    public void OnClose() {
        WriteLine( "A request handler was closed." );
    }
}
Java and Android
/**
 * Request handler skeleton for JSON requests answered with JSON responses.
 * The "...." lines are placeholders for application code omitted from this example.
 */
private final class JSONRequestHandler implements MessagingControl.RequestHandler<JSON, JSON> {
    // Close callback; body omitted in this example.
    @Override
    public void onClose() {
        ....
    }

    // Error callback receiving the ErrorReason; body omitted in this example.
    @Override
    public void onError(ErrorReason errorReason) {
        ....
    }

    // Request callback: after the omitted code, replies through the responder.
    // NOTE(review): `response` is not declared in the visible code; it is presumably
    // built in the omitted section - confirm.
    @Override
    public void onRequest(JSON request, RequestContext context, Responder<JSON> responder) {
        ....
        responder.respond(response);
    }
}
                    
C
/**
 * Copyright © 2021, 2022 Push Technology Ltd.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

#include <stdio.h>
#include <stdlib.h>

#ifndef WIN32
        #include <unistd.h>
#else
        #define sleep(x) Sleep(1000 * x)
#endif

#include "diffusion.h"

/*
 * Callback supplied as `on_number_sent` when sending a request to a filter.
 * Does nothing with its arguments and reports success.
 */
static int on_selected_sessions_received(int number_sent, void *context)
{
        /* `number_sent` is the number of sessions selected by the `session_filter` */
        (void) number_sent;
        (void) context;

        return HANDLER_SUCCESS;
}


/*
 * Callback supplied as `on_response` when sending a request.
 * Does nothing with the response and reports success.
 */
static int on_message_response(
        DIFFUSION_DATATYPE response_datatype,
        const DIFFUSION_VALUE_T *response,
        void *context)
{
        /* read the `response` by converting it to the datatype `response_datatype` */
        (void) response_datatype;
        (void) response;
        (void) context;

        return HANDLER_SUCCESS;
}


/*
 * Sends the string `message` as a request on `message_path` to the sessions
 * selected by `session_filter`.
 */
void to_filter(
        SESSION_T *session,
        char *message_path,
        char *message,
        char *session_filter)
{
        /* Serialise the string payload into a buffer. */
        BUF_T *request_buf = buf_create();
        write_diffusion_string_value(message, request_buf);

        SEND_REQUEST_TO_FILTER_PARAMS_T filter_params = {
                .filter = session_filter,
                .path = message_path,
                .request = request_buf,
                .request_datatype = DATATYPE_STRING,
                .response_datatype = DATATYPE_STRING,
                .on_number_sent = on_selected_sessions_received,
                .on_response = on_message_response
        };
        send_request_to_filter(session, filter_params);

        buf_free(request_buf);
}


/*
 * Sends the string `message` as a request on `message_path` to the single
 * session identified by `session_id`.
 */
void to_session(
        SESSION_T *session,
        char *message_path,
        char *message,
        SESSION_ID_T *session_id)
{
        /* Serialise the string payload into a buffer. */
        BUF_T *request_buf = buf_create();
        write_diffusion_string_value(message, request_buf);

        SEND_REQUEST_TO_SESSION_PARAMS_T session_params = {
                .path = message_path,
                .recipient_session = session_id,
                .request_datatype = DATATYPE_STRING,
                .response_datatype = DATATYPE_STRING,
                .request = request_buf,
                .on_response = on_message_response
        };
        send_request_to_session(session, session_params);

        buf_free(request_buf);
}


/*
 * Request stream callback: prints the received string request and replies
 * with a fixed string response.
 */
static int on_message_request(
        SESSION_T *session,
        const char *request_path,
        DIFFUSION_DATATYPE request_datatype,
        const DIFFUSION_VALUE_T *request,
        const DIFFUSION_RESPONDER_HANDLE_T *handle,
        void *context)
{
        // read the `request` based on the received `request_datatype`;
        // start from NULL so a failed read is never printed or freed as garbage
        char *request_val = NULL;
        read_diffusion_string_value(request, &request_val, NULL);
        if (request_val != NULL) {
                printf("Received message: %s\n", request_val);
                free(request_val);
        }
        else {
                fprintf(stderr, "Unable to read the received request as a string\n");
        }

        // create the response
        BUF_T *response_buf = buf_create();
        write_diffusion_string_value("This is my response", response_buf);

        // and respond to the request
        diffusion_respond_to_request(session, handle, response_buf, NULL);
        buf_free(response_buf);

        return HANDLER_SUCCESS;
}


/*
 * Sets a request stream on `message_path` for string requests and string
 * responses, with `on_message_request` as its request callback.
 */
void register_request_stream(SESSION_T *session, char *message_path)
{
        DIFFUSION_REQUEST_STREAM_T stream = {
                .on_request = on_message_request
        };

        set_request_stream(
                session,
                message_path,
                DATATYPE_STRING,
                DATATYPE_STRING,
                &stream);
}


/*
 * Sends the string `message` as a request to `message_path`; the response is
 * delivered to `on_message_response`.
 */
void to_path(SESSION_T *session, char *message_path, char *message)
{
        /* Serialise the string payload into a buffer. */
        BUF_T *request_buf = buf_create();
        write_diffusion_string_value(message, request_buf);

        SEND_REQUEST_PARAMS_T request_params = {
                .path = message_path,
                .request_datatype = DATATYPE_STRING,
                .response_datatype = DATATYPE_STRING,
                .request = request_buf,
                .on_response = on_message_response
        };
        send_request(session, request_params);

        buf_free(request_buf);
}


/*
 * Callback supplied as `on_active` of the request handler.
 * Does nothing with its arguments and reports success.
 */
static int on_request_handler_active(
        SESSION_T *session,
        const char *path,
        const DIFFUSION_REGISTRATION_T *registered_handler)
{
        /* message path `path` is now active for `registered_handler` */
        (void) session;
        (void) path;
        (void) registered_handler;

        return HANDLER_SUCCESS;
}


/*
 * Callback supplied as `on_request` of the request handler.
 * Placeholder: it does not respond to the request and reports success.
 */
static int on_request_received(
        SESSION_T *session,
        DIFFUSION_DATATYPE request_datatype,
        const DIFFUSION_VALUE_T *request,
        const DIFFUSION_REQUEST_CONTEXT_T *request_context,
        const DIFFUSION_RESPONDER_HANDLE_T *handle,
        void *context)
{
        /*
         * Handle the received request here, then respond to it with
         * `diffusion_respond_to_request(session, handle, response, NULL)`.
         */
        (void) session;
        (void) request_datatype;
        (void) request;
        (void) request_context;
        (void) handle;
        (void) context;

        return HANDLER_SUCCESS;
}


/*
 * Adds a request handler on `message_path` for string requests and string
 * responses, using the `on_request_handler_active` and `on_request_received`
 * callbacks.
 */
void register_request_handler(SESSION_T *session, char *message_path)
{
        DIFFUSION_REQUEST_HANDLER_T handler = {
                .on_active = on_request_handler_active,
                .on_request = on_request_received,
                .request_datatype = DATATYPE_STRING,
                .response_datatype = DATATYPE_STRING
        };

        ADD_REQUEST_HANDLER_PARAMS_T handler_params = {
                .request_handler = &handler,
                .path = message_path
        };

        add_request_handler(session, handler_params);
}


/*
 * Creates a session, sends the same message to a filter, a session and a
 * path, registers a request handler, waits 5 seconds, then releases
 * everything it created.
 */
int main(int argc, char **argv)
{
        const char *url = "ws://localhost:8080";
        const char *principal = "control";
        const char *password = "password";

        CREDENTIALS_T *credentials = credentials_create_password(password);
        SESSION_T *session =
                session_create(url, principal, credentials, NULL, NULL, NULL);

        // Without a session none of the calls below can work, so stop early.
        if (session == NULL) {
                fprintf(stderr, "Failed to create a session to %s\n", url);
                credentials_free(credentials);
                return EXIT_FAILURE;
        }

        SESSION_ID_T *session_id =
                session_id_create_from_string("e9cfbd5be9a72622-f000000300000004");

        char *message_path = "message/path";
        char *message = "hello world";
        char *session_filter = "$Principal is 'control'";

        to_filter(session, message_path, message, session_filter);
        to_session(session, message_path, message, session_id);
        to_path(session, message_path, message);
        register_request_handler(session, message_path);

        // Sleep for a while
        sleep(5);

        // Close the session, and release resources and memory
        session_close(session, NULL);
        session_free(session);

        credentials_free(credentials);
        session_id_free(session_id);

        return EXIT_SUCCESS;
}
Python
def callback(request: str, **kwargs) -> str:
    """Build the greeting returned in response to ``request``.

    Any extra keyword arguments are accepted and ignored.
    """
    greeting = f"Hello there, {request}!"
    return greeting
Apple
//  Copyright (C) 2021 Push Technology Ltd.
//
//  Licensed under the Apache License, Version 2.0 (the "License");
//  you may not use this file except in compliance with the License.
//  You may obtain a copy of the License at
//  http://www.apache.org/licenses/LICENSE-2.0
//
//  Unless required by applicable law or agreed to in writing, software
//  distributed under the License is distributed on an "AS IS" BASIS,
//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//  See the License for the specific language governing permissions and
//  limitations under the License.

import Foundation
import Diffusion

/// Request-response messaging examples: sending JSON requests to a session,
/// a message path, or a session filter, and registering the handlers that
/// answer them.
class Messaging {

    /// Sends a JSON request to the session `session_id` on `message_path`
    /// and prints the response or the failure.
    func to_session(session: PTDiffusionSession, session_id: PTDiffusionSessionId, message_path: String) {
        // The JSON text is a fixed literal, so a parse failure would be a programmer error.
        let json_request = try! PTDiffusionJSON(jsonString: "{\"hello\": \"world\"}").request

        session.messaging.send(json_request,
                               to: session_id,
                               path: message_path,
                               jsonCompletionHandler: { (response: PTDiffusionJSON?, error) in
            // `print` does not expand `%@` placeholders, so the values are interpolated.
            if let error = error {
                print("Failed to send message to \(message_path). Error: \(error.localizedDescription)")
            }
            else if let response = response {
                print("Received response: \(response)")
            }
        })
    }


    /// Delegate for a JSON request stream: answers every request with a fixed
    /// JSON response and prints stream failures and closure.
    class JSONRequestStreamDelegate: PTDiffusionJSONRequestStreamDelegate {
        func diffusionStream(_ stream: PTDiffusionStream,
                             didReceiveRequestWith json: PTDiffusionJSON,
                             responder: PTDiffusionResponder) {
            print("Received request \(json)")
            let json_response = try! PTDiffusionJSON(jsonString: "{\"greetings\": \"stranger\"}").response
            responder.respond(with: json_response)
        }

        func diffusionStream(_ stream: PTDiffusionStream, didFailWithError error: Error) {
            print("Request stream failed with error \(error.localizedDescription)")
        }

        func diffusionDidClose(_ stream: PTDiffusionStream) {
            print("Request stream is now closed")
        }
    }


    /// Sets a JSON request stream for `message_path` on the session.
    func session_register_handler(session: PTDiffusionSession, message_path: String) {
        let stream_delegate = JSONRequestStreamDelegate()
        let request_stream = PTDiffusionJSON.requestStream(with: stream_delegate)
        session.messaging.setRequestStream(request_stream, forPath: message_path)
    }


    /// Sends a JSON request to `message_path` and prints the response or the failure.
    /// `session_id` is not used by this method.
    func to_path(session: PTDiffusionSession, session_id: PTDiffusionSessionId, message_path: String) {
        let json_request = try! PTDiffusionJSON(jsonString: "{\"hello\": \"world\"}").request

        session.messaging.send(json_request, toPath: message_path) { (response: PTDiffusionJSON?, error) in
            if let error = error {
                print("Failed to send message to \(message_path). Error: \(error.localizedDescription)")
            }
            else if let response = response {
                print("Received response: \(response)")
            }
        }
    }


    /// Delegate for a JSON request handler registered against a message path:
    /// answers every request with a fixed JSON response.
    class JSONRequestHandler: PTDiffusionJSONRequestDelegate {
        func diffusionTopicTreeRegistration(_ registration: PTDiffusionTopicTreeRegistration,
                                            didReceiveRequestWith json: PTDiffusionJSON,
                                            context: PTDiffusionRequestContext,
                                            responder: PTDiffusionResponder) {
            print("Received request: \(json)")
            let json_response = try! PTDiffusionJSON(jsonString: "{\"greetings\": \"stranger\"}").response
            responder.respond(with: json_response)
        }

        func diffusionTopicTreeRegistrationDidClose(_ registration: PTDiffusionTopicTreeRegistration) {
            print("Message path is now closed")
        }

        func diffusionTopicTreeRegistration(_ registration: PTDiffusionTopicTreeRegistration,
                                            didFailWithError error: Error) {
            print("Message path failed with error: \(error.localizedDescription)")
        }
    }


    /// Adds a JSON request handler for `message_path` and prints whether the
    /// registration succeeded.
    func path_register_handler(session: PTDiffusionSession, message_path: String) {
        let handler = JSONRequestHandler()
        let request_stream = PTDiffusionJSON.requestHandler(with: handler)
        session.messaging.add(request_stream, forPath: message_path) { (registration, error) in
            if let error = error {
                print("An error has occurred while registering the message path \(message_path). " +
                      "Error: \(error.localizedDescription)")
            }
            else {
                print("Message path \(message_path) has been successfully registered.")
            }
        }
    }


    /// Sends a JSON request on `message_path` to the sessions selected by
    /// `session_properties_filter` and prints each response or error.
    /// `session_id` is not used by this method.
    func to_filter(session: PTDiffusionSession,
                   session_id: PTDiffusionSessionId,
                   message_path: String,
                   session_properties_filter: String) {
        /// Prints every per-session response or error, plus stream failure and closure.
        class JSONSessionResponseHandler: PTDiffusionJSONSessionResponseStreamDelegate {
            func diffusionStream(_ stream: PTDiffusionStream,
                                 didReceiveResponseWith json: PTDiffusionJSON,
                                 from sessionId: PTDiffusionSessionId) {
                print("Received response from \(sessionId): \(json)")
            }

            func diffusionStream(_ stream: PTDiffusionStream,
                                 didReceiveError error: Error,
                                 from sessionId: PTDiffusionSessionId) {
                print("Received error from \(sessionId): \(error)")
            }

            func diffusionStream(_ stream: PTDiffusionStream,
                                 didFailWithError error: Error) {
                print("Stream failed with error: \(error.localizedDescription)")
            }

            func diffusionDidClose(_ stream: PTDiffusionStream) {
                print("Stream is now closed")
            }
        }

        // create the request
        let json_request = try! PTDiffusionJSON(jsonString: "{\"hello\": \"world\"}").request

        // create the handler that will receive the response
        let handler = JSONSessionResponseHandler()
        let response_stream = PTDiffusionJSON.sessionResponseStream(with: handler)

        // send the message request
        session.messaging.send(json_request,
                               toFilter: session_properties_filter,
                               path: message_path,
                               responseStream: response_stream) { (selectedSessions, error) in

            if let error = error {
                print("Error occurred while sending message to filter: \(error.localizedDescription)")
            }
            else {
                print("Message successfully sent to \(selectedSessions) sessions")
            }
        }
    }


}

Register the request handler against a message path. You can only register one request handler against each message path.

JavaScript
// Request handler with one callback per event.
const handler = {
    onRequest(request, context, responder) {
        // request received
    },
    onError() {
        // an error occurred
    },
    onClose() {
        // the handler is closed
    }
};

// Register the handler against the message path 'topic'.
session.messages.addRequestHandler('topic', handler);
.NET
/// <summary>
/// Client implementation that registers a handler to listen for messages on a path.
/// </summary>
public sealed class ReceivingPathRequestMessages {

    /// <summary>
    /// Opens a session to <paramref name="serverUrl"/>, registers a
    /// <c>SimpleRequestHandler</c> on the messaging path, waits 60 seconds for
    /// requests, then closes the registration and the session.
    /// </summary>
    /// <param name="serverUrl">The URL passed to <c>Open</c> when creating the session.</param>
    public async Task ReceivingPathRequestMessagesExample(string serverUrl) {
        var session = Diffusion.Sessions.Principal( "control" ).Password( "password" )
            .CertificateValidation((cert, chain, errors) => CertificateValidationResult.ACCEPT)
            .Open(serverUrl);

        try {
            var messaging = session.Messaging;
            string messagingPath = ">random/requestResponse";

            var requestHandler = new SimpleRequestHandler();
            var requestHandlerRegistration = await messaging.AddRequestHandlerAsync(
                messagingPath, requestHandler );

            try {
                // Wait for messages without blocking the calling thread.
                await Task.Delay( 60000 );
            } finally {
                await requestHandlerRegistration.CloseAsync();
            }
        } finally {
            // Close the session even when the registration itself fails.
            session.Close();
        }
    }
}
Java and Android
messagingControl.addRequestHandler(messagePath, JSON.class, JSON.class, new JSONRequestHandler());
                    
C
/*
 * Callback supplied as `on_active` of the request handler.
 * Does nothing with its arguments and reports success.
 */
static int on_request_handler_active(
        SESSION_T *session,
        const char *path,
        const DIFFUSION_REGISTRATION_T *registered_handler)
{
        /* message path `path` is now active for `registered_handler` */
        (void) session;
        (void) path;
        (void) registered_handler;

        return HANDLER_SUCCESS;
}


/*
 * Callback supplied as `on_request` of the request handler.
 * Placeholder: it does not respond to the request and reports success.
 */
static int on_request_received(
        SESSION_T *session,
        DIFFUSION_DATATYPE request_datatype,
        const DIFFUSION_VALUE_T *request,
        const DIFFUSION_REQUEST_CONTEXT_T *request_context,
        const DIFFUSION_RESPONDER_HANDLE_T *handle,
        void *context)
{
        /*
         * Handle the received request here, then respond to it with
         * `diffusion_respond_to_request(session, handle, response, NULL)`.
         */
        (void) session;
        (void) request_datatype;
        (void) request;
        (void) request_context;
        (void) handle;
        (void) context;

        return HANDLER_SUCCESS;
}


/*
 * Adds a request handler on `message_path` for string requests and string
 * responses, using the `on_request_handler_active` and `on_request_received`
 * callbacks.
 */
void register_request_handler(SESSION_T *session, char *message_path)
{
        DIFFUSION_REQUEST_HANDLER_T handler = {
                .on_active = on_request_handler_active,
                .on_request = on_request_received,
                .request_datatype = DATATYPE_STRING,
                .response_datatype = DATATYPE_STRING
        };

        ADD_REQUEST_HANDLER_PARAMS_T handler_params = {
                .request_handler = &handler,
                .path = message_path
        };

        add_request_handler(session, handler_params);
}
Python
# Register handler to receive the request
handler = RequestHandler(
    callback,
    request_type=request_type,
    response_type=request_type
)
print("Registering request handler...")
try:
    await session.messaging.add_request_handler(path, handler=handler)
except diffusion.DiffusionError as ex:
    print(f"ERROR: {ex}")
else:
    print("... request handler registered")
Apple
//  Copyright (C) 2021 Push Technology Ltd.
//
//  Licensed under the Apache License, Version 2.0 (the "License");
//  you may not use this file except in compliance with the License.
//  You may obtain a copy of the License at
//  http://www.apache.org/licenses/LICENSE-2.0
//
//  Unless required by applicable law or agreed to in writing, software
//  distributed under the License is distributed on an "AS IS" BASIS,
//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//  See the License for the specific language governing permissions and
//  limitations under the License.

import Foundation
import Diffusion

/// Examples of request-response messaging with the Diffusion client library.
///
/// Shows how to send a JSON request to a specific session, to a message path
/// and to a session filter, and how to register the request stream / request
/// handler that answers those requests.
class Messaging {

    /// Sends a JSON request to a single session and prints the outcome.
    ///
    /// - Parameters:
    ///   - session: The session used to send the request.
    ///   - session_id: The identifier of the recipient session.
    ///   - message_path: The message path the request is sent on.
    func to_session(session: PTDiffusionSession, session_id: PTDiffusionSessionId, message_path: String) {
        // The literal is a constant, well-formed JSON document, so a parse failure
        // would be a programming error; `try!` is deliberate here.
        let json_request = try! PTDiffusionJSON(jsonString: "{\"hello\": \"world\"}").request

        session.messaging.send(json_request,
                               to: session_id,
                               path: message_path,
                               jsonCompletionHandler: { (response: PTDiffusionJSON?, error) in
            // `print` does not expand format specifiers, so use interpolation.
            if let error = error {
                print("Failed to send message to \(message_path). Error: \(error.localizedDescription)")
            }
            else if let response = response {
                print("Received response: \(response)")
            }
        })
    }


    /// Delegate for a JSON request stream: answers every request it receives
    /// with a fixed JSON response and logs stream failure and closure.
    class JSONRequestStreamDelegate: PTDiffusionJSONRequestStreamDelegate {
        /// Prints the received request and responds with a fixed JSON value.
        func diffusionStream(_ stream: PTDiffusionStream,
                             didReceiveRequestWith json: PTDiffusionJSON,
                             responder: PTDiffusionResponder) {
            print("Received request \(json)")
            // Constant, well-formed JSON literal: `try!` cannot fail at runtime.
            let json_response = try! PTDiffusionJSON(jsonString: "{\"greetings\": \"stranger\"}").response
            responder.respond(with: json_response)
        }

        /// Prints the error the stream failed with.
        func diffusionStream(_ stream: PTDiffusionStream, didFailWithError error: Error) {
            print("Request stream failed with error \(error.localizedDescription)")
        }

        /// Prints a notice that the stream has closed.
        func diffusionDidClose(_ stream: PTDiffusionStream) {
            print("Request stream is now closed")
        }
    }


    /// Sets a JSON request stream on `session` for `message_path`, backed by a
    /// new `JSONRequestStreamDelegate`.
    ///
    /// - Parameters:
    ///   - session: The session that will receive requests.
    ///   - message_path: The message path to receive requests on.
    func session_register_handler(session: PTDiffusionSession, message_path: String) {
        // NOTE(review): the delegate is only referenced by this local. Confirm the
        // stream retains it; otherwise it is released when this method returns.
        let stream_delegate = JSONRequestStreamDelegate()
        let request_stream = PTDiffusionJSON.requestStream(with: stream_delegate)
        session.messaging.setRequestStream(request_stream, forPath: message_path)
    }


    /// Sends a JSON request to a message path and prints the outcome.
    ///
    /// - Parameters:
    ///   - session: The session used to send the request.
    ///   - session_id: Not used by this method; kept so existing callers still compile.
    ///   - message_path: The message path the request is sent to.
    func to_path(session: PTDiffusionSession, session_id: PTDiffusionSessionId, message_path: String) {
        // Constant, well-formed JSON literal: `try!` cannot fail at runtime.
        let json_request = try! PTDiffusionJSON(jsonString: "{\"hello\": \"world\"}").request

        session.messaging.send(json_request, toPath: message_path) { (response: PTDiffusionJSON?, error) in
            if let error = error {
                print("Failed to send message to \(message_path). Error: \(error.localizedDescription)")
            }
            else if let response = response {
                print("Received response: \(response)")
            }
        }
    }


    /// Request handler delegate for a message path: answers every request with
    /// a fixed JSON response and logs registration failure and closure.
    class JSONRequestHandler: PTDiffusionJSONRequestDelegate {
        /// Prints the received request and responds with a fixed JSON value.
        func diffusionTopicTreeRegistration(_ registration: PTDiffusionTopicTreeRegistration,
                                            didReceiveRequestWith json: PTDiffusionJSON,
                                            context: PTDiffusionRequestContext,
                                            responder: PTDiffusionResponder) {
            print("Received request: \(json)")
            // Constant, well-formed JSON literal: `try!` cannot fail at runtime.
            let json_response = try! PTDiffusionJSON(jsonString: "{\"greetings\": \"stranger\"}").response
            responder.respond(with: json_response)
        }

        /// Prints a notice that the registration has closed.
        func diffusionTopicTreeRegistrationDidClose(_ registration: PTDiffusionTopicTreeRegistration) {
            print("Message path is now closed")
        }

        /// Prints the error the registration failed with.
        func diffusionTopicTreeRegistration(_ registration: PTDiffusionTopicTreeRegistration,
                                            didFailWithError error: Error) {
            print("Message path failed with error: \(error.localizedDescription)")
        }
    }


    /// Adds a JSON request handler for `message_path` and prints whether the
    /// registration succeeded.
    ///
    /// - Parameters:
    ///   - session: The session that registers the handler.
    ///   - message_path: The message path to handle requests for.
    func path_register_handler(session: PTDiffusionSession, message_path: String) {
        // NOTE(review): the delegate is only referenced by this local. Confirm the
        // request handler retains it; otherwise it is released on return.
        let handler = JSONRequestHandler()
        let request_handler = PTDiffusionJSON.requestHandler(with: handler)
        session.messaging.add(request_handler, forPath: message_path) { (registration, error) in
            if let error = error {
                print("An error has occurred while registering the message path \(message_path). Error: \(error.localizedDescription)")
            }
            else {
                print("Message path \(message_path) has been successfully registered.")
            }
        }
    }


    /// Sends a JSON request to every session matching a session properties
    /// filter and prints each response, error and the final send outcome.
    ///
    /// - Parameters:
    ///   - session: The session used to send the request.
    ///   - session_id: Not used by this method; kept so existing callers still compile.
    ///   - message_path: The message path the request is sent on.
    ///   - session_properties_filter: The filter selecting the recipient sessions.
    func to_filter(session: PTDiffusionSession,
                   session_id: PTDiffusionSessionId,
                   message_path: String,
                   session_properties_filter: String) {
        /// Receives the per-session responses and errors for the filtered request.
        class JSONSessionResponseHandler: PTDiffusionJSONSessionResponseStreamDelegate {
            func diffusionStream(_ stream: PTDiffusionStream,
                                 didReceiveResponseWith json: PTDiffusionJSON,
                                 from sessionId: PTDiffusionSessionId) {
                print("Received response from \(sessionId): \(json)")
            }

            func diffusionStream(_ stream: PTDiffusionStream,
                                 didReceiveError error: Error,
                                 from sessionId: PTDiffusionSessionId) {
                print("Received error from \(sessionId): \(error)")
            }

            func diffusionStream(_ stream: PTDiffusionStream,
                                 didFailWithError error: Error) {
                print("Stream failed with error: \(error.localizedDescription)")
            }

            func diffusionDidClose(_ stream: PTDiffusionStream) {
                print("Stream is now closed")
            }
        }

        // Create the request. Constant, well-formed JSON literal: `try!` cannot
        // fail at runtime.
        let json_request = try! PTDiffusionJSON(jsonString: "{\"hello\": \"world\"}").request

        // Create the handler that will receive the responses.
        // NOTE(review): the delegate is only referenced by this local. Confirm the
        // response stream retains it; otherwise it is released on return.
        let handler = JSONSessionResponseHandler()
        let response_stream = PTDiffusionJSON.sessionResponseStream(with: handler)

        // Send the request to the sessions selected by the filter.
        session.messaging.send(json_request,
                               toFilter: session_properties_filter,
                               path: message_path,
                               responseStream: response_stream) { (selectedSessions, error) in

            if let error = error {
                print("Error occurred while sending message to filter: \(error.localizedDescription)")
            }
            else {
                // String(describing:) compiles whether or not the count is
                // bridged as an optional.
                print("Message successfully sent to \(String(describing: selectedSessions)) sessions")
            }
        }
    }


}