nohup bash脚本中的nodejs子进程锁定

nohup bash脚本中的nodejs子进程锁定

我使用 nohup 运行脚本,其中包含数百到数千个 Nodejs 命令的列表。这些nodejs子进程将数据从mysql和salesforce同步到couchdb。

$ nohup ./mf-sync.staging-mfdb.sh 2>&1 > mf-sync.staging-mfdb.log &
$ mf-sync.staging-mfdb.sh

脚本:

#!/bin/bash
# Driver script for the mf-sync jobs. Every `node` line below is a foreground
# command, so each synchronization starts only after the previous one exits.
echo "Starting..."
# $$ expands to the PID of this shell, which makes the run easy to find in `ps`
echo "pid $$"
# One synchronization per line, selected by its --mfi and --source arguments
node /opt/node/mix-sync/mf-sync.js --mfi=100017 --source=100982
node /opt/node/mix-sync/mf-sync.js --mfi=100026 --source=101619
node /opt/node/mix-sync/mf-sync.js --mfi=100027 --source=100982
node /opt/node/mix-sync/mf-sync.js --mfi=100036 --source=101619
node /opt/node/mix-sync/mf-sync.js --mfi=100063 --source=100982
node /opt/node/mix-sync/mf-sync.js --mfi=100075 --source=101160
etc....

在终端中,我观察到子进程停止运行:

[rgoya@host ~]$ ps -e | grep sync
   26 ?        00:00:00 async/mgr
   30 ?        00:03:34 sync_supers
 6333 ?        00:00:01 mf-sync
30097 ?        00:00:00 mf-sync.staging

[rgoya@host ~]$ ps -e | grep sync
   26 ?        00:00:00 async/mgr
   30 ?        00:03:34 sync_supers
 6333 ?        00:00:01 mf-sync
30097 ?        00:00:00 mf-sync.staging

[rgoya@host ~]$ kill 6333

[rgoya@host ~]$ ps -e | grep sync
   26 ?        00:00:00 async/mgr
   30 ?        00:03:34 sync_supers
 6423 ?        00:00:00 mf-sync
30097 ?        00:00:00 mf-sync.staging

[rgoya@host ~]$ ps -e | grep sync
   26 ?        00:00:00 async/mgr
   30 ?        00:03:34 sync_supers
 6449 ?        00:00:01 mf-sync
30097 ?        00:00:00 mf-sync.staging

注:30097 是 nohup 进程的 pid

检查终止子进程之前和之后的日志,我看到下一个nodejs命令是按顺序执行的。我尝试使用--debug详细输出标志运行它们,但我没有看到任何异常。

