当前项目需要一个拨测系统来检测服务是否正常运行,拨测系统需要满足以下需求:
拨测系统原理上就是定时检查服务,那是否可以偷懒,拿开源的定时任务系统来改造呢。基于这种想法,在研究多个开源项目之后,选择了 xxl-job (当前版本 2.2.1 )。
以上是 xxl-job 添加定时任务的界面,先修改 jobinfo.index.ftl 文件,隐藏掉跟 监控 无关的字段,隐藏的字段相当于采用了默认值。效果如下:
这里保留了:
BEAN+JobHandler 即可满足, 当拨测的接口协议很复杂,无法使用通用的拨测方法时,这里可以选择 GUL 模式来自定义请求脚本。 拨测类别 ,当前仅提供了 接口HTTP拨测 ,后续会添加其他的 拨测类型 ,比如 redis检查 。 拨测的URL 。 在 executor 项目开发处理器(xxl-job分为 admin 和 executor 两个项目, admin 是管理、分发, executor 执行定时任务业务逻辑),拨测URL等参数由 任务参数 来输入,这里选择 json 作为输入格式,定义了几个 json字段 :
POST
HTPP拨测Handler代码:
@Component
@Slf4j
public class DialTestHandler {
private WebClient webClient;
public DialTestHandler() {
webClient = WebClient.builder()
.clientConnector(new ReactorClientHttpConnector(
// 允许重定向
HttpClient.create().followRedirect(true)
)).build();
}
/**
* @param param
* @return com.xxl.job.core.biz.model.ReturnT<java.lang.String>
* @author
* @date
*/
@XxlJob("httpDialTestHandler")
public ReturnT<String> httpDialTestHandler(String param) {
Stopwatch stopwatch = Stopwatch.createStarted();
XxlJobLogger.log("begin httpDialTestHandler, param: {}", param);
try {
DialTestConfig dialTestConfig = JSON.parseObject(param, DialTestConfig.class);
if (StringUtils.isBlank(dialTestConfig.getUrl())) {
throw new Exception("url required!");
}
if (StringUtils.isBlank(dialTestConfig.getMethod())) {
dialTestConfig.setMethod("POST");
}
Mono<String> resultMono = webClient.method(HttpMethod.valueOf(dialTestConfig.getMethod())).uri(dialTestConfig.getUrl())
.contentType(MediaType.APPLICATION_JSON_UTF8)
.accept(MediaType.APPLICATION_JSON_UTF8)
.bodyValue(Optional.ofNullable(dialTestConfig.getPostBody()).orElse(""))
.retrieve()
.onStatus(status -> !status.is2xxSuccessful(), resp -> {
Mono<String> body = resp.body(BodyExtractors.toMono(String.class));
return body.flatMap(str -> {
log.error("request[{}]error,result: {}", dialTestConfig.getUrl(), str);
XxlJobLogger.log("request[{}]error,result: {}", dialTestConfig.getUrl(), str);
// 业务返回的错误
if (StringUtils.isNotBlank(str)) {
throw new RequestException(str, resp.statusCode().value());
}
throw new RequestException(resp.statusCode().getReasonPhrase(), resp.statusCode().value());
});
}).bodyToMono(String.class)
.doOnError(Exception.class, err -> {
throw new RequestException(err.getMessage(), -1);
});
// 这里不做超时限制,由xxl-job任务的超时时间来打断(interrupt)
// String result = resultMono.block(Duration.ofSeconds(2000));
String result = resultMono.block();
if (!CollectionUtils.isEmpty(dialTestConfig.getResultKeys())) {
for (String key : dialTestConfig.getResultKeys()) {
if (!result.contains(key)) {
throw new Exception(String.format("request[%s]error,result not contain: %s", dialTestConfig.getUrl(), key));
}
}
}
log.info("httpDialTestHandler success, param: {}", param);
XxlJobLogger.log("httpDialTestHandler success, param: {}", param);
return ReturnT.SUCCESS;
} catch (RequestException re) {
log.error("httpDialTestHandler RequestException, param: {}, code: {}", param, re.getCode(), re);
XxlJobLogger.log("httpDialTestHandler error, param: {}, code: {}, RequestException: {}", param, re.getCode(), re.getMessage());
} catch (Exception e) {
log.error("httpDialTestHandler Exception, param: {}", param, e);
XxlJobLogger.log("httpDialTestHandler error, param: {}, Exception: {}", param, e.getMessage());
} finally {
stopwatch.stop();
XxlJobLogger.log("finish httpDialTestHandler, param: {}, cost: {}", param, stopwatch.elapsed(TimeUnit.MILLISECONDS));
log.info("finish httpDialTestHandler, param: {}, cost: {}", param, stopwatch.elapsed(TimeUnit.MILLISECONDS));
}
return ReturnT.FAIL;
}
}
@Data
public class DialTestConfig {
private String url;
/**
* POST/GET/...
*/
private String method;
/**
* 返回结果关键字
*/
private List<String> resultKeys;
/**
* POST Body
*/
private String postBody;
}
public class RequestException extends RuntimeException {
public static final int DEFAULT_ERROR_CODE = HttpStatus.INTERNAL_SERVER_ERROR.value();
private int code = DEFAULT_ERROR_CODE;
public RequestException(String message) {
super(message);
}
public RequestException(int code) {
this.code = code;
}
public RequestException(String message, int code) {
super(message);
this.code = code;
}
public RequestException(String message, Throwable cause, int code) {
super(message, cause);
this.code = code;
}
public RequestException(Throwable cause, int code) {
super(cause);
this.code = code;
}
public RequestException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace, int code) {
super(message, cause, enableSuppression, writableStackTrace);
this.code = code;
}
public int getCode() {
return code;
}
public RequestException setCode(int code) {
this.code = code;
return this;
}
}
代码逻辑很简单:
resultKeys ,则轮询判断接口是否包含了 key ,如果没有包含,任务失败。 配置示例如下(为了展示的更直观,配置参数了包含了所有配置项):
这里报警邮件里填的并不是邮件,而是企业微信名,是因为后面也改造了监控,告警时同时发送企业微信和邮件,详细后面会讲到。
当拨测接口协议比较复杂时,可以自定义请求脚本,而不是使用现有的 BEAN:httpDialTestHandler 模式。在允许模式选择 GLUE(XXX) ,比如 GLUE(java) ,然后在列表页面选择 GLUE IDE 编辑脚本,脚本里可以引入 执行器executor 项目的类,因为类最终会是在 executor 项目上运行。
注:当前版本的 GLUE(java) 不支持 lambda 语法。
告警逻辑在 admin 项目, xxl在任务失败重试时,失败几次(含重试次数),就会告警几次,这里改造成重试的最后一次(含不重试的任务)才告警。
告警逻辑在 JobFailMonitorHelper 类。
if (info!=null && info.getAlarmEmail()!=null && info.getAlarmEmail().trim().length()>0) {
改造成
if (info!=null && info.getAlarmEmail()!=null && info.getAlarmEmail().trim().length()>0 && log.getExecutorFailRetryCount() == 0) {
log.getExecutorFailRetryCount() 返回的是 当前任务剩余重试次数 。
同样的告警内容里的 失败重试次数 的值从 剩余重试次数 改为 当前已重试次数 。
告警内容逻辑在 XxlJobTrigger.processTrigger 方法里。
triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorFailRetryCount")).append(":").append(finalFailRetryCount);
改为
triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorFailRetryCount")).append(":").append(jobInfo.getExecutorFailRetryCount() - finalFailRetryCount);
企业微信
xxl-job只有邮件告警,这里加上企业微信告警。
从 JobAlarmer 类看到 XxlJob 是通过 applicationContext.getBeansOfType(JobAlarm.class) 获取告警类别列表,所以只需要写一个自定义 JobAlarm 实现类并注册成spring的Bean既可,告警内容可以在邮件内容上做减法。
@Component
public class RtxJobAlarm implements JobAlarm {
private static Logger logger = LoggerFactory.getLogger(RtxJobAlarm.class);
/**
* fail alarm
*
* @param jobLog
*/
@Override
public boolean doAlarm(XxlJobInfo info, XxlJobLog jobLog) {
boolean alarmResult = true;
if (info != null && info.getAlarmEmail() != null && info.getAlarmEmail().trim().length() > 0) {
// alarmContent
String alarmContent = "Alarm Job LogId=" + jobLog.getId();
// 这里xxl的triggerMsg里已经使用了<br>,所以追加内容时也先用<br>,后面再统一换成企业微信的/n
alarmContent += "<br>TriggerMsg=<br>" + jobLog.getTriggerMsg();
if (jobLog.getHandleCode() > 0 && jobLog.getHandleCode() != ReturnT.SUCCESS_CODE) {
alarmContent += "<br>HandleCode=" + jobLog.getHandleMsg();
}
String title = I18nUtil.getString("jobconf_monitor");
List<String> mails = Arrays.asList(info.getAlarmEmail().split(","));
try {
MessageUtil.postMessage(Lists.newArrayList(MessageTypeEnum.RTX),
mails,
title,
info.getJobDesc() + "/n" + alarmContent.replaceAll("<br>", "/n"));
} catch (Exception e) {
logger.error(">>>>>>>>>>> xxl-job, job fail alarm rtx send error, JobLogId:{}", jobLog.getId(), e);
alarmResult = false;
}
}
return alarmResult;
}
}
邮件
前面添加任务时,有讲到 报警邮件 填的不是邮件,而是企业微信名,所以在发送邮件时,需要补上 企业邮箱后缀 。
参考:
分布式任务调度平台XXL-JOB