Hello,
I have a listener thread that receives UDP transactions and enqueues them into _TransactionQ. A second thread dequeues the transactions from _TransactionQ and does a lot of processing to confirm that they are in the correct order. It also detects missing transactions based on a sequence number and requests a reload from the server. This thread then enqueues the confirmed transactions in _ConfirmedMessageQueue.
I use SyncLock to synchronize access to the queues.
I have a problem with high CPU utilization (70%-80%)!

Imports System.Collections
Imports System.Net
Imports System.Net.Sockets
Imports System.Text
Imports System.Threading

Public Class UDPListener

#Region "Variables"
    Private _port As Integer
    Private _listenThread As Thread 'Queue thread
    Private _dequeueThread As Thread 'DeQueue thread
    Private _ListenThreadIsAlive As Boolean
    Private _DequeueThreadIsAlive As Boolean
    Private _receivedData As String
    Private _udpClient As UdpClient
    Private _mySequenceNumber As Integer
    Private _address As String
    Private _ConfirmedMessageQueue As Queue
    Private _TransactionQ As Queue
#End Region

#Region "Properties"
    Public Property ListenThreadIsAlive() As Boolean
        Get
            Return _ListenThreadIsAlive
        End Get
        Set(ByVal value As Boolean)
            _ListenThreadIsAlive = value
        End Set
    End Property

    Public Property DequeueThreadIsAlive() As Boolean
        Get
            Return _DequeueThreadIsAlive
        End Get
        Set(ByVal value As Boolean)
            _DequeueThreadIsAlive = value
        End Set
    End Property

    Public ReadOnly Property udpClient() As UdpClient
        Get
            Return _udpClient
        End Get
    End Property

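    Public Property mySequenceNumber() As Integer
        Get
            ' Interlocked.Read only accepts a Long; CompareExchange with equal
            ' values performs an atomic read of the Integer field.
            Return Interlocked.CompareExchange(_mySequenceNumber, 0, 0)
        End Get
        Set(ByVal value As Integer)
            Interlocked.Exchange(_mySequenceNumber, value)
        End Set
    End Property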

    Public Property ConfirmedMessageQueue() As Queue
        Get
            Return Queue.Synchronized(_ConfirmedMessageQueue)
        End Get
        Set(ByVal value As Queue)
            _ConfirmedMessageQueue = value
        End Set
    End Property

    Public Property TransactionQ() As Queue
        Get
            Return Queue.Synchronized(_TransactionQ)
        End Get
        Set(ByVal value As Queue)
            _TransactionQ = value
        End Set
    End Property
#End Region

#Region "Events"
    Public Event dataArrived()
#End Region

    'Constructor
    Sub New(ByVal address As String, ByVal port As Integer, ByVal ExpectedSequenceNumber As Integer)
        Me._port = port
        Me._address = address
        Me.mySequenceNumber = ExpectedSequenceNumber
        Me.initialize()
    End Sub

    Private Sub initialize()
        Try
            ConfirmedMessageQueue = New Queue
            TransactionQ = New Queue
            _listenThread = New Thread(AddressOf listen)
            _dequeueThread = New Thread(AddressOf dequeue)
            _udpClient = New UdpClient(Sockets.AddressFamily.InterNetwork)
            Dim localEP As EndPoint = CType(New IPEndPoint(IPAddress.Any, _port), EndPoint)
            Dim mcastopt As MulticastOption
            mcastopt = New MulticastOption(IPAddress.Parse(_address), IPAddress.Loopback)
            _udpClient.Client.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReuseAddress, 1)
            _udpClient.Client.Bind(localEP)
            _listenThread.Name = "udpListenThread"
            _dequeueThread.Name = "udpDequeueThread"
            _ListenThreadIsAlive = True
            _listenThread.Start()
            _DequeueThreadIsAlive = True
            _dequeueThread.Start()
        Catch ex As Exception
            ' Write to log
        End Try
    End Sub

    Private Sub listen()
        _udpClient.JoinMulticastGroup(IPAddress.Parse(Me._address))
        Dim RemoteIpEndPoint As New IPEndPoint(IPAddress.Parse(Me._address), Me._port)
        Dim receiveBytes As [Byte]() = Nothing
        Dim TransactionData As String = ""
        Dim TransactionSeqNum As String = ""
        While _ListenThreadIsAlive

            receiveBytes = _udpClient.Receive(RemoteIpEndPoint)
            If receiveBytes.Count > 0 Then

                TransactionData = Encoding.ASCII.GetString(receiveBytes)
                TransactionSeqNum = TransactionData.Substring(0, TransactionData.IndexOf(Chr(3))).Substring(1)
                TransactionData = TransactionData.Substring(TransactionData.IndexOf(Chr(3)))
                Dim pair As New DictionaryEntry(TransactionSeqNum, TransactionData)
                SyncLock TransactionQ.SyncRoot
                    TransactionQ.Enqueue(pair)
                End SyncLock
            End If
        End While
        Try
            SyncLock _listenThread
                _listenThread.Abort()
            End SyncLock
        Catch ex As Threading.ThreadAbortException
            'Ignore ThreadAbortException
        End Try
    End Sub

    Private Sub dequeue()
        While _DequeueThreadIsAlive
            Try
                Dim TransactionSeqNum As Integer
                SyncLock TransactionQ.SyncRoot
                    Do While TransactionQ.Count > 0
                        Dim trans As DictionaryEntry
                        trans = TransactionQ.Dequeue
                        '
                        '                
                        ' LONG PROCESSING
                        '
                        '
                        SyncLock ConfirmedMessageQueue.SyncRoot
                            ConfirmedMessageQueue.Enqueue(_receivedData)
                            incrementMySequenceNumber()
                        End SyncLock 'ConfirmedMessageQueue
                        RaiseEvent dataArrived()
                    Loop
                End SyncLock 'TransactionQ
            Catch ex As Exception
                ' Write to log
            End Try
        End While
        Try
            SyncLock _dequeueThread
                _dequeueThread.Abort()
            End SyncLock
        Catch ex As Threading.ThreadAbortException
            'Ignore ThreadAbortException
        End Try
    End Sub

    'Thread safe increment for mySequenceNumber
    Private Sub incrementMySequenceNumber()
        Interlocked.Increment(_mySequenceNumber)
    End Sub
    
    ' close connection, abort thread
    Public Sub stopListening()
        If Not IsNothing(_udpClient) Then
            If _listenThread.IsAlive Then
                _ListenThreadIsAlive = False
            End If
            If _dequeueThread.IsAlive Then
                _DequeueThreadIsAlive = False
            End If
            _udpClient.Close()
            CType(_udpClient, IDisposable).Dispose()
        End If
    End Sub

    Protected Overrides Sub Finalize()
        MyBase.Finalize()
    End Sub
End Class

1 solution

Your thread tasks are running the same checks thousands of times a second, with no code in there at all that gives up control and waits.

Look at what your dequeue method is doing. It's going to continuously allocate TransactionSeqNum, grab a lock, execute another loop if there are TransactionQ records, release the lock, release the TransactionSeqNum, and go back to the beginning, about a million times a second, since there is no code in here that puts the thread to sleep while it waits for work to show up.

You're doing a ton of polling which is why your code is hogging the CPU.

Also, what's the point of all this?? What I see is a very inefficient attempt at duplicating what MS Message Queue does. Had you used that you could have avoided writing ALL of this code and just gotten down to the business of handling transactions.
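One way to stop the polling is to make the consumer block until the producer signals that there is work. The following is only a sketch and not code from the original post: `WaitingQueue`, `Demo`, and the 500 ms timeout are hypothetical names and values chosen for illustration. It relies on `Monitor.Wait` and `Monitor.Pulse`, so the waiting thread uses no CPU while the queue is empty.

```vb
Imports System
Imports System.Collections
Imports System.Threading

' Hypothetical helper (not in the original post): a queue whose consumer
' blocks, using no CPU, until a producer adds an item.
Public Class WaitingQueue
    Private ReadOnly _items As New Queue()
    Private ReadOnly _gate As New Object()

    ' Called by the producer (the listener thread in the question).
    Public Sub Enqueue(ByVal item As Object)
        SyncLock _gate
            _items.Enqueue(item)
            Monitor.Pulse(_gate) ' wake one waiting consumer
        End SyncLock
    End Sub

    ' Called by the consumer. Waits up to timeoutMs for an item and returns
    ' Nothing on timeout so the caller can re-check its stop flag.
    Public Function Dequeue(ByVal timeoutMs As Integer) As Object
        SyncLock _gate
            If _items.Count = 0 Then
                Monitor.Wait(_gate, timeoutMs) ' releases the lock while waiting
            End If
            If _items.Count = 0 Then Return Nothing
            Return _items.Dequeue()
        End SyncLock
    End Function
End Class

Module Demo
    Private ReadOnly _queue As New WaitingQueue()
    Private _received As Integer = 0

    Private Sub Consume()
        While _received < 3
            Dim item As Object = _queue.Dequeue(500)
            If item IsNot Nothing Then
                ' LONG PROCESSING would go here, outside any queue lock.
                _received += 1
            End If
        End While
    End Sub

    Sub Main()
        Dim consumer As New Thread(AddressOf Consume)
        consumer.Start()
        For i As Integer = 1 To 3
            _queue.Enqueue("transaction " & i)
        Next
        consumer.Join()
        Console.WriteLine("Consumed " & _received & " items")
    End Sub
End Module
```

Because the item is removed under the lock and processed after the lock is released, the producer is never blocked by the long processing step. The timeout exists only so the loop can notice a stop flag such as _DequeueThreadIsAlive.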
 
Comments
Sameer Alomari 5-Mar-12 13:50pm    
Hi Dave,
TransactionSeqNum is a local variable in the dequeue method, so I don't care whether it's locked or not.
The point of this is that I am receiving UDP transactions from the server. The listener thread queues the transactions in _TransactionQ. The dequeue thread pulls the transactions, reads the sequence number, and makes sure that we didn't miss any transaction. If we did, the dequeue thread requests the latest 100 transactions from the server by calling a web service method, finds the missed ones, and queues the transactions in _ConfirmedMessageQueue. After making sure that the transactions are queued in the correct order, the dequeue thread raises an event that is handled by the forms.
This logic makes UDP reliable. The listener thread doesn't waste time checking whether a transaction is in the correct order, so it keeps listening and enqueues UDP traffic quickly; the long processing is done by the dequeue thread.
I believe the problem is that the dequeue thread keeps spinning even when nothing is queued in _TransactionQ. Am I correct? And how do I fix it?
Dave Kreskowiak 5-Mar-12 14:32pm    
I told you the problem. You're polling CONSTANTLY about a million times a second and you don't need to do it that fast. You have nothing in your polling loops that will give up control of the CPU, so try putting your polling loop to sleep (Thread.Sleep) for a few seconds after every iteration.
Sameer Alomari 5-Mar-12 13:51pm    
Actually, I don't know MS Message Queue! Please give me an example.
Thanks
Dave Kreskowiak 5-Mar-12 14:30pm    
You can start with http://en.wikipedia.org/wiki/Microsoft_Message_Queuing
Sameer Alomari 5-Mar-12 15:59pm    
Thanks Dave, Thread.Sleep(1) fixed it :)

<pre lang="vb">
Private Sub dequeue()
    While _DequeueThreadIsAlive
        Try
            Dim TransactionSeqNum As Integer
            SyncLock TransactionQ.SyncRoot
                Do While TransactionQ.Count > 0
                    Dim trans As DictionaryEntry
                    trans = TransactionQ.Dequeue
                    '
                    '
                    ' LONG PROCESSING
                    '
                    '
                    SyncLock ConfirmedMessageQueue.SyncRoot
                        ConfirmedMessageQueue.Enqueue(_receivedData)
                        incrementMySequenceNumber()
                    End SyncLock 'ConfirmedMessageQueue
                    RaiseEvent dataArrived()
                Loop
            End SyncLock 'TransactionQ
            Thread.Sleep(1)
        Catch ex As Exception
            ' Write to log
        End Try
    End While
    Try
        SyncLock _dequeueThread
            _dequeueThread.Abort()
        End SyncLock
    Catch ex As Threading.ThreadAbortException
        'Ignore ThreadAbortException
    End Try
End Sub
</pre>
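The Thread.Sleep(1) version above still wakes up about a thousand times a second and holds the TransactionQ lock during the long processing, so the listener thread has to wait whenever it wants to enqueue. As a possible alternative, and only as a sketch, the loop could block on a `BlockingCollection(Of DictionaryEntry)`. This assumes .NET Framework 4 or later; the module name `BlockingQueueSketch`, its member names, and the sample payloads are hypothetical and not part of the original code.

```vb
Imports System
Imports System.Collections
Imports System.Collections.Concurrent
Imports System.Threading

Module BlockingQueueSketch
    ' Assumption: .NET Framework 4 or later, where BlockingCollection exists.
    Private ReadOnly _transactions As New BlockingCollection(Of DictionaryEntry)()
    Private _processed As Integer = 0

    Private Sub DequeueLoop()
        ' GetConsumingEnumerable blocks while the collection is empty and
        ' ends once CompleteAdding has been called and all items are taken.
        For Each trans As DictionaryEntry In _transactions.GetConsumingEnumerable()
            ' LONG PROCESSING goes here; no lock is held, so the listener
            ' thread can keep adding items while this runs.
            Interlocked.Increment(_processed)
        Next
    End Sub

    Sub Main()
        Dim worker As New Thread(AddressOf DequeueLoop)
        worker.Start()

        ' Stand-in for the listener thread's Enqueue calls.
        For seq As Integer = 1 To 5
            _transactions.Add(New DictionaryEntry(seq.ToString(), "payload " & seq))
        Next

        ' Stand-in for stopListening(): lets the loop end without Thread.Abort.
        _transactions.CompleteAdding()
        worker.Join()
        Console.WriteLine("Processed " & _processed & " transactions")
    End Sub
End Module
```

With this shape the thread simply returns from its method when the collection is completed, so the self-abort block at the end of dequeue() would not be needed.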

This content, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)