前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >【共识算法】-“PBFT的实现”

【共识算法】-“PBFT的实现”

作者头像
帆说区块链
发布2022-04-26 19:45:54
5460
发布2022-04-26 19:45:54
举报
文章被收录于专栏:帆说区块链帆说区块链

原理图

实现功能:

数据从客户端输入,到接收到节点们的回复共分为5步

  1. 客户端向主节点发送请求信息
  2. 主节点N0接收到客户端请求后将请求数据里的主要信息提出,并向其余节点进行preprepare发送
  3. 从节点们接收到来自主节点的preprepare,首先利用主节点的公钥进行签名认证,其次将消息进行散列(消息摘要,以便缩小信息在网络中的传输大小)后,向其他节点广播prepare
  4. 节点接收到2f个prepare信息(包含自己),并全部签名验证通过,则可以进行到commit步骤,向全网其他节点广播commit
  5. 节点接收到2f+1个commit信息(包含自己),并全部签名验证通过,则可以把消息存入到本地,并向客户端返回reply消息

准备工具:cmd

(后续要改底层,建议安装go相关编译工具)

打开三个客户端,按如下操作:

(1)若已存链码,找到相关的路径,如图:

若未存链码输入如下:

代码语言:javascript
复制
 git clone https://github.com/corgi-kx/blockchain_consensus_algorithm.git

之后进行已存链码操作。

插:若出现以下问题:go: go.mod file not found in current directory or any parent directory; see 'go help modules'

输入:

代码语言:javascript
复制
go env -w GO111MODULE=off

(2)内部建立exe文件

代码语言:javascript
复制
go build -o pbft.exe

(3)启动三个客户端代表5个节点

代码语言:javascript
复制
pbft.exe client
代码语言:javascript
复制
pbft.exe client N0
代码语言:javascript
复制
pbft.exe client N1
代码语言:javascript
复制
pbft.exe client N2
代码语言:javascript
复制
pbft.exe client N3

如图:

(4)测试节点同步信息,几个阶段同步信息:

(5)关闭一个节点(代表故障、恶意节点),测试信息是否能继续同步

(6)关闭两个节点,测试信息同步:

关闭两个节点后,故障节点已经超出了pbft的允许数量,消息进行到Prepare阶段由于接收不到满足数量的信息,固系统不再进行commit确认,客户端也接收不到reply。

PBFT.go:

代码语言:javascript
复制
package main

import (
  "encoding/hex"
  "encoding/json"
  "fmt"
  "io/ioutil"
  "log"
  "strconv"
  "sync"
)

//本地消息池(模拟持久化层),只有确认提交成功后才会存入此池
var localMessagePool = []Message{}

type node struct {
  //节点ID
  nodeID string
  //节点监听地址
  addr string
  //RSA私钥
  rsaPrivKey []byte
  //RSA公钥
  rsaPubKey []byte
}

type pbft struct {
  //节点信息
  node node
  //每笔请求自增序号
  sequenceID int
  //锁
  lock sync.Mutex
  //临时消息池,消息摘要对应消息本体
  messagePool map[string]Request
  //存放收到的prepare数量(至少需要收到并确认2f个),根据摘要来对应
  prePareConfirmCount map[string]map[string]bool
  //存放收到的commit数量(至少需要收到并确认2f+1个),根据摘要来对应
  commitConfirmCount map[string]map[string]bool
  //该笔消息是否已进行Commit广播
  isCommitBordcast map[string]bool
  //该笔消息是否已对客户端进行Reply
  isReply map[string]bool
}

func NewPBFT(nodeID, addr string) *pbft {
  p := new(pbft)
  p.node.nodeID = nodeID
  p.node.addr = addr
  p.node.rsaPrivKey = p.getPivKey(nodeID) //从生成的私钥文件处读取
  p.node.rsaPubKey = p.getPubKey(nodeID)  //从生成的私钥文件处读取
  p.sequenceID = 0
  p.messagePool = make(map[string]Request)
  p.prePareConfirmCount = make(map[string]map[string]bool)
  p.commitConfirmCount = make(map[string]map[string]bool)
  p.isCommitBordcast = make(map[string]bool)
  p.isReply = make(map[string]bool)
  return p
}

