Example: Create a topic
The following examples use the TopicControl feature in the Unified API to create topics.
JavaScript
var diffusion = require('diffusion');

// Connect to the server. Change these options to suit your own environment.
// Node.js does not accept self-signed certificates by default. If you have
// one of these, set the environment variable NODE_TLS_REJECT_UNAUTHORIZED=0
// before running this example.
diffusion.connect({
    host : 'diffusion.example.com',
    port : 443,
    secure : true,
    principal : 'control',
    credentials : 'password'
}).then(function(session) {

    // 1. Topics can be created with a specified topic path and value. If the path contains multiple levels, any
    // intermediate topic paths that do not already have topics remain unchanged.

    // Create a topic with string values, and an initial value of "xyz".
    session.topics.add('topic/string', 'xyz');

    // Create a topic with integer values, and an initial value of 123.
    session.topics.add('topic/integer', 123);

    // Create a topic with decimal values, with an implicit scale of 2 and an initial value of 1.23.
    session.topics.add('topic/decimal', 1.23);

    // 2. Adding a topic returns a result, which allows us to handle when the operation has either
    // completed successfully or encountered an error.
    session.topics.add('topic/result', 'abc').then(function(result) {
        console.log('Added topic: ' + result.topic);
    }, function(reason) {
        console.log('Failed to add topic: ', reason);
    });

    // Adding a topic that already exists will succeed, so long as it has the same value type
    session.topics.add('topic/result', 'xyz').then(function(result) {
        // result.added will be false, as the topic already existed
        console.log('Added topic: ' + result.topic, result.added);
    });

    // Because the result returned from adding a topic is a promise, we can easily chain
    // multiple topic adds together.
    //
    // Each step must be a *function* that returns the next add. Passing the result of
    // session.topics.add(...) directly to then() would issue every add immediately, and
    // then() ignores arguments that are not functions, so nothing would actually be chained
    // and failures of the later adds would never reach the error handler below.
    session.topics.add('chain/foo', 1).then(function() {
        return session.topics.add('chain/bar', 2);
    }).then(function() {
        return session.topics.add('chain/baz', 3);
    }).then(function() {
        return session.topics.add('chain/bob', 4);
    }).then(function() {
        console.log('Added all topics');
    }, function(reason) {
        console.log('Failed to add topic', reason);
    });

    // 3. Metadata can be used to create topics that will contain values of a specified format.

    // RecordContent formats data in a series of records and fields, similar to tabular data.
    // Each record & field is named, allowing direct lookup of values. Each field value has a
    // particular type (string, integer, decimal)
    var metadata = new diffusion.metadata.RecordContent();

    // Records are like rows in a table. They can have fields assigned, with default values.
    // You can add fields all at once like this, or individually (see below).
    var game = metadata.addRecord('game', 1, {
        'title' : metadata.string(),
        'round' : metadata.integer(0),
        'count' : metadata.integer(0)
    });

    // Records and fields can be set as occurring a certain number of times.
    var player = metadata.addRecord('player', metadata.occurs(0, 8));

    // Add fields to a record individually.
    player.addField('name', 'Anonymous');
    player.addField('score', 0);

    // Adding the topic works just like normal.
    session.topics.add('games/some-game', metadata);

    // And the metadata can be re-used for multiple topics.
    session.topics.add('games/some-other-game', metadata);

    // 4. Using metadata, it is possible to create a topic with both a metadata format, and the initial value

    // Topic values can be produced from metadata via the builder interface
    var builder = metadata.builder();

    // Values must be set before a value can be created
    builder.add('game', { title : 'Planet Express!', count : 3 });

    builder.add('player', { name : 'Fry', score : 0 });
    builder.add('player', { name : 'Amy', score : 0 });
    builder.add('player', { name : 'Kif', score : 0 });

    // Build a content instance
    var content = builder.build();

    // Now that the content has been built, a topic can be added with the metadata & initial value
    session.topics.add('games/yet-another-game', metadata, content).then(function() {
        console.log('Topic was added with metadata and content');
    });
}, function(reason) {
    // Without this handler a failed connection attempt would go unreported.
    console.log('Failed to connect', reason);
});
Apple
#import "AddTopicExample.h"

@import Diffusion;

/**
 Example that opens a Diffusion session as the "control" principal and then
 adds a stateless topic and a single value topic through the session's
 topic control feature.
 */
