快捷搜索:  汽车  科技

消息队列六种(消息队列概念及其实现细节)

消息队列六种(消息队列概念及其实现细节)当然对比引用的方式也有劣势:而freertos的消息队列机制就是拷贝,拷贝的方式有以下优点:一个或多个任务往一个消息容器里面发消息,其它一个或多个任务从这个消息容器里面获取消息,这样实现通信。/* 该图源自野火 */freertos的消息队列:队列传输数据有两种方式:

前言

消息队列是任务间通信系列介绍的首篇笔记,因为学习完消息队列的源码实现后,信号量、互斥量这些任务间通信机制也相当于学完了,只剩下概念性的内容了。

参考:

  • https://www.FreeRTOS.org/a00018.html
  • 李柱明博客:https://www.cnblogs.com/lizhuming/p/16344076.html
10.1 消息队列概念

消息队列实任务间通信机制中的一种。

其它还有二值信号量、计数信号量、互斥量和递归互斥量等等。

一个或多个任务往一个消息容器里面发消息,其它一个或多个任务从这个消息容器里面获取消息,这样实现通信。
/* 该图源自野火 */

消息队列六种(消息队列概念及其实现细节)(1)

freertos的消息队列:

  • 支持FIFO、支持LIFO也支持异步读写工作方式。
  • 支持超时机制。
  • 支持不同长度(在节点长度范围内)、任意类型的消息。
  • 一个任务可对一个消息队列读、写。
  • 一个消息队列支持被多个任务读、写。
  • 队列使用一次后自动从消息队列中移除。
10.2 消息队列的数据传输机制

队列传输数据有两种方式:

  1. 拷贝:把数据、把变量的值复制进队列里。
  2. 引用:把数据、把变量的地址复制进队列里。

而freertos的消息队列机制就是拷贝,拷贝的方式有以下优点:

  • 局部变量的值可以发送到队列中,后续即使函数退出、局部变量被回收,也不会影响队列中的数据。
  • 无需分配buffer来保存数据,队列中有buffer。
  • 发送任务、接收任务解耦:接收任务不需要知道这数据是谁的、也不需要发送任务来释放数据。
  • 如果数据实在太大,可以选择传输地址(即是拷贝地址),依然能实现传输引用的效果。
  • 队列的空间有FreeRTOS内核分配,无需上层应用维护。
  • 无需考虑内存保护功能,因为拷贝的方式新数据的存储区是由队列组件提供的,无需担心获取消息的任务需要权限访问。

当然对比引用的方式也有劣势:

  1. 拷贝数据相对拷贝引用来说要耗时。
  2. 需要更多内存,因为需要存储数据副本。
10.3 消息队列的阻塞访问机制

只要拿到队列句柄,任务和中断都有权限访问消息队列,但是也有阻塞限制。

写消息时,如果消息队列已满,则无法写入(覆盖写入除外),如果用户设置的阻塞时间不为0,则任务会进入阻塞,直到该队列有空闲空间给当前任务写入消息或阻塞时间超时才解除阻塞。

上面说的“该队列有空闲空间给当前任务写入消息”是因为就算当前队列有空间空间,也会优先安排阻塞在等待写链表中的最高优先级任务先写入。如果任务优先级相同,则先安排给最早开始等待的那个任务先写。

读消息时,机制和写消息一样,只是阻塞的条件是队列里面没有消息。

数据传输和阻塞访问机制都会在分析源码时阐述

10.4 消息队列使用场景

消息队列可以应用于发送不定长消息的场合,包括任务与任务间的消息交换。

队列是FreeRTOS主要的任务间通讯方式,可以在任务与任务间、中断和任务间传送信息。

发送到队列的消息是通过拷贝方式实现的,这意味着队列存储的数据是原数据,而不是原数据的引用。

10.5 消息队列控制块

消息队列的控制块也是队列控制块,这个控制块的数据结构除了被消息队列使用外,还被使用到二值信号量、计数信号量、互斥量和递归互斥量。

FreeRTOS的消息队列控制块由多个元素组成,当消息队列被创建时,系统会为控制块分配对应的内存空间,用于保存消息队列的一些信息,包括数据区位置、队列状态等等。

10.5.1 队列控制块源码

队列控制块struct QueueDefinition源码:

/* * Definition of the queue used by the scheduler. * Items are queued by copy not reference. See the following link for the * rationale: http://www.freertos.org/Embedded-RTOS-Queues.html */ typedef struct QueueDefinition { int8_t *pcHead; /*< Points to the beginning of the queue storage area. */ int8_t *pcTail; /*< Points to the byte at the end of the queue storage area. Once more byte is allocated than necessary to store the queue items this is used as a marker. */ int8_t *pcWriteTo; /*< Points to the free next place in the storage area. */ union /* Use of a union is an exception to the coding standard to ensure two mutually exclusive structure members don't appear simultaneously (wasting RAM). */ { int8_t *pcReadFrom; /*< Points to the last place that a queued item was read from when the structure is used as a queue. */ UBaseType_t uxRecursiveCallCount;/*< Maintains a count of the number of times a recursive mutex has been recursively 'taken' when the structure is used as a mutex. */ } u; List_t xTasksWaitingToSend; /*< List of tasks that are blocked waiting to post onto this queue. Stored in priority order. */ List_t xTasksWaitingToReceive; /*< List of tasks that are blocked waiting to read from this queue. Stored in priority order. */ volatile UBaseType_t uxMessagesWaiting;/*< The number of items currently in the queue. */ UBaseType_t uxLength; /*< The length of the queue defined as the number of items it will hold not the number of bytes. */ UBaseType_t uxItemSize; /*< The size of each items that the queue will hold. */ volatile int8_t cRxLock; /*< Stores the number of items received from the queue (removed from the queue) while the queue was locked. Set to queueUNLOCKED when the queue is not locked. */ volatile int8_t cTxLock; /*< Stores the number of items transmitted to the queue (added to the queue) while the queue was locked. Set to queueUNLOCKED when the queue is not locked. */ #if( ( configSUPPORT_STATIC_ALLOCATION == 1 ) && ( configSUPPORT_DYNAMIC_ALLOCATION == 1 ) ) uint8_t ucstaticallyAllocated; /*< Set to pdTRUE if the memory used by the queue was statically allocated to ensure no attempt is made to free the memory. */ #endif #if ( configUSE_QUEUE_SETS == 1 ) struct QueueDefinition *pxQueueSetContainer; #endif #if ( configUSE_TRACE_FACILITY == 1 ) UBaseType_t uxQueueNumber; uint8_t ucQueueType; #endif } xQUEUE;10.5.2 队列控制块成员剖析

在成员剖析时默认按消息队列的作用去剖析。

int8_t *pcHead;:

/* 该队列存储区的起始位置,对应第一个消息空间。*/ int8_t *pcHead;

int8_t *pcTail;:

/* 消息队列存储区的结尾位置。 * 结合 pcHead 指针就是整个存储区合法区域。*/ int8_t *pcHead;

int8_t *pcWriteTo;:

/* 写指针,指向存储区中下一个空闲的空间。 * 队列下次写数据的位置,需要入队时调用该指针写入数据。*/ int8_t *pcWriteTo;

int8_t *pcReadFrom;:

/* 读指针,指向存储区中下一个有效数据的空间。 * 队列下次读取数据的位置,需要出队时调用该指针写入数据。*/ int8_t *pcReadFrom;

UBaseType_t uxRecursiveCallCount;:

/* 递归次数。 * 用于互斥量时使用,与 pcReadFrom 为联合体。 * 记录递归互斥量被调用的次数。 */ UBaseType_t uxRecursiveCallCount;

List_t xTasksWaitingToSend;:

/* 等待发送的任务列表。 * 当队列存储区满时,需要发送消息的任务阻塞时记录到该链表。 * 按任务优先级排序。 */ List_t xTasksWaitingToSend;

List_t xTasksWaitingToReceive;:

/* 等待接收的任务列表。 * 当队列存储区为空时,需要获取消息的任务阻塞时记录到该链表。 * 按任务优先级排序。 */ List_t xTasksWaitingToReceive;

volatile UBaseType_t uxMessagesWaiting;:

/* 当前消息节点的个数。 * 即是当前有效消息数量。 * 二值信号量、互斥信号量时:表示有无信号量可用。 * 计数信号量时:有效信号量个数。 */ volatile UBaseType_t uxMessagesWaiting;

UBaseType_t uxLength;:

/* 当前队列最大节点总数。 * 即是最多能存放多少个消息。 * 二值信号量、互斥信号量时:最大为1。 * 计数信号量时:最大的信号量个数。 */ UBaseType_t uxLength;

UBaseType_t uxItemSize;:

/* 单个节点的大小。 * 单个消息的大小。 * 二值信号量、互斥信号量时:0。 * 计数信号量时:0。 */ UBaseType_t uxItemSize;

volatile int8_t cRxLock;:

/* 记录出队的数据项个数。 * 即是需要解除多少个阻塞在接收等待列表中的任务。 */ volatile int8_t cRxLock;

volatile int8_t cTxLock;:

/* 记录入队的数据项个数。 * 即是需要解除多少个阻塞在发送等待列表中的任务。 */ volatile int8_t cTxLock;10.5.3 cRxLock 和 cTxLock

当中断服务程序操作队列并且导致阻塞的任务解除阻塞时。
首先判断该队列是否上锁:

  • 如果没有上锁,则解除被阻塞的任务,还会根据需要设置上下文切换请求标志;
  • 如果队列已经上锁,则不会解除被阻塞的任务,取而代之的是,将xRxLock或xTxLock加1,表示队列上锁期间出队或入队的数目,也表示有任务可以解除阻塞了。

cRxLock 对应待出队的个数。
cTxLock 对应待入队的个数。

10.5.4 队列控制块数据结构图

消息队列六种(消息队列概念及其实现细节)(2)

10.6 创建消息队列

创建消息队列是在系统上新建一个消息队列,申请资源并初始化后返回句柄给用户,用户可以使用该队列句柄访问、操作该队列。

10.6.1 创建消息队列API说明

队列的创建有两种方法:静态分配内存、动态分配内存。其区别就是队列的内存来源是用户提供的还是内核分配的。

主要是分析动态分配内存。

函数原型:

QueueHandle_t xQueueCreate( UBaseType_t uxQueueLength UBaseType_t uxItemSize );

参数说明:

  • uxQueueLength:队列长度,最多能存放多少个数据(item)。
  • uxItemSize:每个数据(item)的大小:以字节为单位。
  • 返回值:
    • 非0:成功,返回句柄,以后使用句柄来操作队列。
    • NULL:失败,因为内存不足。
10.6.2 创建消息队列简要步骤
  1. 参数校验。
  2. 计算本次队列需要的总内存。
  3. 分配队列内存空间。
  4. 初始化队列控制块。
  5. 格式化队列数据区。
  6. 返回队列句柄。
10.6.3 创建消息队列源码

创建消息队列这个API其实就是封装了创建队列xQueueGenericCreate()这个通用API,类型为queueQUEUE_TYPE_BASE

#if ( configSUPPORT_DYNAMIC_ALLOCATION == 1 ) #define xQueueCreate( uxQueueLength uxItemSize ) xQueueGenericCreate( ( uxQueueLength ) ( uxItemSize ) ( queueQUEUE_TYPE_BASE ) ) #endif

其中队列的类型有多种:

/* For internal use only. These definitions *must* match those in queue.c. */ #define queueQUEUE_TYPE_BASE ( ( uint8_t ) 0U ) // 队列类型 #define queueQUEUE_TYPE_SET ( ( uint8_t ) 0U ) // 队列集合类型 #define queueQUEUE_TYPE_MUTEX ( ( uint8_t ) 1U ) // 互斥量类型 #define queueQUEUE_TYPE_COUNTING_SEMAPHORE ( ( uint8_t ) 2U ) // 计数信号量类型 #define queueQUEUE_TYPE_BINARY_SEMAPHORE ( ( uint8_t ) 3U ) // 二进制信号量类型 #define queueQUEUE_TYPE_RECURSIVE_MUTEX ( ( uint8_t ) 4U ) // 递归互斥量类型

创建队列函数源码xQueueGenericCreateStatic()

#if ( configSUPPORT_DYNAMIC_ALLOCATION == 1 ) QueueHandle_t xQueueGenericCreate( const UBaseType_t uxQueueLength const UBaseType_t uxItemSize const uint8_t ucQueueType ) { Queue_t * pxNewQueue = NULL; size_t xQueueSizeInBytes; uint8_t * pucQueueStorage; if( ( uxQueueLength > ( UBaseType_t ) 0 ) && /* 检查需要的数据区size是否溢出限定范围 */ ( ( SIZE_MAX / uxQueueLength ) >= uxItemSize ) && /* 检查本次队列创建需要的空间是否溢出限定范围 */ ( ( SIZE_MAX - sizeof( Queue_t ) ) >= ( uxQueueLength * uxItemSize ) ) ) { /* 计算数据区空间。 如果队列创建的是不带数据的,如信号量、互斥量,则传入参数时uxItemSize值应该被置为0。 */ xQueueSizeInBytes = ( size_t ) ( uxQueueLength * uxItemSize ); /* 一次性分配队列所需要的空间,包括队列控制块和数据区 */ pxNewQueue = ( Queue_t * ) pvPortMalloc( sizeof( Queue_t ) xQueueSizeInBytes ); if( pxNewQueue != NULL ) { /* 找出数据区起始地址 */ pucQueueStorage = ( uint8_t * ) pxNewQueue; pucQueueStorage = sizeof( Queue_t ); #if ( configSUPPORT_STATIC_ALLOCATION == 1 ) { /* 如果系统使能了静态创建功能,就需要标记当前队列是动态创建,内存有内核管理,以防用户删除。 */ pxNewQueue->ucStaticallyAllocated = pdFALSE; } #endif /* configSUPPORT_STATIC_ALLOCATION */ /* 初始化这个队列 */ prvInitialiseNewQueue( uxQueueLength uxItemSize pucQueueStorage ucQueueType pxNewQueue ); } else { traceQUEUE_CREATE_FAILED( ucQueueType ); mtCOVERAGE_TEST_MARKER(); } } else { configASSERT( pxNewQueue ); mtCOVERAGE_TEST_MARKER(); } /* 返回队列起始地址,便是队列句柄 */ return pxNewQueue; } #endif /* configSUPPORT_STATIC_ALLOCATION */

初始化队列函数源码prvInitialiseNewQueue()

  • 小笔记:初始化队列,看源码实现就知道控制块和数据区物理内存是可以分开的,但是在创建消息队列这个API里面实现是连续的。

static void prvInitialiseNewQueue( const UBaseType_t uxQueueLength const UBaseType_t uxItemSize uint8_t * pucQueueStorage const uint8_t ucQueueType Queue_t * pxNewQueue ) { /* 防止编译时警告未使用 */ ( void ) ucQueueType; if( uxItemSize == ( UBaseType_t ) 0 ) { /* 如果没有数据区(如信号量、互斥量等等),就需要把队列中的pcHead指回当前队列控制块起始地址,表明当前队列不含数据区。 */ pxNewQueue->pcHead = ( int8_t * ) pxNewQueue; } else { /* 如果当前队列含有数据区,则把 */ pxNewQueue->pcHead = ( int8_t * ) pucQueueStorage; } /* 保存当前队列成员数量 */ pxNewQueue->uxLength = uxQueueLength; /* 保存当前队列每个成员的最大size */ pxNewQueue->uxItemSize = uxItemSize; /* 队列格式化。(组成一个接口是因为不仅仅在这里用到重置队列的功能) */ ( void ) xQueueGenericReset( pxNewQueue pdTRUE ); #if ( configUSE_TRACE_FACILITY == 1 ) { /* 记录当前队列类型。一般用于调试、查栈使用。 */ pxNewQueue->ucQueueType = ucQueueType; } #endif /* configUSE_TRACE_FACILITY */ #if ( configUSE_QUEUE_SETS == 1 ) { pxNewQueue->pxQueueSetContainer = NULL; } #endif /* configUSE_QUEUE_SETS */ traceQUEUE_CREATE( pxNewQueue ); }

重置队列函数xQueueGenericReset()

  • 专门用于函数据区的队列,如消息队列。

BaseType_t xQueueGenericReset( QueueHandle_t xQueue BaseType_t xNewQueue ) { BaseType_t xReturn = pdPASS; Queue_t * const pxQueue = xQueue; /* 参数校验 */ configASSERT( pxQueue ); if( ( pxQueue != NULL ) && ( pxQueue->uxLength >= 1U ) && /* 队列成员数不能小于1,要不然算参数校验失败 */ ( ( SIZE_MAX / pxQueue->uxLength ) >= pxQueue->uxItemSize ) ) /* 队列size溢出检查 */ { taskENTER_CRITICAL(); /* 进入任务临界 */ { /* 保存整个队列尾部的地址,和pxQueue->pcHead结合看,就是这个队列的合法空间首尾 */ pxQueue->u.xQueue.pcTail = pxQueue->pcHead ( pxQueue->uxLength * pxQueue->uxItemSize ); /* 重置当前有效消息数量 */ pxQueue->uxMessagesWaiting = ( UBaseType_t ) 0U; /* 重置写指针,指向第一个队列成员 */ pxQueue->pcWriteTo = pxQueue->pcHead; /* 重置读指针,指向最后一个队列成员。因为下次读前要先偏移读指针。 */ pxQueue->u.xQueue.pcReadFrom = pxQueue->pcHead ( ( pxQueue->uxLength - 1U ) * pxQueue->uxItemSize ); /* 重置消息队列读锁:开锁状态 */ pxQueue->cRxLock = queueUNLOCKED; /* 重置消息队列写锁:开锁状态 */ pxQueue->cTxLock = queueUNLOCKED; if( xNewQueue == pdFALSE ) /* 重置已经在使用的队列 */ { /* 因为重置队列相当于清空队列里面的数据,队列有空位可写入,所以可以解除一个写阻塞任务 */ /* 如果有任务因为当前队列而写阻塞的,可以解除 */ if( listLIST_IS_EMPTY( &( pxQueue->xTasksWaitingToSend ) ) == pdFALSE ) { /* 解除一个写阻塞任务 */ if( xTaskRemoveFromEventList( &( pxQueue->xTasksWaitingToSend ) ) != pdFALSE ) { /* 如果内部解锁了个比当前优先级还高的任务,就触发一次任务切换。(当然,实际自信还是在退出任务临界才会执行) */ queueYIELD_IF_USING_PREEMPTION(); } else { mtCOVERAGE_TEST_MARKER(); } } else { mtCOVERAGE_TEST_MARKER(); } } else /* 重置的是一个新的队列 */ { /* 直接初始化写阻塞任务链表和读阻塞任务链表. */ vListInitialise( &( pxQueue->xTasksWaitingToSend ) ); vListInitialise( &( pxQueue->xTasksWaitingToReceive ) ); } } taskEXIT_CRITICAL(); /* 退出任务临界 */ } else { xReturn = pdFAIL; } /* 前面if里面参数校验失败会直接断言 */ configASSERT( xReturn != pdFAIL ); return xReturn; }10.6.4 消息队列数据结构图

消息队列六种(消息队列概念及其实现细节)(3)

10.7 发送消息

任务或者中断服务程序都可以给消息队列发送消息。

中断中发送消息不可阻塞。要么直接返回,要么覆盖写入。

任务发送消息时,如果队列未满或者允许覆盖入队,FreeRTOS会将消息拷贝到消息队列队尾或队列头,否则,会根据用户指定的阻塞超时时间进行阻塞。直到该队列有空闲空间给当前任务写入消息或阻塞时间超时才解除阻塞。

发送消息的API分任务和中断专属,中断专用的API都带FromISR后缀。

因为本系列笔记主要记录源码实现,API的使用不会详细列举。

10.7.1 发送消息API

/* 往队列尾部写入数据。等同于xQueueSendToBack */ BaseType_t xQueueSend(QueueHandle_t xQueue const void *pvItemToQueue TickType_t xTicksToWait); /* 往队列尾部写入数据。等同于xQueueSend */ BaseType_t xQueueSendToBack(QueueHandle_t xQueue const void *pvItemToQueue TickType_t xTicksToWait); /* 往队列尾部写入数据。中断专用 */ BaseType_t xQueueSendToBackFromISR(QueueHandle_t xQueue const void *pvItemToQueue BaseType_t *pxHigherPriorityTaskWoken); /* 往队列头部写入数据 */ BaseType_t xQueueSendToFront(QueueHandle_t xQueue const void *pvItemToQueue TickType_t xTicksToWait); /* 往队列头部写入数据。中断专用 */ BaseType_t xQueueSendToFrontFromISR(QueueHandle_t xQueue const void *pvItemToQueue BaseType_t *pxHigherPriorityTaskWoken);

参数说明:

  • xQueue:队列句柄。
  • pvItemToQueue:数据指针,这些数据的值会被复制进队列。
  • xTicksToWait:最大阻塞时间,单位Tick Count。
    • 如果被设为0,无法写入数据时函数会立刻返回;
    • 如果被设为portMAX_DELAY,则会一直阻塞直到有空间可写
  • 返回值:
    • pdPASS:数据成功写入了队列
    • errQUEUE_FULL:写入失败,因为队列满了。
10.7.2 发送消息实现简要步骤
  1. 参数校验。
  2. 检查当前队列是否有空闲空间可写入。
    1. 进入临界。
    2. 有空间可写入:
      1. 直接写入。
      2. 检查下是否有任务阻塞在当前队列写阻塞链表中,有就解锁一个最高优先级、最早开始等待的任务。
      3. 退出临界。
    3. 没空间可写入:进入阻塞处理。
      1. 不需要阻塞,就退出临界并返回。
      2. 开始阻塞超时计时。
      3. 退出临界。(可能会切到其它任务或中断)
      4. 挂起调度器。
      5. 再次检查下是否有空间可写,是否超时。
      6. 需要阻塞就计算下当前任务的唤醒时间,记录到任务事件状态节点信息中,把当前任务从就绪链表抽离,插入到延时链表和当前队列的写阻塞任务链表中。
      7. 切走任务,等待唤醒。
10.7.3 发送消息源码分析

往队列里发消息的API(中断专用除外),都是封装xQueueGenericSend()函数而来的,所以我们直接分析该函数实现即可。

需要注意的是,如果发送消息前,调度器被挂起了,则这个消息不能配置为阻塞式的,因为如果挂起调度器后使用阻塞式写入队列,会触发断言。

在这里可以拓展下,如果没有这个断言校验,队列已满,则会在当前任务一直死循环,直至有中断服务恢复调度器或读取当前队列的消息,当前任务才能跑出这个坑。

xQueueGenericSend()

BaseType_t xQueueGenericSend( QueueHandle_t xQueue const void * const pvItemToQueue TickType_t xTicksToWait const BaseType_t xCopyPosition ) { BaseType_t xEntryTimeSet = pdFALSE xYieldRequired; TimeOut_t xTimeOut; Queue_t * const pxQueue = xQueue; /* 传入的队列句柄不能为空 */ configASSERT( pxQueue ); /* 如果写入队列的数据为空,就说明调用当前API的不是一个消息队列,而是不含数据区的信号量、互斥量这些IPC,所以队列成员size必须为0 */ configASSERT( !( ( pvItemToQueue == NULL ) && ( pxQueue->uxItemSize != ( UBaseType_t ) 0U ) ) ); /* 如果是覆盖写入,这个功能默认只能在队列成员只有1个的情况下使用 */ configASSERT( !( ( xCopyPosition == queueOVERWRITE ) && ( pxQueue->uxLength != 1 ) ) ); #if ( ( INCLUDE_xTaskGetSchedulerState == 1 ) || ( configUSE_TIMERS == 1 ) ) { /* 如果调度器被挂起,则不能进入阻塞。 */ configASSERT( !( ( xTaskGetSchedulerState() == taskSCHEDULER_SUSPENDED ) && ( xTicksToWait != 0 ) ) ); } #endif /* 使用循环逻辑,是为了解除阻塞后能检查一下能否可以写入。 */ for( ; ; ) { taskENTER_CRITICAL(); /* 进入临界,因为下面操作可能会涉及到全局资源,如那几个任务链表 */ { /* 队列有空闲空间或需要强制写入队列,方可写入 */ if( ( pxQueue->uxMessagesWaiting < pxQueue->uxLength ) || ( xCopyPosition == queueOVERWRITE ) ) { traceQUEUE_SEND( pxQueue ); { const UBaseType_t uxPreviousMessagesWaiting = pxQueue->uxMessagesWaiting; /* 拷贝数据到队列里 */ xYieldRequired = prvCopyDataToQueue( pxQueue pvItemToQueue xCopyPosition ); if( pxQueue->pxQueueSetContainer != NULL ) /* 队列集合 */ { if( ( xCopyPosition == queueOVERWRITE ) && ( uxPreviousMessagesWaiting != ( UBaseType_t ) 0 ) ) { /* 如果当前队列里面有数据,且本次写入是覆盖写入,就不需要通知队列集了,因为队列集已经被通知过。 */ mtCOVERAGE_TEST_MARKER(); } else if( prvNotifyQueueSetContainer( pxQueue ) != pdFALSE ) /* 通知队列集当前队列有数据了 */ { /* 触发任务切换。只是触发,实际切换需要到退出临界后才执行。 */ queueYIELD_IF_USING_PREEMPTION(); } else { mtCOVERAGE_TEST_MARKER(); } } else /* 不是队列集 */ { /* 有任务阻塞在读阻塞链表,现在队列有数据了,需要解锁一个任务 */ if( listLIST_IS_EMPTY( &( pxQueue->xTasksWaitingToReceive ) ) == pdFALSE ) { /* 从读阻塞链表中解除一个最高优先级且最先进入阻塞的任务。 即把这个解除阻塞的任务的事件节点从当前队列的阻塞链表中抽离,把状态节点从挂起链表或延时链表重新插入到就绪链表或挂起的就绪链表中。 如果解除阻塞的任务比当前在跑任务优先级还高,就返回pdTRUE */ if( xTaskRemoveFromEventList( &( pxQueue->xTasksWaitingToReceive ) ) != pdFALSE ) { /* 触发任务切换。只是触发,实际切换需要到退出临界后才执行。 */ queueYIELD_IF_USING_PREEMPTION(); } else { mtCOVERAGE_TEST_MARKER(); } } else if( xYieldRequired != pdFALSE ) { /* 如果是释放互斥量时优先级继承机制触发当前任务优先级回落,就绪链表中有更高优先级的任务,则触发任务切换。只是触发,实际切换需要到退出临界后才执行。 */ queueYIELD_IF_USING_PREEMPTION(); } else { mtCOVERAGE_TEST_MARKER(); } } } taskEXIT_CRITICAL(); /* 退出临界 */ return pdPASS; /* 返回成功 */ } else /* 本次不能写入,则检查、准备进入阻塞处理 */ { if( xTicksToWait == ( TickType_t ) 0 ) /* 不需要阻塞 */ { taskEXIT_CRITICAL(); /* 退出临界 */ traceQUEUE_SEND_FAILED( pxQueue ); return errQUEUE_FULL; /* 返回写入失败 */ } else if( xEntryTimeSet == pdFALSE ) /* 需要阻塞。第一次循环,需要记录当前时间,开始计时阻塞超时。 */ { /* 备份当前系统节拍 */ vTaskInternalSetTimeOutState( &xTimeOut ); xEntryTimeSet = pdTRUE; /* 标记已开始记录了 */ } else { /* 进入时间已经设定 */ mtCOVERAGE_TEST_MARKER(); } } } taskEXIT_CRITICAL(); /* 退出临界 */ /* 退出临界后系统会先处理在临界期触发的被屏蔽的中断服务,如任务切换的中断服务、其它中断服务等等。 */ vTaskSuspendAll(); /* 又调度回到当前任务了,挂起调度器,继续干活 */ prvLockQueue( pxQueue ); /* 当前队列上锁,以免有中断服务操作当前队列,打乱老子的节奏 */ /* 检查阻塞时间是否已经超时 */ if( xTaskCheckForTimeOut( &xTimeOut &xTicksToWait ) == pdFALSE ) /* 还没超时呢,继续等呗 */ { if( prvIsQueueFull( pxQueue ) != pdFALSE ) /* 纳尼,队列还是满的,写不进去啊 */ { traceBLOCKING_ON_QUEUE_SEND( pxQueue ); /* 我还是去这个队列里面的写阻塞链表里面排个队吧。还可以插队,不过只能插到比自己优先级低的任务前面。 */ vTaskPlaceOnEventList( &( pxQueue->xTasksWaitingToSend ) xTicksToWait ); /* 放开当前队列的控制权 */ prvUnlockQueue( pxQueue ); /* 恢复调度器 */ if( xTaskResumeAll() == pdFALSE ) { /* 如果在恢复调度器里面没有触发过调度,那这里需要触发一次调度,因为当前任务已经处于阻塞态了,怎么滴也要触发一次调度切走。 */ portYIELD_WITHIN_API(); } } else /* 队列有空位,赶紧写 */ { /* 解锁当前队列,进入下一个循环,看看能不能抢到写入权限 */ prvUnlockQueue( pxQueue ); ( void ) xTaskResumeAll(); /* 恢复调度器 */ } } else /* 超时都没等到写入的权限 */ { /* 解锁队列 */ prvUnlockQueue( pxQueue ); ( void ) xTaskResumeAll(); /* 恢复调度器 */ traceQUEUE_SEND_FAILED( pxQueue ); return errQUEUE_FULL; /* 写入失败 */ } } /*lint -restore */ }

写入队列的API源码prvCopyDataToQueue():(临界中调用

static BaseType_t prvCopyDataToQueue( Queue_t * const pxQueue const void * pvItemToQueue const BaseType_t xPosition ) { BaseType_t xReturn = pdFALSE; UBaseType_t uxMessagesWaiting; /* 当前函数需要在临界里被调用 */ uxMessagesWaiting = pxQueue->uxMessagesWaiting; if( pxQueue->uxItemSize == ( UBaseType_t ) 0 ) /* 非队类型 */ { #if ( configUSE_MUTEXES == 1 ) { if( pxQueue->uxQueueType == queueQUEUE_IS_MUTEX ) /* 互斥量 */ { /* 互斥量类型调用该函数就是释放互斥量的意思 */ /* 释放互斥量,需要处理优先级继承机制,回落到基优先级 */ xReturn = xTaskPriorityDisinherit( pxQueue->u.xSemaphore.xMutexHolder ); /* 标记互斥量已经被解锁 */ pxQueue->u.xSemaphore.xMutexHolder = NULL; } else { mtCOVERAGE_TEST_MARKER(); } } #endif /* configUSE_MUTEXES */ } else if( xPosition == queueSEND_TO_BACK ) /* 往队列尾部写入 */ { /* 按队列属性写入 */ ( void ) memcpy( ( void * ) pxQueue->pcWriteTo pvItemToQueue ( size_t ) pxQueue->uxItemSize ); /* 更新队列写指针 */ pxQueue->pcWriteTo = pxQueue->uxItemSize; if( pxQueue->pcWriteTo >= pxQueue->u.xQueue.pcTail ) { /* 如果本次写入的数据时队列最后一个成员,就需要把当前队列写指针重置回首个队列成员。和ringbuffer原理类似 */ pxQueue->pcWriteTo = pxQueue->pcHead; } else { mtCOVERAGE_TEST_MARKER(); } } else /* 往有效队列头部写入 */ { /* 按列属性写入,读指针就是有效队列头 */ ( void ) memcpy( ( void * ) pxQueue->u.xQueue.pcReadFrom pvItemToQueue ( size_t ) pxQueue->uxItemSize ); /* 读指针往前推 */ pxQueue->u.xQueue.pcReadFrom -= pxQueue->uxItemSize; if( pxQueue->u.xQueue.pcReadFrom < pxQueue->pcHead ) { /* 读指针往前推时溢出后需要回溯到队列最后一个成员 */ pxQueue->u.xQueue.pcReadFrom = ( pxQueue->u.xQueue.pcTail - pxQueue->uxItemSize ); } else { mtCOVERAGE_TEST_MARKER(); } if( xPosition == queueOVERWRITE ) { if( uxMessagesWaiting > ( UBaseType_t ) 0 ) { /* 如果是覆盖写入,那当前队列有效成员数量维持不变 */ --uxMessagesWaiting; } else { mtCOVERAGE_TEST_MARKER(); } } else { mtCOVERAGE_TEST_MARKER(); } } /* 更新当前队列有效成员数量 */ pxQueue->uxMessagesWaiting = uxMessagesWaiting ( UBaseType_t ) 1; return xReturn; }

优先级继承机制概念、实现原理及其源码在互斥量章节的笔记讲解。

10.7.5 中断专用的发送消息API

中断专用的发送消息API比普通的发送消息API佛系了。

区别就是:中断专用的没有阻塞机制。

如果队列有空闲空间,或本次是强制写入,就把数据写入。

  • 写入后如果队列没有上锁,就更新当前队列信息,解锁阻塞在读阻塞队列的最高优先级、最早等待的一个任务。
  • 如果队列上锁了,就用队列中的pxQueue->cTxLock记录当前队列入队了一个数据,在调用prvUnlockQueue()解锁时更新当前队列信息,解锁阻塞在读阻塞队列的最高优先级、最早等待的一个任务。

如果队列没有空闲空间,又不是强制写入,就直接退出。

10.8 接收消息

当任务从队列中读取消息时,如果队列中有消息,可以读取并返回。

如果队列中没有消息,需要进入阻塞处理,在阻塞超时前,有其他任务或中断服务往这个队列里面写消息了,且当前任务时这个队列中阻塞在读阻塞链表中的最高优先级、最先等待的任务,就解锁该任务,否则还会一直阻塞到超时才唤醒当前任务。

10.8.1 接收消息API

/* 从队列中读取数据。 */ BaseType_t xQueueReceive( QueueHandle_t xQueue void * const pvBuffer TickType_t xTicksToWait ); /* 从队列中读取数据。中断专属 */ BaseType_t xQueueReceiveFromISR(QueueHandle_t xQueue void *pvBuffer BaseType_t *pxTaskWoken);

参数说明:

  • xQueue:队列句柄。
  • pvBuffer:存储接收数据的指针,其有效空间需要按照当前队列属性设定。
  • xTicksToWait:如果队列空则无法读出数据,可以让任务进入阻塞状态,xTicksToWait表示阻塞的最大时间,单位:Tick Count。
    • 如果被设为0,无法读出数据时函数会立刻返回;
    • 如果被设为portMAX_DELAY,则会一直阻塞直到有数据可写。
  • 返回值:
    • pdPASS:从队列读出数据入;
    • errQUEUE_EMPTY:读取失败,因为队列空了。
10.8.2 接收消息简要步骤10.8.3 接收消息源码

xQueueReceive()

BaseType_t xQueueReceive( QueueHandle_t xQueue void * const pvBuffer TickType_t xTicksToWait ) { BaseType_t xEntryTimeSet = pdFALSE; TimeOut_t xTimeOut; Queue_t * const pxQueue = xQueue; /* 队列句柄不能为空 */ configASSERT( ( pxQueue ) ); /* 如果数据区回传地址为空,只能是不含数据区的IPC(信号量、互斥量等) */ configASSERT( !( ( ( pvBuffer ) == NULL ) && ( ( pxQueue )->uxItemSize != ( UBaseType_t ) 0U ) ) ); #if ( ( INCLUDE_xTaskGetSchedulerState == 1 ) || ( configUSE_TIMERS == 1 ) ) { /* 调度器挂起后,不能已阻塞式调用当前API */ configASSERT( !( ( xTaskGetSchedulerState() == taskSCHEDULER_SUSPENDED ) && ( xTicksToWait != 0 ) ) ); } #endif /* 使用循环逻辑,是为了解除阻塞后能检查一下能否可以读取。 */ for( ; ; ) { taskENTER_CRITICAL(); /* 进入临界 */ { const UBaseType_t uxMessagesWaiting = pxQueue->uxMessagesWaiting; /* 当前队列有数据可读 */ if( uxMessagesWaiting > ( UBaseType_t ) 0 ) { /* 出队。 */ prvCopyDataFromQueue( pxQueue pvBuffer ); traceQUEUE_RECEIVE( pxQueue ); /* 有效队列成员个数更新 */ pxQueue->uxMessagesWaiting = uxMessagesWaiting - ( UBaseType_t ) 1; /* 如果有任务阻塞在当前队列的写阻塞链表中,就解锁一个,让其写入。 */ if( listLIST_IS_EMPTY( &( pxQueue->xTasksWaitingToSend ) ) == pdFALSE ) { /* 把这个解除阻塞的任务从当前队列的写阻塞链表中解除,并把该任务从延时链表或挂起链表中恢复到就绪链表或挂起的就绪链表中 */ if( xTaskRemoveFromEventList( &( pxQueue->xTasksWaitingToSend ) ) != pdFALSE ) { /* 解锁的任务比当前任务优先级更加高,需要触发任务调度。 */ queueYIELD_IF_USING_PREEMPTION(); } else { mtCOVERAGE_TEST_MARKER(); } } else { mtCOVERAGE_TEST_MARKER(); } /* 退出临界 */ taskEXIT_CRITICAL(); return pdPASS; /* 返回读取成功 */ } else /* 队列为空呢 */ { if( xTicksToWait == ( TickType_t ) 0 ) /* 不需要阻塞 */ { /* 退出临界并返回读取失败 */ taskEXIT_CRITICAL(); traceQUEUE_RECEIVE_FAILED( pxQueue ); return errQUEUE_EMPTY; } else if( xEntryTimeSet == pdFALSE ) /* 进入阻塞,首次循环需要开始计时 */ { /* 获取当前系统节拍 */ vTaskInternalSetTimeOutState( &xTimeOut ); xEntryTimeSet = pdTRUE; /* 标记已经开始计时 */ } else { mtCOVERAGE_TEST_MARKER(); } } } taskEXIT_CRITICAL(); /* 退出临界 */ /* 退出临界后系统会先处理在临界期触发的被屏蔽的中断服务,如任务切换的中断服务、其它中断服务等等。 */ vTaskSuspendAll(); /* 有回到了当前任务。挂起调度器 */ prvLockQueue( pxQueue ); /* 队列上锁 */ /* 检查是否已经超时。 */ if( xTaskCheckForTimeOut( &xTimeOut &xTicksToWait ) == pdFALSE ) /* 还没超时 */ { if( prvIsQueueEmpty( pxQueue ) != pdFALSE ) /* 如果队列还没有数据,需要继续阻塞 */ { traceBLOCKING_ON_QUEUE_RECEIVE( pxQueue ); /* 我还是去这个队列里面的读阻塞链表里面排个队吧。还可以插队,不过只能插到比自己优先级低的任务前面。 */ vTaskPlaceOnEventList( &( pxQueue->xTasksWaitingToReceive ) xTicksToWait ); prvUnlockQueue( pxQueue ); /* 解锁当前队列 */ if( xTaskResumeAll() == pdFALSE ) /* 恢复调度器 */ { /* 如果在恢复调度器时没有调度过,这里必须手动触发一次调度。否则会在当前这个坑里一直跑,直到有中断服务往当前队列里发消息,或者有更高优先级的任务被解除阻塞,或者系统节拍中有同优先级任务被解锁(就绪链表中还有大于2个及其以上同优先级的任务)(开启时间片的前提下)才会跳出这个坑。 */ portYIELD_WITHIN_API(); } else { mtCOVERAGE_TEST_MARKER(); } } else { /* The queue contains data again. Loop back to try and read the * data. */ prvUnlockQueue( pxQueue ); ( void ) xTaskResumeAll(); } } else /* 已经超时了 */ { /* 解锁队列 */ prvUnlockQueue( pxQueue ); /* 恢复调度器 */ ( void ) xTaskResumeAll(); if( prvIsQueueEmpty( pxQueue ) != pdFALSE ) /* 再次判断下是否真的没有数据,现在有数据还来得及 */ { /* 真的没有数据,返回读取失败吧。 */ traceQUEUE_RECEIVE_FAILED( pxQueue ); return errQUEUE_EMPTY; } else { mtCOVERAGE_TEST_MARKER(); } } } /*lint -restore */ }

出队函数prvCopyDataFromQueue()

static void prvCopyDataFromQueue( Queue_t * const pxQueue void * const pvBuffer ) { if( pxQueue->uxItemSize != ( UBaseType_t ) 0 ) /* 只有带数据区的IPC才能调用 */ { /* 偏移到下一个队列成员 */ pxQueue->u.xQueue.pcReadFrom = pxQueue->uxItemSize; if( pxQueue->u.xQueue.pcReadFrom >= pxQueue->u.xQueue.pcTail ) { /* 读指针溢出的话需要回溯 */ pxQueue->u.xQueue.pcReadFrom = pxQueue->pcHead; } else { mtCOVERAGE_TEST_MARKER(); } /* 拷贝出数据 */ ( void ) memcpy( ( void * ) pvBuffer ( void * ) pxQueue->u.xQueue.pcReadFrom ( size_t ) pxQueue->uxItemSize ); } }10.9 窥探消息

就是只读取数据,不删除该数据。

其源码和xQueueReceive()差不多,只是数据不删除,读指针也不偏移,有效个数也不减少。

BaseType_t xQueuePeek( QueueHandle_t xQueue void * const pvBuffer TickType_t xTicksToWait ); BaseType_t xQueueReceiveFromISR( QueueHandle_t xQueue void * const pvBuffer BaseType_t * const pxHigherPriorityTaskWoken );10.10 队列查询

队列查询主要是操作队列控制块中的信息。

10.10.1 查询队列当前有效数据个数

UBaseType_t uxQueueMessagesWaiting( const QueueHandle_t xQueue ) { UBaseType_t uxReturn; configASSERT( xQueue ); taskENTER_CRITICAL(); { /* 获取队列有效成员个数 */ uxReturn = ( ( Queue_t * ) xQueue )->uxMessagesWaiting; } taskEXIT_CRITICAL(); return uxReturn; }10.10.2 查询队列当前可以空间个数

UBaseType_t uxQueueSpacesAvailable( const QueueHandle_t xQueue ) { UBaseType_t uxReturn; Queue_t * const pxQueue = xQueue; configASSERT( pxQueue ); taskENTER_CRITICAL(); { /* 总个数减去有效个数 */ uxReturn = pxQueue->uxLength - pxQueue->uxMessagesWaiting; } taskEXIT_CRITICAL(); return uxReturn; }10.11 删除消息队列

队列删除函数是根据消息队列句柄直接删除的,删除之后这个消息队列的所有信息都会被系统回收清空,而且不能再次使用这个消息队列了。

直接上源码:

void vQueueDelete( QueueHandle_t xQueue ) { Queue_t * const pxQueue = xQueue; /* 队列必须存在 */ configASSERT( pxQueue ); traceQUEUE_DELETE( pxQueue ); #if ( configQUEUE_REGISTRY_SIZE > 0 ) { /* 如果开启了队列注册表功能,也需要从队列注册表中取出当前队列的记录 */ vQueueUnregisterQueue( pxQueue ); } #endif #if ( ( configSUPPORT_DYNAMIC_ALLOCATION == 1 ) && ( configSUPPORT_STATIC_ALLOCATION == 0 ) ) { /* 如果只开启了动态内存功能,就是直接释放当前队列资源 */ vPortFree( pxQueue ); } #elif ( ( configSUPPORT_DYNAMIC_ALLOCATION == 1 ) && ( configSUPPORT_STATIC_ALLOCATION == 1 ) ) { /* 如果动态内存和静态内存都开启了 就需要区分当前队列的内存资源来源 */ if( pxQueue->ucStaticallyAllocated == ( uint8_t ) pdFALSE ) /* 动态创建 */ { /* 直接回收 */ vPortFree( pxQueue ); } else /* 静态内存,由用户回收资源 */ { mtCOVERAGE_TEST_MARKER(); } } #else /* if ( ( configSUPPORT_DYNAMIC_ALLOCATION == 1 ) && ( configSUPPORT_STATIC_ALLOCATION == 0 ) ) */ { /* 静态分配,只能由用户回收。 */ ( void ) pxQueue; } #endif /* configSUPPORT_DYNAMIC_ALLOCATION */ }10.12 消息队列使用注意

在使用freertos提供的消息队列组件时,需要注意以下几点:

  1. 使用xQueueSend()、xQueueSendFromISR()、xQueueReceive()等这些函数之前应先创建需消息队列,并根据队列句柄进行操作。
  2. 要明白写入队列采用的逻辑时FIFO还是LIFO,使用对应的API。
  3. 在获取队列中的消息时候,必须要定义一个存储读取数据的地方,并且该数据区域大小不小于消息大小,否则,很可能引发地址非法的错误。
  4. freertos的数据流是拷贝方式实现的,如果消息过大,建议使用拷贝引用。
  5. 队列独立在内核中,不属于任何一个任务。
小结

学习,重在理解,懂得底层原理,上层特性、特点即可推理。

本文作者: 李柱明

本文链接: https://www.cnblogs.com/lizhuming/p/16344076.html

猜您喜欢: