当前位置 : 首页 » 文章分类 :  开发  »  Redis-基础

Redis-基础

Redis 相关笔记

redis官方命令手册
https://redis.io/commands

redis官方文档
https://redis.io/documentation

Redis 命令参考
http://doc.redisfans.com/index.html

《Redis 设计与实现》(第一版)
https://redisbook.readthedocs.io/en/latest/index.html

Redis中文官网 - redis文档中心
http://www.redis.cn/documentation.html

redis中文网 - redis教程
http://www.redis.net.cn/tutorial/3501.html

《今天面试了吗》-Redis
https://juejin.im/post/5dccf260f265da0bf66b626d


概述

Redis是一个完全开源免费(遵守BSD协议)的高性能key-value内存数据库,可以用作数据库、缓存和消息中间件。

redis是一个高性能的key-value非关系数据库,它可以存键(key)与5种不同类型的值(value)之间的映射(mapping),支持存储的value类型包括: String(字符串)、list(链表)、set(集合)、zset(有序集合)和hash(哈希)

Redis 与其他 key - value 缓存产品有以下三个特点:

  • Redis支持数据的持久化,可以将内存中的数据保存在磁盘中,重启的时候可以再次加载进行使用。
  • Redis不仅仅支持简单的key-value类型的数据,同时还提供list,set,zset,hash等数据结构的存储。
  • Redis支持数据的备份,即master-slave模式的数据备份。

Redis 优势

  • 性能极高 – Redis能读的速度是110000次/s,写的速度是81000次/s 。
  • 丰富的数据类型 – Redis支持二进制案例的 Strings, Lists, Hashes, Sets 及 Ordered Sets 数据类型操作。
  • 原子 – Redis的所有操作都是原子性的,同时Redis还支持对几个操作全并后的原子性执行。
  • 丰富的特性 – Redis还支持 publish/subscribe, 通知, key 过期等等特性。

这么说吧,redis配置文件中的每条命令,redis的每条操作指令,都可以衍生出一系列问题:
1、这个配置项/指令是干什么的?
2、内部实现机制是什么?
3、有哪些可选的参数?
4、分别是什么意思?
5、不同配置参数的优劣?在什么情况下使用?

随笔分类 - redis
http://www.cnblogs.com/xiaoxi/category/961351.html

redis面试总结
https://www.cnblogs.com/jiahaoJAVA/p/6244278.html


Redis为什么快?

官方提供的数据可以达到100000+的QPS(每秒内的查询次数)

Redis 快的主要原因是:
1、纯内存 I/O,相较于其他基于磁盘的 DB,Redis 的纯内存操作有着天然的性能优势。
2、数据结构简单,多数是 KV 操作,对数据操作也简单
3、I/O 多路复用,基于 epoll/select/kqueue 等 I/O 多路复用技术,实现高吞吐的网络 I/O。
4、单线程模型,单线程无法利用多核,但是从另一个层面来说则避免了不必要的上下文切换和竞争条件,不存在多线程导致的CPU切换,不用去考虑各种锁的问题,不存在加锁释放锁操作,没有死锁问题导致的性能消耗。


单进程单线程

Redis 采用的是基于内存的采用的是单进程单线程模型的 KV 数据库,由 C 语言编写。官方提供的数据是可以达到 100000+ 的 qps. 这个数据不比采用单进程多线程的同样基于内存的 KV 数据库 Memcached 差。

多路复用io模型epoll

多路 I/O 复用模型是利用select、poll、epoll可以同时监察多个流的 I/O 事件的能力,在空闲的时候,会把当前线程阻塞掉,当有一个或多个流有I/O事件时,就从阻塞态中唤醒,于是程序就会轮询一遍所有的流(epoll是只轮询那些真正发出了事件的流),并且只依次顺序的处理就绪的流,这种做法就避免了大量的无用操作。这里“多路”指的是多个网络连接,“复用”指的是复用同一个线程。采用多路 I/O 复用技术可以让单个线程高效的处理多个连接请求(尽量减少网络IO的时间消耗),且Redis在内存中操作数据的速度非常快(内存内的操作不会成为这里的性能瓶颈),主要以上两点造就了Redis具有很高的吞吐量。

