0%

Java 线程同步原理

java会为每个object对象分配一个monitor,当某个对象的同步方法(synchronized methods )或同步快被多个线程调用时,该对象的monitor将负责处理这些访问的并发独占要求。

当一个线程调用一个对象的同步方法时,JVM会检查该对象的monitor。如果monitor没有被占用,那么这个线程就得到了monitor的占有权,可以继续执行该对象的同步方法;如果monitor被其他线程所占用,那么该线程将被挂起,直到monitor被释放。

当线程退出同步方法调用时,该线程会释放monitor,这将允许其他等待的线程获得monitor以使对同步方法的调用执行下去。

注意:java对象的monitor机制和传统的临界检查代码区技术不一样。java的一个类一个同步方法并不意味着同时只有一个线程独占执行(不同对象的同步方法可以同时执行),但临界检查代码区技术确会保证同步方法在一个时刻只被一个线程独占执行。

java的monitor机制的准确含义是:任何时刻,对一个指定object对象的某同步方法只能由一个线程来调用。

java对象的monitor是跟随object实例来使用的,而不是跟随程序代码。两个线程可以同时执行相同的同步方法,比如:一个类的同步方法是xMethod(),有a,b两个对象实例,一个线程执行a.xMethod(),另一个线程执行b.xMethod(). 互不冲突。

wait(), notify(),notifyAll()

首先看一下Java中java.lang.Object类的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
public class Object {
private static native void registerNatives();

static {
registerNatives();
}
public final native Class<?> getClass();

public native int hashCode();

public boolean equals(Object obj) {
return (this == obj);
}

protected native Object clone() throws CloneNotSupportedException;

public String toString() {
return getClass().getName() + "@" + Integer.toHexString(hashCode());
}

public final native void notify();

public final native void notifyAll();

public final native void wait(long timeout) throws InterruptedException;

public final void wait(long timeout, int nanos) throws InterruptedException {
if (timeout < 0) {
throw new IllegalArgumentException("timeout value is negative");
}

if (nanos < 0 || nanos > 999999) {
throw new IllegalArgumentException("nanosecond timeout value out of range");
}

if (nanos >= 500000 || (nanos != 0 && timeout == 0)) {
timeout++;
}

wait(timeout);
}

public final void wait() throws InterruptedException {
wait(0);
}

protected void finalize() throws Throwable { }
}

wait()方法是object类的方法,解决的问题是线程间的同步,该过程包含了同步锁的获取和释放,调用wait方法将会将调用者的线程挂起,直到其他线程调用同一个对象的notify()方法才会重新激活调用者。

注意:线程调用notify()之后,只有该线程完全从 synchronized代码里面执行完毕后,monitor才会被释放,被唤醒线程才可以真正得到执行权。

使用:

  • obj.wait()方法使本线程挂起,并释放obj对象的monitor,只有其他线程调用obj对象的notify()或notifyAll()时,才可以被唤醒。
  • obj.notifyAll()方法唤醒所有阻塞在obj对象上的沉睡线程,然后被唤醒的众多线程竞争obj对象的monitor占有权,最终得到的那个线程会继续执行下去,但其他线程继续等待。
  • obj.notify()方法是随机唤醒一个沉睡线程,过程更obj.notifyAll()方法类似。

wait,notify和notifyAll只能在同步控制方法或者同步控制块里面使用,例如:

1
2
3
4
synchronized(x){
x.notify()
//或者wait()
}

以上内容说明了为什么调用wait(),notify(),notifyAll()的线程必须要拥有obj实例对象的monitor占有权。

每个对象实例都有一个等待线程队列。这些线程都是等待对该对象的同步方法的调用许可。对一个线程来说,有两种方法可以进入这个等待线程队列。一个是当其他线程执行同步方法时,自身同时也要执行该同步方法;另一个是调用obj.wait()方法。

当同步方法执行完毕或者执行wait()时,其他某个线程将获得对象的访问权。当一个线程被放入等待队列时,必须要确保可以通过notify()的调用来解冻该线程,以使其能够继续执行下去。

native

native is a java keyword. It marks a method, that it will be implemented in other languages, not in Java. The method is declared without a body and cannot be abstract. It works together with JNI (Java Native Interface).
Native methods were used in the past to write performance critical sections but with java getting faster this is now less common. Native methods are currently needed when

You need to call from java a library, written in another language.
You need to access system or hardware resources that are only reachable from the other language (typically C). Actually, many system functions that interact with real computer (disk and network IO, for instance) can only do this because they call native code.

synchronized关键字简洁、清晰、语义明确,因此即使有了Lock接口,使用的还是非常广泛。其应用层的语义是可以把任何一个非null对象作为”锁”。
synchronized在软件层面依赖JVM,Lock在硬件层面依赖特殊的CPU指令。

JVM如何实现synchronized

在java语言中存在两种内建的synchronized语法:synchronized语句和synchronized方法。
synchronized语句被javac编译成bytecode时,会在同步块的入口位置和退出位置分别插入monitorenter和monitorexit字节码指令。
synchronized方法被javac编译成bytecode时,会被翻译成普通的方法调用和返回指令如:invokevirtual、areturn指令,在VM字节码层面并没有任何特别的指令来实现被synchronized修饰的方法,而是在Class文件的方法表中将该方法的access_flags字段中的synchronized标志位置1,表示该方法是同步方法并使用调用该方法的对象或该方法所属的Class在JVM的内部对象表示Klass做为锁对象。

hotspot当前对synchronized的实现

当前的hotspot共有3种类型的锁,来实现synchronize的语义,之所以有3种,是因为这3种要解决的问题不同,所做的优化也不同。这3种锁分别为biased locking,stack lock,infalted(ObjectMonitor).简单除暴的来讲,从轻量级上来说,biased lock最优,inflated 最差。

synchronized锁住的是什么

synchronized锁定的是对象而非函数或代码。
当synchronized作用在方法上时,锁住的便是对象实例(this);当作用在静态方法时锁住的便是对象对应的Class实例,因为Class数据存在于永久带,因此静态方法锁相当于该类的一个全局锁;当synchronized作用于某一个对象实例时,锁住的便是对应的代码块。
每个对象只有一把锁(Lock)与之关联,当进行到synchronized语句或函数的时候,这把锁就会被当前的线程(thread)拿走,其他的(thread)再去访问的时候拿不到锁就被暂停了。
在HotSpot JVM实现中,锁有个专门的名字:对象监视器。

