存档在 ‘Socket library’ 分类

TcpSocket类之Buffer(将字节序列压入OUTPUT缓冲区)

2013年7月2日

该函数原型为:

void TcpSocket::Buffer(const char *buf, size_t len)

先列出一些变量的意义
output_l m_obuf >>这个实际上是个STL容器List,存放的为指向OUTPUT缓冲区的指针。

typedef std::list<OUTPUT *> output_l;

OUTPUT *m_obuf_top >>这个是指向output_l的顶部(top)指向的内存区。
buf >>需要存入top buffer的数据的首地址。
len >>需要存入top buffer的数据的长度。
m_output_length >>所有输出缓冲区的总字节数,这些缓冲区由list内的指针指向。
ptr >>当前已经将buf指向的len字节里的ptr字节写到了TOp buffer(已经处理的字节数)。
space >>当前缓冲区的空余量。
pbuf >>为buf中还没写入到Top buffer的首地址。
sz >>为还没写到top buffer中的字节数。

//增加数据到输出缓冲区顶部


来看源代码:
void TcpSocket::Buffer(const char *buf, size_t len)
{
	//ptr为放入缓冲区中的数据个数
	size_t ptr = 0;
	//总共待发送的字节数
	m_output_length += len;
	//这里使用while原因
	//如果当前top元素指针指向的OUTPUT缓冲区不能容纳len个字节,
	//那么就 需要将当前的top指针指向的OUTPUT填满,剩下的len - ptr个字节则填入到
	//new出来的新top指针指向的OUTPUT中
	while (ptr < len)
	{
		// buf/len => pbuf/sz
		size_t space = 0;
		if ((space = m_obuf_top ? m_obuf_top -> Space() : 0) > 0)
		{
			const char *pbuf = buf + ptr;
			size_t sz = len - ptr;
			if (space >= sz)
			{
				m_obuf_top -> Add(pbuf, sz);
				ptr += sz;
			}
			else
			{
				m_obuf_top -> Add(pbuf, space);
				ptr += space;
			}
		}
		else
		{
			m_obuf_top = new OUTPUT;
			m_obuf.push_back( m_obuf_top );
		}
	}
}

使用while循环的目的是:
因为当前top buffer的space可能容纳不了len个字节,那么我们就需要先将space个字节放入到当前top buffer,剩下的字节数则需需要重新new一个OUTPUT,相应的top buffer也编程现在new这个缓冲区。

由代码科技,当space =0是则需要new新的缓冲区。

		else
		{
			m_obuf_top = new OUTPUT;
			m_obuf.push_back( m_obuf_top );
		}

pbuf更新到还没写入到top bufer的首地址,长度也进行更新。

const char *pbuf = buf + ptr;
size_t sz = len - ptr;

如果当前top buffer的space 大于剩余需要写入的字节数,那么就将剩余的sz字节写入。
如果当前top buffer的space 小于剩余需要写入的字节数,那么就将space填满,剩余的字节再下一个 循环处理。

			if (space >= sz)
			{
				m_obuf_top -> Add(pbuf, sz);
				ptr += sz;
			}
			else
			{
				m_obuf_top -> Add(pbuf, space);
				ptr += space;
			}

TcpSocket类之Open中的同步解析和异步解析

2013年7月2日

TcpSocket类之Open解析描述了Open函数

bool TcpSocket::Open(const std::string &host,port_t port)

该函数完成host到ip的转换并且connect到host与port指定的服务器。

#ifdef ENABLE_RESOLVER
	if (!Handler().ResolverEnabled() || Utility::isipv4(host) )
	{
#endif
		ipaddr_t l;
		if (!Utility::u2ip(host,l))
		{
			SetCloseAndDelete();
			return false;
		}
		Ipv4Address ad(l, port);
		Ipv4Address local;
		return Open(ad, local);
#ifdef ENABLE_RESOLVER
	}
	// resolve using async resolver thread
	m_resolver_id = Resolve(host, port);
	return true;
#endif

如果ResolverEnabled没有启动[!Handler().ResolverEnabled()],则使用同步方式来获取地址信息。
如果该string为字符串形式表示的点分十进制IP地址,则并不需要解析就可以直接转换出ip地址。
那么什么时候需要解析呢,那就是你输入别名或者主机名。比如你输入www.baidu.com或者localhost之类。
这也就是为什么会存在这样的语句:

