Sending request messages to a session

A client session can send a request message containing typed data directly to another client session. The receiving client session can then send a response message containing typed data. The request and response messages are addressed through the same message path.

Figure: the request message travels from the control client session, through the message path on the Diffusion server, to the receiving client session. The response message returns from the receiving client session, through the same message path, to the control client session.

When a request message is sent to a specific client session and that session responds, the following events occur:
  1. A control client session sends a request message to a client session, specifying the message path to send the message through and the session ID of the client session to send the request message to.
  2. The client session receives the request message through a request stream.
  3. The client session uses a responder to send a response to the request message.
  4. The control client session receives the response.
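
The four steps above can be sketched with a small in-memory model. This is not the Diffusion API: `ToyServer`, `set_request_stream` and `send_request` are hypothetical names chosen for illustration, and the "server" is just a dictionary keyed by session ID and message path.

```python
from dataclasses import dataclass, field
from typing import Callable, Dict, Tuple

# A request stream is modelled as a function: (path, request) -> response.
RequestStream = Callable[[str, object], object]


@dataclass
class ToyServer:
    """Hypothetical stand-in for the server: routes requests by session ID and path."""
    streams: Dict[Tuple[str, str], RequestStream] = field(default_factory=dict)

    def set_request_stream(self, session_id: str, path: str, stream: RequestStream) -> None:
        # The receiving session registers a request stream for a message path.
        self.streams[(session_id, path)] = stream

    def send_request(self, session_id: str, path: str, request: object) -> object:
        # Step 1: the control session names the recipient and the message path.
        stream = self.streams.get((session_id, path))
        if stream is None:
            raise LookupError(f"no request stream for session {session_id!r} on path {path!r}")
        # Steps 2 and 3: the recipient receives the request and responds.
        # Step 4: the response travels back to the caller as the return value.
        return stream(path, request)


server = ToyServer()
server.set_request_stream("client-1", "foo", lambda path, request: f"{request} received on {path}")
reply = server.send_request("client-1", "foo", "Hello client")
print(reply)
```

In the real client libraries the call is asynchronous and the response arrives through a future, promise or callback, as the listings later on this page show.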

Both the request message and the response message contain typed values. The messages can contain data of one of the following types: JSON, binary, string, 64-bit integer, or double. The response message is not required to be the same data type as the request it responds to.
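
As a rough illustration of the five value types, the sketch below checks whether a Python value would fit each one. The datatype labels (`"json"`, `"binary"`, `"string"`, `"int64"`, `"double"`) and the `conforms` helper are assumptions made for this example and are not taken from any Diffusion client library.

```python
import json

INT64_MIN, INT64_MAX = -(2**63), 2**63 - 1


def conforms(value: object, datatype: str) -> bool:
    """Return True if value fits the named datatype (labels are illustrative)."""
    if datatype == "json":
        try:
            json.dumps(value)
            return True
        except (TypeError, ValueError):
            return False
    if datatype == "binary":
        return isinstance(value, (bytes, bytearray))
    if datatype == "string":
        return isinstance(value, str)
    if datatype == "int64":
        # bool is a subclass of int in Python, so exclude it explicitly.
        return (isinstance(value, int) and not isinstance(value, bool)
                and INT64_MIN <= value <= INT64_MAX)
    if datatype == "double":
        return isinstance(value, float)
    raise ValueError(f"unknown datatype: {datatype}")


# A string request answered with a 64-bit integer response: the types differ.
request, response = "How many sessions?", 42
print(conforms(request, "string"), conforms(response, "int64"))
```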

Sending a request to a session

Required permissions: send_to_session permission for the specified message path and register_handler permission

Usually, it is a control client session in your organization's backend that sends messages directly to other sessions.

Send the request message specifying the following information:
  • The session ID of the client session to send the request to
  • The message path to send the request and receive the response through
  • The request message
  • The datatype of the request message
  • The datatype of the response message
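
The five items listed above can be pictured as the fields of one record. The `SessionRequest` class below is a hypothetical container written for this illustration only. The client libraries take these values as separate arguments or parameters, as the listings that follow show.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class SessionRequest:
    """Hypothetical container for the five items listed above."""
    session_id: str          # recipient client session
    path: str                # message path for both request and response
    request: object          # the request message itself
    request_datatype: str    # e.g. "json" (label is illustrative)
    response_datatype: str   # may differ from request_datatype

    def __post_init__(self) -> None:
        # The recipient and the message path are both mandatory.
        if not self.session_id:
            raise ValueError("session_id is required")
        if not self.path:
            raise ValueError("path is required")


req = SessionRequest("client-1", "foo", "Hello client", "json", "json")
print(req.path, req.request_datatype)
```
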
JavaScript
session.messages.sendRequest('foo', 'Hello client', session_id, diffusion.datatypes.json(), diffusion.datatypes.json())
.NET
/**
 * Copyright © 2021, 2022 Push Technology Ltd.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

using System;
using System.Threading;
using System.Threading.Tasks;
using PushTechnology.ClientInterface.Client.Callbacks;
using PushTechnology.ClientInterface.Client.Factories;
using PushTechnology.ClientInterface.Client.Features;
using PushTechnology.ClientInterface.Client.Session;
using static System.Console;

namespace PushTechnology.ClientInterface.Example {
    /// <summary>
    /// Client implementation that sends a request message to a filter, 
    /// then sends another request directly to the session, and displays the response.
    /// </summary>
    public sealed class SendingSessionRequestMessages {
        private readonly string messagingPath = ">random/requestResponse";

        public async Task SendingSessionRequestMessagesExample(string serverUrl) {
            var session = Diffusion.Sessions.Principal( "control" ).Password( "password" )
                .CertificateValidation((cert, chain, errors) => CertificateValidationResult.ACCEPT)
                .Open(serverUrl);

            var messaging = session.Messaging;
            var requestCallback = new RequestCallback();

            // Filter messaging is used to get the session ID for this example
            int requestsSent = await messaging.SendRequestToFilterAsync(
                "$Principal EQ 'client'",
                messagingPath,
                "Hello?",
                requestCallback);

            // Give the filtered request time to be answered so that the session ID is available
            await Task.Delay( 1000 );

            // Send message to a session using obtained session ID
            string response = await messaging.SendRequestAsync<string, string>(
                requestCallback.SessionId, messagingPath, "Time");
            WriteLine($"Received response: '{response}'.");
            
            // Close the session
            session.Close();
        }

        /// <summary>
        /// A simple IFilteredRequestCallback implementation that prints confirmation of the actions completed.
        /// </summary>
        private class RequestCallback : IFilteredRequestCallback<string> {
            public ISessionId SessionId { get; private set; }

            /// <summary>
            /// Indicates that the stream was closed.
            /// </summary>
            public void OnClose()
                => WriteLine( "A request handler was closed." );

            /// <summary>
            /// Indicates error received by the callback.
            /// </summary>
            public void OnError( ErrorReason errorReason )
                => WriteLine( $"A request handler has received error: '{errorReason}'." );

            /// <summary>
            /// Indicates that a response message was received.
            /// </summary>
            public void OnResponse(ISessionId sessionId, string response) => SessionId = sessionId;

            /// <summary>
            /// Indicates that an error response message was received.
            /// </summary>
            public void OnResponseError( ISessionId sessionId, Exception exception )
                => WriteLine( $"Response error received from session {sessionId}: '{exception}'." );
        }
    }
}
Java and Android
//Establish client session and control session
final Session control = Diffusion.sessions().principal("control").password("password").open("ws://localhost:8080");
final Session client = Diffusion.sessions().principal("client").password("password").open("ws://localhost:8080");

//Obtain the Messaging and MessagingControl features
final MessagingControl messagingControl = control.feature(MessagingControl.class);
final Messaging messaging = client.feature(Messaging.class);

//Create a JSON object to send as a request
final JSON request = Diffusion.dataTypes().json().fromJsonString("\"hello\"");

//Create a local request stream for the client to receive direct requests from the control session
messaging.setRequestStream("foo", JSON.class, JSON.class, requestStream);

//Send the request to a message path "foo" and wait for (at most) 5 seconds until the response is received.
final JSON response = messagingControl.sendRequest(client.getSessionId(), "foo", request, JSON.class, JSON.class).get(5, TimeUnit.SECONDS);
                    
C
BUF_T *message_buf = buf_create();
write_diffusion_string_value(message, message_buf);

SEND_REQUEST_TO_SESSION_PARAMS_T params = {
        .recipient_session = session_id,
        .path = message_path,
        .request = message_buf,
        .request_datatype = DATATYPE_STRING,
        .response_datatype = DATATYPE_STRING,
        .on_response = on_message_response
};

send_request_to_session(session, params);
buf_free(message_buf);
Python
# Sending the request and receiving the response.
print(f"Sending request: '{request}' to session {session_id}...")
try:
    response = await session.messaging.send_request_to_session(
        path=path, session_id=session_id, request=request_type(request)
    )
except diffusion.DiffusionError as ex:
    print(f"ERROR: {ex}")
else:
    print(f"... received response '{response}'")
Apple
//  Copyright (C) 2021 Push Technology Ltd.
//
//  Licensed under the Apache License, Version 2.0 (the "License");
//  you may not use this file except in compliance with the License.
//  You may obtain a copy of the License at
//  http://www.apache.org/licenses/LICENSE-2.0
//
//  Unless required by applicable law or agreed to in writing, software
//  distributed under the License is distributed on an "AS IS" BASIS,
//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//  See the License for the specific language governing permissions and
//  limitations under the License.

import Foundation
import Diffusion

class Messaging {

    func to_session(session: PTDiffusionSession, session_id: PTDiffusionSessionId, message_path: String) {
        let json_request = try! PTDiffusionJSON(jsonString: "{\"hello\": \"world\"}").request

        session.messaging.send(json_request,
                               to: session_id,
                               path: message_path,
                               jsonCompletionHandler: { (response: PTDiffusionJSON?, error) in
            if (error != nil) {
                print("Failed to send message to %@. Error: %@", message_path, error!.localizedDescription)
            }
            else {
                print("Received response: %@", response!)
            }
        })
    }


    class JSONRequestStreamDelegate: PTDiffusionJSONRequestStreamDelegate {
        func diffusionStream(_ stream: PTDiffusionStream,
                             didReceiveRequestWith json: PTDiffusionJSON,
                             responder: PTDiffusionResponder) {
            print("Received request %@", json)
            let json_response = try! PTDiffusionJSON(jsonString: "{\"greetings\": \"stranger\"}").response
            responder.respond(with: json_response)
        }

        func diffusionStream(_ stream: PTDiffusionStream, didFailWithError error: Error) {
            print("Request stream failed with error %@", error.localizedDescription)
        }

        func diffusionDidClose(_ stream: PTDiffusionStream) {
            print("Request stream is now closed")
        }
    }


    func session_register_handler(session: PTDiffusionSession, message_path: String) {
        let stream_delegate = JSONRequestStreamDelegate()
        let request_stream = PTDiffusionJSON.requestStream(with: stream_delegate)
        session.messaging.setRequestStream(request_stream, forPath: message_path)
    }


    func to_path(session: PTDiffusionSession, session_id: PTDiffusionSessionId, message_path: String) {
        let json_request = try! PTDiffusionJSON(jsonString: "{\"hello\": \"world\"}").request

        session.messaging.send(json_request, toPath: message_path) { (response: PTDiffusionJSON?, error) in
            if (error != nil) {
                print("Failed to send message to %@. Error: %@", message_path, error!.localizedDescription)
            }
            else {
                print("Received response: %@", response!)
            }
        }
    }


    class JSONRequestHandler: PTDiffusionJSONRequestDelegate {
        func diffusionTopicTreeRegistration(_ registration: PTDiffusionTopicTreeRegistration,
                                            didReceiveRequestWith json: PTDiffusionJSON,
                                            context: PTDiffusionRequestContext,
                                            responder: PTDiffusionResponder) {
            print("Received request: %@", json)
            let json_response = try! PTDiffusionJSON(jsonString: "{\"greetings\": \"stranger\"}").response
            responder.respond(with: json_response)
        }

        func diffusionTopicTreeRegistrationDidClose(_ registration: PTDiffusionTopicTreeRegistration) {
            print("Message path is now closed")
        }

        func diffusionTopicTreeRegistration(_ registration: PTDiffusionTopicTreeRegistration,
                                            didFailWithError error: Error) {
            print("Message path failed with error: %@", error.localizedDescription)
        }
    }


    func path_register_handler(session: PTDiffusionSession, message_path: String) {
        let handler = JSONRequestHandler()
        let request_stream = PTDiffusionJSON.requestHandler(with: handler)
        session.messaging.add(request_stream, forPath: message_path) { (registration, error) in
            if (error != nil) {
                print("An error has occurred while registering the message path %@. Error: %@",
                      message_path, error!.localizedDescription)
            }
            else {
                print("Message path %@ has been successfully registered.", message_path)
            }
        }
    }


    func to_filter(session: PTDiffusionSession,
                   session_id: PTDiffusionSessionId,
                   message_path: String,
                   session_properties_filter: String) {
        class JSONSessionResponseHandler: PTDiffusionJSONSessionResponseStreamDelegate {
            func diffusionStream(_ stream: PTDiffusionStream,
                                 didReceiveResponseWith json: PTDiffusionJSON,
                                 from sessionId: PTDiffusionSessionId) {
                print("Received response from %@: %@", sessionId, json)
            }

            func diffusionStream(_ stream: PTDiffusionStream,
                                 didReceiveError error: Error,
                                 from sessionId: PTDiffusionSessionId) {
                print("Received error from %@: %@", sessionId, error)
            }

            func diffusionStream(_ stream: PTDiffusionStream,
                                 didFailWithError error: Error) {
                print("Stream failed with error: %@", error.localizedDescription)
            }

            func diffusionDidClose(_ stream: PTDiffusionStream) {
                print("Stream is now closed")
            }
        }

        // create the request
        let json_request = try! PTDiffusionJSON(jsonString: "{\"hello\": \"world\"}").request

        // create the handler that will receive the response
        let handler = JSONSessionResponseHandler()
        let response_stream = PTDiffusionJSON.sessionResponseStream(with: handler)

        // send the message request
        session.messaging.send(json_request,
                               toFilter: session_properties_filter,
                               path: message_path,
                               responseStream: response_stream) { (selectedSessions, error) in

            if (error != nil) {
                print("Error occurred while sending message to filter: %@", error!.localizedDescription)
            }
            else {
                print("Message successfully sent to %lu sessions", selectedSessions)
            }
        }
    }


}

Responding to messages sent to a session

Required permissions: send_to_message_handler for the specified message path

Define a request stream to receive and respond to request messages that have a specific data type.
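
The shape of a request stream can be sketched independently of any client library. In the model below, `Responder` and `GreetingStream` are hypothetical classes written for this example: the stream handles string requests only, answers them through the responder, and rejects anything else.

```python
from typing import Optional, Tuple


class Responder:
    """Hypothetical responder: records a single outcome for one request."""

    def __init__(self) -> None:
        self.outcome: Optional[Tuple[str, str]] = None

    def respond(self, response: str) -> None:
        self.outcome = ("response", response)

    def reject(self, message: str) -> None:
        self.outcome = ("rejected", message)


class GreetingStream:
    """Request stream for string requests that produces string responses."""

    def on_request(self, path: str, request: object, responder: Responder) -> None:
        if not isinstance(request, str):
            # The stream only handles one request type; anything else is rejected.
            responder.reject(f"expected a string on {path}")
            return
        responder.respond(f"Hello there, {request}!")

    def on_error(self, reason: str) -> None:
        print(f"request stream error: {reason}")

    def on_close(self) -> None:
        print("request stream closed")


stream = GreetingStream()
ok, bad = Responder(), Responder()
stream.on_request("foo", "world", ok)
stream.on_request("foo", 42, bad)
print(ok.outcome, bad.outcome)
```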

JavaScript
const handler = {
    onRequest : function(request, context, responder) {
        /// ...
        responder.respond(response);
    },
    onError : (error) => {
        // an error occurred
    },
    onClose : () => {
        // the handler is closed
    }
}
.NET
using System;
using System.Threading;
using PushTechnology.ClientInterface.Client.Callbacks;
using PushTechnology.ClientInterface.Client.Factories;
using PushTechnology.ClientInterface.Client.Features;
using static System.Console;

namespace PushTechnology.ClientInterface.Example {
    /// <summary>
    /// A simple IRequestStream implementation that prints confirmation of the actions completed.
    /// </summary>
    internal class SimpleSessionMessagesRequestStream : IRequestStream<string, string> {
        /// <summary>
        /// Indicates that the request stream was closed.
        /// </summary>
        public void OnClose()
            => WriteLine( "A request handler was closed." );

        /// <summary>
        /// Indicates that the request stream has received an error.
        /// </summary>
        public void OnError( ErrorReason errorReason )
            => WriteLine( $"A request handler has received error: {errorReason}." );

        /// <summary>
        /// Indicates that a request was received and responds to it.
        /// </summary>
        /// <remarks>On invalid request you would call: <see cref="IResponder{TResponse}.Reject(string)"/>.</remarks>
        public void OnRequest( string path, string request, IResponder<string> responder ) {
            if ( request == "Hello?" ) {    // message to the filter to obtain the session ID
                responder.Respond( "Yes" );
            } else {
                WriteLine( $"Received request: '{request}'." );
                responder.Respond( DateTime.UtcNow.ToLongTimeString() );
            }
        }
    }
}
Java and Android
private final class JSONRequestStream implements Messaging.RequestStream<JSON, JSON> {

    @Override
    public void onClose() {
        // ... called when the request stream is closed
    }

    @Override
    public void onError(ErrorReason errorReason) {
        // ... called when the request stream fails
    }

    @Override
    public void onRequest(String path, JSON request, Responder<JSON> responder) {
        // ... handle the request and use the responder to reply
    }
}
                    
C
static int on_message_response(
        DIFFUSION_DATATYPE response_datatype,
        const DIFFUSION_VALUE_T *response,
        void *context)
{
        // read the `response` by converting it to the datatype `response_datatype`
        return HANDLER_SUCCESS;
}
Python
def callback(request: str, **kwargs) -> str:
    return f"Hello there, {request}!"
Apple
import Foundation
import Diffusion

// Excerpt from the Messaging class in the Apple example above:
// the delegate that receives requests and responds to them.
class JSONRequestStreamDelegate: PTDiffusionJSONRequestStreamDelegate {
    func diffusionStream(_ stream: PTDiffusionStream,
                         didReceiveRequestWith json: PTDiffusionJSON,
                         responder: PTDiffusionResponder) {
        print("Received request %@", json)
        let json_response = try! PTDiffusionJSON(jsonString: "{\"greetings\": \"stranger\"}").response
        responder.respond(with: json_response)
    }

    func diffusionStream(_ stream: PTDiffusionStream, didFailWithError error: Error) {
        print("Request stream failed with error %@", error.localizedDescription)
    }

    func diffusionDidClose(_ stream: PTDiffusionStream) {
        print("Request stream is now closed")
    }
}

Add the request stream against a message path. You can only add one request stream for each message path.
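
The one-stream-per-path rule can be pictured as a dictionary keyed by message path. The `StreamRegistry` class below is a hypothetical model, and the choice to replace an existing stream and return the displaced one is an assumption of this sketch. Check your client library's documentation for what it does when a path already has a stream.

```python
from typing import Callable, Dict, Optional

Stream = Callable[[str], str]


class StreamRegistry:
    """Hypothetical per-session registry: at most one request stream per message path."""

    def __init__(self) -> None:
        self._streams: Dict[str, Stream] = {}

    def set_request_stream(self, path: str, stream: Stream) -> Optional[Stream]:
        # Setting a stream on an occupied path replaces the old one (an assumption
        # of this sketch) and hands the displaced stream back to the caller.
        previous = self._streams.get(path)
        self._streams[path] = stream
        return previous

    def remove_request_stream(self, path: str) -> Optional[Stream]:
        # Removing a stream frees the path; returns None if nothing was set.
        return self._streams.pop(path, None)

    def dispatch(self, path: str, request: str) -> str:
        # Only the single stream currently set on the path sees the request.
        return self._streams[path](request)


registry = StreamRegistry()
first = registry.set_request_stream("foo", lambda r: f"first:{r}")
second = registry.set_request_stream("foo", lambda r: f"second:{r}")
print(first is None, second is not None, registry.dispatch("foo", "ping"))
```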

JavaScript
session.messages.setRequestStream("foo", handler, diffusion.datatypes.json(), diffusion.datatypes.json());
.NET
using System;
using System.Threading;
using PushTechnology.ClientInterface.Client.Factories;

namespace PushTechnology.ClientInterface.Example {
    /// <summary>
    /// Client implementation that registers a handler to listen for messages on a path.
    /// </summary>
    public sealed class ReceivingSessionRequestMessages {

        public void ReceivingSessionRequestMessagesExample(string serverUrl) {

            var session = Diffusion.Sessions.Principal( "client" ).Password( "password" ).Open( serverUrl );
            var messaging = session.Messaging;
            string messagingPath = ">random/requestResponse";

            var requestStream = new SimpleSessionMessagesRequestStream();
            messaging.SetRequestStream( messagingPath, requestStream );

            try 
            {
                Thread.Sleep( 60000 );//wait for messages...
            } 
            finally 
            {
                // Close session
                messaging.RemoveRequestStream( messagingPath );
                session.Close();
            }
        }
    }
}
Java and Android
messaging.setRequestStream("foo", JSON.class, JSON.class, requestStream);
                    
C
static int on_message_request(
        SESSION_T *session,
        const char *request_path,
        DIFFUSION_DATATYPE request_datatype,
        const DIFFUSION_VALUE_T *request,
        const DIFFUSION_RESPONDER_HANDLE_T *handle,
        void *context)
{
        // read the `request` based on the received `request_datatype`
        char *request_val;
        read_diffusion_string_value(request, &request_val, NULL);
        printf("Received message: %s\n", request_val);
        free(request_val);

        // create the response
        BUF_T *response_buf = buf_create();
        write_diffusion_string_value("This is my response", response_buf);

        // and respond to the request
        diffusion_respond_to_request(session, handle, response_buf, NULL);
        buf_free(response_buf);

        return HANDLER_SUCCESS;
}


void register_request_stream(
        SESSION_T *session,
        char *message_path)
{
        DIFFUSION_REQUEST_STREAM_T request_stream = {
                .on_request = on_message_request
        };

        set_request_stream(session, message_path, DATATYPE_STRING, DATATYPE_STRING, &request_stream);
}
Python
# Register handler to receive the request
handler = RequestHandler(
    callback,
    request_type=request_type,
    response_type=request_type
)
session.messaging.add_stream_handler(path, handler=handler, addressed=True)
Apple
import Foundation
import Diffusion

// Excerpt from the Messaging class in the Apple example above.
// JSONRequestStreamDelegate is the request stream delegate defined in that example.
func session_register_handler(session: PTDiffusionSession, message_path: String) {
    let stream_delegate = JSONRequestStreamDelegate()
    let request_stream = PTDiffusionJSON.requestStream(with: stream_delegate)
    session.messaging.setRequestStream(request_stream, forPath: message_path)
}