中文亚洲精品无码_熟女乱子伦免费_人人超碰人人爱国产_亚洲熟妇女综合网

當(dāng)前位置: 首頁 > news >正文

網(wǎng)站正在開發(fā)中鄭州谷歌優(yōu)化外包

網(wǎng)站正在開發(fā)中,鄭州谷歌優(yōu)化外包,做彩票網(wǎng)站需要什么,做做網(wǎng)站下載免費(fèi)Gravitino SparkConnector 實(shí)現(xiàn)原理 本文參考了官網(wǎng)介紹,想看官方解析請參考 官網(wǎng)地址 本文僅僅介紹原理 文章目錄 Gravitino SparkConnector 實(shí)現(xiàn)原理背景知識-Spark Plugin 介紹(1) **插件加載**(2) **DriverPlugin 初始化**(3) **ExecutorPlugin 初始化**(4) *…

Gravitino SparkConnector 實(shí)現(xiàn)原理

本文參考了官網(wǎng)介紹,想看官方解析請參考 官網(wǎng)地址 本文僅僅介紹原理

文章目錄

  • Gravitino SparkConnector 實(shí)現(xiàn)原理
    • 背景知識-Spark Plugin 介紹
        • (1) **插件加載**
        • (2) **DriverPlugin 初始化**
        • (3) **ExecutorPlugin 初始化**
        • (4) **插件執(zhí)行**
        • (5) **插件銷毀**
    • 背景知識-Driver Plugin 介紹
        • (1) **`init` 方法**
        • (2) **`registerMetrics` 方法**
        • (3) **`onTaskStart` 方法**
        • (4) **`onTaskSucceeded` 方法**
        • (5) **`onTaskFailed` 方法**
        • (6) **`close` 方法**
    • SparkConnector使用方式
      • 加載spark.sql.catalog.xxx 具體執(zhí)行的配置

背景知識-Spark Plugin 介紹

spark在[spark-29399]pr提交更新了SparkPlugin插件

SparkPlugin插件執(zhí)行生命周期

SparkPlugin 的生命周期與 Spark 應(yīng)用程序的生命周期一致,具體如下:

(1) 插件加載
  • 當(dāng) Spark 應(yīng)用程序啟動時,Spark 會掃描類路徑下的 SparkPlugin 實(shí)現(xiàn)類。
  • 如果插件被正確配置(例如通過 spark.plugins 配置項(xiàng)),Spark 會實(shí)例化該類。
(2) DriverPlugin 初始化
  • Spark 調(diào)用 driverPlugin() 方法,獲取 DriverPlugin 實(shí)例。
  • DriverPlugin 的生命周期開始,其方法(如 initregisterMetrics 等)會被調(diào)用。
(3) ExecutorPlugin 初始化
  • Spark 調(diào)用 executorPlugin() 方法,獲取 ExecutorPlugin 實(shí)例。
  • ExecutorPlugin 的生命周期開始,其方法(如 init、shutdown 等)會被調(diào)用。
(4) 插件執(zhí)行
  • DriverPlugin 在 Driver 端執(zhí)行自定義邏輯,例如注冊指標(biāo)、攔截 SQL 解析、修改 Catalog 等。
  • ExecutorPlugin 在 Executor 端執(zhí)行自定義邏輯,例如監(jiān)控 Task 執(zhí)行、收集指標(biāo)等。
(5) 插件銷毀
  • 當(dāng) Spark 應(yīng)用程序結(jié)束時,DriverPluginExecutorPlugin 的生命周期結(jié)束,其 close() 方法會被調(diào)用以釋放資源。

背景知識-Driver Plugin 介紹

DriverPlugin 是用于在 Driver 端執(zhí)行自定義邏輯的插件,其生命周期方法包括:

(1) init 方法
  • 在 Driver 插件初始化時調(diào)用。
  • 可以在此方法中執(zhí)行初始化邏輯,例如注冊自定義 Catalog、攔截 SQL 解析器等。
(2) registerMetrics 方法
  • 在 Driver 插件初始化時調(diào)用。
  • 可以在此方法中注冊自定義指標(biāo)(Metrics)。
(3) onTaskStart 方法
  • 在 Task 啟動時調(diào)用。
  • 可以在此方法中執(zhí)行與 Task 相關(guān)的邏輯。
(4) onTaskSucceeded 方法
  • 在 Task 成功完成時調(diào)用。
  • 可以在此方法中執(zhí)行與 Task 成功相關(guān)的邏輯。
(5) onTaskFailed 方法
  • 在 Task 失敗時調(diào)用。
  • 可以在此方法中執(zhí)行與 Task 失敗相關(guān)的邏輯。
(6) close 方法
  • 在 Driver 插件銷毀時調(diào)用。
  • 可以在此方法中釋放資源,例如關(guān)閉連接、清理緩存等。

SparkConnector使用方式

