Chiriri's blog Chiriri's blog
首页
  • Java

    • JavaSE
    • JavaEE
    • 设计模式
  • Python

    • Python
    • Python模块
    • 机器学习
  • Golang

    • Golang
    • gRPC
  • 服务器

    • Linux
    • MySQL
    • NoSQL
    • Kubernetes
  • 项目

    • 传智健康
    • 畅购商城
  • Hadoop生态

    • Hadoop
    • Zookeeper
    • Hive
    • Flume
    • Kafka
    • Azkaban
    • Hbase
    • Scala
    • Spark
    • Flink
  • 大数据项目

    • 离线数仓
  • 青训营

    • 第四届青训营
  • HTML

    • HTML
    • JavaScript
  • Vue

    • Vue2
    • TypeScript
    • Vue3
    • Uni-APP
  • 数据结构与算法
  • C语言
  • 考研数据结构
  • 计算机组成原理
  • 计算机操作系统
  • Java基础

    • Java基础
    • Java集合
    • JUC
    • JVM
  • 框架

    • Spring
    • Dubbo
    • Spring Cloud
  • 数据库

    • MySQL
    • Redis
    • Elasticesearch
  • 消息队列

    • RabbitMQ
    • RocketMQ
  • 408

    • 计算机网络
    • 操作系统
    • 算法
  • 分类
  • 标签
  • 归档
  • 导航站
GitHub (opens new window)

Iekr

苦逼后端开发
首页
  • Java

    • JavaSE
    • JavaEE
    • 设计模式
  • Python

    • Python
    • Python模块
    • 机器学习
  • Golang

    • Golang
    • gRPC
  • 服务器

    • Linux
    • MySQL
    • NoSQL
    • Kubernetes
  • 项目

    • 传智健康
    • 畅购商城
  • Hadoop生态

    • Hadoop
    • Zookeeper
    • Hive
    • Flume
    • Kafka
    • Azkaban
    • Hbase
    • Scala
    • Spark
    • Flink
  • 大数据项目

    • 离线数仓
  • 青训营

    • 第四届青训营
  • HTML

    • HTML
    • JavaScript
  • Vue

    • Vue2
    • TypeScript
    • Vue3
    • Uni-APP
  • 数据结构与算法
  • C语言
  • 考研数据结构
  • 计算机组成原理
  • 计算机操作系统
  • Java基础

    • Java基础
    • Java集合
    • JUC
    • JVM
  • 框架

    • Spring
    • Dubbo
    • Spring Cloud
  • 数据库

    • MySQL
    • Redis
    • Elasticesearch
  • 消息队列

    • RabbitMQ
    • RocketMQ
  • 408

    • 计算机网络
    • 操作系统
    • 算法
  • 分类
  • 标签
  • 归档
  • 导航站
GitHub (opens new window)
  • JavaSE

  • JavaEE

  • Linux

  • MySQL

  • NoSQL

  • Python

  • Python模块

  • 机器学习

  • 设计模式

  • 传智健康

  • 畅购商城

  • 博客项目

  • JVM

  • JUC

    • 多线程
    • CompletableFuture
      • Future和Callable接口
        • get()阻塞
        • isDone()轮询
      • 对Future的改进
        • CompletableFuture和CompletionStage
        • 接口CompletionStage
        • 类CompletableFuture
      • 核心的四个静态方法,来创建一个异步操作
        • runAsync 无返回值
        • supplyAsync 有返回值
        • 减少阻塞和轮询
        • CompletableFuture的优点
      • 案例精讲
        • 函数式编程
        • Runnable
        • Function
        • Consumer
        • Supplier
        • BiConsumer
        • 小结
        • 比价网站
      • CompletableFuture常用方法
        • 获得结果和触发计算
        • 获取结果
        • 主动触发计算
        • 对计算结果进行处理
        • thenApply
        • handle
        • 小结
        • 对计算结果进行消费
        • Code之任务之间的顺序执行
        • 对计算速度进行选用
        • 对计算结果进行合并
    • 锁
    • LockSupport与线程中断
    • Java内存模型之JMM
    • Volatile与Java内存模型
    • CAS
    • 原子操作类
    • ThreadLocal
    • Java对象内存布局和对象头
    • Synchronized与锁升级
    • AbstractQueuedSynchronizer之AQS
    • ReentrantLock、ReentrantReadWriteLock、StampedLock讲解
    • JUC总结
    • 并发集合
  • Golang

  • Kubernetes

  • 硅谷课堂

  • C

  • 源码

  • 神领物流

  • RocketMQ

  • 短链平台

  • 后端
  • JUC
