資源簡介
記得自己要引入環境
(1)利用SparkStreaming從文件目錄讀入日志信息,日志內容包含:
”日志級別、函數名、日志內容“ 三個字段,字段之間以空格拆分。請看數據源的文件。
(2)對讀入都日志信息流進行指定篩選出日志級別為error或warn的,并輸出到外部MySQL中。
需要用到的函數
(1)輸入采用textFileStream()算子
(2)輸出采用foreachRDD()算子
(3)將RDD轉為DataFrame
(4)DataFrame注冊為臨時表,使用SQL過濾
(5)將過濾后的數據保存到MySQL

代碼片段和文件信息
from?pyspark.shell?import?sc
from?pyspark.sql?import?Row
from?pyspark.sql?import?SparkSession
from?pyspark.sql.types?import?*
from?pyspark.streaming?import?StreamingContext
spark?=?SparkSession.builder.appName(“Streaming“).getOrCreate()
sc=spark.sparkContext
#兩個參數:1、sc參數?2、采樣時間間隔(秒)
ssc?=StreamingContext(sc1)
#在ubuntu環境下數據源路勁
ds1?=ssc.textFileStream(“/home/zhuang/138/input/test“)
#把所有數據劃分為[[][]]格式
ds3?=?ds1.map(lambda?line:line.split(“\t“))
def?func(rdd):
????if?not?rdd.isEmpty():
????????#記得轉碼很重要
????????url?=?“jdbc:mysql://ip地址:3306/pyspark?user=root&password=zhuang&characterEncoding=UTF-8“
????????#構建表結構
????????schema?=?StructType([StructField(“日志級別“?StringType()?True)?StructField(“函數名“?StringType()?True)
?????????????????????????????StructField(“日志內容“?StringType()?True)])
????????#對[[][][]]數據轉換成[[[][][]][][][]]因為todf數據是數據格式傳值
????????rdd.map(lambda?x:tuple(x)).toDF(schema).registerTempTable(“test_person1“)
????????df1?=?spark.sql(“select?*?from?test_person1?where?‘日志級別‘!=‘[info]‘“)
????????#?df2?=?spark
????????df1.show()
????????#寫入mysql
????????df1.write.jdbc(mode=“overwrite“url=urltable=“test_person1“?properties={“driver“:‘com.mysql.jdbc.Driver‘})
????????df1.show()
ds3.pprint()
ds3.foreachRDD(func)
#?print(ds4.foreachRDD(func))
ssc.start();ssc.awaitTermination()
?屬性????????????大小?????日期????時間???名稱
-----------?---------??----------?-----??----
?????文件?????????501??2019-05-04?11:03??20180103.log
?????文件?????????501??2019-05-04?11:03??20180104.log
?????文件?????1007502??2019-03-12?14:38??mysql-connector-java-5.1.47.jar
?????文件????????1514??2019-05-30?08:58??test02.py
評論
共有 條評論