Spring Cloud Stream 概念

Spring Cloud Stream 作为用来构建消息驱动的微服务。

Spring Cloud Stream 中,提供了一个微服务和消息中间件之间的一个粘合剂,这个粘合剂叫做 Binder,Binder 负责与消息中间件进行交互。而我们开发者则通过 inputs 或者 outputs 这样的消息通道与 Binder 进行交互。


默认消息通道:Hello,stream。

  1. 首先创建一个 Spring Cloud Stream 项目,在其中添加三个依赖:web、rabbitmq、cloud stream。
  2. 项目创建成功后,添加 RabbitMQ 的基本配置信息。
  3. 接下来,创建一个简单的消息接收器 MsgReceiver。
  4. 单启动 stream 项目,然后在 rabbitmq 后台管理页面发送一条测试消息。
    http://localhost:15672/ 
    Queues -> Queue input.anonymous.cgUOBnM6Qc-Ek7paO3zPKA -> Publish message -> Payload -> published Message。
  5. 后台会返回信息:Received:hello,sihai。

自定义消息通道

  1. 首先创建一个名为 MyChannel 的接口,注意:从 F 版开始,默认使用通道的名称作为实例命令,所以现在这里的两个消息通道的名称不可以一致 (在早起版本中可以相同),这样的话,为了能够正常的收发消息,需要我们做一些额外的配置。
  2. 因为现在这两个输入输出通道名称不一样,为了使消息输入输出通道对接上,所以我们需要在 application.yml 中配置。
  3. 自定义一个消息接收器,用来接收自己的消息通道里的消息。
  4. 定义一个 HelloController 作为测试。
  5. 最后启动项目,访问一下 “http://localhost:8080/hello" ,查看一下后台是否有返回消息。

Spring Cloud Stream 消息分组

默认情况下,如果消费者是一个集群的话,那么此时一条消息会被多次消费。那么我们可以通过消息分组来解决这个问题。

  1. 在 application.yml 中配置信息分组,分组之后,即使使用了集群,消息也只会被消费一次。
  2. 我们可以将 stream 项目打包,同时启动多个实例来模拟集群,可以发现,在集群下,消息也只会被一个实例消费,至于是哪一个这个不可控,因为暂时配置了一个分组。