|
Urbi SDK Remote for C++
2.7.5
|
00001 /* 00002 * Copyright (C) 2005-2012, Gostai S.A.S. 00003 * 00004 * This software is provided "as is" without warranty of any kind, 00005 * either expressed or implied, including but not limited to the 00006 * implied warranties of fitness for a particular purpose. 00007 * 00008 * See the LICENSE file for more information. 00009 */ 00010 00012 00013 #include <algorithm> 00014 #include <libport/cassert> 00015 #include <libport/cerrno> 00016 #include <libport/cmath> 00017 #include <libport/cstdlib> 00018 #include <fstream> 00019 #include <iostream> 00020 00021 #include <libport/format.hh> 00022 #include <libport/io-stream.hh> 00023 #include <libport/lexical-cast.hh> 00024 00025 #include <libport/cstdio> 00026 #include <libport/cstring> 00027 #include <libport/containers.hh> 00028 #include <libport/debug.hh> 00029 #include <libport/escape.hh> 00030 #include <libport/lexical-cast.hh> 00031 #include <libport/lockable.hh> 00032 #include <libport/sys/stat.h> 00033 #include <libport/unistd.h> 00034 #include <libport/windows.hh> 00035 00036 #include <urbi/uabstractclient.hh> 00037 #include <urbi/uconversion.hh> 00038 #include <urbi/umessage.hh> 00039 #include <urbi/utag.hh> 00040 00041 #include <liburbi/compatibility.hh> 00042 00043 GD_CATEGORY(Urbi.Client.Abstract); 00044 00045 namespace urbi 00046 { 00047 00049 const char* tag_error = "[error]"; 00051 const char* tag_wildcard = "[wildcard]"; 00052 00053 std::ostream& 00054 default_stream() 00055 { 00056 return (getDefaultClient() 00057 ? ((UAbstractClient*)getDefaultClient())->stream_get() 00058 : std::cerr); 00059 } 00060 00061 00062 /*-------------. 00063 | UCallbacks. | 00064 `-------------*/ 00065 00066 static UCallbackID nextId; 00067 00068 class UCallbackWrapperCB: public UCallbackWrapper 00069 { 00070 UCallback cb; 00071 public: 00072 UCallbackWrapperCB(UCallback cb) 00073 : cb(cb) 00074 { 00075 } 00076 virtual UCallbackAction operator()(const UMessage& msg) 00077 { 00078 return cb(msg); 00079 } 00080 }; 00081 00082 00083 class UCallbackWrapperCCB: public UCallbackWrapper 00084 { 00085 UCustomCallback cb; 00086 void * data; 00087 public: 00088 UCallbackWrapperCCB(UCustomCallback cb, void* data) 00089 : cb(cb) 00090 , data(data) 00091 { 00092 } 00093 virtual UCallbackAction operator()(const UMessage& msg) 00094 { 00095 return cb(data, msg); 00096 } 00097 }; 00098 00099 00100 /*-------------------. 00101 | UClientStreambuf. | 00102 `-------------------*/ 00103 00104 class UClientStreambuf: public libport::StreamBuffer 00105 { 00106 public: 00107 UClientStreambuf(UAbstractClient* cl) 00108 : client_(cl) 00109 {} 00110 00111 protected: 00112 virtual size_t read(char* buffer, size_t size); 00113 virtual void write(char* buffer, size_t size); 00114 00115 private: 00116 UAbstractClient* client_; 00117 }; 00118 00119 void 00120 UClientStreambuf::write(char* buffer, size_t size) 00121 { 00122 client_->effective_send(buffer, size); 00123 } 00124 00125 size_t 00126 UClientStreambuf::read(char*, size_t) 00127 { 00128 return 0; 00129 } 00130 00131 /*------------------. 00132 | UAbstractClient. | 00133 `------------------*/ 00134 00135 00136 const char* UAbstractClient::CLIENTERROR_TAG = "client_error"; 00137 00138 void 00139 UAbstractClient::bins_clear() 00140 { 00141 for (/* nothing */; !bins.empty(); bins.pop_front()) 00142 bins.front().clear(); 00143 } 00144 00145 inline 00146 bool 00147 matching_tag(const UMessage& msg, const char* tag) 00148 { 00149 return 00150 msg.tag == tag 00151 || (libport::streq(tag, tag_error) && msg.type == MESSAGE_ERROR) 00152 // The wild card does not match tags starting with 00153 // TAG_PRIVATE_PREFIX. 00154 || (libport::streq(tag, tag_wildcard) 00155 && msg.tag.compare(0, 00156 sizeof TAG_PRIVATE_PREFIX - 1, 00157 TAG_PRIVATE_PREFIX)); 00158 } 00159 00160 void 00161 UAbstractClient::notifyCallbacks(const UMessage& msg) 00162 { 00163 libport::BlockLock bl(listLock); 00164 bool inc = true; 00165 for (callbacks_type::iterator it = callbacks_.begin(); 00166 it != callbacks_.end(); 00167 inc ? it++ : it, inc = true) 00168 if (matching_tag(msg, it->tag)) 00169 { 00170 UCallbackAction ua = it->callback(msg); 00171 if (ua == URBI_REMOVE) 00172 { 00173 delete &it->callback; 00174 it = callbacks_.erase(it); 00175 inc = false; 00176 } 00177 } 00178 } 00179 00180 UAbstractClient::UAbstractClient(const std::string& host, 00181 unsigned port, 00182 size_t buflen, 00183 bool server) 00184 : LockableOstream(new UClientStreambuf(this)) 00185 , closed_ (false) 00186 , listLock() 00187 , host_(host) 00188 , port_(port) 00189 , server_(server) 00190 , sendBufSize(buflen) 00191 , recvBufSize(buflen) 00192 , rc(0) 00193 00194 , recvBuffer(new char[buflen]) 00195 , recvBufferPosition(0) 00196 , sendBuffer(new char[buflen]) 00197 00198 , kernelMajor_(-1) 00199 , kernelMinor_(-1) 00200 , binaryBuffer(0) 00201 , parsePosition(0) 00202 , inString(false) 00203 , nBracket(0) 00204 , binaryMode(false) 00205 , system(false) 00206 , init_(true) 00207 , counter_(0) 00208 , stream_(this) 00209 { 00210 exceptions(std::ostream::eofbit | std::ostream::failbit | 00211 std::ostream::badbit); 00212 recvBuffer[0] = 0; 00213 sendBuffer[0] = 0; 00214 } 00215 00216 UAbstractClient::~UAbstractClient() 00217 { 00218 // No more default client if delete. 00219 if ((void*)getDefaultClient() == (void*)this) 00220 setDefaultClient(0); 00221 delete [] recvBuffer; 00222 delete [] sendBuffer; 00223 } 00224 00229 UAbstractClient::error_type 00230 UAbstractClient::startPack() 00231 { 00232 sendBufferLock.lock(); 00233 return 0; 00234 } 00235 00236 UAbstractClient::error_type 00237 UAbstractClient::endPack() 00238 { 00239 error_type res = effective_send(sendBuffer); 00240 sendBuffer[0] = 0; 00241 sendBufferLock.unlock(); 00242 return res; 00243 } 00244 00245 UAbstractClient::error_type 00246 UAbstractClient::send(const char* command, ...) 00247 { 00248 if (rc) 00249 return -1; 00250 va_list arg; 00251 va_start(arg, command); 00252 sendBufferLock.lock(); 00253 rc = vpack(command, arg); 00254 va_end(arg); 00255 if (rc < 0) 00256 { 00257 sendBufferLock.unlock(); 00258 return rc; 00259 } 00260 return rc = endPack(); 00261 } 00262 00263 UAbstractClient::error_type 00264 UAbstractClient::send(const std::string& s) 00265 { 00266 return send("%s", s.c_str()); 00267 } 00268 00269 UAbstractClient::error_type 00270 UAbstractClient::send(const UValue& v) 00271 { 00272 switch (v.type) 00273 { 00274 // Bounce to UValue operator << for those types. 00275 case DATA_DOUBLE: 00276 case DATA_SLOTNAME: 00277 case DATA_STRING: 00278 return send(string_cast(v)); 00279 break; 00280 00281 // Use our own sendBinary for binary, who knows how to talk to k1 and k2. 00282 case DATA_BINARY: 00283 if (v.binary->type != BINARY_NONE 00284 && v.binary->type != BINARY_UNKNOWN) 00285 v.binary->buildMessage(); 00286 return sendBinary(v.binary->common.data, v.binary->common.size, 00287 v.binary->message); 00288 break; 00289 00290 // Lists can contain binary, so recurse using this function. 00291 case DATA_LIST: 00292 send("["); 00293 foreach (const UValue* u, *v.list) 00294 send(libport::format("%s,", u)); 00295 return send("]"); 00296 break; 00297 00298 case DATA_DICTIONARY: 00299 send("["); 00300 if (v.dictionary->empty()) 00301 send("=>"); 00302 else 00303 foreach (const UDictionary::value_type& d, *v.dictionary) 00304 send(libport::format("\"%s\"=>%s,", 00305 libport::escape(d.first), d.second)); 00306 return send("]"); 00307 break; 00308 00309 case DATA_VOID: 00310 break; 00311 }; 00312 return 0; 00313 } 00314 00315 UAbstractClient::error_type 00316 UAbstractClient::send(std::istream& is) 00317 { 00318 if (rc) 00319 return -1; 00320 sendBufferLock.lock(); 00321 while (is.good() && !rc) 00322 { 00323 is.read(sendBuffer, sendBufSize); 00324 rc = effective_send(sendBuffer, is.gcount()); 00325 } 00326 sendBuffer[0] = 0; 00327 sendBufferLock.unlock(); 00328 return rc; 00329 } 00330 00331 00332 00337 UAbstractClient::error_type 00338 UAbstractClient::pack(const char* command, ...) 00339 { 00340 if (rc) 00341 return -1; 00342 va_list arg; 00343 va_start(arg, command); 00344 rc = vpack(command, arg); 00345 va_end(arg); 00346 return rc; 00347 } 00348 00349 00350 UAbstractClient::error_type 00351 UAbstractClient::vpack(const char* command, va_list arg) 00352 { 00353 if (rc) 00354 return -1; 00355 sendBufferLock.lock(); 00356 if (command) 00357 { 00358 // Don't print if we overflow the buffer. It would be nice to 00359 // rely on the behavior of the GNU LibC which accepts 0 as 00360 // destination buffer to query the space needed. But it is not 00361 // portable (e.g., segv on OS X). So rather, try to vsnprintf, 00362 // and upon failure, revert the buffer in its previous state. 00363 size_t slen = strlen(sendBuffer); 00364 size_t msize = sendBufSize - slen; 00365 int r = vsnprintf(sendBuffer + slen, msize, command, arg); 00366 // vsnprintf returns the number of characters to write. Check 00367 // that it fits. Don't forget the ending '\0' that it does not 00368 // count, but wants to add. 00369 if (r < 0 || static_cast<int>(msize) <= r) 00370 { 00371 // Don't produce partial input. 00372 sendBuffer[slen] = 0; 00373 rc = -1; 00374 } 00375 } 00376 sendBufferLock.unlock(); 00377 return rc; 00378 } 00379 00380 00381 UAbstractClient::error_type 00382 UAbstractClient::sendFile(const std::string& f) 00383 { 00384 if (f == "/dev/stdin") 00385 return send(std::cin); 00386 else 00387 { 00388 std::ifstream is(f.c_str(), std::ios::binary); 00389 if (is.fail()) 00390 return -1; 00391 else 00392 return send(is); 00393 } 00394 } 00395 00396 00397 UAbstractClient::error_type 00398 UAbstractClient::sendBin(const void* buffer, size_t len) 00399 { 00400 return sendBin(buffer, len, 0); 00401 } 00402 00403 00404 UAbstractClient::error_type 00405 UAbstractClient::sendBin(const void* buffer, size_t len, 00406 const char* header, ...) 00407 { 00408 if (rc) 00409 return -1; 00410 sendBufferLock.lock(); 00411 if (header) 00412 { 00413 va_list arg; 00414 va_start(arg, header); 00415 vpack(header, arg); 00416 va_end(arg); 00417 effective_send(sendBuffer); 00418 } 00419 00420 error_type res = effective_send(buffer, len); 00421 sendBuffer[0] = 0; 00422 sendBufferLock.unlock(); 00423 return res; 00424 } 00425 00426 UAbstractClient::error_type 00427 UAbstractClient::sendBinary(const void* data, size_t len, 00428 const std::string& header) 00429 { 00430 if (kernelMajor() < 2) 00431 return sendBin(data, len, "BIN %lu %s;", 00432 static_cast<unsigned long>(len), header.c_str()); 00433 else 00434 { 00435 sendBufferLock.lock(); 00436 *this << libport::format("Global.Binary.new(\"%s\", \"\\B(%s)(", 00437 libport::escape(header), len); 00438 flush(); 00439 effective_send(data, len); 00440 *this << ")\")"; 00441 sendBufferLock.unlock(); 00442 return rc; 00443 } 00444 } 00445 00446 struct sendSoundData 00447 { 00448 char* buffer; 00449 int bytespersec; 00450 size_t length; 00451 size_t pos; 00452 char* device; 00453 char* tag; 00454 char formatString[50]; 00455 USoundFormat format; 00456 UAbstractClient* uc; 00457 bool startNotify; 00458 }; 00459 00460 static UCallbackAction sendSound_(void* cb, const UMessage &msg) 00461 { 00462 //the idea is to cut the sound into small chunks, 00463 //add a header and send each chunk separately 00464 00465 //create the header. 00466 static const size_t CHUNK_SIZE = 32 * 8*60; 00467 sendSoundData* s = (sendSoundData*)cb; 00468 //handle next chunk 00469 if (s->format == SOUND_WAV && s->pos==0) 00470 s->pos = sizeof (wavheader); 00471 size_t tosend = std::min(CHUNK_SIZE, s->length - s->pos); 00472 00473 //int playlength = tosend *1000 / s->bytespersec; 00474 std::string header = ((s->format == SOUND_WAV) ? "wav " : "raw ") 00475 + (std::string)s->formatString; 00476 00477 s->uc->send("%s.val = Global.Binary.new(\"%s\", \"\\B(%lu)(", 00478 s->device, 00479 header.c_str(), 00480 static_cast<unsigned long> 00481 (tosend + ((s->format == SOUND_WAV) ? sizeof (wavheader) : 0)) 00482 ); 00483 00484 if (s->format == SOUND_WAV) 00485 { 00486 wavheader wh; 00487 memcpy(&wh, s->buffer, sizeof wh); 00488 wh.datalength=tosend; 00489 wh.length=tosend+44-8; 00490 s->uc->sendBin(&wh, sizeof wh); 00491 } 00492 s->uc->sendBin(s->buffer+s->pos, tosend); 00494 s->uc->send(")\")|;waituntil(%s.remain < 1000);\n" 00495 " %s << 1;\n", s->device, msg.tag.c_str()); 00496 s->pos += tosend; 00497 if (s->pos >= s->length) 00498 { 00499 const char* dev = s->device ? s->device : "speaker"; 00500 s->uc->send("%s.val->blend = %s.sendsoundsaveblend;", dev, dev); 00501 00502 if (s->tag && s->tag[0]) 00503 s->uc->send("Channel.new(\"%s\") << 1;\n", s->tag); 00504 delete[] s->buffer; 00505 free(s->tag); 00506 free(s->device); 00507 delete s; 00508 return URBI_REMOVE; 00509 } 00510 return URBI_CONTINUE; 00511 } 00512 00519 UAbstractClient::error_type 00520 UAbstractClient::sendSound(const char* device, const USound& sound, 00521 const char* tag) 00522 { 00523 switch (sound.soundFormat) 00524 { 00525 case SOUND_MP3: 00526 case SOUND_OGG: 00527 // We don't handle chunking for these formats. 00528 return sendBin(sound.data, sound.size, 00529 "%s +report: %s.val = BIN %lu %s;", 00530 tag, device, static_cast<unsigned long>(sound.size), 00531 sound.soundFormat == SOUND_MP3 ? "mp3" : "ogg"); 00532 break; 00533 00534 case SOUND_WAV: 00535 case SOUND_RAW: 00536 { 00537 const char* dev = device ? device : "speaker"; 00538 send("%s.removeSlot(\"sendsoundsaveblend\") |" 00539 "var %s.sendsoundsaveblend = %s.val->blend;" 00540 "%s.val->blend=\"queue\";", 00541 dev, dev, dev, dev); 00542 sendSoundData* s = new sendSoundData(); 00543 s->bytespersec = sound.channels * sound.rate * (sound.sampleSize / 8); 00544 s->uc = this; 00545 s->buffer = new char[sound.size]; 00546 memcpy(s->buffer, sound.data, sound.size); 00547 s->length = sound.size; 00548 s->tag = tag ? strdup(tag) : 0; 00549 s->device = strdup(device); 00550 s->pos = 0; 00551 s->format = sound.soundFormat; 00552 if (sound.soundFormat == SOUND_RAW) 00553 sprintf(s->formatString, "%zd %zd %zd %d", 00554 sound.channels, sound.rate, sound.sampleSize, 00555 sound.sampleFormat); 00556 else 00557 s->formatString[0] = 0; 00558 s->startNotify = false; 00559 std::string utag = fresh(); 00560 (*this) << "var " + utag +" = Channel.new(\"" << utag << "\");"; 00561 UCallbackID cid = setCallback(sendSound_, s, utag.c_str()); 00562 // Invoke it 2 times to queue sound. 00563 if (sendSound_(s, UMessage(*this, 0, utag, "*** stop", 00564 binaries_type())) 00565 == URBI_CONTINUE) 00566 { 00567 if (sendSound_(s, UMessage(*this, 0, utag, "*** stop", 00568 binaries_type())) 00569 == URBI_REMOVE) 00570 deleteCallback(cid); 00571 } 00572 else 00573 deleteCallback(cid); 00574 return 0; 00575 } 00576 00577 default: 00578 // Unrecognized format. 00579 return 1; 00580 } 00581 } 00582 00583 UCallbackID 00584 UAbstractClient::setCallback(UCallback cb, const char* tag) 00585 { 00586 return addCallback(tag, *new UCallbackWrapperCB(cb)); 00587 } 00588 00589 UCallbackID 00590 UAbstractClient::setCallback(UCustomCallback cb, 00591 void* cbData, 00592 const char* tag) 00593 { 00594 return addCallback(tag, *new UCallbackWrapperCCB(cb, cbData)); 00595 } 00596 00597 00598 int 00599 UAbstractClient::getAssociatedTag(UCallbackID id, char* tag) 00600 { 00601 listLock.lock(); 00602 callbacks_type::iterator it = 00603 std::find(callbacks_.begin(), callbacks_.end(), id); 00604 if (it == callbacks_.end()) 00605 { 00606 listLock.unlock(); 00607 return 0; 00608 } 00609 strcpy(tag, it->tag); 00610 listLock.unlock(); 00611 return 1; 00612 } 00613 00614 00615 int 00616 UAbstractClient::deleteCallback(UCallbackID id) 00617 { 00618 listLock.lock(); 00619 callbacks_type::iterator it = 00620 std::find(callbacks_.begin(), callbacks_.end(), id); 00621 if (it == callbacks_.end()) 00622 { 00623 listLock.unlock(); 00624 return 0; 00625 } 00626 delete &(it->callback); 00627 callbacks_.erase(it); 00628 listLock.unlock(); 00629 return 1; 00630 } 00631 00632 UCallbackID 00633 UAbstractClient::sendCommand(UCallback cb, const char* cmd, ...) 00634 { 00635 std::string tag = fresh(); 00636 std::string mcmd = tag + " << " + cmd; 00637 UCallbackID res = setCallback(cb, tag.c_str()); 00638 sendBufferLock.lock(); 00639 va_list arg; 00640 va_start(arg, cmd); 00641 vpack(mcmd.c_str(), arg); 00642 va_end(arg); 00643 if (endPack()) 00644 { 00645 deleteCallback(res); 00646 return UINVALIDCALLBACKID; 00647 } 00648 return res; 00649 } 00650 00651 UCallbackID 00652 UAbstractClient::sendCommand(UCustomCallback cb, void *cbData, 00653 const char* cmd, ...) 00654 { 00655 std::string tag = fresh(); 00656 std::string mcmd = tag + " << " + cmd; 00657 UCallbackID res = setCallback(cb, cbData, tag.c_str()); 00658 sendBufferLock.lock(); 00659 va_list arg; 00660 va_start(arg, cmd); 00661 vpack(mcmd.c_str(), arg); 00662 va_end(arg); 00663 if (endPack()) 00664 { 00665 deleteCallback(res); 00666 return UINVALIDCALLBACKID; 00667 } 00668 return res; 00669 } 00670 00671 UAbstractClient::error_type 00672 UAbstractClient::putFile(const char* localName, const char* remoteName) 00673 { 00674 sendBufferLock.lock(); 00675 if (!remoteName) 00676 remoteName = localName; 00677 send("save(\"%s\", \"", remoteName); 00678 error_type res = sendFile(localName); 00679 send("\");"); 00680 sendBufferLock.unlock(); 00681 return res; 00682 } 00683 00684 UAbstractClient::error_type 00685 UAbstractClient::putFile(const void* buffer, size_t length, 00686 const char* remoteName) 00687 { 00688 send("save(\"%s\", \"", remoteName); 00689 sendBin(buffer, length); 00690 send("\");"); 00691 sendBufferLock.unlock(); 00692 return 0; 00693 } 00694 00695 std::string 00696 UAbstractClient::fresh() 00697 { 00698 static boost::format fmt("URBI_%s"); 00699 return str(fmt % ++counter_); 00700 } 00701 00702 void 00703 UAbstractClient::makeUniqueTag(char* tag) 00704 { 00705 strcpy(tag, fresh().c_str()); 00706 } 00707 00708 bool 00709 UAbstractClient::process_recv_buffer_binary_() 00710 { 00711 //Receiving binary. Append to binaryBuffer; 00712 size_t len = 00713 std::min(recvBufferPosition - endOfHeaderPosition, 00714 binaryBufferLength - binaryBufferPosition); 00715 if (binaryBuffer) 00716 memcpy (static_cast<char*> (binaryBuffer) + binaryBufferPosition, 00717 recvBuffer + endOfHeaderPosition, len); 00718 binaryBufferPosition += len; 00719 00720 if (binaryBufferPosition == binaryBufferLength) 00721 { 00722 //Finished receiving binary. 00723 //append 00724 BinaryData bd; 00725 bd.size = binaryBufferLength; 00726 bd.data = binaryBuffer; 00727 bins << bd; 00728 binaryBuffer = 0; 00729 00730 if (nBracket == 0) 00731 { 00732 //end of command, send 00733 //dumb listLock.lock(); 00734 UMessage msg(*this, currentTimestamp, currentTag, currentCommand, 00735 bins); 00736 notifyCallbacks(msg); 00737 //unlistLock.lock(); 00738 00739 bins_clear(); 00740 00741 //flush 00742 parsePosition = 0; 00743 //Move the extra we received 00744 recvBufferPosition -= len + endOfHeaderPosition; 00745 memmove(recvBuffer, 00746 recvBuffer + endOfHeaderPosition + len, 00747 recvBufferPosition); 00748 } 00749 else 00750 { 00751 // not over yet 00752 //leave parseposition where it is 00753 //move the extra (parsePosition = endOfHeaderPosition) 00754 recvBufferPosition -= len; 00755 memmove(recvBuffer + parsePosition, 00756 recvBuffer + endOfHeaderPosition + len, 00757 recvBufferPosition - endOfHeaderPosition); 00758 } 00759 binaryBuffer = 0; 00760 binaryMode = false; 00761 00762 // Reenter loop. 00763 return true; 00764 } 00765 else 00766 { 00767 // Not finished receiving binary. 00768 recvBufferPosition = endOfHeaderPosition; 00769 return false; 00770 } 00771 } 00772 00773 bool 00774 UAbstractClient::process_recv_buffer_text_() 00775 { 00776 // Not in binary mode. 00777 char* endline = 00778 static_cast<char*> (memchr(recvBuffer+parsePosition, '\n', 00779 recvBufferPosition - parsePosition)); 00780 if (!endline) 00781 return false; //no new end of command/start of binary: wait 00782 00783 if (parsePosition == 0) // parse header 00784 { 00785 // Ignore empty lines. 00786 if (endline == recvBuffer) 00787 { 00788 memmove(recvBuffer, recvBuffer+1, recvBufferPosition - 1); 00789 recvBufferPosition--; 00790 return true; 00791 } 00792 00793 if (2 != sscanf(recvBuffer, "[%d:%64[A-Za-z0-9_.]]", 00794 ¤tTimestamp, currentTag)) 00795 { 00796 if (1 == sscanf(recvBuffer, "[%d]", ¤tTimestamp)) 00797 currentTag[0] = 0; 00798 else 00799 { 00800 // failure 00801 GD_FERROR("read, error parsing header: '%s'", recvBuffer); 00802 currentTimestamp = 0; 00803 strcpy(currentTag, "UNKNWN"); 00804 //listLock.lock(); 00805 UMessage msg(*this, 0, tag_error, 00806 "!!! UAbstractClient::read, fatal error parsing header", 00807 binaries_type()); 00808 notifyCallbacks(msg); 00809 //unlistLock.lock(); 00810 } 00811 } 00812 00813 currentCommand = strstr(recvBuffer, "]"); 00814 if (!currentCommand) 00815 { 00816 //reset all 00817 nBracket = 0; 00818 inString = false; 00819 parsePosition = 0; 00820 recvBufferPosition = 0; 00821 return false; 00822 } 00823 00824 ++currentCommand; 00825 while (*currentCommand == ' ') 00826 ++currentCommand; 00827 system = (*currentCommand == '!' || *currentCommand == '*'); 00828 parsePosition = (long) currentCommand - (long) recvBuffer; 00829 00830 //reinit just to be sure: 00831 nBracket = 0; 00832 inString = false; 00833 } 00834 00835 for (/* nothing */; parsePosition < recvBufferPosition; ++parsePosition) 00836 { 00837 if (inString) 00838 switch (recvBuffer[parsePosition]) 00839 { 00840 case '\\': 00841 if (parsePosition == recvBufferPosition-1) 00842 //we cant handle the '\\' 00843 return false; 00844 ++parsePosition; //ignore next character 00845 continue; 00846 case '"': 00847 inString = false; 00848 continue; 00849 } 00850 else 00851 { 00852 switch (recvBuffer[parsePosition]) 00853 { 00854 case '"': 00855 inString = true; 00856 continue; 00857 case '[': 00858 ++nBracket; 00859 continue; 00860 case ']': 00861 --nBracket; 00862 continue; 00863 case '\n': 00864 // FIXME: handle '[' in echoed messages or errors nBracket == 0. 00865 // 00866 // end of command 00867 recvBuffer[parsePosition] = 0; 00868 //listLock.lock(); 00869 UMessage msg(*this, currentTimestamp, currentTag, 00870 currentCommand, 00871 bins); 00872 notifyCallbacks(msg); 00873 //unlistLock.lock(); 00874 //prepare for next read, copy the extra 00875 memmove(recvBuffer, recvBuffer + parsePosition + 1, 00876 recvBufferPosition - parsePosition - 1); 00877 // copy beginning of next cmd 00878 recvBufferPosition = recvBufferPosition - parsePosition - 1; 00879 recvBuffer[recvBufferPosition] = 0; 00880 parsePosition = 0; 00881 bins_clear(); 00882 goto line_finished; //restart 00883 } 00884 00885 if (!system && !strncmp(recvBuffer+parsePosition-3, "BIN ", 4)) 00886 { 00887 //very important: scan starts below current point 00888 //compute length 00889 char* endLength; 00890 binaryBufferLength = 00891 strtol(recvBuffer+parsePosition+1, &endLength, 0); 00892 if (endLength == recvBuffer+parsePosition+1) 00893 { 00894 GD_ERROR("read, error parsing bin data length."); 00895 recvBufferPosition = 0; 00896 return false; 00897 } 00898 //go to end of header 00899 while (recvBuffer[parsePosition] !='\n') 00900 ++parsePosition; //we now we will find a \n 00901 ++parsePosition; 00902 endOfHeaderPosition = parsePosition; 00903 binaryMode = true; 00904 binaryBuffer = malloc(binaryBufferLength); 00905 binaryBufferPosition = 0; 00906 break; //restart in binarymode to handle binary 00907 } 00908 } 00909 } 00910 line_finished: 00911 // Either we ate all characters, or we were asked to restart. 00912 return parsePosition != recvBufferPosition; 00913 } 00914 00919 void 00920 UAbstractClient::processRecvBuffer() 00921 { 00922 while (binaryMode 00923 ? process_recv_buffer_binary_() 00924 : process_recv_buffer_text_()) 00925 continue; 00926 } 00927 00928 UCallbackID 00929 UAbstractClient::setWildcardCallback(UCallbackWrapper& callback) 00930 { 00931 return addCallback(tag_wildcard, callback); 00932 } 00933 00934 UCallbackID 00935 UAbstractClient::setErrorCallback(UCallbackWrapper& callback) 00936 { 00937 return addCallback(tag_error, callback); 00938 } 00939 00940 UCallbackID 00941 UAbstractClient::setClientErrorCallback(UCallbackWrapper& callback) 00942 { 00943 return addCallback(CLIENTERROR_TAG, callback); 00944 } 00945 00946 UCallbackID 00947 UAbstractClient::setCallback(UCallbackWrapper& callback, 00948 const char* tag) 00949 { 00950 return addCallback(tag, callback); 00951 } 00952 00953 UCallbackID 00954 UAbstractClient::addCallback(const char* tag, 00955 UCallbackWrapper& w) 00956 { 00957 listLock.lock(); 00958 UCallbackInfo ci(w); 00959 strncpy(ci.tag, tag, URBI_MAX_TAG_LENGTH-1); 00960 ci.tag[URBI_MAX_TAG_LENGTH-1]=0; 00961 ci.id = ++nextId; 00962 callbacks_.push_front(ci); 00963 listLock.unlock(); 00964 return ci.id; 00965 } 00966 00967 void 00968 UAbstractClient::clientError(std::string message, int erc) 00969 { 00970 // Like in UMessage's constructor, skip the possible "!!! " 00971 // prefix. 00972 const char prefix[] = "!!! "; 00973 if (message.substr(0, sizeof prefix - 1) == prefix) 00974 message.erase(0, sizeof prefix - 1); 00975 00976 if (erc) 00977 { 00978 message += message.empty() ? "" : ": "; 00979 message += libport::strerror(erc); 00980 } 00981 00982 UMessage m(*this); 00983 m.type = MESSAGE_ERROR; 00984 // rawMessage is incorrect but we don't care. 00985 m.message = m.rawMessage = message; 00986 m.timestamp = 0; 00987 m.tag = CLIENTERROR_TAG; 00988 notifyCallbacks(m); 00989 } 00990 00991 void 00992 UAbstractClient::clientError(const char* message, int erc) 00993 { 00994 return clientError(std::string(message ? message : ""), erc); 00995 } 00996 00997 void 00998 UAbstractClient::onConnection() 00999 { 01000 # define VERSION_TAG TAG_PRIVATE_PREFIX "__version" 01001 setCallback(*this, &UAbstractClient::setVersion, VERSION_TAG); 01002 // We don't know our kernel version yet. 01003 send(SYNCLINE_WRAP( 01004 "{\n" 01005 " var __ver__ = 2;\n" 01006 " {var __ver__ = 1};\n" 01007 " var " VERSION_TAG ";\n" 01008 " if (__ver__ == 1)\n" 01009 " " VERSION_TAG " << system.version\n" 01010 " else\n" 01011 " {\n" 01012 " " VERSION_TAG " = Channel.new(\"" VERSION_TAG "\");\n" 01013 " " VERSION_TAG " << System.version;\n" 01014 " };\n" 01015 "};\n")); 01016 # undef VERSION_TAG 01017 } 01018 01019 UCallbackAction 01020 UAbstractClient::setConnectionID(const UMessage& msg) 01021 { 01022 GD_FINFO_TRACE("setConnectionId for client %p", this); 01023 if (msg.type == MESSAGE_DATA && msg.value) 01024 { 01025 std::string id(*msg.value); 01026 if (!id.empty()) 01027 { 01028 libport::BlockLock bl(sendBufferLock); 01029 connectionID_ = id; 01030 return URBI_REMOVE; 01031 } 01032 } 01033 return URBI_CONTINUE; 01034 } 01035 01036 UCallbackAction 01037 UAbstractClient::setVersion(const UMessage& msg) 01038 { 01039 GD_FINFO_TRACE("setVersion for client %p", this); 01040 libport::BlockLock bl(sendBufferLock); 01041 if (msg.type != MESSAGE_DATA) 01042 return URBI_CONTINUE; 01043 aver_eq(msg.value->type, DATA_STRING); 01044 kernelVersion_ = *msg.value->stringValue; 01045 size_t sep = kernelVersion_.find_first_of('.'); 01046 try 01047 { 01048 kernelMajor_ = boost::lexical_cast<int>(kernelVersion_.substr(0, sep)); 01049 size_t sep2 = kernelVersion_.find_first_of('.', sep+1); 01050 if (sep2 != kernelVersion_.npos) 01051 kernelMinor_ = 01052 boost::lexical_cast<int>(kernelVersion_.substr(sep+1, 01053 sep2-sep-1)); 01054 else 01055 kernelMinor_ = 0; 01056 } 01057 catch (boost::bad_lexical_cast&) 01058 { 01059 kernelMajor_ = 2; 01060 kernelMinor_ = 0; 01061 GD_FWARN("failed to parse kernel version string: '%s', assuming %s.%s.", 01062 kernelVersion_, 01063 kernelMajor_, 01064 kernelMinor_); 01065 } 01066 // Set the kernel version of our associated stream. 01067 ::urbi::kernelMajor(*this) = kernelMajor_; 01068 01069 // Have the connectionId sent on __ident. 01070 # define IDENT_TAG TAG_PRIVATE_PREFIX "__ident" 01071 setCallback(*this, &UAbstractClient::setConnectionID, IDENT_TAG); 01072 if (kernelMajor_ < 2) 01073 send(IDENT_TAG " << local.connectionID;\n"); 01074 else 01075 send(SYNCLINE_WRAP("Channel.new(\"" IDENT_TAG "\")" 01076 " << connectionTag.name;\n")); 01077 return URBI_REMOVE; 01078 # undef IDENT_TAG 01079 } 01080 01081 int 01082 UAbstractClient::getCurrentTimestamp() const 01083 { 01084 return currentTimestamp; 01085 } 01086 01087 const std::string& 01088 UAbstractClient::connectionID() const 01089 { 01090 libport::BlockLock bl(sendBufferLock); 01091 return connectionID_; 01092 } 01093 01094 std::string 01095 getClientConnectionID(const UAbstractClient* cli) 01096 { 01097 if (!cli) 01098 return ""; 01099 return cli->connectionID(); 01100 } 01101 01102 01103 /*-----------------. 01104 | Default client. | 01105 `-----------------*/ 01106 01107 UClient* defaultClient = 0; 01108 01109 UClient* getDefaultClient() 01110 { 01111 return defaultClient; 01112 } 01113 01114 UClient& get_default_client() 01115 { 01116 return *getDefaultClient(); 01117 } 01118 01119 void setDefaultClient(UClient* cl) 01120 { 01121 defaultClient = cl; 01122 } 01123 01124 01125 std::ostream& 01126 unarmorAndSend(const char* a, UAbstractClient* where) 01127 { 01128 aver(a); 01129 aver(where); 01130 std::ostream& s = *where; 01131 if (strlen(a)>2) 01132 { 01133 if (a[0]=='(' && a[strlen(a)-1]==')') 01134 s.rdbuf()->sputn(a+1, strlen(a)-2); 01135 else 01136 s << a; //this is baaad, user forgot the parenthesis but was lucky 01137 } 01138 return s; 01139 } 01140 01141 } // namespace urbi