前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >bufio包系列之一个误用bufio读取的示例

bufio包系列之一个误用bufio读取的示例

作者头像
Go学堂
发布2023-01-31 16:11:00
2160
发布2023-01-31 16:11:00
举报
文章被收录于专栏:Go工具箱Go工具箱

本篇是继图解bufio包读取原理写入原理之后的第三篇实战篇。本想着借用medium上一篇使用bufio的读取操作在25秒内处理完16G文件的具体应用来结束本系列文章的。但仔细阅读了代码后,发现对bufio.Reader的使用是错误的。究其原因猜测是其对bufio读取的内部实现机制并不了解造成的。所以作为一个反面示例来进行讲解。

下面我们来看看其具体的实现机制,是如何将16G的日志文件在25秒内读完的。

如上图所示,实现思路是这样的:利用bufio.Reader每次从文件中读取250KB大小的数据,然后将读取到的数据分配一个协程进行处理,每个协程将字节数组转换成字符串后,并按"\n"分隔成多行,然后再按100行一组分配给一个协程进行处理。

具体代码如下:

代码语言:javascript
复制
var linesPool = sync.Pool{
  New: func() interface{} {
    lines := make([]byte, 250*1024)
    return lines
  },
}

var stringPool = sync.Pool{
  New: func() interface{} {
    lines := ""
    return lines
  },
}

func main() {
  s := time.Now()
  flag.Parse()
  if *filename == "" {
    panic("please input a filename")
  }
  file, err := os.Open(*filename)

  if err != nil {
    fmt.Println("cannot able to read the file", err)
    return
  }
  defer file.Close() //close after checking err

  Process(file)
  fmt.Println("\nTime taken - ", time.Since(s))
}

func Process(f *os.File) error {
  r := bufio.NewReader(f)

  var wg sync.WaitGroup
  for {
    buf := linesPool.Get().([]byte)
    n, err := r.Read(buf)
    buf = buf[:n]

    if n == 0 {
      // 这里做了简化,直接跳出循环
      break
    }
    // 为了确保buf中都是整行整行的读取
    nextUntillNewline, err := r.ReadBytes('\n')

    if err != io.EOF {
      buf = append(buf, nextUntillNewline...)
    }

    wg.Add(1)
    go func() {
      ProcessChunk(buf)
      wg.Done()
    }()
  }

  wg.Wait()
  return nil
}

func ProcessChunk(chunk []byte) {
  var wg2 sync.WaitGroup

  logs := stringPool.Get().(string)
  logs = string(chunk)
  linesPool.Put(chunk)

  logsSlice := strings.Split(logs, "\n")

  stringPool.Put(logs)

  // 每300行启用一个协程进行处理
  chunkSize := 300
  // 读取到的行数
  lines := len(logsSlice)
  // 总共需要的协程数
  numberOfThreads := lines / chunkSize

  //如果还有余数,则多加1个协程
  if lines%chunkSize != 0 {
    numberOfThreads++
  }

  for i := 0; i < (numberOfThreads); i++ {
    wg2.Add(1)
    // startLine是从第几行开始处理
    // endLine是结束的行
    startLine := i * chunkSize
    endLine := int(math.Min(float64((i+1)*chunkSize), float64(len(logsSlice))))
    go func(startLine int, endLine int) {
      defer wg2.Done() //to avaoid deadlocks
      for i := startLine; i < endLine; i++ {
        text := logsSlice[i]
        if len(text) == 0 {
          continue
        }
        //进行文本的处理分析
        fmt.Println(text)
      }
    }(startLine, endLine)
  }

  wg2.Wait()
  logsSlice = nil
}

在上面的代码中,我们看在第34行使用bufio.NewReader对缓冲区进行了初始化,该函数初始化时其缓冲区的大小是默认值,即4096字节,也就是4KB。在第39行使用Read函数进行了读取操作,期望读取的字节切片是从第3行的sync.Pool中获取的,大小是250*1024,即250KB。在读取原理篇我们讲到过当期望读取的字节大小大于缓冲区大小,并且缓冲区为空时,那么就会直接从文件中读取,而不经过缓冲区。如下图所示。所以,这里并没有减少系统调用的次数。

所以,大家在使用golang相关的包时,深入了解一下其内部的实现机制 还是很有必要的。

参考资料:

https://medium.com/swlh/processing-16gb-file-in-seconds-go-lang-3982c235dfa2


欢迎关注「Go学堂」,让知识活起来

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2022-06-10,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Go学堂 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档