synchronized的使用场景

  1. public synchronized void method1
    锁住的是该对象,类的其中一个实例,当该对象(仅仅是这一个对象)在不同线程中执行这个同步方法时,线程之间会形成互斥。达到同步效果,但如果不同线程同时对该类的不同对象执行这个同步方法时,则线程之间不会形成互斥,因为他们拥有的是不同的锁。
  2. synchronized(this){ //TODO }
    同一
  3. public synchronized static void method3
    锁住的是该类,当所有该类的对象(多个对象)在不同线程中调用这个static同步方法时,线程之间会形成互斥,达到同步效果,但如果多个线程同时调用method1,method3,则不会引互斥,具体讲看最后讲解。
  4. synchronized(Test.class){ //TODO}
    同三
  5. synchronized(o) {}
    这里面的o可以是一个任何Object对象或数组,并不一定是它本身对象或者类,谁拥有o这个锁,谁就能够操作该块程序代码。

Reference

周志明的《深入理解Java虚拟机》
https://blogs.oracle.com/dave/entry/biased_locking_in_hotspot
http://www.javaworld.com/article/2076971/java-concurrency/how-the-java-virtual-machine-performs-thread-synchronization.html
http://f.dataguru.cn/thread-472518-1-1.html

同步的原理

JVM规范规定JVM基于进入和退出Monitor对象来实现方法同步和代码块同步,但两者的实现细节不一样。代码块同步是使用monitorenter和monitorexit指令实现,而方法同步是使用另外一种方式实现的,细节在JVM规范里并没有详细说明,但是方法的同步同样可以使用这两个指令来实现。monitorenter指令是在编译后插入到同步代码块的开始位置,而monitorexit是插入到方法结束处和异常处, JVM要保证每个monitorenter必须有对应的monitorexit与之配对。任何对象都有一个 monitor 与之关联,当且一个monitor 被持有后,它将处于锁定状态。线程执行到 monitorenter 指令时,将会尝试获取对象所对应的 monitor 的所有权,即尝试获得对象的锁。

Java对象头

锁存在Java对象头里。如果对象是数组类型,则虚拟机用3个Word(字宽)存储对象头,如果对象是非数组类型,则用2字宽存储对象头。在32位虚拟机中,一字宽等于四字节,即32bit。

长度 内容 说明
32/64bit Mark Word 存储对象的hashCode或锁信息等
32/64bit Class Metadata Address 存储到对象类型数据的指针
32/64bit Array length 数组的长度(如果当前对象是数组)

Java对象头里的Mark Word里默认存储对象的HashCode,分代年龄和锁标记位。32位JVM的Mark Word的默认存储结构如下:

25 bit 4bit 1bit
是否是偏向锁
2bit
锁标志位
无锁状态 对象的hashCode 对象分代年龄 0 01

在运行期间Mark Word里存储的数据会随着锁标志位的变化而变化。Mark Word可能变化为存储以下4种数据:

几种锁的类型

线程的阻塞和唤醒需要CPU从用户态转为核心态,频繁的阻塞和唤醒对CPU来说是一件负担很重的工作。

Java SE1.6为了减少获得锁和释放锁所带来的性能消耗,引入了“偏向锁”和“轻量级锁”,所以在Java SE1.6里锁一共有四种状态,无锁状态,偏向锁状态,轻量级锁状态和重量级锁状态,它会随着竞争情况逐渐升级。

锁可以升级但不能降级,意味着偏向锁升级成轻量级锁后不能降级成偏向锁。这种锁升级却不能降级的策略,目的是为了提高获得锁和释放锁的效率。

偏向锁

Hotspot的作者经过以往的研究发现大多数情况下锁不仅不存在多线程竞争,而且总是由同一线程多次获得。偏向锁的目的是在某个线程获得锁之后,消除这个线程锁重入(CAS)的开销,看起来让这个线程得到了偏护。

偏向锁的进一步理解

偏向锁的释放不需要做任何事情,这也就意味着加过偏向锁的MarkValue会一直保留偏向锁的状态,因此即便同一个线程持续不断地加锁解锁,也是没有开销的。

另一方面,偏向锁比轻量锁更容易被终结,轻量锁是在有锁竞争出现时升级为重量锁,而一般偏向锁是在有不同线程申请锁时升级为轻量锁,这也就意味着假如一个对象先被线程1加锁解锁,再被线程2加锁解锁,这过程中没有锁冲突,也一样会发生偏向锁失效,不同的是这回要先退化为无锁的状态,再加轻量锁,如图:

另外,JVM对那种会有多线程加锁,但不存在锁竞争的情况也做了优化,听起来比较拗口,但在现实应用中确实是可能出现这种情况,因为线程之前除了互斥之外也可能发生同步关系,被同步的两个线程(一前一后)对共享对象锁的竞争很可能是没有冲突的。对这种情况,JVM用一个epoch表示一个偏向锁的时间戳(真实地生成一个时间戳代价还是蛮大的,因此这里应当理解为一种类似时间戳的identifier),对epoch,官方是这么解释的:

A similar mechanism, called bulk rebiasing, optimizes situations in which objects of a class are locked and unlocked by different threads but never concurrently. It invalidates the bias of all instances of a class without disabling biased locking. An epoch value in the class acts as a timestamp that indicates the validity of the bias. This value is copied into the header word upon object allocation. Bulk rebiasing can then efficiently be implemented as an increment of the epoch in the appropriate class. The next time an instance of this class is going to be locked, the code detects a different value in the header word and rebiases the object towards the current thread.

偏向锁的获取

当一个线程访问同步块并获取锁时,会在对象头和栈帧中的锁记录里存储锁偏向的线程ID,以后该线程在进入和退出同步块时不需要花费CAS操作来加锁和解锁,而只需简单的测试一下对象头的Mark Word里是否存储着指向当前线程的偏向锁,如果测试成功,表示线程已经获得了锁,如果测试失败,则需要再测试下Mark Word中偏向锁的标识是否设置成1(表示当前是偏向锁),如果没有设置,则使用CAS竞争锁,如果设置了,则尝试使用CAS将对象头的偏向锁指向当前线程。

偏向锁的撤销

偏向锁使用了一种等到竞争出现才释放锁的机制,所以当其他线程尝试竞争偏向锁时,持有偏向锁的线程才会释放锁。偏向锁的撤销,需要等待全局安全点(在这个时间点上没有字节码正在执行),它会首先暂停拥有偏向锁的线程,然后检查持有偏向锁的线程是否活着,如果线程不处于活动状态,则将对象头设置成无锁状态,如果线程仍然活着,拥有偏向锁的栈会被执行,遍历偏向对象的锁记录,栈中的锁记录和对象头的Mark Word要么重新偏向于其他线程,要么恢复到无锁或者标记对象不适合作为偏向锁,最后唤醒暂停的线程。下图中的线程1演示了偏向锁初始化的流程,线程2演示了偏向锁撤销的流程。

偏向锁的设置

关闭偏向锁:偏向锁在Java 6和Java 7里是默认启用的,但是它在应用程序启动几秒钟之后才激活,如有必要可以使用JVM参数来关闭延迟-XX:BiasedLockingStartupDelay = 0。如果你确定自己应用程序里所有的锁通常情况下处于竞争状态,可以通过JVM参数关闭偏向锁-XX:-UseBiasedLocking=false,那么默认会进入轻量级锁状态。

自旋锁

线程的阻塞和唤醒需要CPU从用户态转为核心态,频繁的阻塞和唤醒对CPU来说是一件负担很重的工作。同时我们可以发现,很多对象锁的锁定状态只会持续很短的一段时间,例如整数的自加操作,在很短的时间内阻塞并唤醒线程显然不值得,为此引入了自旋锁。

所谓“自旋”,就是让线程去执行一个无意义的循环,循环结束后再去重新竞争锁,如果竞争不到继续循环,循环过程中线程会一直处于running状态,但是基于JVM的线程调度,会出让时间片,所以其他线程依旧有申请锁和释放锁的机会。

自旋锁省去了阻塞锁的时间空间(队列的维护等)开销,但是长时间自旋就变成了“忙式等待”,忙式等待显然还不如阻塞锁。所以自旋的次数一般控制在一个范围内,例如10,100等,在超出这个范围后,自旋锁会升级为阻塞锁。

对自旋锁周期的选择上,HotSpot认为最佳时间应是一个线程上下文切换的时间,但目前并没有做到。经过调查,目前只是通过汇编暂停了几个CPU周期,除了自旋周期选择,HotSpot还进行许多其他的自旋优化策略,具体如下:

  • 如果平均负载小于CPUs则一直自旋
  • 如果有超过(CPUs/2)个线程正在自旋,则后来线程直接阻塞
  • 如果正在自旋的线程发现Owner发生了变化则延迟自旋时间(自旋计数)或进入阻塞 如果CPU处于节电模式则停止自旋
  • 自旋时间的最坏情况是CPU的存储延迟(CPU A存储了一个数据,到CPU B得知这个数据直接的时间差)

轻量级锁

轻量级锁加锁

线程在执行同步块之前,JVM会先在当前线程的栈桢中创建用于存储锁记录的空间,并将对象头中的Mark Word复制到锁记录中,官方称为Displaced Mark Word。然后线程尝试使用CAS将对象头中的Mark Word替换为指向锁记录的指针。如果成功,当前线程获得锁,如果失败,则自旋获取锁,当自旋获取锁仍然失败时,表示存在其他线程竞争锁(两条或两条以上的线程竞争同一个锁),则轻量级锁会膨胀成重量级锁。

轻量级锁解锁

轻量级解锁时,会使用原子的CAS操作来将Displaced Mark Word替换回到对象头,如果成功,则表示同步过程已完成。如果失败,表示有其他线程尝试过获取该锁,则要在释放锁的同时唤醒被挂起的线程。下图是两个线程同时争夺锁,导致锁膨胀的流程图。


重量级锁

重量锁在JVM中又叫对象监视器(Monitor),它很像C中的Mutex,除了具备Mutex互斥的功能,它还负责实现了Semaphore的功能,也就是说它至少包含一个竞争锁的队列,和一个信号阻塞队列(wait队列),前者负责做互斥,后一个用于做线程同步。

锁的优缺点对比

优点 缺点 适用场景
偏向锁 加锁和解锁不需要额外的消耗,和执行非同步方法比仅存在纳秒级的差距 如果线程间存在锁竞争,会带来额外的锁撤销的消耗 适用于只有一个线程访问同步块场景
轻量级锁 竞争的线程不会阻塞,提高了程序的响应速度 如果始终得不到锁竞争的线程使用自旋会消耗CPU 追求响应时间,锁占用时间很短
重量级锁 线程竞争不使用自旋,不会消耗CPU 线程阻塞,响应时间缓慢 追求吞吐量,锁占用时间较长


Reference

>
周志明的《深入理解Java虚拟机》
https://blogs.oracle.com/dave/entry/biased_locking_in_hotspot
https://www.usenix.org/legacy/event/jvm01/full_papers/dice/dice.pdf
http://www.javaworld.com/article/2076971/java-concurrency/how-the-java-virtual-machine-performs-thread-synchronization.html
http://www.infoq.com/cn/articles/java-se-16-synchronized
http://www.majin163.com/2014/03/17/synchronized2/
http://www.cnblogs.com/javaminer/p/3889023.html
http://blog.csdn.net/coslay/article/details/41526635

volatile的含义

volatile是一个类型修饰符(type specifier)。它是被设计用来修饰被不同线程访问和修改的变量。
volatile变量自身具有下列特性:

  • 可见性:对一个volatile变量的读,总是能看到(任意线程)对这个volatile变量的写入。
  • 原子性:对任意单个volatile变量的读/写具有原子性,但类似于volatile++这种复合操作不具有原子性。

volatile与synchronized的比较

Java语言包含两种内在的同步机制:被synchronized修饰的同步块(或方法)和被volatile修饰的变量。这两种机制都是为了实现代码线程的安全性。与 synchronized 块相比,volatile 变量的同步性较差,使用时容易出错,但它更简单且开销更低。

锁提供了两种主要特性:互斥(mutual exclusion)和可见性(visibility)。互斥即一次只允许一个线程持有某个特定的锁,因此可使用该特性实现对共享数据的协调访问协议,这样,一次就只有一个线程能够使用该共享数据。可见性要更加复杂一些,它必须确保释放锁之前对共享数据做出的更改对于随后获得该锁的另一个线程是可见的 —— 如果没有同步机制提供的这种可见性保证,线程看到的共享变量可能是修改前的值或不一致的值,这将引发许多严重问题。

volatile 变量具有 synchronized 的可见性特性,但是不具备原子特性。这就是说线程能够自动发现 volatile 变量的最新值。volatile 变量可用于提供线程安全,但是只能应用于非常有限的一组用例:多个变量之间或者某个变量的当前值与修改后值之间没有约束。因此,单独使用 volatile 还不足以实现计数器、互斥锁或任何具有与多个变量相关的不变式(Invariants)的类(例如 “start <=end”)。

出于简易性或可伸缩性的考虑,一般倾向于使用 volatile 变量而不是锁。但是当使用 volatile 变量时,某些习惯用法(idiom)更加易于编码和阅读。此外,volatile 变量不会像锁那样造成线程阻塞。在某些情况下,如果读操作远远大于写操作,volatile 变量还可以提供优于锁的性能优势。

volatile的适用范围

您只能在有限的一些情形下使用 volatile 变量替代锁。要使 volatile 变量提供理想的线程安全,必须同时满足下面两个条件:
对变量的写操作不依赖于当前值。
该变量没有包含在具有其他变量的不变式中。

volatile的内存语义

volatile写的内存语义如下:

  • 当写一个volatile变量时,JMM会把该线程对应的本地内存中的共享变量刷新到主内存。
    volatile读的内存语义如下:
  • 当读一个volatile变量时,JMM会把该线程对应的本地内存置为无效。后续线程读此变量时,将从主内存中读取并load到本地内存。

volatile内存语义的实现

JMM(Java Memory Model) 如何实现volatile写/读的内存语义。
JMM内部会实现指令重排序。为了实现volatile内存语义,JMM会分别限制这两种类型的重排序类型。

从JSR-133开始,volatile变量的写-读可以实现线程之间的通信。
从内存语义的角度来说,volatile与监视器锁有相同的效果:volatile写和监视器的释放有相同的内存语义;volatile读与监视器的获取有相同的内存语义。

更进一步地说明

处理器为了提高处理速度,不直接和内存进行通讯,而是先将系统内存的数据读到内部缓存(L1,L2或其他)后再进行操作,但操作完之后不知道何时会写到内存,如果对声明了Volatile变量进行写操作,JVM就会向处理器发送一条Lock前缀的指令,将这个变量所在缓存行的数据写回到系统内存。但是就算写回到内存,如果其他处理器缓存的值还是旧的,再执行计算操作就会有问题,所以在多处理器下,为了保证各个处理器的缓存是一致的,就会实现缓存一致性协议,每个处理器通过嗅探在总线上传播的数据来检查自己缓存的值是不是过期了,当处理器发现自己缓存行对应的内存地址被修改,就会将当前处理器的缓存行设置成无效状态,当处理器要对这个数据进行修改操作的时候,会强制重新从系统内存里把数据读到处理器缓存里。

为了实现volatile的内存语义,编译器在生成字节码时,会在指令序列中插入内存屏障来禁止特定类型的处理器重排序。对于编译器来说,发现一个最优布置来最小化插入屏障的总数几乎不可能,为此,JMM采取保守策略。下面是基于保守策略的JMM内存屏障插入策略:

  • 在每个volatile写操作的前面插入一个StoreStore屏障。
  • 在每个volatile写操作的后面插入一个StoreLoad屏障。
  • 在每个volatile读操作的后面插入一个LoadLoad屏障。
  • 在每个volatile读操作的后面插入一个LoadStore屏障。
  • 上述内存屏障插入策略非常保守,但它可以保证在任意处理器平台,任意的程序中都能得到正确的volatile内存语义。

这里A线程写一个volatile变量后,B线程读同一个volatile变量。A线程在写volatile变量之前所有可见的共享变量,在B线程读同一个volatile变量后,将立即变得对B线程可见。

下面对volatile写和volatile读的内存语义做个总结:
线程A写一个volatile变量,实质上是线程A向接下来将要读这个volatile变量的某个线程发出了(其对共享变量所在修改的)消息。
线程B读一个volatile变量,实质上是线程B接收了之前某个线程发出的(在写这个volatile变量之前对共享变量所做修改的)消息。
线程A写一个volatile变量,随后线程B读这个volatile变量,这个过程实质上是线程A通过主内存向线程B发送消息。

volatile的性能

在目前大多数的处理器架构上,volatile 读操作开销非常低 —— 几乎和非 volatile 读操作一样。而 volatile 写操作的开销要比非 volatile 写操作多很多,因为要保证可见性需要实现内存栅栏(Memory barrier),即便如此,volatile 的总开销仍然要比锁获取低。

volatile的使用场景

很多并发性专家事实上往往引导用户远离 volatile 变量,因为使用它们要比使用锁更加容易出错。然而,如果谨慎地遵循一些良好定义的模式,就能够在很多场合内安全地使用 volatile 变量。要始终牢记使用 volatile 的限制 —— 只有在状态真正独立于程序内其他内容时才能使用 volatile —— 这条规则能够避免将这些模式扩展到不安全的用例。

场景1:状态标志

一个布尔状态标志,用于指示发生了一个重要的一次性事件,例如完成初始化或请求停机。

1
2
3
4
5
6
7
8
9
volatile boolean shutdownRequested;

public void shutdown() { shutdownRequested = true; }

public void doWork() {
while (!shutdownRequested) {
// do stuff
}
}

场景2:一次性安全发布(one-time safe publication)

缺乏同步会导致无法实现可见性,这使得确定何时写入对象引用而不是原语值变得更加困难。在缺乏同步的情况下,可能会遇到某个对象引用的更新值(由另一个线程写入)和该对象状态的旧值同时存在。(这就是造成著名的双重检查锁定(double-checked-locking)问题的根源,其中对象引用在没有同步的情况下进行读操作,产生的问题是您可能会看到一个更新的引用,但是仍然会通过该引用看到不完全构造的对象)。
实现安全发布对象的一种技术就是将对象引用定义为 volatile 类型。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class BackgroundFloobleLoader {
public volatile Flooble theFlooble;

public void initInBackground() {
// do lots of stuff
theFlooble = new Flooble(); // this is the only write to theFlooble
}
}

public class SomeOtherClass {
public void doWork() {
while (true) {
// do some stuff...
// use the Flooble, but only if it is ready
if (floobleLoader.theFlooble != null)
doSomething(floobleLoader.theFlooble);
}
}
}

上面的代码中,如果 theFlooble 引用不是 volatile 类型,doWork() 中的代码在解除对 theFlooble 的引用时,将会得到一个不完全构造的 Flooble。volatile 类型的引用可以确保对象的发布形式的可见性,但是如果对象的状态在发布后将发生更改,那么就需要额外的同步。

场景3:结合 volatile 和 synchronized 实现的”开销较低的读-写锁”

目前为止,您应该了解了 volatile 的功能还不足以实现计数器。因为 ++x 实际上是三种操作(读、添加、存储)的简单组合,如果多个线程凑巧试图同时对 volatile 计数器执行增量操作,那么它的更新值有可能会丢失。

然而,如果读操作远远超过写操作,您可以结合使用内部锁和 volatile 变量来减少公共代码路径的开销。清单 6 中显示的线程安全的计数器使用synchronized 确保增量操作是原子的,并使用 volatile 保证当前结果的可见性。如果更新不频繁的话,该方法可实现更好的性能,因为读路径的开销仅仅涉及 volatile 读操作,这通常要优于一个无竞争的锁获取的开销。

1
2
3
4
5
6
7
8
9
10
11
12
@ThreadSafe
public class CheesyCounter {
// Employs the cheap read-write lock trick
// All mutative operations MUST be done with the 'this' lock held
@GuardedBy("this") private volatile int value;

public int getValue() { return value; }

public synchronized int increment() {
return value++;
}
}

之所以将这种技术称之为 “开销较低的读-写锁” 是因为您使用了不同的同步机制进行读写操作。因为本例中的写操作违反了使用 volatile 的第一个条件,因此不能使用 volatile 安全地实现计数器 —— 您必须使用锁。然而,您可以在读操作中使用 volatile 确保当前值的可见性,因此可以使用锁进行所有变化的操作,使用 volatile 进行只读操作。其中,锁一次只允许一个线程访问值,volatile 允许多个线程执行读操作,因此当使用 volatile 保证读代码路径时,要比使用锁执行全部代码路径获得更高的共享度 —— 就像读-写操作一样。然而,要随时牢记这种模式的弱点:如果超越了该模式的最基本应用,结合这两个竞争的同步机制将变得非常困难。

总结

与锁相比,volatile 变量是一种非常简单但同时又非常脆弱的同步机制,它在某些情况下将提供优于锁的性能和伸缩性。如果严格遵循 volatile 的使用条件 —— 即变量真正独立于其他变量和自己以前的值 —— 在某些情况下可以使用 volatile 代替 synchronized 来简化代码。然而,使用 volatile 的代码往往比使用锁的代码更加容易出错。本文介绍的模式涵盖了可以使用 volatile 代替 synchronized 的最常见的一些用例。遵循这些模式(注意使用时不要超过各自的限制)可以帮助您安全地实现大多数用例,使用 volatile 变量获得更佳性能。

Reference

http://www.ibm.com/developerworks/cn/java/j-jtp06197.html
http://www.infoq.com/cn/articles/java-memory-model-1
http://www.infoq.com/cn/articles/ftf-java-volatile
http://zhhphappy.iteye.com/blog/2086149

什么是内存栅栏

内存栅栏(Memory Barriers),是让一个CPU处理单元中的内存状态对其它处理单元可见的一项技术。

内存栅栏提供了两个功能

  • 确保从另一个CPU来看栅栏的两边的所有指令都是正确的程序顺序,而保持程序顺序的外部可见性;
  • 实现内存数据可见性,确保内存数据会同步到CPU缓存子系统。

为什么需要内存栅栏

对主存的一次访问一般花费硬件的数百次时钟周期。为了减少这种操作,CPU通过使用Cache来达到高效获取数据的目的。然后Cache为了提高性能,会对指令进行重排序。
当重排序对最终的结果没有影响的时候,这种优化是有益的。但是当多线程共享数据时,重排序将导致错误的结果。所以为了在共享变量的情况下依然可以使用指令重排序,产生了内存栅栏来保证程序的正确性。

内存栅栏是怎么实现的

在底层,内存栅栏是一组指令,一般包括Store Barrier、Load Barrier和Full Barrier。

几乎所有的处理器至少支持一种粗粒度的屏障指令,通常被称为“栅栏(Fence)”,它保证在栅栏前初始化的load和store指令,能够严格有序的在栅栏后的load和store指令之前执行。

不同的CPU架构有不同的实现方式,以X86为例:

  • Store Barrier,强制所有在store屏障指令之前的store指令,都在该store屏障指令执行之前被执行,并把store缓冲区的数据都刷到主存
  • Load Barrier,强制所有在load屏障指令之后的load指令,都在该load屏障指令执行之后被执行,并且一直等到load缓冲区被该CPU读完才能执行之后的load指令。
  • Full Barrier,复合了load和store屏蔽指令。
    无论在何种处理器上,这几乎都是最耗时的操作之一(与原子指令差不多,甚至更消耗资源),所以大部分处理器还会支持更细粒度的屏障指令。

下图是CPU的Local Memory与主存的通信过程:

Java中内存栅栏的使用

Java内存模型中volatile变量在写操作之后会插入一个store屏障,在读操作之前会插入一个load屏障。一个类的final字段会在初始化后插入一个store屏障,来确保final字段在构造函数初始化完成并可被使用时可见。

内存栅栏对性能的影响

内存栅栏阻止了 CPU 很多隐式的内存延迟技术的执行,因此是有性能损耗的,不过在上层看来这种损耗并不大。在合适的时候使用内存栅栏,仍然是一种高效的做法。

Reference

http://mechanical-sympathy.blogspot.jp/2011/07/memory-barriersfences.html
http://ifeve.com/memory-barriers-or-fences/
http://www.infoq.com/cn/articles/memory_barriers_jvm_concurrency

转至: http://news.dbanotes.net/item?id=23383

写这篇文章,缘自于前几天部门内部成员们进行了一次部门内部现有涉及的一些算法的review以及整理。不过比较囧的就是,由于Boss不在,我们讨论讨论着就成了吐槽大会,倒是有一半时间在吐槽产品以及业务部门了。
不过这也算是一件可喜可贺的事情了,这也可以看做是我们数据部门,已经由开轻型挖掘机向深挖阶段迈步了。

因此,借此机会,也对自己接触过的,了解过的,或者做过的一些勉强称得上算法的东西做一个梳理。其实,就个人来说,本身就不是做算法出身的,在大学时代,学习的反倒是网络方面多一些,更不知数据挖掘算法为何物。

其实,就所谓算法而言,个人认为,我有个同事说的很对:所谓算法,并不是说那些复杂的数学模型才是算法,哪怕是你写的一个简单的计算公式,只要能够解决现有业务的痛点,有了自己的模型思路,它就是一个算法,只是它可能不够通用,只能解决特定业务需求而已。

在大规模的数据前提下,其实很多复杂的算法过程,反而效果没有这么好,或者说,我们会想方设法去简化其过程。

举个简单栗子:假设有一批大规模数据集,就以近千万篇博文为例。如果提供一篇博文,让你去查询与其相似度最高的top N,那我们的通常思路是什么?通常的做法是计算这篇博文与其他博文的相似度,至于相似度的计算方法就很多了,最简单的就是计算其向量夹角,根据向量夹角判定相似程度。OK,就算你用最简单的计算过程,你试想一下,运算近千万次需要多久?或许,有的人说,俺使用hadoop,利用分布式的计算能力来完成这个任务,但如果实际操作起来,你就会发现这是一个多么蛋疼的事情。

再举一个简单栗子(好吧,多吃点栗子):比如SVM,这是一种难以收敛的算法,在大数据的前提下,有些人希望使用它,但又希望使用更多的数据来训练模型,毕竟手里数据量太大,很多人还是希望使用尽量多的数据训练的,以达到模型更准确的目的。但是,随着训练数据量的增大,像SVM这种难以收敛的算法,其耗费的计算资源还是很巨大的。

东拉西扯说了这么多,自个的梳理工作还没有完成呢!

一、这些年,我开过的挖掘机

(1)最早接触的应该是贝叶斯的分类了

贝叶斯算是分类算法中最简单的算法了,初学挖掘机算法的人十有八九第一个爱上的绝对是它。其实,贝叶斯的原理真的很简单,就是依据统计学的最大概率原理。这么简单,但是就是尼玛这么好用,多年依然屹立不倒。

训练过程就缺乏可陈了,基本上贝叶斯的都这样,由于是文本,所以一套流程下来,分词,去停词,作为最基本的知识点向量,然后就计算模型概率了。不过比较有趣的是,分类过程是放在Storm里头做的,相当于这是一个实时的分类业务。

(2)说到了文本,自然少不了分词算法了

其实说到分词算法,反倒没啥可说的。如今互联网上各种开源的分词工具,都已经做的很好了,效果也差不了多少,想进一步改进的话也够呛。至于说深入到分词算法的内部,涉及上下文法分析,隐含马尔科夫模型等东西,如果是个人出于兴趣去研究,那我没话说;如果是小公司,花费人力物力去优化分词效果,我只能说他们闲着蛋疼;如果是大公司,人家金多任性也是可以理解的。

所以,至今来说,个人对于分词方面的东西,也仅限于初步了解分词算法的衍变,内部大概涉及的算法,以及几种分词工具的使用。

其实,在文本挖掘方面,仅仅针对于文本的分词是不够的,因为我们使用分词拆分出来的单词,往往很多跟业务都是没有关系的,通常做法是,建立对应业务字典,至于字典的建立,当然也是需要分词的,再进行进一步的加工,甚至可能会加上一些人工的工作。

(3)下一个就是实时热点分析了

我也不知道这算不算是算法,说到实时,自然跟Storm又有关系了(好吧,我承认我是搞这个之后开始接触数据的)。说到实时热点,可能大伙儿都摸不着头脑,举个简单栗子就明了了。

玩hadoop的童鞋都知道WordCount这个经典栗子,MapReduce在Map到Reduce的过程中,自动将相同的Key通过类似hash的方法聚合到一起了,所以,统计单词这个需求通过MR来做是辣么的简单。

那Storm的实时WordCount呢?好吧,这也是一个能够记录到实时技术领域史书上的经典案例(好吧,其实它就是一个Storm的HelloWorld)。Storm虽然没有类似MR那种自动Hash的功能,不过它也提供了一种数据分组流策略,也能达到类似的效果,并且它不像MR那样是批量的,它是实时的、流式的,也就是说你能动态的获取到当前变换的单词词频。

实时热点分析,如果我们把热点映射成单词,那我们是不是就可以实时的获取到当前Top N的热点了。这个方向可是有很大的研究价值的,实时地掌握了用户的热点导向,我们就可以动态的调整业务策略,从而衍生更大的数据价值。

不过,总体来说,这个数据模型更多依靠的是Storm这个实时工具的本身功能,模型设计上的东西反倒是少了。至于说算不算是算法模型,就跟前面所说的那样,看个人看法吧,你说是就是了~~

(4)国内很成熟的一种建模——推荐

就目前在国内做数据挖掘的来说,可能分类与推荐是做的最多的两种方向。分类就不多说了,就比如刚才所说的贝叶斯,简直就是分类中的鼻祖算法了。

可能一说到推荐算法,有人脑海里立马就闪现出关联规则、协同过滤、余弦相似性等这些词。这是没错的,但我要说的不是这个。其实个人想说的是推荐就两个方向:基于用户,基于内容。

我们需要注意两点,我们推荐的对象是用户,或者说是类似用户这种有动作行为的实体;而推荐的东西则就是内容,他没有动作行为,但是他有不同的属性,或者用更砖业说法描述就是他必然有知识点。

基于用户推荐,我们看重的不是内容这个实体,而是用户本身的行为,我们认为用户的行为必然隐含着一些信息,比如,人的兴趣导向,那么既然你有了相关的行为,那么我按照你的行为去给你推荐一些东西,这总是有一定道理的。

基于内容的推荐,我们的侧重点则是内容,这就跟用户的历史行为无关了。我们潜意识的认为,既然你会看这个内容,那么跟这个内容有关系的内容,你是不是也感兴趣呢?或许这样说有失偏颇,但是大体方向是对的。

至于之前说的那些关联规则也好,协同过滤也好,余弦相似性也好,其实就是研究知识点与知识点之间关系所建立的模型。

针对于基于内容推荐,其知识点就是内容之中的各种属性,比如影片推荐,其知识点可能就是各种评论数据、点播数据、顶踩数据、影片类型、演员、导演以及其中的一些情感分析等等;又比如博文,其知识点可能就是一个个带权的词,至于这个词就涉及到词的抽取了,再说到词的权重,可能就会涉及到TFIDF模型、LDA模型了。

而针对基于用户,其知识点最直接的体现就是用户的行为了,就是用户与内容之间的关系,不过深究下去,又会发现,其实跟内容的知识点也紧密联系,只不过这可能不止一个内容实体,而是多个内容实体的集合。

(5)文本单词的加权模型

前面正好提到了TFIDF以及LDA模型,所以顺带也就讲讲文本单词相关的加权模型吧。

说到文本挖掘,可能大部分人都熟悉TFIDF模型,既然涉及到了,那就简单的说一说。我们知道,文本的知识点就是一个个的单词,虽然都是单词,但也总有哪个词重要程度高一点,哪些词重要程度会低一点吧。

或许有人会说,出现多的词就重要。没错,那就是词频,简单的来想,这种思路并没有错,并且,早期的文本挖掘模型就是这么做的。当然,效果肯定是一般般的。因为那些经常出现的词往往都是一些没用的常用词,对文章的作用并不大。

直到TFIDF模型的出现,才根本性地解决了文本挖掘知识点建模的问题。如何判断一个词的重要程度,或者专业点的说法就是判断其对文章的贡献度?TFIDF通过词的词频来加大词在文章中的权重,然后通过其在多个文章中的文档频率来降低其在文章中的权重。说白了就是降低了那些公共词的权重,把真正贡献度大的词给暴露出来。这基本就是TFIDF的基本思路了,至于词频权重怎么加大,文档频的权重怎么降低,这就涉及到具体的模型公式了,根据不同的需求进行调整就OK了。

关于文章知识点主题建模的另外一种很重要的模型,那就是LDA模型了。它是一种比较通用的文章主题模型,它通过概率学原理,说白了就是贝叶斯,建立起知识点(也就是词),主题和文章的三层关系结构。词到主题有一个概率矩阵,主题到文章也有一个概率矩阵的映射关系。

好吧,LDA不能再说下去了,再说下去就露馅了。因为,俺也不是很懂啊。对于LDA,虽然部门内部有在使用,但是我没有做过具体的模型,只是和同事讨论过它,或者更确切的说向同事请教过它的一些原理以及一些设计思路。

(6)相似度计算

相似度计算,比如文本的相似度计算。它是一个很基础的建模,很多地方就用的到它,比如刚才我们说到的推荐,其内部关联的时候,有时候就会涉及到计算实体间的相似度。

关于文本的相似度,其实方法有很多。通常会涉及到TFIDF模型,拿到文本的知识点,也就是带权的词,然后通过这些带权的词去做一些相似度的计算。

比如,余弦相似模型,就是计算两个文本的余弦夹角,其向量自然就是那些带权的词了;又比如,各种算距离的方法,最著名的欧式距离,其向量也依然是这些词。还有很多诸如最长公共子串、最长公共子序列之类的模型,个人就不是很清楚了。

总之,方法很多,也都不是很复杂,原理都很像。至于哪个合适,就得看具体的业务场景了。

(7)文本主题程度——信息熵

曾经和同事尝试对数百万的博文进行领域划分,把技术博文划分成不同的领域,比如大数据领域、移动互联网领域、安全领域等等,其实说白了还是分类。

一开始我们使用贝叶斯进行分类,效果还行,不过最终还是使用SVM去建模了。这都不是重点,重点是我们想对划分到某一领域下的技术博文进行领域程度判断。

我们想了很多办法,尝试建立了数据模型,但效果都不是很理想,最终回归到了一个最本质的方法,那就是使用文本的信息熵去尝试描述程度,最终结果还是不错。这又让我再一次想到同事说过的那句话:简单的东西不一定不好用!

信息熵描述的是一个实体的信息量,通俗一点说就是它能够描述一个实体的信息混乱程度。在某一个领域内,知识点都是相似的,都是那些TFIDF权重的词,因此,是不是可以认为,一个文本其信息熵越小,其主题越集中越明显,信息的混乱度越低,反过来说,有些文本主题很杂乱,可能包含了多种领域的一些东西,其领域的程度就会降低。

最起码表面上,这种说法是行得通的,并且实际的效果还不错。

(8)用户画像

用户画像这个方向可能是近两年比较火的方向了。近年来,各大互联网公司,各大IT企业,都有意识的开始从传统的推荐到个性化推荐的道路衍变,有些可能做的深一些,有些可能浅一些。

商业价值的核心是用户,这自然不用多说。那么如何结合用户进行推荐呢,那就是用户的属性,那关键是用户的属性也不是一开始就有的,我们所有的只是少量用户的固有属性以及用户的各种行为记录。我们连用户是啥子里情况都不清楚,推个毛啊!

所以,我们需要了解用户,于是对用户进行用户画像分析就很有必要了,其实就是把用户标签化,把用户标记成一个个属性标签,这样,我们就知道每一个用户大概是什么情况了。一些商业行为,也就有了目的性。

至于说如何对用户的每一个画像属性进行填充,这就看具体的情况了。简单的,用几个简单模型抽取到一些信息填充进去;复杂的,使用复杂的算法,通过一些复杂的转换,给用户打上标签。

(9)文章热度计算

给你一大坨文章,你如何判断哪篇文章比较热,哪篇文章比较矬,换个说法就是,我进入一个文章列表页,你能给我提供一个热文章的排序列表吗?

可能大部分的思路都很直接,拿到文章能够体现热度的属性,比如点击率、评论情感分析、文章的顶踩情况,弄个简单加权计算模型,咔咔就出来了。

本质上这没错,简单的模型在实际的情况中不一定不好使,部分属性也的确能够体现出一篇文章的热度,通过加权计算的方式也是对的,具体的权重就需要看具体情况了。

但如果这么做的话,实际上会出现什么情况?今天我来了,看见了这个热度推荐列表,明天我来了,还是看到这个列表,后天我来了,依然是这个列表。

尼玛,这是啥情况,咋天天都是这个破列表,你要我看几遍?!不错,这就是现实情况,造成的结果就是,越热的文章越来越热,越冷的文章越冷,永远的沉底了,而热的文章永远在前头。

如何解决这个问题?我们把时间也加入参考,我们要把老文章通过降权的方式,把他人为的沉下去,让新文章有出头的机会。这就是说,需要我们把创建时间也加入权重中,并且随着时间推移,衰减其热度权重,这样,就不会出现热的一直热,冷的一直冷了。至于衰减的曲线,就需要看具体业务了。

这样就能解决根本问题了吗?如果文章本身信息量就不够呢,比如,本身大部分就是新文章,没有顶踩,没有评论,甚至连点击曝光都很少,那用之前的模型就行不通了。

那是不是就无解了呢?方法还是有的,比如,我们寻找到一个相似的站点,他也提供了类似最热文章推荐的功能,并且效果还很不错。那么,我们是不是就可以借助它的热度呢?我们通过计算文章相似度的方法,复刻出一个最热列表出来,如果站点性质相似,用户性质相似,文章质量不错,相似度计算够准确,相信这个热度列表的效果也是会不错滴(这方法太猥琐了~~)。

(10)Google的PageRank

首先,别误会,我真心没有写过这个模型,我也没有条件去写这个模型。

认识它了解它,缘自于跟几个老同学合伙搞网站(酷抉网)。既然搞网站吧,作为IT人猿,一些基本的SEO的技术还是需要了解的。于是,我了解到:想要增大网站的权重,外链是不可缺少的。

我跟我几个老同学说,你们去做外链吧,就是逮住网站就放咱网站的链接。他们问到:一个网站放的链接越多越好吗?放的网站越多越好吗?啥网站放比较好?这都不是重点,关键是他们问:为毛啊?

把我问的那个是哑口无言啊,于是我一怒之下就去研究PageRank了。PageRank具体的推演过程我就不说了(况且凭借我这半吊子的水平也不一定能说清楚),其核心思想有几个:当一个网页被引用的次数越多时,其权重越大;当一个网页的权重越大时,其引用的网页权重也随之增大;当一个网页引用的次数越多时,它引用的网页给它带来的权重越低。

当我们反复迭代路上过程时,我们会发现某个网页的的排名基本就固定了,这就是PageRank的基本思路。当然也有个问题需要解决,比如,初始网页如何给定其初始权重,高计算迭代过程如何简化其计算过程等等。这些问题,在Google的实际操作中,都做了比较好的优化。

(11)从互联网上定向抓取数据

其实我估摸着这跟算法没很大关系了,不过既然有数据的获取设计流程,也勉强算是吧。

之所以有这个需求,是那段时间搞网站搞嗨了,给自己整了个工作室网站,想给别人尤其是一些小企业搭建包括轻度定制企业网站(是不是挺瞎折腾的-_-),也确实是做了几个案例(我的工作室网站:www.mite8.com,有兴趣去看看)。

于是乎,俺就想啊,如何给自己找客户?工作室的客户应该是那些小企业的老板,并且还必须是目前没有企业门户的。作为一个搞数据的程序猿,并且还是开挖掘机的,虽然是半路出身非蓝翔毕业且无证上岗,但好歹是挖过几座山头的呀。

如今是互联网横行的时代,他们总会在互联网上留下一些蛛丝马迹,我要把它给逮出来!我的目标很明确,我要拿到那些无企业网站的企业邮箱,然后做自己EDM营销(电子邮件营销)。

1)我先从智联检索页面,抓取了企业规模小于40人的企业名称,事实证明智联招聘的页面还是很好解析的,都是静态的,并且格式很规整,所以很容易就分析出一批小企业的企业名来了;

2)拿到了企业名,我如何判断这个企业已经有了独立的企业官网?通过分析,我发现通过搜索引擎检索这个企业名的时候,如果有企业官网的话,一定是在首页。并且其页面地址也是有一定规律的,那就是:独立官网的开头通常是www开头的,长度一般不会太长,收尾通常是index.html、index.php以及index.asp等等。

