diff --git a/src/main/java/neatlogic/framework/scheduler/core/JobBase.java b/src/main/java/neatlogic/framework/scheduler/core/JobBase.java index f96d0fec7badefd7f0bd988e54315f66e58a3a19..78733f73912a342f58a26d1f3ccc300aa10615cb 100644 --- a/src/main/java/neatlogic/framework/scheduler/core/JobBase.java +++ b/src/main/java/neatlogic/framework/scheduler/core/JobBase.java @@ -27,7 +27,6 @@ import neatlogic.framework.scheduler.annotation.Param; import neatlogic.framework.scheduler.annotation.Prop; import neatlogic.framework.scheduler.dao.mapper.SchedulerMapper; import neatlogic.framework.scheduler.dto.*; -import neatlogic.framework.scheduler.exception.ScheduleHandlerNotFoundException; import neatlogic.framework.scheduler.exception.ScheduleIllegalParameterException; import neatlogic.framework.scheduler.exception.ScheduleParamNotExistsException; import neatlogic.framework.transaction.util.TransactionUtil; @@ -42,13 +41,11 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.transaction.TransactionStatus; +import org.springframework.transaction.support.TransactionSynchronizationManager; import java.lang.annotation.Annotation; import java.lang.reflect.Method; -import java.util.Date; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.Map.Entry; /** @@ -96,13 +93,14 @@ public abstract class JobBase implements IJob { // 如果锁的状态是running状态,证明其他节点已经在执行,直接返回 if (jobLockVo.getLock().equals(JobLockVo.RUNNING) && !jobLockVo.getServerId().equals(Config.SCHEDULE_SERVER_ID)) { jobLockVo = null; + } else { + // 修改锁状态 + jobLockVo.setServerId(Config.SCHEDULE_SERVER_ID); + jobLockVo.setLock(JobLockVo.RUNNING); + schedulerMapper.updateJobLock(jobLockVo); } - } - if (jobLockVo != null) { - // 修改锁状态 - jobLockVo.setServerId(Config.SCHEDULE_SERVER_ID); - jobLockVo.setLock(JobLockVo.RUNNING); - schedulerMapper.updateJobLock(jobLockVo); + } else { + logger.error(String.format("执行定时作业(jobName=%s,jobGroup=%s)时,`schedule_job_lock`表中没有对应数据", jobName, jobGroup)); } } finally { transactionUtil.commitTx(ts); @@ -166,6 +164,10 @@ public abstract class JobBase implements IJob { @Override public final void execute(JobExecutionContext context) throws JobExecutionException { + if (TransactionSynchronizationManager.isSynchronizationActive()) { + logger.error(String.format("定时作业%s类的execute()方法不应该开始事务", this.getClassName())); + return; + } InputFromContext.init(InputFrom.CRON); Date fireTime = new Date(); JobDetail jobDetail = context.getJobDetail(); @@ -176,6 +178,7 @@ public abstract class JobBase implements IJob { String tenantUuid = jobObject.getTenantUuid(); //如果租户不存在则不执行该租户的作业 if (!TenantUtil.hasTenant(tenantUuid)) { + logger.error(String.format("执行定时作业(jobName=%s,jobGroup=%s)时,租户:%s不存在", jobName, jobGroup, tenantUuid)); return; } // 从job组名中获取租户uuid,切换到租户的数据源 @@ -185,19 +188,26 @@ public abstract class JobBase implements IJob { IJob jobHandler = SchedulerManager.getHandler(this.getClassName()); if (jobHandler == null) { schedulerManager.unloadJob(jobObject); - throw new ScheduleHandlerNotFoundException(jobObject.getJobHandler()); + logger.error(String.format("执行定时作业(jobName=%s,jobGroup=%s)时,定时作业组件:%s不存在", jobName, jobGroup, this.getClassName())); + return; +// throw new ScheduleHandlerNotFoundException(jobObject.getJobHandler()); } if (!jobHandler.isHealthy(jobObject)) { // not healthy 不能unloadJob 否则会删除作业状态和锁,导致正常接管的server也无法跑作业。 // 例如:A Server 修改cron 每天0点跑作业,B Server 修改cron每分钟跑。 当A Server 发现not healthy 则会unload 并删除status,lock。后续判断会导致B Server 也不会再跑作业。 // 应该有jobHandler来自行判断是否需要unloadJob //schedulerManager.unloadJob(jobObject); + logger.error(String.format("执行定时作业(jobName=%s,jobGroup=%s)时,isHealthy()方法检查出作业不正常", jobName, jobGroup)); return; } Date currentFireTime = context.getFireTime();// 本次执行激活时间 JobStatusVo beforeJobStatusVo = schedulerMapper.getJobStatusByJobNameGroup(jobName, jobGroup); + if (beforeJobStatusVo == null) { + logger.error(String.format("执行定时作业(jobName=%s,jobGroup=%s)时,`schedule_job_status`表中没有对应数据", jobName, jobGroup)); + return; + } // 如果数据库中记录的下次激活时间在本次执行激活时间之后,则放弃执行业务逻辑 - if (beforeJobStatusVo == null || (beforeJobStatusVo.getNextFireTime() != null && beforeJobStatusVo.getNextFireTime().after(currentFireTime))) { + if (beforeJobStatusVo.getNextFireTime() != null && beforeJobStatusVo.getNextFireTime().after(currentFireTime)) { return; } @@ -209,7 +219,7 @@ public abstract class JobBase implements IJob { } JobStatusVo oldJobStatusVo = schedulerMapper.getJobStatusByJobNameGroup(jobName, jobGroup); // 前后执行次数不一致,证明已经执行过,直接退出 - if (beforeJobStatusVo.getExecCount().intValue() != oldJobStatusVo.getExecCount().intValue()) { + if (!Objects.equals(beforeJobStatusVo.getExecCount(), oldJobStatusVo.getExecCount())) { return; } try { @@ -248,7 +258,11 @@ public abstract class JobBase implements IJob { schedulerMapper.updateJobAudit(auditVo); } } else { - jobHandler.executeInternal(context, jobObject); + try { + jobHandler.executeInternal(context, jobObject); + } catch (Exception ex) { + logger.error(ex.getMessage(), ex); + } } /* 异步模式,如果事务hold住时间太长,可以考虑使用异步模式,但作业的执行时间需要手动处理 diff --git a/src/main/java/neatlogic/framework/scheduler/dto/JobVo.java b/src/main/java/neatlogic/framework/scheduler/dto/JobVo.java index d9abcbb087809ebaaf826774f8a677635928249d..3be89ea1124916e3a5bc4ce65828d2f5cd91d436 100644 --- a/src/main/java/neatlogic/framework/scheduler/dto/JobVo.java +++ b/src/main/java/neatlogic/framework/scheduler/dto/JobVo.java @@ -56,6 +56,9 @@ public class JobVo extends BasePageVo { @EntityField(name = "结束时间", type = ApiParamType.LONG) private Date endTime; + @EntityField(name = "是否已加载", + type = ApiParamType.INTEGER) + private Integer isLoad; private JobStatusVo jobStatus; @@ -181,4 +184,11 @@ public class JobVo extends BasePageVo { return 0; } + public Integer getIsLoad() { + return isLoad; + } + + public void setIsLoad(Integer isLoad) { + this.isLoad = isLoad; + } }