欧美三区_成人在线免费观看视频_欧美极品少妇xxxxⅹ免费视频_a级毛片免费播放_鲁一鲁中文字幕久久_亚洲一级特黄

Spark學習實例(Python):窗口操作 Window

系統 1646 0

說到流處理,Spark為我們提供了窗口函數,允許在滑動數據窗口上應用轉換,常用場景如每五分鐘商場人流密度、每分鐘流量等等,接下來我們通過畫圖來了解Spark Streaming的窗口函數如何工作的,處理過程圖如下所示:

Spark學習實例(Python):窗口操作 Window Operations_第1張圖片

上圖中綠色的小框框是一批一批的數據流,虛線框和實線框分別是前一個窗口和后一個窗口,從圖中可以看出后一個窗口在前一個窗口基礎上移動了兩個批次的數據流,而我們真正通過算子操作的數據其實就是窗口內所有的數據流。

在代碼實現前了解下窗口操作常用的函數有:

  • window
  • countByWindow
  • reduceByWindow
  • reduceByKeyAndWindow
  • reduceByKeyAndWindow
  • countByValueAndWindow

window最原始的窗口,提供兩個參數,第一個參數是窗口長度,第二個參數是滑動間隔,返回一個新的DStream, 返回的結果可以進行算子操作,代碼實現如下

            
              from pyspark import SparkContext
from pyspark.streaming import StreamingContext

if __name__ == '__main__':
    sc = SparkContext(appName="windowStream", master="local[*]")
    # 第二個參數指統計多長時間的數據
    ssc = StreamingContext(sc, 5)
    lines = ssc.socketTextStream("localhost", 9999)
    # 第一個參數是窗口長度,這里是60秒, 第二個參數是滑動間隔,這里是10秒
    dstream = lines.window(60, 10)
    dstream.pprint()
    # -------------------------------------------
    # Time: 2019-08-13 19:46:45
    # -------------------------------------------
    # hello
    # world
    ssc.start()
    ssc.awaitTermination()
            
          

現在終端使用nc發送數據

root@root:~$ nc -lk 9999
hello
world

countByWindow統計每個滑動窗口內數據條數,要注意的是使用該函數要加上checkpoint機制,代碼實現如下

            
              from pyspark import SparkContext
from pyspark.streaming import StreamingContext

if __name__ == '__main__':
    sc = SparkContext(appName="windowStream", master="local[*]")
    # 第二個參數指統計多長時間的數據
    ssc = StreamingContext(sc, 5)
    ssc.checkpoint("/tmp/window")
    lines = ssc.socketTextStream("localhost", 9999)
    # 第一個參數是窗口長度,這里是60秒, 第二個參數是滑動間隔,這里是10秒
    dstream = lines.countByWindow(60, 10)
    dstream.pprint()
    # -------------------------------------------
    # Time: 2019-08-14 18:56:40
    # -------------------------------------------
    # 2
    ssc.start()
    ssc.awaitTermination()
            
          

reduceByWindow聚合每個鍵的值,底層執行的是reduceByKeyAndWindow,實現代碼如下

            
              from pyspark import SparkContext
from pyspark.streaming import StreamingContext

def fun(x):
    return x

if __name__ == '__main__':
    sc = SparkContext(appName="windowStream", master="local[*]")
    # 第二個參數指統計多長時間的數據
    ssc = StreamingContext(sc, 5)
    ssc.checkpoint("/tmp/window")
    lines = ssc.socketTextStream("localhost", 9999)
    # 第一個參數執行指定函數, 第二個參數是窗口長度,這里是60秒, 第三個參數是滑動間隔,這里是10秒
    dstream = lines.reduceByWindow(fun, 60, 10)
    dstream.pprint()
    # -------------------------------------------
    # Time: 2019-08-14 19:23:30
    # -------------------------------------------
    # hello
    ssc.start()
    ssc.awaitTermination()
            
          

reduceByKeyAndWindow是對(K,V)窗口數據相同的K執行對應的fun,實現代碼如下

            
              from pyspark import SparkContext
from pyspark.streaming import StreamingContext

def fun(x,y):
    return x+y

