Java多线程及高并发基础

发布于 2021-02-26  112 热度


JUC多线程及高并发

常用包

java.util.concurrent 并发包(JUC)

java.util.concurrent.atomic(原子)

java.util.concurrent.locks

相关概念

并发:多线程去访问同一资源

并行:线程之间互不影响

Volatile关键字

作用

  1. 线程可见:保证了不同线程对这个变量进行操作时的可见性,即一个线程修改了某个变量的值,这新值对其他线程来说是立即可见的。

    ​ 1.1 线程对变量进行修改之后,要立即写到主内存

    ​ 1.2 线程对变量读取时,要从主内存中读,而不是缓存。

  2. 禁止重排序:禁止字节码指令重排序。

  3. 不保证原子性:只能保证对单次读/写的原子性。i++ 这种操作不能保证原子性。

底层原理

通过内存屏障来防止指令重排序的

硬件层面的内存屏障分为Load Barrier 和 Store Barrier即读屏障和写屏障

对于Load Barrier来说,在指令前插入Load Barrier,可以让高速缓存中的数据失效,强制从新从主内存加载数据。对于Store Barrier来说,在指令后插入Store Barrier,能让写入缓存中的最新数据更新写入主内存,让其他线程可见。

volatile防止指令重排序具体步骤:

在每个volatile写操作的前面插入一个StoreStore屏障。

在每个volatile写操作的后面插入一个StoreLoad屏障。

在每个volatile读操作的后面插入一个LoadLoad屏障。

在每个volatile读操作的后面插入一个LoadStore屏障。

使用场景

  1. 标识状态
  2. 单例模式DCL(Double Check Lock)双重检测锁,防止使用到半初始化状态的单例。
  3. 保证可见性、顺序性。

Atomic与CAS

概念

java.util.concurrent.atomic.AtomicXXX原子类,保证了多线程的原子性。

Compare and swap,比较再交换

原子类

AtomicInteger:原子整型

AtomicIntegerArray:原子整型数组

AtomicReference\:原子引用

AtomicStampedReference\:原子版本引用(通过版本号控制,解决ABA问题)

其余还有:

AtomicBoolean、AtomicLong、AtomicLongArray、AtomicReferenceArray\等。

原理

对于int++而言,对应着原子类atomicInteger.getAndIncrement();

    private static final Unsafe unsafe = Unsafe.getUnsafe();
    private static final long valueOffset;//value的地址&value

    static {
        try {
            valueOffset = unsafe.objectFieldOffset
                (AtomicInteger.class.getDeclaredField("value"));//通过底层C++获取&value
        } catch (Exception ex) { throw new Error(ex); }
    }

    private volatile int value;
    public final int getAndIncrement() {//i++
        return unsafe.getAndAddInt(this, valueOffset, 1);
    }

public final class Unsafe {
    public native int getIntVolatile(Object var1, long var2);//获取var1.*var2
    public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);//比较var1.*var2是否与var4相等,若相等更新var1.*var2的值为var5并返回true,否则返回false

    /**
    * 该方法相当于i+=k
    * var1:当前对象
    * var2:操作数i的地址
    * var4:自增数k
    * 总体就是var1.*var2+=var4
    */
    public final int getAndAddInt(Object var1, long var2, int var4) {//*var2+=var4
        int var5;
        do {
            var5 = this.getIntVolatile(var1, var2);//var5 = var1.*var2
        } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
        //比较var5=?=var1.*var2,若不等,重新从内存读,若等,就更新内存中的值位var5+var4。

        return var5;//返回var5原值
    }
}

原子类Unsafe类的直接获取value地址的值,比较并更新value地址的值,Unsafe底层通过C++原语(不可拆分)来进行更新操作,保证了原子性。

Collection线程不安全

ArrayList、HashMap、HashSet线程不安全,多线程同时操作会出现结果错误甚至ConcurrentModificationException(同步更新异常)。

如何保证线程安全?

List

  1. 采用List list = new Vector()。Vector是线程安全的,所有方法都加锁,并发效率低。

  2. 使用List list = Collections.synchronizedList(new ArrayList<>()),给ArrayList外面包一层锁。

  3. 使用List list = new CopyOnWriteArrayList<>()。

    CopyOnWriteArrayList默认容量大小为0,当更新值时会加写锁并重新拷贝一个新的数组,读值不加锁。

Set

HashSet底层原理是HashMap:

