Java多线程相关说明
异步调用分为两类
有返回值:用户需要Callable、Futrue等来接收最终的处理结果,但这个过程是异步非阻塞的。无返回值:用户提交请求到接口之后不需要任何返回值,请求到达服务端之后就没有任何关系了,用户可以不等待而去做其他的业务操作。
多线程调用方法
Callable:有返回值的线程方法,call将会对用户请求做出结果反馈。Runnable:线程的接口,需要实现run方法。Thread:通过Thread类继承重写run方法实现。
多线程线程池
Executor提供多种线程调度的线程池方案。AsyncTaskExecutor异步线程池方案。ThreadPoolTaskExecutor任务线程池方案。
注意:线程池合理分配可以实现服务器CPU高效利用,初始化大小,最大大小,排队队列的大小都会影响系统业务吞吐量,可以使用Jmeter压测接口来验证。切记:接口异步了并不代表所有的任务线程池都能吸纳,如果排队满了,线程池会抛异常,所以为了避免抛异常我们在提交任务到线程池的时候需要加一个本地队列,切不可将所有请求都怼到线程池。线程池的大小有限制,迟早会到达这个边界,到达边界就会异常,所以应该避免。
线程池并不是越大越好,CPU时间片轮转机制实质上一个CPUcore同一时间只能执行一个线程只是这个时间很短暂我们是无法感知的,线程池越大线程越多,切换就越频繁,性能反而下降。
线程数=(core核心数)*2+(磁盘数dis
Springboot实现线程池配置
异步任务线程池,这个从名字上看一眼就知道是Executor的系列接口:
public interface AsyncTaskExecutor extends TaskExecutor {
long TIMEOUT_IMMEDIATE = 0L;
long TIMEOUT_INDEFINITE = 9223372036854775807L;
void execute(Runnable var1, long var2);
Future> submit(Runnable var1);
Future submit(Callable var1);
}
@FunctionalInterface
public interface TaskExecutor extends Executor {
void execute(Runnable var1);
}
/** @since 1.5
* @author Doug Lea
*/
public interface Executor {
/**
* Executes the given command at some time in the future. The command
* may execute in a new thread, in a pooled thread, or in the calling
* thread, at the discretion of the {@code Executor} implementation.
*
* @param command the runnable task
* @throws RejectedExecutionException if this task cannot be
* accepted for execution
* @throws NullPointerException if command is null
*/
void execute(Runnable command);
}
在Springboot中可以作为配置使用:
@Configuration
public class AsyncTaskExecutorConfig extends WebMvcConfigurerAdapter {
@Override
public void configureAsyncSupport(AsyncSupportConfigurer configurer) {
configurer.setDefaultTimeout(-1);
configurer.setTaskExecutor(asyncTaskExecutor());
}
@Bean('asyncTaskExecutor')
public AsyncTaskExecutor asyncTaskExecutor() {
return new SimpleAsyncTaskExecutor('async');
}
}
这个类是比较常用的有界任务线程池。在Springboot中也作为配置类使用:
@Configuration
public class ThreadPoolTaskExecutorConfig {
@Value('${executors.threadPoolTaskExecutor.corePoolSize:10}')
private Integer corePoolSize;
@Value('${executors.threadPoolTaskExecutor.maxPoolSize:500}')
private Integer maxPoolSize;
@Value('${executors.threadPoolTaskExecutor.queueCapacity:500}')
private Integer queueCapacity;
@Value('${executors.threadPoolTaskExecutor.keepAliveSeconds:60}')
private Integer keepAliveSeconds;
@Value('${executors.threadPoolTaskExecutor.allowCoreThreadTimeOut:true}')
private Boolean allowCoreThreadTimeOut;
@Value('${executors.threadPoolTaskExecutor.threadNamePrefix:ThreadPoolTaskExecutor}')
private String threadNamePrefix;
@Value('${executors.threadPoolTaskExecutor.awaitTerminationSeconds:10}')
private Integer awaitTerminationSeconds;
@Value('${executors.threadPoolTaskExecutor.rejectedExecutionHandler:AbortPolicy}')
private String rejectedExecutionHandler;
/**
* 线程池配置【不能将所有任务都提交到线程会产生线程排队,即使线程排队足够大也不能这样处理】
*
* @return
*/
@Bean('taskExecutor')
public ThreadPoolTaskExecutor taskExecutor() {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
// 核心线程数
taskExecutor.setCorePoolSize(corePoolSize);
// 最大线程数
taskExecutor.setMaxPoolSize(maxPoolSize);
// 队列容量大小
taskExecutor.setQueueCapacity(queueCapacity);
// 是否允许核心线程超时
taskExecutor.setAllowCoreThreadTimeOut(allowCoreThreadTimeOut);
// 线程保活时间
taskExecutor.setKeepAliveSeconds(keepAliveSeconds);
// 线程命名前缀规则
taskExecutor.setThreadNamePrefix(threadNamePrefix);
// 等待终止时间(秒)
taskExecutor.setAwaitTerminationSeconds(awaitTerminationSeconds);
/**
* @see https://blog.csdn.net/pfnie/article/details/52755769
* 线程池满了之后如何处理:默认是 new AbortPolicy();
*
* (1) ThreadPoolExecutor.AbortPolicy 处理程序遭到拒绝将抛出运行时RejectedExecutionException;
* (2) ThreadPoolExecutor.CallerRunsPolicy 线程调用运行该任务的 execute 本身。此策略提供简单的反馈控制机制,能够减缓新任务的提交速度
* (3) ThreadPoolExecutor.DiscardPolicy 不能执行的任务将被删除;
* (4) ThreadPoolExecutor.DiscardOldestPolicy 如果执行程序尚未关闭,则位于工作队列头部的任务将被删除,然后重试执行程序(如果再次失败,则重复此过程)。
*/
switch (rejectedExecutionHandler){
case 'AbortPolicy':
taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
break;
case 'CallerRunsPolicy':
taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
break;
case 'DiscardPolicy':
taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
break;
case 'DiscardOldestPolicy':
taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());
break;
}
// 初始化线程池
taskExecutor.initialize();
return taskExecutor;
}
}
上面的参数配置的比较详细了,这个类所有参数基本都在上面了很简单。后面的示例用的就是该线程池。
Rest风格的异步请求接口
@Async注解
示例如下:
@Slf4j
@RestController
@RequestMapping('/test')
public class AsyncController {
/**
* HTTP心跳发送接口
*
* @return
*/
@RequestMapping('/async/heartBeat.do')
@Async
public void heartBeat(XhyPosition position) {
long start = System.currentTimeMillis();
try {
//mobileService.savePositionBySync(position);
log.info('HTTP心跳发送成功!');
} catch (Exception e) {
e.printStackTrace();
} finally {
long end = System.currentTimeMillis();
log.info('HTTP心跳耗时=' + (end - start));
}
}
}
需要在Springboot启动入口添加@EnableAsync注解。
Callable异步返回值
Callable类型的返回Spring在底层已经做了代理进行处理,不需要通过线程池提交。
@Slf4j
@RestController
@RequestMapping('/test')
public class CallableController {
@Autowired
MobileWebService mobileService;
/**
* HTTP心跳发送接口
*
* @return
*/
@RequestMapping('/callable/heartBeat.do')
public Callable heartBeat(XhyPosition position) {
Callable responseEntityCallable = new Callable() {
@Override
public ResponseEntity call() throws Exception {
//mobileService.savePositionBySync(position);
log.info('HTTP心跳发送成功!');
return ResponseEntity.ok('HTTP心跳发送成功!');
}
};
return responseEntityCallable;
}
}
但值得注意的是同样的写法,没有返回值的回调线程方法是不执行的,需要通过线程池进行提交。
/**
* HTTP心跳发送接口
*
* @return
*/
@RequestMapping('/callable/heartBeat2.do')
public void heartBeat2(XhyPosition position) {
Callable responseEntityCallable = new Callable() {
@Override
public ResponseEntity call() throws Exception {
//mobileService.savePositionBySync(position);
log.info('HTTP心跳发送成功!');
return ResponseEntity.ok('HTTP心跳发送成功!');
}
};
// 必须进行手动提交
threadPoolExecutor.submit(responseEntityCallable);
}
DeferredResult是一个结果的封装类型,SpringMVC推荐使用DeferredResult、Callable、CompletableFuture的返回值来实现异步接口调用。下面是本节示例:
@RestController
@RequestMapping('/test')
public class DeferredResultController {
/**
* HTTP心跳发送接口
*
* @return
*/
@RequestMapping('/deferred/heartBeat.do')
public DeferredResult heartBeat(XhyPosition position) {
DeferredResult deferredResult = new DeferredResult<>();
deferredResult.setResult(StatusCode.SUCCESS);
return deferredResult;
}
}
WebAsyncTask提供了多环节的回调操作,可以满足大多数需求,但Spring并不推荐此种方式的异步。
/**
* @Copyright: 2019-2021
* @FileName: WebAsyncTaskController.java
* @Author: PJL
* @Date: 2020/4/14 10:04
* @Description: 异步接口调用之WebAsyncTask[操作完善:但Spring不推荐使用,推荐使用:DeferredResultController、Callable、CompletableFuture]
*/
@Slf4j
@RestController
@RequestMapping('/test')
public class WebAsyncTaskController {
/**
* 异步线程超时设置
*/
@Value('${system.asyncRequestTimeout:10}')
private Integer asyncRequestTimeout;
@Autowired
MobileWebService mobileService;
@Autowired
AggregationRedisService aggregationRedisService;
@Qualifier('asyncTaskExecutor')
@Autowired
AsyncTaskExecutor executor;
/**
* HTTP[异步请求]心跳发送接口
* @return
*/
@RequestMapping('/webAsyncTask/blank.do')
public WebAsyncTask blank(){
WebAsyncTask webAsyncTask= new WebAsyncTask(asyncRequestTimeout*1000L,()-> {
return StatusCode.SUCCESS;
});
return webAsyncTask;
}
/**
* HTTP[异步请求]心跳发送接口
* @return
*/
@RequestMapping('/webAsyncTask/heartBeat.do')
public WebAsyncTask heartBeat(XhyPosition position){
WebAsyncTask webAsyncTask= new WebAsyncTask(asyncRequestTimeout*1000L,()-> {
position.setT(System.currentTimeMillis());
MobileServiceDataQueue.addToPositionQueue(position);
return StatusCode.SUCCESS;
});
// 处理完成异步回调
webAsyncTask.onCompletion(()->{
log.debug('HTTP心跳发送........完成!');
// aggregationRedisService.publish(Constants.MOBILE_POSITION_FINISH_UPLOAD,'HTTP心跳处理完成!');
});
// 超时处理
webAsyncTask.onTimeout(()->{
//log.debug('HTTP心跳发送........超时!');
return StatusCode.TIMEOUT;
});
// 错误处理
webAsyncTask.onError(()->{
log.debug('HTTP心跳发送........错误!');
return StatusCode.ERROR;
});
return webAsyncTask;
}
/**
* HTTP[线程池]心跳发送接口
* @return
*/
@RequestMapping('/webAsyncTask/heartBeatWithThreadPool.do')
public WebAsyncTask heartBeatWithThreadPool(XhyPosition position){
WebAsyncTask webAsyncTaskByPool= new WebAsyncTask(asyncRequestTimeout*1000L,executor,()-> {
position.setT(System.currentTimeMillis());
MobileServiceDataQueue.addToPositionQueue(position);
return StatusCode.SUCCESS;
});
// 处理完成异步回调
webAsyncTaskByPool.onCompletion(()->{
log.debug('HTTP心跳发送....by pool....完成!');
// aggregationRedisService.publish(Constants.MOBILE_POSITION_FINISH_UPLOAD,'HTTP心跳处理完成!');
});
// 超时处理
webAsyncTaskByPool.onTimeout(()->{
log.debug('HTTP心跳发送........超时!');
return StatusCode.TIMEOUT;
});
// 错误处理
webAsyncTaskByPool.onError(()->{
log.debug('HTTP心跳发送........错误!');
return StatusCode.ERROR;
});
return webAsyncTaskByPool;
}
}
多线程异步并发异常
[org.apache.catalina.core.AsyncListenerWrapper]
java.lang.IllegalArgumentException: Cannot dispatch without an AsyncContext
at org.springframework.util.Assert.notNull(Assert.java:198)
at org.springframework.web.context.request.async.StandardServletAsyncWebRequest.dispatch(StandardServletAsyncWebRequest.java:131)
at org.springframework.web.context.request.async.WebAsyncManager.setConcurrentResultAndDispatch(WebAsyncManager.java:391)
at org.springframework.web.context.request.async.WebAsyncManager.lambda$startCallableProcessing$2(WebAsyncManager.java:315)
at org.springframework.web.context.request.async.StandardServletAsyncWebRequest.lambda$onError$0(StandardServletAsyncWebRequest.java:146)
at java.util.ArrayList.forEach(ArrayList.java:1257)
at org.springframework.web.context.request.async.StandardServletAsyncWebRequest.onError(StandardServletAsyncWebRequest.java:146)
at org.apache.catalina.core.AsyncListenerWrapper.fireOnError(AsyncListenerWrapper.java:49)
at org.apache.catalina.core.AsyncContextImpl.setErrorState(AsyncContextImpl.java:410)
at org.apache.catalina.connector.CoyoteAdapter.asyncDispatch(CoyoteAdapter.java:239)
at org.apache.coyote.AbstractProcessor.dispatch(AbstractProcessor.java:242)
at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:53)
at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:861)
at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1579)
at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61)
at java.lang.Thread.run(Thread.java:748)
因为HTTP底层包含了TCP的连接,客户端发送Jmeter方法请求会分配系统临时端口用于客户端请求处理。Jmeter接口请求参数keep_alive需要去掉勾选不然端口不会及时释放。
这个问题可以总结为是:Spring容器异步分发工作机制客户端与服务端在连接断开时没法处理异步请求的目标servlet无法分发所致。这个问题不是程序本身业务代码报错,可以忽略。
模拟Servlet异步请求分发
文章为作者独立观点,不代表观点
michael2023-01-02
不要傻,他们根本不在意连花清瘟好不好,有没有用,他们是在做空以岭药业,我之前看过一个人说,只要每天竞价挂涨停价卖,尾盘撤销。券商就没办法把股票的票卖给做空者。忍受不了就先卖了呗。企稳在回来,何必硬杠呢