您的当前位置:首页正文

控制请求并发数量:p-limit 源码解读

2024-10-17 来源:个人技术集锦

p-limit 是一个控制请求并发数量的库,他的整体代码不多,思路挺好的,很有学习价值;

举例

当我们同时发起多个请求时,一般是这样做的

Promise.all([    requestFn1,    requestFn2,    requestFn3]).then(res =>{})

或者

requestFn1()requestFn2()requestFn3()

而使用 p-limit 限制并发请求数量是这样做的:

var limit = pLimit(8); // 设置最大并发数量为 8var input = [ // Limit函数包装各个请求    limit(() => fetchSomething('1')),    limit(() => fetchSomething('2')),    limit(() => fetchSomething('3')),    limit(() => fetchSomething('4')),    limit(() => fetchSomething('5')),    limit(() => fetchSomething('6')),    limit(() => fetchSomething('7')),    limit(() => fetchSomething('8')),];// 执行请求Promise.all(input).then(res =>{    console.log(res)})

上面 input 数组包含了 8 limit 函数,每个 limit 函数包含了要发起的请求

当设置最大并发数量为 8 时,上面 8 个请求会同时执行

来看下效果,假设每个请求执行时间为 1s

var fetchSomething = (str) => {    return new Promise((resolve, reject) => {        setTimeout(() => {            console.log(str)            resolve(str)        }, 1000)    })}

当设置并发请求数量为 2

当设置并发请求数量为 3

p-limit 限制并发请求数量本质上是,在内部维护了一个请求队列;

当请求发起时,先将请求推入队列,判断当前执行的请求数量是否小于配置的请求并发数量,如果是则执行当前请求,否则等待正在发起的请求中谁请求完了,再从队列首部取出一个执行;

源码(v2.3.0)

pLimit 源码如下(这个源码是 版本的,因为项目中引入的版本比较早。后面会分析从 2.3.0 到最新版本的源码,看看增加或者改进了什么):

