— Kafka, Distributed Systems — 8 min read
Apache Kafka is a beautiful system. It scales well, it is stable, and it provides phenomenal architectural flexibility. After 5 years of running production Kafka clusters, I have collected a list of tips and pitfalls. Some of them were learnt the hard way. If you work in a small team rolling out Kafka to production, they might prove useful. The article assumes familiarity with basic Kafka concepts, such as brokers, topics, producers and consumers. The points below should hold for versions up to Kafka 2.6.0. Without further ado.
Kafka topics consist of a (configurable) number of partitions. If the partition number is not provided by the user, KafkaProducer chooses the partition for the message using the key in the ProducerRecord instance passed to it.
Check out the default implementation:
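Roughly, it boils down to the following simplified Scala sketch; this is not the actual source (the real thing is Java, in org.apache.kafka.clients.producer.internals.DefaultPartitioner), and stickyPartition here stands in for the sticky partition cache:

import org.apache.kafka.common.utils.Utils

// Simplified sketch of the default partitioning logic, not the actual Kafka code.
def choosePartition(keyBytes: Array[Byte], numPartitions: Int, stickyPartition: => Int): Int =
  if (keyBytes == null)
    stickyPartition // sticky partitioner: reuse one randomly chosen partition per batch
  else
    Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions // deterministic for a given key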
It checks if the key is null, and if it isn't, computes a murmur2 hash modulo the number of partitions. This is consistent; it will yield the same result for messages sharing the same key. If the key is null though, it uses a sticky partitioner that chooses a partition randomly at every batch. In practical terms, if no key is passed, the producer will choose the partition randomly.
This has important implications because Kafka only provides delivery ordering guarantees within a partition. Messages in the same partition will be delivered in the order they were committed. Messages in different partitions will be delivered in non-deterministic order. If the messages have any form of causal relationship between them, and they are not in the same partition, then any downstream consumer will have to collect all messages for a key before processing them, else causal consistency might be violated; whatever all means in the context of an infinite stream.
As an example to illustrate the above, think of a simple event like the following:
case class EmailSubscription(
  email: Email,
  active: Boolean,
  createdOn: DateTime
)

def storeToDB(sub: EmailSubscription) = ???
val events: Stream[EmailSubscription] = ???
If events arrive in causal order per email, then we can map them statelessly:
events.map(storeToDB)
If per-email causal order is not guaranteed though, we need to maintain enough state to know if the event we see is indeed the latest. Else if the order is reversed, we might send an email to a user who has unsubscribed. For example:
events
  .groupMapReduce(_.email)(identity)(
    (e1, e2) => if (e1.createdOn.isAfter(e2.createdOn)) e1 else e2
  )
  .values
  .map(storeToDB)
If we key messages using email, then Kafka will deliver all messages for a single email in the order they were inserted. Our consumer can be completely stateless; it can fetch messages from Kafka and store them to a datastore.
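A hedged sketch of the producer side (assuming Email wraps a raw string in a value field, as the json later in this article suggests, and with an illustrative broker address, topic name and serializers):

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}

val props = new Properties()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
val producer = new KafkaProducer[String, String](props)

def serialize(sub: EmailSubscription): String = ??? // e.g. a json encoder

def publish(sub: EmailSubscription): Unit =
  // key by email so every event for a given address lands in the same partition
  producer.send(new ProducerRecord("emailsubscriptions", sub.email.value, serialize(sub)))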
If the key is null (none provided), our stateless consumer will happily store a message from the past. To mitigate that, at the very minimum we need to maintain the latest createdOn date for every email. Relying on wall clocks for causality is a very bad idea.
Assuming per key ordering can be guaranteed, downstream reducer bounds can be reduced from a Semilattice to a Semigroup. In practical terms, by taking advantage of this property we can drop the commutativity requirement, which unlocks easier implementations.
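As a tiny sketch, using a hand-rolled Semigroup trait rather than any particular library:

trait Semigroup[A] { def combine(x: A, y: A): A }

// Last-write-wins: keep whichever event arrived later. Associative, so a lawful
// Semigroup, but not commutative, so it would not qualify as a Semilattice.
// It is only correct because Kafka delivers the events for one key in order.
val lastWriteWins: Semigroup[EmailSubscription] = (_, latest) => latest

Compare that with the groupMapReduce reducer above, which has to be commutative (a max by createdOn) precisely because it cannot trust arrival order.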
If designing reducers for Kafka consumers sounds interesting, let me know and I will write about it.
Providing a key is not enough. Partitioners (ie the function f: (key) => partition) are configurable. Kafka provides a few and users can roll their own. Do NOT assume all producers are using the same partitioner.
In a complex system where Go services, Python services, Spark and other wild animals all share the same Kafka cluster, all sorts of different implementations might exist. If different services are pushing data to the same topics, an integration test would be very useful. If things go wrong, delivery to consumers will be non-deterministic and debugging it can be pure hell.
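On the JVM, for instance, the partitioner is just another producer config value; a hedged sketch (the custom class name is made up):

import java.util.Properties
import org.apache.kafka.clients.producer.ProducerConfig

val props = new Properties()
// JVM producers take their partitioner from config; clients in other languages
// ship their own defaults and may hash keys differently.
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.example.MyCustomPartitioner") // hypothetical class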
The beauty of Kafka is that data can be reprocessed as many times as needed. This forgives a lot of errors. To illustrate this one, let's assume the same dummy model:
case class EmailSubscription(
  email: Email,
  active: Boolean,
  createdOn: DateTime
)
Let's now assume messages are serialized to json before being pushed to Kafka. Due to a json serialization bug, the following is pushed to the emailsubscriptions topic:
1{2 "email": {3 "value": "aris@aris.com"4 },5 "active": false,6 "createdOn": 16000000007}
Instead of the expected:
1{2 "email": "aris@aris.com",3 "active": false,4 "createdOn": 16000000005}
Downstream consumers try to deserialize the message and fail. What can be done?
One solution would be to create a custom deserializer for those buggy instances and add it to all the consumers. That's non-trivial, error-prone code that is useful just for this one incident.
Another solution would be to implement a consumer that would read from emailsubscriptions-1, fix the issue, and write to emailsubscriptions-2.
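A hedged sketch of such a migration job, with the poll loop and the fix-up heavily simplified and the json handled as plain strings:

import java.time.Duration
import java.util.Properties
import scala.jdk.CollectionConverters._
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}

val serde = "org.apache.kafka.common.serialization."

val consumerProps = new Properties()
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "emailsubscriptions-migration")
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, serde + "StringDeserializer")
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, serde + "StringDeserializer")

val producerProps = new Properties()
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, serde + "StringSerializer")
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, serde + "StringSerializer")

val consumer = new KafkaConsumer[String, String](consumerProps)
val producer = new KafkaProducer[String, String](producerProps)

def fix(brokenJson: String): String = ??? // unwrap {"email": {"value": ...}} back into {"email": ...}

consumer.subscribe(List("emailsubscriptions-1").asJava)
while (true) { // run until the new topic has caught up
  consumer.poll(Duration.ofSeconds(1)).asScala.foreach { record =>
    producer.send(new ProducerRecord("emailsubscriptions-2", record.key(), fix(record.value())))
  }
}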
Once the offsets of the two topics are identical, producers and consumers can switch from emailsubscriptions-1 to emailsubscriptions-2 without having to update any code.
What's great about this is that those migrations can fail with no major consequences. If emailsubscriptions-2 is no good, we can run it again and produce emailsubscriptions-3, and so on.
This trick also works for non-trivial schema changes, migrations and other data enrichments.
Avro and Protobuf can help in some cases, but bugs will occur and requirements will evolve in unpredictable ways.
In any case, "fixing" a topic's data by reading from it and publishing to it is rarely a good idea.
Topics should be immutable, and versioning them can help in many situations where a topic's content has been corrupted.
Up until at least 2.6.0, Kafka relies on ZooKeeper. Losing connection to ZooKeeper means no ISRs (In-Sync-Replicas, more on that later), no partition leader election and eventually the brokers shut down. Thankfully @fpjunqueira and his team who created ZooKeeper are real pros, and that won't happen without reason. In fact, ZooKeeper is one of the most reliable distributed systems (that I have seen at least).
The following two mess-ups have occurred though:
To clean up older transaction log snapshots in ZooKeeper 3.4.x, ZooKeeper provides the following tool:
java -cp zookeeper-3.4.x.jar:lib/*:conf org.apache.zookeeper.server.PurgeTxnLog \
  /var/lib/zookeeper /var/lib/zookeeper -n 5
Ideally on a cron.
Those are just two examples. A lot more can go wrong. Because of the consequences of failure, proper JMX metric monitoring and real time log aggregation, all hooked up to a form of PagerDuty, are very highly recommended.
This is essentially a trade off between availability and durability. Let's start with unclean elections.
Let's assume we have a topic with a single partition and a single follower replica (replication factor 2). Data is happily flowing in. If the follower is "in-sync" (aka identical to the leader and in the ISR set in ZooKeeper), then if the leader becomes unavailable (eg the broker crashes) the follower can pick up, accept writes and continue with no downtime. If the follower lags behind though, the leader will remove it from the ISRs in ZooKeeper. If the leader then goes down, there are two options: either the out-of-sync follower is elected leader anyway (an "unclean" election, controlled by unclean.leader.election.enable), which keeps the partition available but silently drops the writes the follower never replicated, or the partition stays offline until the old leader comes back, which preserves the data at the cost of downtime.
It entirely depends on the kind of data the topic holds. If the topic holds system metrics, then maybe the most recent data is more valuable and thus losing some older writes might be acceptable. If the topic contains bank transactions, going down until a human intervenes might be a better option. This is a broker level config that can be overridden per topic.
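As a hedged sketch of a per-topic override via the AdminClient (the topic name is illustrative):

import java.util.Properties
import scala.jdk.CollectionConverters._
import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, AlterConfigOp, ConfigEntry}
import org.apache.kafka.common.config.ConfigResource

val adminProps = new Properties()
adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
val admin = Admin.create(adminProps)

// Favour durability for this topic, regardless of the broker-wide default.
val topic = new ConfigResource(ConfigResource.Type.TOPIC, "payments")
val ops: java.util.Collection[AlterConfigOp] =
  List(new AlterConfigOp(new ConfigEntry("unclean.leader.election.enable", "false"), AlterConfigOp.OpType.SET)).asJava
admin.incrementalAlterConfigs(Map(topic -> ops).asJava).all().get()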
The second part of this equation is min.insync.replicas, which represents the minimum number of replicas that have to be in-sync for a write to go through. This is configurable at the broker level and the topic level, and it works together with the producer-level acks setting (it is only enforced when the producer requests acks=all). Same considerations as above: if the topic holds payments, having just 1 replica with all the data might be risky.
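On the producer side, the knob that pairs with min.insync.replicas is acks; a minimal sketch (broker address illustrative):

import java.util.Properties
import org.apache.kafka.clients.producer.ProducerConfig

val props = new Properties()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
// acks=all: the broker only acknowledges the write once min.insync.replicas
// replicas have it; with fewer in-sync replicas the write is rejected instead.
props.put(ProducerConfig.ACKS_CONFIG, "all")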
Legendary distributed systems researcher Kyle Kingsbury, aka Aphyr, did an excellent analysis of Kafka's replication mechanism some 7 years ago. If you wish to dig deeper into this trade off, reading Aphyr's piece is very highly recommended. As far as I understand, the basic trade offs discussed still hold true today.
Kafka uses a LOT of those. Running out of them leads to a fatal runtime exception that will kill the broker. If the OS defaults are used, it is extremely likely that those limits will be reached as soon as the cluster has a few tens of thousands of segments per broker. What's worse is that in a well-balanced cluster where brokers hold similar numbers of partitions, those failures will occur at roughly the same time. Let's look a bit closer:
vm.max_map_count: the maximum number of memory map areas a process can have.
From the Linux kernel docs:
max_map_count:
This file contains the maximum number of memory map areas a process may have. Memory map areas are used as a side-effect of calling malloc, directly by mmap, mprotect, and madvise, and also when loading shared libraries. While most applications need less than a thousand maps, certain programs, particularly malloc debuggers, may consume lots of them, e.g., up to one or two maps per allocation.
The default value is 65536.
Each log segment requires an index file and a timeindex file; each of those requires 1 map area.
Each partition contains a number of log segments. How often Kafka closes a segment and opens a new one depends on segment.bytes and segment.ms.
The defaults are 1GB and 7 days. If, for example, retention is set to a year and the segments are set to roll daily, then one partition might have 365 log segments a year later. Each of those will require 2 map areas. If a broker has 1000 partitions like this one, it will require 365 x 2 x 1000 = 730,000 map areas.
This might sound high, but if Kafka serves as the central nervous system of a tech organisation, having thousands of partitions and tens of thousands of log segments is not uncommon.
To see how many maps a broker is currently consuming, given the pid of Kafka:
cat /proc/:pid/maps | wc -l
Eg:
[root@ip-* kafka-logs]# cat /proc/12054/maps | wc -l
10790
To find the current limit:
[root@ip-* kafka-logs]# sysctl vm.max_map_count
vm.max_map_count = 1048576
Note: this might not be the limit with which the process was started. To increase the limit for the current session:
sysctl -w vm.max_map_count=1048576
Or to change it permanently:
1echo "vm.max_map_count=1048576" >> /etc/sysctl.conf
After each change the broker needs to be restarted.
If things go wrong, the logs will be littered with the following:
java.io.IOException: Map failed
  at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:940)
  at kafka.log.AbstractIndex$$anonfun$resize$1.apply(AbstractIndex.scala:115)
  at kafka.log.AbstractIndex$$anonfun$resize$1.apply(AbstractIndex.scala:105)
  at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:213)
  at kafka.log.AbstractIndex.resize(AbstractIndex.scala:105)
  at kafka.log.AbstractIndex$$anonfun$trimToValidSize$1.apply$mcV$sp(AbstractIndex.scala:167)
  at kafka.log.AbstractIndex$$anonfun$trimToValidSize$1.apply(AbstractIndex.scala:167)
  at kafka.log.AbstractIndex$$anonfun$trimToValidSize$1.apply(AbstractIndex.scala:167)
  at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:213)
  at kafka.log.AbstractIndex.trimToValidSize(AbstractIndex.scala:166)
  at kafka.log.AbstractIndex.close(AbstractIndex.scala:178)
  at kafka.log.LogSegment$$anonfun$close$2.apply$mcV$sp(LogSegment.scala:477)
  ... 31 more
Or:
Caused by: java.io.IOException: Map failed
  at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:940)
  at kafka.log.AbstractIndex.<init>(AbstractIndex.scala:63)
  at kafka.log.OffsetIndex.<init>(OffsetIndex.scala:52)
  at kafka.log.LogSegment.<init>(LogSegment.scala:77)
  at kafka.log.Log.roll(Log.scala:1238)
  at kafka.log.Log.maybeRoll(Log.scala:1194)
  at kafka.log.Log.append(Log.scala:652)
  ... 22 more
Caused by: java.lang.OutOfMemoryError: Map failed
  at sun.nio.ch.FileChannelImpl.map0(Native Method)
  at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:937)
  ... 28 more
The quick solution is to increase the maximum map areas and restart the broker. The actual solution is to review segment sizes, retention policies and segment rolling schedules to spot inefficiencies.
From Kafka's OS configuration guide:
File descriptor limits: Kafka uses file descriptors for log segments and open connections. If a broker hosts many partitions, consider that the broker needs at least (number_of_partitions)*(partition_size/segment_size) to track all log segments in addition to the number of connections the broker makes. We recommend at least 100000 allowed file descriptors for the broker processes as a starting point.
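To make the quoted formula concrete, here is a back-of-the-envelope sketch with made-up numbers:

// Hypothetical broker: 1000 partitions, ~50GB per partition, 1GB segments.
val partitions    = 1000L
val partitionSize = 50L * 1024 * 1024 * 1024
val segmentSize   = 1L * 1024 * 1024 * 1024
val segmentDescriptors = partitions * (partitionSize / segmentSize) // = 50000, before counting connections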
To see the limit for the process, given the pid:
cat /proc/:pid/limits | grep files
Production brokers I have seen consume roughly 400K of those. 100K might not be enough in cases where a large number of consumers and segments are in play. In any case:
#<domain> <type> <item> <value>
root soft nofile 262144
root hard nofile 1024288
can be added to /etc/security/limits.d/root.conf to adjust those limits.
Note: Both map areas and file descriptors are OS dependent and vary depending on where Kafka is running.
In cases where Kafka is deployed on Kubernetes or Mesos, the container orchestration layer might introduce additional constraints.
A good approach would be to set segment.bytes and segment.ms to tiny values on a test environment and then create topics and consumers programmatically until Kafka breaks.
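A hedged sketch of that with the AdminClient (counts, sizes and topic names are arbitrary):

import java.util.Properties
import scala.jdk.CollectionConverters._
import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, NewTopic}

val adminProps = new Properties()
adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
val admin = Admin.create(adminProps)

// Tiny segments force Kafka to roll (and mmap) new segment files very quickly.
val segmentConfig = Map("segment.bytes" -> "1048576", "segment.ms" -> "60000").asJava
val topics = (1 to 500).map(i => new NewTopic(s"stress-test-$i", 10, 1.toShort).configs(segmentConfig))
admin.createTopics(topics.asJava).all().get()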
The purpose of Log Compaction is to retain the last known value for every key in a partition. This process retains the most valuable data, saves space and speeds up reprocessing the topics. For Log Compaction to work properly, keys and partitioning need to be configured (see (1.)). In practice, it is a good candidate for use cases where a) we need to retain an entity forever and b) this entity is updated often. Examples range from user profiles, to account balances, to stats aggregations.
For log compaction to work, first make sure it is enabled in server.properties (this is the default since 0.9.0):
log.cleaner.enable=true
A second important config is the total memory used amongst all log cleaner threads:
log.cleaner.dedupe.buffer.size=536870912
Empirically, the default 128MB is a bit too little if compaction is used aggressively.
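Compaction itself is opted into per topic via cleanup.policy=compact; a hedged sketch (partition count and topic name are illustrative):

import java.util.Properties
import scala.jdk.CollectionConverters._
import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, NewTopic}

val adminProps = new Properties()
adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
val admin = Admin.create(adminProps)

// A compacted topic keeps the latest value per key instead of expiring data by time or size.
val compacted = new NewTopic("user-profiles", 12, 3.toShort)
  .configs(Map("cleanup.policy" -> "compact").asJava)
admin.createTopics(List(compacted).asJava).all().get()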
Most importantly, the Log Cleaner manager thread can die without killing the broker.
If the broker relies on Log Compaction, then make sure log-cleaner.log is monitored for errors.
If those threads die, then the broker might sit on a ticking disk bomb.
Thank you for reading!
If something isn't right in the above, please let me know. Ditto for general feedback, or requests to expand on anything.