初识Redis

Redis

一、Redis为什么快

  1. (内存操作)因为Redis将所有的数据存储在内存中,可以快速读取和写入数据,而不需要像传统数据需要从磁盘读取数据。

  2. (数据结构简单)Redis支持多种数据结构,包括字符串,哈希表、列表,集合和有序集合等。

  3. (单线程模型)Redis使采用了单线程的模型,避免了多线程并发带来的竞争和线程切换的开销,提高性能。
  4. (非阻塞I/O)Redis使用非阻塞I/O模型,可以等待I/O操作完成时处理其他请求,提高了并发的处理能力。
  5. (持久化机制)Redis支持两种持久化的操作,RDB和AOF,可以在服务器崩溃时快速的恢复数据。
  6. (高效的网络通信)Redis使用自己的协议进行网络通信,协议简单,高效,可以快速的传输数据。
  7. (原子操作)Redis支持原子操作,可以确保多个操作的原子性,避免了并大的数据的不一致性的问题。

Redis之所以快是因为它采用了内存存储和非阻塞的I/O模型,避免了磁盘IO的延迟;同时,Redis使用了IO多路复用技术,通过一个线程同时处理多个客户端请求,减少了线程切换的开销,提高了并发处理能力。

二、Redis可以用来做什么

  1. 缓存:Redis可以用作高性能的缓存存储,将热门数据存储在内存中,以提供快速的访问速度,减轻数据库的负载。

  2. 数据库:Redis支持多种数据结构,包括字符串、哈希表、列表、集合和有序集合等,可以用作简单的键值存储或者更复杂的数据结构存储。

  3. 消息队列:Redis的发布/订阅功能可以用于构建消息队列系统,实现异步消息传递和解耦。

  4. 分布式锁:Redis提供了原子操作和事务支持,可以用来实现分布式锁,保证多个客户端之间的互斥访问。

  5. 计数器和排行榜:Redis的原子操作可以用于实现计数器和排行榜功能,例如统计网站的访问量或者排名。

  6. 地理位置服务:Redis的地理位置功能可以用来存储地理位置信息,并进行附近位置的搜索和查询。

  7. 实时数据分析:Redis支持高速的数据读写,可以用于实时数据分析和处理,例如实时统计和监控。

三、Redis的持久化操作

在Redis 4.0之后提供了混合持久化的方式,顾名思义就是把RDB持久化和AOF持久化结合起来的一种方式。混合持久化就是快照以一定的频率执行,而在两次快照之间,使用 AOF 日志记录这期间的所有命令操作。

RDB持久化

RDB又被称为快照模式,也是默认使用的策略。RDB通过把内存里的数据保存到磁盘文件上;当需要数据的时候,直接读取文件把数据回复到内存里来进行持久化的

RDB在什么时候进行持久化:

  • 使用save命令时进行持久化,这是一种阻塞式的持久化,在RDB过程中不能去处理客户端发送来的请求。
  • 使用bgsave命令进行持久化,这是一种非阻塞式的持久化,它会fork一个子进程,让子进程去进行持久化,主进程去处理客户端发送的请求,在子进程fork期间,会把物理内存里的数据设置为read-only只读。主进程如果要进行写数据则进行copy-on-write机制,把数据拷贝一个副本,实际操作的是副本。
  • 正常关闭Redis时会进持久化;会在关闭之前先持久化保存
  • 自动触发RDB持久化save 3600 1、save 300 100、save 60 10000;数据变化的越多,保存频率就越高

RDB持久化还可以对文件进行压缩,所以文件体积更小。

AOF持久化

AOF被称为日志模式,需要手动开起,AOF在执行写操作的时侯会把操作命令追加到磁盘文件上,当需要恢复数据时,读取文件里的命令会依次执行,相当于重构数据库。

AOF什么时候进行持久化:

三种刷盘机制:

always:同步刷盘;每次进行写操作的命令的时候,都会把数据写道磁盘文件,这种是最安全的但是影响性能。

no:不主动刷盘;每次进行写操作的命令的时候,会放到AOF缓冲区;由操作系统决定什么时候进行刷盘,这种是性能最好但是安全性较低,可能丢失数据。

everySec:**每秒刷盘**;每次进行写操作的命令的时候,会放到AOF缓冲区;每秒执行一次刷盘操作。性能适中,也是默认的策略。

AOF文件体积大:AOF重写
手动重写:执行命令bgrewriteaof
自动重写:默认当文件体积达到64mb,并且体积增加100%,会自动重写

RDB和AOF的对比:

  • 存储内容:RDB存储的是数据;AOF存储的是写操作的命令
  • 文件体积:RDB文件体积小;AOF文件体积大
  • 恢复速度:RDB恢复速度快;AOF恢复速度慢
  • 持久保存速度:一次RDB,比一次AOF 占用资源更多
  • 数据安全:RDB可能丢失数据;AOF丢失数据的可能性更小
  • 最终的选择:如果对数据安全性要求不高,使用RDB;否则使用AOFLLLL

