pgsql 数据库消息同步

pgsql 数据库消息同步

一、前言

最近遇到一个需求,当数据库中的数据发生改变时,要进行业务同步到统计表中去,最开始是想用触发器进行实现,奈何没有数据库大牛,触发器写起来有点麻烦.调研了一下发现pgsql有类似mysql的binlog的消息机制.那么就可以嘿嘿嘿了.

二、配置

要实现消息同步需要使用pgsql的replication slots机制,在postgresql.conf配置文件中设置

1
2
3
wal_level = logical
max_wal_senders=1
max_replication_slots=1

这样就打开了pgsql的逻辑复制机制.还需要配置一个wal插件,把wal日志转成json字符串,方便程序解析.这里使用的插件是wal2json.

  • wal2json 配置

    1
    2
    3
    4
    git clone https://github.com/eulerto/wal2json.git
    cd wal2json
    USE_PGXS=1 make
    USE_PGXS=1 make install

    以上就是编译wal2json的步骤,可能会遇到环境的问题导致编译失败,最简单的解决办法是修改wal2json源码中的Makefile 文件中的PG_CONFIG这个变量的值,改为本地pg_config命令的路径.注意在编译的时候要实现安装好g++ 这些依赖.

  • pgsql配置

    修改pgsql配置文件postgresql.conf,使其wal2json插件生效

    1
    shared_preload_libraries = 'wal2json'

    重启数据库

  • wal2json测试

    1
    2
    3
    4
    #创建slot -d数据库 --slot名称 -P 插件名称
    pg_recvlogical -d postgres --slot test_slot --create-slot -P wal2json
    #接受slot的数据 并打印到console上 -o 插件参数,参数说明可以参考https://github.com/eulerto/wal2json
    pg_recvlogical -d postgres --slot test_slot --start -o pretty-print=1 -f -

    一切顺利的话,只要在指定的数据库上进行crud就会看到控制台输出的json

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    {
    "change": [
    ]
    }
    {
    "change": [
    ]
    }
    {
    "change": [
    {
    "kind": "insert",
    "schema": "public",
    "table": "table_with_pk",
    "columnnames": ["a", "b", "c"],
    "columntypes": ["integer", "character varying(30)", "timestamp without time zone"],
    "columnvalues": [1, "Backup and Restore", "2018-03-27 11:58:28.988414"]
    }
    ,{
    "kind": "insert",
    "schema": "public",
    "table": "table_with_pk",
    "columnnames": ["a", "b", "c"],
    "columntypes": ["integer", "character varying(30)", "timestamp without time zone"],
    "columnvalues": [2, "Tuning", "2018-03-27 11:58:28.988414"]
    }
    ,{
    "kind": "insert",
    "schema": "public",
    "table": "table_with_pk",
    "columnnames": ["a", "b", "c"],
    "columntypes": ["integer", "character varying(30)", "timestamp without time zone"],
    "columnvalues": [3, "Replication", "2018-03-27 11:58:28.988414"]
    }
    ,{
    "kind": "delete",
    "schema": "public",
    "table": "table_with_pk",
    "oldkeys": {
    "keynames": ["a", "c"],
    "keytypes": ["integer", "timestamp without time zone"],
    "keyvalues": [1, "2018-03-27 11:58:28.988414"]
    }
    }
    ,{
    "kind": "delete",
    "schema": "public",
    "table": "table_with_pk",
    "oldkeys": {
    "keynames": ["a", "c"],
    "keytypes": ["integer", "timestamp without time zone"],
    "keyvalues": [2, "2018-03-27 11:58:28.988414"]
    }
    }
    ,{
    "kind": "insert",
    "schema": "public",
    "table": "table_without_pk",
    "columnnames": ["a", "b", "c"],
    "columntypes": ["integer", "numeric(5,2)", "text"],
    "columnvalues": [1, 2.34, "Tapir"]
    }
    ]
    }
  • 删除slot

    1
    pg_recvlogical -d postgres --slot test_slot --drop-slot

验证slot是否创建成功以及slot里面有没有消息,可以使用一下的sql语句

1
2
3
4
--查询数据库中的slot
select * from pg_replication_slots;
--查询slot中的消息
SELECT data FROM pg_logical_slot_get_changes('test_slot', NULL, NULL, 'pretty-print', '1');

三、解析消息

其实解析还算是比较简单的,在pgsql中的jdbc中提供了slot读取消息的api,直接调用即可.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
  //以下实例代码来自官方demo https://jdbc.postgresql.org/documentation/head/replication.html
String url = "jdbc:postgresql://localhost:5432/test";
Properties props = new Properties();
PGProperty.USER.set(props, "postgres");
PGProperty.PASSWORD.set(props, "postgres");
PGProperty.ASSUME_MIN_SERVER_VERSION.set(props, "9.4");
PGProperty.REPLICATION.set(props, "database");
PGProperty.PREFER_QUERY_MODE.set(props, "simple");

Connection con = DriverManager.getConnection(url, props);
PGConnection replConnection = con.unwrap(PGConnection.class);

replConnection.getReplicationAPI()
.createReplicationSlot()
.logical()
.withSlotName("demo_logical_slot")
.withOutputPlugin("test_decoding")
.make();

//some changes after create replication slot to demonstrate receive it
sqlConnection.setAutoCommit(true);
Statement st = sqlConnection.createStatement();
st.execute("insert into test_logic_table(name) values('first tx changes')");
st.close();

st = sqlConnection.createStatement();
st.execute("update test_logic_table set name = 'second tx change' where pk = 1");
st.close();

st = sqlConnection.createStatement();
st.execute("delete from test_logic_table where pk = 1");
st.close();

PGReplicationStream stream =
replConnection.getReplicationAPI()
.replicationStream()
.logical()
.withSlotName("demo_logical_slot")
.withSlotOption("include-xids", false)
.withSlotOption("skip-empty-xacts", true)
.withStatusInterval(20, TimeUnit.SECONDS)
.start();

while (true) {
//non blocking receive message
ByteBuffer msg = stream.readPending();

if (msg == null) {
TimeUnit.MILLISECONDS.sleep(10L);
continue;
}

int offset = msg.arrayOffset();
byte[] source = msg.array();
int length = source.length - offset;
System.out.println(new String(source, offset, length));

//feedback
stream.setAppliedLSN(stream.getLastReceiveLSN());
stream.setFlushedLSN(stream.getLastReceiveLSN());
}

当然也可以直接抄迪士尼开源的库pg2k4j,开源的东西怎么能说抄呢.

NOTE

有个问题需要注意一下,开启逻辑复制,可能造成wal日志积压,导致服务器磁盘占用量大,这个问题虽然还没遇到,先占个坑.

Comments

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×