原理图
实现功能:
数据从客户端输入,到接收到节点们的回复共分为5步
准备工具:cmd
(后续要改底层,建议安装go相关编译工具)
打开三个客户端,按如下操作:
(1)若已存链码,找到相关的路径,如图:
若未存链码输入如下:
git clone https://github.com/corgi-kx/blockchain_consensus_algorithm.git
之后进行已存链码操作。
插:若出现以下问题:go: go.mod file not found in current directory or any parent directory; see 'go help modules'
输入:
go env -w GO111MODULE=off
(2)内部建立exe文件
go build -o pbft.exe
(3)启动三个客户端代表5个节点
pbft.exe client
pbft.exe client N0
pbft.exe client N1
pbft.exe client N2
pbft.exe client N3
如图:
(4)测试节点同步信息,几个阶段同步信息:
(5)关闭一个节点(代表故障、恶意节点),测试信息是否能继续同步
(6)关闭两个节点,测试信息同步:
关闭两个节点后,故障节点已经超出了pbft的允许数量,消息进行到Prepare阶段由于接收不到满足数量的信息,固系统不再进行commit确认,客户端也接收不到reply。
PBFT.go:
package main
import (
"encoding/hex"
"encoding/json"
"fmt"
"io/ioutil"
"log"
"strconv"
"sync"
)
//本地消息池(模拟持久化层),只有确认提交成功后才会存入此池
var localMessagePool = []Message{}
type node struct {
//节点ID
nodeID string
//节点监听地址
addr string
//RSA私钥
rsaPrivKey []byte
//RSA公钥
rsaPubKey []byte
}
type pbft struct {
//节点信息
node node
//每笔请求自增序号
sequenceID int
//锁
lock sync.Mutex
//临时消息池,消息摘要对应消息本体
messagePool map[string]Request
//存放收到的prepare数量(至少需要收到并确认2f个),根据摘要来对应
prePareConfirmCount map[string]map[string]bool
//存放收到的commit数量(至少需要收到并确认2f+1个),根据摘要来对应
commitConfirmCount map[string]map[string]bool
//该笔消息是否已进行Commit广播
isCommitBordcast map[string]bool
//该笔消息是否已对客户端进行Reply
isReply map[string]bool
}
func NewPBFT(nodeID, addr string) *pbft {
p := new(pbft)
p.node.nodeID = nodeID
p.node.addr = addr
p.node.rsaPrivKey = p.getPivKey(nodeID) //从生成的私钥文件处读取
p.node.rsaPubKey = p.getPubKey(nodeID) //从生成的私钥文件处读取
p.sequenceID = 0
p.messagePool = make(map[string]Request)
p.prePareConfirmCount = make(map[string]map[string]bool)
p.commitConfirmCount = make(map[string]map[string]bool)
p.isCommitBordcast = make(map[string]bool)
p.isReply = make(map[string]bool)
return p
}
func (p *pbft) handleRequest(data []byte) {
//切割消息,根据消息命令调用不同的功能
cmd, content := splitMessage(data)
switch command(cmd) {
case cRequest:
p.handleClientRequest(content)
case cPrePrepare:
p.handlePrePrepare(content)
case cPrepare:
p.handlePrepare(content)
case cCommit:
p.handleCommit(content)
}
}
//处理客户端发来的请求
func (p *pbft) handleClientRequest(content []byte) {
fmt.Println("主节点已接收到客户端发来的request ...")
//使用json解析出Request结构体
r := new(Request)
err := json.Unmarshal(content, r)
if err != nil {
log.Panic(err)
}
//添加信息序号
p.sequenceIDAdd()
//获取消息摘要
digest := getDigest(*r)
fmt.Println("已将request存入临时消息池")
//存入临时消息池
p.messagePool[digest] = *r
//主节点对消息摘要进行签名
digestByte, _ := hex.DecodeString(digest)
signInfo := p.RsaSignWithSha256(digestByte, p.node.rsaPrivKey)
//拼接成PrePrepare,准备发往follower节点
pp := PrePrepare{*r, digest, p.sequenceID, signInfo}
b, err := json.Marshal(pp)
if err != nil {
log.Panic(err)
}
fmt.Println("正在向其他节点进行进行PrePrepare广播 ...")
//进行PrePrepare广播
p.broadcast(cPrePrepare, b)
fmt.Println("PrePrepare广播完成")
}
//处理预准备消息
func (p *pbft) handlePrePrepare(content []byte) {
fmt.Println("本节点已接收到主节点发来的PrePrepare ...")
// //使用json解析出PrePrepare结构体
pp := new(PrePrepare)
err := json.Unmarshal(content, pp)
if err != nil {
log.Panic(err)
}
//获取主节点的公钥,用于数字签名验证
primaryNodePubKey := p.getPubKey("N0")
digestByte, _ := hex.DecodeString(pp.Digest)
if digest := getDigest(pp.RequestMessage); digest != pp.Digest {
fmt.Println("信息摘要对不上,拒绝进行prepare广播")
} else if p.sequenceID+1 != pp.SequenceID {
fmt.Println("消息序号对不上,拒绝进行prepare广播")
} else if !p.RsaVerySignWithSha256(digestByte, pp.Sign, primaryNodePubKey) {
fmt.Println("主节点签名验证失败!,拒绝进行prepare广播")
} else {
//序号赋值
p.sequenceID = pp.SequenceID
//将信息存入临时消息池
fmt.Println("已将消息存入临时节点池")
p.messagePool[pp.Digest] = pp.RequestMessage
//节点使用私钥对其签名
sign := p.RsaSignWithSha256(digestByte, p.node.rsaPrivKey)
//拼接成Prepare
pre := Prepare{pp.Digest, pp.SequenceID, p.node.nodeID, sign}
bPre, err := json.Marshal(pre)
if err != nil {
log.Panic(err)
}
//进行准备阶段的广播
fmt.Println("正在进行Prepare广播 ...")
p.broadcast(cPrepare, bPre)
fmt.Println("Prepare广播完成")
}
}
//处理准备消息
func (p *pbft) handlePrepare(content []byte) {
//使用json解析出Prepare结构体
pre := new(Prepare)
err := json.Unmarshal(content, pre)
if err != nil {
log.Panic(err)
}
fmt.Printf("本节点已接收到%s节点发来的Prepare ... \n", pre.NodeID)
//获取消息源节点的公钥,用于数字签名验证
MessageNodePubKey := p.getPubKey(pre.NodeID)
digestByte, _ := hex.DecodeString(pre.Digest)
if _, ok := p.messagePool[pre.Digest]; !ok {
fmt.Println("当前临时消息池无此摘要,拒绝执行commit广播")
} else if p.sequenceID != pre.SequenceID {
fmt.Println("消息序号对不上,拒绝执行commit广播")
} else if !p.RsaVerySignWithSha256(digestByte, pre.Sign, MessageNodePubKey) {
fmt.Println("节点签名验证失败!,拒绝执行commit广播")
} else {
p.setPrePareConfirmMap(pre.Digest, pre.NodeID, true)
count := 0
for range p.prePareConfirmCount[pre.Digest] {
count++
}
//因为主节点不会发送Prepare,所以不包含自己
specifiedCount := 0
if p.node.nodeID == "N0" {
specifiedCount = nodeCount / 3 * 2
} else {
specifiedCount = (nodeCount / 3 * 2) - 1
}
//如果节点至少收到了2f个prepare的消息(包括自己),并且没有进行过commit广播,则进行commit广播
p.lock.Lock()
//获取消息源节点的公钥,用于数字签名验证
if count >= specifiedCount && !p.isCommitBordcast[pre.Digest] {
fmt.Println("本节点已收到至少2f个节点(包括本地节点)发来的Prepare信息 ...")
//节点使用私钥对其签名
sign := p.RsaSignWithSha256(digestByte, p.node.rsaPrivKey)
c := Commit{pre.Digest, pre.SequenceID, p.node.nodeID, sign}
bc, err := json.Marshal(c)
if err != nil {
log.Panic(err)
}
//进行提交信息的广播
fmt.Println("正在进行commit广播")
p.broadcast(cCommit, bc)
p.isCommitBordcast[pre.Digest] = true
fmt.Println("commit广播完成")
}
p.lock.Unlock()
}
}
//处理提交确认消息
func (p *pbft) handleCommit(content []byte) {
//使用json解析出Commit结构体
c := new(Commit)
err := json.Unmarshal(content, c)
if err != nil {
log.Panic(err)
}
fmt.Printf("本节点已接收到%s节点发来的Commit ... \n", c.NodeID)
//获取消息源节点的公钥,用于数字签名验证
MessageNodePubKey := p.getPubKey(c.NodeID)
digestByte, _ := hex.DecodeString(c.Digest)
if _, ok := p.prePareConfirmCount[c.Digest]; !ok {
fmt.Println("当前prepare池无此摘要,拒绝将信息持久化到本地消息池")
} else if p.sequenceID != c.SequenceID {
fmt.Println("消息序号对不上,拒绝将信息持久化到本地消息池")
} else if !p.RsaVerySignWithSha256(digestByte, c.Sign, MessageNodePubKey) {
fmt.Println("节点签名验证失败!,拒绝将信息持久化到本地消息池")
} else {
p.setCommitConfirmMap(c.Digest, c.NodeID, true)
count := 0
for range p.commitConfirmCount[c.Digest] {
count++
}
//如果节点至少收到了2f+1个commit消息(包括自己),并且节点没有回复过,并且已进行过commit广播,则提交信息至本地消息池,并reply成功标志至客户端!
p.lock.Lock()
if count >= nodeCount/3*2 && !p.isReply[c.Digest] && p.isCommitBordcast[c.Digest] {
fmt.Println("本节点已收到至少2f + 1 个节点(包括本地节点)发来的Commit信息 ...")
//将消息信息,提交到本地消息池中!
localMessagePool = append(localMessagePool, p.messagePool[c.Digest].Message)
info := p.node.nodeID + "节点已将msgid:" + strconv.Itoa(p.messagePool[c.Digest].ID) + "存入本地消息池中,消息内容为:" + p.messagePool[c.Digest].Content
fmt.Println(info)
fmt.Println("正在reply客户端 ...")
tcpDial([]byte(info), p.messagePool[c.Digest].ClientAddr)
p.isReply[c.Digest] = true
fmt.Println("reply完毕")
}
p.lock.Unlock()
}
}
//序号累加
func (p *pbft) sequenceIDAdd() {
p.lock.Lock()
p.sequenceID++
p.lock.Unlock()
}
//向除自己外的其他节点进行广播
func (p *pbft) broadcast(cmd command, content []byte) {
for i := range nodeTable {
if i == p.node.nodeID {
continue
}
message := jointMessage(cmd, content)
go tcpDial(message, nodeTable[i])
}
}
//为多重映射开辟赋值
func (p *pbft) setPrePareConfirmMap(val, val2 string, b bool) {
if _, ok := p.prePareConfirmCount[val]; !ok {
p.prePareConfirmCount[val] = make(map[string]bool)
}
p.prePareConfirmCount[val][val2] = b
}
//为多重映射开辟赋值
func (p *pbft) setCommitConfirmMap(val, val2 string, b bool) {
if _, ok := p.commitConfirmCount[val]; !ok {
p.commitConfirmCount[val] = make(map[string]bool)
}
p.commitConfirmCount[val][val2] = b
}
//传入节点编号, 获取对应的公钥
func (p *pbft) getPubKey(nodeID string) []byte {
key, err := ioutil.ReadFile("Keys/" + nodeID + "/" + nodeID + "_RSA_PUB")
if err != nil {
log.Panic(err)
}
return key
}
//传入节点编号, 获取对应的私钥
func (p *pbft) getPivKey(nodeID string) []byte {
key, err := ioutil.ReadFile("Keys/" + nodeID + "/" + nodeID + "_RSA_PIV")
if err != nil {
log.Panic(err)
}
return key
}
main.go
package main
import (
"log"
"os"
)
const nodeCount = 4
//客户端的监听地址
var clientAddr = "127.0.0.1:8888"
//节点池,主要用来存储监听地址
var nodeTable map[string]string
func main() {
//为四个节点生成公私钥
genRsaKeys()
nodeTable = map[string]string{
"N0": "127.0.0.1:8000",
"N1": "127.0.0.1:8001",
"N2": "127.0.0.1:8002",
"N3": "127.0.0.1:8003",
}
if len(os.Args) != 2 {
log.Panic("输入的参数有误!")
}
nodeID := os.Args[1]
if nodeID == "client" {
clientSendMessageAndListen() //启动客户端程序
} else if addr, ok := nodeTable[nodeID]; ok {
p := NewPBFT(nodeID, addr)
go p.tcpListen() //启动节点
} else {
log.Fatal("无此节点编号!")
}
select {}
}
cmd.go
package main
import (
"crypto/sha256"
"encoding/hex"
"encoding/json"
"log"
)
//<REQUEST,o,t,c>
type Request struct {
Message
Timestamp int64
//相当于clientID
ClientAddr string
}
//<<PRE-PREPARE,v,n,d>,m>
type PrePrepare struct {
RequestMessage Request
Digest string
SequenceID int
Sign []byte
}
//<PREPARE,v,n,d,i>
type Prepare struct {
Digest string
SequenceID int
NodeID string
Sign []byte
}
//<COMMIT,v,n,D(m),i>
type Commit struct {
Digest string
SequenceID int
NodeID string
Sign []byte
}
//<REPLY,v,t,c,i,r>
type Reply struct {
MessageID int
NodeID string
Result bool
}
type Message struct {
Content string
ID int
}
const prefixCMDLength = 12
type command string
const (
cRequest command = "request"
cPrePrepare command = "preprepare"
cPrepare command = "prepare"
cCommit command = "commit"
)
//默认前十二位为命令名称
func jointMessage(cmd command, content []byte) []byte {
b := make([]byte, prefixCMDLength)
for i, v := range []byte(cmd) {
b[i] = v
}
joint := make([]byte, 0)
joint = append(b, content...)
return joint
}
//默认前十二位为命令名称
func splitMessage(message []byte) (cmd string, content []byte) {
cmdBytes := message[:prefixCMDLength]
newCMDBytes := make([]byte, 0)
for _, v := range cmdBytes {
if v != byte(0) {
newCMDBytes = append(newCMDBytes, v)
}
}
cmd = string(newCMDBytes)
content = message[prefixCMDLength:]
return
}
//对消息详情进行摘要
func getDigest(request Request) string {
b, err := json.Marshal(request)
if err != nil {
log.Panic(err)
}
hash := sha256.Sum256(b)
//进行十六进制字符串编码
return hex.EncodeToString(hash[:])
}
client.go
package main
import (
"bufio"
"crypto/rand"
"encoding/json"
"fmt"
"log"
"math/big"
"os"
"strings"
"time"
)
func clientSendMessageAndListen() {
//开启客户端的本地监听(主要用来接收节点的reply信息)
go clientTcpListen()
fmt.Printf("客户端开启监听,地址:%s\n", clientAddr)
fmt.Println(" ---------------------------------------------------------------------------------")
fmt.Println("| 已进入PBFT测试Demo客户端,请启动全部节点后再发送消息!:) |")
fmt.Println(" ---------------------------------------------------------------------------------")
fmt.Println("请在下方输入要存入节点的信息:")
//首先通过命令行获取用户输入
stdReader := bufio.NewReader(os.Stdin)
for {
data, err := stdReader.ReadString('\n')
if err != nil {
fmt.Println("Error reading from stdin")
panic(err)
}
r := new(Request)
r.Timestamp = time.Now().UnixNano()
r.ClientAddr = clientAddr
r.Message.ID = getRandom()
//消息内容就是用户的输入
r.Message.Content = strings.TrimSpace(data)
br, err := json.Marshal(r)
if err != nil {
log.Panic(err)
}
fmt.Println(string(br))
content := jointMessage(cRequest, br)
//默认N0为主节点,直接把请求信息发送至N0
tcpDial(content, nodeTable["N0"])
}
}
//返回一个十位数的随机数,作为msgid
func getRandom() int {
x := big.NewInt(10000000000)
for {
result, err := rand.Int(rand.Reader, x)
if err != nil {
log.Panic(err)
}
if result.Int64() > 1000000000 {
return int(result.Int64())
}
}
}
rsa.go
package main
import (
"crypto"
"crypto/rand"
"crypto/rsa"
"crypto/sha256"
"crypto/x509"
"encoding/pem"
"errors"
"fmt"
"log"
"os"
"strconv"
)
//如果当前目录下不存在目录Keys,则创建目录,并为各个节点生成rsa公私钥
func genRsaKeys() {
if !isExist("./Keys") {
fmt.Println("检测到还未生成公私钥目录,正在生成公私钥 ...")
err := os.Mkdir("Keys", 0644)
if err != nil {
log.Panic()
}
for i := 0; i <= 4; i++ {
if !isExist("./Keys/N" + strconv.Itoa(i)) {
err := os.Mkdir("./Keys/N"+strconv.Itoa(i), 0644)
if err != nil {
log.Panic()
}
}
priv, pub := getKeyPair()
privFileName := "Keys/N" + strconv.Itoa(i) + "/N" + strconv.Itoa(i) + "_RSA_PIV"
file, err := os.OpenFile(privFileName, os.O_RDWR|os.O_CREATE, 0644)
if err != nil {
log.Panic(err)
}
defer file.Close()
file.Write(priv)
pubFileName := "Keys/N" + strconv.Itoa(i) + "/N" + strconv.Itoa(i) + "_RSA_PUB"
file2, err := os.OpenFile(pubFileName, os.O_RDWR|os.O_CREATE, 0644)
if err != nil {
log.Panic(err)
}
defer file2.Close()
file2.Write(pub)
}
fmt.Println("已为节点们生成RSA公私钥")
}
}
//生成rsa公私钥
func getKeyPair() (prvkey, pubkey []byte) {
// 生成私钥文件
privateKey, err := rsa.GenerateKey(rand.Reader, 1024)
if err != nil {
panic(err)
}
derStream := x509.MarshalPKCS1PrivateKey(privateKey)
block := &pem.Block{
Type: "RSA PRIVATE KEY",
Bytes: derStream,
}
prvkey = pem.EncodeToMemory(block)
publicKey := &privateKey.PublicKey
derPkix, err := x509.MarshalPKIXPublicKey(publicKey)
if err != nil {
panic(err)
}
block = &pem.Block{
Type: "PUBLIC KEY",
Bytes: derPkix,
}
pubkey = pem.EncodeToMemory(block)
return
}
//判断文件或文件夹是否存在
func isExist(path string) bool {
_, err := os.Stat(path)
if err != nil {
if os.IsExist(err) {
return true
}
if os.IsNotExist(err) {
return false
}
fmt.Println(err)
return false
}
return true
}
//数字签名
func (p *pbft) RsaSignWithSha256(data []byte, keyBytes []byte) []byte {
h := sha256.New()
h.Write(data)
hashed := h.Sum(nil)
block, _ := pem.Decode(keyBytes)
if block == nil {
panic(errors.New("private key error"))
}
privateKey, err := x509.ParsePKCS1PrivateKey(block.Bytes)
if err != nil {
fmt.Println("ParsePKCS8PrivateKey err", err)
panic(err)
}
signature, err := rsa.SignPKCS1v15(rand.Reader, privateKey, crypto.SHA256, hashed)
if err != nil {
fmt.Printf("Error from signing: %s\n", err)
panic(err)
}
return signature
}
//签名验证
func (p *pbft) RsaVerySignWithSha256(data, signData, keyBytes []byte) bool {
block, _ := pem.Decode(keyBytes)
if block == nil {
panic(errors.New("public key error"))
}
pubKey, err := x509.ParsePKIXPublicKey(block.Bytes)
if err != nil {
panic(err)
}
hashed := sha256.Sum256(data)
err = rsa.VerifyPKCS1v15(pubKey.(*rsa.PublicKey), crypto.SHA256, hashed[:], signData)
if err != nil {
panic(err)
}
return true
}
tcp.go
package main
import (
"fmt"
"io/ioutil"
"log"
"net"
)
//客户端使用的tcp监听
func clientTcpListen() {
listen, err := net.Listen("tcp", clientAddr)
if err != nil {
log.Panic(err)
}
defer listen.Close()
for {
conn, err := listen.Accept()
if err != nil {
log.Panic(err)
}
b, err := ioutil.ReadAll(conn)
if err != nil {
log.Panic(err)
}
fmt.Println(string(b))
}
}
//节点使用的tcp监听
func (p *pbft) tcpListen() {
listen, err := net.Listen("tcp", p.node.addr)
if err != nil {
log.Panic(err)
}
fmt.Printf("节点开启监听,地址:%s\n", p.node.addr)
defer listen.Close()
for {
conn, err := listen.Accept()
if err != nil {
log.Panic(err)
}
b, err := ioutil.ReadAll(conn)
if err != nil {
log.Panic(err)
}
p.handleRequest(b)
}
}
//使用tcp发送消息
func tcpDial(context []byte, addr string) {
conn, err := net.Dial("tcp", addr)
if err != nil {
log.Println("connect error", err)
return
}
_, err = conn.Write(context)
if err != nil {
log.Fatal(err)
}
conn.Close()
}
参考(源码下载链接):https://github.com/corgi-kx/blockchain_consensus_algorithm