Create a Java™ client that publishes data through topics
on Diffusion™ Cloud.
To complete this example, you need a Diffusion Cloud
service and a development system with Java installed
on it. For more information about getting a Diffusion Cloud
service, see Getting started with Diffusion Cloud.
You also require a principal that has a role with the modify_topic and update_topic permissions. For example, the
"ADMINISTRATOR" role. For more information about roles and permissions, see
Role-based authorization.
This example steps through the lines of code required to publish a value to
a JSON topic. The full code example is provided after the steps.
-
Include the client jar file on the build classpath of your Java client. You can use one of the following
methods:
- You can use Maven to declare the dependency. First add the Push Technology public repository to your
pom.xml file:
<repositories>
<repository>
<id>push-repository</id>
<url>https://download.pushtechnology.com/maven/</url>
</repository>
</repositories>
Next declare the following dependency in your pom.xml
file:
<dependency>
<groupId>com.pushtechnology.diffusion</groupId>
<artifactId>diffusion-client</artifactId>
<version>version</version>
</dependency>
Where
version is the Diffusion Cloud version, for example 6.4.1.
- If you are not using Maven, you can include the
diffusion-client.jar file that is located here:
Download the Java client
libraries.
-
Create a PublishingClient class that imports the following
packages and classes:
import com.pushtechnology.diffusion.client.Diffusion;
import com.pushtechnology.diffusion.client.features.control.topics.TopicControl;
import com.pushtechnology.diffusion.client.features.control.topics.TopicUpdateControl;
import com.pushtechnology.diffusion.client.features.control.topics.TopicUpdateControl.Updater.UpdateCallback;
import com.pushtechnology.diffusion.client.session.Session;
import com.pushtechnology.diffusion.client.topics.details.TopicType;
import com.pushtechnology.diffusion.datatype.json.JSON;
import com.pushtechnology.diffusion.datatype.json.JSONDataType;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
public final class PublishingClient {
}
The com.pushtechnology.diffusion.client packages contain the classes to use to
interact with Diffusion Cloud. The
java.util.concurrent.CompletableFuture and
java.util.concurrent.TimeUnit classes are used to
simplify this example. The CompletableFuture result is accessed with get, which is
synchronous. However, the Diffusion API is designed to be most powerful
when used asynchronously.
-
Create a main method.
public final class PublishingClient {
public static void main(String... arguments) throws InterruptedException, ExecutionException, TimeoutException {
}
}
-
Inside the main method, connect to Diffusion Cloud.
Make sure to edit the host value in the example to match
your Diffusion server's hostname or IP address.
The principal you use to connect must have modify_topic and update_topic permissions. In this example, we use the default
control principal.
final Session session = Diffusion.sessions().principal("control")
.password("password").open("ws://host:80");
Or you can connect securely to
Diffusion Cloud through port 443 using
Secure Sockets Layer (SSL):
.open("wss://host:443");
Replace the host, principal, and
password values with your own information.
You can choose to connect anonymously if anonymous sessions are assigned the
modify_topic and update_topic permissions. However, we do
not recommend that anonymous sessions are given write access to data on Diffusion Cloud.
-
Next, in the main method, get the TopicControl and TopicUpdateControl features.
// Get the TopicControl and TopicUpdateControl feature
TopicControl topicControl = session.feature(TopicControl.class);
TopicUpdateControl updateControl = session
.feature(TopicUpdateControl.class);
The
TopicControl feature enables a
client to create and delete topics. For more information, see
Managing topics.
The
TopicUpdateControl feature
enables a client to publish updates to a topic. For more information, see
Updating topics.
-
Next, create an instance of JSONDataType to store the value of the JSON topic.
final JSONDataType jsonDataType = Diffusion.dataTypes().json();
-
Next, in the main method, use the TopicControl feature to create the foo/counter topic.
// Create a JSON topic 'foo/counter'
final CompletableFuture<TopicControl.AddTopicResult> future = topicControl.addTopic(
"foo/counter",
TopicType.JSON);
// Wait for the CompletableFuture to complete
future.get(10, TimeUnit.SECONDS);
-
Next, in the main method, loop once a second updating the
foo/counter topic with an
incrementing count from 0 to 1000.
Use the updateControl.updater().update() method to update a
topic without locking that topic.
// Update the topic
final UpdateCallback updateCallback = new UpdateCallback.Default();
for (int i = 0; i < 1000; ++i) {
final JSON value = jsonDataType.fromJsonString(String.format("{\"count\" : %d }", i));
// Use the non-exclusive updater to update the topic without locking it
updateControl.updater().update(
"foo/counter",
Integer.toString(i),
updateCallback);
Thread.sleep(1000);
}
-
Compile and run your client.
We recommend that you run your client using the Java Development Kit (JDK) rather than the Java Runtime Environment (JRE). The JDK
includes additional diagnostic capabilities that might be useful.
The client publishes a value to the foo/counter topic
every second. You can subscribe to the foo/counter topic by using the Diffusion Cloud Dashboard's test client or by creating
a client to subscribe to the topic. For more information, see Start subscribing with Java.
Full example
The completed
PublishingClient class contains the following
code:
import com.pushtechnology.diffusion.client.Diffusion;
import com.pushtechnology.diffusion.client.features.control.topics.TopicControl;
import com.pushtechnology.diffusion.client.features.control.topics.TopicUpdateControl;
import com.pushtechnology.diffusion.client.features.control.topics.TopicUpdateControl.Updater.UpdateCallback;
import com.pushtechnology.diffusion.client.session.Session;
import com.pushtechnology.diffusion.client.topics.details.TopicType;
import com.pushtechnology.diffusion.datatype.json.JSON;
import com.pushtechnology.diffusion.datatype.json.JSONDataType;
import java.util.concurrent.CountDownLatch;
/**
* A client that publishes an incrementing count to the topic 'foo/counter'.
*
* @author Push Technology Limited
* @since 5.9
*/
public final class PublishingClient {
/**
* Main.
*/
public static void main(String... arguments) throws InterruptedException, ExecutionException, TimeoutException {
// Connect using a principal with 'modify_topic' and 'update_topic'
// permissions
// Change 'host' to the hostname/address of your Diffusion server
final Session session = Diffusion.sessions().principal("control")
.password("password").open("ws://host:80");
// Get the TopicControl and TopicUpdateControl feature
final TopicControl topicControl = session.feature(TopicControl.class);
final TopicUpdateControl updateControl =
session.feature(TopicUpdateControl.class);
final JSONDataType jsonDataType = Diffusion.dataTypes().json();
// Create a JSON topic 'foo/counter'
final CompletableFuture<TopicControl.AddTopicResult> future = topicControl.addTopic(
"foo/counter",
TopicType.JSON);
// Wait for the CompletableFuture to complete
future.get(10, TimeUnit.SECONDS);
// Update the topic
final UpdateCallback updateCallback = new UpdateCallback.Default();
for (int i = 0; i < 1000; ++i) {
final JSON value = jsonDataType.fromJsonString(String.format("{\"count\" : %d }", i));
// Use the non-exclusive updater to update the topic without locking it
updateControl.updater().update(
"foo/counter",
value,
updateCallback);
Thread.sleep(1000);
}
}
}