和Memcached不同,Redis并没有直接使用Libevent,而是自己完成了一个非常轻量级的对select、epoll、evport、kqueue这些通用的接口的实现。在不同的系统调用选用适合的接口,linux下默认是epoll。因为Libevent比较重更通用代码量也就很庞大,拥有很多Redis用不上的功能,Redis为了追求“轻巧”并且去除依赖,就选择自己去封装了一套。

Redis为什么是单线程的?

因为CPU不是Redis的瓶颈,Redis的瓶颈最有可能是机器内存或者网络带宽,既然单线程容易实现,而且CPU不会成为瓶颈,那就顺便成章的采用单线程的方案

如果万一CPU成为你的Redis瓶颈了,或者,你就是不想让服务器其他核闲置,那怎么办?
那也很简单,你多起几个Redis进程就好了。Redis是keyvalue数据库,又不是关系数据库,数据之间没有约束。只要客户端分清哪些key放在哪个Redis进程上就可以了。redis-cluster可以帮你做的更好

redis进程里只有一个线程吗?

redis 的单线程指的是 核心网络模型是单线程的,并不是说整个 redis 进程中只有一个线程。
在 Redis 的 v6.0 版本正式引入多线程之前,其网络模型一直是单线程模式的;
Redis 在 v4.0 版本的时候就已经引入了的多线程来做一些异步操作,此举主要针对的是那些非常耗时的命令,通过将这些命令的执行进行异步化,避免阻塞单线程的事件循环。


Redis 6.0引入多线程网络模型

Redis 多线程网络模型全面揭秘
https://segmentfault.com/a/1190000039223696


Redis应用

lua+redis令牌桶限流算法

Redis与Lua脚本

Redis分布式锁

set k v nx px实现普通Redis分布式锁(单redis实例分布式锁)

通过redis命令 set key value px milliseconds nx 实现简单的单机分布式锁
核心命令:

- 获取锁(unique_value可以是UUID等)
SET resource_name unique_value NX PX 30000

- 释放锁(lua脚本中,一定要比较value,防止误解锁)
if redis.call("get",KEYS[1]) == ARGV[1] then
    return redis.call("del",KEYS[1])
else
    return 0
end

这种实现方式有3大要点(也是面试概率非常高的地方):
1、set 命令要用 set key value px milliseconds nx 保证原子性
2、value 要具有唯一性,可以取 host+threadId
3、释放锁时要验证value值,不能误解锁;

事实上这类琐最大的缺点就是它加锁时只作用在一个Redis节点上,即使Redis通过sentinel保证高可用,如果这个master节点由于某些原因发生了主从切换,那么就会出现锁丢失的情况:
1、在Redis的master节点上拿到了锁;
2、但是这个加锁的key还没有同步到slave节点;
3、master故障,发生故障转移,slave节点升级为master节点;
4、导致锁丢失。

redis官方文档里最后也给出了使用 set 做分布式锁的使用建议
https://redis.io/commands/set

redis分布式锁实例(Java)

@Service
@Slf4j
public class ReentrantRedisLock {
    private static final String LOCK_PREFIX = "dlock";
    private static final String LOCK_SUFFIX = "app-name";

    @Value("${spring.profiles.active}")
    private String env;

    @Autowired
    private StringRedisTemplate redisTemplate;

    // 加锁
    public boolean tryLock(String key, final long timeout, final TimeUnit unit) {
        String sLockKey = getLockKey(key);
        return tryAcquire(sLockKey, timeout, unit);
    }

    // 解锁
    public void unlock(String key) {
        String sLockValue = redisTemplate.opsForValue().get(getLockKey(key));
        // 解锁时必须比较锁内容
        if (sLockValue != null && getLockValue().equalsIgnoreCase(sLockValue)) {
            redisTemplate.delete(getLockKey(key));
        }
    }

    private boolean tryAcquire(String sLockKey, final long timeout, final TimeUnit unit) {
        String result = redisTemplate.execute((RedisCallback<String>) connection -> {
            JedisCommands commands = (JedisCommands) connection.getNativeConnection();
            SetParams setParams = new SetParams();
            setParams.nx();
            // 加锁一定要带有效期
            setParams.px(TimeoutUtils.toMillis(timeout, unit));
            return commands.set(sLockKey, getLockValue(), setParams);
        });
        if (StringUtils.isNotEmpty(result)) {
            log.info("Get lock for key: {},timeout:{}s", sLockKey, TimeoutUtils.toSeconds(timeout, unit));
            return true;
        }
        return false;
    }

