并行内部排序

摘自 PostgreSQL Wiki
跳转到导航跳转到搜索

概述

注意:此页面描述了并行内部排序的早期设计。它与最近的 并行外部排序补丁系列(并行 CREATE INDEX) 无关。

如果我们能够并行执行大排序,我们可以让查询,更重要的是让索引构建运行得更快。此外,这将是支持普通 并行查询执行 的有用的基础设施。我们想象用户后端将受到一个或多个“工作程序后端”支持,这些后端类似于普通后端,但没有客户端连接。从广义上讲,我们希望它们看起来很像自动真空工作进程,但有一些不同:每个后端都将与一个用户后端相关联,并且数据将在用户后端及其工作程序之间,以及工作程序之间传递,有时是大容量数据。

进程管理和 IPC

动态共享内存

共享内存比任何其他 IPC 机制快得多,因为它允许在进程之间传递数据,而无需内核参与。但是,主要共享内存段不适合并行执行,因为其大小在启动时固定并且无法更改。并行排序可能需要多个千兆字节的工作空间(取决于 work_mem 的配置值),这可能占到系统可用内存的很大一部分。因此,我们无法采用一种方法,即在主要共享内存段中预先分配空间,以防我们想要执行并行排序。相反,我们需要能够动态创建新的共享内存段。截至 2013 年 10 月(提交 0ac5e5a7e152504c71ce2168acc9cef7fde7893c),我们现在具有此功能。API 如下所示:

extern dsm_segment *dsm_create(Size size);
extern dsm_segment *dsm_attach(dsm_handle h);
extern void dsm_detach(dsm_segment *seg);
extern void *dsm_segment_address(dsm_segment *seg);
extern Size dsm_segment_map_length(dsm_segment *seg);
extern dsm_handle dsm_segment_handle(dsm_segment *seg);

在撰写本文时(2013 年 10 月 31 日),动态共享内存系统存在一些问题。对于我们用于主要共享内存段的 on_shmem_exit 机制,它没有可与之媲美的机制,也没有任何简单的方法可以像我们使用 ShmemInitHash、ShmemInitStruct 等为主要共享内存段设置共享内存中的数据结构那样。有 建议的补丁 来修复这些问题。

辅助后端

PostgreSQL 9.3 引入了在后端启动时配置后台工作进程的功能(提交 da07a1e856511dca59cbb1357616e26baa64428e)。然而,这不适用于并行排序,我们需要能够在需要并行排序时启动后台工作进程,然后在完成之后将其关闭。在 2013 年 7 月添加了此功能(提交 7f7485a0cde92aa4ba235a1ffe4dda0ca0b6cc9a)。2013 年 8 月和 2013 年 10 月的后续提交引入了确定动态注册的后台工作进程是否已启动、是否仍在运行以及如果是,PID 是什么(提交 090d0f2050647958865cb495dff74af7257d2bb4)的功能,以及可靠地终止先前注册的后台工作进程(提交 523beaa11bdf6a9864e8978b467ed586b792c9ca)的功能。相关的 API 如下所示:

extern bool RegisterDynamicBackgroundWorker(BackgroundWorker *worker, BackgroundWorkerHandle **handle);
extern BgwHandleStatus GetBackgroundWorkerPid(BackgroundWorkerHandle *handle, pid_t *pidp);
extern BgwHandleStatus WaitForBackgroundWorkerStartup(BackgroundWorkerHandle *handle, pid_t *pid);
extern void TerminateBackgroundWorker(BackgroundWorkerHandle *handle);

共享内存消息队列

如果数据是固定大小、在分配动态共享内存段时已知该大小,并且在销毁动态共享内存段之前不需要回收所用存储,则通过动态共享内存段将数据从一个进程移动到另一个进程 достаточно简单。但通常情况下,并非总是如此。有一些 已建议的修补程序 引入了具有以下 API 的共享内存消息队列系统

extern shm_mq *shm_mq_create(void *address, Size size);
extern void shm_mq_set_receiver(shm_mq *mq, PGPROC *);
extern void shm_mq_set_sender(shm_mq *mq, PGPROC *);
extern shm_mq_handle *shm_mq_attach(shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle);
extern void shm_mq_detach(shm_mq *);
extern shm_mq_result shm_mq_send(shm_mq_handle *mqh, uint64 nbytes, void *data, bool nowait);
extern shm_mq_result shm_mq_receive(shm_mq_handle *mqh, uint64 *nbytesp, void **datap, bool nowait);

此系统在共享内存中创建一个环形缓冲区,可用于在后端之间传播消息。消息只是任意长度的字节字符串。这旨在提供更高级别的抽象,用户后端和工作进程可以使用该抽象进行相互通信。

并行模式

从长远来看,并行查询将需要读取数据库表数据的功能。例如,具有筛选器的并行顺序扫描在没有该功能时几乎无法很好地执行。在并行排序的上下文中,需要受到更多限制,这种情况会在工作进程后端遇到 toast 指针和 catcache 丢失时出现。例如,如果没有读取 pg_enum,则无法对 enum 类型进行排序。即使执行任何种类的 btree 比较器,也需要大量目录访问。在这种情况下,可以让工作进程执行其自己的表读取操作会很方便。主要的替代方法是通过用户后端引导此类读取操作。这将难以实现,并且会限制并行性所能实现的可能收益。当我们稍后授予工作进程直接读取表的权限时,这种机制将过时。我们最好从完整的表读取功能开始。

这些问题是普遍存在的;因此,解决这些问题的最佳方法似乎是创建一个“并行模式”的一般概念,一个用于启动工作进程和使用合适的的状态初始化工作进程的一般工具包,以便工作进程能够有意义地辅助并行计算。

状态同步

当每个新的后台工作进程启动时,我们要对其同步多个状态。我们还将在工作进程或用户后端中禁止对此状态进行非瞬态更改,以便在并行阶段,保证所有后端都对世界保持相同的看法。即使不允许更持久的更改,也允许针对特定操作而本地的对这些状态的瞬态更改。我们目前计划将以下状态从用户后端复制到每个工作进程,并相应地禁止在并行模式结束之前对这些值进行进一步更改

  • 用户 ID 和数据库。
  • GUC。即使在并行模式中,也会允许以下临时更改(在进入 proconfig 已设置的函数时会还原):但必须禁止打算持续存在的更改(例如类似于调用 set_config 的操作)。
  • 事务状态。在并行方式中不允许启动或结束子事务;例如,您无法执行类似 SAVEPOINT 或 ROLLBACK TO SAVEPOINT 的操作。但是,为实现 PL/pgsql EXCEPTION 块或类似块的目的而启动和结束子事务可能允许。在事务结束时,工作进程不得尝试写入提交或中止记录;这应由用户后端完成。(可能将工作进程视为在父 (子)xact 的无 XID 只读子事务中运行,并且会中止该子事务,然后丢弃其事务堆栈的其余部分,这样会很有帮助。)
  • CurrentSnapshot,ActiveSnapshot。必须在用户后端建立任何事务快照,然后才能开始并行处理。禁止 CommandCounterIncrement(),因为它会修改 CurrentSnapshot。PushActiveSnapshot()/PopActiveSnapshot() 方式所做的临时更改是允许的。
  • 组合 CID 哈希。所有数据库写入在并行方式中都应禁止,因为创建新组合 CID 的任何内容都会有问题。我们最终可能会寻找方法来放宽此限制,但这并不是并行排序的严重问题。
  • 已准备好的语句。或者,我们可以禁止在并行方式中执行 EXECUTE。无论如何都应禁止 PREPARE/DEALLOCATE。允许内部使用缓存计划,例如 PL/pgSQL 使用的计划。

在后台工作进程中禁止的操作

