#include "running_mainloop.h"

/* NOTE(review): the names of the system headers were missing from the reviewed text
 * (only bare `#include` tokens remained). The list below is rebuilt from the symbols
 * this file actually uses - confirm it against the original before merging. */
#include <arpa/inet.h>
#include <assert.h>
#include <errno.h>
#include <netinet/in.h>
#include <poll.h>
#include <pthread.h>
#include <stdint.h>
#include <stdio.h>
#include <sys/socket.h>

#include <algorithm>
#include <memory>
#include <string>
#include <utility>
#include <vector>

#include "thread_synchronization.h"
#include "os_utils.h"
#include "http_structures/client_request_parse.h"
#include "http_structures/response_gen.h"
#include "baza_inter.h"

namespace een9 {
    /* Node of the singly linked task queue. Owns exactly one pending task. */
    struct QElementHttpConnections {
        SlaveTask task;
        QElementHttpConnections* nxt = NULL;

        explicit QElementHttpConnections(SlaveTask task): task(std::move(task)) {}
    };

    /* FIFO of pending tasks, implemented as a singly linked list with O(1) push and pop.
     * The queue does no locking of its own; in this file every access happens while
     * a MutexLockGuard on WorkersEnvCommon::corvee_bed is held. */
    struct WorkersTaskQueue {
        QElementHttpConnections* first = NULL;
        /* Points at the `nxt` slot of the last node, or at `first` when the queue is empty */
        QElementHttpConnections** afterLastPtr;
        size_t sz = 0;

        WorkersTaskQueue() {
            afterLastPtr = &first;
        }

        /* A copy would keep afterLastPtr pointing into the source object and both objects
         * would own the same nodes, so copying is forbidden. */
        WorkersTaskQueue(const WorkersTaskQueue&) = delete;
        WorkersTaskQueue& operator=(const WorkersTaskQueue&) = delete;

        /* Releases the tasks that were still waiting when the server shut down */
        ~WorkersTaskQueue() {
            while (first != NULL) {
                QElementHttpConnections* following = first->nxt;
                delete first;
                first = following;
            }
        }

        bool empty() const {
            return sz == 0;
        }

        size_t size() const {
            return sz;
        }

        /* Appends a task to the tail. May throw (allocation); the throw happens before
         * any field is modified, so an exception never leaves the queue in an incorrect state. */
        void push_back(SlaveTask task) {
            QElementHttpConnections* el = new QElementHttpConnections(std::move(task));
            *afterLastPtr = el;
            afterLastPtr = &(el->nxt);
            sz++;
        }

        /* Moves the oldest task into ret_task and removes its node. The queue must not be empty. */
        void pop_first(SlaveTask& ret_task) {
            assert(!empty());
            QElementHttpConnections* head = first;
            ret_task = std::move(head->task);
            first = head->nxt;
            if (first == NULL) {
                /* That was the only element: the tail slot is `first` again */
                afterLastPtr = &first;
            }
            delete head;
            sz--;
        }
    };

    /* State shared by the accepting thread and by all workers */
    struct WorkersEnvCommon {
        /* This alarm notifies about new tasks and termination signal.
         * Because we are polite people, we don't cancel threads */
        CondVarBedObj corvee_bed;
        WorkersTaskQueue queue;
        bool& termination;
        guest_core_t guest_core;

        /* Parser programs */
        ClientRequestParser_CommonPrograms parser_programs;

        WorkersEnvCommon(bool& term, guest_core_t g_c): termination(term), guest_core(std::move(g_c)){}
    };

    /* Per-worker state: a reference to the shared environment, the worker id
     * and a parser buffer that belongs to this worker only */
    struct WorkersEnv {
        WorkersEnvCommon& wtec;
        worker_id_t id;
        ClientRequestParser_WorkerBuffers personal_parser_buffer;

        explicit WorkersEnv(WorkersEnvCommon& wtec, worker_id_t id): wtec(wtec), id(id),
            personal_parser_buffer(wtec.parser_programs){}
    };

    // todo: add timeout for multiple bytes, add more settings
    /* Reads from fd and feeds the bytes to the request parser until the parser reports
     * a positive status or recv stops returning data.
     * Throws (through the ASSERT_* macros and the parser) on a recv error, on bad input
     * and when the stream ended before the parser reached status 1.
     * s_tips is currently unused. */
    ClientRequest process_connection_input(int fd, const EEN9_ServerTips& s_tips, WorkersEnv& wte) {
        ClientRequest res;
        ClientHttpRequestParser_Ctx parser(res, wte.personal_parser_buffer, wte.wtec.parser_programs);
        int ret;
        char buf[2048];
        ASSERT_pl(parser.status == 0);
        while ((ret = (int)recv(fd, buf, sizeof(buf), 0)) > 0) {
            /* ret is strictly positive here, so the conversion is lossless */
            const size_t received = static_cast<size_t>(ret);
            for (size_t i = 0; i < received; i++) {
                /* Throws ServerError on bad input */
                if (parser.feedCharacter(buf[i]) > 0) {
                    break;
                }
            }
            if (parser.status > 0)
                break;
        }
        ASSERT_on_iret(ret, "recv");
        ASSERT_pl(parser.status == 1);
        // printf("Log: worker received clients request\n%s\n", client_request.toString().c_str());
        return res;
    }

