并行内部排序

来自 PostgreSQL wiki
(从 并行排序 重定向)
跳转到导航跳转到搜索

概述

注意: 本页描述了并行内部排序的早期设计。它与最近的 并行外部排序补丁系列(并行 CREATE INDEX) 无关。

如果我们能够并行执行大型排序,我们可以使查询,更重要的是索引构建更快。此外,它将成为支持通用 并行查询执行 的有用基础设施。我们设想用户后端将由一个或多个“工作者后端”支持,这些后端与普通后端类似,但没有客户端连接。总的来说,我们期望它们看起来非常像自动清理工作者进程,但有一些区别:每个工作者后端都将与一个用户后端相关联,数据将在用户后端及其工作者之间,以及可能在工作者之间来回传递,有时数据量很大。

进程管理和IPC

动态共享内存

共享内存比任何其他IPC机制都快,因为它允许数据在进程之间传递而无需内核参与。然而,主共享内存段不适合并行执行,因为它的大小在启动时固定且无法更改。并行排序可能需要许多 GB 的工作空间(取决于配置的 work_mem 值),这可能占系统可用内存的很大一部分。因此,我们不能采用在主共享内存段中预分配空间的方法,以防万一我们可能希望执行并行排序。相反,我们需要能够动态创建新的共享内存段。截至 2013 年 10 月(提交 0ac5e5a7e152504c71ce2168acc9cef7fde7893c),我们现在有了这种能力。API 看起来像这样

extern dsm_segment *dsm_create(Size size);
extern dsm_segment *dsm_attach(dsm_handle h);
extern void dsm_detach(dsm_segment *seg);
extern void *dsm_segment_address(dsm_segment *seg);
extern Size dsm_segment_map_length(dsm_segment *seg);
extern dsm_handle dsm_segment_handle(dsm_segment *seg);

截至撰写本文时(2013 年 10 月 31 日),动态共享内存系统存在一些残留问题。它没有与我们用于主共享内存段的 on_shmem_exit 机制类似的东西,也没有任何简单的方法像我们使用 ShmemInitHash、ShmemInitStruct 等在主共享内存段中布局数据结构一样。有一些 建议的补丁 来解决这些问题。

工作者后端

PostgreSQL 9.3 引入了在 postmaster 启动时配置后台工作者进程的能力(提交 da07a1e856511dca59cbb1357616e26baa64428e)。然而,这对于并行排序来说是不够的,因为我们需要能够在进行并行排序时启动后台工作者,并在之后关闭它们。这项功能是在 2013 年 7 月添加的(提交 7f7485a0cde92aa4ba235a1ffe4dda0ca0b6cc9a)。2013 年 8 月和 10 月的后续提交引入了确定动态注册的后台工作者是否启动、它是否仍在运行以及如果是,它的 PID 是什么的能力(提交 090d0f2050647958865cb495dff74af7257d2bb4)以及可靠地终止先前注册的后台工作者的能力(提交 523beaa11bdf6a9864e8978b467ed586b792c9ca)。相关的 API 看起来像这样

extern bool RegisterDynamicBackgroundWorker(BackgroundWorker *worker, BackgroundWorkerHandle **handle);
extern BgwHandleStatus GetBackgroundWorkerPid(BackgroundWorkerHandle *handle, pid_t *pidp);
extern BgwHandleStatus WaitForBackgroundWorkerStartup(BackgroundWorkerHandle *handle, pid_t *pid);
extern void TerminateBackgroundWorker(BackgroundWorkerHandle *handle);

共享内存消息队列

如果数据大小固定,大小在分配动态共享内存段时已知,并且在销毁动态共享内存段之前不需要回收使用的存储空间,那么通过动态共享内存段将数据从一个进程移动到另一个进程就足够容易了。然而,总的来说,这些事情并不总是正确的。有一些 建议的补丁 引入了共享内存消息队列系统,其 API 如下

