写在最前
上一篇《NanoMsg框架|Android中简单封装PAIR的使用(附Demo地址)》已经把NanoMsg的PAIR使用完成了,但那还只是一个半成品,后面要完善的地方还比较多。本章我又重新写了一个Android的Demo,是自己用的,并且做了一些小的测试,这是解决了测试中遇到的问题之后的版本,还未在生产环境下使用,不过也准备开始测试移植了。
重写的原因
上一个Demo中我们先把NanoMsg的源码编译成了动态库,然后在新建的项目中以引入动态库的方式来调用,其实这个模式也是模仿OpenCV的NDK方式进行的。但是使用动态库调用时,我们的CMakeLists文件中也必须要加入头文件的引用,要不然调用不到nn.h里相关的函数,所以感觉还是有点麻烦。
所以后来我还是直接用源码加入native-lib的C++调用写了一个新的工程,不再另编译NanoMsg的动态库了,于是也有了这一篇的出现。
实现效果
重点介绍
微卡智享
其实基本上核心的东西在上一篇中也说的差不多了,这里我主要是把几个特别要注意的点说一下,主要也是在测试过程中遇到的,也算是排坑过程中的一个记录。
CMakeLists中的写法其实和编译NanoMsg的动态库很像,不过因为我们把调用方法也一起编译在里面了,所以最后要加入我们的native-lib.cpp文件
# For more information about using CMake with Android Studio, read the
# documentation: https://d.android.com/studio/projects/add-native-code.html

# Sets the minimum version of CMake required to build the native library.
# This must stay the first command so that policy defaults are set before
# anything else runs.
cmake_minimum_required(VERSION 3.4.1)

