中文亚洲精品无码_熟女乱子伦免费_人人超碰人人爱国产_亚洲熟妇女综合网

當前位置: 首頁 > news >正文

網站企業(yè)備案代理短視頻拍攝剪輯培訓班

網站企業(yè)備案代理,短視頻拍攝剪輯培訓班,谷歌云做網站,網頁設計中滾動圖片設置目錄 一、輸入Source 1)代碼演示最普通的文件讀取方式: 2) 通過jdbc讀取數(shù)據(jù)庫數(shù)據(jù) 3) 讀取table中的數(shù)據(jù)【hive】 二、輸出Sink 實戰(zhàn)一:保存普通格式 實戰(zhàn)二:保存到數(shù)據(jù)庫中 實戰(zhàn)三:將結果保存在h…

目錄

一、輸入Source

1)代碼演示最普通的文件讀取方式:

2) 通過jdbc讀取數(shù)據(jù)庫數(shù)據(jù)

3) 讀取table中的數(shù)據(jù)【hive】

二、輸出Sink

實戰(zhàn)一:保存普通格式

實戰(zhàn)二:保存到數(shù)據(jù)庫中

實戰(zhàn)三:將結果保存在hive表中

三、總結


????????在大數(shù)據(jù)處理領域,SparkSQL 以其強大的數(shù)據(jù)處理能力和豐富的數(shù)據(jù)源支持備受青睞。它能夠高效地讀取和寫入多種格式的數(shù)據(jù),無論是本地文件、分布式文件系統(tǒng)(如 HDFS)上的數(shù)據(jù),還是數(shù)據(jù)庫、Hive 表中的數(shù)據(jù),都能輕松駕馭。今天,就讓我們深入探究 SparkSQL 讀寫數(shù)據(jù)的方式,通過詳細的代碼示例和原理講解,助你全面掌握這一關鍵技能。

?

一、輸入Source

?

  • 類型:text / csv【任意固定分隔符】 / json / orc / parquet / jdbc / table【Hive表】
  • 語法:spark.read.format(格式).load(讀取的地址)

方式一:給定讀取數(shù)據(jù)源的類型和地址

spark.read.format("json").load(path)
spark.read.format("csv").load(path)
spark.read.format("parquet").load(path)

方式二:直接調用對應數(shù)據(jù)源類型的方法

spark.read.json(path)
spark.read.csv(path)
spark.read.parquet(path)

特殊參數(shù):option,用于指定讀取時的一些配置選項

spark.read.format("csv").option("sep", "\t").load(path)jdbcDF = spark.read \.format("jdbc") \.option("url", "jdbc:postgresql:dbserver") \.option("dbtable", "schema.tablename") \.option("user", "username") \.option("password", "password") \.load()

?

1)代碼演示最普通的文件讀取方式:

from pyspark.sql import SparkSession
import osif __name__ == '__main__':# 構建環(huán)境變量# 配置環(huán)境os.environ['JAVA_HOME'] = 'D:/Program Files/Java/jdk1.8.0_271'# 配置Hadoop的路徑,就是前面解壓的那個路徑os.environ['HADOOP_HOME'] = 'D:/hadoop-3.3.1/hadoop-3.3.1'# 配置base環(huán)境Python解析器的路徑os.environ['PYSPARK_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'  # 配置base環(huán)境Python解析器的路徑os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'# 獲取sparkSession對象spark = SparkSession.builder.master("local[2]").appName("第一次構建SparkSession").config("spark.sql.shuffle.partitions", 2).getOrCreate()df01 = spark.read.json("../../datas/resources/people.json")df01.printSchema()df02 = spark.read.format("json").load("../../datas/resources/people.json")df02.printSchema()df03 = spark.read.parquet("../../datas/resources/users.parquet")df03.printSchema()#spark.read.orc("")df04 = spark.read.format("orc").load("../../datas/resources/users.orc")df04.printSchema()df05 = spark.read.format("csv").option("sep",";").load("../../datas/resources/people.csv")df05.printSchema()df06 = spark.read.load(path="../../datas/resources/people.csv",format="csv",sep=";")df06.printSchema()spark.stop()