extern shm_mq *shm_mq_create(void *address, Size size);
extern void shm_mq_set_receiver(shm_mq *mq, PGPROC *);
extern void shm_mq_set_sender(shm_mq *mq, PGPROC *);
extern shm_mq_handle *shm_mq_attach(shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle);
extern void shm_mq_detach(shm_mq *);
extern shm_mq_result shm_mq_send(shm_mq_handle *mqh, uint64 nbytes, void *data, bool nowait);
extern shm_mq_result shm_mq_receive(shm_mq_handle *mqh, uint64 *nbytesp, void **datap, bool nowait);

该系统在共享内存中创建了一个环形缓冲区,可用于在后端之间传播消息。消息只是任意长度的字节字符串。这旨在提供一个更高层次的抽象,用户后端和工作者可以使用它相互通信。

并行模式

从长远来看,并行查询将需要能够从数据库表中读取数据。例如,带有过滤器的并行顺序扫描如果没有该功能就很难执行良好。在并行排序的上下文中,这种需求更为有限,当工作者后端遇到 toast 指针和 catcache 丢失时就会出现。例如,枚举类型的排序在没有读取 pg_enum 的情况下无法完成。即使执行任何类型的 btree 比较器 - 也需要一堆目录访问。在这些情况下,让工作者执行他们自己的表读取将很方便。主要的替代方案是通过用户后端引导这些读取。这在实现上将很复杂,并且会限制从并行化中获得的潜在收益。当我们稍后授予工作者直接读取表的权限时,这种机制就会过时。我们最好从完整的表读取功能开始。

这些问题是通用的;因此,似乎最好通过创建一个通用的“并行模式”概念来解决它们,这是一个启动工作者并使用适当状态初始化它们的通用工具包,以使它们能够有效地协助并行计算。

状态同步

我们将同步到每个新后台工作者的状态有几个部分。我们还将禁止工作者或用户后端对这些状态进行非瞬态更改,以便在整个并行阶段,所有后端都保证拥有相同的视图。这些状态的瞬态更改(这些更改特定于某个操作)即使在更持久的更改不被允许的情况下也可能是允许的。我们目前计划将以下状态从用户后端复制到每个工作者,并相应地禁止进一步更改这些值,直到并行模式结束

  • 用户 ID 和数据库。
  • GUC。即使在并行模式下,也允许对将被恢复的临时更改,例如进入 proconfig 设置的函数,但必须禁止对意图持续存在的更改(例如,类似于调用 set_config 的东西)。
  • 事务状态。在并行模式内不允许启动或结束子事务;例如,你不能执行像 SAVEPOINT 或 ROLLBACK TO SAVEPOINT 这样的操作。出于实现 PL/pgsql EXCEPTION 块或类似操作的目的而启动和结束子事务可能是可以的。在事务结束时,工作者不得尝试写入提交或中止记录;这应该由用户后端完成。(可以将工作者视为在父(子)事务的无 XID 只读子事务中运行,让它中止该子事务,然后丢弃其复制的事务堆栈的剩余部分。)
  • CurrentSnapshot、ActiveSnapshot。任何事务快照都必须在并行性开始之前在用户后端建立。禁止 CommandCounterIncrement(),这会修改 CurrentSnapshot。通过 PushActiveSnapshot()/PopActiveSnapshot() 进行的临时更改是可以的。
  • Combo CID 哈希。在并行模式下,所有数据库写入都需要被禁止,因为任何创建新的 combo CID 的操作都会有问题。我们最终可能会寻找放宽此限制的方法,但这对于并行排序来说并不是一个严重的问题。
  • 准备好的语句。或者,我们可以在并行模式下禁止 EXECUTE。无论如何,PREPARE/DEALLOCATE 都应该被禁止。内部使用缓存的计划,例如 PL/pgSQL 所采用的,是可以的。

后台工作者中禁止的操作

