From aaa858b903121eda071b641b052f8a06c63223d7 Mon Sep 17 00:00:00 2001 From: nikto_b Date: Wed, 11 Feb 2026 12:18:59 +0300 Subject: [PATCH] ws server +proto --- Makefile | 36 ++- inc/client.h | 18 ++ inc/context.h | 45 +++ inc/events.h | 13 + inc/executor/executor.h | 0 inc/linkedlist.h | 13 + inc/panic.h | 42 +++ inc/proto/dial.h | 11 + inc/proto/enums.h | 34 +++ inc/proto/handlers.h | 11 + inc/proto/handlers/auth.h | 11 + inc/proto/handlers/control.h | 10 + inc/proto/handlers/mem.h | 10 + inc/proto/handlers/stream.h | 12 + inc/proto/handlers/ws.h | 10 + inc/proto/msg.h | 26 ++ inc/proto/pack.h | 30 ++ inc/sized_ptr.h | 13 + inc/state.h | 20 ++ inc/streamed.h | 27 ++ src/base_device.c | 2 +- src/executor/executor.c | 0 src/linkedlist.c | 45 +++ src/main.c | 528 +++++++++++++++++++++++------------ src/main.h | 1 + src/panic.c | 21 ++ src/proto/dial.c | 46 +++ src/proto/handlers.c | 142 ++++++++++ src/proto/handlers/auth.c | 140 ++++++++++ src/proto/handlers/control.c | 33 +++ src/proto/handlers/mem.c | 80 ++++++ src/proto/handlers/stream.c | 124 ++++++++ src/proto/handlers/ws.c | 140 ++++++++++ src/proto/msg.c | 85 ++++++ src/proto/pack.c | 57 ++++ src/state.c | 57 ++++ 36 files changed, 1702 insertions(+), 191 deletions(-) create mode 100644 inc/client.h create mode 100644 inc/context.h create mode 100644 inc/events.h create mode 100644 inc/executor/executor.h create mode 100644 inc/linkedlist.h create mode 100644 inc/panic.h create mode 100644 inc/proto/dial.h create mode 100644 inc/proto/enums.h create mode 100644 inc/proto/handlers.h create mode 100644 inc/proto/handlers/auth.h create mode 100644 inc/proto/handlers/control.h create mode 100644 inc/proto/handlers/mem.h create mode 100644 inc/proto/handlers/stream.h create mode 100644 inc/proto/handlers/ws.h create mode 100644 inc/proto/msg.h create mode 100644 inc/proto/pack.h create mode 100644 inc/sized_ptr.h create mode 100644 inc/state.h create mode 100644 inc/streamed.h create mode 100644 src/executor/executor.c create mode 100644 src/linkedlist.c create mode 100644 src/main.h create mode 100644 src/panic.c create mode 100644 src/proto/dial.c create mode 100644 src/proto/handlers.c create mode 100644 src/proto/handlers/auth.c create mode 100644 src/proto/handlers/control.c create mode 100644 src/proto/handlers/mem.c create mode 100644 src/proto/handlers/stream.c create mode 100644 src/proto/handlers/ws.c create mode 100644 src/proto/msg.c create mode 100644 src/proto/pack.c create mode 100644 src/state.c diff --git a/Makefile b/Makefile index f568845..e1291c5 100644 --- a/Makefile +++ b/Makefile @@ -1,49 +1,54 @@ +OPENSSL_INCLUDE ?= /usr/include/openssl/ BUILD_DIR=out SRC_DIR=src INC_DIR=inc CC=gcc OBJDUMP=objdump LIBS=deps/tomlc99/libtoml.a deps/ptQueue/out/ptQueue.a deps/wsServer/libws.a -LIBS_HEADERS=deps/ +LIBS_HEADERS=deps/ $(OPENSSL_INCLUDE) +STATIC_LIBS=crypto STANDART=c23 -OPTIMIZE=-Og +OPTIMIZE=-O3 TARGET=main DEFINES=#-DOPCODE_WORDSIZE=2 -DMEM_CELL_WORDS=1 -DPC_WORDSIZE=2 -DGP_REG_CELL_WORDS=1 -DIO_REG_CELL_WORDS=1 -C_SOURCES=$(wildcard $(SRC_DIR)/*.c) -# C_SOURCES=$(wildcard $(SRC_DIR)/*.c) -C_HEADERS=$(wildcard $(INC_DIR)/*.h) +C_SOURCES := $(shell find $(SRC_DIR) -type f -name '*.c') C_INCLUDES=-I$(INC_DIR)/ $(addprefix -I,$(LIBS_HEADERS)) C_DEFS=-D_POSIX_C_SOURCE=199309L -DISABLE_FLAGS=-Wno-unused-variable -Wno-unused-parameter -Wno-write-strings -Wno-pointer-arith +DISABLE_FLAGS=-Wno-unused-variable -Wno-unused-parameter -Wno-write-strings -Wno-pointer-arith -Wno-analyzer-infinite-loop PEDANTIC_FLAGS=-pedantic -pedantic-errors $(DISABLE_FLAGS) -Wall -Wcast-align -Wcast-qual -Wconversion -Wduplicated-branches -Wduplicated-cond -Werror -Wextra -Wfloat-equal -Wlogical-op -Wpedantic -Wredundant-decls -Wsign-conversion ANALYZER_FLAGS=-fanalyzer -fdiagnostics-show-option -fdiagnostics-color=always LSECTIONS=-ffunction-sections -fdata-sections -Wl,--gc-sections -CFLAGS=$(C_DEFS) -g $(C_INCLUDES) $(DEFINES) $(OPTIMIZE) --std=$(STANDART) $(PEDANTIC_FLAGS) $(ANALYZER_FLAGS) -LFLAGS=$(OPTIMIZE) -g $(PEDANTIC_FLAGS) $(DEFINES) -flto -fuse-linker-plugin $(LSECTIONS) -lm +CFLAGS=$(C_DEFS) -g $(C_INCLUDES) $(DEFINES) $(OPTIMIZE) --std=$(STANDART) $(PEDANTIC_FLAGS) $(ANALYZER_FLAGS) -MMD -MP +STATICLIBS_FLAGS=$(addprefix -l,$(STATIC_LIBS)) +LFLAGS=$(OPTIMIZE) -g $(PEDANTIC_FLAGS) $(DEFINES) $(STATICLIBS_FLAGS) -flto -fuse-linker-plugin $(LSECTIONS) -lm -OBJECTS = $(addprefix $(BUILD_DIR)/,$(notdir $(C_SOURCES:.c=.o))) $(LIBS) +OBJECTS = $(patsubst $(SRC_DIR)/%.c, $(BUILD_DIR)/%.o, $(C_SOURCES)) + +DEP = $(filter %.d, $(OBJECTS:.o=.d)) +$(info $(DEP)) + +OBJECTS += $(LIBS) vpath %.c $(sort $(dir $(C_SOURCES))) -vpath %.h $(sort $(dir $(C_HEADERS))) - -# $(info SOURCES=$(C_SOURCES)) -# $(info HEADERS=$(C_HEADERS)) -# $(info OBJECTS=$(OBJECTS)) all: build -build: date deps Dir $(C_HEADERS) target compile_commands +build: date deps Dir target compile_commands rebuild: clean | build +$(info $(DEP)) + +-include $(DEP) + $(BUILD_DIR)/%.o: %.c Makefile | $(BUILD_DIR) @echo -e '\033[1;32mCC\t'$<'\t->\t'$@'\033[0m' @$(CC) -c $(CFLAGS) -Wa,-a,-ad,-alms=$(BUILD_DIR)/$(notdir $(<:.c=.lst)) $< -o $@ @@ -62,6 +67,7 @@ deps: BuildDir: @mkdir -p $(BUILD_DIR) + $(shell mkdir -p $(dir $(OBJECTS))) SrcDir: @mkdir -p $(SRC_DIR) diff --git a/inc/client.h b/inc/client.h new file mode 100644 index 0000000..882533e --- /dev/null +++ b/inc/client.h @@ -0,0 +1,18 @@ +#ifndef __CLIENT_H__ +#define __CLIENT_H__ + +#include + +#include "wsServer/include/ws.h" +#include "ptQueue/inc/ptQueue.h" + +typedef struct { + ws_cli_conn_t clientId; + uint8_t isAuthed; + ptQueue* incomeQ; + ptQueue* outcomeQ; + uint64_t connectedAt; + uint32_t streamRegIterator; +} ClientContext; + +#endif //ifndef __CLIENT_H__ \ No newline at end of file diff --git a/inc/context.h b/inc/context.h new file mode 100644 index 0000000..4c4190f --- /dev/null +++ b/inc/context.h @@ -0,0 +1,45 @@ +#ifndef __CONTEXT_H__ +#define __CONTEXT_H__ + +#include + +#include "ptQueue/inc/ptQueue.h" +#include "wsServer/include/ws.h" + +#include "linkedlist.h" +#include "sized_ptr.h" + +#include "streamed.h" + +typedef struct { + SizedPtr* bufs; + uint8_t buffersCount; + _Atomic (uint8_t) readRequestIdx; + _Atomic (uint8_t) currWritingIdx; +} OutgoingBuffers; + + + + +typedef struct { + uint8_t* emulState; + uint64_t* clockCounter; + LinkedListEntry** clientsHead; + uint8_t* utilizedFlag; + OutgoingBuffers* outBufs; + DeviceSegStreamReg** deviceStreamRegs; + uint8_t** devicesMem; + size_t devicesCount; +} EmulContext; + + +typedef struct { + pthread_mutex_t registerMutex; + ptQueue* regQueue; + uint8_t* accessToken; + EmulContext* emulContext; +} ServerContext; + + + +#endif //ifndef __CONTEXT_H__ \ No newline at end of file diff --git a/inc/events.h b/inc/events.h new file mode 100644 index 0000000..390b47d --- /dev/null +++ b/inc/events.h @@ -0,0 +1,13 @@ +#ifndef __EVENTS_H__ +#define __EVENTS_H__ + +#include +#include "context.h" + +typedef struct { + uint8_t regType; + ClientContext* ctx; +} ClientRegistrationEvent; + + +#endif //ifndef __EVENTS_H__ \ No newline at end of file diff --git a/inc/executor/executor.h b/inc/executor/executor.h new file mode 100644 index 0000000..e69de29 diff --git a/inc/linkedlist.h b/inc/linkedlist.h new file mode 100644 index 0000000..b2797c2 --- /dev/null +++ b/inc/linkedlist.h @@ -0,0 +1,13 @@ +#ifndef __LINKEDLIST_H__ +#define __LINKEDLIST_H__ + + +typedef struct LinkedListEntry { + struct LinkedListEntry* prevEntry; + struct LinkedListEntry* nextEntry; + void* payload; +} LinkedListEntry; + +void removeLinkedListEntry(LinkedListEntry** head, LinkedListEntry* entry); + +#endif //ifndef __LINKEDLIST_H__ diff --git a/inc/panic.h b/inc/panic.h new file mode 100644 index 0000000..8425f70 --- /dev/null +++ b/inc/panic.h @@ -0,0 +1,42 @@ +#ifndef __PANIC_H__ +#define __PANIC_H__ + +#include +#include +#include + +#ifndef PANIC_FD +#define PANIC_FD stderr +#endif //ifndef PANIC_FD + +#ifndef PANIC_STRACE_LEN +#define PANIC_STRACE_LEN 32 +#endif //ifndef PANIC_STRACE_LEN + +void print_stacktrace(FILE* fd); + +#define panic(...) \ + do { \ + fprintf(PANIC_FD, "Panic at %s:%d\n", __FILE__, __LINE__); \ + fprintf(PANIC_FD, "Panic message:\n\t" __VA_ARGS__); \ + print_stacktrace(PANIC_FD); \ + abort(); \ + } while(0) + +#define NULL_GUARD(ptr, ...) \ + do { \ + if((ptr) == NULL) \ + { \ + if(sizeof(#__VA_ARGS__) > 1) \ + { \ + panic(__VA_ARGS__); \ + } \ + else \ + { \ + panic("NULL detected in %s", #ptr); \ + } \ + } \ + } while(0) + + +#endif //ifndef __PANIC_H__ diff --git a/inc/proto/dial.h b/inc/proto/dial.h new file mode 100644 index 0000000..d90a0fc --- /dev/null +++ b/inc/proto/dial.h @@ -0,0 +1,11 @@ +#ifndef __PROTO_DIAL_H__ +#define __PROTO_DIAL_H__ + +#include + +#include "context.h" + +void broadcastClients(EmulContext* emulContext, uint8_t* msg, size_t msgLen); +void dispatchOutgoingMessage(OutgoingBuffers* outBufs, ws_cli_conn_t clientIdx, uint8_t* msg, size_t msgLen); + +#endif \ No newline at end of file diff --git a/inc/proto/enums.h b/inc/proto/enums.h new file mode 100644 index 0000000..1109ab7 --- /dev/null +++ b/inc/proto/enums.h @@ -0,0 +1,34 @@ +#ifndef __PROTO_ENUMS_H__ +#define __PROTO_ENUMS_H__ + + +#define REG_EVTYPE_CONNECT 1 +#define REG_EVTYPE_AUTH 2 +#define REG_EVTYPE_CLOSE 3 + + + + +#define PACKET_TYPE_CTRL 0b0001 +#define PACKET_TYPE_STREAM 0b0010 +#define PACKET_TYPE_MEM 0b0011 + + +#define CTRL_TYPE_EXEC 0b0001 +#define CTRL_TYPE_NOTIF_STATE 0b0010 + +#define NOTIF_TYPE_EXEC 0b0000 + +#define STREAM_TYPE_REG_REQUEST 0b001 +#define STREAM_TYPE_REG_DISCARD 0b011 +#define STREAM_TYPE_REG_CONFIRM 0b101 +#define STREAM_TYPE_SEND 0b000 + +#define MEM_TYPE_READ_REQ 0b0000 +#define MEM_TYPE_READ_RESP 0b0001 +#define MEM_TYPE_WRITE_PUSH 0b0010 + + + + +#endif //ifndef __PROTO_ENUMS_H__ diff --git a/inc/proto/handlers.h b/inc/proto/handlers.h new file mode 100644 index 0000000..61888ee --- /dev/null +++ b/inc/proto/handlers.h @@ -0,0 +1,11 @@ +#ifndef __PROTO_HANDLERS_H__ +#define __PROTO_HANDLERS_H__ + +#include "events.h" + + +void handleRegEvent(EmulContext* emulContext, ClientRegistrationEvent* ev); +void handleAllClients(EmulContext* emulContext); + + +#endif //ifndef __PROTO_HANDLERS_H__ \ No newline at end of file diff --git a/inc/proto/handlers/auth.h b/inc/proto/handlers/auth.h new file mode 100644 index 0000000..81b0b35 --- /dev/null +++ b/inc/proto/handlers/auth.h @@ -0,0 +1,11 @@ +#ifndef __PROTO_HANDLERS_AUTH_H__ +#define __PROTO_HANDLERS_AUTH_H__ + +#include +#include "context.h" + +uint8_t handle_auth(ClientContext* cctx, ws_cli_conn_t client, const uint8_t* msg, uint64_t msgSize, int msgType); +LinkedListEntry* disconnectDueTimeout(EmulContext* emulContext, LinkedListEntry* clientEntry); +void handleOnClientAuthDone(ClientContext* ctx, EmulContext* emulContext); + +#endif //ifndef __PROTO_HANDLERS_AUTH_H__ diff --git a/inc/proto/handlers/control.h b/inc/proto/handlers/control.h new file mode 100644 index 0000000..2e2726b --- /dev/null +++ b/inc/proto/handlers/control.h @@ -0,0 +1,10 @@ +#ifndef __PROTO_HANDLERS_CONTROL_H__ +#define __PROTO_HANDLERS_CONTROL_H__ + +#include "proto/msg.h" +#include "proto/dial.h" + +void handleIncomingControlMessage(BaseMessage* msg, EmulContext* emulContext); + + +#endif //ifndef __PROTO_HANDLERS_CONTROL_H__ \ No newline at end of file diff --git a/inc/proto/handlers/mem.h b/inc/proto/handlers/mem.h new file mode 100644 index 0000000..b913a89 --- /dev/null +++ b/inc/proto/handlers/mem.h @@ -0,0 +1,10 @@ +#ifndef __PROTO_HANDLERS_MEM_H__ +#define __PROTO_HANDLERS_MEM_H__ + +#include "proto/msg.h" +#include "proto/dial.h" + +void handleIncomingMemMessage(BaseMessage* msg, ClientContext* ctx, EmulContext* emulContext); + +#endif //ifndef __PROTO_HANDLERS_MEM_H__ + diff --git a/inc/proto/handlers/stream.h b/inc/proto/handlers/stream.h new file mode 100644 index 0000000..7073dfd --- /dev/null +++ b/inc/proto/handlers/stream.h @@ -0,0 +1,12 @@ +#ifndef __PROTO_HANDLERS_STREAM_H__ +#define __PROTO_HANDLERS_STREAM_H__ + +#include "proto/msg.h" +#include "proto/dial.h" + + +void unregisterClientStream(EmulContext* emulContext, ClientContext* ctx, uint32_t regId); +void unregisterClientStreams(EmulContext* emulContext, ClientContext* ctx); +void handleIncomingStreamMessage(BaseMessage* msg, ClientContext* ctx, EmulContext* emulContext); + +#endif //ifndef __PROTO_HANDLERS_STREAM_H__ \ No newline at end of file diff --git a/inc/proto/handlers/ws.h b/inc/proto/handlers/ws.h new file mode 100644 index 0000000..5bea2aa --- /dev/null +++ b/inc/proto/handlers/ws.h @@ -0,0 +1,10 @@ +#ifndef __PROTO_HANDLERS_WS_H__ +#define __PROTO_HANDLERS_WS_H__ + +#include "wsServer/include/ws.h" + +void onWsMessage(ws_cli_conn_t client, const unsigned char *msg, uint64_t size, int type); +void onWsClose(ws_cli_conn_t client); +void onWsOpen(ws_cli_conn_t client); + +#endif //ifndef __PROTO_HANDLERS_WS_H__ diff --git a/inc/proto/msg.h b/inc/proto/msg.h new file mode 100644 index 0000000..f1855be --- /dev/null +++ b/inc/proto/msg.h @@ -0,0 +1,26 @@ +#ifndef __PROTO_MSG_H__ +#define __PROTO_MSG_H__ + +#include "wsServer/include/ws.h" +#include + +typedef struct { + ws_cli_conn_t clientIdx; + uint8_t* msg; + size_t msgLen; +} OutgoingMessage; + +typedef struct { + uint64_t nonce; + uint8_t packetType; + uint8_t payloadHeader; + const void* payload; + size_t payloadLen; +} BaseMessage; + +BaseMessage* parseMessage(const uint8_t* bytes, size_t size); +uint8_t* createControlNotifyMessage(uint64_t nonce, uint64_t clockCounter, uint8_t newEmulState, size_t* lenOut); +uint8_t* createDoneRegMessage(uint64_t nonce, uint8_t X, uint64_t devId, uint64_t segId, uint64_t startAddr, uint64_t segLength, uint32_t regId, size_t* lenOut); +uint8_t* createStreamSegmentPush(uint8_t mode, uint32_t regId, uint64_t clockCounter, uint8_t* payload, size_t payloadLen, size_t* lenOut); + +#endif //ifndef __PROTO_MSG_H__ \ No newline at end of file diff --git a/inc/proto/pack.h b/inc/proto/pack.h new file mode 100644 index 0000000..b3c9cd6 --- /dev/null +++ b/inc/proto/pack.h @@ -0,0 +1,30 @@ +#ifndef __PROTO_PACK_H__ +#define __PROTO_PACK_H__ + +#include + + + +uint64_t decodeBytesToU64(const uint8_t* bytes); +uint32_t decodeBytesToU32(const uint8_t* bytes); +uint16_t decodeBytesToU16(const uint8_t* bytes); + + + +void encodeUint8ToBytes(uint8_t num, uint8_t* tgt); +void encodeUint16ToBytes(uint16_t num, uint8_t* tgt); +void encodeUint32ToBytes(uint32_t num, uint8_t* tgt); +void encodeUint64ToBytes(uint64_t num, uint8_t* tgt); + + + + + +#define encodeUintToBytes(num, tgt) _Generic((num), \ + uint8_t: encodeUint8ToBytes, \ + uint16_t: encodeUint16ToBytes, \ + uint32_t: encodeUint32ToBytes, \ + uint64_t: encodeUint64ToBytes \ + )(num, tgt) + +#endif //ifndef __PROTO_PACK_H__ diff --git a/inc/sized_ptr.h b/inc/sized_ptr.h new file mode 100644 index 0000000..ed61199 --- /dev/null +++ b/inc/sized_ptr.h @@ -0,0 +1,13 @@ +#ifndef __SIZED_PTR_H__ +#define __SIZED_PTR_H__ + +#include + +typedef struct { + void* ptr; + size_t size; + size_t allocatedSize; +} SizedPtr; + + +#endif //ifndef __SIZED_PTR_H__ diff --git a/inc/state.h b/inc/state.h new file mode 100644 index 0000000..da03a5c --- /dev/null +++ b/inc/state.h @@ -0,0 +1,20 @@ +#ifndef __EMUL_STATE_H__ +#define __EMUL_STATE_H__ + +#include + +#define EMUL_STATE_STILL 0 +#define EMUL_STATE_EXEC 1 +#define EMUL_STATE_PAUSE 2 +#define EMUL_STATE_STOP 3 + +#define EMUL_STATE_OP_START 1 +#define EMUL_STATE_OP_PAUSE 2 +#define EMUL_STATE_OP_RESUME 3 +#define EMUL_STATE_OP_RESET 4 +#define EMUL_STATE_OP_STOP 5 + + +uint8_t switchNewEmulState(const uint8_t currentState, const uint8_t controlOp); + +#endif //ifndef __EMUL_STATE_H__ \ No newline at end of file diff --git a/inc/streamed.h b/inc/streamed.h new file mode 100644 index 0000000..3949f7b --- /dev/null +++ b/inc/streamed.h @@ -0,0 +1,27 @@ +#ifndef __STREAMED_H__ +#define __STREAMED_H__ + +#include +#include +#include "client.h" + +#define STREAM_MODE_READ 0 +#define STREAM_MODE_WRITE 1 + + +typedef struct { + ClientContext* clientContext; + uint32_t regId; + uint64_t startAddr; + uint64_t segLen; + uint8_t mode; +} StreamReg; + +typedef struct { + StreamReg* regs; + size_t regCount; + size_t allocatedSize; +} DeviceSegStreamReg; + + +#endif //ifndef __STREAMED_H__ diff --git a/src/base_device.c b/src/base_device.c index 98eb981..b47fd0d 100644 --- a/src/base_device.c +++ b/src/base_device.c @@ -62,7 +62,7 @@ device_handle_t* openBaseDevice(conf_dev_t* devConf, char* errbuf) return NULL; } - char intErrbuf[1024]; + char intErrbuf[512]; device_lib_t* devLib = loadDeviceLib(devConf->libPath, intErrbuf); if (devLib == NULL) diff --git a/src/executor/executor.c b/src/executor/executor.c new file mode 100644 index 0000000..e69de29 diff --git a/src/linkedlist.c b/src/linkedlist.c new file mode 100644 index 0000000..60306e1 --- /dev/null +++ b/src/linkedlist.c @@ -0,0 +1,45 @@ +#include +#include "linkedlist.h" + + +void removeLinkedListEntry(LinkedListEntry** head, LinkedListEntry* entry) +{ + // check for head + if(entry->prevEntry != NULL) + { + entry->prevEntry->nextEntry = entry->nextEntry; + } + // check for tail + if(entry->nextEntry != NULL) + { + entry->nextEntry->prevEntry = entry->prevEntry; + } + + // check for removing head entry + if(entry == *head) + { + if(entry->nextEntry == NULL && entry->prevEntry == NULL) + { + // removing full list + *head = NULL; + } + else if(entry->nextEntry != NULL) + { + // moving head to next entry + *head = entry->nextEntry; + } + else if(entry->prevEntry != NULL) + { + // head that have previous item, must be impossible + *head = entry->prevEntry; + } + + if(entry == *head) + { + // if still head => there is a circular list, removing whole + *head = NULL; + } + } + + free(entry); +} \ No newline at end of file diff --git a/src/main.c b/src/main.c index ecfd534..953a1e9 100644 --- a/src/main.c +++ b/src/main.c @@ -21,6 +21,9 @@ #include "my_mutex.h" +#include "panic.h" + + #include "tomlc99/toml.h" #include @@ -28,10 +31,39 @@ #include "compose_device.h" +#include "linkedlist.h" #include "ptQueue/inc/ptQueue.h" #include "wsServer/include/ws.h" +#define _GNU_SOURCE +#include +#include + +#include +#include +#include + +#include + + + +#include "context.h" +#include "streamed.h" +#include "events.h" + +#include "state.h" + +#include "proto/msg.h" +#include "proto/enums.h" +#include "proto/dial.h" + +#include "proto/handlers.h" +#include "proto/handlers/ws.h" + + +#include "proto/pack.h" + void printMemory(void* cells, uint64_t cellsCount) { @@ -120,10 +152,6 @@ void writeExt(uint64_t ident, uint64_t addr, void* rawCells, void* data) // void *threadMain(void *param); -#define _GNU_SOURCE -#include -#include - void my_sleep(int microseconds) { struct timespec ts; @@ -132,173 +160,152 @@ void my_sleep(int microseconds) { nanosleep(&ts, NULL); } -#define REG_EVTYPE_CONNECT 1 -#define REG_EVTYPE_CLOSE 2 -#define CLIENT_STATE_UNAUTHED 0 -typedef struct { - pthread_mutex_t registerMutex; - ptQueue* regQueue; -} ServerContext; -typedef struct { - ws_cli_conn_t clientId; - uint8_t isAuthed; - ptQueue* incomeQ; - ptQueue* outcomeQ; -} ClientContext; -typedef struct { - uint8_t regType; - ClientContext* ctx; -} ClientRegistrationEvent; - -/** - * @brief This function is called whenever a new connection is opened. - * @param client Client connection. - */ -void onopen(ws_cli_conn_t client) +void* outgoingMain(void* args) { - char errbuf[1024]; - - ptQueue* incomeQueue = ptQueueCreate(errbuf); - if(incomeQueue == NULL) + OutgoingBuffers* outBufs = args; + uint8_t bufsCount = outBufs->buffersCount; + uint8_t currBufIdx = 0; + uint8_t currWritingIdxPtr = 0; + SizedPtr* curBuf = NULL; + while (currWritingIdxPtr != 0xFF) { - printf("Unable to create income queue: %s\n", errbuf); - abort(); - } - - ptQueue* outcomeQueue = ptQueueCreate(errbuf); - if(outcomeQueue == NULL) - { - printf("Unable to create outcome queue: %s\n", errbuf); - ptQueueFree(incomeQueue); - abort(); - } - - ClientContext* cctx = malloc(sizeof(ClientContext)); - if(cctx == NULL) - { - printf("Unable to allocate client context\n"); - ptQueueFree(incomeQueue); - ptQueueFree(outcomeQueue); - abort(); - } - cctx->clientId = client; - cctx->isAuthed = 0; - cctx->incomeQ = incomeQueue; - cctx->outcomeQ = outcomeQueue; - - ClientRegistrationEvent* ev = malloc(sizeof(ClientContext)); - if(ev == NULL) - { - printf("Unable to allocate register event"); - ptQueueFree(incomeQueue); - ptQueueFree(outcomeQueue); - free(cctx); - abort(); - } - ev->regType = REG_EVTYPE_CONNECT; - ev->ctx = cctx; - - ws_set_connection_context(client, cctx); - - ServerContext* ctx = ws_get_server_context(client); - - with_lock(&ctx->registerMutex) - { - int exitCode = ptQueuePush(ctx->regQueue, ev, errbuf); - if(exitCode) + if(curBuf != NULL) { - printf("Unable to push to reg queue: %s\n", errbuf); - ptQueueFree(incomeQueue); - ptQueueFree(outcomeQueue); - free(cctx); - abort(); + OutgoingMessage* messages = curBuf->ptr; + for(size_t i = 0; i < curBuf->size; i++) + { + // my_sleep(100000); + OutgoingMessage *outMsg = &messages[i]; + if(outMsg->msg == NULL) + { + panic("Got double read on buf %d while writing on buf %d\n", currBufIdx, currWritingIdxPtr); + } + // printf("\t%lu -> [%lu]\n", outMsg->msgLen, outMsg->clientIdx); + ws_sendframe_bin(outMsg->clientIdx, (const char*)outMsg->msg, outMsg->msgLen); + // printf("running free on buf %d message %lu\n", currBufIdx, i); + free(outMsg->msg); + outMsg->msg = NULL; + // printf("done free on buf %d message %lu\n", currBufIdx, i); + } + curBuf->size = 0; + atomic_store(&outBufs->readRequestIdx, currBufIdx); + + currBufIdx++; + if(currBufIdx >= bufsCount) + { + currBufIdx = 0; + } + } + + currWritingIdxPtr = atomic_load(&outBufs->currWritingIdx); + if(currBufIdx == currWritingIdxPtr) + { + atomic_store(&outBufs->readRequestIdx, currBufIdx); + if(curBuf == NULL) + { + my_sleep(100); + } + else + { + curBuf = NULL; + } + } + else + { + curBuf = &outBufs->bufs[currBufIdx]; + } + } + pthread_exit(0); +} + + +void mockDevice0(uint8_t* mem, uint64_t* readAddrs, size_t* readAddrsLen, uint64_t* writeAddrs, size_t* writeAddrsLen) +{ + for(size_t i = 0; i < 8; i++) + { + readAddrs[*readAddrsLen] = i; + *readAddrsLen += 1; + } + + uint64_t num = decodeBytesToU64(mem); + num += 1; + encodeUintToBytes(num, mem); + + for(size_t i = 0; i < 8; i++) + { + writeAddrs[*writeAddrsLen] = i; + *writeAddrsLen += 1; + } +} + +void mockDevice1(uint8_t* mem, uint64_t* readAddrs, size_t* readAddrsLen, uint64_t* writeAddrs, size_t* writeAddrsLen) +{ + +} + +void dispatchStreamSegment(EmulContext* emulContext, StreamReg* reg, uint8_t* mem) +{ + size_t mlen = 0; + uint8_t* msg = createStreamSegmentPush(reg->mode, reg->regId, *emulContext->clockCounter, mem + reg->startAddr, reg->segLen, &mlen); + dispatchOutgoingMessage(emulContext->outBufs, reg->clientContext->clientId, msg, mlen); +} + +void dispatchMemAccessNotifications(EmulContext* emulContext, DeviceSegStreamReg* deviceRegs, uint8_t* mem, uint64_t* addrs, size_t addrsLen, uint8_t mode) +{ + if(deviceRegs->regCount == 0) + { + // printf("No stream regs\n"); + return; + } + StreamReg** dispatchRegs = alloca(sizeof(StreamReg*) * deviceRegs->regCount); + NULL_GUARD(dispatchRegs); + size_t dispatchRegsCnt = 0; + + for(size_t i = 0; i < addrsLen; i++) + { + uint64_t addr = addrs[i]; + for(size_t regIdx = 0; regIdx < deviceRegs->regCount; regIdx++) + { + StreamReg* reg = &deviceRegs->regs[regIdx]; + if(reg->mode == mode) + { + if(reg->startAddr <= addr && reg->startAddr + reg->segLen >= addr) + { + uint8_t isDuplicate = 0; + for(size_t j = 0; j < dispatchRegsCnt; j++) + { + if(dispatchRegs[j] == reg) + { + isDuplicate = 1; + break; + } + } + if(!isDuplicate) + { + dispatchRegs[dispatchRegsCnt] = reg; + dispatchRegsCnt++; + } + } + } } } - char *cli; - cli = ws_getaddress(client); - printf("Connection opened, addr: %s\n", cli); + // if(dispatchRegsCnt == 0) + // { + // printf("No memory dispatched\n"); + // } + + for(size_t i = 0; i < dispatchRegsCnt; i++) + { + dispatchStreamSegment(emulContext, dispatchRegs[i], mem); + } } -/** - * @brief This function is called whenever a connection is closed. - * @param client Client connection. - */ -void onclose(ws_cli_conn_t client) -{ - char errbuf[1024]; - - ClientContext* cctx = ws_get_connection_context(client); - if(cctx == NULL) - { - printf("Unable to get client context\n"); - abort(); - } - - - ClientRegistrationEvent* ev = malloc(sizeof(ClientContext)); - if(ev == NULL) - { - printf("Unable to allocate register event"); - abort(); - } - ev->regType = REG_EVTYPE_CONNECT; - ev->ctx = cctx; - - ws_set_connection_context(client, cctx); - - ServerContext* ctx = ws_get_server_context(client); - - with_lock(&ctx->registerMutex) - { - int exitCode = ptQueuePush(ctx->regQueue, ev, errbuf); - if(exitCode) - { - printf("Unable to push to reg queue: %s\n", errbuf); - abort(); - } - } - - - pthread_t tid = pthread_self(); - char *cli; - cli = ws_getaddress(client); - printf("Connection closed, addr: %s, thread id: %lu\n", cli, tid); -} - -/** - * @brief Message events goes here. - * @param client Client connection. - * @param msg Message content. - * @param size Message size. - * @param type Message type. - */ -void onmessage(ws_cli_conn_t client, - const unsigned char *msg, uint64_t size, int type) -{ - ClientContext* cctx = ws_get_connection_context(client); - if(cctx == NULL) - { - printf("Unable to get client context\n"); - abort(); - } - - pthread_t tid = pthread_self(); - char *cli; - cli = ws_getaddress(client); - printf("I receive a message: %s (%zu), from: %s, thread id: %lu\n", msg, - size, cli, tid); - - ws_sendframe_txt(client, "hello"); - ws_sendframe_txt(client, "world"); -} - - int main(int argc, char** argv) { char errbuf[1024]; @@ -306,39 +313,210 @@ int main(int argc, char** argv) pthread_mutex_init(&mtx, NULL); ptQueue* regQ = ptQueueCreate(errbuf); - if(regQ == NULL) + NULL_GUARD(regQ, "Unable to create reg q: %s\n", errbuf); + + size_t deviceCount = 2; + + DeviceSegStreamReg* device0SegStreamRegs = malloc(sizeof(DeviceSegStreamReg) * 4); + NULL_GUARD(device0SegStreamRegs); + + device0SegStreamRegs->allocatedSize = 0; + device0SegStreamRegs->regCount = 0; + device0SegStreamRegs->regs = NULL; + + DeviceSegStreamReg* device1SegStreamRegs = malloc(sizeof(DeviceSegStreamReg) * 4); + NULL_GUARD(device1SegStreamRegs); + + + device1SegStreamRegs->allocatedSize = 0; + device1SegStreamRegs->regCount = 0; + device1SegStreamRegs->regs = NULL; + + DeviceSegStreamReg** deviceStreamRegs = malloc(sizeof(DeviceSegStreamReg*) * deviceCount); + NULL_GUARD(deviceStreamRegs); + + deviceStreamRegs[0] = device0SegStreamRegs; + deviceStreamRegs[1] = device1SegStreamRegs; + + uint8_t emulState = EMUL_STATE_STILL; + + + uint8_t* device0mem = malloc(sizeof(uint8_t) * 1024 * 128); + NULL_GUARD(device0mem); + + uint8_t* device1mem = malloc(sizeof(uint8_t) * 1024 * 128); + NULL_GUARD(device1mem); + + uint8_t** devicesMem = malloc(sizeof(uint8_t*) * deviceCount); + NULL_GUARD(devicesMem); + devicesMem[0] = device0mem; + devicesMem[1] = device1mem; + + uint64_t* device0readAddrs = malloc(sizeof(uint64_t) * 128); + NULL_GUARD(device0readAddrs); + size_t device0readAddrsLen = 0; + uint64_t* device0writeAddrs = malloc(sizeof(uint64_t) * 128); + NULL_GUARD(device0writeAddrs); + size_t device0writeAddrsLen = 0; + + + uint64_t* device1readAddrs = malloc(sizeof(uint64_t) * 128); + NULL_GUARD(device1readAddrs); + size_t device1readAddrsLen = 0; + uint64_t* device1writeAddrs = malloc(sizeof(uint64_t) * 128); + NULL_GUARD(device1writeAddrs); + size_t device1writeAddrsLen = 0; + + + uint8_t outBufsCount = 16; + SizedPtr* bufs = malloc(sizeof(SizedPtr) * outBufsCount); + NULL_GUARD(bufs); + + for(size_t i = 0; i < outBufsCount; i++) { - printf("Unable to create reg q: %s\n", errbuf); - abort(); + OutgoingMessage* messages = malloc(sizeof(OutgoingMessage) * 128); + NULL_GUARD(messages); + bufs[i].ptr = messages; + bufs[i].allocatedSize = 128; + bufs[i].size = 0; + + for(size_t j = 0; j < bufs[i].allocatedSize; j++) + { + messages[j].msgLen = 0xFFFF; + messages[j].clientIdx = 0xFFFF; + messages[j].msg = NULL; + } } - ServerContext ctx = { - mtx, - regQ + OutgoingBuffers outBufs = { + bufs, + outBufsCount, + 0, + 0 }; + LinkedListEntry* clientsLinkedListHead = NULL; + + uint64_t clockCounter = 0; + + uint8_t utilizedFlag = 0; + EmulContext emulContext = { + &emulState, + &clockCounter, + &clientsLinkedListHead, + &utilizedFlag, + &outBufs, + deviceStreamRegs, + devicesMem, + deviceCount + }; + + + uint8_t access_token[] = "jtd3dhBWG2p73mkRCyaZ63KLmOjl5OgItIDAPa8HcyGQI2SOFA6hsJVr1NiG2Y96AvwyYSShr4GAL3sjJEo1wsmCo2vuhiqRhj8HVRrOVocD2e1jtuMwMsCmizYakT63"; + + ServerContext ctx = { + mtx, + regQ, + (uint8_t*)&access_token, + &emulContext, + }; + + pthread_t outTid; + pthread_attr_t outAttr; + + pthread_attr_init(&outAttr); + pthread_create(&outTid, &outAttr, outgoingMain, &outBufs); + pthread_detach(outTid); + + ws_socket(&(struct ws_server){ - /* - * Bind host, such as: - * localhost -> localhost/127.0.0.1 - * 0.0.0.0 -> global IPv4 - * :: -> global IPv4+IPv6 (Dual stack) - */ .host = "localhost", - .port = 8080, + .port = 8181, .thread_loop = 1, .timeout_ms = 1000, .context = &ctx, - .evs.onopen = &onopen, - .evs.onclose = &onclose, - .evs.onmessage = &onmessage + .evs.onopen = &onWsOpen, + .evs.onclose = &onWsClose, + .evs.onmessage = &onWsMessage }); + ptQueueElem* regQueueTail = regQ->tail; + while(1) { - sleep(1); + ClientRegistrationEvent* payload = regQueueTail->payload; + if(payload != NULL) + { + printf("Got reg queue data\n"); + handleRegEvent(&emulContext, payload); + regQueueTail = regQueueTail->nextEl; + utilizedFlag = 1; + } + + + handleAllClients(&emulContext); + + uint8_t readReqIdx = atomic_load(&outBufs.readRequestIdx); + if(readReqIdx == outBufs.currWritingIdx || outBufs.bufs[outBufs.currWritingIdx].size >= outBufs.bufs[outBufs.currWritingIdx].allocatedSize / 2) + { + uint8_t oldWriteIdx = outBufs.currWritingIdx; + uint8_t newWriteIdx = outBufs.currWritingIdx + 1; + if(outBufs.bufs[outBufs.currWritingIdx].size != 0 || readReqIdx != newWriteIdx) + { + if(newWriteIdx >= outBufs.buffersCount) + { + newWriteIdx = 0; + } + // printf("Switching write idx %d->%d\n", oldWriteIdx, newWriteIdx); + + + // if(readReqIdx == newWriteIdx) + // { + // printf("Flood control breach, waiting until outgoing buffers empty!\n"); + // } + while(readReqIdx == newWriteIdx) + { + my_sleep(1000); + readReqIdx = atomic_load(&outBufs.readRequestIdx); + } + atomic_store(&outBufs.currWritingIdx, newWriteIdx); + outBufs.bufs[outBufs.currWritingIdx].size = 0; + } + } + + if(emulState == EMUL_STATE_EXEC) + { + printf("Running...\n"); + device0readAddrsLen = 0; + device0writeAddrsLen = 0; + + device1readAddrsLen = 0; + device1writeAddrsLen = 0; + + mockDevice0(device0mem, device0readAddrs, &device0readAddrsLen, device0writeAddrs, &device0writeAddrsLen); + mockDevice1(device1mem, device1readAddrs, &device1readAddrsLen, device1writeAddrs, &device1writeAddrsLen); + + dispatchMemAccessNotifications(&emulContext, emulContext.deviceStreamRegs[0], device0mem, device0readAddrs, device0readAddrsLen, STREAM_MODE_READ); + dispatchMemAccessNotifications(&emulContext, emulContext.deviceStreamRegs[0], device0mem, device0writeAddrs, device0writeAddrsLen, STREAM_MODE_WRITE); + + dispatchMemAccessNotifications(&emulContext, emulContext.deviceStreamRegs[1], device1mem, device1readAddrs, device1readAddrsLen, STREAM_MODE_READ); + dispatchMemAccessNotifications(&emulContext, emulContext.deviceStreamRegs[1], device1mem, device1writeAddrs, device1writeAddrsLen, STREAM_MODE_WRITE); + + clockCounter++; + } + else if(!utilizedFlag) + { + my_sleep(100); + } + } + + atomic_store(&outBufs.currWritingIdx, 0xFF); + pthread_join(outTid, NULL); + + + return 0; } diff --git a/src/main.h b/src/main.h new file mode 100644 index 0000000..6f70f09 --- /dev/null +++ b/src/main.h @@ -0,0 +1 @@ +#pragma once diff --git a/src/panic.c b/src/panic.c new file mode 100644 index 0000000..90af8d8 --- /dev/null +++ b/src/panic.c @@ -0,0 +1,21 @@ +#include "panic.h" + +void print_stacktrace(FILE* fd) +{ + void *buffer[PANIC_STRACE_LEN]; + int size = backtrace(buffer, PANIC_STRACE_LEN); + char **symbols = backtrace_symbols(buffer, size); + + if (symbols == NULL) { + fprintf(fd, "backtrace_symbols"); + abort(); + } + + fprintf(fd, "\n\nStack trace:\n"); + for (int i = 0; i < size; i++) + { + fprintf(fd, "%s\n", symbols[i]); + } + + free(symbols); +} \ No newline at end of file diff --git a/src/proto/dial.c b/src/proto/dial.c new file mode 100644 index 0000000..73e9871 --- /dev/null +++ b/src/proto/dial.c @@ -0,0 +1,46 @@ +#include "proto/dial.h" + +#include + +#include "context.h" +#include "panic.h" +#include "proto/msg.h" + + +void broadcastClients(EmulContext* emulContext, uint8_t* msg, size_t msgLen) +{ + LinkedListEntry* clientsHead = *emulContext->clientsHead; + while(clientsHead != NULL) + { + uint8_t* newMsg = malloc(sizeof(uint8_t) * msgLen); + NULL_GUARD(newMsg); + memcpy(newMsg, msg, msgLen); + + ClientContext* ctx = clientsHead->payload; + dispatchOutgoingMessage(emulContext->outBufs, ctx->clientId, newMsg, msgLen); + clientsHead = clientsHead->nextEntry; + } + free(msg); +} + + + + + +void dispatchOutgoingMessage(OutgoingBuffers* outBufs, ws_cli_conn_t clientIdx, uint8_t* msg, size_t msgLen) +{ + SizedPtr* p = &outBufs->bufs[outBufs->currWritingIdx]; + if(p->size + 1 >= p->allocatedSize) + { + printf("\t>>Reallocating buf %d\n", outBufs->currWritingIdx); + OutgoingMessage* newPtr = realloc(p->ptr, sizeof(OutgoingMessage) * p->allocatedSize * 2); + NULL_GUARD(newPtr); + p->ptr = newPtr; + p->allocatedSize = p->allocatedSize * 2; + } + OutgoingMessage* outmsg = &((OutgoingMessage*)p->ptr)[p->size]; + outmsg->msg = msg; + outmsg->msgLen = msgLen; + outmsg->clientIdx = clientIdx; + p->size++; +} \ No newline at end of file diff --git a/src/proto/handlers.c b/src/proto/handlers.c new file mode 100644 index 0000000..22244db --- /dev/null +++ b/src/proto/handlers.c @@ -0,0 +1,142 @@ +#include "proto/handlers.h" + +#include +#include "proto/enums.h" +#include "panic.h" + +#include "proto/handlers/auth.h" +#include "proto/handlers/stream.h" +#include "proto/handlers/control.h" +#include "proto/handlers/mem.h" + + +void handleCloseClient(EmulContext* emulContext, ClientContext* ctx) +{ + if(ctx->streamRegIterator > 0) + { + unregisterClientStreams(emulContext, ctx); + } +} + + +void handleRegEvent(EmulContext* emulContext, ClientRegistrationEvent* ev) +{ + if(ev->regType == REG_EVTYPE_CONNECT) + { + printf("open client %lu\n", ev->ctx->clientId); + LinkedListEntry* newClientsLinkedListHead = malloc(sizeof(LinkedListEntry)); + NULL_GUARD(newClientsLinkedListHead); + + newClientsLinkedListHead->payload = ev->ctx; + if(*emulContext->clientsHead != NULL) + { + (*emulContext->clientsHead)->prevEntry = newClientsLinkedListHead; + } + newClientsLinkedListHead->prevEntry = NULL; + newClientsLinkedListHead->nextEntry = *emulContext->clientsHead; + *emulContext->clientsHead = newClientsLinkedListHead; + } + else if (ev->regType == REG_EVTYPE_CLOSE) + { + LinkedListEntry* clientEntry = *emulContext->clientsHead; + + while(clientEntry != NULL) + { + if(clientEntry->payload == ev->ctx) + { + printf("close client %lu\n", ev->ctx->clientId); + handleCloseClient(emulContext, ev->ctx); + removeLinkedListEntry(emulContext->clientsHead, clientEntry); + break; + } + clientEntry = clientEntry->nextEntry; + } + } + else if (ev->regType == REG_EVTYPE_AUTH) + { + printf("auth client %lu\n", ev->ctx->clientId); + ev->ctx->isAuthed = 1; + handleOnClientAuthDone(ev->ctx, emulContext); + } +} + + + + +void handleIncomingMessage(BaseMessage* msg, ClientContext* ctx, EmulContext* emulContext) +{ + switch (msg->packetType) + { + case PACKET_TYPE_CTRL: + { + printf("CTRL packet\n"); + handleIncomingControlMessage(msg, emulContext); + break; + } + case PACKET_TYPE_STREAM: + { + printf("STREAM packet\n"); + handleIncomingStreamMessage(msg, ctx, emulContext); + break; + } + case PACKET_TYPE_MEM: + { + printf("MEM packet\n"); + handleIncomingMemMessage(msg, ctx, emulContext); + break; + } + default: + { + printf("Unsupported packet type: %u\n", msg->packetType); + break; + } + } +} + + +void handleAllClients(EmulContext* emulContext) +{ + LinkedListEntry* clientEntry = *emulContext->clientsHead; + + size_t handleLimit = 128; + + while(clientEntry != NULL && handleLimit > 0) + { + handleLimit--; + ClientContext* ctx = clientEntry->payload; + if(!ctx->isAuthed) + { + clientEntry = disconnectDueTimeout(emulContext, clientEntry); + if(clientEntry == NULL) + { + break; + } + if(*emulContext->utilizedFlag) + { + continue; + } + } + else + { + void* payload = ctx->incomeQ->head->payload; + if(payload != NULL) + { + *emulContext->utilizedFlag = 1; + BaseMessage* msg = payload; + printf("client %lu sent data: \nnonce %lu, ptype %4u, ph: %u\n", ctx->clientId, msg->nonce, msg->packetType, msg->payloadHeader); + + handleIncomingMessage(msg, ctx, emulContext); + + + free(payload); + ctx->incomeQ->head = ctx->incomeQ->head->nextEl; + } + clientEntry = clientEntry->nextEntry; + } + } +} + + + + + diff --git a/src/proto/handlers/auth.c b/src/proto/handlers/auth.c new file mode 100644 index 0000000..5e6af54 --- /dev/null +++ b/src/proto/handlers/auth.c @@ -0,0 +1,140 @@ +#include "proto/handlers/auth.h" +#include +#include +#include +#include + +#include "my_mutex.h" + +#include "proto/enums.h" +#include "events.h" +#include "panic.h" + + +#include "proto/pack.h" +#include "proto/dial.h" +#include "proto/handlers/control.h" + +uint8_t validateAccessTokenDeterministic(const uint8_t* data, const uint8_t* token, uint64_t timestamp) +{ + char buf[1024]; + sprintf(buf, "%s%lu", token, timestamp); + + uint8_t hash[SHA512_DIGEST_LENGTH]; + SHA512((uint8_t*)buf, strlen(buf), hash); + + uint8_t valid = 1; + + for(size_t i = 0; i < SHA512_DIGEST_LENGTH; i++) + { + if (data[i] != hash[i]) + { + valid = 0; + } + } + + return valid; +} + +uint8_t validateAccessToken(const uint8_t* data, const uint8_t* access_token) +{ + uint64_t t = (uint64_t)time(NULL) / 30; + + uint8_t valid1 = validateAccessTokenDeterministic(data, access_token, t); + uint8_t valid2 = validateAccessTokenDeterministic(data, access_token, t - 1); + + return valid1 || valid2; +} + + +uint8_t handle_auth(ClientContext* cctx, ws_cli_conn_t client, const uint8_t* msg, uint64_t msgSize, int msgType) +{ + ServerContext* ctx = ws_get_server_context(client); + if(msgSize != SHA512_DIGEST_LENGTH) + { + int ret = ws_close_client(client); + if(ret == -1) + { + printf("Unable to close client %lu\n", client); + } + return 0; + } + uint8_t isValid = validateAccessToken(msg, (const uint8_t*)ctx->accessToken); + if(!isValid) + { + printf("Auth token invalid\n"); + int ret = ws_close_client(client); + if(ret == -1) + { + printf("Unable to close client %lu\n", client); + } + return 0; + } + + char errbuf[1024]; + + printf("Auth token is valid\n"); + + ClientRegistrationEvent* ev = malloc(sizeof(ClientContext)); + if(ev == NULL) + { + panic("Unable to allocate register event"); + } + ev->regType = REG_EVTYPE_AUTH; + ev->ctx = cctx; + + ws_set_connection_context(client, cctx); + + + with_lock(&ctx->registerMutex) + { + printf("Writing auth event\n"); + int exitCode = ptQueuePush(ctx->regQueue, ev, errbuf); + if(exitCode) + { + panic("Unable to push to reg queue: %s\n", errbuf); + } + } + return 1; +} + + +LinkedListEntry* disconnectDueTimeout(EmulContext* emulContext, LinkedListEntry* clientEntry) +{ + ClientContext* ctx = clientEntry->payload; + + uint64_t now = (uint64_t)time(NULL); + if(now - ctx->connectedAt <= 30) + { + return clientEntry->nextEntry; + } + + printf("Timeout on connection %lu\n", ctx->clientId); + int ret = ws_close_client(ctx->clientId); + if(ret == -1) + { + printf("Unable to close client\n"); + } + LinkedListEntry* nextEntry = clientEntry->nextEntry; + + removeLinkedListEntry(emulContext->clientsHead, clientEntry); + clientEntry = nextEntry; + *emulContext->utilizedFlag = 1; + return clientEntry; +} + +void handleOnClientAuthDone(ClientContext* ctx, EmulContext* emulContext) +{ + uint8_t* framedata = malloc(sizeof(uint8_t) * 8); + NULL_GUARD(framedata); + + encodeUintToBytes(ctx->clientId, framedata); + + dispatchOutgoingMessage(emulContext->outBufs, ctx->clientId, framedata, 8); + + + size_t len = 0; + uint8_t* msg = createControlNotifyMessage((uint64_t)~0, *emulContext->clockCounter, *emulContext->emulState, &len); + dispatchOutgoingMessage(emulContext->outBufs, ctx->clientId, msg, len); + +} \ No newline at end of file diff --git a/src/proto/handlers/control.c b/src/proto/handlers/control.c new file mode 100644 index 0000000..63e960b --- /dev/null +++ b/src/proto/handlers/control.c @@ -0,0 +1,33 @@ +#include "proto/handlers/control.h" + +#include "proto/pack.h" +#include "proto/enums.h" +#include "panic.h" +#include "state.h" + + + + +void handleIncomingControlMessage(BaseMessage* msg, EmulContext* emulContext) +{ + if(msg->payloadHeader == CTRL_TYPE_EXEC) + { + printf("ctrl exec\n"); + uint8_t stateOp = ((const uint8_t*)(msg->payload))[0]; + printf("state operation: %u\n", stateOp); + uint8_t newEmulState = switchNewEmulState(*emulContext->emulState, stateOp); + if(newEmulState != *emulContext->emulState) + { + printf("Switch state %u -> %u\n", *emulContext->emulState, newEmulState); + *emulContext->emulState = newEmulState; + + size_t len = 0; + uint8_t* notify = createControlNotifyMessage(msg->nonce, *emulContext->clockCounter, newEmulState, &len); + broadcastClients(emulContext, notify, len); + } + } + else + { + printf("invalid payload header: %u\n", msg->payloadHeader); + } +} \ No newline at end of file diff --git a/src/proto/handlers/mem.c b/src/proto/handlers/mem.c new file mode 100644 index 0000000..09bae46 --- /dev/null +++ b/src/proto/handlers/mem.c @@ -0,0 +1,80 @@ +#include "proto/handlers/mem.h" +#include "proto/enums.h" +#include "proto/pack.h" +#include "panic.h" +#include + + +uint8_t* createMemReadResponseMessage(uint64_t nonce, uint64_t clockCounter, uint8_t* payload, size_t payloadLen, size_t* lenOut) +{ + *lenOut = 9 + 8 + payloadLen; + uint8_t* outmsg = malloc(sizeof(uint8_t) * (*lenOut)); + NULL_GUARD(outmsg); + + encodeUintToBytes(nonce, outmsg); + + outmsg[8] = PACKET_TYPE_MEM << 4; + outmsg[8] |= MEM_TYPE_READ_RESP; + + encodeUintToBytes(clockCounter, outmsg + 9); + + memcpy(outmsg + 9 + 8, payload, payloadLen); + + return outmsg; +} + +void handleIncomingMemReadReq(BaseMessage* msg, ClientContext* ctx, EmulContext* emulContext) +{ + uint64_t devId = decodeBytesToU64(msg->payload); + uint64_t segId = decodeBytesToU64(msg->payload + 8); + uint64_t startAddr = decodeBytesToU64(msg->payload + 16); + uint64_t readLen = decodeBytesToU64(msg->payload + 24); + + if(devId >= emulContext->devicesCount) + { + return; + } + + + //TODO: lookup config for global addr conversion + uint64_t globalAddr = startAddr + segId * 128; + + uint8_t* readPtr = emulContext->devicesMem[devId] + globalAddr; + + size_t outLen = 0; + uint8_t* outMsg = createMemReadResponseMessage(msg->nonce, *emulContext->clockCounter, readPtr, readLen, &outLen); + dispatchOutgoingMessage(emulContext->outBufs, ctx->clientId, outMsg, outLen); +} + + +void handleIncomingMemWrite(BaseMessage* msg, ClientContext* ctx, EmulContext* emulContext) +{ + uint64_t devId = decodeBytesToU64(msg->payload); + uint64_t segId = decodeBytesToU64(msg->payload + 8); + uint64_t startAddr = decodeBytesToU64(msg->payload + 16); + + if(devId >= emulContext->devicesCount) + { + return; + } + + //TODO: lookup config for global addr conversion + uint64_t globalAddr = startAddr + segId * 128; + + uint8_t* writePtr = emulContext->devicesMem[devId] + globalAddr; + + memcpy(writePtr, msg->payload + 24, msg->payloadLen - 24); +} + + +void handleIncomingMemMessage(BaseMessage* msg, ClientContext* ctx, EmulContext* emulContext) +{ + if(msg->payloadHeader == MEM_TYPE_READ_REQ) + { + handleIncomingMemReadReq(msg, ctx, emulContext); + } + else if(msg->payloadHeader == MEM_TYPE_WRITE_PUSH) + { + handleIncomingMemWrite(msg, ctx, emulContext); + } +} \ No newline at end of file diff --git a/src/proto/handlers/stream.c b/src/proto/handlers/stream.c new file mode 100644 index 0000000..dbfaf87 --- /dev/null +++ b/src/proto/handlers/stream.c @@ -0,0 +1,124 @@ +#include "proto/handlers/stream.h" + +#include "streamed.h" +#include "proto/pack.h" +#include "proto/enums.h" +#include "panic.h" + + + +void unregisterClientStream(EmulContext* emulContext, ClientContext* ctx, uint32_t regId) +{ + for(size_t deviceId = 0; deviceId < emulContext->devicesCount; deviceId++) + { + DeviceSegStreamReg* deviceRegs = emulContext->deviceStreamRegs[deviceId]; + for(size_t i = 0; i < deviceRegs->regCount; i++) + { + StreamReg* iReg = &deviceRegs->regs[i]; + if(iReg->clientContext->clientId == ctx->clientId && iReg->regId == regId) + { + printf("Discard stream %u register for client %lu\n", regId, ctx->clientId); + deviceRegs->regCount -= 1; + for(size_t j = i; j < deviceRegs->regCount; j++) + { + deviceRegs->regs[j] = deviceRegs->regs[j + 1]; + } + break; + } + } + } +} + +void unregisterClientStreams(EmulContext* emulContext, ClientContext* ctx) +{ + for(size_t deviceId = 0; deviceId < emulContext->devicesCount; deviceId++) + { + DeviceSegStreamReg* deviceRegs = emulContext->deviceStreamRegs[deviceId]; + + StreamReg* newStreamRegs = malloc(sizeof(StreamReg) * deviceRegs->regCount); + NULL_GUARD(newStreamRegs); + size_t newStreamRegsId = 0; + + for(size_t i = 0; i < deviceRegs->regCount; i++) + { + StreamReg* reg = &deviceRegs->regs[i]; + if(reg->clientContext->clientId != ctx->clientId) + { + newStreamRegs[newStreamRegsId] = *reg; + newStreamRegsId++; + } + else + { + printf("Removing reg %d mode stream for %lu/%lu+%lu: [%u]\n", reg->mode, deviceId, reg->startAddr, reg->segLen, reg->regId); + } + } + free(deviceRegs->regs); + deviceRegs->regCount = newStreamRegsId; + deviceRegs->regs = newStreamRegs; + } +} + + +void handleStreamRegMessage(BaseMessage* msg, ClientContext* ctx, EmulContext* emulContext, uint8_t X) +{ + uint64_t devId = decodeBytesToU64(msg->payload); + uint64_t segId = decodeBytesToU64(msg->payload + 8); + uint64_t startAddr = decodeBytesToU64(msg->payload + 16); + uint64_t segLen = decodeBytesToU64(msg->payload + 24); + + if(devId >= emulContext->devicesCount) + { + return; + } + + DeviceSegStreamReg* deviceRegs = emulContext->deviceStreamRegs[devId]; + if(deviceRegs->regCount + 1 >= deviceRegs->allocatedSize) + { + size_t newAllocatedSize = deviceRegs->allocatedSize; + if(newAllocatedSize <= deviceRegs->regCount + 1) + { + newAllocatedSize = deviceRegs->regCount + 1; + } + StreamReg* newRegs = realloc(deviceRegs->regs, sizeof(StreamReg) * newAllocatedSize); + NULL_GUARD(newRegs); + deviceRegs->allocatedSize = newAllocatedSize; + deviceRegs->regs = newRegs; + } + + //TODO: lookup config for global segment addr conversion + uint64_t globalStartAddr = startAddr + segId * 128; + + StreamReg* reg = &deviceRegs->regs[deviceRegs->regCount]; + + reg->clientContext = ctx; + reg->segLen = segLen; + reg->startAddr = globalStartAddr; + reg->regId = ctx->streamRegIterator; + reg->mode = X; + + ctx->streamRegIterator++; + deviceRegs->regCount++; + + printf("Done registering %d mode stream for %lu/%lu+%lu: [%u]\n", X, devId, globalStartAddr, segLen, reg->regId); + + size_t len = 0; + uint8_t* notifMsg = createDoneRegMessage(msg->nonce, X, devId, segId, startAddr, segLen, reg->regId, &len); + dispatchOutgoingMessage(emulContext->outBufs, ctx->clientId, notifMsg, len); + +} + +void handleIncomingStreamMessage(BaseMessage* msg, ClientContext* ctx, EmulContext* emulContext) +{ + uint8_t X = msg->payloadHeader >> 3; + uint8_t streamType = msg->payloadHeader & 0b111; + + if(streamType == STREAM_TYPE_REG_REQUEST) + { + handleStreamRegMessage(msg, ctx, emulContext, X); + } + else if(streamType == STREAM_TYPE_REG_DISCARD) + { + uint32_t regId = decodeBytesToU32(msg->payload); + unregisterClientStream(emulContext, ctx, regId); + } +} \ No newline at end of file diff --git a/src/proto/handlers/ws.c b/src/proto/handlers/ws.c new file mode 100644 index 0000000..23e2d90 --- /dev/null +++ b/src/proto/handlers/ws.c @@ -0,0 +1,140 @@ +#include "proto/handlers/ws.h" +#include "proto/handlers.h" + +#include "panic.h" +#include "proto/enums.h" +#include "my_mutex.h" +#include "proto/handlers/auth.h" +#include "proto/msg.h" + +void onWsOpen(ws_cli_conn_t client) +{ + char errbuf[1024]; + + ptQueue* incomeQueue = ptQueueCreate(errbuf); + NULL_GUARD(incomeQueue, "Unable to create income queue: %s\n", errbuf); + + ptQueue* outcomeQueue = ptQueueCreate(errbuf); + + NULL_GUARD(outcomeQueue, "Unable to create outcome queue: %s\n", errbuf); + + ClientContext* cctx = malloc(sizeof(ClientContext)); + if(cctx == NULL) + { + ptQueueFree(incomeQueue); + ptQueueFree(outcomeQueue); + panic("Unable to allocate client context\n"); + } + cctx->clientId = client; + cctx->isAuthed = 0; + cctx->streamRegIterator = 0; + cctx->incomeQ = incomeQueue; + cctx->outcomeQ = outcomeQueue; + cctx->connectedAt = (uint64_t)time(NULL); + + ClientRegistrationEvent* ev = malloc(sizeof(ClientContext)); + if(ev == NULL) + { + ptQueueFree(incomeQueue); + ptQueueFree(outcomeQueue); + free(cctx); + + panic("Unable to allocate register event"); + } + ev->regType = REG_EVTYPE_CONNECT; + ev->ctx = cctx; + + ws_set_connection_context(client, cctx); + + ServerContext* ctx = ws_get_server_context(client); + + with_lock(&ctx->registerMutex) + { + int exitCode = ptQueuePush(ctx->regQueue, ev, errbuf); + if(exitCode) + { + ptQueueFree(incomeQueue); + ptQueueFree(outcomeQueue); + free(cctx); + + panic("Unable to push to reg queue: %s\n", errbuf); + } + } + + char *cli; + cli = ws_getaddress(client); + printf("Connection %lu opened, addr: %s\n", client, cli); +} + + +void onWsClose(ws_cli_conn_t client) +{ + pthread_t tid = pthread_self(); + char *cli; + cli = ws_getaddress(client); + printf("Connection %lu closed, addr: %s, thread id: %lu\n", client, cli, tid); + + + char errbuf[1024]; + + ClientContext* cctx = ws_get_connection_context(client); + if(cctx == NULL) + { + cctx = NULL; + printf("Unable to get client context\n"); + } + + + ServerContext* ctx = ws_get_server_context(client); + if(ctx == NULL) + { + printf("Unable to get server context\n"); + return; + } + + ClientRegistrationEvent* ev = malloc(sizeof(ClientContext)); + NULL_GUARD(ev); + ev->regType = REG_EVTYPE_CLOSE; + ev->ctx = cctx; + + + with_lock(&ctx->registerMutex) + { + int exitCode = ptQueuePush(ctx->regQueue, ev, errbuf); + if(exitCode) + { + panic("Unable to push to reg queue: %s\n", errbuf); + } + } +} + + + + +void onWsMessage(ws_cli_conn_t client, const unsigned char *msg, uint64_t size, int type) +{ + ClientContext* cctx = ws_get_connection_context(client); + if(cctx == NULL) + { + panic("Unable to get client context\n"); + } + + if (cctx->isAuthed == 0) + { + handle_auth(cctx, client, msg, size, type); + } + else + { + char errbuf[1024]; + BaseMessage* baseMsg = parseMessage(msg, size); + printf("msg\n"); + int isErr = ptQueuePush(cctx->incomeQ, baseMsg, errbuf); + if(isErr) + { + panic("Unable to dispatch client message: %s\n", errbuf); + } + } +} + + + diff --git a/src/proto/msg.c b/src/proto/msg.c new file mode 100644 index 0000000..934c26f --- /dev/null +++ b/src/proto/msg.c @@ -0,0 +1,85 @@ +#include "proto/msg.h" +#include "panic.h" + +#include "proto/enums.h" +#include "proto/pack.h" + +#include + +BaseMessage* parseMessage(const uint8_t* bytes, size_t size) +{ + const uint8_t headerSize = 9; + BaseMessage* msg = malloc(sizeof(BaseMessage)); + NULL_GUARD(msg); + uint64_t nonce = decodeBytesToU64(bytes); + + + msg->nonce = nonce; + + msg->packetType = bytes[8] >> 4; + msg->payloadHeader = bytes[8] & 0b1111; + + msg->payloadLen = size - headerSize; + + uint8_t* payload = malloc(sizeof(uint8_t) * (msg->payloadLen)); + NULL_GUARD(payload); + + memcpy(payload, bytes + headerSize, msg->payloadLen); + msg->payload = payload; + + return msg; +} + + +uint8_t* createControlNotifyMessage(uint64_t nonce, uint64_t clockCounter, uint8_t newEmulState, size_t* lenOut) +{ + *lenOut = 9 + 10; + uint8_t* outmsg = malloc(sizeof(uint8_t) * (*lenOut)); + NULL_GUARD(outmsg, "Unable to allocate message"); + + encodeUintToBytes(nonce, outmsg); + + outmsg[8] = PACKET_TYPE_CTRL << 4; + outmsg[8] |= CTRL_TYPE_NOTIF_STATE; + + outmsg[9] = NOTIF_TYPE_EXEC; + encodeUintToBytes(clockCounter, outmsg + 10); + outmsg[18] = newEmulState; + return outmsg; +} + + +uint8_t* createDoneRegMessage(uint64_t nonce, uint8_t X, uint64_t devId, uint64_t segId, uint64_t startAddr, uint64_t segLength, uint32_t regId, size_t* lenOut) +{ + *lenOut = 36 + 9; + uint8_t* outmsg = malloc(sizeof(uint8_t) * (*lenOut)); + NULL_GUARD(outmsg); + + encodeUintToBytes(nonce, outmsg); + outmsg[8] = (uint8_t)((PACKET_TYPE_STREAM << 4) | (X << 3) | STREAM_TYPE_REG_CONFIRM); + encodeUintToBytes(devId, outmsg + 9); + encodeUintToBytes(segId, outmsg + 9 + 8); + encodeUintToBytes(startAddr, outmsg + 9 + 8 + 8); + encodeUintToBytes(segLength, outmsg + 9 + 8 + 8 + 8); + encodeUintToBytes(regId, outmsg + 9 + 8 + 8 + 8 + 8); + return outmsg; +} + + +uint8_t* createStreamSegmentPush(uint8_t mode, uint32_t regId, uint64_t clockCounter, uint8_t* payload, size_t payloadLen, size_t* lenOut) +{ + *lenOut = 9 + 4 + 8 + payloadLen; + + uint8_t* outmsg = malloc(sizeof(uint8_t) * (*lenOut)); + NULL_GUARD(outmsg); + + uint64_t nonce = (uint64_t)~0; + encodeUintToBytes(nonce, outmsg); + outmsg[8] = (uint8_t)((PACKET_TYPE_STREAM << 4) | (mode << 3) | STREAM_TYPE_SEND); + + encodeUintToBytes(regId, outmsg + 9); + encodeUintToBytes(clockCounter, outmsg + 9 + 4); + memcpy(outmsg + 9 + 4 + 8, payload, payloadLen); + + return outmsg; +} diff --git a/src/proto/pack.c b/src/proto/pack.c new file mode 100644 index 0000000..7b98f6f --- /dev/null +++ b/src/proto/pack.c @@ -0,0 +1,57 @@ +#include "proto/pack.h" + +uint64_t decodeBytesToU64(const uint8_t* bytes) +{ + uint64_t ret = 0; + for(uint8_t i = 0; i < 8; i++) + { + ret |= (uint64_t)((uint64_t)(bytes[i]) << ((7 - i) * 8)); + } + return ret; +} + +uint32_t decodeBytesToU32(const uint8_t* bytes) +{ + uint32_t ret = 0; + for(uint8_t i = 0; i < 4; i++) + { + ret |= (uint32_t)((uint32_t)(bytes[i]) << ((3 - i) * 4)); + } + return ret; +} + +uint16_t decodeBytesToU16(const uint8_t* bytes) +{ + return ((uint16_t)(bytes[0] << 8)) | ((uint16_t)bytes[1]); +} + +void encodeUint8ToBytes(uint8_t num, uint8_t* tgt) +{ + tgt[0] = num; +} + + +void encodeUint16ToBytes(uint16_t num, uint8_t* tgt) +{ + for(uint8_t i = 0; i < 2; i++) + { + tgt[i] = (uint8_t)(((uint32_t)num) >> (2 * (1 - i))); + } +} + +void encodeUint32ToBytes(uint32_t num, uint8_t* tgt) +{ + for(uint8_t i = 0; i < 4; i++) + { + tgt[i] = (uint8_t)(((uint32_t)num) >> (4 * (3 - i))); + } +} + +void encodeUint64ToBytes(uint64_t num, uint8_t* tgt) +{ + for(uint8_t i = 0; i < 8; i++) + { + tgt[i] = (uint8_t)(((uint64_t)num) >> (8 * (7 - i))); + } +} + diff --git a/src/state.c b/src/state.c new file mode 100644 index 0000000..9dcc578 --- /dev/null +++ b/src/state.c @@ -0,0 +1,57 @@ +#include + +#include "state.h" + + +uint8_t switchNewEmulState(const uint8_t currentState, const uint8_t controlOp) +{ + + switch (currentState) + { + case EMUL_STATE_STILL: + { + if(controlOp == EMUL_STATE_OP_START) + { + return EMUL_STATE_EXEC; + } + break; + } + case EMUL_STATE_EXEC: + { + if(controlOp == EMUL_STATE_OP_PAUSE) + { + return EMUL_STATE_PAUSE; + } + else if(controlOp == EMUL_STATE_OP_STOP) + { + return EMUL_STATE_STOP; + } + break; + } + case EMUL_STATE_PAUSE: + { + if(controlOp == EMUL_STATE_OP_RESUME) + { + return EMUL_STATE_EXEC; + } + else if (controlOp == EMUL_STATE_OP_RESET) + { + return EMUL_STATE_STILL; + } + break; + } + case EMUL_STATE_STOP: + { + if(controlOp == EMUL_STATE_OP_RESET) + { + return EMUL_STATE_STILL; + } + break; + } + + default: + break; + } + + return currentState; +}