Skip to content

线程的创建方式

继承Thread类方式

java
package com.java.thread;

public class ExtThread extends Thread {
    @Override
    public void run() {
        System.out.println(this.getClass().getName() + "_当前线程id:" + Thread.currentThread().getId());
    }
}
//测试
new ExtThread().start();

实现runnable方式

java
package com.java.thread;

public class ImpRunnable implements Runnable {
    @Override
    public void run() {
        System.out.println(this.getClass().getName() + "_当前线程id:" + Thread.currentThread().getId());
    }
}
//测试
new Thread(new ImpRunnable()).start();

实现callable方式(有返回值)

返回值对象

java
package com.java.thread.callable;

import java.io.Serializable;

public class User implements Serializable {
    private String name;
    private String age;

    public User(String name, String age) {
        this.name = name;
        this.age = age;
    }


    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getAge() {
        return age;
    }

    public void setAge(String age) {
        this.age = age;
    }

    @Override
    public String toString() {
        return "User{" +
                "name='" + name + '\'' +
                ", age='" + age + '\'' +
                '}';
    }
}

实现callable

java
package com.java.thread.callable;

import java.util.concurrent.Callable;

public class ImpCallable implements Callable<User> {
    @Override
    public User call() throws Exception {
        System.out.println(this.getClass().getName() + "_当前线程id:" + Thread.currentThread().getId());
        return new User("小明","18");
    }
}
//测试
FutureTask<User> futureTask = new FutureTask<User>(new ImpCallable());
Thread t = new Thread(futureTask);
t.start();
System.out.println("线程id:" + t.getId() + ",_对象:" + futureTask.get().getClass().getName() + ",[futureTask]返回结果:" + futureTask.get().toString());

线程池工具类-Executors

java
public static ExecutorService service = Executors.newFixedThreadPool(10);
//测试
service.execute(new ImpRunnable());

线程池-ThreadPoolExecutor

java
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5,
                10,
                10,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(100000),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy()
        );

参数说明

int corePoolSize:

核心线程数 [5],除非设置 (allowCoreThreadTimeOut-boolean), 线程池创建,当前 [5] 个线程就已经就绪。

int maximumPoolSize:

最大线程数量 [10]

long keepAliveTime:

当线程的数量大于 (maximumPoolSize-corePoolSize)核心线程[5] 数量,空闲时间大于keepAliveTime,则释放空闲的线程。

TimeUnit unit:

指定时间单位。

workQueue:
java
new LinkedBlockingQueue<Runnable>(100000)

阻塞队列,存放任务数据。当线程超出最大线程数**[10],这时候不会处理任务,等待有空闲的线程再去处理任务。默认int最大值,**最好设置默认值

ThreadFactory threadFactory:

线程的创建工厂。

RejectedExecutionHandler handler:

如果任务队列已经满了,按照指定的拒绝策略处理任务。

CompletableFuture异步任务

创建异步对象

无返回值
java
public static ExecutorService service = Executors.newFixedThreadPool(10);

CompletableFuture<Void> voidCompletableFuture = CompletableFuture.runAsync(() -> {
            System.out.println(Thread.currentThread().getName() + "_当前线程id:" + Thread.currentThread().getId());
        }, service);
有返回值
java
CompletableFuture<User> completableFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName() + "_当前线程id:" + Thread.currentThread().getId());
            return new User("小明", "18");
        }, service);
System.out.println(completableFuture.get().toString());

成功感知-whenComplete

java
CompletableFuture<User> completableFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName() + "_当前线程id:" + Thread.currentThread().getId());
            return new User("小明", "18");
        }, service).whenComplete((result, exception) -> {
            System.out.println("异步线程结束了:" + result.toString() + "_异常信息:" + exception);
        });
//测试
pool-1-thread-1_当前线程id:12
异步线程结束了:User{name='小明', age='18'}_异常信息:null

异常感知-exceptionally

java
//异常情况处理
CompletableFuture<Integer> completableFutureError = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName() + "_当前线程id:" + Thread.currentThread().getId());
            int i = 1 / 0;
            return i;
        }, service).whenComplete((result, exception) -> {
            System.out.println("异步线程结束了:" + result + "_异常信息:" + exception);
        }).exceptionally(exception -> {
            //处理异常返回
            return 8;
        });
