欧美三区_成人在线免费观看视频_欧美极品少妇xxxxⅹ免费视频_a级毛片免费播放_鲁一鲁中文字幕久久_亚洲一级特黄

基于python的Paxos算法實現

系統 1615 0

理解一個算法最快,最深刻的做法,我覺著可能是自己手動實現,雖然項目中不用自己實現,有已經封裝好的算法庫,供我們調用,我覺著還是有必要自己親自實踐一下。

這里首先說明一下,python這種動態語言,對不熟悉的人可能看著比較別扭,不像java那樣參數類型是固定的,所以看著會有些蛋疼。這里環境用的是python2.7。

基于python的Paxos算法實現_第1張圖片

            
class Message:
  # command
  MSG_ACCEPTOR_AGREE = 0 # 追隨者約定
  MSG_ACCEPTOR_ACCEPT = 1 # 追隨者接受
  MSG_ACCEPTOR_REJECT = 2 # 追隨者拒絕-網絡不通
  MSG_ACCEPTOR_UNACCEPT = 3 # 追隨者網絡通-不同意
  MSG_ACCEPT = 4 # 接受
  MSG_PROPOSE = 5 # 提議
  MSG_EXT_PROPOSE = 6 # 額外提議
  MSG_HEARTBEAT = 7 # 心跳,每隔一段時間同步消息
  def __init__(self, command=None):
    self.command = command
  # 把收到的消息原原路返回,作為應答消息
  def copyAsReply(self, message):
    # 提議ID #當前的ID #發給誰 #誰發的
    self.proposalID, self.instanceID, self.to, self.source = message.proposalID, message.instanceID, message.source, message.to
    self.value = message.value # 發的信息
          

然后是利用socket,線程和隊列實現的消息處理器:

            
# 基于socket傳遞消息,封裝網絡傳遞消息
import threading
import pickle
import socket
import queue
class MessagePump(threading.Thread):
  # 收取消息線程
  class MPHelper(threading.Thread):
    #
    def __init__(self, owner):
      self.owner = owner
      threading.Thread.__init__(self)
    def run(self):
      while not self.owner.abort: # 只要所有者線程沒有結束,一直接受消息
        try:
          (bytes, addr) = self.owner.socket.recvfrom(2048) # 收取消息
          msg = pickle.loads(bytes) # 讀取二進制數據轉化為消息
          msg.source = addr[1]
          self.owner.queue.put(msg) # 隊列存入消息
        except Exception as e:
          pass

  def __init__(self, owner, port, timeout=2):
    threading.Thread.__init__(self)
    self.owner = owner
    self.abort = False
    self.timeout = 2
    self.port = port
    self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # UDP通信
    self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 200000) # 通信參數
    self.socket.bind(("localhost", port)) # 通信地址,ip,端口
    self.socket.settimeout(timeout) # 超時設置
    self.queue = queue.Queue() # 隊列
    self.helper = MessagePump.MPHelper(self) # 接收消息

  # 運行主線程
  def run(self):
    self.helper.start() # 開啟收消息的線程
    while not self.abort:
      message = self.waitForMessage() # 阻塞等待
      self.owner.recvMessage(message) # 收取消息

  # 等待消息
  def waitForMessage(self):
    try:
      msg = self.queue.get(True, 3) # 抓取數據,最多等待3s
      return msg
    except:
      return None

  # 發送消息
  def sendMessage(self, message):
    bytes = pickle.dumps(message) # 轉化為二進制
    address = ("localhost", message.to) # 地址ip,端口(ip,port)
    self.socket.sendto(bytes, address)
    return True
  #是否停止收取消息
  def doAbort(self):
    self.abort = True
          

再來一個消息處理器, 模擬消息的傳遞,延遲,丟包,其實這個類沒什么卵用,這個是為模擬測試準備的

            
from MessagePump import MessagePump
import random
class AdversarialMessagePump(MessagePump): # 類的繼承
  # 對抗消息傳輸,延遲消息并任意順序傳遞,模擬網絡的延遲,消息傳送并不是順序
  def __init__(self, owner, port, timeout=2):
    MessagePump.__init__(self, owner, port, timeout) # 初始化父類
    self.messages = set() # 集合避免重復

  def waitForMessage(self):
    try:
      msg = self.queue.get(True, 0.1) # 從隊列抓取數據
      self.messages.add(msg) # 添加消息
    except Exception as e: # 處理異常
      pass
      # print(e)
    if len(self.messages) > 0 and random.random() < 0.95: # Arbitrary!
      msg = random.choice(list(self.messages)) # 隨機抓取消息發送
      self.messages.remove(msg) # 刪除消息
    else:
      msg = None
    return msg
          

再來一個是記錄類

            
# InstanceRecord本地記錄類,主要記錄追隨者、領導者最高編號的協議
from PaxosLeaderProtocol import PaxosLeaderProtocol
class InstanceRecord:
  def __init__(self):
    self.protocols = {}
    self.highestID = (-1, -1) # (port,count)
    self.value = None

  def addProtocol(self, protocol):
    self.protocols[protocol.proposalID] = protocol
    #
    if protocol.proposalID[1] > self.highestID[1] or (
        protocol.proposalID[1] == self.highestID[1] and protocol.proposalID[0] > self.highestID[0]):
      self.highestID = protocol.proposalID # 取得編號最大的協議

  def getProtocol(self, protocolID):
    return self.protocols[protocolID]

  def cleanProtocols(self):
    keys = self.protocols.keys()
    for k in keys:
      protocol = self.protocols[k]
      if protocol.state == PaxosLeaderProtocol.STATE_ACCEPTED:
        print("刪除協議")
        del self.protocols[k]
          

下面就是Acceptor的實現:

            
# 追隨者
from MessagePump import MessagePump
from Message import Message
from InstanceRecord import InstanceRecord
from PaxosAcceptorProtocol import PaxosAcceptorProtocol
class PaxosAcceptor:
  def __init__(self, port, leaders):
    self.port = port
    self.leaders = leaders
    self.instances = {} # 接口列表
    self.msgPump = MessagePump(self, self.port) # 消息傳遞器
    self.failed = False

  # 開始消息傳送
  def start(self):
    self.msgPump.start()

  # 停止
  def stop(self):
    self.msgPump.doAbort()

  # 失敗
  def fail(self):
    self.failed = True

  def recover(self):
    self.failed = False

  # 發送消息
  def sendMessage(self, message):
    self.msgPump.sendMessage(message)

  # 收消息,只收取為提議的消息
  def recvMessage(self, message):
    if message == None:
      return
    if self.failed: # 失敗狀態不收取消息
      return

    if message.command == Message.MSG_PROPOSE: # 判斷消息是否為提議
      if message.instanceID not in self.instances:
        record = InstanceRecord() # 記錄器
        self.instances[message.instanceID] = record
      protocol = PaxosAcceptorProtocol(self) # 創建協議
      protocol.recvProposal(message) # 收取消息
      self.instances[message.instanceID].addProtocol(protocol)
    else:
      self.instances[message.instanceID].getProtocol(message.proposalID).doTransition(message)

  # 通知客戶端,
  def notifyClient(self, protocol, message):
    if protocol.state == PaxosAcceptorProtocol.STATE_PROPOSAL_ACCEPTED: # 提議被接受,通知
      self.instances[protocol.instanceID].value = message.value # 儲存信息
      print(u"協議被客戶端接受 %s" % message.value)

  # 獲取最高同意的建議
  def getHighestAgreedProposal(self, instance):
    return self.instances[instance].highestID # (port,count)

  # 獲取接口數據
  def getInstanceValue(self, instance):
    return self.instances[instance].value
          

那再看下AcceptorProtocol的實現:

            
from Message import Message
class PaxosAcceptorProtocol(object):
  # State variables
  STATE_UNDEFINED = -1 # 協議沒有定義的情況0
  STATE_PROPOSAL_RECEIVED = 0 # 收到消息
  STATE_PROPOSAL_REJECTED = 1 # 拒絕鏈接
  STATE_PROPOSAL_AGREED = 2 # 同意鏈接
  STATE_PROPOSAL_ACCEPTED = 3 # 同意請求
  STATE_PROPOSAL_UNACCEPTED = 4 # 拒絕請求

  def __init__(self, client):
    self.client = client
    self.state = PaxosAcceptorProtocol.STATE_UNDEFINED

  # 收取,只處理協議類型的消息
  def recvProposal(self, message):

    if message.command == Message.MSG_PROPOSE: # 協議
      self.proposalID = message.proposalID
      self.instanceID = message.instanceID
      (port, count) = self.client.getHighestAgreedProposal(message.instanceID) # 端口,協議內容的最高編號
      # 檢測編號處理消息協議
      # 判斷協議是否最高 
      if count < self.proposalID[1] or (count == self.proposalID[1] and port < self.proposalID[0]):
        self.state = PaxosAcceptorProtocol.STATE_PROPOSAL_AGREED # 協議同意
        print("同意協議:%s, %s " % (message.instanceID, message.value))
        value = self.client.getInstanceValue(message.instanceID)
        msg = Message(Message.MSG_ACCEPTOR_AGREE) # 同意協議
        msg.copyAsReply(message)
        msg.value = value
        msg.sequence = (port, count)
        self.client.sendMessage(msg) # 發送消息
      else: # 不再接受比最高協議小的提議
        self.state = PaxosAcceptorProtocol.STATE_PROPOSAL_REJECTED
      return self.proposalID
    else:
      # 錯誤重試
      pass
  # 過度
  def doTransition(self, message): # 如果當前協議狀態是接受連接,消息類型是接受
    if self.state == PaxosAcceptorProtocol.STATE_PROPOSAL_AGREED and message.command == Message.MSG_ACCEPT:
      self.state = PaxosAcceptorProtocol.STATE_PROPOSAL_ACCEPTED # 接收協議
      msg = Message(Message.MSG_ACCEPTOR_ACCEPT) # 創造消息
      msg.copyAsReply(message) # 拷貝并回復
      for l in self.client.leaders:
        msg.to = l
        self.client.sendMessage(msg) # 給領導發送消息
      self.notifyClient(message) # 通知自己
      return True
    raise Exception("并非預期的狀態和命令")

  # 通知 自己客戶端
  def notifyClient(self, message):
    self.client.notifyClient(self, message)
          

接著看下Leader和LeaderProtocol實現:

            
# 領導者
import threading
import Queue
import time
from Message import Message
from MessagePump import MessagePump
from InstanceRecord import InstanceRecord
from PaxosLeaderProtocol import PaxosLeaderProtocol
class PaxosLeader:
  # 定時監聽
  class HeartbeatListener(threading.Thread):
    def __init__(self, leader):
      self.leader = leader
      self.queue = Queue.Queue() # 消息隊列
      self.abort = False
      threading.Thread.__init__(self)

    def newHB(self, message):
      self.queue.put(message)

    def doAbort(self):
      self.abort = True

    def run(self): # 讀取消息
      elapsed = 0
      while not self.abort:
        s = time.time()
        try:
          hb = self.queue.get(True, 2)
          # 設定規則,誰的端口號比較高,誰就是領導
          if hb.source > self.leader.port:
            self.leader.setPrimary(False)
        except:
          self.leader.setPrimary(True)

  # 定時發送
  class HeartbeatSender(threading.Thread):
    def __init__(self, leader):
      threading.Thread.__init__(self)
      self.leader = leader
      self.abort = False
    def doAbort(self):
      self.abort = True
    def run(self):
      while not self.abort:
        time.sleep(1)
        if self.leader.isPrimary:
          msg = Message(Message.MSG_HEARTBEAT)
          msg.source = self.leader.port
          for leader in self.leader.leaders:
            msg.to = leader
            self.leader.sendMessage(msg)

  def __init__(self, port, leaders=None, acceptors=None):
    self.port = port
    if leaders == None:
      self.leaders = []
    else:
      self.leaders = leaders
    if acceptors == None:
      self.acceptors = []
    else:
      self.acceptors = acceptors
    self.group = self.leaders + self.acceptors # 集合合并
    self.isPrimary = False # 自身是不是領導
    self.proposalCount = 0
    self.msgPump = MessagePump(self, port) # 消息傳送器
    self.instances = {}
    self.hbListener = PaxosLeader.HeartbeatListener(self) # 監聽
    self.hbSender = PaxosLeader.HeartbeatSender(self) # 發送心跳
    self.highestInstance = -1 # 協議狀態
    self.stoped = True # 是否正在運行
    self.lasttime = time.time() # 最后一次時間

  def sendMessage(self, message):
    self.msgPump.sendMessage(message)

  def start(self):
    self.hbSender.start()
    self.hbListener.start()
    self.msgPump.start()
    self.stoped = False

  def stop(self):
    self.hbSender.doAbort()
    self.hbListener.doAbort()
    self.msgPump.doAbort()
    self.stoped = True

  def setPrimary(self, primary): # 設置領導者
    if self.isPrimary != primary:
      # Only print if something's changed
      if primary:
        print(u"我是leader%s" % self.port)
      else:
        print(u"我不是leader%s" % self.port)
    self.isPrimary = primary

  # 獲取所有的領導下面的追隨者
  def getGroup(self):
    return self.group

  def getLeaders(self):
    return self.leaders

  def getAcceptors(self):
    return self.acceptors

  # 必須獲得1/2以上的人支持
  def getQuorumSize(self):
    return (len(self.getAcceptors()) / 2) + 1

  def getInstanceValue(self, instanceID):
    if instanceID in self.instances:
      return self.instances[instanceID].value
    return None

  def getHistory(self): # 歷史記錄
    return [self.getInstanceValue(i) for i in range(1, self.highestInstance + 1)]

  # 抓取同意的數量
  def getNumAccpted(self):
    return len([v for v in self.getHistory() if v != None])

  # 抓取空白時間處理下事務
  def findAndFillGaps(self):
    for i in range(1, self.highestInstance):
      if self.getInstanceValue(i) == None:
        print("填充空白", i)
        self.newProposal(0, i)
    self.lasttime = time.time()

  # 采集無用信息
  def garbageCollect(self):
    for i in self.instances:
      self.instances[i].cleanProtocols()

  # 通知領導
  def recvMessage(self, message):
    if self.stoped:
      return
    if message == None:
      if self.isPrimary and time.time() - self.lasttime > 15.0:
        self.findAndFillGaps()
        self.garbageCollect()
      return
    #處理心跳信息
    if message.command == Message.MSG_HEARTBEAT:
      self.hbListener.newHB(message)
      return True
    #處理額外的提議
    if message.command == Message.MSG_EXT_PROPOSE:
      print("額外的協議", self.port, self.highestInstance)
      if self.isPrimary:
        self.newProposal(message.value)
      return True

    if self.isPrimary and message.command != Message.MSG_ACCEPTOR_ACCEPT:
      self.instances[message.instanceID].getProtocol(message.proposalID).doTransition(message)

    if message.command == Message.MSG_ACCEPTOR_ACCEPT:
      if message.instanceID not in self.instances:
        self.instances[message.instanceID] = InstanceRecord()
      record = self.instances[message.instanceID]
      if message.proposalID not in record.protocols:#創建協議
        protocol = PaxosLeaderProtocol(self)
        protocol.state = PaxosLeaderProtocol.STATE_AGREED
        protocol.proposalID = message.proposalID
        protocol.instanceID = message.instanceID
        protocol.value = message.value
        record.addProtocol(protocol)
      else:
        protocol = record.getProtocol(message.proposalID)

      protocol.doTransition(message)

    return True
  # 新建提議
  def newProposal(self, value, instance=None):
    protocol = PaxosLeaderProtocol(self)
    if instance == None: # 創建協議標號
      self.highestInstance += 1
      instanceID = self.highestInstance
    else:
      instanceID = instance
    self.proposalCount += 1
    id = (self.port, self.proposalCount)
    if instanceID in self.instances:
      record = self.instances[instanceID]
    else:
      record = InstanceRecord()
      self.instances[instanceID] = record
    protocol.propose(value, id, instanceID)
    record.addProtocol(protocol)

  def notifyLeader(self, protocol, message):
    if protocol.state == PaxosLeaderProtocol.STATE_ACCEPTED:
      print("協議接口%s被%s接受" % (message.instanceID, message.value))
      self.instances[message.instanceID].accepted = True
      self.instances[message.instanceID].value = message.value
      self.highestInstance = max(message.instanceID, self.highestInstance)
      return
    if protocol.state == PaxosLeaderProtocol.STATE_REJECTED: # 重新嘗試
      self.proposalCount = max(self.proposalCount, message.highestPID[1])
      self.newProposal(message.value)
      return True
    if protocol.state == PaxosLeaderProtocol.STATE_UNACCEPTED:
      pass
          

LeaderProtocol實現:

            
from Message import Message
class PaxosLeaderProtocol(object):
  STATE_UNDEFINED = -1 # 協議沒有定義的情況0
  STATE_PROPOSED = 0 # 協議消息
  STATE_REJECTED = 1 # 拒絕鏈接
  STATE_AGREED = 2 # 同意鏈接
  STATE_ACCEPTED = 3 # 同意請求
  STATE_UNACCEPTED = 4 # 拒絕請求
  def __init__(self, leader):
    self.leader = leader
    self.state = PaxosLeaderProtocol.STATE_UNDEFINED
    self.proposalID = (-1, -1)
    self.agreecount, self.acceptcount = (0, 0)
    self.rejectcount, self.unacceptcount = (0, 0)
    self.instanceID = -1
    self.highestseen = (0, 0)
  # 提議
  def propose(self, value, pID, instanceID):
    self.proposalID = pID
    self.value = value
    self.instanceID = instanceID
    message = Message(Message.MSG_PROPOSE)
    message.proposalID = pID
    message.instanceID = instanceID
    message.value = value
    for server in self.leader.getAcceptors():
      message.to = server
      self.leader.sendMessage(message)
    self.state = PaxosLeaderProtocol.STATE_PROPOSED

    return self.proposalID

  # ?^度
  def doTransition(self, message):
    # 根????B?\行?f?h
    if self.state == PaxosLeaderProtocol.STATE_PROPOSED:
      if message.command == Message.MSG_ACCEPTOR_AGREE:
        self.agreecount += 1
        if self.agreecount >= self.leader.getQuorumSize(): # 選舉
          print(u"達成協議的法定人數,最后的價值回答是:%s" % message.value)
          if message.value != None:
            if message.sequence[0] > self.highestseen[0] or (
                message.sequence[0] == self.highestseen[0] and message.sequence[1] > self.highestseen[
              1]):
              self.value = message.value
              self.highestseen = message.sequence

            self.state = PaxosLeaderProtocol.STATE_AGREED # 同意更新
            # 發送同意消息
            msg = Message(Message.MSG_ACCEPT)
            msg.copyAsReply(message)
            msg.value = self.value
            msg.leaderID = msg.to
            for server in self.leader.getAcceptors():
              msg.to = server
              self.leader.sendMessage(msg)
            self.leader.notifyLeader(self, message)
          return True

        if message.command == Message.MSG_ACCEPTOR_REJECT:
          self.rejectcount += 1
          if self.rejectcount >= self.leader.getQuorumSize():
            self.state = PaxosLeaderProtocol.STATE_REJECTED
            self.leader.notifyLeader(self, message)
          return True

    if self.state == PaxosLeaderProtocol.STATE_AGREED:
      if message.command == Message.MSG_ACCEPTOR_ACCEPT: # 同意協議
        self.acceptcount += 1
        if self.acceptcount >= self.leader.getQuorumSize():
          self.state = PaxosLeaderProtocol.STATE_ACCEPTED # 接受
          self.leader.notifyLeader(self, message)
      if message.command == Message.MSG_ACCEPTOR_UNACCEPT:
        self.unacceptcount += 1
        if self.unacceptcount >= self.leader.getQuorumSize():
          self.state = PaxosLeaderProtocol.STATE_UNACCEPTED
          self.leader.notifyLeader(self, message)
          

測試模塊:

            
import socket, pickle, time
from Message import Message
from PaxosAcceptor import PaxosAcceptor
from PaxosLeader import PaxosLeader

if __name__ == "__main__":
  # 設定5個客戶端
  numclients = 5
  clients = [PaxosAcceptor(port, [54321, 54322]) for port in range(64320, 64320 + numclients)]
  # 兩個領導者
  leader1 = PaxosLeader(54321, [54322], [c.port for c in clients])
  leader2 = PaxosLeader(54322, [54321], [c.port for c in clients])

  # 開啟領導者與追隨者
  leader1.start()
  leader1.setPrimary(True)
  leader2.setPrimary(True)
  leader2.start()
  for c in clients:
    c.start()

  # 破壞,客戶端不鏈接
  clients[0].fail()
  clients[1].fail()

  # 通信
  s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # udp協議
  start = time.time()
  for i in range(1000):
    m = Message(Message.MSG_EXT_PROPOSE) # 消息
    m.value = 0 + i # 消息參數
    m.to = 54322 # 設置傳遞的端口
    bytes = pickle.dumps(m) # 提取的二進制數據
    s.sendto(bytes, ("localhost", m.to)) # 發送消息

  while leader2.getNumAccpted() < 999:
    print("休眠的這一秒 %d " % leader2.getNumAccpted())
    time.sleep(1)
  print(u"休眠10秒")
  time.sleep(10)
  print(u"停止leaders")
  leader1.stop()
  leader2.stop()
  print(u"停止客戶端")
  for c in clients:
    c.stop()
  print(u"leader1歷史紀錄")
  print(leader1.getHistory())
  print(u"leader2歷史紀錄")
  print(leader2.getHistory())
  end = time.time()
  print(u"一共用了%f秒" % (end - start))
          

代碼確實比較長,看起來有些困難,最好還是在pycharm上看這個邏輯,可以快速定位參數指向,如果有不對的地方歡迎指正

以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支持腳本之家。


更多文章、技術交流、商務合作、聯系博主

微信掃碼或搜索:z360901061

微信掃一掃加我為好友

QQ號聯系: 360901061

您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點擊下面給點支持吧,站長非常感激您!手機微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點擊微信右上角掃一掃功能,選擇支付二維碼完成支付。

【本文對您有幫助就好】

您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描上面二維碼支持博主2元、5元、10元、自定義金額等您想捐的金額吧,站長會非常 感謝您的哦!!!

發表我的評論
最新評論 總共0條評論
主站蜘蛛池模板: 久久亚洲精品国产精品黑人 | 日本特黄aa一大片 | 精品国产一区二区三区香蕉沈先生 | 久久影片| 欧美一区二区免费 | 天天综合国产 | 97国产精品视频人人做人人爱 | 久久久久国产成人精品亚洲午夜 | 亚洲精品久久久久一区二区三区 | 国产午夜视频 | 人人看人人舔 | 欧美aav | 国产免费一区视频 | 亚洲成人福利在线 | 日韩你懂得 | 久久制服丝袜 | 久久久久久一区 | 嗯啊你轻点好深啊hh在线播放 | 亚洲乱码在线卡一卡二卡新区 | 午夜久久久久久久久久一区二区 | 日韩一二三区视频 | 极品在线 | 久久色播 | 国产1区在线观看 | 韩国一级免费视频 | 天天操一操 | 国产一级成人毛片 | 欧美成人免费高清二区三区 | 偷拍自拍亚洲 | 色婷婷国产精品欧美毛片 | 九九精品视频一区在线 | 久草青青草 | 日韩一区二区免费看 | 成人在线小视频 | 国产成人禁片免费观看 | 国产精品免费入口视频 | 伊人情涩网| 久久精品视频在线观看榴莲视频 | 久爱视频www在线播放 | 久久久免费的精品 | 欧洲一区二区 |