1. 前言

线性回归出的模型如果出现过拟合怎么办?

  1. 脏数据太多,需要清洗数据
  2. 增加训练数据的数量和多样性
  3. 特征数量过多,使用正则化减少特征数量

2. 向量的范数

向量的范数是一种用来刻画向量大小的一种度量。实数的绝对值,复数的模,三维空间向量的长度,都是抽象范数概念的原型。上述三个对象统一记为 x ,衡量它们大小的量记为 ||x|| (我们用单竖线表示绝对值,双竖线表示范数),显然它们满足以下三条性质:

L0范数:向量中非零元素的个数。
L1范数:向量中各个元素绝对值之和,又叫“稀疏规则算子”(Lasso regularization)
L2范数:向量中各个元素平方和再开方
p-范数:$||\textbf{x}||_p = (\sum_{i=1}^N|x_i|^p)^{\frac{1}{p}}$,即向量元素绝对值的p次方和的1/p次幂。

下图展示了 p 取不同值时 unit ball 的形状:

3. 正则化

在统计学的缩减中,引入了惩罚项,减少了不重要的参数,同时还可采用正则化(regularization)减少不重要的参数。
既然是减少特征,那么最容易想到的就是使用 L0 范数,求回归函数中的参数向量 w 的非零元素的个数。如果约束 $‖w‖_0≤k$,就是约束非零元素个数不大于 k。不过很明显,L0 范数是不连续的且非凸的,如果在线性回归中加上 L0 范数的约束,就变成了一个组合优化问题:挑出 $≤k$ 个系数然后做回归,找到目标函数的最小值对应的系数组合,这是一个 NP 问题。

有趣的是,L1 范数也可以达到稀疏的效果,是 L0 范数的最优凸近似。我们把引入 L1 范数的线性回归叫做 Lasso 回归。

3.1 Lasso 回归

Lasso 算法(英语:least absolute shrinkage and selection operator,又译最小绝对值收敛和选择算子、套索算法)是一种同时进行特征选择和正则化(数学)的回归分析方法,旨在增强统计模型的预测准确性和可解释性,最初由斯坦福大学统计学教授 Robert Tibshirani 于 1996 年基于 Leo Breiman 的非负参数推断(Nonnegative Garrote, NNG)提出。

优化目标:min $ 1/N\ast\sum_{i = 1}^{N}{(y_{i} -\omega^{T} x_{i})^{2} }$

Lasso 回归:min $1/N\ast\sum_{i = 1}^{N}{(y_{i} -\omega^{T} x_{i})^{2} } + \lambda||\omega||_{1}$

3.2 Ridge 回归

岭回归是加了二阶正则项的最小二乘,主要适用于过拟合严重或各变量之间存在多重共线性的时候,岭回归是有 bias 的,这里的 bias 是为了让 variance 更小。

Ridge 回归:min $1/N\ast\sum_{i = 1}^{N}{(y_{i} -\omega^{T} x_{i})^{2} } + \lambda ||\omega||_{2}^{2} $

岭回归最先是用来处理特征数多与样本数的情况,现在也用于在估计中加入偏差,从而得到更好的估计。这里引入λ限制了所有w的和,通过引入该惩罚项,能够减少不重要的参数,这个技术在统计学上也叫做缩减。缩减方法可以去掉不重要的参数,因此能更好的理解数据。选取不同的λ进行测试,最后得到一个使得误差最小λ。

缩减方法可以去掉不重要的参数,因此能更好地理解数据。此外,与简单的线性回归相比,缩减法能取得更好的预测效果。

3.3 比较两者

Lasso 回归与 Ridge 回归有共同点,也有区别。

共同点

都能解决两个问题:

  1. 线性回归出现的过拟合现象
  2. 使用 Normal equation 求解时,解决 $(X^TX)$ 不可逆的问题。

区别

岭回归加入的正则项是 L2 范数,其结果可以将偏回归系数往 0 的方向进行压缩,但不会把偏回归系数压缩为 0,即岭回归不会剔除变量。Lasso 回归同样也可以将偏回归系数往 0 方向压缩,但是能够将某些变量的偏回归系数压缩为 0,因此可以起到变量筛选的作用。

红色的椭圆和蓝色的区域的切点就是目标函数的最优解,我们可以看到,如果是圆,则很容易切到圆周的任意一点,但是很难切到坐标轴上,因此没有稀疏;但是如果是菱形或者多边形,则很容易切到坐标轴上,因此很容易产生稀疏的结果。这也说明了为什么 L1 范式会是稀疏的。

Reference

Lasso算法 - 维基百科,自由的百科全书
机器学习方法:回归(二):稀疏与正则约束ridge regression,Lasso

1. SVM 定义

支持向量机,因其英文名为 support vector machine,故一般简称 SVM,通俗来讲,它是一种二类分类模型,其基本模型定义为特征空间上的间隔最大的线性分类器,其学习策略便是间隔最大化,最终可转化为一个凸二次规划问题的求解。

2. 函数间隔和几何间隔

svm_margin
上图中哪个分类器最好呢?

2.1 符号定义

为了方便介绍 SVM,重新定义以下符号。
1.在logstic回归中我们用0,1代表两个类, 现在我们改用-1,+1, 即 y∈{-1,-1}
2.在logistic回归中, 我们的 g 是sigmoid函数, 现在改为:
g(z)=1, z>=0
g(z)=-1, z<0
3.在logistic回归中, 我们的假设函数为 $h_θ(x)$, 现在改为,
$h_{x,b}(x)=g(w^{T}x+b)$, 其中
w相当于$[θ_1,θ_2,θ_3,…θ_n]^T$, b 相当于 $θ_0$,即截距。

2.2 函数间隔(functional margin)

对于一个训练样本 $(x^{(i)}, y^{(i)})$,我们定义它到超平面 (w,b) 的函数间隔为:
$$\widehat\gamma=y^{(i)}(w^{T}x^{(i)}+b)$$
我们希望函数间隔越大越好, 即:
if $y^{(i)}=1$, want $(w^{T}x^{(i)}+b) \gg 0 $ (>>代表远大于)
if $y^{(i)}=-1$, want $(w^{T}x^{(i)}+b) \ll 0$

函数间隔越大,代表我们对于分类的结果非常确定。

但这里有一个漏洞,即可以在不改变这个超平面的情况下可以让函数间隔任意大,只要我们成比增加 w,b 就可以达到这个目的了。例如,我们将 w 变为 2w, b 变为 2b,那么我们的函数间隔将会是原来的两倍。

所以对于整个训练集, 函数间隔定义为:
$$\widehat\gamma=min{\widehat\gamma^{(i)}}$$

2.3 几何间隔(geometric margin)

定义对于一个训练样本 $A(x^{(i)}, y^{(i)})$,它到超平面 $w^{T}x^{(i)}+b$ 的几何距离为 $\gamma^{(i)}$。
设 B 为 A 在超平面上的投影,易得超平面的法向量为 $\frac{w}{||w||}$,则有 $A=B+\gamma^{(i)}\frac{w}{||w||}$,
即 $B = A - \gamma^{(i)}\frac{w}{||w||}$。又因为 B 在超平面上,所以有
$w^{T}(x^{(i)} - \gamma^{(i)}\frac{w}{||w||})+b=0$
故几何距离为:
$$\gamma^{(i)}=(\frac{w}{||w||})^{T}x^{(i)} + \frac{b}{||w||}$$

定义其几何间隔:
$$\gamma^{(i)}=y^{(i)}[(\frac{w}{||w||})^{T}x^{(i)} + \frac{b}{||w||}]$$

所以对于整个训练集, 几何间隔定义为:
$$\gamma=min{\gamma^{(i)}}$$

可以发现,当 $||w||=1$时,$\widehat\gamma^{(i)}=\gamma^{(i)}$

3. 最优间隔分类器

几何间隔就是在求 $\frac{\widehat\gamma}{||w||}$ 的最小值,可以发现函数间隔 $\widehat\gamma$ 可放大缩小,且其对结果不产生影响,所以不妨设令${\widehat\gamma}=1$。
现在,目标函数转为了:
$$max \frac{1}{||w||}, s.t., y^{(i)}(w^{T}x^{(i)}+b)\ge1, i=1,2,3…,n$$
等价于
$$min{\frac12{||w||^2}}, s.t., y^{(i)}(w^{T}x^{(i)}+b)\ge1, i=1,2,3…,n$$

利用拉格朗日乘子法可得:
$$L(w,b,\alpha)=\frac12{||w||^2}-\sum_{i=1}^{n}\alpha_i[y^{(i)}(w^{T}x^{(i)}+b)-1]$$


$$\theta(w)=\displaystyle\max_{\alpha_i\ge0}L(w, b, \alpha)$$