    /* Sends the whole response to fd in pieces of at most 2048 bytes.
     * Throws (through the ASSERT_* macros) when send fails or makes no progress. */
    void process_connection_output(int fd, const std::string& server_response) {
        size_t N = server_response.size(), i = 0;
        while (i < N) {
            /* Explicit template argument: size_t is not `unsigned long` on every platform */
            const size_t portion = std::min<size_t>(2048, N - i);
            /* MSG_NOSIGNAL set to prevent SIGPIPE */
            int written = (int)send(fd, &server_response[i], portion, MSG_NOSIGNAL);
            ASSERT_on_iret(written, "sending");
            ASSERT_pl(written > 0);
            i += written;
        }
        printf("Log: worker: succesfully asnwered with response\n");
    }

    /* Full cycle for one accepted connection: read the request, let guest_core build
     * the response, send the response back */
    void process_connection(const SlaveTask& task, WorkersEnv& wte) {
        ClientRequest client_request = process_connection_input(task.fd(), task.s_tips, wte);
        std::string server_response = wte.wtec.guest_core(task, client_request, wte.id);
        process_connection_output(task.fd(), server_response);
    }

    /* pthread entry point of a worker. wte_ptr must point to a WorkersEnv that outlives the thread.
     * The worker sleeps on corvee_bed until a task is queued or termination is set,
     * and serves tasks one at a time. Always returns NULL. */
    void* worker_func(void* wte_ptr) {
        WorkersEnv& wte = *((WorkersEnv*)wte_ptr);
        WorkersEnvCommon& wtec = wte.wtec;
        printf("Worker started\n");
        while (true) {
            try {
                SlaveTask task;
                {
                    /* The lock guards only the queue and the termination flag. It is released
                     * before the request is served, otherwise workers would run strictly one
                     * after another and the accepting thread could not enqueue new connections
                     * while a request is in progress. */
                    MutexLockGuard cb_lg(wtec.corvee_bed, __func__);
                    while (!wtec.termination && wtec.queue.empty()) {
                        wtec.corvee_bed.sleep(__func__);
                    }
                    if (wtec.termination)
                        break;
                    wtec.queue.pop_first(task);
                }
                process_connection(task, wte);
            } catch (const std::exception& e) {
                printf("Client request procession failure in worker\n");
                printf("%s\n", e.what());
                /* Known problem reported by the original author: under some unexplained
                 * circumstances the destructor of the string inside SystemError
                 * segfaults at this point. Cause not found. */
            }
        }
        printf("Worker finished\n");
        return NULL;
    }

