mysql binlog监听_mysql binlog 监听-程序员宅基地

技术标签: java  mysql  数据库  

背景

之前在做某个需求的时候用到了mysql binlog监听做相应业务处理的功能,比较简单所以采用了java的mysql-binlog-connector-java。
网上也有很多关于该插件的使用方法。但是到自己使用,上生产的时候又碰到几个问题。

问题

1.首先第一个,上线后发现binlog日志无法定位到最新的位置的问题。因为公司测试环境以磁盘空间有限,所以未开启binlog。所以我的测试只能在本地window完成,window安装的mysql serverId默认为1。在本地的测试都是正常,但是上生产以后就发现binlog无法定位到最新的文件,后面查了才发现需要设置好serverId,所以使用命令 :show variables like ‘server_id’,查阅线上数据库,并且在client初始化时设置就可以解决这个问题。
在这里插入图片描述
2.后面又发现根本无法监听各种mysql插入更新等操作,本地mysql使用的binlog_format=row,而线上使用的binlog_format=mixed,通过对比两种不同格式的binlog日志,可以发现row格式的binlog会记录所有的更新,插入事件为不同的事件类型,但是mixed格式的binlog只会记录Query类型事件,而所有的create,update,drop等操作都是该类型的事件,因此这里使用binlog=row格式去监听binlog,导致无法监听到插入更新的操作。考虑到线上binlog日志数量巨大无法轻易变更binlog格式,所以只能在代码层面对binlog_format=mixed进行相应的兼容。

需要用到的查询sql如下:
show master status;显示当前binlog文件和位置
show BINARY logs;显示所有的binlog文件,监听binlog需要开通权限
show binlog events in ‘binlog.002969’ 显示该binlog所有记录的event
show VARIABLES like ‘binlog_format’; 显示binlog格式
show variables like 'server_id’显示serverId

mixed格式:
在这里插入图片描述
row格式:
在这里插入图片描述
3.mixed格式的binlog_format只能去监听QUERY事件,想要监听某张表的话也只能对这个事件中的sql做相应的解析再做处理。

public class BinlogListener implements CommandLineRunner {
    

    private static final Logger logger = LoggerFactory.getLogger(BinlogListener.class);

    @Resource
    private BinlogConfig config;

    private ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MINUTES,
            new ArrayBlockingQueue<>(1), new ThreadFactoryImpl("BinlogListenerThread_"), new ThreadPoolExecutor.CallerRunsPolicy());

    @Override
    public void run(String... args) throws Exception {
    
        // 起一个新的线程与主线程隔离,防止run失败导致项目无法启动
        threadPool.submit(()->{
    
            List<String> tables = Arrays.asList(config.getTable().split(","));
            BinaryLogClient client = new BinaryLogClient(config.getHost(),config.getPort(),config.getUsername(),config.getPassword());
            EventDeserializer eventDeserializer = new EventDeserializer();
            client.setEventDeserializer(eventDeserializer);
            client.setServerId(config.getServerId());
            client.registerEventListener(event -> {
    
                EventHeader header = event.getHeader();
                EventType eventType = header.getEventType();
                // 监听数据表变动,这里由于线上binlog_format=mixed所有的数据更新事件都是QUERY
                if(EventType.QUERY == eventType){
    
                    QueryEventData data = event.getData();
                    String sql = data.getSql();
                    String tableName = getTableName(sql);
                    if(tables.contains(tableName)){
    
                        // 业务处理
                    }
                }
            });
            try {
    
                client.connect();
            } catch (IOException e) {
    
            }
        });
    }

    private String getTableName(String sql){
    
        if(sql.startsWith("insert") || sql.startsWith("INSERT")){
    
            String result = ReUtil.getGroup0("(?<=INSERT INTO |insert into )(.*?)(?= \\(|\\()",sql);
            if(StringUtils.isEmpty(result)){
    
                return result;
            }
            result = result.replace("'","").replace("`",""); 
            if(result.contains(".")){
    
                return result.split("\\.")[1];
            }else {
    
                return result;
            }
        }
        if(sql.startsWith("update") || sql.startsWith("UPDATE")){
    
            String result = ReUtil.getGroup0("(?<=UPDATE |update )(.*?)(?= |\\n)",sql);
            if(StringUtils.isEmpty(result)){
    
                return result;
            }
            result = result.replace("'","").replace("`","");
            if(result.contains(".")){
    
                return result.split("\\.")[1];
            }else {
    
                return result;
            }
        }
        if(sql.startsWith("delete") || sql.startsWith("DELETE")){
    
            String result = ReUtil.getGroup0("(?<=DELETE from |delete from )(.*?)(?= |\\n)",sql);
            if(StringUtils.isEmpty(result)){
    
                return result;
            }
            result = result.replace("'","").replace("`","");
            if(result.contains(".")){
    
                return result.split("\\.")[1];
            }else {
    
                return result;
            }
        }
        return null;
    }
}

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/Neil_11111/article/details/125602418

智能推荐

nvm use (node版本号)时报错: exit status 1: ��û���㹻��Ȩ��ִ�д˲�����_nvpuse-程序员宅基地

文章浏览阅读303次。nvm use (node版本号)时报错: exit status 1: ��û���㹻��Ȩ��ִ�д˲�����_nvpuse

Spark Streaming高级特性在NDCG计算实践_ndcg sql-程序员宅基地