则目标函数变成了:
$$\displaystyle\min_{w,b}\theta(w)=\displaystyle\min_{w,b}\max_{\alpha_i\ge0}L(w, b, \alpha)=p^*$$

4. 求解目标函数

4.1 使用对偶问题求解

SVM 中用到了高维映射,将线性不可分的问题映射为线性可分,且映射函数的具体形式无法提前确定,而往往使用核函数映射后,维度 w 会提升很多,甚至至无穷维。
在原问题下,求解算法的复杂度与样本维度(w 的维度)相关;
在对偶问题下,求解算法的复杂度与样本数量(等于拉格朗日算子 a 的数量)相关。

因此,如果是做线性分类,且样本维度低于样本数量的话,可以在原问题下求解。例如 Liblinear 的线性 SVM 默认做法就是这样的;
但如果是做非线性分类,那就会涉及到升维(比如使用高斯核做核函数,其实是将样本升到无穷维),升维后的样本维度往往会远大于样本数量,此时显然在对偶问题下求解会更好。

直接求解原问题有多难? TBD

4.1.1 使用对偶问题的解法

对于不等书约束条件的最优化问题,使用拉格朗日对偶问题来求解。具体介绍见之前的 blog: 拉格朗日乘子法

用 $p^*$ 表示这个问题的最优值,这个问题和我们最初的问题是等价的。
不过,把最小和最大的位置交换一下:
$$\displaystyle\max_{\alpha_i\ge0}\min_{w,b}L(w, b, \alpha)=d^*$$

交换以后的问题不再等价于原问题,新问题的最优值用 $d^*$ 来表示。并且,有 $d^*$ ≤ $p^*$。

第二个问题的最优值 $d^*$ 提供了一个第一个问题的最优
值 $p^*$ 的一个下界。经过论证,原始问题满足强对偶所需要的条件,故这两者相等,所以可以通过求解第二个问题来间接地求解第一个问题。

4.1.2 优化公式

要让 L 关于 w 和 b 最小化,分别对 w,b 求偏导数,即令
$\frac{∂L}{∂w}$ 和 $\frac{∂L}{∂b}$ 等于零,有:
$$\frac{∂L}{∂w}=w-\sum_{i=1}^{n}\alpha_iy^{(i)}x^{(i)}=0$$
$$\frac{∂L}{∂b}=\sum_{i=1}^{n}\alpha_iy^{(i)}=0$$

将上式代入:
$$L(w,b,\alpha)=\frac12{||w||^2}-\sum_{i=1}^{n}\alpha_i[y^{(i)}(w^{T}x^{(i)}+b)-1]$$

推导过程如下:
svm_lagrange_dual_1

这样求出 $\alpha$ 后即可得到 w 和 b。

svm_lagrange_dual_2

现在我们的优化问题变成了如上的形式。对于这个问题,我们有更高效的优化算法,即序列最小优化(SMO)算法。我们通过这个优化算法能得到α,再根据α,我们就可以求解出w和b,进而求得我们最初的目的:找到超平面,即”决策平面”。

4.2 SMO 算法

https://zhuanlan.zhihu.com/p/29212107
https://cloud.tencent.com/developer/article/1076970

SMO_JP_1.jpeg
SMO_JP_2.jpeg

5. 核函数

如果数据集就是线性不可分的应该怎么处理呢?处理方法是将数据集映射到更高维的空间,变成线性可分的。如下图所示:
svm_kernel_demo

一般使用高斯核,但这样会导致映射后的维度非常巨大,也就是 w 的维度很大,这也是为什么要转化为对偶问题来求解的原因,对偶问题的时间复杂度只和数据集的数量有关,与维度无关。

总结 SVM

SVM 是神经网络出现之前最好的分类算法。求解 SVM 的过程也就是找到区分正负数据的最优超平面,所以引入了几何间隔的概念。而求解最大的几何间隔的问题即是在不等式约束条件下求解最优解的问题。这就引入了拉格朗日对偶问题,接着针对对偶问题求解,引入快速学习算法 SMO,最终找到超平面。
对于原始数据线性不可分的情况,引入核函数映射到高维计算,这其中 SVM 求解过程的时间复杂度与维度无关。

附一个很精髓的 SVM十问十答

7. SVM 优缺点

优点:

  1. 可用于线性/非线性分类
  2. 可以解决高维问题,即大型特征空间;
  3. 泛化错误率低
  4. 结果容易解释
  5. 可以避免神经网络结构选择和局部极小点问题
  6. SVM 尽量保持与样本间距离的性质导致它抗攻击的能力更强

缺点:

  1. 对参数调节和和函数的选择敏感,原始分类器不加修改仅适用于处理二分类问题
  2. 在大规模训练样本下,效率不高
  3. 对非线性问题有时很难找到一个合适的核函数
  4. 解决多分类问题存在困难

附录

点到平面的距离

IMAGE

$d$ 维空间中的超平面由下面的方程确定: $w^Tx+b=0$,其中,$w$ 与 $x$ 都是 $d$ 维列向量, $x=(x_1,x_2,…,x_d)^T$ 为平面上的点, $w=(w_1,w_2,…,w_d)^T$为平面的法向量。$b$ 是一个实数, 代表平面与原点之间的距离。

Reference

http://blog.csdn.net/v_july_v/article/details/7624837
http://guoze.me/2014/11/26/svm-knowledge/
https://pdfs.semanticscholar.org/59ee/e096b49d66f39891eb88a6c84cc89acba12d.pdf
http://luojinping.com/2018/03/04/%E6%8B%89%E6%A0%BC%E6%9C%97%E6%97%A5%E4%B9%98%E5%AD%90%E6%B3%95/

1. 拉格朗日乘子法

1.1 定义

在数学中的最优化问题中,拉格朗日乘数法(以数学家约瑟夫·拉格朗日命名)是一种寻找多元函数在其变量受到一个或多个条件的约束时的极值的方法。这种方法可以将一个有n个变量与k个约束条件的最优化问题转换为一个解有n + k个变量的方程组的解的问题。这种方法中引入了一个或一组新的未知数,即拉格朗日乘数,又称拉格朗日乘子,或拉氏乘子,它们是在转换后的方程,即约束方程中作为梯度(gradient)的线性组合中各个向量的系数。

比如,要求 $f(x,y) 在 g(x,y)=c$ 时的最大值时,我们可以引入新变量拉格朗日乘数 $L(x,y,λ)=f(x,y) + λ(g(x,y)-c)$,更一般地,对含n个变量和k个约束的情况,有:
$$L(x_1,x_2, …, x_n, λ_1,λ_2,…,λ_n)=f(x_1,x_2, …, x_n) - \sum_{i=1}^{k}λ_ig_i(x_1,x_2, …, x_n)$$

拉格朗日乘数法所得的极点会包含原问题的所有极值点,但并不保证每个极值点都是原问题的极值点。拉格朗日乘数法的正确性的证明牵涉到偏微分,全微分或链法。

1.2 用处

现要解决以下问题,在满足 g(x,y)=c 这个等式的前提下,求 f(x,y) 函数的最小值(最大值道理相同)。这样的问题我们在高中的时候就遇到过了,只不过高中时遇到的限制条件 g(x,y)=c 都比较简单,一般而言都可以将 y 用 x 的式子表示出来,然后用变量替换的方法代回 f(x,y) 求解。但是,如果 g(x,y) 的形式过于复杂,或者变量太多时,这种方法就失效了。而拉格朗日乘子法就是解决这类问题的通用策略。

2. 等式约束条件的优化问题

2.1 解法

设 $f(x,y)=x^2+y^2$, 约束条件为: $g(x,y)=xy−1=0$。
将三维的 $ f(x,y)$ 图像投影到二维平面上,为下图中红色曲线,也称为 $f(x,y)$ 的等高线。g(x,y) 为图中蓝色曲线。

IMAGE

沿着蓝线往内部的圆走,经过橙色点,此时不是最优解,当走到黑色点时,找到了最优解。此时可认为找到了在蓝线这个限制条件下 $f(x,y )$ 的最低点。

拉格朗日观察到,黑点位置,蓝线与圆是相切的,而橙点位置显然不满足这个性质。且拉格朗日还指出黑点位置蓝线与圆一定是相切的,这正是拉格朗日乘子法的核心。

在最低点,蓝线的切线方向都指向橙线的等高线方向。换句话说,在切点的位置沿蓝线移动很小的一步,都相当于在橙线的等高线上移动,这个时候,可以认为函数值已经趋于稳定了。所以,我们认为这个点的值“可能”是最低(高)的。
相切,意味着在切点的位置,等高线和蓝色曲线的等高线方向是平行的,考虑到梯度与等高线垂直,我们可以用两条曲线的梯度平行来求出切点位置(最低点)。

