Publish/Subscribe Pattern: Code

霄
2022-07-24

I. Basic integration

  1. Add the RabbitMQ dependency
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
  2. Configure application.yml
spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: test
    password: 123456
    virtual-host: /

  3. Create RabbitMQConfig
package com.example.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig {
    // Name of the test queue
    private final String testQueueName = "test_queue";
    // Name of the test exchange
    private final String testExchangeName = "test_exchange";
    // Routing key used for the binding
    private final String testRoutingKey = "test_routing_key";

    /** Declare the queue */
    @Bean
    public Queue testQueue() {
        return new Queue(testQueueName);
    }

    /** Declare the exchange */
    @Bean
    public TopicExchange testExchange() {
        return new TopicExchange(testExchangeName);
    }

    /** Bind the queue to the exchange with the routing key */
    @Bean
    public Binding testBinding() {
        return BindingBuilder.bind(testQueue()).to(testExchange()).with(testRoutingKey);
    }
}
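The binding above attaches the queue to a topic exchange, so which messages reach the queue depends on how the routing key of each message compares with the binding key. The following is a minimal plain-Java sketch of the topic matching rules (`*` stands for exactly one dot-separated word, `#` for zero or more words). It is an illustration only, not RabbitMQ's implementation; the class name `TopicMatchSketch` and the `order.*` style keys are hypothetical and do not appear in the project above.

```java
public class TopicMatchSketch {

    /** Returns true if the dot-separated routing key matches the binding pattern. */
    public static boolean matches(String pattern, String routingKey) {
        return matches(pattern.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean matches(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            // Pattern exhausted: the key must be exhausted as well
            return j == k.length;
        }
        if (p[i].equals("#")) {
            // '#' may absorb zero or more words: try every possible split point
            for (int next = j; next <= k.length; next++) {
                if (matches(p, i + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.length) {
            // Key exhausted but the pattern still expects a word
            return false;
        }
        // '*' matches exactly one word; any other token must match literally
        return (p[i].equals("*") || p[i].equals(k[j])) && matches(p, i + 1, k, j + 1);
    }

    public static void main(String[] args) {
        // The binding key from RabbitMQConfig has no wildcards, so only an exact match routes
        System.out.println(matches("test_routing_key", "test_routing_key")); // true
        System.out.println(matches("test_routing_key", "other_key"));        // false
        // Hypothetical wildcard keys, for illustration
        System.out.println(matches("order.*", "order.created"));             // true
        System.out.println(matches("order.#", "order.created.eu"));          // true
    }
}
```

Because `test_routing_key` contains no wildcards, the topic exchange in this setup behaves like a direct exchange for that binding.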
  4. Create the producer
package com.example.producer;

import com.alibaba.fastjson.JSONObject;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.nio.charset.StandardCharsets;
import java.util.UUID;

@Component
public class TestProducer {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void send(String queueName) {
        JSONObject jsonObject = new JSONObject();
        jsonObject.put("email", "756840349@qq.com");
        jsonObject.put("timestamp", System.currentTimeMillis());
        String jsonString = jsonObject.toJSONString();
        // The producer should set a message ID when sending; a random UUID is one option
        Message message = MessageBuilder.withBody(jsonString.getBytes(StandardCharsets.UTF_8))
                .setDeliveryMode(MessageDeliveryMode.PERSISTENT)
                .setContentType(MessageProperties.CONTENT_TYPE_JSON)
                .setContentEncoding("utf-8")
                .setMessageId(UUID.randomUUID().toString())
                .build();
        // Only a routing key is passed, so this publishes to the template's default exchange
        // (the AMQP default exchange unless configured otherwise), which delivers to the queue named queueName
        rabbitTemplate.convertAndSend(queueName, message);
    }
}

  5. Create the consumer
package com.example.listener;

import com.alibaba.fastjson.JSONObject;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.nio.charset.StandardCharsets;

@Component
public class FanoutSmsConsumer {
    // Listen on the queue declared in RabbitMQConfig
    @RabbitListener(queues = "test_queue")
    public void consumeMessage(Message message) throws Exception {
        // Decode the body with the same charset the producer used
        String msg = new String(message.getBody(), StandardCharsets.UTF_8);
        JSONObject jsonObject = JSONObject.parseObject(msg);
        System.out.println("Consumed message: " + jsonObject);
    }
}
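The producer turns the JSON text into bytes and the consumer turns the bytes back into text, so both sides have to agree on the charset. Here is a small plain-Java sketch of that round trip, without any RabbitMQ classes. The class name `PayloadRoundTripSketch` and the `note` field are hypothetical, added only to show a non-ASCII value surviving the trip.

```java
import java.nio.charset.StandardCharsets;

public class PayloadRoundTripSketch {

    /** Producer side: convert the JSON text into the bytes carried in the message body. */
    public static byte[] encode(String json) {
        return json.getBytes(StandardCharsets.UTF_8);
    }

    /** Consumer side: convert the received body back into JSON text. */
    public static String decode(byte[] body) {
        return new String(body, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // "note" is a hypothetical field holding two non-ASCII characters (written as Unicode escapes)
        String json = "{\"email\":\"756840349@qq.com\",\"note\":\"\u6d88\u606f\"}";
        byte[] body = encode(json);
        // Decoding with the same charset restores the original text
        System.out.println(decode(body).equals(json)); // true
    }
}
```

Calling `getBytes()` without a charset uses the platform default, which may differ between the machine that produces and the machine that consumes; naming UTF-8 on both sides avoids that mismatch.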
  6. Write a controller to test it
package com.example.controller;

import com.example.producer.TestProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class ProducerController {
    @Autowired
    private TestProducer testProducer;

    // Sends one test message to test_queue each time /sendMsg is requested
    @RequestMapping("/sendMsg")
    public String sendFanout() {
        testProducer.send("test_queue");
        return "success";
    }
}

Open localhost:8080/sendMsg in a browser. The response is success, and the listener prints the consumed message to the console.
