在 PostgreSQL10 中故障转移逻辑复制时,主键插入从零开始

在 PostgreSQL10 中故障转移逻辑复制时,主键插入从零开始

我有两个正在运行的 PostgreSQL 服务器,一个发布所有表,另一个通过逻辑复制订阅所有表。我目前正在测试故障转移解决方案,似乎如果我删除辅助服务器上的订阅者并将应用程序重定向到它,插入就会失败,因为每个表的主键都试图从 1 开始。

例如,我看到很多这样的内容:

Oct 25 15:48:10 icinga-master1 icinga2[29819]: Error "ERROR:  duplicate key value violates unique constraint "pk_customvariable_id"
                                                            DETAIL:  Key (customvariable_id)=(1) already exists.
                                                            " when executing query "INSERT INTO icinga_customvariables (config_type, instance_id, is_json, object_id, varname, varvalue) VALUES (E'1', 1, E'0', 2677, E'gfl_bmname', E'tomcat1_filenotfound')"

当尝试手动插入时,我在 postgres 中收到相同的错误。但是,我可以从辅助服务器导入 pg_dump,一切正常。这里我是否遗漏了某个设置?我还尝试过添加 recovery.conf 并使用 pg_ctl 进行升级,但这似乎阻止了订阅服务器的工作并导致相同的问题。也许我没有订阅我应该订阅的系统表?

我的 postgresql.conf 中每个都有以下 wal 设置:

wal_level = logical
hot_standby = on
hot_standby_feedback = on
max_logical_replication_workers = 8
max_sync_workers_per_subscription = 8

在主服务器上:

                       List of publications
Name    |  Owner   | All tables | Inserts | Updates | Deletes 
------------+----------+------------+---------+---------+---------
icinga_pub | postgres | t          | t       | t       | t

在辅助设备上(我在测试时确实将其设置为禁用):

             List of subscriptions
Name    |  Owner   | Enabled | Publication  
------------+----------+---------+--------------
icinga_sub | postgres | t       | {icinga_pub}

我删除了数据库,并在故障排除时重新开始辅助数据库,以保持一切正常。任何帮助都值得感激。

答案1

这是 PostgreSQL 10 中逻辑复制的已知限制。

以下是文档的摘录https://www.postgresql.org/docs/10/logical-replication-restrictions.html

序列数据不会被复制。序列支持的序列或标识列中的数据当然会作为表的一部分进行复制,但序列本身仍会在订阅服务器上显示起始值。如果订阅服务器用作只读数据库,那么这通常不会成为问题。但是,如果打算对订阅服务器数据库进行某种切换或故障转移,则需要将序列更新为最新值,方法是从发布服务器复制当前数据(可能使用 pg_dump)或从表本身确定足够高的值。

换句话说,您所看到的是预期的行为,尽管不一定是想要的行为。

我们有两个函数可以为我们重置序列。它们可能并不完美,但它们在我们的环境中有效。

CREATE OR REPLACE FUNCTION public.update_sequence(
    IN tabschema text,
    IN tabname text,
    OUT tschema text,
    OUT tname text,
    OUT pkname text,
    OUT seqname text,
    OUT startval bigint,
    OUT minval bigint,
    OUT maxval bigint,
    OUT incr bigint,
    OUT maxseq bigint,
    OUT lastval bigint,
    OUT newseq bigint,
    OUT prevcalled boolean)
  RETURNS record AS
$BODY$
DECLARE
  seq_offset CONSTANT bigint := 0;
  seq_range CONSTANT bigint := 9999999999999;
BEGIN
  tschema := tabschema;
  tname := tabname;

  -- protect against concurrent inserts while you update the counter
  EXECUTE format('LOCK TABLE %I.%I IN EXCLUSIVE MODE', tabschema, tabname);

  SELECT column_name, table_name||'_'||column_name||'_seq' FROM information_schema.columns WHERE column_default IS NOT NULL AND data_type = 'bigint' AND column_default ilike 'nextval(%_seq''::regclass)' AND table_schema = tabschema AND table_name = tabname INTO pkname, seqname;
  SELECT start_value, min_value, max_value, increment_by FROM pg_sequences WHERE schemaname = tabschema AND sequencename = seqname INTO startval, minval, maxval, incr;
  EXECUTE format('SELECT last_value, is_called FROM %I.%I', tabschema, seqname) INTO lastval, prevcalled;
  EXECUTE format('SELECT max(%I) FROM %I.%I WHERE %I between $1 AND $2', pkname, tabschema, tabname, pkname) USING seq_offset+1, seq_offset+seq_range INTO maxseq;
  newseq := CASE WHEN maxseq IS NULL THEN seq_offset+incr ELSE coalesce(greatest(maxseq+incr, CASE WHEN prevcalled THEN lastval+incr ELSE lastval END), seq_offset+incr) END;

  EXECUTE format('ALTER SEQUENCE %I.%I MINVALUE %s START %s RESTART %s MAXVALUE %s;', tabschema, seqname, seq_offset+1, seq_offset+1, newseq, seq_offset+seq_range);
END;
$BODY$
  LANGUAGE plpgsql VOLATILE
  COST 100;


CREATE OR REPLACE FUNCTION public.update_all_sequences()
  RETURNS TABLE(tabschema text, tabname text, pkname text, seqname text, startval bigint, minval bigint, maxval bigint, incr bigint, maxseq bigint, lastval bigint, newseq bigint, prevcalled boolean) AS
$BODY$
BEGIN
  RETURN QUERY WITH table_list (tschema, tname) AS (
    SELECT n.nspname, c.relname FROM pg_class c LEFT JOIN pg_namespace n ON n.oid = c.relnamespace WHERE c.relkind = 'r' AND n.nspname NOT IN ('information_schema', 'pg_catalog') ORDER BY n.nspname, c.relname
  )
  SELECT a.* FROM table_list t JOIN update_sequence(tschema, tname) a on t.tschema = a.tschema and t.tname = a.tname;
END;
$BODY$
  LANGUAGE plpgsql VOLATILE
  COST 100;

然后,作为故障转移的一部分,您需要做的就是在将虚拟 IP 切换到新的主服务器之前运行该update_all_sequences函数以确保序列处于正确的点(如果序列上的值> 1,则总是有可能缺少一些序列increment_by)。

如果您想从 1 以外的其他值开始,则可以有 seq_offset 和 seq_range 值,我们这样做是因为我们有一个地理分布的系统,并且来自其他数据中心的数据库从其他值开始。

答案2

我喜欢 Malcolm 的解决方案,因为它是一个纯粹的 postgres 选项。或者,如果您要将其改造成其他东西,以下是我在 Python 中循环遍历我的表的基本函数。删除订阅的那些假设您的主服务器完全死机,因此您的辅助服务器在唤醒时不会开始从中拉取数据。

ALTER SUBSCRIPTION [sub] DISABLE;
ALTER SUBSCRIPTION [sub] SET (slot_name = NONE);
COMMIT;
DROP SUBSCRIPTION [sub];

我使用一种不常见的方法,通过序列获取了所有表的列表。我无法弄清楚如何通过获取序列列表来反向引用。

SELECT table_name,column_name FROM information_schema.columns WHERE column_default like('nextval%');

然后我循环遍历它,首先收集最后一个值:

SELECT MAX([column]) FROM [table];

获取相关序列:

SELECT pg_get_serial_sequence('[table]','[column]');

最后,

SELECT setval('[sequence]','[maxvalue]');
COMMIT;

我的方法虽然不太好,但是却有效。

相关内容