Start publishing with Java
Create a Java™ client that publishes data through topics on the Diffusion™ server.
To complete this example, you need a Diffusion server and a development system with Java installed on it.
You also require a principal that has a role with the modify_topic and update_topic permissions, for example, the "ADMINISTRATOR" role. For more information about roles and permissions, see Role-based authorization.
The client publishes a value to the foo/counter topic every second. You can subscribe to the foo/counter topic by creating a client to subscribe to the topic. For more information, see Start subscribing with Java.
Full example
The completed PublishingClient class contains the following code:
import com.pushtechnology.diffusion.client.Diffusion;
import com.pushtechnology.diffusion.client.features.control.topics.TopicControl;
import com.pushtechnology.diffusion.client.features.control.topics.TopicControl.AddCallback;
import com.pushtechnology.diffusion.client.features.control.topics.TopicUpdateControl;
import com.pushtechnology.diffusion.client.features.control.topics.TopicUpdateControl.Updater.UpdateCallback;
import com.pushtechnology.diffusion.client.session.Session;
import com.pushtechnology.diffusion.client.topics.details.TopicType;
import com.pushtechnology.diffusion.datatype.json.JSON;
import com.pushtechnology.diffusion.datatype.json.JSONDataType;
import java.util.concurrent.CountDownLatch;
/**
 * A client that publishes an incrementing count to the topic 'foo/counter'.
 *
 * <p>The client opens a session, creates a JSON topic, waits until the server
 * has confirmed the topic was added, and then publishes a new JSON value of
 * the form <code>{"count" : n }</code> once per second for 1000 iterations.
 * The session is closed when publishing finishes or the thread is interrupted.
 *
 * @author Push Technology Limited
 * @since 5.9
 */
public final class PublishingClient {

    /** Path of the JSON topic that this client creates and updates. */
    private static final String TOPIC_PATH = "foo/counter";

    /**
     * Main.
     *
     * @param arguments command-line arguments; not used
     * @throws InterruptedException if the thread is interrupted while waiting
     *         for the topic to be added or while sleeping between updates
     */
    public static void main(String... arguments) throws InterruptedException {

        // Connect using a principal with 'modify_topic' and 'update_topic'
        // permissions.
        // Change 'host' and 'port' to the hostname/address and port of your
        // Diffusion server.
        final Session session = Diffusion.sessions().principal("control")
            .password("password").open("ws://host:port");

        try {
            // Get the TopicControl and TopicUpdateControl features
            final TopicControl topicControl =
                session.feature(TopicControl.class);
            final TopicUpdateControl updateControl =
                session.feature(TopicUpdateControl.class);

            final JSONDataType jsonDataType = Diffusion.dataTypes().json();

            // Released by the onTopicAdded() callback below.
            final CountDownLatch waitForStart = new CountDownLatch(1);

            // Create a JSON topic 'foo/counter'
            topicControl.addTopic(
                TOPIC_PATH,
                TopicType.JSON,
                new AddCallback.Default() {
                    @Override
                    public void onTopicAdded(String topicPath) {
                        waitForStart.countDown();
                    }
                });

            // Wait for the onTopicAdded() callback.
            // NOTE(review): only onTopicAdded() releases the latch, so if the
            // topic cannot be added this wait presumably never returns -
            // confirm against the behaviour of AddCallback.Default.
            waitForStart.await();

            // Update the topic
            final UpdateCallback updateCallback = new UpdateCallback.Default();
            for (int i = 0; i < 1000; ++i) {
                final JSON value = jsonDataType.fromJsonString(
                    String.format("{\"count\" : %d }", i));

                // Use the non-exclusive updater to update the topic without
                // locking it
                updateControl.updater().update(
                    TOPIC_PATH,
                    value,
                    updateCallback);

                // Publish one value per second.
                Thread.sleep(1000);
            }
        }
        finally {
            // Always release the connection, whether publishing completed
            // normally or the thread was interrupted.
            session.close();
        }
    }
}
This page last modified: 6-12