change queues, run exec speed optimizations

This commit is contained in:
2026-06-23 21:30:01 +03:00
parent 7a272788f8
commit 26e594cee6
22 changed files with 919 additions and 613 deletions

View File

@@ -4,14 +4,15 @@ SRC_DIR=src
INC_DIR=inc INC_DIR=inc
CC=gcc CC=gcc
OBJDUMP=objdump OBJDUMP=objdump
LIBS=deps/ptQueue/out/ptQueue.a deps/wsServer/libws.a deps/flatcc/lib/libflatccrt.a LIBS=deps/wsServer/libws.a deps/flatcc/lib/libflatccrt.a
LIBS_HEADERS=deps/ $(OPENSSL_INCLUDE) LIBS_HEADERS=deps/ $(OPENSSL_INCLUDE)
# flatcc runtime and generated reader headers use const-dropping casts; # flatcc runtime and generated reader headers use const-dropping casts;
# include both as system headers so -Wcast-qual doesn't apply to them # include both as system headers so -Wcast-qual doesn't apply to them
SYSTEM_INCLUDES=-isystem deps/flatcc/include/ -isystem $(PROTO_INC_DIR)/ SYSTEM_INCLUDES=-isystem deps/flatcc/include/ -isystem $(PROTO_INC_DIR)/
STATIC_LIBS=crypto STATIC_LIBS=crypto
STANDART=c23 STANDART=c23
OPTIMIZE=-Og # OPTIMIZE=-Og #3 -march=native
OPTIMIZE=-O3 -march=native
TARGET=main TARGET=main
FLATCC = deps/flatcc/bin/flatcc FLATCC = deps/flatcc/bin/flatcc
@@ -34,14 +35,16 @@ C_INCLUDES=-I$(INC_DIR)/ $(addprefix -I,$(LIBS_HEADERS))
C_DEFS=-D_POSIX_C_SOURCE=199309L C_DEFS=-D_POSIX_C_SOURCE=199309L
DISABLE_FLAGS=-Wno-unused-variable -Wno-unused-parameter -Wno-write-strings -Wno-pointer-arith -Wno-analyzer-infinite-loop DISABLE_FLAGS=-Wno-unused-variable -Wno-unused-parameter -Wno-write-strings -Wno-pointer-arith -Wno-analyzer-infinite-loop -Wno-unused-function -Wno-unused-but-set-variable
PEDANTIC_FLAGS=-pedantic -pedantic-errors $(DISABLE_FLAGS) -Wall -Wcast-align -Wcast-qual -Wconversion -Wduplicated-branches -Wduplicated-cond -Werror -Wextra -Wfloat-equal -Wlogical-op -Wpedantic -Wredundant-decls -Wsign-conversion PEDANTIC_FLAGS=-pedantic -pedantic-errors $(DISABLE_FLAGS) -Wall -Wcast-align -Wcast-qual -Wconversion -Wduplicated-branches -Wduplicated-cond -Werror -Wextra -Wfloat-equal -Wlogical-op -Wpedantic -Wredundant-decls -Wsign-conversion
ANALYZER_FLAGS=-fanalyzer -fdiagnostics-show-option -fdiagnostics-color=always ANALYZER_FLAGS=-fanalyzer -fdiagnostics-show-option -fdiagnostics-color=always
LSECTIONS=-ffunction-sections -fdata-sections -Wl,--gc-sections LSECTIONS=-ffunction-sections -fdata-sections -Wl,--gc-sections
CFLAGS=$(C_DEFS) -g $(C_INCLUDES) $(SYSTEM_INCLUDES) $(DEFINES) $(OPTIMIZE) --std=$(STANDART) $(PEDANTIC_FLAGS) $(ANALYZER_FLAGS) -MMD -MP # -fno-omit-frame-pointer -fno-inline -fno-lto # CFLAGS=$(C_DEFS) -g -pthread $(C_INCLUDES) $(SYSTEM_INCLUDES) $(DEFINES) $(OPTIMIZE) --std=$(STANDART) $(PEDANTIC_FLAGS) $(ANALYZER_FLAGS) -MMD -MP -fprofile-use # -fprofile-generate -fprofile-arcs -ftest-coverage # -fno-omit-frame-pointer -fno-inline -fno-lto # -fprofile-use # -fprofile-generate -fprofile-arcs -ftest-coverage # -fno-omit-frame-pointer -fno-inline -fno-lto # -fprofile-use # -fprofile-generate -fprofile-arcs -ftest-coverage # -fno-omit-frame-pointer -fno-inline # -fno-omit-frame-pointer -fno-inline -fno-lto
CFLAGS=$(C_DEFS) -g -pthread $(C_INCLUDES) $(SYSTEM_INCLUDES) $(DEFINES) $(OPTIMIZE) --std=$(STANDART) $(PEDANTIC_FLAGS) $(ANALYZER_FLAGS) -MMD -MP # -fprofile-generate -fprofile-arcs -ftest-coverage # -fno-omit-frame-pointer -fno-inline -fno-lto # -fprofile-use # -fprofile-generate -fprofile-arcs -ftest-coverage # -fno-omit-frame-pointer -fno-inline -fno-lto # -fprofile-use # -fprofile-generate -fprofile-arcs -ftest-coverage # -fno-omit-frame-pointer -fno-inline # -fno-omit-frame-pointer -fno-inline -fno-lto
STATICLIBS_FLAGS=$(addprefix -l,$(STATIC_LIBS)) STATICLIBS_FLAGS=$(addprefix -l,$(STATIC_LIBS))
# LFLAGS=$(OPTIMIZE) -g $(PEDANTIC_FLAGS) $(DEFINES) $(STATICLIBS_FLAGS) $(LSECTIONS) -lm -fno-omit-frame-pointer -fno-inline -fno-lto # LFLAGS=$(OPTIMIZE) -g -pthread $(PEDANTIC_FLAGS) $(DEFINES) $(STATICLIBS_FLAGS) $(LSECTIONS) -lm -fno-omit-frame-pointer -fno-inline -fno-lto
LFLAGS=$(OPTIMIZE) -g $(PEDANTIC_FLAGS) $(DEFINES) $(STATICLIBS_FLAGS) -flto -fuse-linker-plugin $(LSECTIONS) -lm -fno-omit-frame-pointer -fno-inline # LFLAGS=$(OPTIMIZE) -g -pthread $(PEDANTIC_FLAGS) $(DEFINES) $(STATICLIBS_FLAGS) -fuse-linker-plugin $(LSECTIONS) -lm -fprofile-use # -fprofile-generate -fprofile-arcs -ftest-coverage # -fno-omit-frame-pointer -fno-inline -fno-lto # -fprofile-use # -fprofile-generate -fprofile-arcs -ftest-coverage # -fno-omit-frame-pointer -fno-inline -fno-lto # -fno-omit-frame-pointer -fno-inline -flto -lgcov # -fno-omit-frame-pointer -fno-inline # #-fprofile-generate # -flto
LFLAGS=$(OPTIMIZE) -g -pthread $(PEDANTIC_FLAGS) $(DEFINES) $(STATICLIBS_FLAGS) -fuse-linker-plugin $(LSECTIONS) -lm # -fprofile-generate -fprofile-arcs -ftest-coverage # -fno-omit-frame-pointer -fno-inline -fno-lto # -fprofile-use # -fprofile-generate -fprofile-arcs -ftest-coverage # -fno-omit-frame-pointer -fno-inline -fno-lto # -fno-omit-frame-pointer -fno-inline -flto -lgcov # -fno-omit-frame-pointer -fno-inline # #-fprofile-generate # -flto
@@ -53,10 +56,18 @@ DEP = $(filter %.d, $(OBJECTS:.o=.d))
OBJECTS += $(LIBS) OBJECTS += $(LIBS)
vpath %.c $(sort $(dir $(C_SOURCES))) vpath %.c $(sort $(dir $(C_SOURCES)))
# asan: CFLAGS := $(filter-out -fanalyzer,$(CFLAGS)) -fsanitize=address,undefined
# asan: LFLAGS := $(filter-out -flto -fuse-linker-plugin,$(LFLAGS)) -fsanitize=address,undefined
# asan: build
# tsan: CFLAGS := $(filter-out -fanalyzer,$(CFLAGS)) -fsanitize=thread
# tsan: LFLAGS := $(filter-out -flto -fuse-linker-plugin,$(LFLAGS)) -fsanitize=thread
# tsan: build
all: build all: build
build: date deps Dir proto python-proto target compile_commands build: date deps Dir proto python-proto target
rebuild: clean | build rebuild: clean | build
@@ -121,10 +132,10 @@ $(FLATC):
@$(MAKE) -C deps flatbuffers/build/flatc @$(MAKE) -C deps flatbuffers/build/flatc
.PHONY: clean deps proto python-proto .PHONY: clean deps proto python-proto asan tsan clean
clean: clean:
@rm -rf $(BUILD_DIR)/* $(BUILD_DIR)/.* $(PROTO_INC_DIR)/* $(PROTO_SRC_DIR)/* @rm -rf $(BUILD_DIR)/* $(BUILD_DIR)/.* $(PROTO_INC_DIR)/* $(PROTO_SRC_DIR)/*
# @rm -f compile_commands.json
@echo -e '\033[0;31mCleaned\033[0m' @echo -e '\033[0;31mCleaned\033[0m'
.NOTPARALLEL: date target rebuild deps proto .NOTPARALLEL: date target rebuild deps proto
@@ -133,6 +144,3 @@ date:
@date @date
@echo -e '\033[0m' @echo -e '\033[0m'
compile_commands:
# @bear -- ./.gen_compile_commands.sh $(TARGET) $(CC) "$(CFLAGS)" "$(LFLAGS)" "$(OBJECTS)"

View File

@@ -27,13 +27,12 @@ void freeDevMem(device_mem_t* devMem)
{ {
free(devMem->memsegShifts); free(devMem->memsegShifts);
free(devMem->memsegSizes); free(devMem->memsegSizes);
free(devMem->memsegCellSizes);
free(devMem->rawCells); free(devMem->rawCells);
free(devMem->memreadCellAddrs); free(devMem->memreadCellAddrs);
free(devMem->memwriteCellAddrs); free(devMem->memwriteCellAddrs);
free(devMem->memwriteCellSegments); free(devMem->memwriteCellSegments);
free(devMem->memwriteValues[0]);
free(devMem->memwriteValues); free(devMem->memwriteValues);
free(devMem->memwriteWordLengths);
free(devMem->cells); free(devMem->cells);
free(devMem->smartAddrReadHandlers); free(devMem->smartAddrReadHandlers);
free(devMem->smartAddrWriteHandlers); free(devMem->smartAddrWriteHandlers);
@@ -118,10 +117,6 @@ device_mem_t* genDevMem(device_specs_t* devSpec, char* errbuf)
return NULL; return NULL;
} }
for(size_t i = 0; i < devSpec->memSpecsCount; i++)
{
// cellNames[i] = devSpec->memSpecs[i]->name;
}
for (uint8_t i = 0; i < devSpec->memSpecsCount; i++) for (uint8_t i = 0; i < devSpec->memSpecsCount; i++)
{ {
@@ -170,8 +165,7 @@ device_mem_t* genDevMem(device_specs_t* devSpec, char* errbuf)
return NULL; return NULL;
} }
void** memwriteValues = calloc(MEM_ACCESS_INTERCEPT_BUF_SIZE, sizeof(void*)); uint64_t* memwriteValues = calloc(MEM_ACCESS_INTERCEPT_BUF_SIZE, sizeof(uint64_t));
if(memwriteValues == NULL) if(memwriteValues == NULL)
{ {
sprintf(errbuf, "unable to allocate write interception addrs"); sprintf(errbuf, "unable to allocate write interception addrs");
@@ -186,43 +180,6 @@ device_mem_t* genDevMem(device_specs_t* devSpec, char* errbuf)
return NULL; return NULL;
} }
uint64_t* memwriteValuesContainers = calloc(MEM_ACCESS_INTERCEPT_BUF_SIZE, sizeof(uint64_t));
if(memwriteValuesContainers == NULL)
{
sprintf(errbuf, "unable to allocate write interception addrs");
free(devMem->memsegShifts);
free(devMem);
free(rawCells);
free(memreadCellAddrs);
free(cells);
free(cellNames);
free(memwriteCellAddrs);
free(memwriteCellSegments);
free(memwriteValues);
return NULL;
}
for(size_t i = 0; i < 64; i++)
{
memwriteValues[i] = &memwriteValuesContainers[i];
}
uint8_t* memwriteWordLengths = calloc(MEM_ACCESS_INTERCEPT_BUF_SIZE, sizeof(uint8_t));
if(memwriteWordLengths == NULL)
{
sprintf(errbuf, "unable to allocate write interception addrs");
free(devMem->memsegShifts);
free(devMem);
free(rawCells);
free(memreadCellAddrs);
free(cells);
free(cellNames);
free(memwriteCellAddrs);
free(memwriteCellSegments);
free(memwriteValues);
free(memwriteValuesContainers);
return NULL;
}
uint64_t smartAddrReadMask = 0; uint64_t smartAddrReadMask = 0;
uint64_t smartAddrWriteMask = 0; uint64_t smartAddrWriteMask = 0;
@@ -240,8 +197,6 @@ device_mem_t* genDevMem(device_specs_t* devSpec, char* errbuf)
free(memwriteCellAddrs); free(memwriteCellAddrs);
free(memwriteCellSegments); free(memwriteCellSegments);
free(memwriteValues); free(memwriteValues);
free(memwriteValuesContainers);
free(memwriteWordLengths);
return NULL; return NULL;
} }
@@ -259,8 +214,6 @@ device_mem_t* genDevMem(device_specs_t* devSpec, char* errbuf)
free(memwriteCellAddrs); free(memwriteCellAddrs);
free(memwriteCellSegments); free(memwriteCellSegments);
free(memwriteValues); free(memwriteValues);
free(memwriteValuesContainers);
free(memwriteWordLengths);
free(cells); free(cells);
free(cellNames); free(cellNames);
return NULL; return NULL;
@@ -270,7 +223,26 @@ device_mem_t* genDevMem(device_specs_t* devSpec, char* errbuf)
if(memsegSizes == NULL) if(memsegSizes == NULL)
{ {
sprintf(errbuf, "unable to allocate write interception handlers"); sprintf(errbuf, "unable to allocate memseg sizes");
free(smartAddrWriteHandlers);
free(smartAddrReadHandlers);
free(devMem->memsegShifts);
free(devMem);
free(rawCells);
free(memreadCellAddrs);
free(memwriteCellAddrs);
free(memwriteCellSegments);
free(memwriteValues);
free(cells);
free(cellNames);
return NULL;
}
uint8_t* memsegCellSizes = calloc(devSpec->memSpecsCount, sizeof(uint8_t));
if(memsegCellSizes == NULL)
{
sprintf(errbuf, "unable to allocate memseg cell sizes");
free(memsegSizes);
free(smartAddrWriteHandlers); free(smartAddrWriteHandlers);
free(smartAddrReadHandlers); free(smartAddrReadHandlers);
free(devMem->memsegShifts); free(devMem->memsegShifts);
@@ -280,31 +252,29 @@ device_mem_t* genDevMem(device_specs_t* devSpec, char* errbuf)
free(memwriteCellAddrs); free(memwriteCellAddrs);
free(memwriteCellSegments); free(memwriteCellSegments);
free(memwriteValues); free(memwriteValues);
free(memwriteValuesContainers);
free(memwriteWordLengths);
free(cells); free(cells);
free(cellNames); free(cellNames);
return NULL; return NULL;
} }
for(uint64_t i = 0; i < memTotalSize; i++) // for(uint64_t i = 0; i < memTotalSize; i++)
{ // {
smartAddrReadHandlers[i].func = NULL; // smartAddrReadHandlers[i].func = NULL;
smartAddrReadHandlers[i].ident = 0; // smartAddrReadHandlers[i].ident = 0;
smartAddrWriteHandlers[i].func = NULL; // smartAddrWriteHandlers[i].func = NULL;
smartAddrWriteHandlers[i].ident = 0; // smartAddrWriteHandlers[i].ident = 0;
} // }
for(uint64_t i = 0; i < memTotalSize; i++) // for(uint64_t i = 0; i < memTotalSize; i++)
{ // {
if((i & smartAddrReadMask) == smartAddrReadMask) // if((i & smartAddrReadMask) == smartAddrReadMask)
{ // {
smartAddrReadHandlers[i].func = NULL; // smartAddrReadHandlers[i].func = NULL;
} // }
} // }
if (devSpec->smartReadSpecsCount > 0) if (devSpec->smartReadSpecsCount > 0)
@@ -355,7 +325,6 @@ device_mem_t* genDevMem(device_specs_t* devSpec, char* errbuf)
devMem->memreadCellAddrs = memreadCellAddrs; devMem->memreadCellAddrs = memreadCellAddrs;
devMem->memwriteCellAddrs = memwriteCellAddrs; devMem->memwriteCellAddrs = memwriteCellAddrs;
devMem->memwriteCellSegments = memwriteCellSegments; devMem->memwriteCellSegments = memwriteCellSegments;
devMem->memwriteWordLengths = memwriteWordLengths;
devMem->memwriteValues = memwriteValues; devMem->memwriteValues = memwriteValues;
devMem->memreadLen = 0; devMem->memreadLen = 0;
devMem->memwriteLen = 0; devMem->memwriteLen = 0;
@@ -365,6 +334,7 @@ device_mem_t* genDevMem(device_specs_t* devSpec, char* errbuf)
devMem->smartAddrWriteHandlers = smartAddrWriteHandlers; devMem->smartAddrWriteHandlers = smartAddrWriteHandlers;
devMem->memsegNames = cellNames; devMem->memsegNames = cellNames;
devMem->memsegSizes = memsegSizes; devMem->memsegSizes = memsegSizes;
devMem->memsegCellSizes = memsegCellSizes;
memseg_metadata_t requiredSegments[] = MEMSEG_DEFINES; memseg_metadata_t requiredSegments[] = MEMSEG_DEFINES;
@@ -380,7 +350,8 @@ device_mem_t* genDevMem(device_specs_t* devSpec, char* errbuf)
const uint8_t seg_id = seg_def.seg_id; const uint8_t seg_id = seg_def.seg_id;
devMem->memsegShifts[seg_id] = memSegToGlobal(devSpec, i, 0); devMem->memsegShifts[seg_id] = memSegToGlobal(devSpec, i, 0);
devMem->memsegSizes[seg_id] = devSpec->memSpecs[i]->len; devMem->memsegSizes[seg_id] = devSpec->memSpecs[i]->len;
printf("set mem segment %d meta: +%lu/%lu \n", seg_id, devMem->memsegShifts[j], devMem->memsegSizes[j]); devMem->memsegCellSizes[seg_id] = devSpec->memSpecs[i]->wordLen;
printf("set mem segment %d meta: +%lu/%lu[%d] \n", seg_id, devMem->memsegShifts[seg_id], devMem->memsegSizes[seg_id], devMem->memsegCellSizes[seg_id]);
} }
} }
} }
@@ -404,6 +375,10 @@ uint8_t makeDeviceTick(device_public_context_t* devContext)
prog_counter_t _pc; prog_counter_t _pc;
READ_MEM(_pc, devInfo->deviceMem, MEMSEG_PC_SEG_NUM, MEMSEG_PC_ADDR, prog_counter_t) READ_MEM(_pc, devInfo->deviceMem, MEMSEG_PC_SEG_NUM, MEMSEG_PC_ADDR, prog_counter_t)
// printf("old PC is %d\n", _pc); // printf("old PC is %d\n", _pc);
if(_pc >= devInfo->deviceMem->memsegSizes[MEMSEG_PS])
{
_pc = 0;
}
uint8_t ticks = makeTick(&_pc, devInfo->instr, devInfo->deviceMem); uint8_t ticks = makeTick(&_pc, devInfo->instr, devInfo->deviceMem);
WRITE_MEM(devInfo->deviceMem, MEMSEG_PC_SEG_NUM, MEMSEG_PC_ADDR, prog_counter_t, _pc); WRITE_MEM(devInfo->deviceMem, MEMSEG_PC_SEG_NUM, MEMSEG_PC_ADDR, prog_counter_t, _pc);
// printf("new PC is %d\n", _pc); // printf("new PC is %d\n", _pc);
@@ -593,7 +568,7 @@ device_specs_t* parseSpecsFromConfig(const conf_dev_t* devConf, char* errbuf)
free(requiredSegmentsFoundMap); free(requiredSegmentsFoundMap);
return NULL; return NULL;
} }
specs->memSpecs[specNum]->name = calloc(strlen(segments[i]->name), sizeof(char)); specs->memSpecs[specNum]->name = calloc(strlen(segments[i]->name) + 1, sizeof(char));
if(specs->memSpecs[specNum]->name == NULL) if(specs->memSpecs[specNum]->name == NULL)
{ {
sprintf(errbuf, "unable to allocate spec %d name", i); sprintf(errbuf, "unable to allocate spec %d name", i);
@@ -837,6 +812,8 @@ uint8_t pubExtractPcounterSizeWords()
void reset (device_specs_t* specs, device_public_context_t* devInfo) void reset (device_specs_t* specs, device_public_context_t* devInfo)
{ {
// printf("reset device\n");
// uint8_t fuck = 0;
for(size_t i = 0; i < specs->memSpecsCount; i++) for(size_t i = 0; i < specs->memSpecsCount; i++)
{ {
if(i != MEMDATA_OPSIZE) if(i != MEMDATA_OPSIZE)
@@ -845,6 +822,7 @@ void reset (device_specs_t* specs, device_public_context_t* devInfo)
for(size_t j = 0; j < spec->len; j++) for(size_t j = 0; j < spec->len; j++)
{ {
((uint8_t*)devInfo->deviceMem->cells[i])[j] = 0; ((uint8_t*)devInfo->deviceMem->cells[i])[j] = 0;
// fuck++;
} }
} }
} }
@@ -852,3 +830,4 @@ void reset (device_specs_t* specs, device_public_context_t* devInfo)
devInfo->deviceMem->memwriteLen = 0; devInfo->deviceMem->memwriteLen = 0;
devInfo->deviceMem->memreadLen = 0; devInfo->deviceMem->memreadLen = 0;
} }

View File

@@ -4,13 +4,13 @@
#include <stdint.h> #include <stdint.h>
#include "wsServer/include/ws.h" #include "wsServer/include/ws.h"
#include "ptQueue/inc/ptQueue.h" #include "ptQueue/inc/spsc.h"
typedef struct { typedef struct {
ws_cli_conn_t clientId; ws_cli_conn_t clientId;
uint8_t isAuthed; uint8_t isAuthed;
ptQueue* incomeQ; queue_spsc_t* incomeQ;
ptQueue* outcomeQ; queue_spsc_t* outcomeQ;
uint64_t connectedAt; uint64_t connectedAt;
uint32_t streamRegIterator; uint32_t streamRegIterator;
} ClientContext; } ClientContext;

View File

@@ -3,10 +3,10 @@
#include "pub/libhmmmm/config.h" #include "pub/libhmmmm/config.h"
void freeMemSegConf(conf_mem_seg_t* memSegConf); // void freeMemSegConf(conf_mem_seg_t* memSegConf);
void freeMemConf(conf_mem_t* memConf); // void freeMemConf(conf_mem_t* memConf);
void freeConf(conf_dev_t* conf); void freeConf(conf_dev_t* conf);
void freeComposeId(char** id); void freeComposeId(char** id);
uint8_t compareComposeId(char** idA, char** idB); // uint8_t compareComposeId(char** idA, char** idB);
#endif #endif

View File

@@ -2,10 +2,12 @@
#define __CONTEXT_H__ #define __CONTEXT_H__
#include <pthread.h> #include <pthread.h>
#include <stddef.h>
#include <stdint.h> #include <stdint.h>
#include "ptQueue/inc/ptQueue.h" #include "ptQueue/inc/spsc.h"
#include "wsServer/include/ws.h" #include "wsServer/include/ws.h"
#include "ptQueue/inc/mpsc.h"
#include "linkedlist.h" #include "linkedlist.h"
#include "sized_ptr.h" #include "sized_ptr.h"
@@ -16,8 +18,8 @@
typedef struct { typedef struct {
SizedPtr* bufs; SizedPtr* bufs;
uint8_t buffersCount; uint8_t buffersCount;
_Atomic (uint8_t) readRequestIdx; CACHE_ALIGN _Atomic (uint8_t) readRequestIdx;
_Atomic (uint8_t) currWritingIdx; CACHE_ALIGN _Atomic (uint8_t) currWritingIdx;
} OutgoingBuffers; } OutgoingBuffers;
@@ -43,12 +45,15 @@ typedef struct {
// Cached DeviceIdMappingNotif broadcast message (sent to newly authed clients) // Cached DeviceIdMappingNotif broadcast message (sent to newly authed clients)
uint8_t* deviceIdMappingMsg; uint8_t* deviceIdMappingMsg;
size_t deviceIdMappingMsgLen; size_t deviceIdMappingMsgLen;
queue_mpsc_t* inMsgQueue;
queue_spsc_t* outMsgQueue;
} EmulContext; } EmulContext;
typedef struct { typedef struct {
pthread_mutex_t registerMutex; pthread_mutex_t registerMutex;
ptQueue* regQueue; queue_spsc_t* regQueue;
queue_mpsc_t* inMsgQueue;
uint8_t* accessToken; uint8_t* accessToken;
EmulContext* emulContext; EmulContext* emulContext;
} ServerContext; } ServerContext;

View File

@@ -4,8 +4,9 @@
#include <stdint.h> #include <stdint.h>
#include "context.h" #include "context.h"
#include "ptQueue/inc/spsc.h"
void broadcastClients(EmulContext* emulContext, uint8_t* msg, size_t msgLen); void broadcastClients(EmulContext* emulContext, uint8_t* msg, size_t msgLen);
void dispatchOutgoingMessage(OutgoingBuffers* outBufs, ws_cli_conn_t clientIdx, uint8_t* msg, size_t msgLen); void dispatchOutgoingMessage(queue_spsc_t* outMsgQueue, ws_cli_conn_t clientIdx, uint8_t* msg, size_t msgLen);
#endif #endif

View File

@@ -7,7 +7,7 @@
#include "proto/dial.h" #include "proto/dial.h"
#include "stream_reader.h" #include "stream_reader.h"
void unregisterClientStream(EmulContext* emulContext, ClientContext* ctx, uint32_t regId); // static void unregisterClientStream(EmulContext* emulContext, ClientContext* ctx, uint32_t regId);
void unregisterClientStreams(EmulContext* emulContext, ClientContext* ctx); void unregisterClientStreams(EmulContext* emulContext, ClientContext* ctx);
void handleIncomingStreamMessage( void handleIncomingStreamMessage(

View File

@@ -1,6 +1,7 @@
#ifndef __PROTO_MSG_H__ #ifndef __PROTO_MSG_H__
#define __PROTO_MSG_H__ #define __PROTO_MSG_H__
#include "client.h"
#include "wsServer/include/ws.h" #include "wsServer/include/ws.h"
#include <stdint.h> #include <stdint.h>
#include <stdlib.h> #include <stdlib.h>
@@ -10,6 +11,7 @@
typedef struct { typedef struct {
uint8_t* data; uint8_t* data;
size_t size; size_t size;
ClientContext* ctx;
} FbMessage; } FbMessage;
typedef struct { typedef struct {

View File

@@ -32,11 +32,11 @@ typedef struct
char** memsegNames; char** memsegNames;
uint64_t* memsegShifts; uint64_t* memsegShifts;
uint64_t* memsegSizes; uint64_t* memsegSizes;
uint8_t* memsegCellSizes;
uint64_t* memreadCellAddrs; uint64_t* memreadCellAddrs;
uint8_t* memwriteWordLengths;
uint8_t* memwriteCellSegments; uint8_t* memwriteCellSegments;
uint64_t* memwriteCellAddrs; uint64_t* memwriteCellAddrs;
void** memwriteValues; uint64_t* memwriteValues;
uint8_t memreadLen; uint8_t memreadLen;
uint8_t memwriteLen; uint8_t memwriteLen;
} device_mem_t; } device_mem_t;
@@ -87,10 +87,9 @@ typedef void (*ext_h_write_func)(uint64_t addr, void* rawCells, void* data, void
} \ } \
__mem->memwriteCellAddrs[__mem->memwriteLen] = __addr; \ __mem->memwriteCellAddrs[__mem->memwriteLen] = __addr; \
__mem->memwriteCellSegments[__mem->memwriteLen] = __segno; \ __mem->memwriteCellSegments[__mem->memwriteLen] = __segno; \
__mem->memwriteWordLengths[__mem->memwriteLen] = sizeof(__cell_t); \ /*__mem->memwriteWordLengths[__mem->memwriteLen] = sizeof(__cell_t);*/ \
*((__cell_t*)__mem->memwriteValues[__mem->memwriteLen]) = (__cell_t)(__val); \ (__mem->memwriteValues[__mem->memwriteLen]) = (__cell_t)(__val); \
__mem->memwriteLen += 1; \ __mem->memwriteLen += 1; \
} }
#endif // ifndef __HMMMM_PUB_MEM_H__ #endif // ifndef __HMMMM_PUB_MEM_H__

