前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >【IoT迷你赛】完成结果视频

【IoT迷你赛】完成结果视频

原创
作者头像
杰杰
修改2019-08-27 10:27:57
1.3K0
修改2019-08-27 10:27:57
举报
文章被收录于专栏:腾讯云IoT腾讯云IoT腾讯云IoT

首先放视频

视频内容

【提示】下面图片代码略多,注意观看~

初次连接到腾讯云物联网开发平台(IoT Explorer)

一开始拿到板子就使用demo连接到腾讯云物联网开发平台(IoT Explorer):

image.png
image.png
image.png
image.png

完美完成通信。

开发自己的demo

方案构思如下:https://cloud.tencent.com/developer/article/1492173

方案构思
方案构思

完成情况:

微信小程序开发
微信小程序开发
pc上位机开发
pc上位机开发

更多功能详见视频简介。

使用TencentOS tiny移植lwip

lwip工程demo
lwip工程demo

移植的接口文件

移植的部分接口文件如下:

#include "debug.h"

#include <lwip/opt.h>
#include <lwip/arch.h>

#include "tcpip.h"
#include "lwip/init.h"
#include "lwip/netif.h"
#include "lwip/sio.h"
#include "ethernetif.h"

#if !NO_SYS
#include "sys_arch.h"
#endif
#include <lwip/stats.h>
#include <lwip/debug.h>
#include <lwip/sys.h>
#include "lwip/dhcp.h"
#include <string.h>

int errno;


u32_t lwip_sys_now;

struct sys_timeouts {
  struct sys_timeo *next;
};

struct timeoutlist
{
	struct sys_timeouts timeouts;
	k_task_t *pid;
};

#define SYS_THREAD_MAX 4

static struct timeoutlist s_timeoutlist[SYS_THREAD_MAX];

static u16_t s_nextthread = 0;

u32_t
sys_jiffies(void)
{
  lwip_sys_now = tos_systick_get();
  return lwip_sys_now;
}

u32_t
sys_now(void)
{
  lwip_sys_now = tos_systick_get();
  return lwip_sys_now;
}

void
sys_init(void)
{
	int i;
	// Initialize the the per-thread sys_timeouts structures
	// make sure there are no valid pids in the list
	for(i = 0; i < SYS_THREAD_MAX; i++)
	{
		s_timeoutlist[i].pid = 0;
		s_timeoutlist[i].timeouts.next = NULL;
	}
	// keep track of how many threads have been created
	s_nextthread = 0;
}

struct sys_timeouts *sys_arch_timeouts(void)
{
	int i;
	k_task_t *pid;
	struct timeoutlist *tl;
	pid = k_curr_task;
	for(i = 0; i < s_nextthread; i++)
	{
		tl = &(s_timeoutlist[i]);
		if(tl->pid == pid)
		{
			return &(tl->timeouts);
		}
	}
	return NULL;
}

sys_prot_t sys_arch_protect(void)
{
    TOS_CPU_CPSR_ALLOC();
    TOS_CPU_INT_DISABLE();
    return cpu_cpsr;
}

void sys_arch_unprotect(sys_prot_t cpu_cpsr)
{
	TOS_CPU_INT_ENABLE();
}

#if !NO_SYS


err_t
sys_sem_new(sys_sem_t *sem, u8_t count)
{
    /* 创建 sem */
    tos_sem_create(sem,count);

#if SYS_STATS
	++lwip_stats.sys.sem.used;
 	if (lwip_stats.sys.sem.max < lwip_stats.sys.sem.used) {
		lwip_stats.sys.sem.max = lwip_stats.sys.sem.used;
	}
#endif /* SYS_STATS */
  
  if(sem != SYS_SEM_NULL)
    return ERR_OK;
  else
  {
#if SYS_STATS
    ++lwip_stats.sys.sem.err;
#endif /* SYS_STATS */
    printf("[sys_arch]:new sem fail!\n");
    return ERR_MEM;
  }
}

void
sys_sem_free(sys_sem_t *sem)
{
#if SYS_STATS
   --lwip_stats.sys.sem.used;
#endif /* SYS_STATS */
  /* 删除 sem */
  tos_sem_destroy(sem);
  sem = SYS_SEM_NULL;
}


int sys_sem_valid(sys_sem_t *sem)                                               
{
  return (sem->pend_obj.type != NULL);                                    
}


void
sys_sem_set_invalid(sys_sem_t *sem)
{
    sem = SYS_SEM_NULL;
    (void)sem;
}

/* 
 如果timeout参数不为零,则返回值为
 等待信号量所花费的毫秒数。如果
 信号量未在指定时间内发出信号,返回值为
 SYS_ARCH_TIMEOUT。如果线程不必等待信号量
 该函数返回零。 */
u32_t
sys_arch_sem_wait(sys_sem_t *sem, u32_t timeout)
{
  k_tick_t wait_tick = 0;
  k_tick_t start_tick = 0 ;
  
  //看看信号量是否有效
  if(sem == SYS_SEM_NULL)
    return SYS_ARCH_TIMEOUT;
  
  //首先获取开始等待信号量的时钟节拍
  start_tick = sys_now();
  
  //timeout != 0,需要将ms换成系统的时钟节拍
  if(timeout != 0)
  {
    //将ms转换成时钟节拍
    wait_tick = timeout / (1000/TOS_CFG_CPU_TICK_PER_SECOND);
    if (wait_tick == 0)
      wait_tick = 1;
  }
  else
    wait_tick = TOS_TIME_FOREVER;  //一直阻塞
  
  //等待成功,计算等待的时间,否则就表示等待超时
  if(tos_sem_pend(sem, wait_tick) == K_ERR_NONE)
    return ((sys_now()-start_tick) * (1000/TOS_CFG_CPU_TICK_PER_SECOND));
  else
    return SYS_ARCH_TIMEOUT;
}

void
sys_sem_signal(sys_sem_t *sem)
{
  if(tos_sem_post( sem ) != K_ERR_NONE)
    printf("[sys_arch]:sem signal fail!\n");
}

err_t
sys_mutex_new(sys_mutex_t *mutex)
{
  /* 创建 sem */   
  tos_mutex_create(mutex);
  if(mutex != SYS_MRTEX_NULL)
    return ERR_OK;
  else
  {
    printf("[sys_arch]:new mutex fail!\n");
    return ERR_MEM;
  }
}

void
sys_mutex_free(sys_mutex_t *mutex)
{
  tos_mutex_destroy(mutex);
}

void
sys_mutex_set_invalid(sys_mutex_t *mutex)
{
    mutex = SYS_MRTEX_NULL;
    (void)mutex;
}

void
sys_mutex_lock(sys_mutex_t *mutex)
{
  tos_mutex_pend_timed(mutex,/* 互斥量句柄 */
                       TOS_TIME_FOREVER); /* 等待时间 */
}

void
sys_mutex_unlock(sys_mutex_t *mutex)
{
  tos_mutex_post( mutex );//给出互斥量
}


sys_thread_t
sys_thread_new(const char *name, lwip_thread_fn function, void *arg, int stacksize, int prio)
{
    k_err_t err;
    sys_thread_t task;
    k_stack_t *task_stack;
    
    task = tos_mmheap_alloc(sizeof(k_task_t));
    task_stack = tos_mmheap_alloc(stacksize);
    /* 创建MidPriority_Task任务 */
    err = tos_task_create(task, 
                          (char*)name, 
                          function,
                          arg, 
                          prio, 
                          task_stack,
                          stacksize,
                          20);
    if(err != K_ERR_NONE)
        printf("TencentOS Create task fail! code : %d \r\n",err);
  if(task == K_NULL)
  {
    printf("[sys_arch]:create task fail!\n");
    return NULL;
  }
  return task;
}

err_t
sys_mbox_new(sys_mbox_t *mbox, int size)
{
    k_err_t err;
    /* 创建Test_Queue */
    err = tos_queue_create(mbox);
    if(err != K_ERR_NONE)
    {
        printf("TencentOS Create mbox fail! code : %d \r\n",err);
        return ERR_MEM;
    }
    return ERR_OK;
}

void
sys_mbox_free(sys_mbox_t *mbox)
{
    tos_queue_destroy(mbox);
}

int sys_mbox_valid(sys_mbox_t *mbox)          
{      
  return (mbox->pend_obj.type != NULL);
}   

void
sys_mbox_set_invalid(sys_mbox_t *mbox)
{
    mbox = SYS_MBOX_NULL; 
    (void)mbox;
}

void
sys_mbox_post(sys_mbox_t *q, void *msg)
{
    tos_queue_post( q, /* 消息队列的句柄 */
                    msg,/* 发送的消息内容 */
                    sizeof(void *));
//    while(tos_queue_post( q, /* 消息队列的句柄 */
//                    msg,/* 发送的消息内容 */
//                    sizeof(void *)) != K_ERR_NONE);
}

err_t
sys_mbox_trypost(sys_mbox_t *q, void *msg)
{
  if(tos_queue_post(q,msg,sizeof(void *)) == K_ERR_NONE)  
    return ERR_OK;
  else
    return ERR_MEM;
}

err_t
sys_mbox_trypost_fromisr(sys_mbox_t *q, void *msg)
{
  return sys_mbox_trypost(q, msg);
}

u32_t
sys_arch_mbox_fetch(sys_mbox_t *q, void **msg, u32_t timeout)
{
  void *dummyptr;
  k_tick_t wait_tick = 0;
  k_tick_t start_tick = 0 ;
  size_t size;
  size = sizeof(void *);
    
  if (msg == NULL )  //看看存储消息的地方是否有效
		msg = &dummyptr;
  
  //首先获取开始等待信号量的时钟节拍
  start_tick = sys_now();
  
  //timeout != 0,需要将ms换成系统的时钟节拍
  if(timeout != 0)
  {
    //将ms转换成时钟节拍
    wait_tick = timeout / (1000/TOS_CFG_CPU_TICK_PER_SECOND);
    if (wait_tick == 0)
      wait_tick = 1;
  }
  //一直阻塞
  else
    wait_tick = TOS_TIME_FOREVER;
  
  //等待成功,计算等待的时间,否则就表示等待超时
  if(tos_queue_pend(q,&(*msg),&size, wait_tick) == K_ERR_NONE)
    return ((sys_now() - start_tick)*(1000/TOS_CFG_CPU_TICK_PER_SECOND));
  else
  {
    *msg = NULL;
    return SYS_ARCH_TIMEOUT;
  }
}

u32_t
sys_arch_mbox_tryfetch(sys_mbox_t *q, void **msg)
{
    size_t size;
    size = sizeof(void *);
	void *dummyptr;
	if ( msg == NULL )
		msg = &dummyptr;
  
  //等待成功,计算等待的时间
  if(tos_queue_pend(q,&(*msg),&size, 0) == K_ERR_NONE)
    return ERR_OK;
  else
    return SYS_MBOX_EMPTY;
}

#if LWIP_NETCONN_SEM_PER_THREAD
#error LWIP_NETCONN_SEM_PER_THREAD==1 not supported
#endif /* LWIP_NETCONN_SEM_PER_THREAD */

#endif /* !NO_SYS */

/* Variables Initialization */
struct netif gnetif;
ip4_addr_t ipaddr;
ip4_addr_t netmask;
ip4_addr_t gw;
uint8_t IP_ADDRESS[4];
uint8_t NETMASK_ADDRESS[4];
uint8_t GATEWAY_ADDRESS[4];

