← Back to team overview

maria-developers team mailing list archive

Re: 答复: in-order commit

 

Kristian Nielsen <knielsen@xxxxxxxxxxxxxxx> writes:

> 丁奇 <dingqi.lxb@xxxxxxxxxx> writes:
>
>> Hi, Kristian
>>     Ok. I have got the information from JIRA.
>>
>
>>    I find you control the commit order inside the user thread.
>>
>> Will it be easier to let Trans_worker thread hold this logic?
>
> Yes, I think you are right. Of course, the user thread is the one that knows
> the ordering, but the logic for waiting needs to be in the Trans_worker
> thread. In fact this is a bug in my first patch: Transaction T3 could wait for
> the THD of worker thread 1 which has both T1 and T2 queued; then it will wake
> up too early, when T1 commits rather than when T2 does.
>
> I will try to implement the new idea today.

I fixed two bugs in my earlier patch. Sorry for that, I should have tested a
bit better :-)

This one looks better. It fixes the earlier test failures I saw in
mysql-test-run.pl. I tested it with 20000 inserts, it works and keeps the
commit order, with a nice speedup compared to single-threaded insert on the
master.

There may still be more problems of course. Also, especially with
--sync-binlog=0, I will need to implement my group commit ideas to achieve
the best speed with in-order commit.

New patch is attached (replaces earlier patches). I also pushed it to my
branch:

    lp:~knielsen/maria/dingqi-parallel-replication

 - Kristian.

=== modified file 'sql/handler.cc'
--- sql/handler.cc	2012-09-22 14:11:40 +0000
+++ sql/handler.cc	2013-01-09 15:19:56 +0000
@@ -1318,6 +1318,15 @@ int ha_commit_trans(THD *thd, bool all)
     goto done;
   }
 
+  /*
+    ToDo: Push this wait into tc_log->log_and_order().
+
+    This will allow the TC to use the knowledge that one commit is waiting for
+    another to put them into the same group commit (with the waiter after the
+    waitee in the commit ordering).
+  */
+  thd->wait_for_prior_commit();
+
   DEBUG_SYNC(thd, "ha_commit_trans_before_log_and_order");
   cookie= tc_log->log_and_order(thd, xid, all, need_prepare_ordered,
                                 need_commit_ordered);
@@ -1389,6 +1398,8 @@ int ha_commit_one_phase(THD *thd, bool a
   */
   bool is_real_trans=all || thd->transaction.all.ha_list == 0;
   DBUG_ENTER("ha_commit_one_phase");
+  if (is_real_trans)
+    thd->wait_for_prior_commit();
   int res= commit_one_phase_2(thd, all, trans, is_real_trans);
   DBUG_RETURN(res);
 }
@@ -1428,7 +1439,10 @@ commit_one_phase_2(THD *thd, bool all, T
   }
   /* Free resources and perform other cleanup even for 'empty' transactions. */
   if (is_real_trans)
+  {
+    thd->wakeup_subsequent_commits();
     thd->transaction.cleanup();
+  }
 
   DBUG_RETURN(error);
 }
@@ -1503,7 +1517,10 @@ int ha_rollback_trans(THD *thd, bool all
   }
   /* Always cleanup. Even if nht==0. There may be savepoints. */
   if (is_real_trans)
+  {
+    thd->wakeup_subsequent_commits();
     thd->transaction.cleanup();
+  }
   if (all)
     thd->transaction_rollback_request= FALSE;
 

=== modified file 'sql/mysqld.cc'
--- sql/mysqld.cc	2012-12-05 14:05:37 +0000
+++ sql/mysqld.cc	2013-01-11 12:29:33 +0000
@@ -742,7 +742,7 @@ PSI_mutex_key key_BINLOG_LOCK_index, key
   key_master_info_sleep_lock,
   key_mutex_slave_reporting_capability_err_lock, key_relay_log_info_data_lock,
   key_relay_log_info_log_space_lock, key_relay_log_info_run_lock,
-  key_relay_log_info_sleep_lock,
+  key_relay_log_info_sleep_lock, key_rli_last_committed_id,
   key_structure_guard_mutex, key_TABLE_SHARE_LOCK_ha_data,
   key_LOCK_error_messages, key_LOG_INFO_lock, key_LOCK_thread_count,
   key_PARTITION_LOCK_auto_inc;
@@ -751,7 +751,7 @@ PSI_mutex_key key_RELAYLOG_LOCK_index;
 PSI_mutex_key key_LOCK_stats,
   key_LOCK_global_user_client_stats, key_LOCK_global_table_stats,
   key_LOCK_global_index_stats,
-  key_LOCK_wakeup_ready;
+  key_LOCK_wakeup_ready, key_LOCK_wait_commit;
 
 PSI_mutex_key key_LOCK_prepare_ordered, key_LOCK_commit_ordered;
 
@@ -795,6 +795,7 @@ static PSI_mutex_info all_server_mutexes
   { &key_LOCK_global_table_stats, "LOCK_global_table_stats", PSI_FLAG_GLOBAL},
   { &key_LOCK_global_index_stats, "LOCK_global_index_stats", PSI_FLAG_GLOBAL},
   { &key_LOCK_wakeup_ready, "THD::LOCK_wakeup_ready", 0},
+  { &key_LOCK_wait_commit, "wait_for_commit::LOCK_wait_commit", 0},
   { &key_LOCK_thd_data, "THD::LOCK_thd_data", 0},
   { &key_LOCK_user_conn, "LOCK_user_conn", PSI_FLAG_GLOBAL},
   { &key_LOCK_uuid_short_generator, "LOCK_uuid_short_generator", PSI_FLAG_GLOBAL},
@@ -807,6 +808,7 @@ static PSI_mutex_info all_server_mutexes
   { &key_relay_log_info_log_space_lock, "Relay_log_info::log_space_lock", 0},
   { &key_relay_log_info_run_lock, "Relay_log_info::run_lock", 0},
   { &key_relay_log_info_sleep_lock, "Relay_log_info::sleep_lock", 0},
+  { &key_rli_last_committed_id, "Relay_log_info::LOCK_last_committed_id", 0},
   { &key_structure_guard_mutex, "Query_cache::structure_guard_mutex", 0},
   { &key_TABLE_SHARE_LOCK_ha_data, "TABLE_SHARE::LOCK_ha_data", 0},
   { &key_LOCK_error_messages, "LOCK_error_messages", PSI_FLAG_GLOBAL},
@@ -851,7 +853,8 @@ PSI_cond_key key_BINLOG_COND_xid_list, k
   key_TABLE_SHARE_cond, key_user_level_lock_cond,
   key_COND_thread_count, key_COND_thread_cache, key_COND_flush_thread_cache,
   key_BINLOG_COND_queue_busy;
-PSI_cond_key key_RELAYLOG_update_cond, key_COND_wakeup_ready;
+PSI_cond_key key_RELAYLOG_update_cond, key_COND_wakeup_ready,
+  key_COND_wait_commit;
 PSI_cond_key key_RELAYLOG_COND_queue_busy;
 PSI_cond_key key_TC_LOG_MMAP_COND_queue_busy;
 
@@ -872,6 +875,7 @@ static PSI_cond_info all_server_conds[]=
   { &key_RELAYLOG_update_cond, "MYSQL_RELAY_LOG::update_cond", 0},
   { &key_RELAYLOG_COND_queue_busy, "MYSQL_RELAY_LOG::COND_queue_busy", 0},
   { &key_COND_wakeup_ready, "THD::COND_wakeup_ready", 0},
+  { &key_COND_wait_commit, "wait_for_commit::COND_wait_commit", 0},
   { &key_COND_cache_status_changed, "Query_cache::COND_cache_status_changed", 0},
   { &key_COND_manager, "COND_manager", PSI_FLAG_GLOBAL},
   { &key_COND_rpl_status, "COND_rpl_status", PSI_FLAG_GLOBAL},

=== modified file 'sql/mysqld.h'
--- sql/mysqld.h	2012-12-05 14:05:37 +0000
+++ sql/mysqld.h	2013-01-09 15:19:56 +0000
@@ -243,14 +243,14 @@ extern PSI_mutex_key key_BINLOG_LOCK_ind
   key_master_info_sleep_lock,
   key_mutex_slave_reporting_capability_err_lock, key_relay_log_info_data_lock,
   key_relay_log_info_log_space_lock, key_relay_log_info_run_lock,
-  key_relay_log_info_sleep_lock,
+  key_relay_log_info_sleep_lock, key_rli_last_committed_id,
   key_structure_guard_mutex, key_TABLE_SHARE_LOCK_ha_data,
   key_LOCK_error_messages, key_LOCK_thread_count, key_PARTITION_LOCK_auto_inc;
 extern PSI_mutex_key key_RELAYLOG_LOCK_index;
 
 extern PSI_mutex_key key_LOCK_stats,
   key_LOCK_global_user_client_stats, key_LOCK_global_table_stats,
-  key_LOCK_global_index_stats, key_LOCK_wakeup_ready;
+  key_LOCK_global_index_stats, key_LOCK_wakeup_ready, key_LOCK_wait_commit;
 
 extern PSI_rwlock_key key_rwlock_LOCK_grant, key_rwlock_LOCK_logger,
   key_rwlock_LOCK_sys_init_connect, key_rwlock_LOCK_sys_init_slave,
@@ -272,7 +272,8 @@ extern PSI_cond_key key_BINLOG_COND_xid_
   key_relay_log_info_sleep_cond,
   key_TABLE_SHARE_cond, key_user_level_lock_cond,
   key_COND_thread_count, key_COND_thread_cache, key_COND_flush_thread_cache;
-extern PSI_cond_key key_RELAYLOG_update_cond, key_COND_wakeup_ready;
+extern PSI_cond_key key_RELAYLOG_update_cond, key_COND_wakeup_ready,
+  key_COND_wait_commit;
 extern PSI_cond_key key_RELAYLOG_COND_queue_busy;
 extern PSI_cond_key key_TC_LOG_MMAP_COND_queue_busy;
 

=== modified file 'sql/rpl_rli.cc'
--- sql/rpl_rli.cc	2012-12-05 14:05:37 +0000
+++ sql/rpl_rli.cc	2013-01-11 14:01:02 +0000
@@ -43,6 +43,7 @@ Relay_log_info::Relay_log_info(bool is_s
    sync_counter(0), is_relay_log_recovery(is_slave_recovery),
    save_temporary_tables(0), cur_log_old_open_count(0), group_relay_log_pos(0), 
    event_relay_log_pos(0),
+   last_trans_id(0), last_trans(0), last_committed_id(0),
 #if HAVE_valgrind
    is_fake(FALSE),
 #endif
@@ -78,6 +79,8 @@ Relay_log_info::Relay_log_info(bool is_s
   mysql_mutex_init(key_relay_log_info_log_space_lock,
                    &log_space_lock, MY_MUTEX_INIT_FAST);
   mysql_mutex_init(key_relay_log_info_sleep_lock, &sleep_lock, MY_MUTEX_INIT_FAST);
+  mysql_mutex_init(key_rli_last_committed_id, &LOCK_last_committed_id,
+                   MY_MUTEX_INIT_SLOW);
   mysql_cond_init(key_relay_log_info_data_cond, &data_cond, NULL);
   mysql_cond_init(key_relay_log_info_start_cond, &start_cond, NULL);
   mysql_cond_init(key_relay_log_info_stop_cond, &stop_cond, NULL);
@@ -102,6 +105,7 @@ Relay_log_info::~Relay_log_info()
   mysql_mutex_destroy(&data_lock);
   mysql_mutex_destroy(&log_space_lock);
   mysql_mutex_destroy(&sleep_lock);
+  mysql_mutex_destroy(&LOCK_last_committed_id);
   mysql_cond_destroy(&data_cond);
   mysql_cond_destroy(&start_cond);
   mysql_cond_destroy(&stop_cond);

=== modified file 'sql/rpl_rli.h'
--- sql/rpl_rli.h	2012-12-05 14:05:37 +0000
+++ sql/rpl_rli.h	2013-01-11 13:58:12 +0000
@@ -106,6 +106,25 @@ class transaction_st{
   my_off_t relay_log_pos;
   trans_pos_t *trans_pos;
 
+  /* This is used to keep transaction commit order. */
+  wait_for_commit commit_orderer;
+  /*
+    This is the ID (Relay_log_info::last_trans_id) of the previous transaction,
+    that we want to wait for before committing ourselves (if ordered commits
+    are enforced).
+  */
+  uint64 wait_commit_id;
+  /*
+    This is the transaction whose commit we want to wait for.
+    Only valid if Relay_log_info::last_committed_id < wait_commit_id.
+  */
+  transaction_st *wait_commit_trans;
+  /*
+    This is our own transaction id, which we should update
+    Relay_log_info::last_committed_id to once we commit.
+  */
+  uint64 own_commit_id;
+
   transaction_st();
 
   ~transaction_st();
@@ -123,7 +142,7 @@ class Transfer_worker
   int start();
   int stop();
   int wait_for_stopped();
-  int push_trans(transaction_st *trans);
+  int push_trans(Relay_log_info *rli, transaction_st *trans);
   int pop_trans(int batch_trans_n);
   int remove_trans(transaction_st *trans);
   bool check_trans_conflict(transaction_st *trans);
@@ -294,6 +313,20 @@ class Relay_log_info : public Slave_repo
   int push_back_trans_pos(transaction_st *trans);
   int rollback_trans_pos(transaction_st *trans);
   int pop_front_trans_pos();
+
+  /* Running counter for assigning IDs to event groups/transactions. */
+  uint64 last_trans_id;
+  /* The transaction_st corresponding to last_trans_id. */
+  transaction_st *last_trans;
+  /*
+    The ID of the last transaction/event group that was committed/applied.
+    This is used to decide if the next transaction should wait for the
+    previous one to commit (to avoid trying to wait for a commit that already
+    took place).
+  */
+  uint64 last_committed_id;
+  /* Mutex protecting access to last_committed_id. */
+  mysql_mutex_t LOCK_last_committed_id;
   /*Transfer end*/
 
 

=== modified file 'sql/slave.cc'
--- sql/slave.cc	2012-12-05 14:05:37 +0000
+++ sql/slave.cc	2013-01-11 16:45:15 +0000
@@ -2857,7 +2857,64 @@ int execute_single_transaction(Relay_log
 
 int Transfer_worker::execute_transaction(transaction_st *trans)
 {
-  return execute_single_transaction(dummy_rli, trans);
+  int res;
+
+  mysql_mutex_lock(&rli->LOCK_last_committed_id);
+  /*
+    Register us to wait for the previous commit, unless that commit is
+    already finished.
+  */
+  if (trans->wait_commit_id > rli->last_committed_id)
+  {
+    trans->commit_orderer.register_wait_for_prior_commit
+      (&trans->wait_commit_trans->commit_orderer);
+  }
+  mysql_mutex_unlock(&rli->LOCK_last_committed_id);
+
+  DBUG_ASSERT(!thd->wait_for_commit_ptr);
+  thd->wait_for_commit_ptr= &trans->commit_orderer;
+
+  res= execute_single_transaction(dummy_rli, trans);
+
+  /*
+    It is important to not leave us dangling in the wait-for list of another
+    THD. Best would be to ensure that we never register to wait without
+    actually waiting. But it's cheap, and probably more robust, to do an extra
+    check here and remove our wait registration if we somehow ended up never
+    waiting because of error condition or something.
+
+    ToDo: We need to *wait* here, not unregister. Because we must not wake
+    up following transactions until all prior transactions have completed.
+
+    If we do not want to wait, then alternatively we must put the
+    transaction_st * trans into some pending list, where it can be "woken up"
+    asynchronously when the prior transaction _does_ commit.
+  */
+  trans->commit_orderer.unregister_wait_for_prior_commit();
+
+  thd->wait_for_commit_ptr= NULL;
+
+  /*
+    Register our commit so that subsequent transactions/event groups will know
+    not to register to wait for us any more.
+
+    We can race here with the next transactions, but that is fine, as long as
+    we check that we do not decrease last_committed_id. If this commit is done,
+    then any prior commits will also have been done and also no longer need
+    waiting for.
+  */
+  mysql_mutex_lock(&rli->LOCK_last_committed_id);
+  if (rli->last_committed_id < trans->own_commit_id)
+    rli->last_committed_id= trans->own_commit_id;
+  mysql_mutex_unlock(&rli->LOCK_last_committed_id);
+
+  /*
+    Now that we have marked in rli->last_committed_id that we have committed,
+    no more waiter can register. So wake up any pending one last time.
+  */
+  trans->commit_orderer.wakeup_subsequent_commits();
+
+  return res;
 }
 
 int transfer_event_types[] = {TABLE_MAP_EVENT, WRITE_ROWS_EVENT, UPDATE_ROWS_EVENT, DELETE_ROWS_EVENT, QUERY_EVENT, XID_EVENT};
@@ -2889,13 +2946,11 @@ int Transfer_worker::run()
   thd= new THD;
   pthread_detach_this_thread();
   thd->thread_stack= (char*) &i;
-  thd->variables= rli->sql_thd->variables;
+  init_slave_thread(thd, SLAVE_THD_SQL);
   thd->variables.option_bits|= OPTION_NO_FOREIGN_KEY_CHECKS;
   thd->variables.dynamic_variables_ptr= NULL;
-  thd->security_ctx= rli->sql_thd->security_ctx;
   thd->lex->thd= thd;
   thd->lex->derived_tables= 0;
-  thd->system_thread = SYSTEM_THREAD_SLAVE_SQL;
 
   dummy_rli= new Relay_log_info(FALSE);
   dummy_rli->no_storage= TRUE;
@@ -2991,6 +3046,7 @@ int Transfer_worker::run()
 
   rw_unlock(&trans_list_lock);
 
+  net_end(&thd->net);
   delete thd;
   thd= NULL;
 
@@ -3039,8 +3095,11 @@ int Transfer_worker::wait_for_stopped()
 }
 
 /* return -1 means the worker is full. ok is 0 */
-int Transfer_worker::push_trans(transaction_st *trans)
+int Transfer_worker::push_trans(Relay_log_info *rli, transaction_st *trans)
 {
+
+  uint64 prev_trans_id, this_trans_id;
+
   rw_wrlock(&trans_list_lock);
 
   if (waiting_trans_number == 0)
@@ -3057,6 +3116,16 @@ int Transfer_worker::push_trans(transact
     list_end= (list_end + 1) % worker_size;
   }
 
+  prev_trans_id= rli->last_trans_id;
+  this_trans_id= prev_trans_id + 1;
+  rli->last_trans_id= this_trans_id;
+
+  trans->own_commit_id= this_trans_id;
+  trans->wait_commit_id= prev_trans_id;
+  trans->wait_commit_trans= rli->last_trans;
+
+  rli->last_trans= trans;
+
   pk_hash_plus(trans);
 
   trans_list[list_end]= trans;
@@ -3282,7 +3351,7 @@ int dispatch_transaction(Relay_log_info
     goto retry;
   }
 
-  if (rli->workers[trans->worker_id]->push_trans(trans) != 0)
+  if (rli->workers[trans->worker_id]->push_trans(rli, trans) != 0)
   {
     rli->rollback_trans_pos(trans);
     my_sleep(1000);
@@ -3327,6 +3396,9 @@ int pack_trans(Relay_log_info *rli, Log_
   int ev_type= ev->get_type_code();
   Query_log_event *qev= (Query_log_event *)ev;
   Rows_log_event *rev= (Rows_log_event*) ev;
+  DML_prelocking_strategy prelocking_strategy;
+  uint counter;
+  TABLE_LIST *tables;
 
   if (ev_type == XID_EVENT)
   {
@@ -3378,9 +3450,10 @@ int pack_trans(Relay_log_info *rli, Log_
     case UPDATE_ROWS_EVENT:
     case DELETE_ROWS_EVENT:
       thd->lex->thd= thd;
+      tables= rli->tables_to_lock;
       if ((rli->tables_to_lock) &&
           (rli->tables_to_lock->table || 
-           !open_normal_and_derived_tables(thd, rli->tables_to_lock, 0, 0))
+           !open_tables(thd, &tables, &counter, 0, &prelocking_strategy))
           )
       {
         rev->get_pk_value(rli);

=== modified file 'sql/sql_class.cc'
--- sql/sql_class.cc	2012-12-05 14:05:37 +0000
+++ sql/sql_class.cc	2013-01-11 16:45:24 +0000
@@ -754,6 +754,7 @@ THD::THD()
 #if defined(ENABLED_DEBUG_SYNC)
    debug_sync_control(0),
 #endif /* defined(ENABLED_DEBUG_SYNC) */
+   wait_for_commit_ptr(0),
    main_warning_info(0, false)
 {
   ulong tmp;
@@ -5505,6 +5506,196 @@ THD::signal_wakeup_ready()
 }
 
 
+wait_for_commit::wait_for_commit()
+  : subsequent_commits_list(0), next_subsequent_commit(0), waitee(0),
+    waiting_for_commit(false), wakeup_subsequent_commits_running(false)
+{
+  mysql_mutex_init(key_LOCK_wait_commit, &LOCK_wait_commit, MY_MUTEX_INIT_FAST);
+  mysql_cond_init(key_COND_wait_commit, &COND_wait_commit, 0);
+}
+
+
+/*
+  Register that the next commit of this THD should wait to complete until
+  commit in another THD (the waitee) has completed.
+
+  The wait may occur explicitly, with the waiter sitting in
+  wait_for_prior_commit() until the waitee calls wakeup_subsequent_commits().
+
+  Alternatively, the TC (eg. binlog) may do the commits of both waitee and
+  waiter at once during group commit, resolving both of them in the right
+  order.
+
+  Only one waitee can be registered for a waiter; it must be removed by
+  wait_for_prior_commit() or unregister_wait_for_prior_commit() before a new
+  one is registered. But it is ok for several waiters to register a wait for
+  the same waitee. It is also permissible for one THD to be both a waiter and
+  a waitee at the same time.
+*/
+void
+wait_for_commit::register_wait_for_prior_commit(wait_for_commit *waitee)
+{
+  waiting_for_commit= true;
+  DBUG_ASSERT(!this->waitee /* No prior registration allowed */);
+  this->waitee= waitee;
+
+  mysql_mutex_lock(&waitee->LOCK_wait_commit);
+  /*
+    If waitee is in the middle of wakeup, then there is nothing to wait for,
+    so we need not register. This is necessary to avoid a race in unregister,
+    see comments on wakeup_subsequent_commits2() for details.
+  */
+  if (waitee->wakeup_subsequent_commits_running)
+    waiting_for_commit= false;
+  else
+  {
+    this->next_subsequent_commit= waitee->subsequent_commits_list;
+    waitee->subsequent_commits_list= this;
+  }
+  mysql_mutex_unlock(&waitee->LOCK_wait_commit);
+}
+
+
+/*
+  Wait for commit of another transaction to complete, as already registered
+  with register_wait_for_prior_commit(). If the commit already completed,
+  returns immediately.
+*/
+void
+wait_for_commit::wait_for_prior_commit2()
+{
+  mysql_mutex_lock(&LOCK_wait_commit);
+  while (waiting_for_commit)
+    mysql_cond_wait(&COND_wait_commit, &LOCK_wait_commit);
+  mysql_mutex_unlock(&LOCK_wait_commit);
+  waitee= NULL;
+}
+
+
+/*
+  Wakeup anyone waiting for us to have committed.
+
+  Note about locking:
+
+  We have a potential race or deadlock between wakeup_subsequent_commits() in
+  the waitee and unregister_wait_for_prior_commit() in the waiter.
+
+  Both waiter and waitee need to take their own lock before it is safe to take
+  a lock on the other party - else the other party might disappear and invalid
+  memory data could be accessed. But if we take the two locks in different
+  order, we may end up in a deadlock.
+
+  The waiter needs to lock the waitee to delete itself from the list in
+  unregister_wait_for_prior_commit(). Thus wakeup_subsequent_commits() can not
+  hold its own lock while locking waiters, lest we deadlock.
+
+  So we need to prevent unregister_wait_for_prior_commit() running while wakeup
+  is in progress - otherwise the unregister could complete before the wakeup,
+  leading to incorrect spurious wakeup or accessing invalid memory.
+
+  However, if we are in the middle of running wakeup_subsequent_commits(), then
+  there is no need for unregister_wait_for_prior_commit() in the first place -
+  the waiter can just do a normal wait_for_prior_commit(), as it will be
+  immediately woken up.
+
+  So the solution to the potential race/deadlock is to set a flag in the waitee
+  that wakeup_subsequent_commits() is in progress. When this flag is set,
+  unregister_wait_for_prior_commit() becomes just wait_for_prior_commit().
+
+  Then also register_wait_for_prior_commit() needs to check if
+  wakeup_subsequent_commits() is running, and skip the registration if
+  so. This is needed in case a new waiter manages to register itself and
+  immediately try to unregister while wakeup_subsequent_commits() is
+  running. Else the new waiter would also wait rather than unregister, but it
+  would not be woken up until next wakeup, which could be potentially much
+  later than necessary.
+*/
+void
+wait_for_commit::wakeup_subsequent_commits2()
+{
+  wait_for_commit *waiter;
+
+  mysql_mutex_lock(&LOCK_wait_commit);
+  wakeup_subsequent_commits_running= true;
+  waiter= subsequent_commits_list;
+  subsequent_commits_list= NULL;
+  mysql_mutex_unlock(&LOCK_wait_commit);
+
+  while (waiter)
+  {
+    /*
+      Important: we must grab the next pointer before waking up the waiter;
+      once the wakeup is done, the field could be invalidated at any time.
+    */
+    wait_for_commit *next= waiter->next_subsequent_commit;
+
+    /*
+      We signal each waiter on their own condition and mutex (rather than using
+      pthread_cond_broadcast() or something like that).
+
+      Otherwise we would need to somehow ensure that they were done
+      waking up before we could allow this THD to be destroyed, which would
+      be annoying and unnecessary.
+    */
+    mysql_mutex_lock(&waiter->LOCK_wait_commit);
+    waiter->waiting_for_commit= false;
+    mysql_cond_signal(&waiter->COND_wait_commit);
+    mysql_mutex_unlock(&waiter->LOCK_wait_commit);
+
+    waiter= next;
+  }
+
+  mysql_mutex_lock(&LOCK_wait_commit);
+  wakeup_subsequent_commits_running= false;
+  mysql_mutex_unlock(&LOCK_wait_commit);
+}
+
+
+/* Cancel a previously registered wait for another THD to commit before us. */
+void
+wait_for_commit::unregister_wait_for_prior_commit2()
+{
+  mysql_mutex_lock(&LOCK_wait_commit);
+  if (waiting_for_commit)
+  {
+    wait_for_commit *loc_waitee= this->waitee;
+    wait_for_commit **next_ptr_ptr, *cur;
+    mysql_mutex_lock(&loc_waitee->LOCK_wait_commit);
+    if (loc_waitee->wakeup_subsequent_commits_running)
+    {
+      /*
+        When a wakeup is running, we cannot safely remove ourselves from the
+        list without corrupting it. Instead we can just wait, as wakeup is
+        already in progress and will thus be immediate.
+
+        See comments on wakeup_subsequent_commits2() for more details.
+      */
+      mysql_mutex_unlock(&loc_waitee->LOCK_wait_commit);
+      while (waiting_for_commit)
+        mysql_cond_wait(&COND_wait_commit, &LOCK_wait_commit);
+    }
+    else
+    {
+      /* Remove ourselves from the list in the waitee. */
+      next_ptr_ptr= &loc_waitee->subsequent_commits_list;
+      while ((cur= *next_ptr_ptr) != NULL)
+      {
+        if (cur == this)
+        {
+          *next_ptr_ptr= this->next_subsequent_commit;
+          break;
+        }
+        next_ptr_ptr= &cur->next_subsequent_commit;
+      }
+      waiting_for_commit= false;
+      mysql_mutex_unlock(&loc_waitee->LOCK_wait_commit);
+    }
+  }
+  mysql_mutex_unlock(&LOCK_wait_commit);
+  this->waitee= NULL;
+}
+
+
 bool Discrete_intervals_list::append(ulonglong start, ulonglong val,
                                  ulonglong incr)
 {

=== modified file 'sql/sql_class.h'
--- sql/sql_class.h	2012-09-22 14:11:40 +0000
+++ sql/sql_class.h	2013-01-11 12:28:00 +0000
@@ -1518,6 +1518,104 @@ class Global_read_lock
 };
 
 
+/*
+  Class to facilitate the commit of one transaction waiting for the commit of
+  another transaction to complete first.
+
+  This is used during (parallel) replication, to allow different transactions
+  to be applied in parallel, but still commit in order.
+
+  The transaction that wants to wait for a prior commit must first register
+  to wait with register_wait_for_prior_commit(waitee). The registration
+  takes waitee->LOCK_wait_commit while linking in the waiter, to prevent the
+  other THD from disappearing during the registration.
+
+  Then during commit, if a THD is registered to wait, it will call
+  wait_for_prior_commit() as part of ha_commit_trans(). If no wait is
+  registered, or if the waitee has already completed commit, then
+  wait_for_prior_commit() returns immediately.
+
+  And when a THD that may be waited for has completed commit (more precisely
+  commit_ordered()), then it must call wakeup_subsequent_commits() to wake
+  up any waiters. Note that this must be done at a point that is guaranteed
+  to be later than any waiters registering themselves. It is safe to call
+  wakeup_subsequent_commits() multiple times, as waiters are removed from
+  registration as part of the wakeup.
+
+  The reason for separate register and wait calls is that this allows to
+  register the wait early, at a point where the waited-for THD is known to
+  exist. And then the actual wait can be done much later, where the
+  waited-for THD may have been long gone. By registering early, the waitee
+  can signal before disappearing.
+*/
+struct wait_for_commit
+{
+  /*
+    The LOCK_wait_commit protects the fields subsequent_commits_list and
+    wakeup_subsequent_commits_running (for a waitee), and the flag
+    waiting_for_commit and associated COND_wait_commit (for a waiter).
+  */
+  mysql_mutex_t LOCK_wait_commit;
+  mysql_cond_t COND_wait_commit;
+  /* List of threads that did register_wait_for_prior_commit() on us. */
+  wait_for_commit *subsequent_commits_list;
+  /* Link field for entries in subsequent_commits_list. */
+  wait_for_commit *next_subsequent_commit;
+  /* Our waitee, if we did register_wait_for_prior_commit(), else NULL. */
+  wait_for_commit *waitee;
+  /*
+    The waiting_for_commit flag is cleared when a waiter has been woken
+    up. The COND_wait_commit condition is signalled when this has been
+    cleared.
+  */
+  bool waiting_for_commit;
+  /*
+    Flag set while wakeup_subsequent_commits2() is active, see comments
+    on that function for details.
+  */
+  bool wakeup_subsequent_commits_running;
+
+  void register_wait_for_prior_commit(wait_for_commit *waitee);
+  void wait_for_prior_commit()
+  {
+    /*
+      Quick inline check, to avoid function call and locking in the common case
+      where no wait is registered, or a registered wait was already signalled.
+    */
+    if (waiting_for_commit)
+      wait_for_prior_commit2();
+  }
+  void wakeup_subsequent_commits()
+  {
+    /*
+      Do the check inline, so only the wakeup case takes the cost of a function
+      call for every commit.
+
+      Note that the check is done without locking. It is the responsibility of
+      the user of the wakeup facility to ensure that no waiters can register
+      themselves after the last call to wakeup_subsequent_commits().
+
+      This avoids having to take another lock for every commit, which would be
+      pointless anyway - even if we check under lock, there is nothing to
+      prevent a waiter from arriving just after releasing the lock.
+    */
+    if (subsequent_commits_list)
+      wakeup_subsequent_commits2();
+  }
+  void unregister_wait_for_prior_commit()
+  {
+    if (waiting_for_commit)
+      unregister_wait_for_prior_commit2();
+  }
+
+  void wait_for_prior_commit2();
+  void wakeup_subsequent_commits2();
+  void unregister_wait_for_prior_commit2();
+
+  wait_for_commit();
+};
+
+
 extern "C" void my_message_sql(uint error, const char *str, myf MyFlags);
 
 /**
@@ -3095,6 +3193,19 @@ class THD :public Statement,
   void wait_for_wakeup_ready();
   /* Wake this thread up from wait_for_wakeup_ready(). */
   void signal_wakeup_ready();
+
+  wait_for_commit *wait_for_commit_ptr;
+  void wait_for_prior_commit()
+  {
+    if (wait_for_commit_ptr)
+      wait_for_commit_ptr->wait_for_prior_commit();
+  }
+  void wakeup_subsequent_commits()
+  {
+    if (wait_for_commit_ptr)
+      wait_for_commit_ptr->wakeup_subsequent_commits();
+  }
+
 private:
 
   /** The current internal error handler for this thread, or NULL. */


Follow ups

References