通过这些规则,我就可以将那些有企业官网的企业名给pass掉了。其中遇到了两个难点,一个就是搜索引擎的很多页面源码都是动态加载的,于是我模拟了浏览器访问的过程,把页面源码给抓取下来了,这也是爬虫的通用做法;第二个就是,一开始我尝试的是通过百度去获取,结果百度貌似是有放结果抓取的一些措施,导致结果不如人意,于是我换了目的,使用的是360的检索,问题就解决了(事实证明百度在搜索引擎方面比360还是强了不少的),并且效果也差不多。

3)解决了排除的问题,那根本的问题就来了,我如何拿到企业的企业邮箱?通过分析搜索引擎的返回结果,我发现很多小企业喜欢用第三方网站提供的一些公司黄页,里头包含了企业联系邮箱;还有部分公司发布的招聘信息上会带有企业邮箱。

通过数据解析,终于拿到了这部分数据,最后还做了一些类似邮箱是否有效的基本解析等等。最终拿到了大概3000多个企业邮箱,有效率达到了80%以上。

问题是解决了,但还是有些地方需要优化的:首先就是效率问题,我整整跑了近12个小时,才把这3000多个邮箱给跑出来,太多需要解析的地方,并且模拟的浏览器在效率上不高;其次就是对邮箱的有效不是很好判断,有些邮箱根本就是人为瞎写的;还有就是部分网站对邮箱进行了图片化混杂处理,即做成了类似的验证码的东西,防抓取,我没有对图片类的邮箱数据进行解析,其实这个问题也是有解决办法的,我们拿到一些样本图片,进行图片字母识别的训练,这样就能解析出其中的邮箱了。

总体来说,这次体验还是挺有成就感的,毕竟在业余的时间解决了自己实际中的一些痛点,熟练了一些所学到的东西,或者说实施的过程中学到了很多东西。

ps:github上检索webmite就是这个项目了,我把代码托管到了github上,或者从我的博客上进入。

二、对自己做一个总结吧

其实个人的缺点很明显,首先就是没有经过系统的数据挖掘学习(没去过蓝翔,挖掘机自学的),也就是野路子出身。因此对很多算法的原理不够清楚,这样的话,对于有些业务场景,可能就提不出有建设性的意见了。并且,对于很多算法库的使用,还是不够了解的。

其次就是在数学功底上有所欠缺。我们知道,一些复杂的算法,是需要有强大的数学基础的。算法模型,其本质就是数学模型。因此,这方面也是我的短板吧。

由于个人是由做大数据偏向挖掘的,基于大数据模式下的数据挖掘过程,可能跟传统的数据过程有很大的不一样。比如,数据的预处理过程,大数据挖掘的预处理很多依赖的是目前比较流行的分布式的一些开源系统,比如实时处理系统Storm、消息队列Kafka、分布式数据收集系统Flume、数据离线批处理Hadoop等等,在数据分析存储上可能依赖的Hive以及一些Nosql会多一些。反倒对于传统的一些挖掘工具,比如SAS、SPSS、Excel等工具,个人还是比较陌生的。不过这也说不上是缺点吧,侧重点不一样。总体而言,大规模数据的挖掘将会是趋势。

三、给小伙伴们的一些建议

说了这么多,前面的那些东西可能对大伙儿的用处并不是很大,当然对于开挖掘机的朋友还是有一定帮助的。现在我想表达的东西可能跟挖掘就没有直接的关系了,更多的给动物园动物(程序猿,攻城狮)的学习以及自我进化的建议。

(1)为了学到东西,脸皮是毛玩意儿?

对于这点,个人可是深有体会。想当年(好吧,这个词还是很蛋疼的),大学那会儿专业是信息安全,偏向于网络多一点,因此在语言方面更多的是c和c++,对于java可是连课都没有开的,说白了就是用java写个HelloWorld都不会。

刚毕业那会儿,兴冲冲地跑去公司写c,结果不到一个月,新项目来了,需求变了(尼玛,开发最怕的就是这句话),变了就变了吧,尼玛要研究大数据,用c能干毛啊!一些个开源系统工具,十个倒是有九个是java写的。当时我就哭了!

于是就纠缠着一个同组的伙伴,逮住时间就问他问题,有些问题在熟悉java的人看来,绝对是白痴又白痴的。但是对于初学者来说,绝对是金玉良言,人家一句话的事,如果自己去查找,可能是几个小时都搞不定。一个月之后,总算入门了,后面就轻松多了。

往后的一些日子里,遇到了一些问题,总是会厚着脸皮缠着交流群中的一些大拿们死问,慢慢地就进步了。近段时间,开始学习scala,幸好旁边有个scala小高手,哈哈,可苦了他了~~

所以,遇到自己不懂的东西,不要怕自己的问题简单不好意思问,一定要脸皮厚!你连这么简单的问题都不懂,你还有资格担心自己的脸皮?!

(2)交流与分享

对于交流与分享这点感想,缘自于2012年末研究Storm的那段时间。Storm在2012年那会儿,并不像今天这样火,研究的人也不多,无处交流,可用的资料就更少了,所以解决起问题来很费事。

当然其中有几个博客给我的帮助还是很大的,包括了“大园那些事儿”、“庄周梦蝶”等几个博客,都是早期研究Storm并且分享经验技术的博客。当时我就萌生了写博客的想法。

在往后的时间里,我花费了很大一部分精力,将我学到的Storm相关的东西整理了出来,并且由于当时感叹没有一个很好的交流平台,创建了“Storm-分布式-IT”技术群(群号191321336,主要搞Storm以及大数据方面的,有兴趣的可以进来),并把整理的资料、代码、经验分享到了平台以及博客中。

由于我一直主张“进步始于交流,收获源于分享”这个理念,不断有搞技术的朋友加入到这个大家庭中,并且不断的把一些经验技术反馈到群贡献中,达到了一个良性的循环。 短短不到两年的时间,群已经发展到了千人,并且无论是技术氛围还是群员素质,在IT技术群中绝对可以算的上名列前茅的。

就个人从中的收获来看,这种交流是能够学到很多的东西的,你要相信三人行必有我师,这句话是有道理的。而分享则是促进交流的基石,只有让大家意识到自己所收获的东西是源自于别人的分享,这样才能让更多的人参与进来。

其实说了这么多,想表达的意思就两点:多多与他人交流,听取他人的意见;至于分享自己的所得,这就是属于良心发现了。

(3)多看书,随时给自己大脑补充营养

其实这点也不止是给大伙儿的建议,也算是给自己的一个告诫吧。

个人在这方面做的也不是很好,很久之前给自己定了一个目标:一个月看完一本书。结果工作的问题,其他杂七杂八的事情很多,这个一直没有落实下来,至今买来的《我的互联网方法论》才看了前几章。最好的案例算是上上一个月,我花费了近一个月上下班等地铁、倒地铁的零碎时间,终于把《构建之法:现代软件工程》给看完了。

书中有没有颜如玉我不知道,但书中肯定有黄金屋。平时多看一些书,多学一些,跳槽时跟面试官总是能多唠一些的,哈哈,提薪酬的时候是不是底气就足了些?!

关于说看书的内容,工作中涉及的一些必须了解,必须看的我就不多说了。如果业余时间比较多,还是推荐多涉猎一些其他相关领域,毕竟,人不可能一辈子就只窝在自己那一亩三分地上的;就算你一直坚持某个技术方向,随着时间的推移,技术的升华也必然会涉及到其他很多的相关知识。

所以,多看书,多充实一下自己,这一定是对的!

(4)经常梳理一下自己,整理一下自己

经常给自己做一下梳理工作:自己目前掌握了哪些东西,目前自己缺乏什么东西,掌握的东西够不够,缺乏的东西如何去弥补。这些都是需要我们经常去反思的,只有整理清楚了自己,才知道自己要干什么,才有目标。

当然梳理完了,你还需要去实际操作,不然的话,你会发现,每一次梳理,结果都是一样的。我们需要在每一次梳理过后,进行对比,了解自己进步了多少。当然每一次梳理,都是为了给自己做一个计划,计划自己大概需要在哪些方向进行加强。

其实很多人一到了跳槽季就犹犹豫豫,其实他们对目前的工作已经是有所不满的了,但是总感觉自己能力不够,可能辞了也难找工作。这是因为他们对自己认识的不够,连他自己都不明白自己到底有多少料,那么,请问面试官会知道吗?

如果,你对自己掌握了多少东西都一清二楚,核心领域已经熟悉了,相关领域也有所涉猎,那么你还在担心什么呢?如果真有面试官对你说no,你可以说:hi,刚好我也没什么时间,我还回去挑选offer呢!

(5)善于在实际生活中寻找学习的动力

人是懒惰的,很多时候,有些事情可做可不做的,往往人都是不去做的,也不愿意去深根究底。

  这个我很想学,那个我也很想了解,关键是一到大周末,我更想躺被窝!说到底,就是没有学习的动力!也就是说,我们要善于在实际的生活中,寻找到推动我们取学习的理由。
举几个简单的栗子:

1)之前也说过,有段时间在研究网站。为了让网站推广出去,各种去研究SEO,现在来看,自己虽然远远达不到一个SEO专业人员的标准,但最起码是知道了为毛通过搜索引擎检索,有些网页就排在前面有些就排在后面(PageRank算法);也知道了怎么去编译一篇文章,更好的方便搜索引擎收录(等俺失业了,不搞挨踢了,去做网编估计也是行的,又多了一条活路,哈哈)等等。

2)为了给EDM寻找目标,我自己使用业余的时间去分析互联网上的数据,然后写代码,跑数据,测试数据等。其实,在那之前,我对爬虫的了解是不多的,对于网页数据的解析也不在行,这完全都是通过“从互联网抓取有用数据”的个人需求上去驱动的。还不止如此,拿到邮箱之后,为了让EDM邮件看起来更“砖业”一点,我开始自学如何使用html来制作好看的电子营销邮件页面。

3)曾经有一段时间,工作很是清闲,突发奇想的把大学时想写小说的梦给圆了。于是就开始在纵横小说网上写小说。不过,这都不是重点,重点是纵横要求每一个作者给自己的小说配小说封面。我去问了一下,尼玛一张破封面需要20多大洋。心想,一张破封面就要20大洋,自己都是搞IT的人,干脆不自己P一个呢。于是,我开始捡起了大学时期放弃的PS学习计划,只用了两个星期,PS基本功能就熟练了。后来的话,自己的封面当然是搞定了,并且还服务了至少数十位作者朋友们。当然,这都是题外话了。至于小说,哈哈,不但签约了,稿费还是挣了上千大洋,关键是过了一把写小说的瘾。在PS技术方面,虽然跟专业的前端人员比不得,但是改改图、修修照片还是木有问题滴。

4)远的太远,说一个近一点的事吧。前一段时间开始学习scala,其实就个人需求来说,写那个项目用java来写也完全能够搞定,但关键是我对我自己说,错过了这次机会,下次说不定啥时候才有决心去学习这个很有前途的语言了。于是,狠下心使用这个全新的语言去开发,过程虽然磕磕绊绊,毕竟马上使用一种陌生的语言去敲代码是很蛋疼的事,但一个星期来,结果还是不错的,最起码一些基本的用法是会了。完事开头难,熟悉了一些基本的东西,剩下的就是累积的过程了。

其实这些归结起来就一个观点:我们要适时的给自己找一些理由,逼着我们自己去学习,去获取新的东西,去提升自己。

或许有人会说,哥我天天加班,还有毛线时间去问问题、去交流、去看书,大周末的好不容易有假期了,吃饱了我不去睡觉去给自己找动力干不给钱的活,我脑抽啊?!好吧,如果你是这么想的,抱歉耽误了你这么多睡觉的时间。

其实上面说了这么多零碎的栗子,关键还是在于态度!你有没有想学习的欲望,有没有提升自己、升华自己的想法,有没有升职、加薪、当上UFO、迎娶白富美的念头。是的,这些东西都是自己去做的,没人逼你。如果你有这些想法的话,那么这些东西多多少少还是有一些帮助的。

除了对待事情的态度,我们的心态也很重要,看待事情要乐观一点。前几天,群里有个搞互联网招聘的朋友问我:你是搞技术的吧?我说是。他说我认识很多搞技术的都很闷,不像你这么开朗。我说我不想哪天死在了马桶上~~

搞IT的给大部分人的映象确实是闷骚、不善言谈、不善交际。其实也是,每天大量的工作,领导又开会训人了、产品这边需求又改了,确实让人疯狂。工作压力大是IT人的标准属性了。

我们需要调整好自己的心态,就像之前所说的,学习一个东西,虽然可能会占用本来就不多的业余时间,但是我们应该不是那种单纯为了解决问题而去学习,去获取,当成一种提升自己、升华自己的途径,而不是逼不得已的无奈之举。如果一份工作,你确认自己不喜欢,那就别犹豫,果断跳吧!脑中有货还怕找不到买家!

时刻警醒自己对待任何事情要有一个好的态度,认清自己,抓住一切机会提升自己、升华自我,保持一个良好的心态,这就是我想说的东西。

吭吭唧唧说了一大坨,其实我也知道很多是废话,但是我依然希望,我的这些废话能够帮助到你,做为同一个动物园里的人,一起努力吧!

AQS简介

AQS介绍

AbstractQueuedSynchronizer提供了一个基于FIFO队列,可以用于构建锁或者其他相关同步装置的基础框架。该同步器(以下简称同步器)利用了一个int来表示状态,期望它能够成为实现大部分同步需求的基础。使用的方法是继承,子类通过继承同步器并需要实现它的方法来管理其状态,管理的方式就是通过类似acquire和release的方式来操纵状态。然而多线程环境中对状态的操纵必须确保原子性,因此子类对于状态的把握,需要使用这个同步器提供的以下三个方法对状态进行操作:

1
2
3
java.util.concurrent.locks.AbstractQueuedSynchronizer.getState()
java.util.concurrent.locks.AbstractQueuedSynchronizer.setState(int)
java.util.concurrent.locks.AbstractQueuedSynchronizer.compareAndSetState(int, int)

子类推荐被定义为自定义同步装置的内部类,同步器自身没有实现任何同步接口,它仅仅是定义了若干acquire之类的方法来供使用。该同步器即可以作为排他模式也可以作为共享模式,当它被定义为一个排他模式时,其他线程对其的获取就被阻止,而共享模式对于多个线程获取都可以成功。

AQS用处

同步器与锁

同步器是实现锁的关键,利用同步器将锁的语义实现,然后在锁的实现中聚合同步器。 可以这样理解:锁的API是面向使用者的,它定义了与锁交互的公共行为,而每个锁需要完成特定的操作也是透过这些行为来完成的(比如:可以允许两个线程进行加锁,排除两个以上的线程),但是实现是依托给同步器来完成;同步器面向的是线程访问和资源控制,它定义了线程对资源是否能够获取以及线程的排队等操作。锁和同步器很好的隔离了二者所需要关注的领域,严格意义上讲,同步器可以适用于除了锁以外的其他同步设施上(包括锁)。
同步器的开始提到了其实现依赖于一个FIFO队列,那么队列中的元素Node就是保存着线程引用和线程状态的容器,每个线程对同步器的访问,都可以看做是队列中的一个节点。Node的主要包含以下成员变量:

1
2
3
4
5
6
7
Node {
int waitStatus;
Node prev;
Node next;
Node nextWaiter;
Thread thread;
}

以上五个成员变量主要负责保存该节点的线程引用,同步等待队列(以下简称sync队列)的前驱和后继节点,同时也包括了同步状态。

属性名称 描述
int waitStatus 表示节点的状态。其中包含的状态有:
CANCELLED,值为1,表示当前的线程被取消;
SIGNAL,值为-1,表示当前节点的后继节点包含的线程需要运行,也就是unpark;
CONDITION,值为-2,表示当前节点在等待condition,也就是在condition队列中;
PROPAGATE,值为-3,表示当前场景下后续的acquireShared能够得以执行;
值为0,表示当前节点在sync队列中,等待着获取锁。
Node prev 前驱节点,比如当前节点被取消,那就需要前驱节点和后继节点来完成连接。
Node next 后继节点。
Node nextWaiter 存储condition队列中的后继节点。
Thread thread 入队列时的当前线程。

节点成为sync队列和condition队列构建的基础,在同步器中就包含了sync队列。同步器拥有三个成员变量:sync队列的头结点head、sync队列的尾节点tail和状态state。对于锁的获取,请求形成节点,将其挂载在尾部,而锁资源的转移(释放再获取)是从头部开始向后进行。对于同步器维护的状态state,多个线程对其的获取将会产生一个链式的结构。

API说明


实现自定义同步器时,需要使用同步器提供的getState()、setState()和compareAndSetState()方法来操纵状态的变迁。

方法名称 描述
protected boolean tryAcquire(int arg) 排它的获取这个状态。这个方法的实现需要查询当前状态是否允许获取,然后再进行获取(使用compareAndSetState来做)状态。
protected boolean tryRelease(int arg) 释放状态。
protected int tryAcquireShared(int arg) 共享的模式下获取状态。
protected boolean tryReleaseShared(int arg) 共享的模式下释放状态。
protected boolean isHeldExclusively() 在排它模式下,状态是否被占用。

实现这些方法必须是非阻塞而且是线程安全的,推荐使用该同步器的父类java.util.concurrent.locks.AbstractOwnableSynchronizer来设置当前的线程。
开始提到同步器内部基于一个FIFO队列,对于一个独占锁的获取和释放有以下伪码可以表示。
获取一个排他锁。

1
2
3
4
5
6
7
8
9
10
11
while(获取锁) {
if (获取到) {
退出while循环
} else {
if(当前线程没有入队列) {
那么入队列
}
阻塞当前线程
}
}
释放一个排他锁。
1
2
3
4
if (释放成功) {
删除头结点
激活原头结点的后继节点
}

Mutex 示例


下面通过一个排它锁的例子来深入理解一下同步器的工作原理,而只有掌握同步器的工作原理才能够更加深入了解其他的并发组件。
排他锁的实现,一次只能一个线程获取到锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
class Mutex implements Lock, java.io.Serializable {
// 内部类,自定义同步器
private static class Sync extends AbstractQueuedSynchronizer {
// 是否处于占用状态
protected boolean isHeldExclusively() {
return getState() == 1;
}
// 当状态为0的时候获取锁
public boolean tryAcquire(int acquires) {
assert acquires == 1; // Otherwise unused
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
// 释放锁,将状态设置为0
protected boolean tryRelease(int releases) {
assert releases == 1; // Otherwise unused
if (getState() == 0) throw new IllegalMonitorStateException();
setExclusiveOwnerThread(null);
setState(0);
return true;
}
// 返回一个Condition,每个condition都包含了一个condition队列
Condition newCondition() { return new ConditionObject(); }
}
// 仅需要将操作代理到Sync上即可
private final Sync sync = new Sync();
public void lock() { sync.acquire(1); }
public boolean tryLock() { return sync.tryAcquire(1); }
public void unlock() { sync.release(1); }
public Condition newCondition() { return sync.newCondition(); }
public boolean isLocked() { return sync.isHeldExclusively(); }
public boolean hasQueuedThreads() { return sync.hasQueuedThreads(); }
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
}

可以看到Mutex将Lock接口均代理给了同步器的实现。
使用方将Mutex构造出来之后,调用lock获取锁,调用unlock进行解锁。下面以Mutex为例子,详细分析以下同步器的实现逻辑。

独占模式

acquire

实现分析
public final void acquire(int arg)
该方法以排他的方式获取锁,对中断不敏感,完成synchronized语义。

1
2
3
4
5
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

上述逻辑主要包括:

  1. 尝试获取(调用tryAcquire更改状态,需要保证原子性);
    在tryAcquire方法中使用了同步器提供的对state操作的方法,利用compareAndSet保证只有一个线程能够对状态进行成功修改,而没有成功修改的线程将进入sync队列排队。
  2. 如果获取不到,将当前线程构造成节点Node并加入sync队列;
    进入队列的每个线程都是一个节点Node,从而形成了一个双向队列,类似CLH队列,这样做的目的是线程间的通信会被限制在较小规模(也就是两个节点左右)。
  3. 再次尝试获取,如果没有获取到那么将当前线程从线程调度器上摘下,进入等待状态。
    使用LockSupport将当前线程unpark,关于LockSupport后续会详细介绍。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// 快速尝试在尾部添加
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}

private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}

上述逻辑主要包括:

  1. 使用当前线程构造Node;
    对于一个节点需要做的是将当节点前驱节点指向尾节点(current.prev = tail),尾节点指向它(tail = current),原有的尾节点的后继节点指向它(t.next = current)而这些操作要求是原子的。上面的操作是利用尾节点的设置来保证的,也就是compareAndSetTail来完成的。
  • 先行尝试在队尾添加;
    如果尾节点已经有了,然后做如下操作:
    1. 分配引用T指向尾节点;
    2. 将节点的前驱节点更新为尾节点(current.prev = tail);
    3. 如果尾节点是T,那么将当尾节点设置为该节点(tail = current,原子更新);
    4. T的后继节点指向当前节点(T.next = current)。

      注意第3点是要求原子的。
      这样可以以最短路径O(1)的效果来完成线程入队,是最大化减少开销的一种方式。
  • 如果队尾添加失败或者是第一个入队的节点。
    如果是第1个节点,也就是sync队列没有初始化,那么会进入到enq这个方法,进入的线程可能有多个,或者说在addWaiter中没有成功入队的线程都将进入enq这个方法。
    可以看到enq的逻辑是确保进入的Node都会有机会顺序的添加到sync队列中,而加入的步骤如下:
    1. 如果尾节点为空,那么原子化的分配一个头节点,并将尾节点指向头节点,这一步是初始化;
    2. 然后是重复在addWaiter中做的工作,但是在一个for(;;)的循环中,直到当前节点入队为止。

