說到流處理,Spark為我們提供了窗口函數,允許在滑動數據窗口上應用轉換,常用場景如每五分鐘商場人流密度、每分鐘流量等等,接下來我們通過畫圖來了解Spark Streaming的窗口函數如何工作的,處理過程圖如下所示:
上圖中綠色的小框框是一批一批的數據流,虛線框和實線框分別是前一個窗口和后一個窗口,從圖中可以看出后一個窗口在前一個窗口基礎上移動了兩個批次的數據流,而我們真正通過算子操作的數據其實就是窗口內所有的數據流。
在代碼實現前了解下窗口操作常用的函數有:
- window
- countByWindow
- reduceByWindow
- reduceByKeyAndWindow
- reduceByKeyAndWindow
- countByValueAndWindow
window最原始的窗口,提供兩個參數,第一個參數是窗口長度,第二個參數是滑動間隔,返回一個新的DStream, 返回的結果可以進行算子操作,代碼實現如下
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
if __name__ == '__main__':
sc = SparkContext(appName="windowStream", master="local[*]")
# 第二個參數指統計多長時間的數據
ssc = StreamingContext(sc, 5)
lines = ssc.socketTextStream("localhost", 9999)
# 第一個參數是窗口長度,這里是60秒, 第二個參數是滑動間隔,這里是10秒
dstream = lines.window(60, 10)
dstream.pprint()
# -------------------------------------------
# Time: 2019-08-13 19:46:45
# -------------------------------------------
# hello
# world
ssc.start()
ssc.awaitTermination()
現在終端使用nc發送數據
root@root:~$ nc -lk 9999
hello
world
countByWindow統計每個滑動窗口內數據條數,要注意的是使用該函數要加上checkpoint機制,代碼實現如下
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
if __name__ == '__main__':
sc = SparkContext(appName="windowStream", master="local[*]")
# 第二個參數指統計多長時間的數據
ssc = StreamingContext(sc, 5)
ssc.checkpoint("/tmp/window")
lines = ssc.socketTextStream("localhost", 9999)
# 第一個參數是窗口長度,這里是60秒, 第二個參數是滑動間隔,這里是10秒
dstream = lines.countByWindow(60, 10)
dstream.pprint()
# -------------------------------------------
# Time: 2019-08-14 18:56:40
# -------------------------------------------
# 2
ssc.start()
ssc.awaitTermination()
reduceByWindow聚合每個鍵的值,底層執行的是reduceByKeyAndWindow,實現代碼如下
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
def fun(x):
return x
if __name__ == '__main__':
sc = SparkContext(appName="windowStream", master="local[*]")
# 第二個參數指統計多長時間的數據
ssc = StreamingContext(sc, 5)
ssc.checkpoint("/tmp/window")
lines = ssc.socketTextStream("localhost", 9999)
# 第一個參數執行指定函數, 第二個參數是窗口長度,這里是60秒, 第三個參數是滑動間隔,這里是10秒
dstream = lines.reduceByWindow(fun, 60, 10)
dstream.pprint()
# -------------------------------------------
# Time: 2019-08-14 19:23:30
# -------------------------------------------
# hello
ssc.start()
ssc.awaitTermination()
reduceByKeyAndWindow是對(K,V)窗口數據相同的K執行對應的fun,實現代碼如下
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
def fun(x,y):
return x+y
if __name__ == '__main__':
sc = SparkContext(appName="windowStream", master="local[*]")
# 第二個參數指統計多長時間的數據
ssc = StreamingContext(sc, 5)
ssc.checkpoint("/tmp/window")
lines = ssc.socketTextStream("localhost", 9999)
# 第一個參數執行的功能函數fun, 第二個參數是窗口長度,這里是60秒, 第三個參數是滑動間隔,這里是10秒,
# 第四個參數設定并行度
dstream = lines.map(lambda x:(x,1)).reduceByKeyAndWindow(fun, 60, 10)
dstream.pprint()
# -------------------------------------------
# Time: 2019-08-14 19:23:30
# -------------------------------------------
# ('hello', 2)
ssc.start()
ssc.awaitTermination()
countByValueAndWindow是對窗口數據進行單詞統計,實現代碼如下
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
if __name__ == '__main__':
sc = SparkContext(appName="windowStream", master="local[*]")
# 第二個參數指統計多長時間的數據
ssc = StreamingContext(sc, 5)
ssc.checkpoint("/tmp/window")
lines = ssc.socketTextStream("localhost", 9999)
# 第一個參數是窗口長度,這里是60秒, 第二個參數是滑動間隔,這里是10秒, 第三個參數任務并行度
dstream = lines.countByValueAndWindow(60, 10)
dstream.pprint()
# -------------------------------------------
# Time: 2019-08-14 19:23:30
# -------------------------------------------
# ('hello', 3)
# ('world', 1)
ssc.start()
ssc.awaitTermination()
以上就是所有窗口函數的使用
?
Spark學習目錄:
- Spark學習實例1(Python):單詞統計 Word Count
- Spark學習實例2(Python):加載數據源Load Data Source
- Spark學習實例3(Python):保存數據Save Data
- Spark學習實例4(Python):RDD轉換 Transformations
- Spark學習實例5(Python):RDD執行 Actions
- Spark學習實例6(Python):共享變量Shared Variables
- Spark學習實例7(Python):RDD、DataFrame、DataSet相互轉換
- Spark學習實例8(Python):輸入源實時處理 Input Sources Streaming
- Spark學習實例9(Python):窗口操作 Window Operations
?
?
更多文章、技術交流、商務合作、聯系博主
微信掃碼或搜索:z360901061

微信掃一掃加我為好友
QQ號聯系: 360901061
您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點擊下面給點支持吧,站長非常感激您!手機微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點擊微信右上角掃一掃功能,選擇支付二維碼完成支付。
【本文對您有幫助就好】元
