Spaces:
Sleeping
Sleeping
; | |
const Readable = require('stream').Readable; | |
const EventEmitter = require('events').EventEmitter; | |
const path = require('path'); | |
const normalizeOptions = require('./normalize-options'); | |
const stat = require('./stat'); | |
const call = require('./call'); | |
/**
 * Reads the contents of a directory (and, optionally, its subdirectories) and
 * streams the results via a {@link stream.Readable} in object mode.
 *
 * Whether directories are read synchronously or asynchronously is decided by
 * `options.facade` (its `sync` / `async` flags, `fs` implementation and
 * `forEach` iterator), which is produced by {@link normalizeOptions}.
 */
class DirectoryReader {
  /**
   * @param {string} dir - The absolute or relative directory path to read
   * @param {object} [options] - User-specified options, if any (see {@link normalizeOptions})
   * @param {object} internalOptions - Internal options that aren't part of the public API
   * @class
   */
  constructor (dir, options, internalOptions) {
    const normalized = normalizeOptions(options, internalOptions);
    this.options = normalized;

    // Indicates whether we should keep reading.
    // This is set to false when stream.Readable.push() returns false.
    this.shouldRead = true;

    // The directories to read (initialized with the top-level directory)
    this.queue = [{
      path: dir,
      basePath: normalized.basePath,
      posixBasePath: normalized.posixBasePath,
      depth: 0,
    }];

    // The number of directories that are currently being processed
    this.pending = 0;

    // The data that has been read, but not yet emitted
    this.buffer = [];

    this.stream = new Readable({ objectMode: true });
    this.stream._read = () => {
      // Start (or resume) reading
      this.shouldRead = true;

      // If we have data in the buffer, then send the next chunk
      if (this.buffer.length > 0) {
        this.pushFromBuffer();
      }

      // If we have directories queued, then start processing them
      if (this.queue.length > 0) {
        if (this.options.facade.sync) {
          // Sync mode: drain the whole queue now. Subdirectories discovered
          // along the way are appended to the queue and picked up by this loop.
          while (this.queue.length > 0) {
            this.readNextDirectory();
          }
        }
        else {
          // Async mode: start one directory; finishedReadingDirectory() chains the rest
          this.readNextDirectory();
        }
      }

      this.checkForEOF();
    };
  }

  /**
   * Reads the next directory in the queue.
   *
   * Emits an "error" event if the directory listing cannot be read or iterated.
   */
  readNextDirectory () {
    const facade = this.options.facade;
    const dir = this.queue.shift();
    this.pending++;

    // Read the directory listing
    call.safe(facade.fs.readdir, dir.path, (err, items) => {
      if (err) {
        // fs.readdir threw an error
        this.emit('error', err);
        return this.finishedReadingDirectory();
      }

      try {
        // Process each item in the directory (simultaneously, if async)
        facade.forEach(
          items,
          this.processItem.bind(this, dir),
          this.finishedReadingDirectory.bind(this)
        );
      }
      catch (err2) {
        // facade.forEach threw an error
        // (probably because fs.readdir returned an invalid result)
        this.emit('error', err2);
        this.finishedReadingDirectory();
      }
    });
  }

  /**
   * This method is called after all items in a directory have been processed.
   *
   * NOTE: This does not necessarily mean that the reader is finished, since there may still
   * be other directories queued or pending.
   */
  finishedReadingDirectory () {
    this.pending--;

    // When the consumer has applied back-pressure we do nothing more here;
    // the next _read() call resumes the queue and re-checks for EOF.
    if (!this.shouldRead) {
      return;
    }

    // If we have directories queued, then start processing the next one
    if (this.queue.length > 0 && this.options.facade.async) {
      this.readNextDirectory();
    }

    this.checkForEOF();
  }

  /**
   * Determines whether the reader has finished processing all items in all directories.
   * If so, then the "end" event is fired (via {@link Readable#push})
   */
  checkForEOF () {
    const finished =
      this.buffer.length === 0 &&   // The stuff we've already read
      this.pending === 0 &&         // The stuff we're currently reading
      this.queue.length === 0;      // The stuff we haven't read yet

    if (finished) {
      // There's no more stuff!
      this.stream.push(null);
    }
  }

