MQTT( Message Queuing Telemetry Transport)是一个物联网传输协议,它被设计用于轻量级的发布/订阅式消息传输,旨在为低带宽和不稳定的网络环境中的物联网设备提供可靠的网络服务。在实际的开发中,我们通常会用到Spring,这里简单描述一下在SpringBoot中如何集成MQTT。
在Spring的一系列文档中,已经有了对应的集成代码。见:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
@Bean
publicMqttPahoClientFactorymqttClientFactory(){
List<String> urls = mqttUrls().getUrls();
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
factory.setServerURIs("tcp://localhost:1883");
return factory;
}
@Bean
publicIntegrationFlowmqttInFlow(){
return IntegrationFlows.from(mqttInbound())
.transform(p -> p)
.handle(mqttService.handler())
.get();
}
privateMessageProducerSupportmqttInbound(){
MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter("customer",
mqttClientFactory(), "test-topic");
adapter.setConverter(new DefaultPahoMessageConverter());
adapter.setQos(1);
return adapter;
}
@Bean
publicIntegrationFlowmqttOutFlow(){
return IntegrationFlows.from(outChannel())
.handle(mqttOutbound())
.get();
}
@Bean
publicMessageChanneloutChannel(){
return new DirectChannel();
}
@Bean
publicMessageHandlermqttOutbound(){
MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler("publisher", mqttClientFactory());
messageHandler.setAsync(true);
messageHandler.setDefaultTopic("test-topic");
return messageHandler;
}
@MessagingGateway(defaultRequestChannel = "outChannel")
public interface MessageWriter{
void write(String data);
}
生产者的使用可以为:
@Autowired
MessageWriter messageWriter
void publish(String data){
messageWriter.write(data)
}