一、CDC概念
CDC(Changing Data Capture)意思是捕捉变化的数据,用流的方式持续捕捉.这与ETC有着本质上的区别,ETL则是定时拉取数据.
ETL 概念上是:抽取(E)、转换(T)、导入(L),ETL现在是比较成熟的,方案也比较多.但定时抽取就会意味着有时效性的问题,如果有一种方案,数据库数据出现更改就自动同步到OLAP引擎里面,岂不是美滋滋?那么CDC就是为此而生的,持续不断的监听数据库的变化情况,一旦变化就立马发出消息进行同步消息,但这种方案也并非完美,如遇到大事务的SQL,批量更新的这种,也会有延迟的问题,权衡一下估计两者差不多.
市面上常用的数据库有Mysql,PostgreSql等,Mysql的CDC方案比较多,通过监听binlog实现.而Postgresql的CDC方案则比较少,至少从百度(🐶️)上找的资料来看.
二、Postgresql CDC 方案
Postgresql 实现CDC是通过逻辑复制实现的,与Mysql 的binlog有异曲同工之处.详情请看官方文档.只要在pgsql的配置文件中wal_level
属性设置为logical
就配置好了,再在数据库中创建订阅,以及复制槽,以上操作比较简单,不再记录,详情查看官方文档.
在pgsql中开启了逻辑复制,但还差一个逻辑解码输出插件.市面上解码插件有3款,当然也可以自己写解码插件.
-
逻辑解码插件列表
-
wal2json
这款插件会把消息解码成json格式,方便于消费端进行消费.一个事务一条消息,github 地址:https://github.com/eulerto/wal2json.
优点:
消息是json格式方便使用
缺点:
对于大事务消息,比如一次性修改几十万条数据,会耗尽内存.
没有现场的插件,需要自行编译.在centos上进行编译有点点麻烦.
java端的消费者代码可以参考迪士尼的项目:https://github.com/disneystreaming/pg2k4j
-
decoderbufs
消息格式为protobuf,比json省带宽.一个事务多条消息.github地址:https://github.com/xstevens/decoderbufs,现在这个插件原作者已经不维护了.可以用debezium维护的版本https://github.com/debezium/postgres-decoderbufs.
优点:
大事务不会耗尽内存,效率比json高
缺点:
在centos上不好编译,编译劝退.服务器不建议使用centos,免得各种编译问题.(关键是编译好了,在java端一读取消息就崩,至今没找到原因,退了,退了.)
java端的消费者代码可以参考debezium项目中的connector-postgresql:https://github.com/debezium/debezium
-
pgoutput
该插件是官方的,只能适用于10+的版本,如果是10以下的版本还是劝退吧.该插件用起来目前感觉比以上两款都要爽,不用编译,官方自带,没有内存耗尽的问题.
java端的消费者代码可以参考debezium项目中的connector-postgresql:https://github.com/debezium/debezium
以上3款插件,笔者都试过,最终选择了官方的方案,用起来还是挺爽的.只不过该消息有单独的格式,要自己去解析.具体格式可以参考官方文档,http://postgres.cn/docs/11/protocol-logicalrep-message-formats.html.
-
三、JAVA 示例
要使用官方的插件分以下几个步骤:
-
创建订阅
CREATE PUBLICATION test FOR TABLE ONLY "user_info" WITH (publish = 'insert,update,delete'); -- 为表user_info创建名称为test的订阅,发布insert,update,delete消息
-
创建复制槽
CREATE_REPLICATION_SLOT test TEMPORARY LOGICAL pgoutput; -- 创建名称为test的复制槽
-
Java demo
String url = "jdbc:postgresql://localhost:5432/test"; Properties props = new Properties(); PGProperty.USER.set(props, "postgres"); PGProperty.PASSWORD.set(props, "postgres"); PGProperty.ASSUME_MIN_SERVER_VERSION.set(props, "9.4"); PGProperty.REPLICATION.set(props, "database"); PGProperty.PREFER_QUERY_MODE.set(props, "simple"); Connection con = DriverManager.getConnection(url, props); PGConnection replConnection = con.unwrap(PGConnection.class); //some changes after create replication slot to demonstrate receive it sqlConnection.setAutoCommit(true); Statement st = sqlConnection.createStatement(); st.execute("insert into test_logic_table(name) values('first tx changes')"); st.close(); st = sqlConnection.createStatement(); st.execute("update test_logic_table set name = 'second tx change' where pk = 1"); st.close(); st = sqlConnection.createStatement(); st.execute("delete from test_logic_table where pk = 1"); st.close(); PGReplicationStream stream = pgConnection.getReplicationAPI() .replicationStream() .logical() .withSlotName("test") .withSlotOption("proto_version", 1) .withSlotOption("publication_names", "test") .withStartPosition(lastLsn) .withStatusInterval(Math.toIntExact(Duration.ofSeconds(10).toMillis()), TimeUnit.MILLISECONDS) .start(); while (true) { //non blocking receive message ByteBuffer msg = stream.readPending(); if (msg == null) { TimeUnit.MILLISECONDS.sleep(10L); continue; } int offset = msg.arrayOffset(); byte[] source = msg.array(); int length = source.length - offset; System.out.println(new String(source, offset, length)); //feedback stream.setAppliedLSN(stream.getLastReceiveLSN()); stream.setFlushedLSN(stream.getLastReceiveLSN()); }
关于java示例中的用法,可以查看jdbc的文档:https://jdbc.postgresql.org/documentation/head/replication.html