Query请求的执行流程分析

Query请求的执行流程分析

  • 我们以 httpd/handler.go中的serverQuery为入口来分析;
  • 在前面我们有专门讲解 httpd/handler 的一篇文章;
  • 我们不会分析查询结果是如何通过tsm tree和倒排索引得到的,重点放在查询的上层流程上;
  • 本章我们将主要精力放在 query.Executor的分析上。

Executor的定义和创建

  1. 定义:
type Executor struct {
    // Used for executing a statement in the query.
    // 具体的查询操作
    StatementExecutor StatementExecutor

    // Used for tracking running queries.
    // 每个query都会对应一个task, 由TaskManager统一管理
    TaskManager *TaskManager

    // Logger to use for all logging.
    // Defaults to discarding all log output.
    Logger *zap.Logger

    // expvar-based stats.
    stats *Statistics
}
  1. 对应的New函数:
func NewExecutor() *Executor {
    return &Executor{
        TaskManager: NewTaskManager(),
        Logger:      zap.NewNop(),
        stats:       &Statistics{},
    }
}
  1. 创建和初始化在run/server.go中的NewServer函数, 其中包括TaskManagerStatementExecutor的初始化
    s.QueryExecutor = query.NewExecutor()
    
    s.QueryExecutor.StatementExecutor = &coordinator.StatementExecutor{
        MetaClient:  s.MetaClient,
        TaskManager: s.QueryExecutor.TaskManager,
        TSDBStore:   s.TSDBStore,
        ShardMapper: &coordinator.LocalShardMapper{
            MetaClient: s.MetaClient,
            TSDBStore:  coordinator.LocalTSDBStore{Store: s.TSDBStore},
        },
        Monitor:           s.Monitor,
        PointsWriter:      s.PointsWriter,
        MaxSelectPointN:   c.Coordinator.MaxSelectPointN,
        MaxSelectSeriesN:  c.Coordinator.MaxSelectSeriesN,
        MaxSelectBucketsN: c.Coordinator.MaxSelectBucketsN,
    }
    s.QueryExecutor.TaskManager.QueryTimeout = time.Duration(c.Coordinator.QueryTimeout)
    s.QueryExecutor.TaskManager.LogQueriesAfter = time.Duration(c.Coordinator.LogQueriesAfter)
    s.QueryExecutor.TaskManager.MaxConcurrentQueries = c.Coordinator.MaxConcurrentQueries

TaskManager相关内容解析

Task分析
  1. 首先我们先来看Task, 它被定义在queyr/executor.go, 每个Query请求都会对应一个Task,交由TaskManager统一管理
type Task struct {
    query     string  //Query请求的string
    database  string  //当前Query需要操作的db
    status    TaskStatus //task运行的状态: RunningTask或KilledTask
    startTime time.Time // task开始时间
    closing   chan struct{} // task结束时,通过这个closing chan来通知
    monitorCh chan error
    err       error
    mu        sync.Mutex
}
  1. monitor监控函数,用来也监控task来发生的事情,比如慢请求
func (q *Task) monitor(fn MonitorFunc) {
    if err := fn(q.closing); err != nil {
        select {
        case <-q.closing:
        case q.monitorCh <- err:
        }
    }
}

2.1 这个MonitorFunc是一个函数类型,定义为type MonitorFunc func(<-chan struct{}) error, 它用来检查当前task对应的query的健康情况,如果当前query被某些错误中断,它将返回err; 2.2 如果fn MonitorFunc返回了err, 则将此err写到q.monitorCh这个chan中;

  1. close函数,结束掉一个task
    q.mu.Lock()
    if q.status != KilledTask {
        // Set the status to killed to prevent closing the channel twice.
        q.status = KilledTask
        //通过q.closing这个chan作通知
        close(q.closing)
    }
    q.mu.Unlock()
  1. kill函数,task自杀
    q.mu.Lock()
    if q.status == KilledTask {
        q.mu.Unlock()
        return ErrAlreadyKilled
    }
    q.status = KilledTask
    close(q.closing)
    q.mu.Unlock()
    return nil
