Fetching the current value of a topic
A client can send a fetch request for the values and/or topic specifications of a set of topics. The result set can be filtered by topic selector and topic path range.
Required permissions:
and permissions for the specified topicsA fetch request enables you to retrieve values and/or topic specifications for a set of topics without subscribing to the topics.
In Diffusion™ 6.2, there is a new enhanced fetch API, and the old fetch API is deprecated. This documentation describes the new API.
Results can be filtered by topic selector and topic path range. A request can specify the maximum number of results to return, avoiding the inefficient transfer of very large result sets.
A request can specify the value type of topics to be returned, in which case only topics of types compatible with the given value type will be returned. Returned values will be typed accordingly, avoiding the need for data conversion.
Here is how to fetch the value of a topic or set of topics by using a topic selector to make a fetch request:
session = Diffusion.sessions().principal("client").password("password") .open(serverUrl); topics = session.feature(Topics.class); FetchResult<String> result = topics.fetchRequest() .withValues(String.class) .fetch("*.*").get(5, SECONDS);
static int on_fetch_result(const DIFFUSION_FETCH_RESULT_T *fetch_result, void *context) { char *result; LIST_T *results = diffusion_fetch_result_get_topic_results(fetch_result); DIFFUSION_TOPIC_RESULT_T *topic_result = list_get_data_indexed(results, 0); DIFFUSION_VALUE_T *value = diffusion_topic_result_get_value(topic_result); read_diffusion_string_value(value, &result, NULL); printf("Fetch Result: %s\n", result); free(result); return HANDLER_SUCCESS; }
var session = Diffusion.Sessions .Principal( "client" ) .Password( "password" ) .Open( serverUrl ); var topics = session.Topics; var result = await topics.FetchRequest .WithValues<string>() .FetchAsync( "*.*" );
To get the result set and print the results:
List<TopicResult<Void>> results = result.results(); results.forEach(t -> { System.out.println(t.type() + " : " + t.path()); });
DIFFUSION_FETCH_REQUEST_T *fetch_request = diffusion_fetch_request_init(session); DIFFUSION_DATATYPE dt = DATATYPE_STRING; SET_T *topic_types = set_new_int(1); TOPIC_TYPE_T topic_type_string = TOPIC_TYPE_STRING; diffusion_fetch_request_topic_types(fetch_request, topic_types, NULL); diffusion_fetch_request_with_values(fetch_request, &dt, NULL); diffusion_fetch_request_from(fetch_request, "test-fetch-query", NULL); diffusion_fetch_request_to(fetch_request, "test-fetch-query", NULL); diffusion_fetch_request_first(fetch_request, 1, NULL); diffusion_fetch_request_maximum_result_size(fetch_request, 100, NULL); DIFFUSION_FETCH_REQUEST_PARAMS_T params = { .topic_selector = ">test-fetch-query", .fetch_request = fetch_request, .on_fetch_result = on_fetch_result }; diffusion_fetch_request_fetch(session, params);
foreach ( var item in result.Results ) { Console.WriteLine( $"{item.Type} : {item.Path}" ); }
Fetching topic specifications
You can return topic specifications instead of values for each topic selected.
FetchResult<Void> result = topics.fetchRequest() .withProperties() .fetch("*Accounts/").get(5, SECONDS); TopicResult<Void> topicResult = result.get(0); Map<String, String> properties = topicResult.specification().getProperties();
static int on_fetch_result(const DIFFUSION_FETCH_RESULT_T *fetch_result, void *context) { LIST_T *results = diffusion_fetch_result_get_topic_results(fetch_result); DIFFUSION_TOPIC_RESULT_T *topic_result = list_get_data_indexed(results, 0); TOPIC_SPECIFICATION_T *spec = diffusion_topic_result_get_specification(topic_result); list_free(results, (void (*)(void *))diffusion_topic_result_free); return HANDLER_SUCCESS; } ... DIFFUSION_FETCH_REQUEST_T *fetch_request = diffusion_fetch_request_init(session); diffusion_fetch_request_with_properties(fetch_request, NULL); DIFFUSION_FETCH_REQUEST_PARAMS_T params = { .topic_selector = "*Accounts/", .fetch_request = fetch_request, .on_fetch_result = on_fetch_result }; diffusion_fetch_request_fetch(session, params);
var result = await topics.FetchRequest .WithProperties() .FetchAsync( "*Accounts/" ); var topicResult = result.Results.First(); var properties = topicResult.Specification.Properties;
Filtering by topic type
The results can also be restricted to topics of a particular topic type or types:FetchResult<Void> result = topics.fetchRequest() .topicTypes(EnumSet.of(TopicType.STRING, TopicType.INT64)) .fetch("*Accounts/").get(5, SECONDS);
static int on_fetch_result(const DIFFUSION_FETCH_RESULT_T *fetch_result, void *context) { LIST_T *results = diffusion_fetch_result_get_topic_results(fetch_result); DIFFUSION_TOPIC_RESULT_T *topic_result = list_get_data_indexed(results, 0); TOPIC_SPECIFICATION_T *spec = diffusion_topic_result_get_specification(topic_result); list_free(results, (void (*)(void *))diffusion_topic_result_free); return HANDLER_SUCCESS; } ... DIFFUSION_FETCH_REQUEST_T *fetch_request = diffusion_fetch_request_init(session); diffusion_fetch_request_with_properties(fetch_request, NULL); DIFFUSION_FETCH_REQUEST_PARAMS_T params = { .topic_selector = "*Accounts/", .fetch_request = fetch_request, .on_fetch_result = on_fetch_result }; diffusion_fetch_request_fetch(session, params);
var result = await topics.FetchRequest .TopicTypes( new[] { TopicType.STRING, TopicType.INT64 } ) .FetchAsync( "*Accounts/" );
Restricting the results to a range of topics
You can restrict the returned results to within a specified range of topics. All the topics within the selection that have a path that is lexically within the specified range will be returned, at all levels.
You can specify either a start point or an end point or both. For example, if you specify a start point but no end point, results will be returned from the start point up to the end of the topic tree.
The specified start and end points do not need to represent topics that actually exist.
FetchResult<Bytes> result = topics.fetchRequest() .withValues(Bytes.class) .from("Accounts/Dept05") .to{"Accounts/Dept10") .fetch("*Accounts/").get(5, SECONDS);
static int on_fetch_result(const DIFFUSION_FETCH_RESULT_T *fetch_result, void *context) { LIST_T *results = diffusion_fetch_result_get_topic_results(fetch_result); return HANDLER_SUCCESS; } ... DIFFUSION_FETCH_REQUEST_T *fetch_request = diffusion_fetch_request_init(session); diffusion_fetch_request_with_values(fetch_request, NULL, NULL); diffusion_fetch_request_from(fetch_request, "Accounts/Dept05", NULL); diffusion_fetch_request_to(fetch_request, "Accounts/Dept10", NULL); DIFFUSION_FETCH_REQUEST_PARAMS_T params = { .topic_selector = "*Accounts/", .fetch_request = fetch_request, .on_fetch_result = on_fetch_result }; diffusion_fetch_request_fetch(session, params);
var result = await topics.FetchRequest .WithValues<IBytes>() .From( "Accounts/Dept05" ) .To( "Accounts/Dept10" ) .FetchAsync( "*Accounts/" );
Paging through topics
You can specify a non-inclusive range using the after and before methods. You can limit the number of results and check if there are further results remaining.
By combining these, you can page through a topic tree. This can be useful when there is a large number of topics and you wish to access it in manageable chunks, for example when presenting results from a large set into a limited window in a user interface.
Here is an example of paging through all string topics in a topic tree, in chunks of 20:
FetchRequest request = topics.fetchRequest() .withValues(String.class) .first(20); FetchResult<String> result = request.fetch("*.*").get(5, SECONDS); if (result.hasMore()) { result = request.after(result.results.get(19).path()).fetch("*.*"); }
DIFFUSION_FETCH_REQUEST_T *fetch_request = NULL; static int on_fetch_result(const DIFFUSION_FETCH_RESULT_T *fetch_result, void *context) { if(diffusion_fetch_result_has_more(fetch_result)) { LIST_T *results = diffusion_fetch_result_get_topic_results(fetch_result); DIFFUSION_TOPIC_RESULT_T *topic_result = list_get_data_indexed(results, 19); diffusion_fetch_request_after(fetch_request, diffusion_topic_result_get_path(topic_result), NULL); DIFFUSION_FETCH_REQUEST_PARAMS_T params = { .topic_selector = "*.*", .fetch_request = fetch_request, .on_fetch_result = on_fetch_result }; diffusion_fetch_request_fetch(session, params); list_free(results, (void (*)(void *))diffusion_topic_result_free); } return HANDLER_SUCCESS; } ... fetch_request = diffusion_fetch_request_init(session); DIFFUSION_DATATYPE dt = DATATYPE_STRING; diffusion_fetch_request_with_values(fetch_request, &dt, NULL); diffusion_fetch_request_first(fetch_request, 20, NULL); DIFFUSION_FETCH_REQUEST_PARAMS_T params = { .topic_selector = "*.*", .fetch_request = fetch_request, .on_fetch_result = on_fetch_result }; diffusion_fetch_request_fetch(session, params);
var request = topics.FetchRequest .WithValues<string>() .First( 20 ); var result = await request.FetchAsync( "*.*" ); if ( result.HasMore ) { result = await request .After( result.Results.Last().Path ) .FetchAsync( "*.*" ); }
This page last modified: 2019/01/07