|
Urbi SDK Remote for C++
2.7.5
|
00001 /* 00002 * Copyright (C) 2010-2011, Gostai S.A.S. 00003 * 00004 * This software is provided "as is" without warranty of any kind, 00005 * either expressed or implied, including but not limited to the 00006 * implied warranties of fitness for a particular purpose. 00007 * 00008 * See the LICENSE file for more information. 00009 */ 00010 00011 #include <urbi/uobject.hh> 00012 #include <libport/foreach.hh> 00013 #include <libport/thread.hh> 00014 #include <libport/lockable.hh> 00015 using namespace urbi; 00016 00017 00018 00019 GD_CATEGORY(Test.Threaded); 00020 00027 class Threaded: public UObject 00028 { 00029 public: 00030 CREATE_CLASS_LOCK 00031 Threaded(const std::string& n) 00032 :UObject(n) 00033 { 00034 UBindFunction(Threaded, init); 00035 UBindFunction(Threaded, queueOp); 00036 UBindFunction(Threaded, getLastRead); 00037 UBindFunction(Threaded, startThread); 00038 UBindThreadedFunction(Threaded, throwException, LOCK_NONE); 00039 UBindThreadedFunction(Threaded, lockNoneDelayOp, LOCK_NONE); 00040 UBindThreadedFunction(Threaded, lockInstanceDelayOp, LOCK_INSTANCE); 00041 UBindThreadedFunction(Threaded, lockClassDelayOp, LOCK_CLASS); 00042 UBindThreadedFunction(Threaded, lockModuleDelayOp, LOCK_MODULE); 00043 UBindThreadedFunction(Threaded, lockFunctionDelayOp, LOCK_FUNCTION); 00044 UBindThreadedFunction(Threaded, lockFunctionDropDelayOp, 00045 LOCK_FUNCTION_DROP); 00046 UBindThreadedFunction(Threaded, lockFunctionKeepOneDelayOp, 00047 LOCK_FUNCTION_KEEP_ONE); 00048 UBindVar(Threaded, updated); 00049 updated = 0; 00050 UBindVar(Threaded, timerUpdated); 00051 timerUpdated = 0; 00052 UBindVar(Threaded, lastChange); 00053 lastChange = UList(); 00054 UBindVar(Threaded, lastAccess); 00055 lastAccess = UList(); 00056 if (n == "Threaded") 00057 init(); 00058 } 00059 ~Threaded(); 00060 void lockNoneDelayOp(int id, int delay) { delayOp(id, delay);} 00061 void lockInstanceDelayOp(int id, int delay) { delayOp(id, delay);} 00062 void lockFunctionDelayOp(int id, int delay) { delayOp(id, delay);} 00063 void lockFunctionDropDelayOp(int id, int delay) { delayOp(id, delay);} 00064 void lockFunctionKeepOneDelayOp(int id, int delay) { delayOp(id, delay);} 00065 void lockClassDelayOp(int id, int delay) { delayOp(id, delay);} 00066 void lockModuleDelayOp(int id, int delay) { delayOp(id, delay);} 00067 void delayOp(int id, int delay); 00068 void terminate(); 00069 UVar updated; 00070 UVar timerUpdated; 00071 UVar lastChange; 00072 UVar lastAccess; 00073 void init(); 00074 virtual int update(); 00075 int onChangeDelay(UValue v); 00076 int onChange(UVar& v); 00077 int onAccess(UVar& v); 00078 void onTimer(); 00079 UValue getLastRead(unsigned id); 00081 int queueOp(unsigned tid, int op, UList args); 00082 void throwException(int what); 00084 int startThread(); 00085 // Thread main loop body, returns false when it wants to end. 00086 bool threadLoopBody(int id); 00087 // Thread main loop: repeatedly call threadLoopBody. 00088 void threadLoop(int id); 00089 int dummy(); 00090 enum OpType 00091 { 00092 WRITE_VAR, 00093 READ_VAR, 00094 CREATE_VAR, 00095 DELETE_VAR, 00096 NOTIFY_CHANGE, 00097 NOTIFY_ACCESS, 00098 BIND_FUNCTION, 00099 SET_UPDATE, 00100 SET_TIMER, 00101 UNSET_TIMER, 00102 UNNOTIFY, 00103 GETUOBJECT, 00104 DELAY, 00105 DIE, 00106 SET_UOWNED, 00107 SET_BYPASS, 00108 WRITE_BINARY, 00109 EMIT 00110 }; 00111 00112 struct Operation 00113 { 00114 Operation() : op(-1) {} 00115 Operation(int a, const UList &b) 00116 : op(a), args(b) 00117 {} 00118 int op; 00119 UList args; 00120 }; 00121 struct Context 00122 { 00123 Context(int id) 00124 : hasOp(false), dead(false), id(id) {} 00125 std::list<Operation> ops; 00126 bool hasOp; 00127 libport::Lockable opLock; 00128 std::vector<UVar*> vars; 00129 std::vector<TimerHandle> timers; 00130 UValue lastRead; 00131 pthread_t threadId; 00132 bool dead; 00133 int id; 00134 }; 00135 libport::Lockable opsLock; 00136 std::vector<Context*> ops; 00137 }; 00138 00139 UStart(Threaded); 00140 00141 static const char* opname[] = 00142 { 00143 "WRITE_VAR", 00144 "READ_VAR", 00145 "CREATE_VAR", 00146 "DELETE_VAR", 00147 "NOTIFY_CHANGE", 00148 "NOTIFY_ACCESS", 00149 "BIND_FUNCTION", 00150 "SET_UPDATE", 00151 "SET_TIMER", 00152 "UNSET_TIMER", 00153 "UNNOTIFY", 00154 "GETUOBJECT", 00155 "DELAY", 00156 "DIE", 00157 "SET_UOWNED", 00158 "SET_BYPASS", 00159 "WRITE_BINARY", 00160 "EMIT", 00161 0 00162 }; 00163 void Threaded::init() 00164 { 00165 for (int i=0; opname[i]; ++i) 00166 { 00167 UVar v; 00168 v.init(__name, opname[i]); 00169 v = i; 00170 } 00171 } 00172 00173 Threaded::~Threaded() 00174 { 00175 terminate(); 00176 } 00177 00178 void Threaded::terminate() 00179 { 00180 foreach(Context*c, ops) 00181 { 00182 if (!c->dead) 00183 { 00184 queueOp(c->id, DIE, UList()); 00185 void* retval; 00186 std::cerr <<"joining " << c->id << std::endl; 00187 pthread_join(c->threadId, &retval); 00188 std::cerr <<"done" << std::endl; 00189 } 00190 } 00191 } 00192 00193 int Threaded::queueOp(unsigned tid, int op, UList args) 00194 { 00195 if (tid >= ops.size()) 00196 return 0; 00197 libport::BlockLock bl (ops[tid]->opLock); 00198 ops[tid]->ops.push_back(Operation(op, args)); 00199 ops[tid]->hasOp = true; 00200 GD_FINFO_TRACE("Queued operation %s for thread %s", op, tid); 00201 return 1; 00202 } 00203 00204 int Threaded::startThread() 00205 { 00206 UValue v; 00207 libport::BlockLock bl (opsLock); 00208 int id = ops.size(); 00209 ops.push_back(new Context(ops.size())); 00210 ops.back()->threadId = 00211 libport::startThread(boost::bind(&Threaded::threadLoop, this, id)); 00212 GD_FINFO_DUMP("Started thread id %s", id); 00213 return id; 00214 } 00215 00216 UValue Threaded::getLastRead(unsigned tid) 00217 { 00218 if (tid >= ops.size()) 00219 return UValue(); 00220 libport::BlockLock bl (ops[tid]->opLock); 00221 return ops[tid]->lastRead; 00222 } 00223 00224 void Threaded::delayOp(int id, int delay) 00225 { 00226 GD_FINFO_DUMP("delaying %s on %s...", delay, id); 00227 usleep(delay); 00228 GD_FINFO_DUMP("...executing one op on %s", id); 00229 threadLoopBody(id); 00230 } 00231 00232 bool Threaded::threadLoopBody(int id) 00233 { 00234 #define type0 (op.args[0].type) 00235 #define string0 (*op.args[0].stringValue) 00236 #define float0 (op.args[0].val) 00237 #define int0 static_cast<int>(float0) 00238 Context* pctx; 00239 { 00240 libport::BlockLock bl (opsLock); 00241 pctx = ops[id]; 00242 } 00243 Context& ctx = *pctx; 00244 // Randomize behavior to increase chance of detecting a race condition. 00245 usleep(rand() % 100000); 00246 if (ctx.hasOp) 00247 { 00248 Operation op; 00249 { 00250 libport::BlockLock bl (ctx.opLock); 00251 op = ctx.ops.front(); 00252 ctx.ops.pop_front(); 00253 if (ctx.ops.empty()) 00254 ctx.hasOp = false; 00255 } 00256 GD_FINFO_TRACE("[%s] Executing operation %s", id, op.op); 00257 switch(op.op) 00258 { 00259 case SET_BYPASS: 00260 ctx.vars[int0]->setBypass(true); 00261 break; 00262 case SET_UOWNED: 00263 ctx.vars[int0]->setOwned(); 00264 break; 00265 case WRITE_BINARY: 00266 { 00267 UBinary b; 00268 b.type = BINARY_IMAGE; 00269 b.image.width = 3; 00270 b.image.height = 3; 00271 b.image.imageFormat = IMAGE_RGB; 00272 b.image.size = 9; 00273 b.image.data = (unsigned char*)const_cast<char*>("abcdefghi"); 00274 *( ctx.vars[int0]) = b; 00275 b.image.data = 0; 00276 } 00277 break; 00278 case WRITE_VAR: 00279 if (type0 == DATA_STRING) 00280 { 00281 UVar v(string0); 00282 if (op.args.size() > 2) 00283 for(int i=0; i<op.args[2].val; ++i) 00284 v = op.args[1]; 00285 else 00286 v = op.args[1]; 00287 } 00288 else 00289 { 00290 std::cerr 00291 << libport::utime() 00292 << " writevar " << id 00293 << " " << ctx.vars[int0]->get_name() 00294 << " " << op.args[1] 00295 << std::endl; 00296 *ctx.vars[int0] = op.args[1]; 00297 std::cerr << libport::utime() << "done write " << id << std::endl; 00298 } 00299 break; 00300 case READ_VAR: 00301 { 00302 UValue val; 00303 if (type0 == DATA_STRING) 00304 { 00305 UVar v(string0); 00306 val = v.val(); 00307 } 00308 else 00309 val = ctx.vars[int0]->val(); 00310 libport::BlockLock bl (ctx.opLock); 00311 ctx.lastRead = val; 00312 } 00313 break; 00314 case CREATE_VAR: 00315 ctx.vars.push_back(new UVar(string0)); 00316 break; 00317 case DELETE_VAR: 00318 { 00319 size_t idx = int0; 00320 delete ctx.vars[idx]; 00321 if (idx != ctx.vars.size()-1) 00322 ctx.vars[idx] = ctx.vars[ctx.vars.size()-1]; 00323 ctx.vars.pop_back(); 00324 } 00325 break; 00326 case NOTIFY_CHANGE: 00327 // Bind in async mode if an extra arg is given. 00328 if (op.args.size() > 1) 00329 if (type0 == DATA_STRING) 00330 UNotifyThreadedChange(string0, &Threaded::onChangeDelay, 00331 LOCK_FUNCTION); 00332 else 00333 UNotifyThreadedChange(*ctx.vars[int0], &Threaded::onChangeDelay, 00334 LOCK_FUNCTION); 00335 else 00336 if (type0 == DATA_STRING) 00337 UNotifyChange(string0, &Threaded::onChange); 00338 else 00339 UNotifyChange(*ctx.vars[int0], &Threaded::onChange); 00340 break; 00341 case NOTIFY_ACCESS: 00342 if (type0 == DATA_STRING) 00343 UNotifyAccess(string0, &Threaded::onAccess); 00344 else 00345 UNotifyAccess(*ctx.vars[int0], &Threaded::onAccess); 00346 break; 00347 case BIND_FUNCTION: 00348 ::urbi::createUCallback(*this, 0, "function", this, 00349 (&Threaded::dummy), __name + "." + string0); 00350 break; 00351 case SET_UPDATE: 00352 USetUpdate(float0); 00353 break; 00354 case SET_TIMER: 00355 ctx.timers.push_back(USetTimer(float0, &Threaded::onTimer)); 00356 break; 00357 case UNSET_TIMER: 00358 removeTimer(ctx.timers[int0]); 00359 if (ctx.timers.size() == 1) 00360 ctx.timers.pop_back(); 00361 else 00362 { 00363 ctx.timers[int0] = ctx.timers.back(); 00364 ctx.timers.pop_back(); 00365 } 00366 break; 00367 case UNNOTIFY: 00368 ctx.vars[int0]->unnotify(); 00369 break; 00370 case GETUOBJECT: 00371 { 00372 UObject* uob = getUObject(string0); 00373 libport::BlockLock bl (ctx.opLock); 00374 if (!uob) 00375 ctx.lastRead = "0"; 00376 else 00377 ctx.lastRead = uob->__name; 00378 } 00379 break; 00380 case EMIT: 00381 { 00382 UEvent e(string0); 00383 e.emit(12, 15, "canard"); 00384 } 00385 break; 00386 case DELAY: 00387 usleep(int0); 00388 break; 00389 case DIE: 00390 ctx.dead = true; 00391 return false; 00392 break; 00393 } 00394 GD_FINFO_TRACE("[%s] Done executing operation %s", id, op.op); 00395 } 00396 return true; 00397 } 00398 00399 void Threaded::throwException(int what) 00400 { 00401 switch(what) 00402 { 00403 case 0: 00404 throw 42; 00405 default: 00406 throw std::runtime_error("pan"); 00407 } 00408 } 00409 00410 void Threaded::threadLoop(int id) 00411 { 00412 GD_FINFO_DUMP("Entering threadLoop for id %s", id); 00413 while (true) 00414 { 00415 try 00416 { 00417 if (!threadLoopBody(id)) 00418 { 00419 GD_FINFO_DUMP("Exiting threadLoop for id %s", id); 00420 return; 00421 } 00422 } 00423 catch(std::exception& e) 00424 { 00425 GD_FINFO_TRACE("Exiting threadLoop with exception %s", e.what()); 00426 std::cerr <<"exception " << e.what() << std::endl; 00427 } 00428 catch(...) 00429 { 00430 GD_INFO_TRACE("Exiting threadLoop with unknown exception"); 00431 std::cerr <<"unknown exception" << std::endl; 00432 } 00433 } 00434 } 00435 00436 00437 int Threaded::update() 00438 { 00439 updated = (int)updated + 1; 00440 return 0; 00441 } 00442 00443 int Threaded::onChangeDelay(UValue v) 00444 { 00445 usleep(500000); 00446 lastChange = v; 00447 return 0; 00448 } 00449 00450 int Threaded::onChange(UVar& v) 00451 { 00452 lastChange = v.get_name(); 00453 return 0; 00454 } 00455 00456 int Threaded::onAccess(UVar& v) 00457 { 00458 UList l = lastAccess; 00459 l.array.push_back(new UValue(v.get_name())); 00460 lastAccess = l; 00461 return 0; 00462 }; 00463 00464 void Threaded::onTimer() 00465 { 00466 timerUpdated = (int)timerUpdated + 1; 00467 } 00468 00469 int Threaded::dummy() 00470 { 00471 return 42; 00472 }