In te current concept, we are going to combine Foreign tables inheritance with the postgres_fdw extension, both being already available features since 9.5 version.
Coss-node partitioning allows a better data locality and more scalable model than keeping local partitions. Being said, the data will be split into several nodes and organized using a particular key, which will determine in which shard data will be allocated. FOr the current POC, we are going to specify the sharKey, which is a simple char(2) type.
Until today, the only way yo perform findings over this method, was a from the application layer, by issuing queries directly to the nodes by keeping certain deterministic way as {1} or using a catalog table as {2} (NOTE: the bellow examples are using pseudo code)
[1]
query = "SELECT name,lastname FROM " + relation + partition " WHERE " id =" + person_id
[2]
shard = query("SELECT shard FROM catalog WHERE key = " + person_id)
query = "SELECT name,lastname FROM " + relation + shard " WHERE " id =" + person_id
As foreign tables(FT) does not hold any data, it is possible to keep copies arround all the databases involved and also in separated instances if this is necessary.
All the operations against the table will be done through the parent table of the FT tree tables and Postgres itself will determine the destination FT using the constraint exclusion feature, which will be detailed further.
For HA, you are limited on the data nodes to implement any other replication solution available in the core version. To be fair, 9.6 supports streaming replication and logical decoding, which is used by the pglogical tool for providing advanced logical replication per table basis.
![TPS][2]
Foreign tables do not contain data by itselves and the only reference to a external table on a different Postgres database. There are plenty of different extensions allowing external tables on different data sotre solutions, but in this particular article we are going to focus on postgres_fdw as we want to explore more about condition pushdows, which makes queries against these tables more performant on more complex queries.
A more extensive benchmark can be found at my next article.
The framework underlying for the Foreign Data Wrappers, support both reads and write operation. postgres_fdw is not the exception and does also support condition pushdown for avoiding large scans on the source tables.
On each database holding the FT, you need to invoke the extension creation:
CREATE EXTENSION postgres_fdw;
FT have two main elements, necessary to point correctly both in source as in user privileges. If you are paranoic enough, you’ll prefer to use unprivileged users with limited grants over the table you use.
{1}
CREATE SERVER shard1_main FOREIGN DATA WRAPPER postgres_fdw
OPTIONS(host '127.0.0.1',port '5434', dbname 'shard1');
CREATE SERVER shard2_main FOREIGN DATA WRAPPER postgres_fdw
OPTIONS(host '127.0.0.1',port '5435', dbname 'shard2');
-- Slaves
CREATE SERVER shard1_main_replica FOREIGN DATA WRAPPER postgres_fdw
OPTIONS(host '127.0.0.1',port '7777',dbname 'shard1');
CREATE SERVER shard2_main_replica FOREIGN DATA WRAPPER postgres_fdw
OPTIONS(host '127.0.0.1',port '8888',dbname 'shard2');
{2}
-- User mapping
CREATE USER MAPPING FOR postgres SERVER shard1_main OPTIONS(user 'postgres');
CREATE USER MAPPING FOR postgres SERVER shard2_main OPTIONS(user 'postgres');
CREATE USER MAPPING FOR postgres SERVER shard1_main_replica OPTIONS(user 'postgres');
CREATE USER MAPPING FOR postgres SERVER shard2_main_replica OPTIONS(user 'postgres');
The FT definition is indeed pretty straightforward if we don’t want to do any further column filtering:
CREATE TABLE main (shardKey char(2), key bigint, avalue text);
CREATE FOREIGN TABLE main_shard01
(CHECK (shardKey = '01'))
INHERITS (main)
SERVER shard1_main;
CREATE FOREIGN TABLE main_shard02
(CHECK (shardKey = '02'))
INHERITS (main)
SERVER shard2_main;
Even if i don’t recommend the following approach, it can be very easy to centralize the writes to the shards through the FT. Allthough, it requires to code a trigger for managing this. Currently, the minimum transaction level for foreign tables is REPEATABLE READ, but it will probably change in future versions.
A very simplistic approach for an INSERT trigger will be like below:
CREATE OR REPLACE FUNCTION f_main_part() RETURNS TRIGGER AS
$FMAINPART$
DECLARE
partition_name text;
BEGIN
partition_name := 'main_shard' || NEW.shardKey;
EXECUTE 'INSERT INTO' || quote_ident(partition_name) || ' SELECT ($1).*' USING NEW;
RETURN NULL;
END;
$FMAINPART$ LANGUAGE plpgsql;
CREATE TRIGGER t_main BEFORE INSERT
ON main
FOR EACH ROW EXECUTE PROCEDURE f_main_part();
As shards contain data, the declaration ends up to be a common table within the necessary suffix for localization:
CREATE TABLE main_shard01( shardKey char(2),
key bigint,
avalue text,
CHECk(shardKey='01'));
CREATE INDEX ON main_shard01(key);
A simple test could be done by issuing:
proxy=# INSERT INTO main
SELECT '0' || round(random()*1+1),i.i,random()::text
FROM generate_series(1,20000) i(i);
INSERT 0 0
YOu probably are intuiting that the above the statement inserts data on both nodes, and the trigger will derive the row accordingly to the corresponding shard.
Note: the shard number is generated by random()*1+1 which output rounds between 1 and 2.
Querying data can be nicely transparent, as shown below. The tableoid in this particular cas can be misleading, as the oid reported are those from the nodes, not the local machine. It is used just to show that they’re effectively different tables:
proxy=# select tableoid,count(*) from main group be tableoid;
tableoid | count
---------+-------
33226 | 104
33222 | 96
(2 rows)
For example, retrieving a single row is easy as:
proxy=# SELECT avalue FROM main WHERE key = 1500 and shardKey = '01';
avalue
---------------------
0.971926014870405
(1 row)
Behind the scenes, the pushed query to the remote servers contains the corresponding filter( (key = 1500) ) and locally, the constraint exclusion allws to avoid further scans into the other child FT.
proxy=# explain (VERBOSE true)SELECT avalue
FROM main WHERE key = 1500
and shardKey = '01';
QUERY PLAN
----------------------------------------------------------
Append (cost=0.00..131.95 rows=2 width=32)
-> Seq Scan on public.main (cost=0.00..0.00 rows=1 width=32)
Output: main.avalue
Filter: ((main.key = 1500) AND (main.shardKey = '01'::bpchar))
-> Foreign Scan on public.main_shard01 (cost=100.00..131.95 rows=1 width=32)
Output: main_shard01.avalue
Remote SQL: SELECT avalue FROM public.main_shard01 WHERE ((key = 1500))
AND ((shardKey = '01'::bpchar))
(7 rows)
Even if we don’t want to provide the shardKey, the key filter will be pushed across all the shard nodes. If your keys aren’t unique acorss shards, you’ll get a multi-row result set.
proxy=# explain (VERBOSE true) SELECT avalue FROM main WHERE key = 1500;
QUERY PLAN
---------------------------------------------------------------------------
Append (cost=0.00..256.83 rows=15 width=32)
-> Seq Scan on public.main (cost=0.00..00.0 rows=1 width=32)
Output: main.avalue
Filter: (main.key = 1500)
-> Foreign Scan on public.main_shard01 (cost=100.00..128.41 rows=7 width=32)
Output: main_shard01.avalue
Remote SQL: SELECT avalue FROM public.main_shard01 WHERE ((key = 1500))
-> Foreign Scan on puclib.main_shard02 (cost=100.00..128.41 rows=7 width=32)
Output: main_shard02.avalue
Remote SQL: SELECT avalue FROM public.main_shard02 WHERE ((key = 1500))
(10 rows)
Foreign Data Wrappers for Postgres are such a great extension, but it comes at price with a visible overhead in high intesive transactional workloads.
Hope you liked the article!