欧美三区_成人在线免费观看视频_欧美极品少妇xxxxⅹ免费视频_a级毛片免费播放_鲁一鲁中文字幕久久_亚洲一级特黄

示例:python模擬日志生成+Flume+Kafka+Spark

系統 1866 0

生成模擬數據

  1. 編寫 generate_log.py
            
              
                #coding=UTF-8
              
              
                import
              
               random

              
                import
              
               time

url_paths
              
                =
              
              
                [
              
              
                "class/112.html"
              
              
                ,
              
              
                "class/128.html"
              
              
                ,
              
              
                "class/145.html"
              
              
                ,
              
              
                "class/130.html"
              
              
                ,
              
              
                "class/146.html"
              
              
                ,
              
              
                "class/131.html"
              
              
                ,
              
              
                "learn/821"
              
              
                ,
              
              
                "course/list"
              
              
                ]
              
              

ip_slices
              
                =
              
              
                [
              
              
                132
              
              
                ,
              
              
                156
              
              
                ,
              
              
                124
              
              
                ,
              
              
                10
              
              
                ,
              
              
                29
              
              
                ,
              
              
                167
              
              
                ,
              
              
                143
              
              
                ,
              
              
                187
              
              
                ,
              
              
                30
              
              
                ,
              
              
                46
              
              
                ,
              
              
                55
              
              
                ,
              
              
                63
              
              
                ,
              
              
                72
              
              
                ,
              
              
                87
              
              
                ,
              
              
                98
              
              
                ,
              
              
                168
              
              
                ]
              
              

http_referers
              
                =
              
              
                [
              
              
                "https://www.baidu.com/s?wd={query}"
              
              
                ,
              
              
                "https://www.sogou.com/web?query={query}"
              
              
                ,
              
              
                "https://cn.bing.com/search?q={query}"
              
              
                ,
              
              
                "https://www.so.com/s?q={query}"
              
              
                ]
              
              

search_keyword
              
                =
              
              
                [
              
              
                "spark sql實戰"
              
              
                ,
              
              
                "hadoop 基礎"
              
              
                ,
              
              
                "storm實戰"
              
              
                ,
              
              
                "spark streaming實戰"
              
              
                ]
              
              

