basic epoll implementation

This commit is contained in:
2019-05-20 13:28:51 +02:00
parent 8e66213d6e
commit 6dbe2c26d8
3 changed files with 30 additions and 6 deletions

View File

@ -141,6 +141,11 @@ void Socket::cleanUp() {
} }
int Socket::getFD()
{
return sockDesc;
}
unsigned short Socket::resolveService(const string &service, unsigned short Socket::resolveService(const string &service,
const string &protocol) { const string &protocol) {
struct servent *serv; /* Structure containing service information */ struct servent *serv; /* Structure containing service information */

View File

@ -104,6 +104,8 @@ public:
void setKeepalive(); void setKeepalive();
void setBlocking(bool flag); void setBlocking(bool flag);
int getFD();
private: private:
// Prevent the user from trying to use value semantics on this object // Prevent the user from trying to use value semantics on this object
Socket(const Socket &sock); Socket(const Socket &sock);

View File

@ -3,20 +3,23 @@
#include <sys/types.h> #include <sys/types.h>
#include <sys/socket.h> #include <sys/socket.h>
#include <sys/un.h> #include <sys/un.h>
#include <sys/epoll.h>
#include <string> #include <string>
#include <cstdlib> #include <cstdlib>
#include <vector> #include <vector>
#include <thread> #include <thread>
#include <mutex>
#include <signal.h> #include <signal.h>
#include <unistd.h> #include <unistd.h>
#include "serial_io.h" #include "serial_io.h"
#include "Socket.h" #include "Socket.h"
#define VERSION "v0.4" #define VERSION "v0.5"
volatile bool stop = false; volatile bool stop = false;
volatile bool resettSerialPort = false; volatile bool resettSerialPort = false;
struct epoll_event ev;
void intHandler(int sig) void intHandler(int sig)
{ {
@ -100,18 +103,21 @@ static int parseCmdArgs(int argc, char** argv, Config *config)
return 0; return 0;
} }
void acceptThreadFunction( TCPServerSocket* servSock, std::vector<TCPSocket*>* clientSockets ) void acceptThreadFunction( TCPServerSocket* servSock, std::vector<TCPSocket*>* clientSockets, std::mutex* clientSocketsMutex, int pollQue )
{ {
while(!stop) while(!stop)
{ {
TCPSocket* newSock = servSock->accept(); TCPSocket* newSock = servSock->accept();
if(newSock != nullptr) if(newSock != nullptr)
{ {
clientSocketsMutex->lock();
clientSockets->push_back(newSock); // Wait for a client to connect clientSockets->push_back(newSock); // Wait for a client to connect
clientSocketsMutex->unlock();
epoll_ctl(pollQue, EPOLL_CTL_ADD, newSock->getFD(), &ev);
clientSockets->back()->send("UVOS serial multiplexer " VERSION "\n", sizeof("UVOS serial multiplexer " VERSION "\n")-1); clientSockets->back()->send("UVOS serial multiplexer " VERSION "\n", sizeof("UVOS serial multiplexer " VERSION "\n")-1);
std::cout<<"got client\n"; std::cout<<"got client\n";
} }
std::this_thread::sleep_for(std::chrono::milliseconds(200)); std::this_thread::sleep_for(std::chrono::seconds(1));
} }
} }
@ -141,16 +147,25 @@ int main(int argc, char* argv[])
std::cout<<"UVOS serial mulitplexer "<<VERSION<<'\n'; std::cout<<"UVOS serial mulitplexer "<<VERSION<<'\n';
int pollQue = epoll_create1(0);
ev.events = EPOLLIN;
int serial = openSerialPort(config); int serial = openSerialPort(config);
if(!config.noSerial) if(!config.noSerial)
{ {
std::cout<<"Using serial port: "<<config.portFileName<<" at "<<config.baud<<" baud\n"; std::cout<<"Using serial port: "<<config.portFileName<<" at "<<config.baud<<" baud\n";
serial = serialport_init(config.portFileName.c_str(), config.baud); serial = serialport_init(config.portFileName.c_str(), config.baud);
if(serial == -1) return 1; if(serial == -1) return 1;
else
{
struct epoll_event ev;
epoll_ctl(pollQue, EPOLL_CTL_ADD, serial, &ev);
}
tcflush(serial, TCIOFLUSH); tcflush(serial, TCIOFLUSH);
} }
else std::cout<<"Sinkless mode\n"; else std::cout<<"Sinkless mode\n";
std::mutex clientSocketsMutex;
std::vector<TCPSocket*> clientSockets; std::vector<TCPSocket*> clientSockets;
std::thread* acceptThread; std::thread* acceptThread;
@ -161,7 +176,7 @@ int main(int argc, char* argv[])
{ {
servSock = new TCPServerSocket(config.port, 5, true); // Server Socket object servSock = new TCPServerSocket(config.port, 5, true); // Server Socket object
servSock->setBlocking(false); servSock->setBlocking(false);
acceptThread = new std::thread(acceptThreadFunction, servSock, &clientSockets); acceptThread = new std::thread(acceptThreadFunction, servSock, &clientSockets, &clientSocketsMutex, pollQue);
} }
catch(SocketException &e) catch(SocketException &e)
{ {
@ -177,16 +192,18 @@ int main(int argc, char* argv[])
if(config.reinit) alarm(600); if(config.reinit) alarm(600);
std::cout<<"starting loop\n"; std::cout<<"starting loop\n";
while(!stop) while(!stop)
{ {
epoll_wait(pollQue, &ev, 1, 2000);
int readlen = sRead(serial, buffer, 4096); int readlen = sRead(serial, buffer, 4096);
for(unsigned int i = 0; i < clientSockets.size(); i++) for(unsigned int i = 0; i < clientSockets.size(); i++)
{ {
clientSocketsMutex.lock();
try try
{ {
if(readlen > 0) if(readlen > 0)
{ {
if(config.verbose) if(config.verbose)
@ -262,6 +279,7 @@ int main(int argc, char* argv[])
i--; i--;
if(i < 0) i=0; if(i < 0) i=0;
} }
clientSocketsMutex.unlock();
} }
if(resettSerialPort) if(resettSerialPort)
{ {
@ -269,7 +287,6 @@ int main(int argc, char* argv[])
serial = openSerialPort(config); serial = openSerialPort(config);
resettSerialPort = false; resettSerialPort = false;
} }
std::this_thread::sleep_for(std::chrono::milliseconds(10));
} }
acceptThread->join(); acceptThread->join();