?

2) 通過jdbc讀取數(shù)據(jù)庫數(shù)據(jù)

先在本地數(shù)據(jù)庫或者linux數(shù)據(jù)庫中插入一張表:

CREATE TABLE `emp`  (`empno` int(11) NULL DEFAULT NULL,`ename` varchar(50) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,`job` varchar(50) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,`mgr` int(11) NULL DEFAULT NULL,`hiredate` date NULL DEFAULT NULL,`sal` decimal(7, 2) NULL DEFAULT NULL,`comm` decimal(7, 2) NULL DEFAULT NULL,`deptno` int(11) NULL DEFAULT NULL
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;-- ----------------------------
-- Records of emp
-- ----------------------------
INSERT INTO `emp` VALUES (7369, 'SMITH', 'CLERK', 7902, '1980-12-17', 800.00, NULL, 20);
INSERT INTO `emp` VALUES (7499, 'ALLEN', 'SALESMAN', 7698, '1981-02-20', 1600.00, 300.00, 30);
INSERT INTO `emp` VALUES (7521, 'WARD', 'SALESMAN', 7698, '1981-02-22', 1250.00, 500.00, 30);
INSERT INTO `emp` VALUES (7566, 'JONES', 'MANAGER', 7839, '1981-04-02', 2975.00, NULL, 20);
INSERT INTO `emp` VALUES (7654, 'MARTIN', 'SALESMAN', 7698, '1981-09-28', 1250.00, 1400.00, 30);
INSERT INTO `emp` VALUES (7698, 'BLAKE', 'MANAGER', 7839, '1981-05-01', 2850.00, NULL, 30);
INSERT INTO `emp` VALUES (7782, 'CLARK', 'MANAGER', 7839, '1981-06-09', 2450.00, NULL, 10);
INSERT INTO `emp` VALUES (7788, 'SCOTT', 'ANALYST', 7566, '1987-04-19', 3000.00, NULL, 20);
INSERT INTO `emp` VALUES (7839, 'KING', 'PRESIDENT', NULL, '1981-11-17', 5000.00, NULL, 10);
INSERT INTO `emp` VALUES (7844, 'TURNER', 'SALESMAN', 7698, '1981-09-08', 1500.00, 0.00, 30);
INSERT INTO `emp` VALUES (7876, 'ADAMS', 'CLERK', 7788, '1987-05-23', 1100.00, NULL, 20);
INSERT INTO `emp` VALUES (7900, 'JAMES', 'CLERK', 7698, '1981-12-03', 950.00, NULL, 30);
INSERT INTO `emp` VALUES (7902, 'FORD', 'ANALYST', 7566, '1981-12-03', 3000.00, NULL, 20);
INSERT INTO `emp` VALUES (7934, 'MILLER', 'CLERK', 7782, '1982-01-23', 1300.00, NULL, 10);

dept的數(shù)據(jù):

CREATE TABLE `dept`  (`deptno` int(11) NULL DEFAULT NULL,`dname` varchar(14) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,`loc` varchar(13) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;-- ----------------------------
-- Records of dept
-- ----------------------------
INSERT INTO `dept` VALUES (10, 'ACCOUNTING', 'NEW YORK');
INSERT INTO `dept` VALUES (20, 'RESEARCH', 'DALLAS');
INSERT INTO `dept` VALUES (30, 'SALES', 'CHICAGO');
INSERT INTO `dept` VALUES (40, 'OPERATIONS', 'BOSTON');

接著放驅動程序:

py4j.protocol.Py4JJavaError: An error occurred while calling o67.load.
: java.lang.ClassNotFoundException: com.mysql.cj.jdbc.Driverat java.net.URLClassLoader.findClass(URLClassLoader.java:382)at java.lang.ClassLoader.loadClass(ClassLoader.java:418)at java.lang.ClassLoader.loadClass(ClassLoader.java:351)at org.apache.spark.sql.execution.datasources.jdbc.DriverRegistry$.register(DriverRegistry.scala:46)at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.$anonfun$driverClass$1(JDBCOptions.scala:102)at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.$anonfun$driverClass$1$adapted(JDBCOptions.scala:102)

Python環(huán)境放入MySQL連接驅動

  • 找到工程中pyspark庫包所在的環(huán)境,將驅動包放入環(huán)境所在的jars目錄中
  • 如果是Linux上:注意集群模式所有節(jié)點都要放。

第一種情況:

假如你是windows環(huán)境:

最終的路徑是在這里:

第二種情況:linux環(huán)境下,按照如下方式進行

# 進入目錄
cd /opt/installs/anaconda3/lib/python3.8/site-packages/pyspark/jars# 上傳jar包:mysql-connector-java-5.1.32.jar

代碼練習:

import osfrom pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, LongTypeif __name__ == '__main__':# 獲取sparkSession對象# 設置 任務的環(huán)境變量os.environ['JAVA_HOME'] = r'C:\Program Files\Java\jdk1.8.0_77'# 配置Hadoop的路徑,就是前面解壓的那個路徑os.environ['HADOOP_HOME'] = 'D:/hadoop-3.3.1/hadoop-3.3.1'# 配置base環(huán)境Python解析器的路徑os.environ['PYSPARK_PYTHON'] = r'C:\ProgramData\Miniconda3\python.exe'  # 配置base環(huán)境Python解析器的路徑os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'# 得到sparkSession對象spark = SparkSession.builder.master("local[2]").appName("").config("spark.sql.shuffle.partitions", 2).getOrCreate()# 處理邏輯# 讀取json 數(shù)據(jù)df1 = spark.read.format("json").load("../../datas/sql/person.json")df1.show()# 另一種寫法,推薦使用這一種df2 = spark.read.json("../../datas/sql/person.json")df2.show()df3 = spark.read.csv("../../datas/dept.csv")df4 = spark.read.format("csv").load("../../datas/dept.csv")# 讀取分隔符為別的分隔符的文件user_schema = StructType([StructField(name="emp_id", dataType=StringType(), nullable=False),StructField(name="emp_name", dataType=StringType(), nullable=True),StructField(name="salary", dataType=DoubleType(), nullable=True),StructField(name="comm", dataType=DoubleType(), nullable=True),StructField(name="dept_id", dataType=LongType(), nullable=True)])# 使用csv 讀取了一個 \t 為分隔符的文件,讀取的數(shù)據(jù)字段名很隨意,所以可以自定義df5 = spark.read.format("csv").option("sep","\t").load("../../datas/emp.tsv",schema=user_schema)df5.show()# 昨天的作業(yè)是否也可以有另一個寫法movie_schema = StructType([StructField(name="movie_id", dataType=LongType(), nullable=False),StructField(name="movie_name", dataType=StringType(), nullable=True),StructField(name="movie_type", dataType=StringType(), nullable=True)])movieDF = spark.read.format("csv").option("sep","::").load("../../datas/zuoye/movies.dat",schema=movie_schema)movieDF.show()spark.read.load(path="../../datas/zuoye/movies.dat",format="csv",sep="::",schema=movie_schema).show()dict = {"user":"root","password":"root"}jdbcDf = spark.read.jdbc(url="jdbc:mysql://localhost:3306/spark",table="emp",properties=dict)jdbcDf.show()# jdbc的另一種寫法jdbcDf2 = spark.read.format("jdbc") \.option("driver", "com.mysql.cj.jdbc.Driver") \.option("url", "jdbc:mysql://localhost:3306/spark") \.option("dbtable", "spark.dept") \.option("user", "root") \.option("password", "root").load()jdbcDf2.show()# 讀取hive表中的數(shù)據(jù)# 關閉spark.stop()

?

3) 讀取table中的數(shù)據(jù)【hive】

海量數(shù)據(jù),如何處理,存儲在hdfs上

第一種:

使用spark讀取hdfs上的數(shù)據(jù)(可以使用sparkCore讀取,也可以使用sparksql讀取),將數(shù)據(jù)變?yōu)楸怼緮?shù)據(jù)+Schema】,然后編寫sql或者sparkCore代碼。