    // 锁key
    private String getLockKey(String key) {
        return LOCK_PREFIX + "_" + this.env + "_" + LOCK_SUFFIX + "_" + key;
    }

    // 锁的内容是加锁线程的标识,host+线程id
    private String getLockValue() {
        return resolveHostName() + "_" + Thread.currentThread().getId();
    }

    private String resolveHostName() {
        try {
            return InetAddress.getLocalHost().getHostName();
        } catch (Exception e) {
            return LocalIpAddressUtil.resolveLocalIp();
        }
    }
}

Redlock 跨redis实例分布式锁

在Redis的分布式环境中,我们假设有N个Redis master。这些节点完全互相独立,或者说有N个互相隔离的reids单例/redis集群(注意不是一个redis集群里有N个结点),不存在主从复制或者其他集群协调机制。
我们确保将在N个实例上使用与在Redis单实例下相同方法获取和释放锁。现在我们假设有5个Redis master节点,同时我们需要在5台服务器上面运行这些Redis实例,这样保证他们不会同时都宕掉。

为了取到锁,客户端应该执行以下操作:
1、获取当前Unix时间,以毫秒为单位。
2、依次尝试从5个实例,使用相同的key和具有唯一性的value(例如UUID)获取锁。当向Redis请求获取锁时,客户端应该设置一个网络连接和响应超时时间,这个超时时间应该小于锁的失效时间。例如你的锁自动失效时间为10秒,则超时时间应该在5-50毫秒之间。这样可以避免服务器端Redis已经挂掉的情况下,客户端还在死死地等待响应结果。如果服务器端没有在规定时间内响应,客户端应该尽快尝试去另外一个Redis实例请求获取锁。
3、客户端使用当前时间减去开始获取锁时间(步骤1记录的时间)就得到获取锁使用的时间。当且仅当从大多数(N/2+1,这里是3个节点)的Redis节点都取到锁,并且使用的时间小于锁失效时间时,锁才算获取成功。
4、如果取到了锁,key的真正有效时间等于有效时间减去获取锁所使用的时间(步骤3计算的结果)。
5、如果因为某些原因,获取锁失败(没有在至少N/2+1个Redis实例取到锁或者取锁时间已经超过了有效时间),客户端应该在所有的Redis实例上进行解锁(即便某些Redis实例根本就没有加锁成功,防止某些节点获取到锁但是客户端没有得到响应而导致接下来的一段时间不能被重新获取锁)。

Redlock:Redis分布式锁最牛逼的实现
https://www.jianshu.com/p/7e47a4503b87

Distributed locks with Redis
https://redis.io/topics/distlock

基于 Redis 的分布式锁 Redlock
https://zhuanlan.zhihu.com/p/40915772

服务器出现时钟回拨时会有什么问题?


redis分布式锁对比zookeeper分布式锁

没有绝对的好坏,只有更适合自己的业务。
就性能而言,redis很明显优于zookeeper;
就分布式锁实现的健壮性而言,zookeeper很明显优于redis。
如何选择,取决于你的业务!


分布式锁key设计错误导致没锁住案例排查

有个关系表,比如

create table student_school_table
(
    student_id bigint null,
    school_id bigint null
);

业务要求是一个学生只能有唯一的一所学校,即 student_school_table 表中 每个 student_id 最多对应一个 school_id,但表上忘了给 student_id 加唯一索引。

后端代码中,更新 student_school_table 表时,redis 分布式锁的key设计为 keyprefix-student_id-school_id, 这么设计是不对的。
出错案例如下:
并发有两个更新 student_id=1 的学校的请求同时打到2台服务器server1, server2上,server1上的请求是把 学生1的学校id更新为100, server2上的请求是把 学生1的学校id更新为200。
两台服务器上会分别请求加分布式锁,server1请求加锁 keyprefix-1-100, server2请求加锁 keyprefix-1-200, 都能加锁成功,从而往表中写入 student_id=1 的两条学校记录。

