Skip to content
Open
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
ef59a28
Add pipeline mode build infrastructure and declarations
jjn1056 Mar 30, 2026
b02a796
Add pg_pipeline_status method and pipeline test file
jjn1056 Mar 30, 2026
75b188c
Add pg_enter_pipeline_mode and pg_exit_pipeline_mode
jjn1056 Mar 30, 2026
bf76f2a
Add pg_pipeline_sync and pg_getresult methods
jjn1056 Mar 30, 2026
ccc018f
Add pg_send_query_params with INSERT, SELECT, and error tests
jjn1056 Mar 30, 2026
80518d4
Add pg_send_prepare, pg_send_query_prepared, pg_send_flush_request, p…
jjn1056 Mar 30, 2026
1e2664a
Add COPY guards, exit-with-pending tests, multi-cycle and large pipeline
jjn1056 Mar 30, 2026
b4ccb0a
Add pipeline mode POD documentation and fix author tests
jjn1056 Mar 30, 2026
0274ad6
Initialize pipeline field, remove unused XS var, fix #ifdef scoping
jjn1056 Mar 30, 2026
dde3b47
Add pg_flush POD, Changes entry, deadlock note, getcopydata guard
jjn1056 Mar 30, 2026
bfa16ef
Add missing pipeline test coverage
jjn1056 Mar 30, 2026
8327437
Fix Changes entry format for release test compatibility
jjn1056 Mar 30, 2026
94f9a3a
Support ? placeholders in pipeline send methods
jjn1056 Mar 30, 2026
8de3c06
Replace naive ? regex with proper C-level placeholder conversion
jjn1056 Mar 30, 2026
e6596b5
Add edge case tests for C-level placeholder conversion
jjn1056 Mar 30, 2026
15001cb
Add trace context to placeholder converter, fix lint and spellcheck
jjn1056 Mar 30, 2026
696e424
Remove pipeline mode entry from released 3.20.0 Changes section
jjn1056 Apr 9, 2026
193583e
Fix irregular whitespace alignment in pipeline mode POD example
jjn1056 Apr 9, 2026
eba3c90
Clarify undef separator pattern in pipeline mode POD
jjn1056 Apr 9, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Changes
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ RT refers to rt.cpan.org

Version 3.20.0 (released March 19, 2026)

- Add pipeline mode support for PostgreSQL 14+
Comment thread
esabol marked this conversation as resolved.
Outdated

- Cleanup and improve the statistics_info() function.
We no longer return "clustered" as a "TYPE"
[Greg Sabino Mullane]
Expand Down
1 change: 1 addition & 0 deletions MANIFEST
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ t/07copy.t
t/08async.t
t/09arrays.t
t/10_pg_error_field.t
t/11pipeline.t
t/12placeholders.t
t/20savepoints.t
t/30unicode.t
Expand Down
3 changes: 3 additions & 0 deletions Makefile.PL
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,9 @@ my $defines = " -DPGLIBVERSION=$serverversion -DPGDEFPORT=$defaultport";
if ($Config{ivsize} >= 8 && $serverversion >= 90300) {
$defines .= ' -DHAS64BITLO';
}
if ($serverversion >= 140000) {
$defines .= ' -DDBDPG_HAS_PIPELINE';
}
my $comp_opts = $Config{q{ccflags}} . $defines;

