十二棵橡树

异步处理多个文件的解决方案

橡树上2021-03-05

目录

前言

在更新博客源码的过程中,需要迁移原来的文章,给每一个文章的 metainfo 加上 uniqueid。

7bfb5e0b.png

这就遇到一个问题,我要处理整个 content 目录下的文件,每一个文件要经历读取文件 -> 更新文件内容 -> 写入原来文件的过程,这是一个异步的过程。由此引申出的问题:如何优雅地处理 N 多个异步任务

在前端开发者的认识里,有多个异步任务,要么一个个处理,那样效率就会很低,并且异步操作很容易写出回调嵌套函数这样难以理解的代码,要么使用类似 Promise.all 的方法,并发请求,但这样的话短时间内性能消耗很大,得不偿失。

这里提供两种思路:一是使用异步并发器,另一个是使用异步迭代器

使用异步并发器

方案思路:

既然不能将所有异步任务一起并发,那么可以拆分异步任务,用排队摇号的思路,设立一个最大异步并发数,每处理完一个补一个,直到都完成。有点类似于 Promise.all 的加强版。这样既不多消耗性能,也能保证效率。

这个就是 ConcurrencyPromisePool 代码的思路。

// utils/ConcurrencyPromisePool.js
class ConcurrencyPromisePool {
    constructor(limit) {
        // 限制
        this.limit = limit;
        // 正在执行的请求数
        this.runningNum = 0;
        // 队列
        this.queue = [];
        // 最终结果
        this.results = [];
    }

    all(promises = []) {
        return new Promise((resolve, reject) => {
            for (const promise of promises) {
                this._run(promise, resolve, reject);
            }
        });
    }

    _run(promise, resolve, reject) {
        // 如果请求数大于限制数,就加入 quene 队列
        if (this.runningNum >= this.limit) {
            console.log(">>> 达到上限,入队!");
            this.queue.push(promise);
            return;
        }
        // 自加
        ++this.runningNum;
        promise()
            .then(res => {
                this.results.push(res);
                --this.runningNum;
                // 当队列为 0 并且执行的任务也为 0 时,说明都完成了
                if (this.queue.length === 0 && this.runningNum === 0) {
                    return resolve(this.results);
                }
                // 当队列中还有新的任务,则递归继续 _run
                if (this.queue.length) {
                    this._run(this.queue.shift(), resolve, reject);
                }
            })
            .catch(reject);
    }
}

module.exports = ConcurrencyPromisePool;
// 使用
async function MigrateBlogWithConcurrencyPromisePool() {
    try {
        // 异步控制并发器,最多六个
        const pool  = new ConcurrencyPromisePool(6);
        // 读取所有文件夹
        const dirs = await fsPromise.readdir(pathname)
        // 读取所有文件路径
        const files = dirs
            .filter(dir => dir !== '.DS_Store')
            .map(dir => path.resolve(__dirname, `../content/${dir}/index.md`));
        // 生成异步操作的 promise
        const filePromises = files.map(fileName => asyncHandleFileMigration(fileName));
        // 开始处理
        await pool.all(filePromises);
        console.log('done!')
    } catch(e) {
        console.log(e)
    }
}

MigrateBlogWithConcurrencyPromisePool()

不过这个方案也有缺点,使用 ConcurrencyPromisePool 必须是等到所有的值都处理完才会返回最终 result 数组,无法迭代一个处理一个。要想像同步代码那样迭代处理,可以使用下面的方案:异步迭代器。

使用异步迭代器

使用异步迭代的思路,把 filePromises 当作要迭代处理的对象,用 for..wait..of 循环处理,优点在于语义非常直观,而且能在每一次迭代中得知异步结果,并且 AsyncIterator 设置了并发数,可以在一次迭代中并发消费多个 promise 对象,提高效率。

// utils/AsyncIterator.js
class AsyncIterator {
    constructor(promiseList = [], concurrencyCount = 1) {
        if (!promiseList.length) {
            throw new Error('not a list');
        }
        // 要迭代的异步对象
        this._iterableList = promiseList;
        // 支持并发处理
        this._concurrencyCount = concurrencyCount;
    }
    _splitIterableList() {
        const result = [];
        let taskList = []
        for (const iterable of this._iterableList) {
            // 当 taskList 小于限制数时,直接添加
            if (taskList.length < this._concurrencyCount) {
                taskList.push(iterable);
            }
            // 当 taskList 等于限制数时,整个 taskList 添加到 result 中,并将 taskList 置为空
            if (taskList.length === this._concurrencyCount) {
                result.push(taskList);
                taskList = [];
            }
        }
        return result;
    }
    async *[Symbol.asyncIterator] () {
        const splitedIterableList = this._splitIterableList();
        for (let taskList of splitedIterableList) {
            yield await Promise.all(taskList)
        }
    }
}

module.exports = AsyncIterator;
// 使用
async function MigrateBlogWithAsyncIterator() {
    try {
        // 读取所有文件夹
        const dirs = await fsPromise.readdir(pathname)
        // 读取所有文件路径
        const files = dirs
            .filter(dir => dir !== '.DS_Store')
            .map(dir => path.resolve(__dirname, `../content/${dir}/index.md`));
        // 生成异步操作的 promise
        const filePromises = files.map(fileName => asyncHandleFileMigration(fileName));
        // 生成异步可迭代对象
        const filePromisesIterables  = new AsyncIterator(filePromises, 2);
        // 异步迭代
        for await (const fileResult of filePromisesIterables) {
            console.log(fileResult)
        }
        console.log('done')
    } catch(e) {
        console.log(e)
    }
}

MigrateBlogWithAsyncIterator()

这个方案也有缺点,目前没有支持 Symbol.asyncIterator 异步迭代器的原生对象,需要手动部署。上面使用的是异步生成器(Async Generator)来简化异步迭代器的写法。

参考