进入sync队列之后,接下来就是要进行锁的获取,或者说是访问控制了,只有一个线程能够在同一时刻继续的运行,而其他的进入等待状态。而每个线程都是一个独立的个体,它们自省的观察,当条件满足的时候(自己的前驱是头结点并且原子性的获取了状态),那么这个线程能够继续运行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head &&tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

上述逻辑主要包括:

  1. 获取当前节点的前驱节点;
    需要获取当前节点的前驱节点,而头结点所对应的含义是当前占有锁且正在运行。
  2. 当前驱节点是头结点并且能够获取状态,代表该当前节点占有锁;
    如果满足上述条件,那么代表能够占有锁,根据节点对锁占有的含义,设置头结点为当前节点。
  3. 否则进入等待状态。
    如果没有轮到当前节点运行,那么将当前线程从线程调度器上摘下,也就是进入等待状态。
    这里针对acquire做一下总结:
  4. 状态的维护;
    需要在锁定时,需要维护一个状态(int类型),而对状态的操作是原子和非阻塞的,通过同步器提供的对状态访问的方法对状态进行操纵,并且利用compareAndSet来确保原子性的修改。
  5. 状态的获取;
    一旦成功的修改了状态,当前线程或者说节点,就被设置为头节点。
  6. sync队列的维护。
    在获取资源未果的过程中条件不符合的情况下(不该自己,前驱节点不是头节点或者没有获取到资源)进入睡眠状态,停止线程调度器对当前节点线程的调度。
    这时引入的一个释放的问题,也就是说使睡眠中的Node或者说线程获得通知的关键,就是前驱节点的通知,而这一个过程就是释放,释放会通知它的后继节点从睡眠中返回准备运行。
    下面的流程图基本描述了一次acquire所需要经历的过程:

如上图所示,其中的判定退出队列的条件,判定条件是否满足和休眠当前线程就是完成了自旋spin的过程。

release

public final boolean release(int arg)
在unlock方法的实现中,使用了同步器的release方法。相对于在之前的acquire方法中可以得出调用acquire,保证能够获取到锁(成功获取状态),而release则表示将状态设置回去,也就是将资源释放,或者说将锁释放。

1
2
3
4
5
6
7
8
9
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}

上述逻辑主要包括:

  1. 尝试释放状态;
    tryRelease能够保证原子化的将状态设置回去,当然需要使用compareAndSet来保证。如果释放状态成功过之后,将会进入后继节点的唤醒过程。
  2. 唤醒当前节点的后继节点所包含的线程。
    通过LockSupport的unpark方法将休眠中的线程唤醒,让其继续acquire状态。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
private void unparkSuccessor(Node node) {
// 将状态设置为同步状态
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);

/*
* 获取当前节点的后继节点,如果满足状态,那么进行唤醒操作
* 如果没有满足状态,从尾部开始找寻符合要求的节点并将其唤醒
*/

Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}

上述逻辑主要包括,该方法取出了当前节点的next引用,然后对其线程(Node)进行了唤醒,这时就只有一个或合理个数的线程被唤醒,被唤醒的线程继续进行对资源的获取与争夺。
回顾整个资源的获取和释放过程:
在获取时,维护了一个sync队列,每个节点都是一个线程在进行自旋,而依据就是自己是否是首节点的后继并且能够获取资源;
在释放时,仅仅需要将资源还回去,然后通知一下后继节点并将其唤醒。
这里需要注意,队列的维护(首节点的更换)是依靠消费者(获取时)来完成的,也就是说在满足了自旋退出的条件时的一刻,这个节点就会被设置成为首节点。

tryAcquire

protected boolean tryAcquire(int arg)
tryAcquire是自定义同步器需要实现的方法,也就是自定义同步器非阻塞原子化的获取状态,如果锁该方法一般用于Lock的tryLock实现中,这个特性是synchronized无法提供的。

public final void acquireInterruptibly(int arg)
该方法提供获取状态能力,当然在无法获取状态的情况下会进入sync队列进行排队,这类似acquire,但是和acquire不同的地方在于它能够在外界对当前线程进行中断的时候提前结束获取状态的操作,换句话说,就是在类似synchronized获取锁时,外界能够对当前线程进行中断,并且获取锁的这个操作能够响应中断并提前返回。一个线程处于synchronized块中或者进行同步I/O操作时,对该线程进行中断操作,这时该线程的中断标识位被设置为true,但是线程依旧继续运行。
如果在获取一个通过网络交互实现的锁时,这个锁资源突然进行了销毁,那么使用acquireInterruptibly的获取方式就能够让该时刻尝试获取锁的线程提前返回。而同步器的这个特性被实现Lock接口中的lockInterruptibly方法。根据Lock的语义,在被中断时,lockInterruptibly将会抛出InterruptedException来告知使用者。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}

private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
// 检测中断标志位
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}

上述逻辑主要包括:

  1. 检测当前线程是否被中断;
    判断当前线程的中断标志位,如果已经被中断了,那么直接抛出异常并将中断标志位设置为false。
  2. 尝试获取状态;
    调用tryAcquire获取状态,如果顺利会获取成功并返回。
  3. 构造节点并加入sync队列;
    获取状态失败后,将当前线程引用构造为节点并加入到sync队列中。退出队列的方式在没有中断的场景下和acquireQueued类似,当头结点是自己的前驱节点并且能够获取到状态时,即可以运行,当然要将本节点设置为头结点,表示正在运行。
  4. 中断检测。
    在每次被唤醒时,进行中断检测,如果发现当前线程被中断,那么抛出InterruptedException并退出循环。

doAcquireNanos

private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException
该方法提供了具备有超时功能的获取状态的调用,如果在指定的nanosTimeout内没有获取到状态,那么返回false,反之返回true。可以将该方法看做acquireInterruptibly的升级版,也就是在判断是否被中断的基础上增加了超时控制。
针对超时控制这部分的实现,主要需要计算出睡眠的delta,也就是间隔值。间隔可以表示为nanosTimeout = 原有nanosTimeout – now(当前时间)+ lastTime(睡眠之前记录的时间)。如果nanosTimeout大于0,那么还需要使当前线程睡眠,反之则返回false。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
long lastTime = System.nanoTime();
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head &&tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
if (nanosTimeout <= 0) return false; if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
long now = System.nanoTime();
//计算时间,当前时间减去睡眠之前的时间得到睡眠的时间,然后被
//原有超时时间减去,得到了还应该睡眠的时间
nanosTimeout -= now - lastTime;
lastTime = now;
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}

上述逻辑主要包括:

  1. 加入sync队列;
    将当前线程构造成为节点Node加入到sync队列中。
  2. 条件满足直接返回;
    退出条件判断,如果前驱节点是头结点并且成功获取到状态,那么设置自己为头结点并退出,返回true,也就是在指定的nanosTimeout之前获取了锁。
  3. 获取状态失败休眠一段时间;
    通过LockSupport.unpark来指定当前线程休眠一段时间。
  4. 计算再次休眠的时间;
    唤醒后的线程,计算仍需要休眠的时间,该时间表示为nanosTimeout = 原有nanosTimeout – now(当前时间)+ lastTime(睡眠之前记录的时间)。其中now – lastTime表示这次睡眠所持续的时间。
  5. 休眠时间的判定。
    唤醒后的线程,计算仍需要休眠的时间,并无阻塞的尝试再获取状态,如果失败后查看其nanosTimeout是否大于0,如果小于0,那么返回完全超时,没有获取到锁。 如果nanosTimeout小于等于1000L纳秒,则进入快速的自旋过程。那么快速自旋会造成处理器资源紧张吗?结果是不会,经过测算,开销看起来很小,几乎微乎其微。Doug Lea应该测算了在线程调度器上的切换造成的额外开销,因此在短时1000纳秒内就让当前线程进入快速自旋状态,如果这时再休眠相反会让nanosTimeout的获取时间变得更加不精确。
    上述过程可以如下图所示:

上述这个图中可以理解为在类似获取状态需要排队的基础上增加了一个超时控制的逻辑。每次超时的时间就是当前超时剩余的时间减去睡眠的时间,而在这个超时时间的基础上进行了判断,如果大于0那么继续睡眠(等待),可以看出这个超时版本的获取状态只是一个近似超时的获取状态,因此任何含有超时的调用基本结果就是近似于给定超时。

共享模式

acquireShared

public final void acquireShared(int arg)
调用该方法能够以共享模式获取状态,共享模式和之前的独占模式有所区别。以文件的查看为例,如果一个程序在对其进行读取操作,那么这一时刻,对这个文件的写操作就被阻塞,相反,这一时刻另一个程序对其进行同样的读操作是可以进行的。如果一个程序在对其进行写操作,那么所有的读与写操作在这一时刻就被阻塞,直到这个程序完成写操作。
以读写场景为例,描述共享和独占的访问模式,如下图所示:

上图中,红色代表被阻塞,绿色代表可以通过。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0) doAcquireShared(arg); } private void doAcquireShared(int arg) { final Node node = addWaiter(Node.SHARED); boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

上述逻辑主要包括:

  1. 尝试获取共享状态;
    调用tryAcquireShared来获取共享状态,该方法是非阻塞的,如果获取成功则立刻返回,也就表示获取共享锁成功。
  2. 获取失败进入sync队列;
    在获取共享状态失败后,当前时刻有可能是独占锁被其他线程所把持,那么将当前线程构造成为节点(共享模式)加入到sync队列中。
  3. 循环内判断退出队列条件;
    如果当前节点的前驱节点是头结点并且获取共享状态成功,这里和独占锁acquire的退出队列条件类似。
  4. 获取共享状态成功;
    在退出队列的条件上,和独占锁之间的主要区别在于获取共享状态成功之后的行为,而如果共享状态获取成功之后会判断后继节点是否是共享模式,如果是共享模式,那么就直接对其进行唤醒操作,也就是同时激发多个线程并发的运行。
  5. 获取共享状态失败。
    通过使用LockSupport将当前线程从线程调度器上摘下,进入休眠状态。
    对于上述逻辑中,节点之间的通知过程如下图所示:

上图中,绿色表示共享节点,它们之间的通知和唤醒操作是在前驱节点获取状态时就进行的,红色表示独占节点,它的被唤醒必须取决于前驱节点的释放,也就是release操作,可以看出来图中的独占节点如果要运行,必须等待前面的共享节点均释放了状态才可以。而独占节点如果获取了状态,那么后续的独占式获取和共享式获取均被阻塞。

releaseShared

public final boolean releaseShared(int arg)
调用该方法释放共享状态,每次获取共享状态acquireShared都会操作状态,同样在共享锁释放的时候,也需要将状态释放。比如说,一个限定一定数量访问的同步工具,每次获取都是共享的,但是如果超过了一定的数量,将会阻塞后续的获取操作,只有当之前获取的消费者将状态释放才可以使阻塞的获取操作得以运行。

1
2
3
4
5
6
7
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}

上述逻辑主要就是调用同步器的tryReleaseShared方法来释放状态,并同时在doReleaseShared方法中唤醒其后继节点。

一个例子 TwinsLock

在上述对同步器AbstractQueuedSynchronizer进行了实现层面的分析之后,我们通过一个例子来加深对同步器的理解:
设计一个同步工具,该工具在同一时刻,只能有两个线程能够并行访问,超过限制的其他线程进入阻塞状态。
对于这个需求,可以利用同步器完成一个这样的设定,定义一个初始状态,为2,一个线程进行获取那么减1,一个线程释放那么加1,状态正确的范围在[0,1,2]三个之间,当在0时,代表再有新的线程对资源进行获取时只能进入阻塞状态(注意在任何时候进行状态变更的时候均需要以CAS作为原子性保障)。由于资源的数量多于1个,同时可以有两个线程占有资源,因此需要实现tryAcquireShared和tryReleaseShared方法,这里谢谢luoyuyou和同事小明指正,已经修改了实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
public class TwinsLock implements Lock {
private final Sync sync = new Sync(2);

private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = -7889272986162341211L;

Sync(int count) {
if (count <= 0) {
throw new IllegalArgumentException("count must large than zero.");
}
setState(count);
}

public int tryAcquireShared(int reduceCount) {
for (;;) {
int current = getState();
int newCount = current - reduceCount;
if (newCount < 0 || compareAndSetState(current, newCount)) {
return newCount;
}
}
}

public boolean tryReleaseShared(int returnCount) {
for (;;) {
int current = getState();
int newCount = current + returnCount;
if (compareAndSetState(current, newCount)) {
return true;
}
}
}
}

public void lock() {
sync.acquireShared(1);
}

public void lockInterruptibly() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}

public boolean tryLock() {
return sync.tryAcquireShared(1) >= 0;
}

public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(time));
}

public void unlock() {
sync.releaseShared(1);
}

@Override
public Condition newCondition() {
return null;
}
}

上述测试用例的逻辑主要包括:

​1. 打印线程
Worker在两次睡眠之间打印自身线程,如果一个时刻只能有两个线程同时访问,那么打印出来的内容将是成对出现。
​2. 分隔线程
不停的打印换行,能让Worker的输出看起来更加直观。
该测试的结果是在一个时刻,仅有两个线程能够获得到锁,并完成打印,而表象就是打印的内容成对出现。