void TCPIP_Init(void)
{
  tcpip_init(NULL, NULL);
  
  /* IP addresses initialization */
  /* USER CODE BEGIN 0 */
#if LWIP_DHCP
  ip_addr_set_zero_ip4(&ipaddr);
  ip_addr_set_zero_ip4(&netmask);
  ip_addr_set_zero_ip4(&gw);
#else
  IP4_ADDR(&ipaddr,IP_ADDR0,IP_ADDR1,IP_ADDR2,IP_ADDR3);
  IP4_ADDR(&netmask,NETMASK_ADDR0,NETMASK_ADDR1,NETMASK_ADDR2,NETMASK_ADDR3);
  IP4_ADDR(&gw,GW_ADDR0,GW_ADDR1,GW_ADDR2,GW_ADDR3);
#endif /* USE_DHCP */
  /* USER CODE END 0 */
  /* Initilialize the LwIP stack without RTOS */
  /* add the network interface (IPv4/IPv6) without RTOS */
  netif_add(&gnetif, &ipaddr, &netmask, &gw, NULL, &ethernetif_init, &tcpip_input);

  /* Registers the default network interface */
  netif_set_default(&gnetif);

  if (netif_is_link_up(&gnetif))
  {
    /* When the netif is fully configured this function must be called */
    netif_set_up(&gnetif);
  }
  else
  {
    /* When the netif link is down this function must be called */
    netif_set_down(&gnetif);
  }
  
#if LWIP_DHCP	   			//若使用了DHCP
  int err;
  /*  Creates a new DHCP client for this interface on the first call.
  Note: you must call dhcp_fine_tmr() and dhcp_coarse_tmr() at
  the predefined regular intervals after starting the client.
  You can peek in the netif->dhcp struct for the actual DHCP status.*/
  
  printf("本例程将使用DHCP动态分配IP地址,如果不需要则在lwipopts.h中将LWIP_DHCP定义为0\n\n");
  
  err = dhcp_start(&gnetif);      //开启dhcp
  if(err == ERR_OK)
    printf("lwip dhcp init success...\n\n");
  else
    printf("lwip dhcp init fail...\n\n");
  while(ip_addr_cmp(&(gnetif.ip_addr),&ipaddr))   //等待dhcp分配的ip有效
  {
    vTaskDelay(1);
  } 
#endif
  printf("本地IP地址是:%d.%d.%d.%d\n\n",  \
        ((gnetif.ip_addr.addr)&0x000000ff),       \
        (((gnetif.ip_addr.addr)&0x0000ff00)>>8),  \
        (((gnetif.ip_addr.addr)&0x00ff0000)>>16), \
        ((gnetif.ip_addr.addr)&0xff000000)>>24);
}

iperf测速

测速信息(表示非常稳定,更多详情见附件):

bin/iperf.exe -c 192.168.0.122 -P 1 -i 1 -p 5001 -f k -t 1000000000
------------------------------------------------------------
Client connecting to 192.168.0.122, TCP port 5001
TCP window size: 64.0 KByte (default)
------------------------------------------------------------
[300] local 192.168.0.195 port 56104 connected with 192.168.0.122 port 5001
[ ID] Interval       Transfer     Bandwidth
[300]  0.0- 1.0 sec  11640 KBytes  95355 Kbits/sec
[300]  1.0- 2.0 sec  11592 KBytes  94962 Kbits/sec
[300]  2.0- 3.0 sec  11576 KBytes  94831 Kbits/sec
[300]  3.0- 4.0 sec  11592 KBytes  94962 Kbits/sec
[300]  4.0- 5.0 sec  11592 KBytes  94962 Kbits/sec
[300]  5.0- 6.0 sec  11576 KBytes  94831 Kbits/sec
[300]  6.0- 7.0 sec  11592 KBytes  94962 Kbits/sec
[300]  7.0- 8.0 sec  11576 KBytes  94831 Kbits/sec
[300]  8.0- 9.0 sec  11592 KBytes  94962 Kbits/sec
[300]  9.0-10.0 sec  11576 KBytes  94831 Kbits/sec
[300] 10.0-11.0 sec  11552 KBytes  94634 Kbits/sec
[300] 11.0-12.0 sec  11592 KBytes  94962 Kbits/sec
[300] 12.0-13.0 sec  11568 KBytes  94765 Kbits/sec
[300] 13.0-14.0 sec  11592 KBytes  94962 Kbits/sec
[300] 14.0-15.0 sec  11576 KBytes  94831 Kbits/sec
[300] 15.0-16.0 sec  11584 KBytes  94896 Kbits/sec

移植emxgui

emxgui
emxgui

部分接口文件如下(效果见视频):

#include	<stddef.h>
#include	"emXGUI_Arch.h"

#include "tos.h"

/*===================================================================================*/
/*
函数功能: 创建一个互斥(该互斥锁必须支持嵌套使用)
返回: 互斥对象句柄(唯一标识)
说明: 互斥对象句柄按实际OS所定,可以是指针,ID号等...
*/
GUI_MUTEX*	GUI_MutexCreate(void)
{
    GUI_MUTEX *mutex;
    mutex = tos_mmheap_alloc(sizeof(k_mutex_t));
	tos_mutex_create((k_mutex_t*)mutex);	
    return mutex;
}

/*===================================================================================*/
/*
函数功能: 互斥锁定
参数: hMutex(由GUI_MutexCreate返回的句柄); 
      time 最长等待毫秒数,0立既返回,0xFFFFFFFF,一直等待
返回: TRUE:成功;FALSE:失败或超时
说明: .
*/
BOOL	GUI_MutexLock(GUI_MUTEX *hMutex,U32 time)
{
  k_tick_t wait_tick;
  //timeout != 0,需要将ms换成系统的时钟节拍
  if(time != 0)
  {
    //将ms转换成时钟节拍
    wait_tick = time / (1000/TOS_CFG_CPU_TICK_PER_SECOND);
    if (wait_tick == 0)
      wait_tick = 1;
  }
  else if(time == 0xFFFFFFFF)
    wait_tick = TOS_TIME_FOREVER;  //一直阻塞
  
    if(tos_mutex_pend_timed((k_mutex_t*)hMutex, time) == K_ERR_NONE)
	{
		return TRUE;
	}
	return	FALSE;
}

/*===================================================================================*/
/*
函数功能: 互斥解锁
参数: hMutex(由GUI_MutexCreate返回的句柄);    
返回: 无
说明: .
*/
void	GUI_MutexUnlock(GUI_MUTEX *hMutex)
{
	 tos_mutex_post((k_mutex_t*)hMutex);
}

/*===================================================================================*/
/*
函数功能: 互斥删除
参数: hMutex(由GUI_MutexCreate返回的句柄);    
返回: 无
说明: .
*/
void	GUI_MutexDelete(GUI_MUTEX *hMutex)
{
	tos_mutex_destroy((k_mutex_t*)hMutex);
    tos_mmheap_free(hMutex);
}

/*===================================================================================*/
/*
函数功能: 创建一个信号量
参数: init: 信号量初始值; max: 信号量最大值
返回: 信号量对象句柄(唯一标识)
说明: 信号量对象句柄按实际OS所定,可以是指针,ID号等...
*/
GUI_SEM*	GUI_SemCreate(int init,int max)
{
    GUI_SEM *sem;
    sem = tos_mmheap_alloc(sizeof(k_sem_t));
	tos_sem_create((k_sem_t*)sem,init);	
    return sem;
}

/*===================================================================================*/
/*
函数功能: 信号量等待
参数: hsem(由GUI_SemCreate返回的句柄); 
      time 最长等待毫秒数,0立既返回,0xFFFFFFFF,一直等待
返回: TRUE:成功;FALSE:失败或超时
说明: .
*/
BOOL	GUI_SemWait(GUI_SEM *hsem,U32 time)
{
    k_tick_t wait_tick;
    //timeout != 0,需要将ms换成系统的时钟节拍
    if(time != 0)
    {
        //将ms转换成时钟节拍
        wait_tick = time / (1000/TOS_CFG_CPU_TICK_PER_SECOND);
        if (wait_tick == 0)
            wait_tick = 1;
    }
    else if(time == 0xFFFFFFFF)
        wait_tick = TOS_TIME_FOREVER;  //一直阻塞
    else
        wait_tick = 0;
    
	if(tos_sem_pend((k_sem_t*)hsem,time)== K_ERR_NONE)
	{
		return TRUE;
	}
	return FALSE;
}

/*===================================================================================*/
/*
函数功能: 信号量发送
参数: hsem(由GUI_SemCreate返回的句柄);  
返回: 无
说明: .
*/
void	GUI_SemPost(GUI_SEM *hsem)
{
	tos_sem_post((k_sem_t*)hsem);
}
/*
函数功能: 信号量发送(受freertos管理的中断)
参数: hsem(由GUI_SemCreate返回的句柄);  
返回: 无
说明: 若在受freertos管理的中断中调用GUI_SemPost,会导致port.c:425
*/
void GUI_SemPostISR(GUI_SEM *hsem)
{ 
    tos_sem_post((k_sem_t*)hsem);
}
/*===================================================================================*/
/*
函数功能: 信号量删除
参数: hsem(由GUI_SemCreate返回的句柄);    
返回: 无
说明: .
*/
void	GUI_SemDelete(GUI_SEM *hsem)
{
	tos_sem_destroy((k_sem_t*)hsem);
    tos_mmheap_free(hsem);
}

/*===================================================================================*/
/*
函数功能: 获得当前线程句柄(唯一标识)
参数: 无  
返回: 当前线程唯一标识,按实际OS所定,可以是指针,ID号等...
说明: .
*/
HANDLE	GUI_GetCurThreadHandle(void)
{
	return	(HANDLE)k_curr_task;
}


/*===================================================================================*/
/*
函数功能: 获得当前系统时间(单位:毫秒)
参数: 无  
返回: 当前系统时间
说明: .
*/
U32	GUI_GetTickCount(void)
{
	U32 i;
	
	i=tos_systick_get();
	
	return (i*1000)/TOS_CFG_CPU_TICK_PER_SECOND;

}

/*===================================================================================*/
/*
函数功能: 最短时间内让出CPU
参数: 无  
返回: 无
说明: 按具体OS情况而定,最简单的方法是:OS Delay 一个 tick 周期.
*/
void	GUI_Yield(void)
{
	tos_task_delay(2);
}

/*===================================================================================*/
/*
函数功能: 延时函数
参数: ms: 延时时间(单位:毫秒) 
返回: 无
说明: 
*/
void	GUI_msleep(u32 ms)
{
	tos_task_delay((ms/(1000/TOS_CFG_CPU_TICK_PER_SECOND)));
}

/*
 * 函数功能: 创建线程
 * @param name 线程名
 * @param entry 线程入口函数
 * @param parameter 线程参数
 * @param stack_size 线程栈大小(单位字节,注意部分系统需要进行单位转换)
 * @param priority 线程优先级
 * @param tick FreeRTOS没这个功能,时间片(同优先级任务的时间片轮转)
 * @return 是否创建成功
*/
BOOL GUI_Thread_Create(void (*entry)(void *parameter),
                         const char *name,
                         u32  stack_size,
                         void *parameter,
                         u32  priority,
                         u32  tick)
{
    k_err_t err;
    k_task_t *task;
    k_stack_t *task_stack;
    
    task = tos_mmheap_alloc(sizeof(k_task_t));
    task_stack = tos_mmheap_alloc(stack_size);
    /* 创建MidPriority_Task任务 */
    err = tos_task_create(task, 
                          (char*)name, 
                          entry,
                          parameter, 
                          priority, 
                          task_stack,
                          stack_size,
                          tick);

  if(err == K_ERR_NONE)
    return TRUE;
  else
  {
    GUI_ERROR("GUI Thread Create failed:%s",name);
    return FALSE; 
  }    
}

/**
 * @breif: 删除线程,可通过GUI_GetCurThreadHandle获取当前任务句柄作为输入参数
 * @return 无
*/
void GUI_Thread_Delete(HANDLE thread)
{
    k_task_t* task = (k_task_t*)thread;
    tos_task_destroy(task);
    tos_mmheap_free(task->stk_base);
    tos_mmheap_free(task);
    
}
#endif

使用iot hub平台

设备在线情况:

image.png
image.png

使用规则引擎:

image.png
image.png

开发云平台签名小工具:

image.png
image.png

MQTT.fx连接

image.png
image.png

stm32f429使用mqtt连接iot hub平台

image.png
image.png

部分mqtt代码:

#include "mqttclient.h"
#include "transport.h"
#include "MQTTPacket.h"
#include "tos.h"

#include "string.h"
#include "sockets.h"

#include "lwip/opt.h"

#include "lwip/sys.h"
#include "lwip/api.h"

#include "lwip/sockets.h"

#include "cJSON_Process.h"
#include "bsp_dht11.h"

/******************************* 全局变量声明 ************************************/
/*
 * 当我们在写应用程序的时候,可能需要用到一些全局变量。
 */
extern k_queue_t MQTT_Data_Queue;


//定义用户消息结构体
MQTT_USER_MSG  mqtt_user_msg;

int32_t MQTT_Socket = 0;


void deliverMessage(MQTTString *TopicName,MQTTMessage *msg,MQTT_USER_MSG *mqtt_user_msg);

/************************************************************************
** 函数名称: MQTT_Connect								
** 函数功能: 初始化客户端并登录服务器
** 入口参数: int32_t sock:网络描述符
** 出口参数: >=0:发送成功 <0:发送失败
** 备    注: 
************************************************************************/
uint8_t MQTT_Connect(void)
{
    MQTTPacket_connectData data = MQTTPacket_connectData_initializer;
    uint8_t buf[200];
    int buflen = sizeof(buf);
    int len = 0;
    data.clientID.cstring = CLIENT_ID;                   //随机
    data.keepAliveInterval = KEEPLIVE_TIME;         //保持活跃
    data.username.cstring = USER_NAME;              //用户名
    data.password.cstring = PASSWORD;               //密钥
    data.MQTTVersion = MQTT_VERSION;                //3表示3.1版本,4表示3.11版本
    data.cleansession = 1;
    //组装消息
    len = MQTTSerialize_connect((unsigned char *)buf, buflen, &data);
    //发送消息
    transport_sendPacketBuffer(buf, len);

    /* 等待连接响应 */
    if (MQTTPacket_read(buf, buflen, transport_getdata) == CONNACK)
    {
        unsigned char sessionPresent, connack_rc;
        if (MQTTDeserialize_connack(&sessionPresent, &connack_rc, buf, buflen) != 1 || connack_rc != 0)
        {
          printf("无法连接,错误代码是: %d!\n", connack_rc);
            return Connect_NOK;
        }
        else 
        {
            printf("用户名与密钥验证成功,MQTT连接成功!\n");
            return Connect_OK;
        }
    }
    else
        printf("MQTT连接无响应!\n");
        return Connect_NOTACK;
}


/************************************************************************
** 函数名称: MQTT_PingReq								
** 函数功能: 发送MQTT心跳包
** 入口参数: 无
** 出口参数: >=0:发送成功 <0:发送失败
** 备    注: 
************************************************************************/
int32_t MQTT_PingReq(int32_t sock)
{
	  int32_t len;
		uint8_t buf[200];
		int32_t buflen = sizeof(buf);	 
		fd_set readfd;
	  struct timeval tv;
	  tv.tv_sec = 5;
	  tv.tv_usec = 0;
	
	  FD_ZERO(&readfd);
	  FD_SET(sock,&readfd);			
	
		len = MQTTSerialize_pingreq(buf, buflen);
		transport_sendPacketBuffer(buf, len);
	
		//等待可读事件
		if(select(sock+1,&readfd,NULL,NULL,&tv) == 0)
			return -1;
		
	  //有可读事件
		if(FD_ISSET(sock,&readfd) == 0)
			return -2;
		
		if(MQTTPacket_read(buf, buflen, transport_getdata) != PINGRESP)
			return -3;
		
		return 0;
	
}


/************************************************************************
** 函数名称: MQTTSubscribe								
** 函数功能: 订阅消息
** 入口参数: int32_t sock:套接字
**           int8_t *topic:主题
**           enum QoS pos:消息质量
** 出口参数: >=0:发送成功 <0:发送失败
** 备    注: 
************************************************************************/
int32_t MQTTSubscribe(int32_t sock,char *topic,enum QoS pos)
{
	  static uint32_t PacketID = 0;
	  uint16_t packetidbk = 0;
	  int32_t conutbk = 0;
		uint8_t buf[100];
		int32_t buflen = sizeof(buf);
	  MQTTString topicString = MQTTString_initializer;  
		int32_t len;
	  int32_t req_qos,qosbk;
	
		fd_set readfd;
	  struct timeval tv;
	  tv.tv_sec = 2;
	  tv.tv_usec = 0;
	
	  FD_ZERO(&readfd);
	  FD_SET(sock,&readfd);		
	
	  //复制主题
    topicString.cstring = (char *)topic;
		//订阅质量
	  req_qos = pos;
	
	  //串行化订阅消息
    len = MQTTSerialize_subscribe(buf, buflen, 0, PacketID++, 1, &topicString, &req_qos);
		//发送TCP数据
	  if(transport_sendPacketBuffer(buf, len) < 0)
				return -1;
	  
    //等待可读事件--等待超时
		if(select(sock+1,&readfd,NULL,NULL,&tv) == 0)
				return -2;
		//有可读事件--没有可读事件
		if(FD_ISSET(sock,&readfd) == 0)
				return -3;

		//等待订阅返回--未收到订阅返回
		if(MQTTPacket_read(buf, buflen, transport_getdata) != SUBACK)
				return -4;	
		
		//拆订阅回应包
		if(MQTTDeserialize_suback(&packetidbk,1, &conutbk, &qosbk, buf, buflen) != 1)
				return -5;
		
		//检测返回数据的正确性
		if((qosbk == 0x80)||(packetidbk != (PacketID-1)))
				return -6;
		
    //订阅成功
		return 0;
}


