Spring Cloud 微服务入门教程(七):Spring Cloud Stream 消息队驱动式的微服务
上一节的《Spring Cloud 微服务入门教程(六):Spring Cloud BUS 消息总线实现配置中心动态更新配置文件》已经安装了RabbitMQ消息队列,并实现了SpringCloudBus消息总线,本节介绍Spring Cloud Stream 消息队驱动式的微服务。可以使用RabbitMQ、Apache Kafka等,用于微服务之间的异步消息传递和接收。
我们先规划一下打算怎么做,直接上代码可能会有点难理解,我们要实现的是DemoClient给DemoService发送一条消息放入消息队列中,然后DemoService接收消息并且给DemoClient回复一条消息。我说一下我个人对这个Spring Cloud Stream的理解,消息被分为很多个频道,你可以接收某个频道,也可以对某个频道发送消息,所以你需要知道频道的名称,我就统一定义在统一接口中心里,这个统一接口中心是我自己设计的架构,并不是微服务的。在上一节我们已经配置过RabbitMQ,所以配置RabbitMQ的部分不再赘述。
修改apicenter、demoservice、democlient微服务的pom文件,增加spring-cloud-starter-stream-rabbit依赖,例如:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>cloud</artifactId>
<groupId>net.renfei</groupId>
<version>1.0.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<groupId>net.renfei</groupId>
<artifactId>apicenter</artifactId>
<version>1.0.0</version>
<name>APICenter</name>
<description>接口中心</description>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
</dependencies>
</project>
修改远程Git上的配置文件
修改远程Git上的application.yml,包括democlient和demoservice的配置,增加:
spring:
stream:
bindings:
demoServiceMQ:
group: demo
content-type: application/json
demoClientMQ:
group: demo
content-type: application/json
bindings后面的是我们的频道名称,这个是自定义的,再后面的group是消费组,消费组防止消息被重复消费,微服务可能会启动多个实例组,保证每个组中只有一个成员会收到该消息,content-type是告诉框架我们要把对象作为json格式保存,这样方便我们在消息队列中查看对象的内容,调试起来会很方便。
统一接口中心新增频道名
在apicenter中新增一个net.renfei.apicenter.message.MQChannel的interface,用来规范和暴露所有微服务的频道名称:
package net.renfei.apicenter.message;
/**
* 消息队列频道名称
*
* @author RenFei
*/
public interface MQChannel {
String DEMOSERVICE = "demoServiceMQ";
String DEMOCLIENT = "demoClientMQ";
}
消息接收端
在demoservice模块中新增net.renfei.demoservice.message.DemoServiceMessageClient作为接收客户端:
package net.renfei.demoservice.message;
import net.renfei.apicenter.message.MQChannel;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;
public interface DemoServiceMessageClient {
@Input(MQChannel.DEMOSERVICE)
SubscribableChannel input();
}
在demoservice模块中新增net.renfei.demoservice.message.DemoClientMessageClient作为发送客户端:
package net.renfei.demoservice.message;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
public interface DemoClientMessageClient {
@Output
MessageChannel output();
}
在demoservice模块中新增net.renfei.demoservice.message.DemoServiceReceiver作为消息监听者:
package net.renfei.demoservice.message;
import lombok.extern.slf4j.Slf4j;
import net.renfei.apicenter.message.MQChannel;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.stereotype.Component;
/**
* 服务端监听消息队列
*
* @author RenFei
*/
@Slf4j
@Component
@EnableBinding({DemoServiceMessageClient.class, DemoClientMessageClient.class})
public class DemoServiceReceiver {
@StreamListener(MQChannel.DEMOSERVICE)
@SendTo(MQChannel.DEMOCLIENT)
public String process(Object message) {
log.info("Messages received by the DemoService:{}", message);
return "This is DemoServiceReceiver's reply";
}
}
消息发送端
在democlient中新增net.renfei.democlient.message.DemoClientMessageClient作为消息接收客户端:
package net.renfei.democlient.message;
import net.renfei.apicenter.message.MQChannel;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
public interface DemoClientMessageClient {
@Input(MQChannel.DEMOCLIENT)
SubscribableChannel input();
}
在democlient中新增net.renfei.democlient.message.DemoServiceMessageClient作为消息发送端:
package net.renfei.democlient.message;
import net.renfei.apicenter.message.MQChannel;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
/**
* DemoService的MQ频道客户端
*
* @author RenFei
*/
public interface DemoServiceMessageClient {
@Output(MQChannel.DEMOSERVICE)
MessageChannel output();
}
在democlient中新增net.renfei.democlient.controller.SendMessageController作为消息发送的触发入口:
package net.renfei.democlient.controller;
import net.renfei.democlient.message.DemoServiceMessageClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class SendMessageController {
@Autowired
private DemoServiceMessageClient demoServiceMessageClient;
@GetMapping("/sendMessage")
public void sendMessage(){
demoServiceMessageClient.output().send(
MessageBuilder.withPayload("This is a message from democlient").build()
);
}
}
运行测试
先启动注册中心eureka,然后启动配置中心config,再启动demoservice服务,最后启动democlient,访问我们新建的DemoClientMessageClient,触发消息发送,演示系统的地址是:http://localhost:18081/sendMessage


