21 #define access(f,a) _access(f,a)
31 #define SQLDB(x) ((sqlite3*)(x))
41 bool SubscriberOnSql::open(
const std::string& filename,
bool fresh) {
42 sqlite3 *db =
nullptr;
44 int result = access(filename.c_str(),
F_OK);
46 yCWarning(SUBSCRIBERONSQL,
"Database needs to be recreated.");
47 yCWarning(SUBSCRIBERONSQL,
"Please move %s out of the way.", filename.c_str());
52 int result = sqlite3_open_v2(filename.c_str(),
54 SQLITE_OPEN_READWRITE|SQLITE_OPEN_CREATE|SQLITE_OPEN_NOMUTEX,
56 if (result!=SQLITE_OK) {
57 yCError(SUBSCRIBERONSQL,
"Failed to open database %s", filename.c_str());
64 const char *create_subscribe_table =
"CREATE TABLE IF NOT EXISTS subscriptions (\n\
65 id INTEGER PRIMARY KEY,\n\
72 result = sqlite3_exec(db, create_subscribe_table,
nullptr,
nullptr,
nullptr);
73 if (result!=SQLITE_OK) {
75 yCError(SUBSCRIBERONSQL,
"Failed to set up subscriptions table");
79 const char *check_subscriptions_size =
"PRAGMA table_info(subscriptions)";
81 sqlite3_stmt *statement =
nullptr;
82 result = sqlite3_prepare_v2(db, check_subscriptions_size, -1, &statement,
nullptr);
83 if (result!=SQLITE_OK) {
84 yCError(SUBSCRIBERONSQL,
"Failed to set up subscriptions table");
89 while (sqlite3_step(statement) == SQLITE_ROW) {
92 sqlite3_finalize(statement);
95 const char *add_structure =
"ALTER TABLE subscriptions ADD COLUMN mode";
96 result = sqlite3_exec(db, add_structure,
nullptr,
nullptr,
nullptr);
97 if (result!=SQLITE_OK) {
99 yCError(SUBSCRIBERONSQL,
"Failed to set up subscriptions table");
104 const char *create_topic_table =
"CREATE TABLE IF NOT EXISTS topics (\n\
105 id INTEGER PRIMARY KEY,\n\
109 result = sqlite3_exec(db, create_topic_table,
nullptr,
nullptr,
nullptr);
110 if (result!=SQLITE_OK) {
112 yCError(SUBSCRIBERONSQL,
"Failed to set up topics table");
116 const char *check_topic_size =
"PRAGMA table_info(topics)";
119 result = sqlite3_prepare_v2(db, check_topic_size, -1, &statement,
nullptr);
120 if (result!=SQLITE_OK) {
121 yCError(SUBSCRIBERONSQL,
"Failed to set up topics table");
126 while (sqlite3_step(statement) == SQLITE_ROW) {
130 sqlite3_finalize(statement);
133 const char *add_structure =
"ALTER TABLE topics ADD COLUMN structure";
134 result = sqlite3_exec(db, add_structure,
nullptr,
nullptr,
nullptr);
135 if (result!=SQLITE_OK) {
137 yCError(SUBSCRIBERONSQL,
"Failed to set up topics table");
142 const char *create_live_table =
"CREATE TABLE IF NOT EXISTS live (\n\
143 id INTEGER PRIMARY KEY,\n\
147 result = sqlite3_exec(db, create_live_table,
nullptr,
nullptr,
nullptr);
148 if (result!=SQLITE_OK) {
150 yCError(SUBSCRIBERONSQL,
"Failed to set up live table");
154 const char *create_struct_table =
"CREATE TABLE IF NOT EXISTS structures (\n\
155 name TEXT PRIMARY KEY,\n\
158 result = sqlite3_exec(db, create_struct_table,
nullptr,
nullptr,
nullptr);
159 if (result!=SQLITE_OK) {
161 yCError(SUBSCRIBERONSQL,
"Failed to set up structures table");
170 bool SubscriberOnSql::close() {
179 bool SubscriberOnSql::addSubscription(
const std::string& src,
180 const std::string& dest,
181 const std::string& mode) {
182 removeSubscription(src,dest);
193 const char *zmode = mode.c_str();
197 char *query = sqlite3_mprintf(
"INSERT INTO subscriptions (src,dest,srcFull,destFull,mode) VALUES(%Q,%Q,%Q,%Q,%Q)",
203 yCDebug(SUBSCRIBERONSQL,
"Query: %s", query);
207 if (result!=SQLITE_OK) {
209 if (msg !=
nullptr) {
210 yCError(SUBSCRIBERONSQL,
"%s", msg);
235 bool SubscriberOnSql::removeSubscription(
const std::string& src,
236 const std::string& dest) {
240 char *query = sqlite3_mprintf(
"DELETE FROM subscriptions WHERE src = %Q AND dest = %Q",
243 yCDebug(SUBSCRIBERONSQL,
"Query: %s", query);
247 if (result!=SQLITE_OK) {
248 yCError(SUBSCRIBERONSQL,
"Error in query");
257 bool SubscriberOnSql::welcome(
const std::string& port,
int activity) {
265 if (store !=
nullptr) {
282 query = sqlite3_mprintf(
"INSERT OR IGNORE INTO live (name,stamp) VALUES(%Q,DATETIME('now'))",
287 query = sqlite3_mprintf(
"DELETE FROM live WHERE name=%Q AND stamp < DATETIME('now','-30 seconds')",
291 query = sqlite3_mprintf(
"DELETE FROM live WHERE name=%Q",
295 yCDebug(SUBSCRIBERONSQL,
"Query: %s", query);
299 if (result!=SQLITE_OK) {
301 if (msg !=
nullptr) {
302 yCError(SUBSCRIBERONSQL,
"%s", msg);
311 }
else if (activity<0) {
317 bool SubscriberOnSql::hookup(
const std::string& port) {
325 sqlite3_stmt *statement =
nullptr;
326 char *query =
nullptr;
328 query = sqlite3_mprintf(
"SELECT src,dest,srcFull,destFull FROM subscriptions WHERE (src = %Q OR dest= %Q) AND EXISTS (SELECT NULL FROM live WHERE name=src) AND EXISTS (SELECT NULL FROM live WHERE name=dest) UNION SELECT s1.src, s2.dest, s1.srcFull, s2.destFull FROM subscriptions s1, subscriptions s2, topics t WHERE (s1.dest = t.topic AND s2.src = t.topic) AND (s1.src = %Q OR s2.dest = %Q) AND EXISTS (SELECT NULL FROM live WHERE name=s1.src) AND EXISTS (SELECT NULL FROM live WHERE name=s2.dest)",port.c_str(), port.c_str(), port.c_str(), port.c_str());
330 yCDebug(SUBSCRIBERONSQL,
"Query: %s", query);
333 if (result!=SQLITE_OK) {
335 if (msg !=
nullptr) {
336 yCError(SUBSCRIBERONSQL,
"%s", msg);
339 while (result == SQLITE_OK && sqlite3_step(statement) == SQLITE_ROW) {
340 char *src = (
char *)sqlite3_column_text(statement,0);
341 char *dest = (
char *)sqlite3_column_text(statement,1);
342 char *srcFull = (
char *)sqlite3_column_text(statement,2);
343 char *destFull = (
char *)sqlite3_column_text(statement,3);
344 char *mode = (
char *)sqlite3_column_text(statement,4);
345 checkSubscription(src,dest,srcFull,destFull,mode?mode:
"");
347 sqlite3_finalize(statement);
355 bool SubscriberOnSql::breakdown(
const std::string& port) {
363 sqlite3_stmt *statement =
nullptr;
364 char *query =
nullptr;
366 query = sqlite3_mprintf(
"SELECT src,dest,srcFull,destFull,mode FROM subscriptions WHERE ((src = %Q AND (mode IS NOT NULL OR EXISTS (SELECT NULL FROM live WHERE name=dest))) OR (dest = %Q AND (mode IS NOT NULL OR EXISTS (SELECT NULL FROM live WHERE name=src)))) UNION SELECT s1.src, s2.dest, s1.srcFull, s2.destFull, NULL FROM subscriptions s1, subscriptions s2, topics t WHERE (s1.dest = t.topic AND s2.src = t.topic AND ((s1.src = %Q AND EXISTS (SELECT NULL FROM live WHERE name=s2.dest)) OR (s2.dest = %Q AND EXISTS (SELECT NULL FROM live WHERE name=s1.src))))",port.c_str(), port.c_str(), port.c_str(), port.c_str());
367 yCDebug(SUBSCRIBERONSQL,
"Query: %s", query);
370 if (result!=SQLITE_OK) {
372 if (msg !=
nullptr) {
373 yCError(SUBSCRIBERONSQL,
"%s", msg);
376 while (result == SQLITE_OK && sqlite3_step(statement) == SQLITE_ROW) {
377 char *src = (
char *)sqlite3_column_text(statement,0);
378 char *dest = (
char *)sqlite3_column_text(statement,1);
379 char *srcFull = (
char *)sqlite3_column_text(statement,2);
380 char *destFull = (
char *)sqlite3_column_text(statement,3);
381 char *mode = (
char *)sqlite3_column_text(statement,4);
382 breakSubscription(port,src,dest,srcFull,destFull,mode?mode:
"");
384 sqlite3_finalize(statement);
392 bool SubscriberOnSql::checkSubscription(
const std::string& src,
const std::string& dest,
393 const std::string& srcFull,
394 const std::string& destFull,
395 const std::string& mode) {
406 "+++ Checking %s %s / %s %s",
413 if (store !=
nullptr) {
418 bool destTopic = (cdest.
getCarrier()==
"topic");
419 if (!(srcTopic||destTopic)) {
421 "++> check connection %s %s",
424 connect(srcFull,destFull);
428 std::string mode_name = mode;
429 if (mode_name==
"from") {
431 removeSubscription(src,dest);
433 }
else if (mode_name==
"to") {
435 removeSubscription(src,dest);
444 bool SubscriberOnSql::breakSubscription(
const std::string& dropper,
445 const std::string& src,
const std::string& dest,
446 const std::string& srcFull,
447 const std::string& destFull,
448 const std::string& mode) {
459 "--- Checking %s %s / %s %s",
465 if (store !=
nullptr) {
466 bool srcDrop = std::string(dropper) == src;
469 contact = store->
query(src);
471 contact = store->
query(dest);
475 "--> check connection %s %s",
478 disconnect(srcFull,destFull,srcDrop);
481 std::string mode_name = mode;
482 if (mode_name==
"from") {
484 removeSubscription(src,dest);
486 }
else if (mode_name==
"to") {
488 removeSubscription(src,dest);
498 bool SubscriberOnSql::listSubscriptions(
const std::string& port,
501 sqlite3_stmt *statement =
nullptr;
502 char *query =
nullptr;
503 if (std::string(port)!=
"") {
504 query = sqlite3_mprintf(
"SELECT s.srcFull, s.DestFull, EXISTS(SELECT topic FROM topics WHERE topic = s.src), EXISTS(SELECT topic FROM topics WHERE topic = s.dest), s.mode FROM subscriptions s WHERE s.src = %Q OR s.dest= %Q ORDER BY s.src, s.dest",port.c_str(),port.c_str());
506 query = sqlite3_mprintf(
"SELECT s.srcFull, s.destFull, EXISTS(SELECT topic FROM topics WHERE topic = s.src), EXISTS(SELECT topic FROM topics WHERE topic = s.dest), s.mode FROM subscriptions s ORDER BY s.src, s.dest");
508 yCDebug(SUBSCRIBERONSQL,
"Query: %s", query);
511 if (result!=SQLITE_OK) {
513 if (msg !=
nullptr) {
514 yCError(SUBSCRIBERONSQL,
"%s", msg);
518 while (result == SQLITE_OK && sqlite3_step(statement) == SQLITE_ROW) {
519 char *src = (
char *)sqlite3_column_text(statement,0);
520 char *dest = (
char *)sqlite3_column_text(statement,1);
521 int srcTopic = sqlite3_column_int(statement,2);
522 int destTopic = sqlite3_column_int(statement,3);
523 char *mode = (
char *)sqlite3_column_text(statement,4);
534 if (mode !=
nullptr) {
542 if (srcTopic||destTopic) {
550 sqlite3_finalize(statement);
558 bool SubscriberOnSql::setTopic(
const std::string& port,
const std::string& structure,
560 if (structure!=
"" || !active) {
562 char *query = sqlite3_mprintf(
"DELETE FROM topics WHERE topic = %Q",
564 yCDebug(SUBSCRIBERONSQL,
"Query: %s", query);
568 if (result!=SQLITE_OK) {
569 yCError(SUBSCRIBERONSQL,
"Error in query");
582 bool have_topic =
false;
585 sqlite3_stmt *statement =
nullptr;
586 char *query =
nullptr;
587 query = sqlite3_mprintf(
"SELECT topic FROM topics WHERE topic = %Q",
589 yCDebug(SUBSCRIBERONSQL,
"Query: %s", query);
592 if (result!=SQLITE_OK) {
593 yCError(SUBSCRIBERONSQL,
"Error in query");
595 if (result == SQLITE_OK && sqlite3_step(statement) == SQLITE_ROW) {
598 sqlite3_finalize(statement);
603 if (structure!=
"" || !have_topic) {
606 const char *pstructure = structure.c_str();
607 if (structure ==
"") {
608 pstructure =
nullptr;
610 char *query = sqlite3_mprintf(
"INSERT INTO topics (topic,structure) VALUES(%Q,%Q)",
611 port.c_str(),pstructure);
612 yCDebug(SUBSCRIBERONSQL,
"Query: %s", query);
616 if (result!=SQLITE_OK) {
618 if (msg !=
nullptr) {
619 yCError(SUBSCRIBERONSQL,
"%s", msg);
630 std::vector<std::vector<std::string> > subs;
634 sqlite3_stmt *statement =
nullptr;
635 char *query = sqlite3_mprintf(
"SELECT s1.src, s2.dest, s1.srcFull, s2.destFull FROM subscriptions s1, subscriptions s2, topics t WHERE (t.topic = %Q AND s1.dest = t.topic AND s2.src = t.topic)", port.c_str());
636 yCDebug(SUBSCRIBERONSQL,
"Query: %s", query);
639 if (result!=SQLITE_OK) {
641 if (msg !=
nullptr) {
642 yCError(SUBSCRIBERONSQL,
"%s", msg);
645 while (result == SQLITE_OK &&
646 sqlite3_step(statement) == SQLITE_ROW) {
647 char *src = (
char *)sqlite3_column_text(statement,0);
648 char *dest = (
char *)sqlite3_column_text(statement,1);
649 char *srcFull = (
char *)sqlite3_column_text(statement,2);
650 char *destFull = (
char *)sqlite3_column_text(statement,3);
651 char *mode = (
char *)sqlite3_column_text(statement,4);
652 std::vector<std::string> sub;
653 sub.emplace_back(src);
654 sub.emplace_back(dest);
655 sub.emplace_back(srcFull);
656 sub.emplace_back(destFull);
657 sub.emplace_back(mode?mode:
"");
660 sqlite3_finalize(statement);
664 for (
auto& sub : subs) {
665 checkSubscription(sub[0],sub[1],sub[2],sub[3],sub[4]);
674 sqlite3_stmt *statement =
nullptr;
675 char *query =
nullptr;
676 query = sqlite3_mprintf(
"SELECT topic FROM topics");
677 yCDebug(SUBSCRIBERONSQL,
"Query: %s", query);
680 if (result!=SQLITE_OK) {
681 yCError(SUBSCRIBERONSQL,
"Error in query");
683 while (result == SQLITE_OK && sqlite3_step(statement) == SQLITE_ROW) {
684 char *topic = (
char *)sqlite3_column_text(statement,0);
687 sqlite3_finalize(statement);
695 bool SubscriberOnSql::setType(
const std::string& family,
696 const std::string& structure,
697 const std::string& value) {
700 char *query = sqlite3_mprintf(
"INSERT OR REPLACE INTO structures (name,%Q) VALUES(%Q,%Q)",
702 (structure==
"") ?
nullptr : structure.c_str(),
704 yCDebug(SUBSCRIBERONSQL,
"Query: %s", query);
708 if (result!=SQLITE_OK) {
710 if (msg !=
nullptr) {
711 yCError(SUBSCRIBERONSQL,
"%s", msg);
720 std::string SubscriberOnSql::getType(
const std::string& family,
721 const std::string& structure) {
723 sqlite3_stmt *statement =
nullptr;
724 char *query =
nullptr;
725 query = sqlite3_mprintf(
"SELECT %s FROM structures WHERE name = %Q",
726 family.c_str(), structure.c_str());
727 yCDebug(SUBSCRIBERONSQL,
"Query: %s", query);
731 if (result!=SQLITE_OK) {
732 yCError(SUBSCRIBERONSQL,
"Error in query");
734 if (result == SQLITE_OK && sqlite3_step(statement) == SQLITE_ROW) {
735 sresult = (
const char *)sqlite3_column_text(statement,0);
737 sqlite3_finalize(statement);
RandScalar * implementation(void *t)
A simple collection of objects that can be described and transmitted in a portable way.
Bottle & addList()
Places an empty nested list in the bottle, at the end of the list.
void addInt32(std::int32_t x)
Places a 32-bit integer in the bottle, at the end of the list.
void addString(const char *str)
Places a string in the bottle, at the end of the list.
An abstract name space for ports.
virtual Contact unregisterAdvanced(const std::string &name, NameStore *store)
Remove contact information, with access to the contact information of other ports for cross-referenci...
virtual Contact registerAdvanced(const Contact &contact, NameStore *store)
Record contact information, with access to the contact information of other ports for cross-referenci...
Abstract interface for a database of port names.
virtual Contact query(const std::string &name)=0
void apply(const std::string &str)
std::string getPortName()
#define yCError(component,...)
#define yCWarning(component,...)
#define yCDebug(component,...)
#define YARP_SERVERSQL_LOG_COMPONENT(name, name_string)
An interface to the operating system, including Port based communication.