0%

Problem Description

Given n non-negative integers representing an elevation map where the width of each bar is 1, compute how much water it is able to trap after raining.

For example,
Given [0,1,0,2,1,0,1,3,2,1,2,1], return 6.

problem link:
https://leetcode.com/problems/trapping-rain-water

Solution

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class TrappingRainWater_42 {
public int trap(int[] height) {
int a = 0;
int b = height.length - 1;
int max = 0;
int leftMax = 0;
int rightMax = 0;
while (a <= b) {
leftMax = Math.max(leftMax, height[a]);
rightMax = Math.max(rightMax, height[b]);
if (leftMax < rightMax) {
// leftMax is smaller than rightMax, so the (leftMax-A[a]) water can be stored
max += (leftMax - height[a]);
a++;
} else {
max += (rightMax - height[b]);
b--;
}
}
return max;
}
}

算法解释

对任意位置 i,在 i 上的积水,由左右两边最高的 bar 决定。

_说明: kafka 版本号为 0.11.0_

Consumer 拉取消息的实现

在 Kafka Consumer 正常消费时,观察其调用堆栈。

1
2
3
4
5
6
7
8
9
10
11
"pool-16-thread-7" #154 prio=5 os_prio=0 tid=0x00007ff581c8c000 nid=0x326d runnable [0x00007ff5468e7000]
java.lang.Thread.State: RUNNABLE
...
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:433)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:232)
- locked <0x00000000c2e04f90> (a org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:208)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1096)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:571)
...

对应的代码实现是 org.apache.kafka.clients.consumer.KafkaConsumer#poll,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@Override
public ConsumerRecords<K, V> poll(long timeout) {
...
try {
...
// poll for new data until the timeout expires
long start = time.milliseconds();
long remaining = timeout;
do {
Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollOnce(remaining);

if (fetcher.sendFetches() > 0 || client.hasPendingRequests())
client.pollNoWakeup();

if (this.interceptors == null)
return new ConsumerRecords<>(records);
else
return this.interceptors.onConsume(new ConsumerRecords<>(records));

long elapsed = time.milliseconds() - start;
remaining = timeout - elapsed;
} while (remaining > 0);
return ConsumerRecords.empty();
} finally {
release();
}
}

其中 org.apache.kafka.clients.consumer.KafkaConsumer#pollOnce的实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout) {
...
// ConsumerCoordinator coordinator;
coordinator.poll(time.milliseconds(), timeout);
...

// if data is available already, return it immediately
Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();
if (!records.isEmpty())
return records;

// send any new fetches (won't resend pending fetches)
fetcher.sendFetches();
...
return fetcher.fetchedRecords();
}

所以可以看到 consumer 每次 poll 时是先从 fetcher 中 fetchedRecords 的,如果拿不到结果,就新发起一个 sendFetches 请求。

Consumer 拉取消息的数量

org.apache.kafka.clients.consumer.internals.Fetcher#fetchedRecords 可以看到 maxPollRecords(max.poll.records 配置) 变量限制了每次 poll 的消息条数,不管 consumer 对应多少个 partition,从所有 partition 拉取到的消息条数总和不会超过 maxPollRecords

org.apache.kafka.clients.consumer.internals.Fetcher#sendFetches 可以看到 fetchSize(max.partition.fetch.bytes 配置) 用于每次创建 FetchRequest 时的 org.apache.kafka.common.requests.FetchRequest.PartitionData 的参数设置。fetchSize限制了 consumer 每次从每个 partition 拉取的数据量。
不过,还是看代码中的 ConsumerConfig#MAX_PARTITION_FETCH_BYTES_DOC 说明吧:

The maximum amount of data per-partition the server will return. Records are fetched in batches by the consumer. If the first record batch in the first non-empty partition of the fetch is larger than this limit, the batch will still be returned to ensure that the consumer can make progress. The maximum record batch size accepted by the broker is defined via message.max.bytes (broker config) or max.message.bytes (topic config). See “ + FETCH_MAX_BYTES_CONFIG + “ for limiting the consumer request size.

poll 和 fetch 的关系

在满足max.partition.fetch.bytes限制的情况下,假如fetch到了100个record,放到本地缓存后,由于max.poll.records限制每次只能poll出15个record。那么KafkaConsumer就需要执行7次才能将这一次通过网络发起的fetch请求所fetch到的这100个record消费完毕。其中前6次是每次pool中15个record,最后一次是poll出10个record。

Consumer 的心跳机制

org.apache.kafka.clients.consumer.internals.AbstractCoordinat 中启动 HeartbeatThread 线程来定时发送心跳和检查 consumer 的状态。
每个 Consumer 都有一个 ConsumerCoordinator(继承 AbstractCoordinator),每个 ConsumerCoordinator 都启动一个 HeartbeatThread 线程来维护心跳,心跳信息存放在 org.apache.kafka.clients.consumer.internals.Heartbeat

实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
@Override
public void run() {
try {
log.debug("Heartbeat thread for group {} started", groupId);
while (true) {
synchronized (AbstractCoordinator.this) {
...
client.pollNoWakeup();
long now = time.milliseconds();

if (coordinatorUnknown()) {
...
} else if (heartbeat.sessionTimeoutExpired(now)) {
// the session timeout has expired without seeing a successful heartbeat, so we should
// probably make sure the coordinator is still healthy.
coordinatorDead();
} else if (heartbeat.pollTimeoutExpired(now)) {
// the poll timeout has expired, which means that the foreground thread has stalled
// in between calls to poll(), so we explicitly leave the group.
maybeLeaveGroup();
} else if (!heartbeat.shouldHeartbeat(now)) {
// poll again after waiting for the retry backoff in case the heartbeat failed or the
// coordinator disconnected
AbstractCoordinator.this.wait(retryBackoffMs);
} else {
heartbeat.sentHeartbeat(now);
...
}
} // end synchronized
} // end while
} //end try
} // end run

其中最重要的两个 timeout 函数:

1
2
3
4
5
6
7
public boolean sessionTimeoutExpired(long now) {
return now - Math.max(lastSessionReset, lastHeartbeatReceive) > sessionTimeout;
}

public boolean pollTimeoutExpired(long now) {
return now - lastPoll > maxPollInterval;
}

sessionTimeout

如果是 sessionTimeout 则 Mark the current coordinator as dead,此时 会将 consumer 踢掉,重新分配 partition 和 consumer 的对应关系。

在 Kafka Server 端,Consumer 的 Group 定义了五个状态::
Consumer Group State

pollTimeout

如果是 pollTimeout 则 Reset the generation and memberId because we have fallen out of the group,此时 consumer 会退出 group,当再次 poll 时又会 rejoin group 触发 rebalance group。

Rebalance Generation