这些操作即使在并行工作者存在的情况下也可能在用户后端被允许,但工作者本身不得被允许执行它们。

  • 序列。序列值在后端本地内存中缓存,因此在多个协作后端中访问同一个序列将具有与在同一个后端中多次访问它不同的用户可见语义。这可能是可以的,因为用户可能不应该依赖于分配的特定值。一个更大的问题是,一个接触序列的并行工作者不会更新主服务器对 currval() 或 lastval() 的概念;与其尝试修复它,不如直接禁止在并行模式下访问所有序列。
  • 失效消息。工作者最好不要执行任何可能生成本地或共享失效消息的操作。本地失效消息似乎特别成问题,因为没有明显的方法来确保它们被所有后端同时处理。由于我们必须由于 combo CID 问题而禁止写入,因此不清楚我们是否需要在此禁止任何其他特定操作;但是,即使我们以某种方式修复了 combo CID 问题,我们仍然会遇到对系统目录的写入问题,至少会如此。
  • 游标。这些也有后端本地状态。即使我们将状态复制到所有工作者中,他们也会各自独立地更新游标位置,因此语义将与单个后端中发生的不匹配。似乎最好坚持要求后台工作者不能接触游标。我们最终可能会为瞬态游标(例如用于实现 PL/pgsql FOR 循环的游标)做出例外。
  • 大型对象。大型对象有相关的游标,对这些对象的任何操作都可能移动这些游标。由于目前我们没有办法在多个后端之间同步它,因此让我们先禁止工作者后端接触大型对象。
  • 监听/通知。这些需要专用代码将更改传播回用户后端。在并行模式下调用 LISTEN 的兴趣不大,而只要禁止写入,NOTIFY 的兴趣就依然不大。

消息传播

在执行并行操作的过程中,在工作进程中运行的代码可以通过调用 ereport() 或 elog() 来生成消息。除了 elevel >= PANIC 时,这些消息应传播回用户后端,并从那里传播到用户。请注意,保留错误的详细信息非常重要 - 例如,如果工作进程因 ERROR: division by zero 失败,则用户后端应以类似的方式失败。警告和较低级别的消息必须与错误一样多地传播。

为了解决这个问题,我们可以为每个工作进程建立一个共享内存消息队列,工作进程作为发送方,用户后端作为接收方。每当通常将消息发送到客户端时,工作进程都会将其编组并发送到此队列。用户后端需要定期从这些队列中提取消息,以便工作进程不会因队列已满而阻塞。

重量级锁管理

如果没有对锁管理器进行更改,如果工作进程尝试获取与用户后端已持有的锁不兼容的锁,则会发生未检测到的死锁。我们可以通过禁止用户后端持有任何强锁并允许工作进程仅获取弱锁来解决此问题,但这感觉像是人为限制。似乎修改重量级锁管理器会更好,这样用户后端及其工作进程就会形成一个锁定组,其锁是相互不冲突的。

成功完成时的资源转移

当并行操作在没有错误的情况下结束时,每个工作进程都会将本地状态的几个部分序列化,以便用户后端将其转移到自己的本地状态中。这种转移类似于在 CommitSubTransaction() 中发生的将资源转移到父(子)事务的转移。这或许可以使用上面讨论的消息传播下提出的相同消息队列来完成。以下资源受到影响

  • 重量级锁
  • pgstat 计数器更新
  • LocalPredicateLockHash

临时缓冲区

如果存在任何可能导致后台工作进程尝试访问属于用户后端的临时关系,则用户后端需要在进入并行模式之前将所有临时关系写入操作系统;随后,用户后端和工作进程都需要避免修改它们。(设置提示位可能没问题。)

或者,我们可以 (a) 禁止在并行模式下访问临时关系,或者 (b) 拥有某种选项,将临时缓冲区存储在动态共享内存中,并使用与并行模式使用时对普通缓冲区进行的锁和固定操作相同的类型。

决定是否并行化

决定是否并行化时,需要考虑两个因素:是否安全以及是否可以提高性能。

并行化安全吗?

如果并行工作进程最终执行了在并行模式下禁止的操作(例如写入数据库或操作游标),那么我们不想尝试并失败。相反,我们希望一开始就决定不调用并行化。此外,某些操作可能在无法实际检测到的方式下不安全;例如,使用用户指定种子值的伪随机数生成器不安全,因为用户期望特定随机数序列,该序列取决于后端本地状态,而该状态不会在协作后端之间共享。

