批量操作实现与优化

news/2025/2/22 17:40:36

1、批量操作

方案设计
  • 基本功能实现

  • java">/**
        * 批量添加题目和题库关联
        *
        * @param questionIdList 题目id列表
        * @param questionBankId 题库id
        * @param loginUser 登录用户
        */
       @Override
       @Transactional(rollbackFor = Exception.class)
       public void batchAddQuestionBankQuestion(List<Long> questionIdList, Long questionBankId, User loginUser) {
           //参数校验
           ThrowUtils.throwIf(CollUtil.isEmpty(questionIdList), ErrorCode.PARAMS_ERROR, "题目列表不能为空");
           ThrowUtils.throwIf(questionBankId <=0, ErrorCode.PARAMS_ERROR, "题目列表不能为空");
           ThrowUtils.throwIf(loginUser == null, ErrorCode.PARAMS_ERROR, "用户未登录");
           //检查题目id是否存在
           List<Question> questions = questionService.listByIds(questionIdList);
           List<Long> QuestionIdList = questions.stream()
                   .map(Question::getId)
                   .collect(Collectors.toList());
           ThrowUtils.throwIf(CollUtil.isEmpty(QuestionIdList), ErrorCode.PARAMS_ERROR, "题目不存在");
           //检查题库id是否存在
           QuestionBank questionBank = questionBankService.getById(questionBankId);
           ThrowUtils.throwIf(questionBank == null, ErrorCode.PARAMS_ERROR, "题库不存在");
           //批量插入题库题目关联数据,未使用批量插入方法,数据量不是很大,可以考虑使用批量插入方法
           //todo 使用批量插入方法
           for (Long questionId : QuestionIdList) {
               QuestionBankQuestion questionBankQuestion = new QuestionBankQuestion();
               questionBankQuestion.setQuestionBankId(questionBankId);
               questionBankQuestion.setQuestionId(questionId);
               questionBankQuestion.setUserId(loginUser.getId());
               boolean result = this.save(questionBankQuestion);
               ThrowUtils.throwIf(!result, ErrorCode.OPERATION_ERROR, "添加失败");
           }
       }
    
