并发编程系列之线程之间的通信

>>强大,10k+点赞的 SpringBoot 后台管理系统竟然出了详细教程!
点击蓝字关注我们

并发编程系列之线程之间的通信

前言

上节我们介绍了线程从创建到结束的过程,介绍了几种常见的启动和终止方法,我们知道了如何使用一个线程,那么今天我们再接下来看看1个或者多个线程之间是如何进行通信的?OK,让我们一起走进今天的并发之旅吧,祝您旅途愉快。

并发编程系列之线程之间的通信

景点一:共享变量(volatile)

关键字volatile可以用来修饰一个共享的变量字段,使用volatile修饰的变量,当访问该变量时需要从共享内存中获取最新值,而对该变量的更新,必须同步刷新回共享内存,这样就能保证所有线程对该变量访问的可见性;结合示例代码更容易理解:

public class RunThread extends Thread{

 private volatile static boolean isRunning = true;
 private void setRunning(boolean isRunning){
   RunThread.isRunning = isRunning;
 }
 
 public void run(){
   System.out.println("进入run方法..");
   while(isRunning == true){
   }
   System.out.println("线程感知isRunning值被设置成false,线程停止!!!");
 }
 
 public static void main(String[] args) throws InterruptedException {
   RunThread runThread1 = new RunThread();
   RunThread runThread2 = new RunThread();
   runThread1.start();
   runThread2.start();
   // 主线程睡眠1秒之后线程1将标识设为false
   Thread.sleep(1000);
   runThread1.setRunning(false);
   System.out.println("isRunning的值已经被设置了false");
   Thread.sleep(2000);
   System.out.println(runThread1.isAlive());
   System.out.println(runThread2.isAlive());
 }
}

结果:
进入run方法..
进入run方法..
isRunning的值已经被设置了false
线程感知isRunning值被设置成false,线程停止!!!
线程感知isRunning值被设置成false,线程停止!!!
false
false

我们可以看到,开启2个线程,线程运行终止的条件是isRunning是否为false,当2个线程都启动运行之后,主线程睡眠1秒,然后线程1将isRunning设为false,会发现,线程1和线程2立马感应到了isRunning值的更新,结束了线程的运行;

当去掉volatile修饰共享变量时,结果如下:2个线程都未感应isRunning的变化。

并发编程系列之线程之间的通信

并发编程系列之线程之间的通信

景点二:同步代码(synchronized)

synchronized修饰的方法或者代码块,主要是确保多个线程在同一时刻,只能有一个线程处于方法或者代码块中,保证了线程对变量访问的可见性和排他性,我们来看下面这段代码示例:

public class SynchronizedDemo {

   // 方法1 线程1将调用
   public synchronized void method1() {
       try {
           System.out.println(Thread.currentThread().getName() + "执行method1方法,并获取锁,主线程睡眠4秒");
           Thread.sleep(4000);
           System.out.println(Thread.currentThread().getName() + "释放锁");
       } catch (InterruptedException e) {
           e.printStackTrace();
       }
   }

   // 方法2 线程2将调用
   public synchronized void method2() {
       System.out.println(Thread.currentThread().getName() + "执行method2方法");
   }

   public static void main(String[] args) throws Exception {
       final SynchronizedDemo synchronizedDemo = new SynchronizedDemo();
       Thread t1 = new Thread(new Runnable() {
           @Override
           public void run() {
               synchronizedDemo.method1();
           }
       }, "线程1");
       // 为了保证线程1和2的执行顺序我让线程2延迟0.5秒再启动
       Thread.sleep(500);
       Thread t2 = new Thread(new Runnable() {
           @Override
           public void run() {
               synchronizedDemo.method2();
           }
       }, "线程2");
       t1.start();
       t2.start();
   }
}

执行结果如下:

并发编程系列之线程之间的通信

当我们把method2方法的synchronized去掉之后,再看执行结果:

并发编程系列之线程之间的通信

从上面示例我们就能看出:当加了synchronized修饰之后线程1首先启动并持有对象SynchronizedDemo的对象锁,线程2启动时,发现锁被线程1占用,处于等待状态,等待3.4秒之后线程1释放锁,线程2获得锁,执行method2方法。本示例中,synchronized同步的是普通方法,所以持有的是当前实例对象锁,如果是方法块,锁是synchronized包含的代码块。

