update proto, add config update publish broadcast, add messages compacting

This commit is contained in:
2026-04-10 22:24:40 +03:00
parent e053d445da
commit b5a4fdf755
10 changed files with 287 additions and 32 deletions

View File

@@ -11,7 +11,7 @@ LIBS_HEADERS=deps/ $(OPENSSL_INCLUDE)
SYSTEM_INCLUDES=-isystem deps/flatcc/include/ -isystem $(PROTO_INC_DIR)/ SYSTEM_INCLUDES=-isystem deps/flatcc/include/ -isystem $(PROTO_INC_DIR)/
STATIC_LIBS=crypto STATIC_LIBS=crypto
STANDART=c23 STANDART=c23
OPTIMIZE=-Og OPTIMIZE=-O3
TARGET=main TARGET=main
FLATCC = deps/flatcc/bin/flatcc FLATCC = deps/flatcc/bin/flatcc

12
flatbuffers/compacted.fbs Normal file
View File

@@ -0,0 +1,12 @@
namespace hmmmm;
// A single serialized ServerMessage frame packed into a byte vector.
table CompactedFrame {
data: [ubyte];
}
// A batch of ServerMessage frames sent as a single WebSocket frame to reduce
// per-frame syscall overhead when message rate exceeds the batching threshold.
table CompactedMessage {
frames: [CompactedFrame];
}

View File

@@ -2,6 +2,7 @@ include "auth/auth.fbs";
include "control/control.fbs"; include "control/control.fbs";
include "stream/stream.fbs"; include "stream/stream.fbs";
include "mem/mem.fbs"; include "mem/mem.fbs";
include "compacted.fbs";
namespace hmmmm; namespace hmmmm;
@@ -29,6 +30,7 @@ union ServerPayload {
hmmmm.ctrl.CtrlServerMessage, hmmmm.ctrl.CtrlServerMessage,
hmmmm.stream.StreamServerMessage, hmmmm.stream.StreamServerMessage,
hmmmm.mem.MemServerMessage, hmmmm.mem.MemServerMessage,
CompactedMessage,
} }
// Every frame sent by the server is a ServerMessage. // Every frame sent by the server is a ServerMessage.

View File

@@ -38,6 +38,9 @@ typedef struct {
size_t interceptCtxCount; size_t interceptCtxCount;
flatcc_builder_t stream_builder; flatcc_builder_t stream_builder;
uint64_t simRateLimit; uint64_t simRateLimit;
// Cached DeviceIdMappingNotif broadcast message (sent to newly authed clients)
uint8_t* deviceIdMappingMsg;
size_t deviceIdMappingMsgLen;
} EmulContext; } EmulContext;

View File

@@ -16,6 +16,7 @@ typedef struct {
ws_cli_conn_t clientIdx; ws_cli_conn_t clientIdx;
uint8_t* msg; uint8_t* msg;
size_t msgLen; size_t msgLen;
uint64_t dispatch_us; // microsecond timestamp when queued (for batching window)
} OutgoingMessage; } OutgoingMessage;
// Build a ServerMessage{AuthResponse} frame. Returns a malloc'd buffer; // Build a ServerMessage{AuthResponse} frame. Returns a malloc'd buffer;
@@ -59,12 +60,21 @@ uint8_t* fb_build_stream_reg_confirm(
// Build a ServerMessage{CtrlServerMessage{ConfigNotifMessage{DeviceIdMappingNotif}}} frame. // Build a ServerMessage{CtrlServerMessage{ConfigNotifMessage{DeviceIdMappingNotif}}} frame.
// paths: array of NULL-terminated string arrays; path_lens: component count per path. // paths: array of NULL-terminated string arrays; path_lens: component count per path.
// seg_names[i]: array of seg_counts[i] segment name strings for device i.
uint8_t* fb_build_config_device_id_mapping( uint8_t* fb_build_config_device_id_mapping(
uint64_t nonce, uint64_t nonce,
char** paths[], size_t path_lens[], char** paths[], size_t path_lens[],
char* seg_names[][64], size_t seg_counts[],
size_t device_count, size_t* len_out); size_t device_count, size_t* len_out);
// Build a ServerMessage{CtrlServerMessage{ConfigNotifMessage{ConfigLoadError}}} frame. // Build a ServerMessage{CtrlServerMessage{ConfigNotifMessage{ConfigLoadError}}} frame.
uint8_t* fb_build_config_error(uint64_t nonce, const char* message, size_t* len_out); uint8_t* fb_build_config_error(uint64_t nonce, const char* message, size_t* len_out);
// Build a ServerMessage{CompactedMessage{frames=[...]}} frame.
// msgs / msg_lens: array of `count` pre-built ServerMessage byte buffers to pack.
// The caller retains ownership of the source buffers; this function copies their bytes.
// Nonce is set to UINT64_MAX (no-nonce / fire-and-forget).
uint8_t* fb_build_compacted_message(
uint8_t* const* msgs, const size_t* msg_lens, size_t count, size_t* len_out);
#endif // __PROTO_MSG_H__ #endif // __PROTO_MSG_H__

View File

@@ -164,6 +164,74 @@ void my_sleep(int microseconds) {
// Compact threshold: if a client has more than this many messages in a single
// outgoing buffer that all arrived within COMPACT_WINDOW_US, pack them into
// a single CompactedMessage frame to reduce ws_sendframe_bin syscall overhead.
#define COMPACT_THRESHOLD 4
#define COMPACT_WINDOW_US 100000ULL // 100 ms
static void compact_outgoing_buffer(SizedPtr* buf)
{
OutgoingMessage* msgs = (OutgoingMessage*)buf->ptr;
size_t n = buf->size;
if (n <= COMPACT_THRESHOLD) return;
// Collect unique client IDs present in the buffer (stack-allocated; n is small).
ws_cli_conn_t clients[n];
size_t client_count = 0;
for (size_t i = 0; i < n; i++) {
if (msgs[i].msg == NULL) continue;
ws_cli_conn_t cid = msgs[i].clientIdx;
uint8_t found = 0;
for (size_t j = 0; j < client_count; j++) {
if (clients[j] == cid) { found = 1; break; }
}
if (!found) clients[client_count++] = cid;
}
for (size_t ci = 0; ci < client_count; ci++) {
ws_cli_conn_t cid = clients[ci];
// Gather indices, timestamps for this client.
size_t idxs[n];
size_t cnt = 0;
uint64_t first_us = UINT64_MAX, last_us = 0;
for (size_t i = 0; i < n; i++) {
if (msgs[i].msg == NULL || msgs[i].clientIdx != cid) continue;
idxs[cnt++] = i;
if (msgs[i].dispatch_us < first_us) first_us = msgs[i].dispatch_us;
if (msgs[i].dispatch_us > last_us) last_us = msgs[i].dispatch_us;
}
// Only compact if: more than threshold AND all arrived within 100ms window.
if (cnt <= COMPACT_THRESHOLD) continue;
if (last_us - first_us > COMPACT_WINDOW_US) continue;
// Collect pointers for the builder (it copies the bytes internally).
uint8_t* ptrs[cnt];
size_t lens[cnt];
for (size_t i = 0; i < cnt; i++) {
ptrs[i] = msgs[idxs[i]].msg;
lens[i] = msgs[idxs[i]].msgLen;
}
size_t cm_len = 0;
uint8_t* cm = fb_build_compacted_message(
(uint8_t* const*)ptrs, lens, cnt, &cm_len);
if (cm == NULL) continue; // builder failed; send individually
// Replace the first slot with the compacted frame; free and null the rest.
free(msgs[idxs[0]].msg);
msgs[idxs[0]].msg = cm;
msgs[idxs[0]].msgLen = cm_len;
for (size_t i = 1; i < cnt; i++) {
free(msgs[idxs[i]].msg);
msgs[idxs[i]].msg = NULL; // sentinel: skip in send loop
}
}
}
void* outgoingMain(void* args) void* outgoingMain(void* args)
{ {
OutgoingBuffers* outBufs = args; OutgoingBuffers* outBufs = args;
@@ -175,19 +243,20 @@ void* outgoingMain(void* args)
{ {
if(curBuf != NULL) if(curBuf != NULL)
{ {
compact_outgoing_buffer(curBuf);
OutgoingMessage* messages = curBuf->ptr; OutgoingMessage* messages = curBuf->ptr;
for(size_t i = 0; i < curBuf->size; i++) for(size_t i = 0; i < curBuf->size; i++)
{ {
// my_sleep(100000);
OutgoingMessage *outMsg = &messages[i]; OutgoingMessage *outMsg = &messages[i];
if(outMsg->msg == NULL) if(outMsg->msg == NULL)
{ {
panic("Got double read on buf %d while writing on buf %d\n", currBufIdx, currWritingIdxPtr); // Slot was merged into a CompactedMessage; skip.
continue;
} }
ws_sendframe_bin(outMsg->clientIdx, (const char*)outMsg->msg, outMsg->msgLen); ws_sendframe_bin(outMsg->clientIdx, (const char*)outMsg->msg, outMsg->msgLen);
free(outMsg->msg); free(outMsg->msg);
outMsg->msg = NULL; outMsg->msg = NULL;
// printf("done free on buf %d message %lu\n", currBufIdx, i);
} }
curBuf->size = 0; curBuf->size = 0;
atomic_store(&outBufs->readRequestIdx, currBufIdx); atomic_store(&outBufs->readRequestIdx, currBufIdx);
@@ -205,7 +274,7 @@ void* outgoingMain(void* args)
atomic_store(&outBufs->readRequestIdx, currBufIdx); atomic_store(&outBufs->readRequestIdx, currBufIdx);
if(curBuf == NULL) if(curBuf == NULL)
{ {
my_sleep(100); // my_sleep(1);
} }
else else
{ {
@@ -315,9 +384,41 @@ uint64_t getCurrentUsec()
return (1000000 * (uint64_t)tv.tv_sec) + (uint64_t)tv.tv_usec; return (1000000 * (uint64_t)tv.tv_sec) + (uint64_t)tv.tv_usec;
} }
static void print_usage(const char* progname)
{
fprintf(stderr, "Usage: %s [--port PORT] [--token TOKEN] [--host HOST]\n", progname);
fprintf(stderr, " --port PORT WebSocket listen port (default: 8181)\n");
fprintf(stderr, " --token TOKEN Access token for client auth\n");
fprintf(stderr, " --host HOST Listen host (default: localhost)\n");
}
int main(int argc, char** argv) int main(int argc, char** argv)
{ {
(void)argc; (void)argv; uint16_t cli_port = 8181;
const char* cli_host = "localhost";
const char* cli_token = NULL;
for (int i = 1; i < argc; i++) {
if (strcmp(argv[i], "--port") == 0 && i + 1 < argc) {
long p = strtol(argv[++i], NULL, 10);
if (p <= 0 || p > 65535) {
fprintf(stderr, "Invalid port: %s\n", argv[i]);
return 1;
}
cli_port = (uint16_t)p;
} else if (strcmp(argv[i], "--token") == 0 && i + 1 < argc) {
cli_token = argv[++i];
} else if (strcmp(argv[i], "--host") == 0 && i + 1 < argc) {
cli_host = argv[++i];
} else if (strcmp(argv[i], "--help") == 0 || strcmp(argv[i], "-h") == 0) {
print_usage(argv[0]);
return 0;
} else {
fprintf(stderr, "Unknown argument: %s\n", argv[i]);
print_usage(argv[0]);
return 1;
}
}
char errbuf[1024]; char errbuf[1024];
pthread_mutex_t mtx; pthread_mutex_t mtx;
pthread_mutex_init(&mtx, NULL); pthread_mutex_init(&mtx, NULL);
@@ -381,19 +482,34 @@ int main(int argc, char** argv)
NULL, /* interceptCtxs */ NULL, /* interceptCtxs */
0, /* interceptCtxCount */ 0, /* interceptCtxCount */
{0}, /* stream_builder — initialized below */ {0}, /* stream_builder — initialized below */
0, 0, /* simRateLimit */
NULL, /* deviceIdMappingMsg */
0, /* deviceIdMappingMsgLen */
}; };
if (flatcc_builder_init(&emulContext.stream_builder)) { if (flatcc_builder_init(&emulContext.stream_builder)) {
panic("flatcc_builder_init failed\n"); panic("flatcc_builder_init failed\n");
} }
uint8_t access_token[] = "jtd3dhBWG2p73mkRCyaZ63KLmOjl5OgItIDAPa8HcyGQI2SOFA6hsJVr1NiG2Y96AvwyYSShr4GAL3sjJEo1wsmCo2vuhiqRhj8HVRrOVocD2e1jtuMwMsCmizYakT63"; static uint8_t default_token[] = "jtd3dhBWG2p73mkRCyaZ63KLmOjl5OgItIDAPa8HcyGQI2SOFA6hsJVr1NiG2Y96AvwyYSShr4GAL3sjJEo1wsmCo2vuhiqRhj8HVRrOVocD2e1jtuMwMsCmizYakT63";
uint8_t* access_token = default_token;
/* If --token was given, copy it into a mutable buffer */
uint8_t* cli_token_buf = NULL;
if (cli_token != NULL) {
size_t tlen = strlen(cli_token) + 1;
cli_token_buf = malloc(tlen);
if (cli_token_buf == NULL) {
fprintf(stderr, "Failed to allocate token buffer\n");
return 1;
}
memcpy(cli_token_buf, cli_token, tlen);
access_token = cli_token_buf;
}
ServerContext ctx = { ServerContext ctx = {
mtx, mtx,
regQ, regQ,
(uint8_t*)&access_token, access_token,
&emulContext, &emulContext,
}; };
@@ -406,8 +522,8 @@ int main(int argc, char** argv)
ws_socket(&(struct ws_server){ ws_socket(&(struct ws_server){
.host = "localhost", .host = cli_host,
.port = 8181, .port = cli_port,
.thread_loop = 1, .thread_loop = 1,
.timeout_ms = 1000, .timeout_ms = 1000,
.context = &ctx, .context = &ctx,
@@ -418,7 +534,7 @@ int main(int argc, char** argv)
ptQueueElem* regQueueTail = regQ->tail; ptQueueElem* regQueueTail = regQ->tail;
uint8_t clients_try_timer = 100; uint8_t clients_try_timer = 1;
uint64_t lastTickAt = getCurrentUsec(); uint64_t lastTickAt = getCurrentUsec();
@@ -437,7 +553,7 @@ int main(int argc, char** argv)
if (clients_try_timer == 0) if (clients_try_timer == 0)
{ {
handleAllClients(&emulContext); handleAllClients(&emulContext);
clients_try_timer = 100; clients_try_timer = 1;
} }
else else
{ {
@@ -465,8 +581,11 @@ int main(int argc, char** argv)
} }
uint64_t now = emulContext.simRateLimit > 0? getCurrentUsec() : 0; uint64_t now = emulContext.simRateLimit > 0? getCurrentUsec() : 0;
if(emulState == EMUL_STATE_STOP)
if(emulState == EMUL_STATE_EXEC && emulContext.devicesCount > 0 && (emulContext.simRateLimit == 0 || now - lastTickAt >= 1000000 / emulContext.simRateLimit)) {
clockCounter = 0;
}
else if(emulState == EMUL_STATE_EXEC && emulContext.devicesCount > 0 && (emulContext.simRateLimit == 0 || now - lastTickAt >= 1000000 / emulContext.simRateLimit))
{ {
for (size_t di = 0; di < emulContext.devicesCount; di++) for (size_t di = 0; di < emulContext.devicesCount; di++)
{ {

View File

@@ -1,11 +1,19 @@
#include "proto/dial.h" #include "proto/dial.h"
#include <string.h> #include <string.h>
#include <sys/time.h>
#include "context.h" #include "context.h"
#include "panic.h" #include "panic.h"
#include "proto/msg.h" #include "proto/msg.h"
static uint64_t dial_now_us(void)
{
struct timeval tv;
gettimeofday(&tv, NULL);
return (uint64_t)tv.tv_sec * 1000000ULL + (uint64_t)tv.tv_usec;
}
void broadcastClients(EmulContext* emulContext, uint8_t* msg, size_t msgLen) void broadcastClients(EmulContext* emulContext, uint8_t* msg, size_t msgLen)
{ {
@@ -42,5 +50,6 @@ void dispatchOutgoingMessage(OutgoingBuffers* outBufs, ws_cli_conn_t clientIdx,
outmsg->msg = msg; outmsg->msg = msg;
outmsg->msgLen = msgLen; outmsg->msgLen = msgLen;
outmsg->clientIdx = clientIdx; outmsg->clientIdx = clientIdx;
outmsg->dispatch_us = dial_now_us();
p->size++; p->size++;
} }

View File

@@ -1,6 +1,7 @@
#include "proto/handlers/auth.h" #include "proto/handlers/auth.h"
#include <stdio.h> #include <stdio.h>
#include <stdlib.h>
#include <string.h> #include <string.h>
#include <time.h> #include <time.h>
#include <openssl/sha.h> #include <openssl/sha.h>
@@ -141,4 +142,15 @@ void handleOnClientAuthDone(ClientContext* cctx, EmulContext* emulContext)
uint8_t* stateMsg = fb_build_exec_notify( uint8_t* stateMsg = fb_build_exec_notify(
UINT64_MAX, *emulContext->clockCounter, *emulContext->emulState, &len); UINT64_MAX, *emulContext->clockCounter, *emulContext->emulState, &len);
dispatchOutgoingMessage(emulContext->outBufs, cctx->clientId, stateMsg, len); dispatchOutgoingMessage(emulContext->outBufs, cctx->clientId, stateMsg, len);
// Send cached DeviceIdMappingNotif if a config has been loaded
if (emulContext->deviceIdMappingMsg && emulContext->deviceIdMappingMsgLen > 0) {
uint8_t* mappingCopy = malloc(emulContext->deviceIdMappingMsgLen);
if (mappingCopy) {
memcpy(mappingCopy, emulContext->deviceIdMappingMsg, emulContext->deviceIdMappingMsgLen);
dispatchOutgoingMessage(
emulContext->outBufs, cctx->clientId,
mappingCopy, emulContext->deviceIdMappingMsgLen);
}
}
} }

View File

@@ -17,6 +17,7 @@
#define MAX_DEVICES 64 #define MAX_DEVICES 64
#define MAX_DEPTH 16 #define MAX_DEPTH 16
#define MAX_SEGMENTS 64
// Converts a FlatBuffer BaseDeviceConfig into a heap-allocated conf_dev_t // Converts a FlatBuffer BaseDeviceConfig into a heap-allocated conf_dev_t
@@ -75,10 +76,10 @@ typedef struct {
const char* path_stack[MAX_DEPTH]; // current path components const char* path_stack[MAX_DEPTH]; // current path components
size_t depth; size_t depth;
// For building the DeviceIdMappingNotif response: // For building the DeviceIdMappingNotif response:
// store copies of hierarchical path per device char** paths[MAX_DEVICES]; // hierarchical path per device (heap-alloc'd)
char** paths[MAX_DEVICES]; // each is a NULL-terminated string array size_t path_lens[MAX_DEVICES]; // number of path components per device
size_t path_lens[MAX_DEVICES]; // number of components per path
size_t seg_counts[MAX_DEVICES]; // number of memory segments per device size_t seg_counts[MAX_DEVICES]; // number of memory segments per device
char* seg_names[MAX_DEVICES][MAX_SEGMENTS]; // segment names (heap-alloc'd)
size_t count; size_t count;
} LoadState; } LoadState;
@@ -155,11 +156,18 @@ static int load_devices_recursive(
size_t idx = st->count; size_t idx = st->count;
st->handles[idx] = dev; st->handles[idx] = dev;
// Store segment count from FlatBuffer config // Store segment count and names from FlatBuffer config
hmmmm_config_MemSegment_vec_t dev_segs = hmmmm_config_MemSegment_vec_t dev_segs =
hmmmm_config_BaseDeviceConfig_mem_segments(base); hmmmm_config_BaseDeviceConfig_mem_segments(base);
st->seg_counts[idx] = dev_segs size_t nseg = dev_segs ? hmmmm_config_MemSegment_vec_len(dev_segs) : 0;
? hmmmm_config_MemSegment_vec_len(dev_segs) : 0; if (nseg > MAX_SEGMENTS) nseg = MAX_SEGMENTS;
st->seg_counts[idx] = nseg;
for (size_t si = 0; si < nseg; si++) {
hmmmm_config_MemSegment_table_t seg =
hmmmm_config_MemSegment_vec_at(dev_segs, si);
flatbuffers_string_t sname = hmmmm_config_MemSegment_name(seg);
st->seg_names[idx][si] = strdup(sname ? sname : "");
}
// Copy path // Copy path
st->path_lens[idx] = st->depth; st->path_lens[idx] = st->depth;
@@ -203,6 +211,9 @@ static void free_load_state_paths(LoadState* st)
} }
free(st->paths[i]); free(st->paths[i]);
} }
for (size_t si = 0; si < st->seg_counts[i]; si++) {
free(st->seg_names[i][si]);
}
} }
} }
@@ -489,6 +500,12 @@ static void free_old_config(EmulContext* emulContext)
emulContext->deviceStreamRegs = NULL; emulContext->deviceStreamRegs = NULL;
emulContext->devicesCount = 0; emulContext->devicesCount = 0;
emulContext->simRateLimit = 0; emulContext->simRateLimit = 0;
if (emulContext->deviceIdMappingMsg) {
free(emulContext->deviceIdMappingMsg);
emulContext->deviceIdMappingMsg = NULL;
emulContext->deviceIdMappingMsgLen = 0;
}
} }
@@ -502,9 +519,9 @@ void handleConfigCtrlMessage(
char errbuf[1024]; char errbuf[1024];
// Must be in STILL state // Block config load only while actively executing (ticking)
if (*emulContext->emulState != EMUL_STATE_STILL) { if (*emulContext->emulState == EMUL_STATE_EXEC) {
snprintf(errbuf, 1024, "config load requires STILL state"); snprintf(errbuf, 1024, "stop or pause the emulator before loading config");
printf("[CTRL/CONFIG] error: %s\n", errbuf); printf("[CTRL/CONFIG] error: %s\n", errbuf);
size_t msg_len; size_t msg_len;
@@ -674,12 +691,31 @@ void handleConfigCtrlMessage(
emulContext->deviceStreamRegs[i] = dsr; emulContext->deviceStreamRegs[i] = dsr;
} }
// Build DeviceIdMappingNotif response // Reset emulation state to STILL and notify all clients
if (*emulContext->emulState != EMUL_STATE_STILL) {
*emulContext->emulState = EMUL_STATE_STILL;
size_t notify_len;
uint8_t* notify = fb_build_exec_notify(0, *emulContext->clockCounter, EMUL_STATE_STILL, &notify_len);
broadcastClients(emulContext, notify, notify_len);
}
// Build DeviceIdMappingNotif and broadcast to all connected clients
size_t msg_len; size_t msg_len;
uint8_t* out = fb_build_config_device_id_mapping( uint8_t* out = fb_build_config_device_id_mapping(
nonce, st.paths, st.path_lens, dc, &msg_len); nonce, st.paths, st.path_lens, st.seg_names, st.seg_counts, dc, &msg_len);
free_load_state_paths(&st); free_load_state_paths(&st);
dispatchOutgoingMessage(emulContext->outBufs, ctx->clientId, out, msg_len); // Cache a copy for newly connecting clients
if (emulContext->deviceIdMappingMsg) {
free(emulContext->deviceIdMappingMsg);
}
emulContext->deviceIdMappingMsg = malloc(msg_len);
if (emulContext->deviceIdMappingMsg) {
memcpy(emulContext->deviceIdMappingMsg, out, msg_len);
emulContext->deviceIdMappingMsgLen = msg_len;
}
// Broadcast to all clients (broadcastClients takes ownership of out)
broadcastClients(emulContext, out, msg_len);
} }

View File

@@ -8,6 +8,7 @@
#include "auth_builder.h" #include "auth_builder.h"
#include "stream_builder.h" #include "stream_builder.h"
#include "stream_data_builder.h" #include "stream_data_builder.h"
#include "compacted_builder.h"
uint8_t* fb_build_auth_response(uint64_t nonce, uint64_t seat_id, size_t* len_out) uint8_t* fb_build_auth_response(uint64_t nonce, uint64_t seat_id, size_t* len_out)
@@ -253,6 +254,7 @@ uint8_t* fb_build_stream_reg_confirm(
uint8_t* fb_build_config_device_id_mapping( uint8_t* fb_build_config_device_id_mapping(
uint64_t nonce, uint64_t nonce,
char** paths[], size_t path_lens[], char** paths[], size_t path_lens[],
char* seg_names[][64], size_t seg_counts[],
size_t device_count, size_t* len_out) size_t device_count, size_t* len_out)
{ {
flatcc_builder_t B; flatcc_builder_t B;
@@ -272,8 +274,20 @@ uint8_t* fb_build_config_device_id_mapping(
} }
flatbuffers_string_vec_ref_t path_vec = flatbuffers_string_vec_end(&B); flatbuffers_string_vec_ref_t path_vec = flatbuffers_string_vec_end(&B);
// Empty seg_ids vector // Build SegIdEntry refs for this device's segments
size_t nseg = seg_counts[i];
hmmmm_ctrl_config_notif_SegIdEntry_ref_t seg_refs[nseg];
for (size_t si = 0; si < nseg; si++) {
flatbuffers_string_ref_t name_ref =
flatbuffers_string_create_str(&B, seg_names[i][si] ? seg_names[i][si] : "");
seg_refs[si] = hmmmm_ctrl_config_notif_SegIdEntry_create(
&B, name_ref, (uint32_t)si);
}
hmmmm_ctrl_config_notif_SegIdEntry_vec_start(&B); hmmmm_ctrl_config_notif_SegIdEntry_vec_start(&B);
for (size_t si = 0; si < nseg; si++) {
hmmmm_ctrl_config_notif_SegIdEntry_vec_push(&B, seg_refs[si]);
}
hmmmm_ctrl_config_notif_SegIdEntry_vec_ref_t seg_ids_vec = hmmmm_ctrl_config_notif_SegIdEntry_vec_ref_t seg_ids_vec =
hmmmm_ctrl_config_notif_SegIdEntry_vec_end(&B); hmmmm_ctrl_config_notif_SegIdEntry_vec_end(&B);
@@ -350,3 +364,41 @@ uint8_t* fb_build_config_error(uint64_t nonce, const char* message, size_t* len_
NULL_GUARD(buf); NULL_GUARD(buf);
return buf; return buf;
} }
uint8_t* fb_build_compacted_message(
uint8_t* const* msgs, const size_t* msg_lens, size_t count, size_t* len_out)
{
flatcc_builder_t B;
if (flatcc_builder_init(&B)) {
panic("flatcc_builder_init failed\n");
}
// Build CompactedFrame refs bottom-up (FlatBuffers requires nested objects
// to be created before the table that references them).
hmmmm_CompactedFrame_ref_t frame_refs[count];
for (size_t i = 0; i < count; i++) {
flatbuffers_uint8_vec_ref_t data_vec =
flatbuffers_uint8_vec_create(&B, msgs[i], msg_lens[i]);
frame_refs[i] = hmmmm_CompactedFrame_create(&B, data_vec);
}
hmmmm_CompactedFrame_vec_start(&B);
for (size_t i = 0; i < count; i++) {
hmmmm_CompactedFrame_vec_push(&B, frame_refs[i]);
}
hmmmm_CompactedFrame_vec_ref_t frames_vec = hmmmm_CompactedFrame_vec_end(&B);
hmmmm_CompactedMessage_ref_t cm = hmmmm_CompactedMessage_create(&B, frames_vec);
hmmmm_ServerPayload_union_ref_t payload =
hmmmm_ServerPayload_as_CompactedMessage(cm);
// UINT64_MAX = no-nonce (fire-and-forget batch frame)
hmmmm_ServerMessage_create_as_root(&B, UINT64_MAX, payload);
uint8_t* buf = flatcc_builder_finalize_buffer(&B, len_out);
flatcc_builder_clear(&B);
NULL_GUARD(buf);
return buf;
}