• 一、线程间通信的两种方式
    • 1.wait()/notify()
    • wait()方法使用注意事项
    • notify()方法使用注意事项
    • wait()/notify()协作的两个注意事项
  • 2.Condition实现等待/通知
  • 二、生产者/消费者模式实现
    • 1.一生产与一消费
    • 2.多生产与多消费

    一、线程间通信的两种方式

    1.wait()/notify()

    Object类中相关的方法有notify方法和wait方法。因为wait和notify方法定义在Object类中,因此会被所有的类所继承。这些方法都是final的,即它们都是不能被重写的,不能通过子类覆写去改变它们的行为。

    ①wait()方法: 让当前线程进入等待,并释放锁。

    ②wait(long)方法: 让当前线程进入等待,并释放锁,不过等待时间为long,超过这个时间没有对当前线程进行唤醒,将自动唤醒

    ③notify()方法: 让当前线程通知那些处于等待状态的线程,当前线程执行完毕后释放锁,并从其他线程中唤醒其中一个继续执行。

    ④notifyAll()方法: 让当前线程通知那些处于等待状态的线程,当前线程执行完毕后释放锁,将唤醒所有等待状态的线程。

    wait()方法使用注意事项

    ①当前的线程必须拥有当前对象的monitor,也即lock,就是锁,才能调用wait()方法,否则将抛出异常java.lang.IllegalMonitorStateException。

    ②线程调用wait()方法,释放它对锁的拥有权,然后等待另外的线程来通知它(通知的方式是notify()或者notifyAll()方法),这样它才能重新获得锁的拥有权和恢复执行。

    ③要确保调用wait()方法的时候拥有锁,即,wait()方法的调用必须放在synchronized方法或synchronized块中。

    wait()与sleep()比较

    当线程调用了wait()方法时,它会释放掉对象的锁。

    Thread.sleep(),它会导致线程睡眠指定的毫秒数,但线程在睡眠的过程中是不会释放掉对象的锁的。

    notify()方法使用注意事项

    ①如果多个线程在等待,它们中的一个将会选择被唤醒。这种选择是随意的,和具体实现有关。(线程等待一个对象的锁是由于调用了wait()方法)。

    ②被唤醒的线程是不能被执行的,需要等到当前线程放弃这个对象的锁,当前线程会在方法执行完毕后释放锁。

    wait()/notify()协作的两个注意事项

    ①通知过早

    如果通知过早,则会打乱程序的运行逻辑。

    1. public class MyRun {
    2. private String lock = new String("");
    3. public Runnable runnableA = new Runnable() {
    4. @Override
    5. public void run() {
    6. try {
    7. synchronized (lock) {
    8. System.out.println("begin wait");
    9. lock.wait();
    10. System.out.println("end wait");
    11. }
    12. } catch (InterruptedException e) {
    13. e.printStackTrace();
    14. }
    15. }
    16. };
    17. public Runnable runnableB = new Runnable() {
    18. @Override
    19. public void run() {
    20. synchronized (lock) {
    21. System.out.println("begin notify");
    22. lock.notify();
    23. System.out.println("end notify");
    24. }
    25. }
    26. };
    27. }

    两个方法,分别执行wait()/notify()方法。

    1. public static void main(String[] args) throws InterruptedException {
    2. MyRun run = new MyRun();
    3. Thread bThread = new Thread(run.runnableB);
    4. bThread.start();
    5. Thread.sleep(100);
    6. Thread aThread = new Thread(run.runnableA);
    7. aThread.start();
    8. }

    如果notify()方法先执行,将导致wait()方法释放锁进入等待状态后,永远无法被唤醒,影响程序逻辑。应避免这种情况。

    ②等待wait的条件发生变化

    在使用wait/notify模式时,还需要注意另外一种情况,也就是wait等待条件发生了变化,也容易造成程序逻辑的混乱。

    Add类,执行加法操作,然后通知Subtract类

    1. public class Add {
    2. private String lock;
    3. public Add(String lock) {
    4. super();
    5. this.lock = lock;
    6. }
    7. public void add(){
    8. synchronized (lock) {
    9. ValueObject.list.add("anyThing");
    10. lock.notifyAll();
    11. }
    12. }
    13. }

    Subtract类,执行减法操作,执行完后进入等待状态,等待Add类唤醒notify

    1. public class Subtract {
    2. private String lock;
    3. public Subtract(String lock) {
    4. super();
    5. this.lock = lock;
    6. }
    7. public void subtract(){
    8. try {
    9. synchronized (lock) {
    10. if(ValueObject.list.size()==0){
    11. System.out.println("wait begin ThreadName="+Thread.currentThread().getName());
    12. lock.wait();
    13. System.out.println("wait end ThreadName="+Thread.currentThread().getName());
    14. }
    15. ValueObject.list.remove(0);
    16. System.out.println("list size ="+ValueObject.list.size());
    17. }
    18. } catch (InterruptedException e) {
    19. e.printStackTrace();
    20. }
    21. }
    22. }

    线程ThreadAdd

    1. public class ThreadAdd extends Thread{
    2. private Add pAdd;
    3. public ThreadAdd(Add pAdd) {
    4. super();
    5. this.pAdd = pAdd;
    6. }
    7. @Override
    8. public void run() {
    9. pAdd.add();
    10. }
    11. }

    线程ThreadSubtract

    1. public class ThreadSubtract extends Thread{
    2. private Subtract rSubtract;
    3. public ThreadSubtract(Subtract rSubtract) {
    4. super();
    5. this.rSubtract = rSubtract;
    6. }
    7. @Override
    8. public void run() {
    9. rSubtract.subtract();
    10. }
    11. }

    先开启两个ThreadSubtract线程,由于list中没有元素,进入等待状态。再开启一个ThreadAdd线程,向list中增加一个元素,然后唤醒两个ThreadSubtract线程。

    1. public static void main(String[] args) throws InterruptedException {
    2. String lock = new String("");
    3. Add add = new Add(lock);
    4. Subtract subtract = new Subtract(lock);
    5. ThreadSubtract subtractThread1 = new ThreadSubtract(subtract);
    6. subtractThread1.setName("subtractThread1");
    7. subtractThread1.start();
    8. ThreadSubtract subtractThread2 = new ThreadSubtract(subtract);
    9. subtractThread2.setName("subtractThread2");
    10. subtractThread2.start();
    11. Thread.sleep(1000);
    12. ThreadAdd addThread = new ThreadAdd(add);
    13. addThread.setName("addThread");
    14. addThread.start();
    15. }

    输出结果

    wait begin ThreadName=subtractThread1
    wait begin ThreadName=subtractThread2
    wait end ThreadName=subtractThread2
    Exception in thread “subtractThread1” list size =0
    wait end ThreadName=subtractThread1
    java.lang.IndexOutOfBoundsException: Index: 0, Size: 0
    at java.util.ArrayList.rangeCheck(Unknown Source)
    at java.util.ArrayList.remove(Unknown Source)
    at com.lvr.communication.Subtract.subtract(Subtract.java:18)
    at com.lvr.communication.ThreadSubtract.run(ThreadSubtract.java:12)

    当第二个ThreadSubtract线程执行减法操作时,抛出下标越界异常。

    原因分析:一开始两个ThreadSubtract线程等待状态,当ThreadAdd线程添加一个元素并唤醒所有线程后,第一个ThreadSubtract线程接着原来的执行到的地点开始继续执行,删除一个元素并输出集合大小。同样,第二个ThreadSubtract线程也如此,可是此时集合中已经没有元素了,所以抛出异常。

    解决办法:从等待状态被唤醒后,重新判断条件,看看是否扔需要进入等待状态,不需要进入再进行下一步操作。即把if()判断,改成while()。

    1. public void subtract(){
    2. try {
    3. synchronized (lock) {
    4. while(ValueObject.list.size()==0){
    5. System.out.println("wait begin ThreadName="+Thread.currentThread().getName());
    6. lock.wait();
    7. System.out.println("wait end ThreadName="+Thread.currentThread().getName());
    8. }
    9. ValueObject.list.remove(0);
    10. System.out.println("list size ="+ValueObject.list.size());
    11. }
    12. } catch (InterruptedException e) {
    13. e.printStackTrace();
    14. }
    15. }

    这是线程间协作中经常出现的一种情况,需要避免。

    2.Condition实现等待/通知

    关键字synchronized与wait()和notify()/notifyAll()方法相结合可以实现等待/通知模式,类似ReentrantLock也可以实现同样的功能,但需要借助于Condition对象。

    关于Condition实现等待/通知就不详细介绍了,可以完全类比wait()/notify(),基本使用和注意事项完全一致。
    就只简单介绍下类比情况:

    condition.await()————>lock.wait()

    condition.await(long time, TimeUnit unit)————>lock.wait(long timeout)

    condition.signal()————>lock.notify()

    condition.signaAll()————>lock.notifyAll()

    特殊之处:synchronized相当于整个ReentrantLock对象只有一个单一的Condition对象情况。而一个ReentrantLock却可以拥有多个Condition对象,来实现通知部分线程。

    具体实现方式:
    假设有两个Condition对象:ConditionA和ConditionB。那么由ConditionA.await()方法进入等待状态的线程,由ConditionA.signalAll()通知唤醒;由ConditionB.await()方法进入等待状态的线程,由ConditionB.signalAll()通知唤醒。篇幅有限,代码示例就不写了。

    二、生产者/消费者模式实现

    1.一生产与一消费

    下面情形是一个生产者,一个消费者的模式。假设场景:一个String对象,其中生产者为其设置值,消费者拿走其中的值,不断的循环往复,实现生产者/消费者的情形。

    wait()/notify()实现

    生产者

    1. public class Product {
    2. private String lock;
    3. public Product(String lock) {
    4. super();
    5. this.lock = lock;
    6. }
    7. public void setValue(){
    8. try {
    9. synchronized (lock) {
    10. if(!StringObject.value.equals("")){
    11. //有值,不生产
    12. lock.wait();
    13. }
    14. String value = System.currentTimeMillis()+""+System.nanoTime();
    15. System.out.println("set的值是:"+value);
    16. StringObject.value = value;
    17. lock.notify();
    18. }
    19. } catch (InterruptedException e) {
    20. e.printStackTrace();
    21. }
    22. }
    23. }

    消费者

    1. public class Consumer {
    2. private String lock;
    3. public Consumer(String lock) {
    4. super();
    5. this.lock = lock;
    6. }
    7. public void getValue(){
    8. try {
    9. synchronized (lock) {
    10. if(StringObject.value.equals("")){
    11. //没值,不进行消费
    12. lock.wait();
    13. }
    14. System.out.println("get的值是:"+StringObject.value);
    15. StringObject.value = "";
    16. lock.notify();
    17. }
    18. } catch (InterruptedException e) {
    19. e.printStackTrace();
    20. }
    21. }
    22. }

    生产者线程

    1. public class ThreadProduct extends Thread{
    2. private Product product;
    3. public ThreadProduct(Product product) {
    4. super();
    5. this.product = product;
    6. }
    7. @Override
    8. public void run() {
    9. //死循环,不断的生产
    10. while(true){
    11. product.setValue();
    12. }
    13. }
    14. }

    消费者线程

    1. public class ThreadConsumer extends Thread{
    2. private Consumer consumer;
    3. public ThreadConsumer(Consumer consumer) {
    4. super();
    5. this.consumer = consumer;
    6. }
    7. @Override
    8. public void run() {
    9. //死循环,不断的消费
    10. while(true){
    11. consumer.getValue();
    12. }
    13. }
    14. }

    开启生产者/消费者模式

    1. public class Test {
    2. public static void main(String[] args) throws InterruptedException {
    3. String lock = new String("");
    4. Product product = new Product(lock);
    5. Consumer consumer = new Consumer(lock);
    6. ThreadProduct pThread = new ThreadProduct(product);
    7. ThreadConsumer cThread = new ThreadConsumer(consumer);
    8. pThread.start();
    9. cThread.start();
    10. }
    11. }

    输出结果:

    set的值是:148827033184127168687409691
    get的值是:148827033184127168687409691
    set的值是:148827033184127168687449887
    get的值是:148827033184127168687449887
    set的值是:148827033184127168687475117
    get的值是:148827033184127168687475117

    Condition方式实现类似,篇幅有限不全部贴出来。

    2.多生产与多消费

    特殊情况: 按照上述一生产与一消费的情况,通过创建多个生产者和消费者线程,实现多生产与多消费的情况,将会出现“假死”。

    具体原因: 多个生产者和消费者线程。当全部运行后,生产者线程生产数据后,可能唤醒的同类即生产者线程。此时可能会出现如下情况:所有生产者线程进入等待状态,然后消费者线程消费完数据后,再次唤醒的还是消费者线程,直至所有消费者线程都进入等待状态,此时将进入“假死”。

    解决方法: 将notify()或signal()方法改为notifyAll()或signalAll()方法,这样就不怕因为唤醒同类而进入“假死”状态了。

    Condition方式实现
    生产者

    1. public class Product {
    2. private ReentrantLock lock;
    3. private Condition condition;
    4. public Product(ReentrantLock lock, Condition condition) {
    5. super();
    6. this.lock = lock;
    7. this.condition = condition;
    8. }
    9. public void setValue() {
    10. try {
    11. lock.lock();
    12. while (!StringObject.value.equals("")) {
    13. // 有值,不生产
    14. condition.await();
    15. }
    16. String value = System.currentTimeMillis() + "" + System.nanoTime();
    17. System.out.println("set的值是:" + value);
    18. StringObject.value = value;
    19. condition.signalAll();
    20. } catch (InterruptedException e) {
    21. e.printStackTrace();
    22. }finally {
    23. lock.unlock();
    24. }
    25. }
    26. }

    消费者

    1. public class Consumer {
    2. private ReentrantLock lock;
    3. private Condition condition;
    4. public Consumer(ReentrantLock lock,Condition condition) {
    5. super();
    6. this.lock = lock;
    7. this.condition = condition;
    8. }
    9. public void getValue(){
    10. try {
    11. lock.lock();
    12. while(StringObject.value.equals("")){
    13. //没值,不进行消费
    14. condition.await();
    15. }
    16. System.out.println("get的值是:"+StringObject.value);
    17. StringObject.value = "";
    18. condition.signalAll();
    19. } catch (InterruptedException e) {
    20. e.printStackTrace();
    21. }finally {
    22. lock.unlock();
    23. }
    24. }
    25. }

    生产者线程和消费者线程与一生产一消费的模式相同。

    开启多生产/多消费模式

    1. public static void main(String[] args) throws InterruptedException {
    2. ReentrantLock lock = new ReentrantLock();
    3. Condition newCondition = lock.newCondition();
    4. Product product = new Product(lock,newCondition);
    5. Consumer consumer = new Consumer(lock,newCondition);
    6. for(int i=0;i<3;i++){
    7. ThreadProduct pThread = new ThreadProduct(product);
    8. ThreadConsumer cThread = new ThreadConsumer(consumer);
    9. pThread.start();
    10. cThread.start();
    11. }
    12. }

    输出结果:

    set的值是:148827212374628960540784817
    get的值是:148827212374628960540784817
    set的值是:148827212374628960540810047
    get的值是:148827212374628960540810047

    可见交替地进行get/set实现多生产/多消费模式。

    注意:相比一生产一消费的模式,改动了两处。①signal()—>signalAll()避免进入“假死”状态。②if()判断—>while()循环,重新判断条件,避免逻辑混乱。

    以上就是Java线程间通信的相关知识,以生产者/消费者模式为例,讲解线程间通信的使用以及注意事项。