MQ-Spring Cloud与Kafka

概观

本示例项目演示了如何 使用事件驱动架构,  Spring Boot,Spring Cloud Stream,  Apache Kafka  和  Lombok构建  实时流应用程序  。

到本教程结束时,您将拥有一个简单的基于Spring Boot的Greetings微服务

  1. 从REST API获取消息
  2. 将它写入Kafka主题
  3. 从主题中读取它
  4. 输出到控制台

让我们开始吧!

顺便说一句,你会在这里找到源代码。

什么是Spring Cloud?

Spring Cloud Stream是基于Spring Boot构建的构建消息驱动微服务的框架。

什么是卡夫卡?

Kafka是最早由LinkedIn开发的流行的高性能和可水平扩展的消息传递平台。

安装Kafka

从这里下载Kafka   并解压缩它:


 

启动Zookeeper和Kafka

在Windows上:


 

在Linux或Mac上:


 

如果卡夫卡没有运行,并且在计算机从休眠状态唤醒后无法启动,请删除该   <TMP_DIR>/kafka-logs文件夹,然后再次启动卡夫卡。

什么是Lombok?

Lombok是一个在代码中自动生成getter,setter,toString(),构建器,记录器等的java框架。

Maven依赖关系

转到  https://start.spring.io  创建一个maven项目:

  1. 添加必要的依赖关系:   Spring Cloud Stream,   Kafka,   Devtools (在开发过程中的热重新部署,可选),  Actuator (用于监测应用,可选配),  Lombok (确保也有安装在您的IDE龙目插件)
  2. 点击Generate Project按钮将项目下载为zip文件
  3. 提取zip文件并将maven项目导入到您最喜欢的IDE中

注意 pom.xml 文件中的maven依赖关系  :


 

……也是该   <dependencyManagement> 部分:


 

…和   <repository> 部分:


 

定义Kafka流


 

为了使我们的应用程序能够与Kafka进行通信,我们需要定义一个出站流将消息写入Kafka主题,并且需要一个入站流来读取来自Kafka主题的消息。

Spring Cloud提供了一种简便的方法来创建一个界面,为每个流定义一个单独的方法。

该   inboundGreetings() 方法定义了要从Kafka读取的入站流,并且该   outboundGreetings() 方法定义了写入Kafka的出站流。

在运行期间,Spring将创建一个基于java代理的 GreetingsStreams 接口实现,该  接口可以作为Spring Bean注入代码中的任何位置以访问我们的两个流。

配置Spring云流

我们的下一步是配置Spring Cloud Stream以在 GreetingsStreams 接口中绑定到我们的流  。这可以通过 使用下面的代码创建一个 @Configuration 类  来完成   com.kaviddiss.streamkafka.config.StreamsConfig


 

绑定流是使用 接口传递给它的 @EnableBinding 注释  完成的   GreatingsService

Kafka的配置属性

默认情况下,配置属性存储在   src/main/resources/application.properties 文件中。

但是我更喜欢使用YAML格式,因为它不那么冗长,并且允许在同一个文件中保持常见和特定于环境的属性。

现在,让我们重命名   application.properties 到   application.yaml 和粘贴下面的配置片断到文件:


 

上述配置属性配置了要连接的Kafka服务器的地址,以及我们用于代码中入站和出站流的Kafka主题。他们都必须使用相同的卡夫卡主题!

这些   contentType 属性告诉Spring Cloud Stream在流中发送/接收我们的消息对象   String

创建消息对象

com.kaviddiss.streamkafka.model.Greetings 用下面的代码创建一个简单的  类,它代表我们读取的消息对象并写入   greetings Kafka主题:


 

请注意,由于Lombok注释,该班级没有任何获得者和设置者。该   @ToString 会 toString() 使用类的字段生成一个  方法,这个   @Builder 注释将允许我们 Greetings 使用流利的构建器来创建  对象(见下文)。

创建服务层以写入Kafka

让我们 com.kaviddiss.streamkafka.service.GreetingsService 用下面的代码创建这个  类,它将把一个 Greetings对象  写入   greetings Kafka主题:


 

该   @Service 注释将配置该类作为一个Spring bean并注入   GreetingsService 通过构造依赖。

该   @Slf4j 注释将生成我们可用于记录的SLF4J记录器字段。

在该   sendGreeting() 方法中,我们使用注入的   GreetingsStream 对象来发送由 Greetings 对象表示的消息  。

创建REST API

现在我们将创建一个REST api端点,它将触发使用 GreetingsService Spring Bean 向Kafka发送消息  :


 

该   @RestController 注解告诉春天,这是一个控制器豆(该C从MVC)。该   greetings() 方法定义了一个   HTTP GET /greetings 端点,它接受一个   message 请求参数并将其传递给该   sendGreeting() 方法   GreetingsService

聆听卡夫卡主题的问候

我们创建一个   com.kaviddiss.streamkafka.service.GreetingsListener 类来监听 greetingsKafka主题上的消息  并将它们记录在控制台上:


 

该   @Component 注释 @Service 与 @RestController Spring Bean 类似,  并  定义了它。

GreetingsListener 有一个单一的方法,   handleGreetings() 将由Spring Cloud Stream与 Kafka主题 Greetings 上的每个新消息对象  一起调用   greetings。这要归功于 @StreamListener 为该 handleGreetings() 方法配置的  注释  。

运行应用程序

最后一块难题是 com.kaviddiss.streamkafka.StreamKafkaApplication Spring初始化程序自动生成的  类:


 

无需在此进行任何更改。您可以从IDE运行此类作为Java应用程序,也可以使用Spring Boot maven插件从命令行运行应用程序:


 

一旦应用程序运行,请 在浏览器中转到  http:// localhost:8080 / greetings?message = hello并检查您的控制台。

原创于 【模棱博客】