多线程优化API请求:CountDownLatch与PriorityBlockingQueue的应用

RemainderTime 2024-08-22 15:05:03 阅读 61

目录

前言

CountDownLatch是什么?

PriorityBlockingQueue是什么?

场景描述

解决方案

定义统一工厂制造类

定义制造厂

定义客户请求实现

定义控制器

定义启动类

结果呈现

启动项目

请求制造操作

总结


前言

写这篇文章的缘由是因为之前在面试期间经常被提到的一个场景题,“前端向后端发起一个API请求,该API需要处理复杂的业务逻辑,涉及多个相互独立的业务模块。每个业务模块都需要执行特定的操作,且这些操作彼此之间没有依赖关系。然而,每个模块的处理都需要一定的时间,导致整体的接口响应时间较长,请给出优化接口的方案,而且结果必须通过当前接口返回”。或许大家立马想到的都是通过多线程或者通过队列异步来完成,结果延迟返回,问题的难点在于怎么能在当前接口返回最终的结果呢?在学习完RocketMq源码后我找到了最佳方案。多线程(Runnable)结合CountDownLatch以及PriorityBlockingQueue就是答案。

CountDownLatch是什么?

<code>CountDownLatch 是 Java 并发工具库中的一个类,用于同步一个或多个线程,确保某些操作在其他操作完成之前不会继续执行。它能够使一个线程等待其他线程完成各自的工作后再继续执行。

读完CountDownLatch的概述我们大概能猜出它在方案中的作用了吧,描述得也很清晰了。就是在场景是主线程启动了多个工作线程,并等待所有工作线程完成工作后再继续。

主要方法

void await():使当前线程等待,直到计数到达零释放。boolean await(long timeout, TimeUnit unit):使当前线程等待,直到计数到达零或者等待超时才释放。void countDown():递减计数,如果计数到达零,则释放所有等待的线程。long getCount():返回当前计数。

PriorityBlockingQueue是什么?

PriorityBlockingQueue 是 Java 并发包 (java.util.concurrent) 提供的一个线程安全的无界优先级队列。它结合了优先级队列和阻塞队列的特点,在多线程环境下非常有用。默认使用元素的自然顺序(通过实现 Comparable 接口的 compareTo 方法)。你也可以通过提供自定义的 Comparator 实现定制排序逻辑。

在本次方案中就是作为一个内存队列,通知其他独立的业务模块执行操作。

主要方法

boolean add(E e) / boolean offer(E e): 插入指定元素,返回 true 表示插入成功。E take(): 检索并移除队列的头部元素,如果队列为空,则等待直到有元素可用。E poll(long timeout, TimeUnit unit): 检索并移除队列的头部元素,如果队列为空,则等待指定的时间。E peek(): 检索但不移除队列的头部元素,如果队列为空,则返回 nullint size(): 返回队列中的元素数量。

场景描述

根据上面问题,我们模拟一个场景:一个客户订购了一架飞机、一艘轮船、一辆汽车,但是飞机、轮船、汽车各自的工厂都相隔很远,客户想以最短的时间同时拥有它们,但生性多疑的他,为了防止制造厂商偷工减料他提出想亲自监工这三个交通工具的制造。请问客户与制造厂该怎么配合才能在客户亲自监工的情况下又能最快时间的同时拥有汽车、轮船和飞机呢?

一架飞机的制造时间为4s一艘轮船的制造时间为3s一辆汽车的制造时间为2s

方案一:传统方案就是,客户先去飞机厂通知开始制造飞机并现场监督,飞机制造完成后再去轮船厂其次再去汽车厂(顺序任意),抛出中间路程的时间,总共用时至少得 4+3+2=9s,时间太长客户不接受。

方案二:客户通过手机同时给飞机、轮船以及汽车厂发生消息通知他们开始制造,并且在三个厂商制造间安装监控,客户通过同时监督它们制造,因为飞机制造时间最长,那么当飞机制造完成,轮船和汽车肯定也已经完成了,那么总共用时最快4s,完美解决。

解决方案

 GitHub源码地址:点击获取

首先通过idea创建一个springboot项目

定义统一工厂制造类

