学一学Redis...

【Redis】初探Redis

前言

很早之前写的文章,最近考虑到面试可能涉及到Redis,所以拿出来再看一遍

Redis概述

Redis是啥?

Redis是Remote Dicitionary Server的缩写,翻译过来就叫做远程字典服务

是开源的、使用C完成的、支持网路、基于内存、可持久化的日志型,键值对的数据库(NoSql)

目前github上有开源的由go语言实现的goredis

Redis的作用

  1. 内存存储、持久化(内存中的数据如果遇到断电就直接丢失了,所以持久化很重要)
  2. 效率高,可以用于高速缓存
  3. 发布订阅系统、地图信息分析....

特性

  1. 多样的数据类型
  2. 持久化
  3. 集群
  4. 事务

Redis安装

我直接在vps上用docker安装的

用的docker-compose

yml文件如下:

version: '2.1'
services:
    redis:
       image: redis
       hostname: redis
       container_name: wbs_redis
       environment:
          TZ: Asia/Shanghai
       command: ["redis-server","/etc/redis/redis.conf"]
       restart: always
       ports:
            - 6379:6379
       volumes:
            - "/root/workspace/docker/redis/data:/data"
            - "/root/workspace/docker/redis/redis.conf:/etc/redis/redis.conf"
            - "/root/workspace/docker/redis/logs:/logs"

这样启动直接开启了redis-server,并且绑定了配置文件redis.conf

这里的配置文件时在github上下载对应版本的,我这里把bind 127.0.0.1给注释掉了,因为如果不注释掉就只能本地连接redis

redis的默认端口是6379,docker文件需要配置一个端口映射

Redis配置文件

在上面docker-compose.yml中的启动cmd中,可以看到指定了redis.conf,同时挂载volumes也映射了redis.conf/etc/redis/redis.conf

那么这个redis.conf到底是何方神圣?

在搞清楚这个之前,我先提议嘴,使用docker拉取的Redis是不会有指定的配置文件的,需要去github上下载相对应版本的redis.conf

image-20220830220455329

下面就对主要的Redis配置文件进行分析!

基础配置

首先找到有关bind的配置,将其注释掉,bind 127.0.0.1表示只能从本地访问redis,这让我从远程访问还有啥意义嘞

image-20220830220932905

redis可以包含其他的配置文件,也就是说配置文件可以分开写,你这个文件写一部分,我再写一部分,我从这个主的redis.confinclude你这个文件就可以了

image-20220830221224929

redis默认开启保护模式,咱们就不要动他了,让他保护就完事了

image-20220830221359262

redis的默认端口是6379,需要修改可以在配置文件里修改

image-20220830221426339

redis默认是关闭守护进程的,也就是退出直接关闭redis了,但是我是在docker里运行的,docker一直运行redis与我宿主机没啥太大关系,所以我这里也就没动了,如果是本地安装的,建议改成yes,使用守护进程开启

image-20220830221620339

redis默认的日志级别是notice,可以自己配置

image-20220830221722034

redis日志输出的文件名,未空就直接在控制台输出

image-20220830221817085

redis默认有16个数据库,可以自行修改

image-20220830102028332

快照配置

快照配置涉及到了持久化,有关rdb aof的内容会在后面的章节中进行补充

save命令在注释中说的很清楚了,在1小时如果出现了一次修改就会进行快照,5分钟出现了100次修改也会进行快照,1分钟进行10000次修改同样会进行快照

image-20220830222217553

持久化如果出错,redis是否还继续工作,默认式开启的

image-20220830222425375

是否压缩rdb文件,会消耗cpu资源文件,默认开启

image-20220830222510550

校验rdb文件,默认开启

image-20220830222537345

安全配置

redis默认没有密码,使用requirepass设置密码,进入redis-server使用auth 密码进行权限校验

image-20220830225844913

客户端配置

maxclients设置最大的客户端连接数量

image-20220830230133932

内存配置

