PGQ 教程

来自 PostgreSQL wiki
跳转到导航跳转到搜索

PGQ 是来自 Skytools 的队列解决方案。 Londiste 复制解决方案是一个基于 PGQ 构建的消费者守护进程,并且该 API 可供您使用,您可以创建任何基于队列的异步处理工具。

提示:截至 2021 年,SQL 标准中有一个关于附加部分的提议:“SQL/PGQ”(属性图查询)[1]。 不要将这里提供的信息与该提议混淆。

PGQ 可以解决什么问题?

PGQ 将解决实时事务的异步批处理。

这意味着您正在对实时环境中的行进行一些 INSERT/DELETE/UPDATE 操作,并且您希望触发一些操作,但不是在 COMMIT 时,而是在以后进行。也不是在太远的将来,只是异步地:不阻塞实时事务。

每个有一定规模的应用程序都需要将一些处理推迟到以后,而 PGQ 是为 PostgreSQL 构建的通用的高性能解决方案,它允许实现这一点:批处理。PGQ 将负责事件的异步消费、错误管理、排队行为等,它附带一个简单的 SQL API 来完成此操作。

API 文档在线,本文档将假定您了解它。

安装和设置

您需要在生成事件的数据库上运行一个滴答器实例,这意味着您需要提供一个 ticker.ini 配置文件并运行一个滴答器守护进程。

滴答器

有关该主题的介绍、配置示例以及如何启动守护进程,请参阅 Londiste_Tutorial#The_ticker_daemon

滴答器将生成滴答,然后这些滴答将作为批处理事件的边界。这些批次是按需生成的,每次消费者请求新批次时,都会生成,并且会生成,例如,任何批次都包含 ticker_max_lag 秒的事件或 ticker_max_count 事件,以先到者为准。

守护进程负责维护队列,这是通过将事件 INSERT 旋转到 3 个表中并使用 TRUNCATE 完成的,只要一个表不再包含未处理的事件,就会使用 TRUNCATE。

生成和消费事件

为了不丢失任何事件,您需要确保在生成任何事件之前至少注册了一个消费者,因为没有人感兴趣的事件会丢失。

您可以在同一个事件队列上拥有任意数量的消费者,但它们都会看到相同的事件,而不是共享工作负载。如果您考虑到主要应用程序是复制,那么这是有意义的:消费者是副本,当您有多个副本时,您希望它们都处理相同的事件,以确保所有副本都具有相同的数据集。

目前正在进行一项工作,以提供 pgq 协作消费者,目前这项工作停滞在这个[2] 邮件列表主题中。

更多信息

有关更多信息,请参阅以下内容

生成事件

文档对此解释得很清楚,您可以使用基于函数的 API 或使用基于触发器的 API 生成事件。后者更可取,因为它提供了简单的 SQL 数据类型验证和一个已知的事件插入接口,即经典的 INSERT INTO。

编写 PGQ 消费者

您最好的选择是用 Python 编写消费者代码,其次是 PHP。如果您想重用用其他语言编写的现有代码,您需要自己编写操作系统守护进程支持代码以及围绕 PGQ API 入口点的循环。

所有 PGQ 逻辑都编写为 PostgreSQL 扩展和服务器端代码,因此您的语言只需要提供一种方法来连接到您的 PostgreSQL 服务器并调用函数。

如何使用 PGQ SQL API

首先,您的消费者代码需要能够注册和注销到现有队列。

然后,主要思想非常简单:消费者尽可能快地循环遍历 pgq.next_batch() 调用,直到它收到 NULL,此时它应该休息一下。

请注意,当您的系统没有生成事件时,滴答器守护进程将在每个 ticker_idle_period 生成滴答,因此在低活动期间(例如调试第一个消费者时)获得空批次是完全正常的(并且预期的)。空批次需要尽快消耗,只有当 pgq.next_batch() 返回 NULL 时,您的自定义订阅者才需要休息。

当 pgq.next_batch() 返回一个 id(bigint)时,您可以处理批次事件,您可以使用 pgq.get_batch_events(batch_id) 获得这些事件。对每个事件执行的操作应特定于消费者代码,而不是在通用的 PGQ 消费抽象接口(或库、包,无论您的编程环境设施的名称是什么)中。

事件处理函数应该能够将事件标记为已处理(OK)、失败稍后重试。在重试情况下,该事件将在以后的批次中重新引入,具体取决于您希望它重试多长时间。