if ($ENV{DBDPG_GCCDEBUG}) {
Expand Down
6 changes: 6 additions & 0 deletions Pg.h
Original file line number Diff line number Diff line change
Expand Up @@ -126,11 +126,14 @@ DBISTATE_DECLARE;
#define TRACE_PQCONSUMEINPUT TRACE_XX "%sPQconsumeInput\n", THEADER_slow)
#define TRACE_PQDB TRACE_XX "%sPQdb\n", THEADER_slow)
#define TRACE_PQENDCOPY TRACE_XX "%sPQendcopy\n", THEADER_slow)
#define TRACE_PQENTERPIPELINEMODE TRACE_XX "%sPQenterPipelineMode\n", THEADER_slow)
#define TRACE_PQERRORMESSAGE TRACE_XX "%sPQerrorMessage\n", THEADER_slow)
#define TRACE_PQEXEC TRACE_XX "%sPQexec\n", THEADER_slow)
#define TRACE_PQEXECPARAMS TRACE_XX "%sPQexecParams\n", THEADER_slow)
#define TRACE_PQEXECPREPARED TRACE_XX "%sPQexecPrepared\n", THEADER_slow)
#define TRACE_PQEXITPIPELINEMODE TRACE_XX "%sPQexitPipelineMode\n", THEADER_slow)
#define TRACE_PQFINISH TRACE_XX "%sPQfinish\n", THEADER_slow)
#define TRACE_PQFLUSH TRACE_XX "%sPQflush\n", THEADER_slow)
#define TRACE_PQFMOD TRACE_XX "%sPQfmod\n", THEADER_slow)
#define TRACE_PQFNAME TRACE_XX "%sPQfname\n", THEADER_slow)
#define TRACE_PQFREECANCEL TRACE_XX "%sPQfreeCancel\n", THEADER_slow)
Expand All @@ -154,6 +157,8 @@ DBISTATE_DECLARE;
#define TRACE_PQOPTIONS TRACE_XX "%sPQoptions\n", THEADER_slow)
#define TRACE_PQPARAMETERSTATUS TRACE_XX "%sPQparameterStatus\n", THEADER_slow)
#define TRACE_PQPASS TRACE_XX "%sPQpass\n", THEADER_slow)
#define TRACE_PQPIPELINESTATUS TRACE_XX "%sPQpipelineStatus\n", THEADER_slow)
#define TRACE_PQPIPELINESYNC TRACE_XX "%sPQpipelineSync\n", THEADER_slow)
#define TRACE_PQPORT TRACE_XX "%sPQport\n", THEADER_slow)
#define TRACE_PQPREPARE TRACE_XX "%sPQprepare\n", THEADER_slow)
#define TRACE_PQPROTOCOLVERSION TRACE_XX "%sPQprotocolVersion\n", THEADER_slow)
Expand All @@ -162,6 +167,7 @@ DBISTATE_DECLARE;
#define TRACE_PQRESULTERRORFIELD TRACE_XX "%sPQresultErrorField\n", THEADER_slow)
#define TRACE_PQRESULTERRORMESSAGE TRACE_XX "%sPQresultErrorMessage\n", THEADER_slow)
#define TRACE_PQRESULTSTATUS TRACE_XX "%sPQresultStatus\n", THEADER_slow)
#define TRACE_PQSENDFLUSHREQUEST TRACE_XX "%sPQsendFlushRequest\n", THEADER_slow)
#define TRACE_PQSENDPREPARE TRACE_XX "%sPQsendPrepare\n", THEADER_slow)
#define TRACE_PQSENDQUERY TRACE_XX "%sPQsendQuery\n", THEADER_slow)
#define TRACE_PQSENDQUERYPARAMS TRACE_XX "%sPQsendQueryParams\n", THEADER_slow)
Expand Down
152 changes: 152 additions & 0 deletions Pg.pm
Original file line number Diff line number Diff line change
Expand Up @@ -160,14 +160,24 @@ use 5.008001;
DBD::Pg::db->install_method('pg_notifies');
DBD::Pg::db->install_method('pg_putcopydata');
DBD::Pg::db->install_method('pg_putcopyend');
DBD::Pg::db->install_method('pg_enter_pipeline_mode');
DBD::Pg::db->install_method('pg_exit_pipeline_mode');
DBD::Pg::db->install_method('pg_flush');
DBD::Pg::db->install_method('pg_getresult');
DBD::Pg::db->install_method('pg_pipeline_status');
DBD::Pg::db->install_method('pg_pipeline_sync');
DBD::Pg::db->install_method('pg_ping');
DBD::Pg::db->install_method('pg_send_query_params');
DBD::Pg::db->install_method('pg_putline');
DBD::Pg::db->install_method('pg_ready');
DBD::Pg::db->install_method('pg_release');
DBD::Pg::db->install_method('pg_result'); ## NOT duplicated below!
DBD::Pg::db->install_method('pg_rollback_to');
DBD::Pg::db->install_method('pg_savepoint');
DBD::Pg::db->install_method('pg_send_cancel');
DBD::Pg::db->install_method('pg_send_flush_request');
DBD::Pg::db->install_method('pg_send_prepare');
DBD::Pg::db->install_method('pg_send_query_prepared');
DBD::Pg::db->install_method('pg_server_trace');
DBD::Pg::db->install_method('pg_server_untrace');
DBD::Pg::db->install_method('pg_type_info');
Expand Down Expand Up @@ -4439,6 +4449,148 @@ When you are finished with pg_putcopydata, call pg_putcopyend to let the server
that you are done, and it will return to a normal, non-COPY state. Returns a 1 on
success. This method will fail if called when not in COPY IN mode.

=head2 Pipeline Mode

Pipeline mode (PostgreSQL 14+) batches multiple queries into fewer
network round trips. Instead of waiting for each query's result before sending
the next, queries are queued and their results collected after a synchronization
point. This can dramatically reduce latency for workloads that issue many
independent queries.

Pipeline mode uses the extended query protocol exclusively. COPY operations
are not supported in pipeline mode.

B<Note:> In blocking mode (the default), very large pipelines risk deadlock
if both the client send buffer and server result buffer fill simultaneously.
For large pipelines, use L</pg_flush> and poll C<pg_socket> for readiness
between sends, or keep pipelines to a reasonable size with periodic
L</pg_pipeline_sync> calls.

=head3 B<pg_enter_pipeline_mode>

Enters pipeline mode on the connection. The connection must be idle.
Entering pipeline mode when already in pipeline mode is a no-op.
Requires libpq from PostgreSQL 14 or later; croaks if compiled against
an older version. Returns 1 on success, 0 on failure.

$dbh->pg_enter_pipeline_mode();

=head3 B<pg_exit_pipeline_mode>

Exits pipeline mode. Fails if there are unconsumed results; call
L</pg_getresult> to drain all results before exiting. Exiting when
not in pipeline mode is a no-op. Returns 1 on success, 0 on failure.

$dbh->pg_exit_pipeline_mode();

=head3 B<pg_pipeline_status>

Returns the current pipeline status as an integer:

0 = not in pipeline mode
1 = pipeline mode active
2 = pipeline aborted (error occurred, waiting for sync)

my $status = $dbh->pg_pipeline_status();

=head3 B<pg_pipeline_sync>

Marks a synchronization point in the pipeline. Each sync point delimits
an implicit transaction: queries between sync points are atomically
committed or rolled back. Returns 1 on success. Croaks if not in
pipeline mode.

$dbh->pg_pipeline_sync();

=head3 B<pg_getresult>

Retrieves the next result from the connection. Returns a hashref or
C<undef> when no more results are available for the current query.

The hashref contains:

status - integer status code
error - error message string, or undef
ntuples - number of rows (for SELECT results)
nfields - number of columns
cmdtuples - affected row count string (for DML)
rows - arrayref of arrayrefs (only present for SELECT results)

Common status values: 1 (COMMAND_OK), 2 (TUPLES_OK), 7 (FATAL_ERROR),
10 (PIPELINE_SYNC), 11 (PIPELINE_ABORTED).

In pipeline mode, results arrive in this pattern for each query:

result -> undef -> result -> undef -> ... -> PIPELINE_SYNC

Example:

$dbh->pg_enter_pipeline_mode();
$dbh->pg_send_query_params('INSERT INTO t VALUES ($1)', [1]);
$dbh->pg_send_query_params('SELECT * FROM t', []);
$dbh->pg_pipeline_sync();

my $ins = $dbh->pg_getresult(); # {status => 1}
$dbh->pg_getresult(); # undef (separator)
Comment thread
esabol marked this conversation as resolved.
my $sel = $dbh->pg_getresult(); # {status => 2, rows => [...]}
my @rows = @{ $sel->{rows} };
$dbh->pg_getresult(); # undef (separator)
$dbh->pg_getresult(); # {status => 10} (PIPELINE_SYNC)

$dbh->pg_exit_pipeline_mode();

=head3 B<pg_send_query_params>

Sends a parameterized query into the pipeline without waiting for results.
Accepts both C<$1>, C<$2> (libpq native) and C<?> (DBI standard) placeholder
syntax. When C<?> placeholders are detected, they are automatically converted
to C<$1>, C<$2>, etc. Returns 1 on success, 0 on error.

$dbh->pg_send_query_params(
'INSERT INTO t(id, name) VALUES (?, ?)',
[42, 'Alice']
);

=head3 B<pg_send_prepare>

Sends a PREPARE into the pipeline. The prepared statement can be executed
with L</pg_send_query_prepared> in the same or a later pipeline.
Returns 1 on success.

$dbh->pg_send_prepare('my_insert', 'INSERT INTO t VALUES ($1, $2)');

=head3 B<pg_send_query_prepared>

Executes a previously prepared statement in the pipeline.
Returns 1 on success.

$dbh->pg_send_query_prepared('my_insert', [1, 'Alice']);

=head3 B<pg_send_flush_request>

Sends a flush request to the server without a synchronization point. This
makes buffered results available to the client without creating a transaction
boundary. Call L</pg_flush> afterward to push the request to the server.
Returns 1 on success.

$dbh->pg_send_flush_request();
$dbh->pg_flush();

=head3 B<pg_flush>

Flushes the libpq output buffer. Wraps C<PQflush> directly. Used after
L</pg_send_flush_request> to push the request to the server, or during
non-blocking COPY operations after L</pg_putcopydata>.

Return values:

0 = all data flushed successfully
1 = data still pending (caller should poll socket for write-readiness
and call pg_flush again)
-1 = error

my $status = $dbh->pg_flush();

=head2 Postgres limits

For convenience, DBD::Pg can export certain constants representing the limits of
Expand Down
101 changes: 101 additions & 0 deletions Pg.xs
Original file line number Diff line number Diff line change
Expand Up @@ -799,6 +799,107 @@ pg_putcopyend(dbh)
OUTPUT:
RETVAL

I32
pg_pipeline_status(dbh)
INPUT:
SV * dbh
CODE:
RETVAL = pg_db_pipeline_status(dbh);
OUTPUT:
RETVAL

I32
pg_enter_pipeline_mode(dbh)
INPUT:
SV * dbh
CODE:
RETVAL = pg_db_enter_pipeline_mode(dbh);
OUTPUT:
RETVAL

I32
pg_exit_pipeline_mode(dbh)
INPUT:
SV * dbh
CODE:
RETVAL = pg_db_exit_pipeline_mode(dbh);
OUTPUT:
RETVAL

I32
pg_pipeline_sync(dbh)
INPUT:
SV * dbh
CODE:
RETVAL = pg_db_pipeline_sync(dbh);
OUTPUT:
RETVAL

void
pg_getresult(dbh)
SV * dbh
CODE:
ST(0) = pg_db_getresult(dbh);

I32
pg_send_query_params(dbh, sql, ...)
INPUT:
SV * dbh
char * sql
PREINIT:
AV * params = NULL;
CODE:
if (items > 2 && SvROK(ST(2)) && SvTYPE(SvRV(ST(2))) == SVt_PVAV) {
params = (AV *)SvRV(ST(2));
}
RETVAL = pg_db_send_query_params(dbh, sql, params);
OUTPUT:
RETVAL

I32
pg_send_prepare(dbh, name, sql)
INPUT:
SV * dbh
char * name
char * sql
CODE:
RETVAL = pg_db_send_prepare(dbh, name, sql);
OUTPUT:
RETVAL

I32
pg_send_query_prepared(dbh, name, ...)
INPUT:
SV * dbh
char * name
PREINIT:
AV * params = NULL;
CODE:
if (items > 2 && SvROK(ST(2)) && SvTYPE(SvRV(ST(2))) == SVt_PVAV) {
params = (AV *)SvRV(ST(2));
}
RETVAL = pg_db_send_query_prepared(dbh, name, params);
OUTPUT:
RETVAL

I32
pg_send_flush_request(dbh)
INPUT:
SV * dbh
CODE:
RETVAL = pg_db_send_flush_request(dbh);
OUTPUT:
RETVAL

I32
pg_flush(dbh)
INPUT:
SV * dbh
CODE:
RETVAL = pg_db_flush(dbh);
OUTPUT:
RETVAL

void
getline(dbh, buf, len)
PREINIT:
Expand Down
2 changes: 2 additions & 0 deletions README.dev
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,8 @@ t/09arrays.t - Tests array manipulation.

t/10_pg_error_field.t - Tests $dbh->pg_error_field function

t/11pipeline.t - Tests pipeline mode methods.

t/12placeholders.t - Tests placeholders.

t/20savepoints.t - Test savepoints.
Expand Down
Loading