- 尼恩:百亿级海量任务调度平台起源
- 百亿级海量任务调度平台
- 百亿级海量任务调度平台业务诉求:
- 海量百亿级任务调度平台主要特点和功能:
- 海量亿级任务调度平台解决方案:
- 海量百亿级任务调度平台的功能性诉求
- 海量百亿级任务调度平台的非功能性诉求
- 百亿级任务量和百万TPM 并发的架构方案
- 海量百亿级任务调度平台的实操
- XXL-JOB 入门学习
- 单体任务调度框架
- 分布式任务调度框架
- 方式一:手工部署XXL-Job 的5大步骤
- 方式二:通过编译源码部署 XXL-Job 的2大步骤
- 步骤1:解压的编译源码
- 步骤2:配置部署“调度中心”
- 方式三:通过Docker 镜像方式搭建调度中心
- 开发、配置、部署XXl-JOB“执行器项目”
- 步骤2:maven依赖
- 步骤3:执行器配置
- 步骤4:编写任务执行逻辑
- 步骤5:部署执行器项目:
- 步骤6:执行器集群(可选):
- 开发第一个任务“Hello World”
- XXL-Job 几种不同的任务模式
- “GLUE模式(Java)” 运行模式的开发第一个调度任务
- 步骤一:新建任务:
- 步骤二:“GLUE模式(Java)” 任务开发:
- 步骤三:触发执行:
- 步骤四:查看日志:
- 大规模任务的分片调度
- XXL-JOB的配置属性说明
- XXL-JOB实现原理
- 1. 调度中心(JobAdmin):
- 2. 执行器(JobExecutor):
- 工作流程:
- XXL-JOB 系统总体架构
- JobAdmin服务器启动流程
- JobAdmin服务器初始化总体流程
- 初始化JobTriggerPool触发器线程池
- 初始化registryOrRemoveThreadPool 线程池,剔除超时主机,更新主机列表
- 执行失败重试和告警(发邮件)
- 初始化callbackThreadPool回调线程池,更改丢失主机的任务状态为失败
- 启动统计日志线程
- 启动调度器
- 执行器Executor 启动流程
- 1 遍历XxlJob注解查找方法
- 2 启动客户端执行器
- XXL-JOB的服务注册
- 客户端
- 服务端
- XXL-JOB的 执行任务
- 服务端下发任务
- 客户端处理任务
- 客户端回调服务端
- 调度中心处理回调
- 执行子任务
- XXL-JOB的mysql分布式锁
- 百亿级海量任务调度平台的架构与实现
- 说在最后:有问题找老架构取经
MTTR 是指 Mean Time to Repair,即平均修复时间。 在系统运维和故障处理领域,MTTR 是一个重要的性能指标,用于衡量系统故障发生后,从故障发生到恢复正常运行所花费的平均时间。 MTTR 的计算方式为: 𝑀𝑇𝑇𝑅=总的修复时间/发生故障次数 MTTR 的值越小越好,因为它反映了系统修复故障的效率和速度。 较低的 MTTR 意味着系统能够更快地从故障中恢复,减少了系统的停机时间,提高了系统的可用性和稳定性。 在实际运维工作中,降低 MTTR 可以通过以下方式实现:
自动化故障检测和修复: 引入自动化工具和脚本,实现对常见故障的自动检测和修复,减少人工干预的时间和错误。 监控和告警系统: 配置有效的监控系统,及时发现系统异常和故障,通过告警通知运维人员进行快速响应和处理。 故障排查和诊断: 提前准备好故障排查的工具和流程,快速定位故障原因,并采取有效的措施进行修复。
| 功能 | quartz | elastic-job | xxl-job |
conf/application.properties 文件,配置数据库连接信息,包括数据库地址、用户名、密码等。docs/sql/tables_xxl_job.sql 文件中的 SQL 脚本,在数据库中创建 XXL-Job 所需的表结构。bin/startup.sh(Linux/Mac)或 bin/startup.cmd(Windows)启动 XXL-Job 服务。/xxl-job/doc/db/tables_xxl_job.sql
http://localhost:8080/xxl-job-admin。admin。xxl-job-admin:调度中心
xxl-job-core:公共依赖
xxl-job-executor-samples:执行器Sample示例(选择合适的版本执行器,可直接使用,也可以参考其并将现有项目改造成执行器)
xxl-job-executor-sample-springboot:Springboot版本,通过Springboot管理执行器,推荐这种方式;
xxl-job-executor-sample-frameless:无框架版本;
/xxl-job/xxl-job-admin/src/main/resources/application.properties
### 调度中心JDBC链接:链接地址请保持和 2.1章节 所创建的调度数据库的地址一致
spring.datasource.url=jdbc:mysql://127.0.0.1:3306/xxl_job?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&serverTimezone=Asia/Shanghai
spring.datasource.username=root
spring.datasource.password=root_pwd
spring.datasource.driver-class-name=com.mysql.jdbc.Driver
### 报警邮箱
spring.mail.host=smtp.qq.com
spring.mail.port=25
spring.mail.username=xxx@qq.com
spring.mail.password=xxx
spring.mail.properties.mail.smtp.auth=true
spring.mail.properties.mail.smtp.starttls.enable=true
spring.mail.properties.mail.smtp.starttls.required=true
spring.mail.properties.mail.smtp.socketFactory.class=javax.net.ssl.SSLSocketFactory
### 调度中心通讯TOKEN [选填]:非空时启用;
xxl.job.accessToken=
### 调度中心国际化配置 [必填]: 默认为 "zh_CN"/中文简体, 可选范围为 "zh_CN"/中文简体, "zh_TC"/中文繁体 and "en"/英文;
xxl.job.i18n=zh_CN
## 调度线程池最大线程配置【必填】
xxl.job.triggerpool.fast.max=200
xxl.job.triggerpool.slow.max=100
### 调度中心日志表数据保存天数 [必填]:过期日志自动清理;限制大于等于7时生效,否则, 如-1,关闭自动清理功能;
xxl.job.logretentiondays=30
// Docker地址:https://hub.docker.com/r/xuxueli/xxl-job-admin/ (建议指定版本号)
docker pull xuxueli/xxl-job-admin
/**
* 如需自定义 mysql 等配置,可通过 "-e PARAMS" 指定,参数格式 PARAMS="--key=value --key2=value2" ;
* 配置项参考文件:/xxl-job/xxl-job-admin/src/main/resources/application.properties
* 如需自定义 JVM内存参数 等配置,可通过 "-e JAVA_OPTS" 指定,参数格式 JAVA_OPTS="-Xmx512m" ;
*/
docker run -e PARAMS="--spring.datasource.url=jdbc:mysql://127.0.0.1:3306/xxl_job?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&serverTimezone=Asia/Shanghai" -p 8080:8080 -v /tmp:/data/applogs --name xxl-job-admin -d xuxueli/xxl-job-admin:{指定版本}
/xxl-job/xxl-job-executor-samples/xxl-job-executor-sample-springboot/src/main/resources/application.properties
### 调度中心部署根地址 [选填]:如调度中心集群部署存在多个地址则用逗号分隔。执行器将会使用该地址进行"执行器心跳注册"和"任务结果回调";为空则关闭自动注册;
xxl.job.admin.addresses=http://127.0.0.1:8080/xxl-job-admin
### 执行器通讯TOKEN [选填]:非空时启用;
xxl.job.accessToken=
### 执行器AppName [选填]:执行器心跳注册分组依据;为空则关闭自动注册
xxl.job.executor.appname=xxl-job-executor-sample
### 执行器注册 [选填]:优先使用该配置作为注册地址,为空时使用内嵌服务 ”IP:PORT“ 作为注册地址。从而更灵活的支持容器类型执行器动态IP和动态映射端口问题。
xxl.job.executor.address=
### 执行器IP [选填]:默认为空表示自动获取IP,多网卡时可手动设置指定IP,该IP不会绑定Host仅作为通讯实用;地址信息用于 "执行器注册" 和 "调度中心请求并触发任务";
xxl.job.executor.ip=
### 执行器端口号 [选填]:小于等于0则自动获取;默认端口为9999,单机部署多个执行器时,注意要配置不同执行器端口;
xxl.job.executor.port=9999
### 执行器运行日志文件存储磁盘路径 [选填] :需要对该路径拥有读写权限;为空则使用默认路径;
xxl.job.executor.logpath=/data/applogs/xxl-job/jobhandler
### 执行器日志文件保存天数 [选填] : 过期日志自动清理, 限制值大于等于3时生效; 否则, 如-1, 关闭自动清理功能;
xxl.job.executor.logretentiondays=30
/xxl-job/xxl-job-executor-samples/xxl-job-executor-sample-springboot/src/main/java/com/xxl/job/executor/core/config/XxlJobConfig.java
@Bean
public XxlJobSpringExecutor xxlJobExecutor() {
logger.info(">>>>>>>>>>> xxl-job config init.");
XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
xxlJobSpringExecutor.setAppname(appname);
xxlJobSpringExecutor.setIp(ip);
xxlJobSpringExecutor.setPort(port);
xxlJobSpringExecutor.setAccessToken(accessToken);
xxlJobSpringExecutor.setLogPath(logPath);
xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);
return xxlJobSpringExecutor;
}
xxl-job-executor-sample-springboot:项目编译打包成springboot类型的可执行JAR包,命令启动即可;
xxl-job-executor-sample-frameless:项目编译打包成JAR包,命令启动即可;
GLUE模式(Java):任务以源码方式维护在调度中心;该模式的任务实际上是一段继承自IJobHandler的Java类代码并"groovy"源码方式维护,它在执行器项目中运行,可使用@Resource/@Autowire注入执行器里中的其他服务; GLUE模式(Shell):任务以源码方式维护在调度中心;该模式的任务实际上是一段"shell"脚本; GLUE模式(Python):任务以源码方式维护在调度中心;该模式的任务实际上是一段"python"脚本; GLUE模式(PHP):任务以源码方式维护在调度中心;该模式的任务实际上是一段"php"脚本; GLUE模式(NodeJS):任务以源码方式维护在调度中心;该模式的任务实际上是一段"nodejs"脚本; GLUE模式(PowerShell):任务以源码方式维护在调度中心;该模式的任务实际上是一段"PowerShell"脚本;
static Map<Integer, String> singleMachineMultiTasks = new HashMap<>();
static {
singleMachineMultiTasks.put(1, "武汉");
singleMachineMultiTasks.put(2, "222");
singleMachineMultiTasks.put(3, "北京");
singleMachineMultiTasks.put(4, "444");
singleMachineMultiTasks.put(5, "上海");
singleMachineMultiTasks.put(6, "666");
}
……
@Override
public void afterPropertiesSet() throws Exception {
//利用静态声明的只会加载一次的特性,初始化一个单例对象。
adminConfig = this;
//初始化xxjob调度器
xxlJobScheduler = new XxlJobScheduler();
xxlJobScheduler.init();
}
public void init() throws Exception {
//1. init i18n
initI18n();
//2. admin trigger pool start 初始化下发任务(触发器)线程池
JobTriggerPoolHelper.toStart();
//3. admin registry monitor run 初始化注册中心线程池,心跳30秒一次
JobRegistryHelper.getInstance().start();
//4. admin lose-monitor run ( depend on JobTriggerPoolHelper ) 初始化失败重试和告警(发邮件)线程,间隔10s执行一次
JobFailMonitorHelper.getInstance().start();
//5. admin lose-monitor run ( depend on JobTriggerPoolHelper ) 初始化回调线程池,初始化监控线程,记录运行了10分钟,执行器不在线的任务日志
JobCompleteHelper.getInstance().start();
//6. admin log report start 6. 初始化统计日志线程,间隔1分钟执行一次,统计3天日志,应该是管控台要用,如果设置了日志保留天数,清除日志
JobLogReportHelper.getInstance().start();
// start-schedule ( depend on JobTriggerPoolHelper ) 执行调度器
JobScheduleHelper.getInstance().start();
logger.info(">>>>>>>>> init xxl-job admin success.");
}
public static void toStart() {
helper.start();
}
public void start(){
//最大200线程,最多处理1000任务
fastTriggerPool = new ThreadPoolExecutor(
10,
XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax(),
60L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(1000),
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "xxl-job, admin JobTriggerPoolHelper-fastTriggerPool-" + r.hashCode());
}
});
//最大100线程,最多处理2000任务
//一分钟内超时10次,则采用慢触发器执行
slowTriggerPool = new ThreadPoolExecutor(
10,
XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax(),
60L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(2000),
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "xxl-job, admin JobTriggerPoolHelper-slowTriggerPool-" + r.hashCode());
}
});
}
registryOrRemoveThreadPool 线程池主要用于执行器的注册和注销操作。下面是它的主要功能和特点:registryOrRemoveThreadPool 线程池负责处理执行器注册的请求,确保注册操作的可靠性和及时性。registryOrRemoveThreadPool 线程池也负责处理执行器注销的请求,确保注销操作的及时性和有效性。registryOrRemoveThreadPool 线程池支持并发处理多个注册和注销请求,根据系统的负载情况和注册/注销请求的数量,动态调整线程池的大小,以提高系统的并发处理能力。registryOrRemoveThreadPool 线程池的稳定运行对于保障任务调度的正常进行非常重要。它保证了注册和注销操作的及时处理,确保任务调度系统的可靠性和稳定性。registryOrRemoveThreadPool 线程池实现了异常处理机制,当注册或注销操作发生异常时,会进行相应的异常处理,如记录日志、发送告警等,保证系统的稳定运行。registryOrRemoveThreadPool 线程池在 XXL-JOB 中起着至关重要的作用,它保证了执行器注册和注销操作的可靠性和及时性,是任务调度系统的重要组成部分。public void start(){
// for registry or remove
//初始化注册或者删除线程池
registryOrRemoveThreadPool = new ThreadPoolExecutor(
public void start(){
monitorThread = new Thread(new Runnable() {
@Override
public void run() {
boolean alarmResult = XxlJobAdminConfig.getAdminConfig().getJobAlarmer().alarm(info, log);
public boolean alarm(XxlJobInfo info, XxlJobLog jobLog) {
boolean result = false;
if (jobAlarmList!=null && jobAlarmList.size()>0) {
result = true; // success means all-success
for (JobAlarm alarm: jobAlarmList) {
boolean resultItem = false;
try {
resultItem = alarm.doAlarm(info, jobLog);
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
if (!resultItem) {
result = false;
}
}
}
return result;
}
public void start(){
// for callback 针对回调函数处理的线程池
callbackThreadPool = new ThreadPoolExecutor(
2,
20,
30L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(3000),
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
public static int updateHandleInfoAndFinish(XxlJobLog xxlJobLog) {
// finish 若父任务正常结束,则终止子任务,以及设置Childmsg
finishJob(xxlJobLog);
// text最大64kb 避免长度过长 截断超过长度限制字符
if (xxlJobLog.getHandleMsg().length() > 15000) {
xxlJobLog.setHandleMsg( xxlJobLog.getHandleMsg().substring(0, 15000) );
}
// fresh handle 更新超时joblog
return XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateHandleInfo(xxlJobLog);
}
public void start(){
logrThread = new Thread(new Runnable() {
//每分钟刷新一次
@Override
public void run() {
// last clean log time 记录上次清除日志时间
long lastCleanLogTime = 0;
while (!toStop) {
// 1、log-report refresh: refresh log report in 3 days
try {
for (int i = 0; i < 3; i++) {
// today 分别统计今天,昨天,前天0~24点的数据
public void start(){
// schedule thread
scheduleThread = new Thread(new Runnable() {
@Override
public void run() {
try {
//下5秒之后执行一次,等待服务器启动。
// ring thread
ringThread = new Thread(new Runnable() {
@Override
public void run() {
@Bean
public XxlJobSpringExecutor xxlJobExecutor() {
logger.info(">>>>>>>>>>> xxl-job config init.");
XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
//xxjob管理地址
xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
//注册应用名称
xxlJobSpringExecutor.setAppname(appname);
//xxl作业执行器注册表地址
xxlJobSpringExecutor.setAddress(address);
xxlJobSpringExecutor.setIp(ip);
xxlJobSpringExecutor.setPort(port);
//注册地址的token
xxlJobSpringExecutor.setAccessToken(accessToken);
//日志路径
xxlJobSpringExecutor.setLogPath(logPath);
//日志保存天数
xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);
return xxlJobSpringExecutor;
}
// start
@Override
public void afterSingletonsInstantiated() {
// init JobHandler Repository
/*initJobHandlerRepository(applicationContext);*/
// init JobHandler Repository (for method) 初始化调度器资源管理器
initJobHandlerMethodRepository(applicationContext);
// refresh GlueFactory 刷新GlueFactory
GlueFactory.refreshInstance(1);
// super start
try {
super.start();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
// start
@Override
public void afterSingletonsInstantiated() {
// init JobHandler Repository
/*initJobHandlerRepository(applicationContext);*/
// init JobHandler Repository (for method) 初始化调度器资源管理器(从spring容器中将标记了XxlJob注解的方法,将其封装并添加到map中。)
initJobHandlerMethodRepository(applicationContext);
// refresh GlueFactory 刷新GlueFactory
GlueFactory.refreshInstance(1);
// super start
try {
super.start();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
private void initJobHandlerMethodRepository(ApplicationContext applicationContext) {
if (applicationContext == null) {
return;
}
// init job handler from method
String[] beanDefinitionNames = applicationContext.getBeanNamesForType(Object.class, false, true);
执行器启动,主要做了如下事情:
public void start() throws Exception {
// init logpath
XxlJobFileAppender.initLogPath(logPath);//初始化日志文件
// init invoker, admin-client
initAdminBizList(adminAddresses, accessToken); //初始化admin链接路径存储集合
// init JobLogFileCleanThread 清除过期日志
JobLogFileCleanThread.getInstance().start(logRetentionDays);
// init TriggerCallbackThread
TriggerCallbackThread.getInstance().start();
// init executor-server 执行内嵌服务
initEmbedServer(address, ip, port, appname, accessToken);
}
public static void initLogPath(String logPath){
// init
if (logPath!=null && logPath.trim().length()>0) {
logBasePath = logPath;
}
// mk base dir 日志文件不存在则创建
File logPathDir = new File(logBasePath);
if (!logPathDir.exists()) {
logPathDir.mkdirs();
}
logBasePath = logPathDir.getPath();
// mk glue dir 创建glue目录
File glueBaseDir = new File(logPathDir, "gluesource");
if (!glueBaseDir.exists()) {
glueBaseDir.mkdirs();
}
glueSrcPath = glueBaseDir.getPath();
}
private void initAdminBizList(String adminAddresses, String accessToken) throws Exception {
if (adminAddresses!=null && adminAddresses.trim().length()>0) {
for (String address: adminAddresses.trim().split(",")) { //多个admin地址以,分隔
if (address!=null && address.trim().length()>0) {
AdminBiz adminBiz = new AdminBizClient(address.trim(), accessToken);
if (adminBizList == null) {
adminBizList = new ArrayList<AdminBiz>();
} //将admin地址以及token添加adminBiz中
adminBizList.add(adminBiz);
}
}
}
}
public void start(final long logRetentionDays){
// limit min value 日志最大保存天数<3天,直接退出
if (logRetentionDays < 3 ) {
return;
}
//一天执行一次
public void start() {
// valid 是否有配置admin路径
if (XxlJobExecutor.getAdminBizList() == null) {
logger.warn(">>>>>>>>>>> xxl-job, executor callback config fail, adminAddresses is null.");
return;
}
private void initEmbedServer(String address, String ip, int port, String appname, String accessToken) throws Exception {
// fill ip port 若没设置端口,则寻找可用端口
port = port>0?port: NetUtil.findAvailablePort(9999);
//若没设置IP,则获取本机Ip
ip = (ip!=null&&ip.trim().length()>0)?ip: IpUtil.getIp();
// generate address 构造地址,若没设置地址,则将ip,port拼接成地址
if (address==null || address.trim().length()==0) {
String ip_port_address = IpUtil.getIpPort(ip, port); // registry-address:default use address to registry , otherwise use ip:port if address is null
address = "http://{ip_port}/".replace("{ip_port}", ip_port_address);
}
// accessToken
if (accessToken==null || accessToken.trim().length()==0) {
logger.warn(">>>>>>>>>>> xxl-job accessToken is empty. To ensure system security, please set the accessToken.");
}
// start 启动嵌入服务器 ,向服务端注册,以及监听端口,主要服务于服务端调用。
embedServer = new EmbedServer();
embedServer.start(address, port, appname, accessToken);
}
public void start(final String address, final int port, final String appname, final String accessToken) {
executorBiz = new ExecutorBizImpl();
thread = new Thread(new Runnable() {
@Override
public void run() {
public void startRegistry(final String appname, final String address) {
// start registry
ExecutorRegistryThread.getInstance().start(appname, address);
}
public void start(final String appname, final String address){
// valid appname不允许为null
if (appname==null || appname.trim().length()==0) {
logger.warn(">>>>>>>>>>> xxl-job, executor registry config fail, appname is null.");
return;
}//服务端地址不能为null
if (XxlJobExecutor.getAdminBizList() == null) {
logger.warn(">>>>>>>>>>> xxl-job, executor registry config fail, adminAddresses is null.");
return;
}
@Override
public ReturnT<String> registry(RegistryParam registryParam) {
return XxlJobRemotingUtil.postBody(addressUrl + "api/registry", accessToken, timeout, registryParam, String.class);
}
@Override
public ReturnT<String> registry(RegistryParam registryParam) {
return JobRegistryHelper.getInstance().registry(registryParam);
}
public ReturnT<String> registry(RegistryParam registryParam) {
// valid 校验
if (!StringUtils.hasText(registryParam.getRegistryGroup())
|| !StringUtils.hasText(registryParam.getRegistryKey())
|| !StringUtils.hasText(registryParam.getRegistryValue())) {
return new ReturnT<String>(ReturnT.FAIL_CODE, "Illegal Argument.");
}
// async execute 异步注册
registryOrRemoveThreadPool.execute(new Runnable() {
@Override
public void run() { //更新修改时间
int ret = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().registryUpdate(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue(), new Date());
if (ret < 1) {//说明暂未数据,才新增
XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().registrySave(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue(), new Date());
// fresh 空实现
freshGroupRegistryInfo(registryParam);
}
}
});
return ReturnT.SUCCESS;
}
@RequestMapping("/trigger")
@ResponseBody
//@PermissionLimit(limit = false)
public ReturnT<String> triggerJob(int id, String executorParam, String addressList) {
// force cover job param 设置默认值
if (executorParam == null) {
executorParam = "";
}
//触发器类型,手动 ,重试次数,'执行器任务分片参数,格式如 1/2',任务参数,机器地址
JobTriggerPoolHelper.trigger(id, TriggerTypeEnum.MANUAL, -1, null, executorParam, addressList);
return ReturnT.SUCCESS;
}
public static void trigger(int jobId, TriggerTypeEnum triggerType, int failRetryCount, String executorShardingParam, String executorParam, String addressList) {
helper.addTrigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam, addressList);
}
public void addTrigger(final int jobId,
final TriggerTypeEnum triggerType,
final int failRetryCount,
final String executorShardingParam,
final String executorParam,
final String addressList) {
public static void trigger(int jobId,
TriggerTypeEnum triggerType,
int failRetryCount,
String executorShardingParam,
String executorParam,
String addressList) {
private static void processTrigger(XxlJobGroup group, XxlJobInfo jobInfo, int finalFailRetryCount, TriggerTypeEnum triggerType, int index, int total){
@Override
public ReturnT<String> route(TriggerParam triggerParam, List<String> addressList){
return new ReturnT<String>(addressList.get(0));
}
public static ReturnT<String> runExecutor(TriggerParam triggerParam, String address){
ReturnT<String> runResult = null;
try {
ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(address);//获取业务执行器地址,就执行器地址后面拼接token
//通过post请求客户端执行job
runResult = executorBiz.run(triggerParam);
} catch (Exception e) {
logger.error(">>>>>>>>>>> xxl-job trigger error, please check if the executor[{}] is running.", address, e);
runResult = new ReturnT<String>(ReturnT.FAIL_CODE, ThrowableUtil.toString(e));
}
//返回结果设置msg
StringBuffer runResultSB = new StringBuffer(I18nUtil.getString("jobconf_trigger_run") + ":");
runResultSB.append("<br>address:").append(address);
runResultSB.append("<br>code:").append(runResult.getCode());
runResultSB.append("<br>msg:").append(runResult.getMsg());
runResult.setMsg(runResultSB.toString());
return runResult;
}
public static ExecutorBiz getExecutorBiz(String address) throws Exception {
// valid
if (address==null || address.trim().length()==0) {
return null;
}
// load-cache
//从缓冲中通过地址获取ExecutorBiz
address = address.trim();
ExecutorBiz executorBiz = executorBizRepository.get(address);
if (executorBiz != null) {
return executorBiz;
}
// set-cache 找不到就新建
executorBiz = new ExecutorBizClient(address, XxlJobAdminConfig.getAdminConfig().getAccessToken());
//添加缓存
executorBizRepository.put(address, executorBiz);
return executorBiz;
}
@Override
public ReturnT<String> run(TriggerParam triggerParam) {
return XxlJobRemotingUtil.postBody(addressUrl + "run", accessToken, timeout, triggerParam, String.class);
}
@Override
protected void channelRead0(final ChannelHandlerContext ctx, FullHttpRequest msg) throws Exception {
// request parse
//final byte[] requestBytes = ByteBufUtil.getBytes(msg.content()); // byteBuf.toString(io.netty.util.CharsetUtil.UTF_8);
String requestData = msg.content().toString(CharsetUtil.UTF_8);//解析请求数据
String uri = msg.uri();//获取uri,后面通过uri来处理不同的请求
HttpMethod httpMethod = msg.method();//获取请求方式,Post/Get
boolean keepAlive = HttpUtil.isKeepAlive(msg);//保持长连接
String accessTokenReq = msg.headers().get(XxlJobRemotingUtil.XXL_JOB_ACCESS_TOKEN);
// invoke
bizThreadPool.execute(new Runnable() {
@Override
public void run() {
// do invoke
Object responseObj = process(httpMethod, uri, requestData, accessTokenReq);
// to json 响应结果转json
String responseJson = GsonTool.toJson(responseObj);
// write response
writeResponse(ctx, keepAlive, responseJson);
}
});
}
private Object process(HttpMethod httpMethod, String uri, String requestData, String accessTokenReq) {
// valid 不是POST直接返回异常
if (HttpMethod.POST != httpMethod) {
return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, HttpMethod not support.");
}
@Override
public ReturnT<String> run(TriggerParam triggerParam) {
// load old:jobHandler + jobThread
//获取job绑定线程
JobThread jobThread = XxlJobExecutor.loadJobThread(triggerParam.getJobId());
//获取作业处理器
IJobHandler jobHandler = jobThread!=null?jobThread.getHandler():null;
String removeOldReason = null;
public static JobThread registJobThread(int jobId, IJobHandler handler, String removeOldReason){
JobThread newJobThread = new JobThread(jobId, handler);//启动新线程处理工作任务
newJobThread.start();
logger.info(">>>>>>>>>>> xxl-job regist JobThread success, jobId:{}, handler:{}", new Object[]{jobId, handler});
//存储jobId与绑定工作的线程
JobThread oldJobThread = jobThreadRepository.put(jobId, newJobThread); // putIfAbsent | oh my god, map's put method return the old value!!!
if (oldJobThread != null) {//中断并删除旧线程
oldJobThread.toStop(removeOldReason);
oldJobThread.interrupt();
}
return newJobThread;
}
public ReturnT<String> pushTriggerQueue(TriggerParam triggerParam) {
// avoid repeat 若包含,则说明重复执行
if (triggerLogIdSet.contains(triggerParam.getLogId())) {
logger.info(">>>>>>>>>>> repeate trigger job, logId:{}", triggerParam.getLogId());
return new ReturnT<String>(ReturnT.FAIL_CODE, "repeate trigger job, logId:" + triggerParam.getLogId());
}
triggerLogIdSet.add(triggerParam.getLogId());
triggerQueue.add(triggerParam);
return ReturnT.SUCCESS;
}
public void execute() throws Exception {
//利用反射调用方法
Class<?>[] paramTypes = method.getParameterTypes();
if (paramTypes.length > 0) {
method.invoke(target, new Object[paramTypes.length]); // method-param can not be primitive-types
} else {
method.invoke(target);
}
}
@Override
public void execute() throws Exception {
Class<?>[] paramTypes = method.getParameterTypes();
if (paramTypes.length > 0) {
method.invoke(target, new Object[paramTypes.length]); // method-param can not be primitive-types
} else {
method.invoke(target);
}
}
public ReturnT<String> pushTriggerQueue(TriggerParam triggerParam) {
// avoid repeat 若包含,则说明重复执行
if (triggerLogIdSet.contains(triggerParam.getLogId())) {
logger.info(">>>>>>>>>>> repeate trigger job, logId:{}", triggerParam.getLogId());
return new ReturnT<String>(ReturnT.FAIL_CODE, "repeate trigger job, logId:" + triggerParam.getLogId());
}
triggerLogIdSet.add(triggerParam.getLogId());
triggerQueue.add(triggerParam);
return ReturnT.SUCCESS;
}
private void doCallback(List<HandleCallbackParam> callbackParamList){
boolean callbackRet = false;
// callback, will retry if error 获取admin地址
for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {
try { //回调admin,返回执行结果
ReturnT<String> callbackResult = adminBiz.callback(callbackParamList);
if (callbackResult!=null && ReturnT.SUCCESS_CODE == callbackResult.getCode()) {
callbackLog(callbackParamList, "<br>----------- xxl-job job callback finish.");
callbackRet = true;
break;
} else {
callbackLog(callbackParamList, "<br>----------- xxl-job job callback fail, callbackResult:" + callbackResult);
}
} catch (Exception e) {
callbackLog(callbackParamList, "<br>----------- xxl-job job callback error, errorMsg:" + e.getMessage());
}
}
if (!callbackRet) {
appendFailCallbackFile(callbackParamList);
}
}
public ReturnT<String> callback(List<HandleCallbackParam> callbackParamList) {
return XxlJobRemotingUtil.postBody(addressUrl+"api/callback", accessToken, timeout, callbackParamList, String.class);
}
public ReturnT<String> callback(List<HandleCallbackParam> callbackParamList) {
callbackThreadPool.execute(new Runnable() {
@Override
public void run() {
for (HandleCallbackParam handleCallbackParam: callbackParamList) {
ReturnT<String> callbackResult = callback(handleCallbackParam);
logger.debug(">>>>>>>>> JobApiController.callback {}, handleCallbackParam={}, callbackResult={}",
(callbackResult.getCode()== ReturnT.SUCCESS_CODE?"success":"fail"), handleCallbackParam, callbackResult);
}
}
});
return ReturnT.SUCCESS;
}
private ReturnT<String> callback(HandleCallbackParam handleCallbackParam) {
// valid log item 加载log
XxlJobLog log = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().load(handleCallbackParam.getLogId());
if (log == null) { //日志没找到返回异常
return new ReturnT<String>(ReturnT.FAIL_CODE, "log item not found.");
}
if (log.getHandleCode() > 0) { //说明重复执行
return new ReturnT<String>(ReturnT.FAIL_CODE, "log repeate callback."); // avoid repeat callback, trigger child job etc
}
// handle msg
StringBuffer handleMsg = new StringBuffer();
if (log.getHandleMsg()!=null) {
handleMsg.append(log.getHandleMsg()).append("<br>");
}
if (handleCallbackParam.getHandleMsg() != null) {
handleMsg.append(handleCallbackParam.getHandleMsg());
}
// success, save log 更改
log.setHandleTime(new Date());
log.setHandleCode(handleCallbackParam.getHandleCode()); //更改处理状态,200正常,500错误
log.setHandleMsg(handleMsg.toString());
XxlJobCompleter.updateHandleInfoAndFinish(log);
return ReturnT.SUCCESS;
}
public static int updateHandleInfoAndFinish(XxlJobLog xxlJobLog) {
// finish 若父任务正常结束,则终止子任务,以及设置Childmsg
finishJob(xxlJobLog);
// text最大64kb 避免长度过长 截断超过长度限制字符
if (xxlJobLog.getHandleMsg().length() > 15000) {
xxlJobLog.setHandleMsg( xxlJobLog.getHandleMsg().substring(0, 15000) );
}
// fresh handle 更新joblog
return XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateHandleInfo(xxlJobLog);
}
private static void finishJob(XxlJobLog xxlJobLog){
// 1、handle success, to trigger child job
String triggerChildMsg = null;
if (XxlJobContext.HANDLE_COCE_SUCCESS == xxlJobLog.getHandleCode()) {
XxlJobInfo xxlJobInfo = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(xxlJobLog.getJobId());
if (xxlJobInfo!=null && xxlJobInfo.getChildJobId()!=null && xxlJobInfo.getChildJobId().trim().length()>0) {
triggerChildMsg = "<br><br><span style=\"color:#00c0ef;\" > >>>>>>>>>>>"+ I18nUtil.getString("jobconf_trigger_child_run") +"<<<<<<<<<<< </span><br>";
//获取子任务id
String[] childJobIds = xxlJobInfo.getChildJobId().split(",");
for (int i = 0; i < childJobIds.length; i++) {
int childJobId = (childJobIds[i]!=null && childJobIds[i].trim().length()>0 && isNumeric(childJobIds[i]))?Integer.valueOf(childJobIds[i]):-1;
if (childJobId > 0) {
//触发子任务
JobTriggerPoolHelper.trigger(childJobId, TriggerTypeEnum.PARENT, -1, null, null, null);
ReturnT<String> triggerChildResult = ReturnT.SUCCESS;
// add msg
triggerChildMsg += MessageFormat.format(I18nUtil.getString("jobconf_callback_child_msg1"),
(i+1),
childJobIds.length,
childJobIds[i],
(triggerChildResult.getCode()==ReturnT.SUCCESS_CODE?I18nUtil.getString("system_success"):I18nUtil.getString("system_fail")),
triggerChildResult.getMsg());
} else {
triggerChildMsg += MessageFormat.format(I18nUtil.getString("jobconf_callback_child_msg2"),
(i+1),
childJobIds.length,
childJobIds[i]);
}
}
}
}
if (triggerChildMsg != null) {
xxlJobLog.setHandleMsg( xxlJobLog.getHandleMsg() + triggerChildMsg );
}
// 2、fix_delay trigger next
// on the way
}
基于数据库的集群方案,数据库选用Mysql;集群分布式并发环境中进行定时任务调度时,会在各个节点会上报任务,存到数据库中,执行时会从数据库中取出触发器来执行,如果触发器的名称和执行时间相同,则只有一个节点去执行此任务。
CREATE TABLE `xxl_job_lock` (
`lock_name` varchar(50) NOT NULL COMMENT '锁名称',
PRIMARY KEY (`lock_name`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
public class JobScheduleHelper {
private static Logger logger = LoggerFactory.getLogger(JobScheduleHelper.class);
private static JobScheduleHelper instance = new JobScheduleHelper();
public static JobScheduleHelper getInstance(){
return instance;
}
实现职业转型,极速上岸
实现架构转型,再无中年危机