Urbi SDK Remote for C++  2.7.5
uclient.cc
Go to the documentation of this file.
00001 /*
00002  * Copyright (C) 2005-2011, Gostai S.A.S.
00003  *
00004  * This software is provided "as is" without warranty of any kind,
00005  * either expressed or implied, including but not limited to the
00006  * implied warranties of fitness for a particular purpose.
00007  *
00008  * See the LICENSE file for more information.
00009  */
00010 
00012 
00013 #if !defined WIN32
00014 # include <libport/ctime>
00015 # include <libport/csignal>
00016 #endif
00017 
00018 #include <boost/lambda/bind.hpp>
00019 
00020 #include <libport/boost-error.hh>
00021 #include <libport/format.hh>
00022 
00023 #include <urbi/uclient.hh>
00024 #include <urbi/utag.hh>
00025 
00026 #include <liburbi/compatibility.hh>
00027 
00028 GD_CATEGORY(Urbi.Client);
00029 
00030 namespace urbi
00031 {
00032 
00033   /*-------------------.
00034   | UClient::options.  |
00035   `-------------------*/
00036 
00037   UClient::options::options(bool server)
00038     : server_(server)
00039       // Unless stated otherwise, auto start.
00040     , start_(true)
00041     , asynchronous_(false)
00042   {
00043   }
00044 
00045   UCLIENT_OPTION_IMPL(UClient, bool, server)
00046   UCLIENT_OPTION_IMPL(UClient, bool, start)
00047   UCLIENT_OPTION_IMPL(UClient, bool, asynchronous)
00048 
00049   /*----------.
00050   | UClient.  |
00051   `----------*/
00052 
00053   UClient::UClient(const std::string& host, unsigned port,
00054                    size_t buflen,
00055                    const options& opt)
00056     : UAbstractClient(host, port, buflen, opt.server())
00057     , ping_interval_(0)
00058     , pong_timeout_(0)
00059     , link_(new UClient*(this))
00060     , ping_sent_(libport::utime())
00061     , ping_sem_(0)
00062     , asynchronous_(opt.asynchronous())
00063     , synchronous_send_(false)
00064   {
00065     if (opt.start())
00066       start();
00067   }
00068 
00069   UClient::error_type
00070   UClient::start()
00071   {
00072     return rc = server_ ? listen_() : connect_();
00073   }
00074 
00075   UClient::error_type
00076   UClient::connect_()
00077   {
00078     if (boost::system::error_code erc = connect(host_, port_, false, 0,
00079                                                 asynchronous_))
00080     {
00081       libport::boost_error(libport::format("UClient::UClient connect(%s, %s)",
00082                                            host_, port_),
00083                            erc);
00084       return -1;
00085     }
00086     else
00087       return 0;
00088   }
00089 
00090   UClient::error_type
00091   UClient::listen_()
00092   {
00093     if (boost::system::error_code erc =
00094         listen(boost::bind(&UClient::mySocketFactory, this), host_, port_))
00095     {
00096       libport::boost_error(libport::format("UClient::UClient listen(%s, %s)",
00097                                            host_, port_),
00098                            erc);
00099       return -1;
00100     }
00101     else
00102       return 0;
00103   }
00104 
00105   UClient::~UClient()
00106   {
00107     *link_ = 0;
00108     closeUClient();
00109   }
00110 
00111   UClient::error_type
00112   UClient::onClose()
00113   {
00114     if (!closed_)
00115       UAbstractClient::onClose();
00116     return !!closed_;
00117   }
00118 
00119   UClient::error_type
00120   UClient::closeUClient()
00121   {
00122     close();
00123     onClose();
00124     return 0;
00125   }
00126 
00127   UClient::error_type
00128   UClient::effectiveSend(const void* buffer, size_t size)
00129   {
00130     if (rc)
00131       return -1;
00132     if (synchronous_send_)
00133       libport::Socket::syncWrite(buffer, size);
00134     else
00135       libport::Socket::write(buffer, size);
00136     return 0;
00137   }
00138 
00139   libport::Socket*
00140   UClient::mySocketFactory()
00141   {
00142     return this;
00143   }
00144 
00145   void
00146   UClient::onConnect()
00147   {
00148     init_ = true;
00149     onConnection();
00150 
00151     // Declare ping channel for kernel that requires it.  Do not try
00152     // to depend on kernelMajor, because it has not been computed yet.
00153     // And computing kernelMajor requires this code to be run.  So we
00154     // need to write something that both k1 and k2 will like.
00155     send(SYNCLINE_WRAP(
00156            "if (isdef(Channel))\n"
00157            "  var lobby.%s = Channel.new(\"%s\")|;",
00158            internalPongTag, internalPongTag));
00159     // The folowwing calls may fail if we got disconnected.
00160     try
00161     {
00162       host_ = getRemoteHost();
00163       port_ = getRemotePort();
00164     }
00165     catch (const std::exception& e)
00166     {
00167       // Ignore the error, next read attempt will trigger onError.
00168       GD_FINFO_DUMP("ignore std::exception: %s", e.what());
00169     }
00170     if (ping_interval_)
00171       sendPing(link_);
00172   }
00173 
00174   void
00175   UClient::onError(boost::system::error_code erc)
00176   {
00177     rc = -1;
00178     resetAsyncCalls_();
00179     clientError("!!! " + erc.message());
00180     notifyCallbacks(UMessage(*this, 0, CLIENTERROR_TAG,
00181                              "!!! " + erc.message()));
00182     return;
00183   }
00184 
00185   size_t
00186   UClient::onRead(const void* data, size_t length)
00187   {
00188     size_t capacity = recvBufSize - recvBufferPosition - 1;
00189 
00190     if (ping_interval_ && ping_sem_.uget(1))
00191     {
00192       pong_timeout_handler_->cancel();
00193       send_ping_handler_ =
00194         libport::asyncCall(boost::bind(&UClient::sendPing,
00195                                        this, link_),
00196                            ping_interval_ - (libport::utime() - ping_sent_));
00197     }
00198     if (capacity < length)
00199     {
00200       size_t nsz = std::max(recvBufSize*2, recvBufferPosition + length+1);
00201       char* nbuf = new char[nsz];
00202       memcpy(nbuf, recvBuffer, recvBufferPosition);
00203       delete[] recvBuffer;
00204       recvBuffer = nbuf;
00205       recvBufSize = nsz;
00206     }
00207     memcpy(&recvBuffer[recvBufferPosition], data, length);
00208     recvBufferPosition += length;
00209     recvBuffer[recvBufferPosition] = 0;
00210     processRecvBuffer();
00211     return length;
00212   }
00213 
00214   void
00215   UClient::pongTimeout(link_type l)
00216   {
00217     if (*l)
00218     {
00219       const char* err = "!!! Lost connection with server: ping timeout";
00220       // FIXME: Choose between two differents way to alert user program.
00221       clientError(err);
00222       notifyCallbacks(UMessage(*this, 0, connectionTimeoutTag, err));
00223       close();
00224     }
00225   }
00226 
00227   void
00228   UClient::sendPing(link_type l)
00229   {
00230     if (*l)
00231     {
00232       pong_timeout_handler_ =
00233         libport::asyncCall(boost::bind(&UClient::pongTimeout, this, link_),
00234                            pong_timeout_);
00235       send("%s << 1,", internalPongTag);
00236       ping_sent_ = libport::utime();
00237       ping_sem_++;
00238     }
00239   }
00240 
00241   void
00242   UClient::printf(const char * format, ...)
00243   {
00244     va_list arg;
00245     va_start(arg, format);
00246     vfprintf(stderr, format, arg);
00247     va_end(arg);
00248   }
00249 
00250   unsigned int UClient::getCurrentTime() const
00251   {
00252   // FIXME: Put this into libport.
00253 #ifdef WIN32
00254     return GetTickCount();
00255 #else
00256     struct timeval tv;
00257     gettimeofday(&tv, NULL);
00258     return tv.tv_sec*1000+tv.tv_usec/1000;
00259 #endif
00260   }
00261 
00262   void
00263   UClient::setKeepAliveCheck(unsigned ping_interval,
00264                              unsigned pong_timeout)
00265   {
00266     // Always interrupt previous ping handler.
00267     resetAsyncCalls_();
00268     // From milliseconds to microseconds.
00269     ping_interval_ = ping_interval * 1000;
00270     pong_timeout_  = pong_timeout * 1000;
00271     if (ping_interval_)
00272       sendPing(link_);
00273   }
00274 
00275   void
00276   UClient::resetAsyncCalls_()
00277   {
00278     if (pong_timeout_handler_)
00279     {
00280       pong_timeout_handler_->cancel();
00281       pong_timeout_handler_.reset();
00282     }
00283     if (send_ping_handler_)
00284     {
00285       send_ping_handler_->cancel();
00286       send_ping_handler_.reset();
00287     }
00288   }
00289 
00290   void
00291   UClient::waitForKernelVersion() const
00292   {
00293     // FIXME: use a condition.
00294     while (kernelMajor_ < 0 && !error())
00295       sleep(100000);
00296   }
00297 
00298   void
00299   UClient::setSynchronousSend(bool enable)
00300   {
00301     synchronous_send_ = enable;
00302   }
00303 
00304 
00305 
00306 /*-----------------------.
00307 | Standalone functions.  |
00308 `-----------------------*/
00309 
00310   void execute()
00311   {
00312     while (true)
00313       sleep(100);
00314   }
00315 
00316   void exit(int code)
00317   {
00318     ::exit(code);
00319   }
00320 
00321   UClient&
00322   connect(const std::string& host)
00323   {
00324     return *new UClient(host);
00325   }
00326 
00327   void disconnect(UClient &client)
00328   {
00329     // Asynchronous deletion to let our async handlers terminate.
00330     client.destroy();
00331   }
00332 
00333 } // namespace urbi