如何将Node.js事件回调函数流转换为异步迭代器
Mar 3, 2024 2024年3月3日
Here’s an interesting Node.js exercise: 下面是一个有趣的Node.js练习:
Let’s say you’re streaming something (eg. reading a file in memory), but the only API you have available to you is a series of event handler callbacks. 让我们假设你正在流的东西(例如。阅读内存中的文件),但您唯一可用的API是一系列事件处理程序回调。
For example the stream API of the csv-parse
library has an API like the following (ignore the fact that the library also has an Async Iterator API for now):
例如,csv-parse
库的stream API具有如下所示的API(忽略该库现在也具有Async Iterator API的事实):
import { parse } from 'csv-parse';
const parser = parse({ delimiter: ',' });
parser.on('readable', () => {
let record;
while ((record = parser.read()) !== null) {
console.log(record);
}
});
parser.on('error', (err) => console.error(err.message));
parser.on('end', () => {});
This is a bit ugly though. 不过,这有点难看。
How can one turn the above code into a simple for await
loop?
如何将上面的代码变成一个简单的forawait
循环?
for await (const record of asyncIterable) {
console.log(record);
}
The task is essentially to turn that first callback code into an Async Iterable, so that the Async Iterable can be iterated via a for await
loop.
任务本质上是将第一个回调代码转换为Async Iterable,以便Async Iterable可以通过forawait
循环迭代。
Why might we want to do this? Well beyond the for await
loop just looking a hell of a lot cleaner, if we ever needed to programmatically iterate through multiple streams simultaneously (eg. comparing multiple files line by line), then that would be practically impossible to do via the callback option.
我们为什么要这么做?``如果我们需要以编程方式同时遍历多个流(例如,逐行比较多个文件),那么通过回调选项几乎不可能做到这一点。
So how do we create an Async Iterable object from the callback code above? 那么我们如何从上面的回调代码创建一个Async Iterable对象呢?
We can wrap the callback code inside an async generator function. 我们可以将回调代码包装在一个blog生成器函数中。
Turns out this is fairly complicated (at least for my small brain). This is probably the hardest Node.js specific problem I recall working on - the equivalent of a Node.js Leetcode hard. 事实证明,这是相当复杂的(至少对我的小脑袋)。这可能是我所记得的最难的Node.js特定问题-相当于Node.js Leetcode困难。
Try to do it yourself if you’d like. Otherwise, here’s what I ended up with. 如果你愿意的话,你可以试着自己做。否则,这就是我的结局。
Solution 溶液
async function* createCsvParseStream(parser) {
let results: any[] = [];
let done = false;
let resolve: (value?: any) => void;
let reject: (value?: any) => void;
let promise = new Promise((res, rej) => {
resolve = res;
reject = rej;
});
parser.on('readable', () => {
let record;
while ((record = parser.read()) !== null) {
results.push(record);
resolve();
promise = new Promise((res, rej) => {
resolve = res;
reject = rej;
});
}
});
parser.on('error', (err) => {
console.error(err.message);
done = true;
reject();
});
parser.on('end', () => {
done = true;
resolve();
});
while (!done) {
await promise;
yield* results;
results = [];
}
}