Urbi SDK Remote for C++  2.7.5
usyncclient.cc
Go to the documentation of this file.
00001 /*
00002  * Copyright (C) 2005-2011, Gostai S.A.S.
00003  *
00004  * This software is provided "as is" without warranty of any kind,
00005  * either expressed or implied, including but not limited to the
00006  * implied warranties of fitness for a particular purpose.
00007  *
00008  * See the LICENSE file for more information.
00009  */
00010 
00011 #include <libport/unistd.h>
00012 #include <libport/fcntl.h>
00013 
00014 #include <libport/cassert>
00015 #include <libport/compiler.hh>
00016 #include <libport/debug.hh>
00017 #include <libport/thread.hh>
00018 #include <libport/unistd.h>
00019 
00020 #include <liburbi/compatibility.hh>
00021 #include <urbi/uconversion.hh>
00022 #include <urbi/umessage.hh>
00023 #include <urbi/usyncclient.hh>
00024 
00025 GD_CATEGORY(Urbi.Client.Sync);
00026 
00027 namespace urbi
00028 {
00029 
00030   USyncClient::options::options()
00031     : super_type::options()
00032     , startCallbackThread_(true)
00033     , connectCallback_(0)
00034   {}
00035 
00036   const USyncClient::send_options USyncClient::send_options::default_options =
00037     USyncClient::send_options();
00038 
00039   USyncClient::send_options::send_options()
00040     : timeout_(0)
00041     , mtag_(0)
00042     , mmod_(0)
00043   {}
00044 
00045   USyncClient::send_options&
00046   USyncClient::send_options::timeout(libport::utime_t usec)
00047   {
00048     timeout_ = usec;
00049     return *this;
00050   }
00051 
00052   USyncClient::send_options&
00053   USyncClient::send_options::tag(const char* tag, const char* mod)
00054   {
00055     mtag_ = tag;
00056     mmod_ = mod;
00057     return *this;
00058   }
00059 
00060   UCLIENT_OPTION_IMPL(USyncClient, bool, startCallbackThread);
00061   UCLIENT_OPTION_IMPL(USyncClient, USyncClient::connect_callback_type,
00062                       connectCallback);
00063 
00064   USyncClient::USyncClient(const std::string& host,
00065                            unsigned port,
00066                            size_t buflen,
00067                            const options& opts)
00068     // Be cautious not to start the UClient part of this USyncClient
00069     // before the completion of "this".  If you do (e.g., you forget
00070     // to pass "start(false)" to UClient), then the UClient, as a
00071     // libport::Socket, is likely to receive messages that will be
00072     // bounced to USyncClient::onRead, as expected for virtual
00073     // functions.  Except that "this" is not a valid USyncClient, as
00074     // its members we not initialized.
00075     //
00076     // Therefore, start handling connection only in the body of the
00077     // ctor.
00078     : UClient(host, port, buflen,
00079               UClient::options()
00080                 .server(opts.server())
00081                 .asynchronous(opts.asynchronous())
00082                 .start(false))
00083     , sem_()
00084     , queueLock_()
00085     , message_(0)
00086     , syncLock_()
00087     , syncTag()
00088     , default_options_()
00089     , stopCallbackThread_(!opts.startCallbackThread())
00090     , cbThread(0)
00091     , connectCallback_(opts.connectCallback())
00092     , synchronous_(false)
00093   {
00094     // Do not start if connectCallback_ is set, we were constructed by a
00095     // listening socket.
00096     if (!connectCallback_)
00097       start();
00098     if (error())
00099       return;
00100 
00101     if (opts.startCallbackThread())
00102       cbThread = libport::startThread(this, &USyncClient::callbackThread);
00103     if (!defaultClient)
00104       defaultClient = this;
00105 
00106     callbackSem_++;
00107   }
00108 
00109   USyncClient::~USyncClient()
00110   {
00111     // Notify of destruction
00112     wasDestroyed();
00113     // Close the socket
00114     close();
00115     // Wait for our message handler thread to terminate
00116     if (cbThread)
00117       joinCallbackThread_();
00118     // Wait for all asio async handlers to terminate
00119     waitForDestructionPermission();
00120   }
00121 
00122   void USyncClient::callbackThread()
00123   {
00124     callbackSem_--;
00125 
00126     while (true)
00127     {
00128       sem_--;
00129       if (stopCallbackThread_)
00130       {
00131         // The call to stopCallbackThread is
00132         // waiting on stopCallbackSem_.
00133         stopCallbackSem_++;
00134         return;
00135       }
00136       queueLock_.lock();
00137       if (queue.empty())
00138       {
00139         // Only explanation: processEvents from another thread stole our
00140         // message.
00141         sem_++; // Give back the token we took without popping a message.
00142         queueLock_.unlock();
00143         continue;
00144       }
00145       UMessage *m = queue.front();
00146       queue.pop_front();
00147       queueLock_.unlock();
00148       UAbstractClient::notifyCallbacks(*m);
00149       delete m;
00150     }
00151   }
00152 
00153   void USyncClient::stopCallbackThread()
00154   {
00155     if (stopCallbackThread_)
00156       return;
00157     stopCallbackThread_ = true;
00158     sem_++;
00159     // Unlock any pending syncGet.
00160     syncLock_++;
00161     // Wait until the callback thread is actually stopped to avoid both
00162     // processEvents and the callbackThread running at the same time.
00163     stopCallbackSem_--;
00164   }
00165 
00166   bool USyncClient::processEvents(libport::utime_t timeout)
00167   {
00168     bool res = false;
00169     libport::utime_t startTime = libport::utime();
00170     do // Always check at least once.
00171     {
00172       queueLock_.lock();
00173       if (queue.empty())
00174       {
00175         queueLock_.unlock();
00176         return res;
00177       }
00178       res = true;
00179       UMessage *m = queue.front();
00180       queue.pop_front();
00181       sem_--; // Will not block since queue was not empty.
00182       queueLock_.unlock();
00183       UAbstractClient::notifyCallbacks(*m);
00184       delete m;
00185     } while (timeout < 0 || libport::utime() - startTime <= timeout);
00186     return res;
00187   }
00188 
00189   int USyncClient::joinCallbackThread_()
00190   {
00191     stopCallbackThread();
00192     if (cbThread)
00193     {
00194       PTHREAD_RUN(pthread_join, cbThread, 0);
00195       cbThread = 0;
00196     }
00197     return 0;
00198   }
00199 
00200   void
00201   USyncClient::notifyCallbacks(const UMessage& msg)
00202   {
00203     queueLock_.lock();
00204     // If waiting for a tag, pass it to the user.
00205     if (!syncTag.empty() && syncTag == msg.tag)
00206     {
00207       message_ = new UMessage(msg);
00208       syncTag.clear();
00209       if (waitingFromPollThread_)
00210         libport::get_io_service().stop();
00211       else
00212         syncLock_++;
00213     }
00214     else if (synchronous_)
00215       UClient::notifyCallbacks(msg);
00216     else
00217     {
00218       queue.push_back(new UMessage(msg));
00219       sem_++;
00220     }
00221     queueLock_.unlock();
00222   }
00223 
00224   UMessage*
00225   USyncClient::waitForTag(const std::string& tag, libport::utime_t useconds)
00226   {
00227     if (message_ || !syncTag.empty())
00228       throw std::runtime_error("Another waitForTag is already in progress");
00229     syncTag = tag;
00230     message_ = 0;
00231     waitingFromPollThread_ = libport::isPollThread();
00232     // Reset before releasing the lock, as other thread may call io.stop()
00233     if (waitingFromPollThread_)
00234       libport::get_io_service().reset();
00235     queueLock_.unlock();
00236 
00237     // syncTag is reset by the other thread.
00238     if (!waitingFromPollThread_)
00239       syncLock_.uget(useconds);
00240     else if (useconds)
00241       libport::pollFor(useconds);
00242     else
00243       libport::get_io_service().run();
00244 
00245     UMessage *res = message_;
00246     if (!res)
00247       GD_ERROR("Timed out");
00248     else if (res->type == MESSAGE_ERROR)
00249       GD_FERROR("Received error message: %s", *res);
00250     message_ = 0;
00251     syncTag.clear();
00252     return res;
00253   }
00254 
00255   namespace
00256   {
00260     static
00261     bool
00262     has_tag(const char* cp)
00263     {
00264       while (*cp == ' ')
00265         ++cp;
00266       while (isalpha(*cp))
00267         ++cp;
00268       while (*cp == ' ')
00269         ++cp;
00270       return *cp == ':' || *cp == '<';
00271     }
00272 
00275     static
00276     std::string
00277     make_tag(UAbstractClient& cl, const USyncClient::send_options& opt)
00278     {
00279       std::string res;
00280       if (opt.mtag_)
00281       {
00282         res = opt.mtag_;
00283         if (opt.mmod_)
00284           res += opt.mmod_;
00285       }
00286       else
00287         res = cl.fresh();
00288       return res;
00289     }
00290   }
00291 
00292   USyncClient::error_type
00293   USyncClient::onClose()
00294   {
00295     if (closed_)
00296       return 1;
00297 
00298     UClient::onClose();
00299 
00300     stopCallbackThread_ = true;
00301     callbackSem_++;
00302     sem_++;
00303     return 0;
00304   }
00305 
00306   UMessage*
00307   USyncClient::syncGet_(const char* format, va_list& arg,
00308                         const USyncClient::send_options& options)
00309   {
00310     const USyncClient::send_options& opt_used = getOptions(options);
00311     if (has_tag(format))
00312       return 0;
00313     sendBufferLock.lock();
00314     std::string tag = make_tag(*this, opt_used);
00315     pack("%s", compatibility::evaluate_in_channel_open
00316          (tag, kernelMajor()).c_str());
00317     rc = vpack(format, arg);
00318     if (rc < 0)
00319     {
00320       sendBufferLock.unlock();
00321       return 0;
00322     }
00323     pack("%s", compatibility::evaluate_in_channel_close
00324          (tag, kernelMajor()).c_str());
00325     queueLock_.lock();
00326     rc = effective_send(sendBuffer);
00327     sendBuffer[0] = 0;
00328     sendBufferLock.unlock();
00329     return waitForTag(opt_used.mtag_ ? opt_used.mtag_ : tag,
00330                       opt_used.timeout_);
00331   }
00332 
00333   UMessage*
00334   USyncClient::syncGet(const char* format, ...)
00335   {
00336     va_list arg;
00337     va_start(arg, format);
00338     UMessage* res = syncGet_(format, arg);
00339     va_end(arg);
00340     return res;
00341   }
00342 
00343   UMessage*
00344   USyncClient::syncGet(const std::string& msg)
00345   {
00346     // Yes, this is studid, as it will copy uselessly.  But that's the
00347     // only safe way to do it.  The interface should be redesigned,
00348     // without buffers actually.
00349     return syncGet("%s", msg.c_str());
00350   }
00351 
00352   UMessage*
00353   USyncClient::syncGet(libport::utime_t useconds,
00354                        const char* format, ...)
00355   {
00356     va_list arg;
00357     va_start(arg, format);
00358     UMessage* res = syncGet_(format, arg, send_options().timeout(useconds));
00359     va_end(arg);
00360     return res;
00361   }
00362 
00363   UMessage*
00364   USyncClient::syncGetTag(const char* format,
00365                           const char* mtag, const char* mmod, ...)
00366   {
00367     va_list arg;
00368     va_start(arg, mmod);
00369     UMessage* res = syncGet_(format, arg, send_options().tag(mtag, mmod));
00370     va_end(arg);
00371     return res;
00372   }
00373 
00374   UMessage*
00375   USyncClient::syncGetTag(libport::utime_t useconds,
00376                           const char* format,
00377                           const char* mtag,
00378                           const char* mmod, ...)
00379   {
00380     va_list arg;
00381     va_start(arg, mmod);
00382     UMessage* res = syncGet_(format, arg,
00383                              send_options().tag(mtag, mmod).timeout(useconds));
00384     va_end(arg);
00385     return res;
00386   }
00387 
00388   int
00389   USyncClient::syncGetImage(const char* camera,
00390                             void* buffer, size_t& buffersize,
00391                             int format, int transmitFormat,
00392                             size_t& width, size_t& height,
00393                             libport::utime_t useconds)
00394   {
00395     int f = format == IMAGE_JPEG || transmitFormat == URBI_TRANSMIT_JPEG;
00396     if (kernelMajor_ < 2)
00397     // FIXME: required to ensure format change is applied
00398       send("%s.format = %d;\n"
00399            "noop;\n"
00400            "noop;\n", camera, f);
00401     else
00402       send(SYNCLINE_WRAP("%s.format = %d|;", camera, f));
00403     UMessage *m = syncGet(useconds, "%s.val", camera);
00404     if (!m
00405         || m->type != MESSAGE_DATA
00406         || m->value->type != DATA_BINARY
00407         || m->value->binary->type != BINARY_IMAGE)
00408     {
00409       delete m;
00410       return 0;
00411     }
00412     width = m->value->binary->image.width;
00413     height = m->value->binary->image.height;
00414 
00415     size_t osize = buffersize;
00416     if (f == 1 && format != IMAGE_JPEG)
00417     {
00418       size_t w, h;
00419       //uncompress jpeg
00420       if (format == IMAGE_YCbCr)
00421         convertJPEGtoYCrCb((const byte*) m->value->binary->image.data,
00422                            m->value->binary->image.size, (byte**) &buffer,
00423                            buffersize, w, h);
00424       else
00425         convertJPEGtoRGB((const byte*) m->value->binary->image.data,
00426                          m->value->binary->image.size, (byte**) &buffer,
00427                          buffersize, w, h);
00428     }
00429     else if (format == IMAGE_RGB || format == IMAGE_PPM)
00430     {
00431       buffersize = std::min(m->value->binary->image.size,
00432                             static_cast<size_t> (buffersize));
00433       if (m->value->binary->image.imageFormat == IMAGE_YCbCr)
00434         convertYCbCrtoRGB((const byte*) m->value->binary->image.data,
00435                           buffersize, (byte*) buffer);
00436       else
00437         memcpy(buffer, m->value->binary->image.data, buffersize);
00438 
00439     }
00440     else
00441     {
00442       //jpeg jpeg, or ycrcb ycrcb
00443       buffersize = std::min(m->value->binary->image.size,
00444                             static_cast<size_t>(buffersize));
00445       memcpy(buffer, m->value->binary->image.data, buffersize);
00446     }
00447     if (format == IMAGE_PPM)
00448     {
00449       char p6h[20];
00450       sprintf(p6h, "P6\n%zu %zu\n255\n", width, height);
00451       size_t p6len = strlen(p6h);
00452       size_t mlen = osize > buffersize + p6len ? buffersize : osize - p6len;
00453       memmove((void *) (((long) buffer) + p6len), buffer, mlen);
00454       memcpy(buffer, p6h, p6len);
00455       buffersize += p6len;
00456     }
00457     delete m;
00458     return 1;
00459   }
00460 
00461   int
00462   USyncClient::syncGetNormalizedDevice(const char* device, ufloat& val,
00463                                        libport::utime_t useconds)
00464   {
00465     return getValue(syncGet(useconds, "%s.valn;", device), val);
00466   }
00467 
00468   int
00469   USyncClient::syncGetValue(const char* valName, UValue& val,
00470                             libport::utime_t useconds)
00471   {
00472     return syncGetValue(0, valName, val, useconds);
00473   }
00474 
00475   int
00476   USyncClient::syncGetValue(const char* tag, const char* valName, UValue& val,
00477                             libport::utime_t useconds)
00478   {
00479     return getValue(syncGetTag(useconds, "%s;", tag, 0, valName), val);
00480   }
00481 
00482   int
00483   USyncClient::syncGetDevice(const char* device, ufloat& val,
00484                              libport::utime_t useconds)
00485   {
00486     return getValue(syncGet(useconds, "%s.val;", device), val);
00487   }
00488 
00489   int
00490   USyncClient::syncGetResult(const char* command, ufloat& val,
00491                              libport::utime_t useconds)
00492   {
00493     return getValue(syncGet(useconds, "%s", command), val);
00494   }
00495 
00496 
00497   int
00498   USyncClient::syncGetDevice(const char* device, const char* access,
00499                              ufloat& val, libport::utime_t useconds)
00500   {
00501     return getValue(syncGet(useconds, "%s.%s;", device, access), val);
00502   }
00503 
00504 
00505   int
00506   USyncClient::syncGetSound(const char* device, int duration, USound& sound,
00507                             libport::utime_t useconds)
00508   {
00509     if (kernelMajor_ < 2)
00510       send("syncgetsound = BIN 0;\n"
00511            "loopsound: loop syncgetsound = syncgetsound + %s.val,\n"
00512            "{\n"
00513            "  sleep(%d);\n"
00514            "  stop loopsound;\n"
00515            "  noop;\n"
00516            "  noop;\n"
00517            "};\n",
00518            device, duration);
00519     else
00520       send(SYNCLINE_WRAP(
00521              "syncgetsound = BIN 0;\n"
00522              "loopsound: loop syncgetsound = syncgetsound + %s.val,\n"
00523              "{\n"
00524              "  sleep(%d);\n"
00525              "  loopsound.stop;\n"
00526              "};", device, duration));
00527 
00528     UMessage* m = syncGet(useconds, "%s", "syncgetsound;");
00529     if (!m
00530         || m->type != MESSAGE_DATA
00531         || m->value->type != DATA_BINARY
00532         || m->value->binary->type != BINARY_SOUND)
00533     {
00534       delete m;
00535       return 0;
00536     }
00537     convert(m->value->binary->sound, sound);
00538     delete m;
00539     return 1;
00540   }
00541 
00542   int
00543   USyncClient::syncSend(const void* buffer, size_t length)
00544   {
00545     if (rc != 0)
00546       return -1;
00547     sendBufferLock.lock();
00548     int res = effective_send(buffer, length);
00549     sendBufferLock.unlock();
00550     return res;
00551   }
00552 
00553   void
00554   USyncClient::waitForKernelVersion(bool hasProcessingThread)
00555   {
00556     // Do not call kernelMajor() which precisely requires kernelMajor_
00557     // to be defined.
00558     while (kernelMajor_ < 0 && !error())
00559     {
00560       // Process events if we are the processing thread or if there is none.
00561       if (!hasProcessingThread || cbThread == pthread_self())
00562         processEvents();
00563       // Process the sockets if we are the asio worker thread.
00564       libport::Socket::sleep(100000);
00565     }
00566   }
00567 
00568   void
00569   USyncClient::onConnect()
00570   {
00571     UClient::onConnect();
00572     if (connectCallback_)
00573       connectCallback_(this);
00574     connectCallback_ = 0;
00575   }
00576 
00577   void
00578   USyncClient::setDefaultOptions(const USyncClient::send_options& opt)
00579   {
00580     default_options_ = opt;
00581   }
00582 
00583   const USyncClient::send_options&
00584   USyncClient::getOptions(const USyncClient::send_options& opt) const
00585   {
00586     return (&opt == &USyncClient::send_options::default_options) ?
00587       default_options_ : opt;
00588   }
00589 
00590   static void destroySocket(libport::Socket* s)
00591   {
00592     s->destroy();
00593   }
00594 
00595   libport::Socket*
00596   USyncClient::onAccept(connect_callback_type connectCallback,
00597                         size_t buflen,
00598                         bool startCallbackThread)
00599   {
00600     return new USyncClient("", 0, buflen,
00601                            USyncClient::options()
00602                            .startCallbackThread(startCallbackThread)
00603                            .connectCallback(connectCallback));
00604   }
00605 
00606   boost::shared_ptr<libport::Finally>
00607   USyncClient::listen(const std::string& host, const std::string& port,
00608                       boost::system::error_code& erc,
00609                       connect_callback_type connectCallback,
00610                       size_t buflen,
00611                       bool startCallbackThread)
00612   {
00613     libport::Socket* s = new libport::Socket;
00614     erc = s->listen(boost::bind(&onAccept, connectCallback, buflen,
00615                                 startCallbackThread),
00616                     host, port);
00617     if (erc)
00618       return boost::shared_ptr<libport::Finally>((libport::Finally*)0);
00619     boost::shared_ptr<libport::Finally> res(new libport::Finally);
00620     (*res) << boost::bind(&destroySocket, s);
00621     return res;
00622   }
00623 
00624   bool
00625   USyncClient::isCallbackThread() const
00626   {
00627     return pthread_self() == cbThread;
00628   }
00629 
00630   void
00631   USyncClient::setSynchronous(bool enable)
00632   {
00633     synchronous_ = enable;
00634   }
00635 
00636   void
00637   USyncClient::lockQueue()
00638   {
00639     queueLock_.lock();
00640   }
00641 } // namespace urbi