/************************************************************************
** 函数名称: UserMsgCtl						
** 函数功能: 用户消息处理函数
** 入口参数: MQTT_USER_MSG  *msg:消息结构体指针
** 出口参数: 无
** 备    注: 
************************************************************************/
void UserMsgCtl(MQTT_USER_MSG  *msg)
{
		//这里处理数据只是打印,用户可以在这里添加自己的处理方式
	  printf("*****收到订阅的消息!******\n");
		//返回后处理消息
	  switch(msg->msgqos)
		{
			case 0:
				    printf("MQTT>>消息质量:QoS0\n");
				    break;
			case 1:
				    printf("MQTT>>消息质量:QoS1\n");
				    break;
			case 2:
				    printf("MQTT>>消息质量:QoS2\n");
				    break;
			default:
				    printf("MQTT>>错误的消息质量\n");
				    break;
		}
		printf("MQTT>>消息主题:%s\n",msg->topic);	
		printf("MQTT>>消息类容:%s\n",msg->msg);	
		printf("MQTT>>消息长度:%d\n",msg->msglenth);	 
        Proscess(msg->msg);
	  //处理完后销毁数据
	  msg->valid  = 0;
}

/************************************************************************
** 函数名称: GetNextPackID						
** 函数功能: 产生下一个数据包ID
** 入口参数: 无
** 出口参数: uint16_t packetid:产生的ID
** 备    注: 
************************************************************************/
uint16_t GetNextPackID(void)
{
	 static uint16_t pubpacketid = 0;
	 return pubpacketid++;
}

/************************************************************************
** 函数名称: mqtt_msg_publish						
** 函数功能: 用户推送消息
** 入口参数: MQTT_USER_MSG  *msg:消息结构体指针
** 出口参数: >=0:发送成功 <0:发送失败
** 备    注: 
************************************************************************/
int32_t MQTTMsgPublish(int32_t sock, char *topic, int8_t qos, uint8_t* msg)
{
    int8_t retained = 0;      //保留标志位
    uint32_t msg_len;         //数据长度
		uint8_t buf[MSG_MAX_LEN];
		int32_t buflen = sizeof(buf),len;
		MQTTString topicString = MQTTString_initializer;
	  uint16_t packid = 0,packetidbk;
	
		//填充主题
	  topicString.cstring = (char *)topic;

	  //填充数据包ID
	  if((qos == QOS1)||(qos == QOS2))
		{ 
			packid = GetNextPackID();
		}
		else
		{
			  qos = QOS0;
			  retained = 0;
			  packid = 0;
		}
     
    msg_len = strlen((char *)msg);
    
		//推送消息
		len = MQTTSerialize_publish(buf, buflen, 0, qos, retained, packid, topicString, (unsigned char*)msg, msg_len);
		if(len <= 0)
				return -1;
		if(transport_sendPacketBuffer(buf, len) < 0)	
				return -2;	
		
		//质量等级0,不需要返回
		if(qos == QOS0)
		{
				return 0;
		}
		
		//等级1
		if(qos == QOS1)
		{
				//等待PUBACK
			  if(WaitForPacket(sock,PUBACK,5) < 0)
					 return -3;
				return 1;
			  
		}
		//等级2
		if(qos == QOS2)	
		{
			  //等待PUBREC
			  if(WaitForPacket(sock,PUBREC,5) < 0)
					 return -3;
			  //发送PUBREL
        len = MQTTSerialize_pubrel(buf, buflen,0, packetidbk);
				if(len == 0)
					return -4;
				if(transport_sendPacketBuffer(buf, len) < 0)	
					return -6;			
			  //等待PUBCOMP
			  if(WaitForPacket(sock,PUBREC,5) < 0)
					 return -7;
				return 2;
		}
		//等级错误
		return -8;
}

/************************************************************************
** 函数名称: ReadPacketTimeout					
** 函数功能: 阻塞读取MQTT数据
** 入口参数: int32_t sock:网络描述符
**           uint8_t *buf:数据缓存区
**           int32_t buflen:缓冲区大小
**           uint32_t timeout:超时时间--0-表示直接查询,没有数据立即返回
** 出口参数: -1:错误,其他--包类型
** 备    注: 
************************************************************************/
int32_t ReadPacketTimeout(int32_t sock,uint8_t *buf,int32_t buflen,uint32_t timeout)
{
		fd_set readfd;
	  struct timeval tv;
	  if(timeout != 0)
		{
				tv.tv_sec = timeout;
				tv.tv_usec = 0;
				FD_ZERO(&readfd);
				FD_SET(sock,&readfd); 

				//等待可读事件--等待超时
				if(select(sock+1,&readfd,NULL,NULL,&tv) == 0)
						return -1;
				//有可读事件--没有可读事件
				if(FD_ISSET(sock,&readfd) == 0)
						return -1;
	  }
		//读取TCP/IP事件
		return MQTTPacket_read(buf, buflen, transport_getdata);
}


/************************************************************************
** 函数名称: deliverMessage						
** 函数功能: 接受服务器发来的消息
** 入口参数: MQTTMessage *msg:MQTT消息结构体
**           MQTT_USER_MSG *mqtt_user_msg:用户接受结构体
**           MQTTString  *TopicName:主题
** 出口参数: 无
** 备    注: 
************************************************************************/
void deliverMessage(MQTTString  *TopicName,MQTTMessage *msg,MQTT_USER_MSG *mqtt_user_msg)
{
		//消息质量
		mqtt_user_msg->msgqos = msg->qos;
		//保存消息
		memcpy(mqtt_user_msg->msg,msg->payload,msg->payloadlen);
		mqtt_user_msg->msg[msg->payloadlen] = 0;
		//保存消息长度
		mqtt_user_msg->msglenth = msg->payloadlen;
		//消息主题
		memcpy((char *)mqtt_user_msg->topic,TopicName->lenstring.data,TopicName->lenstring.len);
		mqtt_user_msg->topic[TopicName->lenstring.len] = 0;
		//消息ID
		mqtt_user_msg->packetid = msg->id;
		//标明消息合法
		mqtt_user_msg->valid = 1;		
}


