Redis实现消息队列:list、PubSub、Stream,基于Stream的异步秒杀

redis消息队列,list、pubsub、stream,基于stream队列的异步秒杀、redis的消费者组

消息队列介绍

List 实现消息队列

Redis 的 list 数据结构是一个双向链表,先进先出,可以利用 LPUSH、LPOP、RPUSH、RPOP 命令来实现元素的左右进出,但是 list 并不是阻塞队列,当 list 中无元素时,线程并不会阻塞,而是从 list 中取出一个 null,这并不符合我们的业务需要!因此这里要用 BRPOP 或 BLPOP 命令实现阻塞效果!

阻塞队列
当线程尝试从队列中获取元素时,若阻塞队列中无元素,则线程会阻塞,直到队列中有元素线程才会被唤醒并从阻塞队列中取出元素。

PubSub 实现消息队列

Stream 实现消息队列

Stream 的消费者组

Redis 消息队列总结

基于 Stream 的异步秒杀

Redis 创建消费者、消费者组、消息队列

1
2
3
4
5
xadd s1 * k1 v1
xgroup create s1 g1 0

xgroup create stream.orders g1 0 mkstream
XREADGROUP GROUP g1 c1 count 1 Block 2000 streams s1 >

修改 seckill.lua 脚本

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
-- 1.参数列表
-- 1.1优惠券id
local voucherId=ARGV[1]

--1.2 用户id
local userId=ARGV[2]

--1.3 订单id
local orderId=ARGV[3]

--2.数据key  ..是拼接符号
--2.1 库存key
local stockKey='seckill:stock:'..voucherId
--2.2 订单key
local orderKey='seckill:order:'..voucherId

--3.脚本业务
--3.1 判断库存是否充足
if (tonumber(redis.call('get',stockKey))<=0) then
    return 1
end
--3.2判断用户是否下单   若set集合中存在该用户id,则说明已下过单,返回1
if (tonumber(redis.call('sismember',orderKey,userId))==1) then
    return 2
end

--3.4扣库存
redis.call('incrby',stockKey,-1)
--3.5保存用户到set
redis.call('sadd',orderKey,userId)
--3.6发送消息到消息队列
redis.call('xadd','stream.orders','*','userId',userId,'voucherId',voucherId,'id',orderId)
return 0

添加新的秒杀券

数据库和 redis 同时存入 id 为 19 的秒杀券

Apifox 模拟用户抢购秒杀券

查看 Redis 中数据变化

1、Redis 中 id=19 的秒杀券的数量都-1

2、Redis 中成功存入用户 id

3、 Redis 中成功存入订单信息

4、数据库中成功存入订单信息

总结

