From 165a96aa3c9b2cc4e72c02367a649addaea290c6 Mon Sep 17 00:00:00 2001 From: nikto_b Date: Fri, 10 Oct 2025 03:19:45 +0300 Subject: [PATCH] Impl basic ws binary proto with data and execution state manipulation --- Makefile | 34 +- hmmmm_scripts | 1 - inc/base_device.h | 21 - inc/client.h | 26 + inc/compose_device.h | 41 -- inc/config.h | 36 - inc/context.h | 46 ++ inc/events.h | 13 + inc/hmmmm.h | 33 - inc/libdevice.h | 37 - inc/libhmmmm.h | 14 - inc/libmem.h | 52 -- inc/linkedlist.h | 13 + inc/panic.h | 42 ++ inc/proto/dial.h | 11 + inc/proto/enums.h | 37 + inc/proto/handlers.h | 11 + inc/proto/handlers/auth.h | 11 + inc/proto/handlers/control.h | 10 + inc/proto/handlers/mem.h | 10 + inc/proto/handlers/stream.h | 12 + inc/proto/handlers/ws.h | 10 + inc/proto/msg.h | 29 + inc/proto/pack.h | 30 + inc/sized_ptr.h | 13 + inc/state.h | 20 + inc/streamed.h | 27 + src/base_device.c | 276 ------- src/compose_device.c | 529 -------------- src/config.c | 86 --- src/hmmmm.c | 238 ------ src/linkedlist.c | 49 ++ src/main.c | 1323 +++++++++++----------------------- src/panic.c | 21 + src/proto/dial.c | 46 ++ src/proto/handlers.c | 199 +++++ src/proto/handlers/auth.c | 143 ++++ src/proto/handlers/control.c | 199 +++++ src/proto/handlers/mem.c | 84 +++ src/proto/handlers/stream.c | 124 ++++ src/proto/handlers/ws.c | 156 ++++ src/proto/msg.c | 107 +++ src/proto/pack.c | 57 ++ src/state.c | 57 ++ 44 files changed, 2065 insertions(+), 2269 deletions(-) delete mode 160000 hmmmm_scripts delete mode 100644 inc/base_device.h create mode 100644 inc/client.h delete mode 100644 inc/compose_device.h delete mode 100644 inc/config.h create mode 100644 inc/context.h create mode 100644 inc/events.h delete mode 100644 inc/hmmmm.h delete mode 100644 inc/libdevice.h delete mode 100644 inc/libhmmmm.h delete mode 100644 inc/libmem.h create mode 100644 inc/linkedlist.h create mode 100644 inc/panic.h create mode 100644 inc/proto/dial.h create mode 100644 inc/proto/enums.h create mode 100644 inc/proto/handlers.h create mode 100644 inc/proto/handlers/auth.h create mode 100644 inc/proto/handlers/control.h create mode 100644 inc/proto/handlers/mem.h create mode 100644 inc/proto/handlers/stream.h create mode 100644 inc/proto/handlers/ws.h create mode 100644 inc/proto/msg.h create mode 100644 inc/proto/pack.h create mode 100644 inc/sized_ptr.h create mode 100644 inc/state.h create mode 100644 inc/streamed.h delete mode 100644 src/base_device.c delete mode 100644 src/compose_device.c delete mode 100644 src/config.c delete mode 100644 src/hmmmm.c create mode 100644 src/linkedlist.c create mode 100644 src/panic.c create mode 100644 src/proto/dial.c create mode 100644 src/proto/handlers.c create mode 100644 src/proto/handlers/auth.c create mode 100644 src/proto/handlers/control.c create mode 100644 src/proto/handlers/mem.c create mode 100644 src/proto/handlers/stream.c create mode 100644 src/proto/handlers/ws.c create mode 100644 src/proto/msg.c create mode 100644 src/proto/pack.c create mode 100644 src/state.c diff --git a/Makefile b/Makefile index f568845..dc82aca 100644 --- a/Makefile +++ b/Makefile @@ -1,10 +1,12 @@ +OPENSSL_INCLUDE ?= /usr/include/openssl/ BUILD_DIR=out SRC_DIR=src INC_DIR=inc CC=gcc OBJDUMP=objdump LIBS=deps/tomlc99/libtoml.a deps/ptQueue/out/ptQueue.a deps/wsServer/libws.a -LIBS_HEADERS=deps/ +LIBS_HEADERS=deps/ $(OPENSSL_INCLUDE) +STATIC_LIBS=crypto STANDART=c23 OPTIMIZE=-Og TARGET=main @@ -12,38 +14,41 @@ TARGET=main DEFINES=#-DOPCODE_WORDSIZE=2 -DMEM_CELL_WORDS=1 -DPC_WORDSIZE=2 -DGP_REG_CELL_WORDS=1 -DIO_REG_CELL_WORDS=1 -C_SOURCES=$(wildcard $(SRC_DIR)/*.c) -# C_SOURCES=$(wildcard $(SRC_DIR)/*.c) -C_HEADERS=$(wildcard $(INC_DIR)/*.h) +C_SOURCES := $(shell find $(SRC_DIR) -type f -name '*.c') C_INCLUDES=-I$(INC_DIR)/ $(addprefix -I,$(LIBS_HEADERS)) C_DEFS=-D_POSIX_C_SOURCE=199309L -DISABLE_FLAGS=-Wno-unused-variable -Wno-unused-parameter -Wno-write-strings -Wno-pointer-arith +DISABLE_FLAGS=-Wno-unused-variable -Wno-unused-parameter -Wno-write-strings -Wno-pointer-arith -Wno-analyzer-infinite-loop PEDANTIC_FLAGS=-pedantic -pedantic-errors $(DISABLE_FLAGS) -Wall -Wcast-align -Wcast-qual -Wconversion -Wduplicated-branches -Wduplicated-cond -Werror -Wextra -Wfloat-equal -Wlogical-op -Wpedantic -Wredundant-decls -Wsign-conversion ANALYZER_FLAGS=-fanalyzer -fdiagnostics-show-option -fdiagnostics-color=always LSECTIONS=-ffunction-sections -fdata-sections -Wl,--gc-sections -CFLAGS=$(C_DEFS) -g $(C_INCLUDES) $(DEFINES) $(OPTIMIZE) --std=$(STANDART) $(PEDANTIC_FLAGS) $(ANALYZER_FLAGS) -LFLAGS=$(OPTIMIZE) -g $(PEDANTIC_FLAGS) $(DEFINES) -flto -fuse-linker-plugin $(LSECTIONS) -lm +CFLAGS=$(C_DEFS) -g $(C_INCLUDES) $(DEFINES) $(OPTIMIZE) --std=$(STANDART) $(PEDANTIC_FLAGS) $(ANALYZER_FLAGS) -MMD -MP +STATICLIBS_FLAGS=$(addprefix -l,$(STATIC_LIBS)) +LFLAGS=$(OPTIMIZE) -g $(PEDANTIC_FLAGS) $(DEFINES) $(STATICLIBS_FLAGS) -flto -fuse-linker-plugin $(LSECTIONS) -lm -OBJECTS = $(addprefix $(BUILD_DIR)/,$(notdir $(C_SOURCES:.c=.o))) $(LIBS) +OBJECTS = $(patsubst $(SRC_DIR)/%.c, $(BUILD_DIR)/%.o, $(C_SOURCES)) + +DEP = $(filter %.d, $(OBJECTS:.o=.d)) +$(info $(DEP)) + +OBJECTS += $(LIBS) vpath %.c $(sort $(dir $(C_SOURCES))) -vpath %.h $(sort $(dir $(C_HEADERS))) - -# $(info SOURCES=$(C_SOURCES)) -# $(info HEADERS=$(C_HEADERS)) -# $(info OBJECTS=$(OBJECTS)) all: build -build: date deps Dir $(C_HEADERS) target compile_commands +build: date deps Dir target compile_commands rebuild: clean | build +$(info $(DEP)) + +-include $(DEP) + $(BUILD_DIR)/%.o: %.c Makefile | $(BUILD_DIR) @echo -e '\033[1;32mCC\t'$<'\t->\t'$@'\033[0m' @$(CC) -c $(CFLAGS) -Wa,-a,-ad,-alms=$(BUILD_DIR)/$(notdir $(<:.c=.lst)) $< -o $@ @@ -62,6 +67,7 @@ deps: BuildDir: @mkdir -p $(BUILD_DIR) + $(shell mkdir -p $(dir $(OBJECTS))) SrcDir: @mkdir -p $(SRC_DIR) diff --git a/hmmmm_scripts b/hmmmm_scripts deleted file mode 160000 index 9cc7b36..0000000 --- a/hmmmm_scripts +++ /dev/null @@ -1 +0,0 @@ -Subproject commit 9cc7b36cbbfbd6324874069add330fcd60839f54 diff --git a/inc/base_device.h b/inc/base_device.h deleted file mode 100644 index f861185..0000000 --- a/inc/base_device.h +++ /dev/null @@ -1,21 +0,0 @@ -#ifndef __BASE_DEVICE_H__ -#define __BASE_DEVICE_H__ - -#include "hmmmm.h" - -typedef struct { - void* specs; - device_lib_t* lib; - device_public_context_t* ctx; - uint64_t clockCycleLimit; - uint64_t clockCycleCounter; -} device_handle_t; - - -device_handle_t* openBaseDeviceFromConfig(const char* configPath, char* errbuf); -device_handle_t* openBaseDevice(conf_dev_t* devConf, char* errbuf); -conf_dev_t* openBaseDeviceConfig(const char* configPath, char* errbuf); -void closeBaseDevice(device_handle_t* devHandle); - - -#endif // ifndef __BASE_DEVICE_H__ diff --git a/inc/client.h b/inc/client.h new file mode 100644 index 0000000..9118294 --- /dev/null +++ b/inc/client.h @@ -0,0 +1,26 @@ +#ifndef __CLIENT_H__ +#define __CLIENT_H__ + +#include +#include + +#include "sized_ptr.h" + +#include "wsServer/include/ws.h" +#include "ptQueue/inc/ptQueue.h" + +typedef struct { + ws_cli_conn_t clientId; + uint64_t seatId; + uint8_t isAuthed; + ptQueue* incomeQ; + // ptQueue* outcomeQ; + uint64_t connectedAt; + uint32_t streamRegIterator; + uint64_t orphanedAt; + uint64_t orphanedDeadTimeout; + SizedPtr* fallbackOutcomeQ; + size_t fallbackOutcomeQPadding; +} ClientContext; + +#endif //ifndef __CLIENT_H__ \ No newline at end of file diff --git a/inc/compose_device.h b/inc/compose_device.h deleted file mode 100644 index 3223d6d..0000000 --- a/inc/compose_device.h +++ /dev/null @@ -1,41 +0,0 @@ -#ifndef __COMPOSE_DEVICE_H__ -#define __COMPOSE_DEVICE_H__ -#include "base_device.h" - -typedef struct { - uint64_t limiter; -} clock_conf_t; - -typedef struct { - char** baseAt; - char* baseSeg; - char** target; - uint64_t projectionShift; - -} projection_conf_t; - -typedef struct { - - char** baseAt; - char** pointAt; - uint64_t addr; - char* seg; - -} intercept_conf_t; - - -typedef struct { - clock_conf_t clockConf; - conf_dev_t** baseConfigs; - projection_conf_t** projections; - intercept_conf_t** intercepts; -} compose_dev_conf_t; - -typedef struct { - device_handle_t** devHandlers; -} compose_dev_handle_t; - -compose_dev_conf_t* openComposeDeviceConfig(const char* configPath, char* errbuf); -device_handle_t** openComposeDevice(compose_dev_conf_t* conf, char* errbuf); - -#endif // ifndef __COMPOSE_DEVICE_H__ \ No newline at end of file diff --git a/inc/config.h b/inc/config.h deleted file mode 100644 index e08df2f..0000000 --- a/inc/config.h +++ /dev/null @@ -1,36 +0,0 @@ -#ifndef __HMMMM_CONFIG_H__ -#define __HMMMM_CONFIG_H__ -#include -#include - - -typedef struct { - char* name; - size_t start; - size_t len; - uint8_t wordLen; - uint8_t isExecutable; -} conf_mem_seg_t; - -typedef struct { - conf_mem_seg_t** memSegConfs; -} conf_mem_t; - -typedef struct { - char** id; - char** clockId; - uint64_t clockDivider; - uint64_t clockMultipler; - conf_mem_t* memConf; - char* libPath; -} conf_dev_t; - - - -void freeMemSegConf(conf_mem_seg_t* memSegConf); -void freeMemConf(conf_mem_t* memConf); -void freeConf(conf_dev_t* conf); -void freeComposeId(char** id); -uint8_t compareComposeId(char** idA, char** idB); - -#endif // ifndef __HMMMM_CONFIG_H__ diff --git a/inc/context.h b/inc/context.h new file mode 100644 index 0000000..3e0373b --- /dev/null +++ b/inc/context.h @@ -0,0 +1,46 @@ +#ifndef __CONTEXT_H__ +#define __CONTEXT_H__ + +#include + +#include "ptQueue/inc/ptQueue.h" +#include "wsServer/include/ws.h" + +#include "linkedlist.h" +#include "sized_ptr.h" + +#include "streamed.h" + +typedef struct { + SizedPtr* bufs; + uint8_t buffersCount; + _Atomic (uint8_t) readRequestIdx; + _Atomic (uint8_t) currWritingIdx; +} OutgoingBuffers; + + + + +typedef struct { + uint8_t* emulState; + uint64_t* clockCounter; + LinkedListEntry** clientsHead; + uint8_t* utilizedFlag; + OutgoingBuffers* outBufs; + DeviceSegStreamReg** deviceStreamRegs; + uint8_t** devicesMem; + size_t devicesCount; +} EmulContext; + + +typedef struct { + pthread_mutex_t registerMutex; + ptQueue* regQueue; + uint8_t* accessToken; + EmulContext* emulContext; + _Atomic (uint64_t) seatCounter; +} ServerContext; + + + +#endif //ifndef __CONTEXT_H__ \ No newline at end of file diff --git a/inc/events.h b/inc/events.h new file mode 100644 index 0000000..390b47d --- /dev/null +++ b/inc/events.h @@ -0,0 +1,13 @@ +#ifndef __EVENTS_H__ +#define __EVENTS_H__ + +#include +#include "context.h" + +typedef struct { + uint8_t regType; + ClientContext* ctx; +} ClientRegistrationEvent; + + +#endif //ifndef __EVENTS_H__ \ No newline at end of file diff --git a/inc/hmmmm.h b/inc/hmmmm.h deleted file mode 100644 index 03a2864..0000000 --- a/inc/hmmmm.h +++ /dev/null @@ -1,33 +0,0 @@ -#ifndef __HMMMMM__ -#define __HMMMMM__ -#include "libhmmmm.h" -#include "config.h" - -typedef struct { - void* (*extractPcounterPtr)(device_public_context_t* devContext); - - size_t (*extractPcounter)(device_public_context_t* devContext); - size_t (*extractOpcode)(device_mem_t* devMem, size_t _programCounter); - uint8_t (*extractPcounterSizeWords)(); - - -} instruction_simul_handlers_t; - -typedef struct { - uint8_t devType; - device_public_context_t* (*init)(void*, char* errbuf); - uint8_t (*makeDeviceTick)(device_public_context_t* devInfo); - void* extendedHandlers; - device_public_context_t* devContext; - void* _dlhandl; - void* (*parseSpecsFromConfig)(const conf_dev_t* devConf, char* errbuf); - void (*freeSpecs)(void* specs); - void (*freeDevMem)(device_mem_t* mem); - void (*fillSmartReadSpecs)(void* specs, smart_read_spec_t* smartReadSpecs, uint64_t smartReadSpecsCount); - void (*fillSmartWriteSpecs)(void* specs, smart_write_spec_t* smartWriteSpecs, uint64_t smartWriteSpecsCount); -} device_lib_t; - -device_lib_t* loadDeviceLib(const char *libpath, char* errbuf); - - -#endif // ifndef __HMMMMM__ \ No newline at end of file diff --git a/inc/libdevice.h b/inc/libdevice.h deleted file mode 100644 index 93c5ca9..0000000 --- a/inc/libdevice.h +++ /dev/null @@ -1,37 +0,0 @@ -#ifndef __LIBDEVICE_H__ -#define __LIBDEVICE_H__ - -#include -#include "libmem.h" - -#define SMART_ADDR_TYPE_GLOBAL 1 -#define SMART_ADDR_TYPE_SEGMENTED 2 -typedef struct -{ - uint64_t addr; - uint8_t segno; - uint64_t localAddr; - uint8_t addrType; - mem_h_read_func* handler; - uint64_t ident; -} smart_read_spec_t; - -typedef struct -{ - uint64_t addr; - uint8_t segno; - uint64_t localAddr; - uint8_t addrType; - mem_h_write_func handler; - uint64_t ident; -} smart_write_spec_t; - -typedef struct { - device_mem_t* deviceMem; - void* deviceInfo; -} device_public_context_t; - - - -#endif // ifndef __LIBDEVICE_H__ - diff --git a/inc/libhmmmm.h b/inc/libhmmmm.h deleted file mode 100644 index 4fc30a7..0000000 --- a/inc/libhmmmm.h +++ /dev/null @@ -1,14 +0,0 @@ -#ifndef __LIB_HMMMMM__ -#define __LIB_HMMMMM__ - -#include "libmem.h" -#include "libdevice.h" - -#define EXTENDED_DEVICE_TYPE_DUMMY 0 -#define EXTENDED_DEVICE_TYPE_INSTR_SIMUL 1 - - - - - -#endif // ifndef __LIB_HMMMMM__ \ No newline at end of file diff --git a/inc/libmem.h b/inc/libmem.h deleted file mode 100644 index 72f7965..0000000 --- a/inc/libmem.h +++ /dev/null @@ -1,52 +0,0 @@ -#ifndef __LIBMEM_H__ -#define __LIBMEM_H__ - -#include -#include - -#define GET_BIT(n, b) ((n >> b) & 1) - -// Internal mem handlers -typedef void* (*mem_h_read_func)(uint64_t ident, uint64_t addr, void* rawCells); -typedef void (*mem_h_write_func)(uint64_t ident, uint64_t addr, void* rawCells, void* data); - -typedef struct { - mem_h_read_func func; - uint64_t ident; -} mem_h_read_handler; -typedef struct { - mem_h_write_func func; - uint64_t ident; -} mem_h_write_handler; - -typedef struct -{ - void* rawCells; - void** cells; - uint64_t smartAddrReadMask; - uint64_t smartAddrWriteMask; - mem_h_read_handler* smartAddrReadHandlers; - mem_h_write_handler* smartAddrWriteHandlers; - char** memsegNames; - uint64_t* memsegShifts; - uint64_t* memreadCellAddrs; - uint64_t* memwriteCellAddrs; - uint8_t memreadLen; - uint8_t memwriteLen; -} device_mem_t; - - -typedef struct -{ - uint64_t start; - uint64_t len; - uint8_t wordLen; - char* name; -} memseg_spec_t; - - -// External handlers -typedef void* (*ext_h_read_func)(uint64_t addr, void* rawCells, void* devContext); -typedef void (*ext_h_write_func)(uint64_t addr, void* rawCells, void* data, void* devContext); - -#endif // ifndef __LIBMEM_H__ diff --git a/inc/linkedlist.h b/inc/linkedlist.h new file mode 100644 index 0000000..b2797c2 --- /dev/null +++ b/inc/linkedlist.h @@ -0,0 +1,13 @@ +#ifndef __LINKEDLIST_H__ +#define __LINKEDLIST_H__ + + +typedef struct LinkedListEntry { + struct LinkedListEntry* prevEntry; + struct LinkedListEntry* nextEntry; + void* payload; +} LinkedListEntry; + +void removeLinkedListEntry(LinkedListEntry** head, LinkedListEntry* entry); + +#endif //ifndef __LINKEDLIST_H__ diff --git a/inc/panic.h b/inc/panic.h new file mode 100644 index 0000000..8425f70 --- /dev/null +++ b/inc/panic.h @@ -0,0 +1,42 @@ +#ifndef __PANIC_H__ +#define __PANIC_H__ + +#include +#include +#include + +#ifndef PANIC_FD +#define PANIC_FD stderr +#endif //ifndef PANIC_FD + +#ifndef PANIC_STRACE_LEN +#define PANIC_STRACE_LEN 32 +#endif //ifndef PANIC_STRACE_LEN + +void print_stacktrace(FILE* fd); + +#define panic(...) \ + do { \ + fprintf(PANIC_FD, "Panic at %s:%d\n", __FILE__, __LINE__); \ + fprintf(PANIC_FD, "Panic message:\n\t" __VA_ARGS__); \ + print_stacktrace(PANIC_FD); \ + abort(); \ + } while(0) + +#define NULL_GUARD(ptr, ...) \ + do { \ + if((ptr) == NULL) \ + { \ + if(sizeof(#__VA_ARGS__) > 1) \ + { \ + panic(__VA_ARGS__); \ + } \ + else \ + { \ + panic("NULL detected in %s", #ptr); \ + } \ + } \ + } while(0) + + +#endif //ifndef __PANIC_H__ diff --git a/inc/proto/dial.h b/inc/proto/dial.h new file mode 100644 index 0000000..9a0f66d --- /dev/null +++ b/inc/proto/dial.h @@ -0,0 +1,11 @@ +#ifndef __PROTO_DIAL_H__ +#define __PROTO_DIAL_H__ + +#include + +#include "context.h" + +void broadcastClients(EmulContext* emulContext, uint8_t* msg, size_t msgLen); +void dispatchOutgoingMessage(OutgoingBuffers* outBufs, ClientContext* client, uint8_t* msg, size_t msgLen); + +#endif \ No newline at end of file diff --git a/inc/proto/enums.h b/inc/proto/enums.h new file mode 100644 index 0000000..41e9bcd --- /dev/null +++ b/inc/proto/enums.h @@ -0,0 +1,37 @@ +#ifndef __PROTO_ENUMS_H__ +#define __PROTO_ENUMS_H__ + + +#define REG_EVTYPE_CONNECT 1 +#define REG_EVTYPE_AUTH 2 +#define REG_EVTYPE_CLOSE 3 + + + + +#define PACKET_TYPE_CTRL 0b0001 +#define PACKET_TYPE_STREAM 0b0010 +#define PACKET_TYPE_MEM 0b0011 + + +#define CTRL_TYPE_EXEC 0b0001 +#define CTRL_TYPE_NOTIF_STATE 0b0010 +#define CTRL_TYPE_LIST_ORPHANED 0b1001 +#define CTRL_TYPE_LOAD_FAILED 0b1010 +#define CTRL_TYPE_SETUP_CONNECTION 0b1011 + +#define NOTIF_TYPE_EXEC 0b0000 + +#define STREAM_TYPE_REG_REQUEST 0b001 +#define STREAM_TYPE_REG_DISCARD 0b011 +#define STREAM_TYPE_REG_CONFIRM 0b101 +#define STREAM_TYPE_SEND 0b000 + +#define MEM_TYPE_READ_REQ 0b0000 +#define MEM_TYPE_READ_RESP 0b0001 +#define MEM_TYPE_WRITE_PUSH 0b0010 + + + + +#endif //ifndef __PROTO_ENUMS_H__ diff --git a/inc/proto/handlers.h b/inc/proto/handlers.h new file mode 100644 index 0000000..61888ee --- /dev/null +++ b/inc/proto/handlers.h @@ -0,0 +1,11 @@ +#ifndef __PROTO_HANDLERS_H__ +#define __PROTO_HANDLERS_H__ + +#include "events.h" + + +void handleRegEvent(EmulContext* emulContext, ClientRegistrationEvent* ev); +void handleAllClients(EmulContext* emulContext); + + +#endif //ifndef __PROTO_HANDLERS_H__ \ No newline at end of file diff --git a/inc/proto/handlers/auth.h b/inc/proto/handlers/auth.h new file mode 100644 index 0000000..81b0b35 --- /dev/null +++ b/inc/proto/handlers/auth.h @@ -0,0 +1,11 @@ +#ifndef __PROTO_HANDLERS_AUTH_H__ +#define __PROTO_HANDLERS_AUTH_H__ + +#include +#include "context.h" + +uint8_t handle_auth(ClientContext* cctx, ws_cli_conn_t client, const uint8_t* msg, uint64_t msgSize, int msgType); +LinkedListEntry* disconnectDueTimeout(EmulContext* emulContext, LinkedListEntry* clientEntry); +void handleOnClientAuthDone(ClientContext* ctx, EmulContext* emulContext); + +#endif //ifndef __PROTO_HANDLERS_AUTH_H__ diff --git a/inc/proto/handlers/control.h b/inc/proto/handlers/control.h new file mode 100644 index 0000000..393a3a7 --- /dev/null +++ b/inc/proto/handlers/control.h @@ -0,0 +1,10 @@ +#ifndef __PROTO_HANDLERS_CONTROL_H__ +#define __PROTO_HANDLERS_CONTROL_H__ + +#include "proto/msg.h" +#include "proto/dial.h" + +void handleIncomingControlMessage(BaseMessage* msg, ClientContext* ctx, EmulContext* emulContext); + + +#endif //ifndef __PROTO_HANDLERS_CONTROL_H__ \ No newline at end of file diff --git a/inc/proto/handlers/mem.h b/inc/proto/handlers/mem.h new file mode 100644 index 0000000..b913a89 --- /dev/null +++ b/inc/proto/handlers/mem.h @@ -0,0 +1,10 @@ +#ifndef __PROTO_HANDLERS_MEM_H__ +#define __PROTO_HANDLERS_MEM_H__ + +#include "proto/msg.h" +#include "proto/dial.h" + +void handleIncomingMemMessage(BaseMessage* msg, ClientContext* ctx, EmulContext* emulContext); + +#endif //ifndef __PROTO_HANDLERS_MEM_H__ + diff --git a/inc/proto/handlers/stream.h b/inc/proto/handlers/stream.h new file mode 100644 index 0000000..7073dfd --- /dev/null +++ b/inc/proto/handlers/stream.h @@ -0,0 +1,12 @@ +#ifndef __PROTO_HANDLERS_STREAM_H__ +#define __PROTO_HANDLERS_STREAM_H__ + +#include "proto/msg.h" +#include "proto/dial.h" + + +void unregisterClientStream(EmulContext* emulContext, ClientContext* ctx, uint32_t regId); +void unregisterClientStreams(EmulContext* emulContext, ClientContext* ctx); +void handleIncomingStreamMessage(BaseMessage* msg, ClientContext* ctx, EmulContext* emulContext); + +#endif //ifndef __PROTO_HANDLERS_STREAM_H__ \ No newline at end of file diff --git a/inc/proto/handlers/ws.h b/inc/proto/handlers/ws.h new file mode 100644 index 0000000..5bea2aa --- /dev/null +++ b/inc/proto/handlers/ws.h @@ -0,0 +1,10 @@ +#ifndef __PROTO_HANDLERS_WS_H__ +#define __PROTO_HANDLERS_WS_H__ + +#include "wsServer/include/ws.h" + +void onWsMessage(ws_cli_conn_t client, const unsigned char *msg, uint64_t size, int type); +void onWsClose(ws_cli_conn_t client); +void onWsOpen(ws_cli_conn_t client); + +#endif //ifndef __PROTO_HANDLERS_WS_H__ diff --git a/inc/proto/msg.h b/inc/proto/msg.h new file mode 100644 index 0000000..3297621 --- /dev/null +++ b/inc/proto/msg.h @@ -0,0 +1,29 @@ +#ifndef __PROTO_MSG_H__ +#define __PROTO_MSG_H__ + +#include "wsServer/include/ws.h" +#include +#include "client.h" + +typedef struct { + ClientContext* client; + uint8_t* msg; + size_t msgLen; +} OutgoingMessage; + +typedef struct { + uint64_t nonce; + uint8_t packetType; + uint8_t payloadHeader; + const void* payload; + size_t payloadLen; +} BaseMessage; + +BaseMessage* parseMessage(const uint8_t* bytes, size_t size); +void fillHead(uint64_t nonce, uint8_t packetType, uint8_t payloadHeader, uint8_t* outmsg); +uint8_t* createControlNotifyMessage(uint64_t nonce, uint64_t clockCounter, uint8_t newEmulState, size_t* lenOut); +uint8_t* createDoneRegMessage(uint64_t nonce, uint8_t X, uint64_t devId, uint64_t segId, uint64_t startAddr, uint64_t segLength, uint32_t regId, size_t* lenOut); +uint8_t* createStreamSegmentPush(uint8_t mode, uint32_t regId, uint64_t clockCounter, uint8_t* payload, size_t payloadLen, size_t* lenOut); +uint8_t* createClientSetup(uint64_t nonce, ClientContext* ctx, size_t* lenOut); + +#endif //ifndef __PROTO_MSG_H__ \ No newline at end of file diff --git a/inc/proto/pack.h b/inc/proto/pack.h new file mode 100644 index 0000000..ca6aeed --- /dev/null +++ b/inc/proto/pack.h @@ -0,0 +1,30 @@ +#ifndef __PROTO_PACK_H__ +#define __PROTO_PACK_H__ + +#include + + + +uint64_t decodeBytesToU64(const uint8_t* bytes); +uint32_t decodeBytesToU32(const uint8_t* bytes); +uint16_t decodeBytesToU16(const uint8_t* bytes); + + + +void encodeUint8ToBytes(uint8_t num, uint8_t* tgt); +void encodeUint16ToBytes(uint16_t num, uint8_t* tgt); +void encodeUint32ToBytes(uint32_t num, uint8_t* tgt); +void encodeUint64ToBytes(uint64_t num, uint8_t* tgt); + + + + + +#define encodeUintToBytes(num, tgt) _Generic((num), \ + uint8_t: encodeUint8ToBytes, \ + uint16_t: encodeUint16ToBytes, \ + uint32_t: encodeUint32ToBytes, \ + uint64_t: encodeUint64ToBytes \ + )(num, tgt) + +#endif //ifndef __PROTO_PACK_H__ \ No newline at end of file diff --git a/inc/sized_ptr.h b/inc/sized_ptr.h new file mode 100644 index 0000000..ed61199 --- /dev/null +++ b/inc/sized_ptr.h @@ -0,0 +1,13 @@ +#ifndef __SIZED_PTR_H__ +#define __SIZED_PTR_H__ + +#include + +typedef struct { + void* ptr; + size_t size; + size_t allocatedSize; +} SizedPtr; + + +#endif //ifndef __SIZED_PTR_H__ diff --git a/inc/state.h b/inc/state.h new file mode 100644 index 0000000..da03a5c --- /dev/null +++ b/inc/state.h @@ -0,0 +1,20 @@ +#ifndef __EMUL_STATE_H__ +#define __EMUL_STATE_H__ + +#include + +#define EMUL_STATE_STILL 0 +#define EMUL_STATE_EXEC 1 +#define EMUL_STATE_PAUSE 2 +#define EMUL_STATE_STOP 3 + +#define EMUL_STATE_OP_START 1 +#define EMUL_STATE_OP_PAUSE 2 +#define EMUL_STATE_OP_RESUME 3 +#define EMUL_STATE_OP_RESET 4 +#define EMUL_STATE_OP_STOP 5 + + +uint8_t switchNewEmulState(const uint8_t currentState, const uint8_t controlOp); + +#endif //ifndef __EMUL_STATE_H__ \ No newline at end of file diff --git a/inc/streamed.h b/inc/streamed.h new file mode 100644 index 0000000..3949f7b --- /dev/null +++ b/inc/streamed.h @@ -0,0 +1,27 @@ +#ifndef __STREAMED_H__ +#define __STREAMED_H__ + +#include +#include +#include "client.h" + +#define STREAM_MODE_READ 0 +#define STREAM_MODE_WRITE 1 + + +typedef struct { + ClientContext* clientContext; + uint32_t regId; + uint64_t startAddr; + uint64_t segLen; + uint8_t mode; +} StreamReg; + +typedef struct { + StreamReg* regs; + size_t regCount; + size_t allocatedSize; +} DeviceSegStreamReg; + + +#endif //ifndef __STREAMED_H__ diff --git a/src/base_device.c b/src/base_device.c deleted file mode 100644 index 98eb981..0000000 --- a/src/base_device.c +++ /dev/null @@ -1,276 +0,0 @@ -#include -#include -#include - -#include "tomlc99/toml.h" - -#include "base_device.h" - - - - -device_handle_t* openBaseDeviceFromConfig(const char* configPath, char* errbuf) -{ - char intErrbuf[1024]; - - conf_dev_t *devConf = openBaseDeviceConfig(configPath, intErrbuf); - if (devConf == NULL) - { - sprintf(errbuf, "unable to read device config: %s", intErrbuf); - return NULL; - } - - device_handle_t* ret = openBaseDevice(devConf, intErrbuf); - - if(ret == NULL) - { - freeConf(devConf); - free(devConf); - } - - return ret; -} - -void closeBaseDevice(device_handle_t* devHandle) -{ - if(devHandle->ctx != NULL) - { - if(devHandle->specs != NULL) - { - devHandle->lib->freeSpecs(devHandle->specs); - } - if(devHandle->ctx->deviceMem != NULL) - { - devHandle->lib->freeDevMem(devHandle->ctx->deviceMem); - } - free(devHandle->ctx); - } - if(devHandle->lib->extendedHandlers != NULL) - { - free(devHandle->lib->extendedHandlers); - } - dlclose(devHandle->lib->_dlhandl); - free(devHandle->lib); -} - -device_handle_t* openBaseDevice(conf_dev_t* devConf, char* errbuf) -{ - device_handle_t* ret = malloc(sizeof(device_handle_t)); - if (ret == NULL) - { - sprintf(errbuf, "unable to allocate device handle struct"); - return NULL; - } - - char intErrbuf[1024]; - - device_lib_t* devLib = loadDeviceLib(devConf->libPath, intErrbuf); - if (devLib == NULL) - { - sprintf(errbuf, "unable to load device library %s: %s", devConf->libPath, intErrbuf); - free(ret); - return NULL; - } - - void* devSpecs = devLib->parseSpecsFromConfig(devConf, intErrbuf); - - if (devSpecs == NULL) - { - sprintf(errbuf, "device config parse error: %s", intErrbuf); - dlclose(devLib->_dlhandl); - free(ret); - free(devLib); - return NULL; - } - - - ret->lib = devLib; - ret->specs = devSpecs; - ret->clockCycleCounter = 0; - ret->clockCycleLimit = 0; - - return ret; -} - - - -conf_mem_seg_t** parseMemToml(const toml_table_t* memTable) -{ - uint16_t memSegIdx = 0; - const char* memSegKey = toml_key_in(memTable, 0); - while (memSegKey && memSegIdx < 0xFFFF) - { - memSegIdx++; - memSegKey = toml_key_in(memTable, memSegIdx); - } - - conf_mem_seg_t** segConfigs = malloc(sizeof(conf_mem_seg_t*) * ((size_t)memSegIdx + 1)); - if (segConfigs == NULL) - { - return NULL; - } - segConfigs[memSegIdx] = NULL; - - for (size_t i = 0; i < memSegIdx; i++) - { - segConfigs[i] = (conf_mem_seg_t*)malloc(sizeof(conf_mem_seg_t)); - memSegKey = toml_key_in(memTable, (int)i); - - if (segConfigs[i] == NULL || memSegKey == NULL) - { - if (segConfigs[i] != NULL) - { - free(segConfigs[i]); - } - else - { - } - for(size_t j = 0; j < i; j++) - { - free(segConfigs[j]->name); - free(segConfigs[j]); - } - free(segConfigs); - return NULL; - } - segConfigs[i]->name = (char*)malloc(strlen(memSegKey) + 1); - if (segConfigs[i]->name == NULL) - { - for(size_t j = 0; j <= i; j++) - { - if (j < i) - free(segConfigs[j]->name); - free(segConfigs[j]); - } - free(segConfigs); - return NULL; - } - strcpy(segConfigs[i]->name, memSegKey); - - toml_table_t* segTable = toml_table_in(memTable, memSegKey); - if (segTable == NULL) - { - - for(size_t j = 0; j <= i; j++) - { - free(segConfigs[j]->name); - free(segConfigs[j]); - } - free(segConfigs); - return NULL; - } - - toml_datum_t startDatum = toml_int_in(segTable, "start"); - toml_datum_t lenDatum = toml_int_in(segTable, "len"); - toml_datum_t wordLenDatum = toml_int_in(segTable, "wordLen"); - toml_datum_t executableDatum = toml_bool_in(segTable, "executable"); - - if (!startDatum.ok || !lenDatum.ok || !wordLenDatum.ok) - { - for(size_t j = 0; j <= i; j++) - { - free(segConfigs[j]->name); - free(segConfigs[j]); - } - free(segConfigs); - return NULL; - } - - segConfigs[i]->start = (size_t)startDatum.u.i; - segConfigs[i]->len = (size_t)lenDatum.u.i; - segConfigs[i]->wordLen = (uint8_t)wordLenDatum.u.i; - segConfigs[i]->isExecutable = 0; - if (executableDatum.ok) - { - segConfigs[i]->isExecutable = executableDatum.u.b != 0; - } - } - - - - return segConfigs; -} - - -conf_dev_t* openBaseDeviceConfig(const char* configPath, char* errbuf) -{ - conf_dev_t* ret = malloc(sizeof(conf_dev_t)); - if (ret == NULL) - { - sprintf(errbuf, "unable to allocate device conf struct"); - return NULL; - } - - ret->id = NULL; - ret->clockDivider = 1; - ret->clockMultipler = 1; - ret->clockId = NULL; - - - ret->memConf = malloc(sizeof(conf_mem_t)); - if (ret->memConf == NULL) - { - sprintf(errbuf, "unable to allocate device memory conf struct"); - free(ret); - return NULL; - } - - char intErrbuf[1024]; - - - - FILE *fp = fopen (configPath, "rb"); - if (!fp) { - sprintf(errbuf, "unable to open config file %s", configPath); - free(ret->memConf); - free(ret); - return NULL; - } - toml_table_t* conf = toml_parse_file(fp, intErrbuf, sizeof(intErrbuf)); - fclose(fp); - - if (!conf) { - sprintf(errbuf, "cannot parse - %s", intErrbuf); - free(ret->memConf); - free(ret); - return NULL; - } - - toml_table_t* devBlock = toml_table_in(conf, "dev"); - if (!devBlock) { - sprintf(errbuf, "missing [dev]"); - free(ret->memConf); - free(ret); - return NULL; - } - - toml_table_t* memTable = toml_table_in(conf, "mem"); - if (!memTable) { - sprintf(errbuf, "missing [mem]"); - free(ret->memConf); - free(ret); - return NULL; - } - - toml_datum_t libpathBlock = toml_string_in(devBlock, "libpath"); - if (!libpathBlock.ok) { - sprintf(errbuf, "unable to read dev.libpath"); - free(ret->memConf); - free(ret); - return NULL; - } - - conf_mem_seg_t** memSegConfs = parseMemToml(memTable); - if (memSegConfs == NULL) - { - sprintf(errbuf, "unable to parse mem segments"); - free(ret->memConf); - free(ret); - return NULL; - } - - ret->memConf->memSegConfs = memSegConfs; - ret->libPath = libpathBlock.u.s; - - return ret; -} diff --git a/src/compose_device.c b/src/compose_device.c deleted file mode 100644 index d0fc3b4..0000000 --- a/src/compose_device.c +++ /dev/null @@ -1,529 +0,0 @@ -#include -#include -#include - -#include "tomlc99/toml.h" - -#include "base_device.h" -#include "compose_device.h" - - -// void fillDevClockConfig(const toml_table_t* clockTable, conf_dev_t** devConfs) -// { -// uint16_t clockSegIdx = 0; -// uint16_t clockSegSkips = 0; -// const char* clockSegKey = toml_key_in(clockTable, 0); -// while (clockSegKey && (clockSegIdx + clockSegSkips) < 0xFFFF) -// { -// clockSegKey = toml_key_in(clockTable, clockSegIdx + clockSegSkips + 1); -// if(strcmp(clockSegKey, "limiter") != 0) -// { -// clockSegIdx++; -// } -// else -// { -// clockSegSkips++; -// } -// } -// } - -char** appendId(char** prev, const char* cur, char* errbuf) -{ - if(prev == NULL) - { - prev = malloc(sizeof(char*) * 2); - if(prev == NULL) - { - sprintf(errbuf, "unable to allocate id"); - return NULL; - } - prev[0] = NULL; - prev[1] = NULL; - } - - size_t clockIdLen = 0; - while (prev[clockIdLen] != NULL){clockIdLen++;} - - clockIdLen++; - - char** new = realloc(prev, sizeof(char*) * (clockIdLen + 1)); - - if(new == NULL) - { - sprintf(errbuf, "unable to reallocate id"); - freeComposeId(prev); - return NULL; - } - prev = new; - - prev[clockIdLen] = NULL; - size_t idLen = strlen(cur); - prev[clockIdLen - 1] = malloc(sizeof(char) * (idLen + 1)); - if(prev[clockIdLen - 1] == NULL) - { - sprintf(errbuf, "unable to allocate new id entry"); - freeComposeId(prev); - return NULL; - } - strcpy(prev[clockIdLen - 1], cur); - prev[clockIdLen - 1][idLen] = '\0'; - return prev; -} - -conf_dev_t** parseDevToml(const toml_table_t* devTable, const toml_table_t* clockTable, char* errbuf) -{ - uint16_t devSegIdx = 0; - const char* devSegKey = toml_key_in(devTable, 0); - while (devSegKey && devSegIdx < 0xFFFF) - { - devSegIdx++; - devSegKey = toml_key_in(devTable, devSegIdx); - } - - conf_dev_t** deviceConfigs = malloc(sizeof(conf_dev_t*) * (devSegIdx + 1)); - if(deviceConfigs == NULL) - { - return NULL; - } - - deviceConfigs[devSegIdx] = NULL; - - for(size_t i = 0; i < devSegIdx; i++) - { - devSegKey = toml_key_in(devTable, (int)i); - if(devSegKey == NULL) - { - sprintf(errbuf, "unable to load device key %lu", i); - for(size_t j = 0; j < i; j++) - { - freeConf(deviceConfigs[j]); - free(deviceConfigs[j]); - } - free(deviceConfigs); - return NULL; - } - toml_datum_t devpathDatum = toml_string_in(devTable, devSegKey); - if(!devpathDatum.ok || !devpathDatum.u.s) - { - sprintf(errbuf, "unable to get device path for %s", devSegKey); - - for(size_t j = 0; j < i; j++) - { - freeConf(deviceConfigs[j]); - free(deviceConfigs[j]); - } - free(deviceConfigs); - return NULL; - } - char intErrbuf[1024]; - - conf_dev_t* conf = openBaseDeviceConfig(devpathDatum.u.s, intErrbuf); - if(conf == NULL) - { - sprintf(errbuf, "unable to load config for %s: %s", devSegKey, intErrbuf); - - for(size_t j = 0; j < i; j++) - { - freeConf(deviceConfigs[j]); - free(deviceConfigs[j]); - } - free(deviceConfigs); - return NULL; - } - - conf->id = NULL; - conf->id = appendId(conf->id, devSegKey, intErrbuf); - - - if(conf->id == NULL) - { - sprintf(errbuf, "unable to allocate device %s name field", devSegKey); - for(size_t j = 0; j <= i; j++) - { - freeConf(deviceConfigs[j]); - free(deviceConfigs[j]); - } - free(deviceConfigs); - return NULL; - } - - deviceConfigs[i] = conf; - } - - if(clockTable != NULL) - { - for(size_t i = 0; i < devSegIdx; i++) - { - conf_dev_t* conf = deviceConfigs[i]; - toml_table_t* devClockTable = toml_table_in(clockTable, conf->id[0]); - if(devClockTable == NULL) - { - continue; - } - - toml_datum_t dividerDatum = toml_int_in(devClockTable, "divider"); - if(dividerDatum.ok) - { - conf->clockDivider = (uint64_t)dividerDatum.u.i; - } - toml_datum_t multiplerDatum = toml_int_in(devClockTable, "multipler"); - if(multiplerDatum.ok) - { - conf->clockMultipler = (uint64_t)multiplerDatum.u.i; - } - - toml_datum_t srcDatum = toml_string_in(devClockTable, "src"); - if(srcDatum.ok && srcDatum.u.s != NULL) - { - char intErrbuf[1024] = {0}; - conf->clockId = appendId(conf->clockId, srcDatum.u.s, intErrbuf); - if(conf->clockId == NULL) - { - sprintf(errbuf, "unable to append clock id for %s: %s", conf->id[0], intErrbuf); - for(size_t j = 0; j <= i; j++) - { - freeConf(deviceConfigs[j]); - free(deviceConfigs[j]); - } - free(deviceConfigs); - return NULL; - } - } - - } - } - - return deviceConfigs; -} -void freeProjectionConfig(projection_conf_t* conf) -{ - if(conf == NULL) - { - return; - } - freeComposeId(conf->target); - freeComposeId(conf->baseAt); - if(conf->baseSeg != NULL) - { - free(conf->baseSeg); - } - conf->target = NULL; - conf->baseAt = NULL; - conf->baseSeg = NULL; - free(conf); -} -void freeProjectionConfigs(projection_conf_t** confs) -{ - if(confs == NULL) - { - return; - } - size_t i = 0; - while(confs[i] != NULL) - { - freeProjectionConfig(confs[i]); - confs[i] = NULL; - } - free(confs); -} - - -projection_conf_t* parseDeviceProjectionConf(const toml_table_t* deviceProjectionTable, char* errbuf) -{ - toml_datum_t baseAtDatum = toml_string_in(deviceProjectionTable, "base_at"); - if(!baseAtDatum.ok || baseAtDatum.u.s == NULL) - { - sprintf(errbuf, "missing 'base_at'"); - return NULL; - } - - toml_datum_t baseSegDatum = toml_string_in(deviceProjectionTable, "base_seg"); - if(!baseSegDatum.ok || baseSegDatum.u.s == NULL) - { - sprintf(errbuf, "missing 'base_seg'"); - return NULL; - } - - toml_datum_t projectionShiftDatum = toml_int_in(deviceProjectionTable, "projection_shift"); - - char intErrbuf[1024] = {0}; - - projection_conf_t* ret = malloc(sizeof(projection_conf_t)); - if(ret == NULL) - { - sprintf(errbuf, "unable to allocate projection conf"); - return NULL; - } - ret->baseAt = NULL; - ret->baseSeg = NULL; - ret->target = NULL; - ret->projectionShift = 0; - - if(projectionShiftDatum.ok) - { - ret->projectionShift = (uint64_t)projectionShiftDatum.u.i; - } - - ret->baseAt = appendId(ret->baseAt, baseAtDatum.u.s, intErrbuf); - if(ret->baseAt == NULL) - { - sprintf(errbuf, "unable to append base_at id"); - freeProjectionConfig(ret); - return NULL; - } - ret->baseSeg = malloc(strlen(baseSegDatum.u.s)); - if(ret->baseSeg == NULL) - { - sprintf(errbuf, "unable to fill base_seg"); - freeProjectionConfig(ret); - return NULL; - } - strcpy(ret->baseSeg, baseSegDatum.u.s); - - return ret; -} - -projection_conf_t** parseDeviceProjectionConfs(const toml_table_t* deviceProjectionsTable, char* errbuf) -{ - uint16_t projSegIdx = 0; - const char* projSegKey = toml_key_in(deviceProjectionsTable, 0); - while (projSegKey && projSegIdx < 0xFFFF) - { - projSegIdx++; - projSegKey = toml_key_in(deviceProjectionsTable, projSegIdx); - } - char intErrbuf[1024] = {0}; - - projection_conf_t** ret = malloc(sizeof(projection_conf_t*) * (projSegIdx + 1)); - if(ret == NULL) - { - sprintf(errbuf, "unable to allocate device projections confs"); - return NULL; - } - - for(size_t i = 0; i <= projSegIdx; i++){ret[i] = NULL;} - - for(size_t i = 0; i < projSegIdx; i++) - { - projSegKey = toml_key_in(deviceProjectionsTable, (int)i); - if(projSegKey == NULL) - { - sprintf(errbuf, "unable to load device projection key %lu", i); - freeProjectionConfigs(ret); - return NULL; - } - - toml_table_t* projTable = toml_table_in(deviceProjectionsTable, projSegKey); - if(projTable == NULL) - { - sprintf(errbuf, "unable to open device %s projection table", projSegKey); - freeProjectionConfigs(ret); - return NULL; - } - - ret[i] = parseDeviceProjectionConf(projTable, intErrbuf); - if(ret[i] == NULL) - { - sprintf(errbuf, "unable to load device %s projection configs: %s", projSegKey, intErrbuf); - freeProjectionConfigs(ret); - return NULL; - } - - ret[i]->target = appendId(ret[i]->target, projSegKey, intErrbuf); - if(ret[i]->target == NULL) - { - sprintf(errbuf, "unable to append projection target id: %s", intErrbuf); - freeProjectionConfigs(ret); - return NULL; - } - } - - return ret; -} - -projection_conf_t** parseProjectionConfs(const toml_table_t* projectionTable, char* errbuf) -{ - uint16_t devicesProjSegIdx = 0; - const char* devicesProjSegKey = toml_key_in(projectionTable, 0); - while (devicesProjSegKey && devicesProjSegIdx < 0xFFFF) - { - devicesProjSegIdx++; - devicesProjSegKey = toml_key_in(projectionTable, devicesProjSegIdx); - } - - - char intErrbuf[1024] = {0}; - - size_t totalRetSize = 0; - projection_conf_t** ret = malloc(sizeof(projection_conf_t*)); - if(ret == NULL) - { - sprintf(errbuf, "unable to allocate base projection conf array"); - return NULL; - } - ret[0] = NULL; - - for(size_t i = 0; i < devicesProjSegIdx; i++) - { - devicesProjSegKey = toml_key_in(projectionTable, (int)i); - if(devicesProjSegKey == NULL) - { - sprintf(errbuf, "unable to load proj key %lu", i); - freeProjectionConfigs(ret); - return NULL; - } - - toml_table_t* deviceProjTable = toml_table_in(projectionTable, devicesProjSegKey); - if(deviceProjTable == NULL) - { - sprintf(errbuf, "unable to open device projection table"); - freeProjectionConfigs(ret); - return NULL; - } - - projection_conf_t** devProjections = parseDeviceProjectionConfs(deviceProjTable, intErrbuf); - - if(devProjections == NULL) - { - sprintf(errbuf, "unable to parse device %s projection rules: %s", devicesProjSegKey, intErrbuf); - freeProjectionConfigs(ret); - return NULL; - } - size_t dlen = 0; - while(devProjections[dlen] != NULL) - { - devProjections[dlen]->target = appendId(devProjections[dlen]->target, devicesProjSegKey, intErrbuf); - if(devProjections[dlen]->target == NULL) - { - freeProjectionConfigs(ret); - freeProjectionConfigs(devProjections); - return NULL; - } - dlen++; - } - ret = realloc(ret, sizeof(projection_conf_t*) * (dlen + totalRetSize + 1)); - if(ret == NULL) - { - sprintf(errbuf, "unable to reallocate full projection array"); - freeProjectionConfigs(devProjections); - return NULL; - } - for(size_t i = 0; i <= dlen; i++) - { - ret[totalRetSize + i] = devProjections[i]; - } - totalRetSize += dlen; - ret[totalRetSize] = NULL; - free(devProjections); - } - - return ret; -} - -compose_dev_conf_t* openComposeDeviceConfig(const char* configPath, char* errbuf) -{ - compose_dev_conf_t* ret = malloc(sizeof(compose_dev_conf_t)); - - if(ret == NULL) - { - sprintf(errbuf, "unable to allocate device struct"); - return NULL; - } - - char intErrbuf[1024] = {0}; - FILE *fp = fopen (configPath, "rb"); - if (!fp) { - sprintf(errbuf, "unable to open config file %s", configPath); - free(ret); - return NULL; - } - toml_table_t* conf = toml_parse_file(fp, intErrbuf, sizeof(intErrbuf)); - fclose(fp); - - - toml_table_t* devTable = toml_table_in(conf, "dev"); - if (!devTable) { - sprintf(errbuf, "missing [dev]"); - free(ret); - return NULL; - } - ret->projections = NULL; - - toml_table_t* memTable = toml_table_in(conf, "mem"); - if(memTable) - { - toml_table_t* projectionTable = toml_table_in(memTable, "projection"); - if(projectionTable) - { - ret->projections = parseProjectionConfs(projectionTable, intErrbuf); - if(ret->projections == NULL) - { - sprintf(errbuf, "unable to load projections: %s", intErrbuf); - free(ret); - return NULL; - } - } - } - - - toml_table_t* clockTable = toml_table_in(conf, "clock"); - - - conf_dev_t** devHandlers = parseDevToml(devTable, clockTable, intErrbuf); - - if(devHandlers == NULL) - { - sprintf(errbuf, "unable to load devices configs: %s", intErrbuf); - freeProjectionConfigs(ret->projections); - free(ret); - return NULL; - } - - ret->baseConfigs = devHandlers; - - - return ret; -} - - -device_handle_t** openComposeDevice(compose_dev_conf_t* conf, char* errbuf) -{ - size_t devIdx = 0; - while(conf->baseConfigs[devIdx] != NULL){devIdx++;} - - device_handle_t** devHandlers = malloc(sizeof(device_handle_t*) * (devIdx + 1)); - - if(devHandlers == NULL) - { - sprintf(errbuf, "unable to allocate dev handlers"); - return NULL; - } - devHandlers[devIdx] = NULL; - - - for(size_t i = 0; i < devIdx; i++) - { - conf_dev_t* devConf = conf->baseConfigs[i]; - char intErrbuf[1024] = {0}; - device_handle_t* devHandle = openBaseDevice(devConf, intErrbuf); - if(devHandle == NULL) - { - sprintf(errbuf, "unable to open base device %s: %s", devConf->id[0], intErrbuf); - for(size_t j = 0; j < i; j++) - { - closeBaseDevice(devHandlers[j]); - free(devHandlers[j]); - } - free(devHandlers); - return NULL; - } - - devHandlers[i] = devHandle; - } - - return devHandlers; -} - - - - diff --git a/src/config.c b/src/config.c deleted file mode 100644 index 8794c55..0000000 --- a/src/config.c +++ /dev/null @@ -1,86 +0,0 @@ -#include -#include - -#include "config.h" - - -void xfree(void* p) -{ - if(p != NULL) - { - free(p); - } -} - -void freeComposeId(char** id) -{ - if(id == NULL) - { - return; - } - size_t i = 0; - while(id[i] != NULL) - { - free(id[i]); - i++; - } - free(id); -} - - -uint8_t compareComposeId(char** idA, char** idB) -{ - size_t i = 0; - while(idA[i] != NULL && idB[i] != NULL) - { - if(strcmp(idA[i], idB[i]) != 0) - { - return 0; - } - i++; - } - return idA[i] == NULL && idB[i] == NULL; -} - -void freeMemSegConf(conf_mem_seg_t* memSegConf) -{ - if(memSegConf == NULL) - { - return; - } - xfree(memSegConf->name); -} -void freeMemConf(conf_mem_t* memConf) -{ - if(memConf == NULL) - { - return; - } - for(size_t i = 0; i < 0xFF && memConf->memSegConfs[i] != NULL; i++) - { - freeMemSegConf(memConf->memSegConfs[i]); - } - xfree(memConf->memSegConfs); - free(memConf); -} -void freeConf(conf_dev_t* conf) -{ - if(conf == NULL) - { - return; - } - if(conf->clockId != NULL) - { - size_t i = 0; - while(conf->clockId[i] != NULL) - { - free(conf->clockId[i]); - i++; - } - free(conf->clockId); - } - freeComposeId(conf->id); - xfree(conf->libPath); - freeMemConf(conf->memConf); - free(conf); -} diff --git a/src/hmmmm.c b/src/hmmmm.c deleted file mode 100644 index 00f6d2a..0000000 --- a/src/hmmmm.c +++ /dev/null @@ -1,238 +0,0 @@ -#include "hmmmm.h" - - -#include -#include - -typedef size_t (*_dlib_pubExtractPcounter_t)(device_public_context_t* devContext); -typedef size_t (*_dlib_pubExtractOpcode_t)(device_mem_t* devMem, size_t _programCounter); - -typedef uint8_t(*_dlib_pubExtractPcounterSizeWords_t)(); - -typedef device_public_context_t* (*_dlib_dev_init_t)(void* specs, char* errbuf); - -typedef uint8_t (*_dlib_makeDeviceTick_t)(device_public_context_t* devInfo); - -typedef uint8_t (*_dlib_deviceType_t)(); - -typedef void* (*_dlib_pubExtractPcounterPtr_t)(device_public_context_t* devContext); - -typedef void* (*_dlib_parseSpecsFromConfig_t)(const conf_dev_t* devConf, char* errbuf); - -typedef void (*_dlib_freeSpecs_t)(void* specs); -typedef void (*_dlib_freeDevMem_t)(device_mem_t* mem); - -typedef void (*_dlib_fillSmartReadSpecs_t)(void* specs, smart_read_spec_t* smartReadSpecs, uint64_t smartReadSpecsCount); -typedef void (*_dlib_fillSmartWriteSpecs_t)(void* specs, smart_write_spec_t* smartWriteSpecs, uint64_t smartWriteSpecsCount); - -instruction_simul_handlers_t* _fillInstructionSimul(void* handle) -{ - instruction_simul_handlers_t* ret = malloc(sizeof(instruction_simul_handlers_t)); - - if (ret == NULL) - { - return NULL; - } - - _dlib_pubExtractPcounterPtr_t _dlib_pubExtractPcounterPtr = (_dlib_pubExtractPcounterPtr_t)(uintptr_t)dlsym(handle, "pubExtractPcounterPtr"); - - const char *dlib_pubExtractPcounterPtr_error = dlerror(); - if (dlib_pubExtractPcounterPtr_error) { - dlclose(handle); - free(ret); - return NULL; - } - - ret->extractPcounterPtr = _dlib_pubExtractPcounterPtr; - - - _dlib_pubExtractPcounter_t _dlib_pubExtractPcounter = (_dlib_pubExtractPcounter_t)(uintptr_t)dlsym(handle, "pubExtractPcounter"); - - const char *dlib_pubExtractPcounter_error = dlerror(); - if (dlib_pubExtractPcounter_error) { - dlclose(handle); - free(ret); - return NULL; - } - - ret->extractPcounter = _dlib_pubExtractPcounter; - - - _dlib_pubExtractOpcode_t _dlib_pubExtractOpcode = (_dlib_pubExtractOpcode_t)(uintptr_t)dlsym(handle, "pubExtractOpcode"); - - const char *dlib_pubExtractOpcode_error = dlerror(); - if (dlib_pubExtractOpcode_error) { - dlclose(handle); - free(ret); - return NULL; - } - - ret->extractOpcode = _dlib_pubExtractOpcode; - - _dlib_pubExtractPcounterSizeWords_t _dlib_pubExtractPcounterSizeWords = (_dlib_pubExtractPcounterSizeWords_t)(uintptr_t)dlsym(handle, "pubExtractPcounterSizeWords"); - - const char *dlib_pubExtractPcounterSizeWords_error = dlerror(); - if (dlib_pubExtractPcounterSizeWords_error) { - dlclose(handle); - free(ret); - return NULL; - } - - ret->extractPcounterSizeWords = _dlib_pubExtractPcounterSizeWords; - - - - return ret; -} - - -device_lib_t* loadDeviceLib(const char *libpath, char* errbuf) -{ - device_lib_t* dev = malloc(sizeof(device_lib_t)); - - if (dev == NULL) - { - sprintf(errbuf, "unable to allocate device lib struct"); - return NULL; - } - - void *handle = dlopen(libpath, RTLD_NOW); - - if (!handle) { - sprintf(errbuf, "unable to open dl handle"); - free(dev); - return NULL; - } - - dlerror(); - - _dlib_dev_init_t _dlib_dev_init = (_dlib_dev_init_t)(uintptr_t)dlsym(handle, "init"); - - const char *dlsym_init_error = dlerror(); - if (dlsym_init_error) { - sprintf(errbuf, "unable to find init symbol: %s", dlsym_init_error); - dlclose(handle); - free(dev); - return NULL; - } - - _dlib_makeDeviceTick_t _dlib_makeDeviceTick = (_dlib_makeDeviceTick_t)(uintptr_t)dlsym(handle, "makeDeviceTick"); - - - const char *dlsym_maketick_error = dlerror(); - if (dlsym_maketick_error) { - sprintf(errbuf, "unable to find makeDeviceTick symbol: %s", dlsym_maketick_error); - dlclose(handle); - free(dev); - return NULL; - } - - - - _dlib_parseSpecsFromConfig_t _dlib_parseSpecsFromConfig = (_dlib_parseSpecsFromConfig_t)(uintptr_t)dlsym(handle, "parseSpecsFromConfig"); - - const char *_dlib_parseSpecsFromConfig_error = dlerror(); - if (_dlib_parseSpecsFromConfig_error) { - sprintf(errbuf, "unable to find parseSpecsFromConfig symbol: %s", _dlib_parseSpecsFromConfig_error); - dlclose(handle); - free(dev); - return NULL; - } - - - - _dlib_fillSmartReadSpecs_t _dlib_fillSmartReadSpecs = (_dlib_fillSmartReadSpecs_t)(uintptr_t)dlsym(handle, "fillSmartReadSpecs"); - - const char *_dlib_fillSmartReadSpecs_error = dlerror(); - if (_dlib_fillSmartReadSpecs_error) { - sprintf(errbuf, "unable to find fillSmartReadSpecs symbol: %s", _dlib_fillSmartReadSpecs_error); - dlclose(handle); - free(dev); - return NULL; - } - - - - _dlib_fillSmartWriteSpecs_t _dlib_fillSmartWriteSpecs = (_dlib_fillSmartWriteSpecs_t)(uintptr_t)dlsym(handle, "fillSmartWriteSpecs"); - - const char *_dlib_fillSmartWriteSpecs_error = dlerror(); - if (_dlib_fillSmartWriteSpecs_error) { - sprintf(errbuf, "unable to find fillSmartWriteSpecs symbol: %s", _dlib_fillSmartWriteSpecs_error); - dlclose(handle); - free(dev); - return NULL; - } - - - _dlib_deviceType_t _dlib_deviceType = (_dlib_deviceType_t)(uintptr_t)dlsym(handle, "pubDeviceType"); - - const char *dlib_deviceType_error = dlerror(); - if (dlib_deviceType_error) { - sprintf(errbuf, "unable to find pubDeviceType symbol: %s", dlib_deviceType_error); - dlclose(handle); - free(dev); - return NULL; - } - - - - - _dlib_freeSpecs_t _dlib_freeSpecs = (_dlib_freeSpecs_t)(uintptr_t)dlsym(handle, "freeDevSpecs"); - - const char *dlib_freeSpecs_error = dlerror(); - if (dlib_freeSpecs_error) { - sprintf(errbuf, "unable to find freeSpecs symbol: %s", dlib_freeSpecs_error); - dlclose(handle); - free(dev); - return NULL; - } - - - _dlib_freeDevMem_t _dlib_freeDevMem = (_dlib_freeDevMem_t)(uintptr_t)dlsym(handle, "freeDevSpecs"); - - const char *dlib_freeDevMem_error = dlerror(); - if (dlib_freeDevMem_error) { - sprintf(errbuf, "unable to find freeDevMem symbol: %s", dlib_freeDevMem_error); - dlclose(handle); - free(dev); - return NULL; - } - - dev->devContext = NULL; - dev->init = _dlib_dev_init; - dev->makeDeviceTick = _dlib_makeDeviceTick; - dev->parseSpecsFromConfig = _dlib_parseSpecsFromConfig; - dev->fillSmartReadSpecs = _dlib_fillSmartReadSpecs; - dev->fillSmartWriteSpecs = _dlib_fillSmartWriteSpecs; - dev->freeSpecs = _dlib_freeSpecs; - dev->freeDevMem = _dlib_freeDevMem; - - uint8_t devType = _dlib_deviceType(); - - dev->devType = devType; - - if (devType == EXTENDED_DEVICE_TYPE_DUMMY) - { - dev->extendedHandlers = NULL; - } - else if (devType == EXTENDED_DEVICE_TYPE_INSTR_SIMUL) - { - dev->extendedHandlers = _fillInstructionSimul(handle); - if (dev->extendedHandlers == NULL) - { - free(dev); - dlclose(handle); - return NULL; - } - } - else - { - free(dev); - dlclose(handle); - return NULL; - } - - dev->_dlhandl = handle; - - return dev; -} \ No newline at end of file diff --git a/src/linkedlist.c b/src/linkedlist.c new file mode 100644 index 0000000..3f7393e --- /dev/null +++ b/src/linkedlist.c @@ -0,0 +1,49 @@ +#include +#include "linkedlist.h" + + +void removeLinkedListEntry(LinkedListEntry** head, LinkedListEntry* entry) +{ + if(entry == NULL) + { + return; + } + // check for head + if(entry->prevEntry != NULL) + { + entry->prevEntry->nextEntry = entry->nextEntry; + } + // check for tail + if(entry->nextEntry != NULL) + { + entry->nextEntry->prevEntry = entry->prevEntry; + } + + // check for removing head entry + if(entry == *head) + { + if(entry->nextEntry == NULL && entry->prevEntry == NULL) + { + // removing full list + *head = NULL; + } + else if(entry->nextEntry != NULL) + { + // moving head to next entry + *head = entry->nextEntry; + } + else if(entry->prevEntry != NULL) + { + // head that have previous item, must be impossible + *head = entry->prevEntry; + } + + if(entry == *head) + { + // if still head => there is a circular list, removing whole + *head = NULL; + } + } + + free(entry); +} \ No newline at end of file diff --git a/src/main.c b/src/main.c index ecfd534..ec8f943 100644 --- a/src/main.c +++ b/src/main.c @@ -7,123 +7,48 @@ #include #include -// #define OPCODE_WORDSIZE 2 -// #define MEM_CELL_WORDS 1 - -// #include "abc.h" -// #include "instr.h" -// #include "mem.h" -// #include "runner.h" - -#include -// #include "runner.h" -#include "hmmmm.h" #include "my_mutex.h" -#include "tomlc99/toml.h" +#include "panic.h" + #include -#include "compose_device.h" - +#include "linkedlist.h" #include "ptQueue/inc/ptQueue.h" #include "wsServer/include/ws.h" - -void printMemory(void* cells, uint64_t cellsCount) -{ - - for(uint64_t i = 0; i < cellsCount; i++) - { - printf("%lu: 0x%04X", i, ((uint8_t*)cells)[i]); - - // uint8_t wasRead = 0; - // for (uint8_t j = 0; j < mem->memreadLen; j++) - // { - // if(mem->memreadCellAddrs[j] == i) - // { - // wasRead = 1; - // break; - // } - // } - // uint8_t wasWrite = 0; - // for (uint8_t j = 0; j < mem->memwriteLen; j++) - // { - // if(mem->memwriteCellAddrs[j] == i) - // { - // wasWrite = 1; - // break; - // } - // } - - // if (wasRead == 1) - // { - // printf("\t[was read]"); - // } - // else - // { - // printf("\t[was not read]"); - // } - - // if (wasWrite == 1) - // { - // printf("\t[was written]"); - // } - // else - // { - // printf("\t[was not written]"); - // } - printf("\n"); - } -} - - - - -void dummyWriteHandler(uint64_t ident, uint64_t addr, void* rawCells, void* data) -{ - // printf("Intercepted write on 0x%lx: 0x%02x\n", addr, *((uint8_t*)data)); - printf("Intercepted write on 0x%lx: ", addr); - printf("0b"); - for(uint8_t i = 0; i < 8; i++) - { - printf("%d", (*((uint8_t*)data) >> (7 - i)) & 1); - } - printf("\n"); - ((uint8_t*)rawCells)[addr] = *((uint8_t*)data); - return; -} - - - - -ext_h_read_func* interceptReadRouter; -ext_h_write_func* interceptWriteRouter; -void** iterceptDevContextRouter; - -void* readExt(uint64_t ident, uint64_t addr, void* rawCells) -{ - void* devContext = iterceptDevContextRouter[ident]; - ext_h_read_func tgt = interceptReadRouter[ident]; - return tgt(addr, rawCells, devContext); -} -void writeExt(uint64_t ident, uint64_t addr, void* rawCells, void* data) -{ - void* devContext = iterceptDevContextRouter[ident]; - ext_h_write_func tgt = interceptWriteRouter[ident]; - tgt(addr, rawCells, data, devContext); -} - - - -// void *threadMain(void *param); #define _GNU_SOURCE -#include #include +#include +#include +#include + +#include + + + +#include "context.h" +#include "streamed.h" +#include "events.h" + +#include "state.h" + +#include "proto/msg.h" +#include "proto/enums.h" +#include "proto/dial.h" + +#include "proto/handlers.h" +#include "proto/handlers/ws.h" + + +#include "proto/pack.h" + + void my_sleep(int microseconds) { struct timespec ts; @@ -132,173 +57,230 @@ void my_sleep(int microseconds) { nanosleep(&ts, NULL); } -#define REG_EVTYPE_CONNECT 1 -#define REG_EVTYPE_CLOSE 2 -#define CLIENT_STATE_UNAUTHED 0 -typedef struct { - pthread_mutex_t registerMutex; - ptQueue* regQueue; -} ServerContext; -typedef struct { - ws_cli_conn_t clientId; - uint8_t isAuthed; - ptQueue* incomeQ; - ptQueue* outcomeQ; -} ClientContext; -typedef struct { - uint8_t regType; - ClientContext* ctx; -} ClientRegistrationEvent; - -/** - * @brief This function is called whenever a new connection is opened. - * @param client Client connection. - */ -void onopen(ws_cli_conn_t client) +void* outgoingMain(void* args) { - char errbuf[1024]; - - ptQueue* incomeQueue = ptQueueCreate(errbuf); - if(incomeQueue == NULL) + OutgoingBuffers* outBufs = args; + uint8_t bufsCount = outBufs->buffersCount; + uint8_t currBufIdx = 0; + uint8_t currWritingIdxPtr = 0; + SizedPtr* curBuf = NULL; + while (currWritingIdxPtr != 0xFF) { - printf("Unable to create income queue: %s\n", errbuf); - abort(); - } - - ptQueue* outcomeQueue = ptQueueCreate(errbuf); - if(outcomeQueue == NULL) - { - printf("Unable to create outcome queue: %s\n", errbuf); - ptQueueFree(incomeQueue); - abort(); - } - - ClientContext* cctx = malloc(sizeof(ClientContext)); - if(cctx == NULL) - { - printf("Unable to allocate client context\n"); - ptQueueFree(incomeQueue); - ptQueueFree(outcomeQueue); - abort(); - } - cctx->clientId = client; - cctx->isAuthed = 0; - cctx->incomeQ = incomeQueue; - cctx->outcomeQ = outcomeQueue; - - ClientRegistrationEvent* ev = malloc(sizeof(ClientContext)); - if(ev == NULL) - { - printf("Unable to allocate register event"); - ptQueueFree(incomeQueue); - ptQueueFree(outcomeQueue); - free(cctx); - abort(); - } - ev->regType = REG_EVTYPE_CONNECT; - ev->ctx = cctx; - - ws_set_connection_context(client, cctx); - - ServerContext* ctx = ws_get_server_context(client); - - with_lock(&ctx->registerMutex) - { - int exitCode = ptQueuePush(ctx->regQueue, ev, errbuf); - if(exitCode) + if(curBuf != NULL) { - printf("Unable to push to reg queue: %s\n", errbuf); - ptQueueFree(incomeQueue); - ptQueueFree(outcomeQueue); - free(cctx); - abort(); + OutgoingMessage* messages = curBuf->ptr; + for(size_t i = 0; i < curBuf->size; i++) + { + + + OutgoingMessage *outMsg = &messages[i]; + if(outMsg->msg == NULL && outMsg->msgLen != 0) + { + panic("Got double read on buf %d while writing on buf %d\n", currBufIdx, currWritingIdxPtr); + } + + ClientContext* client = outMsg->client; + + SizedPtr* fallbackMsgQueuePtr = client->fallbackOutcomeQ; + OutgoingMessage* fallbackMsgQueue = (OutgoingMessage*)fallbackMsgQueuePtr->ptr; + + // try to deliver failed message + + if (ws_get_state(client->clientId) == WS_STATE_OPEN) + { + + while(client->orphanedAt != 0 && client->fallbackOutcomeQ->size > client->fallbackOutcomeQPadding) + { + printf("Trying to resend clients fallbacked packets\n"); + OutgoingMessage* fallbackedMessage = &fallbackMsgQueue[client->fallbackOutcomeQPadding]; + int written = ws_sendframe_bin(client->clientId, (const char*)fallbackedMessage->msg, fallbackedMessage->msgLen); + if(written == -1) + { + break; + } + + free(fallbackedMessage->msg); + fallbackedMessage->msg = NULL; + client->fallbackOutcomeQPadding++; + } + } + + // check for all delivered + if(client->fallbackOutcomeQPadding >= client->fallbackOutcomeQ->size) + { + // clear fallback queue if all delivered + client->fallbackOutcomeQ->size = 0; + + // was a fluke, remove orphaned time + client->orphanedAt = 0; + } + + // try to send original message + int written = -1; + + + if(outMsg->msg == NULL) + { + printf("removed client %lu\n", client->seatId); + for(size_t j = 0; j < client->fallbackOutcomeQ->size; j++) + { + free(fallbackMsgQueue[j].msg); + } + free(client->fallbackOutcomeQ->ptr); + free(client->fallbackOutcomeQ); + ptQueueFree(client->incomeQ); + free(client); + continue; + } + + + if(client->orphanedAt == 0) + { + // printf("send is allowed for client %lu (seat %lu)\n", client->clientId, client->seatId); + // try only with an active connection + written = ws_sendframe_bin(client->clientId, (const char*)outMsg->msg, outMsg->msgLen); + } + // else + // { + // printf("no send is allowed for client %lu (seat %lu)\n", client->clientId, client->seatId); + // } + + if(written == -1) + { + // printf("No data was sent for client %lu (seat %lu)\n", client->clientId, client->seatId); + // dispatch message to failed if not sent + if(fallbackMsgQueuePtr->size + 1 < fallbackMsgQueuePtr->allocatedSize) + { + fallbackMsgQueue[fallbackMsgQueuePtr->size] = *outMsg; + fallbackMsgQueuePtr->size++; + } + + // set orphaned time + if(client->orphanedAt == 0 && fallbackMsgQueuePtr->size > 8) + { + printf("Set client as orphaned\n"); + client->orphanedAt = (uint64_t)time(NULL); + } + } + else + { + free(outMsg->msg); + outMsg->msg = NULL; + } + } + curBuf->size = 0; + atomic_store(&outBufs->readRequestIdx, currBufIdx); + + currBufIdx++; + if(currBufIdx >= bufsCount) + { + currBufIdx = 0; + } + } + + currWritingIdxPtr = atomic_load(&outBufs->currWritingIdx); + if(currBufIdx == currWritingIdxPtr) + { + atomic_store(&outBufs->readRequestIdx, currBufIdx); + if(curBuf == NULL) + { + my_sleep(100); + } + else + { + curBuf = NULL; + } + } + else + { + curBuf = &outBufs->bufs[currBufIdx]; + } + } + pthread_exit(0); +} + + +void mockDevice0(uint8_t* mem, uint64_t* readAddrs, size_t* readAddrsLen, uint64_t* writeAddrs, size_t* writeAddrsLen) +{ + for(size_t i = 0; i < 8; i++) + { + readAddrs[*readAddrsLen] = i; + *readAddrsLen += 1; + } + + uint64_t num = decodeBytesToU64(mem); + num += 1; + encodeUintToBytes(num, mem); + + for(size_t i = 0; i < 8; i++) + { + writeAddrs[*writeAddrsLen] = i; + *writeAddrsLen += 1; + } +} + +void mockDevice1(uint8_t* mem, uint64_t* readAddrs, size_t* readAddrsLen, uint64_t* writeAddrs, size_t* writeAddrsLen) +{ + +} + +void dispatchStreamSegment(EmulContext* emulContext, StreamReg* reg, uint8_t* mem) +{ + size_t mlen = 0; + uint8_t* msg = createStreamSegmentPush(reg->mode, reg->regId, *emulContext->clockCounter, mem + reg->startAddr, reg->segLen, &mlen); + dispatchOutgoingMessage(emulContext->outBufs, reg->clientContext, msg, mlen); +} + +void dispatchMemAccessNotifications(EmulContext* emulContext, DeviceSegStreamReg* deviceRegs, uint8_t* mem, uint64_t* addrs, size_t addrsLen, uint8_t mode) +{ + if(deviceRegs->regCount == 0) + { + return; + } + StreamReg** dispatchRegs = alloca(sizeof(StreamReg*) * deviceRegs->regCount); + NULL_GUARD(dispatchRegs); + size_t dispatchRegsCnt = 0; + + for(size_t i = 0; i < addrsLen; i++) + { + uint64_t addr = addrs[i]; + for(size_t regIdx = 0; regIdx < deviceRegs->regCount; regIdx++) + { + StreamReg* reg = &deviceRegs->regs[regIdx]; + if(reg->mode == mode) + { + if(reg->startAddr <= addr && reg->startAddr + reg->segLen >= addr) + { + uint8_t isDuplicate = 0; + for(size_t j = 0; j < dispatchRegsCnt; j++) + { + if(dispatchRegs[j] == reg) + { + isDuplicate = 1; + break; + } + } + if(!isDuplicate) + { + dispatchRegs[dispatchRegsCnt] = reg; + dispatchRegsCnt++; + } + } + } } } - char *cli; - cli = ws_getaddress(client); - printf("Connection opened, addr: %s\n", cli); + for(size_t i = 0; i < dispatchRegsCnt; i++) + { + dispatchStreamSegment(emulContext, dispatchRegs[i], mem); + } } -/** - * @brief This function is called whenever a connection is closed. - * @param client Client connection. - */ -void onclose(ws_cli_conn_t client) -{ - char errbuf[1024]; - - ClientContext* cctx = ws_get_connection_context(client); - if(cctx == NULL) - { - printf("Unable to get client context\n"); - abort(); - } - - - ClientRegistrationEvent* ev = malloc(sizeof(ClientContext)); - if(ev == NULL) - { - printf("Unable to allocate register event"); - abort(); - } - ev->regType = REG_EVTYPE_CONNECT; - ev->ctx = cctx; - - ws_set_connection_context(client, cctx); - - ServerContext* ctx = ws_get_server_context(client); - - with_lock(&ctx->registerMutex) - { - int exitCode = ptQueuePush(ctx->regQueue, ev, errbuf); - if(exitCode) - { - printf("Unable to push to reg queue: %s\n", errbuf); - abort(); - } - } - - - pthread_t tid = pthread_self(); - char *cli; - cli = ws_getaddress(client); - printf("Connection closed, addr: %s, thread id: %lu\n", cli, tid); -} - -/** - * @brief Message events goes here. - * @param client Client connection. - * @param msg Message content. - * @param size Message size. - * @param type Message type. - */ -void onmessage(ws_cli_conn_t client, - const unsigned char *msg, uint64_t size, int type) -{ - ClientContext* cctx = ws_get_connection_context(client); - if(cctx == NULL) - { - printf("Unable to get client context\n"); - abort(); - } - - pthread_t tid = pthread_self(); - char *cli; - cli = ws_getaddress(client); - printf("I receive a message: %s (%zu), from: %s, thread id: %lu\n", msg, - size, cli, tid); - - ws_sendframe_txt(client, "hello"); - ws_sendframe_txt(client, "world"); -} - - int main(int argc, char** argv) { char errbuf[1024]; @@ -306,655 +288,214 @@ int main(int argc, char** argv) pthread_mutex_init(&mtx, NULL); ptQueue* regQ = ptQueueCreate(errbuf); - if(regQ == NULL) + NULL_GUARD(regQ, "Unable to create reg q: %s\n", errbuf); + + size_t deviceCount = 2; + + DeviceSegStreamReg* device0SegStreamRegs = malloc(sizeof(DeviceSegStreamReg) * 4); + NULL_GUARD(device0SegStreamRegs); + + device0SegStreamRegs->allocatedSize = 0; + device0SegStreamRegs->regCount = 0; + device0SegStreamRegs->regs = NULL; + + DeviceSegStreamReg* device1SegStreamRegs = malloc(sizeof(DeviceSegStreamReg) * 4); + NULL_GUARD(device1SegStreamRegs); + + + device1SegStreamRegs->allocatedSize = 0; + device1SegStreamRegs->regCount = 0; + device1SegStreamRegs->regs = NULL; + + DeviceSegStreamReg** deviceStreamRegs = malloc(sizeof(DeviceSegStreamReg*) * deviceCount); + NULL_GUARD(deviceStreamRegs); + + deviceStreamRegs[0] = device0SegStreamRegs; + deviceStreamRegs[1] = device1SegStreamRegs; + + uint8_t emulState = EMUL_STATE_STILL; + + + uint8_t* device0mem = malloc(sizeof(uint8_t) * 1024 * 1024); + NULL_GUARD(device0mem); + + uint8_t* device1mem = malloc(sizeof(uint8_t) * 1024 * 1024); + NULL_GUARD(device1mem); + + uint8_t** devicesMem = malloc(sizeof(uint8_t*) * deviceCount); + NULL_GUARD(devicesMem); + devicesMem[0] = device0mem; + devicesMem[1] = device1mem; + + uint64_t* device0readAddrs = malloc(sizeof(uint64_t) * 1024); + NULL_GUARD(device0readAddrs); + size_t device0readAddrsLen = 0; + uint64_t* device0writeAddrs = malloc(sizeof(uint64_t) * 1024); + NULL_GUARD(device0writeAddrs); + size_t device0writeAddrsLen = 0; + + + uint64_t* device1readAddrs = malloc(sizeof(uint64_t) * 1024); + NULL_GUARD(device1readAddrs); + size_t device1readAddrsLen = 0; + uint64_t* device1writeAddrs = malloc(sizeof(uint64_t) * 1024); + NULL_GUARD(device1writeAddrs); + size_t device1writeAddrsLen = 0; + + + uint8_t outBufsCount = 16; + SizedPtr* bufs = malloc(sizeof(SizedPtr) * outBufsCount); + NULL_GUARD(bufs); + + for(size_t i = 0; i < outBufsCount; i++) { - printf("Unable to create reg q: %s\n", errbuf); - abort(); + OutgoingMessage* messages = malloc(sizeof(OutgoingMessage) * 128); + NULL_GUARD(messages); + bufs[i].ptr = messages; + bufs[i].allocatedSize = 128; + bufs[i].size = 0; + + for(size_t j = 0; j < bufs[i].allocatedSize; j++) + { + messages[j].msgLen = 0xFFFF; + messages[j].client = NULL; + messages[j].msg = NULL; + } } - ServerContext ctx = { - mtx, - regQ + OutgoingBuffers outBufs = { + bufs, + outBufsCount, + 0, + 0 }; + LinkedListEntry* clientsLinkedListHead = NULL; + + uint64_t clockCounter = 0; + + uint8_t utilizedFlag = 0; + EmulContext emulContext = { + &emulState, + &clockCounter, + &clientsLinkedListHead, + &utilizedFlag, + &outBufs, + deviceStreamRegs, + devicesMem, + deviceCount + }; + + + uint8_t access_token[] = "jtd3dhBWG2p73mkRCyaZ63KLmOjl5OgItIDAPa8HcyGQI2SOFA6hsJVr1NiG2Y96AvwyYSShr4GAL3sjJEo1wsmCo2vuhiqRhj8HVRrOVocD2e1jtuMwMsCmizYakT63"; + + ServerContext ctx = { + mtx, + regQ, + (uint8_t*)&access_token, + &emulContext, + 0 + }; + + pthread_t outTid; + pthread_attr_t outAttr; + + pthread_attr_init(&outAttr); + pthread_create(&outTid, &outAttr, outgoingMain, &outBufs); + pthread_detach(outTid); + + const uint64_t pingTimeoutMs = 1000; + ws_socket(&(struct ws_server){ - /* - * Bind host, such as: - * localhost -> localhost/127.0.0.1 - * 0.0.0.0 -> global IPv4 - * :: -> global IPv4+IPv6 (Dual stack) - */ .host = "localhost", - .port = 8080, + .port = 8181, .thread_loop = 1, - .timeout_ms = 1000, + .timeout_ms = pingTimeoutMs, .context = &ctx, - .evs.onopen = &onopen, - .evs.onclose = &onclose, - .evs.onmessage = &onmessage + .evs.onopen = &onWsOpen, + .evs.onclose = &onWsClose, + .evs.onmessage = &onWsMessage }); + ptQueueElem* regQueueTail = regQ->tail; + + uint64_t lastPingAt = 0; + while(1) { - sleep(1); + + if(((uint64_t)time(NULL) - lastPingAt) * 1000 > pingTimeoutMs) + { + ws_ping(0, 3); + lastPingAt = (uint64_t)time(NULL); + } + + ClientRegistrationEvent* payload = regQueueTail->payload; + if(payload != NULL) + { + printf("Got reg queue data\n"); + handleRegEvent(&emulContext, payload); + regQueueTail = regQueueTail->nextEl; + utilizedFlag = 1; + } + + + handleAllClients(&emulContext); + + uint8_t readReqIdx = atomic_load(&outBufs.readRequestIdx); + if(readReqIdx == outBufs.currWritingIdx || outBufs.bufs[outBufs.currWritingIdx].size >= outBufs.bufs[outBufs.currWritingIdx].allocatedSize / 2) + { + uint8_t oldWriteIdx = outBufs.currWritingIdx; + uint8_t newWriteIdx = outBufs.currWritingIdx + 1; + if(outBufs.bufs[outBufs.currWritingIdx].size != 0 || readReqIdx != newWriteIdx) + { + if(newWriteIdx >= outBufs.buffersCount) + { + newWriteIdx = 0; + } + + while(readReqIdx == newWriteIdx) + { + my_sleep(1000); + readReqIdx = atomic_load(&outBufs.readRequestIdx); + } + atomic_store(&outBufs.currWritingIdx, newWriteIdx); + outBufs.bufs[outBufs.currWritingIdx].size = 0; + } + } + + if(emulState == EMUL_STATE_EXEC) + { + device0readAddrsLen = 0; + device0writeAddrsLen = 0; + + device1readAddrsLen = 0; + device1writeAddrsLen = 0; + + mockDevice0(device0mem, device0readAddrs, &device0readAddrsLen, device0writeAddrs, &device0writeAddrsLen); + mockDevice1(device1mem, device1readAddrs, &device1readAddrsLen, device1writeAddrs, &device1writeAddrsLen); + + dispatchMemAccessNotifications(&emulContext, emulContext.deviceStreamRegs[0], device0mem, device0readAddrs, device0readAddrsLen, STREAM_MODE_READ); + dispatchMemAccessNotifications(&emulContext, emulContext.deviceStreamRegs[0], device0mem, device0writeAddrs, device0writeAddrsLen, STREAM_MODE_WRITE); + + dispatchMemAccessNotifications(&emulContext, emulContext.deviceStreamRegs[1], device1mem, device1readAddrs, device1readAddrsLen, STREAM_MODE_READ); + dispatchMemAccessNotifications(&emulContext, emulContext.deviceStreamRegs[1], device1mem, device1writeAddrs, device1writeAddrsLen, STREAM_MODE_WRITE); + + clockCounter++; + } + else if(!utilizedFlag) + { + my_sleep(100); + } + } + + atomic_store(&outBufs.currWritingIdx, 0xFF); + pthread_join(outTid, NULL); + + + return 0; } - -// int main(int argc, char** argv) -// { -// char errbuf[1024]; - -// pthread_t tid; /* идентификатор потока */ -// pthread_attr_t attr; /* отрибуты потока */ - - -// ptQueue* qOut = ptQueueCreate(errbuf); -// ptQueue* qIn = ptQueueCreate(errbuf); - -// ptQueue** ptArgs = malloc(sizeof(ptQueue*) * 2); -// if(ptArgs == NULL) -// { -// abort(); -// } -// ptArgs[0] = qOut; -// ptArgs[1] = qIn; - -// pthread_attr_init(&attr); -// pthread_create(&tid, &attr, threadMain, ptArgs); - - -// for(int i = 1; i < 4096; i++) -// { -// int* pass = malloc(sizeof(int)); -// if(pass == NULL) -// { -// abort(); -// } -// *pass = i; - -// ptQueuePush(qOut, pass, errbuf); -// } - -// int* ii = malloc(sizeof(int)); -// if(ii == NULL) -// { -// abort(); -// } -// *ii = 0; - -// ptQueuePush(qOut, ii, errbuf); - -// ptQueueElem* tailIn = qIn->tail; - -// while (1) -// { -// void* payload = atomic_load(&(tailIn->payload)); -// if(payload == NULL) -// { -// my_sleep(100); -// continue; -// } -// int* code = payload; - -// printf("nextEl)); -// if (newTail == NULL) -// { -// abort(); -// } -// free(tailIn); -// tailIn = newTail; -// } - -// pthread_join(tid, NULL); -// return 0; -// } - - - -// void *threadMain(void *param) -// { -// char errubf[1024]; -// ptQueue** params = param; -// ptQueue* qIn = params[0]; -// ptQueue* qOut = params[1]; -// ptQueueElem* tailIn = atomic_load(&(qIn->head)); - -// while (1) -// { -// void* payload = atomic_load(&(tailIn->payload)); -// if(payload == NULL) -// { -// my_sleep(100); -// continue; -// } -// int* code = payload; - -// printf(">Done reading %d\n", *code); - - -// int* pass = malloc(sizeof(int)); -// if(pass != NULL) -// { -// *pass = *code; -// ptQueuePush(qOut, pass, errubf); -// } - -// if(*code == 0) -// { -// free(payload); -// break; -// } -// free(payload); -// ptQueueElem* newTail = atomic_load(&(tailIn->nextEl)); -// if (newTail == NULL) -// { -// abort(); -// } -// free(tailIn); -// tailIn = newTail; - -// } - -// for(int i = 4096; i >= 0; i--) -// { -// int* pass = malloc(sizeof(int)); -// if(pass == NULL) -// { -// continue; -// } -// *pass = i; -// ptQueuePush(qOut, pass, errubf); -// } - -// pthread_exit(0); -// } - -// int _main(int argc, char** argv) -// { -// char errbuf[4096] = {0}; -// compose_dev_conf_t* devConf = openComposeDeviceConfig("/home/nikto_b/Documents/baum/hmmmm/glob.toml", errbuf); -// if(devConf == NULL) -// { -// printf("Err: %s\n", errbuf); -// return 1; -// } - -// device_handle_t** devices = openComposeDevice(devConf, errbuf); -// if(devices == NULL) -// { -// printf("Err: %s\n", errbuf); -// return 1; -// } - -// uint8_t failed = 0; - - -// if(devices[0] != NULL) -// { -// uint64_t readSpecsCount = 0; -// uint64_t writeSpecsCount = 0x20; - -// smart_read_spec_t* readSpecs = malloc(sizeof(smart_read_spec_t) * readSpecsCount); -// smart_write_spec_t* writeSpecs = malloc(sizeof(smart_write_spec_t) * writeSpecsCount); - -// if (writeSpecs == NULL && writeSpecsCount != 0) -// { -// return 1; -// } -// if (readSpecs == NULL && readSpecsCount != 0) -// { -// return 1; -// } - -// for (size_t i = 0; i < writeSpecsCount; i++) -// { -// writeSpecs[i].addr = 0; -// writeSpecs[i].segno = 2; -// writeSpecs[i].ident = 0; -// writeSpecs[i].localAddr = i; -// writeSpecs[i].addrType = SMART_ADDR_TYPE_SEGMENTED; -// writeSpecs[i].handler = dummyWriteHandler; -// } - -// devices[0]->lib->fillSmartReadSpecs(devices[0]->specs, readSpecs, readSpecsCount); -// devices[0]->lib->fillSmartWriteSpecs(devices[0]->specs, writeSpecs, writeSpecsCount); -// } - - -// size_t devIdx = 0; -// while(devices[devIdx] != NULL) -// { -// device_handle_t* dev = devices[devIdx]; -// dev->ctx = dev->lib->init(dev->specs, errbuf); - -// if(dev->ctx == NULL) -// { -// printf("Unable to init device %lu: %s\n", devIdx, errbuf); -// memset(errbuf, 0, strlen(errbuf)); -// failed = 1; -// } -// devIdx++; -// } - -// if(failed) -// { -// for(size_t i = 0; i < devIdx; i++) -// { -// closeBaseDevice(devices[i]); -// } -// free(devices); -// return 1; -// } - -// size_t projectIdx = 0; -// // while(devConf->projections[projectIdx] != NULL) -// // { -// // projection_conf_t* projection = devConf->projections[projectIdx]; - -// // size_t foundBaseDevId = (size_t)~0; - -// // for(size_t i = 0; i < devIdx; i++) -// // { -// // conf_dev_t* cnf = devConf->baseConfigs[i]; -// // if(compareComposeId(cnf->id, projection->baseAt)) -// // { -// // foundBaseDevId = i; -// // break; -// // } -// // } - -// // if(foundBaseDevId == (size_t)~0) -// // { -// // printf("Unable to find projection base: "); -// // for(size_t i = 0; projection->baseAt[i] != NULL; i++) -// // { -// // printf("->%s", projection->baseAt[i]); -// // } -// // printf("\n"); -// // return 1; -// // } - -// // size_t foundTargetDevId = (size_t)~0; - -// // for(size_t i = 0; i < devIdx; i++) -// // { -// // conf_dev_t* cnf = devConf->baseConfigs[i]; -// // if(compareComposeId(cnf->id, projection->target + 1)) -// // { -// // foundTargetDevId = i; -// // break; -// // } -// // } - -// // if(foundTargetDevId == (size_t)~0) -// // { -// // printf("Unable to find projection target: "); -// // for(size_t i = 0; projection->target[i] != NULL; i++) -// // { -// // printf("->%s", projection->target[i]); -// // } -// // printf("\n"); -// // return 1; -// // } - -// // uint16_t baseDevMemSpecCount = 0; -// // while(devConf->baseConfigs[foundBaseDevId]->memConf->memSegConfs[baseDevMemSpecCount] != NULL){baseDevMemSpecCount++;} - -// // uint16_t foundBaseSegno = (uint16_t)~0; - -// // for(uint16_t i = 0; i < baseDevMemSpecCount; i++) -// // { -// // if(strcmp(devices[foundBaseDevId]->ctx->deviceMem->memsegNames[i], projection->baseSeg) == 0) -// // { -// // foundBaseSegno = i; -// // break; -// // } -// // } - -// // if(foundBaseSegno == (uint16_t)~0) -// // { -// // printf("Unable to find projection segment %s for base: ", projection->baseSeg); -// // for(size_t i = 0; projection->baseAt[i] != NULL; i++) -// // { -// // printf("->%s", projection->baseAt[i]); -// // } -// // printf("\n"); -// // return 1; -// // } - -// // uint16_t targetDevMemSpecCount = 0; -// // while(devConf->baseConfigs[foundTargetDevId]->memConf->memSegConfs[targetDevMemSpecCount] != NULL){targetDevMemSpecCount++;} - -// // uint16_t foundTargetSegno = (uint16_t)~0; - -// // for(uint16_t i = 0; i < targetDevMemSpecCount; i++) -// // { -// // if(strcmp(devices[foundTargetDevId]->ctx->deviceMem->memsegNames[i], projection->target[0]) == 0) -// // { -// // foundTargetSegno = i; -// // break; -// // } -// // } - -// // if(foundTargetSegno == (uint16_t)~0) -// // { -// // printf("Unable to find projection segment for target: "); -// // for(size_t i = 0; projection->target[i] != NULL; i++) -// // { -// // printf("->%s", projection->target[i]); -// // } -// // printf("\n"); -// // return 1; -// // } - - -// // void* foundBaseAt = devices[foundBaseDevId]->ctx->deviceMem->cells[foundBaseSegno]; - -// // foundBaseAt += projection->projectionShift; - -// // devices[foundTargetDevId]->ctx->deviceMem->cells[foundTargetSegno] = foundBaseAt; - - -// // projectIdx++; -// // } - -// FILE *fp = fopen ("/home/nikto_b/Documents/baum/avr_selftests/test.bin", "rb"); -// if (!fp) { -// fprintf (stderr, "error: file open failed.\n"); -// return 1; -// } - -// #define BUFSZ 1 - -// uint8_t buf[BUFSZ] = {0}; -// size_t bytes = 0, i, readsz = sizeof(buf); - -// size_t addr = 0; -// while ((bytes = fread(buf, sizeof(*buf), readsz, fp)) == readsz) -// { -// for (i = 0; i < readsz; i++) -// { -// ((uint8_t*)(devices[0]->ctx->deviceMem->rawCells))[addr] = buf[i]; -// addr += 1; -// } -// } - -// fclose(fp); - - -// // uint64_t ticks_limiter = 16 * 1000 * 1000 * 100; -// // uint64_t ticks_limiter = 40; -// uint64_t cycles_limiter = 18; -// uint64_t ticks = 0; -// uint64_t cycles = 0; - -// // instruction_simul_handlers_t* instr_handl = (instruction_simul_handlers_t*)dev->lib->extendedHandlers; - -// // uint16_t* pcounter = (uint16_t*)instr_handl->extractPcounterPtr(dev->ctx); - - -// struct timeval start; -// gettimeofday(&start, NULL); - -// // uint16_t prevOp = 0; -// // uint16_t opStrikeCount = 0; - -// printf("Init sequence done\n"); - -// while (cycles < cycles_limiter) -// { -// for(size_t i = 0; i < 1; i++) -// { -// device_handle_t* dev = devices[i]; -// dev->ctx->deviceMem->memreadLen = 0; -// dev->ctx->deviceMem->memwriteLen = 0; - -// // uint16_t op = ((uint16_t*)(devInfo->deviceMem->cells[0]))[*(pcounter)]; -// // if (op != prevOp) -// // { -// // opStrikeCount = 0; -// // } -// // else -// // { -// // opStrikeCount++; -// // if(opStrikeCount > 0xFFF) -// // { -// // printf("Found endless loop\n"); -// // break; -// // } -// // } -// // prevOp = op; -// // printf("Executing at 0x%04X: 0x%04X\n", (*(pcounter)), op); - -// ticks += dev->lib->makeDeviceTick(dev->ctx); -// // ticks += makeDeviceTick(devInfo); - -// } - -// cycles += 1; - - -// // for (size_t i = 0; i < devInfo->deviceMem->memwriteLen; i++) -// // { -// // printf("Got write [0x%04zuX] at 0x%04lX\n", i, devInfo->deviceMem->memwriteCellAddrs[i]); -// // } -// } -// struct timeval end; - -// gettimeofday(&end, NULL); - -// long seconds = end.tv_sec - start.tv_sec; -// long microseconds = end.tv_usec - start.tv_usec; - -// long long total_microseconds = seconds * 1000000 + microseconds; -// // double rate = 0; -// // if (total_microseconds > 0) -// // { -// // rate = ((double)total_microseconds) / ((double)cycles); -// // } - - - - -// // fflush(stdout); -// // printf("Rate: %f us/cycle (%f clock/us)\n", rate, ((double)ticks) / ((double)total_microseconds)); - - - -// printf("Execution consumed %lu ticks, %lu iterations and %llu us\n", ticks, cycles, total_microseconds); - - - -// return 0; -// } - -// int main(int argc, char** argv) -// { - -// char errbuf[4096];+ - -// device_handle_t* dev = openBaseDevice("/home/nikto_b/Documents/baum/hmmmm/devices/avr_generic/AVRrc.toml", errbuf); -// if (dev == NULL) -// { -// printf("Unable to open device: %s\n", errbuf); -// return 1; -// } - -// uint64_t readSpecsCount = 0; -// uint64_t writeSpecsCount = 0x20; - -// smart_read_spec_t* readSpecs = malloc(sizeof(smart_read_spec_t) * readSpecsCount); -// smart_write_spec_t* writeSpecs = malloc(sizeof(smart_write_spec_t) * writeSpecsCount); - -// if (writeSpecs == NULL && writeSpecsCount != 0) -// { -// return 1; -// } -// if (readSpecs == NULL && readSpecsCount != 0) -// { -// return 1; -// } - -// for (size_t i = 0; i < writeSpecsCount; i++) -// { -// writeSpecs[i].addr = 0; -// writeSpecs[i].segno = 2; -// writeSpecs[i].localAddr = i; -// writeSpecs[i].addrType = SMART_ADDR_TYPE_SEGMENTED; -// writeSpecs[i].handler = dummyWriteHandler; -// } - -// dev->lib->fillSmartReadSpecs(dev->specs, readSpecs, readSpecsCount); -// dev->lib->fillSmartWriteSpecs(dev->specs, writeSpecs, writeSpecsCount); - - -// dev->ctx = dev->lib->init(dev->specs, errbuf); - -// if (dev->ctx == NULL) -// { -// printf("Unable to init device: %s\n", errbuf); -// return 1; -// } - - -// FILE *fp = fopen ("/home/nikto_b/Documents/baum/avr_selftests/test.bin", "rb"); -// if (!fp) { -// fprintf (stderr, "error: file open failed.\n"); -// return 1; -// } - -// #define BUFSZ 1 - -// uint8_t buf[BUFSZ] = {0}; -// size_t bytes = 0, i, readsz = sizeof(buf); - -// size_t addr = 0; -// while ((bytes = fread(buf, sizeof(*buf), readsz, fp)) == readsz) -// { -// for (i = 0; i < readsz; i++) -// { -// ((uint8_t*)(dev->ctx->deviceMem->rawCells))[addr] = buf[i]; -// addr += 1; -// } -// } - -// fclose(fp); - -// printf("Done writing %zu bytes from dump file\n", addr); - - - -// uint64_t ticks_limiter = 16 * 1000 * 1000 * 100; -// // uint64_t ticks_limiter = 64; -// uint64_t ticks = 0; -// uint64_t cycles = 0; - -// instruction_simul_handlers_t* instr_handl = (instruction_simul_handlers_t*)dev->lib->extendedHandlers; - -// uint16_t* pcounter = (uint16_t*)instr_handl->extractPcounterPtr(dev->ctx); - - -// struct timeval start; -// gettimeofday(&start, NULL); - -// // uint16_t prevOp = 0; -// // uint16_t opStrikeCount = 0; - -// while (ticks < ticks_limiter) -// { -// dev->ctx->deviceMem->memreadLen = 0; -// dev->ctx->deviceMem->memwriteLen = 0; - -// // uint16_t op = ((uint16_t*)(devInfo->deviceMem->cells[0]))[*(pcounter)]; -// // if (op != prevOp) -// // { -// // opStrikeCount = 0; -// // } -// // else -// // { -// // opStrikeCount++; -// // if(opStrikeCount > 0xFFF) -// // { -// // printf("Found endless loop\n"); -// // break; -// // } -// // } -// // prevOp = op; -// // printf("Executing at 0x%04X: 0x%04X\n", (*(pcounter)), op); - -// ticks += dev->lib->makeDeviceTick(dev->ctx); -// // ticks += makeDeviceTick(devInfo); - - -// cycles += 1; - - -// // for (size_t i = 0; i < devInfo->deviceMem->memwriteLen; i++) -// // { -// // printf("Got write [0x%04zuX] at 0x%04lX\n", i, devInfo->deviceMem->memwriteCellAddrs[i]); -// // } -// } -// struct timeval end; - -// gettimeofday(&end, NULL); - -// long seconds = end.tv_sec - start.tv_sec; -// long microseconds = end.tv_usec - start.tv_usec; - -// long long total_microseconds = seconds * 1000000 + microseconds; -// // double rate = 0; -// // if (total_microseconds > 0) -// // { -// // rate = ((double)total_microseconds) / ((double)cycles); -// // } - - - - -// // fflush(stdout); -// // printf("Rate: %f us/cycle (%f clock/us)\n", rate, ((double)ticks) / ((double)total_microseconds)); - - - -// printf("Execution consumed %lu ticks, %lu cycles and %llu us\n", ticks, cycles, total_microseconds); - -// printf("general purpose registers:\n"); -// // printMemory(devInfo->deviceMem->cells[1], 32); - - -// // printf("SREG: 0x%016X\n", ((io_reg_cell_t*)devInfo->deviceMem->cells[2])[0x3F]); - -// // char* buf = foo(); -// // if (buf == NULL) { -// // return 1; -// // } -// // for (uint8_t i = 0; i < 32; i++) { -// // printf("%c", buf[i]); -// // } - -// // printf("\n"); - - -// // dlclose(handle); - -// return 0; -// } diff --git a/src/panic.c b/src/panic.c new file mode 100644 index 0000000..90af8d8 --- /dev/null +++ b/src/panic.c @@ -0,0 +1,21 @@ +#include "panic.h" + +void print_stacktrace(FILE* fd) +{ + void *buffer[PANIC_STRACE_LEN]; + int size = backtrace(buffer, PANIC_STRACE_LEN); + char **symbols = backtrace_symbols(buffer, size); + + if (symbols == NULL) { + fprintf(fd, "backtrace_symbols"); + abort(); + } + + fprintf(fd, "\n\nStack trace:\n"); + for (int i = 0; i < size; i++) + { + fprintf(fd, "%s\n", symbols[i]); + } + + free(symbols); +} \ No newline at end of file diff --git a/src/proto/dial.c b/src/proto/dial.c new file mode 100644 index 0000000..846cb04 --- /dev/null +++ b/src/proto/dial.c @@ -0,0 +1,46 @@ +#include "proto/dial.h" + +#include + +#include "context.h" +#include "panic.h" +#include "proto/msg.h" + + +void broadcastClients(EmulContext* emulContext, uint8_t* msg, size_t msgLen) +{ + LinkedListEntry* clientsHead = *emulContext->clientsHead; + while(clientsHead != NULL) + { + uint8_t* newMsg = malloc(sizeof(uint8_t) * msgLen); + NULL_GUARD(newMsg); + memcpy(newMsg, msg, msgLen); + + ClientContext* ctx = clientsHead->payload; + dispatchOutgoingMessage(emulContext->outBufs, ctx, newMsg, msgLen); + clientsHead = clientsHead->nextEntry; + } + free(msg); +} + + + + + +void dispatchOutgoingMessage(OutgoingBuffers* outBufs, ClientContext* client, uint8_t* msg, size_t msgLen) +{ + SizedPtr* p = &outBufs->bufs[outBufs->currWritingIdx]; + if(p->size + 1 >= p->allocatedSize) + { + printf("\t>>Reallocating buf %d\n", outBufs->currWritingIdx); + OutgoingMessage* newPtr = realloc(p->ptr, sizeof(OutgoingMessage) * p->allocatedSize * 2); + NULL_GUARD(newPtr); + p->ptr = newPtr; + p->allocatedSize = p->allocatedSize * 2; + } + OutgoingMessage* outmsg = &((OutgoingMessage*)p->ptr)[p->size]; + outmsg->msg = msg; + outmsg->msgLen = msgLen; + outmsg->client = client; + p->size++; +} \ No newline at end of file diff --git a/src/proto/handlers.c b/src/proto/handlers.c new file mode 100644 index 0000000..01304e9 --- /dev/null +++ b/src/proto/handlers.c @@ -0,0 +1,199 @@ +#include "proto/handlers.h" + +#include +#include "proto/enums.h" +#include "panic.h" + +#include "proto/handlers/auth.h" +#include "proto/handlers/stream.h" +#include "proto/handlers/control.h" +#include "proto/handlers/mem.h" +#include "state.h" + + +void handleCloseClient(EmulContext* emulContext, LinkedListEntry* clientEntry) +{ + ClientContext* ctx = clientEntry->payload; + + if(ctx->orphanedAt == 0) + { + ctx->orphanedAt = (uint64_t)time(NULL); + } + + // if(ctx->streamRegIterator > 0) + // { + // unregisterClientStreams(emulContext, ctx); + // } + + // if(ctx->streamRegIterator == 0) + // { + // removeLinkedListEntry(emulContext->clientsHead, clientEntry); + // ctx->clientId = 0; + // free(ctx->fallbackOutcomeQ->ptr); + // free(ctx->fallbackOutcomeQ); + // ptQueueFree(ctx->incomeQ); + // free(ctx); + // } + + +} + + +void handleRegEvent(EmulContext* emulContext, ClientRegistrationEvent* ev) +{ + if(ev->regType == REG_EVTYPE_CONNECT) + { + printf("open client %lu\n", ev->ctx->seatId); + LinkedListEntry* newClientsLinkedListHead = malloc(sizeof(LinkedListEntry)); + NULL_GUARD(newClientsLinkedListHead); + + newClientsLinkedListHead->payload = ev->ctx; + if(*emulContext->clientsHead != NULL) + { + (*emulContext->clientsHead)->prevEntry = newClientsLinkedListHead; + } + newClientsLinkedListHead->prevEntry = NULL; + newClientsLinkedListHead->nextEntry = *emulContext->clientsHead; + *emulContext->clientsHead = newClientsLinkedListHead; + } + else if (ev->regType == REG_EVTYPE_CLOSE) + { + LinkedListEntry* clientEntry = *emulContext->clientsHead; + + while(clientEntry != NULL) + { + if(clientEntry->payload == ev->ctx) + { + printf("close client %lu\n", ev->ctx->seatId); + handleCloseClient(emulContext, clientEntry); + break; + } + clientEntry = clientEntry->nextEntry; + } + } + else if (ev->regType == REG_EVTYPE_AUTH) + { + printf("auth client %lu\n", ev->ctx->seatId); + ev->ctx->isAuthed = 1; + handleOnClientAuthDone(ev->ctx, emulContext); + } +} + + + + +void handleIncomingMessage(BaseMessage* msg, ClientContext* ctx, EmulContext* emulContext) +{ + switch (msg->packetType) + { + case PACKET_TYPE_CTRL: + { + handleIncomingControlMessage(msg, ctx, emulContext); + break; + } + case PACKET_TYPE_STREAM: + { + handleIncomingStreamMessage(msg, ctx, emulContext); + break; + } + case PACKET_TYPE_MEM: + { + handleIncomingMemMessage(msg, ctx, emulContext); + break; + } + default: + { + printf("Unsupported packet type: %u\n", msg->packetType); + break; + } + } +} + + +void handleAllClients(EmulContext* emulContext) +{ + LinkedListEntry* clientEntry = *emulContext->clientsHead; + + size_t handleLimit = 128; + uint8_t hasAliveClients = 0; + + while(clientEntry != NULL && handleLimit > 0) + { + LinkedListEntry* currEntry = clientEntry; + handleLimit--; + ClientContext* ctx = clientEntry->payload; + + + + if(ctx->orphanedDeadTimeout != 0 && ctx->orphanedAt != 0 && ctx->isAuthed) + { + // printf("Found orphaned client\n"); + if((uint64_t)time(NULL) - ctx->orphanedAt > ctx->orphanedDeadTimeout || ctx->streamRegIterator == 0) + { + printf("Orphaned client dead timeout exceeded\n"); + if(ctx->streamRegIterator > 0) + { + unregisterClientStreams(emulContext, ctx); + } + clientEntry = clientEntry->nextEntry; + removeLinkedListEntry(emulContext->clientsHead, currEntry); + dispatchOutgoingMessage(emulContext->outBufs, ctx, NULL, 0); + + continue; + } + } + + if(!ctx->isAuthed) + { + clientEntry = disconnectDueTimeout(emulContext, clientEntry); + if(clientEntry == NULL) + { + break; + } + if(*emulContext->utilizedFlag) + { + continue; + } + } + else + { + if(ctx->orphanedAt == 0) + { + hasAliveClients = 1; + } + void* payload = ctx->incomeQ->head->payload; + if(payload != NULL) + { + *emulContext->utilizedFlag = 1; + BaseMessage* msg = payload; + printf("client %lu sent data: \nnonce %lu, ptype %4u, ph: %u\n", ctx->clientId, msg->nonce, msg->packetType, msg->payloadHeader); + + handleIncomingMessage(msg, ctx, emulContext); + + + free(payload); + ctx->incomeQ->head = ctx->incomeQ->head->nextEl; + } + clientEntry = clientEntry->nextEntry; + } + } + + if(hasAliveClients == 0) + { + uint8_t newEmulState = switchNewEmulState(*emulContext->emulState, EMUL_STATE_OP_PAUSE); + if(newEmulState != *emulContext->emulState) + { + printf("No alive clients, pausing execution\n"); + *emulContext->emulState = newEmulState; + + size_t len = 0; + uint8_t* notify = createControlNotifyMessage((uint64_t)~0, *emulContext->clockCounter, newEmulState, &len); + broadcastClients(emulContext, notify, len); + } + } +} + + + + + diff --git a/src/proto/handlers/auth.c b/src/proto/handlers/auth.c new file mode 100644 index 0000000..16c709a --- /dev/null +++ b/src/proto/handlers/auth.c @@ -0,0 +1,143 @@ +#include "proto/handlers/auth.h" +#include +#include +#include +#include + +#include "my_mutex.h" + +#include "proto/enums.h" +#include "events.h" +#include "panic.h" + + +#include "proto/pack.h" +#include "proto/dial.h" +#include "proto/handlers/control.h" + +uint8_t validateAccessTokenDeterministic(const uint8_t* data, const uint8_t* token, uint64_t timestamp) +{ + char buf[1024]; + sprintf(buf, "%s%lu", token, timestamp); + + uint8_t hash[SHA512_DIGEST_LENGTH]; + SHA512((uint8_t*)buf, strlen(buf), hash); + + uint8_t valid = 1; + + for(size_t i = 0; i < SHA512_DIGEST_LENGTH; i++) + { + if (data[i] != hash[i]) + { + valid = 0; + } + } + + return valid; +} + +uint8_t validateAccessToken(const uint8_t* data, const uint8_t* access_token) +{ + uint64_t t = (uint64_t)time(NULL) / 30; + + uint8_t valid1 = validateAccessTokenDeterministic(data, access_token, t); + uint8_t valid2 = validateAccessTokenDeterministic(data, access_token, t - 1); + + return valid1 || valid2; +} + + +uint8_t handle_auth(ClientContext* cctx, ws_cli_conn_t client, const uint8_t* msg, uint64_t msgSize, int msgType) +{ + ServerContext* ctx = ws_get_server_context(client); + if(msgSize != SHA512_DIGEST_LENGTH) + { + int ret = ws_close_client(client); + if(ret == -1) + { + printf("Unable to close client %lu\n", client); + } + return 0; + } + uint8_t isValid = validateAccessToken(msg, (const uint8_t*)ctx->accessToken); + if(!isValid) + { + printf("Auth token invalid\n"); + int ret = ws_close_client(client); + if(ret == -1) + { + printf("Unable to close client %lu\n", client); + } + return 0; + } + + char errbuf[1024]; + + printf("Auth token is valid\n"); + + ClientRegistrationEvent* ev = malloc(sizeof(ClientContext)); + if(ev == NULL) + { + panic("Unable to allocate register event"); + } + ev->regType = REG_EVTYPE_AUTH; + ev->ctx = cctx; + + ws_set_connection_context(client, cctx); + + + with_lock(&ctx->registerMutex) + { + printf("Writing auth event\n"); + int exitCode = ptQueuePush(ctx->regQueue, ev, errbuf); + if(exitCode) + { + panic("Unable to push to reg queue: %s\n", errbuf); + } + } + return 1; +} + + +LinkedListEntry* disconnectDueTimeout(EmulContext* emulContext, LinkedListEntry* clientEntry) +{ + ClientContext* ctx = clientEntry->payload; + + uint64_t now = (uint64_t)time(NULL); + if(now - ctx->connectedAt <= 30) + { + return clientEntry->nextEntry; + } + + printf("Timeout on connection %lu\n", ctx->clientId); + int ret = ws_close_client(ctx->clientId); + if(ret == -1) + { + printf("Unable to close client\n"); + } + LinkedListEntry* nextEntry = clientEntry->nextEntry; + + removeLinkedListEntry(emulContext->clientsHead, clientEntry); + clientEntry = nextEntry; + *emulContext->utilizedFlag = 1; + return clientEntry; +} + +void handleOnClientAuthDone(ClientContext* ctx, EmulContext* emulContext) +{ + uint8_t* framedata = malloc(sizeof(uint8_t) * 8); + NULL_GUARD(framedata); + + encodeUintToBytes(ctx->seatId, framedata); + + dispatchOutgoingMessage(emulContext->outBufs, ctx, framedata, 8); + + + size_t len = 0; + uint8_t* msg = createControlNotifyMessage((uint64_t)~0, *emulContext->clockCounter, *emulContext->emulState, &len); + dispatchOutgoingMessage(emulContext->outBufs, ctx, msg, len); + + + uint8_t* outMsg = createClientSetup((uint64_t)~0, ctx, &len); + dispatchOutgoingMessage(emulContext->outBufs, ctx, outMsg, len); +} \ No newline at end of file diff --git a/src/proto/handlers/control.c b/src/proto/handlers/control.c new file mode 100644 index 0000000..cf23515 --- /dev/null +++ b/src/proto/handlers/control.c @@ -0,0 +1,199 @@ +#include "proto/handlers/control.h" + +#include "proto/pack.h" +#include "proto/enums.h" +#include "panic.h" +#include "state.h" + +// #include + + +void handleIncomingExecControl(BaseMessage* msg, EmulContext* emulContext) +{ + printf("ctrl exec\n"); + uint8_t stateOp = ((const uint8_t*)(msg->payload))[0]; + printf("state operation: %u\n", stateOp); + uint8_t newEmulState = switchNewEmulState(*emulContext->emulState, stateOp); + if(newEmulState != *emulContext->emulState) + { + printf("Switch state %u -> %u\n", *emulContext->emulState, newEmulState); + *emulContext->emulState = newEmulState; + + size_t len = 0; + uint8_t* notify = createControlNotifyMessage(msg->nonce, *emulContext->clockCounter, newEmulState, &len); + broadcastClients(emulContext, notify, len); + } +} + + +void handleIncomingEditConnectionSettingsControl(BaseMessage* msg, ClientContext* ctx, EmulContext* emulContext) +{ + uint64_t fallbackQLen = decodeBytesToU64(msg->payload); + uint64_t orphanedDeadTimeout = decodeBytesToU64(msg->payload + 8); + + ctx->orphanedDeadTimeout = orphanedDeadTimeout; + uint8_t* newFallbackQPtr = realloc(ctx->fallbackOutcomeQ->ptr, sizeof(OutgoingMessage) * fallbackQLen); + NULL_GUARD(newFallbackQPtr); + ctx->fallbackOutcomeQ->ptr = newFallbackQPtr; + ctx->fallbackOutcomeQ->allocatedSize = fallbackQLen; + + size_t len = 0; + uint8_t* outMsg = createClientSetup(msg->nonce, ctx, &len); + dispatchOutgoingMessage(emulContext->outBufs, ctx, outMsg, len); +} + + + +size_t fillOrphanedClientReportItem(uint8_t* outmsg, ClientContext* ctx) +{ + printf("filling orphaned %lu with %lu packets\n", ctx->seatId, ctx->fallbackOutcomeQ->size - ctx->fallbackOutcomeQPadding); + encodeUintToBytes(ctx->seatId, outmsg); + encodeUintToBytes(ctx->orphanedAt, outmsg + 8); + encodeUintToBytes((uint64_t)(ctx->fallbackOutcomeQ->size - ctx->fallbackOutcomeQPadding), outmsg + 8 + 8); + return 8 * 3; +} + +void handleIncomingListOrphaned(BaseMessage* msg, ClientContext* ctx, EmulContext* emulContext) +{ + LinkedListEntry* clientEntry = *emulContext->clientsHead; + size_t orphanedCount = 0; + size_t orphanedPackets = 0; + + while(clientEntry != NULL) + { + ClientContext* orphanedCtx = clientEntry->payload; + if(orphanedCtx->orphanedAt != 0) + { + orphanedCount++; + orphanedPackets += orphanedCtx->fallbackOutcomeQ->size - ctx->fallbackOutcomeQPadding; + } + clientEntry = clientEntry->nextEntry; + } + + printf("found %lu orphaned clients with %lu packets\n", orphanedCount, orphanedPackets); + + const size_t orphanedListReportItemSize = 8 * 3; + const size_t msgLen = ((orphanedListReportItemSize * orphanedCount) + 9 + 8); + uint8_t* outMsg = malloc(sizeof(uint8_t) * msgLen); + NULL_GUARD(outMsg); + fillHead(msg->nonce, PACKET_TYPE_CTRL, CTRL_TYPE_LIST_ORPHANED, outMsg); + + + clientEntry = *emulContext->clientsHead; + size_t filledCount = 0; + uint8_t* outMsgCursor = outMsg + 9; + encodeUintToBytes((uint64_t)orphanedCount, outMsgCursor); + outMsgCursor += 8; + + while(clientEntry != NULL && filledCount < orphanedCount) + { + ClientContext* orphanedCtx = clientEntry->payload; + if(orphanedCtx->orphanedAt != 0) + { + size_t filled = fillOrphanedClientReportItem(outMsgCursor, orphanedCtx); + outMsgCursor += filled; + filledCount++; + } + clientEntry = clientEntry->nextEntry; + } + + dispatchOutgoingMessage(emulContext->outBufs, ctx, outMsg, msgLen); +} + +void handleIncomingLoadFailed(const BaseMessage* msg, ClientContext* ctx, const EmulContext* emulContext) +{ + const uint8_t readMode = ((const uint8_t*)msg->payload)[0]; + const uint64_t seatId = decodeBytesToU64(((const uint8_t*)msg->payload) + 1); + + + LinkedListEntry* clientEntry = *emulContext->clientsHead; + ClientContext* orphanedCtx = NULL; + + while(clientEntry != NULL) + { + orphanedCtx = clientEntry->payload; + if(orphanedCtx->orphanedAt != 0) + { + if(orphanedCtx->seatId == seatId) + { + break; + } + } + clientEntry = clientEntry->nextEntry; + } + + if(clientEntry == NULL || orphanedCtx == NULL) + { + return; + } + + size_t payloadSize = 0; + OutgoingMessage* failedQueue = (OutgoingMessage*)orphanedCtx->fallbackOutcomeQ->ptr; + for(size_t i = orphanedCtx->fallbackOutcomeQPadding; i < orphanedCtx->fallbackOutcomeQ->size; i++) + { + OutgoingMessage* failedMessage = &failedQueue[i]; + payloadSize += 8; + payloadSize += failedMessage->msgLen; + } + + const size_t outLen = 9 + payloadSize; + + printf("preparing failed messages report: %lu messages, %lu payload size, %lu message size\n", + orphanedCtx->fallbackOutcomeQ->size - orphanedCtx->fallbackOutcomeQPadding, + payloadSize, + outLen); + + uint8_t* outMsg = malloc(sizeof(uint8_t) * outLen); + fillHead(msg->nonce, PACKET_TYPE_CTRL, CTRL_TYPE_LOAD_FAILED, outMsg); + uint8_t* outMsgCursor = outMsg + 9; + + for(size_t i = orphanedCtx->fallbackOutcomeQPadding; i < orphanedCtx->fallbackOutcomeQ->size; i++) + { + OutgoingMessage* failedMessage = &failedQueue[i]; + encodeUintToBytes(failedMessage->msgLen, outMsgCursor); + outMsgCursor += 8; + for(size_t j = 0; j < failedMessage->msgLen; j++) + { + outMsgCursor[j] = failedMessage->msg[j]; + } + outMsgCursor += failedMessage->msgLen; + } + + + dispatchOutgoingMessage(emulContext->outBufs, ctx, outMsg, outLen); + if(readMode == 1) + { + dispatchOutgoingMessage(emulContext->outBufs, ctx, NULL, 0); + } +} + + +void handleIncomingControlMessage(BaseMessage* msg, ClientContext* ctx, EmulContext* emulContext) +{ + switch (msg->payloadHeader) + { + case CTRL_TYPE_EXEC: + { + handleIncomingExecControl(msg, emulContext); + break; + } + case CTRL_TYPE_LIST_ORPHANED: + { + handleIncomingListOrphaned(msg, ctx, emulContext); + break; + } + case CTRL_TYPE_LOAD_FAILED: + { + handleIncomingLoadFailed(msg, ctx, emulContext); + break; + } + case CTRL_TYPE_SETUP_CONNECTION: + { + handleIncomingEditConnectionSettingsControl(msg, ctx, emulContext); + break; + } + + default: + break; + } +} diff --git a/src/proto/handlers/mem.c b/src/proto/handlers/mem.c new file mode 100644 index 0000000..f689535 --- /dev/null +++ b/src/proto/handlers/mem.c @@ -0,0 +1,84 @@ +#include "proto/handlers/mem.h" +#include "proto/enums.h" +#include "proto/pack.h" +#include "panic.h" +#include + + +uint8_t* createMemReadResponseMessage(uint64_t nonce, uint64_t clockCounter, uint8_t* payload, size_t payloadLen, size_t* lenOut) +{ + *lenOut = 9 + 8 + payloadLen; + uint8_t* outmsg = malloc(sizeof(uint8_t) * (*lenOut)); + NULL_GUARD(outmsg); + + encodeUintToBytes(nonce, outmsg); + + outmsg[8] = PACKET_TYPE_MEM << 4; + outmsg[8] |= MEM_TYPE_READ_RESP; + + encodeUintToBytes(clockCounter, outmsg + 9); + + memcpy(outmsg + 9 + 8, payload, payloadLen); + + return outmsg; +} + +void handleIncomingMemReadReq(BaseMessage* msg, ClientContext* ctx, EmulContext* emulContext) +{ + uint64_t devId = decodeBytesToU64(msg->payload); + uint64_t segId = decodeBytesToU64(msg->payload + 8); + uint64_t startAddr = decodeBytesToU64(msg->payload + 16); + uint64_t readLen = decodeBytesToU64(msg->payload + 24); + + if(devId >= emulContext->devicesCount) + { + return; + } + + + //TODO: lookup config for global addr conversion + uint64_t globalAddr = startAddr + segId * 128; + + uint8_t* readPtr = emulContext->devicesMem[devId] + globalAddr; + + size_t outLen = 0; + uint8_t* outMsg = createMemReadResponseMessage(msg->nonce, *emulContext->clockCounter, readPtr, readLen, &outLen); + dispatchOutgoingMessage(emulContext->outBufs, ctx, outMsg, outLen); +} + + +void handleIncomingMemWrite(BaseMessage* msg, ClientContext* ctx, EmulContext* emulContext) +{ + uint64_t devId = decodeBytesToU64(msg->payload); + uint64_t segId = decodeBytesToU64(msg->payload + 8); + uint64_t startAddr = decodeBytesToU64(msg->payload + 16); + + if(devId >= emulContext->devicesCount) + { + return; + } + + //TODO: lookup config for global addr conversion + uint64_t globalAddr = startAddr + segId * 128; + + uint8_t* writePtr = emulContext->devicesMem[devId] + globalAddr; + + memcpy(writePtr, msg->payload + 24, msg->payloadLen - 24); + + size_t outLen = 0; + uint8_t* outMsg = createMemReadResponseMessage(msg->nonce, *emulContext->clockCounter, writePtr, msg->payloadLen - 24, &outLen); + dispatchOutgoingMessage(emulContext->outBufs, ctx, outMsg, outLen); +} + + +void handleIncomingMemMessage(BaseMessage* msg, ClientContext* ctx, EmulContext* emulContext) +{ + if(msg->payloadHeader == MEM_TYPE_READ_REQ) + { + handleIncomingMemReadReq(msg, ctx, emulContext); + } + else if(msg->payloadHeader == MEM_TYPE_WRITE_PUSH) + { + handleIncomingMemWrite(msg, ctx, emulContext); + } +} \ No newline at end of file diff --git a/src/proto/handlers/stream.c b/src/proto/handlers/stream.c new file mode 100644 index 0000000..38c19c4 --- /dev/null +++ b/src/proto/handlers/stream.c @@ -0,0 +1,124 @@ +#include "proto/handlers/stream.h" + +#include "streamed.h" +#include "proto/pack.h" +#include "proto/enums.h" +#include "panic.h" + + + +void unregisterClientStream(EmulContext* emulContext, ClientContext* ctx, uint32_t regId) +{ + for(size_t deviceId = 0; deviceId < emulContext->devicesCount; deviceId++) + { + DeviceSegStreamReg* deviceRegs = emulContext->deviceStreamRegs[deviceId]; + for(size_t i = 0; i < deviceRegs->regCount; i++) + { + StreamReg* iReg = &deviceRegs->regs[i]; + if(iReg->clientContext->clientId == ctx->clientId && iReg->regId == regId) + { + printf("Discard stream %u register for client %lu\n", regId, ctx->clientId); + deviceRegs->regCount -= 1; + for(size_t j = i; j < deviceRegs->regCount; j++) + { + deviceRegs->regs[j] = deviceRegs->regs[j + 1]; + } + break; + } + } + } +} + +void unregisterClientStreams(EmulContext* emulContext, ClientContext* ctx) +{ + for(size_t deviceId = 0; deviceId < emulContext->devicesCount; deviceId++) + { + DeviceSegStreamReg* deviceRegs = emulContext->deviceStreamRegs[deviceId]; + + StreamReg* newStreamRegs = malloc(sizeof(StreamReg) * deviceRegs->regCount); + NULL_GUARD(newStreamRegs); + size_t newStreamRegsId = 0; + + for(size_t i = 0; i < deviceRegs->regCount; i++) + { + StreamReg* reg = &deviceRegs->regs[i]; + if(reg->clientContext->clientId != ctx->clientId) + { + newStreamRegs[newStreamRegsId] = *reg; + newStreamRegsId++; + } + else + { + printf("Removing reg %d mode stream for %lu/%lu+%lu: [%u]\n", reg->mode, deviceId, reg->startAddr, reg->segLen, reg->regId); + } + } + free(deviceRegs->regs); + deviceRegs->regCount = newStreamRegsId; + deviceRegs->regs = newStreamRegs; + } +} + + +void handleStreamRegMessage(BaseMessage* msg, ClientContext* ctx, EmulContext* emulContext, uint8_t X) +{ + uint64_t devId = decodeBytesToU64(msg->payload); + uint64_t segId = decodeBytesToU64(msg->payload + 8); + uint64_t startAddr = decodeBytesToU64(msg->payload + 16); + uint64_t segLen = decodeBytesToU64(msg->payload + 24); + + if(devId >= emulContext->devicesCount) + { + return; + } + + DeviceSegStreamReg* deviceRegs = emulContext->deviceStreamRegs[devId]; + if(deviceRegs->regCount + 1 >= deviceRegs->allocatedSize) + { + size_t newAllocatedSize = deviceRegs->allocatedSize * 2; + if(newAllocatedSize <= deviceRegs->regCount + 1) + { + newAllocatedSize = deviceRegs->regCount + 1; + } + StreamReg* newRegs = realloc(deviceRegs->regs, sizeof(StreamReg) * newAllocatedSize); + NULL_GUARD(newRegs); + deviceRegs->allocatedSize = newAllocatedSize; + deviceRegs->regs = newRegs; + } + + //TODO: lookup config for global segment addr conversion + uint64_t globalStartAddr = startAddr + segId * 128; + + StreamReg* reg = &deviceRegs->regs[deviceRegs->regCount]; + + reg->clientContext = ctx; + reg->segLen = segLen; + reg->startAddr = globalStartAddr; + reg->regId = ctx->streamRegIterator; + reg->mode = X; + + ctx->streamRegIterator++; + deviceRegs->regCount++; + + printf("Done registering %d mode stream for %lu/%lu+%lu: [%u]\n", X, devId, globalStartAddr, segLen, reg->regId); + + size_t len = 0; + uint8_t* notifMsg = createDoneRegMessage(msg->nonce, X, devId, segId, startAddr, segLen, reg->regId, &len); + dispatchOutgoingMessage(emulContext->outBufs, ctx, notifMsg, len); + +} + +void handleIncomingStreamMessage(BaseMessage* msg, ClientContext* ctx, EmulContext* emulContext) +{ + uint8_t X = msg->payloadHeader >> 3; + uint8_t streamType = msg->payloadHeader & 0b111; + + if(streamType == STREAM_TYPE_REG_REQUEST) + { + handleStreamRegMessage(msg, ctx, emulContext, X); + } + else if(streamType == STREAM_TYPE_REG_DISCARD) + { + uint32_t regId = decodeBytesToU32(msg->payload); + unregisterClientStream(emulContext, ctx, regId); + } +} \ No newline at end of file diff --git a/src/proto/handlers/ws.c b/src/proto/handlers/ws.c new file mode 100644 index 0000000..e0e31c4 --- /dev/null +++ b/src/proto/handlers/ws.c @@ -0,0 +1,156 @@ +#include "proto/handlers/ws.h" +#include "proto/handlers.h" + +#include "panic.h" +#include "proto/enums.h" +#include "my_mutex.h" +#include "proto/handlers/auth.h" +#include "proto/msg.h" + +void onWsOpen(ws_cli_conn_t client) +{ + char errbuf[1024]; + + ptQueue* incomeQueue = ptQueueCreate(errbuf); + NULL_GUARD(incomeQueue, "Unable to create income queue: %s\n", errbuf); + + // ptQueue* outcomeQueue = ptQueueCreate(errbuf); + + // NULL_GUARD(outcomeQueue, "Unable to create outcome queue: %s\n", errbuf); + + ClientContext* cctx = malloc(sizeof(ClientContext)); + if(cctx == NULL) + { + ptQueueFree(incomeQueue); + // ptQueueFree(outcomeQueue); + panic("Unable to allocate client context\n"); + } + cctx->clientId = client; + cctx->isAuthed = 0; + cctx->streamRegIterator = 0; + cctx->incomeQ = incomeQueue; + // cctx->outcomeQ = outcomeQueue; + cctx->connectedAt = (uint64_t)time(NULL); + cctx->orphanedAt = 0; + cctx->orphanedDeadTimeout = 60; + cctx->fallbackOutcomeQPadding = 0; + cctx->fallbackOutcomeQ = malloc(sizeof(SizedPtr)); + NULL_GUARD(cctx->fallbackOutcomeQ); + cctx->fallbackOutcomeQ->ptr = (void*) calloc(1024, sizeof(OutgoingMessage)); + NULL_GUARD(cctx->fallbackOutcomeQ->ptr); + cctx->fallbackOutcomeQ->allocatedSize = 1024; + cctx->fallbackOutcomeQ->size = 0; + + ClientRegistrationEvent* ev = malloc(sizeof(ClientContext)); + if(ev == NULL) + { + ptQueueFree(incomeQueue); + // ptQueueFree(outcomeQueue); + free(cctx->fallbackOutcomeQ->ptr); + free(cctx->fallbackOutcomeQ); + free(cctx); + + panic("Unable to allocate register event"); + } + ev->regType = REG_EVTYPE_CONNECT; + ev->ctx = cctx; + + ws_set_connection_context(client, cctx); + + ServerContext* ctx = ws_get_server_context(client); + + with_lock(&ctx->registerMutex) + { + cctx->seatId = ctx->seatCounter; + ctx->seatCounter++; + + int exitCode = ptQueuePush(ctx->regQueue, ev, errbuf); + if(exitCode) + { + ptQueueFree(incomeQueue); + // ptQueueFree(outcomeQueue); + free(cctx->fallbackOutcomeQ->ptr); + free(cctx->fallbackOutcomeQ); + free(cctx); + + panic("Unable to push to reg queue: %s\n", errbuf); + } + } + + char *cli; + cli = ws_getaddress(client); + printf("Connection %lu opened, addr: %s\n", client, cli); +} + + +void onWsClose(ws_cli_conn_t client) +{ + pthread_t tid = pthread_self(); + char *cli; + cli = ws_getaddress(client); + printf("Connection %lu closed, addr: %s, thread id: %lu\n", client, cli, tid); + + + char errbuf[1024]; + + ClientContext* cctx = ws_get_connection_context(client); + if(cctx == NULL) + { + cctx = NULL; + printf("Unable to get client context for client %lu\n", client); + } + + + ServerContext* ctx = ws_get_server_context(client); + if(ctx == NULL) + { + printf("Unable to get server context for client %lu\n", client); + return; + } + + ClientRegistrationEvent* ev = malloc(sizeof(ClientContext)); + NULL_GUARD(ev); + ev->regType = REG_EVTYPE_CLOSE; + ev->ctx = cctx; + + + with_lock(&ctx->registerMutex) + { + int exitCode = ptQueuePush(ctx->regQueue, ev, errbuf); + if(exitCode) + { + panic("Unable to push to reg queue: %s\n", errbuf); + } + } +} + + + + +void onWsMessage(ws_cli_conn_t client, const unsigned char *msg, uint64_t size, int type) +{ + ClientContext* cctx = ws_get_connection_context(client); + if(cctx == NULL) + { + panic("Unable to get client context\n"); + } + + if (cctx->isAuthed == 0) + { + handle_auth(cctx, client, msg, size, type); + } + else + { + char errbuf[1024]; + BaseMessage* baseMsg = parseMessage(msg, size); + printf("msg\n"); + int isErr = ptQueuePush(cctx->incomeQ, baseMsg, errbuf); + if(isErr) + { + panic("Unable to dispatch client message: %s\n", errbuf); + } + } +} + + + diff --git a/src/proto/msg.c b/src/proto/msg.c new file mode 100644 index 0000000..448c5ac --- /dev/null +++ b/src/proto/msg.c @@ -0,0 +1,107 @@ +#include "proto/msg.h" +#include "panic.h" + +#include "proto/enums.h" +#include "proto/pack.h" + +#include + +BaseMessage* parseMessage(const uint8_t* bytes, size_t size) +{ + const uint8_t headerSize = 9; + BaseMessage* msg = malloc(sizeof(BaseMessage)); + NULL_GUARD(msg); + uint64_t nonce = decodeBytesToU64(bytes); + + + msg->nonce = nonce; + + msg->packetType = bytes[8] >> 4; + msg->payloadHeader = bytes[8] & 0b1111; + + msg->payloadLen = size - headerSize; + + uint8_t* payload = malloc(sizeof(uint8_t) * (msg->payloadLen)); + NULL_GUARD(payload); + + memcpy(payload, bytes + headerSize, msg->payloadLen); + msg->payload = payload; + + return msg; +} + + +void fillHead(uint64_t nonce, uint8_t packetType, uint8_t payloadHeader, uint8_t* outmsg) +{ + encodeUintToBytes(nonce, outmsg); + outmsg[8] = (uint8_t)((packetType & 0b1111) << 4); + outmsg[8] |= payloadHeader & 0b1111; +} + +uint8_t* createControlNotifyMessage(uint64_t nonce, uint64_t clockCounter, uint8_t newEmulState, size_t* lenOut) +{ + *lenOut = 9 + 10; + uint8_t* outmsg = malloc(sizeof(uint8_t) * (*lenOut)); + NULL_GUARD(outmsg, "Unable to allocate message"); + + fillHead(nonce, PACKET_TYPE_CTRL, CTRL_TYPE_NOTIF_STATE, outmsg); + + outmsg[9] = NOTIF_TYPE_EXEC; + encodeUintToBytes(clockCounter, outmsg + 10); + outmsg[18] = newEmulState; + return outmsg; +} + + +uint8_t* createDoneRegMessage(uint64_t nonce, uint8_t X, uint64_t devId, uint64_t segId, uint64_t startAddr, uint64_t segLength, uint32_t regId, size_t* lenOut) +{ + *lenOut = 36 + 9; + uint8_t* outmsg = malloc(sizeof(uint8_t) * (*lenOut)); + NULL_GUARD(outmsg); + + + fillHead(nonce, PACKET_TYPE_STREAM, (uint8_t)(X << 3) | STREAM_TYPE_REG_CONFIRM, outmsg); + + encodeUintToBytes(devId, outmsg + 9); + encodeUintToBytes(segId, outmsg + 9 + 8); + encodeUintToBytes(startAddr, outmsg + 9 + 8 + 8); + encodeUintToBytes(segLength, outmsg + 9 + 8 + 8 + 8); + encodeUintToBytes(regId, outmsg + 9 + 8 + 8 + 8 + 8); + return outmsg; +} + + +uint8_t* createStreamSegmentPush(uint8_t mode, uint32_t regId, uint64_t clockCounter, uint8_t* payload, size_t payloadLen, size_t* lenOut) +{ + *lenOut = 9 + 4 + 8 + payloadLen; + + uint8_t* outmsg = malloc(sizeof(uint8_t) * (*lenOut)); + NULL_GUARD(outmsg); + + uint64_t nonce = (uint64_t)~0; + encodeUintToBytes(nonce, outmsg); + outmsg[8] = (uint8_t)((PACKET_TYPE_STREAM << 4) | (mode << 3) | STREAM_TYPE_SEND); + + fillHead(nonce, PACKET_TYPE_STREAM, (uint8_t)((mode << 3) | STREAM_TYPE_SEND), outmsg); + + encodeUintToBytes(regId, outmsg + 9); + encodeUintToBytes(clockCounter, outmsg + 9 + 4); + memcpy(outmsg + 9 + 4 + 8, payload, payloadLen); + + return outmsg; +} + + +uint8_t* createClientSetup(uint64_t nonce, ClientContext* ctx, size_t* lenOut) +{ + *lenOut = 9 + 8 + 8; + uint8_t* outmsg = malloc(sizeof(uint8_t) * (*lenOut)); + NULL_GUARD(outmsg); + + fillHead(nonce, PACKET_TYPE_CTRL, CTRL_TYPE_SETUP_CONNECTION, outmsg); + uint8_t* payload = outmsg + 9; + encodeUintToBytes((uint64_t)ctx->fallbackOutcomeQ->allocatedSize, payload); + encodeUintToBytes((uint64_t)ctx->orphanedDeadTimeout, payload + 8); + + return outmsg; +} \ No newline at end of file diff --git a/src/proto/pack.c b/src/proto/pack.c new file mode 100644 index 0000000..7b98f6f --- /dev/null +++ b/src/proto/pack.c @@ -0,0 +1,57 @@ +#include "proto/pack.h" + +uint64_t decodeBytesToU64(const uint8_t* bytes) +{ + uint64_t ret = 0; + for(uint8_t i = 0; i < 8; i++) + { + ret |= (uint64_t)((uint64_t)(bytes[i]) << ((7 - i) * 8)); + } + return ret; +} + +uint32_t decodeBytesToU32(const uint8_t* bytes) +{ + uint32_t ret = 0; + for(uint8_t i = 0; i < 4; i++) + { + ret |= (uint32_t)((uint32_t)(bytes[i]) << ((3 - i) * 4)); + } + return ret; +} + +uint16_t decodeBytesToU16(const uint8_t* bytes) +{ + return ((uint16_t)(bytes[0] << 8)) | ((uint16_t)bytes[1]); +} + +void encodeUint8ToBytes(uint8_t num, uint8_t* tgt) +{ + tgt[0] = num; +} + + +void encodeUint16ToBytes(uint16_t num, uint8_t* tgt) +{ + for(uint8_t i = 0; i < 2; i++) + { + tgt[i] = (uint8_t)(((uint32_t)num) >> (2 * (1 - i))); + } +} + +void encodeUint32ToBytes(uint32_t num, uint8_t* tgt) +{ + for(uint8_t i = 0; i < 4; i++) + { + tgt[i] = (uint8_t)(((uint32_t)num) >> (4 * (3 - i))); + } +} + +void encodeUint64ToBytes(uint64_t num, uint8_t* tgt) +{ + for(uint8_t i = 0; i < 8; i++) + { + tgt[i] = (uint8_t)(((uint64_t)num) >> (8 * (7 - i))); + } +} + diff --git a/src/state.c b/src/state.c new file mode 100644 index 0000000..9dcc578 --- /dev/null +++ b/src/state.c @@ -0,0 +1,57 @@ +#include + +#include "state.h" + + +uint8_t switchNewEmulState(const uint8_t currentState, const uint8_t controlOp) +{ + + switch (currentState) + { + case EMUL_STATE_STILL: + { + if(controlOp == EMUL_STATE_OP_START) + { + return EMUL_STATE_EXEC; + } + break; + } + case EMUL_STATE_EXEC: + { + if(controlOp == EMUL_STATE_OP_PAUSE) + { + return EMUL_STATE_PAUSE; + } + else if(controlOp == EMUL_STATE_OP_STOP) + { + return EMUL_STATE_STOP; + } + break; + } + case EMUL_STATE_PAUSE: + { + if(controlOp == EMUL_STATE_OP_RESUME) + { + return EMUL_STATE_EXEC; + } + else if (controlOp == EMUL_STATE_OP_RESET) + { + return EMUL_STATE_STILL; + } + break; + } + case EMUL_STATE_STOP: + { + if(controlOp == EMUL_STATE_OP_RESET) + { + return EMUL_STATE_STILL; + } + break; + } + + default: + break; + } + + return currentState; +}