  /**
   * Processes a single item in a directory.
   *
   * If the item is a directory, and `option.deep` is enabled, then the item will be added
   * to the directory queue.
   *
   * If the item meets the filter criteria, then it will be emitted to the reader's stream.
   *
   * @param {object} dir - A directory object from the queue
   * @param {string} item - The name of the item (name only, no path)
   * @param {function} done - A callback function that is called after the item has been processed
   */
  processItem (dir, item, done) {
    const stream = this.stream;
    const options = this.options;
    const itemPath = dir.basePath + item;
    const posixPath = dir.posixBasePath + item;
    const fullPath = path.join(dir.path, item);

    // If `options.deep` is a number, and we've already recursed to the max depth,
    // then there's no need to check fs.Stats to know if it's a directory.
    // If `options.deep` is a function, then we'll need fs.Stats
    const maxDepthReached = dir.depth >= options.recurseDepth;

    // Do we need to call `fs.stat`?
    // (listener counts use the instance method; the static
    // EventEmitter.listenerCount() is deprecated in Node.js)
    const needStats =
      !maxDepthReached ||                       // we need the fs.Stats to know if it's a directory
      options.stats ||                          // the user wants fs.Stats objects returned
      options.recurseFn ||                      // we need fs.Stats for the recurse function
      options.filterFn ||                       // we need fs.Stats for the filter function
      stream.listenerCount('file') > 0 ||       // we need the fs.Stats to know if it's a file
      stream.listenerCount('directory') > 0 ||  // we need the fs.Stats to know if it's a directory
      stream.listenerCount('symlink') > 0;      // we need the fs.Stats to know if it's a symlink

    // If we don't need stats, then exit early
    if (!needStats) {
      if (this.filter(itemPath, posixPath)) {
        this.pushOrBuffer({ data: itemPath });
      }
      return done();
    }

    // Get the fs.Stats object for this path
    stat(options.facade.fs, fullPath, (err, stats) => {
      if (err) {
        // fs.stat threw an error
        this.emit('error', err);
        return done();
      }

      try {
        // Add the item's path to the fs.Stats object
        // The base of this path, and its separators are determined by the options
        // (i.e. options.basePath and options.sep)
        stats.path = itemPath;

        // Add the depth of the path to the fs.Stats object, for use in the filter function
        stats.depth = dir.depth;

        if (this.shouldRecurse(stats, posixPath, maxDepthReached)) {
          // Add this subdirectory to the queue
          this.queue.push({
            path: fullPath,
            basePath: itemPath + options.sep,
            posixBasePath: posixPath + '/',
            depth: dir.depth + 1,
          });
        }

        // Determine whether this item matches the filter criteria
        if (this.filter(stats, posixPath)) {
          this.pushOrBuffer({
            data: options.stats ? stats : itemPath,
            file: stats.isFile(),
            directory: stats.isDirectory(),
            symlink: stats.isSymbolicLink(),
          });
        }

        done();
      }
      catch (err2) {
        // An error occurred while processing the item
        // (probably during a user-specified function, such as options.deep, options.filter, etc.)
        this.emit('error', err2);
        done();
      }
    });
  }

  /**
   * Pushes the given chunk of data to the stream, or adds it to the buffer,
   * depending on the state of the stream.
   *
   * @param {object} chunk
   */
  pushOrBuffer (chunk) {
    // Always go through the buffer so that chunks are emitted in FIFO order
    this.buffer.push(chunk);

    // If we're still reading, then immediately emit the next chunk in the buffer
    // (which may or may not be the chunk that we just added)
    if (this.shouldRead) {
      this.pushFromBuffer();
    }
  }

