boost.asio中io_service分析

2014年4月14日 由 Creater 留言 »

本文从源代码的角度进行来看看io_service

boost::asio::io_service io_service;

当使用同步方式编程时,没有io_service.run()调用;使用异步方式编程时,则有此调用,来看看这个函数实际上在做什么

std::size_t io_service::run()
{
  boost::system::error_code ec;
  std::size_t s = impl_.run(ec);
  boost::asio::detail::throw_error(ec);
  return s;
}

可以看出,实际上的主要工作已经委托给impl_,impl的本来意思就是执行,实施的意思。

既然这里使用了数据成员impl_,就不得不先来看看io_service类的组成为何方圣神,抛开一切函数,来看实际的数据成员,

#if defined(BOOST_ASIO_WINDOWS) || defined(__CYGWIN__)
  detail::winsock_init<> init_;
#elif defined(__sun) || defined(__QNX__) || defined(__hpux) || defined(_AIX) \
  || defined(__osf__)
  detail::signal_init<> init_;
#endif

  // The service registry.
  boost::asio::detail::service_registry* service_registry_;

  // The implementation.
  impl_type& impl_;

其中的impl_实际上为

typedef detail::io_service_impl impl_type;

既然调用的是impl_.run(),不得不看这个run函数在干什么,在看run函数之前,得来看看impl_这个类型?

#if defined(BOOST_ASIO_HAS_IOCP)
  typedef class win_iocp_io_service io_service_impl;
  class win_iocp_overlapped_ptr;
#else
  typedef class task_io_service io_service_impl;
#endif

从这里看出,impl_在windows下使用的是windows IOCP模型,即win_iocp_io_service,在Linux下估计是使用的是EPOLL模型,但是没有细看,不知道是不是,反正这里命名为task_io_service。

class win_iocp_io_service
  : public boost::asio::detail::service_base<win_iocp_io_service>

class task_io_service
  : public boost::asio::detail::service_base<task_io_service>

在win_iocp_io_service类里可以看见调用的是windows系统提供的SDK里的文件。

这说明asio在windows里使用win_iocp_io_service来处理任务,因为io_service.run实际上调用的是win_iocp_io_service.run(),所以来看看该函数。

size_t win_iocp_io_service::run(boost::system::error_code& ec)
{
  if (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0)
  {
    stop();
    ec = boost::system::error_code();
    return 0;
  }

  win_iocp_thread_info this_thread;
  thread_call_stack::context ctx(this, this_thread);

  size_t n = 0;
  while (do_one(true, ec))
    if (n != (std::numeric_limits<size_t>::max)())
      ++n;
  return n;
}

到这里可以看出,最终交给了以下函数来处理:

size_t win_iocp_io_service::do_one(bool block, boost::system::error_code& ec)

从这个函数的详细实现里可以看出,大概意思就是从队列中取操作来执行。

综上可以得出,io_service实际上就是利用iocp模型从队列中获取操作来执行。

 

io_servie 任务队列中的任务由io_service类的post函数提供

Post向队列中投递任务,然后激活空闲线程执行任务。其实现流程如下:

1. Post接收handler作为参数,实际上是个仿函数,通过此仿函数构造出completion_handler对象,completion_handler继承自operation。然后调用post_immediate_completion。

 typename op::ptr p = { boost::asio::detail::addressof(handler),
    boost_asio_handler_alloc_helpers::allocate(
      sizeof(op), handler), 0 };
  p.p = new (p.v) op(handler);

  BOOST_ASIO_HANDLER_CREATION((p.p, "io_service", this, "post"));

  post_immediate_completion(p.p, false);

2.post_immediate_completion首先将outstanding_work_增加,然后调用post_deferred_completion。

  void post_immediate_completion(win_iocp_operation* op, bool)
  {
    work_started();
    post_deferred_completion(op);
  }

3.post_deferred_completion首先加锁将任务入列,然后调用wake_one_thread_and_unlock

void win_iocp_io_service::post_deferred_completion(win_iocp_operation* op)
{
  // Flag the operation as ready.
  op->ready_ = 1;

  // Enqueue the operation on the I/O completion port.
  if (!::PostQueuedCompletionStatus(iocp_.handle, 0, 0, op))
  {
    // Out of resources. Put on completed queue instead.
    mutex::scoped_lock lock(dispatch_mutex_);
    completed_ops_.push(op);
    ::InterlockedExchange(&dispatch_required_, 1);
  }
}

4.wake_one_thread_and_unlock尝试唤醒当前空闲的线程,其实现中特别之处在于,若没有空闲线程,但是有线程在执行task->run,先阻塞。

5.  first_idle_thread_维护了所有当前空闲线程,每次唤醒时只唤醒空闲线程的第一个。

广告位

评论已关闭.