最近需要提供一個包含多個神經網絡推理的python代碼供gRPC調用,即我需要在這個主程序的基礎上封裝一個支持gRPC的服務端(server)。本教程的目的在于通過簡單的代碼,來幫助有需求的朋友使用python來構建屬于自己的gRPC服務端/客戶端。
0. 前言
最近需要用grpc調用我們的算法模塊, 對于我來講,就是需要提供一個grpc的server,供它們的go或者c++的client進行消費。那么, 在python里面如何定義一個完整的server–client,并且使其跑的非常好是個很重要的任務。
1. gRPC的官方介紹
中文官網的python接口例子直接放在grpc的github中,可能需要我們進一步的挖掘,這里,為了避免繁瑣,我將通過一個簡單的例子來說明如何將我們的任務封裝為gRPC的 服務端 (server),并開啟 客戶端 (client)對其進行調用。
在此之前,先簡單介紹一下什么是gRPC:
1.1 什么是gRPC
-
gRPC 是一個高性能、開源和通用的 RPC(遠程過程調用) 框架,面向移動和 HTTP/2 設計。目前提供 C、Java 和 Go 語言版本,分別是:grpc, grpc-java, grpc-go. 其中 C 版本支持 C, C++, Node.js, Python, Ruby, Objective-C, PHP 和 C# 支持.
gRPC 基于 HTTP/2 標準設計,帶來諸如雙向流、流控、頭部壓縮、單 TCP 連接上的多復用請求等特。這些特性使得其在移動設備上表現更好,更省電和節省空間占用。 -
在 gRPC 里客戶端(client)應用可以像調用本地對象一樣直接調用另一臺不同的機器上服務端(server)應用的方法,使得您能夠更容易地創建分布式應用和服務。與許多 RPC 系統類似,gRPC 也是基于以下理念: ① 定義一個服務 , ② 指定其能夠被遠程調用的方法(包含參數和返回類型) , ③ 在服務端實現這個接口,并運行一個 gRPC 服務器來處理客戶端調用。 在客戶端擁有一個 存根(stub) 能夠像服務端一樣的方法。
- gRPC 客戶端和服務端可以在多種環境中運行和交互 - 從 google 內部的服務器到你自己的筆記本,并且可以用任何 gRPC 支持的語言來編寫。所以,你可以很容易地用 Java 創建一個 gRPC 服務端,用 Go、Python、Ruby 來創建客戶端。此外,Google 最新 API 將有 gRPC 版本的接口,使你很容易地將 Google 的功能集成到你的應用里。
1.2 使用 protocol buffers
gRPC 默認使用 protocol buffers,這是 Google 開源的一套成熟的結構數據序列化機制(當然也可以使用其他數據格式如 JSON)。正如你將在下方例子里所看到的,你用 proto files 創建 gRPC 服務,用 protocol buffers 消息類型來定義方法參數和返回類型。你可以在 Protocol Buffers 文檔找到更多關于 Protocol Buffers 的資料。
Protocol buffers 版本
盡管 protocol buffers 對于開源用戶來說已經存在了一段時間,例子內使用的卻一種名叫 proto3 的新風格的 protocol buffers,它擁有輕量簡化的語法、一些有用的新功能,并且支持更多新語言。當前針對 Java 和 C++ 發布了 beta 版本,針對 JavaNano(即 Android Java)發布 alpha 版本,在protocol buffers Github 源碼庫里有 Ruby 支持, 在golang/protobuf Github 源碼庫里還有針對 Go 語言的生成器, 對更多語言的支持正在開發中。 你可以在 proto3 語言指南里找到更多內容, 在與當前默認版本的發布說明比較,看到兩者的主要不同點。更多關于 proto3 的文檔很快就會出現。雖然你可以使用 proto2 (當前默認的 protocol buffers 版本), 我們通常建議你在 gRPC 里使用 proto3,因為這樣你可以使用 gRPC 支持全部范圍的的語言,并且能避免 proto2 客戶端與 proto3 服務端交互時出現的兼容性問題,反之亦然。
ps: 我這里使用的都是 protobuf 作為gRPC約定的中間數據傳輸格式定義。雖然可以用json,但是我沒看到這方面的教程。
2. 基本步驟
因為官方教程有比較全面的grpc的各語言接口的安裝教程,我這里以python為例,來說明對深度學習應用,我們應該如何搭建一個基于grpc的server–client。
第1步:定義服務(實現自己的hellogdh.proto)
一個 RPC 服務通過參數和返回類型來指定可以遠程調用的方法,gRPC 通過 protocol buffers 來實現。使用 protocol buffers 接口定義語言來定義服務方法,用 protocol buffer 來定義參數和返回類型。客戶端和服務端均使用服務定義生成的接口代碼。
本文的
hellogdh.proto
定義如下[2]:
// Copyright 2015 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// 參考資料:python版gRPC快速入門一
// https://blog.csdn.net/Eric_lmy/article/details/81355322
syntax
=
"proto3"
;
option java_multiple_files
=
true
;
option java_package
=
"io.grpc.gdh.proto"
;
option java_outer_classname
=
"GdhProto"
;
option objc_class_prefix
=
"HLW"
;
package
hellogdh
;
// 定義服務.
service Greeter
{
// ① 簡單rpc.
rpc
SayHello
(
HelloRequest
)
returns
(
HelloReply
)
{
}
// ② 應答式流rpc.
rpc
LstSayHello
(
HelloRequest
)
returns
(
stream HelloReply
)
{
}
}
// 客戶端傳的消息: HelloRequest.
message HelloRequest
{
string name
=
1
;
}
// 服務端發送的消息: HelloReply.
// 不能使用int, 只能使用int32這種.
// 1, 2, 3表示順序.
message HelloReply
{
int32 num_people
=
1
;
// repeated 定義列表對應的結構.
repeated int32 point
=
2
;
}
其中,
syntax = "proto3"
表示使用proto3版本。option相關的東西我都沒咋動;
service Greeter
的意思是定義了一個叫做
Greeter
的服務。這個服務下面有兩種,關于gRPC可以定義的服務有4種,下面會詳細說明。
定義完畢之后,生成client和server的代碼(Note: 我之前以為client和server的代碼是自己寫的,實際實踐后才知道,是根據xxx.proto生成的??!根本不需要我們自己寫! )
執行這一步需要安裝好一些工具如下:
sudo apt-get install protobuf-compiler-grpc
sudo apt-get install protobuf-compiler
對我的環境(ubuntu18.04 python3.6) 執行:
protoc -I ./grpc --python_out=./grpc --grpc_out=./grpc --plugin=protoc-gen-grpc=`which grpc_python_plugin` hellogdh.proto
在對應的目錄下回生成兩個文件 hellogdh_pb2_grpc.py 和 hellogdh_pb2.py 。
其中, hellogdh_pb2.py包括:
- 定義在hellogdh.proto中的消息類(Message)
-
定義在hellogdh.proto中的服務的抽象類:
BetaHellogdhServicer
, 定義了Hellogdh 服務實現的 接口 。
BetaHellogdhStub
, 定義了可以被客戶端用來激活的Hellogdh RPC的 存根 。 -
應用到的函數:
beta_create_Hellogdh_server: 根據BetaHellogdhServicer對象創建一個gRPC服務器(server專用)。
beta_create_Hellogdh_stub: 客戶端用于創建存根stub(client專用)。
第2步:實現server部分代碼.
本部分分別以 簡單調用(單項RPC) 和 服務端流RPC 為例進行說明,實際上,gRPC允許4種類型服務方法(如果想完整的學習,還是建議看官方文檔的例子[1]):
對我而言,因為我需要把多進程的python程序的最后輸出隊列封裝給gRPC的server進程,
所以我首先需要把待處理的隊列(Queue)傳入gRPC server的進程,再在這個進程中定義好overwrite一些helloworld_pb2.py的方法。
最后,在主進程中啟動所有的神經網絡任務進程和gRPC進程,并 阻塞 (join(),join的作用是保證當前進程正常結束, 即不會因為主進程先退出而把未執行完的子進程kill掉。)。
代碼如下,參考自github
grpc/examples/python
下的
route_guide
import
sys
sys
.
path
.
append
(
".."
)
import
grpc
import
hellogdh_pb2
import
hellogdh_pb2_grpc
from
concurrent
import
futures
import
cv2
import
time
import
numpy
as
np
from
utils
.
config
import
*
import
logging
from
capture
import
queue_put
,
queue_get
,
queue_img_put
from
module
.
Worker
import
PoseWorker
import
multiprocessing
as
mp
from
multiprocessing
import
Pool
,
Queue
,
Lock
# 0.0 grpc.
def
grpc_server
(
queue
)
:
class
gdhServicer
(
hellogdh_pb2
.
BetaGreeterServicer
)
:
def
SayHello
(
self
,
request
,
context
)
:
# Note: 傳參的時候必須要顯式指定參數名稱, 而不能lynxi_pb2.HelloReply(1, [5, 10])
if
request
.
name
==
'gdh'
:
return
hellogdh_pb2
.
HelloReply
(
num_people
=
1
,
point
=
[
1
,
1
]
)
else
:
return
hellogdh_pb2
.
HelloReply
(
num_people
=
55
,
point
=
[
1
,
1
]
)
def
LstSayHello
(
self
,
request
,
context
)
:
while
request
.
name
==
'gdh'
:
data
=
queue
.
get
(
)
# 因為是服務端流式模式,所以用yield。
yield
hellogdh_pb2
.
HelloReply
(
num_people
=
data
.
num
,
point
=
[
data
.
point
[
0
]
,
data
.
point
[
1
]
]
)
# 1. 之前啟動server的方式.
# server = helloworld_gdh_pb2.beta_create_Greeter_server(gdhServicer())
# 2. 在route_guide里面學到的啟動server的方式.
server
=
grpc
.
server
(
futures
.
ThreadPoolExecutor
(
max_workers
=
10
)
)
lynxi_pb2_grpc
.
add_GreeterServicer_to_server
(
gdhServicer
(
)
,
server
)
server
.
add_insecure_port
(
'[::]:50051'
)
# 因為 start() 不會阻塞,如果運行時你的代碼沒有其它的事情可做,你可能需要循環等待。
server
.
start
(
)
try
:
while
True
:
# time.sleep(_ONE_DAY_IN_SECONDS)
time
.
sleep
(
5
)
except
KeyboardInterrupt
:
server
.
stop
(
)
# 2.1 對每個任務建立一個隊列.
pose_queue_raw
=
Queue
(
)
monitor_queue_raw
=
Queue
(
)
pose_out_queue
=
Queue
(
)
# key: 名稱, val[0]: 隊列, val[1]: 加載好的模型.
queues
=
{
'pose'
:
pose_queue_raw
,
'monitor'
:
monitor_queue_raw
}
# pose
# /
# /
# 3. 生產者-消費者 --- detect
# \
# \
# face_detect
processes
=
[
]
for
key
,
val
in
queues
.
items
(
)
:
processes
.
append
(
mp
.
Process
(
target
=
queue_put
,
args
=
(
val
,
)
)
)
if
key
==
'pose'
:
processes
.
append
(
PoseWorker
(
val
,
pose_out_queue
)
)
else
:
processes
.
append
(
mp
.
Process
(
target
=
queue_get
,
args
=
(
val
,
)
)
)
processes
.
append
(
mp
.
Process
(
target
=
grpc_server
,
args
=
(
pose_out_queue
,
)
)
)
[
process
.
start
(
)
for
process
in
processes
]
[
process
.
join
(
)
for
process
in
processes
]
這段代碼的意思是將PoseWorker處理得到的隊列
pose_out_queue
喂給gRPC server進程,并設置好根據client發來的請求來發送處理好的數據。
queue_put
和
queue_get
是將視頻的幀封裝后放入隊列A和從隊列A中讀取并顯示的函數。
import
cv2
from
multiprocessing
import
Queue
,
Process
from
PIL
import
Image
,
ImageFont
,
ImageDraw
import
cv2
def
queue_put
(
q
,
video_name
=
"/home/samuel/gaodaiheng/handup.mp4"
)
:
cap
=
cv2
.
VideoCapture
(
video_name
)
while
True
:
is_opened
,
frame
=
cap
.
read
(
)
q
.
put
(
frame
)
if
is_opened
else
None
def
queue_get
(
q
,
window_name
=
'image'
)
:
cv2
.
namedWindow
(
window_name
,
flags
=
cv2
.
WINDOW_NORMAL
)
while
True
:
frame
=
q
.
get
(
)
cv2
.
imshow
(
window_name
,
frame
)
cv2
.
waitKey
(
1
)
需要額外注意的是,PoseWorker是繼承
multiprocessing.Process
類的進程,其大體定義如下:
from
multiprocessing
import
Queue
,
Process
class
PoseWorker
(
Process
)
:
"""
Pose estimation姿態估計.
"""
def
__init__
(
self
,
queue
,
out_queue
)
:
Process
.
__init__
(
self
,
name
=
'PoseProcessor'
)
# 輸入隊列和輸出隊列.
self
.
in_queue
=
queue
self
.
out_queue
=
out_queue
def
run
(
self
)
:
#set enviornment
os
.
environ
[
"CUDA_VISIBLE_DEVICES"
]
=
"0"
#load models
import
tensorflow
as
tf
.
.
.
model
=
load_model
(
xxx
)
.
.
.
while
True
:
# 從入的隊列中消費數據.
frame
=
self
.
in_queue
.
get
(
)
# 喂入模型推理得到結果.
result
=
model
.
inference
(
frame
)
# 將結果放回到生產者中.
self
.
out_queue
.
put
(
result
)
第3步:實現server部分代碼.
和第2步類似,代碼如下,參考自github
grpc/examples/python
[3]下的
route_guide
# coding: UTF-8
"""
@author: samuel ko
"""
import
os
import
grpc
import
hellogdh_pb2
as
helloworld_gdh_pb2
import
hellogdh_pb2_grpc
as
helloworld_gdh_pb2_grpc
import
time
_ONE_DAY_IN_SECONDS
=
60
*
60
*
24
# 1. 為了能調用服務的方法,我們得先創建一個 存根。
# 我們使用 .proto 中生成的 route_guide_pb2 模塊的函數beta_create_RouteGuide_stub。
def
run
(
)
:
with
grpc
.
insecure_channel
(
'localhost:50051'
)
as
channel
:
# 1) 存根方式1.
stub
=
helloworld_gdh_pb2
.
GreeterStub
(
channel
)
# 2) 存根方式2.
# stub = helloworld_gdh_pb2_grpc.GreeterStub(channel)
print
(
"-------------- ① 簡單RPC --------------"
)
# response = stub.SayHello(helloworld_gdh_pb2.HelloRequest(name='gdh'))
features
=
stub
.
SayHello
(
helloworld_gdh_pb2
.
HelloRequest
(
name
=
'gdh'
)
)
print
(
features
)
print
(
"-------------- ② 服務端流式RPC --------------"
)
features
=
stub
.
LstSayHello
(
helloworld_gdh_pb2
.
HelloRequest
(
name
=
'gdh'
)
)
for
feature
in
features
:
print
(
"哈哈哈 %s at %s, %s"
%
(
feature
.
num_people
,
feature
.
point
[
0
]
,
feature
.
point
[
1
]
)
)
if
__name__
==
"__main__"
:
run
(
)
最后,就會打印出符合我們服務端設定的數據結構…
補充知識:protobuf 支持的python數據結構.
在proto的Message定義中, 我們支持python的string等類型, 在proto中需要顯式標明, 我推測是 由于gRPC是支持多種語言接口的,有些語言是 強類型 的(C/C++, Go),所以務必需要顯式標明數據類型, 避免帶來不必要的麻煩 :
message HelloRequest {
string name = 1;
}
其中, 1, 2, 3表示的是參數的順序. 我們支持的數據類型如下:
- ① string
- ② float
- ③ int32 / uint32 (不支持int16和int8)
- ④ bool
-
⑤
repeated
int以及我們自定義的Message.
這里需要特別強調, repeated 表示不定長的數組, 里面可以放built-in的類型,或者自己額外封裝的message. 很靈活. 對應python的list.
message BoxInfos
{
message BoxInfo
{
uint32 x0
=
1
;
uint32 y0
=
2
;
uint32 x1
=
3
;
uint32 y1
=
4
;
}
repeated BoxInfo boxInfos
=
1
;
}
- ⑥ bytes 字節流, 可以用于傳遞圖片. 不過一般性在gRPC中, 每條消息的大小都不大(1MB左右?) 所以一般性都是傳圖片的絕對路徑?
-
⑦
map
字典,對應python的dict, 不過需要顯式指定key和value的類型.
總結
截至目前,一個封裝多進程神經網絡算法的python版 gRPC server-client就已經圓滿完成, 因為我也是剛接觸,可能有理解上的偏差,懇請各位指正, 非常感謝~
參考資料
[1] gRPC–python中文官網
[2] python版gRPC快速入門一
[3] grpc/examples/python
更多文章、技術交流、商務合作、聯系博主
微信掃碼或搜索:z360901061

微信掃一掃加我為好友
QQ號聯系: 360901061
您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點擊下面給點支持吧,站長非常感激您!手機微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點擊微信右上角掃一掃功能,選擇支付二維碼完成支付。
【本文對您有幫助就好】元
