Skip to content

Commit 79fe510

Browse files
committed
feat: support APM Server intake API version 2 (#465)
Closes #356
1 parent 8de2bb7 commit 79fe510

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

62 files changed

+1365
-2135
lines changed

docs/agent-api.asciidoc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1090,7 +1090,7 @@ Defaults to `unnamed`
10901090
You can alternatively set this via <<span-type,`span.type`>>.
10911091
Defaults to `custom.code`
10921092

1093-
When a span is started it will measure the time until <<span-end,`span.end()`>> or <<span-truncate,`span.truncate()`>> is called.
1093+
When a span is started it will measure the time until <<span-end,`span.end()`>> is called.
10941094

10951095
See <<span-api,Span API>> docs for details on how to use custom spans.
10961096

docs/span-api.asciidoc

Lines changed: 1 addition & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ Defaults to `unnamed`
7373
You can alternatively set this via <<span-type,`span.type`>>.
7474
Defaults to `custom.code`
7575

76-
When a span is started it will measure the time until <<span-end,`span.end()`>> or <<span-truncate,`span.truncate()`>> is called.
76+
When a span is started it will measure the time until <<span-end,`span.end()`>> is called.
7777

7878
[[span-end]]
7979
==== `span.end()`
@@ -86,20 +86,3 @@ span.end()
8686
End the span.
8787
If the span has already ended,
8888
nothing happens.
89-
90-
A span that isn't ended before the parent transaction ends will be <<span-truncate,truncated>>.
91-
92-
[[span-truncate]]
93-
==== `span.truncate()`
94-
95-
[source,js]
96-
----
97-
span.truncate()
98-
----
99-
100-
Truncates and ends the span.
101-
If the span is already ended or truncated,
102-
nothing happens.
103-
104-
A truncated span is a special type of ended span.
105-
It's used to indicate that the measured event took longer than the duration recorded by the span.

docs/transaction-api.asciidoc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ Think of the transaction result as equivalent to the status code of an HTTP resp
5757
transaction.end([result])
5858
----
5959

60-
Ends the transaction and <<span-truncate,truncates>> all un-ended child spans.
60+
Ends the transaction.
6161
If the transaction has already ended,
6262
nothing happens.
6363

lib/agent.js

Lines changed: 56 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,9 @@ var connect = require('./middleware/connect')
1515
var Filters = require('./filters')
1616
var Instrumentation = require('./instrumentation')
1717
var parsers = require('./parsers')
18-
var request = require('./request')
1918
var stackman = require('./stackman')
2019
var symbols = require('./symbols')
20+
var truncate = require('./truncate')
2121

2222
var IncomingMessage = http.IncomingMessage
2323
var ServerResponse = http.ServerResponse
@@ -32,6 +32,7 @@ function Agent () {
3232

3333
this._instrumentation = new Instrumentation(this)
3434
this._filters = new Filters()
35+
this._apmServer = null
3536

3637
this._conf = null
3738
this._httpClient = null
@@ -52,6 +53,10 @@ Object.defineProperty(Agent.prototype, 'currentTransaction', {
5253
}
5354
})
5455

56+
Agent.prototype.destroy = function () {
57+
if (this._apmServer) this._apmServer.destroy()
58+
}
59+
5560
Agent.prototype.startTransaction = function () {
5661
return this._instrumentation.startTransaction.apply(this._instrumentation, arguments)
5762
}
@@ -129,15 +134,35 @@ Agent.prototype.start = function (opts) {
129134
})
130135
}
131136