总结
代码已经陈述完了,做一下总结,@Input SubscribableChannel是订阅频道,用于接收消息;@Output MessageChannel是用来发送消息,这样应用之间可以解耦合降低依赖,比较经典的场景是发送短信,核心的业务不需要等待短信接口的结果,直接给短信服务发送一个消息以后就去干别的事了,短信服务接收到消息以后逐一执行发送短信的任务。
商业用途请联系作者获得授权。
版权声明:本文为博主「任霏」原创文章,遵循 CC BY-NC-SA 4.0 版权协议,转载请附上原文出处链接及本声明。
相关推荐
猜你还喜欢这些内容,不妨试试阅读一下评论与留言
以下内容均由网友提交发布,版权与真实性无法查证,请自行辨别。微信订阅号
扫码关注「任霏博客」微信订阅号- 大佬 引入jar包那里的 driver class 怎么选的?
- 我也遇到了这个问题,已经解决了,在此分享一下 1、宿主机也要创建kingbase的用户和用户组,并且要查看一下用户和用户组的ID(这个很重要) 2、把data目录的用户和用户组设置为kingbase 3、先不要把data路径挂载到宿主机上,这时就可以正常启动,启动后进入容器,查看一下容器内的kingbase的用户和用户组ID是多少,和第一步的ID是否一致,如果ID一致,那正常挂载目录就行;如果ID不一致,那就需要修改Dockerfile文件,在构建镜像时,修改容器内的用户和用户组ID,必须和宿主机的保持一致。然后重新构建镜像,就可以正常挂载宿主机目录了 4、其实直接修改宿主机的用户和用户组ID也是可以的,但是容器内的ID一般是1000,但是宿主机的这个ID很可能已经被占用了,无法修改,就只能修改容器内的ID
- 接口已经允许跨域请求,也就是说你可以在你的页面上调用,获取用户的公网 IP。 如果你还需要其他需求,可以提交 Issue 给我。
- V008R003C002B0320 这个对应的jdbc链接驱动你在哪里找到的?我也遇到了这个问题。
- WARNING: max_connections should be less than orequal than 10 (restricted by license) HINT: the value of max_connect is set 10 WARNING: max_connections should be less than orequal than 10 (restricted by license) HINT: the value of max_connect is set 10 kingbase: superuser_reserved_connections must be less than max_connections 我按照文档修改了以后,不知道如何重启。
- 然后把数字都改成 1 再启动。 如何重新启动?
- ksql: could not connect to server: No such file or directory Is the server running locally and accepting connections on Unix domain socket "/tmp/.s.KINGBASE.54321"
- 进入容器查看一下日志,是不是启动失败了,日志文件在:/opt/kingbase/logfile
- ksql: could not connect to server: No such file or directory Is the server running locally and accepting connections on Unix domain socket "/tmp/.s.KINGBASE.54321"?
- 先通过 docker exec -it 容器名/id /bin/bash 进入容器,然后在容器中使用 ksql 客户端进行连接数据库:/opt/kingbase/Server/bin/ksql -U system test
- 免费.ml域名10年委托合同到期被马里共和国收回域名经营权
- 从极狐Gitlab看各种中间件技术选型
- 时隔十年首次收到 Google AdSense 的付款
- ga域名被加蓬共和国从Freenom公司手中收回域名经营权
- Freenom 被 Meta(Facebook) 起诉导致暂停 .tk/.ga/.ml/.cf/.gq 等新域名注册
- 生花妙笔信手来 – 基于 Amazon SageMaker 使用 Grounded-SAM 加速电商广告素材生成 [1]
- github.renfei.net 不再完整代理 Github 页面改为代理指定文件
- 优雅的源代码管理(三):本地优雅的使用 Git Rebase 变基
- 优雅的源代码管理(二):Git 的工作原理
- 优雅的源代码管理(一):版本控制系统 VCS(Version Control System)与软件配置管理 SCM(Software Configuration Management)