if (!Handler().ResolverEnabled() || Utility::isipv4(host) )

再说一次,host这个string描述的是服务器地址信息,port描述的是服务器端口信息。
但是host可能是ip地址形式,比如127.0.0.1
也可能是名字形式,比如localhost
那么localhost需要解析,而127.0.0.1则可以直接使用u2ip转换成二进制ip地址。

这里需要注意的是,为什么不使用同步的u2ip->gethostbyname或getaddrinfo。
而是使用异步的Reslove线程。

使用异步解析时,调用如下

m_resolver_id = Resolve(host, port);

返回一个m_resolver_id 用来在后边对解析后的结果匹配。

大致调用过程

m_resolver_id = Resolve(host, port);

|
|

int Socket::Resolve(const std::string& host,port_t port)
{
	return Handler().Resolve(this, host, port);
}

|
|

int SocketHandler::Resolve(Socket *p,const std::string& host,port_t port)
{
	// check cache
	ResolvSocket *resolv = new ResolvSocket(*this, p, host, port);
	resolv -> SetId(++m_resolv_id);
	resolv -> SetDeleteByHandler();
	ipaddr_t local;
	Utility::u2ip("127.0.0.1", local);
	if (!resolv -> Open(local, m_resolver_port))
	{
		LogError(resolv, "Resolve", -1, "Can't connect to local resolve server", LOG_LEVEL_FATAL);
	}
	Add(resolv);
	m_resolve_q[p -> UniqueIdentifier()] = true;
DEB(	fprintf(stderr, " *** Resolve '%s:%d' id#%d  m_resolve_q size: %d  p: %p\n", host.c_str(), port, resolv -> GetId(), m_resolve_q.size(), p);)
	return resolv -> GetId();
}
int SocketHandler::Resolve(Socket *p,ipaddr_t a)
{
	// check cache
	ResolvSocket *resolv = new ResolvSocket(*this, p, a);
	resolv -> SetId(++m_resolv_id);
	resolv -> SetDeleteByHandler();
	ipaddr_t local;
	Utility::u2ip("127.0.0.1", local);
	if (!resolv -> Open(local, m_resolver_port))
	{
		LogError(resolv, "Resolve", -1, "Can't connect to local resolve server", LOG_LEVEL_FATAL);
	}
	Add(resolv);
	m_resolve_q[p -> UniqueIdentifier()] = true;
	return resolv -> GetId();
}

TcpSocket类之Open解析

2013年7月2日

首先关注这个Open函数,这个函数的作用就是让TcpSocket作为客户端去主动连接服务器,并返回connect状态。

bool TcpSocket::Open(SocketAddress& ad,SocketAddress& bind_ad,bool skip_socks)

该函数的大致步骤为:

//step1:检查ad是否有效
//step2:检查handler是否还可以容纳Socket
//step3:查看连接池是否可用,可用则搜索,如果找到可用则返回,否则
//step4:创建一个新的套接字,并根据bind_ad决定是否绑定
//step5:connect到ad指定的远程主机