所以有:∇f=λ∇g,其中 λ 表示一个标量,因为我们虽然能保证两个梯度平行,但不能保证它们的长度一样(或者方向相同)。在高维函数中,∇f 表示的是函数在各个自变量方向的偏导。对于上面的例子,我们可以求出函数 f 和 g 的偏导,再根据方程组(1):
$\frac{∂f}{∂x}=λ\frac{∂g}{∂x}$
$\frac{∂f}{∂y}=λ\frac{∂g}{∂y}$
$g(x,y)=0$

求解时,使用一个统一的拉格朗日函数:$L(x,y,λ)=f(x,y)+λg(x,y)$,令这个函数偏导为 0,可以得到方程组(2):
$\frac{∂L}{∂x}=\frac{∂f}{∂x}−λ\frac{∂g}{∂x}=0$
$\frac{∂L}{∂y}=\frac{∂f}{∂y}−λ\frac{∂g}{∂y}=0$
$\frac{∂L}{∂λ}=g(x,y)=0$

可以发现方程组(2)与方程组(1)一样,联立以上三式即可求出 x, y, λ 的值。

如果是多个约束条件,则拉格朗日函数为:
$L(x_1,…,x_n,λ_1,…,λ_k)=f(x1,…,xn)−\sum_{j=1}^{k}λ_jg_j(x_1,…,x_n)$

2.2 进一步理解

二维情况下的另一个图示,便于理解两个曲线的梯度变化情况:
IMAGE

根据拉格朗日乘子法的定义,这是一种寻找极值的策略,换句话说,该方法并不能保证找到的一定是最低点或者最高点。事实上,它只是一种寻找极值点的过程,而且,拉格朗日乘子法找到的切点可能不只一个(也就是上面的方程组可能找到多个解),例如下图:
IMAGE

所以联立方程组得到的解为所有极值点,是最优解的必要条件,具体是否为极值点需根据问题本身的具体情况检验. 这个方程组称为等式约束的极值必要条件。如果是凸函数,可以保证最优解是存在的。


已经解决的在等式约束条件下的求函数极值的问题,那不等式约束条件下,应该如何解决呢?

3. 凸优化

在开始不等式约束条件下求解函数极值之前,先了解凸优化和对偶问题。

3.1 凸优化的意义

  1. 其应用非常广泛,机器学习中很多优化问题都要通过凸优化来求解;
  2. 在非凸优化中,凸优化同样起到重要的作用,很多非凸优化问题,可以转化为凸优化问题来解决;
  3. 如上引用所述,凸优化问题可以看作是具有成熟求解方法的问题,而其他优化问题则未必。

而在最优化中,凸优化是最为常见而又最为重要的,因为凸优化有一个良好的性质:局部最优是全局最优,这个性质使得我们不需要去证明解是否会收敛到全局最优,或者如何避免局部最优。因此凸优化有广泛应用,在优化问题不是凸的时候,往往也会尝试将其变为凸问题便于求解。

3.2 相关定义

凸集:定义目标函数和约束函数的定义域。
凸函数:定义优化相关函数的凸性限制。
凸优化:中心内容的标准描述。
凸优化问题求解:核心内容。相关算法,梯度下降法、牛顿法、内点法等。

3.3 对偶问题

对偶问题是优化问题中非常重要的方法,将一般优化问题转化为凸优化问题,是求解非凸优化问题的有效方法。
IMAGE

对于一般的优化问题,不管其是不是凸的,其对偶问题一定是凸优化问题。

4. 不等式约束条件的优化问题

4.1 广义拉格朗日函数

有不等式约束条件的最优化问题描述如下:
$min.:f(x)$
$s.t.:$
$g_i(x) ≤ 0, i=1,2,…,p,$
$h_j(x) = 0, j=1,2,…,q,$
$x∈Ω⊂Rn$
其中. $f(x)$为目标函数, $g_i(x)≤0,i=1,2,…,p$ 为不等式约束条件, $h_j(x)=0,k=1,2,…,q$为等式约束条件。

引入广义拉格朗日函数:
$$L(x,λ,μ)=f(x)+\sum_{i=1}^{p}μ_ig_i(x)+\sum_{j=1}^{q}λ_jh_j(x)$$
其中,$μ_i\ge0$。

4.2 定义原始问题最优解

现在,如果把 $L(x,λ,μ)$ 看作是关于 $λ,μ$ 的函数,经过优化(不管用什么方法),就是确定的值使得 $L(x,λ,μ)$ 取得最大值(此过程中把 $x$ 看做常量),确定了 $λ,μ$ 的值,就可以得到 $L(x,λ,μ)$ 的最大值,因为 $λ,μ$ 已经确定,显然 $L(x,λ,μ)$ 的最大值就是只和 $x$ 有关的函数,定义这个函数为:
$$\theta_p(x)={max}_{λ,μ:μ_i\ge0}L(x,λ,μ)=L(x,λ,μ)$$

那么定义 $\theta_p(x)$ 的意义在哪里呢?在于其值就是 f(x),也就是说 $\theta_p(x)=f(x)$。下面证明这个等式。

  1. 如果 $x$ 违反了约束条件,即 $g_i(x) > 0$ 或者 $h_j(x) \neq 0$,则容易存在 $μ_i -> +∞$,也容易存在 $λ_j$ 使得 $λ_jh_j(x) -> +∞$,这样显然 $\theta_p(x)$ 是没有最大值的。
  2. 如果 $x$ 满足了约束条件,则 $\theta_p(x)={max}_{λ,μ:μ_i\ge0}L(x,λ,μ)={max}_{λ,μ:μ_i\ge0}f(x)=f(x)$,因为 ${max}_{λ,μ:μ_i\ge0}f(x)$ 与 $λ,μ$ 没有关系,所以其等于 $f(x)$。

所以当 $x$ 满足约束条件时,$min_x\theta_p(x)=min_xf(x)$,我们定义 $p^*=min_x\theta_p(x)$ 代表原始问题的最优解。

4.3 定义对偶问题

定义关于 $λ,μ$ 的函数:
$$\theta_D(λ,μ)={min}_xL(x,λ,μ)$$
其中 ${min}_xL(x,λ,μ)$ 是关于 $x$ 的函数的最小化,确定 $x$ 以后,最小值就只与 $λ,μ$ 有关,所以是一个关于 $λ,μ$ 的函数。

考虑极大化 $\theta_D(λ,μ)$ 即 $${max}_{λ,μ:μ_i\ge0}\theta_D(λ,μ)={max}_{λ,μ:μ_i\ge0}{min}_xL(x,λ,μ)$$,与原始问题最优解的定义很相似,形式上是对称的。定义对偶问题的最优解 $d^*={max}_{λ,μ:μ_i\ge0}\theta_D(λ,μ)$

4.4 对偶问题与原始问题的关系

很显然有 $d^* \le p^*$,当两个问题的最优解相等时,即 $x^*, λ^*, μ^*$ $d^* = p^*$,称原问题和对偶问题是强对偶的,否则是弱对偶的。

所以为了通过求对偶问题来求原问题,我们希望他们是强对偶的。

当满足以下条件时他们是强对偶的:

  1. 原始问题是凸优化问题的时候
  2. 原始问题满足 Slater 条件(这一条还可以换成其他条件)
  3. 最优解满足 KKT 条件

Slater 条件

若原始问题为凸优化问题,且存在严格满足约束条件的点 x,这里的“严格”是指 $g_i(x)≤0$ 中的“≤”严格取到“<”,即存在 x 满足 $g_i(x)<0 ,i=1,2,…,n$,则存在 $x^∗,α^∗,β^∗$ 使得 $x^∗$ 是原始问题的解, $α^∗,β^∗$ 是对偶问题的解,且满足:
$p^∗=d^∗=L(x^∗,α^∗,β^∗)$

也就是说如果原始问题是凸优化问题并且满足 Slater 条件的话,那么强对偶性成立。需要注意的是,这里只是指出了强对偶成立的一种情况,并不是唯一的情况。例如,对于某些非凸优化的问题,强对偶也成立。SVM 中的原始问题 是一个凸优化问题(二次规划也属于凸优化问题),Slater 条件在 SVM 中指的是存在一个超平面可将数据分隔开,即数据是线性可分的。当数据不可分时,强对偶是不成立的,这个时候寻找分隔平面这个问题本身也就是没有意义了,所以对于不可分的情况预先加个 kernel 就可以了。

4.5 KKT条件

KKT 条件(Karush-Kuhn-Tucker Conditions),是指在满足一些有规则的条件下, 一个非线性规划(Nonlinear Programming)问题能有最优化解法的一个必要和充分条件,这是一个广义化拉格朗日乘数的成果。

可以发现最优解要么在 $g_i(x) = 0$ 上,要么在 $g_i(x) < 0$ 范围内,如下图:
IMAGE

