Just a second...

Example: Create a topic

The following examples use the TopicControl feature in the Unified API to create topics.

JavaScript
var diffusion = require('diffusion');

// Connect to the server. Change these options to suit your own environment.
// Node.js does not accept self-signed certificates by default. If you have
// one of these, set the environment variable NODE_TLS_REJECT_UNAUTHORIZED=0
// before running this example.
diffusion.connect({
    host   : 'diffusion.example.com',
    port   : 443,
    secure : true,
    principal : 'control',
    credentials : 'password'
}).then(function(session) {
    // 1. Topics can be created with a specified topic path and value. If the path contains multiple levels, any
    // intermediary topic path that do not already have topics remain unchanged.

    // Create a topic with string values, and an initial value of "xyz".
    session.topics.add('topic/string', 'xyz');

    // Create a topic with integer values, and an initial value of 123.
    session.topics.add('topic/integer', 123);

    // Create a topic with decimal values, with an implicit scale of 2 and an initial value of 1.23.
    session.topics.add('topic/decimal', 1.23);

    // 2. Adding a topic returns a result, which allows us to handle when the operation has either 
    // completed successfully or encountered an error.
    session.topics.add('topic/result', 'abc').then(function(result) {
        console.log('Added topic: ' + result.topic);
    }, function(reason) {
        console.log('Failed to add topic: ', reason);
    });

    // Adding a topic that already exists will succeed, so long as it has the same value type
    session.topics.add('topic/result', 'xyz').then(function(result) {
        // result.added will be false, as the topic already existed
        console.log('Added topic: ' + result.topic, result.added);
    });

    // Because the result returned from adding a topic is a promise, we can easily chain
    // multiple topic adds together
    session.topics.add('chain/foo', 1).then(session.topics.add('chain/bar', 2))
                                      .then(session.topics.add('chain/baz', 3))
                                      .then(session.topics.add('chain/bob', 4))
                                      .then(function() {
                                         console.log('Added all topics');
                                      }, function(reason) {
                                         console.log('Failed to add topic', reason);
                                      });

    // 3. Metadata can be used to create topics that will contain values of a specified format.
    
    // RecordContent formats data in a series of records and fields, similar to tabular data.
    // Each record & field is named, allowing direct lookup of values. Each field value has a
    // particular type (string, integer, decimal)
    var metadata = new diffusion.metadata.RecordContent();

    // Records are like rows in a table. They can have fields assigned, with default values. 
    // You can add fields all at once like this, or individually (see below).
    var game = metadata.addRecord('game', 1, {
        'title' : metadata.string(),
        'round' : metadata.integer(0),
        'count' : metadata.integer(0)
    });

    // Records and fields can be set as occurring a certain number of times.
    var player = metadata.addRecord('player', metadata.occurs(0, 8));

    // Add fields to a record individually.
    player.addField('name', 'Anonymous');
    player.addField('score', 0);

    // Adding the topic works just like normal.
    session.topics.add('games/some-game', metadata);

    // And the metadata can be re-used for multiple topics.
    session.topics.add('games/some-other-game', metadata);

    // 4. Using metadata, it is possible to create a topic with both a metadata format, and the initial value
    
    // Topic values can be produced from metadata via the builder interface
    var builder = metadata.builder();

    // Values must be set before a value can be created
    builder.add('game', { title : 'Planet Express!', count : 3 });
    
    builder.add('player', { name : 'Fry', score : 0 });
    builder.add('player', { name : 'Amy', score : 0 });
    builder.add('player', { name : 'Kif', score : 0 });

    // Build a content instance
    var content = builder.build();

    // Now that the content has been built, a topic can be added with the metadata & initial value
    session.topics.add('games/yet-another-game', metadata, content).then(function() {
        console.log('Topic was added with metadata and content');
    });
});
Apple
#import "AddTopicExample.h"

@import Diffusion;

@implementation AddTopicExample {
    PTDiffusionSession* _session;
}

-(void)startWithURL:(NSURL*)url {

    PTDiffusionCredentials *const credentials =
        [[PTDiffusionCredentials alloc] initWithPassword:@"password"];

    PTDiffusionSessionConfiguration *const sessionConfiguration =
        [[PTDiffusionSessionConfiguration alloc] initWithPrincipal:@"control"
                                                       credentials:credentials];

    NSLog(@"Connecting...");

    [PTDiffusionSession openWithURL:url
                      configuration:sessionConfiguration
                  completionHandler:^(PTDiffusionSession *session, NSError *error)
    {
        if (!session) {
            NSLog(@"Failed to open session: %@", error);
            return;
        }

        // At this point we now have a connected session.
        NSLog(@"Connected.");

        // Set ivar to maintain a strong reference to the session.
        _session = session;

        // Add topics.
        [self addTopicsForSession:session];
    }];
}