'use strict';const pTry = require('p-try');const pLimit = concurrency => {    // 限制为正整数    if (!((Number.isInteger(concurrency) || concurrency === Infinity) && concurrency > 0)) {        return Promise.reject(new TypeError('Expected `concurrency` to be a number from 1 and up'));    }    const queue = []; // 请求队列    let activeCount = 0; // 当前并发的数量    const next = () => { // 一个请求完成时执行的回调        activeCount--;        if (queue.length > 0) {            queue.shift()();        }    };    const run = (fn, resolve, ...args) => { // 请求开始执行        activeCount++;        const result = pTry(fn, ...args);        resolve(result); // 将结果传递给 generator        result.then(next, next); // 请求执行完调用回调    };    // 将请求加入队列    const enqueue = (fn, resolve, ...args) => {        if (activeCount < concurrency) {            run(fn, resolve, ...args);        } else {            queue.push(run.bind(null, fn, resolve, ...args));        }    };    const generator = (fn, ...args) => new Promise(resolve => enqueue(fn, resolve, ...args));        // 暴露内部属性给外界    Object.defineProperties(generator, {        activeCount: {            get: () => activeCount        },        pendingCount: {            get: () => queue.length        },        clearQueue: {            value: () => {                queue.length = 0;            }        }    });    return generator;};module.exports = pLimit;module.exports.default = pLimit;

下面一一剖析下

1、 pLimit 函数整体是一个闭包函数,返回了一个名叫 generator 的函数,由 generator 处理并发逻辑,
generator 返回值必须是 promise ,这样才能被 Promise.all 捕获到

const generator = (fn,...args) => new Promise((resolve,reject)=7enqueue(fn,resolve,...args))

2、在 enqueue 函数里面

// 将请求加入队列const enqueue = (fn, resolve, ...args) => {    if (activeCount < concurrency) {        run(fn, resolve, ...args);    } else {        queue.push(run.bind(null, fn, resolve, ...args));    }};

activeCount 表示正在执行的请求数量,当 activeCount 小于配置的并发数量( concurrency )时,则可以执行当前的 fn (执行 run 函数),否则推入请求队列等待。

3、 run 函数接收了三个形参

const run = (fn, resolve, ...args) => { // 请求开始执行    activeCount++;    const result = pTry(fn, ...args);    resolve(result);    result.then(next, next);};
  • fn 表示执行的请求,

  • resolve generator 定义并往下传,一直跟踪到请求执行完毕后,调用 resolve(result); 代表 generator 函数 fulfilled

  • ···args 表示其余的参数,最终会作为 fn 的参数。

4、执行 run 函数时

const run = (fn, resolve, ...args) => { // 请求开始执行    activeCount++; // 请求开始执行,当前请求数量 +1    const result = pTry(fn, ...args);    resolve(result);    result.then(next, next);};

这里执行 fn 使用的是 const result = pTry(fn,...args) pTry 的作用就是创建一个 promise 包裹的结果,不论 fn 是同步函数还是异步函数

// pTry 源码const pTry = (fn,...args) => new Promise((resolve,reject) => resolve(fn(...args)));

现在 fn 执行( fn(...args) )完毕并兑现( resolve(fn(...args)) )之后, result 就会兑现。

result 兑现后, generator promise 也就兑现了( resolve(result) ),那么当前请求 fn 的流程就执行完了。

5、当前请求执行完后,对应的当前正在请求的数量也要减一, activeCount--

const next = () => { // 一个请求完成时执行的回调    activeCount--;    if (queue.length > 0) {        queue.shift()();    }};

然后继续从队列头部取出请求来执行

6、最后暴露内部属性给外界

Object.defineProperties(generator, {    activeCount: { // 当前正在请求的数量        get: () => activeCount    },    pendingCount: { // 等待执行的数量        get: () => queue.length    },    clearQueue: {        value: () => {            queue.length = 0;        }    }});

源码(v2.3.0)=> 源码(v6.1.0)

从 到最新的 版本中间加了一些改进

1、 :始终异步执行传进 limit 的函数

3.0.0 中,作者将请求入队放在前面,将 if 判断语句和请求执行置于微任务中运行;正如源码注释中解释的:因为当 run 函数执行时, activeCount 是异步更新的,那么这里的 if 判断语句也应该异步执行才能实时获取到 activeCount 的值。

这样一开始批量执行 limit(fn) 时,将会先把这些请求全部放入队列中,然后再根据条件判断是否执行请求;

2、 :修复传入的无效并发数引起的错误;

return Promise.reject 改为了直接 throw 一个错误

3、 :移除 pTry 的依赖;改善性能;

移除了 pTry 依赖,改为了 async 包裹,上面有提到, pTry 是一个 promise 包装函数,返回结果是一个 promise ;两者本质都是一样;

增加了 yocto-queue 依赖, yocto-queue 是一个队列数据结构,用队列代替数组,性能更好;队列的入队和出队操作时间复杂度是 O(1) ,而数组的 shift() O(n) ;

4、 :修复上下文传播问题

引入了 AsyncResource

export const AsyncResource = {    bind(fn, _type, thisArg) {        return fn.bind(thisArg);    }}

这里用 AsyncResource.bind() 包裹 run.bind(undefined, fn, resolve, args) ,其实不是太明白为啥加这一层。。。这里用的到三个参数( fn,resolve,args )都是通过函数传参过来的,和 this 没关系吧,各位知道的可以告知下么。

相关 issue

5、 :性能优化,主要优化的地方在下面

移除了 AsyncResource.bind() ,改为使用一个立即执行的 promise ,并将 promise resolve 方法插入队列,一旦 resolve 完成兑现,调用相应请求;相关 issue

6、 :允许实时修改并发限制数

改变并发数后立马再检测是否可以执行请求;


最后

在上面第 4 点的,第 5 点中的优化没太看明白,因为执行请求用的到三个参数( fn,resolve,args )都是通过函数传参过来的,看起来 this 没关系,为啥要进行多层 bind 绑定呢?各位知道的可以不吝赐教下么。

显示全文