通用异步调用组件
大约 5 分钟
前言
在我们编写业务代码的时候,避免不了使用异步任务来拆解任务,避免阻塞主流程, 提高执行效率。一下是一些常见的做法:
- 使用spring事件机制拆解
- 使用mq拆解
- 使用java异步线程拆解
但是上面都有一些比较常见的弊端:“专人专用”、代码量较多、执行失败无法感知只有通过日志排查。
现需要一个通用异步组件,比较快捷的实现异步处理,拆分任务。
目的
优点
- 基本对代码无入侵,使用独立的数据库,独立定时任务,独立消息队列,独立的人工执行界面,消息告警执行异常。
- 使用spring事务事件机制,即使异步策略解析失败也不会影响业务。
- 如果你的方法正在运行事务,会等事务提交后或回滚后再处理事件。
- 事务提交了,异步策略解析失败了,兜底方案定时补偿执行。
最终目的实现数据的一致性。
原理
- 容器初始化bean完成后遍历所有方法,把有@AsyncExec注解的方法缓存起来。(注意:是接口方法)
- 方法运行时通过AOP切面发布事件
- 事务事件监听处理异步执行策略
@TransactionalEventListener(fallbackExecution = true, phase = TransactionPhase.AFTER_COMPLETION)
<font style="color:rgb(30, 107, 184);">fallbackExecution=true</font>
<font style="color:rgb(30, 107, 184);">TransactionPhase.AFTER_COMPLETION</font>
组件
设计模式
原理流程图
数据库脚本
必备两张数据库表:
- async_log 记录执行日志。
- async_req 记录执行的方法和参数。
/*
Navicat Premium Data Transfer
Source Server : localhost
Source Server Type : MySQL
Source Server Version : 80031 (8.0.31)
Source Host : localhost:3306
Source Schema : fc_async
Target Server Type : MySQL
Target Server Version : 80031 (8.0.31)
File Encoding : 65001
Date: 15/05/2024 14:05:16
*/
SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;
-- ----------------------------
-- Table structure for async_log
-- ----------------------------
DROP TABLE IF EXISTS `async_log`;
CREATE TABLE `async_log` (
`id` bigint NOT NULL AUTO_INCREMENT,
`async_id` bigint NOT NULL,
`error_data` longtext CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL,
`create_time` datetime NULL DEFAULT NULL,
PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 3 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci ROW_FORMAT = Dynamic;
-- ----------------------------
-- Table structure for async_req
-- ----------------------------
DROP TABLE IF EXISTS `async_req`;
CREATE TABLE `async_req` (
`id` bigint NOT NULL AUTO_INCREMENT,
`application_name` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL,
`sign` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL,
`class_name` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL,
`method_name` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL,
`async_type` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL,
`exec_status` int NULL DEFAULT NULL,
`exec_count` int NULL DEFAULT '0',
`param_json` text CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL,
`remark` text CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL,
`create_time` datetime NULL DEFAULT NULL,
`update_time` datetime NULL DEFAULT NULL,
PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 2 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci ROW_FORMAT = Dynamic;
SET FOREIGN_KEY_CHECKS = 1;
异步策略
AsyncTypeEnum
执行类型 | 描述 | 兜底方案 |
---|---|---|
SAVE_ASYNC | 先保存数据库再异步消息处理 | 保存数据库失败 |
SYNC_SAVE | 先同步处理失败再保存数据库 | 保存数据库失败 |
ASYNC_SAVE | 先异步消息处理失败再保存数据库 | 保存数据库失败 |
ASYNC | 仅异步消息处理 | 发送消息失败 |
THREAD | 仅异步线程处理 |
安全级别
由高到低:SAVE_ASYNC -> SYNC_SAVE ->ASYNC_SAVE ->ASYNC - > THREAD 。
理论上,执行性能,则反之。
执行状态
入库数据,能否顺利运行,执行状态如下:
ExecStatusEnum
字段名称 | 字段值 | 描述 |
---|---|---|
INIT | 0 | 初始化 |
ERROR | 1 | 执行失败 |
SUCCESS | 2 | 执行成功 |
执行流程图
比较复杂一点的是,先保存数据库再异步消息处理的执行逻辑。
其他 执行策略方式,失败处理逻辑基本一致,下面是保存数据库再异步消息处理的执行图。
使用
配置信息
# 开关
async.enabled=true
# 数据库配置
async.datasource.driver-class-name=com.mysql.jdbc.Driver
async.datasource.url=jdbc:mysql://127.0.0.1:3306/fc_async?useUnicode=true&characterEncoding=utf-8&zeroDateTimeBehavior=convertToNull&useSSL=false&allowMultiQueries=true&rewriteBatchedStatements=true
async.datasource.username=root
async.datasource.password=123456
# 线程池参数设置
async.executor.thread.corePoolSize=10
async.executor.thread.maxPoolSize=50
async.executor.thread.queueCapacity=10000
async.executor.thread.keepAliveSeconds=600
# 是否开启物理删除
async.exec.deleted=true
# mq主体
async.topic=kchedule-async
# 重试次数设置
async.exec.count=5
# 重试条数限制
async.retry.limit=100
async.comp.limit=100
# 模块名称
spring.application.name=kchedule-async
# 告警企业微信机器人 key
async.webhook.key=
# 人工处理页面
async.monitor.url=
开启使用
- 开关
async.enabled=true
在需要异步执行的方法加注解 (必须是spring代理方法)
这里,一定必须是spring代理方法, 否则获取不到。
@AsyncExec(type = AsyncTypeEnum.SAVE_ASYNC, remark = "测试异步")
- 人工处理页面
async.monitor.url=
自我对接界面展示数据
- 企业微信告警
注意事项
spring.application.name
${async.topic:${spring.application.name}}_async_queue
自定义topic:<font style="color:rgb(30, 107, 184);">async.topic=xxx</font>