Skip to content

Latest commit

 

History

History
599 lines (481 loc) · 15.1 KB

abstract-queued-synchronizer.md

File metadata and controls

599 lines (481 loc) · 15.1 KB

Compare And Set/Swap

  • 乐观锁
  • 无锁优化
  • 自旋
  • 用的unsafe类
  • CPU保证CAS的过程中不能被打断

判断要改的值是否和预期的值一样,如果一样,就把新的值替换过去;否则就循环。

cas(V, Expected, NewValue)
	if V==Expected
		V=NewValue
	else
		try again

ABA问题

加版本号


AtomicXXX

原子类

public class TestAtomicInteger {
	AtomicInteger count = new AtomicInteger(0); 

	void m() { 
		for (int i = 0; i < 10000; i++)
			count.incrementAndGet(); //count1++
	}

	public static void main(String[] args) {
		TestAtomicInteger t = new TestAtomicInteger();

		List<Thread> threads = new ArrayList<Thread>();

		for (int i = 0; i < 10; i++) {
			threads.add(new Thread(t::m, "thread-" + i));
		}

		threads.forEach((o) -> o.start());

		threads.forEach((o) -> {
			try {
				o.join();
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		});
		System.out.println(t.count);
	}
}

LongAdder

  • 类似分段锁概念
  • 线程数多的时候比较有优势
LongAdder count = new LongAdder();
count.increment(); // count++

ReentrantLock

  • 可重入锁
  • 需要手动释放锁(写在try-finally)
  • 使用syn锁定的话如果遇到异常,jvm会自动释放锁,但是lock必须手动释放锁,因此经常在finally中进行锁的释放
  • 使用reentrantlock可以进行“尝试锁定”tryLock,这样无法锁定,或者在指定时间内无法锁定,线程可以决定是否继续等待
  • 使用ReentrantLock还可以调用lockInterruptibly方法,可以对线程interrupt方法做出响应,在一个线程等待锁的过程中,可以被打断
  • ReentrantLock还可以指定为公平锁,默认为非公平锁
  • 底层是AQS,其实也就是CAS

CountDownLatch

倒数门栓

public class TestCountDownLatch {
    public static void main(String[] args) {
        usingJoin();
        usingCountDownLatch();
    }

    private static void usingCountDownLatch() {
        Thread[] threads = new Thread[100];
        CountDownLatch latch = new CountDownLatch(threads.length);

        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(() -> {
                int result = 0;
                for (int j = 0; j < 10000; j++) {
                    result += j;
                }
                latch.countDown(); // 每次一个线程执行完,latch的值减1
            });
        }

        for (int i = 0; i < threads.length; i++) {
            threads[i].start();
        }

        try {
            latch.await(); // latch值为0时,程序继续向下执行,否则所有线程都阻塞在此。
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println("end latch");
    }

    private static void usingJoin() {
        Thread[] threads = new Thread[100];

        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(() -> {
                int result = 0;
                for (int j = 0; j < 10000; j++) {
                    result += j;
                }
            });
        }

        for (int i = 0; i < threads.length; i++) {
            threads[i].start();
        }

        for (int i = 0; i < threads.length; i++) {
            try {
                threads[i].join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        System.out.println("end join");
    }
}

CyclicBarrier

循环栅栏,相较于CountDownLatch可以循环利用

线程到一定数量才继续往下执行

public class TestCyclicBarrier {
    public static void main(String[] args) {
        //CyclicBarrier barrier = new CyclicBarrier(20);
        CyclicBarrier barrier = new CyclicBarrier(20, () -> System.out.println("满人,发车"));
        for (int i = 0; i < 100; i++) {
            new Thread(() -> {
                try {
                    barrier.await(); // barrier的值-1,直到barrier的值为0,才往下执行,否则阻塞在此。
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
            }).start();
        }
    }
}

Phaser

分阶段的栅栏

CyclicBarrier的升级版

按不同的阶段,对线程进行执行

可以自定义栅栏数和推倒栅栏的线程数

等人到齐进入下一个阶段

public class TestPhaser {
    static Random r = new Random();
    static MarriagePhaser phaser = new MarriagePhaser();

