转载

SpringCloud | Stream(1)-入门案例(使用RabbitMQ)

消息中间件主要用于应用解耦、异步处理、流量削峰等场景,实现高可用、高性能、可伸缩和最终一致性。但是不同的消息中间件实现方式不同,如RabbitMQ有exchange;Kafka有Topic、partition等概念。在实际环境中如果需要替换中间件,代码都需要重构,工具量巨大,也就是说中间件与系统耦合了,SpringCloud Stream提供了一种解耦合的方式。

1.1 概述

SpringCloud Stream通过Binder连接应用和中间件,应用通过Input把消息发送到中间件,通过Output从中间件获取消息。通过Binder与中间件交互,应用不需要关注中间件类型,只需要关注binder提供的接口实现业务逻辑即可。

SpringCloud | Stream(1)-入门案例(使用RabbitMQ)

1.2 核心概念

  • 绑定器 绑定器位于应用程序和中间件之间,用户只需要通过应用程序与绑定器交互,不再关心后续的中间件类型。
  • 发布/订阅模式 类似于RabbitMQ中的exchange、kafka中的topic
SpringCloud | Stream(1)-入门案例(使用RabbitMQ)

2. 入门案例

2.1 环境准备

RabbitMQ安装

docker run -it -d --name rabbitmq /
-p 5672:5672 -p 15672:15672 /
-e RABBITMQ_DEFAULT_USER=admin /
-e RABBITMQ_DEFAULT_PASS=admin  /
registry.cn-beijing.aliyuncs.com/buyimoutianxia/rabbitmq:V3.7.25
复制代码

2.2 消息生产者

2.2.1 导入依赖坐标

<!--stream producer依赖-->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
        </dependency>
复制代码

2.2.2 配置application.yml

server:
  port: 8500

spring:
  application:
    name: streamproduder-8500
  rabbitmq:  #rabbiqmq连接信息
    host: localhost
    port: 5672
    username: admin
    password: admin
  cloud:
    stream:
      bindings:  #stream channel
        output:
          destination: test-producer #消息发送的目的地,在rabbitmq中指的是exchange
      binders:  #绑定器
        defaultRabbit:
          type: rabbit

复制代码
  1. 创建模拟消息发送方法 com.xyz.stream.bingding.MyProducer
@EnableBinding(Source.class)
public class MyProducer implements CommandLineRunner {

    @Autowired
    private MessageChannel output;


    @Override
    public void run(String... args) throws Exception {
        output.send(MessageBuilder.withPayload("hello, my friend ...").build());
    }
}
复制代码
  1. 创建启动类
@SpringBootApplication
public class ProducerApplication {

    public static void main(String[] args) {
        SpringApplication.run(ProducerApplication.class, args);
    }
}
复制代码
  1. 可以在rabbitmq控制台 http://localhost:15672 -->Exchanges中看到创建的 test-producer

2.3 消息消费者

  1. 导入依赖坐标
<!--stream consumer依赖-->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>
复制代码
  1. 配置application.yml
server:
  port: 8501

spring:
  application:
    name: streamconsumer-8501
  rabbitmq:  #rabbiqmq连接信息
    host: localhost
    port: 5672
    username: admin
    password: admin
  cloud:
    stream:
      bindings:  #stream channel
        input:
          destination: test-producer #消息接收队列名称
      binders:  #绑定器
        defaultRabbit:
          type: rabbit
复制代码
  1. 模拟消息接收 com.xyz.stream.binding.MyConsumer
@EnableBinding(Sink.class)
public class MyConsumer {

    @StreamListener(Sink.INPUT)
    public void input(String message) {
        System.out.println("接收的消息:" + message);
    }

}
复制代码
  1. 创建启动类
@SpringBootApplication
public class ConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(ConsumerApplication.class, args);
    }
}
复制代码

2.4 创建消息发送的工具类和测试类

  1. 修改 com.xyz.stream.bingding.MyProducer
@EnableBinding(Source.class)
public class MyProducer {

    @Autowired
    private MessageChannel output;

    public void send(Object messsage) {
        output.send(MessageBuilder.withPayload(messsage).build());
    }

}
复制代码
  1. 创建测试类 com.xyz.stream.TestProducer
@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest
public class TestProducer {

    @Autowired
    private MyProducer myProducer;

    @Test
    public void test() {
        myProducer.send("test method ...");
    }

}
复制代码

2.5 自定义消息通道

2.5.1 Producer改造

  1. 自定义channel com.xyz.stream.channel.MyProcess
public interface MyProcess {

    /**
     * 自定义消息生产者channel
     */
    String MYOUTPUT = "myoutput";

    @Output("myoutput")
    MessageChannel myoutput();


    /**
     * 自定义消息消费者channel
     */
    String MYINPUT = "myinput";

    @Input("myinput")
    SubscribableChannel myinput();
}
复制代码
  1. 修改binding com.xyz.stream.bingding.MyProducer
@EnableBinding(MyProcess.class)
public class MyProducer {

    @Autowired
    private MessageChannel myoutput;

    public void send(Object messsage) {
        myoutput.send(MessageBuilder.withPayload(messsage).build());
    }

}
复制代码
  1. 修改application.yml增加自定义channel
server:
  port: 8500

