Problem with QueueCollection?

  • Ganeshan Kalmane
    replied
    Yes. I too figured that this might be the case, so I wrote and used the following functions.
    But the problem persisted.


    Code:
    '-----------------------------------------------------------------------------------------
    'FUNCTION DataPacket1QueueCOunt () THREADSAFE AS LONG
    'Returns count of elements in Queue
    '-----------------------------------------------------------------------------------------
    FUNCTION DataPacket1QueueCOunt () THREADSAFE AS LONG
    
    LOCAL L1 AS LONG
    EnterCriticalSection g_CSDatapacket
    L1=DataPacket1Queue.Count
    LeaveCriticalSection g_CSDatapacket
    FUNCTION=L1
    END FUNCTION
    
    '-------------------------------------------------------------------------------------------------------------------
    'FUNCTION Datapacket1ReadWrite(sReadWrite AS STRING) THREADSAFE AS LONG
    'Call with "R" or "W"
    'Takes the element to enqueue from global PacketToQue; returns the dequeued element in global PacketFromQue
    'Returns 1 if success, 0 if fail
    '-------------------------------------------------------------------------------------------------------------------
    FUNCTION Datapacket1ReadWrite(sReadWrite AS STRING) THREADSAFE AS LONG
    
    LOCAL v1 ,v2 AS VARIANT
    LOCAL s1 AS STRING
    LOCAL L1 AS LONG
    v1=0
    FUNCTION=0
    s1=UCASE$(sReadWrite)
    
    EnterCriticalSection g_CSDatapacket
    
    SELECT CASE s1
          CASE "W": 'Write
                      LET v1=PacketToQue AS STRING
                      DataPacket1Queue.Enqueue(v1)
                      FUNCTION=1
                      EXIT SELECT
    
          CASE "R": 'read
                      L1=DataPacket1QueueCount
                      IF L1<1 THEN
                          FUNCTION=0
                          EXIT SELECT
                      END IF
                      v1=Datapacket1Queue.Dequeue
                      IF OBJRESULT=%S_FALSE OR VARIANTVT(v1)=%VT_EMPTY THEN
                           FUNCTION=0
                           EXIT SELECT
                      END IF
                      PacketFromQue=VARIANT$(BYTE,v1)
                      FUNCTION=1
                      EXIT SELECT
     END SELECT
    
    LeaveCriticalSection g_CSDatapacket
    
    END FUNCTION



  • Michael Mattias
    replied
    You may have collisions in the Queue between queuing and dequeueing. I had asked PB about this a number of years ago and they told me if I wanted it thread-safe I would have to code it myself. So I came up with ...

    Code:
    ...
    
    
    #IF %DEF(%UNICODE)
        MACRO zSTR = WSTRINGZ
        MACRO bSTR = WSTRING
    #ELSE
        MACRO zSTR = ASCIIZ
        MACRO bSTR = STRING
    #ENDIF
    ...
    ...
    ' -----------------------------------
    ' QUEUE RELATED ITEMS
    ' -----------------------------------
    ' queue access
    
    ENUM QueueAction
        CREATE
        CLEAR
        ENQUEUE
        DEQUEUE
        COUNT
        DESTROY
    END ENUM
    .....
    ' ===================
    ' Access the Queue
    ' --------------------
    '  Written as a separate function to ensure it is threadsafe. As we are enqueueing and dequeueing from
    '  multiple threads of execution, it would be really easy to get "crossed up" if we did not use
    '  some kind of synchronization.
    ' ACTION CODE RETURNS
    ' on DEQUEUE, returns the string representation of the queue data
    ' on ENQUEUE, accepts QD as the data to be queued
    ' other actions do not use the second param
    ' String return:
    '   DEQUEUE ==> QUEUEDATATYPE as a character string (may be TYPE SET or LSET directly into a UDT)
    '   ENQUEUE ==> ???
    '   CLEAR   ==> ????
    '   COUNT   ==> FORMAT$(count) (use VAL() to get number)
    '   DESTROY ==> empties and then destroys; returns "0"
    
    
    ' 6/15/21 I could keep a count as I enqueue and dequeue, but method Count is always available.
    ' 6/18/21 I could just make this THREADSAFE (even though I do not like how it's done)
    
    
    FUNCTION Queue_Action (ActionCode AS LONG, OPT QD AS QUEUEDATATYPE ) AS BSTR
    
    STATIC oQ  AS IQUEUECOLLECTION
    STATIC CS   AS CRITICAL_SECTION, bCS  AS LONG
    LOCAL vUDT AS VARIANT,s AS BSTR, n AS LONG
    
    ' if first call to this function, set up the CRITICAL_SECTION
    
     ON ERROR GOTO ERRORTRAPPER_QUEUE_ACTION
    
       IF ISFALSE bCS THEN
           InitializeCriticalSection CS
           bCS = %TRUE
       END IF
    
    
      IF ActionCode = %QUeueAction.Destroy THEN
          oq = NOTHING
          'DeleteCriticalSection CS  NOPE CAUSES PROBLEM IF CS is currently owned. Accept overhead.
          'bCS = %FALSE
      ELSE
          EnterCriticalSection CS
    
          SELECT CASE AS LONG ActionCode
             CASE %QueueAction.Create
                   oq = CLASS "QueueCollection"
             CASE %QueueAction.CLEAR
                 oq.Clear
             CASE %QueueAction.ENQUEUE
                 oq.Enqueue QD AS STRING
             CASE %QueueAction.Dequeue
                 vUDt = OQ.Dequeue
                 S    = VARIANT$(BYTE,vUDT)
                 FUNCTION = S
             CASE %QUEUEACTION.Count
                    n = OQ.Count
                    FUNCTION = FORMAT$(n)
             CASE %QUEUEACTION.Destroy
                 OQ.Clear                ' empty it out
                 oQ = NOTHING            ' release object
    
          END SELECT
          LeaveCriticalSection CS
      END IF
    
      'function return assigned if used above
    
      EXIT FUNCTION
    
    ERRORTRAPPER_QUEUE_ACTION:
      MSGBOX USING$("ActionCode # ErrNo # &", ActionCode, ERR, ERROR$(ERRCLEAR)),,"Queue_Action_error"
      RESUME  NEXT
    
    
    END FUNCTION  ' Queue_Action
    .. which is called from multiple threads of execution. By forcing all queue actions to use the same function, I can control the queue access with a single CRITICAL_SECTION object and guarantee myself no collisions... and to date, no data corruption.
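
    A minimal sketch of how calling code might use Queue_Action, under a few assumptions: the wrapper names QueuePut and QueueGet are hypothetical, QUEUEDATATYPE is the UDT from the elided part of the listing, %QueueAction.Create has already been called once before any worker thread starts, and there is only one consumer thread (otherwise another consumer could empty the queue between the Count and Dequeue calls).

    Code:
    ' Hypothetical wrappers around Queue_Action; not part of the original listing.
    
    ' Producer side: hand one record to the queue.
    SUB QueuePut (qd AS QUEUEDATATYPE)
        CALL Queue_Action(%QueueAction.Enqueue, qd)     ' return value not used for ENQUEUE
    END SUB
    
    ' Consumer side: returns 1 and fills qd if a record was available, else 0.
    FUNCTION QueueGet (qd AS QUEUEDATATYPE) AS LONG
        LOCAL sData AS bSTR                             ' same string type Queue_Action returns
    
        ' COUNT comes back as FORMAT$(count), so convert it with VAL()
        IF VAL(Queue_Action(%QueueAction.Count)) < 1 THEN EXIT FUNCTION   ' queue empty, returns 0
    
        sData = Queue_Action(%QueueAction.Dequeue)
        TYPE SET qd = sData                             ' copy the bytes back into the UDT
        FUNCTION = 1
    END FUNCTION

    Routing every caller through wrappers like these keeps all access behind the one CRITICAL_SECTION inside Queue_Action, so no thread touches the collection object directly.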



  • Ganeshan Kalmane
    started a topic Problem with QueueCollection?

    I am facing a peculiar problem with QueueCollection.

    PowerBasic Ver 10.04 running on i7/8GB RAM Windows 10

    My application is a simulator that provides a TCP server to which 1500+ TCP clients connect, send packets, receive responses, and disconnect at different times.
    There are multiple threads handling the data.

    One receiving thread corresponds to one active TCP connection.
    The receiving thread collects the incoming packet into a Datapacket variable and puts this data packet, along with the socket handle, into a QueueCollection.

    A separate processing thread takes this data out of the collection, generates a reply packet, and sends it to the corresponding socket.
    The program is very large, with lots of UI-related code.
    Packets arrive at a very high rate, perhaps one packet every 10 to 50 ms.
    Everything works, but once in a while, roughly once in 100 to 200 packets (at random), the data taken out of the QueueCollection is corrupted.
    It took a lot of time to figure out that data corruption was happening. After I started discarding the corrupted packets, the system worked fine and program response is also good.

    I have spent a lot of time trying to find the reason for this data corruption by selectively disabling parts of the code, but could not find it.
    As a final workaround, I replaced QueueCollection with StackCollection.

    With no other code change whatsoever, the data corruption stopped!

    If I put QueueCollection back in place of StackCollection, the corruption reappears.

    So, for the time being, the problem is solved and the project is complete. I am not going to spend any more time on this, as the time spent has far exceeded the budget.

    But I am curious: is there a bug in QueueCollection?

    I wrote another program with two threads, one writing into a QueueCollection and the other reading from it. No data corruption was observed.


    For information, the sequence of code is shown below.
    Only the snippets relevant to this problem are included:

    Code:
    .
    .
    .
    GLOBAL DataPacket1Queue AS IQUEUECOLLECTION
    GLOBAL g_CSDatapacket           AS CRITICAL_SECTION
    
    GLOBAL DataPacket1Stack AS ISTACKCOLLECTION
    
    TYPE typInComingDataPacket
        lPacketSize AS LONG
        DataBytes(%DATAPACKET_DATABYTES_SIZE1+10) AS BYTE
    END TYPE
    
    .
    .
    ' --- In Main Thread ---
                
                LET DataPacket1Queue= CLASS "QueueCollection"
                DataPacket1Queue.Clear
                LET DataPacket1Stack= CLASS "StackCollection"
                DataPacket1Stack.Clear
                InitializeCriticalSection g_CSDatapacket
    ...
    
    
                    
    .
    ' --- In Receiving Thread: data packet is put in the data collection ---
    
                    LOCAL IncomingPacket AS typIncomingdataPacket
                    LOCAL s1 AS STRING
    .
    .
    .
                    EnterCriticalSection g_CSDatapacket
                    LOCAL vPacket AS VARIANT
                    IF IncomingPacket.lPacketSize<>12 THEN
                        IncomingPacket.lPacketSize=12
                        s1="Packet size reset:"+TRIM$(szPeerAddress)
                        UserMessageMON(s1 )                        'send the message to the Monitor process
                    END IF
                    LET vPacket= IncomingPacket AS STRING
                    DataPacket1Queue.Enqueue(vPacket)
                    'DataPacket1Stack.Push(vPacket)                ' This is for using Stackcollection in place of QueueCollection
                    LeaveCriticalSection g_CSDatapacket
    .
    .
    .                
    .
    
    
    ' --- In Processing Thread: data is taken out from the collection ---
            LOCAL v1 AS VARIANT
            LOCAL IncomingDataNew as typIncomingdataPacket    
            LOCAL L1, lBytesRead AS LONG
            
     .
     .
     .
     
            EnterCriticalSection g_CSDatapacket
            v1=DataPacket1Queue.Dequeue
            'v1=DataPacket1Stack.Pop                            ' This is for using Stackcollection in place of QueueCollection
            L1=OBJRESULT
            LeaveCriticalSection g_CSDatapacket
    
            IF L1=%S_FALSE OR VARIANTVT(v1)=%VT_EMPTY THEN      ' error while reading from the queue, or the queue is empty
                lYield=1
                ITERATE DO                                      ' ignore this and go back to the beginning of the loop
            ELSE
            END IF
    
            IF VARIANTVT(v1)<>%VT_EMPTY THEN
                IncomingDataNew=VARIANT$(BYTE, v1)               ' get the bytes back
                lBytesRead=IncomingDataNew.lPacketSize
            END IF
                    
    .
    .
    .