rdd --> dataFrame

第二種:推薦

將hdfs上的數(shù)據(jù)映射成hive的表,然后通過sparkSql連接hive, 編寫 sql 處理需求。

  • 場景:Hive底層默認是MR引擎,計算性能特別差,一般用Hive作為數(shù)據(jù)倉庫,使用SparkSQL對Hive中的數(shù)據(jù)進行計算
    • 存儲:數(shù)據(jù)倉庫:Hive:將HDFS文件映射成表
    • 計算:計算引擎:SparkSQL、Impala、Presto:對Hive中的數(shù)據(jù)表進行處理
  • 問題:SparkSQL怎么能訪問到Hive中有哪些表,以及如何知道Hive中表對應的HDFS的地址?

Hive中的表存在哪里?元數(shù)據(jù)--MySQL , 啟動metastore服務即可。

本質上:SparkSQL訪問了Metastore服務獲取了Hive元數(shù)據(jù),基于元數(shù)據(jù)提供的地址進行計算

先退出base環(huán)境:conda deactivate
啟動服務:
啟動hdfs:  start-dfs.sh  因為hive的數(shù)據(jù)在那里存儲著
啟動yarn:  start-yarn.sh 因為spark是根據(jù)yarn部署的,假如你的spark是standalone模式,不需要啟動yarn.
日志服務也需要啟動一下:
mapred --daemon start historyserver
# 啟動Spark的HistoryServer:18080
/opt/installs/spark/sbin/start-history-server.sh
啟動metastore服務: 因為sparkSQL需要知道表結構,和表數(shù)據(jù)的位置
hive-server-manager.sh start metastore
啟動spark服務: 啥服務也沒有了,已經啟動完了。
查看metastore服務:
hive-server-manager.sh status metastore

