linux lib nanomsg
海思移植nanomsg
修改 CMakeLists.txt 增加海思支持
```makefileset cross-compiled system type, it's better not use the type which cmake cannot recognized.
SET ( CMAKE_SYSTEM_NAME Linux )
SET ( CMAKE_SYSTEM_PROCESSOR arm )when hislicon SDK was installed, toolchain was installed in the path as below:
SET ( CMAKE_C_COMPILER /opt/hisi-linux/x86-arm/arm-himix200-linux/bin/arm-himix200-linux-gcc )
SET ( CMAKE_CXX_COMPILER /opt/hisi-linux/x86-arm/arm-himix200-linux/bin/arm-himix200-linux-g++ )
SET ( CMAKE_C_COMPILER arm-none-linux-gnueabi-gcc)
SET ( CMAKE_CXX_COMPILER arm-none-linux-gnueabi-g++ )
set searching rules for cross-compiler
SET ( CMAKE_FIND_ROOT_PATH_MODE_PROGRAM NEVER )
SET ( CMAKE_FIND_ROOT_PATH_MODE_LIBRARY ONLY )
SET ( CMAKE_FIND_ROOT_PATH_MODE_INCLUDE ONLY )
set ${CMAKE_C_FLAGS} and ${CMAKE_CXX_FLAGS}flag for cross-compiled process
SET ( CROSS_COMPILATION_ARM himix200 )
SET ( CROSS_COMPILATION_ARCHITECTURE armv7-a )
set g++ param
SET ( CMAKE_CXX_FLAGS "-std=c++11 -march=armv7-a -mfloat-abi=softfp -mfpu=neon-vfpv4 -fopenmp ${CMAKE_CXX_FLAGS}" )
add_definitions(-D__ARM_NEON)
2. 开启 NN_STATIC_LIB 静态编译
`camke .. -DNN_STATIC_LIB=ON `
`option (NN_STATIC_LIB "Build static library instead of shared library." ON)`
3. 修改库文件的安装路径 执行 `make install`
`SET(CMAKE_INSTALL_PREFIX ../nanomsg)`
4. 卸载清除安装文件 `cat install_manifest.txt | sudo xargs rm`
### 修改 nanomsg 的安装目录
1. 修改camke 查找路径, 到nanomsg的安装目录
vi demo/CMakeLists.txt
set(CMAKE_PREFIX_PATH ${CMAKE_PREFIX_PATH} "/Volumes/linux/c/nanomsg/nanomsg")
## 解决 undefined reference to `getaddrinfo_a`
libnanomsg.a(dns.c.o): in function `nn_dns_start':
dns.c:(.text+0x598): undefined reference to `getaddrinfo_a
修改 CMakeLists.txt 找到 NN_ENABLE_GETADDRINFO_A 选项 配置为OFF
option (NN_ENABLE_GETADDRINFO_A "Enable/disable use of getaddrinfo_a in place of getaddrinfo." OFF)
重新执行 cmake .. && make 生成
### nanomsg 配置
```cpp
nn_setsockopt(sock, NN_SUB, NN_SUB_SUBSCRIBE, "", 0);
// 设置写入超时
int timeout = 2000;
nn_setsockopt (sock, NN_SOL_SOCKET, NN_SNDTIMEO, &timeout, sizeof (timeout));
// 设置读取超时
int timeout = 100;
nn_setsockopt(sock, NN_SOL_SOCKET, NN_RCVTIMEO, &timeout , sizeof(timeout));
int b = 1;
nn_setsockopt(sock, NN_TCP, NN_TCP_NODELAY, &b, sizeof(b));
nanocat 使用
服务端while true; do nanocat --pub --bind-ipc "/tmp/testingZone" --data "${s}" --delay 1; done;
while true; do nanocat --sub --connect-ipc "/tmp/testingZone" --format msgpack ; done
nanomsg/mangos: mangos is a pure Golang implementation of nanomsg's "Scalablilty Protocols"
NNG/nanomsg 是最近项目上使用到的一个通信库, 用来实现进程间过程调用和线程间通信, 很是方便。
NNG 是 nanomsg 的继任版本, 而 nanomsg 则是流行的 ZMQ 的 C 重写版。
NNG 将通信使用的协议和传输分离, 同一个协议可以工作在不同的传输层上, 类似与 TCP/IP 的应用层和传输层的分层, 同时接口上屏蔽了底层细节, 统一用字符串 URL 来描述传输模式。这样当使用场景修改时, 可以通过简单修改 URL 来实现适应, 极具灵活性。
同时如 NNG 描述所言 “light-weight brokerless messaging”, NNG 中的通信各方是不需要第三方程序介入的, 这与 MQTT/Redis 通信需要服务器不同。这样很适合作为通信库来使用而没有其他依赖。
NNG 支持的通信协议主要有以下几种:
client --- server
NN_PAIR pair 一对一双向通信。 one-to-one
NN_PUSH,NN_PULL push ----> pull (pipeline) 单向通信, 类似与生产者消费者模型的消息队列 many-to-many 用于日志收集
NN_PUB,NN_SUB sub <---- pub 单向广播。 many-to-many 用于广播
NN_REP,NN_REQ req <---> rep (rpc) 请求-响应模式, 类似与 rpc 模式。
NN_BUS bus 网状连接通信, 每个加入节点都可以发送/接受广播消息。 many-to-many
NN_SURVEYOR, NN_RESPONDENT survey 用于多节点表决或者服务发现。
PAIR - simple one-to-one communication
BUS - simple many-to-many communication
REQREP - allows to build clusters of stateless services to process user requests
PUBSUB - distributes messages to large sets of interested subscribers
PIPELINE - aggregates messages from multiple sources and load balances them among many destinations
SURVEY - allows to query state of multiple applications in a single go
NNG 支持的传输模式主要有以下三种常用, 其他还有tcp附加tls 1.2加密的tls传输和基于WebSocket的ws传输:
INPROC - transport within a process (between threads, modules etc.) 线程间通信
IPC - transport between processes on a single machine 进程间通信
TCP - network transport via TCP 网络TCP
WS - websockets over TCP websockets TCP
通信协议里除了 PAIR 之外, 基本都是一对多的通信模式, 这点需要注意, 以 PIPELINE 和 PUB/SUB 为例:
- PIPELINE 的 PUSH 端是 client, 一个 PUSH 可以连接多个 PULL 端, 发送数据时会选择其中一个可用的发送;PULL 端是 server, 一个 PULL 可以接收多个 PUSH 连接和数据。
- PUB/SUB 的 SUB 端是 client, 一个 SUB 可以连接多个不同的 PUB 端, 接收多个 PUB 端广播的数据;PUB 端是 server, 一个 PUB 可以接收多个 SUB 连接并广播数据。
基于以上, 多个程序是没办法共用一个 PUB/SUB 通道来广播数据的, 这与 ROS 里的 topic 和 LCM 中的 channel 模式不同。如果要实现类似功能, 则可以使用 PIPELINE + PUB/SUB 来处理:
独立一个话题发布的程序, 拥有一个 PULL 和 PUB。
PULL 约定一个 URL, 所有需要发布该话题的程序都 PUSH 数据到该 URL 上。
PUB 约定一个 URL, 所有需要获取该话题的程序都 SUB 到该 URL 上。
程序内部循环将 PULL 读取的数据发送到 PUB 上。
以上则可以模拟出 ROS topic 数据合并 或者 LCM 中 channel 的类似功能。
整体上看, NNG 的 API 很简约, 主要是 4 个, open/recv/send/close, open 根据协议不同使用的函数会不同。配置则是 setopt/getopt, 与 UNIX API 类似。API 中没有上下文环境(context-less)依赖, 只需要一个 nng_socket, 这种设计和实现方法值得去学习一下(初步揣测应该是使用指针值作为handle, 如果要强制编译器做类型检测, 则会套上一层 struct, 如 typedef struct { _nng_xxx_socket * p } nng_socket;)。
NNG 协议基本上囊括了常见的通信需求, 一些特殊的需求, 也可以通过组合协议来实现, 比如上面的模拟 ROS topic 或者 LCM channel 的方法。这样一来, 如果在程序中使用 NNG, 不管是多进程, 还是多线程, 通过设计, 可以进一步增强模块化, 同时不乏灵活性。如果环境变化, 程序不管是由多进程改成多线程, 还是由多线程改成多主机, 都很容易实现。
常见模块/进程/线程间通信, 可以依据具体需求来使用 PIPELINE(消息队列) 还是 REQ/REP(过程调用), 而不是锁+全局变量, 每个模块单元只需要做单一相关的具体事务, 无需知晓全局状态。
nanomsg的前身是zeromsg, zeromq估计很多人都见过, 是一个消息队列, 而nanomq的模式很多和zeromq是类似的, 我们这里就简单的解析一下(如果和官方不相同, 请以官方为准)。
1.One-to-one protocol
一对一协议, 这个就是字面意思, 只能一对一通信, 为通讯双向。关键字:NN_PAIR。
2.Request/reply protocol
请求/回复协议, 由请求端发起请求, 然后等待回应端应答, 一般是一个REP多个REQ;关键字:NN_REP,NN_REQ。
3.Publish/subscribe protocol
发布订阅协议, 将消息广播到多个目的地, 消息从NN_PUB发送, 并且只会由订阅了匹配主题的NN_SUB接收;这种模式只会发布给在线的订阅端, 如果发布端开始发布消息时, 订阅端尚未连接, 则这些消息会被直接丢弃;同时订阅端只负责接收不能反馈;关键字:NN_PUB, NN_SUB。
4.Survey protocol
调查协议, 允许向多个地点广播调查并收集响应, 关键字:NN_SURVEYOR, NN_RESPONDENT。
5.Pipeline protocol
通过一系列步骤传递任务的的协议, 这个协议是可扩展的(官方原文:scalability protocol for passing tasks through a series of processing steps.);它可以公平的对来自先前的处理步骤的消息进行排队, 并在下一个处理步骤的实例中对它进行负载均衡;关键字, NN_PUSH, NN_PULL。
6.Message bus protocol
消息总线(message bus), 将消息从任何节点广播到拓扑中的所有其他节点, 自身不会收到自己发出去的消息;这种模式只能缩放到本地级别(单个机器或者单个局域网), 如果尝试进一步扩展可能会导致单个节点消息过载;关键字:NN_BUS。
这次主要是使用nanomsg库实现多线程之间的通信, 在我们复杂的多线程编程中可能各个线程需要共用一些信息, 平常加互斥锁等等, 有时候也是相当麻烦, 这里是使用nanomsg库实现多线程的通信, 主要这次是一对一线程双向通信和单向通信的demo。
NN_PAIR 多线程一对一双向通信demo
#include <stdio.h>
#include <pthread.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <nanomsg/pair.h>
#include <nanomsg/bus.h>
#include <nanomsg/nn.h>
/*
PAIR - simple one-to-one communication
BUS - simple many-to-many communication
REQREP - allows to build clusters of stateless services to process user requests
PUBSUB - distributes messages to large sets of interested subscribers
PIPELINE - aggregates messages from multiple sources and load balances them among many destinations
SURVEY - allows to query state of multiple applications in a single go
*/
/*
INPROC - transport within a process (between threads, modules etc.) 线程
IPC - transport between processes on a single machine 进程
TCP - network transport via TCP 网络TCP
WS - websockets over TCP websockets TCP
*/
//ipc:// 标识用于多进程通信,后面的sky_test是我自己命名, 可以随意命名
char *url = "ipc://sky_test";
int server_sock_init(int *sock)
{
*sock = nn_socket (AF_SP, NN_PAIR);
if (*sock < 0) {
printf("create server sock failed\r\n");
return 1;
}
if (nn_bind(*sock, url) < 0) {
printf("bind server sock failed\r\n");
return 1;
}
printf("server socket init success...\r\n");
return 0;
}
int client_sock_init(int *sock)
{
*sock = nn_socket (AF_SP, NN_PAIR);
if (*sock < 0) {
printf("create server sock failed\r\n");
return 1;
}
if (nn_connect(*sock, url) < 0) {
printf("bind server sock failed\r\n");
return 1;
}
printf("client socket init success...\r\n");
return 0;
}
void child_process_test()
{
int c_sock;
char *tx_msg = "Hello Main Process";
if (0 != client_sock_init(&c_sock)) {
return;
}
while (1) {
//发送信息到主进程客户端
while (1) {
size_t len = strlen (tx_msg) + 1;
if (nn_send(c_sock, tx_msg, len, 0) < 0) {
printf("Thread Send Msg Failed\r\n");
usleep(500000);
continue;
}
break;
}
while (1) {
//接收主进程客户端信息
char *rx_msg = NULL;
int result = nn_recv(c_sock, &rx_msg, NN_MSG,NN_DONTWAIT);
if (result > 0) {
printf("Child Process Recieve: %s\r\n", rx_msg);
nn_freemsg (rx_msg);
break;
}
usleep(200000);
}
}
}
void main_process_test()
{
int s_sock;
char *tx_msg = "Hi Child Process Test";
if (0 != server_sock_init(&s_sock)) {
return;
}
sleep(1);
while (1) {
while (1) {
//接收子进程客户端信息
char *rx_msg = NULL;
int result = nn_recv(s_sock, &rx_msg, NN_MSG,NN_DONTWAIT);
if (result > 0) {
printf("Main Recieve: %s\r\n", rx_msg);
nn_freemsg (rx_msg);
break;
}
usleep(200000);
}
//发送信息到子进程客户端
while (1) {
size_t len = strlen (tx_msg) + 1;
if (nn_send(s_sock, tx_msg, len, 0) < 0) {
printf("Main Send Msg Failed\r\n");
usleep(500000);
continue;
}
break;
}
}
}
int main()
{
pid_t fpid;
fpid = fork();
if (fpid < 0) {
printf("fork failed\r\n");
return 1;
} else if (fpid == 0) {
//子进程
child_process_test();
} else {
//主进程
main_process_test();
}
return 0;
}
编译:gcc -o nanomsg_pair nanomsg_pair.c -lnanomsg -lpthread
运行结果:
sky@ubuntu:~/Study/nanomsg/code_test/inproc/pair$ ./nanomsg_pair
server socket init success...
client socket init success...
Main Recieve: Hello Main Thread
Thread Recieve: Hi Thread Test
Main Recieve: Hello Main Thread
Thread Recieve: Hi Thread Test
Main Recieve: Hello Main Thread
Thread Recieve: Hi Thread Test
Main Recieve: Hello Main Thread
Thread Recieve: Hi Thread Test
Main Recieve: Hello Main Thread
Thread Recieve: Hi Thread Test
Main Recieve: Hello Main Thread
Thread Recieve: Hi Thread Test
Main Recieve: Hello Main Thread
.....
NN_PUSH 线程一对一单向通信(类似管道)demo
#include <stdio.h>
#include <pthread.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <nanomsg/pipeline.h>
#include <nanomsg/nn.h>
/*
此程序为nanomsg多线程一对一单向通信demo。
*/
//inproc 标识用于多线程通信
char *url = "inproc://sky_test";
//发送数据的socket初始化
int send_sock_init(int *sock)
{
*sock = nn_socket (AF_SP, NN_PUSH);
if (*sock < 0) {
printf("create send data sock failed\r\n");
return 1;
}
if (nn_bind(*sock, url) < 0) {
printf("bind send data sock failed\r\n");
return 1;
}
printf("send data socket init success...\r\n");
return 0;
}
//接收数据的socket初始化
int recieve_sock_init(int *sock)
{
*sock = nn_socket (AF_SP, NN_PULL);
if (*sock < 0) {
printf("create recieve data sock failed\r\n");
return 1;
}
if (nn_connect(*sock, url) < 0) {
printf("connect recieve data sock failed\r\n");
return 1;
}
printf("recieve data socket init success...\r\n");
return 0;
}
//线程测试
void *thread_test(void *arg)
{
int c_sock;
if (0 != recieve_sock_init(&c_sock)) {
return;
}
while (1) {
//轮询接收信息
char *rx_msg = NULL;
int result = nn_recv(c_sock, &rx_msg, NN_MSG, NN_DONTWAIT);
if (result > 0) {
printf("Thread Recieve: %s\r\n", rx_msg);
nn_freemsg (rx_msg);
}
sleep(1);
}
}
int main()
{
int s_sock;
pthread_t ps;
char *tx_msg = "Hello thread test";
if (0 != send_sock_init(&s_sock)) {
return 1;
}
pthread_create(&ps, NULL, thread_test, NULL);
sleep(1);
//间隔两秒, 发送信息到子线程接收数据端
while (1) {
size_t len = strlen (tx_msg) + 1;
if (nn_send(s_sock, tx_msg, len, 0) < 0) {
printf("Main Send Msg Failed\r\n");
usleep(500000);
continue;
}
sleep(2);
}
return 0;
}
编译:gcc -o nanomsg_pipe nanomsg_pipe.c -lnanomsg -lpthread
运行结果:
sky@ubuntu:~/Study/nanomsg/code_test/inproc/pipeline$ ./nanomsg_pipe
send data socket init success...
recieve data socket init success...
Thread Recieve: Hello thread test
Thread Recieve: Hello thread test
Thread Recieve: Hello thread test
Thread Recieve: Hello thread test
Thread Recieve: Hello thread test
Thread Recieve: Hello thread test
Thread Recieve: Hello thread test
...
总结:
使用nanomsg实现线程间通信, 对于多线程编程来说是相当方便了, nanomsg库又很轻量级, 对于嵌入式程序编程真的非常好用。
————————————————
版权声明:本文为CSDN博主「DancerSky」的原创文章, 遵循CC 4.0 BY-SA版权协议, 转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/DancerSky/article/details/83538565
#include "common.h"
#include <nanomsg/pair.h>
#define NODE0 "node0"
#define NODE1 "node1"
#define SOCKET_ADDR "ipc:///tmp/pair.ipc"
int send_name(int sock, const char *name)
{
printf("%s: SENDING \"%s\"\n", name, name);
int sz_n = strlen(name) + 1;
return nn_send(sock, name, sz_n, 0);
}
int recv_name(int sock, const char *name)
{
char *buf = NULL;
int result = nn_recv(sock, &buf, NN_MSG, 0);
if (result > 0)
{
printf("%s: RECEIVED \"%s\"\n", name, buf);
nn_freemsg(buf);
}
return result;
}
int send_recv(int sock, const char *name)
{
int to = 100;
assert(nn_setsockopt (sock, NN_SOL_SOCKET, NN_RCVTIMEO, &to, sizeof (to)) >= 0);
while(1)
{
recv_name(sock, name);
sleep(1);
send_name(sock, name);
}
}
int node0(const char *url)
{
int sock = nn_socket(AF_SP, NN_PAIR);
assert(sock >= 0);
assert(nn_bind (sock, url) >= 0);
send_recv(sock, NODE0);
return nn_shutdown (sock, 0);
}
int node1(const char *url)
{
int sock = nn_socket(AF_SP, NN_PAIR);
assert(sock >= 0);
assert(nn_connect(sock, url) >= 0);
send_recv(sock, NODE1);
return nn_shutdown (sock, 0);
}
int main(int argc, char **argv)
{
if (argc == 2 && strncmp(NODE0, argv[1], strlen(NODE0)) == 0) {
return node0(SOCKET_ADDR);
} else if (argc == 2 && strncmp (NODE1, argv[1], strlen (NODE1)) == 0) {
return node1(SOCKET_ADDR);
} else {
fprintf (stderr, "Usage: pair %s|%s <ARG> ...\n", NODE0, NODE1);
return 1;
}
}
Pub Sub一对多主题订阅通信Demo
# 这次是nanomsg库实现的一个类似于MQTT通信的一种方式, 广播订阅的一个一对多的通信方式。一个主的广播消息, 其他可以订阅自己想要的主题信息, 然后就会只接收订阅的主题的信息。
PubSub一对多主题订阅通信Demo
#include <stdio.h>
#include <pthread.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <nanomsg/pubsub.h>
#include <nanomsg/nn.h>
/*
此程序为nanomsg多线程一对多单向通信demo,类似MQTT通信, 一个广播, 其他为订阅相应主题
客户端只接收到自己订阅的对应主题的内容。
*/
//inproc 标识用于多线程通信
char *url = "inproc://sky_test";
//发送数据的socket初始化
int send_sock_init(int *sock)
{
*sock = nn_socket (AF_SP, NN_PUB);
if (*sock < 0) {
printf("create send data sock failed\r\n");
return 1;
}
if (nn_bind(*sock, url) < 0) {
printf("bind send data sock failed\r\n");
return 1;
}
printf("send data socket init success...\r\n");
return 0;
}
//接收数据的socket初始化
int recieve_sock_init(int *sock, char *topic)
{
*sock = nn_socket (AF_SP, NN_SUB);
if (*sock < 0) {
printf("create recieve data sock failed\r\n");
return 1;
}
if (NULL == topic) {
//设置订阅主题为全部
nn_setsockopt(*sock, NN_SUB, NN_SUB_SUBSCRIBE, "", 0);
} else {
//设置订阅的主题及主题长度, 主要是对比发送内容的开始字节
//如设置主题为"sky:",那么就会对比信息前面字节是否相同, 相同则可以收到该信息
nn_setsockopt(*sock, NN_SUB, NN_SUB_SUBSCRIBE, topic, strlen(topic));
}
if (nn_connect(*sock, url) < 0) {
printf("connect recieve data sock failed\r\n");
return 1;
}
printf("recieve data socket init success...\r\n");
return 0;
}
//线程1测试
void *thread_test(void *arg)
{
int c_sock;
if (0 != recieve_sock_init(&c_sock, "sky:")) {
return;
}
while (1) {
//轮询接收订阅主题"sky:"信息
char *rx_msg = NULL;
int result = nn_recv(c_sock, &rx_msg, NN_MSG, NN_DONTWAIT);
if (result > 0) {
printf("Thread 1 Recieve: %s\r\n\r\n", rx_msg);
nn_freemsg (rx_msg);
}
sleep(1);
}
}
//线程2测试
void *thread_test2(void *arg)
{
int c_sock;
if (0 != recieve_sock_init(&c_sock, "born:")) {
return;
}
while (1) {
//轮询接收订阅主题"born:"信息
char *rx_msg = NULL;
int result = nn_recv(c_sock, &rx_msg, NN_MSG, NN_DONTWAIT);
if (result > 0) {
printf("Thread 2 Recieve: %s\r\n\r\n", rx_msg);
nn_freemsg (rx_msg);
}
sleep(1);
}
}
//发送数据
int send_data(int sock, char *data)
{
if (data == NULL) {
return 1;
}
if (nn_send(sock, data, strlen(data)+1, 0) < 0) {
return 1;
}
printf("Main Server Send:%s\r\n\r\n", data);
return 0;
}
int main()
{
int s_sock, ret, i = 0;
pthread_t ps, ps2;
char *tx_msg = "sky:Hello Thread Sky";
char *tx_msg1 = "born:Hello Thread Born";
char *tx_msg2 = "Storm:Hello Thread Storm";
if (0 != send_sock_init(&s_sock)) {
return 1;
}
//创建子线程, 接收信息
pthread_create(&ps, NULL, thread_test, NULL);
pthread_create(&ps2, NULL, thread_test2, NULL);
sleep(1);
//间隔两秒, 发送信息到子线程接收数据端
while (1) {
//测试发送广播
if (0 == i) {
ret = send_data(s_sock, tx_msg);
if (0 == ret) {
i ++;
}
} else if (1 == i) {
ret = send_data(s_sock, tx_msg1);
if (0 == ret) {
i ++;
}
} else if (2 == i) {
ret = send_data(s_sock, tx_msg2);
if (0 == ret) {
i = 0;
}
}
sleep(2);
}
return 0;
}
编译gcc -o nanomsg_pubsub nanomsg_pubsub.c -lnanomsg -lpthread
运行结果:
sky@ubuntu:~/Study/nanomsg/code_test/inproc/pubsub$ ./nanomsg_pubsub
send data socket init success...
recieve data socket init success...
recieve data socket init success...
Main Server Send:sky:Hello Thread Sky
Thread 1 Recieve: sky:Hello Thread Sky
Main Server Send:born:Hello Thread Born
Thread 2 Recieve: born:Hello Thread Born
Main Server Send:Storm:Hello Thread Storm
Main Server Send:sky:Hello Thread Sky
Thread 1 Recieve: sky:Hello Thread Sky
...
根据结果可以看到, 线程1, 2分别订阅了sky born主题, 所以可以收到, 而storm主题没订阅所以都没有收到。
————————————————
版权声明:本文为CSDN博主「DancerSky」的原创文章, 遵循CC 4.0 BY-SA版权协议, 转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/DancerSky/article/details/83539077
3. [文件] reqrep.c
#include "common.h"
#include <nanomsg/reqrep.h>
#define NODE0 "node0"
#define NODE1 "node1"
#define DATE "DATE"
#define SOCKET_ADDR "ipc:///tmp/reqrep.ipc"
char *date(void)
{
time_t raw = time(&raw);
struct tm *info = localtime(&raw);
char *text = asctime(info);
text[strlen(text) - 1] = '\0';
return text;
}
int node0(const char *url)
{
int sz_date = strlen(DATE) + 1;
int sock = nn_socket(AF_SP, NN_REP);
assert(sock >= 0);
assert(nn_bind(sock, url) >= 0);
while (1) {
char *buf = NULL;
int bytes = nn_recv(sock, &buf, NN_MSG, 0);
assert(bytes >= 0);
if (strncmp(DATE, buf, sz_date) == 0) {
printf("NODE0: RECEIVED DATE REQUEST\n");
char *d = date();
int sz_d = strlen(d) + 1;
printf("NODE0: SENDING DATE %s\n", d);
bytes = nn_send(sock, d, sz_d, 0);
assert(bytes == sz_d);
}
nn_freemsg (buf);
}
return nn_shutdown (sock, 0);
}
int node1(const char *url)
{
int sz_date = strlen(DATE) + 1;
char *buf = NULL;
int bytes = -1;
int sock = nn_socket(AF_SP, NN_REQ);
assert(sock >= 0);
assert(nn_connect(sock, url) >= 0);
/* Send */
printf("NODE1: SENDING DATE REQUEST %s\n", DATE);
bytes = nn_send (sock, DATE, sz_date, 0);
assert(bytes == sz_date);
/* Receive */
bytes = nn_recv (sock, &buf, NN_MSG, 0);
assert(bytes >= 0);
printf("NODE1: RECEIVED DATE %s\n", buf);
nn_freemsg(buf);
return nn_shutdown (sock, 0);
}
int main (int argc, char **argv)
{
if (argc == 2 && strncmp(NODE0, argv[1], strlen(NODE0)) == 0) {
return node0(SOCKET_ADDR);
} else if (argc == 2 && strncmp(NODE1, argv[1], strlen(NODE1)) == 0) {
return node1(SOCKET_ADDR);
} else {
fprintf (stderr, "Usage: reqrep %s|%s <ARG> ...\n", NODE0, NODE1);
return 1;
}
}
- [文件] survey.c
```cpp
#include "common.h"
#include
#define SERVER "server"
#define CLIENT "client"
#define DATE "DATE"
#define SOCKET_ADDR "ipc:///tmp/survey.ipc"
char *date(void)
{
time_t raw = time (&raw);
struct tm *info = localtime (&raw);
char *text = asctime (info);
text[strlen(text)-1] = '\0';
return text;
}
int server(const char *url)
{
int sock = nn_socket(AF_SP, NN_SURVEYOR);
assert(sock >= 0);
assert(nn_bind(sock, url) >= 0);
sleep(1); /* wait for connections */
/* Send */
printf("SERVER: SENDING DATE SURVEY REQUEST\n");
int sz_d = strlen(DATE) + 1;
int bytes = nn_send(sock, DATE, sz_d, 0);
assert (bytes == sz_d);
while (1) {
/* Receive */
char *buf = NULL;
bytes = nn_recv(sock, &buf, NN_MSG, 0);
if (bytes == ETIMEDOUT) {
break;
}
if (bytes >= 0) {
printf("SERVER: RECEIVED \"%s\" SURVEY RESPONSE\n", buf);
nn_freemsg (buf);
}
}
return nn_shutdown(sock, 0);
}
int client(const char *url, const char *name)
{
int sock = nn_socket(AF_SP, NN_RESPONDENT);
assert(sock >= 0);
assert(nn_connect(sock, url) >= 0);
while (1) {
char *buf = NULL;
int bytes = nn_recv(sock, &buf, NN_MSG, 0);
if (bytes >= 0) {
printf("CLIENT (%s): RECEIVED \"%s\" SURVEY REQUEST\n", name, buf);
nn_freemsg(buf);
char *d = date();
int sz_d = strlen(d) + 1; // '\0' too
printf("CLIENT (%s): SENDING DATE SURVEY RESPONSE\n", name);
int bytes = nn_send (sock, d, sz_d, 0);
assert(bytes == sz_d);
}
}
return nn_shutdown(sock, 0);
}
int main(int argc, char **argv)
{
if (argc == 2 && strncmp(SERVER, argv[1], strlen(SERVER)) == 0) {
return server (SOCKET_ADDR);
} else if (argc == 3 && strncmp(CLIENT, argv[1], strlen(CLIENT)) == 0) {
return client (SOCKET_ADDR, argv[2]);
} else {
fprintf(stderr, "Usage: survey %s|%s <ARG> ...\n", SERVER, CLIENT);
return 1;
}
}
5. [文件] bus.c ~ 3KB 下载(21)
```cpp
#include "common.h"
#include <nanomsg/bus.h>
#define NODE0_SOCKET_ADDR "ipc:///tmp/node0.ipc"
#define NODE1_SOCKET_ADDR "ipc:///tmp/node1.ipc"
#define NODE2_SOCKET_ADDR "ipc:///tmp/node2.ipc"
#define NODE3_SOCKET_ADDR "ipc:///tmp/node3.ipc"
int node0(void)
{
int sock = nn_socket(AF_SP, NN_BUS);
assert(sock >= 0);
assert(nn_bind(sock, NODE0_SOCKET_ADDR) >= 0);
sleep(1); /* wait for connections */
assert(nn_connect(sock, NODE1_SOCKET_ADDR) >= 0);
assert(nn_connect(sock, NODE2_SOCKET_ADDR) >= 0);
sleep(1); /* wait for connections */
return sock;
}
int node1(void)
{
int sock = nn_socket(AF_SP, NN_BUS);
assert(sock >= 0);
assert(nn_bind(sock, NODE1_SOCKET_ADDR) >= 0);
sleep(1); /* wait for connections */
assert(nn_connect(sock, NODE2_SOCKET_ADDR) >= 0);
assert(nn_connect(sock, NODE3_SOCKET_ADDR) >= 0);
sleep(1); /* wait for connections */
return sock;
}
int node2(void)
{
int sock = nn_socket(AF_SP, NN_BUS);
assert(sock >= 0);
assert(nn_bind(sock, NODE2_SOCKET_ADDR) >= 0);
sleep(1); /* wait for connections */
assert(nn_connect(sock, NODE3_SOCKET_ADDR) >= 0);
sleep(1); /* wait for connections */
return sock;
}
int node3(void)
{
int sock = nn_socket(AF_SP, NN_BUS);
assert(sock >= 0);
assert(nn_bind(sock, NODE3_SOCKET_ADDR) >= 0);
sleep(1); /* wait for connections */
assert(nn_connect(sock, NODE0_SOCKET_ADDR) >= 0);
sleep(1); /* wait for connections */
return sock;
}
int bus_on(int sock, const char *name)
{
int to = 100;
assert(nn_setsockopt(sock, NN_SOL_SOCKET, NN_RCVTIMEO, &to, sizeof(to)) >= 0);
/* SEND */
int sz_n = strlen(name) + 1;
printf("%s: SENDING '%s' ONTO BUS\n", name, name);
int send = nn_send(sock, name, sz_n, 0);
assert (send == sz_n);
while (1) {
/* RECV */
char *buf = NULL;
int recv = nn_recv(sock, &buf, NN_MSG, 0);
if (recv >= 0) {
printf("%s: RECEIVED '%s' FROM BUS\n", name, buf);
nn_freemsg(buf);
}
}
return nn_shutdown(sock, 0);
}
int node(const char *name)
{
int sock;
if (!strcmp(name, "node0")) {
sock = node0();
} else if (!strcmp(name, "node1")) {
sock = node1();
} else if (!strcmp(name, "node2")) {
sock = node2();
} else if (!strcmp(name, "node3")) {
sock = node3();
} else {
return -1;
}
return bus_on(sock, name);
}
int main(int argc, char **argv)
{
if (argc == 2) {
return node(argv[1]);
} else {
fprintf (stderr, "Usage: bus <NODE_NAME> ...\n");
return 1;
}
}