转载

快速构建拨测系统

背景

当前项目需要一个拨测系统来检测服务是否正常运行,拨测系统需要满足以下需求:

  1. 支持对接口请求结果做判断。
  2. 支持对接口的耗时做判断。
  3. 支持重试:可能在某一瞬间网络出现了延迟,导致接口请求失败,所以需要重试,连续重试N次失败才算异常。
  4. 失败告警,可配置不同的告警接收人。
  5. 通用、可配置:支持各种场景的接口协议。
  6. 图表展示(可选)。

方案

拨测系统原理上就是定时检查服务,那是否可以偷懒,拿开源的定时任务系统来改造呢。基于这种想法,在研究多个开源项目之后,选择了 xxl-job (当前版本 2.2.1 )。

改造

1. 添加任务

改造任务界面

快速构建拨测系统

以上是 xxl-job 添加定时任务的界面,先修改 jobinfo.index.ftl 文件,隐藏掉跟 监控 无关的字段,隐藏的字段相当于采用了默认值。效果如下:

快速构建拨测系统

这里保留了:

  • 任务描述。
  • Cron:自定义配置执行间隔。
  • 运行模式:常规场景,通过 BEAN+JobHandler 即可满足, 当拨测的接口协议很复杂,无法使用通用的拨测方法时,这里可以选择 GUL 模式来自定义请求脚本。
  • JobHandler:选择 拨测类别 ,当前仅提供了 接口HTTP拨测 ,后续会添加其他的 拨测类型 ,比如 redis检查
  • 任务超时时间:可以当做接口性能检测,判断性能是否符合预期。
  • 失败重试次数:可能在某一瞬间网络出现了延迟,导致接口请求失败,所以需要重试,连续重试N次失败才算异常。
  • 负责人。
  • 报警邮件:拨测失败报警,天企业微信名,多个之间用','分隔。
  • 任务参数:输入拨测时需要的参数,比如 拨测的URL

开发接口拨测Handler

executor 项目开发处理器(xxl-job分为 adminexecutor 两个项目, admin 是管理、分发, executor 执行定时任务业务逻辑),拨测URL等参数由 任务参数 来输入,这里选择 json 作为输入格式,定义了几个 json字段

POST

HTPP拨测Handler代码:

@Component
@Slf4j
public class DialTestHandler {
    private WebClient webClient;
    public DialTestHandler() {
        webClient = WebClient.builder()
                .clientConnector(new ReactorClientHttpConnector(
                        // 允许重定向
                        HttpClient.create().followRedirect(true)
                )).build();
    }
    /**
     * @param param
     * @return com.xxl.job.core.biz.model.ReturnT<java.lang.String>
     * @author 
     * @date 
     */
    @XxlJob("httpDialTestHandler")
    public ReturnT<String> httpDialTestHandler(String param) {
        Stopwatch stopwatch = Stopwatch.createStarted();
        XxlJobLogger.log("begin httpDialTestHandler, param: {}", param);
        try {
            DialTestConfig dialTestConfig = JSON.parseObject(param, DialTestConfig.class);
            if (StringUtils.isBlank(dialTestConfig.getUrl())) {
                throw new Exception("url required!");
            }
            if (StringUtils.isBlank(dialTestConfig.getMethod())) {
                dialTestConfig.setMethod("POST");
            }
            Mono<String> resultMono = webClient.method(HttpMethod.valueOf(dialTestConfig.getMethod())).uri(dialTestConfig.getUrl())
                    .contentType(MediaType.APPLICATION_JSON_UTF8)
                    .accept(MediaType.APPLICATION_JSON_UTF8)
                    .bodyValue(Optional.ofNullable(dialTestConfig.getPostBody()).orElse(""))
                    .retrieve()
                    .onStatus(status -> !status.is2xxSuccessful(), resp -> {
                        Mono<String> body = resp.body(BodyExtractors.toMono(String.class));
                        return body.flatMap(str -> {
                            log.error("request[{}]error,result: {}", dialTestConfig.getUrl(), str);
                            XxlJobLogger.log("request[{}]error,result: {}", dialTestConfig.getUrl(), str);
                            // 业务返回的错误
                            if (StringUtils.isNotBlank(str)) {
                                throw new RequestException(str, resp.statusCode().value());
                            }
                            throw new RequestException(resp.statusCode().getReasonPhrase(), resp.statusCode().value());
                        });
                    }).bodyToMono(String.class)
                    .doOnError(Exception.class, err -> {
                        throw new RequestException(err.getMessage(), -1);
                    });
            // 这里不做超时限制,由xxl-job任务的超时时间来打断(interrupt)
            // String result = resultMono.block(Duration.ofSeconds(2000));
            String result = resultMono.block();
            if (!CollectionUtils.isEmpty(dialTestConfig.getResultKeys())) {
                for (String key : dialTestConfig.getResultKeys()) {
                    if (!result.contains(key)) {
                        throw new Exception(String.format("request[%s]error,result not contain: %s", dialTestConfig.getUrl(), key));
                    }
                }
            }
            log.info("httpDialTestHandler success, param: {}", param);
            XxlJobLogger.log("httpDialTestHandler success, param: {}", param);
            return ReturnT.SUCCESS;
        } catch (RequestException re) {
            log.error("httpDialTestHandler RequestException, param: {}, code: {}", param, re.getCode(), re);
            XxlJobLogger.log("httpDialTestHandler error, param: {}, code: {}, RequestException: {}", param, re.getCode(), re.getMessage());
        } catch (Exception e) {
            log.error("httpDialTestHandler Exception, param: {}", param, e);
            XxlJobLogger.log("httpDialTestHandler error, param: {}, Exception: {}", param, e.getMessage());
        } finally {
            stopwatch.stop();
            XxlJobLogger.log("finish httpDialTestHandler, param: {}, cost: {}", param, stopwatch.elapsed(TimeUnit.MILLISECONDS));
            log.info("finish httpDialTestHandler, param: {}, cost: {}", param, stopwatch.elapsed(TimeUnit.MILLISECONDS));
        }
        return ReturnT.FAIL;
    }
}
@Data
public class DialTestConfig {
    private String url;
    /**
     * POST/GET/...
     */
    private String method;
    /**
     * 返回结果关键字
     */
    private List<String> resultKeys;
    /**
     * POST Body
     */
    private String postBody;
}
public class RequestException extends RuntimeException {
    public static final int DEFAULT_ERROR_CODE = HttpStatus.INTERNAL_SERVER_ERROR.value();
    private int code = DEFAULT_ERROR_CODE;
    public RequestException(String message) {
        super(message);
    }
    public RequestException(int code) {
        this.code = code;
    }
    public RequestException(String message, int code) {
        super(message);
        this.code = code;
    }
    public RequestException(String message, Throwable cause, int code) {
        super(message, cause);
        this.code = code;
    }
    public RequestException(Throwable cause, int code) {
        super(cause);
        this.code = code;
    }
    public RequestException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace, int code) {
        super(message, cause, enableSuppression, writableStackTrace);
        this.code = code;
    }
    public int getCode() {
        return code;
    }
    public RequestException setCode(int code) {
        this.code = code;
        return this;
    }
}

代码逻辑很简单:

  1. 从任务参数里获取配置信息。
  2. 请求url。
  3. 当接口返回的HTTP STATUS不为200时,任务失败。
  4. 如果有配置 resultKeys ,则轮询判断接口是否包含了 key ,如果没有包含,任务失败。
  5. 任务完成。

通用配置

配置示例如下(为了展示的更直观,配置参数了包含了所有配置项):

快速构建拨测系统

这里报警邮件里填的并不是邮件,而是企业微信名,是因为后面也改造了监控,告警时同时发送企业微信和邮件,详细后面会讲到。

