前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >influxdb 简介与实现(一)

influxdb 简介与实现(一)

原创
作者头像
Timu
发布2018-07-17 18:58:24
1.4K0
发布2018-07-17 18:58:24
举报
文章被收录于专栏:GolangGolang

InfluxDB是一个开源的没有外部依赖的时间序列数据库。适用于记录度量,事件及执行分析。

特点:

轻量,部署方便,基于go 无依赖

概念:

这里使用mysql与之对比

代码语言:javascript
复制
mysql : database             table	         row	 
influx: database retention measurement  series  point

database : 数据库名

measurement:  数据表

retention: 存储策略,用于设置数据保存的时间

series: 数据集合,tag相同的数据的集合,可以理解为折线图上的一条线,(当然折线图上可以有很多条线)

point:单条数据,由3部分组成 time,tag[属性],field[值],可以理解为折线图上的一个点。

CURD:

增:

代码语言:javascript
复制
无需建表,直接插入指定表,插入语句
insert <measurement>[,<tag-key>=<tag-value>...] <field-key>=<field-value>[,<field2-key>=<field2-value>...] [timestamp]

删:

代码语言:javascript
复制
支持删库,删表,使用存储策略删除数据,删除数据使用数据标记删除

改:

代码语言:javascript
复制
不支持修改数据

查:

代码语言:javascript
复制
select * from mytable
支持的函数,列举:
COUNT 返回唯一字段值的列表
DISTINCT 返回唯一字段值的列表
MEAN 返回字段值的算术平均值
MEDIAN 从排序的字段值列表中返回中间值
MODE 返回字段值列表中最常用的值
SPREAD 返回最小和最大字段值之间的差异
SUM 返回字段值的总和
FIRST 返回具有最早时间戳的字段值
LAST 返回具有最新时间戳的字段值
MAX 返回最大的字段值
MIN 返回最小的字段值

 储存引擎:

TSM 基于LSM演变

物理上,一个measurement 由多个shard(数据分片)组成,(此处shard不是一个物理概念,可以理解为一个盒子)

一个shard由cache,wal,tsm file,compactor 组成。

代码语言:javascript
复制
cache 数据的内存缓存
wal 内存缓存的文件备份
tsm file 数据文件
compactor 合并与压缩 默认1秒执行一次

shard:

influxdb中,会按照数据的时间戳所在范围,去创建shard,这样做的目的是为了根据时间取查询时,可以快速定位到指定的物理文件,并且删除数据的时候,可以直接删除指定块文件。

代码语言:javascript
复制
//在创建db时,会指定db的存储策略,此函数根据不同的存储策略,得到每个shard存储的数据时间段。
func shardGroupDuration(d time.Duration) time.Duration {
    if d >= 180*24*time.Hour || d == 0 { // 6 months or 0
        return 7 * 24 * time.Hour
    } else if d >= 2*24*time.Hour { // 2 days
        return 1 * 24 * time.Hour
    }
    return 1 * time.Hour
}

cache:

influxdb在接收到一条数据时,会将数据缓存在内存中,每一个这样的cache模块对应有一个wal物理文件,在写入缓存的同时,也会将数据落地到文件。保证掉电时不丢失数据。

cache不是无限增长的,参数maxSzie 用于控制当cache中的内存占用量超过此数值,就会将内存中的数据进行一次快照,之后清空cache,并且删除wal文件,然后将快照中的数据进行时间排序,写入到tsm文件中。对应的struct结构:

代码语言:javascript
复制
type Cache struct {
    commit  sync.Mutex
    mu      sync.RWMutex
    store   map[string]*entry   // 存储的数据内容
    size    uint64              // 当前使用内存的大小
    maxSize uint64              // 缓存最大值
    // snapshots are the cache objects that are currently being written to tsm files
    // they're kept in memory while flushing so they can be queried along with the cache.
    // they are read only and should never be modified
    // 快照,用于写入 tsm 文件,只读
    snapshot     *Cache
    snapshotSize uint64
    snapshotting bool
    // This number is the number of pending or failed WriteSnaphot attempts since the last successful one.
    snapshotAttempts int
    stats        *CacheStatistics
    lastSnapshot time.Time
}

 实际写入数据部分:

代码语言:javascript
复制
func (e *Engine) WritePoints(points []models.Point) error {
	values := make(map[string][]Value, len(points))
	var (
		keyBuf    []byte
		baseLen   int
		seriesErr error
	)
        //忽略一些代码
        ......
	e.mu.RLock()
	defer e.mu.RUnlock()

	// first try to write to the cache
	if err := e.Cache.WriteMulti(values); err != nil {
		return err
	}
        //此处可以看到,可以设置WALEncbled 来关闭wal文件的写入,提高效率,但会存在掉电丢数据的风险
	if e.WALEnabled {
		if _, err := e.WAL.WriteMulti(values); err != nil {
			return err
		}
	}
	return seriesErr
}

WAL:

从上面的代码可以看出wal是cache的物理备份,作用就是数据持久化,防止掉电丢数据。由于数据是被顺序插入到wal文件的,所以这里的写入效率非常高。但是如果插入的数据的时间戳是乱序的,会导致数据被分配到不同的shard,从而会将数据插入到多个wal文件中,会有影响性能。

此外,也可以关闭wal写入,达到更高的写入能力。我们来看一下wal的写入的关键代码部分:

代码语言:javascript
复制
func (l *WAL) WriteMulti(values map[string][]Value) (int, error) {
	entry := &WriteWALEntry{
		Values: values,
	}

	id, err := l.writeToLog(entry)
	if err != nil {
		atomic.AddInt64(&l.stats.WriteErr, 1)
		return -1, err
	}
	atomic.AddInt64(&l.stats.WriteOK, 1)

	return id, nil
}
func (l *WAL) writeToLog(entry WALEntry) (int, error) {
	// limit how many concurrent encodings can be in flight.  Since we can only
	// write one at a time to disk, a slow disk can cause the allocations below
	// to increase quickly.  If we're backed up, wait until others have completed.
	bytes := bytesPool.Get(entry.MarshalSize())

	b, err := entry.Encode(bytes)
	if err != nil {
		bytesPool.Put(bytes)
		return -1, err
	}

	encBuf := bytesPool.Get(snappy.MaxEncodedLen(len(b)))

	compressed := snappy.Encode(encBuf, b)
	bytesPool.Put(bytes)

	syncErr := make(chan error)

	segID, err := func() (int, error) {
		l.mu.Lock()
		defer l.mu.Unlock()

		// Make sure the log has not been closed
		select {
		case <-l.closing:
			return -1, ErrWALClosed
		default:
		}

		// roll the segment file if needed
		if err := l.rollSegment(); err != nil {
			return -1, fmt.Errorf("error rolling WAL segment: %v", err)
		}

		// write and sync
		if err := l.currentSegmentWriter.Write(entry.Type(), compressed); err != nil {
			return -1, fmt.Errorf("error writing WAL entry: %v", err)
		}

		select {
		case l.syncWaiters <- syncErr:
		default:
			return -1, fmt.Errorf("error syncing wal")
		}
                //刷数据到磁盘 调用bufio的Flush。详细见后文,落盘实现
		l.scheduleSync()

		// Update stats for current segment size
		atomic.StoreInt64(&l.stats.CurrentBytes, int64(l.currentSegmentWriter.size))

		l.lastWriteTime = time.Now().UTC()

		return l.currentSegmentID, nil

	}()

	bytesPool.Put(encBuf)

	if err != nil {
		return segID, err
	}

	// schedule an fsync and wait for it to complete
	return segID, <-syncErr
}

 此处可以看到实际的文件落地方式是一个bufio类型。

l.currentSegmentWriter.Writes实现:

代码语言:javascript
复制
func NewWALSegmentWriter(w io.WriteCloser) *WALSegmentWriter {
	return &WALSegmentWriter{
		bw: bufio.NewWriterSize(w, 16*1024),
		w:  w,
	}
}

// Write writes entryType and the buffer containing compressed entry data.
func (w *WALSegmentWriter) Write(entryType WalEntryType, compressed []byte) error {
	var buf [5]byte
	buf[0] = byte(entryType)
	binary.BigEndian.PutUint32(buf[1:5], uint32(len(compressed)))

	if _, err := w.bw.Write(buf[:]); err != nil {
		return err
	}

	if _, err := w.bw.Write(compressed); err != nil {
		return err
	}

	w.size += len(buf) + len(compressed)

	return nil
}
// Sync flushes the file systems in-memory copy of recently written data to disk,
// if w is writing to an os.File.
func (w *WALSegmentWriter) sync() error {
	if err := w.bw.Flush(); err != nil {
		return err
	}

	if f, ok := w.w.(*os.File); ok {
		return f.Sync()
	}
	return nil
}

 落盘实现:

代码语言:javascript
复制
// scheduleSync will schedule an fsync to the current wal segment and notify any
// waiting gorutines.  If an fsync is already scheduled, subsequent calls will
// not schedule a new fsync and will be handle by the existing scheduled fsync.
func (l *WAL) scheduleSync() {
	// If we're not the first to sync, then another goroutine is fsyncing the wal for us.
	if !atomic.CompareAndSwapUint64(&l.syncCount, 0, 1) {
		return
	}

	// Fsync the wal and notify all pending waiters
	go func() {
		var timerCh <-chan time.Time

		// time.NewTicker requires a > 0 delay, since 0 indicates no delay, use a closed
		// channel which will always be ready to read from.
		if l.syncDelay == 0 {
			// Create a RW chan and close it
			timerChrw := make(chan time.Time)
			close(timerChrw)
			// Convert it to a read-only
			timerCh = timerChrw
		} else {
			t := time.NewTicker(l.syncDelay)
			defer t.Stop()
			timerCh = t.C
		}
		for {
			select {
			case <-timerCh:
				l.mu.Lock()
				if len(l.syncWaiters) == 0 {
					atomic.StoreUint64(&l.syncCount, 0)
					l.mu.Unlock()
					return
				}

				l.sync()
				l.mu.Unlock()
			case <-l.closing:
				atomic.StoreUint64(&l.syncCount, 0)
				return
			}
		}
	}()
}

// sync fsyncs the current wal segments and notifies any waiters.  Callers must ensure
// a write lock on the WAL is obtained before calling sync.
func (l *WAL) sync() {
	err := l.currentSegmentWriter.sync()
	for len(l.syncWaiters) > 0 {
		errC := <-l.syncWaiters
		errC <- err
	}
}

TSM File:

单个tsm file,最大大小为2g,用于存放数据, 上文中对内存数据刷到磁盘就会生成一个tsm文件,然后合并进程,慢慢将这些tsm文件合并到一个大的tsm文件

Compactor:

每秒执行一次,主要是两个任务:

1.cache是否需要落地磁盘,需要就将cache落地到磁盘,生成一个tsm文件。

2.tsm文件是否需要合并, 需要的话就将多个tsm文件合并成一个大文件。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
云数据库 SQL Server
腾讯云数据库 SQL Server (TencentDB for SQL Server)是业界最常用的商用数据库之一,对基于 Windows 架构的应用程序具有完美的支持。TencentDB for SQL Server 拥有微软正版授权,可持续为用户提供最新的功能,避免未授权使用软件的风险。具有即开即用、稳定可靠、安全运行、弹性扩缩等特点。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档