if __name__ == '__main__':
    sc = SparkContext(appName="windowStream", master="local[*]")
    # 第二個參數指統計多長時間的數據
    ssc = StreamingContext(sc, 5)
    ssc.checkpoint("/tmp/window")
    lines = ssc.socketTextStream("localhost", 9999)
    # 第一個參數執行的功能函數fun, 第二個參數是窗口長度,這里是60秒, 第三個參數是滑動間隔,這里是10秒,
    # 第四個參數設定并行度
    dstream = lines.map(lambda x:(x,1)).reduceByKeyAndWindow(fun, 60, 10)
    dstream.pprint()
    # -------------------------------------------
    # Time: 2019-08-14 19:23:30
    # -------------------------------------------
    # ('hello', 2)
    ssc.start()
    ssc.awaitTermination()
            
          

countByValueAndWindow是對窗口數據進行單詞統計,實現代碼如下

            
              from pyspark import SparkContext
from pyspark.streaming import StreamingContext

if __name__ == '__main__':
    sc = SparkContext(appName="windowStream", master="local[*]")
    # 第二個參數指統計多長時間的數據
    ssc = StreamingContext(sc, 5)
    ssc.checkpoint("/tmp/window")
    lines = ssc.socketTextStream("localhost", 9999)
    # 第一個參數是窗口長度,這里是60秒, 第二個參數是滑動間隔,這里是10秒, 第三個參數任務并行度
    dstream = lines.countByValueAndWindow(60, 10)
    dstream.pprint()
    # -------------------------------------------
    # Time: 2019-08-14 19:23:30
    # -------------------------------------------
    # ('hello', 3)
    # ('world', 1)
    ssc.start()
    ssc.awaitTermination()
            
          

以上就是所有窗口函數的使用

?

Spark學習目錄:

  • Spark學習實例1(Python):單詞統計 Word Count
  • Spark學習實例2(Python):加載數據源Load Data Source
  • Spark學習實例3(Python):保存數據Save Data
  • Spark學習實例4(Python):RDD轉換 Transformations
  • Spark學習實例5(Python):RDD執行 Actions
  • Spark學習實例6(Python):共享變量Shared Variables
  • Spark學習實例7(Python):RDD、DataFrame、DataSet相互轉換
  • Spark學習實例8(Python):輸入源實時處理 Input Sources Streaming
  • Spark學習實例9(Python):窗口操作 Window Operations

?

?


更多文章、技術交流、商務合作、聯系博主

微信掃碼或搜索:z360901061

微信掃一掃加我為好友

QQ號聯系: 360901061

您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點擊下面給點支持吧,站長非常感激您!手機微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點擊微信右上角掃一掃功能,選擇支付二維碼完成支付。

【本文對您有幫助就好】

您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描上面二維碼支持博主2元、5元、10元、自定義金額等您想捐的金額吧,站長會非常 感謝您的哦!!!

發表我的評論
最新評論 總共0條評論
主站蜘蛛池模板: 欧美欲妇激情视频在线 | 爱操影视| 色欧美片视频在线观看 | 国内精品免费 | 欧美国产激情二区三区 | 色版网站 | 天天干天天操天天射 | 97色伦色在线综合视频 | 日韩精品在线视频 | 日韩免费一区二区 | 国产精品婷婷久久久久 | 成人av免费| 国产成年人网站 | 国产综合亚洲精品一区二 | 九九久久99综合一区二区 | 精品推荐国产麻豆剧传媒 | 色欧美片视频在线观看 | 亚洲欧洲精品在线 | 日韩精品一区二区三区不卡 | 色婷婷色综合激情国产日韩 | 91看片网 | 九九99久久 | 91综合视频 | 国产精品久久久久久吹潮 | 亚洲精品视频一区二区三区 | 午夜小电影 | 一级毛片免费在线播放 | 欧美久久xxxxxx影院 | 波多野结衣亚洲一区二区三区 | 日韩在线高清 | 免费色网 | 亚洲最大福利视频 | 超碰97青青草 | 免费精品美女久久久久久久久久 | 国产精品高清在线观看 | 久久精品久久精品久久 | 一区二区三区四区在线视频 | 欧美交换乱理伦片120秒 | 日韩精品不卡 | 艹艹艹逼 | 欧美国产精品一区 |