转载

Spring Boot 2.x 快速集成整合消息中间件 Kafka

欢迎关注个人微信公众号: 小哈学Java , 每日推送 Java 领域干货文章,关注即免费无套路附送 100G 海量学习、面试资源哟!!

个人网站: https://www.exception.site/springboot/spring-boot2-kafka

什么是 Kafka?

Kafka 是 Apache 基金会开源的一个分布式发布 - 订阅消息中间件,流处理平台。 它起源于 LinkedIn,由 Scala 和 Java两种语言编写而成。于 2011 年成为 Apache 项目,2012 成为 Apache 基金会下顶级项目。

Kafka 专为分布式高吞吐系统而设计。相比较其他消息中间件,如 RabbitMq 等,Kafka 具有更好的吞吐量,内置分区,复制和固有的容错能力,使得它非常适合应用在大数据领域。另外,Kafka 还支持离线、在线消费消息。

为什么要用 Kafka

  • 低延迟 - Kafka 支持低延迟消息传递,速度极快,能达到 200w 写/秒;
  • 高性能 - Kafka对于消息的发布、订阅都具有高吞吐量。即使存储了 TB 级的消息,依然能够保证稳定的性能;
  • 可靠性 - Kafka 是分布式,分区,复制和容错的,保证零停机和零数据丢失。
  • 可拓展性 - Kafka 支持集群水平拓展。
  • 耐用性 - Kafka 使用"分布式提交日志",消息能够快速的持久化的磁盘上。

Spring Boot 2.x 快速集成整合消息中间件 Kafka

Kafka 环境安装

接下来,小哈为大家演示一下,在 Linux 系统中,采用最简单的单机安装方式, 因为本文着重点还是介绍 Spring Boot 2.x 快速集成整合 Kafka.

下载 Kafka

访问 Kafka 官网 http://kafka.apache.org/downloads ,下载 tgz 包, 这里演示版本为最新的 2.3.0 版本。

Spring Boot 2.x 快速集成整合消息中间件 Kafka

解压,进入目录

下载下来过后,放置到指定位置,执行命令解压:

tar -zxvf kafka_2.11-2.3.0.tgz

解压完成后,进入 Kafka 目录下:

cd kafka_2.11-2.3.0

启动 zookeeper

通过 bin 目录下的 zookeeper-server-start.sh 启动脚本,来启动 zk 单节点实例:

bin/zookeeper-server-start.sh -daemon config/zookeeper.properties

启动 Kafka

通过 bin 目录下的 kafka-server-start.sh 来启动 :

bin/kafka-server-start.sh  config/server.properties

注意:Kafka 默认使用 9092 端口,注意关闭防火墙,阿里云服务器的话,记得添加安全组。

Spring Boot 2.x 开始整合

新建一个 Spring Boot 2.x Web 工程。

项目结构

Spring Boot 2.x 快速集成整合消息中间件 Kafka

添加 maven 依赖

小哈这里完整的 maven 依赖如下:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.2.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>site.exception</groupId>
    <artifactId>spring-boot-kafka</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>spring-boot-kafka</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!-- Kafka -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
      
           <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>

        <!-- lombok -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>

        <!-- 阿里巴巴 fastjson -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.58</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

添加 kafka 配置

修改 application.yml 文件,添加 kafka 相关配置:

spring:
  kafka:
    # 指定 kafka 地址,我这里在本地,直接就 localhost, 若外网地址,注意修改【PS: 可以指定多个】
    bootstrap-servers: localhost:9092
    consumer:
      # 指定 group_id
      group-id: group_id
      auto-offset-reset: earliest
      # 指定消息key和消息体的编解码方式
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      # 指定消息key和消息体的编解码方式
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

关于 auto-offset-reset

auto.offset.reset 配置有3个值可以设置,分别如下:

  • earliest :当各分区下有已提交的 offset 时,从提交的 offset 开始消费;无提交的 offset 时,从头开始消费;
  • latest :当各分区下有已提交的 offset 时,从提交的 offset 开始消费;无提交的 offset 时,消费新产生的该分区下的数据;
  • none : topic 各分区都存在已提交的 offset 时,从 offset 后开始消费;只要有一个分区不存在已提交的 offset ,则抛出异常;

默认建议用 earliest , 设置该参数后 kafka出错后重启,找到未消费的offset可以继续消费。

而 latest 这个设置容易丢失消息,假如 kafka 出现问题,还有数据往topic中写,这个时候重启kafka,这个设置会从最新的offset开始消费, 中间出问题的哪些就不管了。

none 这个设置没有用过,兼容性太差,经常出问题。

新增一个订单类

模拟业务系统中,用户每下一笔订单,就发送一个消息,供其他服务消费:

/**
 * @author 犬小哈(公众号:小哈学Java)
 * @date 2019/4/12
 * @time 下午3:05
 * @discription 订单实体类
 **/
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class Order {
    /**
     * 订单id
     */
    private long orderId;
    /**
     * 订单号
     */
    private String orderNum;
    /**
     * 订单创建时间
     */
    private LocalDateTime createTime;
}

