diff --git a/.eslintrc.json b/.eslintrc.json index 43cc6da..8761f2d 100644 --- a/.eslintrc.json +++ b/.eslintrc.json @@ -1,6 +1,10 @@ { "env": { - "node": true + "node": true, + "es6": true + }, + "parserOptions": { + "ecmaVersion": 2018 }, "extends": "eslint:recommended", "rules": { diff --git a/README.md b/README.md index 40dbeb3..02ea6dc 100644 --- a/README.md +++ b/README.md @@ -62,6 +62,45 @@ feedparser.on('readable', function () { ``` +You can also consume feeds using async iteration. + +When using async iteration, prefer `stream.pipeline(...)` (or a promisified +`stream.pipeline`) so stream errors are handled before data starts flowing. Async iterator usage with `pipeline` requires Node v12+. +If you use `pipe()` or otherwise start writing to `FeedParser` before iteration +begins, attach an `error` handler on `feedparser` yourself. + +```js +var FeedParser = require('feedparser'); +var fetch = require('node-fetch'); +// stream/promises requires Node v15+ but the same behavior can be +// attained by promisifying require('stream').pipeline +var pipeline = require('stream/promises').pipeline; + +async function main() { + var res = await fetch('http://someurl.site/rss.xml'); + if (res.status !== 200) throw new Error('Bad status code'); + + var feedparser = new FeedParser(); + + try { + await pipeline( + res.body, + feedparser, + async function (feedparserIterable) { + for await (var item of feedparserIterable) { + console.log(item.title); + } + } + ); + } catch (err) { + console.error(err); + } +} + +main(); + +``` + You can also check out this nice [working implementation](https://github.com/scripting/feedRead) that demonstrates one way to handle all the hard and annoying stuff. 
:smiley: ### options diff --git a/index.d.ts b/index.d.ts index 6e581af..abad0ae 100644 --- a/index.d.ts +++ b/index.d.ts @@ -10,6 +10,7 @@ declare class FeedParser extends stream.Transform { options: FeedParser.Options; read(): FeedParser.Item | null; + [Symbol.asyncIterator](): AsyncGenerator; resumeSaxError(): void; on(event: 'meta', listener: (meta: FeedParser.Meta) => void): this; diff --git a/lib/feedparser/index.js b/lib/feedparser/index.js index 04c8195..40ef244 100644 --- a/lib/feedparser/index.js +++ b/lib/feedparser/index.js @@ -1239,4 +1239,47 @@ FeedParser.prototype._flush = function (done) { * @typedef {import('readable-stream').Transform & FeedParserState} FeedParserInstance */ +/** @this {FeedParserInstance} */ +FeedParser.prototype[Symbol.asyncIterator] = async function* () { + var resolve = null; + var error = null; + var ended = false; + + function onReadable() { + if (resolve) { resolve(); resolve = null; } + } + function onEnd() { + ended = true; + if (resolve) { resolve(); resolve = null; } + } + function onError(err) { + error = err; + if (resolve) { resolve(); resolve = null; } + } + + this.on('readable', onReadable); + this.on('end', onEnd); + this.on('error', onError); + + try { + while (true) { + var item; + while ((item = this.read()) !== null) { + yield item; + } + if (ended) break; + if (error) throw error; + await new Promise(function (r) { resolve = r; }); + if (error) throw error; + } + } finally { + this.removeListener('readable', onReadable); + this.removeListener('end', onEnd); + this.removeListener('error', onError); + if (!ended && !this.destroyed) { + this.destroy(); + } + } +}; + exports = module.exports = FeedParser; diff --git a/test/async-iterator.js b/test/async-iterator.js new file mode 100644 index 0000000..ada8aea --- /dev/null +++ b/test/async-iterator.js @@ -0,0 +1,109 @@ +var PassThrough = require('stream').PassThrough; +// We're using this form so we can run tests on older Node versions that don't have 
stream.promises.pipeline +var pipeline = require('util').promisify(require('stream').pipeline); + +describe('async iterator usage', function () { + // These tests use .pipe() only to allow testing in older Node versions. + // In modern Node versions, you can use pipeline() with async iterators + // instead of .pipe(). If you use .pipe, you must add your own error handling + // to avoid uncaught exceptions on errors. + it('should work as an async iterator', async function () { + var feedparser = new FeedParser(); + var feed = __dirname + '/feeds/rss2sample.xml'; + var items = []; + + fs.createReadStream(feed).pipe(feedparser); + + for await (var item of feedparser) { + items.push(item); + } + + assert.equal(items.length, 4); + }); + + it('should surface errors via try/catch', async function () { + var feedparser = new FeedParser(); + var feed = __dirname + '/feeds/notafeed.html'; + fs.createReadStream(feed).pipe(feedparser); + + var caught = null; + try { + for await (var item of feedparser) {} // eslint-disable-line no-empty, no-unused-vars + } catch (err) { + caught = err; + } + + assert.ok(caught instanceof Error); + assert.equal(caught.message, 'Not a feed'); + }); + + it('should catch errors after a delayed iteration start', async function () { + if (!process.release.lts || process.release.lts < 'Gallium') { + this.skip(); // Older Node versions don't allow async iterators with pipeline, so we can't test this behavior. 
+ } + var feedparser = new FeedParser(); + var source = new PassThrough(); + var items = []; + var caught = null; + var uncaught = null; + function onUncaught(err) { + uncaught = err; + } + process.prependOnceListener('uncaughtException', onUncaught); + + source.end('not a feed'); + + await new Promise(setImmediate); + + try { + await pipeline(source, feedparser, async function (fpIterable) { + for await (var item of fpIterable) { + items.push(item.title); + } + }); + } catch (err) { + caught = err; + } finally { + process.removeListener('uncaughtException', onUncaught); + assert.equal(uncaught, null); + assert.ok(caught instanceof Error); + assert.equal(caught.message, 'Not a feed'); + assert.equal(items.length, 0); + } + }); + + describe('resume_saxerror behavior', function () { + var feed = __dirname + '/feeds/saxerror.xml'; + + it('should continue iterating past SAX errors by default (resume_saxerror: true)', async function () { + var feedparser = new FeedParser({ strict: true }); + fs.createReadStream(feed).pipe(feedparser); + var items = []; + + for await (var item of feedparser) { + items.push(item.title); + } + + assert.equal(items.length, 3); + assert.deepEqual(items, ['Good Item', 'Bad Item', 'Item After Error']); + }); + + it('should throw on SAX errors when (resume_saxerror: false)', async function () { + var feedparser = new FeedParser({ strict: true, resume_saxerror: false }); + fs.createReadStream(feed).pipe(feedparser); + var items = []; + + var caught = null; + try { + for await (var item of feedparser) { + items.push(item.title); + } + } catch (err) { + caught = err; + } + + assert.ok(caught instanceof Error); + assert.equal(items.length, 0); + }); + }); +});