自己动手实现RAFT算法

Raven's Blog

Home Page View on GitHub Send Email

自己动手实现RAFT算法

前段时间学习了一下分布式系统的raft算法,相比较Paxos协议,理解起来确实容易多了,于是就产生了自己动手实现一套基于raft一致性协议的分布式缓存的想法,经过大约两个月的空闲时间,终于完成了一个可以运行的python版本aducode/simple-raft-py

模块划分

项目主要包括如下几个模块: 1. server


def server_forever(self):
        """
        启动服务
        :return:
        """
        # init the server runtime env
        self.initialise()
        # Main Loop
        while self.inputs:
            # handle io
            self.handle_io_event()
            # handle timer
            self.handle_timeout_event()
            if not self.is_running():
                # stoppde or stopping
                break
        # release the resources
        self.realease()
        print 'Server stopped!'

class Channel(object):
    def __init__(self, server, client, _next):
        self.server = server
        self.client = client
        self.next = _next

    def input(self, data, recv):
        """
        :param data 数据
        :param recv 是否从socket接受来的消息
                    当为False时,说明数据是发送到其他server的,不是server接受来的
        """
        pass

    def output(self):
        pass

    def close(self):
        """
        说明socket被关闭,传递到handler
        """
        return self.next.close()

class Handler(object):
    def handle(self, server, client, request):
        pass

    def close(self, server, client):
        """
        handler处理close
        :return:
        """
        pass

class IO2Channel(Channel):
    """
    直接与IO关联的Channel
    """

    def __init__(self, server, client, _next):
        super(IO2Channel, self).__init__(server, client, _next)

    def input(self, data=None, recv=True):
        client_close = False
        try:
            if recv:
                request = self.client.recv(1024)
            else:
                request = data
        except Exception, e:
            client_close = True
        else:
            if request:
                self.next.input(request, recv)
            else:
                client_close = True
        if client_close:
            self.server.close(self.client)

    def output(self):
        response, end = self.next.output()
        if response:
            try:
                self.client.send(response)
            except (IOError, socket.error):
                self.server.close(self.client)
        if (not response or end) and self.client in self.server.outputs:
            self.server.outputs.remove(self.client)


class Channel2Handler(Channel):
    """
    与handler关联起来
    """

    def __init__(self, server, client, _next):
        super(Channel2Handler, self).__init__(server, client, _next)
        self.queue = Queue.Queue()

    def input(self, data, recv):
        response = None
        if recv:
            if isinstance(self.next, Handler):
                response = self.next.handle(self.server, self.client, data)
            elif isinstance(self.next, types.FunctionType):
                response = self.next(self.server, self.client, data)
        else:
            # 说明直接发出去
            response = data
        if response:
            self.queue.put(response)
            if self.client not in self.server.outputs:
                self.server.outputs.append(self.client)

    def output(self):
        try:
            return self.queue.get_nowait(), self.queue.empty()
        except Queue.Empty:
            return None, True

    def close(self):
        return self.next.close(self.server, self.client)

2. protocol

3.db

4.node & state


class State(object):
    """
    节点状态类
    不同状态有不同的表现
    """

    def __init__(self, node):
        self.node = node

    def handle(self, client, message):
        """
        处理其他node的消息
        """
        assert isinstance(message, Message)
        if isinstance(message, ClientMessage):
            if message.op == 'get':
                return self.on_get(client, message)
            else:
                return self.on_update(client, message)
        elif isinstance(message, ClientCloseMessage):
            return self.on_client_close(client, message)
        elif isinstance(message, HeartbeatRequestMessage):
            return self.on_heartbeat_request(client, message)
        elif isinstance(message, HeartbeatResponseMessage):
            return self.on_heartbeat_response(client, message)
        elif isinstance(message, ElectRequestMessage):
            return self.on_elect_request(client, message)
        elif isinstance(message, ElectResponseMessage):
            return self.on_elect_response(client, message)
        elif isinstance(message, SyncRequestMessage):
            return self.on_sync_request(client, message)
        elif isinstance(message, SyncResponseMessage):
            return self.on_sync_response(client, message)
        else:
            pass

    def on_get(self, client, message):
        """
        处理客户端get请求
        """
        return self.node.config.db.handle(client, message.op, message.key, message.value, message.auto_commit)

    def on_update(self, client, message):
        """
        处理客户端除了get外的请求
        """
        return '@%s:%d@redirect' % self.node.leader if self.node.leader \
            else 'No Leader Elected, please wait until we have a leader...'

    def on_client_close(self, client, message):
        """
        处理客户端关闭
        """
        return self.node.config.db.release(message.client)

    def on_heartbeat_request(self, client, message):
        """
        接收到heartbeat请求的处理方法
        """
        pass

    def on_heartbeat_response(self, client, message):
        """
        接收到heartbeata响应的处理方法
        """
        pass

    def on_elect_request(self, client, message):
        """
        接收到选举请求的处理方法
        """
        pass

    def on_elect_response(self, client, message):
        """
        接收到选举响应的处理方法
        """
        pass

    def on_sync_request(self, client, message):
        """
        接收到同步请求
        """
        pass

    def on_sync_response(self, client, message):
        """
        接收同步响应
        """
        pass

    def __str__(self):
        return self.__class__.__name__.strip().upper()

目前实现的功能

不足和想法


联系作者:aducode@126.com
更多精彩文章请点击:http://aducode.github.io