Atomic Commit with 2 Phase Commit in FDW Distributed Setup

1. Introduction

PostgreSQL’s two-phase commit (2PC) feature allows a database to store the details of a transaction on disk without committing it. This is done by issuing the PREPARE TRANSACTION [name] command at the end of a transaction block. When the user is ready to commit, they can issue COMMIT PREPARED [name], where [name] must match the [name] given in the PREPARE TRANSACTION command. Because the transaction details are stored on disk, the server is able to commit the transaction at a later time even if it crashes or is out of service for a while in between. In a single database instance, 2PC is not critical; a plain COMMIT does the job equally well. However, in a larger setup where the data is distributed across two or more database instances (for example, via a Foreign Data Wrapper (FDW)), 2PC is critical for keeping every database instance in sync.
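
To make the durability point concrete, here is a minimal Python sketch of a single node. It is a toy model, not PostgreSQL code: the Node class and its method names are made up for illustration, and "disk" is simply state that survives the simulated crash.

```python
class Node:
    """Toy model of one database instance (hypothetical, not PostgreSQL itself)."""

    def __init__(self):
        self.committed = {}      # durable and visible data
        self.prepared = {}       # durable but not yet visible: name -> pending changes
        self.in_progress = None  # volatile: lost when the node crashes

    def begin(self, changes):
        self.in_progress = dict(changes)

    def prepare_transaction(self, name):
        # Models PREPARE TRANSACTION: persist the pending changes under a name.
        self.prepared[name] = self.in_progress
        self.in_progress = None

    def crash_and_restart(self):
        # Only volatile state is lost; prepared transactions survive.
        self.in_progress = None

    def commit_prepared(self, name):
        # Models COMMIT PREPARED: make the stored changes visible.
        self.committed.update(self.prepared.pop(name))


node = Node()
node.begin({"balance": 100})
node.prepare_transaction("tx1")
node.crash_and_restart()
node.commit_prepared("tx1")
print(node.committed)  # {'balance': 100}
```

A transaction that was never prepared would simply be lost in crash_and_restart(), which is the difference the paragraph above describes.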

2. Atomic Commit Problem with Foreign Data Wrapper (FDW)

The current postgres_fdw does not support using 2PC to commit on foreign servers. When a commit command is sent to the main server, it sends the same commit command to all of the foreign servers before processing the commit for itself. If one of the foreign nodes fails to commit, the main server goes through an abort process and does not commit itself. However, some of the foreign nodes may already have committed successfully, resulting in a partially committed transaction.

Consider this diagram:

where the CN node fails to commit on DN3 and goes through an abort process, but DN1 and DN2 have already committed successfully and can no longer be rolled back. This scenario creates a partial commit, which is usually not desirable.
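
The scenario can be reproduced with a small Python simulation. This is only a sketch of the behaviour described above: the ForeignServer class and one_phase_commit function are hypothetical names, and the failure on DN3 is injected with a flag.

```python
class ForeignServer:
    """Toy foreign server (hypothetical); commit can be made to fail."""

    def __init__(self, name, fail_commit=False):
        self.name = name
        self.fail_commit = fail_commit
        self.committed = False

    def commit(self):
        if self.fail_commit:
            raise RuntimeError(f"{self.name}: commit failed")
        self.committed = True


def one_phase_commit(foreign_servers):
    """Send a plain commit to each server in turn.

    Returns True if the main server may commit, False if it must abort.
    """
    for server in foreign_servers:
        try:
            server.commit()
        except RuntimeError:
            # The main server aborts, but earlier commits cannot be undone.
            return False
    return True


nodes = [
    ForeignServer("DN1"),
    ForeignServer("DN2"),
    ForeignServer("DN3", fail_commit=True),
]
main_committed = one_phase_commit(nodes)
print(main_committed, [n.committed for n in nodes])  # False [True, True, False]
```

The main server aborts, yet DN1 and DN2 remain committed: a partial commit.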

3. FDW with 2PC Capability

If we were to add 2PC functionality to the current postgres_fdw, then instead of sending the same commit to all foreign servers, we would let the main server send PREPARE TRANSACTION. The main server should proceed to send COMMIT PREPARED to all foreign servers only when all of them have successfully completed PREPARE TRANSACTION. If one of them fails at the PREPARE stage, the main server is still able to roll back the foreign servers that have already prepared successfully.

Consider this diagram:

where the CN node fails the PREPARE TRANSACTION on DN3 and sends ROLLBACK PREPARED to DN1 and DN2 before going through the abort process. With the 2PC method, a failure at the PREPARE stage does not leave any partial commits.
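
The prepare-then-commit flow can be sketched in Python as follows. This is an illustrative model under stated assumptions, not the postgres_fdw implementation: the class, the state names, and the transaction identifier "fdw_tx_1" are all hypothetical.

```python
class ForeignServer:
    """Toy foreign server (hypothetical). States: active, prepared, committed, rolled_back."""

    def __init__(self, name, fail_prepare=False):
        self.name = name
        self.fail_prepare = fail_prepare
        self.state = "active"

    def prepare_transaction(self, gid):
        if self.fail_prepare:
            self.state = "rolled_back"  # a failed prepare aborts the local transaction
            raise RuntimeError(f"{self.name}: PREPARE TRANSACTION {gid} failed")
        self.state = "prepared"

    def commit_prepared(self, gid):
        self.state = "committed"

    def rollback_prepared(self, gid):
        self.state = "rolled_back"

    def rollback(self):
        self.state = "rolled_back"


def two_phase_commit(foreign_servers, gid):
    """Phase 1: prepare everywhere. Phase 2: commit only if every prepare succeeded."""
    for server in foreign_servers:
        try:
            server.prepare_transaction(gid)
        except RuntimeError:
            # Undo everything: prepared servers get ROLLBACK PREPARED,
            # servers not yet reached get a plain rollback.
            for other in foreign_servers:
                if other.state == "prepared":
                    other.rollback_prepared(gid)
                elif other.state == "active":
                    other.rollback()
            return False
    for server in foreign_servers:
        server.commit_prepared(gid)
    return True


nodes = [ForeignServer("DN1"), ForeignServer("DN2"), ForeignServer("DN3", fail_prepare=True)]
ok = two_phase_commit(nodes, "fdw_tx_1")
print(ok, [n.state for n in nodes])  # False ['rolled_back', 'rolled_back', 'rolled_back']
```

Unlike the one-phase case, no node ends up committed when the prepare on DN3 fails.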

3. Handling COMMIT PREPARED Failure?

If a foreign server fails at the PREPARE stage, it is not too late to roll back the other foreign servers that have prepared successfully, so the main server can still send ROLLBACK PREPARED to them. However, if a foreign server fails at the COMMIT PREPARED stage, the other foreign servers that have already committed can no longer be rolled back, potentially causing a partial commit as well.

In our implementation, we still allow the main server to continue with the commit even though a foreign server fails COMMIT PREPARED. In addition, we warn the user that one of the foreign servers may not have committed successfully, which leads to a potential partial commit. The foreign server with the failed COMMIT PREPARED now holds a so-called “orphaned prepared transaction” that has yet to be committed.

Consider this diagram:

where DN3 fails a COMMIT PREPARED command and the CN node continues the commit with a warning.
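
A rough Python sketch of this "warn and continue" policy is shown below. It models only the second phase; the ForeignServer class, the commit_phase function, and the use of Python's warnings module are illustrative assumptions rather than the actual implementation.

```python
import warnings


class ForeignServer:
    """Toy foreign server (hypothetical) that keeps prepared transactions by id."""

    def __init__(self, name, fail_commit_prepared=False):
        self.name = name
        self.fail_commit_prepared = fail_commit_prepared
        self.prepared = set()   # ids of transactions prepared and stored on disk
        self.committed = set()  # ids of transactions that have been committed

    def prepare_transaction(self, gid):
        self.prepared.add(gid)

    def commit_prepared(self, gid):
        if self.fail_commit_prepared:
            raise ConnectionError(f"{self.name}: COMMIT PREPARED {gid} failed")
        self.prepared.remove(gid)
        self.committed.add(gid)


def commit_phase(foreign_servers, gid):
    """Send COMMIT PREPARED everywhere; warn on failure instead of aborting.

    Returns the names of servers left holding an orphaned prepared transaction.
    """
    orphaned = []
    for server in foreign_servers:
        try:
            server.commit_prepared(gid)
        except ConnectionError as exc:
            warnings.warn(f"possible partial commit: {exc}")
            orphaned.append(server.name)
    return orphaned
```

With DN1, DN2 and a failing DN3 all prepared under the same id, commit_phase returns ["DN3"], and DN3 still holds the prepared transaction.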

4. Handling Orphaned Prepared Transaction

Following the above example, if we perform a SELECT query, we will see that DN3 does not have the updated value while DN1 and DN2 have been updated. DN3 also still has the transaction prepared and stored on its disk. What is left to do is for somebody to log in to DN3 and manually run a COMMIT PREPARED command. If that succeeds, there will no longer be a partial commit.

The way we handle this is to run an orphaned prepared transaction detector on each foreign server and to introduce an intermediate, external global transaction manager (GTM) node that records all the FDW operations. Again following the above example, when DN3 detects an orphaned prepared transaction, it connects to the GTM node and checks whether this prepared transaction came from a CN node. If it did, we simply let DN3 commit the prepared transaction itself, automatically and without any human intervention. If the GTM has no record of it, the orphaned prepared transaction must have been created manually by another DN3 user, and the detector should not touch it other than to write a warning to the log file.
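
The decision rule can be sketched in Python as follows. Everything here is hypothetical: the GlobalTransactionManager class, its record/knows methods, and the resolve_orphans function are stand-ins for the idea. How the detector decides that a prepared transaction is orphaned is not covered here, so the sketch simply receives the candidate ids as input.

```python
import logging

log = logging.getLogger("orphan_detector")


class GlobalTransactionManager:
    """Hypothetical GTM: remembers ids of prepared transactions issued by a CN node."""

    def __init__(self):
        self._fdw_gids = set()

    def record(self, gid):
        self._fdw_gids.add(gid)

    def knows(self, gid):
        return gid in self._fdw_gids


def resolve_orphans(orphaned_gids, gtm, commit_prepared):
    """Commit orphaned transactions the GTM knows about; only warn about the rest.

    commit_prepared is a callable that commits one prepared transaction by id.
    Returns the list of ids that were committed.
    """
    committed = []
    for gid in sorted(orphaned_gids):
        if gtm.knows(gid):
            commit_prepared(gid)  # came from a CN node: safe to self-commit
            committed.append(gid)
        else:
            log.warning("prepared transaction %s is unknown to the GTM; leaving it alone", gid)
    return committed


gtm = GlobalTransactionManager()
gtm.record("fdw_tx_1")
done = []
print(resolve_orphans({"fdw_tx_1", "manual_tx"}, gtm, done.append))  # ['fdw_tx_1']
```

In this run "manual_tx" is left untouched and only logged, which matches the rule that transactions prepared manually by a local user must not be committed automatically.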

This is the general concept of how we handle atomic commit and orphaned prepared transactions. There may be better and more complex solutions out there, but for us, having an intermediate GTM node coordinate all the operations between the CN and DN nodes seems to be the simplest.

Implement Foreign Scan With FDW Interface API

1. Introduction

Recently I have been tasked with familiarizing myself with the Foreign Data Wrapper (FDW) interface API in order to build a new FDW capable of vertical / columnar sharding, meaning that the FDW can collect column information from multiple sources and combine it into a single query result. I will document and blog about vertical sharding in later posts. In this blog, I would like to share some key findings about the parts of the FDW interface related to foreign scans.

The FDW foreign scan API routines have some similarities to the table access method API’s sequential scan routines. You can learn more about it in the blog post here. The most noticeable difference is that the FDW routines require us to take part in the planning stage, so we have to come up with a foreign plan for the executor so that it knows what foreign scan to perform and how to perform it.

In order to complete a foreign scan, the following FDW routines are invoked in order:

  • IsForeignScanParallelSafe
  • GetForeignRelSize
  • GetForeignPaths
  • GetForeignPlan
  • BeginForeignScan
  • IterateForeignScan (called repeatedly, once per row, until no rows remain)
  • EndForeignScan
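
The call order listed above can be illustrated with a small Python stand-in. This is not the real C API: RecordingFdw and run_foreign_scan are hypothetical names, the callbacks take simplified arguments, and the "planner" here just picks the cheapest path by total cost.

```python
class RecordingFdw:
    """Toy stand-in for an FDW's callback table; it records the order of calls."""

    def __init__(self, rows):
        self.rows = list(rows)
        self.calls = []
        self.pos = 0

    def is_foreign_scan_parallel_safe(self):
        self.calls.append("IsForeignScanParallelSafe")
        return False

    def get_foreign_rel_size(self):
        self.calls.append("GetForeignRelSize")
        return len(self.rows)

    def get_foreign_paths(self):
        self.calls.append("GetForeignPaths")
        return [{"startup_cost": 0.0, "total_cost": float(len(self.rows))}]

    def get_foreign_plan(self, best_path):
        self.calls.append("GetForeignPlan")
        return {"path": best_path}

    def begin_foreign_scan(self, plan):
        self.calls.append("BeginForeignScan")
        self.pos = 0

    def iterate_foreign_scan(self):
        self.calls.append("IterateForeignScan")
        if self.pos >= len(self.rows):
            return None  # no more rows
        row = self.rows[self.pos]
        self.pos += 1
        return row

    def end_foreign_scan(self):
        self.calls.append("EndForeignScan")


def run_foreign_scan(fdw):
    # Planning stage
    fdw.is_foreign_scan_parallel_safe()
    fdw.get_foreign_rel_size()
    paths = fdw.get_foreign_paths()
    plan = fdw.get_foreign_plan(min(paths, key=lambda p: p["total_cost"]))
    # Execution stage
    fdw.begin_foreign_scan(plan)
    rows = []
    while True:
        row = fdw.iterate_foreign_scan()
        if row is None:
            break
        rows.append(row)
    fdw.end_foreign_scan()
    return rows
```

For a source with two rows, the iterate callback is recorded three times: once per row, plus the final call that reports the end of the scan.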

2.1 IsForeignScanParallelSafe

In this routine, you tell PostgreSQL whether the foreign scan supports parallel scan: return true if it is supported, or false otherwise. The planner considers a parallel scan when a query involves retrieving a large amount of data. Depending on the foreign server and your implementation, you have to decide whether parallel scan can be supported.

2.2 GetForeignRelSize

Similar to the table access method’s relation_estimate_size() API, this routine requires you to provide an estimate of the number of tuples/rows involved in this round of scanning. This information is very important because it directly affects the planner’s decisions about how to execute the query. For example, if you say the foreign relation has 1 million tuples to be fetched, the planner is more likely to use a parallel scan for performance reasons, provided you answered “true” in the IsForeignScanParallelSafe() call.

2.3 GetForeignPaths

In this routine, we are required to provide a list of possible paths for completing the required foreign scan. A path simply means a way to execute the scan. For example, you can complete a foreign scan sequentially by fetching one tuple at a time, or you can complete the same scan with an index lookup if the table has an index defined and the user gives a specific WHERE clause that pinpoints a tuple. If the user provides a WHERE clause to filter the result, you also have the option to decide whether the WHERE clause is evaluated by the foreign server or by the local server as it fetches each tuple.

All of these possible path nodes are required by the planner to decide on the optimal path for executing the scan. Each path node requires you to provide the startup cost, the total cost, the number of rows, and the path keys, if any. The official documentation suggests using the add_path and create_foreignscan_path routines to prepare the list of path nodes.
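
As a rough illustration of what a path node carries, here is a Python sketch. ForeignPath and cheapest_total are hypothetical names, the cost numbers are made up, and comparing only total cost is a deliberate simplification of how the planner actually compares paths.

```python
from dataclasses import dataclass


@dataclass
class ForeignPath:
    """Toy path node with the fields named in the text (not PostgreSQL's struct)."""
    label: str
    startup_cost: float
    total_cost: float
    rows: float
    pathkeys: tuple = ()


def cheapest_total(paths):
    """Simplified selection: pick the path with the lowest total cost."""
    return min(paths, key=lambda p: p.total_cost)


paths = [
    ForeignPath("sequential fetch", startup_cost=0.0, total_cost=1000.0, rows=1000),
    ForeignPath("remote index lookup", startup_cost=5.0, total_cost=12.0, rows=1),
]
print(cheapest_total(paths).label)  # remote index lookup
```

The index lookup has a higher startup cost but a much lower total cost, so it wins under this simplified rule.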

2.4 GetForeignPlan

Once you have provided all the possible path nodes, the planner picks the most optimal one, passes it to this routine, and asks you to make a foreign scan plan for the executor. This is a very important function because the plan we provide here directly affects how the executor behaves, so be cautious.

In this function, the planner gives you everything about the query, such as the target attribute list and the restricting clauses (WHERE clauses), as well as information about the foreign relation, such as its OID and each column’s data type. The official PostgreSQL documentation suggests using the make_foreignscan() routine with the desired arguments to create the foreign plan.

Here you also have the option of pushing down the restricting clauses or not. In other words, you can choose to send all the WHERE clauses to the foreign server for processing, if it supports them. This results in much less data communication. You can also choose to have all the WHERE clauses processed locally, but this requires the FDW to fetch every single row from the foreign server, resulting in more data communication. This decision is controlled by the List *qpqual argument to the make_foreignscan() function. If this list contains your restricting clauses, the local server performs the WHERE evaluation locally; if this list is empty, the local server assumes the FDW will do the WHERE evaluation on the foreign server.
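
The push-down decision can be sketched in Python like this. It is a simplified model: clauses are plain strings, split_quals and build_plan are hypothetical helpers, and the remote_can_evaluate callback stands in for whatever check your FDW uses to decide what the foreign server supports. Only the name qpqual is taken from make_foreignscan().

```python
def split_quals(quals, remote_can_evaluate):
    """Split WHERE clauses into those pushed down and those kept local."""
    remote_conds, local_conds = [], []
    for qual in quals:
        if remote_can_evaluate(qual):
            remote_conds.append(qual)
        else:
            local_conds.append(qual)
    return remote_conds, local_conds


def build_plan(table, quals, remote_can_evaluate):
    """Build a toy plan: a remote query plus the clauses left for local checking."""
    remote_conds, local_conds = split_quals(quals, remote_can_evaluate)
    remote_sql = f"SELECT * FROM {table}"
    if remote_conds:
        remote_sql += " WHERE " + " AND ".join(remote_conds)
    # qpqual plays the role of make_foreignscan()'s qpqual argument:
    # whatever is listed here is evaluated by the local server.
    return {"remote_sql": remote_sql, "qpqual": local_conds}


plan = build_plan(
    "t",
    ["id = 5", "my_local_func(name)"],
    lambda qual: "my_local_func" not in qual,  # assumption: remote lacks this function
)
print(plan["remote_sql"])  # SELECT * FROM t WHERE id = 5
print(plan["qpqual"])      # ['my_local_func(name)']
```

Clauses that were pushed down are left out of qpqual, so they are not evaluated a second time locally.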

2.5 BeginForeignScan

This routine is called before the foreign scan starts, giving you a chance to allocate all the control information your foreign scan needs. You can define your own control information (for example, cursor location, remote connection structure, current offset, etc.) and store it in node->fdw_state; it will be passed down to IterateForeignScan as the scan is being completed.

2.6 IterateForeignScan

This is the main routine that performs the actual scan. You need to put the proper logic here, depending on your use case, to fetch one or more rows from the foreign server. When you have the row data, you need to convert it to a TupleTableSlot structure that the PostgreSQL internals can understand. We normally use the ExecStoreHeapTuple() routine to convert a HeapTuple into a TupleTableSlot. This routine is called continuously by the PostgreSQL executor as long as you still have data to fetch, and it only stops once you return an empty TupleTableSlot.

2.7 EndForeignScan

This routine will be called at the end of foreign scan, giving you the opportunity to clean up the control information allocated in BeginForeignScan(). This marks the end of foreign scan via FDW.
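
The begin / iterate / end lifecycle from sections 2.5 to 2.7 can be modelled in a few lines of Python. This is a toy sketch, not the C API: ForeignScanState here is a plain object with only an fdw_state attribute, the remote rows are an in-memory list, and returning None stands in for returning an empty TupleTableSlot.

```python
class ForeignScanState:
    """Toy scan node: only the fdw_state slot matters for this sketch."""

    def __init__(self):
        self.fdw_state = None


def begin_foreign_scan(node, remote_rows):
    # Allocate the private control information for this scan.
    node.fdw_state = {"rows": list(remote_rows), "offset": 0}


def iterate_foreign_scan(node):
    # Return the next row, or None (our stand-in for an empty slot) when done.
    state = node.fdw_state
    if state["offset"] >= len(state["rows"]):
        return None
    row = state["rows"][state["offset"]]
    state["offset"] += 1
    return row


def end_foreign_scan(node):
    # Release whatever begin_foreign_scan allocated.
    node.fdw_state = None


node = ForeignScanState()
begin_foreign_scan(node, [(1, "a"), (2, "b")])
fetched = []
while True:
    row = iterate_foreign_scan(node)
    if row is None:
        break
    fetched.append(row)
end_foreign_scan(node)
print(fetched)  # [(1, 'a'), (2, 'b')]
```

Keeping the offset inside fdw_state rather than in a global is what lets several scans of the same foreign table run independently.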

3. Summary

There is a lot of information and many articles out there related to PostgreSQL’s FDW API. This blog is merely a quick summary of what I have learned during the FDW evaluation, and there is much more to it than what is discussed here. For more detail, you can refer to the official documentation about the FDW callback functions here.