Iekr
2023-12-03
目录

CompletableFuture

# CompletableFuture

# Future 和 Callable 接口

Future 接口定义了操作异步任务执行一些方法,如获取异步任务的执行结果、取消任务的执行、判断任务是否被取消、判断任务执行是否完毕等。

Callable 接口中定义了需要有返回的任务需要实现的方法。

image-20231203190646517

比如主线程让一个子线程去执行任务,子线程可能比较耗时,启动子线程开始执行任务后,主线程就去做其他事情了,过了一会才去获取子任务的执行结果。

Future 接口

image-20231203190718986

image-20231203190723728

# get () 阻塞

一旦调用 get () 方法,不管是否计算完成都会导致阻塞

package com.zzyy.study.test;

import java.util.concurrent.*;

/**
 * @auther zzyy
 * @create 2020-06-14 17:02
 */
public class CompletableFutureDemo
{
    public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException
    {
        FutureTask<String> futureTask = new FutureTask<>(() -> {
            System.out.println("-----come in FutureTask");
            try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); }
            return ""+ThreadLocalRandom.current().nextInt(100);
        });

        Thread t1 = new Thread(futureTask,"t1");
        t1.start();

        //3秒钟后才出来结果,还没有计算你提前来拿(只要一调用get方法,对于结果就是不见不散,会导致阻塞)
        //System.out.println(Thread.currentThread().getName()+"\t"+futureTask.get());

        //3秒钟后才出来结果,我只想等待1秒钟,过时不候
        System.out.println(Thread.currentThread().getName()+"\t"+futureTask.get(1L,TimeUnit.SECONDS));

        System.out.println(Thread.currentThread().getName()+"\t"+" run... here");

    }
}
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32

# isDone () 轮询

轮询的方式会耗费无谓的 CPU 资源,而且也不见得能及时地得到计算结果。如果想要异步获取结果,通常都会以轮询的方式去获取结果,尽量不要阻塞。

package com.zzyy.study.test;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

/**
 * @auther zzyy
 * @create 2020-06-16 20:16
 */
public class CompletableFutureDemo2
{
    public static void main(String[] args) throws ExecutionException, InterruptedException
    {
        FutureTask<String> futureTask = new FutureTask<>(() -> {
            System.out.println("-----come in FutureTask");
            try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); }
            return ""+ThreadLocalRandom.current().nextInt(100);
        });

        new Thread(futureTask,"t1").start();

        System.out.println(Thread.currentThread().getName()+"\t"+"线程完成任务");

        /**
         * 用于阻塞式获取结果,如果想要异步获取结果,通常都会以轮询的方式去获取结果
         */
        while(true)
        {
            if (futureTask.isDone())
            {
                System.out.println(futureTask.get());
                break;
            }
        }

    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39

# 对 Future 的改进

# CompletableFuture 和 CompletionStage

image-20231203190912615

image-20231203190917539

# 接口 CompletionStage

image-20231203190945169

代表异步计算过程中的某一个阶段,一个阶段完成以后可能会触发另外一个阶段,有些类似 Linux 系统的管道分隔符传参数。

# 类 CompletableFuture

image-20231203191017955

# 核心的四个静态方法,来创建一个异步操作

# runAsync 无返回值

public static CompletableFuture<Void> runAsync(Runnable runnable);
public static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor);
1
2

如果没有指定 Executor 的方法,直接使用默认的 ForkJoinPool.commonPool() 作为它的线程池执行异步代码。

如果指定线程池,则使用我们自定义的或者特别指定的线程池执行异步代码

案例

package com.zzyy.study.test;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

/**
 * @auther zzyy
 * @create 2020-06-16 20:16
 */
public class CompletableFutureDemo2
{
    public static void main(String[] args) throws ExecutionException, InterruptedException
    {
        CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
            System.out.println(Thread.currentThread().getName()+"\t"+"-----come in");
            //暂停几秒钟线程
            try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
            System.out.println("-----task is over");
        });
        System.out.println(future.get());
    }
}
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24

# supplyAsync 有返回值

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier);
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor);
1
2

如果没有指定 Executor 的方法,直接使用默认的 ForkJoinPool.commonPool() 作为它的线程池执行异步代码。

如果指定线程池,则使用我们自定义的或者特别指定的线程池执行异步代码

案例

package com.zzyy.study.test;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

/**
 * @auther zzyy
 * @create 2020-06-16 20:16
 */
