package com.lstznkj.groundstation.mqtt; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.lstznkj.groundstation.GlobalData; import com.lstznkj.groundstation.model.crane.BaseData; import com.lstznkj.groundstation.model.crane.DataSource; import com.lstznkj.groundstation.service.crane.BaseDataService; import com.lstznkj.groundstation.utils.SpringContextUtil; import org.eclipse.paho.client.mqttv3.*; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.context.annotation.Configuration; import java.util.List; @Configuration public class MqttProvider { private Logger logger= LoggerFactory.getLogger(getClass()); private String username; private boolean isConnected; private MqttConnectOptions options; private String password; private String hostUrl; private String clientId; private String defaultTopic; private MqttProviderCallBack callBack; List dataSourceList; /** * 客户端对象 */ private MqttClient client; /** * 在bean初始化后连接到服务器 */ // @PostConstruct @Deprecated public void init(){ logger.info("mqqtt init "); // System.out.println("mqtt init"); connect(); MqttFactory.setInstance(client); } public void connect(MqttConnectOptions options){ try { client.setCallback(callBack); client.connect(options); } catch (Exception ex){ logger.error(clientId+ "connected error:"+ex.getMessage()); } } /*** * 订阅所有数据源主题 */ public void subscribeDataSource(){ for(DataSource dataSource:this.dataSourceList){ try { this.client.subscribe(dataSource.getPublishTopic(), new IMqttMessageListener() { @Override public void messageArrived(String s, MqttMessage mqttMessage) throws Exception { Thread tmpThread=new Thread(new Runnable() { @Override public void run() { try { BaseDataService baseDataService= SpringContextUtil.getBean(BaseDataService.class); //差异原始数据(为数据库最后一条记录),当差异数据累计到指定值时 存储到数据库, //private BaseData originDifferData=baseDataService.findTheLast(dataSource); double differTotalValue= dataSource.getDifferTotalValue(); int sparseCount= dataSource.getTmpSparseCount(); //private BaseData lastData; ObjectMapper objectMapper=new ObjectMapper(); logger.info("originDifferData:"+objectMapper.writeValueAsString(dataSource.getLastBaseData())); BaseData baseData=new BaseData(); JsonNode mqttNode=objectMapper.readTree(mqttMessage.toString()); baseData.setName(mqttNode.get("name").asText()); baseData.setValue(mqttNode.get("value").asDouble()); baseData.setOriginValue(mqttNode.get("originValue").asInt()); baseData.setCollectTime(mqttNode.get("createTime").asLong()); // double differValue=Math.abs(baseData.getValue()-dataSource.getLastBaseData().getValue()); double differValue=Math.abs(baseData.getValue()-dataSource.getLastBaseData().getValue()); differTotalValue=dataSource.getDifferTotalValue()+differValue; dataSource.setDifferTotalValue(differTotalValue); logger.info("differTotalValue:"+dataSource.getDifferTotalValue()+";sparseValue:"+dataSource.getDifferSparseValue()); if(dataSource.getDifferTotalValue()>=dataSource.getTotalSparseValue()||sparseCount<=0||differValue>=dataSource.getDifferSparseValue()){ baseData=baseDataService.add(baseData).getData(); baseData.setDataSource(dataSource); baseData=baseDataService.add(baseData).getData(); dataSource.setLastBaseData(baseData); dataSource.setDifferTotalValue(0); logger.info("dataSource:"+dataSource.getName()+","+dataSource.getId()+ "baseData is saved:"+objectMapper.writeValueAsString(baseData)); //reset tmpSparseCount dataSource.setTmpSparseCount(dataSource.getSparseCount()); } else { sparseCount--; dataSource.setTmpSparseCount(sparseCount); logger.info("dataSource:"+dataSource.getName()+","+dataSource.getId()+ ",baseData is sparsed,sparseCount:"+sparseCount+";"+objectMapper.writeValueAsString(baseData)); } } catch (Exception ex){ logger.error("base data save to database error :"+ex.getMessage()); ex.printStackTrace(); } } }); GlobalData.DataSaveThreadPool.submit(tmpThread); logger.info("topic:"+s+": "+mqttMessage.toString()); } }); } catch (Exception ex){ logger.error("errors in subsribeDatasource "+clientId+":"+ex.getMessage()); } } } /** * 客户端连接服务端 */ @Deprecated public void connect(){ try{ //创建MQTT客户端对象 client = new MqttClient(hostUrl,clientId,new MemoryPersistence()); //连接设置 MqttConnectOptions options = new MqttConnectOptions(); //是否清空session,设置false表示服务器会保留客户端的连接记录(订阅主题,qos),客户端重连之后能获取到服务器在客户端断开连接期间推送的消息 //设置为true表示每次连接服务器都是以新的身份 options.setCleanSession(true); //设置连接用户名 options.setUserName(username); //设置连接密码 options.setPassword(password.toCharArray()); //设置超时时间,单位为秒 options.setConnectionTimeout(100); //设置心跳时间 单位为秒,表示服务器每隔 1.5*20秒的时间向客户端发送心跳判断客户端是否在线 options.setKeepAliveInterval(20); //设置遗嘱消息的话题,若客户端和服务器之间的连接意外断开,服务器将发布客户端的遗嘱信息 options.setWill("willTopic",(clientId + "与服务器断开连接").getBytes(),0,false); //设置回调 client.setCallback(new MqttProviderCallBack()); client.connect(options); } catch(MqttException e){ e.printStackTrace(); } } /*** * * QoS 0 至多一次 * QoS 1 最少一次 * QoS 1 承诺消息将至少传送一次给订阅者。 * QoS 2 只有一次 * 使用 QoS 2,我们保证消息仅传送到目的地一次。为此,带有唯一消息 ID 的消息会存储两次, * 首先来自发送者,然后是接收者。QoS 级别 2 在网络中具有最高的开销,因为在发送方和接收方之间需要两个流。 * 这一级别会发生消息丢失或重复,消息发布依赖于底层TCP/IP网络。即:<=1 * @param qos * * @param retained * @param topic * @param message */ public void publish(int qos,boolean retained,String topic,String message){ MqttMessage mqttMessage = new MqttMessage(); mqttMessage.setQos(qos); mqttMessage.setRetained(retained); mqttMessage.setPayload(message.getBytes()); //主题的目的地,用于发布/订阅信息 MqttTopic mqttTopic = client.getTopic(topic); //提供一种机制来跟踪消息的传递进度 //用于在以非阻塞方式(在后台运行)执行发布是跟踪消息的传递进度 MqttDeliveryToken token; try { //将指定消息发布到主题,但不等待消息传递完成,返回的token可用于跟踪消息的传递状态 //一旦此方法干净地返回,消息就已被客户端接受发布,当连接可用,将在后台完成消息传递。 token = mqttTopic.publish(mqttMessage); token.waitForCompletion(); } catch (MqttException e) { e.printStackTrace(); } } public void setClientId(String clientId) { this.clientId = clientId; } public String getUsername() { return username; } public void setUsername(String username) { this.username = username; } public String getPassword() { return password; } public void setPassword(String password) { this.password = password; } public String getHostUrl() { return hostUrl; } public void setHostUrl(String hostUrl) { this.hostUrl = hostUrl; } public String getClientId() { return clientId; } public boolean isConnected() { return client.isConnected(); } public void setConnected(boolean connected) { isConnected = connected; } public MqttClient getClient() { return client; } public void setClient(MqttClient client) { this.client = client; } public MqttProviderCallBack getCallBack() { return callBack; } public void setCallBack(MqttProviderCallBack callBack) { client.setCallback(callBack); this.callBack = callBack; } public MqttConnectOptions getOptions() { return options; } public void setOptions(MqttConnectOptions options) { this.options = options; } public List getDataSourceList() { return dataSourceList; } public void setDataSourceList(List dataSourceList) { this.dataSourceList = dataSourceList; } }