表示 rebalance 之后的一届成员,主要是用于保护 consumer group,隔离无效 offset 提交。每次 group 进行 rebalance 之后,generation 号都会加 1,表示 group 进入到了一个新的版本,下图所示为 consumer 2 退出后 consumer 4 加入时 Rebalance Generation 的过程:
Rebalance Generation

partition 的数量设置

  • 一个 partition 只能被 Consumer Group 中的一个 consumer 消费,因此,为了提高并发量,可以提高 partition 的数量,但是这会造成 replica 副本拷贝的网络请求增加,故障恢复时的耗时增加。因为 kafka 使用 batch pull 的方式,所以单个线程的消费速率还是有保障的。并且 partition 数量过多,zk 维护 ISR 列表负载较重。

  • partiton 数量最好是 consumer 数目的整数倍,比如取 24, consumer 数目的设置就会灵活很多。

  • consumer 消费消息时不时严格有序的。当从多个 partition 读数据时,kafka 只保证在一个 partition 上数据是有序的,多个 partition 的消息消费很可能就不是严格有序的了。

参数设置

heartbeat.interval.ms

心跳间隔。心跳是在 consumer 与 coordinator 之间进行的。心跳是确定 consumer 存活,加入或者退出 group 的有效手段。
这个值必须设置的小于 session.timeout.ms,因为:
当 consumer 由于某种原因不能发 heartbeat 到 coordinator 时,并且时间超过 session.timeout.ms 时,就会认为该 consumer 已退出,它所订阅的 partition 会分配到同一 group 内的其它的 consumer 上。

参数值

默认值:3000 (3s),通常设置的值要低于session.timeout.ms的1/3。

session.timeout.ms

consumer session 过期时间。如果超时时间范围内,没有收到消费者的心跳,broker 会把这个消费者置为失效,并触发消费者负载均衡。因为只有在调用 poll 方法时才会发送心跳,更大的 session 超时时间允许消费者在 poll 循环周期内处理消息内容,尽管这会有花费更长时间检测失效的代价。如果想控制消费者处理消息的时间,

参数值

默认值:10000 (10s),这个值必须设置在 broker configuration 中的 group.min.session.timeout.ms 与 group.max.session.timeout.ms 之间。

max.poll.interval.ms

This config sets the maximum delay between client calls to poll().

When the timeout expires, the consumer will stop sending heartbeats and send an explicit LeaveGroup request.

As soon as the consumer resumes processing with another call to poll(), the consumer will rejoin the group.

By increasing the interval between expected polls, you can give the consumer more time to handle a batch of records returned frompoll(long). The drawback is that increasing this value may delay a group rebalance since the consumer will only join the rebalance inside the call to poll. You can use this setting to bound the time to finish a rebalance, but you risk slower progress if the consumer cannot actually call poll often enough.

参数设置大一点可以增加两次 poll 之间处理消息的时间。
当 consumer 一切正常(也就是保持着 heartbeat ),且参数的值小于消息处理的时长,会导致 consumer leave group 然后又 rejoin group,触发无谓的 group balance,出现 consumer livelock 现象。

但如果设置的太大,会延迟 group rebalance,因为消费者只会在调用 poll 时加入rebalance。

max.poll.records

Use this setting to limit the total records returned from a single call to poll. This can make it easier to predict the maximum that must be handled within each poll interval. By tuning this value, you may be able to reduce the poll interval, which will reduce the impact of group rebalancing.

0.11.0 Kafka 的默认配置是

  • max.poll.interval.ms=5min
  • max.poll.records=500

即平均 600ms 要处理完一条消息,如果消息的消费时间高于 600ms,则一定要调整 max.poll.records 或 max.poll.interval.ms。

Kafka Javadoc - Detecting Consumer Failures

After subscribing to a set of topics, the consumer will automatically join the group when poll(long) is invoked. The poll API is designed to ensure consumer liveness. As long as you continue to call poll, the consumer will stay in the group and continue to receive messages from the partitions it was assigned. Underneath the covers, the consumer sends periodic heartbeats to the server. If the consumer crashes or is unable to send heartbeats for a duration of session.timeout.ms, then the consumer will be considered dead and its partitions will be reassigned.
It is also possible that the consumer could encounter a “livelock” situation where it is continuing to send heartbeats, but no progress is being made. To prevent the consumer from holding onto its partitions indefinitely in this case, we provide a liveness detection mechanism using the max.poll.interval.ms setting. Basically if you don’t call poll at least as frequently as the configured max interval, then the client will proactively leave the group so that another consumer can take over its partitions. When this happens, you may see an offset commit failure (as indicated by a CommitFailedException thrown from a call to commitSync()). This is a safety mechanism which guarantees that only active members of the group are able to commit offsets. So to stay in the group, you must continue to call poll.

Reference

Kafka消费组(consumer group)
kafka.apache.org javadoc
Coordinator实现原理
kafka params
kafka源码分析之kafka的consumer的负载均衡管理
Group Management Protocol
Kafka 之 Group 状态变化分析及 Rebalance 过程
KIP-62: Allow consumer to send heartbeats from a background thread
Kafka: The Definitive Guide Chapter 4 - Kafka Consumers

运行环境说明

_kafka 版本号为 0.11.0_

Kafka Consumer 的参数配置如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private Map<String, Object> getDefaultConsumerConfigs() {
Map<String, Object> propsMap = new HashMap<>();

// 手动设置自动提交为false,交由 spring-kafka 启动的invoker执行提交
propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
propsMap.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "10000");
propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

// 从partition中获取消息最大大小
propsMap.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "102400");

return propsMap;
}

Consumer 卡顿现象

Consumer 卡顿时的日志

每次卡顿不消费时都出现以下日志:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
2017/11/09 19:35:29:DEBUG pool-16-thread-10 org.apache.kafka.clients.consumer.internals.Fetcher - Fetch READ_UNCOMMITTED at offset 11429299 for partition my_topic-27 returned fetch data (error=NONE, highWaterMark=11429299, lastStableOffset = -1, logStartOffset = 10299493, abortedTransactions = null, recordsSizeInBytes=0)
2017/11/09 19:35:29:DEBUG pool-16-thread-10 org.apache.kafka.clients.consumer.internals.Fetcher - Added READ_UNCOMMITTED fetch request for partition my_topic-27 at offset 11429299 to node p-kafka-host-03.ali.keep:9092 (id: 6 rack: null)
2017/11/09 19:35:29:DEBUG pool-16-thread-10 org.apache.kafka.clients.consumer.internals.Fetcher - Sending READ_UNCOMMITTED fetch for partitions [my_topic-27] to broker p-kafka-host-03.ali.keep:9092 (id: 6 rack: null)
2017/11/09 19:35:29:DEBUG kafka-coordinator-heartbeat-thread | myConsumerGroup org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Sending Heartbeat request for group myConsumerGroup to coordinator p-kafka-host-02:9092 (id: 2147483642 rack: null)
2017/11/09 19:35:29:DEBUG pool-16-thread-13 org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Attempt to heartbeat failed for group myConsumerGroup since it is rebalancing.
2017/11/09 19:35:29:INFO pool-16-thread-13 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Revoking previously assigned partitions [my_topic-18] for group myConsumerGroup
2017/11/09 19:35:29:INFO pool-16-thread-13 org.springframework.kafka.listener.ConcurrentMessageListenerContainer - partitions revoked: [my_topic-18]
2017/11/09 19:35:29:INFO pool-16-thread-13 org.springframework.kafka.listener.ConcurrentMessageListenerContainer - partitions revoked: [my_topic-18]
2017/11/09 19:35:29:DEBUG pool-16-thread-4 org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Attempt to heartbeat failed for group myConsumerGroup since it is rebalancing.
2017/11/09 19:35:29:INFO pool-16-thread-4 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Revoking previously assigned partitions [my_topic-21] for group myConsumerGroup
2017/11/09 19:35:29:INFO pool-16-thread-4 org.springframework.kafka.listener.ConcurrentMessageListenerContainer - partitions revoked: [my_topic-21]
2017/11/09 19:35:29:INFO pool-16-thread-4 org.springframework.kafka.listener.ConcurrentMessageListenerContainer - partitions revoked: [my_topic-21]

...

2017/11/09 19:35:29:DEBUG pool-16-thread-4 org.apache.kafka.clients.consumer.internals.Fetcher - Fetch READ_UNCOMMITTED at offset 11426689 for partition my_topic-21 returned fetch data (error=NONE, highWaterMark=11426689, lastStableOffset = -1, logStartOffset = 10552294, abortedTransactions = null, recordsSizeInBytes=0)
2017/11/09 19:35:29:DEBUG pool-16-thread-13 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Group myConsumerGroup committed offset 11429849 for partition my_topic-18
2017/11/09 19:35:29:INFO pool-16-thread-13 org.apache.kafka.clients.consumer.internals.AbstractCoordinator - (Re-)joining group myConsumerGroup
2017/11/09 19:35:29:DEBUG pool-16-thread-13 org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Sending JoinGroup ((type: JoinGroupRequest, groupId=myConsumerGroup, sessionTimeout=30000, rebalanceTimeout=300000, memberId=p-my-consumer-host-03-12-97c12fb0-9bb7-4762-8478-538f06be9e90, protocolType=consumer, groupProtocols=org.apache.kafka.common.requests.JoinGroupRequest$ProtocolMetadata@54371fac)) to coordinator p-kafka-02.ali.keep:9092 (id: 2147483642 rack: null)

其中最重要的部分是:

2017/11/09 19:35:29:DEBUG pool-16-thread-13 org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Attempt to heartbeat failed for group myConsumerGroup since it is rebalancing.
2017/11/09 19:35:29:INFO pool-16-thread-13 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Revoking previously assigned partitions [my_topic-18] for group myConsumerGroup
2017/11/09 19:35:29:INFO pool-16-thread-13 org.springframework.kafka.listener.ConcurrentMessageListenerContainer - partitions revoked: [my_topic-18]

2017/11/09 19:35:29:INFO pool-16-thread-13 org.apache.kafka.clients.consumer.internals.AbstractCoordinator - (Re-)joining group myConsumerGroup

那为什么每次会这样呢?我们是有单独的线程在发起心跳的!!!

Consumer 卡顿时的 jstack

观察日志可以发现,卡顿时 ConsumerCoordinator 在不停地 rejoin group,并且做 rebalance,所以需要对比在正常和卡顿这两种情况下 ConsumerCoordinator 的行为。

正常时的 ConsumerCoordinator

1
2
3
cat jstack.normal.log | grep ConsumerCoordinator -B1 | grep -v ConsumerCoordinator | sort | uniq -c
32 at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:931)
22 at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:950)

卡顿时的 ConsumerCoordinator

1
2
3
4
5
cat jstack.pause.log | grep ConsumerCoordinator -B1 | grep -v ConsumerCoordinator | sort | uniq -c
14 at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:316)
14 at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:920)
8 at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:931)
32 at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:950)

根据以上的现场信息,可以发现关键就在 AbstractCoordinator.ensureActiveGroup 这一步,继续观察 jstack.pause.log 中的相关堆栈信息,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
"pool-16-thread-14" #167 prio=5 os_prio=0 tid=0x00007f5b19dbf000 nid=0x7ac2 runnable [0x00007f5ae4ccb000]
java.lang.Thread.State: RUNNABLE
at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:79)
at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
- locked <0x00000000c2e816b0> (a sun.nio.ch.Util$2)
- locked <0x00000000c2e816a0> (a java.util.Collections$UnmodifiableSet)
- locked <0x00000000c2e742a0> (a sun.nio.ch.EPollSelectorImpl)
at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
at org.apache.kafka.common.network.Selector.select(Selector.java:529)
at org.apache.kafka.common.network.Selector.poll(Selector.java:321)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:433)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:232)
- locked <0x00000000c2f00da0> (a org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:208)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:168)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:364)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:316)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:297)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1078)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:571)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

卡顿原因分析

卡顿原因:Consumer 在 Region Group

根据以上信息,结合 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator 的代码可以发现在
ConsumerCoordinator#poll 中判断 needRejoin() 为 true 时会调用 ensureActiveGroup() 函数,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public void poll(long now, long remainingMs) {
invokeCompletedOffsetCommitCallbacks();
if (subscriptions.partitionsAutoAssigned()) {
...
if (needRejoin()) {
...
ensureActiveGroup();
...
}
} else {
...
}
}

pollHeartbeat(now);
maybeAutoCommitOffsetsAsync(now);
}

Region Group 原因:Consumer Leave Group

那么问题就是什么情况下 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator#needRejoin 会返回 true,我们还是看看他的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Override
public boolean needRejoin() {
if (!subscriptions.partitionsAutoAssigned())
return false;

// we need to rejoin if we performed the assignment and metadata has changed
if (assignmentSnapshot != null && !assignmentSnapshot.equals(metadataSnapshot))
return true;

// we need to join if our subscription has changed since the last join
if (joinedSubscription != null && !joinedSubscription.equals(subscriptions.subscription()))
return true;

return super.needRejoin();
}

kafka metadata 什么时候变化????

可以看到,不是 metadataSnapshot 有变化,也不是 订阅者 subscriptions 有变化,那就是 super.needRejoin() 返回了 true,问题就转到了 org.apache.kafka.clients.consumer.internals.AbstractCoordinator#needRejoin 这个函数,其实现是:

1
2
3
protected synchronized boolean needRejoin() {
return rejoinNeeded;
}

从代码上看 rejoinNeeded 的整个变化过程,初始化为 true,在 initiateJoinGroup 成功后,会赋值为 false,在 maybeLeaveGroup 时会赋值为 true,所以怀疑卡顿时是 consumer leave group 了。