System.out.println(completableFutureError.get());
//测试
 	pool-1-thread-1_当前线程id:12
 	异步线程结束了:null_异常信息:java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
	8

完成处理-handle

java
CompletableFuture<Integer> completableFutureError = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName() + "_当前线程id:" + Thread.currentThread().getId());
            int i = 1 / 0;
            return i;
        }, service).handle((result, ex) -> {
            if (result != null) {
                return result * 3;
            }
            if (ex != null) {
                return 8;
            }
            return 0;
        });

线程串行化-thenRunAsync

java
//任务2不能接收任务1的结果 
CompletableFuture<Void> voidCompletableFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println("执行任务1");
            return 2;
        }, service).thenRunAsync(() -> {
            System.out.println("执行任务2");
        }, service);

线程串行化-thenRunAsync

java
 CompletableFuture<Void> voidCompletableFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println("执行任务1");
            return 2;
        }, service).thenAcceptAsync((result) -> {
            System.out.println("任务1的结果:"+result);
            System.out.println("执行任务2");
        }, service);

线程串行化-thenApplyAsync

java
CompletableFuture<Object> completableFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println("执行任务1");
            return 2;
        }, service).thenApplyAsync((result) -> {
            System.out.println("任务1的结果:" + result);
            System.out.println("执行任务2");
            return result + 8;
        }, service);

System.out.println(completableFuture.get());
//测试
10

任务组合-完成后在执行

java
CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() -> {
            System.out.println("执行任务1:" + 1);
            return 1;
        }, service);

CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(() -> {
            System.out.println("执行任务2:" + 2);
            return 2;
        }, service);

//组合完成后执行-无结果
completableFuture1.runAfterBothAsync(completableFuture2, () -> {
            System.out.println("执行任务3");
        }, service);

//组合完成后执行-获取任务1+任务2结果
completableFuture1.thenAcceptBothAsync(completableFuture2, (result1, result2) -> {
            int sum = result1 + result2;
            System.out.println("执行任务3=任务1+任务2:" + sum);
        }, service);

//组合完成后执行-获取任务1+任务2结果,并返回
CompletableFuture<Integer> integerCompletableFuture = completableFuture1.thenCombineAsync(completableFuture2, (result1, result2) -> {
            System.out.println("任务1:"+result1+",任务2:"+result2);
            int sum = result1 + result2 + 10;
            System.out.println("执行任务3_=任务1+任务2:" + sum);
            return sum;
        }, service);

System.out.println(integerCompletableFuture.get());

任务组合-任意一个完成执行

注意线程数

java
 public static ExecutorService service = Executors.newFixedThreadPool(3);
java
completableFuture1.runAfterEitherAsync(completableFuture2, () -> {
            System.out.println("执行任务3");
        }, service);

completableFuture1.acceptEitherAsync(completableFuture2, (res) -> {
            System.out.println("任务1或者任务2:" + res);
            System.out.println("执行任务3");
        }, service);

CompletableFuture<Integer> completableFuture3 = completableFuture1.applyToEitherAsync(completableFuture2, (res) -> {
            System.out.println("任务1或者任务2:" + res);
            System.out.println("执行任务3");
            return res;
        }, service);

System.out.println(completableFuture3.get());

多任务组合

java
CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() -> {
            System.out.println("执行任务1:" + 1);
            return 1;
        }, service);

CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(5000);
                System.out.println("执行任务2:" + 2);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            return 2;
        }, service);

CompletableFuture<Integer> completableFuture3 = CompletableFuture.supplyAsync(() -> {
            System.out.println("执行任务3:" + 3);
            return 3;
        }, service);

全部完成执行

java

CompletableFuture<Void> voidCompletableFuture = CompletableFuture.allOf(completableFuture1, completableFuture2, completableFuture3);
System.out.println("阻塞:" + voidCompletableFuture.get());
System.out.println("任务1:" + completableFuture1.get() + "任务2:" + completableFuture2.get() + "任务3:" + completableFuture3.get());
//测试
执行任务1:1
执行任务3:3
执行任务2:2
阻塞:null

其中一个完成执行

java
CompletableFuture<Object> anyOf = CompletableFuture.anyOf(completableFuture1, completableFuture2, completableFuture3);
System.out.println("返回其中一个:" + anyOf.get());
//测试
返回其中一个:1