我们去就提到过,synchronized是通过Monitor对象来实现同步的,那么我们今天来对这点做个补充,我们看下Monitor是如何工作的,如下图:

并发编程系列之线程之间的通信

总结:任意一个线程对Object对象(synchronized修饰)的访问,首先要获得Object对象的监视器,如果获取失败,该线程将进入同步队列等待,该线程状态变为阻塞(BLOCKED),当访问Object的前(已经获得锁的线程)线程释放了锁,则唤醒阻塞队列中的线程,使阻塞线程重新尝试对监视器的获取;

并发编程系列之线程之间的通信

景点三:等待/通知

等待/通知机制指的是一个线程1调用了对象的wait()方法进入等待状态,另一个线程2调用该对象的notifyAll()方法,线程1收到了通知之后从对象的wait()方法返回,进而执行后续的操作,两个线程通过对象来完成交互,而对象上的wait和notify/notifyAll的关系就像一个开关信号一样,用来完成等待方和通知方之间的交互工作。我们先来看看这几个方法:

  • notify():通知一个在对象上等待的线程,由WAITING状态变为BLOCKING状态,从等待队列移动到同步队列,等待CPU调度获取该对象的锁,当该线程获取到了对象的锁后,该线程从wait()方法返回;

  • notifyAll():通知所有等待在该对象上的线程,由WAITING状态变为BLOCKING状态,等待CPU调度获取该对象的锁;

  • wait():调用该方法的线程进入WAITING状态,并将当前线程放置到对象的等待队列,只有等待另外线程的通知或被中断才会返回,(需要注意,调用wait()方法后,会释放对象的锁)

  • wait(long):超时等待一段时间,参数为毫秒,也就是等待长达n毫秒,如果没有通知就超时返回;

  • wait(long,int):第一个参数为毫秒,第二个参数为纳秒,对超时返回更细粒度的控制;

那么这几个方法如何使用呢?我们来看下面2个示例:

public class WaitAndNotifyDemo {
   // 定义一个list属性
   private volatile static List list = new ArrayList();
   // 提供一个add方法
   public void add() {
       list.add("justin");
   }
   // 获取list的大小
   public int size() {
       return list.size();
   }

   public static void main(String[] args) {
       final WaitAndNotifyDemo list2 = new WaitAndNotifyDemo();
       final Object lock = new Object();
       Thread t1 = new Thread(new Runnable() {
           @Override
           public void run()
{
               try {
                   // 使用synchronized对象锁加锁
                   synchronized (lock) {
                       System.out.println("t1启动..");
                       for (int i = 0; i < 6; i++) {
                           // 往list中添加元素,当list中添加了5个元素之后,唤醒等待对象锁的线程2
                           list2.add();
                           System.out.println("当前线程:" + Thread.currentThread().getName() + "添加了第"+(i+1)+"个元素..");
                           Thread.sleep(500);
                           if (list2.size() == 5) {
                               System.out.println("已经发出通知..");
                               lock.notify();
                           }
                       }
                   }
               } catch (InterruptedException e) {
                   e.printStackTrace();
               }
           }
       }, "t1");

       Thread t2 = new Thread(new Runnable() {
           @Override
           public void run()
{
               // synchronized对象锁加锁吗,当list的大小不为5时,就等待,wait会释放lock锁,t1才能获得锁
               synchronized (lock) {
                   System.out.println("t2启动..");
                   if (list2.size() != 5) {
                       try {
                           lock.wait();
                       } catch (InterruptedException e) {
                           e.printStackTrace();
                       }
                   }
                   System.out.println("当前线程:" + Thread.currentThread().getName() + "收到通知线程停止..");
                   throw new RuntimeException();
               }
           }
       }, "t2");
       // 为了保证先启动线程2执行等待,所以我给线程1加了个延迟0.5秒启动
       t2.start();
       try {
           Thread.sleep(500);
           t1.start();
       } catch (InterruptedException e) {
           e.printStackTrace();
       }
   }
}

执行结果如下:

并发编程系列之线程之间的通信

我们再看另一个例子,利用等待/通知机制我们来自定义实现一个有界阻塞队列ArrayBlockingQueue:

public class DefinedArrayBlockingQueue {

   // 定义一个list
   private final LinkedList<Object> list = new LinkedList<Object>();
   // 用于原子操作计数器
   private final AtomicInteger count = new AtomicInteger(0);
   // 队列最大值和最小值
   private final int maxSize;
   private final int minSize = 0;
   // 用于synchronized对象锁
   private final Object lock = new Object();

