diff options
Diffstat (limited to 'Master/Masterarbeit/src/XorayaPluginExecutor/TcpConnection.cpp')
| -rw-r--r-- | Master/Masterarbeit/src/XorayaPluginExecutor/TcpConnection.cpp | 378 |
1 files changed, 378 insertions, 0 deletions
diff --git a/Master/Masterarbeit/src/XorayaPluginExecutor/TcpConnection.cpp b/Master/Masterarbeit/src/XorayaPluginExecutor/TcpConnection.cpp new file mode 100644 index 0000000..0439d23 --- /dev/null +++ b/Master/Masterarbeit/src/XorayaPluginExecutor/TcpConnection.cpp @@ -0,0 +1,378 @@ +/*!
+ * \file TcpConnection.cpp
+ * \author S. Eisenhauer
+ * \date 27.10.2011
+ * \brief Implementation of CTcpConnection
+ */
+#include <boost/bind.hpp>
+#include <string.h>
+#include <new>
+#include <NetworkMessages.h>
+#include "TcpConnection.h"
+#include "CPluginExecutor.h"
+#include "protocol.h"
+#include "global.h"
+
+/// static memory for connection objects
+static char g_acConnections[nMAX_CONNECTIONS * sizeof(CTcpConnection)];
+
+/// global connection container
+tConnectionSet g_astActiveConnections;
+
+/*! constructor
+* \param[in] io_service reference to the io_service object
+* \param[in] pxExecutor pointer to the main application
+* \param[in] u8ConNum number of the connection
+*/
+CTcpConnection::CTcpConnection
+(boost::asio::io_service& io_service , CPluginExecutor* pxExecutor , uint8_t u8ConNum)
+:m_xInStream(m_xRxBuff.elems,nNETWORK_BUFFSIZE)
+,m_xOutStream(m_xTxBuff.elems,nNETWORK_BUFFSIZE)
+,m_xSocket(io_service),m_pxExecutor(pxExecutor)
+,m_u8ConNum(u8ConNum),m_boActive(false),m_xUploadBuffer(nUPLOAD_BUFFERSIZE)
+,m_i32UploadBufferUnread(0),m_boUploadActive(false)
+{
+ DEBUG_PRINT("entry %d",m_u8ConNum);
+ DEBUG_PRINT("exit %d",m_u8ConNum);
+}
+
+/// destructor
+CTcpConnection::~CTcpConnection(void)
+{
+ DEBUG_PRINT("entry %d",m_u8ConNum);
+ DEBUG_PRINT("exit %d",m_u8ConNum);
+}
+
+/// static factory method for connections
+/// \param[in] xIoService io_service object
+/// \param[in] pxExecutor pointer to the main application
+/// \return pointer to connection object or NULL pointer
+/// \warning check returned pointer
+CTcpConnection::tConnectionPtr CTcpConnection::pxCreate(boost::asio::io_service& xIoService, CPluginExecutor* pxExecutor)
+{
+ DEBUG_PRINT("created new tcp connection");
+ uint8_t u8ConCount = 0;
+ char* pcFreeMemorySlot = &g_acConnections[ u8ConCount * sizeof(CTcpConnection) ];
+ tConIter xConIter(g_astActiveConnections.begin());
+ tConnectionPtr pstNewConnection = NULL;
+
+ // find free memory for a new connection
+ for(; xConIter != g_astActiveConnections.end(); ++xConIter)
+ {
+ if( reinterpret_cast<char*>( &(*xConIter) ) != pcFreeMemorySlot )
+ {
+ break;
+ }
+ u8ConCount++;
+ pcFreeMemorySlot = &g_acConnections[ u8ConCount * sizeof(CTcpConnection) ];
+ }
+
+ // if no memory is available for a new connection
+ if( u8ConCount >= nMAX_CONNECTIONS)
+ {
+ // return null pointer
+ return pstNewConnection;
+ }
+ DEBUG_PRINT("using connection %d at %#x",u8ConCount,(int) pcFreeMemorySlot);
+
+ // place a new connection object at the found address
+ pstNewConnection = new(pcFreeMemorySlot) CTcpConnection(xIoService,pxExecutor,u8ConCount);
+
+ // place the new connection in the container
+ g_astActiveConnections.insert(*pstNewConnection);
+
+ // return a pointer to the new connection object
+ return pstNewConnection;
+}
+
+/// return network socket
+boost::asio::ip::tcp::socket& CTcpConnection::xGetSocket()
+{
+ return m_xSocket;
+}
+
+/// async wait for a new request from the client
+void CTcpConnection::vWaitForClientRequest()
+{
+ // initialize receive buffer
+ memset(m_xRxBuff.elems,0,nNETWORK_BUFFSIZE);
+ // reset instream
+ m_xInStream.vReset();
+ // async wait, register read handler
+ boost::asio::async_read(
+ m_xSocket,
+ boost::asio::buffer(m_xRxBuff),
+ boost::asio::transfer_at_least( sizeof(tstNetworkMessageHeader) ),
+ boost::bind(&CTcpConnection::vReadHandler, this,
+ boost::asio::placeholders::error,
+ boost::asio::placeholders::bytes_transferred
+ )
+ );
+}
+
+/// handle data written event
+/// \param[in] err error code of the transmission
+/// \param[in] bytesTransferred number of bytes transferred
+void CTcpConnection::vWriteHandler(const boost::system::error_code& err, size_t bytesTransferred)
+{
+ if(!err)
+ {
+// DEBUG_PRINT("Transferred %d bytes",bytesTransferred);
+ // initialize transmit buffer
+ memset(m_xTxBuff.elems,0,nNETWORK_BUFFSIZE);
+ //reset outstream
+ m_xOutStream.vReset();
+ vUploadNextMessage();
+ }
+ else
+ {
+ DEBUG_PRINT("error: [%s]",err.message().c_str());
+ DEBUG_PRINT("removing from connection list %d",m_u8ConNum);
+ // in case of an error, remove this object from the container
+ g_astActiveConnections.erase(*this);
+ vSetActive(false);
+ }
+}
+
+/// handle data read event
+/// \param[in] err error code of the transmission
+/// \param[in] bytesTransferred number of bytes transferred
+void CTcpConnection::vReadHandler(const boost::system::error_code& err, size_t bytesTransferred)
+{
+ if(!err)
+ {
+ bool boSendResponse = true;
+ uint32_t u32TxMsgLen = sizeof(tstNetworkMessageHeader);
+ uint32_t u32Tmp;
+ const CNetworkDatacontainer::tstRxNetworkMessageMsg& stRxMsg
+ = *(reinterpret_cast<CNetworkDatacontainer::tstRxNetworkMessageMsg*>(m_xNetworkDatacontainer.pvGetVarPtr(nenNumberOfNetworkMessageTypes)));
+ const CNetworkDatacontainer::tstReqStartPlugin& stReqStartPlugin
+ = *(reinterpret_cast<CNetworkDatacontainer::tstReqStartPlugin*>(m_xNetworkDatacontainer.pvGetVarPtr(nenReqStartPlugin)));
+ const CNetworkDatacontainer::tstReqStopPlugin& stReqStopPlugin
+ = *(reinterpret_cast<CNetworkDatacontainer::tstReqStopPlugin*>(m_xNetworkDatacontainer.pvGetVarPtr(nenReqStopPlugin)));
+ const CNetworkDatacontainer::tstReqImportLog& stReqImportLog
+ = *(reinterpret_cast<CNetworkDatacontainer::tstReqImportLog*>(m_xNetworkDatacontainer.pvGetVarPtr(nenReqImportLog)));
+ CNetworkDatacontainer::tstRespImportLog& stRespImportLog
+ = *(reinterpret_cast<CNetworkDatacontainer::tstRespImportLog*>(m_xNetworkDatacontainer.pvGetVarPtr(nenRespImportLog)));
+ const CNetworkDatacontainer::tstReqChangeMsgData& stReqChangeMsgData
+ = *(reinterpret_cast<CNetworkDatacontainer::tstReqChangeMsgData*>(m_xNetworkDatacontainer.pvGetVarPtr(nenReqChangeMsgData)));
+ CNetworkDatacontainer::tstRespEnumerateLogs& stRespEnumerateLogs
+ = *(reinterpret_cast<CNetworkDatacontainer::tstRespEnumerateLogs*>(m_xNetworkDatacontainer.pvGetVarPtr(nenRespEnumerateLogs)));
+ CNetworkDatacontainer::tstRespEnumerateInterfaces& stRespEnumerateInterfaces
+ = *(reinterpret_cast<CNetworkDatacontainer::tstRespEnumerateInterfaces*>(m_xNetworkDatacontainer.pvGetVarPtr(nenRespEnumerateInterfaces)));
+
+ do
+ {
+ uint32_t u32Offset = m_xInStream.u32GetByteCount();
+ sxRxMsgHeader.vRead(m_xNetworkDatacontainer,m_xInStream);
+ if( !m_xInStream.boIsOK() )
+ {
+ tenStreamError enError;
+ m_xInStream.vGetError(enError);
+ ERROR_PRINT("network instream has error %d",enError);
+ break;
+ }
+ m_xInStream.vSetPosition(u32Offset);
+ // DEBUG_PRINT("Received %u bytes", bytesTransferred);
+
+ switch( stRxMsg.stHeader.enMessageType )
+ {
+ case nenReqEnumerateInterfaces:
+ DEBUG_PRINT("Received command nenReqEnumerateInterfaces");
+ u32Tmp = nNETWORK_DATALENGTH;
+ sxReqEnumerateInterfacesMessage.vRead(m_xNetworkDatacontainer,m_xInStream);
+ if( m_xInStream.boIsOK() )
+ {
+ memset(stRespEnumerateInterfaces.acInterfaces,0,nNETWORK_DATALENGTH);
+ m_pxExecutor->vGetInterfaces(
+ stRespEnumerateInterfaces.acInterfaces
+ ,u32Tmp);
+ u32TxMsgLen += u32Tmp;
+ sxRespEnumerateInterfacesMessage.vWrite(m_xNetworkDatacontainer,m_xOutStream);
+ }
+ break;
+ case nenReqShutdown:
+ DEBUG_PRINT("Received command nenReqShutdown");
+ sxReqShutdownMessage.vRead(m_xNetworkDatacontainer,m_xInStream);
+ if( m_xInStream.boIsOK() )
+ {
+ m_pxExecutor->vStop();
+ sxRespShutdownMessage.vWrite(m_xNetworkDatacontainer,m_xOutStream);
+ }
+ break;
+ case nenReqStartPlugin:
+ DEBUG_PRINT("Received command nenReqStartPlugin");
+ sxReqStartPluginMessage.vRead(m_xNetworkDatacontainer,m_xInStream);
+ if( m_xInStream.boIsOK() )
+ {
+ m_pxExecutor->vStartPlugin(stReqStartPlugin.i32Interface
+ ,stReqStartPlugin.acFilename);
+ sxRespStartPluginMessage.vWrite(m_xNetworkDatacontainer,m_xOutStream);
+ }
+ break;
+ case nenReqStopPlugin:
+ DEBUG_PRINT("Received command nenReqStopPlugin");
+ sxReqStopPluginMessage.vRead(m_xNetworkDatacontainer,m_xInStream);
+ if( m_xInStream.boIsOK() )
+ {
+ m_pxExecutor->vStopPlugin(stReqStopPlugin.i32Interface);
+ sxRespStopPluginMessage.vWrite(m_xNetworkDatacontainer,m_xOutStream);
+ }
+ break;
+ case nenReqImportLog:
+ DEBUG_PRINT("Received command nenReqImportLog");
+ // number of data bytes in transmit buffer available for string
+ u32Tmp = nNETWORK_DATALENGTH;
+ sxReqImportLogMessage.vRead(m_xNetworkDatacontainer,m_xInStream);
+ if( m_xInStream.boIsOK() )
+ {
+ memset(stRespImportLog.acFilename,0,nNETWORK_DATALENGTH);
+ m_pxExecutor->enGetLog(
+ stReqImportLog.acFilename
+ ,stRespImportLog.acFilename
+ ,u32Tmp);
+ u32TxMsgLen += u32Tmp;
+ sxRespImportLogMessage.vWrite(m_xNetworkDatacontainer,m_xOutStream);
+ }
+ break;
+ case nenReqChangeMsgData:
+ sxReqChangeMsgDataMessage.vRead(m_xNetworkDatacontainer,m_xInStream);
+ if( m_xInStream.boIsOK() )
+ {
+ m_pxExecutor->vChangeMsgData(stReqChangeMsgData.i32Interface
+ ,stReqChangeMsgData.u32CanMsgId
+ ,stReqChangeMsgData.au8Data);
+ sxRespChangeMsgDataMessage.vWrite(m_xNetworkDatacontainer,m_xOutStream);
+ }
+ break;
+ case nenReqEnumerateLogs:
+ DEBUG_PRINT("Received command nenReqEnumerateLogs");
+ sxReqEnumerateLogsMessage.vRead(m_xNetworkDatacontainer,m_xInStream);
+ if( m_xInStream.boIsOK() )
+ {
+ // number of data bytes in transmit buffer available for string
+ u32Tmp = nNETWORK_DATALENGTH;
+ memset(stRespEnumerateLogs.acFilenames,0,nNETWORK_DATALENGTH);
+ m_pxExecutor->enEnumerateLogs(stRespEnumerateLogs.acFilenames,u32Tmp);
+ u32TxMsgLen += u32Tmp;
+ sxRespEnumarateLogsMessage.vWrite(m_xNetworkDatacontainer,m_xOutStream);
+ }
+ break;
+ case nenRespUploadLogMsg:
+// DEBUG_PRINT("Received nenRespUploadLog");
+ sxRespUploadLogMsgMessage.vRead(m_xNetworkDatacontainer,m_xInStream);
+ if( m_xInStream.boIsOK() )
+ {
+ vUploadNextMessage();
+ boSendResponse = false;
+ }
+ break;
+ default:
+ DEBUG_PRINT("Received unknown message type");
+ sxRxMsgHeader.vRead(m_xNetworkDatacontainer,m_xInStream);
+ if( m_xInStream.boIsOK() )
+ {
+ sxRespUnknownReqMessage.vWrite(m_xNetworkDatacontainer,m_xOutStream);
+ }
+ break;
+ }
+ if( !m_xInStream.boIsOK() )
+ {
+ tenStreamError enError;
+ m_xInStream.vGetError(enError);
+ ERROR_PRINT("network instream has error %d",enError);
+ }
+ if( boSendResponse )
+ {
+ if( m_xOutStream.boIsOK() )
+ {
+ boost::asio::async_write(
+ m_xSocket,
+ boost::asio::buffer(m_xTxBuff,m_xOutStream.u32GetByteCount()),
+ boost::bind(
+ &CTcpConnection::vWriteHandler,
+ this,
+ boost::asio::placeholders::error,
+ boost::asio::placeholders::bytes_transferred
+ )
+ );
+ }
+ else
+ {
+ tenStreamError enError;
+ m_xOutStream.vGetError(enError);
+ ERROR_PRINT("network outstream has error %d",enError);
+ m_xOutStream.vReset();
+ }
+ }
+ } while ( m_xInStream.u32GetByteCount() < bytesTransferred );
+ vWaitForClientRequest();
+ }
+ else
+ {
+ ERROR_PRINT("error: [%s]",err.message().c_str());
+ DEBUG_PRINT("removing from connection list %d",m_u8ConNum);
+ // in case of an error, remove this object from the container
+ vSetActive(false);
+ g_astActiveConnections.erase(*this);
+ }
+}
+
+/*!
+ * \brief send the next message in the buffer over the network
+ */
+void CTcpConnection::vUploadNextMessage()
+{
+ boost::lock_guard<x2e::Mutex> lock(m_xUploadBufferMutex);
+ if( m_boUploadActive )
+ {
+ CNetworkDatacontainer::tstReqUploadLogMsg& stReqUploadLogMsg
+ = *(reinterpret_cast<CNetworkDatacontainer::tstReqUploadLogMsg*>(m_xNetworkDatacontainer.pvGetVarPtr(nenReqUploadLogMsg)));
+ stReqUploadLogMsg.stLogMessage = m_xUploadBuffer[--m_i32UploadBufferUnread];
+ if( m_i32UploadBufferUnread == 0 )
+ {
+ m_boUploadActive = false;
+ }
+// DEBUG_PRINT("%u %x %d",stLogMsg.u32MsgId,stLogMsg.au8Data[0],m_i32UploadBufferUnread);
+
+ sxReqUploadLogMsgMessage.vWrite(m_xNetworkDatacontainer,m_xOutStream);
+ if( m_xOutStream.boIsOK() )
+ {
+ boost::asio::async_write(
+ m_xSocket,
+ boost::asio::buffer(m_xTxBuff,m_xOutStream.u32GetByteCount()),
+ boost::bind(
+ &CTcpConnection::vWriteHandler,
+ this,
+ boost::asio::placeholders::error,
+ boost::asio::placeholders::bytes_transferred
+ )
+ );
+ }
+ else
+ {
+ tenStreamError enError;
+ m_xOutStream.vGetError(enError);
+ ERROR_PRINT("network outstream has error %d",enError);
+ m_xOutStream.vReset();
+ }
+ }
+}
+/**
+ * \brief adds a logged message to the upload buffer and triggers start of upload
+ * \param[in] stLogMsg message to upload
+ * \warning should run in network task
+ */
+void CTcpConnection::vUploadLogMessage(const tstLogMessage& stLogMsg)
+{
+// DEBUG_PRINT("entry");
+ boost::lock_guard<x2e::Mutex> lock(m_xUploadBufferMutex);
+ m_xUploadBuffer.push_front(stLogMsg);
+ ++m_i32UploadBufferUnread;
+ if( !m_boUploadActive )
+ {
+ m_boUploadActive = true;
+ vUploadNextMessage();
+ }
+// DEBUG_PRINT("exit");
+}
|
