首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >聊聊canal的BinLogFileQueue

聊聊canal的BinLogFileQueue

作者头像
code4it
发布2020-04-24 19:51:19
4310
发布2020-04-24 19:51:19
举报

本文主要研究一下canal的BinLogFileQueue

BinLogFileQueue

canal-1.1.4/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/local/BinLogFileQueue.java

public class BinLogFileQueue {

    private String              baseName       = "mysql-bin.";
    private List<File>          binlogs        = new ArrayList<File>();
    private File                directory;
    private ReentrantLock       lock           = new ReentrantLock();
    private Condition           nextCondition  = lock.newCondition();
    private Timer               timer          = new Timer(true);
    private long                reloadInterval = 10 * 1000L;           // 10秒
    private CanalParseException exception      = null;

    public BinLogFileQueue(String directory){
        this(new File(directory));
    }

    public BinLogFileQueue(File directory){
        this.directory = directory;

        if (!directory.canRead()) {
            throw new CanalParseException("Binlog index missing or unreadable;  " + directory.getAbsolutePath());
        }

        List<File> files = listBinlogFiles();
        for (File file : files) {
            offer(file);
        }

        timer.scheduleAtFixedRate(new TimerTask() {

            public void run() {
                try {
                    // File errorFile = new File(BinLogFileQueue.this.directory,
                    // errorFileName);
                    // if (errorFile.isFile() && errorFile.exists()) {
                    // String text = StringUtils.join(IOUtils.readLines(new
                    // FileInputStream(errorFile)), "\n");
                    // exception = new CanalParseException(text);
                    // }
                    List<File> files = listBinlogFiles();
                    for (File file : files) {
                        offer(file);
                    }
                } catch (Throwable e) {
                    exception = new CanalParseException(e);
                }

                if (exception != null) {
                    offer(null);
                }
            }
        }, reloadInterval, reloadInterval);
    }

    private List<File> listBinlogFiles() {
        List<File> files = new ArrayList<File>();
        files.addAll(FileUtils.listFiles(directory, new IOFileFilter() {

            public boolean accept(File file) {
                Pattern pattern = Pattern.compile("\\d+$");
                Matcher matcher = pattern.matcher(file.getName());
                return file.getName().startsWith(baseName) && matcher.find();
            }

            public boolean accept(File dir, String name) {
                return true;
            }
        }, null));
        // 排一下序列
        Collections.sort(files, new Comparator<File>() {

            public int compare(File o1, File o2) {
                return o1.getName().compareTo(o2.getName());
            }

        });
        return files;
    }

    private boolean offer(File file) {
        try {
            lock.lockInterruptibly();
            if (file != null) {
                if (!binlogs.contains(file)) {
                    binlogs.add(file);
                    nextCondition.signalAll();// 唤醒
                    return true;
                }
            }

            nextCondition.signalAll();// 唤醒
            return false;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            lock.unlock();
        }
    }

    //......

    /**
     * 获取当前所有binlog文件
     */
    public List<File> currentBinlogs() {
        return new ArrayList<File>(binlogs);
    }

    public void destory() {
        try {
            lock.lockInterruptibly();
            timer.cancel();
            binlogs.clear();

            nextCondition.signalAll();// 唤醒线程,通知退出
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            lock.unlock();
        }
    }

    //......
}
  • BinLogFileQueue的构造器通过listBinlogFiles加载directory目录下的以baseName开头的文件并按文件名排序,然后挨个执行offer方法,最后使用timer定时调度执行listBinlogFiles及offer方法;其currentBinlogs返回binlogs文件列表;其destory方法取消timer,然后清空binlogs

waitForNextFile

canal-1.1.4/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/local/BinLogFileQueue.java

public class BinLogFileQueue {

    //......

    public File waitForNextFile(File pre) throws InterruptedException {
        try {
            lock.lockInterruptibly();
            if (binlogs.size() == 0) {
                nextCondition.await();// 等待新文件
            }

            if (exception != null) {
                throw exception;
            }
            if (pre == null) {// 第一次
                return binlogs.get(0);
            } else {
                int index = seek(pre);
                if (index < binlogs.size() - 1) {
                    return binlogs.get(index + 1);
                } else {
                    nextCondition.await();// 等待新文件
                    return waitForNextFile(pre);// 唤醒之后递归调用一下
                }
            }
        } finally {
            lock.unlock();
        }
    }

    private int seek(File file) {
        for (int i = 0; i < binlogs.size(); i++) {
            File binlog = binlogs.get(i);
            if (binlog.getName().equals(file.getName())) {
                return i;
            }
        }

        return -1;
    }

    public File getNextFile(File pre) {
        try {
            lock.lockInterruptibly();
            if (exception != null) {
                throw exception;
            }

            if (binlogs.size() == 0) {
                return null;
            } else {
                if (pre == null) {// 第一次
                    return binlogs.get(0);
                } else {
                    int index = seek(pre);
                    if (index < binlogs.size() - 1) {
                        return binlogs.get(index + 1);
                    } else {
                        return null;
                    }
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } finally {
            lock.unlock();
        }
    }

    public File getBefore(File file) {
        try {
            lock.lockInterruptibly();
            if (exception != null) {
                throw exception;
            }

            if (binlogs.size() == 0) {
                return null;
            } else {
                if (file == null) {// 第一次
                    return binlogs.get(binlogs.size() - 1);
                } else {
                    int index = seek(file);
                    if (index > 0) {
                        return binlogs.get(index - 1);
                    } else {
                        return null;
                    }
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } finally {
            lock.unlock();
        }
    }

    //......

}
  • BinLogFileQueue还提供了waitForNextFile方法,它根据指定文件的index,找下一个binlog文件,找不到则通过nextCondition.await();它还提供了getNextFile方法,该方法根据指定文件找下一个binlog文件,找不到则返回null,不等待;它还提供了getBefore方法,该方法根据指定文件找上一个binlog文件,找不到则返回null

LocalBinLogConnection

canal-1.1.4/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/LocalBinLogConnection.java

public class LocalBinLogConnection implements ErosaConnection {

    private static final Logger logger     = LoggerFactory.getLogger(LocalBinLogConnection.class);
    private BinLogFileQueue     binlogs    = null;
    private boolean             needWait;
    private String              directory;
    private int                 bufferSize = 16 * 1024;
    private boolean             running    = false;
    private long                serverId;
    private FileParserListener  parserListener;

    public LocalBinLogConnection(){
    }

    public LocalBinLogConnection(String directory, boolean needWait){
        this.needWait = needWait;
        this.directory = directory;
    }

    @Override
    public void connect() throws IOException {
        if (this.binlogs == null) {
            this.binlogs = new BinLogFileQueue(this.directory);
        }
        this.running = true;
    }

    public void dump(String binlogfilename, Long binlogPosition, SinkFunction func) throws IOException {
        File current = new File(directory, binlogfilename);

        FileLogFetcher fetcher = new FileLogFetcher(bufferSize);
        LogDecoder decoder = new LogDecoder(LogEvent.UNKNOWN_EVENT, LogEvent.ENUM_END_EVENT);
        LogContext context = new LogContext();
        try {
            fetcher.open(current, binlogPosition);
            context.setLogPosition(new LogPosition(binlogfilename, binlogPosition));
            while (running) {
                boolean needContinue = true;
                LogEvent event = null;
                while (fetcher.fetch()) {
                    event = decoder.decode(fetcher, context);
                    if (event == null) {
                        continue;
                    }
                    if (serverId != 0 && event.getServerId() != serverId) {
                        throw new ServerIdNotMatchException("unexpected serverId " + serverId + " in binlog file !");
                    }

                    if (!func.sink(event)) {
                        needContinue = false;
                        break;
                    }
                }

                fetcher.close(); // 关闭上一个文件
                parserFinish(current.getName());
                if (needContinue) {// 读取下一个

                    File nextFile;
                    if (needWait) {
                        nextFile = binlogs.waitForNextFile(current);
                    } else {
                        nextFile = binlogs.getNextFile(current);
                    }

                    if (nextFile == null) {
                        break;
                    }

                    current = nextFile;
                    fetcher.open(current);
                    context.setLogPosition(new LogPosition(nextFile.getName()));
                } else {
                    break;// 跳出
                }
            }
        } catch (InterruptedException e) {
            logger.warn("LocalBinLogConnection dump interrupted");
        } finally {
            if (fetcher != null) {
                fetcher.close();
            }
        }
    }

    //......

}
  • LocalBinLogConnection的connect方法创建BinLogFileQueue,其dump方法创建FileLogFetcher,然后使用while执行fetcher.fetch(),然后通过LogDecoder来解析数据,然后通过SinkFunction的sink方法来消费事件,fetch完之后执行fetcher.close(),之后通过binlogs.waitForNextFile(current)方法获取下一个binlog文件,替换current,然后执行fetcher.open(current)及context.setLogPosition(new LogPosition(nextFile.getName())),继续while循环fetcher.fetch()消费事件

小结

  • BinLogFileQueue的构造器通过listBinlogFiles加载directory目录下的以baseName开头的文件并按文件名排序,然后挨个执行offer方法,最后使用timer定时调度执行listBinlogFiles及offer方法;其currentBinlogs返回binlogs文件列表;其destory方法取消timer,然后清空binlogs
本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2020-04-23,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 码匠的流水账 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • BinLogFileQueue
  • waitForNextFile
  • LocalBinLogConnection
  • 小结
相关产品与服务
云数据库 SQL Server
腾讯云数据库 SQL Server (TencentDB for SQL Server)是业界最常用的商用数据库之一,对基于 Windows 架构的应用程序具有完美的支持。TencentDB for SQL Server 拥有微软正版授权,可持续为用户提供最新的功能,避免未授权使用软件的风险。具有即开即用、稳定可靠、安全运行、弹性扩缩等特点。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档