From 940df53fdbeab07987c3219803f853e2f2ba787d Mon Sep 17 00:00:00 2001 From: John Napiorkowski Date: Tue, 17 Mar 2026 14:24:45 -0500 Subject: [PATCH 01/12] Add non-blocking async COPY FROM support Add pg_putcopydata_async, pg_putcopyend_async, and pg_flush methods to enable non-blocking COPY FROM STDIN for async Perl libraries. pg_putcopydata_async enables PQsetnonblocking on the connection (safe during COPY state since no other operations are permitted), then uses PQflush to manage the output buffer. Returns 0 when buffer is full or 2 when flush is pending, allowing the caller to poll the socket and retry without blocking the event loop. pg_putcopyend_async sends PQputCopyEnd and polls for the server result using PQconsumeInput/PQisBusy. Uses copystate=-1 as a sentinel to track the "end sent, awaiting result" phase across retries. Restores blocking mode automatically on completion. pg_flush exposes PQflush for callers that need to complete a pending flush between putcopydata_async or putcopyend_async calls. The blocking pg_putcopydata is unchanged (passes async=0 internally). All 3,763 existing tests continue to pass with zero regressions. 
Co-Authored-By: Claude Opus 4.6 (1M context) --- Pg.h | 2 + Pg.pm | 62 ++++++++++++++- Pg.xs | 30 ++++++- dbdimp.c | 229 ++++++++++++++++++++++++++++++++++++++++++++++++++--- dbdimp.h | 7 +- t/07copy.t | 167 +++++++++++++++++++++++++++++++++++++- 6 files changed, 482 insertions(+), 15 deletions(-) diff --git a/Pg.h b/Pg.h index 1b83e194..2ce97c3a 100644 --- a/Pg.h +++ b/Pg.h @@ -157,8 +157,10 @@ DBISTATE_DECLARE; #define TRACE_PQPORT TRACE_XX "%sPQport\n", THEADER_slow) #define TRACE_PQPREPARE TRACE_XX "%sPQprepare\n", THEADER_slow) #define TRACE_PQPROTOCOLVERSION TRACE_XX "%sPQprotocolVersion\n", THEADER_slow) +#define TRACE_PQFLUSH TRACE_XX "%sPQflush\n", THEADER_slow) #define TRACE_PQPUTCOPYDATA TRACE_XX "%sPQputCopyData\n", THEADER_slow) #define TRACE_PQPUTCOPYEND TRACE_XX "%sPQputCopyEnd\n", THEADER_slow) +#define TRACE_PQSETNONBLOCKING TRACE_XX "%sPQsetnonblocking\n", THEADER_slow) #define TRACE_PQRESULTERRORFIELD TRACE_XX "%sPQresultErrorField\n", THEADER_slow) #define TRACE_PQRESULTERRORMESSAGE TRACE_XX "%sPQresultErrorMessage\n", THEADER_slow) #define TRACE_PQRESULTSTATUS TRACE_XX "%sPQresultStatus\n", THEADER_slow) diff --git a/Pg.pm b/Pg.pm index 2cf4670b..7d6f90af 100644 --- a/Pg.pm +++ b/Pg.pm @@ -158,8 +158,11 @@ use 5.008001; DBD::Pg::db->install_method('pg_getcopydata'); DBD::Pg::db->install_method('pg_getcopydata_async'); DBD::Pg::db->install_method('pg_notifies'); + DBD::Pg::db->install_method('pg_flush'); DBD::Pg::db->install_method('pg_putcopydata'); + DBD::Pg::db->install_method('pg_putcopydata_async'); DBD::Pg::db->install_method('pg_putcopyend'); + DBD::Pg::db->install_method('pg_putcopyend_async'); DBD::Pg::db->install_method('pg_ping'); DBD::Pg::db->install_method('pg_putline'); DBD::Pg::db->install_method('pg_ready'); @@ -4424,12 +4427,67 @@ the COPY statement. Returns a 1 on successful input. 
Examples: $dbh->pg_putcopydata("Anchovies~6\n"); $dbh->pg_putcopyend(); +=head3 B<pg_putcopydata_async> + +Non-blocking version of pg_putcopydata for use by async libraries. When called, the +connection is switched into non-blocking mode (via C<PQsetnonblocking>), which is safe +because no other operations are permitted during a COPY. The non-blocking mode is +automatically restored to blocking when L</pg_putcopyend_async> completes. + +Return values: + + 1 = data queued and flushed successfully + 2 = data queued but output buffer not fully flushed; caller should + poll the socket for write-readiness and call L</pg_flush> + 0 = output buffer full; caller should poll the socket for + write-readiness and retry the same pg_putcopydata_async call + -1 = error + +Example usage with an event loop: + + $dbh->do("COPY mytable FROM STDIN"); + my $status = $dbh->pg_putcopydata_async("123\tPepperoni\t3\n"); + if ($status == 0 || $status == 2) { + # poll $dbh->{pg_socket} for write-readiness, then: + # if $status == 2: call $dbh->pg_flush + # if $status == 0: retry the pg_putcopydata_async call + } + =head3 B<pg_putcopyend> -When you are finished with pg_putcopydata, call pg_putcopyend to let the server know -that you are done, and it will return to a normal, non-COPY state. Returns a 1 on +When you are finished with pg_putcopydata, call pg_putcopyend to let the server know +that you are done, and it will return to a normal, non-COPY state. Returns a 1 on success. This method will fail if called when not in COPY IN mode. +=head3 B<pg_putcopyend_async> + +Non-blocking version of pg_putcopyend for use by async libraries. Sends the COPY +end marker and attempts to collect the server result without blocking. 
+ +Return values: + + 1 = COPY completed successfully, connection is back in normal blocking mode + 2 = COPY end marker sent but output buffer not fully flushed; caller should + poll the socket for write-readiness and call L, then retry + 0 = server result not ready yet; caller should poll the socket for + read-readiness, then call pg_putcopyend_async again + -1 = error + +After pg_putcopyend_async returns 1, the connection is back in blocking mode and +normal queries can be issued. + +=head3 B + +Flushes the libpq output buffer. Used during non-blocking COPY operations when +L or L return 2 (flush pending). + +Return values: + + 0 = all data flushed successfully + 1 = data still pending; caller should poll the socket for write-readiness + and call pg_flush again + -1 = error + =head2 Postgres limits For convenience, DBD::Pg can export certain constants representing the limits of diff --git a/Pg.xs b/Pg.xs index 1f856d10..96b6209d 100644 --- a/Pg.xs +++ b/Pg.xs @@ -786,7 +786,17 @@ pg_putcopydata(dbh, dataline) SV * dbh SV * dataline CODE: - RETVAL = pg_db_putcopydata(dbh, dataline); + RETVAL = pg_db_putcopydata(dbh, dataline, 0); + OUTPUT: + RETVAL + +I32 +pg_putcopydata_async(dbh, dataline) + INPUT: + SV * dbh + SV * dataline + CODE: + RETVAL = pg_db_putcopydata(dbh, dataline, 1); OUTPUT: RETVAL @@ -799,6 +809,24 @@ pg_putcopyend(dbh) OUTPUT: RETVAL +I32 +pg_putcopyend_async(dbh) + INPUT: + SV * dbh + CODE: + RETVAL = pg_db_putcopyend_async(dbh); + OUTPUT: + RETVAL + +I32 +pg_flush(dbh) + INPUT: + SV * dbh + CODE: + RETVAL = pg_db_flush(dbh); + OUTPUT: + RETVAL + void getline(dbh, buf, len) PREINIT: diff --git a/dbdimp.c b/dbdimp.c index cd6e0937..27e200b5 100644 --- a/dbdimp.c +++ b/dbdimp.c @@ -4505,7 +4505,7 @@ int pg_db_getcopydata (SV * dbh, SV * dataline, int async) /* ================================================================== */ -int pg_db_putcopydata (SV * dbh, SV * dataline) +int pg_db_putcopydata (SV * dbh, SV * dataline, int async) { dTHX; 
D_imp_dbh(dbh); @@ -4513,12 +4513,25 @@ int pg_db_putcopydata (SV * dbh, SV * dataline) const char *copydata; STRLEN copylen; - if (TSTART_slow) TRC(DBILOGFP, "%sBegin pg_db_putcopydata\n", THEADER_slow); + if (TSTART_slow) TRC(DBILOGFP, "%sBegin pg_db_putcopydata (async: %d)\n", THEADER_slow, async); /* We must be in COPY IN state */ if (PGRES_COPY_IN != imp_dbh->copystate && PGRES_COPY_BOTH != imp_dbh->copystate) croak("pg_putcopydata can only be called directly after issuing a COPY FROM command\n"); + /* Enable non-blocking mode for async callers (safe during COPY: no other ops allowed) */ + if (async && !imp_dbh->copy_nonblocking) { + TRACE_PQSETNONBLOCKING; + if (PQsetnonblocking(imp_dbh->conn, 1) != 0) { + _fatal_sqlstate(aTHX_ imp_dbh); + TRACE_PQERRORMESSAGE; + pg_error(aTHX_ dbh, PGRES_FATAL_ERROR, PQerrorMessage(imp_dbh->conn)); + if (TEND_slow) TRC(DBILOGFP, "%sEnd pg_db_putcopydata (error: setnonblocking)\n", THEADER_slow); + return -1; + } + imp_dbh->copy_nonblocking = 1; + } + if (imp_dbh->pg_utf8_flag && !imp_dbh->copybinary) copydata = SvPVutf8(dataline, copylen); else @@ -4528,23 +4541,39 @@ int pg_db_putcopydata (SV * dbh, SV * dataline) copystatus = PQputCopyData(imp_dbh->conn, copydata, copylen); if (1 == copystatus) { - if (PGRES_COPY_BOTH == imp_dbh->copystate && PQflush(imp_dbh->conn)) { - _fatal_sqlstate(aTHX_ imp_dbh); - TRACE_PQERRORMESSAGE; - pg_error(aTHX_ dbh, PGRES_FATAL_ERROR, PQerrorMessage(imp_dbh->conn)); + if (async || PGRES_COPY_BOTH == imp_dbh->copystate) { + int flush_status; + TRACE_PQFLUSH; + flush_status = PQflush(imp_dbh->conn); + if (-1 == flush_status) { + _fatal_sqlstate(aTHX_ imp_dbh); + TRACE_PQERRORMESSAGE; + pg_error(aTHX_ dbh, PGRES_FATAL_ERROR, PQerrorMessage(imp_dbh->conn)); + if (TEND_slow) TRC(DBILOGFP, "%sEnd pg_db_putcopydata (error: flush)\n", THEADER_slow); + return -1; + } + /* flush_status 1 means data still pending in output buffer */ + if (async && 1 == flush_status) { + if (TEND_slow) TRC(DBILOGFP, 
"%sEnd pg_db_putcopydata (flush pending)\n", THEADER_slow); + return 2; + } } + if (TEND_slow) TRC(DBILOGFP, "%sEnd pg_db_putcopydata (1)\n", THEADER_slow); + return 1; } - else if (0 == copystatus) { /* non-blocking mode only */ + else if (0 == copystatus) { + /* Non-blocking mode: output buffer full, caller should wait for write-ready */ + if (TEND_slow) TRC(DBILOGFP, "%sEnd pg_db_putcopydata (buffer full)\n", THEADER_slow); + return 0; } else { _fatal_sqlstate(aTHX_ imp_dbh); TRACE_PQERRORMESSAGE; pg_error(aTHX_ dbh, PGRES_FATAL_ERROR, PQerrorMessage(imp_dbh->conn)); + if (TEND_slow) TRC(DBILOGFP, "%sEnd pg_db_putcopydata (error)\n", THEADER_slow); + return -1; } - if (TEND_slow) TRC(DBILOGFP, "%sEnd pg_db_putcopydata\n", THEADER_slow); - return copystatus == 1 ? 1 : 0; - } /* end of pg_db_putcopydata */ @@ -4615,6 +4644,186 @@ int pg_db_putcopyend (SV * dbh) } /* end of pg_db_putcopyend */ +/* ================================================================== */ +/* + Non-blocking version of pg_db_putcopyend. + Sends the COPY end marker and attempts to collect the server result + without blocking. Restores blocking mode on completion. + + This function is designed to be called repeatedly in a poll loop: + - First call: sends PQputCopyEnd, attempts flush and result check + - Subsequent calls (when returning 0): polls for the server result + + Returns: + 1 = COPY completed successfully, connection is back in normal mode + 0 = server result not ready yet (caller should poll socket for read-ready) + 2 = flush pending (caller should poll socket for write-ready, then call pg_db_flush) + -1 = error +*/ +int pg_db_putcopyend_async (SV * dbh) +{ + dTHX; + D_imp_dbh(dbh); + + if (TSTART_slow) TRC(DBILOGFP, "%sBegin pg_db_putcopyend_async (copystate: %d)\n", + THEADER_slow, imp_dbh->copystate); + + /* + copystate == -1 means we already sent PQputCopyEnd on a prior call + and are waiting for the server result. Skip straight to result polling. 
+ */ + if (-1 == imp_dbh->copystate) { + goto poll_result; + } + + if (0 == imp_dbh->copystate) { + warn("pg_putcopyend_async cannot be called until a COPY is issued"); + if (TEND_slow) TRC(DBILOGFP, "%sEnd pg_db_putcopyend_async (warning: copystate is 0)\n", THEADER_slow); + return -1; + } + + if (PGRES_COPY_OUT == imp_dbh->copystate) { + warn("pg_putcopyend_async does not need to be called when using pg_getcopydata"); + if (TEND_slow) TRC(DBILOGFP, "%sEnd pg_db_putcopyend_async (warning: copy state is OUT)\n", THEADER_slow); + return -1; + } + + /* Must be PGRES_COPY_IN or PGRES_COPY_BOTH at this point */ + { + int copystatus; + + TRACE_PQPUTCOPYEND; + copystatus = PQputCopyEnd(imp_dbh->conn, NULL); + + if (1 == copystatus) { + int flush_status; + + /* Mark that PQputCopyEnd has been sent, awaiting result */ + imp_dbh->copystate = -1; + + /* Flush the end marker to the server */ + TRACE_PQFLUSH; + flush_status = PQflush(imp_dbh->conn); + if (-1 == flush_status) { + imp_dbh->copystate = 0; + _fatal_sqlstate(aTHX_ imp_dbh); + TRACE_PQERRORMESSAGE; + pg_error(aTHX_ dbh, PGRES_FATAL_ERROR, PQerrorMessage(imp_dbh->conn)); + if (TEND_slow) TRC(DBILOGFP, "%sEnd pg_db_putcopyend_async (error: flush)\n", THEADER_slow); + return -1; + } + if (1 == flush_status) { + /* Data pending in output buffer, caller should poll for write-ready */ + if (TEND_slow) TRC(DBILOGFP, "%sEnd pg_db_putcopyend_async (flush pending)\n", THEADER_slow); + return 2; + } + + /* Fall through to result polling */ + } + else if (0 == copystatus) { + /* Non-blocking mode: buffer full, caller should poll for write-ready */ + if (TEND_slow) TRC(DBILOGFP, "%sEnd pg_db_putcopyend_async (buffer full)\n", THEADER_slow); + return 0; + } + else { + imp_dbh->copystate = 0; + _fatal_sqlstate(aTHX_ imp_dbh); + TRACE_PQERRORMESSAGE; + pg_error(aTHX_ dbh, PGRES_FATAL_ERROR, PQerrorMessage(imp_dbh->conn)); + if (TEND_slow) TRC(DBILOGFP, "%sEnd pg_db_putcopyend_async (error)\n", THEADER_slow); + return -1; + } + } + 
+poll_result: + /* Check if the server result is ready (non-blocking) */ + { + PGresult * result; + ExecStatusType status; + + TRACE_PQCONSUMEINPUT; + if (!PQconsumeInput(imp_dbh->conn)) { + imp_dbh->copystate = 0; + _fatal_sqlstate(aTHX_ imp_dbh); + TRACE_PQERRORMESSAGE; + pg_error(aTHX_ dbh, PGRES_FATAL_ERROR, PQerrorMessage(imp_dbh->conn)); + if (TEND_slow) TRC(DBILOGFP, "%sEnd pg_db_putcopyend_async (error: consumeInput)\n", THEADER_slow); + return -1; + } + + TRACE_PQISBUSY; + if (PQisBusy(imp_dbh->conn)) { + /* Server hasn't replied yet, caller should poll for read-ready */ + if (TEND_slow) TRC(DBILOGFP, "%sEnd pg_db_putcopyend_async (result pending)\n", THEADER_slow); + return 0; + } + + /* Result is ready. Restore blocking mode and drain results. */ + if (imp_dbh->copy_nonblocking) { + TRACE_PQSETNONBLOCKING; + PQsetnonblocking(imp_dbh->conn, 0); + imp_dbh->copy_nonblocking = 0; + } + + imp_dbh->copystate = 0; + TRACE_PQGETRESULT; + result = PQgetResult(imp_dbh->conn); + status = _sqlstate(aTHX_ imp_dbh, result); + while (result != NULL) { + TRACE_PQCLEAR; + PQclear(result); + TRACE_PQGETRESULT; + result = PQgetResult(imp_dbh->conn); + } + + if (PGRES_COMMAND_OK != status) { + TRACE_PQERRORMESSAGE; + pg_error(aTHX_ dbh, status, PQerrorMessage(imp_dbh->conn)); + if (TEND_slow) TRC(DBILOGFP, "%sEnd pg_db_putcopyend_async (error: status not OK)\n", THEADER_slow); + return -1; + } + + if (TEND_slow) TRC(DBILOGFP, "%sEnd pg_db_putcopyend_async (1)\n", THEADER_slow); + return 1; + } + +} /* end of pg_db_putcopyend_async */ + + +/* ================================================================== */ +/* + Flush the libpq output buffer. Used during non-blocking COPY operations. 
+ + Returns: + 0 = all data flushed successfully + 1 = data still pending (caller should poll for write-ready and call again) + -1 = error +*/ +int pg_db_flush (SV * dbh) +{ + dTHX; + D_imp_dbh(dbh); + int flush_status; + + if (TSTART_slow) TRC(DBILOGFP, "%sBegin pg_db_flush\n", THEADER_slow); + + TRACE_PQFLUSH; + flush_status = PQflush(imp_dbh->conn); + + if (-1 == flush_status) { + _fatal_sqlstate(aTHX_ imp_dbh); + TRACE_PQERRORMESSAGE; + pg_error(aTHX_ dbh, PGRES_FATAL_ERROR, PQerrorMessage(imp_dbh->conn)); + if (TEND_slow) TRC(DBILOGFP, "%sEnd pg_db_flush (error)\n", THEADER_slow); + return -1; + } + + if (TEND_slow) TRC(DBILOGFP, "%sEnd pg_db_flush (%d)\n", THEADER_slow, flush_status); + return flush_status; + +} /* end of pg_db_flush */ + + /* ================================================================== */ SV * pg_db_error_field (SV *dbh, char * fieldname) { diff --git a/dbdimp.h b/dbdimp.h index 37a3203c..40d9903b 100644 --- a/dbdimp.h +++ b/dbdimp.h @@ -22,6 +22,7 @@ struct imp_dbh_st { int prepare_number; /* internal prepared statement name modifier */ int copystate; /* 0=none PGRES_COPY_IN PGRES_COPY_OUT */ bool copybinary; /* whether the copy is in binary format */ + bool copy_nonblocking; /* whether PQsetnonblocking was enabled for async COPY */ int pg_errorlevel; /* PQsetErrorVerbosity. Set by user, defaults to 1 */ bool server_prepare; /* do we want to use PQexecPrepared? 
Can be changed by user */ int switch_prepared; /* how many executes until we switch to PQexecPrepared */ @@ -235,10 +236,14 @@ int pg_db_getline (SV *dbh, SV * svbuf, int length); int pg_db_getcopydata (SV *dbh, SV * dataline, int async); -int pg_db_putcopydata (SV *dbh, SV * dataline); +int pg_db_putcopydata (SV *dbh, SV * dataline, int async); int pg_db_putcopyend (SV * dbh); +int pg_db_putcopyend_async (SV * dbh); + +int pg_db_flush (SV * dbh); + int pg_db_endcopy (SV * dbh); SV * pg_db_error_field (SV *dbh, char * fieldname); diff --git a/t/07copy.t b/t/07copy.t index d8dfe6bc..8d8f5389 100644 --- a/t/07copy.t +++ b/t/07copy.t @@ -15,7 +15,7 @@ select(($|=1,select(STDERR),$|=1)[1]); my $dbh = connect_database(); if ($dbh) { - plan tests => 62; + plan tests => 79; } else { plan skip_all => 'Connection to database failed, cannot continue testing'; @@ -426,6 +426,171 @@ is $@, '', 'pg_putcopydata in binary mode works' $t=q{COPY in binary mode round trips}; is_deeply ($dbh->selectall_arrayref('SELECT * FROM binarycopy'), [[1],[1]], $t); ## nospellcheck +## +## Test the async COPY methods +## + +my $async_table = 'dbd_pg_test_async_copy'; +$dbh->do(qq{CREATE TABLE $async_table(id integer, name text)}); +$dbh->commit(); + +# pg_putcopydata_async: basic operation + +$t='pg_putcopydata_async fails if not after a COPY FROM statement'; +eval { + $dbh->pg_putcopydata_async("pizza\tpie"); +}; +like ($@, qr{COPY FROM command}, $t); + +$t='pg_putcopydata_async returns 1 on success'; +$dbh->do("COPY $async_table FROM STDIN"); +$result = $dbh->pg_putcopydata_async("1\tAlice\n"); +ok ($result >= 1, $t); # may be 1 (flushed) or 2 (flush pending) + +$t='pg_putcopydata_async works on second call'; +$result = $dbh->pg_putcopydata_async("2\tBob\n"); +ok ($result >= 1, $t); + +$t='pg_putcopydata_async works on third call'; +$result = $dbh->pg_putcopydata_async("3\tCharlie\n"); +ok ($result >= 1, $t); + +# If any calls returned 2 (flush pending), flush now +if ($result == 2) { + 
$dbh->pg_flush(); +} + +# pg_putcopyend_async: basic operation + +$t='pg_putcopyend_async completes the COPY'; +my $end_result = $dbh->pg_putcopyend_async(); +# May need to poll if result is 0 (server not ready) or 2 (flush pending) +my $poll_count = 0; +while ($end_result != 1 && $end_result != -1 && $poll_count < 100) { + if ($end_result == 2) { + $dbh->pg_flush(); + } + # Brief delay then retry + select(undef, undef, undef, 0.01); + $end_result = $dbh->pg_putcopyend_async(); + $poll_count++; +} +is ($end_result, 1, $t); + +$t='Data from pg_putcopydata_async was inserted correctly'; +$result = $dbh->selectall_arrayref("SELECT id,name FROM $async_table ORDER BY id"); +$expected = [[1,'Alice'],[2,'Bob'],[3,'Charlie']]; +is_deeply ($result, $expected, $t); + +$dbh->commit(); + +# Normal queries work after async COPY + +$t='Normal queries work after async COPY IN'; +eval { + $dbh->do('SELECT 999'); +}; +is ($@, q{}, $t); + +# Async queries work after async COPY + +$t='Async queries work after async COPY IN'; +eval { + $dbh->do('SELECT 888', { pg_async => PG_ASYNC} ); +}; +is ($@, q{}, $t); +$dbh->pg_result(); + +# pg_putcopyend_async: state checks (uses Test::Warn like blocking variant) + +$t='pg_putcopyend_async warns when not in COPY state'; +eval { require Test::Warn; }; +if ($@) { + pass ('Skipping Test::Warn test for putcopyend_async no-copy'); + pass ('Skipping Test::Warn test for putcopyend_async copy-out'); +} +else { + Test::Warn::warning_like (sub { $dbh->pg_putcopyend_async(); }, qr/until a COPY/, $t); + + $t='pg_putcopyend_async warns when in COPY OUT state'; + $dbh->do("COPY $async_table TO STDOUT"); + Test::Warn::warning_like (sub { $dbh->pg_putcopyend_async(); }, qr/pg_getcopydata/, $t); + # Drain the COPY OUT + 1 while ($dbh->pg_getcopydata($buffer) >= 0); +} + +# pg_flush: works outside COPY (should just return 0 = nothing to flush) + +$t='pg_flush returns 0 when nothing to flush'; +$result = $dbh->pg_flush(); +is ($result, 0, $t); + +# Async COPY 
with larger data set (tests buffering behavior) + +$dbh->do("DELETE FROM $async_table"); +$dbh->commit(); + +$t='pg_putcopydata_async handles larger data sets'; +$dbh->do("COPY $async_table FROM STDIN"); +my $async_ok = 1; +for my $i (1..1000) { + my $row_result = $dbh->pg_putcopydata_async("$i\tRow number $i\n"); + if ($row_result == -1) { + $async_ok = 0; + last; + } + # Handle flush-pending by flushing + if ($row_result == 2) { + my $flush = $dbh->pg_flush(); + while ($flush == 1) { + select(undef, undef, undef, 0.001); + $flush = $dbh->pg_flush(); + } + } +} +ok ($async_ok, $t); + +$t='pg_putcopyend_async works after large data set'; +$end_result = $dbh->pg_putcopyend_async(); +$poll_count = 0; +while ($end_result != 1 && $end_result != -1 && $poll_count < 100) { + if ($end_result == 2) { + $dbh->pg_flush(); + } + select(undef, undef, undef, 0.01); + $end_result = $dbh->pg_putcopyend_async(); + $poll_count++; +} +is ($end_result, 1, $t); + +$t='All 1000 rows were inserted via async COPY'; +$result = $dbh->selectall_arrayref("SELECT count(*) FROM $async_table"); +is ($result->[0][0], 1000, $t); + +$dbh->commit(); + +# Mixing: blocking putcopydata still works (backward compatibility) + +$dbh->do("DELETE FROM $async_table"); +$dbh->commit(); + +$t='Blocking pg_putcopydata still works after async has been used'; +$dbh->do("COPY $async_table FROM STDIN"); +$result = $dbh->pg_putcopydata("42\tBlocking row\n"); +is ($result, 1, $t); + +$t='Blocking pg_putcopyend still works'; +$result = $dbh->pg_putcopyend(); +is ($result, 1, $t); + +$t='Blocking COPY data was inserted correctly'; +$result = $dbh->selectall_arrayref("SELECT id,name FROM $async_table ORDER BY id"); +$expected = [[42,'Blocking row']]; +is_deeply ($result, $expected, $t); + +$dbh->commit(); + +$dbh->do("DROP TABLE $async_table"); $dbh->do("DROP TABLE $table"); $dbh->commit(); From d6506ffc3dbf9bb276447bd094a012a97cb15707 Mon Sep 17 00:00:00 2001 From: John Napiorkowski Date: Tue, 17 Mar 2026 16:36:44 
-0500 Subject: [PATCH 02/12] Simplify async COPY return values to match libpq conventions Drop the invented return value 2 from pg_putcopydata_async and pg_putcopyend_async. The interface now matches libpq and EV::Pg: pg_putcopydata_async: 1 (queued), 0 (buffer full), -1 (error) pg_putcopyend_async: 1 (done), 0 (not ready), -1 (error) pg_flush: 0 (flushed), 1 (pending), -1 (error) After pg_putcopydata_async returns 1, caller calls pg_flush to push data to the server. This is the standard libpq pattern: PQputCopyData queues, PQflush sends. No DBD::Pg-specific protocol to learn. pg_putcopydata_async no longer calls PQflush internally for async mode; that responsibility moves to pg_flush. The COPY_BOTH path (logical replication) still auto-flushes as before. Co-Authored-By: Claude Opus 4.6 (1M context) --- Pg.pm | 44 +++++++++++++++++++++++++++----------------- dbdimp.c | 20 +++++++------------- t/07copy.t | 49 +++++++++++++++++++++++-------------------------- 3 files changed, 57 insertions(+), 56 deletions(-) diff --git a/Pg.pm b/Pg.pm index 7d6f90af..08ebfd5e 100644 --- a/Pg.pm +++ b/Pg.pm @@ -4434,24 +4434,35 @@ connection is switched into non-blocking mode (via C), which i because no other operations are permitted during a COPY. The non-blocking mode is automatically restored to blocking when L completes. -Return values: +Return values match C: - 1 = data queued and flushed successfully - 2 = data queued but output buffer not fully flushed; caller should - poll the socket for write-readiness and call L + 1 = data queued successfully (caller should call L to send) 0 = output buffer full; caller should poll the socket for write-readiness and retry the same pg_putcopydata_async call -1 = error +After a successful return of 1, call L to push the data to the +server. If C returns 1 (data pending), poll the socket for +write-readiness and call C again. 
+ Example usage with an event loop: $dbh->do("COPY mytable FROM STDIN"); - my $status = $dbh->pg_putcopydata_async("123\tPepperoni\t3\n"); - if ($status == 0 || $status == 2) { - # poll $dbh->{pg_socket} for write-readiness, then: - # if $status == 2: call $dbh->pg_flush - # if $status == 0: retry the pg_putcopydata_async call + + for my $row (@data) { + my $status = $dbh->pg_putcopydata_async($row); + while ($status == 0) { + # buffer full — poll for write-readiness, retry + poll_writable($dbh->{pg_socket}); + $status = $dbh->pg_putcopydata_async($row); + } + die "COPY error" if $status == -1; + # Flush to server + while ($dbh->pg_flush()) { + poll_writable($dbh->{pg_socket}); + } } + $dbh->pg_putcopyend(); =head3 B<pg_putcopyend> @@ -4462,15 +4473,14 @@ success. This method will fail if called when not in COPY IN mode. =head3 B<pg_putcopyend_async> Non-blocking version of pg_putcopyend for use by async libraries. Sends the COPY -end marker and attempts to collect the server result without blocking. +end marker and attempts to collect the server result without blocking. Designed to +be called in a poll loop. Return values: 1 = COPY completed successfully, connection is back in normal blocking mode - 2 = COPY end marker sent but output buffer not fully flushed; caller should - poll the socket for write-readiness and call L</pg_flush>, then retry - 0 = server result not ready yet; caller should poll the socket for - read-readiness, then call pg_putcopyend_async again + 0 = not ready yet; caller should poll the socket for readiness, then call + pg_putcopyend_async again -1 = error After pg_putcopyend_async returns 1, the connection is back in blocking mode and @@ -4478,10 +4488,10 @@ normal queries can be issued. =head3 B<pg_flush> -Flushes the libpq output buffer. Used during non-blocking COPY operations when -L</pg_putcopydata_async> or L</pg_putcopyend_async> return 2 (flush pending). +Flushes the libpq output buffer. Wraps C<PQflush> directly. Used after +L</pg_putcopydata_async> returns 1 to push queued data to the server. 
-Return values: +Return values match C: 0 = all data flushed successfully 1 = data still pending; caller should poll the socket for write-readiness diff --git a/dbdimp.c b/dbdimp.c index 27e200b5..cad258b4 100644 --- a/dbdimp.c +++ b/dbdimp.c @@ -4541,23 +4541,18 @@ int pg_db_putcopydata (SV * dbh, SV * dataline, int async) copystatus = PQputCopyData(imp_dbh->conn, copydata, copylen); if (1 == copystatus) { - if (async || PGRES_COPY_BOTH == imp_dbh->copystate) { - int flush_status; + /* For COPY_BOTH (logical replication), flush immediately as before */ + if (PGRES_COPY_BOTH == imp_dbh->copystate) { TRACE_PQFLUSH; - flush_status = PQflush(imp_dbh->conn); - if (-1 == flush_status) { + if (PQflush(imp_dbh->conn)) { _fatal_sqlstate(aTHX_ imp_dbh); TRACE_PQERRORMESSAGE; pg_error(aTHX_ dbh, PGRES_FATAL_ERROR, PQerrorMessage(imp_dbh->conn)); if (TEND_slow) TRC(DBILOGFP, "%sEnd pg_db_putcopydata (error: flush)\n", THEADER_slow); return -1; } - /* flush_status 1 means data still pending in output buffer */ - if (async && 1 == flush_status) { - if (TEND_slow) TRC(DBILOGFP, "%sEnd pg_db_putcopydata (flush pending)\n", THEADER_slow); - return 2; - } } + /* For async mode, caller is responsible for calling pg_flush */ if (TEND_slow) TRC(DBILOGFP, "%sEnd pg_db_putcopydata (1)\n", THEADER_slow); return 1; } @@ -4656,8 +4651,7 @@ int pg_db_putcopyend (SV * dbh) Returns: 1 = COPY completed successfully, connection is back in normal mode - 0 = server result not ready yet (caller should poll socket for read-ready) - 2 = flush pending (caller should poll socket for write-ready, then call pg_db_flush) + 0 = not ready yet (caller should poll socket and call again) -1 = error */ int pg_db_putcopyend_async (SV * dbh) @@ -4713,9 +4707,9 @@ int pg_db_putcopyend_async (SV * dbh) return -1; } if (1 == flush_status) { - /* Data pending in output buffer, caller should poll for write-ready */ + /* Data pending in output buffer, caller should poll and retry */ if (TEND_slow) TRC(DBILOGFP, "%sEnd 
pg_db_putcopyend_async (flush pending)\n", THEADER_slow); - return 2; + return 0; } /* Fall through to result polling */ diff --git a/t/07copy.t b/t/07copy.t index 8d8f5389..9b66d0e3 100644 --- a/t/07copy.t +++ b/t/07copy.t @@ -15,7 +15,7 @@ select(($|=1,select(STDERR),$|=1)[1]); my $dbh = connect_database(); if ($dbh) { - plan tests => 79; + plan tests => 80; } else { plan skip_all => 'Connection to database failed, cannot continue testing'; @@ -445,32 +445,29 @@ like ($@, qr{COPY FROM command}, $t); $t='pg_putcopydata_async returns 1 on success'; $dbh->do("COPY $async_table FROM STDIN"); $result = $dbh->pg_putcopydata_async("1\tAlice\n"); -ok ($result >= 1, $t); # may be 1 (flushed) or 2 (flush pending) +is ($result, 1, $t); + +$t='pg_flush sends data to server'; +$result = $dbh->pg_flush(); +is ($result, 0, $t); # 0 = flushed, 1 = pending $t='pg_putcopydata_async works on second call'; $result = $dbh->pg_putcopydata_async("2\tBob\n"); -ok ($result >= 1, $t); +is ($result, 1, $t); +$dbh->pg_flush(); $t='pg_putcopydata_async works on third call'; $result = $dbh->pg_putcopydata_async("3\tCharlie\n"); -ok ($result >= 1, $t); - -# If any calls returned 2 (flush pending), flush now -if ($result == 2) { - $dbh->pg_flush(); -} +is ($result, 1, $t); +$dbh->pg_flush(); # pg_putcopyend_async: basic operation $t='pg_putcopyend_async completes the COPY'; my $end_result = $dbh->pg_putcopyend_async(); -# May need to poll if result is 0 (server not ready) or 2 (flush pending) +# May need to poll if result is 0 (not ready yet) my $poll_count = 0; -while ($end_result != 1 && $end_result != -1 && $poll_count < 100) { - if ($end_result == 2) { - $dbh->pg_flush(); - } - # Brief delay then retry +while ($end_result == 0 && $poll_count < 100) { select(undef, undef, undef, 0.01); $end_result = $dbh->pg_putcopyend_async(); $poll_count++; @@ -539,13 +536,16 @@ for my $i (1..1000) { $async_ok = 0; last; } - # Handle flush-pending by flushing - if ($row_result == 2) { - my $flush = 
$dbh->pg_flush(); - while ($flush == 1) { - select(undef, undef, undef, 0.001); - $flush = $dbh->pg_flush(); - } + # If buffer full (0), poll and retry + while ($row_result == 0) { + select(undef, undef, undef, 0.001); + $row_result = $dbh->pg_putcopydata_async("$i\tRow number $i\n"); + } + # Flush after each successful queue + my $flush = $dbh->pg_flush(); + while ($flush == 1) { + select(undef, undef, undef, 0.001); + $flush = $dbh->pg_flush(); } } ok ($async_ok, $t); @@ -553,10 +553,7 @@ ok ($async_ok, $t); $t='pg_putcopyend_async works after large data set'; $end_result = $dbh->pg_putcopyend_async(); $poll_count = 0; -while ($end_result != 1 && $end_result != -1 && $poll_count < 100) { - if ($end_result == 2) { - $dbh->pg_flush(); - } +while ($end_result == 0 && $poll_count < 100) { select(undef, undef, undef, 0.01); $end_result = $dbh->pg_putcopyend_async(); $poll_count++; From f0b0a3d48b882af414d3d2701156d36325ff20c0 Mon Sep 17 00:00:00 2001 From: John Napiorkowski Date: Tue, 17 Mar 2026 17:21:40 -0500 Subject: [PATCH 03/12] Improve pg_putcopydata_async POD examples Show simple and robust usage patterns with real data (matching the existing pg_putcopydata pizza examples), IO::Select for socket polling, explicit column lists, and the full pg_putcopyend_async poll loop. Add a note about COPY text format and alternatives. Co-Authored-By: Claude Opus 4.6 (1M context) --- Pg.pm | 41 ++++++++++++++++++++++++++++++++--------- 1 file changed, 32 insertions(+), 9 deletions(-) diff --git a/Pg.pm b/Pg.pm index 08ebfd5e..d4ecb7fa 100644 --- a/Pg.pm +++ b/Pg.pm @@ -4445,24 +4445,47 @@ After a successful return of 1, call L to push the data to the server. If C returns 1 (data pending), poll the socket for write-readiness and call C again. 
-Example usage with an event loop: +Example usage: - $dbh->do("COPY mytable FROM STDIN"); + ## Simple usage (flush after each row): + use IO::Select; + $dbh->do("COPY mytable(id, flavor, slices) FROM STDIN"); + for my $row ("123\tPepperoni\t3\n", "314\tMushroom\t8\n") { + $dbh->pg_putcopydata_async($row); + while ($dbh->pg_flush()) { + IO::Select->new($dbh->{pg_socket})->can_write(); + } + } + $dbh->pg_putcopyend(); + ## Robust usage (handles buffer-full and async end): + use IO::Select; + my $sel = IO::Select->new($dbh->{pg_socket}); + + ## Column list is optional but recommended. Default format is + ## tab-delimited text with newline row terminators, matching + ## PostgreSQL's COPY text format. Use COPY ... WITH (FORMAT csv) + ## for CSV data, or WITH (DELIMITER '|') for custom delimiters. + $dbh->do("COPY mytable(id, flavor, slices) FROM STDIN"); + my @data = ("123\tPepperoni\t3\n", "314\tMushroom\t8\n", + "6\tAnchovies\t100\n"); for my $row (@data) { my $status = $dbh->pg_putcopydata_async($row); - while ($status == 0) { - # buffer full — poll for write-readiness, retry - poll_writable($dbh->{pg_socket}); + while ($status == 0) { # buffer full + $sel->can_write(); $status = $dbh->pg_putcopydata_async($row); } die "COPY error" if $status == -1; - # Flush to server - while ($dbh->pg_flush()) { - poll_writable($dbh->{pg_socket}); + while ($dbh->pg_flush()) { # push to server + $sel->can_write(); } } - $dbh->pg_putcopyend(); + + ## Non-blocking end: poll until server confirms + my $end; + $sel->can_read() + while ($end = $dbh->pg_putcopyend_async()) == 0; + die "COPY end error" if $end == -1; =head3 B<pg_putcopyend> From a20aa96c657581be2e8cb2f4c2c6b3734bf494a4 Mon Sep 17 00:00:00 2001 From: John Napiorkowski Date: Wed, 18 Mar 2026 06:41:33 -0500 Subject: [PATCH 04/12] Add additional async COPY test coverage - pg_putcopydata_async fails in COPY OUT state - pg_putcopydata_async fails with no argument - do() fails during async COPY IN (mirrors blocking test) - Recovery after rude
non-COPY attempt during async COPY - Binary COPY round-trip via async methods - Multiple async COPY cycles on the same connection Co-Authored-By: Claude Opus 4.6 (1M context) --- t/07copy.t | 97 +++++++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 96 insertions(+), 1 deletion(-) diff --git a/t/07copy.t b/t/07copy.t index 9b66d0e3..908f499a 100644 --- a/t/07copy.t +++ b/t/07copy.t @@ -15,7 +15,7 @@ select(($|=1,select(STDERR),$|=1)[1]); my $dbh = connect_database(); if ($dbh) { - plan tests => 80; + plan tests => 89; } else { plan skip_all => 'Connection to database failed, cannot continue testing'; @@ -587,6 +587,101 @@ is_deeply ($result, $expected, $t); $dbh->commit(); +# pg_putcopydata_async: wrong state checks + +$t='pg_putcopydata_async fails in COPY OUT state'; +$dbh->do("COPY $async_table TO STDOUT"); +eval { + $dbh->pg_putcopydata_async("pizza\tpie"); +}; +like ($@, qr{COPY FROM command}, $t); +# Drain the COPY OUT +1 while ($dbh->pg_getcopydata($buffer) >= 0); + +$t='pg_putcopydata_async fails with no argument'; +$dbh->do("COPY $async_table FROM STDIN"); +eval { + $dbh->pg_putcopydata_async(); +}; +ok ($@, $t); +$dbh->rollback(); + +# do() fails during async COPY IN (same as blocking) + +$t='do() fails during async COPY IN'; +$dbh->do("COPY $async_table FROM STDIN"); +$dbh->pg_putcopydata_async("99\tDuringCopy\n"); +eval { + $dbh->do('SELECT 123'); +}; +like ($@, qr{pg_putcopyend}, $t); + +$t='pg_putcopydata_async works after a rude non-COPY attempt'; +eval { + $result = $dbh->pg_putcopydata_async("100\tAfterRude\n"); +}; +is ($@, q{}, $t); +is ($result, 1, $t); +$dbh->pg_flush(); +$dbh->pg_putcopyend(); +$dbh->commit(); + +# Binary COPY with async methods + +$dbh->do('CREATE TEMP TABLE binarycopy_async AS SELECT 1::INTEGER AS x'); +$dbh->do('COPY binarycopy_async TO STDOUT BINARY'); + +my $bindata; +my $binlen = $dbh->pg_getcopydata($bindata); +while ($dbh->pg_getcopydata(my $tmp) >= 0) { + $bindata .= $tmp; +} + 
+$t='pg_putcopydata_async works in binary mode'; +$dbh->do('COPY binarycopy_async FROM STDIN BINARY'); +eval { + $dbh->pg_putcopydata_async($bindata); + $dbh->pg_flush(); + my $bend = $dbh->pg_putcopyend_async(); + my $bpoll = 0; + while ($bend == 0 && $bpoll < 100) { + select(undef, undef, undef, 0.01); + $bend = $dbh->pg_putcopyend_async(); + $bpoll++; + } +}; +is ($@, '', $t); + +$t='Binary COPY via async round trips correctly'; +is_deeply ($dbh->selectall_arrayref('SELECT * FROM binarycopy_async'), [[1],[1]], $t); + +# Multiple async COPY cycles on the same connection + +$dbh->do("DELETE FROM $async_table"); +$dbh->commit(); + +$t='Second async COPY cycle works on same connection'; +$dbh->do("COPY $async_table FROM STDIN"); +$dbh->pg_putcopydata_async("50\tFirstCycle\n"); +$dbh->pg_flush(); +my $e1 = $dbh->pg_putcopyend_async(); +while ($e1 == 0) { select(undef, undef, undef, 0.01); $e1 = $dbh->pg_putcopyend_async(); } +$dbh->commit(); + +$dbh->do("COPY $async_table FROM STDIN"); +$dbh->pg_putcopydata_async("51\tSecondCycle\n"); +$dbh->pg_flush(); +my $e2 = $dbh->pg_putcopyend_async(); +while ($e2 == 0) { select(undef, undef, undef, 0.01); $e2 = $dbh->pg_putcopyend_async(); } +is ($e2, 1, $t); + +$t='Both async COPY cycles inserted data correctly'; +$result = $dbh->selectall_arrayref("SELECT id,name FROM $async_table ORDER BY id"); +$expected = [[50,'FirstCycle'],[51,'SecondCycle']]; +is_deeply ($result, $expected, $t); + +$dbh->commit(); + $dbh->do("DROP TABLE $async_table"); $dbh->do("DROP TABLE $table"); $dbh->commit(); From 2a1308ca44f517fee96a81efad161216a60cb4ba Mon Sep 17 00:00:00 2001 From: John Napiorkowski Date: Wed, 18 Mar 2026 06:46:51 -0500 Subject: [PATCH 05/12] Clarify non-blocking COPY scope in POD Make explicit that the connection itself is restricted to COPY operations, but the non-blocking methods let the event loop service other connections and tasks between calls. 
Co-Authored-By: Claude Opus 4.6 (1M context) --- Pg.pm | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/Pg.pm b/Pg.pm index d4ecb7fa..7ff32775 100644 --- a/Pg.pm +++ b/Pg.pm @@ -4431,8 +4431,14 @@ the COPY statement. Returns a 1 on successful input. Examples: Non-blocking version of pg_putcopydata for use by async libraries. When called, the connection is switched into non-blocking mode (via C), which is safe -because no other operations are permitted during a COPY. The non-blocking mode is -automatically restored to blocking when L completes. +because no other operations are permitted on this connection during a COPY. The +non-blocking mode is automatically restored to blocking when L +completes. + +Note: the connection performing the COPY is restricted to COPY operations until +the COPY ends. However, the non-blocking methods allow the event loop to service +other connections and tasks between calls, which is the primary benefit over the +blocking variants. Return values match C: From 8a03679f58fb7945432834466ed8107e1be44295 Mon Sep 17 00:00:00 2001 From: John Napiorkowski Date: Sun, 29 Mar 2026 09:39:46 -0500 Subject: [PATCH 06/12] Address PR #176 review feedback Fix spelling and perlcritic test failures found with AUTHOR_TESTING=1 and RELEASE_TESTING=1. Rename binarycopy_async temp table to use the dbd_pg_ prefix per project convention. Add dbd_pg_test_async_copy to the @tables cleanup list in dbdpg_test_setup.pl. 
Co-Authored-By: Claude Opus 4.6 (1M context) --- .perlcriticrc | 2 +- t/07copy.t | 9 ++++----- t/99_spellcheck.t | 2 ++ t/dbdpg_test_setup.pl | 1 + 4 files changed, 8 insertions(+), 6 deletions(-) diff --git a/.perlcriticrc b/.perlcriticrc index 3a2431ab..264ccddb 100644 --- a/.perlcriticrc +++ b/.perlcriticrc @@ -3,7 +3,7 @@ profile-strictness = quiet exclude = Mardem [Documentation::PodSpelling] -stop_words = ActiveKids afterwards arrayref arrayrefs attr autocommit AutoCommit AutoInactiveDestroy backend bitmask bool boolean Bunce bytea CachedKids cancelled ChildHandles ChopBlanks CompatMode CursorName datatype Datatype datatypes dbd DBD dbdpg dbh DBI deallocation deallocated dev dr DSN enum ErrCount errstr fd FetchHashKeyName filename func getfd getline github HandleError HandleSetErr hashref hashrefs InactiveDestroy JSON largeobject len libpq LongReadLen LongTruncOk lseg Mergl Momjian Mullane nullable NULLABLE Oid OID onwards param ParamTypes ParamValues perl Perlish PgBouncer pgbuiltin pgend pglibpq pglogin pgprefix pgquote PGSERVICE PGSERVICEFILE pgsql pgstart PGSYSCONFDIR PID Postgres PostgreSQL PQexecParams PQexecPrepared PrintError PrintWarn pseudotype RaiseError README ReadOnly RowCache RowCacheSize RowsInCache runtime Sabino savepoint savepoints Savepoints schemas ShowErrorStatement SQL SQLSTATE SSL sslmode STDERR STDIN STDOUT stringify subdirectory tablename tablespace tablespaces TaintIn TaintOut TraceLevel tuple typename undef username Username UTF varchar +stop_words = ActiveKids afterwards arrayref arrayrefs async attr autocommit AutoCommit AutoInactiveDestroy backend bitmask bool boolean Bunce bytea CachedKids cancelled ChildHandles ChopBlanks CompatMode CursorName datatype Datatype datatypes dbd DBD dbdpg dbh DBI deallocation deallocated dev dr DSN enum ErrCount errstr fd FetchHashKeyName filename func getfd getline github HandleError HandleSetErr hashref hashrefs InactiveDestroy JSON largeobject len libpq LongReadLen LongTruncOk lseg Mergl Momjian 
Mullane nullable NULLABLE Oid OID onwards param ParamTypes ParamValues perl Perlish PgBouncer pgbuiltin pgend pglibpq pglogin pgprefix pgquote PGSERVICE PGSERVICEFILE pgsql pgstart PGSYSCONFDIR PID Postgres PostgreSQL PQexecParams PQexecPrepared PQsetnonblocking PrintError PrintWarn pseudotype RaiseError README ReadOnly RowCache RowCacheSize RowsInCache runtime Sabino savepoint savepoints Savepoints schemas ShowErrorStatement SQL SQLSTATE SSL sslmode STDERR STDIN STDOUT stringify subdirectory tablename tablespace tablespaces TaintIn TaintOut TraceLevel tuple typename undef username Username UTF varchar [-Bangs::ProhibitBitwiseOperators] [-Bangs::ProhibitCommentedOutCode] diff --git a/t/07copy.t b/t/07copy.t index 908f499a..1631ebb7 100644 --- a/t/07copy.t +++ b/t/07copy.t @@ -628,8 +628,8 @@ $dbh->commit(); # Binary COPY with async methods -$dbh->do('CREATE TEMP TABLE binarycopy_async AS SELECT 1::INTEGER AS x'); -$dbh->do('COPY binarycopy_async TO STDOUT BINARY'); +$dbh->do('CREATE TEMP TABLE dbd_pg_test_binarycopy_async AS SELECT 1::INTEGER AS x'); +$dbh->do('COPY dbd_pg_test_binarycopy_async TO STDOUT BINARY'); my $bindata; my $binlen = $dbh->pg_getcopydata($bindata); @@ -638,7 +638,7 @@ while ($dbh->pg_getcopydata(my $tmp) >= 0) { } $t='pg_putcopydata_async works in binary mode'; -$dbh->do('COPY binarycopy_async FROM STDIN BINARY'); +$dbh->do('COPY dbd_pg_test_binarycopy_async FROM STDIN BINARY'); eval { $dbh->pg_putcopydata_async($bindata); $dbh->pg_flush(); @@ -653,7 +653,7 @@ eval { is ($@, '', $t); $t='Binary COPY via async round trips correctly'; -is_deeply ($dbh->selectall_arrayref('SELECT * FROM binarycopy_async'), [[1],[1]], $t); +is_deeply ($dbh->selectall_arrayref('SELECT * FROM dbd_pg_test_binarycopy_async'), [[1],[1]], $t); ## nospellcheck # Multiple async COPY cycles on the same connection @@ -682,7 +682,6 @@ is_deeply ($result, $expected, $t); $dbh->commit(); -$dbh->do("DROP TABLE $async_table"); $dbh->do("DROP TABLE $table"); $dbh->commit(); diff 
--git a/t/99_spellcheck.t b/t/99_spellcheck.t index b1c36fb9..044a22cc 100644 --- a/t/99_spellcheck.t +++ b/t/99_spellcheck.t @@ -680,6 +680,7 @@ PQexecPrepared PQoids PQprepare PQprotocolVersion +PQputCopyEnd PQresultErrorField PQsend PQsendPrepare @@ -688,6 +689,7 @@ PQsendQueryParams PQsendQueryPrepared PQserverVersion PQsetErrorVerbosity +PQsetnonblocking PQsetSingleRowMode PQstatus pqtype diff --git a/t/dbdpg_test_setup.pl b/t/dbdpg_test_setup.pl index cf7b4b5a..42ae3629 100644 --- a/t/dbdpg_test_setup.pl +++ b/t/dbdpg_test_setup.pl @@ -58,6 +58,7 @@ 'dbd_pg_test1', 'dbd_pg_test', 'dbd_pg_test_geom', + 'dbd_pg_test_async_copy', ); my @sequences = From 311682b3532e49a63a26db4fc265dda127199856 Mon Sep 17 00:00:00 2001 From: John Napiorkowski Date: Fri, 3 Apr 2026 20:12:22 -0500 Subject: [PATCH 07/12] Standardize on 'write-ready' / 'read-ready' terminology in POD Per turnstep's review: use the shorter form consistently across documentation, matching what the C comments already use. Co-Authored-By: Claude Opus 4.6 (1M context) --- Pg.pm | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/Pg.pm b/Pg.pm index 81feaca0..05c2a025 100644 --- a/Pg.pm +++ b/Pg.pm @@ -4453,12 +4453,12 @@ Return values match C<PQputCopyData>: 1 = data queued successfully (caller should call L</pg_flush> to send) 0 = output buffer full; caller should poll the socket for - write-readiness and retry the same pg_putcopydata_async call + write-ready and retry the same pg_putcopydata_async call -1 = error After a successful return of 1, call L</pg_flush> to push the data to the server. If C<pg_flush> returns 1 (data pending), poll the socket for -write-readiness and call C<pg_flush> again. +write-ready and call C<pg_flush> again. Example usage: @@ -4532,7 +4532,7 @@ L</pg_putcopydata_async> returns 1 to push queued data to the server. 
Return values match C<PQflush>: 0 = all data flushed successfully - 1 = data still pending; caller should poll the socket for write-readiness + 1 = data still pending; caller should poll the socket for write-ready and call pg_flush again -1 = error From c7691b85114c127e27dd68a8d6f81ec74763c74e Mon Sep 17 00:00:00 2001 From: John Napiorkowski Date: Fri, 3 Apr 2026 20:16:57 -0500 Subject: [PATCH 08/12] Remove sloppy simple example from pg_putcopydata_async POD Per turnstep's review: the simple example had unchecked return values, an ambiguous flush loop condition, and created a new IO::Select on every iteration. The robust example already covers proper usage. Co-Authored-By: Claude Opus 4.6 (1M context) --- Pg.pm | 13 ------------- 1 file changed, 13 deletions(-) diff --git a/Pg.pm b/Pg.pm index 05c2a025..8f61695f 100644 --- a/Pg.pm +++ b/Pg.pm @@ -4461,19 +4461,6 @@ server. If C<pg_flush> returns 1 (data pending), poll the socket for write-ready and call C<pg_flush> again. Example usage: - - ## Simple usage (flush after each row): - use IO::Select; - $dbh->do("COPY mytable(id, flavor, slices) FROM STDIN"); - for my $row ("123\tPepperoni\t3\n", "314\tMushroom\t8\n") { - $dbh->pg_putcopydata_async($row); - while ($dbh->pg_flush()) { - IO::Select->new($dbh->{pg_socket})->can_write(); - } - } - $dbh->pg_putcopyend(); - - ## Robust usage (handles buffer-full and async end): use IO::Select; my $sel = IO::Select->new($dbh->{pg_socket}); From 633e496bc4e6350b728ce03e8d50f890cdce67e0 Mon Sep 17 00:00:00 2001 From: John Napiorkowski Date: Fri, 3 Apr 2026 20:34:52 -0500 Subject: [PATCH 09/12] Replace IO::Select example with Time::HiRes sleep polling Per turnstep's review and to match the existing async example style used throughout the POD (pg_ready, pg_cancel examples all use sleep-based polling). Also sidesteps the can_read vs can_write ambiguity in pg_putcopyend_async's multi-phase return. 
Co-Authored-By: Claude Opus 4.6 (1M context) --- Pg.pm | 15 +++++---------- 1 file changed, 5 insertions(+), 10 deletions(-) diff --git a/Pg.pm b/Pg.pm index 8f61695f..48b62605 100644 --- a/Pg.pm +++ b/Pg.pm @@ -4461,31 +4461,26 @@ server. If C<pg_flush> returns 1 (data pending), poll the socket for write-ready and call C<pg_flush> again. Example usage: - use IO::Select; - my $sel = IO::Select->new($dbh->{pg_socket}); + use Time::HiRes 'sleep'; - ## Column list is optional but recommended. Default format is - ## tab-delimited text with newline row terminators, matching - ## PostgreSQL's COPY text format. Use COPY ... WITH (FORMAT csv) - ## for CSV data, or WITH (DELIMITER '|') for custom delimiters. $dbh->do("COPY mytable(id, flavor, slices) FROM STDIN"); my @data = ("123\tPepperoni\t3\n", "314\tMushroom\t8\n", "6\tAnchovies\t100\n"); for my $row (@data) { my $status = $dbh->pg_putcopydata_async($row); - while ($status == 0) { # buffer full - $sel->can_write(); + while ($status == 0) { # buffer full, retry + sleep 0.01; $status = $dbh->pg_putcopydata_async($row); } die "COPY error" if $status == -1; while ($dbh->pg_flush()) { # push to server - $sel->can_write(); + sleep 0.01; } } ## Non-blocking end: poll until server confirms while ((my $end = $dbh->pg_putcopyend_async()) == 0) { - $sel->can_read(); + sleep 0.01; } die "COPY end error" if $end == -1; From 379e0a5197335c98ec75ce1d2b033244317f8299 Mon Sep 17 00:00:00 2001 From: John Napiorkowski Date: Fri, 3 Apr 2026 21:35:45 -0500 Subject: [PATCH 10/12] Check pg_flush return value for errors in large data set test Per turnstep's review: a -1 from pg_flush was silently ignored. Now checks for error and breaks out of the loop, matching the pattern used for pg_putcopydata_async error handling. 
Co-Authored-By: Claude Opus 4.6 (1M context) --- t/07copy.t | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/t/07copy.t b/t/07copy.t index 1631ebb7..6bafd0fa 100644 --- a/t/07copy.t +++ b/t/07copy.t @@ -543,9 +543,17 @@ for my $i (1..1000) { } # Flush after each successful queue my $flush = $dbh->pg_flush(); + if ($flush == -1) { + $async_ok = 0; + last; + } while ($flush == 1) { select(undef, undef, undef, 0.001); $flush = $dbh->pg_flush(); + if ($flush == -1) { + $async_ok = 0; + last; + } } } ok ($async_ok, $t); From bc0b83d20b5cc1ef7ed3136d779bcf11e7310902 Mon Sep 17 00:00:00 2001 From: John Napiorkowski Date: Thu, 9 Apr 2026 16:40:51 -0400 Subject: [PATCH 11/12] Fix flush error handling in large data set test Move the pg_flush error check (-1) outside the while loop so that `last` exits the outer for loop instead of just the inner while. Previously, a flush error during the retry loop would only break the while, allowing the for loop to continue processing rows. Suggested-by: Ed Sabol Co-Authored-By: Claude Opus 4.6 (1M context) --- t/07copy.t | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/t/07copy.t b/t/07copy.t index 6bafd0fa..fd1a0d16 100644 --- a/t/07copy.t +++ b/t/07copy.t @@ -543,17 +543,13 @@ for my $i (1..1000) { } # Flush after each successful queue my $flush = $dbh->pg_flush(); - if ($flush == -1) { - $async_ok = 0; - last; - } while ($flush == 1) { select(undef, undef, undef, 0.001); $flush = $dbh->pg_flush(); - if ($flush == -1) { - $async_ok = 0; - last; - } + } + if ($flush == -1) { + $async_ok = 0; + last; } } ok ($async_ok, $t); From 6dfeeb5c529c8987148bf6e8b4fe8c78b07ab8ab Mon Sep 17 00:00:00 2001 From: Ed Sabol Date: Fri, 1 May 2026 18:14:51 -0400 Subject: [PATCH 12/12] Resolve merge conflict in .perlcriticrc --- .perlcriticrc | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/.perlcriticrc b/.perlcriticrc index b44c555a..65674747 100644 --- a/.perlcriticrc +++ b/.perlcriticrc @@ 
-3,11 +3,7 @@ profile-strictness = quiet exclude = Mardem [Documentation::PodSpelling] -<<<<<<< async-copy-from -stop_words = ActiveKids afterwards arrayref arrayrefs async attr autocommit AutoCommit AutoInactiveDestroy backend bitmask bool boolean Bunce bytea CachedKids cancelled ChildHandles ChopBlanks CompatMode CursorName datatype Datatype datatypes dbd DBD dbdpg dbh DBI deallocation deallocated dev dr DSN enum ErrCount errstr fd FetchHashKeyName filename func getfd getline github HandleError HandleSetErr hashref hashrefs InactiveDestroy JSON largeobject len libpq LongReadLen LongTruncOk lseg Mergl Momjian Mullane nullable NULLABLE Oid OID onwards param ParamTypes ParamValues perl Perlish PgBouncer pgbuiltin pgend pglibpq pglogin pgprefix pgquote PGSERVICE PGSERVICEFILE pgsql pgstart PGSYSCONFDIR PID Postgres PostgreSQL PQexecParams PQexecPrepared PQsetnonblocking PrintError PrintWarn pseudotype RaiseError README ReadOnly RowCache RowCacheSize RowsInCache runtime Sabino savepoint savepoints Savepoints schemas ShowErrorStatement SQL SQLSTATE SSL sslmode STDERR STDIN STDOUT stringify subdirectory tablename tablespace tablespaces TaintIn TaintOut TraceLevel tuple typename undef username Username UTF varchar -======= -stop_words = ActiveKids afterwards arrayref arrayrefs attr autocommit AutoCommit AutoInactiveDestroy backend bitmask bool boolean Bunce bytea CachedKids cancelled ChildHandles ChopBlanks CompatMode CursorName datatype Datatype datatypes dbd DBD dbdpg dbh DBI deallocation deallocated dev dr DSN enum ErrCount errstr fd FetchHashKeyName filename func getfd getline github HandleError HandleSetErr hashref hashrefs InactiveDestroy JSON largeobject len libpq LongReadLen LongTruncOk lseg Mergl Momjian Mullane nullable NULLABLE Oid OID onwards param ParamTypes ParamValues perl Perlish PgBouncer pgbuiltin pgend pglibpq pglogin pgprefix pgquote PGSERVICE PGSERVICEFILE pgsql pgstart PGSYSCONFDIR PID Postgres PostgreSQL PQexecParams PQexecPrepared PrintError 
PrintWarn pseudotype RaiseError README ReadOnly RowCache RowCacheSize RowsInCache runtime Sabino savepoint savepoints Savepoints schemas ShowErrorStatement SQL SQLSTATE SSL sslmode sslrootcert STDERR STDIN STDOUT stringify subdirectory tablename tablespace tablespaces TaintIn TaintOut TraceLevel tuple typename undef username Username UTF varchar ->>>>>>> master +stop_words = ActiveKids afterwards arrayref arrayrefs async attr autocommit AutoCommit AutoInactiveDestroy backend bitmask bool boolean Bunce bytea CachedKids cancelled ChildHandles ChopBlanks CompatMode CursorName datatype Datatype datatypes dbd DBD dbdpg dbh DBI deallocation deallocated dev dr DSN enum ErrCount errstr fd FetchHashKeyName filename func getfd getline github HandleError HandleSetErr hashref hashrefs InactiveDestroy JSON largeobject len libpq LongReadLen LongTruncOk lseg Mergl Momjian Mullane nullable NULLABLE Oid OID onwards param ParamTypes ParamValues perl Perlish PgBouncer pgbuiltin pgend pglibpq pglogin pgprefix pgquote PGSERVICE PGSERVICEFILE pgsql pgstart PGSYSCONFDIR PID Postgres PostgreSQL PQexecParams PQexecPrepared PQsetnonblocking PrintError PrintWarn pseudotype RaiseError README ReadOnly RowCache RowCacheSize RowsInCache runtime Sabino savepoint savepoints Savepoints schemas ShowErrorStatement SQL SQLSTATE SSL sslmode sslrootcert STDERR STDIN STDOUT stringify subdirectory tablename tablespace tablespaces TaintIn TaintOut TraceLevel tuple typename undef username Username UTF varchar [-Bangs::ProhibitBitwiseOperators] [-Bangs::ProhibitCommentedOutCode]