A look at debezium's BlockingReader

This article takes a look at debezium's BlockingReader.

Reader

debezium-v1.1.1.Final/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/Reader.java

public interface Reader {

    public static enum State {
        /**
         * The reader is stopped and static.
         */
        STOPPED,

        /**
         * The reader is running and generated records.
         */
        RUNNING,

        /**
         * The reader has completed its work or been explicitly stopped, but not all of the generated records have been
         * consumed via {@link Reader#poll() polling}.
         */
        STOPPING;
    }

    public String name();

    public State state();

    public void uponCompletion(Runnable handler);

    public default void initialize() {
        // do nothing
    }

    public default void destroy() {
        // do nothing
    }

    public void start();

    public void stop();

    public List<SourceRecord> poll() throws InterruptedException;
}
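  • The Reader interface defines the name, state, uponCompletion, start, stop and poll methods, plus initialize and destroy as default methods that do nothing. Its State enum has three values: STOPPED, RUNNING and STOPPING.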
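To make the contract more concrete, here is a minimal sketch of how a caller might drive a reader through its lifecycle: start it, poll until it reports STOPPED, and let the completion handler fire. SketchReader, FixedBatchReader and ReaderLifecycleDemo are hypothetical names made up for this illustration, and String stands in for SourceRecord so the code has no Kafka Connect dependency. It is not how debezium itself drives its readers.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, trimmed-down mirror of the Reader contract.
// String stands in for SourceRecord to keep the sketch dependency-free.
interface SketchReader {
    enum State { STOPPED, RUNNING, STOPPING }

    String name();
    State state();
    void uponCompletion(Runnable handler);
    void start();
    void stop();
    List<String> poll() throws InterruptedException;
}

// Hypothetical reader: hands out one fixed batch, then stops itself on the next poll.
class FixedBatchReader implements SketchReader {
    private final String name;
    private final List<String> batch;
    private State state = State.STOPPED;
    private Runnable completionHandler;

    FixedBatchReader(String name, List<String> batch) {
        this.name = name;
        this.batch = batch;
    }

    @Override public String name() { return name; }
    @Override public State state() { return state; }
    @Override public void uponCompletion(Runnable handler) { this.completionHandler = handler; }
    @Override public void start() { state = State.RUNNING; }

    @Override
    public void stop() {
        state = State.STOPPED;
        Runnable handler = completionHandler;
        completionHandler = null; // make sure the handler runs at most once
        if (handler != null) {
            handler.run();
        }
    }

    @Override
    public List<String> poll() {
        if (state == State.RUNNING) {
            state = State.STOPPING; // work is done, records not yet consumed
            return batch;
        }
        if (state == State.STOPPING) {
            stop(); // the caller has consumed the batch
        }
        return null;
    }
}

class ReaderLifecycleDemo {
    // Starts the reader and polls until it reports STOPPED, collecting all records.
    static List<String> drain(SketchReader reader) {
        List<String> records = new ArrayList<>();
        reader.start();
        try {
            while (reader.state() != SketchReader.State.STOPPED) {
                List<String> polled = reader.poll();
                if (polled != null) {
                    records.addAll(polled);
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        }
        return records;
    }

    public static void main(String[] args) {
        FixedBatchReader reader = new FixedBatchReader("demo", Arrays.asList("r1", "r2"));
        reader.uponCompletion(() -> System.out.println("completed: " + reader.name()));
        System.out.println(drain(reader));
    }
}
```

The point of the sketch is the role of STOPPING: the reader has finished producing, but the caller still needs one more poll before the reader counts as STOPPED.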

BlockingReader

debezium-v1.1.1.Final/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/BlockingReader.java

public class BlockingReader implements Reader {

    protected final Logger logger = LoggerFactory.getLogger(getClass());
    private final AtomicReference<Runnable> uponCompletion = new AtomicReference<>();
    private final AtomicReference<State> state = new AtomicReference<>();
    private final Metronome metronome;

    private final String name;
    private final String runningLogMessage;

    public BlockingReader(String name, String runningLogMessage) {
        this.name = name;
        this.metronome = Metronome.parker(ConfigurationDefaults.RETURN_CONTROL_INTERVAL, Clock.SYSTEM);
        this.runningLogMessage = runningLogMessage;

    }

    /**
     * Does nothing until the connector task is shut down, but regularly returns control back to Connect in order for being paused if requested.
     */
    @Override
    public List<SourceRecord> poll() throws InterruptedException {
        if (state.get() == State.STOPPED) {
            return null;
        }

        metronome.pause();
        state.compareAndSet(State.RUNNING, State.STOPPING);

        return null;
    }

    @Override
    public State state() {
        return state.get();
    }

    @Override
    public void uponCompletion(Runnable handler) {
        assert this.uponCompletion.get() == null;
        this.uponCompletion.set(handler);
    }

    @Override
    public void start() {
        state.set(State.RUNNING);
        logger.info(runningLogMessage);
    }

    @Override
    public void stop() {
        try {
            state.set(State.STOPPED);

            // Cleanup Resources
            Runnable completionHandler = uponCompletion.getAndSet(null); // set to null so that we call it only once
            if (completionHandler != null) {
                completionHandler.run();
            }

        }
        finally {
            logger.info("Blocking Reader has completed.");
        }
    }

    @Override
    public String name() {
        return name;
    }

}
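  • BlockingReader implements the Reader interface. Its start method sets state to State.RUNNING. Its stop method sets state to State.STOPPED and runs the completionHandler if one is registered; because the handler is fetched with getAndSet(null), it runs at most once. Its poll method returns null immediately when state is State.STOPPED; otherwise it calls metronome.pause(), then uses compareAndSet to move state from State.RUNNING to State.STOPPING (not State.STOPPED), and finally returns null. So poll never produces records; only stop() moves the reader to State.STOPPED.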
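The state transitions described above can be traced with a small, dependency-free sketch. SketchBlockingReader and BlockingReaderStateDemo are hypothetical names, and Thread.sleep is only an assumed stand-in for metronome.pause(); the rest mirrors the AtomicReference logic of the listing.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for BlockingReader without debezium or Kafka Connect types.
class SketchBlockingReader {
    enum State { STOPPED, RUNNING, STOPPING }

    private final AtomicReference<State> state = new AtomicReference<>();
    private final AtomicReference<Runnable> uponCompletion = new AtomicReference<>();
    private final long pauseMillis; // assumption: replaces the Metronome interval

    SketchBlockingReader(long pauseMillis) {
        this.pauseMillis = pauseMillis;
    }

    void uponCompletion(Runnable handler) { uponCompletion.set(handler); }
    void start() { state.set(State.RUNNING); }
    State state() { return state.get(); }

    Object poll() throws InterruptedException {
        if (state.get() == State.STOPPED) {
            return null; // nothing to wait for once stopped
        }
        Thread.sleep(pauseMillis); // stand-in for metronome.pause()
        state.compareAndSet(State.RUNNING, State.STOPPING); // no-op unless RUNNING
        return null;
    }

    void stop() {
        state.set(State.STOPPED);
        Runnable handler = uponCompletion.getAndSet(null); // run the handler only once
        if (handler != null) {
            handler.run();
        }
    }
}

class BlockingReaderStateDemo {
    // Records the state after start, after two polls, and after two stops.
    static String trace() {
        AtomicInteger handlerRuns = new AtomicInteger();
        SketchBlockingReader reader = new SketchBlockingReader(5);
        reader.uponCompletion(handlerRuns::incrementAndGet);
        StringBuilder sb = new StringBuilder();
        try {
            reader.start();
            sb.append(reader.state()).append(',');
            reader.poll();
            sb.append(reader.state()).append(',');
            reader.poll(); // already STOPPING, so the compareAndSet changes nothing
            sb.append(reader.state()).append(',');
            reader.stop();
            reader.stop(); // second stop finds no handler left
            sb.append(reader.state()).append(",handlerRuns=").append(handlerRuns.get());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(trace()); // prints RUNNING,STOPPING,STOPPING,STOPPED,handlerRuns=1
    }
}
```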

TimedBlockingReader

debezium-v1.1.1.Final/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/TimedBlockingReader.java

public class TimedBlockingReader extends BlockingReader {

    protected final Logger logger = LoggerFactory.getLogger(getClass());

    private final Duration timeout;
    private volatile Timer timer;

    /**
     * @param name Name of the reader
     * @param timeout Duration of time until this TimedBlockingReader should stop
     */
    public TimedBlockingReader(String name, Duration timeout) {
        super(name, "The connector will wait for " + timeout.toMillis() + " ms before proceeding");
        this.timeout = timeout;
    }

    @Override
    public void start() {
        super.start();
        this.timer = Threads.timer(Clock.SYSTEM, timeout);
    }

    @Override
    public List<SourceRecord> poll() throws InterruptedException {
        super.poll();

        // Stop when we've reached the timeout threshold
        if (timer != null && timer.expired()) {
            stop();
        }

        return null;
    }
}
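  • TimedBlockingReader extends BlockingReader. Its start method calls super.start() and then creates a Timer via Threads.timer(Clock.SYSTEM, timeout). Its poll method first calls the parent's poll, then calls stop() once timer.expired() is true, and finally returns null. In other words, it blocks like BlockingReader but stops itself after the configured timeout.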
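The timeout behaviour can be sketched without debezium on the classpath. SketchTimedReader and TimedReaderDemo are hypothetical names; a System.nanoTime() deadline is an assumed substitute for Threads.timer(Clock.SYSTEM, timeout), and Thread.sleep again stands in for the Metronome. The completion handler is left out to keep the example short.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical, dependency-free stand-in for TimedBlockingReader.
class SketchTimedReader {
    enum State { STOPPED, RUNNING, STOPPING }

    private final AtomicReference<State> state = new AtomicReference<>();
    private final Duration timeout;
    private final Duration pause;          // assumption: replaces the Metronome interval
    private volatile long deadlineNanos;   // assumption: replaces the debezium Timer
    private volatile boolean timerStarted;

    SketchTimedReader(Duration timeout, Duration pause) {
        this.timeout = timeout;
        this.pause = pause;
    }

    void start() {
        state.set(State.RUNNING);
        deadlineNanos = System.nanoTime() + timeout.toNanos();
        timerStarted = true;
    }

    void stop() { state.set(State.STOPPED); }

    State state() { return state.get(); }

    Object poll() throws InterruptedException {
        // Same steps as the parent's poll: pause, then RUNNING -> STOPPING.
        if (state.get() != State.STOPPED) {
            Thread.sleep(pause.toMillis());
            state.compareAndSet(State.RUNNING, State.STOPPING);
        }
        // Stop once the timeout threshold has been reached.
        if (timerStarted && System.nanoTime() - deadlineNanos >= 0) {
            stop();
        }
        return null;
    }
}

class TimedReaderDemo {
    // Polls until the reader stops itself and returns how many polls that took.
    static int pollUntilStopped(SketchTimedReader reader) {
        int polls = 0;
        reader.start();
        try {
            while (reader.state() != SketchTimedReader.State.STOPPED) {
                reader.poll();
                polls++;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        }
        return polls;
    }

    public static void main(String[] args) {
        SketchTimedReader reader =
                new SketchTimedReader(Duration.ofMillis(50), Duration.ofMillis(10));
        System.out.println("polls until stopped: " + pollUntilStopped(reader));
    }
}
```

The number of polls depends on the pause interval and on scheduling, so the sketch only guarantees that the reader ends up STOPPED no earlier than the timeout.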

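Summary

BlockingReader implements the Reader interface but never produces records. start sets state to State.RUNNING; stop sets state to State.STOPPED and runs the completionHandler at most once; poll returns null immediately when state is State.STOPPED, otherwise it calls metronome.pause(), moves state from State.RUNNING to State.STOPPING, and returns null. TimedBlockingReader adds a Timer on top of this and calls stop() from poll once the timeout has expired.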

doc

  • BlockingReader
Source: https://segmentfault.com/a/1190000022656622