使用beanstalkd实现定制化持续集成过程中pipeline

持续集成是一种项目管理和流程模型,依赖于团队中各个角色的配合。各个角色的意识和配合不是一朝一夕能练就的,我们的工作只是提供一种方案和能力,这就是持续集成能力的服务化。而在做持续集成能力服务化的过程中,最核心的一点就是,如何实现一个可定制化的任务流,即所谓的pipeline。

在传统的持续集成工具实现了pipeline功能,以供串联上下游job,并把多个job联系成一次完整的构建,例如jenkins的pipeline插件。

但是各种持续集成工具,或多或少都有自己的短板,总结起来如下:

1、配置并不方便,上下游job配置并不能点击即可用;

2、上下游job之间参数的传递无法很方便的实现;

3、一次完整构建链路如何trace并收集各个job的执行情况;

4、根据3实现问题的快速定位。

我们先说一下,beanstalkd实现可定制化pipeline的方法吧。

一、先通过概念让大家了解Beanstalkd的特性和工作场景。

Beanstalkd 是一个轻量级消息中间件,它最大特点是将自己定位为基于管道  (tube) 和任务 (job) 的工作队列 (work-queue):

Beanstalkd 支持任务优先级 (priority), 延时 (delay), 超时重发 (time-to-run) 和预留 (buried), 能够很好的支持分布式的后台任务和定时任务处理。

它的内部实现采用 libevent, 服务器-客户端之间用类似 memcached 的轻量级通讯协议,具有有很高的性能。

尽管是内存队列, beanstalkd 提供了 binlog 机制, 当重启 beanstalkd 时,当前任务状态能够从纪录的本地 binlog 中恢复。

管道 (tube):

管道类似于消息主题 (topic), 在一个 Beanstalkd 中可以支持多个管道, 每个管道都有自己的发布者 (producer) 和消费者 (consumer). 管道之间互相不影响。

任务 (job):

READY- 需要立即处理的任务,当延时 (DELAYED) 任务到期后会自动成为当前任务;

DELAYED- 延迟执行的任务, 当消费者处理任务后, 可以用将消息再次放回 DELAYED 队列延迟执行;

RESERVED- 已经被消费者获取, 正在执行的任务。Beanstalkd 负责检查任务是否在 TTR(time-to-run) 内完成;

BURIED- 保留的任务: 任务不会被执行,也不会消失,除非有人把它 "踢" 回队列;

DELETED- 消息被彻底删除。Beanstalkd 不再维持这些消息。

Beanstalkd 用任务 (job) 代替消息 (message) 的概念。与消息不同,任务有一系列状态:

 任务优先级 (priority):

任务 (job) 可以有 0~2^32 个优先级, 0 代表最高优先级。 beanstalkd 采用最大最小堆 (Min-max heap) 处理任务优先级排序, 任何时刻调用 reserve 命令的消费者总是能拿到当前优先级最高的任务, 时间复杂度为 O(logn).

延时任务 (delay):

有两种方式可以延时执行任务 (job): 生产者发布任务时指定延时;或者当任务处理完毕后, 消费者再次将任务放入队列延时执行 (RELEASE with <delay>)。这种机制可以实现分布式的 Java.util.Timer,这种分布式定时任务的优势是:如果某个消费者节点故障,任务超时重发 (time-to-run) 能够保证任务转移到另外的节点执行。

任务超时重发 (time-to-run):

Beanstalkd 把任务返回给消费者以后:消费者必须在预设的 TTR (time-to-run) 时间内发送 delete / release/ bury 改变任务状态;否则 Beanstalkd 会认为消息处理失败,然后把任务交给另外的消费者节点执行。如果消费者预计在 TTR (time-to-run) 时间内无法完成任务, 也可以发送 touch 命令, 它的作用是让 Beanstalkd 从系统时间重新计算 TTR (time-to-run).

任务预留 (buried):

如果任务因为某些原因无法执行, 消费者可以把任务置为 buried 状态让 Beanstalkd 保留这些任务。管理员可以通过 peek buried 命令查询被保留的任务,并且进行人工干预。简单的, kick <n> 能够一次性把 n 条被保留的任务踢回队列。

