Java Concurrent(一)::互斥同步

Raven's Blog

Home Page View on GitHub Send Email

Java Concurrent(一)::互斥同步

最近打算好好看看java.util.concurrent里面的源码,正好总结一下常用的并发工具类,一级它们的用法和原理,记录在此。

java.util.concurrent这个Java并发工具包是在Java1.5时被添加进去的。使用这个工具包,当我们在编写多线程的java代码时,能更方便/语义更明确的解决线程安全问题。

java.util.concurrent.locks.Lock

在Java1.5之前,多线程同步控制是靠synchronized关键字来实现同步方法/同步代码块;在1.5之后,我们可以使用java.util.concurrent.locks.Lock接口以及多个实现类来进行线程同步了。

注意: 此系列文章内源码为java version 1.8.0_60


//Lock接口部分代码
public interface Lock{
    /**
     * 加锁
     */
    void lock();
    /**
     * 释放锁
     * 对应于lock()、tryLock()、tryLock(xx)、lockInterruptibly()等操作
     */
    void unlock();

    /**
     * 类似lock
     * 但是会优先响应中断
     * 即被其他线程中断的线程会获得锁,处理中断异常
     */
    void lockInterruptibly() throws InterruptedException;

    /**
     * 非阻塞
     * 仅在调用时锁为空闲状态才获取该锁
     * 如果锁可用,则获取锁,并立即返回值  true
     * 如果锁不可用,则此方法将立即返回值  false
     */
    boolean tryLock();

    /**
     * 如果锁在给定的等待时间内空闲,并且当前线程未被中断,则获取锁
     * 如果锁可用,则此方法将立即返回值  true
     * 如果锁不可用,出于线程调度目的,将禁用当前线程,
     * 并且在发生以下三种情况之一前,该线程将一直处于休眠状态:
     *   1. 锁由当前线程获得;或者
     *   2. 其他某个线程中断当前线程,并且支持对锁获取的中断;或者
     *   3. 已超过指定的等待时间
     * 如果获得了锁,则返回值  true 
     * 如果当前线程:
     *   1. 在进入此方法时已经设置了该线程的中断状态;或者
     *   2. 在获取锁时被中断,并且支持对锁获取的中断
     * 则将抛出  InterruptedException ,并会清除当前线程的已中断状态
     * 如果超过了指定的等待时间,则将返回值  false 。如果 time 小于等于 0,该方法将完全不等待
     */
    boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
}

Lock的常用实现类有ReentrantLock(重入锁)和ReentrantReadWriteLock(重入读写锁),重入锁就像普通的synchronized,每次只能有一个线程获得锁,其他线程阻塞等待锁释放,并且同一把锁,线程可以获取多次;读写锁则可以多个线程获取读锁,只有一个线程获取写锁;


package com.raven.lock;

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

//重入锁的用法
public class LockTest {
    static final Lock lock = new ReentrantLock();
    static volatile int i = 0; 
    public static void main(String[] args) {
        for(int j=0;j<50;j++){
            new Thread(new Runnable() { 
                @Override
                public void run() {
                    lock.lock();        //加锁
                    System.out.println("value:"+(i++));
                    lock.unlock();      //释放锁
                }
            }).start();
        }
    }
}

正确的使用锁了,可以保证上面多个线程按照正确的顺序输出变量i的值

具体锁的实现原理,这里暂时不分析,准备以后单独写一篇文章详细说明

锁的公平性

在创建锁时,我们可以指定锁的公平性


//默认非公平锁
Lock lock = new ReentrantLock();

//显示指定公平锁
Lock lock = new ReentrantLock(true);
//显示指定非公平
Lock lock = new ReentrantLock(false);

如果在绝对时间上,先对锁进行获取的请求一定被先满足,那么这个锁是公平的,反之,是不公平的

java.util.concurrent.locks.Condition

锁或者synchronized只是保证避免多个线程修改共享状态时的状态不确定问题,如果需要控制进程的执行顺序,那么就需要用到wait/notify(notifyAll)或者java.util.concurrent.locks.Condition了

不确定的ping-pong顺序


public class Main {
    private final static Object lock = new Object();
    static class MyTask implements Runnable{
        private String msg;
        public MyTask(String msg){
            this.msg = msg;
        }
        private void say(){
            synchronized(lock){
                //System.out.println并不保证线程安全,需要加锁,或者同步代码块
                //但是也不能保证ping->pong->ping->pong...这样正确的顺序
                System.out.println(this.msg);
            }
        }
        @Override
        public void run() {
            for(int i=0;i<3;i++){
                try {
                    Thread.sleep(1);
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
                this.say();
            }
        }
    }
    public static void main(String [] args){
        final Object lock = new Object();
        Thread t1=new Thread(new MyTask("ping"));
        Thread t2=new Thread(new MyTask("pong"));
        t1.start();
        t2.start();
    }
}

使用wait/notify保证一致的顺序(两个线程交替输出ping pong)


public class Main {
    /**
     * 用一个状态表示当前应该运行的线程(从ping开始)
     */
    private static String flag = "ping";

