首页
归档
关于
Search
1
C服务器端
9 阅读
2
1.数据流图(下午题)
8 阅读
3
管道处理模型
8 阅读
4
数据结构与算法
8 阅读
5
3.面向对象设计
7 阅读
软件设计师笔记
.Net
Java
数据库
PHP
运维
前端
Python
中间件相关
云原生
架构设计
Search
标签搜索
websocket
科技新闻
Bi8bo
累计撰写
267
篇文章
累计收到
2
条评论
首页
栏目
软件设计师笔记
.Net
Java
数据库
PHP
运维
前端
Python
中间件相关
云原生
架构设计
页面
归档
关于
搜索到
17
篇与
的结果
2025-04-03
Redis集群方案
只需要监听主库即可
2025年04月03日
4 阅读
0 评论
0 点赞
2025-04-03
RedisTemplate之opsForValue使用说明
1、set(K key, V value) 新增一个字符串类型的值,key是键,value是值。redisTemplate.opsForValue().set("stringValue","bbb"); 2、get(Object key) 获取key键对应的值。String stringValue = redisTemplate.opsForValue().get("key")3、append(K key, String value)在原有的值基础上新增字符串到末尾。redisTemplate.opsForValue().append("key", "appendValue"); String stringValueAppend = redisTemplate.opsForValue().get("key"); System.out.println("通过append(K key, String value)方法修改后的字符串:"+stringValueAppend); 4、get(K key, long start, long end)截取key键对应值得字符串,从开始下标位置开始到结束下标的位置(包含结束下标)的字符串。String cutString = redisTemplate.opsForValue().get("key", 0, 3); System.out.println("通过get(K key, long start, long end)方法获取截取的字符串:"+cutString); 5、getAndSet(K key, V value) 获取原来key键对应的值并重新赋新值。String oldAndNewStringValue = redisTemplate.opsForValue().getAndSet("key", "ccc"); System.out.print("通过getAndSet(K key, V value)方法获取原来的值:" + oldAndNewStringValue ); String newStringValue = redisTemplate.opsForValue().get("key"); System.out.println("修改过后的值:"+newStringValue); 6、setBit(K key, long offset, boolean value) key键对应的值value对应的ascii码,在offset的位置(从左向右数)变为value。redisTemplate.opsForValue().setBit("key",1,false); newStringValue = redisTemplate.opsForValue().get("key")+""; System.out.println("通过setBit(K key,long offset,boolean value)方法修改过后的值:"+newStringValue); 7、getBit(K key, long offset) 判断指定的位置ASCII码的bit位是否为1。boolean bitBoolean = redisTemplate.opsForValue().getBit("key",1); System.out.println("通过getBit(K key,long offset)方法判断指定bit位的值是:" + bitBoolean); 8、size(K key) 获取指定字符串的长度Long stringValueLength = redisTemplate.opsForValue().size("key"); System.out.println("通过size(K key)方法获取字符串的长度:"+stringValueLength); 9、increment(K key, double delta) 以增量的方式将double值存储在变量中。double stringValueDouble = redisTemplate.opsForValue().increment("doubleKey",5); System.out.println("通过increment(K key, double delta)方法以增量方式存储double值:" + stringValueDouble); 10、increment(K key, long delta) 以增量的方式将long值存储在变量中。double stringValueLong = redisTemplate.opsForValue().increment("longKey",6); System.out.println("通过increment(K key, long delta)方法以增量方式存储long值:" + stringValueLong); 11、setIfAbsent(K key, V value) 如果键不存在则新增,存在则不改变已经有的值。boolean absentBoolean = redisTemplate.opsForValue().setIfAbsent("absentKey","fff"); System.out.println("通过setIfAbsent(K key, V value)方法判断变量值absentValue是否存在:" + absentBoolean); if(absentBoolean){ String absentValue = redisTemplate.opsForValue().get("absentKey")+""; System.out.print(",不存在,则新增后的值是:"+absentValue); boolean existBoolean = redisTemplate.opsForValue().setIfAbsent("absentKey","eee"); System.out.print(",再次调用setIfAbsent(K key, V value)判断absentValue是否存在并重新赋值:" + existBoolean); if(!existBoolean){ absentValue = redisTemplate.opsForValue().get("absentKey")+""; System.out.print("如果存在,则重新赋值后的absentValue变量的值是:" + absentValue); 12、set(K key, V value, long timeout, TimeUnit unit) 设置变量值的过期时间。redisTemplate.opsForValue().set("timeOutKey", "timeOut", 5, TimeUnit.SECONDS); String timeOutValue = redisTemplate.opsForValue().get("timeOutKey")+""; System.out.println("通过set(K key, V value, long timeout, TimeUnit unit)方法设置过期时间,过期之前获取的数据:"+timeOutValue); Thread.sleep(5*1000); timeOutValue = redisTemplate.opsForValue().get("timeOutKey")+""; System.out.print(",等待10s过后,获取的值:"+timeOutValue); 13、set(K key, V value, long offset) 覆盖从指定位置开始的值。redisTemplate.opsForValue().set("absentKey","dd",1); String overrideString = redisTemplate.opsForValue().get("absentKey"); System.out.println("通过set(K key, V value, long offset)方法覆盖部分的值:"+overrideString); 14、multiSet(Map<? extends K,? extends V> map) 设置map集合到redis。Map valueMap = new HashMap(); valueMap.put("valueMap1","map1"); valueMap.put("valueMap2","map2"); valueMap.put("valueMap3","map3"); redisTemplate.opsForValue().multiSet(valueMap); 15、multiGet(Collection keys) 根据集合取出对应的value值。//根据List集合取出对应的value值 List paraList = new ArrayList(); paraList.add("valueMap1"); paraList.add("valueMap2"); paraList.add("valueMap3"); List<String> valueList = redisTemplate.opsForValue().multiGet(paraList); for (String value : valueList){ System.out.println("通过multiGet(Collection<K> keys)方法获取map值:" + value); }16、multiSetIfAbsent(Map<? extends K,? extends V> map)Map valueMap = new HashMap(); valueMap.put("valueMap1","map1"); valueMap.put("valueMap2","map2"); valueMap.put("valueMap3","map3"); redisTemplate.opsForValue().multiSetIfAbsent(valueMap);
2025年04月03日
4 阅读
0 评论
0 点赞
2025-04-03
Redis五大基本类型底层数据结构
暂无简介
2025年04月03日
3 阅读
0 评论
0 点赞
2025-04-03
RabbitMQ工作模式(知乎)
RabbitMQ基本概念RabbitMQ 基础架构:Broker接收和分发消息的应用,RabbitMQ Server就是 Message BrokerVirtual host虚拟主机,出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类似于网络中的 namespace 概念。当多个不同的用户使用同一个 RabbitMQ server 提供的服务时,可以划分出多个vhost,每个用户在自己的 vhost 创建 exchange/queue 等,每一个虚拟主机都有AMQP的全套基础组件,并且可以针对每个虚拟主机进行权限以及数据分配,并且不同虚拟主机之间是完全隔离的。Connection客户端与RabbitMQ进行交互,首先就需要建立一个TPC连接。RabbitMQ为了减少性能开销,也会在一个Connection中建立多个Channel,这样便于客户端进行多线程连接,这些连接会复用同一个Connection的TCP通道,提高性能。Channel客户端与RabbitMQ建立了连接,就会分配一个AMQP信道 Channel。每个信道都会被分配一个唯一的ID。Exchange消息队列交换机,消息发送到RabbitMQ中后,会首先进入一个交换机,然后由交换机负责将数据转发到不同的队列中。RabbitMQ中有多种不同类型的交换机来支持不同的路由策略。交换机多用来与生产者打交道。生产者发送的消息通过Exchange交换机分配到各个不同的Queue队列上,而对于消息消费者来说,通常只需要关注自己的队列就可以了。Queue消息队列,队列是实际保存数据的最小单位。队列结构天生就具有FIFO的顺序。Producer消息生产者,即生产方客户端,生产方客户端将消息发送Consumer消息消费者,即消费方客户端,接收MQ转发的消息。消息发送者的固定步骤1.创建消息生产者producer,并制定生产者组名2.指定Nameserver地址3.启动producer4.创建消息对象,指定主题Topic、Tag和消息体5.发送消息6.关闭生产者producer消息消费者的固定步骤1.创建消费者Consumer,制定消费者组名2.指定Nameserver地址3.订阅主题Topic和Tag4.设置回调函数,处理消息5.启动消费者consumer编程模型引入依赖<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.9.0</version> </dependency>创建连接获取ChannelConnectionFactory factory = new ConnectionFactory(); factory.setHost(HOST_NAME); factory.setPort(HOST_PORT); factory.setUsername(USER_NAME); factory.setPassword(PASSWORD); factory.setVirtualHost(VIRTUAL_HOST); Connection connection = factory.newConnection(); Channel channel = connection.createChannel();声明Exchange(可选)channel.exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete,Map<String, Object> arguments) throws IOException;Exchange有四种类型: fanout、 topic 、headers 、direct声明queuechannel.queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments);durable 表示是否持久化。Durable选项表示会将队列的消息写入硬盘,这样服务重启后这些消息就不会丢失。声明Exchange与Queue的绑定关系(可选)channel.queueBind(String queue, String exchange, String routingKey) throws IOException;声明了Exchange和Queue,那么就还需要声明Exchange与Queue的绑定关系Binding。有了这些Binding,Exchange才可以知道Producer发送过来的消息将要分发到哪些Queue上。这些Binding涉及到消息的不同分发逻辑,与Exchange和Queue一样,如果Broker上没有建立绑定关系,那么RabbitMQ会按照客户端的声明,创建这些绑定关系。发送消息channel.basicPublish(String exchange, String routingKey, BasicProperties props,message.getBytes("UTF-8")) ;其中Exchange如果不需要,传个空字符串。props的这些配置项,可以用RabbitMQ中提供的一个Builder对象来构建。AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder(); //对应页面上的Properties部分,传入一些预定的参数值。 builder.deliveryMode(MessageProperties.PERSISTENT_TEXT_PLAIN.getDeliveryMode()); builder.priority(MessageProperties.PERSISTENT_TEXT_PLAIN.getPriority()); //builder.headers(headers);对应页面上的Headers部分。传入自定义的参数值 builder.build() AMQP.BasicProperties prop = builder.build();MessageProperties.PERSISTENT_TEXT_PLAIN是RabbitMQ提供的持久化消息的默认配置。消费消息被动消费模式Consumer等待rabbitMQ 服务器将message推送过来再消费。channel.basicConsume(String queue, boolean autoAck, Consumer callback);主动消费模式Comsumer主动到rabbitMQ服务器上去拉取messge进行消费。GetResponse response = channel.basicGet(QUEUE_NAME, boolean autoAck);消费消息确认自动ACK:autoAck为true,消息一旦被接收,消费者自动发送ACK,如果消费失败了,后续也无法再消费了手动ACK:autoAck为false,消息接收后,不会发送ACK,需要手动调用 channel.basicAck 来通知服务器已经消费了该message.这样即使Consumer在执行message过程中出问题了,也不会造成消息丢失。释放资源channel.close(); conection.clouse();消息模型简单模式最直接的方式,P端发送一个消息到一个指定的queue,中间不需要任何exchange规则。C端按queue方式进行消费。img在上图的模型中,有以下概念:P:生产者,也就是要发送消息的程序C:消费者:消息的接受者。queue:消息队列,图中红色部分。可以缓存消息;生产者向其中投递消息,消费者从其中取出消息。一个生产者、一个消费者,不需要设置交换机(使用默认的交换机)。producer:channel.queueDeclare(QUEUE_NAME,false,false,false,null); channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));consumer:channel.queueDeclare(QUEUE_NAME, false, false, false, null);Work queues 工作队列模式Work Queues:与简单模式相比,多了一个或一些消费端,多个消费端共同消费同一个队列中的消息。一个消息只会被一个消费者消费。一个生产者、多个消费者(竞争关系),不需要设置交换机(使用默认的交换机)。应用场景:对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度。channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); channel.basicPublish("", TASK_QUEUE_NAME,MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));Consumer: 每次拉取一条消息。channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); channel.basicQos(1); channel.basicConsume(TASK_QUEUE_NAME, false, consumer);Publish/Subscribe 发布订阅exchange type是 fanout 。在订阅模型中,多了一个 Exchange 角色,而且过程略有变化:P:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机)C:消费者,消息的接收者Queue:消息队列,接收消息、缓存消息Exchange:交换机(X)。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有常见以下3种类型:Fanout:广播,将消息交给所有绑定到交换机的队列,交换机需要与队列进行绑定,绑定之后;一个消息可以被多个消费者都收到。Direct:定向,把消息交给符合指定routing key 的队列Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与 Exchange 绑定,或者没有符合路由规则的队列,那么消息会丢失!producer只负责发送消息,至于消息进入哪个queue,由exchange来分配。使用场景:所有消费者获得相同的消息,例如天气预报。生产者:channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));消费者:channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, EXCHANGE_NAME, "");发布订阅模式与工作队列模式的区别:工作队列模式不用定义交换机,而发布/订阅模式需要定义交换机发布/订阅模式的生产方是面向交换机发送消息,工作队列模式的生产方是面向队列发送消息(底层使用默认交换机)发布/订阅模式需要设置队列和交换机的绑定,工作队列模式不需要设置,实际上工作队列模式会将队列绑 定到默认的交换机Rout 路由模式exchange typ 是 direct 。P:生产者,向 Exchange 发送消息,发送消息时,会指定一个routing keyX:Exchange(交换机),接收生产者的消息,然后把消息递交给与 routing key 完全匹配的队列C1:消费者,其所在队列指定了需要 routing key 为 error 的消息C2:消费者,其所在队列指定了需要 routing key 为 info、error、warning 的消息路由模式要求队列在绑定交换机时要指定 routing key,消息会转发到符合 routing key 的队列。生产者:channel.exchangeDeclare(EXCHANGE_NAME, "direct"); channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));消费者:channel.exchangeDeclare(EXCHANGE_NAME, "direct"); channel.queueBind(queueName, EXCHANGE_NAME, routingKey1); channel.queueBind(queueName, EXCHANGE_NAME, routingKey2); channel.basicConsume(queueName, true, consumer);Topics 通配符模式exchange type 是 topic红色 Queue:绑定的是 usa.# ,因此凡是以 usa. 开头的 routing key 都会被匹配到黄色 Queue:绑定的是 #.news ,因此凡是以 .news 结尾的 routing key 都会被匹配对routingKey进行了模糊匹配单词之间用,隔开,* 代表一个具体的单词。# 代表0个或多个单词Topic 主题模式可以实现 Pub/Sub 发布与订阅模式和 Routing 路由模式的功能,只是 Topic 在配置routing key 的时候可以使用通配符,显得更加灵活。Producer:channel.exchangeDeclare(EXCHANGE_NAME, "topic"); channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));Receiver:channel.exchangeDeclare(EXCHANGE_NAME, "topic"); channel.queueBind(queueName, EXCHANGE_NAME, routingKey1); channel.queueBind(queueName, EXCHANGE_NAME, routingKey2); channel.basicConsume(queueName, true, consumer);发送消息确认发送的消息如果没有被消费者及时消费有可能会导致消息丢失。发送者确认模式默认是不开启的,所以如果需要开启发送者确认模式,需要手动在channel中进行声明。channel.confirmSelect();使用异步确认消息保证消息在生产端不丢失。Producer在channel中注册监听器来对消息进行确认。核心代码:channel.addConfirmListener(ConfirmCallback var1, ConfirmCallback var2);ConfirmCallback,监听器接口,里面只有一个方法:void handle(long sequenceNumber, boolean multiple) throws IOException;这方法中的两个参数,sequenceNumer:这个是一个唯一的序列号,代表一个唯一的消息。在RabbitMQ中,他的消息体只是一个二进制数组,默认消息是没有序列号的。那么在回调的时候,Producer怎么知道是哪一条消息成功或者失败呢?RabbitMQ提供了一个方法int sequenceNumber = channel.getNextPublishSeqNo();来生成一个全局递增的序列号,这个序列号将会分配给新发送的那一条消息。然后应用程序需要自己来将这个序列号与消息对应起来。没错!是的!需要客户端自己去做对应!multiple:这个是一个Boolean型的参数。如果是false,就表示这一次只确认了当前一条消息。如果是true,就表示RabbitMQ这一次确认了一批消息,在sequenceNumber之前的所有消息都已经确认完成了。SpringBoot集成RabbitMQ添加依赖<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>配置文件server: port: 8081 spring: application: name: test-rabbitmq-producer rabbitmq: host: 127.0.0.1 port: 5672 username: guest password: guest virtualHost: /rabbitMQ配置类配置Exchange、Queue、及绑定交换机,下面配置Topic交换机。package com.example.config; import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * RabbitmqConfig */ @Configuration public class RabbitmqConfig { public static final String QUEUE_INFORM_EMAIL = "queue_inform_email"; public static final String QUEUE_INFORM_SMS = "queue_inform_sms"; public static final String EXCHANGE_TOPICS_INFORM="exchange_topics_inform"; public static final String ROUTINGKEY_EMAIL="inform.#.email.#"; public static final String ROUTINGKEY_SMS="inform.#.sms.#"; //声明交换机 @Bean(EXCHANGE_TOPICS_INFORM) public Exchange EXCHANGE_TOPICS_INFORM(){ //durable(true) 持久化,mq重启之后交换机还在 return ExchangeBuilder.topicExchange(EXCHANGE_TOPICS_INFORM).durable(true).build(); } //声明QUEUE_INFORM_EMAIL队列 @Bean(QUEUE_INFORM_EMAIL) public Queue QUEUE_INFORM_EMAIL(){ return new Queue(QUEUE_INFORM_EMAIL); } //声明QUEUE_INFORM_SMS队列 @Bean(QUEUE_INFORM_SMS) public Queue QUEUE_INFORM_SMS(){ return new Queue(QUEUE_INFORM_SMS); } //ROUTINGKEY_EMAIL队列绑定交换机,指定routingKey @Bean public Binding BINDING_QUEUE_INFORM_EMAIL(@Qualifier(QUEUE_INFORM_EMAIL) Queue queue, @Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange){ return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_EMAIL).noargs(); } //ROUTINGKEY_SMS队列绑定交换机,指定routingKey @Bean public Binding BINDING_ROUTINGKEY_SMS(@Qualifier(QUEUE_INFORM_SMS) Queue queue, @Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange){ return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_SMS).noargs(); } }发送消息String message = "hello world"; rabbitTemplate.convertAndSend(RabbitmqTopicConfig.EXCHANGE_TOPICS_INFORM, "inform.email", message);消费消息消费者都是通过@RabbitListener注解来声明。在@RabbitMQListener注解中包含了非常多对Queue进行定制的属性,大部分的属性都是有默认值的。//监听email队列 @RabbitListener(queues = {RabbitmqTopicConfig.QUEUE_INFORM_EMAIL}) public void receive_email(Object msg, Message message, Channel channel){ System.out.println("QUEUE_INFORM_EMAIL msg"+msg); } //监听sms队列 @RabbitListener(queues = {RabbitmqTopicConfig.QUEUE_INFORM_SMS}) public void receive_sms(Object msg, Message message, Channel channel){ System.out.println("QUEUE_INFORM_SMS msg"+msg); }
2025年04月03日
3 阅读
0 评论
0 点赞
2025-04-03
订阅模式 代码
一 . 简单整合添加RabbitMQ依赖<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>配置application.ymlspring: rabbitmq: host: 127.0.0.1 port: 5672 username: test password: 123456virtual-host: /新建RabbitMQConfigpackage com.example.config; import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitMQConfig { // 测试队列名称 private String testQueueName = test_queue; // 测试交换机名称 private String testExchangeName = test_exchange; // RoutingKey private String testRoutingKey = test_routing_key; /** 创建队列 */ @Bean public Queue testQueue() { return new Queue(testQueueName); } /** 创建交换机 */ @Bean public TopicExchange testExchange() { return new TopicExchange(testExchangeName); } /** 通过routingKey把队列与交换机绑定起来 */ @Bean public Binding testBinding() { return BindingBuilder.bind(testQueue()).to(testExchange()).with(testRoutingKey); } }新建生产者(producer)package com.example.producer; import com.alibaba.fastjson.JSONObject; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageBuilder; import org.springframework.amqp.core.MessageDeliveryMode; import org.springframework.amqp.core.MessageProperties; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Component public class TestProducer { @Autowired private RabbitTemplate rabbitTemplate; public void send(String queueName) { JSONObject jsonObject = new JSONObject(); jsonObject.put(email, 756840349@qq.com); jsonObject.put(timestamp, System.currentTimeMillis()); String jsonString = jsonObject.toJSONString(); // 生产者发送消息的时候需要设置消息id Message message = MessageBuilder.withBody(jsonString.getBytes()) .setDeliveryMode(MessageDeliveryMode.PERSISTENT) .setContentType(MessageProperties.CONTENT_TYPE_JSON).setContentEncoding(utf-8) .build(); rabbitTemplate.convertAndSend(queueName, message); }}新建消费者(consumer)package com.example.listener; import com.alibaba.fastjson.JSONObject; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component public class FanoutSmsConsumer { @RabbitListener(queues = test_queue) public void consumeMessage(Message message) throws Exception{ String msg = new String(message.getBody(), UTF-8); JSONObject jsonObject = JSONObject.parseObject(msg); System.out.println(消费消息: + jsonObject); } }编写controller测试package com.example.controller; import com.example.producer.FanoutProducer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; @RestController public class ProducerController { @Autowired private TestProducer TestProducer; @RequestMapping(/sendMsg) public String sendFanout() { fanoutProducer.send(test_queue); return success; } }浏览器访问localhost:8080/sendMsg,会返回success,并且控制台监听器会打印消息:
2025年04月03日
0 阅读
0 评论
0 点赞
1
2
3
4