每个工厂都有制造方法FactoryService

<code>public interface FactoryService {

//制造

boolean factoryOfManufacture();

}

定义制造厂

飞机、轮船、汽车都是独立的制造厂家,所以需要三个独立的线程来处理.

定义公共抽象线程类ServiceThread并实现多线程Runnablel类的启动方法start()

@Slf4j

public abstract class ServiceThread implements Runnable{

protected Thread thread;

protected boolean isDaemon = false;

//Make it able to restart the thread

private final AtomicBoolean started = new AtomicBoolean(false);

public ServiceThread() {

}

//获取线程名称

public abstract String getServiceName();

//线程启动方法

public void start() {

log.info("Try to start service thread:{} started:{} lastThread:{}", getServiceName(), started.get(), thread);

if (!started.compareAndSet(false, true)) {

return;

}

this.thread = new Thread(this, getServiceName());

this.thread.setDaemon(isDaemon);

this.thread.start();

log.info("Start service thread:{} started:{} lastThread:{}", getServiceName(), started.get(), thread);

}

}

定义飞机厂类AirPlaneService基础线程类ServiceThread以及实现工厂类FactoryService

@Service

@Slf4j

public class AirPlaneService extends ServiceThread implements FactoryService {

public static ConcurrentMap<String, AirplaneRequest> requestTable =

new ConcurrentHashMap<>();

//接收制造请求通知

public static PriorityBlockingQueue<AirplaneRequest> requestQueue =

new PriorityBlockingQueue<>();

//请求通知静态内部类对象

public static class AirplaneRequest implements Comparable<AirplaneRequest> {

//线程完成设置为0

private CountDownLatch countDownLatch = new CountDownLatch(1);

//用户id

private String userId;

public CountDownLatch getCountDownLatch() {

return countDownLatch;

}

public void setCountDownLatch(CountDownLatch countDownLatch) {

this.countDownLatch = countDownLatch;

}

public String getUserId() {

return userId;

}

public void setUserId(String userId) {

this.userId = userId;

}

@Override

public int compareTo(AirplaneRequest o) {

return 0;

}

}

//获取当前线程名称并设置线程名称

@Override

public String getServiceName() {

return AirPlaneService.class.getSimpleName();

}

//执行线程

@Override

public void run() {

log.info("飞机工厂启动-----------");

//循环处理不同的请求通知

while (this.factoryOfManufacture()) ;

}

public boolean factoryOfManufacture() {

boolean isSuccess = false;

AirplaneRequest airplaneRequest = null;

try {

//等待飞机制造请求

airplaneRequest = requestQueue.take();

log.info("开始飞机制造-----------");

//校验数据是否合法

AirplaneRequest expectedRequest = this.requestTable.get(airplaneRequest.getUserId());

if (null == expectedRequest) {

log.warn("this mmap request expired, maybe cause timeout " + airplaneRequest.getUserId());

return true;

}

if (expectedRequest != airplaneRequest) {

log.warn("never expected here, maybe cause timeout " + airplaneRequest.getUserId());

return true;

}

//...业务处理

Thread.sleep(4000);

//...

isSuccess = true;

} catch (InterruptedException e) {

log.warn(this.getServiceName() + " interrupted, possibly by shutdown.");

return false;

} finally {

if (airplaneRequest != null && isSuccess) {

log.info("飞机制造完成啦-----------");

airplaneRequest.getCountDownLatch().countDown();

}

}

return true;

}

}

定义轮船厂类ShipService基础线程类ServiceThread以及实现工厂类FactoryService

@Service

@Slf4j

