Configuring legacy conflation

Use the conflation section of the Server.xml configuration file to define conflation policies for deprecated topics that use the legacy conflation system (stateless, single value and record topics).

Note: The legacy conflation system is only used with deprecated topic types, and will be removed when the deprecated topic types are removed.

Legacy conflation versus new topic properties system

The legacy system to configure conflation policy described below is only used for these deprecated topic types:
  • single value
  • record
  • stateless

Other topic types use a new system based on topic properties, described in Configuring conflation.

If the new system is used to set an "off" policy on a deprecated topic, that policy will be applied and any legacy conflation policy will be ignored.

If the new system is used to set an "unsubscribe" policy on a deprecated topic, the legacy policy is evaluated for each message, and the "unsubscribe" policy is applied when the message queue is full.

If the new system is used to set an "always" or "conflate" policy, the legacy conflation policy will be applied and the new system will not otherwise be used.
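The precedence rules above can be summarised as a small decision table. The following is a minimal sketch in plain Java. The class, enum, and method names are hypothetical and are not part of the Diffusion API; they only restate the three rules described above.

```java
/** Hypothetical model of the precedence rules; not part of the Diffusion API. */
final class LegacyPrecedenceSketch {

    /** Policy values that the new topic-properties system can set on a topic. */
    enum NewPolicy { OFF, UNSUBSCRIBE, ALWAYS, CONFLATE }

    /** Returns true if the legacy conflation policy is still applied. */
    static boolean legacyPolicyApplies(NewPolicy policy) {
        // Only "off" causes the legacy policy to be ignored.
        return policy != NewPolicy.OFF;
    }

    /** Returns true if the "unsubscribe" action is taken when the queue is full. */
    static boolean unsubscribesWhenQueueFull(NewPolicy policy) {
        // The legacy policy is still evaluated for each message in this case.
        return policy == NewPolicy.UNSUBSCRIBE;
    }
}
```

For example, legacyPolicyApplies(NewPolicy.UNSUBSCRIBE) and unsubscribesWhenQueueFull(NewPolicy.UNSUBSCRIBE) are both true, which reflects that the two systems work together for that setting.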

Note: Legacy conflation policies can also be configured programmatically using the addPolicy methods on ConflationConfig. Such policies can also be added dynamically after the Diffusion™ server has started.

Legacy conflation policies

You can configure one or more conflation policies by using conflation-policy elements, each defining a different conflation mechanism. A conflation policy comprises the following properties:

Table 1. Conflation policy elements
Property Description
name A unique name by which the policy is referred to.
mode Indicates whether the new (or merged) message replaces the current message in place, or whether the current message is removed and the new one is appended to the end of the queue.

matcher A Java™ class which matches two messages and is used to locate an existing queued message as a candidate for conflation.

If no matcher is specified then default matching finds a message that is of the same topic.

merger A Java class which performs the merge of two messages of the same topic to produce a new message containing the data from both messages (or any resulting data required).

If no merger is specified, no merging takes place and the current message is removed from the queue and the new message either replaces it or is appended to the queue depending upon the mode.

The merger can also indicate that either the current or new message is to be used or even that no conflation takes place in this instance.

Having defined one or more conflation policies, you can map topics to them. This is done by specifying a topic path or topic selector which maps to a particular conflation policy.

Conflation policies can be added or removed at runtime and the removal of a conflation policy automatically removes any mappings to it.

Conflation policy mode

The conflation policy mode determines whether the new (or merged) message is to replace the existing message in the client queue or be appended to the end of the client queue.

Available modes are:

Table 2. Conflation policy modes
Mode Definition
REPLACE The new (or merged) message will replace the existing message at its current position in the client queue.
APPEND The current message is removed from the client queue and the new (or merged) message is appended to the end of the queue.

If no mode is specified, REPLACE is assumed.

The mode is specified in the mode property of a conflation-policy section in etc/Server.xml.

When defining conflation policies programmatically the mode is specified when creating the policy.
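To make the difference between the two modes concrete, the following sketch models a client queue as a list of strings and applies each mode to it. It is an illustration only: ConflationModeSketch, Mode, and conflate are hypothetical names, and the real server queue is not a List<String>.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of the two policy modes; not part of the Diffusion API. */
final class ConflationModeSketch {

    enum Mode { REPLACE, APPEND }

    /**
     * Returns a copy of the queue after conflating the message at matchedIndex
     * with the new (or merged) message, according to the mode.
     */
    static List<String> conflate(List<String> queue, int matchedIndex,
                                 String message, Mode mode) {
        final List<String> result = new ArrayList<>(queue);
        if (mode == Mode.APPEND) {
            // Remove the current message and add the new one at the tail.
            result.remove(matchedIndex);
            result.add(message);
        } else {
            // REPLACE: overwrite the current message at its existing position.
            result.set(matchedIndex, message);
        }
        return result;
    }
}
```

With a queue of A1, B1, C1 and a new message A2 that matches A1, REPLACE yields A2, B1, C1 and APPEND yields B1, C1, A2. REPLACE keeps the update at its original place in the delivery order, whereas APPEND moves it behind everything already queued.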

Message matchers

A conflation policy uses a message matcher when a new message is queued for a client that has conflation enabled and the topic of the message is mapped to that policy. The matcher locates the last message queued for the client that is a candidate for conflation.

If no message matcher is explicitly defined for a conflation policy, a default matcher is used which locates a message of the same topic.

Supply a message matcher if matching must depend on the content of the messages.
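The search for a candidate can be pictured as a scan from the tail of the client queue. The sketch below assumes that the scan stops at the first queued message of the same topic that the matcher accepts; that detail, along with the names MatcherSearchSketch, Msg, and findCandidate, is an assumption made for illustration and is not taken from the Diffusion API.

```java
import java.util.List;
import java.util.function.BiPredicate;

/** Hypothetical model of candidate lookup; not part of the Diffusion API. */
final class MatcherSearchSketch {

    /** Minimal stand-in for a queued message: a topic plus one content key. */
    static final class Msg {
        final String topic;
        final String key;

        Msg(String topic, String key) {
            this.topic = topic;
            this.key = key;
        }
    }

    /**
     * Returns the index of the last queued message that has the same topic as
     * the incoming message and is accepted by the matcher, or -1 if none is.
     */
    static int findCandidate(List<Msg> queue, Msg incoming,
                             BiPredicate<Msg, Msg> matcher) {
        for (int i = queue.size() - 1; i >= 0; i--) {
            final Msg queued = queue.get(i);
            // The matcher is only consulted for messages of the same topic.
            if (queued.topic.equals(incoming.topic)
                    && matcher.test(queued, incoming)) {
                return i;
            }
        }
        return -1;
    }
}
```

A matcher of (queued, incoming) -> true behaves like the default and selects the most recent message of the same topic. A matcher that compares the key fields selects only a queued message with matching content.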

To implement a message matcher, write a Java class that implements com.pushtechnology.diffusion.api.conflation.MessageMatcher. This interface has a single method, matches, which is passed the queued message being tested and the new message to be queued.
Note: The existing message is always of the same topic as the new message, so you do not need to check for this.

An example of a MessageMatcher implementation is shown below:

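import com.pushtechnology.diffusion.api.conflation.MessageMatcher;
import com.pushtechnology.diffusion.api.message.TopicMessage;

public class ExampleMessageMatcher implements MessageMatcher {
    @Override
    public boolean matches(TopicMessage currentMessage, TopicMessage newMessage) {
        // Treat the messages as a match when their next fields are equal.
        return currentMessage.nextField().equals(newMessage.nextField());
    }
}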

MessageMatcher implementations must be thread safe and stateless. The same MessageMatcher instance can be supplied to more than one conflation policy if required.

Message mergers

A message merger can be specified on a conflation policy if the action of the policy is to merge the content of an existing queued message with the new message being queued. This technique can be used when message data comprises more than one data item and it is desirable to reduce the number of messages sent to the client whilst preserving the data from all messages.

If no message merger is specified for a conflation policy, the policy replaces the current message with the new.

To implement a message merger, write a Java class that implements com.pushtechnology.diffusion.api.conflation.MessageMerger. This interface has a single method, merge, which is passed the current message in the queue and the new message to be queued.
Note: The existing message is always of the same topic as the new message, so you do not need to check for this.

The action of conflation depends upon the message that is returned from the merge method, as follows:

Table 3. Action depending upon merge result
Returned message Action
A new message The returned message is assumed to represent a merge of the data from the two input messages. Depending upon the policy mode, it either replaces the current message in the queue, or the current message is removed and the returned message is added to the end of the queue.
The current message The current message is retained at its current queue position and the new message is not queued.
The new message The new message either replaces the current message in the queue or the current message is removed and the new message appended to the end of the queue depending upon the policy mode.

This is effectively the same as the result that occurs if there is no merger.

Null No conflation occurs. The current message remains where it is in the queue and the new message is appended to the end of the queue.
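The four outcomes in Table 3 can be traced with a small simulation. This is a sketch under stated assumptions: the queue is modelled as a list of strings, the merger as a java.util.function.BinaryOperator, and "the current message" or "the new message" is recognised by reference identity. The names MergeOutcomeSketch and enqueue are hypothetical and are not part of the Diffusion API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BinaryOperator;

/** Hypothetical model of Table 3; not part of the Diffusion API. */
final class MergeOutcomeSketch {

    enum Mode { REPLACE, APPEND }

    /** Returns a copy of the queue after applying the merger's result. */
    static List<String> enqueue(List<String> queue, int currentIndex,
                                String newMessage,
                                BinaryOperator<String> merger, Mode mode) {
        final List<String> result = new ArrayList<>(queue);
        final String current = result.get(currentIndex);
        final String merged = merger.apply(current, newMessage);

        if (merged == null) {
            // Null: no conflation, the new message is simply appended.
            result.add(newMessage);
        } else if (merged == current) {
            // Current message returned: keep it in place, drop the new one.
            return result;
        } else if (mode == Mode.REPLACE) {
            // New or merged message replaces the current one in place.
            result.set(currentIndex, merged);
        } else {
            // APPEND: remove the current message, add the result at the tail.
            result.remove(currentIndex);
            result.add(merged);
        }
        return result;
    }
}
```

Given a queue of a=1, b=1 and a new message a=2 that matches a=1, a merger that returns null produces a=1, b=1, a=2, while a merger that returns the new message in REPLACE mode produces a=2, b=1.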

An example of a message merger implementation is shown below:

package com.pushtechnology.diffusion.examples;

import com.pushtechnology.diffusion.api.message.MessageReader;
import com.pushtechnology.diffusion.api.message.Record;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.pushtechnology.diffusion.api.APIException;
import com.pushtechnology.diffusion.api.conflation.MessageMerger;
import com.pushtechnology.diffusion.api.message.TopicMessage;
import com.pushtechnology.diffusion.api.publisher.Publishers;

import java.util.ArrayList;
import java.util.List;

public final class MessageMergerExample implements MessageMerger {
    private static final Logger LOG =
            LoggerFactory.getLogger(MessageMergerExample.class);

    @Override
    public TopicMessage merge(TopicMessage currentMessage, TopicMessage newMessage) throws APIException {

        final MessageReader currentMessageReader = currentMessage.getReader();
        final MessageReader newMessageReader = newMessage.getReader();

        final TopicMessage result = Publishers.createDeltaMessage(currentMessage.getTopicName());

        Record cRecord = currentMessageReader.nextRecord();
        Record nRecord = newMessageReader.nextRecord();

        // Walk both messages record by record, driven by the new message.
        while (nRecord != null) {
            final List<String> mergedRecord = new ArrayList<>(nRecord.size());
            for (int i = 0; i < nRecord.size(); i++) {
                final String nField = nRecord.getField(i);
                // A non-empty new field wins. An empty new field falls back to
                // the current field at the same position, if one exists.
                if (!nField.isEmpty() || cRecord == null || i >= cRecord.size()) {
                    mergedRecord.add(nField);
                }
                else {
                    mergedRecord.add(cRecord.getField(i));
                }
            }

            result.putRecord(mergedRecord);

            nRecord = newMessageReader.nextRecord();
            cRecord = currentMessageReader.nextRecord();
        }

        if (LOG.isTraceEnabled()) {
            LOG.trace("MessageMerger merging - currentMessage: {}, newMessage: {}, merged: {}",
                    currentMessage.asRecords(), newMessage.asRecords(), result.asRecords());
        }

        return result;
    }
}

The above example merges delta messages for record topics with variable records and fields.

MessageMerger implementations must be thread safe and stateless. The same MessageMerger instance can be supplied to more than one conflation policy if required.

Default conflation policy

You can specify a default conflation policy that is used for any topics that do not have explicit policy mappings.

Use a default conflation policy only if you want to apply conflation to all topics when conflation is enabled for a client.

This can be specified using the default-conflation-policy property in the conflation section of etc/Server.xml. Alternatively it can be set programmatically at any time using the setDefaultPolicy method on ConflationConfig.

If no default policy is set, conflation will not occur for topics that have no explicit mappings even when conflation is enabled.

Mapping topics to policies

Having defined one or more conflation policies, you can map topics to the conflation policies that are to be used for them.

Use a topic selector pattern when mapping to a conflation policy.

Because topic selectors can overlap, more than one mapping can apply to the same topic. In that case, the last mapping defined that matches the topic is the one used to conflate messages of that topic.

You can specify a default conflation policy, which is used if no mapping matches a topic.

Messages for topics that have no mappings (when there is no default policy) are not conflated, even if conflation is enabled for a client.
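The resolution rules above (last matching mapping wins, then the default policy, otherwise no conflation) can be sketched as follows. Regular expressions from java.util.regex stand in for topic selectors here purely for illustration, and PolicyMappingSketch with its methods is a hypothetical class, not the ConflationConfig API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.regex.Pattern;

/** Hypothetical model of mapping resolution; not part of the Diffusion API. */
final class PolicyMappingSketch {

    private final List<Pattern> patterns = new ArrayList<>();
    private final List<String> policies = new ArrayList<>();
    private String defaultPolicy; // null means no default policy is set

    /** Adds a mapping; a regular expression stands in for a topic selector. */
    void map(String topicRegex, String policyName) {
        patterns.add(Pattern.compile(topicRegex));
        policies.add(policyName);
    }

    void setDefaultPolicy(String policyName) {
        defaultPolicy = policyName;
    }

    /** Returns the policy for a topic, or empty if it is not conflated. */
    Optional<String> resolve(String topic) {
        // Scan from the most recently defined mapping so the last match wins.
        for (int i = patterns.size() - 1; i >= 0; i--) {
            if (patterns.get(i).matcher(topic).matches()) {
                return Optional.of(policies.get(i));
            }
        }
        // No mapping matched: fall back to the default policy, if any.
        return Optional.ofNullable(defaultPolicy);
    }
}
```

If prices/.* is mapped to a policy named Fast and prices/fx/.* is then mapped to a policy named Merge, the topic prices/fx/EURUSD resolves to Merge because that mapping was defined last, while prices/equities/XYZ resolves to Fast. A topic such as news/today resolves to nothing until a default policy is set.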

Conflation mappings can be defined using topic-conflation elements within the conflation section of the etc/Server.xml configuration file.

Conflation mappings can also be set programmatically using the setTopicPolicy method of ConflationConfig. Mappings can be set at any time while the server is running, and can be removed at any time using unsetTopicPolicy.

Note: Removing a conflation policy at runtime will automatically remove any mappings to it.

Enabling conflation

Enable conflation for a queue definition by setting the conflates property of its queue-definition to true. This queue definition can then be used wherever required, for example by connectors that are to have conflation enabled for all clients.