|
- package com.lstznkj.groundstation.mqtt;
- import com.fasterxml.jackson.databind.JsonNode;
- import com.fasterxml.jackson.databind.ObjectMapper;
- import com.lstznkj.groundstation.GlobalData;
- import com.lstznkj.groundstation.model.crane.BaseData;
- import com.lstznkj.groundstation.model.crane.DataSource;
- import com.lstznkj.groundstation.service.crane.BaseDataService;
- import com.lstznkj.groundstation.utils.SpringContextUtil;
- import org.eclipse.paho.client.mqttv3.*;
- import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.context.annotation.Configuration;
- import java.util.List;
- @Configuration
- public class MqttProvider {
- private Logger logger= LoggerFactory.getLogger(getClass());
- private String username;
- private boolean isConnected;
- private MqttConnectOptions options;
- private String password;
- private String hostUrl;
- private String clientId;
- private String defaultTopic;
- private MqttProviderCallBack callBack;
- List<DataSource> dataSourceList;
- /**
- * 客户端对象
- */
- private MqttClient client;
- /**
- * 在bean初始化后连接到服务器
- */
- // @PostConstruct
- @Deprecated
- public void init(){
- logger.info("mqqtt init ");
- // System.out.println("mqtt init");
- connect();
- MqttFactory.setInstance(client);
- }
- public void connect(MqttConnectOptions options){
- try {
- client.setCallback(callBack);
- client.connect(options);
- }
- catch (Exception ex){
- logger.error(clientId+ "connected error:"+ex.getMessage());
- }
- }
- /***
- * 订阅所有数据源主题
- */
- public void subscribeDataSource(){
- for(DataSource dataSource:this.dataSourceList){
- try {
- this.client.subscribe(dataSource.getPublishTopic(), new IMqttMessageListener() {
- @Override
- public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
- Thread tmpThread=new Thread(new Runnable() {
- @Override
- public void run() {
- try {
- BaseDataService baseDataService= SpringContextUtil.getBean(BaseDataService.class);
- //差异原始数据(为数据库最后一条记录),当差异数据累计到指定值时 存储到数据库,
- //private BaseData originDifferData=baseDataService.findTheLast(dataSource);
- double differTotalValue= dataSource.getDifferTotalValue();
- int sparseCount= dataSource.getTmpSparseCount();
- //private BaseData lastData;
- ObjectMapper objectMapper=new ObjectMapper();
- logger.info("originDifferData:"+objectMapper.writeValueAsString(dataSource.getLastBaseData()));
- BaseData baseData=new BaseData();
- JsonNode mqttNode=objectMapper.readTree(mqttMessage.toString());
- baseData.setName(mqttNode.get("name").asText());
- baseData.setValue(mqttNode.get("value").asDouble());
- baseData.setOriginValue(mqttNode.get("originValue").asInt());
- baseData.setCollectTime(mqttNode.get("createTime").asLong());
- // double differValue=Math.abs(baseData.getValue()-dataSource.getLastBaseData().getValue());
- double differValue=Math.abs(baseData.getValue()-dataSource.getLastBaseData().getValue());
- differTotalValue=dataSource.getDifferTotalValue()+differValue;
- dataSource.setDifferTotalValue(differTotalValue);
- logger.info("differTotalValue:"+dataSource.getDifferTotalValue()+";sparseValue:"+dataSource.getDifferSparseValue());
- if(dataSource.getDifferTotalValue()>=dataSource.getTotalSparseValue()||sparseCount<=0||differValue>=dataSource.getDifferSparseValue()){
- baseData=baseDataService.add(baseData).getData();
- baseData.setDataSource(dataSource);
- baseData=baseDataService.add(baseData).getData();
- dataSource.setLastBaseData(baseData);
- dataSource.setDifferTotalValue(0);
- logger.info("dataSource:"+dataSource.getName()+","+dataSource.getId()+ "baseData is saved:"+objectMapper.writeValueAsString(baseData));
- //reset tmpSparseCount
- dataSource.setTmpSparseCount(dataSource.getSparseCount());
- }
- else {
- sparseCount--;
- dataSource.setTmpSparseCount(sparseCount);
- logger.info("dataSource:"+dataSource.getName()+","+dataSource.getId()+ ",baseData is sparsed,sparseCount:"+sparseCount+";"+objectMapper.writeValueAsString(baseData));
- }
- }
- catch (Exception ex){
- logger.error("base data save to database error :"+ex.getMessage());
- ex.printStackTrace();
- }
- }
- });
- GlobalData.DataSaveThreadPool.submit(tmpThread);
- logger.info("topic:"+s+": "+mqttMessage.toString());
- }
- });
- }
- catch (Exception ex){
- logger.error("errors in subsribeDatasource "+clientId+":"+ex.getMessage());
- }
- }
- }
- /**
- * 客户端连接服务端
- */
- @Deprecated
- public void connect(){
- try{
- //创建MQTT客户端对象
- client = new MqttClient(hostUrl,clientId,new MemoryPersistence());
- //连接设置
- MqttConnectOptions options = new MqttConnectOptions();
- //是否清空session,设置false表示服务器会保留客户端的连接记录(订阅主题,qos),客户端重连之后能获取到服务器在客户端断开连接期间推送的消息
- //设置为true表示每次连接服务器都是以新的身份
- options.setCleanSession(true);
- //设置连接用户名
- options.setUserName(username);
- //设置连接密码
- options.setPassword(password.toCharArray());
- //设置超时时间,单位为秒
- options.setConnectionTimeout(100);
- //设置心跳时间 单位为秒,表示服务器每隔 1.5*20秒的时间向客户端发送心跳判断客户端是否在线
- options.setKeepAliveInterval(20);
- //设置遗嘱消息的话题,若客户端和服务器之间的连接意外断开,服务器将发布客户端的遗嘱信息
- options.setWill("willTopic",(clientId + "与服务器断开连接").getBytes(),0,false);
- //设置回调
- client.setCallback(new MqttProviderCallBack());
- client.connect(options);
- } catch(MqttException e){
- e.printStackTrace();
- }
- }
- /***
- *
- * QoS 0 至多一次
- * QoS 1 最少一次
- * QoS 1 承诺消息将至少传送一次给订阅者。
- * QoS 2 只有一次
- * 使用 QoS 2,我们保证消息仅传送到目的地一次。为此,带有唯一消息 ID 的消息会存储两次,
- * 首先来自发送者,然后是接收者。QoS 级别 2 在网络中具有最高的开销,因为在发送方和接收方之间需要两个流。
- * 这一级别会发生消息丢失或重复,消息发布依赖于底层TCP/IP网络。即:<=1
- * @param qos
- *
- * @param retained
- * @param topic
- * @param message
- */
- public void publish(int qos,boolean retained,String topic,String message){
- MqttMessage mqttMessage = new MqttMessage();
- mqttMessage.setQos(qos);
- mqttMessage.setRetained(retained);
- mqttMessage.setPayload(message.getBytes());
- //主题的目的地,用于发布/订阅信息
- MqttTopic mqttTopic = client.getTopic(topic);
- //提供一种机制来跟踪消息的传递进度
- //用于在以非阻塞方式(在后台运行)执行发布是跟踪消息的传递进度
- MqttDeliveryToken token;
- try {
- //将指定消息发布到主题,但不等待消息传递完成,返回的token可用于跟踪消息的传递状态
- //一旦此方法干净地返回,消息就已被客户端接受发布,当连接可用,将在后台完成消息传递。
- token = mqttTopic.publish(mqttMessage);
- token.waitForCompletion();
- } catch (MqttException e) {
- e.printStackTrace();
- }
- }
- public void setClientId(String clientId) {
- this.clientId = clientId;
- }
- public String getUsername() {
- return username;
- }
- public void setUsername(String username) {
- this.username = username;
- }
- public String getPassword() {
- return password;
- }
- public void setPassword(String password) {
- this.password = password;
- }
- public String getHostUrl() {
- return hostUrl;
- }
- public void setHostUrl(String hostUrl) {
- this.hostUrl = hostUrl;
- }
- public String getClientId() {
- return clientId;
- }
- public boolean isConnected() {
- return client.isConnected();
- }
- public void setConnected(boolean connected) {
- isConnected = connected;
- }
- public MqttClient getClient() {
- return client;
- }
- public void setClient(MqttClient client) {
- this.client = client;
- }
- public MqttProviderCallBack getCallBack() {
- return callBack;
- }
- public void setCallBack(MqttProviderCallBack callBack) {
- client.setCallback(callBack);
- this.callBack = callBack;
- }
- public MqttConnectOptions getOptions() {
- return options;
- }
- public void setOptions(MqttConnectOptions options) {
- this.options = options;
- }
- public List<DataSource> getDataSourceList() {
- return dataSourceList;
- }
- public void setDataSourceList(List<DataSource> dataSourceList) {
- this.dataSourceList = dataSourceList;
- }
- }
|