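Contents
Preface
Updating in a loop
Updating with a manual transaction
Trying multithreaded updates
Coordinating multithreaded commits with two CountDownLatch instances
Coordinating multithreaded commits with a TransactionStatus collection
Batch update by joining multiple selects with union
Summary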
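Preface
Project overview:
Database: MySQL
Frameworks: SpringBoot, Mybatis
Language: Java8
Project code:
https://gitee.com/john273766764/springboot-mybatis-threads
At work we had a requirement to modify up to about 50,000 rows at once, and the operation supported neither batch nor asynchronous updates. The only option was a for loop, but that took too long, so we looked for better solutions step by step. The details follow.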
Updating in a loop
Start with the simplest possible for loop and see how long it takes:
    @Test
    void updateStudent() {
        List<Student> allStudents = studentMapper.getAll();
        Random random = new Random();
        // No explicit transaction here, so every update is committed on its own.
        allStudents.forEach(s -> {
            // Assign a random teacher number to simulate a change.
            String newTeacher = "TNO_" + random.nextInt(100);
            s.setTeacher(newTeacher);
            studentMapper.update(s);
        });
    }
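The loop took about 1 minute 54 seconds in total. The code has no manual transaction control, so each update is presumably auto-committed, and committing after every single operation is what makes it slow. Let's first add manual transaction control and see how the update performance changes.
Updating with a manual transaction
The modified code: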
    @Autowired
    private DataSourceTransactionManager dataSourceTransactionManager;

    @Autowired
    private TransactionDefinition transactionDefinition;

    @Test
    void updateStudentWithTrans() {
        List<Student> allStudents = studentMapper.getAll();
        // Open one transaction that covers every update in the loop.
        TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition);
        try {
            Random random = new Random();
            allStudents.forEach(s -> {
                String newTeacher = "TNO_" + random.nextInt(100);
                s.setTeacher(newTeacher);
                studentMapper.update(s);
            });
            // Commit once, after all rows have been updated.
            dataSourceTransactionManager.commit(transactionStatus);
        } catch (Throwable e) {
            // Any failure undoes the whole batch.
            dataSourceTransactionManager.rollback(transactionStatus);
            throw e;
        }
    }
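With manual transaction control, the whole run took about 24 seconds, roughly 5 times faster than the auto-commit version (about 114 s versus 24 s). For a large number of looped database writes, wrapping them in a single manual transaction improves performance considerably.
Trying multithreaded updates
The manual transaction gave a clear improvement, but the run is still fairly long. Next we try submitting the updates from multiple threads to see whether that is faster.
First add a Service that wraps the batch update:
StudentServiceImpl.java: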
    @Service
    public class StudentServiceImpl implements StudentService {

        @Autowired
        private StudentMapper studentMapper;

        @Autowired
        private DataSourceTransactionManager dataSourceTransactionManager;

        @Autowired
        private TransactionDefinition transactionDefinition;

        @Override
        public void updateStudents(List<Student> students, CountDownLatch threadLatch) {
            // Each worker thread opens and commits its own transaction.
            TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition);
            System.out.println("Worker thread: " + Thread.currentThread().getName());
            try {
                Random random = new Random();
                students.forEach(s -> {
                    String newTeacher = "TNO_" + random.nextInt(100);
                    s.setTeacher(newTeacher);
                    studentMapper.update(s);
                });
                dataSourceTransactionManager.commit(transactionStatus);
            } catch (Throwable e) {
                e.printStackTrace();
                dataSourceTransactionManager.rollback(transactionStatus);
            } finally {
                // Count down on failure too, so the main thread is not left waiting for the timeout.
                threadLatch.countDown();
            }
        }
    }
The test code now submits the batches from multiple threads:
    @Autowired
    private DataSourceTransactionManager dataSourceTransactionManager;

    @Autowired
    private TransactionDefinition transactionDefinition;

    @Autowired
    private StudentService studentService;

    @Test
    void updateStudentWithThreads() {
        List<Student> allStudents = studentMapper.getAll();

        // Number of worker threads.
        final int threadCount = 100;

        // Rows handled by each thread (ceiling division).
        final int dataPartionLength = (allStudents.size() + threadCount - 1) / threadCount;

        ExecutorService studentThreadPool = Executors.newFixedThreadPool(threadCount);
        // The main thread waits on this latch until every worker has finished.
        CountDownLatch threadLatchs = new CountDownLatch(threadCount);

        for (int i = 0; i < threadCount; i++) {
            // Slice out the rows for this thread.
            List<Student> threadDatas = allStudents.stream()
                    .skip((long) i * dataPartionLength).limit(dataPartionLength).collect(Collectors.toList());
            studentThreadPool.execute(() -> {
                studentService.updateStudents(threadDatas, threadLatchs);
            });
        }
        try {
            // Wait at most 30 seconds for the workers.
            threadLatchs.await(30, TimeUnit.SECONDS);
        } catch (Throwable e) {
            e.printStackTrace();
        }
        studentThreadPool.shutdown();
        System.out.println("Main thread finished");
    }
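We tried different thread counts to see how they affect the update speed. The table below compares the time (in seconds) needed to update 50,000 rows with different numbers of threads.
According to the table, adding threads does not keep making the update faster. In our environment, roughly 2 to 5 threads was fastest (the right number still has to be measured against your own server configuration).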
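The test code above slices the list with `skip(...).limit(...)`, which walks the stream from the start for every thread. As a minimal sketch of an alternative, the same ceiling-division split can be done with `subList`. The class name `ListPartitioner` and its `partition` method are hypothetical helpers introduced here for illustration and are not part of the original project.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class ListPartitioner {

    /** Splits items into at most `parts` consecutive chunks of near-equal size. */
    static <T> List<List<T>> partition(List<T> items, int parts) {
        if (parts <= 0) {
            throw new IllegalArgumentException("parts must be positive");
        }
        List<List<T>> chunks = new ArrayList<>();
        // Ceiling division, the same formula the test code uses for dataPartionLength.
        int chunkSize = (items.size() + parts - 1) / parts;
        for (int from = 0; from < items.size(); from += chunkSize) {
            int to = Math.min(from + chunkSize, items.size());
            // Copy the view so each worker owns an independent list.
            chunks.add(new ArrayList<>(items.subList(from, to)));
        }
        return chunks;
    }

    public static void main(String[] args) {
        List<Integer> ids = Arrays.asList(1, 2, 3, 4, 5, 6, 7);
        System.out.println(partition(ids, 3)); // prints [[1, 2, 3], [4, 5, 6], [7]]
    }
}
```

Note that the helper can return fewer chunks than the requested thread count when there are few rows. In that case a latch should be sized from the number of chunks actually produced, otherwise the main thread would wait for workers that never start.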
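Coordinating multithreaded commits with two CountDownLatch instances
With multithreaded updates, each thread runs its own independent transaction, so consistency across threads is not guaranteed. We tried adding transaction control across the threads so that every thread commits only after all threads have finished their updates.
Here we use two CountDownLatch instances to coordinate the main thread and the worker threads, with a timeout of 30 seconds.
We modified the code slightly: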
    @Override
    public void updateStudentsThread(List<Student> students, CountDownLatch threadLatch,
                                     CountDownLatch mainLatch, StudentTaskError taskStatus) {
        TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition);
        System.out.println("Worker thread: " + Thread.currentThread().getName());
        try {
            Random random = new Random();
            students.forEach(s -> {
                String newTeacher = "TNO_" + random.nextInt(100);
                s.setTeacher(newTeacher);
                studentMapper.update(s);
            });
        } catch (Throwable e) {
            taskStatus.setIsError();
        } finally {
            // Tell the main thread this worker has finished its updates.
            threadLatch.countDown();
        }
        try {
            // Keep the transaction open until the main thread gives the signal.
            mainLatch.await();
        } catch (Throwable e) {
            taskStatus.setIsError();
        }
        // Commit only if no worker reported an error; otherwise roll back.
        if (taskStatus.getIsError()) {
            dataSourceTransactionManager.rollback(transactionStatus);
        } else {
            dataSourceTransactionManager.commit(transactionStatus);
        }
    }

    @Test
    void updateStudentWithThreadsAndTrans() {
        List<Student> allStudents = studentMapper.getAll();

        // Number of worker threads.
        final int threadCount = 4;

        // Rows handled by each thread (ceiling division).
        final int dataPartionLength = (allStudents.size() + threadCount - 1) / threadCount;

        ExecutorService studentThreadPool = Executors.newFixedThreadPool(threadCount);
        // Workers count this latch down when their updates are done.
        CountDownLatch threadLatchs = new CountDownLatch(threadCount);
        // The main thread releases this latch to let the workers commit or roll back.
        CountDownLatch mainLatch = new CountDownLatch(1);
        // Shared error flag.
        StudentTaskError taskStatus = new StudentTaskError();

        for (int i = 0; i < threadCount; i++) {
            List<Student> threadDatas = allStudents.stream()
                    .skip((long) i * dataPartionLength).limit(dataPartionLength)
                    .collect(Collectors.toList());
            studentThreadPool.execute(() -> {
                studentService.updateStudentsThread(threadDatas, threadLatchs, mainLatch, taskStatus);
            });
        }
        try {
            // Wait up to 30 seconds; a timeout counts as an error.
            boolean await = threadLatchs.await(30, TimeUnit.SECONDS);
            if (!await) {
                taskStatus.setIsError();
            }
        } catch (Throwable e) {
            e.printStackTrace();
            taskStatus.setIsError();
        }
        // Release the workers so they commit or roll back together.
        mainLatch.countDown();
        studentThreadPool.shutdown();
        System.out.println("Main thread finished");
    }
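The handshake in the code above can be looked at in isolation from Spring and the database. The following is a minimal sketch, not the project's code: the class `TwoLatchDemo` and its `runAll` method are hypothetical, the "transaction" is only simulated by a counter, and an `AtomicBoolean` stands in for `StudentTaskError`. It shows the two phases: workers report completion on one latch, then wait on the second latch for the main thread's decision.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

class TwoLatchDemo {

    /** Runs each task on its own thread; returns true only if every worker "committed". */
    static boolean runAll(List<Runnable> tasks, long timeoutMillis) {
        int n = tasks.size();
        if (n == 0) {
            return true;
        }
        ExecutorService pool = Executors.newFixedThreadPool(n);
        CountDownLatch workersDone = new CountDownLatch(n);   // workers -> main
        CountDownLatch mainDecision = new CountDownLatch(1);  // main -> workers
        CountDownLatch finished = new CountDownLatch(n);      // lets runAll wait for the outcome
        AtomicBoolean failed = new AtomicBoolean(false);      // shared error flag
        AtomicInteger committed = new AtomicInteger();        // simulated commits

        for (Runnable task : tasks) {
            pool.execute(() -> {
                try {
                    task.run();                 // phase 1: do the work, "transaction" stays open
                } catch (Throwable e) {
                    failed.set(true);
                } finally {
                    workersDone.countDown();
                }
                try {
                    mainDecision.await();       // phase 2: wait for the main thread
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    failed.set(true);
                }
                if (!failed.get()) {
                    committed.incrementAndGet(); // simulated commit; otherwise simulated rollback
                }
                finished.countDown();
            });
        }
        try {
            // A timeout is treated as a failure, as in the test above.
            if (!workersDone.await(timeoutMillis, TimeUnit.MILLISECONDS)) {
                failed.set(true);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            failed.set(true);
        } finally {
            mainDecision.countDown();
            pool.shutdown();
        }
        try {
            finished.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return committed.get() == n;
    }

    public static void main(String[] args) {
        Runnable ok = () -> { };
        Runnable broken = () -> { throw new IllegalStateException("simulated failure"); };
        System.out.println(runAll(Arrays.asList(ok, ok), 1000));     // prints true
        System.out.println(runAll(Arrays.asList(ok, broken), 1000)); // prints false
    }
}
```

Because every worker reads the flag only after the main thread has released the second latch, all workers see the same decision. The pool is sized to the number of tasks on purpose: with fewer threads than tasks, the running workers would block on the second latch while the queued tasks never start, so the first latch would never reach zero.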
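We wanted to measure the effect of different thread counts again, but found that the run fails as soon as the thread count exceeds 10.
The error is as follows: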
    Exception in thread "pool-1-thread-2" org.springframework.transaction.CannotCreateTransactionException: Could not open JDBC Connection for transaction; nested exception is java.sql.SQLTransientConnectionException: HikariPool-1 - Connection is not available, request timed out after 30055ms.
        at org.springframework.jdbc.datasource.DataSourceTransactionManager.doBegin(DataSourceTransactionManager.java:309)
        at org.springframework.transaction.support.AbstractPlatformTransactionManager.startTransaction(AbstractPlatformTransactionManager.java:400)
        at org.springframework.transaction.support.AbstractPlatformTransactionManager.getTransaction(AbstractPlatformTransactionManager.java:373)
        at com.example.springbootmybatis.service.Impl.StudentServiceImpl.updateStudentsThread(StudentServiceImpl.java:58)
        at com.example.springbootmybatis.StudentTest.lambda$updateStudentWithThreadsAndTrans$3(StudentTest.java:164)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
    Caused by: java.sql.SQLTransientConnectionException: HikariPool-1 - Connection is not available, request timed out after 30055ms.
        at com.zaxxer.hikari.pool.HikariPool.createTimeoutException(HikariPool.java:696)
        at com.zaxxer.hikari.pool.HikariPool.getConnection(HikariPool.java:197)
        at com.zaxxer.hikari.pool.HikariPool.getConnection(HikariPool.java:162)
        at com.zaxxer.hikari.HikariDataSource.getConnection(HikariDataSource.java:128)
        at org.springframework.jdbc.datasource.DataSourceTransactionManager.doBegin(DataSourceTransactionManager.java:265)
        ... 7 more
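The error roughly means that no JDBC connection could be opened for the transaction: the request for a connection timed out after 30 seconds.
The first ten threads cannot commit until the main thread releases them, so they hold on to their connections without returning them to the pool (HikariCP's default maximum pool size is 10). The remaining threads therefore time out while waiting for a connection.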
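The starvation described above can be reproduced without a database. The sketch below is only an illustration under stated assumptions: a `Semaphore` stands in for the connection pool, and the class `PoolExhaustionDemo` with its `countTimeouts` method is hypothetical. Every worker that obtains a permit holds it until the main thread releases the latch, exactly like the workers that keep their transaction open.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class PoolExhaustionDemo {

    /** Returns how many workers failed to obtain a "connection" within the timeout. */
    static int countTimeouts(int poolSize, int workers, long acquireTimeoutMillis) {
        Semaphore connections = new Semaphore(poolSize);        // stand-in for the connection pool
        CountDownLatch attempted = new CountDownLatch(workers); // every worker has tried to connect
        CountDownLatch mainDecision = new CountDownLatch(1);    // main thread's commit signal
        AtomicInteger timeouts = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(workers);

        for (int i = 0; i < workers; i++) {
            pool.execute(() -> {
                boolean acquired = false;
                try {
                    acquired = connections.tryAcquire(acquireTimeoutMillis, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                if (!acquired) {
                    timeouts.incrementAndGet();  // corresponds to "Connection is not available"
                }
                attempted.countDown();
                try {
                    mainDecision.await();        // holders keep their connection while waiting
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    if (acquired) {
                        connections.release();
                    }
                }
            });
        }
        try {
            attempted.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        mainDecision.countDown();
        pool.shutdown();
        return timeouts.get();
    }

    public static void main(String[] args) {
        // 12 workers against a pool of 10: the two latecomers cannot get a connection.
        System.out.println(countTimeouts(10, 12, 200)); // prints 2
    }
}
```

Since no holder releases its permit before every worker has made its attempt, the number of timeouts is simply the number of workers beyond the pool size.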
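The error log shows that the error originates in HikariPool, so we reconfigure the connection pool and raise its maximum number of connections to 100.
The configuration: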
    spring.datasource.hikari.minimum-idle=10
    spring.datasource.hikari.maximum-pool-size=100
    spring.datasource.hikari.auto-commit=true
    spring.datasource.hikari.idle-timeout=30000
    spring.datasource.hikari.max-lifetime=1800000
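Running the test again produced no error. We then changed the thread count to 20 and ran it once more, and it also succeeded.
Coordinating multithreaded commits with a TransactionStatus collection
On a colleague's recommendation, we used a collection of transactions to control the multithreaded commits. The main code: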
    @Service
    public class StudentsTransactionThread {

        @Autowired
        private StudentMapper studentMapper;

        @Autowired
        private StudentService studentService;

        @Autowired
        private PlatformTransactionManager transactionManager;

        // Collects the open transaction of every worker thread.
        List<TransactionStatus> transactionStatuses = Collections.synchronizedList(new ArrayList<TransactionStatus>());

        @Transactional(propagation = Propagation.REQUIRED, rollbackFor = {Exception.class})
        public void updateStudentWithThreadsAndTrans() throws InterruptedException {
            List<Student> allStudents = studentMapper.getAll();

            // Number of worker threads.
            final int threadCount = 2;

            // Rows handled by each thread (ceiling division).
            final int dataPartionLength = (allStudents.size() + threadCount - 1) / threadCount;

            ExecutorService studentThreadPool = Executors.newFixedThreadPool(threadCount);
            // The main thread waits on this latch until every worker has finished.
            CountDownLatch threadLatchs = new CountDownLatch(threadCount);
            AtomicBoolean isError = new AtomicBoolean(false);
            try {
                for (int i = 0; i < threadCount; i++) {
                    List<Student> threadDatas = allStudents.stream()
                            .skip((long) i * dataPartionLength).limit(dataPartionLength).collect(Collectors.toList());
                    studentThreadPool.execute(() -> {
                        try {
                            studentService.updateStudentsTransaction(transactionManager, transactionStatuses, threadDatas);
                        } catch (Throwable e) {
                            e.printStackTrace();
                            isError.set(true);
                        } finally {
                            threadLatchs.countDown();
                        }
                    });
                }

                // Wait up to 30 seconds; a timeout counts as an error.
                boolean await = threadLatchs.await(30, TimeUnit.SECONDS);
                if (!await) {
                    isError.set(true);
                }
            } catch (Throwable e) {
                e.printStackTrace();
                isError.set(true);
            } finally {
                studentThreadPool.shutdown();
            }

            // Caution: Spring binds transaction resources to the thread that began the
            // transaction, so verify that finishing them from this thread behaves as expected.
            if (!transactionStatuses.isEmpty()) {
                if (isError.get()) {
                    transactionStatuses.forEach(s -> transactionManager.rollback(s));
                } else {
                    transactionStatuses.forEach(s -> transactionManager.commit(s));
                }
            }

            System.out.println("Main thread finished");
        }
    }

    // In StudentServiceImpl:
    @Override
    @Transactional(propagation = Propagation.REQUIRED, rollbackFor = {Exception.class})
    public void updateStudentsTransaction(PlatformTransactionManager transactionManager,
                                          List<TransactionStatus> transactionStatuses, List<Student> students) {
        // Start a new transaction for this worker and register it with the shared list.
        DefaultTransactionDefinition def = new DefaultTransactionDefinition();
        def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
        TransactionStatus status = transactionManager.getTransaction(def);
        transactionStatuses.add(status);

        Random random = new Random();
        students.forEach(s -> {
            String newTeacher = "TNO_" + random.nextInt(100);
            s.setTeacher(newTeacher);
            studentMapper.update(s);
        });
        System.out.println("Worker thread: " + Thread.currentThread().getName());
    }
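This approach behaves like the previous one: the transactions are committed only after all threads have finished, so each thread still occupies a JDBC connection from the pool in the meantime. If the number of threads exceeds the pool's maximum size, connection timeouts occur, so the thread count must still be kept under control.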
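One way to enforce that limit is to cap the worker count before creating the thread pool. The sketch below is an assumption-laden illustration rather than part of the original project: the class `WorkerLimit` and its `maxWorkers` method are hypothetical, and the "connections held elsewhere" parameter reflects the assumption that the calling method, if it runs in its own transaction, may hold a connection too.

```java
class WorkerLimit {

    /**
     * Caps the requested worker count so that workers which keep their connection
     * open until a joint commit can never need more connections than the pool offers.
     */
    static int maxWorkers(int requested, int maxPoolSize, int connectionsHeldElsewhere) {
        int available = maxPoolSize - connectionsHeldElsewhere;
        if (requested < 1 || available < 1) {
            throw new IllegalArgumentException("no connection available for any worker");
        }
        return Math.min(requested, available);
    }

    public static void main(String[] args) {
        // Pool of 10, one connection assumed to be held by the caller: 20 requested workers become 9.
        System.out.println(maxWorkers(20, 10, 1)); // prints 9
    }
}
```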
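Batch update by joining multiple selects with union
Some environments do not support batch update but do support multi-row insert. In that case, you can turn the rows to be updated into multiple select statements, join them with union, and then have a single update join against that derived table.
A demonstration: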
    update student, (
        (select 1 as id, 'teacher_A' as teacher) union
        (select 2 as id, 'teacher_A' as teacher) union
        (select 3 as id, 'teacher_A' as teacher) union
        (select 4 as id, 'teacher_A' as teacher)
        /* ...more rows to update; mind the SQL statement length limit */
    ) as new_teacher
    set student.teacher = new_teacher.teacher
    where student.id = new_teacher.id
Because this is a single SQL statement, it performs a batch update even when allowMultiQueries=true is not configured on the MySQL connection.
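With thousands of rows, the statement above has to be generated rather than written by hand. As a minimal sketch, the text can be assembled with `?` placeholders so that ids and teacher names are bound as parameters instead of being concatenated into the SQL. The class `UnionUpdateSql` and its `build` method are hypothetical names used only for illustration; the table and column names are the ones from the example above.

```java
import java.util.Collections;

class UnionUpdateSql {

    /** Builds the union-based update for `rowCount` rows, two placeholders (id, teacher) per row. */
    static String build(int rowCount) {
        if (rowCount < 1) {
            throw new IllegalArgumentException("rowCount must be at least 1");
        }
        String row = "(select ? as id, ? as teacher)";
        // Join the per-row selects with union, as in the hand-written statement.
        String derived = String.join(" union ", Collections.nCopies(rowCount, row));
        return "update student, (" + derived + ") as new_teacher"
                + " set student.teacher = new_teacher.teacher"
                + " where student.id = new_teacher.id";
    }

    public static void main(String[] args) {
        System.out.println(build(2));
    }
}
```

The parameters would then be bound in row order (id, teacher, id, teacher, ...). Very large batches are better split into several statements so that each one stays within the server's statement length limit.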
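Summary
In summary:
For large batches of database writes, committing through a manual transaction improves performance to a large extent.
When multiple threads write to the database, more threads do not always mean less time. In the example above, roughly 2 to 5 threads was fastest.
When worker threads block before committing their transactions, the number of threads must not be too large.
If there is a way to perform a true batch update, that is the best option.
Source: https://c1n.cn/by3nt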