说明: kafka 版本号为 0.11.0

Consumer 拉取消息的实现

在 Kafka Consumer 正常消费时,观察其调用堆栈。

1
2
3
4
5
6
7
8
9
10
11
"pool-16-thread-7" #154 prio=5 os_prio=0 tid=0x00007ff581c8c000 nid=0x326d runnable [0x00007ff5468e7000]
java.lang.Thread.State: RUNNABLE
...
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:433)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:232)
- locked <0x00000000c2e04f90> (a org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:208)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1096)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:571)
...

对应的代码实现是 org.apache.kafka.clients.consumer.KafkaConsumer#poll,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@Override
public ConsumerRecords<K, V> poll(long timeout) {
...
try {
...
// poll for new data until the timeout expires
long start = time.milliseconds();
long remaining = timeout;
do {
Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollOnce(remaining);
if (fetcher.sendFetches() > 0 || client.hasPendingRequests())
client.pollNoWakeup();
if (this.interceptors == null)
return new ConsumerRecords<>(records);
else
return this.interceptors.onConsume(new ConsumerRecords<>(records));
long elapsed = time.milliseconds() - start;
remaining = timeout - elapsed;
} while (remaining > 0);
return ConsumerRecords.empty();
} finally {
release();
}
}

其中 org.apache.kafka.clients.consumer.KafkaConsumer#pollOnce的实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout) {
...
// ConsumerCoordinator coordinator;
coordinator.poll(time.milliseconds(), timeout);
...
// if data is available already, return it immediately
Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();
if (!records.isEmpty())
return records;
// send any new fetches (won't resend pending fetches)
fetcher.sendFetches();
...
return fetcher.fetchedRecords();
}

所以可以看到 consumer 每次 poll 时是先从 fetcher 中 fetchedRecords 的,如果拿不到结果,就新发起一个 sendFetches 请求。

Consumer 拉取消息的数量

org.apache.kafka.clients.consumer.internals.Fetcher#fetchedRecords 可以看到 maxPollRecords(max.poll.records 配置) 变量限制了每次 poll 的消息条数,不管 consumer 对应多少个 partition,从所有 partition 拉取到的消息条数总和不会超过 maxPollRecords

org.apache.kafka.clients.consumer.internals.Fetcher#sendFetches 可以看到 fetchSize(max.partition.fetch.bytes 配置) 用于每次创建 FetchRequest 时的 org.apache.kafka.common.requests.FetchRequest.PartitionData 的参数设置。fetchSize限制了 consumer 每次从每个 partition 拉取的数据量。
不过,还是看代码中的 ConsumerConfig#MAX_PARTITION_FETCH_BYTES_DOC 说明吧:

The maximum amount of data per-partition the server will return. Records are fetched in batches by the consumer. If the first record batch in the first non-empty partition of the fetch is larger than this limit, the batch will still be returned to ensure that the consumer can make progress. The maximum record batch size accepted by the broker is defined via message.max.bytes (broker config) or max.message.bytes (topic config). See “ + FETCH_MAX_BYTES_CONFIG + “ for limiting the consumer request size.

poll 和 fetch 的关系

在满足max.partition.fetch.bytes限制的情况下,假如fetch到了100个record,放到本地缓存后,由于max.poll.records限制每次只能poll出15个record。那么KafkaConsumer就需要执行7次才能将这一次通过网络发起的fetch请求所fetch到的这100个record消费完毕。其中前6次是每次pool中15个record,最后一次是poll出10个record。

Consumer 的心跳机制

org.apache.kafka.clients.consumer.internals.AbstractCoordinat 中启动 HeartbeatThread 线程来定时发送心跳和检查 consumer 的状态。
每个 Consumer 都有一个 ConsumerCoordinator(继承 AbstractCoordinator),每个 ConsumerCoordinator 都启动一个 HeartbeatThread 线程来维护心跳,心跳信息存放在 org.apache.kafka.clients.consumer.internals.Heartbeat

实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
@Override
public void run() {
try {
log.debug("Heartbeat thread for group {} started", groupId);
while (true) {
synchronized (AbstractCoordinator.this) {
...
client.pollNoWakeup();
long now = time.milliseconds();
if (coordinatorUnknown()) {
...
} else if (heartbeat.sessionTimeoutExpired(now)) {
// the session timeout has expired without seeing a successful heartbeat, so we should
// probably make sure the coordinator is still healthy.
coordinatorDead();
} else if (heartbeat.pollTimeoutExpired(now)) {
// the poll timeout has expired, which means that the foreground thread has stalled
// in between calls to poll(), so we explicitly leave the group.
maybeLeaveGroup();
} else if (!heartbeat.shouldHeartbeat(now)) {
// poll again after waiting for the retry backoff in case the heartbeat failed or the
// coordinator disconnected
AbstractCoordinator.this.wait(retryBackoffMs);
} else {
heartbeat.sentHeartbeat(now);
...
}
} // end synchronized
} // end while
} //end try
} // end run

其中最重要的两个 timeout 函数:

1
2
3
4
5
6
7
public boolean sessionTimeoutExpired(long now) {
return now - Math.max(lastSessionReset, lastHeartbeatReceive) > sessionTimeout;
}
public boolean pollTimeoutExpired(long now) {
return now - lastPoll > maxPollInterval;
}

sessionTimeout

如果是 sessionTimeout 则 Mark the current coordinator as dead,此时 会将 consumer 踢掉,重新分配 partition 和 consumer 的对应关系。

在 Kafka Server 端,Consumer 的 Group 定义了五个状态::
Consumer Group State

pollTimeout

如果是 pollTimeout 则 Reset the generation and memberId because we have fallen out of the group,此时 consumer 会退出 group,当再次 poll 时又会 rejoin group 触发 rebalance group。

Rebalance Generation

表示 rebalance 之后的一届成员,主要是用于保护 consumer group,隔离无效 offset 提交。每次 group 进行 rebalance 之后,generation 号都会加 1,表示 group 进入到了一个新的版本,下图所示为 consumer 2 退出后 consumer 4 加入时 Rebalance Generation 的过程:
Rebalance Generation

partition 的数量设置

  • 一个 partition 只能被 Consumer Group 中的一个 consumer 消费,因此,为了提高并发量,可以提高 partition 的数量,但是这会造成 replica 副本拷贝的网络请求增加,故障恢复时的耗时增加。因为 kafka 使用 batch pull 的方式,所以单个线程的消费速率还是有保障的。并且 partition 数量过多,zk 维护 ISR 列表负载较重。

  • partiton 数量最好是 consumer 数目的整数倍,比如取 24, consumer 数目的设置就会灵活很多。

  • consumer 消费消息时不时严格有序的。当从多个 partition 读数据时,kafka 只保证在一个 partition 上数据是有序的,多个 partition 的消息消费很可能就不是严格有序的了。

参数设置

heartbeat.interval.ms

心跳间隔。心跳是在 consumer 与 coordinator 之间进行的。心跳是确定 consumer 存活,加入或者退出 group 的有效手段。
这个值必须设置的小于 session.timeout.ms,因为:
当 consumer 由于某种原因不能发 heartbeat 到 coordinator 时,并且时间超过 session.timeout.ms 时,就会认为该 consumer 已退出,它所订阅的 partition 会分配到同一 group 内的其它的 consumer 上。

参数值

默认值:3000 (3s),通常设置的值要低于session.timeout.ms的1/3。

session.timeout.ms

consumer session 过期时间。如果超时时间范围内,没有收到消费者的心跳,broker 会把这个消费者置为失效,并触发消费者负载均衡。因为只有在调用 poll 方法时才会发送心跳,更大的 session 超时时间允许消费者在 poll 循环周期内处理消息内容,尽管这会有花费更长时间检测失效的代价。如果想控制消费者处理消息的时间,

参数值

默认值:10000 (10s),这个值必须设置在 broker configuration 中的 group.min.session.timeout.ms 与 group.max.session.timeout.ms 之间。

max.poll.interval.ms

This config sets the maximum delay between client calls to poll().

When the timeout expires, the consumer will stop sending heartbeats and send an explicit LeaveGroup request.

As soon as the consumer resumes processing with another call to poll(), the consumer will rejoin the group.

By increasing the interval between expected polls, you can give the consumer more time to handle a batch of records returned frompoll(long). The drawback is that increasing this value may delay a group rebalance since the consumer will only join the rebalance inside the call to poll. You can use this setting to bound the time to finish a rebalance, but you risk slower progress if the consumer cannot actually call poll often enough.

参数设置大一点可以增加两次 poll 之间处理消息的时间。
当 consumer 一切正常(也就是保持着 heartbeat ),且参数的值小于消息处理的时长,会导致 consumer leave group 然后又 rejoin group,触发无谓的 group balance,出现 consumer livelock 现象。

但如果设置的太大,会延迟 group rebalance,因为消费者只会在调用 poll 时加入rebalance。

max.poll.records

Use this setting to limit the total records returned from a single call to poll. This can make it easier to predict the maximum that must be handled within each poll interval. By tuning this value, you may be able to reduce the poll interval, which will reduce the impact of group rebalancing.

0.11.0 Kafka 的默认配置是

  • max.poll.interval.ms=5min
  • max.poll.records=500

即平均 600ms 要处理完一条消息,如果消息的消费时间高于 600ms,则一定要调整 max.poll.records 或 max.poll.interval.ms。

Kafka Javadoc - Detecting Consumer Failures

After subscribing to a set of topics, the consumer will automatically join the group when poll(long) is invoked. The poll API is designed to ensure consumer liveness. As long as you continue to call poll, the consumer will stay in the group and continue to receive messages from the partitions it was assigned. Underneath the covers, the consumer sends periodic heartbeats to the server. If the consumer crashes or is unable to send heartbeats for a duration of session.timeout.ms, then the consumer will be considered dead and its partitions will be reassigned.
It is also possible that the consumer could encounter a “livelock” situation where it is continuing to send heartbeats, but no progress is being made. To prevent the consumer from holding onto its partitions indefinitely in this case, we provide a liveness detection mechanism using the max.poll.interval.ms setting. Basically if you don’t call poll at least as frequently as the configured max interval, then the client will proactively leave the group so that another consumer can take over its partitions. When this happens, you may see an offset commit failure (as indicated by a CommitFailedException thrown from a call to commitSync()). This is a safety mechanism which guarantees that only active members of the group are able to commit offsets. So to stay in the group, you must continue to call poll.

Reference

Kafka消费组(consumer group)
kafka.apache.org javadoc
Coordinator实现原理
kafka params
kafka源码分析之kafka的consumer的负载均衡管理
Group Management Protocol
Kafka 之 Group 状态变化分析及 Rebalance 过程
KIP-62: Allow consumer to send heartbeats from a background thread
Kafka: The Definitive Guide Chapter 4 - Kafka Consumers

运行环境说明

kafka 版本号为 0.11.0

Kafka Consumer 的参数配置如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private Map<String, Object> getDefaultConsumerConfigs() {
Map<String, Object> propsMap = new HashMap<>();
// 手动设置自动提交为false,交由 spring-kafka 启动的invoker执行提交
propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
propsMap.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "10000");
propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
// 从partition中获取消息最大大小
propsMap.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "102400");
return propsMap;
}

Consumer 卡顿现象

Consumer 卡顿时的日志

每次卡顿不消费时都出现以下日志:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
2017/11/09 19:35:29:DEBUG pool-16-thread-10 org.apache.kafka.clients.consumer.internals.Fetcher - Fetch READ_UNCOMMITTED at offset 11429299 for partition my_topic-27 returned fetch data (error=NONE, highWaterMark=11429299, lastStableOffset = -1, logStartOffset = 10299493, abortedTransactions = null, recordsSizeInBytes=0)
2017/11/09 19:35:29:DEBUG pool-16-thread-10 org.apache.kafka.clients.consumer.internals.Fetcher - Added READ_UNCOMMITTED fetch request for partition my_topic-27 at offset 11429299 to node p-kafka-host-03.ali.keep:9092 (id: 6 rack: null)
2017/11/09 19:35:29:DEBUG pool-16-thread-10 org.apache.kafka.clients.consumer.internals.Fetcher - Sending READ_UNCOMMITTED fetch for partitions [my_topic-27] to broker p-kafka-host-03.ali.keep:9092 (id: 6 rack: null)
2017/11/09 19:35:29:DEBUG kafka-coordinator-heartbeat-thread | myConsumerGroup org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Sending Heartbeat request for group myConsumerGroup to coordinator p-kafka-host-02:9092 (id: 2147483642 rack: null)
2017/11/09 19:35:29:DEBUG pool-16-thread-13 org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Attempt to heartbeat failed for group myConsumerGroup since it is rebalancing.
2017/11/09 19:35:29:INFO pool-16-thread-13 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Revoking previously assigned partitions [my_topic-18] for group myConsumerGroup
2017/11/09 19:35:29:INFO pool-16-thread-13 org.springframework.kafka.listener.ConcurrentMessageListenerContainer - partitions revoked: [my_topic-18]
2017/11/09 19:35:29:INFO pool-16-thread-13 org.springframework.kafka.listener.ConcurrentMessageListenerContainer - partitions revoked: [my_topic-18]
2017/11/09 19:35:29:DEBUG pool-16-thread-4 org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Attempt to heartbeat failed for group myConsumerGroup since it is rebalancing.
2017/11/09 19:35:29:INFO pool-16-thread-4 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Revoking previously assigned partitions [my_topic-21] for group myConsumerGroup
2017/11/09 19:35:29:INFO pool-16-thread-4 org.springframework.kafka.listener.ConcurrentMessageListenerContainer - partitions revoked: [my_topic-21]
2017/11/09 19:35:29:INFO pool-16-thread-4 org.springframework.kafka.listener.ConcurrentMessageListenerContainer - partitions revoked: [my_topic-21]
...
2017/11/09 19:35:29:DEBUG pool-16-thread-4 org.apache.kafka.clients.consumer.internals.Fetcher - Fetch READ_UNCOMMITTED at offset 11426689 for partition my_topic-21 returned fetch data (error=NONE, highWaterMark=11426689, lastStableOffset = -1, logStartOffset = 10552294, abortedTransactions = null, recordsSizeInBytes=0)
2017/11/09 19:35:29:DEBUG pool-16-thread-13 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Group myConsumerGroup committed offset 11429849 for partition my_topic-18
2017/11/09 19:35:29:INFO pool-16-thread-13 org.apache.kafka.clients.consumer.internals.AbstractCoordinator - (Re-)joining group myConsumerGroup
2017/11/09 19:35:29:DEBUG pool-16-thread-13 org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Sending JoinGroup ((type: JoinGroupRequest, groupId=myConsumerGroup, sessionTimeout=30000, rebalanceTimeout=300000, memberId=p-my-consumer-host-03-12-97c12fb0-9bb7-4762-8478-538f06be9e90, protocolType=consumer, groupProtocols=org.apache.kafka.common.requests.JoinGroupRequest$ProtocolMetadata@54371fac)) to coordinator p-kafka-02.ali.keep:9092 (id: 2147483642 rack: null)

其中最重要的部分是:

2017/11/09 19:35:29:DEBUG pool-16-thread-13 org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Attempt to heartbeat failed for group myConsumerGroup since it is rebalancing.
2017/11/09 19:35:29:INFO pool-16-thread-13 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Revoking previously assigned partitions [my_topic-18] for group myConsumerGroup
2017/11/09 19:35:29:INFO pool-16-thread-13 org.springframework.kafka.listener.ConcurrentMessageListenerContainer - partitions revoked: [my_topic-18]

2017/11/09 19:35:29:INFO pool-16-thread-13 org.apache.kafka.clients.consumer.internals.AbstractCoordinator - (Re-)joining group myConsumerGroup

那为什么每次会这样呢?我们是有单独的线程在发起心跳的!!!

Consumer 卡顿时的 jstack

观察日志可以发现,卡顿时 ConsumerCoordinator 在不停地 rejoin group,并且做 rebalance,所以需要对比在正常和卡顿这两种情况下 ConsumerCoordinator 的行为。

正常时的 ConsumerCoordinator

1
2
3
cat jstack.normal.log | grep ConsumerCoordinator -B1 | grep -v ConsumerCoordinator | sort | uniq -c
32 at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:931)
22 at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:950)

卡顿时的 ConsumerCoordinator

1
2
3
4
5
cat jstack.pause.log | grep ConsumerCoordinator -B1 | grep -v ConsumerCoordinator | sort | uniq -c
14 at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:316)
14 at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:920)
8 at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:931)
32 at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:950)

根据以上的现场信息,可以发现关键就在 AbstractCoordinator.ensureActiveGroup 这一步,继续观察 jstack.pause.log 中的相关堆栈信息,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
"pool-16-thread-14" #167 prio=5 os_prio=0 tid=0x00007f5b19dbf000 nid=0x7ac2 runnable [0x00007f5ae4ccb000]
java.lang.Thread.State: RUNNABLE
at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:79)
at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
- locked <0x00000000c2e816b0> (a sun.nio.ch.Util$2)
- locked <0x00000000c2e816a0> (a java.util.Collections$UnmodifiableSet)
- locked <0x00000000c2e742a0> (a sun.nio.ch.EPollSelectorImpl)
at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
at org.apache.kafka.common.network.Selector.select(Selector.java:529)
at org.apache.kafka.common.network.Selector.poll(Selector.java:321)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:433)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:232)
- locked <0x00000000c2f00da0> (a org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:208)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:168)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:364)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:316)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:297)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1078)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:571)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

卡顿原因分析

卡顿原因:Consumer 在 Region Group

根据以上信息,结合 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator 的代码可以发现在
ConsumerCoordinator#poll 中判断 needRejoin() 为 true 时会调用 ensureActiveGroup() 函数,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public void poll(long now, long remainingMs) {
invokeCompletedOffsetCommitCallbacks();
if (subscriptions.partitionsAutoAssigned()) {
...
if (needRejoin()) {
...
ensureActiveGroup();
...
}
} else {
...
}
}
pollHeartbeat(now);
maybeAutoCommitOffsetsAsync(now);
}

Region Group 原因:Consumer Leave Group

那么问题就是什么情况下 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator#needRejoin 会返回 true,我们还是看看他的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Override
public boolean needRejoin() {
if (!subscriptions.partitionsAutoAssigned())
return false;
// we need to rejoin if we performed the assignment and metadata has changed
if (assignmentSnapshot != null && !assignmentSnapshot.equals(metadataSnapshot))
return true;
// we need to join if our subscription has changed since the last join
if (joinedSubscription != null && !joinedSubscription.equals(subscriptions.subscription()))
return true;
return super.needRejoin();
}

kafka metadata 什么时候变化????

可以看到,不是 metadataSnapshot 有变化,也不是 订阅者 subscriptions 有变化,那就是 super.needRejoin() 返回了 true,问题就转到了 org.apache.kafka.clients.consumer.internals.AbstractCoordinator#needRejoin 这个函数,其实现是:

1
2
3
protected synchronized boolean needRejoin() {
return rejoinNeeded;
}

从代码上看 rejoinNeeded 的整个变化过程,初始化为 true,在 initiateJoinGroup 成功后,会赋值为 false,在 maybeLeaveGroup 时会赋值为 true,所以怀疑卡顿时是 consumer leave group 了。

Consumer Leave Group 原因:pollTimeoutExpired

org.apache.kafka.clients.consumer.internals.AbstractCoordinator.HeartbeatThread#run 中调用了 maybeLeaveGroup() 函数,其实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
@Override
public void run() {
try {
log.debug("Heartbeat thread for group {} started", groupId);
while (true) {
synchronized (AbstractCoordinator.this) {
...
client.pollNoWakeup();
long now = time.milliseconds();
if (coordinatorUnknown()) {
...
} else if (heartbeat.sessionTimeoutExpired(now)) {
// the session timeout has expired without seeing a successful heartbeat, so we should
// probably make sure the coordinator is still healthy.
coordinatorDead();
} else if (heartbeat.pollTimeoutExpired(now)) {
// the poll timeout has expired, which means that the foreground thread has stalled
// in between calls to poll(), so we explicitly leave the group.
maybeLeaveGroup();
} else if (!heartbeat.shouldHeartbeat(now)) {
// poll again after waiting for the retry backoff in case the heartbeat failed or the
// coordinator disconnected
AbstractCoordinator.this.wait(retryBackoffMs);
} else {
heartbeat.sentHeartbeat(now);
...
}
} // end synchronized
} // end while
} //end try
} // end run

其中最重要的两个 timeout 函数:

1
2
3
4
5
6
7
public boolean sessionTimeoutExpired(long now) {
return now - Math.max(lastSessionReset, lastHeartbeatReceive) > sessionTimeout;
}
public boolean pollTimeoutExpired(long now) {
return now - lastPoll > maxPollInterval;
}

所以是 pollTimeoutExpired 引起了 leave group.

根本原因:pollTimeoutExpired

pollTimeoutExpired 的原因是两次 poll 的时间间隔超过了设置的 maxPollInterval 值。

解决方案

调整以下参数

  • max.poll.records:100 (默认值 500)
  • max.poll.interval.ms:600000 (默认值 300000,也就是5分钟)

后续

至此,问题已经解决了,但是有一些疑问。

  • 对于这两个参数值的设定, 是 max.poll.records 越小越好,max.poll.interval.ms 越大越好吗?
  • 已经设置过的 session.timeout.msheartbeat.interval.ms难道没用吗?为什么有这么多超时参数的设置啊?
  • 已经设置过的 max.partition.fetch.bytes 没用吗?为什么还要设置 max.poll.records 啊?
  • 整体上还需要调哪些参数才可以让 consumer 运行正常,或者是性能达到最大呢?

在下一篇博客「Kafka Consumer 的实现」中,将会继续分析 Kafka Consumer 的消费过程和参数配置,试图回答以上问题。

1. 指数分布族(Exponential Family)

指数分布族的定义

若一类概率分布可以写成如下形式,那么它就属于指数分布族:
$$P(y;\eta) = b(y)exp(\eta^TT(y)-a(\eta))$$

  • $\eta$: 自然参数,通常是一个实数
  • T(y): 充分统计量,通常,T(y)=y,实际上是一个概率分布的充分统计量(统计学知识)
  • a($\eta$) 被称为 log partition function

对于给定的 a,b,T 三个函数,上式定义了一个以 $\eta$ 为参数的概率分布集合,即改变 $\eta$ 可以得到不同的概率分布,例如高斯分布和伯努利分布。

指数分布族以及它们的特征

  • 正态分布(高斯分布)——总体噪音(由中心极限定理得)
  • 伯努利分布——逻辑回归(对01问题建模)
  • 多项式分布——K种结果的事情进行建模
  • 泊松分布——对计数过程进行建模(一个样本中放射性衰变的数目,网站的访客数目,商店的顾客数目)
  • 伽马分布,指数分布——正数的分布,对间隔进行建模(在公交车站等车的时间)
  • β分布,Dirichlet分布——对小数进行分布,对概率分布进行建模
  • Wishart分布——协方差的分布

2. 指数分布簇推导

高斯分布(Gaussian)和伯努利(Bernoulli)分布都可以推导为指数分布族。

2.1 伯努利分布的推导

伯努利分布的概率公式为:$P(y=1;\phi)=\phi; P(y=0;\phi)=1-\phi;$

公式可经如下变换:
$$P(y;\phi)=\phi^y(1-\phi)^y$$
$$=exp(log(\phi^y(1-\phi)^y))=exp(ylog(\phi)+ (1-y)log(1-\phi))$$
$$=exp(log(\frac\phi{1-\phi})y + log(1-\phi))$$

对应的指数分布族的参数为:
$T(y) = y$
$b(y) = 1$
$\eta = log(\frac\phi{1-\phi}) => \phi=\frac1{1+e^{-n}}$
$a(\eta) = -log(1-\phi) = log(1+e^n)$

2.2 高斯分布的推导

在线性回归中,$\sigma$ 对于模型参数 $\theta$ 的选择没有影响,为了推导方便我们令 $\sigma = 1$。
则有:
$$P(y;\mu)=\frac{1}{\sqrt{2\pi}}exp(-\frac12(y-\mu)^2)$$
$$=\frac{1}{\sqrt{2\pi}}exp(-\frac{1}{2}y^2) * exp({\mu}y-\frac{1}{2}\mu^2)$$

对应的指数分布族的参数为:
$T(y) = y$
$b(y) = \frac{1}{\sqrt{2\pi}}exp(-\frac12y^2)$
$\eta = \mu$
$a(\eta) = \frac{{\mu}^2}2 = \frac{{\eta}^2}2$

3. 广义线性模型(Generalized Linear Model)

想用 广义线性模型对一般问题进行建模首先需要明确几个 假设:

  1. $y | x;θ \sim ExponentialFamily(\eta)$ y的条件概率属于指数分布族;
  2. 给定 x 广义线性模型的目标是求解 T(y) | x, 不过由于 很多情况下 T(y) = y 所以我们的目标变成了 y | x , 也即 我们希望拟合函数为 h(x) = E[y|x] (这个条件在线性回归和逻辑回归中都满足, 例如在逻辑回归中 $hθ(x) = p(y = 1|x;\theta) = 0 \cdot p(y = 0|x; \theta) + 1 \cdot p(y = 1|x; \theta) = E[y|x;\theta])$
  3. 自然参数 $\eta$ 与 x 是线性关系:$\eta=\theta^Tx$ ($\eta 为向量时 \eta_{i} = \theta_{i}^Tx$)

有了如上假设,就可以进行建模和求解了。

对于伯努利分布,可以推导出:

这也就是逻辑回归中 sigmod 函数的由来。

4. 多分类算法(Softmax Regression)

y有多个可能的分类:{1, 2, …, k}

=======具体的公式略=======

最后求借寻找最佳参数时,跟最小二乘和逻辑回归的解法类似,可以用梯度下降法或者牛顿迭代法。

Referecen

广义线性模型(Generalized Linear Model)

1. 逻辑回归

对于逻辑回归而言,y 的取值不是 0 就是 1,所以 $hθ(x)$ 可以写为
$$h
θ(x) = g(θ^{T}x)=\frac1{1+e^{-θ^{T}x}}$$

其中
$$g(z)=\frac1{1+e^{-z}}$$;

g(z) 被称为 logistic function 或 sigmoid function,其二维坐标下的曲线为:
sigmoid function

我们先取 g(z) 为 sigmoid function,如果有其他使得 y 值从 0 到 1 平滑递增的函数也可以使用。但由于一些列原因(在后续的一般化回归模型 GLM 中会谈到为什么选用这个函数),g(z) is a fairly natural one.

g(z) 的导数我们可以先进行推导:
$$g’(z)=\frac{d}{dz}\frac{1}{1+e^{-z}}= \frac{1}{(1+e^{-z})^2}(e^{-z})$$
$$= \frac{1}{1+e^{-z}}*(1 - \frac{1}{1+e^{-z}})= g(z)(1-g(z))$$

2. 梯度上升法求解逻辑回归

对于给定的逻辑回归函数,我们使用最小二乘法来推导出最大似然估计,假设:
$P(y=1|x;θ)=h_θ(x)$,代表对于给定的 θ,y 取值为 1 的概率。
$P(y=0|x;θ)=1-h_θ(x)$,代表对于给定的 θ,y 取值为 0 的概率。

以上两者可以合并为:
$$P(y|x;θ)=(h_θ(x))^y(1 − h_θ(x))^{(1−y)}$$

假设 m 个训练集是相互独立的,则似然估计为:
$$L(θ)=P(\overrightarrow{y}|X;θ)$$
$$= \prod^m_{i=1}P(y^i|x^i;θ)$$
$$= \prod^m_{i=1}{(h_θ(x^{(i)}))^{y^{(i)}}(1 − h_θ(x^{(i)}))^{(1−y^{(i)})}}$$

和之前一样,上式可以简化为:


$l(θ) = logL(θ)
= \sum_{m}^{i=1}{y^{(i)}}log{h(x^{(i)}) + {(1−y^{(i)})}log(1 − h(x^{(i)}))}$

那么,
如何去最大化似然函数呢,可以应用梯度上升法,因为我们要使 P 的取值足够大,也是就预测准确的概率最够大。

随机梯度上升的公式为:
$$θ:= θ + \alpha\Deltaθl(θ)$$

下面来求$\Deltaθl(θ)$的取值:

$$\frac\partial{\partial\theta_j}l(\theta)$$
$$= (y\frac1{g(\theta^Tx)} - (1-y)\frac1{1-g(\theta^Tx)})\frac\partial{\partial\theta_j}g(\theta^Tx)$$
$$= (y\frac1{g(\theta^Tx)} - (1-y)\frac1{1-g(\theta^Tx)}) g(\theta^Tx)(1-g(\theta^Tx))\frac\partial{\partial\theta_j}\theta^Tx$$
$$= ({y(1-g(\theta^Tx))-(1-y)g(\theta^Tx)})x_j$$
$$= (y - h_{\theta}(x))x_j$$

附上手写的推导过程:
手写推导过程

所以,最终随机梯度上升的公式为:
$$θ_j:=θ_j + \alpha\sum_{i=1}^{m}(y^{(i)} - h_{\theta}(x^{(i))})x_j^{(i)}$$

如何和线性回归的公式放在一起比较,

$$θ_j = θ_j - α \frac1m \sum_{i=1}^{m}{(h_θ(x^{(i)}) - y^{(i)})}x_j^{(i)}$$

