網(wǎng)站正在開發(fā)中鄭州谷歌優(yōu)化外包
Gravitino SparkConnector 實(shí)現(xiàn)原理
本文參考了官網(wǎng)介紹,想看官方解析請參考 官網(wǎng)地址 本文僅僅介紹原理
文章目錄
- Gravitino SparkConnector 實(shí)現(xiàn)原理
- 背景知識-Spark Plugin 介紹
- (1) **插件加載**
- (2) **DriverPlugin 初始化**
- (3) **ExecutorPlugin 初始化**
- (4) **插件執(zhí)行**
- (5) **插件銷毀**
- 背景知識-Driver Plugin 介紹
- (1) **`init` 方法**
- (2) **`registerMetrics` 方法**
- (3) **`onTaskStart` 方法**
- (4) **`onTaskSucceeded` 方法**
- (5) **`onTaskFailed` 方法**
- (6) **`close` 方法**
- SparkConnector使用方式
- 加載spark.sql.catalog.xxx 具體執(zhí)行的配置
背景知識-Spark Plugin 介紹
spark在[spark-29399]pr提交更新了SparkPlugin插件
SparkPlugin插件執(zhí)行生命周期
SparkPlugin
的生命周期與 Spark 應(yīng)用程序的生命周期一致,具體如下:
(1) 插件加載
- 當(dāng) Spark 應(yīng)用程序啟動時,Spark 會掃描類路徑下的
SparkPlugin
實(shí)現(xiàn)類。 - 如果插件被正確配置(例如通過
spark.plugins
配置項(xiàng)),Spark 會實(shí)例化該類。
(2) DriverPlugin 初始化
- Spark 調(diào)用
driverPlugin()
方法,獲取DriverPlugin
實(shí)例。 DriverPlugin
的生命周期開始,其方法(如init
、registerMetrics
等)會被調(diào)用。
(3) ExecutorPlugin 初始化
- Spark 調(diào)用
executorPlugin()
方法,獲取ExecutorPlugin
實(shí)例。 ExecutorPlugin
的生命周期開始,其方法(如init
、shutdown
等)會被調(diào)用。
(4) 插件執(zhí)行
DriverPlugin
在 Driver 端執(zhí)行自定義邏輯,例如注冊指標(biāo)、攔截 SQL 解析、修改 Catalog 等。ExecutorPlugin
在 Executor 端執(zhí)行自定義邏輯,例如監(jiān)控 Task 執(zhí)行、收集指標(biāo)等。
(5) 插件銷毀
- 當(dāng) Spark 應(yīng)用程序結(jié)束時,
DriverPlugin
和ExecutorPlugin
的生命周期結(jié)束,其close()
方法會被調(diào)用以釋放資源。
背景知識-Driver Plugin 介紹
DriverPlugin
是用于在 Driver 端執(zhí)行自定義邏輯的插件,其生命周期方法包括:
(1) init
方法
- 在 Driver 插件初始化時調(diào)用。
- 可以在此方法中執(zhí)行初始化邏輯,例如注冊自定義 Catalog、攔截 SQL 解析器等。
(2) registerMetrics
方法
- 在 Driver 插件初始化時調(diào)用。
- 可以在此方法中注冊自定義指標(biāo)(Metrics)。
(3) onTaskStart
方法
- 在 Task 啟動時調(diào)用。
- 可以在此方法中執(zhí)行與 Task 相關(guān)的邏輯。
(4) onTaskSucceeded
方法
- 在 Task 成功完成時調(diào)用。
- 可以在此方法中執(zhí)行與 Task 成功相關(guān)的邏輯。
(5) onTaskFailed
方法
- 在 Task 失敗時調(diào)用。
- 可以在此方法中執(zhí)行與 Task 失敗相關(guān)的邏輯。
(6) close
方法
- 在 Driver 插件銷毀時調(diào)用。
- 可以在此方法中釋放資源,例如關(guān)閉連接、清理緩存等。
SparkConnector使用方式
./bin/spark-sql -v \
--conf spark.plugins="org.apache.gravitino.spark.connector.plugin.GravitinoSparkPlugin" \
--conf spark.sql.gravitino.uri=http://127.0.0.1:8090 \
--conf spark.sql.gravitino.metalake=test \
--conf spark.sql.gravitino.enableIcebergSupport=true \
--conf spark.sql.warehouse.dir=hdfs://127.0.0.1:9000/user/hive/warehouse-hive
可以看出SparkConnector指定了加載的插件是GravitinoSparkPlugin
public class GravitinoSparkPlugin implements SparkPlugin {@Overridepublic DriverPlugin driverPlugin() {return new GravitinoDriverPlugin();}@Overridepublic ExecutorPlugin executorPlugin() {return null;}
}
可以看出實(shí)現(xiàn)方式很簡單,僅僅使用了一個GravitinoDriverPlugin
,也就是在Spark應(yīng)用程序啟動的時候掃描SparkPlugin
掃描到了這個GravitinoSparkPlugin
然后立馬就去執(zhí)行GravitinoDriverPlugin
初始化程序。在DriverPlugin初始化過程中 插件僅僅覆寫了兩個函數(shù),init()
和shutdown()
。 說明這個插件僅僅做了一些初始化和資源銷毀操作。
在Driver端進(jìn)行初始化
-
配置檢查檢查gravitino_uri和gravitino_metalake是否配置
-
如果開啟了iceberg則將gravitinoDriverExtensions放入到數(shù)組中方便配置
-
初始化Gravtino客戶端和
GravitinoCatalogManager
,并且將relational類型的表加載到緩存中 -
將緩存中的catalog進(jìn)行如果是非iceberg類型(當(dāng)前僅僅只有Hive)進(jìn)行注冊,這里定義的注冊的實(shí)際操作配置Spark的配置項(xiàng)(spark.sql.catalog.catalogName)這里的catalogName對應(yīng)的是緩存中的catalogName,配置的值為根據(jù)Gravitino自己的Catalog使用的Provider進(jìn)行適配比如可以是(
org.apache.gravitino.spark.connector.hive.GravitinoHiveCatalogSpark33
或者org.apache.gravitino.spark.connector.iceberg.GravitinoIcebergCatalogSpark33
)具體情況由適配器進(jìn)行處理。 -
然后注冊SqlExtensions其實(shí)就是將第2步驟的數(shù)組配置到
SPARK_SESSION_EXTENSIONS
這個SparkConf
配置里面
稍微貼一下注冊Catalog代碼,比較重要
//初始化的時候調(diào)用注冊邏輯,將Gravitino中的Catalog加載到緩存//然后將緩存中的數(shù)據(jù)作為第二個參數(shù)gravitinoCatalogs傳遞進(jìn)來private void registerGravitinoCatalogs(SparkConf sparkConf, Map<String, Catalog> gravitinoCatalogs) {gravitinoCatalogs.entrySet().forEach(entry -> {String catalogName = entry.getKey();Catalog gravitinoCatalog = entry.getValue();String provider = gravitinoCatalog.provider();if ("lakehouse-iceberg".equals(provider.toLowerCase(Locale.ROOT))&& enableIcebergSupport == false) {return;}try {registerCatalog(sparkConf, catalogName, provider);} catch (Exception e) {LOG.warn("Register catalog {} failed.", catalogName, e);}});}//這里根據(jù)適配器去配置spark.sql.catalog.xxx 的具體執(zhí)行CatalogClassprivate void registerCatalog(SparkConf sparkConf, String catalogName, String provider) {if (StringUtils.isBlank(provider)) {LOG.warn("Skip registering {} because catalog provider is empty.", catalogName);return;}String catalogClassName = CatalogNameAdaptor.getCatalogName(provider);if (StringUtils.isBlank(catalogClassName)) {LOG.warn("Skip registering {} because {} is not supported yet.", catalogName, provider);return;}String sparkCatalogConfigName = "spark.sql.catalog." + catalogName;Preconditions.checkArgument(!sparkConf.contains(sparkCatalogConfigName),catalogName + " is already registered to SparkCatalogManager");sparkConf.set(sparkCatalogConfigName, catalogClassName);LOG.info("Register {} catalog to Spark catalog manager.", catalogName);}
到這里GravitinoConnector的代碼機(jī)制已經(jīng)說完了,下面聊聊Spark機(jī)制
加載spark.sql.catalog.xxx 具體執(zhí)行的配置
經(jīng)過上面GravitinoDriverPlugin
的初始化之后,已經(jīng)將具體的catalog名稱和對應(yīng)的處理類映射起來,這里以GravitinoHiveCatalogSpark33
為例。
GravitinoHiveCatalogSpark33
這個類繼承關(guān)系是繼承了BaseCatalog
而BaseCatalog
是Spark中定義的CatalogPlugin
的一個實(shí)現(xiàn)類。
Spark在解析SQL的時候會查找catalog對應(yīng)的Catalog,可以看到調(diào)用了CatalogManager.catalog()
方法
private object CatalogAndMultipartIdentifier {def unapply(parts: Seq[String]): Some[(Option[CatalogPlugin], Seq[String])] = parts match {case Seq(_) =>Some((None, parts))case Seq(catalogName, tail @ _*) =>try {Some((Some(catalogManager.catalog(catalogName)), tail))} catch {case _: CatalogNotFoundException =>Some((None, parts))}}}
這個catalog方法調(diào)用了Catalogs.load()
方法
def catalog(name: String): CatalogPlugin = synchronized {if (name.equalsIgnoreCase(SESSION_CATALOG_NAME)) {v2SessionCatalog} else {catalogs.getOrElseUpdate(name, Catalogs.load(name, conf))}}
這個方法才是真正的加載方法,他真正根據(jù)conf配置將GravitinoHiveCatalogSpark33
名稱根據(jù)定義的反射構(gòu)造函數(shù)實(shí)例化到內(nèi)存中
def load(name: String, conf: SQLConf): CatalogPlugin = {val pluginClassName = try {val _pluginClassName = conf.getConfString(s"spark.sql.catalog.$name")// SPARK-39079 do configuration check first, otherwise some path-based table like// `org.apache.spark.sql.json`.`/path/json_file` may fail on analyze phaseif (name.contains(".")) {throw QueryExecutionErrors.invalidCatalogNameError(name)}_pluginClassName} catch {case _: NoSuchElementException =>throw QueryExecutionErrors.catalogPluginClassNotFoundError(name)}val loader = Utils.getContextOrSparkClassLoadertry {val pluginClass = loader.loadClass(pluginClassName)if (!classOf[CatalogPlugin].isAssignableFrom(pluginClass)) {throw QueryExecutionErrors.catalogPluginClassNotImplementedError(name, pluginClassName)}val plugin = pluginClass.getDeclaredConstructor().newInstance().asInstanceOf[CatalogPlugin]plugin.initialize(name, catalogOptions(name, conf))plugin} catch {// 省略}}
到這里流程就分析結(jié)束了