From 33613a85afc4b1481367fbe92a17ee59c240250b Mon Sep 17 00:00:00 2001 From: Sven Eisenhauer Date: Fri, 10 Nov 2023 15:11:48 +0100 Subject: add new repo --- .../src/XorayaPluginExecutor/TcpConnection.cpp | 378 +++++++++++++++++++++ 1 file changed, 378 insertions(+) create mode 100644 Master/Masterarbeit/src/XorayaPluginExecutor/TcpConnection.cpp (limited to 'Master/Masterarbeit/src/XorayaPluginExecutor/TcpConnection.cpp') diff --git a/Master/Masterarbeit/src/XorayaPluginExecutor/TcpConnection.cpp b/Master/Masterarbeit/src/XorayaPluginExecutor/TcpConnection.cpp new file mode 100644 index 0000000..0439d23 --- /dev/null +++ b/Master/Masterarbeit/src/XorayaPluginExecutor/TcpConnection.cpp @@ -0,0 +1,378 @@ +/*! + * \file TcpConnection.cpp + * \author S. Eisenhauer + * \date 27.10.2011 + * \brief Implementation of CTcpConnection + */ +#include +#include +#include +#include +#include "TcpConnection.h" +#include "CPluginExecutor.h" +#include "protocol.h" +#include "global.h" + +/// static memory for connection objects +static char g_acConnections[nMAX_CONNECTIONS * sizeof(CTcpConnection)]; + +/// global connection container +tConnectionSet g_astActiveConnections; + +/*! constructor +* \param[in] io_service reference to the io_service object +* \param[in] pxExecutor pointer to the main application +* \param[in] u8ConNum number of the connection +*/ +CTcpConnection::CTcpConnection +(boost::asio::io_service& io_service , CPluginExecutor* pxExecutor , uint8_t u8ConNum) +:m_xInStream(m_xRxBuff.elems,nNETWORK_BUFFSIZE) +,m_xOutStream(m_xTxBuff.elems,nNETWORK_BUFFSIZE) +,m_xSocket(io_service),m_pxExecutor(pxExecutor) +,m_u8ConNum(u8ConNum),m_boActive(false),m_xUploadBuffer(nUPLOAD_BUFFERSIZE) +,m_i32UploadBufferUnread(0),m_boUploadActive(false) +{ + DEBUG_PRINT("entry %d",m_u8ConNum); + DEBUG_PRINT("exit %d",m_u8ConNum); +} + +/// destructor +CTcpConnection::~CTcpConnection(void) +{ + DEBUG_PRINT("entry %d",m_u8ConNum); + DEBUG_PRINT("exit %d",m_u8ConNum); +} + +/// static factory method for connections +/// \param[in] xIoService io_service object +/// \param[in] pxExecutor pointer to the main application +/// \return pointer to connection object or NULL pointer +/// \warning check returned pointer +CTcpConnection::tConnectionPtr CTcpConnection::pxCreate(boost::asio::io_service& xIoService, CPluginExecutor* pxExecutor) +{ + DEBUG_PRINT("created new tcp connection"); + uint8_t u8ConCount = 0; + char* pcFreeMemorySlot = &g_acConnections[ u8ConCount * sizeof(CTcpConnection) ]; + tConIter xConIter(g_astActiveConnections.begin()); + tConnectionPtr pstNewConnection = NULL; + + // find free memory for a new connection + for(; xConIter != g_astActiveConnections.end(); ++xConIter) + { + if( reinterpret_cast( &(*xConIter) ) != pcFreeMemorySlot ) + { + break; + } + u8ConCount++; + pcFreeMemorySlot = &g_acConnections[ u8ConCount * sizeof(CTcpConnection) ]; + } + + // if no memory is available for a new connection + if( u8ConCount >= nMAX_CONNECTIONS) + { + // return null pointer + return pstNewConnection; + } + DEBUG_PRINT("using connection %d at %#x",u8ConCount,(int) pcFreeMemorySlot); + + // place a new connection object at the found address + pstNewConnection = new(pcFreeMemorySlot) CTcpConnection(xIoService,pxExecutor,u8ConCount); + + // place the new connection in the container + g_astActiveConnections.insert(*pstNewConnection); + + // return a pointer to the new connection object + return pstNewConnection; +} + +/// return network socket +boost::asio::ip::tcp::socket& CTcpConnection::xGetSocket() +{ + return m_xSocket; +} + +/// async wait for a new request from the client +void CTcpConnection::vWaitForClientRequest() +{ + // initialize receive buffer + memset(m_xRxBuff.elems,0,nNETWORK_BUFFSIZE); + // reset instream + m_xInStream.vReset(); + // async wait, register read handler + boost::asio::async_read( + m_xSocket, + boost::asio::buffer(m_xRxBuff), + boost::asio::transfer_at_least( sizeof(tstNetworkMessageHeader) ), + boost::bind(&CTcpConnection::vReadHandler, this, + boost::asio::placeholders::error, + boost::asio::placeholders::bytes_transferred + ) + ); +} + +/// handle data written event +/// \param[in] err error code of the transmission +/// \param[in] bytesTransferred number of bytes transferred +void CTcpConnection::vWriteHandler(const boost::system::error_code& err, size_t bytesTransferred) +{ + if(!err) + { +// DEBUG_PRINT("Transferred %d bytes",bytesTransferred); + // initialize transmit buffer + memset(m_xTxBuff.elems,0,nNETWORK_BUFFSIZE); + //reset outstream + m_xOutStream.vReset(); + vUploadNextMessage(); + } + else + { + DEBUG_PRINT("error: [%s]",err.message().c_str()); + DEBUG_PRINT("removing from connection list %d",m_u8ConNum); + // in case of an error, remove this object from the container + g_astActiveConnections.erase(*this); + vSetActive(false); + } +} + +/// handle data read event +/// \param[in] err error code of the transmission +/// \param[in] bytesTransferred number of bytes transferred +void CTcpConnection::vReadHandler(const boost::system::error_code& err, size_t bytesTransferred) +{ + if(!err) + { + bool boSendResponse = true; + uint32_t u32TxMsgLen = sizeof(tstNetworkMessageHeader); + uint32_t u32Tmp; + const CNetworkDatacontainer::tstRxNetworkMessageMsg& stRxMsg + = *(reinterpret_cast(m_xNetworkDatacontainer.pvGetVarPtr(nenNumberOfNetworkMessageTypes))); + const CNetworkDatacontainer::tstReqStartPlugin& stReqStartPlugin + = *(reinterpret_cast(m_xNetworkDatacontainer.pvGetVarPtr(nenReqStartPlugin))); + const CNetworkDatacontainer::tstReqStopPlugin& stReqStopPlugin + = *(reinterpret_cast(m_xNetworkDatacontainer.pvGetVarPtr(nenReqStopPlugin))); + const CNetworkDatacontainer::tstReqImportLog& stReqImportLog + = *(reinterpret_cast(m_xNetworkDatacontainer.pvGetVarPtr(nenReqImportLog))); + CNetworkDatacontainer::tstRespImportLog& stRespImportLog + = *(reinterpret_cast(m_xNetworkDatacontainer.pvGetVarPtr(nenRespImportLog))); + const CNetworkDatacontainer::tstReqChangeMsgData& stReqChangeMsgData + = *(reinterpret_cast(m_xNetworkDatacontainer.pvGetVarPtr(nenReqChangeMsgData))); + CNetworkDatacontainer::tstRespEnumerateLogs& stRespEnumerateLogs + = *(reinterpret_cast(m_xNetworkDatacontainer.pvGetVarPtr(nenRespEnumerateLogs))); + CNetworkDatacontainer::tstRespEnumerateInterfaces& stRespEnumerateInterfaces + = *(reinterpret_cast(m_xNetworkDatacontainer.pvGetVarPtr(nenRespEnumerateInterfaces))); + + do + { + uint32_t u32Offset = m_xInStream.u32GetByteCount(); + sxRxMsgHeader.vRead(m_xNetworkDatacontainer,m_xInStream); + if( !m_xInStream.boIsOK() ) + { + tenStreamError enError; + m_xInStream.vGetError(enError); + ERROR_PRINT("network instream has error %d",enError); + break; + } + m_xInStream.vSetPosition(u32Offset); + // DEBUG_PRINT("Received %u bytes", bytesTransferred); + + switch( stRxMsg.stHeader.enMessageType ) + { + case nenReqEnumerateInterfaces: + DEBUG_PRINT("Received command nenReqEnumerateInterfaces"); + u32Tmp = nNETWORK_DATALENGTH; + sxReqEnumerateInterfacesMessage.vRead(m_xNetworkDatacontainer,m_xInStream); + if( m_xInStream.boIsOK() ) + { + memset(stRespEnumerateInterfaces.acInterfaces,0,nNETWORK_DATALENGTH); + m_pxExecutor->vGetInterfaces( + stRespEnumerateInterfaces.acInterfaces + ,u32Tmp); + u32TxMsgLen += u32Tmp; + sxRespEnumerateInterfacesMessage.vWrite(m_xNetworkDatacontainer,m_xOutStream); + } + break; + case nenReqShutdown: + DEBUG_PRINT("Received command nenReqShutdown"); + sxReqShutdownMessage.vRead(m_xNetworkDatacontainer,m_xInStream); + if( m_xInStream.boIsOK() ) + { + m_pxExecutor->vStop(); + sxRespShutdownMessage.vWrite(m_xNetworkDatacontainer,m_xOutStream); + } + break; + case nenReqStartPlugin: + DEBUG_PRINT("Received command nenReqStartPlugin"); + sxReqStartPluginMessage.vRead(m_xNetworkDatacontainer,m_xInStream); + if( m_xInStream.boIsOK() ) + { + m_pxExecutor->vStartPlugin(stReqStartPlugin.i32Interface + ,stReqStartPlugin.acFilename); + sxRespStartPluginMessage.vWrite(m_xNetworkDatacontainer,m_xOutStream); + } + break; + case nenReqStopPlugin: + DEBUG_PRINT("Received command nenReqStopPlugin"); + sxReqStopPluginMessage.vRead(m_xNetworkDatacontainer,m_xInStream); + if( m_xInStream.boIsOK() ) + { + m_pxExecutor->vStopPlugin(stReqStopPlugin.i32Interface); + sxRespStopPluginMessage.vWrite(m_xNetworkDatacontainer,m_xOutStream); + } + break; + case nenReqImportLog: + DEBUG_PRINT("Received command nenReqImportLog"); + // number of data bytes in transmit buffer available for string + u32Tmp = nNETWORK_DATALENGTH; + sxReqImportLogMessage.vRead(m_xNetworkDatacontainer,m_xInStream); + if( m_xInStream.boIsOK() ) + { + memset(stRespImportLog.acFilename,0,nNETWORK_DATALENGTH); + m_pxExecutor->enGetLog( + stReqImportLog.acFilename + ,stRespImportLog.acFilename + ,u32Tmp); + u32TxMsgLen += u32Tmp; + sxRespImportLogMessage.vWrite(m_xNetworkDatacontainer,m_xOutStream); + } + break; + case nenReqChangeMsgData: + sxReqChangeMsgDataMessage.vRead(m_xNetworkDatacontainer,m_xInStream); + if( m_xInStream.boIsOK() ) + { + m_pxExecutor->vChangeMsgData(stReqChangeMsgData.i32Interface + ,stReqChangeMsgData.u32CanMsgId + ,stReqChangeMsgData.au8Data); + sxRespChangeMsgDataMessage.vWrite(m_xNetworkDatacontainer,m_xOutStream); + } + break; + case nenReqEnumerateLogs: + DEBUG_PRINT("Received command nenReqEnumerateLogs"); + sxReqEnumerateLogsMessage.vRead(m_xNetworkDatacontainer,m_xInStream); + if( m_xInStream.boIsOK() ) + { + // number of data bytes in transmit buffer available for string + u32Tmp = nNETWORK_DATALENGTH; + memset(stRespEnumerateLogs.acFilenames,0,nNETWORK_DATALENGTH); + m_pxExecutor->enEnumerateLogs(stRespEnumerateLogs.acFilenames,u32Tmp); + u32TxMsgLen += u32Tmp; + sxRespEnumarateLogsMessage.vWrite(m_xNetworkDatacontainer,m_xOutStream); + } + break; + case nenRespUploadLogMsg: +// DEBUG_PRINT("Received nenRespUploadLog"); + sxRespUploadLogMsgMessage.vRead(m_xNetworkDatacontainer,m_xInStream); + if( m_xInStream.boIsOK() ) + { + vUploadNextMessage(); + boSendResponse = false; + } + break; + default: + DEBUG_PRINT("Received unknown message type"); + sxRxMsgHeader.vRead(m_xNetworkDatacontainer,m_xInStream); + if( m_xInStream.boIsOK() ) + { + sxRespUnknownReqMessage.vWrite(m_xNetworkDatacontainer,m_xOutStream); + } + break; + } + if( !m_xInStream.boIsOK() ) + { + tenStreamError enError; + m_xInStream.vGetError(enError); + ERROR_PRINT("network instream has error %d",enError); + } + if( boSendResponse ) + { + if( m_xOutStream.boIsOK() ) + { + boost::asio::async_write( + m_xSocket, + boost::asio::buffer(m_xTxBuff,m_xOutStream.u32GetByteCount()), + boost::bind( + &CTcpConnection::vWriteHandler, + this, + boost::asio::placeholders::error, + boost::asio::placeholders::bytes_transferred + ) + ); + } + else + { + tenStreamError enError; + m_xOutStream.vGetError(enError); + ERROR_PRINT("network outstream has error %d",enError); + m_xOutStream.vReset(); + } + } + } while ( m_xInStream.u32GetByteCount() < bytesTransferred ); + vWaitForClientRequest(); + } + else + { + ERROR_PRINT("error: [%s]",err.message().c_str()); + DEBUG_PRINT("removing from connection list %d",m_u8ConNum); + // in case of an error, remove this object from the container + vSetActive(false); + g_astActiveConnections.erase(*this); + } +} + +/*! + * \brief send the next message in the buffer over the network + */ +void CTcpConnection::vUploadNextMessage() +{ + boost::lock_guard lock(m_xUploadBufferMutex); + if( m_boUploadActive ) + { + CNetworkDatacontainer::tstReqUploadLogMsg& stReqUploadLogMsg + = *(reinterpret_cast(m_xNetworkDatacontainer.pvGetVarPtr(nenReqUploadLogMsg))); + stReqUploadLogMsg.stLogMessage = m_xUploadBuffer[--m_i32UploadBufferUnread]; + if( m_i32UploadBufferUnread == 0 ) + { + m_boUploadActive = false; + } +// DEBUG_PRINT("%u %x %d",stLogMsg.u32MsgId,stLogMsg.au8Data[0],m_i32UploadBufferUnread); + + sxReqUploadLogMsgMessage.vWrite(m_xNetworkDatacontainer,m_xOutStream); + if( m_xOutStream.boIsOK() ) + { + boost::asio::async_write( + m_xSocket, + boost::asio::buffer(m_xTxBuff,m_xOutStream.u32GetByteCount()), + boost::bind( + &CTcpConnection::vWriteHandler, + this, + boost::asio::placeholders::error, + boost::asio::placeholders::bytes_transferred + ) + ); + } + else + { + tenStreamError enError; + m_xOutStream.vGetError(enError); + ERROR_PRINT("network outstream has error %d",enError); + m_xOutStream.vReset(); + } + } +} +/** + * \brief adds a logged message to the upload buffer and triggers start of upload + * \param[in] stLogMsg message to upload + * \warning should run in network task + */ +void CTcpConnection::vUploadLogMessage(const tstLogMessage& stLogMsg) +{ +// DEBUG_PRINT("entry"); + boost::lock_guard lock(m_xUploadBufferMutex); + m_xUploadBuffer.push_front(stLogMsg); + ++m_i32UploadBufferUnread; + if( !m_boUploadActive ) + { + m_boUploadActive = true; + vUploadNextMessage(); + } +// DEBUG_PRINT("exit"); +} -- cgit v1.2.3