导航
导航
文章目录
  1. 一、CDC概念
  2. 二、Postgresql CDC 方案
  3. 三、JAVA 示例

Postgresql-CDC 方案踩坑

一、CDC概念

​ CDC(Changing Data Capture)意思是捕捉变化的数据,用流的方式持续捕捉.这与ETC有着本质上的区别,ETL则是定时拉取数据.

​ ETL 概念上是:抽取(E)、转换(T)、导入(L),ETL现在是比较成熟的,方案也比较多.但定时抽取就会意味着有时效性的问题,如果有一种方案,数据库数据出现更改就自动同步到OLAP引擎里面,岂不是美滋滋?那么CDC就是为此而生的,持续不断的监听数据库的变化情况,一旦变化就立马发出消息进行同步消息,但这种方案也并非完美,如遇到大事务的SQL,批量更新的这种,也会有延迟的问题,权衡一下估计两者差不多.

​ 市面上常用的数据库有Mysql,PostgreSql等,Mysql的CDC方案比较多,通过监听binlog实现.而Postgresql的CDC方案则比较少,至少从百度(🐶️)上找的资料来看.

二、Postgresql CDC 方案

​ Postgresql 实现CDC是通过逻辑复制实现的,与Mysql 的binlog有异曲同工之处.详情请看官方文档.只要在pgsql的配置文件中wal_level属性设置为logical就配置好了,再在数据库中创建订阅,以及复制槽,以上操作比较简单,不再记录,详情查看官方文档.

​ 在pgsql中开启了逻辑复制,但还差一个逻辑解码输出插件.市面上解码插件有3款,当然也可以自己写解码插件.

以上3款插件,笔者都试过,最终选择了官方的方案,用起来还是挺爽的.只不过该消息有单独的格式,要自己去解析.具体格式可以参考官方文档,http://postgres.cn/docs/11/protocol-logicalrep-message-formats.html.

三、JAVA 示例

要使用官方的插件分以下几个步骤:

  1. 创建订阅

    1
    2
    3
    CREATE PUBLICATION test FOR TABLE ONLY "user_info" 
    WITH (publish = 'insert,update,delete');
    -- 为表user_info创建名称为test的订阅,发布insert,update,delete消息
  1. 创建复制槽

    1
    2
    CREATE_REPLICATION_SLOT test TEMPORARY LOGICAL pgoutput;
    -- 创建名称为test的复制槽
  2. Java demo

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    String url = "jdbc:postgresql://localhost:5432/test";
    Properties props = new Properties();
    PGProperty.USER.set(props, "postgres");
    PGProperty.PASSWORD.set(props, "postgres");
    PGProperty.ASSUME_MIN_SERVER_VERSION.set(props, "9.4");
    PGProperty.REPLICATION.set(props, "database");
    PGProperty.PREFER_QUERY_MODE.set(props, "simple");

    Connection con = DriverManager.getConnection(url, props);
    PGConnection replConnection = con.unwrap(PGConnection.class);

    //some changes after create replication slot to demonstrate receive it
    sqlConnection.setAutoCommit(true);
    Statement st = sqlConnection.createStatement();
    st.execute("insert into test_logic_table(name) values('first tx changes')");
    st.close();

    st = sqlConnection.createStatement();
    st.execute("update test_logic_table set name = 'second tx change' where pk = 1");
    st.close();

    st = sqlConnection.createStatement();
    st.execute("delete from test_logic_table where pk = 1");
    st.close();

    PGReplicationStream stream =
    pgConnection.getReplicationAPI()
    .replicationStream()
    .logical()
    .withSlotName("test")
    .withSlotOption("proto_version", 1)
    .withSlotOption("publication_names", "test")
    .withStartPosition(lastLsn)
    .withStatusInterval(Math.toIntExact(Duration.ofSeconds(10).toMillis()), TimeUnit.MILLISECONDS)
    .start();

    while (true) {
    //non blocking receive message
    ByteBuffer msg = stream.readPending();
    if (msg == null) {
    TimeUnit.MILLISECONDS.sleep(10L);
    continue;
    }
    int offset = msg.arrayOffset();
    byte[] source = msg.array();
    int length = source.length - offset;
    System.out.println(new String(source, offset, length));

    //feedback
    stream.setAppliedLSN(stream.getLastReceiveLSN());
    stream.setFlushedLSN(stream.getLastReceiveLSN());
    }

关于java示例中的用法,可以查看jdbc的文档:https://jdbc.postgresql.org/documentation/head/replication.html

支持一下
扫一扫,请我吃颗大白兔奶糖
  • 支付宝扫一扫