MQTT全名为Message Queuing Telemetry Transport,是一种基于TCP/IP协议上传输的轻量级通信协议。MQTT协议是一种消息队列传输协议,采用订阅、发布机制,订阅者只接受自己已经订阅的数据,非订阅数据则不接受,既保证了必要的数据交换,又避免了无效数据造成的存储与处理。因此在工业物联网中得到广泛的应用。
MQTT协议是物联网平台的最通用协议之一,也是OneNET平台的首要设备接入协议。物联网平台必须海量设备接入,但MQTT接入服务究竟能同时支持多少设备同时在线呢?了解这个指标能更好地为平台的运维和运营提供科学的依据。
可是,如何快速简便地测试最大在线量指标呢?如何选取工具和制作脚本呢?
测试性能我们首先想到的是常用的Jmeter和Locust等性能测试工具。但是这些工具的优势在测试服务的并发和吞吐量,并不适合当前的测试场景。
然后能想到的是利用第三方Jar包或者三方库实现的协议库,采用多线程启动设备。但是压力机线程启动有限,对动则支持几十万上百万设备接入量的服务简直就是杯水车薪,需要多少压力机难以估量。
再次能想到的是Select方法批量管理设备的Socket连接。问题又出现了,Select管理的异步IO也是有极限的,此方法最终还是放弃。
经过前面的分析、实践最终方法确定,采用异步IO的方式批量模拟设备连接服务器,按照一定的频率上报注册报文,不断遍历设备Socket接收的缓存数据,解析服务消息来判断设备是否连接成功,并通过周期性上报心跳来保持设备持续在线。实现细节如下:
(1)实现基础设备类:封装部分MQTT协议报文方法,其中包括设备注册、订阅、发布、心跳等。
MQTT注册报文封装示例(Java)
(2)实现设备类:主要记录设备注册状态、订阅状态、保活间隔,最重要的服务消息的解析和响应方法,以及设备连接服务器的非阻塞Socket(Java中的SocketChanel)
服务消息解析代码示例(Java)
(3)实现程序主体类:管理批量设备,控制设备注册频率,设备何时上报数据、监听服务下发数据,统计设备连接数,持续上报心跳,保持设备在线等。具体实现逻辑如下:
批量初始化设备列表;
同时启动一下三个线程;
启动设备注册线程,初始化设备与服务连接并上行注册报文,可根据设置,指定当前可同时注册的设备数,所有设备注册完成后自动退出;
启动连接统计线程,周期性统计设备连接成功个数、订阅成功个数、连接失败设备等数据;
启动设备Socket遍历线程,持续轮询每个Socket的接收数据,对接收到的数据处理和响应,每轮遍历完毕对需要对未长时间未上报心跳的设备上行心跳报文,以达到设备保活的目的。
程序主题类接收线程逻辑代码片段(Java)
完成程序代码后,测试工具制作完成。
此工具已实际用于项目性能测试中,可将压力机全部可用端口用于最大设备在线量的测试中,实际在Linux虚拟机中几分钟内完成50000+设备注册,并保持设备长时间持续在线。
此工具能最大限度利用压力机端口资源,提升测试执行效率,对于在线量较大的服务,只需要在更多的压力机上运行此工具即可。