diff --git a/examples/SharedMemory/PhysicsClientTCP.cpp b/examples/SharedMemory/PhysicsClientTCP.cpp new file mode 100644 index 000000000..35fc08964 --- /dev/null +++ b/examples/SharedMemory/PhysicsClientTCP.cpp @@ -0,0 +1,496 @@ +#include "PhysicsClientTCP.h" + +#include "ActiveSocket.h" + +#include +#include +#include "../Utils/b3Clock.h" +#include "PhysicsClient.h" +//#include "LinearMath/btVector3.h" +#include "SharedMemoryCommands.h" +#include +#include "Bullet3Common/b3Logging.h" +#include "../MultiThreading/b3ThreadSupportInterface.h" +void TCPThreadFunc(void* userPtr, void* lsMemory); +void* TCPlsMemoryFunc(); +bool gVerboseNetworkMessagesClient2 = false; + +#ifndef _WIN32 +#include "../MultiThreading/b3PosixThreadSupport.h" + +b3ThreadSupportInterface* createTCPThreadSupport(int numThreads) +{ + b3PosixThreadSupport::ThreadConstructionInfo constructionInfo("TCPThread", + TCPThreadFunc, + TCPlsMemoryFunc, + numThreads); + b3ThreadSupportInterface* threadSupport = new b3PosixThreadSupport(constructionInfo); + + return threadSupport; + +} + + +#elif defined( _WIN32) +#include "../MultiThreading/b3Win32ThreadSupport.h" + +b3ThreadSupportInterface* createTCPThreadSupport(int numThreads) +{ + b3Win32ThreadSupport::Win32ThreadConstructionInfo threadConstructionInfo("TCPThread", TCPThreadFunc, TCPlsMemoryFunc, numThreads); + b3Win32ThreadSupport* threadSupport = new b3Win32ThreadSupport(threadConstructionInfo); + return threadSupport; + +} +#endif + + + +struct TCPThreadLocalStorage +{ + int threadId; +}; + + + +unsigned int b3DeserializeInt2(const unsigned char* input) +{ + unsigned int tmp = (input[3] << 24) + (input[2] << 16) + (input[1] << 8) + input[0]; + return tmp; +} + +struct TcpNetworkedInternalData +{ + /* + ENetHost* m_client; + ENetAddress m_address; + ENetPeer* m_peer; + ENetEvent m_event; + */ + CActiveSocket m_tcpSocket; + + bool m_isConnected; + + b3ThreadSupportInterface* m_threadSupport; + + b3CriticalSection* m_cs; + + TcpNetworkedInternalData* m_tcpInternalData; + + + SharedMemoryCommand m_clientCmd; + bool m_hasCommand; + + bool m_hasStatus; + SharedMemoryStatus m_lastStatus; + b3AlignedObjectArray m_stream; + + std::string m_hostName; + int m_port; + + TcpNetworkedInternalData() + : + m_isConnected(false), + m_threadSupport(0), + m_hasCommand(false), + m_hasStatus(false) + { + + } + + bool connectTCP() + { + if (m_isConnected) + return true; + + m_tcpSocket.Initialize(); + + m_isConnected = m_tcpSocket.Open(m_hostName.c_str(),m_port); + + m_isConnected = true; + return m_isConnected; + } + + bool checkData() + { + bool hasStatus = false; + + //int serviceResult = enet_host_service(m_client, &m_event, 0); + int maxLen = 4 + sizeof(SharedMemoryStatus)+SHARED_MEMORY_MAX_STREAM_CHUNK_SIZE; + + int recBytes = m_tcpSocket.Receive(maxLen); + + if (gVerboseNetworkMessagesClient2) + { + printf("A packet of length %d bytes received\n", recBytes); + } + + unsigned char* data = (unsigned char*)m_tcpSocket.GetData(); + + int packetSizeInBytes = b3DeserializeInt2(data); + + if (packetSizeInBytes == recBytes) + { + + SharedMemoryStatus* statPtr = (SharedMemoryStatus*)&data[4]; + if (statPtr->m_type == CMD_STEP_FORWARD_SIMULATION_COMPLETED) + { + SharedMemoryStatus dummy; + dummy.m_type = CMD_STEP_FORWARD_SIMULATION_COMPLETED; + m_lastStatus = dummy; + m_stream.resize(0); + } + else + { + + m_lastStatus = *statPtr; + int streamOffsetInBytes = 4 + sizeof(SharedMemoryStatus); + int numStreamBytes = packetSizeInBytes - streamOffsetInBytes; + m_stream.resize(numStreamBytes); + for (int i = 0; i < numStreamBytes; i++) + { + m_stream[i] = data[i + streamOffsetInBytes]; + } + } + } + else + { + printf("unknown status message received\n"); + } + + return hasStatus; + } + +}; + +enum TCPThreadEnums +{ + eTCPRequestTerminate = 13, + eTCPIsUnInitialized, + eTCPIsInitialized, + eTCPInitializationFailed, + eTCPHasTerminated +}; + + + +enum TCPCommandEnums +{ + eTCPIdle = 13, + eTCP_ConnectRequest, + eTCP_Connected, + eTCP_ConnectionFailed, + eTCP_DisconnectRequest, + eTCP_Disconnected, + +}; + +void TCPThreadFunc(void* userPtr, void* lsMemory) +{ + printf("TCPThreadFunc thread started\n"); + + TcpNetworkedInternalData* args = (TcpNetworkedInternalData*)userPtr; +// int workLeft = true; + b3Clock clock; + clock.reset(); + bool init = true; + if (init) + { + + args->m_cs->lock(); + args->m_cs->setSharedParam(0, eTCPIsInitialized); + args->m_cs->unlock(); + + + double deltaTimeInSeconds = 0; + + do + { + b3Clock::usleep(0); + + deltaTimeInSeconds += double(clock.getTimeMicroseconds()) / 1000000.; + + { + + clock.reset(); + deltaTimeInSeconds = 0.f; + switch (args->m_cs->getSharedParam(1)) + { + case eTCP_ConnectRequest: + { + bool connected = args->connectTCP(); + if (connected) + { + args->m_cs->setSharedParam(1, eTCP_Connected); + } + else + { + args->m_cs->setSharedParam(1, eTCP_ConnectionFailed); + } + break; + } + default: + { + } + }; + + if (args->m_isConnected) + { + + args->m_cs->lock(); + bool hasCommand = args->m_hasCommand; + args->m_cs->unlock(); + + + if (hasCommand) + { + int sz = 0; + unsigned char* data = 0; + + + if (args->m_clientCmd.m_type == CMD_STEP_FORWARD_SIMULATION) + { + sz = sizeof(int); + data = (unsigned char*) &args->m_clientCmd.m_type; + } + else + { + sz = sizeof(SharedMemoryCommand); + data = (unsigned char*)&args->m_clientCmd; + } + int res; + + args->m_tcpSocket.Send((const uint8 *)data,sz); + + args->m_cs->lock(); + args->m_hasCommand = false; + args->m_cs->unlock(); + } + + + bool hasNewStatus = args->checkData(); + if (hasNewStatus) + { + if (args->m_hasStatus) + { + //overflow: last status hasn't been processed yet + b3Assert(0); + printf("Error: received new status but previous status not processed yet"); + } + else + { + args->m_cs->lock(); + args->m_hasStatus = hasNewStatus; + args->m_cs->unlock(); + } + } + } + + } + + } while (args->m_cs->getSharedParam(0) != eTCPRequestTerminate); + } + else + { + args->m_cs->lock(); + args->m_cs->setSharedParam(0, eTCPInitializationFailed); + args->m_cs->unlock(); + } + + + printf("finished\n"); + +} + + + +void* TCPlsMemoryFunc() +{ + //don't create local store memory, just return 0 + return new TCPThreadLocalStorage; +} + + + + + + + +TcpNetworkedPhysicsProcessor::TcpNetworkedPhysicsProcessor(const char* hostName, int port) +{ + m_data = new TcpNetworkedInternalData; + if (hostName) + { + m_data->m_hostName = hostName; + } + m_data->m_port = port; + +} + +TcpNetworkedPhysicsProcessor::~TcpNetworkedPhysicsProcessor() +{ + disconnect(); + delete m_data; +} + +bool TcpNetworkedPhysicsProcessor::processCommand(const struct SharedMemoryCommand& clientCmd, struct SharedMemoryStatus& serverStatusOut, char* bufferServerToClient, int bufferSizeInBytes) +{ + if(gVerboseNetworkMessagesClient2) + { + printf("PhysicsClientTCP::processCommand\n"); + } +// int sz = sizeof(SharedMemoryCommand); + int timeout = 1024 * 1024 * 1024; + + m_data->m_cs->lock(); + m_data->m_clientCmd = clientCmd; + m_data->m_hasCommand = true; + m_data->m_cs->unlock(); + + while (m_data->m_hasCommand && (timeout-- > 0)) + { + b3Clock::usleep(0); + } + +#if 0 + + timeout = 1024 * 1024 * 1024; + + bool hasStatus = false; + + const SharedMemoryStatus* stat = 0; + while ((!hasStatus) && (timeout-- > 0)) + { + hasStatus = receiveStatus(serverStatusOut, bufferServerToClient, bufferSizeInBytes); + b3Clock::usleep(100); + } + return hasStatus; + +#endif + + return false; +} + +bool TcpNetworkedPhysicsProcessor::receiveStatus(struct SharedMemoryStatus& serverStatusOut, char* bufferServerToClient, int bufferSizeInBytes) +{ + bool hasStatus = false; + if (m_data->m_hasStatus) + { + if (gVerboseNetworkMessagesClient2) + { + printf("TcpNetworkedPhysicsProcessor::receiveStatus\n"); + } + + hasStatus = true; + serverStatusOut = m_data->m_lastStatus; + int numStreamBytes = m_data->m_stream.size(); + + if (numStreamBytes < bufferSizeInBytes) + { + for (int i = 0; i < numStreamBytes; i++) + { + bufferServerToClient[i] = m_data->m_stream[i]; + } + } + else + { + printf("Error: steam buffer overflow\n"); + } + + m_data->m_cs->lock(); + m_data->m_hasStatus = false; + m_data->m_cs->unlock(); + } + + + return hasStatus; + +} + + +void TcpNetworkedPhysicsProcessor::renderScene() +{ +} + +void TcpNetworkedPhysicsProcessor::physicsDebugDraw(int debugDrawFlags) +{ +} + +void TcpNetworkedPhysicsProcessor::setGuiHelper(struct GUIHelperInterface* guiHelper) +{ +} + +bool TcpNetworkedPhysicsProcessor::isConnected() const +{ + return m_data->m_isConnected; +} + + +bool TcpNetworkedPhysicsProcessor::connect() +{ + if (m_data->m_threadSupport==0) + { + m_data->m_threadSupport = createTCPThreadSupport(1); + + m_data->m_cs = m_data->m_threadSupport->createCriticalSection(); + m_data->m_cs->setSharedParam(0, eTCPIsUnInitialized); + m_data->m_threadSupport->runTask(B3_THREAD_SCHEDULE_TASK, (void*) m_data, 0); + + while (m_data->m_cs->getSharedParam(0) == eTCPIsUnInitialized) + { + b3Clock::usleep(1000); + } + + m_data->m_cs->lock(); + m_data->m_cs->setSharedParam(1, eTCP_ConnectRequest); + m_data->m_cs->unlock(); + + while (m_data->m_cs->getSharedParam(1) == eTCP_ConnectRequest) + { + b3Clock::usleep(1000); + } + + } + unsigned int sharedParam = m_data->m_cs->getSharedParam(1); + bool isConnected = (sharedParam == eTCP_Connected); + return isConnected; +} + +void TcpNetworkedPhysicsProcessor::disconnect() +{ + if (m_data->m_threadSupport) + { + m_data->m_cs->lock(); + m_data->m_cs->setSharedParam(0, eTCPRequestTerminate); + m_data->m_cs->unlock(); + + int numActiveThreads = 1; + + while (numActiveThreads) + { + int arg0, arg1; + if (m_data->m_threadSupport->isTaskCompleted(&arg0, &arg1, 0)) + { + numActiveThreads--; + printf("numActiveThreads = %d\n", numActiveThreads); + } + else + { + b3Clock::usleep(1000); + } + }; + + printf("stopping threads\n"); + + delete m_data->m_threadSupport; + m_data->m_threadSupport = 0; + m_data->m_isConnected = false; + } + + + +} + + + + + diff --git a/examples/SharedMemory/PhysicsClientTCP.h b/examples/SharedMemory/PhysicsClientTCP.h new file mode 100644 index 000000000..26dd45b58 --- /dev/null +++ b/examples/SharedMemory/PhysicsClientTCP.h @@ -0,0 +1,37 @@ +#ifndef PHYSICS_CLIENT_TCP_H +#define PHYSICS_CLIENT_TCP_H + +#include "PhysicsDirect.h" +#include "PhysicsServerCommandProcessor.h" + +class TcpNetworkedPhysicsProcessor : public PhysicsCommandProcessorInterface +{ + + struct TcpNetworkedInternalData* m_data; + +public: + TcpNetworkedPhysicsProcessor(const char* hostName, int port); + + virtual ~TcpNetworkedPhysicsProcessor(); + + virtual bool connect(); + + virtual void disconnect(); + + virtual bool isConnected() const; + + virtual bool processCommand(const struct SharedMemoryCommand& clientCmd, struct SharedMemoryStatus& serverStatusOut, char* bufferServerToClient, int bufferSizeInBytes); + + virtual bool receiveStatus(struct SharedMemoryStatus& serverStatusOut, char* bufferServerToClient, int bufferSizeInBytes); + + virtual void renderScene(); + + virtual void physicsDebugDraw(int debugDrawFlags); + + virtual void setGuiHelper(struct GUIHelperInterface* guiHelper); + +}; + + +#endif //PHYSICS_CLIENT_TCP_H + diff --git a/examples/SharedMemory/PhysicsClientTCP_C_API.cpp b/examples/SharedMemory/PhysicsClientTCP_C_API.cpp new file mode 100644 index 000000000..e333b967a --- /dev/null +++ b/examples/SharedMemory/PhysicsClientTCP_C_API.cpp @@ -0,0 +1,29 @@ + +#include "PhysicsClientTCP_C_API.h" +#include "PhysicsClientTCP.h" +#include "PhysicsDirect.h" +#include + +b3PhysicsClientHandle b3ConnectPhysicsTCP(const char* hostName, int port) +{ + + TcpNetworkedPhysicsProcessor* tcp = new TcpNetworkedPhysicsProcessor(hostName, port); + + PhysicsDirect* direct = new PhysicsDirect(tcp, true); + + bool connected; + connected = direct->connect(); + if (connected) + { + printf("b3ConnectPhysicsTCP connected successfully.\n"); + } + else + { + printf("b3ConnectPhysicsTCP connection failed.\n"); + + } + return (b3PhysicsClientHandle)direct; +} + + + diff --git a/examples/SharedMemory/PhysicsClientTCP_C_API.h b/examples/SharedMemory/PhysicsClientTCP_C_API.h new file mode 100644 index 000000000..dee180377 --- /dev/null +++ b/examples/SharedMemory/PhysicsClientTCP_C_API.h @@ -0,0 +1,19 @@ +#ifndef PHYSICS_CLIENT_TCP_C_API_H +#define PHYSICS_CLIENT_TCP_C_API_H + +#include "PhysicsClientC_API.h" + +#ifdef __cplusplus +extern "C" { +#endif + + + ///send physics commands using TCP networking + b3PhysicsClientHandle b3ConnectPhysicsTCP(const char* hostName, int port); + + +#ifdef __cplusplus +} +#endif + +#endif //PHYSICS_CLIENT_TCP_C_API_H diff --git a/examples/pybullet/premake4.lua b/examples/pybullet/premake4.lua index a47016c8b..d424ef209 100644 --- a/examples/pybullet/premake4.lua +++ b/examples/pybullet/premake4.lua @@ -59,6 +59,32 @@ if not _OPTIONS["no-enet"] then defines {"BT_ENABLE_ENET"} end + if not _OPTIONS["no-clsocket"] then + + includedirs {"../../examples/ThirdPartyLibs/clsocket/src"} + + if os.is("Windows") then + defines { "WIN32" } + links {"Ws2_32","Winmm"} + end + if os.is("Linux") then + defines {"_LINUX"} + end + if os.is("MacOSX") then + defines {"_DARWIN"} + end + + links {"clsocket"} + + files { + "../../examples/SharedMemory/PhysicsClientTCP.cpp", + "../../examples/SharedMemory/PhysicsClientTCP.h", + "../../examples/SharedMemory/PhysicsClientTCP_C_API.cpp", + "../../examples/SharedMemory/PhysicsClientTCP_C_API.h", + } + defines {"BT_ENABLE_CLSOCKET"} + end + files { "pybullet.c", diff --git a/test/SharedMemory/premake4.lua b/test/SharedMemory/premake4.lua index b886a9650..cfe86780e 100644 --- a/test/SharedMemory/premake4.lua +++ b/test/SharedMemory/premake4.lua @@ -80,6 +80,68 @@ project ("Test_PhysicsClientUDP") "../../examples/MultiThreading/b3ThreadSupportInterface.cpp", } + +project ("Test_PhysicsClientTCP") + + language "C++" + kind "ConsoleApp" + + includedirs { + "../../src", + "../../examples", + "../../examples/ThirdPartyLibs/clsocket/src" + } + links { + "clsocket", + "BulletFileLoader", + "Bullet3Common", + "LinearMath" + } + if os.is("Windows") then + defines { "WIN32" } + links {"Ws2_32","Winmm"} + end + + if os.is("Windows") then + defines { "WIN32","_WINSOCK_DEPRECATED_NO_WARNINGS" } + end + if os.is("Linux") then + defines {"_LINUX"} + end + if os.is("MacOSX") then + defines {"_DARWIN"} + end + + defines {"PHYSICS_TCP"} + + files { + "test.c", + "../../examples/SharedMemory/PhysicsClient.cpp", + "../../examples/SharedMemory/PhysicsClient.h", + "../../examples/SharedMemory/PhysicsClientSharedMemory.cpp", + "../../examples/SharedMemory/PhysicsClientSharedMemory.h", + "../../examples/SharedMemory/PhysicsClientSharedMemory_C_API.cpp", + "../../examples/SharedMemory/PhysicsClientSharedMemory_C_API.h", + "../../examples/SharedMemory/PhysicsClientTCP.cpp", + "../../examples/SharedMemory/PhysicsClientTCP.h", + "../../examples/SharedMemory/PhysicsClientTCP_C_API.cpp", + "../../examples/SharedMemory/PhysicsClientTCP_C_API.h", + "../../examples/SharedMemory/PhysicsClientSharedMemory_C_API.h", + "../../examples/SharedMemory/PhysicsClientC_API.cpp", + "../../examples/SharedMemory/PhysicsClientC_API.h", + "../../examples/SharedMemory/Win32SharedMemory.cpp", + "../../examples/SharedMemory/Win32SharedMemory.h", + "../../examples/SharedMemory/PosixSharedMemory.cpp", + "../../examples/SharedMemory/PosixSharedMemory.h", + "../../examples/Utils/b3ResourcePath.cpp", + "../../examples/Utils/b3ResourcePath.h", + "../../examples/SharedMemory/PhysicsDirect.cpp", + "../../examples/Utils/b3Clock.cpp", + "../../examples/MultiThreading/b3PosixThreadSupport.cpp", + "../../examples/MultiThreading/b3Win32ThreadSupport.cpp", + "../../examples/MultiThreading/b3ThreadSupportInterface.cpp", + } + project ("Test_PhysicsServerLoopBack") diff --git a/test/SharedMemory/test.c b/test/SharedMemory/test.c index 211daf0df..e475a4705 100644 --- a/test/SharedMemory/test.c +++ b/test/SharedMemory/test.c @@ -8,6 +8,10 @@ #include "SharedMemory/PhysicsClientUDP_C_API.h" #endif//PHYSICS_UDP +#ifdef PHYSICS_TCP +#include "SharedMemory/PhysicsClientTCP_C_API.h" +#endif//PHYSICS_TCP + #ifdef PHYSICS_LOOP_BACK #include "SharedMemory/PhysicsLoopBackC_API.h" #endif //PHYSICS_LOOP_BACK @@ -345,6 +349,10 @@ int main(int argc, char* argv[]) b3PhysicsClientHandle sm = b3ConnectPhysicsUDP("localhost",1234); #endif //PHYSICS_UDP +#ifdef PHYSICS_TCP + b3PhysicsClientHandle sm = b3ConnectPhysicsTCP("localhost",6667); +#endif //PHYSICS_UDP + testSharedMemory(sm); } #endif