Overview of PostgreSQL Internals

Author

This chapter originated as part of Stefan Simkovics' Master's Thesis, prepared at the Vienna University of Technology under the direction of O.Univ.Prof.Dr. Georg Gottlob and Univ.Ass. Mag. Katrin Seyr.

This chapter gives an overview of the internal structure of the backend of PostgreSQL. After having read the following sections you should have an idea of how a query is processed. This chapter does not aim to provide a detailed description of the internal operation of PostgreSQL, as such a document would be very extensive. Rather, this chapter is intended to help the reader understand the general sequence of operations that occur within the backend from the point at which a query is received, to the point at which the results are returned to the client.

The Path of a Query

Here we give a short overview of the stages a query has to pass through in order to obtain a result.

1. A connection from an application program to the PostgreSQL server has to be established. The application program transmits a query to the server and waits to receive the results sent back by the server.

2. The parser stage checks the query transmitted by the application program for correct syntax and creates a query tree.

3. The rewrite system takes the query tree created by the parser stage and looks for any rules (stored in the system catalogs) to apply to it. It performs the transformations given in the rule bodies. One application of the rewrite system is in the realization of views. Whenever a query against a view (i.e., a virtual table) is made, the rewrite system rewrites the user's query to a query that accesses the base tables given in the view definition instead.

4. The planner/optimizer takes the (rewritten) query tree and creates a query plan that will be the input to the executor. It does so by first creating all possible paths leading to the same result. For example, if there is an index on a relation to be scanned, there are two paths for the scan: a simple sequential scan, or a scan using the index. Next the cost of executing each path is estimated and the cheapest path is chosen. The cheapest path is expanded into a complete plan that the executor can use.

5. The executor recursively steps through the plan tree and retrieves rows in the way represented by the plan. The executor makes use of the storage system while scanning relations, performs sorts and joins, evaluates qualifications, and finally hands back the rows derived.

In the following sections we will cover each of the above listed items in more detail to give a better understanding of PostgreSQL's internal control and data structures.

How Connections are Established

PostgreSQL is implemented using a simple process per user client/server model. In this model there is one client process connected to exactly one server process. As we do not know ahead of time how many connections will be made, we have to use a master process that spawns a new server process every time a connection is requested. This master process is called postgres and listens at a specified TCP/IP port for incoming connections. Whenever a request for a connection is detected, the postgres process spawns a new server process. The server tasks communicate with each other using semaphores and shared memory to ensure data integrity throughout concurrent data access. The client process can be any program that understands the PostgreSQL frontend/backend protocol.
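Whatever client program is used, one simple way to observe this process-per-connection model is from SQL itself, using standard system functions and views (a minimal sketch; the exact column list of the view varies between PostgreSQL versions):

    -- The PID of the dedicated server process serving this session:
    SELECT pg_backend_pid();

    -- One row per server process currently handling a client connection:
    SELECT pid, usename, state, query FROM pg_stat_activity;

Opening a second session and running the same statements shows a different backend PID, that is, a separate server process for that connection.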
Many clients are based on the C-language library libpq, but several independent implementations of the protocol exist, such as the Java JDBC driver.

Once a connection is established the client process can send a query to the backend (server). The query is transmitted using plain text, i.e., there is no parsing done in the frontend (client). The server parses the query, creates an execution plan, executes the plan, and returns the retrieved rows to the client by transmitting them over the established connection.

The Parser Stage

The parser stage consists of two parts:

- The parser defined in gram.y and scan.l is built using the Unix tools bison and flex.

- The transformation process does modifications and augmentations to the data structures returned by the parser.

Parser

The parser has to check the query string (which arrives as plain text) for valid syntax. If the syntax is correct a parse tree is built up and handed back; otherwise an error is returned. The parser and lexer are implemented using the well-known Unix tools bison and flex.

The lexer is defined in the file scan.l and is responsible for recognizing identifiers, the SQL key words etc. For every key word or identifier that is found, a token is generated and handed to the parser.

The parser is defined in the file gram.y and consists of a set of grammar rules and actions that are executed whenever a rule is fired. The code of the actions (which is actually C code) is used to build up the parse tree.

The file scan.l is transformed to the C source file scan.c using the program flex and gram.y is transformed to gram.c using bison. After these transformations have taken place a normal C compiler can be used to create the parser. Never make any changes to the generated C files as they will be overwritten the next time flex or bison is called. The mentioned transformations and compilations are normally done automatically using the makefiles shipped with the PostgreSQL source distribution.

A detailed description of bison or the grammar rules given in gram.y would be beyond the scope of this paper. There are many books and documents dealing with flex and bison. You should be familiar with bison before you start to study the grammar given in gram.y; otherwise you won't understand what happens there.

Transformation Process

The parser stage creates a parse tree using only fixed rules about the syntactic structure of SQL. It does not make any lookups in the system catalogs, so there is no possibility to understand the detailed semantics of the requested operations. After the parser completes, the transformation process takes the tree handed back by the parser as input and does the semantic interpretation needed to understand which tables, functions, and operators are referenced by the query. The data structure that is built to represent this information is called the query tree.

The reason for separating raw parsing from semantic analysis is that system catalog lookups can only be done within a transaction, and we do not wish to start a transaction immediately upon receiving a query string. The raw parsing stage is sufficient to identify the transaction control commands (BEGIN, ROLLBACK, etc.), and these can then be correctly executed without any further analysis. Once we know that we are dealing with an actual query (such as SELECT or UPDATE), it is okay to start a transaction if we're not already in one. Only then can the transformation process be invoked.
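A short illustration of the difference between the two stages (the table names are hypothetical, and the exact error texts may vary between versions): a pure syntax error is rejected by the raw parser, while a reference to a nonexistent table passes the parser and is only rejected during semantic analysis, when the system catalogs are consulted.

    -- Rejected by the parser (raw grammar): no catalog access is needed.
    SELECT * FORM mytable;
    -- ERROR:  syntax error at or near "FORM"

    -- Syntactically valid, so it passes the parser; the transformation process
    -- then fails to find the table in the system catalogs.
    SELECT * FROM no_such_table;
    -- ERROR:  relation "no_such_table" does not exist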
The query tree created by the transformation process is structurally similar to the raw parse tree in most places, but it has many differences in detail. For example, a FuncCall node in the parse tree represents something that looks syntactically like a function call. This might be transformed to either a FuncExpr or Aggref node depending on whether the referenced name turns out to be an ordinary function or an aggregate function. Also, information about the actual data types of columns and expression results is added to the query tree.

The PostgreSQL Rule System

PostgreSQL supports a powerful rule system for the specification of views and ambiguous view updates. Originally the PostgreSQL rule system consisted of two implementations:

- The first one worked using row level processing and was implemented deep in the executor. The rule system was called whenever an individual row had been accessed. This implementation was removed in 1995 when the last official release of the Berkeley Postgres project was transformed into Postgres95.

- The second implementation of the rule system is a technique called query rewriting. The rewrite system is a module that exists between the parser stage and the planner/optimizer. This technique is still implemented.

The query rewriter is discussed in some detail elsewhere in this documentation, so there is no need to cover it here. We will only point out that both the input and the output of the rewriter are query trees, that is, there is no change in the representation or level of semantic detail in the trees. Rewriting can be thought of as a form of macro expansion.

Planner/Optimizer

The task of the planner/optimizer is to create an optimal execution plan. A given SQL query (and hence, a query tree) can actually be executed in a wide variety of different ways, each of which will produce the same set of results. If it is computationally feasible, the query optimizer will examine each of these possible execution plans, ultimately selecting the execution plan that is expected to run the fastest.

In some situations, examining each possible way in which a query can be executed would take an excessive amount of time and memory space. In particular, this occurs when executing queries involving large numbers of join operations. In order to determine a reasonable (not necessarily optimal) query plan in a reasonable amount of time, PostgreSQL uses a Genetic Query Optimizer when the number of joins exceeds a threshold (set by the configuration parameter geqo_threshold).

The planner's search procedure actually works with data structures called paths, which are simply cut-down representations of plans containing only as much information as the planner needs to make its decisions. After the cheapest path is determined, a full-fledged plan tree is built to pass to the executor. This represents the desired execution plan in sufficient detail for the executor to run it. In the rest of this section we'll ignore the distinction between paths and plans.

Generating Possible Plans

The planner/optimizer starts by generating plans for scanning each individual relation (table) used in the query. The possible plans are determined by the available indexes on each relation. There is always the possibility of performing a sequential scan on a relation, so a sequential scan plan is always created. Assume an index is defined on a relation (for example a B-tree index) and a query contains the restriction relation.attribute OPR constant.
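For instance, following the pattern just described (a hypothetical table and index):

    -- A B-tree index on relation.attribute ...
    CREATE INDEX relation_attribute_idx ON relation (attribute);

    -- ... and a query whose WHERE clause has the form relation.attribute OPR constant:
    SELECT * FROM relation WHERE attribute < 42;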
If relation.attribute happens to match the key of the B-tree index and OPR is one of the operators listed in the index's operator class, another plan is created using the B-tree index to scan the relation. If there are further indexes present and the restrictions in the query happen to match a key of an index, further plans will be considered. Index scan plans are also generated for indexes that have a sort ordering that can match the query's ORDER BY clause (if any), or a sort ordering that might be useful for merge joining (see below).

If the query requires joining two or more relations, plans for joining relations are considered after all feasible plans have been found for scanning single relations. The three available join strategies are:

- nested loop join: The right relation is scanned once for every row found in the left relation. This strategy is easy to implement but can be very time consuming. (However, if the right relation can be scanned with an index scan, this can be a good strategy. It is possible to use values from the current row of the left relation as keys for the index scan of the right.)

- merge join: Each relation is sorted on the join attributes before the join starts. Then the two relations are scanned in parallel, and matching rows are combined to form join rows. This kind of join is more attractive because each relation has to be scanned only once. The required sorting might be achieved either by an explicit sort step, or by scanning the relation in the proper order using an index on the join key.

- hash join: The right relation is first scanned and loaded into a hash table, using its join attributes as hash keys. Next the left relation is scanned and the appropriate values of every row found are used as hash keys to locate the matching rows in the table.

When the query involves more than two relations, the final result must be built up by a tree of join steps, each with two inputs. The planner examines different possible join sequences to find the cheapest one.

If the query uses fewer than geqo_threshold relations, a near-exhaustive search is conducted to find the best join sequence. The planner preferentially considers joins between any two relations for which there exists a corresponding join clause in the WHERE qualification (i.e., for which a restriction like WHERE rel1.attr1 = rel2.attr2 exists). Join pairs with no join clause are considered only when there is no other choice, that is, when a particular relation has no available join clauses to any other relation. All possible plans are generated for every join pair considered by the planner, and the one that is (estimated to be) the cheapest is chosen.

When geqo_threshold is exceeded, the join sequences considered are determined by the genetic query optimizer's heuristics. Otherwise the process is the same.

The finished plan tree consists of sequential or index scans of the base relations, plus nested-loop, merge, or hash join nodes as needed, plus any auxiliary steps needed, such as sort nodes or aggregate-function calculation nodes. Most of these plan node types have the additional ability to do selection (discarding rows that do not meet a specified Boolean condition) and projection (computation of a derived column set based on given column values, that is, evaluation of scalar expressions where needed). One of the responsibilities of the planner is to attach selection conditions from the WHERE clause and computation of required output expressions to the most appropriate nodes of the plan tree.
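To see which of these strategies the planner actually selected for a given query, EXPLAIN can be used. This is only a sketch: the tables here are hypothetical, and the chosen plan depends on the available indexes, the table statistics, and configuration settings such as enable_hashjoin or geqo_threshold.

    EXPLAIN
    SELECT *
    FROM rel1
    JOIN rel2 ON rel1.attr1 = rel2.attr2
    WHERE rel1.attr3 = 42
    ORDER BY rel1.attr1;

    -- The output is the finished plan tree described above: a Seq Scan or
    -- Index Scan node for each base relation, combined by Nested Loop,
    -- Merge Join, or Hash Join nodes, with Sort or Aggregate nodes where needed.
    -- Temporarily disabling a strategy, e.g. SET enable_hashjoin = off;, makes
    -- it easy to compare the planner's cost estimates for alternative plans.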
Executor

The executor takes the plan created by the planner/optimizer and recursively processes it to extract the required set of rows. This is essentially a demand-pull pipeline mechanism. Each time a plan node is called, it must deliver one more row, or report that it is done delivering rows.

To provide a concrete example, assume that the top node is a MergeJoin node. Before any merge can be done two rows have to be fetched (one from each subplan). So the executor recursively calls itself to process the subplans (it starts with the subplan attached to lefttree). The new top node (the top node of the left subplan) is, let's say, a Sort node and again recursion is needed to obtain an input row. The child node of the Sort might be a SeqScan node, representing actual reading of a table. Execution of this node causes the executor to fetch a row from the table and return it up to the calling node. The Sort node will repeatedly call its child to obtain all the rows to be sorted. When the input is exhausted (as indicated by the child node returning a NULL instead of a row), the Sort code performs the sort, and finally is able to return its first output row, namely the first one in sorted order. It keeps the remaining rows stored so that it can deliver them in sorted order in response to later demands.

The MergeJoin node similarly demands the first row from its right subplan. Then it compares the two rows to see if they can be joined; if so, it returns a join row to its caller. On the next call, or immediately if it cannot join the current pair of inputs, it advances to the next row of one table or the other (depending on how the comparison came out), and again checks for a match. Eventually, one subplan or the other is exhausted, and the MergeJoin node returns NULL to indicate that no more join rows can be formed.

Complex queries can involve many levels of plan nodes, but the general approach is the same: each node computes and returns its next output row each time it is called. Each node is also responsible for applying any selection or projection expressions that were assigned to it by the planner.

The executor mechanism is used to evaluate all four basic SQL query types: SELECT, INSERT, UPDATE, and DELETE. For SELECT, the top-level executor code only needs to send each row returned by the query plan tree off to the client. For INSERT, each returned row is inserted into the target table specified for the INSERT. This is done in a special top-level plan node called ModifyTable. (A simple INSERT ... VALUES command creates a trivial plan tree consisting of a single Result node, which computes just one result row, and ModifyTable above it to perform the insertion. But INSERT ... SELECT can demand the full power of the executor mechanism.) For UPDATE, the planner arranges that each computed row includes all the updated column values, plus the TID (tuple ID, or row ID) of the original target row; this data is fed into a ModifyTable node, which uses the information to create a new updated row and mark the old row deleted. For DELETE, the only column that is actually returned by the plan is the TID, and the ModifyTable node simply uses the TID to visit each target row and mark it deleted.

Overview of Postgres-XL Internals

This chapter gives an overview of the internal structure of Postgres-XL.

Postgres-XL Components

Postgres-XL is a database cluster which consists of multiple database servers based upon PostgreSQL.
Postgres-XL provides global, transparent transaction management to all the database servers involved and provides both read and write scalability. To achieve this, Postgres-XL is composed of three major components:

GTM: GTM stands for Global Transaction Manager. It provides global transaction IDs and snapshots for each transaction in the Postgres-XL database cluster. It also provides several global values, such as sequences and global timestamps. To improve scalability, each server or virtual machine may run a GTM Proxy. A GTM Proxy groups commands and responses to and from the GTM, reducing the number of interactions and the amount of data that the GTM reads and writes.

Coordinator: The Coordinator is the entry point to Postgres-XL for applications. You can configure more than one Coordinator in the same Postgres-XL cluster. With the help of the GTM, the Coordinators provide transparent concurrency and integrity of transactions globally. Applications can connect to any Coordinator; every Coordinator provides the same view of the database.

Datanode: Datanodes store user data. More than one Datanode can be configured. Each table can be either replicated or distributed among the Datanodes. When a table is distributed, you can choose a column as the distribution key, whose value is used to determine on which Datanode each row is stored.

GTM and Global Transaction Management

Review of PostgreSQL Transaction Management Internals

In PostgreSQL, each transaction is given a unique ID called the transaction ID (or XID). XIDs are assigned in ascending order, which makes it possible to tell which of two transactions is older. More precisely, an XID is a 32-bit integer; when it reaches the maximum value, it wraps around to the lowest normal value (3, as of the current definition). Both PostgreSQL and Postgres-XL have means to handle this wraparound, but for simplicity it is not described in this document.

Each tuple carries a set of XIDs indicating the transactions that created and deleted it, and these are consulted whenever a transaction tries to read the tuple. (This description is somewhat simplified; you will find the precise rules in the file tqual.c in PostgreSQL's source code.) So if the target tuple was created by a transaction that is still active, it has been neither committed nor aborted, and the reading transaction should ignore it. In this way (in practice, this is done by the tuple visibility code in the PostgreSQL core), if we give each transaction a unique transaction ID throughout the system and maintain a snapshot of which transactions are active, not only on a single server but across all the servers, we can maintain globally consistent visibility of each tuple, even when a server accepts new statements from transactions running on other servers.

This information is stored in the "xmin" and "xmax" fields of each row of a table. When a row is inserted, the XID of the inserting transaction is recorded in its xmin field. When rows are updated (with an UPDATE or DELETE statement), PostgreSQL does not simply overwrite the old rows. Instead, it "marks" the old rows as deleted by writing the updating transaction's XID to their xmax field. In the case of UPDATE (just as with INSERT), new rows are created whose xmin field is set to the XID of the creating transaction.

These xmin and xmax values are used to determine which rows are visible to a transaction. To do this, PostgreSQL also needs data indicating which transactions are running; this is called the "snapshot".
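These fields can be inspected directly from SQL, since xmin and xmax are system columns that can be selected by name. A minimal sketch (the table t is hypothetical):

    CREATE TABLE t (id int);

    BEGIN;
    INSERT INTO t VALUES (1);
    -- xmin holds the XID of this (still running) transaction; xmax is 0
    -- because no transaction has deleted or updated the row yet.
    SELECT xmin, xmax, id FROM t;
    COMMIT;

    -- An UPDATE marks the old row version as deleted by setting its xmax to
    -- the updater's XID and creates a new version whose xmin is that same XID.
    UPDATE t SET id = 2;
    SELECT xmin, xmax, id FROM t;   -- shows only the new, visible row version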
Suppose a row was created by some transaction and has not been deleted yet. If the creating transaction is still running, the row is visible to that transaction, but not to other transactions. If the creating transaction has finished, the row's visibility depends on the outcome: if the transaction committed, the row is visible; if it aborted, the row is not visible.

Therefore, PostgreSQL needs two kinds of information: which transactions are currently running, and whether a finished transaction committed or aborted. The former is obtained from the snapshot; the latter is maintained in the commit log (CLOG). PostgreSQL uses both to determine which rows are visible to a given transaction.

Making Transaction Management Global

In Postgres-XL, the following aspects of transaction management and visibility checking are extracted from the individual nodes and pulled into the GTM:

- Assigning XIDs globally to transactions (GXIDs, Global Transaction IDs). This is done globally so that every transaction in the system can be uniquely identified.

- Providing snapshots. The GTM collects the status of all transactions (running, committed, aborted, etc.) to provide snapshots globally (global snapshots). Note that each global snapshot includes the GXIDs of transactions initiated on other Coordinators or Datanodes. This is needed because an older transaction may reach a new server only after a while. If the GXID of such a transaction were not included in the snapshot on that server, the transaction could be regarded as "old enough" and its uncommitted rows could be read. Because its GXID is included in the snapshot from the beginning, no such inconsistency can take place.

To do this, Postgres-XL introduces a dedicated component called the GTM (Global Transaction Manager). The GTM runs on one of the servers and provides a unique, ordered transaction ID to each transaction running on the Postgres-XL servers. Because this ID is globally unique, it is called a GXID (Global Transaction ID).

The GTM receives GXID requests from transactions and hands out GXIDs. It also keeps track of when each transaction starts and finishes, in order to generate the snapshots used to control tuple visibility. Because such a snapshot is also a global property, it is called a Global Snapshot. As long as each transaction runs with a GXID and a Global Snapshot, it sees consistent visibility throughout the system, and it is safe to run transactions in parallel on any of the servers. Likewise, a transaction composed of multiple statements can be executed across multiple servers while maintaining database consistency.

The GTM provides a Global Transaction ID to each transaction and keeps track of the status of all transactions (running, committed, or aborted) in order to calculate global snapshots and thereby maintain tuple visibility. For this purpose, each transaction reports when it starts and ends, as well as when it issues a PREPARE command in the two-phase commit protocol. Each transaction requests snapshots according to its transaction isolation level, just as in PostgreSQL. If the isolation level is "read committed", the transaction requests a snapshot for each statement; if it is "serializable", the transaction requests a snapshot at the beginning of the transaction and reuses it throughout the transaction.
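A hedged illustration of this difference, using the standard PostgreSQL function txid_current_snapshot(), which prints a snapshot in the form xmin:xmax:xip_list (in Postgres-XL the underlying IDs are the GXIDs and Global Snapshots obtained from the GTM):

    -- Read committed: each statement gets a fresh snapshot, so the two values
    -- below may differ if other transactions start or finish in between.
    BEGIN;  -- default isolation level is READ COMMITTED
    SELECT txid_current_snapshot();
    SELECT txid_current_snapshot();
    COMMIT;

    -- Serializable (as described above): the snapshot is taken once and
    -- reused, so both statements should print the same snapshot.
    BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE;
    SELECT txid_current_snapshot();
    SELECT txid_current_snapshot();
    COMMIT;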
Improving GTM Performance

Because the GTM can be regarded as serializing all transaction processing, one might expect it to become a performance bottleneck, and in fact the GTM can limit overall scalability. The GTM should not be used over a very slow network such as a wide area network; its architecture is intended for a Gigabit local network. It is encouraged to install Postgres-XL on a local Gigabit network with minimal latency, that is, with as few switches as possible involved in the connections among the GTM, the Coordinators, and the Datanodes. In addition, consider putting all components on their own subnet if you have multiple network ports in the systems.

Primitive GTM Implementation

A primitive GTM implementation can work as follows:

1. Each Coordinator backend is provided with a GTM client library to obtain GXIDs and snapshots and to report transaction status.

2. The GTM opens a port to accept connections from Coordinator and Datanode backends. When the GTM accepts a connection, it creates a thread (GTM Thread) to handle the requests from that backend.

3. The GTM Thread receives each request, records it, and sends the GXID, snapshot, or other response back to the backend. This is repeated until the backend disconnects.

GTM Proxy Implementation

Each transaction issues requests to the GTM frequently. These requests can be collected into a single block on each server to reduce the amount of interaction, by using a GTM Proxy. In this configuration, Coordinator and Datanode backends do not connect to the GTM directly. Instead, a GTM Proxy sits between the GTM and the backends and groups multiple requests and responses.

The GTM Proxy, like the GTM described in the previous section, accepts connections from Coordinator backends. However, it does not create a new thread for each connection. The following paragraphs explain how the GTM Proxy is initialized and how it handles requests from Coordinator backends.

The GTM Proxy, together with the GTM, is initialized as follows:

1. The GTM starts up normally, but now also accepts connections from GTM Proxies.

2. The GTM Proxy starts up and creates its GTM Proxy Threads. Each GTM Proxy Thread connects to the GTM in advance. The number of GTM Proxy Threads can be specified at startup; a typical number is one or two, which keeps the number of connections between the GTM and the Coordinators small.

3. The GTM Proxy Main Thread waits for connection requests from backends.

When a Coordinator backend requests a connection, the Proxy Main Thread assigns a GTM Proxy Thread to handle its requests. Therefore, one GTM Proxy Thread handles multiple Coordinator backends: if a Coordinator has one hundred backends and one GTM Proxy Thread, this thread takes care of one hundred Coordinator backends.

The GTM Proxy Thread then scans all the requests from its Coordinator backends. If the Coordinator is busy, more requests are likely to be captured in a single scan, so the proxy can group many requests into a single block and reduce the number of interactions between the GTM and the Coordinator. Furthermore, a single scan may contain multiple requests for snapshots. Because these requests can be regarded as having been received at the same time, they can all be satisfied with a single snapshot, which reduces the amount of data the GTM has to provide.

Coordinator

The Coordinator handles SQL statements from applications, determines which Datanodes should be involved, and generates local SQL statements for each Datanode.
In the simplest case, when only a single Datanode is involved, the Coordinator simply proxies incoming statements to that Datanode. In more complicated cases, for example when the target Datanode cannot be determined, the Coordinator generates local statements for each Datanode and collects the results, materializing them at the Coordinator for further handling. In this case, the Coordinator will try to optimize the plan by pushing down to the Datanodes the WHERE clause, joins, the projection (the column list of the SELECT clause), the ORDER BY clause, and other clauses where possible. If a transaction involves more than one Datanode and/or Coordinator, the Coordinator handles the transaction internally using the two-phase commit protocol.

For aggregate functions, Postgres-XL introduces a new kind of function, the collection function, between the existing transition function and final function. The collection function runs on the Coordinator to collect all the intermediate results from the involved Datanodes.

When reading replicated tables, the Coordinator can choose any Datanode to read from. The most efficient choice is one running on the same hardware or virtual machine; this is called the preferred Datanode and can be specified by a GUC local to each Coordinator. When writing to replicated tables, on the other hand, all the Coordinators begin with the same Datanode to avoid update conflicts; this is called the primary Datanode.

Coordinators also take care of DDL statements. Because DDL statements operate on system catalogs, which are replicated on all the Coordinators and Datanodes, they are proxied to all the Coordinators and Datanodes. To synchronize the catalog update on all nodes, the Coordinator handles DDL internally with the two-phase commit protocol.

Datanode

While Coordinators handle cluster-wide SQL statements, Datanodes take care of just the local data. In this sense, a Datanode is essentially a PostgreSQL server, except that transaction management information, as well as other global values, is obtained from the GTM.

Coordinator and Datanode Connection

The number of connections between Coordinators and Datanodes may increase over time. This can leave unused connections around and waste system resources, while repeatedly connecting and disconnecting requires Datanode backend initialization, which increases latency and also wastes system resources. For example, as in the case of the GTM, suppose there are ten Coordinators and ten Datanodes, and each Coordinator has one hundred connections to applications. After a while, each Coordinator backend may hold a connection to every Datanode. This means that each Coordinator backend has ten connections to Datanodes, and each Datanode has one thousand (100 x 10) connections from Coordinator backends. Because considerable resources are consumed per backend for locks and other control information, and only a few of these connections are active at any given time, it is not a good idea to hold so many unused connections between Coordinators and Datanodes.

To improve this, Postgres-XL is equipped with a connection pooler between the Coordinators and Datanodes. When a Coordinator backend requires a connection to a Datanode, the pooler looks for a suitable connection in the pool. If one is available, the pooler assigns it to the Coordinator backend. When the connection is no longer needed, the Coordinator backend returns it to the pooler. The pooler does not disconnect it; it keeps the connection in the pool for later reuse, keeping the Datanode backend running.
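To tie the Coordinator and Datanode roles together, here is a hedged sketch of how tables are placed on Datanodes using Postgres-XL's CREATE TABLE distribution clause (table and column names are hypothetical):

    -- Rows are spread across the Datanodes according to a hash of the
    -- distribution key; the Coordinator routes each statement to the
    -- Datanode(s) holding the relevant rows.
    CREATE TABLE orders (
        order_id    bigint,
        customer_id bigint,
        amount      numeric
    ) DISTRIBUTE BY HASH (order_id);

    -- Every Datanode keeps a full copy of a replicated table; reads can go to
    -- the preferred Datanode, while writes start with the primary Datanode.
    CREATE TABLE countries (
        code text,
        name text
    ) DISTRIBUTE BY REPLICATION;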