本文由Scarb发表于金甲虫的博客,转载请注明出处

Java Concurrency Utilities

[TOC]

多线程数字递增

sync

Atomic

CAS操作

LongAdder

分段锁(CAS操作)

适用于更多线程

基于CAS操作的新类型锁 AQS

ReentrantLock可重入锁 互斥锁/排他锁

  • Lock接口的实现
  • 可重入
  • 公平锁new ReentrantLock(true)

synchronized的区别

  • A synchronized block makes no guarantees about the sequence in which threads waiting to entering it are granted access.
  • You cannot pass any parameters to the entry of a synchronized block. Thus, having a timeout trying to get access to a synchronized block is not possible.
  • The synchronized block must be fully contained within a single method. A Lock can have it’s calls to lock() and unlock() in separate methods.

主要方法

  • lock():申请锁,锁定对象为Lock实例
  • lockInterruptibly():申请锁,可以被interrupt()打断
  • tryLock()
  • tryLock(long timeout, TimeUnit timeUnit)
  • unlock()

The lock() method locks the Lock instance if possible. If the Lock instance is already locked, the thread calling lock() is blocked until the Lock is unlocked.

The lockInterruptibly() method locks the Lock unless the thread calling the method has been interrupted. Additionally, if a thread is blocked waiting to lock the Lock via this method, and it is interrupted, it exits this method calls.

The tryLock() method attempts to lock the Lock instance immediately. It returns true if the locking succeeds, false if Lock is already locked. This method never blocks.

The tryLock(long timeout, TimeUnit timeUnit) works like the tryLock() method, except it waits up the given timeout before giving up trying to lock the Lock.

The unlock() method unlocks the Lock instance. Typically, a Lock implementation will only allow the thread that has locked the Lock to call this method. Other threads calling this method may result in an unchecked exception (RuntimeException).

ReadWriteLock 共享锁

高级接口,允许多个线程读同一个资源,但是每次只允许一个线程写入该资源。

规则如下

  • ReadLock:允许其他读线程申请锁,不允许写线程申请

    • If no threads have locked the ReadWriteLock for writing, and no thread have requested a write lock (but not yet obtained it).
      Thus, multiple threads can lock the lock for reading.
  • WriteLock
    • If no threads are reading or writing.
      Thus, only one thread at a time can lock the lock for writing.

实现

  • ReentrantReadWriteLock
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
static Lock lock = new ReentrantLock();
private static int value;

static ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
static Lock readLock = readWriteLock.readLock();
static Lock writeLock = readWriteLock.writeLock();

