锁是高性能程序的杀手,但是为了保证数据的一致性,在多线程的应用环境下又不得不加锁。但是在某些特殊的场景下, 是可以通过优化数据结构来达到无锁的目的。那么我们就来看一下如何实现一个无锁队列。
队列:众所周知,就是先进先出。 出队列的时候从队列头取出一个结点;入队列的时候,将结点添加到队列尾部。当多线程同时操作一个队列读写时,显然就需要加锁。但是在单读单写的这种多线程应用时,是可以做到无锁的。直接上代码
#ifndef _QUEUE_H_
#define _QUEUE_H_
template<class T>
class node
{
public:
T* pdata;
node* next;
};
template<class T>
class queue
{
private:
node<T> *front;
node<T> *tear;
public:
queue();
~queue();
bool isempty();
T* pop();
void push(T *p);
};
template<class T>
queue<T>::queue()
{
node<T> *pnode = new node<T>();
front = pnode;
tear = pnode;
}
template<class T>
queue<T>::~queue()
{
node<T> *cur = front;
node<T> *next = NULL;
while(cur)
{
next = cur->next;
delete cur->pdata;
delete cur;
cur = next;
}
}
template<class T>
bool queue<T>::isempty()
{
if(front == tear)
return true;
return false;
}
template<class T>
void queue<T>::push(T *p)
{
node<T> *pnode = new node<T>();
tear->pdata = p;
tear->next = pnode;
tear = pnode;
}
template<class T>
T* queue<T>::pop()
{
if(isempty())
return NULL;
node<T>* pnode = front;
T* p = pnode->pdata;
front = front->next;
delete pnode;
return p;
}
#endif
原理其实很简单,就是在队列的末尾添加一个空节点。这样在队列非空的情况下,front结点与tear结点中间至少间隔一个结点。那么当两个线程同时插入与取出结点时,就不会存在同时操作front与tear的情况,从而保证不会出现race condition
下面是测试代码:
#include <stdio.h>
#include <pthread.h>
#include <sys/time.h>
#include "queue.h"
#define MAX 100000
void* fun1(void *p)
{
queue<int> *q = (queue<int>*)p;
int i = 0;
while(1)
{
int *tmp = q->pop();
if(tmp)
{
if(*tmp != i++)
printf("err\n");
delete tmp;
}
if(i == MAX)
break;
}
}
void* fun2(void *p)
{
queue<int> *q = (queue<int>*)p;
int i = 0;
while(i < MAX)
{
int *tmp = new int(i++);
q->push(tmp);
}
}
int main()
{
queue<int> q;
struct timeval tv1;
struct timeval tv2;
pthread_t t1,t2;
gettimeofday(&tv1,NULL);
pthread_create(&t1,NULL ,fun1 ,&q);
pthread_create(&t2,NULL, fun2 ,&q );
pthread_join(t1,NULL);
pthread_join(t2,NULL);
gettimeofday(&tv2,NULL);
long delta = tv2.tv_sec*1000000+tv2.tv_usec - ( tv1.tv_sec*1000000+tv1.tv_usec) ;
printf("time : %lu\n",delta);
return 0;
}
在我的机器上,测试结果为327730us
下面再给出加锁的版本,并使用相同的测试方法,进行对比
#ifndef _QUEUE_H_
#define _QUEUE_H_
#include <pthread.h>
template<class T>
class node
{
public:
T* pdata;
node* next;
node(T* p1, node* p2):pdata(p1),next(p2){}
};
template<class T>
class queue
{
private:
node<T> *front;
node<T> *tear;
pthread_mutex_t mutex;
public:
queue();
~queue();
bool isempty();
T* pop();
void push(T *p);
};
template<class T>
queue<T>::queue()
{
front = NULL;
tear = NULL;
pthread_mutex_init(&mutex,NULL);
}
template<class T>
queue<T>::~queue()
{
node<T> *cur = front;
node<T> *next = NULL;
while(cur)
{
next = cur->next;
delete cur->pdata;
delete cur;
cur = next;
}
}
template<class T>
void queue<T>::push(T *p)
{
node<T> *pnode = new node<T>(p,NULL);
pthread_mutex_lock(&mutex);
if(front == NULL)
{
front = pnode;
tear = pnode;
}
else
{
tear->next = pnode;
tear = pnode;
}
pthread_mutex_unlock(&mutex);
}
template<class T>
T* queue<T>::pop()
{
T * pdata = NULL;
node<T> *pnode = NULL;
pthread_mutex_lock(&mutex);
if(front == NULL)
{
pthread_mutex_unlock(&mutex);
return NULL;
}
pnode = front;
if(front == tear)
tear = NULL;
front = front->next;
pthread_mutex_unlock(&mutex);
pdata = pnode->pdata;
delete pnode;
return pdata;
}
#endif
加锁的版本,测试结果为497987us。
可见,加锁版本所耗时间,差不多为无锁版本的1.5倍以上。