VoucherOrderServiceImpl.java 完整代码

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
@Slf4j
@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder>  implements IVoucherOrderService {

    @Resource
    private StringRedisTemplate stringRedisTemplate;
    @Resource
    private ISeckillVoucherService SeckillVoucherService;

    @Autowired
    private RedissonClient redissonClient;

    @Resource
    private RedisIdWorker redisIdWorker;

    //阻塞队列  当线程尝试从队列中获取元素时,若队列无元素,则线程会阻塞,直到队列中有元素才会被唤醒
//    private BlockingQueue<VoucherOrder> orderTasks=new ArrayBlockingQueue<>(1024*1024);

    //线程池,负责从阻塞队列中获取订单然后异步下单
    private static final ExecutorService SECKILL_ORDER_EXECUTOR= Executors.newSingleThreadExecutor();

    //spring提供的注解  作用:类初始化后就执行VoucherOrderHandler方法
    //向线程池提交一个线程
    @PostConstruct
    private void init(){
        SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
    }

    //线程  内部类
    private class VoucherOrderHandler implements Runnable{

        String queueName ="stream.orders";
        @Override
        public void run() {
            while (true){
                try {
                    //获取队列中的订单信息
                    //  VoucherOrder order = orderTasks.take();

                    //获取消息队列的订单信息
                    List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1")
                            , StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2))
                            , StreamOffset.create(queueName, ReadOffset.lastConsumed())
                    );
                    //判断消息是否获取成功
                    if (list==null||list.isEmpty()){
                        //获取失败说明没有消息,继续循环
                        continue;
                    }
                    //获取成功则创建订单
                    MapRecord<String, Object, Object> record = list.get(0);
                    Map<Object, Object> value = record.getValue();
                    VoucherOrder order = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);
                    createVoucherOrder(order);
                     //ACK确认信息
                    stringRedisTemplate.opsForStream().acknowledge("s1","g1",record.getId());
                } catch (Exception e) {
                    log.error("订单处理异常",e);
                    handlePendingList();
                }
            }
        }
        private void handlePendingList() {
            while (true){
                try {
                    //获取pendingList队列的订单信息
                    List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
                            Consumer.from("g1", "c1")
                            , StreamReadOptions.empty().count(1)
                            , StreamOffset.create(queueName, ReadOffset.from("0"))
                    );
                    //判断消息是否获取成功
                    if (list==null||list.isEmpty()){
                        //获取失败说明没有消息,结束循环
                        break;
                    }

                    //获取成功则下单
                    MapRecord<String, Object, Object> record = list.get(0);
                    Map<Object, Object> value = record.getValue();
                    VoucherOrder order = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);
                    createVoucherOrder(order);

                    //ACK确认信息
                    stringRedisTemplate.opsForStream().acknowledge("s1","g1",record.getId());
                } catch (Exception e) {
                    log.error("pendingList处理异常",e);

                }

            }
        }
    }

    //代理对象
    IVoucherOrderService proxy;

    private void handleVoucherOrder(VoucherOrder order) {

        Long userId = order.getUserId();

        RLock redisLock = redissonClient.getLock("lock:order:" + userId);

        boolean tryLock = redisLock.tryLock();

        //判断锁是否获取成功
        if (!tryLock){
            log.error("不允许重复下单");
            return ;
        }
        try {
            //锁加到这里,事务提交后才释放锁
//            proxy.createVoucherOrder(order);
            //使用动态代理类的对象,事务可以生效
        } finally {
            redisLock.unlock();
        }
    }

    private static final DefaultRedisScript<Long> SECKILL_SCRIPT;
    static  {
        SECKILL_SCRIPT=new DefaultRedisScript<>();
        SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));
        SECKILL_SCRIPT.setResultType(Long.class);
    }

    @Override
    public Result seckillVoucher(Long voucherId) {
        Long userId = UserHolder.getUser().getId();

        //TODO 生成了orderId,把订单信息保存到阻塞队列,由另一个线程专门根据订单信息去数据库做增删改查,这就实现了异步
        long orderId = redisIdWorker.nextId("order");

        //执行lua脚本判断有无购买资格
        Long result = stringRedisTemplate.execute(
                SECKILL_SCRIPT,
                Collections.emptyList(),
                voucherId.toString(), userId.toString(),String.valueOf(orderId)
        );
        int i = result.intValue();
        if (i!=0){
            return Result.fail(i==1?"优惠券库存不足":"不能重复下单");
        }


        //获取事务的动态代理对象,需要在启动类加注解暴漏出对象
        proxy = (IVoucherOrderService)AopContext.currentProxy();//拿到动态代理对象

        //添加到阻塞队列
        //orderTasks.add(order);

        return Result.ok(orderId);
    }

    //TODO spring对该类做了动态代理,用动态代理的对象提交的事务
    @Transactional
    public void createVoucherOrder(VoucherOrder order) {
        //一人一单,根据优惠卷id和用户id去数据库查询是否已经存在该优惠卷
        Long id = order.getUserId();
        Long userId = order.getUserId();

        RLock redisLock = redissonClient.getLock("lock:order:" + userId);
        boolean isLock = redisLock.tryLock();
        if (!isLock){
            log.error("不允许重复下单");
            return;
        }
            //为用户id加锁而不是对整个createVoucherOrder方法加锁,减小锁范围,提升性能,这样每个用户就有不同的锁
            //锁加在函数内部,锁内的代码执行完后就会释放锁,而事务的提交是在整个方法执行后提交的,也就是事务的提交在锁释放之后。
            //但是锁释放后其他线程就可以进来,此时事务可能还没有提交,可能出现并发问题,重复购买
            //所以要扩大锁的范围,把锁加到seckillVoucher方法后面,在事务提交后才能释放锁!

        try {
            int count = query().eq("user_id", id).eq("voucher_id", order.getVoucherId()).count();
            if (count >=1) {
                //count==1说明用户拥有了一个优惠券
                log.error("不能重复下单");
                return;
//                return Result.fail("不能重复购买优惠卷");
            }

            //4.扣减库存  防止超卖,加乐观锁,扣减库存前再查询一次库存判断
//        boolean b = SeckillVoucherService.update()
//                .setSql("stock=stock-1").
//                eq("voucher_id", voucherId).eq("stock",voucher.getStock()).update();
            //使用setSql方法设置了更新语句"stock=stock-1",接着使用eq方法添加了两个条件:"voucher_id"等于voucherId和"stock"等于voucher.getStock()
            //条件1:voucher_id=voucherId指当前操作的优惠卷的id=数据库中的优惠卷id,即通过优惠卷id指明了要修改哪个优惠卷的库存
            //条件2:stock=voucher.getStock,说明该线程修改库存期间没有其他线程来插队修改库存,那么数据是安全的
            //TODO !!!注意!这种操作在并发情况下可能导致用户在优惠卷库存充足的情况下抢购优惠卷失败,也就是即使有库存也会抢购失败,此时可以判断库存是否充足,重新抢购

            //修改如下:最后库存判断,只要>0就可以修改
            boolean b = SeckillVoucherService.update()
                    .setSql("stock=stock-1").
                    eq("voucher_id", order.getVoucherId()).gt("stock", 0)
                    .update();

            if (!b) {
//                return Result.fail("库存不足");
                log.error("库存不足");
                return;
            }
            //创建订单
            save(order);
        } finally {
            //释放锁
            redisLock.unlock();
        }
    }
}

comments powered by Disqus
使用 Hugo 构建
主题 StackJimmy 设计