补充说明

  • Nodejs 的内存限制为 1GB。
  • Couchdb 默认最大连接数为 2048。
  • mf-sync.js 的内容:

    #!/usr/bin/env node
    process.title = 'mf-sync';
    
    var path = require('path')
    ,   fs = require('fs')
    ,   _ = require('underscore');
    
    // Parse command-line arguments
    // Each `--name=value` token becomes `args.name = 'value'`; a bare `--name`
    // becomes `args.name = true`. Only a leading `--` is stripped and only the
    // first `=` separates the name from the value, so a value may contain `=`.
    var args = _.chain(process.argv).rest(2).map(function(arg) {
        var token = arg.replace(/^--/, '');
        var eq = token.indexOf('=');
        if (eq === -1) return [token, true];
        return [token.slice(0, eq), token.slice(eq + 1)];
    }).object().value();

    // Both identifiers are required
    if (!args.mfi) throw new Error('MFI ID not specified');
    if (!args.source) throw new Error('Source ID not specified');

    // Output when using `--debug` flag
    var debug = function() { if (_.has(args, 'debug')) console.info.apply(this, arguments); };

    // Simulation mode (`--simulate`)
    var simulate = _.has(args, 'simulate');

    require('util').log('Synchronization for ' + ['mfi', args.source, args.mfi].join('/') + ' started');
    simulate && console.warn('Simulation mode enabled. No changes will occur.');
    debug(args);
    
    // Read a JSON settings file that sits next to this script
    var loadConfig = function(file) {
        return require(path.join(__dirname, file));
    };

    // MySQL: source 101027 uses the `mfdb` settings, every other source uses `gold`
    var my = require('mysql');
    var myConfig = loadConfig('mysql.json');
    var db = args.source == '101027' ? 'mfdb' : 'gold';
    var mysql = my.createConnection(myConfig[db]);
    debug('MySQL', myConfig[db].database);

    // Salesforce: the whole settings object is handed to the connection
    var sf = require('node-salesforce');
    var sfConfig = loadConfig('salesforce.json');
    var salesforce = new sf.Connection(sfConfig);
    debug('Salesforce', sfConfig.username);

    // CouchDB: connect to the server, then select the target database
    var cradle = require('cradle');
    var couchConfig = loadConfig('couchdb.json');
    var couch = (function(settings) {
        var server = new cradle.Connection(settings.host, settings.port, settings.options);
        return server.database(settings.name);
    })(couchConfig.mfdb);
    debug('CouchDB', couchConfig.mfdb.name);
    
    // Extend Underscore.js with `compactObject`, which strips every property
    // whose value is `null` or a function. The object is modified in place and
    // returned, so the helper can be used inside a `_.chain(...)`.
    _.mixin({
        compactObject: function(obj) {
            _.each(obj, function(value, key) {
                var disposable = _.isNull(value) || _.isFunction(value);
                if (!disposable) return;
                delete obj[key];
            });
            return obj;
        }
    });
    
    // Get MFI data from MySQL
    // -----------------------
    // Queues every MySQL query needed for one MFI and, after the connection has
    // been ended, passes the results to `callback(mfi, docs)`.
    //   mfi      - master record; must carry `source_id` and `mfi_id`. Columns of
    //              the matching `mfi` row are copied onto it, without overwriting
    //              keys that are already set.
    //   callback - receives `mfi` and `docs`, an object of period documents keyed
    //              by `mfi-period/<source>/<mfi>/<currency>/<adjusted>/<year>/<period>/<date>`.
    // Any query error is rethrown from inside the driver callback.
    // NOTE(review): the code relies on the mysql driver running the queued
    // queries in order, so that `docs` and `tree` are filled before the dimension
    // rows and the `end` callback are handled — confirm against the driver docs.
    var getMySQLData = function(mfi, callback) {
        mysql.connect();

        // Get master MFI metadata
        debug('Getting master MFI metadata from `mfi`.');
        mysql.query("SELECT * FROM mfi WHERE source_id = ? AND mfi_id = ?", [mfi.source_id, mfi.mfi_id], function(err, rows, fields) {
            if (err) throw new Error(err);
            // Only the first row is used; `parse` and `_typeCast` are dropped from it
            _.defaults(mfi, _.chain(rows).first().omit(['parse', '_typeCast']).value());
        });

        // Define MFDB data tables, keyed by `<currency>/<adjusted>`
        var tables = {
            'usd/false': ['balance_sheet_usd', 'calculation_usd', 'income_statement_usd', 'infrastructure', 'portfolio_report_usd', 'products_and_clients', 'social_performance'],
            'usd/true': ['balance_sheet_adjusted_usd', 'calculation_adjusted_usd', 'income_statement_adjusted_usd', 'infrastructure_adjusted', 'portfolio_report_adjusted_usd', 'products_and_clients_adjusted', 'social_performance'],
            'local/false': ['balance_sheet', 'calculation', 'income_statement', 'infrastructure', 'portfolio_report', 'products_and_clients', 'social_performance'],
            'local/true': ['balance_sheet_adjusted', 'calculation_adjusted', 'income_statement_adjusted', 'infrastructure_adjusted', 'portfolio_report_adjusted', 'products_and_clients_adjusted', 'social_performance']
        };
        // Remove table name variance (`_usd` / `_adjusted`) so that all variants
        // of a table land under the same document property
        var baseTable = _.memoize(function(table) {
            return table.replace('_usd', '').replace('_adjusted', '');
        });

        var docs = {};
        // Get all available MFDB data for the current `mfi_vid`
        debug('Getting all available MFDB data for the current `mfi_vid`.');
        _.each(_.keys(tables), function(key) {
            _.each(tables[key], function(table) {
                debug('Querying', key, 'data from', table);
                mysql.query("SELECT t.* FROM ?? t INNER JOIN mfi ON t.source_id = mfi.source_id AND t.mfi_id = mfi.mfi_id AND t.mfi_vid = mfi.mfi_vid WHERE t.source_id = ? AND t.mfi_id = ? ORDER BY t.fiscal_year ASC, t.period_type DESC, t.as_of_date ASC", [table, mfi.source_id, mfi.mfi_id], function(err, rows, fields) {
                    if (err) throw new Error(err);

                    // Create full document data
                    _.each(rows, function(row) {
                        // Create doc._id
                        var doc_id = ['mfi-period', mfi.source_id, mfi.mfi_id, key, row.fiscal_year, row.period_type, row.as_of_date.toISOString().substr(0, 10)].join('/');
                        debug('Processing', table, 'data for', doc_id);

                        // Initialize document
                        if (!docs[doc_id]) docs[doc_id] = {
                            _id: doc_id,
                            type: 'mfi-period',
                            currency: key.split('/')[0],
                            adjusted: key.split('/')[1] === 'true',
                            fiscal_year: row.fiscal_year,
                            period_type: row.period_type,
                            as_of_date: row.as_of_date
                        };
                        // The first row that carries a currency code sets it for the document
                        if (!docs[doc_id].currency_code && row.currency_code) docs[doc_id].currency_code = row.currency_code;

                        // Extend MFDB data into document, without the identifying columns
                        debug('Adding', table, 'data to', doc_id);
                        row = _.chain(row).omit(['mfi_id', 'mfi_vid', 'source_id', 'period_type', 'as_of_date', 'fiscal_year', 'currency_code', 'currency_unit']).compactObject().value();
                        if (!_.isEmpty(row)) docs[doc_id][baseTable(table)] = row;
                    });
                });
            });
        });

        // Get all scenario data to create dimension hierarchy
        var tree = {};
        mysql.query("SELECT * FROM scenarios", function(err, rows) {
            debug('Processing scenario data into hierarchical tree.');
            if (err) throw new Error(err);

            // Get all children scenarios for any given parent, ordered by `weight`
            var getChildren = function(parent) {
                var children = _.chain(rows).where({parent: parent}).sortBy('weight').pluck('scenarios').object({}).tap(function(scenarios) {
                    // Remove used scenarios from master list to decrease stack size
                    _.each(_.keys(scenarios), function(scenario) {
                        rows = _.without(rows, _.findWhere(rows, {scenarios: scenario}));
                    });
                }).value();
                if (_.isEmpty(children)) return null;
                return children;
            };

            // Recursively get dimension hierarchy
            var getTree = function(hierarchy) {
                if (_.isEmpty(hierarchy)) return;
                _.each(_.keys(hierarchy), function(p) {
                    hierarchy[p] = getChildren(p);
                    if (!_.isEmpty(hierarchy[p])) getTree(hierarchy[p]);
                });
            };

            // Roots are the scenarios whose parent is the empty string
            tree = getChildren('');
            getTree(tree);
        });

        // Find path to nested object property.
        // Memoized on `needles` only (the default `_.memoize` key is its first argument).
        var findPath = _.memoize(function(needles, haystack) {
            function constructPath(haystack, needle, path) {
                if (!_.isObject(haystack)) return false;
                if (typeof haystack !== 'object') return false;
                for (var key in haystack) {
                    var value = haystack[key];
                    var currentPath = _.extend([], path);
                    currentPath.push(key);
                    if (key === needle) return currentPath;
                    var foundPath = constructPath(value, needle, currentPath);
                    if (foundPath) return foundPath;
                }
            }
            // Handle comma-separated nested hierarchies
            return _.chain(needles.split(',')).map(function(needle) {
                return constructPath(haystack, needle, []);
            }).flatten().compact().value();
        });
        // Assign value inside a nested object property.
        // Walks `path` from the root of `obj`, creating intermediate objects as
        // needed. A scalar found on the way down is kept as `{value: scalar}`; at
        // the final key an existing object receives `val` as its `.value`.
        var deepAssign = function(obj, path, val) {
            // Indexed loop: `path` is an array, so for...in (string indices,
            // inherited enumerable keys) is the wrong tool, and the former
            // `for (var i = 0 in path)` form is a syntax error in strict mode.
            for (var i = 0; i < path.length; i++) {
                var key = path[i];
                if (i === path.length - 1) {
                    if (typeof obj[key] === 'object') obj[key].value = val;
                    else obj[key] = val;
                } else if (typeof obj[key] !== 'object') {
                    obj[key] = _.isUndefined(obj[key]) ? {} : {value: obj[key]};
                }
                obj = obj[key];
            }
        };
        // Sanitize dimension names: drop the `mix_`, `Dimension` and `Member`
        // fragments and keep only the part after a `:` when there is one
        var sanitizeDimensions = _.memoize(function(dimensions) {
            return _.map(dimensions, function(dimension) {
                dimension = dimension.replace(/mix_/g, '').replace(/Dimension/g, '').replace(/Member/g, '');
                if (/:/.test(dimension)) return dimension.split(':')[1];
                else return dimension;
            });
        });

        // Get dimension data for all available documents
        _.each(['usd', 'local'], function(currency) {
            var dimensions_table = currency === 'usd' ? 'dimensions_usd' : 'dimensions';
            debug('Querying', currency, 'data from', dimensions_table);
            mysql.query("SELECT d.fiscal_year, d.period_type, d.as_of_date, d.scenarios, d.line_item_value, t.db_table, t.db_field FROM ?? d INNER JOIN mfi ON d.source_id = mfi.source_id AND d.mfi_id = mfi.mfi_id AND d.mfi_vid = mfi.mfi_vid LEFT JOIN Taxonomy t ON d.element_id = t.Elementid WHERE d.line_item_value IS NOT NULL AND t.db_table IS NOT NULL AND t.db_field IS NOT NULL AND d.source_id = ? AND d.mfi_id = ?", [dimensions_table, mfi.source_id, mfi.mfi_id], function(err, rows, fields) {
                debug('Processing all data from', dimensions_table);
                if (err) throw new Error(err);
                _.each(rows, function(row) {
                    var dimension_path = findPath(row.scenarios, tree);
                    if (_.isEmpty(dimension_path)) return console.warn('MISSING SCENARIO', row.scenarios);
                    // The same dimension value is written to both the adjusted
                    // and the unadjusted document, when that document exists
                    _.each(['true', 'false'], function(adjusted) {
                        var doc_id = ['mfi-period', mfi.source_id, mfi.mfi_id, currency, adjusted, row.fiscal_year, row.period_type, row.as_of_date.toISOString().substr(0, 10)].join('/');
                        var path = sanitizeDimensions([row.db_table, row.db_field].concat(dimension_path));
                        docs[doc_id] && deepAssign(docs[doc_id], path, parseFloat(row.line_item_value));
                    });
                });
            });
        });

        // End the connection and hand the collected data to the caller
        mysql.end(function(err) {
            debug('Disconnected from MySQL', db);
            if (err) throw new Error(err);
            callback(mfi, docs);
        });
    };
    
    // Get MFI metadata from Salesforce
    // --------------------------------
    // Logs in, sends four SOQL queries without waiting for one another, and
    // calls `callback(mfi, docs)` once all four have answered. The logout
    // request is sent right after `callback` returns. Login, query and logout
    // errors are rethrown from inside their callbacks.
    var getSalesforceData = function(mfi, docs, callback) {
        var pending = 4;
        var done = function(mfi, docs) {
            pending -= 1;
            if (pending !== 0) return;

            callback(mfi, docs);

            // Logout from Salesforce
            salesforce.logout(function(err) {
                debug('Logged out from Salesforce');
                if (err) throw new Error(err);
            });
        };

        // Announce and send one query; on success apply its result and count it as answered
        var ask = function(label, soql, apply) {
            debug(label);
            salesforce.query(soql, function(err, result) {
                if (err) throw new Error(err);
                apply(result);
                done(mfi, docs);
            });
        };

        // Login into Salesforce
        debug('Login into Salesforce');
        salesforce.login(sfConfig.username, sfConfig.password + sfConfig.security_token, function(err, userInfo) {
            if (err) throw new Error(err);

            // Main MFI metadata: copy the account fields onto `mfi` with lowercase names
            ask('Getting MFI metadata from Salesforce', "SELECT Id, Name, Record_ID__c, mix_Diamonds__c, Date_Established__c, mix_Region__c, Country__c, Operations_Comprised_by_MF__c, Regulated__c, Current_Legal_Status__c, Profit_Status__c FROM Account WHERE Record_ID__c = '" + mfi.mfi_id + "'", function(result) {
                if (result.totalSize === 0) throw new Error('MFI does not exist');
                var account = _.omit(_.first(result.records), ['attributes', 'Id']);
                _.each(account, function(value, field) {
                    mfi[field.toLowerCase()] = value;
                });
                mfi.mfi_name = mfi.name;
            });

            // Social Performance Profile flag: true when the sub-select matches the account
            ask('Determining whether MFI contains SP Profile data.', "SELECT Id, Record_ID__c FROM Account WHERE Record_ID__c = '" + mfi.mfi_id + "' AND Id IN (SELECT Organization__c FROM SP_Profile__c)", function(result) {
                mfi.sp_profile = !_.isEmpty(result.records);
            });

            // Names of the organizations this MFI is currently affiliated with
            ask('Getting list of MFI Network Affiliations', "SELECT Source_Organization__r.Name FROM Partnership__c WHERE Relationship__c = 'Network Affiliation' AND Status__c = 'Current' AND Target_Organization__r.Id = '" + mfi.organization_id + "'", function(result) {
                var sources = _.pluck(result.records, 'Source_Organization__r');
                mfi.networks = _.pluck(sources, 'Name');
            });

            // Annual diamonds: build a period -> score lookup and attach it to annual documents
            ask('Getting annual diamonds.', "SELECT Period__c, Diamond_Score__c FROM Data_Campaign_Status__c WHERE Organization__c = '" + mfi.organization_id + "'", function(result) {
                var pairs = _.map(result.records, function(period) {
                    return _.values(_.pick(period, ['Period__c', 'Diamond_Score__c']));
                });
                var diamonds = _.object(pairs);
                _.each(docs, function(doc) {
                    if (doc.period_type !== 'ANN') return;
                    doc.annual_diamonds = diamonds[doc.fiscal_year];
                });
            });
        });
    };
    
    // Calculate Peer Group data
    // -------------------------
    // Derives the peer group labels (age, intermediation, market, outreach,
    // scale, sustainability) for every document in `docs`, stores them under
    // `peer_groups` on each document that received at least one label, and then
    // calls `callback(docs)` synchronously with the same object.
    var calculatePeerGroupData = function(docs, callback) {
        // Safely get data point value.
        // A data point is either a plain value or an object wrapping it in
        // `.value`. The wrapper is unwrapped whenever it has a `value` key, so a
        // wrapped 0 comes back as 0 rather than as the wrapper object.
        var getVal = function(obj, group, prop) {
            if (!_.has(obj, group) || !_.has(obj[group], prop)) return undefined;
            var entry = obj[group][prop];
            if (_.isObject(entry) && _.has(entry, 'value')) return entry.value;
            return entry;
        };

        _.each(docs, function(doc, id) {
            var peer_groups = {};

            // Age, in years between the establishment date and the period date
            debug('Calculating peer group age for', doc._id);
            if (_.has(doc, 'date_established__c')) {
                var age = Math.abs(Date.parse(doc.as_of_date) - Date.parse(doc.date_established__c)) / (86400000 * 365.242199);
                // Skips NaN (unparseable dates) as well as an age of exactly 0
                if (age) {
                    if (age < 4) peer_groups['age'] = 'New';
                    else if (age <= 8) peer_groups['age'] = 'Young';
                    else if (age > 8) peer_groups['age'] = 'Mature';
                }
            }

            // Intermediation, from the ratio of deposits to total assets
            debug('Calculating peer group intermediation for', doc._id);
            var deposits = getVal(doc, 'balance_sheet', 'deposits');
            var total_assets = getVal(doc, 'balance_sheet', 'total_assets');
            if (!_.isUndefined(deposits) && !_.isUndefined(total_assets) && total_assets > 0) {
                var ratio = deposits / total_assets;
                if (ratio === 0) peer_groups['intermediation'] = 'Non FI';
                else if (ratio < 0.2) peer_groups['intermediation'] = 'Low FI';
                else if (ratio >= 0.2) peer_groups['intermediation'] = 'High FI';
            }
            else if (total_assets === 0) {
                peer_groups['intermediation'] = 'Non FI';
            }

            // Market, from loan depth per capita or the average loan size
            debug('Calculating peer group market for', doc._id);
            var depth = getVal(doc, 'calculation', 'average_balance_borrower_per_capita') || getVal(doc, 'calculation', 'average_outstanding_balance_per_capita');
            var average_loan_size = getVal(doc, 'calculation', 'average_balance_borrower') || getVal(doc, 'calculation', 'average_outstanding_balance');
            if (!_.isUndefined(depth) || !_.isUndefined(average_loan_size)) {
                if (depth < .2 || average_loan_size < 150) peer_groups['market'] = 'Low End';
                else if ((depth >= .2) && (depth < 1.5)) peer_groups['market'] = 'Broad';
                else if ((depth >= 1.5)  && (depth < 2.5)) peer_groups['market'] = 'High End';
                else if ((depth >= 2.5)) peer_groups['market'] = 'Small Business';
            }

            // Outreach, from the number of borrowers (no label when it is missing)
            debug('Calculating peer group outreach for', doc._id);
            var total_borrowers = getVal(doc, 'products_and_clients', 'total_borrowers');
            if (total_borrowers < 10000) peer_groups['outreach'] = 'Small';
            else if (total_borrowers < 30000) peer_groups['outreach'] = 'Medium';
            else if (total_borrowers >= 30000) peer_groups['outreach'] = 'Large';

            // Scale, from the gross loan portfolio, with higher thresholds for
            // 'Latin America and The Caribbean'
            debug('Calculating peer group scale for', doc._id);
            if (_.has(doc, 'mix_region__c')) {
                var gross_loan_portfolio = getVal(doc, 'balance_sheet', 'gross_loan_portfolio');
                if (gross_loan_portfolio < 2000000 || (gross_loan_portfolio < 4000000 && doc.mix_region__c == 'Latin America and The Caribbean')) peer_groups['scale'] = 'Small';
                else if (gross_loan_portfolio < 8000000 || (gross_loan_portfolio < 15000000 && doc.mix_region__c == 'Latin America and The Caribbean')) peer_groups['scale'] = 'Medium';
                // `>=` so that a portfolio of exactly 8,000,000 is not left unclassified
                else if (gross_loan_portfolio >= 8000000) peer_groups['scale'] = 'Large';
            }

            // Sustainability: FSS labels for adjusted documents, OSS labels otherwise
            debug('Calculating peer group sustainability for', doc._id);
            var operational_self_sufficiency = getVal(doc, 'calculation', 'operational_self_sufficiency');
            if (!_.isUndefined(operational_self_sufficiency)) {
                if (doc.adjusted) peer_groups['sustainability'] = operational_self_sufficiency < 1 ? 'Non-FSS' : 'FSS';
                else peer_groups['sustainability'] = operational_self_sufficiency < 1 ? 'Non-OSS' : 'OSS';
            }

            if (!_.isEmpty(peer_groups)) docs[id].peer_groups = peer_groups;
        });

        callback(docs);
    };
    
    // Send data to CouchDB
    // --------------------
    // Writes the master MFI record and all of its period documents, and removes
    // period documents that exist in CouchDB but are no longer in `docs`.
    //   docs     - array whose first element is the master MFI record; that
    //              element is shifted off, so the caller's array is modified.
    //   callback - called with no arguments once the period writes have been
    //              issued; it does not wait for them to complete.
    // In simulation mode every action is still logged, but no save or remove
    // request is sent. Errors are rethrown from inside the CouchDB callbacks.
    var updateCouchDB = function(docs, callback) {
        // Update master MFI record
        debug('Updating master MFI record');
        var mfi = docs.shift();
        couch.get(mfi._id, function(err, doc) {
            if (err) {
                // A missing document means a first-time insert; anything else is fatal
                if (err.error === 'not_found') {
                    require('util').log('Inserting ' + mfi._id);
                    !simulate && couch.save(mfi._id, mfi, function(err, res) {
                        debug('Inserted', res);
                        if (err) throw new Error(err);
                    });
                } else throw new Error(err);
            } else if (doc._rev) {
                require('util').log('Updating ' + mfi._id);
                !simulate && couch.save(mfi._id, doc._rev, mfi, function(err, res) {
                    debug('Updated', res);
                    if (err) throw new Error(err);
                });
            }
        });

        // Get list of existing IDs in CouchDB
        debug('Getting list of existing IDs in CouchDB');
        couch.all({startkey: ['mfi-period', args.source, args.mfi].join('/'), endkey: ['mfi-period', args.source, args.mfi, '~'].join('/')}, function(err, ids) {
            if (err) throw new Error(err);

            // Remove outdated documents from CouchDB
            _.chain(ids).pluck('id').difference(_.pluck(docs, '_id')).map(function(id) {
                return _.findWhere(ids, {id: id});
            }).each(function(doc) {
                require('util').log('Removing ' + doc.id);
                // Guarded like the saves below: simulation mode must not delete anything
                !simulate && couch.remove(doc.id, doc.value.rev, function(err, res) {
                    debug('Removed', res);
                    if (err) throw new Error(err);
                });
            });

            // Insert/update all documents for this MFI
            _.each(docs, function(doc) {
                // An existing revision turns the save into an update
                var update = _.findWhere(ids, {id: doc._id});
                if (update) {
                    require('util').log('Updating ' + doc._id);
                    !simulate && couch.save(doc._id, update.value.rev, doc, function(err, res) {
                        debug('Updated', res);
                        if (err) throw new Error(err);
                    });
                } else {
                    require('util').log('Inserting ' + doc._id);
                    !simulate && couch.save(doc._id, doc, function(err, res) {
                        debug('Inserted', res);
                        if (err) throw new Error(err);
                    });
                }
            });

            callback();
        });
    };
    
    // Master MFI document, identified by `mfi/<source>/<mfi>`
    var mfi = {
        _id: ['mfi', args.source, args.mfi].join('/'),
        type: 'mfi',
        source_id: args.source,
        mfi_id: args.mfi,
        updated: new Date()
    };

    // Pipeline: MySQL -> Salesforce -> peer groups -> CouchDB
    getMySQLData(mfi, function(master, periods) {
        getSalesforceData(master, periods, function(master, periods) {
            // Each period document starts from a copy of the master metadata;
            // the period's own fields win when both define the same key
            _.each(periods, function(period, id) {
                periods[id] = _.extend(_.clone(master), period);
            });
            calculatePeerGroupData(periods, function(enriched) {
                // Array form for updateCouchDB: master record first, then the periods
                var batch = _.union([master], _.values(enriched));
                updateCouchDB(batch, function() {
                    var label = ['mfi', args.source, args.mfi].join('/');
                    require('util').log('Synchronization for ' + label + ' finished');
                });
            });
        });
    });
    