/************************************************************************
** 函数名称: mqtt_pktype_ctl						
** 函数功能: 根据包类型进行处理
** 入口参数: uint8_t packtype:包类型
** 出口参数: 无
** 备    注: 
************************************************************************/
void mqtt_pktype_ctl(uint8_t packtype,uint8_t *buf,uint32_t buflen)
{
	  MQTTMessage msg;
		int32_t rc;
	  MQTTString receivedTopic;
	  uint32_t len;
		switch(packtype)
		{
			case PUBLISH:
        //拆析PUBLISH消息
        if(MQTTDeserialize_publish(&msg.dup,(int*)&msg.qos, &msg.retained, &msg.id, &receivedTopic,
          (unsigned char **)&msg.payload, &msg.payloadlen, buf, buflen) != 1)
            return;	
        //接受消息
        deliverMessage(&receivedTopic,&msg,&mqtt_user_msg);
        
        //消息质量不同,处理不同
        if(msg.qos == QOS0)
        {
           //QOS0-不需要ACK
           //直接处理数据
           UserMsgCtl(&mqtt_user_msg);
           return;
        }
        //发送PUBACK消息
        if(msg.qos == QOS1)
        {
            len =MQTTSerialize_puback(buf,buflen,mqtt_user_msg.packetid);
            if(len == 0)
              return;
            //发送返回
            if(transport_sendPacketBuffer(buf,len)<0)
               return;	
            //返回后处理消息
            UserMsgCtl(&mqtt_user_msg); 
            return;												
        }

        //对于质量2,只需要发送PUBREC就可以了
        if(msg.qos == QOS2)
        {
           len = MQTTSerialize_ack(buf, buflen, PUBREC, 0, mqtt_user_msg.packetid);			                
           if(len == 0)
             return;
           //发送返回
           transport_sendPacketBuffer(buf,len);	
        }		
        break;
			case  PUBREL:				           
        //解析包数据,必须包ID相同才可以
        rc = MQTTDeserialize_ack(&msg.type,&msg.dup, &msg.id, buf,buflen);
        if((rc != 1)||(msg.type != PUBREL)||(msg.id != mqtt_user_msg.packetid))
          return ;
        //收到PUBREL,需要处理并抛弃数据
        if(mqtt_user_msg.valid == 1)
        {
           //返回后处理消息
           UserMsgCtl(&mqtt_user_msg);
        }      
        //串行化PUBCMP消息
        len = MQTTSerialize_pubcomp(buf,buflen,msg.id);	                   	
        if(len == 0)
          return;									
        //发送返回--PUBCOMP
        transport_sendPacketBuffer(buf,len);										
        break;
			case   PUBACK://等级1客户端推送数据后,服务器返回
				break;
			case   PUBREC://等级2客户端推送数据后,服务器返回
				break;
			case   PUBCOMP://等级2客户端推送PUBREL后,服务器返回
        break;
			default:
				break;
		}
}

/************************************************************************
** 函数名称: WaitForPacket					
** 函数功能: 等待特定的数据包
** 入口参数: int32_t sock:网络描述符
**           uint8_t packettype:包类型
**           uint8_t times:等待次数
** 出口参数: >=0:等到了特定的包 <0:没有等到特定的包
** 备    注: 
************************************************************************/
int32_t WaitForPacket(int32_t sock,uint8_t packettype,uint8_t times)
{
	  int32_t type;
		uint8_t buf[MSG_MAX_LEN];
	  uint8_t n = 0;
		int32_t buflen = sizeof(buf);
		do
		{
				//读取数据包
				type = ReadPacketTimeout(sock,buf,buflen,2);
			  if(type != -1)
					mqtt_pktype_ctl(type,buf,buflen);
				n++;
		}while((type != packettype)&&(n < times));
		//收到期望的包
		if(type == packettype)
			 return 0;
		else 
			 return -1;		
}



void Client_Connect(void)
{
    char* host_ip;
  
#ifdef  LWIP_DNS
    ip4_addr_t dns_ip;
    netconn_gethostbyname(HOST_NAME, &dns_ip);
    host_ip = ip_ntoa(&dns_ip);
    printf("host name : %s , host_ip : %s\n",HOST_NAME,host_ip);
#else
    host_ip = HOST_NAME;
#endif  
MQTT_START: 
  
		//创建网络连接
		printf("1.开始连接对应云平台的服务器...\n");
    printf("服务器IP地址:%s,端口号:%0d!\n",host_ip,HOST_PORT);
		while(1)
		{
				//连接服务器
				MQTT_Socket = transport_open((int8_t*)host_ip,HOST_PORT);
				//如果连接服务器成功
				if(MQTT_Socket >= 0)
				{
						printf("连接云平台服务器成功!\n");
						break;
				}
				printf("连接云平台服务器失败,等待3秒再尝试重新连接!\n");
				//等待3秒
				tos_task_delay(3000);
		}
    
    printf("2.MQTT用户名与密钥验证登录...\n");
    //MQTT用户名与密钥验证登录
    if(MQTT_Connect() != Connect_OK)
    {
         //重连服务器
         printf("MQTT用户名与密钥验证登录失败...\n");
          //关闭链接
         transport_close();
         goto MQTT_START;	 
    }
    
		//订阅消息
		printf("3.开始订阅消息...\n");
    //订阅消息
    if(MQTTSubscribe(MQTT_Socket,(char *)SUB_TOPIC,QOS1) < 0)
    {
         //重连服务器
         printf("客户端订阅消息失败...\n");
          //关闭链接
         transport_close();
         goto MQTT_START;	   
    }	

		//无限循环
		printf("4.开始循环接收订阅的消息...\n");

}





