|
Urbi SDK Remote for C++
2.7.5
|
00001 /* 00002 * Copyright (C) 2005-2011, Gostai S.A.S. 00003 * 00004 * This software is provided "as is" without warranty of any kind, 00005 * either expressed or implied, including but not limited to the 00006 * implied warranties of fitness for a particular purpose. 00007 * 00008 * See the LICENSE file for more information. 00009 */ 00010 00011 #include <libport/unistd.h> 00012 #include <libport/fcntl.h> 00013 00014 #include <libport/cassert> 00015 #include <libport/compiler.hh> 00016 #include <libport/debug.hh> 00017 #include <libport/thread.hh> 00018 #include <libport/unistd.h> 00019 00020 #include <liburbi/compatibility.hh> 00021 #include <urbi/uconversion.hh> 00022 #include <urbi/umessage.hh> 00023 #include <urbi/usyncclient.hh> 00024 00025 GD_CATEGORY(Urbi.Client.Sync); 00026 00027 namespace urbi 00028 { 00029 00030 USyncClient::options::options() 00031 : super_type::options() 00032 , startCallbackThread_(true) 00033 , connectCallback_(0) 00034 {} 00035 00036 const USyncClient::send_options USyncClient::send_options::default_options = 00037 USyncClient::send_options(); 00038 00039 USyncClient::send_options::send_options() 00040 : timeout_(0) 00041 , mtag_(0) 00042 , mmod_(0) 00043 {} 00044 00045 USyncClient::send_options& 00046 USyncClient::send_options::timeout(libport::utime_t usec) 00047 { 00048 timeout_ = usec; 00049 return *this; 00050 } 00051 00052 USyncClient::send_options& 00053 USyncClient::send_options::tag(const char* tag, const char* mod) 00054 { 00055 mtag_ = tag; 00056 mmod_ = mod; 00057 return *this; 00058 } 00059 00060 UCLIENT_OPTION_IMPL(USyncClient, bool, startCallbackThread); 00061 UCLIENT_OPTION_IMPL(USyncClient, USyncClient::connect_callback_type, 00062 connectCallback); 00063 00064 USyncClient::USyncClient(const std::string& host, 00065 unsigned port, 00066 size_t buflen, 00067 const options& opts) 00068 // Be cautious not to start the UClient part of this USyncClient 00069 // before the completion of "this". If you do (e.g., you forget 00070 // to pass "start(false)" to UClient), then the UClient, as a 00071 // libport::Socket, is likely to receive messages that will be 00072 // bounced to USyncClient::onRead, as expected for virtual 00073 // functions. Except that "this" is not a valid USyncClient, as 00074 // its members we not initialized. 00075 // 00076 // Therefore, start handling connection only in the body of the 00077 // ctor. 00078 : UClient(host, port, buflen, 00079 UClient::options() 00080 .server(opts.server()) 00081 .asynchronous(opts.asynchronous()) 00082 .start(false)) 00083 , sem_() 00084 , queueLock_() 00085 , message_(0) 00086 , syncLock_() 00087 , syncTag() 00088 , default_options_() 00089 , stopCallbackThread_(!opts.startCallbackThread()) 00090 , cbThread(0) 00091 , connectCallback_(opts.connectCallback()) 00092 , synchronous_(false) 00093 { 00094 // Do not start if connectCallback_ is set, we were constructed by a 00095 // listening socket. 00096 if (!connectCallback_) 00097 start(); 00098 if (error()) 00099 return; 00100 00101 if (opts.startCallbackThread()) 00102 cbThread = libport::startThread(this, &USyncClient::callbackThread); 00103 if (!defaultClient) 00104 defaultClient = this; 00105 00106 callbackSem_++; 00107 } 00108 00109 USyncClient::~USyncClient() 00110 { 00111 // Notify of destruction 00112 wasDestroyed(); 00113 // Close the socket 00114 close(); 00115 // Wait for our message handler thread to terminate 00116 if (cbThread) 00117 joinCallbackThread_(); 00118 // Wait for all asio async handlers to terminate 00119 waitForDestructionPermission(); 00120 } 00121 00122 void USyncClient::callbackThread() 00123 { 00124 callbackSem_--; 00125 00126 while (true) 00127 { 00128 sem_--; 00129 if (stopCallbackThread_) 00130 { 00131 // The call to stopCallbackThread is 00132 // waiting on stopCallbackSem_. 00133 stopCallbackSem_++; 00134 return; 00135 } 00136 queueLock_.lock(); 00137 if (queue.empty()) 00138 { 00139 // Only explanation: processEvents from another thread stole our 00140 // message. 00141 sem_++; // Give back the token we took without popping a message. 00142 queueLock_.unlock(); 00143 continue; 00144 } 00145 UMessage *m = queue.front(); 00146 queue.pop_front(); 00147 queueLock_.unlock(); 00148 UAbstractClient::notifyCallbacks(*m); 00149 delete m; 00150 } 00151 } 00152 00153 void USyncClient::stopCallbackThread() 00154 { 00155 if (stopCallbackThread_) 00156 return; 00157 stopCallbackThread_ = true; 00158 sem_++; 00159 // Unlock any pending syncGet. 00160 syncLock_++; 00161 // Wait until the callback thread is actually stopped to avoid both 00162 // processEvents and the callbackThread running at the same time. 00163 stopCallbackSem_--; 00164 } 00165 00166 bool USyncClient::processEvents(libport::utime_t timeout) 00167 { 00168 bool res = false; 00169 libport::utime_t startTime = libport::utime(); 00170 do // Always check at least once. 00171 { 00172 queueLock_.lock(); 00173 if (queue.empty()) 00174 { 00175 queueLock_.unlock(); 00176 return res; 00177 } 00178 res = true; 00179 UMessage *m = queue.front(); 00180 queue.pop_front(); 00181 sem_--; // Will not block since queue was not empty. 00182 queueLock_.unlock(); 00183 UAbstractClient::notifyCallbacks(*m); 00184 delete m; 00185 } while (timeout < 0 || libport::utime() - startTime <= timeout); 00186 return res; 00187 } 00188 00189 int USyncClient::joinCallbackThread_() 00190 { 00191 stopCallbackThread(); 00192 if (cbThread) 00193 { 00194 PTHREAD_RUN(pthread_join, cbThread, 0); 00195 cbThread = 0; 00196 } 00197 return 0; 00198 } 00199 00200 void 00201 USyncClient::notifyCallbacks(const UMessage& msg) 00202 { 00203 queueLock_.lock(); 00204 // If waiting for a tag, pass it to the user. 00205 if (!syncTag.empty() && syncTag == msg.tag) 00206 { 00207 message_ = new UMessage(msg); 00208 syncTag.clear(); 00209 if (waitingFromPollThread_) 00210 libport::get_io_service().stop(); 00211 else 00212 syncLock_++; 00213 } 00214 else if (synchronous_) 00215 UClient::notifyCallbacks(msg); 00216 else 00217 { 00218 queue.push_back(new UMessage(msg)); 00219 sem_++; 00220 } 00221 queueLock_.unlock(); 00222 } 00223 00224 UMessage* 00225 USyncClient::waitForTag(const std::string& tag, libport::utime_t useconds) 00226 { 00227 if (message_ || !syncTag.empty()) 00228 throw std::runtime_error("Another waitForTag is already in progress"); 00229 syncTag = tag; 00230 message_ = 0; 00231 waitingFromPollThread_ = libport::isPollThread(); 00232 // Reset before releasing the lock, as other thread may call io.stop() 00233 if (waitingFromPollThread_) 00234 libport::get_io_service().reset(); 00235 queueLock_.unlock(); 00236 00237 // syncTag is reset by the other thread. 00238 if (!waitingFromPollThread_) 00239 syncLock_.uget(useconds); 00240 else if (useconds) 00241 libport::pollFor(useconds); 00242 else 00243 libport::get_io_service().run(); 00244 00245 UMessage *res = message_; 00246 if (!res) 00247 GD_ERROR("Timed out"); 00248 else if (res->type == MESSAGE_ERROR) 00249 GD_FERROR("Received error message: %s", *res); 00250 message_ = 0; 00251 syncTag.clear(); 00252 return res; 00253 } 00254 00255 namespace 00256 { 00260 static 00261 bool 00262 has_tag(const char* cp) 00263 { 00264 while (*cp == ' ') 00265 ++cp; 00266 while (isalpha(*cp)) 00267 ++cp; 00268 while (*cp == ' ') 00269 ++cp; 00270 return *cp == ':' || *cp == '<'; 00271 } 00272 00275 static 00276 std::string 00277 make_tag(UAbstractClient& cl, const USyncClient::send_options& opt) 00278 { 00279 std::string res; 00280 if (opt.mtag_) 00281 { 00282 res = opt.mtag_; 00283 if (opt.mmod_) 00284 res += opt.mmod_; 00285 } 00286 else 00287 res = cl.fresh(); 00288 return res; 00289 } 00290 } 00291 00292 USyncClient::error_type 00293 USyncClient::onClose() 00294 { 00295 if (closed_) 00296 return 1; 00297 00298 UClient::onClose(); 00299 00300 stopCallbackThread_ = true; 00301 callbackSem_++; 00302 sem_++; 00303 return 0; 00304 } 00305 00306 UMessage* 00307 USyncClient::syncGet_(const char* format, va_list& arg, 00308 const USyncClient::send_options& options) 00309 { 00310 const USyncClient::send_options& opt_used = getOptions(options); 00311 if (has_tag(format)) 00312 return 0; 00313 sendBufferLock.lock(); 00314 std::string tag = make_tag(*this, opt_used); 00315 pack("%s", compatibility::evaluate_in_channel_open 00316 (tag, kernelMajor()).c_str()); 00317 rc = vpack(format, arg); 00318 if (rc < 0) 00319 { 00320 sendBufferLock.unlock(); 00321 return 0; 00322 } 00323 pack("%s", compatibility::evaluate_in_channel_close 00324 (tag, kernelMajor()).c_str()); 00325 queueLock_.lock(); 00326 rc = effective_send(sendBuffer); 00327 sendBuffer[0] = 0; 00328 sendBufferLock.unlock(); 00329 return waitForTag(opt_used.mtag_ ? opt_used.mtag_ : tag, 00330 opt_used.timeout_); 00331 } 00332 00333 UMessage* 00334 USyncClient::syncGet(const char* format, ...) 00335 { 00336 va_list arg; 00337 va_start(arg, format); 00338 UMessage* res = syncGet_(format, arg); 00339 va_end(arg); 00340 return res; 00341 } 00342 00343 UMessage* 00344 USyncClient::syncGet(const std::string& msg) 00345 { 00346 // Yes, this is studid, as it will copy uselessly. But that's the 00347 // only safe way to do it. The interface should be redesigned, 00348 // without buffers actually. 00349 return syncGet("%s", msg.c_str()); 00350 } 00351 00352 UMessage* 00353 USyncClient::syncGet(libport::utime_t useconds, 00354 const char* format, ...) 00355 { 00356 va_list arg; 00357 va_start(arg, format); 00358 UMessage* res = syncGet_(format, arg, send_options().timeout(useconds)); 00359 va_end(arg); 00360 return res; 00361 } 00362 00363 UMessage* 00364 USyncClient::syncGetTag(const char* format, 00365 const char* mtag, const char* mmod, ...) 00366 { 00367 va_list arg; 00368 va_start(arg, mmod); 00369 UMessage* res = syncGet_(format, arg, send_options().tag(mtag, mmod)); 00370 va_end(arg); 00371 return res; 00372 } 00373 00374 UMessage* 00375 USyncClient::syncGetTag(libport::utime_t useconds, 00376 const char* format, 00377 const char* mtag, 00378 const char* mmod, ...) 00379 { 00380 va_list arg; 00381 va_start(arg, mmod); 00382 UMessage* res = syncGet_(format, arg, 00383 send_options().tag(mtag, mmod).timeout(useconds)); 00384 va_end(arg); 00385 return res; 00386 } 00387 00388 int 00389 USyncClient::syncGetImage(const char* camera, 00390 void* buffer, size_t& buffersize, 00391 int format, int transmitFormat, 00392 size_t& width, size_t& height, 00393 libport::utime_t useconds) 00394 { 00395 int f = format == IMAGE_JPEG || transmitFormat == URBI_TRANSMIT_JPEG; 00396 if (kernelMajor_ < 2) 00397 // FIXME: required to ensure format change is applied 00398 send("%s.format = %d;\n" 00399 "noop;\n" 00400 "noop;\n", camera, f); 00401 else 00402 send(SYNCLINE_WRAP("%s.format = %d|;", camera, f)); 00403 UMessage *m = syncGet(useconds, "%s.val", camera); 00404 if (!m 00405 || m->type != MESSAGE_DATA 00406 || m->value->type != DATA_BINARY 00407 || m->value->binary->type != BINARY_IMAGE) 00408 { 00409 delete m; 00410 return 0; 00411 } 00412 width = m->value->binary->image.width; 00413 height = m->value->binary->image.height; 00414 00415 size_t osize = buffersize; 00416 if (f == 1 && format != IMAGE_JPEG) 00417 { 00418 size_t w, h; 00419 //uncompress jpeg 00420 if (format == IMAGE_YCbCr) 00421 convertJPEGtoYCrCb((const byte*) m->value->binary->image.data, 00422 m->value->binary->image.size, (byte**) &buffer, 00423 buffersize, w, h); 00424 else 00425 convertJPEGtoRGB((const byte*) m->value->binary->image.data, 00426 m->value->binary->image.size, (byte**) &buffer, 00427 buffersize, w, h); 00428 } 00429 else if (format == IMAGE_RGB || format == IMAGE_PPM) 00430 { 00431 buffersize = std::min(m->value->binary->image.size, 00432 static_cast<size_t> (buffersize)); 00433 if (m->value->binary->image.imageFormat == IMAGE_YCbCr) 00434 convertYCbCrtoRGB((const byte*) m->value->binary->image.data, 00435 buffersize, (byte*) buffer); 00436 else 00437 memcpy(buffer, m->value->binary->image.data, buffersize); 00438 00439 } 00440 else 00441 { 00442 //jpeg jpeg, or ycrcb ycrcb 00443 buffersize = std::min(m->value->binary->image.size, 00444 static_cast<size_t>(buffersize)); 00445 memcpy(buffer, m->value->binary->image.data, buffersize); 00446 } 00447 if (format == IMAGE_PPM) 00448 { 00449 char p6h[20]; 00450 sprintf(p6h, "P6\n%zu %zu\n255\n", width, height); 00451 size_t p6len = strlen(p6h); 00452 size_t mlen = osize > buffersize + p6len ? buffersize : osize - p6len; 00453 memmove((void *) (((long) buffer) + p6len), buffer, mlen); 00454 memcpy(buffer, p6h, p6len); 00455 buffersize += p6len; 00456 } 00457 delete m; 00458 return 1; 00459 } 00460 00461 int 00462 USyncClient::syncGetNormalizedDevice(const char* device, ufloat& val, 00463 libport::utime_t useconds) 00464 { 00465 return getValue(syncGet(useconds, "%s.valn;", device), val); 00466 } 00467 00468 int 00469 USyncClient::syncGetValue(const char* valName, UValue& val, 00470 libport::utime_t useconds) 00471 { 00472 return syncGetValue(0, valName, val, useconds); 00473 } 00474 00475 int 00476 USyncClient::syncGetValue(const char* tag, const char* valName, UValue& val, 00477 libport::utime_t useconds) 00478 { 00479 return getValue(syncGetTag(useconds, "%s;", tag, 0, valName), val); 00480 } 00481 00482 int 00483 USyncClient::syncGetDevice(const char* device, ufloat& val, 00484 libport::utime_t useconds) 00485 { 00486 return getValue(syncGet(useconds, "%s.val;", device), val); 00487 } 00488 00489 int 00490 USyncClient::syncGetResult(const char* command, ufloat& val, 00491 libport::utime_t useconds) 00492 { 00493 return getValue(syncGet(useconds, "%s", command), val); 00494 } 00495 00496 00497 int 00498 USyncClient::syncGetDevice(const char* device, const char* access, 00499 ufloat& val, libport::utime_t useconds) 00500 { 00501 return getValue(syncGet(useconds, "%s.%s;", device, access), val); 00502 } 00503 00504 00505 int 00506 USyncClient::syncGetSound(const char* device, int duration, USound& sound, 00507 libport::utime_t useconds) 00508 { 00509 if (kernelMajor_ < 2) 00510 send("syncgetsound = BIN 0;\n" 00511 "loopsound: loop syncgetsound = syncgetsound + %s.val,\n" 00512 "{\n" 00513 " sleep(%d);\n" 00514 " stop loopsound;\n" 00515 " noop;\n" 00516 " noop;\n" 00517 "};\n", 00518 device, duration); 00519 else 00520 send(SYNCLINE_WRAP( 00521 "syncgetsound = BIN 0;\n" 00522 "loopsound: loop syncgetsound = syncgetsound + %s.val,\n" 00523 "{\n" 00524 " sleep(%d);\n" 00525 " loopsound.stop;\n" 00526 "};", device, duration)); 00527 00528 UMessage* m = syncGet(useconds, "%s", "syncgetsound;"); 00529 if (!m 00530 || m->type != MESSAGE_DATA 00531 || m->value->type != DATA_BINARY 00532 || m->value->binary->type != BINARY_SOUND) 00533 { 00534 delete m; 00535 return 0; 00536 } 00537 convert(m->value->binary->sound, sound); 00538 delete m; 00539 return 1; 00540 } 00541 00542 int 00543 USyncClient::syncSend(const void* buffer, size_t length) 00544 { 00545 if (rc != 0) 00546 return -1; 00547 sendBufferLock.lock(); 00548 int res = effective_send(buffer, length); 00549 sendBufferLock.unlock(); 00550 return res; 00551 } 00552 00553 void 00554 USyncClient::waitForKernelVersion(bool hasProcessingThread) 00555 { 00556 // Do not call kernelMajor() which precisely requires kernelMajor_ 00557 // to be defined. 00558 while (kernelMajor_ < 0 && !error()) 00559 { 00560 // Process events if we are the processing thread or if there is none. 00561 if (!hasProcessingThread || cbThread == pthread_self()) 00562 processEvents(); 00563 // Process the sockets if we are the asio worker thread. 00564 libport::Socket::sleep(100000); 00565 } 00566 } 00567 00568 void 00569 USyncClient::onConnect() 00570 { 00571 UClient::onConnect(); 00572 if (connectCallback_) 00573 connectCallback_(this); 00574 connectCallback_ = 0; 00575 } 00576 00577 void 00578 USyncClient::setDefaultOptions(const USyncClient::send_options& opt) 00579 { 00580 default_options_ = opt; 00581 } 00582 00583 const USyncClient::send_options& 00584 USyncClient::getOptions(const USyncClient::send_options& opt) const 00585 { 00586 return (&opt == &USyncClient::send_options::default_options) ? 00587 default_options_ : opt; 00588 } 00589 00590 static void destroySocket(libport::Socket* s) 00591 { 00592 s->destroy(); 00593 } 00594 00595 libport::Socket* 00596 USyncClient::onAccept(connect_callback_type connectCallback, 00597 size_t buflen, 00598 bool startCallbackThread) 00599 { 00600 return new USyncClient("", 0, buflen, 00601 USyncClient::options() 00602 .startCallbackThread(startCallbackThread) 00603 .connectCallback(connectCallback)); 00604 } 00605 00606 boost::shared_ptr<libport::Finally> 00607 USyncClient::listen(const std::string& host, const std::string& port, 00608 boost::system::error_code& erc, 00609 connect_callback_type connectCallback, 00610 size_t buflen, 00611 bool startCallbackThread) 00612 { 00613 libport::Socket* s = new libport::Socket; 00614 erc = s->listen(boost::bind(&onAccept, connectCallback, buflen, 00615 startCallbackThread), 00616 host, port); 00617 if (erc) 00618 return boost::shared_ptr<libport::Finally>((libport::Finally*)0); 00619 boost::shared_ptr<libport::Finally> res(new libport::Finally); 00620 (*res) << boost::bind(&destroySocket, s); 00621 return res; 00622 } 00623 00624 bool 00625 USyncClient::isCallbackThread() const 00626 { 00627 return pthread_self() == cbThread; 00628 } 00629 00630 void 00631 USyncClient::setSynchronous(bool enable) 00632 { 00633 synchronous_ = enable; 00634 } 00635 00636 void 00637 USyncClient::lockQueue() 00638 { 00639 queueLock_.lock(); 00640 } 00641 } // namespace urbi