    static class MyTask implements Runnable {
        //其他略,只改进了say方法
        private void say(){
            synchronized(lock){
                try {
                    //如果当前线程不该运行
                    //那么wait,阻塞线程,让出锁
                    if(!flag.equals(this.msg)){
                        lock.wait();
                    } 
                    System.out.println(this.msg);
                    //修改状态
                    if("ping".equals(flag)){
                        flag = "pong";
                    } else {
                        flag = "ping";
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    //通知其他处于wait态的线程
                    //notify会随机激活一个处于wati的线程
                    //lock.notify();
                    //notifyAll会全部激活wait线程
                    //线程自己从新竞争锁
                    lock.notifyAll();
                }
            }
        }
    }
}

上面使用wait/notify来保证ping-pong顺序,下面我们使用java.util.concurrent.locks.Condition来实现同样的功能


import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.Condition;

public class Main{
    /**
     * 用一个状态表示当前应该运行的线程(从ping开始)
     */
    private static String flag = "ping";

    private final static Lock lock = new ReentrantLock();
    private final static Condition condition = lock.newCondition();

    static class MyTask implements Runnable {
        //其他略,只改进了say方法
        private void say() {
            //用lock代替synchronized
            lock.lock();
            try {
                if (!flag.equals(this.msg)) {
                    //用condition.await()代替wait
                    condition.await();
                }
                System.out.println(this.msg);
                if ("ping".equals(flag)) {
                    flag = "pong";
                } else {
                    flag = "ping";
                }
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            } finally {
                //用condition.signal(All) 代替 notify(All)
                //condition.signal();
                condition.signalAll();
            }
            lock.unlock();
        }
    }
}

使用Condition我们可以自己实现一个有限阻塞队列(常用于生产者/消费者模式):


package com.raven.lock;

import java.util.ArrayDeque;
import java.util.Queue;
import java.util.Random;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

/**
 * Blocking Queue
 * 
 * @author Raven
 *
 */
public class BQueue {
    private static final BQueue queue = new BQueue<>(10);
    private Lock lock = null;
    private Condition full = null;
    private Condition empty = null;
    private Queue q;
    private int cap;

    public BQueue(int cap) {
        this.cap = cap;
        this.q = new ArrayDeque<>(cap);
        this.lock = new ReentrantLock();
        this.full = this.lock.newCondition();
        this.empty = this.lock.newCondition();
    }

    public void add(T v) {
        lock.lock();
        try {
            while(this.q.size() == this.cap) {
                // full
                this.full.await();
            }
            this.q.add(v);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } finally {
            this.empty.signal();
        }
        lock.unlock();
    }

    public T remove() {
        T ret = null;
        lock.lock();
        try{
            while(this.q.isEmpty()){
                //empty
                this.empty.await();
            }
            ret = this.q.remove();
        }catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } finally {
            this.full.signal();
        }
        lock.unlock();
        return ret;
    }


    public static void main(String [] args) throws InterruptedException{
        //消费者
        Thread producter = new Thread(new Runnable() {
            private Random rand =new Random();
            @Override
            public void run() {
                System.out.println("producter running...");
                while(true){
                    //生产
                    int prd = rand.nextInt(1000);
                    System.out.println("set:"+prd);
                    queue.add(prd);
                    prd = rand.nextInt(1000);
                    System.out.println("set:"+prd);
                    queue.add(prd);
                }

            }
        });
        //消费者
        Thread customer = new Thread(new Runnable() {

            @Override
            public void run() {
                System.out.println("customer running...");
                while(true){
                    //消费
                    int ret = queue.remove();
                    System.out.println("get:"+ret);
                }

            }
        });

        producter.start();
        customer.start();

        producter.join();
        customer.join();
    }
}


Java原子操作和Atomic系列类

原子操作指的是在一步之内就完成而且不能被中断。原子操作在多线程环境中是线程安全的,无需考虑同步的问题。在java中,下列操作是原子操作:

  1. all assignments of primitive types except for long and double(除了long和double以外的所有基本类型赋值操作,double和long的赋值不是原子的)
  2. all assignments of references(所有类实例的引用赋值操作)
  3. all operations of java.concurrent.Atomic* classes(所有java.concurrent.atomic.Atomic*类的操作)
  4. all assignments to volatile longs and doubles(所有volatile的long和double型变量赋值操作)

Atomic是Java提供的一系列用来简化同步处理的原子类,比如在Java1.5之前,我们要实现一个线程安全的计数器类,大概是这样的


public class Counter {
    //volatile只保证变量的更新对线程立即可见,但是不保证操作的原子性,如++
    //要保证原子性,还需要自己控制同步
    private volatile int count;

    public Counter(){
        this.count = 0;
    }

    public Counter(int initialValue){
        this.count = initialValue;
    }

    public int get(){
        return this.count;
    }

