DTM

来自 PostgreSQL 维基
跳转到导航跳转到搜索

DTM 资源

可扩展的事务管理器 API

我们将开发一个通用的、可用于 PostgreSQL 的分布式事务管理器,它可以用于许多不同的场景。例如,有几种工具支持 PostgreSQL 分片(数据的水平分区):pgshard、pgpool 等。它们可以将数据分散到多个节点并执行分布式查询。但它们无法提供全局一致性。例如,如果您将资金从一个银行账户转到另一个账户,而账户分散在不同的节点上,那么计算总余额的分布式查询可能会显示不同的值。Postgres-XC/XL 中没有这样的问题,它使用 GTM(全局事务监控器)来提供全局快照。但是 Postgres-XC 是 Postgres 的克隆,它在 Postgres 代码中包含大量更改(超过 30 万行)。在当前状态下,没有机会将 Postgres-XC/XL 合并到主 Postgres 开发分支中。

与 Postgres-XC/XL 相反,我们的分布式事务管理器 (DTM) 旨在主要作为 PostgreSQL 扩展实现。我们将使用标准的 Postgres 扩展实现大部分功能,并使用 Postgres 回调机制来更改默认的 Postgres 行为。

我们必须回答的第一个问题是我们是否需要支持本地事务。本地事务仅在一个节点上执行,不与 DTM 通信。本地事务执行速度应与普通 Postgres 事务相同。Postgres XC/XL 不支持本地事务:所有事务都需要与 GTM 交互。但众所周知,分布式系统的性能只有在大多数事务都是本地事务的情况下才能令人满意。在设计良好的系统中,这通常是情况:应该一起访问的实体应该位于同一个节点上。

我们考虑三种不同的方法

所有方法都基于可扩展事务管理器 API 实现

typedef struct
{
	/* Get current transaction status (encapsulation of TransactionIdGetStatus in clog.c) */
	XidStatus (*GetTransactionStatus)(TransactionId xid, XLogRecPtr *lsn);

	/* Set current transaction status (encapsulation of TransactionIdGetStatus in clog.c) */
	void (*SetTransactionStatus)(TransactionId xid, int nsubxids, TransactionId *subxids, XidStatus status, XLogRecPtr lsn);

	/* Get current transaction snaphot (encapsulation of GetSnapshotData in procarray.c) */
	Snapshot (*GetSnapshot)(Snapshot snapshot);

	/* Assign new Xid to transaction (encapsulation of GetNewTransactionId in varsup.c) */
	TransactionId (*GetNewTransactionId)(bool isSubXact);

	/* Get oldest transaction Xid that was running when any current transaction was started (encapsulation of GetOldestXmin in procarray.c) */
	TransactionId (*GetOldestXmin)(Relation rel, bool ignoreVacuum);

	/* Check if current transaction is not yet completed (encapsulation of TransactionIdIsInProgress in procarray.c) */
	bool (*IsInProgress)(TransactionId xid);

	/* Get global transaction XID: returns XID of current transaction if it is global, InvalidTransactionId otherwise */
	TransactionId (*GetGlobalTransactionId)(void);

	/* Is the given XID still-in-progress according to the snapshot (encapsulation of XidInMVCCSnapshot in tqual.c) */
	bool (*IsInSnapshot)(TransactionId xid, Snapshot snapshot);

        /* Detect distributed deadlock */
        bool (*DetectGlobalDeadLock)(PGPROC* proc);
} TransactionManager;

此 API 没有引入任何新的抽象,它只是封装了一些(并非全部)与事务管理相关的 PostgreSQL 函数。我们无法证明这是必要且足够的函数集。但我们已经在此 API 之上实现了几个不同的分布式事务管理器,因此它已被证明足够灵活。

为什么我们选择这些函数?PostgreSQL 事务管理器有很多不同的函数,其中一些函数执行几乎相同的事情,但方式不同。例如,考虑 TransactionIdIsInProgress、TransactionIdIsKnownCompleted、TransactionIdDidCommit、TransactionIdDidAbort、TransactionIdGetStatus。其中一些访问 clog,一些访问 procarray,一些只检查缓存值。因此,它们分散在不同的 Postgres 模块中。

