Skip to content

Commit 05464ec

Browse files
committed
fix(bitcoind_rpc)!: Simplify emitter
Instead of having an avoid-reemission logic based on timestamps returned by bitcoind RPC, we wrap all transactions in `Arc` and always emit the entire mempool. This makes emission cheap while avoiding the flawed avoid-reemission logic.
1 parent 67dfb0b commit 05464ec

File tree

3 files changed

+134
-394
lines changed

3 files changed

+134
-394
lines changed

crates/bitcoind_rpc/src/lib.rs

Lines changed: 96 additions & 145 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,11 @@
1212
use bdk_core::{BlockId, CheckPoint};
1313
use bitcoin::{Block, BlockHash, Transaction, Txid};
1414
use bitcoincore_rpc::{bitcoincore_rpc_json, RpcApi};
15-
use std::{collections::HashSet, ops::Deref};
15+
use std::{
16+
collections::{HashMap, HashSet},
17+
ops::Deref,
18+
sync::Arc,
19+
};
1620

1721
pub mod bip158;
1822

@@ -37,30 +41,23 @@ pub struct Emitter<C> {
3741
/// gives us an opportunity to re-fetch this result.
3842
last_block: Option<bitcoincore_rpc_json::GetBlockResult>,
3943

40-
/// The latest first-seen epoch of emitted mempool transactions. This is used to determine
41-
/// whether a mempool transaction is already emitted.
42-
last_mempool_time: usize,
43-
44-
/// The last emitted block during our last mempool emission. This is used to determine whether
45-
/// there has been a reorg since our last mempool emission.
46-
last_mempool_tip: Option<u32>,
47-
48-
/// A set of txids currently assumed to still be in the mempool.
44+
/// The last snapshot of mempool transactions.
4945
///
50-
/// This is used to detect mempool evictions by comparing the set against the latest mempool
51-
/// snapshot from bitcoind. Any txid in this set that is missing from the snapshot is
52-
/// considered evicted.
46+
/// This is used to detect mempool evictions and as a cache for transactions to emit.
5347
///
54-
/// When the emitter emits a block, confirmed txids are removed from this set. This prevents
55-
/// confirmed transactions from being mistakenly marked with an `evicted_at` timestamp.
56-
expected_mempool_txids: HashSet<Txid>,
48+
/// For mempool evictions, the latest call to `getrawmempool` is compared against this field.
49+
/// Any transaction that is missing from this field is considered evicted. The exception is if
50+
/// the transaction is confirmed into a block - therefore, we only emit evictions when we are
51+
/// sure the tip block is already emitted. When a block is emitted, the transactions in the
52+
/// block are removed from this field.
53+
mempool_snapshot: HashMap<Txid, Arc<Transaction>>,
5754
}
5855

5956
/// Indicates that there are no initially expected mempool transactions.
6057
///
6158
/// Pass this to the `expected_mempool_txids` field of [`Emitter::new`] when the wallet is known
6259
/// to start empty (i.e. with no unconfirmed transactions).
63-
pub const NO_EXPECTED_MEMPOOL_TXIDS: core::iter::Empty<Txid> = core::iter::empty();
60+
pub const NO_EXPECTED_MEMPOOL_TXIDS: core::iter::Empty<Arc<Transaction>> = core::iter::empty();
6461

6562
impl<C> Emitter<C>
6663
where
@@ -75,23 +72,27 @@ where
7572
/// `start_height` starts emission from a given height (if there are no conflicts with the
7673
/// original chain).
7774
///
78-
/// `expected_mempool_txids` is the initial set of unconfirmed txids provided by the wallet.
79-
/// This allows the [`Emitter`] to inform the wallet about relevant mempool evictions. If it is
80-
/// known that the wallet is empty, [`NO_EXPECTED_MEMPOOL_TXIDS`] can be used.
75+
/// `expected_mempool_txs` is the initial set of unconfirmed transactions provided by the
76+
/// wallet. This allows the [`Emitter`] to inform the wallet about relevant mempool evictions.
77+
/// If it is known that the wallet is empty, [`NO_EXPECTED_MEMPOOL_TXIDS`] can be used.
8178
pub fn new(
8279
client: C,
8380
last_cp: CheckPoint,
8481
start_height: u32,
85-
expected_mempool_txids: impl IntoIterator<Item = impl Into<Txid>>,
82+
expected_mempool_txs: impl IntoIterator<Item = impl Into<Arc<Transaction>>>,
8683
) -> Self {
8784
Self {
8885
client,
8986
start_height,
9087
last_cp,
9188
last_block: None,
92-
last_mempool_time: 0,
93-
last_mempool_tip: None,
94-
expected_mempool_txids: expected_mempool_txids.into_iter().map(Into::into).collect(),
89+
mempool_snapshot: expected_mempool_txs
90+
.into_iter()
91+
.map(|tx| {
92+
let tx: Arc<Transaction> = tx.into();
93+
(tx.compute_txid(), tx)
94+
})
95+
.collect(),
9596
}
9697
}
9798