# A top-level CMakeLists.txt is expected to contain a literal project() call.
# C is needed for the nanomsg sources, CXX for native-lib.cpp.
project(vnanomsg LANGUAGES C CXX)
# nanomsg sources and headers that are built into the library.
# Paths are relative to this CMakeLists.txt; entries are grouped by directory.
set(NN_SOURCES
    # --- public API headers ---
    src/nn.h
    src/inproc.h
    src/ipc.h
    src/tcp.h
    src/ws.h
    src/pair.h
    src/pubsub.h
    src/reqrep.h
    src/pipeline.h
    src/survey.h
    src/bus.h

    # --- core ---
    src/core/ep.h
    src/core/ep.c
    src/core/global.h
    src/core/global.c
    src/core/pipe.c
    src/core/poll.c
    src/core/sock.h
    src/core/sock.c
    src/core/sockbase.c
    src/core/symbol.c

    # --- aio ---
    src/aio/ctx.h
    src/aio/ctx.c
    src/aio/fsm.h
    src/aio/fsm.c
    src/aio/pool.h
    src/aio/pool.c
    src/aio/timer.h
    src/aio/timer.c
    src/aio/timerset.h
    src/aio/timerset.c
    src/aio/usock.h
    src/aio/usock.c
    src/aio/worker.h
    src/aio/worker.c

    # --- utils ---
    src/utils/alloc.h
    src/utils/alloc.c
    src/utils/atomic.h
    src/utils/atomic.c
    src/utils/attr.h
    src/utils/chunk.h
    src/utils/chunk.c
    src/utils/chunkref.h
    src/utils/chunkref.c
    src/utils/clock.h
    src/utils/clock.c
    src/utils/closefd.h
    src/utils/closefd.c
    src/utils/cont.h
    src/utils/efd.h
    src/utils/efd.c
    src/utils/err.h
    src/utils/err.c
    src/utils/fast.h
    src/utils/fd.h
    src/utils/hash.h
    src/utils/hash.c
    src/utils/list.h
    src/utils/list.c
    src/utils/msg.h
    src/utils/msg.c
    src/utils/condvar.h
    src/utils/condvar.c
    src/utils/mutex.h
    src/utils/mutex.c
    src/utils/once.h
    src/utils/once.c
    src/utils/queue.h
    src/utils/queue.c
    src/utils/random.h
    src/utils/random.c
    src/utils/sem.h
    src/utils/sem.c
    src/utils/sleep.h
    src/utils/sleep.c
    src/utils/strcasecmp.c
    src/utils/strcasecmp.h
    src/utils/strcasestr.c
    src/utils/strcasestr.h
    src/utils/strncasecmp.c
    src/utils/strncasecmp.h
    src/utils/thread.h
    src/utils/thread.c
    src/utils/wire.h
    src/utils/wire.c

    # --- devices ---
    src/devices/device.h
    src/devices/device.c

    # --- protocols: shared helpers ---
    src/protocols/utils/dist.h
    src/protocols/utils/dist.c
    src/protocols/utils/excl.h
    src/protocols/utils/excl.c
    src/protocols/utils/fq.h
    src/protocols/utils/fq.c
    src/protocols/utils/lb.h
    src/protocols/utils/lb.c
    src/protocols/utils/priolist.h
    src/protocols/utils/priolist.c

    # --- protocols: bus ---
    src/protocols/bus/bus.c
    src/protocols/bus/xbus.h
    src/protocols/bus/xbus.c

    # --- protocols: pipeline ---
    src/protocols/pipeline/push.c
    src/protocols/pipeline/pull.c
    src/protocols/pipeline/xpull.h
    src/protocols/pipeline/xpull.c
    src/protocols/pipeline/xpush.h
    src/protocols/pipeline/xpush.c

    # --- protocols: pair ---
    src/protocols/pair/pair.c
    src/protocols/pair/xpair.h
    src/protocols/pair/xpair.c

    # --- protocols: pubsub ---
    src/protocols/pubsub/pub.c
    src/protocols/pubsub/sub.c
    src/protocols/pubsub/trie.h
    src/protocols/pubsub/trie.c
    src/protocols/pubsub/xpub.h
    src/protocols/pubsub/xpub.c
    src/protocols/pubsub/xsub.h
    src/protocols/pubsub/xsub.c

    # --- protocols: reqrep ---
    src/protocols/reqrep/req.h
    src/protocols/reqrep/req.c
    src/protocols/reqrep/rep.h
    src/protocols/reqrep/rep.c
    src/protocols/reqrep/task.h
    src/protocols/reqrep/task.c
    src/protocols/reqrep/xrep.h
    src/protocols/reqrep/xrep.c
    src/protocols/reqrep/xreq.h
    src/protocols/reqrep/xreq.c

    # --- protocols: survey ---
    src/protocols/survey/respondent.c
    src/protocols/survey/surveyor.c
    src/protocols/survey/xrespondent.h
    src/protocols/survey/xrespondent.c
    src/protocols/survey/xsurveyor.h
    src/protocols/survey/xsurveyor.c

    # --- transports: shared helpers ---
    src/transports/utils/backoff.h
    src/transports/utils/backoff.c
    src/transports/utils/dns.h
    src/transports/utils/dns.c
    src/transports/utils/dns_getaddrinfo.h
    src/transports/utils/dns_getaddrinfo.inc
    src/transports/utils/dns_getaddrinfo_a.h
    src/transports/utils/dns_getaddrinfo_a.inc
    src/transports/utils/iface.h
    src/transports/utils/iface.c
    src/transports/utils/literal.h
    src/transports/utils/literal.c
    src/transports/utils/port.h
    src/transports/utils/port.c
    src/transports/utils/streamhdr.h
    src/transports/utils/streamhdr.c
    src/transports/utils/base64.h
    src/transports/utils/base64.c

    # --- transports: inproc ---
    src/transports/inproc/binproc.h
    src/transports/inproc/binproc.c
    src/transports/inproc/cinproc.h
    src/transports/inproc/cinproc.c
    src/transports/inproc/inproc.c
    src/transports/inproc/ins.h
    src/transports/inproc/ins.c
    src/transports/inproc/msgqueue.h
    src/transports/inproc/msgqueue.c
    src/transports/inproc/sinproc.h
    src/transports/inproc/sinproc.c

    # --- transports: ipc ---
    src/transports/ipc/aipc.h
    src/transports/ipc/aipc.c
    src/transports/ipc/bipc.h
    src/transports/ipc/bipc.c
    src/transports/ipc/cipc.h
    src/transports/ipc/cipc.c
    src/transports/ipc/ipc.c
    src/transports/ipc/sipc.h
    src/transports/ipc/sipc.c

    # --- transports: tcp ---
    src/transports/tcp/atcp.h
    src/transports/tcp/atcp.c
    src/transports/tcp/btcp.h
    src/transports/tcp/btcp.c
    src/transports/tcp/ctcp.h
    src/transports/tcp/ctcp.c
    src/transports/tcp/stcp.h
    src/transports/tcp/stcp.c
    src/transports/tcp/tcp.c

    # --- transports: websocket ---
    src/transports/ws/aws.h
    src/transports/ws/aws.c
    src/transports/ws/bws.h
    src/transports/ws/bws.c
    src/transports/ws/cws.h
    src/transports/ws/cws.c
    src/transports/ws/sws.h
    src/transports/ws/sws.c
    src/transports/ws/ws.c
    src/transports/ws/ws_handshake.h
    src/transports/ws/ws_handshake.c
    src/transports/ws/sha1.h
    src/transports/ws/sha1.c
)
# Platform-specific sources plus the JNI wrapper.
#
# Exactly one poller backend and one event-fd backend are selected. The
# original script defined NN_USE_EPOLL, NN_USE_KQUEUE and NN_USE_POLL (and
# NN_USE_EVENTFD, NN_USE_PIPE, NN_USE_SOCKETPAIR, NN_USE_WINSOCK) all at once,
# including kqueue and Winsock, which do not exist on Android.
# NOTE(review): upstream nanomsg picks the backend with an #if/#elif chain
# that tests NN_USE_EPOLL and NN_USE_EVENTFD first, so epoll + eventfd are
# what the original effectively built - confirm against the vendored sources.
list(APPEND NN_SOURCES
    # Poller backend: epoll
    src/aio/poller.h
    src/aio/poller.c
    src/aio/poller_epoll.h
    src/aio/poller_epoll.inc
    # Event-fd backend: eventfd
    src/utils/efd_eventfd.h
    src/utils/efd_eventfd.inc
    # Our own JNI wrapper around the nanomsg API
    native-lib.cpp
)