总结

AQS简核心是通过一个共享变量来同步状态,变量的状态由子类去维护,而AQS框架做的是:

  • 线程阻塞队列的维护
  • 线程阻塞和唤醒

共享变量的修改都是通过Unsafe类提供的CAS操作完成的。
AbstractQueuedSynchronizer类的主要方法是acquire和release,典型的模板方法, 下面这4个方法由子类去实现:

1
2
3
4
protected boolean tryAcquire(int arg)
protected boolean tryRelease(int arg)
protected int tryAcquireShared(int arg)
protected boolean tryReleaseShared(int arg)

acquire方法用来获取锁,返回true说明线程获取成功继续执行,一旦返回false则线程加入到等待队列中,等待被唤醒,release方法用来释放锁。 一般来说实现的时候这两个方法被封装为lock和unlock方法。

refer

http://ifeve.com/introduce-abstractqueuedsynchronizer/
http://www.cnblogs.com/zhanjindong/p/java-concurrent-package-aqs-AbstractQueuedSynchronizer.html
http://www.cnblogs.com/zhanjindong/p/java-concurrent-package-aqs-overview.html

动态规划的适用场景

动态规划常常适用于有重叠子问题和最优子结构性质的问题,动态规划方法所耗时间往往远少于朴素解法。

动态规划的基本思想

动态规划背后的基本思想非常简单。大致上,若要解一个给定问题,我们需要解其不同部分(即子问题),再合并子问题的解以得出原问题的解。
通常许多子问题非常相似,为此动态规划法试图仅仅解决每个子问题一次,从而减少计算量:一旦某个给定子问题的解已经算出,则将其记忆化存储,以便下次需要同一个子问题解之时直接查表。这种做法在重复子问题的数目关于输入的规模呈指数增长时特别有用。

重叠子问题

动态规划在查找有很多重叠子问题的情况的最优解时有效。它将问题重新组合成子问题。为了避免多次解决这些子问题,它们的结果都逐渐被计算并被保存,从简单的问题直到整个问题都被解决。因此,动态规划保存递归时的结果,因而不会在解决同样的问题时花费时间。

最优子结构

动态规划只能应用于有最优子结构的问题。最优子结构的意思是局部最优解能决定全局最优解(对有些问题这个要求并不能完全满足,故有时需要引入一定的近似)。简单地说,问题能够分解成子问题来解决。

动态规划的三要素

  • 最优子结构性质。如果问题的最优解所包含的子问题的解也是最优的,我们就称该问题具有最优子结构性质(即满足最优化原理)。最优子结构性质为动态规划算法解决问题提供了重要线索。
  • 无后效性。即子问题的解一旦确定,就不再改变,不受在这之后、包含它的更大的问题的求解决策影响。
  • 子问题重叠性质。子问题重叠性质是指在用递归算法自顶向下对问题进行求解时,每次产生的子问题并不总是新问题,有些子问题会被重复计算多次。动态规划算法正是利用了这种子问题的重叠性质,对每一个子问题只计算一次,然后将其计算结果保存在一个表格中,当再次需要计算已经计算过的子问题时,只是在表格中简单地查看一下结果,从而获得较高的效率。

动态规划算法的设计步骤:

  1. 刻画最优解的结构特征(寻找最优子结构)
  2. 递归地定义最优解的值(确定状态转移方程)
  3. 计算最优解的值(有两种方法:带备忘录自顶向下法、自底向上法)
  4. 利用计算出的信息构造一个最优解(通常是将具体的最优解输出)

一般的解法

把动态规划的解法分为自顶向下和自底向上两种方式。
自顶向下的方式其实就是使用递归来求解子问题,最终解只需要调用递归式,子问题逐步往下层递归的求解。我们可以使用缓存把每次求解出来的子问题缓存起来,下次调用的时候就不必再递归计算了。
自底向上是另一种求解动态规划问题的方法,它不使用递归式,而是直接使用循环来计算所有可能的结果,往上层逐渐累加子问题的解。

LeetCode题

1. House Robber题目,转化过来的意思是,一个数组nums[],求最大的不存在相邻元素的子数组的和。

用动态规划的递归解法,自顶向下。时间复杂度O(nlogn),空间复杂度O(1)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
//Time Limit Exceeded
public int robRecursiveDP(int[] nums, int length) {
if (length == 0) {
return 0;
}

if (length == 1) {
return nums[0];
}

if (length == 2) {
return Math.max(nums[0], nums[1]);
}

int rob1 = robRecursiveDP(nums, length - 1);
int rob2 = robRecursiveDP(nums, length - 2);

if (rob1 == rob2) {
return rob1 + nums[length - 1];
} else if (rob1 > rob2) {
return Math.max(rob2 + nums[length - 1], rob1);
} else {
System.out.println("data error");
return 0;
}

动态规划的状态转移方程:
dp[i][0] = Math.max(dp[i - 1][0], dp[i - 1][1]);
dp[i][1] = num[i - 1] + dp[i - 1][0];

用动态规划的自底向上解法, 时间复杂度O(n),空间复杂度O(n)

1
2
3
4
5
6
7
8
9
// 300ms
public int robDP(int[] num) {
// dp[i][1] means we rob the current house and dp[i][0] means we don't
int[][] dp = new int[num.length + 1][2];
for (int i = 1; i <= num.length; i++) {
dp[i][0] = Math.max(dp[i - 1][0], dp[i - 1][1]);
dp[i][1] = num[i - 1] + dp[i - 1][0];
}
return Math.max(dp[num.length][0], dp[num.length][1]);

用带备忘录的自底向上动态规划的解法, 时间复杂度O(n),空间复杂度O(1)

1
2
3
4
5
6
7
8
9
10
11
// 250ms
public int rob(int[] num) {
int prevNo = 0;
int prevYes = 0;
for (int n : num) {
int temp = prevNo;
prevNo = Math.max(prevNo, prevYes);
prevYes = n + temp;
}
return Math.max(prevNo, prevYes);
}

2. House Robber2题目,转化过来的意思是,一个数组nums[], 首尾看成相邻,求最大的不存在相邻元素的子数组的和。

Actually, extending from the logic that if house i is not robbed, then you are free to choose whether to rob house i + 1, you can break the circle by assuming a house is not robbed.
For example, 1 -> 2 -> 3 -> 1 becomes 2 -> 3 if 1 is not robbed.
Since every house is either robbed or not robbed and at least half of the houses are not robbed, the solution is simply the larger of two cases with consecutive houses, i.e. house i not robbed, break the circle, solve it, or house i + 1 not robbed. Hence, the following solution. I chose i = n and i + 1 = 0 for simpler coding. But, you can choose whichever two consecutive ones.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public int robCycle(int[] nums) {
if (nums.length == 0) {
return 0;
}
if (nums.length == 1) {
return nums[0];
}

if (nums.length == 2) {
return Math.max(nums[0], nums[1]);
}

return Math.max(rob(Arrays.copyOfRange(nums, 0, nums.length - 1)),
rob(Arrays.copyOfRange(nums, 1, nums.length)));
}

3. Maximum Subarray题目,求最大连续子数组和。

不用动态规划的解法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public int maxSubArray(int[] nums) {
int sumMax = nums[0], sum = 0;
for (int num : nums) {
sum += num;
if (sum < num) {
sum = num;
}
if (sum >= sumMax) {
sumMax = sum;
}
}

return sumMax;
}

动态规划的状态转移方程:dp[i] = Math.max(dp[i - 1] + nums[i - 1], nums[i - 1]);

用动态规划的自底向上解法, 时间复杂度O(n),空间复杂度O(n)

1
2
3
4
5
6
7
8
9
10
public int maxSubArrayDP(int[] nums) {
int sumMax = nums[0];
int[] dp = new int[nums.length + 1];
for (int i = 1; i <= nums.length; i++) {
dp[i] = Math.max(dp[i - 1] + nums[i - 1], nums[i - 1]);
sumMax = Math.max(sumMax, dp[i]);
}

return sumMax;
}

用带备忘录的自底向上动态规划的解法, 时间复杂度O(n),空间复杂度O(1)

1
2
3
4
5
6
7
8
9
public int maxSubArrayDPWithMem(int[] nums) {
int sumMax = nums[0], sumPre = 0;
for (int i = 1; i <= nums.length; i++) {
sumPre = Math.max(sumPre + nums[i - 1], nums[i - 1]);
sumMax = Math.max(sumMax, sumPre);
}

return sumMax;
}

4. Interleaving String题目,Given s1, s2, s3, find whether s3 is formed by the interleaving of s1 and s2.
For example,
Given:
s1 = “aabcc”,
s2 = “dbbca”,
When s3 = “aadbbcbcac”, return true.
When s3 = “aadbbbaccc”, return false.

动态规划的递归调用解法,时间复杂度不符合要求。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public boolean isInterleaveRecursiveDP(String s1, String s2, String s3) {
if (s3.length() != s1.length() + s2.length()) {
return false;
}

if (s1.length() == 0 && s2.length() == 0 && s3.length() == 0) {
return true;
} else if (s3.length() == 0) {
return false;
}

String newS3 = s3.substring(0, s3.length() - 1);
String newS1 = s1.length() > 0 ? s1.substring(0, s1.length() - 1) : "";
String newS2 = s2.length() > 0 ? s2.substring(0, s2.length() - 1) : "";

boolean equalS1 = s1.length() > 0 && (s1.charAt(s1.length() - 1) == s3.charAt(s3.length() -
1));
boolean equalS2 = s2.length() > 0 && s2.charAt(s2.length() - 1) == s3.charAt(s3.length() -
1);

if (equalS1 && !equalS2) {
return isInterleaveRecursiveDP(newS1, s2, newS3);
} else if (!equalS1 && equalS2) {
return isInterleaveRecursiveDP(s1, newS2, newS3);
} else if (equalS1 && equalS2) {
return isInterleaveRecursiveDP(newS1, s2, newS3) || isInterleaveRecursiveDP(s1, newS2, newS3);
} else {
return false;
}
}

动态规划带备忘录自底向上的解法,难点就在于如何将解法1的递归公式转化为动态转移方程,下述代码构造的二维数组很好地诠释了这一点。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public boolean isInterleaveDP(String s1, String s2, String s3) {
if (s3.length() != s1.length() + s2.length())
return false;

boolean[][] table = new boolean[s1.length() + 1][s2.length() + 1];

for (int i = 0; i < s1.length() + 1; i++)
for (int j = 0; j < s2.length() + 1; j++) {
if (i == 0 && j == 0)
table[i][j] = true;
else if (i == 0)
table[i][j] = (table[i][j - 1] && s2.charAt(j - 1) == s3.charAt(i + j - 1));
else if (j == 0)
table[i][j] = (table[i - 1][j] && s1.charAt(i - 1) == s3.charAt(i + j - 1));
else
table[i][j] = (table[i - 1][j] && s1.charAt(i - 1) == s3.charAt(i + j - 1) ||
(table[i][j - 1] && s2.charAt(j - 1) == s3.charAt(i + j - 1)));
}

return table[s1.length()][s2.length()];
}

Java中断的现象

首先,看看Thread类里的几个方法:
public static boolean interrupted 测试当前线程是否已经中断。线程的中断状态 由该方法清除。换句话说,如果连续两次调用该方法,则第二次调用将返回 false(在第一次调用已清除了其中断状态之后,且第二次调用检验完中断状态前,当前线程再次中断的情况除外)。
public boolean isInterrupted() 测试线程是否已经中断。线程的中断状态 不受该方法的影响。
public void interrupt() 中断线程。

Thread.interrupt API:
Interrupts this thread. First the checkAccess method of this thread is invoked, which may cause a SecurityException to be thrown.

If this thread is blocked in an invocation of the wait(), wait(long), or wait(long, int) methods of the Object class, or of the join(), join(long), join(long, int), sleep(long), or sleep(long, int), methods of this class, then its interrupt status will be cleared and it will receive an InterruptedException.

If this thread is blocked in an I/O operation upon an interruptible channel then the channel will be closed, the thread’s interrupt status will be set, and the thread will receive a ClosedByInterruptException.

If this thread is blocked in a Selector then the thread’s interrupt status will be set and it will return immediately from the selection operation, possibly with a non-zero value, just as if the selector’s wakeup method were invoked.

If none of the previous conditions hold then this thread’s interrupt status will be set.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public void interrupt() {
if (this != Thread.currentThread())
checkAccess();

synchronized (blockerLock) {
Interruptible b = blocker;
if (b != null) {
interrupt0(); // Just to set the interrupt flag
b.interrupt(this);
return;
}
}
interrupt0();
}

其实,Java的中断是一种协作机制。也就是说调用线程对象的interrupt方法并不一定就中断了正在运行的线程,它只是要求线程自己在合适的时机中断自己。每个线程都有一个boolean的中断状态(不一定就是对象的属性,事实上,该状态也确实不是Thread的字段),interrupt方法仅仅只是将该状态置为true
代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class TestInterrupt { 
public static void main(String[] args) {
Thread t = new MyThread();
t.start();
t.interrupt();
System.out.println("已调用线程的interrupt方法");
}
static class MyThread extends Thread {
public void run() {
int num = longTimeRunningNonInterruptMethod(2, 0);
System.out.println("长时间任务运行结束,num=" + num);
System.out.println("线程的中断状态:" + Thread.interrupted());
}
private static int longTimeRunningNonInterruptMethod(int count, int initNum) {
for(int i=0; i<count; i++) {
for(int j=0; j<Integer.MAX_VALUE; j++) {
initNum ++;
}
}
return initNum;
}
}
}

一般情况下,会打印如下内容:
已调用线程的interrupt方法
长时间任务运行结束,num=-2
线程的中断状态:true
可见,interrupt方法并不一定能中断线程。但是,如果改成下面的程序,情况会怎样呢?
代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import java.util.concurrent.TimeUnit; 
public class TestInterrupt {
public static void main(String[] args) {
Thread t = new MyThread();
t.start();
t.interrupt();
System.out.println("已调用线程的interrupt方法");
}
static class MyThread extends Thread {
public void run() {
int num = -1;
try {
num = longTimeRunningInterruptMethod(2, 0);
} catch (InterruptedException e) {
System.out.println("线程被中断");
throw new RuntimeException(e);
}
System.out.println("长时间任务运行结束,num=" + num);
System.out.println("线程的中断状态:" + Thread.interrupted());
}
private static int longTimeRunningInterruptMethod(int count, int initNum) throws InterruptedException{
for(int i=0; i<count; i++) {
TimeUnit.SECONDS.sleep(5);
}
return initNum;
}
}
}

经运行可以发现,程序抛出异常停止了,run方法里的后两条打印语句没有执行。那么,区别在哪里?
一般说来,如果一个方法声明抛出InterruptedException,表示该方法是可中断的(没有在方法中处理中断却也声明抛出InterruptedException的除外),也就是说可中断方法会对interrupt调用做出响应(例如sleep响应interrupt的操作包括清除中断状态,抛出InterruptedException),如果interrupt调用是在可中断方法之前调用,可中断方法一定会处理中断,像上面的例子,interrupt方法极可能在run未进入sleep的时候就调用了,但sleep检测到中断,就会处理该中断。如果在可中断方法正在执行中的时候调用interrupt,会怎么样呢?这就要看可中断方法处理中断的时机了,只要可中断方法能检测到中断状态为true,就应该处理中断。让我们为开头的那段代码加上中断处理。
那么自定义的可中断方法该如何处理中断呢?那就是在适合处理中断的地方检测线程中断状态并处理。
代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public class TestInterrupt { 
public static void main(String[] args) throws Exception {
Thread t = new MyThread();
t.start();
// TimeUnit.SECONDS.sleep(1);//如果不能看到处理过程中被中断的情形,可以启用这句再看看效果
t.interrupt();
System.out.println("已调用线程的interrupt方法");
}
static class MyThread extends Thread {
public void run() {
int num;
try {
num = longTimeRunningNonInterruptMethod(2, 0);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("长时间任务运行结束,num=" + num);
System.out.println("线程的中断状态:" + Thread.interrupted());
}
private static int longTimeRunningNonInterruptMethod(int count, int initNum) throws InterruptedException {
if(interrupted()) {
throw new InterruptedException("正式处理前线程已经被请求中断");
}
for(int i=0; i<count; i++) {
for(int j=0; j<Integer.MAX_VALUE; j++) {
initNum ++;
}
//假如这就是一个合适的地方
if(interrupted()) {
//回滚数据,清理操作等
throw new InterruptedException("线程正在处理过程中被中断");
}
}
return initNum;
}
}
}

如上面的代码,方法longTimeRunningMethod此时已是一个可中断的方法了。在进入方法的时候判断是否被请求中断,如果是,就不进行相应的处理了;处理过程中,可能也有合适的地方处理中断,例如上面最内层循环结束后。
这段代码中检测中断用了Thread的静态方法interrupted,它将中断状态置为false,并将之前的状态返回,而isInterrupted只是检测中断,并不改变中断状态。一般来说,处理过了中断请求,应该将其状态置为false。但具体还要看实际情形。

Java中断的本质

在历史上,Java试图提供过抢占式限制中断,但问题多多,例如已被废弃的Thread.stop、Thread.suspend和 Thread.resume等。另一方面,出于Java应用代码的健壮性的考虑,降低了编程门槛,减少不清楚底层机制的程序员无意破坏系统的概率。
如今,Java的线程调度不提供抢占式中断,而采用协作式的中断。其实,协作式的中断,原理很简单,就是轮询某个表示中断的标记,我们在任何普通代码的中都可以实现。 例如下面的代码:
代码如下:

1
2
3
4
5
volatile bool isInterrupted; 
//…
while(!isInterrupted) {
compute();
}

但是,上述的代码问题也很明显。当compute执行时间比较长时,中断无法及时被响应。另一方面,利用轮询检查标志变量的方式,想要中断wait和sleep等线程阻塞操作也束手无策。
如果仍然利用上面的思路,要想让中断及时被响应,必须在虚拟机底层进行线程调度的对标记变量进行检查。是的,JVM中确实是这样做的。下面摘自java.lang.Thread的源代码:
代码如下:

1
2
3
4
5
public static boolean interrupted() { 
return currentThread().isInterrupted(true);
}
//…
private native boolean isInterrupted(boolean ClearInterrupted);

可以发现,isInterrupted被声明为native方法,取决于JVM底层的实现。
实际上,JVM内部确实为每个线程维护了一个中断标记。但应用程序不能直接访问这个中断变量,必须通过下面几个方法进行操作:
代码如下:

1
2
3
4
5
6
7
8
9
public class Thread { 
//设置中断标记
public void interrupt() { ... }
//获取中断标记的值
public boolean isInterrupted() { ... }
//清除中断标记,并返回上一次中断标记的值
public static boolean interrupted() { ... }
...
}

通常情况下,调用线程的interrupt方法,并不能立即引发中断,只是设置了JVM内部的中断标记。因此,通过检查中断标记,应用程序可以做一些特殊操作,也可以完全忽略中断。

你可能想,如果JVM只提供了这种简陋的中断机制,那和应用程序自己定义中断变量并轮询的方法相比,基本也没有什么优势。

JVM内部中断变量的主要优势,就是对于某些情况,提供了模拟自动“中断陷入”的机制。
在执行涉及线程调度的阻塞调用时(例如wait、sleep和join),如果发生中断,被阻塞线程会“尽可能快的”抛出InterruptedException。因此,我们就可以用下面的代码框架来处理线程阻塞中断:
代码如下:

1
2
3
4
5
6
try { 
//wait、sleep或join
}
catch(InterruptedException e) {
//某些中断处理工作
}

所谓“尽可能快”,我猜测JVM就是在线程调度调度的间隙检查中断变量,速度取决于JVM的实现和硬件的性能。

一些不会抛出 InterruptedException 的线程阻塞操作

然而,对于某些线程阻塞操作,JVM并不会自动抛出InterruptedException异常。例如,某些I/O操作和内部锁操作。对于这类操作,可以用其他方式模拟中断:
1)java.io中的异步socket I/O
读写socket的时候,InputStream和OutputStream的read和write方法会阻塞等待,但不会响应java中断。不过,调用Socket的close方法后,被阻塞线程会抛出SocketException异常。
2)利用Selector实现的异步I/O
如果线程被阻塞于Selector.select(在java.nio.channels中),调用wakeup方法会引起ClosedSelectorException异常。
3)锁获取
如果线程在等待获取一个内部锁,我们将无法中断它。但是,利用Lock类的lockInterruptibly方法,我们可以在等待锁的同时,提供中断能力。