    static void milliSleep(int milli) {
        try {
            TimeUnit.MILLISECONDS.sleep(milli);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {

        phaser.bulkRegister(7);

        for(int i=0; i<5; i++) {
            new Thread(new Person("p" + i)).start();
        }
        new Thread(new Person("新郎")).start();
        new Thread(new Person("新娘")).start();
    }

    static class MarriagePhaser extends Phaser {
        // 栅栏推倒时自动调用
        @Override
        protected boolean onAdvance(int phase, int registeredParties) {

            switch (phase) {
                case 0: // 第一个阶段
                    System.out.println("所有人到齐了!" + registeredParties);
                    System.out.println();
                    return false;
                case 1: // 第二个阶段
                    System.out.println("所有人吃完了!" + registeredParties);
                    System.out.println();
                    return false;
                case 2: // 第三个阶段
                    System.out.println("所有人离开了!" + registeredParties);
                    System.out.println();
                    return false;
                case 3: // 第四个阶段
                    System.out.println("婚礼结束!新郎新娘抱抱!" + registeredParties);
                    return true;
                default:
                    return true;
            }
        }
    }

    static class Person implements Runnable {
        String name;

        public Person(String name) {
            this.name = name;
        }

        public void arrive() {
            milliSleep(r.nextInt(1000));
            System.out.printf("%s 到达现场!\n", name);
            phaser.arriveAndAwaitAdvance();
        }

        public void eat() {
            milliSleep(r.nextInt(1000));
            System.out.printf("%s 吃完!\n", name);
            phaser.arriveAndAwaitAdvance();
        }

        public void leave() {
            milliSleep(r.nextInt(1000));
            System.out.printf("%s 离开!\n", name);
            phaser.arriveAndAwaitAdvance();
        }

        private void hug() {
            if(name.equals("新郎") || name.equals("新娘")) {
                milliSleep(r.nextInt(1000));
                System.out.printf("%s 洞房!\n", name);
                phaser.arriveAndAwaitAdvance();
            } else {
                phaser.arriveAndDeregister();
                //phaser.register()
            }
        }

        @Override
        public void run() {
            arrive();
            eat();
            leave();
            hug();
        }
    }
}

ReadWriteLock

读写锁

  • 读锁(共享锁):允许多个线程同时读,不允许写。
  • 写锁(排他锁):只能有一个线程写,其它线程既不能读也不能写。
ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
Lock readLock = readWriteLock.readLock();
Lock writeLock = readWriteLock.writeLock();

StampedLock

ReadWriteLock的升级版


Semaphore

  • 信号量
  • 限流
  • 车道和收费站
public class TestSemaphore {
    public static void main(String[] args) {
        // 只允许2个线程同时执行,第二个参数为true表示使用公平锁(使用AQS维护的队列),默认是非公平锁
        Semaphore s = new Semaphore(2, true);

        new Thread(()->{
            try {
                s.acquire(); // 每次执行acquire()方法s的值-1,如果s的值为0,当前线程就不能往下执行了,除非其它线程调用release()方法。

                System.out.println("T1 running...");
                Thread.sleep(200);
                System.out.println("T1 running...");

            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                s.release(); // s的值+1
            }
        }).start();

        new Thread(()->{
            try {
                s.acquire();
                System.out.println("T2 running...");
                Thread.sleep(200);
                System.out.println("T2 running...");
                s.release();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
    }
}

Exchanger

  • 两个线程交换数据
public class TestExchanger {
    public static Exchanger<String> exchanger = new Exchanger<>();

    // 将t1线程的变量s->"T1"与t2线程的变量s->"T2"进行交换
    // 交换之后,t1线程的s为"T2",t2线程的s为"T1"
    public static void main(String[] args) {
        new Thread(() -> {
            String s = "T1";
            try {
                s = exchanger.exchange(s);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + " " + s);

        }, "t1").start();

        new Thread(() -> {
            String s = "T2";
            try {
                s = exchanger.exchange(s);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + " " + s);
        }, "t2").start();
    }
}

LockSupport

该类不属于AQS下

  • LockSupport.park()方法可以阻塞指定的一个线程
  • LockSupport.unpark(Thread)方法可以让指定的一个线程结束阻塞继续运行
  • unpark方法可以先与park方法执行
public class TestLockSupport {
    public static void main(String[] args) {
        Thread t = new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                System.out.println(i);
                if (i == 5) {
                    LockSupport.park();
                }
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });

        t.start();

        // LockSupport.unpark(t);
        try {
            TimeUnit.SECONDS.sleep(8);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("after 8 senconds!");
        LockSupport.unpark(t);
    }
}

公平锁

新来一个线程会先检查队列中是否有其它线程,如果有,则当前线程直接加入队列;如果没有,则去获得锁。


面试题

线程间通信

/**
 * 曾经的面试题:(淘宝?)
 * 实现一个容器,提供两个方法,add,size
 * 写两个线程,线程1添加10个元素到容器中,线程2实现监控元素的个数,当个数到5个时,线程2给出提示并结束
 * 
 * 给lists添加volatile之后,t2能够接到通知,但是,t2线程的死循环很浪费cpu,如果不用死循环,该怎么做呢?
 * 
 * 这里使用wait和notify做到,wait会释放锁,而notify不会释放锁
 * 需要注意的是,运用这种方法,必须要保证t2先执行,也就是首先让t2监听才可以
 * 
 * 阅读下面的程序,并分析输出结果
 * 可以读到输出结果并不是size=5时t2退出,而是t1结束时t2才接收到通知而退出
 * 想想这是为什么?
 * 
 * notify之后,t1必须释放锁,t2退出后,也必须notify,通知t1继续执行
 * 整个通信过程比较繁琐
 * 
 * 使用Latch(门闩)替代wait notify来进行通知
 * 好处是通信方式简单,同时也可以指定等待时间
 * 使用await和countdown方法替代wait和notify
 * CountDownLatch不涉及锁定,当count的值为零时当前线程继续运行
 * 当不涉及同步,只是涉及线程通信的时候,用synchronized + wait/notify就显得太重了
 * 这时应该考虑countdownlatch/cyclicbarrier/semaphore
 */

//TODO park unpark
public class LockSupport_WithoutSleep {

	// 添加volatile,使t2能够得到通知
	volatile List lists = new ArrayList();

	public void add(Object o) {
		lists.add(o);
	}

	public int size() {
		return lists.size();
	}

	static Thread t1 = null, t2 = null;

	public static void main(String[] args) {
		LockSupport_WithoutSleep c = new LockSupport_WithoutSleep();

		t1 = new Thread(() -> {
			System.out.println("t1启动");
			for (int i = 0; i < 10; i++) {
				c.add(new Object());
				System.out.println("add " + i);
				if (c.size() == 5) {
					LockSupport.unpark(t2);
					LockSupport.park();
				}
			}
		}, "t1");

		t2 = new Thread(() -> {
			LockSupport.park();
			System.out.println("t2 结束");
			LockSupport.unpark(t1);
		}, "t2");

		t2.start();
		t1.start();
	}
}

生产者消费者

/**
 * 面试题:写一个固定容量同步容器,拥有put和get方法,以及getCount方法,
 * 能够支持2个生产者线程以及10个消费者线程的阻塞调用
 * 
 * 使用wait和notify/notifyAll来实现
 * 
 * 使用Lock和Condition来实现
 * 对比两种方式,Condition的方式可以更加精确的指定哪些线程被唤醒
 * 
 */
public class MyContainer<T> {
	final private LinkedList<T> lists = new LinkedList<>();
	final private int MAX = 10; //最多10个元素
	private int count = 0;
	
	private Lock lock = new ReentrantLock();
	private Condition producer = lock.newCondition();
	private Condition consumer = lock.newCondition();
	
	public void put(T t) {
		try {
			lock.lock();
			while(lists.size() == MAX) { //想想为什么用while而不是用if?
				producer.await();
			}
			
			lists.add(t);
			++count;
			consumer.signalAll(); //通知消费者线程进行消费
		} catch (InterruptedException e) {
			e.printStackTrace();
		} finally {
			lock.unlock();
		}
	}
	
	public T get() {
		T t = null;
		try {
			lock.lock();
			while(lists.size() == 0) {
				consumer.await();
			}
			t = lists.removeFirst();
			count --;
			producer.signalAll(); //通知生产者进行生产
		} catch (InterruptedException e) {
			e.printStackTrace();
		} finally {
			lock.unlock();
		}
		return t;
	}
	
	public static void main(String[] args) {
		MyContainer<String> c = new MyContainer<>();
		//启动消费者线程
		for(int i=0; i<10; i++) {
			new Thread(()->{
				for(int j=0; j<5; j++) System.out.println(c.get());
			}, "c" + i).start();
		}
		
		try {
			TimeUnit.SECONDS.sleep(2);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		
		//启动生产者线程
		for(int i=0; i<2; i++) {
			new Thread(()->{
				for(int j=0; j<25; j++) c.put(Thread.currentThread().getName() + " " + j);
			}, "p" + i).start();
		}
	}
}