背景介绍

我们的业务需求是批量处理7702条数据,调用抖音开放平台的接口进行广告主的批量操作。最初,我们采用的是同步方式处理数据,导致处理时间过长,用户体验极差。为了提升效率,我们决定引入多线程和异步任务。

第一步:发现问题 —— 同步处理太慢

初始代码示例

for (Item item : list) {
    engineService.blockListBatch(...); // 一个一个调,太慢!
}

这种同步处理方式显然无法满足实时性要求,我们需要一种更高效的方式来处理这些任务。

第二步:引入异步 —— @Async 启动异步线程池

为了提高处理速度,我们首先引入了Spring的 @Async注解,将其配置为异步执行:

import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import javax.annotation.PostConstruct;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;

@Configuration
@EnableAsync
public class AsyncConfig implements AsyncConfigurer {

    private ThreadPoolTaskExecutor executor;

    @PostConstruct
    public void init() {
        executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(20);
        executor.setMaxPoolSize(50);
        executor.setQueueCapacity(100);
        executor.setThreadNamePrefix("async-block-");
        executor.setWaitForTasksToCompleteOnShutdown(true);
        executor.setAwaitTerminationSeconds(60);
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.initialize();
    }

    @Override
    public Executor getAsyncExecutor() {
        return executor;
    }

    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return (ex, method, params) -> System.out.println("异步任务异常 in " + method.getName() + ": " + ex.getMessage());
    }

    // 添加 getter,供其他类复用此线程池
    public Executor getExecutor() {
        return this.executor;
    }
}

同时,在需要异步执行的方法上添加 @Async注解:

@Async
@Transactional
public void blockListBatch(...) { ... }

这样做的好处是,方法一调用,立刻返回,不阻塞主线程,任务交给后台线程池执行,大大提升了用户体验。

第三步:并发爆炸 —— “抖音接口报 40110,被限流了!”

虽然异步处理提高了速度,但由于没有对请求频率进行控制,导致我们频繁触发抖音接口的QPS限流(每秒请求数),收到错误码 40110

解决方案:引入限流器

为了避免这种情况,我们引入了Guava的 RateLimiter来进行限流控制:

<dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
    <version>32.1.3-jre</version>
</dependency>
private static final RateLimiter rateLimiter = RateLimiter.create(20.0); // 20 QPS

CompletableFuture.runAsync(() -> {
    rateLimiter.acquire(); // 获取令牌
    BlockAdvertiserResult result = engineService.blockListBatch(batchId, item.getAppId(), item.getAdvertiserId());
    // 其他逻辑...
}, getAsyncExecutor());

这样,我们可以保证每秒最多发出20个请求,避免触发抖音接口的限流机制。

第四步:滑动窗口 vs 令牌桶 —— 算法选择

对比项

滑动窗口

令牌桶

核心思想

把时间分成小窗口,统计最近1秒内的请求数

每秒生成N个“令牌”,有令牌才能请求

流量控制

平滑,避免突发

允许短时突发

QPS控制

严格控制任意1秒内不超过N次

控制长期平均不超过N QPS

尽管 RateLimiter允许短时突发,但在我们的实际场景中,由于任务是逐步提交的,并不会出现瞬间大量请求的情况,因此 RateLimiter仍然适用。

最终实现

@Async
public void blockListBatch(...) {
    List<CompletableFuture<Void>> futures = list.stream()
        .map(item -> CompletableFuture.runAsync(() -> {
            rateLimiter.acquire(); // 获取令牌
            engineService.blockListBatch(...); // 调用接口
        }, getAsyncExecutor()))
        .collect(Collectors.toList());

  // 等待所有任务完成
  CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
}
// 获取当前使用的异步执行器(复用 AsyncConfig 中的逻辑)
private Executor getAsyncExecutor() {
    return asyncConfig.getExecutor();
}