Create a Java™ client that publishes data through topics on Diffusion™ Cloud.
To complete this example, you need a Diffusion Cloud service and a development system with Java installed on it. For more information about getting a Diffusion Cloud service, see Getting started with Diffusion Cloud.
You also require a principal that has a role with the modify_topic and update_topic permissions, for example, the "ADMINISTRATOR" role. For more information about roles and permissions, see Role-based authorization.
This example steps through the lines of code required to publish a value to
a JSON topic. The full code example is provided after the steps.
-
Include the client jar file on the build classpath of your Java client. You can use one of the following methods:
- You can use Maven to declare the dependency. First add the DiffusionData public repository to your
pom.xml file:
<repositories>
<repository>
<id>push-repository</id>
<url>https://download.diffusiondata.com/maven/</url>
</repository>
</repositories>
Next, declare the following dependency in your pom.xml file:
<dependency>
<groupId>com.pushtechnology.diffusion</groupId>
<artifactId>diffusion-client</artifactId>
<version>version</version>
</dependency>
Where version is the Diffusion Cloud version, for example 6.10.3.
- If you are not using Maven, you can include the diffusion-client.jar file that is located here: Download the Java client libraries.
-
Create a PublishingClient class that imports the following
packages and classes:
import com.pushtechnology.diffusion.client.Diffusion;
import com.pushtechnology.diffusion.client.features.control.topics.TopicControl;
import com.pushtechnology.diffusion.client.features.control.topics.TopicUpdateControl;
import com.pushtechnology.diffusion.client.features.control.topics.TopicUpdateControl.Updater.UpdateCallback;
import com.pushtechnology.diffusion.client.session.Session;
import com.pushtechnology.diffusion.client.topics.details.TopicType;
import com.pushtechnology.diffusion.datatype.json.JSON;
import com.pushtechnology.diffusion.datatype.json.JSONDataType;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public final class PublishingClient {
}
The com.pushtechnology.diffusion.client packages contain the classes to use to interact with Diffusion Cloud. The java.util.concurrent.CompletableFuture and java.util.concurrent.TimeUnit classes are used to simplify this example. The CompletableFuture result is accessed with get, which is synchronous. However, the Diffusion API is designed to be most powerful when used asynchronously.
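To illustrate the difference between the synchronous get call and asynchronous use, here is a minimal sketch that uses only the Java standard library. The simulatedRequest method is a hypothetical stand-in for an asynchronous call such as a topic-creation request; it is an assumption for illustration and not part of the Diffusion API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;

final class FutureStyles {

    // Hypothetical stand-in for an asynchronous request (not a Diffusion API call).
    static CompletableFuture<String> simulatedRequest() {
        return CompletableFuture.supplyAsync(() -> "CREATED");
    }

    // Synchronous style: block the calling thread until the result arrives
    // or the timeout expires.
    static String blockingStyle()
        throws InterruptedException, ExecutionException, TimeoutException {
        return simulatedRequest().get(10, TimeUnit.SECONDS);
    }

    // Asynchronous style: register a callback that stores the result and
    // return immediately without blocking.
    static CompletableFuture<Void> asyncStyle(AtomicReference<String> sink) {
        return simulatedRequest().thenAccept(sink::set);
    }

    public static void main(String... args) throws Exception {
        System.out.println("blocking: " + blockingStyle());

        final AtomicReference<String> sink = new AtomicReference<>();
        // The wait below exists only so that the demo can print before exiting.
        asyncStyle(sink).get(10, TimeUnit.SECONDS);
        System.out.println("async: " + sink.get());
    }
}
```

In the asynchronous style, the calling thread is free to do other work while the request is in flight; the callback runs when the result is ready.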
-
Create a main method.
public final class PublishingClient {
public static void main(String... arguments) throws InterruptedException, ExecutionException, TimeoutException {
}
}
-
Inside the main method, connect to Diffusion Cloud.
Make sure to edit the host value in the example to match
your Diffusion server's hostname or IP address.
The principal you use to connect must have modify_topic and update_topic permissions. In this example, we use the default
control principal.
final Session session = Diffusion.sessions().principal("control")
.password("password").open("ws://host:80");
Or you can connect securely to Diffusion Cloud through port 443 using Secure Sockets Layer (SSL):
.open("wss://host:443");
Replace the host, principal, and password values with your own information.
You can choose to connect anonymously if anonymous sessions are assigned the modify_topic and update_topic permissions. However, we do not recommend giving anonymous sessions write access to data on Diffusion Cloud.
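If you switch between the plain and secure connection URLs often, a small helper can keep the two forms consistent. The following is a minimal sketch; serverUrl is a hypothetical helper introduced here for illustration and is not part of the Diffusion API. It only builds the two URL forms shown in this step.

```java
final class ConnectionUrl {

    // Hypothetical helper (an assumption, not a Diffusion API method).
    // Builds "ws://host:80" for a plain connection or "wss://host:443"
    // for a secure connection, matching the URLs used in this step.
    static String serverUrl(String host, boolean secure) {
        if (host == null || host.isEmpty()) {
            throw new IllegalArgumentException("host must not be empty");
        }
        return secure ? "wss://" + host + ":443" : "ws://" + host + ":80";
    }

    public static void main(String... args) {
        System.out.println(serverUrl("host", false)); // prints ws://host:80
        System.out.println(serverUrl("host", true));  // prints wss://host:443
    }
}
```

The resulting string can be passed to open in place of the literal URL.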
-
Next, in the main method, get the TopicControl and TopicUpdateControl features.
// Get the TopicControl and TopicUpdateControl features
TopicControl topicControl = session.feature(TopicControl.class);
TopicUpdateControl updateControl = session
.feature(TopicUpdateControl.class);
The TopicControl feature enables a client to create and delete topics. For more information, see Managing topics.
The TopicUpdateControl feature enables a client to publish updates to a topic. For more information, see Updating topics.
-
Next, get an instance of JSONDataType, which is used to create the JSON values that are published to the JSON topic.
final JSONDataType jsonDataType = Diffusion.dataTypes().json();
-
Next, in the main method, use the TopicControl feature to create the foo/counter topic.
// Create a JSON topic 'foo/counter'
final CompletableFuture<TopicControl.AddTopicResult> future = topicControl.addTopic(
"foo/counter",
TopicType.JSON);
// Wait for the CompletableFuture to complete
future.get(10, TimeUnit.SECONDS);
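The timed get call above can end in several ways: with a result, with a failure, or with a timeout. The following minimal sketch, which uses only the Java standard library, shows one way to tell these outcomes apart. The describeOutcome method and the futures in main are hypothetical and introduced for illustration; they are not part of the Diffusion API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class TimedWait {

    // Waits up to the given timeout and reports how the future finished.
    static String describeOutcome(CompletableFuture<?> future, long timeout, TimeUnit unit) {
        try {
            return "completed: " + future.get(timeout, unit);
        }
        catch (ExecutionException e) {
            // The operation itself failed; the original exception is the cause.
            return "failed: " + e.getCause().getMessage();
        }
        catch (TimeoutException e) {
            // No result arrived within the timeout.
            return "timed out";
        }
        catch (InterruptedException e) {
            // Restore the interrupt flag so that callers can observe it.
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String... args) {
        final CompletableFuture<String> ok = CompletableFuture.completedFuture("CREATED");

        final CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("simulated failure"));

        final CompletableFuture<String> pending = new CompletableFuture<>(); // never completed

        System.out.println(describeOutcome(ok, 1, TimeUnit.SECONDS));
        System.out.println(describeOutcome(failed, 1, TimeUnit.SECONDS));
        System.out.println(describeOutcome(pending, 10, TimeUnit.MILLISECONDS));
    }
}
```

In this example, main declares the checked exceptions instead of catching them, which is simpler for a walkthrough; a production client would typically handle each case.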
-
Next, in the main method, loop once a second, updating the foo/counter topic with an incrementing count from 0 to 999.
Use the updateControl.updater().update() method to update a topic without locking that topic.
// Update the topic
final UpdateCallback updateCallback = new UpdateCallback.Default();
for (int i = 0; i < 1000; ++i) {
final JSON value = jsonDataType.fromJsonString(String.format("{\"count\" : %d }", i));
// Use the non-exclusive updater to update the topic without locking it
updateControl.updater().update(
"foo/counter",
value,
updateCallback);
Thread.sleep(1000);
}
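To make the published values concrete, the following minimal sketch builds the same JSON text that the loop above passes to fromJsonString, using only the Java standard library. The payload method is a hypothetical helper introduced for illustration.

```java
final class CounterPayload {

    // Hypothetical helper: builds the JSON text for one count, using the
    // same format string as the update loop in this step.
    static String payload(int count) {
        return String.format("{\"count\" : %d }", count);
    }

    public static void main(String... args) {
        // Print the first three payloads the loop would publish.
        for (int i = 0; i < 3; ++i) {
            System.out.println(payload(i));
        }
        // prints:
        // {"count" : 0 }
        // {"count" : 1 }
        // {"count" : 2 }
    }
}
```

Because the loop condition is i < 1000, the last value published is the payload for 999.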
-
Compile and run your client.
We recommend that you run your client using the Java Development Kit (JDK) rather than the Java Runtime Environment (JRE). The JDK includes additional diagnostic capabilities that might be useful.
The client publishes a value to the foo/counter topic every second. You can subscribe to the foo/counter topic by using the Diffusion Cloud Dashboard's test client or by creating a client to subscribe to the topic. For more information, see Start subscribing with Java.
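This example paces its updates with Thread.sleep in a loop. As an alternative, here is a minimal sketch that schedules the same incrementing count at a fixed rate with the Java standard library. The publishCounts method and its publish callback are hypothetical and introduced for illustration; in a real client the callback would wrap the update call, which this sketch deliberately leaves out.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntConsumer;

final class FixedRatePublisher {

    // Hypothetical helper: calls publish with 0, 1, ..., updates - 1,
    // once per period, then returns. periodMillis must be greater than 0.
    static void publishCounts(int updates, long periodMillis, IntConsumer publish)
        throws InterruptedException {
        final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();
        final CountDownLatch done = new CountDownLatch(updates);
        final AtomicInteger next = new AtomicInteger();
        try {
            scheduler.scheduleAtFixedRate(() -> {
                if (done.getCount() > 0) {
                    try {
                        publish.accept(next.getAndIncrement());
                    }
                    catch (RuntimeException e) {
                        // Report and continue; an uncaught exception would
                        // cancel all later runs of this task.
                        System.err.println("publish failed: " + e);
                    }
                    finally {
                        done.countDown();
                    }
                }
            }, 0, periodMillis, TimeUnit.MILLISECONDS);
            // Block until every count has been handed to the callback.
            done.await();
        }
        finally {
            scheduler.shutdownNow();
        }
    }

    public static void main(String... args) throws InterruptedException {
        // Three counts, one per second, printed instead of published.
        publishCounts(3, 1000, i -> System.out.println("count " + i));
    }
}
```

A fixed-rate schedule keeps the interval between updates steady even when an individual update takes a noticeable amount of time, whereas a sleep-based loop adds that time to every interval.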
Full example
The completed PublishingClient class contains the following code:
import com.pushtechnology.diffusion.client.Diffusion;
import com.pushtechnology.diffusion.client.features.control.topics.TopicControl;
import com.pushtechnology.diffusion.client.features.control.topics.TopicUpdateControl;
import com.pushtechnology.diffusion.client.features.control.topics.TopicUpdateControl.Updater.UpdateCallback;
import com.pushtechnology.diffusion.client.session.Session;
import com.pushtechnology.diffusion.client.topics.details.TopicType;
import com.pushtechnology.diffusion.datatype.json.JSON;
import com.pushtechnology.diffusion.datatype.json.JSONDataType;
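import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;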
/**
* A client that publishes an incrementing count to the topic 'foo/counter'.
*
* @author DiffusionData Limited
* @since 5.9
*/
public final class PublishingClient {
/**
* Main.
*/
public static void main(String... arguments) throws InterruptedException, ExecutionException, TimeoutException {
// Connect using a principal with 'modify_topic' and 'update_topic'
// permissions
// Change 'host' to the hostname/address of your Diffusion server
final Session session = Diffusion.sessions().principal("control")
.password("password").open("ws://host:80");
// Get the TopicControl and TopicUpdateControl features
final TopicControl topicControl = session.feature(TopicControl.class);
final TopicUpdateControl updateControl =
session.feature(TopicUpdateControl.class);
final JSONDataType jsonDataType = Diffusion.dataTypes().json();
// Create a JSON topic 'foo/counter'
final CompletableFuture<TopicControl.AddTopicResult> future = topicControl.addTopic(
"foo/counter",
TopicType.JSON);
// Wait for the CompletableFuture to complete
future.get(10, TimeUnit.SECONDS);
// Update the topic
final UpdateCallback updateCallback = new UpdateCallback.Default();
for (int i = 0; i < 1000; ++i) {
final JSON value = jsonDataType.fromJsonString(String.format("{\"count\" : %d }", i));
// Use the non-exclusive updater to update the topic without locking it
updateControl.updater().update(
"foo/counter",
value,
updateCallback);
Thread.sleep(1000);
}
}
}