spring:
  application:
    name: streamproduder-8500
  rabbitmq:  #rabbiqmq连接信息
    host: localhost
    port: 5672
    username: admin
    password: admin
  cloud:
    stream:
      bindings:  #stream channel
        output:
          destination: test-producer #消息发送的目的地,在rabbitmq中指的是exchange
        myoutput:
          destination: costom-topic #消息发送的目的地,在rabbitmq中指的是exchange

      binders:  #绑定器
        defaultRabbit:
          type: rabbit
复制代码

2.5.2 Consumer改造

  1. 自定义channel com.xyz.stream.channel.MyProcess
public interface MyProcess {

    /**
     * 自定义消息生产者channel
     */
    String MYOUTPUT = "myoutput";

    @Output("myoutput")
    MessageChannel myoutput();


    /**
     * 自定义消息消费者channel
     */
    String MYINPUT = "myinput";

    @Input("myinput")
    SubscribableChannel myinput();
}
复制代码
  1. 修改binding com.xyz.stream.binding.MyConsumer
@EnableBinding(MyProcess.class)
public class MyConsumer {

    @StreamListener(MyProcess.MYINPUT)
    public void input(String message) {
        System.out.println("接收的消息:" + message);
    }

}
复制代码
  1. 修改application.yml增加自定义channel
server:
  port: 8501

spring:
  application:
    name: streamconsumer-8501
  rabbitmq:  #rabbiqmq连接信息
    host: localhost
    port: 5672
    username: admin
    password: admin
  cloud:
    stream:
      bindings:  #stream channel
        input:
          destination: test-producer #消息接收队列名称
        myinput:
          destination: costom-topic #消息接收队列名称
      binders:  #绑定器
        defaultRabbit:
          type: rabbit
复制代码

2.6 消息分组

通常在生产环境下,服务都不会以单点的方式运行。当一个服务启动多个实例的时候,这些实例都会绑定到同一个topic上。默认情况下,生产者产生一条消息发送到topic时,这条消息会被消费者的多个实例都接收,但在有些业务场景下,我们希望这个消息只能被一个消费者接收,就需要在消费者中通过配置消费者组的方式来实现这样的功能。

spring:
  application:
    name: streamconsumer-8501
  rabbitmq:  #rabbiqmq连接信息
    host: localhost
    port: 5672
    username: admin
    password: admin
  cloud:
    stream:
      bindings:  #stream channel
        input:
          destination: test-producer #消息接收队列名称
        myinput:
          destination: costom-topic #消息接收队列名称
          group: mygroup #设置消费者组
      binders:  #绑定器
        defaultRabbit:
          type: rabbit

复制代码

2.8 消息分区

为了满足相同特征的生产数据能够被同一个消费者实例所接收,需要使用消息分区的功能。

  1. 改造生产者8500的配置,增加 spring.cloud.stream.bindings.output.producer.partitionKeyExpressionspring.cloud.stream.bindings.output.producer.partitionCount 2个配置标签
spring:
  application:
    name: streamproduder-8500
  rabbitmq:  #rabbiqmq连接信息
    host: localhost
    port: 5672
    username: admin
    password: admin
  cloud:
    stream:
      bindings:  #stream channel
        output:
          destination: test-producer #消息发送的目的地,在rabbitmq中指的是exchange
        myoutput:
          destination: costom-topic #消息发送的目的地,在rabbitmq中指的是exchange
          producer:
            partitionKeyExpression: payload #分区关键字
            partitionCount: 2 #分区总数量
      binders:  #绑定器
        defaultRabbit:
          type: rabbit
复制代码
  1. 改造消费者8501的配置,增加 spring.cloud.stream.bindings.input.consumer.partitionedspring.cloud.stream.instanceIndexspring.cloud.stream.instanceCount 配置
spring:
  application:
    name: streamconsumer-8501
  rabbitmq:  #rabbiqmq连接信息
    host: localhost
    port: 5672
    username: admin
    password: admin
  cloud:
    stream:
      bindings:  #stream channel
        input:
          destination: test-producer #消息接收队列名称
        myinput:
          destination: costom-topic #消息接收队列名称
          group: mygroup #设置消费者组
          consumer:
            partitioned: true #消费者端开启对分区的支持
      binders:  #绑定器
        defaultRabbit:
          type: rabbit
      instanceIndex: 0
      instanceCount: 2
复制代码
  1. 增加消费者8502,修改配置
spring:
  application:
    name: streamconsumer-8502
  rabbitmq:  #rabbiqmq连接信息
    host: localhost
    port: 5672
    username: admin
    password: admin
  cloud:
    stream:
      bindings:  #stream channel
        input:
          destination: test-producer #消息接收队列名称
        myinput:
          destination: costom-topic #消息接收队列名称
          group: mygroup #设置消费者组
          consumer:
            partitioned: true #消费者端开启对分区的支持
      binders:  #绑定器
        defaultRabbit:
          type: rabbit
      instanceIndex: 1
      instanceCount: 2
复制代码
  1. 修改生产者测试函数 com.xyz.stream.TestProducer
@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest
public class TestProducer {

    @Autowired
    private MyProducer myProducer;

    @Test
    public void test() {
        for (int i = 0; i < 5; i++) {

            myProducer.send(i);

        }
    }

}
复制代码
原文  https://juejin.im/post/5e9fe37f6fb9a03c8a415d42
正文到此结束
Loading...