Beanstalkd 协议:

       Beanstalkd 采用类 memcached 协议, 客户端通过文本命令与服务器交互。这些命令可以简单的分成三组:    

       生产类 - use <tube> / put <priority> <delay> <ttr> [bytes]:  

       生产者用 use 选择一个管道 (tube), 然后用 put 命令向管道发布任务 (job).    

       消费类 - watch <tubes> / reserve / delete <id> / release <id> <priority> <delay> / bury <id> / touch <id>

       消费者用 watch 选择多个管道 (tube), 然后用 reserve 命令获取待执行的任务,这个命令是阻塞的。客户端直到有任务可执行才返回。当任务处理完毕后, 消费者可以彻底删除任务 (DELETE), 释放任务让别人处理 (RELEASE), 或者保留 (BURY) 任务。

       维护类 - peek job / peek delayed / peek ready / peek buried / kick <n>

用于维护管道内的任务状态, 在不改变任务状态的条件下获取任务。可以用消费类命令改变这些任务的状态。

被保留 (buried) 的任务可以用 kick 命令 "踢" 回队列。

二、python对beanstalkd的封装

import beanstalkc

class BstkManager(object):
    
    __doc__ = 'beanstalk封装类,这里只封装了用到的方法'
    
    def __init__(self, config):
        self.config = config
        self.conn = self.__createConnection(self.config)    
    
    
    def __createConnection(self, config):
        try:
            conn = beanstalkc.Connection(host=config.get('host'), port=int(config.get('port')))
            return conn
        except Exception, ex:
            raise Exception('beanstalkd connection can not be established!', ex)
    
         
    def getConnection(self):
        return self.conn
    

    def put(self, message, tube=None):
        try:
            tube = self.config.get('topic') if tube == None else tube
            self.conn.use(tube)
            self.conn.ignore('default')        
            self.conn.put(message)
        except Exception, ex:
            raise Exception('put message to %s failure!' % tube, ex)
    
        
    def reserve(self, tube=None, timeout=None):
        try:
            tube = self.config.get('topic') if tube == None else tube
            self.conn.ignore('default')        
            self.conn.watch(tube)
            msg = self.conn.reserve(timeout=timeout)
            message_body = msg.body
            msg.delete()
            return message_body
        except Exception, ex:
            raise Exception('reserve message from %s failure!' % tube, ex)
    
    
    def clean(self, tube=None):
        try:
            while True:
                tube = self.config.get('topic') if tube == None else tube
                msg = self.conn.reserve(tube, timeout=1)
                # 如果超时 return
                if msg == None:
                    return
                msg.delete()
        except Exception, ex:
            raise Exception('clean tube %s failure!' % tube, ex)
            

在持续集成中,使用tube或者说topic区分不同的业务线,不同的业务人员通过向系统注册管道topic。这样做的收益是:

1、所有的业务在环境和流程上被隔离,互补干扰。

2、每个topic是一个独立的pipeline,每个pipeline之间是串行,但是topic之间是并行。这样保证一个业务线上的job是串行执行的,独占测试环境,而不用担心测试环境占用冲突。

import logging
import os
import sys
import traceback

import time
from django.conf import settings
from django.core.management.base import BaseCommand
from beanstalkd_client import connect_beanstalkd, BeanstalkError
from beanstalkc import SocketError


logger = logging.getLogger('beanstalkd_client')
logger.addHandler(logging.StreamHandler())

