优化了队列的实现,减少空间占用.完成message center的文档

This commit is contained in:
NeoZng 2022-12-01 16:06:11 +08:00
parent 063203bd68
commit 5ad8f57a79
13 changed files with 328 additions and 110 deletions

View File

@ -65,13 +65,7 @@ void MX_FREERTOS_Init(void);
/* Private user code ---------------------------------------------------------*/
/* USER CODE BEGIN 0 */
typedef struct
{
float a;
uint8_t b;
uint32_t c;
/* data */
}good;
/* USER CODE END 0 */
/**
@ -120,17 +114,7 @@ int main(void)
RobotInit();
good asdf;
good asdfsadf;
good pub={.a=1,.b=2,.c=3};
Subscriber_t* s=SubRegister("test\0",sizeof(good));
Subscriber_t* ss=SubRegister("test\0",sizeof(good));
Publisher_t* p=PubRegister("test\0",sizeof(asdf));
PubPushMessage(p,&pub);
PubPushMessage(p,&pub);
volatile uint8_t d= SubGetMessage(s,&asdf);
d=SubGetMessage(s,&asdf);
d=SubGetMessage(ss,&asdfsadf);
/* USER CODE END 2 */
/* Call init function for freertos objects (in freertos.c) */

View File

@ -270,7 +270,7 @@ ROOT:.
### 运行任务
![image-20221113212616636](assets\image-20221113212616636.png)
![image-20221201144336613](assets/image-20221201144336613.png)
### 初始化流程

View File

@ -153,6 +153,16 @@ ITM是instrument trace macrocell指令追踪宏单元的缩写它用于提供
> ***所有需要编辑的配置文件都已经在basic_framework的仓库中提供如果不会写照猫画虎。***
> 2022-12-01更新
>
> VSCode上线了一款新的插件
>
> ![image-20221201134906999](assets/image-20221201134906999.png)
>
> 支持一键配置Arm GNU工具链、MinGW64make工具和OpenOCD可以尝试使用这个插件替代下面的配置流程。并且此插件还提供了一键下载、一键调试的支持只需要选择合适的下载器配置即可全部都是图形化界面的操作
>
> 你可以尝试使用这个插件进行环境的配置。当然,环境变量仍然需要手动添加。
- 安装STM32CubeMX并安装F4支持包和DSP库支持包
- 安装VSCode并安装以下插件

Binary file not shown.

Before

Width:  |  Height:  |  Size: 29 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 22 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 29 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 93 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 34 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 48 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 4.0 KiB

View File

