← Back to team overview

maria-developers team mailing list archive

Re: [Maria-discuss] Known limitation with TokuDB in Read Free Replication & parallel replication ?

 

[Moving the discussion to maria-developers@, hope that is ok/makes sense...]

Ok, so here is a proof-of-concept patch for this, which seems to make TokuDB
work with optimistic parallel replication.

The core of the patch is this line in lock_request.cc:

    lock_wait_callback(callback_data, m_txnid, conflicts.get(i));

which ends up doing this:

    thd_report_wait_for (requesting_thd, blocking_thd);

All the rest of the patch is just plumbing to pass the right information
between the different parts of the code.

I put this on top of Jocelyn Fournier's tokudb_rpl.rpl_parallel_optimistic
patches, and pushed it on my github:

  https://github.com/knielsen/server/tree/toku_opr2

With this patch, the test case passes! So that's promising.

Some things still left to do for this to be a good patch:

 - I think the callback also needs to trigger for a transaction that is
   already waiting, in case another transaction arrives later to contend for
   the same lock but happens to be granted it first. I can look into this.

 - This patch needs linear time (in the number of active transactions) per
   callback to find the THD from the TXNID; maybe that could be optimised.

 - Probably the new callback etc. needs some cleanup to better match TokuDB
   code organisation and style.

 - And testing, of course. I'll definitely need some help there, as I'm not
   familiar with how to run TokuDB efficiently.

Any thoughts or comments?

 - Kristian.

commit aeeec3f1f5447da4d7c466b1859f7261ae5f13be (HEAD, my/toku_opr2, toku_opr2)
Author: Kristian Nielsen <knielsen@xxxxxxxxxxxxxxx>
Date:   Fri Aug 12 11:51:26 2016 +0200

    Initial steps towards TokuDB optimistic parallel replication support (untested)
    
    A callback is registered that can invoke thd_report_wait_for() with
    the THDs that a transaction goes to wait on.

