engineering/modules/message_center/message_center.md

8.3 KiB
Raw Permalink Blame History

message_center

neozng1@hnu.edu.cn

TODO:

支持自定义队列长度,使得订阅者可以自行确定需要的队列长度,适应不同的需求

总览和封装说明

重要定义:

  • 发布者:发布消息的对象。发布者会将自己的消息推送给所有订阅了某个特定话题的订阅者。
  • 订阅者:获取消息的对象。订阅者在订阅了某个话题之后,可以通过接口获得该话题的消息。
  • 话题topic用于区分消息来源的对象。可以将一个话题看作一刊杂志不同的发布者会将文章汇集到杂志上而订阅者选择订阅一种杂志然后就可以获取所有写在杂志上的文章。

Message Center不同应用间进行消息传递的中介它的存在可以在相当大的程度上解耦不同的app使得不同的应用之间不存在包含关系,让代码的自由度更大,将不同模块之间的关系降为松耦合。在以往,如果一个.c文件中的数据需要被其他任务/源文件共享,那么其他模块应该要包含前者的头文件,且头文件中应当存在获取该模块数据的接口(即函数,一般是返回数据指针或直接返回数据,强烈建议不要使用全局变量);但现在,不同的应用之间完全隔离,他们不需要了解彼此的存在,而是只能看见一个消息中心以及一些话题

需要被共享的消息,将会被发布者publisher发送到消息中心要获取消息则由订阅者subscriber从消息中心根据订阅的话题获取。在这之前发布者要在消息中心完成注册,将自己要发布的消息类型和话题名称提交到消息中心;订阅者同样要先在消息中心完成订阅,将自己要接收的消息类型和话题名称提交到订阅中心。消息中心会根据话题名称,把订阅者绑定到发布相同名称的发布者上。

为了节省空间,数据结构上采用了链表+循环数组模拟队列的方式。C没有哈希表因此让发布者保存所有订阅者的地址实际上只保存首地址然后通过链表访问所有订阅者

Message Center对外提供了四个接口所有原本要进行信息交互的应用都应该包含message_center.h,并在初始化的时候进行注册。

代码结构

.h 文件中包含了外部接口和类型定义,.c中包含了各个接口的具体实现。

外部接口

在代码实现上,话题名实际上就是通过一个字符串体现的。

Subscriber_t* SubRegister(char* name,uint8_t data_len);

Publisher_t* PubRegister(char* name,uint8_t data_len);

uint8_t SubGetMessage(Subscriber_t* sub,void* data_ptr);

void PubPushMessage(Publisher_t* pub,void* data_ptr);

订阅者

订阅者应该保存一个订阅者类型的指针Subscriber_t*,在初始化的时候调用SubRegister()并传入要订阅的话题名和该话题对应消息的长度可以直接输入字符串示例如下将从event_name订阅float数据

Subscriber_t* my_sub;
my_sub=SubRegister("event_name",sizeof(float));

订阅完毕后,在应用中通过SubGetMessage()获取消息,调用时传入订阅时获得的指针,以及要存放数据的指针。在使用的时候,建议使用强制类型转换将data_ptr cast成void*类型(好习惯)。

如果消息队列中有消息返回值为1否则返回值为0说明没有新的消息可用。

发布者

发布者应该保存一个发布者类型的指针,在初始化的时候传入要发布的话题名和该话题对应的消息长度。

完成注册后,通过PubPushMessage()发布新的消息。所有订阅了该话题的订阅者都会收到新的消息推送。

可修改的宏

#define MAX_EVENT_NAME_LEN  32  //最大的话题名长度,每个话题都由字符串来命名
#define QUEUE_SIZE 1            //消息队列的长度

修改第一个可以扩大话题名长度,第二个确定消息队列的长度,数量越大可以保存的消息越多。

私有函数和定义

static Publisher_t message_center = {
    .event_name = "Message_Manager",
    .first_subs = NULL,
    .next_event_node = NULL};