    // todo: retrieve address of connected client
    /* Server main loop.
     * Starts params.slave_number workers, opens one listening socket on 127.0.0.1 for every
     * port in params.ports_to_listen, then polls them and hands each accepted connection to
     * the workers through the shared queue. Leaves the loop when termination_trigger is set,
     * when poll is interrupted by a signal, or on an error inside the loop; after that it
     * sets termination, wakes all workers and joins them.
     * Throws before any thread is started if there are no workers or no ports configured. */
    void electric_boogaloo(const MainloopParameters& params, bool& termination_trigger) {
        WorkersEnvCommon wtec(termination_trigger, params.guest_core);
        ASSERT(params.slave_number > 0, "No workers spawned");
        size_t Nip = params.ports_to_listen.size();
        ASSERT(Nip > 0, "No open listeting addresses");

        std::vector<pthread_t> workers(params.slave_number);
        std::vector<std::unique_ptr<WorkersEnv>> wtes(params.slave_number);
        for (size_t i = 0; i < params.slave_number; i++) {
            wtes[i] = std::make_unique<WorkersEnv>(wtec, (worker_id_t)i);
        }
        /* Count the threads that really exist: joining a pthread_t that was never
         * filled in by a successful pthread_create is undefined behaviour */
        size_t started_workers = 0;
        while (started_workers < params.slave_number) {
            int create_err = pthread_create(&workers[started_workers], NULL, worker_func,
                                            wtes[started_workers].get());
            if (create_err != 0)
                break;
            started_workers++;
        }

        try {
            /* Reported through the common failure path below, so that the workers
             * which did start are still woken up and joined */
            ASSERT(started_workers == params.slave_number, "Failed to start all worker threads");

            int ret;
            std::vector<UniqueFdWrapper> listening_socks(Nip);
            for (size_t i = 0; i < Nip; i++) {
                printf("Creating listening socket\n");
                uint16_t port = params.ports_to_listen[i];
                int listening_socket_fd = socket(AF_INET, SOCK_STREAM, 0);
                ASSERT_on_iret(listening_socket_fd, "Listening socket creation");
                UniqueFdWrapper listening_socket(listening_socket_fd);
                printf("Listening socket created\n");

                /* Value-initialised so that sin_zero and padding are not left as garbage */
                sockaddr_in listening_address{};
                listening_address.sin_family = AF_INET;
                listening_address.sin_port = htons(port);
                /* 127.0.0.1: the server is reachable through loopback only */
                uint32_t lca = (127u << 24) | 1;
                listening_address.sin_addr.s_addr = htonl(lca);

                int reuseaddr_nozero_option_value = 1;
                ret = setsockopt(listening_socket_fd, SOL_SOCKET, SO_REUSEADDR,
                                 &reuseaddr_nozero_option_value, sizeof(int));
                ASSERT_on_iret(ret, "setting SO_REUSEADDR befire binding to address");
                ret = bind(listening_socket(), (const sockaddr*)&listening_address,
                           sizeof(listening_address));
                /* The message used to say INADDR_ANY, but the address bound above is loopback */
                ASSERT_on_iret(ret, "binding to 127.0.0.1:" + std::to_string(port));
                printf("Binded socket to address\n");
                ret = listen(listening_socket(), 128);
                ASSERT_on_iret(ret, "listening for connections");
                printf("Listening socket succesfully started listening\n");
                listening_socks[i] = std::move(listening_socket);
            }

            std::vector<pollfd> pollfds(Nip);
            for (size_t i = 0; i < Nip; i++) {
                pollfds[i].fd = listening_socks[i]();
                pollfds[i].events = POLLRDNORM;
            }
            printf("Entering mainloop\n");
            ASSERT(params.mainloop_recheck_interval_us > 0, "Incorrect poll timeout");
            while (true) {
                MutexLockGuard lg1(wtec.corvee_bed, "poller termination check");
                if (wtec.termination)
                    break;
                lg1.unlock();
                for (size_t i = 0; i < Nip; i++) {
                    pollfds[i].revents = 0;
                }
                errno = 0;
                /* NOTE(review): poll() takes its timeout in milliseconds, while the parameter
                 * name says microseconds. Left untouched - confirm which unit is intended. */
                ret = poll(pollfds.data(), Nip, params.mainloop_recheck_interval_us);
                if (errno == EINTR)
                    break;
                ASSERT_on_iret(ret, "polling");
                for (size_t i = 0; i < Nip; i++) {
                    if (!(pollfds[i].revents & POLLRDNORM))
                        continue;
                    try {
                        sockaddr client_address;
                        socklen_t client_addr_len = sizeof(client_address);
                        int session_sock = accept(pollfds[i].fd, &client_address, &client_addr_len);
                        ASSERT_on_iret(session_sock, "Failed to accept incoming connection");
                        printf("Log: successful connection\n");
                        UniqueFdWrapper session_sock_fdw(session_sock);
                        configure_socket_rcvsndtimeo(session_sock_fdw(), params.s_conf.request_timeout);
                        {
                            MutexLockGuard lg2(wtec.corvee_bed, "poller adds connection");
                            SlaveTask task{ConnectionInfo{}, std::move(session_sock_fdw),
                                EEN9_ServerTips{wtec.queue.size(),
                                    params.s_conf.critical_load_1, params.s_conf.request_timeout}};
                            /* At or above critical_load_2 the connection is not queued;
                             * the task is simply destroyed at the end of this scope */
                            if (wtec.queue.size() < params.s_conf.critical_load_2)
                                wtec.queue.push_back(std::move(task));
                        }
                        wtec.corvee_bed.din_don();
                    } catch (const std::exception& e) {
                        printf("Error aceepting connection\n");
                        printf("%s\n", e.what());
                    }
                }
            }
        } catch (const std::exception& e) {
            printf("System failure 2\n");
            printf("%s\n", e.what());
            /* Shutdown itself is performed below, the same way as for a normal exit */
        }

        {
            /* The flag is raised under the lock: a worker that has already checked it but is
             * not waiting yet holds this lock, so the wake-up below cannot slip in between
             * its check and its wait and get lost */
            MutexLockGuard lg_term(wtec.corvee_bed, "poller requests termination");
            wtec.termination = true;
        }
        wtec.corvee_bed.wake_them_all();
        for (size_t i = 0; i < started_workers; i++) {
            pthread_join(workers[i], NULL);
        }
    }
}