Chiriri's blog Chiriri's blog
首页
  • Java

    • JavaSE
    • JavaEE
    • 设计模式
  • Python

    • Python
    • Python模块
    • 机器学习
  • Golang

    • Golang
    • gRPC
  • 服务器

    • Linux
    • MySQL
    • NoSQL
    • Kubernetes
  • 项目

    • 传智健康
    • 畅购商城
  • Hadoop生态

    • Hadoop
    • Zookeeper
    • Hive
    • Flume
    • Kafka
    • Azkaban
    • Hbase
    • Scala
    • Spark
    • Flink
  • 大数据项目

    • 离线数仓
  • 青训营

    • 第四届青训营
  • HTML

    • HTML
    • JavaScript
  • Vue

    • Vue2
    • TypeScript
    • Vue3
    • Uni-APP
  • 数据结构与算法
  • C语言
  • 考研数据结构
  • 计算机组成原理
  • 计算机操作系统
  • Java基础

    • Java基础
    • Java集合
    • JUC
    • JVM
  • 框架

    • Spring
    • Dubbo
    • Spring Cloud
  • 数据库

    • MySQL
    • Redis
    • Elasticesearch
  • 消息队列

    • RabbitMQ
    • RocketMQ
  • 408

    • 计算机网络
    • 操作系统
    • 算法
  • 分类
  • 标签
  • 归档
  • 导航站
GitHub (opens new window)

Iekr

苦逼后端开发
首页
  • Java

    • JavaSE
    • JavaEE
    • 设计模式
  • Python

    • Python
    • Python模块
    • 机器学习
  • Golang

    • Golang
    • gRPC
  • 服务器

    • Linux
    • MySQL
    • NoSQL
    • Kubernetes
  • 项目

    • 传智健康
    • 畅购商城
  • Hadoop生态

    • Hadoop
    • Zookeeper
    • Hive
    • Flume
    • Kafka
    • Azkaban
    • Hbase
    • Scala
    • Spark
    • Flink
  • 大数据项目

    • 离线数仓
  • 青训营

    • 第四届青训营
  • HTML

    • HTML
    • JavaScript
  • Vue

    • Vue2
    • TypeScript
    • Vue3
    • Uni-APP
  • 数据结构与算法
  • C语言
  • 考研数据结构
  • 计算机组成原理
  • 计算机操作系统
  • Java基础

    • Java基础
    • Java集合
    • JUC
    • JVM
  • 框架

    • Spring
    • Dubbo
    • Spring Cloud
  • 数据库

    • MySQL
    • Redis
    • Elasticesearch
  • 消息队列

    • RabbitMQ
    • RocketMQ
  • 408

    • 计算机网络
    • 操作系统
    • 算法
  • 分类
  • 标签
  • 归档
  • 导航站
GitHub (opens new window)
  • JavaSE

  • JavaEE

  • Linux

  • MySQL

  • NoSQL

  • Python

  • Python模块

  • 机器学习

  • 设计模式

  • 传智健康

  • 畅购商城

    • Day01 项目搭建
    • Day02 FastDFS
    • Day03 微服务鉴权
    • Day04 新增和修改商品
    • Day05 广告缓存
    • Day06 监听数据库更新广告缓存
    • Day07 ES搜索
    • Day08 Thymeleaf
    • Day09 Oauth2
    • Day10 购物车渲染
    • Day11 订单结算
    • Day12 分布式事务解决方案
    • Day13 微信支付
    • Day14 订单处理
    • Day15 秒杀前端
    • Day16 秒杀后端
      • 秒杀异步下单
        • 生产者保证消息不丢失
        • confirm机制
        • 秒杀下单服务更新库存
        • 消费者手动ACK下单实现
        • 流量削峰
        • 秒杀渲染服务-下单实现
      • 防止恶意刷单解决
      • 防止相同商品重复秒杀
      • 秒杀下单接口隐藏
        • 将随机数工具类放入common工程中
        • 秒杀渲染服务定义随机数接口
        • js修改
        • 秒杀渲染服务更改
      • 秒杀下单接口限流
  • 博客项目

  • JVM

  • JUC

  • Golang

  • Kubernetes

  • 硅谷课堂

  • C

  • 源码

  • 神领物流

  • RocketMQ

  • 短链平台

  • 后端
  • 畅购商城
Iekr
2022-05-06
目录

Day16 秒杀后端