我们已经调查了所有这些函数的代码和用法。我们发现 TransactionIdDidCommit 总是由可见性检查在 TransactionIdIsInProgress 之后调用。它反过来使用 TransactionIdGetStatus 从 clog 中提取有关事务的信息。因此,我们在 XTM 中包含了 TransactionIdIsInProgress 和 TransactionIdGetStatus,但没有包含 TransactionIdDidCommit、TransactionIdDidAbort 和 TransactionIdIsKnownCompleted。

提交事务的故事类似。同样有一组函数:CommitTransactionCommand、CommitTransaction、CommitSubTransaction、RecordTransactionCommit、TransactionIdSetTreeStatus。CommitTransactionCommand - 是来自公共 API 的函数。它启动 Postgres TM 有限状态自动机的状态切换。我们不想影响此自动机的逻辑:对于 DTM 和本地 TM,它都是相同的。所以我们深入研究。CommitTransaction/CommitSubTransaction 由此 FSM 调用。我们也不想改变子事务处理的逻辑。再深入一步。因此,我们到达了 TransactionIdSetTreeStatus。这就是它包含在 XTM 中的原因。

并非总是清楚哪些功能应该由 TM API 涵盖。例如,检查元组的可见性是事务管理器的职责还是不是?或者死锁检测呢?我们将其包含在 XTM 中的主要论点是它们是实现分布式事务管理器所必需的,并且与事务管理器密切相关。将它们提取到单独的模块中会很困难,而且没有那么大的意义。例如,为了能够检测到分布式死锁,我们需要将本地事务 ID 与全局事务 ID 进行映射。这种映射在 TM 中可用。

XTM API 仅包含一个负责元组可见性的函数。在 utils/time/tqual.c 中有一系列 HeapTupleSatisfies* 函数,但它们基于其他几个函数,例如 TransactionIdIsInProgress、TransactionIdIsInProgress、XidInMVCCSnapshot... 由于我们不想更改堆元组格式,因此我们将元组状态位的操作保留原样,只重新定义 XidInMVCCSnapshot() 函数。

自定义 TM 不一定要使用 PostgreSQL 快照。例如,我们的 pg_tsdtm 基于 CSN(提交序列号)。但是,如果 TM 想要保留/扩展 PostgreSQL 快照方法,那么我们需要 XTM 中的方法来获取快照。PostgreSQL 使用不同类型的快照,并且有一组函数用于获取快照:GetCatalogSnapshot、GetTransactionSnapshot、GetNonHistoricCatalogSnapshot、GetLatestSnapshot。以及维护快照栈的函数。但是,有一个低级函数 GetSnapshotData(定义在 procarray.c 中,而不是 snapmgr.c 中),它实际上构建了快照。我们精确地覆盖了这个函数,因为所有更高层次的快照处理逻辑似乎对所有事务管理器都是通用的。

事务管理器可以分配自己的事务 XID,也可以重新使用由 PostgreSQL 核心分配的 XID。pg_dtm 使用仲裁器分配全局 XID。而 pg_tsdtm 使用本地 XID,但提供将其映射到 CSN 的功能。这就是我们将 varsup.c 中的 GetNewTransactionId 函数封装起来的原因。重新定义此函数需要大量代码的剪切粘贴。但很难将此函数分解为一些更低级别的调用。

以下是 PostgreSQL 调用图的子图。XTM 封装的函数用斜体标记

事务提交:

  • CommitTransactionCommand
    • CommitTransaction
      • RecordTransactionCommit
        • TransactionIdCommitTree
          • TransactionIdSetTreeStatus


可见性检查:

  • HeapTupleSatisfies*
    • TransactionIdIsCurrentTransactionId
    • TransactionIdIsInProgress
    • XidInMVCCSnapshot
    • TransactionIdDidCommit
      • TransactionLogFetch
        • TransactionIdGetStatus

获取快照

  • Get*Snapshot
    • GetSnapshotData

DTM 方法

首先,让我们解释 DTM 方法。它需要一个集中式服务(我们称之为仲裁器),负责

  • 分配事务 ID
  • 为参与事务的所有节点提供一致的快照
  • 决定事务提交或回滚
  • 维护分布式事务的状态(恢复需要)

DTM 的提议架构如下

      .------- Backend ----.
     /                     \
    /                       \
Coordinator -- Backend -- Arbiter
    \                       /
     \                     /
      `------ Backend ----´
libpq+xtm procs  libdtm+libsockhub    

有一个抽象协调器。协调器的职责是组织查询的分布式执行。它使用某种标准协议(例如 pqlib)连接到 Postgres 后端。它可以使用任何编程语言实现。它可以是某个特殊服务器进程(如 Posrgres-XC 中)或只是智能客户端或代理(如 pg_shard)... 我们希望为协调器提供最大程度的灵活性。现在,我们已经提供了 DTM 与 pg_shard 和 postgres_fdw 的集成

协调器使用标准 SQL 语句与数据节点通信。此外,我们的扩展还提供了一些特殊的 SQL 函数,用于标记全局事务的开始

CREATE FUNCTION dtm_begin_transaction() RETURNS integer
AS 'MODULE_PATHNAME','dtm_begin_transaction'
LANGUAGE C;

CREATE FUNCTION dtm_join_transaction(xid integer) RETURNS void
AS 'MODULE_PATHNAME','dtm_join_transaction'
LANGUAGE C;

在第一个节点上,协调器应该调用 dtm_begin_transaction(),它返回分配的事务 ID (XID)。分配的 XID 应该由协调器使用 dtm_join_transaction 函数传递给参与分布式事务的其他节点。请注意,dtm_begin/join_transaction() 也是在某个事务中执行的,该事务已经分配了 XID。因此,dtm_begin_transaction() 为下一个事务设置 XID。因此,它们应该按这种方式使用

主节点

xid = select dtm_begin_transaction() ;
begin transaction;
...
commit transaction;

其他节点

select dtm_join_transaction(xid) ;
begin transaction;
...
commit transaction;


以下是协调器代码的 GO 语言示例

xid := execQuery(con1, "select dtm_begin_transaction()")
exec(con2, "select dtm_join_transaction($1)", xid)
exec(con1, "begin transaction")
exec(con2, "begin transaction")
exec(con1, "update t set v = v + $1 where u=$2", amount, account1)
exec(con2, "update t set v = v - $1 where u=$2", amount, account2)
var wg sync.WaitGroup
wg.Add(2)
asyncExec(con1, “commit”, &wg)
asyncExec(cnn2, “commit”, &wg)
wg.Wait()
<pre>

And this is example of GO code working with pg_shard+DTM:

<pre>
exec(con, "begin transaction")
exec(con, "update t set v = v + $1 where u=$2", amount, account1)
exec(con, "update t set v = v - $1 where u=$2", amount, account2)
exec(con, “commit”)

最后是与 posstgres_fdw+DTM 一起工作的 GO 代码示例

exec(con, "select dtm_begin_transaction()")
exec(con, "begin transaction")
exec(con, "update t set v = v + $1 where u=$2", amount, account1)
exec(con, "update t set v = v - $1 where u=$2", amount, account2)
exec(con, “commit”)

评估 DTM 方法

如上的测试是在亚马逊集群实例上进行的。每个节点数量有三组测试。在此测试中,我们在每个步骤中更改了每个主机上基准程序的连接数量,以在每个主机上保持相同的后端数量。绿色线表示在相同工作负载下没有 DTM 的单节点性能。

Dtm-c3-2xlarge.png

tsDTM 方法

这种方法基于使用本地时间戳(假设所有节点上的系统时间或多或少同步):http://research.microsoft.com/en-us/people/samehe/clocksi.srds2013.pdf

这种方法不需要集中式时间戳权威机构,集中式时间戳权威机构可能成为瓶颈和单点故障。时钟同步的精度不会影响此算法的正确性,只会影响性能。这就是为什么希望使用类似 NTP 的东西来同步节点上的时间的原因。

这种方法应该提供最佳的可扩展性。但是恢复更具挑战性,因为它需要节点之间进行一些交互,确定法定人数并达成共识。

tsDTM 的架构比 DTM 更简单

      .------ Backend
     /             
    /               
Coordinator -- Backend
    \               
     \             
      `------ Backend 

但 tsDTM 的 SQL API 更复杂


-- Extend global transaction to global. This function is called only once when coordinator decides to access
-- data at some other node.
-- in: gtid (global transaction ID chosen by coordinator)
-- out: snapshot
CREATE FUNCTION dtm_extend(gtid cstring default null) RETURNS bigint
AS 'MODULE_PATHNAME','dtm_extend'
LANGUAGE C;

-- This function should be called by coordinator for all nodes participated in global transaction except first node
-- (at which dtm_extend() is called.
-- in: snapshot (snapshot returned by dtm_extend
-- in: gtid (global transaction ID chosen by coordinator)
-- out: adjusted snapshot (to be passed to other nodes)
CREATE FUNCTION dtm_access(snapshot bigint, gtid cstring default null) RETURNS bigint
AS 'MODULE_PATHNAME','dtm_access'
LANGUAGE C;

-- Start two phase commit
-- in: gtid (global transaction ID chosen by coordinator)
CREATE FUNCTION dtm_begin_prepare(gtid cstring) RETURNS void
AS 'MODULE_PATHNAME','dtm_begin_prepare'
LANGUAGE C;

-- Prepare transaction commit 
-- in: gtid (global transaction ID chosen by coordinator)
-- in: CSN (pass 0 for primary node)
-- out: CSN: propagated CSN
CREATE FUNCTION dtm_prepare(gtid cstring, csn bigint) RETURNS bigint
AS 'MODULE_PATHNAME','dtm_prepare'
LANGUAGE C;

-- Complete two phase commit
-- in: gtid (global transaction ID chosen by coordinator)
-- in: CSN (returned by dtm_prepare)
CREATE FUNCTION dtm_end_prepare(gtid cstring, csn bigint) RETURNS void
AS 'MODULE_PATHNAME','dtm_end_prepare'
LANGUAGE C;

在第一个节点上,协调器应该调用 dtm_extend(),它返回分配的快照。此快照应该使用 dtm_access 函数传播到所有其他节点。最后三个函数用于实现两阶段提交(目前在 PostgreSQL 两阶段提交之上实现)。

此 API 的预期用法如下

因此,它们应该按这种方式使用

主节点

begin transaction;
snapshot = select dtm_access(gtid, snapshot) ;

...

prepare transaction 'gtid';
select dtm_begin_prepare(gtid);
csn = select dtm_prepare(gtid,0)
select dtm_end_prepare(gtid,csn);
commit prepared 'gtid';

其他节点

begin transaction;
snapshot = select dtm_extend(snapshot,gtid) ;

...

prepare transaction 'gtid';
select dtm_begin_prepare(gtid);
csn = select dtm_prepare(gtid,csn)
select dtm_end_prepare(gtid,csn);
commit prepared 'gtid';

本地事务以通常的方式在单个节点上执行,而无需协调器执行任何特殊操作。实际上,所有事务都是作为本地事务启动的。如果事务需要涉及其他节点,则它应该调用dtm_extend,它将返回全局快照 (CSN)。此快照应该由 DTM 使用dtm_access传递到所有其他节点。

为了提交全局事务,协调器应该使用 dtm_prepare'(gtid,csn)' 获取新的 CSN。如果此阶段在所有节点上成功完成,则协调器在所有节点上执行COMMIT PREPARED 'GTID' '。如果在任何节点上发生故障,则协调器在所有节点上执行ROLLBACK PREPARED 'GTID'

以下是协调器代码的 GO 语言示例

tsDTM 评估

我们进行了一系列测试来研究基于时间戳的方法的写入可扩展性。以下事务针对多个服务器执行

exec(con1, "begin transaction")
exec(con2, "begin transaction")
snapshot = execQuery(con1, "select dtm_extend($1)", gtid)        
snapshot = execQuery(con2, "select dtm_access($1, $2)", snapshot, gtid)
exec(con1, "update t set v = v + $1 where u=$2", amount, account1)
exec(con2, "update t set v = v - $1 where u=$2", amount, account2)
exec(con1, "prepare transaction '" + gtid + "'")
exec(con2, "prepare transaction '" + gtid + "'")
exec(con1, "select dtm_begin_prepare($1)", gtid)
exec(con2, "select dtm_begin_prepare($1)", gtid)
csn = execQuery(con1, "select dtm_prepare($1, 0)", gtid)
csn = execQuery(con2, "select dtm_prepare($1, $2)", gtid, csn)
exec(con1, "select dtm_end_prepare($1, $2)", gtid, csn)
exec(con2, "select dtm_end_prepare($1, $2)", gtid, csn)
exec(con1, "commit prepared '" + gtid + "'")
exec(con2, "commit prepared '" + gtid + "'")

我们测量了具有不同成员数量的集群中的 tps。表大小 = 100k。每个线程 10k 个请求。

Tps vs threads.png Tps vs nodes.png


在具有更多节点的不同集群中。在此测试中,我们在每个步骤中更改了每个主机上基准程序的连接数量,以在每个主机上保持相同的后端数量。

GExfGnsyXyHzQAAAABJRU5ErkJggg==.png

绿色线表示在相同工作负载下使用 2pc(如 tsdtm)的单个服务器性能,红色线表示使用普通提交而不是 2pc 的性能。

多主

除了将我们的分布式事务管理器与 pg_shard 和 postgres_fdw 等现有系统集成之外,我们还基于 pg_dtm 为 PostgreSQL 实现了多主。

PostgreSQL 复制支持的一些背景:PostgreSQL 支持单向主从复制。此外,它支持 *热备用* 模式,在这种模式下,可以在副本上执行只读查询。复制可以是异步的也可以是同步的,但即使在同步复制的情况下,主服务器和副本之间也存在时间间隔,因此在副本上执行只读查询的客户端可能无法看到其在主服务器上先前事务中执行的更改。此外,目前只支持一个同步副本,因此实际上无法使用同步复制进行负载均衡。因此,PostgreSQL 中当前的流复制只提供容错 (HA),而不是性能扩展。

2ndQuadrant 提供 Postgres-BDR®,这是一款用于 PostgreSQL 的高级集群和扩展工具。BDR 包含许多功能,包括多主复制,允许对 DDL 和 DML 进行高效复制,并具有自动冲突解决功能。它还提供无冲突复制数据类型 (CRDT) 和 AlwaysOn 可用性。

我们的多主基于 2ndQuadrant 提供的 pglogical_output 插件。我们已经为这个插件实现了接收器部分,它也部分基于 BDR 代码。在接收器端,我们有一个后台工作者池,这些工作者并发地应用从远程 walsender 收到的更改。

全局一致性由 pg_dtm(集中式仲裁器)强制执行。从客户端的角度来看,它只使用一组相同的 PostgreSQL 实例。它可以登录并向其中任何一个发送查询。这并不意味着它是只读查询还是更新查询。但是当然,由于更新必须应用于所有节点,因此多主只能为只读查询提供扩展。

下图显示了安装在三个节点集群上的多主性能结果。我们运行我们的 dtmbench 基准测试,改变更新的百分比。我们将多主的性能结果与独立 PostgreSQL 的性能结果进行比较。为分布式事务提供 ACID 属性会增加必要的开销:多主在更新方面比单节点慢约 4 倍。在异步复制的情况下,可以获得更好的结果,但没有全局一致性。在大部分是只读的工作负载的情况下,多主提供了更好的性能,但要超过单节点,我们需要在集群中至少有 3 个节点。


Rw ratio.png

Reads.png

Updates.png

纵轴:TPS,千次