'use strict'; // eslint-disable-line
const HeadlessError = require('node-phantom-simple/headless_error');
const TimeoutError = require('callback-timeout/errors').TimeoutError;
const async = require('async');
const urijs = require('urijs');
const once = require('once');
const callbackTimeout = require('callback-timeout');
const NanoTimer = require('nanotimer');
const Chance = require('chance');
const JSONStream = require('JSONStream');
const createPhantomPool = require('./createPhantomPool.js');
const juration = require('juration');
const objectAssign = require('object-assign');
const immediateStopDecorator = require('./worker/immediateStopDecorator');
const step = {
acquireBrowser: require('./worker/steps/acquireBrowser.js'),
setPageSettings: require('./worker/steps/setPageSettings.js'),
createPage: require('./worker/steps/createPage.js'),
openPage: require('./worker/steps/openPage.js'),
findLinks: require('./worker/steps/findLinks.js'),
pageRunners: require('./worker/steps/pageRunners.js'),
};
const concurrencyKey = Symbol('concurrency');
const urlKey = Symbol('url');
const finderKey = Symbol('finder');
const timeoutKey = Symbol('timeout');
const runnerKey = Symbol('runners');
const phantomParamsKey = Symbol('phantomParameters');
const phantomPageSettingsKey = Symbol('phantomPageSettings');
const followRedirectsKey = Symbol('followRedirects');
const browserCookiesKey = Symbol('browserCookies');
const triesKey = Symbol('tries');
const redirectFilterKey = Symbol('redirectFilter');
/**
* Transforms a {Map} to an {Object} hash.
*
* @private
* @param {Map} map The map to transform
* @return {Object} The transformed key/value hash object.
*/
function transformMapToObject(map) {
const result = {};
map.forEach((value, key) => {
result[key] = value;
});
return result;
}
/**
* Gets the finder set on a {@link CrawlKit} instance.
*
* @private
* @param {!CrawlKit} crawlerInstance The {@link CrawlKit} instance.
* @return {Finder} the finder instance set via {@link CrawlKit#setFinder}.
*/
function getFinder(crawlerInstance) {
return crawlerInstance[finderKey].finder;
}
/**
* Gets finder parameters of a {@link CrawlKit} instance.
*
* @private
* @param {!CrawlKit} crawlerInstance The {@link CrawlKit} instance.
* @return {Array} the finder parameters (if set)
*/
function getFinderParameters(crawlerInstance) {
return crawlerInstance[finderKey].parameters;
}
/**
* Gets the {@link Runner} instances set for a {@link CrawlKit} instance.
*
* @private
* @param {!CrawlKit} crawlerInstance The {@link CrawlKit} instance.
* @return {Map} a map of {@link Runner} instances.
*/
function getRunners(crawlerInstance) {
return crawlerInstance[runnerKey];
}
/**
* The protocol a URL without a protocol is rewritten to use.
*
* @private
* @type {String}
*/
const defaultAbsoluteTo = 'http://';
/**
* The CrawlKit base class. This is where the magic happens.
*/
class CrawlKit {
/**
* Create a CrawlKit instance
* @constructor
* @param {String} [url] The start URL. Sets the {@link CrawlKit#url}.
* @param {String} [name] The instance name of the crawler. Used for logging purposes.
* @return {CrawlKit} a new CrawlKit instance
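*
* A minimal usage sketch (assuming the package is required as 'crawlkit'):
* @example
* const CrawlKit = require('crawlkit');
* const crawler = new CrawlKit('http://example.com', 'example');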
*/
constructor(url, name) {
if (url) {
this.url = url;
}
if (name) {
this.name = name;
}
this[runnerKey] = new Map();
this[finderKey] = {};
this[browserCookiesKey] = [];
}
/**
* Getter/setter for the overall timeout for processing one website (opening the page, evaluating the finder and runner functions).
* The timeout starts fresh for each website.
*
* Values under zero are set to zero.
*
* @type {!integer}
* @default 30000 (30 seconds)
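*
* @example
* // assuming `crawler` is a CrawlKit instance
* crawler.timeout = 60000; // allow up to a minute per website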
*/
set timeout(num) {
this[timeoutKey] = parseInt(num, 10);
}
/**
* @ignore
*/
get timeout() {
return Math.max(0, this[timeoutKey] || 30000);
}
/**
* Getter/setter for the concurrency of the crawler.
* This controls the number of PhantomJS instances that will be spawned
* and used to work on found websites. Adapt this to the power of your machine.
*
* Values under one are set to one.
*
* @type {!integer}
* @default 1 (No concurrency)
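*
* @example
* // assuming `crawler` is a CrawlKit instance
* crawler.concurrency = 4; // run up to four PhantomJS instances in parallel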
*/
set concurrency(num) {
this[concurrencyKey] = parseInt(num, 10);
}
/**
* @ignore
*/
get concurrency() {
return Math.max(1, this[concurrencyKey] || 1);
}
/**
* Getter/setter for the start URL of the crawler.
* This is the URL that will be used as an initial endpoint for the crawler.
* If the protocol is omitted (e.g. the URL starts with //), the URL is rewritten to use http://.
* @type {String}
*/
set url(str) {
this[urlKey] = str;
}
/**
* @ignore
*/
get url() {
return this[urlKey];
}
/**
* With this method a {@link Finder} instance can be set for the crawler.
* A finder is used for link discovery on a website. It is run directly after page load
* and is optional (e.g. omit it if you only want to work on a single page).
*
* @param {!Finder} finder The finder instance to use for discovery.
* @param {...*} [runnableParams] These parameters are passed to the function returned by {@link Finder#getRunnable} at evaluation time.
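*
* A sketch with a hypothetical inline finder (only the shape this method
* validates is shown; see the {@link Finder} documentation for the full contract):
* @example
* crawler.setFinder({
*     getRunnable: () => function findLinks() {
*         // evaluated within the context of the crawled page
*     },
* });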
*/
setFinder(finder /* parameters... */) {
if (!finder || typeof finder.getRunnable !== 'function') {
throw new Error('Not a valid finder instance');
}
this[finderKey].finder = finder;
this[finderKey].parameters = Array.prototype.slice.call(arguments, 1);
}
/**
* Getter/setter for the number of tries when a PhantomJS instance crashes on a page
* or {@link CrawlKit#timeout} is hit.
* When a PhantomJS instance crashes whilst crawling a webpage, that instance is shut down
* and replaced by a new one. By default, the webpage that failed in such a way will be
* re-queued.
* Likewise, if the finder and runners do not respond within the defined timeout,
* the webpage is re-queued and they are run again.
* This member controls how often that re-queueing happens.
*
* Values under zero are set to zero.
*
* @type {!integer}
* @default 3 (read: try two more times after the first failure, three times in total)
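*
* @example
* // assuming `crawler` is a CrawlKit instance
* crawler.tries = 1; // give up after the first failure, no re-queueing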
*/
set tries(n) {
this[triesKey] = parseInt(n, 10);
}
/**
* @ignore
*/
get tries() {
return Math.max(0, this[triesKey] || 3);
}
/**
* Allows you to add a runner that is executed on each crawled page.
* The value returned by the runner is added to the overall result.
* Runners run sequentially on each webpage in the order they were added.
* If a runner crashes PhantomJS more than {@link CrawlKit#tries} times, subsequent {@link Runner}s are not executed.
*
* @see For an example see `examples/simple.js`. For an example using parameters, see `examples/advanced.js`.
* @param {!String} key The runner identifier. This is also used in the result stream/object.
* @param {!Runner} runner The runner instance to execute on each crawled page.
* @param {...*} [runnableParams] These parameters are passed to the function returned by {@link Runner#getRunnable} at evaluation time.
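*
* A sketch with a hypothetical inline runner (only the shape this method
* validates is shown; see the {@link Runner} documentation for the full contract):
* @example
* crawler.addRunner('myRunner', {
*     getCompanionFiles: () => [],
*     getRunnable: () => function run() {
*         // evaluated within the context of the crawled page
*     },
* });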
*/
addRunner(key, runner /* args ... */) {
if (!key) {
throw new Error('Not a valid runner key');
}
if (!runner || typeof runner.getCompanionFiles !== 'function' || typeof runner.getRunnable !== 'function') {
throw new Error('Not a valid runner instance');
}
const parameters = Array.prototype.slice.call(arguments, 2);
this[runnerKey].set(key, {
runner,
parameters,
});
}
/**
* Getter/setter for the map of parameters to pass to PhantomJS.
* You can use this for example to ignore SSL errors.
* For a list of parameters, please refer to the [PhantomJS documentation]{@link http://phantomjs.org/api/command-line.html}.
*
* @type {!Object.<String,String>}
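*
* @example
* // assuming `crawler` is a CrawlKit instance
* crawler.phantomParameters = {
*     'ignore-ssl-errors': true,
* };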
*/
set phantomParameters(params) {
this[phantomParamsKey] = params;
}
/**
* @ignore
*/
get phantomParameters() {
return this[phantomParamsKey] || {};
}
/**
* Getter/setter for the map of settings to pass to an opened page.
* You can use this for example for Basic Authentication.
* For a list of options, please refer to the [PhantomJS documentation]{@link http://phantomjs.org/api/webpage/property/settings.html}.
* Nested settings can be provided in dot notation as the key, e.g. 'settings.userAgent'.
*
* @type {!Object.<String,*>}
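*
* A sketch for Basic Authentication (userName and password are standard
* PhantomJS page settings):
* @example
* crawler.phantomPageSettings = {
*     userName: 'admin',
*     password: 'secret',
* };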
*/
set phantomPageSettings(settings) {
this[phantomPageSettingsKey] = settings;
}
/**
* @ignore
*/
get phantomPageSettings() {
return this[phantomPageSettingsKey] || {};
}
/**
* Getter/setter for whether to follow redirects or not.
* When following redirects, the original page is not processed.
*
* @type {!boolean}
* @default false
*/
set followRedirects(value) {
this[followRedirectsKey] = !!value;
}
/**
* @ignore
*/
get followRedirects() {
return this[followRedirectsKey] || false;
}
/**
* Getter/setter for the cookies to set within PhantomJS.
* Each entry is supposed to be an object [following the PhantomJS spec]{@link http://phantomjs.org/api/webpage/method/add-cookie.html}.
*
* @type {!Array.<Object>}
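*
* @example
* // a cookie in the PhantomJS addCookie format
* crawler.browserCookies = [{
*     name: 'session',
*     value: 'abc123',
*     domain: 'example.com',
*     path: '/',
* }];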
*/
set browserCookies(cookies) {
if (!(cookies instanceof Array)) {
throw new Error('Not properly munchable');
}
this[browserCookiesKey] = cookies;
}
/**
* @ignore
*/
get browserCookies() {
return this[browserCookiesKey];
}
/**
* Getter/setter for the filter that is applied to redirected URLs.
* With this filter you can prevent a redirect or rewrite it.
* The filter callback gets two arguments: the first one is the target URL,
* the second one the source URL.
* Return false to prevent the redirect. Return a String (URL) to follow the redirect.
*
* @type {Function}
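*
* A sketch that only follows redirects staying on the same host
* (assuming the urijs library is available, as it is within CrawlKit itself):
* @example
* crawler.redirectFilter = (targetUrl, sourceUrl) => {
*     // follow only same-host redirects, prevent all others
*     return urijs(targetUrl).hostname() === urijs(sourceUrl).hostname() ? targetUrl : false;
* };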
*/
set redirectFilter(filter) {
if (typeof filter !== 'function') {
throw new Error('Filter must be a valid function');
}
this[redirectFilterKey] = filter;
}
/**
* @ignore
*/
get redirectFilter() {
return this[redirectFilterKey] || ((targetUrl) => targetUrl);
}
/**
* This method starts the crawling/scraping process.
*
* @param {boolean} [shouldStream=false] Whether to stream the results or use a Promise
* @return {(Stream|Promise.<Object>)} By default a Promise object is returned that resolves to the result. If streaming is enabled it returns a JSON stream of the results.
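*
* @example
* // Promise mode - the resolved object nests all page results under `results`
* crawler.crawl().then((data) => console.log(data.results));
* @example
* // streaming mode - pipe the JSON result stream somewhere
* crawler.crawl(true).pipe(process.stdout);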
*/
crawl(shouldStream) {
const stream = shouldStream ? JSONStream.stringifyObject() : null;
const prefix = 'crawlkit' + (this.name ? `:${this.name}` : '');
const logger = require('./logger')(prefix);
logger.info(`Starting to crawl. Concurrent PhantomJS browsers: ${this.concurrency}.`);
const pool = createPhantomPool(logger, this.concurrency, this.phantomParameters, this.browserCookies, prefix);
const promise = new Promise((resolve) => {
if (!this.url) {
throw new Error(`Defined url '${this.url}' is not valid.`);
}
const seen = new Map();
new NanoTimer().time((stopCrawlTimer) => {
let addUrl;
const q = async.queue((scope, queueItemFinished) => {
scope.tries++;
const workerLogPrefix = `${prefix}:task(${scope.id})`;
const workerLogger = require('./logger')(workerLogPrefix);
logger.info(`Worker started - ${q.length()} task(s) left in the queue.`);
workerLogger.info(`Took ${scope.url} from queue` + (scope.tries > 1 ? ` (attempt ${scope.tries})` : '') + '.');
new NanoTimer().time((stopWorkerTimer) => {
const workerFinished = callbackTimeout(once((err) => {
scope.stop = true;
if (err) {
workerLogger.error(err);
scope.result.error = err;
}
if (scope.page) {
workerLogger.debug('Attempting to close page.');
scope.page.close();
workerLogger.debug('Page closed.');
}
if (scope.browser) {
if (err instanceof HeadlessError) {
// take no chances - if there was an error on the Phantom side, we should get rid of the instance
workerLogger.info('Notifying pool to destroy Phantom instance.');
pool.destroy(scope.browser);
workerLogger.debug('Phantom instance destroyed.');
} else {
workerLogger.debug('Attempting to release Phantom instance.');
pool.release(scope.browser);
workerLogger.debug('Phantom instance released to pool.');
}
scope.browser = null;
}
stopWorkerTimer();
if (err instanceof HeadlessError || err instanceof TimeoutError) {
if (scope.tries < this.tries) {
logger.info(`Retrying ${scope.url} - adding back to queue.`);
const clone = objectAssign({}, scope);
delete clone.result.error;
delete clone.stop;
q.push(clone);
scope.stop = true;
return queueItemFinished();
}
logger.info(`Tried to crawl ${scope.url} ${scope.tries} times. Giving up.`);
}
if (shouldStream) {
stream.write([scope.url, scope.result]);
}
queueItemFinished(err);
}), this.timeout, `Worker timed out after ${this.timeout}ms.`);
async.waterfall([
immediateStopDecorator(scope, step.acquireBrowser(scope, workerLogger, pool)),
immediateStopDecorator(scope, step.createPage(scope, workerLogger)),
immediateStopDecorator(scope, step.setPageSettings(scope, workerLogger, this.phantomPageSettings, this.followRedirects)),
immediateStopDecorator(scope, step.openPage(scope, workerLogger, addUrl, this.followRedirects, this.redirectFilter)),
immediateStopDecorator(scope, step.findLinks(scope, workerLogger, getFinder(this), getFinderParameters(this), addUrl)),
immediateStopDecorator(scope, step.pageRunners(scope, workerLogger, getRunners(this), workerLogPrefix)),
], workerFinished);
}, '', 's', (workerRuntime) => {
workerLogger.info(`Finished. Took ${juration.stringify(workerRuntime) || 'less than a second'}.`);
});
}, this.concurrency);
q.drain = () => {
logger.debug('Queue empty. Stopping crawler timer.');
stopCrawlTimer();
setImmediate(() => {
logger.debug('Draining pool.');
pool.drain(() => pool.destroyAllNow());
});
logger.debug('Finishing up.');
if (shouldStream) {
stream.end();
resolve();
} else {
const result = {
results: transformMapToObject(seen),
};
resolve(result);
}
};
addUrl = (u) => {
let url = urijs(u);
url = url.absoluteTo(defaultAbsoluteTo);
url.normalize();
url = url.toString();
if (!seen.has(url)) {
logger.info(`Adding ${url}`);
const result = {};
// don't keep result in memory if we stream
seen.set(url, shouldStream ? null : result);
q.push({
tries: 0,
stop: false,
url,
result,
id: new Chance().name(),
});
} else {
logger.debug(`Skipping ${url} - already seen.`);
}
};
addUrl(this.url);
}, '', 's', (time) => {
logger.info(`Finished. Processed ${seen.size} discovered URLs. Took ${juration.stringify(time) || 'less than a second'}.`);
});
});
if (shouldStream) {
promise.catch((err) => {
logger.error(err);
throw err;
});
return stream;
}
return promise;
}
}
module.exports = CrawlKit;