添加一个消息发布者

新建一个 KafkaProvider 消息提供者类,源码如下:

/**
 * @author 犬小哈(公众号:小哈学Java)
 * @date 2019/4/12
 * @time 下午3:05
 * @discription 消息提供者
 **/
@Component
@Slf4j
public class KafkaProvider {

    /**
     * 消息 TOPIC
     */
    private static final String TOPIC = "xiaoha";

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage(long orderId, String orderNum, LocalDateTime createTime) {
        // 构建一个订单类
        Order order = Order.builder()
                .orderId(orderId)
                .orderNum(orderNum)
                .createTime(createTime)
                .build();

        // 发送消息,订单类的 json 作为消息体
        ListenableFuture<SendResult<String, String>> future =
                kafkaTemplate.send(TOPIC, JSONObject.toJSONString(order));

        // 监听回调
        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onFailure(Throwable throwable) {
                log.info("## Send message fail ...");
            }

            @Override
            public void onSuccess(SendResult<String, String> result) {
                log.info("## Send message success ...");
            }
        });
    }
}

添加一个消息消费者

消息发送出去了,当然就需要一个消费者,消费者拿到消息后,再做相关的业务处理,这里,小哈仅仅是打印消息体。

添加 KafkaConsumer 消费者类:

/**
 * @author 犬小哈(公众号:小哈学Java)
 * @date 2019/4/12
 * @time 下午3:05
 * @discription 消息消费者
 **/
@Component
@Slf4j
public class KafkaConsumer {

    @KafkaListener(topics = "xiaoha", groupId = "group_id")
    public void consume(String message) {
        log.info("## consume message: {}", message);
    }
}

通过 @KafkaListener 注解,我们可以指定需要监听的 topic 以及 groupId , 注意,这里的 topics 是个数组,意味着我们可以指定多个 topic ,如: @KafkaListener(topics = {"xiaoha", "xiaoha2"}, groupId = "group_id")

注意:消息发布者的 TOPIC 需要保持与消费者监听的 TOPIC 一致,否者消费不到消息。

单元测试

新建单元测试,功能测试消息发布,以及消费。

/**
 * @author 犬小哈(公众号:小哈学Java)
 * @date 2019/4/12
 * @time 下午3:05
 * @discription
 **/
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringBootKafkaApplicationTests {

    @Autowired
    private KafkaProvider kafkaProvider;

    @Test
    public void sendMessage() throws InterruptedException {
        // 发送 1000 个消息
        for (int i = 0; i < 1000; i++) {
            long orderId = i+1;
            String orderNum = UUID.randomUUID().toString();
            kafkaProvider.sendMessage(orderId, orderNum, LocalDateTime.now());
        }

        TimeUnit.MINUTES.sleep(1);
    }
}

发送 1000 个消息,看消息是否能够被正常发布与消费,控制台日志如下:

Spring Boot 2.x 快速集成整合消息中间件 Kafka

可以发现,1000 个消息被成功发送,且被正常消费。

我们再验证下 Kafka 的 topic 列表,看 xiaoha 这个 topic 是否正常被创建, 执行 bin 目录下查看 topic 列表的 kafka-topics.sh 脚本:

bin/kafka-topics.sh --list --zookeeper localhost:2181

Spring Boot 2.x 快速集成整合消息中间件 Kafka

好了,大功告成!

总结

小哈今天主要和大家分享了,如何安装单机版的 kafka 环境、如何在 Spring Boot 2.x 中快速集成消息中间件 Kafka,以及演示了相关示例代码来发布消息、消费消息,希望大家看完过后有所收获,下期见!

GitHub 源码地址

https://github.com/weiwosuoai/spring-boot-tutorial/tree/master/spring-boot-kafka

参考资料

https://zh.wikipedia.org/wiki/Kafka

https://www.w3cschool.cn/apache_kafka/

https://juejin.im/post/5d406a925188255d352ab24e

https://www.jianshu.com/p/e1df7d18bb8f

免费分享 | 面试&学习福利资源

最近在网上发现一个不错的 PDF 资源《Java 核心知识&面试.pdf》分享给大家,不光是面试,学习,你都值得拥有!!!

获取方式: 关注公众号: 小哈学Java , 后台回复 资源 ,既可 免费无套路获取资源链接 ,下面是目录以及部分截图:

Spring Boot 2.x 快速集成整合消息中间件 Kafka

Spring Boot 2.x 快速集成整合消息中间件 Kafka

Spring Boot 2.x 快速集成整合消息中间件 Kafka

Spring Boot 2.x 快速集成整合消息中间件 Kafka

Spring Boot 2.x 快速集成整合消息中间件 Kafka

Spring Boot 2.x 快速集成整合消息中间件 Kafka

重要的事情说两遍,关注公众号: 小哈学Java , 后台回复 资源 ,既可 免费无套路获取资源链接 !!!

欢迎关注微信公众号: 小哈学Java

Spring Boot 2.x 快速集成整合消息中间件 Kafka

原文  https://segmentfault.com/a/1190000020093680
正文到此结束
Loading...