  /**
   * Immediately pushes the next chunk in the buffer to the reader's stream.
   * The "data" event will always be fired (via {@link Readable#push}).
   * In addition, the "file", "directory", and/or "symlink" events may be fired,
   * depending on the type of properties of the chunk.
   *
   * Does nothing if the buffer is empty.
   */
  pushFromBuffer () {
    if (this.buffer.length === 0) {
      return;
    }

    const stream = this.stream;
    const chunk = this.buffer.shift();

    // Stream the data. A false return value means the consumer wants us to pause.
    try {
      this.shouldRead = stream.push(chunk.data);
    }
    catch (err) {
      this.emit('error', err);
    }

    // Also emit specific events, based on the type of chunk
    if (chunk.file) {
      this.emit('file', chunk.data);
    }
    if (chunk.symlink) {
      this.emit('symlink', chunk.data);
    }
    if (chunk.directory) {
      this.emit('directory', chunk.data);
    }
  }

  /**
   * Determines whether the given directory meets the user-specified recursion criteria.
   * If the user didn't specify recursion criteria, then this function will default to true.
   *
   * If the user-specified recursion function throws, an "error" event is emitted
   * and the directory is not crawled.
   *
   * @param {fs.Stats} stats - The directory's {@link fs.Stats} object
   * @param {string} posixPath - The item's POSIX path (used for glob matching)
   * @param {boolean} maxDepthReached - Whether we've already crawled the user-specified depth
   * @returns {boolean}
   */
  shouldRecurse (stats, posixPath, maxDepthReached) {
    const options = this.options;

    if (maxDepthReached) {
      // We've already crawled to the maximum depth. So no more recursion.
      return false;
    }

    if (!stats.isDirectory()) {
      // It's not a directory. So don't try to crawl it.
      return false;
    }

    if (options.recurseGlob) {
      // Glob patterns are always tested against the POSIX path, even on Windows
      // https://github.com/isaacs/node-glob#windows
      return options.recurseGlob.test(posixPath);
    }

    if (options.recurseRegExp) {
      // Regular expressions are tested against the normal path
      // (based on the OS or options.sep)
      return options.recurseRegExp.test(stats.path);
    }

    if (options.recurseFn) {
      try {
        // Run the user-specified recursion criteria
        return Boolean(options.recurseFn.call(null, stats));
      }
      catch (err) {
        // An error occurred in the user's code.
        // Emit an "error" event, but continue processing (without recursing)
        this.emit('error', err);
        return false;
      }
    }

    // No recursion criteria were specified, and we're within the maximum depth.
    // So crawl this directory.
    return true;
  }

  /**
   * Determines whether the given item meets the user-specified filter criteria.
   * If the user didn't specify a filter, then this function will always return true.
   *
   * If the user-specified filter function throws, an "error" event is emitted
   * and the item is excluded.
   *
   * @param {string|fs.Stats} value - Either the item's path, or the item's {@link fs.Stats} object
   * @param {string} posixPath - The item's POSIX path (used for glob matching)
   * @returns {boolean}
   */
  filter (value, posixPath) {
    const options = this.options;

    if (options.filterGlob) {
      // Glob patterns are always tested against the POSIX path, even on Windows
      // https://github.com/isaacs/node-glob#windows
      return options.filterGlob.test(posixPath);
    }

    if (options.filterRegExp) {
      // Regular expressions are tested against the normal path
      // (based on the OS or options.sep)
      return options.filterRegExp.test(value.path || value);
    }

    if (options.filterFn) {
      try {
        // Run the user-specified filter function
        return Boolean(options.filterFn.call(null, value));
      }
      catch (err) {
        // An error occurred in the user's code.
        // Emit an "error" event, but continue processing (excluding this item)
        this.emit('error', err);
        return false;
      }
    }

    // No filter was specified, so match everything
    return true;
  }

  /**
   * Emits an event. If one of the event listeners throws an error,
   * then an "error" event is emitted.
   *
   * @param {string} eventName
   * @param {*} data
   * @throws If a listener of the "error" event itself throws
   */
  emit (eventName, data) {
    const stream = this.stream;

    try {
      stream.emit(eventName, data);
    }
    catch (err) {
      if (eventName === 'error') {
        // Don't recursively emit "error" events.
        // If the first one fails, then just throw
        throw err;
      }

      stream.emit('error', err);
    }
  }
}
// Expose the DirectoryReader class as this module's sole export.
module.exports = DirectoryReader;