# Creates the shared library from the nanomsg sources and the JNI wrapper.
# Gradle automatically packages shared libraries with the APK.
add_library(vnanomsg SHARED ${NN_SOURCES})

# Feature macros, scoped to this target instead of the whole directory.
target_compile_definitions(vnanomsg PRIVATE
    # Per the original author's note: without this the build keeps failing
    # (it concerns struct definitions coming from socket.h).
    NN_HAVE_MSG_CONTROL
    # Facilities available on the platform.
    NN_HAVE_EPOLL
    NN_HAVE_POLL
    NN_HAVE_EVENTFD
    NN_HAVE_PIPE
    NN_HAVE_SOCKETPAIR
    # Backends actually selected (one of each kind).
    NN_USE_EPOLL
    NN_USE_EVENTFD
)
# Locates the NDK log library and stores its path in the variable log-lib.
# CMake includes system libraries in the search path by default, so only the
# library name is needed.
find_library(log-lib log)

# Links the log library into vnanomsg. PRIVATE: it is an implementation
# detail of this library, and the explicit keyword avoids the legacy
# keyword-less signature.
target_link_libraries(vnanomsg PRIVATE ${log-lib})
在JNI的方法中,我们加入了sendbyte和recvbyte两个方法,用于直接收发byte[],不需要再转成String了
因为byte[]数组在NDK中为jbyteArray,而在nn_send和nn_recv中通讯时我们要传入char*指针,所以在NDK中还要加入两个函数,用于jbyteArray和char*的相互转换。
/**
 * Copies the first `len` bytes of a Java byte[] into a newly allocated C buffer.
 *
 * The buffer holds len + 1 bytes and is zero-initialised, so the data is always
 * followed by a '\0'. Callers that treat the result as a C string therefore do
 * not read past the allocation (the payload itself may still contain zero
 * bytes, so prefer passing `len` along explicitly).
 *
 * @param env   JNI environment used to read the array
 * @param array source array; when NULL nothing is copied
 * @param len   number of bytes to copy; values below 0 are treated as 0
 * @return buffer allocated with new[]; the caller owns it and must delete[] it
 */
