Keeping track of the size of a Kafka topic and watching it vary over time helped us uncover unexpected patterns that might seem mysterious to an inexperienced cluster administrator. Even with limits set, the reported size can vary widely. Data is deleted in periodic spikes whose frequency varies from topic to topic. Deletion itself can be even more disconcerting: under certain conditions, information long past the retention period can be unearthed from the depths of a topic.
In this blog post, we'll share a few scenarios that left us scratching our heads. They were not obvious at first sight, and in some cases we had to read Kafka’s source code to shed light on what was going on.
Overview of Kafka Architecture
Before we jump into the juicy details, let’s quickly review how Kafka works and stores its information. A Kafka cluster is made up of one or more Kafka brokers. Clusters contain topics, which act like message queues that client applications write to and read from. Topics are divided into partitions, and these partitions are distributed among the brokers. A broker does not usually own every partition of every topic; it only holds some of the partitions of each topic.
A partition can be replicated to other brokers, and that’s how Kafka provides redundancy: if a broker fails, copies of the partitions it held can be found spread across the rest of the brokers. Each partition is stored on disk as a series of ‘segments’, which are 1 GB in size by default. So the bigger the partition, the more segments it is going to have on disk.
There is plenty of information on how Kafka works internally, so our modest overview will not delve any deeper. Without further ado, we will talk now about the intriguing scenarios that startled us when our Kafka topics started growing in size and the deletion policies were not acting as we expected.
Spikes of Deletion
We’ll start with a situation where topics would experience spikes of deletion that differed in frequency, even though their configuration was exactly the same. As these topics are configured to keep data for a week, we were expecting a constant drop of information instead of the spikes, similar to dozens of other topics we have in our cluster. So why were we seeing spikes?
We noticed this scenario when we started accumulating data and were able to see the retention and cleaning policies in action. In Figure 1 we show the size of four topics, including replication. As you can see, two of them are spiky while the other two follow an expected pattern.
Originally all of them were spiky, but when we found the reason for the spikes, we were able to re-configure two of the topics so as to get the expected behavior.
So, what’s the reason behind these spikes? Let’s understand this by studying the purple line in Figure 1, which represents a topic whose size varies from 16 GB to 32 GB. It’s a topic with 32 partitions and a replication factor of 3. The rest of the parameters have their default values. By default, segments are rolled out when they reach 1 GB in size or when they are one week old, whichever comes first.
The key to understanding the spikes is to notice that the deletion policies do not act on active segments. If a segment doesn’t get full, it will not be rolled out until the number of hours defined in log.roll.hours has passed, the default being one week. So the spikes that we see in Figure 1 are the sum of the sizes of the segments, across all the topic’s partitions, that don’t reach 1 GB in seven days. We see the big drops because they all get rolled out at the same time once a week, when that time period is met.
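The roll rule described above can be sketched in a few lines of Python. This is a simplified model rather than Kafka’s own code: the function name and the steady ingest rate are assumptions made for illustration, and only the 1 GB size limit and the one-week log.roll.hours default come from the discussion above.

```python
GIB = 1024 ** 3

def hours_until_roll(bytes_per_hour, segment_bytes=GIB, roll_hours=168):
    """Hours until the active segment of one partition is rolled.

    Hypothetical helper: assumes a steady, positive ingest rate and models
    only the two triggers discussed in the text (size limit or age limit,
    whichever is reached first).
    """
    if bytes_per_hour <= 0:
        raise ValueError("bytes_per_hour must be positive")
    hours_to_fill = segment_bytes / bytes_per_hour
    return min(hours_to_fill, float(roll_hours))

# A slow partition (about 170 MiB per week) never fills its segment,
# so it is rolled by age.
slow = hours_until_roll(170 * 1024 ** 2 / 168)

# A faster partition that fills 1 GiB in five days is rolled by size.
fast = hours_until_roll(GIB / 120)
```

Under these assumptions the slow partition rolls after the full 168 hours, while the fast one rolls after roughly 120 hours, which is the difference in spike frequency between topics.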
If the deletion spikes occur more frequently, let's say every five days, it's because the segments of that topic reached their maximum size and were rolled out before the week was over. That was the case for other topics we have: the amount of data that we would see deleted was huge, sometimes 60 GB or more. After researching this we understood that it was because the segments were getting full. With 32 partitions, each replicated 3 times, there are 96 segment replicas of up to 1 GB each, and usually all of them are rolled out at about the same time.
Spikes by themselves are harmless, but it’s better to have topics that don’t swing in size so we can use the disk space more efficiently. The spikes can be avoided by decreasing the value of the segment.bytes parameter, which specifies the segment size, to a value in accordance with the amount of data to be handled by the topic. In Figure 1, the segment size for the two topics that are not spiky was reduced to 128 MB.
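One way to pick a value “in accordance with the amount of data” is to start from how often you would like segments to roll. The helper below is only a rule-of-thumb sketch: its name, the steady-rate assumption, and the 1 MiB floor are ours and are not taken from Kafka or from our cluster configuration.

```python
MIB = 1024 ** 2

def suggest_segment_bytes(bytes_per_hour, target_roll_hours, floor_bytes=MIB):
    """Segment size that one partition would fill in about target_roll_hours.

    Hypothetical helper: bytes_per_hour is the steady ingest rate of a
    single partition, and floor_bytes is an arbitrary lower bound chosen
    here to avoid suggesting tiny segments.
    """
    if bytes_per_hour < 0 or target_roll_hours <= 0:
        raise ValueError("rate must be >= 0 and target_roll_hours > 0")
    wanted = round(bytes_per_hour * target_roll_hours)
    return max(wanted, floor_bytes)

# A partition receiving 128 MiB per day, with segments rolling about daily.
size = suggest_segment_bytes(128 * MIB / 24, target_roll_hours=24)

# Rendered as the topic-level setting discussed in the text.
setting = f"segment.bytes={size}"
```

Smaller segments roll and expire more often, so each deletion removes less data at once. The trade-off is more files per partition, which is why the value should follow the topic’s actual volume rather than be made as small as possible.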
When Topics Grow
The second situation we’ll discuss is related to the size of certain topics, which were bigger than expected. Some applications we developed need to load and process all the data from a topic before being able to work, and due to the unusual size of the topic, these applications took almost a minute to bootstrap instead of the near-instantaneous start that we were expecting. Having big topics was detrimental to the applications’ performance. In order to clean up older records and thereby restrict the topic size, we use Kafka’s delete policy. In this particular scenario, even though we had configured the maximum size of the topic or the maximum time to wait before deleting log files, the topic would consistently grow larger than those limits suggested.
After some research, we found out that the delete policy doesn’t have fine granularity: it can delete whole segments, but not old data inside a segment. As such, it will only delete segments in which all of the information is older than the retention time. In other words, Kafka deletes a segment as a whole only when the last record added to it is older than the specified retention time. Naturally, this behavior creates a lot of fluctuation in the size of a topic, sometimes resulting in unexpectedly large sizes. Once again, reducing the value of segment.bytes helps in achieving a consistent size.
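The segment-level granularity can be illustrated with a small model. This is a sketch of the behavior as we describe it, not Kafka’s implementation: the Segment class, its field names, and the choice to scan oldest first and stop at the first segment that must be kept are all assumptions made for the example.

```python
from dataclasses import dataclass

DAY_MS = 24 * 60 * 60 * 1000

@dataclass
class Segment:
    name: str
    newest_record_ms: int   # timestamp of the last record appended
    active: bool = False    # the segment currently being written to

def expired_segments(segments, now_ms, retention_ms):
    """Return the segments a time-based delete pass would remove.

    Segments are expected oldest first. A segment qualifies only when it
    is not active and even its newest record is past the retention time.
    """
    expired = []
    for segment in segments:
        still_live = now_ms - segment.newest_record_ms <= retention_ms
        if segment.active or still_live:
            break  # everything from here on is kept whole
        expired.append(segment)
    return expired

now = 10 * DAY_MS
log = [
    Segment("00000", newest_record_ms=2 * DAY_MS),  # newest record is 8 days old
    Segment("00100", newest_record_ms=5 * DAY_MS),  # may hold older records too
    Segment("00200", newest_record_ms=10 * DAY_MS, active=True),
]
removed = expired_segments(log, now, retention_ms=7 * DAY_MS)
```

With a seven-day retention, only the first segment is removed. The second one is kept in full, including any records in it that are already older than seven days, because its newest record is only five days old.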
Old Data in Topics Not Being Deleted
The last situation we’ll discuss is the most mysterious one. We had configured a topic with a retention period of one week. During a normal week it would receive very little data, so in theory, the topic would always load fast and take up just a few megabytes of space. It turned out that the topic was neither small nor limited to a week of information: we were able to mysteriously retrieve information several months old! The protagonist of this anecdote is a topic configured with a cleanup policy of compact,delete.
Even though we were using a compact,delete policy with a one-week retention period, we were seeing messages several months old. How could this happen? Well, as the name of the policy implies, Kafka will first run the compact process and then the delete process. Compaction works by analyzing the whole segment and leaving only the latest entry for each message key. The older entries get deleted. In order to avoid having multiple smaller segments, the compact policy will merge them.
The described behavior has the following consequence. During the compact phase, records at the beginning of the topic whose keys are never written again, located in the first segments of a partition, will end up being merged into segments that contain newer records. The newer segments, of course, will have keys that are still within the retention time frame. After compaction is finished, Kafka will trigger the delete policy. We already mentioned that this policy thinks in segments, not in individual messages, and that it will not delete segments that contain information that has not expired. As the oldest segment now contains both old and new records, it won't be deleted, hence the consumer of this topic will see information that is very old! A workaround for avoiding this corner case is, once again, to reduce the value of segment.bytes.
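The interaction can be reproduced with a toy model. The record layout, function names, and timestamps below are invented for illustration, and the real log cleaner is considerably more involved (for instance, it decides which segments to group based on their sizes), so treat this as a sketch of the effect only.

```python
from typing import List, Tuple

Record = Tuple[int, str, int]  # (offset, key, timestamp in days)

def compact_and_merge(segments: List[List[Record]]) -> List[Record]:
    """Keep the latest record per key and merge the survivors into one segment."""
    latest_offset = {}
    for segment in segments:
        for offset, key, _ in segment:
            latest_offset[key] = offset  # later offsets overwrite earlier ones
    return [
        record
        for segment in segments
        for record in segment
        if latest_offset[record[1]] == record[0]
    ]

def is_deletable(segment: List[Record], now_days: int, retention_days: int) -> bool:
    """A segment expires only when its newest record is past retention."""
    newest = max(timestamp for _, _, timestamp in segment)
    return now_days - newest > retention_days

old_segment = [(0, "a", 0), (1, "b", 1), (2, "a", 2)]
new_segment = [(3, "b", 95), (4, "c", 96)]

# Before compaction, the old segment on its own would expire.
expired_before = is_deletable(old_segment, now_days=100, retention_days=7)

# After compaction, key "a" (written on day 2) shares a segment with fresh
# records, so the merged segment is kept.
merged = compact_and_merge([old_segment, new_segment])
expired_after = is_deletable(merged, now_days=100, retention_days=7)
```

In this example the record for key "a" is 98 days old on day 100, yet it survives a seven-day retention because the merged segment’s newest record is only four days old.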
When creating a new topic, the number of partitions is not the only important parameter to configure. The segment size is a key parameter whenever the amount of data a partition receives during the retention period isn't expected to surpass the default segment size. Also, when considering the total size of a topic, you have to take into account the number of partitions, the deletion policy, and the replication factor.
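These factors can be combined into a back-of-the-envelope estimate. The model below is our own simplification for a topic using the delete policy with time-based retention: it assumes a steady ingest rate per partition, ignores size-based retention, index files, and compaction, and uses a hypothetical function name. It treats a segment as staying on disk until its newest record expires, so the oldest data on disk can be as old as the retention time plus the time one segment stays active.

```python
GIB = 1024 ** 3

def topic_size_bounds(bytes_per_hour, partitions, replication_factor,
                      retention_hours=168, segment_bytes=GIB, roll_hours=168):
    """Rough (low, high) on-disk size of a topic, replicas included.

    Hypothetical helper. bytes_per_hour is the steady ingest rate of a
    single partition. The low point is reached right after expired
    segments are deleted, the high point right before.
    """
    if bytes_per_hour <= 0:
        raise ValueError("bytes_per_hour must be positive")
    replicas = partitions * replication_factor
    # Time one segment stays active: filled by size or rolled by age.
    roll_span = min(segment_bytes / bytes_per_hour, roll_hours)
    low = bytes_per_hour * retention_hours * replicas
    high = bytes_per_hour * (retention_hours + roll_span) * replicas
    return low, high

# Ingest rate back-solved from the 16 GB low point of the 32-partition,
# replication-factor-3 topic discussed earlier (an assumption, not a measurement).
rate = 16 * GIB / (32 * 3) / 168
low, high = topic_size_bounds(rate, partitions=32, replication_factor=3)
```

With default settings and this assumed rate, the model swings between about 16 GiB and 32 GiB, which matches the range we observed for that topic. Passing a smaller segment_bytes shortens the time a segment stays active and therefore narrows the gap between the two bounds.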
Our initial confusion came from not understanding in depth how the deletion policies work. There is a lot of documentation on how the compact policy works but not that much regarding the delete policy. Mainly, what compact does is mark some records in a segment for deletion. If the segments become too small, Kafka will merge them.
The main confusion about the delete policy comes from assuming that Kafka will always delete data older than the specified retention time. In particular, it's a mistake to assume that the log manager will delete old data from segments that also hold data newer than the specified retention time frame.
Each of these “Kafkaesque” situations was tracked down and a perfectly logical explanation was found. For some of them, the reason was a misinterpretation of the documentation; for others, it was the obscure behavior that Kafka can have under certain conditions. As one would expect, there are continuous enhancements in Kafka itself, and it is possible that these scenarios have been addressed in newer versions.