132-
this._instrumentation.start()
137+
this._apmServer = new ElasticAPMHttpClient({
138+
// metadata
139+
agentName: 'nodejs',
140+
agentVersion: version,
141+
serviceName: this._conf.serviceName,
142+
serviceVersion: this._conf.serviceVersion,
143+
frameworkName: this._conf.frameworkName,
144+
frameworkVersion: this._conf.frameworkVersion,
145+
hostname: this._conf.hostname,
133146

134-
this._httpClient = new ElasticAPMHttpClient({
147+
// Sanitize conf
148+
truncateStringsAt: config.INTAKE_STRING_MAX_SIZE, // TODO: Do we need to set this (it's the default value)
149+
150+
// HTTP conf
135151
secretToken: this._conf.secretToken,
136152
userAgent: userAgent,
137153
serverUrl: this._conf.serverUrl,
138154
rejectUnauthorized: this._conf.verifyServerCert,
139-
serverTimeout: this._conf.serverTimeout * 1000
155+
serverTimeout: this._conf.serverTimeout * 1000,
156+
157+
// Streaming conf
158+
size: this._conf.apiRequestSize,
159+
time: this._conf.apiRequestTime * 1000
140160
})
161+
this._apmServer.on('error', err => {
162+
this.logger.error('An error occrued while communicating with the APM Server:', err.message)
163+
})
164+
165+
this._instrumentation.start()
141166

142167
Error.stackTraceLimit = this._conf.stackTraceLimit
143168
if (this._conf.captureExceptions) this.handleUncaughtExceptions()
@@ -285,10 +310,27 @@ Agent.prototype.captureError = function (err, opts, cb) {
285310
}
286311

287312
function send (error) {
288-
agent.logger.info('logging error %s with Elastic APM', id)
289-
request.errors(agent, [error], (err) => {
290-
if (cb) cb(err, error.id)
291-
})
313+
error = agent._filters.process(error) // TODO: Update filter to expect this format
314+
315+
if (!error) {
316+
agent.logger.debug('error ignored by filter %o', { id: id })
317+
cb(null, id)
318+
return
319+
}
320+
321+
truncate.error(error, agent._conf)
322+
323+
if (agent._apmServer) {
324+
agent.logger.info(`Sending error ${id} to Elastic APM`)
325+
agent._apmServer.sendError(error, function () {
326+
agent._apmServer.flush(function (err) {
327+
cb(err, id)
328+
})
329+
})
330+
} else {
331+
// TODO: Swallow this error just as it's done in agent.flush()?
332+
process.nextTick(cb.bind(null, new Error('cannot capture error before agent is started'), id))
333+
}
292334
}
293335
}
294336

@@ -314,7 +356,12 @@ Agent.prototype.handleUncaughtExceptions = function (cb) {
314356
}
315357

316358
Agent.prototype.flush = function (cb) {
317-
this._instrumentation.flush(cb)
359+
if (this._apmServer) {
360+
this._apmServer.flush(cb)
361+
} else {
362+
this.logger.warn(new Error('cannot flush agent before it is started'))
363+
process.nextTick(cb)
364+
}
318365
}
319366

320367
Agent.prototype.lambda = function wrapLambda (type, fn) {

lib/config.js

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
'use strict'
22

33
var fs = require('fs')
4-
var os = require('os')
54
var path = require('path')
65

76
var consoleLogLevel = require('console-log-level')
@@ -35,7 +34,8 @@ var DEFAULTS = {
3534
verifyServerCert: true,
3635
active: true,
3736
logLevel: 'info',
38-
hostname: os.hostname(),
37+
apiRequestSize: 1024 * 1024, // TODO: Is this the right default
38+
apiRequestTime: 10, // TODO: Is this the right default. Should this be ms?
3939
stackTraceLimit: 50,
4040
captureExceptions: true,
4141
filterHttpHeaders: true,
@@ -51,10 +51,10 @@ var DEFAULTS = {
5151
sourceLinesSpanAppFrames: 0,
5252
sourceLinesSpanLibraryFrames: 0,
5353
errorMessageMaxLength: 2048,
54-
flushInterval: 10,
54+
flushInterval: 10, // TODO: Deprecate
5555
transactionMaxSpans: 500,
5656
transactionSampleRate: 1.0,
57-
maxQueueSize: 100,
57+
maxQueueSize: 100, // TODO: Deprecate
5858
serverTimeout: 30,
5959
disableInstrumentations: []
6060
}
@@ -68,6 +68,8 @@ var ENV_TABLE = {
6868
active: 'ELASTIC_APM_ACTIVE',
6969
logLevel: 'ELASTIC_APM_LOG_LEVEL',
7070
hostname: 'ELASTIC_APM_HOSTNAME',
71+
apiRequestSize: 'ELASTIC_APM_API_REQUEST_SIZE',
72+
apiRequestTime: 'ELASTIC_APM_API_REQUEST_TIME',
7173
frameworkName: 'ELASTIC_APM_FRAMEWORK_NAME',
7274
frameworkVersion: 'ELASTIC_APM_FRAMEWORK_VERSION',
7375
stackTraceLimit: 'ELASTIC_APM_STACK_TRACE_LIMIT',
@@ -105,6 +107,8 @@ var BOOL_OPTS = [
105107
]
106108

107109
var NUM_OPTS = [
110+
'apiRequestSize',
111+
'apiRequestTime',
108112
'stackTraceLimit',
109113
'abortedErrorThreshold',
110114
'flushInterval',

lib/instrumentation/index.js

Lines changed: 32 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,11 @@
33
var fs = require('fs')
44
var path = require('path')
55

6-
var AsyncValuePromise = require('async-value-promise')
76
var hook = require('require-in-the-middle')
87
var semver = require('semver')
98

10-
var Queue = require('../queue')
11-
var request = require('../request')
129
var Transaction = require('./transaction')
10+
var truncate = require('../truncate')
1311
var shimmer = require('./shimmer')
1412

1513
var MODULES = [
@@ -46,7 +44,6 @@ module.exports = Instrumentation
4644

4745
function Instrumentation (agent) {
4846
this._agent = agent
49-
this._queue = null
5047
this._hook = null // this._hook is only exposed for testing purposes
5148
this._started = false
5249
this.currentTransaction = null
@@ -60,21 +57,6 @@ Instrumentation.prototype.start = function () {
6057
var self = this
6158
this._started = true
6259

63-
var qopts = {
64-
flushInterval: this._agent._conf.flushInterval,
65-
maxQueueSize: this._agent._conf.maxQueueSize,
66-
logger: this._agent.logger
67-
}
68-
this._queue = new Queue(qopts, function onFlush (transactions, done) {
69-
AsyncValuePromise.all(transactions).then(function (transactions) {
70-
if (self._agent._conf.active && transactions.length > 0) {
71-
request.transactions(self._agent, transactions, done)
72-
} else {
73-
done()
74-
}
75-
}, done)
76-
})
77-
7860
if (this._agent._conf.asyncHooks && semver.gte(process.version, '8.2.0')) {
7961
require('./async-hooks')(this)
8062
} else {
@@ -111,27 +93,44 @@ Instrumentation.prototype._patchModule = function (exports, name, version, enabl
11193
}
11294

11395
Instrumentation.prototype.addEndedTransaction = function (transaction) {
96+
var agent = this._agent
97+
11498
if (this._started) {
115-
var queue = this._queue
99+
var payload = agent._filters.process(transaction._encode()) // TODO: Update filter to expect this format
100+
if (!payload) return agent.logger.debug('transaction ignored by filter %o', { id: transaction.id })
101+
truncate.transaction(payload)
102+
agent.logger.debug('sending transaction %o', { id: transaction.id })
103+
agent._apmServer.sendTransaction(payload)
104+
} else {
105+
agent.logger.debug('ignoring transaction %o', { id: transaction.id })
106+
}
107+
}
116108

117-
this._agent.logger.debug('adding transaction to queue %o', { id: transaction.id })
109+
Instrumentation.prototype.addEndedSpan = function (span) {
110+
var agent = this._agent
118111

119-
var payload = new AsyncValuePromise()
112+
if (this._started) {
113+
agent.logger.debug('encoding span %o', { trans: span.transaction.id, name: span.name, type: span.type })
114+
span._encode(function (err, payload) {
115+
if (err) {
116+
agent.logger.error('error encoding span %o', { trans: span.transaction.id, name: span.name, type: span.type, error: err.message })
117+
return
118+
}
120119

121-
payload.catch(function (err) {
122-
this._agent.logger.error('error encoding transaction %s: %s', transaction.id, err.message)
123-
})
120+
payload = agent._filters.process(payload) // TODO: Update filter to expect this format
124121

125-
// Add the transaction payload to the queue instead of the transation
126-
// object it self to free up the transaction for garbage collection
127-
transaction._encode(function (err, _payload) {
128-
if (err) payload.reject(err)
129-
else payload.resolve(_payload)
130-
})
122+
if (!payload) {
123+
agent.logger.debug('span ignored by filter %o', { trans: span.transaction.id, name: span.name, type: span.type })
124+
return
125+
}
131126

132-
queue.add(payload)
127+
truncate.span(payload)
128+
129+
agent.logger.debug('sending span %o', { trans: span.transaction.id, name: span.name, type: span.type })
130+
if (agent._apmServer) agent._apmServer.sendSpan(payload)
131+
})
133132
} else {
134-
this._agent.logger.debug('ignoring transaction %o', { id: transaction.id })
133+
agent.logger.debug('ignoring span %o', { trans: span.transaction.id, name: span.name, type: span.type })
135134
}
136135
}
137136

@@ -221,11 +220,3 @@ Instrumentation.prototype._recoverTransaction = function (trans) {
221220

222221
this.currentTransaction = trans
223222
}
224-
225-
Instrumentation.prototype.flush = function (cb) {
226-
if (this._queue) {
227-
this._queue.flush(cb)
228-
} else {
229-
process.nextTick(cb)
230-
}
231-
}

lib/instrumentation/span.js

Lines changed: 5 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ module.exports = Span
1414
function Span (transaction) {
1515
this.transaction = transaction
1616
this.started = false
17-
this.truncated = false
1817
this.ended = false
1918
this.name = null
2019
this.type = null
@@ -50,18 +49,6 @@ Span.prototype.customStackTrace = function (stackObj) {
5049
this._recordStackTrace(stackObj)
5150
}
5251

53-
Span.prototype.truncate = function () {
54-
if (!this.started) {
55-
this._agent.logger.debug('tried to truncate non-started span - ignoring %o', { id: this.transaction.id, name: this.name, type: this.type })
56-
return
57-
} else if (this.ended) {
58-
this._agent.logger.debug('tried to truncate already ended span - ignoring %o', { id: this.transaction.id, name: this.name, type: this.type })
59-
return
60-
}
61-
this.truncated = true
62-
this.end()
63-
}
64-
6552
Span.prototype.end = function () {
6653
if (!this.started) {
6754
this._agent.logger.debug('tried to call span.end() on un-started span %o', { id: this.transaction.id, name: this.name, type: this.type })
@@ -75,8 +62,8 @@ Span.prototype.end = function () {
7562
this._agent._instrumentation._recoverTransaction(this.transaction)
7663

7764
this.ended = true
78-
this._agent.logger.debug('ended span %o', { id: this.transaction.id, name: this.name, type: this.type, truncated: this.truncated })
79-
this.transaction._recordEndedSpan(this)
65+
this._agent.logger.debug('ended span %o', { id: this.transaction.id, name: this.name, type: this.type })
66+
this._agent._instrumentation.addEndedSpan(this)
8067
}
8168

8269
Span.prototype.duration = function () {
@@ -157,8 +144,10 @@ Span.prototype._encode = function (cb) {
157144
}
158145

159146
var payload = {
147+
transactionId: self.transaction.id,
148+
timestamp: self.transaction.timestamp,
160149
name: self.name,
161-
type: self.truncated ? self.type + '.truncated' : self.type,
150+
type: self.type,
162151
start: self.offsetTime(),
163152
duration: self.duration()
164153
}

0 commit comments

Comments
 (0)