解决方案:
分布式key设计为 keyprefix-student_id 即可,即对同一个学生的学校修改要改为串行的。


Redisson

Redisson 是架设在 Redis 基础上的一个 Java 驻内存数据网格(In-Memory Data Grid)。
充分的利用了Redis键值数据库提供的一系列优势,基于Java实用工具包中常用接口,为使用者提供了一系列具有分布式特性的常用工具类。使得原本作为协调单机多线程并发程序的工具包获得了协调分布式多机多线程并发系统的能力,大大降低了设计和研发大规模分布式系统的难度。同时结合各富特色的分布式服务,更进一步简化了分布式环境中程序相互之间的协作。

Redisson项目介绍
https://github.com/redisson/redisson/wiki

redisson/redisson
https://github.com/redisson/redisson

Redisson实现redis分布式锁

Redisson实现Redis分布式锁的N种姿势
https://www.jianshu.com/p/f302aa345ca8


Jedis

Jedis模糊匹配keys

Jedis Redis 模糊匹配 取得 key 列表

往Redis中初始化几条测试数据:

china:beijing
china:shandong:heze
china:shandong:jinan

Jedis jedis = new Jedis("10.110.20.152", 6379);
Set<String> set = jedis.keys("china:shandong*");
for (String key : set) {
    System.out.println(key);
}

* 0到任意多个字符
? 1个字符


Lettuce

ConnectionWatchdog Reconnecting, last destination was

spring boot 2.x 使用 lettuce 客户端连接 redis 集群,启动后一致打下面的日志
io.lettuce.core.protocol.ConnectionWatchdog - Reconnecting, last destination was ***

这是 lettuce 客户端的重连机制,一段时间没有使用 redis 的话会断开连接,然后 lettuce 会自动重连。不是错误。

why lettuce client keep reconnecting #861
https://github.com/lettuce-io/lettuce-core/issues/861


jedis/lettuce/redisson 对比

Jedis
老牌的Java实现客户端,提供了比较全面的Redis命令的支持,
使用阻塞的I/O,且其方法调用都是同步的,程序流需要等到sockets处理完I/O才能执行,不支持异步。Jedis客户端实例不是线程安全的,所以需要通过连接池来使用Jedis。

Redisson
实现了分布式和可扩展的Java数据结构。
促使使用者对Redis的关注分离,提供很多分布式相关操作服务,例如分布式锁,分布式集合,可通过Redis支持延迟队列
基于Netty框架的事件驱动的通信层,其方法调用是异步的。Redisson的API是线程安全的,所以可以操作单个Redisson连接来完成各种操作

Lettuce
高级Redis客户端,用于线程安全同步,异步和响应使用,支持集群,Sentinel,管道和编码器。
主要在一些分布式缓存框架上使用比较多
基于Netty框架的事件驱动的通信层,其方法调用是异步的。Lettuce的API是线程安全的,所以可以操作单个Lettuce连接来完成各种操作


redis管道(pipeline)

Redis的pipeline(管道)功能在命令行中没有,但redis是支持pipeline的,而且在各个语言版的client中都有相应的实现。

Pipeline在某些场景下非常有用,比如有多个command需要被“及时的”提交,而且他们对相应结果没有互相依赖,对结果响应也无需立即获得,那么pipeline就可以充当这种“批处理”的工具;而且在一定程度上,可以较大的提升性能,性能提升的原因主要是TCP连接中减少了“交互往返”的时间。

不过在编码时请注意,pipeline期间将“独占”链接,此期间将不能进行非“管道”类型的其他操作,直到pipeline关闭;如果你的pipeline的指令集很庞大,为了不干扰链接中的其他操作,你可以为pipeline操作新建Client链接,让pipeline和其他正常操作分离在2个client中。

管道(pipeline)可以一次性发送多条命令并在执行完后一次性将结果返回,pipeline通过减少客户端与redis的通信次数来实现降低往返延时时间,而且Pipeline 实现的原理是队列,而队列的原理是时先进先出,这样就保证数据的顺序性。 Pipeline 的默认的同步的个数为53个,也就是说arges中累加到53条数据时会把数据提交。

需要注意到是用 pipeline方式打包命令发送,redis必须在处理完所有命令前先缓存起所有命令的处理结果。打包的命令越多,缓存消耗内存也越多。所以并不是打包的命令越多越好。具体多少合适需要根据具体情况测试。