除非我们能解决停机问题,否则不可能通过机械检查函数来最终确定函数是否安全,因此我们将需要将函数标记为并行安全或不安全。如果该标记足够通用,可以应用于我们将来可能希望并行化的其他事项,而不仅仅是并行排序,那将是很好的。目前尚不清楚简单形式的标记(例如 proisparallelsafe)是否是最佳前进路线,或者我们是否希望使其比这更通用。在初始实现中不安全的某些内容在未来的版本中可能会变得安全,因为功能已添加,因此,与其简单地标记为安全或不安全,我们可以尝试根据它们可能执行的操作对其进行分类,然后将其与当前支持并行模式的操作集进行比较。这种标记可能会找到其他应用(例如查询优化)。

并行化会使其更快吗?

在设置并行环境时会产生一些开销,因此,如果可以在并行模式下完成的工作量很少,那么这样做可能毫无意义。但是,估计的准确性仅与优化器本身一样准确。例如,在排序的情况下,int4 比较比文本比较便宜 ~1000 倍,但目前系统不知道这一点。我们需要插入更合理的 procost 值,才能在该领域做出明智的决策。

也许对于并行排序的第一个版本,我们可以简单地使用声明性语法;例如,仅为 CREATE INDEX 启用它,并让用户指定所需的并行化级别。忘记计划它,并且在查询执行期间永远不要使用并行化。

即使某个操作看起来像是并行化的良好候选者,也可能没有足够的未使用的后台工作进程插槽来对其进行并行化。更微妙的是,如果收益很小,那么为了实现这种收益而占用一个甚至多个后台工作进程插槽可能不利于系统的整体性能;也许我们最好等待更好的候选者出现。

并行内部排序

有几种可能的算法可用于并行内部排序。这些包括并行合并排序、并行快速排序和样本排序。并行合并排序是稳定的,但我们不保证排序是稳定的。与其他算法不同,并行快速排序不需要完整数组副本的空间,这对于 PostgreSQL 来说似乎是一个有用的优势,因为我们倾向于非常关注排序内存的使用情况。我们计划首先实现此算法。

快速排序很容易并行化,因为数组的每个分区都会生成两个子分区,每个子分区都可以由单独的处理器进行排序。但是,在完成前几个分区步骤之前,很少有 CPU 可以以这种方式使用。为了解决这个问题,并行快速排序的设计使其可以在单个分区操作中使用多个 CPU。在并行分区步骤中,我们假设数组被划分为固定大小的块。每个工作进程从数组的前面和后面分别声称一个块,然后对块对执行快速排序分区循环。当某个特定工作进程耗尽其前块或后块时,该块就被认为是中立的,并且工作进程声称一个新块来替换它。当没有剩余的块可供声称时,单个进程会通过清理任何未被任何工作进程中立的块来完成分区操作;这些块的数量不能超过使用的工作进程数量。并行分区比串行分区效率略低,因此它仅用于算法的前几个步骤,直到数组被充分细分,以允许每个 CPU 专注于数组的不同部分。

某些工作进程的进度可能比其他工作进程快,要么是因为它们收到更小的数组部分进行处理,要么是因为它们对特定值的比较操作执行速度比其他工作进程快(例如,某些值可能需要取消烘焙,而其他值很短且不需要取消烘焙;某些键可能在字符串开头附近不同,而另一些键仅在字符串的后面很远才不同;等等)。因此,即使每个工作进程都已专用于数组的单独块,也应提供机制以允许完成得早的工作进程“窃取”其他工作进程的工作。在每个递归步骤中,工作进程将分区分成两个子分区;它递归进入一个子分区并将另一个子分区的边界存储在共享内存中,这样其他空闲的工作进程就可以从那里窃取它。

共享内存分配上下文

如果我们计划执行并行内部排序,然后发现数据无法容纳在 work_mem 中,则我们需要切换到磁带排序。理想情况下,我们不希望这涉及到重新执行太多工作。例如,如果动态共享内存中元组的布局与非并行排序的布局有很大不同,我们可能需要重新排列所有数据;那将很糟糕。为了解决这个问题,也许我们可以创建一个新的内存上下文类型,该类型从共享内存中分配。当共享内存耗尽时,它会失效到后端私有内存(可能指向另一个上下文,或者可能使用定制代码)。我们可以检查上下文是否已溢出以确定我们是否应该尝试调用并行内部排序代码。

