前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >6.S081/6.828: 8 Lab locks

6.S081/6.828: 8 Lab locks

原创
作者头像
冰寒火
修改2022-11-26 04:57:31
4530
修改2022-11-26 04:57:31
举报
文章被收录于专栏:软件设计软件设计

多核机器上并行能力差的常见原因就是锁争用问题,提高并行能力需要修改数据结构和加锁策略。本实验是为了提高内存分配器和block buffer的并行能力,设计思想是分段加锁。

前置知识
前置知识

一、Memory allocator

1 目标

重新设计物理内存分配器,现在是单一内存页链表(freelist)被所有CPU共享,需要设计成每个CPU一个freelist来减少锁争用。但是存在某个CPU的freelist为空,但是其他CPU的freelist不为空,每个CPU的freelist不均匀情况,所以需要增加窃取逻辑,从其他CPU窃取一半空闲内存页。

2 设计

  1. 将原先的单一freelist分成每个CPU一个freelist,都有单独的锁kmem_(id)保护,另外需要修改freerange,确保其能够获取所有free memory。
  2. cpuid()函数能够获取CPU核编号,但调用这个函数以及使用它时应该先关中断,push_off、pop_off中断开闭函数。
  3. CPU之间窃取空闲内存页。

3 代码实现

3.1 分段锁

在原来的基础创建NCPU个kmem,每个CPU各有一个私有freelist。

代码语言:c
复制
struct {
  struct spinlock lock;
  struct run *freelist;
} kmem[NCPU];


void
kinit()
{
  for (int i = 0; i < NCPU; i++){
    char buf[10];
    snprintf(buf,10,"kmem_%d",i);
    initlock(&kmem[i].lock,buf);
  }
  
  freerange(end, (void*)PHYSTOP);
}

//均衡分配物理页,此时处于初始化阶段,由0-CPU执行,不需要考虑争用问题
void
freerange(void *pa_start, void *pa_end)
{
  char *p;
  int id=0, total=0;
  struct run *r;
  p = (char*)PGROUNDUP((uint64)pa_start);
  //均匀分配物理页
  for(; p + PGSIZE <= (char*)pa_end; p += PGSIZE){
    r=(struct run*)p;
    r->next = kmem[id].freelist;
    kmem[id].freelist = r;
    id=(id+1)%NCPU;
    total++;
  }
}

3.2 分配

在申请内存时需要通过cpuid()获取CPU核编号hartid,这个函数是没有锁保护的,所以获取cpuid时需要关中断,避免被切换出去而破坏了临界区。如果该CPU的freelist为空,则释放锁,然后遍历其他CPU的freelist来steal物理页,每次只获取一个lock,不会出现互相等待的情况。

代码语言:c
复制
// Allocate one 4096-byte page of physical memory.
// Returns a pointer that the kernel can use.
// Returns 0 if the memory cannot be allocated.
void *
kalloc(void)
{
  struct run *r;
  push_off(); //关中断,避免被切换出去,破坏临界区
  int id=cpuid();
  acquire(&kmem[id].lock);

  r = kmem[id].freelist;
  if(r)
    kmem[id].freelist = r->next;
  
  release(&kmem[id].lock);
  
  if(!r){
    //steal from others
    for (int i = 0; i < NCPU; i++){
      if(i!=id){
        acquire(&kmem[i].lock);
        r = kmem[i].freelist;
        if(r){
          kmem[i].freelist = r->next;
          release(&kmem[i].lock);
          break;
        }
        release(&kmem[i].lock);
      }
    }
  }
  pop_off(); //在此之前不能够被切换出去

  if(r)
    memset((char*)r, 5, PGSIZE); // fill with junk

  return (void*)r;
}

3.3 释放

释放逻辑比较简单,直接放回该CPU的freelist即可

代码语言:c
复制
void
kfree(void *pa)
{
  struct run *r;
  if(((uint64)pa % PGSIZE) != 0 || (char*)pa < end || (uint64)pa >= PHYSTOP)
    panic("kfree");

  // Fill with junk to catch dangling refs.
  memset(pa, 1, PGSIZE);

  r = (struct run*)pa;
  push_off(); //关中断,避免破坏临界区
  int id=cpuid();
  acquire(&kmem[id].lock);

  r->next = kmem[id].freelist;
  kmem[id].freelist = r;

  release(&kmem[id].lock);
  pop_off();
}

4 测试结果

测试结果
测试结果

二、Buffer cache

1 目标

内核为磁盘维护了block cache来加速读写,进程读写文件时就需要线程安全的操作block cache,本实验是为了重新设计block cache来减少锁争用问题。bcache采用一个bcache.lock保护,需要改成分段锁来支持并行读写。

2 设计

  1. 分存分配器可以根据CPU来分段,但bcache是CPU共享的,不能采用CPU分段,而是维护一个哈希表,以blockno分段,根据block number分段加锁(bcache_(blockno))。
  2. 缓存必须要考虑淘汰算法,这里使用LRU,根据ticks来决定置换哪页。

3 代码实现

3.1 分段锁

原先bcache是维护一条双链表且全局锁,并发能力差,因此对blockno哈希分桶,来减少争用提高并行能力。

buf的blockno、refcnt、prev、next等字段是通过bcache.lockshash(blockno)来保护,这样才能移动buf,buf.lock保护的是data。

代码语言:c
复制
struct {
  struct spinlock evictlock; //置换
  struct spinlock locks[NBUCKET];
  struct buf buf[NBUF];

  // Linked list of all buffers, through prev/next.
  // Sorted by how recently the buffer was used.
  // head.next is most recent, head.prev is least.
  // struct buf head; //dummy node
  struct buf heads[NBUCKET];
} bcache;

struct buf {
  //受bcache.locks[hash(b.blockno)]保护
  int disk;    // does disk "own" buf?
  uint dev;
  uint blockno;
  uint refcnt;
  struct buf *prev; // LRU cache list
  struct buf *next;
  int valid;   // has data been read from disk?

  struct sleeplock lock; //只能保护data、valid、lastaccesstick
  uint64 lastaccesstick;
  uchar data[BSIZE];
};


int hash(uint blockno){
  return blockno%NBUCKET;
}
void
binit(void)
{
  struct buf *b;
  initlock(&bcache.evictlock,"bcache_evict");
  for (int i = 0; i < NBUCKET; i++){
    char buf[10];
    snprintf(buf,10,"bcache_%d",i);
    initlock(&bcache.locks[i],buf);
    bcache.heads[i].prev = &bcache.heads[i];
    bcache.heads[i].next = &bcache.heads[i];
  }
  //初始化block并添加到0-桶
  for(b = bcache.buf; b < bcache.buf+NBUF; b++){
    b->lastaccesstick=ticks;
    int id=hash(b->blockno);
    b->next=bcache.heads[id].next;
    b->prev=&bcache.heads[id];

    initsleeplock(&b->lock, "buffer");
    bcache.heads[id].next->prev=b;
    bcache.heads[id].next=b;
    // printf("buffer, blockno: %d, dev: %d, disk: %d, refcnt: %d, valid: %d\n",b->blockno,b->dev,b->disk,b->refcnt,b->valid);
  }
}

3.2 bread

bread函数返回指定blockno的已加锁的buf,获取流程如下:

  1. 对对应桶加锁遍历,如果命中就将引用+1,并释放锁,然后对命中的buf加锁再返回;
  2. 如果没命中也需要释放对应桶的锁,然后获取全局锁bcache.evictlock,然后再遍历一遍该桶判断是否可以命中,如果可以那就和1类似;
  3. 如果没命中,则需要遍历其他桶并加锁,选择出lastaccesstick最小且refcnt=0的buf,记住所有lock都要有acquire和release,不能少一个。

为什么要第二次遍历该桶?

答:因为第一次遍历时如果没命中也是要释放锁的,有可能有多个访问同一个block的进程同时经过第一次遍历,都没有命中,那么只能有一个进程真正能够执行置换逻辑,其他的进程第二次遍历时就能够命中。

既然对每个桶都要加锁,那为什么要加全局锁?

答:有多个访问同一个block的进程,就必须加全局锁才能够保证只有一个进程经过第二次遍历后继续往下

执行置换逻辑。其他进程被拦在全局锁位置,等第一个进程置换成功后,其他进程在第二次遍历时就能够命中。

对两个桶加锁策略:

  1. 每次只对一个桶加锁,不要同时对两个桶加锁,破坏请求与保持条件;
  2. 可以对两个桶加锁,但是加锁顺序要一样,破坏环路等待条件。
代码语言:c
复制
// Look through buffer cache for block on device dev.
// If not found, allocate a buffer.
// In either case, return locked buffer.
static struct buf*
bget(uint dev, uint blockno)
{
  struct buf *b;
  int id=hash(blockno);
  
  acquire(&bcache.locks[id]);

  // Is the block already cached?
  for(b = bcache.heads[id].next; b != &bcache.heads[id]; b = b->next){
    if(b->dev == dev && b->blockno == blockno){
      b->refcnt++;
      release(&bcache.locks[id]);
      //持有锁期间可能正在io,所以用sleeplock
      acquiresleep(&b->lock);
      // printf("buffer, blockno: %d, dev: %d, disk: %d, refcnt: %d, valid: %d\n",b->blockno,b->dev,b->disk,b->refcnt,b->valid);
      return b;
    }
  }
  release(&bcache.locks[id]);

  // Not cached.
  // Recycle the least recently used (LRU) unused buffer.
  //寻找一个buffer用于置换,只有加了该锁才能改变各bucket的buffer数量

  //可能有多个访问同一block的进程到达此处,加全局锁,确保只有一个置换成功,其他的重新遍历时会命中
  acquire(&bcache.evictlock);

  acquire(&bcache.locks[id]);
  // Is the block already cached?
  for(b = bcache.heads[id].next; b != &bcache.heads[id]; b = b->next){
    if(b->dev == dev && b->blockno == blockno){
      b->refcnt++;
      release(&bcache.locks[id]);
      release(&bcache.evictlock);
      //持有锁期间可能正在io,所以用sleeplock
      acquiresleep(&b->lock);
      // printf("buffer, blockno: %d, dev: %d, disk: %d, refcnt: %d, valid: %d\n",b->blockno,b->dev,b->disk,b->refcnt,b->valid);
      return b;
    }
  }
  release(&bcache.locks[id]);

  uint lastaccesstick=~0;
  int index=-1;
  struct buf *selectedbuf=0;
  for(int i=0;i< NBUCKET ;i++){
    int find=0;
    acquire(&bcache.locks[i]);
    for(b = bcache.heads[i].prev; b != &bcache.heads[i]; b = b->prev){
      if(b->refcnt>0){
        continue;
      }
      if(b->lastaccesstick<lastaccesstick) {
        //被选中的block的桶锁不释放,没选中的就释放掉
        //会同时加两个桶锁,但是加锁顺序一样,破坏了环路等待条件
        if(index!=-1 && index!=i){
          release(&bcache.locks[index]);
        }
        lastaccesstick=b->lastaccesstick;
        index=i;
        selectedbuf=b;
        find=1;
      }
    }
    //没找到就释放锁
    if(!find){
      release(&bcache.locks[i]);
    }
  }
  if(selectedbuf==0){
    panic("bget: no buffers");
  }

  selectedbuf->dev = dev;
  selectedbuf->blockno = blockno;
  selectedbuf->valid = 0;
  selectedbuf->refcnt = 1;
  //移除选中的buffer
  if(index!=id){
    selectedbuf->prev->next=selectedbuf->next;
    selectedbuf->next->prev=selectedbuf->prev;
  }
  release(&bcache.locks[index]);

  //添加选中的buffer到目标桶
  if(index!=id){
    acquire(&bcache.locks[id]);

    selectedbuf->next=bcache.heads[id].next;
    selectedbuf->prev=&bcache.heads[id];
    selectedbuf->next->prev=selectedbuf;
    bcache.heads[id].next=selectedbuf;

    release(&bcache.locks[id]);
  }
  

  release(&bcache.evictlock);

  acquiresleep(&selectedbuf->lock);
  return selectedbuf;
}

// Return a locked buf with the contents of the indicated block.
//返回一个加锁的buffer
struct buf*
bread(uint dev, uint blockno)
{
  struct buf *b;

  b = bget(dev, blockno);
  //如果这个block不在cache中,则从磁盘上加载
  if(!b->valid) {
    virtio_disk_rw(b, 0);
    b->valid = 1;
  }
  // printf("read, blockno: %d, dev: %d, disk: %d, refcnt: %d, valid: %d\n",b->blockno,b->dev,b->disk,b->refcnt,b->valid);
  return b;
}

3.3 brelse

代码语言:c
复制
// Release a locked buffer.
// Move to the head of the most-recently-used list.
void
brelse(struct buf *b)
{
  if(!holdingsleep(&b->lock))
    panic("brelse");

  releasesleep(&b->lock);
  //为啥id不在持有锁时获取呢?都可以,反正该buf的引用大于0,不会被置换出去
  int id=hash(b->blockno);
  acquire(&bcache.locks[id]);
  b->refcnt--;
  if (b->refcnt == 0) {
    // no one is waiting for it.
    b->lastaccesstick=ticks;
  }
  
  release(&bcache.locks[id]);
}

4 测试结果

测试结果
测试结果

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、Memory allocator
    • 1 目标
      • 2 设计
        • 3 代码实现
          • 3.1 分段锁
          • 3.2 分配
          • 3.3 释放
        • 4 测试结果
        • 二、Buffer cache
          • 1 目标
            • 2 设计
              • 3 代码实现
                • 3.1 分段锁
                • 3.2 bread
                • 3.3 brelse
              • 4 测试结果
              领券
              问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档