适用场景
有些系统可能对可靠性要求很高,每次操作都需要立马知道这次操作是否成功,是否数据已经写进redis了,那这种场景就不适合。

还有的系统,可能是批量的将数据写入redis,允许一定比例的写入失败,那么这种场景就可以使用了,比如10000条一下进入redis,可能失败了2条无所谓,后期有补偿机制就行了

分布式缓存Redis之Pipeline(管道)
https://blog.csdn.net/u011489043/article/details/78769428


直接通过Jedis使用管道

不集成 spring,直接利用 jedis 客户端直接操作

public void test3Pipelined() {
    Jedis jedis = new Jedis("localhost");
    Pipeline pipeline = jedis.pipelined();
    long start = System.currentTimeMillis();
    for (int i = 0; i < 100000; i++) {
        pipeline.set("p" + i, "p" + i);
    }
    List<Object> results = pipeline.syncAndReturnAll();
    long end = System.currentTimeMillis();
    System.out.println("Pipelined SET: " + ((end - start)/1000.0) + " seconds");
    jedis.disconnect();
}

Redis的Java客户端Jedis的八种调用方式(事务、管道、分布式…)介绍
http://www.blogways.net/blog/2013/06/02/jedis-demo.html


通过 Spring RedisTemplate 使用管道

List<Object> results = this.getRedisTemplate().executePipelined(new RedisCallback<Object>() {
    @Override
    public Object doInRedis(RedisConnection connection) throws DataAccessException {
        RedisSerializer<String> keySerializer=new StringRedisSerializer();
        for (String key : keys) {
            connection.get(keySerializer.serialize(key));
        }
        return null;
    }
});

在doInRedis方法中实现需要的redis操作
doInRedis中的redis操作不会立刻执行
所有redis操作会在connection.closePipeline()之后一并提交到redis并执行,这是pipeline方式的优势
所有操作的执行结果为executePipelined()的返回值

redis pipeline简介
https://www.jianshu.com/p/a8e33e058518

我们的封装方式:

// 封装stringRedisTemplate的管道方法
public List<Object> executePipelined(RedisCallback<?> action){
    return stringRedisTemplate.executePipelined(action);
}

// 使用管道实现带过期时间的mset
public void multiSet(Map<String, String> map, long milliSeconds) {
    checkState(map != null, "Required not-null param 'map'");
    checkState(milliSeconds > 0, "Required param 'seconds' must > 0");

    executePipelined(redisConnection -> {
        RedisSerializer<String> serializer = new StringRedisSerializer();
        map.forEach((k, v) -> {
            redisConnection.set(serializer.serialize(k), serializer.serialize(v));
            redisConnection.pExpire(serializer.serialize(k), milliSeconds);
        });
        return null;
    });
}

redis集群模式下使用pipeline

为什么RedisCluster无法使用pipeline?

Redis 集群的键空间被分割为 16384 个槽(slot),集群的最大节点数量也是 16384 个。每个主节点都负责处理 16384 个哈希槽的其中一部分。
具体的redis命令,会根据key计算出一个槽位(slot),然后根据槽位去特定的节点redis上执行操作。

一次 pipeline 会批量执行多个命令,那么每个命令都需要根据“key”运算一个槽位(JedisClusterCRC16.getSlot(key)),然后根据槽位去特定的机器执行命令,也就是说一次 pipeline 操作会使用多个节点的 redis 连接,而目前 JedisCluster 是无法支持的。

基于JedisCluster扩展pipeline?

设计思路
1、首先要根据 key 计算出此次 pipeline 会使用到的节点对应的连接(也就是 jedis 对象,通常每个节点对应一个Pool)。
2、相同槽位的 key ,使用同一个 jedis.pipeline 去执行命令。
3、合并此次 pipeline 所有的 response 返回。
4、连接释放返回到池中。

也就是将一个 JedisCluster 下的 pipeline 分解为每个单节点下独立的 jedisPipeline 操作,最后合并 response 返回。具体实现就是通过 JedisClusterCRC16.getSlot(key) 计算 key 的 slot 值,通过每个节点的 slot 分布,就知道了哪些 key 应该在哪些节点上。再获取这个节点的 JedisPool 就可以使用 pipeline 进行读写了。