批量操作优化
健壮性
  • 对代码进行更详细的异常处理

  • 对代码进行更多的参数校验

    • java">/**
           * 批量添加题目和题库关联
           *
           * @param questionIdList 题目id列表
           * @param questionBankId 题库id
           * @param loginUser 登录用户
           */
          @Override
          @Transactional(rollbackFor = Exception.class)
          public void batchAddQuestionBankQuestion(List<Long> questionIdList, Long questionBankId, User loginUser) {
              //参数校验
              ThrowUtils.throwIf(CollUtil.isEmpty(questionIdList), ErrorCode.PARAMS_ERROR, "题目列表不能为空");
              ThrowUtils.throwIf(questionBankId <=0, ErrorCode.PARAMS_ERROR, "题目列表不能为空");
              ThrowUtils.throwIf(loginUser == null, ErrorCode.PARAMS_ERROR, "用户未登录");
              //检查题目id是否存在
              List<Question> questions = questionService.listByIds(questionIdList);
              List<Long> questionIdLongList = questions.stream()
                      .map(Question::getId)
                      .collect(Collectors.toList());
              ThrowUtils.throwIf(CollUtil.isEmpty(questionIdLongList), ErrorCode.PARAMS_ERROR, "题目不存在");
              //获取合法的题目id列表
              List<Long> questionIdLongList = questionService.listObjs(questionLambdaQueryWrapper, obj -> (Long)obj);
              ThrowUtils.throwIf(CollUtil.isEmpty(questionIdLongList), ErrorCode.PARAMS_ERROR, "题目不存在");
              //检查题库id是否存在
              QuestionBank questionBank = questionBankService.getById(questionBankId);
              ThrowUtils.throwIf(questionBank == null, ErrorCode.PARAMS_ERROR, "题库不存在");
              //检查题目还不存在与题库中,过滤已存在题库中的题目
              LambdaQueryWrapper<QuestionBankQuestion> lambdaQueryWrapper = Wrappers.lambdaQuery(QuestionBankQuestion.class)
                      .eq(QuestionBankQuestion::getQuestionBankId, questionBankId)
                      .in(QuestionBankQuestion::getQuestionId, questionIdLongList);
              List<QuestionBankQuestion> existQuestionList = this.list(lambdaQueryWrapper);
              //已存在题库中的题目
              Set<Long> questionIdLongSet = existQuestionList.stream()
                      .map(QuestionBankQuestion::getQuestionId)
                      .collect(Collectors.toSet());
              //过滤掉已存在题库中的题目
              questionIdLongList = questionIdLongList.stream().filter(questionId -> {
                  return !questionIdLongSet.contains(questionId);
              }).collect(Collectors.toList());
              ThrowUtils.throwIf(CollUtil.isEmpty(questionIdLongList), ErrorCode.PARAMS_ERROR, "题目都存在与题库中");
      
              //批量插入题库题目关联数据,未使用批量插入方法,数据量不是很大,可以考虑使用批量插入方法
              //todo 使用批量插入方法
              for (Long questionId : questionIdLongList) {
                  QuestionBankQuestion questionBankQuestion = new QuestionBankQuestion();
                  questionBankQuestion.setQuestionBankId(questionBankId);
                  questionBankQuestion.setQuestionId(questionId);
                  questionBankQuestion.setUserId(loginUser.getId());
                  try {
                      boolean result = this.save(questionBankQuestion);
                      if (!result) {
                          throw new BusinessException(ErrorCode.OPERATION_ERROR, "向题库添加题目失败");
                      }
                  } catch (DataIntegrityViolationException e) {
                      log.error("数据库唯一键冲突或违反其他完整性约束,题目 id: {}, 题库 id: {}, 错误信息: {}",
                              questionId, questionBankId, e.getMessage());
                      throw new BusinessException(ErrorCode.OPERATION_ERROR, "题目已存在于该题库,无法重复添加");
                  } catch (DataAccessException e) {
                      log.error("数据库连接问题、事务问题等导致操作失败,题目 id: {}, 题库 id: {}, 错误信息: {}",
                              questionId, questionBankId, e.getMessage());
                      throw new BusinessException(ErrorCode.OPERATION_ERROR, "数据库操作失败");
                  } catch (Exception e) {
                      // 捕获其他异常,做通用处理
                      log.error("添加题目到题库时发生未知错误,题目 id: {}, 题库 id: {}, 错误信息: {}",
                              questionId, questionBankId, e.getMessage());
                      throw new BusinessException(ErrorCode.OPERATION_ERROR, "向题库添加题目失败");
                  }
              }
      
稳定性
避免长事务问题
  • java">//批量插入题库题目关联数据,未使用批量插入方法,数据量不是很大,可以考虑使用批量插入方法
            //todo 使用批量插入方法
            //分批次插入题库题目关联数据
            final int batchSize = 1000;
            int questionIdLongListSize = questionIdLongList.size();
            for (int i = 0; i < questionIdLongListSize; i ++) {
                //生成当前批次的题目id列表
                List<Long> currentQuestionIdList = questionIdLongList.subList(i, Math.min(i + batchSize, questionIdLongListSize));
                List<QuestionBankQuestion> questionBankQuestions = currentQuestionIdList.stream()
                        .map(questionId -> {
                            QuestionBankQuestion questionBankQuestion = new QuestionBankQuestion();
                            questionBankQuestion.setQuestionBankId(questionBankId);
                            questionBankQuestion.setQuestionId(questionId);
                            questionBankQuestion.setUserId(loginUser.getId());
                            return questionBankQuestion;
                        }).collect(Collectors.toList());
            }
            //使用事务管理每个批次的插入操作
            //获取事务代理,防止事务失效
            QuestionBankQuestionServiceImpl questionBankQuestionService = (QuestionBankQuestionServiceImpl) AopContext.currentProxy();
            questionBankQuestionService.batchAddQuestionBankQuestionInner(notExistQuestionList);
    
  • java">/**
         * 批量添加题目和题库关联 内部方法,不对外暴露
         * @param questionBankQuestionLists 题库题目关联列表
         */
        @Transactional(rollbackFor = Exception.class)
        @Override
        public void batchAddQuestionBankQuestionInner(List<QuestionBankQuestion> questionBankQuestionLists){
            for (QuestionBankQuestion questionBankQuestion : questionBankQuestionLists) {
                Long questionId = questionBankQuestion.getQuestionId();
                Long questionBankId = questionBankQuestion.getQuestionBankId();
                try {
                    boolean result = this.save(questionBankQuestion);
                    if (!result) {
                        throw new BusinessException(ErrorCode.OPERATION_ERROR, "向题库添加题目失败");
                    }
                } catch (DataIntegrityViolationException e) {
                    log.error("数据库唯一键冲突或违反其他完整性约束,题目 id: {}, 题库 id: {}, 错误信息: {}",
                            questionId, questionBankId, e.getMessage());
                    throw new BusinessException(ErrorCode.OPERATION_ERROR, "题目已存在于该题库,无法重复添加");
                } catch (DataAccessException e) {
                    log.error("数据库连接问题、事务问题等导致操作失败,题目 id: {}, 题库 id: {}, 错误信息: {}",
                            questionId, questionBankId, e.getMessage());
                    throw new BusinessException(ErrorCode.OPERATION_ERROR, "数据库操作失败");
                } catch (Exception e) {
                    // 捕获其他异常,做通用处理
                    log.error("添加题目到题库时发生未知错误,题目 id: {}, 题库 id: {}, 错误信息: {}",
                            questionId, questionBankId, e.getMessage());
                    throw new BusinessException(ErrorCode.OPERATION_ERROR, "向题库添加题目失败");
                }
            }
    
        }
    
    
性能
使用批量插入数据
  • java">boolean result = this.saveBatch(questionBankQuestionLists);
    
sql优化
  • java">//检查题目id是否存在,使用了select*
    List<Question> questions = questionService.listByIds(questionIdList);
    
    //优化
    LambdaQueryWrapper<Question> questionLambdaQueryWrapper = Wrappers.lambdaQuery(Question.class)
                    .select(Question::getId)
                    .in(Question::getId, questionIdList);
    List<Question> questions = questionService.list(questionLambdaQueryWrapper);
    
  • java">//因为我们只要id列表,直接将查询到的数据转换为id列表
    List<Long> questionIdLongList = questionService.listObjs(questionLambdaQueryWrapper, obj -> (Long)obj);
    
并发编程
  • CompletableFuture 是Java8中引入的一个类,用于表示异步操作的结果。它是 Future 的增强版本,不仅可以表示一个
    异步计算,还可以对异步计算的结果进行组合、转换和处理,实现异步任务的编排。

  • ?但是要注意,CompletableFuture 默认使用的是 ForkJoinPool.commonpool()方法得到的线程池,这是一个全局共享
    的线程池,如果有多种不同的任务都依赖该线程池进行处理,可能会导致资源争抢、代码阻塞等不确定的问题。所以建议
    针对每种任务,自定义线程池来处理,实现线程池资源的隔离。

  • 如何分配线程池参数

    • 对于计算密集型任务(消耗 CPU 资源),设置核心线程数为 n+1 或者 n(n是 CPU 核心数),可以充分利用 CPU.
      多一个线程是为了可以在某些线程短暂阻塞或执行调度时,确保有足够的线程保持 CPU 繁忙,最大化 CPU 的利用
      率。
    • 对于 I/O 密集型任务(消耗 I/O资源),可以增大核心线程数为 CPU 核心数的2-4倍,可以提升并发执行任务的数
      重。
  • 批处理时是串行的,可以使用并发编程,使其并行,提升运行速度

  • 对于数据库的批量插入业务,是属于IO密集型的任务

    java">//自定义线程池
            //自定义线程池,使用线程池执行插入操作
            ThreadPoolExecutor customExecutor = new ThreadPoolExecutor(
                    20,//核心线程数
                    40,//最大线程数
                    60L,//线程存活时间为60秒
                    TimeUnit.SECONDS,//时间单位
                    new ArrayBlockingQueue<>(10000),//阻塞队列大小
                    new ThreadPoolExecutor.DiscardPolicy() //拒绝策略,由调用线程处理
            );
            List<CompletableFuture<Void>> futures = new ArrayList<>();
            //分批次插入题库题目关联数据
            final int batchSize = 1000;
            int questionIdLongListSize = questionIdLongList.size();
            for (int i = 0; i < questionIdLongListSize; i ++) {
                //生成当前批次的题目id列表
                List<Long> currentQuestionIdList = questionIdLongList.subList(i, Math.min(i + batchSize, questionIdLongListSize));
                List<QuestionBankQuestion> questionBankQuestions = currentQuestionIdList.stream()
                        .map(questionId -> {
                            QuestionBankQuestion questionBankQuestion = new QuestionBankQuestion();
                            questionBankQuestion.setQuestionBankId(questionBankId);
                            questionBankQuestion.setQuestionId(questionId);
                            questionBankQuestion.setUserId(loginUser.getId());
                            return questionBankQuestion;
                        }).collect(Collectors.toList());
                //使用事务管理每个批次的插入操作
                //获取事务代理,防止事务失效
                QuestionBankQuestionServiceImpl questionBankQuestionService = (QuestionBankQuestionServiceImpl) AopContext.currentProxy();
                //异步执行插入操作,生成异步任务
                CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
                    questionBankQuestionService.batchAddQuestionBankQuestionInner(questionBankQuestions);
                }, customExecutor).exceptionally(ex -> {
                    log.error("批处理插入失败", ex);
                    return null;
                });
                //添加异步任务到列表
                futures.add(future);
            }
            //等待所有异步任务完成
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
            //关闭线程池
            customExecutor.shutdown();
        }
    