并行快速排序簿记

以下是我们可能用于并行快速排序的共享内存数据结构的草案;最初由 Noah Misch 编写,由 Robert Haas 修改。

/*
 * Size of a parallel work unit block.  Smaller values add contention, but
 * they reduce the amount of serial work to be done by the coordinator when
 * finalizing a parallel partition task.
 */
#define PAR_BLK_SIZE 2048

/*
 * When a worker would otherwise add to a pending list a range of tuples less
 * numerous than this, it shall instead fully sort the range itself.  A higher
 * value curbs bookkeeping overhead, but it causes earlier starvation.  The
 * ideal value is actually dependent on comparator cost.
 */
#define PAR_LOCAL_THRESHOLD 8192

/*
 * Index into a memtuples array.  Currently "int" for compatibility with
 * historic tuplesort design.  At some point, this should become intptr_t or
 * simply int64 to support internal sorts of >INT_MAX tuples.
 */
typedef int memtupsz_t;

/*
 * A parallel work unit block ordinal.  It's almost certainly the same type as
 * memtupsz_t; this is just documentation.
 */
typedef int parblk_t;

struct SortWorker
{
    /* Protects all subsequent fields. */
    slock_t     mutex;

    /* Process attached to this slot. */
    PGPROC     *proc;

    /* True if this sort process aborted unexpectedly. */
    bool        proc_died;

    /*----------
     * Work assignment indicator, either an index into AllSortWorkers or -1.
     * A worker plays one of three roles at any given time: coordinator,
     * helper, or individual:
     *      coordinator:    assignment == my_offset
     *      helper:         assignment != my_offset, assignment != -1
     *      individual:     assignment == -1
     *----------
     */
    int         assignment;

    /*
     * Coordinators and individuals: assigned tuple range.  May be -1 for an
     * individual to signal that the worker should search pending lists for
     * further work.
     */
    memtupsz_t  first;                  /* first tuple of in assigned range */
    memtupsz_t  count;                  /* number of tuples in assigned range */

    /*
     * Coordinators: block reservation bookkeeping.  Block b covers the range
     * [first, first + Min(count, b * PAR_BLK_SIZE)).  Thus the last block is
     * typically smaller than PAR_BLK_SIZE.  When a worker seeking to reserve
     * a new block finds next_left > next_right, its contribution to the
     * parallel phase of this partitioning task is finished.
     */
    parblk_t    next_left;              /* lowest unreserved left block */
    parblk_t    next_right;             /* highest unreserved right block */

    /* Individuals: first undefined position in the serial_pending stack. */
    int         pending_stack_pos;

    /*
     * Array with one entry per process participating in the sort.  Usage will
     * often be sparse, but we still consume only 78 KiB for 100 workers.
     */
    union
    {
        /*
         * Coordinators: left and right blocks reserved by each participating
         * worker (coordinator itself or a helper).  A participant's
         * AllSortWorkers offset is also its offset in the coordinator's "u"
         * array.  Left and right reservations both start at -1, indicating
         * that no block is reserved.  A busy worker has two blocks reserved.
         * A finished worker most typically has one block reserved, though it
         * will have zero blocks reserved when it neutralized its final pair
         * of blocks simultaneously.
         */
        struct
        {
            parblk_t    left;
            parblk_t    right;
        } parallel_reserved;

        /*
         * Individuals: stack of pending tuple ranges.  If the worker would
         * push onto its stack and finds the stack to be full, the worker will
         * continue stack depth accrual in local memory.  If we manage to fill
         * this much stack, it's unlikely that other individuals will drain it
         * and starve before we manage to add to it once again.
         */
        struct
        {
            memtupsz_t  first;
            memtupsz_t  count;
        } serial_pending;
    } u[FLEXIBLE_ARRAY_MEMBER];
};

/*
 * Array of SortWorker, one per process participating in this sort.  Offset
 * zero is for the foreground process that initiated the sort.  Each
 * background worker learns its offset at startup.
 */
struct SortWorker *AllSortWorker;