summaryrefslogtreecommitdiff
path: root/src/pl/plpython
diff options
context:
space:
mode:
authorPeter Eisentraut2018-01-22 13:30:16 +0000
committerPeter Eisentraut2018-01-22 13:43:06 +0000
commit8561e4840c81f7e345be2df170839846814fa004 (patch)
treea984a1022a84ea22f9c7a96c37f865f96212cad6 /src/pl/plpython
parentb9ff79b8f17697f3df492017d454caa9920a7183 (diff)
Transaction control in PL procedures
In each of the supplied procedural languages (PL/pgSQL, PL/Perl, PL/Python, PL/Tcl), add language-specific commit and rollback functions/commands to control transactions in procedures in that language. Add similar underlying functions to SPI. Some additional cleanup so that transaction commit or abort doesn't blow away data structures still used by the procedure call. Add execution context tracking to CALL and DO statements so that transaction control commands can only be issued in top-level procedure and block calls, not function calls or other procedure or block calls. - SPI Add a new function SPI_connect_ext() that is like SPI_connect() but allows passing option flags. The only option flag right now is SPI_OPT_NONATOMIC. A nonatomic SPI connection can execute transaction control commands, otherwise it's not allowed. This is meant to be passed down from CALL and DO statements which themselves know in which context they are called. A nonatomic SPI connection uses different memory management. A normal SPI connection allocates its memory in TopTransactionContext. For nonatomic connections we use PortalContext instead. As the comment in SPI_connect_ext() (previously SPI_connect()) indicates, one could potentially use PortalContext in all cases, but it seems safest to leave the existing uses alone, because this stuff is complicated enough already. SPI also gets new functions SPI_start_transaction(), SPI_commit(), and SPI_rollback(), which can be used by PLs to implement their transaction control logic. - portalmem.c Some adjustments were made in the code that cleans up portals at transaction abort. The portal code could already handle a command *committing* a transaction and continuing (e.g., VACUUM), but it was not quite prepared for a command *aborting* a transaction and continuing. In AtAbort_Portals(), remove the code that marks an active portal as failed. As the comment there already predicted, this doesn't work if the running command wants to keep running after transaction abort. And it's actually not necessary, because pquery.c is careful to run all portal code in a PG_TRY block and explicitly runs MarkPortalFailed() if there is an exception. So the code in AtAbort_Portals() is never used anyway. In AtAbort_Portals() and AtCleanup_Portals(), we need to be careful not to clean up active portals too much. This mirrors similar code in PreCommit_Portals(). - PL/Perl Gets new functions spi_commit() and spi_rollback() - PL/pgSQL Gets new commands COMMIT and ROLLBACK. Update the PL/SQL porting example in the documentation to reflect that transactions are now possible in procedures. - PL/Python Gets new functions plpy.commit and plpy.rollback. - PL/Tcl Gets new commands commit and rollback. Reviewed-by: Andrew Dunstan <andrew.dunstan@2ndquadrant.com>
Diffstat (limited to 'src/pl/plpython')
-rw-r--r--src/pl/plpython/Makefile1
-rw-r--r--src/pl/plpython/expected/plpython_test.out4
-rw-r--r--src/pl/plpython/expected/plpython_transaction.out135
-rw-r--r--src/pl/plpython/plpy_main.c21
-rw-r--r--src/pl/plpython/plpy_plpymodule.c49
-rw-r--r--src/pl/plpython/sql/plpython_transaction.sql115
6 files changed, 317 insertions, 8 deletions
diff --git a/src/pl/plpython/Makefile b/src/pl/plpython/Makefile
index cc91afebde7..d09910835d1 100644
--- a/src/pl/plpython/Makefile
+++ b/src/pl/plpython/Makefile
@@ -90,6 +90,7 @@ REGRESS = \
plpython_quote \
plpython_composite \
plpython_subtransaction \
+ plpython_transaction \
plpython_drop
REGRESS_PLPYTHON3_MANGLE := $(REGRESS)
diff --git a/src/pl/plpython/expected/plpython_test.out b/src/pl/plpython/expected/plpython_test.out
index 847e4cc412e..39b994f4468 100644
--- a/src/pl/plpython/expected/plpython_test.out
+++ b/src/pl/plpython/expected/plpython_test.out
@@ -48,6 +48,7 @@ select module_contents();
Error
Fatal
SPIError
+ commit
cursor
debug
error
@@ -60,10 +61,11 @@ select module_contents();
quote_ident
quote_literal
quote_nullable
+ rollback
spiexceptions
subtransaction
warning
-(18 rows)
+(20 rows)
CREATE FUNCTION elog_test_basic() RETURNS void
AS $$
diff --git a/src/pl/plpython/expected/plpython_transaction.out b/src/pl/plpython/expected/plpython_transaction.out
new file mode 100644
index 00000000000..1fadc69b636
--- /dev/null
+++ b/src/pl/plpython/expected/plpython_transaction.out
@@ -0,0 +1,135 @@
+CREATE TABLE test1 (a int, b text);
+CREATE PROCEDURE transaction_test1()
+LANGUAGE plpythonu
+AS $$
+for i in range(0, 10):
+ plpy.execute("INSERT INTO test1 (a) VALUES (%d)" % i)
+ if i % 2 == 0:
+ plpy.commit()
+ else:
+ plpy.rollback()
+$$;
+CALL transaction_test1();
+SELECT * FROM test1;
+ a | b
+---+---
+ 0 |
+ 2 |
+ 4 |
+ 6 |
+ 8 |
+(5 rows)
+
+TRUNCATE test1;
+DO
+LANGUAGE plpythonu
+$$
+for i in range(0, 10):
+ plpy.execute("INSERT INTO test1 (a) VALUES (%d)" % i)
+ if i % 2 == 0:
+ plpy.commit()
+ else:
+ plpy.rollback()
+$$;
+SELECT * FROM test1;
+ a | b
+---+---
+ 0 |
+ 2 |
+ 4 |
+ 6 |
+ 8 |
+(5 rows)
+
+TRUNCATE test1;
+-- not allowed in a function
+CREATE FUNCTION transaction_test2() RETURNS int
+LANGUAGE plpythonu
+AS $$
+for i in range(0, 10):
+ plpy.execute("INSERT INTO test1 (a) VALUES (%d)" % i)
+ if i % 2 == 0:
+ plpy.commit()
+ else:
+ plpy.rollback()
+return 1
+$$;
+SELECT transaction_test2();
+ERROR: invalid transaction termination
+CONTEXT: PL/Python function "transaction_test2"
+SELECT * FROM test1;
+ a | b
+---+---
+(0 rows)
+
+-- also not allowed if procedure is called from a function
+CREATE FUNCTION transaction_test3() RETURNS int
+LANGUAGE plpythonu
+AS $$
+plpy.execute("CALL transaction_test1()")
+return 1
+$$;
+SELECT transaction_test3();
+ERROR: spiexceptions.InvalidTransactionTermination: invalid transaction termination
+CONTEXT: Traceback (most recent call last):
+ PL/Python function "transaction_test3", line 2, in <module>
+ plpy.execute("CALL transaction_test1()")
+PL/Python function "transaction_test3"
+SELECT * FROM test1;
+ a | b
+---+---
+(0 rows)
+
+-- DO block inside function
+CREATE FUNCTION transaction_test4() RETURNS int
+LANGUAGE plpythonu
+AS $$
+plpy.execute("DO LANGUAGE plpythonu $x$ plpy.commit() $x$")
+return 1
+$$;
+SELECT transaction_test4();
+ERROR: spiexceptions.InvalidTransactionTermination: invalid transaction termination
+CONTEXT: Traceback (most recent call last):
+ PL/Python function "transaction_test4", line 2, in <module>
+ plpy.execute("DO LANGUAGE plpythonu $x$ plpy.commit() $x$")
+PL/Python function "transaction_test4"
+-- commit inside subtransaction (prohibited)
+DO LANGUAGE plpythonu $$
+with plpy.subtransaction():
+ plpy.commit()
+$$;
+WARNING: forcibly aborting a subtransaction that has not been exited
+ERROR: cannot commit while a subtransaction is active
+CONTEXT: PL/Python anonymous code block
+-- commit inside cursor loop
+CREATE TABLE test2 (x int);
+INSERT INTO test2 VALUES (0), (1), (2), (3), (4);
+TRUNCATE test1;
+DO LANGUAGE plpythonu $$
+for row in plpy.cursor("SELECT * FROM test2 ORDER BY x"):
+ plpy.execute("INSERT INTO test1 (a) VALUES (%s)" % row['x'])
+ plpy.commit()
+$$;
+ERROR: cannot commit transaction while a cursor is open
+CONTEXT: PL/Python anonymous code block
+SELECT * FROM test1;
+ a | b
+---+---
+(0 rows)
+
+-- rollback inside cursor loop
+TRUNCATE test1;
+DO LANGUAGE plpythonu $$
+for row in plpy.cursor("SELECT * FROM test2 ORDER BY x"):
+ plpy.execute("INSERT INTO test1 (a) VALUES (%s)" % row['x'])
+ plpy.rollback()
+$$;
+ERROR: cannot abort transaction while a cursor is open
+CONTEXT: PL/Python anonymous code block
+SELECT * FROM test1;
+ a | b
+---+---
+(0 rows)
+
+DROP TABLE test1;
+DROP TABLE test2;
diff --git a/src/pl/plpython/plpy_main.c b/src/pl/plpython/plpy_main.c
index 695de305838..5a197ce27ac 100644
--- a/src/pl/plpython/plpy_main.c
+++ b/src/pl/plpython/plpy_main.c
@@ -60,7 +60,7 @@ static void plpython_error_callback(void *arg);
static void plpython_inline_error_callback(void *arg);
static void PLy_init_interp(void);
-static PLyExecutionContext *PLy_push_execution_context(void);
+static PLyExecutionContext *PLy_push_execution_context(bool atomic_context);
static void PLy_pop_execution_context(void);
/* static state for Python library conflict detection */
@@ -219,14 +219,19 @@ plpython2_validator(PG_FUNCTION_ARGS)
Datum
plpython_call_handler(PG_FUNCTION_ARGS)
{
+ bool nonatomic;
Datum retval;
PLyExecutionContext *exec_ctx;
ErrorContextCallback plerrcontext;
PLy_initialize();
+ nonatomic = fcinfo->context &&
+ IsA(fcinfo->context, CallContext) &&
+ !castNode(CallContext, fcinfo->context)->atomic;
+
/* Note: SPI_finish() happens in plpy_exec.c, which is dubious design */
- if (SPI_connect() != SPI_OK_CONNECT)
+ if (SPI_connect_ext(nonatomic ? SPI_OPT_NONATOMIC : 0) != SPI_OK_CONNECT)
elog(ERROR, "SPI_connect failed");
/*
@@ -235,7 +240,7 @@ plpython_call_handler(PG_FUNCTION_ARGS)
* here and the PG_TRY. (plpython_error_callback expects the stack entry
* to be there, so we have to make the context first.)
*/
- exec_ctx = PLy_push_execution_context();
+ exec_ctx = PLy_push_execution_context(!nonatomic);
/*
* Setup error traceback support for ereport()
@@ -303,7 +308,7 @@ plpython_inline_handler(PG_FUNCTION_ARGS)
PLy_initialize();
/* Note: SPI_finish() happens in plpy_exec.c, which is dubious design */
- if (SPI_connect() != SPI_OK_CONNECT)
+ if (SPI_connect_ext(codeblock->atomic ? 0 : SPI_OPT_NONATOMIC) != SPI_OK_CONNECT)
elog(ERROR, "SPI_connect failed");
MemSet(&fake_fcinfo, 0, sizeof(fake_fcinfo));
@@ -332,7 +337,7 @@ plpython_inline_handler(PG_FUNCTION_ARGS)
* need the stack entry, but for consistency with plpython_call_handler we
* do it in this order.)
*/
- exec_ctx = PLy_push_execution_context();
+ exec_ctx = PLy_push_execution_context(codeblock->atomic);
/*
* Setup error traceback support for ereport()
@@ -430,12 +435,14 @@ PLy_get_scratch_context(PLyExecutionContext *context)
}
static PLyExecutionContext *
-PLy_push_execution_context(void)
+PLy_push_execution_context(bool atomic_context)
{
PLyExecutionContext *context;
+ /* Pick a memory context similar to what SPI uses. */
context = (PLyExecutionContext *)
- MemoryContextAlloc(TopTransactionContext, sizeof(PLyExecutionContext));
+ MemoryContextAlloc(atomic_context ? TopTransactionContext : PortalContext,
+ sizeof(PLyExecutionContext));
context->curr_proc = NULL;
context->scratch_ctx = NULL;
context->next = PLy_execution_contexts;
diff --git a/src/pl/plpython/plpy_plpymodule.c b/src/pl/plpython/plpy_plpymodule.c
index 23f99e20ca3..3d7dd13f0cf 100644
--- a/src/pl/plpython/plpy_plpymodule.c
+++ b/src/pl/plpython/plpy_plpymodule.c
@@ -6,8 +6,10 @@
#include "postgres.h"
+#include "access/xact.h"
#include "mb/pg_wchar.h"
#include "utils/builtins.h"
+#include "utils/snapmgr.h"
#include "plpython.h"
@@ -15,6 +17,7 @@
#include "plpy_cursorobject.h"
#include "plpy_elog.h"
+#include "plpy_main.h"
#include "plpy_planobject.h"
#include "plpy_resultobject.h"
#include "plpy_spi.h"
@@ -41,6 +44,8 @@ static PyObject *PLy_fatal(PyObject *self, PyObject *args, PyObject *kw);
static PyObject *PLy_quote_literal(PyObject *self, PyObject *args);
static PyObject *PLy_quote_nullable(PyObject *self, PyObject *args);
static PyObject *PLy_quote_ident(PyObject *self, PyObject *args);
+static PyObject *PLy_commit(PyObject *self, PyObject *args);
+static PyObject *PLy_rollback(PyObject *self, PyObject *args);
/* A list of all known exceptions, generated from backend/utils/errcodes.txt */
@@ -95,6 +100,12 @@ static PyMethodDef PLy_methods[] = {
*/
{"cursor", PLy_cursor, METH_VARARGS, NULL},
+ /*
+ * transaction control
+ */
+ {"commit", PLy_commit, METH_NOARGS, NULL},
+ {"rollback", PLy_rollback, METH_NOARGS, NULL},
+
{NULL, NULL, 0, NULL}
};
@@ -577,3 +588,41 @@ PLy_output(volatile int level, PyObject *self, PyObject *args, PyObject *kw)
*/
Py_RETURN_NONE;
}
+
+static PyObject *
+PLy_commit(PyObject *self, PyObject *args)
+{
+ PLyExecutionContext *exec_ctx = PLy_current_execution_context();
+
+ if (ThereArePinnedPortals())
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("cannot commit transaction while a cursor is open")));
+
+ SPI_commit();
+ SPI_start_transaction();
+
+ /* was cleared at transaction end, reset pointer */
+ exec_ctx->scratch_ctx = NULL;
+
+ Py_RETURN_NONE;
+}
+
+static PyObject *
+PLy_rollback(PyObject *self, PyObject *args)
+{
+ PLyExecutionContext *exec_ctx = PLy_current_execution_context();
+
+ if (ThereArePinnedPortals())
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_TRANSACTION_TERMINATION),
+ errmsg("cannot abort transaction while a cursor is open")));
+
+ SPI_rollback();
+ SPI_start_transaction();
+
+ /* was cleared at transaction end, reset pointer */
+ exec_ctx->scratch_ctx = NULL;
+
+ Py_RETURN_NONE;
+}
diff --git a/src/pl/plpython/sql/plpython_transaction.sql b/src/pl/plpython/sql/plpython_transaction.sql
new file mode 100644
index 00000000000..36c7b2ef385
--- /dev/null
+++ b/src/pl/plpython/sql/plpython_transaction.sql
@@ -0,0 +1,115 @@
+CREATE TABLE test1 (a int, b text);
+
+
+CREATE PROCEDURE transaction_test1()
+LANGUAGE plpythonu
+AS $$
+for i in range(0, 10):
+ plpy.execute("INSERT INTO test1 (a) VALUES (%d)" % i)
+ if i % 2 == 0:
+ plpy.commit()
+ else:
+ plpy.rollback()
+$$;
+
+CALL transaction_test1();
+
+SELECT * FROM test1;
+
+
+TRUNCATE test1;
+
+DO
+LANGUAGE plpythonu
+$$
+for i in range(0, 10):
+ plpy.execute("INSERT INTO test1 (a) VALUES (%d)" % i)
+ if i % 2 == 0:
+ plpy.commit()
+ else:
+ plpy.rollback()
+$$;
+
+SELECT * FROM test1;
+
+
+TRUNCATE test1;
+
+-- not allowed in a function
+CREATE FUNCTION transaction_test2() RETURNS int
+LANGUAGE plpythonu
+AS $$
+for i in range(0, 10):
+ plpy.execute("INSERT INTO test1 (a) VALUES (%d)" % i)
+ if i % 2 == 0:
+ plpy.commit()
+ else:
+ plpy.rollback()
+return 1
+$$;
+
+SELECT transaction_test2();
+
+SELECT * FROM test1;
+
+
+-- also not allowed if procedure is called from a function
+CREATE FUNCTION transaction_test3() RETURNS int
+LANGUAGE plpythonu
+AS $$
+plpy.execute("CALL transaction_test1()")
+return 1
+$$;
+
+SELECT transaction_test3();
+
+SELECT * FROM test1;
+
+
+-- DO block inside function
+CREATE FUNCTION transaction_test4() RETURNS int
+LANGUAGE plpythonu
+AS $$
+plpy.execute("DO LANGUAGE plpythonu $x$ plpy.commit() $x$")
+return 1
+$$;
+
+SELECT transaction_test4();
+
+
+-- commit inside subtransaction (prohibited)
+DO LANGUAGE plpythonu $$
+with plpy.subtransaction():
+ plpy.commit()
+$$;
+
+
+-- commit inside cursor loop
+CREATE TABLE test2 (x int);
+INSERT INTO test2 VALUES (0), (1), (2), (3), (4);
+
+TRUNCATE test1;
+
+DO LANGUAGE plpythonu $$
+for row in plpy.cursor("SELECT * FROM test2 ORDER BY x"):
+ plpy.execute("INSERT INTO test1 (a) VALUES (%s)" % row['x'])
+ plpy.commit()
+$$;
+
+SELECT * FROM test1;
+
+
+-- rollback inside cursor loop
+TRUNCATE test1;
+
+DO LANGUAGE plpythonu $$
+for row in plpy.cursor("SELECT * FROM test2 ORDER BY x"):
+ plpy.execute("INSERT INTO test1 (a) VALUES (%s)" % row['x'])
+ plpy.rollback()
+$$;
+
+SELECT * FROM test1;
+
+
+DROP TABLE test1;
+DROP TABLE test2;