Design a PySpark program driven by a configuration file of multiple labelled SparkSQL templates and multiple parameter sets. Based on the input parameters, it should automatically batch-export the query results as Parquet, CSV and Excel files to S3. The label plus the parameters (joined with "_") form the exported file name, and existing files with the same name are overwritten.
The code is as follows:
import argparse
import json

from pyspark.sql import SparkSession


def load_config(config_path):
    with open(config_path, 'r') as f:
        return json.load(f)


def main(config_path, base_s3_path):
    # Initialise the SparkSession with S3 and Excel support
    spark = SparkSession.builder \
        .appName("DataExportJob") \
        .config("spark.jars.packages",
                "com.crealytics:spark-excel_2.12:0.13.7,org.apache.hadoop:hadoop-aws:3.3.1") \
        .getOrCreate()

    # Configure S3 access (adjust for your environment)
    hadoop_conf = spark.sparkContext._jsc.hadoopConfiguration()
    hadoop_conf.set("fs.s3a.access.key", "YOUR_ACCESS_KEY")
    hadoop_conf.set("fs.s3a.secret.key", "YOUR_SECRET_KEY")
    hadoop_conf.set("fs.s3a.endpoint", "s3.amazonaws.com")

    config = load_config(config_path)

    for template in config['templates']:
        label = template['label']
        sql_template = template['sql_template']
        parameters_list = template['parameters']

        for params in parameters_list:
            # Validate that the parameter count matches the template placeholders
            placeholders = sql_template.count('{')
            if len(params) != placeholders:
                raise ValueError(
                    f"Parameter count mismatch: the template expects {placeholders} "
                    f"parameters but {len(params)} were provided"
                )

            # Fill the placeholders in the SQL template
            formatted_sql = sql_template.format(*params)
            df = spark.sql(formatted_sql)

            # Build the file name from the label and the parameters
            param_str = "_".join(params)
            base_filename = f"{label}_{param_str}"

            # Output paths per format
            output_paths = {
                'parquet': f"{base_s3_path}/parquet/{base_filename}",
                'csv': f"{base_s3_path}/csv/{base_filename}",
                'excel': f"{base_s3_path}/excel/{base_filename}.xlsx",
            }

            # Write Parquet (overwrite any existing output)
            df.write.mode('overwrite').parquet(output_paths['parquet'])

            # Write CSV with a header row
            df.write.mode('overwrite') \
                .option("header", "true") \
                .csv(output_paths['csv'])

            # Write Excel via the spark-excel package
            df.write.format("com.crealytics.spark.excel") \
                .option("header", "true") \
                .mode("overwrite") \
                .save(output_paths['excel'])

    spark.stop()


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument('--config', type=str, required=True,
                        help='Path to config JSON file')
    parser.add_argument('--s3-path', type=str, required=True,
                        help='Base S3 path (e.g., s3a://your-bucket/data)')
    args = parser.parse_args()
    main(args.config, args.s3_path)
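Note that the SQL templates reference tables such as sales and activity, so those names must already be resolvable in the Spark catalog (Hive metastore or temporary views) before spark.sql() runs. A minimal sketch, assuming the source data are Parquet datasets under a hypothetical S3 prefix (the paths are placeholders, not part of the program above), would register temp views right after the session is created:

# Sketch only: register the tables the templates expect.
# The base_path and dataset layout are assumptions, not part of the original script.
def register_source_views(spark, base_path="s3a://your-bucket/raw"):
    # Expose each dataset under the name used in the SQL templates
    spark.read.parquet(f"{base_path}/sales").createOrReplaceTempView("sales")
    spark.read.parquet(f"{base_path}/activity").createOrReplaceTempView("activity")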
Example configuration file (config.json)
{"templates": [{"label": "sales_report","sql_template": "SELECT * FROM sales WHERE date = '{0}' AND region = '{1}'","parameters": [["202301", "north"],["202302", "south"]]},{"label": "user_activity","sql_template": "SELECT user_id, COUNT(*) AS cnt FROM activity WHERE day = '{0}' GROUP BY user_id","parameters": [["2023-01-01"],["2023-01-02"]]}]
}
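For the first sales_report parameter set, the program fills the placeholders with str.format and derives the output file name like this (purely illustrative, using values from the config above):

# Illustrative expansion of the first sales_report parameter set
sql_template = "SELECT * FROM sales WHERE date = '{0}' AND region = '{1}'"
params = ["202301", "north"]

formatted_sql = sql_template.format(*params)
# -> SELECT * FROM sales WHERE date = '202301' AND region = 'north'

base_filename = "sales_report" + "_" + "_".join(params)
# -> sales_report_202301_north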
Usage notes
- Dependency management:
  - Make sure the Spark cluster has the Hadoop AWS and spark-excel dependencies available, e.g.:
    spark-submit --packages com.crealytics:spark-excel_2.12:0.13.7,org.apache.hadoop:hadoop-aws:3.3.1 your_script.py
- S3 configuration:
  - Replace YOUR_ACCESS_KEY and YOUR_SECRET_KEY in the code with real AWS credentials (see the credential sketch after this list for an alternative).
  - Adjust the endpoint for S3-compatible storage (e.g., MinIO needs its own endpoint configuration).
- Run the job:
  spark-submit --packages com.crealytics:spark-excel_2.12:0.13.7,org.apache.hadoop:hadoop-aws:3.3.1 \
      data_export.py --config config.json --s3-path s3a://your-bucket/exports
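Hard-coding access keys is only suitable for quick tests. A common alternative (an assumption about the deployment, not part of the script above) is to let hadoop-aws resolve credentials from the environment or an instance profile, set via standard fs.s3a properties when building the session:

from pyspark.sql import SparkSession

# Sketch: rely on the default AWS credential chain instead of hard-coded keys.
# The property names are standard fs.s3a settings; the provider choice is an
# assumption about the deployment, not part of the original code.
spark = (
    SparkSession.builder
    .appName("DataExportJob")
    .config("spark.hadoop.fs.s3a.aws.credentials.provider",
            "com.amazonaws.auth.DefaultAWSCredentialsProviderChain")
    .config("spark.hadoop.fs.s3a.endpoint", "s3.amazonaws.com")
    .getOrCreate()
)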
Output structure
s3a://your-bucket/exports
├── parquet
│ ├── sales_report_202301_north
│ ├── sales_report_202302_south
│ └── user_activity_2023-01-01
├── csv
│ ├── sales_report_202301_north
│ ├── sales_report_202302_south
│ └── user_activity_2023-01-01
└── excel
    ├── sales_report_202301_north.xlsx
    ├── sales_report_202302_south.xlsx
    └── user_activity_2023-01-01.xlsx
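Keep in mind that Spark writes the Parquet and CSV outputs as directories containing part files; only the spark-excel writer produces a single .xlsx. If a single CSV file per export is required, one possible variation on the write step in the main loop (not part of the original script, and only advisable for exports small enough to pass through one partition) is:

# Optional variation: collapse to one partition so the CSV directory
# contains a single part file. df and output_paths come from the main loop above.
df.coalesce(1).write.mode('overwrite') \
    .option("header", "true") \
    .csv(output_paths['csv'])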