bool TcpSocket::Open(SocketAddress& ad,SocketAddress& bind_ad,bool skip_socks)
{
	if (!ad.IsValid())
	{
		Handler().LogError(this, "Open", 0, "Invalid SocketAddress", LOG_LEVEL_FATAL);
		SetCloseAndDelete();
		return false;
	}
	if (Handler().GetCount() >= Handler().MaxCount())
	{
		Handler().LogError(this, "Open", 0, "no space left for more sockets", LOG_LEVEL_FATAL);
		SetCloseAndDelete();
		return false;
	}
	
	SetConnecting(false);
	
	// check for pooling
	//在连接池中找可用的Socket
#ifdef ENABLE_POOL
	if (Handler().PoolEnabled())
	{
		ISocketHandler::PoolSocket *pools = Handler().FindConnection(SOCK_STREAM, "tcp", ad);
		if (pools)
		{
			//将该连接信息拷贝过来
			CopyConnection( pools );
			//删除临时指针变量
			delete pools;
			//作为客户端
			SetIsClient();
			//ISocketHandler管理器必须调用OnConnect
			SetCallOnConnect(); // ISocketHandler must call OnConnect
			Handler().LogError(this, "SetCallOnConnect", 0, "Found pooled connection", LOG_LEVEL_INFO);
			//成功
			return true;
		}
	}
#endif
	// if not, create new connection
	//创建一个新的连接
	SOCKET s = CreateSocket(ad.GetFamily(), SOCK_STREAM, "tcp");
	if (s == INVALID_SOCKET)
	{
		return false;
	}
	// socket must be nonblocking for async connect
	//因为需要异步连接,所以这里设置成非阻塞
	if (!SetNonblocking(true, s))
	{
		SetCloseAndDelete();
		closesocket(s);
		return false;
	}
#ifdef ENABLE_POOL
	SetIsClient(); // client because we connect
#endif
	//设置需要连接的远程主机的SocketAddress
	SetClientRemoteAddress(ad);
	int n = 0;
	//如果 需要绑定到本地端口和地址(一般客户端不需要绑定)
	if (bind_ad.GetPort() != 0)
	{//bind_ad为SocketAddress,所以存在这两个隐式转换
		bind(s, bind_ad, bind_ad);
	}
	{
	//主动连接远程服务器
		n = connect(s, ad, ad);
	//设置连接的远程主机SocketAddr信息
		SetRemoteAddress(ad);
	}
	if (n == -1)
	{
		// check error code that means a connect is in progress
		if (Errno == EINPROGRESS)
		{
			Attach(s);
			SetConnecting( true ); // this flag will control fd_set's
		}
		else
#ifdef ENABLE_RECONNECT
		if (Reconnect())
		{
			Handler().LogError(this, "connect: failed, reconnect pending", Errno, StrError(Errno), LOG_LEVEL_INFO);
			Attach(s);
			SetConnecting( true ); // this flag will control fd_set's
		}
		else
#endif
		{
			Handler().LogError(this, "connect: failed", Errno, StrError(Errno), LOG_LEVEL_FATAL);
			SetCloseAndDelete();
			closesocket(s);
			return false;
		}
	}
	else
	{
	//将描述符关联
		Attach(s);
		SetCallOnConnect(); // ISocketHandler must call OnConnect
	}

	// 'true' means connected or connecting(not yet connected)
	// 'false' means something failed
	return true; //!Connecting();
}

该函数最先就是对传入参数的正确性检测。
由于该库是基于事件的,所以Open创建的套接字需要交给Handler来管理,这也就是需要检查Handler的容量是否超限。
SetConnecting(false)指明现在还没有进行connect。
如果启用了连接池,则在连接池中查找可用的匹配连接,并提取出来直接使用,而不是从头开始创建套机子再连接。

#ifdef ENABLE_POOL
	if (Handler().PoolEnabled())
	{
		ISocketHandler::PoolSocket *pools = Handler().FindConnection(SOCK_STREAM, "tcp", ad);
		if (pools)
		{
			//将该连接信息拷贝过来
			CopyConnection( pools );
			//删除临时指针变量
			delete pools;
			//作为客户端
			SetIsClient();
			//ISocketHandler管理器必须调用OnConnect
			SetCallOnConnect(); // ISocketHandler must call OnConnect
			Handler().LogError(this, "SetCallOnConnect", 0, "Found pooled connection", LOG_LEVEL_INFO);
			//成功
			return true;
		}
	}
#endif

使用SetIsClient()来指明该连接为客户端。
如果连接池没启用或者没有合适的连接则创建新的套接字。

接着开始非常重要的设置。

if (!SetNonblocking(true, s))

设置套接字描述符为非阻塞,避免connect的阻塞,注意就是非阻塞的connect返回值为-1.
更详细的描述为:
connect立即返回一个EINPROGRESS错误,但是3次握手继续在进行,这个时候使用select来检测成功或者失败。
连接如果成功,则select变可写;
连接如果失败,则select变可读可写。
那么为什么还要对n!=-1进行分支呢?这是因为如果服务器在同一个主机上,那么一般connect通常立刻建立。

另外,如果连接失败if (Errno != EINPROGRESS),那么则可能启用Reconnect机制。

#ifdef ENABLE_RECONNECT
		if (Reconnect())
		{
			Handler().LogError(this, "connect: failed, reconnect pending", Errno, StrError(Errno), LOG_LEVEL_INFO);
			Attach(s);
			SetConnecting( true ); // this flag will control fd_set's
		}
		else