Consumer Leave Group 原因:pollTimeoutExpired

org.apache.kafka.clients.consumer.internals.AbstractCoordinator.HeartbeatThread#run 中调用了 maybeLeaveGroup() 函数,其实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
@Override
public void run() {
try {
log.debug("Heartbeat thread for group {} started", groupId);
while (true) {
synchronized (AbstractCoordinator.this) {
...
client.pollNoWakeup();
long now = time.milliseconds();

if (coordinatorUnknown()) {
...
} else if (heartbeat.sessionTimeoutExpired(now)) {
// the session timeout has expired without seeing a successful heartbeat, so we should
// probably make sure the coordinator is still healthy.
coordinatorDead();
} else if (heartbeat.pollTimeoutExpired(now)) {
// the poll timeout has expired, which means that the foreground thread has stalled
// in between calls to poll(), so we explicitly leave the group.
maybeLeaveGroup();
} else if (!heartbeat.shouldHeartbeat(now)) {
// poll again after waiting for the retry backoff in case the heartbeat failed or the
// coordinator disconnected
AbstractCoordinator.this.wait(retryBackoffMs);
} else {
heartbeat.sentHeartbeat(now);
...
}
} // end synchronized
} // end while
} //end try
} // end run

其中最重要的两个 timeout 函数:

1
2
3
4
5
6
7
public boolean sessionTimeoutExpired(long now) {
return now - Math.max(lastSessionReset, lastHeartbeatReceive) > sessionTimeout;
}

public boolean pollTimeoutExpired(long now) {
return now - lastPoll > maxPollInterval;
}

所以是 pollTimeoutExpired 引起了 leave group.

根本原因:pollTimeoutExpired

pollTimeoutExpired 的原因是两次 poll 的时间间隔超过了设置的 maxPollInterval 值。

解决方案

调整以下参数

  • max.poll.records:100 (默认值 500)
  • max.poll.interval.ms:600000 (默认值 300000,也就是5分钟)

后续

至此,问题已经解决了,但是有一些疑问。

  • 对于这两个参数值的设定, 是 max.poll.records 越小越好,max.poll.interval.ms 越大越好吗?
  • 已经设置过的 session.timeout.msheartbeat.interval.ms难道没用吗?为什么有这么多超时参数的设置啊?
  • 已经设置过的 max.partition.fetch.bytes 没用吗?为什么还要设置 max.poll.records 啊?
  • 整体上还需要调哪些参数才可以让 consumer 运行正常,或者是性能达到最大呢?

在下一篇博客「Kafka Consumer 的实现」中,将会继续分析 Kafka Consumer 的消费过程和参数配置,试图回答以上问题。

指数分布族(Exponential Family)

指数分布族的定义

若一类概率分布可以写成如下形式,那么它就属于指数分布族:

  • $\eta$: 自然参数,通常是一个实数
  • T(y): 充分统计量,通常,T(y)=y,实际上是一个概率分布的充分统计量(统计学知识)
  • a($\eta$) 被称为 log partition function

对于给定的 a,b,T 三个函数,上式定义了一个以 $\eta$ 为参数的概率分布集合,即改变 $\eta$ 可以得到不同的概率分布,例如高斯分布和伯努利分布。

指数分布族以及它们的特征

  • 正态分布(高斯分布)——总体噪音(由中心极限定理得)
  • 伯努利分布——逻辑回归(对01问题建模)
  • 多项式分布——K种结果的事情进行建模
  • 泊松分布——对计数过程进行建模(一个样本中放射性衰变的数目,网站的访客数目,商店的顾客数目)
  • 伽马分布,指数分布——正数的分布,对间隔进行建模(在公交车站等车的时间)
  • β分布,Dirichlet分布——对小数进行分布,对概率分布进行建模
  • Wishart分布——协方差的分布

指数分布簇推导

高斯分布(Gaussian)和伯努利(Bernoulli)分布都可以推导为指数分布族。

伯努利分布的推导

伯努利分布的概率公式为:$P(y=1;\phi)=\phi; P(y=0;\phi)=1-\phi;$

公式可经如下变换:

对应的指数分布族的参数为:
$T(y) = y$
$b(y) = 1$
$\eta = log(\frac\phi{1-\phi}) => \phi=\frac1{1+e^{-n}}$
$a(\eta) = -log(1-\phi) = log(1+e^n)$

高斯分布的推导

在线性回归中,$\sigma$ 对于模型参数 $\theta$ 的选择没有影响,为了推导方便我们令 $\sigma = 1$。
则有:

对应的指数分布族的参数为:
$T(y) = y$
$b(y) = \frac{1}{\sqrt{2\pi}}exp(-\frac12y^2)$
$\eta = \mu$
$a(\eta) = \frac{ {\mu}^2}2 = \frac{ {\eta}^2}2$

广义线性模型(Generalized Linear Model)

想用 广义线性模型对一般问题进行建模首先需要明确几个 假设:

  1. $y | x;θ \sim ExponentialFamily(\eta)$ y的条件概率属于指数分布族;
  2. 给定 x 广义线性模型的目标是求解 T(y) | x, 不过由于 很多情况下 T(y) = y 所以我们的目标变成了 y | x , 也即 我们希望拟合函数为 h(x) = E[y|x] (这个条件在线性回归和逻辑回归中都满足, 例如在逻辑回归中 $hθ(x) = p(y = 1|x;\theta) = 0 \cdot p(y = 0|x; \theta) + 1 \cdot p(y = 1|x; \theta) = E[y|x;\theta])$
  3. 自然参数 $\eta$ 与 x 是线性关系:$\eta=\theta^Tx$ ($\eta 为向量时 \eta_{i} = \theta_{i}^Tx$)

有了如上假设,就可以进行建模和求解了。

对于伯努利分布,可以推导出:

这也就是逻辑回归中 sigmod 函数的由来。

多分类算法(Softmax Regression)

y有多个可能的分类:{1, 2, …, k}

=======具体的公式略=======

最后求借寻找最佳参数时,跟最小二乘和逻辑回归的解法类似,可以用梯度下降法或者牛顿迭代法。

Referecen

广义线性模型(Generalized Linear Model)

逻辑回归

对于逻辑回归而言,y 的取值不是 0 就是 1,所以 $h_θ(x)$ 可以写为

其中

梯度上升法求解逻辑回归

对于给定的逻辑回归函数,我们使用最小二乘法来推导出最大似然估计,假设:
$P(y=1|x;θ)=h_θ(x)$,代表对于给定的 θ,y 取值为 1 的概率。
$P(y=0|x;θ)=1-h_θ(x)$,代表对于给定的 θ,y 取值为 0 的概率。

以上两者可以合并为:

假设 m 个训练集是相互独立的,则似然估计为:

和之前一样,上式可以简化为:


$l(θ) = logL(θ)
= \sum_{m}^{i=1}{y^{(i)}}log{h(x^{(i)}) + {(1−y^{(i)})}log(1 − h(x^{(i)}))}$

那么,
如何去最大化似然函数呢,可以应用梯度上升法,因为我们要使 P 的取值足够大,也是就预测准确的概率最够大。

随机梯度上升的公式为:

下面来求$\Deltaθl(θ)$的取值:

附上手写的推导过程:
手写推导过程

所以,最终随机梯度上升的公式为:

如何和线性回归的公式放在一起比较,

会发现,这两者非常相似,实际上却不然,因为这里的 $(h_θ(x^{(i)})$ 定义的不是线性函数。后续我们谈到 GLM 时会发现这并不是巧合,而是有更深层次的原因。

牛顿迭代法求解逻辑回归

牛顿迭代法可以利用到曲线本身的信息,比梯度下降法更容易收敛,即迭代更少次数。

牛顿迭代法简述

假设我们要求解方程 f(x)=0 的根,首先随便找一个初始值 x0,如果 x0 不是解,做一个经过 (x0,f(x0)) 这个点的切线,与 x 轴的交点为 x1。同样的道理,如果 x1 不是解,做一个经过 (x1,f(x1)) 这个点的切线,与 x 轴的交点为 x2。 以此类推。以这样的方式得到的 xi 会无限趋近于 f(x)=0 的解。

对于任意一点 $(x_n,y_n)$ 做切线,切线的斜率为 $f’(x_n)$,则有方程:

迭代过程

求解 $f(\theta)$ = 0 时 $\theta$ 的取值。
设下一次迭代时 $\theta^{(t+1)}$ 的取值与前一次迭代 $\theta^{(t)}$ 的取值(在 x 轴)距离为 $\Delta$。

则 $\theta^{(t+1)} = \theta^{(t)} - \Delta$,且 $\Delta = \frac{f(\theta^{(t)})}{f’(\theta^{(t)})}$,
所以有:

从泰勒展开到牛顿迭代

也可以由泰勒展开中推导牛顿迭代的公式。这次为了求解方程 f′=0 的根,把原函数 f(x) 的做泰勒展开,展开到二阶形式:

当且仅当 $\Delta x$ 逼近 0 时,上式成立,此时忽略 1/2 系数的作用,所以有:

故:

对函数求极大值的方法
>

  1. 将原函数y=f(x),对x求一次导数,得到dy/dx;
  2. 令dy/dx = 0,解得一次导函数的零点;
  3. 将原函数对x求二次导函数;
  4. 将解得的零点坐标的x值代入二次导函数,
    如果是正值,零点所在位置,就是极小值点,再将该x值代入原函数,得到极小值;
    如果是值值,零点所在位置,就是极大值点,再将该x值代入原函数,得到极大值;
    如果是0,零点所在位置,既不是极小值点,也不是极大值点,是拐点。

所以求 $l(\theta)$ 在极大值处 $\theta$ 的取值,则是求 $l’(\theta) = 0$ 时 $\theta$ 的值,应用牛顿迭代法则有:

多维向量的牛顿迭代

对于多维向量 $\overrightarrow{X}$ 求解。

其中
$\nabla l(\theta)$ 是对 $l(\theta)$ 求导的值。

H 是一个 n*n 的矩阵,n 是特征数量,元素的计算公式为:

牛顿迭代法的特点

是否收敛

通常情况下是收敛的,但是需要满足一些条件,对于逻辑回归来讲,是收敛的。

迭代速度

每次迭代后,有解数字的误差是成平方倍减小的,是二次收敛函数。

优缺点

优点:收敛快
缺点:特征多(上千个)时,每次迭代成本大

Reference

http://blog.csdn.net/baimafujinji/article/details/51179381
http://blog.csdn.net/baimafujinji/article/details/51167852
如何通过牛顿方法解决Logistic回归问题

欠拟合与过拟合

欠拟合:underfitting,与训练数据贴合的不够好,不能准确预测未来目标值。
过拟合:overfitting,与训练数据贴合的太好了,预测未来目标值的准确性有较大风险。

线性模型的概率解释

思考:我们为什么要用最小二乘的指标作为 cost function?为什么不是绝对值或四次方?

最小二乘法(又称最小平方法)是一种数学优化技术。它通过最小化误差的平方和寻找数据的最佳函数匹配。
最小二乘是从函数形式上来看的,极大似然是从概率意义上来看的。事实上,最小二乘可以由高斯噪声假设+极大似然估计推导出来。当然极大似然估计还可以推导出其他的loss function, 比如logistic回归中,loss function是交叉熵。
最大似然估计与最小二乘估计的区别

一般的最小二乘法实际上是在假设误差项满足高斯分布且独立同分布的情况下,使似然性最大化。

推导过程

回到预测房价的例子,假设最终的预测函数,每一次预测都有误差,用$ε^{(i)}$表示误差,则预测函数可以写为:

其中,误差是随机分布的,均值为 0,服从高斯分布 $N(0,σ^2)$。

Andrew Ng 讲到在大多数情况下,线性回归的误差值如果综合来看,就是符合高斯分布的。并且根据中心极限定律,正态分布确实是对误差项分布的合理猜想。

所以

$P(y^{(i)}|x^{(i)}; θ)$ 表示:在 θ 为给定的参数的情况下,概率 $y^{(i)}$ 以 $x^{(i)}$ 为随机变量的概率分布,注意 θ 不是随机变量。

由于 ε(i) 是独立的同分布(IID:independentlyidentically distribution),所以以 θ 为变量的似然函数为:

对 L(θ) 取对数有:

最大化 $l(\theta)$ 即是最小化 $\frac1{2\sigma^2}\sum_{i=1}^{m}(y^{(i)}-\theta^Tx^{(i)})^2$,这样就是 cost function.

由于目标变量服从正态分布,但分布的均值和方差都未知,对均值和方差两个参数的合理估计是选取两个参数使得在正态分布的前提下,抽到各样本中的 y 值的概率最大,这就是最大似然估计的思想。

Reference

http://www.holehouse.org/mlclass/07_Regularization.html
http://rstudio-pubs-static.s3.amazonaws.com/4810_06e3d8fd26ed40eb8c31aff35eae81ae.html
https://rpubs.com/badbye/ml03
http://www.qiujiawei.com/linear-algebra-15/
最大似然估计

多变量的线性回归

n: 特征(features) 数量
m: 训练集数量
$x^{(i)}$:

  • 表示一条训练数据的向量
  • i is an index into the training set
  • So
    • x is an n-dimensional feature vector
    • $x^{(3)}$ is, for example, the 3rd training data

$x^{(j)}_i$: The value of feature j in the ith training example

例如,当 n=4 时:

For convenience of notation, $x_0$ = 1, 所以最后的特征向量的维度是 n+1,从 0 开始,记为”X”,
则有:

$θ^T$: [1 * (n+1)] matrix

多变量的梯度下降

Cost Function

Gradient descent

Repeat {

}

every iterator

  • θj = θj - learning rate (α) times the partial derivative of J(θ) with respect to θJ(…)
  • We do this through a simultaneous update of every θj value

Gradient Decent in practice

Feature Scaling

假设只有 $x_1$,$x_2$ 两个变量,其中:$x_1\in(0,2000), x_2\in(1,5)$,则最后的 J(θ) 图形是一个椭圆,在椭圆下用梯度下降法会比圆形要耗时更久,So we need to rescale this input so it’s more effective,有很多方式,一种是将各个 feature 除以其本身的最大值,缩小范围至[0,1],一种是各个 feature 减去 mean 然后除以最大值,缩小范围至[-0.5,0.5]

Learning Rate α

  • working correctly: If gradient descent is working then J(θ) should decrease after every iteration
  • convergence: 收敛是指每经过一次迭代,J(θ)的值都变化甚小。
  • choose α
    1. When to use a smaller α
      • If J(θ) is increasing, see below picture
      • If J(θ) looks like a series of waves, decreasing and increasing again
      • But if α is too small then rate is too slow
    2. Try a range of α values
      • Plot J(θ) vs number of iterations for each version of alpha
      • Go for roughly threefold increases: 0.001, 0.003, 0.01, 0.03. 0.1, 0.3

Features and polynomial regression

Can create new features

如何选择 features 和表达式尤为关键,例如房价与房子的长,房子的宽组成的表达式就会麻烦很多,若将房子的长乘以房子的宽得出面积,则有房价与房子面积的表达式,将会更容易拟合出房价的走势。

Polynomial regression

例如房价的走势,如下图,横坐标 x 为房子的面积,纵坐标为房价,使用一元二次的方程,会得出下图的蓝色曲线。容易得到房价今后会有一个下降的过程,可实际上房价是不会随着面积的增大而下降的。所以需要重新选定 Polynomial regression,可以改为使用一元三次的方程或者使用平凡根的方程。

所以选择合适的 Features 和 Polynomial regression 都非常重要。

Normal equation 求解多变量线性回归

Normal equation

举例说明,假设 J(θ) 是一元二次方程,如:J(θ)=a$θ^2$+bθ+c,则令 即可,求出最终的 θ 则得到了线性回归方程,可以预测出今后的 y 值。

更普遍地,当 θ 是一个 n+1 维的向量时,θ $\in$ $R^{n+1}$,则 cost function 如下:

只需要令:

Gradient descent Vs Normal equation

Gradient descent

  • Need to chose learning rate
  • Needs many iterations - could make it slower
  • Works well even when n is massive (millions)
  • Better suited to big data
  • What is a big n though: 100 or even a 1000 is still (relativity) small, If n is 10000 then look at using gradient descent
  • 适用于线性回归会逻辑回归

Normal equation

  • No need to chose a learning rate
  • No need to iterate, check for convergence etc.
  • Normal equation needs to compute $(X^TX)^{-1}$
    • This is the inverse of an n x n matrix
    • With most implementations computing a matrix inverse grows by O(n3), So not great
  • Slow of n is large, Can be much slower
  • 仅适用于线性回归

局部加权线性回归

局部加权回归(locally weighted regression)简称 loess,其思想是,针对对某训练数据的每一个点,选取这个点及其临近的一批点做线性回归;同时也需要考虑整个训练数据,考虑的原则是距离该区域越近的点贡献越大,反之则贡献越小,这也正说明局部的思想。其 cost function 为:

其中

$w^{(i)}$的形式跟正态分布很相似,但二者没有任何关系,仅仅只是便于计算。可以发现,$x^{(j)}$ 离 $x^{(i)}$ 非常近时,${w^{(i)}_j}$ 的值接近于1,此时 j 点的贡献很大,当 $x^{(j)}$ 离 $x^{(i)}$ 非常远时,${w^{(i)}_j}$ 的值接近于 0,此时 j 点的贡献很小。

$\tau^2$ 是波长函数(bandwidth), 控制权重随距离下降的速度,τ 越小则 x 离 $x^{(i)}$ 越远时 $w^{(i)}$ 的值下降的越快。

所以,如果沿着 x 轴的每个点都进行局部直线拟合,那么你会发现对于这个数据集合来说,局部加权的预测结果,能够最终跟踪这条非线性的曲线。

但局部加权回归也有其缺点:

  • 每次对一个点的预测都需要整个数据集的参与,样本量大且需要多点预测时效率低。提高效率的方法参考 Andrew More’s KD Tree
  • 不可外推,对样本所包含的区域外的点进行预测时效果不好,事实上这也是一般线性回归的弱点

对于线性回归算法,一旦拟合出适合训练数据的参数θ,保存这些参数θ,对于之后的预测,不需要再使用原始训练数据集,所以是参数学习算法。

对于局部加权线性回归算法,每次进行预测都需要全部的训练数据(每次进行的预测得到不同的参数θ),没有固定的参数θ,所以是非参数算法(non-parametric algorithm)。

Reference

http://www.holehouse.org/mlclass/04_Linear_Regression_with_multiple_variables.html

引言

本系列的课程来源是 斯坦福大学公开课 CS229: 机器学习课程,也可以看网易公开课的资源,是带字幕的。斯坦福的 CS229 课程相比于 Course 上的 Machine Learning 课程,理论更强,讲解的也更深入,需要有一些的高数基础。两个课程都看了前半部分,更推荐前者,所以相关笔记对应的都是 CS229 课程。

线性回归的定义

适用于监督学习,根据已有的数据集合(x, y),来推断出将来的数据趋势。

单变量线性回归

最后的函数应该是 y = ax + b,假设 hypothesis 为:

则问题转化为求 $\theta_{0}$ 和 $\theta_{1}$ 的值。要求这两个值需要转化上式,并根据已有的数据来求解。下面介绍损失函数,又叫代价函数的概念。

Cost Function

针对每一组数据,公式的值是 $h_{\theta}$($x_{i}$), 实际的值是 $y_{i}$,我们要达到的效果则是公式能够尽量表达已有的 m 组数据集合,即 $( h_{\theta}(x^{(i)}) - y_{i})^{2}$ 的值尽量小。
所以,对于所有数据集合,需要求使得

我们需要最小化这个 Cost Function。

Cost Function 的作用

假设 $\theta_0$ = 0,则有 $\theta_1$ 和 J($\theta_1$) 的关系,且图形如下:

所以当 $\theta_1$ = 1 时,

很容易看出,$J(\theta_1)$ 是关于 $\theta_1$ 的一元二次方程,对于所有的训练数据,每个 $\theta_1$ 的取值都会得到一个 $J(\theta_1)$ 值,而 $J(\theta_1)$ 和 $\theta_1$ 的对应关系根据一元二次方程可知,函数曲线如上图。
当 $J(\theta_1)$ 最小时,求得 $\theta_1$ 结果。

当 $\theta_0$ 和 $\theta_1$ 都不为 0 时,J($\theta_0$, $\theta_1$) 的图形如下:

对于两个系数的情况不如一个系数是一个二维坐标系的抛物线那么简单。下面将介绍梯度下降法。

梯度下降法

  • Start with initial guesses
  • Start at 0,0 (or any other value)
  • Keeping changing $\theta_0$ and $\theta_1$ a little bit to try and reduce J($\theta_0$, $\theta_1$)
  • Each time you change the parameters, you select the gradient which reduces J($\theta_0$, $\theta_1$) the most possible
  • Repeat
  • Do so until you converge to a local minimum
    Has an interesting property
    • Where you start can determine which minimum you end up
    • Here we can see one initialization point led to one local minimum
    • The other led to a different one

具体的计算过程

(for j = 0 and j = 1)

Notation

$\alpha$

  • Is a number called the learning rate
  • Controls how big a step you take
    • If α is big have an aggressive gradient descent
    • If α is small take tiny steps
  • Too small
    • Take baby steps
    • Takes too long
  • Too large
    • Can overshoot the minimum and fail to converge

Computer

每次都是同时计算 $\theta_0, \theta_1$ 的值,如下:

利用梯度下降法求解线性回归问题

对于 j = 0 or 1 的情况有:
j = 0:

j = 1:

梯度下降法的证明

1、如果优化函数存在解析解。例如我们求最值一般是对优化函数求导,找到导数为0的点。如果代价函数能简单求导,并且求导后为0的式子存在解析解,那么我们就可以直接得到最优的参数。

2、如果式子很难求导,例如函数里面存在隐含的变量或者变量相互间存在耦合,互相依赖的情况。或者求导后式子得不到解释解,或者未知参数的个数大于方程组的个数等。这时候使用迭代算法来一步一步找到最优解。

  • 当目标函数是凸函数时,梯度下降法的解是全局最优解
  • 一般情况下,其解不保证是全局最优解

凸函数

凸函数就是一个定义在某个向量空间的凸子集C(区间)上的实值函数 f,而且对于凸子集C中任意两个向量 $x_1$, $x_2$ 有:

于是容易得出对于任意(0,1)中有理数 p,有:

如果 f 连续,那么 p 可以改成任意(0,1)中实数。则 f 称为 I 上的凸函数,当且仅当其上境图(在函数图像上方的点集)为一个凸集。

梯度下降法的使用

我们首先在函数上任选一点,计算其损失(即我们上面的L(w)) ,然后按照某一规则寻找更低的一点计算新的损失,只要新损失更小(最小化问题),我们就继续下降,直到达到一个可接受的优化目标。
梯度下降方法分为两个部分,第一部分是整体上,我们使用某步长不断下降求损失函数,第二部分是为了防止步长太长导致最后无法收敛,每次当损失上升的时候都调整步长。
通常实践中使用时,都是用一些开源算法,很少需要深度改进,比如使用 libsvm 可以直接求解逻辑回归。

Reference

http://www.cnblogs.com/yysblog/p/3268508.html
http://52opencourse.com/125/coursera%E5%85%AC%E5%BC%80%E8%AF%BE%E7%AC%94%E8%AE%B0-%E6%96%AF%E5%9D%A6%E7%A6%8F%E5%A4%A7%E5%AD%A6%E6%9C%BA%E5%99%A8%E5%AD%A6%E4%B9%A0%E7%AC%AC%E5%85%AD%E8%AF%BE-%E9%80%BB%E8%BE%91%E5%9B%9E%E5%BD%92-logistic-regression
http://www.cnblogs.com/chaoren399/p/4851658.html

要解决的问题

json 反序列化 bean 时,当某个字段在 json 中为 null 时,使用 bean 中声明的默认值。

Person 类我们改造下:

1
2
3
4
5
public class Person {
private String name;
// Address is a enum: {CH, US, GZ}
private Region region = Region.GZ;
}

仍然以 Person 类举例,如果 json 串是:

1
{"name":"robert", "region":null}

希望反序列化后的 bean 为

1
Person(name="robert", region=Region.GZ)

解决过程

在上一篇文章 lombok 的 AllArgs 导致 Jackson 反序列化丢失字段默认值 中可以看到 json 反序列化为 bean 的过程,一般情况下,是先调用默认构造函数生成 bean,然后根据 json 中出现的字段挨个赋值。
所以反序列化生成的 bean 的 region 肯定为 null。

解决方案

@JsonInclude(Include.NON_NULL) 可行吗?

不可行,这个注解是序列化时忽略 null 值,反序列化时不生效,基本上反序列化时我们不能做什么事情。

JsonCreator 可行吗?

在 Region 枚举里写 JsonCreator:

1
2
3
4
5
6
7
8
9
10
@JsonCreator
public static Region getRegion(String value) {
for (Region region : Region.values()) {
if (region.name().equals(value)) {
return region;
}
}

return Region.GZ;
}

直接将 {"region": null} 反序列化为 Region 是可行的,会调用 JsonCreator,但是如果是反序列化 Person 则不会调用到 JsonCreator,为什么呢?

debug 过程:
如前文所述,会调用到 com.fasterxml.jackson.databind.deser.BeanDeserializer#deserialize 这个函数中,然后会调用到
com.fasterxml.jackson.databind.deser.SettableBeanProperty#deserialize,这个函数的实现是:

1
2
3
4
5
6
7
8
9
10
11
public final Object deserialize(JsonParser p, DeserializationContext ctxt) throws IOException {
JsonToken t = p.getCurrentToken();

if (t == JsonToken.VALUE_NULL) {
return _valueDeserializer.getNullValue(ctxt);
}
if (_valueTypeDeserializer != null) {
return _valueDeserializer.deserializeWithType(p, ctxt, _valueTypeDeserializer);
}
return _valueDeserializer.deserialize(p, ctxt);
}

所以在这里会把 null 值拦住,直接返回 getNullValue 的结果。

自定义 deserializer

实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class RegionDeserializer extends JsonDeserializer<Region> {
@Override
public Region deserialize(JsonParser jsonParser, DeserializationContext deserializationContext)
throws IOException {
JsonNode node = jsonParser.getCodec().readTree(jsonParser);
Region region = Region.GZ;

try {
if (StringUtils.isNotEmpty(node.textValue())) {
return Region.getRegion(node.textValue());
}
} catch (Exception e) {
type = Region.GZ;
}

return region;
}

@Override
public Region getNullValue(DeserializationContext ctxt) {
return Region.GZ;
}
}

Person 类改为:

1
2
3
4
5
6
7
public class Person {
private String name;

// Address is a enum: {CH, US, GZ}
@JsonDeserialize(using = RegionDeserializer.class)
private Region region = Region.GZ;
}

这样,在com.fasterxml.jackson.databind.deser.SettableBeanProperty#deserialize这个方法里,碰到 null 值,就会返回 getNullValue 的结果,即 Region.GZ,如果不是 null 会进入 getRegion 函数处理,也能处理其他情况。

要解决的问题

希望在反序列化 json 到 bean 时,对于 json 中未出现的字段,在 bean 中赋上默认值。

例如
Person 类如下:

1
2
3
4
5
6
7
8
@Data
@AllArgsConstructor
@NoArgsConstructor
@JsonIgnoreProperties(ignoreUnknown = true)
public class Person {
private String name;
private String address = "beijing"; // default value if json missing the age field
}

json:

1
{"name":"robert"}

反序列化后的 bean 为

1
Person(name="robert", address="beijing")

但实际上,发序列化的结果为

1
Person(name="robert", address=null)

解决过程

查看 maven 版本

项目中 jackson 的配置如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<version>${jackson.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>${jackson.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>${jackson.version}</version>
</dependency>

<!-- Jackson dependency versions -->
<jackson.version>2.6.5</jackson.version>

配置升到最新后问题仍然存在。

debug json 反序列化过程,找到原因

json 反序列化是从

1
com.fasterxml.jackson.databind.ObjectMapper#_readMapAndClose

这个方法调用开始的,里面的一段代码为:

1
2
3
4
5
6
7
8
9
DeserializationConfig cfg = getDeserializationConfig();
DeserializationContext ctxt = createDeserializationContext(jp, cfg);
JsonDeserializer<Object> deser = _findRootDeserializer(ctxt, valueType);
if (cfg.useRootWrapping()) {
result = _unwrapAndDeserialize(jp, ctxt, cfg, valueType, deser);
} else {
result = deser.deserialize(jp, ctxt);
}
ctxt.checkUnresolvedObjectId();

在第 3 行找到的 JsonDeserializer 是 com.fasterxml.jackson.databind.deser.BeanDeserializer
从第 7 行代表进入 com.fasterxml.jackson.databind.deser.BeanDeserializer#deserialize(com.fasterxml.jackson.core.JsonParser, com.fasterxml.jackson.databind.DeserializationContext)

函数实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public Object deserialize(JsonParser p, DeserializationContext ctxt) throws IOException {
// common case first
if (p.isExpectedStartObjectToken()) {
if (_vanillaProcessing) {
return vanillaDeserialize(p, ctxt, p.nextToken());
}
p.nextToken();
if (_objectIdReader != null) {
return deserializeWithObjectId(p, ctxt);
}
return deserializeFromObject(p, ctxt);
}
JsonToken t = p.getCurrentToken();
return _deserializeOther(p, ctxt, t);
}

vanillaDeserialize 为 false,最后走到了第 11 行,最后到了

1
com.fasterxml.jackson.databind.deser.BeanDeserializer#_deserializeUsingPropertyBased

然后到

1
com.fasterxml.jackson.databind.deser.impl.PropertyBasedCreator#build

在这个函数里有这样一段代码:

1
Object bean = _valueInstantiator.createFromObjectWith(ctxt, buffer.getParameters(_allProperties));

调用的是

1
com.fasterxml.jackson.databind.deser.ValueInstantiator#createFromObjectWith(com.fasterxml.jackson.databind.DeserializationContext, java.lang.Object[])

可以发现,createFromObjectWith 的第二个参数是数组,json 解出来的字段都放在了这个数组里。然后调用了 Person 类的全参构造函数,对于
缺失的字段自动补 null 值,这样就导致了 address 字段为 null。

解决方案

去掉 @AllArgsConstructor 时,没有问题了,因为此时找到的 com.fasterxml.jackson.databind.deser.BeanDeserializer 的 vanillaDeserialize 字段为 true,会调用 vanillaDeserialize(p, ctxt, p.nextToken());,这个函数的实现非常明确:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
private final Object vanillaDeserialize(JsonParser p, DeserializationContext ctxt, JsonToken t) throws IOException {
final Object bean = _valueInstantiator.createUsingDefault(ctxt);
// [databind#631]: Assign current value, to be accessible by custom serializers
p.setCurrentValue(bean);
if (p.hasTokenId(JsonTokenId.ID_FIELD_NAME)) {
String propName = p.getCurrentName();
do {
p.nextToken();
SettableBeanProperty prop = _beanProperties.find(propName);

if (prop != null) { // normal case
try {
prop.deserializeAndSet(p, ctxt, bean);
} catch (Exception e) {
wrapAndThrow(e, bean, propName, ctxt);
}
continue;
}
handleUnknownVanilla(p, ctxt, bean, propName);
} while ((propName = p.nextFieldName()) != null);
}
return bean;
}

先用默认构造函数生成 bean,此时的 bean 是有默认值的,然后将 json 中出现的字段的值赋值给 bean,这样 address 就有值了。

根本原因

看上去是声明了全参构造函数导致的,所以想尝试自己写全参构造函数,在 address 为 null 时给其赋默认值。
写完如下:

1
2
3
4
5
6
7
public Person(String name, String address){
this.name = name;
this.address = address;
if(this.address == null){
this.address = "beijing";
}
}

继续走刚才 debug 的流程,发现居然没有请求这个全参构造函数。

那问题就是 @AllArgsConstructor 生成的全参函数有不同之处,jackson 能够识别出来并用于反序列化。查看 jar 包中 Person 类的代码发现其全参构造函数如下:

1
2
3
4
5
@ConstructorProperties({"name", "address"})
public Person(String name, String address){
this.name = name;
this.address = address;
}

所以,区别就是 @ConstructorProperties({"name", "address"}) 这个注解,这个注解的作用是指定构造函数参数的名字,Spring 可根据参数的名字注入 bean。

(补充自 liwei)
jackson 调用了全参构造函数的原因在于@AllArgsConstructor 的构造函数有ConstructorProperties ,jackson在选择构造函数的时候会调用BasicDeserializerFactory._addDeserialzerContructors方法,他首先选择无参构造函数,并遍历所有的构造函数,如果存在具有@ConstructProperties注解的构造函数,则把该构造函数作为默认创建bean的构造函数,如下:

可以通过设置 @AllArgsConstructor(suppressConstructorProperties=true) 来禁用 @ConstructorProperties.

结论

Lombok 的 @AllArgsConstructor 注解导致 Jackson 反序列化时调用了全参构造函数,将没有出现的字段都赋值为 null 了。

修改方式:

  1. 不使用 @AllArgsConstructor
  2. 使用 @AllArgsConstructor 但是不让其在全参构造函数上加入 ConstructorProperties 注解,声明方式改为 @AllArgsConstructor(suppressConstructorProperties = true)