maria-developers team mailing list archive
-
maria-developers team
-
Mailing list archive
-
Message #05045
Re: 答复: in-order commit
Kristian Nielsen <knielsen@xxxxxxxxxxxxxxx> writes:
> 丁奇 <dingqi.lxb@xxxxxxxxxx> writes:
>
>> Hi, Kristian
>> Ok. I have got the information from JIRA.
>>
>
>> I find you control the commit order inside the user thread.
>>
>> Will it be easier to let Trans_worker thread hold this logic?
>
> Yes, I think you are right. Of course, the user thread is the one that knows
> the ordering, but the logic for waiting needs to be in the Trans_worker
> thread. In fact this is a bug in my first patch: Transaction T3 could wait for
> the THD of worker thread 1 which has both T1 and T2 queued; then it will wake
> up too early, when T1 commits rather than when T2 does.
>
> I will try to implement the new idea today.
I fixed two bugs in my earlier patch (sorry about that; I should have tested it a
bit better :-)
This one looks better. It fixes the earlier test failures I saw in
mysql-test-run.pl. I tested it with 20000 inserts; it works and preserves the
commit order, with a nice speedup compared to single-threaded inserts on the
master.
There may still be more problems, of course. Also, especially with
--sync-binlog=0, I will need to implement my group commit ideas to achieve
the best speed with in-order commit.
The new patch is attached (it replaces the earlier patches). I have also pushed
it to my branch:
lp:~knielsen/maria/dingqi-parallel-replication
- Kristian.
=== modified file 'sql/handler.cc'
--- sql/handler.cc 2012-09-22 14:11:40 +0000
+++ sql/handler.cc 2013-01-09 15:19:56 +0000
@@ -1318,6 +1318,15 @@ int ha_commit_trans(THD *thd, bool all)
goto done;
}
+ /*
+ ToDo: Push this wait into tc_log->log_and_order().
+
+ This will allow the TC to use the knowledge that one commit is waiting for
+ another to put them into the same group commit (with the waiter after the
+ waitee in the commit ordering).
+ */
+ thd->wait_for_prior_commit();
+
DEBUG_SYNC(thd, "ha_commit_trans_before_log_and_order");
cookie= tc_log->log_and_order(thd, xid, all, need_prepare_ordered,
need_commit_ordered);
@@ -1389,6 +1398,8 @@ int ha_commit_one_phase(THD *thd, bool a
*/
bool is_real_trans=all || thd->transaction.all.ha_list == 0;
DBUG_ENTER("ha_commit_one_phase");
+ if (is_real_trans)
+ thd->wait_for_prior_commit();
int res= commit_one_phase_2(thd, all, trans, is_real_trans);
DBUG_RETURN(res);
}
@@ -1428,7 +1439,10 @@ commit_one_phase_2(THD *thd, bool all, T
}
/* Free resources and perform other cleanup even for 'empty' transactions. */
if (is_real_trans)
+ {
+ thd->wakeup_subsequent_commits();
thd->transaction.cleanup();
+ }
DBUG_RETURN(error);
}
@@ -1503,7 +1517,10 @@ int ha_rollback_trans(THD *thd, bool all
}
/* Always cleanup. Even if nht==0. There may be savepoints. */
if (is_real_trans)
+ {
+ thd->wakeup_subsequent_commits();
thd->transaction.cleanup();
+ }
if (all)
thd->transaction_rollback_request= FALSE;
=== modified file 'sql/mysqld.cc'
--- sql/mysqld.cc 2012-12-05 14:05:37 +0000
+++ sql/mysqld.cc 2013-01-11 12:29:33 +0000
@@ -742,7 +742,7 @@ PSI_mutex_key key_BINLOG_LOCK_index, key
key_master_info_sleep_lock,
key_mutex_slave_reporting_capability_err_lock, key_relay_log_info_data_lock,
key_relay_log_info_log_space_lock, key_relay_log_info_run_lock,
- key_relay_log_info_sleep_lock,
+ key_relay_log_info_sleep_lock, key_rli_last_committed_id,
key_structure_guard_mutex, key_TABLE_SHARE_LOCK_ha_data,
key_LOCK_error_messages, key_LOG_INFO_lock, key_LOCK_thread_count,
key_PARTITION_LOCK_auto_inc;
@@ -751,7 +751,7 @@ PSI_mutex_key key_RELAYLOG_LOCK_index;
PSI_mutex_key key_LOCK_stats,
key_LOCK_global_user_client_stats, key_LOCK_global_table_stats,
key_LOCK_global_index_stats,
- key_LOCK_wakeup_ready;
+ key_LOCK_wakeup_ready, key_LOCK_wait_commit;
PSI_mutex_key key_LOCK_prepare_ordered, key_LOCK_commit_ordered;
@@ -795,6 +795,7 @@ static PSI_mutex_info all_server_mutexes
{ &key_LOCK_global_table_stats, "LOCK_global_table_stats", PSI_FLAG_GLOBAL},
{ &key_LOCK_global_index_stats, "LOCK_global_index_stats", PSI_FLAG_GLOBAL},
{ &key_LOCK_wakeup_ready, "THD::LOCK_wakeup_ready", 0},
+ { &key_LOCK_wait_commit, "wait_for_commit::LOCK_wait_commit", 0},
{ &key_LOCK_thd_data, "THD::LOCK_thd_data", 0},
{ &key_LOCK_user_conn, "LOCK_user_conn", PSI_FLAG_GLOBAL},
{ &key_LOCK_uuid_short_generator, "LOCK_uuid_short_generator", PSI_FLAG_GLOBAL},
@@ -807,6 +808,7 @@ static PSI_mutex_info all_server_mutexes
{ &key_relay_log_info_log_space_lock, "Relay_log_info::log_space_lock", 0},
{ &key_relay_log_info_run_lock, "Relay_log_info::run_lock", 0},
{ &key_relay_log_info_sleep_lock, "Relay_log_info::sleep_lock", 0},
+ { &key_rli_last_committed_id, "Relay_log_info::LOCK_last_committed_id", 0},
{ &key_structure_guard_mutex, "Query_cache::structure_guard_mutex", 0},
{ &key_TABLE_SHARE_LOCK_ha_data, "TABLE_SHARE::LOCK_ha_data", 0},
{ &key_LOCK_error_messages, "LOCK_error_messages", PSI_FLAG_GLOBAL},
@@ -851,7 +853,8 @@ PSI_cond_key key_BINLOG_COND_xid_list, k
key_TABLE_SHARE_cond, key_user_level_lock_cond,
key_COND_thread_count, key_COND_thread_cache, key_COND_flush_thread_cache,
key_BINLOG_COND_queue_busy;
-PSI_cond_key key_RELAYLOG_update_cond, key_COND_wakeup_ready;
+PSI_cond_key key_RELAYLOG_update_cond, key_COND_wakeup_ready,
+ key_COND_wait_commit;
PSI_cond_key key_RELAYLOG_COND_queue_busy;
PSI_cond_key key_TC_LOG_MMAP_COND_queue_busy;
@@ -872,6 +875,7 @@ static PSI_cond_info all_server_conds[]=
{ &key_RELAYLOG_update_cond, "MYSQL_RELAY_LOG::update_cond", 0},
{ &key_RELAYLOG_COND_queue_busy, "MYSQL_RELAY_LOG::COND_queue_busy", 0},
{ &key_COND_wakeup_ready, "THD::COND_wakeup_ready", 0},
+ { &key_COND_wait_commit, "wait_for_commit::COND_wait_commit", 0},
{ &key_COND_cache_status_changed, "Query_cache::COND_cache_status_changed", 0},
{ &key_COND_manager, "COND_manager", PSI_FLAG_GLOBAL},
{ &key_COND_rpl_status, "COND_rpl_status", PSI_FLAG_GLOBAL},
=== modified file 'sql/mysqld.h'
--- sql/mysqld.h 2012-12-05 14:05:37 +0000
+++ sql/mysqld.h 2013-01-09 15:19:56 +0000
@@ -243,14 +243,14 @@ extern PSI_mutex_key key_BINLOG_LOCK_ind
key_master_info_sleep_lock,
key_mutex_slave_reporting_capability_err_lock, key_relay_log_info_data_lock,
key_relay_log_info_log_space_lock, key_relay_log_info_run_lock,
- key_relay_log_info_sleep_lock,
+ key_relay_log_info_sleep_lock, key_rli_last_committed_id,
key_structure_guard_mutex, key_TABLE_SHARE_LOCK_ha_data,
key_LOCK_error_messages, key_LOCK_thread_count, key_PARTITION_LOCK_auto_inc;
extern PSI_mutex_key key_RELAYLOG_LOCK_index;
extern PSI_mutex_key key_LOCK_stats,
key_LOCK_global_user_client_stats, key_LOCK_global_table_stats,
- key_LOCK_global_index_stats, key_LOCK_wakeup_ready;
+ key_LOCK_global_index_stats, key_LOCK_wakeup_ready, key_LOCK_wait_commit;
extern PSI_rwlock_key key_rwlock_LOCK_grant, key_rwlock_LOCK_logger,
key_rwlock_LOCK_sys_init_connect, key_rwlock_LOCK_sys_init_slave,
@@ -272,7 +272,8 @@ extern PSI_cond_key key_BINLOG_COND_xid_
key_relay_log_info_sleep_cond,
key_TABLE_SHARE_cond, key_user_level_lock_cond,
key_COND_thread_count, key_COND_thread_cache, key_COND_flush_thread_cache;
-extern PSI_cond_key key_RELAYLOG_update_cond, key_COND_wakeup_ready;
+extern PSI_cond_key key_RELAYLOG_update_cond, key_COND_wakeup_ready,
+ key_COND_wait_commit;
extern PSI_cond_key key_RELAYLOG_COND_queue_busy;
extern PSI_cond_key key_TC_LOG_MMAP_COND_queue_busy;
=== modified file 'sql/rpl_rli.cc'
--- sql/rpl_rli.cc 2012-12-05 14:05:37 +0000
+++ sql/rpl_rli.cc 2013-01-11 14:01:02 +0000
@@ -43,6 +43,7 @@ Relay_log_info::Relay_log_info(bool is_s
sync_counter(0), is_relay_log_recovery(is_slave_recovery),
save_temporary_tables(0), cur_log_old_open_count(0), group_relay_log_pos(0),
event_relay_log_pos(0),
+ last_trans_id(0), last_trans(0), last_committed_id(0),
#if HAVE_valgrind
is_fake(FALSE),
#endif
@@ -78,6 +79,8 @@ Relay_log_info::Relay_log_info(bool is_s
mysql_mutex_init(key_relay_log_info_log_space_lock,
&log_space_lock, MY_MUTEX_INIT_FAST);
mysql_mutex_init(key_relay_log_info_sleep_lock, &sleep_lock, MY_MUTEX_INIT_FAST);
+ mysql_mutex_init(key_rli_last_committed_id, &LOCK_last_committed_id,
+ MY_MUTEX_INIT_SLOW);
mysql_cond_init(key_relay_log_info_data_cond, &data_cond, NULL);
mysql_cond_init(key_relay_log_info_start_cond, &start_cond, NULL);
mysql_cond_init(key_relay_log_info_stop_cond, &stop_cond, NULL);
@@ -102,6 +105,7 @@ Relay_log_info::~Relay_log_info()
mysql_mutex_destroy(&data_lock);
mysql_mutex_destroy(&log_space_lock);
mysql_mutex_destroy(&sleep_lock);
+ mysql_mutex_destroy(&LOCK_last_committed_id);
mysql_cond_destroy(&data_cond);
mysql_cond_destroy(&start_cond);
mysql_cond_destroy(&stop_cond);
=== modified file 'sql/rpl_rli.h'
--- sql/rpl_rli.h 2012-12-05 14:05:37 +0000
+++ sql/rpl_rli.h 2013-01-11 13:58:12 +0000
@@ -106,6 +106,25 @@ class transaction_st{
my_off_t relay_log_pos;
trans_pos_t *trans_pos;
+ /* This is used to keep transaction commit order. */
+ wait_for_commit commit_orderer;
+ /*
+ This is the ID (Relay_log_info::last_trans_id) of the previous transaction,
+ that we want to wait for before committing ourselves (if ordered commits
+ are enforced).
+ */
+ uint64 wait_commit_id;
+ /*
+ This is the transaction whose commit we want to wait for.
+ Only valid if Relay_log_info::last_committed_id < wait_commit_id.
+ */
+ transaction_st *wait_commit_trans;
+ /*
+ This is our own transaction id, which we should update
+ Relay_log_info::last_committed_id to once we commit.
+ */
+ uint64 own_commit_id;
+
transaction_st();
~transaction_st();
@@ -123,7 +142,7 @@ class Transfer_worker
int start();
int stop();
int wait_for_stopped();
- int push_trans(transaction_st *trans);
+ int push_trans(Relay_log_info *rli, transaction_st *trans);
int pop_trans(int batch_trans_n);
int remove_trans(transaction_st *trans);
bool check_trans_conflict(transaction_st *trans);
@@ -294,6 +313,20 @@ class Relay_log_info : public Slave_repo
int push_back_trans_pos(transaction_st *trans);
int rollback_trans_pos(transaction_st *trans);
int pop_front_trans_pos();
+
+ /* Running counter for assigning IDs to event groups/transactions. */
+ uint64 last_trans_id;
+ /* The transaction_st corresponding to last_trans_id. */
+ transaction_st *last_trans;
+ /*
+ The ID of the last transaction/event group that was committed/applied.
+ This is used to decide if the next transaction should wait for the
+ previous one to commit (to avoid trying to wait for a commit that already
+ took place).
+ */
+ uint64 last_committed_id;
+ /* Mutex protecting access to last_committed_id. */
+ mysql_mutex_t LOCK_last_committed_id;
/*Transfer end*/
=== modified file 'sql/slave.cc'
--- sql/slave.cc 2012-12-05 14:05:37 +0000
+++ sql/slave.cc 2013-01-11 16:45:15 +0000
@@ -2857,7 +2857,64 @@ int execute_single_transaction(Relay_log
int Transfer_worker::execute_transaction(transaction_st *trans)
{
- return execute_single_transaction(dummy_rli, trans);
+ int res;
+
+ mysql_mutex_lock(&rli->LOCK_last_committed_id);
+ /*
+ Register to wait for the previous transaction's commit, unless that
+ commit has already finished (last_committed_id has reached its id).
+ */
+ if (trans->wait_commit_id > rli->last_committed_id)
+ {
+ trans->commit_orderer.register_wait_for_prior_commit
+ (&trans->wait_commit_trans->commit_orderer);
+ }
+ mysql_mutex_unlock(&rli->LOCK_last_committed_id);
+
+ DBUG_ASSERT(!thd->wait_for_commit_ptr);
+ thd->wait_for_commit_ptr= &trans->commit_orderer;
+
+ res= execute_single_transaction(dummy_rli, trans);
+
+ /*
+ It is important not to leave us dangling in the wait-for list of another
+ transaction. Best would be to ensure that we never register to wait without
+ actually waiting. But it's cheap, and probably more robust, to do an extra
+ check here and remove our wait registration if we somehow ended up never
+ waiting because of an error condition or similar.
+
+ ToDo: We need to *wait* here, not unregister. Because we must not wake
+ up following transactions until all prior transactions have completed.
+
+ If we do not want to wait, then alternatively we must put the
+ transaction_st * trans into some pending list, where it can be "woken up"
+ asynchronously when the prior transaction _does_ commit.
+ */
+ trans->commit_orderer.unregister_wait_for_prior_commit();
+
+ thd->wait_for_commit_ptr= NULL;
+
+ /*
+ Record our commit so that subsequent transactions/event groups will know
+ not to register to wait for us any more.
+
+ We can race here with the following transactions, but that is fine as long
+ as we never decrease last_committed_id. If this commit is done, then any
+ prior commits will also have been done and no longer need to be
+ waited for.
+ */
+ mysql_mutex_lock(&rli->LOCK_last_committed_id);
+ if (rli->last_committed_id < trans->own_commit_id)
+ rli->last_committed_id= trans->own_commit_id;
+ mysql_mutex_unlock(&rli->LOCK_last_committed_id);
+
+ /*
+ Now that rli->last_committed_id records our commit, no more waiters can
+ register on us. So wake up any pending waiters one last time.
+ */
+ trans->commit_orderer.wakeup_subsequent_commits();
+
+ return res;
}
int transfer_event_types[] = {TABLE_MAP_EVENT, WRITE_ROWS_EVENT, UPDATE_ROWS_EVENT, DELETE_ROWS_EVENT, QUERY_EVENT, XID_EVENT};
@@ -2889,13 +2946,11 @@ int Transfer_worker::run()
thd= new THD;
pthread_detach_this_thread();
thd->thread_stack= (char*) &i;
- thd->variables= rli->sql_thd->variables;
+ init_slave_thread(thd, SLAVE_THD_SQL);
thd->variables.option_bits|= OPTION_NO_FOREIGN_KEY_CHECKS;
thd->variables.dynamic_variables_ptr= NULL;
- thd->security_ctx= rli->sql_thd->security_ctx;
thd->lex->thd= thd;
thd->lex->derived_tables= 0;
- thd->system_thread = SYSTEM_THREAD_SLAVE_SQL;
dummy_rli= new Relay_log_info(FALSE);
dummy_rli->no_storage= TRUE;
@@ -2991,6 +3046,7 @@ int Transfer_worker::run()
rw_unlock(&trans_list_lock);
+ net_end(&thd->net);
delete thd;
thd= NULL;
@@ -3039,8 +3095,11 @@ int Transfer_worker::wait_for_stopped()
}
/* return -1 means the worker is full. ok is 0 */
-int Transfer_worker::push_trans(transaction_st *trans)
+int Transfer_worker::push_trans(Relay_log_info *rli, transaction_st *trans)
{
+
+ uint64 prev_trans_id, this_trans_id;
+
rw_wrlock(&trans_list_lock);
if (waiting_trans_number == 0)
@@ -3057,6 +3116,16 @@ int Transfer_worker::push_trans(transact
list_end= (list_end + 1) % worker_size;
}
+ prev_trans_id= rli->last_trans_id;
+ this_trans_id= prev_trans_id + 1;
+ rli->last_trans_id= this_trans_id;
+
+ trans->own_commit_id= this_trans_id;
+ trans->wait_commit_id= prev_trans_id;
+ trans->wait_commit_trans= rli->last_trans;
+
+ rli->last_trans= trans;
+
pk_hash_plus(trans);
trans_list[list_end]= trans;
@@ -3282,7 +3351,7 @@ int dispatch_transaction(Relay_log_info
goto retry;
}
- if (rli->workers[trans->worker_id]->push_trans(trans) != 0)
+ if (rli->workers[trans->worker_id]->push_trans(rli, trans) != 0)
{
rli->rollback_trans_pos(trans);
my_sleep(1000);
@@ -3327,6 +3396,9 @@ int pack_trans(Relay_log_info *rli, Log_
int ev_type= ev->get_type_code();
Query_log_event *qev= (Query_log_event *)ev;
Rows_log_event *rev= (Rows_log_event*) ev;
+ DML_prelocking_strategy prelocking_strategy;
+ uint counter;
+ TABLE_LIST *tables;
if (ev_type == XID_EVENT)
{
@@ -3378,9 +3450,10 @@ int pack_trans(Relay_log_info *rli, Log_
case UPDATE_ROWS_EVENT:
case DELETE_ROWS_EVENT:
thd->lex->thd= thd;
+ tables= rli->tables_to_lock;
if ((rli->tables_to_lock) &&
(rli->tables_to_lock->table ||
- !open_normal_and_derived_tables(thd, rli->tables_to_lock, 0, 0))
+ !open_tables(thd, &tables, &counter, 0, &prelocking_strategy))
)
{
rev->get_pk_value(rli);
=== modified file 'sql/sql_class.cc'
--- sql/sql_class.cc 2012-12-05 14:05:37 +0000
+++ sql/sql_class.cc 2013-01-11 16:45:24 +0000
@@ -754,6 +754,7 @@ THD::THD()
#if defined(ENABLED_DEBUG_SYNC)
debug_sync_control(0),
#endif /* defined(ENABLED_DEBUG_SYNC) */
+ wait_for_commit_ptr(0),
main_warning_info(0, false)
{
ulong tmp;
@@ -5505,6 +5506,196 @@ THD::signal_wakeup_ready()
}
+wait_for_commit::wait_for_commit() /* Starts idle: no waitee, no waiters. */
+ : subsequent_commits_list(0), next_subsequent_commit(0), waitee(0),
+ waiting_for_commit(false), wakeup_subsequent_commits_running(false)
+{
+ mysql_mutex_init(key_LOCK_wait_commit, &LOCK_wait_commit, MY_MUTEX_INIT_FAST);
+ mysql_cond_init(key_COND_wait_commit, &COND_wait_commit, 0);
+}
+
+
+/*
+ Register that the next commit of this THD should wait to complete until
+ commit in another THD (the waitee) has completed.
+
+ The wait may occur explicitly, with the waiter sitting in
+ wait_for_prior_commit() until the waitee calls wakeup_subsequent_commits().
+
+ Alternatively, the TC (e.g. the binlog) may do the commits of both waitee
+ and waiter at once during group commit, resolving both of them in the right
+ order.
+
+ Only one waitee can be registered for a waiter; it must be removed by
+ wait_for_prior_commit() or unregister_wait_for_prior_commit() before a new
+ one is registered. Several waiters may register for the same waitee, and one
+ object may be both a waiter and a waitee. The caller must keep the waitee
+ alive for the duration of this call, as its LOCK_wait_commit is taken here.
+*/
+void
+wait_for_commit::register_wait_for_prior_commit(wait_for_commit *waitee)
+{
+ waiting_for_commit= true;
+ DBUG_ASSERT(!this->waitee /* No prior registration allowed */);
+ this->waitee= waitee;
+
+ mysql_mutex_lock(&waitee->LOCK_wait_commit);
+ /*
+ If the waitee is in the middle of a wakeup, there is nothing to wait for,
+ so we need not register. This is necessary to avoid a race in unregister;
+ see comments on wakeup_subsequent_commits2() for details.
+ */
+ if (waitee->wakeup_subsequent_commits_running)
+ waiting_for_commit= false;
+ else
+ {
+ this->next_subsequent_commit= waitee->subsequent_commits_list;
+ waitee->subsequent_commits_list= this;
+ }
+ mysql_mutex_unlock(&waitee->LOCK_wait_commit);
+}
+
+
+/*
+ Wait for the commit of another transaction to complete, as registered with
+ register_wait_for_prior_commit(). Returns immediately if that commit has
+ already completed. Clears waitee so that a new wait may be registered.
+*/
+void
+wait_for_commit::wait_for_prior_commit2()
+{
+ mysql_mutex_lock(&LOCK_wait_commit);
+ while (waiting_for_commit)
+ mysql_cond_wait(&COND_wait_commit, &LOCK_wait_commit);
+ mysql_mutex_unlock(&LOCK_wait_commit);
+ waitee= NULL;
+}
+
+
+/*
+ Wake up anyone waiting for us to have committed.
+
+ Note about locking:
+
+ We have a potential race or deadlock between wakeup_subsequent_commits() in
+ the waitee and unregister_wait_for_prior_commit() in the waiter.
+
+ Both waiter and waitee need to take their own lock before it is safe to take
+ a lock on the other party - else the other party might disappear and freed
+ memory could be accessed. But if we take the two locks in different
+ orders, we may end up in a deadlock.
+
+ The waiter needs to lock the waitee to delete itself from the list in
+ unregister_wait_for_prior_commit(). Thus wakeup_subsequent_commits() can not
+ hold its own lock while locking waiters, lest we deadlock.
+
+ So we need to prevent unregister_wait_for_prior_commit() running while wakeup
+ is in progress - otherwise the unregister could complete before the wakeup,
+ leading to a spurious wakeup or to accessing freed memory.
+
+ However, if we are in the middle of running wakeup_subsequent_commits(), then
+ there is no need for unregister_wait_for_prior_commit() in the first place -
+ the waiter can just do a normal wait_for_prior_commit(), as it will be
+ immediately woken up.
+
+ So the solution to the potential race/deadlock is to set a flag in the waitee
+ that wakeup_subsequent_commits() is in progress. When this flag is set,
+ unregister_wait_for_prior_commit() becomes just wait_for_prior_commit().
+
+ Then also register_wait_for_prior_commit() needs to check if
+ wakeup_subsequent_commits() is running, and skip the registration if
+ so. This is needed in case a new waiter manages to register itself and
+ immediately tries to unregister while wakeup_subsequent_commits() is
+ running. Else the new waiter would also wait rather than unregister, but it
+ would not be woken up until the next wakeup, which could potentially be
+ much later than necessary.
+*/
+void
+wait_for_commit::wakeup_subsequent_commits2()
+{
+ wait_for_commit *waiter;
+
+ mysql_mutex_lock(&LOCK_wait_commit);
+ wakeup_subsequent_commits_running= true;
+ waiter= subsequent_commits_list;
+ subsequent_commits_list= NULL;
+ mysql_mutex_unlock(&LOCK_wait_commit);
+
+ while (waiter)
+ {
+ /*
+ Important: we must grab the next pointer before waking up the waiter;
+ once the wakeup is done, the field could be invalidated at any time.
+ */
+ wait_for_commit *next= waiter->next_subsequent_commit;
+
+ /*
+ We signal each waiter on its own condition and mutex (rather than using
+ pthread_cond_broadcast() or something like that).
+
+ Otherwise we would need to somehow ensure that they were done
+ waking up before we could allow this THD to be destroyed, which would
+ be annoying and unnecessary.
+ */
+ mysql_mutex_lock(&waiter->LOCK_wait_commit);
+ waiter->waiting_for_commit= false;
+ mysql_cond_signal(&waiter->COND_wait_commit);
+ mysql_mutex_unlock(&waiter->LOCK_wait_commit);
+
+ waiter= next;
+ }
+
+ mysql_mutex_lock(&LOCK_wait_commit);
+ wakeup_subsequent_commits_running= false;
+ mysql_mutex_unlock(&LOCK_wait_commit);
+}
+
+
+/* Cancel a previously registered wait for another THD to commit before us. */
+void
+wait_for_commit::unregister_wait_for_prior_commit2()
+{
+ mysql_mutex_lock(&LOCK_wait_commit);
+ if (waiting_for_commit)
+ {
+ wait_for_commit *loc_waitee= this->waitee;
+ wait_for_commit **next_ptr_ptr, *cur;
+ mysql_mutex_lock(&loc_waitee->LOCK_wait_commit);
+ if (loc_waitee->wakeup_subsequent_commits_running)
+ {
+ /*
+ When a wakeup is running, the waitee has already detached its list but may
+ still touch our next_subsequent_commit and LOCK_wait_commit, so we must not
+ just leave. Instead we wait; the wakeup is in progress, so this is quick.
+
+ See comments on wakeup_subsequent_commits2() for more details.
+ */
+ mysql_mutex_unlock(&loc_waitee->LOCK_wait_commit);
+ while (waiting_for_commit)
+ mysql_cond_wait(&COND_wait_commit, &LOCK_wait_commit);
+ }
+ else
+ {
+ /* Remove ourselves from the list in the waitee. */
+ next_ptr_ptr= &loc_waitee->subsequent_commits_list;
+ while ((cur= *next_ptr_ptr) != NULL)
+ {
+ if (cur == this)
+ {
+ *next_ptr_ptr= this->next_subsequent_commit;
+ break;
+ }
+ next_ptr_ptr= &cur->next_subsequent_commit;
+ }
+ waiting_for_commit= false;
+ mysql_mutex_unlock(&loc_waitee->LOCK_wait_commit);
+ }
+ }
+ mysql_mutex_unlock(&LOCK_wait_commit);
+ this->waitee= NULL;
+}
+
+
bool Discrete_intervals_list::append(ulonglong start, ulonglong val,
ulonglong incr)
{
=== modified file 'sql/sql_class.h'
--- sql/sql_class.h 2012-09-22 14:11:40 +0000
+++ sql/sql_class.h 2013-01-11 12:28:00 +0000
@@ -1518,6 +1518,104 @@ class Global_read_lock
};
+/*
+ Class that lets the commit of one transaction wait for the commit of
+ another transaction to complete first.
+
+ This is used during (parallel) replication, to allow different transactions
+ to be applied in parallel, but still commit in order.
+
+ The transaction that wants to wait for a prior commit must first register
+ to wait with register_wait_for_prior_commit(waitee). That call locks
+ waitee->LOCK_wait_commit itself, so the caller must not already hold it, but
+ must ensure that the waitee cannot disappear during the registration.
+
+ Then during commit, if a THD is registered to wait, it will call
+ wait_for_prior_commit() as part of ha_commit_trans(). If no wait is
+ registered, or if the waitee has already completed its commit, then
+ wait_for_prior_commit() returns immediately.
+
+ And when a THD that may be waited for has completed commit (more precisely
+ commit_ordered()), then it must call wakeup_subsequent_commits() to wake
+ up any waiters. Note that this must be done at a point that is guaranteed
+ to be later than any waiters registering themselves. It is safe to call
+ wakeup_subsequent_commits() multiple times, as waiters are removed from
+ registration as part of the wakeup.
+
+ The reason for separate register and wait calls is that it allows the wait
+ to be registered early, at a point where the waited-for THD is known to
+ exist. The actual wait can then be done much later, when the waited-for
+ THD may be long gone. By registering early, the waitee can signal before
+ disappearing.
+*/
+struct wait_for_commit
+{
+ /*
+ The LOCK_wait_commit protects the fields subsequent_commits_list and
+ wakeup_subsequent_commits_running (for a waitee), and the flag
+ waiting_for_commit and associated COND_wait_commit (for a waiter).
+ */
+ mysql_mutex_t LOCK_wait_commit;
+ mysql_cond_t COND_wait_commit;
+ /* List of waiters that called register_wait_for_prior_commit() on us. */
+ wait_for_commit *subsequent_commits_list;
+ /* Link field for entries in subsequent_commits_list. */
+ wait_for_commit *next_subsequent_commit;
+ /* Our waitee, if we did register_wait_for_prior_commit(), else NULL. */
+ wait_for_commit *waitee;
+ /*
+ The waiting_for_commit flag is cleared when a waiter has been woken
+ up. The COND_wait_commit condition is signalled when this has been
+ cleared.
+ */
+ bool waiting_for_commit;
+ /*
+ Flag set while wakeup_subsequent_commits2() is running; see comments
+ on that function for details.
+ */
+ bool wakeup_subsequent_commits_running;
+
+ void register_wait_for_prior_commit(wait_for_commit *waitee);
+ void wait_for_prior_commit()
+ {
+ /*
+ Quick inline check, to avoid function call and locking in the common case
+ where no wait is registered, or a registered wait was already signalled.
+ */
+ if (waiting_for_commit)
+ wait_for_prior_commit2();
+ }
+ void wakeup_subsequent_commits()
+ {
+ /*
+ Do the check inline, so that only commits that actually have waiters pay
+ the cost of a function call.
+
+ Note that the check is done without locking. It is the responsibility of
+ the user of the wakeup facility to ensure that no waiters can register
+ themselves after the last call to wakeup_subsequent_commits().
+
+ This avoids having to take another lock for every commit, which would be
+ pointless anyway - even if we check under lock, there is nothing to
+ prevent a waiter from arriving just after releasing the lock.
+ */
+ if (subsequent_commits_list)
+ wakeup_subsequent_commits2();
+ }
+ void unregister_wait_for_prior_commit()
+ {
+ if (waiting_for_commit)
+ unregister_wait_for_prior_commit2();
+ }
+
+ void wait_for_prior_commit2();
+ void wakeup_subsequent_commits2();
+ void unregister_wait_for_prior_commit2();
+ wait_for_commit();
+ ~wait_for_commit() { mysql_mutex_destroy(&LOCK_wait_commit); mysql_cond_destroy(&COND_wait_commit); }
+};
+
+
extern "C" void my_message_sql(uint error, const char *str, myf MyFlags);
/**
@@ -3095,6 +3193,19 @@ class THD :public Statement,
void wait_for_wakeup_ready();
/* Wake this thread up from wait_for_wakeup_ready(). */
void signal_wakeup_ready();
+ /* Commit-order state to wait on/wake; the helpers below no-op when NULL. */
+ wait_for_commit *wait_for_commit_ptr;
+ void wait_for_prior_commit() /* Wait for any registered prior commit. */
+ {
+ if (wait_for_commit_ptr)
+ wait_for_commit_ptr->wait_for_prior_commit();
+ }
+ void wakeup_subsequent_commits() /* Wake waiters registered on us, if any. */
+ {
+ if (wait_for_commit_ptr)
+ wait_for_commit_ptr->wakeup_subsequent_commits();
+ }
+
private:
/** The current internal error handler for this thread, or NULL. */
Follow ups
References