From 6a9958232b8aae09507a0b4db07c7907d18a38a4 Mon Sep 17 00:00:00 2001 From: ttkopec Date: Mon, 6 May 2019 14:03:20 +0200 Subject: [PATCH 1/2] Integrate with monitoring service --- Dockerfile | 10 +- handler.js | 346 +++++++++++++++++++++++++------------------------- metadata.js | 39 ++++++ monitoring.js | 183 ++++++++++++++++++++++++++ package.json | 29 +++-- 5 files changed, 418 insertions(+), 189 deletions(-) create mode 100644 metadata.js create mode 100644 monitoring.js diff --git a/Dockerfile b/Dockerfile index 58f0d8d..efcc630 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,6 +1,6 @@ -FROM node:8 -WORKDIR /usr/src/app -COPY package*.json ./ -RUN npm install -COPY . . +FROM node:8-alpine +WORKDIR /usr/src/app +COPY package*.json ./ +RUN npm install +COPY . . CMD [ "npm", "start" ] \ No newline at end of file diff --git a/handler.js b/handler.js index a9b914c..0c9e8b5 100644 --- a/handler.js +++ b/handler.js @@ -1,172 +1,174 @@ -'use strict'; - -const childProcess = require("child_process"); -const fs = require("fs"); -const async = require("async"); -const aws = require("aws-sdk"); -const s3 = new aws.S3(); - -function handleRequest(request) { - - const executable = request.executable; - const args = request.args; - const bucket_name = request.options.bucket; - const prefix = request.options.prefix; - const inputs = request.inputs.map(input => input.name); - const outputs = request.outputs.map(output => output.name); - const files = inputs.slice(); - files.push(executable); - - console.log("Executable: " + executable); - console.log("Arguments: " + args); - console.log("Inputs: " + inputs); - console.log("Outputs: " + outputs); - console.log("Bucket: " + bucket_name); - console.log("Prefix: " + prefix); - console.log("Stdout: " + request.stdout); - - const t_start = Date.now(); - - async.waterfall([ - download, - execute, - upload - ], function (err) { - if (err) { - console.error("Error: " + err); - process.exit(1) - } else { - console.log("Success"); - const t_end = Date.now(); - const duration = t_end - t_start; - console.log("AWS Fargate exit: duration " + duration + " ms, executable: " + executable + " args: " + args); - } - }); - - function download(callback) { - async.each(files, function (file, callback) { - - console.log("Downloading " + bucket_name + "/" + prefix + "/" + file); - - const params = { - Bucket: bucket_name, - Key: prefix + "/" + file - }; - s3.getObject(params, function (err, data) { - if (err) { - console.log("Error downloading file " + JSON.stringify(params)); - process.exit(1) - } else { - const path = "/tmp/" + file; - fs.writeFile(path, data.Body, function (err) { - if (err) { - console.log("Unable to save file " + path); - process.exit(1) - } - console.log("Downloaded " + path); - console.log("Downloaded and saved file " + path); - callback(); - }); - } - }); - }, function (err) { - if (err) { - console.error("Failed to download file:" + err); - process.exit(1) - } else { - console.log("All files have been downloaded successfully"); - callback() - } - }); - } - - function execute(callback) { - const proc_name = /tmp/ + "/" + executable; - fs.chmodSync(proc_name, "777"); - - let proc; - console.log("Running executable" + proc_name); - - if (proc_name.endsWith(".js")) { - proc = childProcess.fork(proc_name, args, {cwd: "/tmp"}); - } else if (proc_name.endsWith(".jar")) { - let java_args = ['-jar', proc_name]; - const program_args = java_args.concat(args); - proc = childProcess.spawn('java', program_args, {cwd: "/tmp"}); - } else { - proc = 
childProcess.spawn(proc_name, args, {cwd: "/tmp"}); - - proc.stdout.on("data", function (exedata) { - console.log("Stdout: " + executable + exedata); - }); - - proc.stderr.on("data", function (exedata) { - console.log("Stderr: " + executable + exedata); - }); - } - - if (request.stdout) { - let stdoutStream = fs.createWriteStream("/tmp" + "/" + body.stdout, {flags: 'w'}); - proc.stdout.pipe(stdoutStream); - } - - proc.on("error", function (code) { - console.error("Error!!" + executable + JSON.stringify(code)); - }); - proc.on("exit", function () { - console.log("My exe exit " + executable); - }); - - proc.on("close", function () { - console.log("My exe close " + executable); - callback() - }); - } - - function upload(callback) { - async.each(outputs, function (file, callback) { - - console.log("Uploading " + bucket_name + "/" + prefix + "/" + file); - const path = "/tmp/" + file; - - fs.readFile(path, function (err, data) { - if (err) { - console.log("Error reading file " + path); - process.exit(1) - } - - const params = { - Bucket: bucket_name, - Key: prefix + "/" + file, - Body: data - }; - - s3.putObject(params, function (err) { - if (err) { - console.log("Error uploading file " + file); - process.exit(1) - } - console.log("Uploaded file " + file); - callback(); - }); - }); - - }, function (err) { - }, function (err) { - if (err) { - console.log("Error uploading file " + err); - process.exit(1) - } else { - console.log("All files have been uploaded successfully"); - callback() - } - }); - } -} - -if (!process.argv[2]) { - console.log("Received empty request, exiting..."); - process.exit(1); -} - -handleRequest(JSON.parse(process.argv[2])); +'use strict'; + +const childProcess = require("child_process"); +const fs = require("fs"); +const async = require("async"); +const aws = require("aws-sdk"); +const s3 = new aws.S3(); + +const monitoring = require('./monitoring'); + +function handleRequest(request) { + const start = Date.now(); + + const executable = request.executable; + const args = request.args; + const bucket_name = request.options.bucket; + const prefix = request.options.prefix; + const inputs = request.inputs.map(input => input.name); + const outputs = request.outputs.map(output => output.name); + const files = inputs.slice(); + files.push(executable); + + console.log("Executable: " + executable); + console.log("Arguments: " + args); + console.log("Inputs: " + inputs); + console.log("Outputs: " + outputs); + console.log("Bucket: " + bucket_name); + console.log("Prefix: " + prefix); + console.log("Stdout: " + request.stdout); + + async.waterfall([ + monitoring.init, + download, + execute, + upload, + ], function (err) { + if (err) { + console.error(`Error in waterfall: ${err}`); + process.exit(1); + } else { + monitoring.reportExecTime(start, Date.now(), function (err, duration) { + console.log(`AWS Fargate exit: duration ${duration} ms, executable: ${executable}, args: ${args}`); + process.exit(0); + }); + } + }); + + function download(callback) { + async.each(files, function (file, callback) { + + console.log("Downloading " + bucket_name + "/" + prefix + "/" + file); + + const params = { + Bucket: bucket_name, + Key: prefix + "/" + file + }; + + s3.getObject(params, function (err, data) { + if (err) { + console.log("Error downloading file " + JSON.stringify(params)); + process.exit(1) + } else { + const path = "/tmp/" + file; + fs.writeFile(path, data.Body, function (err) { + if (err) { + console.log("Unable to save file " + path); + process.exit(1) + } + 
console.log("Downloaded " + path); + console.log("Downloaded and saved file " + path); + callback(); + }); + } + }); + }, function (err) { + if (err) { + console.error("Failed to download file:" + err); + process.exit(1) + } else { + console.log("All files have been downloaded successfully"); + callback() + } + }); + } + + function execute(callback) { + const proc_name = /tmp/ + "/" + executable; + fs.chmodSync(proc_name, "777"); + + let proc; + console.log("Running executable" + proc_name); + + if (proc_name.endsWith(".js")) { + proc = childProcess.fork(proc_name, args, {cwd: "/tmp"}); + } else if (proc_name.endsWith(".jar")) { + let java_args = ['-jar', proc_name]; + const program_args = java_args.concat(args); + proc = childProcess.spawn('java', program_args, {cwd: "/tmp"}); + } else { + proc = childProcess.spawn(proc_name, args, {cwd: "/tmp"}); + + proc.stdout.on("data", function (exedata) { + console.log("Stdout: " + executable + exedata); + }); + + proc.stderr.on("data", function (exedata) { + console.log("Stderr: " + executable + exedata); + }); + } + + if (request.stdout) { + let stdoutStream = fs.createWriteStream("/tmp" + "/" + body.stdout, {flags: 'w'}); + proc.stdout.pipe(stdoutStream); + } + + proc.on("error", function (code) { + console.error("Error!!" + executable + JSON.stringify(code)); + }); + proc.on("exit", function () { + console.log("My exe exit " + executable); + }); + + proc.on("close", function () { + console.log("My exe close " + executable); + callback() + }); + } + + function upload(callback) { + async.each(outputs, function (file, callback) { + + console.log("Uploading " + bucket_name + "/" + prefix + "/" + file); + const path = "/tmp/" + file; + + fs.readFile(path, function (err, data) { + if (err) { + console.log("Error reading file " + path); + process.exit(1) + } + + const params = { + Bucket: bucket_name, + Key: prefix + "/" + file, + Body: data + }; + + s3.putObject(params, function (err) { + if (err) { + console.log("Error uploading file " + file); + process.exit(1) + } + console.log("Uploaded file " + file); + callback(); + }); + }); + + }, function (err) { + if (err) { + console.log("Error uploading file " + err); + process.exit(1) + } else { + console.log("All files have been uploaded successfully"); + callback() + } + }); + } +} + +if (!process.argv[2]) { + console.log("Received empty request, exiting..."); + process.exit(1); +} + +handleRequest(JSON.parse(process.argv[2])); diff --git a/metadata.js b/metadata.js new file mode 100644 index 0000000..7c12075 --- /dev/null +++ b/metadata.js @@ -0,0 +1,39 @@ +'use strict'; + +const http = require('http'); + +const METADATA_URL = process.env.METADATA_URL || 'http://169.254.170.2/v2/metadata'; + +let _metadata = null; + +function fetch(callback) { + if (_metadata) { + return callback(null, _metadata) + } + + http.get(METADATA_URL, res => { + let body = ''; + + res.on('data', chunk => body += chunk); + + res.on('end', () => { + console.log(`Received metadata ${body} from ${METADATA_URL}`); + + try { + _metadata = JSON.parse(body) + } catch (e) { + console.warn(`Error while parsing metadata: ${e.message}`); + _metadata = null; + return callback(e); + } + + callback(null, _metadata); + }) + }).on('error', function (e) { + console.warn(`Error while requesting metadata from ${METADATA_URL}: ${e.message}`); + + callback(e); + }); +} + +exports.fetch = fetch; diff --git a/monitoring.js b/monitoring.js new file mode 100644 index 0000000..60151cd --- /dev/null +++ b/monitoring.js @@ -0,0 +1,183 @@ +'use strict'; + 
+const INTERFACE = process.env.INTERFACE || 'eth0'; +const DISK_OPTS = { + device: process.env.DISK_DEVICE || 'xvda1', + units: 'KiB', +}; +const TASK_ID = process.env.TASK_ID || 'undefinedTaskId'; +const START = process.env.START || 0; +const PUSH_GW_URL = process.env.PUSH_GW_URL || 'http://localhost:9091'; +const LABELS = process.env.LABELS || ''; +const COLLECT_INTERVAL = 1000; + +const si = require('systeminformation'); +const os_utils = require('os-utils'); +const diskStat = require('disk-stat'); +const async = require('async'); +const prometheus = require('prom-client'); +const gateway = new prometheus.Pushgateway(PUSH_GW_URL); + +const fetchMetaData = require('./metadata').fetch; + +// splits labels in form of a string: 'key1=val1,key2=val2', to object: {key1: val1, key2: val2} +function parseLabels(labelsString) { + return labelsString ? labelsString.split(',') + .map(s => s.split('=')) + .reduce((acc, curr) => { + acc[curr[0]] = curr[1]; + return acc; + }, {}) : + {}; +} + +const labels = { + containerID: null, + taskID: TASK_ID, + ...parseLabels(LABELS) +}; + +const labelNames = Object.keys(labels); + +const prometheusMetrics = { + hyperflow_cpu_usage: new prometheus.Gauge({ + name: 'hyperflow_cpu_usage', + help: 'CPU usage', + labelNames: labelNames + }), + hyperflow_memory_usage: new prometheus.Gauge({ + name: 'hyperflow_memory_usage', + help: 'Memory usage', + labelNames: labelNames + }), + hyperflow_connection_received: new prometheus.Gauge({ + name: 'hyperflow_connection_received', + help: 'Received bytes per second', + labelNames: labelNames + }), + hyperflow_connection_transferred: new prometheus.Gauge({ + name: 'hyperflow_connection_transferred', + help: 'Transferred bytes per second', + labelNames: labelNames + }), + hyperflow_disc_read: new prometheus.Gauge({ + name: 'hyperflow_disc_read', + help: 'Read kB per second', + labelNames: labelNames + }), + hyperflow_disc_write: new prometheus.Gauge({ + name: 'hyperflow_disc_write', + help: 'Write kB per second', + labelNames: labelNames + }), + hyperflow_task_execution_time: new prometheus.Gauge({ + name: 'hyperflow_task_execution_time', + help: 'Task execution time in seconds', + labelNames: labelNames + }), + hyperflow_task_execution_time_buckets: new prometheus.Histogram({ + name: 'hyperflow_task_execution_time_buckets', + help: 'Task execution time in seconds', + labelNames: labelNames + }), + hyperflow_task_start_time: new prometheus.Gauge({ + name: 'hyperflow_task_start_time', + help: 'Task start timestamp', + labelNames: labelNames + }), + hyperflow_task_end_time: new prometheus.Gauge({ + name: 'hyperflow_task_end_time', + help: 'Task end timestamp', + labelNames: labelNames + }) +}; + +prometheus.collectDefaultMetrics(); + +function collectUsage(callback) { + async.waterfall([ + function (callback) { + os_utils.cpuUsage(value => { + prometheusMetrics.hyperflow_cpu_usage.set(labels, value); + callback(null); + }); + }, + function (callback) { + si.mem(data => { + prometheusMetrics.hyperflow_memory_usage.set(labels, data.used / 1024); + callback(null); + }); + }, + function (callback) { + si.networkStats(INTERFACE, data => { + prometheusMetrics.hyperflow_connection_received.set(labels, data.rx_sec || 0); + prometheusMetrics.hyperflow_connection_transferred.set(labels, data.tx_sec || 0); + callback(null); + }); + }, + function (callback) { + diskStat.usageRead(DISK_OPTS, value => { + prometheusMetrics.hyperflow_disc_read.set(labels, value || 0); + callback(null); + }); + }, + function (callback) { + 
diskStat.usageWrite(DISK_OPTS, value => { + prometheusMetrics.hyperflow_disc_write.set(labels, value || 0); + callback(null); + }); + }, + function (callback) { + gateway.pushAdd({jobName: 'hyperflow-service'}, function (err, resp, body) { + if (err) { + return callback(err); + } + console.log('Successfully pushed metrics to gateway'); + callback(null); + }); + } + ], function (err) { + if (err) { + console.warn(`Error while pushing metrics to gateway: ${err.message}`); + } + + // ignore metrics error, just log them + if (callback) { + callback(null); + } + }); +} + +function reportExecTime(start, end, callback) { + const duration = end - start; + prometheusMetrics.hyperflow_task_start_time.set(labels, start); + prometheusMetrics.hyperflow_task_end_time.set(labels, end); + prometheusMetrics.hyperflow_task_execution_time.set(labels, duration); + prometheusMetrics.hyperflow_task_execution_time_buckets.observe(labels, duration); + + collectUsage(err => { + if (err) { + console.warn(err.message); + } + + callback(null, duration); + }); +} + +function _init(callback) { + fetchMetaData((err, metadata) => { + labels.containerID = err ? 'undefinedContainerId' : metadata.Containers[0].DockerId; + + setInterval(collectUsage, COLLECT_INTERVAL); + + callback(null); + }); + +} + +if (START) { + _init(() => console.log('Initialized monitoring service')); +} else { + exports.init = _init; + exports.reportExecTime = reportExecTime; +} \ No newline at end of file diff --git a/package.json b/package.json index b11ebd2..3931bad 100644 --- a/package.json +++ b/package.json @@ -1,12 +1,17 @@ -{ - "name": "aws-fargate-executor", - "version": "1.0.0", - "main": "handler.js", - "scripts": { - "start": "node handler.js" - }, - "dependencies": { - "async": "^2.6.2", - "aws-sdk": "^2.429.0" - } -} +{ + "name": "aws-fargate-executor", + "version": "1.0.0", + "main": "handler.js", + "scripts": { + "start": "node handler.js", + "dev": "node monitoring.js" + }, + "dependencies": { + "async": "^2.6.2", + "aws-sdk": "^2.429.0", + "disk-stat": "^1.0.4", + "os-utils": "0.0.14", + "prom-client": "^11.3.0", + "systeminformation": "^4.1.4" + } +} From cdecc345d6672a9e759c520b1fbbcea22c964cad Mon Sep 17 00:00:00 2001 From: ttkopec Date: Mon, 16 Sep 2019 21:12:25 +0200 Subject: [PATCH 2/2] Update --- handler.js | 161 +++++++++++++++++++++-------------- metadata.js | 28 +------ monitoring.js | 227 +++++++++++++++++++++----------------------------- package.json | 2 +- 4 files changed, 197 insertions(+), 221 deletions(-) diff --git a/handler.js b/handler.js index 8364713..e21b5bb 100644 --- a/handler.js +++ b/handler.js @@ -1,14 +1,32 @@ 'use strict'; -const childProcess = require("child_process"); -const fs = require("fs"); -const async = require("async"); -const aws = require("aws-sdk"); +const childProcess = require('child_process'); +const fs = require('fs'); +const async = require('async'); +const aws = require('aws-sdk'); const s3 = new aws.S3(); -const monitoring = require('./monitoring'); +const init = require('./monitoring').init; function handleRequest(request) { + if (request.s3) { + const params = { + Bucket: request.options.bucket, + Key: `${request.options.prefix}/${request.s3}` + }; + s3.getObject(params, (err, data) => { + if (err) { + console.error(`Error downloading job message file from S3 - bucket: ${params.Bucket}, key: ${params.Key}`); + process.exit(1); + } + handle(JSON.parse(data.Body.toString())); + }) + } else { + handle(request); + } +} + +function handle(request) { const start = Date.now(); const 
executable = request.executable; @@ -20,154 +38,175 @@ function handleRequest(request) { const files = inputs.slice(); files.push(executable); - console.log("Executable: " + executable); - console.log("Arguments: " + args); - console.log("Inputs: " + inputs); - console.log("Outputs: " + outputs); - console.log("Bucket: " + bucket_name); - console.log("Prefix: " + prefix); - console.log("Stdout: " + request.stdout); + console.log(`Executable: ${executable}`); + console.log(`Arguments: ${args}`); + console.log(`Inputs: ${inputs}`); + console.log(`Outputs: ${outputs}`); + console.log(`Bucket: ${bucket_name}`); + console.log(`Prefix: ${prefix}`); + console.log(`Stdout: ${request.stdout}`); + console.log(`PATH: ${process.env.PATH}`); async.waterfall([ - monitoring.init, + init, download, execute, upload, - ], function (err) { + ], function (err, reporter, previousData) { if (err) { console.error(`Error in waterfall: ${err}`); process.exit(1); } else { - monitoring.reportExecTime(start, Date.now(), function (err, duration) { - console.log(`AWS Fargate exit: duration ${duration} ms, executable: ${executable}, args: ${args}`); + const end = Date.now(); + + reporter.report({start: start, end: end, ...previousData}, function (err) { + if (err) { + console.error(`Error on reporting exec time: ${err}`); + } + + console.log(`AWS Fargate exit: duration ${end - start} ms, executable: ${executable}, args: ${args}`); process.exit(0); }); } }); - function download(callback) { + function download(reporter, callback) { + const downloadStart = Date.now(); + async.each(files, function (file, callback) { - console.log("Downloading " + bucket_name + "/" + prefix + "/" + file); + console.log(`Downloading ${bucket_name}/${prefix}/${file}`); const params = { Bucket: bucket_name, - Key: prefix + "/" + file + Key: `${prefix}/${file}` }; s3.getObject(params, function (err, data) { if (err) { - console.log("Error downloading file " + JSON.stringify(params)); - process.exit(1) + console.log(`Error downloading file ${JSON.stringify(params)}`); + process.exit(1); } else { const path = "/tmp/" + file; + fs.writeFile(path, data.Body, function (err) { if (err) { - console.log("Unable to save file " + path); + console.log(`Unable to save file ${path}`); process.exit(1) } - console.log("Downloaded " + path); - console.log("Downloaded and saved file " + path); - callback(); + + console.log(`Downloaded and saved file ${path}`); + + fs.chmod(path, '777', callback); }); } }); }, function (err) { if (err) { - console.error("Failed to download file:" + err); + console.error(`Failed to download file: ${err}`); process.exit(1) } else { - console.log("All files have been downloaded successfully"); - callback() + console.log(`All files have been downloaded successfully`); + + callback(null, reporter, { + download_start: downloadStart, + download_end: Date.now() + }); } }); } - function execute(callback) { - const proc_name = /tmp/ + "/" + executable; - fs.chmodSync(proc_name, "777"); + function execute(reporter, previousData, callback) { + const executeStart = Date.now(); + + const proc_name = `/tmp/${executable}`; let proc; console.log("Running executable" + proc_name); - if (proc_name.endsWith(".js")) { - proc = childProcess.fork(proc_name, args, {cwd: "/tmp"}); - } else if (proc_name.endsWith(".jar")) { + if (proc_name.endsWith('.js')) { + proc = childProcess.fork(proc_name, args, {cwd: '/tmp'}); + } else if (proc_name.endsWith('.jar')) { let java_args = ['-jar', proc_name]; const program_args = java_args.concat(args); - proc = 
childProcess.spawn('java', program_args, {cwd: "/tmp"}); + proc = childProcess.spawn('java', program_args, {cwd: '/tmp'}); } else { - proc = childProcess.spawn(proc_name, args, {cwd: "/tmp"}); + proc = childProcess.spawn(proc_name, args, {cwd: '/tmp', env: {PATH: process.env.PATH}}); - proc.stdout.on("data", function (exedata) { - console.log("Stdout: " + executable + exedata); + proc.stdout.on('data', function (exedata) { + console.log(`Stdout: ${executable} / ${exedata}`); }); - proc.stderr.on("data", function (exedata) { - console.log("Stderr: " + executable + exedata); + proc.stderr.on('data', function (exedata) { + console.log(`Stderr: ${executable} / ${exedata}`); }); } if (request.stdout) { - let stdoutStream = fs.createWriteStream("/tmp" + "/" + request.stdout, {flags: 'w'}); + let stdoutStream = fs.createWriteStream(`/tmp/${request.stdout}`, {flags: 'w'}); proc.stdout.pipe(stdoutStream); } - proc.on("error", function (code) { - console.error("Error!!" + executable + JSON.stringify(code)); + proc.on('error', function (code) { + console.error(`Error: ${JSON.stringify(code)}`); + callback(code); }); - proc.on("exit", function () { - console.log("My exe exit " + executable); + proc.on('exit', function () { + console.log(`My exe exit ${executable}`); }); - proc.on("close", function () { - console.log("My exe close " + executable); - callback() + proc.on('close', function () { + console.log(`My exe close ${executable}`); + + callback(null, reporter, {execute_start: executeStart, execute_end: Date.now(), ...previousData}); }); } - function upload(callback) { + function upload(reporter, previousData, callback) { + const uploadStart = Date.now(); + async.each(outputs, function (file, callback) { - console.log("Uploading " + bucket_name + "/" + prefix + "/" + file); - const path = "/tmp/" + file; + console.log(`Uploading ${bucket_name}/${prefix}/${file}`); + const path = `/tmp/${file}`; fs.readFile(path, function (err, data) { if (err) { - console.log("Error reading file " + path); - process.exit(1) + console.log(`Error reading file ${path}, err: ${err}`); + process.exit(1); } const params = { Bucket: bucket_name, - Key: prefix + "/" + file, + Key: `${prefix}/${file}`, Body: data }; s3.putObject(params, function (err) { if (err) { - console.log("Error uploading file " + file); + console.log(`Error uploading file ${file}`); process.exit(1) } - console.log("Uploaded file " + file); + console.log(`Uploaded file ${file}`); callback(); }); }); }, function (err) { if (err) { - console.log("Error uploading file " + err); - process.exit(1) + console.log(`Error uploading file ${err}`); + process.exit(1); } else { - console.log("All files have been uploaded successfully"); - callback() + console.log('All files have been uploaded successfully'); + + callback(null, reporter, {upload_start: uploadStart, upload_end: Date.now(), ...previousData}) } }); } } if (!process.argv[2]) { - console.log("Received empty request, exiting..."); + console.log('Received empty request, exiting...'); process.exit(1); } diff --git a/metadata.js b/metadata.js index 7c12075..8c54b50 100644 --- a/metadata.js +++ b/metadata.js @@ -1,8 +1,6 @@ 'use strict'; -const http = require('http'); - -const METADATA_URL = process.env.METADATA_URL || 'http://169.254.170.2/v2/metadata'; +const METADATA_URL = process.env.ECS_CONTAINER_METADATA_URI; let _metadata = null; @@ -11,29 +9,9 @@ function fetch(callback) { return callback(null, _metadata) } - http.get(METADATA_URL, res => { - let body = ''; - - res.on('data', chunk => body += chunk); 
- - res.on('end', () => { - console.log(`Received metadata ${body} from ${METADATA_URL}`); - - try { - _metadata = JSON.parse(body) - } catch (e) { - console.warn(`Error while parsing metadata: ${e.message}`); - _metadata = null; - return callback(e); - } - - callback(null, _metadata); - }) - }).on('error', function (e) { - console.warn(`Error while requesting metadata from ${METADATA_URL}: ${e.message}`); + _metadata = METADATA_URL.substring(METADATA_URL.lastIndexOf('/'), METADATA_URL.length); - callback(e); - }); + return callback(null, _metadata); } exports.fetch = fetch; diff --git a/monitoring.js b/monitoring.js index 60151cd..5974aa7 100644 --- a/monitoring.js +++ b/monitoring.js @@ -1,176 +1,136 @@ 'use strict'; -const INTERFACE = process.env.INTERFACE || 'eth0'; +const INTERFACE = process.env.INTERFACE || null; const DISK_OPTS = { device: process.env.DISK_DEVICE || 'xvda1', units: 'KiB', }; const TASK_ID = process.env.TASK_ID || 'undefinedTaskId'; const START = process.env.START || 0; -const PUSH_GW_URL = process.env.PUSH_GW_URL || 'http://localhost:9091'; const LABELS = process.env.LABELS || ''; +const INFLUXDB_HOST = process.env.INFLUXDB_HOST || 'influxdb'; +const INFLUXDB_DB_NAME = process.env.INFLUXDB_NAME || 'hyperflow-database'; const COLLECT_INTERVAL = 1000; const si = require('systeminformation'); const os_utils = require('os-utils'); const diskStat = require('disk-stat'); const async = require('async'); -const prometheus = require('prom-client'); -const gateway = new prometheus.Pushgateway(PUSH_GW_URL); +const Influx = require('influx'); const fetchMetaData = require('./metadata').fetch; +const MetricDispatcher = function (err, metadata) { + this.tags = { + containerID: err ? 'undefinedContainerId' : metadata, + taskID: TASK_ID, + ...this._parseLabels(LABELS) + }; + + this._collectUsage = this._collectUsage.bind(this); + + // initialize influx, create database and start collecting usage + this._initInflux(INFLUXDB_HOST, INFLUXDB_DB_NAME, Object.keys(this.tags)) + .then(() => setInterval(this._collectUsage, COLLECT_INTERVAL)); +}; + // splits labels in form of a string: 'key1=val1,key2=val2', to object: {key1: val1, key2: val2} -function parseLabels(labelsString) { +MetricDispatcher.prototype._parseLabels = function (labelsString) { return labelsString ? 
labelsString.split(',') - .map(s => s.split('=')) - .reduce((acc, curr) => { - acc[curr[0]] = curr[1]; - return acc; - }, {}) : - {}; -} - -const labels = { - containerID: null, - taskID: TASK_ID, - ...parseLabels(LABELS) + .map(s => s.split('=')) + .reduce((acc, curr) => { + acc[curr[0]] = curr[1]; + return acc; + }, {}) : + {}; }; -const labelNames = Object.keys(labels); - -const prometheusMetrics = { - hyperflow_cpu_usage: new prometheus.Gauge({ - name: 'hyperflow_cpu_usage', - help: 'CPU usage', - labelNames: labelNames - }), - hyperflow_memory_usage: new prometheus.Gauge({ - name: 'hyperflow_memory_usage', - help: 'Memory usage', - labelNames: labelNames - }), - hyperflow_connection_received: new prometheus.Gauge({ - name: 'hyperflow_connection_received', - help: 'Received bytes per second', - labelNames: labelNames - }), - hyperflow_connection_transferred: new prometheus.Gauge({ - name: 'hyperflow_connection_transferred', - help: 'Transferred bytes per second', - labelNames: labelNames - }), - hyperflow_disc_read: new prometheus.Gauge({ - name: 'hyperflow_disc_read', - help: 'Read kB per second', - labelNames: labelNames - }), - hyperflow_disc_write: new prometheus.Gauge({ - name: 'hyperflow_disc_write', - help: 'Write kB per second', - labelNames: labelNames - }), - hyperflow_task_execution_time: new prometheus.Gauge({ - name: 'hyperflow_task_execution_time', - help: 'Task execution time in seconds', - labelNames: labelNames - }), - hyperflow_task_execution_time_buckets: new prometheus.Histogram({ - name: 'hyperflow_task_execution_time_buckets', - help: 'Task execution time in seconds', - labelNames: labelNames - }), - hyperflow_task_start_time: new prometheus.Gauge({ - name: 'hyperflow_task_start_time', - help: 'Task start timestamp', - labelNames: labelNames - }), - hyperflow_task_end_time: new prometheus.Gauge({ - name: 'hyperflow_task_end_time', - help: 'Task end timestamp', - labelNames: labelNames - }) +MetricDispatcher.prototype._initInflux = function (url, dbName, tags) { + this.influx = new Influx.InfluxDB({ + host: url, + database: dbName, + schema: [ + { + measurement: 'performance', + fields: { + cpu_usage: Influx.FieldType.FLOAT, + mem_usage: Influx.FieldType.INTEGER, + conn_recv: Influx.FieldType.INTEGER, + conn_transferred: Influx.FieldType.INTEGER, + disk_read: Influx.FieldType.INTEGER, + disk_write: Influx.FieldType.INTEGER + }, + tags: tags + }, + { + measurement: 'hflow_task', + fields: { + start: Influx.FieldType.FLOAT, + end: Influx.FieldType.FLOAT, + download_start: Influx.FieldType.FLOAT, + download_end: Influx.FieldType.FLOAT, + execute_start: Influx.FieldType.FLOAT, + execute_end: Influx.FieldType.FLOAT, + upload_start: Influx.FieldType.FLOAT, + upload_end: Influx.FieldType.FLOAT + }, + tags: tags + }, + ] + }); + + return this.influx.createDatabase(dbName) }; -prometheus.collectDefaultMetrics(); +MetricDispatcher.prototype._write = function (measurement, fields, callback) { + this.influx.writeMeasurement(measurement, [{ + tags: this.tags, + fields: fields + }]).then(callback).catch(callback); +}; -function collectUsage(callback) { +MetricDispatcher.prototype._collectUsage = function () { async.waterfall([ - function (callback) { - os_utils.cpuUsage(value => { - prometheusMetrics.hyperflow_cpu_usage.set(labels, value); - callback(null); - }); + callback => { + os_utils.cpuUsage(value => this._write('hflow_performance', {cpu_usage: value}, callback)); }, - function (callback) { - si.mem(data => { - prometheusMetrics.hyperflow_memory_usage.set(labels, 
data.used / 1024); - callback(null); - }); + callback => { + si.mem(data => this._write('hflow_performance', {mem_usage: data.used / 1024}, callback)); }, - function (callback) { - si.networkStats(INTERFACE, data => { - prometheusMetrics.hyperflow_connection_received.set(labels, data.rx_sec || 0); - prometheusMetrics.hyperflow_connection_transferred.set(labels, data.tx_sec || 0); - callback(null); - }); + callback => { + si.networkStats(INTERFACE, data => this._write('hflow_performance', { + conn_recv: data[0].rx_sec, + conn_transferred: data[0].tx_sec + }, callback)); }, - function (callback) { - diskStat.usageRead(DISK_OPTS, value => { - prometheusMetrics.hyperflow_disc_read.set(labels, value || 0); - callback(null); - }); + callback => { + diskStat.usageRead(DISK_OPTS, value => this._write('hflow_performance', {disk_read: value}, callback)); }, - function (callback) { - diskStat.usageWrite(DISK_OPTS, value => { - prometheusMetrics.hyperflow_disc_write.set(labels, value || 0); - callback(null); - }); - }, - function (callback) { - gateway.pushAdd({jobName: 'hyperflow-service'}, function (err, resp, body) { - if (err) { - return callback(err); - } - console.log('Successfully pushed metrics to gateway'); - callback(null); - }); + callback => { + diskStat.usageWrite(DISK_OPTS, value => this._write('hflow_performance', {disk_write: value}, callback)); } + ], function (err) { if (err) { - console.warn(`Error while pushing metrics to gateway: ${err.message}`); - } - - // ignore metrics error, just log them - if (callback) { - callback(null); + console.warn(`Error while pushing metrics to tsdb: ${err.message}`); + } else { + console.log('Successfully pushed metrics to tsdb'); } }); -} - -function reportExecTime(start, end, callback) { - const duration = end - start; - prometheusMetrics.hyperflow_task_start_time.set(labels, start); - prometheusMetrics.hyperflow_task_end_time.set(labels, end); - prometheusMetrics.hyperflow_task_execution_time.set(labels, duration); - prometheusMetrics.hyperflow_task_execution_time_buckets.observe(labels, duration); +}; - collectUsage(err => { - if (err) { - console.warn(err.message); - } +MetricDispatcher.prototype.reportExecTime = function (start, end, callback) { + this._write('hflow_task', {start: start, end: end}, err => callback(err, end - start)); +}; - callback(null, duration); - }); -} +MetricDispatcher.prototype.report = function (fields, callback) { + this._write('hflow_task', fields, callback); +}; function _init(callback) { fetchMetaData((err, metadata) => { - labels.containerID = err ? 'undefinedContainerId' : metadata.Containers[0].DockerId; - - setInterval(collectUsage, COLLECT_INTERVAL); - - callback(null); + const reporter = new MetricDispatcher(err, metadata); + callback(null, reporter); }); } @@ -179,5 +139,4 @@ if (START) { _init(() => console.log('Initialized monitoring service')); } else { exports.init = _init; - exports.reportExecTime = reportExecTime; } \ No newline at end of file diff --git a/package.json b/package.json index 3931bad..2322300 100644 --- a/package.json +++ b/package.json @@ -10,8 +10,8 @@ "async": "^2.6.2", "aws-sdk": "^2.429.0", "disk-stat": "^1.0.4", + "influx": "^5.0.7", "os-utils": "0.0.14", - "prom-client": "^11.3.0", "systeminformation": "^4.1.4" } }
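
Usage note: handler.js takes the job description as a JSON string in argv[2]; as of PATCH 2/2 the payload may instead carry an "s3" field naming a job file that is fetched from the same bucket/prefix and parsed in its place. A minimal local smoke test, assuming AWS credentials are present in the environment and with every bucket, prefix, and file name purely illustrative:

    node handler.js '{
      "executable": "transform.js",
      "args": ["--scale", "2"],
      "stdout": "transform.out",
      "inputs": [{"name": "input.dat"}],
      "outputs": [{"name": "output.dat"}],
      "options": {"bucket": "my-workflow-bucket", "prefix": "run-001"}
    }'

The executor stages the executable and every input from <bucket>/<prefix>/ into /tmp, runs the executable (fork for .js, java -jar for .jar, spawn otherwise), optionally redirects its stdout to the file named by "stdout", and uploads each declared output back under the same prefix.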
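
Monitoring is configured entirely through the environment; after PATCH 2/2 the relevant variables (see the tops of monitoring.js and metadata.js) are INFLUXDB_HOST, INFLUXDB_NAME, TASK_ID, LABELS, INTERFACE, DISK_DEVICE, START, and ECS_CONTAINER_METADATA_URI (the last is injected by ECS/Fargate and is used only to derive the containerID tag). A sketch of a task-definition environment, values again illustrative:

    INFLUXDB_HOST=influxdb                    # InfluxDB host; database name defaults to hyperflow-database
    INFLUXDB_NAME=hyperflow-database
    TASK_ID=montage-mproject-0001             # becomes the taskID tag on every point
    LABELS=workflow=montage,stage=mproject    # extra tags as comma-separated key=value pairs
    DISK_DEVICE=xvda1                         # block device polled by disk-stat
    # leave START unset in the executor; any non-empty value runs monitoring.js standalone instead

With this in place, resource samples are pushed to the hflow_performance measurement once per second (COLLECT_INTERVAL), and the per-job timestamps reported from handler.js (start/end plus download_, execute_ and upload_ start/end) go to hflow_task, each point tagged with containerID, taskID, and the parsed LABELS.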