Skip to content

Commit 049fbea

Browse files
authored
feat: allowing configuration of application level callbacks (#3206)
1 parent ab0c1d4 commit 049fbea

File tree

5 files changed

+68
-12
lines changed

5 files changed

+68
-12
lines changed

library/libwaku.nim

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,8 @@ import
2727
./waku_thread/inter_thread_communication/requests/ping_request,
2828
./waku_thread/inter_thread_communication/waku_thread_request,
2929
./alloc,
30-
./ffi_types
30+
./ffi_types,
31+
../waku/factory/app_callbacks
3132

3233
################################################################################
3334
### Wrapper around the waku node
@@ -138,10 +139,14 @@ proc waku_new(
138139

139140
ctx.userData = userData
140141

142+
let appCallbacks = AppCallbacks(relayHandler: onReceivedMessage(ctx))
143+
141144
let retCode = handleRequest(
142145
ctx,
143146
RequestType.LIFECYCLE,
144-
NodeLifecycleRequest.createShared(NodeLifecycleMsgType.CREATE_NODE, configJson),
147+
NodeLifecycleRequest.createShared(
148+
NodeLifecycleMsgType.CREATE_NODE, configJson, appCallbacks
149+
),
145150
callback,
146151
userData,
147152
)

library/waku_thread/inter_thread_communication/requests/node_lifecycle_request.nim

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import
77
../../../../waku/factory/waku,
88
../../../../waku/factory/node_factory,
99
../../../../waku/factory/networks_config,
10+
../../../../waku/factory/app_callbacks,
1011
../../../alloc
1112

1213
type NodeLifecycleMsgType* = enum
@@ -17,20 +18,27 @@ type NodeLifecycleMsgType* = enum
1718
type NodeLifecycleRequest* = object
1819
operation: NodeLifecycleMsgType
1920
configJson: cstring ## Only used in 'CREATE_NODE' operation
21+
appCallbacks: AppCallbacks
2022

2123
proc createShared*(
22-
T: type NodeLifecycleRequest, op: NodeLifecycleMsgType, configJson: cstring = ""
24+
T: type NodeLifecycleRequest,
25+
op: NodeLifecycleMsgType,
26+
configJson: cstring = "",
27+
appCallbacks: AppCallbacks = nil,
2328
): ptr type T =
2429
var ret = createShared(T)
2530
ret[].operation = op
31+
ret[].appCallbacks = appCallbacks
2632
ret[].configJson = configJson.alloc()
2733
return ret
2834

2935
proc destroyShared(self: ptr NodeLifecycleRequest) =
3036
deallocShared(self[].configJson)
3137
deallocShared(self)
3238

33-
proc createWaku(configJson: cstring): Future[Result[Waku, string]] {.async.} =
39+
proc createWaku(
40+
configJson: cstring, appCallbacks: AppCallbacks = nil
41+
): Future[Result[Waku, string]] {.async.} =
3442
var conf = defaultWakuNodeConf().valueOr:
3543
return err("Failed creating node: " & error)
3644

@@ -59,7 +67,7 @@ proc createWaku(configJson: cstring): Future[Result[Waku, string]] {.async.} =
5967
formattedString & ". expected type: " & $typeof(confValue)
6068
)
6169

62-
let wakuRes = Waku.new(conf).valueOr:
70+
let wakuRes = Waku.new(conf, appCallbacks).valueOr:
6371
error "waku initialization failed", error = error
6472
return err("Failed setting up Waku: " & $error)
6573

@@ -73,7 +81,7 @@ proc process*(
7381

7482
case self.operation
7583
of CREATE_NODE:
76-
waku[] = (await createWaku(self.configJson)).valueOr:
84+
waku[] = (await createWaku(self.configJson, self.appCallbacks)).valueOr:
7785
error "CREATE_NODE failed", error = error
7886
return err("error processing createWaku request: " & $error)
7987
of START_NODE:

waku/factory/app_callbacks.nim

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
import ../waku_relay/protocol
2+
3+
type AppCallbacks* = ref object
4+
relayHandler*: WakuRelayHandler

waku/factory/node_factory.nim

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,16 @@ proc getNumShardsInNetwork*(conf: WakuNodeConf): uint32 =
124124
# https://github.com/waku-org/specs/blob/master/standards/core/relay-sharding.md#static-sharding
125125
return uint32(MaxShardIndex + 1)
126126

127+
proc getAutoshards*(
128+
node: WakuNode, contentTopics: seq[string]
129+
): Result[seq[RelayShard], string] =
130+
var autoShards: seq[RelayShard]
131+
for contentTopic in contentTopics:
132+
let shard = node.wakuSharding.getShard(contentTopic).valueOr:
133+
return err("Could not parse content topic: " & error)
134+
autoShards.add(shard)
135+
return ok(autoshards)
136+
127137
proc setupProtocols(
128138
node: WakuNode, conf: WakuNodeConf, nodeKey: crypto.PrivateKey
129139
): Future[Result[void, string]] {.async.} =
@@ -169,11 +179,8 @@ proc setupProtocols(
169179

170180
peerExchangeHandler = some(handlePeerExchange)
171181

172-
var autoShards: seq[RelayShard]
173-
for contentTopic in conf.contentTopics:
174-
let shard = node.wakuSharding.getShard(contentTopic).valueOr:
175-
return err("Could not parse content topic: " & error)
176-
autoShards.add(shard)
182+
let autoShards = node.getAutoshards(conf.contentTopics).valueOr:
183+
return err("Could not get autoshards: " & error)
177184

178185
debug "Shards created from content topics",
179186
contentTopics = conf.contentTopics, shards = autoShards

waku/factory/waku.nim

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ import
4242
../factory/node_factory,
4343
../factory/internal_config,
4444
../factory/external_config,
45+
../factory/app_callbacks,
4546
../waku_enr/multiaddr
4647

4748
logScope:
@@ -67,6 +68,7 @@ type Waku* = ref object
6768

6869
restServer*: WakuRestServerRef
6970
metricsServer*: MetricsHttpServerRef
71+
appCallbacks*: AppCallbacks
7072

7173
proc logConfig(conf: WakuNodeConf) =
7274
info "Configuration: Enabled protocols",
@@ -146,7 +148,32 @@ proc newCircuitRelay(isRelayClient: bool): Relay =
146148
return RelayClient.new()
147149
return Relay.new()
148150

149-
proc new*(T: type Waku, confCopy: var WakuNodeConf): Result[Waku, string] =
151+
proc setupAppCallbacks(
152+
node: WakuNode, conf: WakuNodeConf, appCallbacks: AppCallbacks
153+
): Result[void, string] =
154+
if appCallbacks.isNil():
155+
info "No external callbacks to be set"
156+
return ok()
157+
158+
if not appCallbacks.relayHandler.isNil():
159+
if node.wakuRelay.isNil():
160+
return err("Cannot configure relayHandler callback without Relay mounted")
161+
162+
let autoShards = node.getAutoshards(conf.contentTopics).valueOr:
163+
return err("Could not get autoshards: " & error)
164+
165+
let confShards =
166+
conf.shards.mapIt(RelayShard(clusterId: conf.clusterId, shardId: uint16(it)))
167+
let shards = confShards & autoShards
168+
169+
for shard in shards:
170+
discard node.wakuRelay.subscribe($shard, appCallbacks.relayHandler)
171+
172+
return ok()
173+
174+
proc new*(
175+
T: type Waku, confCopy: var WakuNodeConf, appCallbacks: AppCallbacks = nil
176+
): Result[Waku, string] =
150177
let rng = crypto.newRng()
151178

152179
logging.setupLog(confCopy.logLevel, confCopy.logFormat)
@@ -225,6 +252,10 @@ proc new*(T: type Waku, confCopy: var WakuNodeConf): Result[Waku, string] =
225252

226253
let node = nodeRes.get()
227254

255+
node.setupAppCallbacks(confCopy, appCallbacks).isOkOr:
256+
error "Failed setting up app callbacks", error = error
257+
return err("Failed setting up app callbacks: " & $error)
258+
228259
## Delivery Monitor
229260
var deliveryMonitor: DeliveryMonitor
230261
if confCopy.reliabilityEnabled:
@@ -246,6 +277,7 @@ proc new*(T: type Waku, confCopy: var WakuNodeConf): Result[Waku, string] =
246277
key: confCopy.nodekey.get(),
247278
node: node,
248279
deliveryMonitor: deliveryMonitor,
280+
appCallbacks: appCallbacks,
249281
)
250282

251283
waku.setupSwitchServices(confCopy, relay, rng)

0 commit comments

Comments
 (0)