// MqttProvider.java
package com.lstznkj.groundstation.mqtt;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.lstznkj.groundstation.GlobalData;
import com.lstznkj.groundstation.model.crane.BaseData;
import com.lstznkj.groundstation.model.crane.DataSource;
import com.lstznkj.groundstation.service.crane.BaseDataService;
import com.lstznkj.groundstation.utils.SpringContextUtil;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Configuration;

  15. @Configuration
  16. public class MqttProvider {
  17. private Logger logger= LoggerFactory.getLogger(getClass());
  18. private String username;
  19. private boolean isConnected;
  20. private MqttConnectOptions options;
  21. private String password;
  22. private String hostUrl;
  23. private String clientId;
  24. private String defaultTopic;
  25. private MqttProviderCallBack callBack;
  26. List<DataSource> dataSourceList;
  27. /**
  28. * 客户端对象
  29. */
  30. private MqttClient client;
  31. /**
  32. * 在bean初始化后连接到服务器
  33. */
  34. // @PostConstruct
  35. @Deprecated
  36. public void init(){
  37. logger.info("mqqtt init ");
  38. // System.out.println("mqtt init");
  39. connect();
  40. MqttFactory.setInstance(client);
  41. }
  42. public void connect(MqttConnectOptions options){
  43. try {
  44. client.setCallback(callBack);
  45. client.connect(options);
  46. }
  47. catch (Exception ex){
  48. logger.error(clientId+ "connected error:"+ex.getMessage());
  49. }
  50. }
  51. /***
  52. * 订阅所有数据源主题
  53. */
  54. public void subscribeDataSource(){
  55. for(DataSource dataSource:this.dataSourceList){
  56. try {
  57. this.client.subscribe(dataSource.getPublishTopic(), new IMqttMessageListener() {
  58. @Override
  59. public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
  60. Thread tmpThread=new Thread(new Runnable() {
  61. @Override
  62. public void run() {
  63. try {
  64. BaseDataService baseDataService= SpringContextUtil.getBean(BaseDataService.class);
  65. //差异原始数据(为数据库最后一条记录),当差异数据累计到指定值时 存储到数据库,
  66. //private BaseData originDifferData=baseDataService.findTheLast(dataSource);
  67. double differTotalValue= dataSource.getDifferTotalValue();
  68. int sparseCount= dataSource.getTmpSparseCount();
  69. //private BaseData lastData;
  70. ObjectMapper objectMapper=new ObjectMapper();
  71. logger.info("originDifferData:"+objectMapper.writeValueAsString(dataSource.getLastBaseData()));
  72. BaseData baseData=new BaseData();
  73. JsonNode mqttNode=objectMapper.readTree(mqttMessage.toString());
  74. baseData.setName(mqttNode.get("name").asText());
  75. baseData.setValue(mqttNode.get("value").asDouble());
  76. baseData.setOriginValue(mqttNode.get("originValue").asInt());
  77. baseData.setCollectTime(mqttNode.get("createTime").asLong());
  78. // double differValue=Math.abs(baseData.getValue()-dataSource.getLastBaseData().getValue());
  79. double differValue=Math.abs(baseData.getValue()-dataSource.getLastBaseData().getValue());
  80. differTotalValue=dataSource.getDifferTotalValue()+differValue;
  81. dataSource.setDifferTotalValue(differTotalValue);
  82. logger.info("differTotalValue:"+dataSource.getDifferTotalValue()+";sparseValue:"+dataSource.getDifferSparseValue());
  83. if(dataSource.getDifferTotalValue()>=dataSource.getTotalSparseValue()||sparseCount<=0||differValue>=dataSource.getDifferSparseValue()){
  84. baseData=baseDataService.add(baseData).getData();
  85. baseData.setDataSource(dataSource);
  86. baseData=baseDataService.add(baseData).getData();
  87. dataSource.setLastBaseData(baseData);
  88. dataSource.setDifferTotalValue(0);
  89. logger.info("dataSource:"+dataSource.getName()+","+dataSource.getId()+ "baseData is saved:"+objectMapper.writeValueAsString(baseData));
  90. //reset tmpSparseCount
  91. dataSource.setTmpSparseCount(dataSource.getSparseCount());
  92. }
  93. else {
  94. sparseCount--;
  95. dataSource.setTmpSparseCount(sparseCount);
  96. logger.info("dataSource:"+dataSource.getName()+","+dataSource.getId()+ ",baseData is sparsed,sparseCount:"+sparseCount+";"+objectMapper.writeValueAsString(baseData));
  97. }
  98. }
  99. catch (Exception ex){
  100. logger.error("base data save to database error :"+ex.getMessage());
  101. ex.printStackTrace();
  102. }
  103. }
  104. });
  105. GlobalData.DataSaveThreadPool.submit(tmpThread);
  106. logger.info("topic:"+s+": "+mqttMessage.toString());
  107. }
  108. });
  109. }
  110. catch (Exception ex){
  111. logger.error("errors in subsribeDatasource "+clientId+":"+ex.getMessage());
  112. }
  113. }
  114. }
  115. /**
  116. * 客户端连接服务端
  117. */
  118. @Deprecated
  119. public void connect(){
  120. try{
  121. //创建MQTT客户端对象
  122. client = new MqttClient(hostUrl,clientId,new MemoryPersistence());
  123. //连接设置
  124. MqttConnectOptions options = new MqttConnectOptions();
  125. //是否清空session,设置false表示服务器会保留客户端的连接记录(订阅主题,qos),客户端重连之后能获取到服务器在客户端断开连接期间推送的消息
  126. //设置为true表示每次连接服务器都是以新的身份
  127. options.setCleanSession(true);
  128. //设置连接用户名
  129. options.setUserName(username);
  130. //设置连接密码
  131. options.setPassword(password.toCharArray());
  132. //设置超时时间,单位为秒
  133. options.setConnectionTimeout(100);
  134. //设置心跳时间 单位为秒,表示服务器每隔 1.5*20秒的时间向客户端发送心跳判断客户端是否在线
  135. options.setKeepAliveInterval(20);
  136. //设置遗嘱消息的话题,若客户端和服务器之间的连接意外断开,服务器将发布客户端的遗嘱信息
  137. options.setWill("willTopic",(clientId + "与服务器断开连接").getBytes(),0,false);
  138. //设置回调
  139. client.setCallback(new MqttProviderCallBack());
  140. client.connect(options);
  141. } catch(MqttException e){
  142. e.printStackTrace();
  143. }
  144. }
  145. /***
  146. *
  147. * QoS 0 至多一次
  148. * QoS 1 最少一次
  149. * QoS 1 承诺消息将至少传送一次给订阅者。
  150. * QoS 2 只有一次
  151. * 使用 QoS 2,我们保证消息仅传送到目的地一次。为此,带有唯一消息 ID 的消息会存储两次,
  152. * 首先来自发送者,然后是接收者。QoS 级别 2 在网络中具有最高的开销,因为在发送方和接收方之间需要两个流。
  153. * 这一级别会发生消息丢失或重复,消息发布依赖于底层TCP/IP网络。即:<=1
  154. * @param qos
  155. *
  156. * @param retained
  157. * @param topic
  158. * @param message
  159. */
  160. public void publish(int qos,boolean retained,String topic,String message){
  161. MqttMessage mqttMessage = new MqttMessage();
  162. mqttMessage.setQos(qos);
  163. mqttMessage.setRetained(retained);
  164. mqttMessage.setPayload(message.getBytes());
  165. //主题的目的地,用于发布/订阅信息
  166. MqttTopic mqttTopic = client.getTopic(topic);
  167. //提供一种机制来跟踪消息的传递进度
  168. //用于在以非阻塞方式(在后台运行)执行发布是跟踪消息的传递进度
  169. MqttDeliveryToken token;
  170. try {
  171. //将指定消息发布到主题,但不等待消息传递完成,返回的token可用于跟踪消息的传递状态
  172. //一旦此方法干净地返回,消息就已被客户端接受发布,当连接可用,将在后台完成消息传递。
  173. token = mqttTopic.publish(mqttMessage);
  174. token.waitForCompletion();
  175. } catch (MqttException e) {
  176. e.printStackTrace();
  177. }
  178. }
  179. public void setClientId(String clientId) {
  180. this.clientId = clientId;
  181. }
  182. public String getUsername() {
  183. return username;
  184. }
  185. public void setUsername(String username) {
  186. this.username = username;
  187. }
  188. public String getPassword() {
  189. return password;
  190. }
  191. public void setPassword(String password) {
  192. this.password = password;
  193. }
  194. public String getHostUrl() {
  195. return hostUrl;
  196. }
  197. public void setHostUrl(String hostUrl) {
  198. this.hostUrl = hostUrl;
  199. }
  200. public String getClientId() {
  201. return clientId;
  202. }
  203. public boolean isConnected() {
  204. return client.isConnected();
  205. }
  206. public void setConnected(boolean connected) {
  207. isConnected = connected;
  208. }
  209. public MqttClient getClient() {
  210. return client;
  211. }
  212. public void setClient(MqttClient client) {
  213. this.client = client;
  214. }
  215. public MqttProviderCallBack getCallBack() {
  216. return callBack;
  217. }
  218. public void setCallBack(MqttProviderCallBack callBack) {
  219. client.setCallback(callBack);
  220. this.callBack = callBack;
  221. }
  222. public MqttConnectOptions getOptions() {
  223. return options;
  224. }
  225. public void setOptions(MqttConnectOptions options) {
  226. this.options = options;
  227. }
  228. public List<DataSource> getDataSourceList() {
  229. return dataSourceList;
  230. }
  231. public void setDataSourceList(List<DataSource> dataSourceList) {
  232. this.dataSourceList = dataSourceList;
  233. }
  234. }