public class CompletableFutureDemo2
{
    public static void main(String[] args) throws ExecutionException, InterruptedException
    {
        CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName() + "\t" + "-----come in");
            //暂停几秒钟线程
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return ThreadLocalRandom.current().nextInt(100);
        });

        System.out.println(completableFuture.get());
    }
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30

# 减少阻塞和轮询

从 Java8 开始引入了 CompletableFuture,它是 Future 的功能增强版,可以传入回调对象,当异步任务完成或者发生异常时,自动调用回调对象的回调方法

package com.atguigu.juc.senior.inner.completablefuture;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

/**
 * @auther zzyy
 * @create 2020-06-23 12:59
 */
public class cfuture4
{
    public static void main(String[] args) throws Exception
    {
        CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName() + "\t" + "-----come in");
            int result = ThreadLocalRandom.current().nextInt(10);
            //暂停几秒钟线程
            try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
            System.out.println("-----计算结束耗时1秒钟,result: "+result);
            if(result > 6)
            {
                int age = 10/0;
            }
            return result;
        }).whenComplete((v,e) ->{
            if(e == null)
            {
                System.out.println("-----result: "+v);
            }
        }).exceptionally(e -> {
            System.out.println("-----exception: "+e.getCause()+"\t"+e.getMessage());
            return -44;
        });

        //主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:暂停3秒钟线程
        try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39

# CompletableFuture 的优点

  • 异步任务结束时,会自动回调某个对象的方法;
  • 异步任务出错时,会自动回调某个对象的方法;
  • 主线程设置好回调后,不再关心异步任务的执行,异步任务之间可以顺序执行

# 案例精讲

从电商网站的比价需求说开去

# 函数式编程

在 JDK8 中函数式编程已经主流,Lambda +Stream + 链式调用 + Java8 函数式编程带走。

# Runnable

image-20231203191415453

# Function

image-20231203191430306

# Consumer

image-20231203191439156

# Supplier

image-20231203191447270

# BiConsumer

image-20231203191455363

# 小结

image-20231203191506269

# 比价网站

经常出现在等待某条 SQL 执行完成后,再继续执行下一条 SQL ,而这两条 SQL 本身是并无关系的,可以同时进行执行的。

我们希望能够两条 SQL 同时进行处理,而不是等待其中的某一条 SQL 完成后,再继续下一条。同理, 对于分布式微服务的调用,按照实际业务,如果是无关联 step by step 的业务,可以尝试是否可以多箭齐发,同时调用。

我们去比同一个商品在各个平台上的价格,要求获得一个清单列表, 1 step by step,查完京东查淘宝,查完淘宝查天猫......

2 all 一口气同时查询。。。。。

import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

/**
 * @auther zzyy
 * @create 2020-06-28 10:07
 */
public class T1
{
    static List<NetMall> list = Arrays.asList(
            new NetMall("jd"),
            new NetMall("tmall"),
            new NetMall("pdd"),
            new NetMall("mi")
    );

    public static List<String> findPriceSync(List<NetMall> list,String productName)
    {
        return list.stream().map(mall -> String.format(productName+" %s price is %.2f",mall.getNetMallName(),mall.getPriceByName(productName))).collect(Collectors.toList());
    }

    public static List<String> findPriceASync(List<NetMall> list,String productName)
    {
        return list.stream().map(mall -> CompletableFuture.supplyAsync(() -> String.format(productName + " %s price is %.2f", mall.getNetMallName(), mall.getPriceByName(productName)))).collect(Collectors.toList()).stream().map(CompletableFuture::join).collect(Collectors.toList());
    }


    public static void main(String[] args)
    {
        long startTime = System.currentTimeMillis();
        List<String> list1 = findPriceSync(list, "thinking in java");
        for (String element : list1) {
            System.out.println(element);
        }
        long endTime = System.currentTimeMillis();
        System.out.println("----costTime: "+(endTime - startTime) +" 毫秒");

        long startTime2 = System.currentTimeMillis();
        List<String> list2 = findPriceASync(list, "thinking in java");
        for (String element : list2) {
            System.out.println(element);
        }
        long endTime2 = System.currentTimeMillis();
        System.out.println("----costTime: "+(endTime2 - startTime2) +" 毫秒");
    }
}

class NetMall
{
    @Getter
    private String netMallName;

    public NetMall(String netMallName)
    {
        this.netMallName = netMallName;
    }

    public double getPriceByName(String productName)
    {
        return calcPrice(productName);
    }

