I. Shortcomings of Spark Streaming
1. Micro-batch based: latency is high, so it is not truly real-time.
2. DStream is built on RDDs and does not directly support SQL.
3. The stream and batch APIs are not unified at the application layer (streaming uses DStream, which is backed by RDDs; batch uses DataFrame/Dataset/RDD) -- see the sketch after this list.
4. No support for event time. (Stream processing generally involves two times: the time an event occurred and the time it is processed.)
5. Exactly-once semantics must be implemented by hand.
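To make points 2 and 3 concrete, here is a minimal word count in the older DStream API. This is only a sketch, assuming the same socket source used later in this post (bigdata01:9999); note that everything is expressed through RDD-style operators, with no DataFrame or SQL support at this layer.

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext("local[2]", "dstreamWordCount")
ssc = StreamingContext(sc, 5)  # micro-batch interval: 5 seconds

lines = ssc.socketTextStream("bigdata01", 9999)
counts = lines.flatMap(lambda line: line.split(" ")) \
    .map(lambda word: (word, 1)) \
    .reduceByKey(lambda a, b: a + b)
counts.pprint()  # print each batch's counts to the console

ssc.start()
ssc.awaitTermination()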
II. Introduction to Structured Streaming
1. Released in 2016 with Spark 2.0.
2. A scalable, fault-tolerant stream processing engine built on the Spark SQL engine.
3. Not a simple improvement on Spark Streaming, but a brand-new streaming engine developed from scratch.
Quasi-real-time: process a batch as each batch arrives. Real-time: process each record as it arrives. Offline: generally processes data at rest.
III. socket + console
1. Install nc in the virtual machine:
yum install -y nc
2. Start a listener:
nc -lk 9999
Example: wordcount
import os

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
import pyspark.sql.functions as F

if __name__ == '__main__':
    os.environ['JAVA_HOME'] = 'E:/java-configuration/jdk-8'
    # Hadoop path (the directory unpacked earlier)
    os.environ['HADOOP_HOME'] = 'E:/applications/bigdata_config/hadoop-3.3.1/hadoop-3.3.1'
    # Python interpreter for the executors
    os.environ['PYSPARK_PYTHON'] = 'C:/Users/35741/miniconda3/python.exe'
    # Python interpreter for the driver
    os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/Users/35741/miniconda3/python.exe'

    # Create a SparkSession
    spark = SparkSession.builder.appName("socketDemo").getOrCreate()

    socketDf = spark.readStream.format("socket") \
        .option("host", "bigdata01") \
        .option("port", 9999) \
        .load()

    # Approach 1: DSL
    splitDf = socketDf.select(explode(F.split(socketDf.value, " ")).alias("word"))
    resultDf1 = splitDf.groupBy("word").count()

    # Approach 2: SQL
    socketDf.createOrReplaceTempView("wordcount")
    resultDf2 = spark.sql("""
        with t1 as (
            select num from wordcount lateral view explode(split(value, " ")) c as num
        )
        select num, count(*) counts from t1 group by num
    """)

    # The following are the sinks; more on sinks later
    query1 = resultDf1.writeStream \
        .outputMode("complete") \
        .format("console") \
        .start()
    query2 = resultDf2.writeStream \
        .outputMode("complete") \
        .format("console") \
        .start()

    # Block until either query terminates
    spark.streams.awaitAnyTermination()
    spark.stop()
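A note on outputMode: with complete, the console sink reprints the full counts table on every batch. For aggregations, update is a lighter alternative that emits only the rows whose values changed in the current batch; append would be rejected here, since an aggregation without a watermark never finalizes its rows. A minimal variant of the first sink, assuming the same resultDf1 as above:

# update mode: print only the rows whose counts changed in this batch
query = resultDf1.writeStream \
    .outputMode("update") \
    .format("console") \
    .start()
query.awaitTermination()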
IV. file + console
Data in the files:
1;yuwen;43
1;shuxue;55
2;yuwen;77
2;shuxue;88
3;yuwen;98
3;shuxue;65
3;yingyu;88
import os

from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StringType, DoubleType, IntegerType, StructType

if __name__ == '__main__':
    os.environ['JAVA_HOME'] = 'E:/java-configuration/jdk-8'
    # Hadoop path (the directory unpacked earlier)
    os.environ['HADOOP_HOME'] = 'E:/applications/bigdata_config/hadoop-3.3.1/hadoop-3.3.1'
    # Python interpreter for the executors
    os.environ['PYSPARK_PYTHON'] = 'C:/Users/35741/miniconda3/python.exe'
    # Python interpreter for the driver
    os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/Users/35741/miniconda3/python.exe'

    # Create a SparkSession
    spark = SparkSession.builder.appName("fileDemo").getOrCreate()

    # Either style works for declaring the schema:
    # score_schema = StructType([
    #     StructField(name="stu_id", dataType=IntegerType(), nullable=False),
    #     StructField(name="subject_name", dataType=StringType(), nullable=True),
    #     StructField(name="score", dataType=DoubleType(), nullable=True)
    # ])
    score_schema = StructType() \
        .add("stu_id", IntegerType()) \
        .add("subject_name", StringType()) \
        .add("score", DoubleType())

    fileDf = spark.readStream.format("csv") \
        .option("sep", ";") \
        .schema(score_schema) \
        .load("../../resources/input1")

    fileDf.writeStream \
        .outputMode("append") \
        .format("console") \
        .option("truncate", False) \
        .start() \
        .awaitTermination()

    spark.stop()
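The file source watches the load directory and reads newly arrived files in each micro-batch (the maxFilesPerTrigger option caps how many files per batch). Because the sink above uses append mode, each batch prints only the newly read rows. To aggregate across everything seen so far, switch to complete mode; a minimal sketch, assuming the fileDf and schema defined above:

import pyspark.sql.functions as F

# Average score per subject, recomputed over all files seen so far
avgDf = fileDf.groupBy("subject_name") \
    .agg(F.avg("score").alias("avg_score"))

avgDf.writeStream \
    .outputMode("complete") \
    .format("console") \
    .option("truncate", False) \
    .start() \
    .awaitTermination()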