/*! * \file TcpConnection.cpp * \author S. Eisenhauer * \date 27.10.2011 * \brief Implementation of CTcpConnection */ #include #include #include #include #include "TcpConnection.h" #include "CPluginExecutor.h" #include "protocol.h" #include "global.h" /// static memory for connection objects static char g_acConnections[nMAX_CONNECTIONS * sizeof(CTcpConnection)]; /// global connection container tConnectionSet g_astActiveConnections; /*! constructor * \param[in] io_service reference to the io_service object * \param[in] pxExecutor pointer to the main application * \param[in] u8ConNum number of the connection */ CTcpConnection::CTcpConnection (boost::asio::io_service& io_service , CPluginExecutor* pxExecutor , uint8_t u8ConNum) :m_xInStream(m_xRxBuff.elems,nNETWORK_BUFFSIZE) ,m_xOutStream(m_xTxBuff.elems,nNETWORK_BUFFSIZE) ,m_xSocket(io_service),m_pxExecutor(pxExecutor) ,m_u8ConNum(u8ConNum),m_boActive(false),m_xUploadBuffer(nUPLOAD_BUFFERSIZE) ,m_i32UploadBufferUnread(0),m_boUploadActive(false) { DEBUG_PRINT("entry %d",m_u8ConNum); DEBUG_PRINT("exit %d",m_u8ConNum); } /// destructor CTcpConnection::~CTcpConnection(void) { DEBUG_PRINT("entry %d",m_u8ConNum); DEBUG_PRINT("exit %d",m_u8ConNum); } /// static factory method for connections /// \param[in] xIoService io_service object /// \param[in] pxExecutor pointer to the main application /// \return pointer to connection object or NULL pointer /// \warning check returned pointer CTcpConnection::tConnectionPtr CTcpConnection::pxCreate(boost::asio::io_service& xIoService, CPluginExecutor* pxExecutor) { DEBUG_PRINT("created new tcp connection"); uint8_t u8ConCount = 0; char* pcFreeMemorySlot = &g_acConnections[ u8ConCount * sizeof(CTcpConnection) ]; tConIter xConIter(g_astActiveConnections.begin()); tConnectionPtr pstNewConnection = NULL; // find free memory for a new connection for(; xConIter != g_astActiveConnections.end(); ++xConIter) { if( reinterpret_cast( &(*xConIter) ) != pcFreeMemorySlot ) { break; } u8ConCount++; pcFreeMemorySlot = &g_acConnections[ u8ConCount * sizeof(CTcpConnection) ]; } // if no memory is available for a new connection if( u8ConCount >= nMAX_CONNECTIONS) { // return null pointer return pstNewConnection; } DEBUG_PRINT("using connection %d at %#x",u8ConCount,(int) pcFreeMemorySlot); // place a new connection object at the found address pstNewConnection = new(pcFreeMemorySlot) CTcpConnection(xIoService,pxExecutor,u8ConCount); // place the new connection in the container g_astActiveConnections.insert(*pstNewConnection); // return a pointer to the new connection object return pstNewConnection; } /// return network socket boost::asio::ip::tcp::socket& CTcpConnection::xGetSocket() { return m_xSocket; } /// async wait for a new request from the client void CTcpConnection::vWaitForClientRequest() { // initialize receive buffer memset(m_xRxBuff.elems,0,nNETWORK_BUFFSIZE); // reset instream m_xInStream.vReset(); // async wait, register read handler boost::asio::async_read( m_xSocket, boost::asio::buffer(m_xRxBuff), boost::asio::transfer_at_least( sizeof(tstNetworkMessageHeader) ), boost::bind(&CTcpConnection::vReadHandler, this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred ) ); } /// handle data written event /// \param[in] err error code of the transmission /// \param[in] bytesTransferred number of bytes transferred void CTcpConnection::vWriteHandler(const boost::system::error_code& err, size_t bytesTransferred) { if(!err) { // DEBUG_PRINT("Transferred %d bytes",bytesTransferred); // initialize transmit buffer memset(m_xTxBuff.elems,0,nNETWORK_BUFFSIZE); //reset outstream m_xOutStream.vReset(); vUploadNextMessage(); } else { DEBUG_PRINT("error: [%s]",err.message().c_str()); DEBUG_PRINT("removing from connection list %d",m_u8ConNum); // in case of an error, remove this object from the container g_astActiveConnections.erase(*this); vSetActive(false); } } /// handle data read event /// \param[in] err error code of the transmission /// \param[in] bytesTransferred number of bytes transferred void CTcpConnection::vReadHandler(const boost::system::error_code& err, size_t bytesTransferred) { if(!err) { bool boSendResponse = true; uint32_t u32TxMsgLen = sizeof(tstNetworkMessageHeader); uint32_t u32Tmp; const CNetworkDatacontainer::tstRxNetworkMessageMsg& stRxMsg = *(reinterpret_cast(m_xNetworkDatacontainer.pvGetVarPtr(nenNumberOfNetworkMessageTypes))); const CNetworkDatacontainer::tstReqStartPlugin& stReqStartPlugin = *(reinterpret_cast(m_xNetworkDatacontainer.pvGetVarPtr(nenReqStartPlugin))); const CNetworkDatacontainer::tstReqStopPlugin& stReqStopPlugin = *(reinterpret_cast(m_xNetworkDatacontainer.pvGetVarPtr(nenReqStopPlugin))); const CNetworkDatacontainer::tstReqImportLog& stReqImportLog = *(reinterpret_cast(m_xNetworkDatacontainer.pvGetVarPtr(nenReqImportLog))); CNetworkDatacontainer::tstRespImportLog& stRespImportLog = *(reinterpret_cast(m_xNetworkDatacontainer.pvGetVarPtr(nenRespImportLog))); const CNetworkDatacontainer::tstReqChangeMsgData& stReqChangeMsgData = *(reinterpret_cast(m_xNetworkDatacontainer.pvGetVarPtr(nenReqChangeMsgData))); CNetworkDatacontainer::tstRespEnumerateLogs& stRespEnumerateLogs = *(reinterpret_cast(m_xNetworkDatacontainer.pvGetVarPtr(nenRespEnumerateLogs))); CNetworkDatacontainer::tstRespEnumerateInterfaces& stRespEnumerateInterfaces = *(reinterpret_cast(m_xNetworkDatacontainer.pvGetVarPtr(nenRespEnumerateInterfaces))); do { uint32_t u32Offset = m_xInStream.u32GetByteCount(); sxRxMsgHeader.vRead(m_xNetworkDatacontainer,m_xInStream); if( !m_xInStream.boIsOK() ) { tenStreamError enError; m_xInStream.vGetError(enError); ERROR_PRINT("network instream has error %d",enError); break; } m_xInStream.vSetPosition(u32Offset); // DEBUG_PRINT("Received %u bytes", bytesTransferred); switch( stRxMsg.stHeader.enMessageType ) { case nenReqEnumerateInterfaces: DEBUG_PRINT("Received command nenReqEnumerateInterfaces"); u32Tmp = nNETWORK_DATALENGTH; sxReqEnumerateInterfacesMessage.vRead(m_xNetworkDatacontainer,m_xInStream); if( m_xInStream.boIsOK() ) { memset(stRespEnumerateInterfaces.acInterfaces,0,nNETWORK_DATALENGTH); m_pxExecutor->vGetInterfaces( stRespEnumerateInterfaces.acInterfaces ,u32Tmp); u32TxMsgLen += u32Tmp; sxRespEnumerateInterfacesMessage.vWrite(m_xNetworkDatacontainer,m_xOutStream); } break; case nenReqShutdown: DEBUG_PRINT("Received command nenReqShutdown"); sxReqShutdownMessage.vRead(m_xNetworkDatacontainer,m_xInStream); if( m_xInStream.boIsOK() ) { m_pxExecutor->vStop(); sxRespShutdownMessage.vWrite(m_xNetworkDatacontainer,m_xOutStream); } break; case nenReqStartPlugin: DEBUG_PRINT("Received command nenReqStartPlugin"); sxReqStartPluginMessage.vRead(m_xNetworkDatacontainer,m_xInStream); if( m_xInStream.boIsOK() ) { m_pxExecutor->vStartPlugin(stReqStartPlugin.i32Interface ,stReqStartPlugin.acFilename); sxRespStartPluginMessage.vWrite(m_xNetworkDatacontainer,m_xOutStream); } break; case nenReqStopPlugin: DEBUG_PRINT("Received command nenReqStopPlugin"); sxReqStopPluginMessage.vRead(m_xNetworkDatacontainer,m_xInStream); if( m_xInStream.boIsOK() ) { m_pxExecutor->vStopPlugin(stReqStopPlugin.i32Interface); sxRespStopPluginMessage.vWrite(m_xNetworkDatacontainer,m_xOutStream); } break; case nenReqImportLog: DEBUG_PRINT("Received command nenReqImportLog"); // number of data bytes in transmit buffer available for string u32Tmp = nNETWORK_DATALENGTH; sxReqImportLogMessage.vRead(m_xNetworkDatacontainer,m_xInStream); if( m_xInStream.boIsOK() ) { memset(stRespImportLog.acFilename,0,nNETWORK_DATALENGTH); m_pxExecutor->enGetLog( stReqImportLog.acFilename ,stRespImportLog.acFilename ,u32Tmp); u32TxMsgLen += u32Tmp; sxRespImportLogMessage.vWrite(m_xNetworkDatacontainer,m_xOutStream); } break; case nenReqChangeMsgData: sxReqChangeMsgDataMessage.vRead(m_xNetworkDatacontainer,m_xInStream); if( m_xInStream.boIsOK() ) { m_pxExecutor->vChangeMsgData(stReqChangeMsgData.i32Interface ,stReqChangeMsgData.u32CanMsgId ,stReqChangeMsgData.au8Data); sxRespChangeMsgDataMessage.vWrite(m_xNetworkDatacontainer,m_xOutStream); } break; case nenReqEnumerateLogs: DEBUG_PRINT("Received command nenReqEnumerateLogs"); sxReqEnumerateLogsMessage.vRead(m_xNetworkDatacontainer,m_xInStream); if( m_xInStream.boIsOK() ) { // number of data bytes in transmit buffer available for string u32Tmp = nNETWORK_DATALENGTH; memset(stRespEnumerateLogs.acFilenames,0,nNETWORK_DATALENGTH); m_pxExecutor->enEnumerateLogs(stRespEnumerateLogs.acFilenames,u32Tmp); u32TxMsgLen += u32Tmp; sxRespEnumarateLogsMessage.vWrite(m_xNetworkDatacontainer,m_xOutStream); } break; case nenRespUploadLogMsg: // DEBUG_PRINT("Received nenRespUploadLog"); sxRespUploadLogMsgMessage.vRead(m_xNetworkDatacontainer,m_xInStream); if( m_xInStream.boIsOK() ) { vUploadNextMessage(); boSendResponse = false; } break; default: DEBUG_PRINT("Received unknown message type"); sxRxMsgHeader.vRead(m_xNetworkDatacontainer,m_xInStream); if( m_xInStream.boIsOK() ) { sxRespUnknownReqMessage.vWrite(m_xNetworkDatacontainer,m_xOutStream); } break; } if( !m_xInStream.boIsOK() ) { tenStreamError enError; m_xInStream.vGetError(enError); ERROR_PRINT("network instream has error %d",enError); } if( boSendResponse ) { if( m_xOutStream.boIsOK() ) { boost::asio::async_write( m_xSocket, boost::asio::buffer(m_xTxBuff,m_xOutStream.u32GetByteCount()), boost::bind( &CTcpConnection::vWriteHandler, this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred ) ); } else { tenStreamError enError; m_xOutStream.vGetError(enError); ERROR_PRINT("network outstream has error %d",enError); m_xOutStream.vReset(); } } } while ( m_xInStream.u32GetByteCount() < bytesTransferred ); vWaitForClientRequest(); } else { ERROR_PRINT("error: [%s]",err.message().c_str()); DEBUG_PRINT("removing from connection list %d",m_u8ConNum); // in case of an error, remove this object from the container vSetActive(false); g_astActiveConnections.erase(*this); } } /*! * \brief send the next message in the buffer over the network */ void CTcpConnection::vUploadNextMessage() { boost::lock_guard lock(m_xUploadBufferMutex); if( m_boUploadActive ) { CNetworkDatacontainer::tstReqUploadLogMsg& stReqUploadLogMsg = *(reinterpret_cast(m_xNetworkDatacontainer.pvGetVarPtr(nenReqUploadLogMsg))); stReqUploadLogMsg.stLogMessage = m_xUploadBuffer[--m_i32UploadBufferUnread]; if( m_i32UploadBufferUnread == 0 ) { m_boUploadActive = false; } // DEBUG_PRINT("%u %x %d",stLogMsg.u32MsgId,stLogMsg.au8Data[0],m_i32UploadBufferUnread); sxReqUploadLogMsgMessage.vWrite(m_xNetworkDatacontainer,m_xOutStream); if( m_xOutStream.boIsOK() ) { boost::asio::async_write( m_xSocket, boost::asio::buffer(m_xTxBuff,m_xOutStream.u32GetByteCount()), boost::bind( &CTcpConnection::vWriteHandler, this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred ) ); } else { tenStreamError enError; m_xOutStream.vGetError(enError); ERROR_PRINT("network outstream has error %d",enError); m_xOutStream.vReset(); } } } /** * \brief adds a logged message to the upload buffer and triggers start of upload * \param[in] stLogMsg message to upload * \warning should run in network task */ void CTcpConnection::vUploadLogMessage(const tstLogMessage& stLogMsg) { // DEBUG_PRINT("entry"); boost::lock_guard lock(m_xUploadBufferMutex); m_xUploadBuffer.push_front(stLogMsg); ++m_i32UploadBufferUnread; if( !m_boUploadActive ) { m_boUploadActive = true; vUploadNextMessage(); } // DEBUG_PRINT("exit"); }