YARP
Yet Another Robot Platform
SubscriberOnSql.cpp
Go to the documentation of this file.
1 /*
2  * SPDX-FileCopyrightText: 2006-2021 Istituto Italiano di Tecnologia (IIT)
3  * SPDX-FileCopyrightText: 2006-2010 RobotCub Consortium
4  * SPDX-License-Identifier: BSD-3-Clause
5  */
6 
7 #include <cstdlib>
8 #include <cstdio>
9 
10 #include <sqlite3.h>
11 
12 #include <yarp/os/RosNameSpace.h>
16 
17 #if !defined(_WIN32)
18 #include <unistd.h>
19 #else
20 #include <io.h>
21 #define access(f,a) _access(f,a)
22 #endif
23 
24 #include <vector>
25 #include <string>
26 
27 #ifndef F_OK
28 #define F_OK 0
29 #endif
30 
31 #define SQLDB(x) ((sqlite3*)(x))
32 
33 using namespace yarp::os;
34 using namespace yarp::serversql::impl;
35 
36 namespace {
37 YARP_SERVERSQL_LOG_COMPONENT(SUBSCRIBERONSQL, "yarp.serversql.impl.SubscriberOnSql")
38 } // namespace
39 
40 
41 bool SubscriberOnSql::open(const std::string& filename, bool fresh) {
42  sqlite3 *db = nullptr;
43  if (fresh) {
44  int result = access(filename.c_str(),F_OK);
45  if (result==0) {
46  yCWarning(SUBSCRIBERONSQL, "Database needs to be recreated.");
47  yCWarning(SUBSCRIBERONSQL, "Please move %s out of the way.", filename.c_str());
48  return false;
49  }
50 
51  }
52  int result = sqlite3_open_v2(filename.c_str(),
53  &db,
54  SQLITE_OPEN_READWRITE|SQLITE_OPEN_CREATE|SQLITE_OPEN_NOMUTEX,
55  nullptr);
56  if (result!=SQLITE_OK) {
57  yCError(SUBSCRIBERONSQL, "Failed to open database %s", filename.c_str());
58  if (db != nullptr) {
59  sqlite3_close(db);
60  }
61  return false;
62  }
63 
64  const char *create_subscribe_table = "CREATE TABLE IF NOT EXISTS subscriptions (\n\
65  id INTEGER PRIMARY KEY,\n\
66  src TEXT,\n\
67  dest TEXT,\n\
68  srcFull TEXT,\n\
69  destFull TEXT,\n\
70  mode TEXT);";
71 
72  result = sqlite3_exec(db, create_subscribe_table, nullptr, nullptr, nullptr);
73  if (result!=SQLITE_OK) {
74  sqlite3_close(db);
75  yCError(SUBSCRIBERONSQL, "Failed to set up subscriptions table");
76  std::exit(1);
77  }
78 
79  const char *check_subscriptions_size = "PRAGMA table_info(subscriptions)";
80 
81  sqlite3_stmt *statement = nullptr;
82  result = sqlite3_prepare_v2(db, check_subscriptions_size, -1, &statement, nullptr);
83  if (result!=SQLITE_OK) {
84  yCError(SUBSCRIBERONSQL, "Failed to set up subscriptions table");
85  std::exit(1);
86  }
87 
88  int count = 0;
89  while (sqlite3_step(statement) == SQLITE_ROW) {
90  count++;
91  }
92  sqlite3_finalize(statement);
93 
94  if (count==5) {
95  const char *add_structure = "ALTER TABLE subscriptions ADD COLUMN mode";
96  result = sqlite3_exec(db, add_structure, nullptr, nullptr, nullptr);
97  if (result!=SQLITE_OK) {
98  sqlite3_close(db);
99  yCError(SUBSCRIBERONSQL, "Failed to set up subscriptions table");
100  std::exit(1);
101  }
102  }
103 
104  const char *create_topic_table = "CREATE TABLE IF NOT EXISTS topics (\n\
105  id INTEGER PRIMARY KEY,\n\
106  topic TEXT,\n\
107  structure TEXT);";
108 
109  result = sqlite3_exec(db, create_topic_table, nullptr, nullptr, nullptr);
110  if (result!=SQLITE_OK) {
111  sqlite3_close(db);
112  yCError(SUBSCRIBERONSQL, "Failed to set up topics table");
113  std::exit(1);
114  }
115 
116  const char *check_topic_size = "PRAGMA table_info(topics)";
117 
118  statement = nullptr;
119  result = sqlite3_prepare_v2(db, check_topic_size, -1, &statement, nullptr);
120  if (result!=SQLITE_OK) {
121  yCError(SUBSCRIBERONSQL, "Failed to set up topics table");
122  std::exit(1);
123  }
124 
125  count = 0;
126  while (sqlite3_step(statement) == SQLITE_ROW) {
127  //sqlite3_column_text(statement,1);
128  count++;
129  }
130  sqlite3_finalize(statement);
131 
132  if (count==2) {
133  const char *add_structure = "ALTER TABLE topics ADD COLUMN structure";
134  result = sqlite3_exec(db, add_structure, nullptr, nullptr, nullptr);
135  if (result!=SQLITE_OK) {
136  sqlite3_close(db);
137  yCError(SUBSCRIBERONSQL, "Failed to set up topics table");
138  std::exit(1);
139  }
140  }
141 
142  const char *create_live_table = "CREATE TABLE IF NOT EXISTS live (\n\
143  id INTEGER PRIMARY KEY,\n\
144  name TEXT UNIQUE,\n\
145  stamp DATETIME);";
146 
147  result = sqlite3_exec(db, create_live_table, nullptr, nullptr, nullptr);
148  if (result!=SQLITE_OK) {
149  sqlite3_close(db);
150  yCError(SUBSCRIBERONSQL, "Failed to set up live table");
151  std::exit(1);
152  }
153 
154  const char *create_struct_table = "CREATE TABLE IF NOT EXISTS structures (\n\
155  name TEXT PRIMARY KEY,\n\
156  yarp TEXT);";
157 
158  result = sqlite3_exec(db, create_struct_table, nullptr, nullptr, nullptr);
159  if (result!=SQLITE_OK) {
160  sqlite3_close(db);
161  yCError(SUBSCRIBERONSQL, "Failed to set up structures table");
162  std::exit(1);
163  }
164 
165  implementation = db;
166  return true;
167 }
168 
169 
170 bool SubscriberOnSql::close() {
171  if (implementation != nullptr) {
172  auto* db = (sqlite3 *)implementation;
173  sqlite3_close(db);
174  implementation = nullptr;
175  }
176  return true;
177 }
178 
179 bool SubscriberOnSql::addSubscription(const std::string& src,
180  const std::string& dest,
181  const std::string& mode) {
182  removeSubscription(src,dest);
183  ParseName psrc, pdest;
184  psrc.apply(src);
185  pdest.apply(dest);
186  if (psrc.getCarrier()=="topic") {
187  setTopic(psrc.getPortName(),"",true);
188  }
189  if (pdest.getCarrier()=="topic") {
190  setTopic(pdest.getPortName(),"",true);
191  }
192  char *msg = nullptr;
193  const char *zmode = mode.c_str();
194  if (mode == "") {
195  zmode = nullptr;
196  }
197  char *query = sqlite3_mprintf("INSERT INTO subscriptions (src,dest,srcFull,destFull,mode) VALUES(%Q,%Q,%Q,%Q,%Q)",
198  psrc.getPortName().c_str(),
199  pdest.getPortName().c_str(),
200  src.c_str(),
201  dest.c_str(),
202  zmode);
203  yCDebug(SUBSCRIBERONSQL, "Query: %s", query);
204 
205  bool ok = true;
206  int result = sqlite3_exec(SQLDB(implementation), query, nullptr, nullptr, &msg);
207  if (result!=SQLITE_OK) {
208  ok = false;
209  if (msg != nullptr) {
210  yCError(SUBSCRIBERONSQL, "%s", msg);
211  sqlite3_free(msg);
212  }
213  }
214  sqlite3_free(query);
215  if (ok) {
216  if (psrc.getCarrier()!="topic") {
217  if (pdest.getCarrier()!="topic") {
218  checkSubscription(psrc.getPortName(),
219  pdest.getPortName(),
220  src,
221  dest,
222  mode);
223  } else {
224  hookup(psrc.getPortName());
225  }
226  } else {
227  if (pdest.getCarrier()!="topic") {
228  hookup(pdest.getPortName());
229  }
230  }
231  }
232  return ok;
233 }
234 
235 bool SubscriberOnSql::removeSubscription(const std::string& src,
236  const std::string& dest) {
237  ParseName psrc, pdest;
238  psrc.apply(src);
239  pdest.apply(dest);
240  char *query = sqlite3_mprintf("DELETE FROM subscriptions WHERE src = %Q AND dest = %Q",
241  psrc.getPortName().c_str(),
242  pdest.getPortName().c_str());
243  yCDebug(SUBSCRIBERONSQL, "Query: %s", query);
244 
245  int result = sqlite3_exec(SQLDB(implementation), query, nullptr, nullptr, nullptr);
246  bool ok = true;
247  if (result!=SQLITE_OK) {
248  yCError(SUBSCRIBERONSQL, "Error in query");
249  ok = false;
250  }
251  sqlite3_free(query);
252 
253  return ok;
254 }
255 
256 
257 bool SubscriberOnSql::welcome(const std::string& port, int activity) {
258  mutex.lock();
259 
260  NameSpace *ns = getDelegate();
261  if (ns) {
262  NestedContact nc(port);
263  if (nc.getNestedName().size()>0) {
264  NameStore *store = getStore();
265  if (store != nullptr) {
266  Contact node = store->query(nc.getNodeName());
267  Contact me = store->query(port);
268  if (node.isValid() && me.isValid()) {
269  if (activity>0) {
270  ns->registerAdvanced(me,store);
271  } else {
272  ns->unregisterAdvanced(port,store);
273  }
274  }
275  }
276  }
277  }
278 
279  char *msg = nullptr;
280  char *query;
281  if (activity>0) {
282  query = sqlite3_mprintf("INSERT OR IGNORE INTO live (name,stamp) VALUES(%Q,DATETIME('now'))",
283  port.c_str());
284  } else {
285  // Port not responding. Mark as non-live.
286  if (activity==0) {
287  query = sqlite3_mprintf("DELETE FROM live WHERE name=%Q AND stamp < DATETIME('now','-30 seconds')",
288  port.c_str());
289  } else {
290  // activity = -1 -- definite dodo
291  query = sqlite3_mprintf("DELETE FROM live WHERE name=%Q",
292  port.c_str());
293  }
294  }
295  yCDebug(SUBSCRIBERONSQL, "Query: %s", query);
296 
297  bool ok = true;
298  int result = sqlite3_exec(SQLDB(implementation), query, nullptr, nullptr, &msg);
299  if (result!=SQLITE_OK) {
300  ok = false;
301  if (msg != nullptr) {
302  yCError(SUBSCRIBERONSQL, "%s", msg);
303  sqlite3_free(msg);
304  }
305  }
306  sqlite3_free(query);
307  mutex.unlock();
308 
309  if (activity>0) {
310  hookup(port);
311  } else if (activity<0) {
312  breakdown(port);
313  }
314  return ok;
315 }
316 
317 bool SubscriberOnSql::hookup(const std::string& port) {
318  if (getDelegate()) {
319  NestedContact nc(port);
320  if (nc.getNestedName().size()>0) {
321  return false;
322  }
323  }
324  mutex.lock();
325  sqlite3_stmt *statement = nullptr;
326  char *query = nullptr;
327  //query = sqlite3_mprintf("SELECT * FROM subscriptions WHERE src = %Q OR dest= %Q",port, port);
328  query = sqlite3_mprintf("SELECT src,dest,srcFull,destFull FROM subscriptions WHERE (src = %Q OR dest= %Q) AND EXISTS (SELECT NULL FROM live WHERE name=src) AND EXISTS (SELECT NULL FROM live WHERE name=dest) UNION SELECT s1.src, s2.dest, s1.srcFull, s2.destFull FROM subscriptions s1, subscriptions s2, topics t WHERE (s1.dest = t.topic AND s2.src = t.topic) AND (s1.src = %Q OR s2.dest = %Q) AND EXISTS (SELECT NULL FROM live WHERE name=s1.src) AND EXISTS (SELECT NULL FROM live WHERE name=s2.dest)",port.c_str(), port.c_str(), port.c_str(), port.c_str());
329  //
330  yCDebug(SUBSCRIBERONSQL, "Query: %s", query);
331 
332  int result = sqlite3_prepare_v2(SQLDB(implementation), query, -1, &statement, nullptr);
333  if (result!=SQLITE_OK) {
334  const char *msg = sqlite3_errmsg(SQLDB(implementation));
335  if (msg != nullptr) {
336  yCError(SUBSCRIBERONSQL, "%s", msg);
337  }
338  }
339  while (result == SQLITE_OK && sqlite3_step(statement) == SQLITE_ROW) {
340  char *src = (char *)sqlite3_column_text(statement,0);
341  char *dest = (char *)sqlite3_column_text(statement,1);
342  char *srcFull = (char *)sqlite3_column_text(statement,2);
343  char *destFull = (char *)sqlite3_column_text(statement,3);
344  char *mode = (char *)sqlite3_column_text(statement,4);
345  checkSubscription(src,dest,srcFull,destFull,mode?mode:"");
346  }
347  sqlite3_finalize(statement);
348  sqlite3_free(query);
349  mutex.unlock();
350 
351  return false;
352 }
353 
354 
355 bool SubscriberOnSql::breakdown(const std::string& port) {
356  if (getDelegate()) {
357  NestedContact nc(port);
358  if (nc.getNestedName().size()>0) {
359  return false;
360  }
361  }
362  mutex.lock();
363  sqlite3_stmt *statement = nullptr;
364  char *query = nullptr;
365  // query = sqlite3_mprintf("SELECT src,dest,srcFull,destFull,mode FROM subscriptions WHERE ((src = %Q AND EXISTS (SELECT NULL FROM live WHERE name=dest)) OR (dest = %Q AND EXISTS (SELECT NULL FROM live WHERE name=src))) UNION SELECT s1.src, s2.dest, s1.srcFull, s2.destFull, NULL FROM subscriptions s1, subscriptions s2, topics t WHERE (s1.dest = t.topic AND s2.src = t.topic AND ((s1.src = %Q AND EXISTS (SELECT NULL FROM live WHERE name=s2.dest)) OR (s2.dest = %Q AND EXISTS (SELECT NULL FROM live WHERE name=s1.src))))",port, port, port, port);
366  query = sqlite3_mprintf("SELECT src,dest,srcFull,destFull,mode FROM subscriptions WHERE ((src = %Q AND (mode IS NOT NULL OR EXISTS (SELECT NULL FROM live WHERE name=dest))) OR (dest = %Q AND (mode IS NOT NULL OR EXISTS (SELECT NULL FROM live WHERE name=src)))) UNION SELECT s1.src, s2.dest, s1.srcFull, s2.destFull, NULL FROM subscriptions s1, subscriptions s2, topics t WHERE (s1.dest = t.topic AND s2.src = t.topic AND ((s1.src = %Q AND EXISTS (SELECT NULL FROM live WHERE name=s2.dest)) OR (s2.dest = %Q AND EXISTS (SELECT NULL FROM live WHERE name=s1.src))))",port.c_str(), port.c_str(), port.c_str(), port.c_str());
367  yCDebug(SUBSCRIBERONSQL, "Query: %s", query);
368 
369  int result = sqlite3_prepare_v2(SQLDB(implementation), query, -1, &statement, nullptr);
370  if (result!=SQLITE_OK) {
371  const char *msg = sqlite3_errmsg(SQLDB(implementation));
372  if (msg != nullptr) {
373  yCError(SUBSCRIBERONSQL, "%s", msg);
374  }
375  }
376  while (result == SQLITE_OK && sqlite3_step(statement) == SQLITE_ROW) {
377  char *src = (char *)sqlite3_column_text(statement,0);
378  char *dest = (char *)sqlite3_column_text(statement,1);
379  char *srcFull = (char *)sqlite3_column_text(statement,2);
380  char *destFull = (char *)sqlite3_column_text(statement,3);
381  char *mode = (char *)sqlite3_column_text(statement,4);
382  breakSubscription(port,src,dest,srcFull,destFull,mode?mode:"");
383  }
384  sqlite3_finalize(statement);
385  sqlite3_free(query);
386  mutex.unlock();
387 
388  return false;
389 }
390 
391 
392 bool SubscriberOnSql::checkSubscription(const std::string& src,const std::string& dest,
393  const std::string& srcFull,
394  const std::string& destFull,
395  const std::string& mode) {
396  if (getDelegate()) {
397  NestedContact nc(src);
398  if (nc.getNestedName().size()>0) {
399  NestedContact nc(dest);
400  if (nc.getNestedName().size()>0) {
401  return false;
402  }
403  }
404  }
405  yCDebug(SUBSCRIBERONSQL,
406  "+++ Checking %s %s / %s %s",
407  src.c_str(),
408  dest.c_str(),
409  srcFull.c_str(),
410  destFull.c_str());
411 
412  NameStore *store = getStore();
413  if (store != nullptr) {
414  Contact csrc = store->query(src);
415  Contact cdest = store->query(dest);
416  if (csrc.isValid()&&cdest.isValid()) {
417  bool srcTopic = (csrc.getCarrier()=="topic");
418  bool destTopic = (cdest.getCarrier()=="topic");
419  if (!(srcTopic||destTopic)) {
420  yCDebug(SUBSCRIBERONSQL,
421  "++> check connection %s %s",
422  srcFull.c_str(),
423  destFull.c_str());
424  connect(srcFull,destFull);
425  }
426  }
427  if (mode!="") {
428  std::string mode_name = mode;
429  if (mode_name=="from") {
430  if (!csrc.isValid()) {
431  removeSubscription(src,dest);
432  }
433  } else if (mode_name=="to") {
434  if (!cdest.isValid()) {
435  removeSubscription(src,dest);
436  }
437  }
438  }
439  }
440  return false;
441 }
442 
443 
444 bool SubscriberOnSql::breakSubscription(const std::string& dropper,
445  const std::string& src, const std::string& dest,
446  const std::string& srcFull,
447  const std::string& destFull,
448  const std::string& mode) {
449  if (getDelegate()) {
450  NestedContact nc(src);
451  if (nc.getNestedName().size()>0) {
452  NestedContact nc(dest);
453  if (nc.getNestedName().size()>0) {
454  return false;
455  }
456  }
457  }
458  yCDebug(SUBSCRIBERONSQL,
459  "--- Checking %s %s / %s %s",
460  src.c_str(),
461  dest.c_str(),
462  srcFull.c_str(),
463  destFull.c_str());
464  NameStore *store = getStore();
465  if (store != nullptr) {
466  bool srcDrop = std::string(dropper) == src;
467  Contact contact;
468  if (srcDrop) {
469  contact = store->query(src);
470  } else {
471  contact = store->query(dest);
472  }
473  if (contact.isValid()) {
474  yCDebug(SUBSCRIBERONSQL,
475  "--> check connection %s %s",
476  srcFull.c_str(),
477  destFull.c_str());
478  disconnect(srcFull,destFull,srcDrop);
479  }
480  if (mode!="") {
481  std::string mode_name = mode;
482  if (mode_name=="from") {
483  if (srcDrop) {
484  removeSubscription(src,dest);
485  }
486  } else if (mode_name=="to") {
487  if (!srcDrop) {
488  removeSubscription(src,dest);
489  }
490  }
491  }
492  }
493  return false;
494 }
495 
496 
497 
498 bool SubscriberOnSql::listSubscriptions(const std::string& port,
499  yarp::os::Bottle& reply) {
500  mutex.lock();
501  sqlite3_stmt *statement = nullptr;
502  char *query = nullptr;
503  if (std::string(port)!="") {
504  query = sqlite3_mprintf("SELECT s.srcFull, s.DestFull, EXISTS(SELECT topic FROM topics WHERE topic = s.src), EXISTS(SELECT topic FROM topics WHERE topic = s.dest), s.mode FROM subscriptions s WHERE s.src = %Q OR s.dest= %Q ORDER BY s.src, s.dest",port.c_str(),port.c_str());
505  } else {
506  query = sqlite3_mprintf("SELECT s.srcFull, s.destFull, EXISTS(SELECT topic FROM topics WHERE topic = s.src), EXISTS(SELECT topic FROM topics WHERE topic = s.dest), s.mode FROM subscriptions s ORDER BY s.src, s.dest");
507  }
508  yCDebug(SUBSCRIBERONSQL, "Query: %s", query);
509 
510  int result = sqlite3_prepare_v2(SQLDB(implementation), query, -1, &statement, nullptr);
511  if (result!=SQLITE_OK) {
512  const char *msg = sqlite3_errmsg(SQLDB(implementation));
513  if (msg != nullptr) {
514  yCError(SUBSCRIBERONSQL, "%s", msg);
515  }
516  }
517  reply.addString("subscriptions");
518  while (result == SQLITE_OK && sqlite3_step(statement) == SQLITE_ROW) {
519  char *src = (char *)sqlite3_column_text(statement,0);
520  char *dest = (char *)sqlite3_column_text(statement,1);
521  int srcTopic = sqlite3_column_int(statement,2);
522  int destTopic = sqlite3_column_int(statement,3);
523  char *mode = (char *)sqlite3_column_text(statement,4);
524  Bottle& b = reply.addList();
525  b.addString("subscription");
526  Bottle bsrc;
527  bsrc.addString("src");
528  bsrc.addString(src);
529  Bottle bdest;
530  bdest.addString("dest");
531  bdest.addString(dest);
532  b.addList() = bsrc;
533  b.addList() = bdest;
534  if (mode != nullptr) {
535  if (mode[0]!='\0') {
536  Bottle bmode;
537  bmode.addString("mode");
538  bmode.addString(mode);
539  b.addList() = bmode;
540  }
541  }
542  if (srcTopic||destTopic) {
543  Bottle btopic;
544  btopic.addString("topic");
545  btopic.addInt32(srcTopic);
546  btopic.addInt32(destTopic);
547  b.addList() = btopic;
548  }
549  }
550  sqlite3_finalize(statement);
551  sqlite3_free(query);
552  mutex.unlock();
553 
554  return true;
555 }
556 
557 
558 bool SubscriberOnSql::setTopic(const std::string& port, const std::string& structure,
559  bool active) {
560  if (structure!="" || !active) {
561  mutex.lock();
562  char *query = sqlite3_mprintf("DELETE FROM topics WHERE topic = %Q",
563  port.c_str());
564  yCDebug(SUBSCRIBERONSQL, "Query: %s", query);
565 
566  int result = sqlite3_exec(SQLDB(implementation), query, nullptr, nullptr, nullptr);
567  bool ok = true;
568  if (result!=SQLITE_OK) {
569  yCError(SUBSCRIBERONSQL, "Error in query");
570  ok = false;
571  }
572  sqlite3_free(query);
573  mutex.unlock();
574  if (!ok) {
575  return false;
576  }
577  if (!active) {
578  return true;
579  }
580  }
581 
582  bool have_topic = false;
583  if (structure=="") {
584  mutex.lock();
585  sqlite3_stmt *statement = nullptr;
586  char *query = nullptr;
587  query = sqlite3_mprintf("SELECT topic FROM topics WHERE topic = %Q",
588  port.c_str());
589  yCDebug(SUBSCRIBERONSQL, "Query: %s", query);
590 
591  int result = sqlite3_prepare_v2(SQLDB(implementation), query, -1, &statement, nullptr);
592  if (result!=SQLITE_OK) {
593  yCError(SUBSCRIBERONSQL, "Error in query");
594  }
595  if (result == SQLITE_OK && sqlite3_step(statement) == SQLITE_ROW) {
596  have_topic = true;
597  }
598  sqlite3_finalize(statement);
599  sqlite3_free(query);
600  mutex.unlock();
601  }
602 
603  if (structure!="" || !have_topic) {
604  mutex.lock();
605  char *msg = nullptr;
606  const char *pstructure = structure.c_str();
607  if (structure == "") {
608  pstructure = nullptr;
609  }
610  char *query = sqlite3_mprintf("INSERT INTO topics (topic,structure) VALUES(%Q,%Q)",
611  port.c_str(),pstructure);
612  yCDebug(SUBSCRIBERONSQL, "Query: %s", query);
613 
614  bool ok = true;
615  int result = sqlite3_exec(SQLDB(implementation), query, nullptr, nullptr, &msg);
616  if (result!=SQLITE_OK) {
617  ok = false;
618  if (msg != nullptr) {
619  yCError(SUBSCRIBERONSQL, "%s", msg);
620  sqlite3_free(msg);
621  }
622  }
623  sqlite3_free(query);
624  mutex.unlock();
625  if (!ok) {
626  return false;
627  }
628  }
629 
630  std::vector<std::vector<std::string> > subs;
631 
632  // go ahead and connect anything needed
633  mutex.lock();
634  sqlite3_stmt *statement = nullptr;
635  char *query = sqlite3_mprintf("SELECT s1.src, s2.dest, s1.srcFull, s2.destFull FROM subscriptions s1, subscriptions s2, topics t WHERE (t.topic = %Q AND s1.dest = t.topic AND s2.src = t.topic)", port.c_str());
636  yCDebug(SUBSCRIBERONSQL, "Query: %s", query);
637 
638  int result = sqlite3_prepare_v2(SQLDB(implementation), query, -1, &statement, nullptr);
639  if (result!=SQLITE_OK) {
640  const char *msg = sqlite3_errmsg(SQLDB(implementation));
641  if (msg != nullptr) {
642  yCError(SUBSCRIBERONSQL, "%s", msg);
643  }
644  }
645  while (result == SQLITE_OK &&
646  sqlite3_step(statement) == SQLITE_ROW) {
647  char *src = (char *)sqlite3_column_text(statement,0);
648  char *dest = (char *)sqlite3_column_text(statement,1);
649  char *srcFull = (char *)sqlite3_column_text(statement,2);
650  char *destFull = (char *)sqlite3_column_text(statement,3);
651  char *mode = (char *)sqlite3_column_text(statement,4);
652  std::vector<std::string> sub;
653  sub.emplace_back(src);
654  sub.emplace_back(dest);
655  sub.emplace_back(srcFull);
656  sub.emplace_back(destFull);
657  sub.emplace_back(mode?mode:"");
658  subs.push_back(sub);
659  }
660  sqlite3_finalize(statement);
661  sqlite3_free(query);
662  mutex.unlock();
663 
664  for (auto& sub : subs) {
665  checkSubscription(sub[0],sub[1],sub[2],sub[3],sub[4]);
666  }
667 
668  return true;
669 }
670 
671 
672 bool SubscriberOnSql::listTopics(yarp::os::Bottle& topics) {
673  mutex.lock();
674  sqlite3_stmt *statement = nullptr;
675  char *query = nullptr;
676  query = sqlite3_mprintf("SELECT topic FROM topics");
677  yCDebug(SUBSCRIBERONSQL, "Query: %s", query);
678 
679  int result = sqlite3_prepare_v2(SQLDB(implementation), query, -1, &statement, nullptr);
680  if (result!=SQLITE_OK) {
681  yCError(SUBSCRIBERONSQL, "Error in query");
682  }
683  while (result == SQLITE_OK && sqlite3_step(statement) == SQLITE_ROW) {
684  char *topic = (char *)sqlite3_column_text(statement,0);
685  topics.addString(topic);
686  }
687  sqlite3_finalize(statement);
688  sqlite3_free(query);
689  mutex.unlock();
690 
691  return true;
692 }
693 
694 
695 bool SubscriberOnSql::setType(const std::string& family,
696  const std::string& structure,
697  const std::string& value) {
698  mutex.lock();
699  char *msg = nullptr;
700  char *query = sqlite3_mprintf("INSERT OR REPLACE INTO structures (name,%Q) VALUES(%Q,%Q)",
701  family.c_str(),
702  (structure=="") ? nullptr : structure.c_str(),
703  value.c_str());
704  yCDebug(SUBSCRIBERONSQL, "Query: %s", query);
705 
706  bool ok = true;
707  int result = sqlite3_exec(SQLDB(implementation), query, nullptr, nullptr, &msg);
708  if (result!=SQLITE_OK) {
709  ok = false;
710  if (msg != nullptr) {
711  yCError(SUBSCRIBERONSQL, "%s", msg);
712  sqlite3_free(msg);
713  }
714  }
715  sqlite3_free(query);
716  mutex.unlock();
717  return ok;
718 }
719 
720 std::string SubscriberOnSql::getType(const std::string& family,
721  const std::string& structure) {
722  mutex.lock();
723  sqlite3_stmt *statement = nullptr;
724  char *query = nullptr;
725  query = sqlite3_mprintf("SELECT %s FROM structures WHERE name = %Q",
726  family.c_str(), structure.c_str());
727  yCDebug(SUBSCRIBERONSQL, "Query: %s", query);
728 
729  int result = sqlite3_prepare_v2(SQLDB(implementation), query, -1, &statement, nullptr);
730  std::string sresult;
731  if (result!=SQLITE_OK) {
732  yCError(SUBSCRIBERONSQL, "Error in query");
733  }
734  if (result == SQLITE_OK && sqlite3_step(statement) == SQLITE_ROW) {
735  sresult = (const char *)sqlite3_column_text(statement,0);
736  }
737  sqlite3_finalize(statement);
738  sqlite3_free(query);
739  mutex.unlock();
740 
741  return sresult;
742 }
RandScalar * implementation(void *t)
Definition: RandnScalar.cpp:17
#define SQLDB(x)
#define F_OK
A simple collection of objects that can be described and transmitted in a portable way.
Definition: Bottle.h:74
Bottle & addList()
Places an empty nested list in the bottle, at the end of the list.
Definition: Bottle.cpp:182
void addInt32(std::int32_t x)
Places a 32-bit integer in the bottle, at the end of the list.
Definition: Bottle.cpp:140
void addString(const char *str)
Places a string in the bottle, at the end of the list.
Definition: Bottle.cpp:170
Represents how to reach a part of a YARP network.
Definition: Contact.h:36
bool isValid() const
Checks if a Contact is tagged as valid.
Definition: Contact.cpp:298
std::string getCarrier() const
Get the carrier associated with this Contact for socket communication.
Definition: Contact.cpp:250
An abstract name space for ports.
Definition: NameSpace.h:23
virtual Contact unregisterAdvanced(const std::string &name, NameStore *store)
Remove contact information, with access to the contact information of other ports for cross-referencing.
Definition: NameSpace.h:104
virtual Contact registerAdvanced(const Contact &contact, NameStore *store)
Record contact information, with access to the contact information of other ports for cross-referencing.
Definition: NameSpace.h:93
Abstract interface for a database of port names.
Definition: NameStore.h:20
virtual Contact query(const std::string &name)=0
A placeholder for rich contact information.
Definition: NestedContact.h:24
std::string getNodeName() const
std::string getNestedName() const
void apply(const std::string &str)
Definition: ParseName.cpp:13
#define yCError(component,...)
Definition: LogComponent.h:154
#define yCWarning(component,...)
Definition: LogComponent.h:143
#define yCDebug(component,...)
Definition: LogComponent.h:109
#define YARP_SERVERSQL_LOG_COMPONENT(name, name_string)
Definition: LogComponent.h:34
An interface to the operating system, including Port based communication.