定义不等式约束下的拉格朗日函数L:
$$L(x,λ,μ)=f(x)+\sum_{i=1}^{p}μ_ig_i(x)+\sum_{j=1}^{q}λ_jh_j(x)$$

所谓 KKT 最优化条件,就是指上式的最优解$x^*$必须满足下面的条件:

  1. 约束条件满足 $g_i(x^*)≤0,i=1,2,…,p$, 以及 $h_j(x^*)=0,j=1,2,…,q$
  2. $∇f(x^*)+\sum_{i=1}^{p}μ_i∇g_i(x^*)+\sum_{j=1}^{q}λ_j∇h_j(x^*)=0$, 其中$∇$为梯度算子,也就是 L(x,u,λ) 对 x 求导为 0
  3. $λ_j≠0$ 且不等式约束条件满足 $μ_i≥0$, $μ_ig_i(x^*)=0,i=1,2,…,p$,因为 $g(x)<=0$,如果要满足这个等式,必须 $μ_i=0$ 或者 $g(x)=0$

第三个式子非常有趣,因为g(x)<=0,如果要满足这个等式,必须$μ_i=0$或者g(x)=0,这是 SVM 的很多重要性质的来源,如支持向量的概念。

KKT条件的推导

IMAGE

IMAGE

4.6 总结

关于不等式约束条件的优化问题,原问题分两种情况:

  • 原问题是一个凸优化问题,则可以直接应用 KKT 条件来求解
  • 原问题不是一个凸优化问题,则通过对偶问题转化为凸优化问题,在满足强对偶的条件下应用 KKT 条件求解

Reference

http://jermmy.xyz/2017/07/27/2017-7-27-understand-lagrange-multiplier/
https://www.zhihu.com/question/38586401/answer/134473412
https://zhuanlan.zhihu.com/p/27731819
http://blog.csdn.net/Mr_KkTian/article/details/53750424
http://www.cnblogs.com/mo-wang/p/4775548.html
http://www.hanlongfei.com/convex/2015/11/05/duality/
https://www.cnblogs.com/90zeng/p/Lagrange_duality.html
http://bioinfo.ict.ac.cn/~dbu/AlgorithmCourses/Lectures/KKT-examples.pdf

连续

设 $y=f(x)$ 在 $x_0$ 有定义,若
$\displaystyle\lim_{x\to x_0}{f(x)}=f(x_0)$
则函数 $f(x)$ 在点 $x_0$ 处连续。

另外一种定义:
在点 $x_0$ 附近,如果自变量的该变量是无穷小时,对应的因变量的该变量也是无穷小,则这个函数在点 $x_0$ 处连续。

通俗地说,所谓“连续”,就是不间断。放到函数上,就是没有“断点”(但是可以有“拐点”)。

二元函数的连续性定义与一元函数类似。

导数

定义

导数(Derivative)是微积分学中重要的基础概念。一个函数在某一点的导数描述了这个函数在这一点附近的变化率。导数的本质是通过极限的概念对函数进行局部的线性逼近。

导数的几何定义是:曲线上的纵坐标对点的横坐标的导数是曲线在该点的切线的斜率。

导数的物理意义之一是:位移s对时间t的导数是瞬时速度,即 $v=\frac{ds}{dt}$

可导

函数可导定义:
(1)设 f(x)在 $x_0$ 及其附近有定义,则当 $\displaystyle\lim_{\Delta{x}\to0}{\frac{f(x_0+\Delta{x})-f(x_0)}{a}}$ 存在, 则称 f(x) 在 $x_0$ 处可导。
(2)若对于区间 (a,b) 上任意一点 (x,f(x)) 均可导,则称 f(x) 在 (a,b) 上可导。

左导数:$\displaystyle\lim_{\Delta{x}\to0^-}{\frac{f(x_0+\Delta{x})-f(x_0)}{a}}$
右导数:$\displaystyle\lim_{\Delta{x}\to0^+}{\frac{f(x_0+\Delta{x})-f(x_0)}{a}}$

连续函数可导条件:函数在该点的左右偏导数都存在且相等。
即就是一个函数在某一点求极限,如果极限存在,则为可导,若所得导数等于函数在该点的函数值,则函数为连续可导函数,否则为不连续可导函数。

微分

在数学中,微分是对函数的局部变化率的一种线性描述。微分可以近似地描述当函数自变量的取值作足够小的改变时,函数的值是怎样改变的。

定义

若 y=f(x) 在处可导,则函数的增量,
$\Delta{y}=f(x+\Delta{x})-f(x)=f’(x)\Delta{x}+\alpha(\Delta{x})\Delta{x}$
其中 $\alpha(\Delta{x})$ 是$\Delta{x}(\Delta{x} \to 0)$ 的高阶无穷小。
称 $f’(x)\Delta{x}$ 为 $f(x)$ 在 $x$ 处的微分。
而 $f’(x)$ 是 $f(x)$ 在 $x$ 处的导数。

可微

设函数$y= f(x)$,若自变量在点x的改变量Δx与函数相应的改变量Δy有关系$Δy=AΔx+ο(Δx)$
其中A与Δx无关,则称函数f(x)在点x可微,并称AΔx为函数f(x)在点x的微分,记作dy,即$dy=AΔx$
当$x=x_0$时,则记作$dy∣x=x_0$.

可微条件:
必要条件:若函数在某点可微,则该函数在该点对x和y的偏导数必存在。
充分条件:若函数对x和y的偏导数在这点的某一邻域内都存在,且均在这点连续,则该函数在这点可微。

连续但不可微的例子:

魏尔斯特拉斯函数连续,但在任一点都不可微

连续,可导,可微的关系

  • 一元函数:可导必然连续,连续推不出可导,可导与可微等价。
  • 多元函数:可偏导与连续之间没有联系,也就是说可偏导推不出连续,连续推不出可偏导。
  • 多元函数中可微必可偏导,可微必连续,可偏导推不出可微,但若一阶偏导具有连续性则可推出可微。

全微分

对于多元函数,如果两个自变量都变化的话,这时候函数的微分就称为全微分。如果函数z=f(x,y)在(x,y)处的全增量$\Delta z=f(x+\Delta x,y+\Delta y)-f(x,y)$可以写成$\Delta z=A\Delta x+B\Delta y+o(\rho)$,
取其线性主部,称为二元函数的全微分 $\Delta z=A\Delta x+B\Delta y$

可以证明全微分又可写成$dz=f_x dx+f_ydy =\frac{\partial z}{\partial x}dx+\frac{\partial z}{\partial y}dy$

上式又称为全微分的叠加原理。可以这么理解:全增量包含两个部分,$\Delta x$引起的函数值增量和$\Delta y$引起的函数值增量。

一句话感性说全微分其实就在表达曲面在某一点处的切平面。

梯度

梯度的本意是一个向量(矢量),表示某一函数在该点处的方向导数沿着该方向取得最大值,即函数在该点处沿着该方向(此梯度的方向)变化最快,变化率最大(为该梯度的模)。

例如:在一元函数 f(x) 中,梯度只能沿 x 轴正方向或负方向,而在二元函数 f(x,y) 中,梯度则是一个二维向量 (∂f/∂x,∂f/∂y)。

梯度一个重要的性质:梯度跟函数等高线是垂直的。
证明:
假设 Δx,Δy 是两个极小的变化量,根据全微分的知识,可以得到:
f(x+Δx,y+Δy)≈f(x,y)+∂f∂xΔx+∂f∂yΔy
如果 (Δx,Δy) 是在等高线方向的增量,那么 f(x+Δx,y+Δy)≈f(x,y),这意味着 ∂f∂xΔx+∂f∂yΔy=0,换句话说,向量 ∇f 和向量 (Δx,Δy) 的内积为 0。所以,梯度和函数的等高线是垂直的。

Referen

http://jermmy.xyz/2017/07/27/2017-7-27-understand-lagrange-multiplier/

概率论在机器学习中扮演着一个核心角色,因为机器学习算法的设计通常依赖于对数据的概率假设。

随机变量在概率论中扮演着一个重要角色。最重要的一个事实是,随机变量并不是变量,它们实际上是将(样本空间中的)结果映射到真值的函数。我们通常用一个大写字母来表示随机变量。

条件分布

条件分布为概率论中用于探讨不确定性的关键工具之一。它明确了在另一随机变量已知的情况下(或者更通俗来说,当已知某事件为真时)的某一随机变量的分布。

正式地,给定$Y=b$时,$X=a$的条件概率定义为:
$$P(X=a|Y=b)= \frac{P(X=a,Y=b)}{P(Y=b)}$$

其中,$P(Y=b)>0$

独立性

在概率论中,独立性是指随机变量的分布不因知道其它随机变量的值而改变。在机器学习中,我们通常都会对数据做这样的假设。例如,我们会假设训练样本是从某一底层空间独立提取;并且假设样例i的标签独立于样例j(i≠j)的特性。
从数学角度来说,随机变量X独立于Y,当:

P(X)=P(X|Y)

注意,上式没有标明X,Y的取值,也就是说该公式对任意X,Y可能的取值均成立。)
利用等式(2),很容易可以证明如果X对Y独立,那么Y也独立于X。当X和Y相互独立时,记为X⊥Y。
对于随机变量X和Y的独立性,有一个等价的数学公式:
P(X,Y)=P(X)P(Y)

我们有时也会讨论条件独立,就是当我们当我们知道一个随机变量(或者更一般地,一组随机变量)的值时,那么其它随机变量之间相互独立。正式地,我们说“给定Z,X和Y条件独立”,如果:
P(X|Z)=P(X|Y,Z)

或者等价的:
P(X,Y|Z)=P(X|Z)P(Y|Z)

链式法则

我们现在给出两个与联合分布和条件分布相关的,基础但是重要的可操作定理。第一个叫做链式法则,它可以看做等式(2)对于多变量的一般形式。
定理1(链式法则):

P(X1,X2,…,Xn)=P(X1)P(X2|X1)…P(Xn|X1,X2,…,Xn−1)…………(3)

链式法则通常用于计算多个随机变量的联合概率,特别是在变量之间相互为(条件)独立时会非常有用。注意,在使用链式法则时,我们可以选择展开随机变量的顺序;选择正确的顺序通常可以让概率的计算变得更加简单。
第二个要介绍的是贝叶斯定理。利用贝叶斯定理,我们可以通过条件概率P(Y|X)计算出P(X|Y),从某种意义上说,就是“交换”条件。它也可以通过等式(2)推导出。

条件概率

条件概率

如果 A,B 是条件组 S 下的随机事件,事件 A 发生的概率随事件 B 是否发生而变化,同样,事件 B 发生的概率也随事件 A 是否发生而变化。
事件 A 在另外一个事件 B 已经发生条件下的发生概率称为条件概率,表示为P(A|B),读作「在 B 条件下 A 的概率」。
当 P(B) > 0 时,有:
$$P(A|B)= \frac{P(AB)}{P(B)}$$

P.S. 如果 A,B 是独立事件,则 A 发生的概率与 B 无关,那么 $P(A|B) = P(A)$,并且 $P(AB)=P(A)P(B)$。

联合概率

联合概率表示两个事件共同发生的概率。A 与 B 的联合概率表示为
$P(A\cap B)$ 或者 ${\displaystyle P(A,B)}$ 或者 $P(A,B)$。

边缘概率

边缘概率是某个事件发生的概率。边缘概率是这样得到的:在联合概率中,把最终结果中不需要的那些事件合并成其事件的全概率而消失(对离散随机变量用求和得全概率,对连续随机变量用积分得全概率)。这称为边缘化(marginalization)。A的边缘概率表示为$P(A)$,B的边缘概率表示为$P(B)$。

全概率公式

law of total probability

贝叶斯定理

$$P(X|Y)=\frac{P(Y|X)P(X)}{P(Y)}$$

https://zh.wikipedia.org/wiki/%E6%9D%A1%E4%BB%B6%E6%A6%82%E7%8E%87
http://blog.csdn.net/u012566895/article/details/51220127
http://www.cnblogs.com/leoo2sk/archive/2010/09/17/naive-bayesian-classifier.html

生成学习算法介绍

有监督机器学习可以分为判别学习算法(generative learning algorithm)和生成学习算法(discriminative learning algorithm)。

  • 判别学习算法常见的有:逻辑回顾,支持向量机等。
  • 生成学习算法常见的有:混合高斯模型、朴素贝叶斯法和隐形马尔科夫模型等。

判别学习算法是直接学习 p(y|x) 或者是从输入直接映射到输出的算法。

生成学习算法是计算变量x在变量y上的条件分布p(x|y)和变量y的分布p(y) ,然后使用贝叶斯公式: $p(y|x)=\frac{p(x,y)}{p(x)}=\frac{p(y)*p(x|y)}{p(x)}$ 计算出p(y|x)。

针对课程中提到的两种生成学习算法中,高斯判别分析(Gaussian Discriminant Analysis)和朴素贝叶斯(Navie Bayes)分别解决了两种场景下的问题。
GDA 是针对的是特征向量 X 为连续值时的问题,而 Navie Bayes 则针对的是特征向量为离散值时的问题。

高斯判别分析

多维正态分布(The multivariate normal distribution)

假设随机变量 $X$ 满足 $n$ 维的多项正态分布,参数为均值向量 $μ ∈ R^{n} $,协方差矩阵$Σ ∈ R^{n×n}$,记为 $N(μ,Σ)$ 其概率密度表示为:

$$p(x;μ,Σ)=\frac{1}{(2π)^{\frac{n}2}(detΣ)^{\frac12}}exp(−\frac12(x−μ)^TΣ^{−1}(x−μ))$$

$detΣ$ 表示矩阵 $Σ$ 的行列式(determinant)。
均值向量: $μ$
协方差矩阵: $Σ=E[(X−E[X])(X−E[X])T]=E[(x−μ)(x−μ)T]$

高斯判别分析

GDA 模型针对的是输入特征为连续值时的分类问题,这个模型的基本假设是目标值 y 服从伯努利分布(0-1分布),条件概率 P(x|y) 服从多元正态分布((multivariate normal distribution)),即:
$y∼Bernoulli(\phi)$
$P(x|y=0)∼N(μ_0,\Sigma)$
$P(x|y=1)∼N(μ_1,\Sigma)$

它们的概率密度为:
$$p(y)=\phi^y(1−\phi)^{1−y}$$
$$p(x|y=0)=\frac1{(2π)^{n/2}|\Sigma|^{1/2}}exp(−\frac12(x−μ_0)^T\Sigma^{−1}(x−μ_0))$$
$$p(x|y=1)=\frac1{(2π)^{n/2}|\Sigma|^{1/2}}exp(−\frac12(x−μ_1)^T\Sigma^{−1}(x−μ_1))$$

我们模型的参数包括,$\phi,\Sigma,μ_0,μ_1$ 注意到,我们使用了两种不同的均值向量$μ_0$和$μ_1$,但是使用了同一种协方差矩阵 $\Sigma$, 则我们的极大似然函数的对数如下所示:
$$L(\phi,μ_0,μ_1,\Sigma)=log\Pi_{i=1}^mp(x^{(i)},y^{(i)};\phi,μ_0,μ_1,\Sigma)$$
$$=log\Pi_{i=1}^mp(x^{(i)}|y^{(i)};\phi,μ_0,μ_1,\Sigma)p(y^{(i)};\phi)$$

对极大似然函数对数最大化,我们就得到了GDA模型各参数的极大虽然估计(略)。

GDA 与 LR

前面我们提到:
$${argmax}_yp(y|x)={argmax}_y\frac{p(x|y)p(y)}{p(x)}={argmax}_yp(x|y)p(y)$$
我们有:
$$p(y=1|x)=\frac{p(x|y=1)p(y=1)}{p(x|y=1)p(y=1)+p(x|y=0)p(y=0)}$$
上式实际上可以表示成logistic函数的形式:
$$p(y=1|x;ϕ,μ0,μ1,Σ)=\frac1{1+exp(−θ^TX)}$$
其中,θ是参数ϕ,μ0,μ1,Σθ是参数ϕ,μ0,μ1,Σ某种形式的函数。GDA的后验分布可以表示logistic函数的形式。
 
下图为用 GDA 对两类样本分别拟合高斯概率密度函数p(x|y=0)和p(x|y=1),得到两个钟形曲线。沿x轴遍历样本,在x轴上方画出相应的p(y=1|x)。如选x轴靠左的点,那么它属于1的概率几乎为0,p(y=1|x)=0,两条钟形曲线交点处,属于0或1的概率相同,p(y=1|x)=0.5,x轴靠右的点,输出1的概率几乎为1,p(y=1|x)=1。最终发现,得到的曲线和sigmoid函数曲线很相似。
gaussian discriminant analysis probability distribution
 
 
实际上,可以证明,不仅仅当先验概率分布服从多变量正态分布时可以推导出逻辑回归的模型,当先验分布属于指数分布簇中的任何一个分布,如泊松分布时,都可以推导出逻辑回归模型。而反之不成立,逻辑回归的先验概率分布不一定必须得是指数分布簇中的成员。基于这些原因,在实践中使用逻辑回归比使用GDA更普遍。

生成学习算法比判决学习算法需要更少的数据。如GDA的假设较强,所以用较少的数据能拟合出不错的模型。而逻辑回归的假设较弱,对模型的假设更为健壮,拟合数据需要更多的样本。

