一、引言
前两章中我们完成了日志时间、日志打开文件、日志基本信息、日志输出格式、日志落地方向。那我们今天要谈的就是异步日志的输出、全局日志器和局部日志器的制造。
二、 异步缓冲区
异步日志器当然需要缓冲区,异步日志器是的效率不能和同步日志器相比的,因为他们涉及到消费者和生产者冲突,无法像同步日志器那样一直输出到指定文件。
在异步日志器中我们需要了解到它的读位置和写位置。保存他们读位置和写位置,进行读写、刷新、重置读写位置、swap交换以便从任务队列中取出任务、判断缓冲区是否为空,再进行接下来的操作。是否去读、是否去写。至于是否去扩容取决于实现者自己对安全的要求。
#include "Until.hpp"
#include <vector>
#ifndef __BUFFER_HPP__
#define __BUFFER_HPP__
#include <vector>
#include <algorithm>
namespace Logs
{
#define DEFAULT_BUFFER_SIZE (10 * 1024 * 1024)
// 扩容设置一个阈值,到达阈值之前可以二倍扩容。
// 到达后就线性扩容。
#define THRESHOLD_BUFFER_SIZE (80 * 1024 * 1024)
#define INCREATE_BUFFER_SIZE (10 * 1024 * 1024)
class Buffer
{
public:
Buffer()
:_buffer(DEFAULT_BUFFER_SIZE)
,_write_idx(0)
,_reader_idx(0)
{}
// 可读数据的起始位置。
const char* begin()
{
return &_buffer[_reader_idx];
}
// 可以读多少数据。
size_t readAbleLen()
{
return _write_idx - _reader_idx;
}
// 还可以写入多少数据。
size_t writeAbleLen()
{
return (_buffer.size() - _write_idx);
}
// 想缓冲区写数据。
void push(const char* str,size_t len)
{
// 不扩容的。
// if((len + _write_idx) >= writeAbleLen())
// {
// return ;
// }
// 扩容
if((len + _write_idx) >= writeAbleLen())
{
enhanceBuffer((len + _write_idx));
}
// 取出地址。
std::copy(str,str + len,&_buffer[_write_idx]);
// for(int i = 0;i < len;++i)
// {
// _buffer[_write_idx] = str[i];
// moveWrite(1);
// }
moveWrite(len);
}
// 将指针移到后面
void moveRead(size_t len)
{
_reader_idx += len;
}
// 重置读写指针,初始化缓冲区。
void reset()
{
_reader_idx = 0;
_write_idx = 0;
}
// 对Buffer做交换。
void swap(Buffer& buffer)
{
_buffer.swap(buffer._buffer);
std::swap(_reader_idx,buffer._reader_idx);
std::swap(_write_idx,buffer._write_idx);
}
// 判断缓冲区是否为空。
bool empty()
{
return _write_idx == _reader_idx;
}
private:
void enhanceBuffer(size_t len)
{
// 日志不可能会一下子就大到那种地步,没人会干。
// 到达指定容量后扩容。
if (len <= writeAbleLen()) return;
/*每次增大1M大小*/
size_t new_capacity;
if (_buffer.size() < THRESHOLD_BUFFER_SIZE)
new_capacity = _buffer.size() * 2 + len;
else
new_capacity = _buffer.size() + INCREATE_BUFFER_SIZE + len;
_buffer.resize(new_capacity);
// if(len < THRESHOLD_BUFFER_SIZE)
// {
// _buffer.reserve(INCREATE_BUFFER_SIZE * 2);
// }
// else if((len + _write_idx) < DEFAULT_BUFFER_SIZE)
// {
// _buffer.reserve(2 * _buffer.size());
// }
// else
// {
// _buffer.reserve(_buffer.size());
// }
}
void moveWrite(size_t len)
{
_write_idx += len;
}
private:
std::vector<char> _buffer;
size_t _reader_idx;// 可读取数据的指针 —————— 下标
size_t _write_idx;// 可写入数据的指针。
};
}
#endif三、 异步日志器的输出
还记得我们之前提到的同步日志器就是一个单线程就是一条代码从上往下执行。如果一个函数执行的所消耗的时间非常长,那非常浪费时间。所以我们需要使用多线程的思想,独立一个线程出去执行日志的落地,尽量减少主线程的阻塞。
这里还用到一个我们在多线程中使用的模型,生产者消费者模型(这个小编一定尽早赶出来)。
简单介绍一下生产者消费者模型。两个锁一个专门锁生产者,另一个锁就是专门锁消费者(简单来说就是将线程函数锁上,不让其他的线程进入该线程函数。)。生产者负责生产任务,消费者负责执行任务。他们之间肯定有矛盾的,如果没有任务那意味生产者必须等待。那如果任务满了,那意味着消费者不用这么卷。那我们需要一个条件变量控制锁,在不符合条件时用条件变量锁住该线程直到条件符合为止。在条件满足时唤醒所有消费者或生产者线程。
锁的本质:将线程函数锁上,不让其他的线程进入该线程函数。
任务自然是函数,函数如果不符合我们定义的function包装器、函数指针。那我们就用绑定器绑定/固定参数。
两种需要调整的极端情况:
1.没有任务时,意味着消费者线程必须停止,让生产者拥有更多的资源生产。
2.任务满了时。意味着生产者线程必须停止,让消费者拥有更多的时间执行任务。
我们用一个队列(数组)装载任务,需要执行任务时消费者线程直接去取。
另外我们还需要一个线程中运行的标志,这样我们就可以用主线程去控制这些多线程。为了更加高效化,我们先将要输出的数据放在缓冲区里。毕竟消费者和生产者之间存在冲突(矛盾),需要时间去解决,用缓冲区减少一个向文件中输出的次数,增大了效率。
#ifndef __LOOPER_HPP__
#define __LOOPER_HPP__
#include <thread>
#include <mutex>
#include <atomic>
#include <thread>
#include <condition_variable>
#include <functional>
#include "Buffer.hpp"
namespace Logs
{
enum class AsyncType
{
ASYNC_SAFE, // 安全状态,表示缓冲区满了则阻塞,避免资源耗尽的风险。
ASYNC_UNSAFE // 不安全状态,表示缓冲区满了就扩容,无线扩容,常用于测试。
};
class AsyncLooper
{
public:
using Functor = std::function<void (Buffer&)>;
using Ptr = std::shared_ptr<AsyncLooper>;
AsyncLooper(Functor cb,AsyncType type = AsyncType::ASYNC_SAFE)
:_callback(cb)
,_thread(&AsyncLooper::threadEntry,this)
,_stop(false)
,_looper_type(type)
{}
~AsyncLooper()
{
// 停止执行线程函数。
stop();
}
void push(const char* str,size_t len)
{
// // 1.安全的————不扩容。
// if(_type == AsyncType::ASYNC_SAFE)
// {
// ;
// }
// // 2.不安全————扩容
// if(_type == AsyncType::ASYNC_UNSAFE)
// {
// ;
// }
// 上锁
std::unique_lock<std::mutex> lock(_mutex);
// 安全的才可能会阻塞。
// 避免扩容。
if(_stop == true)
{
return ;
}
if(_looper_type == AsyncType::ASYNC_SAFE)
_cond_con.wait(lock,[&](){ return _pro_buf.writeAbleLen() >= len; });
// 走到下面这一步,说明满足条件,可以添加数据
_pro_buf.push(str,len);
// 唤醒消费者进行处理。
_cond_pro.notify_all();
}
void stop()
{
_stop = true;
// 所有生产者写入线程都停止。
// 唤醒所有工作线程。
_cond_con.notify_all();
// 由进程来回收。
_thread.join();
}
private:
Functor _callback;// 处理缓冲区的函数。其实就是落地到哪个部分。
private:
void threadEntry()
{
while(1)
{
{
std::unique_lock<std::mutex> lock(_mutex);
// 为空则阻塞。不为空就交换。
if(_stop && _pro_buf.empty()) return ;
// 条件二是阻塞的条件,当日志正在运行时需要处理的缓冲区为空,。
// 如果日志不在运行没必要再等待了,直接将缓冲区的内容放进去就行了。
_cond_pro.wait(lock,[&](){ return !_pro_buf.empty() || _stop; });
_con_buf.swap(_pro_buf);
_cond_con.notify_all();
}
_callback(_con_buf);
_con_buf.reset();
}
}
private:
std::atomic<bool> _stop;
AsyncType _looper_type;
Buffer _con_buf; // 消费者缓冲区。
Buffer _pro_buf; // 生产者缓冲区。
std::mutex _mutex;
std::thread _thread;
std::condition_variable _cond_con; // 生产者条件变量。
std::condition_variable _cond_pro; // 消费者条件变量。
};
}
#endif