一种简单实现Redis集群Pipeline功能的方法及性能测试
https://www.cnblogs.com/xiaodf/p/11002184.html

redis集群客户端JedisCluster优化 - 管道(pipeline)模式支持
https://blog.csdn.net/youaremoon/article/details/51751991

redis-cluster集群模式下使用pipeline,mget,mset批量操作
https://my.oschina.net/u/1266221/blog/894308

redis-cluster官方集群模式下使用pipeline批量操作
https://blog.csdn.net/kevin_pso/article/details/53945053


redis pipeline不保证原子性

pipeline 只是批量操作,但不保证多个操作的原子性


redis pipeline与lua脚本对比

pipeline 只是批量操作,但不保证多个操作的原子性
lua 脚本能保证多个操作的原子性


Redis和数据库不一致问题

如果要“保证”数据的安全性,那么会带来开销的进一步提升,以至于使用redis带来的性能优势都会丧失。正确的做法是区分不同的业务,使得并不需要“保证”数据一致性的场合,可以使用redis优化。而敏感的场合依然使用mysql。

数据库和缓存之间一般不需要强一致性。
一般缓存是这样的:
读的顺序是先读缓存,后读数据库
写的顺序是先写数据库,然后写缓存
每次更新了相关的数据,都要把该缓存清理掉
为了避免极端条件下造成的缓存与数据库之间的数据不一致,缓存需要设置一个失效时间。时间到了,缓存自动被清理,达到缓存和数据库数据的“最终一致性”

为保证redis和数据库的数据一致性,写入策略应该是:
先使redis key失效(删除key),再写入数据库,等redis查询时自动去数据库更新。也就是缓存只做失效,不做更新。

数据库与缓存双写情况下导致数据不一致问题
场景一
当更新数据时,如更新某商品的库存,当前商品的库存是100,现在要更新为99,先更新数据库更改成99,然后删除缓存,发现删除缓存失败了,这意味着数据库存的是99,而缓存是100,这导致数据库和缓存不一致。

场景一解决方案
这种情况应该是先删除缓存,然后在更新数据库,如果删除缓存失败,那就不要更新数据库,如果说删除缓存成功,而更新数据库失败,那查询的时候只是从数据库里查了旧的数据而已,这样就能保持数据库与缓存的一致性。

场景二
在高并发的情况下,如果当删除完缓存的时候,这时去更新数据库,但还没有更新完,另外一个请求来查询数据,发现缓存里没有,就去数据库里查,还是以上面商品库存为例,如果数据库中产品的库存是100,那么查询到的库存是100,然后插入缓存,插入完缓存后,原来那个更新数据库的线程把数据库更新为了99,导致数据库与缓存不一致的情况

场景二解决方案
遇到这种情况,可以用队列的去解决这个问,创建几个队列,如20个,根据商品的ID去做hash值,然后对队列个数取摸,当有数据更新请求时,先把它丢到队列里去,当更新完后在从队列里去除,如果在更新的过程中,遇到以上场景,先去缓存里看下有没有数据,如果没有,可以先去队列里看是否有相同商品ID在做更新,如果有也把查询的请求发送到队列里去,然后同步等待缓存更新完成。

redis系列之数据库与缓存数据一致性解决方案
http://blog.csdn.net/simba_1986/article/details/77823309

mysql binlog

Mysql的binlog日志作用是用来记录mysql内部增删改查等对mysql数据库有更新的内容的记录(对数据库的改动),对数据库的查询select或show等不会被binlog日志记录;主要用于数据库的主从复制以及增量恢复。

使用阿里的同步工具canal,canal实现方式是模拟mysql slave和master的同步机制,监控DB bitlog的日志更新来触发缓存的更新,此种方法可以解放程序员双手,减少工作量,但在使用时有些局限性。


其他特性

redis事务

redis的事务中,一次执行多条命令,本质是一组命令的集合,一个事务中所有的命令将被序列化,即按顺序执行而不会被其他命令插入

在redis中,事务的作用就是在一个队列中一次性、顺序性、排他性的执行一系列的命令。

