Rewrote test server to use libregexis024 to parse http requests (now it roughly conforms to RFC9112)

This commit is contained in:
Андреев Григорий 2024-08-05 03:37:56 +03:00 committed by Андреев Григорий
parent 190cdf25b3
commit 8de3335296
14 changed files with 390 additions and 241 deletions

2
.gitignore vendored
View File

@ -9,3 +9,5 @@ building/*.png
building/*.svg building/*.svg
.idea/ .idea/
compile_commands.json
local.sh

View File

@ -12,5 +12,49 @@
<p class="aaa"> Inside aaaa </p> <p class="aaa"> Inside aaaa </p>
<p id="bbb"> Iside bbbb </p> <p id="bbb"> Iside bbbb </p>
</div> </div>
<form method="POST" action="/output" enctype="multipart/form-data">
<label for="inp1"> Bla-bla-bla </label>
<input type="text" name="Cool input 1+" id="inp1" class="text-input" value="Ouuups">
<label for="inp2"> Goot got </label>
<input type="text" name="Cool input 2 " id="inp2" class="text-input" value="Did it for you">
<hr>
<label for="r-1-1">Boba</label>
<input type="radio" id="r-1-1" name="r-1" value="First">
<label for="r-1-2">Biba</label>
<input type="radio" id="r-1-2" name="r-1" value="Second">
<label for="r-2-1">Buba</label>
<input type="radio" id="r-2-1" name="r-2" value="Third">
<label for="r-2-2">Duba</label>
<input type="radio" id="r-2-2" name="r-2" value="Fourth">
<hr>
<label for="chb1"> Check this </label>
<input type="checkbox" name="Cool input 3" id="chb1" value="AAAVVVV1VVV">
<label for="chb2"> More checkbozsdfsdsess </label>
<input type="checkbox" name="Cool input 4" id="chb2" value="___@@@222">
<label for="chb3"> Lmao i cbnat type stuff ia hva ee an insu=sslt </label>
<input type="checkbox" name="Cool input 5" id="chb3" value="_down_TO">
<hr>
<p> Lmao, get ready to handle file input:</p>
<input type="file" name="BEBRA" id="tututu">
<hr>
<input type="submit" value="SubmitButton">
</form>
<p> Ok, ima try that again</p>
<form method="post" action="/output" enctype="multipart/form-data">
<div>
<label for="file">Choose a file</label>
<input type="file" id="file" name="myFile" />
</div>
<div>
<button>Send the file</button>
</div>
</form>
</body> </body>
</html> </html>

View File

@ -11,11 +11,15 @@ std::vector<std::string> getFromPkgConfig(const std::string& req, const std::str
for (char ch: pc_stdout) { for (char ch: pc_stdout) {
if (result.empty()) if (result.empty())
result.emplace_back(); result.emplace_back();
if (ch == ' ') if (ch == ' ' || ch == '\t' || ch == '\n' || ch == '\r') {
if (!result.back().empty())
result.emplace_back(); result.emplace_back();
else } else {
result.back() += ch; result.back() += ch;
} }
}
if (!result.empty() && result.back().empty())
result.pop_back();
return result; return result;
} }
@ -61,13 +65,18 @@ struct CAWebChat {
std::vector<ExternalLibraryTarget> ext_targets = { std::vector<ExternalLibraryTarget> ext_targets = {
formExternalLibraryTargetWithNativeName("libjsonincpp"), formExternalLibraryTargetWithNativeName("libjsonincpp"),
formExternalLibraryTargetWithNativeName("sqlite3"), formExternalLibraryTargetWithNativeName("sqlite3"),
formExternalLibraryTargetWithNativeName("libregexis024"),
}; };
std::vector<CTarget> my_targets; std::vector<CTarget> my_targets;
{ CTarget T("engine_engine_number_9", "shared_library"); { CTarget T{"engine_engine_number_9", "shared_library"};
T.additional_compilation_flags = getSomeRadFlags(); T.additional_compilation_flags = getSomeRadFlags();
T.proj_deps = {}; T.proj_deps = {};
T.external_deps = {
CTargetDependenceOnExternalLibrary{"libjsonincpp", {true, true}},
CTargetDependenceOnExternalLibrary{"libregexis024", {true, true}}
};
T.units = { T.units = {
"baza.cpp", "baza.cpp",
"thread_synchronization.cpp", "thread_synchronization.cpp",
@ -97,9 +106,10 @@ struct CAWebChat {
T.installation_dir = ""; T.installation_dir = "";
my_targets.push_back(T); my_targets.push_back(T);
} }
{ CTarget T("iu9-ca-web-chat", "executable"); { CTarget T{"iu9-ca-web-chat", "executable"};
T.additional_compilation_flags = getSomeRadFlags(); T.additional_compilation_flags = getSomeRadFlags();
T.proj_deps = {CTargetDependenceOnProjectsLibrary("engine_engine_number_9")}; T.proj_deps = {CTargetDependenceOnProjectsLibrary{"engine_engine_number_9"}};
T.external_deps = {CTargetDependenceOnExternalLibrary{"sqlite3"}};
T.units = {"main.cpp"}; T.units = {"main.cpp"};
for (std::string& u: T.units) for (std::string& u: T.units)
u = "web_chat/" + u; u = "web_chat/" + u;
@ -115,7 +125,6 @@ struct CAWebChat {
int main(int argc, char** argv) { int main(int argc, char** argv) {
try { try {
ASSERT_pl(argc > 0); ASSERT_pl(argc > 0);
assert(argc > 0);
std::vector<std::string> args(argc - 1); std::vector<std::string> args(argc - 1);
for (int i = 0; i + 1 < argc; i++) { for (int i = 0; i + 1 < argc; i++) {
args[i] = argv[i + 1]; args[i] = argv[i + 1];
@ -123,11 +132,6 @@ int main(int argc, char** argv) {
NormalCBuildSystemCommandMeaning cmd; NormalCBuildSystemCommandMeaning cmd;
regular_bs_cli_cmd_interpret(args, cmd); regular_bs_cli_cmd_interpret(args, cmd);
CAWebChat bs("debug", cmd); CAWebChat bs("debug", cmd);
// std::string map = "Runlevel 1\n";
// draw_bu_arr_in_dot(bs.runlevel_1, map);
// map += "Runlevel 2\n";
// draw_bu_arr_in_dot(bs.runlevel_2, map);
// printf("%s", map.c_str());
if (cmd.need_to_build) if (cmd.need_to_build)
complete_tasks_of_build_units(bs.runlevel_1); complete_tasks_of_build_units(bs.runlevel_1);
umask(~0755); umask(~0755);

3
example/config.json Normal file
View File

@ -0,0 +1,3 @@
{
"name": "Web chat"
}

View File

@ -1,4 +1,5 @@
#include "baza.h" #include "baza.h"
#include "baza_inter.h"
#include <errno.h> #include <errno.h>
#include <string.h> #include <string.h>
@ -38,4 +39,9 @@ namespace een9 {
return false; return false;
return std::equal(a.end() - (ssize_t)b.size(), a.end(), b.begin()); return std::equal(a.end() - (ssize_t)b.size(), a.end(), b.begin());
} }
std::string getSubstring(const std::string &str, size_t A, size_t B) {
ASSERT(A <= B && B <= str.size(), "Incorrect substring segment");
return str.substr(A, B - A);
}
} }

View File

@ -2,6 +2,7 @@
#define ENGINE_ENGINE_NUMBER_9_BAZA_H #define ENGINE_ENGINE_NUMBER_9_BAZA_H
#include <string> #include <string>
#include <memory>
namespace een9 { namespace een9 {
class ServerError : public std::exception{ class ServerError : public std::exception{
@ -22,6 +23,12 @@ namespace een9 {
bool strIn(const std::string& str, const char* arr[]); bool strIn(const std::string& str, const char* arr[]);
bool endsIn(const std::string& a, const std::string& b); bool endsIn(const std::string& a, const std::string& b);
/* In case of error, throws een9::ServerError */
std::string getSubstring(const std::string& str, size_t A, size_t B);
template<typename T>
using uptr = std::unique_ptr<T>;
} }
#endif #endif