修改配置:

cd /opt/installs/spark/conf
新增:hive-site.xml
vi hive-site.xml在這個文件中,編寫如下配置:
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration><property><name>hive.metastore.uris</name><value>thrift://bigdata01:9083</value></property>
</configuration>接著將該文件進行分發(fā):
xsync.sh hive-site.xml

操作sparkSQL:

/opt/installs/spark/bin/pyspark --master local[2] --conf spark.sql.shuffle.partitions=2

此處的pyspark更像是一個客戶端,里面可以通過python編寫spark代碼而已。而我們以前安裝的pyspark更像是spark的python運行環(huán)境。

進入后,通過內置對象spark:

>>> spark.sql("show databases").show()
+---------+
|namespace|
+---------+
|  default|
|     yhdb|
+---------+>>> spark.sql("select * from yhdb.student").show()
+---+------+                                                                    
|sid| sname|
+---+------+
|  1|laoyan|
|  1|廉德楓|
|  2|  劉浩|
|  3|  王鑫|
|  4|  司翔|
+---+------+

開發(fā)環(huán)境如何編寫代碼,操作hive:

Pycharm工具集成Hive開發(fā)SparkSQL,必須申明Metastore的地址和啟用Hive的支持

spark = SparkSession \.builder \.appName("HiveAPP") \.master("local[2]") \.config("spark.sql.warehouse.dir", 'hdfs://bigdata01:9820/user/hive/warehouse') \.config('hive.metastore.uris', 'thrift://bigdata01:9083') \.config("spark.sql.shuffle.partitions", 2) \.enableHiveSupport()\.getOrCreate()

代碼實戰(zhàn):

