网站首页

人工智能P2P分享搜索全网发布信息网站地图标签大全

当前位置:诺佳网 > 电子/半导体 > 嵌入式技术 >

消息队列应用于线程间通信的简单例子

时间:2023-05-12 10:12

人气:

作者:admin

标签: Linux  线程  消息队列  通信 

导读:在应用开发中,生产者,消费者的模型非常常见,一方产生数据并把数据放入队列中,而另一方从队列中取数据,先进先出。...

大家好,我是LinuxZn。

在应用开发中,生产者,消费者的模型非常常见,一方产生数据并把数据放入队列中,而另一方从队列中取数据,先进先出。

应用:线程间通信/进程间通信。Hello系列 | 多线程编程基础!

Linux系统中提供了两种不同接口的消息队列:

POSIX消息队列。POSIX为可移植的操作系统接口

System V消息队列。System V 是 AT&T 的第一个商业UNIX版本(UNIX System III)的加强。

其中,POSIX消息队列可移植性较强,使用较广。

Linux系统中提供的消息队列一般应用于进行间通信,但也可以用于线程间通信。

本文介绍POSIX消息队列应用于线程间通信。

头文件:

#include/*ForO_*constants*/
#include/*Formodeconstants*/
#include

编译链接需要加上 -lr 链接。

Linux内核提供了一系列函数来使用消息队列:

/**
*@brief创建消息队列实例
*
*Detailedfunctiondescription
*
*@param[in]name:消息队列名称
*@param[in] oflag:根据传入标识来创建或者打开一个已创建的消息队列
-O_CREAT:创建一个消息队列
-O_EXCL:检查消息队列是否存在,一般与O_CREAT一起使用
-O_CREAT|O_EXCL:消息队列不存在则创建,已存在返回NULL
-O_NONBLOCK:非阻塞模式打开,消息队列不存在返回NULL
-O_RDONLY:只读模式打开
-O_WRONLY:只写模式打开
-O_RDWR:读写模式打开
*@param[in] mode:访问权限
*@param[in] attr:消息队列属性地址
*
*@return成功返回消息队列描述符,失败返回-1,错误码存于error中
*/
mqd_tmq_open(constchar*name,intoflag,mode_tmode,structmq_attr*attr);

/**
*@brief无限阻塞方式接收消息
*
*Detailedfunctiondescription
*
*@param[in]mqdes:消息队列描述符
*@param[in] msg_ptr:消息体缓冲区地址
*@param[in] msg_len:消息体长度,长度必须大于等于消息属性设定的最大值
*@param[in] msg_prio:消息优先级
*
*@return成功返回消息长度,失败返回-1,错误码存于error中
*/
mqd_tmq_receive(mqd_tmqdes,char*msg_ptr,size_tmsg_len,unsigned*msg_prio);

/**
*@brief指定超时时间阻塞方式接收消息
*
*Detailedfunctiondescription
*
*@param[in]mqdes:消息队列描述符
*@param[in] msg_ptr:消息体缓冲区地址
*@param[in] msg_len:消息体长度,长度必须大于等于消息属性设定的最大值
*@param[in] msg_prio:消息优先级
*@param[in] abs_timeout:超时时间
*
*@return成功返回消息长度,失败返回-1,错误码存于error中
*/
mqd_tmq_timedreceive(mqd_tmqdes,char*msg_ptr,size_tmsg_len,unsigned*msg_prio,conststructtimespec*abs_timeout);

/**
*@brief无限阻塞方式发送消息
*
*Detailedfunctiondescription
*
*@param[in]mqdes:消息队列描述符
*@param[in] msg_ptr:待发送消息体缓冲区地址
*@param[in] msg_len:消息体长度
*@param[in] msg_prio:消息优先级
*
*@return成功返回0,失败返回-1
*/
mqd_tmq_send(mqd_tmqdes,constchar*msg_ptr,size_tmsg_len,unsignedmsg_prio);

/**
*@brief指定超时时间阻塞方式发送消息
*
*Detailedfunctiondescription
*
*@param[in]mqdes:消息队列描述符
*@param[in] msg_ptr:待发送消息体缓冲区地址
*@param[in] msg_len:消息体长度
*@param[in] msg_prio:消息优先级
*@param[in] abs_timeout:超时时间
*
*@return成功返回0,失败返回-1
*/
mqd_tmq_timedsend(mqd_tmqdes,constchar*msg_ptr,size_tmsg_len,unsignedmsg_prio,conststructtimespec*abs_timeout);