status_code
              
                =
              
              
                [
              
              
                "200"
              
              
                ,
              
              
                "404"
              
              
                ,
              
              
                "500"
              
              
                ]
              
              
                def
              
              
                sample_status_code
              
              
                (
              
              
                )
              
              
                :
              
              
                return
              
               random
              
                .
              
              sample
              
                (
              
              status_code
              
                ,
              
              
                1
              
              
                )
              
              
                [
              
              
                0
              
              
                ]
              
              
                def
              
              
                sample_referer
              
              
                (
              
              
                )
              
              
                :
              
              
                if
              
               random
              
                .
              
              uniform
              
                (
              
              
                0
              
              
                ,
              
              
                1
              
              
                )
              
              
                >
              
              
                0.2
              
              
                :
              
              
                return
              
              
                "-"
              
              
    refer_str
              
                =
              
              random
              
                .
              
              sample
              
                (
              
              http_referers
              
                ,
              
              
                1
              
              
                )
              
              
    query_str
              
                =
              
              random
              
                .
              
              sample
              
                (
              
              search_keyword
              
                ,
              
              
                1
              
              
                )
              
              
                return
              
               refer_str
              
                [
              
              
                0
              
              
                ]
              
              
                .
              
              
                format
              
              
                (
              
              query
              
                =
              
              query_str
              
                [
              
              
                0
              
              
                ]
              
              
                )
              
              
                def
              
              
                sample_url
              
              
                (
              
              
                )
              
              
                :
              
              
                return
              
               random
              
                .
              
              sample
              
                (
              
              url_paths
              
                ,
              
              
                1
              
              
                )
              
              
                [
              
              
                0
              
              
                ]
              
              
                def
              
              
                sample_ip
              
              
                (
              
              
                )
              
              
                :
              
              
                slice
              
              
                =
              
              random
              
                .
              
              sample
              
                (
              
              ip_slices
              
                ,
              
              
                4
              
              
                )
              
              
                return
              
              
                "."
              
              
                .
              
              join
              
                (
              
              
                [
              
              
                str
              
              
                (
              
              item
              
                )
              
              
                for
              
               item 
              
                in
              
              
                slice
              
              
                ]
              
              
                )
              
              
                def
              
              
                generate_log
              
              
                (
              
              count
              
                =
              
              
                10
              
              
                )
              
              
                :
              
              
    time_str
              
                =
              
              time
              
                .
              
              strftime
              
                (
              
              
                "%Y-%m-%d %H:%M:%S"
              
              
                ,
              
              time
              
                .
              
              localtime
              
                (
              
              
                )
              
              
                )
              
              

    f
              
                =
              
              
                open
              
              
                (
              
              
                "C:/Users/DaiRenLong/Desktop/streaming_access.log"
              
              
                ,
              
              
                "w+"
              
              
                )
              
              
                while
              
               count 
              
                >=
              
              
                1
              
              
                :
              
              
        query_log
              
                =
              
              
                "{ip}\t{local_time}\t\"GET /{url} HTTP/1.1\"\t{status_code}\t{refer}"
              
              
                .
              
              
                format
              
              
                (
              
              url
              
                =
              
              sample_url
              
                (
              
              
                )
              
              
                ,
              
              ip
              
                =
              
              sample_ip
              
                (
              
              
                )
              
              
                ,
              
              refer
              
                =
              
              sample_referer
              
                (
              
              
                )
              
              
                ,
              
              status_code
              
                =
              
              sample_status_code
              
                (
              
              
                )
              
              
                ,
              
              local_time
              
                =
              
              time_str
              
                )
              
              
                print
              
              
                (
              
              query_log
              
                )
              
              
        f
              
                .
              
              write
              
                (
              
              query_log
              
                +
              
              
                "\n"
              
              
                )
              
              
        count
              
                =
              
              count
              
                -
              
              
                1
              
              
                if
              
               __name__ 
              
                ==
              
              
                '__main__'
              
              
                :
              
              
                # 每一分鐘生成一次日志信息
              
              
                while
              
              
                True
              
              
                :
              
              
        generate_log
              
                (
              
              
                )
              
              
        time
              
                .
              
              sleep
              
                (
              
              
                60
              
              
                )
              
            
          
  1. 日志文件對接flume==>kafka
    Flume配置文件: https://blog.csdn.net/drl_blogs/article/details/95192574#execkafkaconf_1
  2. 運行flume
            
              flume-ng agent \
--name exec-memory-logger \
--conf conf 
              
                $FLUME_HOME
              
              /conf \
--conf-file 
              
                $FLUME_HOME
              
              /conf/streaming_project.conf \
-Dflume.root.logger
              
                =
              
              INFO,console 
              
                &
              
            
          
  1. 運行kafka消費者
            
               kafka-console-consumer.sh \
 --zookeeper hadoop01:2181 \
 --topic kafka_streaming_topic

            
          
  1. 運行python文件測試
            
               python generate_log.py

            
          
  1. 查看kafka消費者消費者是否有信息

  2. 編寫代碼打通通道

            
              
                import
              
               org
              
                .
              
              apache
              
                .
              
              log4j
              
                .
              
              
                {
              
              Level
              
                ,
              
               Logger
              
                }
              
              
                import
              
               org
              
                .
              
              apache
              
                .
              
              spark
              
                .
              
              SparkConf

              
                import
              
               org
              
                .
              
              apache
              
                .
              
              spark
              
                .
              
              streaming
              
                .
              
              kafka
              
                .
              
              KafkaUtils

              
                import
              
               org
              
                .
              
              apache
              
                .
              
              spark
              
                .
              
              streaming
              
                .
              
              
                {
              
              Seconds
              
                ,
              
               StreamingContext
              
                }
              
              

object kafka_Receiver_streaming 
              
                {
              
              
  Logger
              
                .
              
              
                getLogger
              
              
                (
              
              
                "org"
              
              
                )
              
              
                .
              
              
                setLevel
              
              
                (
              
              Level
              
                .
              
              WARN
              
                )
              
              
  def 
              
                main
              
              
                (
              
              args
              
                :
              
               Array
              
                [
              
              String
              
                ]
              
              
                )
              
              
                :
              
               Unit 
              
                =
              
              
                {
              
              
    val sparkConf 
              
                =
              
              
                new
              
              
                SparkConf
              
              
                (
              
              
                )
              
              
                .
              
              
                setAppName
              
              
                (
              
              
                "kafka_Receiver_streaming"
              
              
                )
              
              
                .
              
              
                setMaster
              
              
                (
              
              
                "local[*]"
              
              
                )
              
              
                .
              
              
                set
              
              
                (
              
              
                "spark.port.maxRetries"
              
              
                ,
              
              
                "100"
              
              
                )
              
              

    val ssc 
              
                =
              
              
                new
              
              
                StreamingContext
              
              
                (
              
              sparkConf
              
                ,
              
              
                Seconds
              
              
                (
              
              
                60
              
              
                )
              
              
                )
              
              

    val messages 
              
                =
              
               KafkaUtils
              
                .
              
              
                createStream
              
              
                (
              
              ssc
              
                ,
              
              
                "hadoop01:2181"
              
              
                ,
              
              
                "test"
              
              
                ,
              
              
                Map
              
              
                (
              
              
                "kafka_streaming_topic"
              
              
                -
              
              
                >
              
              
                1
              
              
                )
              
              
                )
              
              
    messages
              
                .
              
              
                map
              
              
                (
              
              _
              
                .
              
              _2
              
                )
              
              
                .
              
              
                count
              
              
                (
              
              
                )
              
              
                .
              
              
                print
              
              
                (
              
              
                )
              
              

    ssc
              
                .
              
              
                start
              
              
                (
              
              
                )
              
              
    ssc
              
                .
              
              
                awaitTermination
              
              
                (
              
              
                )
              
              
                }
              
              
                }
              
            
          
  1. 運行代碼查看結果

更多文章、技術交流、商務合作、聯系博主

微信掃碼或搜索:z360901061

微信掃一掃加我為好友

QQ號聯系: 360901061

您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點擊下面給點支持吧,站長非常感激您!手機微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點擊微信右上角掃一掃功能,選擇支付二維碼完成支付。

【本文對您有幫助就好】

您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描上面二維碼支持博主2元、5元、10元、自定義金額等您想捐的金額吧,站長會非常 感謝您的哦!!!

發表我的評論
最新評論 總共0條評論
主站蜘蛛池模板: 色男人的天堂久久综合 | 午夜影视网 | 欧美在线观看一区 | 国产小网站| 亚洲一区黄色 | 欧美人禽 | 亚洲免费在线播放 | 欧美无遮挡一区二区三区 | 欧美精品久久久久久久久久 | 日韩视频在线观看 | 欧美黑人xxx | 日韩一级片播放 | 色哟哟在线观看精品入口 | 色噜噜在线观看 | 五月婷婷丁香六月 | 日本精品久久久一区二区三区 | 成人福利视频网站 | 国内精品伊人久久久久7777人 | 欧美亚洲一区二区三区四区 | 国产乱码精品一区二区三上 | 六月婷婷综合激情 | 欧美手机看片 | 91高清国产经典在线观看 | 日韩av电影在线免费观看 | 日韩精品久| 精品热99 | 国产嘿咻 | 成人福利视频在线看高清观看 | 久久精品草 | 欧美日韩亚洲在线 | 中文一区 | 特黄特色的免费大片看看 | 在线视频二区 | 欧美区一区二区三 | 一区二区欧美视频 | 视频一区二区三区四区五区 | 免费看成年视频网页 | 国产高清在线观看av | 成人二区 | 最新日本中文字幕在线观看 | 久久综合九色综合欧美狠狠 |