FirewaterDistributedArchitecture
From Eigenpedia
This page describes the distributed aspects of the architecture being developed for the first release of the Firewater project.
Scope
Please read FirewaterLongRangeArchitecture for concepts, terminology, and blue sky stuff. For the first release, the following capabilities will not be present:
- load balancing
- shared storage
- replication of partitions (only replication of unpartitioned tables will be present)
Deployment Options
This section explains the different ways Firewater can be deployed on machines in a network.
Single SMP Scaleup Server
Firewater can be used to take full advantage of a single powerful machine with multiple cores and a storage array or SAN with a lot of I/O bandwidth:
In this configuration, both the distributor and one embedded storage node run within the same (single) Firewater process, with the data spread out over a number of partitions equal to the number of cores. The distributed query optimizer is capable of recognizing that all of the partitions are actually local; it optimizes the plan by avoiding JDBC loopback connections--instead, a single parallel execution plan is generated which combines the results of local table access to all partitions.
Firewater supports this configuration via the SYS_FIREWATER_EMBEDDED_WRAPPER and companion SYS_FIREWATER_EMBEDDED_SERVER for referencing embedded storage:
create partition p1 on (sys_firewater_embedded_server);
create partition p2 on (sys_firewater_embedded_server);
...
create partition p8 on (sys_firewater_embedded_server);
Note that with this option, all resources (such as catalog repository, buffer pool and device I/O scheduler) are fully shared. Usually, this should be optimal (--Jvs 20:04, 28 May 2009 (EDT): it would help if each partition could be placed in its own physical tablespace file), but in cases where it's not, the scaleout configuration described next could be used instead, with multiple processes running on the same box.
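As a rough illustration of the one-partition-per-core layout described above, the following Python sketch generates the CREATE PARTITION statements for an embedded deployment. The helper name `embedded_partition_ddl` and the `p` naming prefix are assumptions made for this example; only the statement syntax and the server name come from the DDL shown earlier.

```python
import os

# Server name taken from the embedded-deployment DDL shown above.
EMBEDDED_SERVER = "sys_firewater_embedded_server"


def embedded_partition_ddl(num_partitions=None, prefix="p"):
    """Return one CREATE PARTITION statement per partition.

    Hypothetical helper: it is not part of Firewater itself.
    """
    if num_partitions is None:
        # Assumption: one partition per core; fall back to 1 if the
        # core count cannot be determined.
        num_partitions = os.cpu_count() or 1
    if num_partitions < 1:
        raise ValueError("num_partitions must be at least 1")
    return [
        f"create partition {prefix}{i} on ({EMBEDDED_SERVER});"
        for i in range(1, num_partitions + 1)
    ]


if __name__ == "__main__":
    for statement in embedded_partition_ddl(8):
        print(statement)
```

Passing an explicit count (8 here) reproduces the p1 through p8 layout; omitting it sizes the layout to the machine the script runs on.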
Scaleout Network
To take advantage of multiple machines on a network, Firewater can instead be used in distributed fashion:
In this configuration, a single machine is dedicated to the distributor, plus one machine for each storage node. Again, for storage nodes, the number of partitions would typically be based on the number of cores available. The distributed query optimizer decomposes plans, sending fragments to storage nodes, and then combining the results before returning them to clients.
Each node maintains its own private storage and catalog repository. The storage node catalogs describe real physical table partition storage, whereas the distributor node catalog describes the logical table definitions and how they are partitioned and/or replicated across the storage nodes.
Firewater supports this configuration via the SYS_FIREWATER_REMOTE_WRAPPER:
create server remote_node1
foreign data wrapper sys_firewater_remote_wrapper
options(
    user_name 'sa',
    url 'jdbc:firewater_storage:remote:http://storage_node1'
);
create server remote_node2
foreign data wrapper sys_firewater_remote_wrapper
options(
    user_name 'sa',
    url 'jdbc:firewater_storage:remote:http://storage_node2'
);
...
create partition p1 on (remote_node1);
create partition p2 on (remote_node1);
create partition p3 on (remote_node2);
create partition p4 on (remote_node2);
...
(For a cloud-based deployment, the foreign data wrapper's JNDI capability should be used instead, so that the URLs reference logical names rather than physical addresses. This way, when a cloud node is allocated, only the JNDI directory needs to be updated--not the foreign server definition itself.)
Small Offload Network
A hybrid configuration is also possible in which the distributor node has embedded storage but is also backed by remote nodes. This could make sense either if the distributor node has power to spare, or if only a small number of machines are being used as storage nodes to partially offload the primary server:
create server remote_node1
foreign data wrapper sys_firewater_remote_wrapper
options(
    user_name 'sa',
    url 'jdbc:firewater_storage:remote:http://storage_node1'
);
create server remote_node2
foreign data wrapper sys_firewater_remote_wrapper
options(
    user_name 'sa',
    url 'jdbc:firewater_storage:remote:http://storage_node2'
);
create partition p1 on (sys_firewater_embedded_server);
create partition p2 on (remote_node1);
create partition p3 on (remote_node2);
Catalog Repository Model
This UML class diagram illustrates how Firewater extends the standard Eigenbase catalog model with information on how partitions are placed on storage nodes (represented as DataServer instances):
Although the model is designed to encompass a many-to-many association between partitions and storage nodes via the PartitionReplica intersection class, Firewater does not currently support this--a partition can only be placed on a single storage node:
create server remote_node1
foreign data wrapper sys_firewater_remote_wrapper
options(
    user_name 'sa',
    url 'jdbc:firewater_storage:remote:http://node1'
);
create partition rp1 on (remote_node1);
In the future, the partition creation DDL will allow for a list of nodes:
create partition rp1 on (remote_node1, remote_node2);
Note that only the catalog repository for the distributor node is extended in this way; storage nodes do not have any distribution information. When the distributor node processes a CREATE PARTITION statement such as the one above, it records the association in its own catalog, and then sends down the following command to remote_node1:
CREATE CATALOG "RP1";
The partition name will be used to qualify any table partitions placed on remote_node1, as explained in the next section.
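The catalog behavior described in this section can be sketched in Python as follows. This is only an illustrative model, not the actual repository implementation: the class `DistributorCatalog` and its `create_partition` method are hypothetical, while `DataServer` and `PartitionReplica` borrow their names from the model described above. The sketch records the partition-to-node association on the distributor side and returns the command that would be sent down to the storage node.

```python
from dataclasses import dataclass, field


@dataclass(frozen=True)
class DataServer:
    """A storage node, e.g. remote_node1."""
    name: str


@dataclass(frozen=True)
class PartitionReplica:
    """Intersection record: one placement of a partition on a node."""
    partition_name: str
    server: DataServer


@dataclass
class DistributorCatalog:
    """Hypothetical stand-in for the distributor's extended catalog."""
    replicas: list = field(default_factory=list)

    def create_partition(self, name, servers):
        # The model allows many-to-many placement, but for now a
        # partition can only be placed on a single storage node.
        if len(servers) != 1:
            raise ValueError(
                "a partition can currently be placed on only one storage node"
            )
        if any(r.partition_name == name for r in self.replicas):
            raise ValueError(f"partition {name} already exists")
        server = servers[0]
        # Record the association in the distributor's own catalog...
        self.replicas.append(PartitionReplica(name, server))
        # ...and return the command to send to the storage node.
        return {server.name: f'CREATE CATALOG "{name.upper()}";'}


if __name__ == "__main__":
    catalog = DistributorCatalog()
    print(catalog.create_partition("rp1", [DataServer("remote_node1")]))
```

Keeping the placement in a separate `PartitionReplica` record, rather than as a field on the partition, mirrors the many-to-many shape of the model, so lifting the single-node restriction later would only require relaxing the length check.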
Object Partitioning and Replication
Suppose the following DDL is executed on the distributor node:
create partition rp1 on (remote_node1);
create partition rp2 on (remote_node1);
create partition rp3 on (remote_node2);
create partition rp4 on (remote_node2);
create schema warehouse;
create table warehouse.event_log(id int primary key, ...)
options (partitioning 'HASH');
Then on remote_node1, the corresponding DDL will be executed automatically by the distributor via JDBC:
CREATE CATALOG RP1;
CREATE CATALOG RP2;
CREATE SCHEMA LOCALDB.WAREHOUSE;
CREATE SCHEMA RP1.WAREHOUSE;
CREATE SCHEMA RP2.WAREHOUSE;
CREATE TABLE RP1.WAREHOUSE.EVENT_LOG(ID INT PRIMARY KEY, ...);
CREATE TABLE RP2.WAREHOUSE.EVENT_LOG(ID INT PRIMARY KEY, ...);
Likewise on remote_node2:
CREATE CATALOG RP3;
CREATE CATALOG RP4;
CREATE SCHEMA LOCALDB.WAREHOUSE;
CREATE SCHEMA RP3.WAREHOUSE;
CREATE SCHEMA RP4.WAREHOUSE;
CREATE TABLE RP3.WAREHOUSE.EVENT_LOG(ID INT PRIMARY KEY, ...);
CREATE TABLE RP4.WAREHOUSE.EVENT_LOG(ID INT PRIMARY KEY, ...);
The CREATE TABLE processing looks like this (simplified with only one partition per storage node):
In other words, each partition corresponds to a private copy of schema/table definitions in a named catalog on the appropriate storage node. A table partition on a storage node will only store the subset of table rows which hash to that partition; it is the job of Firewater's distributed query optimizer and executor to put all of the partitions back together in order to answer a query.
There is currently no way to use a subset of the available partitions for a particular table; a partitioned table is automatically distributed across all available partitions. It is also illegal to create a partition once at least one partitioned table exists.
Suppose instead that replication rather than partitioning is chosen:
create table warehouse.event_source(id int primary key, ...) options (partitioning 'NONE');
Then on both remote nodes we get:
CREATE TABLE LOCALDB.WAREHOUSE.EVENT_SOURCE(ID INT PRIMARY KEY, ...);
(Note that in the partitioning example, the schema named WAREHOUSE was already replicated into LOCALDB as well.)
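To make the fan-out concrete, here is a minimal Python sketch of how a distributor might derive the per-node CREATE TABLE statements for the two cases above. The function `fan_out_create_table` and the shape of its `placement` argument (partition name mapped to node name) are assumptions made for illustration; the real processing also creates catalogs and schemas, which this sketch leaves out.

```python
def fan_out_create_table(placement, schema, table, columns, partitioning):
    """Return {node name: [CREATE TABLE statements]} for one logical table.

    placement    -- dict mapping partition name to storage node name
    columns      -- column list text, passed through unchanged
    partitioning -- 'HASH' or 'NONE', as in the table options above
    """
    table_def = f"{table.upper()}({columns})"
    commands = {node: [] for node in placement.values()}
    if partitioning == "HASH":
        # One private table copy per partition, qualified by the
        # partition's catalog on the node that hosts it.
        for partition, node in placement.items():
            commands[node].append(
                f"CREATE TABLE {partition.upper()}.{schema.upper()}.{table_def};"
            )
    elif partitioning == "NONE":
        # Unpartitioned tables are replicated into LOCALDB on every node.
        for node in commands:
            commands[node].append(
                f"CREATE TABLE LOCALDB.{schema.upper()}.{table_def};"
            )
    else:
        raise ValueError(f"unsupported partitioning: {partitioning!r}")
    return commands


if __name__ == "__main__":
    placement = {
        "rp1": "remote_node1",
        "rp2": "remote_node1",
        "rp3": "remote_node2",
        "rp4": "remote_node2",
    }
    # The column list is abbreviated to the one column the examples name.
    for node, ddl in fan_out_create_table(
        placement, "warehouse", "event_log", "ID INT PRIMARY KEY", "HASH"
    ).items():
        print(node, ddl)
```

Because a hashed table always spans every entry in `placement`, the sketch has no parameter for choosing a subset of partitions.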
What about other kinds of objects? The following table describes how each one is handled:
| Object Type | Handling |
|---|---|
| Schema | always replicate |
| Stored Table | replicate or partition as directed |
| Index | replicate or partition as directed for containing table |
| Warehouse Label | always replicate |
| SQL/MED Foreign Wrapper/Server/Table | never replicate??? depends what kind of distributed ETL support we add |
| View | never replicate (view expansion happens entirely in distributor's optimizer) |
| User/Role | never replicate (access control happens entirely in distributor) |
| Routines | always replicate |
| Jars | always replicate (either require URL to point to a shared location, or implement an automatic distributed deployment mechanism) |
Distributed Query Processing
Firewater has a distributed query optimizer for decomposing a query into subqueries to be executed by the individual storage nodes (accessed via a remoting JDBC client and the associated SQL/MED foreign data wrapper); it also has an executor capable of combining the results and then returning them to the client, similar to what is described in LucidDbHorizontalPartitioning:
For the first release, the optimizer will focus on OLAP-style star join/agg (not arbitrary queries and schemas such as might be encountered when executing ETL).
- NG COMMENT: I'd suggest support for one slightly complex query plan not covered by the technique above. Distinct Count aggregators in an OLAP-style star will need to be handled differently than the simple avg/sum/min/max/count additive measures. I think "full" coverage on the Mondrian side makes the Moonshine/Mondrian combination potent!
- --Jvs 14:19, 9 May 2009 (EDT): The rewrite LucidDB is already using for COUNT DISTINCT may work as is; we do one level of GROUP BY to filter out the duplicates, then another level to compute the count. When it is mixed in with other aggregates, there's a "join-back" in the plan to put the different grains together. We can push part of the duplicate-filtering work down to the storage nodes, but the final duplicate-filtering and the count have to be done in the D node. That is, unless the top-level GROUP BY is on the partitioning key; in that special case we can push the whole thing down.
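The two-level COUNT DISTINCT strategy discussed in the comments above can be illustrated with a small Python sketch. It is a toy model over in-memory rows, not optimizer code: the function names and the sample column names (`region`, `customer`) are made up for the example. Each storage node filters out its local duplicates, and the distributor performs the final duplicate filtering before counting.

```python
def storage_node_distinct(rows, group_col, value_col):
    """Local duplicate filtering: distinct (group, value) pairs on one node."""
    return {(row[group_col], row[value_col]) for row in rows}


def distributor_count_distinct(partials):
    """Final duplicate filtering across nodes, then the count per group."""
    merged = set().union(*partials)
    counts = {}
    for group, _value in merged:
        counts[group] = counts.get(group, 0) + 1
    return counts


if __name__ == "__main__":
    node1_rows = [
        {"region": "east", "customer": 1},
        {"region": "east", "customer": 1},
        {"region": "west", "customer": 2},
    ]
    node2_rows = [
        {"region": "east", "customer": 1},
        {"region": "east", "customer": 3},
    ]
    partials = [
        storage_node_distinct(rows, "region", "customer")
        for rows in (node1_rows, node2_rows)
    ]
    print(distributor_count_distinct(partials))
```

In this sample, customer 1 appears in the east region on both nodes, so adding up per-node distinct counts would report 3 for east instead of 2; that is why the final step has to run on the distributor. If the grouping column were the partitioning key, no group could span two nodes and the per-node counts could be used directly.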
Distributed Loader
How does the data get into the storage nodes in the first place? Rather than implementing distributed DML operators, for the first release we plan to leverage Kettle's parallel ETL capabilities. A Firewater plugin for Kettle will query the distributor catalog to discover the distribution information, then partition the incoming data stream accordingly and send it to the storage nodes via an enhanced version of LucidDbPDIStreamingLoader:
- NG COMMENT: The integration could leverage the partitioning features in PDI to get individual PDI slaves on each node loading into the local instance of LucidDB coordinated from a master (Firewater) slave server. Count me in for helping with this.
- --Jvs 14:21, 9 May 2009 (EDT): Awesome!
For the first release, distribution options will include HASHED (based on primary key) and REPLICATED. There will be no way to bind a table to a single node or subset of nodes; partitioning and replication will be across all nodes, with the number of nodes static per installation.
A typical star schema layout would hash fact tables while replicating dimension tables (to allow star joins to be fully pushed down). Later, an advanced layout might hash the largest dimension as well (with a partitioning key either equivalent to or a subset of the fact partitioning key).
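The routing step a loader has to perform for the two distribution options can be sketched in Python as below. Everything here is an assumption for illustration: the function names are hypothetical, and CRC32 over the key's string form merely stands in for whatever hash function Firewater and the Kettle plugin would actually agree on.

```python
import zlib


def partition_for_key(key, partitions):
    """Pick the partition a primary key value hashes to."""
    # Assumption: CRC32 is a placeholder for the real hash function.
    digest = zlib.crc32(str(key).encode("utf-8"))
    return partitions[digest % len(partitions)]


def route_hashed(rows, key_col, partitions):
    """HASHED: each row goes to exactly one partition."""
    routed = {partition: [] for partition in partitions}
    for row in rows:
        routed[partition_for_key(row[key_col], partitions)].append(row)
    return routed


def route_replicated(rows, nodes):
    """REPLICATED: every node receives a full copy of the rows."""
    return {node: list(rows) for node in nodes}


if __name__ == "__main__":
    facts = [{"id": i, "amount": i * 10} for i in range(8)]
    dims = [{"id": 1, "name": "web"}, {"id": 2, "name": "store"}]
    print(route_hashed(facts, "id", ["rp1", "rp2", "rp3", "rp4"]))
    print(route_replicated(dims, ["remote_node1", "remote_node2"]))
```

Hashed rows are keyed by partition while replicated rows are keyed by node, matching the star schema layout above in which fact rows are split across partitions and dimension rows are copied to every node.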







