Skip to content

Commit 842ae9f

Browse files
committed
Refactor cudaq_host_dispatch_loop_ctx_t to compose public API structs
Embed cudaq_ringbuffer_t, cudaq_dispatcher_config_t, and cudaq_function_table_t as members instead of flattening their fields. This eliminates field duplication, makes the data provenance clear, and simplifies construction to struct copies. Host-specific runtime state (workers, idle_mask, io_ctxs, etc.) remains as direct fields.

Signed-off-by: Scott Thornton <wsttiger@gmail.com>
1 parent 5bdee00 commit 842ae9f

File tree

4 files changed

+97
-115
lines changed

4 files changed

+97
-115
lines changed

realtime/include/cudaq/realtime/daemon/dispatcher/host_dispatcher.h

Lines changed: 8 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -42,33 +42,22 @@ typedef struct {
4242
} cudaq_host_dispatch_worker_t;
4343

4444
typedef struct {
45-
void *rx_flags; ///< opaque cuda::std::atomic<uint64_t>*
46-
void *tx_flags; ///< opaque cuda::std::atomic<uint64_t>*
47-
uint8_t *rx_data_host;
48-
uint8_t *rx_data_dev;
49-
uint8_t *tx_data_host;
50-
uint8_t *tx_data_dev;
51-
size_t tx_stride_sz;
52-
void **h_mailbox_bank;
53-
size_t num_slots;
54-
size_t slot_size;
45+
// Composed public API structs
46+
cudaq_ringbuffer_t ringbuffer;
47+
cudaq_dispatcher_config_t config;
48+
cudaq_function_table_t function_table;
49+
50+
// Host dispatch runtime state
5551
cudaq_host_dispatch_worker_t *workers;
5652
size_t num_workers;
57-
/// Host-visible function table for lookup by function_id (GRAPH_LAUNCH only;
58-
/// others dropped).
59-
cudaq_function_entry_t *function_table;
60-
size_t function_table_count;
61-
void *shutdown_flag; ///< opaque cuda::std::atomic<int>*
53+
void **h_mailbox_bank;
54+
void *shutdown_flag; ///< opaque cuda::std::atomic<int>*
6255
uint64_t *stats_counter;
6356
void *live_dispatched; ///< opaque cuda::std::atomic<uint64_t>*
6457
void *idle_mask; ///< opaque cuda::std::atomic<uint64_t>*, 1=free 0=busy
6558
int *inflight_slot_tags; ///< worker_id -> origin FPGA slot for tx_flags
6659
///< routing
6760

68-
/// Device view of tx_flags (needed for GraphIOContext.tx_flag).
69-
/// NULL when tx_flags is already a device-accessible pointer.
70-
volatile uint64_t *tx_flags_dev;
71-
7261
/// Per-worker GraphIOContext array for separate RX/TX buffer support.
7362
/// When non-NULL, launch_graph_worker fills a GraphIOContext per dispatch
7463
/// and writes its device address into h_mailbox_bank[worker_id].

realtime/lib/daemon/dispatcher/host_dispatcher.cu

Lines changed: 60 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@ using atomic_int_sys = cuda::std::atomic<int>;
1717
static inline atomic_uint64_sys *as_atomic_u64(void *p) {
1818
return static_cast<atomic_uint64_sys *>(p);
1919
}
20+
static inline atomic_uint64_sys *as_atomic_u64(volatile uint64_t *p) {
21+
return reinterpret_cast<atomic_uint64_sys *>(const_cast<uint64_t *>(p));
22+
}
2023
static inline atomic_int_sys *as_atomic_int(void *p) {
2124
return static_cast<atomic_int_sys *>(p);
2225
}
@@ -36,13 +39,13 @@ lookup_function(cudaq_function_entry_t *table, size_t count,
3639
}
3740

3841
static int
39-
find_idle_graph_worker_for_function(const cudaq_host_dispatch_loop_ctx_t *config,
42+
find_idle_graph_worker_for_function(const cudaq_host_dispatch_loop_ctx_t *ctx,
4043
uint32_t function_id) {
41-
uint64_t mask = as_atomic_u64(config->idle_mask)->load(
44+
uint64_t mask = as_atomic_u64(ctx->idle_mask)->load(
4245
cuda::std::memory_order_acquire);
4346
while (mask != 0) {
4447
int worker_id = __builtin_ffsll(static_cast<long long>(mask)) - 1;
45-
if (config->workers[static_cast<size_t>(worker_id)].function_id ==
48+
if (ctx->workers[static_cast<size_t>(worker_id)].function_id ==
4649
function_id)
4750
return worker_id;
4851
mask &= ~(1ULL << worker_id);
@@ -58,104 +61,101 @@ struct ParsedSlot {
5861

5962
static ParsedSlot
6063
parse_slot_with_function_table(void *slot_host,
61-
const cudaq_host_dispatch_loop_ctx_t *config) {
64+
const cudaq_host_dispatch_loop_ctx_t *ctx) {
6265
ParsedSlot out;
6366
const RPCHeader *header = static_cast<const RPCHeader *>(slot_host);
6467
if (header->magic != RPC_MAGIC_REQUEST) {
6568
out.drop = true;
6669
return out;
6770
}
6871
out.function_id = header->function_id;
69-
out.entry = lookup_function(config->function_table,
70-
config->function_table_count, out.function_id);
72+
out.entry = lookup_function(ctx->function_table.entries,
73+
ctx->function_table.count, out.function_id);
7174
if (!out.entry)
7275
out.drop = true;
7376
return out;
7477
}
7578

76-
static void finish_slot_and_advance(const cudaq_host_dispatch_loop_ctx_t *config,
79+
static void finish_slot_and_advance(const cudaq_host_dispatch_loop_ctx_t *ctx,
7780
size_t &current_slot, size_t num_slots,
7881
uint64_t &packets_dispatched) {
79-
as_atomic_u64(config->rx_flags)[current_slot].store(
82+
as_atomic_u64(ctx->ringbuffer.rx_flags_host)[current_slot].store(
8083
0, cuda::std::memory_order_release);
8184
packets_dispatched++;
82-
if (config->live_dispatched)
83-
as_atomic_u64(config->live_dispatched)
85+
if (ctx->live_dispatched)
86+
as_atomic_u64(ctx->live_dispatched)
8487
->fetch_add(1, cuda::std::memory_order_relaxed);
8588
current_slot = (current_slot + 1) % num_slots;
8689
}
8790

88-
static int acquire_graph_worker(const cudaq_host_dispatch_loop_ctx_t *config,
91+
static int acquire_graph_worker(const cudaq_host_dispatch_loop_ctx_t *ctx,
8992
bool use_function_table,
9093
const cudaq_function_entry_t *entry,
9194
uint32_t function_id) {
9295
if (use_function_table && entry &&
9396
entry->dispatch_mode == CUDAQ_DISPATCH_GRAPH_LAUNCH)
94-
return find_idle_graph_worker_for_function(config, function_id);
97+
return find_idle_graph_worker_for_function(ctx, function_id);
9598
uint64_t mask =
96-
as_atomic_u64(config->idle_mask)->load(cuda::std::memory_order_acquire);
99+
as_atomic_u64(ctx->idle_mask)->load(cuda::std::memory_order_acquire);
97100
if (mask == 0)
98101
return -1;
99102
return __builtin_ffsll(static_cast<long long>(mask)) - 1;
100103
}
101104

102-
static void launch_graph_worker(const cudaq_host_dispatch_loop_ctx_t *config,
105+
static void launch_graph_worker(const cudaq_host_dispatch_loop_ctx_t *ctx,
103106
int worker_id, void *slot_host,
104107
size_t current_slot) {
105-
as_atomic_u64(config->idle_mask)
108+
as_atomic_u64(ctx->idle_mask)
106109
->fetch_and(~(1ULL << worker_id), cuda::std::memory_order_release);
107-
config->inflight_slot_tags[worker_id] = static_cast<int>(current_slot);
110+
ctx->inflight_slot_tags[worker_id] = static_cast<int>(current_slot);
108111

109112
ptrdiff_t offset =
110-
static_cast<uint8_t *>(slot_host) - config->rx_data_host;
111-
void *data_dev = static_cast<void *>(config->rx_data_dev + offset);
113+
static_cast<uint8_t *>(slot_host) - ctx->ringbuffer.rx_data_host;
114+
void *data_dev = static_cast<void *>(ctx->ringbuffer.rx_data + offset);
112115

113-
if (config->io_ctxs_host != nullptr) {
114-
// GraphIOContext mode: fill per-worker context with separate RX/TX info.
115-
auto *h_ctxs = static_cast<GraphIOContext *>(config->io_ctxs_host);
116-
auto *d_ctxs = static_cast<uint8_t *>(config->io_ctxs_dev);
116+
if (ctx->io_ctxs_host != nullptr) {
117+
auto *h_ctxs = static_cast<GraphIOContext *>(ctx->io_ctxs_host);
118+
auto *d_ctxs = static_cast<uint8_t *>(ctx->io_ctxs_dev);
117119
GraphIOContext *h_ctx = &h_ctxs[worker_id];
118120

119121
h_ctx->rx_slot = data_dev;
120-
h_ctx->tx_slot = config->tx_data_dev + current_slot * config->tx_stride_sz;
121-
h_ctx->tx_flag = &config->tx_flags_dev[current_slot];
122+
h_ctx->tx_slot = ctx->ringbuffer.tx_data +
123+
current_slot * ctx->ringbuffer.tx_stride_sz;
124+
h_ctx->tx_flag = &ctx->ringbuffer.tx_flags[current_slot];
122125
h_ctx->tx_flag_value =
123126
reinterpret_cast<uint64_t>(h_ctx->tx_slot);
124-
h_ctx->tx_stride_sz = config->tx_stride_sz;
127+
h_ctx->tx_stride_sz = ctx->ringbuffer.tx_stride_sz;
125128

126129
void *d_ctx = d_ctxs + worker_id * sizeof(GraphIOContext);
127-
config->h_mailbox_bank[worker_id] = d_ctx;
130+
ctx->h_mailbox_bank[worker_id] = d_ctx;
128131

129-
// In GraphIOContext mode the graph kernel writes tx_flag_value (READY)
130-
// to tx_flags from the GPU. Set the in-flight marker BEFORE launch so
131-
// the kernel's READY write is never clobbered by a late host write.
132-
as_atomic_u64(config->tx_flags)[current_slot].store(
132+
as_atomic_u64(ctx->ringbuffer.tx_flags_host)[current_slot].store(
133133
CUDAQ_TX_FLAG_IN_FLIGHT, cuda::std::memory_order_release);
134134
__sync_synchronize();
135135
} else {
136-
config->h_mailbox_bank[worker_id] = data_dev;
136+
ctx->h_mailbox_bank[worker_id] = data_dev;
137137
}
138138
__sync_synchronize();
139139

140140
const size_t w = static_cast<size_t>(worker_id);
141-
if (config->workers[w].pre_launch_fn)
142-
config->workers[w].pre_launch_fn(config->workers[w].pre_launch_data,
143-
data_dev, config->workers[w].stream);
144-
cudaError_t err = cudaGraphLaunch(config->workers[w].graph_exec,
145-
config->workers[w].stream);
141+
if (ctx->workers[w].pre_launch_fn)
142+
ctx->workers[w].pre_launch_fn(ctx->workers[w].pre_launch_data,
143+
data_dev, ctx->workers[w].stream);
144+
cudaError_t err = cudaGraphLaunch(ctx->workers[w].graph_exec,
145+
ctx->workers[w].stream);
146146

147147
if (err != cudaSuccess) {
148148
uint64_t error_val = CUDAQ_TX_FLAG_ERROR_TAG << 48 | (uint64_t)err;
149-
as_atomic_u64(config->tx_flags)[current_slot].store(
149+
as_atomic_u64(ctx->ringbuffer.tx_flags_host)[current_slot].store(
150150
error_val, cuda::std::memory_order_release);
151-
as_atomic_u64(config->idle_mask)
151+
as_atomic_u64(ctx->idle_mask)
152152
->fetch_or(1ULL << worker_id, cuda::std::memory_order_release);
153153
} else {
154-
if (config->workers[w].post_launch_fn)
155-
config->workers[w].post_launch_fn(config->workers[w].post_launch_data,
156-
data_dev, config->workers[w].stream);
157-
if (config->io_ctxs_host == nullptr) {
158-
as_atomic_u64(config->tx_flags)[current_slot].store(
154+
if (ctx->workers[w].post_launch_fn)
155+
ctx->workers[w].post_launch_fn(ctx->workers[w].post_launch_data,
156+
data_dev, ctx->workers[w].stream);
157+
if (ctx->io_ctxs_host == nullptr) {
158+
as_atomic_u64(ctx->ringbuffer.tx_flags_host)[current_slot].store(
159159
CUDAQ_TX_FLAG_IN_FLIGHT, cuda::std::memory_order_release);
160160
}
161161
}
@@ -164,17 +164,18 @@ static void launch_graph_worker(const cudaq_host_dispatch_loop_ctx_t *config,
164164
} // anonymous namespace
165165

166166
extern "C" void
167-
cudaq_host_dispatcher_loop(const cudaq_host_dispatch_loop_ctx_t *config) {
167+
cudaq_host_dispatcher_loop(const cudaq_host_dispatch_loop_ctx_t *ctx) {
168168
size_t current_slot = 0;
169-
const size_t num_slots = config->num_slots;
169+
const size_t num_slots = ctx->config.num_slots;
170170
uint64_t packets_dispatched = 0;
171171
const bool use_function_table =
172-
(config->function_table != nullptr && config->function_table_count > 0);
172+
(ctx->function_table.entries != nullptr && ctx->function_table.count > 0);
173173

174-
while (as_atomic_int(config->shutdown_flag)
174+
while (as_atomic_int(ctx->shutdown_flag)
175175
->load(cuda::std::memory_order_acquire) == 0) {
176-
uint64_t rx_value = as_atomic_u64(config->rx_flags)[current_slot].load(
177-
cuda::std::memory_order_acquire);
176+
uint64_t rx_value =
177+
as_atomic_u64(ctx->ringbuffer.rx_flags_host)[current_slot].load(
178+
cuda::std::memory_order_acquire);
178179

179180
if (rx_value == 0) {
180181
CUDAQ_REALTIME_CPU_RELAX();
@@ -187,9 +188,9 @@ cudaq_host_dispatcher_loop(const cudaq_host_dispatch_loop_ctx_t *config) {
187188

188189
// TODO: Remove non-function-table path; RPC framing is always required.
189190
if (use_function_table) {
190-
ParsedSlot parsed = parse_slot_with_function_table(slot_host, config);
191+
ParsedSlot parsed = parse_slot_with_function_table(slot_host, ctx);
191192
if (parsed.drop) {
192-
as_atomic_u64(config->rx_flags)[current_slot].store(
193+
as_atomic_u64(ctx->ringbuffer.rx_flags_host)[current_slot].store(
193194
0, cuda::std::memory_order_release);
194195
current_slot = (current_slot + 1) % num_slots;
195196
continue;
@@ -199,29 +200,29 @@ cudaq_host_dispatcher_loop(const cudaq_host_dispatch_loop_ctx_t *config) {
199200
}
200201

201202
if (entry && entry->dispatch_mode != CUDAQ_DISPATCH_GRAPH_LAUNCH) {
202-
as_atomic_u64(config->rx_flags)[current_slot].store(
203+
as_atomic_u64(ctx->ringbuffer.rx_flags_host)[current_slot].store(
203204
0, cuda::std::memory_order_release);
204205
current_slot = (current_slot + 1) % num_slots;
205206
continue;
206207
}
207208

208209
int worker_id =
209-
acquire_graph_worker(config, use_function_table, entry, function_id);
210+
acquire_graph_worker(ctx, use_function_table, entry, function_id);
210211
if (worker_id < 0) {
211212
CUDAQ_REALTIME_CPU_RELAX();
212213
continue;
213214
}
214215

215-
launch_graph_worker(config, worker_id, slot_host, current_slot);
216-
finish_slot_and_advance(config, current_slot, num_slots,
216+
launch_graph_worker(ctx, worker_id, slot_host, current_slot);
217+
finish_slot_and_advance(ctx, current_slot, num_slots,
217218
packets_dispatched);
218219
}
219220

220-
for (size_t i = 0; i < config->num_workers; ++i) {
221-
cudaStreamSynchronize(config->workers[i].stream);
221+
for (size_t i = 0; i < ctx->num_workers; ++i) {
222+
cudaStreamSynchronize(ctx->workers[i].stream);
222223
}
223224

224-
if (config->stats_counter) {
225-
*config->stats_counter = packets_dispatched;
225+
if (ctx->stats_counter) {
226+
*ctx->stats_counter = packets_dispatched;
226227
}
227228
}

realtime/lib/daemon/dispatcher/host_dispatcher_capi.cu

Lines changed: 16 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -147,37 +147,28 @@ extern "C" cudaq_host_dispatcher_handle_t *cudaq_host_dispatcher_start_thread(
147147
}
148148
handle->io_ctxs_pinned = io_ctxs_host_ptr;
149149

150-
cudaq_host_dispatch_loop_ctx_t host_config;
151-
std::memset(&host_config, 0, sizeof(host_config));
152-
host_config.rx_flags = (void *)(uintptr_t)ringbuffer->rx_flags_host;
153-
host_config.tx_flags = (void *)(uintptr_t)ringbuffer->tx_flags_host;
154-
host_config.rx_data_host = ringbuffer->rx_data_host;
155-
host_config.rx_data_dev = ringbuffer->rx_data;
156-
host_config.tx_data_host = ringbuffer->tx_data_host;
157-
host_config.tx_data_dev = ringbuffer->tx_data;
158-
host_config.tx_stride_sz = ringbuffer->tx_stride_sz;
159-
host_config.h_mailbox_bank = handle->h_mailbox_bank;
160-
host_config.num_slots = config->num_slots;
161-
host_config.slot_size = config->slot_size;
162-
host_config.workers = handle->workers;
163-
host_config.num_workers = num_workers;
164-
host_config.function_table = table->entries;
165-
host_config.function_table_count = table->count;
150+
cudaq_host_dispatch_loop_ctx_t ctx;
151+
std::memset(&ctx, 0, sizeof(ctx));
152+
ctx.ringbuffer = *ringbuffer;
153+
ctx.config = *config;
154+
ctx.function_table = *table;
155+
ctx.workers = handle->workers;
156+
ctx.num_workers = num_workers;
157+
ctx.h_mailbox_bank = handle->h_mailbox_bank;
166158
// The C API takes volatile int* for ABI stability; internally the dispatch
167159
// loop accesses it via cuda::std::atomic<int>* for acquire semantics.
168160
// This is safe: cuda::std::atomic<int> is lock-free and layout-compatible
169161
// with int on all CUDA-supported platforms.
170-
host_config.shutdown_flag = (void *)(uintptr_t)shutdown_flag;
171-
host_config.stats_counter = stats;
172-
host_config.live_dispatched = nullptr;
173-
host_config.idle_mask = handle->idle_mask;
174-
host_config.inflight_slot_tags = handle->inflight_slot_tags;
175-
host_config.tx_flags_dev = ringbuffer->tx_flags;
176-
host_config.io_ctxs_host = io_ctxs_host_ptr;
177-
host_config.io_ctxs_dev = io_ctxs_dev_ptr;
162+
ctx.shutdown_flag = (void *)(uintptr_t)shutdown_flag;
163+
ctx.stats_counter = stats;
164+
ctx.live_dispatched = nullptr;
165+
ctx.idle_mask = handle->idle_mask;
166+
ctx.inflight_slot_tags = handle->inflight_slot_tags;
167+
ctx.io_ctxs_host = io_ctxs_host_ptr;
168+
ctx.io_ctxs_dev = io_ctxs_dev_ptr;
178169

179170
handle->thread = std::thread(
180-
[cfg = host_config]() { cudaq_host_dispatcher_loop(&cfg); });
171+
[cfg = ctx]() { cudaq_host_dispatcher_loop(&cfg); });
181172
return handle;
182173
}
183174

realtime/unittests/test_host_dispatcher.cu

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -349,20 +349,21 @@ protected:
349349
idle_mask_->store((1ULL << workers_.size()) - 1,
350350
cuda::std::memory_order_release);
351351

352-
config_.rx_flags = (void *)(uintptr_t)rx_flags_host_;
353-
config_.tx_flags = (void *)(uintptr_t)tx_flags_host_;
354-
config_.rx_data_host = rx_data_host_;
355-
config_.rx_data_dev = rx_data_dev_;
356-
config_.tx_data_host = tx_data_host_;
357-
config_.tx_data_dev = tx_data_dev_;
358-
config_.tx_stride_sz = slot_size_;
359-
config_.h_mailbox_bank = h_mailbox_bank_;
360-
config_.num_slots = num_slots_;
361-
config_.slot_size = slot_size_;
352+
config_.ringbuffer.rx_flags_host = rx_flags_host_;
353+
config_.ringbuffer.tx_flags_host = tx_flags_host_;
354+
config_.ringbuffer.rx_data_host = rx_data_host_;
355+
config_.ringbuffer.rx_data = rx_data_dev_;
356+
config_.ringbuffer.tx_data_host = tx_data_host_;
357+
config_.ringbuffer.tx_data = tx_data_dev_;
358+
config_.ringbuffer.tx_stride_sz = slot_size_;
359+
config_.ringbuffer.tx_flags = tx_flags_dev_;
360+
config_.config.num_slots = static_cast<uint32_t>(num_slots_);
361+
config_.config.slot_size = static_cast<uint32_t>(slot_size_);
362+
config_.function_table.entries = function_table_;
363+
config_.function_table.count = static_cast<uint32_t>(function_table_count_);
362364
config_.workers = workers_.data();
363365
config_.num_workers = workers_.size();
364-
config_.function_table = function_table_;
365-
config_.function_table_count = function_table_count_;
366+
config_.h_mailbox_bank = h_mailbox_bank_;
366367
config_.shutdown_flag = shutdown_flag_;
367368
config_.stats_counter = &stats_counter_;
368369
config_.live_dispatched = live_dispatched_;

0 commit comments

Comments (0)