public class HashSet<E>
    extends AbstractSet<E>
    implements Set<E>, Cloneable, java.io.Serializable
{
    private transient HashMap<E,Object> map;

    // Dummy value to associate with an Object in the backing Map
    private static final Object PRESENT = new Object();

    /**
     * Constructs a new, empty set; the backing <tt>HashMap</tt> instance has
     * default initial capacity (16) and load factor (0.75).
     */
    public HashSet() {
        map = new HashMap<>();
    }
    public boolean add(E e) {
        return map.put(e, PRESENT)==null;
    }
}
  1. 采用Set set = Collections.synchronizedSet(new HashSet<>()),给HashSet外面包一层锁。
  2. 采用Set set = new CopyOnWriteArraySet<>(),CopyOnWriteArraySet底层是CopyOnWriteArrayList。添加时如何进行元素去重呢?通过调用CopyOnWriteArrayList.addIfAbsent(e),如果元素已存在就不再添加新的进去。

Map

  1. 使用Map map = new ConcurrentHashMap(),ConcurrentHashMap加了写锁。
  2. 使用Map map = Collections.synchronizedMap(new HashMap<>()),给ArrayList外面包一层锁。

公平锁与非公平锁

公平锁:先到先得,公平排队

非公平锁:允许某些线程后到先得,会出现饥饿现象

可重入锁(递归锁)

可重入锁(ReentrantLock,又名递归锁):同一线程外层函数获得锁后,内层递归函数能直接获取该锁。

ReentrantLock/synchronized就是一个典型的可重入锁。

public synchronized void outer(){
    inner();
}
public synchronized void inner(){
}
public static void main(String args[]){
    outer();
}

main--锁1-->outer--锁2-->inner

主线程已经获取到锁1后,在调用inner时便可以直接获取锁2。

可重入锁得最大作用就是避免死锁

自旋锁

是指尝试获取锁的线程不会立即阻塞,而是采用循环的方式去尝试获取锁。

好处:减少线程上下文切换的消耗。

缺点:循环消耗CPU(犹如死循环)。

手写自旋锁:通过原子引用的CAS实现。

public class Test {
    private AtomicReference<Thread> atomicThread = new AtomicReference<>();
    public void lock(){
        Thread thread = Thread.currentThread();
        System.out.println(thread.getName()+"进来了!");
        while (!atomicThread.compareAndSet(null,thread)) {//看看电脑那是否有人,如果没人,自己就过去上网。
            //thread等空机子...
            //等不到就一直循环等,在这里自旋。因此称为自旋锁。
        }
        System.out.println(thread.getName()+"正在上网!");
    }
    public void unlock(){
        Thread thread = Thread.currentThread();
        atomicThread.compareAndSet(thread,null);
        System.out.println(thread.getName()+"走了!");
    }
    public static void main(String[] args) {
        Test test = new Test();
        new Thread(() -> {
            test.lock();
            Thread.sleep(2000);
            System.out.println("2小时后");
            test.unlock();
        },"张三").start();

        new Thread(() -> {
            test.lock();
            Thread.sleep(2000);
            System.out.println("3小时后");
            test.unlock();
        },"李四").start();
    }
}

结果:

张三进来了!
张三正在上网!
李四进来了!
2小时后
张三走了!
李四正在上网!
3小时后
李四走了!

独占锁与共享锁

独占锁:该锁一次只能被一个线程所持有。ReentrantLock和synchronized都是独占锁。

共享锁:该锁可被多个线程所持有。

ReentrantReadWriteLock其读锁是共享锁,写锁是独占锁。

读-读能共存

读-写不能共存

写-写不能共存

读写锁演示:

public class ReadWriteLockTest {
    private Map<String,String> map = new HashMap<>();
    private ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    public void write(String key,String value) {
        lock.writeLock().lock();//写锁
        System.out.println("开始写入:"+key+"...");
        Thread.sleep(1000);
        map.put(key,value);
        System.out.println("写入完成!"+key+":"+value);
        lock.writeLock().unlock();
    }
    public void read(String key) {
        lock.readLock().lock();
        System.out.println("开始读取!"+key+"...");
        Thread.sleep(500);
        System.out.println("读取完成!"+key+":"+map.get(key));
        lock.readLock().unlock();
    }

    public static void main(String[] args) {
        ReadWriteLockTest test = new ReadWriteLockTest();
        for(int i=0;i<5;i++) {
            String name = i+"";
            new Thread(() -> {
                test.read(name);
            }, name+"_read").start();
            new Thread(() -> {
                test.write(name +"", name);
            }, name+"_write").start();
        }
    }
}

执行结果:

