PostgreSQL database change-message synchronization

 

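1. Introduction

I recently had a requirement: whenever data in the database changes, the change has to be synced into a statistics table. My first idea was to use triggers, but we have no database guru on hand and triggers are a bit of a pain to write. After some research I found that PostgreSQL has a change-message mechanism similar to MySQL's binlog (logical decoding of the WAL). So that is the route taken here.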

2. Configuration

Message synchronization relies on PostgreSQL's replication slots. Set the following in the postgresql.conf configuration file:

wal_level = logical
max_wal_senders=1
max_replication_slots=1

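This turns on PostgreSQL's logical replication mechanism. You also need a WAL output plugin that converts the WAL into JSON strings so that a program can parse it easily. The plugin used here is wal2json.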

  • wal2json setup
   git clone https://github.com/eulerto/wal2json.git
   cd wal2json
   USE_PGXS=1 make
   USE_PGXS=1 make install

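Those are the steps for building wal2json. The build may fail because of environment problems; the simplest fix is to edit the PG_CONFIG variable in the Makefile of the wal2json source tree and point it at the path of your local pg_config command. Note that build dependencies such as g++ must be installed beforehand (wal2json is written in C, so what it really needs is a C compiler and the PostgreSQL server development headers).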

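  • PostgreSQL configuration

    Edit the PostgreSQL configuration file postgresql.conf so that the wal2json plugin takes effect. Output plugins are normally loaded on demand when a slot is created, so this line may not be strictly necessary, but it is the setup used here: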

    shared_preload_libraries = 'wal2json'
    

    Restart the database (changing wal_level requires a restart in any case).

  • Testing wal2json

    # Create a slot: -d database, --slot slot name, -P output plugin name
    pg_recvlogical -d postgres --slot test_slot --create-slot -P wal2json
    # Receive data from the slot and print it to the console; -o passes plugin options, see https://github.com/eulerto/wal2json for the list
    pg_recvlogical -d postgres --slot test_slot --start -o pretty-print=1 -f -
    

    If all goes well, any insert, update, or delete on the specified database will print JSON like the following to the console:

    {
    	"change": [
    	]
    }
    {
    	"change": [
    	]
    }
    {
    	"change": [
    		{
    			"kind": "insert",
    			"schema": "public",
    			"table": "table_with_pk",
    			"columnnames": ["a", "b", "c"],
    			"columntypes": ["integer", "character varying(30)", "timestamp without time zone"],
    			"columnvalues": [1, "Backup and Restore", "2018-03-27 11:58:28.988414"]
    		}
    		,{
    			"kind": "insert",
    			"schema": "public",
    			"table": "table_with_pk",
    			"columnnames": ["a", "b", "c"],
    			"columntypes": ["integer", "character varying(30)", "timestamp without time zone"],
    			"columnvalues": [2, "Tuning", "2018-03-27 11:58:28.988414"]
    		}
    		,{
    			"kind": "insert",
    			"schema": "public",
    			"table": "table_with_pk",
    			"columnnames": ["a", "b", "c"],
    			"columntypes": ["integer", "character varying(30)", "timestamp without time zone"],
    			"columnvalues": [3, "Replication", "2018-03-27 11:58:28.988414"]
    		}
    		,{
    			"kind": "delete",
    			"schema": "public",
    			"table": "table_with_pk",
    			"oldkeys": {
    				"keynames": ["a", "c"],
    				"keytypes": ["integer", "timestamp without time zone"],
    				"keyvalues": [1, "2018-03-27 11:58:28.988414"]
    			}
    		}
    		,{
    			"kind": "delete",
    			"schema": "public",
    			"table": "table_with_pk",
    			"oldkeys": {
    				"keynames": ["a", "c"],
    				"keytypes": ["integer", "timestamp without time zone"],
    				"keyvalues": [2, "2018-03-27 11:58:28.988414"]
    			}
    		}
    		,{
    			"kind": "insert",
    			"schema": "public",
    			"table": "table_without_pk",
    			"columnnames": ["a", "b", "c"],
    			"columntypes": ["integer", "numeric(5,2)", "text"],
    			"columnvalues": [1, 2.34, "Tapir"]
    		}
    	]
    }
    
  • Dropping the slot

    pg_recvlogical -d postgres --slot test_slot --drop-slot
    

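To verify that the slot was created and whether it holds any messages, use the following SQL statements (run them before dropping the slot):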

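-- List the slots in the database
select * from pg_replication_slots;
-- Read the messages in the slot (this consumes them; pg_logical_slot_peek_changes reads without consuming)
SELECT data FROM pg_logical_slot_get_changes('test_slot', NULL, NULL, 'pretty-print', '1');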

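3. Parsing messages

Parsing is actually fairly simple: the PostgreSQL JDBC driver provides an API for reading messages from a slot, so you can call it directly.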

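    // The following sample code comes from the official demo: https://jdbc.postgresql.org/documentation/head/replication.html
    // Imports the snippet needs:
    //   java.nio.ByteBuffer, java.sql.Connection, java.sql.DriverManager, java.sql.Statement,
    //   java.util.Properties, java.util.concurrent.TimeUnit,
    //   org.postgresql.PGConnection, org.postgresql.PGProperty,
    //   org.postgresql.replication.PGReplicationStream
    String url = "jdbc:postgresql://localhost:5432/test";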
    Properties props = new Properties();
    PGProperty.USER.set(props, "postgres");
    PGProperty.PASSWORD.set(props, "postgres");
    PGProperty.ASSUME_MIN_SERVER_VERSION.set(props, "9.4");
    PGProperty.REPLICATION.set(props, "database");
    PGProperty.PREFER_QUERY_MODE.set(props, "simple");

    Connection con = DriverManager.getConnection(url, props);
    PGConnection replConnection = con.unwrap(PGConnection.class);

    replConnection.getReplicationAPI()
        .createReplicationSlot()
        .logical()
        .withSlotName("demo_logical_slot")
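        // the official demo uses test_decoding; to get the JSON shown earlier, pass "wal2json"
        // here and replace the slot options further down with wal2json's own options
        .withOutputPlugin("test_decoding")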
        .make();

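    // make some changes after creating the replication slot so there is something to receive
    // sqlConnection is an ordinary (non-replication) connection; the demo snippet uses it without showing its setup
    Connection sqlConnection = DriverManager.getConnection(url, "postgres", "postgres");
    sqlConnection.setAutoCommit(true);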
    Statement st = sqlConnection.createStatement();
    st.execute("insert into test_logic_table(name) values('first tx changes')");
    st.close();

    st = sqlConnection.createStatement();
    st.execute("update test_logic_table set name = 'second tx change' where pk = 1");
    st.close();

    st = sqlConnection.createStatement();
    st.execute("delete from test_logic_table where pk = 1");
    st.close();

    PGReplicationStream stream =
        replConnection.getReplicationAPI()
            .replicationStream()
            .logical()
            .withSlotName("demo_logical_slot")
            .withSlotOption("include-xids", false)
            .withSlotOption("skip-empty-xacts", true)
            .withStatusInterval(20, TimeUnit.SECONDS)
            .start();

    while (true) {
      //non blocking receive message
      ByteBuffer msg = stream.readPending();

      if (msg == null) {
        TimeUnit.MILLISECONDS.sleep(10L);
        continue;
      }

      int offset = msg.arrayOffset();
      byte[] source = msg.array();
      int length = source.length - offset;
      System.out.println(new String(source, offset, length));

      //feedback
      stream.setAppliedLSN(stream.getLastReceiveLSN());
      stream.setFlushedLSN(stream.getLastReceiveLSN());
    }
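
The loop above turns each message into a String through the backing array and its offset. As a minimal sketch of an alternative, the helper below decodes the buffer's remaining bytes directly. WalMessageDecoder is a hypothetical name and not part of the PostgreSQL JDBC driver, and the sketch assumes the payload is UTF-8 text and occupies the buffer from its position to its limit.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper for illustration; not part of the PostgreSQL JDBC driver.
final class WalMessageDecoder {
    private WalMessageDecoder() {}

    /** Decodes the remaining bytes of msg as UTF-8 without moving msg's own position. */
    static String decode(ByteBuffer msg) {
        // duplicate() shares the bytes but has an independent position and limit,
        // so the caller's buffer is left untouched.
        return StandardCharsets.UTF_8.decode(msg.duplicate()).toString();
    }

    public static void main(String[] args) {
        byte[] raw = "xx{\"change\":[]}".getBytes(StandardCharsets.UTF_8);
        // Simulate a message whose payload starts at offset 2 of the backing array.
        ByteBuffer msg = ByteBuffer.wrap(raw, 2, raw.length - 2).slice();
        System.out.println(decode(msg)); // prints {"change":[]}
    }
}
```

Naming the charset explicitly avoids depending on the platform default, and the same call also works for buffers that have no accessible backing array.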

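Of course, you could also just copy pg2k4j, the library open-sourced by Disney. Then again, with open source, can you really call it copying?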
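
However the messages are read, the wal2json change entries shown earlier still have to be turned into something the statistics logic can use. The sketch below is one possible shape for that step. It assumes a JSON library of your choice has already parsed each entry of "change" into a Map, and ChangeStats with its methods is a hypothetical name invented for this illustration.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch; class and method names are illustrative only.
final class ChangeStats {
    // Counters keyed by "schema.table:kind", e.g. "public.table_with_pk:insert".
    private final Map<String, Integer> counts = new TreeMap<>();

    /** Zips wal2json's parallel "columnnames" / "columnvalues" arrays into one row map. */
    static Map<String, Object> toRow(List<String> names, List<?> values) {
        if (names.size() != values.size()) {
            throw new IllegalArgumentException("names and values differ in length");
        }
        Map<String, Object> row = new LinkedHashMap<>(); // keeps column order
        for (int i = 0; i < names.size(); i++) {
            row.put(names.get(i), values.get(i));
        }
        return row;
    }

    /** Records one already-parsed change entry (needs "schema", "table" and "kind"). */
    void record(Map<String, ?> change) {
        String key = change.get("schema") + "." + change.get("table") + ":" + change.get("kind");
        counts.merge(key, 1, Integer::sum);
    }

    /** Returns how many changes of the given kind were recorded for the table. */
    int count(String schema, String table, String kind) {
        return counts.getOrDefault(schema + "." + table + ":" + kind, 0);
    }
}
```

In a real consumer the counters would be replaced by whatever update the statistics table needs. The point here is only the dispatch on "kind" and the pairing of column names with values.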

NOTE

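One thing to watch out for: with logical replication enabled, WAL can pile up (a slot retains WAL until its consumer confirms it), which can take up a lot of disk space on the server. I have not run into this yet, so treat this as a placeholder for now.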
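
To keep an eye on that backlog, one option is to compare a slot's restart_lsn (a column of pg_replication_slots) with the server's current WAL position, for example the value returned by pg_current_wal_lsn() on PostgreSQL 10 and later. An LSN is printed as two hexadecimal numbers separated by a slash, the high and low 32 bits of a 64-bit position. The sketch below does the subtraction on the client side. LsnLag is a hypothetical helper name, and the sample values in main are made up.

```java
// Hypothetical helper for illustration; not part of any PostgreSQL library.
final class LsnLag {
    private LsnLag() {}

    /** Converts an LSN in text form, e.g. "16/B374D848", to a 64-bit byte position. */
    static long parseLsn(String lsn) {
        String[] parts = lsn.trim().split("/");
        if (parts.length != 2) {
            throw new IllegalArgumentException("not an LSN: " + lsn);
        }
        long high = Long.parseLong(parts[0], 16); // upper 32 bits
        long low = Long.parseLong(parts[1], 16);  // lower 32 bits
        return (high << 32) | low;
    }

    /** Bytes of WAL between the slot's restart position and the current position. */
    static long lagBytes(String currentLsn, String restartLsn) {
        return parseLsn(currentLsn) - parseLsn(restartLsn);
    }

    public static void main(String[] args) {
        // Made-up sample values: the slot is 0x100 bytes behind.
        System.out.println(lagBytes("0/1700", "0/1600")); // prints 256
    }
}
```

On the server side, pg_wal_lsn_diff performs the same subtraction in SQL. A lag that keeps growing usually means the consumer has stopped or is not sending feedback.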