数据库连接池优化
  • 数据库连接池是用于管理与数据库之间连接的资源池,它能够 复用 现有的数据库连接,而不是在每次请求时都新建和销毁连接,从而提升系统的性能和响应速度。

    • java"><!--            druid数据库连接池-->
              <dependency>
                  <groupId>com.alibaba</groupId>
                  <artifactId>druid</artifactId>
                  <version>1.2.23</version>
              </dependency>
      
    • 配置druid

      • spring:
          # 数据源配置
          datasource:
            driver-class-name: com.mysql.cj.jdbc.Driver
            url: jdbc:mysql://localhost:3306/interview_bank
            username: root
            password: zzj0806
            # 指定数据源类型
            type: com.alibaba.druid.pool.DruidDataSource
            # Druid 配置
            druid:
              # 配置初始化大小、最小、最大
              initial-size: 10
              minIdle: 10
              max-active: 10
              # 配置获取连接等待超时的时间(单位:毫秒)
              max-wait: 60000
              # 配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒
              time-between-eviction-runs-millis: 2000
              # 配置一个连接在池中最小生存的时间,单位是毫秒
              min-evictable-idle-time-millis: 600000
              max-evictable-idle-time-millis: 900000
              # 用来测试连接是否可用的SQL语句,默认值每种数据库都不相同,这是mysql
              validationQuery: select 1
              # 应用向连接池申请连接,并且testOnBorrow为false时,连接池将会判断连接是否处于空闲状态,如果是,则验证这条连接是否可用
              testWhileIdle: true
              # 如果为true,默认是false,应用向连接池申请连接时,连接池会判断这条连接是否是可用的
              testOnBorrow: false
              # 如果为true(默认false),当应用使用完连接,连接池回收连接的时候会判断该连接是否还可用
              testOnReturn: false
              # 是否缓存preparedStatement,也就是PSCache。PSCache对支持游标的数据库性能提升巨大,比如说oracle
              poolPreparedStatements: true
              # 要启用PSCache,必须配置大于0,当大于0时, poolPreparedStatements自动触发修改为true,
              # 在Druid中,不会存在Oracle下PSCache占用内存过多的问题,
              # 可以把这个数值配置大一些,比如说100
              maxOpenPreparedStatements: 20
              # 连接池中的minIdle数量以内的连接,空闲时间超过minEvictableIdleTimeMillis,则会执行keepAlive操作
              keepAlive: true
              # Spring 监控,利用aop 对指定接口的执行时间,jdbc数进行记录
              aop-patterns: "com.springboot.template.dao.*"
              ########### 启用内置过滤器(第一个 stat 必须,否则监控不到SQL)##########
              filters: stat,wall,log4j2
              # 自己配置监控统计拦截的filter
              filter:
                # 开启druiddatasource的状态监控
                stat:
                  enabled: true
                  db-type: mysql
                  # 开启慢sql监控,超过2s 就认为是慢sql,记录到日志中
                  log-slow-sql: true
                  slow-sql-millis: 2000
                # 日志监控,使用slf4j 进行日志输出
                slf4j:
                  enabled: true
                  statement-log-error-enabled: true
                  statement-create-after-log-enabled: false
                  statement-close-after-log-enabled: false
                  result-set-open-after-log-enabled: false
                  result-set-close-after-log-enabled: false
              ########## 配置WebStatFilter,用于采集web关联监控的数据 ##########
              web-stat-filter:
                enabled: true                   # 启动 StatFilter
                url-pattern: /* # 过滤所有url
                exclusions: "*.js,*.gif,*.jpg,*.png,*.css,*.ico,/druid/*" # 排除一些不必要的url
                session-stat-enable: true       # 开启session统计功能
                session-stat-max-count: 1000 # session的最大个数,默认100
              ########## 配置StatViewServlet(监控页面),用于展示Druid的统计信息 ##########
              stat-view-servlet:
                enabled: true                   # 启用StatViewServlet
                url-pattern: /druid/* # 访问内置监控页面的路径,内置监控页面的首页是/druid/index.html
                reset-enable: false              # 不允许清空统计数据,重新计算
                login-username: root # 配置监控页面访问密码
                login-password: 123
                allow: 127.0.0.1 # 允许访问的地址,如果allow没有配置或者为空,则允许所有访问
                deny: # 拒绝访问的地址,deny优先于allow,如果在deny列表中,就算在allow列表中,也会被拒绝
        
        
数据一致性
添加事务
使用分布式锁
  • 使用Redis和redisson分布式锁实现
使用乐观锁
  • 简称CAS,给数据添加版本号。
可观测性
日志记录
监控
  • 例如druid监控等等
返回值优化
  • 定义一个返回值对象,包含失败的原因,帮助定位失败的信息

    • java">public class BatchAddResult {
          private int total;
          private int successCount;
          private int failureCount;
          private List<String> failureReasons;
      }
      
  • 示例代码

    • java">public BatchAddResult batchAddQuestionsToBank(List<Long> questionIdList, Long questionBankId, User loginUser) {
          BatchAddResult result = new BatchAddResult();
          result.setTotal(questionIdList.size());
      
          // 执行批量插入逻辑
          for (Long questionId : questionIdList) {
              try {
                  // 插入操作
                  saveQuestionToBank(questionId, questionBankId, loginUser);
                  result.setSuccessCount(result.getSuccessCount() + 1);
              } catch (Exception e) {
                  result.setFailureCount(result.getFailureCount() + 1);
                  result.getFailureReasons().add("题目ID " + questionId + " 插入失败:" + e.getMessage());
              }
          }
      
          return result; // 返回批量处理的结果
      }
      
      

http://www.niftyadmin.cn/n/5862588.html

相关文章

从0开始的操作系统手搓教程9:更好的内核1——简单讨论一下C与ASM的混合编程

现在&#xff0c;我们已经成功的进入了我们的内核。在之后更长的一段时间&#xff0c;我们可以使用更加高级的编程语言&#xff0c;也就是我们的C语言&#xff0c;而不是汇编来完成我们的工作。 C语言的确被广泛认为是一门非常接近硬件的编程语言&#xff0c;但是&#xff0c;…

【Excel】【VBA】根据内容调整打印区域

Excel VBA&#xff1a;自动调整打印区域的实用代码解析 在Excel中&#xff0c;我们经常需要调整打印区域。今天介绍一段VBA代码&#xff0c;它可以根据C列的内容自动调整打印区域。 Dim ws As Worksheet Dim lastRow As Long Dim r As Long 设置当前工作表 Set ws ActiveSh…

【目标检测】【YOLOv4】YOLOv4:目标检测的最佳速度与精度

YOLOv4&#xff1a;目标检测的最佳速度与精度 0.论文摘要 有许多特征被认为可以提高卷积神经网络&#xff08;CNN&#xff09;的准确性。需要在大规模数据集上对这些特征的组合进行实际测试&#xff0c;并对结果进行理论上的验证。某些特征仅适用于特定模型和特定问题&#…

网络安全入门持续学习与进阶路径(一)

职业认证与实战进阶 1 考取核心安全认证 认证名称适合人群考试要求考试时间/费用核心价值注意点CEH&#xff08;道德黑客认证&#xff09;渗透测试方向&#xff0c;入门级无强制经验要求&#xff0c;需参加官方培训&#xff08;或自学&#xff09;4小时&#xff0c;125题&…

基于云的物联网系统用于实时有害藻华监测:通过MQTT和REST API无缝集成ThingsBoard

论文标题 **英文标题&#xff1a;**Cloud-Based IoT System for Real-Time Harmful Algal Bloom Monitoring: Seamless ThingsBoard Integration via MQTT and REST API **中文标题&#xff1a;**基于云的物联网系统用于实时有害藻华监测&#xff1a;通过MQTT和REST API无缝集…

深度学习笔记16-VGG-16算法-Pytorch实现人脸识别

目录 前言 一、 前期准备 1. 设置GPU 2. 导入数据 3. 划分数据集 二、调用官方的VGG-16模型 三、 训练模型 1. 编写训练函数 2. 编写测试函数 3. 设置动态学习率 4. 正式训练 四、 结果可视化 1. Loss与Accuracy图 2. 指定图片进行预测 3. 模型评估 五、总结 前言 &#x1f368…

什么是矩阵账号?如何高效运营tiktok矩阵账号

‍‌​​‌‌​‌​‍‌​​​‌‌​​‍‌​​​‌​‌​‍‌​​‌​​‌​‍‌​‌‌​‌‌‌‍‌​‌​‌​​​‍‌​​‌​‌‌​‍‌​​​​‌‌​‍‌​‌​​‌‌‌‍‌​​‌‌​‌​‍‌​‌​​‌‌‌‍‌​‌‌‌​​‌‍‌‌​​‌‌‌​‍‌‌​​‌‌​​‍‌…

SpringBoot有几种获取Request对象的方法

HttpServletRequest 简称 Request&#xff0c;它是一个 Servlet API 提供的对象&#xff0c;用于获取客户端发起的 HTTP 请求信息。例如&#xff1a;获取请求参数、获取请求头、获取 Session 会话信息、获取请求的 IP 地址等信息。 那么问题来了&#xff0c;在 Spring Boot 中…