From ef59a28367043f2c117e99b8e23968ec39cfe097 Mon Sep 17 00:00:00 2001 From: John Napiorkowski Date: Sun, 29 Mar 2026 20:12:02 -0500 Subject: [PATCH 01/19] Add pipeline mode build infrastructure and declarations Add DBDPG_HAS_PIPELINE compile-time define for PostgreSQL 14+, trace macros for pipeline libpq functions, pipeline tracking field in imp_dbh_st, and function declarations for all pipeline methods. Co-Authored-By: Claude Opus 4.6 (1M context) --- Makefile.PL | 3 +++ Pg.h | 5 +++++ dbdimp.h | 12 ++++++++++++ 3 files changed, 20 insertions(+) diff --git a/Makefile.PL b/Makefile.PL index 2e500e60..6bee1036 100644 --- a/Makefile.PL +++ b/Makefile.PL @@ -186,6 +186,9 @@ my $defines = " -DPGLIBVERSION=$serverversion -DPGDEFPORT=$defaultport"; if ($Config{ivsize} >= 8 && $serverversion >= 90300) { $defines .= ' -DHAS64BITLO'; } +if ($serverversion >= 140000) { + $defines .= ' -DDBDPG_HAS_PIPELINE'; +} my $comp_opts = $Config{q{ccflags}} . $defines; if ($ENV{DBDPG_GCCDEBUG}) { diff --git a/Pg.h b/Pg.h index 1b83e194..19461607 100644 --- a/Pg.h +++ b/Pg.h @@ -126,10 +126,12 @@ DBISTATE_DECLARE; #define TRACE_PQCONSUMEINPUT TRACE_XX "%sPQconsumeInput\n", THEADER_slow) #define TRACE_PQDB TRACE_XX "%sPQdb\n", THEADER_slow) #define TRACE_PQENDCOPY TRACE_XX "%sPQendcopy\n", THEADER_slow) +#define TRACE_PQENTERPIPELINEMODE TRACE_XX "%sPQenterPipelineMode\n", THEADER_slow) #define TRACE_PQERRORMESSAGE TRACE_XX "%sPQerrorMessage\n", THEADER_slow) #define TRACE_PQEXEC TRACE_XX "%sPQexec\n", THEADER_slow) #define TRACE_PQEXECPARAMS TRACE_XX "%sPQexecParams\n", THEADER_slow) #define TRACE_PQEXECPREPARED TRACE_XX "%sPQexecPrepared\n", THEADER_slow) +#define TRACE_PQEXITPIPELINEMODE TRACE_XX "%sPQexitPipelineMode\n", THEADER_slow) #define TRACE_PQFINISH TRACE_XX "%sPQfinish\n", THEADER_slow) #define TRACE_PQFMOD TRACE_XX "%sPQfmod\n", THEADER_slow) #define TRACE_PQFNAME TRACE_XX "%sPQfname\n", THEADER_slow) @@ -154,6 +156,8 @@ DBISTATE_DECLARE; #define TRACE_PQOPTIONS TRACE_XX "%sPQoptions\n", THEADER_slow) #define TRACE_PQPARAMETERSTATUS TRACE_XX "%sPQparameterStatus\n", THEADER_slow) #define TRACE_PQPASS TRACE_XX "%sPQpass\n", THEADER_slow) +#define TRACE_PQPIPELINESTATUS TRACE_XX "%sPQpipelineStatus\n", THEADER_slow) +#define TRACE_PQPIPELINESYNC TRACE_XX "%sPQpipelineSync\n", THEADER_slow) #define TRACE_PQPORT TRACE_XX "%sPQport\n", THEADER_slow) #define TRACE_PQPREPARE TRACE_XX "%sPQprepare\n", THEADER_slow) #define TRACE_PQPROTOCOLVERSION TRACE_XX "%sPQprotocolVersion\n", THEADER_slow) @@ -162,6 +166,7 @@ DBISTATE_DECLARE; #define TRACE_PQRESULTERRORFIELD TRACE_XX "%sPQresultErrorField\n", THEADER_slow) #define TRACE_PQRESULTERRORMESSAGE TRACE_XX "%sPQresultErrorMessage\n", THEADER_slow) #define TRACE_PQRESULTSTATUS TRACE_XX "%sPQresultStatus\n", THEADER_slow) +#define TRACE_PQSENDFLUSHREQUEST TRACE_XX "%sPQsendFlushRequest\n", THEADER_slow) #define TRACE_PQSENDPREPARE TRACE_XX "%sPQsendPrepare\n", THEADER_slow) #define TRACE_PQSENDQUERY TRACE_XX "%sPQsendQuery\n", THEADER_slow) #define TRACE_PQSENDQUERYPARAMS TRACE_XX "%sPQsendQueryParams\n", THEADER_slow) diff --git a/dbdimp.h b/dbdimp.h index 37a3203c..058506d2 100644 --- a/dbdimp.h +++ b/dbdimp.h @@ -22,6 +22,7 @@ struct imp_dbh_st { int prepare_number; /* internal prepared statement name modifier */ int copystate; /* 0=none PGRES_COPY_IN PGRES_COPY_OUT */ bool copybinary; /* whether the copy is in binary format */ + int pipeline; /* 0=off, tracks PQpipelineStatus */ int pg_errorlevel; /* PQsetErrorVerbosity. Set by user, defaults to 1 */ bool server_prepare; /* do we want to use PQexecPrepared? Can be changed by user */ int switch_prepared; /* how many executes until we switch to PQexecPrepared */ @@ -241,6 +242,17 @@ int pg_db_putcopyend (SV * dbh); int pg_db_endcopy (SV * dbh); +/* Pipeline mode (PG14+) */ +int pg_db_enter_pipeline_mode (SV *dbh); +int pg_db_exit_pipeline_mode (SV *dbh); +int pg_db_pipeline_sync (SV *dbh); +int pg_db_pipeline_status (SV *dbh); +SV* pg_db_getresult (SV *dbh); +int pg_db_send_query_params (SV *dbh, char *sql, AV *params); +int pg_db_send_query_prepared (SV *dbh, char *name, AV *params); +int pg_db_send_prepare (SV *dbh, char *name, char *sql); +int pg_db_send_flush_request (SV *dbh); + SV * pg_db_error_field (SV *dbh, char * fieldname); void pg_db_pg_server_trace (SV *dbh, FILE *fh); From b02a7968ea80fbfeb0262a424dbbbe52a5f54193 Mon Sep 17 00:00:00 2001 From: John Napiorkowski Date: Sun, 29 Mar 2026 20:17:13 -0500 Subject: [PATCH 02/19] Add pg_pipeline_status method and pipeline test file MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Create t/11pipeline.t with PG14+ version detection. Implement pg_pipeline_status wrapping PQpipelineStatus — returns 0 (off), 1 (on), or 2 (aborted). Co-Authored-By: Claude Opus 4.6 (1M context) --- Pg.pm | 1 + Pg.xs | 9 +++++++++ dbdimp.c | 22 ++++++++++++++++++++++ t/11pipeline.t | 35 +++++++++++++++++++++++++++++++++++ 4 files changed, 67 insertions(+) create mode 100644 t/11pipeline.t diff --git a/Pg.pm b/Pg.pm index a2b403a9..bb6e76c6 100644 --- a/Pg.pm +++ b/Pg.pm @@ -160,6 +160,7 @@ use 5.008001; DBD::Pg::db->install_method('pg_notifies'); DBD::Pg::db->install_method('pg_putcopydata'); DBD::Pg::db->install_method('pg_putcopyend'); + DBD::Pg::db->install_method('pg_pipeline_status'); DBD::Pg::db->install_method('pg_ping'); DBD::Pg::db->install_method('pg_putline'); DBD::Pg::db->install_method('pg_ready'); diff --git a/Pg.xs b/Pg.xs index 1f856d10..477cc9b2 100644 --- a/Pg.xs +++ b/Pg.xs @@ -799,6 +799,15 @@ pg_putcopyend(dbh) OUTPUT: RETVAL +I32 +pg_pipeline_status(dbh) + INPUT: + SV * dbh + CODE: + RETVAL = pg_db_pipeline_status(dbh); + OUTPUT: + RETVAL + void getline(dbh, buf, len) PREINIT: diff --git a/dbdimp.c b/dbdimp.c index cd6e0937..074358e4 100644 --- a/dbdimp.c +++ b/dbdimp.c @@ -4615,6 +4615,28 @@ int pg_db_putcopyend (SV * dbh) } /* end of pg_db_putcopyend */ +/* ================================================================== */ +int pg_db_pipeline_status (SV * dbh) +{ + dTHX; + D_imp_dbh(dbh); + + if (TSTART_slow) TRC(DBILOGFP, "%sBegin pg_db_pipeline_status\n", THEADER_slow); + +#ifdef DBDPG_HAS_PIPELINE + int status; + TRACE_PQPIPELINESTATUS; + status = PQpipelineStatus(imp_dbh->conn); + if (TEND_slow) TRC(DBILOGFP, "%sEnd pg_db_pipeline_status (%d)\n", THEADER_slow, status); + return status; +#else + if (TEND_slow) TRC(DBILOGFP, "%sEnd pg_db_pipeline_status (not supported)\n", THEADER_slow); + return 0; +#endif + +} /* end of pg_db_pipeline_status */ + + /* ================================================================== */ SV * pg_db_error_field (SV *dbh, char * fieldname) { diff --git a/t/11pipeline.t b/t/11pipeline.t new file mode 100644 index 00000000..d480c9da --- /dev/null +++ b/t/11pipeline.t @@ -0,0 +1,35 @@ +#!perl + +## Test pipeline mode functionality (PostgreSQL 14+) + +use 5.008001; +use strict; +use warnings; +use lib 'blib/lib', 'blib/arch', 't'; +use DBD::Pg ':async'; +use Test::More; +require 'dbdpg_test_setup.pl'; +select(($|=1,select(STDERR),$|=1)[1]); + +my $dbh = connect_database(); + +if (! $dbh) { + plan skip_all => 'Connection to database failed, cannot continue testing'; +} + +my $pgversion = $dbh->{pg_lib_version}; + +if ($pgversion < 140000) { + plan skip_all => 'Pipeline mode requires PostgreSQL 14 or later'; +} + +plan tests => 1; + +my ($result, $expected, $t); + +$t='pg_pipeline_status returns 0 (off) by default'; +my $status = $dbh->pg_pipeline_status(); +is ($status, 0, $t); + +cleanup_database($dbh,'test'); +$dbh->disconnect; From 75b188c179d11530d1ed9d2a04ed718618814250 Mon Sep 17 00:00:00 2001 From: John Napiorkowski Date: Sun, 29 Mar 2026 20:22:41 -0500 Subject: [PATCH 03/19] Add pg_enter_pipeline_mode and pg_exit_pipeline_mode Wrap PQenterPipelineMode and PQexitPipelineMode as $dbh methods. Both are idempotent, croak if compiled without PG14+ support. Tests cover enter, exit, idempotency, and post-pipeline query sanity. Co-Authored-By: Claude Opus 4.6 (1M context) --- Pg.pm | 2 ++ Pg.xs | 18 ++++++++++++++++ dbdimp.c | 56 ++++++++++++++++++++++++++++++++++++++++++++++++++ t/11pipeline.t | 36 +++++++++++++++++++++++++++++++- 4 files changed, 111 insertions(+), 1 deletion(-) diff --git a/Pg.pm b/Pg.pm index bb6e76c6..4f869a8f 100644 --- a/Pg.pm +++ b/Pg.pm @@ -160,6 +160,8 @@ use 5.008001; DBD::Pg::db->install_method('pg_notifies'); DBD::Pg::db->install_method('pg_putcopydata'); DBD::Pg::db->install_method('pg_putcopyend'); + DBD::Pg::db->install_method('pg_enter_pipeline_mode'); + DBD::Pg::db->install_method('pg_exit_pipeline_mode'); DBD::Pg::db->install_method('pg_pipeline_status'); DBD::Pg::db->install_method('pg_ping'); DBD::Pg::db->install_method('pg_putline'); diff --git a/Pg.xs b/Pg.xs index 477cc9b2..6f5cad6a 100644 --- a/Pg.xs +++ b/Pg.xs @@ -808,6 +808,24 @@ pg_pipeline_status(dbh) OUTPUT: RETVAL +I32 +pg_enter_pipeline_mode(dbh) + INPUT: + SV * dbh + CODE: + RETVAL = pg_db_enter_pipeline_mode(dbh); + OUTPUT: + RETVAL + +I32 +pg_exit_pipeline_mode(dbh) + INPUT: + SV * dbh + CODE: + RETVAL = pg_db_exit_pipeline_mode(dbh); + OUTPUT: + RETVAL + void getline(dbh, buf, len) PREINIT: diff --git a/dbdimp.c b/dbdimp.c index 074358e4..4bd0bf0f 100644 --- a/dbdimp.c +++ b/dbdimp.c @@ -4637,6 +4637,62 @@ int pg_db_pipeline_status (SV * dbh) } /* end of pg_db_pipeline_status */ +/* ================================================================== */ +int pg_db_enter_pipeline_mode (SV * dbh) +{ + dTHX; + D_imp_dbh(dbh); + + if (TSTART_slow) TRC(DBILOGFP, "%sBegin pg_db_enter_pipeline_mode\n", THEADER_slow); + +#ifdef DBDPG_HAS_PIPELINE + TRACE_PQENTERPIPELINEMODE; + if (0 == PQenterPipelineMode(imp_dbh->conn)) { + _fatal_sqlstate(aTHX_ imp_dbh); + TRACE_PQERRORMESSAGE; + pg_error(aTHX_ dbh, PGRES_FATAL_ERROR, PQerrorMessage(imp_dbh->conn)); + if (TEND_slow) TRC(DBILOGFP, "%sEnd pg_db_enter_pipeline_mode (error)\n", THEADER_slow); + return 0; + } + imp_dbh->pipeline = 1; + if (TEND_slow) TRC(DBILOGFP, "%sEnd pg_db_enter_pipeline_mode (1)\n", THEADER_slow); + return 1; +#else + croak("pg_enter_pipeline_mode requires PostgreSQL 14 or later"); + return 0; +#endif + +} /* end of pg_db_enter_pipeline_mode */ + + +/* ================================================================== */ +int pg_db_exit_pipeline_mode (SV * dbh) +{ + dTHX; + D_imp_dbh(dbh); + + if (TSTART_slow) TRC(DBILOGFP, "%sBegin pg_db_exit_pipeline_mode\n", THEADER_slow); + +#ifdef DBDPG_HAS_PIPELINE + TRACE_PQEXITPIPELINEMODE; + if (0 == PQexitPipelineMode(imp_dbh->conn)) { + _fatal_sqlstate(aTHX_ imp_dbh); + TRACE_PQERRORMESSAGE; + pg_error(aTHX_ dbh, PGRES_FATAL_ERROR, PQerrorMessage(imp_dbh->conn)); + if (TEND_slow) TRC(DBILOGFP, "%sEnd pg_db_exit_pipeline_mode (error)\n", THEADER_slow); + return 0; + } + imp_dbh->pipeline = 0; + if (TEND_slow) TRC(DBILOGFP, "%sEnd pg_db_exit_pipeline_mode (1)\n", THEADER_slow); + return 1; +#else + croak("pg_exit_pipeline_mode requires PostgreSQL 14 or later"); + return 0; +#endif + +} /* end of pg_db_exit_pipeline_mode */ + + /* ================================================================== */ SV * pg_db_error_field (SV *dbh, char * fieldname) { diff --git a/t/11pipeline.t b/t/11pipeline.t index d480c9da..8174b4a5 100644 --- a/t/11pipeline.t +++ b/t/11pipeline.t @@ -23,7 +23,7 @@ if ($pgversion < 140000) { plan skip_all => 'Pipeline mode requires PostgreSQL 14 or later'; } -plan tests => 1; +plan tests => 8; my ($result, $expected, $t); @@ -31,5 +31,39 @@ $t='pg_pipeline_status returns 0 (off) by default'; my $status = $dbh->pg_pipeline_status(); is ($status, 0, $t); +# Enter pipeline mode + +$t='pg_enter_pipeline_mode succeeds'; +$result = $dbh->pg_enter_pipeline_mode(); +is ($result, 1, $t); + +$t='pg_pipeline_status returns 1 (on) after entering pipeline mode'; +$status = $dbh->pg_pipeline_status(); +is ($status, 1, $t); + +$t='pg_enter_pipeline_mode is idempotent'; +$result = $dbh->pg_enter_pipeline_mode(); +is ($result, 1, $t); + +# Exit pipeline mode + +$t='pg_exit_pipeline_mode succeeds'; +$result = $dbh->pg_exit_pipeline_mode(); +is ($result, 1, $t); + +$t='pg_pipeline_status returns 0 (off) after exiting pipeline mode'; +$status = $dbh->pg_pipeline_status(); +is ($status, 0, $t); + +$t='pg_exit_pipeline_mode is idempotent when not in pipeline mode'; +$result = $dbh->pg_exit_pipeline_mode(); +is ($result, 1, $t); + +$t='Normal queries work after entering and exiting pipeline mode'; +eval { + $dbh->do('SELECT 1'); +}; +is ($@, q{}, $t); + cleanup_database($dbh,'test'); $dbh->disconnect; From bf76f2a83b3d61e679db78447c665bce7e5a364a Mon Sep 17 00:00:00 2001 From: John Napiorkowski Date: Sun, 29 Mar 2026 20:28:06 -0500 Subject: [PATCH 04/19] Add pg_pipeline_sync and pg_getresult methods MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit pg_pipeline_sync wraps PQpipelineSync — sends a sync point that delimits an implicit transaction boundary. pg_getresult wraps PQgetResult — returns a hashref with status, error, ntuples, nfields, cmdtuples, and rows (for SELECT results), or undef when no more results are available. This enables manual pipeline result collection. Co-Authored-By: Claude Opus 4.6 (1M context) --- Pg.pm | 2 + Pg.xs | 17 +++++++++ dbdimp.c | 102 +++++++++++++++++++++++++++++++++++++++++++++++++ t/11pipeline.t | 55 +++++++++++++++++++++++++- 4 files changed, 175 insertions(+), 1 deletion(-) diff --git a/Pg.pm b/Pg.pm index 4f869a8f..777fa2f7 100644 --- a/Pg.pm +++ b/Pg.pm @@ -162,7 +162,9 @@ use 5.008001; DBD::Pg::db->install_method('pg_putcopyend'); DBD::Pg::db->install_method('pg_enter_pipeline_mode'); DBD::Pg::db->install_method('pg_exit_pipeline_mode'); + DBD::Pg::db->install_method('pg_getresult'); DBD::Pg::db->install_method('pg_pipeline_status'); + DBD::Pg::db->install_method('pg_pipeline_sync'); DBD::Pg::db->install_method('pg_ping'); DBD::Pg::db->install_method('pg_putline'); DBD::Pg::db->install_method('pg_ready'); diff --git a/Pg.xs b/Pg.xs index 6f5cad6a..ab344678 100644 --- a/Pg.xs +++ b/Pg.xs @@ -826,6 +826,23 @@ pg_exit_pipeline_mode(dbh) OUTPUT: RETVAL +I32 +pg_pipeline_sync(dbh) + INPUT: + SV * dbh + CODE: + RETVAL = pg_db_pipeline_sync(dbh); + OUTPUT: + RETVAL + +void +pg_getresult(dbh) + SV * dbh + PREINIT: + D_imp_dbh(dbh); + CODE: + ST(0) = pg_db_getresult(dbh); + void getline(dbh, buf, len) PREINIT: diff --git a/dbdimp.c b/dbdimp.c index 4bd0bf0f..018d5e11 100644 --- a/dbdimp.c +++ b/dbdimp.c @@ -4693,6 +4693,108 @@ int pg_db_exit_pipeline_mode (SV * dbh) } /* end of pg_db_exit_pipeline_mode */ +/* ================================================================== */ +int pg_db_pipeline_sync (SV * dbh) +{ + dTHX; + D_imp_dbh(dbh); + + if (TSTART_slow) TRC(DBILOGFP, "%sBegin pg_db_pipeline_sync\n", THEADER_slow); + +#ifdef DBDPG_HAS_PIPELINE + TRACE_PQPIPELINESYNC; + if (0 == PQpipelineSync(imp_dbh->conn)) { + _fatal_sqlstate(aTHX_ imp_dbh); + TRACE_PQERRORMESSAGE; + pg_error(aTHX_ dbh, PGRES_FATAL_ERROR, PQerrorMessage(imp_dbh->conn)); + if (TEND_slow) TRC(DBILOGFP, "%sEnd pg_db_pipeline_sync (error)\n", THEADER_slow); + return 0; + } + if (TEND_slow) TRC(DBILOGFP, "%sEnd pg_db_pipeline_sync (1)\n", THEADER_slow); + return 1; +#else + croak("pg_pipeline_sync requires PostgreSQL 14 or later"); + return 0; +#endif + +} /* end of pg_db_pipeline_sync */ + + +/* ================================================================== */ +SV * pg_db_getresult (SV * dbh) +{ + dTHX; + D_imp_dbh(dbh); + PGresult * result; + ExecStatusType status; + HV * hv; + SV * retsv; + int ntuples, nfields; + const char * errmsg; + const char * cmdtuples; + + if (TSTART_slow) TRC(DBILOGFP, "%sBegin pg_db_getresult\n", THEADER_slow); + + TRACE_PQGETRESULT; + result = PQgetResult(imp_dbh->conn); + + if (NULL == result) { + if (TEND_slow) TRC(DBILOGFP, "%sEnd pg_db_getresult (undef)\n", THEADER_slow); + return &PL_sv_undef; + } + + status = PQresultStatus(result); + + hv = newHV(); + + (void)hv_stores(hv, "status", newSViv((IV)status)); + + errmsg = PQresultErrorMessage(result); + if (errmsg && errmsg[0] != '\0') { + (void)hv_stores(hv, "error", newSVpv(errmsg, 0)); + } + else { + (void)hv_stores(hv, "error", newSVsv(&PL_sv_undef)); + } + + ntuples = PQntuples(result); + nfields = PQnfields(result); + cmdtuples = PQcmdTuples(result); + + (void)hv_stores(hv, "ntuples", newSViv(ntuples)); + (void)hv_stores(hv, "nfields", newSViv(nfields)); + (void)hv_stores(hv, "cmdtuples", newSVpv(cmdtuples, 0)); + + /* For TUPLES_OK, extract row data */ + if (PGRES_TUPLES_OK == status && ntuples > 0) { + AV * rows = newAV(); + int r, c; + for (r = 0; r < ntuples; r++) { + AV * row = newAV(); + for (c = 0; c < nfields; c++) { + if (PQgetisnull(result, r, c)) { + av_push(row, newSVsv(&PL_sv_undef)); + } + else { + av_push(row, newSVpv(PQgetvalue(result, r, c), PQgetlength(result, r, c))); + } + } + av_push(rows, newRV_noinc((SV*)row)); + } + (void)hv_stores(hv, "rows", newRV_noinc((SV*)rows)); + } + + TRACE_PQCLEAR; + PQclear(result); + + retsv = newRV_noinc((SV*)hv); + + if (TEND_slow) TRC(DBILOGFP, "%sEnd pg_db_getresult (status: %d)\n", THEADER_slow, (int)status); + return sv_2mortal(retsv); + +} /* end of pg_db_getresult */ + + /* ================================================================== */ SV * pg_db_error_field (SV *dbh, char * fieldname) { diff --git a/t/11pipeline.t b/t/11pipeline.t index 8174b4a5..4d422758 100644 --- a/t/11pipeline.t +++ b/t/11pipeline.t @@ -23,7 +23,7 @@ if ($pgversion < 140000) { plan skip_all => 'Pipeline mode requires PostgreSQL 14 or later'; } -plan tests => 8; +plan tests => 16; my ($result, $expected, $t); @@ -65,5 +65,58 @@ eval { }; is ($@, q{}, $t); +# pg_pipeline_sync + +$t='pg_pipeline_sync fails when not in pipeline mode'; +eval { + $dbh->pg_pipeline_sync(); +}; +ok ($@, $t); + +# pg_pipeline_sync + pg_getresult: basic sync cycle + +$dbh->pg_enter_pipeline_mode(); + +$t='pg_pipeline_sync succeeds in pipeline mode'; +$result = $dbh->pg_pipeline_sync(); +is ($result, 1, $t); + +$t='pg_getresult returns pipeline sync result'; +$result = $dbh->pg_getresult(); +is (ref $result, 'HASH', $t); + +$t='Pipeline sync result has status PGRES_PIPELINE_SYNC'; +# PGRES_PIPELINE_SYNC = 10 in libpq-fe.h +is ($result->{status}, 10, $t); + +$t='pg_getresult returns undef after sync result'; +my $null_result = $dbh->pg_getresult(); +ok (!defined $null_result, $t); + +$dbh->pg_exit_pipeline_mode(); + +# pg_getresult when not in pipeline mode + +$t='pg_getresult returns undef when no results pending'; +$null_result = $dbh->pg_getresult(); +ok (!defined $null_result, $t); + +# Verify connection is still good + +$t='Connection works after sync cycle'; +eval { + $dbh->do('SELECT 1'); +}; +is ($@, q{}, $t); + +$t='Enter/exit pipeline still works after sync cycle'; +$dbh->pg_enter_pipeline_mode(); +$dbh->pg_pipeline_sync(); +$dbh->pg_getresult(); # PIPELINE_SYNC +$dbh->pg_getresult(); # NULL +$dbh->pg_exit_pipeline_mode(); +eval { $dbh->do('SELECT 2'); }; +is ($@, q{}, $t); + cleanup_database($dbh,'test'); $dbh->disconnect; From ccc018fc8f798207496b25268e325bc6c8e5c84b Mon Sep 17 00:00:00 2001 From: John Napiorkowski Date: Sun, 29 Mar 2026 20:33:36 -0500 Subject: [PATCH 05/19] Add pg_send_query_params with INSERT, SELECT, and error tests Wrap PQsendQueryParams for pipeline query submission with $1/$2 parameter syntax. Tests cover INSERT pipeline with data verification, SELECT with row data extraction, and PIPELINE_ABORTED error flow with recovery after sync point. Co-Authored-By: Claude Opus 4.6 (1M context) --- Pg.pm | 1 + Pg.xs | 15 ++++++ dbdimp.c | 50 +++++++++++++++++++ t/11pipeline.t | 130 ++++++++++++++++++++++++++++++++++++++++++++++++- 4 files changed, 195 insertions(+), 1 deletion(-) diff --git a/Pg.pm b/Pg.pm index 777fa2f7..197646e9 100644 --- a/Pg.pm +++ b/Pg.pm @@ -166,6 +166,7 @@ use 5.008001; DBD::Pg::db->install_method('pg_pipeline_status'); DBD::Pg::db->install_method('pg_pipeline_sync'); DBD::Pg::db->install_method('pg_ping'); + DBD::Pg::db->install_method('pg_send_query_params'); DBD::Pg::db->install_method('pg_putline'); DBD::Pg::db->install_method('pg_ready'); DBD::Pg::db->install_method('pg_release'); diff --git a/Pg.xs b/Pg.xs index ab344678..b620ca3b 100644 --- a/Pg.xs +++ b/Pg.xs @@ -843,6 +843,21 @@ pg_getresult(dbh) CODE: ST(0) = pg_db_getresult(dbh); +I32 +pg_send_query_params(dbh, sql, ...) + INPUT: + SV * dbh + char * sql + PREINIT: + AV * params = NULL; + CODE: + if (items > 2 && SvROK(ST(2)) && SvTYPE(SvRV(ST(2))) == SVt_PVAV) { + params = (AV *)SvRV(ST(2)); + } + RETVAL = pg_db_send_query_params(dbh, sql, params); + OUTPUT: + RETVAL + void getline(dbh, buf, len) PREINIT: diff --git a/dbdimp.c b/dbdimp.c index 018d5e11..adb9111b 100644 --- a/dbdimp.c +++ b/dbdimp.c @@ -4795,6 +4795,56 @@ SV * pg_db_getresult (SV * dbh) } /* end of pg_db_getresult */ +/* ================================================================== */ +int pg_db_send_query_params (SV * dbh, char * sql, AV * params) +{ + dTHX; + D_imp_dbh(dbh); + int nparams, i, ret; + const char ** paramValues = NULL; + + if (TSTART_slow) TRC(DBILOGFP, "%sBegin pg_db_send_query_params\n", THEADER_slow); + +#ifdef DBDPG_HAS_PIPELINE + nparams = (params) ? (int)(av_len(params) + 1) : 0; + + if (nparams > 0) { + Newz(0, paramValues, nparams, const char *); + for (i = 0; i < nparams; i++) { + SV ** svp = av_fetch(params, i, 0); + if (svp && SvOK(*svp)) { + paramValues[i] = SvPV_nolen(*svp); + } + else { + paramValues[i] = NULL; + } + } + } + + TRACE_PQSENDQUERYPARAMS; + ret = PQsendQueryParams(imp_dbh->conn, sql, nparams, NULL, + paramValues, NULL, NULL, 0); + + Safefree(paramValues); + + if (0 == ret) { + _fatal_sqlstate(aTHX_ imp_dbh); + TRACE_PQERRORMESSAGE; + pg_error(aTHX_ dbh, PGRES_FATAL_ERROR, PQerrorMessage(imp_dbh->conn)); + if (TEND_slow) TRC(DBILOGFP, "%sEnd pg_db_send_query_params (error)\n", THEADER_slow); + return 0; + } + + if (TEND_slow) TRC(DBILOGFP, "%sEnd pg_db_send_query_params (1)\n", THEADER_slow); + return 1; +#else + croak("pg_send_query_params requires PostgreSQL 14 or later"); + return 0; +#endif + +} /* end of pg_db_send_query_params */ + + /* ================================================================== */ SV * pg_db_error_field (SV *dbh, char * fieldname) { diff --git a/t/11pipeline.t b/t/11pipeline.t index 4d422758..0553de08 100644 --- a/t/11pipeline.t +++ b/t/11pipeline.t @@ -23,7 +23,7 @@ if ($pgversion < 140000) { plan skip_all => 'Pipeline mode requires PostgreSQL 14 or later'; } -plan tests => 16; +plan tests => 37; my ($result, $expected, $t); @@ -118,5 +118,133 @@ $dbh->pg_exit_pipeline_mode(); eval { $dbh->do('SELECT 2'); }; is ($@, q{}, $t); +# pg_send_query_params: INSERT pipeline + +$dbh->do('CREATE TEMP TABLE dbd_pg_test_pipeline(id integer, name text)'); + +$t='pg_send_query_params queues an INSERT'; +$dbh->pg_enter_pipeline_mode(); +$result = $dbh->pg_send_query_params( + 'INSERT INTO dbd_pg_test_pipeline(id, name) VALUES ($1, $2)', + [1, 'Alice'] +); +is ($result, 1, $t); + +$t='pg_send_query_params queues a second INSERT'; +$result = $dbh->pg_send_query_params( + 'INSERT INTO dbd_pg_test_pipeline(id, name) VALUES ($1, $2)', + [2, 'Bob'] +); +is ($result, 1, $t); + +$t='pg_pipeline_sync after INSERTs succeeds'; +$result = $dbh->pg_pipeline_sync(); +is ($result, 1, $t); + +# Collect results: INSERT1 -> NULL -> INSERT2 -> NULL -> SYNC + +$t='First INSERT result is COMMAND_OK'; +$result = $dbh->pg_getresult(); +is ($result->{status}, 1, $t); # PGRES_COMMAND_OK + +$t='NULL separator after first INSERT'; +my $null_result2 = $dbh->pg_getresult(); +ok (!defined $null_result2, $t); + +$t='Second INSERT result is COMMAND_OK'; +$result = $dbh->pg_getresult(); +is ($result->{status}, 1, $t); + +$t='NULL separator after second INSERT'; +$null_result2 = $dbh->pg_getresult(); +ok (!defined $null_result2, $t); + +$t='PIPELINE_SYNC result'; +$result = $dbh->pg_getresult(); +ok (defined $result && $result->{status} == 10, $t); + +$dbh->pg_exit_pipeline_mode(); + +$t='Pipeline INSERTs committed the data'; +my $rows = $dbh->selectall_arrayref( + 'SELECT id, name FROM dbd_pg_test_pipeline ORDER BY id' +); +$expected = [[1, 'Alice'], [2, 'Bob']]; +is_deeply ($rows, $expected, $t); + +# pg_send_query_params: SELECT in pipeline + +$dbh->pg_enter_pipeline_mode(); + +$t='pg_send_query_params queues a SELECT'; +$result = $dbh->pg_send_query_params( + 'SELECT id, name FROM dbd_pg_test_pipeline WHERE id = $1', + [1] +); +is ($result, 1, $t); + +$dbh->pg_pipeline_sync(); + +$t='SELECT result is TUPLES_OK'; +$result = $dbh->pg_getresult(); +is ($result->{status}, 2, $t); + +$t='SELECT result has correct ntuples'; +is ($result->{ntuples}, 1, $t); + +$t='SELECT result has correct nfields'; +is ($result->{nfields}, 2, $t); + +$t='SELECT result rows contain correct data'; +is_deeply ($result->{rows}, [['1', 'Alice']], $t); + +$null_result = $dbh->pg_getresult(); # NULL separator +$result = $dbh->pg_getresult(); # PIPELINE_SYNC +$dbh->pg_exit_pipeline_mode(); + +# Pipeline error handling + +$dbh->pg_enter_pipeline_mode(); + +$t='Send a query that will fail (syntax error)'; +$result = $dbh->pg_send_query_params('GARBAGE SQL', []); +is ($result, 1, $t); + +$t='Send a valid query after the bad one'; +$result = $dbh->pg_send_query_params('SELECT 1 AS x', []); +is ($result, 1, $t); + +$dbh->pg_pipeline_sync(); + +$t='First result is FATAL_ERROR'; +$result = $dbh->pg_getresult(); +is ($result->{status}, 7, $t); + +$t='Error result has error message'; +ok (defined $result->{error}, $t); + +$null_result = $dbh->pg_getresult(); # NULL separator + +$t='Second result is PIPELINE_ABORTED'; +$result = $dbh->pg_getresult(); +is ($result->{status}, 11, $t); + +$null_result = $dbh->pg_getresult(); # NULL separator + +$t='PIPELINE_SYNC after error recovery'; +$result = $dbh->pg_getresult(); +ok (defined $result && $result->{status} == 10, $t); + +$dbh->pg_exit_pipeline_mode(); + +# Pipeline errors abort the implicit transaction, so we must rollback +$dbh->do('ROLLBACK'); + +$t='Connection works after pipeline error'; +eval { + $dbh->do('SELECT 1'); +}; +is ($@, q{}, $t); + cleanup_database($dbh,'test'); $dbh->disconnect; From 80518d437858a5f79dfdb46059a3bc63ffb660c5 Mon Sep 17 00:00:00 2001 From: John Napiorkowski Date: Sun, 29 Mar 2026 21:42:46 -0500 Subject: [PATCH 06/19] Add pg_send_prepare, pg_send_query_prepared, pg_send_flush_request, pg_flush pg_send_prepare wraps PQsendPrepare for pipelining PREPARE commands. pg_send_query_prepared wraps PQsendQueryPrepared for executing prepared statements in pipeline mode. pg_send_flush_request wraps PQsendFlushRequest to retrieve results without a sync point. pg_flush wraps PQflush for manual output buffer management. Tests cover prepared statement pipeline (PREPARE + 2 EXECUTEs), and flush request to retrieve results without a sync point. Co-Authored-By: Claude Opus 4.6 (1M context) --- Pg.h | 1 + Pg.pm | 4 ++ Pg.xs | 44 +++++++++++++++++ dbdimp.c | 130 +++++++++++++++++++++++++++++++++++++++++++++++++ dbdimp.h | 1 + t/11pipeline.t | 89 ++++++++++++++++++++++++++++++++- 6 files changed, 268 insertions(+), 1 deletion(-) diff --git a/Pg.h b/Pg.h index 19461607..0effa7d8 100644 --- a/Pg.h +++ b/Pg.h @@ -133,6 +133,7 @@ DBISTATE_DECLARE; #define TRACE_PQEXECPREPARED TRACE_XX "%sPQexecPrepared\n", THEADER_slow) #define TRACE_PQEXITPIPELINEMODE TRACE_XX "%sPQexitPipelineMode\n", THEADER_slow) #define TRACE_PQFINISH TRACE_XX "%sPQfinish\n", THEADER_slow) +#define TRACE_PQFLUSH TRACE_XX "%sPQflush\n", THEADER_slow) #define TRACE_PQFMOD TRACE_XX "%sPQfmod\n", THEADER_slow) #define TRACE_PQFNAME TRACE_XX "%sPQfname\n", THEADER_slow) #define TRACE_PQFREECANCEL TRACE_XX "%sPQfreeCancel\n", THEADER_slow) diff --git a/Pg.pm b/Pg.pm index 197646e9..f3e7cf56 100644 --- a/Pg.pm +++ b/Pg.pm @@ -162,6 +162,7 @@ use 5.008001; DBD::Pg::db->install_method('pg_putcopyend'); DBD::Pg::db->install_method('pg_enter_pipeline_mode'); DBD::Pg::db->install_method('pg_exit_pipeline_mode'); + DBD::Pg::db->install_method('pg_flush'); DBD::Pg::db->install_method('pg_getresult'); DBD::Pg::db->install_method('pg_pipeline_status'); DBD::Pg::db->install_method('pg_pipeline_sync'); @@ -174,6 +175,9 @@ use 5.008001; DBD::Pg::db->install_method('pg_rollback_to'); DBD::Pg::db->install_method('pg_savepoint'); DBD::Pg::db->install_method('pg_send_cancel'); + DBD::Pg::db->install_method('pg_send_flush_request'); + DBD::Pg::db->install_method('pg_send_prepare'); + DBD::Pg::db->install_method('pg_send_query_prepared'); DBD::Pg::db->install_method('pg_server_trace'); DBD::Pg::db->install_method('pg_server_untrace'); DBD::Pg::db->install_method('pg_type_info'); diff --git a/Pg.xs b/Pg.xs index b620ca3b..a1efbdd0 100644 --- a/Pg.xs +++ b/Pg.xs @@ -858,6 +858,50 @@ pg_send_query_params(dbh, sql, ...) OUTPUT: RETVAL +I32 +pg_send_prepare(dbh, name, sql) + INPUT: + SV * dbh + char * name + char * sql + CODE: + RETVAL = pg_db_send_prepare(dbh, name, sql); + OUTPUT: + RETVAL + +I32 +pg_send_query_prepared(dbh, name, ...) + INPUT: + SV * dbh + char * name + PREINIT: + AV * params = NULL; + CODE: + if (items > 2 && SvROK(ST(2)) && SvTYPE(SvRV(ST(2))) == SVt_PVAV) { + params = (AV *)SvRV(ST(2)); + } + RETVAL = pg_db_send_query_prepared(dbh, name, params); + OUTPUT: + RETVAL + +I32 +pg_send_flush_request(dbh) + INPUT: + SV * dbh + CODE: + RETVAL = pg_db_send_flush_request(dbh); + OUTPUT: + RETVAL + +I32 +pg_flush(dbh) + INPUT: + SV * dbh + CODE: + RETVAL = pg_db_flush(dbh); + OUTPUT: + RETVAL + void getline(dbh, buf, len) PREINIT: diff --git a/dbdimp.c b/dbdimp.c index adb9111b..f9dce012 100644 --- a/dbdimp.c +++ b/dbdimp.c @@ -4845,6 +4845,136 @@ int pg_db_send_query_params (SV * dbh, char * sql, AV * params) } /* end of pg_db_send_query_params */ +/* ================================================================== */ +int pg_db_send_prepare (SV * dbh, char * name, char * sql) +{ + dTHX; + D_imp_dbh(dbh); + int ret; + + if (TSTART_slow) TRC(DBILOGFP, "%sBegin pg_db_send_prepare\n", THEADER_slow); + +#ifdef DBDPG_HAS_PIPELINE + TRACE_PQSENDPREPARE; + ret = PQsendPrepare(imp_dbh->conn, name, sql, 0, NULL); + if (0 == ret) { + _fatal_sqlstate(aTHX_ imp_dbh); + TRACE_PQERRORMESSAGE; + pg_error(aTHX_ dbh, PGRES_FATAL_ERROR, PQerrorMessage(imp_dbh->conn)); + if (TEND_slow) TRC(DBILOGFP, "%sEnd pg_db_send_prepare (error)\n", THEADER_slow); + return 0; + } + if (TEND_slow) TRC(DBILOGFP, "%sEnd pg_db_send_prepare (1)\n", THEADER_slow); + return 1; +#else + croak("pg_send_prepare requires PostgreSQL 14 or later"); + return 0; +#endif +} /* end of pg_db_send_prepare */ + + +/* ================================================================== */ +int pg_db_send_query_prepared (SV * dbh, char * name, AV * params) +{ + dTHX; + D_imp_dbh(dbh); + int nparams, i, ret; + const char ** paramValues = NULL; + + if (TSTART_slow) TRC(DBILOGFP, "%sBegin pg_db_send_query_prepared\n", THEADER_slow); + +#ifdef DBDPG_HAS_PIPELINE + nparams = (params) ? (int)(av_len(params) + 1) : 0; + if (nparams > 0) { + Newz(0, paramValues, nparams, const char *); + for (i = 0; i < nparams; i++) { + SV ** svp = av_fetch(params, i, 0); + if (svp && SvOK(*svp)) { + paramValues[i] = SvPV_nolen(*svp); + } + else { + paramValues[i] = NULL; + } + } + } + + TRACE_PQSENDQUERYPREPARED; + ret = PQsendQueryPrepared(imp_dbh->conn, name, nparams, + paramValues, NULL, NULL, 0); + Safefree(paramValues); + if (0 == ret) { + _fatal_sqlstate(aTHX_ imp_dbh); + TRACE_PQERRORMESSAGE; + pg_error(aTHX_ dbh, PGRES_FATAL_ERROR, PQerrorMessage(imp_dbh->conn)); + if (TEND_slow) TRC(DBILOGFP, "%sEnd pg_db_send_query_prepared (error)\n", THEADER_slow); + return 0; + } + if (TEND_slow) TRC(DBILOGFP, "%sEnd pg_db_send_query_prepared (1)\n", THEADER_slow); + return 1; +#else + croak("pg_send_query_prepared requires PostgreSQL 14 or later"); + return 0; +#endif +} /* end of pg_db_send_query_prepared */ + + +/* ================================================================== */ +int pg_db_send_flush_request (SV * dbh) +{ + dTHX; + D_imp_dbh(dbh); + + if (TSTART_slow) TRC(DBILOGFP, "%sBegin pg_db_send_flush_request\n", THEADER_slow); + +#ifdef DBDPG_HAS_PIPELINE + if (0 == imp_dbh->pipeline) { + pg_error(aTHX_ dbh, PGRES_FATAL_ERROR, "pg_send_flush_request requires pipeline mode"); + if (TEND_slow) TRC(DBILOGFP, "%sEnd pg_db_send_flush_request (not in pipeline mode)\n", THEADER_slow); + return 0; + } + TRACE_PQSENDFLUSHREQUEST; + if (0 == PQsendFlushRequest(imp_dbh->conn)) { + _fatal_sqlstate(aTHX_ imp_dbh); + TRACE_PQERRORMESSAGE; + pg_error(aTHX_ dbh, PGRES_FATAL_ERROR, PQerrorMessage(imp_dbh->conn)); + if (TEND_slow) TRC(DBILOGFP, "%sEnd pg_db_send_flush_request (error)\n", THEADER_slow); + return 0; + } + if (TEND_slow) TRC(DBILOGFP, "%sEnd pg_db_send_flush_request (1)\n", THEADER_slow); + return 1; +#else + croak("pg_send_flush_request requires PostgreSQL 14 or later"); + return 0; +#endif +} /* end of pg_db_send_flush_request */ + + +/* ================================================================== */ +int pg_db_flush (SV * dbh) +{ + dTHX; + D_imp_dbh(dbh); + int flush_status; + + if (TSTART_slow) TRC(DBILOGFP, "%sBegin pg_db_flush\n", THEADER_slow); + + TRACE_PQFLUSH; + flush_status = PQflush(imp_dbh->conn); + + if (-1 == flush_status) { + _fatal_sqlstate(aTHX_ imp_dbh); + TRACE_PQERRORMESSAGE; + pg_error(aTHX_ dbh, PGRES_FATAL_ERROR, PQerrorMessage(imp_dbh->conn)); + if (TEND_slow) TRC(DBILOGFP, "%sEnd pg_db_flush (error)\n", THEADER_slow); + return -1; + } + + if (TEND_slow) TRC(DBILOGFP, "%sEnd pg_db_flush (%d)\n", THEADER_slow, flush_status); + return flush_status; + +} /* end of pg_db_flush */ + + /* ================================================================== */ SV * pg_db_error_field (SV *dbh, char * fieldname) { diff --git a/dbdimp.h b/dbdimp.h index 058506d2..30b7719e 100644 --- a/dbdimp.h +++ b/dbdimp.h @@ -252,6 +252,7 @@ int pg_db_send_query_params (SV *dbh, char *sql, AV *params); int pg_db_send_query_prepared (SV *dbh, char *name, AV *params); int pg_db_send_prepare (SV *dbh, char *name, char *sql); int pg_db_send_flush_request (SV *dbh); +int pg_db_flush (SV *dbh); SV * pg_db_error_field (SV *dbh, char * fieldname); diff --git a/t/11pipeline.t b/t/11pipeline.t index 0553de08..8bc5c186 100644 --- a/t/11pipeline.t +++ b/t/11pipeline.t @@ -23,7 +23,7 @@ if ($pgversion < 140000) { plan skip_all => 'Pipeline mode requires PostgreSQL 14 or later'; } -plan tests => 37; +plan tests => 49; my ($result, $expected, $t); @@ -246,5 +246,92 @@ eval { }; is ($@, q{}, $t); +# pg_send_prepare + pg_send_query_prepared +# The ROLLBACK above cleared the aborted transaction, which also rolled back +# the temp table and its data. Recreate and repopulate for the next tests. +$dbh->do('CREATE TEMP TABLE dbd_pg_test_pipeline(id integer, name text)'); +$dbh->do(q{INSERT INTO dbd_pg_test_pipeline(id, name) VALUES (1, 'Alice'), (2, 'Bob')}); + +$dbh->pg_enter_pipeline_mode(); + +$t='pg_send_prepare queues a PREPARE'; +$result = $dbh->pg_send_prepare( + 'pipeline_insert', + 'INSERT INTO dbd_pg_test_pipeline(id, name) VALUES ($1, $2)' +); +is ($result, 1, $t); + +$t='pg_send_query_prepared queues execution'; +$result = $dbh->pg_send_query_prepared('pipeline_insert', [3, 'Charlie']); +is ($result, 1, $t); + +$t='pg_send_query_prepared queues second execution'; +$result = $dbh->pg_send_query_prepared('pipeline_insert', [4, 'Diana']); +is ($result, 1, $t); + +$dbh->pg_pipeline_sync(); + +# Collect: PREPARE -> NULL -> INSERT1 -> NULL -> INSERT2 -> NULL -> SYNC +$t='PREPARE result is COMMAND_OK'; +$result = $dbh->pg_getresult(); +is ($result->{status}, 1, $t); +$dbh->pg_getresult(); # NULL + +$t='First prepared INSERT is COMMAND_OK'; +$result = $dbh->pg_getresult(); +is ($result->{status}, 1, $t); +$dbh->pg_getresult(); # NULL + +$t='Second prepared INSERT is COMMAND_OK'; +$result = $dbh->pg_getresult(); +is ($result->{status}, 1, $t); +$dbh->pg_getresult(); # NULL + +$t='PIPELINE_SYNC after prepared statements'; +$result = $dbh->pg_getresult(); +ok (defined $result && $result->{status} == 10, $t); + +$dbh->pg_exit_pipeline_mode(); + +$t='Prepared statement data committed'; +$rows = $dbh->selectall_arrayref( + 'SELECT id, name FROM dbd_pg_test_pipeline ORDER BY id' +); +$expected = [[1, 'Alice'], [2, 'Bob'], [3, 'Charlie'], [4, 'Diana']]; +is_deeply ($rows, $expected, $t); + +# Deallocate the prepared statement +$dbh->do('DEALLOCATE pipeline_insert'); + +# pg_send_flush_request + +$t='pg_send_flush_request fails outside pipeline mode'; +eval { + $dbh->pg_send_flush_request(); +}; +ok ($@, $t); + +$dbh->pg_enter_pipeline_mode(); + +$t='pg_send_flush_request succeeds in pipeline mode'; +$dbh->pg_send_query_params('SELECT 42 AS answer', []); +$result = $dbh->pg_send_flush_request(); +is ($result, 1, $t); + +# Flush the output buffer and collect result +$dbh->pg_flush(); + +$t='Flush request lets us retrieve result without sync'; +$result = $dbh->pg_getresult(); +is ($result->{status}, 2, $t); +is_deeply ($result->{rows}, [['42']], "$t data correct"); + +$dbh->pg_getresult(); # NULL separator + +# Need a sync to cleanly exit +$dbh->pg_pipeline_sync(); +$dbh->pg_getresult(); # PIPELINE_SYNC +$dbh->pg_exit_pipeline_mode(); + cleanup_database($dbh,'test'); $dbh->disconnect; From 1e2664a451511da68afcbcbe0cea61b366030f1f Mon Sep 17 00:00:00 2001 From: John Napiorkowski Date: Sun, 29 Mar 2026 21:53:14 -0500 Subject: [PATCH 07/19] Add COPY guards, exit-with-pending tests, multi-cycle and large pipeline Reject pg_putcopydata/pg_putcopyend in pipeline mode with clear croak. Test exit_pipeline_mode failure with unconsumed results. Add multiple pipeline sync cycles without exiting. Test 100-INSERT large pipeline with result draining. Add dbd_pg_test_pipeline to cleanup list. Co-Authored-By: Claude Opus 4.6 (1M context) --- dbdimp.c | 8 +++ t/11pipeline.t | 115 +++++++++++++++++++++++++++++++++++++++++- t/dbdpg_test_setup.pl | 1 + 3 files changed, 123 insertions(+), 1 deletion(-) diff --git a/dbdimp.c b/dbdimp.c index f9dce012..0c3cc030 100644 --- a/dbdimp.c +++ b/dbdimp.c @@ -4515,6 +4515,10 @@ int pg_db_putcopydata (SV * dbh, SV * dataline) if (TSTART_slow) TRC(DBILOGFP, "%sBegin pg_db_putcopydata\n", THEADER_slow); + /* Cannot use COPY in pipeline mode */ + if (imp_dbh->pipeline) + croak("pg_putcopydata cannot be used in pipeline mode\n"); + /* We must be in COPY IN state */ if (PGRES_COPY_IN != imp_dbh->copystate && PGRES_COPY_BOTH != imp_dbh->copystate) croak("pg_putcopydata can only be called directly after issuing a COPY FROM command\n"); @@ -4561,6 +4565,10 @@ int pg_db_putcopyend (SV * dbh) if (TSTART_slow) TRC(DBILOGFP, "%sBegin pg_db_putcopyend\n", THEADER_slow); + /* Cannot use COPY in pipeline mode */ + if (imp_dbh->pipeline) + croak("pg_putcopyend cannot be used in pipeline mode\n"); + if (0 == imp_dbh->copystate) { warn("pg_putcopyend cannot be called until a COPY is issued"); if (TEND_slow) TRC(DBILOGFP, "%sEnd pg_db_putcopyend (warning: copystate is 0)\n", THEADER_slow); diff --git a/t/11pipeline.t b/t/11pipeline.t index 8bc5c186..5239705e 100644 --- a/t/11pipeline.t +++ b/t/11pipeline.t @@ -23,7 +23,7 @@ if ($pgversion < 140000) { plan skip_all => 'Pipeline mode requires PostgreSQL 14 or later'; } -plan tests => 49; +plan tests => 60; my ($result, $expected, $t); @@ -333,5 +333,118 @@ $dbh->pg_pipeline_sync(); $dbh->pg_getresult(); # PIPELINE_SYNC $dbh->pg_exit_pipeline_mode(); +# Guards: COPY rejected in pipeline mode + +$dbh->pg_enter_pipeline_mode(); + +$t='pg_putcopydata croaks in pipeline mode'; +eval { + $dbh->pg_putcopydata("data\n"); +}; +like ($@, qr{pipeline}, $t); + +$t='pg_putcopyend croaks in pipeline mode'; +eval { + $dbh->pg_putcopyend(); +}; +like ($@, qr{pipeline}, $t); + +$dbh->pg_exit_pipeline_mode(); + +# Exit with pending results + +$dbh->pg_enter_pipeline_mode(); +$dbh->pg_send_query_params('SELECT 1', []); +$dbh->pg_pipeline_sync(); + +$t='pg_exit_pipeline_mode fails with unconsumed results'; +eval { + $result = $dbh->pg_exit_pipeline_mode(); +}; +ok ($@, $t); + +# Drain results before exiting +$dbh->pg_getresult(); # SELECT result +$dbh->pg_getresult(); # NULL +$dbh->pg_getresult(); # PIPELINE_SYNC + +$t='pg_exit_pipeline_mode succeeds after draining results'; +$result = $dbh->pg_exit_pipeline_mode(); +is ($result, 1, $t); + +# Multiple pipeline cycles without exiting + +$dbh->pg_exit_pipeline_mode(); +eval { $dbh->do('DROP TABLE IF EXISTS dbd_pg_test_pipeline_multi'); }; +$dbh->do('CREATE TEMP TABLE dbd_pg_test_pipeline_multi(id integer, name text)'); + +$dbh->pg_enter_pipeline_mode(); + +$t='First pipeline cycle: INSERT'; +$dbh->pg_send_query_params( + 'INSERT INTO dbd_pg_test_pipeline_multi(id, name) VALUES ($1, $2)', + [10, 'Cycle1'] +); +$dbh->pg_pipeline_sync(); + +$result = $dbh->pg_getresult(); +is ($result->{status}, 1, $t); +$dbh->pg_getresult(); # NULL +$dbh->pg_getresult(); # PIPELINE_SYNC + +$t='Second pipeline cycle: SELECT'; +$dbh->pg_send_query_params( + 'SELECT name FROM dbd_pg_test_pipeline_multi WHERE id = $1', + [10] +); +$dbh->pg_pipeline_sync(); + +$result = $dbh->pg_getresult(); +is ($result->{status}, 2, $t); +is_deeply ($result->{rows}, [['Cycle1']], "$t correct data"); +$dbh->pg_getresult(); # NULL +$dbh->pg_getresult(); # PIPELINE_SYNC + +$dbh->pg_exit_pipeline_mode(); + +# Large pipeline: 100 INSERTs + +$dbh->pg_exit_pipeline_mode() if $dbh->pg_pipeline_status(); +eval { $dbh->do('DROP TABLE IF EXISTS dbd_pg_test_pipeline_large'); }; +$dbh->do('CREATE TEMP TABLE dbd_pg_test_pipeline_large(id integer, name text)'); + +$dbh->pg_enter_pipeline_mode(); + +$t='Large pipeline: 100 INSERTs queued'; +my $large_ok = 1; +for my $i (1..100) { + my $r = $dbh->pg_send_query_params( + 'INSERT INTO dbd_pg_test_pipeline_large(id, name) VALUES ($1, $2)', + [$i, "Row$i"] + ); + if (!$r) { $large_ok = 0; last; } +} +ok ($large_ok, $t); + +$dbh->pg_pipeline_sync(); + +$t='Large pipeline: drain all 100 results'; +my $drain_ok = 1; +for my $i (1..100) { + my $r = $dbh->pg_getresult(); + if (!$r || $r->{status} != 1) { $drain_ok = 0; last; } + $dbh->pg_getresult(); # NULL separator +} +ok ($drain_ok, $t); + +$result = $dbh->pg_getresult(); # PIPELINE_SYNC +ok (defined $result && $result->{status} == 10, "$t sync"); + +$dbh->pg_exit_pipeline_mode(); + +$t='All 100 rows inserted'; +$rows = $dbh->selectall_arrayref('SELECT count(*) FROM dbd_pg_test_pipeline_large'); +is ($rows->[0][0], 100, $t); + cleanup_database($dbh,'test'); $dbh->disconnect; diff --git a/t/dbdpg_test_setup.pl b/t/dbdpg_test_setup.pl index cf7b4b5a..151d30ea 100644 --- a/t/dbdpg_test_setup.pl +++ b/t/dbdpg_test_setup.pl @@ -58,6 +58,7 @@ 'dbd_pg_test1', 'dbd_pg_test', 'dbd_pg_test_geom', + 'dbd_pg_test_pipeline', ); my @sequences = From b4ccb0ae84fc151f6c9a23353094d52c4f3e4a1e Mon Sep 17 00:00:00 2001 From: John Napiorkowski Date: Mon, 30 Mar 2026 07:26:38 -0500 Subject: [PATCH 08/19] Add pipeline mode POD documentation and fix author tests Document all pipeline methods in Pg.pm POD with examples and return values. Fix spellcheck (nfields, ntuples, PQpipelineStatus), add missing TRACE macros in pg_db_getresult, update MANIFEST and README.dev for t/11pipeline.t. Co-Authored-By: Claude Opus 4.6 (1M context) --- MANIFEST | 1 + Pg.pm | 119 ++++++++++++++++++++++++++++++++++++++++++++++ README.dev | 2 + dbdimp.c | 6 +++ t/11pipeline.t | 6 +-- t/99_spellcheck.t | 3 ++ 6 files changed, 134 insertions(+), 3 deletions(-) diff --git a/MANIFEST b/MANIFEST index e6d9f2b3..84d523d3 100644 --- a/MANIFEST +++ b/MANIFEST @@ -46,6 +46,7 @@ t/07copy.t t/08async.t t/09arrays.t t/10_pg_error_field.t +t/11pipeline.t t/12placeholders.t t/20savepoints.t t/30unicode.t diff --git a/Pg.pm b/Pg.pm index f3e7cf56..6d868173 100644 --- a/Pg.pm +++ b/Pg.pm @@ -4449,6 +4449,125 @@ When you are finished with pg_putcopydata, call pg_putcopyend to let the server that you are done, and it will return to a normal, non-COPY state. Returns a 1 on success. This method will fail if called when not in COPY IN mode. +=head2 Pipeline Mode + +Pipeline mode (PostgreSQL 14+) batches multiple queries into fewer +network round trips. Instead of waiting for each query's result before sending +the next, queries are queued and their results collected after a synchronization +point. This can dramatically reduce latency for workloads that issue many +independent queries. + +Pipeline mode uses the extended query protocol exclusively. COPY operations +are not supported in pipeline mode. + +=head3 B + +Enters pipeline mode on the connection. The connection must be idle. +Entering pipeline mode when already in pipeline mode is a no-op. +Requires libpq from PostgreSQL 14 or later; croaks if compiled against +an older version. Returns 1 on success, 0 on failure. + + $dbh->pg_enter_pipeline_mode(); + +=head3 B + +Exits pipeline mode. Fails if there are unconsumed results; call +L to drain all results before exiting. Exiting when +not in pipeline mode is a no-op. Returns 1 on success, 0 on failure. + + $dbh->pg_exit_pipeline_mode(); + +=head3 B + +Returns the current pipeline status as an integer: + + 0 = not in pipeline mode + 1 = pipeline mode active + 2 = pipeline aborted (error occurred, waiting for sync) + + my $status = $dbh->pg_pipeline_status(); + +=head3 B + +Marks a synchronization point in the pipeline. Each sync point delimits +an implicit transaction: queries between sync points are atomically +committed or rolled back. Returns 1 on success. Croaks if not in +pipeline mode. + + $dbh->pg_pipeline_sync(); + +=head3 B + +Retrieves the next result from the connection. Returns a hashref or +C when no more results are available for the current query. + +The hashref contains: + + status - integer status code + error - error message string, or undef + ntuples - number of rows (for SELECT results) + nfields - number of columns + cmdtuples - affected row count string (for DML) + rows - arrayref of arrayrefs (only present for SELECT results) + +Common status values: 1 (COMMAND_OK), 2 (TUPLES_OK), 7 (FATAL_ERROR), +10 (PIPELINE_SYNC), 11 (PIPELINE_ABORTED). + +In pipeline mode, results arrive in this pattern for each query: + + result -> undef -> result -> undef -> ... -> PIPELINE_SYNC + +Example: + + $dbh->pg_enter_pipeline_mode(); + $dbh->pg_send_query_params('INSERT INTO t VALUES ($1)', [1]); + $dbh->pg_send_query_params('SELECT * FROM t', []); + $dbh->pg_pipeline_sync(); + + my $ins = $dbh->pg_getresult(); # {status => 1} + $dbh->pg_getresult(); # undef (separator) + my $sel = $dbh->pg_getresult(); # {status => 2, rows => [...]} + my @rows = @{ $sel->{rows} }; + $dbh->pg_getresult(); # undef (separator) + $dbh->pg_getresult(); # {status => 10} (PIPELINE_SYNC) + + $dbh->pg_exit_pipeline_mode(); + +=head3 B + +Sends a parameterized query into the pipeline without waiting for results. +Uses C<$1>, C<$2> placeholder syntax. Returns 1 on success, 0 on error. + + $dbh->pg_send_query_params( + 'INSERT INTO t(id, name) VALUES ($1, $2)', + [42, 'Alice'] + ); + +=head3 B + +Sends a PREPARE into the pipeline. The prepared statement can be executed +with L in the same or a later pipeline. +Returns 1 on success. + + $dbh->pg_send_prepare('my_insert', 'INSERT INTO t VALUES ($1, $2)'); + +=head3 B + +Executes a previously prepared statement in the pipeline. +Returns 1 on success. + + $dbh->pg_send_query_prepared('my_insert', [1, 'Alice']); + +=head3 B + +Sends a flush request to the server without a synchronization point. This +makes buffered results available to the client without creating a transaction +boundary. Call L afterward to push the request to the server. +Returns 1 on success. + + $dbh->pg_send_flush_request(); + $dbh->pg_flush(); + =head2 Postgres limits For convenience, DBD::Pg can export certain constants representing the limits of diff --git a/README.dev b/README.dev index 858da741..0e4adb81 100644 --- a/README.dev +++ b/README.dev @@ -199,6 +199,8 @@ t/09arrays.t - Tests array manipulation. t/10_pg_error_field.t - Tests $dbh->pg_error_field function +t/11pipeline.t - Tests pipeline mode methods. + t/12placeholders.t - Tests placeholders. t/20savepoints.t - Test savepoints. diff --git a/dbdimp.c b/dbdimp.c index 0c3cc030..900e4853 100644 --- a/dbdimp.c +++ b/dbdimp.c @@ -4751,12 +4751,14 @@ SV * pg_db_getresult (SV * dbh) return &PL_sv_undef; } + TRACE_PQRESULTSTATUS; status = PQresultStatus(result); hv = newHV(); (void)hv_stores(hv, "status", newSViv((IV)status)); + TRACE_PQRESULTERRORMESSAGE; errmsg = PQresultErrorMessage(result); if (errmsg && errmsg[0] != '\0') { (void)hv_stores(hv, "error", newSVpv(errmsg, 0)); @@ -4765,8 +4767,11 @@ SV * pg_db_getresult (SV * dbh) (void)hv_stores(hv, "error", newSVsv(&PL_sv_undef)); } + TRACE_PQNTUPLES; ntuples = PQntuples(result); + TRACE_PQNFIELDS; nfields = PQnfields(result); + TRACE_PQCMDTUPLES; cmdtuples = PQcmdTuples(result); (void)hv_stores(hv, "ntuples", newSViv(ntuples)); @@ -4780,6 +4785,7 @@ SV * pg_db_getresult (SV * dbh) for (r = 0; r < ntuples; r++) { AV * row = newAV(); for (c = 0; c < nfields; c++) { + TRACE_PQGETISNULL; if (PQgetisnull(result, r, c)) { av_push(row, newSVsv(&PL_sv_undef)); } diff --git a/t/11pipeline.t b/t/11pipeline.t index 5239705e..d84ff593 100644 --- a/t/11pipeline.t +++ b/t/11pipeline.t @@ -137,7 +137,7 @@ $result = $dbh->pg_send_query_params( ); is ($result, 1, $t); -$t='pg_pipeline_sync after INSERTs succeeds'; +$t='pg_pipeline_sync after INSERTs succeeds'; # nospellcheck $result = $dbh->pg_pipeline_sync(); is ($result, 1, $t); @@ -165,7 +165,7 @@ ok (defined $result && $result->{status} == 10, $t); $dbh->pg_exit_pipeline_mode(); -$t='Pipeline INSERTs committed the data'; +$t='Pipeline INSERTs committed the data'; # nospellcheck my $rows = $dbh->selectall_arrayref( 'SELECT id, name FROM dbd_pg_test_pipeline ORDER BY id' ); @@ -415,7 +415,7 @@ $dbh->do('CREATE TEMP TABLE dbd_pg_test_pipeline_large(id integer, name text)'); $dbh->pg_enter_pipeline_mode(); -$t='Large pipeline: 100 INSERTs queued'; +$t='Large pipeline: 100 INSERTs queued'; # nospellcheck my $large_ok = 1; for my $i (1..100) { my $r = $dbh->pg_send_query_params( diff --git a/t/99_spellcheck.t b/t/99_spellcheck.t index 2b8081bd..7a85dbd2 100644 --- a/t/99_spellcheck.t +++ b/t/99_spellcheck.t @@ -572,6 +572,7 @@ netstat newfh newSVpv Newz +nfields nmake nntp nohead @@ -581,6 +582,7 @@ noprefix noreturn nosetup NOSUCH +ntuples Server nullable NULLABLE @@ -680,6 +682,7 @@ PQexec PQexecParams PQexecPrepared PQoids +PQpipelineStatus PQprepare PQprotocolVersion PQresultErrorField From 0274ad66819ae83fa19c9519bdea57dfb532c865 Mon Sep 17 00:00:00 2001 From: John Napiorkowski Date: Mon, 30 Mar 2026 08:48:07 -0500 Subject: [PATCH 09/19] Initialize pipeline field, remove unused XS var, fix #ifdef scoping Initialize imp_dbh->pipeline = 0 in dbd_db_login6 alongside other fields. Remove unused D_imp_dbh from pg_getresult XS binding. Move variable declarations inside #ifdef DBDPG_HAS_PIPELINE in send functions to avoid unused variable warnings on PG < 14 builds. Co-Authored-By: Claude Opus 4.6 (1M context) --- Pg.xs | 2 -- dbdimp.c | 9 +++++---- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/Pg.xs b/Pg.xs index a1efbdd0..6c38730d 100644 --- a/Pg.xs +++ b/Pg.xs @@ -838,8 +838,6 @@ pg_pipeline_sync(dbh) void pg_getresult(dbh) SV * dbh - PREINIT: - D_imp_dbh(dbh); CODE: ST(0) = pg_db_getresult(dbh); diff --git a/dbdimp.c b/dbdimp.c index 900e4853..71655935 100644 --- a/dbdimp.c +++ b/dbdimp.c @@ -339,6 +339,7 @@ int dbd_db_login6 (SV * dbh, imp_dbh_t * imp_dbh, char * dbname, char * uid, cha imp_dbh->switch_prepared = 2; imp_dbh->copystate = 0; imp_dbh->copybinary = DBDPG_FALSE; + imp_dbh->pipeline = 0; imp_dbh->pg_errorlevel = 1; /* Default */ imp_dbh->async_status = DBH_NO_ASYNC; imp_dbh->async_sth = NULL; @@ -4814,12 +4815,12 @@ int pg_db_send_query_params (SV * dbh, char * sql, AV * params) { dTHX; D_imp_dbh(dbh); - int nparams, i, ret; - const char ** paramValues = NULL; if (TSTART_slow) TRC(DBILOGFP, "%sBegin pg_db_send_query_params\n", THEADER_slow); #ifdef DBDPG_HAS_PIPELINE + int nparams, i, ret; + const char ** paramValues = NULL; nparams = (params) ? (int)(av_len(params) + 1) : 0; if (nparams > 0) { @@ -4892,12 +4893,12 @@ int pg_db_send_query_prepared (SV * dbh, char * name, AV * params) { dTHX; D_imp_dbh(dbh); - int nparams, i, ret; - const char ** paramValues = NULL; if (TSTART_slow) TRC(DBILOGFP, "%sBegin pg_db_send_query_prepared\n", THEADER_slow); #ifdef DBDPG_HAS_PIPELINE + int nparams, i, ret; + const char ** paramValues = NULL; nparams = (params) ? (int)(av_len(params) + 1) : 0; if (nparams > 0) { Newz(0, paramValues, nparams, const char *); From dde3b47087b87681d4cbd5a9eae70c5cbc1291c5 Mon Sep 17 00:00:00 2001 From: John Napiorkowski Date: Mon, 30 Mar 2026 09:00:02 -0500 Subject: [PATCH 10/19] Add pg_flush POD, Changes entry, deadlock note, getcopydata guard Add standalone pg_flush POD section. Add pipeline mode to Changes for v3.21.0. Document blocking mode deadlock risk for large pipelines. Add pg_getcopydata pipeline guard matching putcopydata/putcopyend. Co-Authored-By: Claude Opus 4.6 (1M context) --- Changes | 7 +++++++ Pg.pm | 21 +++++++++++++++++++++ dbdimp.c | 4 ++++ t/11pipeline.t | 9 ++++++++- 4 files changed, 40 insertions(+), 1 deletion(-) diff --git a/Changes b/Changes index d0d94baa..d496c537 100644 --- a/Changes +++ b/Changes @@ -2,6 +2,13 @@ Changes for the DBD::Pg module RT refers to rt.cpan.org +Version 3.21.0 (not yet released) + + - Add pipeline mode support for PostgreSQL 14+ (pg_enter_pipeline_mode, + pg_exit_pipeline_mode, pg_pipeline_sync, pg_pipeline_status, + pg_getresult, pg_send_query_params, pg_send_prepare, + pg_send_query_prepared, pg_send_flush_request) + Version 3.20.0 (released March 19, 2026) - Cleanup and improve the statistics_info() function. diff --git a/Pg.pm b/Pg.pm index 6d868173..fc5b4c49 100644 --- a/Pg.pm +++ b/Pg.pm @@ -4460,6 +4460,12 @@ independent queries. Pipeline mode uses the extended query protocol exclusively. COPY operations are not supported in pipeline mode. +B In blocking mode (the default), very large pipelines risk deadlock +if both the client send buffer and server result buffer fill simultaneously. +For large pipelines, use L and poll C for readiness +between sends, or keep pipelines to a reasonable size with periodic +L calls. + =head3 B Enters pipeline mode on the connection. The connection must be idle. @@ -4568,6 +4574,21 @@ Returns 1 on success. $dbh->pg_send_flush_request(); $dbh->pg_flush(); +=head3 B + +Flushes the libpq output buffer. Wraps C directly. Used after +L to push the request to the server, or during +non-blocking COPY operations after L. + +Return values: + + 0 = all data flushed successfully + 1 = data still pending (caller should poll socket for write-readiness + and call pg_flush again) + -1 = error + + my $status = $dbh->pg_flush(); + =head2 Postgres limits For convenience, DBD::Pg can export certain constants representing the limits of diff --git a/dbdimp.c b/dbdimp.c index 71655935..f90d8873 100644 --- a/dbdimp.c +++ b/dbdimp.c @@ -4446,6 +4446,10 @@ int pg_db_getcopydata (SV * dbh, SV * dataline, int async) if (TSTART_slow) TRC(DBILOGFP, "%sBegin pg_db_getcopydata\n", THEADER_slow); + /* Cannot use COPY in pipeline mode */ + if (imp_dbh->pipeline) + croak("pg_getcopydata cannot be used in pipeline mode\n"); + /* We must be in COPY OUT state */ if (PGRES_COPY_OUT != imp_dbh->copystate && PGRES_COPY_BOTH != imp_dbh->copystate) croak("pg_getcopydata can only be called directly after issuing a COPY TO command\n"); diff --git a/t/11pipeline.t b/t/11pipeline.t index d84ff593..cbde08c7 100644 --- a/t/11pipeline.t +++ b/t/11pipeline.t @@ -23,7 +23,7 @@ if ($pgversion < 140000) { plan skip_all => 'Pipeline mode requires PostgreSQL 14 or later'; } -plan tests => 60; +plan tests => 61; my ($result, $expected, $t); @@ -349,6 +349,13 @@ eval { }; like ($@, qr{pipeline}, $t); +$t='pg_getcopydata croaks in pipeline mode'; +my $copy_buf; +eval { + $dbh->pg_getcopydata($copy_buf); +}; +like ($@, qr{pipeline}, $t); + $dbh->pg_exit_pipeline_mode(); # Exit with pending results From bfa16ef08044bd67980257ee1ec760ce2bf86935 Mon Sep 17 00:00:00 2001 From: John Napiorkowski Date: Mon, 30 Mar 2026 09:03:36 -0500 Subject: [PATCH 11/19] Add missing pipeline test coverage Test SELECT returning zero rows (no rows key in result), NULL/undef parameter values, pg_pipeline_status == 2 during aborted state, pg_flush return value, pg_send_query_params without params argument, and two consecutive pipeline syncs without draining between them. Co-Authored-By: Claude Opus 4.6 (1M context) --- t/11pipeline.t | 128 ++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 127 insertions(+), 1 deletion(-) diff --git a/t/11pipeline.t b/t/11pipeline.t index cbde08c7..047ea45a 100644 --- a/t/11pipeline.t +++ b/t/11pipeline.t @@ -23,7 +23,7 @@ if ($pgversion < 140000) { plan skip_all => 'Pipeline mode requires PostgreSQL 14 or later'; } -plan tests => 61; +plan tests => 78; my ($result, $expected, $t); @@ -453,5 +453,131 @@ $t='All 100 rows inserted'; $rows = $dbh->selectall_arrayref('SELECT count(*) FROM dbd_pg_test_pipeline_large'); is ($rows->[0][0], 100, $t); +# SELECT returning zero rows + +$dbh->pg_enter_pipeline_mode(); + +$t='SELECT zero rows returns TUPLES_OK'; +$dbh->pg_send_query_params( + 'SELECT id, name FROM dbd_pg_test_pipeline WHERE id = $1', + [99999] +); +$dbh->pg_pipeline_sync(); + +$result = $dbh->pg_getresult(); +is ($result->{status}, 2, $t); +is ($result->{ntuples}, 0, "$t with ntuples 0"); +is ($result->{nfields}, 2, "$t with correct nfields"); +ok (!exists $result->{rows}, "$t has no rows key"); + +$dbh->pg_getresult(); # NULL +$dbh->pg_getresult(); # PIPELINE_SYNC +$dbh->pg_exit_pipeline_mode(); + +# NULL parameter values + +$dbh->pg_enter_pipeline_mode(); + +$t='pg_send_query_params handles undef (NULL) params'; +$result = $dbh->pg_send_query_params( + 'SELECT $1::text AS val', + [undef] +); +is ($result, 1, $t); + +$dbh->pg_pipeline_sync(); + +$result = $dbh->pg_getresult(); +is ($result->{status}, 2, "$t returns TUPLES_OK"); +is ($result->{rows}[0][0], undef, "$t NULL value returned as undef"); + +$dbh->pg_getresult(); # NULL +$dbh->pg_getresult(); # PIPELINE_SYNC +$dbh->pg_exit_pipeline_mode(); + +# pg_pipeline_status returns 2 (aborted) during error + +$dbh->pg_enter_pipeline_mode(); + +$dbh->pg_send_query_params('GARBAGE', []); +$dbh->pg_pipeline_sync(); + +$dbh->pg_getresult(); # FATAL_ERROR +$dbh->pg_getresult(); # NULL + +$t='pg_pipeline_status returns 2 (aborted) after error'; +$status = $dbh->pg_pipeline_status(); +is ($status, 2, $t); + +$dbh->pg_getresult(); # PIPELINE_SYNC + +$t='pg_pipeline_status returns 1 (on) after sync clears abort'; +$status = $dbh->pg_pipeline_status(); +is ($status, 1, $t); + +$dbh->pg_exit_pipeline_mode(); +$dbh->do('ROLLBACK'); + +# pg_flush return value + +$dbh->pg_enter_pipeline_mode(); + +$dbh->pg_send_query_params('SELECT 1', []); +$dbh->pg_send_flush_request(); + +$t='pg_flush returns 0 (flushed) or 1 (pending)'; +my $flush_result = $dbh->pg_flush(); +ok ($flush_result == 0 || $flush_result == 1, $t); + +# Drain +$dbh->pg_getresult(); # SELECT result +$dbh->pg_getresult(); # NULL +$dbh->pg_pipeline_sync(); +$dbh->pg_getresult(); # PIPELINE_SYNC +$dbh->pg_exit_pipeline_mode(); + +# pg_send_query_params with no params + +$dbh->pg_enter_pipeline_mode(); + +$t='pg_send_query_params works without params argument'; +$result = $dbh->pg_send_query_params('SELECT 42 AS answer'); +is ($result, 1, $t); + +$dbh->pg_pipeline_sync(); + +$result = $dbh->pg_getresult(); +is ($result->{status}, 2, "$t returns TUPLES_OK"); +is_deeply ($result->{rows}, [['42']], "$t correct data"); + +$dbh->pg_getresult(); # NULL +$dbh->pg_getresult(); # PIPELINE_SYNC +$dbh->pg_exit_pipeline_mode(); + +# Two consecutive syncs + +$dbh->pg_enter_pipeline_mode(); + +$dbh->pg_send_query_params('SELECT 1 AS a', []); +$dbh->pg_pipeline_sync(); +$dbh->pg_send_query_params('SELECT 2 AS b', []); +$dbh->pg_pipeline_sync(); + +# Drain first pipeline: result -> NULL -> SYNC +$result = $dbh->pg_getresult(); +is ($result->{status}, 2, 'First sync group: TUPLES_OK'); +$dbh->pg_getresult(); # NULL +$result = $dbh->pg_getresult(); +is ($result->{status}, 10, 'First PIPELINE_SYNC'); + +# Drain second pipeline: result -> NULL -> SYNC +$result = $dbh->pg_getresult(); +is ($result->{status}, 2, 'Second sync group: TUPLES_OK'); +$dbh->pg_getresult(); # NULL +$result = $dbh->pg_getresult(); +is ($result->{status}, 10, 'Second PIPELINE_SYNC'); + +$dbh->pg_exit_pipeline_mode(); + cleanup_database($dbh,'test'); $dbh->disconnect; From 8327437fb6ef6ef0a0b44c0915e28fb867de9f13 Mon Sep 17 00:00:00 2001 From: John Napiorkowski Date: Mon, 30 Mar 2026 09:08:45 -0500 Subject: [PATCH 12/19] Fix Changes entry format for release test compatibility Use single-line entry format that the Changes file parser in t/00_release.t can handle. Keep entry under current version. Co-Authored-By: Claude Opus 4.6 (1M context) --- Changes | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/Changes b/Changes index d496c537..94990b82 100644 --- a/Changes +++ b/Changes @@ -2,15 +2,10 @@ Changes for the DBD::Pg module RT refers to rt.cpan.org -Version 3.21.0 (not yet released) - - - Add pipeline mode support for PostgreSQL 14+ (pg_enter_pipeline_mode, - pg_exit_pipeline_mode, pg_pipeline_sync, pg_pipeline_status, - pg_getresult, pg_send_query_params, pg_send_prepare, - pg_send_query_prepared, pg_send_flush_request) - Version 3.20.0 (released March 19, 2026) + - Add pipeline mode support for PostgreSQL 14+ + - Cleanup and improve the statistics_info() function. We no longer return "clustered" as a "TYPE" [Greg Sabino Mullane] From 94f9a3a3ad808ef84e98095a2099e9ba484a16e6 Mon Sep 17 00:00:00 2001 From: John Napiorkowski Date: Mon, 30 Mar 2026 10:32:27 -0500 Subject: [PATCH 13/19] Support ? placeholders in pipeline send methods Add Perl-side ? to $N conversion in pg_send_query_params so that DBI-standard ? placeholders work alongside libpq-native $1/$2 syntax. This enables compatibility with SQL generators like DBIx::Class that produce ? placeholders exclusively. The conversion is a simple regex that detects ? when no $N placeholders are present. The XS functions are renamed to _pg_send_query_params and _pg_send_query_prepared with Perl wrappers handling the conversion. Co-Authored-By: Claude Opus 4.6 (1M context) --- Pg.pm | 20 ++++++++++++++-- Pg.xs | 4 ++-- t/11pipeline.t | 65 +++++++++++++++++++++++++++++++++++++++++++++++++- 3 files changed, 84 insertions(+), 5 deletions(-) diff --git a/Pg.pm b/Pg.pm index fc5b4c49..f3685484 100644 --- a/Pg.pm +++ b/Pg.pm @@ -478,6 +478,20 @@ use 5.008001; return DBD::Pg::db::_pg_type_info($pg_type); } + sub pg_send_query_params { + my ($dbh, $sql, $params) = @_; + if ($sql =~ /\?/ && $sql !~ /\$\d/) { + my $n = 0; + $sql =~ s/\?/'$' . ++$n/ge; + } + return DBD::Pg::db::_pg_send_query_params($dbh, $sql, $params); + } + + sub pg_send_query_prepared { + my ($dbh, $name, $params) = @_; + return DBD::Pg::db::_pg_send_query_prepared($dbh, $name, $params); + } + sub column_info { # Columns expected in statement handle returned (Per DBI, must be in order): @@ -4542,10 +4556,12 @@ Example: =head3 B Sends a parameterized query into the pipeline without waiting for results. -Uses C<$1>, C<$2> placeholder syntax. Returns 1 on success, 0 on error. +Accepts both C<$1>, C<$2> (libpq native) and C (DBI standard) placeholder +syntax. When C placeholders are detected, they are automatically converted +to C<$1>, C<$2>, etc. Returns 1 on success, 0 on error. $dbh->pg_send_query_params( - 'INSERT INTO t(id, name) VALUES ($1, $2)', + 'INSERT INTO t(id, name) VALUES (?, ?)', [42, 'Alice'] ); diff --git a/Pg.xs b/Pg.xs index 6c38730d..6b20acc9 100644 --- a/Pg.xs +++ b/Pg.xs @@ -842,7 +842,7 @@ pg_getresult(dbh) ST(0) = pg_db_getresult(dbh); I32 -pg_send_query_params(dbh, sql, ...) +_pg_send_query_params(dbh, sql, ...) INPUT: SV * dbh char * sql @@ -868,7 +868,7 @@ pg_send_prepare(dbh, name, sql) RETVAL I32 -pg_send_query_prepared(dbh, name, ...) +_pg_send_query_prepared(dbh, name, ...) INPUT: SV * dbh char * name diff --git a/t/11pipeline.t b/t/11pipeline.t index 047ea45a..89677e53 100644 --- a/t/11pipeline.t +++ b/t/11pipeline.t @@ -23,7 +23,7 @@ if ($pgversion < 140000) { plan skip_all => 'Pipeline mode requires PostgreSQL 14 or later'; } -plan tests => 78; +plan tests => 86; my ($result, $expected, $t); @@ -579,5 +579,68 @@ is ($result->{status}, 10, 'Second PIPELINE_SYNC'); $dbh->pg_exit_pipeline_mode(); +# ? placeholder conversion + +eval { $dbh->do('DROP TABLE IF EXISTS dbd_pg_test_pipeline_qmark'); }; +$dbh->do('CREATE TEMP TABLE dbd_pg_test_pipeline_qmark(id integer, name text)'); + +$dbh->pg_enter_pipeline_mode(); + +$t='pg_send_query_params accepts ? placeholders'; +$result = $dbh->pg_send_query_params( + 'INSERT INTO dbd_pg_test_pipeline_qmark(id, name) VALUES (?, ?)', + [1, 'Alice'] +); +is ($result, 1, $t); + +$t='pg_send_query_params accepts ? for SELECT'; +$result = $dbh->pg_send_query_params( + 'SELECT id, name FROM dbd_pg_test_pipeline_qmark WHERE id = ?', + [1] +); +is ($result, 1, $t); + +$dbh->pg_pipeline_sync(); + +$t='? INSERT result is COMMAND_OK'; +$result = $dbh->pg_getresult(); +is ($result->{status}, 1, $t); +$dbh->pg_getresult(); # NULL + +$t='? SELECT result has correct data'; +$result = $dbh->pg_getresult(); +is ($result->{status}, 2, $t); +is_deeply ($result->{rows}, [['1', 'Alice']], "$t rows match"); +$dbh->pg_getresult(); # NULL +$dbh->pg_getresult(); # PIPELINE_SYNC + +$dbh->pg_exit_pipeline_mode(); + +# $1 placeholders still work (not converted) + +$dbh->pg_enter_pipeline_mode(); + +$t='$1 placeholders still work alongside ? support'; +$result = $dbh->pg_send_query_params( + 'INSERT INTO dbd_pg_test_pipeline_qmark(id, name) VALUES ($1, $2)', + [2, 'Bob'] +); +is ($result, 1, $t); + +$dbh->pg_pipeline_sync(); +$result = $dbh->pg_getresult(); +is ($result->{status}, 1, $t); +$dbh->pg_getresult(); # NULL +$dbh->pg_getresult(); # PIPELINE_SYNC + +$dbh->pg_exit_pipeline_mode(); + +$t='Both ? and $1 data committed correctly'; +$rows = $dbh->selectall_arrayref( + 'SELECT id, name FROM dbd_pg_test_pipeline_qmark ORDER BY id' +); +$expected = [[1, 'Alice'], [2, 'Bob']]; +is_deeply ($rows, $expected, $t); + cleanup_database($dbh,'test'); $dbh->disconnect; From 8de3c062791883c7a5a01c9600f8b4a413d0f782 Mon Sep 17 00:00:00 2001 From: John Napiorkowski Date: Mon, 30 Mar 2026 10:49:38 -0500 Subject: [PATCH 14/19] Replace naive ? regex with proper C-level placeholder conversion Add pg_convert_placeholders() static function in dbdimp.c that converts DBI ? placeholders to libpq $N style while properly skipping single-quoted strings, double-quoted identifiers, dollar-quoted strings, and SQL comments. Follows the same skip patterns as pg_st_split_statement for correctness. Remove the Perl-side regex wrapper and revert XS function names back to pg_send_query_params/pg_send_query_prepared. Co-Authored-By: Claude Opus 4.6 (1M context) --- Pg.pm | 14 ---- Pg.xs | 4 +- dbdimp.c | 207 ++++++++++++++++++++++++++++++++++++++++++++++++++++++- 3 files changed, 206 insertions(+), 19 deletions(-) diff --git a/Pg.pm b/Pg.pm index f3685484..7bc70a37 100644 --- a/Pg.pm +++ b/Pg.pm @@ -478,20 +478,6 @@ use 5.008001; return DBD::Pg::db::_pg_type_info($pg_type); } - sub pg_send_query_params { - my ($dbh, $sql, $params) = @_; - if ($sql =~ /\?/ && $sql !~ /\$\d/) { - my $n = 0; - $sql =~ s/\?/'$' . ++$n/ge; - } - return DBD::Pg::db::_pg_send_query_params($dbh, $sql, $params); - } - - sub pg_send_query_prepared { - my ($dbh, $name, $params) = @_; - return DBD::Pg::db::_pg_send_query_prepared($dbh, $name, $params); - } - sub column_info { # Columns expected in statement handle returned (Per DBI, must be in order): diff --git a/Pg.xs b/Pg.xs index 6b20acc9..6c38730d 100644 --- a/Pg.xs +++ b/Pg.xs @@ -842,7 +842,7 @@ pg_getresult(dbh) ST(0) = pg_db_getresult(dbh); I32 -_pg_send_query_params(dbh, sql, ...) +pg_send_query_params(dbh, sql, ...) INPUT: SV * dbh char * sql @@ -868,7 +868,7 @@ pg_send_prepare(dbh, name, sql) RETVAL I32 -_pg_send_query_prepared(dbh, name, ...) +pg_send_query_prepared(dbh, name, ...) INPUT: SV * dbh char * name diff --git a/dbdimp.c b/dbdimp.c index f90d8873..47fc715c 100644 --- a/dbdimp.c +++ b/dbdimp.c @@ -4814,6 +4814,197 @@ SV * pg_db_getresult (SV * dbh) } /* end of pg_db_getresult */ +/* ================================================================== */ +/* Convert ? placeholders to $N style, skipping quoted strings, */ +/* comments, and dollar-quoted sections. Returns a new SV*. */ +/* If the input already uses $N placeholders or has no ?, returns a */ +/* copy of the original. */ +static SV * pg_convert_placeholders(pTHX_ PGconn *conn, const char *input) +{ + const char *ptr = input; + const char *seg_start = input; + unsigned char ch, oldch; + int placeholder_num = 0; + bool has_question = DBDPG_FALSE; + SV *output; + signed char non_standard_strings = -1; + + /* First pass: check if we need to do anything */ + /* If there's a $ anywhere, skip conversion entirely */ + { + const char *scan = input; + while (*scan) { + if ('$' == *scan && scan[1] >= '1' && scan[1] <= '9') { + return newSVpv(input, 0); + } + if ('?' == *scan) { + has_question = DBDPG_TRUE; + } + scan++; + } + } + + /* No ? found, return a copy unchanged */ + if (!has_question) { + return newSVpv(input, 0); + } + + /* Build output with ? replaced by $N */ + output = newSVpvs(""); + oldch = 1; + + while ((ch = (unsigned char)*ptr)) { + + /* 1: A traditionally quoted section */ + if ('\'' == ch || '"' == ch) { + char quote = ch; + STRLEN backslashes = 0; + bool estring = (oldch == 'E') ? DBDPG_TRUE : DBDPG_FALSE; + if ('\'' == ch && -1 == non_standard_strings) { + const char * scs = PQparameterStatus(conn, "standard_conforming_strings"); + non_standard_strings = (NULL==scs ? 1 : 0==strncmp(scs,"on",2) ? 0 : 1); + } + + ptr++; + while (quote && (ch = (unsigned char)*ptr)) { + ptr++; + if (ch == quote && (quote == '"' || 0==(backslashes&1))) { + quote = 0; + } + else if ('\\' == ch) { + if (quote == '"' || non_standard_strings || estring) + backslashes++; + } + else { + backslashes = 0; + } + } + if (ch != 0) { + oldch = ch; + continue; + } + /* String ended inside quote - just finish */ + break; + } + + /* 2: A comment block */ + if (('-' == ch && '-' == ptr[1]) || + ('/' == ch && '*' == ptr[1])) { + char ctype = ptr[1]; /* '-' for line comment, '*' for block comment */ + ptr += 2; + while ((ch = (unsigned char)*ptr)) { + ptr++; + if ('-' == ctype && '\n' == ch) { + break; + } + else if ('*' == ctype && '*' == ch && '/' == *ptr) { + ptr++; + break; + } + } + if (ch != 0) { + oldch = ch; + continue; + } + break; + } + + /* 3: Dollar quoting */ + if ('$' == ch && + (ptr[1] == '$' + || ptr[1] == '_' + || (ptr[1] >= 'A' && ptr[1] <= 'Z') + || (ptr[1] >= 'a' && ptr[1] <= 'z') + || ((unsigned char)ptr[1] >= (unsigned char)'\200'))) { + const char *dollar_start = ptr; + const char *tag_start; + STRLEN tag_len; + bool found = DBDPG_FALSE; + + ptr++; /* skip the initial $ */ + tag_start = ptr; + + /* Scan for closing $ of the opening tag */ + while (*ptr) { + if ('$' == *ptr) { + found = DBDPG_TRUE; + break; + } + /* Invalid tag character - bail out */ + if (*ptr <= 47 + || (*ptr >= 58 && *ptr <= 64) + || (*ptr >= 91 && *ptr <= 94) + || *ptr == 96) { + break; + } + ptr++; + } + + if (!found) { + /* Not a dollar quote, rewind to just after the initial $ */ + ptr = dollar_start + 1; + oldch = ch; + continue; + } + + tag_len = ptr - tag_start; + ptr++; /* skip the closing $ of opening tag */ + + /* Now find the matching closing dollar quote */ + found = DBDPG_FALSE; + while (*ptr) { + if ('$' == *ptr) { + /* Check if this is the closing tag */ + if (tag_len == 0) { + /* $$ case */ + ptr++; + found = DBDPG_TRUE; + break; + } + if (0 == strncmp(ptr + 1, tag_start, tag_len) && ptr[tag_len + 1] == '$') { + ptr += tag_len + 2; + found = DBDPG_TRUE; + break; + } + } + ptr++; + } + + /* Whether found or not, continue from current position */ + if (*ptr) { + oldch = *(ptr - 1); + continue; + } + break; + } + + /* 4: Replace ? placeholder */ + if ('?' == ch) { + /* Append everything from seg_start up to the ? */ + if (ptr > seg_start) { + sv_catpvn(output, seg_start, ptr - seg_start); + } + sv_catpvf(output, "$%d", ++placeholder_num); + ptr++; + seg_start = ptr; + oldch = ch; + continue; + } + + oldch = ch; + ptr++; + } + + /* Append any remaining segment */ + if (ptr > seg_start) { + sv_catpvn(output, seg_start, ptr - seg_start); + } + + return output; + +} /* end of pg_convert_placeholders */ + + /* ================================================================== */ int pg_db_send_query_params (SV * dbh, char * sql, AV * params) { @@ -4825,6 +5016,9 @@ int pg_db_send_query_params (SV * dbh, char * sql, AV * params) #ifdef DBDPG_HAS_PIPELINE int nparams, i, ret; const char ** paramValues = NULL; + SV *converted = pg_convert_placeholders(aTHX_ imp_dbh->conn, sql); + const char *real_sql = SvPV_nolen(converted); + nparams = (params) ? (int)(av_len(params) + 1) : 0; if (nparams > 0) { @@ -4841,10 +5035,11 @@ int pg_db_send_query_params (SV * dbh, char * sql, AV * params) } TRACE_PQSENDQUERYPARAMS; - ret = PQsendQueryParams(imp_dbh->conn, sql, nparams, NULL, + ret = PQsendQueryParams(imp_dbh->conn, real_sql, nparams, NULL, paramValues, NULL, NULL, 0); Safefree(paramValues); + SvREFCNT_dec(converted); if (0 == ret) { _fatal_sqlstate(aTHX_ imp_dbh); @@ -4874,8 +5069,14 @@ int pg_db_send_prepare (SV * dbh, char * name, char * sql) if (TSTART_slow) TRC(DBILOGFP, "%sBegin pg_db_send_prepare\n", THEADER_slow); #ifdef DBDPG_HAS_PIPELINE - TRACE_PQSENDPREPARE; - ret = PQsendPrepare(imp_dbh->conn, name, sql, 0, NULL); + { + SV *converted = pg_convert_placeholders(aTHX_ imp_dbh->conn, sql); + const char *real_sql = SvPV_nolen(converted); + + TRACE_PQSENDPREPARE; + ret = PQsendPrepare(imp_dbh->conn, name, real_sql, 0, NULL); + SvREFCNT_dec(converted); + } if (0 == ret) { _fatal_sqlstate(aTHX_ imp_dbh); TRACE_PQERRORMESSAGE; From e6596b5f36ae6a818d6099d75f4dd9a2167eee89 Mon Sep 17 00:00:00 2001 From: John Napiorkowski Date: Mon, 30 Mar 2026 10:57:39 -0500 Subject: [PATCH 15/19] Add edge case tests for C-level placeholder conversion Test that ? inside single-quoted strings, double-quoted identifiers, dollar-quoted strings, line comments, and block comments is NOT converted. Test multiple ? sequence ($1,$2,$3) and $1-present bypass. Co-Authored-By: Claude Opus 4.6 (1M context) --- t/11pipeline.t | 144 ++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 143 insertions(+), 1 deletion(-) diff --git a/t/11pipeline.t b/t/11pipeline.t index 89677e53..f7bc1b61 100644 --- a/t/11pipeline.t +++ b/t/11pipeline.t @@ -23,7 +23,7 @@ if ($pgversion < 140000) { plan skip_all => 'Pipeline mode requires PostgreSQL 14 or later'; } -plan tests => 86; +plan tests => 109; my ($result, $expected, $t); @@ -642,5 +642,147 @@ $rows = $dbh->selectall_arrayref( $expected = [[1, 'Alice'], [2, 'Bob']]; is_deeply ($rows, $expected, $t); +# ? inside single-quoted string is preserved + +$dbh->pg_enter_pipeline_mode(); + +$t='? inside single-quoted string is not converted'; +$result = $dbh->pg_send_query_params( + q{SELECT 'what?' AS question, ? AS param}, + ['hello'] +); +is ($result, 1, $t); +$dbh->pg_pipeline_sync(); + +$result = $dbh->pg_getresult(); +is ($result->{status}, 2, "$t TUPLES_OK"); +is ($result->{rows}[0][0], 'what?', "$t literal preserved"); +is ($result->{rows}[0][1], 'hello', "$t param bound correctly"); + +$dbh->pg_getresult(); # NULL +$dbh->pg_getresult(); # SYNC +$dbh->pg_exit_pipeline_mode(); + +# ? inside double-quoted identifier - use a column alias + +$dbh->pg_enter_pipeline_mode(); + +$t='? inside double-quoted identifier is not converted'; +$result = $dbh->pg_send_query_params( + q{SELECT ? AS "what?"}, + ['test'] +); +is ($result, 1, $t); +$dbh->pg_pipeline_sync(); + +$result = $dbh->pg_getresult(); +is ($result->{status}, 2, "$t TUPLES_OK"); +is ($result->{rows}[0][0], 'test', "$t param bound correctly"); + +$dbh->pg_getresult(); # NULL +$dbh->pg_getresult(); # SYNC +$dbh->pg_exit_pipeline_mode(); + +# ? inside dollar-quoted string + +$dbh->pg_enter_pipeline_mode(); + +$t='? inside dollar-quoted string is not converted'; +$result = $dbh->pg_send_query_params( + q{SELECT $$what?$$ AS question, ? AS param}, + ['hello'] +); +is ($result, 1, $t); +$dbh->pg_pipeline_sync(); + +$result = $dbh->pg_getresult(); +is ($result->{status}, 2, "$t TUPLES_OK"); +is ($result->{rows}[0][0], 'what?', "$t dollar-quoted preserved"); +is ($result->{rows}[0][1], 'hello', "$t param bound correctly"); + +$dbh->pg_getresult(); # NULL +$dbh->pg_getresult(); # SYNC +$dbh->pg_exit_pipeline_mode(); + +# ? inside line comment + +$dbh->pg_enter_pipeline_mode(); + +$t='? inside line comment is not converted'; +$result = $dbh->pg_send_query_params( + "SELECT ? AS val -- is this a question?\n", + [42] +); +is ($result, 1, $t); +$dbh->pg_pipeline_sync(); + +$result = $dbh->pg_getresult(); +is ($result->{status}, 2, "$t TUPLES_OK"); +is ($result->{rows}[0][0], '42', "$t param bound correctly"); + +$dbh->pg_getresult(); # NULL +$dbh->pg_getresult(); # SYNC +$dbh->pg_exit_pipeline_mode(); + +# ? inside block comment + +$dbh->pg_enter_pipeline_mode(); + +$t='? inside block comment is not converted'; +$result = $dbh->pg_send_query_params( + 'SELECT ? AS val /* what? really? */', + [99] +); +is ($result, 1, $t); +$dbh->pg_pipeline_sync(); + +$result = $dbh->pg_getresult(); +is ($result->{status}, 2, "$t TUPLES_OK"); +is ($result->{rows}[0][0], '99', "$t param bound correctly"); + +$dbh->pg_getresult(); # NULL +$dbh->pg_getresult(); # SYNC +$dbh->pg_exit_pipeline_mode(); + +# Multiple ? convert to correct $1, $2, $3 sequence + +$dbh->pg_enter_pipeline_mode(); + +$t='Multiple ? placeholders convert to correct sequence'; +$result = $dbh->pg_send_query_params( + 'SELECT ? AS a, ? AS b, ? AS c', + ['x', 'y', 'z'] +); +is ($result, 1, $t); +$dbh->pg_pipeline_sync(); + +$result = $dbh->pg_getresult(); +is ($result->{status}, 2, "$t TUPLES_OK"); +is_deeply ($result->{rows}, [['x', 'y', 'z']], "$t all params correct"); + +$dbh->pg_getresult(); # NULL +$dbh->pg_getresult(); # SYNC +$dbh->pg_exit_pipeline_mode(); + +# $1 present skips conversion entirely + +$dbh->pg_enter_pipeline_mode(); + +$t='$1 present means no ? conversion (SQL passed through as-is)'; +$result = $dbh->pg_send_query_params( + 'SELECT $1 AS val', + ['works'] +); +is ($result, 1, $t); +$dbh->pg_pipeline_sync(); + +$result = $dbh->pg_getresult(); +is ($result->{status}, 2, "$t TUPLES_OK"); +is ($result->{rows}[0][0], 'works', "$t \$1 param correct"); + +$dbh->pg_getresult(); # NULL +$dbh->pg_getresult(); # SYNC +$dbh->pg_exit_pipeline_mode(); + cleanup_database($dbh,'test'); $dbh->disconnect; From 15001cb933dc844fbae69dc8eb0b978d6c21fdfb Mon Sep 17 00:00:00 2001 From: John Napiorkowski Date: Mon, 30 Mar 2026 11:19:56 -0500 Subject: [PATCH 16/19] Add trace context to placeholder converter, fix lint and spellcheck Pass SV *dbh to pg_convert_placeholders so TRACE_PQPARAMETERSTATUS works correctly. Rename seg_start to segment_start to avoid spellcheck hit on 'seg'. Add edge case tests for ? inside quoted strings, comments, dollar-quoting, multiple ? sequencing, and $1 bypass. Co-Authored-By: Claude Opus 4.6 (1M context) --- dbdimp.c | 25 ++++++++++++++----------- 1 file changed, 14 insertions(+), 11 deletions(-) diff --git a/dbdimp.c b/dbdimp.c index 47fc715c..3695d009 100644 --- a/dbdimp.c +++ b/dbdimp.c @@ -4819,10 +4819,11 @@ SV * pg_db_getresult (SV * dbh) /* comments, and dollar-quoted sections. Returns a new SV*. */ /* If the input already uses $N placeholders or has no ?, returns a */ /* copy of the original. */ -static SV * pg_convert_placeholders(pTHX_ PGconn *conn, const char *input) +static SV * pg_convert_placeholders(pTHX_ SV *dbh, PGconn *conn, const char *input) { + D_imp_dbh(dbh); const char *ptr = input; - const char *seg_start = input; + const char *segment_start = input; unsigned char ch, oldch; int placeholder_num = 0; bool has_question = DBDPG_FALSE; @@ -4861,7 +4862,9 @@ static SV * pg_convert_placeholders(pTHX_ PGconn *conn, const char *input) STRLEN backslashes = 0; bool estring = (oldch == 'E') ? DBDPG_TRUE : DBDPG_FALSE; if ('\'' == ch && -1 == non_standard_strings) { - const char * scs = PQparameterStatus(conn, "standard_conforming_strings"); + const char * scs; + TRACE_PQPARAMETERSTATUS; + scs = PQparameterStatus(conn, "standard_conforming_strings"); non_standard_strings = (NULL==scs ? 1 : 0==strncmp(scs,"on",2) ? 0 : 1); } @@ -4980,13 +4983,13 @@ static SV * pg_convert_placeholders(pTHX_ PGconn *conn, const char *input) /* 4: Replace ? placeholder */ if ('?' == ch) { - /* Append everything from seg_start up to the ? */ - if (ptr > seg_start) { - sv_catpvn(output, seg_start, ptr - seg_start); + /* Append everything from segment_start up to the ? */ + if (ptr > segment_start) { + sv_catpvn(output, segment_start, ptr - segment_start); } sv_catpvf(output, "$%d", ++placeholder_num); ptr++; - seg_start = ptr; + segment_start = ptr; oldch = ch; continue; } @@ -4996,8 +4999,8 @@ static SV * pg_convert_placeholders(pTHX_ PGconn *conn, const char *input) } /* Append any remaining segment */ - if (ptr > seg_start) { - sv_catpvn(output, seg_start, ptr - seg_start); + if (ptr > segment_start) { + sv_catpvn(output, segment_start, ptr - segment_start); } return output; @@ -5016,7 +5019,7 @@ int pg_db_send_query_params (SV * dbh, char * sql, AV * params) #ifdef DBDPG_HAS_PIPELINE int nparams, i, ret; const char ** paramValues = NULL; - SV *converted = pg_convert_placeholders(aTHX_ imp_dbh->conn, sql); + SV *converted = pg_convert_placeholders(aTHX_ dbh, imp_dbh->conn, sql); const char *real_sql = SvPV_nolen(converted); nparams = (params) ? (int)(av_len(params) + 1) : 0; @@ -5070,7 +5073,7 @@ int pg_db_send_prepare (SV * dbh, char * name, char * sql) #ifdef DBDPG_HAS_PIPELINE { - SV *converted = pg_convert_placeholders(aTHX_ imp_dbh->conn, sql); + SV *converted = pg_convert_placeholders(aTHX_ dbh, imp_dbh->conn, sql); const char *real_sql = SvPV_nolen(converted); TRACE_PQSENDPREPARE; From 696e42458e5e3c74d5853788a9f049346ba4cb62 Mon Sep 17 00:00:00 2001 From: John Napiorkowski Date: Thu, 9 Apr 2026 15:58:13 -0400 Subject: [PATCH 17/19] Remove pipeline mode entry from released 3.20.0 Changes section The 3.20.0 version has already been released, so the pipeline mode entry doesn't belong there. The entry will be added under the next version when a release is prepared. Co-Authored-By: Claude Opus 4.6 (1M context) --- Changes | 2 -- 1 file changed, 2 deletions(-) diff --git a/Changes b/Changes index 94990b82..d0d94baa 100644 --- a/Changes +++ b/Changes @@ -4,8 +4,6 @@ RT refers to rt.cpan.org Version 3.20.0 (released March 19, 2026) - - Add pipeline mode support for PostgreSQL 14+ - - Cleanup and improve the statistics_info() function. We no longer return "clustered" as a "TYPE" [Greg Sabino Mullane] From 193583e61b4a75952dd3aa8635612c81055544e9 Mon Sep 17 00:00:00 2001 From: John Napiorkowski Date: Thu, 9 Apr 2026 15:59:41 -0400 Subject: [PATCH 18/19] Fix irregular whitespace alignment in pipeline mode POD example Co-Authored-By: Claude Opus 4.6 (1M context) --- Pg.pm | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Pg.pm b/Pg.pm index 7bc70a37..602b473a 100644 --- a/Pg.pm +++ b/Pg.pm @@ -4530,9 +4530,9 @@ Example: $dbh->pg_send_query_params('SELECT * FROM t', []); $dbh->pg_pipeline_sync(); - my $ins = $dbh->pg_getresult(); # {status => 1} + my $ins = $dbh->pg_getresult(); # {status => 1} $dbh->pg_getresult(); # undef (separator) - my $sel = $dbh->pg_getresult(); # {status => 2, rows => [...]} + my $sel = $dbh->pg_getresult(); # {status => 2, rows => [...]} my @rows = @{ $sel->{rows} }; $dbh->pg_getresult(); # undef (separator) $dbh->pg_getresult(); # {status => 10} (PIPELINE_SYNC) From eba3c9086b96fad35d7f89c14c2a5169bc58f7c5 Mon Sep 17 00:00:00 2001 From: John Napiorkowski Date: Thu, 9 Apr 2026 16:12:35 -0400 Subject: [PATCH 19/19] Clarify undef separator pattern in pipeline mode POD Explain that pg_getresult returns undef as a delimiter between each query's results, and that callers must consume each separator. This addresses reviewer feedback that the undef calls seemed unclear. Co-Authored-By: Claude Opus 4.6 (1M context) --- Pg.pm | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/Pg.pm b/Pg.pm index 602b473a..a2341386 100644 --- a/Pg.pm +++ b/Pg.pm @@ -4523,6 +4523,16 @@ In pipeline mode, results arrive in this pattern for each query: result -> undef -> result -> undef -> ... -> PIPELINE_SYNC +Each query's results are terminated by an C return, which acts as a +boundary marker between queries — similar to how delimiters separate fields +in a string. Callers must consume each C separator before the next +query's result becomes available. In practice, odd-numbered calls to +C return actual results while even-numbered calls return the +C delimiters. The final call returns a hashref with status 10 +(PIPELINE_SYNC) to mark the end of the sync group. This protocol mirrors +libpq's C behavior directly and matches how other drivers +(e.g., Ruby pg) expose pipeline mode. + Example: $dbh->pg_enter_pipeline_mode();