View File

@ -6,9 +6,12 @@
#include <utility> #include <utility>
namespace een9 { namespace een9 {
/* host:port scheme:authority and asterisk types of URI in http request are not supported by een9 */
struct ClientRequest { struct ClientRequest {
std::string method; std::string method;
std::string url; std::string uri_path;
bool has_query = false;
std::string uri_query;
std::string http_version; std::string http_version;
std::vector<std::pair<std::string, std::string>> headers; std::vector<std::pair<std::string, std::string>> headers;
bool has_body = false; bool has_body = false;

View File

@ -1,101 +1,116 @@
#include "client_request_parse.h" #include "client_request_parse.h"
#include "../baza_inter.h" #include "../baza_inter.h"
#include <libregexis024tools/delayed_matching.h>
#include <algorithm>
#include <assert.h>
namespace een9 { namespace een9 {
constexpr size_t max_allowed_request_content_length = 1000000000; ClientRequestParser_CommonPrograms::ClientRequestParser_CommonPrograms() {
regexis024::track_var_list vars;
std::string emsg;
#define reg_ALPHA "[a-zA-Z]"
#define reg_pchar "([a-zA-Z0-9\\-._~\\!$\\&'()*+@:,;=]|%[0-9a-hA-H]!r{2})"
#define reg_query "(" reg_pchar"|[?/])*"
#define reg_lin_ws "([ \t]|\r\n[ \t])*"
#define reg_request_line "#method(" reg_ALPHA"+) #uri_path(/(" reg_pchar"|/)*)(\\?#uri_query(" reg_query"))? HTTP/#http_version(!digit;+.!digit;+)\r\n"
#define reg_filed_value "(" reg_lin_ws"#header_field_value_part([\\u0021-\\u007e&^\r\n]+))*" reg_lin_ws
#define reg_HTTP_message reg_request_line "(#header_field_name([\\u0021-\\u007E&^:]+):" reg_filed_value "\r\n)*\r\n"
int ret = compile(reg_HTTP_message, vars, http_request_parse_prg, emsg);
ASSERT(ret >= 0, "regexis024::compile. " + emsg);
#define retrieve_variable(name) ASSERT_pl(vars.count(#name) > 0); ASSERT_pl(vars[#name].colarr_first >= 0); \
ASSERT_pl(vars[#name].colarr_second >= 0); name ## _beg = vars[#name].colarr_first; name ## _end = vars[#name].colarr_second;
retrieve_variable(method);
retrieve_variable(uri_path);
retrieve_variable(uri_query);
retrieve_variable(http_version);
retrieve_variable(header_field_name);
retrieve_variable(header_field_value_part);
}
static const char* supported_http_versions[] = {"HTTP/0.9", "HTTP/1.0", "HTTP/1.1", NULL}; ClientRequestParser_WorkerBuffers::ClientRequestParser_WorkerBuffers(
static const char* supported_http_methods[] = {"GET", "POST", NULL}; const ClientRequestParser_CommonPrograms &common_comp_program
): http_request_parse_vm(
common_comp_program.http_request_parse_prg.size(), common_comp_program.http_request_parse_prg.data(),
UINT64_MAX, UINT16_MAX, UINT32_MAX, UINT32_MAX, UINT64_MAX)
{
ASSERT_pl(http_request_parse_vm.initialize() == 0);
}
void ClientRequestParser::feedByte(char b) { ClientHttpRequestParser_Ctx::ClientHttpRequestParser_Ctx(
if (finished) ClientRequest &res, ClientRequestParser_WorkerBuffers &wb, ClientRequestParser_CommonPrograms& cp
THROW("Excess tailing bytes"); ): res(res), vm(wb.http_request_parse_vm), cp(cp)
{
vm.wipeToInit();
ASSERT_pl(vm.addNewMatchingThread() == 0);
}
int ClientHttpRequestParser_Ctx::feedCharacter(char ch) {
assert(status == 0);
if (collecting_body) { if (collecting_body) {
res.body += b; res.body += ch;
if (res.body.size() >= content_lenth) { if (res.body.size() >= body_size) {
finished = true; status = 1;
}
return;
}
if (line_end[line_end_progress] == b) {
line_end_progress++;
if (line_end_progress == line_end.size()) {
line_end_progress = 0;
/* Evaluating meaning of complete request line */
if (i == 0) {
parseFirstLine();
} else if (cur_line.empty()) {
processEndOfHeader();
} else {
parseHeaderLine();
}
cur_line = "";
i++;
} }
} else { } else {
ASSERT_pl(line_end_progress == 0); header += ch;
cur_line += b; if (vm.feedCharacter(ch, 1) < 0) {
THROW("vm error");
} }
if (vm.isMatched()) {
/* Finishing line */
std::vector<regexis024::CAEvent> ca = vm.getMatchedThreadCABranchReverse();
std::reverse(ca.begin(), ca.end());
size_t cur_ca_i = 0;
auto getCaV = [&](ssize_t offset) -> uint64_t { return ca[cur_ca_i + offset].value; };
auto getCaK = [&](ssize_t offset) -> regexis024::tai_t { return ca[cur_ca_i + offset].key; };
auto isThat = [&](ssize_t offset, regexis024::tai_t key) -> bool {
return ca.size() > cur_ca_i + offset && getCaK(offset) == key;
};
#define vibe_check(boff, name) isThat(boff, cp.name ## _beg) && isThat(boff + 1, cp.name ## _end)
ASSERT_pl(vibe_check(0, method) && vibe_check(2, uri_path));
res.method = getSubstring(header, getCaV(0), getCaV(1));
res.uri_path = getSubstring(header, getCaV(2), getCaV(3));
cur_ca_i += 4;
if (isThat(0, cp.uri_query_beg)) {
ASSERT_pl(vibe_check(0, uri_query));
res.has_query = true;
res.uri_query = getSubstring(header, getCaV(0), getCaV(1));
cur_ca_i += 2;
} }
ASSERT_pl(vibe_check(0, http_version));
bool ClientRequestParser::completed() const { res.http_version = getSubstring(header, getCaV(0), getCaV(1));
return finished; cur_ca_i += 2;
while (isThat(0, cp.header_field_name_beg)) {
ASSERT_pl(vibe_check(0, header_field_name));
std::string field_name = getSubstring(header, getCaV(0), getCaV(1));
cur_ca_i += 2;
std::string field_value;
while (isThat(0, cp.header_field_value_part_beg)) {
ASSERT_pl(vibe_check(0, header_field_value_part));
if (!field_value.empty())
field_value += " ";
field_value += getSubstring(header, getCaV(0), getCaV(1));
cur_ca_i += 2;
} }
res.headers.emplace_back(field_name, field_value);
void ClientRequestParser::parseFirstLine() {
std::vector<std::string> huyushki = {""};
for (char ch: cur_line) {
if (ch == ' ') {
huyushki.emplace_back();
} else {
huyushki.back() += ch;
} }
} /* Finished header processing */
ASSERT_pl(huyushki.size() == 3);
res.method = huyushki[0];
// todo: decypher and check url
res.url = huyushki[1];
res.http_version = huyushki[2];
ASSERT_pl(strIn(res.method, supported_http_methods));
ASSERT_pl(strIn(res.http_version, supported_http_versions));
}
void ClientRequestParser::parseHeaderLine() {
std::pair<std::string, std::string> np;
static const std::string sep = ": ";
size_t sep_progress = 0;
bool reading_value = false;
for (char ch: cur_line) {
if (reading_value) {
np.second += ch;
} else if (sep[sep_progress] == ch) {
sep_progress++;
if (sep_progress == sep.size()) {
reading_value = true;
}
} else {
ASSERT_pl(sep_progress == 0);
np.first += ch;
}
}
res.headers.push_back(np);
}
void ClientRequestParser::processEndOfHeader() {
for (auto& p: res.headers) { for (auto& p: res.headers) {
if (p.first == "Content-Length") { if (p.first == "Content-Length") {
res.has_body = true; collecting_body = res.has_body = true;
long cl = std::stol(p.second); body_size = std::stoull(p.second);
ASSERT_pl(cl > 0 && cl <= max_allowed_request_content_length); res.body.reserve(body_size);
content_lenth = cl;
collecting_body = true;
break;
} }
} }
if (!collecting_body) { if (!res.has_body) {
finished = true; status = 1;
}
/* We either finish now or we finish later */
} else if (!vm.haveSurvivors()) {
status = -1;
THROW("bad request");
} }
} }
return status;
ClientRequestParser::ClientRequestParser(ClientRequest &res) : res(res) {} }
} }

View File

@ -5,36 +5,52 @@
#include "../baza.h" #include "../baza.h"
#include "client_request.h" #include "client_request.h"
#include <libregexis024vm/libregexis024vm_interface.h>
// todo: parse unicode % blocks in url
namespace een9 { namespace een9 {
struct ClientRequestParser { /* One structure that contains regexp program and C.A.T. keys. All accesscan and should be read only */
struct ClientRequestParser_CommonPrograms {
std::vector<uint8_t> http_request_parse_prg;
regexis024::tai_t method_beg;
regexis024::tai_t method_end;
regexis024::tai_t uri_path_beg;
regexis024::tai_t uri_path_end;
/* Splitting of query into components (with & and =) is defined in html spec, not in http spec */
regexis024::tai_t uri_query_beg;
regexis024::tai_t uri_query_end;
regexis024::tai_t http_version_beg;
regexis024::tai_t http_version_end;
regexis024::tai_t header_field_name_beg;
regexis024::tai_t header_field_name_end;
regexis024::tai_t header_field_value_part_beg;
regexis024::tai_t header_field_value_part_end;
ClientRequestParser_CommonPrograms();
};
/* Many structures (one for each worker) that stores regexp machine that reads program from one common buffer
* VM buffers should not be reallocateed between user requests. Note that after ClientRequestParser_CommonPrograms
* has been destroyed, this vm should not be used */
struct ClientRequestParser_WorkerBuffers {
regexis024::VirtualMachine http_request_parse_vm;
explicit ClientRequestParser_WorkerBuffers(const ClientRequestParser_CommonPrograms& common_comp_program);
};
/* Ou yeah, baby, it's time for more OOP */
struct ClientHttpRequestParser_Ctx {
ClientRequest& res; ClientRequest& res;
regexis024::VirtualMachine& vm;
explicit ClientRequestParser(ClientRequest& res); ClientRequestParser_CommonPrograms& cp;
/* 1 if reading has completed, 0 if reading can be continued, -1 if error occured (input is incorrect) */
void feedByte(char b); int status = 0;
bool completed() const;
bool finished = false;
/* internal parse data */
ssize_t i = 0;
std::string cur_line;
bool collecting_body = false; bool collecting_body = false;
size_t content_lenth = 0; size_t body_size = 0;
std::string header;
const std::string line_end = "\r\n"; ClientHttpRequestParser_Ctx(ClientRequest& res, ClientRequestParser_WorkerBuffers& wb, ClientRequestParser_CommonPrograms& cp);
size_t line_end_progress = 0;
/* argument stored in cur_line */ int feedCharacter(char ch);
void parseFirstLine();
void parseHeaderLine();
void processEndOfHeader();
}; };
} }

View File

@ -5,6 +5,7 @@
#include <string.h> #include <string.h>
#include <fcntl.h> #include <fcntl.h>
#include <sys/stat.h> #include <sys/stat.h>
#include <sys/socket.h>
namespace een9 { namespace een9 {
UniqueFdWrapper::UniqueFdWrapper(int fd_): fd(fd_) {} UniqueFdWrapper::UniqueFdWrapper(int fd_): fd(fd_) {}
@ -65,4 +66,12 @@ namespace een9 {
UniqueFdWrapper fdw(fd); UniqueFdWrapper fdw(fd);
readFromFileDescriptor(fdw(), result, "file \"" + path + "\""); readFromFileDescriptor(fdw(), result, "file \"" + path + "\"");
} }
void configure_socket_rcvsndtimeo(int fd, timeval tv) {
int ret;
ret = setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(timeval));
ASSERT_on_iret_pl(ret);
ret = setsockopt(fd, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(timeval));
ASSERT_on_iret_pl(ret);
}
} }

View File

@ -29,6 +29,8 @@ namespace een9 {
void readFromFileDescriptor(int fd, std::string& result, const std::string& description = ""); void readFromFileDescriptor(int fd, std::string& result, const std::string& description = "");
void readFile(const std::string& path, std::string& result); void readFile(const std::string& path, std::string& result);
void configure_socket_rcvsndtimeo(int fd, timeval tv);
} }
#endif #endif

View File

@ -15,44 +15,6 @@
#include "baza_inter.h" #include "baza_inter.h"
namespace een9 { namespace een9 {
// todo: add timeout for multiple bytes, add more settings
ClientRequest process_connection_input(int fd, const EEN9_ServerTips& s_tips) {
ClientRequest res;
ClientRequestParser parser(res);
int ret;
char buf[2048];
ASSERT_pl(!parser.completed())
while ((ret = (int)recv(fd, buf, 2048, 0)) > 0) {
for (size_t i = 0; i < ret; i++)
parser.feedByte(buf[i]);
if (parser.completed())
break;
}
ASSERT_on_iret(ret, "recv");
ASSERT_pl(parser.completed());
// printf("Log: worker received clients request\n%s\n", client_request.toString().c_str());
return res;
}
void process_connection_output(int fd, const std::string& server_response) {
size_t N = server_response.size(), i = 0;
while (i < N) {
int written = (int)send(fd, &server_response[i], std::min(2048lu, N - i), 0);
ASSERT_on_iret(written, "sending");
ASSERT_pl(written > 0);
i += written;
}
printf("Log: worker: succesfully asnwered with response\n");
}
void process_connection(const SlaveTask& task,
const guest_core_t& guest_core)
{
ClientRequest client_request = process_connection_input(task.fd(), task.s_tips);
std::string server_response = guest_core(task, client_request);
process_connection_output(task.fd(), server_response);
}
struct QElementHttpConnections { struct QElementHttpConnections {
SlaveTask task; SlaveTask task;
QElementHttpConnections* nxt = NULL; QElementHttpConnections* nxt = NULL;
@ -105,32 +67,85 @@ namespace een9 {
} }
}; };
struct WorkersTaskEnv { struct WorkersEnvCommon {
/* This alarm notifies about new tasks and termination signal. Because we are polite people, we don't cancel threads */ /* This alarm notifies about new tasks and termination signal. Because we are polite people, we don't cancel threads */
CondVarBedObj corvee_bed; CondVarBedObj corvee_bed;
WorkersTaskQueue queue; WorkersTaskQueue queue;
bool& termination; bool& termination;
guest_core_t guest_core; guest_core_t guest_core;
WorkersTaskEnv(bool& term, guest_core_t g_c): termination(term), guest_core(std::move(g_c)){} /* Parser programs */
ClientRequestParser_CommonPrograms parser_programs;
WorkersEnvCommon(bool& term, guest_core_t g_c): termination(term), guest_core(std::move(g_c)){}
}; };
struct WorkersEnv {
WorkersEnvCommon& wtec;
int id;
ClientRequestParser_WorkerBuffers personal_parser_buffer;
explicit WorkersEnv(WorkersEnvCommon& wtec, int id): wtec(wtec), id(id), personal_parser_buffer(wtec.parser_programs){}
};
// todo: add timeout for multiple bytes, add more settings
ClientRequest process_connection_input(int fd, const EEN9_ServerTips& s_tips, WorkersEnv& wte) {
ClientRequest res;
ClientHttpRequestParser_Ctx parser(res, wte.personal_parser_buffer, wte.wtec.parser_programs);
int ret;
char buf[2048];
ASSERT_pl(parser.status == 0);
while ((ret = (int)recv(fd, buf, 2048, 0)) > 0) {
for (size_t i = 0; i < ret; i++) {
/* Throws ServerError on bad input */
if (parser.feedCharacter(buf[i]) > 0) {
break;
}
}
if (parser.status > 0)
break;
}
ASSERT_on_iret(ret, "recv");
ASSERT_pl(parser.status == 1);
// printf("Log: worker received clients request\n%s\n", client_request.toString().c_str());
return res;
}
void process_connection_output(int fd, const std::string& server_response) {
size_t N = server_response.size(), i = 0;
while (i < N) {
/* MSG_NOSIGNAL set to prevent SIGPIPE */
int written = (int)send(fd, &server_response[i], std::min(2048lu, N - i), MSG_NOSIGNAL);
ASSERT_on_iret(written, "sending");
ASSERT_pl(written > 0);
i += written;
}
printf("Log: worker: succesfully asnwered with response\n");
}
void process_connection(const SlaveTask& task, WorkersEnv& wte) {
ClientRequest client_request = process_connection_input(task.fd(), task.s_tips, wte);
std::string server_response = wte.wtec.guest_core(task, client_request);
process_connection_output(task.fd(), server_response);
}
void* worker_func(void* wte_ptr) { void* worker_func(void* wte_ptr) {
WorkersTaskEnv& wte = *((WorkersTaskEnv*)wte_ptr); WorkersEnv& wte = *((WorkersEnv*)wte_ptr);
WorkersEnvCommon& wtec = wte.wtec;
printf("Worker started\n"); printf("Worker started\n");
while (true) { while (true) {
try { try {
MutexLockGuard cb_lg(wte.corvee_bed, __func__); MutexLockGuard cb_lg(wtec.corvee_bed, __func__);
woke: woke:
if (wte.termination) if (wtec.termination)
break; break;
if (wte.queue.empty()) { if (wtec.queue.empty()) {
wte.corvee_bed.sleep(__func__); wtec.corvee_bed.sleep(__func__);
goto woke; goto woke;
} }
SlaveTask task; SlaveTask task;
wte.queue.pop_first(task); wtec.queue.pop_first(task);
process_connection(task, wte.guest_core); process_connection(task, wte);
} catch (const std::exception& e) { } catch (const std::exception& e) {
printf("Client request procession failure in worker\n"); printf("Client request procession failure in worker\n");
printf("%s\n", e.what()); printf("%s\n", e.what());
@ -140,25 +155,21 @@ namespace een9 {
return NULL; return NULL;
} }
void configure_socket_rcvsndtimeo(int fd, timeval tv) {
int ret;
ret = setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(timeval));
ASSERT_on_iret_pl(ret);
ret = setsockopt(fd, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(timeval));
ASSERT_on_iret_pl(ret);
}
// todo: retrieve address of connected client // todo: retrieve address of connected client
void electric_boogaloo(const MainloopParameters& params, bool& termination_trigger) { void electric_boogaloo(const MainloopParameters& params, bool& termination_trigger) {
WorkersTaskEnv wte(termination_trigger, params.guest_core); WorkersEnvCommon wtec(termination_trigger, params.guest_core);
ASSERT(params.slave_number > 0, "No workers spawned"); ASSERT(params.slave_number > 0, "No workers spawned");
size_t Nip = params.ports_to_listen.size(); size_t Nip = params.ports_to_listen.size();
ASSERT(Nip > 0, "No open listeting addresses"); ASSERT(Nip > 0, "No open listeting addresses");
std::vector<pthread_t> workers(params.slave_number); std::vector<pthread_t> workers(params.slave_number);
std::vector<uptr<WorkersEnv>> wtes(params.slave_number);
for (size_t i = 0; i < params.slave_number; i++) { for (size_t i = 0; i < params.slave_number; i++) {
pthread_create(&workers[i], NULL, worker_func, &wte); wtes[i] = std::make_unique<WorkersEnv>(wtec, i);
}
for (size_t i = 0; i < params.slave_number; i++) {
pthread_create(&workers[i], NULL, worker_func, wtes[i].get());
} }
try { try {
@ -195,8 +206,8 @@ namespace een9 {
printf("Entering mainloop\n"); printf("Entering mainloop\n");
ASSERT(params.mainloop_recheck_interval_us > 0, "Incorrect poll timeout"); ASSERT(params.mainloop_recheck_interval_us > 0, "Incorrect poll timeout");
while (true) { while (true) {
MutexLockGuard lg1(wte.corvee_bed, "poller termination check"); MutexLockGuard lg1(wtec.corvee_bed, "poller termination check");
if (wte.termination) if (wtec.termination)
break; break;
lg1.unlock(); lg1.unlock();
for (size_t i = 0; i < Nip; i++) { for (size_t i = 0; i < Nip; i++) {
@ -217,14 +228,14 @@ namespace een9 {
printf("Log: successful connection\n"); printf("Log: successful connection\n");
UniqueFdWrapper session_sock_fdw(session_sock); UniqueFdWrapper session_sock_fdw(session_sock);
configure_socket_rcvsndtimeo(session_sock_fdw(), params.s_conf.request_timeout); configure_socket_rcvsndtimeo(session_sock_fdw(), params.s_conf.request_timeout);
{ MutexLockGuard lg2(wte.corvee_bed, "poller adds connection"); { MutexLockGuard lg2(wtec.corvee_bed, "poller adds connection");
SlaveTask task{ConnectionInfo{}, std::move(session_sock_fdw), SlaveTask task{ConnectionInfo{}, std::move(session_sock_fdw),
EEN9_ServerTips{wte.queue.size(), EEN9_ServerTips{wtec.queue.size(),
params.s_conf.critical_load_1, params.s_conf.request_timeout}}; params.s_conf.critical_load_1, params.s_conf.request_timeout}};
if (wte.queue.size() < params.s_conf.critical_load_2) if (wtec.queue.size() < params.s_conf.critical_load_2)
wte.queue.push_back(std::move(task)); wtec.queue.push_back(std::move(task));
} }
wte.corvee_bed.din_don(); wtec.corvee_bed.din_don();
} catch (const std::exception& e) { } catch (const std::exception& e) {
printf("Error aceepting connection\n"); printf("Error aceepting connection\n");
printf("%s\n", e.what()); printf("%s\n", e.what());
@ -236,22 +247,13 @@ namespace een9 {
printf("System failure 2\n"); printf("System failure 2\n");
printf("%s\n", e.what()); printf("%s\n", e.what());
/* There is no need to tiptoe around this multi-access field. It is write-onle-and-for-good-kind */ /* There is no need to tiptoe around this multi-access field. It is write-onle-and-for-good-kind */
wte.termination = true; wtec.termination = true;
wte.corvee_bed.wake_them_all(); wtec.corvee_bed.wake_them_all();
} }
wte.termination = true; wtec.termination = true;
wte.corvee_bed.wake_them_all(); wtec.corvee_bed.wake_them_all();
for (size_t i = 0; i < params.slave_number; i++) { for (size_t i = 0; i < params.slave_number; i++) {
pthread_join(workers[i], NULL); pthread_join(workers[i], NULL);
} }
} }
void safe_electric_boogaloo(const MainloopParameters& params, bool& termination_trigger) {
try {
electric_boogaloo(params, termination_trigger);
} catch (const std::exception& e) {
printf("System failure\n");
printf("%s\n", e.what());
}
}
} }

View File

@ -53,7 +53,6 @@ namespace een9 {
}; };
void electric_boogaloo(const MainloopParameters& params, bool& termination_trigger); void electric_boogaloo(const MainloopParameters& params, bool& termination_trigger);
void safe_electric_boogaloo(const MainloopParameters& params, bool& termination_trigger);
} }
#endif #endif

View File

@ -4,6 +4,9 @@
#include <signal.h> #include <signal.h>
#include <engine_engine_number_9/connecting_assets/static_asset_manager.h> #include <engine_engine_number_9/connecting_assets/static_asset_manager.h>
#include <assert.h> #include <assert.h>
#include <sqlite3.h>
#include <libjsonincpp/string_representation.h>
#include <libregexis024vm/vm_opcodes.h>
bool termination = false; bool termination = false;
@ -11,18 +14,43 @@ void sigterm_action(int ) {
termination = true; termination = true;
} }
int main(int argc, char** argv){ void usage(char** argv) {
een9_ASSERT_pl(argc > 0);
if (argc < 1 + 2) {
printf("Usage: %s <file with settings> <assets folder>\n", argv[0]); printf("Usage: %s <file with settings> <assets folder>\n", argv[0]);
exit(1); exit(1);
} }
// if ()
std::string unsafe_client_request_stringification(const een9::ClientRequest& req) {
std::string text = "\n\nGot some cool stuff\n";
text += (req.method + " " + req.uri_path + " " + req.http_version + "\n");
for (auto& p: req.headers) {
text += p.first; text += ": "; text += p.second; text += "\n";
}
text += "Body\n"; text += req.body; text += "\n";
return text;
}
int main(int argc, char** argv){
printf("%s\n", regexis024::opcode_to_str(regexis024::opcode_t::DIE));
try {
een9_ASSERT_pl(argc > 0);
if (argc < 1 + 2)
usage(argv);
if (!een9::isRegularFile(argv[1]) || !een9::endsIn(argv[1], ".json")) {
printf("\"%s\" is not a json file\n", argv[1]);
usage(argv);
}
std::string config_file = argv[1];
if (!een9::isDirectory(argv[2])) { if (!een9::isDirectory(argv[2])) {
printf("\"%s\" is not a directory\n", argv[2]); printf("\"%s\" is not a directory\n", argv[2]);
usage(argv);
} }
std::string assets_dir = argv[2]; std::string assets_dir = argv[2];
std::string config_text;
een9::readFile(config_file, config_text);
json::JSON config = json::parse_str_flawless(config_text);
een9_ASSERT(config.isDictionary(), "config root is not dictionary");
een9::StaticAssetManagerSlaveModule samI; een9::StaticAssetManagerSlaveModule samI;
samI.update({ samI.update({
een9::StaticAssetManagerRule{assets_dir + "/html", "/assets/html", {{".html", "text/html"}} }, een9::StaticAssetManagerRule{assets_dir + "/html", "/assets/html", {{".html", "text/html"}} },
@ -34,15 +62,20 @@ int main(int argc, char** argv){
params.guest_core = [&samI](const een9::SlaveTask& task, const een9::ClientRequest& req) -> std::string { params.guest_core = [&samI](const een9::SlaveTask& task, const een9::ClientRequest& req) -> std::string {
een9::StaticAsset sa; een9::StaticAsset sa;
int ret; int ret;
ret = samI.get_asset(req.url, sa); printf("%s", unsafe_client_request_stringification(req).c_str());
if (ret >= 0) { if (req.uri_path == "/output") {
return een9::form_http_server_response_200(sa.type, sa.content); std::string text = unsafe_client_request_stringification(req);
return een9::form_http_server_response_200("text/plain", text);
} }
if (req.url == "/" || req.url == "/index.html") { if (req.uri_path == "/" || req.uri_path == "/index.html") {
ret = samI.get_asset("/assets/html/test.html", sa); ret = samI.get_asset("/assets/html/test.html", sa);
een9_ASSERT_pl(ret == 0); een9_ASSERT_pl(ret == 0);
return een9::form_http_server_response_200(sa.type, sa.content); return een9::form_http_server_response_200(sa.type, sa.content);
} }
ret = samI.get_asset(req.uri_path, sa);
if (ret >= 0) {
return een9::form_http_server_response_200(sa.type, sa.content);
}
return een9::form_http_server_response_404("text/html", "<h1> Not found! </h1>"); return een9::form_http_server_response_404("text/html", "<h1> Not found! </h1>");
}; };
params.ports_to_listen = {1025}; params.ports_to_listen = {1025};
@ -50,7 +83,11 @@ int main(int argc, char** argv){
params.open_admin_listener = false; params.open_admin_listener = false;
signal(SIGINT, sigterm_action); signal(SIGINT, sigterm_action);
signal(SIGTERM, sigterm_action);
een9::safe_electric_boogaloo(params, termination); een9::electric_boogaloo(params, termination);
} catch (std::exception& e) {
printf("System failure\n%s\n", e.what());
}
return 0; return 0;
} }