from pyspark.sql import SparkSession
import osif __name__ == '__main__':# 構建環(huán)境變量# 配置環(huán)境os.environ['JAVA_HOME'] = 'D:/Program Files/Java/jdk1.8.0_271'# 配置Hadoop的路徑,就是前面解壓的那個路徑os.environ['HADOOP_HOME'] = 'D:/hadoop-3.3.1/hadoop-3.3.1'# 配置base環(huán)境Python解析器的路徑os.environ['PYSPARK_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'  # 配置base環(huán)境Python解析器的路徑os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'# 防止在本地操作hdfs的時候,出現(xiàn)權限問題os.environ['HADOOP_USER_NAME'] = 'root'# 獲取sparkSession對象spark = SparkSession \.builder \.appName("HiveAPP") \.master("local[2]") \.config("spark.sql.warehouse.dir", 'hdfs://bigdata01:9820/user/hive/warehouse') \.config('hive.metastore.uris', 'thrift://bigdata01:9083') \.config("spark.sql.shuffle.partitions", 2) \.enableHiveSupport() \.getOrCreate()spark.sql("select * from yhdb.student").show()spark.stop()

代碼還可以這樣寫:

方式二:加載Hive表的數(shù)據(jù)變成DF,可以調用DSL或者SQL的方式來實現(xiàn)計算

# 讀取Hive表構建DataFrame

hiveData = spark.read.table("yhdb.student")

hiveData.printSchema()

hiveData.show()

# 讀取hive表中的數(shù)據(jù)spark2 = SparkSession \.builder \.appName("HiveAPP") \.master("local[2]") \.config("spark.sql.warehouse.dir", 'hdfs://192.168.233.128:9820/user/hive/warehouse') \.config('hive.metastore.uris', 'thrift://192.168.233.128:9083') \.config("spark.sql.shuffle.partitions", 2) \.enableHiveSupport() \.getOrCreate()#spark2.sql("show databases").show()#spark2.sql("show  tables").show()#spark2.sql("select * from yhdb.t_user").show()spark2.read.table("t_user2").show()

不要在一個python 文件中,創(chuàng)建兩個不同的sparkSession對象,否則對于sparksql獲取hive的元數(shù)據(jù),有影響。另外,記得添加一個權限校驗的語句:

# 防止在本地操作hdfs的時候,出現(xiàn)權限問題
os.environ['HADOOP_USER_NAME'] = 'root'

為什么有些平臺不支持,不兼容 sqoop flume datax 這些工具呢?

spark 可以讀取日志數(shù)據(jù)

spark 可以讀取數(shù)據(jù)庫數(shù)據(jù)

spark 可以讀取 hdfs 數(shù)據(jù)

spark 可以讀取 hive 數(shù)據(jù)

------------------------------------

spark 可以讀取日志數(shù)據(jù),形成一個 A 表,讀取 mysql 數(shù)據(jù),形成一個 B 表

A 表和 B 表還可以相互關聯(lián),此時也就不需要 sqoop、flume、datax 去導入導出了。

spark 還可以將統(tǒng)計出來的結果直接放入 mysql 或者直接放入 hive

--------------------

我們后面學習的內容還是沿用 將日志數(shù)據(jù),數(shù)據(jù)庫數(shù)據(jù)等所有數(shù)據(jù)抽取到 hive ,然后呢,使用 spark 去統(tǒng)計,統(tǒng)計完之后還是放入 hive ,使用 datax 等工具將結果導出 mysql。

?

二、輸出Sink

?

sink --> 下沉 --> 落盤 --> 保存起來

如果輸出路徑或者表已經存在了怎么辦

  • 類型:text /csv【所有具有固定分隔符的文件】/ json/ orc/ parquet / jdbc / table【Hive表】
  • 語法:DataFrame.write.format(保存的類型).save(保存到哪)
    • 方法:save-保存到文件save(path)或者數(shù)據(jù)庫表save()中,saveAsTable-用于保存到Hive表

方式一:給定輸出數(shù)據(jù)源的類型和地址

df.write.format("json").save(path)
df.write.format("csv").save(path)
df.write.format("parquet").save(path)

方式二:直接調用對應數(shù)據(jù)源類型的方法

df.write.json(path)
df.write.csv(path)
df.write.parquet(path)

特殊參數(shù):option,用于指定輸出時的一些配置選項

df.write \
.format("jdbc") \
.option("url", "jdbc:postgresql:dbserver") \
.option("dbtable", "schema.tablename") \
.option("user", "username") \
.option("password", "password") \
.save()

輸出模式:Save Mode