char *jbyteArrTopChar(JNIEnv *env, jbyteArray array, int len) {
    // A negative size must never reach new[].
    const int count = len > 0 ? len : 0;
    // Value-initialisation zero-fills the buffer, including the extra byte.
    char *buf = new char[count + 1]();
    if (array != NULL && count > 0) {
        env->GetByteArrayRegion(array, 0, count, reinterpret_cast<jbyte *>(buf));
    }
    return buf;
}
/**
 * Creates a Java byte[] holding a copy of `len` bytes starting at `buf`.
 *
 * @param env JNI environment used to create and fill the array
 * @param buf source bytes; when NULL an empty array is produced
 * @param len number of bytes to copy; values below 0 are treated as 0
 * @return the new array, or NULL when NewByteArray fails (in that case the JVM
 *         already has an exception pending)
 */
jbyteArray pCharTojbyteArr(JNIEnv *env, char *buf, int len) {
    const jsize count = (buf != NULL && len > 0) ? len : 0;
    jbyteArray array = env->NewByteArray(count);
    if (array == NULL) {
        // Allocation failed; writing into a NULL array would crash the VM.
        return NULL;
    }
    if (count > 0) {
        env->SetByteArrayRegion(array, 0, count, reinterpret_cast<const jbyte *>(buf));
    }
    return array;
}
在NanoMsg的PUBSUB模式中,做为SUB端我们可以设置订阅的主题,所以我们这里也加入了一个订阅的JNI方法,取消订阅和订阅只是一个参数不同,所以我们用一个方法实现即可。
/**
 * Sets or removes a subscription topic prefix on a socket.
 *
 * The native side forwards the prefix to nn_setsockopt at the NN_SUB level.
 *
 * @param connectsocket socket id handed to the native library
 * @param subs topic prefix as text
 * @param itype 0 subscribes to the prefix; any other value unsubscribes
 * @return the value returned by nn_setsockopt; when it is negative the native
 *         code also raises an exception (its class is chosen by the native
 *         side and is not visible here)
 */
external fun subscribe(connectsocket: Int, subs: String, itype: Int): Int
/**
 * Same as [subscribe], but the topic prefix is passed as raw bytes.
 *
 * @param connectsocket socket id handed to the native library
 * @param subs topic prefix as bytes
 * @param itype 0 subscribes to the prefix; any other value unsubscribes
 * @return the value returned by nn_setsockopt; negative values are also
 *         reported through an exception raised by the native code
 */
external fun subscribebyte(connectsocket: Int, subs: ByteArray, itype: Int): Int
对应的native-lib.cpp中的方法
/**
 * JNI entry point: subscribes to / unsubscribes from a topic prefix given as a
 * Java String.
 *
 * @param env       JNI environment
 * @param socketid_ socket id passed to nn_setsockopt
 * @param prestr_   topic prefix; a NULL reference is reported as a failure
 * @param itype_    0 = NN_SUB_SUBSCRIBE, anything else = NN_SUB_UNSUBSCRIBE
 * @return the nn_setsockopt result. When it is negative the failure is logged
 *         and an exception is raised through throwByName(env, ERRCLS, ...).
 *         -1 is also returned when the string characters cannot be obtained
 *         (the JVM then already has its own exception pending).
 */