-(void)addTopicsForSession:(PTDiffusionSession *const)session {
    // Add a stateless topic.
    [session.topicControl addWithTopicPath:@"Example/Stateless"
                                      type:PTDiffusionTopicType_Stateless
                                     value:nil
                         completionHandler:^(NSError *const error)
    {
        if (error) {
            NSLog(@"Failed to add stateless topic. Error: %@", error);
        } else {
            NSLog(@"Stateless topic created.");
        }
    }];

    // Add a single value topic with an initial value.
    NSData *const data = [@"Hello" dataUsingEncoding:NSUTF8StringEncoding];
    PTDiffusionContent *const initialValue =
        [[PTDiffusionContent alloc] initWithData:data];
    [session.topicControl addWithTopicPath:@"Example/SingleValue"
                                      type:PTDiffusionTopicType_SingleValue
                                     value:initialValue
                         completionHandler:^(NSError * _Nullable error)
    {
        if (error) {
            NSLog(@"Failed to add single value topic. Error: %@", error);
        } else {
            NSLog(@"Single value topic created.");
        }
    }];
}

@end
Java and Android
import java.util.List;

import com.pushtechnology.diffusion.client.Diffusion;
import com.pushtechnology.diffusion.client.callbacks.TopicTreeHandler;
import com.pushtechnology.diffusion.client.content.Content;
import com.pushtechnology.diffusion.client.content.RecordContentBuilder;
import com.pushtechnology.diffusion.client.content.metadata.MContent;
import com.pushtechnology.diffusion.client.features.control.topics.TopicControl;
import com.pushtechnology.diffusion.client.features.control.topics.TopicControl.AddContextCallback;
import com.pushtechnology.diffusion.client.features.control.topics.TopicControl.RemovalCallback;
import com.pushtechnology.diffusion.client.session.Session;
import com.pushtechnology.diffusion.client.topics.details.RecordTopicDetails;
import com.pushtechnology.diffusion.client.topics.details.TopicDetails;

/**
 * An example of using a control client to add topics.
 * <P>
 * This uses the 'TopicControl' feature only.
 * <P>
 * To add or remove topics, the client session must have the 'modify_topic'
 * permission for that branch of the topic tree.
 *
 * @author Push Technology Limited
 * @since 5.0
 */
public class ControlClientAddingTopics {

    private final Session session;

    private final TopicControl topicControl;

    /**
     * Constructor.
     */
    public ControlClientAddingTopics() {

        session =
            Diffusion.sessions().principal("control").password("password")
                .open("ws://diffusion.example.com:80");

        topicControl = session.feature(TopicControl.class);

    }

    /**
     * Adds a topic with type derived from the initial value.
     * <P>
     * This uses the simple convenience method for adding topics where the topic
     * type (and metadata, if appropriate) are derived from a supplied value
     * which can be any object. For example, an Integer would result in a single
     * value topic of type integer or a JSON object would result in a JSON
     * topic.
     *
     * @param topicPath full topic path
     * @param initialValue an initial value for the topic
     * @param context this will be passed back to the callback when reporting
     *        success or failure of the topic add
     * @param callback to notify result of operation
     * @param <T> the value type
     * @return the topic details used to add the topic
     */
    public <T> TopicDetails addTopic(
        String topicPath,
        T initialValue,
        String context,
        AddContextCallback<String> callback) {

        return topicControl.addTopicFromValue(
            topicPath,
            initialValue,
            context,
            callback);
    }

    /**
     * Add a record topic from a list of initial values.
     * <P>
     * This demonstrates the simplest mechanism for adding a record topic by
     * supplying values that both the metadata and the initial values are
     * derived from.
     *
     * @param topicPath full topic path
     * @param initialValues the initial values for the topic fields which will
     *        also be used to derive the metadata definition of the topic
     * @param context this will be passed back to the callback when reporting
     *        success or failure of the topic add
     * @param callback to notify result of operation
     * @return the topic details used to add the topic
     */
    public TopicDetails addRecordTopic(
        String topicPath,
        List<String> initialValues,
        String context,
        AddContextCallback<String> callback) {

        return topicControl.addTopicFromValue(
            topicPath,
            Diffusion.content().newBuilder(RecordContentBuilder.class)
                .putFields(initialValues).build(),
            context,
            callback);

    }