   public DefinedArrayBlockingQueue(int maxSize) {
       this.maxSize = maxSize;
   }

   // 往队列中加入元素
   public void put(Object obj) {
       synchronized (lock) {
           // 当队列中元素达到最大值就等待状态
           while (count.get() == maxSize) {
               try {
                   lock.wait();
               } catch (InterruptedException e) {
                   e.printStackTrace();
               }
           }
           list.add(obj);
           // 计数器+1
           count.getAndIncrement();
           System.out.println(" 元素 " + obj + " 被添加 ");
           // 唤醒take方法的wait
           lock.notify();
       }
   }

   public Object take() {
       Object temp = null;
       synchronized (lock) {
           while (count.get() == minSize) {
               try {
                   lock.wait();
               } catch (InterruptedException e) {
                   e.printStackTrace();
               }
           }
           // 计数器-1
           count.getAndDecrement();
           temp = list.removeFirst();
           System.out.println(" 元素 " + temp + " 被消费 ");
           // 唤醒put方法里面的wait
           lock.notify();
       }
       return temp;
   }

   public int size() {
       return count.get();
   }

   public static void main(String[] args) throws Exception {
       // 定义一个自定义队列,最大长度设置为5
       final DefinedArrayBlockingQueue m = new DefinedArrayBlockingQueue(5);
       m.put("1");
       m.put("2");
       m.put("3");
       m.put("4");
       m.put("5");
       System.out.println("当前元素个数:" + m.size());
       Thread t1 = new Thread(new Runnable() {
           @Override
           public void run()
{
               m.put("6");
               m.put("7");
           }
       }, "t1");

       Thread t2 = new Thread(new Runnable() {
           @Override
           public void run()
{
               try {
                   Thread.sleep(1000);
                   m.take();
                   Thread.sleep(1000);
                   m.take();
               } catch (InterruptedException e) {
                   e.printStackTrace();
               }
           }
       }, "t2");
       t1.start();
       Thread.sleep(1000);
       t2.start();
   }
}

运行结果:

并发编程系列之线程之间的通信

我们再来分析下,等待唤醒机制,并做个总结,整个等待唤醒的过程如下:等待线程首先获取对象锁,然后调用对象的wait()方法,此时释放对象锁,由于等待线程释放了对象锁,唤醒线程随后获取到对象锁,并调用对象的notify()方法,将等待队列中的线程移到了同步队列中,此时等待线程状态的状态变为阻塞状态,唤醒线程释放了锁之后,等待线程再次获取到对象锁,并从wait()方法返回继续执行,过程如下图:

并发编程系列之线程之间的通信

并发编程系列之线程之间的通信

景点四:管道输入/输出流

管道输入输出流和普通文件的输入输出不同之处在于,它是作用于线程之间的数据传输,以内存作为传输媒介,主要包括下面4种具体实现:

  • PipedOutputStream(字节)

  • PipedInputStream(字节)

  • PipedReader(字符)

  • PipedWriter(字符)

我们来看下下面这个demo:

public class PipedDemo {

   public static void main(String[] args) throws IOException {
       // 定义一个输入流
       PipedWriter out = new PipedWriter();
       // 定义一个输出流
       PipedReader in = new PipedReader();
       // 输入输出建立连接
       out.connect(in);
       Thread printThread = new Thread(new PrintThread(in),"打印");
       printThread.start();
       int receive = 0;
       try {
           // 输入键盘字符
           System.out.println("请输入任意字符,并按enter键发送:");
           while ((receive = System.in.read()) != -1) {
               out.write(receive);
           }
       }finally {
           out.close();
       }
   }


   static class PrintThread implements Runnable{
       private PipedReader in;
       // 读取写入的字符
       public PrintThread(PipedReader in) {
           this.in = in;
       }

       @Override
       public void run()
{
           int receive = 0;
           try {
               while ((receive = in.read()) != -1){
                   System.out.print((char)receive);
               }
           } catch (IOException e) {
               e.printStackTrace();
           }
       }
   }
}

运行结果如下:

并发编程系列之线程之间的通信

并发编程系列之线程之间的通信

景点五:join方法

