Click here to Skip to main content
12,622,568 members (31,466 online)
Click here to Skip to main content
Add your own
alternative version

Stats

218.5K views
6.3K downloads
114 bookmarked
Posted

IP Multicasting - Using I/O completion ports with UDP

, 24 Feb 2002
Rate this:
Please Sign up or sign in to vote.
How to use I/O completion ports with UDP? ( With an IP multicast example).

Introduction

I have been writing server applications on Windows server platforms for quite sometime. We all know that I/O completion port is one of the nicest things available for writing server applications (or even in clients where you can apply Worker-Boss pattern). There are tons of articles and samples to show how I/O completion port can be used with TCP/IP. Surprisingly enough, I did not find any source code examples to demonstrate the use of I/O completion ports with UDP.

To spice up things a little, I took the multicast topic to demonstrate the code. Most of the multicast code is borrowed from Mr. Bob Quinn's article.

This sample consists of two projects (Client and Server). In order to test this application as it is, routers in your network should be enabled for multicast routing. If not, modify samples appropriately to act just UDP server and client.

Since there is so much written about I/O completion port and multicast, I won't go into all the details in this article.

Server application design and implementation is an art. You can put so many bells and whistles. Typical server application should have the following:

  1. Object Pooling

    If any of the object is to be created and destroyed often, consider making a pool of such objects. Typical example would be your sockets, user context objects, database connections.

  2. Ease of configuration

    Do not hardcode configuration information in the code. Try to use XML configuration files. Let the application read these vales during the startup. Typical examples would be, IP address of other servers, Oracle TNS names, user ids and passwords (!!!), number of worker threads, email addresses etc.

    Besides, there should be some mechanism to change these parameters while the server is running.

  3. Monitoring and logging

    Since most of these servers will be running somewhere in the data center, it is nice to have a small component sitting in your server application which will send log messages to central server from where you multicast those messages to other client applications. Make sure you have very good filtering mechanism so that you can control the number of messages being logged during the run time.

  4. Error handling

    Your servers have to be running nonstop (four nines most of the time). That means you should have proper error handling mechanism. Make use of SEH in a responsible manner. Besides keep track of number of TCP connections (using SNMP) and CPU usage, memory usage (Microsoft provides a library by which you can query all these parameters from within your program).

  5. Thread Synchronization

    Make sure you thread protects the shared data.

  6. Load balancing.

    You can not serve infinite number of clients from one server box. Always keep the 'scale-out' factor in mind. Do proper capacity planning to quantify your server hardware requirement. 80-20 rule is quite effective.

Source Code

//NOTE
// 
//This code taken from Mr. Bob Quinn's article 
//titled 'Internet Multicasting'
//published in Dr. Dobb's Journal dated Oct 1997

//I have modified the original code to illustrate 
//the use I/O completion ports with UDP.

//If you have any comments email me : shapall@hotmail.com

#include "StdAfx.h"
#include <winsock2.h>
#include <ws2tcpip.h>
#include "Stdio.h"

#define BUFSIZE 1024 //max size of incoming data buffer
#define MAXADDRSTR 16

#define DEFAULT_GROUP_ADDRESS "239.254.1.2"
#define DEFAULT_PORT 7125 

LONG nCount = 0;
HANDLE g_hCompletionPort;
DWORD WINAPI WorkerThread( LPVOID WorkContext );

BOOL HandleIncomingData( UCHAR* pBuf);
BOOL CreateNetConnections( VOID );
BOOL CreateWorkers( UINT );
void InitWinsock2();
void UnInitWinsock2();

HANDLE g_hReadEvent;
SOCKET g_hSocket;
UCHAR achInBuf [BUFSIZE];
char achMCAddr[MAXADDRSTR] = DEFAULT_GROUP_ADDRESS;
u_short nPort = DEFAULT_PORT;


OVERLAPPED Overlapped;

//-----------------------------------------------------------------
void InitWinsock2()
{
    WSADATA data;
    WORD version; 
    int ret = 0;  

    version = (MAKEWORD(2, 2)); 
    ret = WSAStartup(version, &data); 
    if (ret != 0) 
    {  
        ret = WSAGetLastError(); 
        
        if (ret == WSANOTINITIALISED) 
        {  
            printf("not initialised"); 
        }
    }
}

//-----------------------------------------------------------------
void UnInitWinsock2()
{ 
    WSACleanup();
}

//-----------------------------------------------------------------
BOOL CreateNetConnections (void)
{ 
    DWORD nbytes; 
    BOOL b; 
    BOOL fFlag = TRUE; 
    int nRet=0; 
    
    SOCKADDR_IN stLclAddr;  
    struct ip_mreq stMreq; // Multicast interface structure  

    // Get a datagram socket  
    g_hSocket = socket(AF_INET, SOCK_DGRAM,0); 
    
    if (g_hSocket == INVALID_SOCKET)  
    { 
        printf ("socket() failed, Err: %d\n", WSAGetLastError()); 
        return FALSE;  
    }  

    nRet = setsockopt(g_hSocket,SOL_SOCKET,
               SO_REUSEADDR, (char *)&fFlag, sizeof(fFlag));  
    if (nRet == SOCKET_ERROR)  
    { 
        printf ("setsockopt() SO_REUSEADDR failed, 
                          Err: %d\n",WSAGetLastError()); 
    } 

    // Name the socket (assign the local port number to receive on)  
    stLclAddr.sin_family = AF_INET; 
    stLclAddr.sin_addr.s_addr = htonl(INADDR_ANY); 
    stLclAddr.sin_port = htons(nPort); 

    nRet = bind(g_hSocket,(struct sockaddr*) &stLclAddr,sizeof(stLclAddr)); 
    if (nRet == SOCKET_ERROR)  
    { 
        printf ("bind() port: %d failed, Err: %d\n", 
                                nPort,WSAGetLastError()); 
    } 
    // Join the multicast group so we can receive from it  
    stMreq.imr_multiaddr.s_addr = inet_addr(achMCAddr); 
    stMreq.imr_interface.s_addr = INADDR_ANY; 
    nRet = setsockopt(g_hSocket,IPPROTO_IP, 
               IP_ADD_MEMBERSHIP,(char *)&stMreq,sizeof(stMreq)); 

    if (nRet == SOCKET_ERROR)  
    { 
        printf("setsockopt() IP_ADD_MEMBERSHIP address %s failed, 
                             Err: %d\n",achMCAddr,
                             WSAGetLastError()); 
    }

    //
    //note the 10 says how many concurrent cpu bound threads to allow thru 
    //this should be tunable based on the requests. CPU bound requests will 
    // really really honor this. 
    // 

    g_hCompletionPort = CreateIoCompletionPort (INVALID_HANDLE_VALUE,
                                                              NULL,0,3); 
    if (!g_hCompletionPort) 
    { 
        fprintf (stdout, "g_hCompletionPort Create Failed\n"); 
        return FALSE; 
    } 
    //Associate this socket to this I/O completion port 
    CreateIoCompletionPort((HANDLE)g_hSocket,g_hCompletionPort,
                                             (DWORD)g_hSocket,3);  

    //
    // Start off an asynchronous read on the socket.  
    //  
    Overlapped.hEvent = g_hReadEvent;  
    Overlapped.Internal = 0;  
    Overlapped.InternalHigh = 0;  
    Overlapped.Offset = 0;  
    Overlapped.OffsetHigh = 0;  
    b = ReadFile ((HANDLE)g_hSocket,&achInBuf,
               sizeof(achInBuf),&nbytes,&Overlapped);  
    
    if (!b && GetLastError () != ERROR_IO_PENDING)  
    {  
        fprintf (stdout, "ReadFile Failed\n");  
        return FALSE;  
    }  
    
    return TRUE;
}
//-----------------------------------------------------------------

BOOL CreateWorkers (UINT dwNumberOfWorkers)
{ 
    DWORD ThreadId; 
    HANDLE ThreadHandle; 
    DWORD i; 

    for (i = 0; i < dwNumberOfWorkers; i++) 
    { 
        ThreadHandle = CreateThread (NULL,0,
                  WorkerThread,NULL,0,&ThreadId); 
        if (!ThreadHandle) 
        { 
            fprintf (stdout, "Create Worker Thread Failed\n"); 
            return FALSE;
        } 
               
        CloseHandle (ThreadHandle); 
    } 
    return TRUE;
}

//-----------------------------------------------------------------
DWORD WINAPI WorkerThread (LPVOID WorkContext)
{ 
    DWORD nSocket; 
    BOOL b; 
    OVERLAPPED ovl; 
    LPOVERLAPPED lpo=&ovl; 
    DWORD nBytesRead=0; 
    DWORD nBytesToBeRead; 
    UCHAR ReadBuffer[BUFSIZE]; 
    LPVOID lpMsgBuf; 

    memset(&ReadBuffer,0,BUFSIZE); 
    for (;;) 
    { 
        b = GetQueuedCompletionStatus(g_hCompletionPort,
                    &nBytesToBeRead,&nSocket,&lpo,INFINITE); 
        if (b || lpo) 
        { 
            if (b) 
            { 
                // 
                // Determine how long a response was desired by the client. 
                // 
                
                OVERLAPPED ol; 
                ol.hEvent = g_hReadEvent; 
                ol.Offset = 0; 
                ol.OffsetHigh = 0; 

                b = ReadFile ((HANDLE)nSocket,&ReadBuffer,
                                nBytesToBeRead,&nBytesRead,&ol); 
                if (!b )  
                { 
                    DWORD dwErrCode = GetLastError(); 
                    if( dwErrCode != ERROR_IO_PENDING ) 
                    { 
                        // something has gone wrong here... 
                        printf("Something has gone 
                               wrong:Error code - %d\n",dwErrCode ); 

                        FormatMessage(FORMAT_MESSAGE_ALLOCATE_BUFFER | 
                               FORMAT_MESSAGE_FROM_SYSTEM | 
                               FORMAT_MESSAGE_IGNORE_INSERTS, 
                               NULL, dwErrCode, 
                               MAKELANGID(LANG_NEUTRAL, 
                                    SUBLANG_DEFAULT),// Default language
                               (LPTSTR) &lpMsgBuf, 0, NULL); 

                        OutputDebugString((LPCTSTR)lpMsgBuf); 
                        //Free the buffer. 

                        LocalFree(lpMsgBuf ); 
                    } 
                    else if( dwErrCode == ERROR_IO_PENDING ) 
                    {
                        // I had to do this for my UDP sample 
                        //Never did for my TCP servers 
                        WaitForSingleObject(ol.hEvent,INFINITE); 

                        HandleIncomingData(ReadBuffer); 
                    } 
                } 
                else 
                { 
                    HandleIncomingData(ReadBuffer); 
                } 
                continue; 
            } 
            else 
            { 
                fprintf (stdout, "WorkThread Wait Failed\n"); 
                //exit (1); 
            } 
        } 
        return 1; 
    } 
} 
//-----------------------------------------------------------------
BOOL HandleIncomingData( UCHAR* pBuf)
{ 
    InterlockedIncrement(&nCount); 
    SYSTEMTIME *lpstSysTime; 

    lpstSysTime = (SYSTEMTIME *)(pBuf); 
    printf("[%d]UTC Time %02d:%02d:%02d:%03d on %02d-%02d-%d \n",nCount, 
                lpstSysTime->wHour, lpstSysTime->wMinute, 
                lpstSysTime->wSecond, lpstSysTime->wMilliseconds, 
                lpstSysTime->wMonth, lpstSysTime->wDay, lpstSysTime->wYear); 
                memset(&pBuf,0,BUFSIZE); 
                //just making sure that i am not showing stale data 

    return TRUE; 
} 
//-----------------------------------------------------------------
main () 
{ 
    //You can modify your program to take some arguments for port number 
    //and multicast group address here 
    
    printf("\n***************************************\n"); 
    printf("Group IP address: %s\n",achMCAddr); 
    printf("Port number : %d\n",nPort); 
    printf("\n***************************************\n"); 

    //Initialize winsock 2 
    InitWinsock2(); 

    //We want to keep the main thread running 
    HANDLE hWait2Exit = CreateEvent(NULL,FALSE,TRUE,"MCLIENT"); 
    ResetEvent(hWait2Exit ); 

    //This OVERLAPPED event 
    g_hReadEvent = CreateEvent(NULL,TRUE,TRUE,NULL); 

    // 
    // try to get timing more accurate... Avoid context 
    // switch that could occur when threads are released 
    // 

    SetThreadPriority (GetCurrentThread (), THREAD_PRIORITY_TIME_CRITICAL); 
    if (!CreateNetConnections ()) 
    { 
        printf("Error condition @ CreateNetConnections , exiting\n"); 
        return 1; 
    } 

    if (!CreateWorkers (5)) 
    { 
        printf("Error condition @CreateWorkers, exiting\n"); 
        return 1; 
    } 
    
    WaitForSingleObject(hWait2Exit,INFINITE); 
    UnInitWinsock2(); 
    return 1; 
}

License

This article has no explicit license attached to it but may contain usage terms in the article text or the download files themselves. If in doubt please contact the author via the discussion board below.

A list of licenses authors might use can be found here

Share

About the Author

shanthu
Web Developer
United States United States
BS in Electrical engineering
MS in Computer science
Working since 12 Years.
Server side architecture/design/development is my favorite work.
-------------------------------------------
All ideas are worthless unless they are implemented.

You may also be interested in...

Comments and Discussions

 
Generala cuestion about WorkContext... Pin
alejandro29A15-Sep-08 5:02
memberalejandro29A15-Sep-08 5:02 
GeneralServer is not IOCP... Pin
Tim110719923-May-08 6:42
memberTim110719923-May-08 6:42 
QuestionTried to run it... Pin
sarahcos15-Apr-08 23:10
membersarahcos15-Apr-08 23:10 
GeneralThe first packet is lost!!! Pin
Zuzzi198229-Nov-07 7:37
memberZuzzi198229-Nov-07 7:37 
GeneralRe: The first packet is lost!!! Pin
chf26-Apr-09 8:00
memberchf26-Apr-09 8:00 
GeneralRe: The first packet is lost!!! Pin
Member 1168042118-Jun-15 21:24
memberMember 1168042118-Jun-15 21:24 
Generalif send packet size larger,such as 1200, it will crash,why Pin
liuliu16-Mar-07 0:16
memberliuliu16-Mar-07 0:16 
Generalthis is just a sync recv, but not iocp Pin
Jozu17-Dec-06 22:17
memberJozu17-Dec-06 22:17 
GeneralClient Application Pin
AYcoder2-Aug-06 9:15
memberAYcoder2-Aug-06 9:15 
GeneralMore than 1 Network Adaptor Pin
Tat Yeong27-Jul-05 7:46
memberTat Yeong27-Jul-05 7:46 
GeneralRe: More than 1 Network Adaptor Pin
basementman25-Jun-15 10:55
memberbasementman25-Jun-15 10:55 
General234 error Pin
tanyuheng@etang.com24-Nov-04 19:13
membertanyuheng@etang.com24-Nov-04 19:13 
GeneralThats what happens when you mix sync and aync calls Pin
lazybug_in5-Nov-04 15:31
memberlazybug_in5-Nov-04 15:31 
GeneralRe: Thats what happens when you mix sync and aync calls Pin
sonny-bob7-May-05 23:42
membersonny-bob7-May-05 23:42 
GeneralProblem with this is... Pin
rakkar27-Dec-03 15:39
memberrakkar27-Dec-03 15:39 
GeneralRe: Problem with this is... Pin
ncpga30-Mar-04 13:42
memberncpga30-Mar-04 13:42 
GeneralRe: Problem with this is... Pin
AalNode19-Oct-04 7:14
memberAalNode19-Oct-04 7:14 
GeneralRe: Problem with this is... Pin
youyo3-Jan-05 14:38
memberyouyo3-Jan-05 14:38 
GeneralClient IP adresses Pin
Anonymous27-Oct-03 15:47
sussAnonymous27-Oct-03 15:47 
GeneralRe: Client IP adresses Pin
MushroomSamba15-May-09 23:51
memberMushroomSamba15-May-09 23:51 
GeneralRe: Client IP adresses Pin
tonyshanghai30-Oct-09 23:36
membertonyshanghai30-Oct-09 23:36 
GeneralRe: Client IP adresses Pin
MushroomSamba30-Oct-09 23:42
memberMushroomSamba30-Oct-09 23:42 
Generalload balancing help me! Pin
dharani25-Sep-03 21:31
memberdharani25-Sep-03 21:31 
GeneralRe: load balancing help me! Pin
Magicrom29-Jan-10 23:21
memberMagicrom29-Jan-10 23:21 
QuestionWhy first package always lost? Pin
Allen Chen27-Jul-03 15:20
memberAllen Chen27-Jul-03 15:20 

General General    News News    Suggestion Suggestion    Question Question    Bug Bug    Answer Answer    Joke Joke    Praise Praise    Rant Rant    Admin Admin   

Use Ctrl+Left/Right to switch messages, Ctrl+Up/Down to switch threads, Ctrl+Shift+Left/Right to switch pages.

| Advertise | Privacy | Terms of Use | Mobile
Web02 | 2.8.161128.1 | Last Updated 25 Feb 2002
Article Copyright 2002 by shanthu
Everything else Copyright © CodeProject, 1999-2016
Layout: fixed | fluid