Start publishing with C
Create a C client that publishes data through topics on the Diffusion™ server.
For more information about dependencies on each supported platform, see C.
- By including diffusion.h before any APR includes. The diffusion.h file sets these values.
- As command-line flags
To complete this example, you need a Diffusion server and a development system with a C compiler (for example, gcc) and the Diffusion C client library installed on it.
You also require a named user that has a role with sufficient permissions to create and update topics, for example the "ADMINISTRATOR" role. For more information about roles and permissions, see Role-based authorization.
The client updates the value of the foo/counter topic. You can see the value of the foo/counter topic by creating a subscribing client to subscribe to the topic. For more information, see Start subscribing with C.
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <apr.h>
#include <apr_thread_mutex.h>
#include <apr_thread_cond.h>
#include "diffusion.h"
#include "args.h"
#include "conversation.h"
#include "service/svc-update.h"
/* Flag set to 1 by on_update_source_active(); main() publishes while it is non-zero. */
int active = 0;
/* APR memory pool, created in main() and used to allocate the mutex and condition variable. */
apr_pool_t *pool = NULL;
/* Mutex held around every wait on, and broadcast of, `cond`. */
apr_thread_mutex_t *mutex = NULL;
/* Condition variable the handlers broadcast on to wake main() after a server response. */
apr_thread_cond_t *cond = NULL;
/*
* Handlers for add topic feature.
*/
/*
 * Handler passed to add_topic() as on_topic_added.
 * Reports the addition on stdout and broadcasts on the shared condition
 * variable so that a thread waiting on it can continue.
 * Always returns HANDLER_SUCCESS.
 */
static int on_topic_added(SESSION_T *session,
                          const SVC_ADD_TOPIC_RESPONSE_T *response,
                          void *context)
{
    fputs("Added topic\n", stdout);

    apr_thread_mutex_lock(mutex);
    apr_thread_cond_broadcast(cond);
    apr_thread_mutex_unlock(mutex);

    return HANDLER_SUCCESS;
}
/*
 * Handler passed to add_topic() as on_topic_add_failed.
 * Prints the response code from the server's reply, then broadcasts on the
 * shared condition variable so that a waiting thread is not left blocked.
 * Always returns HANDLER_SUCCESS.
 */
static int on_topic_add_failed(SESSION_T *session,
                               const SVC_ADD_TOPIC_RESPONSE_T *response,
                               void *context)
{
    fprintf(stdout, "Failed to add topic (%d)\n", response->response_code);

    apr_thread_mutex_lock(mutex);
    apr_thread_cond_broadcast(cond);
    apr_thread_mutex_unlock(mutex);

    return HANDLER_SUCCESS;
}
/*
* Handlers for creating update source
*/
/*
 * Handler passed to register_update_source() as on_registered.
 * Logs the registration and broadcasts on the shared condition variable.
 * Always returns HANDLER_SUCCESS.
 */
static int on_update_source_registered(
    SESSION_T *session,
    const CONVERSATION_ID_T *updater_id,
    const SVC_UPDATE_REGISTRATION_RESPONSE_T *response,
    void *context)
{
    fputs("Registered update source\n", stdout);

    apr_thread_mutex_lock(mutex);
    apr_thread_cond_broadcast(cond);
    apr_thread_mutex_unlock(mutex);

    return HANDLER_SUCCESS;
}
/*
 * Handler passed to register_update_source() as on_active.
 * Logs the state change, raises the global `active` flag that main() polls
 * in its publishing loop, and broadcasts on the shared condition variable.
 * Always returns HANDLER_SUCCESS.
 */
static int on_update_source_active(
    SESSION_T *session,
    const CONVERSATION_ID_T *updater_id,
    const SVC_UPDATE_REGISTRATION_RESPONSE_T *response,
    void *context)
{
    fputs("Topic source active\n", stdout);

    /* The flag is raised before the lock is taken, as in the original flow. */
    active = 1;

    apr_thread_mutex_lock(mutex);
    apr_thread_cond_broadcast(cond);
    apr_thread_mutex_unlock(mutex);

    return HANDLER_SUCCESS;
}
/*
* Handlers for update of data.
*/
/*
 * Handler passed to update() as on_success.
 * Only logs the successful update. Always returns HANDLER_SUCCESS.
 */
static int on_update_success(SESSION_T *session,
                             const CONVERSATION_ID_T *updater_id,
                             const SVC_UPDATE_RESPONSE_T *response,
                             void *context)
{
    fputs("Updated topic\n", stdout);
    return HANDLER_SUCCESS;
}
/*
 * Handler passed to update() as on_failure.
 * Only logs the failed update. Always returns HANDLER_SUCCESS.
 */