一旦处理完批次中的所有事件,您就必须调用 pgq.finish_batch(batch_id),然后 COMMIT 您的批处理事务。

用伪语言,这将给出

do
  batch_id = pgq.next_batch()
  if batch_id is not null
  then
    BEGIN;
    events = pgq.get_batch_events(batch_id);
    
    // this could be a function pointer or a virtual method or a delegate, e.g.
    user_defined_process_event(events);
    
    pgq.finish_batch(batch_id);
    
    COMMIT;
  end if;
while true;

远程消费

这就是事件处理发生在与生成事件的数据库不同的数据库上的情况。Londiste 是远程消费的一个很好的例子。

在这种情况下,您需要实现一种方法来避免在处理 COMMIT 成功但 pgq.finish_batch()(它必须在生产者数据库上完成而不是在消费者数据库上完成)失败时再次处理批次。其他 3 种情况如何不是问题留给读者。

Skytools 提供 pgq_ext(扩展)作为一种方法来确保您不会处理同一个批次多次,这里讨论了这一点:[[3]].

pgq_ext 的想法是在消费者数据库上记录最后一个已处理的 batch_id,并让 UPDATE 在处理事务中完成。这可以细化为最后一个已处理的事件 id,对于您没有简单的方法回滚所有批次处理以防远程站点发生故障的情况。

作为旁注,您可能想知道 londiste 使用了 pgq_ext,但将 SQL 条目(函数和表)移动到了 londiste 模式中。

非事务处理

您的消费者代码可能会发送邮件而不是更改数据库状态。在这种情况下,处理不是事务性的(您无法回滚批处理),您必须自己解决可靠性问题,PGQ 不会提供魔法。

使用 Python API

Skytools 主要用 Python 编写,并提供编写自己的消费者所需的一切。

用例:用于 count(*) 加速的行计数器

这是一个 PGQ 消费者的 Python 代码片段

import pgq 
class RowCounter(pgq.Consumer): 
    def process_batch(self, db, batch_id, ev_list): 
        tbl = self.cf.get('table_name'); delta = 0 
        for ev in ev_list: 
            if   ev.type == 'I' and ev.extra1 == tbl: delta += 1 
            elif ev.type == 'D' and ev.extra1 == tbl: delta -= 1 
            ev.tag_done() 
        q = 'select update_stats(%s, %s)' 
        db.cursor().execute(q, [tbl, delta]) 
RowCounter('row_counter', 'db', sys.argv[1:]).start() 

使用 PHP API

PHP API 也已贡献,允许轻松地用 PHP 编写 PGQ 消费者守护进程。有关更多详细信息,请参阅 README

这是一个例子

#!/usr/bin/php5
<?php
require( "pgq/PGQRemoteConsumer.php" );
require("conf/duration.php");

define("CONFIGURATION", "conf/duration.php");

$con_src = "dbname=foo_db port=5432 host=localhost";
$con_dst = "dbname=bar_db port=5432 host=localhost"

class PGQDaemonExample extends PGQRemoteConsumer
{
	public function config( )
	{
		unset($Config);
		if( $this->log !== null )
			$this->log->notice("Reloading configuration (HUP) from '%s'", CONFIGURATION);

		global $Config;
		require(CONFIGURATION);
		
		$this->loglevel = $Config["LOGLEVEL"];
		$this->logfile  = $Config["LOGFILE"];
		$this->delay    = $Config["DELAY"];
	}
	
	public function process_event( &$event ) 
	{
		$this->log->notice("Starting process event");
		
		$id = $event->data["id"];
		$code = $event->data["code"];
		$data = $event->data["data"];
		
		$this->log->notice("Processing event : %d ", $id);
		
		$query = sprintf( "UPDATE table SET ... WHERE ...", $code );
		$this->log->debug( $query );
		$result = pg_query( $this->pg_dst_con, $query );
		
		if( $result === False ) 
		{
			$this->log->error( "Unable to update : %s ", $query );
			$event->retry_delay = 2 * $this->delay;
			return PGQ_EVENT_RETRY;
		}
				
		return PGQ_EVENT_OK;
	}
}

$daemon = new PGQDaemonExample( "mydaemon", "daemonq", "table", $argc, $argv, $con_src, $con_dst );
?>

Java API

Java 消费者 API 也是 可用的。文档在 README.md 文件中提供。

故障排除

请参阅 Londiste 关于此主题的章节。