配置redis的最大内存使用maxmemory

image-20220830230304683

内存策略,默认是maxmemory-policy noeviction

image-20220830230354198

线程配置

redis在6.0版本之前是单线程的,在6.0开始是多线程的(默认关闭),使用io-threads-do-reads来设置是否开启多线程,用io-threads来设置线程数量

image-20220830103053802

AOF配置

默认aof是不打开的

image-20220830230744153

appendfsync always 表示每次修改都会进行同步,非常消耗性能

appendfsync everysec 表示每秒执行一次同步,可能会丢失这1s的数据

appendfsync no表示不执行铜鼓吧,这个时候操作系统会自动同步数据,这样速度最快

默认是everysec

image-20220830231028928

Redis常用命令

redis默认使用第一个数据库,使用select x切换数据库,x表示某个数据库(select 3,表示使用3号数据库)

keys * 可以查看数据库中所有的键

flushdb清空当前数据库

flushall清空所有数据库

set key value 表示将key这个键设置值为value

get key 表示获取key这个键对应的值

del key 表示删除key,可以携带多个key

exists key 表示判断当前的key是否存在,存在返回1,不存在返回0

move key 1 表示将key这个键值对移动到1号数据库中

expire key 10 表示将key这个键设置10秒后失效

tll key 表示查看key这个键的即将失效的时间,单位是秒

五大基础数据类型

String

大部分使用场景都是操作string,例如最常用的set,set了一个string之后,可以使用append进行字符串的追加

如果append的键是不存在的,那么会创建一个新的键

image-20220830110148478

同理,字符串还有查看长度的strlen,还有字符串截取getrange

image-20220830111309814

有了get当然有对应的setrange offset val

image-20220830111433635

还有一个比较关键的自增操作incr

image-20220830110802684

同理还有自减操作decr

image-20220830110901859

那么如果想控制增长量和减少量呢,使用incrby key 10decrby key 10就可以设置自增和自减的值了

下面是setexsetnxsetex表示存在某个键就设置,setnx表示不存在这个键才设置

setex key seconds value 需要设置一个seconds来表示过期时间

setnx key value 只能设置不存在的键

然后是设置多个keys,使用mset指令

mset k1 v1 k2 v2 k3 v3

image-20220830112240919

同理可以使用mget去请求多个键

image-20220830112314038

List

可以在redis中,将list想象成一个双向队列

使用lpush可以向队头存放数据,再使用lrange list start end来指定查看键为list的数据的范围,这里的0 -1表示查看所有

image-20220830133501671

同理,使用rpush向队列尾存放数据

image-20220830133805055

push相对应的就是pop了,使用lpoprpop可以将队头或队尾的元素弹出

image-20220830134044807

使用lindex key index获取列表中对应的下标,下标从0开始

image-20220830134208599

使用llen查看列表的长度

使用lrem key count val来删除指定count个数的val精确值,我这里是删除了两个“2”字符串

image-20220830134626859

使用ltrim key start end 来进行截取列表中指定下标的元素,这里截取1坐标到2坐标,只剩下b和c了

image-20220830135215405

使用rpoplpush source target 将source列表中的队尾弹出,将这个元素插入到target列表的队头

使用lset key index value 将列表中index的值更新为value,如果这个key不存在就会报错!

使用linsert key before/after val newval来插入一元素,在某个元素之前或者之后,如果key不存在或者val不存在,就插入无效

image-20220830160203825

Set

无法重复的list

sadd key value 插入键值对

smembers key 查看所有的值

sismember key value 查看这个value是否存在于key中

scard key 查看键所具有的值的长度

srem key value 删除键值

srandmember key [x] 随机抽选出x个元素

image-20220830161149609

spop key 弹出集合第一个元素

smove key1 key2 value 将key1中的value移动到key2中

image-20220830161500230

sdiff key1 key1 以key1为参照物,比较key1与key2的不同元素(差集)

image-20220830161820183