    /**
     * Adds a record topic with supplied metadata and optional initial content.
     * <P>
     * This example shows details being created and would be fine when creating
     * topics that are all different but if creating many record topics with the
     * same details then it is far more efficient to pre-create the details.
     *
     * @param topicPath the full topic path
     * @param metadata pre-created record metadata
     * @param initialValue optional initial value for the topic which must have
     *        been created to match the supplied metadata
     * @param context context passed back to callback when topic created
     * @param callback to notify result of operation
     */
    public void addRecordTopic(
        String topicPath,
        MContent metadata,
        Content initialValue,
        String context,
        AddContextCallback<String> callback) {

        final TopicDetails details =
            topicControl.newDetailsBuilder(RecordTopicDetails.Builder.class)
                .metadata(metadata).build();

        topicControl.addTopic(
            topicPath,
            details,
            initialValue,
            context,
            callback);
    }

    /**
     * Remove a single topic given its path.
     *
     * @param topicPath the topic path
     * @param callback notifies result of operation
     */
    public void removeTopic(String topicPath, RemovalCallback callback) {
        topicControl.remove(topicPath, callback);
    }

    /**
     * Remove a topic and all of its descendants.
     *
     * @param topicPath the topic path
     * @param callback notifies result of operation
     */
    public void removeTopicBranch(String topicPath, RemovalCallback callback) {
        topicControl.remove("?" + topicPath + "//", callback);
    }

    /**
     * Remove one or more topics using a topic selector expression.
     *
     * @param topicSelector the selector expression
     * @param callback notifies result of operation
     */
    public void removeTopics(String topicSelector, RemovalCallback callback) {
        topicControl.remove(topicSelector, callback);
    }

    /**
     * Request that the topic {@code topicPath} and its descendants be removed
     * when the session is closed (either explicitly using {@link Session#close}
     * , or by the server). If more than one session calls this method for the
     * same {@code topicPath}, the topics will be removed when the last session
     * is closed.
     *
     * <p>
     * Different sessions may call this method for the same topic path, but not
     * for topic paths above or below existing registrations on the same branch
     * of the topic tree.
     *
     * @param topicPath the part of the topic tree to remove when the last
     *        session is closed
     */
    public void removeTopicsWithSession(String topicPath) {
        topicControl.removeTopicsWithSession(
            topicPath, new TopicTreeHandler.Default());
    }

    /**
     * Close the session.
     */
    public void close() {
        session.close();
    }

}
.NET
using System.Collections.Generic;
using PushTechnology.ClientInterface.Client.Callbacks;
using PushTechnology.ClientInterface.Client.Content;
using PushTechnology.ClientInterface.Client.Content.Metadata;
using PushTechnology.ClientInterface.Client.Factories;
using PushTechnology.ClientInterface.Client.Features.Control.Topics;
using PushTechnology.ClientInterface.Client.Session;
using PushTechnology.ClientInterface.Client.Topics;

namespace Examples {
    /// <summary>
    /// An example of using a control client to add topics.
    ///
    /// This uses the <see cref="ITopicControl"/> feature only.
    ///
    /// To add or remove topics, the client session must have the <see cref="TopicPermission.MODIFY_TOPIC"/> permission
    /// for that branch of the topic tree.
    /// </summary>
    public class ControlClientAddingTopics {
        private readonly ISession session;
        private readonly ITopicControl topicControl;

        public ControlClientAddingTopics() {
            session = Diffusion.Sessions.Principal( "control" ).Password( "password" )
                .Open( "ws://diffusion.example.com:80" );

            topicControl = session.GetTopicControlFeature();
        }

        /// <summary>
        /// Adds a topic with the type derived from the value.
        ///
        /// This uses the simple convenience method for adding topics where the topic type and metadata are derived
        /// from a supplied value which can be any object. For example, an integer would result in a single value topic
        /// of type 'integer'.
        /// </summary>
        /// <typeparam name="T">The value type.</typeparam>
        /// <param name="topicPath">The full topic path.</param>
        /// <param name="initialValue">An optional initial value for the topic.</param>
        /// <param name="context">This will be passed back to the callback when reporting success or failure of the
        /// topic add.</param>
        /// <param name="callback">To notify the result of the operation.</param>
        /// <returns>The topic details used to add the topic.</returns>
        public ITopicDetails AddTopicForValue<T>( string topicPath, T initialValue, string context,
            ITopicControlAddContextCallback<string> callback ) {
            return topicControl.AddTopicFromValue( topicPath, initialValue, context, callback );
        }

        /// <summary>
        /// Add a record topic from a list of initial values.
        ///
        /// This demonstrates the simplest mechanism for adding a record topic by supplying values that both the
        /// metadata and the initial values are derived from.
        /// </summary>
        /// <param name="topicPath">The full topic path.</param>
        /// <param name="initialValues">The initial values for the topic fields which will also be used to derive the
        /// metadata definition of the topic.</param>
        /// <param name="context">This will be passed back to the callback when reporting success or failure of the
        /// topic add.</param>
        /// <param name="callback">To notify the result of the operation.</param>
        /// <returns></returns>
        public ITopicDetails AddRecordTopic( string topicPath, List<string> initialValues, string context,
            ITopicControlAddContextCallback<string> callback ) {
            return topicControl.AddTopicFromValue( topicPath,
                Diffusion.Content.NewBuilder<IRecordContentBuilder>().PutFields( initialValues.ToArray() ).Build(),
                context, callback );
        }

        /// <summary>
        /// Adds a record topic with supplied metadata and optional initial content.
        ///
        /// This example shows details being created and would be fine when creating topics that are all different, but
        /// if creating many record topics with the same details, then it is far more efficient to pre-create the
        /// details.
        /// </summary>
        /// <param name="topicPath">The full topic path.</param>
        /// <param name="metadata">The pre-created record metadata.</param>
        /// <param name="initialValue">The optional initial value for the topic which must have been created to match
        /// the supplied metadata.</param>
        /// <param name="context">The context passed back to the callback when the topic is created.</param>
        /// <param name="callback">To notify the result of the operation.</param>
        public void AddRecordTopic( string topicPath, IMContent metadata, IContent initialValue, string context,
            ITopicControlAddContextCallback<string> callback ) {
            var details = topicControl.CreateDetailsBuilder<IRecordTopicDetailsBuilder>().Metadata( metadata ).Build();

            topicControl.AddTopic( topicPath, details, initialValue, context, callback );
        }

        /// <summary>
        /// Remove a single topic given its path.
        /// </summary>
        /// <param name="topicPath">The topic path.</param>
        /// <param name="callback">Notifies the result of the operation.</param>
        public void RemoveTopic( string topicPath, ITopicControlRemoveCallback callback ) {
            topicControl.RemoveTopics( ">" + topicPath, callback );
        }

        /// <summary>
        /// Remove one or more topics using a topic selector expression.
        /// </summary>
        /// <param name="topicSelector">The selector expression.</param>
        /// <param name="callback">Notifies the result of the operation.</param>
        public void RemoveTopics( string topicSelector, ITopicControlRemoveCallback callback ) {
            topicControl.RemoveTopics( topicSelector, callback );
        }

        /// <summary>
        /// Request that the topic path and its descendants be removed when the session is closed (either explicitly
        /// using <see cref="ISession.Close"/>, or by the server). If more than one session calls this method for the
        /// same topic path, the topics will be removed when the last session is closed.
        ///
        /// Different sessions may call this method for the same topic path, but not for topic paths above or below
        /// existing registrations on the same branch of the topic tree.
        /// </summary>
        /// <param name="topicPath">The part of the topic tree to remove when the last session is closed.</param>
        public void RemoveTopicsWithSession( string topicPath ) {
            topicControl.RemoveTopicsWithSession( topicPath, new DefaultTopicTreeHandler() );
        }

        /// <summary>
        /// Close the session.
        /// </summary>
        public void Close() {
            session.Close();
        }
    }
}
C
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>

#include <apr.h>
#include <apr_thread_mutex.h>
#include <apr_thread_cond.h>

#include "diffusion.h"
#include "args.h"
#include "utils.h"

/*
 * We use a mutex and a condition variable to help synchronize the
 * flow so that it becomes linear and easier to follow the core logic.
 */
apr_pool_t *pool = NULL;
apr_thread_mutex_t *mutex = NULL;
apr_thread_cond_t *cond = NULL;

ARG_OPTS_T arg_opts[] = {
        ARG_OPTS_HELP,
        {'u', "url", "Diffusion server URL", ARG_OPTIONAL, ARG_HAS_VALUE, "ws://localhost:8080"},
        {'p', "principal", "Principal (username) for the connection", ARG_OPTIONAL, ARG_HAS_VALUE, NULL},
        {'c', "credentials", "Credentials (password) for the connection", ARG_OPTIONAL, ARG_HAS_VALUE, NULL},
        END_OF_ARG_OPTS
};

// Various handlers which are common to all add_topic() functions.
static int
on_topic_added(SESSION_T *session, const SVC_ADD_TOPIC_RESPONSE_T *response, void *context)
{
        puts("on_topic_added");
        apr_thread_mutex_lock(mutex);
        apr_thread_cond_broadcast(cond);
        apr_thread_mutex_unlock(mutex);
        return HANDLER_SUCCESS;
}

static int
on_topic_add_failed(SESSION_T *session, const SVC_ADD_TOPIC_RESPONSE_T *response, void *context)
{
        printf("on_topic_add_failed: %d\n", response->response_code);
        apr_thread_mutex_lock(mutex);
        apr_thread_cond_broadcast(cond);
        apr_thread_mutex_unlock(mutex);
        return HANDLER_SUCCESS;
}

static int
on_topic_add_discard(SESSION_T *session, void *context)
{
        puts("on_topic_add_discard");
        apr_thread_mutex_lock(mutex);
        apr_thread_cond_broadcast(cond);
        apr_thread_mutex_unlock(mutex);
        return HANDLER_SUCCESS;
}

static int
on_topic_removed(SESSION_T *session, const SVC_REMOVE_TOPICS_RESPONSE_T *response, void *context)
{
        puts("on_topic_removed");
        apr_thread_mutex_lock(mutex);
        apr_thread_cond_broadcast(cond);
        apr_thread_mutex_unlock(mutex);
        return HANDLER_SUCCESS;
}

static int
on_topic_remove_discard(SESSION_T *session, void *context)
{
        puts("on_topic_remove_discard");
        apr_thread_mutex_lock(mutex);
        apr_thread_cond_broadcast(cond);
        apr_thread_mutex_unlock(mutex);
        return HANDLER_SUCCESS;
}
/*
 *
 */
