在SpringBoot中集成MQTT

MQTT( Message Queuing Telemetry Transport)是一个物联网传输协议,它被设计用于轻量级的发布/订阅式消息传输,旨在为低带宽和不稳定的网络环境中的物联网设备提供可靠的网络服务。在实际的开发中,我们通常会用到Spring,这里简单描述一下在SpringBoot中如何集成MQTT。

在Spring的一系列文档中,已经有了对应的集成代码。见:

  • spring-integration-samples
  • Spring Integration MQTT Support

引入依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
</dependency>

配置 mqttClientFactory

@Bean
publicMqttPahoClientFactorymqttClientFactory(){
    List<String> urls = mqttUrls().getUrls();
    DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
    factory.setServerURIs("tcp://localhost:1883");
    return factory;
}

配置消费者

@Bean
publicIntegrationFlowmqttInFlow(){
    return IntegrationFlows.from(mqttInbound())
            .transform(p -> p)
            .handle(mqttService.handler())
            .get();
}

privateMessageProducerSupportmqttInbound(){
    MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter("customer",
            mqttClientFactory(), "test-topic");
    adapter.setConverter(new DefaultPahoMessageConverter());
    adapter.setQos(1);
    return adapter;
}

配置生产者

@Bean
publicIntegrationFlowmqttOutFlow(){
    return IntegrationFlows.from(outChannel())
            .handle(mqttOutbound())
            .get();
}

@Bean
publicMessageChanneloutChannel(){
    return new DirectChannel();
}

@Bean
publicMessageHandlermqttOutbound(){
    MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler("publisher", mqttClientFactory());
    messageHandler.setAsync(true);
    messageHandler.setDefaultTopic("test-topic");
    return messageHandler;
}
@MessagingGateway(defaultRequestChannel = "outChannel")
public interface MessageWriter{
    void write(String data);
}

生产者的使用可以为:

@Autowired
MessageWriter messageWriter

void publish(String data){
    messageWriter.write(data)
}

遇到的坑以及未解决的问题

  • 生产者和消费者的clientId一定一定不能一样
  • 未解决:多个topic和data的动态生产

原文 

http://webfuse.cn/2017/08/17/在SpringBoot中集成MQTT/

本站部分文章源于互联网,本着传播知识、有益学习和研究的目的进行的转载,为网友免费提供。如有著作权人或出版方提出异议,本站将立即删除。如果您对文章转载有任何疑问请告之我们,以便我们及时纠正。

PS:推荐一个微信公众号: askHarries 或者qq群:474807195,里面会分享一些资深架构师录制的视频录像:有Spring,MyBatis,Netty源码分析,高并发、高性能、分布式、微服务架构的原理,JVM性能优化这些成为架构师必备的知识体系。还能领取免费的学习资源,目前受益良多

转载请注明原文出处:Harries Blog™ » 在SpringBoot中集成MQTT

赞 (0)
分享到:更多 ()

评论 0

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址