跳至主要內容

通用异步调用组件

科哒大约 5 分钟

前言

在我们编写业务代码的时候,避免不了使用异步任务来拆解任务,避免阻塞主流程, 提高执行效率。一下是一些常见的做法:

  1. 使用spring事件机制拆解
  2. 使用mq拆解
  3. 使用java异步线程拆解

但是上面都有一些比较常见的弊端:“专人专用”、代码量较多、执行失败无法感知只有通过日志排查。

现需要一个通用异步组件,比较快捷的实现异步处理,拆分任务。

目的

优点

  1. 基本对代码无入侵,使用独立的数据库,独立定时任务,独立消息队列,独立的人工执行界面,消息告警执行异常。
  2. 使用spring事务事件机制,即使异步策略解析失败也不会影响业务。
  3. 如果你的方法正在运行事务,会等事务提交后或回滚后再处理事件。
  4. 事务提交了,异步策略解析失败了,兜底方案定时补偿执行。

最终目的实现数据的一致性。

原理

  1. 容器初始化bean完成后遍历所有方法,把有@AsyncExec注解的方法缓存起来。(注意:是接口方法)
  2. 方法运行时通过AOP切面发布事件
  3. 事务事件监听处理异步执行策略
@TransactionalEventListener(fallbackExecution = true, phase = TransactionPhase.AFTER_COMPLETION)
  • <font style="color:rgb(30, 107, 184);">fallbackExecution=true</font>
  • <font style="color:rgb(30, 107, 184);">TransactionPhase.AFTER_COMPLETION</font>

组件

设计模式

原理流程图

数据库脚本

必备两张数据库表:

  • async_log 记录执行日志。
  • async_req 记录执行的方法和参数。
/*
 Navicat Premium Data Transfer

 Source Server         : localhost
 Source Server Type    : MySQL
 Source Server Version : 80031 (8.0.31)
 Source Host           : localhost:3306
 Source Schema         : fc_async

 Target Server Type    : MySQL
 Target Server Version : 80031 (8.0.31)
 File Encoding         : 65001

 Date: 15/05/2024 14:05:16
*/

SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;

-- ----------------------------
-- Table structure for async_log
-- ----------------------------
DROP TABLE IF EXISTS `async_log`;
CREATE TABLE `async_log`  (
  `id` bigint NOT NULL AUTO_INCREMENT,
  `async_id` bigint NOT NULL,
  `error_data` longtext CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL,
  `create_time` datetime NULL DEFAULT NULL,
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 3 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci ROW_FORMAT = Dynamic;

-- ----------------------------
-- Table structure for async_req
-- ----------------------------
DROP TABLE IF EXISTS `async_req`;
CREATE TABLE `async_req`  (
  `id` bigint NOT NULL AUTO_INCREMENT,
  `application_name` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL,
  `sign` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL,
  `class_name` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL,
  `method_name` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL,
  `async_type` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL,
  `exec_status` int NULL DEFAULT NULL,
  `exec_count` int NULL DEFAULT '0',
  `param_json` text CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL,
  `remark` text CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL,
  `create_time` datetime NULL DEFAULT NULL,
  `update_time` datetime NULL DEFAULT NULL,
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 2 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci ROW_FORMAT = Dynamic;

SET FOREIGN_KEY_CHECKS = 1;

异步策略

AsyncTypeEnum

执行类型描述兜底方案
SAVE_ASYNC先保存数据库再异步消息处理保存数据库失败
SYNC_SAVE先同步处理失败再保存数据库保存数据库失败
ASYNC_SAVE先异步消息处理失败再保存数据库保存数据库失败
ASYNC仅异步消息处理发送消息失败
THREAD仅异步线程处理

安全级别

由高到低:SAVE_ASYNC -> SYNC_SAVE ->ASYNC_SAVE ->ASYNC - > THREAD 。

理论上,执行性能,则反之。

执行状态

入库数据,能否顺利运行,执行状态如下:

ExecStatusEnum

字段名称字段值描述
INIT0初始化
ERROR1执行失败
SUCCESS2执行成功

执行流程图

比较复杂一点的是,先保存数据库再异步消息处理的执行逻辑。

其他 执行策略方式,失败处理逻辑基本一致,下面是保存数据库再异步消息处理的执行图。

使用

配置信息

# 开关
async.enabled=true  
#  数据库配置
async.datasource.driver-class-name=com.mysql.jdbc.Driver
async.datasource.url=jdbc:mysql://127.0.0.1:3306/fc_async?useUnicode=true&characterEncoding=utf-8&zeroDateTimeBehavior=convertToNull&useSSL=false&allowMultiQueries=true&rewriteBatchedStatements=true
async.datasource.username=root
async.datasource.password=123456

# 线程池参数设置
async.executor.thread.corePoolSize=10

async.executor.thread.maxPoolSize=50

async.executor.thread.queueCapacity=10000

async.executor.thread.keepAliveSeconds=600

# 是否开启物理删除
async.exec.deleted=true

# mq主体
async.topic=kchedule-async

# 重试次数设置
async.exec.count=5

# 重试条数限制
async.retry.limit=100

async.comp.limit=100

# 模块名称
spring.application.name=kchedule-async

# 告警企业微信机器人 key
async.webhook.key=

# 人工处理页面
async.monitor.url=


开启使用

  1. 开关
async.enabled=true  
  1. 在需要异步执行的方法加注解 (必须是spring代理方法)

    这里,一定必须是spring代理方法, 否则获取不到。

  @AsyncExec(type = AsyncTypeEnum.SAVE_ASYNC, remark = "测试异步")
  1. 人工处理页面
async.monitor.url=

自我对接界面展示数据

  1. 企业微信告警

注意事项

spring.application.name

${async.topic:${spring.application.name}}_async_queue

自定义topic:<font style="color:rgb(30, 107, 184);">async.topic=xxx</font>

https://gitee.com/keyyds/async-keopen in new window

上次编辑于:
贡献者: 黄科铭