append: 追加模式,當數(shù)據(jù)存在時,繼續(xù)追加
overwrite: 覆寫模式,當數(shù)據(jù)存在時,覆寫以前數(shù)據(jù),存儲當前最新數(shù)據(jù);
error/errorifexists: 如果目標存在就報錯,默認的模式
ignore: 忽略,數(shù)據(jù)存在時不做任何操作

代碼如何編寫:

df.write.mode(saveMode="append").format("csv").save(path)

?

實戰(zhàn)一:保存普通格式

import osfrom pyspark.sql import SparkSessionif __name__ == '__main__':# 配置環(huán)境os.environ['JAVA_HOME'] = 'C:/Program Files/Java/jdk1.8.0_241'# 配置Hadoop的路徑,就是前面解壓的那個路徑os.environ['HADOOP_HOME'] = 'D:/hadoop-3.3.1'# 配置base環(huán)境Python解析器的路徑os.environ['PYSPARK_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'  # 配置base環(huán)境Python解析器的路徑os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'spark = SparkSession.builder.master("local[2]").appName("").config("spark.sql.shuffle.partitions", 2).getOrCreate()df = spark.read.json("../../datas/person.json")# 獲取年齡最大的人的名字df.createOrReplaceTempView("persons")rsDf = spark.sql("""select name,age from persons where age = (select max(age) from persons)""")# 將結果打印到控制臺#rsDf.write.format("console").save()#rsDf.write.json("../../datas/result",mode="overwrite")#rsDf.write.mode(saveMode='overwrite').format("json").save("../../datas/result")#rsDf.write.mode(saveMode='overwrite').format("csv").save("../../datas/result1")#rsDf.write.mode(saveMode='overwrite').format("parquet").save("../../datas/result2")#rsDf.write.mode(saveMode='append').format("csv").save("../../datas/result1")# text 保存路徑為hdfs 直接報錯,不支持#rsDf.write.mode(saveMode='overwrite').text("hdfs://bigdata01:9820/result")#rsDf.write.orc("hdfs://bigdata01:9820/result",mode="overwrite")rsDf.write.parquet("hdfs://bigdata01:9820/result", mode="overwrite")spark.stop()
假如:
spark.sql("select concat(name,' ',age) from person").write.text("hdfs://bigdata01:9820/spark/result")
直接報錯:假如你的輸出類型是text類型,直接報錯
pyspark.sql.utils.AnalysisException: Text data source does not support bigint data type.
假如修改為parquet等類型,是可以直接保存的:
rsDf.write.parquet("hdfs://bigdata01:9820/result")                                                                

?

實戰(zhàn)二:保存到數(shù)據(jù)庫中

import osfrom pyspark.sql import SparkSessionif __name__ == '__main__':# 配置環(huán)境os.environ['JAVA_HOME'] = 'C:/Program Files/Java/jdk1.8.0_241'# 配置Hadoop的路徑,就是前面解壓的那個路徑os.environ['HADOOP_HOME'] = 'D:/hadoop-3.3.1'# 配置base環(huán)境Python解析器的路徑os.environ['PYSPARK_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'  # 配置base環(huán)境Python解析器的路徑os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'spark = SparkSession.builder.master("local[2]").appName("").config("spark.sql.shuffle.partitions", 2).getOrCreate()df = spark.read.format("csv").option("sep","\t").load("../../datas/zuoye/emp.tsv").toDF("id","name","sal","comm","deptno")# 獲取年齡最大的人的名字df.createOrReplaceTempView("emps")rsDf = spark.sql("""select * from emps where comm is not null""")# 不需要事先將表創(chuàng)建好,它可以幫助我們創(chuàng)建rsDf.write.format("jdbc") \.option("driver", "com.mysql.cj.jdbc.Driver") \.option("url", "jdbc:mysql://localhost:3306/spark?characterEncoding=UTF-8") \.option("user","root") \.option("password", "123456") \.option("dbtable", "emp1") \.save(mode="overwrite")spark.stop()

?

實戰(zhàn)三:將結果保存在hive表中