@@ -115,110 +116,89 @@ where
115116
pub fn mempool(&mut self) -> Result<MempoolEvent, bitcoincore_rpc::Error> {
116117
let client = &*self.client;
117118

118-
// This is the emitted tip height during the last mempool emission.
119-
let prev_mempool_tip = self
120-
.last_mempool_tip
121-
// We use `start_height - 1` as we cannot guarantee that the block at
122-
// `start_height` has been emitted.
123-
.unwrap_or(self.start_height.saturating_sub(1));
124-
125-
// Loop to make sure that the fetched mempool content and the fetched tip are consistent
126-
// with one another.
127-
let (raw_mempool, raw_mempool_txids, rpc_height, rpc_block_hash) = loop {
128-
// Determine if height and hash matches the best block from the RPC. Evictions are
129-
// deferred if we are not at the best block.
130-
let height = client.get_block_count()?;
131-
let hash = client.get_block_hash(height)?;
132-
133-
// Get the raw mempool result from the RPC client which will be used to determine if any
134-
// transactions have been evicted.
135-
let mp = client.get_raw_mempool_verbose()?;
136-
let mp_txids: HashSet<Txid> = mp.keys().copied().collect();
137-
138-
if height == client.get_block_count()? && hash == client.get_block_hash(height)? {
139-
break (mp, mp_txids, height, hash);
119+
let mut rpc_tip_height;
120+
let mut rpc_tip_hash;
121+
let mut rpc_mempool;
122+
let mut rpc_mempool_txids;
123+
124+
// Ensure we get a mempool snapshot consistent with `rpc_tip_hash` as the tip.
125+
loop {
126+
rpc_tip_height = client.get_block_count()?;
127+
rpc_tip_hash = client.get_block_hash(rpc_tip_height)?;
128+
rpc_mempool = client.get_raw_mempool_verbose()?;
129+
rpc_mempool_txids = rpc_mempool.keys().copied().collect::<HashSet<Txid>>();
130+
let is_still_at_tip = rpc_tip_hash == client.get_block_hash(rpc_tip_height)?
131+
&& rpc_tip_height == client.get_block_count()?;
132+
if is_still_at_tip {
133+
break;
140134
}
141-
};
142-
143-
let at_tip =
144-
rpc_height == self.last_cp.height() as u64 && rpc_block_hash == self.last_cp.hash();
145-
146-
// If at tip, any expected txid missing from raw mempool is considered evicted;
147-
// if not at tip, we don't evict anything.
148-
let evicted_txids: HashSet<Txid> = if at_tip {
149-
self.expected_mempool_txids
150-
.difference(&raw_mempool_txids)
151-
.copied()
152-
.collect()
153-
} else {
154-
HashSet::new()
155-
};
135+
}
156136

157-
// Mempool txs come with a timestamp of when the tx is introduced to the mempool. We keep
158-
// track of the latest mempool tx's timestamp to determine whether we have seen a tx
159-
// before. `prev_mempool_time` is the previous timestamp and `last_time` records what will
160-
// be the new latest timestamp.
161-
let prev_mempool_time = self.last_mempool_time;
162-
let mut latest_time = prev_mempool_time;
137+
let mut mempool_event = MempoolEvent::default();
138+
let update_time = &mut 0_u64;
163139

164-
let new_txs = raw_mempool
140+
mempool_event.update = rpc_mempool
165141
.into_iter()
166142
.filter_map({
167-
let latest_time = &mut latest_time;
168-
move |(txid, tx_entry)| -> Option<Result<_, bitcoincore_rpc::Error>> {
169-
let tx_time = tx_entry.time as usize;
170-
if tx_time > *latest_time {
171-
*latest_time = tx_time;
172-
}
173-
// Best-effort check to avoid re-emitting transactions we've already emitted.
174-
//
175-
// Complete suppression isn't possible, since a transaction may spend outputs
176-
// owned by the wallet. To determine if such a transaction is relevant, we must
177-
// have already seen its ancestor(s) that contain the spent prevouts.
178-
//
179-
// Fortunately, bitcoind provides the block height at which the transaction
180-
// entered the mempool. If we've already emitted that block height, we can
181-
// reasonably assume the receiver has seen all ancestor transactions.
182-
let is_already_emitted = tx_time <= prev_mempool_time;
183-
let is_within_height = tx_entry.height <= prev_mempool_tip as _;
184-
if is_already_emitted && is_within_height {
185-
return None;
186-
}
187-
let tx = match client.get_raw_transaction(&txid, None) {
188-
Ok(tx) => tx,
189-
Err(err) if err.is_not_found_error() => return None,
190-
Err(err) => return Some(Err(err)),
143+
|(txid, tx_entry)| -> Option<Result<_, bitcoincore_rpc::Error>> {
144+
*update_time = u64::max(*update_time, tx_entry.time);
145+
let tx = match self.mempool_snapshot.get(&txid) {
146+
Some(tx) => tx.clone(),
147+
None => match client.get_raw_transaction(&txid, None) {
148+
Ok(tx) => {
149+
let tx = Arc::new(tx);
150+
self.mempool_snapshot.insert(txid, tx.clone());
151+
tx
152+
}
153+
Err(err) if err.is_not_found_error() => return None,
154+
Err(err) => return Some(Err(err)),
155+
},
191156
};
192-
Some(Ok((tx, tx_time as u64)))
157+
Some(Ok((tx, tx_entry.time)))
193158
}
194159
})
195160
.collect::<Result<Vec<_>, _>>()?;
196161

197-
self.last_mempool_time = latest_time;
198-
self.last_mempool_tip = Some(self.last_cp.height());
162+
let at_tip =
163+
rpc_tip_height == self.last_cp.height() as u64 && rpc_tip_hash == self.last_cp.hash();
199164

200-
// If at tip, we replace `expected_mempool_txids` with just the new txids. Otherwise, we’re
201-
// still catching up to the tip and keep accumulating.
202165
if at_tip {
203-
self.expected_mempool_txids = new_txs.iter().map(|(tx, _)| tx.compute_txid()).collect();
166+
// We only emit evicted transactions when we have already emitted the RPC tip. This is
167+
// because we cannot differenciate between transactions that are confirmed and
168+
// transactions that are evicted, so we rely on emitted blocks to remove
169+
// transactions from the `mempool_snapshot`.
170+
mempool_event.evicted = self
171+
.mempool_snapshot
172+
.keys()
173+
.filter(|&txid| !rpc_mempool_txids.contains(txid))
174+
.map(|&txid| (txid, *update_time))
175+
.collect();
176+
self.mempool_snapshot = mempool_event
177+
.update
178+
.iter()
179+
.map(|(tx, _)| (tx.compute_txid(), tx.clone()))
180+
.collect();
204181
} else {
205-
self.expected_mempool_txids
206-
.extend(new_txs.iter().map(|(tx, _)| tx.compute_txid()));
207-
}
182+
// Since we are still catching up to the tip (a.k.a tip has not been emitted), we
183+
// accumulate more transactions in `mempool_snapshot` so that we can emit evictions in
184+
// a batch once we catch up.
185+
self.mempool_snapshot.extend(
186+
mempool_event
187+
.update
188+
.iter()
189+
.map(|(tx, _)| (tx.compute_txid(), tx.clone())),
190+
);
191+
};
208192

209-
Ok(MempoolEvent {
210-
new_txs,
211-
evicted_txids,
212-
latest_update_time: latest_time as u64,
213-
})
193+
Ok(mempool_event)
214194
}
215195

216196
/// Emit the next block height and block (if any).
217197
pub fn next_block(&mut self) -> Result<Option<BlockEvent<Block>>, bitcoincore_rpc::Error> {
218198
if let Some((checkpoint, block)) = poll(self, move |hash, client| client.get_block(hash))? {
219199
// Stop tracking unconfirmed transactions that have been confirmed in this block.
220200
for tx in &block.txdata {
221-
self.expected_mempool_txids.remove(&tx.compute_txid());
201+
self.mempool_snapshot.remove(&tx.compute_txid());
222202
}
223203
return Ok(Some(BlockEvent { block, checkpoint }));
224204
}
@@ -227,32 +207,13 @@ where
227207
}
228208

229209
/// A new emission from mempool.
230-
#[derive(Debug)]
210+
#[derive(Debug, Default)]
231211
pub struct MempoolEvent {
232-
/// Unemitted transactions or transactions with ancestors that are unseen by the receiver.
233-
///
234-
/// To understand the second condition, consider a receiver which filters transactions based on
235-
/// whether it alters the UTXO set of tracked script pubkeys. If an emitted mempool transaction
236-
/// spends a tracked UTXO which is confirmed at height `h`, but the receiver has only seen up
237-
/// to block of height `h-1`, we want to re-emit this transaction until the receiver has
238-
/// seen the block at height `h`.
239-
pub new_txs: Vec<(Transaction, u64)>,
240-
241-
/// [`Txid`]s of all transactions that have been evicted from mempool.
242-
pub evicted_txids: HashSet<Txid>,
212+
/// Transactions currently in the mempool alongside their seen-at timestamp.
213+
pub update: Vec<(Arc<Transaction>, u64)>,
243214

244-
/// The latest timestamp of when a transaction entered the mempool.
245-
///
246-
/// This is useful for setting the timestamp for evicted transactions.
247-
pub latest_update_time: u64,
248-
}
249-
250-
impl MempoolEvent {
251-
/// Returns an iterator of `(txid, evicted_at)` pairs for all evicted transactions.
252-
pub fn evicted_ats(&self) -> impl ExactSizeIterator<Item = (Txid, u64)> + '_ {
253-
let time = self.latest_update_time;
254-
self.evicted_txids.iter().map(move |&txid| (txid, time))
255-
}
215+
/// Transactions evicted from the mempool alongside their evicted-at timestamp.
216+
pub evicted: Vec<(Txid, u64)>,
256217
}
257218

258219
/// A newly emitted block from [`Emitter`].
@@ -396,16 +357,6 @@ where
396357
continue;
397358
}
398359
PollResponse::AgreementFound(res, cp) => {
399-
let agreement_h = res.height as u32;
400-
401-
// The tip during the last mempool emission needs to in the best chain, we reduce
402-
// it if it is not.
403-
if let Some(h) = emitter.last_mempool_tip.as_mut() {
404-
if *h > agreement_h {
405-
*h = agreement_h;
406-
}
407-
}
408-
409360
// get rid of evicted blocks
410361
emitter.last_cp = cp;
411362
emitter.last_block = Some(res);
@@ -479,7 +430,7 @@ mod test {
479430

480431
for txid in &mempool_txids {
481432
assert!(
482-
emitter.expected_mempool_txids.contains(txid),
433+
emitter.mempool_snapshot.contains_key(txid),
483434
"Expected txid {txid:?} missing"
484435
);
485436
}
@@ -500,19 +451,19 @@ mod test {
500451
.collect::<HashSet<_>>();
501452
for txid in confirmed_txids {
502453
assert!(
503-
!emitter.expected_mempool_txids.contains(&txid),
454+
!emitter.mempool_snapshot.contains_key(&txid),
504455
"Expected txid {txid:?} should have been removed"
505456
);
506457
}
507458
for txid in &mempool_txids {
508459
assert!(
509-
emitter.expected_mempool_txids.contains(txid),
460+
emitter.mempool_snapshot.contains_key(txid),
510461
"Expected txid {txid:?} missing"
511462
);
512463
}
513464
}
514465

515-
assert!(emitter.expected_mempool_txids.is_empty());
466+
assert!(emitter.mempool_snapshot.is_empty());
516467

517468
Ok(())
518469
}

0 commit comments

Comments
 (0)