一、Redis安装
1.1 WIndows下安装
1.WIN下载地址:https://github.com/microsoftarchive/redis/releases/tag/win-3.2.100
2.下载完毕后,解压到工作目录。
3.双击运行服务 redis-server
4.双击客户端 redis-cli
1.2 Linux下安装
1.下载地址:https://redis.io/ ,注意版本号更高的版本需要更高的GCC版本
2.redis-5.0.8.tar.gz
3.上传服务器linux
4. tar -zxvf redis-5.0.8.tar.gz
5. yum install gcc-c++
6. cd redis-5.0.8 -> make && make install
7. 默认redis的安装路径为 : /usr/local/bin
8. 创建redis的配置目录: mkdir /usr/local/bin/redisconf
9. 复制配置文件: cp /home/redis-5.0.8/redis.conf /usr/local/bin/redisconf
10.可以修改为后台启动:vim redis.conf -->daemonize yes
10. 指定配置文件启动redis服务:redis-server /usr/local/bin/redisconf/redis.conf
11. 连接服务:redis-cli -p 6379
12. 关闭服务:shutdown --> exit
1.3 Redis性能测试工具
redis-benchmark
1.测试:100个并发连接,每个并发100000个请求
redis-benchmark -h localhost -p 6379 -c 100 -n 100000
1.4 基础知识
- redis默认有 16 个数据库,默认使用第 0 个数据库。
- 切换数据库:select 3
- 查看所有的数据K: keys *
- 清空当前数据库:flushdb,清除所有数据库:flushall
- redis为单线程的。基于内存操作。性能瓶颈为机器内存和网络带宽。
- 多线程CPU切换轮转【耗时间】,上下文切换,不一定比单线程快。
- redis所有的数据全部在内存中,单线程效率最高。
二、五大数据类型
2.1 String 类型
- 设置 set key1 1www
- 获取get key1
- 追加 append key "oooo"
- 获取长度 STRLEN key1
- 自增 浏览量
set views 0
incr views 自增1
decr views 自减1
incr views 2 设置自增2 - 对象 : set user:1 {name:wang,age:24}
mset user:1:name wang user:1:age 24
mget user:1name user:1:age
2.2 List类型【栈,队列,阻塞队列】
- LPUSH list one 多个值插入到列表的头部
- LRANGE list 0 -1 通过区间拿到所有的值
- LRANGE list 0 1 通过区间拿到所有的值
- Rpush list three 插入队列的尾部
- Lpop list 移除左侧第一个值
- Rpop list 移除右侧第一个值
- lindex list 1 通过瞎做获取值
- Llen list 得到列表的长度
- Lrem list 1 one 移除一个或者多个指定的值
- ltrim list 1 2 从1-2截断下来
- lset list 0 kkk ;列表指定下标的值更新为另外一个值
2.3 Set类型
- Set中的值是不可以重复的
- sadd myset "hello" 添加一个值
- SMEMBERS myset 查看某个集合的元素
- SISMEMBER myset hello 判断某一个元素是否在这个集合中
- scard myset 获取Set集合的个数
- srem myset hello 移除某一个值
- SRANDMEMBER myset 随机取出一个元素
- spop myset 随机移除一个元素
- SDIFF key1 key2 差集
- SINTER key1 key2 交集
- SUNION key1 key2 并集
2.4 Hash类型
Map集合,key-map,值是一个map集合,和String类型差不多
更适合对象的存储
- hset myhash field1 wangyin 存储一个值
hget myhash field1 取出一个值 - hmset myhash field1 hello field2 word 存储多个k-v值
- hmget field1 field2 获得多个值
- hgetall myhash 取出所有字段值
- hdel myhash field1 删除指定的key字段,对应的value也被删除了
- hlen myhash 集合长度
- hkeys myhash 获取所有的key
- hvals myhash 获取所有的值
- hset user:1 name wangyi 存储对象属性值
- hget user:1 name 获取对象属性值
2.5 Zset类型
在set的基础之上,增加了一个值 ,set k1 v1 zset k1 score v1
- zadd myset 1 one 1 为排序
- zadd salary 1000 xiaoming
zadd salary 2000 xiaohong
zadd salary 3000 xiaowang
ZRANGEBYSCORE salary -inf +inf 所有的字段升序排序
ZRANGEBYSCORE salary 0 -1 所有的字段降序排序
ZRANGESCORE salary -inf 2500 withscores 查出所有在2500以内带有分数和名字降序排列 - zrange salary 0 -1 查询所有
- zrem salary xiaohong 移除一个元素
- zscard salary 获取有序集合中的个数
三、特殊数据类型
3.1 Geospatia地理位置
朋友定位,附近的人,打车距离查询
规则:两级数据无法导入,一般通过java批量导入
参数 key 值(经度,维度,名称)
redis在redis3.2之后就推出了,计算地理位置信息,两地距离
- geoadd china:city 116.40 39.90 beijing 添加地理位置信息
geoadd china:city 121.47 31.23 shanghai
geoadd china:city 106.50 29.53 chongqing
geoadd china:city 114.05 22.54 guangdong
geoadd china:city 120.16 30.24 hangzhou
geoadd china:city 108.96 34.26 xian - geopos china:city beijing xian 获取指定城市的地理位置
- geodist china:city beijing shanghai 默认直线距离单位m
geodist china:city beijing shanghai km 单位为km
附近的人?获取所有的地址 ,通过半径来查询定位
4. georadius china:city 110 30 1000 km 维度 查询附近1000km的城市
5. georadius china:city 110 30 1000 km withdist withcoord count 1 周围人定位信息限定数量1
3.2 Hyperloglog 基数统计
不重复的元素的个数,可以接受误差0.81%
优点:占用的内存是固定的,只需要12K的内存。
页面的访问量【UV】,一个人访问一个网站多次,算作一个人。
传统的方式:Set保存用户ID,如果大量的话,比较麻烦,目的为了计数,而不是保存用户ID。
- PFadd mykey a b c d e f g h i h 存储
- PFCOUNT mykey 计算基数数量
- PFMERGE mykey mykey2 合并
3.3 Bitmap 位图场景
只有0和1,两个状态,操作二进制位
统计疫情感染人数: 0 1 0 1 0 0 0 1
统计同户信息,活跃,不活跃,登录, 不登录, 打卡
356天 = 365 bit 46byte足够
测试只有1-周末的打卡记录:周一:1 周二:0
- setbit sign 0 1
setbit sign 1 0
setbit sign 2 1
setbit sign 3 1
setbit sign 4 0 - getbit sign 3 查看周三有没有打卡
- bitcount sign 打卡天数
- bitcount sign 0 3 统计周一到周四的打卡记录天数
四、redis事务操作
1.ACID:要么一起成功,要么一起失败。
2.redis单条命令保证原子性,但是事务不保证原子性 ,所有的命令就会被序列化,会按照顺序执行。
3.redis事务没有隔离级别的概念,所有的命令在事务中,没有直接被执行。
4.**redis的事务:**开启事务【multi】-->命令入队【...】-->执行事务【exec】
5.DISCARD:取消事务
五、redis乐观锁
5.1 悲观锁
很悲观,认为什么时候都会出现问题
5.2 乐观锁
1.很乐观,认为神魔时候都不会出现问题,不会上锁,更新数据的时候会比较一下这个期间是否有人修改过。
2.获取版本号
3.更新时候比较version
如:
set money 100
set out 0
watch money 监控监事测试
multi 开启事务
DECRBY money 20 减少20
INCRBY out 20 增加20
exec 执行事务
六、Jedis 客户端
6.1 maven项目添加pom
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>3.2.0</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.66</version>
</dependency>
6.2 连接测试
public class TestJedis{
public static void main(String[] args){
Jedis jedis = new Jedis("127.0.0.1",6379);
// 所有的方法全部在jedis中
.........
JSONObject jsonObject = new JSONObject();
jsonObject.put("hello","world");
jsonObject.put("name","reba");
// 事务
Transaction multi = jedis.multi();
String result = jsonObject.toJSONString();
try{
multi.set("user1",result);
multi.set("user2",result);
int i = 1/0; // 模拟异常
multi.exec(); // 执行事务
}catch(...){
multi.discard(); // 放弃事务
}finally{
jedis.close(); // 关闭连接
}
}
}
七、Springboot整合【spring-data】
7.1 添加依赖
说明:springboot2.x之后,jedis被替换成了lettuce。
**jedis:**采用的是直连,多个线程操作是不安全的,如果避免的话,需要使用jedis pool连接池。
**lettuce:**采用netty,实例可以在多个线程中共享,不存在线程不安全的情况。
7.2 添加yaml
spring.redis.host=127.0.0.1 spring.redis.port=6379
7.3 测试
@Autowired
private RedisTemplate redisTemplate;
@Test
void testRedis(){
// 获取连接对象[可不]
// RedisConnection conn = redisTemplate.getConnectionFactory().getConnection();
// conn.flushDb();
redisTemplate.opsForValue();
redisTemplate.opsForList();
redisTemplate.opsForSet();
redisTemplate.opsForHash();
redisTemplate.opsForZSet();
redisTemplate.opsForGeo();
redisTemplate.opsForHyperLogLog();
}
7.4 自定义RedisTemplate【开发】 乱码问题
默认的序列化方式为JDK的:
解决办法:
@Configuration
public class RedisConfig {
@Bean
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory connectionFactory){
RedisTemplate<String, Object> redisTemplate = new RedisTemplate<String, Object>();
redisTemplate.setConnectionFactory(connectionFactory);
// 解决乱码 实现序列化
Jackson2JsonRedisSerializer j2r = new Jackson2JsonRedisSerializer<>(Object);
jedisTemplate.setKeySerializer(j2r);
return redisTemplate;
}
@Bean
public RedisConnectionFactory connectionFactory(JedisPoolConfig poolConfig){
JedisConnectionFactory jedisConnectionFactory = new JedisConnectionFactory(poolConfig);
jedisConnectionFactory.setHostName("localhost");
jedisConnectionFactory.setPort(6379);
return jedisConnectionFactory;
}
@Bean
public JedisPoolConfig poolConfig(){
JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
jedisPoolConfig.setMaxIdle(20);
jedisPoolConfig.setMaxTotal(200);
jedisPoolConfig.setMaxWaitMillis(2000);
jedisPoolConfig.setTestOnBorrow(true);
jedisPoolConfig.setTestOnCreate(true);
return jedisPoolConfig;
}
}
7.5 RedisUtil工具类
// 在我们真实的分发中,或者你们在公司,一般都可以看到一个公司自己封装RedisUtil
package com.kuang.utils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
@Component
public final class RedisUtil {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
// =============================common============================
/**
* 指定缓存失效时间
* @param key 键
* @param time 时间(秒)
*/
public boolean expire(String key, long time) {
try {
if (time > 0) {
redisTemplate.expire(key, time, TimeUnit.SECONDS);
}
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 根据key 获取过期时间
* @param key 键 不能为null
* @return 时间(秒) 返回0代表为永久有效
*/
public long getExpire(String key) {
return redisTemplate.getExpire(key, TimeUnit.SECONDS);
}
/**
* 判断key是否存在
* @param key 键
* @return true 存在 false不存在
*/
public boolean hasKey(String key) {
try {
return redisTemplate.hasKey(key);
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 删除缓存
* @param key 可以传一个值 或多个
*/
@SuppressWarnings("unchecked")
public void del(String... key) {
if (key != null && key.length > 0) {
if (key.length == 1) {
redisTemplate.delete(key[0]);
} else {
redisTemplate.delete(CollectionUtils.arrayToList(key));
}
}
}
// ============================String=============================
/**
* 普通缓存获取
* @param key 键
* @return 值
*/
public Object get(String key) {
return key == null ? null : redisTemplate.opsForValue().get(key);
}
/**
* 普通缓存放入
* @param key 键
* @param value 值
* @return true成功 false失败
*/
public boolean set(String key, Object value) {
try {
redisTemplate.opsForValue().set(key, value);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 普通缓存放入并设置时间
* @param key 键
* @param value 值
* @param time 时间(秒) time要大于0 如果time小于等于0 将设置无限期
* @return true成功 false 失败
*/
public boolean set(String key, Object value, long time) {
try {
if (time > 0) {
redisTemplate.opsForValue().set(key, value, time, TimeUnit.SECONDS);
} else {
set(key, value);
}
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 递增
* @param key 键
* @param delta 要增加几(大于0)
*/
public long incr(String key, long delta) {
if (delta < 0) {
throw new RuntimeException("递增因子必须大于0");
}
return redisTemplate.opsForValue().increment(key, delta);
}
/**
* 递减
* @param key 键
* @param delta 要减少几(小于0)
*/
public long decr(String key, long delta) {
if (delta < 0) {
throw new RuntimeException("递减因子必须大于0");
}
return redisTemplate.opsForValue().increment(key, -delta);
}
// ================================Map=================================
/**
* HashGet
* @param key 键 不能为null
* @param item 项 不能为null
*/
public Object hget(String key, String item) {
return redisTemplate.opsForHash().get(key, item);
}
/**
* 获取hashKey对应的所有键值
* @param key 键
* @return 对应的多个键值
*/
public Map<Object, Object> hmget(String key) {
return redisTemplate.opsForHash().entries(key);
}
/**
* HashSet
* @param key 键
* @param map 对应多个键值
*/
public boolean hmset(String key, Map<String, Object> map) {
try {
redisTemplate.opsForHash().putAll(key, map);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* HashSet 并设置时间
* @param key 键
* @param map 对应多个键值
* @param time 时间(秒)
* @return true成功 false失败
*/
public boolean hmset(String key, Map<String, Object> map, long time) {
try {
redisTemplate.opsForHash().putAll(key, map);
if (time > 0) {
expire(key, time);
}
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 向一张hash表中放入数据,如果不存在将创建
*
* @param key 键
* @param item 项
* @param value 值
* @return true 成功 false失败
*/
public boolean hset(String key, String item, Object value) {
try {
redisTemplate.opsForHash().put(key, item, value);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 向一张hash表中放入数据,如果不存在将创建
*
* @param key 键
* @param item 项
* @param value 值
* @param time 时间(秒) 注意:如果已存在的hash表有时间,这里将会替换原有的时间
* @return true 成功 false失败
*/
public boolean hset(String key, String item, Object value, long time) {
try {
redisTemplate.opsForHash().put(key, item, value);
if (time > 0) {
expire(key, time);
}
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 删除hash表中的值
*
* @param key 键 不能为null
* @param item 项 可以使多个 不能为null
*/
public void hdel(String key, Object... item) {
redisTemplate.opsForHash().delete(key, item);
}
/**
* 判断hash表中是否有该项的值
*
* @param key 键 不能为null
* @param item 项 不能为null
* @return true 存在 false不存在
*/
public boolean hHasKey(String key, String item) {
return redisTemplate.opsForHash().hasKey(key, item);
}
/**
* hash递增 如果不存在,就会创建一个 并把新增后的值返回
*
* @param key 键
* @param item 项
* @param by 要增加几(大于0)
*/
public double hincr(String key, String item, double by) {
return redisTemplate.opsForHash().increment(key, item, by);
}
/**
* hash递减
*
* @param key 键
* @param item 项
* @param by 要减少记(小于0)
*/
public double hdecr(String key, String item, double by) {
return redisTemplate.opsForHash().increment(key, item, -by);
}
// ============================set=============================
/**
* 根据key获取Set中的所有值
* @param key 键
*/
public Set<Object> sGet(String key) {
try {
return redisTemplate.opsForSet().members(key);
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
/**
* 根据value从一个set中查询,是否存在
*
* @param key 键
* @param value 值
* @return true 存在 false不存在
*/
public boolean sHasKey(String key, Object value) {
try {
return redisTemplate.opsForSet().isMember(key, value);
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 将数据放入set缓存
*
* @param key 键
* @param values 值 可以是多个
* @return 成功个数
*/
public long sSet(String key, Object... values) {
try {
return redisTemplate.opsForSet().add(key, values);
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}
/**
* 将set数据放入缓存
*
* @param key 键
* @param time 时间(秒)
* @param values 值 可以是多个
* @return 成功个数
*/
public long sSetAndTime(String key, long time, Object... values) {
try {
Long count = redisTemplate.opsForSet().add(key, values);
if (time > 0)
expire(key, time);
return count;
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}
/**
* 获取set缓存的长度
*
* @param key 键
*/
public long sGetSetSize(String key) {
try {
return redisTemplate.opsForSet().size(key);
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}
/**
* 移除值为value的
*
* @param key 键
* @param values 值 可以是多个
* @return 移除的个数
*/
public long setRemove(String key, Object... values) {
try {
Long count = redisTemplate.opsForSet().remove(key, values);
return count;
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}
// ===============================list=================================
/**
* 获取list缓存的内容
*
* @param key 键
* @param start 开始
* @param end 结束 0 到 -1代表所有值
*/
public List<Object> lGet(String key, long start, long end) {
try {
return redisTemplate.opsForList().range(key, start, end);
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
/**
* 获取list缓存的长度
*
* @param key 键
*/
public long lGetListSize(String key) {
try {
return redisTemplate.opsForList().size(key);
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}
/**
* 通过索引 获取list中的值
*
* @param key 键
* @param index 索引 index>=0时, 0 表头,1 第二个元素,依次类推;index<0时,-1,表尾,-2倒数第二个元素,依次类推
*/
public Object lGetIndex(String key, long index) {
try {
return redisTemplate.opsForList().index(key, index);
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
/**
* 将list放入缓存
*
* @param key 键
* @param value 值
*/
public boolean lSet(String key, Object value) {
try {
redisTemplate.opsForList().rightPush(key, value);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 将list放入缓存
* @param key 键
* @param value 值
* @param time 时间(秒)
*/
public boolean lSet(String key, Object value, long time) {
try {
redisTemplate.opsForList().rightPush(key, value);
if (time > 0)
expire(key, time);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 将list放入缓存
*
* @param key 键
* @param value 值
* @return
*/
public boolean lSet(String key, List<Object> value) {
try {
redisTemplate.opsForList().rightPushAll(key, value);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 将list放入缓存
*
* @param key 键
* @param value 值
* @param time 时间(秒)
* @return
*/
public boolean lSet(String key, List<Object> value, long time) {
try {
redisTemplate.opsForList().rightPushAll(key, value);
if (time > 0)
expire(key, time);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 根据索引修改list中的某条数据
*
* @param key 键
* @param index 索引
* @param value 值
* @return
*/
public boolean lUpdateIndex(String key, long index, Object value) {
try {
redisTemplate.opsForList().set(key, index, value);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 移除N个值为value
*
* @param key 键
* @param count 移除多少个
* @param value 值
* @return 移除的个数
*/
public long lRemove(String key, long count, Object value) {
try {
Long remove = redisTemplate.opsForList().remove(key, count, value);
return remove;
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}
}
八、redis持久化.RDB/.AOF
8.1 RDB
redis 是内存数据库,断电失去数据,需要持久化。
在指定的时间间隔内将内存中的数据集快照写入磁盘,也就是行话讲的Snapshot快照,它恢复时是将快照文件直接读到内存里。
Redis会单独创建( fork ) -个子进程来进行持久化,会先将数据写入到一一个临时文件中,待持久化过程都结束了, 再用这个临时文件替换.上次持久化好的文件。整个过程中,主进程是不进行任何I0操作的。这就确保了极高的性能。如果需要进行大规模数据的恢复,且对于数据恢复的完整性不是非常敏感,那RDB方式要比AOF方式更加的高效。RDB的缺点是最后一次持久化后的数据可能丢失。
默认的是RDB:dump.rdb
1.测试:修改配置文件
2.删掉dump.rdb rm -f dump.rdb
3.set k1 v1
set k2 v2
set k3 v3
set k4 v4
set k5 v5
4.可以看到有了一个 dump.rdb文件
5.redis 关机:shutdown
6.开机查看数据:get k1
7.数据恢复:将rdb文件放置redis目录下即可,启动会自动加载
优点:大规模的数据恢复,dump.rdb。对数据的完整性要求不高。
缺点:需要一定的时间来进行备份,如果脱机,最后一次修改的数据就没了。fork进行的时候,会占用内存空间。
8.2 AOF [Append Only File]
将所有的命令都记录下来,以日志的形式来记录每个写操作, 将Redis执行过的所有指令记录下来(读操作不记录) , 只许追加文件但不可以改写文件, redis启动之初会读取该文件重新构建数据,换言之, redis重启的话就根据日志文件的内容将写指令从前到后执行一-次以完成数据的恢复工作。
默认是不开启的,我们需要手动进行配置!我们只需要将appendonly改为yes就开启了aof !
重启, redis就可以生效了!
如果这个aof文件有错位,这时候redis 是启动不起来的吗,我们需要修复这个aof文件
redis给我们提供了一个工具[ redis-check-aof --fix
优点: 每一次修改都同步数据,文件完整性较好,每秒同步一次,可能会丢失1s一次
缺点: aof>rdb,修复速度较慢,运行效率也较慢
九、Redis发布-订阅
Redis发布订阅(pub/sub)是一-种消息通信模式:发送者(pub)发送消息。订阅者(sub)接收消息。
Redis客户端可以订阅任意数量的频道。
订阅/发布消息图:
1.Redis是使用C实现的,通过分析Redis源码里的pubsub.c文件,了解发布和订阅机制的底层实现,籍此加深对Redis的理解。
2.Redis通过PUBLISH、SUBSCRIBE和PSUBSCRIBE等命令实现发布和订阅功能。
3.通过SUBSCRIBE命令订阅某频道后, redis-server里维护了- -个字典,字典的键就是一一个个channel , 而字典的值则是一个链表,链表中保存了所有订阅这个channel的客户端。SUBSCRIBE 命令的关键,就是将客户端添加到给定channel的订阅链表中。
4.通过PUBLISH命令向订阅者发送消息, redis-server会使用给定的频道作为键,在它所维护的channel字典中查找记录了订阅这个频道的所有客户端的链表,遍历这个链表,将消息发布给所有订阅者。
5.Pub/Sub从字面上理解就是发布( Publish )与订阅( Subscribe) , 在Redis中,你可以设定对某- -个key值进行消息发布及消息订阅,当一个key值上进行了消息发布后,所有订阅它的客户端都会收到相应的消息。这一功能最明显的用法就是用作实时消息系统,比如普通的即时聊天,群聊等功能。
十、Redis集群
10.1 主从复制
概念
主从复制,是指将一台Redis服务 器的数据,复制到其他的Redis服务器。前者称为主节(master/eader) ,后者称为从节点(slave/follower ;数据的复制是单向的,只能由主节点到从节点。Master以写为主 . Slave 以读为主。
默认情况下.每台Redis服务器都是主节点;且一个主节点可以有多个从节点(或没有从节点) ,但- -个从节点只能有一个主节点。
主从复制的作用主要包括:
1.数据冗余:主从复制实现了数据的热备份,是持久化Z外的一种数据冗余方式。
2.故障恢复:当主节点出现问题时,可以由从节点提供服务,实现快速的故障恢复;实际上是-种服务的冗余。
3、负载均衡:在主从复制的基础上,配合读写分离,可以由主节点提供写服务.由从节点提供读服务(即写Redis数据时应用连接主节点,读Redis数据时应用连接从节点) . 分担服务器负载;尤其是在写少读多的场景下,通过多个从节点分担读负载,可以大大提高Redis服务器的并发量。
4、高可用基石:除了上述作用以外,主从复制还是哨兵和集群能够实施的基础,因此说主从复制是Redis高可用的基础。
一般来说 。要将Redis运用于工程项目中,只使用一台Redis是万万不能的,原因如下:
1.从结构上,单个Redis服务器会发生单点故障, 并且一台服务器需要处理所有的请求负载,压力较大;
2.从容量上,单个Redis服务器内存容量有限,就算一台Redis服务器内存容量为256G ,也不能将所有内存用作Redis存储内存,
一般来说 。单台Redis最大使用内存不应该超过20G。
低配:1主2从
10.2 单机伪分布式集群搭建
只配置从库,默认为主库。
info replication -->master
1. 将上面的redis.conf 复制多份
cp redis . conf redis79. conf
cp redis . conf redis80. conf
cp redis . conf redis81. conf
2. 将所有配置文件端口修改
port 6379
port 6380
port 6381
3. 修改所有配置文件的
dbfilename dump6379.rdb
dbfilename dump6380.rdb
dbfilename dump6381.rdb
4. 修改所有配置文件的后台运行pid
pidfile /var/run/redis_6379.pid
pidfile /var/run/redis_6380.pid
pidfile /var/run/redis_6381.pid
5. 修改所有的日子文件名称
logfile "6379.log"
logfile "6380.log"
logfile "6381.log"
6. 启动所有服务测试
redis-server /usr/local/bin/redisconf/redis79.conf
redis-server /usr/local/bin/redisconf/redis80.conf
redis-server /usr/local/bin/redisconf/redis81.conf
10.3 集群配置主从复制
- 默认三台都是主机master: 6379【主机】 6380【主机】 6381【主机】
- info replication 查看信息
- 配置从机6380:【从机】
SLAVEOF 127.0.0.1 6379 - 配置从机6381:【从机】
SLAVEOF 127.0.0.1 6379
以上1主2从 配置为暂时性的,需要永久配置区配置文件。
主机负责写,从机负责读
5. 以上没有哨兵,主机宕机后,从机没有上位
从机需要手动上位:
SLAVEOF on noe
Slave启动成功连接到master后会发送一个sync命令
Master接到命令,启动后台的存盘进程,同时收集所有接收至的用于修改数据集命令,在后台进程执行完 毕之后, master将传送整个数据文件到slave ,并完成一次完全同步。
全量复制:而slave服务在接收到数据库文件数据后,将其存盘并加载到内存中。
增量复制: Master继续将新的所有收集到的修改命令依次传给slave ,完成同步
但是只要是重新连接master , -次完全同步(全量复制)将被自动执行
10.4 哨兵模式【自动上位】
主从切换技术的方法是:当主服务器宕机后,需要手动把一台从服务器切换为主服务器,这就需要人工干预,费事费力,还会造成-段时间内服务不可用。这不是一种推荐的方式 ,更多时候,我们优先考虑哨兵模式。Redis从2.8开始正式提供 了Sentinel (哨兵)架构来解决这个问题。
谋朝篡位的自动版,能够后台监控主机是否故障。如果故障了根据投票数自动将从库转换为主库。
哨兵模式是一种特殊的模式,首先Redis提供了哨兵的命令,哨兵是一个独立的进程,作为进程,它会独立运行。其原理是哨兵通过发送命令,等待Redis服务器响应,从而监控运行的多个Redis实例。
这里的哨兵有两个作用
●通过发送命令,让Redis服务器返回监控其运行状态,包括主服务器和从服务器。
●当哨兵监测到master宕机,会自动将slavet切换成master ,然后通过发布订阅模式通知其他的从服务器,修改配置文件,让它们切换主机。然而一个哨兵进程对Redis服务器进行监控,可能会出现问题,为此,我们可以使用多个哨兵进行监控。各个哨兵之间还会进行监控,这样就形成了多哨兵模式。
假设主服务器宕机,哨兵1先检测到这个结果,系统并不会马上进行failover过程,仅仅是哨兵1主观的认为主服务器不可用,这个现象成为主观下线。当后面的哨兵也检测到主服务器不可用,并且数量达到一定值时,那么哨兵之间就会进行一次投票 ,投票的结果由一个哨兵发起,进行failover[故障转移]操作。切换成功后,就会通过发布订阅模式,让各个哨兵把自己监控的从服务器实现切换主机,这个过程称为客观下线。
10.4.1 配置哨兵
1.vim sentinel.conf
sentinel monitor myredis 127.0.0.1 6379 1
2.启动哨兵
redis-sentinel /usr/local/bin/redisconf/sentinel.conf
主机宕机之后,会选举一台机器作为主机。
优点:
1、哨兵集群,基于主从复制模式,所有的主从配置优点,它全有
2、主从可以切换,故障可以转移,系统的可用性就会更好
3、哨兵模式就是主从模式的升级,手动到自动,更加健壮!
缺点:
1、 Redis不好啊在线扩容的,集群容量一旦到达上限,在线扩容就十分麻烦!
2、实现哨兵模式的配置其实是很麻烦的,里面有很多选择!
十一、Redis缓存穿透和雪崩
Redis缓存的使用,极大的提升了应用程序的性能和效率,特别是数据查询方面。但同时,它也带来了一些问题。其中,最要 害的问题,就是数据的一致性问题,从严格意义上讲,这个问题无解。如果对数据的一致性要求很高,那么就不能使用缓存。
另外的一些典型问题就是,缓存穿透、缓存雪崩和缓存击穿。目前,业界也都有比较流行的解决方案。
11.1 缓存穿透
缓存穿透的概念很简单,用户想要查询一个数据,发现redis内存数据库没有,也就是缓存没有命中.于是向持久层数据库查询。发现也没有于是本次查询失败。 当用户很多的时候,缓存都没有命中 (秒杀! ) ,于是都去请求 了持久层数据库。这会给持久层数据库造成很大的压力,这时候就相当于出现了缓存穿透。
解决方案1:布隆过滤器
redis数据库中保存null值,对于查询的没有的数据保存null。查询mysql后保存回来。
解决方案2:布隆过滤器
布隆过滤器是一种数据结构 ,对所有可能查询的参数以hash形式存储,在控制层先进行校验,不符合则丢弃,从而避免了对底层存储系统的查询压力;
但是这种方法会存在两个问题:
1、如果空值能够被缓存起来,这就意味着缓存需要更多的空间存储更多的键,因为这当中可能会有很多的空值的键;
2、即使对空值设置了过期时间,还是会存在缓存层和存储层的数据会有一-段时间窗口的不一 致,这对于需要保持一致性的业务会 :有影响。
11.2 缓存击穿
这里需要注意和缓存击穿的区别, 缓存击穿,是指一个key非常热点,在不停的扛着大并发,大并发集中对这一-个点进行访问,当这个key在失效的瞬间,持续的大并发就穿破缓存,直接请求数据库,就像在一个屏障上凿开了一一个洞。
当某个key在过期的瞬间,有大量的请求并发访问,这类数据一般是热点数据,由于缓存过期,会同时访问数据库来查询最新数据,并且回写缓存,会导使数据库瞬间压力过大。
解决方案:
1.设置热点数据永不过期
从缓存层面来看,没有设置过期时间,所以不会出现热点key过期后产生的问题。
2.加互斥锁
分布式锁:使用分布式锁,保证对于每个key同时只有一个线程去查询后端服务 ,其他线程没有获得分布式锁的权限,因此只需要等待即可。这种方式将高并发的压力转移到了分布式锁,因此对分布式锁的考验很大。
11.3 缓存雪崩
缓存雪崩,是指在某一个时间段,缓存集中过期失效。Redis 宕机!
产生雪崩的原因之一, 比如在写本文的时候,马上就要到双十二零点,很快就会迎来一波抢购,这波商品时间比较集中的放入了缓存,假设缓存一一个小时。那么到了凌晨一 -点钟的时候,这批商品的缓存就都过期了。而对这批商品的访问查询,都落到了数据库上,对于数据库而言,就会产生周期性的压力波峰。于是所有的请求都会达到存储层,存储层的调用量会暴增,造成存储层也会挂掉的情况。
其实集中过期,倒不是非常致命,比较致命的缓存雪崩,是缓存服务器某个节点宕机或断网。因为自然形成的缓存雪崩, - 定是在某个时间段集中创建缓存,这个时候,数据库也是可以顶住压力的。无非就是对数据库产生周期性的压力而已。而缓存服务节点的宕机,对数据库服务器造成的压力是不可预知的,很有可能瞬间就把数据库压垮。
解决方案:
redis高可用
这个思想的含义是,既然redis有可能挂掉,那我多增设几台redis ,这样一台挂掉之 后其他的还可以继续工作,其实就是搭建的集群。(异地多活!)
限流降级
这个解决方案的思想是,在缓存失效后,通过加锁或者队列来控制读数据库写缓存的线程数量。比如对某个key只允许一个线程查询数据和写缓存,其他线程等待。
数据预热
数据加热的含义就是在正式部署之前,我先把可能的数据先预先访问一遍,这样部分可能大量访问的数据就会加载到缓存中。在即将发生大并发访问前手动触发加载缓存不同的key ,设置不同的过期时间,让缓存失效的时间点尽量均匀。
评论