int main(int argc, char** argv)
{
        /*
         * Standard command-line parsing.
         */
        HASH_T *options = parse_cmdline(argc, argv, arg_opts);
        if(options == NULL || hash_get(options, "help") != NULL) {
                show_usage(argc, argv, arg_opts);
                return EXIT_FAILURE;
        }

        char *url = hash_get(options, "url");
        const char *principal = hash_get(options, "principal");
        CREDENTIALS_T *credentials = NULL;
        const char *password = hash_get(options, "credentials");
        if(password != NULL) {
                credentials = credentials_create_password(password);
        }

        // Setup for condition variable
        apr_initialize();
        apr_pool_create(&pool, NULL);
        apr_thread_mutex_create(&mutex, APR_THREAD_MUTEX_UNNESTED, pool);
        apr_thread_cond_create(&cond, pool);

        // Setup for session
        SESSION_T *session;
        DIFFUSION_ERROR_T error = { 0 };
        session = session_create(url, principal, credentials, NULL, NULL, &error);
        if(session == NULL) {
                fprintf(stderr, "TEST: Failed to create session\n");
                fprintf(stderr, "ERR : %s\n", error.message);
                return EXIT_FAILURE;
        }

        // Common params for all add_topic() functions.
        ADD_TOPIC_PARAMS_T common_params = {
                .on_topic_added = on_topic_added,
                .on_topic_add_failed = on_topic_add_failed,
                .on_discard = on_topic_add_discard
        };

        /*
         * Create a stateless topic.
         */
        TOPIC_DETAILS_T *topic_details = create_topic_details_stateless();
        ADD_TOPIC_PARAMS_T stateless_params = common_params;
        stateless_params.topic_path = "stateless";
        stateless_params.details = topic_details;

        apr_thread_mutex_lock(mutex);
        add_topic(session, stateless_params);
        apr_thread_cond_wait(cond, mutex);
        apr_thread_mutex_unlock(mutex);

        /*
         * Create a topic with single value string data, but with
         * containing no default data.
         */
        TOPIC_DETAILS_T *string_topic_details = create_topic_details_single_value(M_DATA_TYPE_STRING);
        ADD_TOPIC_PARAMS_T string_params = common_params;
        string_params.topic_path = "string";
        string_params.details = string_topic_details;

        apr_thread_mutex_lock(mutex);
        add_topic(session, string_params);
        apr_thread_cond_wait(cond, mutex);
        apr_thread_mutex_unlock(mutex);

        /*
         * Create a topic with single value string data and containing
         * some default data.
         */
        ADD_TOPIC_PARAMS_T string_data_params = common_params;
        string_data_params.topic_path = "string-data";
        string_data_params.details = string_topic_details;
        BUF_T *sample_data_buf = buf_create();
        buf_write_string(sample_data_buf, "Hello, world");
        string_data_params.content = content_create(CONTENT_ENCODING_NONE, sample_data_buf);

        apr_thread_mutex_lock(mutex);
        add_topic(session, string_data_params);
        apr_thread_cond_wait(cond, mutex);
        apr_thread_mutex_unlock(mutex);

        /*
         * Create a topic with single value integer data, and with a
         * default value.
         */
        TOPIC_DETAILS_T *integer_topic_details = create_topic_details_single_value(M_DATA_TYPE_INTEGER_STRING);
        integer_topic_details->topic_details_params.integer.default_value = 999;

        ADD_TOPIC_PARAMS_T integer_params = common_params;
        integer_params.topic_path = "integer";
        integer_params.details = integer_topic_details;

        apr_thread_mutex_lock(mutex);
        add_topic(session, integer_params);
        apr_thread_cond_wait(cond, mutex);
        apr_thread_mutex_unlock(mutex);

        /*
         * Create a topic with integer data, but using a CONTENT_T to
         * specify the initial data.
         */
        ADD_TOPIC_PARAMS_T integer_data_params = common_params;
        integer_data_params.topic_path = "integer-data";
        integer_data_params.details = integer_topic_details;
        BUF_T *integer_data_buf = buf_create();
        buf_sprintf(integer_data_buf, "%d", 123);
        integer_data_params.content = content_create(CONTENT_ENCODING_NONE, integer_data_buf);

        apr_thread_mutex_lock(mutex);
        add_topic(session, integer_data_params);
        apr_thread_cond_wait(cond, mutex);
        apr_thread_mutex_unlock(mutex);

        /*
         * Create a topic with single value decimal data, with a
         * default value and specifying the scale (i.e. positions
         * after the decimal place).
         */
        TOPIC_DETAILS_T *decimal_topic_details = create_topic_details_single_value(M_DATA_TYPE_DECIMAL_STRING);
        decimal_topic_details->topic_details_params.decimal.default_value = 123.456;
        decimal_topic_details->topic_details_params.decimal.scale = 4;

        ADD_TOPIC_PARAMS_T decimal_params = common_params;
        decimal_params.topic_path = "decimal";
        decimal_params.details = decimal_topic_details;

        apr_thread_mutex_lock(mutex);
        add_topic(session, decimal_params);
        apr_thread_cond_wait(cond, mutex);
        apr_thread_mutex_unlock(mutex);

        /*
         * Create a topic with decimal data, using a CONTENT_T to
         * specify the initial data.
         */
        ADD_TOPIC_PARAMS_T decimal_data_params = common_params;
        decimal_data_params.topic_path = "decimal-data";
        decimal_data_params.details = decimal_topic_details;
        BUF_T *decimal_data_buf = buf_create();
        buf_sprintf(decimal_data_buf, "%f", 987.654);
        decimal_data_params.content = content_create(CONTENT_ENCODING_NONE, decimal_data_buf);

        apr_thread_mutex_lock(mutex);
        add_topic(session, decimal_data_params);
        apr_thread_cond_wait(cond, mutex);
        apr_thread_mutex_unlock(mutex);

        /*
         * Record topic data.
         *
         * The C API does not have the concept of "builders" for
         * creating record topic data, but requires you to build a
         * string containing XML that describes the structure of the
         * messages.
         */

        /*
         * First of all, this adds a topic equivalent to single-value
         * strings, but defined with XML.
         */
        BUF_T *manual_schema = buf_create();
        buf_write_string(manual_schema,
                "<?xml version=\"1.0\" encoding=\"UTF-8\" standalone=\"yes\"?>\n");
        buf_write_string(manual_schema,
                "<field name=\"x\" type=\"string\" default=\"xyzzy\" allowsEmpty=\"true\"/>");
        TOPIC_DETAILS_T *manual_topic_details = create_topic_details_single_value(M_DATA_TYPE_STRING);
        manual_topic_details->user_defined_schema = manual_schema;

        ADD_TOPIC_PARAMS_T string_manual_params = common_params;
        string_manual_params.topic_path = "string-manual";
        string_manual_params.details = manual_topic_details;

        apr_thread_mutex_lock(mutex);
        add_topic(session, string_manual_params);
        apr_thread_cond_wait(cond, mutex);
        apr_thread_mutex_unlock(mutex);

        /*
         * This adds a topic with a record containing multiple fields
         * of different types.
         */
        BUF_T *record_schema = buf_create();
        buf_write_string(record_schema,
                "<?xml version=\"1.0\" encoding=\"UTF-8\" standalone=\"yes\"?>");
        buf_write_string(record_schema,
                "<message topicDataType=\"record\" name=\"MyContent\">");
        buf_write_string(record_schema,
                "<record name=\"Record1\">");
        buf_write_string(record_schema,
                "<field type=\"string\" default=\"\" allowsEmpty=\"true\" name=\"Field1\"/>");
        buf_write_string(record_schema,
                "<field type=\"integerString\" default=\"0\" allowsEmpty=\"false\" name=\"Field2\"/>");
        buf_write_string(record_schema,
                "<field type=\"decimalString\" default=\"0.00\" scale=\"2\" allowsEmpty=\"false\" name=\"Field3\"/>");
        buf_write_string(record_schema,
                "</record>");
        buf_write_string(record_schema,
                "</message>");
        TOPIC_DETAILS_T *record_topic_details = create_topic_details_record();
        record_topic_details->user_defined_schema = record_schema;

        ADD_TOPIC_PARAMS_T record_params = common_params;
        record_params.topic_path = "record";
        record_params.details = record_topic_details;

        apr_thread_mutex_lock(mutex);
        add_topic(session, record_params);
        apr_thread_cond_wait(cond, mutex);
        apr_thread_mutex_unlock(mutex);

        /*
         * Create a topic with binary data
         */
        TOPIC_DETAILS_T *binary_topic_details = create_topic_details_binary();
        ADD_TOPIC_PARAMS_T binary_params = common_params;
        binary_params.topic_path = "binary-data";
        binary_params.details = binary_topic_details;

        char binary_bytes[] = { 0x00, 0x01, 0x02, 0x03, 0x04, 0x05 };

        BUF_T *binary_data_buf = buf_create();
        buf_write_bytes(binary_data_buf, binary_bytes, sizeof(binary_bytes));
        binary_params.content = content_create(CONTENT_ENCODING_NONE, binary_data_buf);

        apr_thread_mutex_lock(mutex);
        add_topic(session, binary_params);
        apr_thread_cond_wait(cond, mutex);
        apr_thread_mutex_unlock(mutex);

        /*
         * We can also remove topics. First, add a couple of topics
         * and then remove their parent topic. All child topics are
         * removed with the parent.
         */
        puts("Adding topics remove_me/1 and remove_me/2");

        ADD_TOPIC_PARAMS_T topic_params = common_params;
        topic_params.details = topic_details;
        topic_params.topic_path = "remove_me/1";

        apr_thread_mutex_lock(mutex);
        add_topic(session, topic_params);
        apr_thread_cond_wait(cond, mutex);
        apr_thread_mutex_unlock(mutex);

        topic_params.topic_path = "remove_me/2";
        apr_thread_mutex_lock(mutex);
        add_topic(session, topic_params);
        apr_thread_cond_wait(cond, mutex);
        apr_thread_mutex_unlock(mutex);

        puts("Removing topics in 5 seconds...");
        sleep(5);

        REMOVE_TOPICS_PARAMS_T remove_params = {
                .on_removed = on_topic_removed,
                .on_discard = on_topic_remove_discard,
                .topic_selector = ">remove_me"
        };

        apr_thread_mutex_lock(mutex);
        remove_topics(session, remove_params);
        apr_thread_cond_wait(cond, mutex);
        apr_thread_mutex_unlock(mutex);

        /*
         * Close our session, and release resources and memory.
         */
        session_close(session, NULL);
        session_free(session);

        apr_thread_mutex_destroy(mutex);
        apr_thread_cond_destroy(cond);
        apr_pool_destroy(pool);
        apr_terminate();

        return EXIT_SUCCESS;
}

Change the URL from that provided in the example to the URL of the Diffusion™ server.