func (p *pbft) handleRequest(data []byte) {
  //切割消息,根据消息命令调用不同的功能
  cmd, content := splitMessage(data)
  switch command(cmd) {
  case cRequest:
    p.handleClientRequest(content)
  case cPrePrepare:
    p.handlePrePrepare(content)
  case cPrepare:
    p.handlePrepare(content)
  case cCommit:
    p.handleCommit(content)
  }
}

//处理客户端发来的请求
func (p *pbft) handleClientRequest(content []byte) {
  fmt.Println("主节点已接收到客户端发来的request ...")
  //使用json解析出Request结构体
  r := new(Request)
  err := json.Unmarshal(content, r)
  if err != nil {
    log.Panic(err)
  }
  //添加信息序号
  p.sequenceIDAdd()
  //获取消息摘要
  digest := getDigest(*r)
  fmt.Println("已将request存入临时消息池")
  //存入临时消息池
  p.messagePool[digest] = *r
  //主节点对消息摘要进行签名
  digestByte, _ := hex.DecodeString(digest)
  signInfo := p.RsaSignWithSha256(digestByte, p.node.rsaPrivKey)
  //拼接成PrePrepare,准备发往follower节点
  pp := PrePrepare{*r, digest, p.sequenceID, signInfo}
  b, err := json.Marshal(pp)
  if err != nil {
    log.Panic(err)
  }
  fmt.Println("正在向其他节点进行进行PrePrepare广播 ...")
  //进行PrePrepare广播
  p.broadcast(cPrePrepare, b)
  fmt.Println("PrePrepare广播完成")
}

//处理预准备消息
func (p *pbft) handlePrePrepare(content []byte) {
  fmt.Println("本节点已接收到主节点发来的PrePrepare ...")
  //  //使用json解析出PrePrepare结构体
  pp := new(PrePrepare)
  err := json.Unmarshal(content, pp)
  if err != nil {
    log.Panic(err)
  }
  //获取主节点的公钥,用于数字签名验证
  primaryNodePubKey := p.getPubKey("N0")
  digestByte, _ := hex.DecodeString(pp.Digest)
  if digest := getDigest(pp.RequestMessage); digest != pp.Digest {
    fmt.Println("信息摘要对不上,拒绝进行prepare广播")
  } else if p.sequenceID+1 != pp.SequenceID {
    fmt.Println("消息序号对不上,拒绝进行prepare广播")
  } else if !p.RsaVerySignWithSha256(digestByte, pp.Sign, primaryNodePubKey) {
    fmt.Println("主节点签名验证失败!,拒绝进行prepare广播")
  } else {
    //序号赋值
    p.sequenceID = pp.SequenceID
    //将信息存入临时消息池
    fmt.Println("已将消息存入临时节点池")
    p.messagePool[pp.Digest] = pp.RequestMessage
    //节点使用私钥对其签名
    sign := p.RsaSignWithSha256(digestByte, p.node.rsaPrivKey)
    //拼接成Prepare
    pre := Prepare{pp.Digest, pp.SequenceID, p.node.nodeID, sign}
    bPre, err := json.Marshal(pre)
    if err != nil {
      log.Panic(err)
    }
    //进行准备阶段的广播
    fmt.Println("正在进行Prepare广播 ...")
    p.broadcast(cPrepare, bPre)
    fmt.Println("Prepare广播完成")
  }
}

