Skip to content

Commit be190a8

Browse files
committed
sqldb+graph/db: implement FilterChannelRange
This lets us run `TestFilterChannelRange` against the SQL backends.
1 parent 2ed7b68 commit be190a8

File tree

6 files changed

+242
-1
lines changed

6 files changed

+242
-1
lines changed

docs/release-notes/release-notes-0.20.0.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@ circuit. The indices are only available for forwarding events saved after v0.20.
8585
* [3](https://github.com/lightningnetwork/lnd/pull/9887)
8686
* [4](https://github.com/lightningnetwork/lnd/pull/9931)
8787
* [5](https://github.com/lightningnetwork/lnd/pull/9935)
88+
* [6](https://github.com/lightningnetwork/lnd/pull/9936)
8889

8990
## RPC Updates
9091

graph/db/graph_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2727,7 +2727,7 @@ func TestFilterChannelRange(t *testing.T) {
27272727
t.Parallel()
27282728
ctx := context.Background()
27292729

2730-
graph := MakeTestGraph(t)
2730+
graph := MakeTestGraphNew(t)
27312731

27322732
// We'll first populate our graph with two nodes. All channels created
27332733
// below will be made between these two nodes.

graph/db/sql_store.go

Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"fmt"
1010
"math"
1111
"net"
12+
"sort"
1213
"strconv"
1314
"sync"
1415
"time"
@@ -92,6 +93,7 @@ type SQLQueries interface {
9293
ListChannelsByNodeID(ctx context.Context, arg sqlc.ListChannelsByNodeIDParams) ([]sqlc.ListChannelsByNodeIDRow, error)
9394
ListChannelsPaginated(ctx context.Context, arg sqlc.ListChannelsPaginatedParams) ([]sqlc.ListChannelsPaginatedRow, error)
9495
GetChannelsByPolicyLastUpdateRange(ctx context.Context, arg sqlc.GetChannelsByPolicyLastUpdateRangeParams) ([]sqlc.GetChannelsByPolicyLastUpdateRangeRow, error)
96+
GetPublicV1ChannelsBySCID(ctx context.Context, arg sqlc.GetPublicV1ChannelsBySCIDParams) ([]sqlc.Channel, error)
9597

9698
CreateChannelExtraType(ctx context.Context, arg sqlc.CreateChannelExtraTypeParams) error
9799
InsertChannelFeature(ctx context.Context, arg sqlc.InsertChannelFeatureParams) error
@@ -100,6 +102,7 @@ type SQLQueries interface {
100102
Channel Policy table queries.
101103
*/
102104
UpsertEdgePolicy(ctx context.Context, arg sqlc.UpsertEdgePolicyParams) (int64, error)
105+
GetChannelPolicyByChannelAndNode(ctx context.Context, arg sqlc.GetChannelPolicyByChannelAndNodeParams) (sqlc.ChannelPolicy, error)
103106

104107
InsertChanPolicyExtraType(ctx context.Context, arg sqlc.InsertChanPolicyExtraTypeParams) error
105108
GetChannelPolicyExtraTypes(ctx context.Context, arg sqlc.GetChannelPolicyExtraTypesParams) ([]sqlc.GetChannelPolicyExtraTypesRow, error)
@@ -1258,6 +1261,141 @@ func (s *SQLStore) ForEachChannel(cb func(*models.ChannelEdgeInfo,
12581261
}, sqldb.NoOpReset)
12591262
}
12601263

1264+
// FilterChannelRange returns the channel ID's of all known channels which were
1265+
// mined in a block height within the passed range. The channel IDs are grouped
1266+
// by their common block height. This method can be used to quickly share with a
1267+
// peer the set of channels we know of within a particular range to catch them
1268+
// up after a period of time offline. If withTimestamps is true then the
1269+
// timestamp info of the latest received channel update messages of the channel
1270+
// will be included in the response.
1271+
//
1272+
// NOTE: This is part of the V1Store interface.
1273+
func (s *SQLStore) FilterChannelRange(startHeight, endHeight uint32,
1274+
withTimestamps bool) ([]BlockChannelRange, error) {
1275+
1276+
var (
1277+
ctx = context.TODO()
1278+
startSCID = &lnwire.ShortChannelID{
1279+
BlockHeight: startHeight,
1280+
}
1281+
endSCID = lnwire.ShortChannelID{
1282+
BlockHeight: endHeight,
1283+
TxIndex: math.MaxUint32 & 0x00ffffff,
1284+
TxPosition: math.MaxUint16,
1285+
}
1286+
)
1287+
1288+
var chanIDStart [8]byte
1289+
byteOrder.PutUint64(chanIDStart[:], startSCID.ToUint64())
1290+
var chanIDEnd [8]byte
1291+
byteOrder.PutUint64(chanIDEnd[:], endSCID.ToUint64())
1292+
1293+
// 1) get all channels where channelID is between start and end chan ID.
1294+
// 2) skip if not public (ie, no channel_proof)
1295+
// 3) collect that channel.
1296+
// 4) if timestamps are wanted, fetch both policies for node 1 and node2
1297+
// and add those timestamps to the collected channel.
1298+
channelsPerBlock := make(map[uint32][]ChannelUpdateInfo)
1299+
err := s.db.ExecTx(ctx, sqldb.ReadTxOpt(), func(db SQLQueries) error {
1300+
dbChans, err := db.GetPublicV1ChannelsBySCID(
1301+
ctx, sqlc.GetPublicV1ChannelsBySCIDParams{
1302+
StartScid: chanIDStart[:],
1303+
EndScid: chanIDEnd[:],
1304+
},
1305+
)
1306+
if err != nil {
1307+
return fmt.Errorf("unable to fetch channel range: %w",
1308+
err)
1309+
}
1310+
1311+
for _, dbChan := range dbChans {
1312+
cid := lnwire.NewShortChanIDFromInt(
1313+
byteOrder.Uint64(dbChan.Scid),
1314+
)
1315+
chanInfo := NewChannelUpdateInfo(
1316+
cid, time.Time{}, time.Time{},
1317+
)
1318+
1319+
if !withTimestamps {
1320+
channelsPerBlock[cid.BlockHeight] = append(
1321+
channelsPerBlock[cid.BlockHeight],
1322+
chanInfo,
1323+
)
1324+
1325+
continue
1326+
}
1327+
1328+
//nolint:ll
1329+
node1Policy, err := db.GetChannelPolicyByChannelAndNode(
1330+
ctx, sqlc.GetChannelPolicyByChannelAndNodeParams{
1331+
Version: int16(ProtocolV1),
1332+
ChannelID: dbChan.ID,
1333+
NodeID: dbChan.NodeID1,
1334+
},
1335+
)
1336+
if err != nil && !errors.Is(err, sql.ErrNoRows) {
1337+
return fmt.Errorf("unable to fetch node1 "+
1338+
"policy: %w", err)
1339+
} else if err == nil {
1340+
chanInfo.Node1UpdateTimestamp = time.Unix(
1341+
node1Policy.LastUpdate.Int64, 0,
1342+
)
1343+
}
1344+
1345+
//nolint:ll
1346+
node2Policy, err := db.GetChannelPolicyByChannelAndNode(
1347+
ctx, sqlc.GetChannelPolicyByChannelAndNodeParams{
1348+
Version: int16(ProtocolV1),
1349+
ChannelID: dbChan.ID,
1350+
NodeID: dbChan.NodeID2,
1351+
},
1352+
)
1353+
if err != nil && !errors.Is(err, sql.ErrNoRows) {
1354+
return fmt.Errorf("unable to fetch node2 "+
1355+
"policy: %w", err)
1356+
} else if err == nil {
1357+
chanInfo.Node2UpdateTimestamp = time.Unix(
1358+
node2Policy.LastUpdate.Int64, 0,
1359+
)
1360+
}
1361+
1362+
channelsPerBlock[cid.BlockHeight] = append(
1363+
channelsPerBlock[cid.BlockHeight], chanInfo,
1364+
)
1365+
}
1366+
1367+
return nil
1368+
}, func() {
1369+
channelsPerBlock = make(map[uint32][]ChannelUpdateInfo)
1370+
})
1371+
if err != nil {
1372+
return nil, fmt.Errorf("unable to fetch channel range: %w", err)
1373+
}
1374+
1375+
if len(channelsPerBlock) == 0 {
1376+
return nil, nil
1377+
}
1378+
1379+
// Return the channel ranges in ascending block height order.
1380+
blocks := make([]uint32, 0, len(channelsPerBlock))
1381+
for block := range channelsPerBlock {
1382+
blocks = append(blocks, block)
1383+
}
1384+
sort.Slice(blocks, func(i, j int) bool {
1385+
return blocks[i] < blocks[j]
1386+
})
1387+
1388+
channelRanges := make([]BlockChannelRange, 0, len(channelsPerBlock))
1389+
for _, block := range blocks {
1390+
channelRanges = append(channelRanges, BlockChannelRange{
1391+
Height: block,
1392+
Channels: channelsPerBlock[block],
1393+
})
1394+
}
1395+
1396+
return channelRanges, nil
1397+
}
1398+
12611399
// forEachNodeDirectedChannel iterates through all channels of a given
12621400
// node, executing the passed callback on the directed edge representing the
12631401
// channel and its incoming policy. If the node is not found, no error is

sqldb/sqlc/graph.sql.go

Lines changed: 86 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

sqldb/sqlc/querier.go

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

sqldb/sqlc/queries/graph.sql

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -314,6 +314,13 @@ FROM channels c
314314
WHERE c.version = $1
315315
AND (c.node_id_1 = $2 OR c.node_id_2 = $2);
316316

317+
-- name: GetPublicV1ChannelsBySCID :many
318+
SELECT *
319+
FROM channels
320+
WHERE node_1_signature IS NOT NULL
321+
AND scid >= sqlc.arg(start_scid)
322+
AND scid < sqlc.arg(end_scid);
323+
317324
-- name: ListChannelsPaginated :many
318325
SELECT
319326
sqlc.embed(c),
@@ -417,6 +424,13 @@ ON CONFLICT (channel_id, node_id, version)
417424
WHERE EXCLUDED.last_update > channel_policies.last_update
418425
RETURNING id;
419426

427+
-- name: GetChannelPolicyByChannelAndNode :one
428+
SELECT *
429+
FROM channel_policies
430+
WHERE channel_id = $1
431+
AND node_id = $2
432+
AND version = $3;
433+
420434
/* ─────────────────────────────────────────────
421435
channel_policy_extra_types table queries
422436
─────────────────────────────────────────────

0 commit comments

Comments
 (0)