@implementation AddTopicExample {
    // Strong reference that keeps the session alive once it has been opened.
    PTDiffusionSession* _session;
}

/**
 Opens a session to the server at the given URL and, once connected, adds the
 example topics.

 @param url The URL of the Diffusion server to connect to.
 */
-(void)startWithURL:(NSURL*)url {
    PTDiffusionCredentials *const passwordCredentials =
        [[PTDiffusionCredentials alloc] initWithPassword:@"password"];

    PTDiffusionSessionConfiguration *const configuration =
        [[PTDiffusionSessionConfiguration alloc] initWithPrincipal:@"control"
                                                       credentials:passwordCredentials];

    NSLog(@"Connecting...");

    [PTDiffusionSession openWithURL:url
                      configuration:configuration
                  completionHandler:^(PTDiffusionSession *session, NSError *error)
    {
        if (session == nil) {
            NSLog(@"Failed to open session: %@", error);
            return;
        }

        // The session is connected from here on.
        NSLog(@"Connected.");

        // Hold on to the session through the ivar so that it is not released.
        self->_session = session;

        [self addTopicsForSession:session];
    }];
}

/**
 Adds a stateless topic and a single value topic (with an initial value of
 "Hello") using the given session, logging the outcome of each request.

 @param session The connected session used to add the topics.
 */
-(void)addTopicsForSession:(PTDiffusionSession *const)session {
    // Stateless topic: no initial value is supplied.
    [session.topicControl addWithTopicPath:@"Example/Stateless"
                                      type:PTDiffusionTopicType_Stateless
                                     value:nil
                         completionHandler:^(NSError *const error)
    {
        if (error) {
            NSLog(@"Failed to add stateless topic. Error: %@", error);
            return;
        }
        NSLog(@"Stateless topic created.");
    }];

    // Single value topic: the initial value is the UTF-8 encoding of "Hello".
    NSData *const helloBytes = [@"Hello" dataUsingEncoding:NSUTF8StringEncoding];
    PTDiffusionContent *const helloContent =
        [[PTDiffusionContent alloc] initWithData:helloBytes];

    [session.topicControl addWithTopicPath:@"Example/SingleValue"
                                      type:PTDiffusionTopicType_SingleValue
                                     value:helloContent
                         completionHandler:^(NSError * _Nullable error)
    {
        if (error) {
            NSLog(@"Failed to add single value topic. Error: %@", error);
            return;
        }
        NSLog(@"Single value topic created.");
    }];
}