diff --git a/storage/tokudb/PerconaFT/buildheader/make_tdb.cc b/storage/tokudb/PerconaFT/buildheader/make_tdb.cc
index 5c29209..dd1d849 100644
--- a/storage/tokudb/PerconaFT/buildheader/make_tdb.cc
+++ b/storage/tokudb/PerconaFT/buildheader/make_tdb.cc
@@ -405,6 +405,7 @@ static void print_db_env_struct (void) {
                              "int (*set_lock_timeout)                     (DB_ENV *env, uint64_t default_lock_wait_time_msec, uint64_t (*get_lock_wait_time_cb)(uint64_t default_lock_wait_time))",
                              "int (*get_lock_timeout)                     (DB_ENV *env, uint64_t *lock_wait_time_msec)",
                              "int (*set_lock_timeout_callback)            (DB_ENV *env, lock_timeout_callback callback)",
+                             "int (*set_lock_wait_callback)               (DB_ENV *env, lock_wait_callback callback)",
                              "int (*txn_xa_recover)                       (DB_ENV*, TOKU_XA_XID list[/*count*/], long count, /*out*/ long *retp, uint32_t flags)",
                              "int (*get_txn_from_xid)                     (DB_ENV*, /*in*/ TOKU_XA_XID *, /*out*/ DB_TXN **)",
                              "DB* (*get_db_for_directory)                 (DB_ENV*)",
@@ -540,8 +541,9 @@ static void print_db_txn_struct (void) {
 	"int (*abort_with_progress)(DB_TXN*, TXN_PROGRESS_POLL_FUNCTION, void*)",
 	"int (*xa_prepare) (DB_TXN*, TOKU_XA_XID *, uint32_t flags)",
         "uint64_t (*id64) (DB_TXN*)",
-        "void (*set_client_id)(DB_TXN *, uint64_t client_id)",
+        "void (*set_client_id)(DB_TXN *, uint64_t client_id, void *extra)",
         "uint64_t (*get_client_id)(DB_TXN *)",
+        "void * (*get_client_extra)(DB_TXN *)",
         "bool (*is_prepared)(DB_TXN *)",
         "DB_TXN *(*get_child)(DB_TXN *)",
         "uint64_t (*get_start_time)(DB_TXN *)",
@@ -745,6 +747,7 @@ int main (int argc, char *const argv[] __attribute__((__unused__))) {
     printf("void toku_dbt_array_resize(DBT_ARRAY *dbts, uint32_t size) %s;\n", VISIBLE);
 
     printf("typedef void (*lock_timeout_callback)(DB *db, uint64_t requesting_txnid, const DBT *left_key, const DBT *right_key, uint64_t blocking_txnid);\n");
+    printf("typedef void (*lock_wait_callback)(void *arg, uint64_t requesting_txnid, uint64_t blocking_txnid);\n");
     printf("typedef int (*iterate_row_locks_callback)(DB **db, DBT *left_key, DBT *right_key, void *extra);\n");
     printf("typedef int (*iterate_transactions_callback)(DB_TXN *dbtxn, iterate_row_locks_callback cb, void *locks_extra, void *extra);\n");
     printf("typedef int (*iterate_requests_callback)(DB *db, uint64_t requesting_txnid, const DBT *left_key, const DBT *right_key, uint64_t blocking_txnid, uint64_t start_time, void *extra);\n");
diff --git a/storage/tokudb/PerconaFT/ft/txn/txn.cc b/storage/tokudb/PerconaFT/ft/txn/txn.cc
index dd03073..5cc858f 100644
--- a/storage/tokudb/PerconaFT/ft/txn/txn.cc
+++ b/storage/tokudb/PerconaFT/ft/txn/txn.cc
@@ -269,6 +269,7 @@ static txn_child_manager tcm;
         .state = TOKUTXN_LIVE,
         .num_pin = 0,
         .client_id = 0,
+        .client_extra = nullptr,
         .start_time = time(NULL),
     };
 
@@ -709,8 +710,13 @@ uint64_t toku_txn_get_client_id(TOKUTXN txn) {
     return txn->client_id;
 }
 
-void toku_txn_set_client_id(TOKUTXN txn, uint64_t client_id) {
+void *toku_txn_get_client_extra(TOKUTXN txn) {
+    return txn->client_extra;
+}
+
+void toku_txn_set_client_id(TOKUTXN txn, uint64_t client_id, void *extra) {
     txn->client_id = client_id;
+    txn->client_extra = extra;
 }
 
 time_t toku_txn_get_start_time(struct tokutxn *txn) {
diff --git a/storage/tokudb/PerconaFT/ft/txn/txn.h b/storage/tokudb/PerconaFT/ft/txn/txn.h
index 51a4602..96db632 100644
--- a/storage/tokudb/PerconaFT/ft/txn/txn.h
+++ b/storage/tokudb/PerconaFT/ft/txn/txn.h
@@ -193,6 +193,7 @@ struct tokutxn {
     uint32_t num_pin; // number of threads (all hot indexes) that want this
                       // txn to not transition to commit or abort
     uint64_t client_id;
+    void *client_extra;
     time_t start_time;
 };
 typedef struct tokutxn *TOKUTXN;
@@ -294,7 +295,8 @@ void toku_txn_unpin_live_txn(struct tokutxn *txn);
 bool toku_txn_has_spilled_rollback(struct tokutxn *txn);
 
 uint64_t toku_txn_get_client_id(struct tokutxn *txn);
-void toku_txn_set_client_id(struct tokutxn *txn, uint64_t client_id);
+void *toku_txn_get_client_extra(struct tokutxn *txn);
+void toku_txn_set_client_id(struct tokutxn *txn, uint64_t client_id, void *extra);
 
 time_t toku_txn_get_start_time(struct tokutxn *txn);
 
diff --git a/storage/tokudb/PerconaFT/ftcxx/db_env.hpp b/storage/tokudb/PerconaFT/ftcxx/db_env.hpp
index 071614b..15b5ce5 100644
--- a/storage/tokudb/PerconaFT/ftcxx/db_env.hpp
+++ b/storage/tokudb/PerconaFT/ftcxx/db_env.hpp
@@ -202,6 +202,7 @@ namespace ftcxx {
         typedef uint64_t (*get_lock_wait_time_cb_func)(uint64_t);
         get_lock_wait_time_cb_func _get_lock_wait_time_cb;
         lock_timeout_callback _lock_timeout_callback;
+        lock_wait_callback _lock_wait_needed_callback;
         uint64_t (*_loader_memory_size_callback)(void);
 
         uint32_t _cachesize_gbytes;
@@ -231,6 +232,7 @@ namespace ftcxx {
               _lock_wait_time_msec(0),
               _get_lock_wait_time_cb(nullptr),
               _lock_timeout_callback(nullptr),
+              _lock_wait_needed_callback(nullptr),
               _loader_memory_size_callback(nullptr),
               _cachesize_gbytes(0),
               _cachesize_bytes(0),
@@ -296,6 +298,11 @@ namespace ftcxx {
                 handle_ft_retval(r);
             }
 
+            if (_lock_wait_needed_callback) {
+                r = env->set_lock_wait_callback(env, _lock_wait_needed_callback);
+                handle_ft_retval(r);
+            }
+
             if (_loader_memory_size_callback) {
                 env->set_loader_memory_size(env, _loader_memory_size_callback);
             }
@@ -419,6 +426,11 @@ namespace ftcxx {
             return *this;
         }
 
+        DBEnvBuilder& set_lock_wait_callback(lock_wait_callback callback) {
+            _lock_wait_needed_callback = callback;
+            return *this;
+        }
+
         DBEnvBuilder& set_loader_memory_size(uint64_t (*callback)(void)) {
             _loader_memory_size_callback = callback;
             return *this;
diff --git a/storage/tokudb/PerconaFT/locktree/lock_request.cc b/storage/tokudb/PerconaFT/locktree/lock_request.cc
index 22b6da9..4036bf4 100644
--- a/storage/tokudb/PerconaFT/locktree/lock_request.cc
+++ b/storage/tokudb/PerconaFT/locktree/lock_request.cc
@@ -142,7 +142,10 @@ void lock_request::build_wait_graph(wfg *wait_graph, const txnid_set &conflicts)
 
 // returns: true if the current set of lock requests contains
 //          a deadlock, false otherwise.
-bool lock_request::deadlock_exists(const txnid_set &conflicts) {
+bool lock_request::deadlock_exists(const txnid_set &conflicts,
+                                   void (*lock_wait_callback)(void *, TXNID, TXNID),
+                                   void *callback_data) {
+
     wfg wait_graph;
     wait_graph.create();
 
@@ -150,11 +153,20 @@ bool lock_request::deadlock_exists(const txnid_set &conflicts) {
     bool deadlock = wait_graph.cycle_exists_from_txnid(m_txnid);
 
     wait_graph.destroy();
+
+    if (lock_wait_callback) {
+        size_t num_conflicts = conflicts.size();
+        for (size_t i = 0; i < num_conflicts; i++) {
+            lock_wait_callback(callback_data, m_txnid, conflicts.get(i));
+        }
+    }
+
     return deadlock;
 }
 
 // try to acquire a lock described by this lock request. 
-int lock_request::start(void) {
+int lock_request::start(void (*lock_wait_callback)(void *, TXNID, TXNID),
+                        void *callback_data) {
     int r;
 
     txnid_set conflicts;
@@ -175,7 +187,7 @@ int lock_request::start(void) {
         m_conflicting_txnid = conflicts.get(0);
         toku_mutex_lock(&m_info->mutex);
         insert_into_lock_requests();
-        if (deadlock_exists(conflicts)) {
+        if (deadlock_exists(conflicts, lock_wait_callback, callback_data)) {
             remove_from_lock_requests();
             r = DB_LOCK_DEADLOCK;
         }
diff --git a/storage/tokudb/PerconaFT/locktree/lock_request.h b/storage/tokudb/PerconaFT/locktree/lock_request.h
index 48d1279..3af4bd6 100644
--- a/storage/tokudb/PerconaFT/locktree/lock_request.h
+++ b/storage/tokudb/PerconaFT/locktree/lock_request.h
@@ -83,7 +83,8 @@ class lock_request {
     // effect: Tries to acquire a lock described by this lock request.
     // returns: The return code of locktree::acquire_[write,read]_lock()
     //          or DB_LOCK_DEADLOCK if this request would end up deadlocked.
-    int start(void);
+    int start(void (*lock_wait_callback)(void *, TXNID, TXNID),
+              void *callback_data);
 
     // effect: Sleeps until either the request is granted or the wait time expires.
     // returns: The return code of locktree::acquire_[write,read]_lock()
@@ -180,7 +181,9 @@ class lock_request {
     void build_wait_graph(wfg *wait_graph, const txnid_set &conflicts);
 
     // returns: True if this lock request is in deadlock with the given conflicts set
-    bool deadlock_exists(const txnid_set &conflicts);
+    bool deadlock_exists(const txnid_set &conflicts,
+                         void (*lock_wait_callback)(void *, TXNID, TXNID),
+                         void *callback_data);
 
     void copy_keys(void);
 
diff --git a/storage/tokudb/PerconaFT/src/tests/test_iterate_live_transactions.cc b/storage/tokudb/PerconaFT/src/tests/test_iterate_live_transactions.cc
index c5561cd..dd1a753 100644
--- a/storage/tokudb/PerconaFT/src/tests/test_iterate_live_transactions.cc
+++ b/storage/tokudb/PerconaFT/src/tests/test_iterate_live_transactions.cc
@@ -93,13 +93,13 @@ int test_main(int UU(argc), char *const UU(argv[])) {
     r = env->open(env, TOKU_TEST_FILENAME, env_flags, 0755); CKERR(r);
 
     r = env->txn_begin(env, NULL, &txn1, 0); CKERR(r);
-    txn1->set_client_id(txn1, 0);
+    txn1->set_client_id(txn1, 0, NULL);
     txnid1 = txn1->id64(txn1);
     r = env->txn_begin(env, NULL, &txn2, 0); CKERR(r);
-    txn2->set_client_id(txn2, 1);
+    txn2->set_client_id(txn2, 1, NULL);
     txnid2 = txn2->id64(txn2);
     r = env->txn_begin(env, NULL, &txn3, 0); CKERR(r);
-    txn3->set_client_id(txn3, 2);
+    txn3->set_client_id(txn3, 2, NULL);
     txnid3 = txn3->id64(txn3);
 
     {
diff --git a/storage/tokudb/PerconaFT/src/ydb-internal.h b/storage/tokudb/PerconaFT/src/ydb-internal.h
index 3737a1c..fe7af8e 100644
--- a/storage/tokudb/PerconaFT/src/ydb-internal.h
+++ b/storage/tokudb/PerconaFT/src/ydb-internal.h
@@ -105,6 +105,7 @@ struct __toku_db_env_internal {
     TOKULOGGER logger;
     toku::locktree_manager ltm;
     lock_timeout_callback lock_wait_timeout_callback;   // Called when a lock request times out waiting for a lock.
+    lock_wait_callback lock_wait_needed_callback;       // Called when a lock request requires a wait.
 
     DB *directory;                                      // Maps dnames to inames
     DB *persistent_environment;                         // Stores environment settings, can be used for upgrade
diff --git a/storage/tokudb/PerconaFT/src/ydb.cc b/storage/tokudb/PerconaFT/src/ydb.cc
index 55da418..a34f070 100644
--- a/storage/tokudb/PerconaFT/src/ydb.cc
+++ b/storage/tokudb/PerconaFT/src/ydb.cc
@@ -1724,6 +1724,12 @@ env_set_lock_timeout_callback(DB_ENV *env, lock_timeout_callback callback) {
     return 0;
 }
 
+static int
+env_set_lock_wait_callback(DB_ENV *env, lock_wait_callback callback) {
+    env->i->lock_wait_needed_callback = callback;
+    return 0;
+}
+
 static void
 format_time(const time_t *timer, char *buf) {
     ctime_r(timer, buf);
@@ -2620,6 +2626,7 @@ toku_env_create(DB_ENV ** envp, uint32_t flags) {
     USENV(get_lock_timeout);
     USENV(set_lock_timeout);
     USENV(set_lock_timeout_callback);
+    USENV(set_lock_wait_callback);
     USENV(set_redzone);
     USENV(log_flush);
     USENV(log_archive);
diff --git a/storage/tokudb/PerconaFT/src/ydb_row_lock.cc b/storage/tokudb/PerconaFT/src/ydb_row_lock.cc
index 913e1a4..70654db 100644
--- a/storage/tokudb/PerconaFT/src/ydb_row_lock.cc
+++ b/storage/tokudb/PerconaFT/src/ydb_row_lock.cc
@@ -195,7 +195,7 @@ int toku_db_start_range_lock(DB *db, DB_TXN *txn, const DBT *left_key, const DBT
     TXNID txn_anc_id = txn_anc->id64(txn_anc);
     request->set(db->i->lt, txn_anc_id, left_key, right_key, lock_type, toku_is_big_txn(txn_anc));
 
-    const int r = request->start();
+    const int r = request->start(txn->mgrp->i->lock_wait_needed_callback, db);
     if (r == 0) {
         db_txn_note_row_lock(db, txn_anc, left_key, right_key);
     } else if (r == DB_LOCK_DEADLOCK) {
@@ -225,6 +225,7 @@ int toku_db_wait_range_lock(DB *db, DB_TXN *txn, toku::lock_request *request) {
     if (r == 0) {
         db_txn_note_row_lock(db, txn_anc, left_key, right_key);
     } else if (r == DB_LOCK_NOTGRANTED) {
+        // ToDo: invoke lock_wait_needed_callback here too (new conflicts while waiting)?
         lock_timeout_callback callback = txn->mgrp->i->lock_wait_timeout_callback;
         if (callback != nullptr) {
             callback(db, txn_anc->id64(txn_anc), left_key, right_key,
@@ -249,7 +250,7 @@ void toku_db_grab_write_lock (DB *db, DBT *key, TOKUTXN tokutxn) {
     toku::lock_request request;
     request.create();
     request.set(db->i->lt, txn_anc_id, key, key, toku::lock_request::type::WRITE, toku_is_big_txn(txn_anc));
-    int r = request.start();
+    int r = request.start(NULL, NULL);
     invariant_zero(r);
     db_txn_note_row_lock(db, txn_anc, key, key);
     request.destroy();
diff --git a/storage/tokudb/PerconaFT/src/ydb_txn.cc b/storage/tokudb/PerconaFT/src/ydb_txn.cc
index ae1f930..0a20569 100644
--- a/storage/tokudb/PerconaFT/src/ydb_txn.cc
+++ b/storage/tokudb/PerconaFT/src/ydb_txn.cc
@@ -323,14 +323,18 @@ int locked_txn_abort(DB_TXN *txn) {
     return r;
 }
 
-static void locked_txn_set_client_id(DB_TXN *txn, uint64_t client_id) {
-    toku_txn_set_client_id(db_txn_struct_i(txn)->tokutxn, client_id);
+static void locked_txn_set_client_id(DB_TXN *txn, uint64_t client_id, void *extra) {
+    toku_txn_set_client_id(db_txn_struct_i(txn)->tokutxn, client_id, extra);
 }
 
 static uint64_t locked_txn_get_client_id(DB_TXN *txn) {
     return toku_txn_get_client_id(db_txn_struct_i(txn)->tokutxn);
 }
 
+static void *locked_txn_get_client_extra(DB_TXN *txn) {
+    return toku_txn_get_client_extra(db_txn_struct_i(txn)->tokutxn);
+}
+
 static int toku_txn_discard(DB_TXN *txn, uint32_t flags) {
     // check parameters
     if (flags != 0)
@@ -392,6 +396,7 @@ static inline void txn_func_init(DB_TXN *txn) {
     STXN(txn_stat);
     STXN(set_client_id);
     STXN(get_client_id);
+    STXN(get_client_extra);
 #undef STXN
 #define SUTXN(name) txn->name = toku_txn_ ## name
     SUTXN(prepare);
diff --git a/storage/tokudb/hatoku_hton.cc b/storage/tokudb/hatoku_hton.cc
index a288fbc..1d00769 100644
--- a/storage/tokudb/hatoku_hton.cc
+++ b/storage/tokudb/hatoku_hton.cc
@@ -147,6 +147,11 @@ static void tokudb_lock_timeout_callback(
     const DBT* right_key,
     uint64_t blocking_txnid);
 
+static void tokudb_lock_wait_needed_callback(
+    void* arg,
+    uint64_t requesting_txnid,
+    uint64_t blocking_txnid);
+
 #define ASSERT_MSGLEN 1024
 
 void toku_hton_assert_fail(
@@ -533,6 +538,7 @@ static int tokudb_init_func(void *p) {
     db_env->change_fsync_log_period(db_env, tokudb::sysvars::fsync_log_period);
 
     db_env->set_lock_timeout_callback(db_env, tokudb_lock_timeout_callback);
+    db_env->set_lock_wait_callback(db_env, tokudb_lock_wait_needed_callback);
 
     db_env->set_loader_memory_size(
         db_env,
@@ -1747,6 +1753,80 @@ static void tokudb_lock_timeout_callback(
    }
 }

+extern "C" void thd_report_wait_for(MYSQL_THD thd, MYSQL_THD other_thd);
+
+// Search state for tokudb_search_txn_thd_callback(): the TXNID to look for
+// and, once found, the THD recorded on that transaction by set_client_id()
+// (see txn_begin() in tokudb_txn.h).
+struct tokudb_search_txn_thd {
+    bool match_found;
+    uint64_t match_txn_id;
+    THD *match_client_thd;
+};
+
+// iterate_live_transactions() callback. For the transaction whose id
+// equals e->match_txn_id it records the client THD and returns 1;
+// for any other transaction it returns 0.
+// NOTE(review): assumes a non-zero return ends the iteration early.
+// The row-lock iteration arguments are not needed here.
+static int tokudb_search_txn_thd_callback(
+    DB_TXN* txn,
+    iterate_row_locks_callback /* iterate_locks */,
+    void* /* locks_extra */,
+    void* extra) {
+
+    struct tokudb_search_txn_thd* e =
+        static_cast<struct tokudb_search_txn_thd*>(extra);
+    if (txn->id64(txn) != e->match_txn_id) {
+        return 0;
+    }
+    e->match_found = true;
+    e->match_client_thd = static_cast<THD*>(txn->get_client_extra(txn));
+    return 1;
+}
+
+// Maps a TokuDB TXNID to the THD that began the transaction.
+// Returns false if no live transaction has that id. Otherwise stores the
+// THD in *out_thd and returns true; the THD may be NULL when the
+// transaction was begun without one. Each call walks the live
+// transactions, so its cost is linear in their number.
+static bool tokudb_txn_id_to_thd(
+    uint64_t txnid,
+    THD **out_thd) {
+
+    struct tokudb_search_txn_thd e = {
+        false,
+        txnid,
+        nullptr
+    };
+    db_env->iterate_live_transactions(db_env, tokudb_search_txn_thd_callback, &e);
+    if (e.match_found) {
+        *out_thd = e.match_client_thd;
+    }
+    return e.match_found;
+}
+
+// lock_wait_callback registered on the environment in tokudb_init_func().
+// The locktree invokes it (from lock_request::deadlock_exists()) once per
+// transaction that conflicts with a lock request. The wait is forwarded
+// to the server with thd_report_wait_for(), which is what lets optimistic
+// parallel replication see TokuDB lock waits. arg is the DB handle passed
+// by toku_db_start_range_lock(); it is not needed here.
+// NOTE(review): this runs with the lock request info mutex held and calls
+// iterate_live_transactions(); confirm the lock ordering is safe.
+static void tokudb_lock_wait_needed_callback(
+    void* /* arg */,
+    uint64_t requesting_txnid,
+    uint64_t blocking_txnid) {
+
+    THD* requesting_thd = nullptr;
+    THD* blocking_thd = nullptr;
+    if (tokudb_txn_id_to_thd(requesting_txnid, &requesting_thd) &&
+        tokudb_txn_id_to_thd(blocking_txnid, &blocking_thd)) {
+        thd_report_wait_for(requesting_thd, blocking_thd);
+    }
+}
+
 // Retrieves variables for information_schema.global_status.
 // Names (columnname) are automatically converted to upper case,
 // and prefixed with "TOKUDB_"
diff --git a/storage/tokudb/tokudb_txn.h b/storage/tokudb/tokudb_txn.h
index 67bf591..d025541 100644
--- a/storage/tokudb/tokudb_txn.h
+++ b/storage/tokudb/tokudb_txn.h
@@ -116,7 +116,7 @@ inline int txn_begin(
     int r = env->txn_begin(env, parent, txn, flags);
     if (r == 0 && thd) {
         DB_TXN* this_txn = *txn;
-        this_txn->set_client_id(this_txn, thd_get_thread_id(thd));
+        this_txn->set_client_id(this_txn, thd_get_thread_id(thd), thd);
     }
     TOKUDB_TRACE_FOR_FLAGS(
         TOKUDB_DEBUG_TXN,

Follow ups