您的位置:首页 > 家居用品 > 厨卫 > 《A Simple IOCP Server/Client C...

《A Simple IOCP Server/Client C...

luyued 发布于 2011-06-04 22:52   浏览 N 次  

  《A Simple IOCP Server/Client Class》 一.项目概述

  1.pre:

  预编译器文件stdafx。

  2.app:MFC主线程。

  BOOLCIOCPApp::InitInstance()

  {

  //

  CIOCPDlgdlg; // main dialog

  //

  }

  3.rc:资源文件。

  4.gui:主对话框以及一些自绘制控件。

  (1)classCHistoryEdit : publicCEdit

  (2)classMyListCtrl : publicCListCtrl

  (3)classIOCPSettingsDlg : publicCDialog

  IOCPSettingsDlg使用UpdateData()<->DDX()实现对IOCP的配置。

  IOCPSettingsDlg包含一个Save按钮(IDOK),但并没有覆写OnOK()响应。

  单击OK按钮将调用基类CDialog的OnOk()函数,在OnOK()中调用了UpdateData(TRUE),将数据从控件传递到关联的成员变量。

  (4)

  classCIOCPDlg : publicCDialog

  {

  CHistoryEdit m_CtrlLogg;

  MyListCtrl m_CtrlClientList;

  IOCPSettingsDlgm_ConfigDlg;

  MyIOCP m_iocp; // network i/o handler

  }

  5.i/o数据结构:

  基于完成端口的重叠I/O网络通信业务逻辑主要涉及"单I/O数据"和"单句柄数据"。

  <1>单I/O数据(per i/o data)

  CIOCPBuffer类包含一个重叠I/O所需要的重叠结构(OVERLAPPED)和I/O缓冲区(WSABUF),它代表一次重叠I/O操作。

  CIOCPBuffer类还提供了缓冲区管理(m_nUsed)、打包(CreatePackage)、解包(GetPackageInfo)和I/O序列(m_iSequenceNumber)机制。

  class CIOCPBuffer

  {

  public:

  // 用于WSASend/WSARecv的OVERLAPPED结构

  OVERLAPPEDm_ol;

  private:

  // 用于WSASend/WSARecv的WSABUF结构

  WSABUFm_wsabuf;

  // 实际缓冲区

  BYTEm_Buffer[MAXIMUMPACKAGESIZE];

  // 已占用字节数

  UINTm_nUsed;

  // I/O类型,取值enum IOType

  intm_Operation;

  // I/O序列号

  intm_iSequenceNumber;

  // I/O在外部列表(IOCPS::m_BufferList)中的位置(指针)

  POSITIONm_pPos;

  }

  <2>单句柄数据(per handle data)

  ClientContext结构代表一个TCP连接的客户端(endpoint),它提供了对客户通信的I/O有序化管理。

  structClientContext { // 客户套接字(代表一个TCP连接)

  SOCKET m_Socket;

  // 在该连接上挂起的I/O数,释放该结构前检查以避免Access Violation

  int m_nNumberOfPendlingIO;

  // 发送序列机制

  unsignedint m_SendSequenceNumber; // apply for WSASend post: IOCPS::ASend IOCPS::SetSendSequenceNumber&rar r;CIOCPBuffer::SetSequenceNumber

  unsignedint m_CurrentSendSequenceNumber; // expect for sequential WSASend operation: IOCPS::OnWrite IOCPS::GetNextSendBuffer/IOCPS ::IncreaseSendSequenceNumber

  BufferMap m_SendBufferMap; // out of order writing

  // 接收序列机制

  unsignedint m_ReadSequenceNumber; // apply for WSARecv post: IOCPS::MakeOrderdRead CIOCPBuffer::SetSequenc eNumber

  unsignedint m_CurrentReadSequenceNumber; // expect from WSARecv completion: IOCPS::OnReadCompleted IOCPS::IncreaseR eadSequenceNumber/IOCPS::GetNextReadBuffer

  BufferMap m_ReadBufferMap; // out of order reading

  // 文件传输模式支持

  #ifdef TRANSFERFILEFUNCTIONALITY

  CFilem_File; // file to send/receive

  unsignedintm_iMaxFileBytes; // file size

  unsignedintm_iFileBytes; // sent/received completed

  BOOLm_bFileSendMode; // file sender

  BOOLm_bFileReceivedMode; // file receiver

  #endif

  // 组包单元(for assembling)

  CIOCPBuffer* m_pBuffOverlappedPackage;

  // 额外数据

  CStringm_sReceived;

  intm_iNumberOfReceivedMsg;

  BOOLm_bUpdate;

  };

  6.序列号机制

  每个i/o buffer都有序列号,每个ClientContext都有自己的read/send序列号。

  序列号从0~2^31-1回环。

  二.系统概述

  

  |--

  classIOCPS

  {

  // management for client contexts

  ContextMap m_ContextMap;

  CPtrList m_FreeContextList; // pool

  int m_iMaxNumberOfFreeContext; // pool size

  // management for overlapped i/o buffers

  CPtrList m_BufferList;

  CPtrList m_FreeBufferList; // pool

  int m_iMaxNumberOfFreeBuffer; // pool size

  // listener servo

  staticUINTListenerThreadProc(LPVOIDpParam);

  HANDLE m_hCompletionPort;

  // i/o dispatcher

  staticUINT IOWorkerThreadProc(LPVOIDpParam);

  CPtrList m_IOWorkerList; int m_nIOWorkers; void ProcessIOMessage(CIOCPBuffer* pOverlapBuff, ClientContext* pContext, DWORDdwSize); // i/o handler(ProcessJob) staticUINT WorkerThreadProc(LPVOIDpParam); CMapWordToPtrm_WorkerThreadMap; int m_nOfWorkers; // job is designed for outgoing writing such as file sending procedure CPtrList m_JobQueueList; // job queue virtualvoid ProcessJob(JobItem* pJob, IOCPS* pServer); // deal with jobs // assemble(AddAndFlush()) and notify to make custom protocol analyzation(NotifyReceivedPackage()) // invoked by OnReadCompleted() voidProcessPackage(ClientContext* pContext, DWORDdwIoSize, CIOCPBuffer* pOverlapBuff); }; 1.system outline

  

  完成端口是一个基于状态机的I/O调度器(iocp dispatcher),I/O操作(ASend/ARead)中PostQueuedCompletionStatus()。

  I/O调度线程(IOWorkerThreadProc)中,根据状态码(IOType)来协调I/O过程。

  2.IOCPS::ListenerThreadProc()

  listener servo: wait for "shutdown" event and "accept" event.

  监听套接字采用基于事件通知(WSAEventSelect)的重叠I/O模型(WSAAccept)。

  接入的客户套接字采用基于完成端口通知的重叠I/O模型(WSARecv/WSASend)。

  SOCKETclientSocket = WSAAccept(); // 接入连接

  IOCPS::AssociateIncomingClientWithContext(SOCKETclientSocket);

  

  CIOCPBuffer* pOverlapBuff = AllocateBuffer(IOInitialize);

  ::PostQueuedCompletionStatus(); // fire up the iocp dispatcher

  3.IOCPS::IOWorkerThreadProc()

  i/o dispatcher: watch for the completion port and dispatch i/o completion notification.

  (1)ASend()/AZeroByteRead()/ARead()->PostQueuedCompletionStatus()

  (2)PostPackage()->PostQueuedCompletionStatus()

  4.IOCPS::WorkerThreadProc()

  i/o handler: deal with heavy task.

  // job items list

  CPtrListm_JobQueueList;

  // Adds a job to the queue.

  BOOLAddJob(JobItem* pJob);

  // Get a Job.

  JobItem* GetJob();

  // Called to do some work.

  virtualinlinevoidProcessJob(JobItem* pJob, IOCPS* pServer);

  // Clear the Job from the heap.

  inlinevoidFreeJob(JobItem* pJob);

  CStringJobItem::m_Data可扩展为缓冲区char[],以执行具体任务。

  三.系统流程

  ++++++++++++++++++++++++++++++++++++++++++++++++++ +++++++++++++++++++++++++++++

  | initialization |

  ++++++++++++++++++++++++++++++++++++++++++++++++++ +++++++++++++++++++++++++++++

  IOCPS::AssociateIncomingClientWithContext(SOCKET) // attach tcp socket to ClientContext structure

  EnterIOLoop(ClientContext);

  CIOCPBuffer* pOverlapBuff = AllocateBuffer(IOInitialize);

  ::PostQueuedCompletionStatus();

  -------------------------------------------------- -----------------------------

  The initialization for the incoming client will fire up the iocp dispatcher

  to take over the charge of its i/o procedure.

  -------------------------------------------------- -----------------------------

  ++++++++++++++++++++++++++++++++++++++++++++++++++ +++++++++++++++++++++++++++++

  | zero read loop |

  ++++++++++++++++++++++++++++++++++++++++++++++++++ +++++++++++++++++++++++++++++

  IOInitialize->OnInitialize

  AZeroByteRead(); // SetOperation(IOZeroByteRead)

  for (inti=0; i

  {

  EnterIOLoop(pContext);

  ARead(); // AllocateBuffer(IORead)

  }

  IOZeroByteRead->OnZeroByteRead WSARecv(0); // SetOperation(IOZeroReadCompleted) IOZeroReadCompleted->OnZeroByteReadCompleted AZeroByteRead() // post another zero read operation to make a loop -------------------------------------------------- ----------------------------- The zero read loop act as a probe to notify application when incoming data pending. -------------------------------------------------- ----------------------------- ++++++++++++++++++++++++++++++++++++++++++++++++++ +++++++++++++++++++++++++++++ | read loop | ++++++++++++++++++++++++++++++++++++++++++++++++++ +++++++++++++++++++++++++++++ IORead->OnRead MakeOrderdRead() WSARecv(MAXIMUMPACKAGESIZE) // SetOperation(IOReadCompleted) IOReadCompleted->OnReadCompleted->NotifyReadCompleted ProcessPackage()/AddToFile() // custom protocol analyzation for incoming data ARead() // post another read operation to make a loop -------------------------------------------------- ----------------------------- For each client,we initiate a number of read oepration waiting for incoming data. The read operation post is automatically made,so it's always ready to receive incoming data. -------------------------------------------------- ----------------------------- ++++++++++++++++++++++++++++++++++++++++++++++++++ +++++++++++++++++++++++++++++ | manual write | ++++++++++++++++++++++++++++++++++++++++++++++++++ +++++++++++++++++++++++++++++ MyIOCP::BuildPackageAndSend AllocateBuffer(IOWrite) ASend() EnterIOLoop() PostQueuedCompletionStatus() IOWrite->OnWrite WSASend() // SetOperation(IOWriteCompleted) IOWriteCompleted->OnWriteCompleted->NotifyWriteCompleted -------------------------------------------------- ----------------------------- The read opration loop is automate machine, but we should invoke ASend() to start up data sending procedure. -------------------------------------------------- ----------------------------- ++++++++++++++++++++++++++++++++++++++++++++++++++ +++++++++++++++++++++++++++++ | iocp packet | ++++++++++++++++++++++++++++++++++++++++++++++++++ +++++++++++++++++++++++++++++ PostPackage() EnterIOLoop() SetOperation(IOPostedPackage) PostQueuedCompletionStatus() IOPostedPackage->OnPostedPackage->NotifyReceivedPackage -------------------------------------------------- ----------------------------- PostPackage() is used to post request into IOCP (simulate received packages). This function is necessary to split heavy computation operation into several parts. (automate machine) This functions can be used instead of the function addJob(..). -------------------------------------------------- ----------------------------- ++++++++++++++++++++++++++++++++++++++++++++++++++ +++++++++++++++++++++++++++++ | file transmission | ++++++++++++++++++++++++++++++++++++++++++++++++++ +++++++++++++++++++++++++++++ (1)file sender: MyIOCP::BuildFilePackageAndSend() PrepareSendFile() // open a file(m_bFileSendMode = TRUE) CIOCPBuffer.CreatePackage(Job_SendFileInfo) AllocateBuffer(IOWrite); EnterIOLoop(); PostQueuedCompletionStatus(); // send file information (2)file receiver: IOCPS::OnReadCompleted() ProcessPackage() NotifyReceivedPackage() PackageFileTransfer() PrepareReceiveFile() // create a file(m_bFileReceivedMode = TRUE) AllocateBuffer(IOWrite) CreatePackage(Job_StartFileTransfer) ASend() (3)file sender: IOCPS::OnReadCompleted() ProcessPackage() NotifyReceivedPackage() PackageStartFileTransfer() StartSendFile() // m_bFileSendMode = TRUE AllocateBuffer(IOWrite) EnterIOLoop() SetOperation(IOTransmitFileCompleted) TransmitFile() IOTransmitFileCompleted->OnTransmitFileCompleted->NotifyFileCompleted// file send completed (4)file receiver: IOCPS::OnReadCompleted() AddToFile() if (pContext->m_iFileBytes == pContext->m_iMaxFileBytes) { NotifyFileCompleted(pContext); // file receive completed } 四.修改完善

  参考《NKittyServer》

  1.去MFC化

  (1)CFile -> FILE

  (2)CString -> std::string

  (3)CCriticalSection -> Lock

  (4)CWinThread

  <1>CWinThread* -> HANDLE

  <2>AfxBeginThread -> CreateThread

  <3>CPtrList m_IOWorkerList;

  <4>CMapWordToPtr m_WorkerThreadMap;

  <5>CWinThread* GetWorker(WORDWorkerID);

  (5)CPtrList/CMap -> std adapter

  m_OneIPPerConnectionList : CPtrList -> std::list

  m_BanIPList : CPtrList -> std::list

  m_BufferList : CPtrList -> std::list

  ContextMap : CMap -> std::map

  BufferMap -> ReorderQueue : CMap -> std::priority_queue

  m_FreeContextList -> m_FreeContextVector : CPtrList -> std::vector

  m_FreeBufferList -> m_FreeBufferVector : CPtrList -> std::vector

  m_IOWorkerList -> m_IOWorkerVector : CPtrList -> std::vector

  m_WorkerThreadMap -> m_WorkerVector : CMapWordToPtr -> std::vector

  m_JobQueueList -> m_JobQueue : CPtrList -> std::queue

  2.客户端拔线的情况

  TCP一方A进程崩溃或手动终止进程,网络仍处于连接状态,此时A应用层面的进程管理机制将释放连接资源,并向对方B发送RST报文重置连接,对方B收到RST报文,向应用层返回WSAECONNRESET错误。

  TCP一方A禁用网络或拔掉网线导致网络中断,A本地状态机能够检测到异常,但是无法向对方B发送RST报文。此时,这条连接就成了"死连接",直到对方B发送数据才能探测到异常,返回WSAEHOSTUNREACH或WSAECONNABORTED(WSAETIMEDOUT);如果B也不发数据,那么只有依靠B的TCP心跳(Keep Alive Probe)机制来检测状态,返回WSAENETRESET。

  如果2小时内套接口上任一方向都没有数据交换,即无I/O,则B方的TCP自动给对方A发一个心跳包[TCP Keep-Alive],如果超时无响应,则B认为对方A已放弃该连接,重置(reset)连接回收资源。

  对于客户端拔线的"死连接"情况,采用了RegisterWaitForSingleObject的TimeOutCallback函数进行处理。如果有I/O完成,说明连接处于活动状态,置信hAlive事件。如果超过一定时间(如3分钟)无数据进出,则超时(TimerOrWaitFired==TRUE),在TimeOutCallback中断开该连接。

  IOCPS::AssociateIncomingClientWithContext

  pContext->hAlive = CreateEvent(NULL,FALSE,FALSE,NULL);

  RegisterWaitForSingleObject(&pContext->hTimeOut, pContext->hAlive, );

  IOCPS::OnWriteCompleted

  SetEvent(pContext->hAlive);

  IOCPS::OnReadCompleted

  SetEvent(pContext->hAlive);

  IOCPS::AbortiveClose/IOCPS::ReleaseClientContext

  CloseHandle(pContext->hAlive);

  IOCPS::AbortiveClose/IOCPS::ReleaseClientContext

  UnregisterWaitEx(mp->hTimeOut,NULL);

  3.WSASend实际完成返回的大小小于请求发送的大小。

  IOCPS::OnWriteCompleted()

  {

  if (pContext)

  {

  if (pOverlapBuff)

  {

  //发送小于需要发送的大小

  if (pOverlapBuff->GetUsed()!=dwIoSize)

  {

  if (dwIoSize < pOverlapBuff->GetUsed() && dwIoSize > 0)

  {

  // 这应该是正常的

  // ReleaseBuffer(pOverlapBuff); if (pOverlapBuff->Flush(dwIoSize) == TRUE) { pOverlapBuff->SetOperation(IOWrite); ASend(pContext,pOverlapBuff); } } } else { } } } } 4.WSARecv的自动化处理机制

  应该先投递WSARecv(0),如果返回,表示有数据进入(incoming),投递真正的WSARecv进行读取。这样才发挥了零字节读取的通知作用,同时又不浪费分配CIOCPBuffer的缓冲区(m_Buffer[MAXIMUMPACKAGESIZE])

  voidIOCPS::OnInitialize(ClientContext* pContext, DWORDdwIoSize, CIOCPBuffer* pOverlapBuff)

  {

  // Do some init here..

  // Notify new connection.

  pContext->m_ContextLock.On();

  NotifyNewConnection(pContext);

  pContext->m_ContextLock.Off();

  // A ZeroByteLoop. EnterIOLoop is not needed here. Already done in previous call.

  // AZeroByteRead(pContext, pOverlapBuff);

  // m_iNumberOfPendlingReads=1 by default.

  for (inti=0; i

  {

  // EnterIOLoop(pContext); // One for each Read Loop

  // ARead(pContext);

  AZeroByteRead(pContext, pOverlapBuff);

  }

  }

  当ZeroByteReadCompleted完成的时候,投递真正的ARead。

  voidIOCPS::OnZeroByteReadCompleted(ClientContext* pContext, DWORDdwIoSize, CIOCPBuffer* pOverlapBuff)

  {

  if (pContext)

  {

  // Make a Loop.

  // AZeroByteRead(pContext, pOverlapBuff);

  ARead(pContext, pOverlapBuff);

  }

  }

  当ReadCompleted完成的时候,再投递AZeroByteRead(pContext, pOverlapBuff);

  voidIOCPS::OnReadCompleted(ClientContext* pContext, DWORDdwIoSize, CIOCPBuffer* pOverlapBuff)

  {

  //

  AZeroByteRead(pContext, pOverlapBuff); // ARead(pContext);

  }

  5.EnterIOLoop()/ExitIOLoop()的对称性计数问题

  原来在ReleaseClientContext才调用ExitIOLoop(),应该将I/O挂起计数独立出来,只与I/O过程相关。

  修改为投递I/O时EnterIOLoop(),I/O完成时ExitIOLoop()。

  (1)需要修改ReleaseContext逻辑。

  (2)每一步投入(Post)时EnterIOLoop(),完成时ExitIOLoop(),这样跟踪整个流程状态,而不仅仅是WSASend/WSARecv。

  ++++++++++++++++++++++++++++++++++++++++++++++++++ +++++++++++++++++++++++++++++

  | read |

  ++++++++++++++++++++++++++++++++++++++++++++++++++ +++++++++++++++++++++++++++++

  <1>IOInitialize

  AssociateIncomingClientWithContext()

  EnterIOLoop(pContext, IOInitialize);

  ProcessIOMessage()

  ExitIOLoop(pContext, IOInitialize);

  OnInitialize();

  <2>IOZeroByteRead

  OnInitialize()->AZeroByteRead()

  EnterIOLoop(pContext, IOZeroByteRead);

  ProcessIOMessage()

  ExitIOLoop(pContext, IOZeroByteRead);

  OnZeroByteRead(); <3>IOZeroReadCompleted OnZeroByteRead() EnterIOLoop(pContext, IOZeroReadCompleted); WSARecv(0); ProcessIOMessage() ExitIOLoop(pContext, IOZeroReadCompleted); OnZeroByteReadCompleted(); <4>IORead OnZeroByteReadCompleted()->ARead() // AssociateIncomingClientWithContext()中分配的缓冲区沿用至今

  EnterIOLoop(pContext, IORead);

  ProcessIOMessage()

  ExitIOLoop(pContext, IORead);

  OnRead();

  <5>IOReadCompleted

  OnRead()

  EnterIOLoop(pContext, IOReadCompleted);

  WSARecv(); // MakeOrderRead();

  ProcessIOMessage()

  ExitIOLoop(pContext,IOReadCompleted);

  OnReadCompleted(); // ProcessPackage()/AddToFile()->NotifyReceivedMessage ()

  AZeroByteRead(); // automate machine

  ++++++++++++++++++++++++++++++++++++++++++++++++++ +++++++++++++++++++++++++++++

  | write |

  ++++++++++++++++++++++++++++++++++++++++++++++++++ +++++++++++++++++++++++++++++

  <1>IOWrite

  ASend()

  EnterIOLoop(pContext, IOWrite);

  ProcessIOMessage()

  ExitIOLoop(pContext, IOWrite);

  OnWrite();

  <2>IOWriteCompleted

  OnWrite();

  EnterIOLoop(pContext, IOWriteCompleted);

  WSASend();

  ProcessIOMessage()

  ExitIOLoop(pContext, IOWriteCompleted);

  OnWriteCompleted();

  ++++++++++++++++++++++++++++++++++++++++++++++++++ +++++++++++++++++++++++++++++

  | iocp packet |

  ++++++++++++++++++++++++++++++++++++++++++++++++++ +++++++++++++++++++++++++++++

  <1>IOPostedPackage

  PostPackage()

  EnterIOLoop(pContext, IOPostedPackage);

  ProcessIOMessage()

  ExitIOLoop(pContext, IOPostedPackage);

  OnPostedPackage(); // NotifyReceivedPackage()

  ++++++++++++++++++++++++++++++++++++++++++++++++++ +++++++++++++++++++++++++++++

  | file transmission |

  ++++++++++++++++++++++++++++++++++++++++++++++++++ +++++++++++++++++++++++++++++

  <1>IOWrite & Job_SendFileInfo

  PrepareSendFile()

  pOverlapBuff->CreatePackage(Job_SendFileInfo,iFileSize,sFileName)

  EnterIOLoop(pContext, IOWrite); // 后续同上

  <2>IOWrite & IOTransmitFileCompleted

  StartSendFile()

  EnterIOLoop(pContext, IOTransmitFileCompleted);

  TransmitFile();

  ProcessIOMessage()

  ExitIOLoop(pContext, IOTransmitFileCompleted);

  OnTransmitFileCompleted(pContext,pOverlapBuff);

  6.加解锁条件判断截断问题

  在Lock.On()和Lock.Off()过程中遇条件截断提前return,而没有解锁。 BOOLMyIOCP::BuildFilePackageAndSend(intClientID, CStringsFile) { BOOLbRet = FALSE; m_ContextMapLock.On(); ClientContext* pContext = FindClient(ClientID); if (!pContext) { m_ContextMapLock.Off(); // 退出解锁

  returnFALSE;

  }

  bRet = BuildFilePackageAndSend(pContext, sFile);

  m_ContextMapLock.Off();

  returnbRet;

  }

  其他的地方:

  SetWorkers()

  m_WorkerThreadMapLock.On();

  returnFALSE; // 在return之前应该解锁

  m_WorkerThreadMapLock.Off();

  五.代码下载

  《A Simple IOCP Server/Client Class》

图文资讯
广告赞助商