@end
Java and Android
import java.util.List; import com.pushtechnology.diffusion.client.Diffusion; import com.pushtechnology.diffusion.client.callbacks.TopicTreeHandler; import com.pushtechnology.diffusion.client.content.Content; import com.pushtechnology.diffusion.client.content.RecordContentBuilder; import com.pushtechnology.diffusion.client.content.metadata.MContent; import com.pushtechnology.diffusion.client.features.control.topics.TopicControl; import com.pushtechnology.diffusion.client.features.control.topics.TopicControl.AddContextCallback; import com.pushtechnology.diffusion.client.features.control.topics.TopicControl.RemovalCallback; import com.pushtechnology.diffusion.client.session.Session; import com.pushtechnology.diffusion.client.topics.details.RecordTopicDetails; import com.pushtechnology.diffusion.client.topics.details.TopicDetails; /** * An example of using a control client to add topics. * <P> * This uses the 'TopicControl' feature only. * <P> * To add or remove topics, the client session must have the 'modify_topic' * permission for that branch of the topic tree. * * @author Push Technology Limited * @since 5.0 */ public class ControlClientAddingTopics { private final Session session; private final TopicControl topicControl; /** * Constructor. */ public ControlClientAddingTopics() { session = Diffusion.sessions().principal("control").password("password") .open("ws://diffusion.example.com:80"); topicControl = session.feature(TopicControl.class); } /** * Adds a topic with type derived from the initial value. * <P> * This uses the simple convenience method for adding topics where the topic * type (and metadata, if appropriate) are derived from a supplied value * which can be any object. For example, an Integer would result in a single * value topic of type integer or a JSON object would result in a JSON * topic. 
* * @param topicPath full topic path * @param initialValue an initial value for the topic * @param context this will be passed back to the callback when reporting * success or failure of the topic add * @param callback to notify result of operation * @param <T> the value type * @return the topic details used to add the topic */ public <T> TopicDetails addTopic( String topicPath, T initialValue, String context, AddContextCallback<String> callback) { return topicControl.addTopicFromValue( topicPath, initialValue, context, callback); } /** * Add a record topic from a list of initial values. * <P> * This demonstrates the simplest mechanism for adding a record topic by * supplying values that both the metadata and the initial values are * derived from. * * @param topicPath full topic path * @param initialValues the initial values for the topic fields which will * also be used to derive the metadata definition of the topic * @param context this will be passed back to the callback when reporting * success or failure of the topic add * @param callback to notify result of operation * @return the topic details used to add the topic */ public TopicDetails addRecordTopic( String topicPath, List<String> initialValues, String context, AddContextCallback<String> callback) { return topicControl.addTopicFromValue( topicPath, Diffusion.content().newBuilder(RecordContentBuilder.class) .putFields(initialValues).build(), context, callback); } /** * Adds a record topic with supplied metadata and optional initial content. * <P> * This example shows details being created and would be fine when creating * topics that are all different but if creating many record topics with the * same details then it is far more efficient to pre-create the details. 
* * @param topicPath the full topic path * @param metadata pre-created record metadata * @param initialValue optional initial value for the topic which must have * been created to match the supplied metadata * @param context context passed back to callback when topic created * @param callback to notify result of operation */ public void addRecordTopic( String topicPath, MContent metadata, Content initialValue, String context, AddContextCallback<String> callback) { final TopicDetails details = topicControl.newDetailsBuilder(RecordTopicDetails.Builder.class) .metadata(metadata).build(); topicControl.addTopic( topicPath, details, initialValue, context, callback); } /** * Remove a single topic given its path. * * @param topicPath the topic path * @param callback notifies result of operation */ public void removeTopic(String topicPath, RemovalCallback callback) { topicControl.remove(topicPath, callback); } /** * Remove a topic and all of its descendants. * * @param topicPath the topic path * @param callback notifies result of operation */ public void removeTopicBranch(String topicPath, RemovalCallback callback) { topicControl.remove("?" + topicPath + "//", callback); } /** * Remove one or more topics using a topic selector expression. * * @param topicSelector the selector expression * @param callback notifies result of operation */ public void removeTopics(String topicSelector, RemovalCallback callback) { topicControl.remove(topicSelector, callback); } /** * Request that the topic {@code topicPath} and its descendants be removed * when the session is closed (either explicitly using {@link Session#close} * , or by the server). If more than one session calls this method for the * same {@code topicPath}, the topics will be removed when the last session * is closed. * * <p> * Different sessions may call this method for the same topic path, but not * for topic paths above or below existing registrations on the same branch * of the topic tree. 
* * @param topicPath the part of the topic tree to remove when the last * session is closed */ public void removeTopicsWithSession(String topicPath) { topicControl.removeTopicsWithSession( topicPath, new TopicTreeHandler.Default()); } /** * Close the session. */ public void close() { session.close(); } }
.NET
using System.Collections.Generic;
using PushTechnology.ClientInterface.Client.Callbacks;
using PushTechnology.ClientInterface.Client.Content;
using PushTechnology.ClientInterface.Client.Content.Metadata;
using PushTechnology.ClientInterface.Client.Factories;
using PushTechnology.ClientInterface.Client.Features.Control.Topics;
using PushTechnology.ClientInterface.Client.Session;
using PushTechnology.ClientInterface.Client.Topics;

namespace Examples {
    /// <summary>
    /// Example control client that adds and removes topics.
    ///
    /// Only the <see cref="ITopicControl"/> feature is used.
    ///
    /// The client session needs the <see cref="TopicPermission.MODIFY_TOPIC"/> permission for the relevant branch
    /// of the topic tree in order to add or remove topics.
    /// </summary>
    public class ControlClientAddingTopics {
        private readonly ISession session;
        private readonly ITopicControl topicControl;

        /// <summary>
        /// Opens a session as principal "control" and obtains its topic control feature.
        /// </summary>
        public ControlClientAddingTopics() {
            session = Diffusion.Sessions
                .Principal( "control" )
                .Password( "password" )
                .Open( "ws://diffusion.example.com:80" );

            topicControl = session.GetTopicControlFeature();
        }

