NAV Navbar
JavaScript Apple (Swift) Android Java .NET (C#) C

Diffusion On-Prem Quick Start

Logo diffusion

Diffusion is a real-time API management platform that makes it simple, fast and efficient to move real-time data across the Internet.

This Quick Start Guide gives you an introduction to Diffusion and how to use it. First you'll download and install Diffusion to your local machine. Then you'll create a client and use the core publish and subscribe broadcast mechanism to send and receive data.

Install Diffusion Locally

On MacOS or Linux, from the terminal, run:

sh bin/diffusion.sh

or if you're using Java 11:

sh bin/diffusion_java11.sh

On Windows, either from Windows Explorer or the Command Terminal, run:

bin\diffusion.bat

or if you're using Java 11:

bin\diffusion_java11.bat

Once Diffusion is installed, navigate to the directory where you installed it and run the startup script.

You should see Diffusion's startup logs in the terminal.

By default, Diffusion's web console will now be running at localhost:8080 with admin credentials set to admin/password.

Get a Client Library

Include the Diffusion JavaScript client (with zlib support) in the <head> of your HTML.

<script src="https://download.pushtechnology.com/clients/6.5.1/js/diffusion-6.5.1.js"></script>
<script src="https://download.pushtechnology.com/clients/6.5.1/js/browserify-zlib-0.2.0.js"></script>

Or if you wish to use Node.js, install the latest Diffusion NPM module by running:

npm install --save diffusion

then import the library in your code:

const diffusion = require('diffusion');

The Diffusion manual has more information on installing and using the JavaScript SDK.

Also see these Knowledge Base articles about using Diffusion with popular JavaScript frameworks and libraries:

The Diffusion Apple SDK is available for download in two variants.

For iOS applications: diffusion-iphoneos-6.5.1.zip

For macOS applications: diffusion-macosx-6.5.1.zip

We recommend using Xcode 10.2. The lowest supported version is Xcode 9.2.

In your ViewController.swift file, import the client interface.

import Diffusion;

The Diffusion manual has more information on installing and using the Apple SDKs.

Download the Diffusion Android client: diffusion-android-6.5.1.jar.

In Android Studio, create a new project using API Level 19 or later, then copy the diffusion-android-6.5.1.jar file to the app/libs directory in your project.

Find the library by expanding the app/libs folder in the left-hand panel (the Project Tool Window). If the libs folder is not shown in the left-hand panel, use the pull-down menu at the top of the panel to select the Project view.

Right-click on the jar, select Add as Library... then accept the default library settings.

Right-click on the libs folder in the Project Tool Window, then select Add as Library. If the libs folder is not shown in the left-hand panel, use the pull-down menu at the top of the panel to select Project view.

Add the following code to the build.gradle file within your project:

compileOptions {
  sourceCompatibility JavaVersion.VERSION_1_8
  targetCompatibility JavaVersion.VERSION_1_8
 }

The Diffusion manual has more information on installing and using the Android SDK.

If you are using Maven, first add the Push Technology repo to your pom.xml file:

<repositories>
  <repository>
   <id>push-repository</id>
   <url>https://download.pushtechnology.com/maven/</url>
  </repository>
</repositories>

And then add the Diffusion client to your dependencies.

<dependencies>
  <dependency>
    <groupId>com.pushtechnology.diffusion</groupId>
    <artifactId>diffusion-client</artifactId>
    <version>6.5.1</version>
  </dependency>
</dependencies>

Or you can download the Diffusion Java client with dependencies: diffusion-client-with-dependencies-6.5.1.jar

The Diffusion manual has more information on installing and using the Java SDK.

Download the Diffusion .NET client assembly: diffusion-dotnet-6.5.1.zip

Or install the client using NuGet or .NET CLI (instructions here).

Import the Diffusion Client to your project:

using PushTechnology.ClientInterface;

The Diffusion manual has more information on installing and using the .NET SDK.

Download the C client libraries for your operating system:

For more details of dependencies and supported compilers for each OS, see the Diffusion manual page about the C SDK.

Download and configure the client library of your choice.

Connect to Diffusion

All interactions with Diffusion are done via our client libraries. When you use a client library to connect to Diffusion, this creates a session.

A session is able to send and receive data, and acts as the intermediary between Diffusion and your application code.

Let’s create a session:

diffusion.connect({
    host : "<HOST>",
    port : <PORT>,
    principal : "admin",
    credentials : "<PASSWORD>"
}).then(function(session) {
    // Use the new session here
});

Add a reference to the session in your ViewController class to prevent it from being garbage collected:

var session:PTDiffusionSession?

Connect to Diffusion within the viewDidLoad method:

let url = URL(string: "ws://<HOST>:<PORT>")!
let credentials = PTDiffusionCredentials(password: "<PASSWORD>")
let config = PTDiffusionSessionConfiguration(principal: "admin", credentials: credentials)

PTDiffusionSession.open(with: url, configuration: config) { (_session, error) -> Void in
    self.session = _session
}

Add the following lines to the ViewController class to handle any potential errors:

let errorHandler: (Error?) -> Void = { error in
    if (error != nil) {
        print(error!)
    }
}

In your project's app/src/main/AndroidManifest.xml, set the INTERNET permission.

<uses-permission android:name="android.permission.INTERNET"/>;

Insert the element between the opening <manifest> tag and the opening <application> tag. This permission is required to use the Diffusion API.

In the MainActivity.java file in your project, create a SessionHandler inner class. This is required to prevent Diffusion running in the main thread and potentially affecting your app's performance.

private class SessionHandler implements SessionFactory.OpenCallback {
        private Session session = null;

        @Override
        public void onOpened(Session session) {
            this.session = session;
        }

        @Override
        public void onError(ErrorReason errorReason) {

        }

        public void close() {
            if ( session != null ) {
              session.close();
            }
        }
    }

Either use Android Studio to resolve the imports, or paste the following into the import section of the file:

import com.pushtechnology.diffusion.client.callbacks.ErrorReason;
import com.pushtechnology.diffusion.client.session.Session;
import com.pushtechnology.diffusion.client.session.SessionFactory;

In your MainActivity class, add the following private field.

private SessionHandler sessionHandler = null;

Then add the following lines to the onCreate method.

if (sessionHandler == null) {
    sessionHandler = new SessionHandler();

    Diffusion.sessions()
              .principal("admin")
              .password("<PASSWORD>")
              .open("ws://<HOST>:<PORT>", sessionHandler);
}

Again, use Android Studio to resolve the imports, or paste the following into the import section:

import com.pushtechnology.diffusion.client.Diffusion;

To properly free resources when your app is closed, override the onDestroy method of your MainActivity class.

if ( sessionHandler != null ) {
    sessionHandler.close();
    sessionHandler = null;
}
super.onDestroy();

In the main method of your Java project, add the following lines to connect to Diffusion:

final Session session = Diffusion.sessions()
  .principal("admin")
  .password("<PASSWORD>")
  .open("ws://<HOST>:<PORT>");

In the main method of your .NET project, add the following lines to connect to Diffusion:

var session = Diffusion.Sessions
    .Principal("admin")
    .Password("<PASSWORD>")
    .Open("ws://<HOST>:<PORT>:");
DIFFUSION_SESSION_FACTORY_T *session_factory = diffusion_session_factory_init();
diffusion_session_factory_principal(session_factory, "admin");
diffusion_session_factory_password(session_factory, "<PASSWORD>");

SESSION_T *session = session_create_with_session_factory(session_factory, "ws://<HOST>:<PORT>:");

Parameter Value
<HOST> localhost
<PORT> 8080
<PASSWORD> password

Publish Data

Understanding topics and paths

Publish and subscribe real-time broadcast is one of the key ways of using Diffusion to move data in real-time.

Neither publishers nor subscribers need to know about the existence of each other, which lets you simplify your code to care solely about the data, not who is using it.

To publish data, your client must specify a topic.

A topic:

Unlike other messaging products, Diffusion allows you to have millions of unique topics. You can use as many or as few as needed to represent your desired data structure.

Topic paths are structured into hierarchies using the / character, much like URLs. For example, a set of topics used for streaming telemetry data from ships might have paths like:

/telemetry/gps/ss-victoria

/telemetry/gps/ss-narwhal

/telemetry/gps/ss-leo

This makes it easy to organize related topics, and for subscribers to select data at different levels of granularity.

For example, a subscriber that needs the GPS data for all the ships can subscribe to all topics matching the path:

/telemetry/gps/

Create and update a topic

session.topics.add('my-topic', diffusion.topics.TopicType.JSON);
var value = { foo : "bar" };
var result = session.topicUpdate.set("my-topic", diffusion.datatypes.json(), value);

First, create a JSON topic. Add the following code within the callback after establishing the session:

self.session!.topicControl.add(
withTopicPath: "my-topic",
        type: PTDiffusionTopicType.JSON,
        value: nil,
        completionHandler: self.errorHandler)

Now that the topic exists, you can publish data to it. Add the following lines:

var error: NSError? = nil
let value = PTDiffusionJSON(object: ["foo" : "bar" ], error: &error)

self.session!.topicUpdateControl.updater.update(
            withTopicPath: "my-topic",
            value: value,
            completionHandler: self.errorHandler)

First, create a JSON topic. Add the following lines within the onOpened method of the SessionHandler class:

 session.feature(TopicControl.class).addTopic(
                "my-topic",
                TopicType.JSON,
                new TopicControl.AddCallback.Default());

Now you need to publish the data. Since the topic holds JSON, you need to construct values of the appropriate type before updating. To do this, use the DataType library provided by Diffusion.

final JSONDataType jsonDataType = Diffusion.dataTypes().json();
final JSON value = jsonDataType.fromJsonString("{\"foo\" : \"bar\" }");
final CompletableFuture result = session.feature(TopicUpdate.class).set("my-topic", JSON.class, value);

First, create a JSON topic. Add the following after the line that establishes the session:

session.feature(TopicControl.class).addTopic("my-topic", TopicType.JSON);

Now you need to publish the data. Since the topic holds JSON, you need to construct values of the appropriate type before updating. To do this, use the DataType library provided by Diffusion.

final JSONDataType jsonDataType = Diffusion.dataTypes().json();
final JSON value = jsonDataType.fromJsonString("{\"foo\" : \"bar\" }");
final CompletableFuture result = session.feature(TopicUpdate.class).set("my-topic", JSON.class, value);

First, create a JSON topic. Add the following after the line that establishes the session:

await session.TopicControl.AddTopicAsync( my-topic, TopicType.JSON );

Now you need to publish the data. Since the topic holds JSON, you need to construct values of the appropriate type before updating. To do this, use the DataType library provided by Diffusion.

var jsonDataType = Diffusion.DataTypes.JSON;
var value = jsonDataType.FromJSONString("{\"foo\": \"bar\"}");
var result = session.TopicUpdate.SetAsync(
    "my-topic",
    value);
on_topic_added(SESSION_T *session, TOPIC_ADD_RESULT_CODE result_code, void *context)
{
        printf("on_topic_added\n");
        return HANDLER_SUCCESS;
}

static int
on_topic_add_failed(SESSION_T *session, TOPIC_ADD_FAIL_RESULT_CODE result_code, const DIFFUSION_ERROR_T *error, void *context)
{
        printf("on_topic_add_failed: %d\n", result_code);
        return HANDLER_SUCCESS;
}

static int
on_topic_update(void *context)
{
        printf("topic update success\n");
        return HANDLER_SUCCESS;
}

ADD_TOPIC_CALLBACK_T callback = {
        .on_topic_added_with_specification = on_topic_added,
        .on_topic_add_failed_with_specification = on_topic_add_failed
};

TOPIC_SPECIFICATION_T *spec = topic_specification_init(TOPIC_TYPE_JSON);
add_topic_from_specification(session, "my-topic", spec, callback);

BUF_T *buf = buf_create();
write_diffusion_json_value("\"hello world\"", buf);

DIFFUSION_TOPIC_UPDATE_SET_PARAMS_T topic_update_params = {
        .topic_path = "my-topic",
        .datatype = DATATYPE_JSON,
        .update = buf,
        .on_topic_update = on_topic_update
};

diffusion_topic_update_set(session, topic_update_params);

First, you will create a topic using the JSON topic type.

The path of the topic will be:

/my-topic

Then, you will publish this JSON data to the topic:

{"foo":"bar"}

Monitor topics with the console

You can use the Diffusion management console to see your new topic.

Go to localhost:8080 and log in with admin / password.

Click the Topics tab at left.

Qsg console

You should see my-topic listed in the topic browser. Click on it to display the topic's value.

Subscribe to Data

You can use security policies to determine which topics a session can subscribe to.

A session can subscribe to data published to topics. Subscription is a way for a session to tell Diffusion which topics it’s interested in. Diffusion will cache your subscriptions, so you can subscribe to topics which don’t yet exist, and retain full subscription state should your session disconnect and reconnect.

When a session subscribes to an existing topic, Diffusion will send that topic’s current value to the session immediately, and then send any subsequent updates as they arrive. This allows Diffusion to act as a cache, where subscribers are always guaranteed to get the latest value.

Before you subscribe, you need to first attach a stream to listen for updates.

session.addStream('my-topic',
                  diffusion.datatypes.json()).on('value',
                  function(topic, specification, newValue, oldValue) {
  console.log('New value:', newValue.get());
});

Now you can tell Diffusion that you want to receive updates from the topic you created.

session.select('my-topic');

To receive data from Diffusion, you use value streams. Value streams provide callbacks for topic events, such as subscriptions and received values.

Let’s define a class that implements the JSON Value Stream protocol, which you will then use to subscribe to the topic you created.

class StreamDelegate: NSObject, PTDiffusionJSONValueStreamDelegate {
        func diffusionStream(
    _ stream: PTDiffusionStream,
    didSubscribeToTopicPath topicPath: String,
    specification: PTDiffusionTopicSpecification) {
              print("Subscribed to: \(topicPath)")
        }

        func diffusionStream(
            _ stream: PTDiffusionValueStream,
           didUpdateTopicPath topicPath: String,
           specification: PTDiffusionTopicSpecification,
           oldJSON: PTDiffusionJSON?,
           newJSON: PTDiffusionJSON) {

           do {
              let value:Dictionary<String, Any> = try newJSON.object() as! Dictionary
              print("\(topicPath): \(value.description)")
           } catch {
              print("Unable to read message")
           }
    }
}

Create an instance of this new class and assign it to a property of the ViewController. This avoids the reference being garbage collected after the initial session connection closure has ended.

let delegate = StreamDelegate()

Now you can use this delegate to register a Value Stream.

let selector = PTDiffusionTopicSelector(expression: "my-topic")
let stream = PTDiffusionJSON.valueStream(with: self.delegate)

_session!.topics.add(stream, with: selector)

Now you can tell Diffusion that you want to receive updates from the topic you created by subscribing to the topic.

_session!.topics.subscribe(
withTopicSelectorExpression: "my-topic",
completionHandler: self.errorHandler)

Note that you can create multiple streams to dispatch updates to different parts of your application.

Before you subscribe, you need to first attach a stream to listen for updates.

import com.pushtechnology.diffusion.client.features.Topics.ValueStream.Default;

session.feature(Topics.class).addStream("my-topic", JSON.class, new Topics.ValueStream.Default<JSON>() {
  @Override
  public void onValue(String topicPath, TopicSpecification topicSpec, JSON oldValue, JSON newValue) {
    System.out.println("New value for" + topicPath + ": " + newValue.toJsonString());
  }
});

Now you can tell Diffusion that you want to receive updates from the topic you created.

session.feature(Topics.class).subscribe("my-topic");

Note that you can create multiple streams to dispatch updates to different parts of your application.

Before you subscribe, you need to first attach a stream to listen for updates.

import com.pushtechnology.diffusion.client.features.Topics.ValueStream.Default;

session.feature(Topics.class).addStream("my-topic", JSON.class, new Topics.ValueStream.Default<JSON>() {
  @Override
  public void onValue(String topicPath, TopicSpecification topicSpec, JSON oldValue, JSON newValue) {
    System.out.println("New value for" + topicPath + ": " + newValue.toJsonString());
  }
});

Now you can tell Diffusion that you want to receive updates from the topic you created.

session.feature(Topics.class).subscribe("my-topic");

Note that you can create multiple streams to dispatch updates to different parts of your application.

Before you subscribe, you need to first attach a value stream to listen for updates. Value streams provide callbacks for topic events, such as subscriptions and received values. Define a class that implements the IValueStream interface, which you will then use to subscribe to your topic.

class ExampleValueStream : IValueStream<IJSON>
 {
     public void OnSubscription(string topicPath, ITopicSpecification specification)
     {
         Console.WriteLine($"Subscribed to: {topicPath}");
     }

     public void OnUnsubscription(string topicPath, ITopicSpecification specification, TopicUnsubscribeReason reason)
     {
         Console.WriteLine($"Unsubscribed from: {topicPath}");
     }

     public void OnValue(string topicPath, ITopicSpecification specification, IJSON oldValue, IJSON newValue)
     {
         Console.WriteLine($"{topicPath}: {newValue.ToJSONString()}");
     }

     public void OnClose()
     {
         // Not used
     }

     public void OnError(ErrorReason errorReason)
     {
         // Not used
     }
}

Now create a value stream instance and subscribe to your topic.

session.Topics.AddStream("my-topic", new ExampleValueStream());

await session.Topics.SubscribeAsync("my-topic");

Note that you can create multiple streams to dispatch updates to different parts of your application.


static int on_subscription(const char* topic_path,
                    const TOPIC_SPECIFICATION_T *specification,
                    void *context)
{
        printf("Subscribed to topic: %s\n", topic_path);
        return HANDLER_SUCCESS;
}

static int on_unsubscription(const char* topic_path,
                      const TOPIC_SPECIFICATION_T *specification,
                      NOTIFY_UNSUBSCRIPTION_REASON_T reason,
                      void *context)
{
        printf("Unsubscribed from topic: %s\n", topic_path);
        return HANDLER_SUCCESS;
}

static int on_value(const char* topic_path,
             const TOPIC_SPECIFICATION_T *const specification,
             const DIFFUSION_DATATYPE datatype,
             const DIFFUSION_VALUE_T *const old_value,
             const DIFFUSION_VALUE_T *const new_value,
             void *context)
{
        char *result;
        bool success = to_diffusion_json_string(new_value, &result, &api_error);

        if(success) {
                printf("Received value: %s\n", result);
                free(result);
                return HANDLER_SUCCESS;
        }

        DIFFUSION_API_ERROR api_error;
        printf("Error during diffusion value read: %s\n", get_diffusion_api_error_description(api_error));
        diffusion_api_error_free(api_error);
        return HANDLER_FAILURE;
}

static void on_close() 
{
        printf("Value stream closed\n");
}

VALUE_STREAM_T value_stream = {
        .datatype = DATATYPE_JSON,
        .on_subscription = on_subscription,
        .on_unsubscription = on_unsubscription,
        .on_value = on_value,
        .on_close = on_close
};

add_stream(session, "my-topic", &value_stream);
SUBSCRIPTION_PARAMS_T params = {
        .topic_selector = "my-topic"
};

subscribe(session, params);

Now that we’ve created a topic and published some data to it, let’s add a subscription to retrieve the JSON value.

Next Steps

That’s it – you’ve successfully created a Diffusion client that creates a topic, publishes updates, and receives values!

Further exercises

Now you could try:

Documentation

To learn more about the Diffusion JavaScript SDK, please browse the JavaScript API Documentation.

To learn more about the Diffusion Apple SDK, please browse the Apple API Documentation.

To learn more about the Diffusion Android SDK, please browse the Android API Documentation.

To learn more about the Diffusion Java SDK, please browse the Java API Documentation.

To learn more about the Diffusion .NET SDK, please browse the .NET API Documentation.

To learn more about the Diffusion C SDK, please browse the C API Documentation.

More advanced code examples are available on our GitHub page.

For in-depth documentation about Diffusion, see the on-premise Diffusion manual.

To browse all the documentation, including docs for Diffusion Cloud and older versions of Diffusion, visit the Diffusion documentation server.

For more resources or to contact Push Technology support, visit the Diffusion Developer Hub.