diff --git a/Makefile b/Makefile index 62fc4aa..3708f89 100644 --- a/Makefile +++ b/Makefile @@ -4,14 +4,15 @@ SRC_DIR=src INC_DIR=inc CC=gcc OBJDUMP=objdump -LIBS=deps/ptQueue/out/ptQueue.a deps/wsServer/libws.a deps/flatcc/lib/libflatccrt.a +LIBS=deps/wsServer/libws.a deps/flatcc/lib/libflatccrt.a LIBS_HEADERS=deps/ $(OPENSSL_INCLUDE) # flatcc runtime and generated reader headers use const-dropping casts; # include both as system headers so -Wcast-qual doesn't apply to them SYSTEM_INCLUDES=-isystem deps/flatcc/include/ -isystem $(PROTO_INC_DIR)/ STATIC_LIBS=crypto STANDART=c23 -OPTIMIZE=-Og +# OPTIMIZE=-Og #3 -march=native +OPTIMIZE=-O3 -march=native TARGET=main FLATCC = deps/flatcc/bin/flatcc @@ -34,14 +35,16 @@ C_INCLUDES=-I$(INC_DIR)/ $(addprefix -I,$(LIBS_HEADERS)) C_DEFS=-D_POSIX_C_SOURCE=199309L -DISABLE_FLAGS=-Wno-unused-variable -Wno-unused-parameter -Wno-write-strings -Wno-pointer-arith -Wno-analyzer-infinite-loop +DISABLE_FLAGS=-Wno-unused-variable -Wno-unused-parameter -Wno-write-strings -Wno-pointer-arith -Wno-analyzer-infinite-loop -Wno-unused-function -Wno-unused-but-set-variable PEDANTIC_FLAGS=-pedantic -pedantic-errors $(DISABLE_FLAGS) -Wall -Wcast-align -Wcast-qual -Wconversion -Wduplicated-branches -Wduplicated-cond -Werror -Wextra -Wfloat-equal -Wlogical-op -Wpedantic -Wredundant-decls -Wsign-conversion ANALYZER_FLAGS=-fanalyzer -fdiagnostics-show-option -fdiagnostics-color=always LSECTIONS=-ffunction-sections -fdata-sections -Wl,--gc-sections -CFLAGS=$(C_DEFS) -g $(C_INCLUDES) $(SYSTEM_INCLUDES) $(DEFINES) $(OPTIMIZE) --std=$(STANDART) $(PEDANTIC_FLAGS) $(ANALYZER_FLAGS) -MMD -MP # -fno-omit-frame-pointer -fno-inline -fno-lto +# CFLAGS=$(C_DEFS) -g -pthread $(C_INCLUDES) $(SYSTEM_INCLUDES) $(DEFINES) $(OPTIMIZE) --std=$(STANDART) $(PEDANTIC_FLAGS) $(ANALYZER_FLAGS) -MMD -MP -fprofile-use # -fprofile-generate -fprofile-arcs -ftest-coverage # -fno-omit-frame-pointer -fno-inline -fno-lto # -fprofile-use # -fprofile-generate -fprofile-arcs -ftest-coverage # -fno-omit-frame-pointer -fno-inline -fno-lto # -fprofile-use # -fprofile-generate -fprofile-arcs -ftest-coverage # -fno-omit-frame-pointer -fno-inline # -fno-omit-frame-pointer -fno-inline -fno-lto +CFLAGS=$(C_DEFS) -g -pthread $(C_INCLUDES) $(SYSTEM_INCLUDES) $(DEFINES) $(OPTIMIZE) --std=$(STANDART) $(PEDANTIC_FLAGS) $(ANALYZER_FLAGS) -MMD -MP # -fprofile-generate -fprofile-arcs -ftest-coverage # -fno-omit-frame-pointer -fno-inline -fno-lto # -fprofile-use # -fprofile-generate -fprofile-arcs -ftest-coverage # -fno-omit-frame-pointer -fno-inline -fno-lto # -fprofile-use # -fprofile-generate -fprofile-arcs -ftest-coverage # -fno-omit-frame-pointer -fno-inline # -fno-omit-frame-pointer -fno-inline -fno-lto STATICLIBS_FLAGS=$(addprefix -l,$(STATIC_LIBS)) -# LFLAGS=$(OPTIMIZE) -g $(PEDANTIC_FLAGS) $(DEFINES) $(STATICLIBS_FLAGS) $(LSECTIONS) -lm -fno-omit-frame-pointer -fno-inline -fno-lto -LFLAGS=$(OPTIMIZE) -g $(PEDANTIC_FLAGS) $(DEFINES) $(STATICLIBS_FLAGS) -flto -fuse-linker-plugin $(LSECTIONS) -lm -fno-omit-frame-pointer -fno-inline +# LFLAGS=$(OPTIMIZE) -g -pthread $(PEDANTIC_FLAGS) $(DEFINES) $(STATICLIBS_FLAGS) $(LSECTIONS) -lm -fno-omit-frame-pointer -fno-inline -fno-lto +# LFLAGS=$(OPTIMIZE) -g -pthread $(PEDANTIC_FLAGS) $(DEFINES) $(STATICLIBS_FLAGS) -fuse-linker-plugin $(LSECTIONS) -lm -fprofile-use # -fprofile-generate -fprofile-arcs -ftest-coverage # -fno-omit-frame-pointer -fno-inline -fno-lto # -fprofile-use # -fprofile-generate -fprofile-arcs -ftest-coverage # -fno-omit-frame-pointer -fno-inline -fno-lto # -fno-omit-frame-pointer -fno-inline -flto -lgcov # -fno-omit-frame-pointer -fno-inline # #-fprofile-generate # -flto +LFLAGS=$(OPTIMIZE) -g -pthread $(PEDANTIC_FLAGS) $(DEFINES) $(STATICLIBS_FLAGS) -fuse-linker-plugin $(LSECTIONS) -lm # -fprofile-generate -fprofile-arcs -ftest-coverage # -fno-omit-frame-pointer -fno-inline -fno-lto # -fprofile-use # -fprofile-generate -fprofile-arcs -ftest-coverage # -fno-omit-frame-pointer -fno-inline -fno-lto # -fno-omit-frame-pointer -fno-inline -flto -lgcov # -fno-omit-frame-pointer -fno-inline # #-fprofile-generate # -flto @@ -53,10 +56,18 @@ DEP = $(filter %.d, $(OBJECTS:.o=.d)) OBJECTS += $(LIBS) vpath %.c $(sort $(dir $(C_SOURCES))) +# asan: CFLAGS := $(filter-out -fanalyzer,$(CFLAGS)) -fsanitize=address,undefined +# asan: LFLAGS := $(filter-out -flto -fuse-linker-plugin,$(LFLAGS)) -fsanitize=address,undefined +# asan: build + +# tsan: CFLAGS := $(filter-out -fanalyzer,$(CFLAGS)) -fsanitize=thread +# tsan: LFLAGS := $(filter-out -flto -fuse-linker-plugin,$(LFLAGS)) -fsanitize=thread +# tsan: build + all: build -build: date deps Dir proto python-proto target compile_commands +build: date deps Dir proto python-proto target rebuild: clean | build @@ -121,10 +132,10 @@ $(FLATC): @$(MAKE) -C deps flatbuffers/build/flatc -.PHONY: clean deps proto python-proto +.PHONY: clean deps proto python-proto asan tsan clean + clean: @rm -rf $(BUILD_DIR)/* $(BUILD_DIR)/.* $(PROTO_INC_DIR)/* $(PROTO_SRC_DIR)/* -# @rm -f compile_commands.json @echo -e '\033[0;31mCleaned\033[0m' .NOTPARALLEL: date target rebuild deps proto @@ -133,6 +144,3 @@ date: @date @echo -e '\033[0m' - -compile_commands: -# @bear -- ./.gen_compile_commands.sh $(TARGET) $(CC) "$(CFLAGS)" "$(LFLAGS)" "$(OBJECTS)" diff --git a/devices/avr_generic/src/device.c b/devices/avr_generic/src/device.c index 2e2758b..243343b 100644 --- a/devices/avr_generic/src/device.c +++ b/devices/avr_generic/src/device.c @@ -27,13 +27,12 @@ void freeDevMem(device_mem_t* devMem) { free(devMem->memsegShifts); free(devMem->memsegSizes); + free(devMem->memsegCellSizes); free(devMem->rawCells); free(devMem->memreadCellAddrs); free(devMem->memwriteCellAddrs); free(devMem->memwriteCellSegments); - free(devMem->memwriteValues[0]); free(devMem->memwriteValues); - free(devMem->memwriteWordLengths); free(devMem->cells); free(devMem->smartAddrReadHandlers); free(devMem->smartAddrWriteHandlers); @@ -118,10 +117,6 @@ device_mem_t* genDevMem(device_specs_t* devSpec, char* errbuf) return NULL; } - for(size_t i = 0; i < devSpec->memSpecsCount; i++) - { - // cellNames[i] = devSpec->memSpecs[i]->name; - } for (uint8_t i = 0; i < devSpec->memSpecsCount; i++) { @@ -170,8 +165,7 @@ device_mem_t* genDevMem(device_specs_t* devSpec, char* errbuf) return NULL; } - void** memwriteValues = calloc(MEM_ACCESS_INTERCEPT_BUF_SIZE, sizeof(void*)); - + uint64_t* memwriteValues = calloc(MEM_ACCESS_INTERCEPT_BUF_SIZE, sizeof(uint64_t)); if(memwriteValues == NULL) { sprintf(errbuf, "unable to allocate write interception addrs"); @@ -186,43 +180,6 @@ device_mem_t* genDevMem(device_specs_t* devSpec, char* errbuf) return NULL; } - uint64_t* memwriteValuesContainers = calloc(MEM_ACCESS_INTERCEPT_BUF_SIZE, sizeof(uint64_t)); - if(memwriteValuesContainers == NULL) - { - sprintf(errbuf, "unable to allocate write interception addrs"); - free(devMem->memsegShifts); - free(devMem); - free(rawCells); - free(memreadCellAddrs); - free(cells); - free(cellNames); - free(memwriteCellAddrs); - free(memwriteCellSegments); - free(memwriteValues); - return NULL; - } - - for(size_t i = 0; i < 64; i++) - { - memwriteValues[i] = &memwriteValuesContainers[i]; - } - - uint8_t* memwriteWordLengths = calloc(MEM_ACCESS_INTERCEPT_BUF_SIZE, sizeof(uint8_t)); - if(memwriteWordLengths == NULL) - { - sprintf(errbuf, "unable to allocate write interception addrs"); - free(devMem->memsegShifts); - free(devMem); - free(rawCells); - free(memreadCellAddrs); - free(cells); - free(cellNames); - free(memwriteCellAddrs); - free(memwriteCellSegments); - free(memwriteValues); - free(memwriteValuesContainers); - return NULL; - } uint64_t smartAddrReadMask = 0; uint64_t smartAddrWriteMask = 0; @@ -240,8 +197,6 @@ device_mem_t* genDevMem(device_specs_t* devSpec, char* errbuf) free(memwriteCellAddrs); free(memwriteCellSegments); free(memwriteValues); - free(memwriteValuesContainers); - free(memwriteWordLengths); return NULL; } @@ -259,8 +214,6 @@ device_mem_t* genDevMem(device_specs_t* devSpec, char* errbuf) free(memwriteCellAddrs); free(memwriteCellSegments); free(memwriteValues); - free(memwriteValuesContainers); - free(memwriteWordLengths); free(cells); free(cellNames); return NULL; @@ -270,7 +223,26 @@ device_mem_t* genDevMem(device_specs_t* devSpec, char* errbuf) if(memsegSizes == NULL) { - sprintf(errbuf, "unable to allocate write interception handlers"); + sprintf(errbuf, "unable to allocate memseg sizes"); + free(smartAddrWriteHandlers); + free(smartAddrReadHandlers); + free(devMem->memsegShifts); + free(devMem); + free(rawCells); + free(memreadCellAddrs); + free(memwriteCellAddrs); + free(memwriteCellSegments); + free(memwriteValues); + free(cells); + free(cellNames); + return NULL; + } + + uint8_t* memsegCellSizes = calloc(devSpec->memSpecsCount, sizeof(uint8_t)); + if(memsegCellSizes == NULL) + { + sprintf(errbuf, "unable to allocate memseg cell sizes"); + free(memsegSizes); free(smartAddrWriteHandlers); free(smartAddrReadHandlers); free(devMem->memsegShifts); @@ -280,31 +252,29 @@ device_mem_t* genDevMem(device_specs_t* devSpec, char* errbuf) free(memwriteCellAddrs); free(memwriteCellSegments); free(memwriteValues); - free(memwriteValuesContainers); - free(memwriteWordLengths); free(cells); free(cellNames); return NULL; } - for(uint64_t i = 0; i < memTotalSize; i++) - { - smartAddrReadHandlers[i].func = NULL; - smartAddrReadHandlers[i].ident = 0; + // for(uint64_t i = 0; i < memTotalSize; i++) + // { + // smartAddrReadHandlers[i].func = NULL; + // smartAddrReadHandlers[i].ident = 0; - smartAddrWriteHandlers[i].func = NULL; - smartAddrWriteHandlers[i].ident = 0; - } + // smartAddrWriteHandlers[i].func = NULL; + // smartAddrWriteHandlers[i].ident = 0; + // } - for(uint64_t i = 0; i < memTotalSize; i++) - { - if((i & smartAddrReadMask) == smartAddrReadMask) - { - smartAddrReadHandlers[i].func = NULL; - } - } + // for(uint64_t i = 0; i < memTotalSize; i++) + // { + // if((i & smartAddrReadMask) == smartAddrReadMask) + // { + // smartAddrReadHandlers[i].func = NULL; + // } + // } if (devSpec->smartReadSpecsCount > 0) @@ -355,7 +325,6 @@ device_mem_t* genDevMem(device_specs_t* devSpec, char* errbuf) devMem->memreadCellAddrs = memreadCellAddrs; devMem->memwriteCellAddrs = memwriteCellAddrs; devMem->memwriteCellSegments = memwriteCellSegments; - devMem->memwriteWordLengths = memwriteWordLengths; devMem->memwriteValues = memwriteValues; devMem->memreadLen = 0; devMem->memwriteLen = 0; @@ -365,6 +334,7 @@ device_mem_t* genDevMem(device_specs_t* devSpec, char* errbuf) devMem->smartAddrWriteHandlers = smartAddrWriteHandlers; devMem->memsegNames = cellNames; devMem->memsegSizes = memsegSizes; + devMem->memsegCellSizes = memsegCellSizes; memseg_metadata_t requiredSegments[] = MEMSEG_DEFINES; @@ -380,7 +350,8 @@ device_mem_t* genDevMem(device_specs_t* devSpec, char* errbuf) const uint8_t seg_id = seg_def.seg_id; devMem->memsegShifts[seg_id] = memSegToGlobal(devSpec, i, 0); devMem->memsegSizes[seg_id] = devSpec->memSpecs[i]->len; - printf("set mem segment %d meta: +%lu/%lu \n", seg_id, devMem->memsegShifts[j], devMem->memsegSizes[j]); + devMem->memsegCellSizes[seg_id] = devSpec->memSpecs[i]->wordLen; + printf("set mem segment %d meta: +%lu/%lu[%d] \n", seg_id, devMem->memsegShifts[seg_id], devMem->memsegSizes[seg_id], devMem->memsegCellSizes[seg_id]); } } } @@ -404,6 +375,10 @@ uint8_t makeDeviceTick(device_public_context_t* devContext) prog_counter_t _pc; READ_MEM(_pc, devInfo->deviceMem, MEMSEG_PC_SEG_NUM, MEMSEG_PC_ADDR, prog_counter_t) // printf("old PC is %d\n", _pc); + if(_pc >= devInfo->deviceMem->memsegSizes[MEMSEG_PS]) + { + _pc = 0; + } uint8_t ticks = makeTick(&_pc, devInfo->instr, devInfo->deviceMem); WRITE_MEM(devInfo->deviceMem, MEMSEG_PC_SEG_NUM, MEMSEG_PC_ADDR, prog_counter_t, _pc); // printf("new PC is %d\n", _pc); @@ -593,7 +568,7 @@ device_specs_t* parseSpecsFromConfig(const conf_dev_t* devConf, char* errbuf) free(requiredSegmentsFoundMap); return NULL; } - specs->memSpecs[specNum]->name = calloc(strlen(segments[i]->name), sizeof(char)); + specs->memSpecs[specNum]->name = calloc(strlen(segments[i]->name) + 1, sizeof(char)); if(specs->memSpecs[specNum]->name == NULL) { sprintf(errbuf, "unable to allocate spec %d name", i); @@ -837,6 +812,8 @@ uint8_t pubExtractPcounterSizeWords() void reset (device_specs_t* specs, device_public_context_t* devInfo) { + // printf("reset device\n"); + // uint8_t fuck = 0; for(size_t i = 0; i < specs->memSpecsCount; i++) { if(i != MEMDATA_OPSIZE) @@ -845,10 +822,12 @@ void reset (device_specs_t* specs, device_public_context_t* devInfo) for(size_t j = 0; j < spec->len; j++) { ((uint8_t*)devInfo->deviceMem->cells[i])[j] = 0; + // fuck++; } } } devInfo->deviceMem->memwriteLen = 0; devInfo->deviceMem->memreadLen = 0; -} \ No newline at end of file +} + diff --git a/hmmmm_scripts b/hmmmm_scripts index 1bcda28..6a52cfb 160000 --- a/hmmmm_scripts +++ b/hmmmm_scripts @@ -1 +1 @@ -Subproject commit 1bcda2881a5c063d0b81b1043d0ad8b364e198e5 +Subproject commit 6a52cfbb3ceb36d704333b900b61a0ec1fd646c5 diff --git a/inc/client.h b/inc/client.h index 882533e..fa9bb2e 100644 --- a/inc/client.h +++ b/inc/client.h @@ -4,13 +4,13 @@ #include #include "wsServer/include/ws.h" -#include "ptQueue/inc/ptQueue.h" +#include "ptQueue/inc/spsc.h" typedef struct { ws_cli_conn_t clientId; uint8_t isAuthed; - ptQueue* incomeQ; - ptQueue* outcomeQ; + queue_spsc_t* incomeQ; + queue_spsc_t* outcomeQ; uint64_t connectedAt; uint32_t streamRegIterator; } ClientContext; diff --git a/inc/config.h b/inc/config.h index c34002a..e3bfbae 100644 --- a/inc/config.h +++ b/inc/config.h @@ -3,10 +3,10 @@ #include "pub/libhmmmm/config.h" -void freeMemSegConf(conf_mem_seg_t* memSegConf); -void freeMemConf(conf_mem_t* memConf); +// void freeMemSegConf(conf_mem_seg_t* memSegConf); +// void freeMemConf(conf_mem_t* memConf); void freeConf(conf_dev_t* conf); void freeComposeId(char** id); -uint8_t compareComposeId(char** idA, char** idB); +// uint8_t compareComposeId(char** idA, char** idB); #endif \ No newline at end of file diff --git a/inc/context.h b/inc/context.h index 306b1f7..2fef165 100644 --- a/inc/context.h +++ b/inc/context.h @@ -2,10 +2,12 @@ #define __CONTEXT_H__ #include +#include #include -#include "ptQueue/inc/ptQueue.h" +#include "ptQueue/inc/spsc.h" #include "wsServer/include/ws.h" +#include "ptQueue/inc/mpsc.h" #include "linkedlist.h" #include "sized_ptr.h" @@ -16,8 +18,8 @@ typedef struct { SizedPtr* bufs; uint8_t buffersCount; - _Atomic (uint8_t) readRequestIdx; - _Atomic (uint8_t) currWritingIdx; + CACHE_ALIGN _Atomic (uint8_t) readRequestIdx; + CACHE_ALIGN _Atomic (uint8_t) currWritingIdx; } OutgoingBuffers; @@ -43,12 +45,15 @@ typedef struct { // Cached DeviceIdMappingNotif broadcast message (sent to newly authed clients) uint8_t* deviceIdMappingMsg; size_t deviceIdMappingMsgLen; + queue_mpsc_t* inMsgQueue; + queue_spsc_t* outMsgQueue; } EmulContext; typedef struct { pthread_mutex_t registerMutex; - ptQueue* regQueue; + queue_spsc_t* regQueue; + queue_mpsc_t* inMsgQueue; uint8_t* accessToken; EmulContext* emulContext; } ServerContext; diff --git a/inc/proto/dial.h b/inc/proto/dial.h index d90a0fc..1e84ba7 100644 --- a/inc/proto/dial.h +++ b/inc/proto/dial.h @@ -4,8 +4,9 @@ #include #include "context.h" +#include "ptQueue/inc/spsc.h" void broadcastClients(EmulContext* emulContext, uint8_t* msg, size_t msgLen); -void dispatchOutgoingMessage(OutgoingBuffers* outBufs, ws_cli_conn_t clientIdx, uint8_t* msg, size_t msgLen); +void dispatchOutgoingMessage(queue_spsc_t* outMsgQueue, ws_cli_conn_t clientIdx, uint8_t* msg, size_t msgLen); #endif \ No newline at end of file diff --git a/inc/proto/handlers/stream.h b/inc/proto/handlers/stream.h index e2c3be5..eb15f89 100644 --- a/inc/proto/handlers/stream.h +++ b/inc/proto/handlers/stream.h @@ -7,7 +7,7 @@ #include "proto/dial.h" #include "stream_reader.h" -void unregisterClientStream(EmulContext* emulContext, ClientContext* ctx, uint32_t regId); +// static void unregisterClientStream(EmulContext* emulContext, ClientContext* ctx, uint32_t regId); void unregisterClientStreams(EmulContext* emulContext, ClientContext* ctx); void handleIncomingStreamMessage( diff --git a/inc/proto/msg.h b/inc/proto/msg.h index 002f9e7..eab74fc 100644 --- a/inc/proto/msg.h +++ b/inc/proto/msg.h @@ -1,6 +1,7 @@ #ifndef __PROTO_MSG_H__ #define __PROTO_MSG_H__ +#include "client.h" #include "wsServer/include/ws.h" #include #include @@ -10,6 +11,7 @@ typedef struct { uint8_t* data; size_t size; + ClientContext* ctx; } FbMessage; typedef struct { diff --git a/inc/pub/libhmmmm/mem.h b/inc/pub/libhmmmm/mem.h index c9ec47d..ea91450 100644 --- a/inc/pub/libhmmmm/mem.h +++ b/inc/pub/libhmmmm/mem.h @@ -32,11 +32,11 @@ typedef struct char** memsegNames; uint64_t* memsegShifts; uint64_t* memsegSizes; + uint8_t* memsegCellSizes; uint64_t* memreadCellAddrs; - uint8_t* memwriteWordLengths; uint8_t* memwriteCellSegments; uint64_t* memwriteCellAddrs; - void** memwriteValues; + uint64_t* memwriteValues; uint8_t memreadLen; uint8_t memwriteLen; } device_mem_t; @@ -87,10 +87,9 @@ typedef void (*ext_h_write_func)(uint64_t addr, void* rawCells, void* data, void } \ __mem->memwriteCellAddrs[__mem->memwriteLen] = __addr; \ __mem->memwriteCellSegments[__mem->memwriteLen] = __segno; \ - __mem->memwriteWordLengths[__mem->memwriteLen] = sizeof(__cell_t); \ - *((__cell_t*)__mem->memwriteValues[__mem->memwriteLen]) = (__cell_t)(__val); \ + /*__mem->memwriteWordLengths[__mem->memwriteLen] = sizeof(__cell_t);*/ \ + (__mem->memwriteValues[__mem->memwriteLen]) = (__cell_t)(__val); \ __mem->memwriteLen += 1; \ } - #endif // ifndef __HMMMM_PUB_MEM_H__ diff --git a/src/compose_device.c b/src/compose_device.c index 5c4d736..27e700a 100644 --- a/src/compose_device.c +++ b/src/compose_device.c @@ -6,80 +6,81 @@ #include "compose_device.h" -char** appendId(char** prev, const char* cur, char* errbuf) -{ - if(prev == NULL) - { - prev = malloc(sizeof(char*) * 2); - if(prev == NULL) - { - snprintf(errbuf, 1024, "unable to allocate id"); - return NULL; - } - prev[0] = NULL; - prev[1] = NULL; - } +// char** appendId(char** prev, const char* cur, char* errbuf) +// { +// if(prev == NULL) +// { +// prev = malloc(sizeof(char*) * 2); +// if(prev == NULL) +// { +// snprintf(errbuf, 1024, "unable to allocate id"); +// return NULL; +// } +// prev[0] = NULL; +// prev[1] = NULL; +// } - size_t clockIdLen = 0; - while (prev[clockIdLen] != NULL){clockIdLen++;} +// size_t clockIdLen = 0; +// while (prev[clockIdLen] != NULL){clockIdLen++;} - clockIdLen++; +// clockIdLen++; - char** new = realloc(prev, sizeof(char*) * (clockIdLen + 1)); +// char** new = realloc(prev, sizeof(char*) * (clockIdLen + 1)); - if(new == NULL) - { - snprintf(errbuf, 1024, "unable to reallocate id"); - freeComposeId(prev); - return NULL; - } - prev = new; +// if(new == NULL) +// { +// snprintf(errbuf, 1024, "unable to reallocate id"); +// freeComposeId(prev); +// return NULL; +// } +// prev = new; - prev[clockIdLen] = NULL; - size_t idLen = strlen(cur); - prev[clockIdLen - 1] = malloc(sizeof(char) * (idLen + 1)); - if(prev[clockIdLen - 1] == NULL) - { - snprintf(errbuf, 1024, "unable to allocate new id entry"); - freeComposeId(prev); - return NULL; - } - strcpy(prev[clockIdLen - 1], cur); - prev[clockIdLen - 1][idLen] = '\0'; - return prev; -} +// prev[clockIdLen] = NULL; +// size_t idLen = strlen(cur); +// prev[clockIdLen - 1] = malloc(sizeof(char) * (idLen + 1)); +// if(prev[clockIdLen - 1] == NULL) +// { +// snprintf(errbuf, 1024, "unable to allocate new id entry"); +// freeComposeId(prev); +// return NULL; +// } +// strcpy(prev[clockIdLen - 1], cur); +// prev[clockIdLen - 1][idLen] = '\0'; +// return prev; +// } -void freeProjectionConfig(projection_conf_t* conf) -{ - if(conf == NULL) - { - return; - } - freeComposeId(conf->target); - freeComposeId(conf->baseAt); - if(conf->baseSeg != NULL) - { - free(conf->baseSeg); - } - conf->target = NULL; - conf->baseAt = NULL; - conf->baseSeg = NULL; - free(conf); -} -void freeProjectionConfigs(projection_conf_t** confs) -{ - if(confs == NULL) - { - return; - } - size_t i = 0; - while(confs[i] != NULL) - { - freeProjectionConfig(confs[i]); - confs[i] = NULL; - } - free(confs); -} +// static void freeProjectionConfig(projection_conf_t* conf) +// { +// if(conf == NULL) +// { +// return; +// } +// freeComposeId(conf->target); +// freeComposeId(conf->baseAt); +// if(conf->baseSeg != NULL) +// { +// free(conf->baseSeg); +// } +// conf->target = NULL; +// conf->baseAt = NULL; +// conf->baseSeg = NULL; +// free(conf); +// } + +// void freeProjectionConfigs(projection_conf_t** confs) +// { +// if(confs == NULL) +// { +// return; +// } +// size_t i = 0; +// while(confs[i] != NULL) +// { +// freeProjectionConfig(confs[i]); +// confs[i] = NULL; +// } +// free(confs); +// } device_handle_t** openComposeDevice(compose_dev_conf_t* conf, char* errbuf) diff --git a/src/config.c b/src/config.c index 8794c55..596378b 100644 --- a/src/config.c +++ b/src/config.c @@ -4,7 +4,7 @@ #include "config.h" -void xfree(void* p) +static void xfree(void* p) { if(p != NULL) { @@ -28,21 +28,21 @@ void freeComposeId(char** id) } -uint8_t compareComposeId(char** idA, char** idB) -{ - size_t i = 0; - while(idA[i] != NULL && idB[i] != NULL) - { - if(strcmp(idA[i], idB[i]) != 0) - { - return 0; - } - i++; - } - return idA[i] == NULL && idB[i] == NULL; -} +// static uint8_t compareComposeId(char** idA, char** idB) +// { +// size_t i = 0; +// while(idA[i] != NULL && idB[i] != NULL) +// { +// if(strcmp(idA[i], idB[i]) != 0) +// { +// return 0; +// } +// i++; +// } +// return idA[i] == NULL && idB[i] == NULL; +// } -void freeMemSegConf(conf_mem_seg_t* memSegConf) +static void freeMemSegConf(conf_mem_seg_t* memSegConf) { if(memSegConf == NULL) { @@ -50,7 +50,7 @@ void freeMemSegConf(conf_mem_seg_t* memSegConf) } xfree(memSegConf->name); } -void freeMemConf(conf_mem_t* memConf) +static void freeMemConf(conf_mem_t* memConf) { if(memConf == NULL) { diff --git a/src/hmmmm.c b/src/hmmmm.c index 3e91013..f36642b 100644 --- a/src/hmmmm.c +++ b/src/hmmmm.c @@ -26,7 +26,7 @@ typedef void (*_dlib_freeDevMem_t)(device_mem_t* mem); typedef void (*_dlib_fillSmartReadSpecs_t)(void* specs, smart_read_spec_t* smartReadSpecs, uint64_t smartReadSpecsCount); typedef void (*_dlib_fillSmartWriteSpecs_t)(void* specs, smart_write_spec_t* smartWriteSpecs, uint64_t smartWriteSpecsCount); -instruction_simul_handlers_t* _fillInstructionSimul(void* handle) +static instruction_simul_handlers_t* _fillInstructionSimul(void* handle) { instruction_simul_handlers_t* ret = malloc(sizeof(instruction_simul_handlers_t)); diff --git a/src/main.c b/src/main.c index a197af9..7c0b918 100644 --- a/src/main.c +++ b/src/main.c @@ -1,3 +1,4 @@ +#include #include #include #include @@ -33,7 +34,9 @@ #include "linkedlist.h" -#include "ptQueue/inc/ptQueue.h" +#include "ptQueue/inc/spsc.h" +#include "ptQueue/inc/mpsc.h" + #include "wsServer/include/ws.h" #define _GNU_SOURCE @@ -64,69 +67,77 @@ #include "proto/pack.h" -void printMemory(void* cells, uint64_t cellsCount) +volatile sig_atomic_t done = 0; + +void term(int signum) { - - for(uint64_t i = 0; i < cellsCount; i++) - { - printf("%lu: 0x%04X", i, ((uint8_t*)cells)[i]); - - // uint8_t wasRead = 0; - // for (uint8_t j = 0; j < mem->memreadLen; j++) - // { - // if(mem->memreadCellAddrs[j] == i) - // { - // wasRead = 1; - // break; - // } - // } - // uint8_t wasWrite = 0; - // for (uint8_t j = 0; j < mem->memwriteLen; j++) - // { - // if(mem->memwriteCellAddrs[j] == i) - // { - // wasWrite = 1; - // break; - // } - // } - - // if (wasRead == 1) - // { - // printf("\t[was read]"); - // } - // else - // { - // printf("\t[was not read]"); - // } - - // if (wasWrite == 1) - // { - // printf("\t[was written]"); - // } - // else - // { - // printf("\t[was not written]"); - // } - printf("\n"); - } + printf("Caught!\n"); + done = 1; } +// static void printMemory(void* cells, uint64_t cellsCount) +// { + +// for(uint64_t i = 0; i < cellsCount; i++) +// { +// printf("%lu: 0x%04X", i, ((uint8_t*)cells)[i]); + +// // uint8_t wasRead = 0; +// // for (uint8_t j = 0; j < mem->memreadLen; j++) +// // { +// // if(mem->memreadCellAddrs[j] == i) +// // { +// // wasRead = 1; +// // break; +// // } +// // } +// // uint8_t wasWrite = 0; +// // for (uint8_t j = 0; j < mem->memwriteLen; j++) +// // { +// // if(mem->memwriteCellAddrs[j] == i) +// // { +// // wasWrite = 1; +// // break; +// // } +// // } + +// // if (wasRead == 1) +// // { +// // printf("\t[was read]"); +// // } +// // else +// // { +// // printf("\t[was not read]"); +// // } + +// // if (wasWrite == 1) +// // { +// // printf("\t[was written]"); +// // } +// // else +// // { +// // printf("\t[was not written]"); +// // } +// printf("\n"); +// } +// } -void dummyWriteHandler(uint64_t ident, uint64_t addr, void* rawCells, void* data) -{ - // printf("Intercepted write on 0x%lx: 0x%02x\n", addr, *((uint8_t*)data)); - printf("Intercepted write on 0x%lx: ", addr); - printf("0b"); - for(uint8_t i = 0; i < 8; i++) - { - printf("%d", (*((uint8_t*)data) >> (7 - i)) & 1); - } - printf("\n"); - ((uint8_t*)rawCells)[addr] = *((uint8_t*)data); - return; -} + +// void dummyWriteHandler(uint64_t ident, uint64_t addr, void* rawCells, void* data) +// { +// // printf("Intercepted write on 0x%lx: 0x%02x\n", addr, *((uint8_t*)data)); +// printf("Intercepted write on 0x%lx: ", addr); +// printf("0b"); +// for(uint8_t i = 0; i < 8; i++) +// { +// printf("%d", (*((uint8_t*)data) >> (7 - i)) & 1); +// } +// printf("\n"); +// ((uint8_t*)rawCells)[addr] = *((uint8_t*)data); +// return; +// } @@ -135,24 +146,24 @@ ext_h_read_func* interceptReadRouter; ext_h_write_func* interceptWriteRouter; void** iterceptDevContextRouter; -void* readExt(uint64_t ident, uint64_t addr, void* rawCells) -{ - void* devContext = iterceptDevContextRouter[ident]; - ext_h_read_func tgt = interceptReadRouter[ident]; - return tgt(addr, rawCells, devContext); -} -void writeExt(uint64_t ident, uint64_t addr, void* rawCells, void* data) -{ - void* devContext = iterceptDevContextRouter[ident]; - ext_h_write_func tgt = interceptWriteRouter[ident]; - tgt(addr, rawCells, data, devContext); -} +// static void* readExt(uint64_t ident, uint64_t addr, void* rawCells) +// { +// void* devContext = iterceptDevContextRouter[ident]; +// ext_h_read_func tgt = interceptReadRouter[ident]; +// return tgt(addr, rawCells, devContext); +// } +// static void writeExt(uint64_t ident, uint64_t addr, void* rawCells, void* data) +// { +// void* devContext = iterceptDevContextRouter[ident]; +// ext_h_write_func tgt = interceptWriteRouter[ident]; +// tgt(addr, rawCells, data, devContext); +// } // void *threadMain(void *param); -void my_sleep(int microseconds) { +static void my_sleep(int microseconds) { struct timespec ts; ts.tv_sec = microseconds / 1000000; // секунды ts.tv_nsec = (microseconds % 1000000) * 1000; // наносекунды @@ -232,99 +243,90 @@ static void compact_outgoing_buffer(SizedPtr* buf) } -void* outgoingMain(void* args) +static void* outgoingMain(void* args) { - OutgoingBuffers* outBufs = args; - uint8_t bufsCount = outBufs->buffersCount; - uint8_t currBufIdx = 0; - uint8_t currWritingIdxPtr = 0; - SizedPtr* curBuf = NULL; - while (currWritingIdxPtr != 0xFF) + EmulContext* emulContext = args; + // OutgoingBuffers* outBufs = emulContext->outBufs; + // uint8_t bufsCount = outBufs->buffersCount; + // uint8_t currBufIdx = 0; + // uint8_t currWritingIdxPtr = 0; + // SizedPtr* curBuf = NULL; + while(!done) { - if(curBuf != NULL) + OutgoingMessage* outMsg = NULL; + if(queue_spsc_pop(emulContext->outMsgQueue, (void**)&outMsg)) { - compact_outgoing_buffer(curBuf); - - OutgoingMessage* messages = curBuf->ptr; - for(size_t i = 0; i < curBuf->size; i++) + if(outMsg != NULL) { - OutgoingMessage *outMsg = &messages[i]; - if(outMsg->msg == NULL) - { - // Slot was merged into a CompactedMessage; skip. - continue; - } ws_sendframe_bin(outMsg->clientIdx, (const char*)outMsg->msg, outMsg->msgLen); free(outMsg->msg); - outMsg->msg = NULL; + free(outMsg); } - curBuf->size = 0; - atomic_store(&outBufs->readRequestIdx, currBufIdx); - - currBufIdx++; - if(currBufIdx >= bufsCount) - { - currBufIdx = 0; - } - } - - currWritingIdxPtr = atomic_load(&outBufs->currWritingIdx); - if(currBufIdx == currWritingIdxPtr) - { - atomic_store(&outBufs->readRequestIdx, currBufIdx); - if(curBuf == NULL) - { - // my_sleep(1); - } - else - { - curBuf = NULL; - } - } - else - { - curBuf = &outBufs->bufs[currBufIdx]; } } + // while (currWritingIdxPtr != 0xFF) + // { + // if(curBuf != NULL) + // { + // compact_outgoing_buffer(curBuf); + + // OutgoingMessage* messages = curBuf->ptr; + // for(size_t i = 0; i < curBuf->size; i++) + // { + // OutgoingMessage *outMsg = &messages[i]; + // if(outMsg->msg == NULL) + // { + // // Slot was merged into a CompactedMessage; skip. + // continue; + // } + // ws_sendframe_bin(outMsg->clientIdx, (const char*)outMsg->msg, outMsg->msgLen); + // free(outMsg->msg); + // outMsg->msg = NULL; + // } + // curBuf->size = 0; + // atomic_store(&outBufs->readRequestIdx, currBufIdx); + + // currBufIdx++; + // if(currBufIdx >= bufsCount) + // { + // currBufIdx = 0; + // } + // } + + // currWritingIdxPtr = atomic_load(&outBufs->currWritingIdx); + // if(currBufIdx == currWritingIdxPtr) + // { + // atomic_store(&outBufs->readRequestIdx, currBufIdx); + // if(curBuf == NULL) + // { + // // my_sleep(1); + // } + // else + // { + // curBuf = NULL; + // } + // } + // else + // { + // curBuf = &outBufs->bufs[currBufIdx]; + // } + // } pthread_exit(0); } -void mockDevice0(uint8_t* mem, uint64_t* readAddrs, size_t* readAddrsLen, uint64_t* writeAddrs, size_t* writeAddrsLen) -{ - for(size_t i = 0; i < 8; i++) - { - readAddrs[*readAddrsLen] = i; - *readAddrsLen += 1; - } - - uint64_t num = decodeBytesToU64(mem); - num += 1; - encodeUintToBytes(num, mem); - for(size_t i = 0; i < 8; i++) - { - writeAddrs[*writeAddrsLen] = i; - *writeAddrsLen += 1; - } -} - -void mockDevice1(uint8_t* mem, uint64_t* readAddrs, size_t* readAddrsLen, uint64_t* writeAddrs, size_t* writeAddrsLen) -{ - -} - -void dispatchStreamSegment(EmulContext* emulContext, StreamReg* reg, device_mem_t* mem) +static void dispatchStreamSegment(EmulContext* emulContext, StreamReg* reg, device_mem_t* mem) { size_t mlen = 0; uint8_t* msg = fb_build_stream_data_push( &emulContext->stream_builder, UINT64_MAX, reg->regId, *emulContext->clockCounter, (uint8_t*)(((uint64_t)(mem->cells[reg->segId])) + (uint64_t)reg->startAddr), reg->segLen, &mlen); - dispatchOutgoingMessage(emulContext->outBufs, reg->clientContext->clientId, msg, mlen); + dispatchOutgoingMessage(emulContext->outMsgQueue, reg->clientContext->clientId, msg, mlen); } -void dispatchMemAccessNotifications(EmulContext* emulContext, DeviceSegStreamReg* deviceRegs, device_mem_t* mem, uint64_t* addrs, size_t addrsLen, uint8_t mode) +static inline void dispatchMemAccessNotifications(EmulContext* emulContext, DeviceSegStreamReg* deviceRegs, device_mem_t* mem, uint64_t* addrs, size_t addrsLen, uint8_t mode) { if(deviceRegs->regCount == 0) { @@ -338,46 +340,66 @@ void dispatchMemAccessNotifications(EmulContext* emulContext, DeviceSegStreamReg size_t dispatchRegsCnt = 0; // uint8_t dispatchedRegMap[1024 * 16] = {0}; - for(size_t i = 0; i < addrsLen; i++) + + for(size_t regIdx = 0; regIdx < deviceRegs->regCount; regIdx++) { - uint64_t addr = addrs[i]; - for(size_t regIdx = 0; regIdx < deviceRegs->regCount; regIdx++) + StreamReg* reg = &deviceRegs->regs[regIdx]; + if(reg->mode == mode) { - StreamReg* reg = &deviceRegs->regs[regIdx]; - if(reg->mode == mode) + for(size_t i = 0; i < addrsLen; i++) { + const uint64_t addr = addrs[i]; if(reg->startGlobalAddr <= addr && addr <= reg->startGlobalAddr + reg->segLen) { - // if(dispatchedRegMap[reg->regId] == 1) - // { - // // break; - // } - // else - // { - // dispatchRegs[dispatchRegsCnt] = reg; - // dispatchedRegMap[reg->regId] = 1; - // dispatchRegsCnt++; - // } - uint8_t isDuplicate = 0; - for(size_t j = 0; j < dispatchRegsCnt; j++) - { - if(dispatchRegs[j] == reg) - { - isDuplicate = 1; - break; - } - } - if(!isDuplicate) - { - dispatchRegs[dispatchRegsCnt] = reg; - // dispatchedRegMap[reg->regId] = 1; - dispatchRegsCnt++; - } + dispatchRegs[dispatchRegsCnt] = reg; + dispatchRegsCnt++; + break; } } } } + + // for(size_t i = 0; i < addrsLen; i++) + // { + // uint64_t addr = addrs[i]; + // for(size_t regIdx = 0; regIdx < deviceRegs->regCount; regIdx++) + // { + // StreamReg* reg = &deviceRegs->regs[regIdx]; + // if(reg->mode == mode) + // { + // if(reg->startGlobalAddr <= addr && addr <= reg->startGlobalAddr + reg->segLen) + // { + // // if(dispatchedRegMap[reg->regId] == 1) + // // { + // // // break; + // // } + // // else + // // { + // // dispatchRegs[dispatchRegsCnt] = reg; + // // dispatchedRegMap[reg->regId] = 1; + // // dispatchRegsCnt++; + // // } + // uint8_t isDuplicate = 0; + // for(size_t j = 0; j < dispatchRegsCnt; j++) + // { + // if(dispatchRegs[j] == reg) + // { + // isDuplicate = 1; + // break; + // } + // } + // if(!isDuplicate) + // { + // dispatchRegs[dispatchRegsCnt] = reg; + // // dispatchedRegMap[reg->regId] = 1; + // dispatchRegsCnt++; + // } + // } + // } + // } + // } + // if(dispatchRegsCnt == 0) // { // printf("No memory dispatched\n"); @@ -391,7 +413,7 @@ void dispatchMemAccessNotifications(EmulContext* emulContext, DeviceSegStreamReg } -uint64_t getCurrentUsec() +static uint64_t getCurrentUsec() { struct timeval tv; gettimeofday(&tv,NULL); @@ -406,10 +428,218 @@ static void print_usage(const char* progname) fprintf(stderr, " --host HOST Listen host (default: localhost)\n"); } + +static void outbufMaintence(EmulContext* emulContext) +{ + OutgoingBuffers* outBufs = emulContext->outBufs; + + uint8_t readReqIdx = atomic_load(&outBufs->readRequestIdx); + if(readReqIdx == outBufs->currWritingIdx || outBufs->bufs[outBufs->currWritingIdx].size >= outBufs->bufs[outBufs->currWritingIdx].allocatedSize / 2) + { + uint8_t newWriteIdx = outBufs->currWritingIdx + 1; + if(newWriteIdx >= outBufs->buffersCount) + { + newWriteIdx = 0; + } + if(outBufs->bufs[outBufs->currWritingIdx].size != 0 || readReqIdx != newWriteIdx) + { + while(readReqIdx == newWriteIdx) + { + printf("FLOOD CONTROL - SLEEP\n"); + my_sleep(100); + readReqIdx = atomic_load(&outBufs->readRequestIdx); + } + atomic_store(&outBufs->currWritingIdx, newWriteIdx); + outBufs->bufs[outBufs->currWritingIdx].size = 0; + } + } +} + +static StreamReg* dispatchRegs[1024 * 16]; + +static void commitDevMem(EmulContext* emulContext) +{ + + for(size_t di = 0; di < emulContext->devicesCount; di++) + { + device_handle_t* dev = (device_handle_t*)emulContext->deviceHandles[di]; + device_mem_t* devMem = dev->ctx->deviceMem; + if(dev->clockCycleCounter == 0) + { + DeviceSegStreamReg* deviceRegs = emulContext->deviceStreamRegs[di]; + + size_t dispatchRegsCnt = 0; + + if(devMem->memwriteLen > 0) + { + + // printf("device %lu has %d writes\n", di, devMem->memwriteLen); + // uint64_t globalWriteAddrs[MEM_ACCESS_INTERCEPT_BUF_SIZE] = {0}; + for(size_t i = 0; i < devMem->memwriteLen; i++) + { + const uint8_t seg = devMem->memwriteCellSegments[i]; + + const uint8_t wordLen = devMem->memsegCellSizes[seg]; // devMem->memwriteWordLengths[i]; + const uint64_t addr = devMem->memwriteCellAddrs[i]; + uint64_t val = devMem->memwriteValues[i]; + + switch(wordLen) + { + case 1: + { + ((uint8_t*)(devMem->cells[seg]))[addr] = ((uint8_t)val); + // printf("[DEV/WRITE] %d -> [%d].%lu\n", *(uint8_t*)val, seg, addr); + break; + } + case 2: + { + ((uint16_t*)(devMem->cells[seg]))[addr] = ((uint16_t)val); + // printf("[DEV/WRITE] %d -> [%d].%lu\n", *(uint16_t*)val, seg, addr); + break; + } + case 4: + { + ((uint32_t*)(devMem->cells[seg]))[addr] = ((uint32_t)val); + // printf("[DEV/WRITE] %d -> [%d].%lu\n", *(uint32_t*)val, seg, addr); + break; + } + case 8: + { + ((uint64_t*)(devMem->cells[seg]))[addr] = ((uint64_t)val); + // printf("[DEV/WRITE] %lu -> [%d].%lu\n", *(uint64_t*)val, seg, addr); + break; + } + default: + { + printf("invalid word size: %d\n", wordLen); + } + } + + size_t globalAddr = devMem->memsegShifts[seg] + addr; + + for(size_t regIdx = 0; regIdx < deviceRegs->regCount; regIdx++) + { + StreamReg* reg = &deviceRegs->regs[regIdx]; + if(reg->mode == STREAM_MODE_WRITE) + { + if(reg->startGlobalAddr <= globalAddr && globalAddr <= reg->startGlobalAddr + reg->segLen) + { + uint8_t isDuplicate = 0; + for(size_t j = 0; j < dispatchRegsCnt; j++) + { + if(dispatchRegs[j] == reg) + { + isDuplicate = 1; + break; + } + } + if(!isDuplicate) + { + dispatchRegs[dispatchRegsCnt] = reg; + dispatchRegsCnt++; + } + } + } + } + // globalWriteAddrs[i] = globalAddr; + } + + + // for(size_t i = 0; i < dispatchRegsCnt; i++) + // { + // dispatchStreamSegment(emulContext, dispatchWriteRegs[i], devMem); + // } + + // dispatchMemAccessNotifications(emulContext, emulContext->deviceStreamRegs[di], devMem, globalWriteAddrs, (size_t)devMem->memwriteLen, STREAM_MODE_WRITE); + devMem->memwriteLen = 0; + } + else + { + // printf("device %lu has no writes\n", di); + } + if(devMem->memreadLen > 0) + { + // StreamReg* dispatchReadRegs[1024 * 16]; + // size_t dispatchRegsCnt = 0; + + for(size_t regIdx = 0; regIdx < deviceRegs->regCount; regIdx++) + { + StreamReg* reg = &deviceRegs->regs[regIdx]; + if(reg->mode == STREAM_MODE_READ) + { + for(size_t i = 0; i < devMem->memreadLen; i++) + { + size_t globalAddr = devMem->memreadCellAddrs[i]; + if(reg->startGlobalAddr <= globalAddr && globalAddr <= reg->startGlobalAddr + reg->segLen) + { + uint8_t isDuplicate = 0; + for(size_t j = 0; j < dispatchRegsCnt; j++) + { + if(dispatchRegs[j] == reg) + { + isDuplicate = 1; + break; + } + } + if(!isDuplicate) + { + dispatchRegs[dispatchRegsCnt] = reg; + dispatchRegsCnt++; + } + } + } + } + } + + // for(size_t i = 0; i < dispatchRegsCnt; i++) + // { + // dispatchStreamSegment(emulContext, dispatchReadRegs[i], devMem); + // } + + + + // dispatchMemAccessNotifications(emulContext, emulContext->deviceStreamRegs[di], devMem, devMem->memreadCellAddrs, (size_t)devMem->memreadLen, STREAM_MODE_READ); + devMem->memreadLen = 0; + } + + + for(size_t i = 0; i < dispatchRegsCnt; i++) + { + dispatchStreamSegment(emulContext, dispatchRegs[i], devMem); + } + } + } +} + +static void tickDevices(EmulContext* emulContext, uint64_t clockCounter) +{ + for (size_t di = 0; di < emulContext->devicesCount; di++) + { + device_handle_t* dev = (device_handle_t*)emulContext->deviceHandles[di]; + + if (clockCounter % dev->clockDivider == 0) + { + if(dev->clockCycleCounter == 0) + { + // printf("clock device %lu\n", di); + // device_mem_t* devMem = dev->ctx->deviceMem; + + dev->clockCycleCounter = dev->lib->makeDeviceTick(dev->ctx); + } + } + } +} + int main(int argc, char** argv) { - uint16_t cli_port = 8181; - const char* cli_host = "localhost"; + struct sigaction action; + memset(&action, 0, sizeof(action)); + action.sa_handler = term; + + sigaction(SIGTERM, &action, NULL); + + uint16_t cli_port = 9000; + const char* cli_host = "0.0.0.0"; const char* cli_token = NULL; for (int i = 1; i < argc; i++) { @@ -433,12 +663,17 @@ int main(int argc, char** argv) return 1; } } - char errbuf[1024]; + // char errbuf[1024]; pthread_mutex_t mtx; pthread_mutex_init(&mtx, NULL); - ptQueue* regQ = ptQueueCreate(errbuf); - NULL_GUARD(regQ, "Unable to create reg q: %s\n", errbuf); + queue_spsc_t regQ; + if(!queue_spsc_init(®Q, 65536)) + { + panic("Unable to create reg q"); + } + // queue_spsc_t* regQ = queue_spsc_init(errbuf); + // NULL_GUARD(regQ, "Unable to create reg q: %s\n", errbuf); size_t deviceCount = 0; DeviceSegStreamReg** deviceStreamRegs = NULL; @@ -455,10 +690,10 @@ int main(int argc, char** argv) for(size_t i = 0; i < outBufsCount; i++) { - OutgoingMessage* messages = malloc(sizeof(OutgoingMessage) * 128); + OutgoingMessage* messages = malloc(sizeof(OutgoingMessage) * 1024); NULL_GUARD(messages); bufs[i].ptr = messages; - bufs[i].allocatedSize = 128; + bufs[i].allocatedSize = 1024; bufs[i].size = 0; for(size_t j = 0; j < bufs[i].allocatedSize; j++) @@ -477,11 +712,18 @@ int main(int argc, char** argv) }; + queue_mpsc_t* inMsgQueue = calloc(1, sizeof(queue_mpsc_t)); + queue_mpsc_init(inMsgQueue, 65536); + + queue_spsc_t outMsgQueue; + queue_spsc_init(&outMsgQueue, 65536); + LinkedListEntry* clientsLinkedListHead = NULL; uint64_t clockCounter = 0; uint64_t tickTarget = 0; uint8_t resetRequest = 0; + uint8_t utilizedFlag = 0; EmulContext emulContext = { @@ -503,13 +745,16 @@ int main(int argc, char** argv) 0, /* simRateLimit */ NULL, /* deviceIdMappingMsg */ 0, /* deviceIdMappingMsgLen */ + inMsgQueue, + &outMsgQueue, }; if (flatcc_builder_init(&emulContext.stream_builder)) { panic("flatcc_builder_init failed\n"); } - static uint8_t default_token[] = "jtd3dhBWG2p73mkRCyaZ63KLmOjl5OgItIDAPa8HcyGQI2SOFA6hsJVr1NiG2Y96AvwyYSShr4GAL3sjJEo1wsmCo2vuhiqRhj8HVRrOVocD2e1jtuMwMsCmizYakT63"; + // static uint8_t default_token[] = "jtd3dhBWG2p73mkRCyaZ63KLmOjl5OgItIDAPa8HcyGQI2SOFA6hsJVr1NiG2Y96AvwyYSShr4GAL3sjJEo1wsmCo2vuhiqRhj8HVRrOVocD2e1jtuMwMsCmizYakT63"; + static uint8_t default_token[] = "test_token"; uint8_t* access_token = default_token; /* If --token was given, copy it into a mutable buffer */ uint8_t* cli_token_buf = NULL; @@ -526,7 +771,8 @@ int main(int argc, char** argv) ServerContext ctx = { mtx, - regQ, + ®Q, + inMsgQueue, access_token, &emulContext, }; @@ -535,7 +781,7 @@ int main(int argc, char** argv) pthread_attr_t outAttr; pthread_attr_init(&outAttr); - pthread_create(&outTid, &outAttr, outgoingMain, &outBufs); + pthread_create(&outTid, &outAttr, outgoingMain, &emulContext); pthread_detach(outTid); @@ -550,22 +796,24 @@ int main(int argc, char** argv) .evs.onmessage = &onWsMessage }); - ptQueueElem* regQueueTail = regQ->tail; + // ptQueueElem* regQueueTail = regQ->tail; - uint16_t clients_try_timer = 1000; + uint16_t clients_try_timer = 10000; uint64_t lastTickAt = getCurrentUsec(); - uint64_t lastTickCountWindowAt = getCurrentUsec(); - uint64_t lastTickCounter = 0; + // uint64_t lastTickCountWindowAt = getCurrentUsec(); + // uint64_t lastTickCounter = 0; while(1) { - ClientRegistrationEvent* payload = regQueueTail->payload; + ClientRegistrationEvent* payload = NULL; + queue_spsc_pop(®Q, (void**)&payload); + // ClientRegistrationEvent* payload = regQueueTail->payload; if(payload != NULL) { - printf("Got reg queue data\n"); + // printf("Got reg queue data\n"); handleRegEvent(&emulContext, payload); - regQueueTail = regQueueTail->nextEl; + // regQueueTail = regQueueTail->nextEl; utilizedFlag = 1; } @@ -573,38 +821,19 @@ int main(int argc, char** argv) if (clients_try_timer == 0 || emulState != EMUL_STATE_EXEC) { - if(getCurrentUsec() - lastTickCountWindowAt > 1000000) - { + // if(getCurrentUsec() - lastTickCountWindowAt > 1000000) + // { // uint64_t dtimeUs = getCurrentUsec() - lastTickCountWindowAt; // lastTickCountWindowAt = getCurrentUsec(); // uint64_t dtick = clockCounter - lastTickCounter; // lastTickCounter = clockCounter; // double rate = ((double)dtick) / (((double)dtimeUs) / 1000000); // printf("clock rate: %f\n", rate); - } + // } + clients_try_timer = 10000; handleAllClients(&emulContext); - clients_try_timer = 1000; - uint8_t readReqIdx = atomic_load(&outBufs.readRequestIdx); - // uint8_t readReqIdx = outBufs.readRequestIdx; - if(readReqIdx == outBufs.currWritingIdx || outBufs.bufs[outBufs.currWritingIdx].size >= outBufs.bufs[outBufs.currWritingIdx].allocatedSize / 2) - { - uint8_t newWriteIdx = outBufs.currWritingIdx + 1; - if(outBufs.bufs[outBufs.currWritingIdx].size != 0 || readReqIdx != newWriteIdx) - { - if(newWriteIdx >= outBufs.buffersCount) - { - newWriteIdx = 0; - } - while(readReqIdx == newWriteIdx) - { - my_sleep(100); - readReqIdx = atomic_load(&outBufs.readRequestIdx); - } - atomic_store(&outBufs.currWritingIdx, newWriteIdx); - outBufs.bufs[outBufs.currWritingIdx].size = 0; - } - } + // outbufMaintence(&emulContext); } else { @@ -613,15 +842,29 @@ int main(int argc, char** argv) if(resetRequest) { - for(size_t di = 0; di < emulContext.devicesCount; di++) + if(emulState != EMUL_STATE_STILL) { - device_handle_t* dev = (device_handle_t*)emulContext.deviceHandles[di]; - dev->lib->reset(dev->specs, dev->ctx); - dev->clockCycleCounter = 0; - dev->clockCycleLimit = 0; + printf("running full reset\n"); + for(size_t di = 0; di < emulContext.devicesCount; di++) + { + device_handle_t* dev = (device_handle_t*)emulContext.deviceHandles[di]; + dev->lib->reset(dev->specs, dev->ctx); + dev->clockCycleCounter = 0; + dev->clockCycleLimit = 0; + } + + emulState = EMUL_STATE_STILL; + printf("[RESET/EXEC] state -> %u\n", emulState); + + size_t msg_len; + uint8_t* out = fb_build_exec_notify(0, 0, emulState, &msg_len); + broadcastClients(&emulContext, out, msg_len); } clockCounter = 0; resetRequest = 0; + + + } @@ -647,104 +890,11 @@ int main(int argc, char** argv) } } - for(size_t di = 0; di < emulContext.devicesCount; di++) - { - device_handle_t* dev = (device_handle_t*)emulContext.deviceHandles[di]; - device_mem_t* devMem = dev->ctx->deviceMem; - if(dev->clockCycleCounter == 0) - { + // printf("tick %lu\n", clockCounter); - if(devMem->memwriteLen > 0) - { - // printf("device %lu has %d writes\n", di, devMem->memwriteLen); - uint64_t globalWriteAddrs[MEM_ACCESS_INTERCEPT_BUF_SIZE] = {0}; - for(size_t i = 0; i < devMem->memwriteLen; i++) - { - const uint8_t seg = devMem->memwriteCellSegments[i]; - const uint8_t wordLen = devMem->memwriteWordLengths[i]; - const uint64_t addr = devMem->memwriteCellAddrs[i]; - void* val = devMem->memwriteValues[i]; + commitDevMem(&emulContext); - // const uint64_t segLen = devMem->memsegSizes[seg]; - - // // device_specs_t* spec = dev->specs; - // // spec - - // if(addr >= segLen) - // { - // printf("write out of bounds of segment of len %lu: [%d].%lu\n", segLen, seg, addr); - // emulState = EMUL_STATE_PAUSE; - // } - // else - { - switch(wordLen) - { - case 1: - { - ((uint8_t*)(devMem->cells[seg]))[addr] = *((uint8_t*)val); - // printf("[DEV/WRITE] %d -> [%d].%lu\n", *(uint8_t*)val, seg, addr); - break; - } - case 2: - { - ((uint16_t*)(devMem->cells[seg]))[addr] = *((uint16_t*)val); - // printf("[DEV/WRITE] %d -> [%d].%lu\n", *(uint16_t*)val, seg, addr); - break; - } - case 4: - { - ((uint32_t*)(devMem->cells[seg]))[addr] = *((uint32_t*)val); - // printf("[DEV/WRITE] %d -> [%d].%lu\n", *(uint32_t*)val, seg, addr); - break; - } - case 8: - { - ((uint64_t*)(devMem->cells[seg]))[addr] = *((uint64_t*)val); - // printf("[DEV/WRITE] %lu -> [%d].%lu\n", *(uint64_t*)val, seg, addr); - break; - } - default: - { - printf("invalid word size: %d\n", wordLen); - } - } - } - - globalWriteAddrs[i] = devMem->memsegShifts[seg] + addr; - - } - - dispatchMemAccessNotifications(&emulContext, emulContext.deviceStreamRegs[di], devMem, globalWriteAddrs, (size_t)devMem->memwriteLen, STREAM_MODE_WRITE); - devMem->memwriteLen = 0; - } - else - { - // printf("device %lu has no writes\n", di); - } - if(devMem->memreadLen > 0) - { - dispatchMemAccessNotifications(&emulContext, emulContext.deviceStreamRegs[di], devMem, devMem->memreadCellAddrs, (size_t)devMem->memreadLen, STREAM_MODE_READ); - devMem->memreadLen = 0; - } - } - } - - - for (size_t di = 0; di < emulContext.devicesCount; di++) - { - device_handle_t* dev = (device_handle_t*)emulContext.deviceHandles[di]; - - if (clockCounter % dev->clockDivider == 0) - { - if(dev->clockCycleCounter == 0) - { - // printf("clock device %lu\n", di); - // device_mem_t* devMem = dev->ctx->deviceMem; - - dev->clockCycleCounter = dev->lib->makeDeviceTick(dev->ctx); - } - } - } + tickDevices(&emulContext, clockCounter); clockCounter++; @@ -764,14 +914,24 @@ int main(int argc, char** argv) } else if(!utilizedFlag) { + // if(emulState != EMUL_STATE_EXEC) + // { + // my_sleep(100000); + // } // my_sleep(1000); } + if(done) + { + break; + } } atomic_store(&outBufs.currWritingIdx, 0xFF); pthread_join(outTid, NULL); + printf("bye!\n"); + return 0; diff --git a/src/proto/dial.c b/src/proto/dial.c index 3f6031b..1de2c1a 100644 --- a/src/proto/dial.c +++ b/src/proto/dial.c @@ -1,11 +1,13 @@ #include "proto/dial.h" +#include #include #include #include "context.h" #include "panic.h" #include "proto/msg.h" +#include "ptQueue/inc/spsc.h" static uint64_t dial_now_us(void) { @@ -25,7 +27,7 @@ void broadcastClients(EmulContext* emulContext, uint8_t* msg, size_t msgLen) memcpy(newMsg, msg, msgLen); ClientContext* ctx = clientsHead->payload; - dispatchOutgoingMessage(emulContext->outBufs, ctx->clientId, newMsg, msgLen); + dispatchOutgoingMessage(emulContext->outMsgQueue, ctx->clientId, newMsg, msgLen); clientsHead = clientsHead->nextEntry; } free(msg); @@ -35,21 +37,25 @@ void broadcastClients(EmulContext* emulContext, uint8_t* msg, size_t msgLen) -void dispatchOutgoingMessage(OutgoingBuffers* outBufs, ws_cli_conn_t clientIdx, uint8_t* msg, size_t msgLen) +void dispatchOutgoingMessage(queue_spsc_t* outMsgQueue, ws_cli_conn_t clientIdx, uint8_t* msg, size_t msgLen) { - SizedPtr* p = &outBufs->bufs[outBufs->currWritingIdx]; - if(p->size + 1 >= p->allocatedSize) - { - // printf("\t>>Reallocating buf %d\n", outBufs->currWritingIdx); - OutgoingMessage* newPtr = realloc(p->ptr, sizeof(OutgoingMessage) * p->allocatedSize * 5); - NULL_GUARD(newPtr); - p->ptr = newPtr; - p->allocatedSize = p->allocatedSize * 5; - } - OutgoingMessage* outmsg = &((OutgoingMessage*)p->ptr)[p->size]; + // OutgoingBuffers* outBufs = emulContext->outBufs; + // SizedPtr* p = &outBufs->bufs[outBufs->currWritingIdx]; + // if(p->size + 1 >= p->allocatedSize) + // { + // // printf("\t>>Reallocating buf %d\n", outBufs->currWritingIdx); + // OutgoingMessage* newPtr = realloc(p->ptr, sizeof(OutgoingMessage) * p->allocatedSize * 5); + // NULL_GUARD(newPtr); + // p->ptr = newPtr; + // p->allocatedSize = p->allocatedSize * 5; + // } + OutgoingMessage* outmsg = malloc(sizeof(OutgoingMessage)); + // OutgoingMessage* outmsg = &((OutgoingMessage*)p->ptr)[p->size]; + NULL_GUARD(outmsg); outmsg->msg = msg; outmsg->msgLen = msgLen; outmsg->clientIdx = clientIdx; outmsg->dispatch_us = dial_now_us(); - p->size++; + queue_spsc_push(outMsgQueue, outmsg); + // p->size++; } \ No newline at end of file diff --git a/src/proto/handlers.c b/src/proto/handlers.c index 531bf3c..e48d98a 100644 --- a/src/proto/handlers.c +++ b/src/proto/handlers.c @@ -13,11 +13,13 @@ #include "proto_reader.h" #include "proto_verifier.h" #include "control_reader.h" +#include "ptQueue/inc/mpsc.h" +#include "ptQueue/inc/spsc.h" #include "stream_reader.h" #include "mem_reader.h" -void handleCloseClient(EmulContext* emulContext, ClientContext* ctx) +static void handleCloseClient(EmulContext* emulContext, ClientContext* ctx) { // if (ctx->streamRegIterator > 0) { unregisterClientStreams(emulContext, ctx); @@ -113,35 +115,77 @@ static void handleIncomingMessage( void handleAllClients(EmulContext* emulContext) { - LinkedListEntry* clientEntry = *emulContext->clientsHead; - size_t handleLimit = 128; - while (clientEntry != NULL && handleLimit > 0) { - handleLimit--; - ClientContext* ctx = clientEntry->payload; + queue_mpsc_t* inMsgQueue = emulContext->inMsgQueue; - if (!ctx->isAuthed) { - clientEntry = disconnectDueTimeout(emulContext, clientEntry); - if (clientEntry == NULL) break; - if (*emulContext->utilizedFlag) continue; - } else { - FbMessage* fbmsg = ctx->incomeQ->head->payload; - if (fbmsg != NULL) { + FbMessage* fbmsg = NULL; + + for(size_t handleLimit = 0; handleLimit < 32; handleLimit++) + { + fbmsg = NULL; + if(queue_mpsc_pop(inMsgQueue, (void**)&fbmsg)) + { + if(fbmsg != NULL) + { *emulContext->utilizedFlag = 1; // if (hmmmm_ClientMessage_verify_as_root(fbmsg->data, fbmsg->size) == 0) { hmmmm_ClientMessage_table_t cm = hmmmm_ClientMessage_as_root(fbmsg->data); - handleIncomingMessage(cm, ctx, emulContext); + // printf("read message len %lu\n", fbmsg->size); + handleIncomingMessage(cm, fbmsg->ctx, emulContext); // } else { - // printf("client %lu: dropped malformed FlatBuffer\n", ctx->clientId); + // printf("client %lu: dropped malformed FlatBuffer\n", fbmsg->ctx->clientId); // } free(fbmsg->data); free(fbmsg); - ctx->incomeQ->head = ctx->incomeQ->head->nextEl; + } - clientEntry = clientEntry->nextEntry; + else + { + break; + } + } + else + { + break; } } + + + // LinkedListEntry* clientEntry = *emulContext->clientsHead; + // size_t handleLimit = 128; + + // while (clientEntry != NULL && handleLimit > 0) { + // handleLimit--; + // ClientContext* ctx = clientEntry->payload; + + // if (!ctx->isAuthed) { + // clientEntry = disconnectDueTimeout(emulContext, clientEntry); + // if (clientEntry == NULL) break; + // if (*emulContext->utilizedFlag) continue; + // } else { + // FbMessage* fbmsg = NULL; + // queue_spsc_pop(ctx->incomeQ, (void**)&fbmsg); + // // FbMessage* fbmsg = ctx->incomeQ->head->payload; + // if (fbmsg != NULL) { + // *emulContext->utilizedFlag = 1; + + // // if (hmmmm_ClientMessage_verify_as_root(fbmsg->data, fbmsg->size) == 0) { + // printf("read message len %lu\n", fbmsg->size); + // hmmmm_ClientMessage_table_t cm = + // hmmmm_ClientMessage_as_root(fbmsg->data); + // handleIncomingMessage(cm, ctx, emulContext); + // // } else { + // // printf("client %lu: dropped malformed FlatBuffer\n", ctx->clientId); + // // } + + // free(fbmsg->data); + // free(fbmsg); + // // ctx->incomeQ->head = ctx->incomeQ->head->nextEl; + // } + // clientEntry = clientEntry->nextEntry; + // } + // } } diff --git a/src/proto/handlers/auth.c b/src/proto/handlers/auth.c index 68912a2..1d2fc60 100644 --- a/src/proto/handlers/auth.c +++ b/src/proto/handlers/auth.c @@ -16,6 +16,7 @@ #include "proto_reader.h" #include "proto_verifier.h" #include "auth_reader.h" +#include "ptQueue/inc/spsc.h" static uint8_t validateAccessTokenDeterministic( @@ -99,10 +100,12 @@ uint8_t handle_auth( with_lock(&ctx->registerMutex) { printf("Writing auth event\n"); - int exitCode = ptQueuePush(ctx->regQueue, ev, errbuf); - if (exitCode) { - panic("Unable to push to reg queue: %s\n", errbuf); - } + // int exitCode = ptQueuePush(ctx->regQueue, ev, errbuf); + + queue_spsc_push(ctx->regQueue, ev); + // if (!queue_spsc_push(ctx->regQueue, ev)) { + // panic("Unable to push to reg queue: %s\n", errbuf); + // } } return 1; } @@ -135,13 +138,13 @@ void handleOnClientAuthDone(ClientContext* cctx, EmulContext* emulContext) // Send AuthResponse with assigned seat_id size_t len = 0; uint8_t* authResp = fb_build_auth_response(UINT64_MAX, cctx->clientId, &len); - dispatchOutgoingMessage(emulContext->outBufs, cctx->clientId, authResp, len); + dispatchOutgoingMessage(emulContext->outMsgQueue, cctx->clientId, authResp, len); // Send current execution state len = 0; uint8_t* stateMsg = fb_build_exec_notify( UINT64_MAX, *emulContext->clockCounter, *emulContext->emulState, &len); - dispatchOutgoingMessage(emulContext->outBufs, cctx->clientId, stateMsg, len); + dispatchOutgoingMessage(emulContext->outMsgQueue, cctx->clientId, stateMsg, len); // Send cached DeviceIdMappingNotif if a config has been loaded if (emulContext->deviceIdMappingMsg && emulContext->deviceIdMappingMsgLen > 0) { @@ -149,7 +152,7 @@ void handleOnClientAuthDone(ClientContext* cctx, EmulContext* emulContext) if (mappingCopy) { memcpy(mappingCopy, emulContext->deviceIdMappingMsg, emulContext->deviceIdMappingMsgLen); dispatchOutgoingMessage( - emulContext->outBufs, cctx->clientId, + emulContext->outMsgQueue, cctx->clientId, mappingCopy, emulContext->deviceIdMappingMsgLen); } } diff --git a/src/proto/handlers/config.c b/src/proto/handlers/config.c index 3d1121d..7918a92 100644 --- a/src/proto/handlers/config.c +++ b/src/proto/handlers/config.c @@ -152,6 +152,8 @@ static int load_devices_recursive( return -1; } + dev->lib->reset(dev->specs, dev->ctx); + freeConf(dc); size_t idx = st->count; @@ -243,7 +245,7 @@ static size_t find_device_by_id(LoadState* st, const char* id) // Find a segment index by name in a device static size_t find_seg_by_name(device_handle_t* dev, size_t seg_count, const char* name) { - device_mem_t* mem = dev->ctx->deviceMem; + const device_mem_t* mem = dev->ctx->deviceMem; for (size_t i = 0; i < seg_count; i++) { if (strcmp(mem->memsegNames[i], name) == 0) return i; } @@ -327,7 +329,7 @@ typedef struct { // shadow_replace: read from point device instead of base static void* intercept_replace_read(uint64_t ident, uint64_t addr, void* rawCells) { - intercept_ctx_t* ctx = (intercept_ctx_t*)(uintptr_t)ident; + const intercept_ctx_t* ctx = (intercept_ctx_t*)(uintptr_t)ident; return ctx->point_cell; } @@ -536,13 +538,13 @@ void handleConfigCtrlMessage( char errbuf[1024]; // Block config load only while actively executing (ticking) - if (*emulContext->emulState == EMUL_STATE_EXEC) { - snprintf(errbuf, 1024, "stop or pause the emulator before loading config"); + if (*emulContext->emulState != EMUL_STATE_STILL) { + snprintf(errbuf, 1024, "reset emulator before loading config, current state is %d", *emulContext->emulState); printf("[CTRL/CONFIG] error: %s\n", errbuf); size_t msg_len; uint8_t* out = fb_build_config_error(nonce, errbuf, &msg_len); - dispatchOutgoingMessage(emulContext->outBufs, ctx->clientId, out, msg_len); + dispatchOutgoingMessage(emulContext->outMsgQueue, ctx->clientId, out, msg_len); return; } @@ -555,7 +557,7 @@ void handleConfigCtrlMessage( size_t msg_len; uint8_t* out = fb_build_config_error(nonce, errbuf, &msg_len); - dispatchOutgoingMessage(emulContext->outBufs, ctx->clientId, out, msg_len); + dispatchOutgoingMessage(emulContext->outMsgQueue, ctx->clientId, out, msg_len); return; } @@ -569,7 +571,7 @@ void handleConfigCtrlMessage( size_t msg_len; uint8_t* out = fb_build_config_error(nonce, errbuf, &msg_len); - dispatchOutgoingMessage(emulContext->outBufs, ctx->clientId, out, msg_len); + dispatchOutgoingMessage(emulContext->outMsgQueue, ctx->clientId, out, msg_len); return; } @@ -597,7 +599,7 @@ void handleConfigCtrlMessage( size_t msg_len; uint8_t* out = fb_build_config_error(nonce, errbuf, &msg_len); - dispatchOutgoingMessage(emulContext->outBufs, ctx->clientId, out, msg_len); + dispatchOutgoingMessage(emulContext->outMsgQueue, ctx->clientId, out, msg_len); return; } @@ -616,7 +618,7 @@ void handleConfigCtrlMessage( size_t msg_len; uint8_t* out = fb_build_config_error(nonce, errbuf, &msg_len); - dispatchOutgoingMessage(emulContext->outBufs, ctx->clientId, out, msg_len); + dispatchOutgoingMessage(emulContext->outMsgQueue, ctx->clientId, out, msg_len); return; } @@ -634,7 +636,7 @@ void handleConfigCtrlMessage( size_t msg_len; uint8_t* out = fb_build_config_error(nonce, errbuf, &msg_len); - dispatchOutgoingMessage(emulContext->outBufs, ctx->clientId, out, msg_len); + dispatchOutgoingMessage(emulContext->outMsgQueue, ctx->clientId, out, msg_len); return; } @@ -671,7 +673,7 @@ void handleConfigCtrlMessage( snprintf(errbuf, sizeof(errbuf), "failed to allocate device arrays"); size_t msg_len; uint8_t* out = fb_build_config_error(nonce, errbuf, &msg_len); - dispatchOutgoingMessage(emulContext->outBufs, ctx->clientId, out, msg_len); + dispatchOutgoingMessage(emulContext->outMsgQueue, ctx->clientId, out, msg_len); return; } @@ -700,7 +702,7 @@ void handleConfigCtrlMessage( dsr->regCount = 0; dsr->regs = calloc(4, sizeof(StreamReg)); if (!dsr->regs) - { + { free(dsr); emulContext->deviceStreamRegs[i] = NULL; continue; @@ -709,7 +711,8 @@ void handleConfigCtrlMessage( } // Reset emulation state to STILL and notify all clients - if (*emulContext->emulState != EMUL_STATE_STILL) { + if (*emulContext->emulState != EMUL_STATE_STILL) + { *emulContext->emulState = EMUL_STATE_STILL; size_t notify_len; uint8_t* notify = fb_build_exec_notify(0, *emulContext->clockCounter, EMUL_STATE_STILL, ¬ify_len); @@ -723,7 +726,7 @@ void handleConfigCtrlMessage( for(size_t i = 0; i < emulContext->devicesCount; i++) { - device_handle_t* handl = emulContext->deviceHandles[i]; + const device_handle_t* handl = emulContext->deviceHandles[i]; for(size_t j = 0; j < st.seg_counts[i]; j++) { if(st.seg_names[i][j]) @@ -734,7 +737,7 @@ void handleConfigCtrlMessage( st.seg_names[i][j] = strdup(handl->ctx->deviceMem->memsegNames[j]); } } - *emulContext->resetRequest = 1; + //*emulContext->resetRequest = 1; uint8_t* out = fb_build_config_device_id_mapping( nonce, st.paths, st.path_lens, st.seg_names, st.seg_counts, dc, &msg_len); diff --git a/src/proto/handlers/control.c b/src/proto/handlers/control.c index 4ba21a4..8b14f3c 100644 --- a/src/proto/handlers/control.c +++ b/src/proto/handlers/control.c @@ -63,24 +63,29 @@ void handleIncomingCtrlMessage( return; } - if(state_op == EMUL_STATE_OP_RESET) + if(state_op == EMUL_STATE_OP_RESET && emulContext->emulState != EMUL_STATE_STILL) { *emulContext->resetRequest = 1; } + else + { + uint8_t new_state = switchNewEmulState(*emulContext->emulState, state_op); + *emulContext->emulState = new_state; + printf("[CTRL/EXEC] state -> %u\n", new_state); - uint8_t new_state = switchNewEmulState(*emulContext->emulState, state_op); - *emulContext->emulState = new_state; - printf("[CTRL/EXEC] state -> %u\n", new_state); + if (new_state == EMUL_STATE_EXEC && tick_count > 0) { + *emulContext->tickTarget = *emulContext->clockCounter + tick_count; - if (new_state == EMUL_STATE_EXEC && tick_count > 0) { - *emulContext->tickTarget = *emulContext->clockCounter + tick_count; - } else if (new_state == EMUL_STATE_EXEC) { - *emulContext->tickTarget = 0; + printf("set ticket target at %lu\n", *emulContext->tickTarget); + } else if (new_state == EMUL_STATE_EXEC) { + *emulContext->tickTarget = 0; + printf("reset tick target\n"); + } + + size_t msg_len; + uint8_t* out = fb_build_exec_notify(0, *emulContext->clockCounter, new_state, &msg_len); + broadcastClients(emulContext, out, msg_len); } - - size_t msg_len; - uint8_t* out = fb_build_exec_notify(0, *emulContext->clockCounter, new_state, &msg_len); - broadcastClients(emulContext, out, msg_len); } else if (ptype == hmmmm_ctrl_CtrlClientPayload_SetupBuf) { @@ -96,7 +101,7 @@ void handleIncomingCtrlMessage( size_t msg_len; uint8_t* out = fb_build_setup_buf(nonce, lost_buf_size, lifetime_ticks, &msg_len); - dispatchOutgoingMessage(emulContext->outBufs, ctx->clientId, out, msg_len); + dispatchOutgoingMessage(emulContext->outMsgQueue, ctx->clientId, out, msg_len); } else if (ptype == hmmmm_ctrl_CtrlClientPayload_OrphanedRequest) { @@ -104,7 +109,7 @@ void handleIncomingCtrlMessage( size_t msg_len; uint8_t* out = fb_build_orphaned_response(nonce, &msg_len); - dispatchOutgoingMessage(emulContext->outBufs, ctx->clientId, out, msg_len); + dispatchOutgoingMessage(emulContext->outMsgQueue, ctx->clientId, out, msg_len); } else if (ptype == hmmmm_ctrl_CtrlClientPayload_ConfigCtrlMessage) { @@ -125,6 +130,6 @@ void handleIncomingCtrlMessage( size_t msg_len; uint8_t* out = fb_build_lost_messages_response(nonce, seat_id, &msg_len); - dispatchOutgoingMessage(emulContext->outBufs, ctx->clientId, out, msg_len); + dispatchOutgoingMessage(emulContext->outMsgQueue, ctx->clientId, out, msg_len); } } diff --git a/src/proto/handlers/mem.c b/src/proto/handlers/mem.c index 24e5e0c..acce392 100644 --- a/src/proto/handlers/mem.c +++ b/src/proto/handlers/mem.c @@ -9,7 +9,52 @@ #include "pub/libhmmmm/mem.h" // #define DEVICE_MEM_SIZE ((size_t)(256 * 1024)) +static void printMemory(void* cells, uint64_t cellsCount) +{ + for(uint64_t i = 0; i < cellsCount; i++) + { + printf("%lu: 0x%04X", i, ((uint8_t*)cells)[i]); + + // uint8_t wasRead = 0; + // for (uint8_t j = 0; j < mem->memreadLen; j++) + // { + // if(mem->memreadCellAddrs[j] == i) + // { + // wasRead = 1; + // break; + // } + // } + // uint8_t wasWrite = 0; + // for (uint8_t j = 0; j < mem->memwriteLen; j++) + // { + // if(mem->memwriteCellAddrs[j] == i) + // { + // wasWrite = 1; + // break; + // } + // } + + // if (wasRead == 1) + // { + // printf("\t[was read]"); + // } + // else + // { + // printf("\t[was not read]"); + // } + + // if (wasWrite == 1) + // { + // printf("\t[was written]"); + // } + // else + // { + // printf("\t[was not written]"); + // } + printf("\n"); + } +} void handleIncomingMemMessage( hmmmm_mem_MemClientMessage_table_t msg, @@ -20,9 +65,9 @@ void handleIncomingMemMessage( hmmmm_mem_MemClientPayload_union_type_t ptype = hmmmm_mem_MemClientMessage_payload_type(msg); - printf("[MEM] client=%lu nonce=%lu type=%s\n", - ctx->clientId, nonce, - hmmmm_mem_MemClientPayload_type_name(ptype)); + // printf("[MEM] client=%lu nonce=%lu type=%s\n", + // ctx->clientId, nonce, + // hmmmm_mem_MemClientPayload_type_name(ptype)); if (ptype == hmmmm_mem_MemClientPayload_MemReadRequest) { @@ -35,8 +80,8 @@ void handleIncomingMemMessage( uint32_t offset = hmmmm_mem_MemReadRequest_offset(req); uint32_t length = hmmmm_mem_MemReadRequest_length(req); - printf("[MEM/READ] device=%u seg=%u offset=%u len=%u\n", - dev_id, seg_id, offset, length); + // printf("[MEM/READ] device=%u seg=%u offset=%u len=%u\n", + // dev_id, seg_id, offset, length); if (dev_id >= (uint32_t)emulContext->devicesCount) { printf("[MEM/READ] invalid device %u\n", dev_id); @@ -49,18 +94,18 @@ void handleIncomingMemMessage( // printf("[MEM/READ] out of bounds\n"); // return; // } - printf("[MEM/READ] from %d/%d+%d:%d\n", dev_id, seg_id, offset, length); + // printf("[MEM/READ] from %d/%d+%d:%d\n", dev_id, seg_id, offset, length); device_handle_t* handl = emulContext->deviceHandles[dev_id]; const uint8_t* base = handl->ctx->deviceMem->cells[seg_id]; //emulContext->devicesMem[dev_id] + handl->ctx->deviceMem->memsegShifts[seg_id]; - for(size_t i = 0; i < length; i++) - { - printf("%02X ", (base + offset)[i]); - } - printf("\n"); + // for(size_t i = 0; i < length; i++) + // { + // printf("%02X ", (base + offset)[i]); + // } + // printf("\n"); size_t out_len; uint8_t* out = fb_build_mem_read_response( @@ -68,7 +113,7 @@ void handleIncomingMemMessage( dev_id, seg_id, offset, base + offset, (size_t)length, &out_len); - dispatchOutgoingMessage(emulContext->outBufs, ctx->clientId, out, out_len); + dispatchOutgoingMessage(emulContext->outMsgQueue, ctx->clientId, out, out_len); } else if (ptype == hmmmm_mem_MemClientPayload_MemWriteRequest) { @@ -82,13 +127,18 @@ void handleIncomingMemMessage( flatbuffers_uint8_vec_t data = hmmmm_mem_MemWriteRequest_data(req); size_t data_len = flatbuffers_uint8_vec_len(data); - printf("[MEM/WRITE] device=%u seg=%u offset=%u len=%zu\n", - dev_id, seg_id, offset, data_len); + // printf("[MEM/WRITE] device=%u seg=%u offset=%u len=%zu\n", + // dev_id, seg_id, offset, data_len); + if (dev_id >= (uint32_t)emulContext->devicesCount) { printf("[MEM/WRITE] invalid device %u\n", dev_id); return; } + if(!data) { + printf("[MEM/WRITE] invalid data\n"); + return; + } // if ((size_t)offset + data_len > DEVICE_MEM_SIZE) { // printf("[MEM/WRITE] out of bounds\n"); // return; @@ -98,16 +148,24 @@ void handleIncomingMemMessage( device_handle_t* handl = emulContext->deviceHandles[dev_id]; - printf("[MEM/WRITE] from %d/%d+%d:%lu\n", dev_id, seg_id, offset, data_len); + // printf("[MEM/WRITE] from %d/%d+%d:%lu\n", dev_id, seg_id, offset, data_len); uint8_t* base = handl->ctx->deviceMem->cells[seg_id]; // emulContext->devicesMem[dev_id] + handl->ctx->deviceMem->memsegShifts[seg_id]; memcpy(base + offset, data, data_len); + + + // for(size_t i = 0; i < data_len; i++) + // { + // printf("%02X ", (base + offset)[i]); + // } + // printf("\n"); + size_t out_len; uint8_t* out = fb_build_mem_read_response( nonce, *emulContext->clockCounter, dev_id, seg_id, offset, base + offset, data_len, &out_len); - dispatchOutgoingMessage(emulContext->outBufs, ctx->clientId, out, out_len); + dispatchOutgoingMessage(emulContext->outMsgQueue, ctx->clientId, out, out_len); } } diff --git a/src/proto/handlers/stream.c b/src/proto/handlers/stream.c index 6bc05a2..6574620 100644 --- a/src/proto/handlers/stream.c +++ b/src/proto/handlers/stream.c @@ -14,7 +14,7 @@ // ── Stream registration bookkeeping (unchanged logic) ──────────────────────── -void unregisterClientStream( +static void unregisterClientStream( EmulContext* emulContext, ClientContext* ctx, uint32_t regId) { for (size_t deviceId = 0; deviceId < emulContext->devicesCount; deviceId++) { @@ -127,7 +127,7 @@ void handleIncomingStreamMessage( size_t msg_len; uint8_t* out = fb_build_stream_reg_confirm( nonce, reg_id, dev_id, seg_id, offset, length, (uint8_t)mode, &msg_len); - dispatchOutgoingMessage(emulContext->outBufs, ctx->clientId, out, msg_len); + dispatchOutgoingMessage(emulContext->outMsgQueue, ctx->clientId, out, msg_len); } else if (ptype == hmmmm_stream_StreamClientPayload_StreamDeregRequest) { diff --git a/src/proto/handlers/ws.c b/src/proto/handlers/ws.c index 1f5292b..93f6333 100644 --- a/src/proto/handlers/ws.c +++ b/src/proto/handlers/ws.c @@ -1,4 +1,5 @@ #include "proto/handlers/ws.h" +#include "context.h" #include "proto/handlers.h" #include @@ -11,22 +12,38 @@ #include "proto_verifier.h" #include "proto_reader.h" +#include "ptQueue/inc/spsc.h" +#include "wsServer/include/ws.h" void onWsOpen(ws_cli_conn_t client) { char errbuf[1024]; - ptQueue* incomeQueue = ptQueueCreate(errbuf); - NULL_GUARD(incomeQueue, "Unable to create income queue: %s\n", errbuf); + queue_spsc_t *incomeQueue = malloc(sizeof(queue_spsc_t)); + // queue_spsc_t incomeQueue, outcomeQueue; + if(incomeQueue == NULL || !queue_spsc_init(incomeQueue, 65536)) + { + panic("Unable to create income queue"); + } + queue_spsc_t *outcomeQueue = malloc(sizeof(queue_spsc_t)); + if(outcomeQueue == NULL || !queue_spsc_init(outcomeQueue, 65536)) + { + panic("Unable to create outcome queue"); + } + + // = ptQueueCreate(errbuf); + // NULL_GUARD(incomeQueue, "Unable to create income queue: %s\n", errbuf); - ptQueue* outcomeQueue = ptQueueCreate(errbuf); - NULL_GUARD(outcomeQueue, "Unable to create outcome queue: %s\n", errbuf); + // queue_spsc_t* outcomeQueue = ptQueueCreate(errbuf); + // NULL_GUARD(outcomeQueue, "Unable to create outcome queue: %s\n", errbuf); ClientContext* cctx = malloc(sizeof(ClientContext)); if (cctx == NULL) { - ptQueueFree(incomeQueue); - ptQueueFree(outcomeQueue); + queue_spsc_destroy(incomeQueue); + queue_spsc_destroy(outcomeQueue); + // ptQueueFree(incomeQueue); + // ptQueueFree(outcomeQueue); panic("Unable to allocate client context\n"); } cctx->clientId = client; @@ -38,8 +55,10 @@ void onWsOpen(ws_cli_conn_t client) ClientRegistrationEvent* ev = malloc(sizeof(ClientRegistrationEvent)); if (ev == NULL) { - ptQueueFree(incomeQueue); - ptQueueFree(outcomeQueue); + queue_spsc_destroy(incomeQueue); + queue_spsc_destroy(outcomeQueue); + // ptQueueFree(incomeQueue); + // ptQueueFree(outcomeQueue); free(cctx); panic("Unable to allocate register event"); } @@ -52,23 +71,27 @@ void onWsOpen(ws_cli_conn_t client) with_lock(&ctx->registerMutex) { - int exitCode = ptQueuePush(ctx->regQueue, ev, errbuf); - if (exitCode) { - ptQueueFree(incomeQueue); - ptQueueFree(outcomeQueue); - free(cctx); - panic("Unable to push to reg queue: %s\n", errbuf); - } + // int exitCode = ptQueuePush(ctx->regQueue, ev, errbuf); + + queue_spsc_push(ctx->regQueue, ev); + // if (!queue_spsc_push(ctx->regQueue, ev)) { + // queue_spsc_destroy(incomeQueue); + // queue_spsc_destroy(outcomeQueue); + // // ptQueueFree(incomeQueue); + // // ptQueueFree(outcomeQueue); + // free(cctx); + // panic("Unable to push to reg queue: %s\n", errbuf); + // } } - char* cli = ws_getaddress(client); + const char* cli = ws_getaddress(client); printf("Connection %lu opened, addr: %s\n", client, cli); } void onWsClose(ws_cli_conn_t client) { - char* cli = ws_getaddress(client); + const char* cli = ws_getaddress(client); printf("Connection %lu closed, addr: %s\n", client, cli); char errbuf[1024]; @@ -91,11 +114,12 @@ void onWsClose(ws_cli_conn_t client) with_lock(&ctx->registerMutex) { - int exitCode = ptQueuePush(ctx->regQueue, ev, errbuf); - if (exitCode) - { - panic("Unable to push to reg queue: %s\n", errbuf); - } + queue_spsc_push(ctx->regQueue, ev); + // int exitCode = ptQueuePush(ctx->regQueue, ev, errbuf); + // if (!queue_spsc_push(ctx->regQueue, ev)) + // { + // panic("Unable to push to reg queue: %s\n", errbuf); + // } } } @@ -108,6 +132,7 @@ void onWsMessage( panic("Unable to get client context\n"); } + if (!cctx->isAuthed) { handle_auth(cctx, client, msg, size, type); return; @@ -138,13 +163,20 @@ void onWsMessage( panic("Unable to allocate FbMessage buffer\n"); } fbmsg->size = (size_t)size; + fbmsg->ctx = cctx; memcpy(fbmsg->data, msg, (size_t)size); char errbuf[1024]; - int isErr = ptQueuePush(cctx->incomeQ, fbmsg, errbuf); - if (isErr) { - free(fbmsg->data); - free(fbmsg); - panic("Unable to queue client message: %s\n", errbuf); - } + // int isErr = ptQueuePush(cctx->incomeQ, fbmsg, errbuf); + ServerContext* sctx = ws_get_server_context(client); + + // printf("got message with len %lu\n", size); + + queue_mpsc_push(sctx->inMsgQueue, fbmsg); + // if (!queue_spsc_push(cctx->incomeQ, fbmsg)) { + // if (!queue_mpsc_push(sctx->inMsgQueue, fbmsg)) { + // free(fbmsg->data); + // free(fbmsg); + // panic("Unable to queue client message: %s\n", errbuf); + // } }