This repository was archived by the owner on Aug 23, 2019. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 37
feat: limit the number of cold calls we can do #316
Merged
Merged
Changes from all commits
Commits
Show all changes
16 commits
Select commit
Hold shift + click to select a range
6194d58
feat: limit the number of cold calls we can do
jacobheun a982c14
refactor: cleanup
jacobheun 483557d
fix: constant reference
jacobheun a654c68
refactor: make cold calls configurable
jacobheun e9c67d4
chore: fix linting
jacobheun 253cd9e
fix: make blacklist duration longer
jacobheun 14adab6
fix: improve blacklisting
jacobheun eac4b6b
test: add some tests for queue
jacobheun 0075cc8
fix: make number of attempts configurable
jacobheun 87d9bdc
fix: ensure peers are only added to 1 queue
jacobheun a3e08cf
test: validate cold queue is removed
jacobheun d3889cf
docs: revert bad jsdocs change
jacobheun 9f5c86b
feat: purge old queues every hour
jacobheun 6203879
test: fix aegir post script node shutdown
jacobheun ca4a0b3
fix: abort the cold call queue on manager abort
jacobheun 6b85d63
fix: improve queue cleanup and lower interval to 15 mins
jacobheun File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,7 +1,10 @@ | ||
'use strict' | ||
|
||
module.exports = { | ||
BLACK_LIST_TTL: 120e3, // How long before an errored peer can be dialed again | ||
BLACK_LIST_TTL: 5 * 60 * 1e3, // How long before an errored peer can be dialed again | ||
BLACK_LIST_ATTEMPTS: 5, // Num of unsuccessful dials before a peer is permanently blacklisted | ||
DIAL_TIMEOUT: 30e3, // How long in ms a dial attempt is allowed to take | ||
MAX_PARALLEL_DIALS: 50 // Maximum allowed concurrent dials | ||
MAX_COLD_CALLS: 50, // How many dials w/o protocols that can be queued | ||
MAX_PARALLEL_DIALS: 100, // Maximum allowed concurrent dials | ||
QUARTER_HOUR: 15 * 60e3 | ||
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,6 +2,10 @@ | |
|
||
const once = require('once') | ||
const Queue = require('./queue') | ||
const { DIAL_ABORTED } = require('../errors') | ||
const nextTick = require('async/nextTick') | ||
const retimer = require('retimer') | ||
const { QUARTER_HOUR } = require('../constants') | ||
const noop = () => {} | ||
|
||
class DialQueueManager { | ||
|
@@ -11,9 +15,53 @@ class DialQueueManager { | |
*/ | ||
constructor (_switch) { | ||
this._queue = new Set() | ||
this._coldCallQueue = new Set() | ||
this._dialingQueues = new Set() | ||
this._queues = {} | ||
this.switch = _switch | ||
this._cleanInterval = retimer(this._clean.bind(this), QUARTER_HOUR) | ||
} | ||
|
||
/** | ||
* Runs through all queues, aborts and removes them if they | ||
* are no longer valid. A queue that is blacklisted indefinitely, | ||
* is considered no longer valid. | ||
* @private | ||
*/ | ||
_clean () { | ||
const queues = Object.values(this._queues) | ||
queues.forEach(dialQueue => { | ||
// Clear if the queue has reached max blacklist | ||
if (dialQueue.blackListed === Infinity) { | ||
dialQueue.abort() | ||
delete this._queues[dialQueue.id] | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What do you think about doing This is more efficient than deleting, but than we will need to verify that keys have a value when iterating the array, so I do not have a strong opinion here. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Since this is only running on a 15min interval I think the performance hit should be fairly insignificant and would avoid us needing to check values. |
||
return | ||
} | ||
|
||
// Keep track of blacklisted queues | ||
if (dialQueue.blackListed) return | ||
|
||
// Clear if peer is no longer active | ||
// To avoid reallocating memory, dont delete queues of | ||
// connected peers, as these are highly likely to leverage the | ||
// queues in the immediate term | ||
if (!dialQueue.isRunning && dialQueue.length < 1) { | ||
let isConnected = false | ||
try { | ||
const peerInfo = this.switch._peerBook.get(dialQueue.id) | ||
isConnected = Boolean(peerInfo.isConnected()) | ||
} catch (_) { | ||
// If we get an error, that means the peerbook doesnt have the peer | ||
} | ||
|
||
if (!isConnected) { | ||
dialQueue.abort() | ||
delete this._queues[dialQueue.id] | ||
} | ||
} | ||
}) | ||
|
||
this._cleanInterval.reschedule(QUARTER_HOUR) | ||
} | ||
|
||
/** | ||
|
@@ -25,16 +73,21 @@ class DialQueueManager { | |
abort () { | ||
// Clear the general queue | ||
this._queue.clear() | ||
// Clear the cold call queue | ||
this._coldCallQueue.clear() | ||
|
||
this._cleanInterval.clear() | ||
|
||
// Abort the individual peer queues | ||
const queues = Object.values(this._queues) | ||
queues.forEach(dialQueue => { | ||
dialQueue.abort() | ||
delete this._queues[dialQueue.id] | ||
}) | ||
} | ||
|
||
/** | ||
* Adds the `dialRequest` to the queue and ensures the queue is running | ||
* Adds the `dialRequest` to the queue and ensures queue is running | ||
* | ||
* @param {DialRequest} dialRequest | ||
* @returns {void} | ||
|
@@ -44,6 +97,11 @@ class DialQueueManager { | |
|
||
// Add the dial to its respective queue | ||
const targetQueue = this.getQueue(peerInfo) | ||
// If we have too many cold calls, abort the dial immediately | ||
if (this._coldCallQueue.size >= this.switch.dialer.MAX_COLD_CALLS && !protocol) { | ||
return nextTick(callback, DIAL_ABORTED()) | ||
} | ||
|
||
targetQueue.add(protocol, useFSM, callback) | ||
|
||
// If we're already connected to the peer, start the queue now | ||
|
@@ -54,10 +112,23 @@ class DialQueueManager { | |
return | ||
} | ||
|
||
// Add the id to the general queue set if the queue isn't running | ||
// and if the queue is allowed to dial | ||
if (!targetQueue.isRunning && targetQueue.isDialAllowed()) { | ||
this._queue.add(targetQueue.id) | ||
// If dialing is not allowed, abort | ||
if (!targetQueue.isDialAllowed()) { | ||
return | ||
} | ||
|
||
// Add the id to its respective queue set if the queue isn't running | ||
if (!targetQueue.isRunning) { | ||
dirkmc marked this conversation as resolved.
Show resolved
Hide resolved
|
||
if (protocol) { | ||
this._queue.add(targetQueue.id) | ||
this._coldCallQueue.delete(targetQueue.id) | ||
// Only add it to the cold queue if it's not in the normal queue | ||
} else if (!this._queue.has(targetQueue.id)) { | ||
this._coldCallQueue.add(targetQueue.id) | ||
// The peer is already in the normal queue, abort the cold call | ||
} else { | ||
return nextTick(callback, DIAL_ABORTED()) | ||
} | ||
} | ||
|
||
this.run() | ||
|
@@ -67,11 +138,21 @@ class DialQueueManager { | |
* Will execute up to `MAX_PARALLEL_DIALS` dials | ||
*/ | ||
run () { | ||
if (this._dialingQueues.size < this.switch.dialer.MAX_PARALLEL_DIALS && this._queue.size > 0) { | ||
let nextQueue = this._queue.values().next() | ||
if (nextQueue.done) return | ||
if (this._dialingQueues.size < this.switch.dialer.MAX_PARALLEL_DIALS) { | ||
let nextQueue = { done: true } | ||
// Check the queue first and fall back to the cold call queue | ||
if (this._queue.size > 0) { | ||
nextQueue = this._queue.values().next() | ||
this._queue.delete(nextQueue.value) | ||
} else if (this._coldCallQueue.size > 0) { | ||
nextQueue = this._coldCallQueue.values().next() | ||
this._coldCallQueue.delete(nextQueue.value) | ||
} | ||
|
||
if (nextQueue.done) { | ||
return | ||
} | ||
|
||
this._queue.delete(nextQueue.value) | ||
let targetQueue = this._queues[nextQueue.value] | ||
this._dialingQueues.add(targetQueue.id) | ||
targetQueue.start() | ||
|
@@ -83,7 +164,9 @@ class DialQueueManager { | |
* @param {PeerInfo} peerInfo | ||
*/ | ||
clearBlacklist (peerInfo) { | ||
this.getQueue(peerInfo).blackListed = null | ||
const queue = this.getQueue(peerInfo) | ||
queue.blackListed = null | ||
queue.blackListCount = 0 | ||
} | ||
|
||
/** | ||
|
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.