如果一个线程执行了thread.join()方法,就说明:当前主线程需要等待执行join方法的线程终止之后才从thread.join()中返回,主线程才继续往下执行,join主要包括下面3个方法:

  • join源码:

  • public final void join() throws InterruptedException {
           join(0);
       }

    join超时返回源码:

    public final synchronized void join(long millis)
       throws InterruptedException
    {
           long base = System.currentTimeMillis();
           long now = 0;

           if (millis < 0) {
               throw new IllegalArgumentException("timeout value is negative");
           }

           if (millis == 0) {
               while (isAlive()) {
                   wait(0);
               }
           } else {
               while (isAlive()) {
                   long delay = millis - now;
                   if (delay <= 0) {
                       break;
                   }
                   wait(delay);
                   now = System.currentTimeMillis() - base;
               }
           }
       }
  • join超时返回源码(更细粒度的控制超时时间):

    public final synchronized void join(long millis, int nanos)
       throws InterruptedException
    {
           if (millis < 0) {
               throw new IllegalArgumentException("timeout value is negative");
           }
           if (nanos < 0 || nanos > 999999) {
               throw new IllegalArgumentException(
                                   "nanosecond timeout value out of range");
           }
           if (nanos >= 500000 || (nanos != 0 && millis == 0)) {
               millis++;
           }
           join(millis);
       }


我们再看下如何使用join:

public class JoinDemo implements Runnable {
       @Override
       public void run()
{
           System.out.println("线程"+Thread.currentThread().getName()+":开始运行"+ DateUtil.getNowDate());
           try{
               Thread.sleep(5000);
           }catch(InterruptedException e){
               e.printStackTrace();
           }
           System.out.println("线程"+Thread.currentThread().getName()+":结束运行"+DateUtil.getNowDate());
       }

   public static void main(String[] args) {
       System.out.println("Main Thread Start..."+DateUtil.getNowDate());
       JoinDemo joinDemo = new JoinDemo();
       Thread t1 = new Thread(joinDemo,"t1");
       Thread t2 = new Thread(joinDemo,"t2");
       t1.start();
       t2.start();
   }
}

运行结果如下:

并发编程系列之线程之间的通信

我们使用join之后再看看运行结果会怎样:

并发编程系列之线程之间的通信

使用join之后我们发现,线程1没有返回之前,主线程一直在阻塞着,线程2一直没有启动,等到5秒线程1运行结束之后,主线程继续,线程2启动并执行。

并发编程系列之线程之间的通信

景点六:ThreadLocal本地线程

线程变量,以一个ThreadLocal对象为键,任意对象为值得存储结构,一个线程可以根据一个ThreadLocal对象查询到绑定在这个线程上的一个值,相当于每个线程都有一个独立的本地对象,这块内存是线程私有的。对其他线程是不可见的,线程可以利用这块内存做自己的事,而不会受其他线程干扰。

我们来看下面的demo:

public class ThreadLocalDemo {
   public static ThreadLocal<String> th = new ThreadLocal<String>();
   public void setTh(String value){
       th.set(value);
   }
   public void getTh(){
       System.out.println(Thread.currentThread().getName() + "的值:" + this.th.get());
   }

   public static void main(String[] args) throws InterruptedException {
       final ThreadLocalDemo ct = new ThreadLocalDemo();
       Thread t1 = new Thread(new Runnable() {
           @Override
           public void run()
{
               ct.setTh("Justin");
               ct.getTh();
           }
       }, "线程1");

       Thread t2 = new Thread(new Runnable() {
           @Override
           public void run()
{
               try {
                   Thread.sleep(1000);
                   //ct.setTh("Java");
                   ct.getTh();
               } catch (InterruptedException e) {
                   e.printStackTrace();
               }
           }
       }, "线程2");

       t1.start();
       t2.start();
   }
}

运行结果如下:结果很正常,对于同一个对象,线程1第一次赋值,线程2第二次赋值,分别输出第一次和第二次的结果。

并发编程系列之线程之间的通信

当我们把线程2赋值注释起来,按道理是应该会拿到线程1赋的值,但是我们看下面的结果:很明显线程2拿到个null,说明线程2没获取到线程1的值,这相当于,虽然是同一个对象,但是在两个线程中却有着不同的副本。

并发编程系列之线程之间的通信


今天的旅途就到这了,我们已经逛完了线程之间通信的所有景点,感谢光临,下次再见!!!

并发编程系列之线程之间的通信

相关旅程:

并发编程系列之线程简介

并发编程系列之线程的启动终止



原文始发于微信公众号(Justin的后端书架):并发编程系列之线程之间的通信