这些操作即使在并行工作进程存在时也可能在用户后端允许,但工作进程本身不得被允许执行这些操作。

  • 序列。序列值缓存在后端本地内存中,因此在多个协作后端中访问同一序列将具有与在同一后端中多次访问所产生的不同的面向用户语义。那可能允许,因为用户应该不依赖于分配的特定值。更大的问题是:接触序列的并行工作进程不会更新主机 currval() 或 lastval() 的概念;与其尝试解决此问题,不如仅仅禁止在并行方式中访问所有序列。
  • 使无效的消息。工作进程最好不执行可以生成本地或共享使无效消息的任何操作。本地使无效消息看起来特别有疑问,因为没有明显方法可以确保所有后端同时处理它们。由于组合 CID 问题,我们必须已经禁止写入,因此不清楚我们是否需要禁止此处的任何其他具体内容;但即使我们以某种方式解决了组合 CID 问题,我们对系统目录的写入仍然会有麻烦,至少。
  • 游标。这些还具有后端本地状态。即使我们将状态复制到所有工作进程中,它们每个都会分别更新游标位置,因此语义不会与单后端中发生的情况相匹配。最好坚持,后台工作进程不能接触游标。我们最终可能会为瞬态游标(例如用于实现 PL/pgsql FOR 循环的游标)提供例外。
  • 大对象。大对象具有关联的光标,该对象上的操作可能会移动这些光标。由于目前我们没有办法在多个后端之间同步这一点,因此现在我们只禁止辅助后端触及大对象。
  • LISTEN/NOTIFY。这需要专用代码才能将更改传播回用户后端。从并行模式调用 LISTEN 很可能不会受到关注,只要禁止写入,NOTIFY 也不会受到关注。

消息传播

在执行并行操作的过程中,在辅助程序中运行的代码可能会通过调用 ereport() 或 elog() 生成消息。除了 elevel >= PANIC 之外,这些消息应传播回用户后端,然后再传播到用户。请注意,保留错误的详细信息很重要 - 例如,如果辅助程序出现 ERROR: 除数为零 的错误,用户后端也应以类似的方式出现错误。警告和较小程度的消息必须与错误一样进行传播。

要解决这个问题,我们可以为每个辅助程序建立一个共享内存消息队列,其中辅助程序为发送方,用户后端为接收方。每当消息通常会被发送到客户端时,辅助程序将改为对其进行编组并将其发送到此队列。用户后端需要定期清空这些队列中的消息,以免辅助程序被队列占满从而导致阻塞。

重量级锁管理

如果不更改锁管理器,如果辅助程序后端尝试获取与用户后端已持有锁不兼容的锁,则会发生未检测到的死锁。我们可以通过禁止用户后端持有任何强锁并允许辅助程序后端仅获取弱锁来修复这个问题,但这感觉像是一个人为限制。最好修改重量级锁管理器,以便用户后端及其辅助程序形成一个锁定组,其锁定互不冲突。

成功完成后进行资源传输

当并行操作在没有错误的情况下结束时,每个辅助程序都将对几部分本地状态进行序列化,以供用户后端将其传输到自己的本地状态中。这种传输类似于在 CommitSubTransaction() 中发生的将资源传输到父事务(子事务)中。这可能可以通过在上述有关消息传播讨论中提出的相同消息队列来实现。受影响的资源如下

  • 重量级锁
  • pgstat 计数器更新
  • LocalPredicateLockHash

临时缓冲区

如果有任何可能性导致后台辅助程序尝试访问属于用户后端的临时关系,则用户后端需要在进入并行模式之前将其全部写出到操作系统中;随后,用户后端和辅助程序都需要避免修改它们。(尽管设置提示位可能可以。)

或者,我们(a)禁止在并发模式下访问临时关系或(b)采用某种选项使用对正常缓冲区的并发模式锁定和固定的内容,在动态共享内存中存储临时缓冲区。

决定是否并行化

决定是否并行化时需要考虑两个因素:是否安全、是否有助于性能。

并行安全吗?

如果并行工作者最终要执行在并发模式中禁止的任何操作(例如,写入数据库或操作光标),那么我们就不想尝试,也不想失败。我们想要决定从一开始就不要调用并行化。还有一些操作可能不安全,我们无法切实地检测到它们;例如,一个带有用户指定种子的伪随机数生成器是不安全的,因为用户期望一组特定的随机数,这些数依赖于后端本地状态,并且后端之间无法共享该状态。

