多线程之JdbcSession中断问题
前言
之前重构过一个基于 RabbitMQ 的多线程队列消费项目,原项目是用 Python 实现,我用 Java 重写,过程中也遇到过不少问题,一路披荆斩棘。 简单记一下曾经遇到过的问题:
- 向 MongoDB 批量写入时会随着 MongoDB 中已存在的数据量增多而变的越来越慢。解决方法是,在 MongoDB 的官网找到了
Capped Collection
的介绍,其中有一段这样的描述Capped collections are fixed-size collections that support high-throughput operations that insert and retrieve documents based on insertion order
,是说在写入数据时Capped collections
支持高吞吐量操作。 - 向 MongoB 批量写入时会出现重复数据。解决方法是,当有一个或多个属性值需要唯一时,给这一个或多个属性设置唯一索引。
- 向 Mysql 并发写入数据时会出现死锁(Gap Locks)。解决方法是,修改 sql 语句,在删除前先查询需要删除的数据是否存在,然后按照主键删除,当删除多条数据时注意 id 用
or
来连接,避免用in
操作;批量更新/插入数据时,使用INSERT INTO ... ON DUPLICATE KEY UPDATE
,注意这并不是标准SQL语法,而是MYSQL的特意语法,避免用REPLACE INTO ...
,因为这种方式会先将原记录删除再插入新记录,依然容易造成死锁。这些点注意之后可以尽量避免死锁,但仍不能完全避免死锁,遇到死锁问题还是得重试。
今天遇到的问题是多线程向 MYSQL 写入时,抛出了Could not open JDBC Connection for transaction; nested exception is java.sql.SQLException: interrupt
这个异常。在开发环境及测试环境从未出现过,重构版很早就部署上线了,之前仿真环境和真实环境也没出现过这个问题,偏偏今天刚在将要发版的时候,在仿真环境出现了这个异常。起初能想到的可能是网络问题,但这是运维的事而且没凭没据的也不好找别人,只能自己先处理一番看看。
1. 异常分析
异常堆栈:
03:54:22.109 [rps--1] ERROR c.f.f.r.s.c.SymbolDataExchangeRunner 102 - consumer error: Could not open JDBC Connection for transaction; nested exception is java.sql.SQLException: interrupt
org.springframework.transaction.CannotCreateTransactionException: Could not open JDBC Connection for transaction; nested exception is java.sql.SQLException: interrupt
at org.springframework.jdbc.datasource.DataSourceTransactionManager.doBegin(DataSourceTransactionManager.java:305)
at org.springframework.transaction.support.AbstractPlatformTransactionManager.startTransaction(AbstractPlatformTransactionManager.java:400)
at org.springframework.transaction.support.AbstractPlatformTransactionManager.getTransaction(AbstractPlatformTransactionManager.java:373)
at org.springframework.transaction.interceptor.TransactionAspectSupport.createTransactionIfNecessary(TransactionAspectSupport.java:595)
at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:382)
at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:119)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:750)
at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:692)
at com.finpoints.framework.rabbitmq.symbol.consumer.service.SymbolHandleService$$EnhancerBySpringCGLIB$$22bf6655.handleMoreMT5(<generated>)
at com.finpoints.framework.rabbitmq.symbol.consumer.handler.SymbolHandler.handleMore(SymbolHandler.java:96)
at com.finpoints.framework.rabbitmq.symbol.consumer.handler.SymbolHandler.handle(SymbolHandler.java:47)
at com.finpoints.framework.rabbitmq.symbol.consumer.SymbolDataExchangeRunner.run(SymbolDataExchangeRunner.java:100)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630)
at java.base/java.lang.Thread.run(Thread.java:832)
Caused by: java.sql.SQLException: interrupt
at com.alibaba.druid.pool.DruidDataSource.getConnectionInternal(DruidDataSource.java:1635)
at com.alibaba.druid.pool.DruidDataSource.getConnectionDirect(DruidDataSource.java:1427)
at com.alibaba.druid.pool.DruidDataSource.getConnection(DruidDataSource.java:1407)
at com.alibaba.druid.pool.DruidDataSource.getConnection(DruidDataSource.java:1397)
at com.alibaba.druid.pool.DruidDataSource.getConnection(DruidDataSource.java:100)
at org.springframework.jdbc.datasource.DataSourceTransactionManager.doBegin(DataSourceTransactionManager.java:261)
... 17 more
Caused by: java.lang.InterruptedException
at java.base/java.util.concurrent.locks.ReentrantLock$Sync.lockInterruptibly(ReentrantLock.java:159)
at java.base/java.util.concurrent.locks.ReentrantLock.lockInterruptibly(ReentrantLock.java:372)
at com.alibaba.druid.pool.DruidDataSource.getConnectionInternal(DruidDataSource.java:1632)
... 22 more
本项目中 JDK 版本为 1.8,使用到和数据库操作有关的jar包依赖如下:
<dependencies>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.13</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid-spring-boot-starter</artifactId>
<version>1.2.8</version>
</dependency>
</dependencies>
通过异常栈可以知道是由于interrupt
问题导致的Could not open JDBC Connection for transaction
问题,而interrupt
问题产生在DruidDataSource.java
文件的1635
行。于是我们打开这个文件并定位到这一行:
try {
lock.lockInterruptibly();
} catch (InterruptedException e) {
connectErrorCountUpdater.incrementAndGet(this);
throw new SQLException("interrupt", e); // line: 1635
}
那么指定是lock.lockInterruptibly();
方法中抛出的异常,于是进入到这个方法内部查看,好在内部并不复杂,最终找到了如下方法:
public final void acquireInterruptibly(int arg) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
这下知道了,当Thread.interrupted()
线程发生了中断时就会抛出InterruptedException
,就是上文异常堆栈的栈底抛出来的异常,然后导致Druid
出现interrupt
,进而导致Spring
产生Could not open JDBC Connection for transaction
。
2. 解决问题
从上文中我们可以分析出,问题产生的原因是,当某个线程在打开 jdbc 会话时,可能由于网络延迟或中断等原因,未能在预期时间内建立好会话连接,由于在多线程环境,接着 CPU 调度到了其他线程,导致这个线程失去了 CPU 执行权,于是产生了中断,然后抛出这个异常栈。
既然知道原因,那就可以对症下药了。只要程序中设置的最大线程不超过当前 CPU 的最大线程数,即可保证当前线程任务执行时不会因为发生 CPU 调度而产生中断。
3.线程池介绍
public static void main(String[] args) throws InterruptedException {
// 这里是注册了一个钩子函数,在主线程结束的时候被执行,写在主函数的任意位置都可以
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
System.out.println("主线程结束!");
}));
ThreadPoolExecutor pool = new ThreadPoolExecutor(
1,
32,
60L,
TimeUnit.SECONDS,
new SynchronousQueue<>(),
new BasicThreadFactory.Builder().namingPattern("rps-%s").build(),// BasicThreadFactory 是 commons-lang3 包下的
new ThreadPoolExecutor.CallerRunsPolicy()// 拒绝策略
);
int count = 64;
CountDownLatch latch = new CountDownLatch(count);
for (int i = 0; i < count; i++) {
final int fi = i;
Runnable task = () -> {
try {
TimeUnit.MILLISECONDS.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.printf("值: %d,线程:%s%n", fi, Thread.currentThread().getName());
latch.countDown();
};
pool.execute(task);
}
latch.await();
System.out.println("关闭线程池...");
pool.shutdown();
}
解析:
这段代码先是注册了一个钩子(Hook)函数,它会在主线程结束的时候被执行,因此写在主函数的任意位置都可以。
创建线程池:
阿里巴巴编程规范中禁止使用Executors
方法创建线程池,拿Executors.newFixedThreadPool(5)
举例,其内部创建的阻塞队列大小是Integer.MAX_VALUE
,这个值是0x7fffffff
也就是2^31-1
很大,意味着很容易放入足够使内存溢出的任务数量,因此强制要求自定义线程池,这就要我们了解线程池参数的含义。
自定义线程池一般会问 5 参构造器和 7 参构造器,5 参构造器的区别在于最后两个参数使用的使用默认值,下面介绍这 7 个参数的含义(按照参数顺序):
- corePoolSize:线程池创建完成后,将在池中维持的线程个数;
- maximumPoolSize:线程池中最大的线程个数;
- keepAliveTime:当线程处于空闲状态时,将在指定的延迟时间后销毁一定数量,然后保持线程个数为 corePoolSize 个;
- unit:上一个参数的时间单位;
- workQueue:阻塞队列。用来存放待执行的任务;
- threadFactory:线程工厂。用来自定义创建的线程;
- handler:拒绝策略。当任务数量达到指定的上限后,线程池的处理策略;
Java 中的四大拒绝策略(handler):
CallerRunsPolicy
: 只要线程池没有关闭,就由提交任务的线程处理(这里是 main 线程)。一般在不允许失败的、对性能要求不高、并发量较小的场景下使用,因为线程池一般情况下不会关闭,也就是提交的任务一定会被运行,但是由于是调用者线程自己执行的,当多次提交任务时,就会阻塞后续任务执行,性能和效率自然就慢了;AbortPolicy
: 直接抛出拒绝执行的异常,打断当前执行流程。ThreadPoolExecutor中默认的策略就是AbortPolicy,ExecutorService接口的系列ThreadPoolExecutor因为都没有显示的设置拒绝策略,所以默认的都是这个。但是请注意,ExecutorService中的线程池实例队列都是无界的,也就是说把内存撑爆了都不会触发拒绝策略。当自己自定义线程池实例时,使用这个策略一定要处理好触发策略时抛的异常,因为他会打断当前的执行流程;DiscardPolicy
: 不触发任何动作。如果你提交的任务无关紧要,你就可以使用它 。因为它就是个空实现,会悄无声息的吞噬你的的任务。所以这个策略基本上不用了;DiscardOldestPolicy
: 如果线程池未关闭,就弹出队列头部的元素,然后尝试执行。这个策略还是会丢弃任务,丢弃时也是毫无声息,但是特点是丢弃的是老的未执行的任务,而且是待执行优先级较高的任务。基于这个特性,我能想到的场景就是,发布消息,和修改消息,当消息发布出去后,还未执行,此时更新的消息又来了,这个时候未执行的消息的版本比现在提交的消息版本要低就可以被丢弃了。因为队列中还有可能存在消息版本更低的消息会排队执行,所以在真正处理消息的时候一定要做好消息的版本比较;
从上面可以看出DiscardPolicy
和DiscardOldestPolicy
都是会悄悄的丢掉任务,一般较少有场景会使用这个,而AbortPolicy
则是会抛出异常,CallerRunsPolicy
则保证任务一定会被执行。
此外,new SynchronousQueue<>()
相当于new LinkedBlockingQueue<>(1)
,队列长度为 1。
接下来说说workQueue
、maximumPoolSize
以及handler
这三个参数搭配会有不同的表现。以我本机配置(8核16线程)举例。
情景一: 当 workQueue =
new LinkedBlockingQueue<>()
,默认是链表无界阻塞队列,待执行的任务会放到这个队列中,遵循 FIFO 的顺序。此时无论maximumPoolSize
、handler
和待执行的任务数为多少,线程池中都将只会派出一个线程来执行这所有的任务,这种情况其实已经不算多线程了,而是单线程按照任务顺序挨个执行。情景二: 当 workQueue =
new LinkedBlockingQueue<>(n)
或new ArrayBlockingQueue<>(n)
,也就是指定队列中存放的最大任务数 n,设待执行的任务数为 count,CPU 的最大线程数为 threads。
若 count <= n <= maximumPoolSize <= threads,这种情况达不到拒绝策略的触发条件,而且也没有超过 CPU 的最大线程数,此时 handler 的值就不重要了。
但实际上 threads 是有限的,而 count 一般会远大于 threads,否则也用不着多线程。而 maximumPoolSize 也会大于 threads,就会发生线程之间的时间调度,可能造成线程不安全
若 count > n: handler = new ThreadPoolExecutor.CallerRunsPolicy()
时,则调用者线程(此处是main线程)会执行任务,在 maximumPoolSize 之外,用 main 线程执行。
handler = new ThreadPoolExecutor.AbortPolicy()
时,则直接抛出异常,终止执行。
上面说的可能有点绕,总之,是否会发生 CPU 时间调度则要看 maximumPoolSize 是否大于 threads,大于就会发生时间调度;是否触发拒绝策略要看待执行任务数是否超过阻塞队列的上限,超过了就会触发响应的拒绝策略。
另外说说execute()
和submit()
方法的区别:
execute()
只能接收Runnable
参数,submit()
可以接收Runnable
和Callable
参数。Runnable
没有返回值,不能抛异常,Callable
有返回值,且可以抛异常;execute()
方法无返回值,submit()
方法有返回值;execute()
是Executor
接口中的方法,submit()
是ExecutorService
接口中的方法。ExecutorService
接口继承于Executor
接口,从命名可以看出来Executor
只管执行,ExecutorService
则是线程服务,包括了线程的调用、中断、等待、关闭等各中标志的操作;
纸上得来终觉浅,觉知此事要躬行。运行上面的示例代码,根据本机 CPU 的线程数情况,尝试修改count
、maximumPoolSize
、workQueue
、handler
这几个值试试看。
再这说说最大线程数该如何设置,一般普通版的 CPU 一个核心上也就运行一个线程,但也有的 CPU 一个核心可以运行 2 个线程。而多线程任务可分为 CPU(计算)密集型和 IO 密集型。
- IO 密集型:线程会在 IO 操作上阻塞,此时就会切换到另一个线程执行任务,而 IO 消耗的时长占任务处理总时长的百分比就是阻塞系数
λ
,可以通过java.lang.management
下的 API 或其他监控工具来测出数值,最大线程数(以我本机为例)公式为maximumPoolSize=16*(1+λ)
。一般阻塞系数会趋近于1,所以一般也就是2倍的线程数,实际应用中可以会考虑多个方面,比如内存容量消耗,任务耗时等,可以对这个公式进行不断的场景调整适配。 - CPU 密集型:即纯计算的任务,阻塞系数趋近于0,频繁切换上下文反而会降低效率,此时就应该减少线程上下文切换,因此
maximumPoolSize=16
,不过一般会设置为线程数+1,这是因为当线程因为偶尔的内存页失效或其他原因导致阻塞时,这个额外的线程可以顶上,从而保证 CPU 的利用率。
版权所有
版权归属:Mayee