四、Redis常用的数据类型

数据类型特征使用场景常用命令
String存储单个值缓存、计数器、键值存储Set key value、Get key
Hash字段-值对的无序散列存储对象、缓存、计数器Hash key filed value、HGETALL key
List有序、可重复的字符串集合消息队列、发布/订阅系统LPUSH key value、LRANG key start stop
Set无序、不可重复标签系统、好友关系等SADD key member、SMEMBERS key
Sorted Set有序的字符串集合,每个成员关联一个分数排行榜、按照分数范围获取成员ZADD key score member、ZRANGE key start stop 、ZREVRANGE key start stop
  • String:存储单个值,适用于键值的存储,常用的命令:Set 用于设置值,Get用于获取值。
  • Hash: 字段-值对的无序散列,适用于存储对象、缓存和计数器,常用命令:HSET 用于设值字段的值,HGETALL 用于获取散列所有的字段和值。
  • List:有序、可重复的字符串集合,适用于消息队列和发布/订阅系统,常用命令:LPUSH用于从列表左侧添加元素,LRANGE用于获取指定范围的元素。
  • Set: 无序、不可重复的字符串集合,适用于标签系统和好友关系等,常用命令:SADD用于向集合添加成员,SMEMBERS用于获取集合所有成员。
  • Sorted Set:有序的字符串集合,每个成员关联一个分数,适用于排行榜和按分数范围获取成员,常用命令:ZADD用于添加成员及其分数,ZRANGE用于获取指定范围的成员。

五、Redis的缓存机制(俗称:三兄弟)

缓存穿透:缓存穿透是指请求缓存中不存在的数据,导致所有请求都直接访问数据库,同样给数据库带来巨大的压力。

解决方案包括:

  • 对于查询结果为空的数据,也将其缓存起来,设置一个较短的过期时间,避免频繁查询数据库。
  • 使用布隆过滤器等技术,对请求进行过滤,过滤掉不存在的数据。

缓存击穿:缓存击穿是指某个热点数据失效后,大量并发请求同时访问数据库,导致数据库压力过大。

解决方案包括:

  • 使用互斥锁或分布式锁,只允许一个请求访问数据库,其他请求等待结果。
  • 使用缓存预热技术,在缓存失效之前主动加载数据到缓存中,避免缓存击穿。

缓存雪崩: 缓存雪崩是指缓存中的大量数据同时失效或者缓存服务器宕机,导致所有请求都直接访问数据库,给数据库带来巨大的压力,甚至导致数据库崩溃。

解决方案包括:

  • 设置合理的缓存过期时间,避免大量缓存同时失效。
  • 使用多级缓存架构,如本地缓存和分布式缓存结合使用,提高缓存的可靠性。
  • 实现热点数据的热加载,提前将热点数据加载到缓存中。

六、Redis的哨兵集群(Redis Sentinel)

Redis的哨兵(Sentinel)机制是一种用于监控和管理Redis实例的高可用性解决方案。它由一组独立运行的Redis哨兵节点组成,用于监控主节点和从节点的状态,并在主节点故障或其他变化时进行自动切换和故障恢复。

哨兵模式:
解决的问题:主从集群不能自动进行故障恢复,一旦Master节点宕机,就只能人工干预
哨兵的职责:
监控: 监控主从集群里所有节点的状态,通过心跳机制进行监控
哨兵会默认每秒向所有的节点发心跳,如果一个哨兵向某个节点的心跳,就会认为节点是“主观下线”
如果超过指定数量的哨兵,都认为一个节点是“主观下线”,就认为这个节点是“客观下线”
故障恢复:重新选举一个节点成为Master,承担写操作的职责
如果某个Slave与原Master断开连接的时间超过指定时间,就直接剔除选举的资格
哪个Slave的priority值越小,优先级越高。如果值是0,说明不参与选举
哪个Slave的offset值越大,优先级越高。
哪个Slave的replid越小,优先级越高

使用哨兵机制可以提高Redis的高可用性和可靠性,确保Redis集群在节点故障或其他变化时能够自动进行故障转移和恢复。客户端可以通过连接到哨兵节点,获取正确的Redis节点信息,并保持与Redis集群的稳定连接。

需要注意的是,哨兵机制并不是分布式存储方案,它只实现了高可用性和故障转移,不提供数据分片的功能。如果需要实现数据分片和水平扩展,可以使用Redis集群(Redis Cluster)来实现。

Redis的哨兵集群主要用于实现高可用性,监控Redis主、从节点的状态变化,并在主节点失效时自动将从节点升级为主节点。哨兵集群由多个哨兵节点组成,工作原理是哨兵节点通过相互通信,监测主节点的健康状态,当主节点失效时,选举新的主节点,并通知其他从节点进行切换,确保系统的可用性。

七、Redis的分片集群(Redis Cluster)