class Command(BaseCommand):
    help = "Start a Beanstalk worker serving all registered Beanstalk jobs"
    __doc__ = help
    
    def add_arguments(self, parser):

        parser.add_argument(
            '-w',
            '--workers',
            action='store',
            dest='worker_count',
            default='1',
            help='Number of workers to spawn.',
        )
        
        parser.add_argument(
            '-l',
            '--log-level',
            action='store',
            dest='log_level',
            default='info',
            help='Log level of worker process (one of '
                    '"debug", "info", "warning", "error"',
        )
    
    children = [] # list of worker processes
    jobs = {}

    def handle(self, *args, **options):
        # set log level
        logger.setLevel(getattr(logging, options['log_level'].upper()))

        # find beanstalk job modules
        bs_modules = []
        for app in settings.INSTALLED_APPS:
            try:
                modname = "%s.beanstalk_jobs" % app
                __import__(modname)
                bs_modules.append(sys.modules[modname])
            except ImportError:
                pass
        if not bs_modules:
            logger.error("No beanstalk_jobs modules found!")
            return

        # find all jobs
        jobs = []
        for bs_module in bs_modules:
            try:
                jobs += bs_module.beanstalk_job_list
            except AttributeError:
                pass
        if not jobs:
            logger.error("No beanstalk jobs found!")
            return
        logger.info("Available jobs:")
        for job in jobs:
            # determine right name to register function with
            app = job.app
            jobname = job.__name__
            try:
                func = settings.BEANSTALK_JOB_NAME % {
                    'app': app,
                    'job': jobname,
                }
            except AttributeError:
                func = '%s.%s' % (app, jobname)
            self.jobs[func] = job
            logger.info("* %s" % func)

        # spawn all workers and register all jobs
        try:
            worker_count = int(options['worker_count'])
            assert(worker_count > 0)
        except (ValueError, AssertionError):
            worker_count = 1
        self.spawn_workers(worker_count)

        # start working
        logger.info("Starting to work... (press ^C to exit)")
        try:
            for child in self.children:
                os.waitpid(child, 0)
        except KeyboardInterrupt:
            sys.exit(0)

    def spawn_workers(self, worker_count):
        """
        Spawn as many workers as desired (at least 1).
        Accepts:
        - worker_count, positive int
        """
        # no need for forking if there's only one worker
        if worker_count == 1:
            return self.work()

        logger.info("Spawning %s worker(s)" % worker_count)
        # spawn children and make them work (hello, 19th century!)
        for i in range(worker_count):
            child = os.fork()
            if child:
                self.children.append(child)
                continue
            else:
                self.work()
                break

    def work(self):
        """children only: watch tubes for all jobs, start working"""
        try:

            while True:
                try:
                    # Reattempt Beanstalk connection if connection attempt fails or is dropped
                    beanstalk = connect_beanstalkd()
                    for job in self.jobs.keys():
                        beanstalk.watch(job)
                    beanstalk.ignore('default')

                    # Connected to Beanstalk queue, continually process jobs until an error occurs
                    self.process_jobs(beanstalk)

                except (BeanstalkError, SocketError) as e:
                    logger.info("Beanstalk connection error: " + str(e))
                    time.sleep(2.0)
                    logger.info("retrying Beanstalk connection...")

        except KeyboardInterrupt:
            sys.exit(0)

    def process_jobs(self, beanstalk):
        while True:
            logger.debug("Beanstalk connection established, waiting for jobs")
            job = beanstalk.reserve()
            job_name = job.stats()['tube']
            if job_name in self.jobs:
                logger.debug("Calling %s with arg: %s" % (job_name, job.body))
                try:
                    self.jobs[job_name](job.body)
                except Exception, e:
                    tp, value, tb = sys.exc_info()
                    logger.error('Error while calling "%s" with arg "%s": '
                        '%s' % (
                            job_name,
                            job.body,
                            e,
                        )
                    )
                    logger.debug("%s:%s" % (tp.__name__, value))
                    logger.debug("\n".join(traceback.format_tb(tb)))
                    job.bury()
                else:
                    job.delete()
            else:
                job.release()

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏互联网高可用架构

教你如何成为Java的OOM Killer

1875
来自专栏张善友的专栏

RESTful API 设计最佳实践

背景 目前互联网上充斥着大量的关于RESTful API(为了方便,以后API和RESTful API 一个意思)如何设计的文章,然而却没有一个”万能“的设计...

3708
来自专栏Golang语言社区

让事件飞 ——Linux eventfd 原理与实践

目前越来越多的应用程序采用事件驱动的方式实现功能,如何高效地利用系统资源实现通知的管理和送达就愈发变得重要起来。在Linux系统中,eventfd是一个用来通知...

8942
来自专栏Elasticsearch实验室

Elasticsearch调优实践

本文基于ES 5.6.4,从性能和稳定性两方面,从linux参数调优、ES节点配置和ES使用方式三个角度入手,介绍ES调优的基本方案。当然,ES的调优绝不能一概...

4.4K16
来自专栏我的小碗汤

6个最好的Go语言Web框架

原文:Top 6 web frameworks for Go as of 2017

1601
来自专栏北京马哥教育

高可用集群基本概念与heartbeat文本配置接口

一、高可用集群基本概念: 什么是高可用集群: 所谓高可用集群,就是在出现故障时,可以把业务自动转移到其他主机上并让服务正常运行的集群构架 > 高...

3587
来自专栏源码之家

清空TMP文件夹下的文件

1393
来自专栏纯洁的微笑

jvm系列(十):教你如何成为Java的OOM Killer

此文出处云时代架构,作者:李艳鹏 教你如何成为Java的OOM Killer 前言 虽然事隔半年,当时排查线上OOM事故的过程记忆犹新,每一个步骤都历历在目,...

5984
来自专栏13blog.site

小测试

可以在 @RequestMapping 注解里面加上 method=RequestMethod.GET 或者使用 @GetMapping 注解

1631
来自专栏Kevin-ZhangCG

[ SSH框架 ] Hibernate框架学习之一

2468

扫码关注云+社区

领取腾讯云代金券