How to Turn a Node.js Event Callback Stream into an Async Iterator
Mar 3, 2024
Here’s an interesting Node.js exercise:
Let’s say you’re streaming something (e.g., reading a file into memory), but the only API available to you is a series of event handler callbacks.
For example, the stream API of the csv-parse library looks like the following (ignore, for now, the fact that the library also has an Async Iterator API):
import { parse } from 'csv-parse';
const parser = parse({ delimiter: ',' });
parser.on('readable', () => {
  let record;
  while ((record = parser.read()) !== null) {
    console.log(record);
  }
});
parser.on('error', (err) => console.error(err.message));
parser.on('end', () => {});
This is a bit ugly though.
How can one turn the above code into a simple for await loop?
for await (const record of asyncIterable) {
  console.log(record);
}
The task is essentially to turn that first callback code into an Async Iterable, so that it can be iterated via a for await loop.
Why might we want to do this? Well, beyond the for await loop just looking a hell of a lot cleaner, if we ever need to programmatically iterate through multiple streams simultaneously (e.g., comparing multiple files line by line), that would be practically impossible to do with the callback version.
So how do we create an Async Iterable object from the callback code above?
We can wrap the callback code inside an async generator function.
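Here is a quick refresher. An async generator function (declared with async function*) can both await promises and yield values. Calling it returns an object that a for await loop can consume directly. This is a minimal sketch: the numbers function and its timer-based delays are made up to stand in for records arriving from events. Calling the function does not run its body. Nothing happens until iteration begins.

```typescript
// A hypothetical async generator standing in for "records arriving over time".
async function* numbers(log: string[]): AsyncGenerator<number> {
  log.push("body started"); // runs only once iteration begins, not when numbers() is called
  for (let i = 1; i <= 3; i++) {
    // Simulate waiting for the next event before producing a value.
    await new Promise<void>((resolve) => setTimeout(() => resolve(), 10));
    yield i;
  }
}

async function main() {
  const log: string[] = [];
  const iterable = numbers(log); // just creates the generator object
  console.log(log.length); // 0: the body hasn't run yet
  for await (const n of iterable) {
    console.log(n); // 1, then 2, then 3
  }
  console.log(log); // [ 'body started' ]
}

main();
```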
Turns out this is fairly complicated (at least for my small brain). This is probably the hardest Node.js-specific problem I recall working on: the equivalent of a Node.js LeetCode hard.
Try to do it yourself if you’d like. Otherwise, here’s what I ended up with.
Solution
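import type { Readable } from 'node:stream';
async function* createCsvParseStream(parser: Readable) {
  let results: any[] = [];
  let done = false;
  let resolve!: () => void;
  let reject!: (reason?: unknown) => void;
  // Create a fresh promise and capture its resolve/reject functions.
  const createPromise = () => {
    const p = new Promise<void>((res, rej) => {
      resolve = res;
      reject = rej;
    });
    // Mark rejections as handled so Node doesn't crash if an error arrives while
    // the consumer is busy with a record; the loop below still sees it via await.
    p.catch(() => {});
    return p;
  };
  let promise = createPromise();
  parser.on('readable', () => {
    let record;
    while ((record = parser.read()) !== null) {
      results.push(record);
    }
    // Wake up the loop below, then arm a new promise for the next event.
    resolve();
    promise = createPromise();
  });
  parser.on('error', (err) => {
    done = true;
    reject(err); // forward the actual error to the consumer
  });
  parser.on('end', () => {
    done = true;
    resolve();
  });
  while (!done) {
    await promise; // throws if the parser emitted 'error'
    yield* results; // yield buffered records one at a time
    results = [];
  }
  // Surface an error that fired while the consumer was still processing a record.
  await promise;
}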
I’ll be honest, I found this on StackOverflow, and slightly modified it. I’d reference the answer, but don’t have the link (if someone replies with it, I’ll add the link).
Essentially, you create a Promise (promise = new Promise(...)), store its resolve and reject functions, and then await the promise inside a while loop. When an event fires (parser.on('readable', () => {})), you grab the data (results.push(record)), call resolve() or reject(), and then replace your promise reference with a new Promise object, along with its new resolve and reject functions.
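The trick of storing resolve and reject outside the Promise constructor is often packaged as a small "deferred" helper. The createDeferred function below is a hypothetical helper, not part of Node or csv-parse. Newer JavaScript runtimes also provide Promise.withResolvers(), which returns the same { promise, resolve, reject } shape. The demo simulates one event that settles the current promise and then swaps in a fresh one.

```typescript
// Hypothetical helper: bundles a promise with its resolve/reject functions so that
// code outside the Promise constructor (e.g. an event handler) can settle it later.
interface Deferred<T> {
  promise: Promise<T>;
  resolve: (value: T) => void;
  reject: (reason?: unknown) => void;
}

function createDeferred<T>(): Deferred<T> {
  let resolve!: (value: T) => void;
  let reject!: (reason?: unknown) => void;
  // The executor runs synchronously, so both functions are assigned before we return.
  const promise = new Promise<T>((res, rej) => {
    resolve = res;
    reject = rej;
  });
  return { promise, resolve, reject };
}

async function demo() {
  let current = createDeferred<string>();
  // Simulated event handler: settle the current promise, then swap in a fresh one.
  setTimeout(() => {
    current.resolve("first record");
    current = createDeferred<string>();
  }, 10);
  console.log(await current.promise); // "first record"
}

demo();
```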
The results are returned via yield*, which delegates to the iterable results array, so each value the generator produces is a single record (e.g., if results = [a, b, c], then a, b, and c are yielded separately).
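This sketch isolates the delegation. A tiny async generator hands out buffered batches with yield*. The batch contents are made up for illustration.

```typescript
// yield* inside an async generator delegates to any iterable (here, a plain array),
// so each element is yielded as a separate value rather than as one array.
async function* fromBatches(batches: string[][]): AsyncGenerator<string> {
  for (const batch of batches) {
    yield* batch;
  }
}

async function main() {
  // Two hypothetical batches, like two 'readable' events delivering records.
  for await (const record of fromBatches([["a", "b", "c"], ["d"]])) {
    console.log(record); // "a", "b", "c", "d" on separate lines
  }
}

main();
```

Writing yield batch instead would hand the consumer whole arrays rather than individual records.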
The while loop then awaits the next promise. This continues until an error or end event fires and sets done = true, which exits the while loop.
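Because csv-parse isn't needed to exercise the pattern itself, here is a self-contained sketch of the same resolve-swapping idea, generalized to a plain Node EventEmitter. The name eventsToAsyncIterable and the "data"/"end"/"error" event names are assumptions for illustration. Unlike the parser version, each "data" event carries one item directly instead of requiring a read() loop. The sketch also forwards the actual error to the consumer.

```typescript
import { EventEmitter } from "node:events";

// Hypothetical adapter: converts "data" / "end" / "error" events into an async iterable
// by awaiting a promise that each event settles and then replaces.
async function* eventsToAsyncIterable<T>(emitter: EventEmitter): AsyncGenerator<T> {
  let buffer: T[] = [];
  let done = false;
  let resolve!: () => void;
  let reject!: (reason?: unknown) => void;

  const createPromise = () => {
    const p = new Promise<void>((res, rej) => {
      resolve = res;
      reject = rej;
    });
    p.catch(() => {}); // handled here; the loop still observes the rejection via await
    return p;
  };
  let promise = createPromise();

  const onData = (item: T) => {
    buffer.push(item);
    resolve(); // wake the loop
    promise = createPromise(); // arm a promise for the next event
  };
  const onEnd = () => {
    done = true;
    resolve();
  };
  const onError = (err: unknown) => {
    done = true;
    reject(err);
  };
  emitter.on("data", onData);
  emitter.on("end", onEnd);
  emitter.on("error", onError);

  try {
    while (!done) {
      await promise; // throws if "error" was emitted
      yield* buffer; // one item per yield
      buffer = [];
    }
    await promise; // surface an error emitted while the consumer was busy
  } finally {
    // Detach listeners on completion, on error, or when the consumer breaks early.
    emitter.off("data", onData);
    emitter.off("end", onEnd);
    emitter.off("error", onError);
  }
}
```

The generator body only runs once iteration starts, so the on(...) calls happen at that point. Any events emitted before the first next() call are missed. The try/finally is an addition over the version above: it removes the listeners even when the consumer breaks out of the loop early.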
Now one can iterate through that event callback code like this:
const asyncIterable = createCsvParseStream(parser);
for await (const record of asyncIterable) {
  console.log(record);
}
Much nicer!
Note: I’m leaving out other csv-parse-specific code that would be required to get this example working, but that’s irrelevant to the point here.
All this being said, the csv-parse library actually has an Async Iterator API, so thankfully there’s no need to do all of this for this specific example.
Conditionally iterating multiple streams simultaneously
Now if we wanted to conditionally iterate through multiple streams simultaneously (e.g., comparing an older version of a sorted CSV file with a newer version to find rows that were added or deleted), we could do something like the following:
const iter = asyncIterable1[Symbol.asyncIterator]();
const iter2 = asyncIterable2[Symbol.asyncIterator]();
let [res1, res2] = await Promise.all([iter.next(), iter2.next()]);
while (!res1.done && !res2.done) {
  console.log("do stuff", res1, res2);
  if (...) {
    ...
    res1 = await iter.next();
  } else if (...) {
    ...
    res2 = await iter2.next();
  } else {
    ...
    [res1, res2] = await Promise.all([iter.next(), iter2.next()]);
  }
}
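Here is a concrete, hypothetical instance of that pattern: a sketch of the sorted-file diff. It assumes both inputs are sorted ascending and compares each row as a whole string. In-memory async generators stand in for the two CSV streams; the fromArray helper is made up for this example. Unlike the loop above, it also drains whichever iterator still has rows once the other is exhausted, because those leftover rows are additions or deletions too.

```typescript
// Hypothetical helper: wraps an array in an async generator to stand in for a CSV stream.
async function* fromArray<T>(items: T[]): AsyncGenerator<T> {
  yield* items;
}

// Walks two ascending-sorted async iterables in lockstep and reports rows that
// exist only in the old input (removed) or only in the new input (added).
async function diffSorted(
  oldRows: AsyncIterable<string>,
  newRows: AsyncIterable<string>,
): Promise<{ added: string[]; removed: string[] }> {
  const iter1 = oldRows[Symbol.asyncIterator]();
  const iter2 = newRows[Symbol.asyncIterator]();
  const added: string[] = [];
  const removed: string[] = [];

  let [res1, res2] = await Promise.all([iter1.next(), iter2.next()]);
  while (!res1.done && !res2.done) {
    if (res1.value < res2.value) {
      removed.push(res1.value); // only in the old input
      res1 = await iter1.next();
    } else if (res1.value > res2.value) {
      added.push(res2.value); // only in the new input
      res2 = await iter2.next();
    } else {
      // Same row in both inputs: advance both iterators together.
      [res1, res2] = await Promise.all([iter1.next(), iter2.next()]);
    }
  }
  // Whatever is left in either iterator has no counterpart in the other one.
  while (!res1.done) {
    removed.push(res1.value);
    res1 = await iter1.next();
  }
  while (!res2.done) {
    added.push(res2.value);
    res2 = await iter2.next();
  }
  return { added, removed };
}

// added: c, e; removed: b
diffSorted(fromArray(["a", "b", "d"]), fromArray(["a", "c", "d", "e"])).then(console.log);
```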
What is a "for await" loop?
Taking a step back, let's define what a for await loop is.
Given the following Async Iterable object (parser):
import fs from "fs";
import { parse } from "csv-parse";
const parser = fs.createReadStream("test.csv").pipe(parse());
The following for await loop:
for await (const res of parser) {
  console.log("res", res);
}
is roughly equivalent to:
const iter = parser[Symbol.asyncIterator]();
let res = await iter.next();
while (!res.done) {
  console.log("res", res.value);
  res = await iter.next();
}
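The two forms differ on early exit. When a for await loop exits via break (or an exception in the loop body), it calls the iterator's return() method, if there is one, so the source can clean up. The hand-written while loop does not do this. Here is a minimal sketch with a hypothetical counter iterable that records when return() is called.

```typescript
// Hypothetical iterable that yields 0, 1, 2, ... up to limit - 1 and logs cleanup calls.
function counter(limit: number, log: string[]): AsyncIterable<number> {
  return {
    [Symbol.asyncIterator](): AsyncIterator<number> {
      let i = 0;
      return {
        async next(): Promise<IteratorResult<number>> {
          if (i < limit) {
            return { value: i++, done: false };
          }
          return { value: undefined, done: true };
        },
        // Called by for await on break/throw, not on normal completion.
        async return(): Promise<IteratorResult<number>> {
          log.push("return() called");
          return { value: undefined, done: true };
        },
      };
    },
  };
}

async function main() {
  const log: string[] = [];
  for await (const n of counter(10, log)) {
    log.push(`got ${n}`);
    if (n === 1) break; // early exit triggers the iterator's return()
  }
  console.log(log); // [ 'got 0', 'got 1', 'return() called' ]
}

main();
```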
I thought this was a fun exercise.