Skip to content

Commit 785eb08

Browse files
committed
feat: support APM Server intake API version 2
Closes #356
1 parent 69448f5 commit 785eb08

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

58 files changed

+1306
-2094
lines changed

docs/agent-api.asciidoc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1084,7 +1084,7 @@ Defaults to `unnamed`
10841084
You can alternatively set this via <<span-type,`span.type`>>.
10851085
Defaults to `custom.code`
10861086

1087-
When a span is started it will measure the time until <<span-end,`span.end()`>> or <<span-truncate,`span.truncate()`>> is called.
1087+
When a span is started it will measure the time until <<span-end,`span.end()`>> is called.
10881088

10891089
See <<span-api,Span API>> docs for details on how to use custom spans.
10901090

docs/span-api.asciidoc

Lines changed: 1 addition & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ Defaults to `unnamed`
7373
You can alternatively set this via <<span-type,`span.type`>>.
7474
Defaults to `custom.code`
7575

76-
When a span is started it will measure the time until <<span-end,`span.end()`>> or <<span-truncate,`span.truncate()`>> is called.
76+
When a span is started it will measure the time until <<span-end,`span.end()`>> is called.
7777

7878
[[span-end]]
7979
==== `span.end()`
@@ -86,20 +86,3 @@ span.end()
8686
End the span.
8787
If the span has already ended,
8888
nothing happens.
89-
90-
A span that isn't ended before the parent transaction ends will be <<span-truncate,truncated>>.
91-
92-
[[span-truncate]]
93-
==== `span.truncate()`
94-
95-
[source,js]
96-
----
97-
span.truncate()
98-
----
99-
100-
Truncates and ends the span.
101-
If the span is already ended or truncated,
102-
nothing happens.
103-
104-
A truncated span is a special type of ended span.
105-
It's used to indicate that the measured event took longer than the duration recorded by the span.

docs/transaction-api.asciidoc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ Think of the transaction result as equivalent to the status code of an HTTP resp
5757
transaction.end([result])
5858
----
5959

60-
Ends the transaction and <<span-truncate,truncates>> all un-ended child spans.
60+
Ends the transaction.
6161
If the transaction has already ended,
6262
nothing happens.
6363

lib/agent.js

Lines changed: 53 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,9 @@ var connect = require('./middleware/connect')
1616
var Filters = require('./filters')
1717
var Instrumentation = require('./instrumentation')
1818
var parsers = require('./parsers')
19-
var request = require('./request')
2019
var stackman = require('./stackman')
2120
var symbols = require('./symbols')
21+
var truncate = require('./truncate')
2222

2323
var IncomingMessage = http.IncomingMessage
2424
var ServerResponse = http.ServerResponse
@@ -33,6 +33,7 @@ function Agent () {
3333

3434
this._instrumentation = new Instrumentation(this)
3535
this._filters = new Filters()
36+
this._apmServer = null
3637

3738
this._config()
3839
}
@@ -43,6 +44,10 @@ Object.defineProperty(Agent.prototype, 'currentTransaction', {
4344
}
4445
})
4546

47+
Agent.prototype.destroy = function () {
48+
if (this._apmServer) this._apmServer.destroy()
49+
}
50+
4651
Agent.prototype.startTransaction = function () {
4752
return this._instrumentation.startTransaction.apply(this._instrumentation, arguments)
4853
}
@@ -131,15 +136,35 @@ Agent.prototype.start = function (opts) {
131136
})
132137
}
133138

