maria-developers team mailing list archive
  
  - 
     maria-developers team maria-developers team
- 
    Mailing list archive
  
- 
    Message #03659
  
Re:  Implementing new "group commit" API in PBXT?
  
Kristian Nielsen <knielsen@xxxxxxxxxxxxxxx> writes:
> Ok, thanks a lot for the advice, I will give it another shot.
Thanks to your help, I got it working! It was _really_ nice to see that the
new API applies well to PBXT also.
As a bonus, we now get START TRANSACTION WITH CONSISTENT SNAPSHOT actually be
consistent! In MySQL, this does not really do much except start a transaction
in all engines, it certainly does not ensure any consistency between engines.
With this change, it becomes consistent, I added a small Perl test program
tests/consistent_snapshot.pl that shows this. I think this is particularly
useful for backups; I plan to add a way to get the corresponding binlog
position, so START TRANSACTION WITH CONSISTENT SNAPSHOT can be used to make a
fully consistent and non-blocking backup (current mysqldump needs FLUSH TABLES
WITH READ LOCK, which is not really non-blocking).
I hope you can take a look at the patch (attached) when you get some time and
let me know what you think, and if you see any mistakes. I did it a little
differently from what we discussed, as I wanted to minimise the amount of work
done while holding the global mutex around commit_ordered().
I also pushed the patch here, in case you want to see or run the full code:
    lp:~maria-captains/maria/mariadb-5.1-mwl116-pbxt
It passes the test suite, but I did at one point see this in the log, which I
am not sure what means, maybe you can help?
    void XTTabCache::xt_tc_release_page(XTOpenFile*, XTTabCachePage*, XTThread*)(tabcache_xt.cc:409) page->tcp_lock_count > 0
Finally a couple of questions:
>>> In particular this, flushing the data log (is this flush to disk?):
>
>>>    if (!thread->st_dlog_buf.dlb_flush_log(TRUE, thread)) {
>>>            ok = FALSE;
>>>            status = XT_LOG_ENT_ABORT;
>>>    }
>
>>
>> Yes, this is a flush to disk.
>>
>> This could be done in the slow part (obviously this would be ideal).
>> If we do not flush the data log, then there is a chance that such a
>> commit transaction is incomplete, because the associated data log data
>> has not been committed.
This is done in commit, but I could not see where similar data log flush is
done in prepare(). It seems prepare() mostly adds a "prepare" record and
flushes the transaction log.
Is it correct that no data log flush happens in prepare? If so, don't we have
the same problem?
Suppose we prepare() in PBXT and write (and flush) the transaction into the
binary log. Then we crash. When the server comes back up, it will try to
recover the transaction inside PBXT, but that will not be possible if the data
log was lost due to no flush, right?
Final question:
In commit() we call xt_tab_restrict_rows(). It seems to be delayed checking
for defered foreign key constraints or something like that? If it is, then
shouldn't it be done in prepare() (it's wrong to rollback with error in
commit() after successful prepare)? I see the #ifdef XT_IMPLEMENT_NO_ACTION
around the call, so I suppose this code is not actually used, but I just
wondered ...
 - Kristian.
------------------------------------------------------------
revno: 2852
committer: knielsen@xxxxxxxxxxxxxxx
branch nick: work-5.1-pbxt-commit-ordered
timestamp: Fri 2010-10-15 15:42:06 +0200
message:
  MWL#116: Efficient group commit: PBXT part
  
  Implement the commit_ordered() API in PBXT, getting consistent commit ordering
  with other engines and binlog.
  
  Make pbxt_support_xa default in MariaDB debug build (as the bug that causes
  assert in MySQL is fixed in MariaDB).
