-
Notifications
You must be signed in to change notification settings - Fork 7
高并发下,(IoT)异步(如queue、mqtt)转同步(如http)的java代码实现
Thingsboard微服务分布式下-设备控制的数据流-架构与可用性分析(有架构图)
如上面文章中的该图部分(左边用户,中间服务器,右边是队列用于设备mqtt长连接消息的发布和订阅):
1,http是请求响应模式的短连接,图左边的用户通过http请求控制设备的服务器接口,并阻塞直到返回设备控制是否成功的状态(或超时),该部分是同步的
2,右边是队列,是异步的。或者我们从设备角度看,设备用发布订阅模式的mqtt长连接协议连接服务器,和服务器的通讯是异步的
3,所以服务器做了一个异步转同步的功能,那么该功能用Java如何实现?
比如设备响应的消息服务器收到后就存到分布式存储redis中,服务器发送控制命令后就每2秒轮训查询redis一次,10秒超时 但轮训是有延迟的(即设备响应后有可能2秒后才响应给用户),本文就不建议了
该场景其实就是需要实现一个阻塞等待的功能,可以用java的锁实现(wait() and notify() Methods in Java),而锁的对象就是标志这一个请求响应过程的id,比如 (deviceId + transactionId).intern() 作为锁。下面拿控制设备开关(如开灯)示例:
1,服务器收到打开deviceId的灯的http请求后,为该次请求生产了一个消息id transactionId,然后带着该transactionId通过mqtt给灯发送控制命令,然后用(deviceId + transactionId).intern()作为锁将线程阻塞wait(timeout超时时间)
2,deviceId的灯收到transactionId的控制消息后,完成开灯,然后通过mqtt发布带有该transactionId的控制成果的消息
3,服务器收到deviceId的灯通过mqtt异步返回的带有transactionId的消息后,用一个Map作为临时存储设备响应消息的容器(key就是(deviceId + transactionId).intern(), value就是设备响应的消息),然后该线程拿到(deviceId + transactionId).intern() 的锁,然后notifyAll()后该线程就结束了。
4,然后第1步中的线程就被notify,从Map中get key为(deviceId + transactionId).intern()的值就是设备响应的消息,然后该线程返回http请求,结束,通过线程间通讯完成了异步转同步
本文标题是“高并发下”,也就是如果同时有1000个请求控制设备,而每次控制需要10秒(物联网的网络通讯比传统软件的查数据库慢太多了)才能响应,那么并发就是1000,但tps只有1000/10=100。
如果方案一达到该并发量,就需要起1000个线程同时阻塞,而java的线程是非常耗性能的,如果按一个线程1M算,1000个线程内存就需要1000M多,光内存就吃不消,再考虑线程切换(Java会映射到linux内核线程,而某些语言实现了用户态层的“多线程”或者就是单线程语言(不用内核态和用户态切换的性能开销))的性能消耗等就更不行了。
所以本次推荐的方案就是利用servlet3.0的异步去实现,也就是上面方案一的第一步完成时,线程就会被回收到线程池,而不会阻塞。从而在高并发且高IO的场景下,使用少量线程处理大量请求,极大提高单机并发能力。
这里我们拿Thingsboard的源码示例:
http的path是 /twoway/{deviceId},twoway代表要返回设备的响应,deviceId为要控制的设备
这里用UUID.randomUUID()为该次请求生成了id,可以用作为我们上面说的transactionId
這個方法中return response后就释放线程了(重点,return response是异步的,即没有等待设备响应消息后才return。下面的sendRpcRequestToRuleEngine(把消息发到队列就完了(很快),不会等设备响应)和scheduleToRuleEngineTimeout(设置一个定时,不sleep,不占用线程资源,不阻塞线程)均为异步操作)
注意fromDeviceRpcResponse -> reply(new LocalRequestMetaData(rpcRequest, currentUser, result), fromDeviceRpcResponse, timeoutStatus, noActiveConnectionStatus)的result,下文设备回复时会用到
sendRpcRequestToRuleEngine 将控制设备的消息发到消息队列(微服务下用kafka作为消息队列)
scheduleToRuleEngineTimeout 为该次控制设置一个超时时间,比如如果10秒内设备没有响应,http就返回控制超时(定时任务也不多占用线程,只会在触发执行时才会占用线程资源)
================================================================================
上面過程完成了向设备发送消息
当设备异步回复消息后,由该方法接收到消息(消息来自消息队列),从localToRuleEngineRpcRequests的ConcurrentMap<UUID, Consumer>中获取consumer(fromDeviceRpcResponse -> reply(new LocalRequestMetaData(rpcRequest, currentUser, result), fromDeviceRpcResponse, timeoutStatus, noActiveConnectionStatus))并accept,最终通过reply方法中的DeferredResult.setResult设置servlet返回结果,http进行结果响应
备注:DeferredResult(重点),Google Futures工具类,java8 函数式编程,Java锁 的知识在本文并未详细讲解,大家可以通过百度有很多其他文章进行学习,以及看thingsboard该部分源码
上面的方案二大大提高了单机并发能力,单机总有极限,还得支持集群横向扩展
这里的实现设计方案也很多,比如不同节点通过不同的id,将消息进行路由(通过queue的topic或partition分区进行消息路由等,做到一个节点,只接收和处理自己需要的设备数据),比如自适应hash算法(消息队列自己的集群扩展都利用了该算法)
比如有1万个设备,5个节点,每分钟1万条数据:如按设备区分(节点可以有设备状态),那做到每个节点只处理2000个设备,或者按消息区分(节点要做到无设备状态,否则一个节点内存存1万个设备的状态,就要内存爆炸的问题了),那做到每个节点每分钟只处理2000条数据
最后,异步转同步不只IoT有这样的场景,本文对Java的实现做了实现思路的分析,大家也可以考虑其他语言的方案,后续有时间可能会对该文章进行优化,感谢star支持