diff --git a/Makefile b/Makefile index 62fc4aa..9b96003 100644 --- a/Makefile +++ b/Makefile @@ -11,7 +11,7 @@ LIBS_HEADERS=deps/ $(OPENSSL_INCLUDE) SYSTEM_INCLUDES=-isystem deps/flatcc/include/ -isystem $(PROTO_INC_DIR)/ STATIC_LIBS=crypto STANDART=c23 -OPTIMIZE=-Og +OPTIMIZE=-O3 TARGET=main FLATCC = deps/flatcc/bin/flatcc diff --git a/flatbuffers/compacted.fbs b/flatbuffers/compacted.fbs new file mode 100644 index 0000000..446fbb1 --- /dev/null +++ b/flatbuffers/compacted.fbs @@ -0,0 +1,12 @@ +namespace hmmmm; + +// A single serialized ServerMessage frame packed into a byte vector. +table CompactedFrame { + data: [ubyte]; +} + +// A batch of ServerMessage frames sent as a single WebSocket frame to reduce +// per-frame syscall overhead when message rate exceeds the batching threshold. +table CompactedMessage { + frames: [CompactedFrame]; +} diff --git a/flatbuffers/proto.fbs b/flatbuffers/proto.fbs index a248c77..eb659c1 100644 --- a/flatbuffers/proto.fbs +++ b/flatbuffers/proto.fbs @@ -2,6 +2,7 @@ include "auth/auth.fbs"; include "control/control.fbs"; include "stream/stream.fbs"; include "mem/mem.fbs"; +include "compacted.fbs"; namespace hmmmm; @@ -29,6 +30,7 @@ union ServerPayload { hmmmm.ctrl.CtrlServerMessage, hmmmm.stream.StreamServerMessage, hmmmm.mem.MemServerMessage, + CompactedMessage, } // Every frame sent by the server is a ServerMessage. 
diff --git a/inc/context.h b/inc/context.h index e47e21e..9d9d67d 100644 --- a/inc/context.h +++ b/inc/context.h @@ -38,6 +38,9 @@ typedef struct { size_t interceptCtxCount; flatcc_builder_t stream_builder; uint64_t simRateLimit; + // Cached DeviceIdMappingNotif broadcast message (sent to newly authed clients) + uint8_t* deviceIdMappingMsg; + size_t deviceIdMappingMsgLen; } EmulContext; diff --git a/inc/proto/msg.h b/inc/proto/msg.h index 0c37827..002f9e7 100644 --- a/inc/proto/msg.h +++ b/inc/proto/msg.h @@ -16,6 +16,7 @@ typedef struct { ws_cli_conn_t clientIdx; uint8_t* msg; size_t msgLen; + uint64_t dispatch_us; // microsecond timestamp when queued (for batching window) } OutgoingMessage; // Build a ServerMessage{AuthResponse} frame. Returns a malloc'd buffer; @@ -59,12 +60,21 @@ uint8_t* fb_build_stream_reg_confirm( // Build a ServerMessage{CtrlServerMessage{ConfigNotifMessage{DeviceIdMappingNotif}}} frame. // paths: array of NULL-terminated string arrays; path_lens: component count per path. +// seg_names[i]: array of seg_counts[i] segment name strings for device i. uint8_t* fb_build_config_device_id_mapping( uint64_t nonce, char** paths[], size_t path_lens[], + char* seg_names[][64], size_t seg_counts[], size_t device_count, size_t* len_out); // Build a ServerMessage{CtrlServerMessage{ConfigNotifMessage{ConfigLoadError}}} frame. uint8_t* fb_build_config_error(uint64_t nonce, const char* message, size_t* len_out); +// Build a ServerMessage{CompactedMessage{frames=[...]}} frame. +// msgs / msg_lens: array of `count` pre-built ServerMessage byte buffers to pack. +// The caller retains ownership of the source buffers; this function copies their bytes. +// Nonce is set to UINT64_MAX (no-nonce / fire-and-forget). 
+uint8_t* fb_build_compacted_message( + uint8_t* const* msgs, const size_t* msg_lens, size_t count, size_t* len_out); + #endif // __PROTO_MSG_H__ diff --git a/src/main.c b/src/main.c index af243e4..f76f5e1 100644 --- a/src/main.c +++ b/src/main.c @@ -164,6 +164,74 @@ void my_sleep(int microseconds) { +// Compact threshold: if a client has more than this many messages in a single +// outgoing buffer that all arrived within COMPACT_WINDOW_US, pack them into +// a single CompactedMessage frame to reduce ws_sendframe_bin syscall overhead. +#define COMPACT_THRESHOLD 4 +#define COMPACT_WINDOW_US 100000ULL // 100 ms + +static void compact_outgoing_buffer(SizedPtr* buf) +{ + OutgoingMessage* msgs = (OutgoingMessage*)buf->ptr; + size_t n = buf->size; + if (n <= COMPACT_THRESHOLD) return; + + // Scratch arrays live on the heap: buf->size is not bounded here, so + // n-sized VLAs could overflow this thread's stack. On allocation failure + // nothing is compacted and every message is still sent individually. + size_t* idxs = malloc(n * sizeof *idxs); + size_t* lens = malloc(n * sizeof *lens); + uint8_t** ptrs = malloc(n * sizeof *ptrs); + uint8_t* grouped = calloc(n, 1); // 1 = slot already assigned to a client group + if (!idxs || !lens || !ptrs || !grouped) { + free(idxs); free(lens); free(ptrs); free(grouped); + return; + } + + for (size_t s = 0; s < n; s++) { + if (msgs[s].msg == NULL || grouped[s]) continue; + ws_cli_conn_t cid = msgs[s].clientIdx; + + // Gather indices, timestamps for this client (its first live slot is s). + size_t cnt = 0; + uint64_t first_us = UINT64_MAX, last_us = 0; + for (size_t i = s; i < n; i++) { + if (msgs[i].msg == NULL || msgs[i].clientIdx != cid) continue; + grouped[i] = 1; + idxs[cnt++] = i; + if (msgs[i].dispatch_us < first_us) first_us = msgs[i].dispatch_us; + if (msgs[i].dispatch_us > last_us) last_us = msgs[i].dispatch_us; + } + + // Only compact if: more than threshold AND all arrived within 100ms window. + if (cnt <= COMPACT_THRESHOLD) continue; + if (last_us - first_us > COMPACT_WINDOW_US) continue; + + // Collect pointers for the builder (it copies the bytes internally). 
+ for (size_t i = 0; i < cnt; i++) { + ptrs[i] = msgs[idxs[i]].msg; + lens[i] = msgs[idxs[i]].msgLen; + } + + size_t cm_len = 0; + uint8_t* cm = fb_build_compacted_message( + (uint8_t* const*)ptrs, lens, cnt, &cm_len); + if (cm == NULL) continue; // builder failed; send individually + + // Replace the first slot with the compacted frame; free and null the rest. + free(msgs[idxs[0]].msg); + msgs[idxs[0]].msg = cm; + msgs[idxs[0]].msgLen = cm_len; + for (size_t i = 1; i < cnt; i++) { + free(msgs[idxs[i]].msg); + msgs[idxs[i]].msg = NULL; // sentinel: skip in send loop + } + } + + free(idxs); free(lens); free(ptrs); free(grouped); +} + + void* outgoingMain(void* args) { OutgoingBuffers* outBufs = args; @@ -175,19 +243,20 @@ void* outgoingMain(void* args) { if(curBuf != NULL) { + compact_outgoing_buffer(curBuf); + OutgoingMessage* messages = curBuf->ptr; for(size_t i = 0; i < curBuf->size; i++) { - // my_sleep(100000); OutgoingMessage *outMsg = &messages[i]; if(outMsg->msg == NULL) { - panic("Got double read on buf %d while writing on buf %d\n", currBufIdx, currWritingIdxPtr); + // Slot was merged into a CompactedMessage; skip. 
+ continue; } ws_sendframe_bin(outMsg->clientIdx, (const char*)outMsg->msg, outMsg->msgLen); free(outMsg->msg); outMsg->msg = NULL; - // printf("done free on buf %d message %lu\n", currBufIdx, i); } curBuf->size = 0; atomic_store(&outBufs->readRequestIdx, currBufIdx); @@ -205,7 +274,7 @@ void* outgoingMain(void* args) atomic_store(&outBufs->readRequestIdx, currBufIdx); if(curBuf == NULL) { - my_sleep(100); + // my_sleep(1); } else { @@ -315,9 +384,41 @@ uint64_t getCurrentUsec() return (1000000 * (uint64_t)tv.tv_sec) + (uint64_t)tv.tv_usec; } +static void print_usage(const char* progname) +{ + fprintf(stderr, "Usage: %s [--port PORT] [--token TOKEN] [--host HOST]\n", progname); + fprintf(stderr, " --port PORT WebSocket listen port (default: 8181)\n"); + fprintf(stderr, " --token TOKEN Access token for client auth\n"); + fprintf(stderr, " --host HOST Listen host (default: localhost)\n"); +} + int main(int argc, char** argv) { - (void)argc; (void)argv; + uint16_t cli_port = 8181; + const char* cli_host = "localhost"; + const char* cli_token = NULL; + + for (int i = 1; i < argc; i++) { + if (strcmp(argv[i], "--port") == 0 && i + 1 < argc) { + long p = strtol(argv[++i], NULL, 10); /* digits-only check below: strtol alone accepts trailing junk such as "80abc" */ + if (argv[i][strspn(argv[i], "0123456789")] != '\0' || p <= 0 || p > 65535) { + fprintf(stderr, "Invalid port: %s\n", argv[i]); + return 1; + } + cli_port = (uint16_t)p; + } else if (strcmp(argv[i], "--token") == 0 && i + 1 < argc) { + cli_token = argv[++i]; + } else if (strcmp(argv[i], "--host") == 0 && i + 1 < argc) { + cli_host = argv[++i]; + } else if (strcmp(argv[i], "--help") == 0 || strcmp(argv[i], "-h") == 0) { + print_usage(argv[0]); + return 0; + } else { + fprintf(stderr, "Unknown argument: %s\n", argv[i]); + print_usage(argv[0]); + return 1; + } + } char errbuf[1024]; pthread_mutex_t mtx; pthread_mutex_init(&mtx, NULL); @@ -380,20 +481,35 @@ int main(int argc, char** argv) deviceHandlesArr, NULL, /* interceptCtxs */ 0, /* interceptCtxCount */ - {0}, /* stream_builder — initialized below */ - 0, + {0}, /* stream_builder — 
initialized below */ + 0, /* simRateLimit */ + NULL, /* deviceIdMappingMsg */ + 0, /* deviceIdMappingMsgLen */ }; if (flatcc_builder_init(&emulContext.stream_builder)) { panic("flatcc_builder_init failed\n"); } - uint8_t access_token[] = "jtd3dhBWG2p73mkRCyaZ63KLmOjl5OgItIDAPa8HcyGQI2SOFA6hsJVr1NiG2Y96AvwyYSShr4GAL3sjJEo1wsmCo2vuhiqRhj8HVRrOVocD2e1jtuMwMsCmizYakT63"; + static uint8_t default_token[] = "jtd3dhBWG2p73mkRCyaZ63KLmOjl5OgItIDAPa8HcyGQI2SOFA6hsJVr1NiG2Y96AvwyYSShr4GAL3sjJEo1wsmCo2vuhiqRhj8HVRrOVocD2e1jtuMwMsCmizYakT63"; + uint8_t* access_token = default_token; + /* If --token was given, copy it into a mutable buffer */ + uint8_t* cli_token_buf = NULL; + if (cli_token != NULL) { + size_t tlen = strlen(cli_token) + 1; + cli_token_buf = malloc(tlen); + if (cli_token_buf == NULL) { + fprintf(stderr, "Failed to allocate token buffer\n"); + return 1; + } + memcpy(cli_token_buf, cli_token, tlen); + access_token = cli_token_buf; + } ServerContext ctx = { mtx, regQ, - (uint8_t*)&access_token, + access_token, &emulContext, }; @@ -406,8 +522,8 @@ int main(int argc, char** argv) ws_socket(&(struct ws_server){ - .host = "localhost", - .port = 8181, + .host = cli_host, + .port = cli_port, .thread_loop = 1, .timeout_ms = 1000, .context = &ctx, @@ -418,7 +534,7 @@ int main(int argc, char** argv) ptQueueElem* regQueueTail = regQ->tail; - uint8_t clients_try_timer = 100; + uint8_t clients_try_timer = 1; uint64_t lastTickAt = getCurrentUsec(); @@ -437,7 +553,7 @@ int main(int argc, char** argv) if (clients_try_timer == 0) { handleAllClients(&emulContext); - clients_try_timer = 100; + clients_try_timer = 1; } else { @@ -465,8 +581,11 @@ int main(int argc, char** argv) } uint64_t now = emulContext.simRateLimit > 0? 
getCurrentUsec() : 0; - - if(emulState == EMUL_STATE_EXEC && emulContext.devicesCount > 0 && (emulContext.simRateLimit == 0 || now - lastTickAt >= 1000000 / emulContext.simRateLimit)) + if(emulState == EMUL_STATE_STOP) + { + clockCounter = 0; + } + else if(emulState == EMUL_STATE_EXEC && emulContext.devicesCount > 0 && (emulContext.simRateLimit == 0 || now - lastTickAt >= 1000000 / emulContext.simRateLimit)) { for (size_t di = 0; di < emulContext.devicesCount; di++) { diff --git a/src/proto/dial.c b/src/proto/dial.c index 3a6228b..6d6fc29 100644 --- a/src/proto/dial.c +++ b/src/proto/dial.c @@ -1,11 +1,19 @@ #include "proto/dial.h" #include +#include #include "context.h" #include "panic.h" #include "proto/msg.h" +static uint64_t dial_now_us(void) +{ + struct timeval tv; + gettimeofday(&tv, NULL); + return (uint64_t)tv.tv_sec * 1000000ULL + (uint64_t)tv.tv_usec; +} + void broadcastClients(EmulContext* emulContext, uint8_t* msg, size_t msgLen) { @@ -42,5 +50,6 @@ void dispatchOutgoingMessage(OutgoingBuffers* outBufs, ws_cli_conn_t clientIdx, outmsg->msg = msg; outmsg->msgLen = msgLen; outmsg->clientIdx = clientIdx; + outmsg->dispatch_us = dial_now_us(); p->size++; } \ No newline at end of file diff --git a/src/proto/handlers/auth.c b/src/proto/handlers/auth.c index a379206..68912a2 100644 --- a/src/proto/handlers/auth.c +++ b/src/proto/handlers/auth.c @@ -1,6 +1,7 @@ #include "proto/handlers/auth.h" #include +#include #include #include #include @@ -141,4 +142,15 @@ void handleOnClientAuthDone(ClientContext* cctx, EmulContext* emulContext) uint8_t* stateMsg = fb_build_exec_notify( UINT64_MAX, *emulContext->clockCounter, *emulContext->emulState, &len); dispatchOutgoingMessage(emulContext->outBufs, cctx->clientId, stateMsg, len); + + // Send cached DeviceIdMappingNotif if a config has been loaded + if (emulContext->deviceIdMappingMsg && emulContext->deviceIdMappingMsgLen > 0) { + uint8_t* mappingCopy = malloc(emulContext->deviceIdMappingMsgLen); + if (mappingCopy) { + 
memcpy(mappingCopy, emulContext->deviceIdMappingMsg, emulContext->deviceIdMappingMsgLen); + dispatchOutgoingMessage( + emulContext->outBufs, cctx->clientId, + mappingCopy, emulContext->deviceIdMappingMsgLen); + } + } } diff --git a/src/proto/handlers/config.c b/src/proto/handlers/config.c index 2e1d015..f79d9d7 100644 --- a/src/proto/handlers/config.c +++ b/src/proto/handlers/config.c @@ -15,8 +15,9 @@ #include "device_reader.h" #include "mem_config_reader.h" -#define MAX_DEVICES 64 -#define MAX_DEPTH 16 +#define MAX_DEVICES 64 +#define MAX_DEPTH 16 +#define MAX_SEGMENTS 64 // Converts a FlatBuffer BaseDeviceConfig into a heap-allocated conf_dev_t @@ -75,10 +76,10 @@ typedef struct { const char* path_stack[MAX_DEPTH]; // current path components size_t depth; // For building the DeviceIdMappingNotif response: - // store copies of hierarchical path per device - char** paths[MAX_DEVICES]; // each is a NULL-terminated string array - size_t path_lens[MAX_DEVICES]; // number of components per path - size_t seg_counts[MAX_DEVICES]; // number of memory segments per device + char** paths[MAX_DEVICES]; // hierarchical path per device (heap-alloc'd) + size_t path_lens[MAX_DEVICES]; // number of path components per device + size_t seg_counts[MAX_DEVICES]; // number of memory segments per device + char* seg_names[MAX_DEVICES][MAX_SEGMENTS]; // segment names (heap-alloc'd) size_t count; } LoadState; @@ -155,11 +156,18 @@ static int load_devices_recursive( size_t idx = st->count; st->handles[idx] = dev; - // Store segment count from FlatBuffer config + // Store segment count and names from FlatBuffer config hmmmm_config_MemSegment_vec_t dev_segs = hmmmm_config_BaseDeviceConfig_mem_segments(base); - st->seg_counts[idx] = dev_segs - ? hmmmm_config_MemSegment_vec_len(dev_segs) : 0; + size_t nseg = dev_segs ? 
hmmmm_config_MemSegment_vec_len(dev_segs) : 0; + if (nseg > MAX_SEGMENTS) nseg = MAX_SEGMENTS; + st->seg_counts[idx] = nseg; + for (size_t si = 0; si < nseg; si++) { + hmmmm_config_MemSegment_table_t seg = + hmmmm_config_MemSegment_vec_at(dev_segs, si); + flatbuffers_string_t sname = hmmmm_config_MemSegment_name(seg); + st->seg_names[idx][si] = strdup(sname ? sname : ""); + } // Copy path st->path_lens[idx] = st->depth; @@ -203,6 +211,9 @@ static void free_load_state_paths(LoadState* st) } free(st->paths[i]); } + for (size_t si = 0; si < st->seg_counts[i]; si++) { + free(st->seg_names[i][si]); + } } } @@ -489,6 +500,12 @@ static void free_old_config(EmulContext* emulContext) emulContext->deviceStreamRegs = NULL; emulContext->devicesCount = 0; emulContext->simRateLimit = 0; + + if (emulContext->deviceIdMappingMsg) { + free(emulContext->deviceIdMappingMsg); + emulContext->deviceIdMappingMsg = NULL; + emulContext->deviceIdMappingMsgLen = 0; + } } @@ -502,9 +519,9 @@ void handleConfigCtrlMessage( char errbuf[1024]; - // Must be in STILL state - if (*emulContext->emulState != EMUL_STATE_STILL) { - snprintf(errbuf, 1024, "config load requires STILL state"); + // Block config load only while actively executing (ticking) + if (*emulContext->emulState == EMUL_STATE_EXEC) { + snprintf(errbuf, 1024, "stop or pause the emulator before loading config"); printf("[CTRL/CONFIG] error: %s\n", errbuf); size_t msg_len; @@ -674,12 +691,31 @@ void handleConfigCtrlMessage( emulContext->deviceStreamRegs[i] = dsr; } - // Build DeviceIdMappingNotif response + // Reset emulation state to STILL and notify all clients + if (*emulContext->emulState != EMUL_STATE_STILL) { + *emulContext->emulState = EMUL_STATE_STILL; + size_t notify_len; + uint8_t* notify = fb_build_exec_notify(UINT64_MAX, *emulContext->clockCounter, EMUL_STATE_STILL, &notify_len); /* UINT64_MAX = unsolicited (no-nonce) push, as in the auth handler */ + broadcastClients(emulContext, notify, notify_len); + } + + // Build DeviceIdMappingNotif and broadcast to all connected clients size_t msg_len; uint8_t* out = 
fb_build_config_device_id_mapping( - nonce, st.paths, st.path_lens, dc, &msg_len); + nonce, st.paths, st.path_lens, st.seg_names, st.seg_counts, dc, &msg_len); free_load_state_paths(&st); - dispatchOutgoingMessage(emulContext->outBufs, ctx->clientId, out, msg_len); + // Cache a copy for newly connecting clients + if (emulContext->deviceIdMappingMsg) { + free(emulContext->deviceIdMappingMsg); + } + emulContext->deviceIdMappingMsg = malloc(msg_len); + if (emulContext->deviceIdMappingMsg) { + memcpy(emulContext->deviceIdMappingMsg, out, msg_len); + emulContext->deviceIdMappingMsgLen = msg_len; + } + + // Broadcast to all clients (broadcastClients takes ownership of out) + broadcastClients(emulContext, out, msg_len); } diff --git a/src/proto/msg.c b/src/proto/msg.c index 5e25112..3106eaf 100644 --- a/src/proto/msg.c +++ b/src/proto/msg.c @@ -8,6 +8,7 @@ #include "auth_builder.h" #include "stream_builder.h" #include "stream_data_builder.h" +#include "compacted_builder.h" uint8_t* fb_build_auth_response(uint64_t nonce, uint64_t seat_id, size_t* len_out) @@ -253,6 +254,7 @@ uint8_t* fb_build_stream_reg_confirm( uint8_t* fb_build_config_device_id_mapping( uint64_t nonce, char** paths[], size_t path_lens[], + char* seg_names[][64], size_t seg_counts[], size_t device_count, size_t* len_out) { flatcc_builder_t B; @@ -272,8 +274,20 @@ uint8_t* fb_build_config_device_id_mapping( } flatbuffers_string_vec_ref_t path_vec = flatbuffers_string_vec_end(&B); - // Empty seg_ids vector + // Build SegIdEntry refs for this device's segments (at most 64, the row width of seg_names) + size_t nseg = seg_counts[i] < 64 ? seg_counts[i] : 64; + hmmmm_ctrl_config_notif_SegIdEntry_ref_t seg_refs[64]; /* fixed size: a VLA of length 0 (device without segments) is undefined behaviour */ + for (size_t si = 0; si < nseg; si++) { + flatbuffers_string_ref_t name_ref = + flatbuffers_string_create_str(&B, seg_names[i][si] ? 
seg_names[i][si] : ""); + seg_refs[si] = hmmmm_ctrl_config_notif_SegIdEntry_create( + &B, name_ref, (uint32_t)si); + } hmmmm_ctrl_config_notif_SegIdEntry_vec_start(&B); + for (size_t si = 0; si < nseg; si++) { + hmmmm_ctrl_config_notif_SegIdEntry_vec_push(&B, seg_refs[si]); + } hmmmm_ctrl_config_notif_SegIdEntry_vec_ref_t seg_ids_vec = hmmmm_ctrl_config_notif_SegIdEntry_vec_end(&B); @@ -350,3 +364,41 @@ uint8_t* fb_build_config_error(uint64_t nonce, const char* message, size_t* len_ NULL_GUARD(buf); return buf; } + + +uint8_t* fb_build_compacted_message( + uint8_t* const* msgs, const size_t* msg_lens, size_t count, size_t* len_out) +{ + flatcc_builder_t B; + if (flatcc_builder_init(&B)) { + panic("flatcc_builder_init failed\n"); + } + + // Build CompactedFrame refs bottom-up (FlatBuffers requires nested objects + // to be created before the table that references them). + hmmmm_CompactedFrame_ref_t frame_refs[count ? count : 1]; /* never declare a zero-length VLA (undefined behaviour) */ + for (size_t i = 0; i < count; i++) { + flatbuffers_uint8_vec_ref_t data_vec = + flatbuffers_uint8_vec_create(&B, msgs[i], msg_lens[i]); + frame_refs[i] = hmmmm_CompactedFrame_create(&B, data_vec); + } + + hmmmm_CompactedFrame_vec_start(&B); + for (size_t i = 0; i < count; i++) { + hmmmm_CompactedFrame_vec_push(&B, frame_refs[i]); + } + hmmmm_CompactedFrame_vec_ref_t frames_vec = hmmmm_CompactedFrame_vec_end(&B); + + hmmmm_CompactedMessage_ref_t cm = hmmmm_CompactedMessage_create(&B, frames_vec); + + hmmmm_ServerPayload_union_ref_t payload = + hmmmm_ServerPayload_as_CompactedMessage(cm); + + // UINT64_MAX = no-nonce (fire-and-forget batch frame) + hmmmm_ServerMessage_create_as_root(&B, UINT64_MAX, payload); + + uint8_t* buf = flatcc_builder_finalize_buffer(&B, len_out); + flatcc_builder_clear(&B); + NULL_GUARD(buf); + return buf; +}