#ifndef __QUEUE_H__#define __QUEUE_H__#include#include #include /* * Queues can have more than one producer but only one consumer. * This means that more than one task or interrupt handler is allowed to store * new data in the queue but only one task is allowed toget data from the queue. * * Queues accept messages of various size. When putting a message into a queue, * the message size is passed as a parameter. * * Retrieving a message from the queue does not copy the message, but returns * a pointer to the message and its size. Thisenhances performance because the * data is copied only once, when the message is written into the queue. * * The retrieving function has to delete every message after processing it. * A new message can only be retrieved from the queue when the previous message * was deleted from the queue. * * |---------------------- size -------------------------| * | |------------ msgCnt ---------------| | * [ .... ] [ size : message ] [ size : message ] [ .... ] * | | | * |pData |offsetFirst |offsetLast * */typedef struct TAG_QUEUE{ uint8_t * Memory; uint32_t Capacity; uint32_t MessageCount; uint32_t ReadIndex; uint32_t WriteIndex; uint32_t IsUsing; uint32_t InProgressCount;} QUEUE;// Creates and initializes a message queue.QUEUE * Q_Create( uint32_t Capacity );// Deletes a specific queue.void Q_Delete( QUEUE * Queue );// Initializes a message queue.void Q_Init( QUEUE * Queue, uint8_t * Memory, uint32_t Capacity );// Deletes the last retrieved message in a queue.void Q_Purge( QUEUE * Queue );// Deletes all message in a queue.void Q_Clear( QUEUE * Queue );// Returns the number of messages currently in a queueuint32_t Q_GetCount( QUEUE * Queue );// Returns the first message sizeuint32_t Q_GetSize( QUEUE * Queue );// Delivers information whether the queue is actually in use.// A queue must not be cleared or deleted when it is in use.uint32_t Q_IsUsing( QUEUE * Queue );// Stores a new message of given size in a queue.uint32_t Q_Wirte( QUEUE * Queue, void * Message, uint32_t Size );// Retrieves a message from a queueuint32_t Q_Read( QUEUE * Queue, void ** Message );#endif /* __QUEUE_H__ */
#include "queue.h"#include "cmsis_os.h"#include "macro_misc.h"// Creates and initializes a message Queue.QUEUE * Q_Create( uint32_t Capacity ){ uint32_t Size = ALIGN_UP( sizeof(QUEUE), 4 ) + ALIGN_UP( Capacity, 4 ); QUEUE *Queue = (QUEUE *) osMalloc( Size, osWaitForever ); if ( Queue == NULL ) return NULL; uint8_t * Memory = // (uint8_t *) ( ( (uint32_t) ( Queue ) ) + ALIGN_UP( sizeof(QUEUE), 4 ) ); Q_Init( Queue, Memory, ALIGN_UP( Capacity, 4 ) ); return Queue;}// Deletes a specific Queue.// A Queue must not be cleared or deleted when it is in use.void Q_Delete( QUEUE * Queue ){ if ( Queue->IsUsing == 0 ) osFree( Queue );}// Deletes all messages in a Queue.// A Queue must not be cleared or deleted when it is in use.void Q_Clear( QUEUE * Queue ){ if ( Queue->IsUsing == 0 ) Queue->MessageCount = 0;}// Initializes a message Queue.void Q_Init( QUEUE * Queue, uint8_t * Memory, uint32_t Capacity ){ int32_t Delta = (uint32_t) Memory & 3; if ( Delta ) { Delta -= 4; Capacity += Delta; Memory -= Delta; } memset( Queue, 0, sizeof(QUEUE) ); Queue->Capacity = Capacity; Queue->Memory = Memory;}// Returns the number of messages currently in a Queueuint32_t Q_GetCount( QUEUE * Queue ){ return Queue->MessageCount - Queue->InProgressCount;}// Returns the first message sizeuint32_t Q_GetSize( QUEUE * Queue ){ uint32_t MessageSize = 0; if ( Queue->MessageCount ) MessageSize = *(uint32_t *) ( &Queue->Memory[ Queue->ReadIndex ] ); return MessageSize;}// Delivers information whether the Queue is actually in use.// A Queue must not be cleared or deleted when it is in use.uint32_t Q_IsUsing( QUEUE * Queue ){ return Queue->IsUsing;}// Stores a new message of given size in a Queue.// 0 : Queue could not be stored (Queue is full).// 1 : Success; message stored.uint32_t Q_Write( QUEUE * Queue, void * Message, uint32_t Size ){ uint32_t ReadIndexVal; uint32_t WriteIndexPending; uint32_t WriteIndexVal; uint32_t MessageSize = 4 + ALIGN_UP( Size, 4 ); int32_t * Memory = (int32_t *) Queue->Memory; uint32_t Value = osDisableInterrupt( ); if ( Queue->MessageCount == 0 ) { // read next message from head of memory Queue->ReadIndex = 0; // Queue could not be stored (memory is full). WriteIndexVal = -1; if ( Queue->Capacity >= MessageSize ) WriteIndexVal = 0; } else { Memory = (int32_t *) Queue->Memory; WriteIndexPending = Queue->WriteIndex; int32_t SizePending = Memory[ WriteIndexPending ]; if ( SizePending < 0 ) { // other task is writting ... but it is preemptived by our task // WriteIndexPending has been updated // [ Last Queue ] [ --- Other Queue --- ] [ Our Mesage ] // | WriteIndexPending SizePending = -SizePending; } else { // [ Last Queue ] [ Our Mesage ] // | WriteIndexPending } // where our task will write ... WriteIndexVal = WriteIndexPending + 4 + ALIGN_UP( SizePending, 4 ); ReadIndexVal = Queue->ReadIndex; if ( ReadIndexVal >= WriteIndexVal ) { // [ Our Mesage ] [ Last Queue ] // |<------------>|ReadIndexVal // |WriteIndexVal if ( ReadIndexVal - WriteIndexVal < MessageSize ) WriteIndexVal = -1; } else { // [ Our Mesage ] [ Available Space ] // |WriteIndexVal |Capacity // |<------------------------------>| uint32_t sizeAvailableTail = Queue->Capacity - WriteIndexVal; if ( sizeAvailableTail < MessageSize ) { // try to write to head of memory // [ Our Mesage ] [ Last Queue ] // |<------------>|ReadIndexVal // |0 if ( ReadIndexVal < MessageSize ) WriteIndexVal = -1; else if ( sizeAvailableTail > 4 ) { // can not read message from tail of memory // Marker for Q_Purge() Memory[ WriteIndexVal ] = 0; // write to head of memory WriteIndexVal = 0; } } } } // store message to memory if ( WriteIndexVal != -1 ) { // WriteIndexPending for other task if our task be preemptived Queue->WriteIndex = WriteIndexVal; Queue->MessageCount++; Memory[ WriteIndexVal ] = -Size; // SizePending for other task Queue->InProgressCount++; osRestoreInterrupt( Value ); // memcpy( &Memory[ WriteIndexVal + 4 ], Message, Size ); // Value = osDisableInterrupt( ); Memory[ WriteIndexVal ] = Size; // Size for this message Queue->InProgressCount--; } osRestoreInterrupt( Value ); return ( WriteIndexVal != -1 );}// Retrieves a message from a Queue// not allowed while the queue is in use.uint32_t Q_Read( QUEUE * Queue, void ** Message ){ uint32_t MessageSize = 0; uint32_t * Memory = (uint32_t *) Queue->Memory; uint32_t Value = osDisableInterrupt( ); if ( ( Queue->IsUsing == 0 ) && ( Queue->MessageCount ) ) { MessageSize = Memory[ Queue->ReadIndex ]; *Message = (void *) ( (uint32_t) ( &Memory[ Queue->ReadIndex ] ) + 4 ); Queue->IsUsing = 1; } osRestoreInterrupt( Value ); return MessageSize;}// Deletes the last retrieved message in a Queue.void Q_Purge( QUEUE * Queue ){ uint32_t Value = osDisableInterrupt( ); if ( Queue->IsUsing ) { uint32_t * Memory = (uint32_t *) Queue->Memory; uint32_t MessageSize = 4 + ALIGN_UP( Memory[ Queue->ReadIndex ], 4 ); Queue->MessageCount--; uint32_t NextReadIndexVal = Queue->ReadIndex + MessageSize; Queue->ReadIndex = NextReadIndexVal; if ( Queue->Capacity - NextReadIndexVal < 5 ) Queue->ReadIndex = 0; else if ( Queue->MessageCount ) { // Marked by Q_Write(), Next readable message at head of memory if ( Memory[ NextReadIndexVal ] == 0 ) Queue->ReadIndex = 0; } Queue->IsUsing = 0; } osRestoreInterrupt( Value );}