Skip to content
Open
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .perlcriticrc
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ profile-strictness = quiet
exclude = Mardem

[Documentation::PodSpelling]
stop_words = ActiveKids afterwards arrayref arrayrefs attr autocommit AutoCommit AutoInactiveDestroy backend bitmask bool boolean Bunce bytea CachedKids cancelled ChildHandles ChopBlanks CompatMode CursorName datatype Datatype datatypes dbd DBD dbdpg dbh DBI deallocation deallocated dev dr DSN enum ErrCount errstr fd FetchHashKeyName filename func getfd getline github HandleError HandleSetErr hashref hashrefs InactiveDestroy JSON largeobject len libpq LongReadLen LongTruncOk lseg Mergl Momjian Mullane nullable NULLABLE Oid OID onwards param ParamTypes ParamValues perl Perlish PgBouncer pgbuiltin pgend pglibpq pglogin pgprefix pgquote PGSERVICE PGSERVICEFILE pgsql pgstart PGSYSCONFDIR PID Postgres PostgreSQL PQexecParams PQexecPrepared PrintError PrintWarn pseudotype RaiseError README ReadOnly RowCache RowCacheSize RowsInCache runtime Sabino savepoint savepoints Savepoints schemas ShowErrorStatement SQL SQLSTATE SSL sslmode STDERR STDIN STDOUT stringify subdirectory tablename tablespace tablespaces TaintIn TaintOut TraceLevel tuple typename undef username Username UTF varchar
stop_words = ActiveKids afterwards arrayref arrayrefs async attr autocommit AutoCommit AutoInactiveDestroy backend bitmask bool boolean Bunce bytea CachedKids cancelled ChildHandles ChopBlanks CompatMode CursorName datatype Datatype datatypes dbd DBD dbdpg dbh DBI deallocation deallocated dev dr DSN enum ErrCount errstr fd FetchHashKeyName filename func getfd getline github HandleError HandleSetErr hashref hashrefs InactiveDestroy JSON largeobject len libpq LongReadLen LongTruncOk lseg Mergl Momjian Mullane nullable NULLABLE Oid OID onwards param ParamTypes ParamValues perl Perlish PgBouncer pgbuiltin pgend pglibpq pglogin pgprefix pgquote PGSERVICE PGSERVICEFILE pgsql pgstart PGSYSCONFDIR PID Postgres PostgreSQL PQexecParams PQexecPrepared PQsetnonblocking PrintError PrintWarn pseudotype RaiseError README ReadOnly RowCache RowCacheSize RowsInCache runtime Sabino savepoint savepoints Savepoints schemas ShowErrorStatement SQL SQLSTATE SSL sslmode STDERR STDIN STDOUT stringify subdirectory tablename tablespace tablespaces TaintIn TaintOut TraceLevel tuple typename undef username Username UTF varchar

[-Bangs::ProhibitBitwiseOperators]
[-Bangs::ProhibitCommentedOutCode]
Expand Down
2 changes: 2 additions & 0 deletions Pg.h
Original file line number Diff line number Diff line change
Expand Up @@ -157,8 +157,10 @@ DBISTATE_DECLARE;
#define TRACE_PQPORT TRACE_XX "%sPQport\n", THEADER_slow)
#define TRACE_PQPREPARE TRACE_XX "%sPQprepare\n", THEADER_slow)
#define TRACE_PQPROTOCOLVERSION TRACE_XX "%sPQprotocolVersion\n", THEADER_slow)
#define TRACE_PQFLUSH TRACE_XX "%sPQflush\n", THEADER_slow)
#define TRACE_PQPUTCOPYDATA TRACE_XX "%sPQputCopyData\n", THEADER_slow)
#define TRACE_PQPUTCOPYEND TRACE_XX "%sPQputCopyEnd\n", THEADER_slow)
#define TRACE_PQSETNONBLOCKING TRACE_XX "%sPQsetnonblocking\n", THEADER_slow)
#define TRACE_PQRESULTERRORFIELD TRACE_XX "%sPQresultErrorField\n", THEADER_slow)
#define TRACE_PQRESULTERRORMESSAGE TRACE_XX "%sPQresultErrorMessage\n", THEADER_slow)
#define TRACE_PQRESULTSTATUS TRACE_XX "%sPQresultStatus\n", THEADER_slow)
Expand Down
83 changes: 81 additions & 2 deletions Pg.pm
Original file line number Diff line number Diff line change
Expand Up @@ -158,8 +158,11 @@ use 5.008001;
DBD::Pg::db->install_method('pg_getcopydata');
DBD::Pg::db->install_method('pg_getcopydata_async');
DBD::Pg::db->install_method('pg_notifies');
DBD::Pg::db->install_method('pg_flush');
DBD::Pg::db->install_method('pg_putcopydata');
DBD::Pg::db->install_method('pg_putcopydata_async');
DBD::Pg::db->install_method('pg_putcopyend');
DBD::Pg::db->install_method('pg_putcopyend_async');
DBD::Pg::db->install_method('pg_ping');
DBD::Pg::db->install_method('pg_putline');
DBD::Pg::db->install_method('pg_ready');
Expand Down Expand Up @@ -4433,12 +4436,88 @@ the COPY statement. Returns a 1 on successful input. Examples:
$dbh->pg_putcopydata("Anchovies~6\n");
$dbh->pg_putcopyend();

=head3 B<pg_putcopydata_async>

Non-blocking version of pg_putcopydata for use by async libraries. When called, the
connection is switched into non-blocking mode (via C<PQsetnonblocking>), which is safe
because no other operations are permitted on this connection during a COPY. The
non-blocking mode is automatically restored to blocking when L</pg_putcopyend_async>
completes.

Note: the connection performing the COPY is restricted to COPY operations until
the COPY ends. However, the non-blocking methods allow the event loop to service
other connections and tasks between calls, which is the primary benefit over the
blocking variants.

Return values match C<PQputCopyData>:

1 = data queued successfully (caller should call L</pg_flush> to send)
0 = output buffer full; caller should poll the socket for
write-ready and retry the same pg_putcopydata_async call
-1 = error

After a successful return of 1, call L</pg_flush> to push the data to the
server. If C<pg_flush> returns 1 (data pending), poll the socket for
write-ready and call C<pg_flush> again.

Example usage:
use Time::HiRes 'sleep';

$dbh->do("COPY mytable(id, flavor, slices) FROM STDIN");
my @data = ("123\tPepperoni\t3\n", "314\tMushroom\t8\n",
"6\tAnchovies\t100\n");
for my $row (@data) {
my $status = $dbh->pg_putcopydata_async($row);
while ($status == 0) { # buffer full, retry
sleep 0.01;
$status = $dbh->pg_putcopydata_async($row);
}
die "COPY error" if $status == -1;
while ($dbh->pg_flush()) { # push to server
sleep 0.01;
}
}

## Non-blocking end: poll until server confirms
while ((my $end = $dbh->pg_putcopyend_async()) == 0) {
sleep 0.01;
}
die "COPY end error" if $end == -1;

=head3 B<pg_putcopyend>

When you are finished with pg_putcopydata, call pg_putcopyend to let the server know
that you are done, and it will return to a normal, non-COPY state. Returns a 1 on
When you are finished with pg_putcopydata, call pg_putcopyend to let the server know
that you are done, and it will return to a normal, non-COPY state. Returns a 1 on
success. This method will fail if called when not in COPY IN mode.

=head3 B<pg_putcopyend_async>

Non-blocking version of pg_putcopyend for use by async libraries. Sends the COPY
end marker and attempts to collect the server result without blocking. Designed to
be called in a poll loop.

Return values:

1 = COPY completed successfully, connection is back in normal blocking mode
0 = not ready yet; caller should poll the socket for readiness, then call
pg_putcopyend_async again
-1 = error

After pg_putcopyend_async returns 1, the connection is back in blocking mode and
normal queries can be issued.

=head3 B<pg_flush>

Flushes the libpq output buffer. Wraps C<PQflush> directly. Used after
L</pg_putcopydata_async> returns 1 to push queued data to the server.

Return values match C<PQflush>:

0 = all data flushed successfully
Comment thread
esabol marked this conversation as resolved.
1 = data still pending; caller should poll the socket for write-ready
and call pg_flush again
-1 = error

=head2 Postgres limits

For convenience, DBD::Pg can export certain constants representing the limits of
Expand Down
30 changes: 29 additions & 1 deletion Pg.xs
Original file line number Diff line number Diff line change
Expand Up @@ -786,7 +786,17 @@ pg_putcopydata(dbh, dataline)
SV * dbh
SV * dataline
CODE:
RETVAL = pg_db_putcopydata(dbh, dataline);
RETVAL = pg_db_putcopydata(dbh, dataline, 0);
OUTPUT:
RETVAL

I32
pg_putcopydata_async(dbh, dataline)
INPUT:
SV * dbh
SV * dataline
CODE:
RETVAL = pg_db_putcopydata(dbh, dataline, 1);
OUTPUT:
RETVAL

Expand All @@ -799,6 +809,24 @@ pg_putcopyend(dbh)
OUTPUT:
RETVAL

I32
pg_putcopyend_async(dbh)
INPUT:
SV * dbh
CODE:
RETVAL = pg_db_putcopyend_async(dbh);
OUTPUT:
RETVAL

I32
pg_flush(dbh)
INPUT:
SV * dbh
CODE:
RETVAL = pg_db_flush(dbh);
OUTPUT:
RETVAL

void
getline(dbh, buf, len)
PREINIT:
Expand Down
Loading