linux lib nanomsg


原文链接: linux lib nanomsg

海思移植nanomsg

  1. 修改 CMakeLists.txt 增加海思支持
    ```makefile

    set cross-compiled system type, it's better not use the type which cmake cannot recognized.

    SET ( CMAKE_SYSTEM_NAME Linux )
    SET ( CMAKE_SYSTEM_PROCESSOR arm )

    when hislicon SDK was installed, toolchain was installed in the path as below:

    SET ( CMAKE_C_COMPILER /opt/hisi-linux/x86-arm/arm-himix200-linux/bin/arm-himix200-linux-gcc )

    SET ( CMAKE_CXX_COMPILER /opt/hisi-linux/x86-arm/arm-himix200-linux/bin/arm-himix200-linux-g++ )

SET ( CMAKE_C_COMPILER arm-none-linux-gnueabi-gcc)
SET ( CMAKE_CXX_COMPILER arm-none-linux-gnueabi-g++ )

set searching rules for cross-compiler

SET ( CMAKE_FIND_ROOT_PATH_MODE_PROGRAM NEVER )
SET ( CMAKE_FIND_ROOT_PATH_MODE_LIBRARY ONLY )
SET ( CMAKE_FIND_ROOT_PATH_MODE_INCLUDE ONLY )

set ${CMAKE_C_FLAGS} and ${CMAKE_CXX_FLAGS}flag for cross-compiled process

SET ( CROSS_COMPILATION_ARM himix200 )
SET ( CROSS_COMPILATION_ARCHITECTURE armv7-a )

set g++ param

SET ( CMAKE_CXX_FLAGS "-std=c++11 -march=armv7-a -mfloat-abi=softfp -mfpu=neon-vfpv4 -fopenmp ${CMAKE_CXX_FLAGS}" )

add_definitions(-D__ARM_NEON)

2.  开启 NN_STATIC_LIB 静态编译 
`camke .. -DNN_STATIC_LIB=ON `

`option (NN_STATIC_LIB "Build static library instead of shared library." ON)`

3. 修改库文件的安装路径 执行 `make install`
`SET(CMAKE_INSTALL_PREFIX ../nanomsg)`

4. 卸载清除安装文件 `cat install_manifest.txt | sudo xargs rm`

### 修改 nanomsg 的安装目录
1. 修改camke 查找路径, 到nanomsg的安装目录
vi demo/CMakeLists.txt

set(CMAKE_PREFIX_PATH ${CMAKE_PREFIX_PATH} "/Volumes/linux/c/nanomsg/nanomsg")

## 解决 undefined reference to `getaddrinfo_a`
libnanomsg.a(dns.c.o): in function `nn_dns_start': 
dns.c:(.text+0x598): undefined reference to `getaddrinfo_a

修改 CMakeLists.txt 找到 NN_ENABLE_GETADDRINFO_A 选项 配置为OFF 
option (NN_ENABLE_GETADDRINFO_A "Enable/disable use of getaddrinfo_a in place of getaddrinfo." OFF)

重新执行 cmake .. && make 生成


### nanomsg 配置
```cpp
  nn_setsockopt(sock, NN_SUB, NN_SUB_SUBSCRIBE,  "", 0);

// 设置写入超时
int timeout = 2000;
nn_setsockopt (sock, NN_SOL_SOCKET, NN_SNDTIMEO, &timeout, sizeof (timeout));

// 设置读取超时
  int timeout = 100;
  nn_setsockopt(sock, NN_SOL_SOCKET, NN_RCVTIMEO, &timeout , sizeof(timeout));

  int b = 1;
  nn_setsockopt(sock, NN_TCP, NN_TCP_NODELAY, &b, sizeof(b));

nanocat 使用

服务端
while true; do nanocat --pub --bind-ipc "/tmp/testingZone" --data "${s}" --delay 1; done;

while true; do nanocat --sub --connect-ipc "/tmp/testingZone" --format msgpack ; done

nanomsg/mangos: mangos is a pure Golang implementation of nanomsg's "Scalablilty Protocols"

NNG/nanomsg 是最近项目上使用到的一个通信库, 用来实现进程间过程调用和线程间通信, 很是方便。

NNG 是 nanomsg 的继任版本, 而 nanomsg 则是流行的 ZMQ 的 C 重写版。

