2020-02-26 17:30:00

Spring Cloud 微服务入门教程(七):Spring Cloud Stream 消息队驱动式的微服务


Spring Cloud 微服务入门教程(七):Spring Cloud Stream 消息队驱动式的微服务

上一节的《Spring Cloud 微服务入门教程(六):Spring Cloud BUS 消息总线实现配置中心动态更新配置文件》已经安装了RabbitMQ消息队列,并实现了SpringCloudBus消息总线,本节介绍Spring Cloud Stream 消息队驱动式的微服务。可以使用RabbitMQ、Apache Kafka等,用于微服务之间的异步消息传递和接收。

我们先规划一下打算怎么做,直接上代码可能会有点难理解,我们要实现的是DemoClient给DemoService发送一条消息放入消息队列中,然后DemoService接收消息并且给DemoClient回复一条消息。我说一下我个人对这个Spring Cloud Stream的理解,消息被分为很多个频道,你可以接收某个频道,也可以对某个频道发送消息,所以你需要知道频道的名称,我就统一定义在统一接口中心里,这个统一接口中心是我自己设计的架构,并不是微服务的。在上一节我们已经配置过RabbitMQ,所以配置RabbitMQ的部分不再赘述。

修改apicenter、demoservice、democlient微服务的pom文件,增加spring-cloud-starter-stream-rabbit依赖,例如:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>cloud</artifactId>
        <groupId>net.renfei</groupId>
        <version>1.0.0</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>
    <groupId>net.renfei</groupId>
    <artifactId>apicenter</artifactId>
    <version>1.0.0</version>
    <name>APICenter</name>
    <description>接口中心</description>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>
    </dependencies>
</project>

修改远程Git上的配置文件

修改远程Git上的application.yml,包括democlient和demoservice的配置,增加:

spring:
  stream:
    bindings:
      demoServiceMQ:
        group: demo
        content-type: application/json
      demoClientMQ:
        group: demo
        content-type: application/json

bindings后面的是我们的频道名称,这个是自定义的,再后面的group是消费组,消费组防止消息被重复消费,微服务可能会启动多个实例组,保证每个组中只有一个成员会收到该消息,content-type是告诉框架我们要把对象作为json格式保存,这样方便我们在消息队列中查看对象的内容,调试起来会很方便。

统一接口中心新增频道名

在apicenter中新增一个net.renfei.apicenter.message.MQChannel的interface,用来规范和暴露所有微服务的频道名称:

package net.renfei.apicenter.message;

/**
 * 消息队列频道名称
 *
 * @author RenFei
 */
public interface MQChannel {
    String DEMOSERVICE = "demoServiceMQ";
    String DEMOCLIENT = "demoClientMQ";
}

消息接收端

在demoservice模块中新增net.renfei.demoservice.message.DemoServiceMessageClient作为接收客户端:

package net.renfei.demoservice.message;

import net.renfei.apicenter.message.MQChannel;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;

public interface DemoServiceMessageClient {
    @Input(MQChannel.DEMOSERVICE)
    SubscribableChannel input();
}

在demoservice模块中新增net.renfei.demoservice.message.DemoClientMessageClient作为发送客户端:

package net.renfei.demoservice.message;

import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;

public interface DemoClientMessageClient {
    @Output
    MessageChannel output();
}

在demoservice模块中新增net.renfei.demoservice.message.DemoServiceReceiver作为消息监听者:

package net.renfei.demoservice.message;

import lombok.extern.slf4j.Slf4j;
import net.renfei.apicenter.message.MQChannel;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.stereotype.Component;

/**
 * 服务端监听消息队列
 *
 * @author RenFei
 */
@Slf4j
@Component
@EnableBinding({DemoServiceMessageClient.class, DemoClientMessageClient.class})
public class DemoServiceReceiver {
    @StreamListener(MQChannel.DEMOSERVICE)
    @SendTo(MQChannel.DEMOCLIENT)
    public String process(Object message) {
        log.info("Messages received by the DemoService:{}", message);
        return "This is DemoServiceReceiver's reply";
    }
}

消息发送端

在democlient中新增net.renfei.democlient.message.DemoClientMessageClient作为消息接收客户端:

package net.renfei.democlient.message;

import net.renfei.apicenter.message.MQChannel;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;

public interface DemoClientMessageClient {
    @Input(MQChannel.DEMOCLIENT)
    SubscribableChannel input();
}

在democlient中新增net.renfei.democlient.message.DemoServiceMessageClient作为消息发送端:

package net.renfei.democlient.message;

import net.renfei.apicenter.message.MQChannel;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;

/**
 * DemoService的MQ频道客户端
 *
 * @author RenFei
 */
public interface DemoServiceMessageClient {
    @Output(MQChannel.DEMOSERVICE)
    MessageChannel output();
}

在democlient中新增net.renfei.democlient.controller.SendMessageController作为消息发送的触发入口:

package net.renfei.democlient.controller;

import net.renfei.democlient.message.DemoServiceMessageClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class SendMessageController {
    @Autowired
    private DemoServiceMessageClient demoServiceMessageClient;
    
    @GetMapping("/sendMessage")
    public void sendMessage(){
        demoServiceMessageClient.output().send(
                MessageBuilder.withPayload("This is a message from democlient").build()
        );
    }
}

运行测试

先启动注册中心eureka,然后启动配置中心config,再启动demoservice服务,最后启动democlient,访问我们新建的DemoClientMessageClient,触发消息发送,演示系统的地址是:http://localhost:18081/sendMessage

消息服务发送端消息服务接收端

总结

代码已经陈述完了,做一下总结,@Input SubscribableChannel是订阅频道,用于接收消息;@Output MessageChannel是用来发送消息,这样应用之间可以解耦合降低依赖,比较经典的场景是发送短信,核心的业务不需要等待短信接口的结果,直接给短信服务发送一个消息以后就去干别的事了,短信服务接收到消息以后逐一执行发送短信的任务。


版权声明:本文为博主「任霏」原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接及本声明。
原文链接:https://www.renfei.net/posts/1003327
评论与留言
以下内容均由网友提交发布,版权与真实性无法查证,请自行辨别。

本站有缓存策略,时间约2小时后能看到您的评论。本站使用自动审核机制,如果您的内容包含广告/谩骂/恐怖/暴力/涉政等不和谐内容将无法展示!


本站有缓存策略,时间约2小时后能看到您的评论。本站使用自动审核机制,如果您的内容包含广告/谩骂/恐怖/暴力/涉政等不和谐内容将无法展示!

关注任霏博客
扫码关注「任霏博客」微信订阅号
微博:任霏博客网
Twitter:@renfeii
Facebook:任霏