Lock接口
1 2 3 4 5 6 7
| Lock lock = new ReentrantLock(true); lock.lock(); try {
} finally { lock.unlock(); }
|
不把lock.lock()写到try{}里面是因为如果这一步出错了,即锁没获取到,finally会执行锁的释放。
Lock比synchronized的优势在于:
- 尝试非阻塞获取锁
- 能被中断的获取锁
- 超时获取锁
对比一下就知道,synchronized就是阻塞获取锁,不能响应中断获取锁,没有超时限制获取锁。
队列同步器
这是个可以自定义锁的框架,大神把实现封装好,只需要按照模板方式实现对应的方法,就能很方便的实现一个锁。
同步器是面对锁的实现者,锁面对的是使用者。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81
| package chapter05;
import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.AbstractQueuedSynchronizer; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock;
public class Mutex implements Lock {
public void lock() { sync.acquire(1); }
public void lockInterruptibly() throws InterruptedException { sync.acquireInterruptibly(1); }
public boolean tryLock() { return sync.tryAcquire(1); }
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { return sync.tryAcquireNanos(1, unit.toNanos(time)); }
public void unlock() { sync.release(1); }
public Condition newCondition() { return sync.newCondition(); }
public boolean isLocked() { return sync.isHeldExclusively(); }
public boolean hasQueuedThreads() { return sync.hasQueuedThreads(); }
private static final long serialVersionUID = 1L; private final Sync sync = new Sync();
static class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 1L;
@Override protected boolean tryAcquire(int arg) { if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; }
@Override protected boolean tryRelease(int arg) { if (getState() == 0) throw new IllegalMonitorStateException();
setExclusiveOwnerThread(null); setState(0); return true; }
@Override protected boolean isHeldExclusively() { return getState() == 1; }
Condition newCondition() { return new ConditionObject(); } } }
|
这是个简单的定制独占锁,将AbstractQueuedSynchronizer设为static内部类,然后重写一些方法。在Lock方法里把相对应的方法代理给sync,就这么简单。
下面是共享锁Demo
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78
| package chapter05;
import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.AbstractQueuedSynchronizer; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock;
public class TwinsLock implements Lock {
public void lock() { sync.acquireShared(1); }
public void unlock() { sync.releaseShared(1); }
public void lockInterruptibly() throws InterruptedException { sync.acquireSharedInterruptibly(1); }
public boolean tryLock() { return sync.tryAcquireShared(1) >= 0; }
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1, unit.toNanos(time)); }
public Condition newCondition() { return sync.newCondition(); }
private final Sync sync = new Sync(2);
static class Sync extends AbstractQueuedSynchronizer { public Sync(int count) { setState(count); }
@Override protected int tryAcquireShared(int reduceCount) { for (; ; ) { int current = getState(); int newCount = current - reduceCount; if (newCount < 0 || compareAndSetState(current, newCount)) { return newCount; } } }
@Override protected boolean tryReleaseShared(int returnCount) { for (; ; ) { int current = getState(); int newCount = current + returnCount;
if (compareAndSetState(current, newCount)) { return true; } } }
@Override protected boolean isHeldExclusively() { return false; }
Condition newCondition() { return new ConditionObject(); } } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50
| package chapter05;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit;
public class MainTest { final TwinsLock lock = new TwinsLock(); final CountDownLatch gun = new CountDownLatch(1);
public static void main(String[] args) throws InterruptedException { new MainTest().run(); }
public void run() throws InterruptedException { for (int i = 0; i < 10; i++) { Thread t = new Thread(new Runner(), "" + i); t.setDaemon(true); t.start(); } gun.countDown(); for (int i = 0; i < 10; i++) { TimeUnit.MILLISECONDS.sleep(500); System.out.println("------------");
} }
class Runner implements Runnable { public void run() { try { gun.await(); } catch (InterruptedException e) { e.printStackTrace(); } lock.lock(); try { TimeUnit.MILLISECONDS.sleep(500); System.out.println(Thread.currentThread().getName()); TimeUnit.MILLISECONDS.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } } }
|
重入锁
非公平锁比公平锁更少的上下文切换,所以有更高的吞吐量,但是容易有“饥饿现象”。
读写锁
ReentrantLock是排他锁,而读写锁可以在同一时间允许多个读线程访问。
一般情况下,读写锁的性能都会比排它锁好,因为大多数场景读是多于写的。在读多于写的情况下,读写锁能够提供比排他锁更好的并发性和吞吐量。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
| package chapter05;
import java.util.HashMap; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantReadWriteLock;
public class DIYCache { private HashMap<String, String> cache = new HashMap<String, String>(); private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock(); private final Lock rLock = rwLock.readLock(); private final Lock wLock = rwLock.writeLock();
public final String get(String key) { rLock.lock(); try { return cache.get(key); } finally { rLock.unlock(); } }
public final void put(String key, String value) { wLock.lock(); try { cache.put(key, value); } finally { wLock.unlock(); } }
public final void clear() { wLock.lock(); try { cache.clear(); } finally { wLock.unlock(); } } }
|
读写锁的实现分析
这个放到以后源代码分析中。
LockSupport工具
Condition接口
有界队列Demo
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95
| package chapter05;
import java.util.PriorityQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock;
public class BoundedQueue<T> { private PriorityQueue<T> que; private int capacity = 0; private final ReentrantLock lock = new ReentrantLock(); private final Condition empty = lock.newCondition(); private final Condition full = lock.newCondition();
public static void main(String[] args) { final BoundedQueue<Integer> que = new BoundedQueue<Integer>(5); Thread t1 = new Thread(new Runnable() { public void run() { for (int i = 0; i < 10; i++) { que.add(i); System.out.println("add=" + i); try { } catch (Exception e) { e.printStackTrace(); } } } }); Thread t2 = new Thread(new Runnable() { public void run() { while (true) { try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } int t = que.peek(); System.out.println("peek=" + t); } } }); t2.setDaemon(true);
t2.start(); t1.start(); }
BoundedQueue(int capacity) { this.capacity = capacity; que = new PriorityQueue<T>(capacity); }
public void add(T t) { lock.lock(); try { if (capacity == que.size()) { System.out.println("----- full -----"); full.await(); } que.add(t); empty.signal(); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } }
public T peek() { lock.lock(); try { if (0 == que.size()) { System.out.println("---- empty -----"); empty.await(); } T t = que.peek(); que.remove(t); full.signal(); return t; } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } return null; }
public int getCapacity() { return capacity; } }
|
Condition的实现分析