问题

我想知道:

  1. 为什么这些子进程会冻结?(我找不到任何证据表明冻结的进程与正常执行并结束的进程有任何不同。)
  2. 我如何能够编写脚本来停止冻结几分钟的子进程,这样我就不必手动终止它?

答案1

我只能猜测您耗尽了某种资源(您可能也已经这样猜了)。也许是最大打开文件数,也许是 mysql 或 salesforce 那边的资源。具体我不清楚。

不过,有一种办法也许能解决这个问题:把 mf-sync 做成一个模块,再写一个起控制作用的 Node 脚本,用队列来运行这些 mf-sync 任务(类似于队列中受控的批次),而不是用一个包含大量 mf-sync 命令的 bash 脚本。可以试试类似这样的东西:https://github.com/learnboost/kue

你现在的做法看起来有点疯狂。但如果只是打开文件数达到了上限,也许可以通过提高该限制来解决。 http://www.cyberciti.biz/faq/linux-increase-the-maximum-number-of-open-files/

或者,如果您确实想在 shell 中完成这件事,也许可以使用 batch 命令来处理队列。 http://pubs.opengroup.org/onlinepubs/009695399/utilities/batch.html

或者,也许类似这样的方案会更好:http://pebblesinthesand.wordpress.com/2008/05/22/a-srcipt-for-running-processes-in-parallel-in-bash/

相关内容