我使用 nohup 运行脚本,其中包含数百到数千个 Nodejs 命令的列表。这些nodejs子进程将数据从mysql和salesforce同步到couchdb。
$ nohup ./mf-sync.staging-mfdb.sh 2>&1 > mf-sync.staging-mfdb.log &
$ mf-sync.staging-mfdb.sh
剧本:
#!/bin/bash
echo "Starting..."
echo "pid $$"
node /opt/node/mix-sync/mf-sync.js --mfi=100017 --source=100982
node /opt/node/mix-sync/mf-sync.js --mfi=100026 --source=101619
node /opt/node/mix-sync/mf-sync.js --mfi=100027 --source=100982
node /opt/node/mix-sync/mf-sync.js --mfi=100036 --source=101619
node /opt/node/mix-sync/mf-sync.js --mfi=100063 --source=100982
node /opt/node/mix-sync/mf-sync.js --mfi=100075 --source=101160
etc....
在终端中,我观察到子进程停止运行:
[rgoya@host ~]$ ps -e | grep sync
26 ? 00:00:00 async/mgr
30 ? 00:03:34 sync_supers
6333 ? 00:00:01 mf-sync
30097 ? 00:00:00 mf-sync.staging
[rgoya@host ~]$ ps -e | grep sync
26 ? 00:00:00 async/mgr
30 ? 00:03:34 sync_supers
6333 ? 00:00:01 mf-sync
30097 ? 00:00:00 mf-sync.staging
[rgoya@host ~]$ kill 6333
[rgoya@host ~]$ ps -e | grep sync
26 ? 00:00:00 async/mgr
30 ? 00:03:34 sync_supers
6423 ? 00:00:00 mf-sync
30097 ? 00:00:00 mf-sync.staging
[rgoya@host ~]$ ps -e | grep sync
26 ? 00:00:00 async/mgr
30 ? 00:03:34 sync_supers
6449 ? 00:00:01 mf-sync
30097 ? 00:00:00 mf-sync.staging
笔记:30097是进程的pid nohup
。
检查终止子进程之前和之后的日志,我看到下一个nodejs
命令是按顺序执行的。我尝试使用--debug
详细输出标志运行它们,但我没有看到任何异常。
补充笔记
- Nodejs 的内存限制为 1GB。
- Couchdb 默认最大连接数为 2048。
的内容
mf-sync.js
。#!/usr/bin/env node process.title = 'mf-sync'; var path = require('path') , fs = require('fs') , _ = require('underscore'); // Parse command-line arguments var args = _.chain(process.argv).rest(2).map(function(arg) { arg = arg.replace('--', '').split('='); _.size(arg) === 1 && arg.push(true); return arg; }).object().value(); if (!args.mfi) throw new Error('MFI ID not specified'); if (!args.source) throw new Error('Source ID not specified'); // Output when using `--debug` flag var debug = function() { if (_.has(args, 'debug')) console.info.apply(this, arguments); }; // Simulation mode var simulate = _.has(args, 'simulate'); require('util').log('Synchronization for ' + ['mfi', args.source, args.mfi].join('/') + ' started'); simulate && console.warn('Simulation mode enabled. No changes will occurr.'); debug(args); // Load MySQL configuration var my = require('mysql'); var myConfig = require(path.join(__dirname, 'mysql.json')); var db = 'gold'; if (args.source == '101027') db = 'mfdb'; var mysql = my.createConnection(myConfig[db]); debug('MySQL', myConfig[db].database); // Load Salesforce configuration var sf = require('node-salesforce'); var sfConfig = require(path.join(__dirname, 'salesforce.json')); var salesforce = new sf.Connection(sfConfig); debug('Salesforce', sfConfig.username); // Load CouchDB configuration var cradle = require('cradle'); var couchConfig = require(path.join(__dirname, 'couchdb.json')); var couch = new(cradle.Connection)(couchConfig.mfdb.host, couchConfig.mfdb.port, couchConfig.mfdb.options).database(couchConfig.mfdb.name); debug('CouchDB', couchConfig.mfdb.name); // Add missing function to Underscore.js _.mixin({ compactObject: function(obj) { _.each(obj, function(v, k) { if (_.isNull(v) || _.isFunction(v)) delete obj[k]; }); return obj; } }); // Get MFI data from MySQL // ----------------------- var getMySQLData = function(mfi, callback) { mysql.connect(); // Get master MFI metadata debug('Getting master MFI metadata from `mfi`.'); mysql.query("SELECT * FROM mfi WHERE source_id = ? AND mfi_id = ?", [mfi.source_id, mfi.mfi_id], function(err, rows, fields) { if (err) throw new Error(err); _.defaults(mfi, _.chain(rows).first().omit(['parse', '_typeCast']).value()); }); // Define MFDB data tables var tables = { 'usd/false': ['balance_sheet_usd', 'calculation_usd', 'income_statement_usd', 'infrastructure', 'portfolio_report_usd', 'products_and_clients', 'social_performance'], 'usd/true': ['balance_sheet_adjusted_usd', 'calculation_adjusted_usd', 'income_statement_adjusted_usd', 'infrastructure_adjusted', 'portfolio_report_adjusted_usd', 'products_and_clients_adjusted', 'social_performance'], 'local/false': ['balance_sheet', 'calculation', 'income_statement', 'infrastructure', 'portfolio_report', 'products_and_clients', 'social_performance'], 'local/true': ['balance_sheet_adjusted', 'calculation_adjusted', 'income_statement_adjusted', 'infrastructure_adjusted', 'portfolio_report_adjusted', 'products_and_clients_adjusted', 'social_performance'] }; // Remove table name variance var baseTable = _.memoize(function(table) { return table.replace('_usd', '').replace('_adjusted', ''); }); var docs = {}; // Get all available MFDB data for the current `mfi_vid` debug('Getting all available MFDB data for the current `mfi_vid`.'); _.each(_.keys(tables), function(key) { _.each(tables[key], function(table) { debug('Querying', key, 'data from', table); mysql.query("SELECT t.* FROM ?? t INNER JOIN mfi ON t.source_id = mfi.source_id AND t.mfi_id = mfi.mfi_id AND t.mfi_vid = mfi.mfi_vid WHERE t.source_id = ? AND t.mfi_id = ? ORDER BY t.fiscal_year ASC, t.period_type DESC, t.as_of_date ASC", [table, mfi.source_id, mfi.mfi_id], function(err, rows, fields) { if (err) throw new Error(err); // Create full document data _.each(rows, function(row) { // Create doc._id var doc_id = ['mfi-period', mfi.source_id, mfi.mfi_id, key, row.fiscal_year, row.period_type, row.as_of_date.toISOString().substr(0, 10)].join('/'); debug('Processing', table, 'data for', doc_id); // Initialize document if (!docs[doc_id]) docs[doc_id] = { _id: doc_id, type: 'mfi-period', currency: key.split('/')[0], adjusted: key.split('/')[1] === 'true', fiscal_year: row.fiscal_year, period_type: row.period_type, as_of_date: row.as_of_date }; if (!docs[doc_id].currency_code && row.currency_code) docs[doc_id].currency_code = row.currency_code; // Extend MFDB data into document debug('Adding', table, 'data to', doc_id); row = _.chain(row).omit(['mfi_id', 'mfi_vid', 'source_id', 'period_type', 'as_of_date', 'fiscal_year', 'currency_code', 'currency_unit']).compactObject().value(); if (!_.isEmpty(row)) docs[doc_id][baseTable(table)] = row; }); }); }); }); // Get all scenario data to create dimension hierarchy var tree = {}; mysql.query("SELECT * FROM scenarios", function(err, rows) { debug('Processing scenario data into hierarchical tree.'); if (err) throw new Error(err); // Get all children scenarios for any given parent var getChildren = function(parent) { var children = _.chain(rows).where({parent: parent}).sortBy('weight').pluck('scenarios').object({}).tap(function(scenarios) { // Remove used scenarios from master list to decrease stack size _.each(_.keys(scenarios), function(scenario) { rows = _.without(rows, _.findWhere(rows, {scenarios: scenario})); }); }).value(); if (_.isEmpty(children)) return null; return children; } // Recursively get dimension hierarchy var getTree = function(hierarchy) { if (_.isEmpty(hierarchy)) return; _.each(_.keys(hierarchy), function(p) { hierarchy[p] = getChildren(p); if (!_.isEmpty(hierarchy[p])) getTree(hierarchy[p]); }); } tree = getChildren(''); getTree(tree); }); // Find path to nested object property var findPath = _.memoize(function(needles, haystack) { function constructPath(haystack, needle, path) { if (!_.isObject(haystack)) return false; if (typeof haystack !== 'object') return false; for (var key in haystack) { var value = haystack[key]; var currentPath = _.extend([], path); currentPath.push(key); if (key === needle) return currentPath; var foundPath = constructPath(value, needle, currentPath); if (foundPath) return foundPath; } } // Handle comma-separated nested hierarchies return _.chain(needles.split(',')).map(function(needle) { return constructPath(haystack, needle, []); }).flatten().compact().value(); }); // Assign value inside a nested object property var deepAssign = function(obj, path, val) { for (var i = 0 in path) { var key = path[i]; if (i == path.length - 1) { if (typeof obj[key] === 'object') obj[key].value = val; else obj[key] = val; } else if (typeof obj[key] !== 'object') { obj[key] = _.isUndefined(obj[key]) ? {} : {value: obj[key]}; } obj = obj[key]; } } // Sanitize dimension names var sanitizeDimensions = _.memoize(function(dimensions) { return _.map(dimensions, function(dimension) { dimension = dimension.replace(/mix_/g, '').replace(/Dimension/g, '').replace(/Member/g, ''); if (/:/.test(dimension)) return dimension.split(':')[1]; else return dimension; }); }); // Get dimension data for all available documents _.each(['usd', 'local'], function(currency) { var dimensions_table = currency === 'usd' ? 'dimensions_usd' : 'dimensions'; debug('Querying', currency, 'data from', dimensions_table); mysql.query("SELECT d.fiscal_year, d.period_type, d.as_of_date, d.scenarios, d.line_item_value, t.db_table, t.db_field FROM ?? d INNER JOIN mfi ON d.source_id = mfi.source_id AND d.mfi_id = mfi.mfi_id AND d.mfi_vid = mfi.mfi_vid LEFT JOIN Taxonomy t ON d.element_id = t.Elementid WHERE d.line_item_value IS NOT NULL AND t.db_table IS NOT NULL AND t.db_field IS NOT NULL AND d.source_id = ? AND d.mfi_id = ?", [dimensions_table, mfi.source_id, mfi.mfi_id], function(err, rows, fields) { debug('Processing all data from', dimensions_table); if (err) throw new Error(err); _.each(rows, function(row) { var dimension_path = findPath(row.scenarios, tree); if (_.isEmpty(dimension_path)) return console.warn('MISSING SCENARIO', row.scenarios); _.each(['true', 'false'], function(adjusted) { var doc_id = ['mfi-period', mfi.source_id, mfi.mfi_id, currency, adjusted, row.fiscal_year, row.period_type, row.as_of_date.toISOString().substr(0, 10)].join('/'); var path = sanitizeDimensions([row.db_table, row.db_field].concat(dimension_path)); docs[doc_id] && deepAssign(docs[doc_id], path, parseFloat(row.line_item_value)); }); }); }); }); mysql.end(function(err) { debug('Disconnected from MySQL', db); if (err) throw new Error(err); callback(mfi, docs); }); } // Get MFI metadata from Salesforce // -------------------------------- var getSalesforceData = function(mfi, docs, callback) { var remaining = 4; var done = function(mfi, docs) { if (--remaining === 0) { callback(mfi, docs); // Logout from Salesforce salesforce.logout(function(err) { debug('Logged out from Salesforce'); if (err) throw new Error(err); }); } } // Login into Salesforce debug('Login into Salesforce'); salesforce.login(sfConfig.username, sfConfig.password + sfConfig.security_token, function(err, userInfo) { if (err) throw new Error(err); // Get main MFI Metadata debug('Getting MFI metadata from Salesforce'); salesforce.query("SELECT Id, Name, Record_ID__c, mix_Diamonds__c, Date_Established__c, mix_Region__c, Country__c, Operations_Comprised_by_MF__c, Regulated__c, Current_Legal_Status__c, Profit_Status__c FROM Account WHERE Record_ID__c = '" + mfi.mfi_id + "'", function(err, result) { if (err) throw new Error(err); if (result.totalSize === 0) throw new Error('MFI does not exist'); var record = {}; _.chain(result.records).first().omit(['attributes', 'Id']).each(function(v, k) { // Make attributes lowercase record[k.toLowerCase()] = v; }); _.extend(mfi, record); mfi.mfi_name = mfi.name; done(mfi, docs); }); // Determine whether MFI contains Social Performance Profile data debug('Determining whether MFI contains SP Profile data.'); salesforce.query("SELECT Id, Record_ID__c FROM Account WHERE Record_ID__c = '" + mfi.mfi_id + "' AND Id IN (SELECT Organization__c FROM SP_Profile__c)", function(err, result) { if (err) throw new Error(err); mfi.sp_profile = !_.isEmpty(result.records); done(mfi, docs); }); // Get list of MFI Network Affiliations debug('Getting list of MFI Network Affiliations'); salesforce.query("SELECT Source_Organization__r.Name FROM Partnership__c WHERE Relationship__c = 'Network Affiliation' AND Status__c = 'Current' AND Target_Organization__r.Id = '" + mfi.organization_id + "'", function(err, result) { if (err) throw new Error(err); mfi.networks = _.chain(result.records).pluck('Source_Organization__r').pluck('Name').value(); done(mfi, docs); }); // Get annual diamonds debug('Getting annual diamonds.'); salesforce.query("SELECT Period__c, Diamond_Score__c FROM Data_Campaign_Status__c WHERE Organization__c = '" + mfi.organization_id + "'", function(err, result) { if (err) throw new Error(err); // Group diamonds by year var diamonds = _.chain(result.records).map(function(period) { return _.chain(period).pick(['Period__c', 'Diamond_Score__c']).values().value(); }).object().value(); // Add diamonds to corresponding periods _.chain(docs).filter(function(doc) { return doc.period_type === 'ANN'; }).each(function(doc) { doc.annual_diamonds = diamonds[doc.fiscal_year]; }); done(mfi, docs); }); }); } // Calculate Peer Group data // ------------------------- var calculatePeerGroupData = function(docs, callback) { // Safely get data point value var getVal = function(obj, group, prop) { if (_.has(obj, group) && _.has(obj[group], prop)) { return obj[group][prop].value || obj[group][prop]; } return undefined; } _.each(docs, function(doc, id) { var peer_groups = {}; // Age debug('Calculating peer group age for', doc._id); if (_.has(doc, 'date_established__c')) { var age = Math.abs(Date.parse(doc.as_of_date) - Date.parse(doc.date_established__c)) / (86400000 * 365.242199); if (age) { if (age < 4) peer_groups['age'] = 'New'; else if (age <= 8) peer_groups['age'] = 'Young'; else if (age > 8) peer_groups['age'] = 'Mature'; } } // Intermediation debug('Calculating peer group intermediation for', doc._id); var deposits = getVal(doc, 'balance_sheet', 'deposits'); var total_assets = getVal(doc, 'balance_sheet', 'total_assets'); if (!_.isUndefined(deposits) && !_.isUndefined(total_assets) && total_assets > 0) { var ratio = deposits / total_assets; if (ratio === 0) peer_groups['intermediation'] = 'Non FI'; else if (ratio < 0.2) peer_groups['intermediation'] = 'Low FI'; else if (ratio >= 0.2) peer_groups['intermediation'] = 'High FI'; } else if (total_assets === 0) { peer_groups['intermediation'] = 'Non FI'; } // Market debug('Calculating peer group market for', doc._id); var depth = getVal(doc, 'calculation', 'average_balance_borrower_per_capita') || getVal(doc, 'calculation', 'average_outstanding_balance_per_capita'); var average_loan_size = getVal(doc, 'calculation', 'average_balance_borrower') || getVal(doc, 'calculation', 'average_outstanding_balance'); if (!_.isUndefined(depth) || !_.isUndefined(average_loan_size)) { if (depth < .2 || average_loan_size < 150) peer_groups['market'] = 'Low End'; else if ((depth >= .2) && (depth < 1.5)) peer_groups['market'] = 'Broad'; else if ((depth >= 1.5) && (depth < 2.5)) peer_groups['market'] = 'High End'; else if ((depth >= 2.5)) peer_groups['market'] = 'Small Business'; } // Outreach debug('Calculating peer group outreach for', doc._id); var total_borrowers = getVal(doc, 'products_and_clients', 'total_borrowers'); if (total_borrowers < 10000) peer_groups['outreach'] = 'Small'; else if (total_borrowers < 30000) peer_groups['outreach'] = 'Medium'; else if (total_borrowers >= 30000) peer_groups['outreach'] = 'Large'; // Scale debug('Calculating peer group scale for', doc._id); if (_.has(doc, 'mix_region__c')) { var gross_loan_portfolio = getVal(doc, 'balance_sheet', 'gross_loan_portfolio'); if (gross_loan_portfolio < 2000000 || (gross_loan_portfolio < 4000000 && doc.mix_region__c == 'Latin America and The Caribbean')) peer_groups['scale'] = 'Small'; else if (gross_loan_portfolio < 8000000 || (gross_loan_portfolio < 15000000 && doc.mix_region__c == 'Latin America and The Caribbean')) peer_groups['scale'] = 'Medium'; else if (gross_loan_portfolio > 8000000) peer_groups['scale'] = 'Large'; } // Sustainability debug('Calculating peer group sustainability for', doc._id); var operational_self_sufficiency = getVal(doc, 'calculation', 'operational_self_sufficiency'); if (!_.isUndefined(operational_self_sufficiency)) { if (doc.adjusted) peer_groups['sustainability'] = operational_self_sufficiency < 1 ? 'Non-FSS' : 'FSS'; else peer_groups['sustainability'] = operational_self_sufficiency < 1 ? 'Non-OSS' : 'OSS'; } if (!_.isEmpty(peer_groups)) docs[id].peer_groups = peer_groups; }); callback(docs); } // Send data to CouchDB // -------------------- var updateCouchDB = function(docs, callback) { // Update master MFI record debug('Updating master MFI record'); var mfi = docs.shift(); couch.get(mfi._id, function(err, doc) { if (err) { if (err.error === 'not_found') { require('util').log('Inserting ' + mfi._id); !simulate && couch.save(mfi._id, mfi, function(err, res) { debug('Inserted', res); if (err) throw new Error(err); }); } else throw new Error(err); } else if (doc._rev) { require('util').log('Updating ' + mfi._id); !simulate && couch.save(mfi._id, doc._rev, mfi, function(err, res) { debug('Updated', res); if (err) throw new Error(err); }); } }); // Get list of existing IDs in CouchDB debug('Getting list of existing IDs in CouchDB'); couch.all({startkey: ['mfi-period', args.source, args.mfi].join('/'), endkey: ['mfi-period', args.source, args.mfi, '~'].join('/')}, function(err, ids) { if (err) throw new Error(err); // Remove outdated documents from CouchDB _.chain(ids).pluck('id').difference(_.pluck(docs, '_id')).map(function(id) { return _.findWhere(ids, {id: id}); }).each(function(doc) { require('util').log('Removing ' + doc.id); couch.remove(doc.id, doc.value.rev, function(err, res) { debug('Removed', res); if (err) throw new Error(err); }); }); // Insert/update all documents for this MFI _.each(docs, function(doc) { var update = _.findWhere(ids, {id: doc._id}); if (update) { require('util').log('Updating ' + doc._id); !simulate && couch.save(doc._id, update.value.rev, doc, function(err, res) { debug('Updated', res); if (err) throw new Error(err); }); } else { require('util').log('Inserting ' + doc._id); !simulate && couch.save(doc._id, doc, function(err, res) { debug('Inserted', res); if (err) throw new Error(err); }); } }); callback(); }); } // Initialize MFI document var mfi = { _id: 'mfi/' + args.source + '/' + args.mfi, type: 'mfi', source_id: args.source, mfi_id: args.mfi, updated: new Date() }; getMySQLData(mfi, function(mfi, docs) { getSalesforceData(mfi, docs, function(mfi, docs) { // Merge MFI metadata into each period _.each(docs, function(doc, id) { docs[id] = _.extend(_.clone(mfi), doc); }); calculatePeerGroupData(docs, function(docs) { // Convert to array for bulk updating docs = _.union([mfi], _.values(docs)); updateCouchDB(docs, function() { require('util').log('Synchronization for ' + ['mfi', args.source, args.mfi].join('/') + ' finished'); }); }); }); });
问题
我想知道:
- 为什么这些子进程会出现冻结? (我找不到任何证据表明冻结的那些与执行和停止的有任何不同)。
- 我如何能够编写脚本来停止冻结几分钟的子进程,这样我就不必手动终止它?
答案1
必须猜测您正在耗尽某种资源(尽管您可能已经在猜测)。也许是最大打开文件数、mysql 或 salesforce。不知道。
但是您可能能够解决此问题的一种方法是,将 mf-sync 放入模块中,并使用控制 Node 脚本,该脚本使用队列来运行此 mf-sync 事物,而不是使用包含大量 mf-sync 事物的 bash 脚本有点像队列中的受控批次。尝试类似的东西https://github.com/learnboost/kue
你这样做的方式似乎有点疯狂。但如果只是文件不足,也许您可以通过增加限制来摆脱困境。 http://www.cyberciti.biz/faq/linux-increase-the-maximum-number-of-open-files/
或者,如果您确实想在 shell 中执行此操作,也许您可以使用batch
命令来处理队列。 http://pubs.opengroup.org/onlinepubs/009695399/utilities/batch.html
或者也许这样的东西会更好http://pebblesinthesand.wordpress.com/2008/05/22/a-srcipt-for-running-processes-in-parallel-in-bash/