Urbi SDK Remote for C++  2.7.5
rtp.cc
Go to the documentation of this file.
00001 /*
00002  * Copyright (C) 2010-2011, Gostai S.A.S.
00003  *
00004  * This software is provided "as is" without warranty of any kind,
00005  * either expressed or implied, including but not limited to the
00006  * implied warranties of fitness for a particular purpose.
00007  *
00008  * See the LICENSE file for more information.
00009  */
00010 
00011 // Ortp needs a little help to know it's compiling on WIN32.
00012 
00013 // ORTP defines its own version of int32_t and all, so disable ours.
00014 #define LIBPORT_NO_CSTDINT_TYPES
00015 
00016 #include <libport/config.h>
00017 #include <libport/detect-win32.h>
00018 
00019 #include <libport/lexical-cast.hh>
00020 #include <libport/statistics.hh>
00021 #include <libport/sys/socket.h>
00022 #include <libport/system-warning-push.hh>
00023 #include <ortp/ortp.h>
00024 #include <libport/system-warning-pop.hh>
00025 
00026 #include <boost/unordered_map.hpp>
00027 
00028 #include <libport/asio.hh>
00029 #include <libport/debug.hh>
00030 
00031 #include <serialize/serialize.hh>
00032 
00033 #include <urbi/socket.hh>
00034 #include <urbi/uclient.hh>
00035 #include <urbi/uexternal.hh>
00036 #include <urbi/uobject.hh>
00037 
00038 #include <urbi/uvalue-serialize.hh>
00039 
00040 #include <urbi/customuvar.hh>
00041 
00042 using namespace urbi;
00043 
00044 GD_CATEGORY(URTP);
00045 extern "C"
00046 {
00047   // We are using our own network backend, which requires using a bit of the
00048   // internal ortp API.
00049   void rtp_session_rtp_parse(RtpSession *session, mblk_t *mp,
00050                              uint32_t local_str_ts,
00051                              struct sockaddr *addr, socklen_t addrlen);
00052   void rtp_session_rtcp_parse(RtpSession *session, mblk_t *mp);
00053 }
00054 
00055 class URTPLink;
00056 
00063 class URTP: public UObject, public UObjectSocket
00064 {
00065 public:
00066   URTP(const std::string& n);
00067   ~URTP();
00068   void init();
00069 
00073   void sendVar(UVar& v);
00075   void receiveVar(UVar& v);
00077 
00079   int listen(const std::string& host, const std::string& port);
00081   void connect(const std::string& host, const std::string& port);
00083   UDictionary stats();
00085   void reset();
00087   void send(const UValue&);
00089   void sendUrbiscript(const std::string& code);
00090 
00092 
00094 
00098   void groupedSendVar(UVar& v);
00100   void unGroupedSendVar(UVar& v);
00102   void sendGrouped(const std::string& name, const UValue& v,
00103                    libport::utime_t timestamp);
00104 
00108   CustomUVar<ufloat> commitDelay;
00109 
00111   CustomUVar<std::string> commitTriggerVarName;
00112 
00114 
00115   UEvent onConnectEvent, onErrorEvent;
00117   UVar forceType;
00119   UVar mediaType;
00121   UVar raw;
00123   UVar jitter, jitterAdaptive, jitterTime;
00125   UVar sourceContext;
00130   UVar forceHeader;
00132   enum MediaTypes {
00133     RAW_BINARY = 100, //< Binary without keywords
00134     BINARY     = 101, //< Keywords then binary
00135     URBISCRIPT = 102, //< urbiscript to execute
00137     VALUES     = 103,
00138     SER_VALUES = 104, //< Serialized stream of name value timestamp...
00139   };
00141   void setHeaderTarget(UVar& v);
00143   UVar async;
00145   UVar syncSendSocket;
00147   UVar rawUDP;
00149   std::vector<std::string> groupContent();
00150   virtual void onError(boost::system::error_code erc);
00151   virtual void onConnect();
00152   virtual size_t onRead(const void*, size_t);
00153   void readFrom(const void*, size_t,  boost::shared_ptr<libport::UDPLink>);
00154   void onChange(UVar& v);
00155   void onGroupedChange(UVar& v);
00156   void onTypeChange(UVar& v);
00157   void onJitterChange(UVar&);
00158   void onLogLevelChange(UVar& v);
00159   void commitGroup();
00160   void send_(const UValue&);
00161   void localWrite(const std::string& name, const UValue& val,
00162                   libport::utime_t timestamp = 0);
00163   URTPLink* makeSocket();
00164   // Ctor code, size matters
00165   void born();
00166   // Dtor code, size matters
00167   void die();
00168   RtpSession* session;
00169   size_t read_ts;
00170   size_t write_ts;
00171   UVar* writeTo;
00172   UVar* headerTarget;
00173   // Name of the uvar to deliver to locally.
00174   UVar localDeliver;
00175   UVar logLevel;
00176   libport::Lockable lock; // group protection
00177   libport::serialize::BinaryOSerializer* groupOArchiver;
00178   std::ostringstream sOArchiver;
00179   libport::serialize::BinaryISerializer* groupIArchiver;
00180   std::istringstream sIArchiver;
00181   // True if group is empty.
00182   bool groupEmpty_;
00183   typedef boost::unordered_map<std::string, UVar*> GroupedVars;
00184   GroupedVars groupedVars;
00185   friend class URTPLink;
00186   libport::Statistics<libport::utime_t> sendTime;
00187   bool sendMode_; // will this socket be used for synchronous sending.
00188   std::vector<unsigned short> localPorts;
00189 };
00190 
00191 class URTPLink: public libport::Socket
00192 {
00193 public:
00194   URTPLink(URTP* owner);
00195   virtual size_t onRead(const void*, size_t);
00196   URTP* owner_;
00197 };
00198 
00199 URTPLink::URTPLink(URTP* owner)
00200  : libport::Socket(owner->get_io_service())
00201  , owner_(owner)
00202 {
00203 }
00204 
00205 size_t URTPLink::onRead(const void* data, size_t size)
00206 {
00207   return owner_->onRead(data, size);
00208 }
00209 
00210 static
00211 std::string
00212 rtp_id()
00213 {
00214   return libport::format("URTP_%s_%s", getFilteredHostname(),
00215           // Under uclibc, each thread has a different pid.
00216 #ifdef __UCLIBC__
00217    "default"
00218 #else
00219    getpid()
00220 #endif
00221    );
00222 }
00223 
00224 ::urbi::URBIStarter<URTP>
00225 starter_URTP(urbi::isPluginMode() ? "URTP" : rtp_id());
00226 
00227 static void bounceSend(UObject*o, const UValue& v)
00228 {
00229   URTP* r = reinterpret_cast<URTP*>(o);
00230   r->send(v);
00231 }
00232 
00233 static void bounceSendGrouped(UObject*o, const std::string& n, const UValue& v,
00234                        libport::utime_t ts)
00235 {
00236   URTP* r = reinterpret_cast<URTP*>(o);
00237   r->sendGrouped(n, v, ts);
00238 }
00239 
00240 URTP::URTP(const std::string& n)
00241  : UObject(n)
00242  , UObjectSocket(getCurrentContext()->getIoService())
00243  , session(0)
00244  , read_ts(1)
00245  , write_ts(1)
00246  , writeTo(0)
00247  , headerTarget(0)
00248  , groupOArchiver(0)
00249  , groupIArchiver(0)
00250  , groupEmpty_(true)
00251  , sendMode_(false) // we don't know yet
00252 {
00253   born();
00254 }
00255 
00256 void URTP::born()
00257 {
00258   // Register us to contextimpl hooks
00259   ctx_->rtpSendGrouped = &bounceSendGrouped;
00260   ctx_->rtpSend = &bounceSend;
00261   GD_FINFO_DUMP("URTP::URTP on %s", this);
00262   UBindFunction(URTP, init);
00263   static bool ortpInit = false;
00264   if (!ortpInit)
00265   {
00266     ortp_init();
00267     ortp_scheduler_init();
00268     ortp_set_log_level_mask(/*ORTP_DEBUG|ORTP_MESSAGE|*/ORTP_WARNING|ORTP_ERROR);
00269     ortpInit = true;
00270     UObject::send("var _rtp_object_name = \"" + __name + "\"|");
00271   }
00272   UBindVar(URTP, logLevel);
00273   UNotifyChange(logLevel, &URTP::onLogLevelChange);
00274   session=rtp_session_new(RTP_SESSION_SENDRECV);
00275   rtp_session_set_scheduling_mode(session,0);
00276   // Should we block for sending?
00277   rtp_session_set_blocking_mode(session,0);
00278   rtp_session_set_symmetric_rtp(session,TRUE);
00279   rtp_session_enable_adaptive_jitter_compensation(session, FALSE);
00280   rtp_session_set_jitter_compensation(session, 500);
00281   rtp_session_set_payload_type(session,0);
00282 }
00283 
00284 URTP::~URTP()
00285 {
00286   die();
00287 }
00288 
00289 void URTP::die()
00290 {
00291   GD_FINFO_DUMP("URTP::~URTP on %s", this);
00292   close();
00293   libport::BlockLock bl(lock);
00294   {
00295     foreach(GroupedVars::value_type& v, groupedVars)
00296       delete v.second;
00297     groupedVars.clear();
00298   }
00299   foreach(unsigned short lp, localPorts)
00300     closeUDP(lp);
00301   delete writeTo;
00302   delete headerTarget;
00303   delete groupOArchiver;
00304   delete groupIArchiver;
00305 }
00306 
00307 URTPLink* URTP::makeSocket()
00308 {
00309   return new URTPLink(this);
00310 }
00311 
00312 void URTP::init()
00313 {
00314   UBindEventRename(URTP, onConnectEvent, "onConnect");
00315   UBindEventRename(URTP, onErrorEvent, "onError");
00316   UBindVars(URTP, forceType, mediaType, jitter, jitterAdaptive, jitterTime,
00317             localDeliver, forceHeader, raw);
00318   UBindVars(URTP, sourceContext, async, syncSendSocket, rawUDP);
00319   UBindCacheVar(URTP, commitDelay, ufloat);
00320   UBindCacheVar(URTP, commitTriggerVarName, std::string);
00321   async = 0;
00322   syncSendSocket = 0;
00323   rawUDP = 0;
00324   localDeliver = "";
00325   jitter = 1;
00326   jitterAdaptive = 1;
00327   jitterTime = 500;
00328   raw = 0;
00329   commitDelay = 0.0001;
00330   commitTriggerVarName = "";
00331   forceType = 0;
00332   forceHeader = "";
00333   UBindFunctions(libport::Socket, getLocalPort, getRemotePort,
00334                  getLocalHost, getRemoteHost, isConnected, close);
00335   UBindFunctions(URTP, sendVar, receiveVar, listen, connect, stats, reset,
00336                  send, setHeaderTarget, sendUrbiscript);
00337   UBindFunctions(URTP, groupedSendVar, unGroupedSendVar, sendGrouped,
00338                  groupContent);
00339   UNotifyChange(mediaType, &URTP::onTypeChange);
00340   mediaType = 96;
00341   // Not cool, but otherwise this line gets executed asynchronously in
00342   // remote mode which is not early enough.
00343   rtp_session_set_payload_type(session, 96);
00344   UNotifyChange(jitter, &URTP::onJitterChange);
00345   UNotifyChange(jitterAdaptive, &URTP::onJitterChange);
00346   UNotifyChange(jitterTime, &URTP::onJitterChange);
00347   jitter = 0;
00348 }
00349 
00350 int URTP::listen(const std::string& host, const std::string& port)
00351 {
00352   GD_SINFO_DUMP("Listening on " << host <<":" << port);
00353   boost::system::error_code erc;
00354   unsigned short res =
00355     Socket::listenUDP(host, port,
00356                       boost::bind(&URTP::readFrom, this, _1, _2, _3),
00357                       erc, get_io_service());
00358   if (erc)
00359     throw std::runtime_error(erc.message());
00360   localPorts.push_back(res);
00361   return res;
00362 }
00363 
00364 void URTP::readFrom(const void* data, size_t size,
00365                     boost::shared_ptr<libport::UDPLink>)
00366 {
00367   onRead(data, size);
00368 }
00369 
00370 void URTP::connect(const std::string& host, const std::string& port)
00371 {
00372   boost::system::error_code erc = Socket::connect(host, port, true);
00373   if (erc)
00374     throw std::runtime_error(erc.message());
00375 #define RTP_SOCKET_CONNECTED (1 << 8)
00376   session->flags |= RTP_SOCKET_CONNECTED;
00377 }
00378 
00379 void URTP::onError(boost::system::error_code erc)
00380 {
00381   onErrorEvent.emit(erc.message());
00382 }
00383 
00384 void URTP::onConnect()
00385 {
00386   onConnectEvent.emit();
00387 }
00388 
00389 void URTP::onChange(UVar& v)
00390 {
00391   send(v.val());
00392 }
00393 
00394 template<typename T>
00395 void
00396 transmitRemoteWrite(const std::string& name, const T& val,
00397                     libport::utime_t timestamp)
00398 {
00399   UMessage m(*getDefaultClient());
00400   m.tag = externalModuleTag;
00401   m.type = MESSAGE_DATA;
00402   m.value = new UValue(UList());
00403   UList& l = *m.value->list;
00404   l.push_back(UEM_ASSIGNVALUE);
00405   l.push_back(name);
00406   l.push_back(val);
00407   l.push_back(timestamp);
00408   getDefaultClient()->notifyCallbacks(m);
00409 }
00410 
00411 size_t URTP::onRead(const void* data, size_t sz)
00412 {
00413   GD_SINFO_DUMP(this << " packet of size " << sz);
00414 
00415   unsigned char *payload;
00416   int payload_size;
00417   int type;
00418   mblk_t* res = 0;
00419   if (rawUDP)
00420   {
00421     payload = (unsigned char*)data;
00422     payload_size = sz;
00423     type = mediaType;
00424     if (forceType)
00425       type = forceType;
00426   }
00427   else
00428   {
00429     /* Normal operation of ortp is to call rtp_session_recvm_with_ts which will
00430     * read its socket, get a packet and pass it to rtp_session_rtp_parse.
00431     * But here we handle the socket ourselve. Fortunately the above sequence
00432     * runs fine even if the socket is -1.
00433     */
00434     mblk_t* mp = allocb(sz, 1);
00435     memcpy(mp->b_datap->db_base, data, sz);
00436     mp->b_wptr = mp->b_datap->db_base + sz;
00437     mp->b_rptr = mp->b_datap->db_base;
00438     rtp_session_rtp_parse (session, mp, read_ts,
00439                            (struct sockaddr*)0,
00440                            0);
00441     res = rtp_session_recvm_with_ts(session, read_ts);
00442     if (res)
00443       read_ts++;
00444     GD_SINFO_DUMP("rtp_session_recvm_with_ts " << res);
00445     if (!res)
00446       return sz; // No data available
00447     payload_size=rtp_get_payload(mp,&payload);
00448     type = rtp_get_payload_type(mp);
00449     // Do not write if the type did not change.
00450     if (type != (int)mediaType)
00451       mediaType = type;
00452     GD_SINFO_DUMP("rtp payload type " << type);
00453     if (forceType)
00454       type = forceType;
00455   }
00456   std::string ld = localDeliver;
00457 
00458   switch (type)
00459   {
00460   case URBISCRIPT:
00461     UObject::send(std::string((const char*)payload, payload_size));
00462     break;
00463 
00464   case SER_VALUES:
00465   {
00466     sIArchiver.clear();
00467     sIArchiver.str(std::string((const char*)payload, payload_size));
00468     GD_FINFO_DUMP("Deserializing SER_VALUES (%s, %s)", (int)(((const char*)payload)[0]), (int)(((const char*)payload)[1]));
00469     if (!groupIArchiver)
00470       groupIArchiver = new libport::serialize::BinaryISerializer(sIArchiver);
00471     try {
00472     while (!sIArchiver.eof())
00473     {
00474       std::string name;
00475       libport::utime_t timestamp;
00476       unsigned int tlow, thi;
00477       UValue val;
00478       *groupIArchiver >> name >> tlow >> thi >> val;
00479       timestamp = tlow + ((libport::utime_t)thi << 32);
00480       if (!name.empty())
00481         localWrite(name, val, timestamp);
00482     }
00483     }
00484     // EOF detection is buggy. We will land there every turn.
00485     catch(const libport::serialize::Exception& e)
00486     {
00487       GD_FINFO_TRACE("Serialize exception %s", e.what());
00488     }
00489   }
00490   break;
00491   case VALUES:
00492   {
00493     // FIXME: this will not work with binaries
00494     binaries_type bd;
00495     UMessage m(*(UClient*)0, 0, "",
00496                std::string((const char*)payload, payload_size),
00497                bd);
00498     if (m.type != MESSAGE_DATA || m.value->type != DATA_LIST)
00499     {
00500       GD_WARN("Unexpected RTP message with payload type 'values'");
00501       return sz;
00502     }
00503     foreach(UValue* v, m.value->list->array)
00504     {
00505       if (v->type != DATA_LIST)
00506       {
00507         GD_WARN("Malformed 'value' RTP message");
00508         return sz;
00509       }
00510       if ((*v->list)[0].type == DATA_STRING)
00511       {
00512         if (v->list->size() != 2)
00513         {
00514           GD_WARN("Malformed 'value' RTP message");
00515           return sz;
00516         }
00517         std::string name = *v->list->array[0]->stringValue;
00518         const UValue& val  = *v->list->array[1];
00519         // Transmit the value to the UObject backend directly
00520         localWrite(name, val);
00521       }
00522       else if ((*v->list)[0].type == DATA_DOUBLE && isRemoteMode())
00523       {
00524         UMessage m(*getDefaultClient());
00525         m.tag = externalModuleTag;
00526         m.type = MESSAGE_DATA;
00527         m.value = new UValue(*v->list);
00528         getDefaultClient()->notifyCallbacks(m);
00529       }
00530     }
00531     break;
00532   }
00533 
00534   case BINARY:
00535   case RAW_BINARY:
00536   case 26:
00537   {
00538     if (!writeTo && ld.empty())
00539       return sz;
00540     UBinary b;
00541     binaries_type bd;
00542     std::string keywords;
00543     if (type == BINARY)
00544     {
00545       // Just honor the included header.
00546       void* p = memchr(payload, '\n', payload_size);
00547       if (!p)
00548       {
00549         GD_WARN("Parse error in binary message: no newline detected");
00550         return sz;
00551       }
00552       char* start = (char*)p + 1;
00553       size_t len = start - (char*)payload;
00554       bd.push_back(BinaryData(start, payload_size - len));
00555       keywords = string_cast(payload_size-len) + " "
00556         + std::string((const char*)payload, len+1);
00557     }
00558     else
00559     {
00560       // Use forceheader, or session type (26=jpeg), otherwise, raw binary.
00561       std::string fh = forceHeader;
00562       keywords = string_cast(payload_size);
00563       if (!fh.empty())
00564         keywords += " " + fh;
00565       else if (type == 26)
00566         keywords += " jpeg";
00567       keywords += "\n";
00568       bd.push_back(BinaryData(payload, payload_size));
00569     }
00570     binaries_type::const_iterator beg = bd.begin();
00571     b.parse(keywords.c_str(), 0, bd, beg, false);
00572     if (writeTo)
00573     {
00574       GD_SINFO_DUMP("writing to " << writeTo->get_name());
00575       std::string sc = sourceContext;
00576       if (sc.empty())
00577         *writeTo = b;
00578       else
00579         call("uobjects", "$uobject_writeFromContext", sc, writeTo->get_name(),
00580              b);
00581     }
00582     if (!ld.empty())
00583     {
00584       // We are in the io_service thread, deliver asynchronously.
00585       // FIXME: use the client of current context instead.
00586       GD_SINFO_DUMP("Transmitting " << b.getMessage() <<" " << b.common.size);
00587       transmitRemoteWrite(ld, b, libport::utime());
00588     }
00589     b.common.data = 0;
00590     if (res)
00591       freeb(res);
00592   }
00593   break;
00594   }
00595   return sz;
00596 }
00597 
00598 void URTP::localWrite(const std::string& name, const UValue& val,
00599                       libport::utime_t timestamp)
00600 {
00601   GD_FINFO_DUMP("localWrite on %s at %s", name, timestamp);
00602   if (isRemoteMode())
00603     transmitRemoteWrite(name, val, timestamp);
00604   else
00605   {
00606     std::string sc = sourceContext;
00607     if (sc.empty())
00608     {
00609       UVar var(name);
00610       var = val;
00611     }
00612     else
00613       call("uobjects", "$uobject_writeFromContext", sc, name, val, timestamp);
00614   }
00615 }
00616 
00617 void URTP::send(const UValue& v)
00618 {
00619   GD_FINFO_TRACE("URTP::send type %s on %s", v.type, __name);
00620   // The boost::bind will make a copy of v.
00621   if (async)
00622     libport::asyncCall(boost::bind(&URTP::send_, this, v), 0, get_io_service());
00623   else
00624     send_(v);
00625   GD_FINFO_TRACE("URTP::send finished on %s", v.type, __name);
00626 }
00627 
00628 void URTP::send_(const UValue& v)
00629 {
00630   bool sync = syncSendSocket;
00631   if (!sendMode_ && sync)
00632   {
00633     sendMode_ = true;
00634     rtp_session_set_blocking_mode(session, 1);
00635   }
00636   if (sync)
00637   {
00638 #ifndef WIN32
00639     int flags = fcntl(getFD(), F_GETFL);
00640     fcntl(getFD(), F_SETFL, flags & ~O_NONBLOCK);
00641 #endif
00642   }
00643   libport::utime_t start = libport::utime();
00644   bool craw = (int)raw;
00645   rtp_session_set_payload_type(session, craw? RAW_BINARY:BINARY);
00646   const UBinary& b = *v.binary;
00647   // Send new headers if target is defined and if it changed
00648   if (headerTarget)
00649   {
00650     std::string ht = *headerTarget;
00651     std::string m = b.getMessage();
00652     if (m != ht)
00653     {
00654       *headerTarget = m;
00655       // If we do not wait, the UDP packet might reach the remote end before
00656       // the TCP message with the correct header.
00657       headerTarget->syncValue();
00658     }
00659   }
00660   // We let ortp do the socket sending stuff, so give it the handle.
00661   session->rtp.socket = getFD();
00662   int res = 0;
00663   if (craw)
00664   {
00665     if (rawUDP)
00666     {
00667       if (sync)
00668         syncWrite(b.common.data, b.common.size);
00669       else
00670         write(b.common.data, b.common.size);
00671     }
00672     else
00673     {
00674       mblk_t* p = rtp_session_create_packet(session, RTP_FIXED_HEADER_SIZE,
00675                                             (const unsigned char*)b.common.data,
00676                                             b.common.size);
00677       res = rtp_session_sendm_with_ts(session, p, write_ts++);
00678     }
00679   }
00680   else
00681   {
00682     std::string s = b.getMessage() + "\n";
00683     char*d = new char[b.common.size + s.length()];
00684     memcpy(d, s.c_str(), s.length());
00685     memcpy(d+s.length(), b.common.data, b.common.size);
00686     if (rawUDP)
00687     {
00688      if (sync)
00689        syncWrite(d, b.common.size + s.length());
00690      else
00691        write(d, b.common.size + s.length());
00692     }
00693     else
00694     {
00695       mblk_t* p = rtp_session_create_packet(session, RTP_FIXED_HEADER_SIZE,
00696                                             (const unsigned char*)d,
00697                                             b.common.size + s.length());
00698       res = rtp_session_sendm_with_ts(session, p, write_ts++);
00699     }
00700     delete[] d;
00701   }
00702   GD_SINFO_DUMP("wrote " << res <<" bytes");
00703   // But reset it so that it will not attempt to call recvfrom() on our socket.
00704   session->rtp.socket = -1;
00705   if (sync)
00706   {
00707 #ifndef WIN32
00708     int flags = fcntl(getFD(), F_GETFL);
00709     fcntl(getFD(), F_SETFL, flags | O_NONBLOCK);
00710 #endif
00711   }
00712   static bool dstats = getenv("URBI_STATS");
00713   if (dstats)
00714   {
00715     sendTime.add_sample(libport::utime() - start);
00716     if (sendTime.n_samples() == 100)
00717     {
00718       GD_FINFO_DEBUG("rtp send %s: "
00719                      "mean = %s, max = %s, min = %s, variance = %s",
00720                      (bool)rawUDP,
00721                      sendTime.mean(), sendTime.max(),
00722                      sendTime.min(), sendTime.variance());
00723       sendTime.resize(0); //reset
00724     }
00725   }
00726 }
00727 
00728 void URTP::receiveVar(UVar& v)
00729 {
00730   writeTo = new UVar(v.get_name());
00731   writeTo->unnotify();
00732 }
00733 
00734 void URTP::setHeaderTarget(UVar& v)
00735 {
00736   headerTarget = new UVar(v.get_name());
00737   headerTarget->unnotify();
00738 }
00739 
00740 void URTP::sendVar(UVar& v)
00741 {
00742   UNotifyChange(v, &URTP::onChange);
00743 }
00744 
00745 UDictionary URTP::stats()
00746 {
00747   UDictionary res;
00748   const rtp_stats_t* stats = rtp_session_get_stats(session);
00749   if (!stats)
00750     throw std::runtime_error("No statistics returned");
00751   res["bytesSent"] = stats->sent;
00752   res["packetSent"] = stats->packet_sent;
00753   res["dataBytesReceived"] = stats->recv;
00754   res["bytesReceived"] = stats->hw_recv;
00755   res["packetReceived"] = stats->packet_recv;
00756   res["underrun"] = stats->unavaillable;
00757   res["latePacket"] = stats->outoftime;
00758   res["packetLoss"] = stats->cum_packet_loss;
00759   res["invalid"] = stats->bad;
00760   res["overrun"] = stats->discarded;
00761   return res;
00762 }
00763 
00764 void URTP::onTypeChange(UVar& t)
00765 {
00766   int type = t;
00767   rtp_session_set_payload_type(session, type);
00768 }
00769 
00770 void URTP::onJitterChange(UVar&)
00771 {
00772   int j = jitter;
00773   int ja = jitterAdaptive;
00774   int jt = jitterTime;
00775   GD_SINFO_DUMP(this << " Setting jitter = " << j <<"  jittertime = " << jt);
00776   rtp_session_enable_jitter_buffer(session, j? TRUE:FALSE);
00777   rtp_session_enable_adaptive_jitter_compensation(session,
00778                                                   ja?TRUE:FALSE);
00779   rtp_session_set_jitter_compensation(session, jt);
00780 }
00781 
00782 void URTP::reset()
00783 {
00784   read_ts = 0;
00785   write_ts = 0;
00786   rtp_session_reset(session);
00787 }
00788 
00789 void URTP::onLogLevelChange(UVar&v)
00790 {
00791   int level = v;
00792   int el = 0;
00793   switch (level)
00794   {
00795   default:
00796   case 4:
00797     el|= ORTP_DEBUG;
00798   case 3:
00799     el |= ORTP_MESSAGE;
00800   case 2:
00801     el |= ORTP_WARNING;
00802   case 1:
00803     el |= ORTP_ERROR;
00804   case 0:
00805     break;
00806   }
00807   ortp_set_log_level_mask(el);
00808 }
00809 
00810 void URTP::sendUrbiscript(const std::string& s)
00811 {
00812   rtp_session_set_payload_type(session, URBISCRIPT);
00813   mblk_t* p = rtp_session_create_packet(session, RTP_FIXED_HEADER_SIZE,
00814                                         (const unsigned char*)s.c_str(),
00815                                         s.size());
00816   // We let ortp do the socket sending stuff, so give it the handle.
00817   session->rtp.socket = getFD();
00818 
00819   int res = rtp_session_sendm_with_ts(session, p, write_ts++);
00820   LIBPORT_USE(res);
00821   GD_SINFO_DUMP("wrote " << res <<" bytes");
00822   // But reset it so that it will not attempt to call recvfrom() on our socket.
00823   session->rtp.socket = -1;
00824 }
00825 
00826 void URTP::groupedSendVar(UVar& v)
00827 {
00828   libport::BlockLock bl(lock);
00829   // This uvar is temporary.
00830   UVar* nv = new UVar(v.get_name());
00831   GD_SINFO_DEBUG("testuvar is " << nv << " " << nv->get_temp());
00832   groupedVars[v.get_name()] = nv;
00833   UNotifyChange(*nv, &URTP::onGroupedChange);
00834 }
00835 
00836 void URTP::unGroupedSendVar(UVar& v)
00837 {
00838   libport::BlockLock bl(lock);
00839   GD_SINFO_DEBUG("testuvar unis" << groupedVars[v.get_name()]);
00840   groupedVars[v.get_name()]->unnotify();
00841   delete groupedVars[v.get_name()];
00842   groupedVars.erase(v.get_name());
00843 }
00844 
00845 void URTP::onGroupedChange(UVar& v)
00846 {
00847   // Protect against loopback: If plugin mode, and remote lobby is the same
00848   // as current lobby, do not send.
00849   if (isPluginMode())
00850   {
00851     // Get lobby we are connected to.
00852     UVar remoteLobby(__name, "lobbyId"); // lobbyId is set by makeRTPPair().
00853     // Get current lobby.
00854     UVar currentLobby("Global", "currentLobbyId");
00855     // Check if we are in a UVar update.
00856     UVar inUpdate("Global", "currentRunnerInUObjectUpdate");
00857     // If in uvar update and same lobby, 'our' remote is the source: do not
00858     // transmit.
00859     if ((std::string)remoteLobby == (std::string)currentLobby
00860         && inUpdate)
00861       return;
00862   }
00863   sendGrouped(v.get_name(), v.val(), v.timestamp());
00864 }
00865 
00866 void URTP::sendGrouped(const std::string& name, const UValue& val,
00867                        libport::utime_t time)
00868 {
00869   libport::BlockLock bl(lock);
00870   if (!groupOArchiver)
00871     groupOArchiver = new libport::serialize::BinaryOSerializer(sOArchiver);
00872   unsigned int tlow = (unsigned int)time;
00873   unsigned int thi = (unsigned int)(time >> 32);
00874   *groupOArchiver
00875     << name
00876     << tlow << thi
00877     << val;
00878   ufloat cd = commitDelay.data();
00879   bool empty = groupEmpty_;
00880   groupEmpty_ = false;
00881   std::string& cn = commitTriggerVarName.data();
00882   if (cn == name)
00883   {
00884     static bool synchronous_commit = false;
00885     if (!synchronous_commit)
00886       std::cerr <<"synchronous commitgroup" << std::endl;
00887     synchronous_commit = true;
00888     commitGroup();
00889   }
00890   else if (cn.empty() && empty && cd>=0)
00891     libport::asyncCall(boost::bind(&URTP::commitGroup, this),
00892                        libport::utime_t(cd * 1000000LL),
00893                        ctx_->getIoService());
00894 }
00895 
00896 void URTP::commitGroup()
00897 {
00898   libport::utime_t start = libport::utime();
00899   libport::BlockLock bl(lock);
00900   std::string s = sOArchiver.str();
00901   sOArchiver.clear();
00902   sOArchiver.str("");
00903   groupEmpty_ = true;
00904   if (rawUDP)
00905   {
00906     write(s.c_str(), s.length());
00907   }
00908   else
00909   {
00910     rtp_session_set_payload_type(session, SER_VALUES);
00911     mblk_t* p = rtp_session_create_packet(session, RTP_FIXED_HEADER_SIZE,
00912                                           (const unsigned char*)s.c_str(),
00913                                           s.size());
00914     // We let ortp do the socket sending stuff, so give it the handle.
00915     session->rtp.socket = getFD();
00916 
00917     int res = rtp_session_sendm_with_ts(session, p, write_ts++);
00918     LIBPORT_USE(res);
00919     GD_SINFO_DUMP("wrote " << res <<" bytes");
00920     // But reset it so that it will not attempt to call recvfrom() on
00921     // our socket.
00922     session->rtp.socket = -1;
00923   }
00924   static bool dstats = getenv("URBI_STATS");
00925   if (dstats)
00926   {
00927     sendTime.add_sample(libport::utime() - start);
00928     if (sendTime.n_samples() == 500)
00929     {
00930       GD_FINFO_DEBUG("rtp group send: "
00931                      "mean = %s, max = %s, min = %s, variance = %s",
00932                      sendTime.mean(), sendTime.max(),
00933                      sendTime.min(), sendTime.variance());
00934       sendTime.resize(0); //reset
00935     }
00936   }
00937 }
00938 
00939 
00940 std::vector<std::string>
00941 URTP::groupContent()
00942 {
00943   std::vector<std::string> res;
00944   foreach(const GroupedVars::value_type& v, groupedVars)
00945     res.push_back(v.second->get_name());
00946   return res;
00947 }