• Home
  • About
    • Shadow Knight photo

      Shadow Knight

      https://live.bilibili.com/9565678

    • Learn More
    • Twitter
    • Facebook
    • Instagram
    • Github
    • Steam
  • Posts
    • All Posts
    • All Tags
  • Projects

golang实现简易版的raft

25 May 2018

Reading time ~5 minutes

golang实现简易版的raft

package main

import (
	"os"
	"strconv"
	"fmt"
	"sync"
	"net/rpc"
	"net/http"
	"math/rand"
	"time"
	"log"
)
//设置节点个数
const raftCount  = 3
//设置存储leader类型的变量AppendEntriesArsg
type AppendEntriesArgs struct {
	//任期
	Term int
	//leader 编号
	LeaderId int
}
//初始化存储leader的类型变量
var args = AppendEntriesArgs{0,-1}
//存储缓存信息
var bufferMessage = make(map[string]string)
//处理数据库信息
var mysqlMessage = make(map[string]string)
//操作消息数组下标
var messageId =1
//用nodeTable存储每个节点中的id和port
var nodeTable map[string]string
//对每个节点地址和端口的封装成结构体
type nodeInfo struct {
	id string
	port string
}
func main() {
	if len(os.Args)>1 {
		//接收终端输入的信息
		userId:=os.Args[1]
		//字符串转int
		id,_:=strconv.Atoi(userId)
		fmt.Println(id)
		//设置node id 和端口号 和总量
		nodeTable = map[string]string {
			"1":":1200",
			"2":":1201",
			"3":":1212",
		}
		//将当前node的信息封装成结构体nodeInfo==node
		node:=nodeInfo{id:userId,port:nodeTable[userId]}
		//创建Raft结构体指针并传入几号服务器
		rf:=Make(id)
		//设置Raft结构体指针的node变量值是信息是当前node的信息 -ip -端口
		rf.node = node
		//注册rpc
		go func() {
			//注册rpc,为了实现远程链接
			//调用开启端口监听传入当前node的端口-port
			rf.raftRegisterRPC(node.port)
		}()
		if userId == "1" {
			go func() {
				//接受浏览器发送的请求
				http.HandleFunc("/req", rf.getRequest)
				fmt.Println("监听5000")
				if err := http.ListenAndServe(":5000", nil); err != nil {
					fmt.Println(err)
					return
				}
			}()
		}
	}
	for{;}
}
var clientWriter http.ResponseWriter

