前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >分布式学习十四:协调任务

分布式学习十四:协调任务

作者头像
仙士可
发布2022-04-06 18:43:13
1930
发布2022-04-06 18:43:13
举报
文章被收录于专栏:仙士可博客

分布式协调/通知服务

mysql备份数据时,我们会通过读取binlog方式备份,但是如果当从服务器宕机时,则备份就会停止,我们可以通过zookeeper实现分布式协调备份

主服务进行备份提交,其他服务监听主服务器状态,如果宕机失去联系,则替代主服务进行工作.

实现原理

在zookeeper节点结构如下:

代码语言:javascript
复制
test
└── customBackUp
    └── tasks  任务列表
        └── task01  任务
            ├── instance  服务实例列表
            │   └── server_1_00000001  有序/临时 节点
            ├── lastCommit 最后提交id
            └── status 当前状态

server进程:

1:判断tasks是否存在 task01 任务

2:如果不存在则初始化 task01 任务的节点列表

monitor进程:

1:监听tasks所有任务下的 status 节点,进行监控报警

task进程

1:多台服务初始化之后,先获取指定任务列表的节点数据(task01)

2:在instance中注册自己的有序/临时节点

3:注册完成之后,判断instance自己的节点是否为最小的,如果是,则节点状态为 "主服务"

4:如果不是最小的,则节点状态为:"从服务"

5:主服务进行处理数据,将status状态更新为"running",并将处理的进度id保留到 lastCommit 中

6:从服务进行监听instance节点列表,当主服务断线后,临时节点将会被删除,从而触发监听

7:从服务将status改为"stop"状态,重新进行判断节点是否最小

8:重复3-7 

完整架构图解

仙士可博客
仙士可博客

简单实现代码

代码语言:javascript
复制
package main

import (
   "errors"
   "fmt"
   "github.com/go-zookeeper/zk"
   "os"
   "strconv"
   "strings"
   "time"
)

var serverId int = 1

type instanceStatus int

var (
   StatusRunning instanceStatus = 1
   StatusStandby instanceStatus = -1
)

func main() {
   serverId, _ = strconv.Atoi(os.Args[1])
   conn, _, err := zk.Connect([]string{"127.0.0.1:20005"}, time.Second*10)
   if err != nil {
      panic(err)
   }
   logWithTime(fmt.Sprintf("serverId:%v start.",serverId))
   //task path
   path := "/customBackUp/tasks/task01"

   //check path exits
   exists, _, err := conn.Exists(path)
   if err != nil {
      panic(err)
   }
   logWithTime(fmt.Sprintf("check task node exist."))

   //if path  not exits,create path
   if exists == false {
      err := createPath(conn, path)
      if err != nil {
         panic(err)
      }
      err = createPath(conn, path+"/instance/")
      if err != nil {
         panic(err)
      }
      err = createPath(conn, path+"/lastCommit/")
      if err != nil {
         panic(err)
      }
      err = createPath(conn, path+"/status/")
      if err != nil {
         panic(err)
      }
      logWithTime(fmt.Sprintf("create task node"))
   }

   //register task(create a node sequence and ephemeral path)
   registerInstanceNodePath := path + "/instance/" + "server_" + strconv.Itoa(serverId)
   createPath, err := conn.Create(registerInstanceNodePath, []byte{}, zk.FlagSequence|zk.FlagEphemeral, zk.WorldACL(zk.PermAll))
   if err != nil {
      panic(err)
   }
   logWithTime(fmt.Sprintf("register instance node: %v",createPath))

start_worker:
   status, err := checkInstanceStatus(conn, createPath, path+"/instance")
   if err != nil {
      panic(err)
   }
   logWithTime(fmt.Sprintf("current server status: %v",status))

   if status == StatusRunning {
      //status need to update to running
      _, err := conn.Set(path+"/status", []byte("1"), -1)
      if err != nil {
          panic(err)
      }
      logWithTime(fmt.Sprintf("current server handle task..."))
      handleTask(conn, path+"/lastCommit")

   } else if status == StatusStandby {

      logWithTime(fmt.Sprintf("current server watch node..."))
      watchTaskNode(conn, path+"/instance")
      //status need to update to stop
      _, err := conn.Set(path+"/status", []byte("0"), -1)
      if err != nil {
         panic(err)
      }
      goto start_worker

   }
}

func handleTask(conn *zk.Conn, commitPath string) {
   //get commitId
   bytes, _, err := conn.Get(commitPath)
   if err != nil {
      panic(err)
   }
   str := string(bytes)
   id, _ := strconv.Atoi(str)
   for {
      id++
      str := strconv.Itoa(id)
      _, err := conn.Set(commitPath, []byte(str), -1)
      if err != nil {
         panic(err)
      }
      fmt.Printf("[%v]serverId(%v),commitId:%v \n", time.Now().Format("2006-01-02 15:04:05"), serverId, id)
      time.Sleep(time.Second * 5)
   }
}

func watchTaskNode(conn *zk.Conn, watchPath string) {
   _, _, events, err := conn.ChildrenW(watchPath)
   if err != nil {
      panic(err)
   }
   fmt.Printf("event change: %v", <-events)
   return
}

func checkInstanceStatus(conn *zk.Conn, nodeName string, path string) (status instanceStatus, err error) {
   currentId := getInstanceNodeId(nodeName)
   minId := currentId
   //get all child nodes of task Instance node
   nodeArr, _, err := conn.Children(path)
   if err != nil {
      return 0, err
   }
   for _, v := range nodeArr {
      nodeId := getInstanceNodeId(v)
      if nodeId <= minId {
         minId = nodeId
      }
   }
   if minId == currentId {
      return StatusRunning, nil
   } else {
      return StatusStandby, nil
   }
}

func getInstanceNodeId(nodeName string) int {
   //nodeanme='/customBackUp/tasks/task01/instance/server_10000000001'
   //only need to intercept the last 8 digits
   id := nodeName[len(nodeName)-8:]
   intId, _ := strconv.Atoi(id)
   return intId
}

func createPath(conn *zk.Conn, path string) (err error) {
   strArr := strings.Split(path, "/")
   var node string
   for _, str := range strArr {
      if str == "" {
         continue
      }
      node = node + "/" + str
      exists, _, err := conn.Exists(node)
      if err != nil {
         return errors.New(err.Error())
      }
      if exists {
         continue
      } else {
         _, err = conn.Create(node, []byte{}, 0, zk.WorldACL(zk.PermAll))
         if err != nil {
            return errors.New(err.Error())
         }
      }
   }
   return err
}

func logWithTime(log string) {
   fmt.Printf("%v %v\n", time.Now().Format("2006-01-02 15:04:05"), log)
}

运行工作图:

仙士可博客
仙士可博客

注意:此代码部分逻辑缺失,例如:

1:发布任务的task进程没有体现

2:监控任务的monitor没有体现

本文为仙士可原创文章,转载无需和我联系,但请注明来自仙士可博客www.php20.cn

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2022/04/02 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 分布式协调/通知服务
  • 实现原理
    • 完整架构图解
    • 简单实现代码
    领券
    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档