import osfrom pyspark.sql import SparkSessionif __name__ == '__main__':# 配置環(huán)境os.environ['JAVA_HOME'] = 'C:/Program Files/Java/jdk1.8.0_241'# 配置Hadoop的路徑,就是前面解壓的那個路徑os.environ['HADOOP_HOME'] = 'D:/hadoop-3.3.1'# 配置base環(huán)境Python解析器的路徑os.environ['PYSPARK_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'  # 配置base環(huán)境Python解析器的路徑os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'os.environ['HADOOP_USER_NAME'] = 'root'spark = SparkSession \.builder \.appName("測試本地連接hive") \.master("local[2]") \.config("spark.sql.warehouse.dir", 'hdfs://bigdata01:9820/user/hive/warehouse') \.config('hive.metastore.uris', 'thrift://bigdata01:9083') \.config("spark.sql.shuffle.partitions", 2) \.enableHiveSupport() \.getOrCreate()df = spark.read.format("csv").option("sep", "\t").load("../../datas/zuoye/emp.tsv").toDF("id", "name", "sal","comm", "deptno")# 獲取年齡最大的人的名字df.createOrReplaceTempView("emps")rsDf = spark.sql("""select * from emps where comm is not null""")rsDf.write.saveAsTable("yhdb03.emp")spark.stop()

三、總結

????????SparkSQL 讀寫數(shù)據(jù)功能豐富強大,涵蓋多種數(shù)據(jù)源與格式,理解其原理、語法和操作細節(jié),結合不同業(yè)務場景(如數(shù)據(jù)分析、數(shù)據(jù)遷移、數(shù)據(jù)存儲優(yōu)化等)靈活運用,能極大提升大數(shù)據(jù)處理效率,助力在大數(shù)據(jù)領域深挖數(shù)據(jù)價值、攻克業(yè)務難題,為數(shù)據(jù)驅動決策筑牢根基。后續(xù)實踐中,多嘗試不同數(shù)據(jù)、場景組合,深化掌握程度。

http://www.risenshineclean.com/news/42743.html

相關文章:

  • 溫州網站制作多少錢谷歌google 官網下載
  • 手機html5網站源碼廣告投放的方式有哪些
  • 深圳網站建設培訓班深圳最新通告今天
  • 技術支持:淄博網站建設優(yōu)化設計三年級上冊語文答案
  • 山東省建設工程招標中心網站當日網站收錄查詢統(tǒng)計
  • 網站建設需求分析寫什么茶葉seo網站推廣與優(yōu)化方案
  • 網站程序組成seo搜狗排名點擊
  • 辛集seo網站優(yōu)化電話靠譜的免費建站
  • 建立手機個人網站營銷網站建設制作
  • 視頻資源的網站怎么做站長資訊
  • 網站建設課程設計內容淘寶店鋪轉讓價格表
  • wordpress評論框文件采集站seo課程
  • 自己做網站外包百度熱搜高考大數(shù)據(jù)
  • 企業(yè)做網站需要什么軟件百度品牌廣告收費標準
  • 網站制作預付款會計分錄小程序運營推廣公司
  • 大慶網站制作營銷策劃方案包括哪些內容
  • 在百度做網站多少錢網站推廣營銷
  • 網站站內鏈接濰坊住房公積金管理中心
  • 設計網頁推薦萬秀服務不錯的seo推廣
  • 網站的功能和作用百度seo排名帝搜軟件
  • 寶雞哪有有做網站的專業(yè)網絡推廣公司
  • 網站開發(fā)的工作總結google搜索優(yōu)化方法
  • 怎么做賭博網站代理承德seo
  • html做網站實戰(zhàn)教程軟文寫作經驗是什么
  • 余姚物流做網站鄭州本地seo顧問
  • 醫(yī)院網站建設 費用百度認證官網
  • 四川建設廳官方網站四庫一平臺網絡運營商
  • 建設銀行手機查詢網站合肥seo優(yōu)化排名公司
  • 在北京大學生做家教的網站關鍵詞調詞平臺哪個好
  • 東莞電商頁面設計公司福州短視頻seo機會