1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
|
@Slf4j
@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
@Resource
private StringRedisTemplate stringRedisTemplate;
@Resource
private ISeckillVoucherService SeckillVoucherService;
@Autowired
private RedissonClient redissonClient;
@Resource
private RedisIdWorker redisIdWorker;
//阻塞队列 当线程尝试从队列中获取元素时,若队列无元素,则线程会阻塞,直到队列中有元素才会被唤醒
// private BlockingQueue<VoucherOrder> orderTasks=new ArrayBlockingQueue<>(1024*1024);
//线程池,负责从阻塞队列中获取订单然后异步下单
private static final ExecutorService SECKILL_ORDER_EXECUTOR= Executors.newSingleThreadExecutor();
//spring提供的注解 作用:类初始化后就执行VoucherOrderHandler方法
//向线程池提交一个线程
@PostConstruct
private void init(){
SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
}
//线程 内部类
private class VoucherOrderHandler implements Runnable{
String queueName ="stream.orders";
@Override
public void run() {
while (true){
try {
//获取队列中的订单信息
// VoucherOrder order = orderTasks.take();
//获取消息队列的订单信息
List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1")
, StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2))
, StreamOffset.create(queueName, ReadOffset.lastConsumed())
);
//判断消息是否获取成功
if (list==null||list.isEmpty()){
//获取失败说明没有消息,继续循环
continue;
}
//获取成功则创建订单
MapRecord<String, Object, Object> record = list.get(0);
Map<Object, Object> value = record.getValue();
VoucherOrder order = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);
createVoucherOrder(order);
//ACK确认信息
stringRedisTemplate.opsForStream().acknowledge("s1","g1",record.getId());
} catch (Exception e) {
log.error("订单处理异常",e);
handlePendingList();
}
}
}
private void handlePendingList() {
while (true){
try {
//获取pendingList队列的订单信息
List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
Consumer.from("g1", "c1")
, StreamReadOptions.empty().count(1)
, StreamOffset.create(queueName, ReadOffset.from("0"))
);
//判断消息是否获取成功
if (list==null||list.isEmpty()){
//获取失败说明没有消息,结束循环
break;
}
//获取成功则下单
MapRecord<String, Object, Object> record = list.get(0);
Map<Object, Object> value = record.getValue();
VoucherOrder order = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);
createVoucherOrder(order);
//ACK确认信息
stringRedisTemplate.opsForStream().acknowledge("s1","g1",record.getId());
} catch (Exception e) {
log.error("pendingList处理异常",e);
}
}
}
}
//代理对象
IVoucherOrderService proxy;
private void handleVoucherOrder(VoucherOrder order) {
Long userId = order.getUserId();
RLock redisLock = redissonClient.getLock("lock:order:" + userId);
boolean tryLock = redisLock.tryLock();
//判断锁是否获取成功
if (!tryLock){
log.error("不允许重复下单");
return ;
}
try {
//锁加到这里,事务提交后才释放锁
// proxy.createVoucherOrder(order);
//使用动态代理类的对象,事务可以生效
} finally {
redisLock.unlock();
}
}
private static final DefaultRedisScript<Long> SECKILL_SCRIPT;
static {
SECKILL_SCRIPT=new DefaultRedisScript<>();
SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));
SECKILL_SCRIPT.setResultType(Long.class);
}
@Override
public Result seckillVoucher(Long voucherId) {
Long userId = UserHolder.getUser().getId();
//TODO 生成了orderId,把订单信息保存到阻塞队列,由另一个线程专门根据订单信息去数据库做增删改查,这就实现了异步
long orderId = redisIdWorker.nextId("order");
//执行lua脚本判断有无购买资格
Long result = stringRedisTemplate.execute(
SECKILL_SCRIPT,
Collections.emptyList(),
voucherId.toString(), userId.toString(),String.valueOf(orderId)
);
int i = result.intValue();
if (i!=0){
return Result.fail(i==1?"优惠券库存不足":"不能重复下单");
}
//获取事务的动态代理对象,需要在启动类加注解暴漏出对象
proxy = (IVoucherOrderService)AopContext.currentProxy();//拿到动态代理对象
//添加到阻塞队列
//orderTasks.add(order);
return Result.ok(orderId);
}
//TODO spring对该类做了动态代理,用动态代理的对象提交的事务
@Transactional
public void createVoucherOrder(VoucherOrder order) {
//一人一单,根据优惠卷id和用户id去数据库查询是否已经存在该优惠卷
Long id = order.getUserId();
Long userId = order.getUserId();
RLock redisLock = redissonClient.getLock("lock:order:" + userId);
boolean isLock = redisLock.tryLock();
if (!isLock){
log.error("不允许重复下单");
return;
}
//为用户id加锁而不是对整个createVoucherOrder方法加锁,减小锁范围,提升性能,这样每个用户就有不同的锁
//锁加在函数内部,锁内的代码执行完后就会释放锁,而事务的提交是在整个方法执行后提交的,也就是事务的提交在锁释放之后。
//但是锁释放后其他线程就可以进来,此时事务可能还没有提交,可能出现并发问题,重复购买
//所以要扩大锁的范围,把锁加到seckillVoucher方法后面,在事务提交后才能释放锁!
try {
int count = query().eq("user_id", id).eq("voucher_id", order.getVoucherId()).count();
if (count >=1) {
//count==1说明用户拥有了一个优惠券
log.error("不能重复下单");
return;
// return Result.fail("不能重复购买优惠卷");
}
//4.扣减库存 防止超卖,加乐观锁,扣减库存前再查询一次库存判断
// boolean b = SeckillVoucherService.update()
// .setSql("stock=stock-1").
// eq("voucher_id", voucherId).eq("stock",voucher.getStock()).update();
//使用setSql方法设置了更新语句"stock=stock-1",接着使用eq方法添加了两个条件:"voucher_id"等于voucherId和"stock"等于voucher.getStock()
//条件1:voucher_id=voucherId指当前操作的优惠卷的id=数据库中的优惠卷id,即通过优惠卷id指明了要修改哪个优惠卷的库存
//条件2:stock=voucher.getStock,说明该线程修改库存期间没有其他线程来插队修改库存,那么数据是安全的
//TODO !!!注意!这种操作在并发情况下可能导致用户在优惠卷库存充足的情况下抢购优惠卷失败,也就是即使有库存也会抢购失败,此时可以判断库存是否充足,重新抢购
//修改如下:最后库存判断,只要>0就可以修改
boolean b = SeckillVoucherService.update()
.setSql("stock=stock-1").
eq("voucher_id", order.getVoucherId()).gt("stock", 0)
.update();
if (!b) {
// return Result.fail("库存不足");
log.error("库存不足");
return;
}
//创建订单
save(order);
} finally {
//释放锁
redisLock.unlock();
}
}
}
|