static int on_update_failure(SESSION_T *session,
                             const CONVERSATION_ID_T *updater_id,
                             const SVC_UPDATE_RESPONSE_T *response,
                             void *context)
{
    fputs("Update failed\n", stdout);
    return HANDLER_SUCCESS;
}
int
main(int argc, char** argv)
{
const char *topic_name = "foo/counter";
/*
* Setup for condition variable.
*/
apr_initialize();
apr_pool_create(&pool, NULL);
apr_thread_mutex_create(&mutex, APR_THREAD_MUTEX_UNNESTED, pool);
apr_thread_cond_create(&, pool);
/*
* Create a session
*/
SESSION_T *session;
DIFFUSION_ERROR_T error = { 0 };
// Edit this line to include the host and port of your Diffusion server
session = session_create("ws://hostname:port", "user", credentials_create_password("password"), NULL, NULL, &error);
if(session == NULL) {
fprintf(stderr, "TEST: Failed to create session\n");
fprintf(stderr, "ERR : %s\n", error.message);
return EXIT_FAILURE;
} else {
fprintf(stdout, "Connected\n");
}
/*
* Create a topic holding simple string content.
*/
const TOPIC_DETAILS_T *string_topic_details = create_topic_details_single_value(M_DATA_TYPE_STRING);
const ADD_TOPIC_PARAMS_T add_topic_params = {
.topic_path = "foo/counter",
.details = string_topic_details,
.on_topic_added = on_topic_added,
.on_topic_add_failed = on_topic_add_failed
};
apr_thread_mutex_lock(mutex);
add_topic(session, add_topic_params);
apr_thread_cond_wait(cond, mutex);
apr_thread_mutex_unlock(mutex);
/*
* Define the handlers for add_update_source()
*/
const UPDATE_SOURCE_REGISTRATION_PARAMS_T update_reg_params = {
.topic_path = topic_name,
.on_registered = on_update_source_registered,
.on_active = on_update_source_active,
};
/*
* Register an updater.
*/
apr_thread_mutex_lock(mutex);
const CONVERSATION_ID_T *updater_id = register_update_source(session, update_reg_params);
apr_thread_cond_wait(cond, mutex);
apr_thread_mutex_unlock(mutex);
/*
* Define default parameters for an update source.
*/
UPDATE_SOURCE_PARAMS_T update_source_params_base = {
.updater_id = updater_id,
.topic_path = topic_name,
.on_success = on_update_success,
.on_failure = on_update_failure
};
int count=1;
while(active) {
/*
* Create an update structure containing the counter.
*/
BUF_T *buf = buf_create();
char str[15];
sprintf(str, "%d", count);
buf_write_string(buf, str);
CONTENT_T *content = content_create(CONTENT_ENCODING_NONE, buf);
UPDATE_T *upd = update_create(UPDATE_ACTION_REFRESH,
UPDATE_TYPE_CONTENT,
content);
UPDATE_SOURCE_PARAMS_T update_source_params = update_source_params_base;
update_source_params.update = upd;
/*
* Update the topic.
*/
update(session, update_source_params);
content_free(content);
update_free(upd);
buf_free(buf);
sleep(1);
count++;
}
/*
* Close session and free resources.
*/
session_close(session, NULL);
session_free(session);
apr_thread_mutex_destroy(mutex);
apr_thread_cond_destroy(cond);
apr_pool_destroy(pool);
apr_terminate();
return EXIT_SUCCESS;
}
# Makefile for the getting-started-publisher example.
#
# The following two variables must be set.
#
# Directory containing the C client include files.
DIFFUSION_C_CLIENT_INCDIR = ../path-to-client/include
#
# Directory containing libdiffusion.a
DIFFUSION_C_CLIENT_LIBDIR = ../path-to-client/lib

ifndef DIFFUSION_C_CLIENT_INCDIR
$(error DIFFUSION_C_CLIENT_INCDIR is not set)
endif

ifndef DIFFUSION_C_CLIENT_LIBDIR
$(error DIFFUSION_C_CLIENT_LIBDIR is not set)
endif

CC = gcc

# Extra definitions from parent directory, if they exist.
-include ../makefile.defs

CFLAGS += -g -Wall -Werror -std=c99 -D_POSIX_C_SOURCE=200112L -D_XOPEN_SOURCE=700 -c -I$(DIFFUSION_C_CLIENT_INCDIR)
LDFLAGS += $(LIBS) $(DIFFUSION_C_CLIENT_LIBDIR)/libdiffusion.a -lpthread -lpcre -lssl -lcrypto

ARFLAGS +=
SOURCES = getting-started-publisher.c

TARGETDIR = target
OBJDIR = $(TARGETDIR)/objs
BINDIR = $(TARGETDIR)/bin

OBJECTS = $(SOURCES:.c=.o)
TARGETS = getting-started-publisher

all: prepare $(TARGETS)

# prepare and clean never create files of those names, so declare them
# phony alongside all; a stray file called "clean" would otherwise stop
# the rule from running.
.PHONY: all prepare clean

# Create the output directories for objects and binaries.
prepare:
	mkdir -p $(OBJDIR) $(BINDIR)

# Compile each source file into the object directory (CFLAGS carries -c).
$(OBJDIR)/%.o: %.c
	$(CC) $(CFLAGS) -o $@ $<

# Link the example; the executable is written to $(BINDIR).
getting-started-publisher: $(OBJDIR)/getting-started-publisher.o
	$(CC) $< $(LDFLAGS) -o $(BINDIR)/$@

clean:
	rm -rf $(TARGETS) $(OBJECTS) $(TARGETDIR) core a.out
This page last modified: 2015/05/01