
How to turn a Node.js event callback stream into an Async Iterator

Mar 3, 2024

Here’s an interesting Node.js exercise:

Let’s say you’re streaming something (e.g. reading a file), but the only API available to you is a series of event handler callbacks.

For example, the stream API of the csv-parse library looks like the following (ignore, for now, the fact that the library also has an Async Iterator API):

import { parse } from 'csv-parse';

const parser = parse({ delimiter: ',' });

// 'readable' fires whenever parsed records are buffered and ready to be read
parser.on('readable', () => {
  let record;
  while ((record = parser.read()) !== null) {
    console.log(record);
  }
});
parser.on('error', (err) => console.error(err.message));
parser.on('end', () => {}); // no more records

This is a bit ugly, though.

How can one turn the above code into a simple for await loop?

for await (const record of asyncIterable) {
  console.log(record);
}

The task is essentially to turn that first callback code into an Async Iterable, so that it can be iterated with a for await loop.
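For reference, an Async Iterable is any object with a [Symbol.asyncIterator]() method. That method returns an iterator whose next() returns a promise of { value, done }. Below is a minimal hand-written sketch with no generator involved. The name countdown is illustrative and not part of any library:

```typescript
// A hand-rolled async iterable: just the protocol, no async generator.
function countdown(from: number): AsyncIterable<number> {
  return {
    [Symbol.asyncIterator](): AsyncIterator<number> {
      let current = from;
      return {
        // next() must return a Promise of an IteratorResult.
        async next(): Promise<IteratorResult<number>> {
          if (current <= 0) return { value: undefined, done: true };
          return { value: current--, done: false };
        },
      };
    },
  };
}

async function main() {
  for await (const n of countdown(3)) {
    console.log(n); // 3, then 2, then 1
  }
}

main();
```

An async generator function produces an object that already implements this protocol, which is why wrapping the callbacks in one is the natural route.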

Why might we want to do this? Beyond the for await loop looking a hell of a lot cleaner, suppose we ever need to programmatically iterate through multiple streams simultaneously (e.g. comparing multiple files line by line). That would be practically impossible with callbacks alone, because each callback fires whenever its own stream has data, not when we ask for the next record.

So how do we create an Async Iterable object from the callback code above?

We can wrap the callback code inside an async generator function.

Turns out this is fairly complicated (at least for my small brain). It is probably the hardest Node.js-specific problem I recall working on: the equivalent of a Node.js LeetCode Hard.

Try to do it yourself if you’d like. Otherwise, here’s what I ended up with.

Solution

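import type { Readable } from 'node:stream';

// Wraps a csv-parse parser (a Node.js Readable stream in object mode)
// in an async generator so its records can be consumed with for await.
async function* createCsvParseStream(parser: Readable) {
  let results: any[] = [];
  let done = false;
  let resolve: (value?: any) => void = () => {};
  let reject: (reason?: any) => void = () => {};
  let promise!: Promise<void>;

  // Create a fresh promise and keep hold of its resolve/reject functions.
  const renew = () => {
    promise = new Promise<void>((res, rej) => {
      resolve = res;
      reject = rej;
    });
    // Mark it as handled so an 'error' emitted while the consumer is busy is not
    // reported as an unhandled rejection; awaiting it below still throws.
    promise.catch(() => {});
  };
  renew();

  parser.on('readable', () => {
    let record;
    while ((record = parser.read()) !== null) {
      results.push(record);
      resolve(); // wake up the loop below
      renew();
    }
  });

  parser.on('error', (err) => {
    done = true;
    reject(err); // forward the actual error instead of rejecting with undefined
  });

  parser.on('end', () => {
    done = true;
    resolve();
  });

  while (!done) {
    await promise;
    yield* results;
    results = [];
  }

  // Rethrows the parser error if it arrived while a record was being consumed.
  await promise;
}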

I’ll be honest: I found this on Stack Overflow and modified it slightly. I’d reference the answer, but I don’t have the link (if someone replies with it, I’ll add it).

Essentially, you create a Promise (promise = new Promise(...)), store its resolve and reject functions, and then await the promise inside a while loop. When an event fires (parser.on('readable', () => {})), you grab the data (results.push(record)), call resolve() or reject(), and then replace your promise reference with a new Promise object, along with its new resolve and reject functions.
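The store-the-resolver-and-swap-the-promise trick can be isolated into a tiny helper. This is a minimal sketch. The names createSignal, wait and notify are hypothetical and do not come from any library. The setTimeout calls stand in for the parser's events:

```typescript
// A promise whose resolve function is kept outside the executor and which is
// swapped for a fresh promise every time it fires.
function createSignal() {
  let resolve: () => void = () => {};
  let promise = new Promise<void>((res) => {
    resolve = () => res();
  });

  return {
    // The promise that the next notify() call will resolve.
    wait: (): Promise<void> => promise,
    // Resolve the current promise, then replace it for the next waiter.
    notify: (): void => {
      resolve();
      promise = new Promise<void>((res) => {
        resolve = () => res();
      });
    },
  };
}

async function demo() {
  const signal = createSignal();
  const queue: string[] = [];

  // Simulated event source, playing the role of the 'readable' handler.
  setTimeout(() => { queue.push('a'); signal.notify(); }, 10);
  setTimeout(() => { queue.push('b'); signal.notify(); }, 20);

  await signal.wait();
  console.log(queue.shift()); // a
  await signal.wait();
  console.log(queue.shift()); // b
}

demo();
```

A notify() that fires while nobody is waiting is not remembered by the signal itself. That is why the solution also buffers records in the results array and drains the whole array each time the loop wakes up.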

The results are returned via yield*, which delegates to the iterable results array, so each record is yielded individually (e.g. if results = [a, b, c], then a, b, and c are yielded separately).
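Here is a quick sketch of that yield* behavior inside an async generator. The name drain is illustrative. In an async generator, yield* also accepts a plain (synchronous) array, and each element comes out as its own iteration:

```typescript
// yield* delegates to any iterable, so each array element is yielded on its own.
async function* drain(batches: string[][]) {
  for (const batch of batches) {
    yield* batch;
  }
}

async function main() {
  const out: string[] = [];
  for await (const record of drain([['a', 'b', 'c'], ['d']])) {
    out.push(record);
  }
  console.log(out); // [ 'a', 'b', 'c', 'd' ]
}

main();
```

Writing yield batch instead would hand each whole array to the consumer as a single value.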

The while loop then awaits this next promise. This continues until an error or end event fires, which sets done = true and exits the while loop.

Now one can iterate through that event callback code like this:

const asyncIterable = createCsvParseStream(inputStream);

for await (const input of asyncIterable) {
  console.log(input);
}

Much nicer!

Note: I’m leaving out the other csv-parse-specific code from this example that would be needed to make it work, but that’s irrelevant to the point here.

All this being said, the csv-parse library actually has an Async Iterator API, so thankfully there’s no need to do all of this for this specific example.

Conditionally iterating multiple streams simultaneously

Now, if we wanted to conditionally iterate through multiple streams simultaneously (e.g. comparing an older version of a sorted CSV file with a newer version to find rows that were added or deleted), we could do something like the following:

const iter = asyncIterable1[Symbol.asyncIterator]();
const iter2 = asyncIterable2[Symbol.asyncIterator]();

let [res1, res2] = await Promise.all([iter.next(), iter2.next()]);

while (!res1.done && !res2.done) {
  console.log("do stuff", res1, res2);

  if (...) {
    ...
    res1 = await iter.next(); // advance only the first stream
  } else if (...) {
    ...
    res2 = await iter2.next(); // advance only the second stream
  } else {
    ...
    // advance both streams together
    [res1, res2] = await Promise.all([iter.next(), iter2.next()]);
  }
}
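As one concrete (hypothetical) way to fill in those branches, here is a sketch that diffs two sorted async iterables of row keys. It yields the keys that were deleted from the old stream or added in the new one. It assumes both inputs are sorted ascending by JavaScript's default string ordering, and that each row has already been reduced to a string key. The names diffSorted, Change and fromArray are illustrative. Unlike the loop above, it keeps going until both streams are exhausted, so trailing rows in the longer stream are reported too:

```typescript
type Change = { kind: 'added' | 'deleted'; key: string };

// Walks two ascending-sorted streams in lockstep, advancing only the side
// with the smaller key, so each stream is read exactly once.
async function* diffSorted(
  oldRows: AsyncIterable<string>,
  newRows: AsyncIterable<string>,
): AsyncGenerator<Change> {
  const iter1 = oldRows[Symbol.asyncIterator]();
  const iter2 = newRows[Symbol.asyncIterator]();

  let [res1, res2] = await Promise.all([iter1.next(), iter2.next()]);

  while (!res1.done || !res2.done) {
    if (res2.done || (!res1.done && res1.value < res2.value)) {
      // Key exists only in the old stream: it was deleted.
      yield { kind: 'deleted', key: res1.value };
      res1 = await iter1.next();
    } else if (res1.done || res2.value < res1.value) {
      // Key exists only in the new stream: it was added.
      yield { kind: 'added', key: res2.value };
      res2 = await iter2.next();
    } else {
      // Same key in both streams: unchanged, so advance both together.
      [res1, res2] = await Promise.all([iter1.next(), iter2.next()]);
    }
  }
}

// Hypothetical helper that turns an in-memory array into an async iterable.
async function* fromArray(rows: string[]) {
  yield* rows;
}

async function main() {
  const oldFile = fromArray(['a', 'b', 'd']);
  const newFile = fromArray(['b', 'c', 'd', 'e']);
  for await (const change of diffSorted(oldFile, newFile)) {
    console.log(change.kind, change.key);
  }
  // deleted a
  // added c
  // added e
}

main();
```

Real CSV rows would need a key extracted from each record before comparison, and the ordering used for the comparison must match the ordering the files were sorted with.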

What is a "for await" loop?

Taking a step back, let's define what a for await loop is.

Given the following Async Iterable object (parser):

import fs from "fs";
import { parse } from "csv-parse";

const parser = fs.createReadStream("test.csv").pipe(parse());

The following for await loop:

for await (const res of parser) {
  console.log("res", res);
}

is roughly equivalent to:

const iter = parser[Symbol.asyncIterator]();
let res = await iter.next();

while (!res.done) {
  console.log("res", res.value);
  res = await iter.next();
}
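That desugaring skips one detail. Leaving a for await loop early (via break, return, or a thrown error) calls the iterator's return() method so the source can release resources. A small sketch with a hand-written, never-ending iterator makes this visible. The name tracked is illustrative:

```typescript
// An async iterable over 1, 2, 3, ... that records whether return() was called.
function tracked() {
  const log: string[] = [];
  let n = 0;
  const iterable: AsyncIterable<number> = {
    [Symbol.asyncIterator]() {
      return {
        async next(): Promise<IteratorResult<number>> {
          n += 1;
          return { value: n, done: false };
        },
        // Called by for await when the loop is exited early.
        async return(): Promise<IteratorResult<number>> {
          log.push('return() called');
          return { value: undefined, done: true };
        },
      };
    },
  };
  return { iterable, log };
}

async function main() {
  const { iterable, log } = tracked();
  for await (const value of iterable) {
    if (value === 2) break; // early exit triggers iterator.return()
  }
  console.log(log); // [ 'return() called' ]
}

main();
```

The manual while loop above never calls return(), so a faithful hand-written version would need a try/finally around it.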

I thought this was a fun exercise.