NNG 将通信使用的协议和传输分离, 同一个协议可以工作在不同的传输层上, 类似与 TCP/IP 的应用层和传输层的分层, 同时接口上屏蔽了底层细节, 统一用字符串 URL 来描述传输模式。这样当使用场景修改时, 可以通过简单修改 URL 来实现适应, 极具灵活性。

同时如 NNG 描述所言 “light-weight brokerless messaging”, NNG 中的通信各方是不需要第三方程序介入的, 这与 MQTT/Redis 通信需要服务器不同。这样很适合作为通信库来使用而没有其他依赖。

NNG 支持的通信协议主要有以下几种:

                        client --- server

NN_PAIR pair 一对一双向通信。 one-to-one
NN_PUSH,NN_PULL push ----> pull (pipeline) 单向通信, 类似与生产者消费者模型的消息队列 many-to-many 用于日志收集
NN_PUB,NN_SUB sub <---- pub 单向广播。 many-to-many 用于广播
NN_REP,NN_REQ req <---> rep (rpc) 请求-响应模式, 类似与 rpc 模式。
NN_BUS bus 网状连接通信, 每个加入节点都可以发送/接受广播消息。 many-to-many
NN_SURVEYOR, NN_RESPONDENT survey 用于多节点表决或者服务发现。

PAIR - simple one-to-one communication
BUS - simple many-to-many communication
REQREP - allows to build clusters of stateless services to process user requests
PUBSUB - distributes messages to large sets of interested subscribers
PIPELINE - aggregates messages from multiple sources and load balances them among many destinations
SURVEY - allows to query state of multiple applications in a single go

NNG 支持的传输模式主要有以下三种常用, 其他还有tcp附加tls 1.2加密的tls传输和基于WebSocket的ws传输:

INPROC - transport within a process (between threads, modules etc.) 线程间通信
IPC - transport between processes on a single machine   进程间通信
TCP - network transport via TCP  网络TCP
WS - websockets over TCP   websockets TCP

通信协议里除了 PAIR 之外, 基本都是一对多的通信模式, 这点需要注意, 以 PIPELINE 和 PUB/SUB 为例:

  1. PIPELINE 的 PUSH 端是 client, 一个 PUSH 可以连接多个 PULL 端, 发送数据时会选择其中一个可用的发送;PULL 端是 server, 一个 PULL 可以接收多个 PUSH 连接和数据。
  2. PUB/SUB 的 SUB 端是 client, 一个 SUB 可以连接多个不同的 PUB 端, 接收多个 PUB 端广播的数据;PUB 端是 server, 一个 PUB 可以接收多个 SUB 连接并广播数据。
    基于以上, 多个程序是没办法共用一个 PUB/SUB 通道来广播数据的, 这与 ROS 里的 topic 和 LCM 中的 channel 模式不同。如果要实现类似功能, 则可以使用 PIPELINE + PUB/SUB 来处理:

独立一个话题发布的程序, 拥有一个 PULL 和 PUB。
PULL 约定一个 URL, 所有需要发布该话题的程序都 PUSH 数据到该 URL 上。
PUB 约定一个 URL, 所有需要获取该话题的程序都 SUB 到该 URL 上。
程序内部循环将 PULL 读取的数据发送到 PUB 上。
以上则可以模拟出 ROS topic 数据合并 或者 LCM 中 channel 的类似功能。

整体上看, NNG 的 API 很简约, 主要是 4 个, open/recv/send/close, open 根据协议不同使用的函数会不同。配置则是 setopt/getopt, 与 UNIX API 类似。API 中没有上下文环境(context-less)依赖, 只需要一个 nng_socket, 这种设计和实现方法值得去学习一下(初步揣测应该是使用指针值作为handle, 如果要强制编译器做类型检测, 则会套上一层 struct, 如 typedef struct { _nng_xxx_socket * p } nng_socket;)。

NNG 协议基本上囊括了常见的通信需求, 一些特殊的需求, 也可以通过组合协议来实现, 比如上面的模拟 ROS topic 或者 LCM channel 的方法。这样一来, 如果在程序中使用 NNG, 不管是多进程, 还是多线程, 通过设计, 可以进一步增强模块化, 同时不乏灵活性。如果环境变化, 程序不管是由多进程改成多线程, 还是由多线程改成多主机, 都很容易实现。

常见模块/进程/线程间通信, 可以依据具体需求来使用 PIPELINE(消息队列) 还是 REQ/REP(过程调用), 而不是锁+全局变量, 每个模块单元只需要做单一相关的具体事务, 无需知晓全局状态。
nanomsg的前身是zeromsg, zeromq估计很多人都见过, 是一个消息队列, 而nanomq的模式很多和zeromq是类似的, 我们这里就简单的解析一下(如果和官方不相同, 请以官方为准)。

