Example: Fetch topic state
The following examples use the Unified API to fetch the current state of a topic without subscribing to the topic.
Apple
@import Diffusion; @interface FetchExample (PTDiffusionFetchStreamDelegate) <PTDiffusionFetchStreamDelegate> @end @implementation FetchExample { PTDiffusionSession* _session; } -(void)startWithURL:(NSURL*)url { NSLog(@"Connecting..."); [PTDiffusionSession openWithURL:url completionHandler:^(PTDiffusionSession *session, NSError *error) { if (!session) { NSLog(@"Failed to open session: %@", error); return; } // At this point we now have a connected session. NSLog(@"Connected."); // Set ivar to maintain a strong reference to the session. _session = session; // Send fetch request. [session.topics fetchWithTopicSelectorExpression:@"*Assets//" delegate:self]; }]; } @end @implementation FetchExample (PTDiffusionFetchStreamDelegate) -(void)diffusionStream:(PTDiffusionStream * const)stream didFetchTopicPath:(NSString * const)topicPath content:(PTDiffusionContent * const)content { NSLog(@"Fetch Result: %@ = \"%@\"", topicPath, content); } -(void)diffusionDidCloseStream:(PTDiffusionStream * const)stream { NSLog(@"Fetch stream finished."); } -(void)diffusionStream:(PTDiffusionStream * const)stream didFailWithError:(NSError * const)error { NSLog(@"Fetch stream failed error: %@", error); } @end
Java
and Android
package com.pushtechnology.diffusion.examples; import com.pushtechnology.diffusion.client.Diffusion; import com.pushtechnology.diffusion.client.features.Topics; import com.pushtechnology.diffusion.client.features.Topics.FetchContextStream; import com.pushtechnology.diffusion.client.session.Session; import com.pushtechnology.diffusion.client.topics.TopicSelector; /** * This is a simple example of a client that fetches the state of topics but * does not subscribe to them. * <P> * This makes use of the 'Topics' feature only. * * @author Push Technology Limited * @since 5.0 */ public final class ClientUsingFetch { private final Session session; private final Topics topics; /** * Constructor. */ public ClientUsingFetch() { session = Diffusion.sessions().principal("client").password("password") .open("ws://diffusion.example.com:80"); topics = session.feature(Topics.class); } /** * Issues a fetch request for a topic or selection of topics. * * @param topicSelector a {@link TopicSelector} expression * @param fetchContext context string to be returned with the fetch * response(s) * @param stream callback for fetch responses */ public void fetch( String topicSelector, String fetchContext, FetchContextStream<String> stream) { topics.fetch(topicSelector, fetchContext, stream); } /** * Close the session. */ public void close() { session.close(); } }
.NET
using PushTechnology.ClientInterface.Client.Factories; using PushTechnology.ClientInterface.Client.Features; using PushTechnology.ClientInterface.Client.Session; namespace Examples { /// <summary> /// This is a simple example of a client that fetches the state of topics but does not subscribe to them. /// /// This makes use of the <see cref="ITopics"/> feature only. /// </summary> public class ClientUsingFetch { private readonly ISession session; private readonly ITopics topics; public ClientUsingFetch() { session = Diffusion.Sessions.Principal( "client" ).Password( "password" ) .Open( "ws://diffusion.example.com:80" ); topics = session.GetTopicsFeature(); } /// <summary> /// Issues a fetch request for a topic or selection of topics. /// </summary> /// <param name="topicSelector">A <see cref="TopicSelector"/> expression.</param> /// <param name="fetchContext">The context string to be returned with the fetch response(s).</param> /// <param name="stream">The callback for fetch responses.</param> public void Fetch( string topicSelector, string fetchContext, IFetchContextStream<string> stream ) { topics.Fetch( topicSelector, fetchContext, stream ); } /// <summary> /// Close the session. /// </summary> public void Close() { session.Close(); } } }
C
/* * This is a sample client which connects to Diffusion and demonstrates * the following features: * * 1. Fetch topic state using a user-specified topic selector. * 2. Connect to Diffusion with a username and password. * 3. Automatic retry of a connection if unable to connect at the first * attempt. */ #include <stdio.h> #include <unistd.h> #include "diffusion.h" #include "args.h" extern void topic_message_debug(); ARG_OPTS_T arg_opts[] = { ARG_OPTS_HELP, {'u', "url", "Diffusion server URL", ARG_OPTIONAL, ARG_HAS_VALUE, "ws://localhost:8080"}, {'t', "topic_selector", "Topic selector", ARG_REQUIRED, ARG_HAS_VALUE, NULL}, {'r', "retries", "Number of connection retries", ARG_OPTIONAL, ARG_HAS_VALUE, "3"}, {'d', "retry_delay", "Delay (in ms) between connection attempts", ARG_OPTIONAL, ARG_HAS_VALUE, "1000"}, {'p', "principal", "Principal (username) for the connection", ARG_OPTIONAL, ARG_HAS_VALUE, NULL}, {'c', "credentials", "Credentials (password) for the connection", ARG_OPTIONAL, ARG_HAS_VALUE, NULL}, END_OF_ARG_OPTS }; /* * This callback is used when the session state changes, e.g. when a session * moves from a "connecting" to a "connected" state, or from "connected" to * "closed". */ static void on_session_state_changed(SESSION_T *session, const SESSION_STATE_T old_state, const SESSION_STATE_T new_state) { printf("Session state changed from %s (%d) to %s (%d)\n", session_state_as_string(old_state), old_state, session_state_as_string(new_state), new_state); if(new_state == CONNECTED_ACTIVE) { printf("Session ID=%s\n", session_id_to_string(session->id)); } } /* * This callback is invoked when Diffusion acknowledges that it has received * the fetch request. It does not indicate that there will be any subsequent * messages; see on_topic_message() and on_fetch_status_message() for that. */ static int on_fetch(SESSION_T *session, void *context) { puts("Fetch acknowledged by server"); return HANDLER_SUCCESS; } /* * This callback is invoked when all messages for a topic selector have * been received, or there was some kind of server-side error during the * fetch processing. */ static int on_fetch_status_message(SESSION_T *session, const SVC_FETCH_STATUS_RESPONSE_T *status, void *context) { switch(status->status_flag) { case DIFFUSION_TRUE: puts("Fetch succeeded"); break; //exit(0); case DIFFUSION_FALSE: puts("Fetch failed"); break; //exit(1); default: printf("Unknown fetch status: %d\n", status->status_flag); break; } return HANDLER_SUCCESS; } /* * When a fetched message is received, this callback in invoked. */ static int on_topic_message(SESSION_T *session, const TOPIC_MESSAGE_T *msg) { printf("Received message for topic %s\n", msg->name); printf("Payload: %.*s\n", (int)msg->payload->len, msg->payload->data); #ifdef DEBUG topic_message_debug(response->payload); #endif return HANDLER_SUCCESS; } int main(int argc, char **argv) { /* * Standard command-line parsing. */ HASH_T *options = parse_cmdline(argc, argv, arg_opts); if(options == NULL || hash_get(options, "help") != NULL) { show_usage(argc, argv, arg_opts); return EXIT_FAILURE; } char *url = hash_get(options, "url"); char *topic = hash_get(options, "topic_selector"); int retries = atoi(hash_get(options, "retries")); long retry_delay = atol(hash_get(options, "retry_delay")); /* * A SESSION_LISTENER_T holds callbacks to inform the client * about changes to the state. Used here for informational * purposes only. */ SESSION_LISTENER_T foo_listener = { foo_listener.on_state_changed = &on_session_state_changed }; /* * The client-side API can automatically keep retrying to * connect to the Diffusion server if it's not immediately * available. */ RECONNECTION_STRATEGY_T *reconnection_strategy = make_reconnection_strategy_repeating_attempt(retries, retry_delay); /* * Creating a session requires at least a URL. Creating a session * initiates a connection with Diffusion. */ SESSION_T *session; DIFFUSION_ERROR_T error = { 0 }; session = session_create(url, hash_get(options, "principal"), credentials_create_password(hash_get(options, "credentials")), &foo_listener, reconnection_strategy, &error); if(session == NULL) { fprintf(stderr, "TEST: Failed to create session\n"); fprintf(stderr, "ERR : %s\n", error.message); return EXIT_FAILURE; } /* * Register handlers for callbacks we're interested in * relating to the fetch request. In particular, we want to * know about the topic messages that are returned, and the * status message which tells us when all messages have been * received for the selector (or, if something went wrong.) */ FETCH_PARAMS_T params = { .selector = topic, .on_topic_message = on_topic_message, .on_fetch = on_fetch, .on_status_message = on_fetch_status_message }; /* * Issue the fetch request. */ fetch(session, params); /* * Wait for 5 seconds for the results to come in. */ sleep(5); /* * Clean up. */ session_close(session, NULL); session_free(session); return EXIT_SUCCESS; }
Change the URL from that provided in the example to the URL of the Diffusion™ server.
This page last modified: 2017/06/29