#endif

重复connect中会

SetConnecting( true ); // this flag will control fd_set's

设置connecting标志后,将会影响fd_set,进而影响select.

还有一个Open函数,使用string作为主机地址参数,这个时候用(!Utility::u2ip(host,l))转换成ip地址就行了。

TcpSocket类之析构与Open

2013年7月2日

1.TcpSocket类通过SOCKETS_DYNAMIC_TEMP来决定是使用预先定义一个临时缓冲区m_buf,还是在每次接收时都动态new。当然动态new效率较低。
2.析构函数,首先决定是否释放m_buf,再将输出链表清空,因为list中存放的是动态new的地址,所以需要在析构函数中进行释放。

TcpSocket::~TcpSocket()
{
#ifdef SOCKETS_DYNAMIC_TEMP
	delete[] m_buf;
#endif
	//清空m_obuf,因为list中存放指针,所以需要在析构函数中析构
	while (m_obuf.size())
	{
		output_l::iterator it = m_obuf.begin();
		OUTPUT *p = *it;
		delete p;
		m_obuf.erase(it);
	}
}

3.5个Open函数的重载,实际上前面几个Open都是对地址和端口进行封装后调用

bool TcpSocket::Open(SocketAddress& ad,SocketAddress& bind_ad,bool skip_socks)

每个函数需要的参数为:

bool TcpSocket::Open(ipaddr_t ip,port_t port,bool skip_socks)
需要Tcp客户端connect到的ip地址和端口,内部对ip和port组成一个SocketAddress

bool TcpSocket::Open(ipaddr_t ip,port_t port,bool skip_socks)
{
	Ipv4Address ad(ip, port);
	Ipv4Address local;
	return Open(ad, local, skip_socks);
}

bool TcpSocket::Open(SocketAddress& ad,bool skip_socks)
则会使用Ipv4Address bind_ad(“0.0.0.0”, 0);将本地进程的ip和端口决定权交给内核。

bool TcpSocket::Open(SocketAddress& ad,SocketAddress& bind_ad,bool skip_socks)
ad为connect到的服务器地址信息,bind_ad为本地进程需要绑定的地址信息,skip_socks暂不关注。

bool TcpSocket::Open(const std::string &host,port_t port)
则是以字符串形式描述的服务器IP地址。
QQ图片20130702082627

原来我的鳖脚英语老外还是看的懂啊

2013年7月1日

这是我最近阅读的一个开源项目,但是我发现一个BUG,所以向作者提交了该问题。

Thank you very much!
I will try to recreate your problem with using the standard SocketHandlerEp class. I have
only myself used it as a base class for specialized handlers, so there is a problem left to
be discovered!
Best regards
/ah

On Sat, Jul 6, 2013 at 5:23 AM, Unix8.net,Unix吧 wrote:
Dear autor!
When I read the source of “C++ Socket Library”, i wirted a test, but have a “Segmentation fault” error.

When i use SocketHandler as the handler, there is no error.
When i write a class which inherit from SocketHandlerEp as the handler, there is no error.
When i use SocketHandlerEp as the handler, there is a fatal error.

【I am a Chinese person, please forgive my English is not good】

Code:

#include <stdio.h>
#include <SocketHandler.h>
#include <SocketHandlerEp.h>
#include <TcpSocket.h>
#include <Mutex.h>
#include <StdoutLog.h>
#include <ListenSocket.h>
#include <Lock.h>


class MyHandler;
class InfoSocket : public TcpSocket
{
public:
	InfoSocket(ISocketHandler& h) : TcpSocket(h) {}
	~InfoSocket() {}

	void OnAccept() {
		printf("Accept\n");
	}

	//Don't use OnRawData
	void OnRead()
	{
		TcpSocket::OnRead();
		size_t n = ibuf.GetLength();
		printf("ibuf have %d bytes\n", n);
		if (n > 0)
		{
			char tmp[100];
			ibuf.Read(tmp,n);
			printf("Read %d bytes:\n",n);
			for (size_t i = 0; i < n; i++)
			{
				printf("%c",isprint(tmp[i]) ? tmp[i] : '.');
			}
			printf("\n");

		}
	}
};

