Log the sizes allocated through the SimpleMemoryPool for analysis. #380
wyuka wants to merge 2 commits into
Conversation
| public class MemoryPoolStatsStore {
|     private static final Logger log = LoggerFactory.getLogger(MemoryPoolStatsStore.class);
|
|     private final AtomicInteger[] histogram;
Is there a reason to not use the metrics object but implement this by hand?
Yes. Kafka metrics support publishing histograms only as percentiles, which is not useful for determining the range of the most frequent allocation sizes. For example, suppose around 60% of requests are 1 kb in size. If the metrics report a p90 request size of 400k and a p99 of 1500k, we cannot infer from that alone that most requests fall within the 1 kb range.
Hence, we are publishing the full histogram as logs.
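The hand-rolled approach discussed above can be sketched as follows. This is an illustrative reconstruction, not the PR's actual code: the class and method names (`AllocationHistogramSketch`, `record`, `countAt`) are hypothetical, but the lock-free `AtomicInteger[]` buckets and the `(bytes - 1) / segmentSizeBytes` indexing mirror what the diff shows.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative sketch of a lock-free, linear-bucket histogram of allocation
// sizes, in the spirit of the AtomicInteger[] field quoted above.
public class AllocationHistogramSketch {
    private final AtomicInteger[] histogram;
    private final int segmentSizeBytes;
    private final int maxSizeBytes;

    public AllocationHistogramSketch(int segmentSizeBytes, int numSegments) {
        this.segmentSizeBytes = segmentSizeBytes;
        this.maxSizeBytes = segmentSizeBytes * numSegments;
        this.histogram = new AtomicInteger[numSegments];
        for (int i = 0; i < numSegments; i++) {
            histogram[i] = new AtomicInteger();
        }
    }

    // Maps a request size to a bucket index, mirroring the PR's
    // (bytes - 1) / segmentSizeBytes formula; -1 means out of recorded range.
    public int bucketFor(int bytes) {
        if (bytes <= 0 || bytes > maxSizeBytes) {
            return -1;
        }
        return (bytes - 1) / segmentSizeBytes;
    }

    public void record(int bytes) {
        int bucket = bucketFor(bytes);
        if (bucket >= 0) {
            histogram[bucket].incrementAndGet(); // lock-free update
        }
    }

    public int countAt(int bucket) {
        return histogram[bucket].get();
    }
}
```

With 1024-byte segments, a 1-byte and a 1024-byte request both land in bucket 0, while a 1025-byte request lands in bucket 1; unlike a percentile sensor, the full per-bucket counts survive and can be logged.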
| @@ -114,6 +119,7 @@ public boolean isOutOfMemory() {
|     //allows subclasses to do their own bookkeeping (and validation) _before_ memory is returned to client code.
|     protected void bufferToBeReturned(ByteBuffer justAllocated) {
|         this.allocateSensor.record(justAllocated.capacity());
Are the metrics from this allocateSensor not good enough?
If so, can you please explain why?
This sensor relies on the Stat class, and it appears it can only record aggregate statistics like max, sum, and avg, not the entire histogram.
|     memoryPoolStatsStore.clear()
| }
|
| val histogramPublisher = new KafkaScheduler(threads = 1, "histogram-publisher-")
Can we reuse the same scheduler, since its startup and shutdown are already taken care of?
Yeah, we can do that. I was not sure myself, because we use this kafkaScheduler in some places inside KafkaServer, while in other places we allocate a new KafkaScheduler for other purposes, so I was confused as to which way to go.
I have fixed it.
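The trade-off in the exchange above (one shared scheduler whose lifecycle is managed centrally, versus a dedicated one per feature) can be sketched with the JDK's ScheduledExecutorService, since KafkaScheduler itself is Scala and internal to Kafka. The class and method names here are hypothetical, not the PR's code.

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Illustrative sketch: scheduling the periodic histogram-logging task on a
// scheduler supplied by the caller, analogous to reusing the server-wide
// KafkaScheduler instead of creating a dedicated "histogram-publisher-" one.
public class PeriodicStatsPublisher {
    public static void start(Runnable logStats,
                             long periodMs,
                             ScheduledExecutorService sharedScheduler) {
        // Because the scheduler is owned by the caller, its startup and
        // shutdown are handled in one place, as the review comment suggests.
        sharedScheduler.scheduleAtFixedRate(logStats, periodMs, periodMs, TimeUnit.MILLISECONDS);
    }
}
```

The design choice is the same either way the PR went: a dedicated scheduler isolates the task but adds another lifecycle to manage; a shared one piggybacks on existing startup/shutdown handling at the cost of sharing threads with other periodic work.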
| class MemoryPoolStatsLogger extends Logging {
|     override lazy val logger = MemoryPoolStatsLogger.logger
|
|     def logStats(memoryPoolStatsStore: MemoryPoolStatsStore): Unit = {
It seems everything in this class can be moved into the object MemoryPoolStatsLogger.
|     log.debug("Requested bytes {} for allocation exceeds maximum recorded value {}", bytes, maxSizeBytes);
|     return -1;
| } else {
|     return (bytes - 1) / segmentSizeBytes;
Would it be better to record the bytes on a log scale instead of a linear scale?
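The reviewer's log-scale suggestion can be sketched as a power-of-two bucketing, computed without floating point via Integer.numberOfLeadingZeros. This is an illustration of the suggested alternative, not code from the PR; the class and method names are hypothetical.

```java
public class LogScaleBuckets {
    // Power-of-two (log2) bucket index: bucket i covers sizes in (2^(i-1), 2^i],
    // so 1 -> 0, 2 -> 1, 3..4 -> 2, 5..8 -> 3, and so on. Returns -1 for
    // non-positive sizes. Illustrative only.
    public static int logBucketFor(int bytes) {
        if (bytes <= 0) {
            return -1;
        }
        // ceil(log2(bytes)), computed from the bit position of (bytes - 1)
        return 32 - Integer.numberOfLeadingZeros(bytes - 1);
    }
}
```

About 31 buckets then cover the whole positive int range, versus maxSizeBytes / segmentSizeBytes buckets for the linear scheme in the diff; the price is coarser resolution at large sizes, which may or may not matter given that the goal here is spotting the most frequent small allocation sizes.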
[LI-HOTFIX]
TICKET = LIKAFKA-43625
LI_DESCRIPTION = We want to re-use memory buffers for request deserialization. To do this, we need to gather statistics on the most frequent allocation sizes on current clusters, i.e. histograms of the number of requests for every allocation size. This change lets us configure Kafka to log this histogram at a configurable interval of a few minutes.
EXIT_CRITERIA = This PR will be reverted once we determine the right range of sizes to use for the memory pool as part of the work on the aforementioned ticket.