1.One-to-one protocol

一对一协议, 这个就是字面意思, 只能一对一通信, 为通讯双向。关键字:NN_PAIR。

2.Request/reply protocol

请求/回复协议, 由请求端发起请求, 然后等待回应端应答, 一般是一个REP多个REQ;关键字:NN_REP,NN_REQ。

3.Publish/subscribe protocol

发布订阅协议, 将消息广播到多个目的地, 消息从NN_PUB发送, 并且只会由订阅了匹配主题的NN_SUB接收;这种模式只会发布给在线的订阅端, 如果发布端开始发布消息时, 订阅端尚未连接, 则这些消息会被直接丢弃;同时订阅端只负责接收不能反馈;关键字:NN_PUB, NN_SUB。

4.Survey protocol

调查协议, 允许向多个地点广播调查并收集响应, 关键字:NN_SURVEYOR, NN_RESPONDENT。

5.Pipeline protocol

通过一系列步骤传递任务的的协议, 这个协议是可扩展的(官方原文:scalability protocol for passing tasks through a series of processing steps.);它可以公平的对来自先前的处理步骤的消息进行排队, 并在下一个处理步骤的实例中对它进行负载均衡;关键字, NN_PUSH, NN_PULL。

6.Message bus protocol

消息总线(message bus), 将消息从任何节点广播到拓扑中的所有其他节点, 自身不会收到自己发出去的消息;这种模式只能缩放到本地级别(单个机器或者单个局域网), 如果尝试进一步扩展可能会导致单个节点消息过载;关键字:NN_BUS。

  这次主要是使用nanomsg库实现多线程之间的通信, 在我们复杂的多线程编程中可能各个线程需要共用一些信息, 平常加互斥锁等等, 有时候也是相当麻烦, 这里是使用nanomsg库实现多线程的通信, 主要这次是一对一线程双向通信和单向通信的demo。

NN_PAIR 多线程一对一双向通信demo

 #include <stdio.h>
#include <pthread.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <nanomsg/pair.h>
#include <nanomsg/bus.h>
#include <nanomsg/nn.h>
 
/*
PAIR - simple one-to-one communication
BUS - simple many-to-many communication
REQREP - allows to build clusters of stateless services to process user requests
PUBSUB - distributes messages to large sets of interested subscribers
PIPELINE - aggregates messages from multiple sources and load balances them among many destinations
SURVEY - allows to query state of multiple applications in a single go
*/
/*
INPROC - transport within a process (between threads, modules etc.) 线程
IPC - transport between processes on a single machine   进程
TCP - network transport via TCP  网络TCP
WS - websockets over TCP   websockets TCP
*/
 
//ipc:// 标识用于多进程通信,后面的sky_test是我自己命名, 可以随意命名
char *url = "ipc://sky_test";
 
int server_sock_init(int *sock)
{
  *sock = nn_socket (AF_SP, NN_PAIR);
  if (*sock < 0) {
    printf("create server sock failed\r\n");
    return 1;
  }
  if (nn_bind(*sock, url) < 0) {
    printf("bind server sock failed\r\n");
    return 1;
  }
  printf("server socket init success...\r\n");
  return 0;
}
 
int client_sock_init(int *sock)
{
  *sock = nn_socket (AF_SP, NN_PAIR);
  if (*sock < 0) {
    printf("create server sock failed\r\n");
    return 1;
  }
  if (nn_connect(*sock, url) < 0) {
    printf("bind server sock failed\r\n");
    return 1;
  }
  printf("client socket init success...\r\n");
  return 0;
}
 
void child_process_test()
{
  int c_sock;
  char *tx_msg = "Hello Main Process";
  if (0 != client_sock_init(&c_sock)) {
    return;
  }
  while (1) {
    //发送信息到主进程客户端
    while (1) {
      size_t len = strlen (tx_msg) + 1;
      if (nn_send(c_sock, tx_msg, len, 0) < 0) {
        printf("Thread Send Msg Failed\r\n");
        usleep(500000);
        continue;
      }
      break;
    }
    while (1) {
      //接收主进程客户端信息
      char *rx_msg = NULL;
      int result = nn_recv(c_sock, &rx_msg, NN_MSG,NN_DONTWAIT);
      if (result > 0) {
        printf("Child Process Recieve: %s\r\n", rx_msg);
        nn_freemsg (rx_msg);
        break;
      }
      usleep(200000);
    }
  }
}
 
void main_process_test()
{
  int s_sock;
  char *tx_msg = "Hi Child Process Test";
  if (0 != server_sock_init(&s_sock)) {
    return;
  }
  sleep(1);
  while (1) {
    while (1) {
      //接收子进程客户端信息
      char *rx_msg = NULL;
      int result = nn_recv(s_sock, &rx_msg, NN_MSG,NN_DONTWAIT);
      if (result > 0) {
        printf("Main Recieve: %s\r\n", rx_msg);
        nn_freemsg (rx_msg);
        break;
      }
      usleep(200000);
    }
    //发送信息到子进程客户端
    while (1) {
      size_t len = strlen (tx_msg) + 1;
      if (nn_send(s_sock, tx_msg, len, 0) < 0) {
        printf("Main Send Msg Failed\r\n");
        usleep(500000);
        continue;
      }
      break;
    }
  }
}
 
int main()
{
  pid_t fpid;
  fpid = fork();
  if (fpid < 0) {
    printf("fork failed\r\n");
    return 1;
  } else if (fpid == 0) {
    //子进程
    child_process_test();
  } else {
    //主进程
    main_process_test();
  }
  return 0;
}

编译:
gcc -o nanomsg_pair nanomsg_pair.c -lnanomsg -lpthread
运行结果:
sky@ubuntu:~/Study/nanomsg/code_test/inproc/pair$ ./nanomsg_pair
server socket init success...
client socket init success...
Main Recieve: Hello Main Thread
Thread Recieve: Hi Thread Test
Main Recieve: Hello Main Thread
Thread Recieve: Hi Thread Test
Main Recieve: Hello Main Thread
Thread Recieve: Hi Thread Test
Main Recieve: Hello Main Thread
Thread Recieve: Hi Thread Test
Main Recieve: Hello Main Thread
Thread Recieve: Hi Thread Test
Main Recieve: Hello Main Thread
Thread Recieve: Hi Thread Test
Main Recieve: Hello Main Thread
.....

NN_PUSH 线程一对一单向通信(类似管道)demo

#include <stdio.h>
#include <pthread.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <nanomsg/pipeline.h>
#include <nanomsg/nn.h>
 
/*
  此程序为nanomsg多线程一对一单向通信demo。
*/
 
//inproc 标识用于多线程通信
char *url = "inproc://sky_test";
 
//发送数据的socket初始化
int send_sock_init(int *sock)
{
  *sock = nn_socket (AF_SP, NN_PUSH);
  if (*sock < 0) {
    printf("create send data sock failed\r\n");
    return 1;
  }
  if (nn_bind(*sock, url) < 0) {
    printf("bind send data sock failed\r\n");
    return 1;
  }
  printf("send data socket init success...\r\n");
  return 0;
}
 
//接收数据的socket初始化
int recieve_sock_init(int *sock)
{
  *sock = nn_socket (AF_SP, NN_PULL);
  if (*sock < 0) {
    printf("create recieve data sock failed\r\n");
    return 1;
  }
  if (nn_connect(*sock, url) < 0) {
    printf("connect recieve data sock failed\r\n");
    return 1;
  }
  printf("recieve data socket init success...\r\n");
  return 0;
}
 
//线程测试
void *thread_test(void *arg)
{
  int c_sock;
  if (0 != recieve_sock_init(&c_sock)) {
    return;
  }
  while (1) {
    //轮询接收信息
    char *rx_msg = NULL;
    int result = nn_recv(c_sock, &rx_msg, NN_MSG, NN_DONTWAIT);
    if (result > 0) {
      printf("Thread Recieve: %s\r\n", rx_msg);
      nn_freemsg (rx_msg);
    }
    sleep(1);
  }
}
 
int main()
{
  int s_sock;
  pthread_t ps;
  char *tx_msg = "Hello thread test";
  if (0 != send_sock_init(&s_sock)) {
    return 1;
  }
  pthread_create(&ps, NULL, thread_test, NULL);
  sleep(1);
  //间隔两秒, 发送信息到子线程接收数据端
  while (1) {
    size_t len = strlen (tx_msg) + 1;
    if (nn_send(s_sock, tx_msg, len, 0) < 0) {
      printf("Main Send Msg Failed\r\n");
      usleep(500000);
      continue;
    }
    sleep(2);
  }
  return 0;
}

编译:
gcc -o nanomsg_pipe nanomsg_pipe.c -lnanomsg -lpthread

运行结果:
sky@ubuntu:~/Study/nanomsg/code_test/inproc/pipeline$ ./nanomsg_pipe
send data socket init success...
recieve data socket init success...
Thread Recieve: Hello thread test
Thread Recieve: Hello thread test
Thread Recieve: Hello thread test
Thread Recieve: Hello thread test
Thread Recieve: Hello thread test
Thread Recieve: Hello thread test
Thread Recieve: Hello thread test
...
 总结:
      使用nanomsg实现线程间通信, 对于多线程编程来说是相当方便了, nanomsg库又很轻量级, 对于嵌入式程序编程真的非常好用。
————————————————
版权声明:本文为CSDN博主「DancerSky」的原创文章, 遵循CC 4.0 BY-SA版权协议, 转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/Dancer
Sky/article/details/83538565

#include "common.h"
#include <nanomsg/pair.h>
 
#define NODE0 "node0"
#define NODE1 "node1"
#define SOCKET_ADDR "ipc:///tmp/pair.ipc"
 
int send_name(int sock, const char *name)
{
        printf("%s: SENDING \"%s\"\n", name, name);
        int sz_n = strlen(name) + 1;
        return nn_send(sock, name, sz_n, 0);
}
 
int recv_name(int sock, const char *name)
{
        char *buf = NULL;
        int result = nn_recv(sock, &buf, NN_MSG, 0);
        if (result > 0)
        {
                printf("%s: RECEIVED \"%s\"\n", name, buf);
                nn_freemsg(buf);
        }
        return result;
}
 
int send_recv(int sock, const char *name)
{
        int to = 100;
        assert(nn_setsockopt (sock, NN_SOL_SOCKET, NN_RCVTIMEO, &to, sizeof (to)) >= 0);
        while(1)
        {
                recv_name(sock, name);
                sleep(1);
                send_name(sock, name);
        }
}
 
int node0(const char *url)
{
        int sock = nn_socket(AF_SP, NN_PAIR);
        assert(sock >= 0);
        assert(nn_bind (sock, url) >= 0);
        send_recv(sock, NODE0);
        return nn_shutdown (sock, 0);
}
 
int node1(const char *url)
{
        int sock = nn_socket(AF_SP, NN_PAIR);
        assert(sock >= 0);
        assert(nn_connect(sock, url) >= 0);
        send_recv(sock, NODE1);
        return nn_shutdown (sock, 0);
}
 
int main(int argc, char **argv)
{
        if (argc == 2 && strncmp(NODE0, argv[1], strlen(NODE0)) == 0) {
                return node0(SOCKET_ADDR);
        } else if (argc == 2 && strncmp (NODE1, argv[1], strlen (NODE1)) == 0) {
                return node1(SOCKET_ADDR);
        } else {
                fprintf (stderr, "Usage: pair %s|%s <ARG> ...\n", NODE0, NODE1);
                return 1;
        }
}

Pub Sub一对多主题订阅通信Demo

#   这次是nanomsg库实现的一个类似于MQTT通信的一种方式, 广播订阅的一个一对多的通信方式。一个主的广播消息, 其他可以订阅自己想要的主题信息, 然后就会只接收订阅的主题的信息。

PubSub一对多主题订阅通信Demo
#include <stdio.h>
#include <pthread.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <nanomsg/pubsub.h>
#include <nanomsg/nn.h>
 
/*
  此程序为nanomsg多线程一对多单向通信demo,类似MQTT通信, 一个广播, 其他为订阅相应主题
  客户端只接收到自己订阅的对应主题的内容。
*/
 
//inproc 标识用于多线程通信
char *url = "inproc://sky_test";
 
//发送数据的socket初始化
int send_sock_init(int *sock)
{
  *sock = nn_socket (AF_SP, NN_PUB);
  if (*sock < 0) {
    printf("create send data sock failed\r\n");
    return 1;
  }
  if (nn_bind(*sock, url) < 0) {
    printf("bind send data sock failed\r\n");
    return 1;
  }
  printf("send data socket init success...\r\n");
  return 0;
}
 
//接收数据的socket初始化
int recieve_sock_init(int *sock, char *topic)
{
  *sock = nn_socket (AF_SP, NN_SUB);
  if (*sock < 0) {
    printf("create recieve data sock failed\r\n");
    return 1;
  }
  if (NULL == topic) {
    //设置订阅主题为全部
    nn_setsockopt(*sock, NN_SUB, NN_SUB_SUBSCRIBE, "", 0);
  } else {
    //设置订阅的主题及主题长度, 主要是对比发送内容的开始字节
    //如设置主题为"sky:",那么就会对比信息前面字节是否相同, 相同则可以收到该信息
    nn_setsockopt(*sock, NN_SUB, NN_SUB_SUBSCRIBE, topic, strlen(topic));
  }
  if (nn_connect(*sock, url) < 0) {
    printf("connect recieve data sock failed\r\n");
    return 1;
  }
  printf("recieve data socket init success...\r\n");
  return 0;
}
 
//线程1测试
void *thread_test(void *arg)
{
  int c_sock;
  if (0 != recieve_sock_init(&c_sock, "sky:")) {
    return;
  }
  while (1) {
    //轮询接收订阅主题"sky:"信息
    char *rx_msg = NULL;
    int result = nn_recv(c_sock, &rx_msg, NN_MSG, NN_DONTWAIT);
    if (result > 0) {
      printf("Thread 1 Recieve: %s\r\n\r\n", rx_msg);
      nn_freemsg (rx_msg);
    }
    sleep(1);
  }
}
 
//线程2测试
void *thread_test2(void *arg)
{
  int c_sock;
  if (0 != recieve_sock_init(&c_sock, "born:")) {
    return;
  }
  while (1) {
    //轮询接收订阅主题"born:"信息
    char *rx_msg = NULL;
    int result = nn_recv(c_sock, &rx_msg, NN_MSG, NN_DONTWAIT);
    if (result > 0) {
      printf("Thread 2 Recieve: %s\r\n\r\n", rx_msg);
      nn_freemsg (rx_msg);
    }
    sleep(1);
  }
}
 
//发送数据
int send_data(int sock, char *data)
{
  if (data == NULL) {
    return 1;
  }
  if (nn_send(sock, data, strlen(data)+1, 0) < 0) {
    return 1;
  }
  printf("Main Server Send:%s\r\n\r\n", data);
  return 0;
}
 
int main()
{
  int s_sock, ret, i = 0;
  pthread_t ps, ps2;
  char *tx_msg = "sky:Hello Thread Sky";
  char *tx_msg1 = "born:Hello Thread Born";
  char *tx_msg2 = "Storm:Hello Thread Storm";
  if (0 != send_sock_init(&s_sock)) {
    return 1;
  }
  //创建子线程, 接收信息
  pthread_create(&ps, NULL, thread_test, NULL);
  pthread_create(&ps2, NULL, thread_test2, NULL);
  sleep(1);
  //间隔两秒, 发送信息到子线程接收数据端
  while (1) {
    //测试发送广播
    if (0 == i) {
      ret = send_data(s_sock, tx_msg);
      if (0 == ret) {
        i ++;
      }
    } else if (1 == i) {
      ret = send_data(s_sock, tx_msg1);
      if (0 == ret) {
        i ++;
      }
    } else if (2 == i) {
      ret = send_data(s_sock, tx_msg2);
      if (0 == ret) {
        i = 0;
      }
    }
    sleep(2);
  }
  return 0;
}

编译
gcc -o nanomsg_pubsub nanomsg_pubsub.c -lnanomsg -lpthread
运行结果:
sky@ubuntu:~/Study/nanomsg/code_test/inproc/pubsub$ ./nanomsg_pubsub
send data socket init success...
recieve data socket init success...
recieve data socket init success...
Main Server Send:sky:Hello Thread Sky

Thread 1 Recieve: sky:Hello Thread Sky

Main Server Send:born:Hello Thread Born

Thread 2 Recieve: born:Hello Thread Born

Main Server Send:Storm:Hello Thread Storm

Main Server Send:sky:Hello Thread Sky

Thread 1 Recieve: sky:Hello Thread Sky
...

根据结果可以看到, 线程1, 2分别订阅了sky born主题, 所以可以收到, 而storm主题没订阅所以都没有收到。
————————————————
版权声明:本文为CSDN博主「DancerSky」的原创文章, 遵循CC 4.0 BY-SA版权协议, 转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/Dancer
Sky/article/details/83539077

3. [文件] reqrep.c

#include "common.h"
#include <nanomsg/reqrep.h>
 
#define NODE0 "node0"
#define NODE1 "node1"
#define DATE "DATE"
#define SOCKET_ADDR "ipc:///tmp/reqrep.ipc"
 
char *date(void)
{
        time_t raw = time(&raw);
        struct tm *info = localtime(&raw);
        char *text = asctime(info);
 
        text[strlen(text) - 1] = '\0';
 
        return text;
}
 
int node0(const char *url)
{
        int sz_date = strlen(DATE) + 1;
        int sock = nn_socket(AF_SP, NN_REP);
 
        assert(sock >= 0);
        assert(nn_bind(sock, url) >= 0);
 
        while (1) {
                char *buf = NULL;
                int bytes = nn_recv(sock, &buf, NN_MSG, 0);
 
                assert(bytes >= 0);
                if (strncmp(DATE, buf, sz_date) == 0) {
                        printf("NODE0: RECEIVED DATE REQUEST\n");
                        char *d = date();
                        int sz_d = strlen(d) + 1;
                        printf("NODE0: SENDING DATE %s\n", d);
                        bytes = nn_send(sock, d, sz_d, 0);
                        assert(bytes == sz_d);
                }
                nn_freemsg (buf);
        }
 
        return nn_shutdown (sock, 0);
}
 
int node1(const char *url)
{
        int sz_date = strlen(DATE) + 1;
        char *buf = NULL;
        int bytes = -1;
        int sock = nn_socket(AF_SP, NN_REQ);
 
        assert(sock >= 0);
        assert(nn_connect(sock, url) >= 0);
 
        /* Send */
        printf("NODE1: SENDING DATE REQUEST %s\n", DATE);
        bytes = nn_send (sock, DATE, sz_date, 0);
        assert(bytes == sz_date);
 
        /* Receive */
        bytes = nn_recv (sock, &buf, NN_MSG, 0);
        assert(bytes >= 0);
        printf("NODE1: RECEIVED DATE %s\n", buf);
        nn_freemsg(buf);
 
        return nn_shutdown (sock, 0);
}
 
int main (int argc, char **argv)
{
        if (argc == 2 && strncmp(NODE0, argv[1], strlen(NODE0)) == 0) {
                return node0(SOCKET_ADDR);
        } else if (argc == 2 && strncmp(NODE1, argv[1], strlen(NODE1)) == 0) {
                return node1(SOCKET_ADDR);
        } else {
                fprintf (stderr, "Usage: reqrep %s|%s <ARG> ...\n", NODE0, NODE1);
                return 1;
        }
}
  1. [文件] survey.c
    ```cpp
    #include "common.h"
    #include

#define SERVER "server"
#define CLIENT "client"
#define DATE "DATE"
#define SOCKET_ADDR "ipc:///tmp/survey.ipc"

char *date(void)
{

    time_t raw = time (&raw);
    struct tm *info = localtime (&raw);
    char *text = asctime (info);
    text[strlen(text)-1] = '\0';
    return text;

}

int server(const char *url)
{

    int sock = nn_socket(AF_SP, NN_SURVEYOR);

    assert(sock >= 0);
    assert(nn_bind(sock, url) >= 0);
    sleep(1); /* wait for connections */

    /* Send */
    printf("SERVER: SENDING DATE SURVEY REQUEST\n");
    int sz_d = strlen(DATE) + 1;
    int bytes = nn_send(sock, DATE, sz_d, 0);
    assert (bytes == sz_d);

    while (1) {
            /* Receive */
            char *buf = NULL;
            bytes = nn_recv(sock, &buf, NN_MSG, 0);
            if (bytes == ETIMEDOUT) {
                    break;
            }
            if (bytes >= 0) {
                    printf("SERVER: RECEIVED \"%s\" SURVEY RESPONSE\n", buf);
                    nn_freemsg (buf);
            }
    }

    return nn_shutdown(sock, 0);

}

int client(const char *url, const char *name)
{

    int sock = nn_socket(AF_SP, NN_RESPONDENT);

    assert(sock >= 0);
    assert(nn_connect(sock, url) >= 0);
    while (1) {
            char *buf = NULL;
            int bytes = nn_recv(sock, &buf, NN_MSG, 0);
            if (bytes >= 0) {
                    printf("CLIENT (%s): RECEIVED \"%s\" SURVEY REQUEST\n", name, buf);
                    nn_freemsg(buf);
                    char *d = date();
                    int sz_d = strlen(d) + 1; // '\0' too
                    printf("CLIENT (%s): SENDING DATE SURVEY RESPONSE\n", name);
                    int bytes = nn_send (sock, d, sz_d, 0);
                    assert(bytes == sz_d);
            }
    }

    return nn_shutdown(sock, 0);

}

int main(int argc, char **argv)
{

    if (argc == 2 && strncmp(SERVER, argv[1], strlen(SERVER)) == 0) {
            return server (SOCKET_ADDR);
    } else if (argc == 3 && strncmp(CLIENT, argv[1], strlen(CLIENT)) == 0) {
            return client (SOCKET_ADDR, argv[2]);
    } else {
            fprintf(stderr, "Usage: survey %s|%s <ARG> ...\n", SERVER, CLIENT);
            return 1;
    }

}

5. [文件] bus.c ~ 3KB     下载(21)     
```cpp
#include "common.h"
#include <nanomsg/bus.h>
 