./bin/spark-sql -v \
--conf spark.plugins="org.apache.gravitino.spark.connector.plugin.GravitinoSparkPlugin" \
--conf spark.sql.gravitino.uri=http://127.0.0.1:8090 \
--conf spark.sql.gravitino.metalake=test \
--conf spark.sql.gravitino.enableIcebergSupport=true \
--conf spark.sql.warehouse.dir=hdfs://127.0.0.1:9000/user/hive/warehouse-hive

可以看出SparkConnector指定了加載的插件是GravitinoSparkPlugin

public class GravitinoSparkPlugin implements SparkPlugin {@Overridepublic DriverPlugin driverPlugin() {return new GravitinoDriverPlugin();}@Overridepublic ExecutorPlugin executorPlugin() {return null;}
}

可以看出實(shí)現(xiàn)方式很簡單,僅僅使用了一個GravitinoDriverPlugin,也就是在Spark應(yīng)用程序啟動的時候掃描SparkPlugin掃描到了這個GravitinoSparkPlugin然后立馬就去執(zhí)行GravitinoDriverPlugin初始化程序。在DriverPlugin初始化過程中 插件僅僅覆寫了兩個函數(shù),init()shutdown()。 說明這個插件僅僅做了一些初始化和資源銷毀操作。

在Driver端進(jìn)行初始化

  1. 配置檢查檢查gravitino_uri和gravitino_metalake是否配置

  2. 如果開啟了iceberg則將gravitinoDriverExtensions放入到數(shù)組中方便配置

  3. 初始化Gravtino客戶端和GravitinoCatalogManager,并且將relational類型的表加載到緩存中

  4. 將緩存中的catalog進(jìn)行如果是非iceberg類型(當(dāng)前僅僅只有Hive)進(jìn)行注冊,這里定義的注冊的實(shí)際操作配置Spark的配置項(xiàng)(spark.sql.catalog.catalogName)這里的catalogName對應(yīng)的是緩存中的catalogName,配置的值為根據(jù)Gravitino自己的Catalog使用的Provider進(jìn)行適配比如可以是(org.apache.gravitino.spark.connector.hive.GravitinoHiveCatalogSpark33或者org.apache.gravitino.spark.connector.iceberg.GravitinoIcebergCatalogSpark33)具體情況由適配器進(jìn)行處理。

  5. 然后注冊SqlExtensions其實(shí)就是將第2步驟的數(shù)組配置到SPARK_SESSION_EXTENSIONS這個SparkConf配置里面

稍微貼一下注冊Catalog代碼,比較重要

  //初始化的時候調(diào)用注冊邏輯,將Gravitino中的Catalog加載到緩存//然后將緩存中的數(shù)據(jù)作為第二個參數(shù)gravitinoCatalogs傳遞進(jìn)來private void registerGravitinoCatalogs(SparkConf sparkConf, Map<String, Catalog> gravitinoCatalogs) {gravitinoCatalogs.entrySet().forEach(entry -> {String catalogName = entry.getKey();Catalog gravitinoCatalog = entry.getValue();String provider = gravitinoCatalog.provider();if ("lakehouse-iceberg".equals(provider.toLowerCase(Locale.ROOT))&& enableIcebergSupport == false) {return;}try {registerCatalog(sparkConf, catalogName, provider);} catch (Exception e) {LOG.warn("Register catalog {} failed.", catalogName, e);}});}//這里根據(jù)適配器去配置spark.sql.catalog.xxx 的具體執(zhí)行CatalogClassprivate void registerCatalog(SparkConf sparkConf, String catalogName, String provider) {if (StringUtils.isBlank(provider)) {LOG.warn("Skip registering {} because catalog provider is empty.", catalogName);return;}String catalogClassName = CatalogNameAdaptor.getCatalogName(provider);if (StringUtils.isBlank(catalogClassName)) {LOG.warn("Skip registering {} because {} is not supported yet.", catalogName, provider);return;}String sparkCatalogConfigName = "spark.sql.catalog." + catalogName;Preconditions.checkArgument(!sparkConf.contains(sparkCatalogConfigName),catalogName + " is already registered to SparkCatalogManager");sparkConf.set(sparkCatalogConfigName, catalogClassName);LOG.info("Register {} catalog to Spark catalog manager.", catalogName);}

到這里GravitinoConnector的代碼機(jī)制已經(jīng)說完了,下面聊聊Spark機(jī)制

加載spark.sql.catalog.xxx 具體執(zhí)行的配置

經(jīng)過上面GravitinoDriverPlugin的初始化之后,已經(jīng)將具體的catalog名稱和對應(yīng)的處理類映射起來,這里以GravitinoHiveCatalogSpark33為例。

GravitinoHiveCatalogSpark33這個類繼承關(guān)系是繼承了BaseCatalogBaseCatalog是Spark中定義的CatalogPlugin的一個實(shí)現(xiàn)類。

Spark在解析SQL的時候會查找catalog對應(yīng)的Catalog,可以看到調(diào)用了CatalogManager.catalog()方法

  private object CatalogAndMultipartIdentifier {def unapply(parts: Seq[String]): Some[(Option[CatalogPlugin], Seq[String])] = parts match {case Seq(_) =>Some((None, parts))case Seq(catalogName, tail @ _*) =>try {Some((Some(catalogManager.catalog(catalogName)), tail))} catch {case _: CatalogNotFoundException =>Some((None, parts))}}}