开始读取:0...
开始读取:1...
读取完成!0:null
读取完成!1:null
开始写入:1...
写入完成!1:1
开始写入:0...
写入完成!0:0
开始写入:2...
写入完成!2:2
开始读取:2...
开始读取:3...
读取完成!3:null
读取完成!2:2
开始写入:3...
写入完成!3:3
开始读取:4...
读取完成!4:null
开始写入:4...
写入完成!4:4

可以看到,多个读取线程可以交替执行,而一个写的线程不可以被其他线程打断,说明读-读之间互不影响,写读、写写相互之间影响。

CountDownLatch

CountDownLatch给定一个初始的数,每执行一次countDown(),计数就减1,当数减少到0时,闭锁才会打开。

代码演示:

public class CountDownLatchTest {
    public static void main(String[] args) throws InterruptedException {
        int count = 5;
        CountDownLatch latch = new CountDownLatch(count);
        for(int i=0;i<count;i++){
            int finalI = i;
            new Thread(() -> {
                System.out.println(finalI +"号同学离开了教室");
                latch.countDown();//闭锁减1
            }).start();
        }
        latch.await();//闭锁不为0,就等待,直到减到0。
        System.out.println("班长把教室门锁了。");
    }
}

输出结果:

1号同学离开了教室
3号同学离开了教室
2号同学离开了教室
0号同学离开了教室
4号同学离开了教室
班长把教室门锁了。

CyclicBarrier

CountDownLatch给定一个初始的数,每执行一次await,计数就加1,当数从0增加到初始的数时,就会触发回调事件。

代码演示:

public class CyclicBarrierTest {
    public static void main(String[] args) {
        int count = 5;
        CyclicBarrier barrier = new CyclicBarrier(count, () -> {
            System.out.println(count + "人到齐,开始开会...");
        });
        for(int i=0;i<count;i++){
            int finalI = i;
            new Thread(()->{
                System.out.println(finalI +"到了,在会议室等着...");
                barrier.await();
            }).start();
        }
    }
}

执行结果:

1到了,在会议室等着...
4到了,在会议室等着...
3到了,在会议室等着...
0到了,在会议室等着...
2到了,在会议室等着...
5人到齐,开始开会...

Semaphore

信号量(Semaphore),给定初始资源量,每次占用资源(acquire)可用资源量-1,每次释放资源(release)可用资源量+1,当可用资源类为0,新的获取资源请求将被阻塞,直到有新的可用资源才被唤醒。

代码演示:

public class SemaphoreTest {
    public static void main(String[] args) {
        Random random = new Random();
        Semaphore semaphore = new Semaphore(3);
        System.out.println("一共有"+semaphore.availablePermits()+"个车位。");
        for(int i=0;i<5;i++){
            int finalI = i;
            new Thread(()->{
                try {
                    semaphore.acquire();
                    System.out.println(finalI + "号车抢到了车位。");
                    Thread.sleep(random.nextInt(2000));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                finally {
                    semaphore.release();
                    System.out.println(finalI + "号车离开了车位。");
                }
            }).start();
        }
    }
}

执行结果:

一共有3个车位。
0号车抢到了车位。
1号车抢到了车位。
2号车抢到了车位。
0号车离开了车位。
4号车抢到了车位。
2号车离开了车位。
3号车抢到了车位。
3号车离开了车位。
1号车离开了车位。
4号车离开了车位。

一个生产者消费者模型:

public class ProducerConsumerTest {
    private Semaphore food = new Semaphore(0);//0个包子
    private Semaphore basket = new Semaphore(3);//3个篮子
    public void producer(){
        new Thread(()->{
            long id = Thread.currentThread().getId();
            String name = "生产者"+id;
            System.out.println(name+"开始工作了");
            while(true){
                try {
                    int basketNumber = basket.availablePermits();
                    if(basketNumber==0){
                        basketNumber = 1;
                    }
                    basket.acquire(basketNumber);
                    System.out.println(name+"拿了"+basketNumber+"个篮子");
                    Thread.sleep(10000);//用时间制作包子
                    System.out.println(name+"做了"+basketNumber+"篮包子");
                    food.release(basketNumber);//做一篮包子
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }
    public void consumer(){
        new Thread(()->{
            long id = Thread.currentThread().getId();
            String name = "消费者"+id;
            System.out.println(name+"来光临了");
            while(true){
                try {
                    food.acquire();//消费者吃一篮包子
                    System.out.println(name+"吃了一篮包子");
                    Thread.sleep(2000);//用时间吃包子
                    System.out.println(name+"还了一个篮子");
                    basket.release();//吃完还篮子
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }
    public static void main(String[] args) {
        ProducerConsumerTest producerConsumerTest = new ProducerConsumerTest();
        producerConsumerTest.consumer();
        producerConsumerTest.consumer();
        producerConsumerTest.producer();
    }
}

结果:

消费者13来光临了
消费者12来光临了
生产者14开始工作了
生产者14拿了3个篮子
生产者14做了3篮包子
消费者13吃了一篮包子
消费者12吃了一篮包子
消费者13还了一个篮子
生产者14拿了1个篮子
消费者13吃了一篮包子
消费者12还了一个篮子
消费者13还了一个篮子
生产者14做了1篮包子
生产者14拿了2个篮子
消费者12吃了一篮包子
消费者12还了一个篮子
生产者14做了2篮包子
生产者14拿了1个篮子
消费者13吃了一篮包子
消费者12吃了一篮包子
消费者12还了一个篮子
消费者13还了一个篮子
生产者14做了1篮包子
生产者14拿了2个篮子
消费者12吃了一篮包子
.....

synchronized与Lock的区别?

区别 synchronized Lock(ReentrantLock)
等待 wait() await()
唤醒 obj.notify():随机唤醒一个线程,obj.notifyAll():唤醒全部线程。不能精确唤醒 condition.signal(),condition.signalAll()。可以创建多个condition,可以精确唤醒需要唤醒的线程
类型 Java关键字,属于JVM层面。对应字节码为monitorenter和monitorexit。 JUC下面的类,是API层面的
使用方法 不需要手动创建/释放 需要手动创建/释放
是否可中断 不可中断,除非抛异常或正常运行完成 可中断
是否公平锁 非公平锁 两者都可以,构造方法可以传入boolean值,true为公平,false为非公平,默认非公平

如何定位死锁?

  1. 通过jps -l查看java线程。
    >jps -l
    13334 com.xxx.DeadLockTest
    
  2. 通过jstack 线程号分析线程栈
    >jstack 13334
    

阻塞队列

概览

BlockingQueue<Queue<Collection

核心方法

方法类型 抛出异常 特殊值 阻塞 超时
插入 add(e),队列满抛异常 offer(e),队列满返回false put(e),队列满阻塞 offer(e,time,unit),队列满等一会,依然满返回false
移除 remove(),队列空抛异常 poll(),队列空返回null take(),队列空阻塞 poll(time,unit),队列空等一会,依然空返回null
检查(首元素) element() peek()

ArrayBlockingQueue

由数组结构组成的有界阻塞队列

LinkedBlockingQueue

由链表结构组成的有界(Integer.MAX_VALUE)阻塞队列

SynchronousQueue

同步队列,不存储元素的阻塞队列,只存储一个单个元素的队列

线程池

优势

  1. 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
  2. 提高响应速度。当任务到达,任务可以不需等待线程创建就能立即执行。
  3. 提高线程的可管理性。

架构

底层依赖的是ThreadPoolExecutor这个类。它有7大参数。

七大参数

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {}
  1. corePoolSize:核心线程数,最少线程数。
  2. maximumPoolSize:最大线程数,当核心线程数不够用时,会创建新的线程,但线程数不会超过最大线程数。
  3. keepAliveTime:额外线程的存活时间,当任务少,额外线程空闲时间达到该值时,会销毁空闲的额外线程。
  4. unit:额外线程的存活时间单位。
  5. workQueue:任务队列。
  6. threadFactory:线程工厂。用来创建线程的工厂。
  7. handler:拒绝策略。当线程数达到最大,且任务队列排满,又有新的任务到达时,只能采取相应拒绝策略。

拒绝策略

  1. AbortPolicy(默认):中止策略,直接抛出异常中止系统正常运行。
  2. CallerRunsPolicy:调用者运行策略。
  3. DiscardOldestPolicy:抛弃最老策略。抛弃队列中等待最久的任务,然后把当前任务加入队列。
  4. DiscardPolicy:抛弃策略。直接丢弃任务。

默认创建方式

public class ThreadPoolTest {
    public static void main(String[] args) {
        //创建指定线程数的线程池
        ExecutorService es1 = Executors.newFixedThreadPool(10);

        //创建单线程池
        ExecutorService es2 = Executors.newSingleThreadExecutor();

        //创建缓存线程池,需要时才创建线程,线程保留1分钟。
        ExecutorService es3 = Executors.newCachedThreadPool();

        //定时线程池,定时执行任务。
        ExecutorService es4 = Executors.newScheduledThreadPool(10);
    }
}
public class Executors {
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }
}

我一直在开辟我的天空