python

超轻量级php框架startmvc

基于python的Paxos算法实现

更新时间:2020-07-11 09:06 作者:startmvc
理解一个算法最快,最深刻的做法,我觉着可能是自己手动实现,虽然项目中不用自己实现

理解一个算法最快,最深刻的做法,我觉着可能是自己手动实现,虽然项目中不用自己实现,有已经封装好的算法库,供我们调用,我觉着还是有必要自己亲自实践一下。

这里首先说明一下,python这种动态语言,对不熟悉的人可能看着比较别扭,不像java那样参数类型是固定的,所以看着会有些蛋疼。这里环境用的是python2.7。


class Message:
 # command
 MSG_ACCEPTOR_AGREE = 0 # 追随者约定
 MSG_ACCEPTOR_ACCEPT = 1 # 追随者接受
 MSG_ACCEPTOR_REJECT = 2 # 追随者拒绝-网络不通
 MSG_ACCEPTOR_UNACCEPT = 3 # 追随者网络通-不同意
 MSG_ACCEPT = 4 # 接受
 MSG_PROPOSE = 5 # 提议
 MSG_EXT_PROPOSE = 6 # 额外提议
 MSG_HEARTBEAT = 7 # 心跳,每隔一段时间同步消息
 def __init__(self, command=None):
 self.command = command
 # 把收到的消息原原路返回,作为应答消息
 def copyAsReply(self, message):
 # 提议ID #当前的ID #发给谁 #谁发的
 self.proposalID, self.instanceID, self.to, self.source = message.proposalID, message.instanceID, message.source, message.to
 self.value = message.value # 发的信息

然后是利用socket,线程和队列实现的消息处理器:


# 基于socket传递消息,封装网络传递消息
import threading
import pickle
import socket
import queue
class MessagePump(threading.Thread):
 # 收取消息线程
 class MPHelper(threading.Thread):
 #
 def __init__(self, owner):
 self.owner = owner
 threading.Thread.__init__(self)
 def run(self):
 while not self.owner.abort: # 只要所有者线程没有结束,一直接受消息
 try:
 (bytes, addr) = self.owner.socket.recvfrom(2048) # 收取消息
 msg = pickle.loads(bytes) # 读取二进制数据转化为消息
 msg.source = addr[1]
 self.owner.queue.put(msg) # 队列存入消息
 except Exception as e:
 pass

 def __init__(self, owner, port, timeout=2):
 threading.Thread.__init__(self)
 self.owner = owner
 self.abort = False
 self.timeout = 2
 self.port = port
 self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # UDP通信
 self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 200000) # 通信参数
 self.socket.bind(("localhost", port)) # 通信地址,ip,端口
 self.socket.settimeout(timeout) # 超时设置
 self.queue = queue.Queue() # 队列
 self.helper = MessagePump.MPHelper(self) # 接收消息

 # 运行主线程
 def run(self):
 self.helper.start() # 开启收消息的线程
 while not self.abort:
 message = self.waitForMessage() # 阻塞等待
 self.owner.recvMessage(message) # 收取消息

 # 等待消息
 def waitForMessage(self):
 try:
 msg = self.queue.get(True, 3) # 抓取数据,最多等待3s
 return msg
 except:
 return None

 # 发送消息
 def sendMessage(self, message):
 bytes = pickle.dumps(message) # 转化为二进制
 address = ("localhost", message.to) # 地址ip,端口(ip,port)
 self.socket.sendto(bytes, address)
 return True
 #是否停止收取消息
 def doAbort(self):
 self.abort = True

再来一个消息处理器,模拟消息的传递,延迟,丢包,其实这个类没什么卵用,这个是为模拟测试准备的


from MessagePump import MessagePump
import random
class AdversarialMessagePump(MessagePump): # 类的继承
 # 对抗消息传输,延迟消息并任意顺序传递,模拟网络的延迟,消息传送并不是顺序
 def __init__(self, owner, port, timeout=2):
 MessagePump.__init__(self, owner, port, timeout) # 初始化父类
 self.messages = set() # 集合避免重复

 def waitForMessage(self):
 try:
 msg = self.queue.get(True, 0.1) # 从队列抓取数据
 self.messages.add(msg) # 添加消息
 except Exception as e: # 处理异常
 pass
 # print(e)
 if len(self.messages) > 0 and random.random() < 0.95: # Arbitrary!
 msg = random.choice(list(self.messages)) # 随机抓取消息发送
 self.messages.remove(msg) # 删除消息
 else:
 msg = None
 return msg

