------------------------------------------------------------
revno: 2852
committer: knielsen@xxxxxxxxxxxxxxx
branch nick: work-5.1-pbxt-commit-ordered
timestamp: Fri 2010-10-15 15:42:06 +0200
message:
MWL#116: Efficient group commit: PBXT part
Implement the commit_ordered() API in PBXT, getting consistent
commit ordering
with other engines and binlog.
Make pbxt_support_xa default in MariaDB debug build (as the bug
that causes
assert in MySQL is fixed in MariaDB).
diff:
=== modified file 'storage/pbxt/src/ha_pbxt.cc'
--- storage/pbxt/src/ha_pbxt.cc 2010-09-28 13:05:45 +0000
+++ storage/pbxt/src/ha_pbxt.cc 2010-10-15 13:42:06 +0000
@@ -108,6 +108,9 @@
static int pbxt_panic(handlerton *hton, enum ha_panic_function flag);
static void pbxt_drop_database(handlerton *hton, char *path);
static int pbxt_close_connection(handlerton *hton, THD* thd);
+#ifdef MARIADB_BASE_VERSION
+static void pbxt_commit_ordered(handlerton *hton, THD *thd, bool
all);
+#endif
static int pbxt_commit(handlerton *hton, THD *thd, bool all);
static int pbxt_rollback(handlerton *hton, THD *thd, bool all);
static int pbxt_prepare(handlerton *hton, THD *thd, bool all);
@@ -1147,6 +1150,9 @@
pbxt_hton->state = SHOW_OPTION_YES;
pbxt_hton->db_type = DB_TYPE_PBXT; // Wow! I have my own!
pbxt_hton->close_connection = pbxt_close_connection; /*
close_connection, cleanup thread related data. */
+#ifdef MARIADB_BASE_VERSION
+ pbxt_hton->commit_ordered = pbxt_commit_ordered;
+#endif
pbxt_hton->commit = pbxt_commit; /* commit */
pbxt_hton->rollback = pbxt_rollback; /* rollback */
if (pbxt_support_xa) {
@@ -1484,6 +1490,29 @@
return err;
}
+#ifdef MARIADB_BASE_VERSION
+/*
+ * Quickly commit the transaction to memory and make it visible to
others.
+ * The remaining part of commit will happen later, in pbxt_commit().
+ */
+static void pbxt_commit_ordered(handlerton *hton, THD *thd, bool all)
+{
+ XTThreadPtr self;
+
+ if ((self = (XTThreadPtr) *thd_ha_data(thd, hton))) {
+ XT_PRINT2(self, "%s pbxt_commit_ordered all=%d\n", all ? "END
CONN XACT" : "END STAT", all);
+
+ if (self->st_xact_data) {
+ if (all || self->st_auto_commit) {
+ self->st_commit_ordered = TRUE;
+ self->st_writer = self->st_xact_writer;
+ self->st_delayed_error= !xt_xn_commit_fast(self, self-
>st_writer);
+ }
+ }
+ }
+}
+#endif
+
/*
* Commit the PBXT transaction of the given thread.
* thd is the MySQL thread structure.
@@ -1512,7 +1541,13 @@
if (all || self->st_auto_commit) {
XT_PRINT0(self, "xt_xn_commit in pbxt_commit\n");
- if (!xt_xn_commit(self))
+ if (self->st_commit_ordered) {
+ self->st_commit_ordered = FALSE;
+ err = !xt_xn_commit_slow(self, self->st_writer) || self-
>st_delayed_error;
+ } else {
+ err = !xt_xn_commit(self);
+ }
+ if (err)
err = xt_ha_pbxt_thread_error_for_mysql(thd, self, FALSE);
}
}
@@ -6064,7 +6099,7 @@
NULL, NULL, 0, 0, 20000, 1);
#endif
-#ifndef DEBUG
+#if !defined(DEBUG) || defined(MARIADB_BASE_VERSION)
static MYSQL_SYSVAR_BOOL(support_xa, pbxt_support_xa,
PLUGIN_VAR_OPCMDARG,
"Enable PBXT support for the XA two-phase commit, default is
enabled",
=== modified file 'storage/pbxt/src/thread_xt.h'
--- storage/pbxt/src/thread_xt.h 2010-05-05 10:59:57 +0000
+++ storage/pbxt/src/thread_xt.h 2010-10-15 13:42:06 +0000
@@ -299,6 +299,9 @@
xtBool st_stat_ended; /* TRUE if the statement was ended. */
xtBool st_stat_trans; /* TRUE if a statement transaction is
running (started on UPDATE). */
xtBool st_stat_modify; /* TRUE if the statement is an
INSERT/UPDATE/DELETE */
+ xtBool st_commit_ordered; /* TRUE if we have run
commit_ordered() */
+ xtBool st_delayed_error; /* TRUE if we got an error in
commit_ordered() */
+ xtBool st_writer; /* Copy of thread->st_xact_writer
(which is clobbered by xlog_append()) */
#ifdef XT_IMPLEMENT_NO_ACTION
XTBasicListRec st_restrict_list; /* These records have been
deleted and should have no reference. */
#endif
=== modified file 'storage/pbxt/src/xaction_xt.cc'
--- storage/pbxt/src/xaction_xt.cc 2010-09-28 13:05:45 +0000
+++ storage/pbxt/src/xaction_xt.cc 2010-10-15 13:42:06 +0000
@@ -1287,27 +1287,61 @@
return OK;
}
-static xtBool xn_end_xact(XTThreadPtr thread, u_int status)
+static void xn_end_release_locks(XTThreadPtr thread)
+{
+ XTXactDataPtr xact = thread->st_xact_data;
+ XTDatabaseHPtr db = thread->st_database;
+ ASSERT_NS(xact);
+
+ /* {REMOVE-LOCKS} Drop locks if you have any: */
+ thread->st_lock_list.xt_remove_all_locks(db, thread);
+
+ /* Do this afterwards to make sure the sweeper
+ * does not cleanup transactions start cleaning up
+ * before any transactions that were waiting for
+ * this transaction have completed!
+ */
+ xact->xd_end_xn_id = db->db_xn_curr_id;
+
+ /* Now you can sweep! */
+ xact->xd_flags |= XT_XN_XAC_SWEEP;
+}
+
+/* The commit is split into two phases: one "fast" for MariaDB
commit_ordered(),
+ * and one "slow" for commit(). When not using internal 2pc, there
is only one
+ * call combining both phases.
+ */
+
+enum {
+ XN_END_PHASE_FAST = 1,
+ XN_END_PHASE_SLOW = 2,
+ XN_END_PHASE_BOTH = 3
+};
+
+static xtBool xn_end_xact(XTThreadPtr thread, u_int status, xtBool
writer, int phase)
{
XTXactDataPtr xact;
xtBool ok = TRUE;
+ xtBool err;
ASSERT_NS(thread->st_xact_data);
if ((xact = thread->st_xact_data)) {
XTDatabaseHPtr db = thread->st_database;
xtXactID xn_id = xact->xd_start_xn_id;
- xtBool writer;
- if ((writer = thread->st_xact_writer)) {
+ if (writer) {
/* The transaction wrote something: */
XTXactEndEntryDRec entry;
xtWord4 sum;
- sum = XT_CHECKSUM4_XACT(xn_id) ^ XT_CHECKSUM4_XACT(0);
- entry.xe_status_1 = status;
- entry.xe_checksum_1 = XT_CHECKSUM_1(sum);
- XT_SET_DISK_4(entry.xe_xact_id_4, xn_id);
- XT_SET_DISK_4(entry.xe_not_used_4, 0);
+ if (phase & XN_END_PHASE_FAST)
+ {
+ sum = XT_CHECKSUM4_XACT(xn_id) ^ XT_CHECKSUM4_XACT(0);
+ entry.xe_status_1 = status;
+ entry.xe_checksum_1 = XT_CHECKSUM_1(sum);
+ XT_SET_DISK_4(entry.xe_xact_id_4, xn_id);
+ XT_SET_DISK_4(entry.xe_not_used_4, 0);
+ }
#ifdef XT_IMPLEMENT_NO_ACTION
/* This will check any resticts that have been delayed to the end
of the statement. */
@@ -1319,20 +1353,35 @@
}
#endif
- /* Flush the data log: */
- if (!thread->st_dlog_buf.dlb_flush_log(TRUE, thread)) {
+ /* Flush the data log (in the "fast" case we already did it in
prepare: */
+ if ((phase & XN_END_PHASE_SLOW) && !thread-
>st_dlog_buf.dlb_flush_log(TRUE, thread)) {
ok = FALSE;
status = XT_LOG_ENT_ABORT;
}
/* Write and flush the transaction log: */
- if (!xt_xlog_log_data(thread, sizeof(XTXactEndEntryDRec),
(XTXactLogBufferDPtr) &entry, xt_db_flush_log_at_trx_commit)) {
+ if (phase == XN_END_PHASE_FAST) {
+ /* Fast phase, delay any write or flush to later. */
+ err = !xt_xlog_log_data(thread, sizeof(XTXactEndEntryDRec),
(XTXactLogBufferDPtr) &entry, XT_XLOG_NO_WRITE_NO_FLUSH);
+ } else if (phase == XN_END_PHASE_SLOW) {
+ /* We already appended the commit record in the fast phase.
+ * Now just call with empty record to ensure we write/flush
+ * the log as needed for this commit.
+ */
+ err = !xt_xlog_log_data(thread, 0, NULL,
xt_db_flush_log_at_trx_commit);
+ } else /* phase == XN_END_PHASE_BOTH */ {
+ /* Both phases at once, append commit record and write/flush
normally. */
+ ASSERT_NS(phase == XN_END_PHASE_BOTH);
+ err = !xt_xlog_log_data(thread, sizeof(XTXactEndEntryDRec),
(XTXactLogBufferDPtr) &entry, xt_db_flush_log_at_trx_commit);
+ }
+
+ if (err) {
ok = FALSE;
status = XT_LOG_ENT_ABORT;
/* Make sure this is done, if we failed to log
* the transction end!
*/
- if (thread->st_xact_writer) {
+ if (writer) {
/* Adjust this in case of error, but don't forget
* to lock!
*/
@@ -1347,46 +1396,46 @@
}
}
- /* Setting this flag completes the transaction,
- * Do this before we release the locks, because
- * the unlocked transactions expect the
- * transaction they are waiting for to be
- * gone!
+ if (phase & XN_END_PHASE_FAST) {
+ /* Setting this flag completes the transaction,
+ * Do this before we release the locks, because
+ * the unlocked transactions expect the
+ * transaction they are waiting for to be
+ * gone!
+ */
+ xact->xd_end_time = ++db->db_xn_end_time;
+ if (status == XT_LOG_ENT_COMMIT) {
+ thread->st_statistics.st_commits++;
+ xact->xd_flags |= (XT_XN_XAC_COMMITTED | XT_XN_XAC_ENDED);
+ }
+ else {
+ thread->st_statistics.st_rollbacks++;
+ xact->xd_flags |= XT_XN_XAC_ENDED;
+ }
+ }
+
+ /* Be as fast as possible in the "fast" path, as we want to be as
+ * fast as possible here (we will release slow locks immediately
+ * after in the "slow" part).
+ * ToDo: If we ran the fast part, the slow part could release
locks
+ * _before_ fsync(), rather than after.
*/
- xact->xd_end_time = ++db->db_xn_end_time;
- if (status == XT_LOG_ENT_COMMIT) {
- thread->st_statistics.st_commits++;
+ if (!(phase & XN_END_PHASE_SLOW))
+ return ok;
+
+ xn_end_release_locks(thread);
+ }
+ else {
+ /* Read-only transaction can be removed, immediately */
+ if (phase & XN_END_PHASE_FAST) {
+ xact->xd_end_time = ++db->db_xn_end_time;
xact->xd_flags |= (XT_XN_XAC_COMMITTED | XT_XN_XAC_ENDED);
- }
- else {
- thread->st_statistics.st_rollbacks++;
- xact->xd_flags |= XT_XN_XAC_ENDED;
- }
-
- /* {REMOVE-LOCKS} Drop locks is you have any: */
- thread->st_lock_list.xt_remove_all_locks(db, thread);
-
- /* Do this afterwards to make sure the sweeper
- * does not cleanup transactions start cleaning up
- * before any transactions that were waiting for
- * this transaction have completed!
- */
- xact->xd_end_xn_id = db->db_xn_curr_id;
-
- /* Now you can sweep! */
- xact->xd_flags |= XT_XN_XAC_SWEEP;
- }
- else {
- /* Read-only transaction can be removed, immediately */
- xact->xd_end_time = ++db->db_xn_end_time;
- xact->xd_flags |= (XT_XN_XAC_COMMITTED | XT_XN_XAC_ENDED);
-
- /* Drop locks is you have any: */
- thread->st_lock_list.xt_remove_all_locks(db, thread);
-
- xact->xd_end_xn_id = db->db_xn_curr_id;
-
- xact->xd_flags |= XT_XN_XAC_SWEEP;
+
+ if (!(phase & XN_END_PHASE_SLOW))
+ return ok;
+ }
+
+ xn_end_release_locks(thread);
if (xt_xn_delete_xact(db, xn_id, thread)) {
if (db->db_xn_min_ram_id == xn_id)
@@ -1478,12 +1527,22 @@
xtPublic xtBool xt_xn_commit(XTThreadPtr thread)
{
- return xn_end_xact(thread, XT_LOG_ENT_COMMIT);
+ return xn_end_xact(thread, XT_LOG_ENT_COMMIT, thread-
>st_xact_writer, XN_END_PHASE_BOTH);
+}
+
+xtPublic xtBool xt_xn_commit_fast(XTThreadPtr thread, xtBool writer)
+{
+ return xn_end_xact(thread, XT_LOG_ENT_COMMIT, writer,
XN_END_PHASE_FAST);
+}
+
+xtPublic xtBool xt_xn_commit_slow(XTThreadPtr thread, xtBool writer)
+{
+ return xn_end_xact(thread, XT_LOG_ENT_COMMIT, writer,
XN_END_PHASE_SLOW);
}
xtPublic xtBool xt_xn_rollback(XTThreadPtr thread)
{
- return xn_end_xact(thread, XT_LOG_ENT_ABORT);
+ return xn_end_xact(thread, XT_LOG_ENT_ABORT, thread-
>st_xact_writer, XN_END_PHASE_BOTH);
}
xtPublic xtBool xt_xn_log_tab_id(XTThreadPtr self, xtTableID tab_id)
=== modified file 'storage/pbxt/src/xaction_xt.h'
--- storage/pbxt/src/xaction_xt.h 2010-05-05 10:59:57 +0000
+++ storage/pbxt/src/xaction_xt.h 2010-10-15 13:42:06 +0000
@@ -193,6 +193,8 @@
xtBool xt_xn_begin(struct XTThread *self);
xtBool xt_xn_commit(struct XTThread *self);
+xtBool xt_xn_commit_fast(struct XTThread *self, xtBool writer);
+xtBool xt_xn_commit_slow(struct XTThread *self, xtBool writer);
xtBool xt_xn_rollback(struct XTThread *self);
xtBool xt_xn_log_tab_id(struct XTThread *self, xtTableID tab_id);
int xt_xn_status(struct XTOpenTable *ot, xtXactID xn_id,
xtRecordID rec_id);
=== added file 'tests/consistent_snapshot.pl'
--- tests/consistent_snapshot.pl 1970-01-01 00:00:00 +0000
+++ tests/consistent_snapshot.pl 2010-10-15 13:42:06 +0000
@@ -0,0 +1,107 @@
+#! /usr/bin/perl
+
+# Test START TRANSACTION WITH CONSISTENT SNAPSHOT.
+# With MWL#116, this is implemented so it is actually consistent.
+
+use strict;
+use warnings;
+
+use DBI;
+
+my $UPDATERS= 10;
+my $READERS= 5;
+
+my $ROWS= 50;
+my $DURATION= 20;
+
+my $stop_time= time() + $DURATION;
+
+sub my_connect {
+ my $dbh= DBI->connect("dbi:mysql:mysql_socket=/tmp/
mysql.sock;database=test",
+ "root", undef, { RaiseError=>1,
PrintError=>0, AutoCommit=>0});
+ $dbh->do("SET SESSION TRANSACTION ISOLATION LEVEL REPEATABLE
READ");
+ $dbh->do("SET SESSION autocommit = 0");
+ return $dbh;
+}
+
+sub my_setup {
+ my $dbh= my_connect();
+
+ $dbh->do("DROP TABLE IF EXISTS test_consistent_snapshot1,
test_consistent_snapshot2");
+ $dbh->do(<<TABLE);
+CREATE TABLE test_consistent_snapshot1 (
+ a INT PRIMARY KEY,
+ b INT NOT NULL
+) ENGINE=InnoDB
+TABLE
+ $dbh->do(<<TABLE);
+CREATE TABLE test_consistent_snapshot2(
+ a INT PRIMARY KEY,
+ b INT NOT NULL
+) ENGINE=PBXT
+TABLE
+
+ for (my $i= 0; $i < $ROWS; $i++) {
+ my $value= int(rand()*1000);
+ $dbh->do("INSERT INTO test_consistent_snapshot1 VALUES (?, ?)",
undef,
+ $i, $value);
+ $dbh->do("INSERT INTO test_consistent_snapshot2 VALUES (?, ?)",
undef,
+ $i, -$value);
+ }
+ $dbh->commit();
+ $dbh->disconnect();
+}
+
+sub my_updater {
+ my $dbh= my_connect();
+
+ while (time() < $stop_time) {
+ my $i1= int(rand()*$ROWS);
+ my $i2= int(rand()*$ROWS);
+ my $v= int(rand()*99)-49;
+ $dbh->do("UPDATE test_consistent_snapshot1 SET b = b + ? WHERE
a = ?",
+ undef, $v, $i1);
+ $dbh->do("UPDATE test_consistent_snapshot2 SET b = b - ? WHERE
a = ?",
+ undef, $v, $i2);
+ $dbh->commit();
+ }
+
+ $dbh->disconnect();
+ exit(0);
+}
+
+sub my_reader {
+ my $dbh= my_connect();
+
+ my $iteration= 0;
+ while (time() < $stop_time) {
+ $dbh->do("START TRANSACTION WITH CONSISTENT SNAPSHOT");
+ my $s1= $dbh->selectrow_arrayref("SELECT SUM(b) FROM
test_consistent_snapshot1");
+ $s1= $s1->[0];
+ my $s2= $dbh->selectrow_arrayref("SELECT SUM(b) FROM
test_consistent_snapshot2");
+ $s2= $s2->[0];
+ $dbh->commit();
+ if ($s1 + $s2 != 0) {
+ print STDERR "Found inconsistency, s1=$s1 s2=$s2 iteration=
$iteration\n";
+ last;
+ }
+ ++$iteration;
+ }
+
+ $dbh->disconnect();
+ exit(0);
+}
+
+my_setup();
+
+for (1 .. $UPDATERS) {
+ fork() || my_updater();
+}
+
+for (1 .. $READERS) {
+ fork() || my_reader();
+}
+
+waitpid(-1, 0) for (1 .. ($UPDATERS + $READERS));
+
+print "All checks done\n";
_______________________________________________
Mailing list: https://launchpad.net/~maria-developers
Post to : maria-developers@xxxxxxxxxxxxxxxxxxx
Unsubscribe : https://launchpad.net/~maria-developers
More help : https://help.launchpad.net/ListHelp