Acting Soon on Kafka Deserialization Errors


by Horatiu Dan

Context

Event-driven architectures have been used successfully for quite some time by many organizations in various business cases. They excel at performance, scalability, evolvability and fault tolerance, providing a good level of abstraction and elasticity. These strengths made them good choices when applications needed real-time or near real-time reactiveness.

In terms of implementations, for standard messaging, ActiveMQ and RabbitMQ are good candidates, while for data streaming, platforms such as Apache Kafka and Redpanda are more suitable. Usually, when developers and architects need to opt for one of these two directions, they analyze and weigh from a number of angles – message payload, flow and usage of data, throughput, solution topology. As the discussion around these aspects can grow too big and complex, it is not refined as part of this article.

Conceptually, event-driven architectures involve at least three main actors – message producers, message brokers and message consumers. Briefly, the purpose is to allow the producers and the consumers to communicate in a decoupled and asynchronous way, which is accomplished with the help of the previously mentioned message brokers. In the optimistic scenario, a producer creates a message and publishes it to a topic owned by the broker, from which the consumer reads it, deals with it and, out of courtesy, provides a response back. Messages are serialized (marshalled) by the producers when sent to topics and deserialized (unmarshalled) by consumers when received from topics.

This article focuses on the situation in which a consumer experiences issues when deserializing a received message, and provides a way of acting further. A few examples of such actions may include constructing a default message or sending feedback to the message broker. Developers are creative enough to decide on this behavior, depending on the particular use cases implemented.

Set-up

  • Java 21
  • Maven 3.9.2
  • Spring Boot – version 3.1.5
  • Redpanda message broker running in Docker – image version 23.2.15

Redpanda is a lightweight message broker and it was chosen for this proof of concept to give the readers the opportunity to experiment with an option different from the widely used Kafka. As it is Kafka compatible, the development and the configuration of the producers and consumers do not need to change at all when moving from one service provider to another.

According to the Redpanda documentation, the Docker support applies only to development and testing. For the purpose of this project, this is more than enough, thus a single Redpanda message broker is set up to run in Docker.

See [Resource 1] for details on how to accomplish the minimal set-up.

Once up and running, a topic called minifig is created with the following command:

>docker exec -it redpanda-0 rpk topic create minifig
TOPIC    STATUS
minifig  OK

If the cluster is inspected, one may observe that a topic with one partition and one replica was created.

>docker exec -it redpanda-0 rpk cluster info
CLUSTER
=======
redpanda.581f9a24-3402-4a17-af28-63353a602421

BROKERS
=======
ID    HOST        PORT
0*    redpanda-0  9092

TOPICS
======
NAME                PARTITIONS  REPLICAS
__consumer_offsets  3           1
_schemas            1           1
minifig             1           1

Implementation

The flow is straightforward – the producer sends a request to the configured topic, and the consumer reads it from there, as long as it is able to.

A request represents a mini-figure which is simplistically modelled by the following record:

public record Minifig(String id, Size size, String name) {

    public Minifig(Size size, String name) {
        this(UUID.randomUUID().toString(), size, name);
    }

    public enum Size {
        SMALL, MEDIUM, BIG
    }
}

id is the unique identifier of the Minifig which has a certain name and is of a certain size – small, medium or big.
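For a quick illustration of the two constructors, a hypothetical MinifigDemo class (not part of the project, assuming it sits in the same package as Minifig) could look like this:

// Hypothetical demo class, not part of the project - it only illustrates
// the two constructors of the Minifig record defined above.
import java.util.UUID;

public class MinifigDemo {

    public static void main(String[] args) {
        // the convenience constructor generates a random UUID as id
        Minifig generated = new Minifig(Minifig.Size.SMALL, "Spider-Man");

        // the canonical constructor receives an explicit id
        Minifig explicit = new Minifig(UUID.randomUUID().toString(), Minifig.Size.BIG, "Hulk");

        System.out.println(generated);
        System.out.println(explicit);
    }
}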

For configuring a producer and a consumer, at least these properties are needed (application.properties file):

# the path to the message broker
broker.url=localhost:19092

# the name of the broker topic
topic.minifig=minifig

# the unique string that identifies the consumer group of the consumer
topic.minifig.group.id=group-0

For sending messages, the producer needs a KafkaTemplate instance.

@Configuration
public class KafkaConfig {

    @Value("${broker.url}")
    private String brokerUrl;

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        ProducerFactory<String, String> producerFactory = new DefaultKafkaProducerFactory<>(props);
        return new KafkaTemplate<>(producerFactory);
    }
}

One may observe in the producer configuration that a StringSerializer was chosen for marshalling the payload value. Usually, a JsonSerializer provides more robustness to the producer-consumer contract. Nevertheless, the choice here is intentional, to increase the experimental flexibility on the consumer side (as will be seen later). Just as a reminder, the interest of this proof of concept is to act on encountered deserialization errors.
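For comparison, a minimal sketch of such a JsonSerializer-based alternative is shown below; the JsonProducerConfigSketch class and its jsonKafkaTemplate() method are illustrative names, not part of this project:

// Illustrative alternative, not the configuration used in this project:
// a type-safe KafkaTemplate<String, Minifig> backed by Spring Kafka's JsonSerializer
// (Minifig is assumed to be on the classpath).
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;

public class JsonProducerConfigSketch {

    public static KafkaTemplate<String, Minifig> jsonKafkaTemplate(String brokerUrl) {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // marshals the Minifig record to JSON, keeping producer and consumer aligned
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

        ProducerFactory<String, Minifig> producerFactory = new DefaultKafkaProducerFactory<>(props);
        return new KafkaTemplate<>(producerFactory);
    }
}

With such a template, sending non-compliant payloads from the tests would no longer be possible, which is exactly why the String-based one was preferred here.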

Once the messages reach the minifig topic, a consumer is configured to pick them up.

@Configuration
@EnableKafka
public class KafkaConfig {

    @Value("${broker.url}")
    private String brokerUrl;

    @Value("${topic.minifig.group.id}")
    private String groupId;

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Minifig> kafkaListenerContainerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(JsonDeserializer.TRUSTED_PACKAGES, Minifig.class.getPackageName());
        props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, Minifig.class.getName());

        DefaultKafkaConsumerFactory<String, Minifig> defaultFactory = new DefaultKafkaConsumerFactory<>(props);

        ConcurrentKafkaListenerContainerFactory<String, Minifig> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(defaultFactory);
        factory.setCommonErrorHandler(new DefaultErrorHandler());
        return factory;
    }
}

The KafkaListenerContainerFactory interface is responsible for creating the listener container for a particular endpoint. The @EnableKafka annotation on the configuration class enables the detection of @KafkaListener annotations on any Spring-managed beans in the container. Thus, the actual listener (the message consumer) is developed next.

@Component
public class MinifigListener {

    private static final Logger LOG = LoggerFactory.getLogger(MinifigListener.class);

    @KafkaListener(topics = "${topic.minifig}", groupId = "${topic.minifig.group.id}")
    public void onReceive(@Payload Minifig minifig) {
        LOG.info("New minifig received - {}.", minifig);
    }
}

Its functionality is trivial: it only logs the messages read from the minifig topic, destined for the configured consumer group.

If the application is started, provided the message broker is up, the listener is ready to receive messages.

INFO 10008 --- [main] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-group-0-1, groupId=group-0] Subscribed to topic(s): minifig
INFO 10008 --- [ntainer#0-0-C-1] org.apache.kafka.clients.Metadata : [Consumer clientId=consumer-group-0-1, groupId=group-0] Cluster ID: redpanda.581f9a24-3402-4a17-af28-63353a602421
INFO 10008 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-group-0-1, groupId=group-0] Discovered group coordinator localhost:19092 (id: 2147483647 rack: null)
INFO 10008 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-group-0-1, groupId=group-0] Found no committed offset for partition minifig-0
INFO 10008 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : group-0: partitions assigned: [minifig-0]

In order to check the integration, the following simple test is used. Since a Minifig is expected by the listener, a template for compliant payloads was created for convenience.

@SpringBootTest
class AppTest {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Value("${topic.minifig}")
    private String topic;

    final String template = "{" +
            "\"id\":\"%s\"," +
            "\"size\":\"%s\"," +
            "\"name\":\"%s\"" +
            "}";

    @Test
    void send_compliant() {
        final String minifig = String.format(template,
                UUID.randomUUID(), Minifig.Size.SMALL, "Spider-Man");
        kafkaTemplate.send(topic, minifig);
    }
}

When running the test, a ‘compliant’ message is sent to the broker and as expected, it is successfully picked up by the local consumer.

INFO 10008 --- [ntainer#0-0-C-1] c.h.e.listener.MinifigListener : New minifig received - Minifig[id=0c75b9e4-511a-48b3-a984-404d2fc1d47b, size=SMALL, name=Spider-Man]. 

Redpanda Console can be helpful in observing what is happening at the broker level, particularly what is flowing through the minifig topic.

In scenarios like the one above, messages are sent from the producer to the consumer via the message broker, as planned.

Recover on Deserialization Failures

In the particular case of this proof of concept, it is assumed the size of a mini-figure can be SMALL, MEDIUM or BIG, in line with the defined Size enum. In case the producer sends a mini-figure of an unknown size, one that deviates a bit from the agreed contract, the messages are basically rejected by the listener, as the payload cannot be deserialized.

To simulate this, the following test is run.

@SpringBootTest
class AppTest {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Value("${topic.minifig}")
    private String topic;

    final String template = "{" +
            "\"id\":\"%s\"," +
            "\"size\":\"%s\"," +
            "\"name\":\"%s\"" +
            "}";

    @Test
    void send_non_compliant() {
        final String minifig = String.format(template,
                UUID.randomUUID(), "Unknown", "Spider-Man");
        kafkaTemplate.send(topic, minifig);
    }
}

The message reaches the topic, but not the MinifigListener#onReceive() method. As expected, the error appears when the payload is being unmarshalled. The cause can be spotted by looking deep down the stack trace.

Caused by: org.apache.kafka.common.errors.SerializationException: Can't deserialize data from topic [minifig]
Caused by: com.fasterxml.jackson.databind.exc.InvalidFormatException: Cannot deserialize value of type `com.hcd.errhandlerdeserializer.domain.Minifig$Size` from String "Unknown": not one of the values accepted for Enum class: [BIG, MEDIUM, SMALL]
 at [Source: (byte[])"{"id":"fbc86874-55ac-4313-bbbb-0ed99341825a","size":"Unknown","name":"Spider-Man"}"; line: 1, column: 53] (through reference chain: com.hcd.errhandlerdeserializer.domain.Minifig["size"])

Another aspect is that the consumer keeps retrying to read the same message over and over. This is unfortunate, at least from the consumer point of view, as the error logs keep accumulating.

In order to get past such situations, the JsonDeserializer used for unmarshalling the payload value is wrapped in an ErrorHandlingDeserializer and becomes its actual delegate. Moreover, the ErrorHandlingDeserializer has a failedDeserializationFunction member that, according to its JavaDoc, provides an alternative mechanism when the deserialization fails.

The new consumer configuration looks as below:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, Minifig> kafkaListenerContainerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    props.put(JsonDeserializer.TRUSTED_PACKAGES, Minifig.class.getPackageName());
    props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, Minifig.class.getName());

    JsonDeserializer<Minifig> jsonDeserializer = new JsonDeserializer<>(Minifig.class);
    ErrorHandlingDeserializer<Minifig> valueDeserializer = new ErrorHandlingDeserializer<>(jsonDeserializer);
    valueDeserializer.setFailedDeserializationFunction(new MinifigFailedDeserializationFunction());

    DefaultKafkaConsumerFactory<String, Minifig> defaultFactory =
            new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), valueDeserializer);

    ConcurrentKafkaListenerContainerFactory<String, Minifig> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(defaultFactory);
    factory.setCommonErrorHandler(new DefaultErrorHandler());
    return factory;
}
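As a side note, the same decoration can also be expressed purely through consumer properties, letting the framework instantiate both the delegate and the recovery function. A minimal sketch, reusing the props map from the bean above and the constants exposed by ErrorHandlingDeserializer:

// property-driven equivalent of the programmatic decoration above;
// the framework creates the delegate and the function reflectively
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
// the actual delegate that performs the JSON unmarshalling
props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
// the recovery function, created via its no-arg constructor
props.put(ErrorHandlingDeserializer.VALUE_FUNCTION, MinifigFailedDeserializationFunction.class);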

The failedDeserializationFunction used here is simplistic, but enough to prove its utility.

public class MinifigFailedDeserializationFunction implements Function<FailedDeserializationInfo, Minifig> {

    private static final Logger LOG = LoggerFactory.getLogger(MinifigFailedDeserializationFunction.class);

    @Override
    public Minifig apply(FailedDeserializationInfo failedDeserializationInfo) {
        final Exception exception = failedDeserializationInfo.getException();
        LOG.info("Error deserializing minifig - {}", exception.getCause().getMessage());

        // fall back to a recognizable default mini-figure
        return new Minifig(null, Minifig.Size.SMALL, "Undefined");
    }
}

The FailedDeserializationInfo entity (the Function#apply() input) is constructed during the recovery from the deserialization exception and encapsulates various pieces of information (here, the exception is the one leveraged).
Since the output of the apply() method is the actual deserialization result, one may return either null or whatever is suitable, depending on the aimed behavior.
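For instance, a hypothetical variant (not part of this project) that returns null, making the deserialization result empty and leaving the decision to the downstream configuration:

// Hypothetical variant, not part of the project: returning null makes the
// deserialization result null instead of a default Minifig, so downstream
// code (listener or error handler) decides what to do with the record.
import java.util.function.Function;

import org.springframework.kafka.support.serializer.FailedDeserializationInfo;

public class NullReturningFailedDeserializationFunction
        implements Function<FailedDeserializationInfo, Minifig> {

    @Override
    public Minifig apply(FailedDeserializationInfo info) {
        // info.getData() exposes the raw bytes - they could be forwarded
        // to a dead-letter topic here before giving up on the payload
        return null;
    }
}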

If running the send_non_compliant() test again, the deserialization exception is handled and a default value is returned. Further, the MinifigListener is invoked and has the opportunity to deal with it.

INFO 30160 --- [ntainer#0-0-C-1] e.l.MinifigFailedDeserializationFunction : Error deserializing minifig - Cannot deserialize value of type `com.hcd.errhandlerdeserializer.domain.Minifig$Size` from String "Unknown": not one of the values accepted for Enum class: [BIG, MEDIUM, SMALL]
 at [Source: (byte[])"{"id":"f35a77bf-29e5-4f5c-b5de-cc674f22029f","size":"Unknown","name":"Spider-Man"}"; line: 1, column: 53] (through reference chain: com.hcd.errhandlerdeserializer.domain.Minifig["size"])
INFO 30160 --- [ntainer#0-0-C-1] c.h.e.listener.MinifigListener : New minifig received - Minifig[id=null, size=SMALL, name=Undefined].
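Assuming the fallback record stays recognizable by its null id (as produced by the recovery function above), the listener could branch on it; the MinifigListenerSketch class below is an illustrative variation, not the project code:

// Illustrative variation of the listener, not the project code: it tells
// fallback records (null id) apart from properly deserialized ones.
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

@Component
public class MinifigListenerSketch {

    private static final Logger LOG = LoggerFactory.getLogger(MinifigListenerSketch.class);

    @KafkaListener(topics = "${topic.minifig}", groupId = "${topic.minifig.group.id}")
    public void onReceive(@Payload Minifig minifig) {
        if (minifig.id() == null) {
            // the recovery function left the id unset - compensate accordingly
            LOG.warn("Fallback minifig received, payload could not be deserialized - {}.", minifig);
            return;
        }
        LOG.info("New minifig received - {}.", minifig);
    }
}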

Conclusion

Configuring Kafka producers and consumers and fine-tuning them in order to achieve the desired performance, in accordance with the message brokers used, is not always straightforward. Controlling each step of the communication is by all means desirable and, moreover, acting fast in unknown situations helps in delivering robust and easy-to-maintain solutions. This post focused on the deserialization issues that might appear at the Kafka consumer level and provided a way of having a backup plan when dealing with non-compliant payloads.

Sample Code

Available here – https://github.com/horatiucd/err-handler-deserializer

Resources

  1. Redpanda Quickstart
  2. Spring for Apache Kafka Reference
  3. The picture was taken at Zoo Brasov, Romania