/************************************************************************
** 函数名称: mqtt_recv_thread								
** 函数功能: MQTT任务
** 入口参数: void *pvParameters:任务参数
** 出口参数: 无
** 备    注: MQTT连云步骤:
**           1.连接对应云平台的服务器
**           2.MQTT用户与密钥验证登录
**           3.订阅指定主题
**           4.等待接收主题的数据与上报主题数据
************************************************************************/
void mqtt_recv_thread(void *pvParameters)
{
	  uint32_t curtick;
		uint8_t no_mqtt_msg_exchange = 1;
		uint8_t buf[MSG_MAX_LEN];
		int32_t buflen = sizeof(buf);
    int32_t type;
    fd_set readfd;
	  struct timeval tv;      //等待时间
	  tv.tv_sec = 0;
	  tv.tv_usec = 10;

  
MQTT_START: 
    //开始连接
    Client_Connect();
    //获取当前滴答,作为心跳包起始时间
		curtick = sys_now();
		while(1)
		{
				//表明无数据交换
				no_mqtt_msg_exchange = 1;
			
				FD_ZERO(&readfd);
				FD_SET(MQTT_Socket,&readfd);						  

				//等待可读事件
				select(MQTT_Socket+1,&readfd,NULL,NULL,&tv);
				
				//判断MQTT服务器是否有数据
				if(FD_ISSET(MQTT_Socket,&readfd) != 0)
				{
						//读取数据包--注意这里参数为0,不阻塞
						type = ReadPacketTimeout(MQTT_Socket,buf,buflen,0);
						if(type != -1)
						{
								mqtt_pktype_ctl(type,buf,buflen);
								//表明有数据交换
								no_mqtt_msg_exchange = 0;
								//获取当前滴答,作为心跳包起始时间
								curtick = sys_now();
						}
				}
        
        //这里主要目的是定时向服务器发送PING保活命令
        if((sys_now() - curtick) >(KEEPLIVE_TIME/2*1000))
        {
            curtick = sys_now();
            //判断是否有数据交换
            if(no_mqtt_msg_exchange == 0)
            {
               //如果有数据交换,这次就不需要发送PING消息
               continue;
            }
            
            if(MQTT_PingReq(MQTT_Socket) < 0)
            {
               //重连服务器
               printf("发送保持活性ping失败....\n");
               goto CLOSE;	 
            }
            
            //心跳成功
            printf("发送保持活性ping作为心跳成功....\n");
            //表明有数据交换
            no_mqtt_msg_exchange = 0;
        }   
		}

CLOSE:
	 //关闭链接
	 transport_close();
	 //重新链接服务器
	 goto MQTT_START;	
}

void mqtt_send_thread(void *pvParameters)
{
    printf("mqtt_send_thread\n");
    int32_t ret;
    uint8_t no_mqtt_msg_exchange = 1;
    uint32_t curtick;
    uint8_t res;
    /* 定义一个创建信息返回值,默认为pdTRUE */
    k_err_t err;
    /* 定义一个接收消息的变量 */
    size_t msg_size;
    DHT11_Data_TypeDef* recv_data;
    //初始化json数据
    cJSON* cJSON_Data = NULL;
    cJSON_Data = cJSON_Data_Init();
    double a,b,c;
MQTT_SEND_START:
  
    while(1)
    {
        
       err = tos_queue_pend(&MQTT_Data_Queue,    
                         (void*)&recv_data,
                         &msg_size,
                         3000); 
//       recv_data = r_data;
       if(err == K_ERR_NONE)
       {
        a = recv_data->temperature;
        b = recv_data->humidity;
        c = sys_now();
        printf("a = %f,b = %f\n",a,b);
        //更新数据      
        res = cJSON_Update(cJSON_Data,TIME_NUM,&c);
        res = cJSON_Update(cJSON_Data,TEMP_NUM,&a);
        res = cJSON_Update(cJSON_Data,HUM_NUM,&b);
        
        if(UPDATE_SUCCESS == res)
        {
            //更新数据成功,
            char* p = cJSON_Print(cJSON_Data);
            //发布消息
            ret = MQTTMsgPublish(MQTT_Socket,(char*)PUB_TOPIC,QOS0,(uint8_t*)p);
            if(ret >= 0)
            {
                //表明有数据交换
                no_mqtt_msg_exchange = 0;
                //获取当前滴答,作为心跳包起始时间
                curtick = sys_now();				
            }
            tos_mmheap_free(p);
            p = NULL;
        }
        else
          printf("update fail\n");
      }
      //这里主要目的是定时向服务器发送PING保活命令
      if((sys_now() - curtick) >(KEEPLIVE_TIME/2*1000))
      {
          curtick = sys_now();
          //判断是否有数据交换
          if(no_mqtt_msg_exchange == 0)
          {
             //如果有数据交换,这次就不需要发送PING消息
             continue;
          }
          
          if(MQTT_PingReq(MQTT_Socket) < 0)
          {
             //重连服务器
             printf("发送保持活性ping失败....\n");
             goto MQTT_SEND_CLOSE;	 
          }
          
          //心跳成功
          printf("发送保持活性ping作为心跳成功....\n");
          //表明有数据交换
          no_mqtt_msg_exchange = 0;
      } 
  }
MQTT_SEND_CLOSE:
	 //关闭链接
	 transport_close(); 
   //开始连接
   Client_Connect();
   goto MQTT_SEND_START;
}

void
mqtt_thread_init(void)
{
  sys_thread_new("mqtt_recv_thread", mqtt_recv_thread, NULL, 2048*4, 4);
  sys_thread_new("mqtt_send_thread", mqtt_send_thread, NULL, 2048*4, 3);
}

此外还自己基于腾讯云主机部署mqtt服务器

使用TencentOS连接成功并完成通讯

image.png
image.png

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 首先放视频
  • 初次连接到腾讯云物联网开发平台(IoT Explorer)
  • 开发自己的demo
    • 完成情况:
    • 使用TencentOS tiny移植lwip
      • 移植的接口文件
        • iperf测速
        • 移植emxgui
        • 使用iot hub平台
          • 设备在线情况:
            • 使用规则引擎:
              • 开发云平台签名小工具:
                • MQTT.fx连接
                  • stm32f429使用mqtt连接iot hub平台
                  • 此外还自己基于腾讯云主机部署mqtt服务器
                  相关产品与服务
                  物联网开发平台
                  腾讯云物联网开发平台(IoT Explorer)是面向智慧生活与产业物联应用的一站式物联网PaaS平台,为各行业用户提供一站式设备智能化服务。平台提供海量设备连接与消息通信能力,基于腾讯连连小程序应用开发能力、音视频能力及AI增值服务,聚合腾讯生态内容能力。提升传统行业设备智能化的效率,降低用户的开发运维成本。
                  领券
                  问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档