Redis分片集群是一种将数据分布到多个节点的解决方案,用于实现数据的水平扩展和提高系统的吞吐量。

分片集群:主从+哨兵,还存在问题:

高并发的问题。主从集群里只有一个 Master能够提供写的服务,写操作的并发是有限的,海量的数据存储的问题,主从集群所有的节点的数据是相同的,如果数据量过多就得使用分片集群来解决这个问题

分片集群的特点:

可以有多个Master,每个Master存储的数据不同,增加了数据存储量。每个Master可以有多个Slave。

Master之间互相心跳来实现健康监测,进行自动故障恢复。连接任意一个Master时,如果要存储的数据不在这个节点上,会帮我们重定向到正确的节点上再操作。

数据存储的方式:散列插槽(hash插槽)

哈希槽是一个固定数量的槽,总共槽位数量16384个,每个数据被映射到其中一个槽上。每个节点负责管理一部分槽和相应的数据。

当需要存取数据时:会根据key计算Hash值,如果key中包含{},就根据{}里内容进行CRC16算法的计算,得到hash值。

如果key里没有{},就根据整个key的内容进行CRC16算法的计算,得到hash值。判断一下归哪个Master管理,就重定向到哪个Master上,在对应的节点上执行操作

集群的伸缩:

因为所有数据都和节点不直接绑定,而是与hash插槽绑定的。所以如果增加新节点:只要把一部分hash插槽转换到新节点上即可,如果想要减少节点:先把节点上的hash插槽转移出来,再把节点从集群里剔除即可。

故障恢复:

自动故障恢复:某个Master节点宕机,整个集群会自动从它的Slave里挑选一个节点成为Master

手动故障恢复:执行命令 cluster failover,就可以把这个节点设置为Master节点

读写数据时是如何定位哈希槽?

在Redis分片集群中,确切的说法是使用CRC16算法计算Key的哈希值,而不是定位哈希槽。

CRC16算法(循环冗余校验算法)是一种广泛应用于网络通信和数据校验领域的哈希算法,用于计算Key的哈希值。

举例说明:

假设有一个包含三个节点的Redis分片集群,节点A、节点B和节点C。每个节点负责存储哈希槽的范围如下:

- 节点A:0-5499

- 节点B:5500-10999

- 节点C:11000-16383

现在有一个Key为”example_key”需要进行操作。

  1. 使用CRC16算法计算”example_key”的哈希值,例如为12345。

  2. 将哈希值对16384(2^14)求余,得到计算结果为12345 % 16384 = 12345。

  3. 根据计算结果12345,确定该数据应该存储在节点C的哈希槽11000-16383范围内。

  4. 客户端将数据写入节点C,或者从节点C读取相关数据。

通过CRC16算法计算Key的哈希值,可以得到一个范围在0到16383的整数值,然后根据这个哈希值确定数据所对应的哈希槽范围,进而确定数据的存储位置。

总结:Redis的分片集群主要用于实现数据的横向扩展,将数据分散存储在多个节点上,提高系统的并发能力和存储能力。分片集群由多个节点组成,根据Key经过哈希算法映射到不同的节点上,每个节点负责存储和处理一部分数据,工作原理是通过一致性哈希算法将数据按照一定规则分配到不同的节点上,实现数据的均衡存储和查询。

八、Redis分布式锁命令使用

可以使用分布式锁来实现多个客户端之间的协调和同步,以避免并发操作引起的数据冲突。下面是使用Redis实现分布式锁的一般步骤:

1.获取锁:

  • 使用SET命令尝试在指定的键上设置一个唯一的值作为锁。
  • 设置键的超时时间,以防止获取锁后出现异常导致锁一直被占用(避免死锁)。

SET命令常用的选项是:

  • NX:仅在键不存在时设置键的值,用于确保只有一个客户端能够成功设置锁。
  • PX:设置键的过期时间,用于防止锁被长时间占用。

注意:SETNX命令也可以获取锁,但还得执行其他命令设置过期时间,不如SET一条命令搞定:
例如,SET lock_key value PX 10000 NX 会将名为lock_key的锁设置为在10秒后过期。

2.释放锁:

  • 使用DEL命令删除键来释放锁。
  • 确保只有锁的持有者能够释放锁,可通过比较锁的值来进行验证。