#define NODE0_SOCKET_ADDR "ipc:///tmp/node0.ipc"
#define NODE1_SOCKET_ADDR "ipc:///tmp/node1.ipc"
#define NODE2_SOCKET_ADDR "ipc:///tmp/node2.ipc"
#define NODE3_SOCKET_ADDR "ipc:///tmp/node3.ipc"
 
int node0(void)
{
        int sock = nn_socket(AF_SP, NN_BUS);
 
        assert(sock >= 0);
        assert(nn_bind(sock, NODE0_SOCKET_ADDR) >= 0);
        sleep(1); /* wait for connections */
 
        assert(nn_connect(sock, NODE1_SOCKET_ADDR) >= 0);
        assert(nn_connect(sock, NODE2_SOCKET_ADDR) >= 0);
        sleep(1); /* wait for connections */
 
        return sock;
}
 
int node1(void)
{
        int sock = nn_socket(AF_SP, NN_BUS);
 
        assert(sock >= 0);
        assert(nn_bind(sock, NODE1_SOCKET_ADDR) >= 0);
        sleep(1); /* wait for connections */
 
        assert(nn_connect(sock, NODE2_SOCKET_ADDR) >= 0);
        assert(nn_connect(sock, NODE3_SOCKET_ADDR) >= 0);
        sleep(1); /* wait for connections */
 
        return sock;
}
 
int node2(void)
{
        int sock = nn_socket(AF_SP, NN_BUS);
 
        assert(sock >= 0);
        assert(nn_bind(sock, NODE2_SOCKET_ADDR) >= 0);
        sleep(1); /* wait for connections */
 
        assert(nn_connect(sock, NODE3_SOCKET_ADDR) >= 0);
        sleep(1); /* wait for connections */
 
        return sock;
}
 
int node3(void)
{
        int sock = nn_socket(AF_SP, NN_BUS);
 
        assert(sock >= 0);
        assert(nn_bind(sock, NODE3_SOCKET_ADDR) >= 0);
        sleep(1); /* wait for connections */
 
        assert(nn_connect(sock, NODE0_SOCKET_ADDR) >= 0);
        sleep(1); /* wait for connections */
 
        return sock;
}
 
int bus_on(int sock, const char *name)
{
        int to = 100;
        assert(nn_setsockopt(sock, NN_SOL_SOCKET, NN_RCVTIMEO, &to, sizeof(to)) >= 0);
 
        /* SEND */
        int sz_n = strlen(name) + 1;
        printf("%s: SENDING '%s' ONTO BUS\n", name, name);
        int send = nn_send(sock, name, sz_n, 0);
        assert (send == sz_n);
 
        while (1) {
                /* RECV */
                char *buf = NULL;
                int recv = nn_recv(sock, &buf, NN_MSG, 0);
                if (recv >= 0) {
                        printf("%s: RECEIVED '%s' FROM BUS\n", name, buf);
                        nn_freemsg(buf);
                }
        }
 
        return nn_shutdown(sock, 0);
}
 
int node(const char *name)
{
        int sock;
 
        if (!strcmp(name, "node0")) {
                sock = node0();
        } else if (!strcmp(name, "node1")) {
                sock = node1();
        } else if (!strcmp(name, "node2")) {
                sock = node2();
        } else if (!strcmp(name, "node3")) {
                sock = node3();
        } else {
                return -1;
        }
 
        return bus_on(sock, name);
}
 
int main(int argc, char **argv)
{
        if (argc == 2) {
                return node(argv[1]);
        } else {
                fprintf (stderr, "Usage: bus <NODE_NAME> ...\n");
                return 1;
        }
}
`