常用的关于事务的命令有:

  1. MULTI:使用该命令,标记一个事务块的开始,通常在执行之后会回复OK,(但不一定真的OK),这个时候用户可以输入多个操作来代替逐条操作,redis会将这些操作放入队列中。
  2. EXEC:执行这个事务内的所有命令
  3. DISCARD:放弃事务,即该事务内的所有命令都将取消
  4. WATCH:监控一个或者多个key,如果这些key在提交事务(EXEC)之前被其他用户修改过,那么事务将执行失败,需要重新获取最新数据重头操作(类似于乐观锁)。
  5. UNWATCH:取消WATCH命令对多有key的监控,所有监控锁将会被取消。

redis事务不支持回滚

如下,事务中某条命令执行时错误:

127.0.0.1:6379> MULTI
OK
127.0.0.1:6379> SET key 1
QUEUED
127.0.0.1:6379> SADD key 2  //用集合命令sadd操作string类型key出现错误
QUEUED
127.0.0.1:6379> SET key 3  //错误之后的命令依然会继续执。
QUEUED
127.0.0.1:6379> EXEC
1) OK
2) (error) WRONGTYPE Operation against a key holding the wrong kind of value
3) OK
127.0.0.1:6379> GET key //键值被错误命令之后的命令修改
"3"

但是提交事务的时候会发现,这条错误命令确实没有执行,但是其他正确的命令却执行,这是为什么的?
原因是在redis中,对于一个存在问题的命令,如果在入队的时候就已经出错,整个事务内的命令将都不会被执行(其后续的命令依然可以入队),如果这个错误命令在入队的时候并没有报错,而是在执行的时候出错了,那么redis默认跳过这个命令执行后续命令。
也就是说,redis只实现了部分事务。不支持回滚。

为什么redis事务不支持回滚?

如果你有使用关系式数据库的经验, 那么 “Redis 在事务失败时不进行回滚,而是继续执行余下的命令”这种做法可能会让你觉得有点奇怪。

以下是这种做法的优点:
Redis 命令只会因为错误的语法而失败(并且这些问题不能在入队时发现),或是命令用在了错误类型的键上面:这也就是说,从实用性的角度来说,失败的命令是由编程错误造成的,而这些错误应该在开发的过程中被发现,而不应该出现在生产环境中。
因为不需要对回滚进行支持,所以 Redis 的内部可以保持简单且快速。

为什么 Redis 不支持回滚(roll back)
http://doc.redisfans.com/topic/transaction.html#redis-roll-back

用watch命令实现乐观锁

redis的锁CAS(check and set)类似于乐观锁,redis的实现原理是使用watch进行监视一个(或多个)数据,如果在事务提交之前数据发生了变化(估计使用了类似于乐观锁的标记),那么整个事务将提交失败。

我们可以举一个例子,我们开启两个终端,模拟两个人的操作,设置一条数据为count,初始时100,现在A对其进行监控,并且为count增加20。在没有提交之前,B也获取了这个count,为其减少50,那么这个时候A如果提交事务,会出现失败提示。可以看到,在A对数据的修改过程中,B对数据进行了修改,那么这条数据的“标记”就发生了变化,已经不是当初A取出数据的时候的标记了,这样,A的事务也就提交失败了。

Redis入门之浅谈redis事务
https://blog.csdn.net/candy_rainbow/article/details/52810440

Redis的事务功能详解
https://www.cnblogs.com/kyrin/p/5967620.html


Redis发布订阅

SUBSCRIBE, UNSUBSCRIBEPUBLISH 三个命令实现了发布与订阅信息泛型(Publish/Subscribe messaging paradigm), 在这个实现中, 发送者(发送信息的客户端)不是将信息直接发送给特定的接收者(接收信息的客户端), 而是将信息发送给频道(channel), 然后由频道将信息转发给所有对这个频道感兴趣的订阅者。

发送者无须知道任何关于订阅者的信息, 而订阅者也无须知道是那个客户端给它发送信息, 它只要关注自己感兴趣的频道即可。

比如说, 要订阅频道 foo 和 bar , 客户端可以使用频道名字作为参数来调用 SUBSCRIBE 命令:
redis> SUBSCRIBE foo bar

当有客户端发送信息到这些频道时, Redis 会将传入的信息推送到所有订阅这些频道的客户端里面。