class MyHandler : public SocketHandlerEp
{
public:
	MyHandler(IMutex& m,StdoutLog *p) : SocketHandlerEp(m, p) {}
	~MyHandler() {}
};

// use SocketHandlerEp
#define TEST_SOCKETHANDLEREP

//use SocketHandler
//#define TEST_SOCKETHANDLER

//use MyHandler, inherit SocketHandlerEp or SocketHandler
//#define TEST_INHERIT_SOCKETHANDLEREP


int main(int argc,char *argv[])
{
	Mutex lock;
	StdoutLog log;

#if  defined(TEST_SOCKETHANDLEREP)
	SocketHandlerEp h(lock, &log);
#elif defined(TEST_SOCKETHANDLER)
	SocketHandler h(lock, &log);
#elif defined(TEST_INHERIT_SOCKETHANDLEREP)
	MyHandler h(lock, &log);
#endif

	ListenSocket<InfoSocket> l(h);
	if (l.Bind(11002))
	{
		printf("Bind() failed\n");
		exit(-2);
	}
	h.Add(&l);
	
	bool quit = false;
	while (!quit)
	{
		h.Select(1, 0);
	}
}

When I debug found that cause of the error is “m_mutex.Lock()”.

Thank you!
Best Regards!
2013-07-06

C++ Socket library中两个关键接口类[1]IEventHandler

2013年6月24日

IEventHandler主要是针对于Timer Event,所以几个纯虚函数都是与之相关的。

virtual bool GetTimeUntilNextEvent(struct timeval *) = 0;

该接口用于获取下一次Timer Event触发的时间,结果由值-结果参数timeval传回。这个结构体的定义可以查看struct timeval结构体 以及 gettimeofday()函数
如果有下一个Timer Event则返回true,否则返回false。可以来看看IEventHandler的派生类EventHandler是怎么实现该接口的。

bool EventHandler::GetTimeUntilNextEvent(struct timeval *tv)
{
	//如果事件链表为空,表明没有下一个可用的timer Event
	if (!m_events.size())
		return false;
	std::list::iterator it = m_events.begin();
	if (it != m_events.end())
	{
		//获取现在时刻
		EventTime now;
		//和链表中的时刻进行比较
		mytime_t diff = (*it) -> GetTime() - now;
		if (diff < 1) 		{ 			diff = 1; 		} 		//对这个时间间隔进行返回 		tv -> tv_sec = static_cast(diff / 1000000);//秒
		tv -> tv_usec = static_cast(diff % 1000000);//微妙零头
		return true;
	}
	return false;
}

» 阅读更多: C++ Socket library中两个关键接口类[1]IEventHandler

C++ socket library中的事件管理者handler

2013年6月24日

存在两个纯虚基类,也就是类似两个接口类。IEventHandler与ISocketHandler。

ISocketHandler管理的是文件描述符事件(当然更多的是套接字描述符,以类表示为Socket)。
IEventHandler管理的是Timer事件。

实现ISocketHandler接口类的基类为SocketHandler。SocketHandler通过select来获取描述符的可读,可写,出错事件,并以事件驱动形式来驱动Socket类,调用其中的回调函数,比如OnAccept,OnRead,OnLine等。

还有个基类SocketHandlerEp也是继承自SocketHandler来实现了接口,SocketHandlerEp与SocketHandler不同,SocketHandlerEp是通过epoll来分发事件的。

实现IEventHandler接口类的基类为EventHandler,EventHandler类不仅仅实现了IEventHandler的接口,而且继承了SocketHandler

class EventHandler : public SocketHandler,public IEventHandler

可以看出,要想同时管理Socket与timer则需要继承EventHandler 基类。

所以最主要的3个管理类为:SocketHandler,EventHandler,SocketHandlerEp。

C++ Socket library 中Socket类中的一些回调函数

2013年6月18日

1.virtual void OnConnect()
当一个连接完成时回调。

2.virtual void OnAccept()
当一个外部客户端请求连接到达时并在连接建立时回调。载Socket类中并没有实现,而在派生类中来实现。

3.virtual void OnLine(const std::string&)
当socket类工作于line协议模式时,收到一个完成的行时回调。

4.virtual void OnDelete()
当该套接字类被ISocketHandler删除时回调。

5.virutal void OnRawData(const char *buf, size_t len)
当从套机字成功读时的回调函数。