朴素贝叶斯

考虑自变量比较多的情况,比如垃圾邮件的识别需要检测成百上千甚至上万的字符是否出现,如有免费购买等类似的词出现的邮件很大可能是垃圾邮件。这种情况下若有k个自变量,考虑各变量之间的交互作用就需要计算$2^k$次,为了简化计算量对模型作一个更强的假设:
给定因变量 y 的值,各自变量之间相互独立.

所以有
$$p(x_1,…,x_n|y)=p(x_1|y)p(x_2|y,x_1)p(x_3|y,x_1,x_2)…p(x_n|y,x_1,x_2,…,x_{n-1})$$
$$=p(x_1|y)p(x_2|y)p(x_3|y)…p(x_n|y)=\Pi_{i=1}^np(x_i|y)$$

第一个等式是根据通常的概率论得到的,第二个等式是根据贝叶斯假设得到的。虽然贝叶斯假设是个很强的假设,但是实践证明在许多问题上都表现得很好。

参数的极大似然估计及p(y|x)的推导过程略。

拉普拉斯平滑

拉普拉斯平滑(Laplace Smoothing)又称为加1平滑。平滑方法的存在是为了解决零概率问题。

所谓的零概率问题,就是在计算新实例的概率时,如果某个分量在训练集中从没出现过,会导致整个实例的概率计算结果为0,针对文本分类问题就是当一个词语在训练集中没有出现过,那么该词语的概率为0,使用连乘计算文本出现的概率时,整个文本出现的概率也为0,这显然不合理,因为不能因为一个事件没有观测到就判断该事件的概率为0.

Reference

http://xtf615.com/2017/03/25/%E7%94%9F%E6%88%90%E7%AE%97%E6%B3%95/
http://blog.csdn.net/v1_vivian/article/details/52190572
http://www.cnblogs.com/mikewolf2002/p/7763475.html

Problem Description

Given a binary tree, find the maximum path sum.

For this problem, a path is defined as any sequence of nodes from some starting node to any node in the tree along the parent-child connections. The path must contain at least one node and does not need to go through the root.

For example:
Given the below binary tree,

1
2
3
1
/ \
2 3

Return 6.

problem link:
https://leetcode.com/problems/binary-tree-maximum-path-sum/description/

Solution

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class Solution {
int maxValue;
public int maxPathSum(TreeNode root) {
maxValue = Integer.MIN_VALUE;
maxPathDown(root);
return maxValue;
}
private int maxPathDown(TreeNode node) {
if (node == null) return 0;
int left = Math.max(0, maxPathDown(node.left));
int right = Math.max(0, maxPathDown(node.right));
maxValue = Math.max(maxValue, left + right + node.val);
return Math.max(left, right) + node.val;
}
}

解题思路

每一个结点可以选和不选,处理方法就是:int left = Math.max(0, maxPathDown(node.left));,其中的 Math.max(0, x),当取值为 0 时就是不取这个结点。

全局变量 maxValue 就覆盖了子树中的 ^ 这种类型,例如子树如下:

1
2
3
x
a y
b c

则 b->a->c 这种路径的最大值被 maxValue 保存了。而 b->a->x->y 这种经过根节点的路径被 Math.max(left, right) + node.val; 覆盖了。

Problem Description

Given n non-negative integers representing an elevation map where the width of each bar is 1, compute how much water it is able to trap after raining.

For example,
Given [0,1,0,2,1,0,1,3,2,1,2,1], return 6.

problem link:
https://leetcode.com/problems/trapping-rain-water

Solution

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class TrappingRainWater_42 {
public int trap(int[] height) {
int a = 0;
int b = height.length - 1;
int max = 0;
int leftMax = 0;
int rightMax = 0;
while (a <= b) {
leftMax = Math.max(leftMax, height[a]);
rightMax = Math.max(rightMax, height[b]);
if (leftMax < rightMax) {
// leftMax is smaller than rightMax, so the (leftMax-A[a]) water can be stored
max += (leftMax - height[a]);
a++;
} else {
max += (rightMax - height[b]);
b--;
}
}
return max;
}
}

算法解释

对任意位置 i,在 i 上的积水,由左右两边最高的 bar 决定。

说明: kafka 版本号为 0.11.0

Consumer 拉取消息的实现

在 Kafka Consumer 正常消费时,观察其调用堆栈。

1
2
3
4
5
6
7
8
9
10
11
"pool-16-thread-7" #154 prio=5 os_prio=0 tid=0x00007ff581c8c000 nid=0x326d runnable [0x00007ff5468e7000]
java.lang.Thread.State: RUNNABLE
...
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:433)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:232)
- locked <0x00000000c2e04f90> (a org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:208)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1096)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:571)
...

对应的代码实现是 org.apache.kafka.clients.consumer.KafkaConsumer#poll,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@Override
public ConsumerRecords<K, V> poll(long timeout) {
...
try {
...
// poll for new data until the timeout expires
long start = time.milliseconds();
long remaining = timeout;
do {
Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollOnce(remaining);
if (fetcher.sendFetches() > 0 || client.hasPendingRequests())
client.pollNoWakeup();
if (this.interceptors == null)
return new ConsumerRecords<>(records);
else
return this.interceptors.onConsume(new ConsumerRecords<>(records));
long elapsed = time.milliseconds() - start;
remaining = timeout - elapsed;
} while (remaining > 0);
return ConsumerRecords.empty();
} finally {
release();
}
}

其中 org.apache.kafka.clients.consumer.KafkaConsumer#pollOnce的实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout) {
...
// ConsumerCoordinator coordinator;
coordinator.poll(time.milliseconds(), timeout);
...
// if data is available already, return it immediately
Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();
if (!records.isEmpty())
return records;
// send any new fetches (won't resend pending fetches)
fetcher.sendFetches();
...
return fetcher.fetchedRecords();
}

所以可以看到 consumer 每次 poll 时是先从 fetcher 中 fetchedRecords 的,如果拿不到结果,就新发起一个 sendFetches 请求。

Consumer 拉取消息的数量

org.apache.kafka.clients.consumer.internals.Fetcher#fetchedRecords 可以看到 maxPollRecords(max.poll.records 配置) 变量限制了每次 poll 的消息条数,不管 consumer 对应多少个 partition,从所有 partition 拉取到的消息条数总和不会超过 maxPollRecords

org.apache.kafka.clients.consumer.internals.Fetcher#sendFetches 可以看到 fetchSize(max.partition.fetch.bytes 配置) 用于每次创建 FetchRequest 时的 org.apache.kafka.common.requests.FetchRequest.PartitionData 的参数设置。fetchSize限制了 consumer 每次从每个 partition 拉取的数据量。
不过,还是看代码中的 ConsumerConfig#MAX_PARTITION_FETCH_BYTES_DOC 说明吧:

The maximum amount of data per-partition the server will return. Records are fetched in batches by the consumer. If the first record batch in the first non-empty partition of the fetch is larger than this limit, the batch will still be returned to ensure that the consumer can make progress. The maximum record batch size accepted by the broker is defined via message.max.bytes (broker config) or max.message.bytes (topic config). See “ + FETCH_MAX_BYTES_CONFIG + “ for limiting the consumer request size.

poll 和 fetch 的关系

在满足max.partition.fetch.bytes限制的情况下,假如fetch到了100个record,放到本地缓存后,由于max.poll.records限制每次只能poll出15个record。那么KafkaConsumer就需要执行7次才能将这一次通过网络发起的fetch请求所fetch到的这100个record消费完毕。其中前6次是每次pool中15个record,最后一次是poll出10个record。

Consumer 的心跳机制

org.apache.kafka.clients.consumer.internals.AbstractCoordinat 中启动 HeartbeatThread 线程来定时发送心跳和检查 consumer 的状态。
每个 Consumer 都有一个 ConsumerCoordinator(继承 AbstractCoordinator),每个 ConsumerCoordinator 都启动一个 HeartbeatThread 线程来维护心跳,心跳信息存放在 org.apache.kafka.clients.consumer.internals.Heartbeat

实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
@Override
public void run() {
try {
log.debug("Heartbeat thread for group {} started", groupId);
while (true) {
synchronized (AbstractCoordinator.this) {
...
client.pollNoWakeup();
long now = time.milliseconds();
if (coordinatorUnknown()) {
...
} else if (heartbeat.sessionTimeoutExpired(now)) {
// the session timeout has expired without seeing a successful heartbeat, so we should
// probably make sure the coordinator is still healthy.
coordinatorDead();
} else if (heartbeat.pollTimeoutExpired(now)) {
// the poll timeout has expired, which means that the foreground thread has stalled
// in between calls to poll(), so we explicitly leave the group.
maybeLeaveGroup();
} else if (!heartbeat.shouldHeartbeat(now)) {
// poll again after waiting for the retry backoff in case the heartbeat failed or the
// coordinator disconnected
AbstractCoordinator.this.wait(retryBackoffMs);
} else {
heartbeat.sentHeartbeat(now);
...
}
} // end synchronized
} // end while
} //end try
} // end run

