Preface
This article takes a look at SkyWalking's metric-exporter.
metric-exporter.proto
skywalking-6.6.0/oap-server/exporter/src/main/proto/metric-exporter.proto
syntax = "proto3";

option java_multiple_files = true;
option java_package = "org.apache.skywalking.oap.server.exporter.grpc";

service MetricExportService {
    rpc export (stream ExportMetricValue) returns (ExportResponse) {
    }

    rpc subscription (SubscriptionReq) returns (SubscriptionsResp) {
    }
}

message ExportMetricValue {
    string metricName = 1;
    string entityName = 2;
    string entityId = 3;
    ValueType type = 4;
    int64 timeBucket = 5;
    int64 longValue = 6;
    double doubleValue = 7;
}

message SubscriptionsResp {
    repeated string metricNames = 1;
}

enum ValueType {
    LONG = 0;
    DOUBLE = 1;
}

message SubscriptionReq {
}

message ExportResponse {
}
- metric-exporter.proto defines the MetricExportService service with two RPC methods: export, which takes a client-side stream of ExportMetricValue messages and returns a single ExportResponse, and subscription, which takes a SubscriptionReq and returns a SubscriptionsResp carrying the subscribed metric names
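As a small illustration of how a receiver might read an ExportMetricValue: the type field says which of longValue or doubleValue carries the reading (in proto3 an unset scalar simply reads as zero, so the type tag is what disambiguates). The sketch below uses plain Java classes as hypothetical stand-ins for the generated gRPC message types; `Value` and `asDouble` are illustrative names and not part of SkyWalking.

```java
/** Stand-alone sketch; Value and asDouble are hypothetical, not generated gRPC classes. */
final class ExportValueSketch {
    /** Mirrors the ValueType enum declared in metric-exporter.proto. */
    enum ValueType { LONG, DOUBLE }

    /** Hypothetical plain-Java stand-in for the value fields of ExportMetricValue. */
    static final class Value {
        final ValueType type;
        final long longValue;
        final double doubleValue;

        Value(ValueType type, long longValue, double doubleValue) {
            this.type = type;
            this.longValue = longValue;
            this.doubleValue = doubleValue;
        }
    }

    /** Picks the field named by the type tag and widens it to double. */
    static double asDouble(Value v) {
        return v.type == ValueType.LONG ? (double) v.longValue : v.doubleValue;
    }

    public static void main(String[] args) {
        System.out.println(asDouble(new Value(ValueType.LONG, 120L, 0.0)));
        System.out.println(asDouble(new Value(ValueType.DOUBLE, 0L, 99.5)));
    }
}
```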
GRPCExporterSetting
skywalking-6.6.0/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporterSetting.java
@Setter
@Getter
public class GRPCExporterSetting extends ModuleConfig {
    private String targetHost;
    private int targetPort;
    private int bufferChannelSize = 20000;
    private int bufferChannelNum = 2;
}
- GRPCExporterSetting defines the targetHost, targetPort, bufferChannelSize (default 20000), and bufferChannelNum (default 2) properties
GRPCExporterProvider
skywalking-6.6.0/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporterProvider.java
public class GRPCExporterProvider extends ModuleProvider {
    private GRPCExporterSetting setting;
    private GRPCExporter exporter;

    @Override public String name() {
        return "grpc";
    }

    @Override public Class<? extends ModuleDefine> module() {
        return ExporterModule.class;
    }

    @Override public ModuleConfig createConfigBeanIfAbsent() {
        setting = new GRPCExporterSetting();
        return setting;
    }

    @Override public void prepare() throws ServiceNotProvidedException, ModuleStartException {
        exporter = new GRPCExporter(setting);
        this.registerServiceImplementation(MetricValuesExportService.class, exporter);
    }

    @Override public void start() throws ServiceNotProvidedException, ModuleStartException {
    }

    @Override public void notifyAfterCompleted() throws ServiceNotProvidedException, ModuleStartException {
        ModuleServiceHolder serviceHolder = getManager().find(CoreModule.NAME).provider();
        exporter.setServiceInventoryCache(serviceHolder.getService(ServiceInventoryCache.class));
        exporter.setServiceInstanceInventoryCache(serviceHolder.getService(ServiceInstanceInventoryCache.class));
        exporter.setEndpointInventoryCache(serviceHolder.getService(EndpointInventoryCache.class));
        exporter.initSubscriptionList();
    }

    @Override public String[] requiredModules() {
        return new String[] {CoreModule.NAME};
    }
}
- GRPCExporterProvider extends ModuleProvider. Its prepare method creates a GRPCExporter and registers it as the MetricValuesExportService implementation via registerServiceImplementation. Its notifyAfterCompleted method fetches serviceInventoryCache, serviceInstanceInventoryCache, and endpointInventoryCache from the core module, sets them on the exporter, and then calls exporter.initSubscriptionList()
MetricFormatter
skywalking-6.6.0/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/MetricFormatter.java
@Setter
public class MetricFormatter {
    private ServiceInventoryCache serviceInventoryCache;
    private ServiceInstanceInventoryCache serviceInstanceInventoryCache;
    private EndpointInventoryCache endpointInventoryCache;

    protected String getEntityName(MetricsMetaInfo meta) {
        int scope = meta.getScope();
        if (DefaultScopeDefine.inServiceCatalog(scope)) {
            int entityId = Integer.valueOf(meta.getId());
            return serviceInventoryCache.get(entityId).getName();
        } else if (DefaultScopeDefine.inServiceInstanceCatalog(scope)) {
            int entityId = Integer.valueOf(meta.getId());
            return serviceInstanceInventoryCache.get(entityId).getName();
        } else if (DefaultScopeDefine.inEndpointCatalog(scope)) {
            int entityId = Integer.valueOf(meta.getId());
            return endpointInventoryCache.get(entityId).getName();
        } else if (scope == DefaultScopeDefine.ALL) {
            return "";
        } else {
            return null;
        }
    }
}
- MetricFormatter provides the getEntityName method, which resolves an entityName from a MetricsMetaInfo: for the service, service instance, and endpoint catalogs it parses meta.getId() as an integer and looks the name up in the matching inventory cache; for the ALL scope it returns an empty string; for any other scope it returns null
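The scope-based lookup described above can be sketched with the JDK alone. This is an illustration under stated assumptions, not SkyWalking code: the `Catalog` enum is a hypothetical stand-in for the DefaultScopeDefine checks, the three maps stand in for the inventory caches, and a missing id simply yields null here.

```java
import java.util.Collections;
import java.util.Map;

/** Stand-alone sketch of scope-based entity-name lookup; all names are hypothetical. */
final class EntityNameSketch {
    /** Hypothetical replacement for the DefaultScopeDefine catalog checks. */
    enum Catalog { SERVICE, SERVICE_INSTANCE, ENDPOINT, ALL, OTHER }

    // Plain maps stand in for the three inventory caches.
    private final Map<Integer, String> services;
    private final Map<Integer, String> instances;
    private final Map<Integer, String> endpoints;

    EntityNameSketch(Map<Integer, String> services,
                     Map<Integer, String> instances,
                     Map<Integer, String> endpoints) {
        this.services = services;
        this.instances = instances;
        this.endpoints = endpoints;
    }

    /** Returns the entity name, "" for the ALL scope, or null for any other scope. */
    String entityName(Catalog catalog, String id) {
        switch (catalog) {
            case SERVICE:
                return services.get(Integer.parseInt(id));
            case SERVICE_INSTANCE:
                return instances.get(Integer.parseInt(id));
            case ENDPOINT:
                return endpoints.get(Integer.parseInt(id));
            case ALL:
                return ""; // the id is not consulted for the ALL scope
            default:
                return null; // callers treat null as "skip this metric"
        }
    }

    public static void main(String[] args) {
        EntityNameSketch sketch = new EntityNameSketch(
            Collections.singletonMap(1, "order-service"),
            Collections.singletonMap(7, "order-service-instance"),
            Collections.singletonMap(42, "/orders"));
        System.out.println(sketch.entityName(Catalog.SERVICE, "1"));
        System.out.println(sketch.entityName(Catalog.ENDPOINT, "42"));
    }
}
```

Returning null for an unsupported scope matters later in the export path: GRPCExporter.consume skips any row whose entity name comes back as null.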
MetricValuesExportService
skywalking-6.6.0/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/exporter/MetricValuesExportService.java
public interface MetricValuesExportService extends Service {
    /**
     * This method is sync-mode export, the performance effects the persistence result. Queue mode is highly recommended.
     *
     * @param event value is only accurate when the method invokes. Don't cache it.
     */
    void export(ExportEvent event);
}
- MetricValuesExportService extends Service and defines a single export method that takes an ExportEvent
GRPCExporter
skywalking-6.6.0/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporter.java
public class GRPCExporter extends MetricFormatter implements MetricValuesExportService, IConsumer<GRPCExporter.ExportData> {
    private static final Logger logger = LoggerFactory.getLogger(GRPCExporter.class);

    private GRPCExporterSetting setting;
    private final MetricExportServiceGrpc.MetricExportServiceStub exportServiceFutureStub;
    private final MetricExportServiceGrpc.MetricExportServiceBlockingStub blockingStub;
    private final DataCarrier exportBuffer;
    private final Set<String> subscriptionSet;

    public GRPCExporter(GRPCExporterSetting setting) {
        this.setting = setting;
        GRPCClient client = new GRPCClient(setting.getTargetHost(), setting.getTargetPort());
        client.connect();
        ManagedChannel channel = client.getChannel();
        exportServiceFutureStub = MetricExportServiceGrpc.newStub(channel);
        blockingStub = MetricExportServiceGrpc.newBlockingStub(channel);
        exportBuffer = new DataCarrier<ExportData>(setting.getBufferChannelNum(), setting.getBufferChannelSize());
        exportBuffer.consume(this, 1, 200);
        subscriptionSet = new HashSet<>();
    }

    @Override public void export(ExportEvent event) {
        if (ExportEvent.EventType.TOTAL == event.getType()) {
            Metrics metrics = event.getMetrics();
            if (metrics instanceof WithMetadata) {
                MetricsMetaInfo meta = ((WithMetadata)metrics).getMeta();
                if (subscriptionSet.size() == 0 || subscriptionSet.contains(meta.getMetricsName())) {
                    exportBuffer.produce(new ExportData(meta, metrics));
                }
            }
        }
    }

    public void initSubscriptionList() {
        SubscriptionsResp subscription = blockingStub.withDeadlineAfter(10, TimeUnit.SECONDS).subscription(SubscriptionReq.newBuilder().build());
        subscription.getMetricNamesList().forEach(subscriptionSet::add);
        logger.debug("Get exporter subscription list, {}", subscriptionSet);
    }

    @Override public void init() {
    }

    @Override public void consume(List<ExportData> data) {
        if (data.size() == 0) {
            return;
        }

        ExportStatus status = new ExportStatus();
        StreamObserver<ExportMetricValue> streamObserver = exportServiceFutureStub.withDeadlineAfter(10, TimeUnit.SECONDS).export(
            new StreamObserver<ExportResponse>() {
                @Override public void onNext(ExportResponse response) {
                }

                @Override public void onError(Throwable throwable) {
                    status.done();
                }

                @Override public void onCompleted() {
                    status.done();
                }
            }
        );
        AtomicInteger exportNum = new AtomicInteger();
        data.forEach(row -> {
            ExportMetricValue.Builder builder = ExportMetricValue.newBuilder();

            Metrics metrics = row.getMetrics();
            if (metrics instanceof LongValueHolder) {
                long value = ((LongValueHolder)metrics).getValue();
                builder.setLongValue(value);
                builder.setType(ValueType.LONG);
            } else if (metrics instanceof IntValueHolder) {
                long value = ((IntValueHolder)metrics).getValue();
                builder.setLongValue(value);
                builder.setType(ValueType.LONG);
            } else if (metrics instanceof DoubleValueHolder) {
                double value = ((DoubleValueHolder)metrics).getValue();
                builder.setDoubleValue(value);
                builder.setType(ValueType.DOUBLE);
            } else {
                return;
            }

            MetricsMetaInfo meta = row.getMeta();
            builder.setMetricName(meta.getMetricsName());
            String entityName = getEntityName(meta);
            if (entityName == null) {
                return;
            }
            builder.setEntityName(entityName);
            builder.setEntityId(meta.getId());

            builder.setTimeBucket(metrics.getTimeBucket());

            streamObserver.onNext(builder.build());
            exportNum.getAndIncrement();
        });

        streamObserver.onCompleted();

        long sleepTime = 0;
        long cycle = 100L;
        /**
         * For memory safe of oap, we must wait for the peer confirmation.
         */
        while (!status.isDone()) {
            try {
                sleepTime += cycle;
                Thread.sleep(cycle);
            } catch (InterruptedException e) {
            }

            if (sleepTime > 2000L) {
                logger.warn("Export {} metrics to {}:{}, wait {} milliseconds.", exportNum.get(), setting.getTargetHost(), setting.getTargetPort(), sleepTime);
                cycle = 2000L;
            }
        }

        logger.debug("Exported {} metrics to {}:{} in {} milliseconds.", exportNum.get(), setting.getTargetHost(), setting.getTargetPort(), sleepTime);
    }

    @Override public void onError(List<ExportData> data, Throwable t) {
        logger.error(t.getMessage(), t);
    }

    @Override public void onExit() {
    }

    @Getter(AccessLevel.PRIVATE)
    public class ExportData {
        private MetricsMetaInfo meta;
        private Metrics metrics;

        public ExportData(MetricsMetaInfo meta, Metrics metrics) {
            this.meta = meta;
            this.metrics = metrics;
        }
    }

    private class ExportStatus {
        private boolean done = false;

        private void done() {
            done = true;
        }

        public boolean isDone() {
            return done;
        }
    }
}
- GRPCExporter extends MetricFormatter and implements the MetricValuesExportService and IConsumer interfaces. Its constructor uses GRPCExporterSetting to create a MetricExportServiceGrpc.MetricExportServiceStub and a MetricExportServiceGrpc.MetricExportServiceBlockingStub, creates a DataCarrier, and registers itself as the IConsumer of exportBuffer. Its export method only handles TOTAL events whose metrics implement WithMetadata; if subscriptionSet is empty or contains the metric name, it calls exportBuffer.produce(new ExportData(meta, metrics)). Its consume method builds an ExportMetricValue for each row (skipping rows with an unsupported value holder or a null entity name), sends it with streamObserver.onNext, calls streamObserver.onCompleted, and then polls until the peer confirms via onCompleted or onError, logging a warning once the wait exceeds 2000 milliseconds
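The filter-then-buffer flow in export and the batch hand-off to consume can be sketched with the JDK alone. This is a simplified illustration rather than the SkyWalking implementation: java.util.concurrent.ArrayBlockingQueue stands in for DataCarrier, metrics are reduced to their names, dropping on a full buffer is a choice made here for brevity and says nothing about how DataCarrier behaves, and `ExportBufferSketch`, `offer`, and `drainBatch` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/** Stand-alone sketch of "filter by subscription, then buffer"; not SkyWalking code. */
final class ExportBufferSketch {
    private final Set<String> subscriptions = new HashSet<>();
    // ArrayBlockingQueue is an assumption standing in for DataCarrier.
    private final BlockingQueue<String> buffer;

    ExportBufferSketch(int capacity, Set<String> subscribed) {
        this.buffer = new ArrayBlockingQueue<>(capacity);
        this.subscriptions.addAll(subscribed);
    }

    /**
     * Same filter rule as export(): an empty subscription set means
     * every metric is accepted. Returns true when the metric was buffered.
     */
    boolean offer(String metricName) {
        if (subscriptions.isEmpty() || subscriptions.contains(metricName)) {
            return buffer.offer(metricName); // non-blocking; false when full
        }
        return false;
    }

    /** Hands over everything buffered so far as one batch, like consume(List). */
    List<String> drainBatch() {
        List<String> batch = new ArrayList<>();
        buffer.drainTo(batch);
        return batch;
    }

    public static void main(String[] args) {
        Set<String> subscribed = new HashSet<>(Arrays.asList("service_resp_time"));
        ExportBufferSketch sketch = new ExportBufferSketch(16, subscribed);
        sketch.offer("service_resp_time"); // subscribed, so it is buffered
        sketch.offer("endpoint_cpm");      // not subscribed, so it is ignored
        System.out.println(sketch.drainBatch());
    }
}
```

Decoupling the producer from the network call this way keeps the caller of export from waiting on the remote peer, which fits the interface comment recommending queue mode.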
Summary
metric-exporter.proto defines the MetricExportService service with two RPC methods, export and subscription. GRPCExporterProvider extends ModuleProvider: its prepare method creates a GRPCExporter and registers it via registerServiceImplementation, and its notifyAfterCompleted method sets serviceInventoryCache, serviceInstanceInventoryCache, and endpointInventoryCache on the exporter and then calls exporter.initSubscriptionList()
doc
- metric-exporter
Original article
https://segmentfault.com/a/1190000022147971