bzr commit into MariaDB 5.1, with Maria 1.5:maria branch (knielsen:2849)
#At lp:maria
2849 knielsen@xxxxxxxxxxxxxxx 2010-05-26
Preliminary commit of group commit patch.
added:
mysql-test/suite/binlog/r/binlog_ioerr.result
mysql-test/suite/binlog/t/binlog_ioerr.test
modified:
sql/handler.cc
sql/handler.h
sql/log.cc
sql/log.h
sql/log_event.h
sql/sql_class.cc
sql/sql_class.h
sql/sql_load.cc
sql/table.cc
sql/table.h
storage/xtradb/handler/ha_innodb.cc
=== added file 'mysql-test/suite/binlog/r/binlog_ioerr.result'
--- a/mysql-test/suite/binlog/r/binlog_ioerr.result 1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/binlog/r/binlog_ioerr.result 2010-05-26 08:16:18 +0000
@@ -0,0 +1,28 @@
+CALL mtr.add_suppression("Error writing file 'master-bin'");
+RESET MASTER;
+CREATE TABLE t1 (a INT PRIMARY KEY) ENGINE=innodb;
+INSERT INTO t1 VALUES(0);
+SET SESSION debug='+d,fail_binlog_write_1';
+INSERT INTO t1 VALUES(1);
+ERROR HY000: Error writing file 'master-bin' (errno: 22)
+INSERT INTO t1 VALUES(2);
+ERROR HY000: Error writing file 'master-bin' (errno: 22)
+SET SESSION debug='';
+INSERT INTO t1 VALUES(3);
+SELECT * FROM t1;
+a
+0
+3
+SHOW BINLOG EVENTS;
+Log_name Pos Event_type Server_id End_log_pos Info
+BINLOG POS Format_desc 1 ENDPOS Server ver: #, Binlog ver: #
+BINLOG POS Query 1 ENDPOS use `test`; CREATE TABLE t1 (a INT PRIMARY KEY) ENGINE=innodb
+BINLOG POS Query 1 ENDPOS BEGIN
+BINLOG POS Query 1 ENDPOS use `test`; INSERT INTO t1 VALUES(0)
+BINLOG POS Xid 1 ENDPOS COMMIT /* XID */
+BINLOG POS Query 1 ENDPOS BEGIN
+BINLOG POS Query 1 ENDPOS BEGIN
+BINLOG POS Query 1 ENDPOS BEGIN
+BINLOG POS Query 1 ENDPOS use `test`; INSERT INTO t1 VALUES(3)
+BINLOG POS Xid 1 ENDPOS COMMIT /* XID */
+DROP TABLE t1;
=== added file 'mysql-test/suite/binlog/t/binlog_ioerr.test'
--- a/mysql-test/suite/binlog/t/binlog_ioerr.test 1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/binlog/t/binlog_ioerr.test 2010-05-26 08:16:18 +0000
@@ -0,0 +1,29 @@
+source include/have_debug.inc;
+source include/have_innodb.inc;
+source include/have_log_bin.inc;
+source include/have_binlog_format_mixed_or_statement.inc;
+
+CALL mtr.add_suppression("Error writing file 'master-bin'");
+
+RESET MASTER;
+
+CREATE TABLE t1 (a INT PRIMARY KEY) ENGINE=innodb;
+INSERT INTO t1 VALUES(0);
+SET SESSION debug='+d,fail_binlog_write_1';
+--error ER_ERROR_ON_WRITE
+INSERT INTO t1 VALUES(1);
+--error ER_ERROR_ON_WRITE
+INSERT INTO t1 VALUES(2);
+SET SESSION debug='';
+INSERT INTO t1 VALUES(3);
+SELECT * FROM t1;
+
+# Actually the output from this currently shows a bug.
+# The injected IO error leaves partially written transactions in the binlog in
+# the form of stray "BEGIN" events.
+# These should disappear from the output if binlog error handling is improved.
+--replace_regex /\/\* xid=.* \*\//\/* XID *\// /Server ver: .*, Binlog ver: .*/Server ver: #, Binlog ver: #/ /table_id: [0-9]+/table_id: #/
+--replace_column 1 BINLOG 2 POS 5 ENDPOS
+SHOW BINLOG EVENTS;
+
+DROP TABLE t1;
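
The test drives the failure path through the debug keyword fail_binlog_write_1; the matching server-side hook is the DBUG_EXECUTE_IF() line added to MYSQL_BIN_LOG::write_cache() further down in the log.cc hunk. The following is only a rough sketch of how the two sides connect, assuming the server's my_dbug.h is available; write_block_sketch() is a simplified stand-in, not the real write_cache().

#include "my_dbug.h"   /* MySQL server header providing DBUG_EXECUTE_IF */

/* Simplified stand-in for the binlog write path (not the real write_cache()). */
static int write_block_sketch(void)
{
  /*
    This branch is active only while a session has run
      SET SESSION debug='+d,fail_binlog_write_1';
    which is exactly what the test above does around the failing INSERTs.
  */
  DBUG_EXECUTE_IF("fail_binlog_write_1", return 1 /* simulated write error */;);

  /* ... the normal write to the binary log file would go here ... */
  return 0;
}
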
=== modified file 'sql/handler.cc'
--- a/sql/handler.cc 2010-04-06 22:47:08 +0000
+++ b/sql/handler.cc 2010-05-26 08:16:18 +0000
@@ -76,6 +76,7 @@ TYPELIB tx_isolation_typelib= {array_ele
static TYPELIB known_extensions= {0,"known_exts", NULL, NULL};
uint known_extensions_id= 0;
+static int commit_one_phase_2(THD *thd, bool all, bool do_commit_ordered);
static plugin_ref ha_default_plugin(THD *thd)
@@ -544,6 +545,26 @@ err:
DBUG_RETURN(1);
}
+/*
+ This is a queue of THDs waiting to be group committed with
+ tc_log->group_log_xid().
+*/
+static THD *group_commit_queue;
+/*
+ This mutex protects the group_commit_queue on platforms without native
+ atomic operations.
+ */
+static pthread_mutex_t LOCK_group_commit_queue;
+/* This mutex is used to serialize calls to handler prepare_ordered methods. */
+static pthread_mutex_t LOCK_prepare_ordered;
+/* This mutex is used to serialize calls to handler commit_ordered methods. */
+static pthread_mutex_t LOCK_commit_ordered;
+/* This mutex is used to serialize calls to group_log_xid(). */
+static pthread_mutex_t LOCK_group_commit;
+static pthread_cond_t COND_group_commit;
+
+static bool mutexes_inited= FALSE;
+
int ha_init()
{
int error= 0;
@@ -557,6 +578,19 @@ int ha_init()
*/
opt_using_transactions= total_ha>(ulong)opt_bin_log;
savepoint_alloc_size+= sizeof(SAVEPOINT);
+
+ group_commit_queue= NULL;
+ my_pthread_mutex_init(&LOCK_group_commit_queue, MY_MUTEX_INIT_FAST,
+ "LOCK_group_commit_queue", MYF(0));
+ my_pthread_mutex_init(&LOCK_prepare_ordered, MY_MUTEX_INIT_SLOW,
+ "LOCK_prepare_ordered", MYF(0));
+ my_pthread_mutex_init(&LOCK_commit_ordered, MY_MUTEX_INIT_SLOW,
+ "LOCK_commit_ordered", MYF(0));
+ my_pthread_mutex_init(&LOCK_group_commit, MY_MUTEX_INIT_SLOW,
+ "LOCK_group_commit", MYF(0));
+ pthread_cond_init(&COND_group_commit, 0);
+ mutexes_inited= TRUE;
+
DBUG_RETURN(error);
}
@@ -574,6 +608,15 @@ int ha_end()
if (ha_finish_errors())
error= 1;
+ if (mutexes_inited)
+ {
+ pthread_mutex_destroy(&LOCK_group_commit_queue);
+ pthread_mutex_destroy(&LOCK_prepare_ordered);
+ pthread_mutex_destroy(&LOCK_commit_ordered);
+ pthread_mutex_destroy(&LOCK_group_commit);
+ mutexes_inited= FALSE;
+ }
+
DBUG_RETURN(error);
}
@@ -1053,6 +1096,108 @@ ha_check_and_coalesce_trx_read_only(THD
return rw_ha_count;
}
+/*
+ Atomically enqueue a THD at the head of the queue of threads waiting to
+ group commit, and return the previous head of the queue.
+*/
+static THD *
+enqueue_atomic(THD *thd)
+{
+ my_atomic_rwlock_wrlock(&LOCK_group_commit_queue);
+ thd->next_commit_ordered= group_commit_queue;
+ while (!my_atomic_casptr((void **)(&group_commit_queue),
+ (void **)(&thd->next_commit_ordered),
+ thd))
+ ;
+ my_atomic_rwlock_wrunlock(&LOCK_group_commit_queue);
+ return thd->next_commit_ordered;
+}
+
+static THD *
+atomic_grab_reverse_queue()
+{
+ my_atomic_rwlock_wrlock(&LOCK_group_commit_queue);
+ THD *queue= group_commit_queue;
+ while (!my_atomic_casptr((void **)(&group_commit_queue),
+ (void **)(&queue),
+ NULL))
+ ;
+ my_atomic_rwlock_wrunlock(&LOCK_group_commit_queue);
+
+ /*
+ Since we enqueue at the head, the queue is actually in reverse order.
+ So reverse it back into correct commit order before returning.
+ */
+ THD *prev= NULL;
+ while (queue)
+ {
+ THD *next= queue->next_commit_ordered;
+ queue->next_commit_ordered= prev;
+ prev= queue;
+ queue= next;
+ }
+
+ return prev;
+}
+
+static void
+call_commit_ordered(Ha_trx_info *ha_info, THD *thd, bool all)
+{
+ for (; ha_info; ha_info= ha_info->next())
+ {
+ handlerton *ht= ha_info->ht();
+ if (!ht->commit_ordered)
+ continue;
+ ht->commit_ordered(ht, thd, all);
+ }
+}
+
+static void
+group_commit_wait_for_wakeup(THD *thd)
+{
+ pthread_mutex_lock(&thd->LOCK_commit_ordered);
+ while (!thd->group_commit_ready)
+ pthread_cond_wait(&thd->COND_commit_ordered,
+ &thd->LOCK_commit_ordered);
+ pthread_mutex_unlock(&thd->LOCK_commit_ordered);
+}
+
+static void
+group_commit_wakeup_other(THD *other_thd)
+{
+ pthread_mutex_lock(&other_thd->LOCK_commit_ordered);
+ other_thd->group_commit_ready= TRUE;
+ pthread_cond_signal(&other_thd->COND_commit_ordered);
+ pthread_mutex_unlock(&other_thd->LOCK_commit_ordered);
+}
+
+static bool group_commit_queue_busy= 0;
+
+static void
+group_commit_mark_queue_idle()
+{
+ pthread_mutex_lock(&LOCK_group_commit);
+ group_commit_queue_busy= FALSE;
+ pthread_cond_signal(&COND_group_commit);
+ pthread_mutex_unlock(&LOCK_group_commit);
+}
+
+static void
+group_commit_mark_queue_busy()
+{
+ safe_mutex_assert_owner(&LOCK_group_commit);
+ group_commit_queue_busy= TRUE;
+}
+
+static void
+group_commit_wait_queue_idle()
+{
+ /* Wait for any existing queue run to finish. */
+ safe_mutex_assert_owner(&LOCK_group_commit);
+ while (group_commit_queue_busy)
+ pthread_cond_wait(&COND_group_commit, &LOCK_group_commit);
+}
+
/**
@retval
@@ -1070,7 +1215,7 @@ ha_check_and_coalesce_trx_read_only(THD
*/
int ha_commit_trans(THD *thd, bool all)
{
- int error= 0, cookie= 0;
+ int error= 0;
/*
'all' means that this is either an explicit commit issued by
user, or an implicit commit issued by a DDL.
@@ -1085,7 +1230,10 @@ int ha_commit_trans(THD *thd, bool all)
*/
bool is_real_trans= all || thd->transaction.all.ha_list == 0;
Ha_trx_info *ha_info= trans->ha_list;
- my_xid xid= thd->transaction.xid_state.xid.get_my_xid();
+ bool need_prepare_ordered, need_commit_ordered;
+ bool need_enqueue;
+ bool is_group_commit_leader;
+ my_xid xid;
DBUG_ENTER("ha_commit_trans");
/*
@@ -1118,85 +1266,277 @@ int ha_commit_trans(THD *thd, bool all)
DBUG_RETURN(2);
}
#ifdef USING_TRANSACTIONS
- if (ha_info)
+ if (!ha_info)
{
- uint rw_ha_count;
- bool rw_trans;
+ /* Free resources and perform other cleanup even for 'empty' transactions. */
+ if (is_real_trans)
+ thd->transaction.cleanup();
+ DBUG_RETURN(0);
+ }
- DBUG_EXECUTE_IF("crash_commit_before", abort(););
+ DBUG_EXECUTE_IF("crash_commit_before", abort(););
- /* Close all cursors that can not survive COMMIT */
- if (is_real_trans) /* not a statement commit */
- thd->stmt_map.close_transient_cursors();
+ /* Close all cursors that can not survive COMMIT */
+ if (is_real_trans) /* not a statement commit */
+ thd->stmt_map.close_transient_cursors();
- rw_ha_count= ha_check_and_coalesce_trx_read_only(thd, ha_info, all);
- /* rw_trans is TRUE when we in a transaction changing data */
- rw_trans= is_real_trans && (rw_ha_count > 0);
+ uint rw_ha_count= ha_check_and_coalesce_trx_read_only(thd, ha_info, all);
+ /* rw_trans is TRUE when we are in a transaction changing data */
+ bool rw_trans= is_real_trans && (rw_ha_count > 0);
- if (rw_trans &&
- wait_if_global_read_lock(thd, 0, 0))
- {
- ha_rollback_trans(thd, all);
- DBUG_RETURN(1);
- }
+ if (rw_trans &&
+ wait_if_global_read_lock(thd, 0, 0))
+ {
+ ha_rollback_trans(thd, all);
+ DBUG_RETURN(1);
+ }
+
+ if (rw_trans &&
+ opt_readonly &&
+ !(thd->security_ctx->master_access & SUPER_ACL) &&
+ !thd->slave_thread)
+ {
+ my_error(ER_OPTION_PREVENTS_STATEMENT, MYF(0), "--read-only");
+ goto err;
+ }
- if (rw_trans &&
- opt_readonly &&
- !(thd->security_ctx->master_access & SUPER_ACL) &&
- !thd->slave_thread)
+ if (trans->no_2pc || (rw_ha_count <= 1))
+ {
+ error= ha_commit_one_phase(thd, all);
+ DBUG_EXECUTE_IF("crash_commit_after", DBUG_ABORT(););
+ goto end;
+ }
+
+ need_prepare_ordered= FALSE;
+ need_commit_ordered= FALSE;
+ xid= thd->transaction.xid_state.xid.get_my_xid();
+
+ for (Ha_trx_info *hi= ha_info; hi; hi= hi->next())
+ {
+ int err;
+ handlerton *ht= hi->ht();
+ /*
+ Do not call two-phase commit if this particular
+ transaction is read-only. This allows for simpler
+ implementation in engines that are always read-only.
+ */
+ if (! hi->is_trx_read_write())
+ continue;
+ /*
+ Sic: we know that prepare() is not NULL since otherwise
+ trans->no_2pc would have been set.
+ */
+ if ((err= ht->prepare(ht, thd, all)))
+ my_error(ER_ERROR_DURING_COMMIT, MYF(0), err);
+ status_var_increment(thd->status_var.ha_prepare_count);
+
+ if (err)
+ goto err;
+
+ if (ht->prepare_ordered)
+ need_prepare_ordered= TRUE;
+ if (ht->commit_ordered)
+ need_commit_ordered= TRUE;
+ }
+ DBUG_EXECUTE_IF("crash_commit_after_prepare", DBUG_ABORT(););
+
+ if (!is_real_trans)
+ {
+ error= commit_one_phase_2(thd, all, FALSE);
+ DBUG_EXECUTE_IF("crash_commit_after", DBUG_ABORT(););
+ goto end;
+ }
+
+ /*
+ We can optimise away some of the thread synchronisation that may not be
+ needed.
+
+ If need_prepare_ordered, then we need to take LOCK_prepare_ordered.
+
+ If (xid && use_group_log_xid), then we need to enqueue (and this must
+ be done under LOCK_prepare_ordered if we take that lock).
+
+ Similarly, if (need_prepare_ordered && need_commit_ordered), then we
+ need to enqueue under the LOCK_prepare_ordered.
+
+ If (xid && use_group_log_xid), then we need to take LOCK_group_commit.
+
+ If need_commit_ordered, then we need to take LOCK_commit_ordered.
+
+ Cases not covered by above can be skipped to optimise things a bit.
+ */
+ need_enqueue= (xid && tc_log->use_group_log_xid) ||
+ (need_prepare_ordered && need_commit_ordered);
+
+ thd->group_commit_ready= FALSE;
+ thd->group_commit_all= all;
+ if (need_prepare_ordered)
+ {
+ pthread_mutex_lock(&LOCK_prepare_ordered);
+
+ for (Ha_trx_info *hi= ha_info; hi; hi= hi->next())
{
- my_error(ER_OPTION_PREVENTS_STATEMENT, MYF(0), "--read-only");
- ha_rollback_trans(thd, all);
- error= 1;
- goto end;
+ int err;
+ handlerton *ht= hi->ht();
+ if (! hi->is_trx_read_write())
+ continue;
+ if (ht->prepare_ordered && (err= ht->prepare_ordered(ht, thd, all)))
+ {
+ my_error(ER_ERROR_DURING_COMMIT, MYF(0), err);
+ pthread_mutex_unlock(&LOCK_prepare_ordered);
+ goto err;
+ }
}
+ }
+ if (need_enqueue)
+ {
+ THD *previous_queue= enqueue_atomic(thd);
+ is_group_commit_leader= (previous_queue == NULL);
+ }
+ if (need_prepare_ordered)
+ pthread_mutex_unlock(&LOCK_prepare_ordered);
- if (!trans->no_2pc && (rw_ha_count > 1))
+ int cookie;
+ if (tc_log->use_group_log_xid)
+ {
+ if (is_group_commit_leader)
{
- for (; ha_info && !error; ha_info= ha_info->next())
+ pthread_mutex_lock(&LOCK_group_commit);
+ group_commit_wait_queue_idle();
+
+ THD *queue= atomic_grab_reverse_queue();
+ /* The first in the queue is the leader. */
+ DBUG_ASSERT(queue == thd);
+
+ /*
+ This will set individual error codes in each thd->xid_error, as
+ well as set thd->xid_cookie for later unlog() call.
+ */
+ tc_log->group_log_xid(queue);
+
+ /*
+ Call commit_ordered methods for all transactions in the queue
+ (that did not get an error in group_log_xid()).
+
+ We do this under an additional global LOCK_commit_ordered; this is
+ so that transactions that do not need 2-phase commit do not have
+ to wait for the potentially long duration of LOCK_group_commit.
+ */
+ if (need_commit_ordered)
{
- int err;
- handlerton *ht= ha_info->ht();
- /*
- Do not call two-phase commit if this particular
- transaction is read-only. This allows for simpler
- implementation in engines that are always read-only.
- */
- if (! ha_info->is_trx_read_write())
- continue;
- /*
- Sic: we know that prepare() is not NULL since otherwise
- trans->no_2pc would have been set.
- */
- if ((err= ht->prepare(ht, thd, all)))
+ pthread_mutex_lock(&LOCK_commit_ordered);
+ for (THD *thd2= queue; thd2 != NULL; thd2= thd2->next_commit_ordered)
{
- my_error(ER_ERROR_DURING_COMMIT, MYF(0), err);
- error= 1;
+ if (!thd2->xid_error)
+ call_commit_ordered(ha_info, thd2, thd2->group_commit_all);
}
- status_var_increment(thd->status_var.ha_prepare_count);
+ pthread_mutex_unlock(&LOCK_commit_ordered);
}
- DBUG_EXECUTE_IF("crash_commit_after_prepare", DBUG_ABORT(););
- if (error || (is_real_trans && xid &&
- (error= !(cookie= tc_log->log_xid(thd, xid)))))
- {
- ha_rollback_trans(thd, all);
- error= 1;
- goto end;
- }
- DBUG_EXECUTE_IF("crash_commit_after_log", DBUG_ABORT(););
+ pthread_mutex_unlock(&LOCK_group_commit);
+
+ /* Wake up everyone except ourself. */
+ while ((queue= queue->next_commit_ordered) != NULL)
+ group_commit_wakeup_other(queue);
+ }
+ else
+ {
+ /* If not leader, just wait until leader wakes us up. */
+ group_commit_wait_for_wakeup(thd);
+ }
+
+ /*
+ Now that we're back in our own thread context, do any delayed error
+ reporting.
+ */
+ if (thd->xid_error)
+ {
+ tc_log->xid_delayed_error(thd);
+ goto err;
+ }
+ cookie= thd->xid_cookie;
+ /* The cookie must be non-zero in the non-error case. */
+ DBUG_ASSERT(cookie);
+ }
+ else
+ {
+ if (xid)
+ cookie= tc_log->log_xid(thd, xid);
+
+ if (!need_enqueue)
+ {
+ error= commit_one_phase_2(thd, all, TRUE);
+ DBUG_EXECUTE_IF("crash_commit_after", DBUG_ABORT(););
+ goto end;
+ }
+
+ /*
+ We only get here to do correctly sequenced prepare_ordered() and
+ commit_ordered() calls.
+
+ In this case, we need to wait for the previous transaction in the queue to
+ finish its commit_ordered() call before ours, to get the correct sequence.
+ */
+ DBUG_ASSERT(need_prepare_ordered && need_commit_ordered);
+
+ if (is_group_commit_leader)
+ {
+ pthread_mutex_lock(&LOCK_group_commit);
+ group_commit_wait_queue_idle();
+ THD *queue= atomic_grab_reverse_queue();
+ /*
+ Mark the queue busy while we bounce it from one thread to the
+ next.
+ */
+ group_commit_mark_queue_busy();
+ pthread_mutex_unlock(&LOCK_group_commit);
+
+ /* The first in the queue is the leader. */
+ DBUG_ASSERT(queue == thd);
}
- error=ha_commit_one_phase(thd, all) ? (cookie ? 2 : 1) : 0;
- DBUG_EXECUTE_IF("crash_commit_before_unlog", DBUG_ABORT(););
+ else
+ {
+ /* If not leader, just wait until previous thread wakes us up. */
+ group_commit_wait_for_wakeup(thd);
+ }
+
+ /* Only run commit_ordered() if log_xid was successful. */
if (cookie)
- tc_log->unlog(cookie, xid);
- DBUG_EXECUTE_IF("crash_commit_after", DBUG_ABORT(););
-end:
- if (rw_trans)
- start_waiting_global_read_lock(thd);
+ {
+ pthread_mutex_lock(&LOCK_commit_ordered);
+ call_commit_ordered(ha_info, thd, all);
+ pthread_mutex_unlock(&LOCK_commit_ordered);
+ }
+
+ THD *next= thd->next_commit_ordered;
+ if (next)
+ group_commit_wakeup_other(next);
+ else
+ group_commit_mark_queue_idle();
+
+ if (!cookie)
+ goto err;
}
- /* Free resources and perform other cleanup even for 'empty' transactions. */
- else if (is_real_trans)
- thd->transaction.cleanup();
+
+ DBUG_EXECUTE_IF("crash_commit_after_log", DBUG_ABORT(););
+
+ error= commit_one_phase_2(thd, all, FALSE) ? 2 : 0;
+
+ DBUG_EXECUTE_IF("crash_commit_before_unlog", DBUG_ABORT(););
+ DBUG_ASSERT(cookie);
+ tc_log->unlog(cookie, xid);
+
+ DBUG_EXECUTE_IF("crash_commit_after", DBUG_ABORT(););
+ goto end;
+
+ /* Come here if error and we need to rollback. */
+err:
+ if (!error)
+ error= 1;
+ ha_rollback_trans(thd, all);
+
+end:
+ if (rw_trans)
+ start_waiting_global_read_lock(thd);
#endif /* USING_TRANSACTIONS */
DBUG_RETURN(error);
}
@@ -1207,6 +1547,17 @@ end:
*/
int ha_commit_one_phase(THD *thd, bool all)
{
+ /*
+ When we get here, the handler commit_ordered() methods were not called as
+ part of 2-phase commit in ha_commit_trans(), so we pass TRUE to have
+ commit_one_phase_2() call them.
+ */
+ return commit_one_phase_2(thd, all, TRUE);
+}
+
+static int
+commit_one_phase_2(THD *thd, bool all, bool do_commit_ordered)
+{
int error=0;
THD_TRANS *trans=all ? &thd->transaction.all : &thd->transaction.stmt;
/*
@@ -1218,10 +1569,40 @@ int ha_commit_one_phase(THD *thd, bool a
*/
bool is_real_trans=all || thd->transaction.all.ha_list == 0;
Ha_trx_info *ha_info= trans->ha_list, *ha_info_next;
- DBUG_ENTER("ha_commit_one_phase");
+ DBUG_ENTER("commit_one_phase_2");
#ifdef USING_TRANSACTIONS
if (ha_info)
{
+ if (is_real_trans && do_commit_ordered)
+ {
+ /*
+ If we did not do it already, call any commit_ordered() method.
+
+ Even though we do not need to keep any ordering with other threads
+ (as there is no prepare or log_xid for this commit), we still need to
+ do this under the LOCK_commit_ordered mutex to not run in parallel
+ with other commit_ordered calls.
+ */
+
+ bool locked= FALSE;
+
+ for (Ha_trx_info *hi= ha_info; hi; hi= hi->next())
+ {
+ handlerton *ht= hi->ht();
+ if (ht->commit_ordered)
+ {
+ if (!locked)
+ {
+ pthread_mutex_lock(&LOCK_commit_ordered);
+ locked= 1;
+ }
+ ht->commit_ordered(ht, thd, all);
+ }
+ }
+ if (locked)
+ pthread_mutex_unlock(&LOCK_commit_ordered);
+ }
+
for (; ha_info; ha_info= ha_info_next)
{
int err;
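
The enqueue_atomic() / atomic_grab_reverse_queue() pair above implements a lock-free intrusive stack: each committing thread CAS-pushes its THD onto the head of the queue, and the group-commit leader later takes the whole list with a single atomic swap and reverses it into commit order. Below is a self-contained sketch of the same pattern using standard C++ atomics; Txn, enqueue() and grab_and_reverse() are illustrative names, not part of the patch.

// std::atomic stands in for my_atomic_casptr() and the my_atomic rwlock.
#include <atomic>

struct Txn { Txn *next_commit_ordered; /* ... */ };

static std::atomic<Txn *> commit_queue{nullptr};

// Push onto the head; returns the previous head (nullptr means the caller
// becomes the group commit leader).
Txn *enqueue(Txn *t)
{
  Txn *old_head= commit_queue.load(std::memory_order_relaxed);
  do
  {
    t->next_commit_ordered= old_head;
  } while (!commit_queue.compare_exchange_weak(old_head, t,
                                               std::memory_order_release,
                                               std::memory_order_relaxed));
  return old_head;
}

// Leader: atomically take the whole queue, then reverse it so the list is in
// the order transactions enqueued themselves (the commit order).
Txn *grab_and_reverse()
{
  Txn *queue= commit_queue.exchange(nullptr, std::memory_order_acquire);
  Txn *prev= nullptr;
  while (queue)
  {
    Txn *next= queue->next_commit_ordered;
    queue->next_commit_ordered= prev;
    prev= queue;
    queue= next;
  }
  return prev;
}
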
=== modified file 'sql/handler.h'
--- a/sql/handler.h 2010-01-14 16:51:00 +0000
+++ b/sql/handler.h 2010-05-26 08:16:18 +0000
@@ -656,9 +656,81 @@ struct handlerton
NOTE 'all' is also false in auto-commit mode where 'end of statement'
and 'real commit' mean the same event.
*/
- int (*commit)(handlerton *hton, THD *thd, bool all);
+ int (*commit)(handlerton *hton, THD *thd, bool all);
+ /*
+ The commit_ordered() method is called prior to the commit() method, after
+ the transaction manager has decided to commit (not rollback) the
+ transaction.
+
+ The calls to commit_ordered() in multiple parallel transactions are
+ guaranteed to happen in the same order in every participating
+ handler. This can be used to ensure the same commit order among multiple
+ handlers (eg. in table handler and binlog). So if transaction T1 calls
+ into commit_ordered() of handler A before T2, then T1 will also call
+ commit_ordered() of handler B before T2.
+
+ The intention is that commit_ordered() should do the minimal amount of
+ work that needs to happen in consistent commit order among handlers. To
+ preserve ordering, calls need to be serialised on a global mutex, so
+ doing any time-consuming or blocking operations in commit_ordered() will
+ limit scalability.
+
+ Handlers can rely on commit_ordered() calls being serialised (no two
+ calls can run in parallel, so no extra locking on the handler part is
+ required to ensure this).
+
+ Note that commit_ordered() can be called from a different thread than the
+ one handling the transaction! So it can not do anything that depends on
+ thread local storage, in particular it can not call my_error() and
+ friends (instead it can store the error code and delay the call to
+ my_error() to the commit() method).
+
+ Similarly, since commit_ordered() returns void, any return error code
+ must be saved and returned from the commit() method instead.
+
+ commit_ordered() is called only when actually committing a transaction
+ (autocommit or not), not when ending a statement in the middle of a
+ transaction.
+
+ The commit_ordered method is optional, and can be left unset if not
+ needed in a particular handler.
+ */
+ void (*commit_ordered)(handlerton *hton, THD *thd, bool all);
int (*rollback)(handlerton *hton, THD *thd, bool all);
int (*prepare)(handlerton *hton, THD *thd, bool all);
+ /*
+ The prepare_ordered method is optional. If set, it will be called after
+ successful prepare() in all handlers participating in 2-phase commit.
+
+ The calls to prepare_ordered() among multiple parallel transactions are
+ ordered consistently with calls to commit_ordered(). This means that
+ calls to prepare_ordered() effectively define the commit order, and that
+ each handler will see the same sequence of transactions calling into
+ prepare_ordered() and commit_ordered().
+
+ Thus, prepare_ordered() can be used to define commit order for handlers
+ that need to do this in the prepare step (like binlog). It can also be
+ used to release transaction locks early in an order consistent with the
+ order transactions will be eventually committed.
+
+ Like commit_ordered(), prepare_ordered() calls are serialised to maintain
+ ordering, so the intention is that they should execute fast, with only
+ the minimal amount of work needed to define commit order. Handlers can
+ rely on this serialisation, and do not need to do any extra locking to
+ avoid two prepare_ordered() calls running in parallel.
+
+ Unlike commit_ordered(), prepare_ordered() _is_ guaranteed to be called
+ in the context of the thread handling the rest of the transaction.
+
+ Note that for user-level XA SQL commands, no consistent ordering among
+ prepare_ordered() and commit_ordered() is guaranteed (as that would
+ require blocking all other commits for an indefinite time).
+
+ prepare_ordered() is called only when actually committing a transaction
+ (autocommit or not), not when ending a statement in the middle of a
+ transaction.
+ */
+ int (*prepare_ordered)(handlerton *hton, THD *thd, bool all);
int (*recover)(handlerton *hton, XID *xid_list, uint len);
int (*commit_by_xid)(handlerton *hton, XID *xid);
int (*rollback_by_xid)(handlerton *hton, XID *xid);
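
To make the contract documented for commit_ordered() above concrete: calls are serialised under one global mutex, so every participating handler observes the same commit order without any locking of its own. The standalone toy program below illustrates just that property; ToyHandler and call_commit_ordered_sketch() are made-up names for illustration, not server code.

#include <cassert>
#include <mutex>
#include <vector>

struct ToyHandler
{
  std::vector<int> commit_order;               // transaction ids, in call order
  void commit_ordered(int trx_id) { commit_order.push_back(trx_id); }
};

static std::mutex LOCK_commit_ordered_sketch;  // stands in for LOCK_commit_ordered

// What ha_commit_trans() does per transaction: call every participating
// handler's commit_ordered() while holding the global ordering mutex.
void call_commit_ordered_sketch(int trx_id, ToyHandler &engine, ToyHandler &binlog)
{
  std::lock_guard<std::mutex> guard(LOCK_commit_ordered_sketch);
  engine.commit_ordered(trx_id);
  binlog.commit_ordered(trx_id);
}

int main()
{
  ToyHandler engine, binlog;
  for (int trx_id= 1; trx_id <= 3; trx_id++)
    call_commit_ordered_sketch(trx_id, engine, binlog);
  // Both handlers saw the identical order; this is the property the patch
  // relies on to keep the engine and the binlog consistent.
  assert(engine.commit_order == binlog.commit_order);
  return 0;
}
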
=== modified file 'sql/log.cc'
--- a/sql/log.cc 2010-04-06 22:47:08 +0000
+++ b/sql/log.cc 2010-05-26 08:16:18 +0000
@@ -154,9 +154,12 @@ class binlog_trx_data {
public:
binlog_trx_data()
: at_least_one_stmt_committed(0), incident(FALSE), m_pending(0),
- before_stmt_pos(MY_OFF_T_UNDEF)
+ before_stmt_pos(MY_OFF_T_UNDEF), using_xa(0)
{
trans_log.end_of_file= max_binlog_cache_size;
+ (void) my_pthread_mutex_init(&LOCK_group_commit, MY_MUTEX_INIT_SLOW,
+ "LOCK_group_commit", MYF(0));
+ (void) pthread_cond_init(&COND_group_commit, 0);
}
~binlog_trx_data()
@@ -208,11 +211,12 @@ public:
completely.
*/
void reset() {
- if (!empty())
+ if (trans_log.type != WRITE_CACHE || !empty())
truncate(0);
before_stmt_pos= MY_OFF_T_UNDEF;
incident= FALSE;
trans_log.end_of_file= max_binlog_cache_size;
+ using_xa= FALSE;
DBUG_ASSERT(empty());
}
@@ -257,6 +261,41 @@ public:
Binlog position before the start of the current statement.
*/
my_off_t before_stmt_pos;
+
+ /* 0 or error when writing to binlog; set during group commit. */
+ int error;
+ /* If error != 0, value of errno (for my_error() reporting). */
+ int commit_errno;
+ /* Link for queueing transactions up for group commit to binlog. */
+ binlog_trx_data *next;
+ /*
+ Flag set true when group commit for this transaction is finished; used
+ with pthread_cond_wait() to wait until commit is done.
+ This flag is protected by LOCK_group_commit.
+ */
+ bool done;
+ /*
+ Flag set if this transaction is the group commit leader that will handle
+ the actual writing to the binlog.
+ This flag is protected by LOCK_group_commit.
+ */
+ bool group_commit_leader;
+ /*
+ Flag set true if this transaction is committed with log_xid() as part of
+ XA, false if not.
+ */
+ bool using_xa;
+ /*
+ Extra events (BEGIN, COMMIT/ROLLBACK/XID, and possibly INCIDENT) to be
+ written during group commit. The incident_event is only valid if
+ has_incident() is true.
+ */
+ Log_event *begin_event;
+ Log_event *end_event;
+ Log_event *incident_event;
+ /* Mutex and condition for wakeup after group commit. */
+ pthread_mutex_t LOCK_group_commit;
+ pthread_cond_t COND_group_commit;
};
handlerton *binlog_hton;
@@ -1391,117 +1430,188 @@ static int binlog_close_connection(handl
return 0;
}
+/* Helper functions for binlog_flush_trx_cache(). */
+static int
+binlog_flush_trx_cache_prepare(THD *thd)
+{
+ if (thd->binlog_flush_pending_rows_event(TRUE))
+ return 1;
+ return 0;
+}
+
+static void
+binlog_flush_trx_cache_finish(THD *thd, binlog_trx_data *trx_data)
+{
+ IO_CACHE *trans_log= &trx_data->trans_log;
+
+ trx_data->reset();
+
+ statistic_increment(binlog_cache_use, &LOCK_status);
+ if (trans_log->disk_writes != 0)
+ {
+ statistic_increment(binlog_cache_disk_use, &LOCK_status);
+ trans_log->disk_writes= 0;
+ }
+}
+
/*
- End a transaction.
+ End a transaction, writing events to the binary log.
SYNOPSIS
- binlog_end_trans()
+ binlog_flush_trx_cache()
thd The thread whose transaction should be ended
trx_data Pointer to the transaction data to use
- end_ev The end event to use, or NULL
- all True if the entire transaction should be ended, false if
- only the statement transaction should be ended.
+ end_ev The end event to use (COMMIT, ROLLBACK, or commit XID)
DESCRIPTION
End the currently open transaction. The transaction can be either
- a real transaction (if 'all' is true) or a statement transaction
- (if 'all' is false).
+ a real transaction or a statement transaction.
- If 'end_ev' is NULL, the transaction is a rollback of only
- transactional tables, so the transaction cache will be truncated
- to either just before the last opened statement transaction (if
- 'all' is false), or reset completely (if 'all' is true).
+ This can be to commit a transaction, with a COMMIT query event or an XA
+ commit XID event. But it can also be to roll back a transaction with a
+ ROLLBACK query event, used for rolling back transactions which also
+ contain updates to non-transactional tables.
*/
static int
-binlog_end_trans(THD *thd, binlog_trx_data *trx_data,
- Log_event *end_ev, bool all)
+binlog_flush_trx_cache(THD *thd, binlog_trx_data *trx_data,
+ Log_event *end_ev)
{
- DBUG_ENTER("binlog_end_trans");
- int error=0;
- IO_CACHE *trans_log= &trx_data->trans_log;
- DBUG_PRINT("enter", ("transaction: %s end_ev: 0x%lx",
- all ? "all" : "stmt", (long) end_ev));
+ DBUG_ENTER("binlog_flush_trx_cache");
DBUG_PRINT("info", ("thd->options={ %s%s}",
FLAGSTR(thd->options, OPTION_NOT_AUTOCOMMIT),
FLAGSTR(thd->options, OPTION_BEGIN)));
+ if (binlog_flush_trx_cache_prepare(thd))
+ DBUG_RETURN(1);
+
/*
- NULL denotes ROLLBACK with nothing to replicate: i.e., rollback of
- only transactional tables. If the transaction contain changes to
- any non-transactiona tables, we need write the transaction and log
- a ROLLBACK last.
- */
- if (end_ev != NULL)
- {
- if (thd->binlog_flush_pending_rows_event(TRUE))
- DBUG_RETURN(1);
- /*
- Doing a commit or a rollback including non-transactional tables,
- i.e., ending a transaction where we might write the transaction
- cache to the binary log.
-
- We can always end the statement when ending a transaction since
- transactions are not allowed inside stored functions. If they
- were, we would have to ensure that we're not ending a statement
- inside a stored function.
- */
- error= mysql_bin_log.write(thd, &trx_data->trans_log, end_ev,
- trx_data->has_incident());
- trx_data->reset();
+ Doing a commit or a rollback including non-transactional tables,
+ i.e., ending a transaction where we might write the transaction
+ cache to the binary log.
+
+ We can always end the statement when ending a transaction since
+ transactions are not allowed inside stored functions. If they
+ were, we would have to ensure that we're not ending a statement
+ inside a stored function.
+ */
+ int error= mysql_bin_log.write_transaction_to_binlog(thd, trx_data, end_ev);
- /*
- We need to step the table map version after writing the
- transaction cache to disk.
- */
- mysql_bin_log.update_table_map_version();
- statistic_increment(binlog_cache_use, &LOCK_status);
- if (trans_log->disk_writes != 0)
- {
- statistic_increment(binlog_cache_disk_use, &LOCK_status);
- trans_log->disk_writes= 0;
- }
- }
- else
- {
- /*
- If rolling back an entire transaction or a single statement not
- inside a transaction, we reset the transaction cache.
+ binlog_flush_trx_cache_finish(thd, trx_data);
- If rolling back a statement in a transaction, we truncate the
- transaction cache to remove the statement.
- */
- thd->binlog_remove_pending_rows_event(TRUE);
- if (all || !(thd->options & (OPTION_BEGIN | OPTION_NOT_AUTOCOMMIT)))
- {
- if (trx_data->has_incident())
- error= mysql_bin_log.write_incident(thd, TRUE);
- trx_data->reset();
- }
- else // ...statement
- trx_data->truncate(trx_data->before_stmt_pos);
+ DBUG_ASSERT(thd->binlog_get_pending_rows_event() == NULL);
+ DBUG_RETURN(error);
+}
- /*
- We need to step the table map version on a rollback to ensure
- that a new table map event is generated instead of the one that
- was written to the thrown-away transaction cache.
- */
- mysql_bin_log.update_table_map_version();
+/*
+ Discard a transaction, i.e. ROLLBACK with only transactional table updates.
+
+ SYNOPSIS
+ binlog_truncate_trx_cache()
+
+ thd The thread whose transaction should be ended
+ trx_data Pointer to the transaction data to use
+ all True if the entire transaction should be ended, false if
+ only the statement transaction should be ended.
+
+ DESCRIPTION
+
+ Rollback (and end) a transaction that only modifies transactional
+ tables. The transaction can be either a real transaction (if 'all' is
+ true) or a statement transaction (if 'all' is false).
+
+ The transaction cache will be truncated to either just before the last
+ opened statement transaction (if 'all' is false), or reset completely (if
+ 'all' is true).
+ */
+static int
+binlog_truncate_trx_cache(THD *thd, binlog_trx_data *trx_data, bool all)
+{
+ DBUG_ENTER("binlog_truncate_trx_cache");
+ int error= 0;
+ DBUG_PRINT("enter", ("transaction: %s", all ? "all" : "stmt"));
+ DBUG_PRINT("info", ("thd->options={ %s%s}",
+ FLAGSTR(thd->options, OPTION_NOT_AUTOCOMMIT),
+ FLAGSTR(thd->options, OPTION_BEGIN)));
+
+ /*
+ ROLLBACK with nothing to replicate: i.e., rollback of only transactional
+ tables.
+ */
+
+ /*
+ If rolling back an entire transaction or a single statement not
+ inside a transaction, we reset the transaction cache.
+
+ If rolling back a statement in a transaction, we truncate the
+ transaction cache to remove the statement.
+ */
+ thd->binlog_remove_pending_rows_event(TRUE);
+ if (all || !(thd->options & (OPTION_BEGIN | OPTION_NOT_AUTOCOMMIT)))
+ {
+ if (trx_data->has_incident())
+ error= mysql_bin_log.write_incident(thd);
+ trx_data->reset();
}
+ else // ...statement
+ trx_data->truncate(trx_data->before_stmt_pos);
DBUG_ASSERT(thd->binlog_get_pending_rows_event() == NULL);
DBUG_RETURN(error);
}
+static LEX_STRING const write_error_msg=
+ { C_STRING_WITH_LEN("error writing to the binary log") };
+
static int binlog_prepare(handlerton *hton, THD *thd, bool all)
{
/*
- do nothing.
- just pretend we can do 2pc, so that MySQL won't
- switch to 1pc.
- real work will be done in MYSQL_BIN_LOG::log_xid()
+ If this prepare is for a single statement in the middle of a transaction,
+ not the actual transaction commit, then we do nothing. The real work is
+ only done later, in the prepare for making persistent changes.
*/
+ if (!all && (thd->options & (OPTION_BEGIN | OPTION_NOT_AUTOCOMMIT)))
+ return 0;
+
+ binlog_trx_data *trx_data=
+ (binlog_trx_data*) thd_get_ha_data(thd, binlog_hton);
+
+ trx_data->using_xa= TRUE;
+
+ if (binlog_flush_trx_cache_prepare(thd))
+ return 1;
+
+ my_xid xid= thd->transaction.xid_state.xid.get_my_xid();
+ if (!xid)
+ {
+ /* Skip logging this transaction, marked by setting end_event to NULL. */
+ trx_data->end_event= NULL;
+ return 0;
+ }
+
+ /*
+ Allocate the extra events that will be logged to the binlog in binlog group
+ commit. Use placement new to allocate them on the THD memroot, as they need
+ to remain live until log_xid() returns.
+ */
+ size_t needed_size= sizeof(Query_log_event) + sizeof(Xid_log_event);
+ if (trx_data->has_incident())
+ needed_size+= sizeof(Incident_log_event);
+ uchar *mem= (uchar *)thd->alloc(needed_size);
+ if (!mem)
+ return 1;
+
+ trx_data->begin_event= new ((void *)mem)
+ Query_log_event(thd, STRING_WITH_LEN("BEGIN"), TRUE, TRUE, 0);
+ mem+= sizeof(Query_log_event);
+
+ trx_data->end_event= new ((void *)mem) Xid_log_event(thd, xid);
+
+ if (trx_data->has_incident())
+ trx_data->incident_event= new ((void *)(mem + sizeof(Xid_log_event)))
+ Incident_log_event(thd, INCIDENT_LOST_EVENTS, write_error_msg);
+
return 0;
}
@@ -1525,11 +1635,11 @@ static int binlog_commit(handlerton *hto
binlog_trx_data *const trx_data=
(binlog_trx_data*) thd_get_ha_data(thd, binlog_hton);
- if (trx_data->empty())
+ if (trx_data->using_xa)
{
// we're here because trans_log was flushed in MYSQL_BIN_LOG::log_xid()
- trx_data->reset();
- DBUG_RETURN(0);
+ binlog_flush_trx_cache_finish(thd, trx_data);
+ DBUG_RETURN(error);
}
/*
@@ -1556,8 +1666,8 @@ static int binlog_commit(handlerton *hto
!stmt_has_updated_trans_table(thd) &&
thd->transaction.stmt.modified_non_trans_table))
{
- Query_log_event qev(thd, STRING_WITH_LEN("COMMIT"), TRUE, TRUE, 0);
- error= binlog_end_trans(thd, trx_data, &qev, all);
+ Query_log_event end_ev(thd, STRING_WITH_LEN("COMMIT"), TRUE, TRUE, 0);
+ error= binlog_flush_trx_cache(thd, trx_data, &end_ev);
}
trx_data->at_least_one_stmt_committed = my_b_tell(&trx_data->trans_log) > 0;
@@ -1621,7 +1731,7 @@ static int binlog_rollback(handlerton *h
(thd->options & OPTION_KEEP_LOG)) &&
mysql_bin_log.check_write_error(thd))
trx_data->set_incident();
- error= binlog_end_trans(thd, trx_data, 0, all);
+ error= binlog_truncate_trx_cache(thd, trx_data, all);
}
else
{
@@ -1641,8 +1751,8 @@ static int binlog_rollback(handlerton *h
thd->current_stmt_binlog_row_based) ||
((thd->options & OPTION_KEEP_LOG)))
{
- Query_log_event qev(thd, STRING_WITH_LEN("ROLLBACK"), TRUE, TRUE, 0);
- error= binlog_end_trans(thd, trx_data, &qev, all);
+ Query_log_event end_ev(thd, STRING_WITH_LEN("ROLLBACK"), TRUE, TRUE, 0);
+ error= binlog_flush_trx_cache(thd, trx_data, &end_ev);
}
/*
Otherwise, we simply truncate the cache as there is no change on
@@ -1650,7 +1760,7 @@ static int binlog_rollback(handlerton *h
*/
else if ((all && !thd->transaction.all.modified_non_trans_table) ||
(!all && !thd->transaction.stmt.modified_non_trans_table))
- error= binlog_end_trans(thd, trx_data, 0, all);
+ error= binlog_truncate_trx_cache(thd, trx_data, all);
}
if (!all)
trx_data->before_stmt_pos = MY_OFF_T_UNDEF; // part of the stmt rollback
@@ -2464,7 +2574,7 @@ const char *MYSQL_LOG::generate_name(con
MYSQL_BIN_LOG::MYSQL_BIN_LOG()
:bytes_written(0), prepared_xids(0), file_id(1), open_count(1),
- need_start_event(TRUE), m_table_map_version(0),
+ need_start_event(TRUE),
is_relay_log(0),
description_event_for_exec(0), description_event_for_queue(0)
{
@@ -2477,6 +2587,7 @@ MYSQL_BIN_LOG::MYSQL_BIN_LOG()
index_file_name[0] = 0;
bzero((char*) &index_file, sizeof(index_file));
bzero((char*) &purge_index_file, sizeof(purge_index_file));
+ use_group_log_xid= TRUE;
}
/* this is called only once */
@@ -2492,6 +2603,7 @@ void MYSQL_BIN_LOG::cleanup()
delete description_event_for_exec;
(void) pthread_mutex_destroy(&LOCK_log);
(void) pthread_mutex_destroy(&LOCK_index);
+ (void) pthread_mutex_destroy(&LOCK_queue);
(void) pthread_cond_destroy(&update_cond);
}
DBUG_VOID_RETURN;
@@ -2520,6 +2632,8 @@ void MYSQL_BIN_LOG::init_pthread_objects
*/
(void) my_pthread_mutex_init(&LOCK_index, MY_MUTEX_INIT_SLOW, "LOCK_index",
MYF_NO_DEADLOCK_DETECTION);
+ (void) my_pthread_mutex_init(&LOCK_queue, MY_MUTEX_INIT_FAST, "LOCK_queue",
+ MYF(0));
(void) pthread_cond_init(&update_cond, 0);
}
@@ -4113,7 +4227,6 @@ int THD::binlog_write_table_map(TABLE *t
DBUG_RETURN(error);
binlog_table_maps++;
- table->s->table_map_version= mysql_bin_log.table_map_version();
DBUG_RETURN(0);
}
@@ -4194,64 +4307,41 @@ MYSQL_BIN_LOG::flush_and_set_pending_row
if (Rows_log_event* pending= trx_data->pending())
{
- IO_CACHE *file= &log_file;
-
/*
Decide if we should write to the log file directly or to the
transaction log.
*/
if (pending->get_cache_stmt() || my_b_tell(&trx_data->trans_log))
- file= &trx_data->trans_log;
-
- /*
- If we are writing to the log file directly, we could avoid
- locking the log. This does not work since we need to step the
- m_table_map_version below, and that change has to be protected
- by the LOCK_log mutex.
- */
- pthread_mutex_lock(&LOCK_log);
-
- /*
- Write pending event to log file or transaction cache
- */
- if (pending->write(file))
{
- pthread_mutex_unlock(&LOCK_log);
- set_write_error(thd);
- DBUG_RETURN(1);
+ /* Write to transaction log/cache. */
+ if (pending->write(&trx_data->trans_log))
+ {
+ set_write_error(thd);
+ DBUG_RETURN(1);
+ }
}
-
- /*
- We step the table map version if we are writing an event
- representing the end of a statement. We do this regardless of
- wheather we write to the transaction cache or to directly to the
- file.
-
- In an ideal world, we could avoid stepping the table map version
- if we were writing to a transaction cache, since we could then
- reuse the table map that was written earlier in the transaction
- cache. This does not work since STMT_END_F implies closing all
- table mappings on the slave side.
-
- TODO: Find a solution so that table maps does not have to be
- written several times within a transaction.
- */
- if (pending->get_flags(Rows_log_event::STMT_END_F))
- ++m_table_map_version;
-
- delete pending;
-
- if (file == &log_file)
+ else
{
+ /* Write directly to log file. */
+ pthread_mutex_lock(&LOCK_log);
+ if (pending->write(&log_file))
+ {
+ pthread_mutex_unlock(&LOCK_log);
+ set_write_error(thd);
+ DBUG_RETURN(1);
+ }
+
error= flush_and_sync();
if (!error)
{
signal_update();
rotate_and_purge(RP_LOCK_LOG_IS_ALREADY_LOCKED);
}
+
+ pthread_mutex_unlock(&LOCK_log);
}
- pthread_mutex_unlock(&LOCK_log);
+ delete pending;
}
thd->binlog_set_pending_rows_event(event);
@@ -4450,9 +4540,6 @@ err:
set_write_error(thd);
}
- if (event_info->flags & LOG_EVENT_UPDATE_TABLE_MAP_VERSION_F)
- ++m_table_map_version;
-
pthread_mutex_unlock(&LOCK_log);
DBUG_RETURN(error);
}
@@ -4575,18 +4662,14 @@ uint MYSQL_BIN_LOG::next_file_id()
SYNOPSIS
write_cache()
cache Cache to write to the binary log
- lock_log True if the LOCK_log mutex should be aquired, false otherwise
- sync_log True if the log should be flushed and sync:ed
DESCRIPTION
Write the contents of the cache to the binary log. The cache will
be reset as a READ_CACHE to be able to read the contents from it.
*/
-int MYSQL_BIN_LOG::write_cache(IO_CACHE *cache, bool lock_log, bool sync_log)
+int MYSQL_BIN_LOG::write_cache(IO_CACHE *cache)
{
- Mutex_sentry sentry(lock_log ? &LOCK_log : NULL);
-
if (reinit_io_cache(cache, READ_CACHE, 0, 0, 0))
return ER_ERROR_ON_WRITE;
uint length= my_b_bytes_in_cache(cache), group, carry, hdr_offs;
@@ -4697,6 +4780,7 @@ int MYSQL_BIN_LOG::write_cache(IO_CACHE
}
/* Write data to the binary log file */
+ DBUG_EXECUTE_IF("fail_binlog_write_1", return ER_ERROR_ON_WRITE;);
if (my_b_write(&log_file, cache->read_pos, length))
return ER_ERROR_ON_WRITE;
cache->read_pos=cache->read_end; // Mark buffer used up
@@ -4704,9 +4788,6 @@ int MYSQL_BIN_LOG::write_cache(IO_CACHE
DBUG_ASSERT(carry == 0);
- if (sync_log)
- flush_and_sync();
-
return 0; // All OK
}
@@ -4739,26 +4820,22 @@ int query_error_code(THD *thd, bool not_
return error;
}
-bool MYSQL_BIN_LOG::write_incident(THD *thd, bool lock)
+bool MYSQL_BIN_LOG::write_incident(THD *thd)
{
uint error= 0;
DBUG_ENTER("MYSQL_BIN_LOG::write_incident");
- LEX_STRING const write_error_msg=
- { C_STRING_WITH_LEN("error writing to the binary log") };
Incident incident= INCIDENT_LOST_EVENTS;
Incident_log_event ev(thd, incident, write_error_msg);
- if (lock)
- pthread_mutex_lock(&LOCK_log);
+
+ pthread_mutex_lock(&LOCK_log);
error= ev.write(&log_file);
- if (lock)
+ if (!error && !(error= flush_and_sync()))
{
- if (!error && !(error= flush_and_sync()))
- {
- signal_update();
- rotate_and_purge(RP_LOCK_LOG_IS_ALREADY_LOCKED);
- }
- pthread_mutex_unlock(&LOCK_log);
+ signal_update();
+ rotate_and_purge(RP_LOCK_LOG_IS_ALREADY_LOCKED);
}
+ pthread_mutex_unlock(&LOCK_log);
+
DBUG_RETURN(error);
}
@@ -4786,103 +4863,364 @@ bool MYSQL_BIN_LOG::write_incident(THD *
'cache' needs to be reinitialized after this functions returns.
*/
-bool MYSQL_BIN_LOG::write(THD *thd, IO_CACHE *cache, Log_event *commit_event,
- bool incident)
+bool
+MYSQL_BIN_LOG::write_transaction_to_binlog(THD *thd, binlog_trx_data *trx_data,
+ Log_event *end_ev)
{
- DBUG_ENTER("MYSQL_BIN_LOG::write(THD *, IO_CACHE *, Log_event *)");
+ DBUG_ENTER("MYSQL_BIN_LOG::write_transaction_to_binlog");
+
+ /*
+ Create the necessary events here, where we have the correct THD (and
+ thread context).
+
+ Due to group commit the actual writing to binlog may happen in a different
+ thread.
+ */
+ Query_log_event qinfo(thd, STRING_WITH_LEN("BEGIN"), TRUE, TRUE, 0);
+ trx_data->begin_event= &qinfo;
+ trx_data->end_event= end_ev;
+ if (trx_data->has_incident())
+ {
+ Incident_log_event inc_ev(thd, INCIDENT_LOST_EVENTS, write_error_msg);
+ trx_data->incident_event= &inc_ev;
+ DBUG_RETURN(write_transaction_to_binlog_events(trx_data));
+ }
+ else
+ {
+ trx_data->incident_event= NULL;
+ DBUG_RETURN(write_transaction_to_binlog_events(trx_data));
+ }
+}
+
+bool
+MYSQL_BIN_LOG::write_transaction_to_binlog_events(binlog_trx_data *trx_data)
+{
+ /*
+ To facilitate group commit for the binlog, we first queue up ourselves in
+ the group commit queue. Then the first thread to enter the queue waits for
+ the LOCK_log mutex, and commits for everyone in the queue once it gets the
+ lock. Any other threads in the queue just wait for the first one to finish
+ the commit and wake them up.
+ */
+
+ pthread_mutex_lock(&trx_data->LOCK_group_commit);
+ const binlog_trx_data *orig_queue= atomic_enqueue_trx(trx_data);
+
+ if (orig_queue != NULL)
+ {
+ trx_data->group_commit_leader= FALSE;
+ trx_data->done= FALSE;
+ trx_group_commit_participant(trx_data);
+ }
+ else
+ {
+ trx_data->group_commit_leader= TRUE;
+ pthread_mutex_unlock(&trx_data->LOCK_group_commit);
+ trx_group_commit_leader(NULL);
+ }
+
+ return trx_group_commit_finish(trx_data);
+}
+
+/*
+ Participate as secondary transaction in group commit.
+
+ Another thread is already waiting to obtain the LOCK_log mutex, and should
+ include this thread in the group commit once it gets the lock. So here we put
+ ourselves in the queue and wait to be signalled that the group commit is done.
+
+ Note that this function must be called with the trx_data->LOCK_group_commit
+ locked; the mutex will be released before return.
+*/
+void
+MYSQL_BIN_LOG::trx_group_commit_participant(binlog_trx_data *trx_data)
+{
+ safe_mutex_assert_owner(&trx_data->LOCK_group_commit);
+
+ /* Wait until trx_data.done == true and woken up by the leader. */
+ while (!trx_data->done)
+ pthread_cond_wait(&trx_data->COND_group_commit,
+ &trx_data->LOCK_group_commit);
+ pthread_mutex_unlock(&trx_data->LOCK_group_commit);
+}
+
+bool
+MYSQL_BIN_LOG::trx_group_commit_finish(binlog_trx_data *trx_data)
+{
+ if (trx_data->error)
+ {
+ switch (trx_data->error)
+ {
+ case ER_ERROR_ON_WRITE:
+ my_error(ER_ERROR_ON_WRITE, MYF(ME_NOREFRESH), name, trx_data->commit_errno);
+ break;
+ case ER_ERROR_ON_READ:
+ my_error(ER_ERROR_ON_READ, MYF(ME_NOREFRESH),
+ trx_data->trans_log.file_name, trx_data->commit_errno);
+ break;
+ default:
+ /*
+ There are not (and should not be) any errors thrown that are not covered above.
+ But just in case one is added later without updating the above switch
+ statement, include a catch-all.
+ */
+ my_printf_error(trx_data->error,
+ "Error writing transaction to binary log: %d",
+ MYF(ME_NOREFRESH), trx_data->error);
+ }
+
+ /*
+ Since we return error, this transaction XID will not be committed, so
+ we need to mark it as not needed for recovery (unlog() is not called
+ for a transaction if log_xid() fails).
+ */
+ if (trx_data->end_event->get_type_code() == XID_EVENT)
+ mark_xid_done();
+
+ return 1;
+ }
+
+ return 0;
+}
+
+/*
+ Do binlog group commit as the lead thread.
+
+ This must be called when this thread/transaction is queued at the start of
+ the group_commit_queue. It will wait to obtain the LOCK_log mutex, then group
+ commit all the transactions in the queue (more may have entered while waiting
+ for LOCK_log). After commit is done, all other threads in the queue will be
+ signalled.
+
+ */
+void
+MYSQL_BIN_LOG::trx_group_commit_leader(THD *first_thd)
+{
+ uint xid_count= 0;
+ uint write_count= 0;
+
+ /* First, put anything from group_log_xid into the queue. */
+ binlog_trx_data *full_queue= NULL;
+ binlog_trx_data **next_ptr= &full_queue;
+ for (THD *thd= first_thd; thd; thd= thd->next_commit_ordered)
+ {
+ binlog_trx_data *const trx_data=
+ (binlog_trx_data*) thd_get_ha_data(thd, binlog_hton);
+
+ /* Skip log_xid for transactions without xid, marked by NULL end_event. */
+ if (!trx_data->end_event)
+ continue;
+
+ trx_data->error= 0;
+ *next_ptr= trx_data;
+ next_ptr= &(trx_data->next);
+ }
+
+ /*
+ Next, take the LOCK_log mutex, and once we get it, add any additional writes
+ that queued up while we were waiting.
+
+ Note that if some writer not going through log_xid() comes in and gets the
+ LOCK_log before us, they will not be able to include us in their group
+ commit (and they are not able to handle ensuring same commit order between
+ us and participating transactional storage engines anyway).
+
+ On the other hand, when we get the LOCK_log, we will be able to include
+ any non-transactional writes that queued up in our group commit. This
+ should hopefully not be too big of a problem, as group commit is most
+ important for the transactional case anyway when durability (fsync) is
+ enabled.
+ */
VOID(pthread_mutex_lock(&LOCK_log));
- /* NULL would represent nothing to replicate after ROLLBACK */
- DBUG_ASSERT(commit_event != NULL);
+ /*
+ As the queue is in reverse order of entering, reverse the queue as we add
+ it to the existing one. Note that there is no ordering defined between
+ transactional and non-transactional commits.
+ */
+ binlog_trx_data *current= atomic_grab_trx_queue();
+ binlog_trx_data *xtra_queue= NULL;
+ while (current)
+ {
+ current->error= 0;
+ binlog_trx_data *next= current->next;
+ current->next= xtra_queue;
+ xtra_queue= current;
+ current= next;
+ }
+ *next_ptr= xtra_queue;
+ /*
+ Now we have in full_queue the list of transactions to be committed in
+ order.
+ */
DBUG_ASSERT(is_open());
if (likely(is_open())) // Should always be true
{
/*
- We only bother to write to the binary log if there is anything
- to write.
- */
- if (my_b_tell(cache) > 0)
+ Commit every transaction in the queue.
+
+ Note that we are doing this in a different thread than the one running
+ the transaction! So we are limited in the operations we can do. In
+ particular, we cannot call my_error() on behalf of a transaction, as
+ that obtains the THD from thread local storage. Instead, we must set
+ current->error and let the thread do the error reporting itself once
+ we wake it up.
+ */
+ for (current= full_queue; current != NULL; current= current->next)
{
- /*
- Log "BEGIN" at the beginning of every transaction. Here, a
- transaction is either a BEGIN..COMMIT block or a single
- statement in autocommit mode.
- */
- Query_log_event qinfo(thd, STRING_WITH_LEN("BEGIN"), TRUE, TRUE, 0);
+ IO_CACHE *cache= ¤t->trans_log;
/*
- Now this Query_log_event has artificial log_pos 0. It must be
- adjusted to reflect the real position in the log. Not doing it
- would confuse the slave: it would prevent this one from
- knowing where he is in the master's binlog, which would result
- in wrong positions being shown to the user, MASTER_POS_WAIT
- undue waiting etc.
+ We only bother to write to the binary log if there is anything
+ to write.
*/
- if (qinfo.write(&log_file))
- goto err;
-
- DBUG_EXECUTE_IF("crash_before_writing_xid",
- {
- if ((write_error= write_cache(cache, false, true)))
- DBUG_PRINT("info", ("error writing binlog cache: %d",
- write_error));
- DBUG_PRINT("info", ("crashing before writing xid"));
- abort();
- });
-
- if ((write_error= write_cache(cache, false, false)))
- goto err;
+ if (my_b_tell(cache) > 0)
+ {
+ current->error= write_transaction(current);
+ if (current->error)
+ current->commit_errno= errno;
- if (commit_event && commit_event->write(&log_file))
- goto err;
+ write_count++;
+ }
- if (incident && write_incident(thd, FALSE))
- goto err;
+ if (current->end_event->get_type_code() == XID_EVENT)
+ xid_count++;
+ }
+ if (write_count > 0)
+ {
if (flush_and_sync())
- goto err;
- DBUG_EXECUTE_IF("half_binlogged_transaction", DBUG_ABORT(););
- if (cache->error) // Error on read
{
- sql_print_error(ER(ER_ERROR_ON_READ), cache->file_name, errno);
- write_error=1; // Don't give more errors
- goto err;
+ for (current= full_queue; current != NULL; current= current->next)
+ {
+ if (!current->error)
+ {
+ current->error= ER_ERROR_ON_WRITE;
+ current->commit_errno= errno;
+ }
+ }
+ }
+ else
+ {
+ signal_update();
}
- signal_update();
}
/*
- if commit_event is Xid_log_event, increase the number of
+ if any commit_events are Xid_log_event, increase the number of
prepared_xids (it's decreasd in ::unlog()). Binlog cannot be rotated
if there're prepared xids in it - see the comment in new_file() for
an explanation.
- If the commit_event is not Xid_log_event (then it's a Query_log_event)
- rotate binlog, if necessary.
+ If no Xid_log_events (then it's all Query_log_event) rotate binlog,
+ if necessary.
*/
- if (commit_event && commit_event->get_type_code() == XID_EVENT)
+ if (xid_count > 0)
{
- pthread_mutex_lock(&LOCK_prep_xids);
- prepared_xids++;
- pthread_mutex_unlock(&LOCK_prep_xids);
+ mark_xids_active(xid_count);
}
else
rotate_and_purge(RP_LOCK_LOG_IS_ALREADY_LOCKED);
}
+
VOID(pthread_mutex_unlock(&LOCK_log));
- DBUG_RETURN(0);
+ /*
+ Signal those that are not part of group_log_xid, and are not group leaders
+ running the queue.
-err:
- if (!write_error)
+ Since a group leader runs the queue itself if a group_log_xid does not get
+ to do it first, such leader threads do not need to wait or be woken up.
+ */
+ for (current= xtra_queue; current != NULL; current= current->next)
{
- write_error= 1;
- sql_print_error(ER(ER_ERROR_ON_WRITE), name, errno);
+ /*
+ Note that we need to take LOCK_group_commit even in the case of a leader!
+
+ Otherwise there is a race between setting and testing the
+ group_commit_leader flag.
+ */
+ pthread_mutex_lock(¤t->LOCK_group_commit);
+ if (!current->group_commit_leader)
+ {
+ current->done= true;
+ pthread_cond_signal(¤t->COND_group_commit);
+ }
+ pthread_mutex_unlock(¤t->LOCK_group_commit);
}
- VOID(pthread_mutex_unlock(&LOCK_log));
- DBUG_RETURN(1);
}
+int
+MYSQL_BIN_LOG::write_transaction(binlog_trx_data *trx_data)
+{
+ IO_CACHE *cache= &trx_data->trans_log;
+ /*
+ Log "BEGIN" at the beginning of every transaction. Here, a transaction is
+ either a BEGIN..COMMIT block or a single statement in autocommit mode. The
+ event was constructed in write_transaction_to_binlog(), in the thread
+ running the transaction.
+
+ Now this Query_log_event has artificial log_pos 0. It must be
+ adjusted to reflect the real position in the log. Not doing it
+ would confuse the slave: it would prevent this one from
+ knowing where he is in the master's binlog, which would result
+ in wrong positions being shown to the user, MASTER_POS_WAIT
+ undue waiting etc.
+ */
+ if (trx_data->begin_event->write(&log_file))
+ return ER_ERROR_ON_WRITE;
+
+ DBUG_EXECUTE_IF("crash_before_writing_xid",
+ {
+ if ((write_cache(cache)))
+ DBUG_PRINT("info", ("error writing binlog cache"));
+ else
+ flush_and_sync();
+
+ DBUG_PRINT("info", ("crashing before writing xid"));
+ abort();
+ });
+
+ if (write_cache(cache))
+ return ER_ERROR_ON_WRITE;
+
+ if (trx_data->end_event->write(&log_file))
+ return ER_ERROR_ON_WRITE;
+
+ if (trx_data->has_incident() && trx_data->incident_event->write(&log_file))
+ return ER_ERROR_ON_WRITE;
+
+ if (cache->error) // Error on read
+ return ER_ERROR_ON_READ;
+
+ return 0;
+}
+
+binlog_trx_data *
+MYSQL_BIN_LOG::atomic_enqueue_trx(binlog_trx_data *trx_data)
+{
+ my_atomic_rwlock_wrlock(&LOCK_queue);
+ trx_data->next= group_commit_queue;
+ while (!my_atomic_casptr((void **)(&group_commit_queue),
+ (void **)(&trx_data->next),
+ trx_data))
+ ;
+ my_atomic_rwlock_wrunlock(&LOCK_queue);
+ return trx_data->next;
+}
+
+binlog_trx_data *
+MYSQL_BIN_LOG::atomic_grab_trx_queue()
+{
+ my_atomic_rwlock_wrlock(&LOCK_queue);
+ binlog_trx_data *queue= group_commit_queue;
+ while (!my_atomic_casptr((void **)(&group_commit_queue),
+ (void **)(&queue),
+ NULL))
+ ;
+ my_atomic_rwlock_wrunlock(&LOCK_queue);
+ return queue;
+}
/**
Wait until we get a signal that the binary log has been updated.
@@ -5879,9 +6217,6 @@ void TC_LOG_BINLOG::close()
}
/**
- @todo
- group commit
-
@retval
0 error
@retval
@@ -5889,19 +6224,83 @@ void TC_LOG_BINLOG::close()
*/
int TC_LOG_BINLOG::log_xid(THD *thd, my_xid xid)
{
- DBUG_ENTER("TC_LOG_BINLOG::log");
- Xid_log_event xle(thd, xid);
- binlog_trx_data *trx_data=
- (binlog_trx_data*) thd_get_ha_data(thd, binlog_hton);
+ int error;
+ DBUG_ENTER("TC_LOG_BINLOG::log_xid");
+
+ thd->next_commit_ordered= 0;
+ group_log_xid(thd);
+ if (thd->xid_error)
+ error= xid_delayed_error(thd);
+ else
+ error= 0;
+
/*
- We always commit the entire transaction when writing an XID. Also
- note that the return value is inverted.
- */
- DBUG_RETURN(!binlog_end_trans(thd, trx_data, &xle, TRUE));
+ Note that the return value is inverted: zero on failure, private non-zero
+ 'cookie' on success.
+ */
+ DBUG_RETURN(!error);
}
-void TC_LOG_BINLOG::unlog(ulong cookie, my_xid xid)
+/*
+ Do a binlog log_xid() for a group of transactions, linked through
+ thd->next_commit_ordered.
+*/
+void
+TC_LOG_BINLOG::group_log_xid(THD *first_thd)
+{
+ DBUG_ENTER("TC_LOG_BINLOG::group_log_xid");
+ trx_group_commit_leader(first_thd);
+ for (THD *thd= first_thd; thd; thd= thd->next_commit_ordered)
+ {
+ binlog_trx_data *const trx_data=
+ (binlog_trx_data*) thd_get_ha_data(thd, binlog_hton);
+ thd->xid_error= trx_data->error;
+ thd->xid_cookie= !trx_data->error;
+ }
+ DBUG_VOID_RETURN;
+}
+
+int
+TC_LOG_BINLOG::xid_delayed_error(THD *thd)
{
+ binlog_trx_data *const trx_data=
+ (binlog_trx_data*) thd_get_ha_data(thd, binlog_hton);
+ return trx_group_commit_finish(trx_data);
+}
+
+/*
+ After an XID is logged, we need to hold on to the current binlog file until
+ it is fully committed in the storage engine. The reason is that crash
+ recovery only looks at the latest binlog, so we must make sure there are no
+ outstanding prepared (but not committed) transactions before rotating the
+ binlog.
+
+ To handle this, we keep a count of outstanding XIDs. This function is used
+ to increase this count when committing one or more transactions to the
+ binary log.
+*/
+void
+TC_LOG_BINLOG::mark_xids_active(uint xid_count)
+{
+ DBUG_ENTER("TC_LOG_BINLOG::mark_xids_active");
+ DBUG_PRINT("info", ("xid_count=%u", xid_count));
+ pthread_mutex_lock(&LOCK_prep_xids);
+ prepared_xids+= xid_count;
+ pthread_mutex_unlock(&LOCK_prep_xids);
+ DBUG_VOID_RETURN;
+}
+
+/*
+  Once an XID is committed, it is safe to rotate the binary log, as it will no
+  longer be needed during crash recovery.
+
+ This function is called to mark an XID this way. It needs to decrease the
+ count of pending XIDs, and signal the log rotator thread when it reaches zero.
+*/
+void
+TC_LOG_BINLOG::mark_xid_done()
+{
+ DBUG_ENTER("TC_LOG_BINLOG::mark_xid_done");
pthread_mutex_lock(&LOCK_prep_xids);
DBUG_ASSERT(prepared_xids > 0);
if (--prepared_xids == 0) {
@@ -5909,7 +6308,16 @@ void TC_LOG_BINLOG::unlog(ulong cookie,
pthread_cond_signal(&COND_prep_xids);
}
pthread_mutex_unlock(&LOCK_prep_xids);
- rotate_and_purge(0); // as ::write() did not rotate
+ DBUG_VOID_RETURN;
+}
+
+void TC_LOG_BINLOG::unlog(ulong cookie, my_xid xid)
+{
+ DBUG_ENTER("TC_LOG_BINLOG::unlog");
+ if (xid)
+ mark_xid_done();
+ rotate_and_purge(0); // as ::write_transaction_to_binlog() did not rotate
+ DBUG_VOID_RETURN;
}
int TC_LOG_BINLOG::recover(IO_CACHE *log, Format_description_log_event *fdle)
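
The counter protocol described in mark_xids_active()/mark_xid_done() above has a third participant that these hunks do not show: the thread that wants to rotate the binlog must first wait for prepared_xids to drop to zero (the waiter sits in MYSQL_BIN_LOG::new_file_impl(), blocking on COND_prep_xids). A minimal sketch of that count-plus-condition-variable pattern, with illustrative names only; the real server code differs in detail:

  #include <pthread.h>

  /* Shared state, mirroring LOCK_prep_xids / COND_prep_xids / prepared_xids. */
  static pthread_mutex_t LOCK_pending= PTHREAD_MUTEX_INITIALIZER;
  static pthread_cond_t  COND_pending= PTHREAD_COND_INITIALIZER;
  static unsigned int    pending_xids= 0;

  /* Writer side: a group of xid_count transactions was written to the binlog. */
  static void xids_active(unsigned int xid_count)
  {
    pthread_mutex_lock(&LOCK_pending);
    pending_xids+= xid_count;
    pthread_mutex_unlock(&LOCK_pending);
  }

  /* Writer side: one of those transactions is now durably committed in the engine. */
  static void xid_done(void)
  {
    pthread_mutex_lock(&LOCK_pending);
    if (--pending_xids == 0)
      pthread_cond_signal(&COND_pending);
    pthread_mutex_unlock(&LOCK_pending);
  }

  /* Rotation side: block until no prepared-but-uncommitted XIDs remain. */
  static void wait_for_no_pending_xids(void)
  {
    pthread_mutex_lock(&LOCK_pending);
    while (pending_xids > 0)
      pthread_cond_wait(&COND_pending, &LOCK_pending);
    pthread_mutex_unlock(&LOCK_pending);
  }
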
=== modified file 'sql/log.h'
--- a/sql/log.h 2009-12-04 14:40:42 +0000
+++ b/sql/log.h 2010-05-26 08:16:18 +0000
@@ -28,13 +28,49 @@ class TC_LOG
{
public:
int using_heuristic_recover();
- TC_LOG() {}
+ /* True if we should use group_log_xid(), false to use log_xid(). */
+ bool use_group_log_xid;
+
+ TC_LOG() : use_group_log_xid(0) {}
virtual ~TC_LOG() {}
virtual int open(const char *opt_name)=0;
virtual void close()=0;
virtual int log_xid(THD *thd, my_xid xid)=0;
virtual void unlog(ulong cookie, my_xid xid)=0;
+ /*
+ If use_group_log_xid is true, then this method is used instead of
+ log_xid() to do logging of a group of transactions all at once.
+
+ The transactions will be linked through THD::next_commit_ordered.
+
+ Additionally, when this method is used instead of log_xid(), the order in
+ which handler->prepare_ordered() and handler->commit_ordered() are called
+ is guaranteed to be the same as the order of calls and THD list elements
+ for group_log_xid().
+
+ This can be used to efficiently implement group commit that at the same
+  time preserves the order of commits among handlers and TC (e.g. to get the
+  same commit order in InnoDB and in the binary log).
+
+ For TCs that do not need this, it can be preferable to use plain log_xid()
+ instead, as it allows threads to run log_xid() in parallel with each
+ other. In contrast, group_log_xid() runs under a global mutex, so it is
+  guaranteed that only one call into it will be active at a time.
+
+ Since this call handles multiple threads/THDs at once, my_error() (and
+ other code that relies on thread local storage) cannot be used in this
+ method. Instead, in case of error, thd->xid_error should be set to the
+ error code, and xid_delayed_error() will be called later in the correct
+ thread context to actually report the error.
+
+ In the success case, this method must set thd->xid_cookie for each thread
+ to the cookie that is normally returned from log_xid() (which must be
+ non-zero in the non-error case).
+ */
+ virtual void group_log_xid(THD *first_thd) { DBUG_ASSERT(FALSE); }
+ /* Error reporting for group_log_xid(). */
+ virtual int xid_delayed_error(THD *thd) { DBUG_ASSERT(FALSE); return 0; }
};
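
To make the contract above concrete, here is a skeleton of what a TC that sets use_group_log_xid would implement. This is a hedged sketch only: write_whole_group() is a hypothetical helper, and MYSQL_BIN_LOG's real group_log_xid()/xid_delayed_error() are in sql/log.cc earlier in this patch.

  /* Sketch of a TC_LOG implementation that opts into grouped XID logging. */
  class Example_group_tc : public TC_LOG
  {
  public:
    Example_group_tc() { use_group_log_xid= true; }
    int open(const char *opt_name) { return 0; }
    void close() {}
    /* Not used when use_group_log_xid is set. */
    int log_xid(THD *thd, my_xid xid) { DBUG_ASSERT(FALSE); return 0; }
    void unlog(ulong cookie, my_xid xid) {}

    /* Called once per group, under the global commit mutex. */
    void group_log_xid(THD *first_thd)
    {
      int err= write_whole_group(first_thd);
      for (THD *thd= first_thd; thd; thd= thd->next_commit_ordered)
      {
        thd->xid_error= err;        /* no my_error() here: wrong thread context */
        thd->xid_cookie= !err;      /* non-zero cookie means success */
      }
    }

    /* Runs later in each transaction's own thread, so my_error() is allowed. */
    int xid_delayed_error(THD *thd)
    {
      my_error(ER_ERROR_ON_WRITE, MYF(0), "example-tc-log", my_errno);
      return 1;
    }
  private:
    /* Hypothetical: write all XIDs of the group in one batch, return 0 on success. */
    int write_whole_group(THD *first_thd) { return 0; }
  };
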
class TC_LOG_DUMMY: public TC_LOG // use it to disable the logging
@@ -227,12 +263,19 @@ private:
time_t last_time;
};
+class binlog_trx_data;
class MYSQL_BIN_LOG: public TC_LOG, private MYSQL_LOG
{
private:
/* LOCK_log and LOCK_index are inited by init_pthread_objects() */
pthread_mutex_t LOCK_index;
pthread_mutex_t LOCK_prep_xids;
+ /*
+ Mutex to protect the queue of transactions waiting to participate in group
+ commit. (Only used on platforms without native atomic operations).
+ */
+ pthread_mutex_t LOCK_queue;
+
pthread_cond_t COND_prep_xids;
pthread_cond_t update_cond;
ulonglong bytes_written;
@@ -271,8 +314,8 @@ class MYSQL_BIN_LOG: public TC_LOG, priv
In 5.0 it's 0 for relay logs too!
*/
bool no_auto_events;
-
- ulonglong m_table_map_version;
+ /* Queue of transactions queued up to participate in group commit. */
+ binlog_trx_data *group_commit_queue;
int write_to_file(IO_CACHE *cache);
/*
@@ -282,6 +325,14 @@ class MYSQL_BIN_LOG: public TC_LOG, priv
*/
void new_file_without_locking();
void new_file_impl(bool need_lock);
+ int write_transaction(binlog_trx_data *trx_data);
+ bool write_transaction_to_binlog_events(binlog_trx_data *trx_data);
+ void trx_group_commit_participant(binlog_trx_data *trx_data);
+ void trx_group_commit_leader(THD *first_thd);
+ binlog_trx_data *atomic_enqueue_trx(binlog_trx_data *trx_data);
+ binlog_trx_data *atomic_grab_trx_queue();
+ void mark_xid_done();
+ void mark_xids_active(uint xid_count);
public:
MYSQL_LOG::generate_name;
@@ -311,17 +362,11 @@ public:
int open(const char *opt_name);
void close();
int log_xid(THD *thd, my_xid xid);
+ int xid_delayed_error(THD *thd);
+ void group_log_xid(THD *first_thd);
void unlog(ulong cookie, my_xid xid);
int recover(IO_CACHE *log, Format_description_log_event *fdle);
#if !defined(MYSQL_CLIENT)
- bool is_table_mapped(TABLE *table) const
- {
- return table->s->table_map_version == table_map_version();
- }
-
- ulonglong table_map_version() const { return m_table_map_version; }
- void update_table_map_version() { ++m_table_map_version; }
-
int flush_and_set_pending_rows_event(THD *thd, Rows_log_event* event);
int remove_pending_rows_event(THD *thd);
@@ -362,10 +407,12 @@ public:
void new_file();
bool write(Log_event* event_info); // binary log write
- bool write(THD *thd, IO_CACHE *cache, Log_event *commit_event, bool incident);
- bool write_incident(THD *thd, bool lock);
+ bool write_transaction_to_binlog(THD *thd, binlog_trx_data *trx_data,
+ Log_event *end_ev);
+ bool trx_group_commit_finish(binlog_trx_data *trx_data);
+ bool write_incident(THD *thd);
- int write_cache(IO_CACHE *cache, bool lock_log, bool flush_and_sync);
+ int write_cache(IO_CACHE *cache);
void set_write_error(THD *thd);
bool check_write_error(THD *thd);
=== modified file 'sql/log_event.h'
--- a/sql/log_event.h 2010-03-04 08:03:07 +0000
+++ b/sql/log_event.h 2010-05-26 08:16:18 +0000
@@ -463,10 +463,9 @@ struct sql_ex_info
#define LOG_EVENT_SUPPRESS_USE_F 0x8
/*
- The table map version internal to the log should be increased after
- the event has been written to the binary log.
+ This used to be LOG_EVENT_UPDATE_TABLE_MAP_VERSION_F, but is now unused.
*/
-#define LOG_EVENT_UPDATE_TABLE_MAP_VERSION_F 0x10
+#define LOG_EVENT_UNUSED1_F 0x10
/**
@def LOG_EVENT_ARTIFICIAL_F
=== modified file 'sql/sql_class.cc'
--- a/sql/sql_class.cc 2010-01-15 15:27:55 +0000
+++ b/sql/sql_class.cc 2010-05-26 08:16:18 +0000
@@ -673,6 +673,8 @@ THD::THD()
active_vio = 0;
#endif
pthread_mutex_init(&LOCK_thd_data, MY_MUTEX_INIT_FAST);
+ pthread_mutex_init(&LOCK_commit_ordered, MY_MUTEX_INIT_FAST);
+ pthread_cond_init(&COND_commit_ordered, 0);
/* Variables with default values */
proc_info="login";
@@ -3773,7 +3775,6 @@ int THD::binlog_flush_pending_rows_event
if (stmt_end)
{
pending->set_flags(Rows_log_event::STMT_END_F);
- pending->flags|= LOG_EVENT_UPDATE_TABLE_MAP_VERSION_F;
binlog_table_maps= 0;
}
@@ -3901,7 +3902,6 @@ int THD::binlog_query(THD::enum_binlog_q
{
Query_log_event qinfo(this, query_arg, query_len, is_trans, suppress_use,
errcode);
- qinfo.flags|= LOG_EVENT_UPDATE_TABLE_MAP_VERSION_F;
/*
Binlog table maps will be irrelevant after a Query_log_event
(they are just removed on the slave side) so after the query
=== modified file 'sql/sql_class.h'
--- a/sql/sql_class.h 2010-03-30 12:36:49 +0000
+++ b/sql/sql_class.h 2010-05-26 08:16:18 +0000
@@ -1438,6 +1438,21 @@ public:
/* container for handler's private per-connection data */
Ha_data ha_data[MAX_HA];
+ /* Mutex and condition for waking up threads after group commit. */
+ pthread_mutex_t LOCK_commit_ordered;
+ pthread_cond_t COND_commit_ordered;
+ bool group_commit_ready;
+ /* Pointer for linking THDs into queue waiting for group commit. */
+ THD *next_commit_ordered;
+ /*
+ The "all" parameter of commit(), to communicate it to the thread that
+ calls commit_ordered().
+ */
+ bool group_commit_all;
+ /* Set by TC_LOG::group_log_xid(), to return per-thd error and cookie. */
+ int xid_error;
+ int xid_cookie;
+
#ifndef MYSQL_CLIENT
int binlog_setup_trx_data();
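
The THD members added above are the building blocks of the leader/participant handshake during group commit: a non-leader enqueues itself and then sleeps on its own COND_commit_ordered until the leader, which processes the whole queue in commit order, sets group_commit_ready and signals it. A sketch of just that wake-up handshake (participant_wait() and leader_wake_all() are illustrative names, not functions from the patch):

  /* Participant: sleep until the leader has committed on our behalf. */
  static void participant_wait(THD *thd)
  {
    pthread_mutex_lock(&thd->LOCK_commit_ordered);
    while (!thd->group_commit_ready)
      pthread_cond_wait(&thd->COND_commit_ordered, &thd->LOCK_commit_ordered);
    thd->group_commit_ready= false;              /* reset for the next commit */
    pthread_mutex_unlock(&thd->LOCK_commit_ordered);
  }

  /* Leader: walk the grabbed queue in commit order and wake each waiter. */
  static void leader_wake_all(THD *queue)
  {
    for (THD *thd= queue; thd; thd= thd->next_commit_ordered)
    {
      pthread_mutex_lock(&thd->LOCK_commit_ordered);
      thd->group_commit_ready= true;
      pthread_cond_signal(&thd->COND_commit_ordered);
      pthread_mutex_unlock(&thd->LOCK_commit_ordered);
    }
  }
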
=== modified file 'sql/sql_load.cc'
--- a/sql/sql_load.cc 2010-03-04 08:03:07 +0000
+++ b/sql/sql_load.cc 2010-05-26 08:16:18 +0000
@@ -516,7 +516,6 @@ int mysql_load(THD *thd,sql_exchange *ex
else
{
Delete_file_log_event d(thd, db, transactional_table);
- d.flags|= LOG_EVENT_UPDATE_TABLE_MAP_VERSION_F;
(void) mysql_bin_log.write(&d);
}
}
@@ -698,7 +697,6 @@ static bool write_execute_load_query_log
(duplicates == DUP_REPLACE) ? LOAD_DUP_REPLACE :
(ignore ? LOAD_DUP_IGNORE : LOAD_DUP_ERROR),
transactional_table, FALSE, errcode);
- e.flags|= LOG_EVENT_UPDATE_TABLE_MAP_VERSION_F;
return mysql_bin_log.write(&e);
}
=== modified file 'sql/table.cc'
--- a/sql/table.cc 2010-03-10 10:32:14 +0000
+++ b/sql/table.cc 2010-05-26 08:16:18 +0000
@@ -297,13 +297,6 @@ TABLE_SHARE *alloc_table_share(TABLE_LIS
share->version= refresh_version;
/*
- This constant is used to mark that no table map version has been
- assigned. No arithmetic is done on the value: it will be
- overwritten with a value taken from MYSQL_BIN_LOG.
- */
- share->table_map_version= ~(ulonglong)0;
-
- /*
Since alloc_table_share() can be called without any locking (for
example, ha_create_table... functions), we do not assign a table
map id here. Instead we assign a value that is not used
@@ -367,10 +360,9 @@ void init_tmp_table_share(THD *thd, TABL
share->frm_version= FRM_VER_TRUE_VARCHAR;
/*
- Temporary tables are not replicated, but we set up these fields
+    Temporary tables are not replicated, but we set up this field
anyway to be able to catch errors.
*/
- share->table_map_version= ~(ulonglong)0;
share->cached_row_logging_check= -1;
/*
=== modified file 'sql/table.h'
--- a/sql/table.h 2010-02-10 19:06:24 +0000
+++ b/sql/table.h 2010-05-26 08:16:18 +0000
@@ -433,7 +433,6 @@ typedef struct st_table_share
bool waiting_on_cond; /* Protection against free */
bool deleting; /* going to delete this table */
ulong table_map_id; /* for row-based replication */
- ulonglong table_map_version;
/*
Cache for row-based replication table share checks that does not
=== modified file 'storage/xtradb/handler/ha_innodb.cc'
--- a/storage/xtradb/handler/ha_innodb.cc 2010-01-15 21:12:30 +0000
+++ b/storage/xtradb/handler/ha_innodb.cc 2010-05-26 08:16:18 +0000
@@ -138,8 +138,6 @@ bool check_global_access(THD *thd, ulong
/** to protect innobase_open_files */
static pthread_mutex_t innobase_share_mutex;
-/** to force correct commit order in binlog */
-static pthread_mutex_t prepare_commit_mutex;
static ulong commit_threads = 0;
static pthread_mutex_t commit_threads_m;
static pthread_cond_t commit_cond;
@@ -239,6 +237,7 @@ static const char* innobase_change_buffe
static INNOBASE_SHARE *get_share(const char *table_name);
static void free_share(INNOBASE_SHARE *share);
static int innobase_close_connection(handlerton *hton, THD* thd);
+static void innobase_commit_ordered(handlerton *hton, THD* thd, bool all);
static int innobase_commit(handlerton *hton, THD* thd, bool all);
static int innobase_rollback(handlerton *hton, THD* thd, bool all);
static int innobase_rollback_to_savepoint(handlerton *hton, THD* thd,
@@ -1356,7 +1355,6 @@ innobase_trx_init(
trx_t* trx) /*!< in/out: InnoDB transaction handle */
{
DBUG_ENTER("innobase_trx_init");
- DBUG_ASSERT(EQ_CURRENT_THD(thd));
DBUG_ASSERT(thd == trx->mysql_thd);
trx->check_foreigns = !thd_test_options(
@@ -1416,8 +1414,6 @@ check_trx_exists(
{
trx_t*& trx = thd_to_trx(thd);
- ut_ad(EQ_CURRENT_THD(thd));
-
if (trx == NULL) {
trx = innobase_trx_allocate(thd);
} else if (UNIV_UNLIKELY(trx->magic_n != TRX_MAGIC_N)) {
@@ -2024,6 +2020,7 @@ innobase_init(
innobase_hton->savepoint_set=innobase_savepoint;
innobase_hton->savepoint_rollback=innobase_rollback_to_savepoint;
innobase_hton->savepoint_release=innobase_release_savepoint;
+ innobase_hton->commit_ordered=innobase_commit_ordered;
innobase_hton->commit=innobase_commit;
innobase_hton->rollback=innobase_rollback;
innobase_hton->prepare=innobase_xa_prepare;
@@ -2492,7 +2489,6 @@ skip_overwrite:
innobase_open_tables = hash_create(200);
pthread_mutex_init(&innobase_share_mutex, MY_MUTEX_INIT_FAST);
- pthread_mutex_init(&prepare_commit_mutex, MY_MUTEX_INIT_FAST);
pthread_mutex_init(&commit_threads_m, MY_MUTEX_INIT_FAST);
pthread_mutex_init(&commit_cond_m, MY_MUTEX_INIT_FAST);
pthread_mutex_init(&analyze_mutex, MY_MUTEX_INIT_FAST);
@@ -2547,7 +2543,6 @@ innobase_end(
my_free(internal_innobase_data_file_path,
MYF(MY_ALLOW_ZERO_PTR));
pthread_mutex_destroy(&innobase_share_mutex);
- pthread_mutex_destroy(&prepare_commit_mutex);
pthread_mutex_destroy(&commit_threads_m);
pthread_mutex_destroy(&commit_cond_m);
pthread_mutex_destroy(&analyze_mutex);
@@ -2681,6 +2676,101 @@ innobase_start_trx_and_assign_read_view(
}
/*****************************************************************//**
+Perform the first, fast part of InnoDB commit.
+
+Doing it in this call ensures that we get the same commit order here
+as in the binlog and in any other participating transactional storage engines.
+
+Note that we want to do as little as really needed here, as we run
+under a global mutex. The expensive fsync() is done later, in
+innobase_commit(), without a lock so group commit can take place.
+
+Note also that this method can be called from a different thread than
+the one handling the rest of the transaction. */
+static
+void
+innobase_commit_ordered(
+/*============*/
+ handlerton *hton, /*!< in: Innodb handlerton */
+ THD* thd, /*!< in: MySQL thread handle of the user for whom
+ the transaction should be committed */
+ bool all) /*!< in: TRUE - commit transaction
+ FALSE - the current SQL statement ended */
+{
+ trx_t* trx;
+ DBUG_ENTER("innobase_commit_ordered");
+ DBUG_ASSERT(hton == innodb_hton_ptr);
+
+ trx = check_trx_exists(thd);
+
+ if (trx->active_trans == 0
+ && trx->conc_state != TRX_NOT_STARTED) {
+		/* We cannot throw an error here; instead we will catch this error
+		again in innobase_commit() and report it from there. */
+ DBUG_VOID_RETURN;
+ }
+ /* Since we will reserve the kernel mutex, we have to release
+ the search system latch first to obey the latching order. */
+
+ if (trx->has_search_latch) {
+ trx_search_latch_release_if_reserved(trx);
+ }
+
+ /* commit_ordered is only called when committing the whole transaction
+ (or an SQL statement when autocommit is on). */
+ DBUG_ASSERT(all || (!thd_test_options(thd, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)));
+
+ /* We need current binlog position for ibbackup to work.
+	Note that the position is current because commit_ordered() is guaranteed
+	to be called in the same sequence as writing to the binlog. */
+
+retry:
+ if (innobase_commit_concurrency > 0) {
+ pthread_mutex_lock(&commit_cond_m);
+ commit_threads++;
+
+ if (commit_threads > innobase_commit_concurrency) {
+ commit_threads--;
+ pthread_cond_wait(&commit_cond,
+ &commit_cond_m);
+ pthread_mutex_unlock(&commit_cond_m);
+ goto retry;
+ }
+ else {
+ pthread_mutex_unlock(&commit_cond_m);
+ }
+ }
+
+ /* The following calls to read the MySQL binary log
+ file name and the position return consistent results:
+ 1) We use commit_ordered() to get same commit order
+ in InnoDB as in binary log.
+ 2) A MySQL log file rotation cannot happen because
+ MySQL protects against this by having a counter of
+ transactions in prepared state and it only allows
+ a rotation when the counter drops to zero. See
+ LOCK_prep_xids and COND_prep_xids in log.cc. */
+ trx->mysql_log_file_name = mysql_bin_log_file_name();
+ trx->mysql_log_offset = (ib_int64_t) mysql_bin_log_file_pos();
+
+ /* Don't do write + flush right now. For group commit
+ to work we want to do the flush in the innobase_commit()
+ method, which runs without holding any locks. */
+ trx->flush_log_later = TRUE;
+ innobase_commit_low(trx);
+ trx->flush_log_later = FALSE;
+
+ if (innobase_commit_concurrency > 0) {
+ pthread_mutex_lock(&commit_cond_m);
+ commit_threads--;
+ pthread_cond_signal(&commit_cond);
+ pthread_mutex_unlock(&commit_cond_m);
+ }
+
+ DBUG_VOID_RETURN;
+}
+
+/*****************************************************************//**
Commits a transaction in an InnoDB database or marks an SQL statement
ended.
@return 0 */
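
innobase_commit_ordered() above splits the InnoDB commit in two: the cheap, ordering-sensitive part runs under the server's global commit mutex, while the expensive log flush is deferred to innobase_commit(). Seen from the server side, the calling pattern is roughly as follows (a simplified sketch of the control flow, not the actual server code; group_commit_innodb_sketch() is an illustrative name):

  /*
    Sketch only: the leader thread, while the global commit mutex is held,
    runs the fast ordered part for every queued transaction.
  */
  static void group_commit_innodb_sketch(handlerton *hton, THD *first_thd)
  {
    for (THD *thd= first_thd; thd; thd= thd->next_commit_ordered)
      innobase_commit_ordered(hton, thd, thd->group_commit_all);

    /*
      The global mutex is released after this loop; each transaction's own
      thread then calls innobase_commit(), which does the write + fsync of
      the InnoDB log (trx_commit_complete_for_mysql()) without holding any
      server mutex, so those flushes can themselves be grouped.
    */
  }
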
@@ -2702,13 +2792,6 @@ innobase_commit(
trx = check_trx_exists(thd);
- /* Since we will reserve the kernel mutex, we have to release
- the search system latch first to obey the latching order. */
-
- if (trx->has_search_latch) {
- trx_search_latch_release_if_reserved(trx);
- }
-
/* The flag trx->active_trans is set to 1 in
1. ::external_lock(),
@@ -2736,62 +2819,8 @@ innobase_commit(
/* We were instructed to commit the whole transaction, or
this is an SQL statement end and autocommit is on */
- /* We need current binlog position for ibbackup to work.
- Note, the position is current because of
- prepare_commit_mutex */
-retry:
- if (innobase_commit_concurrency > 0) {
- pthread_mutex_lock(&commit_cond_m);
- commit_threads++;
-
- if (commit_threads > innobase_commit_concurrency) {
- commit_threads--;
- pthread_cond_wait(&commit_cond,
- &commit_cond_m);
- pthread_mutex_unlock(&commit_cond_m);
- goto retry;
- }
- else {
- pthread_mutex_unlock(&commit_cond_m);
- }
- }
-
- /* The following calls to read the MySQL binary log
- file name and the position return consistent results:
- 1) Other InnoDB transactions cannot intervene between
- these calls as we are holding prepare_commit_mutex.
- 2) Binary logging of other engines is not relevant
- to InnoDB as all InnoDB requires is that committing
- InnoDB transactions appear in the same order in the
- MySQL binary log as they appear in InnoDB logs.
- 3) A MySQL log file rotation cannot happen because
- MySQL protects against this by having a counter of
- transactions in prepared state and it only allows
- a rotation when the counter drops to zero. See
- LOCK_prep_xids and COND_prep_xids in log.cc. */
- trx->mysql_log_file_name = mysql_bin_log_file_name();
- trx->mysql_log_offset = (ib_int64_t) mysql_bin_log_file_pos();
-
- /* Don't do write + flush right now. For group commit
- to work we want to do the flush after releasing the
- prepare_commit_mutex. */
- trx->flush_log_later = TRUE;
- innobase_commit_low(trx);
- trx->flush_log_later = FALSE;
-
- if (innobase_commit_concurrency > 0) {
- pthread_mutex_lock(&commit_cond_m);
- commit_threads--;
- pthread_cond_signal(&commit_cond);
- pthread_mutex_unlock(&commit_cond_m);
- }
-
- if (trx->active_trans == 2) {
-
- pthread_mutex_unlock(&prepare_commit_mutex);
- }
-
- /* Now do a write + flush of logs. */
+	/* We did the first part already in innobase_commit_ordered();
+	now finish by doing a write + flush of the logs. */
trx_commit_complete_for_mysql(trx);
trx->active_trans = 0;
@@ -4621,6 +4650,7 @@ no_commit:
no need to re-acquire locks on it. */
/* Altering to InnoDB format */
+ innobase_commit_ordered(ht, user_thd, 1);
innobase_commit(ht, user_thd, 1);
/* Note that this transaction is still active. */
prebuilt->trx->active_trans = 1;
@@ -4637,6 +4667,7 @@ no_commit:
/* Commit the transaction. This will release the table
locks, so they have to be acquired again. */
+ innobase_commit_ordered(ht, user_thd, 1);
innobase_commit(ht, user_thd, 1);
/* Note that this transaction is still active. */
prebuilt->trx->active_trans = 1;
@@ -8339,6 +8370,7 @@ ha_innobase::external_lock(
if (!thd_test_options(thd, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)) {
if (trx->active_trans != 0) {
+ innobase_commit_ordered(ht, thd, TRUE);
innobase_commit(ht, thd, TRUE);
}
} else {
@@ -9448,36 +9480,6 @@ innobase_xa_prepare(
srv_active_wake_master_thread();
- if (thd_sql_command(thd) != SQLCOM_XA_PREPARE &&
- (all || !thd_test_options(thd, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)))
- {
- if (srv_enable_unsafe_group_commit && !THDVAR(thd, support_xa)) {
- /* choose group commit rather than binlog order */
- return(error);
- }
-
- /* For ibbackup to work the order of transactions in binlog
- and InnoDB must be the same. Consider the situation
-
- thread1> prepare; write to binlog; ...
- <context switch>
- thread2> prepare; write to binlog; commit
- thread1> ... commit
-
- To ensure this will not happen we're taking the mutex on
- prepare, and releasing it on commit.
-
- Note: only do it for normal commits, done via ha_commit_trans.
- If 2pc protocol is executed by external transaction
- coordinator, it will be just a regular MySQL client
- executing XA PREPARE and XA COMMIT commands.
- In this case we cannot know how many minutes or hours
- will be between XA PREPARE and XA COMMIT, and we don't want
- to block for undefined period of time. */
- pthread_mutex_lock(&prepare_commit_mutex);
- trx->active_trans = 2;
- }
-
return(error);
}
@@ -10669,11 +10671,6 @@ static MYSQL_SYSVAR_ENUM(adaptive_checkp
"Enable/Disable flushing along modified age. (none, reflex, [estimate])",
NULL, innodb_adaptive_checkpoint_update, 2, &adaptive_checkpoint_typelib);
-static MYSQL_SYSVAR_ULONG(enable_unsafe_group_commit, srv_enable_unsafe_group_commit,
- PLUGIN_VAR_RQCMDARG,
- "Enable/Disable unsafe group commit when support_xa=OFF and use with binlog or other XA storage engine.",
- NULL, NULL, 0, 0, 1, 0);
-
static MYSQL_SYSVAR_ULONG(expand_import, srv_expand_import,
PLUGIN_VAR_RQCMDARG,
"Enable/Disable converting automatically *.ibd files when import tablespace.",
@@ -10763,7 +10760,6 @@ static struct st_mysql_sys_var* innobase
MYSQL_SYSVAR(flush_neighbor_pages),
MYSQL_SYSVAR(read_ahead),
MYSQL_SYSVAR(adaptive_checkpoint),
- MYSQL_SYSVAR(enable_unsafe_group_commit),
MYSQL_SYSVAR(expand_import),
MYSQL_SYSVAR(extra_rsegments),
MYSQL_SYSVAR(dict_size_limit),