其中最重要的两个 timeout 函数:

1
2
3
4
5
6
7
public boolean sessionTimeoutExpired(long now) {
return now - Math.max(lastSessionReset, lastHeartbeatReceive) > sessionTimeout;
}
public boolean pollTimeoutExpired(long now) {
return now - lastPoll > maxPollInterval;
}

sessionTimeout

如果是 sessionTimeout 则 Mark the current coordinator as dead,此时 会将 consumer 踢掉,重新分配 partition 和 consumer 的对应关系。

在 Kafka Server 端,Consumer 的 Group 定义了五个状态::
Consumer Group State

pollTimeout

如果是 pollTimeout 则 Reset the generation and memberId because we have fallen out of the group,此时 consumer 会退出 group,当再次 poll 时又会 rejoin group 触发 rebalance group。

Rebalance Generation

表示 rebalance 之后的一届成员,主要是用于保护 consumer group,隔离无效 offset 提交。每次 group 进行 rebalance 之后,generation 号都会加 1,表示 group 进入到了一个新的版本,下图所示为 consumer 2 退出后 consumer 4 加入时 Rebalance Generation 的过程:
Rebalance Generation

partition 的数量设置

  • 一个 partition 只能被 Consumer Group 中的一个 consumer 消费,因此,为了提高并发量,可以提高 partition 的数量,但是这会造成 replica 副本拷贝的网络请求增加,故障恢复时的耗时增加。因为 kafka 使用 batch pull 的方式,所以单个线程的消费速率还是有保障的。并且 partition 数量过多,zk 维护 ISR 列表负载较重。

  • partiton 数量最好是 consumer 数目的整数倍,比如取 24, consumer 数目的设置就会灵活很多。

  • consumer 消费消息时不时严格有序的。当从多个 partition 读数据时,kafka 只保证在一个 partition 上数据是有序的,多个 partition 的消息消费很可能就不是严格有序的了。

参数设置

heartbeat.interval.ms

心跳间隔。心跳是在 consumer 与 coordinator 之间进行的。心跳是确定 consumer 存活,加入或者退出 group 的有效手段。
这个值必须设置的小于 session.timeout.ms,因为:
当 consumer 由于某种原因不能发 heartbeat 到 coordinator 时,并且时间超过 session.timeout.ms 时,就会认为该 consumer 已退出,它所订阅的 partition 会分配到同一 group 内的其它的 consumer 上。

参数值

默认值:3000 (3s),通常设置的值要低于session.timeout.ms的1/3。

session.timeout.ms

consumer session 过期时间。如果超时时间范围内,没有收到消费者的心跳,broker 会把这个消费者置为失效,并触发消费者负载均衡。因为只有在调用 poll 方法时才会发送心跳,更大的 session 超时时间允许消费者在 poll 循环周期内处理消息内容,尽管这会有花费更长时间检测失效的代价。如果想控制消费者处理消息的时间,

参数值

默认值:10000 (10s),这个值必须设置在 broker configuration 中的 group.min.session.timeout.ms 与 group.max.session.timeout.ms 之间。

max.poll.interval.ms

This config sets the maximum delay between client calls to poll().

When the timeout expires, the consumer will stop sending heartbeats and send an explicit LeaveGroup request.

As soon as the consumer resumes processing with another call to poll(), the consumer will rejoin the group.

By increasing the interval between expected polls, you can give the consumer more time to handle a batch of records returned frompoll(long). The drawback is that increasing this value may delay a group rebalance since the consumer will only join the rebalance inside the call to poll. You can use this setting to bound the time to finish a rebalance, but you risk slower progress if the consumer cannot actually call poll often enough.

参数设置大一点可以增加两次 poll 之间处理消息的时间。
当 consumer 一切正常(也就是保持着 heartbeat ),且参数的值小于消息处理的时长,会导致 consumer leave group 然后又 rejoin group,触发无谓的 group balance,出现 consumer livelock 现象。

但如果设置的太大,会延迟 group rebalance,因为消费者只会在调用 poll 时加入rebalance。

max.poll.records

Use this setting to limit the total records returned from a single call to poll. This can make it easier to predict the maximum that must be handled within each poll interval. By tuning this value, you may be able to reduce the poll interval, which will reduce the impact of group rebalancing.

0.11.0 Kafka 的默认配置是

  • max.poll.interval.ms=5min
  • max.poll.records=500

即平均 600ms 要处理完一条消息,如果消息的消费时间高于 600ms,则一定要调整 max.poll.records 或 max.poll.interval.ms。

Kafka Javadoc - Detecting Consumer Failures

After subscribing to a set of topics, the consumer will automatically join the group when poll(long) is invoked. The poll API is designed to ensure consumer liveness. As long as you continue to call poll, the consumer will stay in the group and continue to receive messages from the partitions it was assigned. Underneath the covers, the consumer sends periodic heartbeats to the server. If the consumer crashes or is unable to send heartbeats for a duration of session.timeout.ms, then the consumer will be considered dead and its partitions will be reassigned.
It is also possible that the consumer could encounter a “livelock” situation where it is continuing to send heartbeats, but no progress is being made. To prevent the consumer from holding onto its partitions indefinitely in this case, we provide a liveness detection mechanism using the max.poll.interval.ms setting. Basically if you don’t call poll at least as frequently as the configured max interval, then the client will proactively leave the group so that another consumer can take over its partitions. When this happens, you may see an offset commit failure (as indicated by a CommitFailedException thrown from a call to commitSync()). This is a safety mechanism which guarantees that only active members of the group are able to commit offsets. So to stay in the group, you must continue to call poll.

Reference

Kafka消费组(consumer group)
kafka.apache.org javadoc
Coordinator实现原理
kafka params
kafka源码分析之kafka的consumer的负载均衡管理
Group Management Protocol
Kafka 之 Group 状态变化分析及 Rebalance 过程
KIP-62: Allow consumer to send heartbeats from a background thread
Kafka: The Definitive Guide Chapter 4 - Kafka Consumers

运行环境说明

kafka 版本号为 0.11.0

Kafka Consumer 的参数配置如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private Map<String, Object> getDefaultConsumerConfigs() {
Map<String, Object> propsMap = new HashMap<>();
// 手动设置自动提交为false,交由 spring-kafka 启动的invoker执行提交
propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
propsMap.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "10000");
propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
// 从partition中获取消息最大大小
propsMap.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "102400");
return propsMap;
}

Consumer 卡顿现象

Consumer 卡顿时的日志

每次卡顿不消费时都出现以下日志:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
2017/11/09 19:35:29:DEBUG pool-16-thread-10 org.apache.kafka.clients.consumer.internals.Fetcher - Fetch READ_UNCOMMITTED at offset 11429299 for partition my_topic-27 returned fetch data (error=NONE, highWaterMark=11429299, lastStableOffset = -1, logStartOffset = 10299493, abortedTransactions = null, recordsSizeInBytes=0)
2017/11/09 19:35:29:DEBUG pool-16-thread-10 org.apache.kafka.clients.consumer.internals.Fetcher - Added READ_UNCOMMITTED fetch request for partition my_topic-27 at offset 11429299 to node p-kafka-host-03.ali.keep:9092 (id: 6 rack: null)
2017/11/09 19:35:29:DEBUG pool-16-thread-10 org.apache.kafka.clients.consumer.internals.Fetcher - Sending READ_UNCOMMITTED fetch for partitions [my_topic-27] to broker p-kafka-host-03.ali.keep:9092 (id: 6 rack: null)
2017/11/09 19:35:29:DEBUG kafka-coordinator-heartbeat-thread | myConsumerGroup org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Sending Heartbeat request for group myConsumerGroup to coordinator p-kafka-host-02:9092 (id: 2147483642 rack: null)
2017/11/09 19:35:29:DEBUG pool-16-thread-13 org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Attempt to heartbeat failed for group myConsumerGroup since it is rebalancing.
2017/11/09 19:35:29:INFO pool-16-thread-13 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Revoking previously assigned partitions [my_topic-18] for group myConsumerGroup
2017/11/09 19:35:29:INFO pool-16-thread-13 org.springframework.kafka.listener.ConcurrentMessageListenerContainer - partitions revoked: [my_topic-18]
2017/11/09 19:35:29:INFO pool-16-thread-13 org.springframework.kafka.listener.ConcurrentMessageListenerContainer - partitions revoked: [my_topic-18]
2017/11/09 19:35:29:DEBUG pool-16-thread-4 org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Attempt to heartbeat failed for group myConsumerGroup since it is rebalancing.
2017/11/09 19:35:29:INFO pool-16-thread-4 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Revoking previously assigned partitions [my_topic-21] for group myConsumerGroup
2017/11/09 19:35:29:INFO pool-16-thread-4 org.springframework.kafka.listener.ConcurrentMessageListenerContainer - partitions revoked: [my_topic-21]
2017/11/09 19:35:29:INFO pool-16-thread-4 org.springframework.kafka.listener.ConcurrentMessageListenerContainer - partitions revoked: [my_topic-21]
...
2017/11/09 19:35:29:DEBUG pool-16-thread-4 org.apache.kafka.clients.consumer.internals.Fetcher - Fetch READ_UNCOMMITTED at offset 11426689 for partition my_topic-21 returned fetch data (error=NONE, highWaterMark=11426689, lastStableOffset = -1, logStartOffset = 10552294, abortedTransactions = null, recordsSizeInBytes=0)
2017/11/09 19:35:29:DEBUG pool-16-thread-13 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Group myConsumerGroup committed offset 11429849 for partition my_topic-18
2017/11/09 19:35:29:INFO pool-16-thread-13 org.apache.kafka.clients.consumer.internals.AbstractCoordinator - (Re-)joining group myConsumerGroup
2017/11/09 19:35:29:DEBUG pool-16-thread-13 org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Sending JoinGroup ((type: JoinGroupRequest, groupId=myConsumerGroup, sessionTimeout=30000, rebalanceTimeout=300000, memberId=p-my-consumer-host-03-12-97c12fb0-9bb7-4762-8478-538f06be9e90, protocolType=consumer, groupProtocols=org.apache.kafka.common.requests.JoinGroupRequest$ProtocolMetadata@54371fac)) to coordinator p-kafka-02.ali.keep:9092 (id: 2147483642 rack: null)

