A Look at SkyWalking's ServiceResetCommand

This article looks at SkyWalking's ServiceResetCommand.

ServiceResetCommand

skywalking-6.6.0/apm-protocol/apm-network/src/main/java/org/apache/skywalking/apm/network/trace/component/command/ServiceResetCommand.java

public class ServiceResetCommand extends BaseCommand implements Serializable, Deserializable<ServiceResetCommand> {
    public static final Deserializable<ServiceResetCommand> DESERIALIZER = new ServiceResetCommand("");
    public static final String NAME = "ServiceMetadataReset";

    public ServiceResetCommand(String serialNumber) {
        super(NAME, serialNumber);
    }

    @Override
    public Command.Builder serialize() {
        return commandBuilder();
    }

    @Override
    public ServiceResetCommand deserialize(Command command) {
        final List<KeyStringValuePair> argsList = command.getArgsList();
        String serialNumber = null;
        for (final KeyStringValuePair pair : argsList) {
            if ("SerialNumber".equals(pair.getKey())) {
                serialNumber = pair.getValue();
                break;
            }
        }
        return new ServiceResetCommand(serialNumber);
    }
}
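  • ServiceResetCommand extends BaseCommand and implements the Serializable and Deserializable interfaces. Its command name is "ServiceMetadataReset"; serialize returns the builder from commandBuilder(), and deserialize reads the "SerialNumber" argument from the Command to construct a new ServiceResetCommand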
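The round trip described above can be sketched without the protobuf classes. This is a minimal illustration, not the SkyWalking implementation: `Pair`, `WireCommand`, and `ResetCommandSketch` are hypothetical stand-ins for `KeyStringValuePair`, `Command`, and `ServiceResetCommand`, and it assumes the base command stores the serial number under the "SerialNumber" key, which is the key deserialize looks for.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the protobuf KeyStringValuePair message.
final class Pair {
    final String key;
    final String value;

    Pair(String key, String value) {
        this.key = key;
        this.value = value;
    }
}

// Hypothetical stand-in for the protobuf Command message: a name plus key/value arguments.
final class WireCommand {
    final String name;
    final List<Pair> args;

    WireCommand(String name, List<Pair> args) {
        this.name = name;
        this.args = args;
    }
}

// Hypothetical stand-in for ServiceResetCommand.
final class ResetCommandSketch {
    static final String NAME = "ServiceMetadataReset";
    final String serialNumber;

    ResetCommandSketch(String serialNumber) {
        this.serialNumber = serialNumber;
    }

    // Assumption: the serial number is written as a "SerialNumber" argument.
    WireCommand serialize() {
        List<Pair> args = new ArrayList<>();
        args.add(new Pair("SerialNumber", serialNumber));
        return new WireCommand(NAME, args);
    }

    // Mirrors the loop in the listing: use the first "SerialNumber" argument, else null.
    static ResetCommandSketch deserialize(WireCommand command) {
        String serialNumber = null;
        for (Pair pair : command.args) {
            if ("SerialNumber".equals(pair.key)) {
                serialNumber = pair.value;
                break;
            }
        }
        return new ResetCommandSketch(serialNumber);
    }
}
```

Note that, as in the listing, a command without a "SerialNumber" argument deserializes to an instance whose serial number is null rather than failing.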

InstancePing.proto

skywalking-6.6.0/apm-protocol/apm-network/src/main/proto/register/InstancePing.proto

syntax = "proto3";

option java_multiple_files = true;
option java_package = "org.apache.skywalking.apm.network.register.v2";
option csharp_namespace = "SkyWalking.NetworkProtocol";

import "common/common.proto";

service ServiceInstancePing {
    rpc doPing (ServiceInstancePingPkg) returns (Commands) {
    }
}

message ServiceInstancePingPkg {
    int32 serviceInstanceId = 1;
    int64 time = 2;
    string serviceInstanceUUID = 3;
}
  • InstancePing.proto defines the ServiceInstancePing service, which declares a doPing method that takes a ServiceInstancePingPkg and returns Commands

ServiceInstancePingServiceHandler

skywalking-6.6.0/oap-server/server-receiver-plugin/skywalking-register-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/register/provider/handler/v6/grpc/ServiceInstancePingServiceHandler.java

public class ServiceInstancePingServiceHandler extends ServiceInstancePingGrpc.ServiceInstancePingImplBase implements GRPCHandler {
    private static final Logger logger = LoggerFactory.getLogger(ServiceInstancePingServiceHandler.class);

    private final ServiceInstanceInventoryCache serviceInstanceInventoryCache;
    private final IServiceInventoryRegister serviceInventoryRegister;
    private final IServiceInstanceInventoryRegister serviceInstanceInventoryRegister;
    private final CommandService commandService;

    public ServiceInstancePingServiceHandler(ModuleManager moduleManager) {
        this.serviceInstanceInventoryCache = moduleManager.find(CoreModule.NAME).provider().getService(ServiceInstanceInventoryCache.class);
        this.serviceInventoryRegister = moduleManager.find(CoreModule.NAME).provider().getService(IServiceInventoryRegister.class);
        this.serviceInstanceInventoryRegister = moduleManager.find(CoreModule.NAME).provider().getService(IServiceInstanceInventoryRegister.class);
        this.commandService = moduleManager.find(CoreModule.NAME).provider().getService(CommandService.class);
    }

    @Override public void doPing(ServiceInstancePingPkg request, StreamObserver<Commands> responseObserver) {
        int serviceInstanceId = request.getServiceInstanceId();
        long heartBeatTime = request.getTime();
        serviceInstanceInventoryRegister.heartbeat(serviceInstanceId, heartBeatTime);

        ServiceInstanceInventory serviceInstanceInventory = serviceInstanceInventoryCache.get(serviceInstanceId);
        if (Objects.nonNull(serviceInstanceInventory)) {
            serviceInventoryRegister.heartbeat(serviceInstanceInventory.getServiceId(), heartBeatTime);
            responseObserver.onNext(Commands.getDefaultInstance());
        } else {
            logger.warn("Can't find service by service instance id from cache," +
                " service instance id is: {}, will send a reset command to agent side", serviceInstanceId);

            final ServiceResetCommand resetCommand = commandService.newResetCommand(request.getServiceInstanceId(), request.getTime(), request.getServiceInstanceUUID());
            final Command command = resetCommand.serialize().build();
            final Commands nextCommands = Commands.newBuilder().addCommands(command).build();
            responseObserver.onNext(nextCommands);
        }

        responseObserver.onCompleted();
    }
}
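  • ServiceInstancePingServiceHandler extends ServiceInstancePingGrpc.ServiceInstancePingImplBase and implements the GRPCHandler interface. Its constructor looks up serviceInstanceInventoryCache, serviceInventoryRegister, serviceInstanceInventoryRegister, and commandService. Its doPing method first calls serviceInstanceInventoryRegister.heartbeat(serviceInstanceId, heartBeatTime). If serviceInstanceInventoryCache.get(serviceInstanceId) returns a non-null entry, it also sends a heartbeat for the owning service and replies with an empty Commands; if it returns null, it replies to the agent with a ServiceResetCommand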
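The branching in doPing can be sketched with in-memory maps in place of the OAP services. This is only an illustration under stated assumptions: `PingHandlerSketch` and its fields are hypothetical, the cache is modeled as a map from instance id to service id, and the reply is reduced to an optional command name instead of a `Commands` message.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical, simplified model of the ping handler's decision logic.
final class PingHandlerSketch {
    // Stand-in for ServiceInstanceInventoryCache: instance id -> service id.
    private final Map<Integer, Integer> instanceToService = new HashMap<>();
    // Stand-ins for the two register services: last heartbeat time per id.
    final Map<Integer, Long> instanceHeartbeats = new HashMap<>();
    final Map<Integer, Long> serviceHeartbeats = new HashMap<>();

    void register(int instanceId, int serviceId) {
        instanceToService.put(instanceId, serviceId);
    }

    // Returns the name of the command to send back, or empty when no command is needed.
    Optional<String> doPing(int instanceId, long time) {
        // The instance heartbeat is recorded before the cache lookup, as in the listing.
        instanceHeartbeats.put(instanceId, time);

        Integer serviceId = instanceToService.get(instanceId);
        if (serviceId != null) {
            // Known instance: refresh the owning service too and reply with no commands.
            serviceHeartbeats.put(serviceId, time);
            return Optional.empty();
        }
        // Unknown instance: ask the agent to reset and register again.
        return Optional.of("ServiceMetadataReset");
    }
}
```

A ping from an instance id that the cache does not know yields the reset command name, while a ping from a registered instance yields an empty reply and refreshes both heartbeat times.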

ServiceResetCommandExecutor

skywalking-6.6.0/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/commands/executor/ServiceResetCommandExecutor.java

public class ServiceResetCommandExecutor implements CommandExecutor {
    private static final ILog LOGGER = LogManager.getLogger(ServiceResetCommandExecutor.class);

    @Override
    public void execute(final BaseCommand command) throws CommandExecutionException {
        LOGGER.warn("Received ServiceResetCommand, a re-register task is scheduled.");

        ServiceManager.INSTANCE.findService(ServiceAndEndpointRegisterClient.class).coolDown();

        RemoteDownstreamConfig.Agent.SERVICE_ID = DictionaryUtil.nullValue();
        RemoteDownstreamConfig.Agent.SERVICE_INSTANCE_ID = DictionaryUtil.nullValue();
        RemoteDownstreamConfig.Agent.INSTANCE_REGISTERED_TIME = DictionaryUtil.nullValue();

        NetworkAddressDictionary.INSTANCE.clear();
        EndpointNameDictionary.INSTANCE.clear();
    }
}
  • ServiceResetCommandExecutor implements the CommandExecutor interface. When its execute method receives a ServiceResetCommand, it calls ServiceManager.INSTANCE.findService(ServiceAndEndpointRegisterClient.class).coolDown(), resets RemoteDownstreamConfig.Agent.SERVICE_ID, RemoteDownstreamConfig.Agent.SERVICE_INSTANCE_ID, and RemoteDownstreamConfig.Agent.INSTANCE_REGISTERED_TIME to DictionaryUtil.nullValue(), and clears NetworkAddressDictionary.INSTANCE and EndpointNameDictionary.INSTANCE
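The effect of the reset on agent-side state can be sketched as follows. This is a hedged illustration rather than the agent's code: `AgentResetSketch` is a hypothetical class, the value 0 is assumed to play the role of DictionaryUtil.nullValue(), the two dictionaries are modeled as plain maps, and the coolDown() call is left out.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical model of the registration state that the executor resets.
final class AgentResetSketch {
    // Assumption: 0 stands for "not registered", like DictionaryUtil.nullValue().
    static final int NULL_VALUE = 0;

    volatile int serviceId = NULL_VALUE;
    volatile int serviceInstanceId = NULL_VALUE;
    volatile long instanceRegisteredTime = NULL_VALUE;

    // Stand-ins for NetworkAddressDictionary and EndpointNameDictionary (name -> id).
    final Map<String, Integer> networkAddressDictionary = new ConcurrentHashMap<>();
    final Map<String, Integer> endpointNameDictionary = new ConcurrentHashMap<>();

    boolean isRegistered() {
        return serviceId != NULL_VALUE && serviceInstanceId != NULL_VALUE;
    }

    // Drops every id handed out by the backend so that registration starts from scratch.
    void reset() {
        serviceId = NULL_VALUE;
        serviceInstanceId = NULL_VALUE;
        instanceRegisteredTime = NULL_VALUE;
        networkAddressDictionary.clear();
        endpointNameDictionary.clear();
    }
}
```

Clearing the dictionaries together with the ids matters in this model because the cached name-to-id mappings were issued for the old registration and would no longer be meaningful after a new one.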

Summary

ServiceInstancePingServiceHandler extends ServiceInstancePingGrpc.ServiceInstancePingImplBase and implements the GRPCHandler interface. Its constructor looks up serviceInstanceInventoryCache, serviceInventoryRegister, serviceInstanceInventoryRegister, and commandService. Its doPing method calls serviceInstanceInventoryRegister.heartbeat(serviceInstanceId, heartBeatTime), and if serviceInstanceInventoryCache.get(serviceInstanceId) returns null it sends a ServiceResetCommand to the agent.

doc

  • ServiceInstancePingServiceHandler

Original article

https://segmentfault.com/a/1190000022186842