@ -3,10 +3,12 @@
#include "string.h"
/* 消息初始化用 */
static char pname[MAX_EVENT_COUNT][MAX_EVENT_NAME_LEN+1];
static char sname[MAX_EVENT_COUNT][MAX_EVENT_NAME_LEN+1];
static void* p_ptr[MAX_EVENT_COUNT]={NULL};
static void** s_pptr[MAX_EVENT_COUNT]={NULL}; // 因为要修改指针,所以需要二重指针
static char pname[MAX_EVENT_COUNT][MAX_EVENT_NAME_LEN + 1];
static char sname[MAX_EVENT_COUNT][MAX_EVENT_NAME_LEN + 1];
static void *p_ptr[MAX_EVENT_COUNT] = {NULL};
static void **s_pptr[MAX_EVENT_COUNT] = {NULL}; // 因为要修改指针,所以需要二重指针
/* ----------------------------------第三方指针传递版的实现----------------------------------- */
void MessageInit()
{
@ -14,25 +16,26 @@ void MessageInit()
// 对每一个subscriber,寻找相同消息名称的publisher,可能有多个sub从相同pub获取消息
for (size_t i = 0; i < MAX_EVENT_COUNT; i++)
{
if(s_pptr[i]!=NULL)
if (s_pptr[i] != NULL)
{
for (size_t j = 0; j < MAX_EVENT_COUNT; j++) //遍历publisher
for (size_t j = 0; j < MAX_EVENT_COUNT; j++) // 遍历publisher
{
if(p_ptr[j]!=NULL) //不为空
if (p_ptr[j] != NULL) // 不为空
{
if(strcmp(sname[i],pname[j])==0) //比较消息名是否一致
if (strcmp(sname[i], pname[j]) == 0) // 比较消息名是否一致
{
*s_pptr[i]=p_ptr[j]; // 将sub的指针指向pub的数据
*s_pptr[i] = p_ptr[j]; // 将sub的指针指向pub的数据
break;
}
}
else //到结尾,退出
else // 到结尾,退出
{
while(1); //如果你卡在这里,说明没有找到消息发布者!请确认消息名称是否键入错误
while (1)
; // 如果你卡在这里,说明没有找到消息发布者!请确认消息名称是否键入错误
}
}
}
else //说明已经遍历完所有的subs
else // 说明已经遍历完所有的subs
{
break;
}
@ -40,129 +43,149 @@ void MessageInit()
}
/* 传入数据地址 */
void PublisherRegister(char* name,void* data)
void PublisherRegister(char *name, void *data)
{
static uint8_t idx;
for (size_t i = 0; i < idx; i++)
{
if(strcmp(pname[i],name)==0)
while(1); //运行至此说明pub的消息发布名称冲突
if (strcmp(pname[i], name) == 0)
while (1)
; // 运行至此说明pub的消息发布名称冲突
}
strcpy(pname[idx],name);
p_ptr[idx++]=data;
strcpy(pname[idx], name);
p_ptr[idx++] = data;
}
/* 注意传入的是指针的地址,传参时使用&对数据指针取地址 */
void SubscribeEvent(char* name,void** data_ptr)
void SubscribeEvent(char *name, void **data_ptr)
{
static uint8_t idx;
strcpy(sname[idx],name);
s_pptr[idx++]=data_ptr;
strcpy(sname[idx], name);
s_pptr[idx++] = data_ptr;
}
/* ----------------------------------链表-队列版的实现----------------------------------- */
/* message_center是fake node,是方便链表编写的技巧,这样不需要处理链表头的特殊情况 */
static Publisher_t message_center = {
.event_name = "Message_Manager",
.first_subs = NULL,
.next_event_node = NULL};
/* 以下是队列版的pubsub,TODO */
/* message_center是fake node,方便链表编写的技巧,这样不需要处理链表头的特殊情况 */
static Publisher_t message_center={
.event_name="Message_Manager",
.first_subs=NULL,
.next_event_node=NULL
};
Subscriber_t* SubRegister(char* name,uint8_t data_len)
static void CheckName(char* name)
{
Publisher_t* node=&message_center;
while(node->next_event_node) // 遍历链表
if(strnlen(name,MAX_EVENT_NAME_LEN+1)>=MAX_EVENT_NAME_LEN)
{
node=node->next_event_node;
if(strcmp(name,node->event_name)==0) //如果事件名相同就订阅这个事件
while (1); // 进入这里说明事件名超出长度限制
}
}
static void CheckLen(uint8_t len1,uint8_t len2)
{
if(len1!=len2)
{
while(1); // 相同事件的消息长度不同
}
}
Subscriber_t *SubRegister(char *name, uint8_t data_len)
{
CheckName(name);
Publisher_t *node = &message_center; // 可以将message_center看作对消息管理器的抽象,它用于管理所有pub和sub
while (node->next_event_node) // 遍历链表,如果当前有发布者已经注册
{
node = node->next_event_node; // 指向下一个发布者(发布者发布的事件)
if (strcmp(name, node->event_name) == 0) // 如果事件名相同就订阅这个事件
{
Subscriber_t* sub=node->first_subs; //指向第一个事件
while (sub->next_subs_queue) //遍历订阅了该事件的链表
CheckLen(data_len,node->data_len);
Subscriber_t *sub = node->first_subs; // 指向订阅了该事件的第一个订阅者
while (sub->next_subs_queue) // 遍历订阅了该事件的链表
{
sub=sub->next_subs_queue;
} //遇到空指针停下,创建新的订阅节点
Subscriber_t* ret=(Subscriber_t*)malloc(sizeof(Subscriber_t));
memset(ret,0,sizeof(Subscriber_t));
ret->data_len=data_len;
sub = sub->next_subs_queue; // 移动到下一个订阅者
} // 遇到空指针停下,说明到了链表尾部,创建新的订阅节点
// 申请内存,注意要memset因为新空间不一定是空的,可能有之前留存的垃圾值
Subscriber_t *ret = (Subscriber_t *)malloc(sizeof(Subscriber_t));
memset(ret, 0, sizeof(Subscriber_t));
// 对新建的Subscriber进行初始化
ret->data_len = data_len; // 设定数据长度
for (size_t i = 0; i < QUEUE_SIZE; i++)
{ //给消息队列分配空间,queue里保存的实际上是数据执指针,这样可以兼容不同的数据长度
ret->queue[i]=malloc(sizeof(data_len));
{ // 给消息队列的每一个元素分配空间,queue里保存的实际上是数据执指针,这样可以兼容不同的数据长度
ret->queue[i] = malloc(sizeof(data_len));
}
sub->next_subs_queue=ret; //接到尾部,延长该订阅该事件的subs链表
sub->next_subs_queue = ret; // 接到链表尾部,延长该订阅该事件的subs链表
return ret;
}
// 时间名不同,在下一轮循环访问下一个结点
}
//遍历完,发现尚未注册事件,那么创建一个事件.此时node是publisher链表的最后一个结点
node->next_event_node=(Publisher_t*)malloc(sizeof(Publisher_t));
memset(node->next_event_node,0,sizeof(Publisher_t));
strcpy(node->next_event_node->event_name,name);
node->next_event_node->data_len=data_len;
//创建subscriber作为新事件的第一个订阅者
Subscriber_t* ret=(Subscriber_t*)malloc(sizeof(Subscriber_t));
memset(ret,0,sizeof(Subscriber_t));
ret->data_len=data_len;
for (size_t i = 0; i < QUEUE_SIZE; i++)
{ //给消息队列分配空间
ret->queue[i]=malloc(sizeof(data_len));
}
node->next_event_node->first_subs=ret;
// 遍历完,发现尚未注册事件(还没有发布者);那么创建一个事件,此时node是publisher链表的最后一个结点
node->next_event_node = (Publisher_t *)malloc(sizeof(Publisher_t));
memset(node->next_event_node, 0, sizeof(Publisher_t));
strcpy(node->next_event_node->event_name, name);
node->next_event_node->data_len = data_len;
// 同之前,创建subscriber作为新事件的第一个订阅者
Subscriber_t *ret = (Subscriber_t *)malloc(sizeof(Subscriber_t));
memset(ret, 0, sizeof(Subscriber_t));
ret->data_len = data_len;
for (size_t i = 0; i < QUEUE_SIZE; i++)
{ // 给消息队列分配空间
ret->queue[i] = malloc(sizeof(data_len));
}
// 新建的订阅者是该发布者的第一个订阅者,发布者会通过这个指针顺序访问所有订阅者
node->next_event_node->first_subs = ret;
return ret;
}
Publisher_t* PubRegister(char* name,uint8_t data_len)
Publisher_t *PubRegister(char *name, uint8_t data_len)
{
Publisher_t* node=&message_center;
while(node->next_event_node) //message_center会直接跳过,不需要特殊处理
CheckName(name);
Publisher_t *node = &message_center;
while (node->next_event_node) // message_center会直接跳过,不需要特殊处理,这被称作dumb_head(编程技巧)
{
node=node->next_event_node;
if(strcmp(node->event_name,name)==0)
node = node->next_event_node; // 切换到下一个发布者(事件)结点
if (strcmp(node->event_name, name) == 0) // 如果已经注册了相同的事件,直接返回结点指针
{
CheckLen(data_len,node->data_len);
return node;
}
} //遍历完发现尚未创建name对应的事件
node->next_event_node=(Publisher_t*)malloc(sizeof(Publisher_t));
memset(node->next_event_node,0,sizeof(Publisher_t));
node->next_event_node->data_len=data_len;
strcpy(node->next_event_node->event_name,name);
} // 遍历完发现尚未创建name对应的事件
// 在链表尾部创建新的事件并初始化
node->next_event_node = (Publisher_t *)malloc(sizeof(Publisher_t));
memset(node->next_event_node, 0, sizeof(Publisher_t));
node->next_event_node->data_len = data_len;
strcpy(node->next_event_node->event_name, name);
return node->next_event_node;
}
/* 如果队列为空,会返回0;成功获取数据,返回1 */
uint8_t SubGetMessage(Subscriber_t* sub,void* data_ptr)
/* 如果队列为空,会返回0;成功获取数据,返回1;后续可以做更多的修改,比如剩余消息数目等 */
uint8_t SubGetMessage(Subscriber_t *sub, void *data_ptr)
{
if(sub->temp_size==0)
if (sub->temp_size == 0)
{
return 0;
}
memcpy(data_ptr,sub->queue[sub->front_idx],sub->data_len);
sub->front_idx=(sub->front_idx++)%QUEUE_SIZE;
sub->temp_size--;
memcpy(data_ptr, sub->queue[sub->front_idx], sub->data_len);
sub->front_idx = (sub->front_idx++) % QUEUE_SIZE; // 队列头索引增加
sub->temp_size--; // pop一个数据,长度减1
return 1;
}
void PubPushMessage(Publisher_t* pub,void* data_ptr)
void PubPushMessage(Publisher_t *pub, void *data_ptr)
{
pub->iter=pub->first_subs;
while (pub->iter) //遍历订阅了当前事件的所有订阅者,依次填入最新消息
Subscriber_t *iter = pub->first_subs; // iter作为订阅者指针,遍历订阅该事件的所有订阅者;如果为空说明遍历结束
while (iter) // 遍历订阅了当前事件的所有订阅者,依次填入最新消息
{
if(pub->iter->temp_size==QUEUE_SIZE) //如果队列已满,则需要删除最老的数据(头部),再填入
{ //头索引增加,相当于移动到新一个位置上
pub->iter->front_idx=(pub->iter->front_idx+1)%QUEUE_SIZE;
pub->iter->temp_size--; //相当于出队,size-1
if (iter->temp_size == QUEUE_SIZE) // 如果队列已满,则需要删除最老的数据(头部),再填入
{ // 队列头索引前移动,相当于抛弃前一个位置,被抛弃的位置稍后会被写入新的数据
iter->front_idx = (iter->front_idx + 1) % QUEUE_SIZE;
iter->temp_size--; // 相当于出队,size-1
}
// 将Pub的数据复制到队列的尾部(最新)
memcpy(pub->iter->queue[pub->iter->back_idx],data_ptr,pub->data_len);
pub->iter->back_idx=(pub->iter->back_idx+1)%QUEUE_SIZE; //队列尾部前移
pub->iter->temp_size++; //入队,size+1
memcpy(iter->queue[iter->back_idx], data_ptr, pub->data_len);
iter->back_idx = (iter->back_idx + 1) % QUEUE_SIZE; // 队列尾部前移
iter->temp_size++; // 入队,size+1
pub->iter=pub->iter->next_subs_queue;
iter = iter->next_subs_queue; // 访问下一个订阅者
}
}

View File

@ -69,11 +69,10 @@ typedef struct mqt
typedef struct ent
{
/* 事件名称 */
char event_name[MAX_EVENT_NAME_LEN];
char event_name[MAX_EVENT_NAME_LEN+2];
uint8_t data_len;
/* 指向第一个订阅了该事件的订阅者,通过链表访问所有订阅者 */
Subscriber_t* first_subs;
Subscriber_t* iter;// 用于遍历所有订阅了该事件的订阅者的指针
/* 指向下一个Publisher的指针 */
struct ent* next_event_node;

View File

@ -0,0 +1,202 @@
# message_center
<p align='right'>neozng1@hnu.edu.cn</p>
> TODO:
>
> 1. 增加互斥锁或在读写数据的时候进入临界区,防止数据竞争
>
> 2. 降低代码冗杂度,将`PubRegister`和`SubRegister`的共同部分抽象出来编写新的函数
>
> 3. 支持自定义队列长度,使得订阅者可以自行确定需要的队列长度,适应不同的需求
>
> 4. ==**!!!重要!!!**==
>
> ==**Legacy support将在正式版发布的时候剔除第三方指针传递者的实现请迁移到新的链表实现。**==
## 总览和封装说明
**重要定义:**
- 发布者:发布消息的对象。发布者会将自己的消息推送给所有订阅了某个特定**事件**的订阅者。
- 订阅者:获取消息的对象。订阅者在订阅了某个事件之后,可以通过接口获得该事件的消息。
- 事件event / topic用于区分消息来源的对象。可以将一个事件看作一刊杂志不同的发布者会将文章汇集到杂志上而订阅者选择订阅一种杂志然后就可以获取所有写在杂志上的文章。
Message Center不同应用间进行消息传递的中介它的存在可以在相当大的程度上解耦不同的app使得不同的应用之间**不存在包含关系**,让代码的自由度更大,将不同模块之间的关系降为**松耦合**。在以往,如果一个.c文件中的数据需要被其他任务/源文件共享,那么其他模块应该要包含前者的头文件,或头文件中应当存在获取该模块数据的接口(即函数,一般是返回数据指针或直接返回数据);但现在,不同的应用之间完全隔离,他们不需要了解彼此的存在,而是只能看见一个**消息中心**。
需要被共享的消息,将会被**发布者**publisher发送到消息中心要获取消息则由**订阅者**subscriber从消息中心根据订阅的消息名事件获取。在这之前发布者要在消息中心完成**注册**,将自己要发布的消息类型和消息名称提交到消息中心;订阅者同样要先在消息中心完成订阅,将自己要接收的消息类型和消息名称提交到订阅中心。消息中心会根据**消息名称**,把订阅者绑定到发布相同名称的发布者上。
> 为了节省空间,数据结构上采用了链表+循环数组模拟队列的方式。C没有哈希表因此让发布者保存所有订阅者的地址实际上只保存首地址然后通过链表访问所有订阅者
Message Center对外提供了四个接口所有原本要进行信息交互的应用都应该包含`message_center.h`,并在初始化的时候进行注册。
## 代码结构
.h 文件中包含了外部接口和类型定义,.c中包含了各个接口的具体实现。
## 外部接口
**在代码实现上,事件实际上就是通过一个字符串体现的。**
```c
Subscriber_t* SubRegister(char* name,uint8_t data_len);
Publisher_t* PubRegister(char* name,uint8_t data_len);
uint8_t SubGetMessage(Subscriber_t* sub,void* data_ptr);
void PubPushMessage(Publisher_t* pub,void* data_ptr);
```
### 订阅者
订阅者应该保存一个订阅者类型的指针`Subscriber_t*`,在初始化的时候调用`SubRegister()`并传入要订阅的事件名和该事件对应消息的长度可以直接输入字符串示例如下将从event_name订阅float数据
```c
Subscriber_t* my_sub;
my_sub=SubRegister("event_name",sizeof(float));
```
订阅完毕后,在应用中通过`SubGetMessage()`获取消息,调用时传入订阅时获得的指针,以及要存放数据的指针。在使用的时候,建议使用强制类型转换将`data_ptr` cast成void*类型(好习惯)。
如果消息队列中有消息返回值为1否则返回值为0说明没有新的消息可用。
### 发布者
发布者应该保存一个发布者类型的指针,在初始化的时候传入要发布的事件名和该事件对应的消息长度。
完成注册后,通过`PubPushMessage()`发布新的消息。所有订阅了该事件的订阅者都会收到新的消息推送。
### 可修改的宏
```c
#define MAX_EVENT_NAME_LEN 32 //最大的事件名长度,每个事件都由字符串来命名
#define QUEUE_SIZE 1 //消息队列的长度
```
修改第一个可以扩大事件名长度,第二个确定消息队列的长度,数量越大可以保存的消息越多。
## 私有函数和定义
```c
static Publisher_t message_center = {
.event_name = "Message_Manager",
.first_subs = NULL,
.next_event_node = NULL};
static void CheckName(char* name)
{
if(strnlen(name,MAX_EVENT_NAME_LEN+1)>=MAX_EVENT_NAME_LEN)
{
while (1); // 进入这里说明事件名超出长度限制
}
}
```
`message_center`内部保存了指向第一个发布者的指针可以看作整个消息中心的抽象。通过这个变量可以访问所有发布者和订阅者。它将会在各个函数中作为dumb_head哑结点以简化逻辑这样不需要对链表头进行特殊处理。
`CheckName()`在发布者/订阅者注册的时候被调用,用于检查事件名是否超过长度限制。超长后会进入死循环,方便开发者检查。
> 四个外部接口的实现都有详细的注释,有兴趣的同学可以自行阅读。下方也提供了流程图。
## 注册、发布、获取消息流程
包含一个结构图和四个流程图。
### Message Center的结构![image-20221201150945052](../../assets/image-20221201150945052.png)
<center>建议打开原图查看</center>
**多个publisher可以绑定同一个事件往该事件推送消息。但一个subscriber只能订阅一个事件如果应用需要订阅多个事件则要创建对应数量的订阅者。**
> 对于电控程序目前的情况不存在多个publisher向同一个事件推送消息的情况。
**对于一个事件,其消息长度必须相同**。发布者和订阅者在注册时都会传入消息长度,用`sizof(your_data)`获取。应当保证不同的模块在进行交互式,使用相同的数据长度。
### 发布者和订阅者注册的流程
- **发布者:**
遍历发布者的事件结点,如果发现相同的事件,直接返回指针即可;遍历完成后发现尚未创建则创建新的事件。
<img src="../../assets/image-20221201152530558.png" alt="image-20221201152530558" style="zoom: 80%;" />
- **订阅者:**
需要注意,由于不同应用/模块的初始化顺序不同,可能出现订阅者先于发布者订阅某一消息的情况,所以要进行发布者链表的遍历,判断是否已经存在相同事件名的发布者,不存在则要先创建发布者结点再将新建订阅者结点并挂载到前者上。
<img src="../../assets/image-20221201152904044.png" alt="image-20221201152904044" style="zoom:80%;" />
### 推送/获取消息的流程
- **数组+头尾索引模拟队列**
<img src="../../assets/image-20221201155228196.png" alt="image-20221201155228196" style="zoom: 71%;" />
front指向队列头即最早入队的数据back指向队列尾即最新的数据。队列是first in first outFIFO先进先出的结构。back指向的位置是入队数据被写入的位置front指向的是读取时会出队的位置。当有数据入队back++出队则front++。若碰到数组边界,则返回数组头,可以通过取模实现:
```C
idx=(idx+1)%SIZE_OF_ARRAY; //SIZE_OF_ARRAY是数组大小
```
我们还需要一个变量用于保存当前队列的元素个数,如果在写入时,队列长度等于上限,应该先将最老的数据出队,再写入新的数据,即:
```c
back=(back+1)%SIZE_OF_ARRAY;
size--;
queue[front]=new_data;
front=(front+1)%SIZE_OF_ARRAY;
```
- **发布者推送消息到指定事件**
通过发布者指针,将订阅了该事件的所有订阅者遍历,将新数据入队。
- **订阅者获取消息**
从订阅者指针访问消息队列取出最先进入队列的数据。注意判断队列是否为空如果为空则返回0。
## 示例代码
```c
typedef struct
{
float a;
uint8_t b;
uint32_t c;
}good;
good g1;
good g2;
good pub_data={.a=1,.b=2,.c=3};
// 一个发布者,两个订阅者
Subscriber_t* s=SubRegister("test",sizeof(good));
Subscriber_t* ss=SubRegister("test",sizeof(good));
Publisher_t* p=PubRegister("test",sizeof(good));
// 推送消息
PubPushMessage(p,&pub_data);
pub_data.a=4;
pub_data.b=5;
pub_data.c=6;
// 推送新消息
PubPushMessage(p,&pub_data);
volatile uint8_t d= 0; // 确定收到的消息是否有效,可以根据d的值进一步处理
d=SubGetMessage(s,&g1);
d=SubGetMessage(s,&g1);
d=SubGetMessage(s,&g1); // 此时d等于0
d=SubGetMessage(ss,&g2);
```