
Data Analysis (7): How to Interact with the Hadoop Ecosystem from Python (translated)


We all know that Hadoop is implemented mainly in Java, so how can Python interact with the Hadoop ecosystem? I came across a very good article on the subject and am sharing it here, combining Google Translate with my own understanding.
You will learn how to load files directly from the Hadoop Distributed Filesystem (HDFS), retrieve information such as memory usage, move files from the local filesystem to HDFS, and set up Spark.

            
              
    from pathlib import Path
    import pandas as pd
    import numpy as np


Spark installation

First, install findspark, and also pyspark in case you are working on a local computer. If you are following this tutorial on a Hadoop cluster, you can skip the pyspark install. For simplicity I will use the conda virtual environment manager (pro tip: create a virtual environment before you start, and do not break your system Python installation!).
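For reference, here is a minimal sketch of creating such an environment; the environment name and Python version below are arbitrary choices, not taken from the original article:

    # hypothetical environment name and Python version
    !conda create -n hadoop-py python=3.6 -y
    # conda activate hadoop-py   # activation happens in a terminal, then launch Jupyter from inside it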

            
    !conda install -c conda-forge findspark -y
    !conda install -c conda-forge pyspark -y

Spark setup with findspark

            
              
    import findspark

    # Local Spark
    # findspark.init('/home/cloudera/miniconda3/envs/jupyter/lib/python3.7/site-packages/pyspark/')

    # Cloudera cluster Spark
    findspark.init(spark_home='/opt/cloudera/parcels/SPARK2-2.3.0.cloudera4-1.cdh5.13.3.p0.611179/lib/spark2/')

Getting into the PySpark shell

            
              
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName('example_app').master('local[*]').getOrCreate()

Let's list the existing databases. I will assume you are familiar with the Spark DataFrame API and its methods:

            
    spark.sql("show databases").show()

+------------+
|databaseName|
+------------+
|  __ibis_tmp|
|   analytics|
|         db1|
|     default|
|     fhadoop|
|        juan|
+------------+

pandas -> spark

The first integration covers how to move data from pandas (the de facto standard Python library for in-memory data manipulation) to Spark. First, let's load a pandas DataFrame. This one holds air quality data for Madrid (just to satisfy your curiosity; the dataset itself does not matter for moving data from one place to another). You can download it here. Make sure pytables is installed so that the hdf5 data can be read.

            
    air_quality_df = pd.read_hdf('data/air_quality/air-quality-madrid/madrid.h5', key='28079008')
    air_quality_df.head()
BEN CH4 CO EBE NMHC NO NO_2 NOx O_3 PM10 PM25 SO_2 TCH TOL
date
2001-07-01 01:00:00 30.65 NaN 6.91 42.639999 NaN NaN 381.299988 1017.000000 9.010000 158.899994 NaN 47.509998 NaN 76.050003
2001-07-01 02:00:00 29.59 NaN 2.59 50.360001 NaN NaN 209.500000 409.200012 23.820000 104.800003 NaN 20.950001 NaN 84.900002
2001-07-01 03:00:00 4.69 NaN 0.76 25.570000 NaN NaN 116.400002 143.399994 31.059999 48.470001 NaN 11.270000 NaN 20.980000
2001-07-01 04:00:00 4.46 NaN 0.74 22.629999 NaN NaN 116.199997 149.300003 23.780001 47.500000 NaN 10.100000 NaN 14.770000
2001-07-01 05:00:00 2.18 NaN 0.57 11.920000 NaN NaN 100.900002 124.800003 29.530001 49.689999 NaN 7.680000 NaN 8.970000
Let's make some changes to this DataFrame, such as resetting the datetime index, so that no information is lost when it is loaded into Spark. The datetime column will also be converted to a string, since Spark has some issues working with dates (related to system locale, time zones, and so on).
            
    air_quality_df.reset_index(inplace=True)
    air_quality_df['date'] = air_quality_df['date'].dt.strftime('%Y-%m-%d %H:%M:%S')

We can simply load from pandas into Spark with createDataFrame:

            
    air_quality_sdf = spark.createDataFrame(air_quality_df)
    air_quality_sdf.dtypes

Once the DataFrame is loaded into Spark (as air_quality_sdf here), it can be manipulated easily with the PySpark DataFrame methods:

            
    air_quality_sdf.select('date', 'NOx').show(5)

+-------------------+------------------+
|               date|               NOx|
+-------------------+------------------+
|2001-07-01 01:00:00|            1017.0|
|2001-07-01 02:00:00|409.20001220703125|
|2001-07-01 03:00:00|143.39999389648438|
|2001-07-01 04:00:00| 149.3000030517578|
|2001-07-01 05:00:00|124.80000305175781|
+-------------------+------------------+
only showing top 5 rows
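As a quick illustration of that kind of manipulation, here is a minimal sketch of a daily aggregation on the same DataFrame. It assumes the string date column created above; the grouping itself is just an example and not part of the original walkthrough:

    from pyspark.sql import functions as F

    # daily average NOx, derived from the string 'date' column built earlier
    daily_nox = (air_quality_sdf
                 .withColumn('day', F.substring('date', 1, 10))
                 .groupBy('day')
                 .agg(F.avg('NOx').alias('avg_NOx'))
                 .orderBy('day'))
    daily_nox.show(5)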

pandas -> spark -> hive

To persist a Spark DataFrame into HDFS, where it can be queried with the default Hadoop SQL engine (Hive), one straightforward strategy (by no means the only one) is to create a temporary view from that DataFrame:

            
    air_quality_sdf.createOrReplaceTempView("air_quality_sdf")

Once the temporary view is created, a persistent table can be created with the Spark SQL engine using create table as select. Before creating this table, I will create a new database called analytics to store it:

            
    sql_drop_table = """
    drop table if exists analytics.pandas_spark_hive
    """

    sql_drop_database = """
    drop database if exists analytics cascade
    """

    sql_create_database = """
    create database if not exists analytics
    location '/user/cloudera/analytics/'
    """

    sql_create_table = """
    create table if not exists analytics.pandas_spark_hive
    using parquet
    as select to_timestamp(date) as date_parsed, *
    from air_quality_sdf
    """

    print("dropping database...")
    result_drop_db = spark.sql(sql_drop_database)

    print("creating database...")
    result_create_db = spark.sql(sql_create_database)

    print("dropping table...")
    result_droptable = spark.sql(sql_drop_table)

    print("creating table...")
    result_create_table = spark.sql(sql_create_table)

    borrando bb.dd....
    creando bb.dd....
    borrando tabla...
    creando tabla...
          

The result can be checked with the Spark SQL engine, for example by selecting the ozone pollutant concentration over time:

            
    spark.sql("select * from analytics.pandas_spark_hive").select("date_parsed", "O_3").show(5)

+-------------------+------------------+
|        date_parsed|               O_3|
+-------------------+------------------+
|2001-07-01 01:00:00| 9.010000228881836|
|2001-07-01 02:00:00| 23.81999969482422|
|2001-07-01 03:00:00|31.059999465942383|
|2001-07-01 04:00:00|23.780000686645508|
|2001-07-01 05:00:00|29.530000686645508|
+-------------------+------------------+
only showing top 5 rows
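For small result sets like this one, the Spark result can also be pulled straight into pandas with the standard PySpark toPandas method. Keep in mind that the whole result is collected to the driver, so this is only a sketch for modest amounts of data:

    # collect a small query result into a pandas DataFrame on the driver
    ozone_pdf = spark.sql(
        "select date_parsed, O_3 from analytics.pandas_spark_hive limit 100"
    ).toPandas()
    ozone_pdf.head()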

Apache Arrow

Apache Arrow is an in-memory columnar data format designed to support high-performance operations in Big Data environments (think of it as the in-memory equivalent of the parquet format). It is developed in C++, but its Python API is excellent, as you will see now. But first, install it:

            
    !conda install pyarrow -y

To establish native communication with HDFS I will use the interface included in pyarrow. The only requirement is to set an environment variable pointing to the location of libhdfs. Remember that we are in a Cloudera environment; if you are using Hortonworks, you will have to find the proper location (believe me, it exists).

Establishing the connection

            
              
    import pyarrow as pa
    import os

    os.environ['ARROW_LIBHDFS_DIR'] = '/opt/cloudera/parcels/CDH-5.14.4-1.cdh5.14.4.p0.3/lib64/'

    hdfs_interface = pa.hdfs.connect(host='localhost', port=8020, user='cloudera')

Listing files in HDFS

Let's list the files that Spark saved earlier. Remember that those files were first loaded from a local file into a pandas DataFrame and then into a Spark DataFrame. By default, Spark writes its output partitioned into a number of snappy-compressed files. In the HDFS path you can identify the database name (analytics) and the table name (pandas_spark_hive):

            
    hdfs_interface.ls('/user/cloudera/analytics/pandas_spark_hive/')

    ['/user/cloudera/analytics/pandas_spark_hive/_SUCCESS',
     '/user/cloudera/analytics/pandas_spark_hive/part-00000-b4371c8e-0f5c-4d20-a136-a65e56e97f16-c000.snappy.parquet',
     '/user/cloudera/analytics/pandas_spark_hive/part-00001-b4371c8e-0f5c-4d20-a136-a65e56e97f16-c000.snappy.parquet',
     '/user/cloudera/analytics/pandas_spark_hive/part-00002-b4371c8e-0f5c-4d20-a136-a65e56e97f16-c000.snappy.parquet',
     '/user/cloudera/analytics/pandas_spark_hive/part-00003-b4371c8e-0f5c-4d20-a136-a65e56e97f16-c000.snappy.parquet',
     '/user/cloudera/analytics/pandas_spark_hive/part-00004-b4371c8e-0f5c-4d20-a136-a65e56e97f16-c000.snappy.parquet',
     '/user/cloudera/analytics/pandas_spark_hive/part-00005-b4371c8e-0f5c-4d20-a136-a65e56e97f16-c000.snappy.parquet',
     '/user/cloudera/analytics/pandas_spark_hive/part-00006-b4371c8e-0f5c-4d20-a136-a65e56e97f16-c000.snappy.parquet',
     '/user/cloudera/analytics/pandas_spark_hive/part-00007-b4371c8e-0f5

Reading parquet files directly from HDFS

To read parquet files (or a folder full of files representing a table) directly from HDFS, I will use the PyArrow HDFS interface created before:

            
    table = hdfs_interface.read_parquet('/user/cloudera/analytics/pandas_spark_hive/')
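The object returned by read_parquet is a PyArrow Table; before converting it, you can sanity-check what came back (num_rows and schema are standard Table attributes):

    # quick sanity check on the Table object before converting to pandas
    print(table.num_rows)
    print(table.schema)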
              
            
          

HDFS -> pandas

Once the parquet files are read by the PyArrow HDFS interface, a Table object is created. We can easily go back to pandas with the to_pandas method:

            
    table_df = table.to_pandas()
    table_df.head()

    /home/cloudera/miniconda3/envs/jupyter/lib/python3.6/site-packages/pyarrow/pandas_compat.py:752: FutureWarning: .labels was deprecated in version 0.24.0. Use .codes instead.
      labels, = index.labels
date_parsed date BEN CH4 CO EBE NMHC NO NO_2 NOx O_3 PM10 PM25 SO_2 TCH TOL
0 2001-06-30 23:00:00 2001-07-01 01:00:00 30.65 NaN 6.91 42.639999 NaN NaN 381.299988 1017.000000 9.010000 158.899994 NaN 47.509998 NaN 76.050003
1 2001-07-01 00:00:00 2001-07-01 02:00:00 29.59 NaN 2.59 50.360001 NaN NaN 209.500000 409.200012 23.820000 104.800003 NaN 20.950001 NaN 84.900002
2 2001-07-01 01:00:00 2001-07-01 03:00:00 4.69 NaN 0.76 25.570000 NaN NaN 116.400002 143.399994 31.059999 48.470001 NaN 11.270000 NaN 20.980000
3 2001-07-01 02:00:00 2001-07-01 04:00:00 4.46 NaN 0.74 22.629999 NaN NaN 116.199997 149.300003 23.780001 47.500000 NaN 10.100000 NaN 14.770000
4 2001-07-01 03:00:00 2001-07-01 05:00:00 2.18 NaN 0.57 11.920000 NaN NaN 100.900002 124.800003 29.530001 49.689999 NaN 7.680000 NaN 8.970000
And this is the same data we started with, closing the loop Python -> Hadoop -> Python.
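The loop can also be closed in the other direction: a pandas DataFrame can be written back to HDFS as a parquet file through the same interface. This is only a minimal sketch, and the target path below is hypothetical rather than part of the original walkthrough:

    from pyarrow import parquet as pq

    # convert the pandas DataFrame to an Arrow Table and write it to HDFS as parquet
    # (the output path is a hypothetical example)
    arrow_table = pa.Table.from_pandas(table_df)
    with hdfs_interface.open('/user/cloudera/analytics/pandas_back.parquet', 'wb') as f:
        pq.write_table(arrow_table, f)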

Uploading local files to HDFS

All kinds of HDFS operations are supported by the PyArrow HDFS interface, for example uploading a bunch of local files to HDFS:

            
    cwd = Path('./data/')
    destination_path = '/user/cloudera/analytics/data/'

    for f in cwd.rglob('*.*'):
        print(f'uploading {f.name}')
        with open(str(f), 'rb') as f_upl:
            hdfs_interface.upload(destination_path + f.name, f_upl)

    uploading sandp500.zip
    uploading stations.csv
    uploading madrid.h5
    uploading diamonds_train.csv
    uploading diamonds_test.csv

Let's check that the files have been uploaded correctly by listing the files in the destination path:

            
    hdfs_interface.ls(destination_path)

    ['/user/cloudera/analytics/data/diamonds_test.csv',
     '/user/cloudera/analytics/data/diamonds_train.csv',
     '/user/cloudera/analytics/data/madrid.h5',
     '/user/cloudera/analytics/data/sandp500.zip',
     '/user/cloudera/analytics/data/stations.csv']

Reading arbitrary files (not parquet) from HDFS (HDFS -> pandas example)

For example, .csv files can be loaded from HDFS directly into a pandas DataFrame using the open method together with the standard pandas read_csv function, which accepts a buffer as input:

            
    diamonds_train = pd.read_csv(hdfs_interface.open('/user/cloudera/analytics/data/diamonds_train.csv'))
    diamonds_train.head()
carat cut color clarity depth table price x y z
0 1.21 Premium J VS2 62.4 58.0 4268 6.83 6.79 4.25
1 0.32 Very Good H VS2 63.0 57.0 505 4.35 4.38 2.75
2 0.71 Fair G VS1 65.5 55.0 2686 5.62 5.53 3.65
3 0.41 Good D SI1 63.8 56.0 738 4.68 4.72 3.00
4 1.02 Ideal G SI1 60.5 59.0 4882 6.55 6.51 3.95
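Writing in the other direction works as well: hdfs_interface.open accepts a write mode, so a pandas DataFrame can be serialized to CSV and stored in HDFS. A minimal sketch, with a hypothetical output path:

    # write a pandas DataFrame back to HDFS as a CSV file (hypothetical path)
    with hdfs_interface.open('/user/cloudera/analytics/data/diamonds_copy.csv', 'wb') as f:
        f.write(diamonds_train.to_csv(index=False).encode('utf-8'))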

If you are interested in all the methods and possibilities of this library, please visit: https://arrow.apache.org/docs/python/filesystems.html#hdfs-api

WebHDFS

Sometimes it is not possible to access libhdfs, the native HDFS library (for example, when running analytics from a computer that is not part of the cluster). In that case we can rely on WebHDFS (the REST API of the HDFS service). It is slower and not suitable for heavy Big Data loads, but an interesting option for lightweight workloads. Let's install a WebHDFS Python API:

            
    !conda install -c conda-forge python-hdfs -y

    Collecting package metadata: done
    Solving environment: done

    ## Package Plan ##

      environment location: /home/cloudera/miniconda3/envs/jupyter

      added / updated specs:
        - python-hdfs

    The following packages will be downloaded:

        package                    |            build
        ---------------------------|-----------------
        certifi-2019.3.9           |           py36_0         149 KB  conda-forge
        ------------------------------------------------------------
                                               Total:         149 KB

    The following packages will be UPDATED:

      ca-certificates    pkgs/main::ca-certificates-2019.1.23-0 --> conda-forge::ca-certificates-2019.3.9-hecc5488_0

    The following packages will be SUPERSEDED by a higher-priority channel:

      certifi            pkgs/main --> conda-forge
      openssl            pkgs/main::openssl-1.1.1b-h7b6447c_1 --> conda-forge::openssl-1.1.1b-h14c3975_1

    Downloading and Extracting Packages
    certifi-2019.3.9     | 149 KB    | ##################################### | 100%
    Preparing transaction: done
    Verifying transaction: done
    Executing transaction: done

Establishing the WebHDFS connection

            
              
    from hdfs import InsecureClient

    web_hdfs_interface = InsecureClient('http://localhost:50070', user='cloudera')

List files in HDFS

Listing files is similar to using the PyArrow interface; just use the list method and an HDFS path:

            
    web_hdfs_interface.list('/user/cloudera/analytics/data')

    ['diamonds_test.csv', 'diamonds_train.csv', 'madrid.h5', 'sandp500.zip', 'stations.csv']
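Beyond listing, the hdfs client also exposes per-file metadata through its status method; the file below is simply one of those uploaded earlier, used as an example:

    # file metadata (size, owner, modification time, ...) via WebHDFS
    web_hdfs_interface.status('/user/cloudera/analytics/data/stations.csv')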
              
            
          

Uploading local files to HDFS with WebHDFS

            
    cwd = Path('./data/')
    destination_path = '/user/cloudera/analytics/data_web_hdfs/'

    for f in cwd.rglob('*.*'):
        print(f'uploading {f.name}')
        web_hdfs_interface.upload(destination_path + f.name,
                                  str(f),
                                  overwrite=True)

    uploading sandp500.zip
    uploading stations.csv
    uploading madrid.h5
    uploading diamonds_train.csv
    uploading diamonds_test.csv

Let's check that the upload went correctly:

            
    web_hdfs_interface.list(destination_path)

    ['diamonds_test.csv', 'diamonds_train.csv', 'madrid.h5', 'sandp500.zip', 'stations.csv']

HDFS can also handle bigger files (with some limitations). These files come from the Kaggle Microsoft Malware competition and weigh several GB each:

            
    web_hdfs_interface.upload(destination_path + 'train.parquet',
                              '/home/cloudera/analytics/29_03_2019/notebooks/data/microsoft/train.pq',
                              overwrite=True);
    web_hdfs_interface.upload(destination_path + 'test.parquet',
                              '/home/cloudera/analytics/29_03_2019/notebooks/data/microsoft/test.pq',
                              overwrite=True);

Reading files from HDFS with WebHDFS (HDFS -> pandas example)

In this case it is useful to use the PyArrow parquet module and pass a buffer to create a Table object. Afterwards, a pandas DataFrame can easily be created from the Table object using the to_pandas method:

            
              
    from pyarrow import parquet as pq
    from io import BytesIO

    with web_hdfs_interface.read(destination_path + 'train.parquet') as reader:
        microsoft_train = pq.read_table(BytesIO(reader.read())).to_pandas()

    microsoft_train.head()
MachineIdentifier ProductName EngineVersion AppVersion AvSigVersion IsBeta RtpStateBitfield IsSxsPassiveMode DefaultBrowsersIdentifier AVProductStatesIdentifier Census_FirmwareVersionIdentifier Census_IsSecureBootEnabled Census_IsWIMBootEnabled Census_IsVirtualDevice Census_IsTouchEnabled Census_IsPenCapable Census_IsAlwaysOnAlwaysConnectedCapable Wdft_IsGamer Wdft_RegionIdentifier HasDetections
0 0000028988387b115f69f31a3bf04f09 win8defender 1.1.15100.1 4.18.1807.18075 1.273.1735.0 0 7.0 0 NaN 53447.0 36144.0 0 NaN 0.0 0 0 0.0 0.0 10.0 0
1 000007535c3f730efa9ea0b7ef1bd645 win8defender 1.1.14600.4 4.13.17134.1 1.263.48.0 0 7.0 0 NaN 53447.0 57858.0 0 NaN 0.0 0 0 0.0 0.0 8.0 0
2 000007905a28d863f6d0d597892cd692 win8defender 1.1.15100.1 4.18.1807.18075 1.273.1341.0 0 7.0 0 NaN 53447.0 52682.0 0 NaN 0.0 0 0 0.0 0.0 3.0 0
3 00000b11598a75ea8ba1beea8459149f win8defender 1.1.15100.1 4.18.1807.18075 1.273.1527.0 0 7.0 0 NaN 53447.0 20050.0 0 NaN 0.0 0 0 0.0 0.0 3.0 1
4 000014a5f00daa18e76b81417eeb99fc win8defender 1.1.15100.1 4.18.1807.18075 1.273.1379.0 0 7.0 0 NaN 53447.0 19844.0 0 0.0 0.0 0 0 0.0 0.0 1.0 1

5 rows × 83 columns
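Since the BytesIO approach above buffers the whole file in memory, an alternative for files of this size is to download them to local disk first with the client's download method and read the parquet file from there. A minimal sketch, with a hypothetical local path:

    # stage the file on local disk instead of holding it all in a memory buffer
    local_copy = web_hdfs_interface.download(destination_path + 'test.parquet',
                                             '/tmp/test.parquet', overwrite=True)
    microsoft_test = pq.read_table(local_copy).to_pandas()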


Hive + Impala

Hive and Impala are two SQL engines for Hadoop. One is MapReduce based (Hive), while Impala is a more modern and faster in-memory implementation created and open-sourced by Cloudera. Both engines can be fully leveraged from Python through one of their multiple APIs. In this case I will show you impyla, which supports both engines. Let's install it with conda, and don't forget to install thrift_sasl version 0.2.1 (yes, it must be this specific version, otherwise it will not work):

            
    !conda install impyla thrift_sasl=0.2.1 -y

    ## Package Plan ##

      environment location: /home/cloudera/miniconda3/envs/jupyter

      added / updated specs:
        - impyla
        - thrift_sasl=0.2.1

    The following packages will be downloaded:

        package                    |            build
        ---------------------------|-----------------
        certifi-2019.3.9           |           py36_0         155 KB
        ------------------------------------------------------------
                                               Total:         155 KB

    The following packages will be SUPERSEDED by a higher-priority channel:

      ca-certificates    conda-forge::ca-certificates-2019.3.9~ --> pkgs/main::ca-certificates-2019.1.23-0
      certifi            conda-forge --> pkgs/main
      openssl            conda-forge::openssl-1.1.1b-h14c3975_1 --> pkgs/main::openssl-1.1.1b-h7b6447c_1

    Downloading and Extracting Packages
    certifi-2019.3.9     | 155 KB    | ##################################### | 100%
    Preparing transaction: done
    Verifying transaction: done
    Executing transaction: done

Establishing the connection

            
              
from impala.dbapi import connect
from impala.util import as_pandas

Hive (Hive -> pandas example)

The API follows the classic ODBC standard, which you may already be familiar with. impyla includes a utility function called as_pandas that easily parses the results (a list of tuples) into a pandas DataFrame. Use it with caution: it has issues with certain data types and is not efficient for big-data workloads. Fetch the results in both ways:

            
hive_conn = connect(host='localhost', port=10000, database='analytics', auth_mechanism='PLAIN')

with hive_conn.cursor() as c:
    c.execute('SELECT * FROM analytics.pandas_spark_hive LIMIT 100')
    results = c.fetchall()

with hive_conn.cursor() as c:
    c.execute('SELECT * FROM analytics.pandas_spark_hive LIMIT 100')
    results_df = as_pandas(c)
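The raw tuples returned by fetchall can also be turned into a DataFrame by hand when you want control over the column names; a minimal sketch, assuming the impyla cursor follows the standard DB-API and exposes the column names through cursor.description:

# Build the DataFrame manually from the raw fetchall() results.
# Per the DB-API, the first field of each cursor.description entry is the column name.
with hive_conn.cursor() as c:
    c.execute('SELECT * FROM analytics.pandas_spark_hive LIMIT 100')
    columns = [col[0] for col in c.description]
    manual_df = pd.DataFrame(c.fetchall(), columns=columns)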
              
            
          

Impala (Impala -> pandas example)

Using Impala follows the same pattern as Hive; just make sure you connect to the correct port, which in this case defaults to 21050:

            
impala_conn = connect(host='localhost', port=21050)

with impala_conn.cursor() as c:
    c.execute('show databases')
    result_df = as_pandas(c)
               name                                        comment
0        __ibis_tmp
1  _impala_builtins  System database for Impala builtin functions
2         analytics
3               db1
4           default                          Default Hive database
5           fhadoop
6              juan
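Both impyla connections follow the DB-API, so it is good practice to close them explicitly once the queries are done; a short sketch covering the two connections opened above:

# Release the Hive and Impala connections once the queries are finished.
hive_conn.close()
impala_conn.close()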

Ibis Framework

Another option is the Ibis Framework, a higher-level API over a relatively large collection of data sources, including HDFS and Impala. It is built around the idea of performing operations on those sources through Python objects and methods. Let's install it the same way as the other libraries:

            
              !conda install ibis-framework -y

            
          

Let's create an HDFS interface and an Impala interface (in Ibis, the Impala client needs the HDFS interface object):

            
              
import ibis

hdfs_ibis = ibis.hdfs_connect(host='localhost', port=50070)
impala_ibis = ibis.impala.connect(host='localhost', port=21050, hdfs_client=hdfs_ibis, user='cloudera')
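The HDFS interface can also be used on its own, for example to inspect directories on the cluster; a minimal sketch, assuming a /user/cloudera home directory exists (adjust the path to your environment):

# List the contents of an HDFS directory through the Ibis HDFS client.
hdfs_ibis.ls('/user/cloudera')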
              
            
          

Once the interfaces are created, operations are performed by calling methods, with no need to write any more SQL. If you are familiar with ORMs (Object-Relational Mappers), it is not exactly the same thing, but the underlying idea is very similar.

            
impala_ibis.invalidate_metadata()
impala_ibis.list_databases()

['__ibis_tmp',
 '_impala_builtins',
 'analytics',
 'db1',
 'default',
 'fhadoop',
 'juan']
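Tables inside a specific database can be listed in the same way; a short sketch using the analytics database from the examples above:

# List the tables available in the analytics database.
impala_ibis.list_tables(database='analytics')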

Impala -> pandas

Ibis works natively with pandas, so no conversion is needed. Reading a table returns a pandas DataFrame object:

            
table = impala_ibis.table('pandas_spark_hive', database='analytics')
table_df = table.execute()
table_df.head()
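Ibis expressions are lazy, so nothing runs on the cluster until execute() is called, which means you can push work to Impala before bringing anything back; a minimal sketch using operations that do not depend on specific column names:

# Count the rows on the cluster side; only the scalar result is returned.
row_count = table.count().execute()

# Fetch just the first 10 rows instead of the full table.
first_rows = table.limit(10).execute()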
              
            
          

pandas -> Impala

To go from pandas to Impala you can use Ibis: select the database through the Impala interface, set the permissions (depending on your cluster setup), and use the create method, passing a pandas DataFrame object as an argument:
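A minimal sketch of that write path is shown below, assuming the diamonds data is available locally as a CSV file (the file name and the write permissions on the analytics database are assumptions about the environment); the read-back that follows verifies the load:

# Load the diamonds dataset locally (hypothetical file name; adjust to your environment).
diamonds_df = pd.read_csv('diamonds.csv')

# Create the Impala table in the analytics database directly from the pandas DataFrame.
impala_ibis.create_table('diamonds', diamonds_df, database='analytics')

# A database handle makes the read-back below more convenient.
analytics_db = impala_ibis.database('analytics')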

            
analytics_db.table('diamonds').execute().head(5)
   carat        cut color clarity  depth  table  price     x     y     z
0   1.21    Premium     J     VS2   62.4   58.0   4268  6.83  6.79  4.25
1   0.32  Very Good     H     VS2   63.0   57.0    505  4.35  4.38  2.75
2   0.71       Fair     G     VS1   65.5   55.0   2686  5.62  5.53  3.65
3   0.41       Good     D     SI1   63.8   56.0    738  4.68  4.72  3.00
4   1.02      Ideal     G     SI1   60.5   59.0   4882  6.55  6.51  3.95
Finally, I hope this translation of the article is helpful to you. Thanks!