会发现,这两者非常相似,实际上却不然,因为这里的 $(h_θ(x^{(i)})$ 定义的不是线性函数。后续我们谈到 GLM 时会发现这并不是巧合,而是有更深层次的原因。

3. 牛顿迭代法求解逻辑回归

牛顿迭代法可以利用到曲线本身的信息,比梯度下降法更容易收敛,即迭代更少次数。

3.1 牛顿迭代法简述

假设我们要求解方程 f(x)=0 的根,首先随便找一个初始值 x0,如果 x0 不是解,做一个经过 (x0,f(x0)) 这个点的切线,与 x 轴的交点为 x1。同样的道理,如果 x1 不是解,做一个经过 (x1,f(x1)) 这个点的切线,与 x 轴的交点为 x2。 以此类推。以这样的方式得到的 xi 会无限趋近于 f(x)=0 的解。

对于任意一点 $(x_n,y_n)$ 做切线,切线的斜率为 $f’(x_n)$,则有方程:
$$ y-f(x_n) = f’(x_n)(x-x_n) $$

3.2 迭代过程

求解 $f(\theta)$ = 0 时 $\theta$ 的取值。
设下一次迭代时 $\theta^{(t+1)}$ 的取值与前一次迭代 $\theta^{(t)}$ 的取值(在 x 轴)距离为 $\Delta$。

则 $\theta^{(t+1)} = \theta^{(t)} - \Delta$,且 $\Delta = \frac{f(\theta^{(t)})}{f’(\theta^{(t)})}$,
所以有:
$$\theta^{(t+1)} = \theta^{(t)} - \frac{f(\theta^{(t)})}{f’(\theta^{(t)})}$$

从泰勒展开到牛顿迭代

也可以由泰勒展开中推导牛顿迭代的公式。这次为了求解方程 f′=0 的根,把原函数 f(x) 的做泰勒展开,展开到二阶形式:
$$ f(x+\Delta x) = f(x)+f’(x)\Delta x+ \frac1{2}f’’(x)\Delta x^2 $$

当且仅当 $\Delta x$ 逼近 0 时,上式成立,此时忽略 1/2 系数的作用,所以有:
$$ f’(x)+ \frac1{2}f’’(x)\Delta x = 0 $$
故:
$$\Delta x = -\frac{f’(x)}{f’’(x)} $$

对函数求极大值的方法
>

  1. 将原函数y=f(x),对x求一次导数,得到dy/dx;
  2. 令dy/dx = 0,解得一次导函数的零点;
  3. 将原函数对x求二次导函数;
  4. 将解得的零点坐标的x值代入二次导函数,
    如果是正值,零点所在位置,就是极小值点,再将该x值代入原函数,得到极小值;
    如果是值值,零点所在位置,就是极大值点,再将该x值代入原函数,得到极大值;
    如果是0,零点所在位置,既不是极小值点,也不是极大值点,是拐点。

所以求 $l(\theta)$ 在极大值处 $\theta$ 的取值,则是求 $l’(\theta) = 0$ 时 $\theta$ 的值,应用牛顿迭代法则有:

$$\theta^{(t+1)} = \theta^{(t)} - \frac{l’(\theta^{(t)})}{l’’(\theta^{(t)})}$$

3.3 多维向量的牛顿迭代

对于多维向量 $\overrightarrow{X}$ 求解。
$$\theta := \theta - H^{-1} \nabla l(\theta)$$
其中
$\nabla l(\theta)$ 是对 $l(\theta)$ 求导的值。

H 是一个 n*n 的矩阵,n 是特征数量,元素的计算公式为:
$$H_ij= \frac{\partial^2{l({\theta)}}}{\partial{\theta_i}\partial{\theta_j}}$$

3.4 牛顿迭代法的特点

是否收敛

通常情况下是收敛的,但是需要满足一些条件,对于逻辑回归来讲,是收敛的。

迭代速度

每次迭代后,有解数字的误差是成平方倍减小的,是二次收敛函数。

优缺点

优点:收敛快
缺点:特征多(上千个)时,每次迭代成本大

Reference

http://blog.csdn.net/baimafujinji/article/details/51179381
http://blog.csdn.net/baimafujinji/article/details/51167852
如何通过牛顿方法解决Logistic回归问题

1. 欠拟合与过拟合

欠拟合:underfitting,与训练数据贴合的不够好,不能准确预测未来目标值。
过拟合:overfitting,与训练数据贴合的太好了,预测未来目标值的准确性有较大风险。

2. 线性模型的概率解释

思考:我们为什么要用最小二乘的指标作为 cost function?为什么不是绝对值或四次方?

最小二乘法(又称最小平方法)是一种数学优化技术。它通过最小化误差的平方和寻找数据的最佳函数匹配。
最小二乘是从函数形式上来看的,极大似然是从概率意义上来看的。事实上,最小二乘可以由高斯噪声假设+极大似然估计推导出来。当然极大似然估计还可以推导出其他的loss function, 比如logistic回归中,loss function是交叉熵。
最大似然估计与最小二乘估计的区别

一般的最小二乘法实际上是在假设误差项满足高斯分布且独立同分布的情况下,使似然性最大化。

推导过程

回到预测房价的例子,假设最终的预测函数,每一次预测都有误差,用$ε^{(i)}$表示误差,则预测函数可以写为:
$$y^{(i)}=\theta^Tx^{(i)} + ε^{(i)} $$

其中,误差是随机分布的,均值为 0,服从高斯分布 $N(0,σ^2)$。

Andrew Ng 讲到在大多数情况下,线性回归的误差值如果综合来看,就是符合高斯分布的。并且根据中心极限定律,正态分布确实是对误差项分布的合理猜想。

所以
$$P(y^{(i)}|x^{(i)}; θ) = \frac{1}{\sqrt{2\pi}\sigma}exp(- \frac{(y^{(i)}-\theta^Tx^{(i)})^2}{2\sigma^2})$$

$P(y^{(i)}|x^{(i)}; θ)$ 表示:在 θ 为给定的参数的情况下,概率 $y^{(i)}$ 以 $x^{(i)}$ 为随机变量的概率分布,注意 θ 不是随机变量。

由于 ε(i) 是独立的同分布(IID:independentlyidentically distribution),所以以 θ 为变量的似然函数为:
$$
L(θ)=L(θ;X,Y)=p(Y|X;θ) = \prod_{i=1}^{m}\frac{1}{\sqrt{2\pi}\sigma}exp(- \frac{(y^{(i)}-\theta^Tx^{(i)})^2}{2\sigma^2})
$$

对 L(θ) 取对数有:
$$
l(\theta)=logL(\theta)
= log\prod_{i=1}^{m}\frac{1}{\sqrt{2\pi}\sigma}exp(- \frac{(y^{(i)}-\theta^Tx^{(i)})^2}{2\sigma^2})
$$
$$
= m\sum_{i=1}^{m}log\frac{1}{\sqrt{2\pi}\sigma} - \frac1{2\sigma^2}\sum_{i=1}^{m}(y^{(i)}-\theta^Tx^{(i)})^2
$$

最大化 $l(\theta)$ 即是最小化 $\frac1{2\sigma^2}\sum_{i=1}^{m}(y^{(i)}-\theta^Tx^{(i)})^2$,这样就是 cost function.

由于目标变量服从正态分布,但分布的均值和方差都未知,对均值和方差两个参数的合理估计是选取两个参数使得在正态分布的前提下,抽到各样本中的 y 值的概率最大,这就是最大似然估计的思想。

Reference

http://www.holehouse.org/mlclass/07_Regularization.html
http://rstudio-pubs-static.s3.amazonaws.com/4810_06e3d8fd26ed40eb8c31aff35eae81ae.html
https://rpubs.com/badbye/ml03
http://www.qiujiawei.com/linear-algebra-15/
最大似然估计

1. 多变量的线性回归

n: 特征(features) 数量
m: 训练集数量
$x^{(i)}$:

  • 表示一条训练数据的向量
  • i is an index into the training set
  • So
    • x is an n-dimensional feature vector
    • $x^{(3)}$ is, for example, the 3rd training data

$x^{(j)}_i$: The value of feature j in the ith training example

例如,当 n=4 时:
$$h_θ(x) = θ_0 + θ_1x_1 + θ_2x_2 + θ_3x_3 + θ_4x_4$$

For convenience of notation, $x_0$ = 1, 所以最后的特征向量的维度是 n+1,从 0 开始,记为”X”,
则有:
$$h_θ(x)=θ^TX$$
$θ^T$: [1 * (n+1)] matrix

1.1 多变量的梯度下降

Cost Function

$$J(θ_0, θ_1, …,θ_n) = \frac1{2m}\sum_{i=1}^{m}{(h_θ(x^{(i)}) - y^{(i)})^2}$$

Gradient descent

Repeat {
$$ θ_j = θ_j - α\frac\partial{\partial J(θ_0, θ_1, …,θ_n)} $$
}

every iterator

  • θj = θj - learning rate (α) times the partial derivative of J(θ) with respect to θJ(…)
  • We do this through a simultaneous update of every θj value

$$ \frac\partial{\partial J(θ_0, θ_1, …,θ_n)} $$
$$ = \frac1m * \sum_{i=1}^{m}{(h_θ(x^{(i)}) - y^{(i)})}*x_j^{(i)} $$

2. Gradient Decent in practice

2.1 Feature Scaling

假设只有 $x_1$,$x_2$ 两个变量,其中:$x_1\in(0,2000), x_2\in(1,5)$,则最后的 J(θ) 图形是一个椭圆,在椭圆下用梯度下降法会比圆形要耗时更久,So we need to rescale this input so it’s more effective,有很多方式,一种是将各个 feature 除以其本身的最大值,缩小范围至[0,1],一种是各个 feature 减去 mean 然后除以最大值,缩小范围至[-0.5,0.5]

Learning Rate α

  • working correctly: If gradient descent is working then J(θ) should decrease after every iteration
  • convergence: 收敛是指每经过一次迭代,J(θ)的值都变化甚小。
  • choose α
    1. When to use a smaller α
      • If J(θ) is increasing, see below picture
      • If J(θ) looks like a series of waves, decreasing and increasing again
      • But if α is too small then rate is too slow
    2. Try a range of α values
      • Plot J(θ) vs number of iterations for each version of alpha
      • Go for roughly threefold increases: 0.001, 0.003, 0.01, 0.03. 0.1, 0.3

2.2 Features and polynomial regression

Can create new features

如何选择 features 和表达式尤为关键,例如房价与房子的长,房子的宽组成的表达式就会麻烦很多,若将房子的长乘以房子的宽得出面积,则有房价与房子面积的表达式,将会更容易拟合出房价的走势。

Polynomial regression

例如房价的走势,如下图,横坐标 x 为房子的面积,纵坐标为房价,使用一元二次的方程,会得出下图的蓝色曲线。容易得到房价今后会有一个下降的过程,可实际上房价是不会随着面积的增大而下降的。所以需要重新选定 Polynomial regression,可以改为使用一元三次的方程或者使用平凡根的方程。

所以选择合适的 Features 和 Polynomial regression 都非常重要。

3. Normal equation 求解多变量线性回归

3.1 Normal equation

举例说明,假设 J(θ) 是一元二次方程,如:J(θ)=a$θ^2$+bθ+c,则令 $$ \frac{d}{dθ}J(θ)=2aθ+b=0$$ 即可,求出最终的 θ 则得到了线性回归方程,可以预测出今后的 y 值。

更普遍地,当 θ 是一个 n+1 维的向量时,θ $\in$ $R^{n+1}$,则 cost function 如下:
$$ J(θ_0, θ_1, …,θ_n) = \frac1{2m}\sum_{i=1}^{m}{(h_θ(x^{(i)}) - y^{(i)})^2} $$
只需要令:
$$ \frac\partial{\partial θ_j}J(θ_0, θ_1, …,θ_n) = … = 0 $$,其中 j = 0,1,2,…,n
设 X 代表训练集的 features 的值的矩阵,y 代表训练集的结果的值的矩阵,假设训练集数量为 m, features 个数为 n, 则 X 为 (m*n) 的矩阵,y 为 (m*1) 的矩阵,可以推导出求 θ 向量的公式如下:
$$θ = (X^TX)^{-1}X^Ty$$

4. Gradient descent Vs Normal equation

Gradient descent

  • Need to chose learning rate
  • Needs many iterations - could make it slower
  • Works well even when n is massive (millions)
  • Better suited to big data
  • What is a big n though: 100 or even a 1000 is still (relativity) small, If n is 10000 then look at using gradient descent
  • 适用于线性回归会逻辑回归

Normal equation

  • No need to chose a learning rate
  • No need to iterate, check for convergence etc.
  • Normal equation needs to compute $(X^TX)^{-1}$
    • This is the inverse of an n x n matrix
    • With most implementations computing a matrix inverse grows by O(n3), So not great
  • Slow of n is large, Can be much slower
  • 仅适用于线性回归

5. 局部加权线性回归

局部加权回归(locally weighted regression)简称 loess,其思想是,针对对某训练数据的每一个点,选取这个点及其临近的一批点做线性回归;同时也需要考虑整个训练数据,考虑的原则是距离该区域越近的点贡献越大,反之则贡献越小,这也正说明局部的思想。其 cost function 为:
$$J(\theta) = \sum_{i=1}^{m} w^{(i)}( y^{(i)}-\theta^Tx^{(i)} )^2$$

其中
$$ w^{(i)} = exp (-\frac{(x^{(i)}-x)^2}{\tau^2})$$

$w^{(i)}$的形式跟正态分布很相似,但二者没有任何关系,仅仅只是便于计算。可以发现,$x^{(j)}$ 离 $x^{(i)}$ 非常近时,${w^{(i)}_j}$ 的值接近于1,此时 j 点的贡献很大,当 $x^{(j)}$ 离 $x^{(i)}$ 非常远时,${w^{(i)}_j}$ 的值接近于 0,此时 j 点的贡献很小。

$\tau^2$ 是波长函数(bandwidth), 控制权重随距离下降的速度,τ 越小则 x 离 $x^{(i)}$ 越远时 $w^{(i)}$ 的值下降的越快。

所以,如果沿着 x 轴的每个点都进行局部直线拟合,那么你会发现对于这个数据集合来说,局部加权的预测结果,能够最终跟踪这条非线性的曲线。

但局部加权回归也有其缺点:

  • 每次对一个点的预测都需要整个数据集的参与,样本量大且需要多点预测时效率低。提高效率的方法参考 Andrew More’s KD Tree
  • 不可外推,对样本所包含的区域外的点进行预测时效果不好,事实上这也是一般线性回归的弱点

对于线性回归算法,一旦拟合出适合训练数据的参数θ,保存这些参数θ,对于之后的预测,不需要再使用原始训练数据集,所以是参数学习算法。

对于局部加权线性回归算法,每次进行预测都需要全部的训练数据(每次进行的预测得到不同的参数θ),没有固定的参数θ,所以是非参数算法(non-parametric algorithm)。

Reference

http://www.holehouse.org/mlclass/04_Linear_Regression_with_multiple_variables.html

引言

本系列的课程来源是 斯坦福大学公开课 CS229: 机器学习课程,也可以看网易公开课的资源,是带字幕的。斯坦福的 CS229 课程相比于 Course 上的 Machine Learning 课程,理论更强,讲解的也更深入,需要有一些的高数基础。两个课程都看了前半部分,更推荐前者,所以相关笔记对应的都是 CS229 课程。

1. 线性回归的定义

适用于监督学习,根据已有的数据集合(x, y),来推断出将来的数据趋势。

2. 单变量线性回归

最后的函数应该是 y = ax + b,假设 hypothesis 为:

$h_{\theta}$(x) = $\theta_{0}$ + $\theta_{1}$

则问题转化为求 $\theta_{0}$ 和 $\theta_{1}$ 的值。要求这两个值需要转化上式,并根据已有的数据来求解。下面介绍损失函数,又叫代价函数的概念。

3. Cost Function

针对每一组数据,公式的值是 $h_{\theta}$($x_{i}$), 实际的值是 $y_{i}$,我们要达到的效果则是公式能够尽量表达已有的 m 组数据集合,即 $( h_{\theta}(x^{(i)}) - y_{i})^{2}$ 的值尽量小。
所以,对于所有数据集合,需要求使得
$$ \frac1{2m}\sum_{i=1}^{m}{(h_{\theta}(x^{(i)}) - y^{(i)})^2}$$ 最小的 $\theta$ 值。

上式又称为 Cost Function,可以写为:

$$ J(\theta_0, \theta_1) = \frac1{2m}\sum_{i=1}^{m}{(h_{\theta}(x^{(i)}) - y^{(i)})^2} $$

我们需要最小化这个 Cost Function。

Cost Function 的作用

假设 $\theta_0$ = 0,则有 $\theta_1$ 和 J($\theta_1$) 的关系,且图形如下:

所以当 $\theta_1$ = 1 时,
$$ J(\theta_1)= \frac1{2m}\sum_{i=1}^{m}{(\theta_1x^{(i)} - y^{(i)})^2} $$
很容易看出,$J(\theta_1)$ 是关于 $\theta_1$ 的一元二次方程,对于所有的训练数据,每个 $\theta_1$ 的取值都会得到一个 $J(\theta_1)$ 值,而 $J(\theta_1)$ 和 $\theta_1$ 的对应关系根据一元二次方程可知,函数曲线如上图。
当 $J(\theta_1)$ 最小时,求得 $\theta_1$ 结果。

当 $\theta_0$ 和 $\theta_1$ 都不为 0 时,J($\theta_0$, $\theta_1$) 的图形如下:

对于两个系数的情况不如一个系数是一个二维坐标系的抛物线那么简单。下面将介绍梯度下降法。

4. 梯度下降法

  • Start with initial guesses
  • Start at 0,0 (or any other value)
  • Keeping changing $\theta_0$ and $\theta_1$ a little bit to try and reduce J($\theta_0$, $\theta_1$)
  • Each time you change the parameters, you select the gradient which reduces J($\theta_0$, $\theta_1$) the most possible
  • Repeat
  • Do so until you converge to a local minimum
    Has an interesting property
    • Where you start can determine which minimum you end up
    • Here we can see one initialization point led to one local minimum
    • The other led to a different one

4.1 具体的计算过程

$$ \theta_j := \theta_j - \alpha \frac\partial{\partial\theta_j}J(\theta_0, \theta_1)$$
(for j = 0 and j = 1)

4.2 Notation

$\alpha$

  • Is a number called the learning rate
  • Controls how big a step you take
    • If α is big have an aggressive gradient descent
    • If α is small take tiny steps
  • Too small
    • Take baby steps
    • Takes too long
  • Too large
    • Can overshoot the minimum and fail to converge

4.3 Computer

每次都是同时计算 $\theta_0, \theta_1$ 的值,如下:
$$ temp0:= \theta_0 - \alpha \frac\partial{\partial\theta_0}J(\theta_0, \theta_1)$$
$$ temp1:= \theta_1 - \alpha \frac\partial{\partial\theta_1}J(\theta_0, \theta_1)$$
$$ \theta_0 := temp0 $$
$$ \theta_1 := temp1 $$

4.4 利用梯度下降法求解线性回归问题

$$ \frac\partial{\partial\theta_j}J(\theta_0, \theta_1) $$

$$ =\frac\partial{\partial\theta_j} * \frac1{2m}\sum_{i=1}^{m}{(h_{\theta}(x^{(i)}) - y^{(i)})^2} $$

$$ =\frac\partial{\partial\theta_j} * \frac1{2m}\sum_{i=1}^{m}{(\theta_0 +\theta_1x^{(i)} - y^{(i)})^2} $$

对于 j = 0 or 1 的情况有:
j = 0:
$$ \frac\partial{\partial\theta_0}J(\theta_0, \theta_1) = \frac1{m}\sum_{i=1}^{m}(h_{\theta}(x^{(i)}) - y^{(i)})$$
j = 1:
$$ \frac\partial{\partial\theta_1}J(\theta_0, \theta_1) = \frac1{m}\sum_{i=1}^{m}(h_{\theta}(x^{(i)}) - y^{(i)})*x^{(i)}$$

4.5 梯度下降法的证明

1、如果优化函数存在解析解。例如我们求最值一般是对优化函数求导,找到导数为0的点。如果代价函数能简单求导,并且求导后为0的式子存在解析解,那么我们就可以直接得到最优的参数。

2、如果式子很难求导,例如函数里面存在隐含的变量或者变量相互间存在耦合,互相依赖的情况。或者求导后式子得不到解释解,或者未知参数的个数大于方程组的个数等。这时候使用迭代算法来一步一步找到最优解。

  • 当目标函数是凸函数时,梯度下降法的解是全局最优解
  • 一般情况下,其解不保证是全局最优解

凸函数

凸函数就是一个定义在某个向量空间的凸子集C(区间)上的实值函数 f,而且对于凸子集C中任意两个向量 $x_1$, $x_2$ 有:
$$f(\frac{x_1+x_2}{2}) \le \frac{f(x_1)+f(x_2)}{2}$$
于是容易得出对于任意(0,1)中有理数 p,有:
$$f(px_1+(1-p)x_2) \le pf(x_1)+(1-p)f(x_2)$$
如果 f 连续,那么 p 可以改成任意(0,1)中实数。则 f 称为 I 上的凸函数,当且仅当其上境图(在函数图像上方的点集)为一个凸集。

梯度下降法的使用

我们首先在函数上任选一点,计算其损失(即我们上面的L(w)) ,然后按照某一规则寻找更低的一点计算新的损失,只要新损失更小(最小化问题),我们就继续下降,直到达到一个可接受的优化目标。
梯度下降方法分为两个部分,第一部分是整体上,我们使用某步长不断下降求损失函数,第二部分是为了防止步长太长导致最后无法收敛,每次当损失上升的时候都调整步长。
通常实践中使用时,都是用一些开源算法,很少需要深度改进,比如使用 libsvm 可以直接求解逻辑回归。

Reference

http://www.cnblogs.com/yysblog/p/3268508.html
http://52opencourse.com/125/coursera%E5%85%AC%E5%BC%80%E8%AF%BE%E7%AC%94%E8%AE%B0-%E6%96%AF%E5%9D%A6%E7%A6%8F%E5%A4%A7%E5%AD%A6%E6%9C%BA%E5%99%A8%E5%AD%A6%E4%B9%A0%E7%AC%AC%E5%85%AD%E8%AF%BE-%E9%80%BB%E8%BE%91%E5%9B%9E%E5%BD%92-logistic-regression
http://www.cnblogs.com/chaoren399/p/4851658.html

要解决的问题

json 反序列化 bean 时,当某个字段在 json 中为 null 时,使用 bean 中声明的默认值。

Person 类我们改造下:

1
2
3
4
5
public class Person {
private String name;
// Address is a enum: {CH, US, GZ}
private Region region = Region.GZ;
}

仍然以 Person 类举例,如果 json 串是:

1
{"name":"robert", "region":null}

希望反序列化后的 bean 为

1
Person(name="robert", region=Region.GZ)

解决过程

在上一篇文章 lombok 的 AllArgs 导致 Jackson 反序列化丢失字段默认值 中可以看到 json 反序列化为 bean 的过程,一般情况下,是先调用默认构造函数生成 bean,然后根据 json 中出现的字段挨个赋值。
所以反序列化生成的 bean 的 region 肯定为 null。

解决方案

1. @JsonInclude(Include.NON_NULL) 可行吗?

不可行,这个注解是序列化时忽略 null 值,反序列化时不生效,基本上反序列化时我们不能做什么事情。

2. JsonCreator 可行吗?

在 Region 枚举里写 JsonCreator:

1
2
3
4
5
6
7
8
9
10
@JsonCreator
public static Region getRegion(String value) {
for (Region region : Region.values()) {
if (region.name().equals(value)) {
return region;
}
}
return Region.GZ;
}

直接将 {"region": null} 反序列化为 Region 是可行的,会调用 JsonCreator,但是如果是反序列化 Person 则不会调用到 JsonCreator,为什么呢?

debug 过程:
如前文所述,会调用到 com.fasterxml.jackson.databind.deser.BeanDeserializer#deserialize 这个函数中,然后会调用到
com.fasterxml.jackson.databind.deser.SettableBeanProperty#deserialize,这个函数的实现是:

1
2
3
4
5
6
7
8
9
10
11
public final Object deserialize(JsonParser p, DeserializationContext ctxt) throws IOException {
JsonToken t = p.getCurrentToken();
if (t == JsonToken.VALUE_NULL) {
return _valueDeserializer.getNullValue(ctxt);
}
if (_valueTypeDeserializer != null) {
return _valueDeserializer.deserializeWithType(p, ctxt, _valueTypeDeserializer);
}
return _valueDeserializer.deserialize(p, ctxt);
}

所以在这里会把 null 值拦住,直接返回 getNullValue 的结果。

3.自定义 deserializer

实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class RegionDeserializer extends JsonDeserializer<Region> {
@Override
public Region deserialize(JsonParser jsonParser, DeserializationContext deserializationContext)
throws IOException {
JsonNode node = jsonParser.getCodec().readTree(jsonParser);
Region region = Region.GZ;
try {
if (StringUtils.isNotEmpty(node.textValue())) {
return Region.getRegion(node.textValue());
}
} catch (Exception e) {
type = Region.GZ;
}
return region;
}
@Override
public Region getNullValue(DeserializationContext ctxt) {
return Region.GZ;
}
}

Person 类改为:

1
2
3
4
5
6
7
public class Person {
private String name;
// Address is a enum: {CH, US, GZ}
@JsonDeserialize(using = RegionDeserializer.class)
private Region region = Region.GZ;
}

这样,在com.fasterxml.jackson.databind.deser.SettableBeanProperty#deserialize这个方法里,碰到 null 值,就会返回 getNullValue 的结果,即 Region.GZ,如果不是 null 会进入 getRegion 函数处理,也能处理其他情况。

要解决的问题

希望在反序列化 json 到 bean 时,对于 json 中未出现的字段,在 bean 中赋上默认值。

例如
Person 类如下:

1
2
3
4
5
6
7
8
@Data
@AllArgsConstructor
@NoArgsConstructor
@JsonIgnoreProperties(ignoreUnknown = true)
public class Person {
private String name;
private String address = "beijing"; // default value if json missing the age field
}

json:

1
{"name":"robert"}

反序列化后的 bean 为

1
Person(name="robert", address="beijing")

但实际上,发序列化的结果为

1
Person(name="robert", address=null)

解决过程

1. 查看 maven 版本

项目中 jackson 的配置如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<version>${jackson.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>${jackson.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>${jackson.version}</version>
</dependency>
<!-- Jackson dependency versions -->
<jackson.version>2.6.5</jackson.version>

配置升到最新后问题仍然存在。

2. debug json 反序列化过程,找到原因

json 反序列化是从

1
com.fasterxml.jackson.databind.ObjectMapper#_readMapAndClose

这个方法调用开始的,里面的一段代码为:

1
2
3
4
5
6
7
8
9
DeserializationConfig cfg = getDeserializationConfig();
DeserializationContext ctxt = createDeserializationContext(jp, cfg);
JsonDeserializer<Object> deser = _findRootDeserializer(ctxt, valueType);
if (cfg.useRootWrapping()) {
result = _unwrapAndDeserialize(jp, ctxt, cfg, valueType, deser);
} else {
result = deser.deserialize(jp, ctxt);
}
ctxt.checkUnresolvedObjectId();

在第 3 行找到的 JsonDeserializer 是 com.fasterxml.jackson.databind.deser.BeanDeserializer
从第 7 行代表进入 com.fasterxml.jackson.databind.deser.BeanDeserializer#deserialize(com.fasterxml.jackson.core.JsonParser, com.fasterxml.jackson.databind.DeserializationContext)

函数实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public Object deserialize(JsonParser p, DeserializationContext ctxt) throws IOException {
// common case first
if (p.isExpectedStartObjectToken()) {
if (_vanillaProcessing) {
return vanillaDeserialize(p, ctxt, p.nextToken());
}
p.nextToken();
if (_objectIdReader != null) {
return deserializeWithObjectId(p, ctxt);
}
return deserializeFromObject(p, ctxt);
}
JsonToken t = p.getCurrentToken();
return _deserializeOther(p, ctxt, t);
}

vanillaDeserialize 为 false,最后走到了第 11 行,最后到了

1
com.fasterxml.jackson.databind.deser.BeanDeserializer#_deserializeUsingPropertyBased

然后到

1
com.fasterxml.jackson.databind.deser.impl.PropertyBasedCreator#build

在这个函数里有这样一段代码:

1
Object bean = _valueInstantiator.createFromObjectWith(ctxt, buffer.getParameters(_allProperties));

调用的是

1
com.fasterxml.jackson.databind.deser.ValueInstantiator#createFromObjectWith(com.fasterxml.jackson.databind.DeserializationContext, java.lang.Object[])

可以发现,createFromObjectWith 的第二个参数是数组,json 解出来的字段都放在了这个数组里。然后调用了 Person 类的全参构造函数,对于
缺失的字段自动补 null 值,这样就导致了 address 字段为 null。

3. 解决方案

去掉 @AllArgsConstructor 时,没有问题了,因为此时找到的 com.fasterxml.jackson.databind.deser.BeanDeserializer 的 vanillaDeserialize 字段为 true,会调用 vanillaDeserialize(p, ctxt, p.nextToken());,这个函数的实现非常明确:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
private final Object vanillaDeserialize(JsonParser p, DeserializationContext ctxt, JsonToken t) throws IOException {
final Object bean = _valueInstantiator.createUsingDefault(ctxt);
// [databind#631]: Assign current value, to be accessible by custom serializers
p.setCurrentValue(bean);
if (p.hasTokenId(JsonTokenId.ID_FIELD_NAME)) {
String propName = p.getCurrentName();
do {
p.nextToken();
SettableBeanProperty prop = _beanProperties.find(propName);
if (prop != null) { // normal case
try {
prop.deserializeAndSet(p, ctxt, bean);
} catch (Exception e) {
wrapAndThrow(e, bean, propName, ctxt);
}
continue;
}
handleUnknownVanilla(p, ctxt, bean, propName);
} while ((propName = p.nextFieldName()) != null);
}
return bean;
}

先用默认构造函数生成 bean,此时的 bean 是有默认值的,然后将 json 中出现的字段的值赋值给 bean,这样 address 就有值了。

4. 根本原因

看上去是声明了全参构造函数导致的,所以想尝试自己写全参构造函数,在 address 为 null 时给其赋默认值。
写完如下:

1
2
3
4
5
6
7
public Person(String name, String address){
this.name = name;
this.address = address;
if(this.address == null){
this.address = "beijing";
}
}

继续走刚才 debug 的流程,发现居然没有请求这个全参构造函数。

那问题就是 @AllArgsConstructor 生成的全参函数有不同之处,jackson 能够识别出来并用于反序列化。查看 jar 包中 Person 类的代码发现其全参构造函数如下:

1
2
3
4
5
@ConstructorProperties({"name", "address"})
public Person(String name, String address){
this.name = name;
this.address = address;
}

所以,区别就是 @ConstructorProperties({"name", "address"}) 这个注解,这个注解的作用是指定构造函数参数的名字,Spring 可根据参数的名字注入 bean。
但最终为什么这样注解了,jackson 就调用了全参构造函数还不得而知,猜测是 jackson 在 _findRootDeserializer 这一步时,是找最适合的构造函数。

可以通过设置 @AllArgsConstructor(suppressConstructorProperties=true) 来禁用 @ConstructorProperties.

结论

Lombok 的 @AllArgsConstructor 注解导致 Jackson 反序列化时调用了全参构造函数,将没有出现的字段都赋值为 null 了。

修改方式:

  1. 不使用 @AllArgsConstructor
  2. 使用 @AllArgsConstructor 但是不让其在全参构造函数上加入 ConstructorProperties 注解,声明方式改为 @AllArgsConstructor(suppressConstructorProperties = true)

目录

  1. 服务异常的处理流程
  2. 负载
  3. 内存
  4. 服务指标
  5. 工具

1. 服务异常的处理流程

2. 负载

2.1 查看机器 cpu 的负载

1
top -b -n 1 |grep java|awk '{print "VIRT:"$5,"RES:"$6,"cpu:"$9"%","mem:"$10"%"}'

2.2 查找 cpu 占用率高的线程

top -p 25603 -H
printf 0x%x 25842
jstack 25603 | grep 0x64f2

cat /proc/interrupts

(1)CPU
(2)Memory
(3)IO
(4)Network

可以从以下几个方面监控CPU的信息:
(1)中断;
(2)上下文切换;
(3)可运行队列;
(4)CPU 利用率。

3. 内存

3.1 系统内存

free 命令
[root@server ~]# free

1
2
3
4
total used free shared buffers cached
Mem: 3266180 3250000 10000 0 201000 3002000
-/+ buffers/cache: 47000 3213000
Swap: 2048276 80160 1968116

这里的默认显示单位是kb。
各项指标解释

  • total:总计物理内存的大小。
  • used:已使用多大。
  • free:可用有多少。
  • Shared:多个进程共享的内存总额。
  • buffers: 磁盘缓存的大小。
  • cache:磁盘缓存的大小。
  • -/+ buffers/cached): used:已使用多大,free:可用有多少。
  • 已用内存 = 系统used memory - buffers - cached
    (47000 = 3250000-201000-3002000)
  • 可用内存 = 系统free memory + buffers + cached
    (3213000 = 10000+201000+3002000)

什么是buffer/cache?

  • buffer 指 Linux 内存的:Buffer cache,缓冲区缓
  • cache 指 Linux内存中的:Page cache,页面缓存

page cache
page cache 主要用来作为文件系统上的文件数据的缓存来用,尤其是针对当进程对文件有 read/write 操作的时候。如果你仔细想想的话,作为可以映射文件到内存的系统调用:mmap是不是很自然的也应该用到 page cache?在当前的系统实现里,page cache 也被作为其它文件类型的缓存设备来用,所以事实上 page cache 也负责了大部分的块设备文件的缓存工作。

buffer cache
buffer cache 主要用来在系统对块设备进行读写的时候,对块进行数据缓存的系统来使用。这意味着某些对块的操作会使用 buffer cache 进行缓存,比如我们在格式化文件系统的时候。一般情况下两个缓存系统是一起配合使用的,比如当我们对一个文件进行写操作的时候,page cache 的内容会被改变,而 buffer cache 则可以用来将 page 标记为不同的缓冲区,并记录是哪一个缓冲区被修改了。这样,内核在后续执行脏数据的回写(writeback)时,就不用将整个 page 写回,而只需要写回修改的部分即可。

在当前的内核中,page cache 是针对内存页的缓存,说白了就是,如果有内存是以page进行分配管理的,都可以使用page cache作为其缓存来管理使用。
当然,不是所有的内存都是以页(page)进行管理的,也有很多是针对块(block)进行管理的,这部分内存使用如果要用到 cache 功能,则都集中到buffer cache中来使用。(从这个角度出发,是不是buffer cache改名叫做block cache更好?)然而,也不是所有块(block)都有固定长度,系统上块的长度主要是根据所使用的块设备决定的,而页长度在X86上无论是32位还是64位都是4k。

系统如何回收cache?

Linux内核会在内存将要耗尽的时候,触发内存回收的工作,以便释放出内存给急需内存的进程使用。一般情况下,这个操作中主要的内存释放都来自于对buffer/cache的释放。尤其是被使用更多的cache空间。既然它主要用来做缓存,只是在内存够用的时候加快进程对文件的读写速度,那么在内存压力较大的情况下,当然有必要清空释放cache,作为free空间分给相关进程使用。所以一般情况下,我们认为buffer/cache空间可以被释放,这个理解是正确的。

但是这种清缓存的工作也并不是没有成本。理解cache是干什么的就可以明白清缓存必须保证cache中的数据跟对应文件中的数据一致,才能对cache进行释放。所以伴随着cache清除的行为的,一般都是系统IO飙高。因为内核要对比cache中的数据和对应硬盘文件上的数据是否一致,如果不一致需要写回,之后才能回收。

在系统中除了内存将被耗尽的时候可以清缓存以外,我们还可以人工触发缓存清除的操作。

3.2 进程内存

3.2.1 进程内存统计

/proc/[pid]/status
通过/proc//status可以查看进程的内存使用情况,包括虚拟内存大小(VmSize),物理内存大小(VmRSS),数据段大小(VmData),栈的大小(VmStk),代码段的大小(VmExe),共享库的代码段大小(VmLib)等等。

cat /proc/[pid]/status

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
Name: gedit /*进程的程序名*/
State: S (sleeping) /*进程的状态信息,具体参见http://blog.chinaunix.net/u2/73528/showart_1106510.html*/
Tgid: 9744 /*线程组号*/
Pid: 9744 /*进程pid*/
PPid: 7672 /*父进程的pid*/
TracerPid: 0 /*跟踪进程的pid*/
VmPeak: 60184 kB /*进程地址空间的大小*/
VmSize: 60180 kB /*进程虚拟地址空间的大小reserved_vm:进程在预留或特殊的内存间的物理页*/
VmLck: 0 kB /*进程已经锁住的物理内存的大小.锁住的物理内存不能交换到硬盘*/
VmHWM: 18020 kB /*文件内存映射和匿名内存映射的大小*/
VmRSS: 18020 kB /*应用程序正在使用的物理内存的大小,就是用ps命令的参数rss的值 (rss)*/
VmData: 12240 kB /*程序数据段的大小(所占虚拟内存的大小),存放初始化了的数据*/
VmStk: 84 kB /*进程在用户态的栈的大小*/
VmExe: 576 kB /*程序所拥有的可执行虚拟内存的大小,代码段,不包括任务使用的库 */
VmLib: 21072 kB /*被映像到任务的虚拟内存空间的库的大小*/
VmPTE: 56 kB /*该进程的所有页表的大小*/
Threads: 1 /*共享使用该信号描述符的任务的个数*/

3.2.2 JVM 内存分配

java内存组成介绍:堆(Heap)和非堆(Non-heap)内存

按照官方的说法:“Java 虚拟机具有一个堆,堆是运行时数据区域,所有类实例和数组的内存均从此处分配。堆是在 Java 虚拟机启动时创建的。”“在JVM中堆之外的内存称为非堆内存(Non-heap memory)”。可以看出JVM主要管理两种类型的内存:堆和非堆。简单来说堆就是Java代码可及的内存,是留给开发人员使用的;非堆就是JVM留给 自己用的,所以方法区、JVM内部处理或优化所需的内存(如JIT编译后的代码缓存)、每个类结构(如运行时常数池、字段和方法数据)以及方法和构造方法 的代码都在非堆内存中。

IMAGE

  1. JVM本身需要的内存,包括其加载的第三方库以及这些库分配的内存
  2. NIO的DirectBuffer是分配的native memory
  3. 内存映射文件,包括JVM加载的一些JAR和第三方库,以及程序内部用到的。上面 pmap 输出的内容里,有一些静态文件所占用的大小不在Java的heap里,因此作为一个Web服务器,赶紧把静态文件从这个Web服务器中人移开吧,放到nginx或者CDN里去吧。
  4. JIT, JVM会将Class编译成native代码,这些内存也不会少,如果使用了Spring的AOP,CGLIB会生成更多的类,JIT的内存开销也会随之变大,而且Class本身JVM的GC会将其放到Perm Generation里去,很难被回收掉,面对这种情况,应该让JVM使用ConcurrentMarkSweep GC,并启用这个GC的相关参数允许将不使用的class从Perm Generation中移除, 参数配置: -XX:+UseConcMarkSweepGC -X:+CMSPermGenSweepingEnabled -X:+CMSClassUnloadingEnabled,如果不需要移除而Perm Generation空间不够,可以加大一点: -X:PermSize=256M -X:MaxPermSize=512M
  5. JNI,一些JNI接口调用的native库也会分配一些内存,如果遇到JNI库的内存泄露,可以使用valgrind等内存泄露工具来检测
  6. 线程栈,每个线程都会有自己的栈空间,如果线程一多,这个的开销就很明显了
  7. jmap/jstack 采样,频繁的采样也会增加内存占用,如果你有服务器健康监控,记得这个频率别太高,否则健康监控变成致病监控了。

1.方法区

也称”永久代” 、“非堆”,它用于存储虚拟机加载的类信息、常量、静态变量、是各个线程共享的内存区域。默认最小值为16MB,最大值为64MB,可以通过-XX:PermSize 和 -XX:MaxPermSize 参数限制方法区的大小。

运行时常量池:是方法区的一部分,Class文件中除了有类的版本、字段、方法、接口等描述信息外,还有一项信息是常量池,用于存放编译器生成的各种符号引用,这部分内容将在类加载后放到方法区的运行时常量池中。

2.虚拟机栈

描述的是java 方法执行的内存模型:每个方法被执行的时候 都会创建一个“栈帧”用于存储局部变量表(包括参数)、操作栈、方法出口等信息。每个方法被调用到执行完的过程,就对应着一个栈帧在虚拟机栈中从入栈到出栈的过程。声明周期与线程相同,是线程私有的。

局部变量表存放了编译器可知的各种基本数据类型(boolean、byte、char、short、int、float、long、double)、对象引用(引用指针,并非对象本身),其中64位长度的long和double类型的数据会占用2个局部变量的空间,其余数据类型只占1个。局部变量表所需的内存空间在编译期间完成分配,当进入一个方法时,这个方法需要在栈帧中分配多大的局部变量是完全确定的,在运行期间栈帧不会改变局部变量表的大小空间。

3.本地方法栈

与虚拟机栈基本类似,区别在于虚拟机栈为虚拟机执行的java方法服务,而本地方法栈则是为Native方法服务。

4.堆

也叫做java 堆、GC堆是java虚拟机所管理的内存中最大的一块内存区域,也是被各个线程共享的内存区域,在JVM启动时创建。该内存区域存放了对象实例及数组(所有new的对象)。其大小通过-Xms(最小值)和-Xmx(最大值)参数设置,-Xms为JVM启动时申请的最小内存,默认为操作系统物理内存的1/64但小于1G,-Xmx为JVM可申请的最大内存,默认为物理内存的1/4但小于1G,默认当空余堆内存小于40%时,JVM会增大Heap到-Xmx指定的大小,可通过-XX:MinHeapFreeRation=来指定这个比列;当空余堆内存大于70%时,JVM会减小heap的大小到-Xms指定的大小,可通过XX:MaxHeapFreeRation=来指定这个比列,对于运行系统,为避免在运行时频繁调整Heap的大小,通常-Xms与-Xmx的值设成一样。

由于现在收集器都是采用分代收集算法,堆被划分为新生代和老年代。新生代主要存储新创建的对象和尚未进入老年代的对象。老年代存储经过多次新生代GC(Minor GC)任然存活的对象。

5.程序计数器

是最小的一块内存区域,它的作用是当前线程所执行的字节码的行号指示器,在虚拟机的模型里,字节码解释器工作时就是通过改变这个计数器的值来选取下一条需要执行的字节码指令,分支、循环、异常处理、线程恢复等基础功能都需要依赖计数器完成。

3.2.3 直接内存

直接内存并不是虚拟机内存的一部分,也不是Java虚拟机规范中定义的内存区域。jdk1.4中新加入的NIO,引入了通道与缓冲区的IO方式,它可以调用Native方法直接分配堆外内存,这个堆外内存就是本机内存,不会影响到堆内存的大小。

3.2.4 JVM 内存分析

查看 JVM 堆内存情况

jmap -heap [pid]

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
[root@server ~]$ jmap -heap 837
Attaching to process ID 837, please wait...
Debugger attached successfully.
Server compiler detected.
JVM version is 24.71-b01
using thread-local object allocation.
Parallel GC with 4 thread(s)//GC 方式
Heap Configuration: //堆内存初始化配置
MinHeapFreeRatio = 0 //对应jvm启动参数-XX:MinHeapFreeRatio设置JVM堆最小空闲比率(default 40)
MaxHeapFreeRatio = 100 //对应jvm启动参数 -XX:MaxHeapFreeRatio设置JVM堆最大空闲比率(default 70)
MaxHeapSize = 2082471936 (1986.0MB) //对应jvm启动参数-XX:MaxHeapSize=设置JVM堆的最大大小
NewSize = 1310720 (1.25MB)//对应jvm启动参数-XX:NewSize=设置JVM堆的‘新生代’的默认大小
MaxNewSize = 17592186044415 MB//对应jvm启动参数-XX:MaxNewSize=设置JVM堆的‘新生代’的最大大小
OldSize = 5439488 (5.1875MB)//对应jvm启动参数-XX:OldSize=<value>:设置JVM堆的‘老生代’的大小
NewRatio = 2 //对应jvm启动参数-XX:NewRatio=:‘新生代’和‘老生代’的大小比率
SurvivorRatio = 8 //对应jvm启动参数-XX:SurvivorRatio=设置年轻代中Eden区与Survivor区的大小比值
PermSize = 21757952 (20.75MB) //对应jvm启动参数-XX:PermSize=<value>:设置JVM堆的‘永生代’的初始大小
MaxPermSize = 85983232 (82.0MB)//对应jvm启动参数-XX:MaxPermSize=<value>:设置JVM堆的‘永生代’的最大大小
G1HeapRegionSize = 0 (0.0MB)
Heap Usage://堆内存使用情况
PS Young Generation
Eden Space://Eden区内存分布
capacity = 33030144 (31.5MB)//Eden区总容量
used = 1524040 (1.4534378051757812MB) //Eden区已使用
free = 31506104 (30.04656219482422MB) //Eden区剩余容量
4.614088270399305% used //Eden区使用比率
From Space: //其中一个Survivor区的内存分布
capacity = 5242880 (5.0MB)
used = 0 (0.0MB)
free = 5242880 (5.0MB)
0.0% used
To Space: //另一个Survivor区的内存分布
capacity = 5242880 (5.0MB)
used = 0 (0.0MB)
free = 5242880 (5.0MB)
0.0% used
PS Old Generation //当前的Old区内存分布
capacity = 86507520 (82.5MB)
used = 0 (0.0MB)
free = 86507520 (82.5MB)
0.0% used
PS Perm Generation//当前的 “永生代” 内存分布
capacity = 22020096 (21.0MB)
used = 2496528 (2.3808746337890625MB)
free = 19523568 (18.619125366210938MB)
11.337498256138392% used
670 interned Strings occupying 43720 bytes.

关于这里的几个generation网上资料一大把就不细说了,这里算一下求和可以得知前者总共给Java环境分配了644M的内存,而ps输出的VSZ和RSS分别是7.4G和2.9G,这到底是怎么回事呢?
前面jmap输出的内容里,MaxHeapSize 是在命令行上配的,-Xmx4096m,这个java程序可以用到的最大堆内存。
VSZ是指已分配的线性空间大小,这个大小通常并不等于程序实际用到的内存大小,产生这个的可能性很多,比如内存映射,共享的动态库,或者向系统申请了更多的堆,都会扩展线性空间大小,要查看一个进程有哪些内存映射,可以使用 pmap 命令来查看:
pmap -x [pid]

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
[root@server ~]$ pmap -x 837
837: java
Address Kbytes RSS Dirty Mode Mapping
0000000040000000 36 4 0 r-x-- java
0000000040108000 8 8 8 rwx-- java
00000000418c9000 13676 13676 13676 rwx-- [ anon ]
00000006fae00000 83968 83968 83968 rwx-- [ anon ]
0000000700000000 527168 451636 451636 rwx-- [ anon ]
00000007202d0000 127040 0 0 ----- [ anon ]
...
...
00007f55ee124000 4 4 0 r-xs- az.png
00007fff017ff000 4 4 0 r-x-- [ anon ]
ffffffffff600000 4 0 0 r-x-- [ anon ]
---------------- ------ ------ ------
total kB 7796020 3037264 3023928

这里可以看到很多anon,这些表示这块内存是由mmap分配的。

RSZ是Resident Set Size,常驻内存大小,即进程实际占用的物理内存大小, 在现在这个例子当中,RSZ和实际堆内存占用差了2.3G,这2.3G的内存组成分别为:

查看 JVM 堆各个分区的内存情况

jstat -gcutil [pid]

1
2
3
4
[root@server ~]$ jstat -gcutil 837 1000 20
S0 S1 E O P YGC YGCT FGC FGCT GCT
0.00 80.43 24.62 87.44 98.29 7101 119.652 40 19.719 139.371
0.00 80.43 33.14 87.44 98.29 7101 119.652 40 19.719 139.371

分析 JVM 堆内存中的对象

查看存活的对象统计
jmap -histo:live [pid]

dump 内存
jmap -dump:format=b,file=heapDump [pid]

然后用jhat命令可以参看
jhat -port 5000 heapDump
在浏览器中访问:http://localhost:5000/ 查看详细信息

4. 服务指标

4.1 响应时间(RT)

响应时间是指系统对请求作出响应的时间。直观上看,这个指标与人对软件性能的主观感受是非常一致的,因为它完整地记录了整个计算机系统处理请求的时间。由于一个系统通常会提供许多功能,而不同功能的处理逻辑也千差万别,因而不同功能的响应时间也不尽相同,甚至同一功能在不同输入数据的情况下响应时间也不相同。所以,在讨论一个系统的响应时间时,人们通常是指该系统所有功能的平均时间或者所有功能的最大响应时间。当然,往往也需要对每个或每组功能讨论其平均响应时间和最大响应时间。

对于单机的没有并发操作的应用系统而言,人们普遍认为响应时间是一个合理且准确的性能指标。需要指出的是,响应时间的绝对值并不能直接反映软件的性能的高低,软件性能的高低实际上取决于用户对该响应时间的接受程度。对于一个游戏软件来说,响应时间小于100毫秒应该是不错的,响应时间在1秒左右可能属于勉强可以接受,如果响应时间达到3秒就完全难以接受了。而对于编译系统来说,完整编译一个较大规模软件的源代码可能需要几十分钟甚至更长时间,但这些响应时间对于用户来说都是可以接受的。

4.2 吞吐量(Throughput)

吞吐量是指系统在单位时间内处理请求的数量。对于无并发的应用系统而言,吞吐量与响应时间成严格的反比关系,实际上此时吞吐量就是响应时间的倒数。前面已经说过,对于单用户的系统,响应时间(或者系统响应时间和应用延迟时间)可以很好地度量系统的性能,但对于并发系统,通常需要用吞吐量作为性能指标。

对于一个多用户的系统,如果只有一个用户使用时系统的平均响应时间是t,当有你n个用户使用时,每个用户看到的响应时间通常并不是n×t,而往往比n×t小很多(当然,在某些特殊情况下也可能比n×t大,甚至大很多)。这是因为处理每个请求需要用到很多资源,由于每个请求的处理过程中有许多不走难以并发执行,这导致在具体的一个时间点,所占资源往往并不多。也就是说在处理单个请求时,在每个时间点都可能有许多资源被闲置,当处理多个请求时,如果资源配置合理,每个用户看到的平均响应时间并不随用户数的增加而线性增加。实际上,不同系统的平均响应时间随用户数增加而增长的速度也不大相同,这也是采用吞吐量来度量并发系统的性能的主要原因。一般而言,吞吐量是一个比较通用的指标,两个具有不同用户数和用户使用模式的系统,如果其最大吞吐量基本一致,则可以判断两个系统的处理能力基本一致。

4.3 并发用户数

并发用户数是指系统可以同时承载的正常使用系统功能的用户的数量。与吞吐量相比,并发用户数是一个更直观但也更笼统的性能指标。实际上,并发用户数是一个非常不准确的指标,因为用户不同的使用模式会导致不同用户在单位时间发出不同数量的请求。一网站系统为例,假设用户只有注册后才能使用,但注册用户并不是每时每刻都在使用该网站,因此具体一个时刻只有部分注册用户同时在线,在线用户就在浏览网站时会花很多时间阅读网站上的信息,因而具体一个时刻只有部分在线用户同时向系统发出请求。这样,对于网站系统我们会有三个关于用户数的统计数字:注册用户数、在线用户数和同时发请求用户数。由于注册用户可能长时间不登陆网站,使用注册用户数作为性能指标会造成很大的误差。而在线用户数和同事发请求用户数都可以作为性能指标。相比而言,以在线用户作为性能指标更直观些,而以同时发请求用户数作为性能指标更准确些。

4.4 QPS每秒查询率(Query Per Second)

每秒查询率QPS是对一个特定的查询服务器在规定时间内所处理流量多少的衡量标准,在因特网上,作为域名系统服务器的机器的性能经常用每秒查询率来衡量。对应fetches/sec,即每秒的响应请求数,也即是最大吞吐能力。

从以上概念来看吞吐量和响应时间是衡量系统性能的重要指标,QPS虽然和吞吐量的计量单位不同,但应该是成正比的,任何一个指标都可以含量服务器的并行处理能力。当然Throughput更关心数据量,QPS更关心处理笔数。

4.5 CPU利用率

CPU Load Average < CPU个数 核数 0.7

Context Switch Rate
就是Process(Thread)的切换,如果切换过多,会让CPU忙于切换,也会导致影响吞吐量。《高性能服务器架构 》这篇文章的第2节就是说的是这个问题的。究竟多少算合适?google了一大圈,没有一个确切的解释。Context Switch大体上由两个部分组成:中断和进程(包括线程)切换,一次中断(Interrupt)会引起一次切换,进程(线程)的创建、激活之类的也会引起一次切换。CS的值也和TPS(Transaction Per Second)相关的,假设每次调用会引起N次CS,那么就可以得出

Context Switch Rate = Interrupt Rate + TPS* N

CSR减掉IR,就是进程/线程的切换,假如主进程收到请求交给线程处理,线程处理完毕归还给主进程,这里就是2次切换。也可以用CSR、IR、TPS的值代入公式中,得出每次事物导致的切换数。因此,要降低CSR,就必须在每个TPS引起的切换上下功夫,只有N这个值降下去,CSR就能降低,理想情况下N=0,但是无论如何如果N >= 4,则要好好检查检查。另外网上说的CSR<5000,我认为标准不该如此单一。

这三个指标在 LoadRunner 中可以监控到;另外,在 linux 中,也可以用 vmstat 查看r(Load Arerage),in(Interrupt)和cs(Context Switch)

5. 工具

uptime

dmesg

top
查看进程活动状态以及一些系统状况

vmstat
查看系统状态、硬件和系统信息等

iostat
查看CPU 负载,硬盘状况

sar
综合工具,查看系统状况

mpstat
查看多处理器状况

netstat
查看网络状况

iptraf
实时网络状况监测

tcpdump
抓取网络数据包,详细分析

mpstat
查看多处理器状况

tcptrace
数据包分析工具

netperf
网络带宽工具

dstat
综合工具,综合了 vmstat, iostat, ifstat, netstat 等多个信息

Reference

http://tmq.qq.com/2016/07/it-is-necessary-to-know-the-background-performance-test/
https://www.ibm.com/developerworks/java/library/j-nativememory-linux/
http://www.oracle.com/technetwork/java/javase/index-137495.html
http://www.hollischuang.com/archives/303