ExecutionContext分析
  1. 定义在execution_context.go中, 跟踪当前query的执行状态
type ExecutionContext struct {
    // 匿名字段 Context 
    context.Context

    // The statement ID of the executing query.
    statementID int

    // The query ID of the executing query.
    QueryID uint64

    // The query task information available to the StatementExecutor.
    task *Task

    // Output channel where results and errors should be sent.
    Results chan *Result

    // Options used to start this query.
    //在 httpd.Handler.go中生成的ExecutionOptions
    ExecutionOptions

    mu   sync.RWMutex
    done chan struct{}
    err  error
}
  1. watch 函数: 开一个新的goroutine, 等待task.closing chan的通知,Contex.Done完成的通知和ExecutionOptions.AbortCh的通知
func (ctx *ExecutionContext) watch() {
    ctx.done = make(chan struct{})
    if ctx.err != nil {
        close(ctx.done)
        return
    }

    go func() {
        defer close(ctx.done)

        var taskCtx <-chan struct{}
        if ctx.task != nil {
            taskCtx = ctx.task.closing
        }

        select {
        case <-taskCtx:
            ctx.err = ctx.task.Error()
            if ctx.err == nil {
                ctx.err = ErrQueryInterrupted
            }
        case <-ctx.AbortCh:
            ctx.err = ErrQueryAborted
        case <-ctx.Context.Done():
            ctx.err = ctx.Context.Err()
        }
    }()
}
TaskManager分析
  1. 定义在query/taks_manager.go
type TaskManager struct {
    // Query 执行的超时时长,超时请求的执行将被中断
    QueryTimeout time.Duration

    // Log queries if they are slower than this time.
    // If zero, slow queries will never be logged.
    // 慢请求的阈值
    LogQueriesAfter time.Duration

    // Maximum number of concurrent queries.
    // 并发处理的query数量
    MaxConcurrentQueries int

    // Logger to use for all logging.
    // Defaults to discarding all log output.
    Logger *zap.Logger

    // Used for managing and tracking running queries.
    // Task id和task组成的map
    queries  map[uint64]*Task
    nextID   uint64
    mu       sync.RWMutex
    shutdown bool
}
  1. TaskManager.AttachQuery: 将query封装成task交由TaskManager管理
func (t *TaskManager) AttachQuery(q *influxql.Query, opt ExecutionOptions, interrupt <-chan struct{}) (*ExecutionContext, func(), error) {
    ...

    // 超过设置的并发Query数量后,Attach失败
    if t.MaxConcurrentQueries > 0 && len(t.queries) >= t.MaxConcurrentQueries {
        return nil, nil, ErrMaxConcurrentQueriesLimitExceeded(len(t.queries), t.MaxConcurrentQueries)
    }

    // 生成 Task和 TaskId
    qid := t.nextID
    query := &Task{
        query:     q.String(),
        database:  opt.Database,
        status:    RunningTask,
        startTime: time.Now(),
        closing:   make(chan struct{}),
        monitorCh: make(chan error),
    }
    t.queries[qid] = query

    //新开一个goroutine, 等待query超时,http连接断后工等各种情况,后面详述
    go t.waitForQuery(qid, query.closing, interrupt, query.monitorCh)
    
    if t.LogQueriesAfter != 0 {
        // 遇到慢查询打log
        go query.monitor(func(closing <-chan struct{}) error {
            timer := time.NewTimer(t.LogQueriesAfter)
            defer timer.Stop()

            select {
            case <-timer.C:
                t.Logger.Warn(fmt.Sprintf("Detected slow query: %s (qid: %d, database: %s, threshold: %s)",
                    query.query, qid, query.database, t.LogQueriesAfter))
            case <-closing:
            }
            return nil
        })
    }
    t.nextID++

    // 生成ExcutionContext
    ctx := &ExecutionContext{
        Context:          context.Background(),
        QueryID:          qid,
        task:             query,
        ExecutionOptions: opt,
    }
    
    // 开如watch这个query的执行状态
    ctx.watch()
    return ctx, func() { t.DetachQuery(qid) }, nil
}
分析Query执行过程中可能遇到的几种情况
前提
  • 其实还是从results := h.QueryExecutor.ExecuteQuery(q, opts, closing)说起
