|
Urbi SDK Remote for C++
2.7.5
|
00001 /* 00002 * Copyright (C) 2010-2011, Gostai S.A.S. 00003 * 00004 * This software is provided "as is" without warranty of any kind, 00005 * either expressed or implied, including but not limited to the 00006 * implied warranties of fitness for a particular purpose. 00007 * 00008 * See the LICENSE file for more information. 00009 */ 00010 00011 // Ortp needs a little help to know it's compiling on WIN32. 00012 00013 // ORTP defines its own version of int32_t and all, so disable ours. 00014 #define LIBPORT_NO_CSTDINT_TYPES 00015 00016 #include <libport/config.h> 00017 #include <libport/detect-win32.h> 00018 00019 #include <libport/lexical-cast.hh> 00020 #include <libport/statistics.hh> 00021 #include <libport/sys/socket.h> 00022 #include <libport/system-warning-push.hh> 00023 #include <ortp/ortp.h> 00024 #include <libport/system-warning-pop.hh> 00025 00026 #include <boost/unordered_map.hpp> 00027 00028 #include <libport/asio.hh> 00029 #include <libport/debug.hh> 00030 00031 #include <serialize/serialize.hh> 00032 00033 #include <urbi/socket.hh> 00034 #include <urbi/uclient.hh> 00035 #include <urbi/uexternal.hh> 00036 #include <urbi/uobject.hh> 00037 00038 #include <urbi/uvalue-serialize.hh> 00039 00040 #include <urbi/customuvar.hh> 00041 00042 using namespace urbi; 00043 00044 GD_CATEGORY(URTP); 00045 extern "C" 00046 { 00047 // We are using our own network backend, which requires using a bit of the 00048 // internal ortp API. 00049 void rtp_session_rtp_parse(RtpSession *session, mblk_t *mp, 00050 uint32_t local_str_ts, 00051 struct sockaddr *addr, socklen_t addrlen); 00052 void rtp_session_rtcp_parse(RtpSession *session, mblk_t *mp); 00053 } 00054 00055 class URTPLink; 00056 00063 class URTP: public UObject, public UObjectSocket 00064 { 00065 public: 00066 URTP(const std::string& n); 00067 ~URTP(); 00068 void init(); 00069 00073 void sendVar(UVar& v); 00075 void receiveVar(UVar& v); 00077 00079 int listen(const std::string& host, const std::string& port); 00081 void connect(const std::string& host, const std::string& port); 00083 UDictionary stats(); 00085 void reset(); 00087 void send(const UValue&); 00089 void sendUrbiscript(const std::string& code); 00090 00092 00094 00098 void groupedSendVar(UVar& v); 00100 void unGroupedSendVar(UVar& v); 00102 void sendGrouped(const std::string& name, const UValue& v, 00103 libport::utime_t timestamp); 00104 00108 CustomUVar<ufloat> commitDelay; 00109 00111 CustomUVar<std::string> commitTriggerVarName; 00112 00114 00115 UEvent onConnectEvent, onErrorEvent; 00117 UVar forceType; 00119 UVar mediaType; 00121 UVar raw; 00123 UVar jitter, jitterAdaptive, jitterTime; 00125 UVar sourceContext; 00130 UVar forceHeader; 00132 enum MediaTypes { 00133 RAW_BINARY = 100, //< Binary without keywords 00134 BINARY = 101, //< Keywords then binary 00135 URBISCRIPT = 102, //< urbiscript to execute 00137 VALUES = 103, 00138 SER_VALUES = 104, //< Serialized stream of name value timestamp... 00139 }; 00141 void setHeaderTarget(UVar& v); 00143 UVar async; 00145 UVar syncSendSocket; 00147 UVar rawUDP; 00149 std::vector<std::string> groupContent(); 00150 virtual void onError(boost::system::error_code erc); 00151 virtual void onConnect(); 00152 virtual size_t onRead(const void*, size_t); 00153 void readFrom(const void*, size_t, boost::shared_ptr<libport::UDPLink>); 00154 void onChange(UVar& v); 00155 void onGroupedChange(UVar& v); 00156 void onTypeChange(UVar& v); 00157 void onJitterChange(UVar&); 00158 void onLogLevelChange(UVar& v); 00159 void commitGroup(); 00160 void send_(const UValue&); 00161 void localWrite(const std::string& name, const UValue& val, 00162 libport::utime_t timestamp = 0); 00163 URTPLink* makeSocket(); 00164 // Ctor code, size matters 00165 void born(); 00166 // Dtor code, size matters 00167 void die(); 00168 RtpSession* session; 00169 size_t read_ts; 00170 size_t write_ts; 00171 UVar* writeTo; 00172 UVar* headerTarget; 00173 // Name of the uvar to deliver to locally. 00174 UVar localDeliver; 00175 UVar logLevel; 00176 libport::Lockable lock; // group protection 00177 libport::serialize::BinaryOSerializer* groupOArchiver; 00178 std::ostringstream sOArchiver; 00179 libport::serialize::BinaryISerializer* groupIArchiver; 00180 std::istringstream sIArchiver; 00181 // True if group is empty. 00182 bool groupEmpty_; 00183 typedef boost::unordered_map<std::string, UVar*> GroupedVars; 00184 GroupedVars groupedVars; 00185 friend class URTPLink; 00186 libport::Statistics<libport::utime_t> sendTime; 00187 bool sendMode_; // will this socket be used for synchronous sending. 00188 std::vector<unsigned short> localPorts; 00189 }; 00190 00191 class URTPLink: public libport::Socket 00192 { 00193 public: 00194 URTPLink(URTP* owner); 00195 virtual size_t onRead(const void*, size_t); 00196 URTP* owner_; 00197 }; 00198 00199 URTPLink::URTPLink(URTP* owner) 00200 : libport::Socket(owner->get_io_service()) 00201 , owner_(owner) 00202 { 00203 } 00204 00205 size_t URTPLink::onRead(const void* data, size_t size) 00206 { 00207 return owner_->onRead(data, size); 00208 } 00209 00210 static 00211 std::string 00212 rtp_id() 00213 { 00214 return libport::format("URTP_%s_%s", getFilteredHostname(), 00215 // Under uclibc, each thread has a different pid. 00216 #ifdef __UCLIBC__ 00217 "default" 00218 #else 00219 getpid() 00220 #endif 00221 ); 00222 } 00223 00224 ::urbi::URBIStarter<URTP> 00225 starter_URTP(urbi::isPluginMode() ? "URTP" : rtp_id()); 00226 00227 static void bounceSend(UObject*o, const UValue& v) 00228 { 00229 URTP* r = reinterpret_cast<URTP*>(o); 00230 r->send(v); 00231 } 00232 00233 static void bounceSendGrouped(UObject*o, const std::string& n, const UValue& v, 00234 libport::utime_t ts) 00235 { 00236 URTP* r = reinterpret_cast<URTP*>(o); 00237 r->sendGrouped(n, v, ts); 00238 } 00239 00240 URTP::URTP(const std::string& n) 00241 : UObject(n) 00242 , UObjectSocket(getCurrentContext()->getIoService()) 00243 , session(0) 00244 , read_ts(1) 00245 , write_ts(1) 00246 , writeTo(0) 00247 , headerTarget(0) 00248 , groupOArchiver(0) 00249 , groupIArchiver(0) 00250 , groupEmpty_(true) 00251 , sendMode_(false) // we don't know yet 00252 { 00253 born(); 00254 } 00255 00256 void URTP::born() 00257 { 00258 // Register us to contextimpl hooks 00259 ctx_->rtpSendGrouped = &bounceSendGrouped; 00260 ctx_->rtpSend = &bounceSend; 00261 GD_FINFO_DUMP("URTP::URTP on %s", this); 00262 UBindFunction(URTP, init); 00263 static bool ortpInit = false; 00264 if (!ortpInit) 00265 { 00266 ortp_init(); 00267 ortp_scheduler_init(); 00268 ortp_set_log_level_mask(/*ORTP_DEBUG|ORTP_MESSAGE|*/ORTP_WARNING|ORTP_ERROR); 00269 ortpInit = true; 00270 UObject::send("var _rtp_object_name = \"" + __name + "\"|"); 00271 } 00272 UBindVar(URTP, logLevel); 00273 UNotifyChange(logLevel, &URTP::onLogLevelChange); 00274 session=rtp_session_new(RTP_SESSION_SENDRECV); 00275 rtp_session_set_scheduling_mode(session,0); 00276 // Should we block for sending? 00277 rtp_session_set_blocking_mode(session,0); 00278 rtp_session_set_symmetric_rtp(session,TRUE); 00279 rtp_session_enable_adaptive_jitter_compensation(session, FALSE); 00280 rtp_session_set_jitter_compensation(session, 500); 00281 rtp_session_set_payload_type(session,0); 00282 } 00283 00284 URTP::~URTP() 00285 { 00286 die(); 00287 } 00288 00289 void URTP::die() 00290 { 00291 GD_FINFO_DUMP("URTP::~URTP on %s", this); 00292 close(); 00293 libport::BlockLock bl(lock); 00294 { 00295 foreach(GroupedVars::value_type& v, groupedVars) 00296 delete v.second; 00297 groupedVars.clear(); 00298 } 00299 foreach(unsigned short lp, localPorts) 00300 closeUDP(lp); 00301 delete writeTo; 00302 delete headerTarget; 00303 delete groupOArchiver; 00304 delete groupIArchiver; 00305 } 00306 00307 URTPLink* URTP::makeSocket() 00308 { 00309 return new URTPLink(this); 00310 } 00311 00312 void URTP::init() 00313 { 00314 UBindEventRename(URTP, onConnectEvent, "onConnect"); 00315 UBindEventRename(URTP, onErrorEvent, "onError"); 00316 UBindVars(URTP, forceType, mediaType, jitter, jitterAdaptive, jitterTime, 00317 localDeliver, forceHeader, raw); 00318 UBindVars(URTP, sourceContext, async, syncSendSocket, rawUDP); 00319 UBindCacheVar(URTP, commitDelay, ufloat); 00320 UBindCacheVar(URTP, commitTriggerVarName, std::string); 00321 async = 0; 00322 syncSendSocket = 0; 00323 rawUDP = 0; 00324 localDeliver = ""; 00325 jitter = 1; 00326 jitterAdaptive = 1; 00327 jitterTime = 500; 00328 raw = 0; 00329 commitDelay = 0.0001; 00330 commitTriggerVarName = ""; 00331 forceType = 0; 00332 forceHeader = ""; 00333 UBindFunctions(libport::Socket, getLocalPort, getRemotePort, 00334 getLocalHost, getRemoteHost, isConnected, close); 00335 UBindFunctions(URTP, sendVar, receiveVar, listen, connect, stats, reset, 00336 send, setHeaderTarget, sendUrbiscript); 00337 UBindFunctions(URTP, groupedSendVar, unGroupedSendVar, sendGrouped, 00338 groupContent); 00339 UNotifyChange(mediaType, &URTP::onTypeChange); 00340 mediaType = 96; 00341 // Not cool, but otherwise this line gets executed asynchronously in 00342 // remote mode which is not early enough. 00343 rtp_session_set_payload_type(session, 96); 00344 UNotifyChange(jitter, &URTP::onJitterChange); 00345 UNotifyChange(jitterAdaptive, &URTP::onJitterChange); 00346 UNotifyChange(jitterTime, &URTP::onJitterChange); 00347 jitter = 0; 00348 } 00349 00350 int URTP::listen(const std::string& host, const std::string& port) 00351 { 00352 GD_SINFO_DUMP("Listening on " << host <<":" << port); 00353 boost::system::error_code erc; 00354 unsigned short res = 00355 Socket::listenUDP(host, port, 00356 boost::bind(&URTP::readFrom, this, _1, _2, _3), 00357 erc, get_io_service()); 00358 if (erc) 00359 throw std::runtime_error(erc.message()); 00360 localPorts.push_back(res); 00361 return res; 00362 } 00363 00364 void URTP::readFrom(const void* data, size_t size, 00365 boost::shared_ptr<libport::UDPLink>) 00366 { 00367 onRead(data, size); 00368 } 00369 00370 void URTP::connect(const std::string& host, const std::string& port) 00371 { 00372 boost::system::error_code erc = Socket::connect(host, port, true); 00373 if (erc) 00374 throw std::runtime_error(erc.message()); 00375 #define RTP_SOCKET_CONNECTED (1 << 8) 00376 session->flags |= RTP_SOCKET_CONNECTED; 00377 } 00378 00379 void URTP::onError(boost::system::error_code erc) 00380 { 00381 onErrorEvent.emit(erc.message()); 00382 } 00383 00384 void URTP::onConnect() 00385 { 00386 onConnectEvent.emit(); 00387 } 00388 00389 void URTP::onChange(UVar& v) 00390 { 00391 send(v.val()); 00392 } 00393 00394 template<typename T> 00395 void 00396 transmitRemoteWrite(const std::string& name, const T& val, 00397 libport::utime_t timestamp) 00398 { 00399 UMessage m(*getDefaultClient()); 00400 m.tag = externalModuleTag; 00401 m.type = MESSAGE_DATA; 00402 m.value = new UValue(UList()); 00403 UList& l = *m.value->list; 00404 l.push_back(UEM_ASSIGNVALUE); 00405 l.push_back(name); 00406 l.push_back(val); 00407 l.push_back(timestamp); 00408 getDefaultClient()->notifyCallbacks(m); 00409 } 00410 00411 size_t URTP::onRead(const void* data, size_t sz) 00412 { 00413 GD_SINFO_DUMP(this << " packet of size " << sz); 00414 00415 unsigned char *payload; 00416 int payload_size; 00417 int type; 00418 mblk_t* res = 0; 00419 if (rawUDP) 00420 { 00421 payload = (unsigned char*)data; 00422 payload_size = sz; 00423 type = mediaType; 00424 if (forceType) 00425 type = forceType; 00426 } 00427 else 00428 { 00429 /* Normal operation of ortp is to call rtp_session_recvm_with_ts which will 00430 * read its socket, get a packet and pass it to rtp_session_rtp_parse. 00431 * But here we handle the socket ourselve. Fortunately the above sequence 00432 * runs fine even if the socket is -1. 00433 */ 00434 mblk_t* mp = allocb(sz, 1); 00435 memcpy(mp->b_datap->db_base, data, sz); 00436 mp->b_wptr = mp->b_datap->db_base + sz; 00437 mp->b_rptr = mp->b_datap->db_base; 00438 rtp_session_rtp_parse (session, mp, read_ts, 00439 (struct sockaddr*)0, 00440 0); 00441 res = rtp_session_recvm_with_ts(session, read_ts); 00442 if (res) 00443 read_ts++; 00444 GD_SINFO_DUMP("rtp_session_recvm_with_ts " << res); 00445 if (!res) 00446 return sz; // No data available 00447 payload_size=rtp_get_payload(mp,&payload); 00448 type = rtp_get_payload_type(mp); 00449 // Do not write if the type did not change. 00450 if (type != (int)mediaType) 00451 mediaType = type; 00452 GD_SINFO_DUMP("rtp payload type " << type); 00453 if (forceType) 00454 type = forceType; 00455 } 00456 std::string ld = localDeliver; 00457 00458 switch (type) 00459 { 00460 case URBISCRIPT: 00461 UObject::send(std::string((const char*)payload, payload_size)); 00462 break; 00463 00464 case SER_VALUES: 00465 { 00466 sIArchiver.clear(); 00467 sIArchiver.str(std::string((const char*)payload, payload_size)); 00468 GD_FINFO_DUMP("Deserializing SER_VALUES (%s, %s)", (int)(((const char*)payload)[0]), (int)(((const char*)payload)[1])); 00469 if (!groupIArchiver) 00470 groupIArchiver = new libport::serialize::BinaryISerializer(sIArchiver); 00471 try { 00472 while (!sIArchiver.eof()) 00473 { 00474 std::string name; 00475 libport::utime_t timestamp; 00476 unsigned int tlow, thi; 00477 UValue val; 00478 *groupIArchiver >> name >> tlow >> thi >> val; 00479 timestamp = tlow + ((libport::utime_t)thi << 32); 00480 if (!name.empty()) 00481 localWrite(name, val, timestamp); 00482 } 00483 } 00484 // EOF detection is buggy. We will land there every turn. 00485 catch(const libport::serialize::Exception& e) 00486 { 00487 GD_FINFO_TRACE("Serialize exception %s", e.what()); 00488 } 00489 } 00490 break; 00491 case VALUES: 00492 { 00493 // FIXME: this will not work with binaries 00494 binaries_type bd; 00495 UMessage m(*(UClient*)0, 0, "", 00496 std::string((const char*)payload, payload_size), 00497 bd); 00498 if (m.type != MESSAGE_DATA || m.value->type != DATA_LIST) 00499 { 00500 GD_WARN("Unexpected RTP message with payload type 'values'"); 00501 return sz; 00502 } 00503 foreach(UValue* v, m.value->list->array) 00504 { 00505 if (v->type != DATA_LIST) 00506 { 00507 GD_WARN("Malformed 'value' RTP message"); 00508 return sz; 00509 } 00510 if ((*v->list)[0].type == DATA_STRING) 00511 { 00512 if (v->list->size() != 2) 00513 { 00514 GD_WARN("Malformed 'value' RTP message"); 00515 return sz; 00516 } 00517 std::string name = *v->list->array[0]->stringValue; 00518 const UValue& val = *v->list->array[1]; 00519 // Transmit the value to the UObject backend directly 00520 localWrite(name, val); 00521 } 00522 else if ((*v->list)[0].type == DATA_DOUBLE && isRemoteMode()) 00523 { 00524 UMessage m(*getDefaultClient()); 00525 m.tag = externalModuleTag; 00526 m.type = MESSAGE_DATA; 00527 m.value = new UValue(*v->list); 00528 getDefaultClient()->notifyCallbacks(m); 00529 } 00530 } 00531 break; 00532 } 00533 00534 case BINARY: 00535 case RAW_BINARY: 00536 case 26: 00537 { 00538 if (!writeTo && ld.empty()) 00539 return sz; 00540 UBinary b; 00541 binaries_type bd; 00542 std::string keywords; 00543 if (type == BINARY) 00544 { 00545 // Just honor the included header. 00546 void* p = memchr(payload, '\n', payload_size); 00547 if (!p) 00548 { 00549 GD_WARN("Parse error in binary message: no newline detected"); 00550 return sz; 00551 } 00552 char* start = (char*)p + 1; 00553 size_t len = start - (char*)payload; 00554 bd.push_back(BinaryData(start, payload_size - len)); 00555 keywords = string_cast(payload_size-len) + " " 00556 + std::string((const char*)payload, len+1); 00557 } 00558 else 00559 { 00560 // Use forceheader, or session type (26=jpeg), otherwise, raw binary. 00561 std::string fh = forceHeader; 00562 keywords = string_cast(payload_size); 00563 if (!fh.empty()) 00564 keywords += " " + fh; 00565 else if (type == 26) 00566 keywords += " jpeg"; 00567 keywords += "\n"; 00568 bd.push_back(BinaryData(payload, payload_size)); 00569 } 00570 binaries_type::const_iterator beg = bd.begin(); 00571 b.parse(keywords.c_str(), 0, bd, beg, false); 00572 if (writeTo) 00573 { 00574 GD_SINFO_DUMP("writing to " << writeTo->get_name()); 00575 std::string sc = sourceContext; 00576 if (sc.empty()) 00577 *writeTo = b; 00578 else 00579 call("uobjects", "$uobject_writeFromContext", sc, writeTo->get_name(), 00580 b); 00581 } 00582 if (!ld.empty()) 00583 { 00584 // We are in the io_service thread, deliver asynchronously. 00585 // FIXME: use the client of current context instead. 00586 GD_SINFO_DUMP("Transmitting " << b.getMessage() <<" " << b.common.size); 00587 transmitRemoteWrite(ld, b, libport::utime()); 00588 } 00589 b.common.data = 0; 00590 if (res) 00591 freeb(res); 00592 } 00593 break; 00594 } 00595 return sz; 00596 } 00597 00598 void URTP::localWrite(const std::string& name, const UValue& val, 00599 libport::utime_t timestamp) 00600 { 00601 GD_FINFO_DUMP("localWrite on %s at %s", name, timestamp); 00602 if (isRemoteMode()) 00603 transmitRemoteWrite(name, val, timestamp); 00604 else 00605 { 00606 std::string sc = sourceContext; 00607 if (sc.empty()) 00608 { 00609 UVar var(name); 00610 var = val; 00611 } 00612 else 00613 call("uobjects", "$uobject_writeFromContext", sc, name, val, timestamp); 00614 } 00615 } 00616 00617 void URTP::send(const UValue& v) 00618 { 00619 GD_FINFO_TRACE("URTP::send type %s on %s", v.type, __name); 00620 // The boost::bind will make a copy of v. 00621 if (async) 00622 libport::asyncCall(boost::bind(&URTP::send_, this, v), 0, get_io_service()); 00623 else 00624 send_(v); 00625 GD_FINFO_TRACE("URTP::send finished on %s", v.type, __name); 00626 } 00627 00628 void URTP::send_(const UValue& v) 00629 { 00630 bool sync = syncSendSocket; 00631 if (!sendMode_ && sync) 00632 { 00633 sendMode_ = true; 00634 rtp_session_set_blocking_mode(session, 1); 00635 } 00636 if (sync) 00637 { 00638 #ifndef WIN32 00639 int flags = fcntl(getFD(), F_GETFL); 00640 fcntl(getFD(), F_SETFL, flags & ~O_NONBLOCK); 00641 #endif 00642 } 00643 libport::utime_t start = libport::utime(); 00644 bool craw = (int)raw; 00645 rtp_session_set_payload_type(session, craw? RAW_BINARY:BINARY); 00646 const UBinary& b = *v.binary; 00647 // Send new headers if target is defined and if it changed 00648 if (headerTarget) 00649 { 00650 std::string ht = *headerTarget; 00651 std::string m = b.getMessage(); 00652 if (m != ht) 00653 { 00654 *headerTarget = m; 00655 // If we do not wait, the UDP packet might reach the remote end before 00656 // the TCP message with the correct header. 00657 headerTarget->syncValue(); 00658 } 00659 } 00660 // We let ortp do the socket sending stuff, so give it the handle. 00661 session->rtp.socket = getFD(); 00662 int res = 0; 00663 if (craw) 00664 { 00665 if (rawUDP) 00666 { 00667 if (sync) 00668 syncWrite(b.common.data, b.common.size); 00669 else 00670 write(b.common.data, b.common.size); 00671 } 00672 else 00673 { 00674 mblk_t* p = rtp_session_create_packet(session, RTP_FIXED_HEADER_SIZE, 00675 (const unsigned char*)b.common.data, 00676 b.common.size); 00677 res = rtp_session_sendm_with_ts(session, p, write_ts++); 00678 } 00679 } 00680 else 00681 { 00682 std::string s = b.getMessage() + "\n"; 00683 char*d = new char[b.common.size + s.length()]; 00684 memcpy(d, s.c_str(), s.length()); 00685 memcpy(d+s.length(), b.common.data, b.common.size); 00686 if (rawUDP) 00687 { 00688 if (sync) 00689 syncWrite(d, b.common.size + s.length()); 00690 else 00691 write(d, b.common.size + s.length()); 00692 } 00693 else 00694 { 00695 mblk_t* p = rtp_session_create_packet(session, RTP_FIXED_HEADER_SIZE, 00696 (const unsigned char*)d, 00697 b.common.size + s.length()); 00698 res = rtp_session_sendm_with_ts(session, p, write_ts++); 00699 } 00700 delete[] d; 00701 } 00702 GD_SINFO_DUMP("wrote " << res <<" bytes"); 00703 // But reset it so that it will not attempt to call recvfrom() on our socket. 00704 session->rtp.socket = -1; 00705 if (sync) 00706 { 00707 #ifndef WIN32 00708 int flags = fcntl(getFD(), F_GETFL); 00709 fcntl(getFD(), F_SETFL, flags | O_NONBLOCK); 00710 #endif 00711 } 00712 static bool dstats = getenv("URBI_STATS"); 00713 if (dstats) 00714 { 00715 sendTime.add_sample(libport::utime() - start); 00716 if (sendTime.n_samples() == 100) 00717 { 00718 GD_FINFO_DEBUG("rtp send %s: " 00719 "mean = %s, max = %s, min = %s, variance = %s", 00720 (bool)rawUDP, 00721 sendTime.mean(), sendTime.max(), 00722 sendTime.min(), sendTime.variance()); 00723 sendTime.resize(0); //reset 00724 } 00725 } 00726 } 00727 00728 void URTP::receiveVar(UVar& v) 00729 { 00730 writeTo = new UVar(v.get_name()); 00731 writeTo->unnotify(); 00732 } 00733 00734 void URTP::setHeaderTarget(UVar& v) 00735 { 00736 headerTarget = new UVar(v.get_name()); 00737 headerTarget->unnotify(); 00738 } 00739 00740 void URTP::sendVar(UVar& v) 00741 { 00742 UNotifyChange(v, &URTP::onChange); 00743 } 00744 00745 UDictionary URTP::stats() 00746 { 00747 UDictionary res; 00748 const rtp_stats_t* stats = rtp_session_get_stats(session); 00749 if (!stats) 00750 throw std::runtime_error("No statistics returned"); 00751 res["bytesSent"] = stats->sent; 00752 res["packetSent"] = stats->packet_sent; 00753 res["dataBytesReceived"] = stats->recv; 00754 res["bytesReceived"] = stats->hw_recv; 00755 res["packetReceived"] = stats->packet_recv; 00756 res["underrun"] = stats->unavaillable; 00757 res["latePacket"] = stats->outoftime; 00758 res["packetLoss"] = stats->cum_packet_loss; 00759 res["invalid"] = stats->bad; 00760 res["overrun"] = stats->discarded; 00761 return res; 00762 } 00763 00764 void URTP::onTypeChange(UVar& t) 00765 { 00766 int type = t; 00767 rtp_session_set_payload_type(session, type); 00768 } 00769 00770 void URTP::onJitterChange(UVar&) 00771 { 00772 int j = jitter; 00773 int ja = jitterAdaptive; 00774 int jt = jitterTime; 00775 GD_SINFO_DUMP(this << " Setting jitter = " << j <<" jittertime = " << jt); 00776 rtp_session_enable_jitter_buffer(session, j? TRUE:FALSE); 00777 rtp_session_enable_adaptive_jitter_compensation(session, 00778 ja?TRUE:FALSE); 00779 rtp_session_set_jitter_compensation(session, jt); 00780 } 00781 00782 void URTP::reset() 00783 { 00784 read_ts = 0; 00785 write_ts = 0; 00786 rtp_session_reset(session); 00787 } 00788 00789 void URTP::onLogLevelChange(UVar&v) 00790 { 00791 int level = v; 00792 int el = 0; 00793 switch (level) 00794 { 00795 default: 00796 case 4: 00797 el|= ORTP_DEBUG; 00798 case 3: 00799 el |= ORTP_MESSAGE; 00800 case 2: 00801 el |= ORTP_WARNING; 00802 case 1: 00803 el |= ORTP_ERROR; 00804 case 0: 00805 break; 00806 } 00807 ortp_set_log_level_mask(el); 00808 } 00809 00810 void URTP::sendUrbiscript(const std::string& s) 00811 { 00812 rtp_session_set_payload_type(session, URBISCRIPT); 00813 mblk_t* p = rtp_session_create_packet(session, RTP_FIXED_HEADER_SIZE, 00814 (const unsigned char*)s.c_str(), 00815 s.size()); 00816 // We let ortp do the socket sending stuff, so give it the handle. 00817 session->rtp.socket = getFD(); 00818 00819 int res = rtp_session_sendm_with_ts(session, p, write_ts++); 00820 LIBPORT_USE(res); 00821 GD_SINFO_DUMP("wrote " << res <<" bytes"); 00822 // But reset it so that it will not attempt to call recvfrom() on our socket. 00823 session->rtp.socket = -1; 00824 } 00825 00826 void URTP::groupedSendVar(UVar& v) 00827 { 00828 libport::BlockLock bl(lock); 00829 // This uvar is temporary. 00830 UVar* nv = new UVar(v.get_name()); 00831 GD_SINFO_DEBUG("testuvar is " << nv << " " << nv->get_temp()); 00832 groupedVars[v.get_name()] = nv; 00833 UNotifyChange(*nv, &URTP::onGroupedChange); 00834 } 00835 00836 void URTP::unGroupedSendVar(UVar& v) 00837 { 00838 libport::BlockLock bl(lock); 00839 GD_SINFO_DEBUG("testuvar unis" << groupedVars[v.get_name()]); 00840 groupedVars[v.get_name()]->unnotify(); 00841 delete groupedVars[v.get_name()]; 00842 groupedVars.erase(v.get_name()); 00843 } 00844 00845 void URTP::onGroupedChange(UVar& v) 00846 { 00847 // Protect against loopback: If plugin mode, and remote lobby is the same 00848 // as current lobby, do not send. 00849 if (isPluginMode()) 00850 { 00851 // Get lobby we are connected to. 00852 UVar remoteLobby(__name, "lobbyId"); // lobbyId is set by makeRTPPair(). 00853 // Get current lobby. 00854 UVar currentLobby("Global", "currentLobbyId"); 00855 // Check if we are in a UVar update. 00856 UVar inUpdate("Global", "currentRunnerInUObjectUpdate"); 00857 // If in uvar update and same lobby, 'our' remote is the source: do not 00858 // transmit. 00859 if ((std::string)remoteLobby == (std::string)currentLobby 00860 && inUpdate) 00861 return; 00862 } 00863 sendGrouped(v.get_name(), v.val(), v.timestamp()); 00864 } 00865 00866 void URTP::sendGrouped(const std::string& name, const UValue& val, 00867 libport::utime_t time) 00868 { 00869 libport::BlockLock bl(lock); 00870 if (!groupOArchiver) 00871 groupOArchiver = new libport::serialize::BinaryOSerializer(sOArchiver); 00872 unsigned int tlow = (unsigned int)time; 00873 unsigned int thi = (unsigned int)(time >> 32); 00874 *groupOArchiver 00875 << name 00876 << tlow << thi 00877 << val; 00878 ufloat cd = commitDelay.data(); 00879 bool empty = groupEmpty_; 00880 groupEmpty_ = false; 00881 std::string& cn = commitTriggerVarName.data(); 00882 if (cn == name) 00883 { 00884 static bool synchronous_commit = false; 00885 if (!synchronous_commit) 00886 std::cerr <<"synchronous commitgroup" << std::endl; 00887 synchronous_commit = true; 00888 commitGroup(); 00889 } 00890 else if (cn.empty() && empty && cd>=0) 00891 libport::asyncCall(boost::bind(&URTP::commitGroup, this), 00892 libport::utime_t(cd * 1000000LL), 00893 ctx_->getIoService()); 00894 } 00895 00896 void URTP::commitGroup() 00897 { 00898 libport::utime_t start = libport::utime(); 00899 libport::BlockLock bl(lock); 00900 std::string s = sOArchiver.str(); 00901 sOArchiver.clear(); 00902 sOArchiver.str(""); 00903 groupEmpty_ = true; 00904 if (rawUDP) 00905 { 00906 write(s.c_str(), s.length()); 00907 } 00908 else 00909 { 00910 rtp_session_set_payload_type(session, SER_VALUES); 00911 mblk_t* p = rtp_session_create_packet(session, RTP_FIXED_HEADER_SIZE, 00912 (const unsigned char*)s.c_str(), 00913 s.size()); 00914 // We let ortp do the socket sending stuff, so give it the handle. 00915 session->rtp.socket = getFD(); 00916 00917 int res = rtp_session_sendm_with_ts(session, p, write_ts++); 00918 LIBPORT_USE(res); 00919 GD_SINFO_DUMP("wrote " << res <<" bytes"); 00920 // But reset it so that it will not attempt to call recvfrom() on 00921 // our socket. 00922 session->rtp.socket = -1; 00923 } 00924 static bool dstats = getenv("URBI_STATS"); 00925 if (dstats) 00926 { 00927 sendTime.add_sample(libport::utime() - start); 00928 if (sendTime.n_samples() == 500) 00929 { 00930 GD_FINFO_DEBUG("rtp group send: " 00931 "mean = %s, max = %s, min = %s, variance = %s", 00932 sendTime.mean(), sendTime.max(), 00933 sendTime.min(), sendTime.variance()); 00934 sendTime.resize(0); //reset 00935 } 00936 } 00937 } 00938 00939 00940 std::vector<std::string> 00941 URTP::groupContent() 00942 { 00943 std::vector<std::string> res; 00944 foreach(const GroupedVars::value_type& v, groupedVars) 00945 res.push_back(v.second->get_name()); 00946 return res; 00947 }