其中最重要的部分是:

2017/11/09 19:35:29:DEBUG pool-16-thread-13 org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Attempt to heartbeat failed for group myConsumerGroup since it is rebalancing.
2017/11/09 19:35:29:INFO pool-16-thread-13 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Revoking previously assigned partitions [my_topic-18] for group myConsumerGroup
2017/11/09 19:35:29:INFO pool-16-thread-13 org.springframework.kafka.listener.ConcurrentMessageListenerContainer - partitions revoked: [my_topic-18]

2017/11/09 19:35:29:INFO pool-16-thread-13 org.apache.kafka.clients.consumer.internals.AbstractCoordinator - (Re-)joining group myConsumerGroup

那为什么每次会这样呢?我们是有单独的线程在发起心跳的!!!

Consumer 卡顿时的 jstack

观察日志可以发现,卡顿时 ConsumerCoordinator 在不停地 rejoin group,并且做 rebalance,所以需要对比在正常和卡顿这两种情况下 ConsumerCoordinator 的行为。

正常时的 ConsumerCoordinator

1
2
3
cat jstack.normal.log | grep ConsumerCoordinator -B1 | grep -v ConsumerCoordinator | sort | uniq -c
32 at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:931)
22 at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:950)

卡顿时的 ConsumerCoordinator

1
2
3
4
5
cat jstack.pause.log | grep ConsumerCoordinator -B1 | grep -v ConsumerCoordinator | sort | uniq -c
14 at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:316)
14 at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:920)
8 at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:931)
32 at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:950)

根据以上的现场信息,可以发现关键就在 AbstractCoordinator.ensureActiveGroup 这一步,继续观察 jstack.pause.log 中的相关堆栈信息,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
"pool-16-thread-14" #167 prio=5 os_prio=0 tid=0x00007f5b19dbf000 nid=0x7ac2 runnable [0x00007f5ae4ccb000]
java.lang.Thread.State: RUNNABLE
at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:79)
at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
- locked <0x00000000c2e816b0> (a sun.nio.ch.Util$2)
- locked <0x00000000c2e816a0> (a java.util.Collections$UnmodifiableSet)
- locked <0x00000000c2e742a0> (a sun.nio.ch.EPollSelectorImpl)
at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
at org.apache.kafka.common.network.Selector.select(Selector.java:529)
at org.apache.kafka.common.network.Selector.poll(Selector.java:321)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:433)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:232)
- locked <0x00000000c2f00da0> (a org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:208)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:168)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:364)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:316)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:297)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1078)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:571)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

卡顿原因分析

卡顿原因:Consumer 在 Region Group

根据以上信息,结合 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator 的代码可以发现在
ConsumerCoordinator#poll 中判断 needRejoin() 为 true 时会调用 ensureActiveGroup() 函数,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public void poll(long now, long remainingMs) {
invokeCompletedOffsetCommitCallbacks();
if (subscriptions.partitionsAutoAssigned()) {
...
if (needRejoin()) {
...
ensureActiveGroup();
...
}
} else {
...
}
}
pollHeartbeat(now);
maybeAutoCommitOffsetsAsync(now);
}

Region Group 原因:Consumer Leave Group

那么问题就是什么情况下 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator#needRejoin 会返回 true,我们还是看看他的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Override
public boolean needRejoin() {
if (!subscriptions.partitionsAutoAssigned())
return false;
// we need to rejoin if we performed the assignment and metadata has changed
if (assignmentSnapshot != null && !assignmentSnapshot.equals(metadataSnapshot))
return true;
// we need to join if our subscription has changed since the last join
if (joinedSubscription != null && !joinedSubscription.equals(subscriptions.subscription()))
return true;
return super.needRejoin();
}

kafka metadata 什么时候变化????

可以看到,不是 metadataSnapshot 有变化,也不是 订阅者 subscriptions 有变化,那就是 super.needRejoin() 返回了 true,问题就转到了 org.apache.kafka.clients.consumer.internals.AbstractCoordinator#needRejoin 这个函数,其实现是:

1
2
3
protected synchronized boolean needRejoin() {
return rejoinNeeded;
}

从代码上看 rejoinNeeded 的整个变化过程,初始化为 true,在 initiateJoinGroup 成功后,会赋值为 false,在 maybeLeaveGroup 时会赋值为 true,所以怀疑卡顿时是 consumer leave group 了。

Consumer Leave Group 原因:pollTimeoutExpired

org.apache.kafka.clients.consumer.internals.AbstractCoordinator.HeartbeatThread#run 中调用了 maybeLeaveGroup() 函数,其实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
@Override
public void run() {
try {
log.debug("Heartbeat thread for group {} started", groupId);
while (true) {
synchronized (AbstractCoordinator.this) {
...
client.pollNoWakeup();
long now = time.milliseconds();
if (coordinatorUnknown()) {
...
} else if (heartbeat.sessionTimeoutExpired(now)) {
// the session timeout has expired without seeing a successful heartbeat, so we should
// probably make sure the coordinator is still healthy.
coordinatorDead();
} else if (heartbeat.pollTimeoutExpired(now)) {
// the poll timeout has expired, which means that the foreground thread has stalled
// in between calls to poll(), so we explicitly leave the group.
maybeLeaveGroup();
} else if (!heartbeat.shouldHeartbeat(now)) {
// poll again after waiting for the retry backoff in case the heartbeat failed or the
// coordinator disconnected
AbstractCoordinator.this.wait(retryBackoffMs);
} else {
heartbeat.sentHeartbeat(now);
...
}
} // end synchronized
} // end while
} //end try
} // end run

其中最重要的两个 timeout 函数:

1
2
3
4
5
6
7
public boolean sessionTimeoutExpired(long now) {
return now - Math.max(lastSessionReset, lastHeartbeatReceive) > sessionTimeout;
}
public boolean pollTimeoutExpired(long now) {
return now - lastPoll > maxPollInterval;
}

所以是 pollTimeoutExpired 引起了 leave group.

根本原因:pollTimeoutExpired

pollTimeoutExpired 的原因是两次 poll 的时间间隔超过了设置的 maxPollInterval 值。

解决方案

调整以下参数

  • max.poll.records:100 (默认值 500)
  • max.poll.interval.ms:600000 (默认值 300000,也就是5分钟)

后续

至此,问题已经解决了,但是有一些疑问。

  • 对于这两个参数值的设定, 是 max.poll.records 越小越好,max.poll.interval.ms 越大越好吗?
  • 已经设置过的 session.timeout.msheartbeat.interval.ms难道没用吗?为什么有这么多超时参数的设置啊?
  • 已经设置过的 max.partition.fetch.bytes 没用吗?为什么还要设置 max.poll.records 啊?
  • 整体上还需要调哪些参数才可以让 consumer 运行正常,或者是性能达到最大呢?

在下一篇博客「Kafka Consumer 的实现」中,将会继续分析 Kafka Consumer 的消费过程和参数配置,试图回答以上问题。