2 changes: 2 additions & 0 deletions Pg.h
@@ -157,8 +157,10 @@ DBISTATE_DECLARE;
#define TRACE_PQPORT TRACE_XX "%sPQport\n", THEADER_slow)
#define TRACE_PQPREPARE TRACE_XX "%sPQprepare\n", THEADER_slow)
#define TRACE_PQPROTOCOLVERSION TRACE_XX "%sPQprotocolVersion\n", THEADER_slow)
#define TRACE_PQFLUSH TRACE_XX "%sPQflush\n", THEADER_slow)
#define TRACE_PQPUTCOPYDATA TRACE_XX "%sPQputCopyData\n", THEADER_slow)
#define TRACE_PQPUTCOPYEND TRACE_XX "%sPQputCopyEnd\n", THEADER_slow)
#define TRACE_PQSETNONBLOCKING TRACE_XX "%sPQsetnonblocking\n", THEADER_slow)
#define TRACE_PQRESULTERRORFIELD TRACE_XX "%sPQresultErrorField\n", THEADER_slow)
#define TRACE_PQRESULTERRORMESSAGE TRACE_XX "%sPQresultErrorMessage\n", THEADER_slow)
#define TRACE_PQRESULTSTATUS TRACE_XX "%sPQresultStatus\n", THEADER_slow)
101 changes: 99 additions & 2 deletions Pg.pm
@@ -158,8 +158,11 @@ use 5.008001;
DBD::Pg::db->install_method('pg_getcopydata');
DBD::Pg::db->install_method('pg_getcopydata_async');
DBD::Pg::db->install_method('pg_notifies');
DBD::Pg::db->install_method('pg_flush');
DBD::Pg::db->install_method('pg_putcopydata');
DBD::Pg::db->install_method('pg_putcopydata_async');
DBD::Pg::db->install_method('pg_putcopyend');
DBD::Pg::db->install_method('pg_putcopyend_async');
DBD::Pg::db->install_method('pg_ping');
DBD::Pg::db->install_method('pg_putline');
DBD::Pg::db->install_method('pg_ready');
@@ -4424,12 +4427,106 @@ the COPY statement. Returns a 1 on successful input. Examples:
$dbh->pg_putcopydata("Anchovies~6\n");
$dbh->pg_putcopyend();

=head3 B<pg_putcopydata_async>

Non-blocking version of pg_putcopydata for use by async libraries. When called, the
connection is switched into non-blocking mode (via C<PQsetnonblocking>), which is safe
because no other operations are permitted on this connection during a COPY.
Blocking mode is restored automatically when L</pg_putcopyend_async> completes.

Note: the connection performing the COPY is restricted to COPY operations until
the COPY ends. However, the non-blocking methods allow the event loop to service
other connections and tasks between calls, which is the primary benefit over the
blocking variants.

Return values match C<PQputCopyData>:

1 = data queued successfully (caller should call L</pg_flush> to send)
0 = output buffer full; caller should poll the socket for
write-readiness and retry the same pg_putcopydata_async call
-1 = error

After a successful return of 1, call L</pg_flush> to push the data to the
server. If C<pg_flush> returns 1 (data pending), poll the socket for
write-readiness and call C<pg_flush> again.

Example usage:

## Simple usage (flush after each row):
use IO::Select;
$dbh->do("COPY mytable(id, flavor, slices) FROM STDIN");
for my $row ("123\tPepperoni\t3\n", "314\tMushroom\t8\n") {
  $dbh->pg_putcopydata_async($row);
  ## pg_flush: 1 = data pending, 0 = done, -1 = error
  while ((my $rc = $dbh->pg_flush()) != 0) {
    die "flush error" if $rc == -1;
    IO::Select->new($dbh->{pg_socket})->can_write();
  }
}
$dbh->pg_putcopyend();

## Robust usage (handles buffer-full and async end):
use IO::Select;
my $sel = IO::Select->new($dbh->{pg_socket});

## Column list is optional but recommended. Default format is
## tab-delimited text with newline row terminators, matching
## PostgreSQL's COPY text format. Use COPY ... WITH (FORMAT csv)
## for CSV data, or WITH (DELIMITER '|') for custom delimiters.
$dbh->do("COPY mytable(id, flavor, slices) FROM STDIN");
my @data = ("123\tPepperoni\t3\n", "314\tMushroom\t8\n",
            "6\tAnchovies\t100\n");
for my $row (@data) {
  my $status = $dbh->pg_putcopydata_async($row);
  while ($status == 0) { # buffer full
    $sel->can_write();
    $status = $dbh->pg_putcopydata_async($row);
  }
  die "COPY error" if $status == -1;
  ## pg_flush: 1 = data pending, 0 = done, -1 = error
  while ((my $rc = $dbh->pg_flush()) != 0) { # push to server
    die "flush error" if $rc == -1;
    $sel->can_write();
  }
}

## Non-blocking end: poll until server confirms
my $end; # declared outside the loop so it is still in scope afterwards
while (($end = $dbh->pg_putcopyend_async()) == 0) {
  $sel->can_read();
Contributor

Er...can_read or can_write? Can buffer be full here?

Author

You're right — can_read is only correct for the result-polling phase, but pg_putcopyend_async also returns 0 during the flush phase (which needs write-ready). Rather than try to solve that with IO::Select, I replaced the example with Time::HiRes::sleep-based polling to match the style used by the existing async examples (pg_ready, pg_cancel, etc.) elsewhere in the POD. This also sidesteps the read vs write ambiguity entirely. In practice, users of these methods will be in a real event loop (IO::Async, Mojo::IOLoop, AnyEvent, etc.) where they'd register the socket for both read and write events — but examples for specific event loops probably belong in a cookbook or separate document rather than the core POD.
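
For reference, a sleep-based poll loop of the kind this reply describes might look roughly like the sketch below. This is not the revised POD example from the PR, which is not shown here. `MockDbh` and `finish_copy` are hypothetical names used so the snippet runs without a database, and the 10 ms interval is an arbitrary choice; only the 1 / 0 / -1 return convention comes from the documentation in this diff.

```perl
use strict;
use warnings;
use Time::HiRes ();

# Hypothetical stand-in for a DBD::Pg handle: reports "not ready" (0)
# a fixed number of times, then "done" (1).
package MockDbh {
    sub new {
        my ($class, $pending) = @_;
        return bless { pending => $pending }, $class;
    }
    sub pg_putcopyend_async {
        my ($self) = @_;
        return $self->{pending}-- > 0 ? 0 : 1;
    }
}

# Poll until the COPY end is confirmed, sleeping between attempts
# instead of waiting on the socket for read or write readiness.
# Returns the number of times the call had to be retried.
sub finish_copy {
    my ($dbh, $interval) = @_;
    my $polls = 0;
    while (1) {
        my $rc = $dbh->pg_putcopyend_async();
        die "COPY end error\n" if $rc == -1;
        return $polls if $rc == 1;
        $polls++;
        Time::HiRes::sleep($interval);
    }
}

my $polls = finish_copy(MockDbh->new(3), 0.01);
print "COPY finished after $polls polls\n";
```

Sleeping avoids having to know whether the call is waiting on a write (flush phase) or a read (result phase), at the cost of some latency and wasted wakeups compared with a real event loop.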

}
die "COPY end error" if $end == -1;

=head3 B<pg_putcopyend>

When you are finished with pg_putcopydata, call pg_putcopyend to let the server know
that you are done, and it will return to a normal, non-COPY state. Returns a 1 on
success. This method will fail if called when not in COPY IN mode.

=head3 B<pg_putcopyend_async>

Non-blocking version of pg_putcopyend for use by async libraries. Sends the COPY
end marker and attempts to collect the server result without blocking. Designed to
be called in a poll loop.

Return values:

1 = COPY completed successfully, connection is back in normal blocking mode
0 = not ready yet; caller should poll the socket (write-readiness while the
end marker is still being flushed, read-readiness while waiting for the
result), then call pg_putcopyend_async again
-1 = error

After pg_putcopyend_async returns 1, the connection is back in blocking mode and
normal queries can be issued.

=head3 B<pg_flush>

Flushes the libpq output buffer. Wraps C<PQflush> directly. Used after
L</pg_putcopydata_async> returns 1 to push queued data to the server.

Return values match C<PQflush>:

0 = all data flushed successfully
1 = data still pending; caller should poll the socket for write-readiness
and call pg_flush again
-1 = error
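
As a rough illustration of handling all three return values, a caller could wrap the flush loop in a small helper. This is only a sketch: `flush_fully` and `MockDbh` are hypothetical names that are not part of DBD::Pg, and the mock replays canned return codes so the snippet runs without a server. With a real handle, the second argument would be the connection's socket (for example the file descriptor in `$dbh->{pg_socket}`, as used in the examples above).

```perl
use strict;
use warnings;
use IO::Select;

# Hypothetical stand-in for a DBD::Pg handle: pg_flush replays a
# canned list of return codes.
package MockDbh {
    sub new {
        my ($class, @rcs) = @_;
        return bless { rcs => [@rcs] }, $class;
    }
    sub pg_flush {
        my ($self) = @_;
        return shift @{ $self->{rcs} };
    }
}

# Hypothetical helper: call pg_flush until it reports 0 (all data sent),
# waiting for write-readiness whenever it reports 1 (data pending).
# Dies on -1. Returns the number of pg_flush calls made.
sub flush_fully {
    my ($dbh, $sock) = @_;
    my $sel   = IO::Select->new($sock);
    my $calls = 0;
    while (1) {
        my $rc = $dbh->pg_flush();
        $calls++;
        return $calls if $rc == 0;
        die "pg_flush failed\n" if $rc == -1;
        $sel->can_write();    # rc == 1: block until the socket is writable
    }
}

# STDOUT stands in for the connection socket in this demo.
my $calls = flush_fully(MockDbh->new(1, 1, 0), \*STDOUT);
print "flushed after $calls calls\n";
```

Checking for -1 explicitly matters because a bare `while ($dbh->pg_flush())` treats the error code as true and would keep looping.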

=head2 Postgres limits

For convenience, DBD::Pg can export certain constants representing the limits of
30 changes: 29 additions & 1 deletion Pg.xs
@@ -786,7 +786,17 @@ pg_putcopydata(dbh, dataline)
SV * dbh
SV * dataline
CODE:
- RETVAL = pg_db_putcopydata(dbh, dataline);
+ RETVAL = pg_db_putcopydata(dbh, dataline, 0);
OUTPUT:
RETVAL

I32
pg_putcopydata_async(dbh, dataline)
INPUT:
SV * dbh
SV * dataline
CODE:
RETVAL = pg_db_putcopydata(dbh, dataline, 1);
OUTPUT:
RETVAL

@@ -799,6 +809,24 @@ pg_putcopyend(dbh)
OUTPUT:
RETVAL

I32
pg_putcopyend_async(dbh)
INPUT:
SV * dbh
CODE:
RETVAL = pg_db_putcopyend_async(dbh);
OUTPUT:
RETVAL

I32
pg_flush(dbh)
INPUT:
SV * dbh
CODE:
RETVAL = pg_db_flush(dbh);
OUTPUT:
RETVAL

void
getline(dbh, buf, len)
PREINIT: