← Back to team overview

maria-developers team mailing list archive

Re: 答复: 答复: 答复: MDEV-520: consider parallel replication patch from taobao patches

 

Kristian Nielsen <knielsen@xxxxxxxxxxxxxxx> writes:

> I next want to try one thing that I may have mentioned before: Implement an
> option so that threads wait with COMMIT until the previous event group has
> committed. I will try to implement this and let you know how it goes.

I implemented a first version of this, attached below.

This is work in progress, and there is more work to do. But it seems to
mostly work, so I wanted you to have the chance to look at it, see what
my idea is, and tell me your opinion.

The patch makes later transactions wait for earlier transactions to commit
before committing themselves.

The waiting probably looks more complex than necessary, but that is because I
prepared it for integration with group commit. For now, transactions will be
committed one at a time, in order. Later I want to make it so that if
transaction T2 waits for T1, then T1 and T2 are committed together in the same
group commit. This could give a nice speedup with --sync-binlog=1. The waiting
code in sql_class.cc is already prepared for this.

There were three new test failures: rpl.rpl_semi_sync, rpl.rpl_row_001, and
rpl.rpl_row_inexist_tbl. I will look into those later, but wanted to send you
the patch as early as possible.

If you want, you can test its performance against running without commit
ordering. Do let me know if there are problems, and I will try to fix them.

 - Kristian.

=== modified file 'sql/handler.cc'
--- sql/handler.cc	2012-09-22 14:11:40 +0000
+++ sql/handler.cc	2013-01-09 13:32:34 +0000
@@ -1318,6 +1318,15 @@ int ha_commit_trans(THD *thd, bool all)
     goto done;
   }
 
+  /*
+    ToDo: Push this wait into tc_log->log_and_order().
+
+    This will allow the TC to use the knowledge that one commit is waiting for
+    another to put them into the same group commit (with the waiter after the
+    waitee in the commit ordering).
+  */
+  thd->wait_for_prior_commit();
+
   DEBUG_SYNC(thd, "ha_commit_trans_before_log_and_order");
   cookie= tc_log->log_and_order(thd, xid, all, need_prepare_ordered,
                                 need_commit_ordered);
@@ -1389,6 +1398,8 @@ int ha_commit_one_phase(THD *thd, bool a
   */
   bool is_real_trans=all || thd->transaction.all.ha_list == 0;
   DBUG_ENTER("ha_commit_one_phase");
+  if (is_real_trans)
+    thd->wait_for_prior_commit();
   int res= commit_one_phase_2(thd, all, trans, is_real_trans);
   DBUG_RETURN(res);
 }
@@ -1428,7 +1439,10 @@ commit_one_phase_2(THD *thd, bool all, T
   }
   /* Free resources and perform other cleanup even for 'empty' transactions. */
   if (is_real_trans)
+  {
+    thd->wakeup_subsequent_commits();
     thd->transaction.cleanup();
+  }
 
   DBUG_RETURN(error);
 }
@@ -1503,7 +1517,10 @@ int ha_rollback_trans(THD *thd, bool all
   }
   /* Always cleanup. Even if nht==0. There may be savepoints. */
   if (is_real_trans)
+  {
+    thd->wakeup_subsequent_commits();
     thd->transaction.cleanup();
+  }
   if (all)
     thd->transaction_rollback_request= FALSE;
 

=== modified file 'sql/mysqld.cc'
--- sql/mysqld.cc	2012-12-05 14:05:37 +0000
+++ sql/mysqld.cc	2013-01-09 13:32:34 +0000
@@ -742,7 +742,7 @@ PSI_mutex_key key_BINLOG_LOCK_index, key
   key_master_info_sleep_lock,
   key_mutex_slave_reporting_capability_err_lock, key_relay_log_info_data_lock,
   key_relay_log_info_log_space_lock, key_relay_log_info_run_lock,
-  key_relay_log_info_sleep_lock,
+  key_relay_log_info_sleep_lock, key_rli_last_committed_id,
   key_structure_guard_mutex, key_TABLE_SHARE_LOCK_ha_data,
   key_LOCK_error_messages, key_LOG_INFO_lock, key_LOCK_thread_count,
   key_PARTITION_LOCK_auto_inc;
@@ -751,7 +751,7 @@ PSI_mutex_key key_RELAYLOG_LOCK_index;
 PSI_mutex_key key_LOCK_stats,
   key_LOCK_global_user_client_stats, key_LOCK_global_table_stats,
   key_LOCK_global_index_stats,
-  key_LOCK_wakeup_ready;
+  key_LOCK_wakeup_ready, key_LOCK_wait_commit;
 
 PSI_mutex_key key_LOCK_prepare_ordered, key_LOCK_commit_ordered;
 
@@ -795,6 +795,7 @@ static PSI_mutex_info all_server_mutexes
   { &key_LOCK_global_table_stats, "LOCK_global_table_stats", PSI_FLAG_GLOBAL},
   { &key_LOCK_global_index_stats, "LOCK_global_index_stats", PSI_FLAG_GLOBAL},
   { &key_LOCK_wakeup_ready, "THD::LOCK_wakeup_ready", 0},
+  { &key_LOCK_wait_commit, "THD::LOCK_wait_commit", 0},
   { &key_LOCK_thd_data, "THD::LOCK_thd_data", 0},
   { &key_LOCK_user_conn, "LOCK_user_conn", PSI_FLAG_GLOBAL},
   { &key_LOCK_uuid_short_generator, "LOCK_uuid_short_generator", PSI_FLAG_GLOBAL},
@@ -807,6 +808,7 @@ static PSI_mutex_info all_server_mutexes
   { &key_relay_log_info_log_space_lock, "Relay_log_info::log_space_lock", 0},
   { &key_relay_log_info_run_lock, "Relay_log_info::run_lock", 0},
   { &key_relay_log_info_sleep_lock, "Relay_log_info::sleep_lock", 0},
+  { &key_rli_last_committed_id, "Relay_log_info::LOCK_last_committed_id", 0},
   { &key_structure_guard_mutex, "Query_cache::structure_guard_mutex", 0},
   { &key_TABLE_SHARE_LOCK_ha_data, "TABLE_SHARE::LOCK_ha_data", 0},
   { &key_LOCK_error_messages, "LOCK_error_messages", PSI_FLAG_GLOBAL},
@@ -851,7 +853,8 @@ PSI_cond_key key_BINLOG_COND_xid_list, k
   key_TABLE_SHARE_cond, key_user_level_lock_cond,
   key_COND_thread_count, key_COND_thread_cache, key_COND_flush_thread_cache,
   key_BINLOG_COND_queue_busy;
-PSI_cond_key key_RELAYLOG_update_cond, key_COND_wakeup_ready;
+PSI_cond_key key_RELAYLOG_update_cond, key_COND_wakeup_ready,
+  key_COND_wait_commit;
 PSI_cond_key key_RELAYLOG_COND_queue_busy;
 PSI_cond_key key_TC_LOG_MMAP_COND_queue_busy;
 
@@ -872,6 +875,7 @@ static PSI_cond_info all_server_conds[]=
   { &key_RELAYLOG_update_cond, "MYSQL_RELAY_LOG::update_cond", 0},
   { &key_RELAYLOG_COND_queue_busy, "MYSQL_RELAY_LOG::COND_queue_busy", 0},
   { &key_COND_wakeup_ready, "THD::COND_wakeup_ready", 0},
+  { &key_COND_wait_commit, "THD::COND_wait_commit_ready", 0},
   { &key_COND_cache_status_changed, "Query_cache::COND_cache_status_changed", 0},
   { &key_COND_manager, "COND_manager", PSI_FLAG_GLOBAL},
   { &key_COND_rpl_status, "COND_rpl_status", PSI_FLAG_GLOBAL},

=== modified file 'sql/mysqld.h'
--- sql/mysqld.h	2012-12-05 14:05:37 +0000
+++ sql/mysqld.h	2013-01-09 13:32:34 +0000
@@ -243,14 +243,14 @@ extern PSI_mutex_key key_BINLOG_LOCK_ind
   key_master_info_sleep_lock,
   key_mutex_slave_reporting_capability_err_lock, key_relay_log_info_data_lock,
   key_relay_log_info_log_space_lock, key_relay_log_info_run_lock,
-  key_relay_log_info_sleep_lock,
+  key_relay_log_info_sleep_lock, key_rli_last_committed_id,
   key_structure_guard_mutex, key_TABLE_SHARE_LOCK_ha_data,
   key_LOCK_error_messages, key_LOCK_thread_count, key_PARTITION_LOCK_auto_inc;
 extern PSI_mutex_key key_RELAYLOG_LOCK_index;
 
 extern PSI_mutex_key key_LOCK_stats,
   key_LOCK_global_user_client_stats, key_LOCK_global_table_stats,
-  key_LOCK_global_index_stats, key_LOCK_wakeup_ready;
+  key_LOCK_global_index_stats, key_LOCK_wakeup_ready, key_LOCK_wait_commit;
 
 extern PSI_rwlock_key key_rwlock_LOCK_grant, key_rwlock_LOCK_logger,
   key_rwlock_LOCK_sys_init_connect, key_rwlock_LOCK_sys_init_slave,
@@ -272,7 +272,8 @@ extern PSI_cond_key key_BINLOG_COND_xid_
   key_relay_log_info_sleep_cond,
   key_TABLE_SHARE_cond, key_user_level_lock_cond,
   key_COND_thread_count, key_COND_thread_cache, key_COND_flush_thread_cache;
-extern PSI_cond_key key_RELAYLOG_update_cond, key_COND_wakeup_ready;
+extern PSI_cond_key key_RELAYLOG_update_cond, key_COND_wakeup_ready,
+  key_COND_wait_commit;
 extern PSI_cond_key key_RELAYLOG_COND_queue_busy;
 extern PSI_cond_key key_TC_LOG_MMAP_COND_queue_busy;
 

=== modified file 'sql/rpl_rli.cc'
--- sql/rpl_rli.cc	2012-12-05 14:05:37 +0000
+++ sql/rpl_rli.cc	2013-01-09 13:32:34 +0000
@@ -43,6 +43,7 @@ Relay_log_info::Relay_log_info(bool is_s
    sync_counter(0), is_relay_log_recovery(is_slave_recovery),
    save_temporary_tables(0), cur_log_old_open_count(0), group_relay_log_pos(0), 
    event_relay_log_pos(0),
+   last_trans_id(0), last_trans_thd(0), last_committed_id(0),
 #if HAVE_valgrind
    is_fake(FALSE),
 #endif
@@ -78,6 +79,8 @@ Relay_log_info::Relay_log_info(bool is_s
   mysql_mutex_init(key_relay_log_info_log_space_lock,
                    &log_space_lock, MY_MUTEX_INIT_FAST);
   mysql_mutex_init(key_relay_log_info_sleep_lock, &sleep_lock, MY_MUTEX_INIT_FAST);
+  mysql_mutex_init(key_rli_last_committed_id, &LOCK_last_committed_id,
+                   MY_MUTEX_INIT_SLOW);
   mysql_cond_init(key_relay_log_info_data_cond, &data_cond, NULL);
   mysql_cond_init(key_relay_log_info_start_cond, &start_cond, NULL);
   mysql_cond_init(key_relay_log_info_stop_cond, &stop_cond, NULL);
@@ -102,6 +105,7 @@ Relay_log_info::~Relay_log_info()
   mysql_mutex_destroy(&data_lock);
   mysql_mutex_destroy(&log_space_lock);
   mysql_mutex_destroy(&sleep_lock);
+  mysql_mutex_destroy(&LOCK_last_committed_id);
   mysql_cond_destroy(&data_cond);
   mysql_cond_destroy(&start_cond);
   mysql_cond_destroy(&stop_cond);

=== modified file 'sql/rpl_rli.h'
--- sql/rpl_rli.h	2012-12-05 14:05:37 +0000
+++ sql/rpl_rli.h	2013-01-09 13:32:34 +0000
@@ -106,6 +106,23 @@ class transaction_st{
   my_off_t relay_log_pos;
   trans_pos_t *trans_pos;
 
+  /*
+    This is the ID (Relay_log_info::last_trans_id) of the previous transaction,
+    that we want to wait for before committing ourselves (if ordered commits
+    are enforced).
+  */
+  uint64 wait_commit_id;
+  /*
+    This is the THD whose commit we want to wait for.
+    Only valid if Relay_log_info::last_committed_id < wait_commit_id.
+  */
+  THD *wait_commit_thd;
+  /*
+    This is our own transaction id, which we should update
+    Relay_log_info::last_committed_id to once we commit.
+  */
+  uint64 own_commit_id;
+
   transaction_st();
 
   ~transaction_st();
@@ -123,7 +140,7 @@ class Transfer_worker
   int start();
   int stop();
   int wait_for_stopped();
-  int push_trans(transaction_st *trans);
+  int push_trans(Relay_log_info *rli, transaction_st *trans);
   int pop_trans(int batch_trans_n);
   int remove_trans(transaction_st *trans);
   bool check_trans_conflict(transaction_st *trans);
@@ -294,6 +311,20 @@ class Relay_log_info : public Slave_repo
   int push_back_trans_pos(transaction_st *trans);
   int rollback_trans_pos(transaction_st *trans);
   int pop_front_trans_pos();
+
+  /* Running counter for assigning IDs to event groups/transactions. */
+  uint64 last_trans_id;
+  /* The THD of the worker assigned to handle the transaction last_trans_id. */
+  THD *last_trans_thd;
+  /*
+    The ID of the last transaction/event group that was committed/applied.
+    This is used to decide if the next transaction should wait for the
+    previous one to commit (to avoid trying to wait for a commit that already
+    took place).
+  */
+  uint64 last_committed_id;
+  /* Mutex protecting access to last_committed_id. */
+  mysql_mutex_t LOCK_last_committed_id;
   /*Transfer end*/
 
 

=== modified file 'sql/slave.cc'
--- sql/slave.cc	2013-01-08 14:12:14 +0000
+++ sql/slave.cc	2013-01-09 13:32:34 +0000
@@ -2857,7 +2857,48 @@ int execute_single_transaction(Relay_log
 
 int Transfer_worker::execute_transaction(transaction_st *trans)
 {
-  return execute_single_transaction(dummy_rli, trans);
+  /* Apply one event group, committing after the one dispatched before it. */
+  int res;
+
+  mysql_mutex_lock(&rli->LOCK_last_committed_id);
+  /*
+    Register to wait for the previous commit, unless it already finished.
+
+    Never register on our own THD: we run one transaction at a time, so such
+    a wait could never be woken. It can happen when push_trans() assigned ids
+    and then failed (worker full), and the dispatcher retried on this worker.
+    TODO: assign ids in push_trans() only after the push has succeeded; with
+    a skipped id the real predecessor is not waited for correctly.
+  */
+  if (trans->wait_commit_id > rli->last_committed_id &&
+      trans->wait_commit_thd != thd)
+    thd->register_wait_for_prior_commit(trans->wait_commit_thd);
+  mysql_mutex_unlock(&rli->LOCK_last_committed_id);
+
+  res= execute_single_transaction(dummy_rli, trans);
+
+  /*
+    Do not leave us dangling in another THD's wait-for list if we never got
+    to wait (e.g. because of an error before commit). Cheap and robust.
+  */
+  thd->unregister_wait_for_prior_commit();
+  /*
+    Record our commit so later transactions do not register to wait for us.
+    Racing with later transactions is fine as long as last_committed_id
+    never decreases: once we are done, all prior commits are done too.
+  */
+  mysql_mutex_lock(&rli->LOCK_last_committed_id);
+  if (rli->last_committed_id < trans->own_commit_id)
+    rli->last_committed_id= trans->own_commit_id;
+  mysql_mutex_unlock(&rli->LOCK_last_committed_id);
+
+  /*
+    Now that last_committed_id covers us, no new waiter can register, so
+    wake up any pending ones one last time.
+  */
+  thd->wakeup_subsequent_commits();
+
+  return res;
 }
 
 int transfer_event_types[] = {TABLE_MAP_EVENT, WRITE_ROWS_EVENT, UPDATE_ROWS_EVENT, DELETE_ROWS_EVENT, QUERY_EVENT, XID_EVENT};
@@ -3038,8 +3079,21 @@ int Transfer_worker::wait_for_stopped()
 }
 
 /* return -1 means the worker is full. ok is 0 */
-int Transfer_worker::push_trans(transaction_st *trans)
+int Transfer_worker::push_trans(Relay_log_info *rli, transaction_st *trans)
 {
+
+  uint64 prev_trans_id, this_trans_id;
+
+  prev_trans_id= rli->last_trans_id;
+  this_trans_id= prev_trans_id + 1;
+  rli->last_trans_id= this_trans_id;
+
+  trans->own_commit_id= this_trans_id;
+  trans->wait_commit_id= prev_trans_id;
+  trans->wait_commit_thd= rli->last_trans_thd;
+
+  rli->last_trans_thd= this->thd;
+
   rw_wrlock(&trans_list_lock);
 
   if (waiting_trans_number == 0)
@@ -3281,7 +3335,7 @@ int dispatch_transaction(Relay_log_info
     goto retry;
   }
 
-  if (rli->workers[trans->worker_id]->push_trans(trans) != 0)
+  if (rli->workers[trans->worker_id]->push_trans(rli, trans) != 0)
   {
     rli->rollback_trans_pos(trans);
     my_sleep(1000);

=== modified file 'sql/sql_class.cc'
--- sql/sql_class.cc	2012-12-05 14:05:37 +0000
+++ sql/sql_class.cc	2013-01-09 14:23:15 +0000
@@ -754,6 +754,9 @@ THD::THD()
 #if defined(ENABLED_DEBUG_SYNC)
    debug_sync_control(0),
 #endif /* defined(ENABLED_DEBUG_SYNC) */
+    subsequent_commits_list(0), next_subsequent_commit(0),
+    register_wait_for_commit_thd(0), waiting_for_commit(false),
+    wakeup_subsequent_commits_running(false),
    main_warning_info(0, false)
 {
   ulong tmp;
@@ -834,7 +837,9 @@ THD::THD()
 #endif
   mysql_mutex_init(key_LOCK_thd_data, &LOCK_thd_data, MY_MUTEX_INIT_FAST);
   mysql_mutex_init(key_LOCK_wakeup_ready, &LOCK_wakeup_ready, MY_MUTEX_INIT_FAST);
+  mysql_mutex_init(key_LOCK_wait_commit, &LOCK_wait_commit, MY_MUTEX_INIT_FAST);
   mysql_cond_init(key_COND_wakeup_ready, &COND_wakeup_ready, 0);
+  mysql_cond_init(key_COND_wait_commit, &COND_wait_commit, 0);
   /*
     LOCK_thread_count goes before LOCK_thd_data - the former is called around
     'delete thd', the latter - in THD::~THD
@@ -5505,6 +5510,187 @@ THD::signal_wakeup_ready()
 }
 
 
+/*
+  Register that the next commit of this THD must not complete until the
+  commit in another THD (the waitee) has completed.
+
+  The wait may occur explicitly, with the waiter sitting in
+  wait_for_prior_commit() until the waitee calls wakeup_subsequent_commits().
+  Alternatively, the TC (e.g. binlog) may do the commits of both waitee and
+  waiter at once during group commit, resolving both in the right order.
+
+  Only one waitee can be registered for a waiter; it must be removed by
+  wait_for_prior_commit() or unregister_wait_for_prior_commit() before a new
+  one is registered. Several waiters may wait for the same waitee, and one
+  THD may be both a waiter and a waitee at the same time.
+
+  The caller must ensure that waitee stays alive during this call. This
+  function takes waitee->LOCK_wait_commit itself, so it must not be held.
+*/
+void
+THD::register_wait_for_prior_commit(THD *waitee)
+{
+  waiting_for_commit= true;
+  DBUG_ASSERT(!register_wait_for_commit_thd /*No prior registration allowed*/);
+  register_wait_for_commit_thd= waitee;
+
+  mysql_mutex_lock(&waitee->LOCK_wait_commit);
+  /*
+    If waitee is in the middle of wakeup, then there is nothing to wait for,
+    so we need not register. This is necessary to avoid a race in unregister,
+    see comments on wakeup_subsequent_commits2() for details.
+  */
+  if (waitee->wakeup_subsequent_commits_running)
+    waiting_for_commit= false;
+  else
+  {
+    this->next_subsequent_commit= waitee->subsequent_commits_list;
+    waitee->subsequent_commits_list= this;
+  }
+  mysql_mutex_unlock(&waitee->LOCK_wait_commit);
+}
+
+
+/*
+  Block until the waitee registered with register_wait_for_prior_commit()
+  clears our waiting_for_commit flag, then forget the registration. Returns
+  immediately if the flag was already cleared (commit already completed).
+*/
+void
+THD::wait_for_prior_commit2()
+{
+  mysql_mutex_lock(&LOCK_wait_commit);
+  while (waiting_for_commit)
+    mysql_cond_wait(&COND_wait_commit, &LOCK_wait_commit);
+  mysql_mutex_unlock(&LOCK_wait_commit);
+  register_wait_for_commit_thd= NULL;
+}
+
+
+/*
+  Wake up all THDs that registered to wait for our commit.
+
+  Note about locking:
+
+  We have a potential race or deadlock between wakeup_subsequent_commits() in
+  the waitee and unregister_wait_for_prior_commit() in the waiter.
+
+  Both waiter and waitee need to take their own lock before it is safe to take
+  a lock on the other party - otherwise the other party might disappear and
+  freed memory could be accessed. But if the two locks are taken in different
+  orders, we may end up in a deadlock.
+
+  The waiter needs to lock the waitee to delete itself from the list in
+  unregister_wait_for_prior_commit(). Thus wakeup_subsequent_commits() can not
+  hold its own lock while locking waiters, lest we deadlock.
+
+  So we need to prevent unregister_wait_for_prior_commit() from running while
+  wakeup is in progress - otherwise the unregister could complete before the
+  wakeup, leading to a spurious wakeup or access to a freed THD.
+
+  However, if we are in the middle of running wakeup_subsequent_commits(), then
+  there is no need for unregister_wait_for_prior_commit() in the first place -
+  the waiter can just do a normal wait_for_prior_commit(), as it will be
+  immediately woken up.
+
+  So the solution to the potential race/deadlock is to set a flag in the waitee
+  that wakeup_subsequent_commits() is in progress. When this flag is set,
+  unregister_wait_for_prior_commit() becomes just wait_for_prior_commit().
+
+  Then register_wait_for_prior_commit() also needs to check whether
+  wakeup_subsequent_commits() is running, and skip the registration if so.
+  This is needed in case a new waiter manages to register itself and
+  immediately tries to unregister while wakeup_subsequent_commits() is
+  running. Otherwise the new waiter would wait rather than unregister, but it
+  would not be woken up until the next wakeup, which could be much later than
+  necessary.
+*/
+void
+THD::wakeup_subsequent_commits2()
+{
+  THD *wakeup_thd;
+
+  mysql_mutex_lock(&LOCK_wait_commit);
+  wakeup_subsequent_commits_running= true;
+  wakeup_thd= subsequent_commits_list;
+  subsequent_commits_list= NULL;
+  mysql_mutex_unlock(&LOCK_wait_commit);
+
+  while (wakeup_thd)
+  {
+    /*
+      Important: we must grab the next pointer before waking up the waiter;
+      once the wakeup is done, the field could be invalidated at any time.
+    */
+    THD *next= wakeup_thd->next_subsequent_commit;
+
+    /*
+      We signal each waiter on its own condition and mutex (rather than using
+      pthread_cond_broadcast() on a shared condition).
+
+      Otherwise we would need to somehow ensure that they were done
+      waking up before we could allow this THD to be destroyed, which would
+      be annoying and unnecessary.
+    */
+    mysql_mutex_lock(&wakeup_thd->LOCK_wait_commit);
+    wakeup_thd->waiting_for_commit= false;
+    mysql_cond_signal(&wakeup_thd->COND_wait_commit);
+    mysql_mutex_unlock(&wakeup_thd->LOCK_wait_commit);
+
+    wakeup_thd= next;
+  }
+
+  mysql_mutex_lock(&LOCK_wait_commit);
+  wakeup_subsequent_commits_running= false;
+  mysql_mutex_unlock(&LOCK_wait_commit);
+}
+
+
+/* Cancel our wait for a prior commit; if a wakeup is in progress, await it. */
+void
+THD::unregister_wait_for_prior_commit2()
+{
+  mysql_mutex_lock(&LOCK_wait_commit);
+  if (waiting_for_commit)
+  {
+    THD *waitee= register_wait_for_commit_thd;
+    THD **next_ptr_ptr, *cur;
+    mysql_mutex_lock(&waitee->LOCK_wait_commit);
+    if (waitee->wakeup_subsequent_commits_running)
+    {
+      /*
+        When a wakeup is running, we may be on the list it has already taken
+        over, so we cannot unlink ourselves. Instead just wait: the wakeup is
+        in progress and will reach us shortly.
+
+        See comments on wakeup_subsequent_commits2() for more details.
+      */
+      mysql_mutex_unlock(&waitee->LOCK_wait_commit);
+      while (waiting_for_commit)
+        mysql_cond_wait(&COND_wait_commit, &LOCK_wait_commit);
+    }
+    else
+    {
+      /* No wakeup running: unlink ourselves (we hold both locks here). */
+      next_ptr_ptr= &waitee->subsequent_commits_list;
+      while ((cur= *next_ptr_ptr) != NULL)
+      {
+        if (cur == this)
+        {
+          *next_ptr_ptr= this->next_subsequent_commit;
+          break;
+        }
+        next_ptr_ptr= &cur->next_subsequent_commit;
+      }
+      waiting_for_commit= false;
+      mysql_mutex_unlock(&waitee->LOCK_wait_commit);
+    }
+  }
+  mysql_mutex_unlock(&LOCK_wait_commit);
+  register_wait_for_commit_thd= NULL;
+}
+
+
 bool Discrete_intervals_list::append(ulonglong start, ulonglong val,
                                  ulonglong incr)
 {

=== modified file 'sql/sql_class.h'
--- sql/sql_class.h	2012-09-22 14:11:40 +0000
+++ sql/sql_class.h	2013-01-09 13:32:34 +0000
@@ -2992,6 +2992,10 @@ class THD :public Statement,
                   MYSQL_ERROR::enum_warning_level level,
                   const char* msg);
 
+  void wait_for_prior_commit2();
+  void wakeup_subsequent_commits2();
+  void unregister_wait_for_prior_commit2();
+
 public:
   /** Overloaded to guard query/query_length fields */
   virtual void set_statement(Statement *stmt);
@@ -3095,6 +3099,94 @@ class THD :public Statement,
   void wait_for_wakeup_ready();
   /* Wake this thread up from wait_for_wakeup_ready(). */
   void signal_wakeup_ready();
+
+  /*
+    Facilities to wait for a prior commit to complete before allowing the
+    commit of this THD to proceed.
+
+    This is used during (parallel) replication, to allow different transactions
+    to be applied in parallel, but still commit in order.
+
+    The transaction that wants to wait for a prior commit must first register
+    to wait with register_wait_for_prior_commit(other_thd). The caller must
+    ensure other_thd cannot disappear during the registration; the function
+    takes other_thd->LOCK_wait_commit itself, so the caller must not hold it.
+
+    Then during commit, if a THD is registered to wait, it will call
+    wait_for_prior_commit() as part of ha_commit_trans(). If no wait is
+    registered, or if the THD to wait for has already completed commit, then
+    wait_for_prior_commit() returns immediately.
+
+    And when a THD that may be waited for has completed commit (more precisely
+    commit_ordered()), then it must call wakeup_subsequent_commits() to wake
+    up any waiters. Note that this must be done at a point that is guaranteed
+    to be later than any waiters registering themselves. It is safe to call
+    wakeup_subsequent_commits() multiple times, as waiters are removed from
+    registration as part of the wakeup.
+
+    The reason for separate register and wait calls is that this allows to
+    register the wait early, at a point where the waited-for THD is known to
+    exist. And then the actual wait can be done much later, where the
+    waited-for THD may have been long gone. By registering early, the waitee
+    can signal before disappearing.
+  */
+  void register_wait_for_prior_commit(THD *waitee);
+  void wait_for_prior_commit()
+  {
+    /*
+      Quick inline check, to avoid function call and locking in the common case
+      where no wait is registered, or a registered wait was already signalled.
+    */
+    if (waiting_for_commit)
+      wait_for_prior_commit2();
+  }
+  void wakeup_subsequent_commits()
+  {
+    /*
+      Do the check inline, so that only commits with waiters pay the cost of
+      a function call.
+
+      Note that the check is done without locking. It is the responsibility of
+      the user of the wakeup facility to ensure that no waiters can register
+      themselves after the last call to wakeup_subsequent_commits().
+
+      This avoids having to take another lock for every commit, which would be
+      pointless anyway - even if we check under lock, there is nothing to
+      prevent a waiter from arriving just after releasing the lock.
+    */
+    if (subsequent_commits_list)
+      wakeup_subsequent_commits2();
+  }
+  void unregister_wait_for_prior_commit()
+  {
+    if (waiting_for_commit)
+      unregister_wait_for_prior_commit2();
+  }
+  /*
+    The LOCK_wait_commit protects the fields subsequent_commits_list and
+    wakeup_subsequent_commits_running (for a waitee), and the flag
+    waiting_for_commit and associated COND_wait_commit (for a waiter).
+  */
+  mysql_mutex_t LOCK_wait_commit;
+  mysql_cond_t COND_wait_commit;
+  /* List of THDs that did register_wait_for_prior_commit() on us. */
+  THD *subsequent_commits_list;
+  /* Link field for entries in subsequent_commits_list. */
+  THD *next_subsequent_commit;
+  /* Our waitee, if we did register_wait_for_prior_commit(), else NULL. */
+  THD *register_wait_for_commit_thd;
+  /*
+    The waiting_for_commit flag is cleared when a waiter has been woken
+    up. The COND_wait_commit condition is signalled when this has been
+    cleared.
+  */
+  bool waiting_for_commit;
+  /*
+    Flag set while wakeup_subsequent_commits2() is running; see the comments
+    on that function for details.
+  */
+  bool wakeup_subsequent_commits_running;
+
 private:
 
   /** The current internal error handler for this thread, or NULL. */


Follow ups

References