Changes from 7 commits
2 changes: 1 addition & 1 deletion .perlcriticrc
@@ -3,7 +3,7 @@ profile-strictness = quiet
exclude = Mardem

[Documentation::PodSpelling]
stop_words = ActiveKids afterwards arrayref arrayrefs attr autocommit AutoCommit AutoInactiveDestroy backend bitmask bool boolean Bunce bytea CachedKids cancelled ChildHandles ChopBlanks CompatMode CursorName datatype Datatype datatypes dbd DBD dbdpg dbh DBI deallocation deallocated dev dr DSN enum ErrCount errstr fd FetchHashKeyName filename func getfd getline github HandleError HandleSetErr hashref hashrefs InactiveDestroy JSON largeobject len libpq LongReadLen LongTruncOk lseg Mergl Momjian Mullane nullable NULLABLE Oid OID onwards param ParamTypes ParamValues perl Perlish PgBouncer pgbuiltin pgend pglibpq pglogin pgprefix pgquote PGSERVICE PGSERVICEFILE pgsql pgstart PGSYSCONFDIR PID Postgres PostgreSQL PQexecParams PQexecPrepared PrintError PrintWarn pseudotype RaiseError README ReadOnly RowCache RowCacheSize RowsInCache runtime Sabino savepoint savepoints Savepoints schemas ShowErrorStatement SQL SQLSTATE SSL sslmode STDERR STDIN STDOUT stringify subdirectory tablename tablespace tablespaces TaintIn TaintOut TraceLevel tuple typename undef username Username UTF varchar
stop_words = ActiveKids afterwards arrayref arrayrefs async attr autocommit AutoCommit AutoInactiveDestroy backend bitmask bool boolean Bunce bytea CachedKids cancelled ChildHandles ChopBlanks CompatMode CursorName datatype Datatype datatypes dbd DBD dbdpg dbh DBI deallocation deallocated dev dr DSN enum ErrCount errstr fd FetchHashKeyName filename func getfd getline github HandleError HandleSetErr hashref hashrefs InactiveDestroy JSON largeobject len libpq LongReadLen LongTruncOk lseg Mergl Momjian Mullane nullable NULLABLE Oid OID onwards param ParamTypes ParamValues perl Perlish PgBouncer pgbuiltin pgend pglibpq pglogin pgprefix pgquote PGSERVICE PGSERVICEFILE pgsql pgstart PGSYSCONFDIR PID Postgres PostgreSQL PQexecParams PQexecPrepared PQsetnonblocking PrintError PrintWarn pseudotype RaiseError README ReadOnly RowCache RowCacheSize RowsInCache runtime Sabino savepoint savepoints Savepoints schemas ShowErrorStatement SQL SQLSTATE SSL sslmode STDERR STDIN STDOUT stringify subdirectory tablename tablespace tablespaces TaintIn TaintOut TraceLevel tuple typename undef username Username UTF varchar

[-Bangs::ProhibitBitwiseOperators]
[-Bangs::ProhibitCommentedOutCode]
2 changes: 2 additions & 0 deletions Pg.h
@@ -157,8 +157,10 @@ DBISTATE_DECLARE;
#define TRACE_PQPORT TRACE_XX "%sPQport\n", THEADER_slow)
#define TRACE_PQPREPARE TRACE_XX "%sPQprepare\n", THEADER_slow)
#define TRACE_PQPROTOCOLVERSION TRACE_XX "%sPQprotocolVersion\n", THEADER_slow)
#define TRACE_PQFLUSH TRACE_XX "%sPQflush\n", THEADER_slow)
#define TRACE_PQPUTCOPYDATA TRACE_XX "%sPQputCopyData\n", THEADER_slow)
#define TRACE_PQPUTCOPYEND TRACE_XX "%sPQputCopyEnd\n", THEADER_slow)
#define TRACE_PQSETNONBLOCKING TRACE_XX "%sPQsetnonblocking\n", THEADER_slow)
#define TRACE_PQRESULTERRORFIELD TRACE_XX "%sPQresultErrorField\n", THEADER_slow)
#define TRACE_PQRESULTERRORMESSAGE TRACE_XX "%sPQresultErrorMessage\n", THEADER_slow)
#define TRACE_PQRESULTSTATUS TRACE_XX "%sPQresultStatus\n", THEADER_slow)
101 changes: 99 additions & 2 deletions Pg.pm
@@ -158,8 +158,11 @@ use 5.008001;
DBD::Pg::db->install_method('pg_getcopydata');
DBD::Pg::db->install_method('pg_getcopydata_async');
DBD::Pg::db->install_method('pg_notifies');
DBD::Pg::db->install_method('pg_flush');
DBD::Pg::db->install_method('pg_putcopydata');
DBD::Pg::db->install_method('pg_putcopydata_async');
DBD::Pg::db->install_method('pg_putcopyend');
DBD::Pg::db->install_method('pg_putcopyend_async');
DBD::Pg::db->install_method('pg_ping');
DBD::Pg::db->install_method('pg_putline');
DBD::Pg::db->install_method('pg_ready');
@@ -4433,12 +4436,106 @@ the COPY statement. Returns a 1 on successful input. Examples:
$dbh->pg_putcopydata("Anchovies~6\n");
$dbh->pg_putcopyend();

=head3 B<pg_putcopydata_async>

Non-blocking version of pg_putcopydata for use by async libraries. When called, the
connection is switched into non-blocking mode (via C<PQsetnonblocking>), which is safe
because no other operations are permitted on this connection during a COPY. The
non-blocking mode is automatically restored to blocking when L</pg_putcopyend_async>
completes.

Note: the connection performing the COPY is restricted to COPY operations until
the COPY ends. However, the non-blocking methods allow the event loop to service
other connections and tasks between calls, which is the primary benefit over the
blocking variants.

Return values match C<PQputCopyData>:

  1  = data queued successfully (caller should call L</pg_flush> to send)
  0  = output buffer full; caller should poll the socket for
       write-readiness and retry the same pg_putcopydata_async call
  -1 = error

After a successful return of 1, call L</pg_flush> to push the data to the
server. If C<pg_flush> returns 1 (data pending), poll the socket for
write-readiness and call C<pg_flush> again.

Example usage:

  ## Simple usage (flush after each row):
  use IO::Select;
  $dbh->do("COPY mytable(id, flavor, slices) FROM STDIN");
  for my $row ("123\tPepperoni\t3\n", "314\tMushroom\t8\n") {
    $dbh->pg_putcopydata_async($row);
    ## Compare against 1 explicitly: -1 (error) is also a true value
    ## and would otherwise keep this loop spinning forever.
    while ($dbh->pg_flush() == 1) {
      IO::Select->new($dbh->{pg_socket})->can_write();
    }
  }
  $dbh->pg_putcopyend();

  ## Robust usage (handles buffer-full and async end):
  use IO::Select;
  my $sel = IO::Select->new($dbh->{pg_socket});

  ## Column list is optional but recommended. Default format is
  ## tab-delimited text with newline row terminators, matching
  ## PostgreSQL's COPY text format. Use COPY ... WITH (FORMAT csv)
  ## for CSV data, or WITH (DELIMITER '|') for custom delimiters.
  $dbh->do("COPY mytable(id, flavor, slices) FROM STDIN");
  my @data = ("123\tPepperoni\t3\n", "314\tMushroom\t8\n",
              "6\tAnchovies\t100\n");
  for my $row (@data) {
    my $status = $dbh->pg_putcopydata_async($row);
    while ($status == 0) {                          # buffer full
      $sel->can_write();
      $status = $dbh->pg_putcopydata_async($row);
    }
    die "COPY error" if $status == -1;
    my $flushed;
    while (($flushed = $dbh->pg_flush()) == 1) {    # data still pending
      $sel->can_write();
    }
    die "COPY flush error" if $flushed == -1;       # -1 is true, so test it explicitly
  }

  ## Non-blocking end: poll until server confirms
  my $end;   # declared outside the loop so it is still in scope afterwards
  while (($end = $dbh->pg_putcopyend_async()) == 0) {
    $sel->can_read();
Contributor

Er...can_read or can_write? Can buffer be full here?

Author

You're right — can_read is only correct for the result-polling phase, but pg_putcopyend_async also returns 0 during the flush phase (which needs write-ready). Rather than try to solve that with IO::Select, I replaced the example with Time::HiRes::sleep-based polling to match the style used by the existing async examples (pg_ready, pg_cancel, etc.) elsewhere in the POD. This also sidesteps the read vs write ambiguity entirely. In practice, users of these methods will be in a real event loop (IO::Async, Mojo::IOLoop, AnyEvent, etc.) where they'd register the socket for both read and write events — but examples for specific event loops probably belong in a cookbook or separate document rather than the core POD.

  }
  die "COPY end error" if $end == -1;
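
The reply in the review thread above mentions replacing the IO::Select loop with C<Time::HiRes::sleep>-based polling. The replacement example itself is not shown in this diff, so the following is only a rough sketch of what such polling might look like. C<MockCopyHandle> and C<finish_copy_by_polling> are hypothetical names invented for illustration; the mock stands in for a real C<$dbh> so the snippet runs without a server.

```perl
use strict;
use warnings;
use Time::HiRes qw(sleep);

## Hypothetical stand-in for a DBD::Pg database handle. It replays a fixed
## list of return codes so the sketch can run without a PostgreSQL server.
{
    package MockCopyHandle;
    sub new {
        my ($class, @replies) = @_;
        return bless { replies => [@replies] }, $class;
    }
    sub pg_putcopyend_async {
        my $self  = shift;
        my $reply = shift @{ $self->{replies} };
        return defined $reply ? $reply : 1;
    }
}

## Call pg_putcopyend_async until it stops returning 0, sleeping between
## attempts. Returns the final status (1 or -1), or 0 if $max_tries
## attempts all reported "not ready".
sub finish_copy_by_polling {
    my ($dbh, $interval, $max_tries) = @_;
    for my $try (1 .. $max_tries) {
        my $status = $dbh->pg_putcopyend_async();
        return $status if $status != 0;
        sleep($interval);
    }
    return 0;
}

my $dbh = MockCopyHandle->new(0, 0, 1);   # "not ready" twice, then success
my $end = finish_copy_by_polling($dbh, 0.01, 100);
die "COPY end error" if $end == -1;
print "COPY finished with status $end\n";
```

Sleeping avoids having to decide between read-readiness and write-readiness, at the cost of some latency per attempt. The cap on attempts is an addition of this sketch, not something the source prescribes.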

=head3 B<pg_putcopyend>

When you are finished with pg_putcopydata, call pg_putcopyend to let the server know
that you are done, and it will return to a normal, non-COPY state. Returns a 1 on
success. This method will fail if called when not in COPY IN mode.

=head3 B<pg_putcopyend_async>

Non-blocking version of pg_putcopyend for use by async libraries. Sends the COPY
end marker and attempts to collect the server result without blocking. Designed to
be called in a poll loop.

Return values:

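  1  = COPY completed successfully, connection is back in normal blocking mode
  0  = not ready yet; caller should poll the socket for readiness (write-ready
       while the end marker is still being flushed, read-ready while waiting
       for the server result), then call pg_putcopyend_async again
  -1 = error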

After pg_putcopyend_async returns 1, the connection is back in blocking mode and
normal queries can be issued.
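
Because a return of 0 can mean either "still flushing" or "waiting for the server result", one option is to wait for the socket to become readable or writable, whichever comes first. The sketch below illustrates that idea using the static C<< IO::Select->select >> method. C<wait_for_socket> and C<poll_until_done> are hypothetical helper names, and a local socket pair replaces the database connection so the snippet is self-contained (this assumes a platform where C<socketpair> with C<AF_UNIX> is available).

```perl
use strict;
use warnings;
use IO::Select;
use Socket qw(AF_UNIX SOCK_STREAM PF_UNSPEC);

## Wait until $fh is readable OR writable, or until $timeout seconds pass.
## Returns 1 if the handle became ready, 0 on timeout.
sub wait_for_socket {
    my ($fh, $timeout) = @_;
    my $sel   = IO::Select->new($fh);
    my @ready = IO::Select->select($sel, $sel, undef, $timeout);
    return @ready ? 1 : 0;
}

## Repeatedly invoke $step (a code ref returning 1, 0 or -1) until it
## returns something other than 0, waiting on the socket between calls.
sub poll_until_done {
    my ($step, $fh, $timeout) = @_;
    while (1) {
        my $status = $step->();
        return $status if $status != 0;
        wait_for_socket($fh, $timeout);
    }
}

## Demo: a socket pair stands in for the connection, and a canned list of
## return codes stands in for $dbh->pg_putcopyend_async().
socketpair(my $left, my $right, AF_UNIX, SOCK_STREAM, PF_UNSPEC)
    or die "socketpair: $!";
my @replies = (0, 0, 1);
my $status  = poll_until_done(sub { shift @replies }, $left, 1);
print "final status: $status\n";
```

With a real handle, the call would presumably be C<< poll_until_done(sub { $dbh->pg_putcopyend_async() }, $dbh->{pg_socket}, 1) >>. Note that an idle socket is usually writable, so during the result-waiting phase this loop may spin; a real event loop would register read and write interest separately.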

=head3 B<pg_flush>

Flushes the libpq output buffer. Wraps C<PQflush> directly. Used after
L</pg_putcopydata_async> returns 1 to push queued data to the server.

Return values match C<PQflush>:

  0  = all data flushed successfully
  1  = data still pending; caller should poll the socket for write-readiness
       and call pg_flush again
  -1 = error
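
The three new methods do not share a single convention: for C<pg_flush>, 0 means "done", while for C<pg_putcopydata_async> and C<pg_putcopyend_async>, 0 means "try again". As a quick reference, the small lookup table below restates the return values documented above. C<describe_status> is a hypothetical helper written for this illustration and is not part of DBD::Pg.

```perl
use strict;
use warnings;

## Return-code meanings, copied from the POD sections above.
my %MEANING = (
    pg_putcopydata_async => {
        1  => 'data queued',
        0  => 'output buffer full, poll for write-readiness and retry',
        -1 => 'error',
    },
    pg_putcopyend_async => {
        1  => 'COPY complete',
        0  => 'not ready, poll and retry',
        -1 => 'error',
    },
    pg_flush => {
        0  => 'all data flushed',
        1  => 'data pending, poll for write-readiness and retry',
        -1 => 'error',
    },
);

## Translate a (method, return code) pair into a human-readable phrase.
sub describe_status {
    my ($method, $code) = @_;
    my $by_code = $MEANING{$method} or die "unknown method: $method";
    return exists $by_code->{$code} ? $by_code->{$code} : "unexpected code $code";
}

print "pg_flush returned 0: ", describe_status('pg_flush', 0), "\n";
print "pg_putcopydata_async returned 0: ",
    describe_status('pg_putcopydata_async', 0), "\n";
```

The practical consequence is that a bare C<while ($dbh-E<gt>pg_flush())> loop treats -1 as "keep going", so comparing against 1 explicitly is safer.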

=head2 Postgres limits

For convenience, DBD::Pg can export certain constants representing the limits of
30 changes: 29 additions & 1 deletion Pg.xs
@@ -786,7 +786,17 @@ pg_putcopydata(dbh, dataline)
SV * dbh
SV * dataline
CODE:
RETVAL = pg_db_putcopydata(dbh, dataline);
RETVAL = pg_db_putcopydata(dbh, dataline, 0);
OUTPUT:
RETVAL

I32
pg_putcopydata_async(dbh, dataline)
INPUT:
SV * dbh
SV * dataline
CODE:
RETVAL = pg_db_putcopydata(dbh, dataline, 1);
OUTPUT:
RETVAL

@@ -799,6 +809,24 @@ pg_putcopyend(dbh)
OUTPUT:
RETVAL

I32
pg_putcopyend_async(dbh)
INPUT:
SV * dbh
CODE:
RETVAL = pg_db_putcopyend_async(dbh);
OUTPUT:
RETVAL

I32
pg_flush(dbh)
INPUT:
SV * dbh
CODE:
RETVAL = pg_db_flush(dbh);
OUTPUT:
RETVAL

void
getline(dbh, buf, len)
PREINIT: