muduo源码剖析——以三个切片浅析muduo库代码设计的严谨性、高效性与灵活性

2025-05-30 19:54:05 欧冠世界杯

0 前言

陈硕大佬的muduo网络库的源码我已经看了好久了,奈何本人实力有限,每每看到其代码设计的精巧之处只能内心称赞,无法用言语表达出来。实在令人汗颜。最近在看到网络设计部分时有了一些体会,结合自己之前在网络编程方面的积累,特对代码设计中的一些精巧之处做一些总结。 就muduo在多线程并发服务器设计而言,除了其高效的并发服务架构之外,其在代码设计方面的高效性和灵活性可以从下面三个切片得以体现。 在这之前,首先说明muduo的并发服务器架构为multi-reactors+thread pool架构,其架构图如下: 简单而言,mainReactor负责处理新连接IO的创建和管理,subReactor+thread pool负责已连接IO的读写事件。其中,为了降低subReactor的压力、提高IO读写效率,可以添加多个subReactor。

1 切片一:EventLoop线程绑定与跨线程调用

muduo_net设计中,每一个EventLoop对象对应一个reactor,并且每个EventLoop对象绑定一个线程,EventLoop对象中的loop()在内的多数函数只能由该对象所属的线程调用,即EventLoop对象有严格的线程所属特性。某些函数可以由外部线程调用,可以起到激活睡眠或阻塞的EventLoop当前线程的作用。 首先看一下EventLoop类的成员变量(部分):

typedef std::vector ChannelList;

bool looping_; /* atomic */

bool quit_; /* atomic */

bool eventHandling_; /* atomic */

bool callingPendingFunctors_; /* atomic */

const pid_t threadId_; // 当前对象所属线程ID

Timestamp pollReturnTime_;

boost::scoped_ptr poller_;

boost::scoped_ptr timerQueue_;

int wakeupFd_; // 用于保存eventfd创建的file descriptor--进程间通信

// unlike in TimerQueue, which is an internal class,

// we don't expose Channel to client.

// eventfd的wakeupFd_对应的通道 该通道将会纳入poller_来管理

boost::scoped_ptr wakeupChannel_;// EventLoop对象负责wakeupChannel_对象的创建/生命周期

// Poller返回的活动通道

ChannelList activeChannels_;// 这些channel的生存期不由EventLoop管理和负责-->由TcpConnection和TimerQueue管理

Channel* currentActiveChannel_; // 当前正在处理的活动通道

MutexLock mutex_;

std::vector pendingFunctors_; // @BuardedBy mutex_

具体体现如下。

1.1 判断当前线程是否已经拥有所属EventLoop对象

主要通过下面两条实现。 1 使用线程局部变量记录当前线程是否已经拥有所属EventLoop对象

// 当前线程EventLoop对象指针

// 线程局部存储

__thread EventLoop* t_loopInThisThread = 0;// 指向EventLoop对象

2 在创建EventLoop对象前进行判断

EventLoop::EventLoop()

: looping_(false),

quit_(false),

eventHandling_(false),

callingPendingFunctors_(false),

threadId_(CurrentThread::tid()),

poller_(Poller::newDefaultPoller(this)),

timerQueue_(new TimerQueue(this)),

wakeupFd_(createEventfd()),// 创建一个eventfd

wakeupChannel_(new Channel(this, wakeupFd_)),// 创建一个通道 并将wawkeupFd_传进Channel

currentActiveChannel_(NULL)

{

LOG_TRACE << "EventLoop created " << this << " in thread " << threadId_;

/* 如果当前线程已经创建了EventLoop对象,终止(LOG_FATAL) */

if (t_loopInThisThread)

{

LOG_FATAL << "Another EventLoop " << t_loopInThisThread

<< " exists in this thread " << threadId_;

}

else

{

t_loopInThisThread = this;// 指向EventLoop对象

}

// 绑定eventfd的回调处理函数handleRead()

wakeupChannel_->setReadCallback(

boost::bind(&EventLoop::handleRead, this));

// we are always reading the wakeupfd

// 绑定eventfd对应事件--EPOLLIN可读事件

wakeupChannel_->enableReading();

}

1.2 执行函数前判断当前线程是否为EventLoop对象所属线程

通过assertInLoopThread()和isInLoopThread()函数进行判断。

void assertInLoopThread()

{

if (!isInLoopThread())

{

abortNotInLoopThread();

}

}

// ...

bool isInLoopThread() const { return threadId_ == CurrentThread::tid(); }

1.3 部分函数可以跨线程调用以唤醒阻塞的EventLoop所属线程

以quit()函数为例,当要停止loop()循环时,此时EventLoop对象所属线程可能正处于poll的阻塞中,因此此时需要借助外部进程调用该quit()同时唤醒处于阻塞中的EventLoop对象所属线程,并退出poll循环。

// 该函数可以跨线程调用

void EventLoop::quit()

{

quit_ = true;

if (!isInLoopThread())

{

wakeup();

}

}

2 切片二:IO线程与计算线程的灵活调度

为了充分了利用CPU,在IO线程空间时,可以为loop()线程分配一些计算任务。因此,muduo_net既可以完成IO处理又能进行计算任务。 具体如下。

2.1 poll+handleEvent

该部分主要负责IO事件的监听、响应和处理。

// 事件循环,该函数不能跨线程调用

// 只能在创建该对象的线程中调用

void EventLoop::loop()

{

assert(!looping_);

// 断言当前处于创建该对象的线程中

assertInLoopThread();

looping_ = true;

quit_ = false;

LOG_TRACE << "EventLoop " << this << " start looping";

//::poll(NULL, 0, 5*1000);

while (!quit_)

{

activeChannels_.clear();

/* IO事件监听、响应和处理 */

pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);

//++iteration_;

if (Logger::logLevel() <= Logger::TRACE)

{

printActiveChannels();

}

// TODO sort channel by priority

eventHandling_ = true;

for (ChannelList::iterator it = activeChannels_.begin();

it != activeChannels_.end(); ++it)

{

currentActiveChannel_ = *it;

currentActiveChannel_->handleEvent(pollReturnTime_);

}

currentActiveChannel_ = NULL;

eventHandling_ = false;

/* 处理计算任务 */

doPendingFunctors();// 让IO线程不繁忙的时候也能执行一些计算任务-->避免IO线程一直处于阻塞状态

}

LOG_TRACE << "EventLoop " << this << " stop looping";

looping_ = false;

}

2.2 queueInLoop+doPendingFunctors

该部分负责计算任务的添加和处理。其中添加计算任务queueInLoop可以由外部线程和EventLoop所属线程添加,执行计算任务doPendingFunctors只能由EventLoop线程处理。

// 将cb添加到队列中

void EventLoop::queueInLoop(const Functor& cb)

{

{

MutexLockGuard lock(mutex_);

pendingFunctors_.push_back(cb);// 添加外部任务到pendingFunctors_数组

}

// 调用queueInLoop的线程不是IO线程需要唤醒(唤醒EventLoop对应的线程 以便该线程及时执行cb函数)

// 或者调用queueInLoop的线程是EventLoop对应的IO线程,并且此时该线程正在调用pending functor(正在执行计算任务),需要唤醒

// 只有IO线程的事件回调中调用queueInLoop才不需要唤醒

if (!isInLoopThread() || callingPendingFunctors_)

{

wakeup();

}

}

void EventLoop::doPendingFunctors()

{

std::vector functors;

callingPendingFunctors_ = true;

// 添加互斥锁

// 互斥访问vector--此时pendingFunctors_位于临界区 不能被其他线程访问

{

MutexLockGuard lock(mutex_);

functors.swap(pendingFunctors_);// swap:交换两容器的内容 pendingFunctors_变为空

}

// 为什么functors的执行没有放在临界区?

// 1 减小临界区长度 减少其他进程queueInloop()的阻塞时间

// 2 loop()线程发生IO事件时 可以及时中断该doPendingFunctors()函数中的计算任务,优先处理IO事件

for (size_t i = 0; i < functors.size(); ++i)

{

functors[i]();// 执行函数

}

callingPendingFunctors_ = false;

}

3 切片三:线程安全与执行效率

下面通过doPendingFunctors()函数观察muduo在实现线程安全方与保证执行效率之间权衡的设计。 该函数主要包含两个部分:获取待处理计算任务序列、依次执行计算任务。 其中pendingFunctors_存放计算任务的数组作为外部线程和EventLoop线程都能够处理(插入数据)的共享变量,EventLoop线程在获取计算任务序列时需要对pendingFunctors_进行加锁处理。 这其中涉及到锁的作用范围即临界区的作用范围的问题。muduo此处只将对pendingFunctors_的操作置于临界区,而将执行计算任务的操作置于临界区之外。原因主要有两个: 1 减小临界区长度 减少其他进程queueInloop()的阻塞时间; 2 loop()线程发生IO事件时 可以及时中断该doPendingFunctors()函数中的计算任务,优先处理IO事件。

void EventLoop::doPendingFunctors()

{

std::vector functors;

callingPendingFunctors_ = true;

/* 1 获取待处理计算任务序列 */

// 添加互斥锁

// 互斥访问vector--此时pendingFunctors_位于临界区 不能被其他线程访问(其他线程不能操作pendingFunctors_数组)

{

MutexLockGuard lock(mutex_);

functors.swap(pendingFunctors_);// swap:交换两容器的内容 pendingFunctors_变为空

}

/* 依次执行计算任务 */

// 为什么functors的执行没有放在临界区?

// 1 减小临界区长度 减少其他进程queueInloop()的阻塞时间

// 2 loop()线程发生IO事件时 可以及时中断该doPendingFunctors()函数中的计算任务,优先处理IO事件

for (size_t i = 0; i < functors.size(); ++i)

{

functors[i]();// 执行函数

}

callingPendingFunctors_ = false;

}

4 参考材料

https://www.bilibili.com/video/BV11b411q7zr?p=29