四、两条编程原则
另外,在任务与线程分离的框架中,任务通常并不知道自身会被哪个线程调用,也就不知道调用线程处理中断的策略。所以,在任务设置了线程中断标记后,并不能确保任务会被取消。因此,有以下两条编程原则:
1)除非你知道线程的中断策略,否则不应该中断它。
这条原则告诉我们,不应该直接调用Executer之类框架中线程的interrupt方法,应该利用诸如Future.cancel的方法来取消任务。

2)任务代码不该猜测中断对执行线程的含义。
这条原则告诉我们,一般代码遇在到InterruptedException异常时,不应该将其捕获后“吞掉”,而应该继续向上层代码抛出。
总之,Java中的非抢占式中断机制,要求我们必须改变传统的抢占式中断思路,在理解其本质的基础上,采用相应的原则和模式来编程。

总结

要使任务和线程能安全、快速、可靠地停止下来,并不是一件容易的事。Java没有提供任何机制来安全地终止线程。但它提供了中断(Interruption),这是一种协作机制,能够使一个线程终止另一个线程的的工作。—— 『Java并发编程实战』 第7章 取消与关闭 p111

中断是一种协作机制。一个线程不能强制其它线程停止正在执行的操作而去执行其它的操作。当线程A中断B时,A仅仅是要求B在执行到某个可以暂停的地方停止正在执行的操作——前提是如果线程B愿意停下来。—— 『Java并发编程实战』 第5章 基础构建模块 p77

总之,中断只是一种协作机制,需要被中断的线程自己处理中断。停止一个线程最佳实践是 中断 + 条件变量。

Reference

http://www.infoq.com/cn/articles/java-interrupt-mechanism
《Java Concurrency in Practice》
《Concurrent Programming in Java Design principles and patterns》
http://docs.oracle.com/javase/1.4.2/docs/guide/misc/threadPrimitiveDeprecation.html
http://ibruce.info/2013/12/19/how-to-stop-a-java-thread/

悲观锁

悲观锁假定其他用户企图访问或者改变你正在访问、更改的对象的概率是很高的,因此在悲观锁的环境中,在你开始改变此对象之前就将该对象锁住,并且直到你提交了所作的更改之后才释放锁。悲观的缺陷是不论是页锁还是行锁,加锁的时间可能会很长,这样可能会长时间的限制其他用户的访问,也就是说悲观锁的并发访问性不好。


乐观锁

乐观锁则认为其他用户企图改变你正在更改的对象的概率是很小的,因此乐观锁直到你准备提交所作的更改时才将对象锁住,当你读取以及改变该对象时并不加锁。可见乐观锁加锁的时间要比悲观锁短,乐观锁可以用较大的锁粒度获得较好的并发访问性能。但是如果第二个用户恰好在第一个用户提交更改之前读取了该对象,那么当他完成了自己的更改进行提交时,数据库就会发现该对象已经变化了,这样,第二个用户不得不重新读取该对象并作出更改。这说明在乐观锁环境中,会增加并发用户读取对象的次数。


例子

以版本控制系统为例,来说说两种最基本的并发性问题。


【丢失更新】

小张想修改源代码里面的a方法,正在她修改的同时,小李打开了这个文件,修改了b方法并且保存了文件,等小张修改完成后,保存文件,小李所做的修改就被覆盖了。


【不一致的读】

小张想要知道包里面一共有多少个类,包分了a,b两个子包。小张打开a包,看到了7个类。突然小张接到老婆打来的电话,在小张接电话的时候,小李往a包中加了2个类,b包中加了3个类(原先b包中是5个类)。

小张接完电话后再打开b包,看到了8个类,很自然得出结论:包中一共有15个类。

很遗憾,15个永远不是正确的答案。在小李修改前,正确答案是12(7+5),修改后是17(9+8)。这两个答案都是正确的,虽然有一个不是当前的。但15不对,因为小张读取的数据是不一致的。

小结:不一致读指你要读取两种数据,这两种数据都是正确的,但是在同一时刻两者并非都正确。


隔离和不可变

在企业应用中,解决并发冲突的两种常用手段是隔离和不可变。

只有当多个活动(进程或者线程)同时访问同一数据时才会引发并发问题。一种很自然的思路就是同一时刻只允许一个活动访问数据。如果小张打开了文件,就不允许其他人打开,或者其他人只能通过只读的方式打开副本,就可以解决这个问题。

隔离能够有效减少发生错误的可能。我们经常见到程序员陷入到并发问题的泥潭里,每一段代码写完都要考虑并发问题,这样太累了。我们可以利用隔离技术创建出隔离区域,当程序进入隔离区域时不用关心并发问题。好的并发性设计就是创造这样的一些隔离区域,并保证代码尽可能的运行在其中。

另一种思路:只有当你需要修改共享的数据时才可能引发并发性问题,所以我们可以将要共享的数据制作为“不可变”的,以避免并发性问题。当然我们不可能将所有的数据都做成不可变的,但如果一些数据是不可变的,对它们进行并发操作时我们就可以放松自己的神经了。


乐观并发控制、悲观并发控制

如果数据是可变的,并且无法隔离呢?这种情况下最常用的两种控制就是乐观并发控制和悲观并发控制。

假设小张和小李想要同时修改同一个文件。如果使用乐观锁,俩人都能打开文件进行修改,如果小张先提交了内容,没有问题,他所做的改变会保存到服务器上。但小李提交时就会遇到麻烦,版本控制服务器会检测出两种修改的冲突,小李的提交会被具体,并由小李决定该如何处理这种情况(对于绝大部分版本控制软件来说,会读取并标识出小张做的改变,然后由小李决定是否合并)。

如果使用的是悲观锁,小张先检出(check out)文件,那么小李就无法再次检出同一文件,直到小张提交了他的改变。

建议你将乐观锁想成一种检测冲突的手段,而悲观锁是一种避免冲突的手段(严格来说,乐观锁其实不能称之为“锁”,但是这个名字已经流传开了,那就继续使用吧)。一些老的版本控制系统,比如VSS 6.0使用的是悲观锁的机制。而现代的版本控制系统一般两种都支持,默认使用乐观锁。

乐观锁可以提高并发访问的效率,但是如果出现了冲突只能向上抛出,然后重来一遍;悲观锁可以避免冲突的发生,但是会降低效率。

选择使用那一种锁取决于访问频率和一旦产生冲突的严重性。如果系统被并发访问的概率很低,或者冲突发生后的后果不太严重(所谓后果应该指被检测到冲突的提交会失败,必须重来一次),可以使用乐观锁,否则使用悲观锁。


实现

我们经常会在访问数据库的时候用到锁,怎么实现乐观锁和悲观锁呢?以Hibernate为例,可以通过为记录添加版本或时间戳字段来实现乐观锁。可以用session.Lock()锁定对象来实现悲观锁(本质上就是执行了SELECT * FROM t FOR UPDATE语句)。


转至:

http://www.cnblogs.com/chenlulouis/archive/2010/08/17/1801358.html