转载

在SpringBoot中集成MQTT

MQTT( Message Queuing Telemetry Transport)是一个物联网传输协议,它被设计用于轻量级的发布/订阅式消息传输,旨在为低带宽和不稳定的网络环境中的物联网设备提供可靠的网络服务。在实际的开发中,我们通常会用到Spring,这里简单描述一下在SpringBoot中如何集成MQTT。

在Spring的一系列文档中,已经有了对应的集成代码。见:

  • spring-integration-samples
  • Spring Integration MQTT Support

引入依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
</dependency>

配置 mqttClientFactory

@Bean
publicMqttPahoClientFactorymqttClientFactory(){
    List<String> urls = mqttUrls().getUrls();
    DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
    factory.setServerURIs("tcp://localhost:1883");
    return factory;
}

配置消费者

@Bean
publicIntegrationFlowmqttInFlow(){
    return IntegrationFlows.from(mqttInbound())
            .transform(p -> p)
            .handle(mqttService.handler())
            .get();
}

privateMessageProducerSupportmqttInbound(){
    MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter("customer",
            mqttClientFactory(), "test-topic");
    adapter.setConverter(new DefaultPahoMessageConverter());
    adapter.setQos(1);
    return adapter;
}

配置生产者

@Bean
publicIntegrationFlowmqttOutFlow(){
    return IntegrationFlows.from(outChannel())
            .handle(mqttOutbound())
            .get();
}

@Bean
publicMessageChanneloutChannel(){
    return new DirectChannel();
}

@Bean
publicMessageHandlermqttOutbound(){
    MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler("publisher", mqttClientFactory());
    messageHandler.setAsync(true);
    messageHandler.setDefaultTopic("test-topic");
    return messageHandler;
}
@MessagingGateway(defaultRequestChannel = "outChannel")
public interface MessageWriter{
    void write(String data);
}

生产者的使用可以为:

@Autowired
MessageWriter messageWriter

void publish(String data){
    messageWriter.write(data)
}

遇到的坑以及未解决的问题

  • 生产者和消费者的clientId一定一定不能一样
  • 未解决:多个topic和data的动态生产
原文  http://webfuse.cn/2017/08/17/在SpringBoot中集成MQTT/
正文到此结束
Loading...