View File

@@ -6,80 +6,81 @@
#include "compose_device.h" #include "compose_device.h"
char** appendId(char** prev, const char* cur, char* errbuf) // char** appendId(char** prev, const char* cur, char* errbuf)
{ // {
if(prev == NULL) // if(prev == NULL)
{ // {
prev = malloc(sizeof(char*) * 2); // prev = malloc(sizeof(char*) * 2);
if(prev == NULL) // if(prev == NULL)
{ // {
snprintf(errbuf, 1024, "unable to allocate id"); // snprintf(errbuf, 1024, "unable to allocate id");
return NULL; // return NULL;
} // }
prev[0] = NULL; // prev[0] = NULL;
prev[1] = NULL; // prev[1] = NULL;
} // }
size_t clockIdLen = 0; // size_t clockIdLen = 0;
while (prev[clockIdLen] != NULL){clockIdLen++;} // while (prev[clockIdLen] != NULL){clockIdLen++;}
clockIdLen++; // clockIdLen++;
char** new = realloc(prev, sizeof(char*) * (clockIdLen + 1)); // char** new = realloc(prev, sizeof(char*) * (clockIdLen + 1));
if(new == NULL) // if(new == NULL)
{ // {
snprintf(errbuf, 1024, "unable to reallocate id"); // snprintf(errbuf, 1024, "unable to reallocate id");
freeComposeId(prev); // freeComposeId(prev);
return NULL; // return NULL;
} // }
prev = new; // prev = new;
prev[clockIdLen] = NULL; // prev[clockIdLen] = NULL;
size_t idLen = strlen(cur); // size_t idLen = strlen(cur);
prev[clockIdLen - 1] = malloc(sizeof(char) * (idLen + 1)); // prev[clockIdLen - 1] = malloc(sizeof(char) * (idLen + 1));
if(prev[clockIdLen - 1] == NULL) // if(prev[clockIdLen - 1] == NULL)
{ // {
snprintf(errbuf, 1024, "unable to allocate new id entry"); // snprintf(errbuf, 1024, "unable to allocate new id entry");
freeComposeId(prev); // freeComposeId(prev);
return NULL; // return NULL;
} // }
strcpy(prev[clockIdLen - 1], cur); // strcpy(prev[clockIdLen - 1], cur);
prev[clockIdLen - 1][idLen] = '\0'; // prev[clockIdLen - 1][idLen] = '\0';
return prev; // return prev;
} // }
void freeProjectionConfig(projection_conf_t* conf) // static void freeProjectionConfig(projection_conf_t* conf)
{ // {
if(conf == NULL) // if(conf == NULL)
{ // {
return; // return;
} // }
freeComposeId(conf->target); // freeComposeId(conf->target);
freeComposeId(conf->baseAt); // freeComposeId(conf->baseAt);
if(conf->baseSeg != NULL) // if(conf->baseSeg != NULL)
{ // {
free(conf->baseSeg); // free(conf->baseSeg);
} // }
conf->target = NULL; // conf->target = NULL;
conf->baseAt = NULL; // conf->baseAt = NULL;
conf->baseSeg = NULL; // conf->baseSeg = NULL;
free(conf); // free(conf);
} // }
void freeProjectionConfigs(projection_conf_t** confs)
{ // void freeProjectionConfigs(projection_conf_t** confs)
if(confs == NULL) // {
{ // if(confs == NULL)
return; // {
} // return;
size_t i = 0; // }
while(confs[i] != NULL) // size_t i = 0;
{ // while(confs[i] != NULL)
freeProjectionConfig(confs[i]); // {
confs[i] = NULL; // freeProjectionConfig(confs[i]);
} // confs[i] = NULL;
free(confs); // }
} // free(confs);
// }
device_handle_t** openComposeDevice(compose_dev_conf_t* conf, char* errbuf) device_handle_t** openComposeDevice(compose_dev_conf_t* conf, char* errbuf)