# Day16 秒杀后端

  1. 实现秒杀异步下单,掌握如何保证生产者 & 消费者消息不丢失

  2. 实现防止恶意刷单

  3. 实现防止相同商品重复秒杀

  4. 实现秒杀下单接口隐藏

  5. 实现下单接口限流

# 秒杀异步下单

用户在下单的时候,需要基于 JWT 令牌信息进行登陆人信息认证,确定当前订单是属于谁的。

针对秒杀的特殊业务场景,仅仅依靠对象缓存或者页面静态化等技术去解决服务端压力还是远远不够。对于数据库压力还是很大,所以需要异步下单,异步是最好的解决办法,但会带来一些额外的程序上的复杂性。

将 TokenDecode 拷贝一份放到 changgou_service_seckill 项目的 config 包下

1565708840454

然后修改启动类添加 加载 bean

@Bean
public TokenDecode tokenDecode(){
    return new TokenDecode();
}
1
2
3
4

1565708882486

在该项目下的 controller 层创建 SecKillOrderController 类

@RestController
@RequestMapping("/seckillorder")
public class SecKillOrderController {


    @Autowired
    private TokenDecode tokenDecode;


    @Autowired
    private SecKillOrderService secKillOrderService;


    @RequestMapping("/add")
    public Result add(@RequestParam("time") String time, @RequestParam("id") Long id) {
        //获取登陆人
        String username = tokenDecode.getUserInfo().get("username");

        // 基于业务进行秒杀下单
        boolean result = secKillOrderService.add(id, time, username);
        if (result) {
            //下单成功
            return new Result(true, StatusCode.OK, "下单成功");
        } else {
            return new Result(false, StatusCode.ERROR, "下单失败");
        }


    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30

在 service 层创建 SecKillOrderService

public interface SecKillOrderService {

    //秒杀下单
    boolean add(Long id, String time, String username);
}
1
2
3
4
5

修改 task 包下的 SeckillGoodsPushTask 类的 loadSecKillGoodsToRedis 方法 添加预加载秒杀商品信息

更改预加载秒杀商品

当预加载秒杀商品的时候,提前加载每一个商品的库存信息,后续减库存操作也会先预扣减缓存中的库存再异步扣减 mysql 数据。

预扣减库存会基于 redis 原子性操作实现

//预加载秒杀商品的库存 存放到redis中
redisTemplate.opsForValue().set(SECKILL_GOODS_STOCK_COUNT_KEY + seckillGoods.getGoodsId(), seckillGoods.getStockCount());
1
2

image-20220506104616200

秒杀下单业务层实现

业务逻辑:

获取秒杀商品数据与库存量数据,如果没有库存则抛出异常

执行 redis 预扣减库存,并获取扣减之后的库存值

如果扣减完的库存值 <=0, 则删除 redis 中对应的商品信息与库存信息

基于 mq 异步方式完成与 mysql 数据同步(最终一致性)

注意:库存数据从 redis 中取出,转换成 String

SecKillOrderServiceImpl 类

@Service
public class SecKillOrderServiceImpl implements SecKillOrderService {


    @Autowired
    private RedisTemplate redisTemplate;

    @Autowired
    private IdWorker idWorker;

    public static final String SECKILL_GOODS_KEY = "seckill_goods_";

    public static final String SECKILL_GOODS_STOCK_COUNT_KEY = "seckill_goods_stock_count_key";


    @Override
    public boolean add(Long id, String time, String username) {


        //1. 获取redis中商品信息与库存信息 并进行判断
        //获取商品信息
        SeckillGoods seckillGoods = (SeckillGoods) redisTemplate.boundHashOps(SECKILL_GOODS_KEY + time).get(id);
        //获取库存信息
        String redisStock = (String) redisTemplate.opsForValue().get(SECKILL_GOODS_STOCK_COUNT_KEY + id);
        if (StringUtils.isEmpty(redisStock)) {
            return false;
        }
        int stock = Integer.parseInt(redisStock);
        if (seckillGoods == null || stock <= 0) {
            return false;
        }

        //2. 执行redis的预扣减存操作 并获取扣减之后的库存值
        //执行redis的预扣减
        Long decrement = redisTemplate.opsForValue().decrement(SECKILL_GOODS_STOCK_COUNT_KEY + id);
        if (decrement <= 0) {
            // 如果扣减之后的库存值为 <=0 则该商品已经售罄  则删除redis中相应的商品信息和库存信息
            redisTemplate.boundHashOps(SECKILL_GOODS_KEY + time).delete(id);
            redisTemplate.delete(SECKILL_GOODS_STOCK_COUNT_KEY + id);
        }

        //3. 基于mq完成mysql的数据同步 进行异步下单并扣减库存(mysql) 保证消息生产者对消息不丢失
        // 秒杀订单实体
        SeckillOrder seckillOrder = new SeckillOrder();
        seckillOrder.setId(idWorker.nextId());
        seckillOrder.setSeckillId(id);
        seckillOrder.setMoney(seckillGoods.getPrice());
        seckillOrder.setUserId(username);
        seckillOrder.setSellerId(seckillGoods.getSellerId());
        seckillOrder.setCreateTime(new Date());
        seckillOrder.setStatus("0");
        // 发送消息




        return true;
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59

# 生产者保证消息不丢失

img

按照现有 rabbitMQ 的相关知识,生产者会发送消息到达消息服务器。但是在实际生产环境下,消息生产者发送的消息很有可能当到达了消息服务器之后,由于消息服务器的问题导致消息丢失,如宕机。因为消息服务器默认会将消息存储在内存中。一旦消息服务器宕机,则消息会产生丢失。因此要保证生产者的消息不丢失,要开始持久化策略。

rabbitMQ持久化:
    交换机持久化
    队列持久化
    消息持久化
1
2
3
4
j@Bean(EX_BUYING_ADDPOINTUSER)
public Exchange EX_BUYING_ADDPOINTUSER() {
    return ExchangeBuilder.directExchange(EX_BUYING_ADDPOINTUSER).durable(true).build();
} //.durable(true)开启持久化

//声明队列
@Bean(CG_BUYING_FINISHADDPOINT)
public Queue QUEUE_CG_BUYING_FINISHADDPOINT() {
    Queue queue = new Queue(CG_BUYING_FINISHADDPOINT,true); //开启持久化
    return queue;
}

//消息持久化在rabbitTemplate 默认开启
1
2
3
4
5
6
7
8
9
10
11
12
13

但是如果仅仅只是开启这两部分的持久化,也很有可能造成消息丢失。因为消息服务器很有可能在持久化的过程中出现宕机。因此需要通过数据保护机制来保证消息一定会成功进行持久化,否则将一直进行消息发送。

RabbitMQ数据保护机制
  事务机制
    事务机制采用类数据库的事务机制进行数据保护,当消息到达消息服务器,首先会开启一个事务,接着进行数据磁盘持久化,只有持久化成功才会进行事务提交,向消息生产者返回成功通知,消息生产者一旦接收成功通知则不会再发送此条消息。当出现异常,则返回失败通知.消息生产者一旦接收失败通知,则继续发送该条消息。
    事务机制虽然能够保证数据安全,但是此机制采用的是同步机制,会产生系统间消息阻塞,影响整个系统的消息吞吐量。从而导致整个系统的性能下降,因此不建议使用。
  confirm机制
    confirm模式需要基于channel进行设置, 一旦某条消息被投递到队列之后,消息队列就会发送一个确认信息给生产者,如果队列与消息是可持久化的, 那么确认消息会等到消息成功写入到磁盘之后发出.
    confirm的性能高,主要得益于它是异步的.生产者在将第一条消息发出之后等待确认消息的同时也可以继续发送后续的消息.当确认消息到达之后,就可以通过回调方法处理这条确认消息. 如果MQ服务宕机了,则会返回nack消息. 生产者同样在回调方法中进行后续处理。
1
2
3
4
5
6
7

# confirm 机制

confirm 的性能高,主要得益于它是异步的。生产者在将第一条消息发出之后等待确认消息的同时也可以继续发送后续的消息。当确认消息到达之后,就可以通过回调方法处理这条确认消息。如果 MQ 服务宕机了,则会返回 nack 消息。生产者同样在回调方法中进行后续处理。

在 application 的 rabbitmq 配置项中添加 publisher-confirms 属性为 ture

  rabbitmq:
    host: 192.168.130.128
    publisher-confirms: true #  开启confirm数据保护机制
1
2
3

在 config 包下 创建 RabbitMQConfig 类 并开启队列持久化

@Configuration
public class RabbitMQConfig {

    public static final String SECKLL_ORDER_QUEUE="seckill_order";


    @Bean
    public Queue queue(){
        return new Queue(SECKLL_ORDER_QUEUE,true);
    }

    
}
1
2
3
4
5
6
7
8
9
10
11
12
13

消息持久化源码查看

1565666222921

1565666244065

1565666279691

增强 rabbitTemplate

@Component
public class ConfirmMessageSender implements RabbitTemplate.ConfirmCallback {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private RedisTemplate redisTemplate;


    public static final String MESSAGE_CONFIRM_KEY = "message_confirm_";

    public ConfirmMessageSender(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
        //回调函数
        rabbitTemplate.setConfirmCallback(this);
    }

    /**
     * 接收消息服务器返回的通知
     *
     * @param correlationData 唯一标识
     * @param b               true为成功 false为失败
     * @param s
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean b, String s) {
        if (b) {
            //成功通知
            //删除redis中的元数据
            if (correlationData.getId() != null) {
                redisTemplate.delete(correlationData.getId());
            }
            redisTemplate.delete(MESSAGE_CONFIRM_KEY + correlationData.getId());
        } else {
            //失败通知
            //从redis中获取元数据 并重新发送
            Map<String, String> map = redisTemplate.opsForHash().entries(MESSAGE_CONFIRM_KEY + correlationData.getId());
            String exchange = map.get("exchange");
            String routingKey = map.get("routingKey");
            String message = map.get("message");
            rabbitTemplate.convertAndSend(exchange, routingKey, JSON.toJSONString(message));


        }
    }


    /**
     * 自定义发送消息方法
     */
    public void sendMessage(String exchange, String routingKey, String message) {
        //设置消息的唯一标识 并存入redis中
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        redisTemplate.opsForValue().set(correlationData.getId(), message);

        //将本次发送消息的元数据 也存入redis中
        HashMap<String, String> map = new HashMap<>();
        map.put("exchange", exchange);
        map.put("routingKey", routingKey);
        map.put("message", message);
        redisTemplate.opsForHash().putAll(MESSAGE_CONFIRM_KEY + correlationData.getId(), map);

        //携带本次消息的唯一标识 进行消息发送
        rabbitTemplate.convertAndSend(exchange, routingKey, message, correlationData);


    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69

在 SecKillOrderServiceImpl 注入 ConfirmMessageSender 配置类 并调用其发送方法

@Autowired
private ConfirmMessageSender confirmMessageSender;
1
2

在 add 方法底部调用发送消息的方法

// 发送消息
confirmMessageSender.sendMessage("", RabbitMQConfig.SECKLL_ORDER_QUEUE, JSON.toJSONString(seckillOrder));
1
2

image-20220506142944481

# 秒杀下单服务更新库存

创建异步下单服务 changgou_service_consume 添加依赖

<dependencies>
    <dependency>
        <groupId>com.changgou</groupId>
        <artifactId>changgou_common_db</artifactId>
        <version>1.0-SNAPSHOT</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
    </dependency>
    <dependency>
        <groupId>com.changgou</groupId>
        <artifactId>changgou_service_order_api</artifactId>
        <version>1.0-SNAPSHOT</version>
    </dependency>
    <dependency>
        <groupId>com.changgou</groupId>
        <artifactId>changgou_service_seckill_api</artifactId>
        <version>1.0-SNAPSHOT</version>
    </dependency>
    <dependency>
        <groupId>com.changgou</groupId>
        <artifactId>changgou_service_goods_api</artifactId>
        <version>1.0-SNAPSHOT</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.amqp</groupId>
        <artifactId>spring-rabbit</artifactId>
    </dependency>
</dependencies>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30

application

server:
  port: 9022
spring:
  jackson:
    time-zone: GMT+8
  application:
    name: sec-consume
  datasource:
    driver-class-name: com.mysql.jdbc.Driver
    url: jdbc:mysql://192.168.130.128:3306/changgou_seckill?useUnicode=true&characterEncoding=utf-8&useSSL=false&allowMultiQueries=true&serverTimezone=GMT%2b8
    username: root
    password: root
  main:
    allow-bean-definition-overriding: true #当遇到同样名字的时候,是否允许覆盖注册
  redis:
    host: 192.168.130.128
  rabbitmq:
    host: 192.168.130.128
eureka:
  client:
    service-url:
      defaultZone: http://127.0.0.1:6868/eureka
  instance:
    prefer-ip-address: true
feign:
  hystrix:
    enabled: true
  client:
    config:
      default:   #配置全局的feign的调用超时时间  如果 有指定的服务配置 默认的配置不会生效
        connectTimeout: 60000 # 指定的是 消费者 连接服务提供者的连接超时时间 是否能连接  单位是毫秒
        readTimeout: 20000  # 指定的是调用服务提供者的 服务 的超时时间()  单位是毫秒
#hystrix 配置
hystrix:
  command:
    default:
      execution:
        timeout:
          #如果enabled设置为false,则请求超时交给ribbon控制
          enabled: true
        isolation:
          strategy: SEMAPHORE
          thread:
            # 熔断器超时时间,默认:1000/毫秒
            timeoutInMilliseconds: 20000
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45

启动类

@SpringBootApplication
@EnableEurekaClient
public class OrderConsumerApplication {
    public static void main(String[] args) {
        SpringApplication.run(OrderConsumerApplication.class,args);
    }
}
1
2
3
4
5
6
7

# 消费者手动 ACK 下单实现

按照现有 RabbitMQ 知识,可以得知当消息消费者成功接收到消息后,会进行消费并自动通知消息服务器将该条消息删除。此种方式的实现使用的是消费者自动应答机制。但是此种方式非常的不安全。

在生产环境下,当消息消费者接收到消息,很有可能在处理消息的过程中出现意外情况从而导致消息丢失,因为如果使用自动应答机制是非常不安全。

我们需要确保消费者当把消息成功处理完成之后,消息服务器才会将该条消息删除。此时要实现这种效果的话,就需要将自动应答转换为手动应答, 只有在消息消费者将消息处理完,才会通知消息服务器将该条消息删除。

** 更改配置文件 ** 开启手动应答

  rabbitmq:
    host: 192.168.130.128
    listener:
      simple:
        acknowledge-mode: manual
1
2
3
4
5

在 consume 项目下创建 config 包 将 changgou_service_seckill 下 config 的 RabbitMQConfig 类 复制一份拷贝到 config 包中

image-20220506144923118

创建 service 包 新建 SecKillOrderService 类

public interface SecKillOrderService {

    int createOrder(SeckillOrder seckillOrder);
}

1
2
3
4
5

将 changgou_service_seckill 的 dao 层的两个类复制到 changgou_service_consume 中

image-20220506154545534

修改启动类 添加 @MapperScan 注解

@MapperScan(basePackages = "com.changgou.consume.dao")

1
2

image-20220506161054410

在 mapper 层的 SeckillGoodsMapper 类添加一个 update 方法

import com.changgou.seckill.pojo.SeckillGoods;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Update;
import tk.mybatis.mapper.common.Mapper;


public interface SeckillGoodsMapper extends Mapper<SeckillGoods> {

    @Update("update tb_seckill_goods set stock_count = stock_count-1 when id = #{id} and stock_count >= 1")
    int updateStockCount(@Param("id") Long id);
}
1
2
3
4
5
6
7
8
9
10
11

impl 实现

@Service
public class SecKillOrderServiceImpl implements SecKillOrderService {

    @Autowired
    private SeckillGoodsMapper seckillGoodsMapper;

    @Autowired
    private SeckillOrderMapper seckillOrderMapper;

    @Override
    @Transactional
    public int createOrder(SeckillOrder seckillOrder) {
        //同步mysql中的数据
        //1.扣减秒杀商品的库存
        int result = seckillGoodsMapper.updateStockCount(seckillOrder.getSeckillId());
        if (result <= 0) {
            return 0;
        }
        //2.新建秒杀订单
        result = seckillOrderMapper.insertSelective(seckillOrder);
        if (result <= 0) {
            return 0;
        }
        return 1;
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26

创建 listener 包 新建 ConsumeListener 监听类 用于处理同步 mysql 结果的

package com.changgou.consume.listener;

import com.alibaba.fastjson.JSON;
import com.changgou.consume.config.RabbitMQConfig;
import com.changgou.consume.service.SecKillOrderService;
import com.changgou.seckill.pojo.SeckillOrder;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.io.IOException;

/**
 * @author Iekr
 * Date:  2022/5/6/0006 14:50
 */
@Component
public class ConsumeListener {

    @Autowired
    private SecKillOrderService secKillOrderService;

    @RabbitListener(queues = RabbitMQConfig.SECKLL_ORDER_QUEUE)
    public void receiveSecKillOrderMessage(Message message, Channel channel) {
        //1.转换消息格式
        SeckillOrder seckillOrder = JSON.parseObject(message.getBody(), SeckillOrder.class);

        //2.基于业务层完成同步mysql操作
        int result = secKillOrderService.createOrder(seckillOrder);
        if (result > 0) {
            //同步mysql成功 向生产者发起成功通知
            try {
                /**
                 * void basicAck(long deliveryTag, boolean multiple)
                 * deliveryTag 消息的唯一标识
                 * multiple 是否开启批处理
                 */
                channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
            } catch (IOException e) {
                throw new RuntimeException(e);
            }

        } else {
            //同步mysql失败
            //向消息服务器返回失败通知
            try {
                /**
                 * void basicNack(long deliveryTag, boolean multiple, boolean requeue)
                 * deliveryTag 唯一标识
                 * multiple true为所有消费者都会拒绝这个消息 false只有当前消费者拒绝
                 * requeue true当前消息会进入死信队列(延迟消息队列)  false当前消息会重新进入到原有的队列中,默认回到头部
                 */
                channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }

    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62

# 流量削峰

在秒杀这种高并发的场景下,每秒都有可能产生几万甚至十几万条消息,如果没有对消息处理量进行任何限制的话,很有可能因为过多的消息堆积从而导致消费者宕机的情况。因此官网建议对每一个消息消费者都设置处理消息总数(消息抓取总数)。

消息抓取总数的值,设置过大或者过小都不好,过小的话,会导致整个系统消息吞吐能力下降,造成性能浪费。过大的话,则很有可能导致消息过多,导致整个系统 OOM。因此官网建议每一个消费者将该值设置在 100-300 之间。

更新消费者监听类 ConsumeListener 。

//设置预抓取总数
channel.basicQos(300);
1
2

image-20220506155902680

# 秒杀渲染服务 - 下单实现

在 changgou_service_seckill_api 项目的 feign 包新建 SecKillOrderFeign 类

@FeignClient(name = "seckil")
public interface SecKillOrderFeign {
    @RequestMapping("/seckillorder/add")
    public Result add(@RequestParam("time") String time, @RequestParam("id") Long id);
}
1
2
3
4
5

修改 changgou_web_seckill 项目 controller 层的 SecKillOrderController 方法 并注入 fegin

@Autowired
private SecKillOrderFeign secKillOrderFeign;

@RequestMapping("/add")
public Result add(@RequestParam("time") String time, @RequestParam("id") Long id) {
    Result result = secKillOrderFeign.add(time, id);
    return result;
}
1
2
3
4
5
6
7
8

修改 seckill-index.html 页面 vue 中的 add 方法

之前调用 moment 时括号写错位置 注意修改

add:function (id) {
					axios.get("/api/wseckillorder/add?time="+moment(app.dateMenus[0]).format("YYYYMMDDHH")+"&id="+id).then(function (response) {
						if (response.data.flag){
							app.msg="抢单成功,即将进入支付";
						} else{
							app.msg="抢单失败";
						}
					})
				}
1
2
3
4
5
6
7
8
9

在 changgou_service_seckill 的 service.impl 包下的 SecKillGoodsServiceImpl 的 list 方法添加更新数据来源编码 用于回显数据给前端

public static final String SECKILL_GOODS_STOCK_COUNT_KEY = "seckill_goods_stock_count_key";


//更新数据来源
for (SeckillGoods seckillGoods : list) {
    String value = (String) redisTemplate.opsForValue().get(SECKILL_GOODS_STOCK_COUNT_KEY + seckillGoods.getGoodsId());
    seckillGoods.setStockCount(Integer.valueOf(value));
}
1
2
3
4
5
6
7
8

# 防止恶意刷单解决

在生产场景下,很有可能会存在某些用户恶意刷单的情况出现。这样的操作对于系统而言,会导致业务出错、脏数据、后端访问压力大等问题的出现。

一般要解决这个问题的话,需要前端进行控制,同时后端也需要进行控制。后端实现可以通过 Redis incrde 原子性递增来进行解决。

在 com.changgou.seckill.service.impl.SecKillOrderServiceImpl 类中添加 Redis incrde 操作方法

private String preventRepeatCommit(String username, Long id) {
    String redis_key = "seckill_user_" + username + "_id" + id;

    long count = redisTemplate.opsForValue().increment(redis_key, 1);
    if (count == 1) {
        //代表当前用户是第一次访问
        //对当前的key设置一个五分钟的有效期
        redisTemplate.expire(redis_key, 5, TimeUnit.MINUTES);
        return "success";
    }
    if (count > 1) {
        return "fail";
    }
    return "fail";

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

然后在 add 方法中调用此方法

//防止用户恶意刷单
String result = this.preventRepeatCommit(username, id);
if ("fail".equals(result)) {
    return false;
}
1
2
3
4
5

image-20220507185931125

# 防止相同商品重复秒杀

在 changgou_service_seckill 包的 dao 层 SeckillOrderMapper 新增查询方法

    @Select("select * from  tb_seckill_order where user_id =#{username} and seckill_id = #{id}")
    SeckillOrder getOrderInfoByUserNameAndGoodsId(@Param("username") String username, @Param("id") Long id);
1
2

回到 service 的 impl 实现层 的 SecKillOrderServiceImpl 类 注入 mapper 类 并在 add 方法调用

@Autowired
private SeckillOrderMapper seckillOrderMapper;

        //防止重复购买
        SeckillOrder order = seckillOrderMapper.getOrderInfoByUserNameAndGoodsId(username, id);
        if (order != null) {
            return false;
        }
1
2
3
4
5
6
7
8

image-20220507190630458

# 秒杀下单接口隐藏

当前虽然可以确保用户只有在登录的情况下才可以进行秒杀下单,但是无法方法有一些恶意的用户在登录了之后,猜测秒杀下单的接口地址进行恶意刷单。所以需要对秒杀接口地址进行隐藏。

在用户每一次点击抢购的时候,都首先去生成一个随机数并存入 redis,接着用户携带着这个随机数去访问秒杀下单,下单接口首先会从 redis 中获取该随机数进行匹配,如果匹配成功,则进行后续下单操作,如果匹配不成功,则认定为非法访问。

# 将随机数工具类放入 common 工程中

public class RandomUtil {
    public static String getRandomString() {
        int length = 15;
        String base = "abcdefghijklmnopqrstuvwxyz0123456789";
        Random random = new Random();
        StringBuffer sb = new StringBuffer();
        for (int i = 0; i < length; i++) {
            int number = random.nextInt(base.length());
            sb.append(base.charAt(number));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        String randomString = RandomUtil.getRandomString();
        System.out.println(randomString);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

# 秒杀渲染服务定义随机数接口

在 changgou_web_seckill 项目下 新建 util 包 创建 CookieUtil 类

public class CookieUtil {

    /**
     * 设置cookie
     *
     * @param response
     * @param name     cookie名字
     * @param value    cookie值
     * @param maxAge   cookie生命周期 以秒为单位
     */
    public static void addCookie(HttpServletResponse response, String domain, String path, String name,
                                 String value, int maxAge, boolean httpOnly) {
        Cookie cookie = new Cookie(name, value);
        cookie.setDomain(domain);
        cookie.setPath(path);
        cookie.setMaxAge(maxAge);
        cookie.setHttpOnly(httpOnly);
        response.addCookie(cookie);
    }



    /**
     * 根据cookie名称读取cookie
     * @param request
     * @return map<cookieName,cookieValue>
     */

    public static Map<String,String> readCookie(HttpServletRequest request, String ... cookieNames) {
        Map<String,String> cookieMap = new HashMap<String,String>();
            Cookie[] cookies = request.getCookies();
            if (cookies != null) {
                for (Cookie cookie : cookies) {
                    String cookieName = cookie.getName();
                    String cookieValue = cookie.getValue();
                    for(int i=0;i<cookieNames.length;i++){
                        if(cookieNames[i].equals(cookieName)){
                            cookieMap.put(cookieName,cookieValue);
                        }
                    }
                }
            }
        return cookieMap;
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45

回到 controller 层 的 SecKillOrderController 类中 添加随机数接口

@Autowired
private RedisTemplate redisTemplate;

@GetMapping("/getToken")
@ResponseBody
public String getToken() {
    String randomString = RandomUtil.getRandomString();
    String cookie = this.readCookie();
    redisTemplate.opsForValue().set("randomCode_" + cookie, randomString,5, TimeUnit.SECONDS );
    return randomString;
}

private String readCookie() {
    HttpServletRequest request = ((ServletRequestAttributes) RequestContextHolder.getRequestAttributes()).getRequest();
    String jti = CookieUtil.readCookie(request, "uid").get("uid");
    return jti;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

# js 修改

将 vue 中 methods 中的 add 方法替换

add:function (id) {
   axios.get("/api/wseckillorder/getToken").then( (res) => {
      let random = res.data
      axios.get("/api/wseckillorder/add?time="+moment(app.dateMenus[0]).format("YYYYMMDDHH")+"&id="+id+"&random"+random).then(function (response) {
         if (response.data.flag){
            app.msg="抢单成功,即将进入支付";
         } else{
            app.msg="抢单失败";
         }
      })
   })
}
1
2
3
4
5
6
7
8
9
10
11
12

# 秒杀渲染服务更改

回到 web 的 controller 层中的 SecKillOrderController 类中 修改 add 方法的参数和验证

    /**
     * 秒杀下单
     *
     * @param time 当前时间段
     * @param id   秒杀商品id
     * @return
     */
    @RequestMapping("/add")
    public Result add(@RequestParam("time") String time,@RequestParam("id") Long id,String random) {
        String cookieValue = this.readCookie();
        String redisRandomCode = (String) redisTemplate.opsForValue().get("randomCode_" + cookieValue);
        if (StringUtils.isEmpty(redisRandomCode) || !random.equals(redisRandomCode)){
            return new Result(false, StatusCode.ERROR,"下单失败");
        }

//        System.out.println(secKillOrderFeign);
        Result result = secKillOrderFeign.add(time, id);
        return result;
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

# 秒杀下单接口限流

因为秒杀的特殊业务场景,生产场景下,还有可能要对秒杀下单接口进行访问流量控制,防止过多的请求进入到后端服务器。对于限流的实现方式,我们之前已经接触过通过 nginx 限流,网关限流。但是他们都是对一个大的服务进行访问限流,如果现在只是要对某一个服务中的接口方法进行限流呢?这里推荐使用 google 提供的 guava 工具包中的 RateLimiter 进行实现,其内部是基于令牌桶算法进行限流计算

在 changgou_web_seckill 项目中 添加 guava 依赖

<dependency>
  <groupId>com.google.guava</groupId>
  <artifactId>guava</artifactId>
  <version>28.0-jre</version>
</dependency>
1
2
3
4
5

创建 aspect 包 并创建 AccessLimit 注解

@Inherited
@Documented
@Target({ElementType.FIELD,ElementType.METHOD,ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME) //不仅保存到class中 并且该注解jvm加载了class后仍然存在

public @interface AccessLimit {
}
1
2
3
4
5
6
7

创建自定义切面类 AccessLimitAop

package com.changgou.seckill.web.aspect;

import com.alibaba.fastjson.JSON;
import com.changgou.entity.Result;
import com.changgou.entity.StatusCode;
import com.google.common.util.concurrent.RateLimiter;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;

import javax.servlet.ServletOutputStream;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

/**
 * @author Iekr
 * Date:  2022/5/7/0007 19:32
 */
@Component
@Scope
@Aspect
public class AccessLimitAop {
    //设置令牌的生成速率
    private RateLimiter rateLimiter = RateLimiter.create(2.0); // 每秒生成2个令牌存入桶中

    @Pointcut("@annotation(com.changgou.seckill.web.aspect.AccessLimit)")
    public void limit() {
    }

    @Around("limit()")
    public Object around(ProceedingJoinPoint proceedingJoinPoint) {
        boolean flag = rateLimiter.tryAcquire();
        Object obj = null;
        if (flag) {
            //允许访问
            try {
                obj = proceedingJoinPoint.proceed(); //放行
            } catch (Throwable e) {
                throw new RuntimeException(e);
            }
        } else {
            //不允许访问 拒绝
            obj = JSON.toJSONString(new Result<>(flag, StatusCode.ACCESSERROR, "fail"));
        }
        return obj;
    }


    //向客户端返回信息
    private void outMessage(HttpServletResponse response, String errorMessage) {
        ServletOutputStream outputStream = null;
        try {
            response.setContentType("application/json;charset=utf-8");
            outputStream = response.getOutputStream();
            outputStream.write(errorMessage.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new RuntimeException(e);
        }finally {
            try {
                outputStream.close();
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
    }

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71

使用自定义限流注解

在 SecKillOrderController 类

image-20220507194342470

编辑 (opens new window)
上次更新: 2023/12/06, 01:31:48
Day15 秒杀前端
工程搭建及首页功能

← Day15 秒杀前端 工程搭建及首页功能→

最近更新
01
k8s
06-06
02
进程与线程
03-04
03
计算机操作系统概述
02-26
更多文章>
Theme by Vdoing | Copyright © 2022-2025 Iekr | Blog
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式