sinter key1 key2 以key1为参照物,获得key1与key2的交集

sunion key1 key2 获得key1与key2的并集

HashMap

在Redis这个键值对数据库里存放K-V是否有点....

hset key field value 设置一个键,它的值是一个键值对

hget key field 获取一个键中的键值对

image-20220830162549276

hgetall key 获取所有的键值对

image-20220830162744717

hdel key field 删除某个键,这个键所指向的键值对被删除

hlen key 获取某个key的长度

hexists key field 判断key中的某个字段是否存在

hkeys key 获取key中的所有键

hvals key 获取key中的所有值

hincrby key field count 给某个属性自增count

hsetnx key field value 如果不存在则可以设置

Zset

在set的基础上,可以进行排序,即有序集合

zadd key score member 添加一个值,价值为score

zrange key start end 查看start到end范围内的所有值,0 -1 可以列出所有,默认是sroce从小到大的排序方式

image-20220830165312829

zrangebyscore key (startscore (endscroe [withscores] 通过score排序,查看startscore到endscore区间内的值,左括号表示闭区间,不带左括号就是开区间,inf表示无限,withscores是一个可选参数,表示携带score输出

image-20220830165602553

与顺序排序相反的就是逆序,使用zrevrange就可以了

image-20220830170022267

zrem key value 删除zset中的某个元素

zcard key 查看key集合的长度

zcount key (min (max 查看在min和max区间内的集合长度,括号表示闭区间

三大特殊数据类型

Geospatial

和地理位置挂钩的一个数据类型,底层是基于zset的,可以用zset的指令来操作相关的key

geoadd key 经度 纬度 地点名称 添加一个地点的经纬度

geopos key 地点 获取某个地点的经纬度

geodist key 地点1 地点2 单位 查看两个地点置间的直线距离,单位如下

  • m 米

  • km 千米

  • mi 英里

  • ft 英尺

georadius key 经度 纬度 半径 单位 [withdist / withcoord] [count x] 以某个经纬度为圆心,找在半径面积区域内圆的地点,withdist 表示显示直线距离,withcoord表示显示经纬度, count x表示限制限制x个地点

georadiusbymember key 地点 半径 单位 以某个地点为圆心,在半径面积区域内找点

Hyperloglog

Redis中的Hyperloglog是基于统计基数的算法实现的

Hyperloglog相对于set的优势是:在对很大的数据量进行计数处理的时候,内存占用小,缺点是:因为hyperloglog是近似计算,在数据量小的时候,误差会较大

Hyperloglog可以用于文章阅读量或者网站点击量等情景,目的是计数而非统计每个用户的ip,只注重最后的点击量或浏览量

Hyperloglog占用的内存固定,2^64的不同元素只需要12kb的内存就可以存储

Hyperloglog的错误率为0.81%

pfadd key values 给key添加元素,可以多个

pfmerge newkey key1 key2 将key1和key2进行set去重后合并到newkey中

pfcount key 查询key中的数量(不重复的元素)

Bigmaps

对位进行操作,可以非常的节约空间,但是一般用来处理0、1这种两种情况的变量

例如,如果需要统计一年365天打卡的次数,可以创建一个365位的空间,每一天对一位进行赋值处理,如果打卡了就给那一天的那一位置位1,反之就是0

setbit key 第几位 0或1 给key的某一位置为0/1

getbit key 第几位 获取key的某一位

bitcount key [start end] 获取start位到end位上1的数量

Redis事务

==Redis中的单条命令是保证原子性的,但是Redis中的事务是不保证原子性的==

==Redis事务没有隔离(独立)的概念,所有的命令只有在最后发起执行的时候才会统一进行执行==

Redis中事务的本质就是一组命令的集合,一个事务中的命令都会被序列化,在事务执行的过程中严格按照顺序执行
  • 开启事务(multi)
  • 命令入队(....)
  • 执行事务(exec) / 放弃事务(discard)

image-20220830202309028

编译时异常:(使用了错误的语法,所有的命令都不会被执行

image-20220830202556864

运行时异常:(运行中,对某些操作进行了错误的使用,例如自增一个字符串,这样的异常不会导致事务的失败,仅仅是那一句话的失败,所以说redis的事务不保证原子性)

image-20220830202911580

Redis监视模式

先引入两个锁的概念:

  1. 悲观锁:
  • 认为什么时候都会出问题,无论做什么都会加锁(非常影响性能)
  1. 乐观锁:
  • 认为什么时候都不会出现问题,所以不会上锁。更新数据的时候判断一下,在此期间是否有人修改过这个数据

正常情况下的操作,使用watch money没有发现money有其他的变动,可以正常执行事务中的命令,让money 减少60, out增加60

image-20220830203740046

异常情况下的操作(多个IO对Redis进行同时处理),注意看下图的时间,首先设置原始的money 为 100, out为0,再在右边给watch money对其监视,开启事务,输入两条指令但不输入exec,这个时候到左边输入set money 1000,这样就对money这个键进行了更改,再去右边exec事务,发现返回nil,其实就是事务执行失败,原因就是添加了watch,相当于给money增加了一个乐观锁

image-20220830204808508

unwatch命令可以将所有的watch给取消

Java操作Redis

在springboot2之前,springboot内置的操作redis的工具类是jedis

在springboot2之后,springboot内置的操作redis的工具类是lettuce

两者的区别就是

  • jedis使用的是BIO,在高并发的情况下即使有线程池也可能造成崩溃的情况
  • lettuce使用的是NIO,可以让单个线程去轮流干事情,更高效

这里就不提使用最基础的jedis,直接说说springboot整合redis吧

首先pom.xml文件导入依赖

<!--redis数据库模块-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

阅读源码可以发现,springboot中的redis默认使用jdk的序列化操作,这样会让在redis-server中看到的key和value存在编码问题,无法直接显示字符

image-20220830215532927

解决的方式就是咱们自己使用@Configuration注解写一个RedisTemplate的Bean来使用StringRedisSerializer序列化

@Configuration
public class RedisConfig {
    @Bean
    public RedisTemplate<String,Object> redisTemplate(RedisConnectionFactory factory){
        RedisTemplate<String,Object> template = new RedisTemplate<>();
        //关联
        template.setConnectionFactory(factory);
        //设置key的序列化器
        template.setKeySerializer(new StringRedisSerializer());
        //设置value的序列化器
        template.setValueSerializer(new StringRedisSerializer());
        return template;
    }
}

同时,为了更好的使用redis的操作,在网上搜索而来使用非常广的RedisUtils工具类

/**
 * Java版本RedisUtil
 */
@Component
public class RedisUtils {

    @Resource
    private RedisTemplate<String,Object> redisTemplate;

    public void setRedisTemplate(RedisTemplate<String, Object> redisTemplate) {
        this.redisTemplate = redisTemplate;
    }

    /**
     * 指定缓存失效时间
     *
     * @param key     * @param time 时间(秒)
     * @return boolean
     */
    private boolean expire(String key, long time) {
        try {
            if (time > 0) {
                redisTemplate.expire(key, time, TimeUnit.SECONDS);
            }
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * 根据key 获取过期时间
     *
     * @param key 键 不能为null
     * @return 时间(秒) 返回0代表为永久有效
     */
    public long getExpire(String key) {
        return redisTemplate.getExpire(key, TimeUnit.SECONDS);
    }

    /**
     * 判断key是否存在
     *
     * @param key     * @return true 存在 false不存在
     */
    public boolean hasKey(String key) {
        try {
            return redisTemplate.hasKey(key);
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * 删除缓存
     *
     * @param key 可以传一个值 或多个
     */
    @SuppressWarnings("unchecked")
    public void del(String... key) {
        if (key != null && key.length > 0) {
            if (key.length == 1) {
                redisTemplate.delete(key[0]);
            } else {
                redisTemplate.delete((Collection<String>) CollectionUtils.arrayToList(key));
            }
        }
    }

    /**
     * 普通缓存获取
     *
     * @param key     * @return     */
    public Object get(String key) {
        return key == null ? null : redisTemplate.opsForValue().get(key);
    }

    /**
     * 普通缓存放入
     *
     * @param key     * @param value     * @return true成功 false失败
     */
    public boolean set(String key, Object value) {
        try {
            redisTemplate.opsForValue().set(key, value);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }

    }

    /**
     * 普通缓存放入并设置时间
     *
     * @param key     * @param value     * @param time  时间(秒) time要大于0 如果time小于等于0 将设置无限期
     * @return true成功 false 失败
     */
    public boolean set(String key, Object value, long time) {
        try {
            if (time > 0) {
                redisTemplate.opsForValue().set(key, value, time, TimeUnit.SECONDS);
            } else {
                set(key, value);
            }
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * 递增
     *
     * @param key     * @param delta 要增加几(大于0)
     * @return long
     */
    public long incr(String key, long delta) {
        if (delta < 0) {
            throw new RuntimeException("递增因子必须大于0");
        }
        return redisTemplate.opsForValue().increment(key, delta);
    }

    /**
     * 递减
     *
     * @param key     * @param delta 要减少几(小于0)
     * @return long
     */
    public long decr(String key, long delta) {
        if (delta < 0) {
            throw new RuntimeException("递减因子必须大于0");
        }
        return redisTemplate.opsForValue().increment(key, -delta);
    }

    /**
     * HashGet
     *
     * @param key  键 不能为null
     * @param item 项 不能为null
     * @return     */
    public Object hget(String key, String item) {
        return redisTemplate.opsForHash().get(key, item);
    }

    /**
     * 获取hashKey对应的所有键值
     *
     * @param key     * @return 对应的多个键值
     */
    public Map<Object, Object> hmget(String key) {
        return redisTemplate.opsForHash().entries(key);
    }

    /**
     * HashSet
     *
     * @param key     * @param map 对应多个键值
     * @return true 成功 false 失败
     */
    public boolean hmset(String key, Map<String, Object> map) {
        try {
            redisTemplate.opsForHash().putAll(key, map);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * HashSet 并设置时间
     *
     * @param key     * @param map  对应多个键值
     * @param time 时间(秒)
     * @return true成功 false失败
     */
    private boolean hmset(String key, Map<String, Object> map, long time) {
        try {
            redisTemplate.opsForHash().putAll(key, map);
            if (time > 0) {
                expire(key, time);
            }
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * 向一张hash表中放入数据,如果不存在将创建
     *
     * @param key     * @param item     * @param value     * @return true 成功 false失败
     */
    public boolean hset(String key, String item, Object value) {
        try {
            redisTemplate.opsForHash().put(key, item, value);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * 向一张hash表中放入数据,如果不存在将创建
     *
     * @param key     * @param item     * @param value     * @param time  时间(秒)  注意:如果已存在的hash表有时间,这里将会替换原有的时间
     * @return true 成功 false失败
     */
    public boolean hset(String key, String item, Object value, long time) {
        try {
            redisTemplate.opsForHash().put(key, item, value);
            if (time > 0) {
                expire(key, time);
            }
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * 删除hash表中的值
     *
     * @param key  键 不能为null
     * @param item 项 可以使多个 不能为null
     */
    public void hdel(String key, Object... item) {
        redisTemplate.opsForHash().delete(key, item);
    }

    /**
     * 判断hash表中是否有该项的值
     *
     * @param key  键 不能为null
     * @param item 项 不能为null
     * @return true 存在 false不存在
     */
    public boolean hHasKey(String key, String item) {
        return redisTemplate.opsForHash().hasKey(key, item);
    }

    /**
     * hash递增 如果不存在,就会创建一个 并把新增后的值返回
     *
     * @param key     * @param item     * @param by   要增加几(大于0)
     * @return double
     */
    public double hincr(String key, String item, double by) {
        return redisTemplate.opsForHash().increment(key, item, by);
    }

    /**
     * hash递减
     *
     * @param key     * @param item     * @param by   要减少记(小于0)
     * @return
     */
    public double hdecr(String key, String item, double by) {
        return redisTemplate.opsForHash().increment(key, item, -by);
    }

    /**
     * 根据key获取Set中的所有值
     *
     * @param key     * @return Set
     */
    public Set<Object> sGet(String key) {
        try {
            return redisTemplate.opsForSet().members(key);
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }

    /**
     * 根据value从一个set中查询,是否存在
     *
     * @param key     * @param value     * @return true 存在 false不存在
     */
    public boolean sHasKey(String key, Object value) {
        try {
            return redisTemplate.opsForSet().isMember(key, value);
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * 将数据放入set缓存
     *
     * @param key     * @param values 值 可以是多个
     * @return 成功个数
     */
    public long sSet(String key, Object... values) {
        try {
            return redisTemplate.opsForSet().add(key, values);
        } catch (Exception e) {
            e.printStackTrace();
            return 0;
        }
    }

    /**
     * 将set数据放入缓存
     *
     * @param key     * @param time   时间(秒)
     * @param values 值 可以是多个
     * @return 成功个数
     */
    public long sSetAndTime(String key, long time, Object... values) {
        try {
            Long count = redisTemplate.opsForSet().add(key, values);
            if (time > 0) expire(key, time);
            return count;
        } catch (Exception e) {
            e.printStackTrace();
            return 0;
        }
    }

    /**
     * 获取set缓存的长度
     *
     * @param key     * @return long
     */
    public long sGetSetSize(String key) {
        try {
            return redisTemplate.opsForSet().size(key);
        } catch (Exception e) {
            e.printStackTrace();
            return 0;
        }
    }

    /**
     * 移除值为value的
     *
     * @param key     * @param values 值 可以是多个
     * @return 移除的个数
     */
    public long setRemove(String key, Object... values) {
        try {
            Long count = redisTemplate.opsForSet().remove(key, values);
            return count;
        } catch (Exception e) {
            e.printStackTrace();
            return 0;
        }
    }

    /**
     * 获取list缓存的内容
     *
     * @param key     * @param start 开始
     * @param end   结束  0 到 -1代表所有值
     * @return
     */
    public List<Object> lGet(String key, long start, long end) {
        try {
            return redisTemplate.opsForList().range(key, start, end);
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }

    /**
     * 获取list缓存的长度
     *
     * @param key     * @return long
     */
    public long lGetListSize(String key) {
        try {
            return redisTemplate.opsForList().size(key);
        } catch (Exception e) {
            e.printStackTrace();
            return 0;
        }
    }

    /**
     * 通过索引 获取list中的值
     *
     * @param key     * @param index 索引  index>=0时, 0 表头,1 第二个元素,依次类推;index<0时,-1,表尾,-2倒数第二个元素,依次类推
     * @return Object
     */
    public Object lGetIndex(String key, long index) {
        try {
            return redisTemplate.opsForList().index(key, index);
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }

    /**
     * 将list放入缓存
     *
     * @param key     * @param value     * @return boolean
     */
    public boolean lSet(String key, Object value) {
        try {
            redisTemplate.opsForList().rightPush(key, value);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * 将list放入缓存
     *
     * @param key     * @param value     * @param time  时间(秒)
     * @return boolean
     */
    public boolean lSet(String key, Object value, long time) {
        try {
            redisTemplate.opsForList().rightPush(key, value);
            if (time > 0) expire(key, time);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * 将list放入缓存
     *
     * @param key     * @param value     * @return boolean
     */
    public boolean lSet(String key, List<Object> value) {
        try {
            redisTemplate.opsForList().rightPushAll(key, value);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * 将list放入缓存
     *
     * @param key     * @param value     * @param time  时间(秒)
     * @return boolean
     */
    public boolean lSet(String key, List<Object> value, long time) {
        try {
            redisTemplate.opsForList().rightPushAll(key, value);
            if (time > 0) expire(key, time);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * 根据索引修改list中的某条数据
     *
     * @param key     * @param index 索引
     * @param value     * @return boolean
     */
    public boolean lUpdateIndex(String key, long index, Object value) {
        try {
            redisTemplate.opsForList().set(key, index, value);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * 移除N个值为value
     *
     * @param key     * @param count 移除多少个
     * @param value     * @return 移除的个数
     */
    public long lRemove(String key, long count, Object value) {
        try {
            return redisTemplate.opsForList().remove(key, count, value);
        } catch (Exception e) {
            e.printStackTrace();
            return 0;
        }
    }

    public void expire(int i, int i1, int i2) {
    }
}

这样使用springboot开发有关redis读写的操作就非常简单了

最后是springboot的application的配置文件,我这里是参考了网络上配置lettuce的一些操作,如果不需要就直接使用port、host、password进行连接就行了

spring:
    redis:
        port: your port
        host: your ip
        password: your passwd
        lettuce:
          pool:
            min-idle: 0
            max-wait: 5s
            max-active: 10
            max-idle: 10
            time-between-eviction-runs: 1s

Redis持久化

Redis属于内存型数据库,一旦服务器断电,如何保证内存中的数据不会损失呢?这就涉及到了Redis的持久化了!

Redis的持久化有两种模式

  • RDB
  • AOF

下面分别来对两种模式进行简单分析

RDB(Redis Database)

在之前的redis.conf中,有对于rdb的设置,包括什么情况进行rdb文件的生成,还有rdb文件的名称等等

网上大部分都说默认的redis的save策略是

save 900 1   -- 900秒内至少有一个改动(就触发
save 300 10		-- 300秒内至少有10个改动
save 60 10000	-- 60秒内至少有10000个改动

但是我下载的最新版本7.0.3的redis.conf其中的配置都被注释了,我也不懂是不是新版本与旧版本配置文件不同的原因

# save 3600 1 300 100 60 10000

默认写的文件配置是,也就是当前redis-server目录下保存的rdb文件的名称

dbfilename dump.rdb

Redis使用了linux系统的CopyOnWrite技术(写时复制),在生成快照文件的时候,仍然可以接收命令来处理数据。

简单来说就是,在开始进行持久化的时候,会从主线程fork出一个子线程,同时这个子线程会共享主线程的所有内存数据,同时我们也知道redis是基于内存的数据库,共享内存数据就等于共享了数据库中的所有的信息,这样这个子线程从主线程读取内存数据,将内存数据写入到dump.rdb中。

此时,如果主线程处理的命令都是read,那么子线程不会受影响

如果,主线程是处理了write,那么会对该命令操作的数据copy一份,生成副本,子线程会把这个副本写入到dump.rdb中,这个过程中,主线程仍然可以执行命令

那么如何出发redis的rdb持久化?

  • 触发了配置文件中save的情况
  • 执行flushall或save命令
  • 退出redis,使用shutdown会自动触发save

如何导入rdb文件?

  • 直接将redis-server所在的目录下导入dump.rdb,redis开启会自动读取

优点:

  • dump.rdb是二进制文件,能快速导入

缺点:

  • rdb每次持久化需要将所有内存数据写入文件,并替换源文件,当内存数据率很大,频繁生产快照非常消耗性能
  • save策略如果设置的时间间隔较大,在没备份时候宕机会导致数据丢失

弄了张网图过来,逻辑一看就懂:

img

AOF(Append Only File)

redis默认不开启AOF,需要在配置项appendonly改成yes

AOF是redis提供的另一种数据持久化方式,它会记录客户端对redis服务端的每一次write操作,并将这些写操作以redis协议追加保存到后缀为aof的文件末尾。在redis服务器重启时,会读取并加载aof文件,达到恢复数据的目的。

也就是对比rbd文件来说,rdb文件是一个二进制文件,而aof是一个记录文件,类似history一样,是可以看见命令的

如果我们人为在aof文件中写入了redis不可识别的命令,可以使用redis-check-aof文件来进行修复,但是可能会丢失一部分数据

image-20220831160954409

redis-check-aof --fix appendonly.aof

同时可以在redis.conf中设置aof文件名和文件目录

appendfilename "appendonly.aof"
appenddirname "appendonlydir"

接下来就是aof的三种写入策略,默认使用的是everysec

# appendfsync always
appendfsync everysec
# appendfsync no
  • always

    客户端对每次的redis写入都会被记录到aof文件中,这种操作非常影响redis的性能,相当于每一次write的操作都进行了一次磁盘的io,好处就是可以记录所有的写数据,基本不会造成数据的丢失,但是代价太大了

  • everysec

    每秒刷新一次buf中的数据到aof中,是aof的默认策略,理论上这种方式最多丢失1s的数据

  • no

    redis服务器不会将数据写入到aof文件中,而是交给操作系统来判断什么时候写入,这种方式是最快的一种策略,但是丢失数据的可能性最大

aof既然是通过append(追加)的方式来存储redis的写记录的,如果对同一个key进行多次的write操作,会产生大量的对同一个key的操作记录,这就会使aof文件非常大,为了避免这个问题,配置文件中可以通过设置auto-aof-rewrite-percentageauto-aof-rewrite-min-size,默认参数如下

image-20220831163443614

auto-aof-rewrite-percentage 100 表示当aof文件大小到达原先文件(上次启动redis时候记录的aof文件大小)的两倍,就会进行文件重写

auto-aof-rewrite-min-size 64mb:aof文件低于64mb不会被重写

这两个指标同时满足,就会触发aof的重写机制,我们也可以通过命令bgrewriteaof手动触发重写。

重写的原理就是重写最新的键,之前设置的键都不记录了,当作垃圾数据。同时重写是主线程fork出一条子线程来讲文件进行重写,遍历这个子线程中的共享的内存数据去将内存转为操作指令,再序列化成一个新的aof文件。

注意:重写的aof文件不会去读取旧的aof文件,而是将当前redis的内存序列化成指令而已,类似于快照

但是考虑到一个问题,就是重写的时候,如果主线程被执行了write的操作,这样新生成的指令如何被写入新的aof中呢?

所以为了保证主线程与子线程的一致性,使用了两个buf完成

AOF文件的重写流程如下:

(1)bgrewriteaof触发重写,判断是否存在bgsave或者bgrewriteaof正在执行,存在则等待其执行结束再执行;

(2)主进程fork子进程,防止主进程阻塞无法提供服务;

(3)子进程遍历Redis内存快照中数据写入临时AOF文件,同时会将新的写指令写入aof_buf和aof_rewrite_buf两个重写缓冲区,前者是为了写回旧的AOF文件,后者是为了后续刷新到临时AOF文件中,防止快照内存遍历时新的写入操作丢失;

(4)子进程结束临时AOF文件写入后,通知主进程;

(5)主进程会将上面的aof_rewirte_buf缓冲区中的数据写入到子进程生成的临时AOF文件中;

(6)主进程使用临时AOF文件替换旧AOF文件,完成整个重写过程。

img

Redis发布订阅

redis的发布订阅是一种消息通信模式

  • 发布者发送消息(pub)
  • 订阅者接收消息(sub)
  • 消息通道(channel)

image-20220831170440269

虽然redis可以做这种发布订阅,但是主流市场根本没人用(汗

基本上都是使用消息队列的框架,Kafka、rabbitmq之类的

有关redis发布订阅的相关操作语句直接去菜鸟教程中看就行了,我估计也不咋用,这里就不写了

Redis主从复制

TODO