by Horatiu Dan
Context
Event-driven architectures have been used successfully for a long time by many organizations across various business cases. They excel at performance, scalability, evolvability and fault tolerance, while providing a good level of abstraction and elasticity. These strengths make them a good choice when applications need real-time or near-real-time reactiveness.
In terms of implementations, ActiveMQ and RabbitMQ are good candidates for standard messaging, while platforms such as Apache Kafka and Redpanda are more suitable for data streaming. Usually, when developers and architects need to choose between these two directions, they weigh the options from several angles: message payload, flow and usage of data, throughput, solution topology. As the discussion around these aspects can get large and complex, it is not detailed in this article.
Conceptually, event-driven architectures involve at least three main actors: message producers, message brokers and message consumers. Briefly, the purpose is to allow producers and consumers to communicate in a decoupled and asynchronous way, which is accomplished with the help of the previously mentioned message brokers. In the optimistic scenario, a producer creates a message and publishes it to a topic owned by the broker; the consumer reads it from there, processes it and, out of courtesy, provides a response back. Messages are serialized (marshalled) by the producers when sent to topics and de-serialized (unmarshalled) by consumers when received from topics.
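The interaction described above can be sketched in plain Java, without any broker at all. The InMemoryTopic class below is a hypothetical stand-in for a broker topic (it is not a Kafka or Redpanda API) and only illustrates where marshalling and unmarshalling happen.

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class DecouplingDemo {

    // Hypothetical in-memory stand-in for a broker topic; real brokers persist and partition records.
    static final class InMemoryTopic {
        private final BlockingQueue<byte[]> records = new LinkedBlockingQueue<>();

        // Producer side: marshal the payload to bytes before handing it to the "broker".
        void publish(String payload) {
            records.add(payload.getBytes(StandardCharsets.UTF_8));
        }

        // Consumer side: take the next record and unmarshal it; returns null when the topic is empty.
        String poll() {
            byte[] raw = records.poll();
            return raw == null ? null : new String(raw, StandardCharsets.UTF_8);
        }
    }

    public static void main(String[] args) {
        InMemoryTopic topic = new InMemoryTopic();
        topic.publish("{\"name\":\"Spider-Man\"}");
        System.out.println(topic.poll());
    }
}
```

The producer and the consumer never reference each other; they only share the topic and the agreement on how the bytes are to be interpreted.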
This article focuses on the situation in which a consumer experiences issues when de-serializing a received message and presents a way to act further. A few examples of such actions include constructing a default message or sending feedback to the message broker. Developers are free to decide on this behavior, depending on the particular use cases implemented.
Set-up
- Java 21
- Maven 3.9.2
- Spring Boot – version 3.1.5
- Redpanda message broker running in Docker – image version 23.2.15
Redpanda is a lightweight message broker and it was chosen for this proof of concept to give readers the opportunity to experiment with a different option than the widely used Kafka one. As it is Kafka compatible, the development and the configuration of the producers and consumers will not need to change at all if moving from one service provider to another.
According to the Redpanda documentation, the Docker support applies only to development and testing. For the purpose of this project, this is more than enough, thus a single Redpanda message broker is set up to run in Docker.
See [Resource 1] for details on how to accomplish the minimal set-up.
Once up and running, a topic called minifig is created with the following command:
>docker exec -it redpanda-0 rpk topic create minifig
TOPIC    STATUS
minifig  OK
If the cluster is inspected, one may observe that a topic with one partition and one replica was created.
>docker exec -it redpanda-0 rpk cluster info
CLUSTER
=======
redpanda.581f9a24-3402-4a17-af28-63353a602421

BROKERS
=======
ID    HOST        PORT
0*    redpanda-0  9092

TOPICS
======
NAME                PARTITIONS  REPLICAS
__consumer_offsets  3           1
_schemas            1           1
minifig             1           1
Implementation
The flow is straightforward: the producer sends a request to the configured topic, from which the consumer reads it as soon as it is able to.
A request represents a mini-figure which is simplistically modelled by the following record:
import java.util.UUID;

public record Minifig(String id, Size size, String name) {

    // Convenience constructor that generates a random identifier.
    public Minifig(Size size, String name) {
        this(UUID.randomUUID().toString(), size, name);
    }

    public enum Size {
        SMALL, MEDIUM, BIG;
    }
}
id is the unique identifier of the Minifig, which has a certain name and is of a certain size: small, medium or big.
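As a brief usage illustration, the record can be created either with a generated identifier or with an explicit one. The sketch below nests a copy of the record only to stay self-contained; the "42" identifier and the "Hulk" name are made-up values.

```java
import java.util.UUID;

class MinifigDemo {

    // Same shape as the record above, nested here only to keep the example self-contained.
    record Minifig(String id, Size size, String name) {

        Minifig(Size size, String name) {
            this(UUID.randomUUID().toString(), size, name);
        }

        enum Size { SMALL, MEDIUM, BIG }
    }

    public static void main(String[] args) {
        // The id is generated by the convenience constructor.
        Minifig generated = new Minifig(Minifig.Size.SMALL, "Spider-Man");
        // The id is supplied explicitly through the canonical constructor.
        Minifig explicit = new Minifig("42", Minifig.Size.BIG, "Hulk");

        System.out.println(generated);
        System.out.println(explicit); // prints Minifig[id=42, size=BIG, name=Hulk]
    }
}
```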
For configuring a producer and a consumer, at least these properties are needed (application.properties file):
# the path to the message broker
broker.url=localhost:19092
# the name of the broker topic
topic.minifig=minifig
# the unique string that identifies the consumer group of the consumer
topic.minifig.group.id=group-0
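For sending messages, the producer needs a KafkaTemplate instance.
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
public class KafkaConfig {

    @Value("${broker.url}")
    private String brokerUrl;

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl);
        // Both the key and the value are sent as plain strings.
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        ProducerFactory<String, String> producerFactory = new DefaultKafkaProducerFactory<>(props);

        return new KafkaTemplate<>(producerFactory);
    }
}
One may observe in the producer configuration that a StringSerializer was chosen for marshalling the payload value. Usually, a JsonSerializer makes the producer-consumer contract more robust. Nevertheless, the choice here was intentional, to increase the experimental flexibility on the consumer side (as shown later). As a reminder, the interest in this proof of concept is to act on the deserialization errors encountered.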
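Once the messages reach the minifig topic, a consumer is configured to pick them up.
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.kafka.support.serializer.JsonDeserializer;

@Configuration
@EnableKafka
public class KafkaConfig {

    @Value("${broker.url}")
    private String brokerUrl;

    @Value("${topic.minifig.group.id}")
    private String groupId;

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Minifig> kafkaListenerContainerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // The string payload is expected to be JSON that maps to a Minifig.
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(JsonDeserializer.TRUSTED_PACKAGES, Minifig.class.getPackageName());
        props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, Minifig.class.getName());

        DefaultKafkaConsumerFactory<String, Minifig> defaultFactory = new DefaultKafkaConsumerFactory<>(props);

        ConcurrentKafkaListenerContainerFactory<String, Minifig> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(defaultFactory);
        factory.setCommonErrorHandler(new DefaultErrorHandler());
        return factory;
    }
}
The KafkaListenerContainerFactory interface is responsible for creating the listener container for a particular endpoint. The @EnableKafka annotation on the configuration class enables the detection of @KafkaListener annotations on any Spring-managed beans in the container. Thus, the actual listener (the message consumer) is developed next.
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

@Component
public class MinifigListener {

    private static final Logger LOG = LoggerFactory.getLogger(MinifigListener.class);

    // Invoked for every record of the minifig topic that was de-serialized successfully.
    @KafkaListener(topics = "${topic.minifig}", groupId = "${topic.minifig.group.id}")
    public void onReceive(@Payload Minifig minifig) {
        LOG.info("New minifig received - {}.", minifig);
    }
}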
Its functionality is trivial: it only logs the messages read from the minifig topic, destined for the configured consumer group.
If the application is started, provided the message broker is up, the listener is ready to receive messages.
INFO 10008 --- [main] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-group-0-1, groupId=group-0] Subscribed to topic(s): minifig
INFO 10008 --- [ntainer#0-0-C-1] org.apache.kafka.clients.Metadata : [Consumer clientId=consumer-group-0-1, groupId=group-0] Cluster ID: redpanda.581f9a24-3402-4a17-af28-63353a602421
INFO 10008 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-group-0-1, groupId=group-0] Discovered group coordinator localhost:19092 (id: 2147483647 rack: null)
INFO 10008 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-group-0-1, groupId=group-0] Found no committed offset for partition minifig-0
INFO 10008 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : group-0: partitions assigned: [minifig-0]
In order to check the integration, the following simple test is used. Since a Minifig is expected by the listener, a template for compliant payloads was created for convenience.
import java.util.UUID;

import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;

@SpringBootTest
class AppTest {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Value("${topic.minifig}")
    private String topic;

    // JSON template that matches the Minifig record.
    final String template = "{" +
            "\"id\":\"%s\"," +
            "\"size\":\"%s\"," +
            "\"name\":\"%s\"" +
            "}";

    @Test
    void send_compliant() {
        final String minifig = String.format(template,
                UUID.randomUUID(), Minifig.Size.SMALL, "Spider-Man");

        kafkaTemplate.send(topic, minifig);
    }
}
When running the test, a ‘compliant’ message is sent to the broker and as expected, it is successfully picked up by the local consumer.
INFO 10008 --- [ntainer#0-0-C-1] c.h.e.listener.MinifigListener : New minifig received - Minifig[id=0c75b9e4-511a-48b3-a984-404d2fc1d47b, size=SMALL, name=Spider-Man].
Redpanda Console can be helpful in observing what is happening at the broker level, particularly what is flowing through the minifig topic.
In scenarios such as the one above, messages are sent from the producer to the consumer via the message broker, as planned.
Recover on Deserialization Failures
In the particular case of this proof of concept, it is assumed that the size of a mini-figure can be SMALL, MEDIUM or BIG, in line with the defined Size enum. If the producer sends a mini-figure of an unknown size, one that deviates a bit from the agreed contract, the messages are basically rejected by the listener, as the payload cannot be de-serialized.
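The root of the problem can be reproduced without Kafka or Jackson. The sketch below uses plain Enum.valueOf(), which is an assumption made for illustration (Jackson's enum handling is more configurable), to show that a value outside the agreed constants cannot be mapped.

```java
class SizeContractDemo {

    enum Size { SMALL, MEDIUM, BIG }

    // Returns true when the raw text maps to one of the agreed constants.
    static boolean isKnownSize(String raw) {
        try {
            Size.valueOf(raw);
            return true;
        } catch (IllegalArgumentException e) {
            // Thrown by Enum.valueOf() when no constant has that exact name.
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(isKnownSize("SMALL"));   // prints true
        System.out.println(isKnownSize("Unknown")); // prints false
    }
}
```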
To simulate this, the following test is run.
@SpringBootTest
class AppTest {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Value("${topic.minifig}")
    private String topic;

    final String template = "{" +
            "\"id\":\"%s\"," +
            "\"size\":\"%s\"," +
            "\"name\":\"%s\"" +
            "}";

    @Test
    void send_non_compliant() {
        // "Unknown" is not one of the Minifig.Size constants.
        final String minifig = String.format(template,
                UUID.randomUUID(), "Unknown", "Spider-Man");

        kafkaTemplate.send(topic, minifig);
    }
}
The message reaches the topic, but not the MinifigListener#onReceive() method. As expected, the error appears when the payload is unmarshalled. The causes can be identified by looking deep down the stack trace.
Caused by: org.apache.kafka.common.errors.SerializationException: Can't deserialize data from topic [minifig]
Caused by: com.fasterxml.jackson.databind.exc.InvalidFormatException: Cannot deserialize value of type `com.hcd.errhandlerdeserializer.domain.Minifig$Size` from String "Unknown": not one of the values accepted for Enum class: [BIG, MEDIUM, SMALL]
 at [Source: (byte[])"{"id":"fbc86874-55ac-4313-bbbb-0ed99341825a","size":"Unknown","name":"Spider-Man"}"; line: 1, column: 53] (through reference chain: com.hcd.errhandlerdeserializer.domain.Minifig["size"])
Another aspect is that the consumer keeps trying to read the same message over and over. This is unfortunate, at least from the consumer's point of view, as the logs keep accumulating.
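The repeated read attempts can be pictured with a simplified simulation. This is not how the Kafka client is implemented; it only assumes a consumer that advances its offset after a successful read and therefore stays stuck on the record it cannot de-serialize. The consume() and strict() helpers and the maxPolls bound are hypothetical, introduced so that the demo terminates.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

class PoisonPillDemo {

    // Simulates a consumer that advances its offset only after a successful read.
    static List<String> consume(List<String> log, Function<String, String> deserializer, int maxPolls) {
        List<String> events = new ArrayList<>();
        int offset = 0;
        for (int poll = 0; poll < maxPolls && offset < log.size(); poll++) {
            try {
                String value = deserializer.apply(log.get(offset));
                events.add("ok@" + offset + ":" + value);
                offset++; // progress is made only on success
            } catch (RuntimeException e) {
                events.add("failed@" + offset); // the same offset is read again on the next poll
            }
        }
        return events;
    }

    // Strict deserializer stand-in: rejects the non-compliant value.
    static String strict(String raw) {
        if (raw.contains("Unknown")) {
            throw new IllegalArgumentException("Cannot deserialize " + raw);
        }
        return raw;
    }

    public static void main(String[] args) {
        List<String> log = List.of("SMALL", "Unknown", "BIG");
        consume(log, PoisonPillDemo::strict, 5).forEach(System.out::println);
    }
}
```

With five polls, the first record is processed, the record at offset 1 fails four times in a row and the last record is never reached.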
To get past such situations, the JsonDeserializer used for unmarshalling the payload value is wrapped in an ErrorHandlingDeserializer, which uses it as its actual delegate. Moreover, the ErrorHandlingDeserializer has a failedDeserializationFunction member that, according to its JavaDoc, provides an alternative mechanism when the deserialization fails.
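The idea behind this decoration can be sketched in plain Java. The FallbackDeserializer and Failure types below are hypothetical and are not the Spring for Apache Kafka classes; they only mimic the pattern of trying a delegate first and consulting a recovery function when the delegate throws.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

class FallbackDeserializerDemo {

    enum Size { SMALL, MEDIUM, BIG }

    // Carries what went wrong, loosely analogous to the input of a recovery function.
    record Failure(byte[] data, Exception exception) {}

    // Decorator: tries the delegate first and calls the fallback only when the delegate throws.
    static final class FallbackDeserializer<T> {
        private final Function<byte[], T> delegate;
        private final Function<Failure, T> onFailure;

        FallbackDeserializer(Function<byte[], T> delegate, Function<Failure, T> onFailure) {
            this.delegate = delegate;
            this.onFailure = onFailure;
        }

        T deserialize(byte[] data) {
            try {
                return delegate.apply(data);
            } catch (RuntimeException e) {
                return onFailure.apply(new Failure(data, e));
            }
        }
    }

    // Strict delegate: fails for anything that is not a Size constant.
    static Size parseSize(byte[] data) {
        return Size.valueOf(new String(data, StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        FallbackDeserializer<Size> deserializer =
                new FallbackDeserializer<>(FallbackDeserializerDemo::parseSize, failure -> Size.SMALL);

        System.out.println(deserializer.deserialize("BIG".getBytes(StandardCharsets.UTF_8)));
        System.out.println(deserializer.deserialize("Unknown".getBytes(StandardCharsets.UTF_8)));
    }
}
```

The caller always receives a value, so the decision on what to do with a failed payload moves from the infrastructure into application code.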
The new consumer configuration looks as below:
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Minifig> kafkaListenerContainerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    props.put(JsonDeserializer.TRUSTED_PACKAGES, Minifig.class.getPackageName());
    props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, Minifig.class.getName());

    JsonDeserializer<Minifig> jsonDeserializer = new JsonDeserializer<>(Minifig.class);

    // The JSON deserializer becomes the delegate of the error handling one.
    ErrorHandlingDeserializer<Minifig> valueDeserializer = new ErrorHandlingDeserializer<>(jsonDeserializer);
    valueDeserializer.setFailedDeserializationFunction(new MinifigFailedDeserializationFunction());

    // The deserializer instances are passed explicitly to the consumer factory.
    DefaultKafkaConsumerFactory<String, Minifig> defaultFactory = new DefaultKafkaConsumerFactory<>(props,
            new StringDeserializer(), valueDeserializer);

    ConcurrentKafkaListenerContainerFactory<String, Minifig> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(defaultFactory);
    factory.setCommonErrorHandler(new DefaultErrorHandler());
    return factory;
}
The failedDeserializationFunction used here is simplistic, as its only purpose is to prove its utility.
import java.util.function.Function;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.support.serializer.FailedDeserializationInfo;

public class MinifigFailedDeserializationFunction implements Function<FailedDeserializationInfo, Minifig> {

    private static final Logger LOG = LoggerFactory.getLogger(MinifigFailedDeserializationFunction.class);

    @Override
    public Minifig apply(FailedDeserializationInfo failedDeserializationInfo) {
        final Exception exception = failedDeserializationInfo.getException();
        LOG.info("Error deserializing minifig - {}", exception.getCause().getMessage());
        // Default value handed to the listener; it matches the recovered minifig in the log output.
        return new Minifig(null, Minifig.Size.SMALL, "Undefined");
    }
}
The FailedDeserializationInfo entity (the Function#apply() input) is constructed during the recovery from the de-serialization exception and it encapsulates various pieces of information (here, the exception is the one leveraged). Since the output of the apply() method is the actual deserialization result, one may return either null or whatever is suitable for the intended behavior.
If the send_non_compliant() test is run again, the deserialization exception is handled and a default value is returned. Further, the MinifigListener is invoked and has the opportunity to deal with it.
INFO 30160 --- [ntainer#0-0-C-1] e.l.MinifigFailedDeserializationFunction : Error deserializing minifig - Cannot deserialize value of type `com.hcd.errhandlerdeserializer.domain.Minifig$Size` from String "Unknown": not one of the values accepted for Enum class: [BIG, MEDIUM, SMALL]
 at [Source: (byte[])"{"id":"f35a77bf-29e5-4f5c-b5de-cc674f22029f","size":"Unknown","name":"Spider-Man"}"; line: 1, column: 53] (through reference chain: com.hcd.errhandlerdeserializer.domain.Minifig["size"])
INFO 30160 --- [ntainer#0-0-C-1] c.h.e.listener.MinifigListener : New minifig received - Minifig[id=null, size=SMALL, name=Undefined].
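One possible way for a listener to deal with such a value is to recognize it and branch. The sketch below is plain Java and rests on an assumption taken from the log output above: a null id marks a minifig produced by the recovery function. The isFallback() and handle() helpers are hypothetical and not part of the sample project.

```java
class FallbackAwareHandlerDemo {

    enum Size { SMALL, MEDIUM, BIG }

    // Reduced copy of the record, kept here only to make the example self-contained.
    record Minifig(String id, Size size, String name) {}

    // Assumption: a null id (or a null value) marks the result of the recovery function.
    static boolean isFallback(Minifig minifig) {
        return minifig == null || minifig.id() == null;
    }

    // Hypothetical listener body: skips fallback values and processes the rest.
    static String handle(Minifig minifig) {
        if (isFallback(minifig)) {
            return "skipped fallback minifig";
        }
        return "processed " + minifig.name();
    }

    public static void main(String[] args) {
        System.out.println(handle(new Minifig("1", Size.SMALL, "Spider-Man")));
        System.out.println(handle(new Minifig(null, Size.SMALL, "Undefined")));
    }
}
```

Whether fallback values are skipped, counted or forwarded elsewhere depends on the use case; the point is only that the listener stays in control.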
Conclusion
Configuring Kafka producers and consumers and fine-tuning them to achieve the desired performance with the message broker in use is not always straightforward. Controlling each step of the communication is certainly desirable and, moreover, reacting quickly to unexpected situations helps deliver robust and easy-to-maintain solutions. This post focused on the deserialization issues that might appear at the Kafka consumer level and presented a way of having a fallback plan when dealing with non-compliant payloads.
Sample Code
Available here – https://github.com/horatiucd/err-handler-deserializer
Resources
- Redpanda Quickstart
- Spring for Apache Kafka Reference
- The picture was taken at Zoo Brasov, Romania