extern "C" JNIEXPORT jint JNICALL
Java_com_vaccae_vnanomsg_utils_NanoMsgJNI_subscribe(
        JNIEnv *env,
        jobject,
        jint socketid_,
        jstring prestr_,
        jint itype_
) {
    int rc = -1;
    if (prestr_ != NULL) {
        const char *prefix = env->GetStringUTFChars(prestr_, NULL);
        if (prefix == NULL) {
            // The JVM could not provide the characters and has raised its own
            // exception; do not stack a second one on top of it.
            return -1;
        }
        const int option = (itype_ == 0) ? NN_SUB_SUBSCRIBE : NN_SUB_UNSUBSCRIBE;
        rc = nn_setsockopt(socketid_, NN_SUB, option, prefix, strlen(prefix));
        // Every GetStringUTFChars needs a matching release, otherwise the
        // character buffer leaks on each call.
        env->ReleaseStringUTFChars(prestr_, prefix);
    }
    if (rc < 0) {
        char errmsg[100];
        // Bounded formatting so the message can never overrun errmsg.
        snprintf(errmsg, sizeof(errmsg), "设置订阅前缀失败!返回码:%d", rc);
        LOGE("%s\n", errmsg);
        throwByName(env, ERRCLS, errmsg);
    }
    return rc;
}
/**
 * JNI entry point: subscribes to / unsubscribes from a topic prefix given as a
 * Java byte[].
 *
 * The prefix length is taken from the array length. The bytes are not treated
 * as a C string, so the length is exact even when the data contains zero bytes
 * or is not followed by a terminator.
 *
 * @param env         JNI environment
 * @param socketid_   socket id passed to nn_setsockopt
 * @param prebytearr_ topic prefix bytes; a NULL reference is reported as a failure
 * @param itype_      0 = NN_SUB_SUBSCRIBE, anything else = NN_SUB_UNSUBSCRIBE
 * @return the nn_setsockopt result. When it is negative the failure is logged
 *         and an exception is raised through throwByName(env, ERRCLS, ...).
 */
extern "C" JNIEXPORT jint JNICALL
Java_com_vaccae_vnanomsg_utils_NanoMsgJNI_subscribebyte(
        JNIEnv *env,
        jobject,
        jint socketid_,
        jbyteArray prebytearr_,
        jint itype_
) {
    int rc = -1;
    if (prebytearr_ != NULL) {
        const int bytelen = env->GetArrayLength(prebytearr_);
        // Native copy of the prefix; allocated with new[] by jbyteArrTopChar.
        char *prefix = jbyteArrTopChar(env, prebytearr_, bytelen);
        const int option = (itype_ == 0) ? NN_SUB_SUBSCRIBE : NN_SUB_UNSUBSCRIBE;
        // Use the array length, not strlen(): the copied bytes are binary data.
        rc = nn_setsockopt(socketid_, NN_SUB, option, prefix,
                           static_cast<size_t>(bytelen));
        // The copy is only needed for the call above.
        delete[] prefix;
    }
    if (rc < 0) {
        char errmsg[100];
        // Bounded formatting so the message can never overrun errmsg.
        snprintf(errmsg, sizeof(errmsg), "设置订阅前缀失败!返回码:%d", rc);
        LOGE("%s\n", errmsg);
        throwByName(env, ERRCLS, errmsg);
    }
    return rc;
}
在测试nn_recv时发现,原来直接把接收到的char*返回给android后,有时候会多一个字符。反复测试和分析了一下,发现nn_recv返回的缓冲区并不是以'\0'结尾的字符串,所以这个char*指针最好不要直接当字符串用,还要按nn_recv返回的接收字节数(int值)重新截取一遍后再传给android才可以,所以recv的函数也做了一下修改。
/**
 * JNI entry point: receives one message from the socket and returns it as a
 * Java byte[].
 *
 * Exactly the number of bytes reported by nn_recv is copied into the Java
 * array, so no trailing garbage is returned. The previous version first copied
 * the message into a temporary new[] buffer that was never freed; the message
 * buffer is now handed to pCharTojbyteArr directly.
 *
 * @param env       JNI environment
 * @param socketid_ socket id passed to nn_recv
 * @return the received bytes, or NULL when nn_recv reports a negative length.
 *         When nn_freemsg returns non-zero, the failure is logged and an
 *         exception is raised through throwByName(env, ERRCLS, ...).
 */