diff:
=== modified file 'storage/pbxt/src/ha_pbxt.cc'
--- storage/pbxt/src/ha_pbxt.cc	2010-09-28 13:05:45 +0000
+++ storage/pbxt/src/ha_pbxt.cc	2010-10-15 13:42:06 +0000
@@ -108,6 +108,9 @@
 static int		pbxt_panic(handlerton *hton, enum ha_panic_function flag);
 static void		pbxt_drop_database(handlerton *hton, char *path);
 static int		pbxt_close_connection(handlerton *hton, THD* thd);
+#ifdef MARIADB_BASE_VERSION
+static void		pbxt_commit_ordered(handlerton *hton, THD *thd, bool all);
+#endif
 static int		pbxt_commit(handlerton *hton, THD *thd, bool all);
 static int		pbxt_rollback(handlerton *hton, THD *thd, bool all);
 static int		pbxt_prepare(handlerton *hton, THD *thd, bool all);
@@ -1147,6 +1150,9 @@
 		pbxt_hton->state = SHOW_OPTION_YES;
 		pbxt_hton->db_type = DB_TYPE_PBXT; // Wow! I have my own!
 		pbxt_hton->close_connection = pbxt_close_connection; /* close_connection, cleanup thread related data. */
+#ifdef MARIADB_BASE_VERSION
+		pbxt_hton->commit_ordered = pbxt_commit_ordered;
+#endif
 		pbxt_hton->commit = pbxt_commit; /* commit */
 		pbxt_hton->rollback = pbxt_rollback; /* rollback */
 		if (pbxt_support_xa) {
@@ -1484,6 +1490,29 @@
 	return err;
 }
 
+#ifdef MARIADB_BASE_VERSION
+/*
+ * Quickly commit the transaction to memory and make it visible to others.
+ * The remaining part of commit will happen later, in pbxt_commit().
+ */
+static void pbxt_commit_ordered(handlerton *hton, THD *thd, bool all)
+{
+	XTThreadPtr	self;
+
+	if ((self = (XTThreadPtr) *thd_ha_data(thd, hton))) {
+		XT_PRINT2(self, "%s pbxt_commit_ordered all=%d\n", all ? "END CONN XACT" : "END STAT", all);
+
+		if (self->st_xact_data) {
+			if (all || self->st_auto_commit) {
+				self->st_commit_ordered = TRUE;
+				self->st_writer = self->st_xact_writer;
+				self->st_delayed_error= !xt_xn_commit_fast(self, self->st_writer);
+			}
+		}
+	}
+}
+#endif
+
 /*
  * Commit the PBXT transaction of the given thread.
  * thd is the MySQL thread structure.
@@ -1512,7 +1541,13 @@
 			if (all || self->st_auto_commit) {
 				XT_PRINT0(self, "xt_xn_commit in pbxt_commit\n");
 
-				if (!xt_xn_commit(self))
+				if (self->st_commit_ordered) {
+					self->st_commit_ordered = FALSE;
+					err = !xt_xn_commit_slow(self, self->st_writer) || self->st_delayed_error;
+				} else {
+					err = !xt_xn_commit(self);
+				}
+				if (err)
 					err = xt_ha_pbxt_thread_error_for_mysql(thd, self, FALSE);
 			}
 		}
@@ -6064,7 +6099,7 @@
 	NULL, NULL, 0, 0, 20000, 1);
 #endif
 
-#ifndef DEBUG
+#if !defined(DEBUG) || defined(MARIADB_BASE_VERSION)
 static MYSQL_SYSVAR_BOOL(support_xa, pbxt_support_xa,
 	PLUGIN_VAR_OPCMDARG,
 	"Enable PBXT support for the XA two-phase commit, default is enabled",
=== modified file 'storage/pbxt/src/thread_xt.h'
--- storage/pbxt/src/thread_xt.h	2010-05-05 10:59:57 +0000
+++ storage/pbxt/src/thread_xt.h	2010-10-15 13:42:06 +0000
@@ -299,6 +299,9 @@
 	xtBool					st_stat_ended;					/* TRUE if the statement was ended. */
 	xtBool					st_stat_trans;					/* TRUE if a statement transaction is running (started on UPDATE). */
 	xtBool					st_stat_modify;					/* TRUE if the statement is an INSERT/UPDATE/DELETE */
+	xtBool					st_commit_ordered;				/* TRUE if we have run commit_ordered() */
+	xtBool					st_delayed_error;				/* TRUE if we got an error in commit_ordered() */
+	xtBool					st_writer;						/* Copy of thread->st_xact_writer (which is clobbered by xlog_append()) */
 #ifdef XT_IMPLEMENT_NO_ACTION
 	XTBasicListRec			st_restrict_list;				/* These records have been deleted and should have no reference. */
 #endif
=== modified file 'storage/pbxt/src/xaction_xt.cc'
--- storage/pbxt/src/xaction_xt.cc	2010-09-28 13:05:45 +0000
+++ storage/pbxt/src/xaction_xt.cc	2010-10-15 13:42:06 +0000
@@ -1287,27 +1287,61 @@
 	return OK;
 }
 
-static xtBool xn_end_xact(XTThreadPtr thread, u_int status)
+static void xn_end_release_locks(XTThreadPtr thread)
+{
+	XTXactDataPtr	xact = thread->st_xact_data;
+	XTDatabaseHPtr	db = thread->st_database;
+	ASSERT_NS(xact);
+
+	/* {REMOVE-LOCKS} Drop locks if you have any: */
+	thread->st_lock_list.xt_remove_all_locks(db, thread);
+
+	/* Do this afterwards to make sure the sweeper
+	 * does not cleanup transactions start cleaning up
+	 * before any transactions that were waiting for
+	 * this transaction have completed!
+	 */
+	xact->xd_end_xn_id = db->db_xn_curr_id;
+
+	/* Now you can sweep! */
+	xact->xd_flags |= XT_XN_XAC_SWEEP;
+}
+
+/* The commit is split into two phases: one "fast" for MariaDB commit_ordered(),
+ * and one "slow" for commit(). When not using internal 2pc, there is only one
+ * call combining both phases.
+ */
+
+enum {
+	XN_END_PHASE_FAST = 1,
+	XN_END_PHASE_SLOW = 2,
+	XN_END_PHASE_BOTH = 3
+};
+
+static xtBool xn_end_xact(XTThreadPtr thread, u_int status, xtBool writer, int phase)
 {
 	XTXactDataPtr	xact;
 	xtBool			ok = TRUE;
+	xtBool			err;
 
 	ASSERT_NS(thread->st_xact_data);
 	if ((xact = thread->st_xact_data)) {
 		XTDatabaseHPtr	db = thread->st_database;
 		xtXactID		xn_id = xact->xd_start_xn_id;
-		xtBool			writer;
 		
-		if ((writer = thread->st_xact_writer)) {
+		if (writer) {
 			/* The transaction wrote something: */
 			XTXactEndEntryDRec	entry;
 			xtWord4				sum;
 
-			sum = XT_CHECKSUM4_XACT(xn_id) ^ XT_CHECKSUM4_XACT(0);
-			entry.xe_status_1 = status;
-			entry.xe_checksum_1 = XT_CHECKSUM_1(sum);
-			XT_SET_DISK_4(entry.xe_xact_id_4, xn_id);
-			XT_SET_DISK_4(entry.xe_not_used_4, 0);
+			if (phase & XN_END_PHASE_FAST)
+			{
+				sum = XT_CHECKSUM4_XACT(xn_id) ^ XT_CHECKSUM4_XACT(0);
+				entry.xe_status_1 = status;
+				entry.xe_checksum_1 = XT_CHECKSUM_1(sum);
+				XT_SET_DISK_4(entry.xe_xact_id_4, xn_id);
+				XT_SET_DISK_4(entry.xe_not_used_4, 0);
+			}
 
 #ifdef XT_IMPLEMENT_NO_ACTION
 			/* This will check any resticts that have been delayed to the end of the statement. */
@@ -1319,20 +1353,35 @@
 			}
 #endif
 
-			/* Flush the data log: */
-			if (!thread->st_dlog_buf.dlb_flush_log(TRUE, thread)) {
+			/* Flush the data log (in the "fast" case we already did it in prepare: */
+			if ((phase & XN_END_PHASE_SLOW) && !thread->st_dlog_buf.dlb_flush_log(TRUE, thread)) {
 				ok = FALSE;
 				status = XT_LOG_ENT_ABORT;
 			}
 
 			/* Write and flush the transaction log: */
-			if (!xt_xlog_log_data(thread, sizeof(XTXactEndEntryDRec), (XTXactLogBufferDPtr) &entry, xt_db_flush_log_at_trx_commit)) {
+			if (phase == XN_END_PHASE_FAST) {
+				/* Fast phase, delay any write or flush to later. */
+				err = !xt_xlog_log_data(thread, sizeof(XTXactEndEntryDRec), (XTXactLogBufferDPtr) &entry, XT_XLOG_NO_WRITE_NO_FLUSH);
+			} else if (phase == XN_END_PHASE_SLOW) {
+				/* We already appended the commit record in the fast phase.
+				 * Now just call with empty record to ensure we write/flush
+				 * the log as needed for this commit.
+				 */
+				err = !xt_xlog_log_data(thread, 0, NULL, xt_db_flush_log_at_trx_commit);
+			} else /* phase == XN_END_PHASE_BOTH */ {
+				/* Both phases at once, append commit record and write/flush normally. */
+				ASSERT_NS(phase == XN_END_PHASE_BOTH);
+				err = !xt_xlog_log_data(thread, sizeof(XTXactEndEntryDRec), (XTXactLogBufferDPtr) &entry, xt_db_flush_log_at_trx_commit);
+			}
+
+			if (err) {
 				ok = FALSE;
 				status = XT_LOG_ENT_ABORT;
 				/* Make sure this is done, if we failed to log
 				 * the transction end!
 				 */
-				if (thread->st_xact_writer) {
+				if (writer) {
 					/* Adjust this in case of error, but don't forget
 					 * to lock!
 					 */
@@ -1347,46 +1396,46 @@
 				}
 			}
 
-			/* Setting this flag completes the transaction,
-			 * Do this before we release the locks, because
-			 * the unlocked transactions expect the
-			 * transaction they are waiting for to be
-			 * gone!
+			if (phase & XN_END_PHASE_FAST) {
+				/* Setting this flag completes the transaction,
+				 * Do this before we release the locks, because
+				 * the unlocked transactions expect the
+				 * transaction they are waiting for to be
+				 * gone!
+				 */
+				xact->xd_end_time = ++db->db_xn_end_time;
+				if (status == XT_LOG_ENT_COMMIT) {
+					thread->st_statistics.st_commits++;
+					xact->xd_flags |= (XT_XN_XAC_COMMITTED | XT_XN_XAC_ENDED);
+				}
+				else {
+					thread->st_statistics.st_rollbacks++;
+					xact->xd_flags |= XT_XN_XAC_ENDED;
+				}
+			}
+
+			/* Be as fast as possible in the "fast" path, as we want to be as
+			 * fast as possible here (we will release slow locks immediately
+			 * after in the "slow" part).
+			 * ToDo: If we ran the fast part, the slow part could release locks
+			 * _before_ fsync(), rather than after.
 			 */
-			xact->xd_end_time = ++db->db_xn_end_time;
-			if (status == XT_LOG_ENT_COMMIT) {
-				thread->st_statistics.st_commits++;
+			if (!(phase & XN_END_PHASE_SLOW))
+				return ok;
+
+			xn_end_release_locks(thread);
+		}
+		else {
+			/* Read-only transaction can be removed, immediately */
+			if (phase & XN_END_PHASE_FAST) {
+				xact->xd_end_time = ++db->db_xn_end_time;
 				xact->xd_flags |= (XT_XN_XAC_COMMITTED | XT_XN_XAC_ENDED);
-			}
-			else {
-				thread->st_statistics.st_rollbacks++;
-				xact->xd_flags |= XT_XN_XAC_ENDED;
-			}
-
-			/* {REMOVE-LOCKS} Drop locks is you have any: */
-			thread->st_lock_list.xt_remove_all_locks(db, thread);
-
-			/* Do this afterwards to make sure the sweeper
-			 * does not cleanup transactions start cleaning up
-			 * before any transactions that were waiting for
-			 * this transaction have completed!
-			 */
-			xact->xd_end_xn_id = db->db_xn_curr_id;
-
-			/* Now you can sweep! */
-			xact->xd_flags |= XT_XN_XAC_SWEEP;
-		}
-		else {
-			/* Read-only transaction can be removed, immediately */
-			xact->xd_end_time = ++db->db_xn_end_time;
-			xact->xd_flags |= (XT_XN_XAC_COMMITTED | XT_XN_XAC_ENDED);
-
-			/* Drop locks is you have any: */
-			thread->st_lock_list.xt_remove_all_locks(db, thread);
-
-			xact->xd_end_xn_id = db->db_xn_curr_id;
-
-			xact->xd_flags |= XT_XN_XAC_SWEEP;
+
+				if (!(phase & XN_END_PHASE_SLOW))
+					return ok;
+			}
+
+			xn_end_release_locks(thread);
 
 			if (xt_xn_delete_xact(db, xn_id, thread)) {
 				if (db->db_xn_min_ram_id == xn_id)
@@ -1478,12 +1527,22 @@
 
 xtPublic xtBool xt_xn_commit(XTThreadPtr thread)
 {
-	return xn_end_xact(thread, XT_LOG_ENT_COMMIT);
+	return xn_end_xact(thread, XT_LOG_ENT_COMMIT, thread->st_xact_writer, XN_END_PHASE_BOTH);
+}
+
+xtPublic xtBool xt_xn_commit_fast(XTThreadPtr thread, xtBool writer)
+{
+	return xn_end_xact(thread, XT_LOG_ENT_COMMIT, writer, XN_END_PHASE_FAST);
+}
+
+xtPublic xtBool xt_xn_commit_slow(XTThreadPtr thread, xtBool writer)
+{
+	return xn_end_xact(thread, XT_LOG_ENT_COMMIT, writer, XN_END_PHASE_SLOW);
 }
 
 xtPublic xtBool xt_xn_rollback(XTThreadPtr thread)
 {
-	return xn_end_xact(thread, XT_LOG_ENT_ABORT);
+	return xn_end_xact(thread, XT_LOG_ENT_ABORT, thread->st_xact_writer, XN_END_PHASE_BOTH);
 }
 
 xtPublic xtBool xt_xn_log_tab_id(XTThreadPtr self, xtTableID tab_id)
=== modified file 'storage/pbxt/src/xaction_xt.h'
--- storage/pbxt/src/xaction_xt.h	2010-05-05 10:59:57 +0000
+++ storage/pbxt/src/xaction_xt.h	2010-10-15 13:42:06 +0000
@@ -193,6 +193,8 @@
 
 xtBool			xt_xn_begin(struct XTThread *self);
 xtBool			xt_xn_commit(struct XTThread *self);
+xtBool			xt_xn_commit_fast(struct XTThread *self, xtBool writer);
+xtBool			xt_xn_commit_slow(struct XTThread *self, xtBool writer);
 xtBool			xt_xn_rollback(struct XTThread *self);
 xtBool			xt_xn_log_tab_id(struct XTThread *self, xtTableID tab_id);
 int				xt_xn_status(struct XTOpenTable *ot, xtXactID xn_id, xtRecordID rec_id);
=== added file 'tests/consistent_snapshot.pl'
--- tests/consistent_snapshot.pl	1970-01-01 00:00:00 +0000
+++ tests/consistent_snapshot.pl	2010-10-15 13:42:06 +0000
@@ -0,0 +1,107 @@
+#! /usr/bin/perl
+
+# Test START TRANSACTION WITH CONSISTENT SNAPSHOT.
+# With MWL#116, this is implemented so it is actually consistent.
+
+use strict;
+use warnings;
+
+use DBI;
+
+my $UPDATERS= 10;
+my $READERS= 5;
+
+my $ROWS= 50;
+my $DURATION= 20;
+
+my $stop_time= time() + $DURATION;
+
+sub my_connect {
+  my $dbh= DBI->connect("dbi:mysql:mysql_socket=/tmp/mysql.sock;database=test",
+                        "root", undef, { RaiseError=>1, PrintError=>0, AutoCommit=>0});
+  $dbh->do("SET SESSION TRANSACTION ISOLATION LEVEL REPEATABLE READ");
+  $dbh->do("SET SESSION autocommit = 0");
+  return $dbh;
+}
+
+sub my_setup {
+  my $dbh= my_connect();
+
+  $dbh->do("DROP TABLE IF EXISTS test_consistent_snapshot1, test_consistent_snapshot2");
+  $dbh->do(<<TABLE);
+CREATE TABLE test_consistent_snapshot1 (
+  a INT PRIMARY KEY,
+  b INT NOT NULL
+) ENGINE=InnoDB
+TABLE
+  $dbh->do(<<TABLE);
+CREATE TABLE test_consistent_snapshot2(
+  a INT PRIMARY KEY,
+  b INT NOT NULL
+) ENGINE=PBXT
+TABLE
+
+  for (my $i= 0; $i < $ROWS; $i++) {
+    my $value= int(rand()*1000);
+    $dbh->do("INSERT INTO test_consistent_snapshot1 VALUES (?, ?)", undef,
+             $i, $value);
+    $dbh->do("INSERT INTO test_consistent_snapshot2 VALUES (?, ?)", undef,
+             $i, -$value);
+  }
+  $dbh->commit();
+  $dbh->disconnect();
+}
+
+sub my_updater {
+  my $dbh= my_connect();
+
+  while (time() < $stop_time) {
+    my $i1= int(rand()*$ROWS);
+    my $i2= int(rand()*$ROWS);
+    my $v= int(rand()*99)-49;
+    $dbh->do("UPDATE test_consistent_snapshot1 SET b = b + ? WHERE a = ?",
+             undef, $v, $i1);
+    $dbh->do("UPDATE test_consistent_snapshot2 SET b = b - ? WHERE a = ?",
+             undef, $v, $i2);
+    $dbh->commit();
+  }
+
+  $dbh->disconnect();
+  exit(0);
+}
+
+sub my_reader {
+  my $dbh= my_connect();
+
+  my $iteration= 0;
+  while (time() < $stop_time) {
+    $dbh->do("START TRANSACTION WITH CONSISTENT SNAPSHOT");
+    my $s1= $dbh->selectrow_arrayref("SELECT SUM(b) FROM test_consistent_snapshot1");
+    $s1= $s1->[0];
+    my $s2= $dbh->selectrow_arrayref("SELECT SUM(b) FROM test_consistent_snapshot2");
+    $s2= $s2->[0];
+    $dbh->commit();
+    if ($s1 + $s2 != 0) {
+      print STDERR "Found inconsistency, s1=$s1 s2=$s2 iteration=$iteration\n";
+      last;
+    }
+    ++$iteration;
+  }
+
+  $dbh->disconnect();
+  exit(0);
+}
+
+my_setup();
+
+for (1 .. $UPDATERS) {
+  fork() || my_updater();
+}
+
+for (1 .. $READERS) {
+  fork() || my_reader();
+}
+
+waitpid(-1, 0) for (1 .. ($UPDATERS + $READERS));
+
+print "All checks done\n";
Follow ups
References