by Horatiu Dan
A lot of companies nowadays use event-driven architectures in their day-to-day business activities, especially when they want their applications to achieve real-time or near real-time reactiveness.
In such a scenario, during the interactions among the three main types of actors – producers, the message broker, and consumers – a lot of messages are exchanged. Nevertheless, under certain circumstances, some of these messages might not be of interest and thus they are discarded and ignored.
This article analyzes in detail how a consumer application should be configured so that it behaves correctly when it needs to filter out “irrelevant” messages. First, a standard record filter strategy is configured at the consumer level. Then, a custom deserialization mechanism is added and the analysis is refined. As stated, the intention is to preserve the correct behavior of the consumer.
Set-up
- Java 21
- Maven 3.9.2
- Spring Boot – version 3.2.2
- Redpanda message broker running in Docker – image version 23.2.15
As message broker, the great and lightweight Redpanda is chosen. Since it is fully Kafka-compatible, neither the development nor the configuration needs to be modified at all if it is later replaced with a different Kafka-compatible broker. [Resource 1] describes how to accomplish the Redpanda minimal set-up.
Once the Docker container is up and running, a topic called request is created with the following command:
>docker exec -it redpanda-0 rpk topic create request
TOPIC    STATUS
request  OK
>docker exec -it redpanda-0 rpk cluster info
CLUSTER
=======
redpanda.581f9a24-3402-4a17-af28-63353a602421

BROKERS
=======
ID    HOST        PORT
0*    redpanda-0  9092

TOPICS
======
NAME                PARTITIONS  REPLICAS
__consumer_offsets  3           1
_schemas            1           1
request             1           1
As shown, the request topic was created successfully.
Implement a Record Filter Strategy
The use case is the following:
- the producer sends a request to the configured topic
- if the request message fulfills the acceptance criteria, the consumer processes it
- otherwise, the message is discarded
A request message has a simple form:
{ "id": "34b25c6b-60d6-4e53-8f79-bdcdd17b3a2d", "contextId": "hcd" }
having just two fields, an identifier and a context identifier.
Messages are taken into account only in a certain acceptable context. Differently put, a message is accepted if its contextId is equal to the one configured on the consumer side; otherwise, it is discarded.
A request is modelled by the following record:
public record Request(String id, String contextId) { }
For configuring a producer and a consumer, at least these properties are needed (application.properties file):
# the path to the message broker
broker.url = localhost:19092

# the name of the topic
topic.request = request

# the unique string that identifies the consumer group of the consumer
context.id = hcd
The requirement is clear – only the messages having hcd as contextId are accepted.
In order to send messages, a producer needs a KafkaTemplate instance, configured as below:
@Configuration
public class KafkaConfig {

    @Value("${broker.url}")
    private String brokerUrl;

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        ProducerFactory<String, String> producerFactory = new DefaultKafkaProducerFactory<>(props);
        return new KafkaTemplate<>(producerFactory);
    }
}
One may observe in the producer configuration that a StringSerializer was chosen for marshalling the payload value. Usually, a JsonSerializer provides more robustness to the producer-consumer contract. Nevertheless, the choice here was intentional, to increase the experimental flexibility.
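For comparison, a JsonSerializer-based producer could look roughly like the sketch below. This is a minimal variant and not part of this article's code base; it assumes Spring Kafka's JsonSerializer and reuses the injected brokerUrl property:

@Bean
public KafkaTemplate<String, Request> jsonKafkaTemplate() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    // JsonSerializer marshals the Request record to JSON, tying the contract to the type
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

    return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(props));
}

With such a template, jsonKafkaTemplate.send(topic, new Request(id, contextId)) would replace the hand-written JSON strings used in the tests later on; the StringSerializer is kept in this article precisely because it allows producing arbitrary, even malformed, payloads.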
Once the messages reach the request topic, a consumer is configured to pick them up.
@Configuration
@EnableKafka
public class KafkaConfig {

    @Value("${broker.url}")
    private String brokerUrl;

    @Value("${context.id}")
    private String groupId;

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Request> kafkaListenerContainerFactory(
            CustomRecordFilterStrategy recordFilterStrategy) {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(JsonDeserializer.TRUSTED_PACKAGES, Request.class.getPackageName());
        props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, Request.class.getName());

        DefaultKafkaConsumerFactory<String, Request> defaultFactory = new DefaultKafkaConsumerFactory<>(props,
                new StringDeserializer(), new JsonDeserializer<>(Request.class));

        ConcurrentKafkaListenerContainerFactory<String, Request> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(defaultFactory);
        factory.setRecordFilterStrategy(recordFilterStrategy);
        factory.setCommonErrorHandler(new DefaultErrorHandler());
        return factory;
    }
}
The listing above allows injecting a record filtering strategy (the recordFilterStrategy parameter), which is exactly the purpose here – a means to decide whether a message is filtered out or not.
The RecordFilterStrategy interface has one abstract method,

boolean filter(ConsumerRecord<K, V> consumerRecord);

which, according to its JavaDoc, returns true if the ConsumerRecord should be discarded (K represents the message key, while V the message value).
In the case of this proof of concept, all messages that have their contextId equal to hcd are accepted and consumed, while the rest are filtered out. The implementation is below.
@Component
public class CustomRecordFilterStrategy implements RecordFilterStrategy<String, Request> {

    private static final Logger LOG = LoggerFactory.getLogger(CustomRecordFilterStrategy.class);

    @Value("${context.id}")
    private String contextId;

    @Override
    public boolean filter(ConsumerRecord<String, Request> consumerRecord) {
        Request request = consumerRecord.value();

        boolean discard = !contextId.equals(request.contextId());
        LOG.info("{} is{} compliant.", request, discard ? "n't" : "");
        return discard;
    }
}
As part of the configuration, the KafkaListenerContainerFactory interface is responsible for creating the listener container of a particular endpoint. The @EnableKafka annotation on the configuration class enables the detection of @KafkaListener annotations on any Spring-managed beans in the container. Thus, the actual listener (the message consumer) is developed next.
@Component
public class RequestMessageListener {

    private static final Logger LOG = LoggerFactory.getLogger(RequestMessageListener.class);

    private final ResponseService responseService;

    public RequestMessageListener(ResponseService responseService) {
        this.responseService = responseService;
    }

    @KafkaListener(topics = "${topic.request}", groupId = "${context.id}")
    public void onMessage(@Payload Request request) {
        LOG.info("Processing {}.", request);
        responseService.send(Response.success());
    }
}
Its functionality is trivial: it logs the messages read from the request topic and destined to the configured consumer group. Then, it invokes a ResponseService which acts as the entity that sends a message back (here, it only logs it).
@Service
public class ResponseService {

    private static final Logger LOG = LoggerFactory.getLogger(ResponseService.class);

    public void send(Response response) {
        LOG.info("Sending {}.", response);
    }
}
A Response is modeled simply, as below:
public record Response(String id, Result result) {

    public static Response success() {
        return new Response(UUID.randomUUID().toString(), Result.SUCCESS);
    }

    public static Response failure() {
        return new Response(UUID.randomUUID().toString(), Result.FAILURE);
    }

    public enum Result {
        SUCCESS, FAILURE
    }
}
When the application is started, provided the message broker is up, the listener is ready to receive messages.
INFO 20080 --- [main] c.h.r.RecordFilterStrategyApplication : Started RecordFilterStrategyApplication in 1.282 seconds (process running for 1.868)
INFO 20080 --- [main] fkaConsumerFactory$ExtendedKafkaConsumer : [Consumer clientId=consumer-hcd-1, groupId=hcd] Subscribed to topic(s): request
INFO 20080 --- [ntainer#0-0-C-1] org.apache.kafka.clients.Metadata : [Consumer clientId=consumer-hcd-1, groupId=hcd] Cluster ID: redpanda.581f9a24-3402-4a17-af28-63353a602421
INFO 20080 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-hcd-1, groupId=hcd] Request joining group due to: need to re-join with the given member-id: consumer-hcd-1-1fa7cd25-2bf2-49bd-82fd-ac3e4c54cafc
INFO 20080 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-hcd-1, groupId=hcd] Successfully joined group with generation Generation{generationId=7, memberId='consumer-hcd-1-1fa7cd25-2bf2-49bd-82fd-ac3e4c54cafc', protocol='range'}
In order to check the integration, the following two tests are used. Since a Request is expected by the listener, a compliance template was created for convenience.
@SpringBootTest
class RecordFilterStrategyApplicationTests {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Value("${topic.request}")
    private String topic;

    @Value("${context.id}")
    private String contextId;

    private static final String template = """
            {
                "id": "%s",
                "contextId": "%s"
            }""";

    @Test
    void compliant() {
        kafkaTemplate.send(topic, String.format(template, UUID.randomUUID(), contextId));
    }

    @Test
    void notCompliant() {
        kafkaTemplate.send(topic, String.format(template, UUID.randomUUID(), "other context"));
    }
}
compliant() sends a message whose contextId is as this consumer has configured it. As expected, it is processed and a response is sent back.
INFO 20080 --- [ntainer#0-0-C-1] c.h.r.l.CustomRecordFilterStrategy : Request[id=44a2644c-0025-4a38-bd36-f163a530725e, contextId=hcd] is compliant.
INFO 20080 --- [ntainer#0-0-C-1] c.h.r.listener.RequestMessageListener : Processing Request[id=44a2644c-0025-4a38-bd36-f163a530725e, contextId=hcd].
INFO 20080 --- [ntainer#0-0-C-1] c.h.r.listener.ResponseService : Sending Response[id=ebe0f65c-eddf-4866-b71f-e6cd766dd499, result=SUCCESS].
notCompliant() sends a message whose contextId is different from what was configured on this consumer. Thus, the message is neither processed nor responded to, but ignored.
INFO 20080 --- [ntainer#0-0-C-1] c.h.r.l.CustomRecordFilterStrategy : Request[id=ed22f60c-b13d-4315-8132-46aa83ddf33b, contextId=other context] isn't compliant.
So far, the proof of concept has exemplified how to configure the consumer with a record filtering strategy so that only certain messages are accepted.
The code for this part is here – 1-filter-strategy
Implement a Record Filter Strategy with Custom Deserialization
Let’s assume that the messages consumed from the request topic are unmarshalled using a custom deserializer and that the filtering is still required.
The custom deserializer here is trivial and has a didactic purpose. Moreover, in case the id field is missing, a runtime RequestDeserializationException is thrown. Such an action is not necessarily needed at this point, but it was put here to outline a certain use case. Read on.
public class CustomRequestDeserializer extends StdDeserializer<Request> {

    private static final Logger LOG = LoggerFactory.getLogger(CustomRequestDeserializer.class);

    public CustomRequestDeserializer() {
        super(Request.class);
    }

    @Override
    public Request deserialize(JsonParser jsonParser, DeserializationContext deserializationContext) throws IOException {
        ObjectCodec oc = jsonParser.getCodec();
        JsonNode root = oc.readTree(jsonParser);

        final String contextId = deserializeField(root, "contextId");
        final String id = deserializeField(root, "id");
        if (id == null || id.isEmpty()) {
            throw new RequestDeserializationException("'id' is required");
        }

        Request request = new Request(id, contextId);
        LOG.info("Successfully deserialized {}", request);
        return request;
    }

    // helper invoked above - reads a text field from the JSON tree, or returns null if absent
    private static String deserializeField(JsonNode root, String fieldName) {
        JsonNode node = root.get(fieldName);
        return node != null ? node.asText() : null;
    }
}
To apply it, the Request record is annotated as below:
@JsonDeserialize(using = CustomRequestDeserializer.class)
public record Request(String id, String contextId) { }
Up until now, the behavior described in the first part is preserved. If the previous compliant() and notCompliant() tests are run again, the outcome is the same.
The next analyzed situation is the one in which a RequestDeserializationException is thrown when deserializing an incoming message. Let’s assume the id is empty, thus the form is as below:
{ "id": "", "contextId": "hcd" }
@Test
void deserializationError_compliant() {
    kafkaTemplate.send(topic, String.format(template, "", contextId));
}
When such a message is received, the outcome is the following:
...
Caused by: com.hcd.recordfilterstrategy.domain.deserialization.RequestDeserializationException: 'id' is required
...
An exception thrown at deserialization time causes the message to be neither consumed nor responded to, but lost.
See [Resource 3] for a detailed analysis on situations like this.
One solution that allows recovering after deserialization exceptions is to configure the value deserializer of the KafkaListenerContainerFactory with a failed deserialization function – see the ErrorHandlingDeserializer set-up in the listing below:
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Request> kafkaListenerContainerFactory(
        CustomRecordFilterStrategy recordFilterStrategy,
        FailedRequestDeserializationFunction failedDeserializationFunction) {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    props.put(JsonDeserializer.TRUSTED_PACKAGES, Request.class.getPackageName());
    props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, Request.class.getName());

    JsonDeserializer<Request> jsonDeserializer = new JsonDeserializer<>(Request.class);
    ErrorHandlingDeserializer<Request> valueDeserializer = new ErrorHandlingDeserializer<>(jsonDeserializer);
    valueDeserializer.setFailedDeserializationFunction(failedDeserializationFunction);

    DefaultKafkaConsumerFactory<String, Request> defaultFactory = new DefaultKafkaConsumerFactory<>(props,
            new StringDeserializer(), valueDeserializer);

    ConcurrentKafkaListenerContainerFactory<String, Request> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(defaultFactory);
    factory.setRecordFilterStrategy(recordFilterStrategy);
    factory.setCommonErrorHandler(new DefaultErrorHandler());
    return factory;
}
The purpose of this component is to allow recovering from such an exceptional situation and to send a failure response back.
@Component
public class FailedRequestDeserializationFunction implements Function<FailedDeserializationInfo, Request> {

    private static final Logger LOG = LoggerFactory.getLogger(FailedRequestDeserializationFunction.class);

    private final ResponseService responseService;

    public FailedRequestDeserializationFunction(ResponseService responseService) {
        this.responseService = responseService;
    }

    @Override
    public Request apply(FailedDeserializationInfo failedDeserializationInfo) {
        final Exception ex = failedDeserializationInfo.getException();

        if (ex instanceof RequestDeserializationException deserializationEx) {
            LOG.info("Error deserializing request - {}", deserializationEx.getMessage());
            responseService.send(Response.failure());
        } else {
            LOG.error("Unexpected error deserializing request.", ex);
        }

        return null;
    }
}
If the same test is run again and a compliant, but incorrect message is sent, the behavior changes.
2024-03-13T10:52:38.893+02:00 INFO 24232 --- [ntainer#0-0-C-1] d.d.FailedRequestDeserializationFunction : Error deserializing request - 'id' is required
2024-03-13T10:52:38.895+02:00 INFO 24232 --- [ntainer#0-0-C-1] c.h.r.listener.ResponseService : Sending Response[id=5393b4a0-3849-4130-934b-671e43a2358f, result=FAILURE].
The only remaining case is that of a non-compliant and incorrect message, meaning the id is still empty, but the contextId is different from the expected one.
{ "id": "", "contextId": "other context" }
If the following test is run, nothing changes; unfortunately, the failed deserialization function still sends a failure response back, although the record filtering strategy should have filtered the message out, as the contextId is non-compliant.
@Test
void deserializationError_notCompliant() {
    kafkaTemplate.send(topic, String.format(template, "", "other context"));
}
2024-03-13T11:03:56.609+02:00 INFO 24232 --- [ntainer#0-0-C-1] d.d.FailedRequestDeserializationFunction : Error deserializing request - 'id' is required
2024-03-13T11:03:56.610+02:00 INFO 24232 --- [ntainer#0-0-C-1] c.h.r.listener.ResponseService : Sending Response[id=b345f001-3543-46da-bc0f-17c63c20e32a, result=FAILURE].
The code for this second section is here – 2-filter-strategy-custom-deser.
Implement a Record Filter Strategy with Custom Deserialization – Correctly
The last part of this analysis provides a solution for addressing this last use-case.
Before moving on with it, let’s recall what currently happens in all possible use-cases:
Correct, compliant message
- since the message is correct, the custom deserializer successfully unmarshalls it
- the failed deserialization function is not invoked
- since the message is compliant, the record filter strategy does not reject it
- the listener is invoked, it processes the request and sends a response back
Correct, non-compliant message
- since the message is correct, the custom deserializer successfully unmarshalls it
- the failed deserialization function is not invoked
- since the message is non-compliant, the record filter strategy rejects it
- the listener is not invoked
Incorrect, compliant or non-compliant message
- since the message is incorrect, the custom deserializer throws an exception
- the failed deserialization function is invoked and it sends a failure response back
- the record filter strategy is not invoked
- the listener is not invoked
In case of a correct message, the consumer application behaves correctly, irrespective of the compliancy of the message.
In case of an incorrect message, a failure response is sent back irrespective of the compliancy of the message, which means the consumer behaves correctly only for compliant messages.
For incorrect, non-compliant messages it should act as follows:
- since the message is incorrect, the custom deserializer throws an exception
- the failed deserialization function is invoked and it sends a failure response back only if the message is compliant
- the record filter strategy is not invoked
- the listener is not invoked
At first glance, in order to cover the last use-case as well, only the FailedRequestDeserializationFunction needs to be enhanced to also check the message compliancy. Basically, before sending the response, the same check as the one in CustomRecordFilterStrategy shall be added. To avoid repetition, some refactoring is done.
To isolate the compliancy check, a separate component in charge of it is created.
@Component
public class RequestFilterStrategy {

    private static final Logger LOG = LoggerFactory.getLogger(RequestFilterStrategy.class);

    @Value("${context.id}")
    private String contextId;

    public boolean filter(String contextId) {
        boolean discard = !this.contextId.equals(contextId);
        LOG.info("Request is{} compliant.", discard ? "n't" : "");
        return discard;
    }
}
Then, the component is injected both in the CustomRecordFilterStrategy and in the FailedRequestDeserializationFunction and, consequently, they are refactored as follows.
@Component
public class CustomRecordFilterStrategy implements RecordFilterStrategy<String, Request> {

    private final RequestFilterStrategy requestFilterStrategy;

    public CustomRecordFilterStrategy(RequestFilterStrategy requestFilterStrategy) {
        this.requestFilterStrategy = requestFilterStrategy;
    }

    @Override
    public boolean filter(ConsumerRecord<String, Request> consumerRecord) {
        return requestFilterStrategy.filter(consumerRecord.value().contextId());
    }
}
@Component
public class FailedRequestDeserializationFunction implements Function<FailedDeserializationInfo, Request> {

    private static final Logger LOG = LoggerFactory.getLogger(FailedRequestDeserializationFunction.class);

    private final RequestFilterStrategy requestFilterStrategy;
    private final ResponseService responseService;

    public FailedRequestDeserializationFunction(RequestFilterStrategy requestFilterStrategy,
                                                ResponseService responseService) {
        this.requestFilterStrategy = requestFilterStrategy;
        this.responseService = responseService;
    }

    @Override
    public Request apply(FailedDeserializationInfo failedDeserializationInfo) {
        final Exception ex = failedDeserializationInfo.getException();

        if (ex instanceof RequestDeserializationException deserializationEx) {
            LOG.info("Error deserializing request - {}", deserializationEx.getMessage());

            if (!requestFilterStrategy.filter(deserializationEx.getContextId())) {
                responseService.send(Response.failure());
            }
        } else {
            LOG.error("Unexpected error deserializing request.", ex);
        }

        return null;
    }
}
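Note that the refactored function now reads the contextId directly from the RequestDeserializationException. The updated exception class is not listed here, so the sketch below is an assumption of what it presumably looks like; the CustomRequestDeserializer would then throw new RequestDeserializationException("'id' is required", contextId), passing along the already-extracted context identifier.

// assumed shape of the exception - its updated listing is not shown in the article
public class RequestDeserializationException extends RuntimeException {

    private final String contextId;

    public RequestDeserializationException(String message, String contextId) {
        super(message);
        this.contextId = contextId;
    }

    public String getContextId() {
        return contextId;
    }
}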
To check the behavior, the last unit test is run again.
@Test
void deserializationError_notCompliant() {
    kafkaTemplate.send(topic, String.format(template, "", "other context"));
}
The output clearly shows that for incorrect, non-compliant messages, no response is sent anymore.
2024-03-13T15:05:56.432+02:00 INFO 17916 --- [ntainer#0-0-C-1] d.d.FailedRequestDeserializationFunction : Error deserializing request - 'id' is required
2024-03-13T15:05:56.432+02:00 INFO 17916 --- [ntainer#0-0-C-1] c.h.r.listener.RequestFilterStrategy : Request isn't compliant.
The code for the enhanced solution is here – 3-filter-strategy-custom-deser-covered
Resources
- Redpanda Quickstart
- Spring for Apache Kafka Reference
- Acting Soon on Kafka Deserialization Errors