除非我们能够解决停机问题,否则不可能通过机械地检查函数确定该函数是否安全,因此,我们需要将函数标记为并行安全或非并行安全。如果这样的标记足够一般,可以应用到我们未来希望并行化的其他内容(不仅仅是并行排序),那就太好了。尚不清楚简单的标记形式(例如,proisparallelsafe)是否是最好的前进路线,或者我们是否想使其比这更通用。在初始实现中某些非并行安全的内容,可能在将来的版本中随着功能的添加而变得安全,因此,与其简单地标记内容安全或不安全,不如尝试根据它们可能执行的操作对它们进行分类,然后将其与当前支持的并发模式操作集进行比较。这样的标记可能找到其他应用(例如,查询优化)。

并行化会使它更快吗?

在并行环境的设置中存在一些开销,因此,如果能在并发模式中完成的工作量很小,则可能没有必要这样操作。但是,估计值只会与优化器本身一样准确。例如,在排序的情况下,int4 比较比文本比较便宜约 1000 倍,但目前系统不知道这一点。我们需要插入更明智的 procost 值,以便在这个领域做出正确的决策。

对于并行排序的第一个版本,我们也许可以简单地使用声明语法;例如,仅针对 CREATE INDEX 启用此功能,并让用户指定所需的并行化级别。忘记规划并永远不在查询执行期间使用并行化。

即使某项操作看上去很适合于并行化,但可能没有足够的未用后台工作线程槽位来并行化它。更微妙的是,如果收益很小,那么占用一个或多个后台工作线程槽位以实现该收益对于整个系统性能来说可能不好;也许我们最好等待更好的候选者到来。

并行内部排序

有几种可能的算法可用于并行内部排序。这些算法包括并行归并排序、并行快速排序和样本排序。并行归并排序很稳定,但我们不保证排序的稳定性。与其他算法不同,并行快速排序不需要对数组的完整副本留出空间,这对于 PostgreSQL 来说似乎是一个有用的优势,因为我们往往非常关心排序内存使用量。我们计划首先实现此算法。

快速排序很容易实现并行化,因为数组的每个分区都会产生两个子分区,每个子分区都可以由一个单独的处理器排序。但是,在完成最初的几个分区步骤之前,采取这种方式几乎无法带来任何 CPU。为了解决这个问题,并行快速排序的设计可以用多个 CPU 来执行单个分区操作。在并行分区步骤期间,我们设想数组被分成固定大小的块。每个工作线程分别从数组的前端和后端声明一个块,然后对块对执行快速排序分区循环。当特定工作线程用尽其前端块或后端块时,该块被视为已中和,并且工作线程会声明一个新块以替换该块。当没有块可声明时,单个进程通过清理所有未被任何工作线程中和掉的后分区块来完成分区操作;这些块的数量不会多于所用工作线程的数量。并行分区效率略低于串行分区,因此仅在算法的前几个步骤中使用它,直到数组被充分分解到足以让每个 CPU 专门处理数组的不同部分。

有些工作线程可能比其他工作线程进展得更快,既可能是因为它们收到的数组部分较小,也可能是因为它们特定值的比较操作比其他工作线程执行得更快(例如,有些值可能需要去软包,而其他值则很短,不需要;有些键可能在字符串开头不同,而另一些则直到后面才不同;等等)。因此,即使每个工作线程都被专用于数组的一个独立块之后,也应允许早期完成的工作线程“窃取”其他工作线程的工作。在每个递归步骤中,工作线程将一个分区分成两个子分区;它会递归到一个分区中,并将另一个分区边界存储在共享内存中,在此空闲工作线程可以窃取它。

共享内存分配上下文

如果我们计划执行并行内部排序,然后发现数据无法纳入 work_mem,我们需要切换至磁带排序。理想情况下,我们不希望这涉及到重新执行过多的工作。例如,如果动态共享内存中元组的布局与非并行排序执行操作的布局非常不同,我们可能需要重新排列所有数据;这非常麻烦。为解决此问题,也许我们可以创建一个从共享内存进行分配的新类型内存上下文。当共享内存用尽时,它将切换到后端私有内存(可能指向另一个上下文,或可能使用定制代码)。我们可以检查上下文是否溢出,以了解是否应尝试调用并行内部排序代码。

