标签搜索

订阅模式 代码

admin
2025-04-03 / 0 评论 / 0 阅读 / 正在检测是否收录...

一 . 简单整合

  1. 添加RabbitMQ依赖

    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
  2. 配置application.yml

    spring:
    rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: test
    password: 123456

    virtual-host: /

  3. 新建RabbitMQConfig
package com.example.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfig {
// 测试队列名称
private String testQueueName = test_queue;
// 测试交换机名称
private String testExchangeName = test_exchange;
// RoutingKey
private String testRoutingKey = test_routing_key;
/** 创建队列 */
@Bean
public Queue testQueue() {
    return new Queue(testQueueName);
}
/** 创建交换机 */
@Bean
public TopicExchange testExchange() {
    return new TopicExchange(testExchangeName);
}
/** 通过routingKey把队列与交换机绑定起来 */
@Bean
public Binding testBinding() {
    return BindingBuilder.bind(testQueue()).to(testExchange()).with(testRoutingKey);
}
}
  1. 新建生产者(producer)
package com.example.producer;
import com.alibaba.fastjson.JSONObject;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class TestProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
public void send(String queueName) {
    JSONObject jsonObject = new JSONObject();
    jsonObject.put(email, 756840349@qq.com);
    jsonObject.put(timestamp, System.currentTimeMillis());
    String jsonString = jsonObject.toJSONString();
    // 生产者发送消息的时候需要设置消息id
    Message message = MessageBuilder.withBody(jsonString.getBytes())
            .setDeliveryMode(MessageDeliveryMode.PERSISTENT)
            .setContentType(MessageProperties.CONTENT_TYPE_JSON).setContentEncoding(utf-8)
            .build();
    rabbitTemplate.convertAndSend(queueName, message);
}

}

  1. 新建消费者(consumer)

    package com.example.listener;
    import com.alibaba.fastjson.JSONObject;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    @Component
    public class FanoutSmsConsumer {
    @RabbitListener(queues = test_queue)
    public void consumeMessage(Message message) throws Exception{
    String msg = new String(message.getBody(), UTF-8);
    JSONObject jsonObject = JSONObject.parseObject(msg);
    System.out.println(消费消息: + jsonObject);
    }
    }
  2. 编写controller测试

    package com.example.controller;
    import com.example.producer.FanoutProducer;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    @RestController
    public class ProducerController {
    @Autowired
    private TestProducer TestProducer;
    @RequestMapping(/sendMsg)
    public String sendFanout() {
    fanoutProducer.send(test_queue);
    return success;
    }
    }

    浏览器访问localhost:8080/sendMsg,会返回success,并且控制台监听器会打印消息:

0

评论 (0)

取消