A Look at Debezium's BlockingReader

This article takes a look at Debezium's BlockingReader.

Reader

debezium-v1.1.1.Final/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/Reader.java

public interface Reader {

    public static enum State {
        /**
         * The reader is stopped and static.
         */
        STOPPED,

        /**
         * The reader is running and generated records.
         */
        RUNNING,

        /**
         * The reader has completed its work or been explicitly stopped, but not all of the generated records have been
         * consumed via {@link Reader#poll() polling}.
         */
        STOPPING;
    }

    public String name();

    public State state();

    public void uponCompletion(Runnable handler);

    public default void initialize() {
        // do nothing
    }

    public default void destroy() {
        // do nothing
    }

    public void start();

    public void stop();

    public List<SourceRecord> poll() throws InterruptedException;
}
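  • The Reader interface defines a State enum (STOPPED, RUNNING, STOPPING) and the name, state, uponCompletion, start, stop, and poll methods; initialize and destroy are default methods that do nothing.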
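
To make the lifecycle behind these methods concrete, here is a minimal sketch of a caller driving a reader from start() to STOPPED. It is not Debezium code: SimpleReader, OneBatchReader, and ReaderLifecycleDemo are hypothetical names, and records are plain Strings rather than SourceRecord so that the example has no Kafka Connect dependency.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

// Simplified stand-in for Reader: records are Strings, not SourceRecords.
interface SimpleReader {
    enum State { STOPPED, RUNNING, STOPPING }

    String name();
    State state();
    void uponCompletion(Runnable handler);
    void start();
    void stop();
    List<String> poll() throws InterruptedException;
}

// Hypothetical reader that hands out a single batch and then completes.
class OneBatchReader implements SimpleReader {
    private final AtomicReference<State> state = new AtomicReference<>(State.STOPPED);
    private final AtomicReference<Runnable> uponCompletion = new AtomicReference<>();
    private final String name;
    private final List<String> batch;

    OneBatchReader(String name, List<String> batch) {
        this.name = name;
        this.batch = batch;
    }

    @Override public String name() { return name; }
    @Override public State state() { return state.get(); }
    @Override public void uponCompletion(Runnable handler) { uponCompletion.set(handler); }
    @Override public void start() { state.set(State.RUNNING); }

    @Override
    public void stop() {
        state.set(State.STOPPED);
        Runnable handler = uponCompletion.getAndSet(null); // run at most once
        if (handler != null) {
            handler.run();
        }
    }

    @Override
    public List<String> poll() {
        if (state.compareAndSet(State.RUNNING, State.STOPPING)) {
            return batch; // work is done, but the caller has not consumed it yet
        }
        if (state.get() == State.STOPPING) {
            stop(); // everything was handed over on the previous poll
        }
        return null;
    }
}

class ReaderLifecycleDemo {
    // Starts the reader and polls until it reports STOPPED.
    static List<String> drain(SimpleReader reader) {
        List<String> all = new ArrayList<>();
        reader.start();
        try {
            while (reader.state() != SimpleReader.State.STOPPED) {
                List<String> records = reader.poll();
                if (records != null) {
                    all.addAll(records);
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return all;
    }

    public static void main(String[] args) {
        OneBatchReader reader = new OneBatchReader("demo", List.of("a", "b"));
        reader.uponCompletion(() -> System.out.println("completed"));
        System.out.println(drain(reader));
    }
}
```

The STOPPING state is what lets the caller tell "no more work will be produced" apart from "everything has been consumed"; the completion handler fires only on the second transition.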

BlockingReader

debezium-v1.1.1.Final/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/BlockingReader.java

public class BlockingReader implements Reader {

    protected final Logger logger = LoggerFactory.getLogger(getClass());
    private final AtomicReference<Runnable> uponCompletion = new AtomicReference<>();
    private final AtomicReference<State> state = new AtomicReference<>();
    private final Metronome metronome;

    private final String name;
    private final String runningLogMessage;

    public BlockingReader(String name, String runningLogMessage) {
        this.name = name;
        this.metronome = Metronome.parker(ConfigurationDefaults.RETURN_CONTROL_INTERVAL, Clock.SYSTEM);
        this.runningLogMessage = runningLogMessage;

    }

    /**
     * Does nothing until the connector task is shut down, but regularly returns control back to Connect in order for being paused if requested.
     */
    @Override
    public List<SourceRecord> poll() throws InterruptedException {
        if (state.get() == State.STOPPED) {
            return null;
        }

        metronome.pause();
        state.compareAndSet(State.RUNNING, State.STOPPING);

        return null;
    }

    @Override
    public State state() {
        return state.get();
    }

    @Override
    public void uponCompletion(Runnable handler) {
        assert this.uponCompletion.get() == null;
        this.uponCompletion.set(handler);
    }

    @Override
    public void start() {
        state.set(State.RUNNING);
        logger.info(runningLogMessage);
    }

    @Override
    public void stop() {
        try {
            state.set(State.STOPPED);

            // Cleanup Resources
            Runnable completionHandler = uponCompletion.getAndSet(null); // set to null so that we call it only once
            if (completionHandler != null) {
                completionHandler.run();
            }

        }
        finally {
            logger.info("Blocking Reader has completed.");
        }
    }

    @Override
    public String name() {
        return name;
    }

}
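  • BlockingReader implements the Reader interface. Its start method sets state to State.RUNNING. Its stop method sets state to State.STOPPED and then runs the completion handler once, if one was registered. Its poll method returns null immediately when state is State.STOPPED; otherwise it calls metronome.pause(), uses compareAndSet to move state from State.RUNNING to State.STOPPING, and returns null. In this class only stop() sets State.STOPPED, and state holds null until start() is called.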
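
The state transitions can be traced with a small stand-alone sketch. This is an approximation rather than Debezium's class: MiniBlockingReader and BlockingReaderDemo are hypothetical names, Thread.sleep stands in for metronome.pause(), and logging is left out.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical, dependency-free imitation of BlockingReader's state handling.
class MiniBlockingReader {
    enum State { STOPPED, RUNNING, STOPPING }

    private final AtomicReference<Runnable> uponCompletion = new AtomicReference<>();
    private final AtomicReference<State> state = new AtomicReference<>(); // null until start()
    private final Duration pause;

    MiniBlockingReader(Duration pause) {
        this.pause = pause;
    }

    State state() { return state.get(); }
    void uponCompletion(Runnable handler) { uponCompletion.set(handler); }
    void start() { state.set(State.RUNNING); }

    Object poll() throws InterruptedException {
        if (state.get() == State.STOPPED) {
            return null;
        }
        Thread.sleep(pause.toMillis()); // assumption: stands in for metronome.pause()
        state.compareAndSet(State.RUNNING, State.STOPPING); // no effect unless RUNNING
        return null;
    }

    void stop() {
        state.set(State.STOPPED);
        Runnable handler = uponCompletion.getAndSet(null); // cleared so it runs only once
        if (handler != null) {
            handler.run();
        }
    }
}

class BlockingReaderDemo {
    // Records the state after each lifecycle call, plus the handler call count.
    static List<String> trace() {
        List<String> seen = new ArrayList<>();
        AtomicInteger handlerCalls = new AtomicInteger();
        MiniBlockingReader reader = new MiniBlockingReader(Duration.ofMillis(5));
        reader.uponCompletion(handlerCalls::incrementAndGet);
        try {
            seen.add(String.valueOf(reader.state())); // before start()
            reader.start();
            seen.add(String.valueOf(reader.state()));
            reader.poll();
            seen.add(String.valueOf(reader.state()));
            reader.poll();
            seen.add(String.valueOf(reader.state()));
            reader.stop();
            reader.stop(); // a second stop() must not re-run the handler
            seen.add(String.valueOf(reader.state()));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        seen.add("handlerCalls=" + handlerCalls.get());
        return seen;
    }

    public static void main(String[] args) {
        // prints [null, RUNNING, STOPPING, STOPPING, STOPPED, handlerCalls=1]
        System.out.println(trace());
    }
}
```

Because poll() uses compareAndSet rather than set, a poll() racing with stop() cannot overwrite STOPPED with STOPPING.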

TimedBlockingReader

debezium-v1.1.1.Final/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/TimedBlockingReader.java

public class TimedBlockingReader extends BlockingReader {

    protected final Logger logger = LoggerFactory.getLogger(getClass());

    private final Duration timeout;
    private volatile Timer timer;

    /**
     * @param name Name of the reader
     * @param timeout Duration of time until this TimedBlockingReader should stop
     */
    public TimedBlockingReader(String name, Duration timeout) {
        super(name, "The connector will wait for " + timeout.toMillis() + " ms before proceeding");
        this.timeout = timeout;
    }

    @Override
    public void start() {
        super.start();
        this.timer = Threads.timer(Clock.SYSTEM, timeout);
    }

    @Override
    public List<SourceRecord> poll() throws InterruptedException {
        super.poll();

        // Stop when we've reached the timeout threshold
        if (timer != null && timer.expired()) {
            stop();
        }

        return null;
    }
}
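  • TimedBlockingReader extends BlockingReader. Its start method calls super.start() and then creates a Timer via Threads.timer(Clock.SYSTEM, timeout). Its poll method first calls the parent's poll method, then calls stop() if timer is non-null and timer.expired() returns true, and finally returns null.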
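
The timeout behavior can be sketched in the same stand-alone way. The names MiniTimedBlockingReader and TimedReaderDemo are hypothetical, and a System.nanoTime() deadline is used here as an assumed substitute for the Timer returned by Threads.timer; Debezium's own Timer may be implemented differently.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical imitation of TimedBlockingReader with a nanoTime deadline as the timer.
class MiniTimedBlockingReader {
    enum State { STOPPED, RUNNING, STOPPING }

    private final AtomicReference<State> state = new AtomicReference<>();
    private final AtomicReference<Runnable> uponCompletion = new AtomicReference<>();
    private final Duration pause;
    private final Duration timeout;
    private volatile long deadlineNanos;
    private volatile boolean started; // plays the role of the "timer != null" check

    MiniTimedBlockingReader(Duration pause, Duration timeout) {
        this.pause = pause;
        this.timeout = timeout;
    }

    State state() { return state.get(); }
    void uponCompletion(Runnable handler) { uponCompletion.set(handler); }

    void start() {
        state.set(State.RUNNING);
        deadlineNanos = System.nanoTime() + timeout.toNanos();
        started = true;
    }

    void stop() {
        state.set(State.STOPPED);
        Runnable handler = uponCompletion.getAndSet(null);
        if (handler != null) {
            handler.run();
        }
    }

    void poll() throws InterruptedException {
        // Parent behavior: pause, then move RUNNING to STOPPING.
        if (state.get() != State.STOPPED) {
            Thread.sleep(pause.toMillis());
            state.compareAndSet(State.RUNNING, State.STOPPING);
        }
        // Subclass behavior: stop once the timeout has elapsed.
        if (started && System.nanoTime() - deadlineNanos >= 0) {
            stop();
        }
    }
}

class TimedReaderDemo {
    // Polls until STOPPED; returns {pollCount, elapsedNanos, handlerCalls}.
    static long[] run(Duration pause, Duration timeout) {
        AtomicInteger handlerCalls = new AtomicInteger();
        MiniTimedBlockingReader reader = new MiniTimedBlockingReader(pause, timeout);
        reader.uponCompletion(handlerCalls::incrementAndGet);
        long polls = 0;
        long begin = System.nanoTime();
        reader.start();
        try {
            while (reader.state() != MiniTimedBlockingReader.State.STOPPED) {
                reader.poll();
                polls++;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new long[] { polls, System.nanoTime() - begin, handlerCalls.get() };
    }

    public static void main(String[] args) {
        long[] result = run(Duration.ofMillis(10), Duration.ofMillis(50));
        System.out.println("polls=" + result[0]
                + " elapsedMs=" + Duration.ofNanos(result[1]).toMillis()
                + " handlerCalls=" + result[2]);
    }
}
```

Since expiry is only checked after each pause, the reader stops at the first poll() that finishes after the deadline, so the actual wait can exceed the timeout by up to one pause interval.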

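Summary

BlockingReader implements the Reader interface. Its start method sets state to State.RUNNING, and its stop method sets state to State.STOPPED and runs the completion handler. Its poll method returns null immediately when state is State.STOPPED; otherwise it calls metronome.pause(), moves state from State.RUNNING to State.STOPPING via compareAndSet, and returns null. TimedBlockingReader builds on this by calling stop() from poll once its timer has expired.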

Original article

https://segmentfault.com/a/1190000022656622