134-
this._instrumentation.start()
139+
this._apmServer = new ElasticAPMHttpClient({
140+
// metadata
141+
agentName: 'nodejs',
142+
agentVersion: version,
143+
serviceName: this._conf.serviceName,
144+
serviceVersion: this._conf.serviceVersion,
145+
frameworkName: this._conf.frameworkName,
146+
frameworkVersion: this._conf.frameworkVersion,
147+
hostname: this._conf.hostname,
135148

136-
this._httpClient = new ElasticAPMHttpClient({
149+
// Sanitize conf
150+
truncateStringsAt: config.INTAKE_STRING_MAX_SIZE, // TODO: Do we need to set this (it's the default value)
151+
152+
// HTTP conf
137153
secretToken: this._conf.secretToken,
138154
userAgent: userAgent,
139155
serverUrl: this._conf.serverUrl,
140156
rejectUnauthorized: this._conf.verifyServerCert,
141-
serverTimeout: this._conf.serverTimeout * 1000
157+
serverTimeout: this._conf.serverTimeout * 1000,
158+
159+
// Streaming conf
160+
size: this._conf.apiRequestSize,
161+
time: this._conf.apiRequestTime * 1000
142162
})
163+
this._apmServer.on('error', err => {
164+
this.logger.error('An error occrued while communicating with the APM Server:', err.message)
165+
})
166+
167+
this._instrumentation.start()
143168

144169
Error.stackTraceLimit = this._conf.stackTraceLimit
145170
if (this._conf.captureExceptions) this.handleUncaughtExceptions()
@@ -287,8 +312,24 @@ Agent.prototype.captureError = function (err, opts, cb) {
287312
}
288313

289314
function send (error) {
290-
agent.logger.info('logging error %s with Elastic APM', id)
291-
request.errors(agent, [error], cb)
315+
error = agent._filters.process(error) // TODO: Update filter to expect this format
316+
317+
if (!error) {
318+
agent.logger.debug('error ignored by filter %o', {id: id})
319+
cb()
320+
return
321+
}
322+
323+
truncate.error(error, agent._conf)
324+
325+
if (agent._apmServer) {
326+
agent.logger.info(`Sending error ${id} to Elastic APM`)
327+
agent._apmServer.sendError(error, function () {
328+
agent._apmServer.flush(cb)
329+
})
330+
} else {
331+
process.nextTick(cb.bind(null, new Error('cannot capture error before agent is started')))
332+
}
292333
}
293334
}
294335

@@ -314,7 +355,12 @@ Agent.prototype.handleUncaughtExceptions = function (cb) {
314355
}
315356

316357
Agent.prototype.flush = function (cb) {
317-
this._instrumentation.flush(cb)
358+
if (this._apmServer) {
359+
this._apmServer.flush(cb)
360+
} else {
361+
this.logger.warn(new Error('cannot flush agent before it is started'))
362+
process.nextTick(cb)
363+
}
318364
}
319365

320366
Agent.prototype.lambda = function wrapLambda (type, fn) {

lib/config.js

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,9 @@ var DEFAULTS = {
2828
verifyServerCert: true,
2929
active: true,
3030
logLevel: 'info',
31-
hostname: os.hostname(),
31+
hostname: os.hostname(), // TODO: Should we just let the http client default to this?
32+
apiRequestSize: 1024 * 1024, // TODO: Is this the right default
33+
apiRequestTime: 10, // TODO: Is this the right default. Should this be ms?
3234
stackTraceLimit: 50,
3335
captureExceptions: true,
3436
filterHttpHeaders: true,
@@ -44,10 +46,10 @@ var DEFAULTS = {
4446
sourceLinesSpanAppFrames: 0,
4547
sourceLinesSpanLibraryFrames: 0,
4648
errorMessageMaxLength: 2048,
47-
flushInterval: 10,
49+
flushInterval: 10, // TODO: Deprecate
4850
transactionMaxSpans: Infinity,
4951
transactionSampleRate: 1.0,
50-
maxQueueSize: 100,
52+
maxQueueSize: 100, // TODO: Deprecate
5153
serverTimeout: 30,
5254
disableInstrumentations: []
5355
}
@@ -61,6 +63,8 @@ var ENV_TABLE = {
6163
active: 'ELASTIC_APM_ACTIVE',
6264
logLevel: 'ELASTIC_APM_LOG_LEVEL',
6365
hostname: 'ELASTIC_APM_HOSTNAME',
66+
apiRequestSize: 'ELASTIC_APM_API_REQUEST_SIZE',
67+
apiRequestTime: 'ELASTIC_APM_API_REQUEST_TIME',
6468
frameworkName: 'ELASTIC_APM_FRAMEWORK_NAME',
6569
frameworkVersion: 'ELASTIC_APM_FRAMEWORK_VERSION',
6670
stackTraceLimit: 'ELASTIC_APM_STACK_TRACE_LIMIT',
@@ -98,6 +102,8 @@ var BOOL_OPTS = [
98102
]
99103

100104
var NUM_OPTS = [
105+
'apiRequestSize',
106+
'apiRequestTime',
101107
'stackTraceLimit',
102108
'abortedErrorThreshold',
103109
'sourceLinesErrorAppFrames',

lib/instrumentation/index.js

Lines changed: 32 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,11 @@
33
var fs = require('fs')
44
var path = require('path')
55

6-
var AsyncValuePromise = require('async-value-promise')
76
var hook = require('require-in-the-middle')
87
var semver = require('semver')
98

10-
var Queue = require('../queue')
11-
var request = require('../request')
129
var Transaction = require('./transaction')
10+
var truncate = require('../truncate')
1311
var shimmer = require('./shimmer')
1412

1513
var MODULES = [
@@ -43,7 +41,6 @@ module.exports = Instrumentation
4341

4442
function Instrumentation (agent) {
4543
this._agent = agent
46-
this._queue = null
4744
this._started = false
4845
this.currentTransaction = null
4946
}
@@ -56,19 +53,6 @@ Instrumentation.prototype.start = function () {
5653
var self = this
5754
this._started = true
5855

59-
var qopts = {
60-
flushInterval: this._agent._conf.flushInterval,
61-
maxQueueSize: this._agent._conf.maxQueueSize,
62-
logger: this._agent.logger
63-
}
64-
this._queue = new Queue(qopts, function onFlush (transactions, done) {
65-
AsyncValuePromise.all(transactions).then(function (transactions) {
66-
if (self._agent._conf.active && transactions.length > 0) {
67-
request.transactions(self._agent, transactions, done)
68-
}
69-
}, done)
70-
})
71-
7256
if (this._agent._conf.asyncHooks && semver.gte(process.version, '8.2.0')) {
7357
require('./async-hooks')(this)
7458
} else {
@@ -106,27 +90,44 @@ Instrumentation.prototype._patchModule = function (exports, name, version, enabl
10690
}
10791

10892
Instrumentation.prototype.addEndedTransaction = function (transaction) {
93+
var agent = this._agent
94+
10995
if (this._started) {
110-
var queue = this._queue
96+
var payload = agent._filters.process(transaction._encode()) // TODO: Update filter to expect this format
97+
if (!payload) return agent.logger.debug('transaction ignored by filter %o', {id: transaction.id})
98+
truncate.transaction(payload)
99+
agent.logger.debug('sending transaction %o', {id: transaction.id})
100+
if (agent._apmServer) agent._apmServer.sendTransaction(payload)
101+
} else {
102+
agent.logger.debug('ignoring transaction %o', {id: transaction.id})
103+
}
104+
}
111105

112-
this._agent.logger.debug('adding transaction to queue %o', {id: transaction.id})
106+
Instrumentation.prototype.addEndedSpan = function (span) {
107+
var agent = this._agent
113108

114-
var payload = new AsyncValuePromise()
109+
if (this._started) {
110+
agent.logger.debug('encoding span %o', {trans: span.transaction.id, name: span.name, type: span.type})
111+
span._encode(function (err, payload) {
112+
if (err) {
113+
agent.logger.error('error encoding span %o', {trans: span.transaction.id, name: span.name, type: span.type, error: err.message})
114+
return
115+
}
115116

116-
payload.catch(function (err) {
117-
this._agent.logger.error('error encoding transaction %s: %s', transaction.id, err.message)
118-
})
117+
payload = agent._filters.process(payload) // TODO: Update filter to expect this format
119118

120-
// Add the transaction payload to the queue instead of the transation
121-
// object it self to free up the transaction for garbage collection
122-
transaction._encode(function (err, _payload) {
123-
if (err) payload.reject(err)
124-
else payload.resolve(_payload)
125-
})
119+
if (!payload) {
120+
agent.logger.debug('span ignored by filter %o', {trans: span.transaction.id, name: span.name, type: span.type})
121+
return
122+
}
126123

127-
queue.add(payload)
124+
truncate.span(payload)
125+
126+
agent.logger.debug('sending span %o', {trans: span.transaction.id, name: span.name, type: span.type})
127+
if (agent._apmServer) agent._apmServer.sendSpan(payload)
128+
})
128129
} else {
129-
this._agent.logger.debug('ignoring transaction %o', {id: transaction.id})
130+
agent.logger.debug('ignoring span %o', {trans: span.transaction.id, name: span.name, type: span.type})
130131
}
131132
}
132133

@@ -216,7 +217,3 @@ Instrumentation.prototype._recoverTransaction = function (trans) {
216217

217218
this.currentTransaction = trans
218219
}
219-
220-
Instrumentation.prototype.flush = function (cb) {
221-
this._queue.flush(cb)
222-
}

lib/instrumentation/span.js

Lines changed: 5 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ module.exports = Span
1414
function Span (transaction) {
1515
this.transaction = transaction
1616
this.started = false
17-
this.truncated = false
1817
this.ended = false
1918
this.name = null
2019
this.type = null
@@ -50,18 +49,6 @@ Span.prototype.customStackTrace = function (stackObj) {
5049
this._recordStackTrace(stackObj)
5150
}
5251

53-
Span.prototype.truncate = function () {
54-
if (!this.started) {
55-
this._agent.logger.debug('tried to truncate non-started span - ignoring %o', {id: this.transaction.id, name: this.name, type: this.type})
56-
return
57-
} else if (this.ended) {
58-
this._agent.logger.debug('tried to truncate already ended span - ignoring %o', {id: this.transaction.id, name: this.name, type: this.type})
59-
return
60-
}
61-
this.truncated = true
62-
this.end()
63-
}
64-
6552
Span.prototype.end = function () {
6653
if (!this.started) {
6754
this._agent.logger.debug('tried to call span.end() on un-started span %o', {id: this.transaction.id, name: this.name, type: this.type})
@@ -75,8 +62,8 @@ Span.prototype.end = function () {
7562
this._agent._instrumentation._recoverTransaction(this.transaction)
7663

7764
this.ended = true
78-
this._agent.logger.debug('ended span %o', {id: this.transaction.id, name: this.name, type: this.type, truncated: this.truncated})
79-
this.transaction._recordEndedSpan(this)
65+
this._agent.logger.debug('ended span %o', {id: this.transaction.id, name: this.name, type: this.type})
66+
this._agent._instrumentation.addEndedSpan(this)
8067
}
8168

8269
Span.prototype.duration = function () {
@@ -157,8 +144,10 @@ Span.prototype._encode = function (cb) {
157144
}
158145

159146
var payload = {
147+
transactionId: self.transaction.id,
148+
timestamp: self.transaction.timestamp,
160149
name: self.name,
161-
type: self.truncated ? self.type + '.truncated' : self.type,
150+
type: self.type,
162151
start: self.offsetTime(),
163152
duration: self.duration()
164153
}

0 commit comments

Comments
 (0)