再来一个是记录类


# InstanceRecord本地记录类,主要记录追随者、领导者最高编号的协议
from PaxosLeaderProtocol import PaxosLeaderProtocol
class InstanceRecord:
 def __init__(self):
 self.protocols = {}
 self.highestID = (-1, -1) # (port,count)
 self.value = None

 def addProtocol(self, protocol):
 self.protocols[protocol.proposalID] = protocol
 #
 if protocol.proposalID[1] > self.highestID[1] or (
 protocol.proposalID[1] == self.highestID[1] and protocol.proposalID[0] > self.highestID[0]):
 self.highestID = protocol.proposalID # 取得编号最大的协议

 def getProtocol(self, protocolID):
 return self.protocols[protocolID]

 def cleanProtocols(self):
 keys = self.protocols.keys()
 for k in keys:
 protocol = self.protocols[k]
 if protocol.state == PaxosLeaderProtocol.STATE_ACCEPTED:
 print("删除协议")
 del self.protocols[k]

下面就是Acceptor的实现:


# 追随者
from MessagePump import MessagePump
from Message import Message
from InstanceRecord import InstanceRecord
from PaxosAcceptorProtocol import PaxosAcceptorProtocol
class PaxosAcceptor:
 def __init__(self, port, leaders):
 self.port = port
 self.leaders = leaders
 self.instances = {} # 接口列表
 self.msgPump = MessagePump(self, self.port) # 消息传递器
 self.failed = False

 # 开始消息传送
 def start(self):
 self.msgPump.start()

 # 停止
 def stop(self):
 self.msgPump.doAbort()

 # 失败
 def fail(self):
 self.failed = True

 def recover(self):
 self.failed = False

 # 发送消息
 def sendMessage(self, message):
 self.msgPump.sendMessage(message)

 # 收消息,只收取为提议的消息
 def recvMessage(self, message):
 if message == None:
 return
 if self.failed: # 失败状态不收取消息
 return

 if message.command == Message.MSG_PROPOSE: # 判断消息是否为提议
 if message.instanceID not in self.instances:
 record = InstanceRecord() # 记录器
 self.instances[message.instanceID] = record
 protocol = PaxosAcceptorProtocol(self) # 创建协议
 protocol.recvProposal(message) # 收取消息
 self.instances[message.instanceID].addProtocol(protocol)
 else:
 self.instances[message.instanceID].getProtocol(message.proposalID).doTransition(message)

 # 通知客户端,
 def notifyClient(self, protocol, message):
 if protocol.state == PaxosAcceptorProtocol.STATE_PROPOSAL_ACCEPTED: # 提议被接受,通知
 self.instances[protocol.instanceID].value = message.value # 储存信息
 print(u"协议被客户端接受 %s" % message.value)

 # 获取最高同意的建议
 def getHighestAgreedProposal(self, instance):
 return self.instances[instance].highestID # (port,count)

 # 获取接口数据
 def getInstanceValue(self, instance):
 return self.instances[instance].value

那再看下AcceptorProtocol的实现:


from Message import Message
class PaxosAcceptorProtocol(object):
 # State variables
 STATE_UNDEFINED = -1 # 协议没有定义的情况0
 STATE_PROPOSAL_RECEIVED = 0 # 收到消息
 STATE_PROPOSAL_REJECTED = 1 # 拒绝链接
 STATE_PROPOSAL_AGREED = 2 # 同意链接
 STATE_PROPOSAL_ACCEPTED = 3 # 同意请求
 STATE_PROPOSAL_UNACCEPTED = 4 # 拒绝请求

 def __init__(self, client):
 self.client = client
 self.state = PaxosAcceptorProtocol.STATE_UNDEFINED

 # 收取,只处理协议类型的消息
 def recvProposal(self, message):

 if message.command == Message.MSG_PROPOSE: # 协议
 self.proposalID = message.proposalID
 self.instanceID = message.instanceID
 (port, count) = self.client.getHighestAgreedProposal(message.instanceID) # 端口,协议内容的最高编号
 # 检测编号处理消息协议
 # 判断协议是否最高 
 if count < self.proposalID[1] or (count == self.proposalID[1] and port < self.proposalID[0]):
 self.state = PaxosAcceptorProtocol.STATE_PROPOSAL_AGREED # 协议同意
 print("同意协议:%s, %s " % (message.instanceID, message.value))
 value = self.client.getInstanceValue(message.instanceID)
 msg = Message(Message.MSG_ACCEPTOR_AGREE) # 同意协议
 msg.copyAsReply(message)
 msg.value = value
 msg.sequence = (port, count)
 self.client.sendMessage(msg) # 发送消息
 else: # 不再接受比最高协议小的提议
 self.state = PaxosAcceptorProtocol.STATE_PROPOSAL_REJECTED
 return self.proposalID
 else:
 # 错误重试
 pass
 # 过度
 def doTransition(self, message): # 如果当前协议状态是接受连接,消息类型是接受
 if self.state == PaxosAcceptorProtocol.STATE_PROPOSAL_AGREED and message.command == Message.MSG_ACCEPT:
 self.state = PaxosAcceptorProtocol.STATE_PROPOSAL_ACCEPTED # 接收协议
 msg = Message(Message.MSG_ACCEPTOR_ACCEPT) # 创造消息
 msg.copyAsReply(message) # 拷贝并回复
 for l in self.client.leaders:
 msg.to = l
 self.client.sendMessage(msg) # 给领导发送消息
 self.notifyClient(message) # 通知自己
 return True
 raise Exception("并非预期的状态和命令")

 # 通知 自己客户端
 def notifyClient(self, message):
 self.client.notifyClient(self, message)

接着看下Leader和LeaderProtocol实现:


# 领导者
import threading
import Queue
import time
from Message import Message
from MessagePump import MessagePump
from InstanceRecord import InstanceRecord
from PaxosLeaderProtocol import PaxosLeaderProtocol
class PaxosLeader:
 # 定时监听
 class HeartbeatListener(threading.Thread):
 def __init__(self, leader):
 self.leader = leader
 self.queue = Queue.Queue() # 消息队列
 self.abort = False
 threading.Thread.__init__(self)

 def newHB(self, message):
 self.queue.put(message)

 def doAbort(self):
 self.abort = True

 def run(self): # 读取消息
 elapsed = 0
 while not self.abort:
 s = time.time()
 try:
 hb = self.queue.get(True, 2)
 # 设定规则,谁的端口号比较高,谁就是领导
 if hb.source > self.leader.port:
 self.leader.setPrimary(False)
 except:
 self.leader.setPrimary(True)

 # 定时发送
 class HeartbeatSender(threading.Thread):
 def __init__(self, leader):
 threading.Thread.__init__(self)
 self.leader = leader
 self.abort = False
 def doAbort(self):
 self.abort = True
 def run(self):
 while not self.abort:
 time.sleep(1)
 if self.leader.isPrimary:
 msg = Message(Message.MSG_HEARTBEAT)
 msg.source = self.leader.port
 for leader in self.leader.leaders:
 msg.to = leader
 self.leader.sendMessage(msg)

 def __init__(self, port, leaders=None, acceptors=None):
 self.port = port
 if leaders == None:
 self.leaders = []
 else:
 self.leaders = leaders
 if acceptors == None:
 self.acceptors = []
 else:
 self.acceptors = acceptors
 self.group = self.leaders + self.acceptors # 集合合并
 self.isPrimary = False # 自身是不是领导
 self.proposalCount = 0
 self.msgPump = MessagePump(self, port) # 消息传送器
 self.instances = {}
 self.hbListener = PaxosLeader.HeartbeatListener(self) # 监听
 self.hbSender = PaxosLeader.HeartbeatSender(self) # 发送心跳
 self.highestInstance = -1 # 协议状态
 self.stoped = True # 是否正在运行
 self.lasttime = time.time() # 最后一次时间

 def sendMessage(self, message):
 self.msgPump.sendMessage(message)

 def start(self):
 self.hbSender.start()
 self.hbListener.start()
 self.msgPump.start()
 self.stoped = False

 def stop(self):
 self.hbSender.doAbort()
 self.hbListener.doAbort()
 self.msgPump.doAbort()
 self.stoped = True

 def setPrimary(self, primary): # 设置领导者
 if self.isPrimary != primary:
 # Only print if something's changed
 if primary:
 print(u"我是leader%s" % self.port)
 else:
 print(u"我不是leader%s" % self.port)
 self.isPrimary = primary

 # 获取所有的领导下面的追随者
 def getGroup(self):
 return self.group

 def getLeaders(self):
 return self.leaders

 def getAcceptors(self):
 return self.acceptors

 # 必须获得1/2以上的人支持
 def getQuorumSize(self):
 return (len(self.getAcceptors()) / 2) + 1

 def getInstanceValue(self, instanceID):
 if instanceID in self.instances:
 return self.instances[instanceID].value
 return None

 def getHistory(self): # 历史记录
 return [self.getInstanceValue(i) for i in range(1, self.highestInstance + 1)]

 # 抓取同意的数量
 def getNumAccpted(self):
 return len([v for v in self.getHistory() if v != None])

 # 抓取空白时间处理下事务
 def findAndFillGaps(self):
 for i in range(1, self.highestInstance):
 if self.getInstanceValue(i) == None:
 print("填充空白", i)
 self.newProposal(0, i)
 self.lasttime = time.time()

 # 采集无用信息
 def garbageCollect(self):
 for i in self.instances:
 self.instances[i].cleanProtocols()

 # 通知领导
 def recvMessage(self, message):
 if self.stoped:
 return
 if message == None:
 if self.isPrimary and time.time() - self.lasttime > 15.0:
 self.findAndFillGaps()
 self.garbageCollect()
 return
 #处理心跳信息
 if message.command == Message.MSG_HEARTBEAT:
 self.hbListener.newHB(message)
 return True
 #处理额外的提议
 if message.command == Message.MSG_EXT_PROPOSE:
 print("额外的协议", self.port, self.highestInstance)
 if self.isPrimary:
 self.newProposal(message.value)
 return True

 if self.isPrimary and message.command != Message.MSG_ACCEPTOR_ACCEPT:
 self.instances[message.instanceID].getProtocol(message.proposalID).doTransition(message)

 if message.command == Message.MSG_ACCEPTOR_ACCEPT:
 if message.instanceID not in self.instances:
 self.instances[message.instanceID] = InstanceRecord()
 record = self.instances[message.instanceID]
 if message.proposalID not in record.protocols:#创建协议
 protocol = PaxosLeaderProtocol(self)
 protocol.state = PaxosLeaderProtocol.STATE_AGREED
 protocol.proposalID = message.proposalID
 protocol.instanceID = message.instanceID
 protocol.value = message.value
 record.addProtocol(protocol)
 else:
 protocol = record.getProtocol(message.proposalID)

 protocol.doTransition(message)

 return True
 # 新建提议
 def newProposal(self, value, instance=None):
 protocol = PaxosLeaderProtocol(self)
 if instance == None: # 创建协议标号
 self.highestInstance += 1
 instanceID = self.highestInstance
 else:
 instanceID = instance
 self.proposalCount += 1
 id = (self.port, self.proposalCount)
 if instanceID in self.instances:
 record = self.instances[instanceID]
 else:
 record = InstanceRecord()
 self.instances[instanceID] = record
 protocol.propose(value, id, instanceID)
 record.addProtocol(protocol)

 def notifyLeader(self, protocol, message):
 if protocol.state == PaxosLeaderProtocol.STATE_ACCEPTED:
 print("协议接口%s被%s接受" % (message.instanceID, message.value))
 self.instances[message.instanceID].accepted = True
 self.instances[message.instanceID].value = message.value
 self.highestInstance = max(message.instanceID, self.highestInstance)
 return
 if protocol.state == PaxosLeaderProtocol.STATE_REJECTED: # 重新尝试
 self.proposalCount = max(self.proposalCount, message.highestPID[1])
 self.newProposal(message.value)
 return True
 if protocol.state == PaxosLeaderProtocol.STATE_UNACCEPTED:
 pass

LeaderProtocol实现:


from Message import Message
class PaxosLeaderProtocol(object):
 STATE_UNDEFINED = -1 # 协议没有定义的情况0
 STATE_PROPOSED = 0 # 协议消息
 STATE_REJECTED = 1 # 拒绝链接
 STATE_AGREED = 2 # 同意链接
 STATE_ACCEPTED = 3 # 同意请求
 STATE_UNACCEPTED = 4 # 拒绝请求
 def __init__(self, leader):
 self.leader = leader
 self.state = PaxosLeaderProtocol.STATE_UNDEFINED
 self.proposalID = (-1, -1)
 self.agreecount, self.acceptcount = (0, 0)
 self.rejectcount, self.unacceptcount = (0, 0)
 self.instanceID = -1
 self.highestseen = (0, 0)
 # 提议
 def propose(self, value, pID, instanceID):
 self.proposalID = pID
 self.value = value
 self.instanceID = instanceID
 message = Message(Message.MSG_PROPOSE)
 message.proposalID = pID
 message.instanceID = instanceID
 message.value = value
 for server in self.leader.getAcceptors():
 message.to = server
 self.leader.sendMessage(message)
 self.state = PaxosLeaderProtocol.STATE_PROPOSED

 return self.proposalID

 # 過度
 def doTransition(self, message):
 # 根據狀態運行協議
 if self.state == PaxosLeaderProtocol.STATE_PROPOSED:
 if message.command == Message.MSG_ACCEPTOR_AGREE:
 self.agreecount += 1
 if self.agreecount >= self.leader.getQuorumSize(): # 选举
 print(u"达成协议的法定人数,最后的价值回答是:%s" % message.value)
 if message.value != None:
 if message.sequence[0] > self.highestseen[0] or (
 message.sequence[0] == self.highestseen[0] and message.sequence[1] > self.highestseen[
 1]):
 self.value = message.value
 self.highestseen = message.sequence

 self.state = PaxosLeaderProtocol.STATE_AGREED # 同意更新
 # 发送同意消息
 msg = Message(Message.MSG_ACCEPT)
 msg.copyAsReply(message)
 msg.value = self.value
 msg.leaderID = msg.to
 for server in self.leader.getAcceptors():
 msg.to = server
 self.leader.sendMessage(msg)
 self.leader.notifyLeader(self, message)
 return True

 if message.command == Message.MSG_ACCEPTOR_REJECT:
 self.rejectcount += 1
 if self.rejectcount >= self.leader.getQuorumSize():
 self.state = PaxosLeaderProtocol.STATE_REJECTED
 self.leader.notifyLeader(self, message)
 return True

 if self.state == PaxosLeaderProtocol.STATE_AGREED:
 if message.command == Message.MSG_ACCEPTOR_ACCEPT: # 同意协议
 self.acceptcount += 1
 if self.acceptcount >= self.leader.getQuorumSize():
 self.state = PaxosLeaderProtocol.STATE_ACCEPTED # 接受
 self.leader.notifyLeader(self, message)
 if message.command == Message.MSG_ACCEPTOR_UNACCEPT:
 self.unacceptcount += 1
 if self.unacceptcount >= self.leader.getQuorumSize():
 self.state = PaxosLeaderProtocol.STATE_UNACCEPTED
 self.leader.notifyLeader(self, message)

测试模块:


import socket, pickle, time
from Message import Message
from PaxosAcceptor import PaxosAcceptor
from PaxosLeader import PaxosLeader

if __name__ == "__main__":
 # 设定5个客户端
 numclients = 5
 clients = [PaxosAcceptor(port, [54321, 54322]) for port in range(64320, 64320 + numclients)]
 # 两个领导者
 leader1 = PaxosLeader(54321, [54322], [c.port for c in clients])
 leader2 = PaxosLeader(54322, [54321], [c.port for c in clients])

 # 开启领导者与追随者
 leader1.start()
 leader1.setPrimary(True)
 leader2.setPrimary(True)
 leader2.start()
 for c in clients:
 c.start()

 # 破坏,客户端不链接
 clients[0].fail()
 clients[1].fail()

 # 通信
 s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # udp协议
 start = time.time()
 for i in range(1000):
 m = Message(Message.MSG_EXT_PROPOSE) # 消息
 m.value = 0 + i # 消息参数
 m.to = 54322 # 设置传递的端口
 bytes = pickle.dumps(m) # 提取的二进制数据
 s.sendto(bytes, ("localhost", m.to)) # 发送消息

 while leader2.getNumAccpted() < 999:
 print("休眠的这一秒 %d " % leader2.getNumAccpted())
 time.sleep(1)
 print(u"休眠10秒")
 time.sleep(10)
 print(u"停止leaders")
 leader1.stop()
 leader2.stop()
 print(u"停止客户端")
 for c in clients:
 c.stop()
 print(u"leader1历史纪录")
 print(leader1.getHistory())
 print(u"leader2历史纪录")
 print(leader2.getHistory())
 end = time.time()
 print(u"一共用了%f秒" % (end - start))

代码确实比较长,看起来有些困难,最好还是在pycharm上看这个逻辑,可以快速定位参数指向,如果有不对的地方欢迎指正

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持脚本之家。