public class ShipService extends ServiceThread implements FactoryService {

public static ConcurrentMap<String, ShipRequest> requestTable =

new ConcurrentHashMap<>();

public static PriorityBlockingQueue<ShipRequest> requestQueue =

new PriorityBlockingQueue<>();

public static class ShipRequest implements Comparable<ShipRequest> {

private CountDownLatch countDownLatch = new CountDownLatch(1);

private String userId;

public CountDownLatch getCountDownLatch() {

return countDownLatch;

}

public void setCountDownLatch(CountDownLatch countDownLatch) {

this.countDownLatch = countDownLatch;

}

public String getUserId() {

return userId;

}

public void setUserId(String userId) {

this.userId = userId;

}

@Override

public int compareTo(ShipRequest o) {

return 0;

}

}

@Override

public String getServiceName() {

return ShipService.class.getSimpleName();

}

@Override

public void run() {

log.info("轮船工厂启动-----------");

while (this.factoryOfManufacture()) ;

}

@Override

public boolean factoryOfManufacture() {

boolean isSuccess = false;

ShipRequest shipRequest = null;

try {

shipRequest = requestQueue.take();

log.info("开始制造轮船-----------");

//校验数据是否合法

ShipRequest expectedRequest = this.requestTable.get(shipRequest.getUserId());

if (null == expectedRequest) {

log.warn("this mmap request expired, maybe cause timeout " + shipRequest.getUserId());

return true;

}

if (expectedRequest != shipRequest) {

log.warn("never expected here, maybe cause timeout " + shipRequest.getUserId());

return true;

}

//...业务处理

Thread.sleep(3000);

//...

isSuccess = true;

} catch (InterruptedException e) {

log.warn(this.getServiceName() + " interrupted, possibly by shutdown.");

return false;

} finally {

if (shipRequest != null && isSuccess) {

log.info("轮船制造完成啦-----------");

shipRequest.getCountDownLatch().countDown();

}

}

return true;

}

}

定义汽车厂类CarService基础线程类ServiceThread以及实现工厂类FactoryService

@Service

@Slf4j

public class CarService extends ServiceThread implements FactoryService {

public static ConcurrentMap<String, CarRequest> requestTable =

new ConcurrentHashMap<>();

public static PriorityBlockingQueue<CarRequest> requestQueue =

new PriorityBlockingQueue<>();

public static class CarRequest implements Comparable<CarRequest> {

private CountDownLatch countDownLatch = new CountDownLatch(1);

private String userId;

public CountDownLatch getCountDownLatch() {

return countDownLatch;

}

public void setCountDownLatch(CountDownLatch countDownLatch) {

this.countDownLatch = countDownLatch;

}

public String getUserId() {

return userId;

}

public void setUserId(String userId) {

this.userId = userId;

}

@Override

public int compareTo(CarRequest o) {

return 0;

}

}

@Override

public String getServiceName() {

return CarService.class.getSimpleName();

}

@Override

public void run() {

log.info("汽车工厂启动-----------");

while (this.factoryOfManufacture());

}

@Override

public boolean factoryOfManufacture() {

boolean isSuccess = false;

CarRequest carRequest = null;

try {

carRequest = requestQueue.take();

log.info("开始汽车制造-----------");

//校验数据是否合法

CarRequest expectedRequest = this.requestTable.get(carRequest.getUserId());

if (null == expectedRequest) {

log.warn("this mmap request expired, maybe cause timeout " + carRequest.getUserId());

return true;

}

if (expectedRequest != carRequest) {

log.warn("never expected here, maybe cause timeout " + carRequest.getUserId());

return true;

}

//...业务处理

Thread.sleep(2000);

//...

isSuccess = true;

} catch (InterruptedException e) {

log.warn(this.getServiceName() + " interrupted, possibly by shutdown.");

return false;

} finally {

if (carRequest != null && isSuccess) {

log.info("汽车制造完成啦-----------");

carRequest.getCountDownLatch().countDown();

}

}

return true;

}

}

定义客户请求实现

@Service

public interface PurchaseService {

//请求制造

Boolean manufacturing(String idCard);

}

实现逻辑类

@Service

@Slf4j