示例演示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class RedisLock {

private static Long stock = 1L;

public static void placeOrder() throws InterruptedException {
if (stock > 0){
Thread.sleep(100);
stock--;
System.out.println(Thread.currentThread().getName() + "秒杀成功");
}else {
System.out.println(Thread.currentThread().getName() + "秒杀失败!存不足");
}
}

public static void main(String[] args) {
for (int i = 0; i < 3; i++) {
new Thread(()->{
try {
placeOrder();
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
}
}

结果会导致:请看下图( ̄︶ ̄)↗ 

错了!为什么会这样呢?你猜!

  1. 线程不安全:多个线程同时访问placeOrder()方法时,可能会出现竞态条件,导致库存数量不正确。例如,多个线程同时判断stock > 0为true,然后都进入if语句块,最终导致库存减少了超过1个。

  2. 缺乏同步机制:没有使用任何同步机制来保证多个线程之间的互斥访问。因此,可能会出现多个线程同时修改库存的情况。

  3. 缺乏线程安全的库存操作:在stock--操作中,没有使用任何同步机制来保证原子性。多个线程可能同时执行stock--操作,导致库存减少超过1个。

为了解决这些问题,李华又进行了改造,请看下回讲解:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class RedisLock {

private static Long stock = 1L;

public static void placeOrder() throws InterruptedException {
//加入同步锁
synchronized (stock){
if (stock > 0){
Thread.sleep(100);
stock--;
System.out.println(Thread.currentThread().getName() + "秒杀成功");
}else {
System.out.println(Thread.currentThread().getName() + "秒杀失败!存不足");
}
}
}

public static void main(String[] args) {
for (int i = 0; i < 3; i++) {
new Thread(()->{
try {
placeOrder();
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
}
}

结果会导致:请看下图( ̄︶ ̄)↗ 

哦~成功了,加了同步锁,可以保证安全了,但是随着用户量的提升,服务器的压力越来越大,你会怎么样怎么解决呢?

对!!!正是你想的那样可以通过Nginx的负载均衡将服务器进行水平扩展通过Nginx进行分布式集群部署,吞吐量确实上去了。

你以为解决了?错了,你会发现尽然又出现超卖了。

为什么会这样呢?其实是同步锁的问题,因为同步锁是JVM级别的,它只能锁住单个线程,但是经过分布式服务后,只能锁住一个线程。

可以同过分布式锁进行结局:常见的分布式所有Redis和zookeeper,因为我之前署使用了Redis,可以采用Redis实现分布式锁。

通过Redis的setNX实现分布式锁:当A线程进来通过setNX的往某个键存储值,在这个键没有值的时候会返回一个true,当B线程进来的时候,会发现这个键已经存在值了,会返回一个false,通过这个特性就可以实现分布式锁,在此需要注意:

通过setNX进行上锁的时候需要设置一个过期时间,如果不设置会怎么样呢?

不设置用户在请求的时候这台服务器挂掉了,其他的服务器在发起请求时会发现一个阻塞的情况,因为其他服务器通过setNX进行上锁的时候会发现这键会一直存在值。这样就会造成死锁

虽然这个问题解决了,但是其中还隐藏一些隐患,就是当处理这个业务的时间超过了这个锁的过期的时间,当这个锁失效以后,其他的线程会”趁虚而入”,当线程A处理完之后释放的可能是线程B的锁,那么其他的线程会在线程B的锁失效候继续”趁虚而入”,以此类推,又会出现超卖。

上述情况中存在的两个问题:

  1. 当锁过期时线程还在处理业务
  2. 当处理完业务后释放的是其他线程的锁。

解决方案:

  1. 加长时间,并添加子线程每十秒确认线程是否在线,在线则将过期时间进行重置。
  2. 给锁加唯一的ID(UUID)

此时Redisson出现了

这里会有一个面试题:如果使用了主从的集群模式,如果你主节点挂掉了会发生什么,怎么解决?

因为Redis是一个AP模式,只保证了高可用和高性能,并不能保证高一致性,当设置锁的时候,它只会往一个节点设置一个锁,设置完后立马返回并告诉你设置成功,内部会进行一个同步,此时如果你的主节点挂了,从节点没有进行同步,没办法获得该锁,依然会出现超卖的现象。

可以通过Redisson提供的RedLock来解决,RedLock会同步Redis当中所有的主从节点,它在保证所有的节点都存储完毕才会给你进行相应,这样就保证了强一致性。

九、 Redis(或ElasticSearch)和MySQL如何保持数据一致性

Redis(或ElasticSearch)和MySQL如何保持数据一致性,以及它们的具体实现和优缺点如下:

  1. 同步双写方案:
  • 实现方式:在应用程序中,将数据同时写入Redis(或ElasticSearch)和MySQL,确保两个写操作要么同时成功,要么同时失败。
  • 优点:简单直接,实时性较高,不依赖额外组件。
  • 缺点:增加了系统复杂性,需要处理并发写冲突,可能导致写入延迟增加。
  1. 异步队列方案
  • 实现方式:将数据更新操作以消息的方式发送到消息队列中,并在消费者程序中分别更新Redis(或ElasticSearch)和MySQL。
  • 优点:实现了数据解耦和异步处理,提高了系统的可伸缩性和稳定性。
  • 缺点:增加了系统复杂性,引入了消息队列系统,可能会有一定的延迟,需要确保消息消费的可靠性。
  1. Canal方案:
  • 实现方式:使用Canal工具监听MySQL的binlog,将数据变更事件发送到Redis(或ElasticSearch)来更新数据。
  • 优点:实时性较高,不需要修改应用程序代码,可以实现MySQL的逻辑解耦和数据的多重同步。
  • 缺点:增加了系统复杂性,需要额外的Canal工具和客户端程序来处理数据同步,可能额外占用资源。

双写模式简单直接,适用于数据实时性要求较高的场景。异步队列适用于要求解耦和异步处理的场景,但可能增加了系统复杂性和引入延迟。Canal方案适用于希望实现MySQL的逻辑解耦和多重数据同步的场景,但需要额外维护Canal工具和客户端程序。

Elasticsearch和MySQL的数据一致性可以通过以下三种方案实现:

  1. 双写:每次写入操作同时将数据写入Elasticsearch和MySQL,确保数据一致性,但可能增加写延迟和复杂性。

  2. 异步队列:将写入操作请求放入队列中,后台任务异步地将数据写入Elasticsearch和MySQL,提高写入性能,但可能导致一定的数据不一致性。

  3. Canal方案:使用Canal工具订阅MySQL的binlog日志,实时将数据同步到Elasticsearch,实现数据的实时增量同步,但需要额外的工具和配置。

十、Redisson是什么,怎么用

Redisson是一个用于Java的简单易用的Redis客户端,它封装了常见的分布式操作和并发控制的功能,提供了丰富的API和功能,使得开发人员可以轻松地与Redis进行交互。

在Spring Boot中使用Redisson,需要进行以下步骤:

  1. 添加Redisson的依赖:在项目的构建文件(如pom.xml)中添加Redisson的依赖。可以通过Maven引入相应的依赖。
  1. 配置Redisson连接信息:在Spring Boot的配置文件(如application.properties或application.yml)中配置Redisson的连接信息,包括Redis的主机地址、端口号、密码等。
  1. 创建RedissonClient对象:可以通过自动注入或手动创建RedissonClient对象。如果使用自动注入,可以在配置类中添加@Bean注解将RedissonClient注入为Spring的Bean;如果手动创建,可以在需要的地方创建RedissonClient对象。
  1. 使用Redisson功能:通过RedissonClient对象,可以使用Redisson提供的各种功能,如分布式锁、分布式集合、分布式队列等。根据具体需求,调用相应的方法来操作Redis。

例如,使用Redisson的分布式锁,可以注入RedissonClient对象后调用getLock方法来获取锁对象,然后使用lock方法加锁,并在需要时执行相应的操作,最后使用unlock方法释放锁。

总结:Redisson是一个Java的Redis客户端,提供丰富的API和功能,用于封装分布式操作和并发控制。在Spring Boot中使用Redisson,首先添加Redisson的依赖,然后在配置文件中配置Redisson连接信息,接着通过@Autowired注解或手动创建RedissonClient对象。最后,利用RedissonClient对象可以使用各种功能,如分布式锁、分布式集合等,与Redis进行交互。

十一、Redisson看门狗机制的原理

  • Redisson的看门狗机制是为了解决分布式环境下使用分布式锁时的问题。它通过周期性地对锁进行心跳续期,保证在业务执行期间锁不会被自动释放,防止因执行时间过长或节点宕机而导致锁提前释放的情况发生。
  • 实现原理是在获取锁时,Redisson会使用一个独立的线程启动一个定时任务,定时更新锁的过期时间。同时,每个Redisson实例都会在Redis中生成一个唯一ID作为锁的标识,以避免其他实例错误地释放锁。
  • 看门狗机制的目的是防止锁过期时间超时,而业务执行仍在进行中的情况发生。通过定时地续期锁的过期时间,确保锁在业务执行期间一直有效。这样可以避免获得锁的实例因为处理时间过长而导致锁过期被其他实例获取,保证分布式环境下的锁的可靠性。
  • 续期与看门狗机制相关,通过定时续期,锁的过期时间会在业务执行期间不断更新,从而避免锁过期。一旦锁的续期失败,即续期任务运行失败或锁的标识不匹配,Redisson会立即释放该锁,以避免业务不再持有锁而导致的问题。
  • 总结来说,Redisson的看门狗机制通过定时续期锁的过期时间,保证在业务执行期间锁不会被自动释放,解决了在分布式环境下使用分布式锁时锁过期的问题,提高了锁的可靠性和使用效果。

总结:Redisson的看门狗机制通过定时续期锁的过期时间,保证在业务执行期间锁不会被自动释放。它解决了分布式环境下锁过期导致的资源竞争问题,确保业务能够完成。续期是看门狗机制的核心,它通过定时更新锁的过期时间来实现锁的持久性,以防止锁过期并被其他实例获得。

注:Java如何使用Redis

第一步、引入Jedis依赖

1
2
3
4
5
6
7
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>3.2.0</version>
<type>jar</type>
<scope>compile</scope>
</dependency>

示例:写一个测试类

1
2
3
4
5
6
7
8
9
public class RedisDemo {
public static void main(String[] args) {
JedisPool jedisPool = new JedisPool("192.168.200.128",6379);//创建连接池
Jedis jedis = jedisPool.getResource();//从连接池获取
String ping = jedis.ping();//测试链接
System.out.println(ping);//如果成功连接上了Redis服务,此处将会输出PONG
jedis.close();//释放资源,进行归还线程
}
}

示例二:

1
2
3
4
5
6
7
public static void main(String[] args) {
JedisPool jedisPool = new JedisPool();
Jedis jedis = jedisPool.getResource();
jedis.set("token", UUID.randomUUID().toString());
System.out.println("token" + jedis.get("token"));
jedis.close();
}

假设当你在与Redis交互的时候,发生了异常,不论是你自身业务的异常,或者Redis服务器宕机了。这时候程序抛出异常,无法执行到归还连接的代码。久而久之,连接池中的连接数都被占用了,没有归还。当还有新的请求进来,而又没有连接可以使用了,这时候程序就会阻塞,直至卡死。

为了避免这种情况的发生,最直接的想法那便是加上try-catch-final,将jedis.close()释放资源的方法放到final的语句块内,这样子便可以在发生异常的情况下保证Jedis连接的归还。

下面演示了Java8的try-with-resource语法,它与try-catch-final语法相比较更简洁,但本质上其实是语法糖,实际在解析成字节码过程中,它依然会被还原成try-catch-final的语法

1
2
3
4
5
6
7
public static void main(String[] args) {
JedisPool jedisPool = new JedisPool();
try (Jedis jedis = jedisPool.getResource()){
//Redis相关业务...
jedis.set("name","Johnny");
}
}

虽然这样使用Redis,避免了连接数耗光的风险,但是在所有需要使用Redis的地方都要加上这样的语法限制,显然这存在着大量的重复,同时你需要依靠人为的规范限制来保护程序并不太靠谱。考虑当你的开发团队进来一个新人,可能就会在某个地方忘记使用try-with-resource,而这样就存在问题的隐患。

我们需要从程序上就形成使用规范上的限制。

规范的使用姿势

我们需要一个接口和自己封装的连接池使用方法如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
//定义一个接口
public interface CallJedis {
void call(Jedis jedis);
}

//自己封装一个可以安全使用的连接池
public class MyRedisPool {
private JedisPool jedisPool;
//实例化连接池
public MyRedisPool() {
this.jedisPool = new JedisPool();
}
//获取Redis连接资源,并确保在使用后归还
public void execute(CallJedis caller){
try(Jedis jedis = jedisPool.getResource()){
caller.call(jedis);
}
}
}

//这样子我们就可以安全的使用Redis啦
public static void main(String[] args) {
//实例化连接池
MyRedisPool myRedisPool = new MyRedisPool();
//获取Redis连接资源,并确保在使用后归还
myRedisPool.execute(new CallJedis() {
@Override
public void call(Jedis jedis) {
//执行Redis相关业务...
jedis.set("name","Johnny");
System.out.println(jedis.get("name"));
}
});
}
这样子,我们通过自己封装的Jedis连接池来获取并归还连接,避免了自己获取连接然后忘记归还的情况。但是每次使用都需要提供一个回调类来执行Redis代码,略显麻烦。这同样可以使用Java8提供的新特性Lambda表达式来简化代码,如下

public static void main(String[] args) {
MyRedisPool myRedisPool = new MyRedisPool();
myRedisPool.execute(jedis -> {
//Redis相关业务...
jedis.set("name","Johnny");
Systrem.out.println(jedis.get("name"));
});
}

操作String数据类型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
public static void main(String[] args) {
//连接本地的 Redis 服务
Jedis jedis = new Jedis("127.0.0.1", 6379);
String response = jedis.ping();
System.out.println(response); // PONG

//删除当前选择数据库中的所有key
System.out.println("删除当前选择数据库中的所有key:" + jedis.flushDB());

//设置 redis 字符串数据
//新增<'name','yixin'>的键值对
jedis.set("name","huhu");
// 获取存储的数据并输出
System.out.println("redis 存储的字符串为: "+ jedis.get("name"));

//判断某个键是否存在
System.out.println("redis 存储的字符串是否存在:" + jedis.exists("name"));

//系统中所有的键
Set<String> keys = jedis.keys("*");
System.out.println(keys);

//按索引查询
System.out.println("按索引查询:"+ jedis.select(0));

//查看键name所存储的值的类型
System.out.println(jedis.type("name"));

// 随机返回key空间的一个
System.out.println(jedis.randomKey());

//重命名key
System.out.println(jedis.rename("name","username"));
System.out.println("取出改后的name:" + jedis.get("username"));

//删除键username
System.out.println("删除username:" + jedis.del("username"));

//删除当前选择数据库中的所有key
System.out.println("删除当前选择数据库中的所有key:" + jedis.flushDB());

//查看当前库中所有的key的数目
System.out.println("返回当前数据库中key的数目:"+jedis.dbSize());

//删除数据库中的所有key
//System.out.println("删除数据库中的所有key:" + jedis.flushAll());
}

操作List数据类型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public static void main(String[] args) {
//连接本地的 Redis 服务
Jedis jedis = new Jedis("127.0.0.1", 6379);
String response = jedis.ping();
System.out.println(response); // PONG

System.out.println("删除当前选择数据库中的所有key:"+jedis.flushDB());

//List实例
//存储数据到列表中
jedis.lpush("list", "num1");
jedis.lpush("list", "num2");
jedis.lpush("list", "num3");

// 获取存储的数据并输出
List<String> list = jedis.lrange("list", 0 ,-1);
for(int i=0; i<list.size(); i++) {
System.out.println("列表项为: "+list.get(i));
}
}

操作事务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public static void main(String[] args) {
//连接本地的 Redis 服务
Jedis jedis = new Jedis("127.0.0.1", 6379);
String response = jedis.ping();
System.out.println(response); // PONG

//事务测试
jedis.flushDB();
JSONObject jsonObject = new JSONObject();
jsonObject.put("hello","world");
jsonObject.put("name","yixin");

//开启事务
Transaction multi = jedis.multi();
String result = jsonObject.toJSONString();
// jedis.watch(result)
try {
multi.set("user1", result);
multi.set("user2", result);
int i = 1 / 0; // 代码抛出异常事务,执行失败!
multi.exec(); // 执行事务!

}catch (Exception e){
multi.discard();// 放弃事务
e.printStackTrace();
}finally {
System.out.println(jedis.get("user1"));
System.out.println(jedis.get("user2"));
jedis.close();
}
}

二、SpringBoot集成Redis

介绍

这次我们并不使用jedis来进行连接,而是使用lettuce来进行连接,jedis和lettuce的对比如下:

jedis:采用的直连,多个线程操作的话,是不安全的;想要避免不安全,使用jedis pool连接池。更像BIO模式

lettuce:采用netty,实例可以在多个线程中共享,不存在线程不安全的情况;可以减少线程数量。更像NIO模式

2.1 创建SpringBoot 项目

2.1.1导入依赖

1
2
3
4
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

2.2 编写配置文件

application.properties:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
#配置redis
# Redis服务器地址
spring.redis.host=127.0.0.1
# Redis服务器连接端口
spring.redis.port=6379
# Redis数据库索引(默认为0)
spring.redis.database=0
# Redis服务器连接密码(默认为空)
spring.redis.password=
# 连接池最大连接数(使用负值表示没有限制) 默认 8
spring.redis.lettuce.pool.max-active=8
# 连接池最大阻塞等待时间(使用负值表示没有限制) 默认 -1
spring.redis.lettuce.pool.max-wait=-1
# 连接池中的最大空闲连接 默认 8
spring.redis.lettuce.pool.max-idle=8
# 连接池中的最小空闲连接 默认 0
spring.redis.lettuce.pool.min-idle=0

application.yml:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
spring:
redis:
# redis数据库索引(默认为0),我们使用索引为3的数据库,避免和其他数据库冲突
database: 0
# redis服务器地址(默认为loaclhost)
host: loaclhost
# redis端口(默认为6379)
port: 6379
# redis访问密码(默认为空)
# password:
# redis连接超时时间(单位毫秒)
timeout: 0
# redis连接池配置
pool:
# 最大可用连接数(默认为8,负数表示无限)
max-active: 8
# 最大空闲连接数(默认为8,负数表示无限)
max-idle: 8
# 最小空闲连接数(默认为0,该值只有为正数才有用)
min-idle: 0
# 从连接池中获取连接最大等待时间(默认为-1,单位为毫秒,负数表示无限)
max-wait: -1

3.1编写测试

1
2
3
4
5
6
7
8
@Autowired
private RedisTemplate redisTemplate;

@Test
void contextLoads() {
redisTemplate.opsForValue().set("name","huhu");
System.out.println(redisTemplate.opsForValue().get("name"));
}

在这种连接方式中,redisTemplate操作着不同的数据类型,api和我们的指令是一样的。

opsForValue:操作字符串 类似String

opsForList:操作List 类似List

opsForSet:操作Set,类似Set

opsForHash:操作Hash

opsForZSet:操作ZSet

opsForGeo:操作Geospatial

opsForHyperLogLog:操作HyperLogLog

除了基本的操作,我们常用的方法都可以直接通过redisTemplate操作,比如事务,和基本的CRUD。

3.2 保存对象

(1)编写实体类

注意:要实现序列号Serializable

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
package com.itheima.pojo;

import java.io.Serializable;

public class User implements Serializable {

private String name;
private int age;

public User(){

}
public User(String name, int age) {
this.name = name;
this.age = age;
}

public String getName() {
return name;
}

public void setName(String name) {
this.name = name;
}

public int getAge() {
return age;
}

public void setAge(int age) {
this.age = age;
}

@Override
public String toString() {
return "User{" +
"name='" + name + '\'' +
", age=" + age +
'}';
}

}

(2)编写RedsTemplate配置

Tip:在开发当中,我们可以直接把这个模板拿去使用。**

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
@Configuration
public class RedisConfig {

@Bean
@SuppressWarnings("all")
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {

//为了自己开发方便,一般直接使用 <String, Object>
RedisTemplate<String, Object> template = new RedisTemplate<String, Object>();
template.setConnectionFactory(factory);

// Json序列化配置
Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
ObjectMapper om = new ObjectMapper();
om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
jackson2JsonRedisSerializer.setObjectMapper(om);

// String 的序列化
StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();

// key采用String的序列化方式
template.setKeySerializer(stringRedisSerializer);
// hash的key也采用String的序列化方式
template.setHashKeySerializer(stringRedisSerializer);
// value序列化方式采用jackson
template.setValueSerializer(jackson2JsonRedisSerializer);
// hash的value序列化方式采用jackson
template.setHashValueSerializer(jackson2JsonRedisSerializer);
template.afterPropertiesSet();

return template;

}

}

存储对象

1
2
3
4
5
6
7
@Test
public void contextLoads() {
User user=new User("huhu",18);
redisTemplate.opsForValue().set("user",user);
System.out.println(redisTemplate.opsForValue().get("user"));

}

解决以下两个问题:

  1. 如何增加Redis重试机制,有时候当网络波动时,并不是每条Redis指令都能确保执行成功。当遇到执行失败的Redis命令,如何让他再执行一次呢?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
在Java中可以通过使用Redisson库来增加Redis重试机制。Redisson是一个基于Redis的Java驻留内存数据网格(In-Memory Data Grid)和分布式锁服务。它提供了一种简单的方式来处理Redis命令的重试。

下面是一个使用Redisson实现Redis重试机制的示例代码:

import org.redisson.Redisson;
import org.redisson.api.RFuture;
import org.redisson.api.RMap;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;

public class RedisRetryExample {
public static void main(String[] args) {
// 创建Redisson客户端
Config config = new Config();
config.useSingleServer().setAddress("redis://127.0.0.1:6379");
RedissonClient redisson = Redisson.create(config);

// 获取Redis Map对象
RMap<String, String> map = redisson.getMap("myMap");

// 设置重试次数
int maxRetries = 3;

// 执行Redis命令
boolean success = false;
for (int i = 0; i < maxRetries; i++) {
try {
RFuture<Boolean> future = map.fastPutAsync("key", "value");
success = future.await();
if (success) {
break;
}
} catch (Exception e) {
e.printStackTrace();
}
}

// 关闭Redisson客户端
redisson.shutdown();

// 处理执行结果
if (success) {
System.out.println("Redis命令已成功执行");
} else {
System.out.println("重试后 Redis 命令失败");
}
}
}


在上述示例中,我们使用Redisson创建了一个RedissonClient对象,并通过它来执行Redis命令。在执行命令时,我们使用了`fastPutAsync`方法来异步执行Redis的`SET`命令,并使用`await`方法等待命令执行结果。

如果命令执行成功,则跳出循环;如果命令执行失败,则继续进行重试。在重试过程中,可以根据实际需求进行异常处理或者记录日志。最后,根据重试结果来处理执行成功或失败的情况。

需要注意的是,重试机制并不能保证100%的命令执行成功,但可以提高命令执行成功的概率。在实际应用中,可以根据具体情况来设置重试次数和重试间隔,以达到最佳的重试效果。
  1. 我们知道Lambda表达式内是闭包的,而这意味着无法在里面修改闭包外面的变量,假设当我们需要在闭包外获取Redis中某个List的长度,而当我们在闭包内执行完Redis获取List长度的命令后,并无法将这个变量传递到闭包外,这时候该怎么办?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
在Java中,Lambda表达式内是可以访问外部的final或effectively final变量的,但是无法修改它们的值。如果需要在闭包外部获取Redis中某个List的长度,可以使用一个包装类或者数组来存储这个长度值,并将它声明为final或effectively final变量。

例如,可以创建一个长度为1的数组来存储List的长度值:

final int[] length = new int[1];

// 在闭包内获取Redis中List的长度,并将结果存储到数组中
redisTemplate.execute((RedisCallback<Object>) connection -> {
length[0] = connection.lLen("listKey");
return null;
});

// 在闭包外部可以访问length[0]获取List的长度
int listLength = length[0];

另外,如果使用的是Java 8及以上版本,也可以使用AtomicInteger类来存储长度值,它提供了原子操作保证线程安全:

AtomicInteger length = new AtomicInteger();

// 在闭包内获取Redis中List的长度,并将结果存储到AtomicInteger中
redisTemplate.execute((RedisCallback<Object>) connection -> {
length.set(connection.lLen("listKey").intValue());
return null;
});

// 在闭包外部可以通过length.get()获取List的长度
int listLength = length.get();

通过以上方式,可以在闭包外部获取到Redis中List的长度值。