正在订阅频道的客户端不应该发送除 SUBSCRIBE 和 UNSUBSCRIBE 之外的其他命令。 其中, SUBSCRIBE 可以用于订阅更多频道, 而 UNSUBSCRIBE 则可以用于退订已订阅的一个或多个频道。

SUBSCRIBE 和 UNSUBSCRIBE 的执行结果会以信息的形式返回, 客户端可以通过分析所接收信息的第一个元素, 从而判断所收到的内容是一条真正的信息, 还是 SUBSCRIBE 或 UNSUBSCRIBE 命令的操作结果。

publish channel message

PUBLISH channel message 将信息 message 发送到指定的频道 channel
时间复杂度:O(N+M),其中 N 是频道 channel 的订阅者数量,而 M 则是使用模式订阅(subscribed patterns)的客户端的数量。
返回值:接收到信息 message 的订阅者数量。

# 对没有订阅者的频道发送信息
redis> publish bad_channel "can any body hear me?"
(integer) 0

# 向有一个订阅者的频道发送信息

redis> publish msg "good morning"
(integer) 1

# 向有多个订阅者的频道发送信息

redis> publish chat_room "hello~ everyone"
(integer) 3

subscribe channel

SUBSCRIBE channel [channel ...] 订阅给定的一个或多个频道的信息。
时间复杂度:O(N),其中 N 是订阅的频道的数量。

unsubscribe channel

UNSUBSCRIBE [channel [channel ...]] 指示客户端退订给定的频道。
如果没有频道被指定,也即是,一个无参数的 UNSUBSCRIBE 调用被执行,那么客户端使用 SUBSCRIBE 命令订阅的所有频道都会被退订。在这种情况下,命令会返回一个信息,告知客户端所有被退订的频道。
时间复杂度:O(N) , N 是客户端已订阅的频道的数量。

发布订阅演示实例

1、打开 redis 连接1,直接向一个全新的 channel 名字发送一个消息就创建了这个 channel

> publish new_channel 'new_channel'
0

返回 0 表示当前有 0 个消费者监听

2、另开一个 redis 连接2,订阅这个 channel

> subscribe new_channel
subscribe
new_channel
1

返回订阅的 频道 名字和订阅的频道数量

3、在 redis 连接1 中,向 new_channel 发送消息

> publish new_channel '能听到吗'
1

则 redis 连接2 中立即会收到此消息:

message
new_channel
能听到吗

发布与订阅(pub/sub)
http://doc.redisfans.com/topic/pubsub.html


问题

somaxconn is set to the lower value of 128

redis 日志中有 warning:
The TCP backlog setting of 511 cannot be enforced because /proc/sys/net/core/somaxconn is set to the lower value of 128.

原因:
redis 并发请求过高,超过默认 TCP 监听队列长度。
net.core.somaxconn 是 Linux 中的一个 kernel 参数,表示 socket 监听队列 backlog 的长度,即服务的最大并发 TCP 连接数。
什么是backlog呢?backlog就是socket的监听队列,当一个请求(request)尚未被处理或建立时,他会进入backlog。而socket server可以一次性处理backlog中的所有请求,处理后的请求不再位于监听队列中。当server处理请求较慢,以至于监听队列被填满后,新来的请求会被拒绝。
对于一个经常处理新连接的高负载 web 服务环境来说,默认的 128 太小了,大多数环境这个值建议增加到 1024 或者更多。

每一个处于监听(Listen)状态的端口,都有自己的监听队列,监听队列的长度与如下两方面有关:
1、somaxconn 参数.
2、使用该端口的程序中的listen()函数.

解决:
将 net.core.somaxconn 设置为 1024
1、临时生效,重启系统后失效
echo 1024 > /proc/sys/net/core/somaxconn

sysctl -w net.core.somaxconn=1024
2、永久生效
vim /etc/sysctl.conf
net.core.somaxconn=1024
sysctl -p


上一篇 Hexo博客(14)添加来必力评论系统

下一篇 Redis-命令与数据类型

阅读
评论
8.5k
阅读预计32分钟
创建日期 2017-07-05
修改日期 2024-06-02
类别

页面信息

location:
protocol:
host:
hostname:
origin:
pathname:
href:
document:
referrer:
navigator:
platform:
userAgent:

评论