
Aris Koliopoulos

Apache Kafka: 8 things to check before going live

Kafka, Distributed Systems | 8 min read


Apache Kafka is a beautiful system. It scales well, it is stable and it provides phenomenal system architecture flexibility. After 5 years of running production Kafka clusters, I have collected a list of tips and pitfalls. Some of them were learnt the hard way. If you work in a small team rolling out Kafka to production, these might prove useful. This article assumes familiarity with basic Kafka concepts, such as brokers, topics, producers and consumers. The following points should be valid up to Kafka 2.6.0. Without further ado:

1. Key all the messages!

Kafka topics consist of a (configurable) number of partitions. If the partition number is not provided by the user, KafkaProducer chooses the partition for the message using the key in the ProducerRecord instance passed to it.

Check out the default implementation: it checks whether the key is null and, if it isn't, computes a murmur2 hash of the key modulo the number of partitions. This is consistent; it will yield the same result for messages sharing the same key. If the key is null though, it uses a sticky partitioner that picks a random partition for every batch. In practical terms, if no key is passed, the producer will choose the partition randomly.
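The keyed path can be sketched in a few lines. This is a minimal illustration, not Kafka's code: Kafka's default partitioner hashes the serialized key with murmur2, whereas the sketch below swaps in Scala's built-in MurmurHash3 purely to stay dependency-free, and the object and method names are made up for the example.

```scala
import java.nio.charset.StandardCharsets.UTF_8
import scala.util.hashing.MurmurHash3

object KeyedPartitioning {
  // Stand-in hash (assumption): Kafka uses murmur2 on the serialized key;
  // MurmurHash3 is used here only because it ships with Scala.
  def hash(key: String): Int = MurmurHash3.bytesHash(key.getBytes(UTF_8))

  // Clear the sign bit so the modulo result is never negative,
  // then map the hash onto one of the partitions.
  def partitionFor(key: String, numPartitions: Int): Int =
    (hash(key) & 0x7fffffff) % numPartitions
}
```

Calling `KeyedPartitioning.partitionFor("aris@aris.com", 12)` repeatedly yields the same partition every time. Note that the result depends on the partition count, so adding partitions to a topic changes where existing keys land.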

This has important implications because Kafka only provides delivery ordering guarantees within a partition. Messages in the same partition will be delivered in the order they were committed. Messages in different partitions will be delivered in non-deterministic order. If the messages have any form of causal relationship between them, and they are not in the same partition, then any downstream consumer will have to collect all messages for a key before processing them (whatever "all" means in the context of an infinite stream), else causal consistency might be violated.

To illustrate the above, consider a simple event like the following:

case class EmailSubscription(
  email: Email,
  active: Boolean,
  createdOn: DateTime
)

def storeToDB(sub: EmailSubscription) = ???
val events: Stream[EmailSubscription] = ???

If the events for each email arrive in causal order, then we can map the events statelessly:

events.map(storeToDB)

If per-email causal order is not guaranteed though, we need to maintain enough state to know whether the event we see is indeed the latest. Otherwise, if the order is reversed, we might send an email to a user who has unsubscribed. For example:

events
  .groupMapReduce(_.email)(identity)(
    // keep whichever of the two events was created last
    (e1, e2) => if (e1.createdOn.isAfter(e2.createdOn)) e1 else e2
  )
  .values
  .map(storeToDB)

If we key messages using email, then Kafka will deliver all messages for a single email in the order they were inserted. Our consumer can be completely stateless; it can fetch messages from Kafka and store them to a datastore. If the key is null (none provided), our stateless consumer will happily store a message from the past. To mitigate that, at the very minimum we need to maintain the latest createdOn date for every email. Even then, relying on wall clocks for causality is a very bad idea.
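As a rough sketch of that minimum mitigation, the consumer below remembers the latest createdOn per email and drops anything older. The names are hypothetical, and the event is a simplified stand-in for EmailSubscription (a plain String and an epoch-second Long instead of Email and DateTime) so the example needs no extra libraries. It still compares wall-clock timestamps, so it inherits the weakness mentioned above.

```scala
object LatestWins {
  // Simplified stand-in for the article's EmailSubscription (assumption).
  final case class Subscription(email: String, active: Boolean, createdOn: Long)

  // Returns the events that should reach the datastore, in arrival order,
  // dropping any event that is not newer than the last one kept for its email.
  def dropStale(events: Seq[Subscription]): Seq[Subscription] = {
    val start = (Map.empty[String, Long], Vector.empty[Subscription])
    val (_, kept) = events.foldLeft(start) {
      case ((latest, out), e) =>
        if (latest.get(e.email).exists(_ >= e.createdOn)) (latest, out) // stale: skip
        else (latest.updated(e.email, e.createdOn), out :+ e)           // newer: keep
    }
    kept
  }
}
```

With keyed messages none of this bookkeeping is needed, which is the point of this section: the per-email map only exists to compensate for missing ordering.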

Assuming per-key ordering can be guaranteed, downstream reducer bounds can be relaxed from a Semilattice to a Semigroup. In practical terms, by taking advantage of this property we can drop the commutativity requirement, which unlocks easier implementations. If designing reducers for Kafka consumers sounds interesting, let me know and I will write about it.
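To make the commutativity point concrete, here is a small sketch with hypothetical names. The "last event wins" reducer is associative but not commutative, so it only gives the right answer when events for a key arrive in order, which is exactly what keyed partitions provide.

```scala
object Reducers {
  // Hypothetical, simplified event: only the fields the reducer needs.
  final case class Sub(email: String, active: Boolean)

  // Associative but NOT commutative: the right-hand (later) event wins.
  val lastWins: (Sub, Sub) => Sub = (_, later) => later

  // Folds the events for one key in arrival order.
  def reduceInOrder(events: Seq[Sub]): Option[Sub] = events.reduceOption(lastWins)
}
```

Feeding the same two events in the opposite order produces the opposite result, which is why an unordered stream would need a commutative reducer (for example one that compares timestamps) instead.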

2. Ensure all producers are using the same partitioner

Providing a key is not enough. Partitioners (i.e. the function f: (key) => partition) are configurable. Kafka provides a few and users can write their own. Do NOT assume all producers are using the same partitioner.

In a complex system where Go services, Python services, Spark and other wild animals all share the same Kafka cluster, all sorts of different implementations might exist. If different services are pushing data to the same topics, an integration test would be very useful. If things go wrong, delivery to consumers will be non-deterministic and debugging it can be pure hell.
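The core of such an integration test is small: run a sample of keys through every partitioner in play and report the keys on which they disagree. The sketch below uses two toy partitioners invented for the example; in a real test each one would wrap the actual client library used by a service.

```scala
object PartitionerAgreement {
  // (key, numPartitions) => partition
  type Partitioner = (String, Int) => Int

  // Toy partitioners (assumptions, for illustration only).
  val byLength: Partitioner = (key, n) => key.length % n
  val byFirstChar: Partitioner = (key, n) => key.headOption.fold(0)(_.toInt) % n

  // Keys for which the two partitioners choose different partitions.
  def disagreements(a: Partitioner, b: Partitioner, keys: Seq[String], n: Int): Seq[String] =
    keys.filter(k => a(k, n) != b(k, n))
}
```

An empty result for a representative key sample is the property to assert on; any key in the returned list would be split across partitions depending on which service produced it.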

3. Topic versioning

The beauty of Kafka is that data can be reprocessed as many times as needed. This forgives a lot of errors. To illustrate this one, let's assume the same dummy model:

case class EmailSubscription(
  email: Email,
  active: Boolean,
  createdOn: DateTime
)

Let's now assume messages are serialized to JSON before being pushed to Kafka. Due to a JSON serialization bug, the following is pushed to the emailsubscriptions topic:

{
  "email": {
    "value": "aris@aris.com"
  },
  "active": false,
  "createdOn": 1600000000
}

Instead of the expected:

{
  "email": "aris@aris.com",
  "active": false,
  "createdOn": 1600000000
}

Downstream consumers try to deserialize the message and fail. What can be done?

One solution would be to create a custom deserializer for those buggy instances and to add it to all the consumers. That's non-trivial and error-prone code, useful just for this instance.

Another solution would be to implement a consumer that would read from emailsubscriptions-1, fix the issue, and write to emailsubscriptions-2. Once the offsets of the two topics are identical, producers and consumers can switch from emailsubscriptions-1 to emailsubscriptions-2 without having to update any code. What's great about this is that those migrations can fail with no major consequences. If emailsubscriptions-2 is no good, we can run again and produce emailsubscriptions-3 and so on. This trick also works for non-trivial schema changes, migrations and other data enrichments. Avro and Protobuf can help in some cases, but bugs will occur and requirements will evolve in unpredictable ways. In any case, "fixing" a topic's data by reading from it and publishing to it is rarely a good idea. Topics should be immutable and versioning them can help in many situations where a topic's content has been corrupted.
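The "fix the issue" step of such a migration can be a pure function from a record in emailsubscriptions-1 to a record in emailsubscriptions-2. The sketch below is an assumption-laden illustration: it patches the nested email with a regular expression only to avoid pulling in a JSON library, and the object and method names are made up. A real job would parse the JSON properly and wrap this function between a Kafka consumer and producer.

```scala
import scala.util.matching.Regex

object EmailSubscriptionsMigration {
  // Matches the buggy nested form: "email": { "value": "..." }
  private val nestedEmail: Regex =
    """"email"\s*:\s*\{\s*"value"\s*:\s*"([^"]*)"\s*\}""".r

  // Rewrites a buggy record into the expected shape.
  // Records that are already well-formed pass through unchanged.
  def repair(json: String): String =
    nestedEmail.replaceAllIn(
      json,
      m => Regex.quoteReplacement("\"email\": \"" + m.group(1) + "\"")
    )
}
```

Because repair leaves good records untouched, rerunning the migration (or producing emailsubscriptions-3 from emailsubscriptions-2) is harmless.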

4. Treat ZooKeeper like royalty

Up until at least 2.6.0, Kafka relies on ZooKeeper. Losing connection to ZooKeeper means no ISRs (In-Sync-Replicas, more on that later), no partition leader election and eventually the brokers shut down. Thankfully @fpjunqueira and his team who created ZooKeeper are real pros, and that won't happen without reason. In fact, ZooKeeper is one of the most reliable distributed systems (that I have seen at least).

The following two mess-ups have occurred though:

  1. Due to a bug in the provisioning Ansible script, 2/3rds of a cluster ended up in the same availability zone, with sequential IPs (that usually means in the same rack on AWS). They all disappeared at the same time. No consensus, hell broke loose.
  2. A QA environment ran for long enough for all nodes to run out of disk space (ZooKeeper creates backup snapshots of the transaction log over time and someone/something external has to deal with deleting them). At the same time. Bringing this env back to life required editing znodes manually, and still data was lost.

To clean up older transaction log snapshots in ZooKeeper 3.4.x, ZooKeeper provides the following tool:

java -cp zookeeper-3.4.x.jar:lib/*:conf org.apache.zookeeper.server.PurgeTxnLog \
  /var/lib/zookeeper /var/lib/zookeeper -n 5

Ideally on a cron.

Those are just two examples. A lot more can go wrong. Because of the consequences of failure, proper JMX metric monitoring and real time log aggregation, all hooked up to a form of PagerDuty, are very highly recommended.

5. Unclean elections and minimum ISRs

This is essentially a trade off between availability and durability. Let's start with unclean elections.

Let's assume we have a topic with a single partition, which has one leader and a single follower replica. Data is happily flowing in. If the replica is "in-sync" (i.e. caught up with the leader and listed in the ISR set in ZooKeeper) and the leader becomes unavailable (e.g. the broker crashes), the replica can take over, accept writes and continue with no downtime. If the replica lags behind though, the leader will remove it from the ISR set in ZooKeeper. If the leader then goes down, there are two options:

  1. The lagging replica picks up, accepts writes and whatever excess writes the old leader had are lost. Essentially, the replica gets elected as leader without being in-sync. This is the "unclean" part.
  2. The partition becomes unavailable and new writes are rejected.

It entirely depends on the kind of data the topic holds. If the topic holds system metrics, then maybe the most recent data is more valuable and thus losing some older writes might be acceptable. If the topic contains bank transactions, going down until a human intervenes might be a better option. This is a broker level config (unclean.leader.election.enable) that can be overridden per topic.

The second part of this equation is min.insync.replicas, which represents the minimum number of replicas that have to be in-sync for a write to go through. This is configurable at the broker level and at the topic level, and it is enforced for producers that request acks=all. The same considerations as above apply: if the topic holds payments, having just 1 replica with all the data might be risky.
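Both knobs can be summarised as a toy decision model. This is a sketch of the trade-off as described in this section, not of Kafka's controller logic, and every name in it is hypothetical.

```scala
object ReplicationTradeoffs {
  sealed trait Outcome
  case object CleanFailover extends Outcome    // an in-sync replica takes over, nothing is lost
  case object UncleanFailover extends Outcome  // a lagging replica takes over, recent writes are lost
  case object PartitionOffline extends Outcome // writes are rejected until a valid leader is available

  // What happens to a partition when its leader dies, given the surviving followers.
  def onLeaderFailure(inSyncFollowers: Int,
                      laggingFollowers: Int,
                      uncleanElectionEnabled: Boolean): Outcome =
    if (inSyncFollowers > 0) CleanFailover
    else if (laggingFollowers > 0 && uncleanElectionEnabled) UncleanFailover
    else PartitionOffline

  // For a producer using acks=all: the write goes through only if the ISR set
  // (leader included) is at least as large as min.insync.replicas.
  def writeAccepted(isrSize: Int, minInsyncReplicas: Int): Boolean =
    isrSize >= minInsyncReplicas
}
```

For the single-follower example above, `onLeaderFailure(0, 1, uncleanElectionEnabled = true)` models option 1 and the same call with `false` models option 2.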

Legendary distributed systems researcher Kyle Kingsbury, aka Aphyr, did an excellent analysis on Kafka's replication mechanism some 7 years ago. If you wish to dig deeper into this trade off, reading Aphyr's piece is very highly recommended. As far as I understand, the basic trade offs discussed still hold true today.

6. Memory Maps

Kafka uses a LOT of those. Running out of them leads to a fatal runtime exception that will kill the broker. If the OS defaults are used, it is extremely likely that the limit will be reached as soon as the cluster has a few tens of thousands of segments per broker. What's worse is that in a well balanced cluster where brokers hold similar numbers of partitions, those failures will occur roughly at the same time. Let's look a bit closer:

vm.max_map_count is the maximum number of memory map areas a process can have.

From the Linux kernel docs:

max_map_count:
This file contains the maximum number of memory map areas a process may have. Memory map areas are used as a side-effect of calling malloc, directly by mmap, mprotect, and madvise, and also when loading shared libraries. While most applications need less than a thousand maps, certain programs, particularly malloc debuggers, may consume lots of them, e.g., up to one or two maps per allocation.
The default value is 65536.

Each log segment requires an index file and a timeindex file; each of those requires 1 map area. Each partition contains a number of log segments. How often Kafka closes a segment and opens a new one depends on segment.bytes and segment.ms. The defaults are 1GB and 7 days. If, for example, retention is set to a year and the segments are set to roll daily, then 1 partition might have 365 log segments a year later. Each of those will require 2 map areas. If a broker has 1000 partitions like this one, it will require 365 x 2 x 1000 = 730000 map areas.
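The arithmetic above is easy to wrap in a helper for capacity planning. This is a back-of-the-envelope sketch with hypothetical names; it assumes segments roll purely on time and it ignores the map areas the JVM itself needs, so treat the result as a lower bound.

```scala
object MapAreaEstimate {
  // Each segment maps an offset index and a time index.
  val MapsPerSegment = 2

  // Number of segments a partition accumulates over the retention period.
  def segmentsPerPartition(retentionDays: Int, rollEveryDays: Int): Int =
    math.ceil(retentionDays.toDouble / rollEveryDays).toInt

  // Map areas needed by the index files of all partitions on one broker.
  def mapAreas(partitions: Int, retentionDays: Int, rollEveryDays: Int): Long =
    partitions.toLong * segmentsPerPartition(retentionDays, rollEveryDays) * MapsPerSegment
}
```

`MapAreaEstimate.mapAreas(1000, 365, 1)` reproduces the 730000 figure from the example, more than ten times the default limit quoted above.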

This might sound high, but if Kafka serves as the central nervous system of a tech organisation, having thousands of partitions and tens of thousands of log segments is not uncommon. To see how many maps a broker is currently consuming, given the pid of Kafka:

cat /proc/:pid/maps | wc -l

E.g.:

[root@ip-* kafka-logs]# cat /proc/12054/maps | wc -l
10790

To find the current limit:

[root@ip-* kafka-logs]# sysctl vm.max_map_count
vm.max_map_count = 1048576

Note: this might not be the limit with which the process was started. To increase the limit for the current session:

sysctl -w vm.max_map_count=1048576

Or to change it permanently:

echo "vm.max_map_count=1048576" >> /etc/sysctl.conf

After each change the broker needs to be restarted.

If things go wrong, the logs will be littered with the following:

java.io.IOException: Map failed
    at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:940)
    at kafka.log.AbstractIndex$$anonfun$resize$1.apply(AbstractIndex.scala:115)
    at kafka.log.AbstractIndex$$anonfun$resize$1.apply(AbstractIndex.scala:105)
    at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:213)
    at kafka.log.AbstractIndex.resize(AbstractIndex.scala:105)
    at kafka.log.AbstractIndex$$anonfun$trimToValidSize$1.apply$mcV$sp(AbstractIndex.scala:167)
    at kafka.log.AbstractIndex$$anonfun$trimToValidSize$1.apply(AbstractIndex.scala:167)
    at kafka.log.AbstractIndex$$anonfun$trimToValidSize$1.apply(AbstractIndex.scala:167)
    at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:213)
    at kafka.log.AbstractIndex.trimToValidSize(AbstractIndex.scala:166)
    at kafka.log.AbstractIndex.close(AbstractIndex.scala:178)
    at kafka.log.LogSegment$$anonfun$close$2.apply$mcV$sp(LogSegment.scala:477)
    ... 31 more

Or:

Caused by: java.io.IOException: Map failed
    at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:940)
    at kafka.log.AbstractIndex.<init>(AbstractIndex.scala:63)
    at kafka.log.OffsetIndex.<init>(OffsetIndex.scala:52)
    at kafka.log.LogSegment.<init>(LogSegment.scala:77)
    at kafka.log.Log.roll(Log.scala:1238)
    at kafka.log.Log.maybeRoll(Log.scala:1194)
    at kafka.log.Log.append(Log.scala:652)
    ... 22 more
Caused by: java.lang.OutOfMemoryError: Map failed
    at sun.nio.ch.FileChannelImpl.map0(Native Method)
    at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:937)
    ... 28 more

The quick solution is to increase the maximum map areas and restart the broker. The actual solution is to review segment sizes, retention policies and segment rolling schedules to spot inefficiencies.

7. File descriptors

From Kafka's OS configuration guide:

File descriptor limits: Kafka uses file descriptors for log segments and open connections. If a broker hosts many partitions, consider that the broker needs at least (number_of_partitions)*(partition_size/segment_size) to track all log segments in addition to the number of connections the broker makes. We recommend at least 100000 allowed file descriptors for the broker processes as a starting point.
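The formula in that quote can be turned into a quick estimate. The helper below is a sketch with hypothetical names that follows the quoted formula literally (one descriptor per log segment plus one per connection), so the real number will be higher once index files and other open files are counted.

```scala
object FileDescriptorEstimate {
  // (number_of_partitions) * (partition_size / segment_size), rounding segments up.
  def segmentDescriptors(partitions: Long, partitionSizeBytes: Long, segmentSizeBytes: Long): Long = {
    val segmentsPerPartition = (partitionSizeBytes + segmentSizeBytes - 1) / segmentSizeBytes
    partitions * segmentsPerPartition
  }

  // Segment descriptors plus one descriptor per open connection.
  def minimum(partitions: Long, partitionSizeBytes: Long, segmentSizeBytes: Long, connections: Long): Long =
    segmentDescriptors(partitions, partitionSizeBytes, segmentSizeBytes) + connections
}
```

For instance, 1000 partitions of 100 GiB each with 1 GiB segments already account for 100000 descriptors before a single client connects.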

To see the limit for the process, given the pid:

cat /proc/:pid/limits | grep files

Production brokers I have seen consume roughly 400K of those. 100K might not be enough in cases where a large number of consumers and segments are in play. In any case:

#<domain> <type> <item> <value>
root soft nofile 262144
root hard nofile 1024288

can be added to /etc/security/limits.d/root.conf to adjust those limits.

Note: Both map areas and file descriptors are OS dependent and vary depending on where Kafka is running. In cases where Kafka is deployed on Kubernetes or Mesos, the container orchestration layer might introduce additional constraints. A good approach would be to set segment.bytes and segment.ms to tiny values on a test environment and then create topics and consumers programmatically until Kafka breaks.

8. Log Compaction

The purpose of Log Compaction is to retain the last known value for every key in a partition. This process retains the most valuable data, saves space and speeds up reprocessing the topics. For Log Compaction to work properly, keys and partitioning need to be configured correctly (see point 1). In practice, it is a good candidate for use cases where a) we need to retain an entity forever and b) this entity is updated often. Examples range from user profiles to account balances to stats aggregations. For log compaction to work, first make sure it is enabled in server.properties (this is the default since 0.9.0):

log.cleaner.enable=true

A second important config is the total memory used amongst all log cleaner threads:

log.cleaner.dedupe.buffer.size=536870912

Empirically, the default of 128MB is a bit too little if compaction is used aggressively; the value above raises it to 512MB. Most importantly, the Log Cleaner manager thread can die without killing the broker. If the broker relies on Log Compaction, then make sure log-cleaner.log is monitored for errors. If those threads die, then the broker might sit on a ticking disk bomb.
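The retention rule stated at the start of this section ("keep the last known value for every key in a partition") can be sketched on an in-memory log. This only illustrates the end result, under the assumption that the whole partition is eligible for cleaning; it says nothing about how the broker's cleaner threads work, and the names are hypothetical.

```scala
object CompactionSketch {
  // One record of a single partition.
  final case class Record(offset: Long, key: String, value: String)

  // Keep only the highest-offset record for each key, preserving offset order.
  def compact(log: Seq[Record]): Seq[Record] =
    log.groupBy(_.key).values.map(_.maxBy(_.offset)).toSeq.sortBy(_.offset)
}
```

Records keep their original offsets after compaction, so a consumer replaying the topic sees gaps but still reads the survivors in order.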

Thank you for reading!

If something isn't right in the above, please let me know. Ditto for general feedback, or requests to expand on anything.



© 2021 by Aris Koliopoulos. All rights reserved.