go 并发处理脚本

并发处理脚本

最近经常涉及到脚本的编写。本身项目数据量较大,所以经常编写的脚本需要高并发,干脆就提取出来。

如果有地方用到,只需要实现接口即可。

谨以此文抛砖引玉,不喜勿喷

package script

import (
	"fmt"
	"time"
	"errors"
	"flag"
	"pigcome/utils"
)

// 实现此接口即可
type model interface {
  	// 每个goroutine运行的函数
		RunOne(id int64) (params []int64, errInfos map[int64]string, err error)
  	// 最后收集到结果后的处理函数
		HandleResult(res Result)
}

type RunParam struct {
	Start    int64			//起始ID
	End      int64			//结束ID
	Step     int			//同时进行的goroutine数目
	Ids      []int64		//自定义IDs
	Deadline time.Duration	//goroutine超时限制
}

type ResultType int

const (
	_       ResultType = iota
	Success
	Panic
	Timeout
	Error
)

type Result struct {
	Type     ResultType			//结果类型
	ID       int64				//每个goroutine的ID
  	Ress     []int64			//结果的参数(自定义)
	ErrInfos map[int64]string	 //runOne函数自定义的错误信息
	ErrInfo  error				//整个goroutine的错误信息
}

func Run(params *RunParam, m model) (err error) {
	beginAt := time.Now()
	err = CheckParams(params)
	if err != nil {
		return
	}
	// 1000并没有什么特殊意义,只是为了有缓冲,提高速度
	ch := make(chan Result, 1000) 
  // 令牌,只有持有令牌才能运行,为了控制goroutine同时进行的数目
	token := make(chan struct{}, params.Step)
  // 以此判断结果是否都处理完成
	done := make(chan struct{})

	go collectResult(params, ch, done, m)
	if len(params.Ids) > 0 {
		for _, id := range params.Ids {
			token <- struct{}{}
			go runOne(params, ch, m, token, id)
		}
	} else {
		for id := params.Start; id < params.End; id++ {
			token <- struct{}{}
			go runOne(params, ch, m, token, id)
		}
	}
	<-done

	since := time.Since(beginAt)
	utils.PrintlnColorful(utils.Yellow, "耗时:", since.String())
	return
}

func collectResult(params *RunParam, ch chan Result, done chan struct{}, m model) {
	var finishCount, count int64
	idsLength := len(params.Ids)
	if idsLength > 0 {
		count = int64(idsLength)
	} else {
		count = params.End - params.Start
	}
	for {
		result := <-ch
		m.HandleResult(result)
		finishCount++
		if result.ErrInfo != nil {
			utils.PrintfColorful(utils.Red, "%d: %+v\n", finishCount, result)
		}
		if finishCount >= count {
			done <- struct{}{}
			return
		}
	}
}

func runOne(params *RunParam, ch chan Result, m model, token chan struct{}, id int64) {
	defer func() {
		<-token
		if err := recover(); err != nil {
			ch <- Result{Panic, id, nil, nil, fmt.Errorf("panic: %v", err)}
		}
	}()

	type runOneCh struct {
		ress     []int64
		errInfos map[int64]string
		err      error
	}
	errCh := make(chan runOneCh)

	go func(ch chan runOneCh, id int64) {
		ress, errInfos, err := m.RunOne(id)
		errCh <- runOneCh{ress, errInfos, err}
	}(errCh, id)

	select {
	case runOneRes := <-errCh:
		if runOneRes.err != nil {
			ch <- Result{Error, id, nil, nil, runOneRes.err}
		} else {
			ch <- Result{Success, id, runOneRes.ress, nil, nil}
		}
	case <-time.After(params.Deadline):
		ch <- Result{Timeout, id, nil, nil, errors.New("timeout")}
	}
}

