Influxdb中Select查询请求结果涉及到的一些数据结构

前言

关于Select查询请求结果涉及到的一些数据结构

Series
  1. 定义
type Series struct {
    // Name is the measurement name.
    Name string

    // Tags for the series.
    Tags Tags

    id uint64
}
type Tags struct {
    id string
    m  map[string]string
}
  1. Series其实就是measurementtags的组合,tags是tag key和tag value的map.这个Tags的id是如何产生的呢,其实就是对tag key和tag value编码到[]byte: tagkey1\0tagkey2\0...\tagvalue1\0tagvalue2\0...,具体实现定义在query/point.go中的encodeTags
Row
  1. 定义
type Row struct {
    Time int64

    // Series contains the series metadata for this row.
    Series Series

    // Values contains the values within the current row.
    Values []interface{}
}
  1. Row表示查询结果集中的每一行, 其中的Values表示是返回的Fields的集合
Iterator
bufFloatIterator
  1. 定义
type bufFloatIterator struct {
    itr FloatIterator
    buf *FloatPoint
}

相当于c里面的链表元素,itr指向下一个元素的指针,buf表示当前元素,即FloatPoint类型的链表的迭代器.

  1. 看一下FloatPoint定义
type FloatPoint struct {
    Name string
    Tags Tags

    Time  int64
    Value float64
    Aux   []interface{}

    // Total number of points that were combined into this point from an aggregate.
    // If this is zero, the point is not the result of an aggregate function.
    Aggregated uint32
    Nil        bool
}

定义在query/point.gen.go中, 表示一条field为float类型的数据

  1. Next实现
func (itr *bufFloatIterator) Next() (*FloatPoint, error) {
    buf := itr.buf
    if buf != nil {
        itr.buf = nil
        return buf, nil
    }
    return itr.itr.Next()
}

当前Iterator的值不为空,就返回当前的buf, 当前的值为空,就返回itr.itr.Next(),即指向的下一个元素

  1. unread: iterator回退操作
func (itr *bufFloatIterator) unread(v *FloatPoint) { itr.buf = v }
floatMergeIterator
  1. 组合了多个floatIterator
  2. 我们来看下定义
type floatMergeIterator struct {
    inputs []FloatIterator
    heap   *floatMergeHeap
    init   bool

    closed bool
    mu     sync.RWMutex

    // Current iterator and window.
    curr   *floatMergeHeapItem
    window struct {
        name      string
        tags      string
        startTime int64
        endTime   int64
    }
}

因为要作merge, 这里需要对其管理的所有Interator元素作排序,这里用到了golang的container/heap作堆排, 大小根堆不太清楚了大家自行google吧。 因为要用golang的container/heap来管理,需要实现下面规定的接口,

type Interface interface {
    sort.Interface
    Push(x interface{}) // add x as element Len()
    Pop() interface{}   // remove and return element Len() - 1.
}

floatMergeIterator定义中的floatMergeHeap即实现了上面的接口,我们主要来看一下比较函数的实现,比较的其实就是FloatPoint

func (h *floatMergeHeap) Less(i, j int) bool {
    x, err := h.items[i].itr.peek()
    if err != nil {
        return true
    }
    y, err := h.items[j].itr.peek()
    if err != nil {
        return false
    }

    if h.opt.Ascending {
        if x.Name != y.Name {
            return x.Name < y.Name
        } else if xTags, yTags := x.Tags.Subset(h.opt.Dimensions), y.Tags.Subset(h.opt.Dimensions); xTags.ID() != yTags.ID() {
            return xTags.ID() < yTags.ID()
        }
    } else {
        if x.Name != y.Name {
            return x.Name > y.Name
        } else if xTags, yTags := x.Tags.Subset(h.opt.Dimensions), y.Tags.Subset(h.opt.Dimensions); xTags.ID() != yTags.ID() {
            return xTags.ID() > yTags.ID()
        }
    }

    xt, _ := h.opt.Window(x.Time)
    yt, _ := h.opt.Window(y.Time)

    if h.opt.Ascending {
        return xt < yt
    }
    return xt > yt
}