AttachQuery失败
  1. 在执行Query前,先要将Query生成Task交由TaskManager管理,AttachQuery失败有两种情况
    // query/task_manager.go:AttachQuery
    if t.shutdown {
        return nil, nil, ErrQueryEngineShutdown
    }

    if t.MaxConcurrentQueries > 0 && len(t.queries) >= t.MaxConcurrentQueries {
        return nil, nil, ErrMaxConcurrentQueriesLimitExceeded(len(t.queries), t.MaxConcurrentQueries)
    }

这将反回相应的err, 如果处理这个err呢?

// query/executor.go:executeQuery
func (e *Executor) executeQuery(query *influxql.Query, opt ExecutionOptions, closing <-chan struct{}, results chan *Result) {
    ...
    ctx, detach, err := e.TaskManager.AttachQuery(query, opt, closing)
    if err != nil {
        select {
        case results <- &Result{Err: err}:
        case <-opt.AbortCh:
        }
        return
    }
    ...
}

这个err被封装到Result中写入到results这个chan中. 接下来呢?

results := h.QueryExecutor.ExecuteQuery(q, opts, closing)
for r := range results {
  ...
}

调用返回,results就是上面写入的chan(httpd/henalder.go:serveQuery(), 读取出包含err信息的Result返回给客户端

Query被正确执行并返回给客户端
  1. AttachQuery 成功后返回了ExecutionContext,并且将返回Query结果的Results chan赋值给ExcutionContext.Results备用;
  2. 一个Query可能包含多个statement, 逐一执行
for ; i < len(query.Statements); i++ {
...
}
  1. Query语句过滤, 针对system measurements(_fieldKeys,_measurements,_series,_tagkeys,_tags)的 select操作,是不被允许的, 将含err信息的Result写入Results
results <- &Result{
        Err: fmt.Errorf("unable to use system source '%s': use %s instead", s.Name, command),
}
  1. 改变query statement, 主要是针对metashow ...,改写成针对system measurement的select语句
  2. 执行具体的Query: err = e.StatementExecutor.ExecuteStatement(stmt, ctx)(coordinator/statement_executor.go),特别是针对select query, 调用 func (e *StatementExecutor) executeSelectStatement(stmt *influxql.SelectStatement, ctx *query.ExecutionContext) error 关于StatementExecutor, 我们这里不详细分析,只需要知道它会将查询结果封装成query.Result里写入到上面提到的contex.Results chan中
Query执行过程中Http连接中断
  1. httpd/henalder.go:serveQuery中
closing = make(chan struct{})
        if notifier, ok := w.(http.CloseNotifier); ok {
            done := make(chan struct{})
            defer close(done)

            notify := notifier.CloseNotify()
            go func() {
                // Wait for either the request to finish
                // or for the client to disconnect
                select {
                case <-done:
                case <-notify:
                    close(closing)
                }
            }()
            opts.AbortCh = done
        } else {
            defer close(closing)
        }

如果http 连接断掉,则会close(closing),关掉这个chosing chan;

  1. TaskManager.AttachQuery时,会TaskManager.waitForQuery:
 select {
    case <-closing:
        t.queryError(qid, ErrQueryInterrupted)
        ....
    }
    t.KillQuery(qid)

上面的select会返回,KillQuery被调用,它又会调用Task.kill()

 func (q *Task) kill() error {
    ...
    q.status = KilledTask
    close(q.closing)
    ...
}

q.closing这个chan关掉,让我们再次回到AttachQuery的最后是ExcutionContext.watch:

func (ctx *ExecutionContext) watch() {
    ctx.done = make(chan struct{})
    ...
    go func() {
        defer close(ctx.done)

        var taskCtx <-chan struct{}
        if ctx.task != nil {
            taskCtx = ctx.task.closing
        }

        select {
        case <-taskCtx:
            ctx.err = ctx.task.Error()
            if ctx.err == nil {
                ctx.err = ErrQueryInterrupted
            }
        ...
        }
    }()
}

上面的select将被触发,将ctx.err = ErrQueryInterrupted并调用close(ctx.done),关掉这个ctx.donw chan,这个chan很关键,让我们回到执行具体query的coordinator/statement_executor.go:executeSelectStatement:

 func (e *StatementExecutor) executeSelectStatement(stmt *influxql.SelectStatement, ctx *query.ExecutionContext) error {
    ...
    for {
    select {
                ...
                case <-ctx.Done():
                    return ctx.Err()
                default:
                }
                ...
    }
    ...
    if err := ctx.Send(result); err != nil {
            return err
        }
}

上面的两处都会返回err, executeSelectStatement调用结束,返回err -> ErrQueryInterrupted, 最终被封装在query.Result里写入到Results chan中;

Query执行超时
  1. 如果超时,TaskManager::waitForQuery中下面的代码将被触发:
var timerCh <-chan time.Time
    if t.QueryTimeout != 0 {
        timer := time.NewTimer(t.QueryTimeout)
        timerCh = timer.C
        defer timer.Stop()
    }

    select {
    ...
    case <-timerCh:
        t.queryError(qid, ErrQueryTimeoutLimitExceeded)
    ...
    }
    t.KillQuery(qid)
  1. 往下的流程就和http断开后的流程一样了,最后返回的err -> ErrQueryTimeoutLimitExceeded

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏ACM算法日常

悬赏题No.1- 扑克牌 解题源码!

,Sorry一直没时间贴代码。该类题目的源码不做讲解,只贴源码,有兴趣的可以看看。

902
来自专栏PingCAP的专栏

TiDB 源码阅读系列文章(十五)Sort Merge Join

在开始阅读源码之前, 我们来看看什么是 Sort Merge Join (SMJ),定义可以看 wikipedia。简单说来就是将 Join 的两个表,首先根据...

1410
来自专栏me的随笔

【译】《Clean C#》

本文是《Clean C#》一书译文的序言,阅读译文请移步至:《Clean C#》译文。

901
来自专栏苦逼的码农

加锁还是不加锁,这是一个问题

上次我说过, 我们这个线程的世界是个弱肉强食的地方, 大家为了争抢资源大打出手,时不时闹出些内存数据互相被覆盖的事故, 给人类带了无穷的烦恼。

2216
来自专栏牛客网

蚂蚁金服暑期实习生一面总结

6892
来自专栏非著名程序员

Android程序员必备精品资源

平时写程序中不断收集到的一些比较常用的东西,实用工具等,分享给大家。 实用工具集锦 Android Lifecycle https://github.com/x...

2087
来自专栏me的随笔

【译】《Clean C#》

本文是《Clean C#》一书译文的序言,阅读译文请移步至:《Clean C#》译文。

1382
来自专栏牛客网

后台开发:校招中遇到的问题总结

楼主的秋招也算是今天开始结束了,期间也迷茫过,最终拿到了百度sp、腾讯sp、360sp、京东、招行信用卡中心、华为、中兴、陌陌sp 等的offer(具体的面经前...

6129
来自专栏魏琼东

本人为巨杉数据库(开源NoSQL)写的C#驱动,支持Linq,全部开源,已提交github

一、关于NoSQL的项目需求      这些年在做AgileEAS.NET SOA 中间件平台的推广、技术咨询服务过程之中,特别是针对我们最熟悉的医疗行业应用之...

7548
来自专栏jessetalks

初探领域驱动设计(2)Repository在DDD中的应用

概述 上一篇我们算是粗略的介绍了一下DDD,我们提到了实体、值类型和领域服务,也稍微讲到了DDD中的分层结构。但这只能算是一个很简单的介绍,并且我们在上篇的末...

4626

扫码关注云+社区

领取腾讯云代金券