        /// <summary>
        /// Adds a topic whose type is derived from the supplied value.
        ///
        /// This is the simple convenience route: the topic type and metadata are worked out from the supplied
        /// value, which may be any object. An integer, for instance, gives a single value topic of type 'integer'.
        /// </summary>
        /// <typeparam name="T">The value type.</typeparam>
        /// <param name="topicPath">The full topic path.</param>
        /// <param name="initialValue">An optional initial value for the topic.</param>
        /// <param name="context">Handed back to the callback when the success or failure of the topic add is
        /// reported.</param>
        /// <param name="callback">Notified of the result of the operation.</param>
        /// <returns>The topic details used to add the topic.</returns>
        public ITopicDetails AddTopicForValue<T>(
            string topicPath,
            T initialValue,
            string context,
            ITopicControlAddContextCallback<string> callback ) {
            var details = topicControl.AddTopicFromValue( topicPath, initialValue, context, callback );
            return details;
        }

        /// <summary>
        /// Adds a record topic from a list of initial field values.
        ///
        /// This is the simplest way to add a record topic: both the metadata and the initial values are derived
        /// from the values supplied.
        /// </summary>
        /// <param name="topicPath">The full topic path.</param>
        /// <param name="initialValues">The initial values for the topic fields, which are also used to derive the
        /// metadata definition of the topic.</param>
        /// <param name="context">Handed back to the callback when the success or failure of the topic add is
        /// reported.</param>
        /// <param name="callback">Notified of the result of the operation.</param>
        /// <returns>The topic details returned by the underlying add-from-value call.</returns>
        public ITopicDetails AddRecordTopic(
            string topicPath,
            List<string> initialValues,
            string context,
            ITopicControlAddContextCallback<string> callback ) {
            // Record content holding the supplied field values; the topic definition is derived from it.
            var recordContent = Diffusion.Content
                .NewBuilder<IRecordContentBuilder>()
                .PutFields( initialValues.ToArray() )
                .Build();

            return topicControl.AddTopicFromValue( topicPath, recordContent, context, callback );
        }

        /// <summary>
        /// Adds a record topic using supplied metadata and optional initial content.
        ///
        /// The topic details are created on every call here. That is fine when each topic is different, but when
        /// many record topics share the same details it is far more efficient to create the details once up
        /// front.
        /// </summary>
        /// <param name="topicPath">The full topic path.</param>
        /// <param name="metadata">The pre-created record metadata.</param>
        /// <param name="initialValue">The optional initial value for the topic, which must have been created to
        /// match the supplied metadata.</param>
        /// <param name="context">The context handed back to the callback when the topic is created.</param>
        /// <param name="callback">Notified of the result of the operation.</param>
        public void AddRecordTopic(
            string topicPath,
            IMContent metadata,
            IContent initialValue,
            string context,
            ITopicControlAddContextCallback<string> callback ) {
            var detailsBuilder = topicControl.CreateDetailsBuilder<IRecordTopicDetailsBuilder>();
            var details = detailsBuilder.Metadata( metadata ).Build();

            topicControl.AddTopic( topicPath, details, initialValue, context, callback );
        }

        /// <summary>
        /// Removes a single topic identified by its path.
        /// </summary>
        /// <param name="topicPath">The topic path.</param>
        /// <param name="callback">Notified of the result of the operation.</param>
        public void RemoveTopic( string topicPath, ITopicControlRemoveCallback callback ) {
            // The path is turned into a selector expression by prefixing it with ">".
            var selector = ">" + topicPath;
            topicControl.RemoveTopics( selector, callback );
        }

        /// <summary>
        /// Removes one or more topics matched by a topic selector expression.
        /// </summary>
        /// <param name="topicSelector">The selector expression.</param>
        /// <param name="callback">Notified of the result of the operation.</param>
        public void RemoveTopics( string topicSelector, ITopicControlRemoveCallback callback ) {
            topicControl.RemoveTopics( topicSelector, callback );
        }

        /// <summary>
        /// Requests that the topic path and its descendants are removed when the session is closed, whether
        /// explicitly through <see cref="ISession.Close"/> or by the server. When several sessions make this
        /// request for the same topic path, the topics are removed when the last of those sessions closes.
        ///
        /// Different sessions may call this method for the same topic path, but not for topic paths above or below
        /// existing registrations on the same branch of the topic tree.
        /// </summary>
        /// <param name="topicPath">The part of the topic tree to remove when the last session is closed.</param>
        public void RemoveTopicsWithSession( string topicPath ) {
            var handler = new DefaultTopicTreeHandler();
            topicControl.RemoveTopicsWithSession( topicPath, handler );
        }

