三門峽建設網(wǎng)站哪家好深圳網(wǎng)絡推廣
FlinkSQL的行級權限解決方案及源碼,支持面向用戶級別的行級數(shù)據(jù)訪問控制,即特定用戶只能訪問授權過的行,隱藏未授權的行數(shù)據(jù)。此方案是實時領域Flink的解決方案,類似離線數(shù)倉Hive中Ranger Row-level Filter方案。
源碼地址: https://github.com/HamaWhiteGG/flink-sql-security
注: 此方案已產品化集成到實時計算平臺Dinky,歡迎試用。
一、基礎知識
1.1 行級權限
行級權限即橫向數(shù)據(jù)安全保護,可以解決不同人員只允許訪問不同數(shù)據(jù)行的問題。例如針對訂單表,用戶A只能查看到北京區(qū)域的數(shù)據(jù),用戶B只能查看到杭州區(qū)域的數(shù)據(jù)。
1.2 業(yè)務流程
1.2.1 設置行級權限
管理員配置用戶、表、行級權限條件,例如下面的配置。
序號 | 用戶名 | 表名 | 行級權限條件 |
---|---|---|---|
1 | 用戶A | orders | region = ‘beijing’ |
2 | 用戶B | orders | region = ‘hangzhou’ |
1.2.2 用戶查詢數(shù)據(jù)
用戶在系統(tǒng)上查詢orders表的數(shù)據(jù)時,系統(tǒng)在底層查詢時會根據(jù)該用戶的行級權限條件來自動過濾數(shù)據(jù),即讓行級權限生效。
當用戶A和用戶B在執(zhí)行下面相同的SQL時,會查看到不同的結果數(shù)據(jù)。
SELECT * FROM orders;
用戶A查看到的結果數(shù)據(jù)是:
order_id | order_date | customer_name | price | product_id | order_status | region |
---|---|---|---|---|---|---|
10001 | 2020-07-30 10:08:22 | Jack | 50.50 | 102 | false | beijing |
10002 | 2020-07-30 10:11:09 | Sally | 15.00 | 105 | false | beijing |
注: 系統(tǒng)底層最終執(zhí)行的SQL是:
SELECT * FROM orders WHERE region = 'beijing'
。
用戶B查看到的結果數(shù)據(jù)是:
order_id | order_date | customer_name | price | product_id | order_status | region |
---|---|---|---|---|---|---|
10003 | 2020-07-30 12:00:30 | Edward | 25.25 | 106 | false | hangzhou |
10004 | 2022-12-15 12:11:09 | John | 78.00 | 103 | false | hangzhou |
注: 系統(tǒng)底層最終執(zhí)行的SQL是:
SELECT * FROM orders WHERE region = 'hangzhou'
。
1.3 組件版本
組件名稱 | 版本 | 備注 |
---|---|---|
Flink | 1.16.0 | |
Flink-connector-mysql-cdc | 2.3.0 |
二、Hive行級權限解決方案
在離線數(shù)倉工具Hive領域,由于發(fā)展多年已有Ranger來支持表數(shù)據(jù)的行級權限控制,詳見參考文獻[2]。下圖是在Ranger里配置Hive表行級過濾條件的頁面,供參考。
但由于Flink實時數(shù)倉領域發(fā)展相對較短,Ranger還不支持FlinkSQL,以及要依賴Ranger會導致系統(tǒng)部署和運維過重,因此開始自研實時數(shù)倉的行級權限解決工具。
三、FlinkSQL行級權限解決方案
3.1 解決方案
3.1.1 FlinkSQL執(zhí)行流程
可以參考作者文章[FlinkSQL字段血緣解決方案及源碼],本文根據(jù)Flink1.16修正和簡化后的執(zhí)行流程如下圖所示。
在CalciteParser.parse()處理后會得到一個SqlNode類型的抽象語法樹(Abstract Syntax Tree
,簡稱AST),本文會在Parse階段,通過組裝行級過濾條件生成新的AST來實現(xiàn)行級權限控制。
3.1.2 Calcite對象繼承關系
下面章節(jié)要用到Calcite中的SqlNode、SqlCall、SqlIdentifier、SqlJoin、SqlBasicCall和SqlSelect等類,此處進行簡單介紹以及展示它們間繼承關系,以便讀者閱讀本文源碼。
序號 | 類 | 介紹 |
---|---|---|
1 | SqlNode | A SqlNode is a SQL parse tree. |
2 | SqlCall | A SqlCall is a call to an SqlOperator operator. |
3 | SqlIdentifier | A SqlIdentifier is an identifier, possibly compound. |
4 | SqlJoin | Parse tree node representing a JOIN clause. |
5 | SqlBasicCall | Implementation of SqlCall that keeps its operands in an array. |
6 | SqlSelect | A SqlSelect is a node of a parse tree which represents a select statement. |
3.1.3 解決思路
在Parser階段,如果執(zhí)行的SQL包含對表的查詢操作,則一定會構建Calcite SqlSelect對象。因此限制表的行級權限,只要在構建Calcite SqlSelect對象時對Where條件進行攔截即可,而不需要解析用戶執(zhí)行的各種SQL來查找配置過行級權限條件約束的表。
在SqlSelect對象構造Where條件時,要通過執(zhí)行用戶和表名來查找配置的行級權限條件,系統(tǒng)會把此條件用CalciteParser提供的parseExpression(String sqlExpression)
方法解析生成一個SqlBacicCall再返回。然后結合用戶執(zhí)行的SQL和配置的行級權限條件重新組裝Where條件,即生成新的帶行級過濾條件Abstract Syntax Tree,最后基于新的AST再執(zhí)行后續(xù)的Validate、Convert、Optimize和Execute階段。
以上整個過程對執(zhí)行SQL的用戶都是透明和無感知的,還是調用Flink自帶的TableEnvironment.executeSql(String statement)方法即可。
注: 要通過技術手段把執(zhí)行用戶傳遞到Calcite SqlSelect中。
3.2 重寫SQL
主要在org.apache.calcite.sql.SqlSelect的構造方法中完成。
3.2.1 主要流程
主流程如下圖所示,根據(jù)From的類型進行不同的操作,例如針對SqlJoin類型,要分別遍歷其left和right節(jié)點,而且要支持遞歸操作以便支持三張表及以上JOIN;針對SqlIdentifier類型,要額外判斷下是否來自JOIN,如果是的話且JOIN時且未定義表別名,則用表名作為別名;針對SqlBasicCall類型,如果來自于子查詢,說明已在子查詢中組裝過行級權限條件,則直接返回當前Where即可,否則分別取出表名和別名。
然后再獲取行級權限條件解析后生成SqlBacicCall類型的Permissions,并給Permissions增加別名,最后把已有Where和Permissions進行組裝生成新的Where,來作為SqlSelect對象的Where約束。
上述流程圖的各個分支,都會在下面的用例測試章節(jié)中會舉例說明。
3.2.2 核心源碼
核心源碼位于SqlSelect中新增的addCondition()
、addPermission()
、buildWhereClause()
三個方法,下面只給出控制主流程addCondition()
的源碼。
/*** The main process of controlling row-level permissions*/
private SqlNode addCondition(SqlNode from, SqlNode where, boolean fromJoin) {if (from instanceof SqlIdentifier) {String tableName = from.toString();// the table name is used as an alias for joinString tableAlias = fromJoin ? tableName : null;return addPermission(where, tableName, tableAlias);} else if (from instanceof SqlJoin) {SqlJoin sqlJoin = (SqlJoin) from;// support recursive processing, such as join for three tables, process left sqlNodewhere = addCondition(sqlJoin.getLeft(), where, true);// process right sqlNodereturn addCondition(sqlJoin.getRight(), where, true);} else if (from instanceof SqlBasicCall) {// Table has an alias or comes from a subquerySqlNode[] tableNodes = ((SqlBasicCall) from).getOperands();/*** If there is a subquery in the Join, row-level filtering has been appended to the subquery.* What is returned here is the SqlSelect type, just return the original where directly*/if (!(tableNodes[0] instanceof SqlIdentifier)) {return where;}String tableName = tableNodes[0].toString();String tableAlias = tableNodes[1].toString();return addPermission(where, tableName, tableAlias);}return where;
}
四、用例測試
用例測試數(shù)據(jù)來自于CDC Connectors for Apache Flink
[6]官網(wǎng),在此表示感謝。下載本文源碼后,可通過Maven運行單元測試。
$ cd flink-sql-security
$ mvn test
4.1 新建Mysql表及初始化數(shù)據(jù)
Mysql新建表語句及初始化數(shù)據(jù)SQL詳見源碼[flink-sql-security/data/database]里面的mysql_ddl.sql和mysql_init.sql文件,本文給orders
表增加一個region字段。
4.2 新建Flink表
4.2.1 新建mysql cdc類型的orders表
DROP TABLE IF EXISTS orders;CREATE TABLE IF NOT EXISTS orders (order_id INT PRIMARY KEY NOT ENFORCED,order_date TIMESTAMP(0),customer_name STRING,product_id INT,price DECIMAL(10, 5),order_status BOOLEAN,region STRING
) WITH ('connector'='mysql-cdc','hostname'='xxx.xxx.xxx.xxx','port'='3306','username'='root','password'='xxx','server-time-zone'='Asia/Shanghai','database-name'='demo','table-name'='orders'
);
4.2.2 新建mysql cdc類型的products表
DROP TABLE IF EXISTS products;CREATE TABLE IF NOT EXISTS products (id INT PRIMARY KEY NOT ENFORCED,name STRING,description STRING
) WITH ('connector'='mysql-cdc','hostname'='xxx.xxx.xxx.xxx','port'='3306','username'='root','password'='xxx','server-time-zone'='Asia/Shanghai','database-name'='demo','table-name'='products'
);
4.2.3 新建mysql cdc類型shipments表
DROP TABLE IF EXISTS shipments;CREATE TABLE IF NOT EXISTS shipments (shipment_id INT PRIMARY KEY NOT ENFORCED,order_id INT,origin STRING,destination STRING,is_arrived BOOLEAN
) WITH ('connector'='mysql-cdc','hostname'='xxx.xxx.xxx.xxx','port'='3306','username'='root','password'='xxx','server-time-zone'='Asia/Shanghai','database-name'='demo','table-name'='shipments'
);
4.2.4 新建print類型print_sink表
DROP TABLE IF EXISTS print_sink;CREATE TABLE IF NOT EXISTS print_sink (order_id INT PRIMARY KEY NOT ENFORCED,order_date TIMESTAMP(0),customer_name STRING,product_id INT,price DECIMAL(10, 5),order_status BOOLEAN,region STRING
) WITH ('connector'='print'
);
4.3 測試用例
詳細測試用例可查看源碼中的單測,下面只描述部分測試點。
4.3.1 簡單SELECT
4.3.1.1 行級權限條件
序號 | 用戶名 | 表名 | 行級權限條件 |
---|---|---|---|
1 | 用戶A | orders | region = ‘beijing’ |
4.3.1.2 輸入SQL
SELECT * FROM orders;
4.3.1.3 輸出SQL
SELECT * FROM orders WHERE region = 'beijing';
4.3.1.4 測試小結
輸入SQL中沒有WHERE條件,只需要把行級過濾條件region = 'beijing'
追加到WHERE后即可。
4.3.2 SELECT帶復雜WHERE約束
4.3.2.1 行級權限條件
序號 | 用戶名 | 表名 | 行級權限條件 |
---|---|---|---|
1 | 用戶A | orders | region = ‘beijing’ |
4.3.2.2 輸入SQL
SELECT * FROM orders WHERE price > 45.0 OR customer_name = 'John';
4.3.2.3 輸出SQL
SELECT * FROM orders WHERE (price > 45.0 OR customer_name = 'John') AND region = 'beijing';
4.3.2.4 測試小結
輸入SQL中有兩個約束條件,中間用的是OR,因此在組裝region = 'beijing'
時,要給已有的price > 45.0 OR customer_name = 'John'
增加括號。
4.3.3 兩表JOIN且含子查詢
4.3.3.1 行級權限條件
序號 | 用戶名 | 表名 | 行級權限條件 |
---|---|---|---|
1 | 用戶A | orders | region = ‘beijing’ |
4.3.3.2 輸入SQL
SELECTo.*,p.name,p.description
FROM (SELECT*FROM ordersWHERE order_status = FALSE) AS o
LEFT JOIN products AS p ON o.product_id = p.id
WHEREo.price > 45.0 OR o.customer_name = 'John'
4.3.3.3 輸出SQL
SELECTo.*,p.name,p.description
FROM (SELECT*FROM ordersWHERE order_status = FALSE AND region = 'beijing') AS o
LEFT JOIN products AS p ON o.product_id = p.id
WHEREo.price > 45.0 OR o.customer_name = 'John'
4.3.3.4 測試小結
針對比較復雜的SQL,例如兩表在JOIN時且其中左表來自于子查詢SELECT * FROM orders WHERE order_status = FALSE
,行級過濾條件region = 'beijing'
只會追加到子查詢的里面。
4.3.4 三表JOIN
4.3.4.1 行級權限條件
序號 | 用戶名 | 表名 | 行級權限條件 |
---|---|---|---|
1 | 用戶A | orders | region = ‘beijing’ |
2 | 用戶A | products | name = ‘hammer’ |
3 | 用戶A | shipments | is_arrived = FALSE |
4.3.4.2 輸入SQL
SELECTo.*,p.name,p.description,s.shipment_id,s.origin,s.destination,s.is_arrived
FROMorders AS oLEFT JOIN products AS p ON o.product_id=p.idLEFT JOIN shipments AS s ON o.order_id=s.order_id;
4.3.4.3 輸出SQL
SELECTo.*,p.name,p.description,s.shipment_id,s.origin,s.destination,s.is_arrived
FROMorders AS oLEFT JOIN products AS p ON o.product_id=p.idLEFT JOIN shipments AS s ON o.order_id=s.order_id
WHEREo.region='beijing'AND p.name='hammer'AND s.is_arrived=FALSE;
4.3.4.4 測試小結
三張表進行JOIN時,會分別獲取orders
、products
、shipments
三張表的行級權限條件: region = 'beijing'
、name = 'hammer'
和is_arrived = FALSE
,然后增加orders
表的別名o、products
表的別名p、shipments
表的別名s,最后組裝到WHERE子句后面。
4.3.5 INSERT來自帶子查詢的SELECT
4.3.5.1 行級權限條件
序號 | 用戶名 | 表名 | 行級權限條件 |
---|---|---|---|
1 | 用戶A | orders | region = ‘beijing’ |
4.3.5.2 輸入SQL
INSERT INTO print_sink SELECT * FROM (SELECT * FROM orders);
4.3.5.3 輸出SQL
INSERT INTO print_sink (SELECT * FROM (SELECT * FROM orders WHERE region = 'beijing'));
4.3.5.4 測試小結
無論運行SQL類型是INSERT、SELECT或者其他,只會找到查詢oders
表的子句,然后對其組裝行級權限條件。
4.3.6 運行SQL
測試兩個不同用戶執(zhí)行相同的SQL,兩個用戶的行級權限條件不一樣。
4.3.6.1 行級權限條件
序號 | 用戶名 | 表名 | 行級權限條件 |
---|---|---|---|
1 | 用戶A | orders | region = ‘beijing’ |
2 | 用戶B | orders | region = ‘hangzhou’ |
4.3.6.2 輸入SQL
SELECT * FROM orders;
4.3.6.3 執(zhí)行SQL
用戶A的真實執(zhí)行SQL:
SELECT * FROM orders WHERE region = 'beijing';
用戶B的真實執(zhí)行SQL:
SELECT * FROM orders WHERE region = 'hangzhou';
4.3.6.4 測試小結
用戶調用下面的執(zhí)行方法,除傳遞要執(zhí)行的SQL參數(shù)外,只需要額外指定執(zhí)行的用戶即可,便能自動按照行級權限限制來執(zhí)行。
/*** Execute the single sql with user permissions*/
public TableResult execute(String username, String singleSql) {System.setProperty(EXECUTE_USERNAME, username);return tableEnv.executeSql(singleSql);
}
五、源碼修改步驟
注: Flink版本1.16.0依賴的Calcite是1.26.0版本。
5.1 新增Parser和ParserImpl類
復制Flink源碼中的org.apache.flink.table.delegation.Parser和org.apache.flink.table.planner.delegation.ParserImpl到項目下,新增下面兩個方法及實現(xiàn)。
/*** Parses a SQL expression into a {@link SqlNode}. The {@link SqlNode} is not yet validated.** @param sqlExpression a SQL expression string to parse* @return a parsed SQL node* @throws SqlParserException if an exception is thrown when parsing the statement*/
@Override
public SqlNode parseExpression(String sqlExpression) {CalciteParser parser = calciteParserSupplier.get();return parser.parseExpression(sqlExpression);
}/*** Entry point for parsing SQL queries and return the abstract syntax tree** @param statement the SQL statement to evaluate* @return abstract syntax tree* @throws org.apache.flink.table.api.SqlParserException when failed to parse the statement*/
@Override
public SqlNode parseSql(String statement) {CalciteParser parser = calciteParserSupplier.get();// use parseSqlList here because we need to support statement end with ';' in sql client.SqlNodeList sqlNodeList = parser.parseSqlList(statement);List<SqlNode> parsed = sqlNodeList.getList();Preconditions.checkArgument(parsed.size() == 1, "only single statement supported");return parsed.get(0);
}
5.2 新增SqlSelect類
復制Calcite源碼中的org.apache.calcite.sql.SqlSelect到項目下,新增上文提到的addCondition()
、addPermission()
、buildWhereClause()
三個方法。
并且在構造方法中注釋掉原有的this.where = where
行,并添加如下代碼:
// add row level filter condition for where clause
SqlNode rowFilterWhere = addCondition(from, where, false);
if (rowFilterWhere != where) {LOG.info("Rewritten SQL based on row-level privilege filtering for user [{}]", System.getProperty(EXECUTE_USERNAME));
}
this.where = rowFilterWhere;
5.3 封裝SecurityContext類
新建SecurityContext類,主要添加下面三個方法:
/*** Add row-level filter conditions and return new SQL*/
public String addRowFilter(String username, String singleSql) {System.setProperty(EXECUTE_USERNAME, username);// in the modified SqlSelect, filter conditions will be added to the where clauseSqlNode parsedTree = tableEnv.getParser().parseSql(singleSql);return parsedTree.toString();
}/*** Query the configured permission point according to the user name and table name, and return* it to SqlBasicCall*/
public SqlBasicCall queryPermissions(String username, String tableName) {String permissions = rowLevelPermissions.get(username, tableName);LOG.info("username: {}, tableName: {}, permissions: {}", username, tableName, permissions);if (permissions != null) {return (SqlBasicCall) tableEnv.getParser().parseExpression(permissions);}return null;
}/*** Execute the single sql with user permissions*/
public TableResult execute(String username, String singleSql) {System.setProperty(EXECUTE_USERNAME, username);return tableEnv.executeSql(singleSql);
}
六、下一步計劃
- 支持數(shù)據(jù)脫敏(Data Masking)
- 開發(fā)ranger-flink-plugin
七、參考文獻
- 數(shù)據(jù)管理DMS-敏感數(shù)據(jù)管理-行級管控
- Apache Ranger Row-level Filter
- OpenLooKeng的行級權限控制
- PostgreSQL中的行級權限/數(shù)據(jù)權限/行安全策略
- FlinkSQL字段血緣解決方案及源碼
- 基于 Flink CDC 構建 MySQL 和 Postgres 的 Streaming ETL