static void CheckName(char* name)
{
    if(strnlen(name,MAX_EVENT_NAME_LEN+1)>=MAX_EVENT_NAME_LEN)
    {
        while (1); // 进入这里说明话题名超出长度限制
    }
}

message_center内部保存了指向第一个发布者的指针可以看作整个消息中心的抽象。通过这个变量可以访问所有发布者和订阅者。它将会在各个函数中作为dumb_head哑结点以简化逻辑这样不需要对链表头进行特殊处理。

CheckName()在发布者/订阅者注册的时候被调用,用于检查话题名是否超过长度限制。超长后会进入死循环,方便开发者检查。

四个外部接口的实现都有详细的注释,有兴趣的同学可以自行阅读。下方也提供了流程图。

注册、发布、获取消息流程

包含一个结构图和四个流程图。

Message Center的结构image-20221201150945052

建议打开原图查看

多个publisher可以绑定同一个话题往该话题推送消息。但一个subscriber只能订阅一个话题如果应用需要订阅多个话题则要创建对应数量的订阅者。

对于电控程序目前的情况不存在多个publisher向同一个话题推送消息的情况。

对于相同话题,其消息长度必须相同。发布者和订阅者在注册时都会传入消息长度,用sizof(your_data)获取。应当保证不同的模块在进行交互式,使用相同的数据长度。

发布者和订阅者注册的流程

  • 发布者:

    遍历发布者的话题结点,如果发现相同的话题,直接返回指针即可;遍历完成后发现尚未创建则创建新的话题。

image-20221201152530558
  • 订阅者:

    需要注意,由于不同应用/模块的初始化顺序不同,可能出现订阅者先于发布者订阅某一消息的情况,所以要进行发布者链表的遍历,判断是否已经存在相同话题名的发布者,不存在则要先创建发布者结点再将新建订阅者结点并挂载到前者上。

image-20221201152904044

推送/获取消息的流程

  • 数组+头尾索引模拟队列
image-20221201155228196

front指向队列头即最早入队的数据back指向队列尾即最新的数据。队列是first in first outFIFO先进先出的结构。back指向的位置是入队数据被写入的位置front指向的是读取时会出队的位置。当有数据入队back++出队则front++。若碰到数组边界,则返回数组头,可以通过取模实现:

idx=(idx+1)%SIZE_OF_ARRAY; //SIZE_OF_ARRAY是数组大小

我们还需要一个变量用于保存当前队列的元素个数,如果在写入时,队列长度等于上限,应该先将最老的数据出队,再写入新的数据,即:

back=(back+1)%SIZE_OF_ARRAY;
size--;
queue[front]=new_data;
front=(front+1)%SIZE_OF_ARRAY;
  • 发布者推送消息到指定话题

通过发布者指针,将订阅了该话题的所有订阅者遍历,将新数据入队。

  • 订阅者获取消息

从订阅者指针访问消息队列取出最先进入队列的数据。注意判断队列是否为空如果为空则返回0。

示例代码

typedef struct 
{
    float a;
    uint8_t b;
    uint32_t c;
}good;

good g1;
good g2;
good pub_data={.a=1,.b=2,.c=3};
// 一个发布者,两个订阅者
Subscriber_t* s=SubRegister("test",sizeof(good));
Subscriber_t* ss=SubRegister("test",sizeof(good));
Publisher_t* p=PubRegister("test",sizeof(good));
// 推送消息
PubPushMessage(p,&pub_data);
pub_data.a=4;
pub_data.b=5;
pub_data.c=6;
// 推送新消息
PubPushMessage(p,&pub_data);
volatile uint8_t d= 0; // 确定收到的消息是否有效,可以根据d的值进一步处理
d=SubGetMessage(s,&g1);
d=SubGetMessage(s,&g1);
d=SubGetMessage(s,&g1); // 此时d等于0
d=SubGetMessage(ss,&g2);