summaryrefslogtreecommitdiff
path: root/src/pl/plperl
diff options
context:
space:
mode:
authorPeter Eisentraut2018-01-22 13:30:16 +0000
committerPeter Eisentraut2018-01-22 13:43:06 +0000
commit8561e4840c81f7e345be2df170839846814fa004 (patch)
treea984a1022a84ea22f9c7a96c37f865f96212cad6 /src/pl/plperl
parentb9ff79b8f17697f3df492017d454caa9920a7183 (diff)
Transaction control in PL procedures
In each of the supplied procedural languages (PL/pgSQL, PL/Perl, PL/Python, PL/Tcl), add language-specific commit and rollback functions/commands to control transactions in procedures in that language. Add similar underlying functions to SPI. Some additional cleanup so that transaction commit or abort doesn't blow away data structures still used by the procedure call. Add execution context tracking to CALL and DO statements so that transaction control commands can only be issued in top-level procedure and block calls, not function calls or other procedure or block calls. - SPI Add a new function SPI_connect_ext() that is like SPI_connect() but allows passing option flags. The only option flag right now is SPI_OPT_NONATOMIC. A nonatomic SPI connection can execute transaction control commands, otherwise it's not allowed. This is meant to be passed down from CALL and DO statements which themselves know in which context they are called. A nonatomic SPI connection uses different memory management. A normal SPI connection allocates its memory in TopTransactionContext. For nonatomic connections we use PortalContext instead. As the comment in SPI_connect_ext() (previously SPI_connect()) indicates, one could potentially use PortalContext in all cases, but it seems safest to leave the existing uses alone, because this stuff is complicated enough already. SPI also gets new functions SPI_start_transaction(), SPI_commit(), and SPI_rollback(), which can be used by PLs to implement their transaction control logic. - portalmem.c Some adjustments were made in the code that cleans up portals at transaction abort. The portal code could already handle a command *committing* a transaction and continuing (e.g., VACUUM), but it was not quite prepared for a command *aborting* a transaction and continuing. In AtAbort_Portals(), remove the code that marks an active portal as failed. As the comment there already predicted, this doesn't work if the running command wants to keep running after transaction abort. And it's actually not necessary, because pquery.c is careful to run all portal code in a PG_TRY block and explicitly runs MarkPortalFailed() if there is an exception. So the code in AtAbort_Portals() is never used anyway. In AtAbort_Portals() and AtCleanup_Portals(), we need to be careful not to clean up active portals too much. This mirrors similar code in PreCommit_Portals(). - PL/Perl Gets new functions spi_commit() and spi_rollback() - PL/pgSQL Gets new commands COMMIT and ROLLBACK. Update the PL/SQL porting example in the documentation to reflect that transactions are now possible in procedures. - PL/Python Gets new functions plpy.commit and plpy.rollback. - PL/Tcl Gets new commands commit and rollback. Reviewed-by: Andrew Dunstan <andrew.dunstan@2ndquadrant.com>
Diffstat (limited to 'src/pl/plperl')
-rw-r--r--src/pl/plperl/GNUmakefile2
-rw-r--r--src/pl/plperl/SPI.xs9
-rw-r--r--src/pl/plperl/expected/plperl_transaction.out133
-rw-r--r--src/pl/plperl/plperl.c69
-rw-r--r--src/pl/plperl/plperl.h2
-rw-r--r--src/pl/plperl/sql/plperl_transaction.sql120
6 files changed, 332 insertions, 3 deletions
diff --git a/src/pl/plperl/GNUmakefile b/src/pl/plperl/GNUmakefile
index b829027d05d..933abb47c49 100644
--- a/src/pl/plperl/GNUmakefile
+++ b/src/pl/plperl/GNUmakefile
@@ -55,7 +55,7 @@ endif # win32
SHLIB_LINK = $(perl_embed_ldflags)
REGRESS_OPTS = --dbname=$(PL_TESTDB) --load-extension=plperl --load-extension=plperlu
-REGRESS = plperl plperl_lc plperl_trigger plperl_shared plperl_elog plperl_util plperl_init plperlu plperl_array plperl_call
+REGRESS = plperl plperl_lc plperl_trigger plperl_shared plperl_elog plperl_util plperl_init plperlu plperl_array plperl_call plperl_transaction
# if Perl can support two interpreters in one backend,
# test plperl-and-plperlu cases
ifneq ($(PERL),)
diff --git a/src/pl/plperl/SPI.xs b/src/pl/plperl/SPI.xs
index d9e6f579d41..b98c547e8be 100644
--- a/src/pl/plperl/SPI.xs
+++ b/src/pl/plperl/SPI.xs
@@ -152,6 +152,15 @@ spi_spi_cursor_close(sv)
plperl_spi_cursor_close(cursor);
pfree(cursor);
+void
+spi_spi_commit()
+ CODE:
+ plperl_spi_commit();
+
+void
+spi_spi_rollback()
+ CODE:
+ plperl_spi_rollback();
BOOT:
items = 0; /* avoid 'unused variable' warning */
diff --git a/src/pl/plperl/expected/plperl_transaction.out b/src/pl/plperl/expected/plperl_transaction.out
new file mode 100644
index 00000000000..bd7b7f8660b
--- /dev/null
+++ b/src/pl/plperl/expected/plperl_transaction.out
@@ -0,0 +1,133 @@
+CREATE TABLE test1 (a int, b text);
+CREATE PROCEDURE transaction_test1()
+LANGUAGE plperl
+AS $$
+foreach my $i (0..9) {
+ spi_exec_query("INSERT INTO test1 (a) VALUES ($i)");
+ if ($i % 2 == 0) {
+ spi_commit();
+ } else {
+ spi_rollback();
+ }
+}
+$$;
+CALL transaction_test1();
+SELECT * FROM test1;
+ a | b
+---+---
+ 0 |
+ 2 |
+ 4 |
+ 6 |
+ 8 |
+(5 rows)
+
+TRUNCATE test1;
+DO
+LANGUAGE plperl
+$$
+foreach my $i (0..9) {
+ spi_exec_query("INSERT INTO test1 (a) VALUES ($i)");
+ if ($i % 2 == 0) {
+ spi_commit();
+ } else {
+ spi_rollback();
+ }
+}
+$$;
+SELECT * FROM test1;
+ a | b
+---+---
+ 0 |
+ 2 |
+ 4 |
+ 6 |
+ 8 |
+(5 rows)
+
+TRUNCATE test1;
+-- not allowed in a function
+CREATE FUNCTION transaction_test2() RETURNS int
+LANGUAGE plperl
+AS $$
+foreach my $i (0..9) {
+ spi_exec_query("INSERT INTO test1 (a) VALUES ($i)");
+ if ($i % 2 == 0) {
+ spi_commit();
+ } else {
+ spi_rollback();
+ }
+}
+return 1;
+$$;
+SELECT transaction_test2();
+ERROR: invalid transaction termination at line 5.
+CONTEXT: PL/Perl function "transaction_test2"
+SELECT * FROM test1;
+ a | b
+---+---
+(0 rows)
+
+-- also not allowed if procedure is called from a function
+CREATE FUNCTION transaction_test3() RETURNS int
+LANGUAGE plperl
+AS $$
+spi_exec_query("CALL transaction_test1()");
+return 1;
+$$;
+SELECT transaction_test3();
+ERROR: invalid transaction termination at line 5. at line 2.
+CONTEXT: PL/Perl function "transaction_test3"
+SELECT * FROM test1;
+ a | b
+---+---
+(0 rows)
+
+-- DO block inside function
+CREATE FUNCTION transaction_test4() RETURNS int
+LANGUAGE plperl
+AS $$
+spi_exec_query('DO LANGUAGE plperl $x$ spi_commit(); $x$');
+return 1;
+$$;
+SELECT transaction_test4();
+ERROR: invalid transaction termination at line 1. at line 2.
+CONTEXT: PL/Perl function "transaction_test4"
+-- commit inside cursor loop
+CREATE TABLE test2 (x int);
+INSERT INTO test2 VALUES (0), (1), (2), (3), (4);
+TRUNCATE test1;
+DO LANGUAGE plperl $$
+my $sth = spi_query("SELECT * FROM test2 ORDER BY x");
+my $row;
+while (defined($row = spi_fetchrow($sth))) {
+ spi_exec_query("INSERT INTO test1 (a) VALUES (" . $row->{x} . ")");
+ spi_commit();
+}
+$$;
+ERROR: cannot commit transaction while a cursor is open at line 6.
+CONTEXT: PL/Perl anonymous code block
+SELECT * FROM test1;
+ a | b
+---+---
+(0 rows)
+
+-- rollback inside cursor loop
+TRUNCATE test1;
+DO LANGUAGE plperl $$
+my $sth = spi_query("SELECT * FROM test2 ORDER BY x");
+my $row;
+while (defined($row = spi_fetchrow($sth))) {
+ spi_exec_query("INSERT INTO test1 (a) VALUES (" . $row->{x} . ")");
+ spi_rollback();
+}
+$$;
+ERROR: cannot abort transaction while a cursor is open at line 6.
+CONTEXT: PL/Perl anonymous code block
+SELECT * FROM test1;
+ a | b
+---+---
+(0 rows)
+
+DROP TABLE test1;
+DROP TABLE test2;
diff --git a/src/pl/plperl/plperl.c b/src/pl/plperl/plperl.c
index 10feef11cf3..77c41b28215 100644
--- a/src/pl/plperl/plperl.c
+++ b/src/pl/plperl/plperl.c
@@ -1929,7 +1929,7 @@ plperl_inline_handler(PG_FUNCTION_ARGS)
current_call_data = &this_call_data;
- if (SPI_connect() != SPI_OK_CONNECT)
+ if (SPI_connect_ext(codeblock->atomic ? 0 : SPI_OPT_NONATOMIC) != SPI_OK_CONNECT)
elog(ERROR, "could not connect to SPI manager");
select_perl_context(desc.lanpltrusted);
@@ -2396,13 +2396,18 @@ plperl_call_perl_event_trigger_func(plperl_proc_desc *desc,
static Datum
plperl_func_handler(PG_FUNCTION_ARGS)
{
+ bool nonatomic;
plperl_proc_desc *prodesc;
SV *perlret;
Datum retval = 0;
ReturnSetInfo *rsi;
ErrorContextCallback pl_error_context;
- if (SPI_connect() != SPI_OK_CONNECT)
+ nonatomic = fcinfo->context &&
+ IsA(fcinfo->context, CallContext) &&
+ !castNode(CallContext, fcinfo->context)->atomic;
+
+ if (SPI_connect_ext(nonatomic ? SPI_OPT_NONATOMIC : 0) != SPI_OK_CONNECT)
elog(ERROR, "could not connect to SPI manager");
prodesc = compile_plperl_function(fcinfo->flinfo->fn_oid, false, false);
@@ -3953,6 +3958,66 @@ plperl_spi_freeplan(char *query)
SPI_freeplan(plan);
}
+void
+plperl_spi_commit(void)
+{
+ MemoryContext oldcontext = CurrentMemoryContext;
+
+ PG_TRY();
+ {
+ if (ThereArePinnedPortals())
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("cannot commit transaction while a cursor is open")));
+
+ SPI_commit();
+ SPI_start_transaction();
+ }
+ PG_CATCH();
+ {
+ ErrorData *edata;
+
+ /* Save error info */
+ MemoryContextSwitchTo(oldcontext);
+ edata = CopyErrorData();
+ FlushErrorState();
+
+ /* Punt the error to Perl */
+ croak_cstr(edata->message);
+ }
+ PG_END_TRY();
+}
+
+void
+plperl_spi_rollback(void)
+{
+ MemoryContext oldcontext = CurrentMemoryContext;
+
+ PG_TRY();
+ {
+ if (ThereArePinnedPortals())
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_TRANSACTION_TERMINATION),
+ errmsg("cannot abort transaction while a cursor is open")));
+
+ SPI_rollback();
+ SPI_start_transaction();
+ }
+ PG_CATCH();
+ {
+ ErrorData *edata;
+
+ /* Save error info */
+ MemoryContextSwitchTo(oldcontext);
+ edata = CopyErrorData();
+ FlushErrorState();
+
+ /* Punt the error to Perl */
+ croak_cstr(edata->message);
+ }
+ PG_END_TRY();
+}
+
/*
* Implementation of plperl's elog() function
*
diff --git a/src/pl/plperl/plperl.h b/src/pl/plperl/plperl.h
index 78366aac04f..6fe7803088e 100644
--- a/src/pl/plperl/plperl.h
+++ b/src/pl/plperl/plperl.h
@@ -125,6 +125,8 @@ HV *plperl_spi_exec_prepared(char *, HV *, int, SV **);
SV *plperl_spi_query_prepared(char *, int, SV **);
void plperl_spi_freeplan(char *);
void plperl_spi_cursor_close(char *);
+void plperl_spi_commit(void);
+void plperl_spi_rollback(void);
char *plperl_sv_to_literal(SV *, char *);
void plperl_util_elog(int level, SV *msg);
diff --git a/src/pl/plperl/sql/plperl_transaction.sql b/src/pl/plperl/sql/plperl_transaction.sql
new file mode 100644
index 00000000000..5c14d4732eb
--- /dev/null
+++ b/src/pl/plperl/sql/plperl_transaction.sql
@@ -0,0 +1,120 @@
+CREATE TABLE test1 (a int, b text);
+
+
+CREATE PROCEDURE transaction_test1()
+LANGUAGE plperl
+AS $$
+foreach my $i (0..9) {
+ spi_exec_query("INSERT INTO test1 (a) VALUES ($i)");
+ if ($i % 2 == 0) {
+ spi_commit();
+ } else {
+ spi_rollback();
+ }
+}
+$$;
+
+CALL transaction_test1();
+
+SELECT * FROM test1;
+
+
+TRUNCATE test1;
+
+DO
+LANGUAGE plperl
+$$
+foreach my $i (0..9) {
+ spi_exec_query("INSERT INTO test1 (a) VALUES ($i)");
+ if ($i % 2 == 0) {
+ spi_commit();
+ } else {
+ spi_rollback();
+ }
+}
+$$;
+
+SELECT * FROM test1;
+
+
+TRUNCATE test1;
+
+-- not allowed in a function
+CREATE FUNCTION transaction_test2() RETURNS int
+LANGUAGE plperl
+AS $$
+foreach my $i (0..9) {
+ spi_exec_query("INSERT INTO test1 (a) VALUES ($i)");
+ if ($i % 2 == 0) {
+ spi_commit();
+ } else {
+ spi_rollback();
+ }
+}
+return 1;
+$$;
+
+SELECT transaction_test2();
+
+SELECT * FROM test1;
+
+
+-- also not allowed if procedure is called from a function
+CREATE FUNCTION transaction_test3() RETURNS int
+LANGUAGE plperl
+AS $$
+spi_exec_query("CALL transaction_test1()");
+return 1;
+$$;
+
+SELECT transaction_test3();
+
+SELECT * FROM test1;
+
+
+-- DO block inside function
+CREATE FUNCTION transaction_test4() RETURNS int
+LANGUAGE plperl
+AS $$
+spi_exec_query('DO LANGUAGE plperl $x$ spi_commit(); $x$');
+return 1;
+$$;
+
+SELECT transaction_test4();
+
+
+-- commit inside cursor loop
+CREATE TABLE test2 (x int);
+INSERT INTO test2 VALUES (0), (1), (2), (3), (4);
+
+TRUNCATE test1;
+
+DO LANGUAGE plperl $$
+my $sth = spi_query("SELECT * FROM test2 ORDER BY x");
+my $row;
+while (defined($row = spi_fetchrow($sth))) {
+ spi_exec_query("INSERT INTO test1 (a) VALUES (" . $row->{x} . ")");
+ spi_commit();
+}
+$$;
+
+SELECT * FROM test1;
+
+
+-- rollback inside cursor loop
+TRUNCATE test1;
+
+DO LANGUAGE plperl $$
+my $sth = spi_query("SELECT * FROM test2 ORDER BY x");
+my $row;
+while (defined($row = spi_fetchrow($sth))) {
+ spi_exec_query("INSERT INTO test1 (a) VALUES (" . $row->{x} . ")");
+ spi_rollback();
+}
+$$;
+
+SELECT * FROM test1;
+
+
+DROP TABLE test1;
+DROP TABLE test2;