func (rf *Raft)getRequest(writer http.ResponseWriter, request *http.Request) {

	fmt.Println("come")
	request.ParseForm()
	if len(request.Form["age"]) > 0 {
		clientWriter = writer
		fmt.Println("1.主节点广播客户端请求\age:", request.Form["age"][0])

		param := Param{Msg:request.Form["age"][0], MsgId:strconv.Itoa(messageId)}
		messageId++
		//主节点广播请求
		if args.LeaderId == rf.me {
			rf.sendMessageToOtherNodes(param)
		} else {
			//将消息转发给leader
			leaderId := nodeTable[strconv.Itoa(args.LeaderId)]
			//连接远程rpc服务
			rpc, err := rpc.DialHTTP("tcp", "127.0.0.1"+leaderId)
			if err != nil {
				log.Fatal("\nrpc转发连接server错误:",args.LeaderId, err)
			}
			var bo bool = false
			//首先geileader传递
			err = rpc.Call("Raft.ForwardingMessage", param, &bo)
			if err != nil {
				log.Fatal("\nrpc转发调用server错误:",args.LeaderId, err)
			}
		}
	}
}
//开始广播
func (rf *Raft) sendMessageToOtherNodes(param Param) {
	bufferMessage[param.MsgId] = param.Msg
	// 只有领导才能给其它服务器发送消息
	if rf.currentLeader == rf.me {
		var success_count int = 0
		fmt.Printf("领导者发送数据中 。。。\n")
		go func() {
			//向从节点发送日志并让从节点存储
			rf.broadcast(param, "Raft.LogDataCopy", func(ok bool) {
				//回调函数
				//需要其它服务端回应
				rf.message <- ok
			})
		}()
		for i := 0; i < raftCount-1; i++ {

			fmt.Println("等待其它服务端回应")
			select {
			case ok := <-rf.message:
				if ok {
					success_count++
					if success_count >= raftCount/2 {
						rf.mu.Lock()
						rf.lastMessageTime = milliseconds()
						//模模拟向数据库存储日志
						mysqlMessage[param.MsgId] = bufferMessage[param.MsgId]
						delete(bufferMessage, param.MsgId)
						if clientWriter != nil {
							fmt.Fprintf(clientWriter, "OK")
						}
						fmt.Printf("\n领导者发送数据结束\n")
						rf.mu.Unlock()
					}
				}
			}
		}
	}
}
//注册Raft对象,注册后的目的为确保每个节点(raft) 可以远程接收
func (node *Raft) raftRegisterRPC(port string) {
	//注册一个服务器
	rpc.Register(node)
	//把服务绑定到http协议上
	rpc.HandleHTTP()
	err :=http.ListenAndServe(port,nil)
	if err!=nil {
		fmt.Println("注册rpc服务的错",err)
	}
}
//创建节点对象
func Make(me int) *Raft{
	//创建Raft结构体指针
	rf:=&Raft{}
	rf.me = me
	rf.votedFor = -1
	rf.state = 0//0 follower ,1 candidate ,2 leader
	rf.timeout = 0
	rf.currentLeader = -1
	rf.setTerm(0)
	//创建通道对象
	rf.message = make(chan  bool)
	rf.heartbeat = make(chan bool)
	rf.heartbeatRe = make(chan bool)
	rf.eclectCh = make(chan bool)

	//每个节点都有选举权
	go rf.election()
	//每个节点都有心跳功能
	go rf.sendLeaderHeartBeat()

	return rf

}
//选举成功后,应该广播所有的节点,我成为了leader
func (rf *Raft)sendLeaderHeartBeat() {
	for {
		select{
		//
		case<-rf.heartbeat:
			rf.sendAppendEntriesImpl()
		}
	}
}
//向所有节点发送广播
func (rf *Raft)sendAppendEntriesImpl() {
	if rf.currentLeader == rf.me {
		var success_count = 0
		go func() {
			param:=Param{Msg:"leader heartbeat",
			//设置leader信息
			Arg:AppendEntriesArgs{rf.currentTerm,rf.me}}
			//
			rf.broadcast(param,"Raft.Heartbeat",func(ok bool){
					//回调函数
					rf.heartbeatRe<-ok
			})
		}()

		for i:=0;i<raftCount-1;i++{
			select{
			case ok:=<-rf.heartbeatRe:
				if ok {
					success_count++
					if success_count>=raftCount/2 {
						rf.mu.Lock()
						rf.lastMessageTime = milliseconds()
						fmt.Println("接收到了子节点们的返回信息")
						rf.mu.Unlock()
					}
				}
			}
		}
	}
}
func randomRange(min,max int64) int64 {
	//设置随机时间
	rand.Seed(time.Now().UnixNano())
	return rand.Int63n(max-min)+min
}
//获得当前时间(毫秒)
func milliseconds() int64 {
	return time.Now().UnixNano()/int64(time.Millisecond)
}
//节点开始选举
func(rf *Raft)election() {
	//控制跟随是否继续进行选举
	var result bool
	//每隔一段时间发一次心跳
	for {
		//延时时间
		timeout:= randomRange(1500,3000)
		//设置该节点最有一次处理消息的时间
		rf.lastMessageTime = milliseconds()

		select {
		//间隔时间为1500-3000ms的随机值
		case <-time.After(time.Duration(timeout)*time.Millisecond):
		}
        //初始化result值
		result=false
		for !result {
			//开始选择leader
			result = rf.election_one_round(&args)
		}
	}
}
//开始选举领导人
func (rf *Raft)election_one_round(args *AppendEntriesArgs) bool {
	//已经有了leader,并且不是自己,那么return
	if args.LeaderId > -1 && args.LeaderId != rf.me {
		fmt.Printf("%d已是leader,终止%d选举\n", args.LeaderId, rf.me)
		return true
	}
    //设置超时时间
	var timeout int64
	//判断投票节点的多少
	var done int
    //无用
	var triggerHeartbeat bool
	//设置超时时间
	timeout = 2000
	//设置last为当前时间
	last := milliseconds()
	//判断是否进行领领导人的确认
	success := false
	rf.mu.Lock()
	rf.becomeCandidate()
	rf.mu.Unlock()
	//打印当前的node为id
	fmt.Printf("候选人=%d 开始 选举 领导人\n", rf.me)
	//开始 选举 领导人
	for {
		//5.24 pm
		//printTime()
		fmt.Printf("候选人=%d 发送请求其他server投票\n", rf.me)
		//启动线程-候选人发送请求其他server投票
		go func() {

			rf.broadcast(Param{Msg:"求其他server投票"}, "Raft.ElectingLeader", func(ok bool) {
				//回调函数:
				//使在broadcast函数中传入fun(ok bool)中传入ok
				// 然后在返回到本函数将ok传入到选举通道
				rf.eclectCh <- ok
			})
		}()
        //初始化done的值
		done = 0
		triggerHeartbeat = false
		for i := 0; i < raftCount-1; i++ {
			//printTime()
			fmt.Printf("候选人=%d 等待其他服务器选择=%d\n", rf.me, i)
             //等待其他服务器的选择
			select {
			case ok := <-rf.eclectCh:
				if ok {
					done++
					//大于当前节点的一半设置success为ture
					success = done >= raftCount/2 || rf.currentLeader > -1
					//为true时确认当前候选人
					if success && !triggerHeartbeat {
						fmt.Println("okok", args)
						triggerHeartbeat = true
						rf.mu.Lock()
						rf.becomeLeader()
						args.Term = rf.currentTerm+1
						args.LeaderId = rf.me
						rf.mu.Unlock()
						//printTime()
						fmt.Printf("候选人=%d 成为领袖\n", rf.currentLeader)
						//设置开启向所有节点发送广播
						rf.heartbeat <- true
					}
				}
			}
			//printTime()
			fmt.Printf("候选人=%d 完成我的选择=%d\n", rf.me, i)
		}
		if (timeout < milliseconds()-last) || (done >= raftCount/2 || rf.currentLeader > -1) {
			break
		} else {
			select {
			case <-time.After(time.Duration(5000) * time.Millisecond):
			}
		}
	}
	//printTime()
	fmt.Printf("候选人=%d 都到票的状态=%t\n", rf.me, success)
	return success
}
//确认当前领导人
func (rf *Raft)becomeLeader() {
	rf.state = 2
	fmt.Println(rf.me,"成为了leader")
	rf.currentLeader = rf.me
}
//设置发送参数的数据类型
type Param struct{
	Msg string
	MsgId string
	Arg AppendEntriesArgs
}
//请求其他server为自己投票
func(rf *Raft)broadcast(msg Param,path string ,f func(ok bool)){
	//设置不要自己给自己广播
	for nodeID,port :=range nodeTable {
		if nodeID == rf.node.id {
			continue;
		}
		//链接远程rpc
		rp,err:=rpc.DialHTTP("tcp","127.0.0.1"+port)
		if err!=nil {
			f(false)
			continue
		}
		var bo = false
		err = rp.Call(path,msg,&bo)
		if err!= nil {
			f(false)
			continue
		}
		f(bo)
	}
}
//设置本机服务器node为candidate候选人状态
func (rf *Raft)becomeCandidate() {
	//如果当前状态为跟随者状态或没有领导人
	if rf.state ==0 || rf.currentLeader == -1 {
		rf.state = 1//候选人状态
		rf.votedFor = rf.me//为自己投一票
		rf.setTerm(rf.currentTerm+1)//领导人
		rf.currentLeader = -1//设置当前领导人为-1
	}
}
//设置当前领导期数
func(rf *Raft)setTerm(term int){
	rf.currentTerm = term
}
//声明节点对象类型Raft
type Raft struct {
	//当前节点信息
	node nodeInfo
	//线程所
	mu sync.Mutex
	//当前节点编号
	me int
	//当前领导期数
	currentTerm int
	//为哪个节点投票
	votedFor int
	//当前节点状态
	state int
	//超时时间
	timeout int
	//设置当前节点的领导
	currentLeader int
	//该节点最后一次处理数据的时间
	lastMessageTime int64
	//节点间发送消息的通道
	message chan bool
	//选举通道
	eclectCh chan bool
	//心跳信号通道
	heartbeat chan bool
	//子节点给主节点返回心跳信号
	heartbeatRe chan bool
}
//Rpc处理
func (rf *Raft)ElectingLeader(param Param,a *bool) error {
	//给leader投票
	*a = true
	rf.lastMessageTime = milliseconds()
	return  nil
}
func (rf *Raft)Heartbeat(param Param,a *bool)error {
	fmt.Println("\nrpc:heartbeat:",rf.me,param.Msg)
	if param.Arg.Term < rf.currentTerm {
		*a = false
	} else {
		args = param.Arg
		fmt.Printf("%d收到leader%d心跳\n", rf.currentLeader,args.LeaderId)
		*a = true
		rf.mu.Lock()
		rf.currentLeader = args.LeaderId
		rf.votedFor = args.LeaderId
		rf.state = 0
		rf.lastMessageTime = milliseconds()
		fmt.Printf("server = %d learned that leader = %d\n", rf.me, rf.currentLeader)
		rf.mu.Unlock()
	}
	return nil
}
//1,连接到leader节点
//从节点向leader节点做转发
func (rf *Raft) ForwardingMessage(param Param, a *bool) error {	fmt.Println("\nrpc:forwardingMessage:",rf.me,param.Msg)
	rf.sendMessageToOtherNodes(param)
	*a = true
	rf.lastMessageTime = milliseconds()
	return nil
}
//接收leader传过来的日志
func (r *Raft) LogDataCopy(param Param, a *bool) error {
	fmt.Println("\nrpc:LogDataCopy:",r.me,param.Msg)
	bufferMessage[param.MsgId] = param.Msg
	*a = true
	return nil
}


golangraft Share Tweet +1