extern "C" JNIEXPORT jbyteArray JNICALL
Java_com_vaccae_vnanomsg_utils_NanoMsgJNI_recvbyte(
        JNIEnv *env,
        jobject,
        jint socketid_
) {
    // With NN_MSG the address of the message buffer is stored into buf;
    // it is handed back with nn_freemsg below.
    void *buf = NULL;
    const int nbytes = nn_recv(socketid_, &buf, NN_MSG, 0);
    if (nbytes < 0) {
        return NULL;
    }
    // pCharTojbyteArr copies nbytes bytes, so the message buffer can be
    // released right after it.
    jbyteArray recvbytes = pCharTojbyteArr(env, static_cast<char *>(buf), nbytes);
    const int rc = nn_freemsg(buf);
    if (rc != 0) {
        char errmsg[100];
        // Bounded formatting so the message can never overrun errmsg.
        snprintf(errmsg, sizeof(errmsg), "接收数据失败!返回码:%d", rc);
        LOGE("%s\n", errmsg);
        // Do not raise a second exception if creating the array already left one pending.
        if (!env->ExceptionCheck()) {
            throwByName(env, ERRCLS, errmsg);
        }
    }
    return recvbytes;
}
刚做完SURVEY模式时,发现服务端发送的数据无法接收到,也是查了好久,包括下载了NNanomsg的封装源码看,发现了可能NNanoMsg用的版本要比我的低,他的宏定义和我下载的源码中的定义不一样,而且这个里面也说了老版本的是0和1,新版本的改为2和3了
/* NB: Version 0 used 16 + 0/1. That version lacked backtraces, and so
is wire-incompatible with this version. */
survey.h的修改
/*
Copyright (c) 2012 Martin Sustrik All rights reserved.
Copyright 2015 Garrett D'Amore <garrett@damore.org>
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"),
to deal in the Software without restriction, including without limitation
the rights to use, copy, modify, merge, publish, distribute, sublicense,
and/or sell copies of the Software, and to permit persons to whom
the Software is furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included
in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
IN THE SOFTWARE.
*/
/* Survey protocol constants: socket type IDs and the survey-specific value
   NN_SURVEYOR_DEADLINE. */
#ifndef SURVEY_H_INCLUDED
#define SURVEY_H_INCLUDED
#ifdef __cplusplus
extern "C" {
#endif
/* Base number from which the two socket type IDs below are derived. */
#define NN_PROTO_SURVEY 6
/* LOCAL MODIFICATION - differs from the header as originally shipped.
   The original header carried this note:
     "NB: Version 0 used 16 + 0/1. That version lacked backtraces, and so
      is wire-incompatible with this version."
   and defined
     NN_SURVEYOR   as (NN_PROTO_SURVEY * 16 + 2)
     NN_RESPONDENT as (NN_PROTO_SURVEY * 16 + 3)
   The definitions below use the version-0 offsets 0 and 1 instead.
   NOTE(review): presumably done so that this build can talk to peers that
   still use the version-0 IDs; by the note above it would then no longer
   interoperate with peers using the +2/+3 IDs - confirm before reuse. */
#define NN_SURVEYOR (NN_PROTO_SURVEY * 16 + 0)
#define NN_RESPONDENT (NN_PROTO_SURVEY * 16 + 1)
/* NOTE(review): looks like the option id for the surveyor deadline setting -
   confirm against the nanomsg documentation. */
#define NN_SURVEYOR_DEADLINE 1
#ifdef __cplusplus
}
#endif
#endif
暂时就遇到了这几个问题,所以也就列了这么多了,下面几个动图就是视频中对应模式通讯的效果
REQREP模式
PUBSUB模式
SURVEY模式
源码地址
https://github.com/Vaccae/VNanoMsg.git
上面的地址已经改为VNanoMsg的发布地址了,后面的文章我们介绍VNanoMsg的使用,敬请期待。