文章浏览阅读618次。从storm到spark streaming,再到flink,流式计算得到长足发展, 依托于spark平台的spark streaming走出了一条自己的路,其借鉴了spark批处理架构,通过批处理方式实现了实时处理框架。为进一步了解spark streaming的相关内容,飞马网于3月20日晚邀请到历任百度大数据的高级工程师—王富平,在线上直播中,王老师针对spark streaming高级特性..._ndcg sql

我第一份Python自动化测试工作能找到13k的工作,就是掌握了这些技术栈_python技术栈有哪些-程序员宅基地

文章浏览阅读791次。给大家总结我一年时间学了哪些python自动化测试技术:_python技术栈有哪些

postgresql 函数之游标、更新数据_错误: 无法以游标的形式打开查询update-程序员宅基地

文章浏览阅读1.1k次。postgresql 函数之游标、更新数据根据人员信息更新另一个表中的部门新CREATE OR REPLACE FUNCTION "public"."updatedeptcode"() RETURNS "pg_catalog"."varchar" AS $BODY$ DECLARE unbound_refcursor refcursor; --游标 deptCode VARCHAR(100); deptName VARCHAR(100); persPin VARCH_错误: 无法以游标的形式打开查询update

在线靶场-墨者-电子数据取证3星-日志文件分析溯源(时差)-程序员宅基地

文章浏览阅读147次。打开靶场,下载日志文件。打开日志文件,寻找目的ip地址根据题目意思,找到时间为9月27日14点23分40秒,访问过/admin/的ip把该ip地址输入靶场,即可得到本题的key。..._日志文件分析溯源(时差)

linux命令:vim_vim换行-程序员宅基地

vim是Linux下功能强大的文本编辑器。有三种模式:命令模式、编辑模式和底线命令模式。命令模式输入命令,编辑模式输入内容,底线命令模式用于保存和退出文件。

随便推点

matlab simulink基于自抗扰控制的机械臂位置仿真_matlab机械臂仿真-程序员宅基地

文章浏览阅读1.9k次。虽然叫做扩展状态观测器,但与普通的状态观测器不同。设计扩展状态观测器的目的就是观测扩展出来的状态变量,用来估计未知扰动和控制对象未建模部分,实现动态系统的反馈线性化,将控制对象变为积分串联型。应用于机械臂控制系统的设计方法主要包括PID控制、自适应控制和鲁棒控制等,然而由于它们自身所存在的缺陷,促使其与神经网络、模糊控制等算法相结合,一些新的控制方法也在涌现,很多算法是彼此结合在一起的。因而机械臂的建模模型也存在着不确定性,对于不同的任务,需要规划机械臂关节空间的运动轨迹,从而级联构成末端位姿。_matlab机械臂仿真

java移位运算符-程序员宅基地

文章浏览阅读279次。/*移位运算符:规律:一个操作数进行左移运算的时候,结果就是等于操作数乘以2的n次方,n就是左移 的位数.333>>(右移)规律:一个操作数在做右移运算的时候,实际上就是等于该操作数除以2的n次方,n就是右移的位数。3>>1 = 3 / 2(1) = 13>>2 = 3 / 2(2) = 0 。。>>>(无符号右移) :

X264 输出的统计值的含义(X264 Stats Output)_x264怎么输出每个参数的值-程序员宅基地

文章浏览阅读9.7k次,点赞6次,收藏13次。典型的x264输出如下:avis [info]: 1280x720 @ 1.77 fps (40997 frames)yuv4mpeg: 640x480@30/1fps, 0:0x264 [info]: 352x288 (given by file name) @ 25.00 fps取决于输入源会有不同,但大致形式一样,以上三行分别对应于avisynth, y4m, yuv输入。_x264怎么输出每个参数的值

addEventListener和on的区别-程序员宅基地

文章浏览阅读3.2k次,点赞2次,收藏6次。为什么需要addEventListener?先来看一个片段:html代码&lt;div id="box"&gt;追梦子&lt;/div&gt;用on的代码 1 window.onload = function(){ 2 var box = document.getElementById("box"); 3 box.onclick = function(){ 4 ..._addeventlistener和on的区别

干货 | 播音主持基本功解析(上)_吸提推送具体做法-程序员宅基地

文章浏览阅读168次。参加播音主持艺考的同学每天都要进行基本功训练,这也是学好播音主持的基石。播音主持基本功训练有五个阶段,今天为大家详细的讲解下前2个阶段训练的是什么,怎么练习!第一阶段:未曾出声先练气1、深吸慢呼气息控制延长练习其要领是:先学会“蓄气”,先压一下气,把废气排出,然后用鼻和舌尖间隙像“闻花”一样,自然松畅地轻轻吸,吸的要饱,然后气沉丹田,慢慢地放松胸肋,使气像细水长流般慢慢呼出,呼得均匀,控制时间越长越好,反复练习4-6次。2、深吸慢呼数字练习我们把第一步骤称为“吸提推送‘,”吸提“的气.._吸提推送具体做法

20190323-Excel使用手册-程序员宅基地

文章浏览阅读1.8k次。文章目录1 输入数据1.1常规数据的输入技巧1.1.1数据输入的方式1.1.2 输入数值型数据1.1.3 输入时间日期型数据1.1.4 输入文本型数据1.2 特殊数据的输入技巧1.2.1输入身份证号码1.2.2 输入特殊符号1.2.3 通过插入批注来为单元格添加注释1.3 高效数据的输入技巧1.3.1 输入有规律的数据(填充)1.3.2 输入相同数据1.3.3 自动输入数据1.3.4 输入有效性数..._excel使用手册

推荐文章

热门文章

相关标签