比较的优先级先是FloatPoint的measurement名,然后是tagset id, 最后是time,将这个比较函数我们就可以知道.

  1. 看一下结构:

float_merge_iterator.png

  1. Next函数的实现
func (itr *floatMergeIterator) Next() (*FloatPoint, error) {
    itr.mu.RLock()
    defer itr.mu.RUnlock()
    if itr.closed {
        return nil, nil
    }
 
    // 堆排的heap数据结构并不会一开始就构造,而是在首次遍历时初始化构造 
    if !itr.init {
        items := itr.heap.items
        itr.heap.items = make([]*floatMergeHeapItem, 0, len(items))
        for _, item := range items {
            if p, err := item.itr.peek(); err != nil {
                return nil, err
            } else if p == nil {
                continue
            }
            itr.heap.items = append(itr.heap.items, item)
        }
        heap.Init(itr.heap)
        itr.init = true
    }

    for {
        // Retrieve the next iterator if we don't have one.
        if itr.curr == nil {
            if len(itr.heap.items) == 0 {
                return nil, nil
            }
            itr.curr = heap.Pop(itr.heap).(*floatMergeHeapItem)

            // Read point and set current window.
            p, err := itr.curr.itr.Next()
            if err != nil {
                return nil, err
            }
            tags := p.Tags.Subset(itr.heap.opt.Dimensions)
            itr.window.name, itr.window.tags = p.Name, tags.ID()
            itr.window.startTime, itr.window.endTime = itr.heap.opt.Window(p.Time)
            return p, nil
        }

        // Read the next point from the current iterator.
        p, err := itr.curr.itr.Next()
        if err != nil {
            return nil, err
        }

        // If there are no more points then remove iterator from heap and find next.
        if p == nil {
            itr.curr = nil
            continue
        }

        // 下面的代码确认是否需要切换Window,
        // Check if the point is inside of our current window.
        inWindow := true
        if window := itr.window; window.name != p.Name {
            inWindow = false
        } else if tags := p.Tags.Subset(itr.heap.opt.Dimensions); window.tags != tags.ID() {
            inWindow = false
        } else if opt := itr.heap.opt; opt.Ascending && p.Time >= window.endTime {
            inWindow = false
        } else if !opt.Ascending && p.Time < window.startTime {
            inWindow = false
        }

        // If it's outside our window then push iterator back on the heap and find new iterator.
        if !inWindow {
            itr.curr.itr.unread(p)
            heap.Push(itr.heap, itr.curr)
            itr.curr = nil
            continue
        }

        return p, nil
    }
}

结合上面的Less函数可知,针对所有的FloatPoint, 排序的最小单位是Window(由measurement name, tagset id, time window组成),属性同一Window的FloatPoint不再排序。如果是按升级规则遍历,则遍历的结果是按Window从小到大排,但同一Window内部的多条Point,时间不一定是从小到大的。

floatSortedMergeIterator
  1. 定义:
type floatSortedMergeIterator struct {
    inputs []FloatIterator
    heap   *floatSortedMergeHeap
    init   bool
}
type floatSortedMergeHeap struct {
    opt   IteratorOptions
    items []*floatSortedMergeHeapItem
}
type floatSortedMergeHeapItem struct {
    point *FloatPoint // 这个point用来排序当前所有的Item
    err   error
    itr   FloatIterator
}

同样它也借助了golang/container中的heap, 与floatMergeIterator相比它实现了全体Point的排序遍历,我们来看一下是如何实现的;

  1. pop函数:
func (itr *floatSortedMergeIterator) pop() (*FloatPoint, error) {
    // Initialize the heap. See the MergeIterator to see why this has to be done lazily.
    if !itr.init {
        items := itr.heap.items
        itr.heap.items = make([]*floatSortedMergeHeapItem, 0, len(items))
        for _, item := range items {
            var err error
            //  在这里为每个 floatSortedMergeHeapItem的point赋值
            if item.point, err = item.itr.Next(); err != nil {
                return nil, err
            } else if item.point == nil {
                continue
            }
            itr.heap.items = append(itr.heap.items, item)
        }
        heap.Init(itr.heap)
        itr.init = true
    }

    if len(itr.heap.items) == 0 {
        return nil, nil
    }

    // Read the next item from the heap.
    item := heap.Pop(itr.heap).(*floatSortedMergeHeapItem)
    if item.err != nil {
        return nil, item.err
    } else if item.point == nil {
        return nil, nil
    }

    // Copy the point for return.
    p := item.point.Clone()

    // 关键点在这里: 在遍历 FloatIterator时候,不是直接遍历,这里重置Item的point,然后重新push这个item, 作堆排,巧妙~~~
    if item.point, item.err = item.itr.Next(); item.point != nil {
        heap.Push(itr.heap, item)
    }

    return p, nil
}

对所有Iterator包含的所在FloatPoint,都从排序,没有Window的概念.

floatIteratorScanner
  1. floatIterator的值扫描到map里
  2. 定义
type floatIteratorScanner struct {
    input        *bufFloatIterator
    err          error
    keys         []influxql.VarRef
    defaultValue interface{}
}
  1. ScanAt: 在floatIterator中找满足条件的Point, 条件是ts, name, tags均相等,实现比较简单
func (s *floatIteratorScanner) ScanAt(ts int64, name string, tags Tags, m map[string]interface{}) {
    if s.err != nil {
        return
    }

    // 获取当前的FloatPoint
    p, err := s.input.Next()
    if err != nil {
        s.err = err
        return
    } else if p == nil {
        s.useDefaults(m)
        return
    } else if p.Time != ts || p.Name != name || !p.Tags.Equals(&tags) {
        s.useDefaults(m)
        // 如果
        s.input.unread(p)
        return
    }

    if k := s.keys[0]; k.Val != "" {
        if p.Nil {
            if s.defaultValue != SkipDefault {
                m[k.Val] = castToType(s.defaultValue, k.Type)
            }
        } else {
            m[k.Val] = p.Value
        }
    }
    for i, v := range p.Aux {
        k := s.keys[i+1]
        switch v.(type) {
        case float64, int64, uint64, string, bool:
            m[k.Val] = v
        default:
            // Insert the fill value if one was specified.
            if s.defaultValue != SkipDefault {
                m[k.Val] = castToType(s.defaultValue, k.Type)
            }
        }
    }
}
floatParallelIterator
  1. 定义:
type floatParallelIterator struct {
    input FloatIterator
    ch    chan floatPointError

    once    sync.Once
    closing chan struct{}
    wg      sync.WaitGroup
}
  1. 在一个单独的goroutine里面循环调用floatIterator.Next获取FloatPoint,然后写入到chan中:
func (itr *floatParallelIterator) monitor() {
    defer close(itr.ch)
    defer itr.wg.Done()

    for {
        // Read next point.
        p, err := itr.input.Next()
        if p != nil {
            p = p.Clone()
        }

        select {
        case <-itr.closing:
            return
        case itr.ch <- floatPointError{point: p, err: err}: //写入数据到Chan
        }
    }
}
  1. 使用的时候,调用Next, 从上面的Chan中读数据:
func (itr *floatParallelIterator) Next() (*FloatPoint, error) {
    v, ok := <-itr.ch
    if !ok {
        return nil, io.EOF
    }
    return v.point, v.err
}
floatLimitIterator
  1. 限制在每个window中读取的Point个数
  2. 定义:
type floatLimitIterator struct {
    input FloatIterator
    opt   IteratorOptions
    n     int

    // 定义了当前的window, measurement和tagset id相同的point算作同一个window group
    prev struct {
        name string
        tags Tags
    }
}
  1. Next:
func (itr *floatLimitIterator) Next() (*FloatPoint, error) {
    for {
        //读取当前的Point
        p, err := itr.input.Next()
        if p == nil || err != nil {
            return nil, err
        }

        // Reset window and counter if a new window is encountered.
        // 判断是否需要切换到新的window, 需要重置计数器n
        if p.Name != itr.prev.name || !p.Tags.Equals(&itr.prev.tags) {
            itr.prev.name = p.Name
            itr.prev.tags = p.Tags
            itr.n = 0
        }

        // Increment counter.
        itr.n++

        // Read next point if not beyond the offset.
        // 首先要跳到开始读取的偏移量Offset
        if itr.n <= itr.opt.Offset {
            continue
        }

        // Read next point if we're beyond the limit.
        // 判断是否已到达Limit限制
        if itr.opt.Limit > 0 && (itr.n-itr.opt.Offset) > itr.opt.Limit {
            continue
        }

        return p, nil
    }
}
floatFillIterator
  1. 运行在select中的Group by time fill(...), 在当前的interval的window中,如果没有查询到值,则使用相应的添充规则生成相应的值
  2. 具体可参见:group-by-time-intervals-and-fill
  3. 定义:
type floatFillIterator struct {
    input     *bufFloatIterator
    prev      FloatPoint
    startTime int64  // 此次iterator覆盖的时间边界 
    endTime   int64
    auxFields []interface{}
    init      bool
    opt       IteratorOptions

    // 如果在当前的duration中,window窗口还未过期,但已不相应的数据,则应用填充规则生成新的值
    window struct {
        name   string
        tags   Tags
        time   int64
        offset int64
    }
}
floatInterruptIterator
  1. 每遍历N条数据后,检测下遍历是否需要中断
  2. 定义
type floatInterruptIterator struct {
    input   FloatIterator
    closing <-chan struct{} //用于通知中断的chan
    count   int //遍历的计数器
}
  1. Next:实现比较简单
func (itr *floatInterruptIterator) Next() (*FloatPoint, error) {
    if itr.count&0xFF == 0xFF {
        select {
        case <-itr.closing:
            return nil, itr.Close()
        default:
            // Reset iterator count to zero and fall through to emit the next point.
            itr.count = 0
        }
    }

    // Increment the counter for every point read.
    itr.count++
    return itr.input.Next()
}
floatReduceFloatIterator
  1. 对每个interval内的数据作reduce操作
  2. 定义:
type floatReduceFloatIterator struct {
    input    *bufFloatIterator //需要处理的原始数据
    create   func() (FloatPointAggregator, FloatPointEmitter) //创建reduce函数
    dims     []string
    opt      IteratorOptions
    points   []FloatPoint // reduce处理后的数据存储在这里
    keepTags bool
}
  1. reduce() 返回处理后的points, 函数较长,但逻辑比较简单 <pre><code> func (itr *floatReduceFloatIterator) reduce() ([]FloatPoint, error) { //Calculate next window. var ( startTime, endTime int64 window struct { name string tags string } ) // 1. 确定当前的Window和时间边界 for { p, err := itr.input.Next() if err != nil || p == nil { return nil, err } else if p.Nil { continue } // Unread the point so it can be processed. itr.input.unread(p) startTime, endTime = itr.opt.Window(p.Time) window.name, window.tags = p.Name, p.Tags.Subset(itr.opt.Dimensions).ID() break } // Create points by tags. m := make(map[string]*floatReduceFloatPoint) for { // Read next point. // 2. 读取当前时间边界内的数据; // 如果当前时间边界内无数据或者measurement name 不同,本次reduce完成 curr, err := itr.input.NextInWindow(startTime, endTime) if err != nil { return nil, err } else if curr == nil { break } else if curr.Nil { continue } else if curr.Name != window.name { itr.input.unread(curr) break } // Ensure this point is within the same final window. // 3. 确认curr是否在当前Window内,不在则本次reduce完成 if curr.Name != window.name { itr.input.unread(curr) break } else if tags := curr.Tags.Subset(itr.opt.Dimensions); tags.ID() != window.tags { itr.input.unread(curr) break } // Retrieve the tags on this point for this level of the query. // This may be different than the bucket dimensions. tags := curr.Tags.Subset(itr.dims) id := tags.ID() // Retrieve the aggregator for this name/tag combination or create one. // 4. 按tagset id来构造floatReduceFloatPoint来Aggregate rp := m[id] if rp == nil { aggregator, emitter := itr.create() rp = &floatReduceFloatPoint{ Name: curr.Name, Tags: tags, Aggregator: aggregator, Emitter: emitter, } m[id] = rp } rp.Aggregator.AggregateFloat(curr) } // Reverse sort points by name & tag if our output is supposed to be ordered. ... // Assume the points are already sorted until proven otherwise. sortedByTime := true // Emit the points for each name & tag combination. a := make([]FloatPoint, 0, len(m)) for _, k := range keys { rp := m[k] points := rp.Emitter.Emit() for i := len(points) - 1; i >= 0; i-- { points[i].Name = rp.Name if !itr.keepTags { points[i].Tags = rp.Tags } // Set the points time to the interval time if the reducer didn't provide one. if points[i].Time == ZeroTime { points[i].Time = startTime } else { sortedByTime = false } a = append(a, points[i]) } } // Points may be out of order. Perform a stable sort by time if requested. ... return a, nil } </code></pre>
CallIterator

CallIterator实现了聚合函数的Iterator: count, min, max, sum, first, last, mean, distinct,Median....主要是使用我们上面介绍的一系列的ReduceIterator,提供相应的Reducer, 实现AggregateFloatEmit这两个函数

IteratorOptions
  1. 构建Iterator时用到的一些配置选项, 包含的内容较多
  2. 定义:
 type IteratorOptions struct {
    // Expression to iterate for.
    // This can be VarRef or a Call.
    Expr influxql.Expr

    // Auxilary tags or values to also retrieve for the point.
    Aux []influxql.VarRef
    
    Sources []influxql.Source

    // Group by interval and tags.
    Interval   Interval
    Dimensions []string            // The final dimensions of the query (stays the same even in subqueries).
    GroupBy    map[string]struct{} // Dimensions to group points by in intermediate iterators.
    Location   *time.Location

    // Fill options.
    Fill      influxql.FillOption
    FillValue interface{}

    // Condition to filter by.
    Condition influxql.Expr

    // Time range for the iterator.
    StartTime int64
    EndTime   int64

    // Sorted in time ascending order if true.
    Ascending bool

    // Limits the number of points per series.
    Limit, Offset int

    // Limits the number of series.
    SLimit, SOffset int

    // Removes the measurement name. Useful for meta queries.
    StripName bool

    // Removes duplicate rows from raw queries.
    Dedupe bool

    // Determines if this is a query for raw data or an aggregate/selector.
    Ordered bool

    // Limits on the creation of iterators.
    MaxSeriesN int

    //通过当前 Iterator退出的chan.
    InterruptCh <-chan struct{}

    // Authorizer can limit access to data
    Authorizer Authorizer
 }

Cursor

  1. select后会得到这个cursor,用来遍历查询结结果
  2. 定义:
type Cursor interface {
    Scan(row *Row) bool

    // Stats returns the IteratorStats from the underlying iterators.
    Stats() IteratorStats

    // Err returns any errors that were encountered from scanning the rows.
    Err() error

    // Columns returns the column names and types.
    Columns() []influxql.VarRef

    // Close closes the underlying resources that the cursor is using.
    Close() error
}
  1. Scan:
func (cur *rowCursor) Scan(row *Row) bool {
    if len(cur.rows) == 0 {
        return false
    }

    *row = cur.rows[0]
    if row.Series.Name != cur.series.Name || !row.Series.Tags.Equals(&cur.series.Tags) {
        cur.series.Name = row.Series.Name
        cur.series.Tags = row.Series.Tags
        cur.series.id++
    }
    cur.rows = cur.rows[1:]
    return true
}

floatIteratorMapper

  1. *IteratorMapper系列, 主要作用是遍历cursor
  2. 定义:
type floatIteratorMapper struct {
    cur    Cursor
    row    Row
    driver IteratorMap   // which iterator to use for the primary value, can be nil
    fields []IteratorMap // which iterator to use for an aux field
    point  FloatPoint
}
  1. 我们来看一下Next接口, 对当前的cursor作scan来返回FloatPoint
func (itr *floatIteratorMapper) Next() (*FloatPoint, error) {

     //对当前的cursor作scan来返回FloatPoint
    if !itr.cur.Scan(&itr.row) {
        if err := itr.cur.Err(); err != nil {
            return nil, err
        }
        return nil, nil
    }

    itr.point.Time = itr.row.Time
    itr.point.Name = itr.row.Series.Name
    itr.point.Tags = itr.row.Series.Tags

    if itr.driver != nil {
        if v := itr.driver.Value(&itr.row); v != nil {
            if v, ok := castToFloat(v); ok {
                itr.point.Value = v
                itr.point.Nil = false
            } else {
                itr.point.Value = 0
                itr.point.Nil = true
            }
        } else {
            itr.point.Value = 0
            itr.point.Nil = true
        }
    }
    for i, f := range itr.fields {
        itr.point.Aux[i] = f.Value(&itr.row)
    }
    return &itr.point, nil
}

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏陈满iOS

iOS开发·runtime原理与实践: 关联对象篇(Associated Object)(应用场景:为分类添加“属性”,为UI控件关联事件Block体,为了不重复获得某种数据)

分类(category)与关联对象(Associated Object)作为objective-c的扩展机制的两个特性:分类,可以通过它来扩展方法;Associ...

4202
来自专栏码匠的流水账

聊聊jesque的WorkerImpl与WorkerPool

Resque是一个使用redis来创建后台任务的ruby组件。而jesque是其java版本。通常用来做延时队列。

721
来自专栏别先生

一脸懵逼学习Hadoop中的序列化机制——流量求和统计MapReduce的程序开发案例——流量求和统计排序

一:序列化概念 序列化(Serialization)是指把结构化对象转化为字节流。 反序列化(Deserialization)是序列化的逆过程。即把字节流转回...

26610
来自专栏鸿的学习笔记

python源码阅读笔记之GC(二)

这是指使用glibc的malloc()方法去申请内存,当然这不是啥对象都用这个,而是取决于分配的内存大小

1032
来自专栏函数式编程语言及工具

FunDA(2)- Streaming Data Operation:流式数据操作

   在上一集的讨论里我们介绍并实现了强类型返回结果行。使用强类型主要的目的是当我们把后端数据库SQL批次操作搬到内存里转变成数据流式按行操作时能更方便、准确、...

2176
来自专栏PPV课数据科学社区

【学习】七天搞定SAS(六):宏的编写、程序调错

在SAS各种繁杂的PROC之后,还要来看看MACRO才可以嘛。又不能写函数... SAS中的MACRO:宏编写 MACRO主要是DO和%LET的各种组合,前者负...

3746
来自专栏Java与Android技术栈

Scrypt 不止是加密算法,也是莱特币的挖矿算法

Scrypt不仅计算所需时间长,而且占用的内存也多,使得并行计算多个摘要异常困难,因此利用rainbow table进行暴力攻击更加困难。Scrypt 没有在生...

1394
来自专栏跟着阿笨一起玩NET

ZPL打印中文信息

  相信各位在实际的项目中,需要开发打条码模块的也会有不少,很多同行肯定也一直觉得斑马打印机很不错,但是ZPL打印中文字符很麻烦。如果购买字体卡,或者通过COD...

4381
来自专栏码匠的流水账

聊聊pg jdbc的queryTimeout及next方法

本文主要介绍一下pg jdbc statement的queryTimeout及resultSet的next方法

3371
来自专栏木宛城主

Thinking In Design Pattern——Query Object模式

什么是Query Object模式 Query Object的架构设计 Query Object在服务层的应用 测试 Query Obj...

2196

扫码关注云+社区

领取腾讯云代金券