public static void read(Lock lock) {
try {
lock.lock();
Thread.sleep(1000);
System.out.println("read over!");
//模拟读取操作
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}

public static void write(Lock lock, int v) {
try {
lock.lock();
Thread.sleep(1000);
value = v;
System.out.println("write over!");
//模拟写操作
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}

public static void main(String[] args) {
//Runnable readR = ()-> read(lock);
Runnable readR = ()-> read(readLock);

//Runnable writeR = ()->write(lock, new Random().nextInt());
Runnable writeR = ()->write(writeLock, new Random().nextInt());

for(int i = 0; i < 18; i++) new Thread(readR).start();
for(int i = 0; i < 2; i++) new Thread(writeR).start();
}

CountDownLatch^CountDownLatch

A java.util.concurrent.CountDownLatch is a concurrency construct that allows one or more threads to wait for a given set of operations to complete.

A CountDownLatch is initialized with a given count. This count is decremented by calls to the countDown() method. Threads waiting for this count to reach zero can call one of the await() methods. Calling await() blocks the thread until the count reaches zero.

Below is a simple example. After the Decrementer has called countDown() 3 times on the CountDownLatch, the waiting Waiter is released from the await() call.

1
2
3
4
5
6
7
8
9
CountDownLatch latch = new CountDownLatch(3);

Waiter waiter = new Waiter(latch);
Decrementer decrementer = new Decrementer(latch);

new Thread(waiter) .start();
new Thread(decrementer).start();

Thread.sleep(4000);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class Waiter implements Runnable{

CountDownLatch latch = null;

public Waiter(CountDownLatch latch) {
this.latch = latch;
}

public void run() {
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}

System.out.println("Waiter Released");
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class Decrementer implements Runnable {

CountDownLatch latch = null;

public Decrementer(CountDownLatch latch) {
this.latch = latch;
}

public void run() {

try {
Thread.sleep(1000);
this.latch.countDown();

Thread.sleep(1000);
this.latch.countDown();

Thread.sleep(1000);
this.latch.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

CyclicBarrier^CyclicBarrier

It is a barrier that all threads must wait at, until all threads reach it, before any of the threads can continue.

Two threads waiting for each other at CyclicBarriers.

The threads wait for each other by calling the await() method on the CyclicBarrier. Once N threads are waiting at the CyclicBarrier, all threads are released and can continue running.

Creating a CyclicBarrier

When you create a CyclicBarrier you specify how many threads are to wait at it, before releasing them. Here is how you create a CyclicBarrier:

1
CyclicBarrier barrier = new CyclicBarrier(2);

Waiting at a CyclicBarrier

Here is how a thread waits at a CyclicBarrier:

1
barrier.await();

You can also specify a timeout for the waiting thread. When the timeout has passed the thread is also released, even if not all N threads are waiting at the CyclicBarrier. Here is how you specify a timeout:

1
barrier.await(10, TimeUnit.SECONDS);

The waiting threads waits at the CyclicBarrier until either:

  • The last thread arrives (calls await() )
  • The thread is interrupted by another thread (another thread calls its interrupt() method)
  • Another waiting thread is interrupted
  • Another waiting thread times out while waiting at the CyclicBarrier
  • The CyclicBarrier.reset() method is called by some external thread.

CyclicBarrier Action

The CyclicBarrier supports a barrier action, which is a Runnable that is executed once the last thread arrives. You pass the Runnable barrier action to the CyclicBarrier in its constructor, like this:

1
2
Runnable      barrierAction = ... ;
CyclicBarrier barrier = new CyclicBarrier(2, barrierAction);

CyclicBarrier Example

Here is a code example that shows you how to use a CyclicBarrier:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
Runnable barrier1Action = new Runnable() {
public void run() {
System.out.println("BarrierAction 1 executed ");
}
};
Runnable barrier2Action = new Runnable() {
public void run() {
System.out.println("BarrierAction 2 executed ");
}
};

CyclicBarrier barrier1 = new CyclicBarrier(2, barrier1Action);
CyclicBarrier barrier2 = new CyclicBarrier(2, barrier2Action);

CyclicBarrierRunnable barrierRunnable1 =
new CyclicBarrierRunnable(barrier1, barrier2);

CyclicBarrierRunnable barrierRunnable2 =
new CyclicBarrierRunnable(barrier1, barrier2);

new Thread(barrierRunnable1).start();
new Thread(barrierRunnable2).start();

Here is the CyclicBarrierRunnable class:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public class CyclicBarrierRunnable implements Runnable{

CyclicBarrier barrier1 = null;
CyclicBarrier barrier2 = null;

public CyclicBarrierRunnable(
CyclicBarrier barrier1,
CyclicBarrier barrier2) {

this.barrier1 = barrier1;
this.barrier2 = barrier2;
}

public void run() {
try {
Thread.sleep(1000);
System.out.println(Thread.currentThread().getName() +
" waiting at barrier 1");
this.barrier1.await();

Thread.sleep(1000);
System.out.println(Thread.currentThread().getName() +
" waiting at barrier 2");
this.barrier2.await();

System.out.println(Thread.currentThread().getName() +
" done!");

} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
}

Here is the console output for an execution of the above code. Note that the sequence in which the threads gets to write to the console may vary from execution to execution. Sometimes Thread-0 prints first, sometimes Thread-1 prints first etc.

1
2
3
4
5
6
7
8
Thread-0 waiting at barrier 1
Thread-1 waiting at barrier 1
BarrierAction 1 executed
Thread-1 waiting at barrier 2
Thread-0 waiting at barrier 2
BarrierAction 2 executed
Thread-0 done!
Thread-1 done!

Phaser

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
public class T09_TestPhaser2 {
static Random r = new Random();
static MarriagePhaser phaser = new MarriagePhaser();

static void milliSleep(int milli) {
try {
TimeUnit.MILLISECONDS.sleep(milli);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

public static void main(String[] args) {
phaser.bulkRegister(7);

for (int i = 0; i < 5; i++) {
new Thread(new Person("p" + i)).start();
}
new Thread(new Person("新郎")).start();
new Thread(new Person("新娘")).start();
}

static class MarriagePhaser extends Phaser {
@Override
protected boolean onAdvance(int phase, int registeredParties) {
switch (phase) {
case 0:
System.out.println("所有人到齐了!" + registeredParties);
System.out.println();
return false;
case 1:
System.out.println("所有人吃完了!" + registeredParties);
System.out.println();
return false;
case 2:
System.out.println("所有人离开了!" + registeredParties);
System.out.println();
return false;
case 3:
System.out.println("婚礼结束!新郎新娘抱抱!" + registeredParties);
return true;
default:
return true;
}
}
}

static class Person implements Runnable {
String name;

public Person(String name) {
this.name = name;
}

public void arrive() {
milliSleep(r.nextInt(1000));
System.out.printf("%s 到达现场!\n", name);
phaser.arriveAndAwaitAdvance();
}

public void eat() {
milliSleep(r.nextInt(1000));
System.out.printf("%s 吃完!\n", name);
phaser.arriveAndAwaitAdvance();
}

public void leave() {
milliSleep(r.nextInt(1000));
System.out.printf("%s 离开!\n", name);
phaser.arriveAndAwaitAdvance();
}

private void hug() {
if (name.equals("新郎") || name.equals("新娘")) {
milliSleep(r.nextInt(1000));
System.out.printf("%s 洞房!\n", name);
phaser.arriveAndAwaitAdvance();
} else {
phaser.arriveAndDeregister();
//phaser.register()
}
}

@Override
public void run() {
arrive();
eat();
leave();
hug();
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
p0 到达现场!
p3 到达现场!
新娘 到达现场!
p1 到达现场!
p4 到达现场!
新郎 到达现场!
p2 到达现场!
所有人到齐了!7

p2 吃完!
p3 吃完!
新娘 吃完!
p1 吃完!
p0 吃完!
新郎 吃完!
p4 吃完!
所有人吃完了!7

p2 离开!
p0 离开!
新娘 离开!
p4 离开!
新郎 离开!
p1 离开!
p3 离开!
所有人离开了!7

新郎 洞房!
新娘 洞房!
婚礼结束!新郎新娘抱抱!2

Exchanger

The java.util.concurrent.Exchanger class represents a kind of rendezvous point where two threads can exchange objects. Here is an illustration of this mechanism:

Two threads exchanging objects via an Exchanger.

Exchanging objects is done via one of the two exchange() methods. Here is an example:

1
2
3
4
5
6
7
8
9
10
Exchanger exchanger = new Exchanger();

ExchangerRunnable exchangerRunnable1 =
new ExchangerRunnable(exchanger, "A");

ExchangerRunnable exchangerRunnable2 =
new ExchangerRunnable(exchanger, "B");

new Thread(exchangerRunnable1).start();
new Thread(exchangerRunnable2).start();

Here is the ExchangerRunnable code:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class ExchangerRunnable implements Runnable{

Exchanger exchanger = null;
Object object = null;

public ExchangerRunnable(Exchanger exchanger, Object object) {
this.exchanger = exchanger;
this.object = object;
}

public void run() {
try {
Object previous = this.object;

this.object = this.exchanger.exchange(this.object);

System.out.println(
Thread.currentThread().getName() +
" exchanged " + previous + " for " + this.object
);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

This example prints out this:

1
2
Thread-0 exchanged A for B
Thread-1 exchanged B for A

Semaphore

1
Semaphore semaphore = new Semaphore(permits, fair);
  • acquire()
  • release()

The counting semaphore is initialized with a given number of “permits”. For each call to acquire() a permit is taken by the calling thread. For each call to release() a permit is returned to the semaphore. Thus, at most N threads can pass the acquire() method without any release() calls, where N is the number of permits the semaphore was initialized with. The permits are just a simple counter. Nothing fancy here.

Semaphore作用

  1. To guard a critical section against entry by more than N threads at a time. (限流,最多允许N个线程同时运行一段代码)
  2. To send signals between two threads. (线程间信号传递)

Sending Signals Between Threads

If you use a semaphore to send signals between threads, then you would typically have one thread call the acquire() method, and the other thread to call the release() method.

If no permits are available, the acquire() call will block until a permit is released by another thread. Similarly, a release() calls is blocked if no more permits can be released into this semaphore.

Thus it is possible to coordinate threads. For instance, if acquire was called after Thread 1 had inserted an object in a shared list, and Thread 2 had called release() just before taking an object from that list, you had essentially created a blocking queue. The number of permits available in the semaphore would correspond to the maximum number of elements the blocking queue could hold.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class MyQueue<E> {
Semaphore s = new Semaphore(0, true);
private Queue<E> queue = new ConcurrentLinkedQueue<>();

public boolean isEmpty() {
return this.queue.isEmpty();
}

public void enqueue(E e) {
queue.add(e);
s.release();
}

public E dequeue() {
E e;
try {
s.acquire();
} catch (InterruptedException e1) {
e1.printStackTrace();
}
e = queue.remove();
return e;
}
}

Fairness

No guarantees are made about fairness of the threads acquiring permits from the Semaphore. That is, there is no guarantee that the first thread to call acquire() is also the first thread to obtain a permit. If the first thread is blocked waiting for a permit, then a second thread checking for a permit just as a permit is released, may actually obtain the permit ahead of thread 1.

If you want to enforce fairness, the Semaphore class has a constructor that takes a boolean telling if the semaphore should enforce fairness. Enforcing fairness comes at a performance / concurrency penalty, so don’t enable it unless you need it.

Here is how to create a Semaphore in fair mode:

1
Semaphore semaphore = new Semaphore(1, true);

LockSupport[^LockSupport]

介绍

LockSupport是用来创建锁和其他同步类的基本线程阻塞原语。
LockSupport中的park()unpark()的作用分别是阻塞线程和解除阻塞线程,而且park()unpark()不会遇到Thread.suspendThread.resume所可能引发的死锁问题。
因为park()unpark()有许可的存在;调用park()的线程和另一个试图将其unpark()的线程之间的竞争将保持活性。

方法列表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 返回提供给最近一次尚未解除阻塞的 park 方法调用的 blocker 对象,如果该调用不受阻塞,则返回 null。
static Object getBlocker(Thread t)
// 为了线程调度,禁用当前线程,除非许可可用。
static void park()
// 为了线程调度,在许可可用之前禁用当前线程。
static void park(Object blocker)
// 为了线程调度禁用当前线程,最多等待指定的等待时间,除非许可可用。
static void parkNanos(long nanos)
// 为了线程调度,在许可可用前禁用当前线程,并最多等待指定的等待时间。
static void parkNanos(Object blocker, long nanos)
// 为了线程调度,在指定的时限前禁用当前线程,除非许可可用。
static void parkUntil(long deadline)
// 为了线程调度,在指定的时限前禁用当前线程,除非许可可用。
static void parkUntil(Object blocker, long deadline)
// 如果给定线程的许可尚不可用,则使其可用。
static void unpark(Thread thread)

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// use synchronized
public class WaitTest1 {
public static void main(String[] args) {
ThreadA ta = new ThreadA("ta");
synchronized(ta) { // 通过synchronized(ta)获取“对象ta的同步锁”
try {
System.out.println(Thread.currentThread().getName()+" start ta");
ta.start();
System.out.println(Thread.currentThread().getName()+" block");
// 主线程等待
ta.wait();
System.out.println(Thread.currentThread().getName()+" continue");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

static class ThreadA extends Thread{
public ThreadA(String name) {
super(name);
}
public void run() {
synchronized (this) { // 通过synchronized(this)获取“当前对象的同步锁”
System.out.println(Thread.currentThread().getName()+" wakup others");
notify(); // 唤醒“当前对象上的等待线程”
}
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// use LockSupport
import java.util.concurrent.locks.LockSupport;
public class LockSupportTest1 {
private static Thread mainThread;
public static void main(String[] args) {

ThreadA ta = new ThreadA("ta");
// 获取主线程
mainThread = Thread.currentThread();

System.out.println(Thread.currentThread().getName()+" start ta");
ta.start();

System.out.println(Thread.currentThread().getName()+" block");
// 主线程阻塞
LockSupport.park(mainThread);

System.out.println(Thread.currentThread().getName()+" continue");
}

static class ThreadA extends Thread{
public ThreadA(String name) {
super(name);
}
public void run() {
System.out.println(Thread.currentThread().getName()+" wakup others");
// 唤醒“主线程”
LockSupport.unpark(mainThread);
}
}
}

[^LockSupport]: Java多线程系列–“JUC锁”07之 LockSupport