View File

@@ -4,7 +4,7 @@
#include "config.h" #include "config.h"
void xfree(void* p) static void xfree(void* p)
{ {
if(p != NULL) if(p != NULL)
{ {
@@ -28,21 +28,21 @@ void freeComposeId(char** id)
} }
uint8_t compareComposeId(char** idA, char** idB) // static uint8_t compareComposeId(char** idA, char** idB)
{ // {
size_t i = 0; // size_t i = 0;
while(idA[i] != NULL && idB[i] != NULL) // while(idA[i] != NULL && idB[i] != NULL)
{ // {
if(strcmp(idA[i], idB[i]) != 0) // if(strcmp(idA[i], idB[i]) != 0)
{ // {
return 0; // return 0;
} // }
i++; // i++;
} // }
return idA[i] == NULL && idB[i] == NULL; // return idA[i] == NULL && idB[i] == NULL;
} // }
void freeMemSegConf(conf_mem_seg_t* memSegConf) static void freeMemSegConf(conf_mem_seg_t* memSegConf)
{ {
if(memSegConf == NULL) if(memSegConf == NULL)
{ {
@@ -50,7 +50,7 @@ void freeMemSegConf(conf_mem_seg_t* memSegConf)
} }
xfree(memSegConf->name); xfree(memSegConf->name);
} }
void freeMemConf(conf_mem_t* memConf) static void freeMemConf(conf_mem_t* memConf)
{ {
if(memConf == NULL) if(memConf == NULL)
{ {

View File

@@ -26,7 +26,7 @@ typedef void (*_dlib_freeDevMem_t)(device_mem_t* mem);
typedef void (*_dlib_fillSmartReadSpecs_t)(void* specs, smart_read_spec_t* smartReadSpecs, uint64_t smartReadSpecsCount); typedef void (*_dlib_fillSmartReadSpecs_t)(void* specs, smart_read_spec_t* smartReadSpecs, uint64_t smartReadSpecsCount);
typedef void (*_dlib_fillSmartWriteSpecs_t)(void* specs, smart_write_spec_t* smartWriteSpecs, uint64_t smartWriteSpecsCount); typedef void (*_dlib_fillSmartWriteSpecs_t)(void* specs, smart_write_spec_t* smartWriteSpecs, uint64_t smartWriteSpecsCount);
instruction_simul_handlers_t* _fillInstructionSimul(void* handle) static instruction_simul_handlers_t* _fillInstructionSimul(void* handle)
{ {
instruction_simul_handlers_t* ret = malloc(sizeof(instruction_simul_handlers_t)); instruction_simul_handlers_t* ret = malloc(sizeof(instruction_simul_handlers_t));

View File

@@ -1,3 +1,4 @@
#include <signal.h>
#include <stdio.h> #include <stdio.h>
#include <time.h> #include <time.h>
#include <stdint.h> #include <stdint.h>
@@ -33,7 +34,9 @@
#include "linkedlist.h" #include "linkedlist.h"
#include "ptQueue/inc/ptQueue.h" #include "ptQueue/inc/spsc.h"
#include "ptQueue/inc/mpsc.h"
#include "wsServer/include/ws.h" #include "wsServer/include/ws.h"
#define _GNU_SOURCE #define _GNU_SOURCE
@@ -64,69 +67,77 @@
#include "proto/pack.h" #include "proto/pack.h"
void printMemory(void* cells, uint64_t cellsCount) volatile sig_atomic_t done = 0;
void term(int signum)
{ {
printf("Caught!\n");
for(uint64_t i = 0; i < cellsCount; i++) done = 1;
{
printf("%lu: 0x%04X", i, ((uint8_t*)cells)[i]);
// uint8_t wasRead = 0;
// for (uint8_t j = 0; j < mem->memreadLen; j++)
// {
// if(mem->memreadCellAddrs[j] == i)
// {
// wasRead = 1;
// break;
// }
// }
// uint8_t wasWrite = 0;
// for (uint8_t j = 0; j < mem->memwriteLen; j++)
// {
// if(mem->memwriteCellAddrs[j] == i)
// {
// wasWrite = 1;
// break;
// }
// }
// if (wasRead == 1)
// {
// printf("\t[was read]");
// }
// else
// {
// printf("\t[was not read]");
// }
// if (wasWrite == 1)
// {
// printf("\t[was written]");
// }
// else
// {
// printf("\t[was not written]");
// }
printf("\n");
}
} }
// static void printMemory(void* cells, uint64_t cellsCount)
// {
// for(uint64_t i = 0; i < cellsCount; i++)
// {
// printf("%lu: 0x%04X", i, ((uint8_t*)cells)[i]);
// // uint8_t wasRead = 0;
// // for (uint8_t j = 0; j < mem->memreadLen; j++)
// // {
// // if(mem->memreadCellAddrs[j] == i)
// // {
// // wasRead = 1;
// // break;
// // }
// // }
// // uint8_t wasWrite = 0;
// // for (uint8_t j = 0; j < mem->memwriteLen; j++)
// // {
// // if(mem->memwriteCellAddrs[j] == i)
// // {
// // wasWrite = 1;
// // break;
// // }
// // }
// // if (wasRead == 1)
// // {
// // printf("\t[was read]");
// // }
// // else
// // {
// // printf("\t[was not read]");
// // }
// // if (wasWrite == 1)
// // {
// // printf("\t[was written]");
// // }
// // else
// // {
// // printf("\t[was not written]");
// // }
// printf("\n");
// }
// }
void dummyWriteHandler(uint64_t ident, uint64_t addr, void* rawCells, void* data)
{ // void dummyWriteHandler(uint64_t ident, uint64_t addr, void* rawCells, void* data)
// printf("Intercepted write on 0x%lx: 0x%02x\n", addr, *((uint8_t*)data)); // {
printf("Intercepted write on 0x%lx: ", addr); // // printf("Intercepted write on 0x%lx: 0x%02x\n", addr, *((uint8_t*)data));
printf("0b"); // printf("Intercepted write on 0x%lx: ", addr);
for(uint8_t i = 0; i < 8; i++) // printf("0b");
{ // for(uint8_t i = 0; i < 8; i++)
printf("%d", (*((uint8_t*)data) >> (7 - i)) & 1); // {
} // printf("%d", (*((uint8_t*)data) >> (7 - i)) & 1);
printf("\n"); // }
((uint8_t*)rawCells)[addr] = *((uint8_t*)data); // printf("\n");
return; // ((uint8_t*)rawCells)[addr] = *((uint8_t*)data);
} // return;
// }
@@ -135,24 +146,24 @@ ext_h_read_func* interceptReadRouter;
ext_h_write_func* interceptWriteRouter; ext_h_write_func* interceptWriteRouter;
void** iterceptDevContextRouter; void** iterceptDevContextRouter;
void* readExt(uint64_t ident, uint64_t addr, void* rawCells) // static void* readExt(uint64_t ident, uint64_t addr, void* rawCells)
{ // {
void* devContext = iterceptDevContextRouter[ident]; // void* devContext = iterceptDevContextRouter[ident];
ext_h_read_func tgt = interceptReadRouter[ident]; // ext_h_read_func tgt = interceptReadRouter[ident];
return tgt(addr, rawCells, devContext); // return tgt(addr, rawCells, devContext);
} // }
void writeExt(uint64_t ident, uint64_t addr, void* rawCells, void* data) // static void writeExt(uint64_t ident, uint64_t addr, void* rawCells, void* data)
{ // {
void* devContext = iterceptDevContextRouter[ident]; // void* devContext = iterceptDevContextRouter[ident];
ext_h_write_func tgt = interceptWriteRouter[ident]; // ext_h_write_func tgt = interceptWriteRouter[ident];
tgt(addr, rawCells, data, devContext); // tgt(addr, rawCells, data, devContext);
} // }
// void *threadMain(void *param); // void *threadMain(void *param);
void my_sleep(int microseconds) { static void my_sleep(int microseconds) {
struct timespec ts; struct timespec ts;
ts.tv_sec = microseconds / 1000000; // секунды ts.tv_sec = microseconds / 1000000; // секунды
ts.tv_nsec = (microseconds % 1000000) * 1000; // наносекунды ts.tv_nsec = (microseconds % 1000000) * 1000; // наносекунды
@@ -232,99 +243,90 @@ static void compact_outgoing_buffer(SizedPtr* buf)
} }
void* outgoingMain(void* args) static void* outgoingMain(void* args)
{ {
OutgoingBuffers* outBufs = args; EmulContext* emulContext = args;
uint8_t bufsCount = outBufs->buffersCount; // OutgoingBuffers* outBufs = emulContext->outBufs;
uint8_t currBufIdx = 0; // uint8_t bufsCount = outBufs->buffersCount;
uint8_t currWritingIdxPtr = 0; // uint8_t currBufIdx = 0;
SizedPtr* curBuf = NULL; // uint8_t currWritingIdxPtr = 0;
while (currWritingIdxPtr != 0xFF) // SizedPtr* curBuf = NULL;
while(!done)
{ {
if(curBuf != NULL) OutgoingMessage* outMsg = NULL;
if(queue_spsc_pop(emulContext->outMsgQueue, (void**)&outMsg))
{ {
compact_outgoing_buffer(curBuf); if(outMsg != NULL)
OutgoingMessage* messages = curBuf->ptr;
for(size_t i = 0; i < curBuf->size; i++)
{ {
OutgoingMessage *outMsg = &messages[i];
if(outMsg->msg == NULL)
{
// Slot was merged into a CompactedMessage; skip.
continue;
}
ws_sendframe_bin(outMsg->clientIdx, (const char*)outMsg->msg, outMsg->msgLen); ws_sendframe_bin(outMsg->clientIdx, (const char*)outMsg->msg, outMsg->msgLen);
free(outMsg->msg); free(outMsg->msg);
outMsg->msg = NULL; free(outMsg);
} }
curBuf->size = 0;
atomic_store(&outBufs->readRequestIdx, currBufIdx);
currBufIdx++;
if(currBufIdx >= bufsCount)
{
currBufIdx = 0;
}
}
currWritingIdxPtr = atomic_load(&outBufs->currWritingIdx);
if(currBufIdx == currWritingIdxPtr)
{
atomic_store(&outBufs->readRequestIdx, currBufIdx);
if(curBuf == NULL)
{
// my_sleep(1);
}
else
{
curBuf = NULL;
}
}
else
{
curBuf = &outBufs->bufs[currBufIdx];
} }
} }
// while (currWritingIdxPtr != 0xFF)
// {
// if(curBuf != NULL)
// {
// compact_outgoing_buffer(curBuf);
// OutgoingMessage* messages = curBuf->ptr;
// for(size_t i = 0; i < curBuf->size; i++)
// {
// OutgoingMessage *outMsg = &messages[i];
// if(outMsg->msg == NULL)
// {
// // Slot was merged into a CompactedMessage; skip.
// continue;
// }
// ws_sendframe_bin(outMsg->clientIdx, (const char*)outMsg->msg, outMsg->msgLen);
// free(outMsg->msg);
// outMsg->msg = NULL;
// }
// curBuf->size = 0;
// atomic_store(&outBufs->readRequestIdx, currBufIdx);
// currBufIdx++;
// if(currBufIdx >= bufsCount)
// {
// currBufIdx = 0;
// }
// }
// currWritingIdxPtr = atomic_load(&outBufs->currWritingIdx);
// if(currBufIdx == currWritingIdxPtr)
// {
// atomic_store(&outBufs->readRequestIdx, currBufIdx);
// if(curBuf == NULL)
// {
// // my_sleep(1);
// }
// else
// {
// curBuf = NULL;
// }
// }
// else
// {
// curBuf = &outBufs->bufs[currBufIdx];
// }
// }
pthread_exit(0); pthread_exit(0);
} }
void mockDevice0(uint8_t* mem, uint64_t* readAddrs, size_t* readAddrsLen, uint64_t* writeAddrs, size_t* writeAddrsLen)
{
for(size_t i = 0; i < 8; i++)
{
readAddrs[*readAddrsLen] = i;
*readAddrsLen += 1;
}
uint64_t num = decodeBytesToU64(mem); static void dispatchStreamSegment(EmulContext* emulContext, StreamReg* reg, device_mem_t* mem)
num += 1;
encodeUintToBytes(num, mem);
for(size_t i = 0; i < 8; i++)
{
writeAddrs[*writeAddrsLen] = i;
*writeAddrsLen += 1;
}
}
void mockDevice1(uint8_t* mem, uint64_t* readAddrs, size_t* readAddrsLen, uint64_t* writeAddrs, size_t* writeAddrsLen)
{
}
void dispatchStreamSegment(EmulContext* emulContext, StreamReg* reg, device_mem_t* mem)
{ {
size_t mlen = 0; size_t mlen = 0;
uint8_t* msg = fb_build_stream_data_push( uint8_t* msg = fb_build_stream_data_push(
&emulContext->stream_builder, &emulContext->stream_builder,
UINT64_MAX, reg->regId, *emulContext->clockCounter, UINT64_MAX, reg->regId, *emulContext->clockCounter,
(uint8_t*)(((uint64_t)(mem->cells[reg->segId])) + (uint64_t)reg->startAddr), reg->segLen, &mlen); (uint8_t*)(((uint64_t)(mem->cells[reg->segId])) + (uint64_t)reg->startAddr), reg->segLen, &mlen);
dispatchOutgoingMessage(emulContext->outBufs, reg->clientContext->clientId, msg, mlen); dispatchOutgoingMessage(emulContext->outMsgQueue, reg->clientContext->clientId, msg, mlen);
} }
void dispatchMemAccessNotifications(EmulContext* emulContext, DeviceSegStreamReg* deviceRegs, device_mem_t* mem, uint64_t* addrs, size_t addrsLen, uint8_t mode) static inline void dispatchMemAccessNotifications(EmulContext* emulContext, DeviceSegStreamReg* deviceRegs, device_mem_t* mem, uint64_t* addrs, size_t addrsLen, uint8_t mode)
{ {
if(deviceRegs->regCount == 0) if(deviceRegs->regCount == 0)
{ {
@@ -338,46 +340,66 @@ void dispatchMemAccessNotifications(EmulContext* emulContext, DeviceSegStreamReg
size_t dispatchRegsCnt = 0; size_t dispatchRegsCnt = 0;
// uint8_t dispatchedRegMap[1024 * 16] = {0}; // uint8_t dispatchedRegMap[1024 * 16] = {0};
for(size_t i = 0; i < addrsLen; i++)
for(size_t regIdx = 0; regIdx < deviceRegs->regCount; regIdx++)
{ {
uint64_t addr = addrs[i]; StreamReg* reg = &deviceRegs->regs[regIdx];
for(size_t regIdx = 0; regIdx < deviceRegs->regCount; regIdx++) if(reg->mode == mode)
{ {
StreamReg* reg = &deviceRegs->regs[regIdx]; for(size_t i = 0; i < addrsLen; i++)
if(reg->mode == mode)
{ {
const uint64_t addr = addrs[i];
if(reg->startGlobalAddr <= addr && addr <= reg->startGlobalAddr + reg->segLen) if(reg->startGlobalAddr <= addr && addr <= reg->startGlobalAddr + reg->segLen)
{ {
// if(dispatchedRegMap[reg->regId] == 1) dispatchRegs[dispatchRegsCnt] = reg;
// { dispatchRegsCnt++;
// // break; break;
// }
// else
// {
// dispatchRegs[dispatchRegsCnt] = reg;
// dispatchedRegMap[reg->regId] = 1;
// dispatchRegsCnt++;
// }
uint8_t isDuplicate = 0;
for(size_t j = 0; j < dispatchRegsCnt; j++)
{
if(dispatchRegs[j] == reg)
{
isDuplicate = 1;
break;
}
}
if(!isDuplicate)
{
dispatchRegs[dispatchRegsCnt] = reg;
// dispatchedRegMap[reg->regId] = 1;
dispatchRegsCnt++;
}
} }
} }
} }
} }
// for(size_t i = 0; i < addrsLen; i++)
// {
// uint64_t addr = addrs[i];
// for(size_t regIdx = 0; regIdx < deviceRegs->regCount; regIdx++)
// {
// StreamReg* reg = &deviceRegs->regs[regIdx];
// if(reg->mode == mode)
// {
// if(reg->startGlobalAddr <= addr && addr <= reg->startGlobalAddr + reg->segLen)
// {
// // if(dispatchedRegMap[reg->regId] == 1)
// // {
// // // break;
// // }
// // else
// // {
// // dispatchRegs[dispatchRegsCnt] = reg;
// // dispatchedRegMap[reg->regId] = 1;
// // dispatchRegsCnt++;
// // }
// uint8_t isDuplicate = 0;
// for(size_t j = 0; j < dispatchRegsCnt; j++)
// {
// if(dispatchRegs[j] == reg)
// {
// isDuplicate = 1;
// break;
// }
// }
// if(!isDuplicate)
// {
// dispatchRegs[dispatchRegsCnt] = reg;
// // dispatchedRegMap[reg->regId] = 1;
// dispatchRegsCnt++;
// }
// }
// }
// }
// }
// if(dispatchRegsCnt == 0) // if(dispatchRegsCnt == 0)
// { // {
// printf("No memory dispatched\n"); // printf("No memory dispatched\n");
@@ -391,7 +413,7 @@ void dispatchMemAccessNotifications(EmulContext* emulContext, DeviceSegStreamReg
} }
uint64_t getCurrentUsec() static uint64_t getCurrentUsec()
{ {
struct timeval tv; struct timeval tv;
gettimeofday(&tv,NULL); gettimeofday(&tv,NULL);
@@ -406,10 +428,218 @@ static void print_usage(const char* progname)
fprintf(stderr, " --host HOST Listen host (default: localhost)\n"); fprintf(stderr, " --host HOST Listen host (default: localhost)\n");
} }
static void outbufMaintence(EmulContext* emulContext)
{
OutgoingBuffers* outBufs = emulContext->outBufs;
uint8_t readReqIdx = atomic_load(&outBufs->readRequestIdx);
if(readReqIdx == outBufs->currWritingIdx || outBufs->bufs[outBufs->currWritingIdx].size >= outBufs->bufs[outBufs->currWritingIdx].allocatedSize / 2)
{
uint8_t newWriteIdx = outBufs->currWritingIdx + 1;
if(newWriteIdx >= outBufs->buffersCount)
{
newWriteIdx = 0;
}
if(outBufs->bufs[outBufs->currWritingIdx].size != 0 || readReqIdx != newWriteIdx)
{
while(readReqIdx == newWriteIdx)
{
printf("FLOOD CONTROL - SLEEP\n");
my_sleep(100);
readReqIdx = atomic_load(&outBufs->readRequestIdx);
}
atomic_store(&outBufs->currWritingIdx, newWriteIdx);
outBufs->bufs[outBufs->currWritingIdx].size = 0;
}
}
}
static StreamReg* dispatchRegs[1024 * 16];
static void commitDevMem(EmulContext* emulContext)
{
for(size_t di = 0; di < emulContext->devicesCount; di++)
{
device_handle_t* dev = (device_handle_t*)emulContext->deviceHandles[di];
device_mem_t* devMem = dev->ctx->deviceMem;
if(dev->clockCycleCounter == 0)
{
DeviceSegStreamReg* deviceRegs = emulContext->deviceStreamRegs[di];
size_t dispatchRegsCnt = 0;
if(devMem->memwriteLen > 0)
{
// printf("device %lu has %d writes\n", di, devMem->memwriteLen);
// uint64_t globalWriteAddrs[MEM_ACCESS_INTERCEPT_BUF_SIZE] = {0};
for(size_t i = 0; i < devMem->memwriteLen; i++)
{
const uint8_t seg = devMem->memwriteCellSegments[i];
const uint8_t wordLen = devMem->memsegCellSizes[seg]; // devMem->memwriteWordLengths[i];
const uint64_t addr = devMem->memwriteCellAddrs[i];
uint64_t val = devMem->memwriteValues[i];
switch(wordLen)
{
case 1:
{
((uint8_t*)(devMem->cells[seg]))[addr] = ((uint8_t)val);
// printf("[DEV/WRITE] %d -> [%d].%lu\n", *(uint8_t*)val, seg, addr);
break;
}
case 2:
{
((uint16_t*)(devMem->cells[seg]))[addr] = ((uint16_t)val);
// printf("[DEV/WRITE] %d -> [%d].%lu\n", *(uint16_t*)val, seg, addr);
break;
}
case 4:
{
((uint32_t*)(devMem->cells[seg]))[addr] = ((uint32_t)val);
// printf("[DEV/WRITE] %d -> [%d].%lu\n", *(uint32_t*)val, seg, addr);
break;
}
case 8:
{
((uint64_t*)(devMem->cells[seg]))[addr] = ((uint64_t)val);
// printf("[DEV/WRITE] %lu -> [%d].%lu\n", *(uint64_t*)val, seg, addr);
break;
}
default:
{
printf("invalid word size: %d\n", wordLen);
}
}
size_t globalAddr = devMem->memsegShifts[seg] + addr;
for(size_t regIdx = 0; regIdx < deviceRegs->regCount; regIdx++)
{
StreamReg* reg = &deviceRegs->regs[regIdx];
if(reg->mode == STREAM_MODE_WRITE)
{
if(reg->startGlobalAddr <= globalAddr && globalAddr <= reg->startGlobalAddr + reg->segLen)
{
uint8_t isDuplicate = 0;
for(size_t j = 0; j < dispatchRegsCnt; j++)
{
if(dispatchRegs[j] == reg)
{
isDuplicate = 1;
break;
}
}
if(!isDuplicate)
{
dispatchRegs[dispatchRegsCnt] = reg;
dispatchRegsCnt++;
}
}
}
}
// globalWriteAddrs[i] = globalAddr;
}
// for(size_t i = 0; i < dispatchRegsCnt; i++)
// {
// dispatchStreamSegment(emulContext, dispatchWriteRegs[i], devMem);
// }
// dispatchMemAccessNotifications(emulContext, emulContext->deviceStreamRegs[di], devMem, globalWriteAddrs, (size_t)devMem->memwriteLen, STREAM_MODE_WRITE);
devMem->memwriteLen = 0;
}
else
{
// printf("device %lu has no writes\n", di);
}
if(devMem->memreadLen > 0)
{
// StreamReg* dispatchReadRegs[1024 * 16];
// size_t dispatchRegsCnt = 0;
for(size_t regIdx = 0; regIdx < deviceRegs->regCount; regIdx++)
{
StreamReg* reg = &deviceRegs->regs[regIdx];
if(reg->mode == STREAM_MODE_READ)
{
for(size_t i = 0; i < devMem->memreadLen; i++)
{
size_t globalAddr = devMem->memreadCellAddrs[i];
if(reg->startGlobalAddr <= globalAddr && globalAddr <= reg->startGlobalAddr + reg->segLen)
{
uint8_t isDuplicate = 0;
for(size_t j = 0; j < dispatchRegsCnt; j++)
{
if(dispatchRegs[j] == reg)
{
isDuplicate = 1;
break;
}
}
if(!isDuplicate)
{
dispatchRegs[dispatchRegsCnt] = reg;
dispatchRegsCnt++;
}
}
}
}
}
// for(size_t i = 0; i < dispatchRegsCnt; i++)
// {
// dispatchStreamSegment(emulContext, dispatchReadRegs[i], devMem);
// }
// dispatchMemAccessNotifications(emulContext, emulContext->deviceStreamRegs[di], devMem, devMem->memreadCellAddrs, (size_t)devMem->memreadLen, STREAM_MODE_READ);
devMem->memreadLen = 0;
}
for(size_t i = 0; i < dispatchRegsCnt; i++)
{
dispatchStreamSegment(emulContext, dispatchRegs[i], devMem);
}
}
}
}
static void tickDevices(EmulContext* emulContext, uint64_t clockCounter)
{
for (size_t di = 0; di < emulContext->devicesCount; di++)
{
device_handle_t* dev = (device_handle_t*)emulContext->deviceHandles[di];
if (clockCounter % dev->clockDivider == 0)
{
if(dev->clockCycleCounter == 0)
{
// printf("clock device %lu\n", di);
// device_mem_t* devMem = dev->ctx->deviceMem;
dev->clockCycleCounter = dev->lib->makeDeviceTick(dev->ctx);
}
}
}
}
int main(int argc, char** argv) int main(int argc, char** argv)
{ {
uint16_t cli_port = 8181; struct sigaction action;
const char* cli_host = "localhost"; memset(&action, 0, sizeof(action));
action.sa_handler = term;
sigaction(SIGTERM, &action, NULL);
uint16_t cli_port = 9000;
const char* cli_host = "0.0.0.0";
const char* cli_token = NULL; const char* cli_token = NULL;
for (int i = 1; i < argc; i++) { for (int i = 1; i < argc; i++) {
@@ -433,12 +663,17 @@ int main(int argc, char** argv)
return 1; return 1;
} }
} }
char errbuf[1024]; // char errbuf[1024];
pthread_mutex_t mtx; pthread_mutex_t mtx;
pthread_mutex_init(&mtx, NULL); pthread_mutex_init(&mtx, NULL);
ptQueue* regQ = ptQueueCreate(errbuf); queue_spsc_t regQ;
NULL_GUARD(regQ, "Unable to create reg q: %s\n", errbuf); if(!queue_spsc_init(&regQ, 65536))
{
panic("Unable to create reg q");
}
// queue_spsc_t* regQ = queue_spsc_init(errbuf);
// NULL_GUARD(regQ, "Unable to create reg q: %s\n", errbuf);
size_t deviceCount = 0; size_t deviceCount = 0;
DeviceSegStreamReg** deviceStreamRegs = NULL; DeviceSegStreamReg** deviceStreamRegs = NULL;
@@ -455,10 +690,10 @@ int main(int argc, char** argv)
for(size_t i = 0; i < outBufsCount; i++) for(size_t i = 0; i < outBufsCount; i++)
{ {
OutgoingMessage* messages = malloc(sizeof(OutgoingMessage) * 128); OutgoingMessage* messages = malloc(sizeof(OutgoingMessage) * 1024);
NULL_GUARD(messages); NULL_GUARD(messages);
bufs[i].ptr = messages; bufs[i].ptr = messages;
bufs[i].allocatedSize = 128; bufs[i].allocatedSize = 1024;
bufs[i].size = 0; bufs[i].size = 0;
for(size_t j = 0; j < bufs[i].allocatedSize; j++) for(size_t j = 0; j < bufs[i].allocatedSize; j++)
@@ -477,12 +712,19 @@ int main(int argc, char** argv)
}; };
queue_mpsc_t* inMsgQueue = calloc(1, sizeof(queue_mpsc_t));
queue_mpsc_init(inMsgQueue, 65536);
queue_spsc_t outMsgQueue;
queue_spsc_init(&outMsgQueue, 65536);
LinkedListEntry* clientsLinkedListHead = NULL; LinkedListEntry* clientsLinkedListHead = NULL;
uint64_t clockCounter = 0; uint64_t clockCounter = 0;
uint64_t tickTarget = 0; uint64_t tickTarget = 0;
uint8_t resetRequest = 0; uint8_t resetRequest = 0;
uint8_t utilizedFlag = 0; uint8_t utilizedFlag = 0;
EmulContext emulContext = { EmulContext emulContext = {
&resetRequest, &resetRequest,
@@ -503,13 +745,16 @@ int main(int argc, char** argv)
0, /* simRateLimit */ 0, /* simRateLimit */
NULL, /* deviceIdMappingMsg */ NULL, /* deviceIdMappingMsg */
0, /* deviceIdMappingMsgLen */ 0, /* deviceIdMappingMsgLen */
inMsgQueue,
&outMsgQueue,
}; };
if (flatcc_builder_init(&emulContext.stream_builder)) { if (flatcc_builder_init(&emulContext.stream_builder)) {
panic("flatcc_builder_init failed\n"); panic("flatcc_builder_init failed\n");
} }
static uint8_t default_token[] = "jtd3dhBWG2p73mkRCyaZ63KLmOjl5OgItIDAPa8HcyGQI2SOFA6hsJVr1NiG2Y96AvwyYSShr4GAL3sjJEo1wsmCo2vuhiqRhj8HVRrOVocD2e1jtuMwMsCmizYakT63"; // static uint8_t default_token[] = "jtd3dhBWG2p73mkRCyaZ63KLmOjl5OgItIDAPa8HcyGQI2SOFA6hsJVr1NiG2Y96AvwyYSShr4GAL3sjJEo1wsmCo2vuhiqRhj8HVRrOVocD2e1jtuMwMsCmizYakT63";
static uint8_t default_token[] = "test_token";
uint8_t* access_token = default_token; uint8_t* access_token = default_token;
/* If --token was given, copy it into a mutable buffer */ /* If --token was given, copy it into a mutable buffer */
uint8_t* cli_token_buf = NULL; uint8_t* cli_token_buf = NULL;
@@ -526,7 +771,8 @@ int main(int argc, char** argv)
ServerContext ctx = { ServerContext ctx = {
mtx, mtx,
regQ, &regQ,
inMsgQueue,
access_token, access_token,
&emulContext, &emulContext,
}; };
@@ -535,7 +781,7 @@ int main(int argc, char** argv)
pthread_attr_t outAttr; pthread_attr_t outAttr;
pthread_attr_init(&outAttr); pthread_attr_init(&outAttr);
pthread_create(&outTid, &outAttr, outgoingMain, &outBufs); pthread_create(&outTid, &outAttr, outgoingMain, &emulContext);
pthread_detach(outTid); pthread_detach(outTid);
@@ -550,22 +796,24 @@ int main(int argc, char** argv)
.evs.onmessage = &onWsMessage .evs.onmessage = &onWsMessage
}); });
ptQueueElem* regQueueTail = regQ->tail; // ptQueueElem* regQueueTail = regQ->tail;
uint16_t clients_try_timer = 1000; uint16_t clients_try_timer = 10000;
uint64_t lastTickAt = getCurrentUsec(); uint64_t lastTickAt = getCurrentUsec();
uint64_t lastTickCountWindowAt = getCurrentUsec(); // uint64_t lastTickCountWindowAt = getCurrentUsec();
uint64_t lastTickCounter = 0; // uint64_t lastTickCounter = 0;
while(1) while(1)
{ {
ClientRegistrationEvent* payload = regQueueTail->payload; ClientRegistrationEvent* payload = NULL;
queue_spsc_pop(&regQ, (void**)&payload);
// ClientRegistrationEvent* payload = regQueueTail->payload;
if(payload != NULL) if(payload != NULL)
{ {
printf("Got reg queue data\n"); // printf("Got reg queue data\n");
handleRegEvent(&emulContext, payload); handleRegEvent(&emulContext, payload);
regQueueTail = regQueueTail->nextEl; // regQueueTail = regQueueTail->nextEl;
utilizedFlag = 1; utilizedFlag = 1;
} }
@@ -573,38 +821,19 @@ int main(int argc, char** argv)
if (clients_try_timer == 0 || emulState != EMUL_STATE_EXEC) if (clients_try_timer == 0 || emulState != EMUL_STATE_EXEC)
{ {
if(getCurrentUsec() - lastTickCountWindowAt > 1000000) // if(getCurrentUsec() - lastTickCountWindowAt > 1000000)
{ // {
// uint64_t dtimeUs = getCurrentUsec() - lastTickCountWindowAt; // uint64_t dtimeUs = getCurrentUsec() - lastTickCountWindowAt;
// lastTickCountWindowAt = getCurrentUsec(); // lastTickCountWindowAt = getCurrentUsec();
// uint64_t dtick = clockCounter - lastTickCounter; // uint64_t dtick = clockCounter - lastTickCounter;
// lastTickCounter = clockCounter; // lastTickCounter = clockCounter;
// double rate = ((double)dtick) / (((double)dtimeUs) / 1000000); // double rate = ((double)dtick) / (((double)dtimeUs) / 1000000);
// printf("clock rate: %f\n", rate); // printf("clock rate: %f\n", rate);
} // }
clients_try_timer = 10000;
handleAllClients(&emulContext); handleAllClients(&emulContext);
clients_try_timer = 1000; // outbufMaintence(&emulContext);
uint8_t readReqIdx = atomic_load(&outBufs.readRequestIdx);
// uint8_t readReqIdx = outBufs.readRequestIdx;
if(readReqIdx == outBufs.currWritingIdx || outBufs.bufs[outBufs.currWritingIdx].size >= outBufs.bufs[outBufs.currWritingIdx].allocatedSize / 2)
{
uint8_t newWriteIdx = outBufs.currWritingIdx + 1;
if(outBufs.bufs[outBufs.currWritingIdx].size != 0 || readReqIdx != newWriteIdx)
{
if(newWriteIdx >= outBufs.buffersCount)
{
newWriteIdx = 0;
}
while(readReqIdx == newWriteIdx)
{
my_sleep(100);
readReqIdx = atomic_load(&outBufs.readRequestIdx);
}
atomic_store(&outBufs.currWritingIdx, newWriteIdx);
outBufs.bufs[outBufs.currWritingIdx].size = 0;
}
}
} }
else else
{ {
@@ -613,15 +842,29 @@ int main(int argc, char** argv)
if(resetRequest) if(resetRequest)
{ {
for(size_t di = 0; di < emulContext.devicesCount; di++) if(emulState != EMUL_STATE_STILL)
{ {
device_handle_t* dev = (device_handle_t*)emulContext.deviceHandles[di]; printf("running full reset\n");
dev->lib->reset(dev->specs, dev->ctx); for(size_t di = 0; di < emulContext.devicesCount; di++)
dev->clockCycleCounter = 0; {
dev->clockCycleLimit = 0; device_handle_t* dev = (device_handle_t*)emulContext.deviceHandles[di];
dev->lib->reset(dev->specs, dev->ctx);
dev->clockCycleCounter = 0;
dev->clockCycleLimit = 0;
}
emulState = EMUL_STATE_STILL;
printf("[RESET/EXEC] state -> %u\n", emulState);
size_t msg_len;
uint8_t* out = fb_build_exec_notify(0, 0, emulState, &msg_len);
broadcastClients(&emulContext, out, msg_len);
} }
clockCounter = 0; clockCounter = 0;
resetRequest = 0; resetRequest = 0;
} }
@@ -647,104 +890,11 @@ int main(int argc, char** argv)
} }
} }
for(size_t di = 0; di < emulContext.devicesCount; di++) // printf("tick %lu\n", clockCounter);
{
device_handle_t* dev = (device_handle_t*)emulContext.deviceHandles[di];
device_mem_t* devMem = dev->ctx->deviceMem;
if(dev->clockCycleCounter == 0)
{
if(devMem->memwriteLen > 0) commitDevMem(&emulContext);
{
// printf("device %lu has %d writes\n", di, devMem->memwriteLen);
uint64_t globalWriteAddrs[MEM_ACCESS_INTERCEPT_BUF_SIZE] = {0};
for(size_t i = 0; i < devMem->memwriteLen; i++)
{
const uint8_t seg = devMem->memwriteCellSegments[i];
const uint8_t wordLen = devMem->memwriteWordLengths[i];
const uint64_t addr = devMem->memwriteCellAddrs[i];
void* val = devMem->memwriteValues[i];
// const uint64_t segLen = devMem->memsegSizes[seg]; tickDevices(&emulContext, clockCounter);
// // device_specs_t* spec = dev->specs;
// // spec
// if(addr >= segLen)
// {
// printf("write out of bounds of segment of len %lu: [%d].%lu\n", segLen, seg, addr);
// emulState = EMUL_STATE_PAUSE;
// }
// else
{
switch(wordLen)
{
case 1:
{
((uint8_t*)(devMem->cells[seg]))[addr] = *((uint8_t*)val);
// printf("[DEV/WRITE] %d -> [%d].%lu\n", *(uint8_t*)val, seg, addr);
break;
}
case 2:
{
((uint16_t*)(devMem->cells[seg]))[addr] = *((uint16_t*)val);
// printf("[DEV/WRITE] %d -> [%d].%lu\n", *(uint16_t*)val, seg, addr);
break;
}
case 4:
{
((uint32_t*)(devMem->cells[seg]))[addr] = *((uint32_t*)val);
// printf("[DEV/WRITE] %d -> [%d].%lu\n", *(uint32_t*)val, seg, addr);
break;
}
case 8:
{
((uint64_t*)(devMem->cells[seg]))[addr] = *((uint64_t*)val);
// printf("[DEV/WRITE] %lu -> [%d].%lu\n", *(uint64_t*)val, seg, addr);
break;
}
default:
{
printf("invalid word size: %d\n", wordLen);
}
}
}
globalWriteAddrs[i] = devMem->memsegShifts[seg] + addr;
}
dispatchMemAccessNotifications(&emulContext, emulContext.deviceStreamRegs[di], devMem, globalWriteAddrs, (size_t)devMem->memwriteLen, STREAM_MODE_WRITE);
devMem->memwriteLen = 0;
}
else
{
// printf("device %lu has no writes\n", di);
}
if(devMem->memreadLen > 0)
{
dispatchMemAccessNotifications(&emulContext, emulContext.deviceStreamRegs[di], devMem, devMem->memreadCellAddrs, (size_t)devMem->memreadLen, STREAM_MODE_READ);
devMem->memreadLen = 0;
}
}
}
for (size_t di = 0; di < emulContext.devicesCount; di++)
{
device_handle_t* dev = (device_handle_t*)emulContext.deviceHandles[di];
if (clockCounter % dev->clockDivider == 0)
{
if(dev->clockCycleCounter == 0)
{
// printf("clock device %lu\n", di);
// device_mem_t* devMem = dev->ctx->deviceMem;
dev->clockCycleCounter = dev->lib->makeDeviceTick(dev->ctx);
}
}
}
clockCounter++; clockCounter++;
@@ -764,14 +914,24 @@ int main(int argc, char** argv)
} }
else if(!utilizedFlag) else if(!utilizedFlag)
{ {
// if(emulState != EMUL_STATE_EXEC)
// {
// my_sleep(100000);
// }
// my_sleep(1000); // my_sleep(1000);
} }
if(done)
{
break;
}
} }
atomic_store(&outBufs.currWritingIdx, 0xFF); atomic_store(&outBufs.currWritingIdx, 0xFF);
pthread_join(outTid, NULL); pthread_join(outTid, NULL);
printf("bye!\n");
return 0; return 0;

View File

@@ -1,11 +1,13 @@
#include "proto/dial.h" #include "proto/dial.h"
#include <stdlib.h>
#include <string.h> #include <string.h>
#include <sys/time.h> #include <sys/time.h>
#include "context.h" #include "context.h"
#include "panic.h" #include "panic.h"
#include "proto/msg.h" #include "proto/msg.h"
#include "ptQueue/inc/spsc.h"
static uint64_t dial_now_us(void) static uint64_t dial_now_us(void)
{ {
@@ -25,7 +27,7 @@ void broadcastClients(EmulContext* emulContext, uint8_t* msg, size_t msgLen)
memcpy(newMsg, msg, msgLen); memcpy(newMsg, msg, msgLen);
ClientContext* ctx = clientsHead->payload; ClientContext* ctx = clientsHead->payload;
dispatchOutgoingMessage(emulContext->outBufs, ctx->clientId, newMsg, msgLen); dispatchOutgoingMessage(emulContext->outMsgQueue, ctx->clientId, newMsg, msgLen);
clientsHead = clientsHead->nextEntry; clientsHead = clientsHead->nextEntry;
} }
free(msg); free(msg);
@@ -35,21 +37,25 @@ void broadcastClients(EmulContext* emulContext, uint8_t* msg, size_t msgLen)
void dispatchOutgoingMessage(OutgoingBuffers* outBufs, ws_cli_conn_t clientIdx, uint8_t* msg, size_t msgLen) void dispatchOutgoingMessage(queue_spsc_t* outMsgQueue, ws_cli_conn_t clientIdx, uint8_t* msg, size_t msgLen)
{ {
SizedPtr* p = &outBufs->bufs[outBufs->currWritingIdx]; // OutgoingBuffers* outBufs = emulContext->outBufs;
if(p->size + 1 >= p->allocatedSize) // SizedPtr* p = &outBufs->bufs[outBufs->currWritingIdx];
{ // if(p->size + 1 >= p->allocatedSize)
// printf("\t>>Reallocating buf %d\n", outBufs->currWritingIdx); // {
OutgoingMessage* newPtr = realloc(p->ptr, sizeof(OutgoingMessage) * p->allocatedSize * 5); // // printf("\t>>Reallocating buf %d\n", outBufs->currWritingIdx);
NULL_GUARD(newPtr); // OutgoingMessage* newPtr = realloc(p->ptr, sizeof(OutgoingMessage) * p->allocatedSize * 5);
p->ptr = newPtr; // NULL_GUARD(newPtr);
p->allocatedSize = p->allocatedSize * 5; // p->ptr = newPtr;
} // p->allocatedSize = p->allocatedSize * 5;
OutgoingMessage* outmsg = &((OutgoingMessage*)p->ptr)[p->size]; // }
OutgoingMessage* outmsg = malloc(sizeof(OutgoingMessage));
// OutgoingMessage* outmsg = &((OutgoingMessage*)p->ptr)[p->size];
NULL_GUARD(outmsg);
outmsg->msg = msg; outmsg->msg = msg;
outmsg->msgLen = msgLen; outmsg->msgLen = msgLen;
outmsg->clientIdx = clientIdx; outmsg->clientIdx = clientIdx;
outmsg->dispatch_us = dial_now_us(); outmsg->dispatch_us = dial_now_us();
p->size++; queue_spsc_push(outMsgQueue, outmsg);
// p->size++;
} }

View File

@@ -13,11 +13,13 @@
#include "proto_reader.h" #include "proto_reader.h"
#include "proto_verifier.h" #include "proto_verifier.h"
#include "control_reader.h" #include "control_reader.h"
#include "ptQueue/inc/mpsc.h"
#include "ptQueue/inc/spsc.h"
#include "stream_reader.h" #include "stream_reader.h"
#include "mem_reader.h" #include "mem_reader.h"
void handleCloseClient(EmulContext* emulContext, ClientContext* ctx) static void handleCloseClient(EmulContext* emulContext, ClientContext* ctx)
{ {
// if (ctx->streamRegIterator > 0) { // if (ctx->streamRegIterator > 0) {
unregisterClientStreams(emulContext, ctx); unregisterClientStreams(emulContext, ctx);
@@ -113,35 +115,77 @@ static void handleIncomingMessage(
void handleAllClients(EmulContext* emulContext) void handleAllClients(EmulContext* emulContext)
{ {
LinkedListEntry* clientEntry = *emulContext->clientsHead;
size_t handleLimit = 128;
while (clientEntry != NULL && handleLimit > 0) { queue_mpsc_t* inMsgQueue = emulContext->inMsgQueue;
handleLimit--;
ClientContext* ctx = clientEntry->payload;
if (!ctx->isAuthed) { FbMessage* fbmsg = NULL;
clientEntry = disconnectDueTimeout(emulContext, clientEntry);
if (clientEntry == NULL) break; for(size_t handleLimit = 0; handleLimit < 32; handleLimit++)
if (*emulContext->utilizedFlag) continue; {
} else { fbmsg = NULL;
FbMessage* fbmsg = ctx->incomeQ->head->payload; if(queue_mpsc_pop(inMsgQueue, (void**)&fbmsg))
if (fbmsg != NULL) { {
if(fbmsg != NULL)
{
*emulContext->utilizedFlag = 1; *emulContext->utilizedFlag = 1;
// if (hmmmm_ClientMessage_verify_as_root(fbmsg->data, fbmsg->size) == 0) { // if (hmmmm_ClientMessage_verify_as_root(fbmsg->data, fbmsg->size) == 0) {
hmmmm_ClientMessage_table_t cm = hmmmm_ClientMessage_table_t cm =
hmmmm_ClientMessage_as_root(fbmsg->data); hmmmm_ClientMessage_as_root(fbmsg->data);
handleIncomingMessage(cm, ctx, emulContext); // printf("read message len %lu\n", fbmsg->size);
handleIncomingMessage(cm, fbmsg->ctx, emulContext);
// } else { // } else {
// printf("client %lu: dropped malformed FlatBuffer\n", ctx->clientId); // printf("client %lu: dropped malformed FlatBuffer\n", fbmsg->ctx->clientId);
// } // }
free(fbmsg->data); free(fbmsg->data);
free(fbmsg); free(fbmsg);
ctx->incomeQ->head = ctx->incomeQ->head->nextEl;
} }
clientEntry = clientEntry->nextEntry; else
{
break;
}
}
else
{
break;
} }
} }
// LinkedListEntry* clientEntry = *emulContext->clientsHead;
// size_t handleLimit = 128;
// while (clientEntry != NULL && handleLimit > 0) {
// handleLimit--;
// ClientContext* ctx = clientEntry->payload;
// if (!ctx->isAuthed) {
// clientEntry = disconnectDueTimeout(emulContext, clientEntry);
// if (clientEntry == NULL) break;
// if (*emulContext->utilizedFlag) continue;
// } else {
// FbMessage* fbmsg = NULL;
// queue_spsc_pop(ctx->incomeQ, (void**)&fbmsg);
// // FbMessage* fbmsg = ctx->incomeQ->head->payload;
// if (fbmsg != NULL) {
// *emulContext->utilizedFlag = 1;
// // if (hmmmm_ClientMessage_verify_as_root(fbmsg->data, fbmsg->size) == 0) {
// printf("read message len %lu\n", fbmsg->size);
// hmmmm_ClientMessage_table_t cm =
// hmmmm_ClientMessage_as_root(fbmsg->data);
// handleIncomingMessage(cm, ctx, emulContext);
// // } else {
// // printf("client %lu: dropped malformed FlatBuffer\n", ctx->clientId);
// // }
// free(fbmsg->data);
// free(fbmsg);
// // ctx->incomeQ->head = ctx->incomeQ->head->nextEl;
// }
// clientEntry = clientEntry->nextEntry;
// }
// }
} }

View File

@@ -16,6 +16,7 @@
#include "proto_reader.h" #include "proto_reader.h"
#include "proto_verifier.h" #include "proto_verifier.h"
#include "auth_reader.h" #include "auth_reader.h"
#include "ptQueue/inc/spsc.h"
static uint8_t validateAccessTokenDeterministic( static uint8_t validateAccessTokenDeterministic(
@@ -99,10 +100,12 @@ uint8_t handle_auth(
with_lock(&ctx->registerMutex) with_lock(&ctx->registerMutex)
{ {
printf("Writing auth event\n"); printf("Writing auth event\n");
int exitCode = ptQueuePush(ctx->regQueue, ev, errbuf); // int exitCode = ptQueuePush(ctx->regQueue, ev, errbuf);
if (exitCode) {
panic("Unable to push to reg queue: %s\n", errbuf); queue_spsc_push(ctx->regQueue, ev);
} // if (!queue_spsc_push(ctx->regQueue, ev)) {
// panic("Unable to push to reg queue: %s\n", errbuf);
// }
} }
return 1; return 1;
} }
@@ -135,13 +138,13 @@ void handleOnClientAuthDone(ClientContext* cctx, EmulContext* emulContext)
// Send AuthResponse with assigned seat_id // Send AuthResponse with assigned seat_id
size_t len = 0; size_t len = 0;
uint8_t* authResp = fb_build_auth_response(UINT64_MAX, cctx->clientId, &len); uint8_t* authResp = fb_build_auth_response(UINT64_MAX, cctx->clientId, &len);
dispatchOutgoingMessage(emulContext->outBufs, cctx->clientId, authResp, len); dispatchOutgoingMessage(emulContext->outMsgQueue, cctx->clientId, authResp, len);
// Send current execution state // Send current execution state
len = 0; len = 0;
uint8_t* stateMsg = fb_build_exec_notify( uint8_t* stateMsg = fb_build_exec_notify(
UINT64_MAX, *emulContext->clockCounter, *emulContext->emulState, &len); UINT64_MAX, *emulContext->clockCounter, *emulContext->emulState, &len);
dispatchOutgoingMessage(emulContext->outBufs, cctx->clientId, stateMsg, len); dispatchOutgoingMessage(emulContext->outMsgQueue, cctx->clientId, stateMsg, len);
// Send cached DeviceIdMappingNotif if a config has been loaded // Send cached DeviceIdMappingNotif if a config has been loaded
if (emulContext->deviceIdMappingMsg && emulContext->deviceIdMappingMsgLen > 0) { if (emulContext->deviceIdMappingMsg && emulContext->deviceIdMappingMsgLen > 0) {
@@ -149,7 +152,7 @@ void handleOnClientAuthDone(ClientContext* cctx, EmulContext* emulContext)
if (mappingCopy) { if (mappingCopy) {
memcpy(mappingCopy, emulContext->deviceIdMappingMsg, emulContext->deviceIdMappingMsgLen); memcpy(mappingCopy, emulContext->deviceIdMappingMsg, emulContext->deviceIdMappingMsgLen);
dispatchOutgoingMessage( dispatchOutgoingMessage(
emulContext->outBufs, cctx->clientId, emulContext->outMsgQueue, cctx->clientId,
mappingCopy, emulContext->deviceIdMappingMsgLen); mappingCopy, emulContext->deviceIdMappingMsgLen);
} }
} }

View File

@@ -152,6 +152,8 @@ static int load_devices_recursive(
return -1; return -1;
} }
dev->lib->reset(dev->specs, dev->ctx);
freeConf(dc); freeConf(dc);
size_t idx = st->count; size_t idx = st->count;
@@ -243,7 +245,7 @@ static size_t find_device_by_id(LoadState* st, const char* id)
// Find a segment index by name in a device // Find a segment index by name in a device
static size_t find_seg_by_name(device_handle_t* dev, size_t seg_count, const char* name) static size_t find_seg_by_name(device_handle_t* dev, size_t seg_count, const char* name)
{ {
device_mem_t* mem = dev->ctx->deviceMem; const device_mem_t* mem = dev->ctx->deviceMem;
for (size_t i = 0; i < seg_count; i++) { for (size_t i = 0; i < seg_count; i++) {
if (strcmp(mem->memsegNames[i], name) == 0) return i; if (strcmp(mem->memsegNames[i], name) == 0) return i;
} }
@@ -327,7 +329,7 @@ typedef struct {
// shadow_replace: read from point device instead of base // shadow_replace: read from point device instead of base
static void* intercept_replace_read(uint64_t ident, uint64_t addr, void* rawCells) static void* intercept_replace_read(uint64_t ident, uint64_t addr, void* rawCells)
{ {
intercept_ctx_t* ctx = (intercept_ctx_t*)(uintptr_t)ident; const intercept_ctx_t* ctx = (intercept_ctx_t*)(uintptr_t)ident;
return ctx->point_cell; return ctx->point_cell;
} }
@@ -536,13 +538,13 @@ void handleConfigCtrlMessage(
char errbuf[1024]; char errbuf[1024];
// Block config load only while actively executing (ticking) // Block config load only while actively executing (ticking)
if (*emulContext->emulState == EMUL_STATE_EXEC) { if (*emulContext->emulState != EMUL_STATE_STILL) {
snprintf(errbuf, 1024, "stop or pause the emulator before loading config"); snprintf(errbuf, 1024, "reset emulator before loading config, current state is %d", *emulContext->emulState);
printf("[CTRL/CONFIG] error: %s\n", errbuf); printf("[CTRL/CONFIG] error: %s\n", errbuf);
size_t msg_len; size_t msg_len;
uint8_t* out = fb_build_config_error(nonce, errbuf, &msg_len); uint8_t* out = fb_build_config_error(nonce, errbuf, &msg_len);
dispatchOutgoingMessage(emulContext->outBufs, ctx->clientId, out, msg_len); dispatchOutgoingMessage(emulContext->outMsgQueue, ctx->clientId, out, msg_len);
return; return;
} }
@@ -555,7 +557,7 @@ void handleConfigCtrlMessage(
size_t msg_len; size_t msg_len;
uint8_t* out = fb_build_config_error(nonce, errbuf, &msg_len); uint8_t* out = fb_build_config_error(nonce, errbuf, &msg_len);
dispatchOutgoingMessage(emulContext->outBufs, ctx->clientId, out, msg_len); dispatchOutgoingMessage(emulContext->outMsgQueue, ctx->clientId, out, msg_len);
return; return;
} }
@@ -569,7 +571,7 @@ void handleConfigCtrlMessage(
size_t msg_len; size_t msg_len;
uint8_t* out = fb_build_config_error(nonce, errbuf, &msg_len); uint8_t* out = fb_build_config_error(nonce, errbuf, &msg_len);
dispatchOutgoingMessage(emulContext->outBufs, ctx->clientId, out, msg_len); dispatchOutgoingMessage(emulContext->outMsgQueue, ctx->clientId, out, msg_len);
return; return;
} }
@@ -597,7 +599,7 @@ void handleConfigCtrlMessage(
size_t msg_len; size_t msg_len;
uint8_t* out = fb_build_config_error(nonce, errbuf, &msg_len); uint8_t* out = fb_build_config_error(nonce, errbuf, &msg_len);
dispatchOutgoingMessage(emulContext->outBufs, ctx->clientId, out, msg_len); dispatchOutgoingMessage(emulContext->outMsgQueue, ctx->clientId, out, msg_len);
return; return;
} }
@@ -616,7 +618,7 @@ void handleConfigCtrlMessage(
size_t msg_len; size_t msg_len;
uint8_t* out = fb_build_config_error(nonce, errbuf, &msg_len); uint8_t* out = fb_build_config_error(nonce, errbuf, &msg_len);
dispatchOutgoingMessage(emulContext->outBufs, ctx->clientId, out, msg_len); dispatchOutgoingMessage(emulContext->outMsgQueue, ctx->clientId, out, msg_len);
return; return;
} }
@@ -634,7 +636,7 @@ void handleConfigCtrlMessage(
size_t msg_len; size_t msg_len;
uint8_t* out = fb_build_config_error(nonce, errbuf, &msg_len); uint8_t* out = fb_build_config_error(nonce, errbuf, &msg_len);
dispatchOutgoingMessage(emulContext->outBufs, ctx->clientId, out, msg_len); dispatchOutgoingMessage(emulContext->outMsgQueue, ctx->clientId, out, msg_len);
return; return;
} }
@@ -671,7 +673,7 @@ void handleConfigCtrlMessage(
snprintf(errbuf, sizeof(errbuf), "failed to allocate device arrays"); snprintf(errbuf, sizeof(errbuf), "failed to allocate device arrays");
size_t msg_len; size_t msg_len;
uint8_t* out = fb_build_config_error(nonce, errbuf, &msg_len); uint8_t* out = fb_build_config_error(nonce, errbuf, &msg_len);
dispatchOutgoingMessage(emulContext->outBufs, ctx->clientId, out, msg_len); dispatchOutgoingMessage(emulContext->outMsgQueue, ctx->clientId, out, msg_len);
return; return;
} }
@@ -709,7 +711,8 @@ void handleConfigCtrlMessage(
} }
// Reset emulation state to STILL and notify all clients // Reset emulation state to STILL and notify all clients
if (*emulContext->emulState != EMUL_STATE_STILL) { if (*emulContext->emulState != EMUL_STATE_STILL)
{
*emulContext->emulState = EMUL_STATE_STILL; *emulContext->emulState = EMUL_STATE_STILL;
size_t notify_len; size_t notify_len;
uint8_t* notify = fb_build_exec_notify(0, *emulContext->clockCounter, EMUL_STATE_STILL, &notify_len); uint8_t* notify = fb_build_exec_notify(0, *emulContext->clockCounter, EMUL_STATE_STILL, &notify_len);
@@ -723,7 +726,7 @@ void handleConfigCtrlMessage(
for(size_t i = 0; i < emulContext->devicesCount; i++) for(size_t i = 0; i < emulContext->devicesCount; i++)
{ {
device_handle_t* handl = emulContext->deviceHandles[i]; const device_handle_t* handl = emulContext->deviceHandles[i];
for(size_t j = 0; j < st.seg_counts[i]; j++) for(size_t j = 0; j < st.seg_counts[i]; j++)
{ {
if(st.seg_names[i][j]) if(st.seg_names[i][j])
@@ -734,7 +737,7 @@ void handleConfigCtrlMessage(
st.seg_names[i][j] = strdup(handl->ctx->deviceMem->memsegNames[j]); st.seg_names[i][j] = strdup(handl->ctx->deviceMem->memsegNames[j]);
} }
} }
*emulContext->resetRequest = 1; //*emulContext->resetRequest = 1;
uint8_t* out = fb_build_config_device_id_mapping( uint8_t* out = fb_build_config_device_id_mapping(
nonce, st.paths, st.path_lens, st.seg_names, st.seg_counts, dc, &msg_len); nonce, st.paths, st.path_lens, st.seg_names, st.seg_counts, dc, &msg_len);

View File

@@ -63,24 +63,29 @@ void handleIncomingCtrlMessage(
return; return;
} }
if(state_op == EMUL_STATE_OP_RESET) if(state_op == EMUL_STATE_OP_RESET && emulContext->emulState != EMUL_STATE_STILL)
{ {
*emulContext->resetRequest = 1; *emulContext->resetRequest = 1;
} }
else
{
uint8_t new_state = switchNewEmulState(*emulContext->emulState, state_op);
*emulContext->emulState = new_state;
printf("[CTRL/EXEC] state -> %u\n", new_state);
uint8_t new_state = switchNewEmulState(*emulContext->emulState, state_op); if (new_state == EMUL_STATE_EXEC && tick_count > 0) {
*emulContext->emulState = new_state; *emulContext->tickTarget = *emulContext->clockCounter + tick_count;
printf("[CTRL/EXEC] state -> %u\n", new_state);
if (new_state == EMUL_STATE_EXEC && tick_count > 0) { printf("set ticket target at %lu\n", *emulContext->tickTarget);
*emulContext->tickTarget = *emulContext->clockCounter + tick_count; } else if (new_state == EMUL_STATE_EXEC) {
} else if (new_state == EMUL_STATE_EXEC) { *emulContext->tickTarget = 0;
*emulContext->tickTarget = 0; printf("reset tick target\n");
}
size_t msg_len;
uint8_t* out = fb_build_exec_notify(0, *emulContext->clockCounter, new_state, &msg_len);
broadcastClients(emulContext, out, msg_len);
} }
size_t msg_len;
uint8_t* out = fb_build_exec_notify(0, *emulContext->clockCounter, new_state, &msg_len);
broadcastClients(emulContext, out, msg_len);
} }
else if (ptype == hmmmm_ctrl_CtrlClientPayload_SetupBuf) else if (ptype == hmmmm_ctrl_CtrlClientPayload_SetupBuf)
{ {
@@ -96,7 +101,7 @@ void handleIncomingCtrlMessage(
size_t msg_len; size_t msg_len;
uint8_t* out = fb_build_setup_buf(nonce, lost_buf_size, lifetime_ticks, &msg_len); uint8_t* out = fb_build_setup_buf(nonce, lost_buf_size, lifetime_ticks, &msg_len);
dispatchOutgoingMessage(emulContext->outBufs, ctx->clientId, out, msg_len); dispatchOutgoingMessage(emulContext->outMsgQueue, ctx->clientId, out, msg_len);
} }
else if (ptype == hmmmm_ctrl_CtrlClientPayload_OrphanedRequest) else if (ptype == hmmmm_ctrl_CtrlClientPayload_OrphanedRequest)
{ {
@@ -104,7 +109,7 @@ void handleIncomingCtrlMessage(
size_t msg_len; size_t msg_len;
uint8_t* out = fb_build_orphaned_response(nonce, &msg_len); uint8_t* out = fb_build_orphaned_response(nonce, &msg_len);
dispatchOutgoingMessage(emulContext->outBufs, ctx->clientId, out, msg_len); dispatchOutgoingMessage(emulContext->outMsgQueue, ctx->clientId, out, msg_len);
} }
else if (ptype == hmmmm_ctrl_CtrlClientPayload_ConfigCtrlMessage) else if (ptype == hmmmm_ctrl_CtrlClientPayload_ConfigCtrlMessage)
{ {
@@ -125,6 +130,6 @@ void handleIncomingCtrlMessage(
size_t msg_len; size_t msg_len;
uint8_t* out = fb_build_lost_messages_response(nonce, seat_id, &msg_len); uint8_t* out = fb_build_lost_messages_response(nonce, seat_id, &msg_len);
dispatchOutgoingMessage(emulContext->outBufs, ctx->clientId, out, msg_len); dispatchOutgoingMessage(emulContext->outMsgQueue, ctx->clientId, out, msg_len);
} }
} }

View File

@@ -9,7 +9,52 @@
#include "pub/libhmmmm/mem.h" #include "pub/libhmmmm/mem.h"
// #define DEVICE_MEM_SIZE ((size_t)(256 * 1024)) // #define DEVICE_MEM_SIZE ((size_t)(256 * 1024))
static void printMemory(void* cells, uint64_t cellsCount)
{
for(uint64_t i = 0; i < cellsCount; i++)
{
printf("%lu: 0x%04X", i, ((uint8_t*)cells)[i]);
// uint8_t wasRead = 0;
// for (uint8_t j = 0; j < mem->memreadLen; j++)
// {
// if(mem->memreadCellAddrs[j] == i)
// {
// wasRead = 1;
// break;
// }
// }
// uint8_t wasWrite = 0;
// for (uint8_t j = 0; j < mem->memwriteLen; j++)
// {
// if(mem->memwriteCellAddrs[j] == i)
// {
// wasWrite = 1;
// break;
// }
// }
// if (wasRead == 1)
// {
// printf("\t[was read]");
// }
// else
// {
// printf("\t[was not read]");
// }
// if (wasWrite == 1)
// {
// printf("\t[was written]");
// }
// else
// {
// printf("\t[was not written]");
// }
printf("\n");
}
}
void handleIncomingMemMessage( void handleIncomingMemMessage(
hmmmm_mem_MemClientMessage_table_t msg, hmmmm_mem_MemClientMessage_table_t msg,
@@ -20,9 +65,9 @@ void handleIncomingMemMessage(
hmmmm_mem_MemClientPayload_union_type_t ptype = hmmmm_mem_MemClientPayload_union_type_t ptype =
hmmmm_mem_MemClientMessage_payload_type(msg); hmmmm_mem_MemClientMessage_payload_type(msg);
printf("[MEM] client=%lu nonce=%lu type=%s\n", // printf("[MEM] client=%lu nonce=%lu type=%s\n",
ctx->clientId, nonce, // ctx->clientId, nonce,
hmmmm_mem_MemClientPayload_type_name(ptype)); // hmmmm_mem_MemClientPayload_type_name(ptype));
if (ptype == hmmmm_mem_MemClientPayload_MemReadRequest) if (ptype == hmmmm_mem_MemClientPayload_MemReadRequest)
{ {
@@ -35,8 +80,8 @@ void handleIncomingMemMessage(
uint32_t offset = hmmmm_mem_MemReadRequest_offset(req); uint32_t offset = hmmmm_mem_MemReadRequest_offset(req);
uint32_t length = hmmmm_mem_MemReadRequest_length(req); uint32_t length = hmmmm_mem_MemReadRequest_length(req);
printf("[MEM/READ] device=%u seg=%u offset=%u len=%u\n", // printf("[MEM/READ] device=%u seg=%u offset=%u len=%u\n",
dev_id, seg_id, offset, length); // dev_id, seg_id, offset, length);
if (dev_id >= (uint32_t)emulContext->devicesCount) { if (dev_id >= (uint32_t)emulContext->devicesCount) {
printf("[MEM/READ] invalid device %u\n", dev_id); printf("[MEM/READ] invalid device %u\n", dev_id);
@@ -49,18 +94,18 @@ void handleIncomingMemMessage(
// printf("[MEM/READ] out of bounds\n"); // printf("[MEM/READ] out of bounds\n");
// return; // return;
// } // }
printf("[MEM/READ] from %d/%d+%d:%d\n", dev_id, seg_id, offset, length); // printf("[MEM/READ] from %d/%d+%d:%d\n", dev_id, seg_id, offset, length);
device_handle_t* handl = emulContext->deviceHandles[dev_id]; device_handle_t* handl = emulContext->deviceHandles[dev_id];
const uint8_t* base = handl->ctx->deviceMem->cells[seg_id]; //emulContext->devicesMem[dev_id] + handl->ctx->deviceMem->memsegShifts[seg_id]; const uint8_t* base = handl->ctx->deviceMem->cells[seg_id]; //emulContext->devicesMem[dev_id] + handl->ctx->deviceMem->memsegShifts[seg_id];
for(size_t i = 0; i < length; i++) // for(size_t i = 0; i < length; i++)
{ // {
printf("%02X ", (base + offset)[i]); // printf("%02X ", (base + offset)[i]);
} // }
printf("\n"); // printf("\n");
size_t out_len; size_t out_len;
uint8_t* out = fb_build_mem_read_response( uint8_t* out = fb_build_mem_read_response(
@@ -68,7 +113,7 @@ void handleIncomingMemMessage(
dev_id, seg_id, offset, dev_id, seg_id, offset,
base + offset, (size_t)length, base + offset, (size_t)length,
&out_len); &out_len);
dispatchOutgoingMessage(emulContext->outBufs, ctx->clientId, out, out_len); dispatchOutgoingMessage(emulContext->outMsgQueue, ctx->clientId, out, out_len);
} }
else if (ptype == hmmmm_mem_MemClientPayload_MemWriteRequest) else if (ptype == hmmmm_mem_MemClientPayload_MemWriteRequest)
{ {
@@ -82,13 +127,18 @@ void handleIncomingMemMessage(
flatbuffers_uint8_vec_t data = hmmmm_mem_MemWriteRequest_data(req); flatbuffers_uint8_vec_t data = hmmmm_mem_MemWriteRequest_data(req);
size_t data_len = flatbuffers_uint8_vec_len(data); size_t data_len = flatbuffers_uint8_vec_len(data);
printf("[MEM/WRITE] device=%u seg=%u offset=%u len=%zu\n", // printf("[MEM/WRITE] device=%u seg=%u offset=%u len=%zu\n",
dev_id, seg_id, offset, data_len); // dev_id, seg_id, offset, data_len);
if (dev_id >= (uint32_t)emulContext->devicesCount) { if (dev_id >= (uint32_t)emulContext->devicesCount) {
printf("[MEM/WRITE] invalid device %u\n", dev_id); printf("[MEM/WRITE] invalid device %u\n", dev_id);
return; return;
} }
if(!data) {
printf("[MEM/WRITE] invalid data\n");
return;
}
// if ((size_t)offset + data_len > DEVICE_MEM_SIZE) { // if ((size_t)offset + data_len > DEVICE_MEM_SIZE) {
// printf("[MEM/WRITE] out of bounds\n"); // printf("[MEM/WRITE] out of bounds\n");
// return; // return;
@@ -98,16 +148,24 @@ void handleIncomingMemMessage(
device_handle_t* handl = emulContext->deviceHandles[dev_id]; device_handle_t* handl = emulContext->deviceHandles[dev_id];
printf("[MEM/WRITE] from %d/%d+%d:%lu\n", dev_id, seg_id, offset, data_len); // printf("[MEM/WRITE] from %d/%d+%d:%lu\n", dev_id, seg_id, offset, data_len);
uint8_t* base = handl->ctx->deviceMem->cells[seg_id]; // emulContext->devicesMem[dev_id] + handl->ctx->deviceMem->memsegShifts[seg_id]; uint8_t* base = handl->ctx->deviceMem->cells[seg_id]; // emulContext->devicesMem[dev_id] + handl->ctx->deviceMem->memsegShifts[seg_id];
memcpy(base + offset, data, data_len); memcpy(base + offset, data, data_len);
// for(size_t i = 0; i < data_len; i++)
// {
// printf("%02X ", (base + offset)[i]);
// }
// printf("\n");
size_t out_len; size_t out_len;
uint8_t* out = fb_build_mem_read_response( uint8_t* out = fb_build_mem_read_response(
nonce, *emulContext->clockCounter, nonce, *emulContext->clockCounter,
dev_id, seg_id, offset, dev_id, seg_id, offset,
base + offset, data_len, base + offset, data_len,
&out_len); &out_len);
dispatchOutgoingMessage(emulContext->outBufs, ctx->clientId, out, out_len); dispatchOutgoingMessage(emulContext->outMsgQueue, ctx->clientId, out, out_len);
} }
} }

View File

@@ -14,7 +14,7 @@
// ── Stream registration bookkeeping (unchanged logic) ──────────────────────── // ── Stream registration bookkeeping (unchanged logic) ────────────────────────
void unregisterClientStream( static void unregisterClientStream(
EmulContext* emulContext, ClientContext* ctx, uint32_t regId) EmulContext* emulContext, ClientContext* ctx, uint32_t regId)
{ {
for (size_t deviceId = 0; deviceId < emulContext->devicesCount; deviceId++) { for (size_t deviceId = 0; deviceId < emulContext->devicesCount; deviceId++) {
@@ -127,7 +127,7 @@ void handleIncomingStreamMessage(
size_t msg_len; size_t msg_len;
uint8_t* out = fb_build_stream_reg_confirm( uint8_t* out = fb_build_stream_reg_confirm(
nonce, reg_id, dev_id, seg_id, offset, length, (uint8_t)mode, &msg_len); nonce, reg_id, dev_id, seg_id, offset, length, (uint8_t)mode, &msg_len);
dispatchOutgoingMessage(emulContext->outBufs, ctx->clientId, out, msg_len); dispatchOutgoingMessage(emulContext->outMsgQueue, ctx->clientId, out, msg_len);
} }
else if (ptype == hmmmm_stream_StreamClientPayload_StreamDeregRequest) else if (ptype == hmmmm_stream_StreamClientPayload_StreamDeregRequest)
{ {

View File

@@ -1,4 +1,5 @@
#include "proto/handlers/ws.h" #include "proto/handlers/ws.h"
#include "context.h"
#include "proto/handlers.h" #include "proto/handlers.h"
#include <stdio.h> #include <stdio.h>
@@ -11,22 +12,38 @@
#include "proto_verifier.h" #include "proto_verifier.h"
#include "proto_reader.h" #include "proto_reader.h"
#include "ptQueue/inc/spsc.h"
#include "wsServer/include/ws.h"
void onWsOpen(ws_cli_conn_t client) void onWsOpen(ws_cli_conn_t client)
{ {
char errbuf[1024]; char errbuf[1024];
ptQueue* incomeQueue = ptQueueCreate(errbuf); queue_spsc_t *incomeQueue = malloc(sizeof(queue_spsc_t));
NULL_GUARD(incomeQueue, "Unable to create income queue: %s\n", errbuf); // queue_spsc_t incomeQueue, outcomeQueue;
if(incomeQueue == NULL || !queue_spsc_init(incomeQueue, 65536))
{
panic("Unable to create income queue");
}
queue_spsc_t *outcomeQueue = malloc(sizeof(queue_spsc_t));
if(outcomeQueue == NULL || !queue_spsc_init(outcomeQueue, 65536))
{
panic("Unable to create outcome queue");
}
ptQueue* outcomeQueue = ptQueueCreate(errbuf); // = ptQueueCreate(errbuf);
NULL_GUARD(outcomeQueue, "Unable to create outcome queue: %s\n", errbuf); // NULL_GUARD(incomeQueue, "Unable to create income queue: %s\n", errbuf);
// queue_spsc_t* outcomeQueue = ptQueueCreate(errbuf);
// NULL_GUARD(outcomeQueue, "Unable to create outcome queue: %s\n", errbuf);
ClientContext* cctx = malloc(sizeof(ClientContext)); ClientContext* cctx = malloc(sizeof(ClientContext));
if (cctx == NULL) { if (cctx == NULL) {
ptQueueFree(incomeQueue); queue_spsc_destroy(incomeQueue);
ptQueueFree(outcomeQueue); queue_spsc_destroy(outcomeQueue);
// ptQueueFree(incomeQueue);
// ptQueueFree(outcomeQueue);
panic("Unable to allocate client context\n"); panic("Unable to allocate client context\n");
} }
cctx->clientId = client; cctx->clientId = client;
@@ -38,8 +55,10 @@ void onWsOpen(ws_cli_conn_t client)
ClientRegistrationEvent* ev = malloc(sizeof(ClientRegistrationEvent)); ClientRegistrationEvent* ev = malloc(sizeof(ClientRegistrationEvent));
if (ev == NULL) { if (ev == NULL) {
ptQueueFree(incomeQueue); queue_spsc_destroy(incomeQueue);
ptQueueFree(outcomeQueue); queue_spsc_destroy(outcomeQueue);
// ptQueueFree(incomeQueue);
// ptQueueFree(outcomeQueue);
free(cctx); free(cctx);
panic("Unable to allocate register event"); panic("Unable to allocate register event");
} }
@@ -52,23 +71,27 @@ void onWsOpen(ws_cli_conn_t client)
with_lock(&ctx->registerMutex) with_lock(&ctx->registerMutex)
{ {
int exitCode = ptQueuePush(ctx->regQueue, ev, errbuf); // int exitCode = ptQueuePush(ctx->regQueue, ev, errbuf);
if (exitCode) {
ptQueueFree(incomeQueue); queue_spsc_push(ctx->regQueue, ev);
ptQueueFree(outcomeQueue); // if (!queue_spsc_push(ctx->regQueue, ev)) {
free(cctx); // queue_spsc_destroy(incomeQueue);
panic("Unable to push to reg queue: %s\n", errbuf); // queue_spsc_destroy(outcomeQueue);
} // // ptQueueFree(incomeQueue);
// // ptQueueFree(outcomeQueue);
// free(cctx);
// panic("Unable to push to reg queue: %s\n", errbuf);
// }
} }
char* cli = ws_getaddress(client); const char* cli = ws_getaddress(client);
printf("Connection %lu opened, addr: %s\n", client, cli); printf("Connection %lu opened, addr: %s\n", client, cli);
} }
void onWsClose(ws_cli_conn_t client) void onWsClose(ws_cli_conn_t client)
{ {
char* cli = ws_getaddress(client); const char* cli = ws_getaddress(client);
printf("Connection %lu closed, addr: %s\n", client, cli); printf("Connection %lu closed, addr: %s\n", client, cli);
char errbuf[1024]; char errbuf[1024];
@@ -91,11 +114,12 @@ void onWsClose(ws_cli_conn_t client)
with_lock(&ctx->registerMutex) with_lock(&ctx->registerMutex)
{ {
int exitCode = ptQueuePush(ctx->regQueue, ev, errbuf); queue_spsc_push(ctx->regQueue, ev);
if (exitCode) // int exitCode = ptQueuePush(ctx->regQueue, ev, errbuf);
{ // if (!queue_spsc_push(ctx->regQueue, ev))
panic("Unable to push to reg queue: %s\n", errbuf); // {
} // panic("Unable to push to reg queue: %s\n", errbuf);
// }
} }
} }
@@ -108,6 +132,7 @@ void onWsMessage(
panic("Unable to get client context\n"); panic("Unable to get client context\n");
} }
if (!cctx->isAuthed) { if (!cctx->isAuthed) {
handle_auth(cctx, client, msg, size, type); handle_auth(cctx, client, msg, size, type);
return; return;
@@ -138,13 +163,20 @@ void onWsMessage(
panic("Unable to allocate FbMessage buffer\n"); panic("Unable to allocate FbMessage buffer\n");
} }
fbmsg->size = (size_t)size; fbmsg->size = (size_t)size;
fbmsg->ctx = cctx;
memcpy(fbmsg->data, msg, (size_t)size); memcpy(fbmsg->data, msg, (size_t)size);
char errbuf[1024]; char errbuf[1024];
int isErr = ptQueuePush(cctx->incomeQ, fbmsg, errbuf); // int isErr = ptQueuePush(cctx->incomeQ, fbmsg, errbuf);
if (isErr) { ServerContext* sctx = ws_get_server_context(client);
free(fbmsg->data);
free(fbmsg); // printf("got message with len %lu\n", size);
panic("Unable to queue client message: %s\n", errbuf);
} queue_mpsc_push(sctx->inMsgQueue, fbmsg);
// if (!queue_spsc_push(cctx->incomeQ, fbmsg)) {
// if (!queue_mpsc_push(sctx->inMsgQueue, fbmsg)) {
// free(fbmsg->data);
// free(fbmsg);
// panic("Unable to queue client message: %s\n", errbuf);
// }
} }