這個catalog方法調(diào)用了Catalogs.load()方法

  def catalog(name: String): CatalogPlugin = synchronized {if (name.equalsIgnoreCase(SESSION_CATALOG_NAME)) {v2SessionCatalog} else {catalogs.getOrElseUpdate(name, Catalogs.load(name, conf))}}

這個方法才是真正的加載方法,他真正根據(jù)conf配置將GravitinoHiveCatalogSpark33名稱根據(jù)定義的反射構(gòu)造函數(shù)實(shí)例化到內(nèi)存中

   def load(name: String, conf: SQLConf): CatalogPlugin = {val pluginClassName = try {val _pluginClassName = conf.getConfString(s"spark.sql.catalog.$name")// SPARK-39079 do configuration check first, otherwise some path-based table like// `org.apache.spark.sql.json`.`/path/json_file` may fail on analyze phaseif (name.contains(".")) {throw QueryExecutionErrors.invalidCatalogNameError(name)}_pluginClassName} catch {case _: NoSuchElementException =>throw QueryExecutionErrors.catalogPluginClassNotFoundError(name)}val loader = Utils.getContextOrSparkClassLoadertry {val pluginClass = loader.loadClass(pluginClassName)if (!classOf[CatalogPlugin].isAssignableFrom(pluginClass)) {throw QueryExecutionErrors.catalogPluginClassNotImplementedError(name, pluginClassName)}val plugin = pluginClass.getDeclaredConstructor().newInstance().asInstanceOf[CatalogPlugin]plugin.initialize(name, catalogOptions(name, conf))plugin} catch {// 省略}}

到這里流程就分析結(jié)束了

http://www.risenshineclean.com/news/39472.html

相關(guān)文章:

  • wordpress下拉篩選重慶做seo外包的
  • wordpress科技主題網(wǎng)站排名優(yōu)化公司
  • seo優(yōu)化排名推廣排名優(yōu)化系統(tǒng)
  • 網(wǎng)頁設(shè)計(jì)模板網(wǎng)站推薦外包網(wǎng)絡(luò)推廣公司
  • 上海網(wǎng)站開發(fā)公司外包自學(xué)seo能找到工作嗎
  • asp網(wǎng)站制作設(shè)計(jì)教程佛山網(wǎng)站優(yōu)化軟件
  • 海南省住房和城鄉(xiāng)建設(shè)廳網(wǎng)站首頁排名前50名免費(fèi)的網(wǎng)站
  • 網(wǎng)站建設(shè) 云計(jì)算搜索數(shù)據(jù)
  • wordpress企業(yè)網(wǎng)站制作鄭州seo優(yōu)化
  • 這幾年做啥網(wǎng)站致富推廣鏈接讓別人點(diǎn)擊
  • 門戶網(wǎng)站建設(shè)如何入賬銅陵seo
  • 美國十大購物網(wǎng)站免費(fèi)注冊個人網(wǎng)站不花錢
  • 長安東莞網(wǎng)站設(shè)計(jì)百度掃一掃識別圖片在線
  • logo設(shè)計(jì)培訓(xùn)寧波seo網(wǎng)絡(luò)推廣優(yōu)化價格
  • 網(wǎng)站網(wǎng)頁設(shè)計(jì)中怎么添加頁碼信息谷歌海外推廣
  • 網(wǎng)站方案策劃5118營銷大數(shù)據(jù)
  • wordpress屏蔽垃圾國外ip領(lǐng)碩網(wǎng)站seo優(yōu)化
  • 網(wǎng)站建設(shè)服務(wù)好公司排名google瀏覽器官網(wǎng)下載
  • 做公司網(wǎng)站計(jì)入什么會計(jì)科目seo用什么論壇引流
  • 網(wǎng)站實(shí)現(xiàn)seo基礎(chǔ)知識考試
  • 怎樣優(yōu)化網(wǎng)站排名靠前泰州百度關(guān)鍵詞優(yōu)化
  • 重慶市工程建設(shè)信息網(wǎng)2021優(yōu)化關(guān)鍵詞的公司
  • 哈爾濱地鐵愛建站seo查詢網(wǎng)站是什么
  • 企業(yè)網(wǎng)站優(yōu)化找哪家搜索排行
  • wordpress本地建站成人零基礎(chǔ)學(xué)電腦培訓(xùn)班
  • 瀚欽科技網(wǎng)站建設(shè)谷歌搜索引擎免費(fèi)
  • 北京建站設(shè)計(jì)寫一篇軟文1000字
  • 有沒有專門做航拍婚禮網(wǎng)站應(yīng)用下載app排行榜
  • wordpress動漫博客模板東莞seo靠譜
  • 網(wǎng)頁制作基礎(chǔ)教程第二版seo查詢 站長之家