并行快速排序记账

以下针对我们可能使用并行快速排序的共享内存数据结构的备草稿;最初由 Noah Misch 编制,由 Robert Haas 修改。

/*
 * Size of a parallel work unit block.  Smaller values add contention, but
 * they reduce the amount of serial work to be done by the coordinator when
 * finalizing a parallel partition task.
 */
#define PAR_BLK_SIZE 2048

/*
 * When a worker would otherwise add to a pending list a range of tuples less
 * numerous than this, it shall instead fully sort the range itself.  A higher
 * value curbs bookkeeping overhead, but it causes earlier starvation.  The
 * ideal value is actually dependent on comparator cost.
 */
#define PAR_LOCAL_THRESHOLD 8192

/*
 * Index into a memtuples array.  Currently "int" for compatibility with
 * historic tuplesort design.  At some point, this should become intptr_t or
 * simply int64 to support internal sorts of >INT_MAX tuples.
 */
typedef int memtupsz_t;

/*
 * A parallel work unit block ordinal.  It's almost certainly the same type as
 * memtupsz_t; this is just documentation.
 */
typedef int parblk_t;

struct SortWorker
{
    /* Protects all subsequent fields. */
    slock_t     mutex;

    /* Process attached to this slot. */
    PGPROC     *proc;

    /* True if this sort process aborted unexpectedly. */
    bool        proc_died;

    /*----------
     * Work assignment indicator, either an index into AllSortWorkers or -1.
     * A worker plays one of three roles at any given time: coordinator,
     * helper, or individual:
     *      coordinator:    assignment == my_offset
     *      helper:         assignment != my_offset, assignment != -1
     *      individual:     assignment == -1
     *----------
     */
    int         assignment;

    /*
     * Coordinators and individuals: assigned tuple range.  May be -1 for an
     * individual to signal that the worker should search pending lists for
     * further work.
     */
    memtupsz_t  first;                  /* first tuple of in assigned range */
    memtupsz_t  count;                  /* number of tuples in assigned range */

    /*
     * Coordinators: block reservation bookkeeping.  Block b covers the range
     * [first, first + Min(count, b * PAR_BLK_SIZE)).  Thus the last block is
     * typically smaller than PAR_BLK_SIZE.  When a worker seeking to reserve
     * a new block finds next_left > next_right, its contribution to the
     * parallel phase of this partitioning task is finished.
     */
    parblk_t    next_left;              /* lowest unreserved left block */
    parblk_t    next_right;             /* highest unreserved right block */

    /* Individuals: first undefined position in the serial_pending stack. */
    int         pending_stack_pos;

    /*
     * Array with one entry per process participating in the sort.  Usage will
     * often be sparse, but we still consume only 78 KiB for 100 workers.
     */
    union
    {
        /*
         * Coordinators: left and right blocks reserved by each participating
         * worker (coordinator itself or a helper).  A participant's
         * AllSortWorkers offset is also its offset in the coordinator's "u"
         * array.  Left and right reservations both start at -1, indicating
         * that no block is reserved.  A busy worker has two blocks reserved.
         * A finished worker most typically has one block reserved, though it
         * will have zero blocks reserved when it neutralized its final pair
         * of blocks simultaneously.
         */
        struct
        {
            parblk_t    left;
            parblk_t    right;
        } parallel_reserved;

        /*
         * Individuals: stack of pending tuple ranges.  If the worker would
         * push onto its stack and finds the stack to be full, the worker will
         * continue stack depth accrual in local memory.  If we manage to fill
         * this much stack, it's unlikely that other individuals will drain it
         * and starve before we manage to add to it once again.
         */
        struct
        {
            memtupsz_t  first;
            memtupsz_t  count;
        } serial_pending;
    } u[FLEXIBLE_ARRAY_MEMBER];
};

/*
 * Array of SortWorker, one per process participating in this sort.  Offset
 * zero is for the foreground process that initiated the sort.  Each
 * background worker learns its offset at startup.
 */
struct SortWorker *AllSortWorker;