## Consumer 拉取消息的实现

### Consumer 拉取消息的数量

org.apache.kafka.clients.consumer.internals.Fetcher#fetchedRecords 可以看到 maxPollRecords(max.poll.records 配置) 变量限制了每次 poll 的消息条数，不管 consumer 对应多少个 partition，从所有 partition 拉取到的消息条数总和不会超过 maxPollRecords
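下面用一段纯 Python 的简化示意（并非 Kafka 源码，变量名均为假设）来模拟 fetchedRecords 的这一行为：多个 partition 的缓冲消息共享 maxPollRecords 这一个上限，取不完的留在缓冲区给下一次 poll。

```python
from collections import OrderedDict, deque

def fetched_records(buffered, max_poll_records):
    """简化示意:从各 partition 的缓冲队列中最多取出
    max_poll_records 条消息,所有 partition 共享这一上限。"""
    drained = []
    for tp, queue in buffered.items():
        while queue and len(drained) < max_poll_records:
            drained.append((tp, queue.popleft()))
        if len(drained) >= max_poll_records:
            break  # 剩余消息留在缓冲区,供下一次 poll 返回
    return drained

buffered = OrderedDict({
    "topic-0": deque(range(400)),
    "topic-1": deque(range(300)),
})
records = fetched_records(buffered, max_poll_records=500)
print(len(records))  # 500:两个 partition 合计不超过 max.poll.records
```

topic-0 的 400 条全部取出，topic-1 只取出 100 条，剩下 200 条等待下一次 poll。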

org.apache.kafka.clients.consumer.internals.Fetcher#sendFetches 可以看到 fetchSize(max.partition.fetch.bytes 配置)用于每次创建 FetchRequest 时 org.apache.kafka.common.requests.FetchRequest.PartitionData 的参数设置。fetchSize 限制了 consumer 每次从每个 partition 拉取的数据量。

The maximum amount of data per-partition the server will return. Records are fetched in batches by the consumer. If the first record batch in the first non-empty partition of the fetch is larger than this limit, the batch will still be returned to ensure that the consumer can make progress. The maximum record batch size accepted by the broker is defined via message.max.bytes (broker config) or max.message.bytes (topic config). See fetch.max.bytes for limiting the consumer request size.
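上面引用的文档里"第一个 batch 即使超限也会返回"这条规则，可以用一个简化示意来说明（并非 broker 真实实现，仅演示逻辑）：按字节上限累计 batch，但保证至少返回第一个 batch，避免 consumer 被一条大消息卡住无法前进。

```python
def select_batches(partition_batches, max_partition_fetch_bytes):
    """简化示意:模拟单个 partition 上按字节上限选取 record batch。
    即使第一个 batch 超过上限也会返回它,保证 consumer 能继续前进。"""
    selected, total = [], 0
    for batch_size in partition_batches:
        if selected and total + batch_size > max_partition_fetch_bytes:
            break  # 已有数据且再加就超限,停止
        selected.append(batch_size)
        total += batch_size
    return selected

# 第一个 batch 2MB,超过 1MB 上限,仍会被返回
print(select_batches([2_000_000, 500_000], 1_000_000))       # [2000000]
# 正常情况下按上限截断
print(select_batches([400_000, 400_000, 400_000], 1_000_000)) # [400000, 400000]
```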

## Consumer 的心跳机制

org.apache.kafka.clients.consumer.internals.AbstractCoordinator 中启动 HeartbeatThread 线程来定时发送心跳和检查 consumer 的状态。
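心跳机制可以用一个极简的示意来理解（并非 Kafka 实际实现）：后台线程按固定间隔更新心跳时间戳，协调者侧则用 session timeout 判断成员是否失效。

```python
import threading
import time

class HeartbeatThread(threading.Thread):
    """极简示意(非 Kafka 实际实现):后台线程按 heartbeat_interval
    定期记录一次心跳时间。"""
    def __init__(self, heartbeat_interval):
        super().__init__(daemon=True)
        self.heartbeat_interval = heartbeat_interval
        self.last_heartbeat = time.monotonic()
        self._stop = threading.Event()

    def run(self):
        # wait 超时返回 False,即每隔 heartbeat_interval "发送"一次心跳
        while not self._stop.wait(self.heartbeat_interval):
            self.last_heartbeat = time.monotonic()

    def stop(self):
        self._stop.set()

def is_dead(last_heartbeat, session_timeout):
    """协调者视角:距上次心跳超过 session_timeout 即判定失效。"""
    return time.monotonic() - last_heartbeat > session_timeout

hb = HeartbeatThread(heartbeat_interval=0.05)
hb.start()
time.sleep(0.2)
print(is_dead(hb.last_heartbeat, session_timeout=0.15))  # 心跳正常,未失效
hb.stop()
```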

## partition 的数量设置

• 一个 partition 只能被 Consumer Group 中的一个 consumer 消费，因此，为了提高并发量，可以提高 partition 的数量，但是这会造成 replica 副本拷贝的网络请求增加，故障恢复时的耗时增加。因为 kafka 使用 batch pull 的方式，所以单个线程的消费速率还是有保障的。并且 partition 数量过多，zk 维护 ISR 列表负载较重。

• partition 数量最好是 consumer 数目的整数倍，比如取 24，consumer 数目的设置就会灵活很多。

• consumer 消费消息时不是严格有序的。当从多个 partition 读数据时，kafka 只保证单个 partition 内数据有序，多个 partition 的消息消费整体上很可能就不是严格有序的了。
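"只保证单 partition 有序"可以用一个小例子直观说明（简化示意，模拟从两个 partition 交替拉取）：全局顺序是交错的，但每个 partition 内部 offset 仍然递增。

```python
# 两个 partition 各自按 offset 递增写入
p0 = [("p0", off) for off in range(3)]
p1 = [("p1", off) for off in range(3)]

# 简化示意:consumer 轮询两个 partition,消息交错到达
consumed = []
for a, b in zip(p0, p1):
    consumed += [a, b]

print(consumed)
# [('p0', 0), ('p1', 0), ('p0', 1), ('p1', 1), ('p0', 2), ('p1', 2)]
# 全局看 p0 和 p1 的消息交错,但单个 partition 的 offset 仍然有序
```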

## 参数设置

### session.timeout.ms

consumer session 过期时间。如果在超时时间内 broker 没有收到 consumer 的心跳，就会把该 consumer 判定为失效，并触发 consumer group 的负载均衡。在 0.10.1 之前，只有在调用 poll 方法时才会发送心跳，因此更大的 session 超时时间允许 consumer 在 poll 循环周期内处理消息内容，代价是检测失效需要花费更长时间；0.10.1 之后心跳改由后台的 HeartbeatThread 发送。如果想控制 consumer 处理消息的时间，可以使用下面的 max.poll.interval.ms 和 max.poll.records 参数。

### max.poll.interval.ms

This config sets the maximum delay between client calls to poll().

When the timeout expires, the consumer will stop sending heartbeats and send an explicit LeaveGroup request.

As soon as the consumer resumes processing with another call to poll(), the consumer will rejoin the group.

By increasing the interval between expected polls, you can give the consumer more time to handle a batch of records returned from poll(long). The drawback is that increasing this value may delay a group rebalance since the consumer will only join the rebalance inside the call to poll. You can use this setting to bound the time to finish a rebalance, but you risk slower progress if the consumer cannot actually call poll often enough.

### max.poll.records

Use this setting to limit the total records returned from a single call to poll. This can make it easier to predict the maximum that must be handled within each poll interval. By tuning this value, you may be able to reduce the poll interval, which will reduce the impact of group rebalancing.

Kafka 0.11.0 的默认配置是：

• max.poll.interval.ms=5min
• max.poll.records=500
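按这两个默认值做一个简单的预算估算（示意性推导）：两次 poll 之间必须处理完一批消息，否则会触发 rebalance，因此单条消息的平均处理时间不能超过二者之商。

```python
max_poll_interval_ms = 5 * 60 * 1000   # 默认 5min = 300000ms
max_poll_records = 500                 # 默认 500 条

# 两次 poll 的间隔必须处理完最多 max.poll.records 条消息,
# 所以单条消息的平均处理时间预算为:
per_record_budget_ms = max_poll_interval_ms / max_poll_records
print(per_record_budget_ms)  # 600.0,即平均每条消息最多约 600ms
```

如果单条消息的处理时间超过这个预算，就需要调小 max.poll.records 或调大 max.poll.interval.ms。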

## Kafka Javadoc - Detecting Consumer Failures

After subscribing to a set of topics, the consumer will automatically join the group when poll(long) is invoked. The poll API is designed to ensure consumer liveness. As long as you continue to call poll, the consumer will stay in the group and continue to receive messages from the partitions it was assigned. Underneath the covers, the consumer sends periodic heartbeats to the server. If the consumer crashes or is unable to send heartbeats for a duration of session.timeout.ms, then the consumer will be considered dead and its partitions will be reassigned.
It is also possible that the consumer could encounter a “livelock” situation where it is continuing to send heartbeats, but no progress is being made. To prevent the consumer from holding onto its partitions indefinitely in this case, we provide a liveness detection mechanism using the max.poll.interval.ms setting. Basically if you don’t call poll at least as frequently as the configured max interval, then the client will proactively leave the group so that another consumer can take over its partitions. When this happens, you may see an offset commit failure (as indicated by a CommitFailedException thrown from a call to commitSync()). This is a safety mechanism which guarantees that only active members of the group are able to commit offsets. So to stay in the group, you must continue to call poll.
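上面描述的"livelock 保护 + CommitFailedException"流程可以用一个假想的 FakeConsumer 来模拟（并非真实 KafkaConsumer，类名和方法均为示意）：两次 poll 间隔超过 max.poll.interval.ms 后成员离组，commitSync 失败；再次 poll 即重新入组，提交恢复正常。

```python
import time

class CommitFailedException(Exception):
    pass

class FakeConsumer:
    """极简示意(非真实 KafkaConsumer):若两次 poll 的间隔超过
    max.poll.interval.ms,成员主动离组,之后 commit_sync 抛出
    CommitFailedException,保证只有活跃成员才能提交 offset。"""
    def __init__(self, max_poll_interval_ms):
        self.max_poll_interval = max_poll_interval_ms / 1000
        self.last_poll = time.monotonic()
        self.in_group = True

    def poll(self):
        self.last_poll = time.monotonic()
        self.in_group = True  # 再次 poll 时重新加入 group
        return ["record"]

    def commit_sync(self):
        if time.monotonic() - self.last_poll > self.max_poll_interval:
            self.in_group = False  # 超过 max.poll.interval.ms,已离组
        if not self.in_group:
            raise CommitFailedException("consumer is not part of the group")

c = FakeConsumer(max_poll_interval_ms=100)
c.poll()
time.sleep(0.2)                # 模拟处理耗时超过 max.poll.interval.ms
try:
    c.commit_sync()
except CommitFailedException as e:
    print("commit failed:", e)
c.poll()                       # 重新 poll 即可重新加入 group
c.commit_sync()                # 此时提交成功
```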