//处理准备消息
func (p *pbft) handlePrepare(content []byte) {
  //使用json解析出Prepare结构体
  pre := new(Prepare)
  err := json.Unmarshal(content, pre)
  if err != nil {
    log.Panic(err)
  }
  fmt.Printf("本节点已接收到%s节点发来的Prepare ... \n", pre.NodeID)
  //获取消息源节点的公钥,用于数字签名验证
  MessageNodePubKey := p.getPubKey(pre.NodeID)
  digestByte, _ := hex.DecodeString(pre.Digest)
  if _, ok := p.messagePool[pre.Digest]; !ok {
    fmt.Println("当前临时消息池无此摘要,拒绝执行commit广播")
  } else if p.sequenceID != pre.SequenceID {
    fmt.Println("消息序号对不上,拒绝执行commit广播")
  } else if !p.RsaVerySignWithSha256(digestByte, pre.Sign, MessageNodePubKey) {
    fmt.Println("节点签名验证失败!,拒绝执行commit广播")
  } else {
    p.setPrePareConfirmMap(pre.Digest, pre.NodeID, true)
    count := 0
    for range p.prePareConfirmCount[pre.Digest] {
      count++
    }
    //因为主节点不会发送Prepare,所以不包含自己
    specifiedCount := 0
    if p.node.nodeID == "N0" {
      specifiedCount = nodeCount / 3 * 2
    } else {
      specifiedCount = (nodeCount / 3 * 2) - 1
    }
    //如果节点至少收到了2f个prepare的消息(包括自己),并且没有进行过commit广播,则进行commit广播
    p.lock.Lock()
    //获取消息源节点的公钥,用于数字签名验证
    if count >= specifiedCount && !p.isCommitBordcast[pre.Digest] {
      fmt.Println("本节点已收到至少2f个节点(包括本地节点)发来的Prepare信息 ...")
      //节点使用私钥对其签名
      sign := p.RsaSignWithSha256(digestByte, p.node.rsaPrivKey)
      c := Commit{pre.Digest, pre.SequenceID, p.node.nodeID, sign}
      bc, err := json.Marshal(c)
      if err != nil {
        log.Panic(err)
      }
      //进行提交信息的广播
      fmt.Println("正在进行commit广播")
      p.broadcast(cCommit, bc)
      p.isCommitBordcast[pre.Digest] = true
      fmt.Println("commit广播完成")
    }
    p.lock.Unlock()
  }
}

//处理提交确认消息
func (p *pbft) handleCommit(content []byte) {
  //使用json解析出Commit结构体
  c := new(Commit)
  err := json.Unmarshal(content, c)
  if err != nil {
    log.Panic(err)
  }
  fmt.Printf("本节点已接收到%s节点发来的Commit ... \n", c.NodeID)
  //获取消息源节点的公钥,用于数字签名验证
  MessageNodePubKey := p.getPubKey(c.NodeID)
  digestByte, _ := hex.DecodeString(c.Digest)
  if _, ok := p.prePareConfirmCount[c.Digest]; !ok {
    fmt.Println("当前prepare池无此摘要,拒绝将信息持久化到本地消息池")
  } else if p.sequenceID != c.SequenceID {
    fmt.Println("消息序号对不上,拒绝将信息持久化到本地消息池")
  } else if !p.RsaVerySignWithSha256(digestByte, c.Sign, MessageNodePubKey) {
    fmt.Println("节点签名验证失败!,拒绝将信息持久化到本地消息池")
  } else {
    p.setCommitConfirmMap(c.Digest, c.NodeID, true)
    count := 0
    for range p.commitConfirmCount[c.Digest] {
      count++
    }
    //如果节点至少收到了2f+1个commit消息(包括自己),并且节点没有回复过,并且已进行过commit广播,则提交信息至本地消息池,并reply成功标志至客户端!
    p.lock.Lock()
    if count >= nodeCount/3*2 && !p.isReply[c.Digest] && p.isCommitBordcast[c.Digest] {
      fmt.Println("本节点已收到至少2f + 1 个节点(包括本地节点)发来的Commit信息 ...")
      //将消息信息,提交到本地消息池中!
      localMessagePool = append(localMessagePool, p.messagePool[c.Digest].Message)
      info := p.node.nodeID + "节点已将msgid:" + strconv.Itoa(p.messagePool[c.Digest].ID) + "存入本地消息池中,消息内容为:" + p.messagePool[c.Digest].Content
      fmt.Println(info)
      fmt.Println("正在reply客户端 ...")
      tcpDial([]byte(info), p.messagePool[c.Digest].ClientAddr)
      p.isReply[c.Digest] = true
      fmt.Println("reply完毕")
    }
    p.lock.Unlock()
  }
}

//序号累加
func (p *pbft) sequenceIDAdd() {
  p.lock.Lock()
  p.sequenceID++
  p.lock.Unlock()
}