/**
*@brief关闭消息队列
*
*Detailedfunctiondescription
*
*@param[in]mqdes:消息队列描述符
*
*@return成功返回0,失败返回-1
*/
mqd_tmq_close(mqd_tmqdes);

/**
*@brief分离消息队列
*
*Detailedfunctiondescription
*
*@param[in]name:消息队列名称
*
*@return成功返回0,失败返回-1
*/
mqd_tmq_unlink(constchar*name);

例子:线程1不断给线程2发送字符串数据。

#include
#include
#include
#include
#include
#include/*ForO_*constants*/
#include/*Formodeconstants*/
#include

#defineMQ_MSG_MAX_SIZE512///< 最大消息长度 
#define MQ_MSG_MAX_ITEM    5  ///< 最大消息数目

static pthread_t s_thread1_id;
static pthread_t s_thread2_id;
static unsigned char s_thread1_running = 0;
static unsigned char s_thread2_running = 0;

static mqd_t s_mq;
static char send_msg[10] = "hello";

void *thread1_fun(void *arg)
{
    int ret = 0;

    s_thread1_running = 1;
    while (s_thread1_running)  
    {
  ret = mq_send(s_mq, send_msg, sizeof(send_msg), 0);
  if (ret < 0)
  {
         perror("mq_send error");
  }
        printf("send msg = %s
", send_msg);
        usleep(100 * 1000);
    }
    
    pthread_exit(NULL);
}

void *thread2_fun(void *arg)
{
 char  buf[MQ_MSG_MAX_SIZE];
 int recv_size = 0;

    s_thread2_running = 1;
    while (s_thread2_running)
    {
  recv_size = mq_receive(s_mq, &buf[0], sizeof(buf), NULL);
  if (-1 != recv_size)
  {
   printf("receive msg = %s
", buf);
  }
  else
  {
   perror("mq_receive error");
   break;
  }

        usleep(100 * 1000);
    }
    
    pthread_exit(NULL);
}

int main(void)
{
    int ret = 0;
    struct mq_attr attr;

    ///< 创建消息队列
    memset(&attr, 0, sizeof(attr));
    attr.mq_maxmsg = MQ_MSG_MAX_ITEM;
    attr.mq_msgsize = MQ_MSG_MAX_SIZE;
    attr.mq_flags = 0;
    s_mq = mq_open("/mq", O_CREAT|O_RDWR, 0777, &attr);
 if(-1 == s_mq)
    {
        perror("mq_open error");
        return -1;
    }

    ///< 创建线程1
    ret = pthread_create(&s_thread1_id, NULL, thread1_fun, NULL);
    if (ret != 0)
    {
        printf("thread1_create error!
");
        exit(EXIT_FAILURE);
    }
    ret = pthread_detach(s_thread1_id);
    if (ret != 0)
    {
        printf("s_thread1_id error!
");
        exit(EXIT_FAILURE);
    }

    ///< 创建线程2
    ret = pthread_create(&s_thread2_id, NULL, thread2_fun, NULL);
    if (ret != 0)
    {
        printf("thread2_create error!
");
        exit(EXIT_FAILURE);
    }
    ret = pthread_detach(s_thread2_id);
    if (ret != 0)
    {
        printf("s_thread2_id error!
");
        exit(EXIT_FAILURE);
    }

    while (1)
    {
        sleep(1);
    }

    return 0;
}

编译、运行:

09a74950-efe5-11ed-90ce-dac502259ad0.png

以上就是本次的分享,如果文章有帮助,麻烦帮忙转发,谢谢!

审核编辑:汤梓红

温馨提示:以上内容整理于网络,仅供参考,如果对您有帮助,留下您的阅读感言吧!
相关阅读
本类排行
相关标签
本类推荐

CPU | 内存 | 硬盘 | 显卡 | 显示器 | 主板 | 电源 | 键鼠 | 网站地图

Copyright © 2025-2035 诺佳网 版权所有 备案号:赣ICP备2025066733号
本站资料均来源互联网收集整理,作品版权归作者所有,如果侵犯了您的版权,请跟我们联系。

关注微信