Urbi SDK Remote for C++  2.7.5
threaded.cc
Go to the documentation of this file.
00001 /*
00002  * Copyright (C) 2010-2011, Gostai S.A.S.
00003  *
00004  * This software is provided "as is" without warranty of any kind,
00005  * either expressed or implied, including but not limited to the
00006  * implied warranties of fitness for a particular purpose.
00007  *
00008  * See the LICENSE file for more information.
00009  */
00010 
00011 #include <urbi/uobject.hh>
00012 #include <libport/foreach.hh>
00013 #include <libport/thread.hh>
00014 #include <libport/lockable.hh>
00015 using namespace urbi;
00016 
00017 
00018 
00019 GD_CATEGORY(Test.Threaded);
00020 
00027 class Threaded: public UObject
00028 {
00029 public:
00030   CREATE_CLASS_LOCK
00031   Threaded(const std::string& n)
00032   :UObject(n)
00033   {
00034     UBindFunction(Threaded, init);
00035     UBindFunction(Threaded, queueOp);
00036     UBindFunction(Threaded, getLastRead);
00037     UBindFunction(Threaded, startThread);
00038     UBindThreadedFunction(Threaded, throwException, LOCK_NONE);
00039     UBindThreadedFunction(Threaded, lockNoneDelayOp, LOCK_NONE);
00040     UBindThreadedFunction(Threaded, lockInstanceDelayOp, LOCK_INSTANCE);
00041     UBindThreadedFunction(Threaded, lockClassDelayOp, LOCK_CLASS);
00042     UBindThreadedFunction(Threaded, lockModuleDelayOp, LOCK_MODULE);
00043     UBindThreadedFunction(Threaded, lockFunctionDelayOp, LOCK_FUNCTION);
00044     UBindThreadedFunction(Threaded, lockFunctionDropDelayOp,
00045                           LOCK_FUNCTION_DROP);
00046     UBindThreadedFunction(Threaded, lockFunctionKeepOneDelayOp,
00047                           LOCK_FUNCTION_KEEP_ONE);
00048     UBindVar(Threaded, updated);
00049     updated = 0;
00050     UBindVar(Threaded, timerUpdated);
00051     timerUpdated = 0;
00052     UBindVar(Threaded, lastChange);
00053     lastChange = UList();
00054     UBindVar(Threaded, lastAccess);
00055     lastAccess = UList();
00056     if (n == "Threaded")
00057       init();
00058   }
00059   ~Threaded();
00060   void lockNoneDelayOp(int id, int delay) { delayOp(id, delay);}
00061   void lockInstanceDelayOp(int id, int delay) { delayOp(id, delay);}
00062   void lockFunctionDelayOp(int id, int delay) { delayOp(id, delay);}
00063   void lockFunctionDropDelayOp(int id, int delay) { delayOp(id, delay);}
00064   void lockFunctionKeepOneDelayOp(int id, int delay) { delayOp(id, delay);}
00065   void lockClassDelayOp(int id, int delay) { delayOp(id, delay);}
00066   void lockModuleDelayOp(int id, int delay) { delayOp(id, delay);}
00067   void delayOp(int id, int delay);
00068   void terminate();
00069   UVar updated;
00070   UVar timerUpdated;
00071   UVar lastChange;
00072   UVar lastAccess;
00073   void init();
00074   virtual int update();
00075   int onChangeDelay(UValue v);
00076   int onChange(UVar& v);
00077   int onAccess(UVar& v);
00078   void onTimer();
00079   UValue getLastRead(unsigned id);
00081   int queueOp(unsigned tid, int op, UList args);
00082   void throwException(int what);
00084   int startThread();
00085   // Thread main loop body, returns false when it wants to end.
00086   bool threadLoopBody(int id);
00087   // Thread main loop: repeatedly call threadLoopBody.
00088   void threadLoop(int id);
00089   int dummy();
00090   enum OpType
00091   {
00092     WRITE_VAR,
00093     READ_VAR,
00094     CREATE_VAR,
00095     DELETE_VAR,
00096     NOTIFY_CHANGE,
00097     NOTIFY_ACCESS,
00098     BIND_FUNCTION,
00099     SET_UPDATE,
00100     SET_TIMER,
00101     UNSET_TIMER,
00102     UNNOTIFY,
00103     GETUOBJECT,
00104     DELAY,
00105     DIE,
00106     SET_UOWNED,
00107     SET_BYPASS,
00108     WRITE_BINARY,
00109     EMIT
00110   };
00111 
00112   struct Operation
00113   {
00114     Operation() : op(-1) {}
00115     Operation(int a, const UList &b)
00116       : op(a), args(b)
00117     {}
00118     int op;
00119     UList args;
00120   };
00121   struct Context
00122   {
00123     Context(int id)
00124     : hasOp(false), dead(false), id(id) {}
00125     std::list<Operation> ops;
00126     bool hasOp;
00127     libport::Lockable opLock;
00128     std::vector<UVar*> vars;
00129     std::vector<TimerHandle> timers;
00130     UValue lastRead;
00131     pthread_t threadId;
00132     bool dead;
00133     int id;
00134   };
00135   libport::Lockable opsLock;
00136   std::vector<Context*> ops;
00137 };
00138 
00139 UStart(Threaded);
00140 
/// Names of the operations, indexed by their Threaded::OpType value and
/// terminated by a null pointer.  Must be kept in the same order as the
/// enumeration: Threaded::init() publishes each name with its index as
/// value.  The table is read-only, hence the const pointers.
static const char* const opname[] =
  {
    "WRITE_VAR",
    "READ_VAR",
    "CREATE_VAR",
    "DELETE_VAR",
    "NOTIFY_CHANGE",
    "NOTIFY_ACCESS",
    "BIND_FUNCTION",
    "SET_UPDATE",
    "SET_TIMER",
    "UNSET_TIMER",
    "UNNOTIFY",
    "GETUOBJECT",
    "DELAY",
    "DIE",
    "SET_UOWNED",
    "SET_BYPASS",
    "WRITE_BINARY",
    "EMIT",
    0
  };
00163 void Threaded::init()
00164 {
00165   for (int i=0; opname[i]; ++i)
00166   {
00167     UVar v;
00168     v.init(__name, opname[i]);
00169     v = i;
00170   }
00171 }
00172 
00173 Threaded::~Threaded()
00174 {
00175   terminate();
00176 }
00177 
00178 void Threaded::terminate()
00179 {
00180   foreach(Context*c, ops)
00181   {
00182     if (!c->dead)
00183     {
00184       queueOp(c->id, DIE, UList());
00185       void* retval;
00186       std::cerr <<"joining " << c->id << std::endl;
00187       pthread_join(c->threadId, &retval);
00188       std::cerr <<"done" << std::endl;
00189     }
00190   }
00191 }
00192 
00193 int Threaded::queueOp(unsigned tid, int op, UList args)
00194 {
00195   if (tid >= ops.size())
00196     return 0;
00197   libport::BlockLock bl (ops[tid]->opLock);
00198   ops[tid]->ops.push_back(Operation(op, args));
00199   ops[tid]->hasOp = true;
00200   GD_FINFO_TRACE("Queued operation %s for thread %s",  op, tid);
00201   return 1;
00202 }
00203 
00204 int Threaded::startThread()
00205 {
00206   UValue v;
00207   libport::BlockLock bl (opsLock);
00208   int id = ops.size();
00209   ops.push_back(new Context(ops.size()));
00210   ops.back()->threadId =
00211     libport::startThread(boost::bind(&Threaded::threadLoop, this, id));
00212   GD_FINFO_DUMP("Started thread id %s", id);
00213   return id;
00214 }
00215 
00216 UValue Threaded::getLastRead(unsigned tid)
00217 {
00218   if (tid >= ops.size())
00219     return UValue();
00220   libport::BlockLock bl (ops[tid]->opLock);
00221   return ops[tid]->lastRead;
00222 }
00223 
00224 void Threaded::delayOp(int id, int delay)
00225 {
00226   GD_FINFO_DUMP("delaying %s on %s...", delay, id);
00227   usleep(delay);
00228   GD_FINFO_DUMP("...executing one op on %s", id);
00229   threadLoopBody(id);
00230 }
00231 
00232 bool Threaded::threadLoopBody(int id)
00233 {
00234   #define type0 (op.args[0].type)
00235   #define string0 (*op.args[0].stringValue)
00236   #define float0 (op.args[0].val)
00237   #define int0 static_cast<int>(float0)
00238   Context* pctx;
00239   {
00240     libport::BlockLock bl (opsLock);
00241     pctx = ops[id];
00242   }
00243   Context& ctx = *pctx;
00244   // Randomize behavior to increase chance of detecting a race condition.
00245   usleep(rand() % 100000);
00246   if (ctx.hasOp)
00247   {
00248     Operation op;
00249     {
00250       libport::BlockLock bl (ctx.opLock);
00251       op = ctx.ops.front();
00252       ctx.ops.pop_front();
00253       if (ctx.ops.empty())
00254         ctx.hasOp = false;
00255     }
00256     GD_FINFO_TRACE("[%s] Executing operation %s", id, op.op);
00257     switch(op.op)
00258     {
00259     case SET_BYPASS:
00260       ctx.vars[int0]->setBypass(true);
00261       break;
00262     case SET_UOWNED:
00263       ctx.vars[int0]->setOwned();
00264       break;
00265     case WRITE_BINARY:
00266       {
00267         UBinary b;
00268         b.type = BINARY_IMAGE;
00269         b.image.width = 3;
00270         b.image.height = 3;
00271         b.image.imageFormat = IMAGE_RGB;
00272         b.image.size = 9;
00273         b.image.data = (unsigned char*)const_cast<char*>("abcdefghi");
00274         *( ctx.vars[int0]) = b;
00275         b.image.data = 0;
00276       }
00277       break;
00278     case WRITE_VAR:
00279       if (type0 == DATA_STRING)
00280       {
00281         UVar v(string0);
00282         if (op.args.size() > 2)
00283           for(int i=0; i<op.args[2].val; ++i)
00284             v = op.args[1];
00285         else
00286           v = op.args[1];
00287       }
00288       else
00289       {
00290         std::cerr
00291           << libport::utime()
00292           << " writevar " << id
00293           << " " << ctx.vars[int0]->get_name()
00294           << " " << op.args[1]
00295           << std::endl;
00296         *ctx.vars[int0] = op.args[1];
00297         std::cerr << libport::utime() << "done write " << id << std::endl;
00298       }
00299       break;
00300     case READ_VAR:
00301       {
00302         UValue val;
00303         if (type0 == DATA_STRING)
00304         {
00305           UVar v(string0);
00306           val = v.val();
00307         }
00308         else
00309           val = ctx.vars[int0]->val();
00310         libport::BlockLock bl (ctx.opLock);
00311         ctx.lastRead = val;
00312       }
00313       break;
00314     case CREATE_VAR:
00315       ctx.vars.push_back(new UVar(string0));
00316       break;
00317     case DELETE_VAR:
00318       {
00319         size_t idx = int0;
00320         delete ctx.vars[idx];
00321         if (idx != ctx.vars.size()-1)
00322           ctx.vars[idx] = ctx.vars[ctx.vars.size()-1];
00323         ctx.vars.pop_back();
00324       }
00325       break;
00326     case NOTIFY_CHANGE:
00327       // Bind in async mode if an extra arg is given.
00328       if (op.args.size() > 1)
00329         if (type0 == DATA_STRING)
00330           UNotifyThreadedChange(string0, &Threaded::onChangeDelay,
00331                                 LOCK_FUNCTION);
00332         else
00333           UNotifyThreadedChange(*ctx.vars[int0], &Threaded::onChangeDelay,
00334                                 LOCK_FUNCTION);
00335       else
00336         if (type0 == DATA_STRING)
00337           UNotifyChange(string0, &Threaded::onChange);
00338         else
00339           UNotifyChange(*ctx.vars[int0], &Threaded::onChange);
00340       break;
00341     case NOTIFY_ACCESS:
00342        if (type0 == DATA_STRING)
00343         UNotifyAccess(string0, &Threaded::onAccess);
00344       else
00345         UNotifyAccess(*ctx.vars[int0], &Threaded::onAccess);
00346       break;
00347     case BIND_FUNCTION:
00348       ::urbi::createUCallback(*this, 0, "function", this,
00349                         (&Threaded::dummy), __name + "." + string0);
00350       break;
00351     case SET_UPDATE:
00352       USetUpdate(float0);
00353       break;
00354     case SET_TIMER:
00355       ctx.timers.push_back(USetTimer(float0, &Threaded::onTimer));
00356       break;
00357     case UNSET_TIMER:
00358       removeTimer(ctx.timers[int0]);
00359       if (ctx.timers.size() == 1)
00360         ctx.timers.pop_back();
00361       else
00362       {
00363         ctx.timers[int0] = ctx.timers.back();
00364         ctx.timers.pop_back();
00365       }
00366       break;
00367     case UNNOTIFY:
00368       ctx.vars[int0]->unnotify();
00369       break;
00370     case GETUOBJECT:
00371       {
00372         UObject* uob = getUObject(string0);
00373         libport::BlockLock bl (ctx.opLock);
00374         if (!uob)
00375           ctx.lastRead =  "0";
00376         else
00377           ctx.lastRead = uob->__name;
00378       }
00379       break;
00380     case EMIT:
00381       {
00382         UEvent e(string0);
00383         e.emit(12, 15, "canard");
00384       }
00385       break;
00386     case DELAY:
00387       usleep(int0);
00388       break;
00389     case DIE:
00390       ctx.dead = true;
00391       return false;
00392       break;
00393     }
00394     GD_FINFO_TRACE("[%s] Done executing operation %s", id, op.op);
00395   }
00396   return true;
00397 }
00398 
00399 void Threaded::throwException(int what)
00400 {
00401   switch(what)
00402   {
00403   case 0:
00404     throw 42;
00405   default:
00406     throw std::runtime_error("pan");
00407   }
00408 }
00409 
00410 void Threaded::threadLoop(int id)
00411 {
00412   GD_FINFO_DUMP("Entering threadLoop for id %s", id);
00413   while (true)
00414   {
00415     try
00416       {
00417         if (!threadLoopBody(id))
00418         {
00419           GD_FINFO_DUMP("Exiting threadLoop for id %s", id);
00420           return;
00421         }
00422       }
00423     catch(std::exception& e)
00424       {
00425         GD_FINFO_TRACE("Exiting threadLoop with exception %s", e.what());
00426         std::cerr <<"exception " << e.what() << std::endl;
00427       }
00428     catch(...)
00429       {
00430         GD_INFO_TRACE("Exiting threadLoop with unknown exception");
00431         std::cerr <<"unknown exception" << std::endl;
00432       }
00433   }
00434 }
00435 
00436 
00437 int Threaded::update()
00438 {
00439   updated = (int)updated + 1;
00440   return 0;
00441 }
00442 
00443 int Threaded::onChangeDelay(UValue v)
00444 {
00445   usleep(500000);
00446   lastChange = v;
00447   return 0;
00448 }
00449 
00450 int Threaded::onChange(UVar& v)
00451 {
00452   lastChange = v.get_name();
00453   return 0;
00454 }
00455 
00456 int Threaded::onAccess(UVar& v)
00457 {
00458   UList l = lastAccess;
00459   l.array.push_back(new UValue(v.get_name()));
00460   lastAccess = l;
00461   return 0;
00462 };
00463 
00464 void Threaded::onTimer()
00465 {
00466   timerUpdated = (int)timerUpdated + 1;
00467 }
00468 
00469 int Threaded::dummy()
00470 {
00471   return 42;
00472 }