//向除自己外的其他节点进行广播
func (p *pbft) broadcast(cmd command, content []byte) {
  for i := range nodeTable {
    if i == p.node.nodeID {
      continue
    }
    message := jointMessage(cmd, content)
    go tcpDial(message, nodeTable[i])
  }
}

//为多重映射开辟赋值
func (p *pbft) setPrePareConfirmMap(val, val2 string, b bool) {
  if _, ok := p.prePareConfirmCount[val]; !ok {
    p.prePareConfirmCount[val] = make(map[string]bool)
  }
  p.prePareConfirmCount[val][val2] = b
}

//为多重映射开辟赋值
func (p *pbft) setCommitConfirmMap(val, val2 string, b bool) {
  if _, ok := p.commitConfirmCount[val]; !ok {
    p.commitConfirmCount[val] = make(map[string]bool)
  }
  p.commitConfirmCount[val][val2] = b
}

//传入节点编号, 获取对应的公钥
func (p *pbft) getPubKey(nodeID string) []byte {
  key, err := ioutil.ReadFile("Keys/" + nodeID + "/" + nodeID + "_RSA_PUB")
  if err != nil {
    log.Panic(err)
  }
  return key
}

//传入节点编号, 获取对应的私钥
func (p *pbft) getPivKey(nodeID string) []byte {
  key, err := ioutil.ReadFile("Keys/" + nodeID + "/" + nodeID + "_RSA_PIV")
  if err != nil {
    log.Panic(err)
  }
  return key
}

main.go

代码语言:javascript
复制
package main

import (
  "log"
  "os"
)

const nodeCount = 4

//客户端的监听地址
var clientAddr = "127.0.0.1:8888"

//节点池,主要用来存储监听地址
var nodeTable map[string]string

func main() {
  //为四个节点生成公私钥
  genRsaKeys()
  nodeTable = map[string]string{
    "N0": "127.0.0.1:8000",
    "N1": "127.0.0.1:8001",
    "N2": "127.0.0.1:8002",
    "N3": "127.0.0.1:8003",
  }
  if len(os.Args) != 2 {
    log.Panic("输入的参数有误!")
  }
  nodeID := os.Args[1]
  if nodeID == "client" {
    clientSendMessageAndListen() //启动客户端程序
  } else if addr, ok := nodeTable[nodeID]; ok {
    p := NewPBFT(nodeID, addr)
    go p.tcpListen() //启动节点
  } else {
    log.Fatal("无此节点编号!")
  }
  select {}
}

cmd.go

代码语言:javascript
复制
package main

import (
  "crypto/sha256"
  "encoding/hex"
  "encoding/json"
  "log"
)

//<REQUEST,o,t,c>
type Request struct {
  Message
  Timestamp int64
  //相当于clientID
  ClientAddr string
}

//<<PRE-PREPARE,v,n,d>,m>
type PrePrepare struct {
  RequestMessage Request
  Digest         string
  SequenceID     int
  Sign           []byte
}

//<PREPARE,v,n,d,i>
type Prepare struct {
  Digest     string
  SequenceID int
  NodeID     string
  Sign       []byte
}

//<COMMIT,v,n,D(m),i>
type Commit struct {
  Digest     string
  SequenceID int
  NodeID     string
  Sign       []byte
}

//<REPLY,v,t,c,i,r>
type Reply struct {
  MessageID int
  NodeID    string
  Result    bool
}

type Message struct {
  Content string
  ID      int
}

const prefixCMDLength = 12

type command string

const (
  cRequest    command = "request"
  cPrePrepare command = "preprepare"
  cPrepare    command = "prepare"
  cCommit     command = "commit"
)

//默认前十二位为命令名称
func jointMessage(cmd command, content []byte) []byte {
  b := make([]byte, prefixCMDLength)
  for i, v := range []byte(cmd) {
    b[i] = v
  }
  joint := make([]byte, 0)
  joint = append(b, content...)
  return joint
}

//默认前十二位为命令名称
func splitMessage(message []byte) (cmd string, content []byte) {
  cmdBytes := message[:prefixCMDLength]
  newCMDBytes := make([]byte, 0)
  for _, v := range cmdBytes {
    if v != byte(0) {
      newCMDBytes = append(newCMDBytes, v)
    }
  }
  cmd = string(newCMDBytes)
  content = message[prefixCMDLength:]
  return
}

//对消息详情进行摘要
func getDigest(request Request) string {
  b, err := json.Marshal(request)
  if err != nil {
    log.Panic(err)
  }
  hash := sha256.Sum256(b)
  //进行十六进制字符串编码
  return hex.EncodeToString(hash[:])
}

client.go

代码语言:javascript
复制
package main

import (
  "bufio"
  "crypto/rand"
  "encoding/json"
  "fmt"
  "log"
  "math/big"
  "os"
  "strings"
  "time"
)

func clientSendMessageAndListen() {
  //开启客户端的本地监听(主要用来接收节点的reply信息)
  go clientTcpListen()
  fmt.Printf("客户端开启监听,地址:%s\n", clientAddr)

  fmt.Println(" ---------------------------------------------------------------------------------")
  fmt.Println("|  已进入PBFT测试Demo客户端,请启动全部节点后再发送消息!:)  |")
  fmt.Println(" ---------------------------------------------------------------------------------")
  fmt.Println("请在下方输入要存入节点的信息:")
  //首先通过命令行获取用户输入
  stdReader := bufio.NewReader(os.Stdin)
  for {
    data, err := stdReader.ReadString('\n')
    if err != nil {
      fmt.Println("Error reading from stdin")
      panic(err)
    }
    r := new(Request)
    r.Timestamp = time.Now().UnixNano()
    r.ClientAddr = clientAddr
    r.Message.ID = getRandom()
    //消息内容就是用户的输入
    r.Message.Content = strings.TrimSpace(data)
    br, err := json.Marshal(r)
    if err != nil {
      log.Panic(err)
    }
    fmt.Println(string(br))
    content := jointMessage(cRequest, br)
    //默认N0为主节点,直接把请求信息发送至N0
    tcpDial(content, nodeTable["N0"])
  }
}

//返回一个十位数的随机数,作为msgid
func getRandom() int {
  x := big.NewInt(10000000000)
  for {
    result, err := rand.Int(rand.Reader, x)
    if err != nil {
      log.Panic(err)
    }
    if result.Int64() > 1000000000 {
      return int(result.Int64())
    }
  }
}

rsa.go

代码语言:javascript
复制
package main

import (
  "crypto"
  "crypto/rand"
  "crypto/rsa"
  "crypto/sha256"
  "crypto/x509"
  "encoding/pem"
  "errors"
  "fmt"
  "log"
  "os"
  "strconv"
)

//如果当前目录下不存在目录Keys,则创建目录,并为各个节点生成rsa公私钥
func genRsaKeys() {
  if !isExist("./Keys") {
    fmt.Println("检测到还未生成公私钥目录,正在生成公私钥 ...")
    err := os.Mkdir("Keys", 0644)
    if err != nil {
      log.Panic()
    }
    for i := 0; i <= 4; i++ {
      if !isExist("./Keys/N" + strconv.Itoa(i)) {
        err := os.Mkdir("./Keys/N"+strconv.Itoa(i), 0644)
        if err != nil {
          log.Panic()
        }
      }
      priv, pub := getKeyPair()
      privFileName := "Keys/N" + strconv.Itoa(i) + "/N" + strconv.Itoa(i) + "_RSA_PIV"
      file, err := os.OpenFile(privFileName, os.O_RDWR|os.O_CREATE, 0644)
      if err != nil {
        log.Panic(err)
      }
      defer file.Close()
      file.Write(priv)

      pubFileName := "Keys/N" + strconv.Itoa(i) + "/N" + strconv.Itoa(i) + "_RSA_PUB"
      file2, err := os.OpenFile(pubFileName, os.O_RDWR|os.O_CREATE, 0644)
      if err != nil {
        log.Panic(err)
      }
      defer file2.Close()
      file2.Write(pub)
    }
    fmt.Println("已为节点们生成RSA公私钥")
  }
}