public class PurchaseServiceImpl implements PurchaseService {

//超时时间

private static int waitTimeOut = 1000 * 5;

@Autowired

private AirPlaneService airPlaneService;

@Autowired

private CarService carService;

@Autowired

private ShipService shipService;

@Override

public Boolean manufacturing(String userId) {

long startTime = System.currentTimeMillis();

//通知飞机厂计算飞机制造

AirPlaneService.AirplaneRequest airplaneRequest = new AirPlaneService.AirplaneRequest();

airplaneRequest.setUserId(userId);

airPlaneService.requestTable.put(userId, airplaneRequest);

airPlaneService.requestQueue.offer(airplaneRequest);

//通知汽车厂计算汽车制造

CarService.CarRequest carRequest = new CarService.CarRequest();

carRequest.setUserId(userId);

carService.requestTable.put(userId, carRequest);

carService.requestQueue.offer(carRequest);

//通知轮船厂计算轮船制造

ShipService.ShipRequest shipRequest = new ShipService.ShipRequest();

shipRequest.setUserId(userId);

shipService.requestTable.put(userId, shipRequest);

shipService.requestQueue.offer(shipRequest);

//获取飞机制造

AirPlaneService.AirplaneRequest airplaneOK = airPlaneService.requestTable.get(userId);

//获取飞机制造

CarService.CarRequest carOK = carService.requestTable.get(userId);

//获取轮船制造

ShipService.ShipRequest shipOK = shipService.requestTable.get(userId);

try {

//等待获取制造结果

if (airplaneOK != null && carOK!= null && shipOK!= null) {

boolean waitOK = airplaneOK.getCountDownLatch().await(waitTimeOut, TimeUnit.MILLISECONDS);

boolean waitOK2 = carOK.getCountDownLatch().await(waitTimeOut, TimeUnit.MILLISECONDS);

boolean waitOK3 = shipOK.getCountDownLatch().await(waitTimeOut, TimeUnit.MILLISECONDS);

//如果都成功了,返回true

if (waitOK && waitOK2 && waitOK3) {

log.info("总共用时:"+(System.currentTimeMillis() - startTime));

airPlaneService.requestTable.remove(userId);

carService.requestTable.remove(userId);

shipService.requestTable.remove(userId);

return true;

}

}

} catch (InterruptedException e) {

Thread.currentThread().interrupt();

throw new RuntimeException(e);

}

System.out.println("失败了总共用时:"+(System.currentTimeMillis() - startTime));

return false;

}

}

定义控制器

@RestController

public class UserController {

@Autowired

private PurchaseService purchaseService;

@GetMapping("/manufacturing")

public Boolean manufacturing(String userId) {

return purchaseService.manufacturing(userId);

}

}

定义启动类

同时启动飞机、轮船以及汽车的厂家线程

@SpringBootApplication

public class AsyncOneApplication {

public static void main(String[] args) {

SpringApplication.run(AsyncOneApplication.class, args);

//启动飞机制造线程

AirPlaneService airPlaneService = new AirPlaneService();

airPlaneService.start();

//启动汽车制造线程

CarService carService = new CarService();

carService.start();

//启动轮船制造线程

ShipService shipService = new ShipService();

shipService.start();

}

}

结果呈现

启动项目

可以看到飞机、汽车以及轮船各自的线程都启动了并成功设置了对应的线程名称

并且第一条日志输出成功,因为内存队列requestQueue中目前没有数据,所以线程阻塞。其他两个同理。

请求制造操作

请求接口返回true

查看日志

可以发现,飞机、轮船、汽车的制造都完成了,并且和前面说的一样用时4s左右,完美实现。

可能有人会发出疑问,为何每个服务中的请求对象中都要单独定义CountDownLatch,难道不可以定义一个全局的CountDownLatch数字设置为3就可以了,每个线程完成了就减1就可以了,其实这种方式也可以,但考虑到实际业务中3个服务线程并不一定会被同时投递,或者小明只单独订购飞机呢是吧,所以对每个线程的请求对象单独进行维护CountDownLatch。


总结

在本文中,我们深入探讨了如何优化一个复杂的API请求,涉及多个独立业务模块且需要尽快返回结果的问题。通过结合多线程(Runnable)、CountDownLatch以及PriorityBlockingQueue,我们实现了高效的并行处理有序的任务管理,成功地在当前接口返回了最终的结果。这一解决方案不仅提高了系统性能,还确保了结果的及时返回,具有很强的实用性和可操作性。希望本文提供的思路和实践能为大家在类似场景中提供有效的解决方案。



声明

本文内容仅代表作者观点,或转载于其他网站,本站不以此文作为商业用途
如有涉及侵权,请联系本站进行删除
转载本站原创文章,请注明来源及作者。