Start subscribing with C

Create a C client within minutes that connects to Diffusion™ Cloud. This example creates a client that subscribes to a JSON topic called "processes" and prints its value to the console when the topic is updated.

The C client libraries rely on a number of dependencies. Depending on the platform you are using, these dependencies might be included in the client library. If they are not included in the client library, ensure that the dependencies are available on your development system.

For more information about dependencies on each supported platform, see C.

The C client library statically links to Apache Portable Runtime (APR) version 1.6.1 with APR-util. Ensure that you set APR_DECLARE_STATIC and APU_DECLARE_STATIC before you use any APR includes. You can set these values in the following ways:
  • By including diffusion.h before any APR includes. The diffusion.h file sets these values.
  • As command-line flags.
For more information, see http://apr.apache.org

To complete this example, you need a Diffusion Cloud service. For more information about getting a Diffusion Cloud service, see Getting started with Diffusion Cloud.

You must also do one of the following:
  • Create a named user that has a role with the select_topic and read_topic permissions.
  • Assign a role with those permissions to anonymous client sessions.
An example of a suitable role is "CLIENT". For more information about roles and permissions, see Role-based authorization.
This example steps through the lines of code required to subscribe to a topic. The full code example is provided after the steps.
  1. Get the Diffusion Cloud C client library for your platform and extract the ZIP file.
    The C client library is available from http://download.diffusiondata.com/cloud/latest/sdks.html.
  2. Create a C file called cjson-subscribing-example.c.
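    1. Include the following libraries:
      #include <stdio.h>
      #include <stdlib.h>
      #include <time.h>

      #ifndef WIN32
              #include <unistd.h>
      #else
              #define sleep(x) Sleep(1000 * (x))
      #endif

      #include "diffusion.h"
      #include "args.h"
      #include "conversation.h"
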
    2. Define some values to be used later.
      
      
                                  
                                  

      Where hostname is the name of the system hosting your Diffusion Cloud server (for example, yourserver.eu.diffusion.cloud), port is the port on which Diffusion Cloud accepts client connections (80, or 443 for a secure connection), user is the name of a user with the permissions required to subscribe to a topic, and password is that user's password.

  3. Create a main function.
    It begins with standard parsing of the command-line parameters.
  4. Create a session with the server, and exit with an error message if the session cannot be created.
    DIFFUSION_ERROR_T error = { 0 };
    SESSION_T *session = session_create(url, principal, credentials, NULL, NULL, &error);
    if(session == NULL) {
            fprintf(stderr, "Failed to create session: %s\n", error.message);
            return EXIT_FAILURE;
    }
    
  5. Register a value stream to handle incoming subscription notifications and values, subscribe to the topic provided in the command-line parameters (or the default one), process topic updates for 2 minutes, then close the session and free its resources.
    VALUE_STREAM_T value_stream = {
            .datatype = DATATYPE_JSON,
            .on_subscription = on_subscription,
            .on_unsubscription = on_unsubscription,
            .on_value = on_value,
            .on_close = on_close
    };
    add_stream(session, topic_name, &value_stream);
    
    SUBSCRIPTION_PARAMS_T params = {
            .topic_selector = topic_name
    };
    subscribe(session, params);
    
    // Receive updates for 2 minutes
    sleep(120);
    
    // close session
    session_close(session, NULL);
    
    // cleanup
    session_free(session);
    credentials_free(credentials);
    hash_free(options, NULL, free);
    
    return EXIT_SUCCESS;
    
    Note that this code relies on callbacks which we will add in the following steps.
  6. Above the main function, create the on_subscription callback to handle notifications that this client has been subscribed to a topic. Note that this callback can be invoked for topics that the client has not explicitly subscribed to, because other control clients (or publishers) may request to subscribe this client to a topic.
    // Define the callback functions
    static int on_subscription(
            const char* topic_path,
            const TOPIC_SPECIFICATION_T *specification,
            void *context)
    {
            // value stream is now subscribed to `topic_path`
            return HANDLER_SUCCESS;
    }
    
  7. Create the on_value callback. When an update for a subscribed topic is received, this callback is invoked to print the new JSON value.
    static int on_value(
            const char* topic_path,
            const TOPIC_SPECIFICATION_T *const specification,
            const DIFFUSION_DATATYPE datatype,
            const DIFFUSION_VALUE_T *const old_value,
            const DIFFUSION_VALUE_T *const new_value,
            void *context)
    {
            // read the value update
            DIFFUSION_API_ERROR api_error;
            char *result;
            bool success = to_diffusion_json_string(new_value, &result, &api_error);
    
            if(success) {
                    printf("Received value: %s\n", result);
                    free(result);
            }
            else {
                    const char *description = get_diffusion_api_error_description(api_error);
                    printf("Error during diffusion value read: %s\n", description);
                    diffusion_api_error_free(api_error);
            }
            return HANDLER_SUCCESS;
    }
    
  8. Build your C client.
    1. Create a Makefile in the same directory as your C file.
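      An example Makefile is provided after the steps. If you copy it from this page, check that each recipe line begins with a tab character, which make requires.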
    2. Run the make command to build the example.
      The cjson-subscribing-example binary is created in the target/bin directory.
  9. Run your C client from the command line.

The client prints to the console every time the value of the subscribed topic is updated. You can update the value of the topic by using the Diffusion Cloud Dashboard's test client or by creating a publishing client to update the topic. By default, the client subscribes to the processes topic used in the Start publishing with C example.

The completed cjson-subscribing-example.c file contains the following code:
/**
 * Copyright © 2022 - 2023 DiffusionData Ltd.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * This example is written in C99. Please use an appropriate C99 capable compiler
 *
 * @author DiffusionData Limited
 */

#include <stdio.h>
#include <stdlib.h>
#include <time.h>

#ifndef WIN32
        #include <unistd.h>
#else
        #define sleep(x) Sleep(1000 * (x))
#endif

#include "diffusion.h"
#include "args.h"
#include "conversation.h"

#define SYNC_DEFAULT_TIMEOUT (5000 * 1000)


ARG_OPTS_T arg_opts[] = {
        ARG_OPTS_HELP,
        {'u', "url", "Diffusion server URL", ARG_OPTIONAL, ARG_HAS_VALUE, "ws://localhost:8080"},
        {'p', "principal", "Principal (username) for the connection", ARG_OPTIONAL, ARG_HAS_VALUE, "user"},
        {'c', "credentials", "Credentials (password) for the connection", ARG_OPTIONAL, ARG_HAS_VALUE, "password"},
        {'t', "topic", "Topic name to subscribe", ARG_OPTIONAL, ARG_HAS_VALUE, "processes"},
        END_OF_ARG_OPTS
};


// Define the callback functions
static int on_subscription(
        const char* topic_path,
        const TOPIC_SPECIFICATION_T *specification,
        void *context)
{
        // value stream is now subscribed to `topic_path`
        return HANDLER_SUCCESS;
}

static int on_unsubscription(
        const char* topic_path,
        const TOPIC_SPECIFICATION_T *specification,
        NOTIFY_UNSUBSCRIPTION_REASON_T reason,
        void *context)
{
        // value stream is now unsubscribed from `topic_path`
        return HANDLER_SUCCESS;
}

static int on_value(
        const char* topic_path,
        const TOPIC_SPECIFICATION_T *const specification,
        const DIFFUSION_DATATYPE datatype,
        const DIFFUSION_VALUE_T *const old_value,
        const DIFFUSION_VALUE_T *const new_value,
        void *context)
{
        // read the value update
        DIFFUSION_API_ERROR api_error;
        char *result;
        bool success = to_diffusion_json_string(new_value, &result, &api_error);

        if(success) {
                printf("Received value: %s\n", result);
                free(result);
        }
        else {
                const char *description = get_diffusion_api_error_description(api_error);
                printf("Error during diffusion value read: %s\n", description);
                diffusion_api_error_free(api_error);
        }
        return HANDLER_SUCCESS;
}

static void on_close()
{
        // value stream has been closed
}

// Program entry point.
int main(int argc, char** argv)
{
        // Standard command-line parsing.
        HASH_T *options = parse_cmdline(argc, argv, arg_opts);
        if(options == NULL || hash_get(options, "help") != NULL) {
                show_usage(argc, argv, arg_opts);
                return EXIT_FAILURE;
        }

        const char *url = hash_get(options, "url");
        const char *principal = hash_get(options, "principal");
        const char *password = hash_get(options, "credentials");
        const char *topic_name = hash_get(options, "topic");

        CREDENTIALS_T *credentials = NULL;
        if(password != NULL) {
                credentials = credentials_create_password(password);
        }

        DIFFUSION_ERROR_T error = { 0 };
        SESSION_T *session = session_create(url, principal, credentials, NULL, NULL, &error);
        if(session == NULL) {
                fprintf(stderr, "Failed to create session: %s\n", error.message);
                return EXIT_FAILURE;
        }

        VALUE_STREAM_T value_stream = {
                .datatype = DATATYPE_JSON,
                .on_subscription = on_subscription,
                .on_unsubscription = on_unsubscription,
                .on_value = on_value,
                .on_close = on_close
        };
        add_stream(session, topic_name, &value_stream);

        SUBSCRIPTION_PARAMS_T params = {
                .topic_selector = topic_name
        };
        subscribe(session, params);

        // Receive updates for 2 minutes
        sleep(120);

        // close session
        session_close(session, NULL);

        // cleanup
        session_free(session);
        credentials_free(credentials);
        hash_free(options, NULL, free);

        return EXIT_SUCCESS;
}
The Makefile contains the following code:
# The following two variables must be set.
#
# Directory containing the C client include files.
# DIFFUSION_C_CLIENT_INCDIR	=
#
# Directory containing libdiffusion.a
# DIFFUSION_C_CLIENT_LIBDIR	=

ifndef DIFFUSION_C_CLIENT_INCDIR
$(error DIFFUSION_C_CLIENT_INCDIR is not set)
endif

ifndef DIFFUSION_C_CLIENT_LIBDIR
$(error DIFFUSION_C_CLIENT_LIBDIR is not set)
endif

# Extra definitions from parent directory, if they exist.
-include ../makefile.defs

CC      = gcc
CFLAGS  += $(INCLUDES) \
           -g -std=c99 -D_POSIX_C_SOURCE=200112L -D_XOPEN_SOURCE=700 \
           -Wall -Werror -Wno-error=deprecated-declarations \
           -I$(DIFFUSION_C_CLIENT_INCDIR)

LDFLAGS += -lpthread -lpcre -lcurl -lz \
           $(DIFFUSION_C_CLIENT_LIBDIR)/libdiffusion.a \
           $(LIBS)

# Detect the platform the Diffusion Client will be running on
PLATFORM =  $(shell uname -s | tr '[A-Z]' '[a-z]' | sed -e 's/darwin/osx/')

# If not MacOS, add `-lrt -ldl` to the linker flags
ifneq ($(PLATFORM),osx)
        LDFLAGS += -lrt -ldl
endif

ARFLAGS     +=
SOURCES     = cjson-subscribing-example.c
TARGETDIR   = target
OBJDIR      = $(TARGETDIR)/objs
BINDIR      = $(TARGETDIR)/bin
OBJECTS     = $(SOURCES:.c=.o)
TARGETS     = cjson-subscribing-example

all:        prepare $(TARGETS)
.PHONY:     all

prepare:
        mkdir -p $(OBJDIR) $(BINDIR)

$(OBJDIR)/%.o:  %.c
        $(CC) $(CFLAGS) -c -o $@ $<

cjson-subscribing-example:  cjson-subscribing-example.c
        $(CC) $^ $(CFLAGS) $(LDFLAGS) -lm -o $(BINDIR)/$@

clean:
        rm -rf $(TARGETS) $(OBJECTS) $(TARGETDIR) core a.out