FutureTask类
FutureTask使用场景:
FutureTask可用于异步获取执行结果或取消执行任务的场景。通过传入Runnable或者Callable的任务给FutureTask,直接调用其run方法或者放入线程池执行,之后可以在外部通过FutureTask的get方法异步获取执行结果,因此,FutureTask非常适合用于耗时的计算,主线程可以在完成自己的任务后,再去获取结果。另外,FutureTask还可以确保即使调用了多次run方法,它都只会执行一次Runnable或者Callable任务,或者通过cancel取消FutureTask的执行等。
FutureTask执行多任务操作提高处理效率:
订单实体类:
package com.learn.bean;
public class Order {
private Long id;
private String orderId;
private String sku;
public Order() {
}
public Order(Long id, String orderId, String sku) {
super();
this.id = id;
this.orderId = orderId;
this.sku = sku;
}
public Long getId() {
return id;
}
public void setId(Long id) {
this.id = id;
}
public String getOrderId() {
return orderId;
}
public void setOrderId(String orderId) {
this.orderId = orderId;
}
public String getSku() {
return sku;
}
public void setSku(String sku) {
this.sku = sku;
}
@Override
public String toString() {
return "Order [id=" + id + ", orderId=" + orderId + ", sku=" + sku + "]";
}
}用户类实体类:
package com.learn.bean;
public class User {
private Long id;
private String name;
private String sex;
public Long getId() {
return id;
}
public void setId(Long id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getSex() {
return sex;
}
public void setSex(String sex) {
this.sex = sex;
}
public User(Long id, String name, String sex) {
super();
this.id = id;
this.name = name;
this.sex = sex;
}
public User() {
}
@Override
public String toString() {
return "User [id=" + id + ", name=" + name + ", sex=" + sex + "]";
}
}测试通过FutureTask提高多任务执行的效率:
package com.learn.test;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import com.learn.bean.Order;
import com.learn.bean.User;
public class FutureTaskLearn {
//线程池
public static ExecutorService executor = Executors.newCachedThreadPool();
public Map<String,Object> queryUserInfo() throws InterruptedException{
long startTime = System.currentTimeMillis();
//创建用户线程
Callable<User> queryUserThread = new UserCallTread(2L);
//创建订单线程
Callable<Order> queryOrderThread = new OrderCallTread(3L);
// 创建线程池(使用了预定义的配置)
FutureTask<User> queryUserTask = new FutureTask<>(queryUserThread);
executor.execute(queryUserTask);
FutureTask<Order> queryOrderTask = new FutureTask<>(queryOrderThread);
executor.execute(queryOrderTask);
Map<String, Object> map = new HashMap<>();
User user = null;
Order order = null;
try {
user = queryUserTask.get();
order = queryOrderTask.get();
} catch (Exception e) {
e.printStackTrace();
}
map.put("user", user);
map.put("order",order);
long endTime = System.currentTimeMillis();
System.out.println("总用时间--》:"+(endTime-startTime));
return map;
}
public static void main(String[] args) throws InterruptedException {
FutureTaskLearn futureTaskLearn = new FutureTaskLearn();
Map<String, Object> queryUserInfo = futureTaskLearn.queryUserInfo();
System.out.println(queryUserInfo.get("user"));
System.out.println(queryUserInfo.get("order"));
}
}
/**
* 用户线程
* @author wo
*
*/
class UserCallTread implements Callable<User>{
Long id;
@Override
public User call() throws Exception {
User user = new User(id,"张三","男");
//阻塞3秒
Thread.sleep(3000);
return user;
}
public UserCallTread() {
}
public UserCallTread(Long id) {
super();
this.id = id;
}
}
/**
* 订单线程
* @author wo
*
*/
class OrderCallTread implements Callable<Order>{
Long id;
@Override
public Order call() throws Exception {
Order order = new Order(id,"2018032211","syn-1");
//阻塞4秒
Thread.sleep(4000);
return order;
}
public OrderCallTread() {
}
public OrderCallTread(Long id) {
super();
this.id = id;
}
}执行结果:
成功的将串行任务改为了并行任务,在时间上取决于耗时最长的任务,在实际开发过程中如果两个任务存在依赖关系无法进行并行的要注意使用。
FutureTask有下面几个重要的方法:
1.get()
阻塞一直等待执行完成拿到结果
2.get(int timeout, TimeUnit timeUnit)
阻塞一直等待执行完成拿到结果,如果在超时时间内,没有拿到抛出异常
3.isCancelled()
是否被取消
4.isDone()
是否已经完成
5.cancel(boolean mayInterruptIfRunning)
试图取消正在执行的任务
FutureTask如何实现线程阻塞和指定线程的唤醒机制:
FutureTask的get方法如何实现阻塞机制(直接上源码):
/**
* @throws CancellationException {@inheritDoc}
*/
public V get() throws InterruptedException, ExecutionException {
return sync.innerGet();
}FutureTask是依靠其内部类java.util.concurrent.FutureTask.Sync<V>类来实现阻塞。
Sync又是实现了AbstractQueuedSynchronizer类。
AbstractQueuedSynchronizer又是靠什么来实现阻塞以及维持协调好各竞争线程间的资源分配的?这里就不具体深入了FutureTask思维导图
这里可以看到FutureTask本质依靠 LockSupport 的LockSupportLockSupportpark和unpark实现阻塞和唤醒机制:
LockSupport是JDK中比较底层的类,用来创建锁和其他同步工具类的基本线程阻塞原语。
Java锁和同步器框架的核心AQS:AbstractQueuedSynchronizer,就是通过调用LockSupport.park()和LockSupport.unpark()实现线程的阻塞和唤醒的。LockSupport很类似于二元信号量(只有1个许可证可供使用),如果这个许可还没有被占用,当前线程获取许可并继续执行;如果许可已经被占用,当前线程阻塞,等待获取许可。
LockSupport中的park() 和 unpark() 的作用分别是阻塞线程和解除阻塞线程,而且park()和unpark()不会遇到“Thread.suspend 和 Thread.resume所可能引发的死锁”问题。因为park() 和 unpark()有许可的存在;调用 park() 的线程和另一个试图将其 unpark() 的线程之间的竞争将保持活性。
LockSupport是通过调用Unsafe函数中的接口实现阻塞和解除阻塞的。
LockSupport.park()和unpark(),与object.wait()和notify()的区别?
在调用对象的Wait之前当前线程必须先获得该对象的监视器(Synchronized),被唤醒之后需要重新获取到监视器才能继续执行。
而LockSupport并不需要获取对象的监视器。LockSupport机制是每次unpark给线程1个"许可"——最多只能是1,而park则相反,如果当前线程有许可,那么park方法会消耗1个并返回,否则会阻塞线程直到线程重新获得许可,在线程启动之前调用 park/unpark方法没有任何效果。
因为它们本身的实现机制不一样,所以它们之间没有交集,也就是说LockSupport阻塞的线程,notify/notifyAll没法唤醒.
总结下 LockSupport的park/unpark和Object的wait/notify:
- 面向的对象不同;
- 跟Object的wait/notify不同LockSupport的park/unpark不需要获取对象的监视器;
- 实现的机制不同,因此两者没有交集。
虽然两者用法不同,但是有一点, LockSupport 的park和Object的wait一样也能响应中断.
总体来说,主要的区别应该说是它们面向的对象不同。阻塞和唤醒是对于线程来说的,LockSupport的park/unpark更符合这个语义,以“线程”作为方法的参数, 语义更清晰,使用起来也更方便。而wait/notify的实现使得“阻塞/唤醒对线程本身来说是被动的,要准确的控制哪个线程、什么时候阻塞/唤醒很困难, 要不随机唤醒一个线程(notify)要不唤醒所有的(notifyAll)。






还没有评论,来说两句吧...