Start publishing with C

Create a C client that publishes data through topics on the Diffusion™ server.

The C client libraries rely on a number of dependencies. Depending on which platform you are using the C client libraries for, these dependencies might be included in the client library. If they are not included in the client library, ensure that the dependencies are available on your development system.

For more information about dependencies on each supported platform, see C.

The C client library statically links to Apache Portable Runtime (APR) version 1.5 with APR-util. Ensure that you set APR_DECLARE_STATIC and APU_DECLARE_STATIC before you use any APR includes. You can set these values in the following ways:
  • By including diffusion.h before any APR includes. The diffusion.h file sets these values.
  • As command-line flags
For more information, see http://apr.apache.org
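As an illustration of the two options above, the following minimal sketch defines the macros in source with guards, so that it also works when the same macros are passed as compiler flags (for example, -DAPR_DECLARE_STATIC -DAPU_DECLARE_STATIC with gcc). The helper apr_static_macros_defined is hypothetical and exists only to show that the macros are visible; the APR include itself is left out so that the sketch compiles without APR installed.

```c
/* Define the static-linkage macros before any APR header is included.
 * The #ifndef guards avoid redefinition warnings when the same macros
 * are also passed on the command line, for example:
 *   gcc -DAPR_DECLARE_STATIC -DAPU_DECLARE_STATIC ...
 */
#ifndef APR_DECLARE_STATIC
#define APR_DECLARE_STATIC
#endif
#ifndef APU_DECLARE_STATIC
#define APU_DECLARE_STATIC
#endif

/* APR includes would follow here, for example <apr.h>. They are omitted
 * so that this sketch builds on a system without APR. */

/* Hypothetical helper: returns 1 if both macros are visible here. */
static int apr_static_macros_defined(void)
{
#if defined(APR_DECLARE_STATIC) && defined(APU_DECLARE_STATIC)
        return 1;
#else
        return 0;
#endif
}
```

If you include diffusion.h before any APR header, as the example in this topic does, you should not need either form.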

To complete this example, you need a Diffusion server and a development system with a C compiler, such as gcc, and the make utility installed on it.

You also require a named user that has a role with the modify_topic and update_topic permissions, for example, the "ADMINISTRATOR" role. For more information about roles and permissions, see Role-based authorization.

This example steps through the lines of code required to create and update a topic. The full code example is provided after the steps.
  1. Get the Diffusion C client library for your platform and extract the ZIP file.
    The C client library is available in the clients/c directory of the Diffusion installation.
  2. Create a C file called getting-started-publisher.c.

    1. Include the following libraries:
      #include <stdio.h>
      #include <stdlib.h>
      #include <unistd.h>
      
      #include <apr.h>
      #include <apr_thread_mutex.h>
      #include <apr_thread_cond.h>
      
      #include "diffusion.h"
      #include "args.h"
      #include "conversation.h"
      #include "service/svc-update.h"

    2. Declare an int flag that records whether the update source is active, and the APR objects that are used to manage threading:
      int active = 0;
      
      apr_pool_t *pool = NULL;
      apr_thread_mutex_t *mutex = NULL;
      apr_thread_cond_t *cond = NULL;

    3. Create a main method:
      int
      main(int argc, char **argv)
      {
      
      }

    4. Inside the main method define a constant that is the topic name:
              const char *topic_name = "foo/counter";

    5. Next, add the following lines to initialize the threading mechanism:
              apr_initialize();
              apr_pool_create(&pool, NULL);
              apr_thread_mutex_create(&mutex, APR_THREAD_MUTEX_UNNESTED, pool);
              apr_thread_cond_create(&cond, pool);
  3. Create a connection to the Diffusion server.
    Inside the main method add the following lines:
            /*
             * Create a session
             */
            DIFFUSION_ERROR_T error = { 0 };
            SESSION_T *session = NULL;
            // Edit this line to include the host and port of your Diffusion server
            session = session_create("ws://hostname:port", "user", credentials_create_password("password"), NULL, NULL, &error);
            if(session == NULL) {
                    fprintf(stderr, "TEST: Failed to create session\n");
                    fprintf(stderr, "ERR : %s\n", error.message);
                    return EXIT_FAILURE;
            } else {
                    fprintf(stdout, "Connected\n");
            }
    Where hostname is the name of the system hosting your Diffusion server, port is the port the Diffusion server accepts client connections on, user is the name of a user with the permissions required to create and update a topic, and password is the user's password.
    The client logs the string "Connected" to the console if the connection succeeds.
  4. Create the foo/counter topic.
    1. Above the main method, create a callback for when the topic is added.
      static int
      on_topic_added(SESSION_T *session, const SVC_ADD_TOPIC_RESPONSE_T *response, void *context)
      {
              printf("Added topic\n");
              apr_thread_mutex_lock(mutex);
              apr_thread_cond_broadcast(cond);
              apr_thread_mutex_unlock(mutex); 
              return HANDLER_SUCCESS;
      }
      This callback prints a message to the console when the foo/counter topic is created.
    2. Above the main method, create a callback that is called if adding the topic fails.
      static int
      on_topic_add_failed(SESSION_T *session, const SVC_ADD_TOPIC_RESPONSE_T *response, void *context)
      {
              printf("Failed to add topic (%d)\n", response->response_code);
              apr_thread_mutex_lock(mutex);
              apr_thread_cond_broadcast(cond);
              apr_thread_mutex_unlock(mutex); 
              return HANDLER_SUCCESS;
      }
      This callback prints a message to the console if the client was unable to create the foo/counter topic.
    3. In the main method, define the topic details:
              const TOPIC_DETAILS_T *string_topic_details = create_topic_details_single_value(M_DATA_TYPE_STRING);
      This defines the topic type as a single value topic of data type string.
    4. Define the parameters for the add topic request:
              const ADD_TOPIC_PARAMS_T add_topic_params = {
                      .topic_path =  topic_name,
                      .details = string_topic_details,
                      .on_topic_added = on_topic_added,
                      .on_topic_add_failed = on_topic_add_failed
              };
    5. While holding the mutex, call the add_topic function and wait on the condition variable until one of the callbacks signals it:
              apr_thread_mutex_lock(mutex); 
              add_topic(session, add_topic_params);
              apr_thread_cond_wait(cond, mutex);
              apr_thread_mutex_unlock(mutex);
  5. Register an update source.
    1. Above the main method, create a callback for when the update source is registered.
      static int
      on_update_source_registered(SESSION_T *session,
                                  const CONVERSATION_ID_T *updater_id,
                                  const SVC_UPDATE_REGISTRATION_RESPONSE_T *response,
                                  void *context)
      {
              printf("Registered update source\n");
              apr_thread_mutex_lock(mutex);
              apr_thread_cond_broadcast(cond);
              apr_thread_mutex_unlock(mutex); 
              return HANDLER_SUCCESS;
      }
      This callback prints a message to the console when the update source is registered.
    2. Above the main method, create a callback for when the update source becomes active.
      static int
      on_update_source_active(SESSION_T *session,
                              const CONVERSATION_ID_T *updater_id,
                              const SVC_UPDATE_REGISTRATION_RESPONSE_T *response,
                              void *context)
      {
              printf("Topic source active\n");
              active = 1;
              apr_thread_mutex_lock(mutex);
              apr_thread_cond_broadcast(cond);
              apr_thread_mutex_unlock(mutex);
              return HANDLER_SUCCESS;
      }
      This callback prints a message to the console when the update source becomes the active update source and can update the topic exclusively. The callback also sets the active flag to 1.
    3. In the main method, define the parameters for registering the update source:
              const UPDATE_SOURCE_REGISTRATION_PARAMS_T update_reg_params = {
                      .topic_path = topic_name,
                      .on_registered = on_update_source_registered,
                      .on_active = on_update_source_active,
              };
    4. While holding the mutex, call the register_update_source function and wait on the condition variable until a callback signals it:
              apr_thread_mutex_lock(mutex); 
              const CONVERSATION_ID_T *updater_id = register_update_source(session, update_reg_params);
              apr_thread_cond_wait(cond, mutex);
              apr_thread_mutex_unlock(mutex);
                                                      
      Registering an update source returns an updater ID that can be used to update topics.
  6. Send updates to the topic.
    1. Above the main method, create a callback that is called when a topic update succeeds.
      static int
      on_update_success(SESSION_T *session,
                        const CONVERSATION_ID_T *updater_id,
                        const SVC_UPDATE_RESPONSE_T *response,
                        void *context)
      {
              printf("Updated topic\n");
              return HANDLER_SUCCESS;
      }
      This callback prints a message to the console when the foo/counter topic is successfully updated.
    2. Above the main method, create a callback that is called if a topic update fails.
      static int
      on_update_failure(SESSION_T *session,
                        const CONVERSATION_ID_T *updater_id,
                        const SVC_UPDATE_RESPONSE_T *response,
                        void *context)
      {
              printf("Update failed\n");
              return HANDLER_SUCCESS;
      }
      This callback prints a message to the console if an update of the foo/counter topic fails.
    3. Define the unchanging parameters for updating a topic using the update source.
              UPDATE_SOURCE_PARAMS_T update_source_params_base = {
                      .updater_id = updater_id,
                      .topic_path = topic_name,
                      .on_success = on_update_success,
                      .on_failure = on_update_failure
              };
    4. Define a variable count which is used for the value published to the topic:
              int count=1;
    5. Create a loop that runs while the update source is active:
              while(active) {
                                                      
              }
    6. Inside the loop, create a buffer to contain the counter value:
                      BUF_T *buf = buf_create();
                      char str[15];
                      sprintf(str, "%d", count);
                      buf_write_string(buf, str);
    7. Next, use the buffer to create content:
                      CONTENT_T *content = content_create(CONTENT_ENCODING_NONE, buf);
    8. Create an update from the content:
                      UPDATE_T *upd = update_create(UPDATE_ACTION_REFRESH,
                                                              UPDATE_TYPE_CONTENT,
                                                              content);
    9. Add the update into the parameters defined for the update.
                      UPDATE_SOURCE_PARAMS_T update_source_params = update_source_params_base;
                      update_source_params.update = upd;
    10. Update the topic.
                      update(session, update_source_params);
    11. Free the resources used by this iteration of the loop.
                      content_free(content);
                      update_free(upd);
                      buf_free(buf);
    12. Wait for a second and increment the count variable before the next iteration of the loop:
                      sleep(1);
                      count++;
  7. Close the session with the Diffusion server and close the client.
            session_close(session, NULL);
            session_free(session);
    
            apr_thread_mutex_destroy(mutex);
            apr_thread_cond_destroy(cond);
            apr_pool_destroy(pool);
            apr_terminate();
    
            return EXIT_SUCCESS;
    Ensure that you free all resources and destroy the threading objects.
  8. Build your C client.
    1. Create a Makefile in the same directory as your C file.
      An example Makefile is provided after the steps.
    2. Ensure that your Makefile points to the include and lib directories of the Diffusion C client library.
      DIFFUSION_C_CLIENT_INCDIR	= ../path-to-client/include
      DIFFUSION_C_CLIENT_LIBDIR	=  ../path-to-client/lib
    3. Run the make command to build the example.
      The getting-started-publisher binary is created in the target/bin directory.
  9. Run your C client from the command line.

The client updates the value of the foo/counter topic. You can see the value of the foo/counter topic by creating a subscribing client to subscribe to the topic. For more information, see Start subscribing with C.
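The steps that call add_topic and register_update_source lock the mutex, start an asynchronous request, and wait on the condition variable until a callback broadcasts. The following minimal sketch shows the same handshake in isolation. It makes two assumptions that are not part of the example in this topic: it uses POSIX threads instead of APR so that it builds without the Diffusion or APR libraries, and it adds a predicate flag (done) that the waiter checks in a loop, because POSIX permits a condition wait to return without a matching signal. The names fake_callback and request_and_wait are hypothetical.

```c
#include <pthread.h>

/* Shared state, mirroring the tutorial's global mutex and condition. */
static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t done_cond = PTHREAD_COND_INITIALIZER;
static int done = 0;   /* predicate, protected by lock */

/* Stand-in for a library callback such as on_topic_added. */
static void *fake_callback(void *arg)
{
        (void)arg;
        pthread_mutex_lock(&lock);
        done = 1;                            /* record the event first */
        pthread_cond_broadcast(&done_cond);  /* then wake the waiter */
        pthread_mutex_unlock(&lock);
        return NULL;
}

/* Start the asynchronous work and block until the callback has run.
 * Returns 0 on success, or -1 if the thread could not be started. */
static int request_and_wait(void)
{
        pthread_t worker;

        pthread_mutex_lock(&lock);
        done = 0;
        if (pthread_create(&worker, NULL, fake_callback, NULL) != 0) {
                pthread_mutex_unlock(&lock);
                return -1;
        }
        /* The wait releases the lock while blocked and reacquires it
         * before returning, so the callback can take the lock. */
        while (!done) {
                pthread_cond_wait(&done_cond, &lock);
        }
        pthread_mutex_unlock(&lock);

        pthread_join(worker, NULL);
        return 0;
}
```

Because the mutex is taken before the request is started, the callback cannot broadcast until the waiter is blocked in the wait. Applied to the example in this topic, the same idea would mean testing a flag, such as active, in a loop around apr_thread_cond_wait.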

The completed getting-started-publisher.c file contains the following code:
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>

#include <apr.h>
#include <apr_thread_mutex.h>
#include <apr_thread_cond.h>

#include "diffusion.h"
#include "args.h"
#include "conversation.h"
#include "service/svc-update.h"

int active = 0;

apr_pool_t *pool = NULL;
apr_thread_mutex_t *mutex = NULL;
apr_thread_cond_t *cond = NULL;

/*
 * Handlers for add topic feature.
 */
static int
on_topic_added(SESSION_T *session, const SVC_ADD_TOPIC_RESPONSE_T *response, void *context)
{
        printf("Added topic\n");
        apr_thread_mutex_lock(mutex);
        apr_thread_cond_broadcast(cond);
        apr_thread_mutex_unlock(mutex); 
        return HANDLER_SUCCESS;
}

static int
on_topic_add_failed(SESSION_T *session, const SVC_ADD_TOPIC_RESPONSE_T *response, void *context)
{
        printf("Failed to add topic (%d)\n", response->response_code);
        apr_thread_mutex_lock(mutex);
        apr_thread_cond_broadcast(cond);
        apr_thread_mutex_unlock(mutex); 
        return HANDLER_SUCCESS;
}


/*
 * Handlers for creating update source
 */
static int
on_update_source_registered(SESSION_T *session,
                            const CONVERSATION_ID_T *updater_id,
                            const SVC_UPDATE_REGISTRATION_RESPONSE_T *response,
                            void *context)
{
        printf("Registered update source\n");
        apr_thread_mutex_lock(mutex);
        apr_thread_cond_broadcast(cond);
        apr_thread_mutex_unlock(mutex); 
        return HANDLER_SUCCESS;
}

static int
on_update_source_active(SESSION_T *session,
                        const CONVERSATION_ID_T *updater_id,
                        const SVC_UPDATE_REGISTRATION_RESPONSE_T *response,
                        void *context)
{
        printf("Topic source active\n");
        active = 1;
        apr_thread_mutex_lock(mutex);
        apr_thread_cond_broadcast(cond);
        apr_thread_mutex_unlock(mutex);
        return HANDLER_SUCCESS;
}

/*
 * Handlers for update of data.
 */
static int
on_update_success(SESSION_T *session,
                  const CONVERSATION_ID_T *updater_id,
                  const SVC_UPDATE_RESPONSE_T *response,
                  void *context)
{
        printf("Updated topic\n");
        return HANDLER_SUCCESS;
}

static int
on_update_failure(SESSION_T *session,
                  const CONVERSATION_ID_T *updater_id,
                  const SVC_UPDATE_RESPONSE_T *response,
                  void *context)
{
        printf("Update failed\n");
        return HANDLER_SUCCESS;
}


int
main(int argc, char** argv)
{
        

        const char *topic_name = "foo/counter";

        /*
         * Setup for condition variable.
         */
        apr_initialize();
        apr_pool_create(&pool, NULL);
        apr_thread_mutex_create(&mutex, APR_THREAD_MUTEX_UNNESTED, pool);
        apr_thread_cond_create(&cond, pool);

        /*
         * Create a session
         */
        SESSION_T *session;
        DIFFUSION_ERROR_T error = { 0 };
         
        // Edit this line to include the host and port of your Diffusion server
        session = session_create("ws://hostname:port", "user", credentials_create_password("password"), NULL, NULL, &error);
        if(session == NULL) {
                fprintf(stderr, "TEST: Failed to create session\n");
                fprintf(stderr, "ERR : %s\n", error.message);
                return EXIT_FAILURE;
        } else {
                fprintf(stdout, "Connected\n");
        }

        /*
         * Create a topic holding simple string content.
         */
        const TOPIC_DETAILS_T *string_topic_details = create_topic_details_single_value(M_DATA_TYPE_STRING);
        const ADD_TOPIC_PARAMS_T add_topic_params = {
                .topic_path = topic_name,
                .details = string_topic_details,
                .on_topic_added = on_topic_added,
                .on_topic_add_failed = on_topic_add_failed
        };

        apr_thread_mutex_lock(mutex); 
        add_topic(session, add_topic_params);
        apr_thread_cond_wait(cond, mutex);
        apr_thread_mutex_unlock(mutex);

        /*
         * Define the handlers for register_update_source()
         */
        const UPDATE_SOURCE_REGISTRATION_PARAMS_T update_reg_params = {
                .topic_path = topic_name,
                .on_registered = on_update_source_registered,
                .on_active = on_update_source_active,
        };

        /*
         * Register an updater.
         */
        apr_thread_mutex_lock(mutex); 
        const CONVERSATION_ID_T *updater_id = register_update_source(session, update_reg_params);
        apr_thread_cond_wait(cond, mutex);
        apr_thread_mutex_unlock(mutex);

        /*
         * Define default parameters for an update source.
         */
        UPDATE_SOURCE_PARAMS_T update_source_params_base = {
                .updater_id = updater_id,
                .topic_path = topic_name,
                .on_success = on_update_success,
                .on_failure = on_update_failure
        };


        int count=1;
        while(active) {

                /*
                 * Create an update structure containing the counter.
                 */
                BUF_T *buf = buf_create();
                char str[15];
                sprintf(str, "%d", count);
                buf_write_string(buf, str);

                CONTENT_T *content = content_create(CONTENT_ENCODING_NONE, buf);

                UPDATE_T *upd = update_create(UPDATE_ACTION_REFRESH,
                                              UPDATE_TYPE_CONTENT,
                                              content);

                UPDATE_SOURCE_PARAMS_T update_source_params = update_source_params_base;
                update_source_params.update = upd;

                /*
                 * Update the topic.
                 */
                update(session, update_source_params);

                content_free(content);
                update_free(upd);
                buf_free(buf);

                sleep(1);
                count++;
        }

        /*
         * Close session and free resources.
         */
        session_close(session, NULL);
        session_free(session);

        apr_thread_mutex_destroy(mutex);
        apr_thread_cond_destroy(cond);
        apr_pool_destroy(pool);
        apr_terminate();

        return EXIT_SUCCESS;
}
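
The update loop in the listing above formats the counter with sprintf into a 15-byte buffer, which is large enough for any 32-bit int. If you change the format string or the value type, a bounded call makes the buffer size explicit. The following is a minimal sketch of that variation, not part of the original example; the helper name format_counter is hypothetical.

```c
#include <stdio.h>
#include <stddef.h>

/* Hypothetical helper: write count as decimal text into dst.
 * Returns the number of characters written (excluding the terminator),
 * or -1 if dst is too small or formatting fails. */
static int format_counter(char *dst, size_t dst_size, int count)
{
        int n = snprintf(dst, dst_size, "%d", count);
        if (n < 0 || (size_t)n >= dst_size) {
                return -1;
        }
        return n;
}
```

Inside the loop, the result could be checked before the string is passed to buf_write_string, so that a truncated value is never published.
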
The Makefile contains the following code:
# The following two variables must be set.
#
# Directory containing the C client include files.
DIFFUSION_C_CLIENT_INCDIR	= ../path-to-client/include
#
# Directory containing libdiffusion.a
DIFFUSION_C_CLIENT_LIBDIR	=  ../path-to-client/lib

ifndef DIFFUSION_C_CLIENT_INCDIR
$(error DIFFUSION_C_CLIENT_INCDIR is not set)
endif

ifndef DIFFUSION_C_CLIENT_LIBDIR
$(error DIFFUSION_C_CLIENT_LIBDIR is not set)
endif

CC		= gcc

# Extra definitions from parent directory, if they exist.
-include ../makefile.defs

CFLAGS		+= -g -Wall -Werror -std=c99 -D_POSIX_C_SOURCE=200112L -D_XOPEN_SOURCE=700 -c -I$(DIFFUSION_C_CLIENT_INCDIR)
LDFLAGS		+= $(LIBS) $(DIFFUSION_C_CLIENT_LIBDIR)/libdiffusion.a -lpthread -lpcre -lssl -lcrypto

ARFLAGS		+=
SOURCES = getting-started-publisher.c

TARGETDIR	= target
OBJDIR		= $(TARGETDIR)/objs
BINDIR		= $(TARGETDIR)/bin
OBJECTS		= $(SOURCES:.c=.o)
TARGETS = getting-started-publisher

all:		prepare $(TARGETS)
.PHONY:		all

prepare:
		mkdir -p $(OBJDIR) $(BINDIR)

$(OBJDIR)/%.o:	%.c
		$(CC) $(CFLAGS) -o $@ $<

getting-started-publisher:	$(OBJDIR)/getting-started-publisher.o
		$(CC) $< $(LDFLAGS) -o $(BINDIR)/$@

clean:
		rm -rf $(TARGETS) $(OBJECTS) $(TARGETDIR) core a.out