    private double calcPrice(String productName)
    {
        try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
        return ThreadLocalRandom.current().nextDouble() + productName.charAt(0);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69

# CompletableFuture 常用方法

# 获得结果和触发计算

# 获取结果

public T get(); // 获取结果
public T get(long timeout, TimeUnit unit); // 设置超时时间
1
2

getNow()

/**
没有计算完成的情况下,给我一个替代结果
立即获取结果不阻塞;计算完,返回计算完成后的结果;没算完,返回设定的valueIfAbsent值
*/
public T getNow(T valueIfAbsent); 

// 案例
package com.zzyy.study.test;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

/**
 * @auther zzyy
 * @create 2020-06-16 20:16
 */
public class CompletableFutureDemo2
{
    public static void main(String[] args) throws ExecutionException, InterruptedException
    {
        CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
            try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
            return 533;
        });

        //去掉注释上面计算没有完成,返回444
        //开启注释上满计算完成,返回计算结果
        try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); }

        System.out.println(completableFuture.getNow(444));


    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35

join()

public T join();

 
package com.zzyy.study.test;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

/**
 * @auther zzyy
 * @create 2020-06-16 20:16
 */
public class CompletableFutureDemo2
{
    public static void main(String[] args) throws ExecutionException, InterruptedException
    {
        System.out.println(CompletableFuture.supplyAsync(() -> "abc").thenApply(r -> r + "123").join());
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

# 主动触发计算

public boolean complete(T value); // 是否打断get方法立即返回括号值

// 案例
 
package com.zzyy.study.test;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

/**
 * @auther zzyy
 * @create 2020-06-16 20:16
 */
public class CompletableFutureDemo2
{
    public static void main(String[] args) throws ExecutionException, InterruptedException
    {
        CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
            try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
            return 533;
        });

        //注释掉暂停线程,get还没有算完只能返回complete方法设置的444;暂停2秒钟线程,异步线程能够计算完成返回get
        try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); }

        //当调用CompletableFuture.get()被阻塞的时候,complete方法就是结束阻塞并get()获取设置的complete里面的值.
        System.out.println(completableFuture.complete(444)+"\t"+completableFuture.get());


    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32

# 对计算结果进行处理

# thenApply

计算结果存在依赖关系,这两个线程串行化,由于存在依赖关系 (当前步错,不走下一步),当前步骤有异常的话就叫停。

package com.zzyy.study.test;

import java.util.concurrent.*;

/**
 * @auther zzyy
 * @create 2020-06-16 20:16
 */
public class CompletableFutureDemo2
{
public static void main(String[] args) throws ExecutionException, InterruptedException
{
    //当一个线程依赖另一个线程时用 thenApply 方法来把这两个线程串行化,
    CompletableFuture.supplyAsync(() -> {
        //暂停几秒钟线程
        try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
        System.out.println("111");
        return 1024;
    }).thenApply(f -> {
        System.out.println("222");
        return f + 1;
    }).thenApply(f -> {
        //int age = 10/0; // 异常情况:那步出错就停在那步。
        System.out.println("333");
        return f + 1;
    }).whenCompleteAsync((v,e) -> {
        System.out.println("*****v: "+v);
    }).exceptionally(e -> {
        e.printStackTrace();
        return null;
    });

    System.out.println("-----主线程结束,END");

    // 主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:
    try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); }
}
    
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40

# handle

有异常也可以往下一步走,根据带的异常参数可以进一步处理

package com.zzyy.study.test;

import lombok.Getter;
import lombok.Setter;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

/**
 * @auther zzyy
 * @create 2020-06-16 20:16
 */
public class CompletableFutureDemo2
{

    public static void main(String[] args) throws ExecutionException, InterruptedException
    {
        //当一个线程依赖另一个线程时用 handle 方法来把这两个线程串行化,
        // 异常情况:有异常也可以往下一步走,根据带的异常参数可以进一步处理
        CompletableFuture.supplyAsync(() -> {
            //暂停几秒钟线程
            try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
            System.out.println("111");
            return 1024;
        }).handle((f,e) -> {
            int age = 10/0;
            System.out.println("222");
            return f + 1;
        }).handle((f,e) -> {
            System.out.println("333");
            return f + 1;
        }).whenCompleteAsync((v,e) -> {
            System.out.println("*****v: "+v);
        }).exceptionally(e -> {
            e.printStackTrace();
            return null;
        });

        System.out.println("-----主线程结束,END");

        // 主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:
        try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); }
    }
 

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47

# 小结

image-20231203192155682