    /**
     * 递增,并且返回递增的值
     */
    public synchronized int increment(){
        this.count++;
        return this.count;
    }
}

如果使用java.util.concurrent.atomic.AtomicInteger类实现同样的逻辑,则会简单很多:


import java.util.concurrent.atomic.AtomicInteger;
public class Counter{
    private AtomicInteger count;

    public Counter(){
        this.count = new AtomicInteger(0);
    }

    public Counter(int initialValue){
        this.count = new AtomicInteger(initialValue);
    }

    public int get(){
        return this.count.get();
    }

    /**
     * 递增,并且返回递增的值
     */
    public int increment(){
        //AtomicInteger类给我们提供了线程的方法
        return this.count.incrementAndGet();
    }
}

Atomic系列类中最重要的方法是boolean compareAndSet(except, update)方法,也就是java中实现Lock-Free的是CAS(比较与交换,Compare and Swap)算法;

我们先看下CAS操作。 CAS(Compare and Swap) 比较并交换操作是一个三元操作: 目标地址的值T(arget),期望值E(xpected),实际值R(eal),

  1. 只有当目标值T == 期望值E时,才会把目标值T设置为实际值R,否则不改变目标值
  2. 不管目标值是否改变,都返回之前的目标值T CAS类似如下操作

public class CAS{
    private int value;
    public synchronized int get(){
        return value;
    }
    public synchronized int compareAndSwap(int expected, int real){
        int oldValue = this.value;
        if(this.value == expected){
            this.value = real;
        }
        return oldValue;
    }

    public synchronized boolean compareAndSet(int expected, int real){
        return (expected == compareAndSet(expected, real));
    }
}

CAS只比较期望值和目标值是否相当,相当就设置新值。那么ABA问题就来了:

  1. 由于CAS只是值比较,比如目标是A, 期望值也是A, 那么CAS操作会成功。但是这时候目标A可能不是原来的那个A了,它可能是A变成了B,再变成了A。所以叫ABA问题,很形象。ABA问题可能会使程序出错,比如限时有界队列锁中的节点有几个状态,虽然引用值是A,但是可能对象的状态已经变了,这时候的A实际已经不是原来的A了
  2. 需要注意的是ABA问题不是说CAS操作的过程中A变成了ABA,CAS操作是原子操作,不会被打断

解决ABA问题方法就是给状态设置时间戳,这是并发中加乐观锁的常见做法,如果状态的时间戳发生了改变,证明已经不是原来的对象了,所以操作失败

使用AtomicBoolean,我们可以基于CAS算法实现一个不可重入的乐观锁:


package com.raven.lock.impl;

import java.util.concurrent.atomic.AtomicBoolean;

import com.raven.lock.Lock;

public class SimpleLock implements Lock {
    //冲突次数
    private AtomicBoolean status = new AtomicBoolean(false);
    @Override
    public void lock() {
        //自旋
        while(!status.compareAndSet(false, true)){
            //当compareAndSet返回false时,线程会进入死循环(自旋)达到竞争锁的目的
            //compareAndSet返回false,说明已经有其他线程修改了status的值,不再是false了
            //获得锁成功的线程进行了compareAndSet操作,更新了status的值为true
            //乐观锁不会真正的阻塞线程,而是让线程进入死循环(自旋状态)
        }
    }

    @Override
    public void unlock() {
        //释放锁只需要将status改为false
        //其他在自旋状态的线程会退出死循环
        //从而获得锁
        status.set(false);
    }
}

SimpleLock是不可重入的


import com.raven.lock.impl.SimpleLock;

public class LockTest {
    static final Lock lock = new SimpleLock();
    static volatile int i = 0; 
    public static void main(String[] args) {
        for(int j=0;j<50;j++){
            new Thread(new Runnable() {

                @Override
                public void run() {
                    lock.lock();
                    //注意SimpleLock是不可重入的锁
                    //lock.lock(); //这里如果再获取锁,会死锁
                    System.out.println("value:"+(i++));
                    //lock.unlock();
                    lock.unlock();
                }
            }).start();
        }
    }
}

要改成可重入乐观锁,也不是很难:


public class SimpleLock implements Lock {
    //记录当前线程
    private Thread currentThread = null;
    //锁的引用计数
    private int reentrantNum = 0;
    //冲突次数
    private AtomicBoolean status = new AtomicBoolean(false);
    @Override
    public void lock() {
        if(this.currentThread == Thread.currentThread()){
                this.reentrantNum++;
                //同一个线程重入了锁,直接返回
                return;
        }
        //自旋
        while(!status.compareAndSet(false, true)){}
        //
        this.currentThread = Thread.currentThread();
    }

    @Override
    public void unlock() {
        if(this.reentrantNum>0){
            //重入过,释放的时候引用计数减一
            this.reentrantNum--;
            return;
        }
        if(this.reentrantNum==0){
            //引用计数为0
            this.currentThread = null;
            //释放锁
            status.set(false);
        }
    }
}


总结:


参考资料:


相关文章:

联系作者:aducode@126.com
更多精彩文章请点击:http://aducode.github.io