        /// <summary>
        /// Closes the session.
        /// </summary>
        public void Close() {
            session.Close();
        }
    }
}
C
#include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <apr.h> #include <apr_thread_mutex.h> #include <apr_thread_cond.h> #include "diffusion.h" #include "args.h" #include "utils.h" /* * We use a mutex and a condition variable to help synchronize the * flow so that it becomes linear and easier to follow the core logic. */ apr_pool_t *pool = NULL; apr_thread_mutex_t *mutex = NULL; apr_thread_cond_t *cond = NULL; ARG_OPTS_T arg_opts[] = { ARG_OPTS_HELP, {'u', "url", "Diffusion server URL", ARG_OPTIONAL, ARG_HAS_VALUE, "ws://localhost:8080"}, {'p', "principal", "Principal (username) for the connection", ARG_OPTIONAL, ARG_HAS_VALUE, NULL}, {'c', "credentials", "Credentials (password) for the connection", ARG_OPTIONAL, ARG_HAS_VALUE, NULL}, END_OF_ARG_OPTS }; // Various handlers which are common to all add_topic() functions. static int on_topic_added(SESSION_T *session, const SVC_ADD_TOPIC_RESPONSE_T *response, void *context) { puts("on_topic_added"); apr_thread_mutex_lock(mutex); apr_thread_cond_broadcast(cond); apr_thread_mutex_unlock(mutex); return HANDLER_SUCCESS; } static int on_topic_add_failed(SESSION_T *session, const SVC_ADD_TOPIC_RESPONSE_T *response, void *context) { printf("on_topic_add_failed: %d\n", response->response_code); apr_thread_mutex_lock(mutex); apr_thread_cond_broadcast(cond); apr_thread_mutex_unlock(mutex); return HANDLER_SUCCESS; } static int on_topic_add_discard(SESSION_T *session, void *context) { puts("on_topic_add_discard"); apr_thread_mutex_lock(mutex); apr_thread_cond_broadcast(cond); apr_thread_mutex_unlock(mutex); return HANDLER_SUCCESS; } static int on_topic_removed(SESSION_T *session, const SVC_REMOVE_TOPICS_RESPONSE_T *response, void *context) { puts("on_topic_removed"); apr_thread_mutex_lock(mutex); apr_thread_cond_broadcast(cond); apr_thread_mutex_unlock(mutex); return HANDLER_SUCCESS; } static int on_topic_remove_discard(SESSION_T *session, void *context) { puts("on_topic_remove_discard"); 
apr_thread_mutex_lock(mutex); apr_thread_cond_broadcast(cond); apr_thread_mutex_unlock(mutex); return HANDLER_SUCCESS; } /* * */ int main(int argc, char** argv) { /* * Standard command-line parsing. */ HASH_T *options = parse_cmdline(argc, argv, arg_opts); if(options == NULL || hash_get(options, "help") != NULL) { show_usage(argc, argv, arg_opts); return EXIT_FAILURE; } char *url = hash_get(options, "url"); const char *principal = hash_get(options, "principal"); CREDENTIALS_T *credentials = NULL; const char *password = hash_get(options, "credentials"); if(password != NULL) { credentials = credentials_create_password(password); } // Setup for condition variable apr_initialize(); apr_pool_create(&pool, NULL); apr_thread_mutex_create(&mutex, APR_THREAD_MUTEX_UNNESTED, pool); apr_thread_cond_create(&cond, pool); // Setup for session SESSION_T *session; DIFFUSION_ERROR_T error = { 0 }; session = session_create(url, principal, credentials, NULL, NULL, &error); if(session == NULL) { fprintf(stderr, "TEST: Failed to create session\n"); fprintf(stderr, "ERR : %s\n", error.message); return EXIT_FAILURE; } // Common params for all add_topic() functions. ADD_TOPIC_PARAMS_T common_params = { .on_topic_added = on_topic_added, .on_topic_add_failed = on_topic_add_failed, .on_discard = on_topic_add_discard }; /* * Create a stateless topic. */ TOPIC_DETAILS_T *topic_details = create_topic_details_stateless(); ADD_TOPIC_PARAMS_T stateless_params = common_params; stateless_params.topic_path = "stateless"; stateless_params.details = topic_details; apr_thread_mutex_lock(mutex); add_topic(session, stateless_params); apr_thread_cond_wait(cond, mutex); apr_thread_mutex_unlock(mutex); /* * Create a topic with single value string data, but with * containing no default data. 
*/ TOPIC_DETAILS_T *string_topic_details = create_topic_details_single_value(M_DATA_TYPE_STRING); ADD_TOPIC_PARAMS_T string_params = common_params; string_params.topic_path = "string"; string_params.details = string_topic_details; apr_thread_mutex_lock(mutex); add_topic(session, string_params); apr_thread_cond_wait(cond, mutex); apr_thread_mutex_unlock(mutex); /* * Create a topic with single value string data and containing * some default data. */ ADD_TOPIC_PARAMS_T string_data_params = common_params; string_data_params.topic_path = "string-data"; string_data_params.details = string_topic_details; BUF_T *sample_data_buf = buf_create(); buf_write_string(sample_data_buf, "Hello, world"); string_data_params.content = content_create(CONTENT_ENCODING_NONE, sample_data_buf); apr_thread_mutex_lock(mutex); add_topic(session, string_data_params); apr_thread_cond_wait(cond, mutex); apr_thread_mutex_unlock(mutex); /* * Create a topic with single value integer data, and with a * default value. */ TOPIC_DETAILS_T *integer_topic_details = create_topic_details_single_value(M_DATA_TYPE_INTEGER_STRING); integer_topic_details->topic_details_params.integer.default_value = 999; ADD_TOPIC_PARAMS_T integer_params = common_params; integer_params.topic_path = "integer"; integer_params.details = integer_topic_details; apr_thread_mutex_lock(mutex); add_topic(session, integer_params); apr_thread_cond_wait(cond, mutex); apr_thread_mutex_unlock(mutex); /* * Create a topic with integer data, but using a CONTENT_T to * specify the initial data. 
*/ ADD_TOPIC_PARAMS_T integer_data_params = common_params; integer_data_params.topic_path = "integer-data"; integer_data_params.details = integer_topic_details; BUF_T *integer_data_buf = buf_create(); buf_sprintf(integer_data_buf, "%d", 123); integer_data_params.content = content_create(CONTENT_ENCODING_NONE, integer_data_buf); apr_thread_mutex_lock(mutex); add_topic(session, integer_data_params); apr_thread_cond_wait(cond, mutex); apr_thread_mutex_unlock(mutex); /* * Create a topic with single value decimal data, with a * default value and specifying the scale (i.e. positions * after the decimal place). */ TOPIC_DETAILS_T *decimal_topic_details = create_topic_details_single_value(M_DATA_TYPE_DECIMAL_STRING); decimal_topic_details->topic_details_params.decimal.default_value = 123.456; decimal_topic_details->topic_details_params.decimal.scale = 4; ADD_TOPIC_PARAMS_T decimal_params = common_params; decimal_params.topic_path = "decimal"; decimal_params.details = decimal_topic_details; apr_thread_mutex_lock(mutex); add_topic(session, decimal_params); apr_thread_cond_wait(cond, mutex); apr_thread_mutex_unlock(mutex); /* * Create a topic with decimal data, using a CONTENT_T to * specify the initial data. */ ADD_TOPIC_PARAMS_T decimal_data_params = common_params; decimal_data_params.topic_path = "decimal-data"; decimal_data_params.details = decimal_topic_details; BUF_T *decimal_data_buf = buf_create(); buf_sprintf(decimal_data_buf, "%f", 987.654); decimal_data_params.content = content_create(CONTENT_ENCODING_NONE, decimal_data_buf); apr_thread_mutex_lock(mutex); add_topic(session, decimal_data_params); apr_thread_cond_wait(cond, mutex); apr_thread_mutex_unlock(mutex); /* * Record topic data. * * The C API does not have the concept of "builders" for * creating record topic data, but requires you to build a * string containing XML that describes the structure of the * messages. 
*/ /* * First of all, this adds a topic equivalent to single-value * strings, but defined with XML. */ BUF_T *manual_schema = buf_create(); buf_write_string(manual_schema, "<?xml version=\"1.0\" encoding=\"UTF-8\" standalone=\"yes\"?>\n"); buf_write_string(manual_schema, "<field name=\"x\" type=\"string\" default=\"xyzzy\" allowsEmpty=\"true\"/>"); TOPIC_DETAILS_T *manual_topic_details = create_topic_details_single_value(M_DATA_TYPE_STRING); manual_topic_details->user_defined_schema = manual_schema; ADD_TOPIC_PARAMS_T string_manual_params = common_params; string_manual_params.topic_path = "string-manual"; string_manual_params.details = manual_topic_details; apr_thread_mutex_lock(mutex); add_topic(session, string_manual_params); apr_thread_cond_wait(cond, mutex); apr_thread_mutex_unlock(mutex); /* * This adds a topic with a record containing multiple fields * of different types. */ BUF_T *record_schema = buf_create(); buf_write_string(record_schema, "<?xml version=\"1.0\" encoding=\"UTF-8\" standalone=\"yes\"?>"); buf_write_string(record_schema, "<message topicDataType=\"record\" name=\"MyContent\">"); buf_write_string(record_schema, "<record name=\"Record1\">"); buf_write_string(record_schema, "<field type=\"string\" default=\"\" allowsEmpty=\"true\" name=\"Field1\"/>"); buf_write_string(record_schema, "<field type=\"integerString\" default=\"0\" allowsEmpty=\"false\" name=\"Field2\"/>"); buf_write_string(record_schema, "<field type=\"decimalString\" default=\"0.00\" scale=\"2\" allowsEmpty=\"false\" name=\"Field3\"/>"); buf_write_string(record_schema, "</record>"); buf_write_string(record_schema, "</message>"); TOPIC_DETAILS_T *record_topic_details = create_topic_details_record(); record_topic_details->user_defined_schema = record_schema; ADD_TOPIC_PARAMS_T record_params = common_params; record_params.topic_path = "record"; record_params.details = record_topic_details; apr_thread_mutex_lock(mutex); add_topic(session, record_params); apr_thread_cond_wait(cond, 
mutex); apr_thread_mutex_unlock(mutex); /* * Create a topic with binary data */ TOPIC_DETAILS_T *binary_topic_details = create_topic_details_binary(); ADD_TOPIC_PARAMS_T binary_params = common_params; binary_params.topic_path = "binary-data"; binary_params.details = binary_topic_details; char binary_bytes[] = { 0x00, 0x01, 0x02, 0x03, 0x04, 0x05 }; BUF_T *binary_data_buf = buf_create(); buf_write_bytes(binary_data_buf, binary_bytes, sizeof(binary_bytes)); binary_params.content = content_create(CONTENT_ENCODING_NONE, binary_data_buf); apr_thread_mutex_lock(mutex); add_topic(session, binary_params); apr_thread_cond_wait(cond, mutex); apr_thread_mutex_unlock(mutex); /* * We can also remove topics. First, add a couple of topics * and then remove their parent topic. All child topics are * removed with the parent. */ puts("Adding topics remove_me/1 and remove_me/2"); ADD_TOPIC_PARAMS_T topic_params = common_params; topic_params.details = topic_details; topic_params.topic_path = "remove_me/1"; apr_thread_mutex_lock(mutex); add_topic(session, topic_params); apr_thread_cond_wait(cond, mutex); apr_thread_mutex_unlock(mutex); topic_params.topic_path = "remove_me/2"; apr_thread_mutex_lock(mutex); add_topic(session, topic_params); apr_thread_cond_wait(cond, mutex); apr_thread_mutex_unlock(mutex); puts("Removing topics in 5 seconds..."); sleep(5); REMOVE_TOPICS_PARAMS_T remove_params = { .on_removed = on_topic_removed, .on_discard = on_topic_remove_discard, .topic_selector = ">remove_me" }; apr_thread_mutex_lock(mutex); remove_topics(session, remove_params); apr_thread_cond_wait(cond, mutex); apr_thread_mutex_unlock(mutex); /* * Close our session, and release resources and memory. */ session_close(session, NULL); session_free(session); apr_thread_mutex_destroy(mutex); apr_thread_cond_destroy(cond); apr_pool_destroy(pool); apr_terminate(); return EXIT_SUCCESS; }
Change the URL from that provided in the example to the URL of the Diffusion™ server.
This page last modified: 2017/06/29