// 初始化运行参数
func NewParams() (params *RunParam) {
	params = new(RunParam)
	flag.Int64Var(&params.Start, "start", 0, "")
	flag.Int64Var(&params.End, "end", 0, "")
	flag.IntVar(&params.Step, "step", 100, "")
	var second int
	flag.IntVar(&second, "second", 5, "")
	flag.Parse()
	params.Deadline = time.Second * time.Duration(second)
	return
}

// 运行参数检测
func CheckParams(params *RunParam) error {
	if params.End <= params.Start {
		return fmt.Errorf("ress range err, %+v", *params)
	}
	if params.Deadline <= 0 {
		return fmt.Errorf("ress deadline err, %+v", *params)
	}
	return nil
}

另外,用到了utils的的相关代码,下面贴一下

const (
	Red    = "\033[31m"
	Yellow = "\033[33m"
	Green  = "\033[32m"
)

func PrintlnColorful(color string, vals ...interface{}) {
	fmt.Printf(color)
	fmt.Println(vals...)
	fmt.Printf("\033[0m")
}

func PrintfColorful(color string, format string, vals ...interface{}) {
	fmt.Printf(color)
	fmt.Printf(format, vals...)
	fmt.Printf("\033[0m")
}

原创声明,本文系作者授权云+社区发表,未经许可,不得转载。

如有侵权,请联系 yunjia_community@tencent.com 删除。

编辑于

我来说两句

1 条评论
登录 后参与评论

相关文章

来自专栏颇忒脱的技术博客

Servlet 3.1 Async IO分析

Servlet Async Processing提供了一种异步请求处理的手段(见我的另一篇文章Servlet 3.0 异步处理详解),能够让你将Http thr...

803
来自专栏技术博客

C# Obsolete

Obsolete 属性将某个程序实体标记为一个建议不再使用的实体。每次使用被标记为已过时的实体时,随后将生成警告或错误,这取决于属性是如何配置的。例如:

621
来自专栏张善友的专栏

Dynamite动态排序库

易于使用和高性能动态排序库支持类似 SQL 语法和嵌套/复杂的表达式,使用 System.Linq.Expression 动态生成快速比较器。 使用此库就可以使...

18910
来自专栏Java3y

阅读SSH项目之ERP

前言 本博文主要是记录我阅读过的SSH项目所学习到的知识,并不是相关系列教程。该SSH项目的gitHub地址:ERP项目地址 删除数据 实际业务中真正意义上的数...

2747
来自专栏逸鹏说道

C# 温故而知新:Stream篇(五)下

对于重写的方法这里不再重复说明,大家可以参考我写的第一篇 以下是memoryStream独有的方法 virtual byte[] GetBuffer() 这个方...

33510
来自专栏大内老A

ASP.NET Core的配置(1):读取配置信息

提到“配置”二字,我想绝大部分.NET开发人员脑海中会立马浮现出两个特殊文件的身影,那就是我们再熟悉不过的app.config和web.config,多年以来我...

1938
来自专栏向治洪

Java 读写大文本文件

如下的程序,将一个行数为fileLines的文本文件平均分为splitNum个小文本文件,其中换行符’r’是linux上的,windows的java换行符是’...

17610
来自专栏张善友的专栏

在Linux和Windows平台上操作MemoryMappedFile(简称MMF)

操作系统很早就开始使用内存映射文件(Memory Mapped File)来作为进程间的共享存储区,这是一种非常高效的进程通讯手段。.NET 4.0新增加了一个...

1916
来自专栏智能大石头

XCode读取Excel数据(适用于任何数据库)

虽然是充血模型,虽然是强类型,XCode同样支持遍历任何数据库结构,并以强类型(相对于DataSet等字典访问)方式读取数据。 要遍历数据库结构是很容易的事情,...

1928
来自专栏木宛城主

工欲善其事,必先利其器:分享一套Code Smith 搭建N层架构模板

 开篇 平常开发时,由于冗余代码过多,程序员做重复的工作过多势必会影响开发效率。倘若对重复性代码简单的复制、粘贴,虽然也能节省时间,但也需仔细一步步替换,这无...

1738

扫码关注云+社区