自定义请求脚本

当拨测接口协议比较复杂时,可以自定义请求脚本,而不是使用现有的 BEAN:httpDialTestHandler 模式。在允许模式选择 GLUE(XXX) ,比如 GLUE(java) ,然后在列表页面选择 GLUE IDE 编辑脚本,脚本里可以引入 执行器executor 项目的类,因为类最终会是在 executor 项目上运行。

快速构建拨测系统

注:当前版本的 GLUE(java) 不支持 lambda 语法。

2. 监控告警

告警次数

告警逻辑在 admin 项目, xxl在任务失败重试时,失败几次(含重试次数),就会告警几次,这里改造成重试的最后一次(含不重试的任务)才告警。

告警逻辑在 JobFailMonitorHelper 类。

if (info!=null && info.getAlarmEmail()!=null && info.getAlarmEmail().trim().length()>0) {
改造成
if (info!=null && info.getAlarmEmail()!=null && info.getAlarmEmail().trim().length()>0 && log.getExecutorFailRetryCount() == 0) {

log.getExecutorFailRetryCount() 返回的是 当前任务剩余重试次数

同样的告警内容里的 失败重试次数 的值从 剩余重试次数 改为 当前已重试次数

告警内容逻辑在 XxlJobTrigger.processTrigger 方法里。

triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorFailRetryCount")).append(":").append(finalFailRetryCount);
改为
triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorFailRetryCount")).append(":").append(jobInfo.getExecutorFailRetryCount() - finalFailRetryCount);

告警类别

企业微信

xxl-job只有邮件告警,这里加上企业微信告警。

JobAlarmer 类看到 XxlJob 是通过 applicationContext.getBeansOfType(JobAlarm.class) 获取告警类别列表,所以只需要写一个自定义 JobAlarm 实现类并注册成spring的Bean既可,告警内容可以在邮件内容上做减法。

@Component
public class RtxJobAlarm implements JobAlarm {
    private static Logger logger = LoggerFactory.getLogger(RtxJobAlarm.class);
    /**
     * fail alarm
     *
     * @param jobLog
     */
    @Override
    public boolean doAlarm(XxlJobInfo info, XxlJobLog jobLog) {
        boolean alarmResult = true;
        if (info != null && info.getAlarmEmail() != null && info.getAlarmEmail().trim().length() > 0) {
            // alarmContent
            String alarmContent = "Alarm Job LogId=" + jobLog.getId();
            // 这里xxl的triggerMsg里已经使用了<br>,所以追加内容时也先用<br>,后面再统一换成企业微信的/n
            alarmContent += "<br>TriggerMsg=<br>" + jobLog.getTriggerMsg();
            if (jobLog.getHandleCode() > 0 && jobLog.getHandleCode() != ReturnT.SUCCESS_CODE) {
                alarmContent += "<br>HandleCode=" + jobLog.getHandleMsg();
            }
            String title = I18nUtil.getString("jobconf_monitor");
            List<String> mails = Arrays.asList(info.getAlarmEmail().split(","));
            try {
                MessageUtil.postMessage(Lists.newArrayList(MessageTypeEnum.RTX),
                        mails,
                        title,
                        info.getJobDesc() + "/n" + alarmContent.replaceAll("<br>", "/n"));
            } catch (Exception e) {
                logger.error(">>>>>>>>>>> xxl-job, job fail alarm rtx send error, JobLogId:{}", jobLog.getId(), e);
                alarmResult = false;
            }
        }
        return alarmResult;
    }
}

邮件

前面添加任务时,有讲到 报警邮件 填的不是邮件,而是企业微信名,所以在发送邮件时,需要补上 企业邮箱后缀

参考:

分布式任务调度平台XXL-JOB

原文  https://segmentfault.com/a/1190000023258566
正文到此结束
Loading...