diff options
| author | Peter Eisentraut | 2018-01-22 13:30:16 +0000 |
|---|---|---|
| committer | Peter Eisentraut | 2018-01-22 13:43:06 +0000 |
| commit | 8561e4840c81f7e345be2df170839846814fa004 (patch) | |
| tree | a984a1022a84ea22f9c7a96c37f865f96212cad6 /src/pl/plpython | |
| parent | b9ff79b8f17697f3df492017d454caa9920a7183 (diff) | |
Transaction control in PL procedures
In each of the supplied procedural languages (PL/pgSQL, PL/Perl,
PL/Python, PL/Tcl), add language-specific commit and rollback
functions/commands to control transactions in procedures in that
language. Add similar underlying functions to SPI. Some additional
cleanup so that transaction commit or abort doesn't blow away data
structures still used by the procedure call. Add execution context
tracking to CALL and DO statements so that transaction control commands
can only be issued in top-level procedure and block calls, not function
calls or other procedure or block calls.
- SPI
Add a new function SPI_connect_ext() that is like SPI_connect() but
allows passing option flags. The only option flag right now is
SPI_OPT_NONATOMIC. A nonatomic SPI connection can execute transaction
control commands, otherwise it's not allowed. This is meant to be
passed down from CALL and DO statements which themselves know in which
context they are called. A nonatomic SPI connection uses different
memory management. A normal SPI connection allocates its memory in
TopTransactionContext. For nonatomic connections we use PortalContext
instead. As the comment in SPI_connect_ext() (previously SPI_connect())
indicates, one could potentially use PortalContext in all cases, but it
seems safest to leave the existing uses alone, because this stuff is
complicated enough already.
SPI also gets new functions SPI_start_transaction(), SPI_commit(), and
SPI_rollback(), which can be used by PLs to implement their transaction
control logic.
- portalmem.c
Some adjustments were made in the code that cleans up portals at
transaction abort. The portal code could already handle a command
*committing* a transaction and continuing (e.g., VACUUM), but it was not
quite prepared for a command *aborting* a transaction and continuing.
In AtAbort_Portals(), remove the code that marks an active portal as
failed. As the comment there already predicted, this doesn't work if
the running command wants to keep running after transaction abort. And
it's actually not necessary, because pquery.c is careful to run all
portal code in a PG_TRY block and explicitly runs MarkPortalFailed() if
there is an exception. So the code in AtAbort_Portals() is never used
anyway.
In AtAbort_Portals() and AtCleanup_Portals(), we need to be careful not
to clean up active portals too much. This mirrors similar code in
PreCommit_Portals().
- PL/Perl
Gets new functions spi_commit() and spi_rollback()
- PL/pgSQL
Gets new commands COMMIT and ROLLBACK.
Update the PL/SQL porting example in the documentation to reflect that
transactions are now possible in procedures.
- PL/Python
Gets new functions plpy.commit and plpy.rollback.
- PL/Tcl
Gets new commands commit and rollback.
Reviewed-by: Andrew Dunstan <andrew.dunstan@2ndquadrant.com>
Diffstat (limited to 'src/pl/plpython')
| -rw-r--r-- | src/pl/plpython/Makefile | 1 | ||||
| -rw-r--r-- | src/pl/plpython/expected/plpython_test.out | 4 | ||||
| -rw-r--r-- | src/pl/plpython/expected/plpython_transaction.out | 135 | ||||
| -rw-r--r-- | src/pl/plpython/plpy_main.c | 21 | ||||
| -rw-r--r-- | src/pl/plpython/plpy_plpymodule.c | 49 | ||||
| -rw-r--r-- | src/pl/plpython/sql/plpython_transaction.sql | 115 |
6 files changed, 317 insertions, 8 deletions
diff --git a/src/pl/plpython/Makefile b/src/pl/plpython/Makefile index cc91afebde7..d09910835d1 100644 --- a/src/pl/plpython/Makefile +++ b/src/pl/plpython/Makefile @@ -90,6 +90,7 @@ REGRESS = \ plpython_quote \ plpython_composite \ plpython_subtransaction \ + plpython_transaction \ plpython_drop REGRESS_PLPYTHON3_MANGLE := $(REGRESS) diff --git a/src/pl/plpython/expected/plpython_test.out b/src/pl/plpython/expected/plpython_test.out index 847e4cc412e..39b994f4468 100644 --- a/src/pl/plpython/expected/plpython_test.out +++ b/src/pl/plpython/expected/plpython_test.out @@ -48,6 +48,7 @@ select module_contents(); Error Fatal SPIError + commit cursor debug error @@ -60,10 +61,11 @@ select module_contents(); quote_ident quote_literal quote_nullable + rollback spiexceptions subtransaction warning -(18 rows) +(20 rows) CREATE FUNCTION elog_test_basic() RETURNS void AS $$ diff --git a/src/pl/plpython/expected/plpython_transaction.out b/src/pl/plpython/expected/plpython_transaction.out new file mode 100644 index 00000000000..1fadc69b636 --- /dev/null +++ b/src/pl/plpython/expected/plpython_transaction.out @@ -0,0 +1,135 @@ +CREATE TABLE test1 (a int, b text); +CREATE PROCEDURE transaction_test1() +LANGUAGE plpythonu +AS $$ +for i in range(0, 10): + plpy.execute("INSERT INTO test1 (a) VALUES (%d)" % i) + if i % 2 == 0: + plpy.commit() + else: + plpy.rollback() +$$; +CALL transaction_test1(); +SELECT * FROM test1; + a | b +---+--- + 0 | + 2 | + 4 | + 6 | + 8 | +(5 rows) + +TRUNCATE test1; +DO +LANGUAGE plpythonu +$$ +for i in range(0, 10): + plpy.execute("INSERT INTO test1 (a) VALUES (%d)" % i) + if i % 2 == 0: + plpy.commit() + else: + plpy.rollback() +$$; +SELECT * FROM test1; + a | b +---+--- + 0 | + 2 | + 4 | + 6 | + 8 | +(5 rows) + +TRUNCATE test1; +-- not allowed in a function +CREATE FUNCTION transaction_test2() RETURNS int +LANGUAGE plpythonu +AS $$ +for i in range(0, 10): + plpy.execute("INSERT INTO test1 (a) VALUES (%d)" % i) + if i % 2 == 0: + plpy.commit() + else: + plpy.rollback() +return 1 +$$; +SELECT transaction_test2(); +ERROR: invalid transaction termination +CONTEXT: PL/Python function "transaction_test2" +SELECT * FROM test1; + a | b +---+--- +(0 rows) + +-- also not allowed if procedure is called from a function +CREATE FUNCTION transaction_test3() RETURNS int +LANGUAGE plpythonu +AS $$ +plpy.execute("CALL transaction_test1()") +return 1 +$$; +SELECT transaction_test3(); +ERROR: spiexceptions.InvalidTransactionTermination: invalid transaction termination +CONTEXT: Traceback (most recent call last): + PL/Python function "transaction_test3", line 2, in <module> + plpy.execute("CALL transaction_test1()") +PL/Python function "transaction_test3" +SELECT * FROM test1; + a | b +---+--- +(0 rows) + +-- DO block inside function +CREATE FUNCTION transaction_test4() RETURNS int +LANGUAGE plpythonu +AS $$ +plpy.execute("DO LANGUAGE plpythonu $x$ plpy.commit() $x$") +return 1 +$$; +SELECT transaction_test4(); +ERROR: spiexceptions.InvalidTransactionTermination: invalid transaction termination +CONTEXT: Traceback (most recent call last): + PL/Python function "transaction_test4", line 2, in <module> + plpy.execute("DO LANGUAGE plpythonu $x$ plpy.commit() $x$") +PL/Python function "transaction_test4" +-- commit inside subtransaction (prohibited) +DO LANGUAGE plpythonu $$ +with plpy.subtransaction(): + plpy.commit() +$$; +WARNING: forcibly aborting a subtransaction that has not been exited +ERROR: cannot commit while a subtransaction is active +CONTEXT: PL/Python anonymous code block +-- commit inside cursor loop +CREATE TABLE test2 (x int); +INSERT INTO test2 VALUES (0), (1), (2), (3), (4); +TRUNCATE test1; +DO LANGUAGE plpythonu $$ +for row in plpy.cursor("SELECT * FROM test2 ORDER BY x"): + plpy.execute("INSERT INTO test1 (a) VALUES (%s)" % row['x']) + plpy.commit() +$$; +ERROR: cannot commit transaction while a cursor is open +CONTEXT: PL/Python anonymous code block +SELECT * FROM test1; + a | b +---+--- +(0 rows) + +-- rollback inside cursor loop +TRUNCATE test1; +DO LANGUAGE plpythonu $$ +for row in plpy.cursor("SELECT * FROM test2 ORDER BY x"): + plpy.execute("INSERT INTO test1 (a) VALUES (%s)" % row['x']) + plpy.rollback() +$$; +ERROR: cannot abort transaction while a cursor is open +CONTEXT: PL/Python anonymous code block +SELECT * FROM test1; + a | b +---+--- +(0 rows) + +DROP TABLE test1; +DROP TABLE test2; diff --git a/src/pl/plpython/plpy_main.c b/src/pl/plpython/plpy_main.c index 695de305838..5a197ce27ac 100644 --- a/src/pl/plpython/plpy_main.c +++ b/src/pl/plpython/plpy_main.c @@ -60,7 +60,7 @@ static void plpython_error_callback(void *arg); static void plpython_inline_error_callback(void *arg); static void PLy_init_interp(void); -static PLyExecutionContext *PLy_push_execution_context(void); +static PLyExecutionContext *PLy_push_execution_context(bool atomic_context); static void PLy_pop_execution_context(void); /* static state for Python library conflict detection */ @@ -219,14 +219,19 @@ plpython2_validator(PG_FUNCTION_ARGS) Datum plpython_call_handler(PG_FUNCTION_ARGS) { + bool nonatomic; Datum retval; PLyExecutionContext *exec_ctx; ErrorContextCallback plerrcontext; PLy_initialize(); + nonatomic = fcinfo->context && + IsA(fcinfo->context, CallContext) && + !castNode(CallContext, fcinfo->context)->atomic; + /* Note: SPI_finish() happens in plpy_exec.c, which is dubious design */ - if (SPI_connect() != SPI_OK_CONNECT) + if (SPI_connect_ext(nonatomic ? SPI_OPT_NONATOMIC : 0) != SPI_OK_CONNECT) elog(ERROR, "SPI_connect failed"); /* @@ -235,7 +240,7 @@ plpython_call_handler(PG_FUNCTION_ARGS) * here and the PG_TRY. (plpython_error_callback expects the stack entry * to be there, so we have to make the context first.) */ - exec_ctx = PLy_push_execution_context(); + exec_ctx = PLy_push_execution_context(!nonatomic); /* * Setup error traceback support for ereport() @@ -303,7 +308,7 @@ plpython_inline_handler(PG_FUNCTION_ARGS) PLy_initialize(); /* Note: SPI_finish() happens in plpy_exec.c, which is dubious design */ - if (SPI_connect() != SPI_OK_CONNECT) + if (SPI_connect_ext(codeblock->atomic ? 0 : SPI_OPT_NONATOMIC) != SPI_OK_CONNECT) elog(ERROR, "SPI_connect failed"); MemSet(&fake_fcinfo, 0, sizeof(fake_fcinfo)); @@ -332,7 +337,7 @@ plpython_inline_handler(PG_FUNCTION_ARGS) * need the stack entry, but for consistency with plpython_call_handler we * do it in this order.) */ - exec_ctx = PLy_push_execution_context(); + exec_ctx = PLy_push_execution_context(codeblock->atomic); /* * Setup error traceback support for ereport() @@ -430,12 +435,14 @@ PLy_get_scratch_context(PLyExecutionContext *context) } static PLyExecutionContext * -PLy_push_execution_context(void) +PLy_push_execution_context(bool atomic_context) { PLyExecutionContext *context; + /* Pick a memory context similar to what SPI uses. */ context = (PLyExecutionContext *) - MemoryContextAlloc(TopTransactionContext, sizeof(PLyExecutionContext)); + MemoryContextAlloc(atomic_context ? TopTransactionContext : PortalContext, + sizeof(PLyExecutionContext)); context->curr_proc = NULL; context->scratch_ctx = NULL; context->next = PLy_execution_contexts; diff --git a/src/pl/plpython/plpy_plpymodule.c b/src/pl/plpython/plpy_plpymodule.c index 23f99e20ca3..3d7dd13f0cf 100644 --- a/src/pl/plpython/plpy_plpymodule.c +++ b/src/pl/plpython/plpy_plpymodule.c @@ -6,8 +6,10 @@ #include "postgres.h" +#include "access/xact.h" #include "mb/pg_wchar.h" #include "utils/builtins.h" +#include "utils/snapmgr.h" #include "plpython.h" @@ -15,6 +17,7 @@ #include "plpy_cursorobject.h" #include "plpy_elog.h" +#include "plpy_main.h" #include "plpy_planobject.h" #include "plpy_resultobject.h" #include "plpy_spi.h" @@ -41,6 +44,8 @@ static PyObject *PLy_fatal(PyObject *self, PyObject *args, PyObject *kw); static PyObject *PLy_quote_literal(PyObject *self, PyObject *args); static PyObject *PLy_quote_nullable(PyObject *self, PyObject *args); static PyObject *PLy_quote_ident(PyObject *self, PyObject *args); +static PyObject *PLy_commit(PyObject *self, PyObject *args); +static PyObject *PLy_rollback(PyObject *self, PyObject *args); /* A list of all known exceptions, generated from backend/utils/errcodes.txt */ @@ -95,6 +100,12 @@ static PyMethodDef PLy_methods[] = { */ {"cursor", PLy_cursor, METH_VARARGS, NULL}, + /* + * transaction control + */ + {"commit", PLy_commit, METH_NOARGS, NULL}, + {"rollback", PLy_rollback, METH_NOARGS, NULL}, + {NULL, NULL, 0, NULL} }; @@ -577,3 +588,41 @@ PLy_output(volatile int level, PyObject *self, PyObject *args, PyObject *kw) */ Py_RETURN_NONE; } + +static PyObject * +PLy_commit(PyObject *self, PyObject *args) +{ + PLyExecutionContext *exec_ctx = PLy_current_execution_context(); + + if (ThereArePinnedPortals()) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot commit transaction while a cursor is open"))); + + SPI_commit(); + SPI_start_transaction(); + + /* was cleared at transaction end, reset pointer */ + exec_ctx->scratch_ctx = NULL; + + Py_RETURN_NONE; +} + +static PyObject * +PLy_rollback(PyObject *self, PyObject *args) +{ + PLyExecutionContext *exec_ctx = PLy_current_execution_context(); + + if (ThereArePinnedPortals()) + ereport(ERROR, + (errcode(ERRCODE_INVALID_TRANSACTION_TERMINATION), + errmsg("cannot abort transaction while a cursor is open"))); + + SPI_rollback(); + SPI_start_transaction(); + + /* was cleared at transaction end, reset pointer */ + exec_ctx->scratch_ctx = NULL; + + Py_RETURN_NONE; +} diff --git a/src/pl/plpython/sql/plpython_transaction.sql b/src/pl/plpython/sql/plpython_transaction.sql new file mode 100644 index 00000000000..36c7b2ef385 --- /dev/null +++ b/src/pl/plpython/sql/plpython_transaction.sql @@ -0,0 +1,115 @@ +CREATE TABLE test1 (a int, b text); + + +CREATE PROCEDURE transaction_test1() +LANGUAGE plpythonu +AS $$ +for i in range(0, 10): + plpy.execute("INSERT INTO test1 (a) VALUES (%d)" % i) + if i % 2 == 0: + plpy.commit() + else: + plpy.rollback() +$$; + +CALL transaction_test1(); + +SELECT * FROM test1; + + +TRUNCATE test1; + +DO +LANGUAGE plpythonu +$$ +for i in range(0, 10): + plpy.execute("INSERT INTO test1 (a) VALUES (%d)" % i) + if i % 2 == 0: + plpy.commit() + else: + plpy.rollback() +$$; + +SELECT * FROM test1; + + +TRUNCATE test1; + +-- not allowed in a function +CREATE FUNCTION transaction_test2() RETURNS int +LANGUAGE plpythonu +AS $$ +for i in range(0, 10): + plpy.execute("INSERT INTO test1 (a) VALUES (%d)" % i) + if i % 2 == 0: + plpy.commit() + else: + plpy.rollback() +return 1 +$$; + +SELECT transaction_test2(); + +SELECT * FROM test1; + + +-- also not allowed if procedure is called from a function +CREATE FUNCTION transaction_test3() RETURNS int +LANGUAGE plpythonu +AS $$ +plpy.execute("CALL transaction_test1()") +return 1 +$$; + +SELECT transaction_test3(); + +SELECT * FROM test1; + + +-- DO block inside function +CREATE FUNCTION transaction_test4() RETURNS int +LANGUAGE plpythonu +AS $$ +plpy.execute("DO LANGUAGE plpythonu $x$ plpy.commit() $x$") +return 1 +$$; + +SELECT transaction_test4(); + + +-- commit inside subtransaction (prohibited) +DO LANGUAGE plpythonu $$ +with plpy.subtransaction(): + plpy.commit() +$$; + + +-- commit inside cursor loop +CREATE TABLE test2 (x int); +INSERT INTO test2 VALUES (0), (1), (2), (3), (4); + +TRUNCATE test1; + +DO LANGUAGE plpythonu $$ +for row in plpy.cursor("SELECT * FROM test2 ORDER BY x"): + plpy.execute("INSERT INTO test1 (a) VALUES (%s)" % row['x']) + plpy.commit() +$$; + +SELECT * FROM test1; + + +-- rollback inside cursor loop +TRUNCATE test1; + +DO LANGUAGE plpythonu $$ +for row in plpy.cursor("SELECT * FROM test2 ORDER BY x"): + plpy.execute("INSERT INTO test1 (a) VALUES (%s)" % row['x']) + plpy.rollback() +$$; + +SELECT * FROM test1; + + +DROP TABLE test1; +DROP TABLE test2; |