//生成rsa公私钥
func getKeyPair() (prvkey, pubkey []byte) {
  // 生成私钥文件
  privateKey, err := rsa.GenerateKey(rand.Reader, 1024)
  if err != nil {
    panic(err)
  }
  derStream := x509.MarshalPKCS1PrivateKey(privateKey)
  block := &pem.Block{
    Type:  "RSA PRIVATE KEY",
    Bytes: derStream,
  }
  prvkey = pem.EncodeToMemory(block)
  publicKey := &privateKey.PublicKey
  derPkix, err := x509.MarshalPKIXPublicKey(publicKey)
  if err != nil {
    panic(err)
  }
  block = &pem.Block{
    Type:  "PUBLIC KEY",
    Bytes: derPkix,
  }
  pubkey = pem.EncodeToMemory(block)
  return
}

//判断文件或文件夹是否存在
func isExist(path string) bool {
  _, err := os.Stat(path)
  if err != nil {
    if os.IsExist(err) {
      return true
    }
    if os.IsNotExist(err) {
      return false
    }
    fmt.Println(err)
    return false
  }
  return true
}

//数字签名
func (p *pbft) RsaSignWithSha256(data []byte, keyBytes []byte) []byte {
  h := sha256.New()
  h.Write(data)
  hashed := h.Sum(nil)
  block, _ := pem.Decode(keyBytes)
  if block == nil {
    panic(errors.New("private key error"))
  }
  privateKey, err := x509.ParsePKCS1PrivateKey(block.Bytes)
  if err != nil {
    fmt.Println("ParsePKCS8PrivateKey err", err)
    panic(err)
  }

  signature, err := rsa.SignPKCS1v15(rand.Reader, privateKey, crypto.SHA256, hashed)
  if err != nil {
    fmt.Printf("Error from signing: %s\n", err)
    panic(err)
  }

  return signature
}

//签名验证
func (p *pbft) RsaVerySignWithSha256(data, signData, keyBytes []byte) bool {
  block, _ := pem.Decode(keyBytes)
  if block == nil {
    panic(errors.New("public key error"))
  }
  pubKey, err := x509.ParsePKIXPublicKey(block.Bytes)
  if err != nil {
    panic(err)
  }

  hashed := sha256.Sum256(data)
  err = rsa.VerifyPKCS1v15(pubKey.(*rsa.PublicKey), crypto.SHA256, hashed[:], signData)
  if err != nil {
    panic(err)
  }
  return true
}

tcp.go

代码语言:javascript
复制
package main

import (
  "fmt"
  "io/ioutil"
  "log"
  "net"
)

//客户端使用的tcp监听
func clientTcpListen() {
  listen, err := net.Listen("tcp", clientAddr)
  if err != nil {
    log.Panic(err)
  }
  defer listen.Close()

  for {
    conn, err := listen.Accept()
    if err != nil {
      log.Panic(err)
    }
    b, err := ioutil.ReadAll(conn)
    if err != nil {
      log.Panic(err)
    }
    fmt.Println(string(b))
  }

}

//节点使用的tcp监听
func (p *pbft) tcpListen() {
  listen, err := net.Listen("tcp", p.node.addr)
  if err != nil {
    log.Panic(err)
  }
  fmt.Printf("节点开启监听,地址:%s\n", p.node.addr)
  defer listen.Close()

  for {
    conn, err := listen.Accept()
    if err != nil {
      log.Panic(err)
    }
    b, err := ioutil.ReadAll(conn)
    if err != nil {
      log.Panic(err)
    }
    p.handleRequest(b)
  }

}

//使用tcp发送消息
func tcpDial(context []byte, addr string) {
  conn, err := net.Dial("tcp", addr)
  if err != nil {
    log.Println("connect error", err)
    return
  }

  _, err = conn.Write(context)
  if err != nil {
    log.Fatal(err)
  }
  conn.Close()
}

参考(源码下载链接):https://github.com/corgi-kx/blockchain_consensus_algorithm

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2021-11-26,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 帆说区块链 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
命令行工具
腾讯云命令行工具 TCCLI 是管理腾讯云资源的统一工具。使用腾讯云命令行工具,您可以快速调用腾讯云 API 来管理您的腾讯云资源。此外,您还可以基于腾讯云的命令行工具来做自动化和脚本处理,以更多样的方式进行组合和重用。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档