whenComplete 和 whenCompleteAsync 的区别:

  • whenComplete: 是执行当前任务的线程执行继续执行 whenComplete 的任务。
  • whenCompleteAsync: 是执行把 whenCompleteAsync 这个任务继续提交给线程池来进行执行

# 对计算结果进行消费

thenAccept() 接收任务的处理结果,并消费处理,无返回结果

public static void main(String[] args) throws ExecutionException, InterruptedException
{
    CompletableFuture.supplyAsync(() -> {
        return 1;
    }).thenApply(f -> {
        return f + 2;
    }).thenApply(f -> {
        return f + 3;
    }).thenApply(f -> {
        return f + 4;
    }).thenAccept(r -> System.out.println(r));
}
1
2
3
4
5
6
7
8
9
10
11
12

# Code 之任务之间的顺序执行

  • thenRun: thenRun(Runnable runnable) 任务 A 执行完执行 B,并且 B 不需要 A 的结果
  • thenAccept: thenAccept(Consumer action) 任务 A 执行完执行 B,B 需要 A 的结果,但是任务 B 无返回值
  • thenApply: thenApply(Function fn) :任务 A 执行完执行 B,B 需要 A 的结果,同时任务 B 有返回值
System.out.println(CompletableFuture.supplyAsync(() -> "resultA").thenRun(() -> {}).join());
 

System.out.println(CompletableFuture.supplyAsync(() -> "resultA").thenAccept(resultA -> {}).join());


System.out.println(CompletableFuture.supplyAsync(() -> "resultA").thenApply(resultA -> resultA + " resultB").join());
1
2
3
4
5
6
7

# 对计算速度进行选用

谁快用谁 applyToEither()

package com.zzyy.study.test;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

/**
 * @auther zzyy
 * @create 2020-06-16 20:16
 */
public class CompletableFutureDemo2
{
    public static void main(String[] args) throws ExecutionException, InterruptedException
    {
        CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName() + "\t" + "---come in ");
            //暂停几秒钟线程
            try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); }
            return 10;
        });

        CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName() + "\t" + "---come in ");
            try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
            return 20;
        });

        CompletableFuture<Integer> thenCombineResult = completableFuture1.applyToEither(completableFuture2,f -> {
            System.out.println(Thread.currentThread().getName() + "\t" + "---come in ");
            return f + 1;
        });

        System.out.println(Thread.currentThread().getName() + "\t" + thenCombineResult.get());
    }
}
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36

# 对计算结果进行合并

两个 CompletionStage 任务都完成后,最终能把两个任务的结果一起交给 thenCombine 来处理,先完成的先等着,等待其它分支任务。

thenCombine()

package com.zzyy.study.test;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

/**
 * @auther zzyy
 * @create 2020-06-16 20:16
 */
public class CompletableFutureDemo2
{
    public static void main(String[] args) throws ExecutionException, InterruptedException
    {
        CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName() + "\t" + "---come in ");
            return 10;
        });

        CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName() + "\t" + "---come in ");
            return 20;
        });

        CompletableFuture<Integer> thenCombineResult = completableFuture1.thenCombine(completableFuture2, (x, y) -> {
            System.out.println(Thread.currentThread().getName() + "\t" + "---come in ");
            return x + y;
        });
        
        System.out.println(thenCombineResult.get());
    }
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32

code 表达式

package com.zzyy.study.test;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

/**
 * @auther zzyy
 * @create 2020-06-16 20:16
 */
public class CompletableFutureDemo2
{
    public static void main(String[] args) throws ExecutionException, InterruptedException
    {
        CompletableFuture<Integer> thenCombineResult = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName() + "\t" + "---come in 1");
            return 10;
        }).thenCombine(CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName() + "\t" + "---come in 2");
            return 20;
        }), (x,y) -> {
            System.out.println(Thread.currentThread().getName() + "\t" + "---come in 3");
            return x + y;
        }).thenCombine(CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName() + "\t" + "---come in 4");
            return 30;
        }),(a,b) -> {
            System.out.println(Thread.currentThread().getName() + "\t" + "---come in 5");
            return a + b;
        });
        System.out.println("-----主线程结束,END");
        System.out.println(thenCombineResult.get());


        // 主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:
        try { TimeUnit.SECONDS.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); }
    }
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
编辑 (opens new window)
上次更新: 2023/12/06, 01:31:48
多线程
锁

← 多线程 锁→

最近更新
01
k8s
06-06
02
进程与线程
03-04
03
计算机操作系统概述
02-26
更多文章>
Theme by Vdoing | Copyright © 2022-2025 Iekr | Blog
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式