一、简单整合
添加RabbitMQ依赖
<!-- Spring Boot starter that pulls in AMQP (RabbitMQ) support -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
配置application.yml
# RabbitMQ connection settings read by Spring Boot
spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: test
    password: 123456
    virtual-host: /
- 新建RabbitMQConfig
package com.example.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Declares the queue, the topic exchange and the binding between them
 * as Spring beans for the test messaging flow.
 */
@Configuration
public class RabbitMQConfig {

    /** Name of the test queue. */
    private static final String TEST_QUEUE_NAME = "test_queue";

    /** Name of the test exchange. */
    private static final String TEST_EXCHANGE_NAME = "test_exchange";

    /** Routing key used to bind the test queue to the test exchange. */
    private static final String TEST_ROUTING_KEY = "test_routing_key";

    /**
     * Creates the test queue.
     *
     * @return a queue named {@code test_queue}
     */
    @Bean
    public Queue testQueue() {
        return new Queue(TEST_QUEUE_NAME);
    }

    /**
     * Creates the test exchange.
     *
     * @return a topic exchange named {@code test_exchange}
     */
    @Bean
    public TopicExchange testExchange() {
        return new TopicExchange(TEST_EXCHANGE_NAME);
    }

    /**
     * Binds the test queue to the test exchange through the routing key.
     *
     * @return the binding of {@code test_queue} to {@code test_exchange}
     *         with routing key {@code test_routing_key}
     */
    @Bean
    public Binding testBinding() {
        return BindingBuilder.bind(testQueue()).to(testExchange()).with(TEST_ROUTING_KEY);
    }
}
- 新建生产者(producer)
package com.example.producer;
import com.alibaba.fastjson.JSONObject;
import java.nio.charset.StandardCharsets;
import java.util.UUID;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class TestProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
public void send(String queueName) {
JSONObject jsonObject = new JSONObject();
jsonObject.put(email, 756840349@qq.com);
jsonObject.put(timestamp, System.currentTimeMillis());
String jsonString = jsonObject.toJSONString();
// 生产者发送消息的时候需要设置消息id
Message message = MessageBuilder.withBody(jsonString.getBytes())
.setDeliveryMode(MessageDeliveryMode.PERSISTENT)
.setContentType(MessageProperties.CONTENT_TYPE_JSON).setContentEncoding(utf-8)
.build();
rabbitTemplate.convertAndSend(queueName, message);
}
}
新建消费者(consumer)
package com.example.listener; import com.alibaba.fastjson.JSONObject; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component public class FanoutSmsConsumer { @RabbitListener(queues = test_queue) public void consumeMessage(Message message) throws Exception{ String msg = new String(message.getBody(), UTF-8); JSONObject jsonObject = JSONObject.parseObject(msg); System.out.println(消费消息: + jsonObject); } }
编写controller测试
package com.example.controller; import com.example.producer.FanoutProducer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; @RestController public class ProducerController { @Autowired private TestProducer TestProducer; @RequestMapping(/sendMsg) public String sendFanout() { fanoutProducer.send(test_queue); return success; } }
浏览器访问 localhost:8080/sendMsg,会返回 success,同时控制台会打印监听器消费到的消息(以“消费消息:”开头,后面是消息的 JSON 内容)。
评论 (0)