Urbi SDK Remote for C++  2.7.5
uvar.cc
Go to the documentation of this file.
00001 /*
00002  * Copyright (C) 2005-2011, Gostai S.A.S.
00003  *
00004  * This software is provided "as is" without warranty of any kind,
00005  * either expressed or implied, including but not limited to the
00006  * implied warranties of fitness for a particular purpose.
00007  *
00008  * See the LICENSE file for more information.
00009  */
00010 
00012 
00013 #include <libport/format.hh>
00014 
00015 #include <libport/containers.hh>
00016 #include <libport/debug.hh>
00017 #include <libport/escape.hh>
00018 #include <libport/lexical-cast.hh>
00019 
00020 #include <urbi/uabstractclient.hh>
00021 #include <urbi/ublend-type.hh>
00022 #include <urbi/uexternal.hh>
00023 #include <urbi/umessage.hh>
00024 #include <urbi/uobject.hh>
00025 #include <urbi/usyncclient.hh>
00026 #include <urbi/uvalue-serialize.hh>
00027 
00028 #include <liburbi/compatibility.hh>
00029 
00030 #include <libuobject/remote-ucontext-impl.hh>
00031 
00032 namespace urbi
00033 {
00034   namespace impl
00035   {
00036 
00037   GD_CATEGORY(Urbi.LibUObject);
00038 
00040   void
00041   RemoteUVarImpl::initialize(UVar* owner)
00042   {
00043     GD_FINFO_TRACE("RemoteUVarImpl::initialize %s %s", owner->get_name(),
00044                    this);
00045     owner_ = owner;
00046     bypass_ = false;
00047     RemoteUContextImpl* ctx = static_cast<RemoteUContextImpl*>(owner_->ctx_);
00048     client_ = ctx->backend_;
00049     LockableOstream* outputStream = ctx->outputStream;
00050     std::string name = owner_->get_name();
00051     {
00052       libport::BlockLock bl(ctx->tableLock);
00053       UVarTable::callbacks_type& ct = ctx->varmap()[name];
00054       bool first = ct.empty();
00055       ct.push_back(owner_);
00056       if (first)
00057       {
00058         value_ = new UValue();
00059         timestamp_ = new time_t;
00060       }
00061       else
00062       {
00063         RemoteUVarImpl* impl = static_cast<RemoteUVarImpl*>(ct.front()->impl_);
00064         value_ = impl->value_;
00065         timestamp_ = impl->timestamp_;
00066       }
00067     }
00068     URBI_SEND_PIPED_COMMAND_C((*outputStream), "if (!isdef(" << name << ")) var "
00069                             << name);
00070     URBI_SEND_PIPED_COMMAND_C
00071           ((*outputStream),
00072            libport::format("external var %s from %s",
00073                            owner_->get_name(), ctx->hookPointName()));
00074     ctx->markDataSent();
00075   }
00076 
00077   bool RemoteUVarImpl::setBypass(bool enable)
00078   {
00079     bypass_ = enable;
00080     return true;
00081   }
00082 
00084   ufloat&
00085   RemoteUVarImpl::out()
00086   {
00087     return const_cast<ufloat&>(get().val);
00088   }
00089 
00091   ufloat&
00092   RemoteUVarImpl::in()
00093   {
00094     return const_cast<ufloat&>(get().val);
00095   }
00096 
00097 
00098   void
00099   RemoteUVarImpl::setProp(UProperty p, const UValue& v)
00100   {
00101     RemoteUContextImpl* ctx = static_cast<RemoteUContextImpl*>(owner_->ctx_);
00102     LockableOstream* outputStream = ctx->outputStream;
00103     URBI_SEND_PIPED_COMMAND_C((*outputStream), owner_->get_name() << "->"
00104                               << urbi::name(p) << " = " << v);
00105     ctx->markDataSent();
00106   }
00107 
00108   void
00109   RemoteUVarImpl::keepSynchronized()
00110   {
00111     //FIXME: do something?
00112   }
00113 
00114   UValue
00115   RemoteUVarImpl::getProp(UProperty p)
00116   {
00117     RemoteUContextImpl* ctx = static_cast<RemoteUContextImpl*>(owner_->ctx_);
00118     UMessage* m = ctx->syncGet(owner_->get_name() +"->"
00119                                + urbi::name(p));
00120     if (!m->value)
00121       throw std::runtime_error("Error fetching property on "
00122                                + owner_->get_name());
00123     UValue res = *m->value;
00124     delete m;
00125     return res;
00126   }
00127 
00129   void
00130   RemoteUVarImpl::clean()
00131   {
00132     RemoteUContextImpl* ctx = dynamic_cast<RemoteUContextImpl*>(owner_->ctx_);
00133     libport::BlockLock bl(ctx->tableLock);
00134     ctx->varmap().clean(*owner_);
00135     if (ctx->varmap()[owner_->get_name()].empty())
00136     {
00137       delete value_;
00138       delete timestamp_;
00139     }
00140   }
00141 
00142   static
00143   std::string
00144   rtp_id()
00145   {
00146     // Compute once in some thread implementations, each thread has different
00147     // PID.
00148     static std::string res =
00149       libport::format("URTP_%s_%s", getFilteredHostname(),
00150 #ifdef __UCLIBC__
00151    "default"
00152 #else
00153    getpid()
00154 #endif
00155     );
00156     return res;
00157   }
00158 
00159   static std::string makeLinkName(const std::string& key)
00160   {
00161     // We cannot have '.' in here, but we want to be able to regenerate the
00162     // original key unambiguously, so use something unlikely (as in reserved
00163     // idealy)
00164     std::string res =  rtp_id() + "___" + key;
00165     res[res.find_first_of(".")] = '_';
00166     return res;
00167   }
00168 
00169   void
00170   RemoteUContextImpl::makeRTPLink(const std::string& key)
00171   {
00172     /* Setup RTP mode
00173     * We create two instances of the URTP UObject: one local to this
00174     * remote, and one plugged in the engine, and connect them together.
00175     */
00176     // Spawn a new local RTP instance
00177     std::string localRTP = rtp_id();
00178     RemoteUContextImpl::objects_type::iterator oi = objects.find(localRTP);
00179     if (oi == objects.end())
00180       return;
00181     baseURBIStarter* bsa = oi->second->cloner;
00182     std::string linkName = makeLinkName(key);
00183     GD_SINFO_TRACE("Instanciating local RTP " << linkName);
00184     bsa->instanciate(this, linkName);
00185     // Call init
00186     localCall(linkName, "init");
00187 
00188     // Spawn a remote RTP instance and bind it.
00189     // Also destroy it when this remote disconnects.
00190     std::string rLinkName = linkName + "_l";
00191     URBI_SEND_COMMAND_C
00192       (*outputStream,
00193        libport::format("var %s = URTP.new|\n"
00194                        "%s.sourceContext = lobby.uid|\n",
00195                        rLinkName, rLinkName));
00196     // Now asynchronously ask the remote object to listen and to report
00197     // the port number.
00198     GD_SINFO_TRACE("fetching engine listen port...");
00199     backend_->setCallback(
00200       callback(*this, &RemoteUContextImpl::onRTPListenMessage),
00201       (URBI_REMOTE_RTP_INIT_CHANNEL + key).c_str());
00202     URBI_SEND_COMMA_COMMAND_C
00203       (*outputStream,
00204        libport::format("Channel.new(\"%s%s\") << %s.listen(\"0.0.0.0\", \"0\")",
00205                        URBI_REMOTE_RTP_INIT_CHANNEL, key, rLinkName));
00206     rtpLinks[key]  = 0; // Not ready yet.
00207   }
00208 
00209   UCallbackAction RemoteUContextImpl::onRTPListenMessage(const UMessage& mport)
00210   {
00211     // Second stage of RTP initialization: the remote is listening.
00212     if (mport.type != MESSAGE_DATA
00213         || mport.value->type != DATA_DOUBLE)
00214     {
00215       GD_SWARN("Failed to get remote RTP port, disabling RTP");
00216       enableRTP = false;
00217       return URBI_REMOVE;
00218     }
00219     // Extract key from channel.
00220     std::string key = mport.tag.substr(strlen(URBI_REMOTE_RTP_INIT_CHANNEL),
00221                                          mport.tag.npos);
00222     // Regenerate link name
00223     std::string linkName = makeLinkName(key);
00224     std::string rLinkName = linkName + "_l";
00225     // And uvar name
00226     std::string varname = key;
00227     size_t p = varname.find("___");
00228     if (p != varname.npos)
00229       varname = varname.substr(0, p) + varname.substr(p+3, varname.npos);
00230     int port = int(mport.value->val);
00231     GD_FINFO_TRACE("Finishing RTP init, link %s port %s variable %s",
00232                    linkName, port, varname);
00233     // Invoke the connect method on our RTP instance. Having a reference
00234     // to URTP symbols would be painful, so pass through our
00235     // UGenericCallback mechanism.
00236     localCall(linkName, "connect", backend_->getRemoteHost(), port);
00237     UObject* ob = getUObject(linkName);
00238     // Monitor this RTP link.
00239     URBI_SEND_COMMA_COMMAND_C(*outputStream,
00240       "detach('external'.monitorRTP(" << linkName << ","
00241       << rLinkName << ", closure() {'external'.failRTP}))|"
00242       << rLinkName << ".receiveVar(\"" << varname
00243       << "\")");
00244     rtpLinks[key]  = ob;
00245     return URBI_REMOVE;
00246   }
00247 
00248   void
00249   RemoteUVarImpl::set(const UValue& v)
00250   {
00251     RemoteUContextImpl* ctx = static_cast<RemoteUContextImpl*>(owner_->ctx_);
00252     libport::utime_t time = libport::utime();
00253     if (!owner_->get_local())
00254       transmit(v, time);
00255     // Loopback notification
00256     ctx->assignMessage(owner_->get_name(), v, time, bypass_);
00257   }
00258 
00259   void
00260   RemoteUVarImpl::transmitSerialized(const UValue& v, libport::utime_t time)
00261   {
00262     GD_INFO_DEBUG("transmitSerialized");
00263     char av = UEM_ASSIGNVALUE;
00264     std::string n = owner_->get_name();
00265     unsigned int tlow = (unsigned int)time;
00266     unsigned int thi = (unsigned int)(time >> 32);
00267     RemoteUContextImpl* ctx = static_cast<RemoteUContextImpl*>(owner_->ctx_);
00268     ctx->backend_->startPack();
00269     ctx->outputStream->flush();
00270     *static_cast<RemoteUContextImpl*>(owner_->ctx_)->
00271       oarchive
00272         << av
00273         << n
00274         << v
00275         << tlow << thi;
00276      client_->flush();
00277      ctx->backend_->endPack();
00278   }
00279 
00280   void
00281   RemoteUVarImpl::transmit(const UValue& v, libport::utime_t time)
00282   {
00283     RemoteUContextImpl* ctx = static_cast<RemoteUContextImpl*>(owner_->ctx_);
00284     std::string fullname = owner_->get_name();
00285     size_t pos = fullname.rfind(".");
00286     assert(pos != std::string::npos);
00287     std::string owner = fullname.substr(0, pos);
00288     std::string name = fullname.substr(pos + 1);
00289     GD_FINFO_DUMP("transmit new value for %s", fullname);
00290     bool rtp = false;
00291     if (v.type == DATA_BINARY)
00292     {
00293       std::string localRTP = rtp_id();
00294       if (ctx->enableRTP && getUObject(localRTP)
00295           && owner_->get_rtp() != UVar::RTP_NO)
00296       {
00297         GD_SINFO_TRACE("Trying RTP mode using " << localRTP);
00298         RemoteUContextImpl::RTPLinks::iterator i
00299           = ctx->rtpLinks.find(owner_->get_name());
00300         if (i == ctx->rtpLinks.end())
00301         {
00302           // Initiate rtp link asynchronously
00303           GD_INFO_TRACE("Asynchronous RTP link initialization.");
00304           ctx->makeRTPLink(owner_->get_name());
00305           goto rtpfail;
00306         }
00307         else if (i->second == 0)
00308         {
00309           GD_INFO_TRACE("RTP link not ready yet, fallback");
00310           goto rtpfail; // init started, link not ready yet
00311         }
00312         GD_FINFO_TRACE("Link ready, using cache if %s", ctx->rtpSend);
00313         if (ctx->rtpSend)
00314           ctx->rtpSend(i->second, v);
00315         else
00316           ctx->localCall(i->second->__name, "send", v);
00317         rtp = true;
00318       }
00319     rtpfail:
00320       if (!rtp)
00321       {
00322         if (ctx->serializationMode)
00323           transmitSerialized(v, time);
00324         else
00325         {
00326           ctx->backend_->startPack();
00327           *ctx->outputStream
00328           << owner
00329           << ".getSlot(\"" << libport::escape(name)
00330           << "\").update_timed(";
00331         // Sendbinary is not using the stream, so we must flush.
00332           ctx->outputStream->flush();
00333           UBinary& b = *(v.binary);
00334           ctx->backend_->sendBinary(b.common.data, b.common.size,
00335                               b.getMessage());
00336           *ctx->outputStream << ", " << time << ")|";
00337           ctx->backend_->endPack();
00338         }
00339       }
00340     }
00341     else
00342     {
00343       if (ctx->enableRTP && owner_->get_rtp())
00344       {
00345         if (!ctx->sharedRTP_)
00346         {
00347           RemoteUContextImpl::RTPLinks::iterator i
00348           = ctx->rtpLinks.find("_shared_");
00349           if (i == ctx->rtpLinks.end())
00350           {
00351             GD_INFO_DUMP("Async init of RTP shared link");
00352             ctx->makeRTPLink("_shared_");
00353             goto rtpfail2;
00354           }
00355           else if (!i->second)
00356           {
00357             GD_INFO_DUMP("RTP shared link not yet ready");
00358             goto rtpfail2;
00359           }
00360           ctx->sharedRTP_ = i->second;
00361         }
00362         GD_INFO_DUMP("localCalling sendGrouped");
00363         if (ctx->rtpSendGrouped)
00364           ctx->rtpSendGrouped(ctx->sharedRTP_, owner_->get_name(), v, time);
00365         else
00366           ctx->localCall(ctx->sharedRTP_->__name, "sendGrouped",
00367                          owner_->get_name(), v, time);
00368         rtp = true;
00369       }
00370     rtpfail2:
00371       if (!rtp)
00372       {
00373         if (ctx->serializationMode)
00374           transmitSerialized(v, time);
00375         else
00376         {
00377           ctx->backend_->startPack();
00378           *ctx->outputStream
00379           << owner
00380           << ".getSlot(\"" << libport::escape(name)
00381           << "\").update_timed(";
00382           if (v.type == DATA_STRING)
00383             (*ctx->outputStream) << "\"" << libport::escape(*v.stringValue, '"') << "\"";
00384           else
00385             *ctx->outputStream << v ;
00386           *ctx->outputStream << ", " << time << ")|";
00387           ctx->backend_->endPack();
00388         }
00389       }
00390     }
00391     if (!rtp && !ctx->serializationMode)
00392     {
00393       ctx->markDataSent();
00394     }
00395     GD_FINFO_DUMP("transmit new value for %s done", fullname);
00396   }
00397 
00398   const UValue& RemoteUVarImpl::get() const
00399   {
00400     return *value_;
00401   };
00402 
00404   void
00405   RemoteUVarImpl::setOwned()
00406   {
00407     owner_->owned = true;
00408   }
00409 
00411   UDataType
00412   RemoteUVarImpl::type() const
00413   {
00414     return get().type;
00415   }
00416 
00417   void
00418   RemoteUVarImpl::request()
00419   {
00420     RemoteUContextImpl* ctx = static_cast<RemoteUContextImpl*>(owner_->ctx_);
00421     std::string name = owner_->get_name();
00422     //build a getvalue message  that will be parsed and returned by the server
00423     URBI_SEND_PIPED_COMMAND_C((*ctx->outputStream), externalModuleTag << "<<"
00424                             <<'[' << UEM_ASSIGNVALUE << ","
00425                             << '"' << name << '"' << ',' << name << ']');
00426     ctx->markDataSent();
00427   }
00428 
00429   void
00430   RemoteUVarImpl::sync()
00431   {
00432     RemoteUContextImpl* ctx = static_cast<RemoteUContextImpl*>(owner_->ctx_);
00433     std::string name = owner_->get_name();
00434     UMessage* m = ctx->syncGet(name + ".uvalueSerialize");
00435     if (m->type == MESSAGE_DATA)
00436       value_->set(*m->value);
00437     delete m;
00438   }
00439 
00440   time_t
00441   RemoteUVarImpl::timestamp() const
00442   {
00443     return *timestamp_;
00444   }
00445 
00446   void RemoteUVarImpl::unnotify()
00447   {
00448     GD_FINFO_TRACE("RemoteUVarImpl::unnotify on %s (%s)", owner_->get_name(),
00449                    this);
00450     RemoteUContextImpl* ctx = static_cast<RemoteUContextImpl*>(owner_->ctx_);
00451     std::string name = owner_->get_name();
00452     size_t p = name.find_first_of(".");
00453     if (p == name.npos)
00454       throw std::runtime_error("unnotify: invalid argument: " + name);
00455     // Each UVar creation and each notifychange causes an 'external
00456     // var' message, so when the UVar dies, creation count is
00457     // callbacks.size +1.
00458     URBI_SEND_PIPED_COMMAND_C((*ctx->outputStream),
00459       "UObject.unnotify(\"" << name.substr(0, p) << "\", \""
00460                             << name.substr(p+1, name.npos) << "\","
00461                             << callbacks_.size()+1 << ")" );
00462     libport::BlockLock bl(ctx->tableLock);
00463     foreach(RemoteUGenericCallbackImpl* c, callbacks_)
00464     {
00465       UTable& t =
00466         dynamic_cast<RemoteUContextImpl*>(c->owner_->ctx_)
00467         ->tableByName(c->owner_->type);
00468       UTable::callbacks_type& ct = t[c->owner_->name];
00469       UTable::callbacks_type::iterator i = libport::find(ct, c->owner_);
00470       if (i != ct.end())
00471         ct.erase(i);
00472       owner_->ctx_->addCleanup(c->owner_); // Will clean the impl_ too.
00473     }
00474     callbacks_.clear();
00475     ctx->markDataSent();
00476     if (std::list<UVar*> *us = ctx->varmap().find0(name))
00477       us->remove(owner_);
00478   }
00479 
00480   void RemoteUVarImpl::useRTP(bool enable)
00481   {
00482     RemoteUContextImpl* ctx = static_cast<RemoteUContextImpl*>(owner_->ctx_);
00483     std::string name = owner_->get_name();
00484     size_t p = name.find_first_of(".");
00485     if (p == name.npos)
00486       throw std::runtime_error("invalid argument to useRTP: "+name);
00487     ctx->send(libport::format("%s.getSlot(\"%s\").rtp = %s|",
00488                          name.substr(0, p), name.substr(p+1, name.npos),
00489                          enable ? "true" : "false"));
00490     ctx->markDataSent();
00491   }
00492 
00493   void RemoteUVarImpl::setInputPort(bool enable)
00494   {
00495     RemoteUContextImpl* ctx = static_cast<RemoteUContextImpl*>(owner_->ctx_);
00496     std::string name = owner_->get_name();
00497     size_t p = name.find_first_of(".");
00498     if (p == name.npos)
00499       throw std::runtime_error("invalid argument to setInputPort: "+name);
00500     ctx->send(libport::format("%s.getSlot(\"%s\").%s|",
00501                          name.substr(0, p), name.substr(p+1, name.npos),
00502                          enable
00503                          ? "setSlot(\"inputPort\", true)"
00504                          : "removeLocalSlot(\"inputPort\")"));
00505     ctx->markDataSent();
00506   }
00507 
00508   }
00509 } //namespace urbi