We all know that Hadoop is implemented mainly in Java, so how can Python interact with the Hadoop ecosystem? I came across a very good article on the topic and am sharing it here, combining a machine translation with my own understanding.
You will learn how to load files from the Hadoop Distributed Filesystem directly into memory, move files from your local machine to HDFS, set up Spark, and more.
from pathlib import Path
import pandas as pd
import numpy as np
Spark installation
First, install findspark, and also pyspark in case you are working on a local machine. If you are following this tutorial on a Hadoop cluster, you can skip the pyspark install. For simplicity I will use the conda virtual environment manager (pro tip: create a virtual environment before you start, and do not break your system Python installation!).
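If you want to follow that tip, here is a minimal sketch of the environment setup, run from a terminal before launching Jupyter (the environment name and Python version are only examples; the name jupyter simply matches the paths that appear later in this post):

conda create -n jupyter python=3.6 -y   # create an isolated conda environment
conda activate jupyter                  # work inside it from now on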
!conda install -c conda-forge findspark -y
!conda install -c conda-forge pyspark -y
Spark setup with findspark
import findspark

# Local Spark
# findspark.init('/home/cloudera/miniconda3/envs/jupyter/lib/python3.7/site-packages/pyspark/')

# Cloudera cluster Spark
findspark.init(spark_home='/opt/cloudera/parcels/SPARK2-2.3.0.cloudera4-1.cdh5.13.3.p0.611179/lib/spark2/')
Creating a SparkSession
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('example_app').master('local[*]').getOrCreate()
Let's get the existing databases. I assume you are already familiar with the Spark DataFrame API and its methods:
spark.sql("show databases").show()
+------------+
|databaseName|
+------------+
|  __ibis_tmp|
|   analytics|
|         db1|
|     default|
|     fhadoop|
|        juan|
+------------+
pandas -> spark
The first integration is about how to move data from pandas, the de facto standard Python library for in-memory data manipulation, to Spark. First, let's load a pandas DataFrame. This one is about air quality in Madrid (just to satisfy your curiosity; it does not matter for the purpose of moving data from one place to another). You can download it here. Make sure you have pytables installed in order to read hdf5 data.
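If pytables is not already in your environment, it can be installed the same way as the other packages (a minimal sketch; pytables is the conda package name for the HDF5 backend pandas uses here):

!conda install -c conda-forge pytables -y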
air_quality_df = pd.read_hdf('data/air_quality/air-quality-madrid/madrid.h5', key='28079008')
air_quality_df.head()
| date | BEN | CH4 | CO | EBE | NMHC | NO | NO_2 | NOx | O_3 | PM10 | PM25 | SO_2 | TCH | TOL |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 2001-07-01 01:00:00 | 30.65 | NaN | 6.91 | 42.639999 | NaN | NaN | 381.299988 | 1017.000000 | 9.010000 | 158.899994 | NaN | 47.509998 | NaN | 76.050003 |
| 2001-07-01 02:00:00 | 29.59 | NaN | 2.59 | 50.360001 | NaN | NaN | 209.500000 | 409.200012 | 23.820000 | 104.800003 | NaN | 20.950001 | NaN | 84.900002 |
| 2001-07-01 03:00:00 | 4.69 | NaN | 0.76 | 25.570000 | NaN | NaN | 116.400002 | 143.399994 | 31.059999 | 48.470001 | NaN | 11.270000 | NaN | 20.980000 |
| 2001-07-01 04:00:00 | 4.46 | NaN | 0.74 | 22.629999 | NaN | NaN | 116.199997 | 149.300003 | 23.780001 | 47.500000 | NaN | 10.100000 | NaN | 14.770000 |
| 2001-07-01 05:00:00 | 2.18 | NaN | 0.57 | 11.920000 | NaN | NaN | 100.900002 | 124.800003 | 29.530001 | 49.689999 | NaN | 7.680000 | NaN | 8.970000 |
air_quality_df.reset_index(inplace=True)
air_quality_df['date'] = air_quality_df['date'].dt.strftime('%Y-%m-%d %H:%M:%S')
We can simply load the pandas DataFrame into Spark with createDataFrame:
air_quality_sdf = spark.createDataFrame(air_quality_df)
air_quality_sdf.dtypes
Once the DataFrame is loaded into Spark (as air_quality_sdf here), it can be manipulated easily with PySpark methods:
air_quality_sdf.select('date', 'NOx').show(5)
+-------------------+------------------+
|               date|               NOx|
+-------------------+------------------+
|2001-07-01 01:00:00|            1017.0|
|2001-07-01 02:00:00|409.20001220703125|
|2001-07-01 03:00:00|143.39999389648438|
|2001-07-01 04:00:00| 149.3000030517578|
|2001-07-01 05:00:00|124.80000305175781|
+-------------------+------------------+
only showing top 5 rows
pandas -> spark -> hive
To persist a Spark DataFrame into HDFS, where it can be queried using the default Hadoop SQL engine (Hive), one straightforward strategy (not the only one) is to create a temporary view from that DataFrame:
air_quality_sdf.createOrReplaceTempView("air_quality_sdf")
Once the temporary view is created, a persistent table can be created with the Spark SQL engine using a create table as select statement. Before creating this table, I will create a new database called analytics to store it:
sql_drop_table = """
drop table if exists analytics.pandas_spark_hive
"""

sql_drop_database = """
drop database if exists analytics cascade
"""

sql_create_database = """
create database if not exists analytics
location '/user/cloudera/analytics/'
"""

sql_create_table = """
create table if not exists analytics.pandas_spark_hive
using parquet
as select to_timestamp(date) as date_parsed, *
from air_quality_sdf
"""

print("dropping database...")
result_drop_db = spark.sql(sql_drop_database)

print("creating database...")
result_create_db = spark.sql(sql_create_database)

print("dropping table...")
result_droptable = spark.sql(sql_drop_table)

print("creating table...")
result_create_table = spark.sql(sql_create_table)
dropping database...
creating database...
dropping table...
creating table...
The results can be checked with the Spark SQL engine, for example by selecting the ozone pollutant concentration over time:
spark.sql("select * from analytics.pandas_spark_hive").select("date_parsed", "O_3").show(5)
+-------------------+------------------+
|        date_parsed|               O_3|
+-------------------+------------------+
|2001-07-01 01:00:00| 9.010000228881836|
|2001-07-01 02:00:00| 23.81999969482422|
|2001-07-01 03:00:00|31.059999465942383|
|2001-07-01 04:00:00|23.780000686645508|
|2001-07-01 05:00:00|29.530000686645508|
+-------------------+------------------+
only showing top 5 rows
Apache Arrow
Apache Arrow is an in-memory columnar data format designed to support high-performance operations in big data environments (you can think of it as the in-memory equivalent of the parquet format). It is developed in C++, but its Python API is excellent, as you will see shortly. But first, install it:
!conda install pyarrow -y
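As a quick, self-contained illustration of what "in-memory columnar" means (a minimal sketch that is not part of the original walkthrough; air_quality_df is the pandas DataFrame loaded earlier):

import pyarrow as pa

arrow_table = pa.Table.from_pandas(air_quality_df)    # pandas DataFrame -> Arrow Table (columnar layout)
print(arrow_table.num_rows, arrow_table.num_columns)  # same data, now held column by column in memory
round_trip_df = arrow_table.to_pandas()               # and back to pandas when needed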
To establish native communication with HDFS I will use the interface included in pyarrow. The only requirement is to set an environment variable pointing to the location of libhdfs. Remember that we are in a Cloudera environment here; if you are using Hortonworks, you will have to find the proper location (trust me, it exists).
Establishing the connection
import pyarrow as pa
import os

os.environ['ARROW_LIBHDFS_DIR'] = '/opt/cloudera/parcels/CDH-5.14.4-1.cdh5.14.4.p0.3/lib64/'

hdfs_interface = pa.hdfs.connect(host='localhost', port=8020, user='cloudera')
Listing files in HDFS
Let's list the files that Spark persisted earlier. Remember that those files were first loaded from a local file into a pandas DataFrame and then into a Spark DataFrame. By default, Spark writes a table partitioned into a number of snappy-compressed parquet files. In the HDFS path you can identify the database name (analytics) and the table name (pandas_spark_hive):
hdfs_interface.ls('/user/cloudera/analytics/pandas_spark_hive/')
['/user/cloudera/analytics/pandas_spark_hive/_SUCCESS',
 '/user/cloudera/analytics/pandas_spark_hive/part-00000-b4371c8e-0f5c-4d20-a136-a65e56e97f16-c000.snappy.parquet',
 '/user/cloudera/analytics/pandas_spark_hive/part-00001-b4371c8e-0f5c-4d20-a136-a65e56e97f16-c000.snappy.parquet',
 '/user/cloudera/analytics/pandas_spark_hive/part-00002-b4371c8e-0f5c-4d20-a136-a65e56e97f16-c000.snappy.parquet',
 '/user/cloudera/analytics/pandas_spark_hive/part-00003-b4371c8e-0f5c-4d20-a136-a65e56e97f16-c000.snappy.parquet',
 '/user/cloudera/analytics/pandas_spark_hive/part-00004-b4371c8e-0f5c-4d20-a136-a65e56e97f16-c000.snappy.parquet',
 '/user/cloudera/analytics/pandas_spark_hive/part-00005-b4371c8e-0f5c-4d20-a136-a65e56e97f16-c000.snappy.parquet',
 '/user/cloudera/analytics/pandas_spark_hive/part-00006-b4371c8e-0f5c-4d20-a136-a65e56e97f16-c000.snappy.parquet',
 '/user/cloudera/analytics/pandas_spark_hive/part-00007-b4371c8e-0f5
Reading parquet files directly from HDFS
To read parquet files (or a folder full of files representing a table) directly from HDFS, I will use the PyArrow HDFS interface created before:
table = hdfs_interface.read_parquet('/user/cloudera/analytics/pandas_spark_hive/')
HDFS -> pandas
Once the parquet files are read by the PyArrow HDFS interface, a Table object is created. We can easily go back to pandas with the to_pandas method:
table_df = table.to_pandas()
table_df.head()
/home/cloudera/miniconda3/envs/jupyter/lib/python3.6/site-packages/pyarrow/pandas_compat.py:752: FutureWarning: .labels was deprecated in version 0.24.0. Use .codes instead.
  labels, = index.labels
| | date_parsed | date | BEN | CH4 | CO | EBE | NMHC | NO | NO_2 | NOx | O_3 | PM10 | PM25 | SO_2 | TCH | TOL |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 2001-06-30 23:00:00 | 2001-07-01 01:00:00 | 30.65 | NaN | 6.91 | 42.639999 | NaN | NaN | 381.299988 | 1017.000000 | 9.010000 | 158.899994 | NaN | 47.509998 | NaN | 76.050003 |
| 1 | 2001-07-01 00:00:00 | 2001-07-01 02:00:00 | 29.59 | NaN | 2.59 | 50.360001 | NaN | NaN | 209.500000 | 409.200012 | 23.820000 | 104.800003 | NaN | 20.950001 | NaN | 84.900002 |
| 2 | 2001-07-01 01:00:00 | 2001-07-01 03:00:00 | 4.69 | NaN | 0.76 | 25.570000 | NaN | NaN | 116.400002 | 143.399994 | 31.059999 | 48.470001 | NaN | 11.270000 | NaN | 20.980000 |
| 3 | 2001-07-01 02:00:00 | 2001-07-01 04:00:00 | 4.46 | NaN | 0.74 | 22.629999 | NaN | NaN | 116.199997 | 149.300003 | 23.780001 | 47.500000 | NaN | 10.100000 | NaN | 14.770000 |
| 4 | 2001-07-01 03:00:00 | 2001-07-01 05:00:00 | 2.18 | NaN | 0.57 | 11.920000 | NaN | NaN | 100.900002 | 124.800003 | 29.530001 | 49.689999 | NaN | 7.680000 | NaN | 8.970000 |
Uploading local files to HDFS
All kinds of HDFS operations are supported by the PyArrow HDFS interface, for example uploading a bunch of local files to HDFS:
cwd = Path('./data/')
destination_path = '/user/cloudera/analytics/data/'

for f in cwd.rglob('*.*'):
    print(f'uploading {f.name}')
    with open(str(f), 'rb') as f_upl:
        hdfs_interface.upload(destination_path + f.name, f_upl)
uploading sandp500.zip
uploading stations.csv
uploading madrid.h5
uploading diamonds_train.csv
uploading diamonds_test.csv
Let's check that the files were uploaded correctly by listing the files in the destination path:
hdfs_interface.ls(destination_path)
['/user/cloudera/analytics/data/diamonds_test.csv',
 '/user/cloudera/analytics/data/diamonds_train.csv',
 '/user/cloudera/analytics/data/madrid.h5',
 '/user/cloudera/analytics/data/sandp500.zip',
 '/user/cloudera/analytics/data/stations.csv']
Reading arbitrary files (not parquet) from HDFS (HDFS -> pandas example)
For example, a .csv file can be loaded from HDFS directly into a pandas DataFrame using the open method and the standard pandas read_csv function, which can take a buffer as input:
diamonds_train = pd.read_csv(hdfs_interface.open('/user/cloudera/analytics/data/diamonds_train.csv'))
diamonds_train.head()
| | carat | cut | color | clarity | depth | table | price | x | y | z |
|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 1.21 | Premium | J | VS2 | 62.4 | 58.0 | 4268 | 6.83 | 6.79 | 4.25 |
| 1 | 0.32 | Very Good | H | VS2 | 63.0 | 57.0 | 505 | 4.35 | 4.38 | 2.75 |
| 2 | 0.71 | Fair | G | VS1 | 65.5 | 55.0 | 2686 | 5.62 | 5.53 | 3.65 |
| 3 | 0.41 | Good | D | SI1 | 63.8 | 56.0 | 738 | 4.68 | 4.72 | 3.00 |
| 4 | 1.02 | Ideal | G | SI1 | 60.5 | 59.0 | 4882 | 6.55 | 6.51 | 3.95 |
If you are interested in all the methods and possibilities this library offers, please visit https://arrow.apache.org/docs/python/filesystems.html#hdfs-api. A few more of the usual filesystem operations are sketched below.
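Beyond ls, open and upload, the same interface exposes the usual filesystem-style helpers. This is a small sketch under the assumption that the legacy pyarrow.hdfs API documented at the link above is in use (the paths here are purely illustrative):

hdfs_interface.mkdir('/user/cloudera/analytics/tmp')                        # create a directory
hdfs_interface.exists('/user/cloudera/analytics/data/madrid.h5')            # check whether a path exists
with open('madrid_local_copy.h5', 'wb') as f_local:                         # download HDFS -> local stream
    hdfs_interface.download('/user/cloudera/analytics/data/madrid.h5', f_local)
hdfs_interface.delete('/user/cloudera/analytics/tmp', recursive=True)       # remove the directory again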
WebHDFS
Sometimes it is not possible to access the libhdfs native HDFS library (for example, when performing analysis from a computer that is not part of the cluster). In that case we can rely on WebHDFS (the REST API of the HDFS service). It is slower and not suitable for heavy big data loads, but it is an interesting option for lightweight workloads. Let's install a WebHDFS Python API:
!conda install -c conda-forge python-hdfs -y
Collecting package metadata: done
Solving environment: done

## Package Plan ##

  environment location: /home/cloudera/miniconda3/envs/jupyter

  added / updated specs:
    - python-hdfs

The following packages will be downloaded:

    package                    |            build
    ---------------------------|-----------------
    certifi-2019.3.9           |           py36_0         149 KB  conda-forge
    ------------------------------------------------------------
                                           Total:         149 KB

The following packages will be UPDATED:

  ca-certificates    pkgs/main::ca-certificates-2019.1.23-0 --> conda-forge::ca-certificates-2019.3.9-hecc5488_0

The following packages will be SUPERSEDED by a higher-priority channel:

  certifi            pkgs/main --> conda-forge
  openssl            pkgs/main::openssl-1.1.1b-h7b6447c_1 --> conda-forge::openssl-1.1.1b-h14c3975_1

Downloading and Extracting Packages
certifi-2019.3.9     | 149 KB    | ##################################### | 100%
Preparing transaction: done
Verifying transaction: done
Executing transaction: done
Establishing the WebHDFS connection
from hdfs import InsecureClient

web_hdfs_interface = InsecureClient('http://localhost:50070', user='cloudera')
List files in HDFS
Listing files is similar to using the PyArrow interface, just use the list method and an HDFS path:
web_hdfs_interface.list('/user/cloudera/analytics/data')
['diamonds_test.csv',
 'diamonds_train.csv',
 'madrid.h5',
 'sandp500.zip',
 'stations.csv']
Uploading local files to HDFS with WebHDFS
cwd = Path('./data/')
destination_path = '/user/cloudera/analytics/data_web_hdfs/'

for f in cwd.rglob('*.*'):
    print(f'uploading {f.name}')
    web_hdfs_interface.upload(destination_path + f.name, str(f), overwrite=True)
uploading sandp500.zip
uploading stations.csv
uploading madrid.h5
uploading diamonds_train.csv
uploading diamonds_test.csv
Let's check that the upload was correct:
web_hdfs_interface.list(destination_path)
['diamonds_test.csv',
 'diamonds_train.csv',
 'madrid.h5',
 'sandp500.zip',
 'stations.csv']
HDFS can also handle bigger files (with some limits). These files come from the Kaggle Microsoft Malware competition and weigh a couple of GB each:
web_hdfs_interface.upload(destination_path + 'train.parquet', '/home/cloudera/analytics/29_03_2019/notebooks/data/microsoft/train.pq', overwrite=True);
web_hdfs_interface.upload(destination_path + 'test.parquet', '/home/cloudera/analytics/29_03_2019/notebooks/data/microsoft/test.pq', overwrite=True);
Reading files from HDFS with WebHDFS (HDFS -> pandas example)
In this case it is useful to use the PyArrow parquet module and pass a buffer to create a Table object. Afterwards, a pandas DataFrame can easily be created from the Table object using the to_pandas method:
from pyarrow import parquet as pq
from io import BytesIO

with web_hdfs_interface.read(destination_path + 'train.parquet') as reader:
    microsoft_train = pq.read_table(BytesIO(reader.read())).to_pandas()

microsoft_train.head()
| | MachineIdentifier | ProductName | EngineVersion | AppVersion | AvSigVersion | IsBeta | RtpStateBitfield | IsSxsPassiveMode | DefaultBrowsersIdentifier | AVProductStatesIdentifier | … | Census_FirmwareVersionIdentifier | Census_IsSecureBootEnabled | Census_IsWIMBootEnabled | Census_IsVirtualDevice | Census_IsTouchEnabled | Census_IsPenCapable | Census_IsAlwaysOnAlwaysConnectedCapable | Wdft_IsGamer | Wdft_RegionIdentifier | HasDetections |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 0000028988387b115f69f31a3bf04f09 | win8defender | 1.1.15100.1 | 4.18.1807.18075 | 1.273.1735.0 | 0 | 7.0 | 0 | NaN | 53447.0 | … | 36144.0 | 0 | NaN | 0.0 | 0 | 0 | 0.0 | 0.0 | 10.0 | 0 |
| 1 | 000007535c3f730efa9ea0b7ef1bd645 | win8defender | 1.1.14600.4 | 4.13.17134.1 | 1.263.48.0 | 0 | 7.0 | 0 | NaN | 53447.0 | … | 57858.0 | 0 | NaN | 0.0 | 0 | 0 | 0.0 | 0.0 | 8.0 | 0 |
| 2 | 000007905a28d863f6d0d597892cd692 | win8defender | 1.1.15100.1 | 4.18.1807.18075 | 1.273.1341.0 | 0 | 7.0 | 0 | NaN | 53447.0 | … | 52682.0 | 0 | NaN | 0.0 | 0 | 0 | 0.0 | 0.0 | 3.0 | 0 |
| 3 | 00000b11598a75ea8ba1beea8459149f | win8defender | 1.1.15100.1 | 4.18.1807.18075 | 1.273.1527.0 | 0 | 7.0 | 0 | NaN | 53447.0 | … | 20050.0 | 0 | NaN | 0.0 | 0 | 0 | 0.0 | 0.0 | 3.0 | 1 |
| 4 | 000014a5f00daa18e76b81417eeb99fc | win8defender | 1.1.15100.1 | 4.18.1807.18075 | 1.273.1379.0 | 0 | 7.0 | 0 | NaN | 53447.0 | … | 19844.0 | 0 | 0.0 | 0.0 | 0 | 0 | 0.0 | 0.0 | 1.0 | 1 |
5 rows × 83 columns
Hive + Impala
Hive and Impala are two SQL engines for Hadoop. One is MapReduce-based (Hive), while Impala is a more modern, faster in-memory implementation created and open-sourced by Cloudera. Both engines can be fully leveraged from Python using one of their multiple APIs. In this case I will show you impyla, which supports both engines. Let's install it with conda, and do not forget to install thrift_sasl version 0.2.1 (yes, it must be this specific version, otherwise it will not work):
!conda install impyla thrift_sasl=0.2.1 -y
## Package Plan ##

  environment location: /home/cloudera/miniconda3/envs/jupyter

  added / updated specs:
    - impyla
    - thrift_sasl=0.2.1

The following packages will be downloaded:

    package                    |            build
    ---------------------------|-----------------
    certifi-2019.3.9           |           py36_0         155 KB
    ------------------------------------------------------------
                                           Total:         155 KB

The following packages will be SUPERSEDED by a higher-priority channel:

  ca-certificates    conda-forge::ca-certificates-2019.3.9~ --> pkgs/main::ca-certificates-2019.1.23-0
  certifi            conda-forge --> pkgs/main
  openssl            conda-forge::openssl-1.1.1b-h14c3975_1 --> pkgs/main::openssl-1.1.1b-h7b6447c_1

Downloading and Extracting Packages
certifi-2019.3.9     | 155 KB    | ##################################### | 100%
Preparing transaction: done
Verifying transaction: done
Executing transaction: done
Establishing the connection
from impala.dbapi import connect
from impala.util import as_pandas
Hive (Hive -> pandas example)
The API follows the classic ODBC standard, which you will probably be familiar with. impyla includes a utility function called as_pandas that easily parses results (a list of tuples) into a pandas DataFrame. Use it with caution: it has problems with certain types of data and is not efficient for big data workloads. Results can be fetched in both ways:
hive_conn = connect(host='localhost', port=10000, database='analytics', auth_mechanism='PLAIN')

with hive_conn.cursor() as c:
    c.execute('SELECT * FROM analytics.pandas_spark_hive LIMIT 100')
    results = c.fetchall()

with hive_conn.cursor() as c:
    c.execute('SELECT * FROM analytics.pandas_spark_hive LIMIT 100')
    results_df = as_pandas(c)
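Roughly speaking, as_pandas builds the DataFrame from the DB-API cursor for you. A sketch of the manual equivalent, reusing hive_conn from above (column names come from the standard cursor.description metadata defined by the Python DB-API):

with hive_conn.cursor() as c:
    c.execute('SELECT * FROM analytics.pandas_spark_hive LIMIT 100')
    columns = [col[0] for col in c.description]                      # DB-API column metadata
    results_df_manual = pd.DataFrame(c.fetchall(), columns=columns)  # tuples -> pandas DataFrame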
Impala (Impala -> pandas example)
Using Impala follows the same pattern as Hive, just make sure you connect to the right port; the default in this case is 21050:
impala_conn = connect(host='localhost', port=21050)

with impala_conn.cursor() as c:
    c.execute('show databases')
    result_df = as_pandas(c)
| | name | comment |
|---|---|---|
| 0 | __ibis_tmp | |
| 1 | _impala_builtins | System database for Impala builtin functions |
| 2 | analytics | |
| 3 | db1 | |
| 4 | default | Default Hive database |
| 5 | fhadoop | |
| 6 | juan | |
Ibis Framework
Another option is the Ibis Framework, a high-level API for a relatively large collection of data sources, including HDFS and Impala. It is built around the idea of using Python objects and methods to perform actions over those sources. Let's install it the same way as the other libraries:
!conda install ibis-framework -y
Let's create an HDFS interface and an Impala interface (in Ibis, Impala needs an HDFS interface object to work):
import ibis

hdfs_ibis = ibis.hdfs_connect(host='localhost', port=50070)
impala_ibis = ibis.impala.connect(host='localhost', port=21050, hdfs_client=hdfs_ibis, user='cloudera')
Once the interfaces are created, actions can be performed by calling methods, with no need to write more SQL. If you are familiar with ORMs (Object Relational Mappers), this is not exactly the same, but the underlying idea is pretty similar.
impala_ibis.invalidate_metadata()
impala_ibis.list_databases()
['__ibis_tmp',
 '_impala_builtins',
 'analytics',
 'db1',
 'default',
 'fhadoop',
 'juan']
Impala -> pandas
Ibis works natively with pandas, so there is no need for conversion. Reading a table returns a pandas DataFrame object:
table = impala_ibis.table('pandas_spark_hive', database='analytics')
table_df = table.execute()
table_df.head()
pandas -> Impala
To go from pandas to Impala, you can use Ibis to select the database through the Impala interface, set permissions (depending on your cluster setup) and use the create method, passing a pandas DataFrame object as an argument; a sketch of that step is shown below. The resulting table can then be read back:
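The creation step itself is not shown in this post; here is a hedged sketch of how it might look with the Ibis Impala interface created above (diamonds_train is the pandas DataFrame loaded earlier, and the method names assume the Ibis Impala client API of that era, so check your Ibis version):

analytics_db = impala_ibis.database('analytics')                              # handle to the target database
impala_ibis.create_table('diamonds', diamonds_train, database='analytics')    # Impala table from a pandas DataFrame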
analytics_db.table('diamonds').execute().head(5)
| | carat | cut | color | clarity | depth | table | price | x | y | z |
|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 1.21 | Premium | J | VS2 | 62.4 | 58.0 | 4268 | 6.83 | 6.79 | 4.25 |
| 1 | 0.32 | Very Good | H | VS2 | 63.0 | 57.0 | 505 | 4.35 | 4.38 | 2.75 |
| 2 | 0.71 | Fair | G | VS1 | 65.5 | 55.0 | 2686 | 5.62 | 5.53 | 3.65 |
| 3 | 0.41 | Good | D | SI1 | 63.8 | 56.0 | 738 | 4.68 | 4.72 | 3.00 |
| 4 | 1.02 | Ideal | G | SI1 | 60.5 | 59.0 | 4882 | 6.55 | 6.51 | 3.95 |