← Back to team overview

maria-developers team mailing list archive

bzr commit into MariaDB 5.1, with Maria 1.5:maria branch (knielsen:2774)

 

#At lp:maria

 2774 knielsen@xxxxxxxxxxxxxxx	2009-12-02 [merge]
      Merge PBXT 1.0.09f RC3 into MariaDB.
      added:
        storage/pbxt/src/backup_xt.cc
        storage/pbxt/src/backup_xt.h
      modified:
        sql/handler.cc
        storage/pbxt/ChangeLog
        storage/pbxt/src/Makefile.am
        storage/pbxt/src/cache_xt.cc
        storage/pbxt/src/cache_xt.h
        storage/pbxt/src/database_xt.cc
        storage/pbxt/src/database_xt.h
        storage/pbxt/src/datadic_xt.cc
        storage/pbxt/src/datalog_xt.cc
        storage/pbxt/src/discover_xt.cc
        storage/pbxt/src/filesys_xt.cc
        storage/pbxt/src/filesys_xt.h
        storage/pbxt/src/ha_pbxt.cc
        storage/pbxt/src/ha_pbxt.h
        storage/pbxt/src/ha_xtsys.h
        storage/pbxt/src/heap_xt.cc
        storage/pbxt/src/index_xt.cc
        storage/pbxt/src/lock_xt.cc
        storage/pbxt/src/locklist_xt.h
        storage/pbxt/src/memory_xt.cc
        storage/pbxt/src/myxt_xt.cc
        storage/pbxt/src/myxt_xt.h
        storage/pbxt/src/pbms.h
        storage/pbxt/src/pbms_enabled.cc
        storage/pbxt/src/pbms_enabled.h
        storage/pbxt/src/pthread_xt.cc
        storage/pbxt/src/restart_xt.cc
        storage/pbxt/src/restart_xt.h
        storage/pbxt/src/strutil_xt.cc
        storage/pbxt/src/systab_xt.cc
        storage/pbxt/src/tabcache_xt.cc
        storage/pbxt/src/tabcache_xt.h
        storage/pbxt/src/table_xt.cc
        storage/pbxt/src/table_xt.h
        storage/pbxt/src/thread_xt.cc
        storage/pbxt/src/thread_xt.h
        storage/pbxt/src/util_xt.cc
        storage/pbxt/src/util_xt.h
        storage/pbxt/src/xaction_xt.cc
        storage/pbxt/src/xaction_xt.h
        storage/pbxt/src/xactlog_xt.cc
        storage/pbxt/src/xactlog_xt.h
        storage/pbxt/src/xt_config.h
        storage/pbxt/src/xt_defs.h
        storage/pbxt/src/xt_errno.h

=== modified file 'sql/handler.cc'
--- a/sql/handler.cc	2009-11-16 20:49:51 +0000
+++ b/sql/handler.cc	2009-12-02 11:50:40 +0000
@@ -1584,16 +1584,6 @@ int ha_recover(HASH *commit_list)
   if (info.commit_list)
     sql_print_information("Starting crash recovery...");
 
-#ifndef WILL_BE_DELETED_LATER
-  /*
-    for now, only InnoDB supports 2pc. It means we can always safely
-    rollback all pending transactions, without risking inconsistent data
-  */
-  DBUG_ASSERT(total_ha_2pc == (ulong) opt_bin_log+1); // only InnoDB and binlog
-  tc_heuristic_recover= TC_HEURISTIC_RECOVER_ROLLBACK; // forcing ROLLBACK
-  info.dry_run=FALSE;
-#endif
-
   for (info.len= MAX_XID_LIST_SIZE ; 
        info.list==0 && info.len > MIN_XID_LIST_SIZE; info.len/=2)
   {

=== modified file 'storage/pbxt/ChangeLog'
--- a/storage/pbxt/ChangeLog	2009-09-03 06:15:03 +0000
+++ b/storage/pbxt/ChangeLog	2009-12-01 09:50:46 +0000
@@ -1,6 +1,58 @@
 PBXT Release Notes
 ==================
 
+------- 1.0.09f RC3 - 2009-11-30
+
+RN291: Fixed bug #489088: On shutdown MySQL reports: [Warning] Plugin 'PBXT' will be forced to shutdown.
+
+RN290: Fixed bug #345524: pbxt does not compile on 64 bit windows. Currently atomic operations are not supported on this platform.
+
+RN286: Fixed a bug introduced in RN281, which could cause an index scan to hang. The original change was to prevent a warning in Valgrind.
+
+RN285: Merged changes required to compile with Drizzle.
+
+RN284: Fixed a bug that caused the error "[ERROR] Invalid (old?) table or database name 'mysqld.1'" when running temp_table.test under MariaDB (thanks to Monty for his initial bug fix). Added a fix for partition table names as well.
+
+RN283: Added win_inttypes.h to the distribution. This file is only required for the Windows build.
+
+RN282: Fixed bug #451101: jump or move depends on uninitialised value in myxt_get_key_length
+
+RN281: Fixed bug #451080: Uninitialised memory write in XTDatabaseLog::xlog_append
+
+RN280: Fixed bug #451085: jump or move depends on uninitialised value in my_type_to_string
+
+RN279: Fixed bug #441000: xtstat crashes with segmentation fault on startup if max_pbxt_threads exceeded.
+
+------- 1.0.09e RC3 - 2009-11-20
+
+RN278: Fixed compile error with MySQL 5.1.41.
+
+------- 1.0.09d RC3 - 2009-09-30
+
+RN277: Added r/o flag to pbxt_max_threads server variable (this fix is related to bug #430637)
+
+RN276: Added test case for replication on tables w/o PKs (see bug #430716)
+
+RN275: Fixed bug #430600: 'Failed to read auto-increment value from storage engine' error.
+
+RN274: Fixed bug #431240: xtstat fails if no PBXT table has been created. xtstat now accepts --database=information_schema or --database=pbxt. Depending on this setting, PBXT will use either the information_schema.pbxt_statistics table or the pbxt.statistics table. If information_schema is used, the statistics are displayed even when no PBXT table exists. Recovery activity is also displayed, unless pbxt_support_xa=1, in which case MySQL will wait for PBXT recovery to complete before allowing connections.
+
+RN273: Fixed bug #430633: XA_RBDEADLOCK is not returned on XA END after the transaction ended with a deadlock.
+
+RN272: Fixed bug #430596: Backup/restore does not work well even on a basic PBXT table with auto-increment.
+
+------- 1.0.09c RC3 - 2009-09-16
+
+RN271: Windows build update: now you can simply put the pbxt directory under <mysql-root>/storage and build the PBXT engine as a part of the source tree. The engine will be linked statically. Be sure to specify the WITH_PBXT_STORAGE_ENGINE option when running win\configure.js
+
+RN270: Correctly disabled PBMS so that this version now compiles under Windows. If PBMS_ENABLED is defined, PBXT will not compile under Windows because of a getpid() call in pbms.h.
+
+------- 1.0.09 RC3 - 2009-09-09
+
+RN269: Implemented online backup. A native online backup driver now performs BACKUP and RESTORE DATABASE operations for PBXT. NOTE: This feature is only supported by MySQL 6.0.9 or later.
+
+RN268: Implemented XA support. PBXT now supports all XA related MySQL statements. The variable pbxt_support_xa determines if XA support is enabled. Note: due to MySQL bug #47134, enabling XA support could lead to a crash. 
+
 ------- 1.0.08d RC2 - 2009-09-02
 
 RN267: Fixed a bug that caused MySQL to crash on shutdown, after an incorrect command line parameter was given. The crash occurred because the background recovery task was not cleaned up before the PBXT engine was de-initialized.

=== modified file 'storage/pbxt/src/Makefile.am'
--- a/storage/pbxt/src/Makefile.am	2009-10-07 07:40:56 +0000
+++ b/storage/pbxt/src/Makefile.am	2009-11-24 10:55:06 +0000
@@ -22,7 +22,7 @@ noinst_HEADERS =		bsearch_xt.h cache_xt.
 						pbms_enabled.h sortedlist_xt.h strutil_xt.h \
 						tabcache_xt.h table_xt.h trace_xt.h thread_xt.h \
 						util_xt.h xaction_xt.h xactlog_xt.h lock_xt.h \
-						systab_xt.h ha_xtsys.h discover_xt.h \
+						systab_xt.h ha_xtsys.h discover_xt.h backup_xt.h \
 						pbms.h xt_config.h xt_defs.h xt_errno.h locklist_xt.h
 EXTRA_LTLIBRARIES =	libpbxt.la
 
@@ -32,7 +32,7 @@ libpbxt_la_SOURCES =	bsearch_xt.cc cache
 						memory_xt.cc myxt_xt.cc pthread_xt.cc restart_xt.cc \
 						sortedlist_xt.cc strutil_xt.cc \
 						tabcache_xt.cc table_xt.cc trace_xt.cc thread_xt.cc \
-						systab_xt.cc ha_xtsys.cc discover_xt.cc \
+						systab_xt.cc ha_xtsys.cc discover_xt.cc backup_xt.cc \
 						util_xt.cc xaction_xt.cc xactlog_xt.cc lock_xt.cc locklist_xt.cc
 
 libpbxt_la_LDFLAGS =	-module

=== added file 'storage/pbxt/src/backup_xt.cc'
--- a/storage/pbxt/src/backup_xt.cc	1970-01-01 00:00:00 +0000
+++ b/storage/pbxt/src/backup_xt.cc	2009-11-24 10:55:06 +0000
@@ -0,0 +1,802 @@
+/* Copyright (c) 2009 PrimeBase Technologies GmbH
+ *
+ * PrimeBase XT
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ *
+ * 2009-09-07	Paul McCullagh
+ *
+ * H&G2JCtL
+ */
+
+#include "xt_config.h"
+
+#ifdef MYSQL_SUPPORTS_BACKUP
+
+#include <string.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <time.h>
+#include <ctype.h>
+
+#include "mysql_priv.h"
+#include <backup/api_types.h>
+#include <backup/backup_engine.h>
+#include <backup/backup_aux.h>         // for build_table_list()
+#include <hash.h>
+
+#include "ha_pbxt.h"
+
+#include "backup_xt.h"
+#include "pthread_xt.h"
+#include "filesys_xt.h"
+#include "database_xt.h"
+#include "strutil_xt.h"
+#include "memory_xt.h"
+#include "trace_xt.h"
+#include "myxt_xt.h"
+
+#ifdef OK
+#undef OK
+#endif
+
+#ifdef byte
+#undef byte
+#endif
+
+#ifdef DEBUG
+//#define TRACE_BACKUP_CALLS
+//#define TEST_SMALL_BLOCK			100000
+#endif
+
+using backup::byte;
+using backup::result_t;
+using backup::version_t;
+using backup::Table_list;
+using backup::Table_ref;
+using backup::Buffer;
+
+#ifdef TRACE_BACKUP_CALLS
+#define XT_TRACE_CALL()				ha_trace_function(__FUNC__, NULL)
+#else
+#define XT_TRACE_CALL()
+#endif
+
+#define XT_RESTORE_BATCH_SIZE		10000
+
+#define BUP_STATE_BEFORE_LOCK		0
+#define BUP_STATE_AFTER_LOCK		1
+
+#define BUP_STANDARD_VAR_RECORD		1
+#define BUP_RECORD_BLOCK_4_START	2			// Part of a record, with a 4 byte total length, and 4 byte data length
+#define BUP_RECORD_BLOCK_4			3			// Part of a record, with a 4 byte length
+#define BUP_RECORD_BLOCK_4_END		4			// Last part of a record with a 4 byte length
+
+/*
+ * -----------------------------------------------------------------------
+ * UTILITIES
+ */
+
+#ifdef TRACE_BACKUP_CALLS
+static void ha_trace_function(const char *function, char *table)	// Debug tracer: prints "<thread> <func>" or "<thread> <func> (<table>)" to stdout.
+{
+	char		func_buf[50], *ptr;
+	XTThreadPtr	thread = xt_get_self(); 
+	
+	if ((ptr = strchr(function, '('))) {	// Keep only the identifier immediately before the argument list.
+		ptr--;
+		while (ptr > function) {
+			if (!(isalnum(*ptr) || *ptr == '_'))	// NOTE(review): isalnum() on a plain char is UB for negative values; cast to unsigned char.
+				break;
+			ptr--;
+		}
+		ptr++;	// NOTE(review): if the loop ended because ptr reached function (no break), this skips the first character.
+		xt_strcpy(50, func_buf, ptr);
+		if ((ptr = strchr(func_buf, '(')))
+			*ptr = 0;
+	}
+	else
+		xt_strcpy(50, func_buf, function);
+	if (table)
+		printf("%s %s (%s)\n", thread ? thread->t_name : "-unknown-", func_buf, table);
+	else
+		printf("%s %s\n", thread ? thread->t_name : "-unknown-", func_buf);
+}
+#endif
+
+/*
+ * -----------------------------------------------------------------------
+ * BACKUP DRIVER
+ */
+
+class PBXTBackupDriver: public Backup_driver	// Streams the rows of each table in m_tables to the backup kernel through get_data().
+{
+	public:
+	PBXTBackupDriver(const Table_list &);
+	virtual ~PBXTBackupDriver();
+
+	virtual size_t		size();
+	virtual size_t		init_size();
+	virtual result_t	begin(const size_t);
+	virtual result_t	end();
+	virtual result_t	get_data(Buffer &);
+	virtual result_t	prelock();
+	virtual result_t	lock();
+	virtual result_t	unlock();
+	virtual result_t	cancel();
+	virtual void		free();
+	void				lock_tables_TL_READ_NO_INSERT();
+
+	private:
+	XTThreadPtr		bd_thread;		// Set by begin(); NOTE(review): not initialised by the constructor.
+	int				bd_state;		// BUP_STATE_BEFORE_LOCK until lock() succeeds.
+	u_int			bd_table_no;	// Count of m_tables entries opened so far; also the table_num reported by get_data().
+	XTOpenTablePtr	bd_ot;			// Table currently being scanned, or NULL between tables.
+	xtWord1			*bd_row_buf;	// Row buffer filled by xt_tab_seq_next().
+
+	/* Copy a whole record, or a fragment of one, from
+	 * bd_ot->ot_row_wbuffer into the output buffer.
+	 */
+	xtWord1			*db_write_block(xtWord1 *buffer, xtWord1 bup_type, size_t *size, xtWord4 row_len);
+	xtWord1			*db_write_block(xtWord1 *buffer, xtWord1 bup_type, size_t *size, xtWord4 total_len, xtWord4 row_len);
+
+	xtWord4			bd_row_offset;	// Bytes of a partially returned row that have already been written.
+	xtWord4			bd_row_size;	// Bytes of that row still to write; non-zero if we last returned only part of a row.
+};
+
+
+PBXTBackupDriver::PBXTBackupDriver(const Table_list &tables):
+Backup_driver(tables),
+bd_state(BUP_STATE_BEFORE_LOCK),
+bd_table_no(0),
+bd_ot(NULL),
+bd_row_buf(NULL),
+bd_row_offset(0),
+bd_row_size(0)
+{	// NOTE(review): bd_thread is not initialised; end() and free() dereference it, so they rely on begin() having succeeded.
+}
+
+PBXTBackupDriver::~PBXTBackupDriver()	// Resources are released in free(), which deletes the object.
+{
+}
+
+/** Estimates total size of backup. @todo improve it */
+size_t PBXTBackupDriver::size()
+{
+	XT_TRACE_CALL();
+	return UNKNOWN_SIZE;
+}
+
+/** Estimates size of backup before lock. @todo improve it */
+size_t PBXTBackupDriver::init_size()
+{
+	XT_TRACE_CALL();
+	return 0;
+}
+
+result_t PBXTBackupDriver::begin(const size_t)	// Bind a PBXT thread to the current THD; the size argument is ignored.
+{
+	THD				*thd = current_thd;
+	XTExceptionRec	e;
+
+	XT_TRACE_CALL();
+	
+	if (!(bd_thread = xt_ha_set_current_thread(thd, &e))) {
+		xt_log_exception(NULL, &e, XT_LOG_DEFAULT);
+		return backup::ERROR;
+	}
+	
+	return backup::OK;
+}
+
+result_t PBXTBackupDriver::end()	// Close the table being scanned (if any) and commit the transaction started by lock().
+{
+	XT_TRACE_CALL();
+	if (bd_ot) {
+		xt_tab_seq_exit(bd_ot);
+		xt_db_return_table_to_pool_ns(bd_ot);
+		bd_ot = NULL;
+	}
+	if (bd_thread->st_xact_data) {	// NOTE(review): assumes begin() succeeded; bd_thread is NULL (or never set) otherwise.
+		if (!xt_xn_commit(bd_thread))
+			return backup::ERROR;
+	}
+	return backup::OK;
+}
+
+xtWord1 *PBXTBackupDriver::db_write_block(xtWord1 *buffer, xtWord1 bup_type, size_t *ret_size, xtWord4 row_len)	// Write [type][row_len bytes of ot_row_wbuffer]; no bounds check, callers check the remaining size first.
+{
+	register size_t size = *ret_size;
+
+	*buffer = bup_type;	// Record type identifier.
+	buffer++;
+	size--;
+	memcpy(buffer, bd_ot->ot_row_wbuffer, row_len);
+	buffer += row_len;
+	size -= row_len;
+	*ret_size = size;
+	return buffer;
+}
+
+xtWord1 *PBXTBackupDriver::db_write_block(xtWord1 *buffer, xtWord1 bup_type, size_t *ret_size, xtWord4 total_len, xtWord4 row_len)	// Write a fragment: [type][total_len, START only][row_len][row_len bytes from bd_row_offset].
+{
+	register size_t size = *ret_size;
+
+	*buffer = bup_type;	// Record type identifier.
+	buffer++;
+	size--;
+	if (bup_type == BUP_RECORD_BLOCK_4_START) {
+		XT_SET_DISK_4(buffer, total_len);
+		buffer += 4;
+		size -= 4;
+	}
+	XT_SET_DISK_4(buffer, row_len);
+	buffer += 4;
+	size -= 4;
+	memcpy(buffer, bd_ot->ot_row_wbuffer+bd_row_offset, row_len);
+	buffer += row_len;
+	size -= row_len;
+	bd_row_size -= row_len;	// Advance the pending-row cursor past the bytes just written.
+	bd_row_offset += row_len;
+	*ret_size = size;
+	return buffer;
+}
+
+result_t PBXTBackupDriver::get_data(Buffer &buf)	// Fill buf with records of the current table; returns DONE once every table in m_tables has been sent.
+{
+	xtBool	eof = FALSE;
+	size_t	size;
+	xtWord4	row_len;
+	xtWord1	*buffer;
+
+	XT_TRACE_CALL();
+
+	if (bd_state == BUP_STATE_BEFORE_LOCK) {	// No rows are sent until lock() has started the transaction.
+		buf.table_num = 0;
+		buf.size = 0;
+		buf.last = FALSE;
+		return backup::READY;
+	}
+
+	/* Open the backup table: */
+	if (!bd_ot) {
+		XTThreadPtr		self = bd_thread;
+		XTTableHPtr		tab;
+		char			path[PATH_MAX];
+	
+		if (bd_table_no == m_tables.count()) {
+			buf.size = 0;
+			buf.table_num = 0;
+			buf.last = TRUE;
+			return backup::DONE;
+		}
+		
+		m_tables[bd_table_no].internal_name(path, sizeof(path));
+		bd_table_no++;
+		try_(a)	{
+			xt_ha_open_database_of_table(self, (XTPathStrPtr) path);
+			tab = xt_use_table(self, (XTPathStrPtr) path, FALSE, FALSE, NULL);
+			pushr_(xt_heap_release, tab);
+			if (!(bd_ot = xt_db_open_table_using_tab(tab, bd_thread)))
+				xt_throw(self);
+			freer_(); // xt_heap_release(tab)
+
+			/* Prepare the sequential scan: */
+			xt_tab_seq_exit(bd_ot);
+			if (!xt_tab_seq_init(bd_ot))
+				xt_throw(self);
+			
+			if (bd_row_buf) {
+				xt_free(self, bd_row_buf);
+				bd_row_buf = NULL;
+			}
+			bd_row_buf = (xtWord1 *) xt_malloc(self, bd_ot->ot_table->tab_dic.dic_mysql_buf_size);
+			bd_ot->ot_cols_req = bd_ot->ot_table->tab_dic.dic_no_of_cols;
+		}
+		catch_(a) {
+			;	// NOTE(review): a failure after bd_ot is set (seq_init, malloc) is swallowed here; the !bd_ot check below does not detect it.
+		}
+		cont_(a);
+
+		if (!bd_ot)
+			goto failed;
+	}
+
+	buf.table_num = bd_table_no;
+#ifdef TEST_SMALL_BLOCK
+	buf.size = TEST_SMALL_BLOCK;
+#endif
+	size = buf.size;
+	buffer = (xtWord1 *) buf.data;
+	ASSERT_NS(size > 9);
+
+	/* First check if a record was partially written
+	 * last time.
+	 */
+	write_row:
+	if (bd_row_size > 0) {
+		row_len = bd_row_size;
+		if (bd_row_offset == 0) {
+			if (row_len+1 > size) {	// Row does not fit: send a BLOCK_4_START fragment (1+4+4 byte header).
+				ASSERT_NS(size > 9);
+				row_len = size - 9;
+				buffer = db_write_block(buffer, BUP_RECORD_BLOCK_4_START, &size, bd_row_size, row_len);
+				goto done;
+			}
+			buffer = db_write_block(buffer, BUP_STANDARD_VAR_RECORD, &size, row_len);
+			bd_row_size = 0;
+		}
+		else {
+			if (row_len+5 > size) {	// Continuation with a 1+4 byte header; still too big, so send another BLOCK_4 fragment.
+				row_len = size - 5;
+				buffer = db_write_block(buffer, BUP_RECORD_BLOCK_4, &size, 0, row_len);
+				goto done;
+			}
+			buffer = db_write_block(buffer, BUP_RECORD_BLOCK_4_END, &size, 0, row_len);
+		}
+	}
+
+	/* Now continue with the sequential scan. */
+	while (size > 1) {
+		if (!xt_tab_seq_next(bd_ot, bd_row_buf, &eof))
+			goto failed;
+		if (eof) {
+			/* We will go the next table, on the next call. */
+			xt_tab_seq_exit(bd_ot);
+			xt_db_return_table_to_pool_ns(bd_ot);
+			bd_ot = NULL;
+			break;
+		}
+		if (!(row_len = myxt_store_row_data(bd_ot, 0, (char *) bd_row_buf)))	// Presumably packs the row into bd_ot->ot_row_wbuffer, which db_write_block() copies from.
+			goto failed;
+		if (row_len+1 > size) {
+			/* Does not fit: */
+			bd_row_offset = 0;
+			bd_row_size = row_len;
+			/* Only add part of the row, if there is still
+			 * quite a bit of space left:
+			 */
+			if (size >= (32 * 1024))
+				goto write_row;
+			break;
+		}
+		buffer = db_write_block(buffer, BUP_STANDARD_VAR_RECORD, &size, row_len);
+	}
+
+	done:
+	buf.size = buf.size - size;
+	/* This indicates end of data for a table! */
+    buf.last = eof;
+
+	return backup::OK;
+
+	failed:
+	xt_log_and_clear_exception(bd_thread);
+	return backup::ERROR;
+}
+
+result_t PBXTBackupDriver::prelock()
+{
+	XT_TRACE_CALL();
+	return backup::READY;
+}
+
+result_t PBXTBackupDriver::lock()	// Begin a committed-read transaction; get_data() only returns rows after this succeeds.
+{
+	XT_TRACE_CALL();
+	bd_thread->st_xact_mode = XT_XACT_COMMITTED_READ;
+	bd_thread->st_ignore_fkeys = FALSE;
+	bd_thread->st_auto_commit = FALSE;
+	bd_thread->st_table_trans = FALSE;
+	bd_thread->st_abort_trans = FALSE;
+	bd_thread->st_stat_ended = FALSE;
+	bd_thread->st_stat_trans = FALSE;
+	bd_thread->st_is_update = FALSE;
+	if (!xt_xn_begin(bd_thread))
+		return backup::ERROR;
+	bd_state = BUP_STATE_AFTER_LOCK;
+	return backup::OK;
+}
+
+result_t PBXTBackupDriver::unlock()
+{
+	XT_TRACE_CALL();
+	return backup::OK;
+}
+
+result_t PBXTBackupDriver::cancel()
+{
+	XT_TRACE_CALL();
+	return backup::OK; // free() will be called and suffice
+}
+
+void PBXTBackupDriver::free()	// Release the open table, the row buffer and any open transaction, then delete this object.
+{
+	XT_TRACE_CALL();
+	if (bd_ot) {
+		xt_tab_seq_exit(bd_ot);
+		xt_db_return_table_to_pool_ns(bd_ot);
+		bd_ot = NULL;
+	}
+	if (bd_row_buf) {
+		xt_free_ns(bd_row_buf);
+		bd_row_buf = NULL;
+	}
+	if (bd_thread->st_xact_data)	// NOTE(review): dereferences bd_thread even if begin() failed (NULL) or was never called (unset).
+		xt_xn_rollback(bd_thread);
+	delete this;
+}
+
+void PBXTBackupDriver::lock_tables_TL_READ_NO_INSERT()	// Does nothing apart from tracing.
+{
+	XT_TRACE_CALL();
+}
+
+/*
+ * -----------------------------------------------------------------------
+ * RESTORE DRIVER
+ */
+
+class PBXTRestoreDriver: public Restore_driver	// Decodes backup buffers produced by PBXTBackupDriver and inserts the rows.
+{
+	public:
+	PBXTRestoreDriver(const Table_list &tables);
+	virtual ~PBXTRestoreDriver();
+
+	virtual result_t  begin(const size_t);
+	virtual result_t  end();
+	virtual result_t  send_data(Buffer &buf);
+	virtual result_t  cancel();
+	virtual void      free();
+	
+	private:
+	XTThreadPtr		rd_thread;		// Set by begin(); NULL if begin() failed or was not called.
+	u_int			rd_table_no;	// 1-based table_num of the table currently open (0 = none yet).
+	XTOpenTablePtr	rd_ot;
+	STRUCT_TABLE	*rd_my_table;	// MySQL TABLE of rd_ot; NOTE(review): not initialised by the constructor.
+	xtWord1			*rb_row_buf;	// Aliases rd_my_table->record[0]; not owned.
+	u_int			rb_col_cnt;
+	u_int			rb_insert_count;	// Rows inserted since the last commit; NOTE(review): not initialised by the constructor.
+
+	/* Long rows are accumulated here: */
+	xtWord4			rb_row_len;
+	xtWord4			rb_data_size;
+	xtWord1			*rb_row_data;
+};
+
+PBXTRestoreDriver::PBXTRestoreDriver(const Table_list &tables):
+Restore_driver(tables),
+rd_thread(NULL),
+rd_table_no(0),
+rd_ot(NULL),
+rb_row_buf(NULL),
+rb_row_len(0),
+rb_data_size(0),
+rb_row_data(NULL)
+{	// NOTE(review): rd_my_table, rb_col_cnt and rb_insert_count are only assigned when send_data() opens a table.
+}
+
+PBXTRestoreDriver::~PBXTRestoreDriver()	// Resources are released in free(), which deletes the object.
+{
+}
+
+result_t PBXTRestoreDriver::begin(const size_t)	// Bind a PBXT thread to the current THD; the size argument is ignored.
+{
+	THD				*thd = current_thd;
+	XTExceptionRec	e;
+	
+	XT_TRACE_CALL();
+	
+	if (!(rd_thread = xt_ha_set_current_thread(thd, &e))) {
+		xt_log_exception(NULL, &e, XT_LOG_DEFAULT);
+		return backup::ERROR;
+	}
+	
+	return backup::OK;
+}
+
+result_t PBXTRestoreDriver::end()	// Return the open table, free the long-row buffer and commit the final batch.
+{
+	XT_TRACE_CALL();
+	if (rd_ot) {
+		xt_db_return_table_to_pool_ns(rd_ot);
+		rd_ot = NULL;
+	}
+	//if (rb_row_buf) {
+	//	xt_free_ns(rb_row_buf);
+	//	rb_row_buf = NULL;
+	//}
+	if (rb_row_data) {
+		xt_free_ns(rb_row_data);
+		rb_row_data = NULL;
+	}
+	if (rd_thread->st_xact_data) {	// NOTE(review): rd_thread is NULL if begin() failed or was not called.
+		if (!xt_xn_commit(rd_thread))
+			return backup::ERROR;
+	}
+	return backup::OK;
+}
+
+
+result_t PBXTRestoreDriver::send_data(Buffer &buf)	// Decode the records in buf and insert them into table number buf.table_num.
+{
+	size_t	size;
+	xtWord1	type;
+	xtWord1	*buffer;
+	xtWord4	row_len;
+	xtWord1 *rec_data;
+
+	XT_TRACE_CALL();
+
+	if (buf.table_num != rd_table_no) {	// Switching tables: commit the previous table's batch, start a new transaction, open the new table.
+		XTThreadPtr		self = rd_thread;
+		XTTableHPtr		tab;
+		char			path[PATH_MAX];
+		
+		if (rd_ot) {
+			xt_db_return_table_to_pool_ns(rd_ot);
+			rd_ot = NULL;
+		}
+
+		if (rd_thread->st_xact_data) {
+			if (!xt_xn_commit(rd_thread))
+				goto failed;
+		}
+		if (!xt_xn_begin(rd_thread))
+			goto failed;
+		rb_insert_count = 0;
+		
+		rd_table_no = buf.table_num;
+		m_tables[rd_table_no-1].internal_name(path, sizeof(path));
+		try_(a)	{
+			xt_ha_open_database_of_table(self, (XTPathStrPtr) path);
+			tab = xt_use_table(self, (XTPathStrPtr) path, FALSE, FALSE, NULL);
+			pushr_(xt_heap_release, tab);
+			if (!(rd_ot = xt_db_open_table_using_tab(tab, rd_thread)))
+				xt_throw(self);
+			freer_(); // xt_heap_release(tab)
+
+			rd_my_table = rd_ot->ot_table->tab_dic.dic_my_table;
+			if (rd_my_table->found_next_number_field) {
+				rd_my_table->in_use = current_thd;
+				rd_my_table->next_number_field = rd_my_table->found_next_number_field;
+				rd_my_table->mark_columns_used_by_index_no_reset(rd_my_table->s->next_number_index, rd_my_table->read_set);
+			}
+
+			/* This is safe because only one thread can restore a table at 
+			 * a time!
+			 */
+			rb_row_buf = (xtWord1 *) rd_my_table->record[0];
+			//if (rb_row_buf) {
+			//	xt_free(self, rb_row_buf);
+			//	rb_row_buf = NULL;
+			//}
+			//rb_row_buf = (xtWord1 *) xt_malloc(self, rd_ot->ot_table->tab_dic.dic_mysql_buf_size);
+	
+			rb_col_cnt = rd_ot->ot_table->tab_dic.dic_no_of_cols;
+
+		}
+		catch_(a) {
+			;	// NOTE(review): an exception raised after rd_ot is set would be swallowed; the !rd_ot check below only catches open failures.
+		}
+		cont_(a);
+		
+		if (!rd_ot)
+			goto failed;
+	}
+
+	buffer = (xtWord1 *) buf.data;
+	size = buf.size;
+
+	while (size > 0) {
+		type = *buffer;	// NOTE(review): fragment headers (5 or 9 bytes) are read below before size is checked against them.
+		switch (type) {
+			case BUP_STANDARD_VAR_RECORD:
+				rec_data = buffer + 1;
+				break;
+			case BUP_RECORD_BLOCK_4_START:
+				buffer++;
+				row_len = XT_GET_DISK_4(buffer);
+				buffer += 4;
+				if (rb_data_size < row_len) {
+					if (!xt_realloc_ns((void **) &rb_row_data, row_len))
+						goto failed;
+					rb_data_size = row_len;
+				}
+				row_len = XT_GET_DISK_4(buffer);
+				buffer += 4;
+				ASSERT_NS(row_len <= rb_data_size);
+				if (row_len > rb_data_size) {
+					xt_register_xterr(XT_REG_CONTEXT, XT_ERR_BAD_BACKUP_FORMAT);
+					goto failed;
+				}
+				memcpy(rb_row_data, buffer, row_len);	// NOTE(review): copies before the size check below, so a truncated buffer is over-read before being rejected.
+				rb_row_len = row_len;
+				buffer += row_len;
+				if (row_len + 9 > size) {
+					xt_register_xterr(XT_REG_CONTEXT, XT_ERR_BAD_BACKUP_FORMAT);
+					goto failed;
+				}
+				size -= row_len + 9;
+				continue;
+			case BUP_RECORD_BLOCK_4:
+				buffer++;
+				row_len = XT_GET_DISK_4(buffer);
+				buffer += 4;
+				ASSERT_NS(rb_row_len + row_len <= rb_data_size);
+				if (rb_row_len + row_len > rb_data_size) {
+					xt_register_xterr(XT_REG_CONTEXT, XT_ERR_BAD_BACKUP_FORMAT);
+					goto failed;
+				}
+				memcpy(rb_row_data + rb_row_len, buffer, row_len);	// NOTE(review): copy precedes the size check below.
+				rb_row_len += row_len;
+				buffer += row_len;
+				if (row_len + 5 > size) {
+					xt_register_xterr(XT_REG_CONTEXT, XT_ERR_BAD_BACKUP_FORMAT);
+					goto failed;
+				}
+				size -= row_len + 5;
+				continue;
+			case BUP_RECORD_BLOCK_4_END:
+				buffer++;
+				row_len = XT_GET_DISK_4(buffer);
+				buffer += 4;
+				ASSERT_NS(rb_row_len + row_len <= rb_data_size);
+				if (rb_row_len + row_len > rb_data_size) {
+					xt_register_xterr(XT_REG_CONTEXT, XT_ERR_BAD_BACKUP_FORMAT);
+					goto failed;
+				}
+				memcpy(rb_row_data + rb_row_len, buffer, row_len);	// NOTE(review): copy precedes the size check below.
+				buffer += row_len;
+				if (row_len + 5 > size) {
+					xt_register_xterr(XT_REG_CONTEXT, XT_ERR_BAD_BACKUP_FORMAT);
+					goto failed;
+				}
+				size -= row_len + 5;
+				rec_data = rb_row_data;
+				break;
+			default:
+				xt_register_xterr(XT_REG_CONTEXT, XT_ERR_BAD_BACKUP_FORMAT);
+				goto failed;
+		}
+		
+		if (!(row_len = myxt_load_row_data(rd_ot, rec_data, rb_row_buf, rb_col_cnt)))	// For BUP_STANDARD_VAR_RECORD the returned length is used below to step past the record.
+			goto failed;
+
+		if (rd_ot->ot_table->tab_dic.dic_my_table->found_next_number_field)
+			ha_set_auto_increment(rd_ot, rd_ot->ot_table->tab_dic.dic_my_table->found_next_number_field);
+
+		if (!xt_tab_new_record(rd_ot, rb_row_buf))
+			goto failed;
+
+		if (type == BUP_STANDARD_VAR_RECORD) {
+			buffer += row_len+1;	// NOTE(review): the record was already decoded and inserted before its length is checked against size.
+			if (row_len + 1 > size) {
+				xt_register_xterr(XT_REG_CONTEXT, XT_ERR_BAD_BACKUP_FORMAT);
+				goto failed;
+			}
+			size -= row_len + 1;
+		}
+
+		rb_insert_count++;
+		if (rb_insert_count == XT_RESTORE_BATCH_SIZE) {	// Commit and restart the transaction every XT_RESTORE_BATCH_SIZE rows.
+			if (!xt_xn_commit(rd_thread))
+				goto failed;
+			if (!xt_xn_begin(rd_thread))
+				goto failed;
+			rb_insert_count = 0;
+		}
+	}
+
+	return backup::OK;
+	
+	failed:
+	xt_log_and_clear_exception(rd_thread);
+	return backup::ERROR;
+}
+
+
+result_t PBXTRestoreDriver::cancel()
+{
+	XT_TRACE_CALL();
+	/* Nothing to do in cancel(); free() will suffice */
+	return backup::OK;
+}
+
+void PBXTRestoreDriver::free()	// Return the open table, free the long-row buffer, roll back any open batch, then delete this object.
+{
+	XT_TRACE_CALL();
+	if (rd_ot) {
+		xt_db_return_table_to_pool_ns(rd_ot);
+		rd_ot = NULL;
+	}
+	//if (rb_row_buf) {
+	//	xt_free_ns(rb_row_buf);
+	//	rb_row_buf = NULL;
+	//}
+	if (rb_row_data) {
+		xt_free_ns(rb_row_data);
+		rb_row_data = NULL;
+	}
+	if (rd_thread->st_xact_data)	// NOTE(review): rd_thread is NULL if begin() failed or was not called.
+		xt_xn_rollback(rd_thread);
+	delete this;
+}
+
+/*
+ * -----------------------------------------------------------------------
+ * BACKUP ENGINE FACTORY
+ */
+
+#define PBXT_BACKUP_VERSION 1	/* Version of the backup stream format; get_restore() rejects newer images. */
+
+
+class PBXTBackupEngine: public Backup_engine	// Creates PBXT backup and restore drivers.
+{
+	public:
+	PBXTBackupEngine() { };
+
+	virtual version_t version() const {
+		return PBXT_BACKUP_VERSION;
+	};
+
+	virtual result_t get_backup(const uint32, const Table_list &, Backup_driver* &);
+
+	virtual result_t get_restore(const version_t, const uint32, const Table_list &,Restore_driver* &);
+
+	virtual void free()
+	{
+		delete this;
+	}
+};
+
+result_t PBXTBackupEngine::get_backup(const u_int count, const Table_list &tables, Backup_driver* &drv)	// NOTE(review): declared with uint32; this only matches if u_int and uint32 are the same type. count is unused.
+{
+	PBXTBackupDriver *ptr = new PBXTBackupDriver(tables);
+
+	if (!ptr)	// NOTE(review): with a throwing operator new this check never fires.
+		return backup::ERROR;
+	drv = ptr;
+	return backup::OK;
+}
+
+result_t PBXTBackupEngine::get_restore(const version_t ver, const uint32,
+                             const Table_list &tables, Restore_driver* &drv)
+{
+	if (ver > PBXT_BACKUP_VERSION)
+	{
+		return backup::ERROR;    
+	}
+	
+	PBXTRestoreDriver *ptr = new PBXTRestoreDriver(tables);
+
+	if (!ptr)
+		return backup::ERROR;
+	drv = (Restore_driver *) ptr;
+	return backup::OK;
+}
+
+
+Backup_result_t pbxt_backup_engine(handlerton *self, Backup_engine* &be)	// Entry point: hands a new PBXTBackupEngine to the caller through be.
+{
+	be = new PBXTBackupEngine();
+	
+	if (!be)
+		return backup::ERROR;
+	
+	return backup::OK;
+}
+
+#endif

=== added file 'storage/pbxt/src/backup_xt.h'
--- a/storage/pbxt/src/backup_xt.h	1970-01-01 00:00:00 +0000
+++ b/storage/pbxt/src/backup_xt.h	2009-11-24 10:55:06 +0000
@@ -0,0 +1,34 @@
+/* Copyright (c) 2009 PrimeBase Technologies GmbH
+ *
+ * PrimeBase XT
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ *
+ * 2009-09-07	Paul McCullagh
+ *
+ * H&G2JCtL
+ */
+
+#ifndef __backup_xt_h__
+#define __backup_xt_h__
+
+#include "xt_defs.h"
+
+#ifdef MYSQL_SUPPORTS_BACKUP
+
+Backup_result_t pbxt_backup_engine(handlerton *self, Backup_engine* &be);
+
+#endif
+#endif

=== modified file 'storage/pbxt/src/cache_xt.cc'
--- a/storage/pbxt/src/cache_xt.cc	2009-10-30 18:50:56 +0000
+++ b/storage/pbxt/src/cache_xt.cc	2009-11-24 10:55:06 +0000
@@ -73,7 +73,7 @@
 #define IDX_CAC_UNLOCK(i, o)			xt_xsmutex_unlock(&(i)->cs_lock, (o)->t_id)
 #elif defined(IDX_CAC_USE_PTHREAD_RW)
 #define IDX_CAC_LOCK_TYPE				xt_rwlock_type
-#define IDX_CAC_INIT_LOCK(s, i)			xt_init_rwlock(s, &(i)->cs_lock)
+#define IDX_CAC_INIT_LOCK(s, i)			xt_init_rwlock_with_autoname(s, &(i)->cs_lock)
 #define IDX_CAC_FREE_LOCK(s, i)			xt_free_rwlock(&(i)->cs_lock)	
 #define IDX_CAC_READ_LOCK(i, o)			xt_slock_rwlock_ns(&(i)->cs_lock)
 #define IDX_CAC_WRITE_LOCK(i, o)		xt_xlock_rwlock_ns(&(i)->cs_lock)
@@ -94,8 +94,12 @@
 #define IDX_CAC_UNLOCK(i, s)			xt_spinxslock_unlock(&(i)->cs_lock, (s)->t_id)
 #endif
 
+#ifdef XT_NO_ATOMICS
+#define ID_HANDLE_USE_PTHREAD_RW
+#else
 #define ID_HANDLE_USE_SPINLOCK
 //#define ID_HANDLE_USE_PTHREAD_RW
+#endif
 
 #if defined(ID_HANDLE_USE_PTHREAD_RW)
 #define ID_HANDLE_LOCK_TYPE				xt_mutex_type
@@ -374,7 +378,7 @@ xtPublic void xt_ind_release_handle(XTIn
 {
 	DcHandleSlotPtr	hs;
 	XTIndBlockPtr	block = NULL;
-	u_int		hash_idx = 0;
+	u_int			hash_idx = 0;
 	DcSegmentPtr	seg = NULL;
 	XTIndBlockPtr	xblock;
 
@@ -1379,7 +1383,7 @@ xtPublic xtBool xt_ind_fetch(XTOpenTable
 	ASSERT_NS(iref->ir_xlock == 2);
 #endif
 	if (!(block = ind_cac_fetch(ot, ind, address, &seg, TRUE)))
-		return 0;
+		return FAILED;
 
 	branch_size = XT_GET_DISK_2(((XTIdxBranchDPtr) block->cb_data)->tb_size_2);
 	if (XT_GET_INDEX_BLOCK_LEN(branch_size) < 2 || XT_GET_INDEX_BLOCK_LEN(branch_size) > XT_INDEX_PAGE_SIZE) {

=== modified file 'storage/pbxt/src/cache_xt.h'
--- a/storage/pbxt/src/cache_xt.h	2009-08-17 11:12:36 +0000
+++ b/storage/pbxt/src/cache_xt.h	2009-11-24 10:55:06 +0000
@@ -62,7 +62,7 @@ struct XTIdxReadBuffer;
 #define XT_IPAGE_UNLOCK(i, x)			xt_atomicrwlock_unlock(i, x)
 #elif defined(XT_IPAGE_USE_PTHREAD_RW)
 #define XT_IPAGE_LOCK_TYPE				xt_rwlock_type
-#define XT_IPAGE_INIT_LOCK(s, i)		xt_init_rwlock(s, i)
+#define XT_IPAGE_INIT_LOCK(s, i)		xt_init_rwlock_with_autoname(s, i)
 #define XT_IPAGE_FREE_LOCK(s, i)		xt_free_rwlock(i)	
 #define XT_IPAGE_READ_LOCK(i)			xt_slock_rwlock_ns(i)
 #define XT_IPAGE_WRITE_LOCK(i, s)		xt_xlock_rwlock_ns(i)

=== modified file 'storage/pbxt/src/database_xt.cc'
--- a/storage/pbxt/src/database_xt.cc	2009-08-17 11:12:36 +0000
+++ b/storage/pbxt/src/database_xt.cc	2009-11-24 10:55:06 +0000
@@ -54,6 +54,8 @@
  * GLOBALS
  */
 
+xtPublic XTDatabaseHPtr		pbxt_database = NULL;		// The global open database
+
 xtPublic xtLogOffset		xt_db_log_file_threshold;
 xtPublic size_t				xt_db_log_buffer_size;
 xtPublic size_t				xt_db_transaction_buffer_size;
@@ -505,6 +507,15 @@ xtPublic XTDatabaseHPtr xt_get_database(
 	 * all index entries that are not visible have
 	 * been removed.
 	 *
+	 * REASON WHY WE SET ROWID ON RECOVERY:
+	 * The row ID is set on recovery because the
+	 * change to the index may be lost after a crash.
+	 * The change to the index is done by the sweeper, and
+	 * there is no record of this change in the log.
+	 * The sweeper will not "re-sweep" all transactions
+	 * that are recovered. As a result, this update
+	 * of the index by the sweeper may be lost.
+	 *
 	 * {OPEN-DB-SWEEPER-WAIT}
 	 * This has been moved to after the release of the open
 	 * database lock because:
@@ -518,9 +529,12 @@ xtPublic XTDatabaseHPtr xt_get_database(
 	 * - To open the database it needs the open database
 	 * lock.
 	 */
+	/*
+	 * This has been moved, see: {WAIT-FOR-SW-AFTER-RECOV}
 	pushr_(xt_heap_release, db);
 	xt_wait_for_sweeper(self, db, 0);
 	popr_();
+	*/
 
 	return db;
 }

=== modified file 'storage/pbxt/src/database_xt.h'
--- a/storage/pbxt/src/database_xt.h	2009-04-02 10:03:14 +0000
+++ b/storage/pbxt/src/database_xt.h	2009-11-24 10:55:06 +0000
@@ -105,6 +105,8 @@ typedef struct XTTablePath {
 #define XT_THREAD_IDLE		1
 #define XT_THREAD_INERR		2
 
+#define XT_XA_HASH_TAB_SIZE	223
+
 typedef struct XTDatabase : public XTHeap {
 	char					*db_name;								/* The name of the database, last component of the path! */
 	char					*db_main_path;
@@ -131,6 +133,9 @@ typedef struct XTDatabase : public XTHea
 	u_int					db_stat_sweep_waits;					/* STATISTICS: count the sweeper waits. */
 	XTDatabaseLogRec		db_xlog;								/* The transaction log for this database. */
 	XTXactRestartRec		db_restart;								/* Database recovery stuff. */
+	xt_mutex_type			db_xn_xa_lock;
+	XTXactPreparePtr		db_xn_xa_table[XT_XA_HASH_TAB_SIZE];
+	XTSortedListPtr			db_xn_xa_list;							/* Sorted list used for XA transaction tracking, alongside db_xn_xa_table. */
 
 	XTSortedListPtr			db_xn_wait_for;							/* The "wait-for" list, of transactions waiting for other transactions. */
 	u_int					db_xn_call_start;						/* Start of the post wait calls. */
@@ -198,6 +203,7 @@ void				xt_check_database(XTThreadPtr se
 
 void				xt_add_pbxt_file(size_t size, char *path, const char *file);
 void				xt_add_location_file(size_t size, char *path);
+void				xt_add_pbxt_dir(size_t size, char *path);
 void				xt_add_system_dir(size_t size, char *path);
 void				xt_add_data_dir(size_t size, char *path);
 
@@ -244,4 +250,6 @@ inline void xt_xlog_check_long_writer(XT
 	}
 }
 
+extern XTDatabaseHPtr	pbxt_database;				// The global open database
+
 #endif

=== modified file 'storage/pbxt/src/datadic_xt.cc'
--- a/storage/pbxt/src/datadic_xt.cc	2009-08-18 07:46:53 +0000
+++ b/storage/pbxt/src/datadic_xt.cc	2009-11-24 10:55:06 +0000
@@ -35,7 +35,7 @@
 
 #ifdef DEBUG
 #ifdef DRIZZLED
-#include <drizzled/common_includes.h>
+//#include <drizzled/common_includes.h>
 #else
 #include "mysql_priv.h"
 #endif
@@ -437,11 +437,6 @@ class XTTokenizer {
 	XTToken *nextToken(XTThreadPtr self, c_char *keyword, XTToken *tk);
 };
 
-void ri_free_token(XTThreadPtr XT_UNUSED(self), XTToken *tk)
-{
-	delete tk;
-}
-
 XTToken *XTTokenizer::newToken(XTThreadPtr self, u_int type, char *start, char *end)
 {
 	if (!tkn_current) {

=== modified file 'storage/pbxt/src/datalog_xt.cc'
--- a/storage/pbxt/src/datalog_xt.cc	2009-09-04 07:29:34 +0000
+++ b/storage/pbxt/src/datalog_xt.cc	2009-11-24 10:55:06 +0000
@@ -410,7 +410,7 @@ static void dl_recover_log(XTThreadPtr s
 	ASSERT_NS(seq_read.sl_log_eof == seq_read.sl_rec_log_offset);
 	data_log->dlf_log_eof = seq_read.sl_rec_log_offset;
 
-	if ((size_t) data_log->dlf_log_eof < sizeof(XTXactLogHeaderDRec)) {
+	if (data_log->dlf_log_eof < (off_t) sizeof(XTXactLogHeaderDRec)) {
 		data_log->dlf_log_eof = sizeof(XTXactLogHeaderDRec);
 		if (!dl_create_log_header(data_log, seq_read.sl_log_file, self))
 			xt_throw(self);
@@ -1162,7 +1162,7 @@ xtBool XTDataLogBuffer::dlb_close_log(XT
 /* When I use 'thread' instead of 'self', this means
  * that I will not throw an error.
  */
-xtBool XTDataLogBuffer::dlb_get_log_offset(xtLogID *log_id, xtLogOffset *out_offset, size_t req_size, struct XTThread *thread)
+xtBool XTDataLogBuffer::dlb_get_log_offset(xtLogID *log_id, xtLogOffset *out_offset, size_t XT_UNUSED(req_size), struct XTThread *thread)
 {
 	/* Note, I am allowing a log to grow beyond the threshold.
 	 * The amount depends on the maximum extended record size.
@@ -1757,7 +1757,7 @@ static xtBool dl_collect_garbage(XTThrea
 			freer_(); // xt_unlock_mutex(&db->db_co_dlog_lock)
 
 			/* Flush the transaction log. */
-			if (!xt_xlog_flush_log(self))
+			if (!xt_xlog_flush_log(db, self))
 				xt_throw(self);
 
 			xt_lock_mutex_ns(&db->db_datalogs.dlc_head_lock);
@@ -1891,7 +1891,7 @@ static xtBool dl_collect_garbage(XTThrea
 	freer_(); // xt_unlock_mutex(&db->db_co_dlog_lock)
 	
 	/* Flush the transaction log. */
-	if (!xt_xlog_flush_log(self))
+	if (!xt_xlog_flush_log(db, self))
 		xt_throw(self);
 
 	/* Save state in source log header. */

=== modified file 'storage/pbxt/src/discover_xt.cc'
--- a/storage/pbxt/src/discover_xt.cc	2009-11-16 20:49:51 +0000
+++ b/storage/pbxt/src/discover_xt.cc	2009-12-02 11:50:40 +0000
@@ -31,6 +31,9 @@
 #include <drizzled/session.h>
 #include <drizzled/server_includes.h>
 #include <drizzled/sql_base.h>
+#include <drizzled/statement/alter_table.h>
+#include <algorithm>
+#include <sstream>
 #endif
 
 #include "strutil_xt.h"
@@ -39,18 +42,273 @@
 #include "ha_xtsys.h"
 
 #ifndef DRIZZLED
-#if MYSQL_VERSION_ID > 60005
+#if MYSQL_VERSION_ID >= 50404
 #define DOT_STR(x)			x.str
 #else
 #define DOT_STR(x)			x
 #endif
 #endif
 
-#ifndef DRIZZLED
+//#ifndef DRIZZLED
 #define LOCK_OPEN_HACK_REQUIRED
-#endif // DRIZZLED
+//#endif // DRIZZLED
 
 #ifdef LOCK_OPEN_HACK_REQUIRED
+#ifdef DRIZZLED
+
+using namespace drizzled;
+using namespace std;
+
+#define mysql_create_table_no_lock hacked_mysql_create_table_no_lock
+
+namespace drizzled {
+
+int rea_create_table(Session *session, const char *path,
+                     const char *db, const char *table_name,
+                     message::Table *table_proto,
+                     HA_CREATE_INFO *create_info,
+                     List<CreateField> &create_field,
+                     uint32_t key_count,KEY *key_info);
+}
+
+static uint32_t build_tmptable_filename(Session* session,
+                                        char *buff, size_t bufflen)
+{
+  uint32_t length;
+  ostringstream path_str, post_tmpdir_str;
+  string tmp;
+
+  path_str << drizzle_tmpdir;
+  post_tmpdir_str << "/" << TMP_FILE_PREFIX << current_pid;
+  post_tmpdir_str << session->thread_id << session->tmp_table++;
+  tmp= post_tmpdir_str.str();
+
+  transform(tmp.begin(), tmp.end(), tmp.begin(), ::tolower);
+
+  path_str << tmp;
+
+  if (bufflen < path_str.str().length())
+    length= 0;
+  else
+    length= unpack_filename(buff, path_str.str().c_str());
+
+  return length;
+}
+
+static bool mysql_create_table_no_lock(Session *session,
+                                const char *db, const char *table_name,
+                                HA_CREATE_INFO *create_info,
+                                message::Table *table_proto,
+                                AlterInfo *alter_info,
+                                bool internal_tmp_table,
+                                uint32_t select_field_count)
+{
+  char          path[FN_REFLEN];
+  uint32_t          path_length;
+  uint          db_options, key_count;
+  KEY           *key_info_buffer;
+  Cursor        *file;
+  bool          error= true;
+  /* Check for duplicate fields and check type of table to create */
+  if (!alter_info->create_list.elements)
+  {
+    my_message(ER_TABLE_MUST_HAVE_COLUMNS, ER(ER_TABLE_MUST_HAVE_COLUMNS),
+               MYF(0));
+    return true;
+  }
+  assert(strcmp(table_name,table_proto->name().c_str())==0);
+  if (check_engine(session, table_name, create_info))
+    return true;
+  db_options= create_info->table_options;
+  if (create_info->row_type == ROW_TYPE_DYNAMIC)
+    db_options|=HA_OPTION_PACK_RECORD;
+  
+  /*if (!(file= create_info->db_type->getCursor((TableShare*) 0, session->mem_root)))
+  {
+    my_error(ER_OUTOFMEMORY, MYF(0), sizeof(Cursor));
+    return true;
+  }*/
+
+  /* PMC - Done to avoid getting the partition handler by mistake! */
+  if (!(file= new (session->mem_root) ha_xtsys(pbxt_hton, NULL)))
+  {
+    my_error(ER_OUTOFMEMORY, MYF(0), sizeof(Cursor));
+    return true;
+  }
+
+  set_table_default_charset(create_info, (char*) db);
+
+  if (mysql_prepare_create_table(session, 
+                                 create_info,
+                                 table_proto,
+                                 alter_info,
+                                 internal_tmp_table,
+                                 &db_options, file,
+                                 &key_info_buffer, &key_count,
+                                 select_field_count))
+    goto err;
+
+      /* Check if table exists */
+  if (create_info->options & HA_LEX_CREATE_TMP_TABLE)
+  {
+    path_length= build_tmptable_filename(session, path, sizeof(path));
+  }
+  else
+  {
+ #ifdef FN_DEVCHAR
+    /* check if the table name contains FN_DEVCHAR when defined */
+    if (strchr(table_name, FN_DEVCHAR))
+    {
+      my_error(ER_WRONG_TABLE_NAME, MYF(0), table_name);
+      return true;
+    }
+#endif
+    path_length= build_table_filename(path, sizeof(path), db, table_name, internal_tmp_table);
+  }
+
+  /* Check if table already exists */
+  if ((create_info->options & HA_LEX_CREATE_TMP_TABLE) &&
+      session->find_temporary_table(db, table_name))
+  {
+    if (create_info->options & HA_LEX_CREATE_IF_NOT_EXISTS)
+    {
+      create_info->table_existed= 1;            // Mark that table existed
+      push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_NOTE,
+                          ER_TABLE_EXISTS_ERROR, ER(ER_TABLE_EXISTS_ERROR),
+                          table_name);
+      error= 0;
+      goto err;
+    }
+    my_error(ER_TABLE_EXISTS_ERROR, MYF(0), table_name);
+    goto err;
+  }
+
+  //pthread_mutex_lock(&LOCK_open); /* CREATE TABLE (some confusion on naming, double check) */
+  if (!internal_tmp_table && !(create_info->options & HA_LEX_CREATE_TMP_TABLE))
+  {
+    if (plugin::StorageEngine::getTableDefinition(*session,
+                                                  path, 
+                                                  db,
+                                                  table_name,
+                                                  internal_tmp_table) == EEXIST)
+    {
+      if (create_info->options & HA_LEX_CREATE_IF_NOT_EXISTS)
+      {
+        error= false;
+        push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_NOTE,
+                            ER_TABLE_EXISTS_ERROR, ER(ER_TABLE_EXISTS_ERROR),
+                            table_name);
+        create_info->table_existed= 1;          // Mark that table existed
+      }
+      else
+        my_error(ER_TABLE_EXISTS_ERROR,MYF(0),table_name);
+
+      goto unlock_and_end;
+    }
+    /*
+      We don't assert here, but check the result, because the table could be
+      in the table definition cache and at the same time the .frm could be
+      missing from the disk, in case of manual intervention which deletes
+      the .frm file. The user has to use FLUSH TABLES; to clear the cache.
+      Then she could create the table. This case is pretty obscure and
+      therefore we don't introduce a new error message only for it.
+    */
+    if (TableShare::getShare(db, table_name))
+    {
+      my_error(ER_TABLE_EXISTS_ERROR, MYF(0), table_name);
+      goto unlock_and_end;
+    }
+  }
+  /*
+    Check that table with given name does not already exist in any
+    storage engine. In such a case it should be discovered and the error
+    ER_TABLE_EXISTS_ERROR be returned unless user specified CREATE TABLE
+    IF NOT EXISTS. In the original server code LOCK_open is held here so
+    that no one else is attempting to discover the table; in this copy the
+    lock is not taken (see the commented-out pthread_mutex_lock above).
+    Since it's not on disk as a frm file, no one could be using it!
+  */
+  if (!(create_info->options & HA_LEX_CREATE_TMP_TABLE))
+  {
+    bool create_if_not_exists =
+      create_info->options & HA_LEX_CREATE_IF_NOT_EXISTS;
+
+    char table_path[FN_REFLEN];
+    uint32_t          table_path_length;
+
+    table_path_length= build_table_filename(table_path, sizeof(table_path),
+                                            db, table_name, false);
+
+    int retcode= plugin::StorageEngine::getTableDefinition(*session,
+                                                           table_path, 
+                                                           db,
+                                                           table_name,
+                                                           false);
+    switch (retcode)
+    {
+      case ENOENT:
+        /* Normal case, no table exists. we can go and create it */
+        break;
+      case EEXIST:
+        if (create_if_not_exists)
+        {
+          error= false;
+          push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_NOTE,
+                              ER_TABLE_EXISTS_ERROR, ER(ER_TABLE_EXISTS_ERROR),
+                              table_name);
+          create_info->table_existed= 1;                // Mark that table existed
+          goto unlock_and_end;
+        }
+        my_error(ER_TABLE_EXISTS_ERROR,MYF(0),table_name);
+        goto unlock_and_end;
+      default:
+        my_error(retcode, MYF(0),table_name);
+        goto unlock_and_end;
+    }
+  }
+
+  session->set_proc_info("creating table");
+  create_info->table_existed= 0;                // Mark that table is created
+
+  create_info->table_options=db_options;
+
+  if (rea_create_table(session, path, db, table_name,
+                       table_proto,
+                       create_info, alter_info->create_list,
+                       key_count, key_info_buffer))
+    goto unlock_and_end;
+
+  if (create_info->options & HA_LEX_CREATE_TMP_TABLE)
+  {
+    /* Open table and put in temporary table list */
+    if (!(session->open_temporary_table(path, db, table_name, 1, OTM_OPEN)))
+    {
+      (void) session->rm_temporary_table(create_info->db_type, path);
+      goto unlock_and_end;
+    }
+  }
+
+  /*
+    Don't write the statement to the binary log if:
+    - it is an internal temporary table, or
+    - we are creating a temporary table.
+    Otherwise, the statement shall be binlogged. (Unlike the MySQL
+    original, row-based logging and binlog state are not checked here.)
+  */
+  if (!internal_tmp_table &&
+      ((!(create_info->options & HA_LEX_CREATE_TMP_TABLE))))
+    write_bin_log(session, session->query, session->query_length);
+  error= false;
+unlock_and_end:
+  //pthread_mutex_unlock(&LOCK_open);
+
+err:
+  session->set_proc_info("After create");
+  delete file;
+  return(error);
+}
+
+#else // MySQL case
 ///////////////////////////////
 /*
  * Unfortunately I cannot use the standard mysql_create_table_no_lock() because it will lock "LOCK_open"
@@ -1229,13 +1487,13 @@ static bool mysql_create_table_no_lock(T
   if (create_info->options & HA_LEX_CREATE_TMP_TABLE)
   {
     /* Open table and put in temporary table list */
-#if MYSQL_VERSION_ID > 60005
+#if MYSQL_VERSION_ID >= 50404
     if (!(open_temporary_table(thd, path, db, table_name, 1, OTM_OPEN)))
 #else
     if (!(open_temporary_table(thd, path, db, table_name, 1)))
 #endif
     {
-#if MYSQL_VERSION_ID > 60005
+#if MYSQL_VERSION_ID >= 50404
       (void) rm_temporary_table(create_info->db_type, path, false);
 #else
       (void) rm_temporary_table(create_info->db_type, path);
@@ -1252,11 +1510,21 @@ static bool mysql_create_table_no_lock(T
     - The binary log is not open.
     Otherwise, the statement shall be binlogged.
    */
+  /* PBXT 1.0.09e
+   * Firstly we had a compile problem with MySQL 5.1.42 and
+   * the write_bin_log() call below:
+   * discover_xt.cc:1259: error: argument of type 'char* (Statement::)()' does not match 'const char*'
+   * 
+   * And secondly, we should not write to the BINLOG anyway because this is
+   * an internal PBXT system table.
+   *
+   * So I am just commenting out the code altogether.
   if (!internal_tmp_table &&
       (!thd->current_stmt_binlog_row_based ||
        (thd->current_stmt_binlog_row_based &&
         !(create_info->options & HA_LEX_CREATE_TMP_TABLE))))
-    write_bin_log(thd, TRUE, thd->query(), thd->query_length());
+    write_bin_log(thd, TRUE, thd->query, thd->query_length);
+   */
   error= FALSE;
 unlock_and_end:
   pthread_mutex_unlock(&LOCK_open);
@@ -1279,37 +1547,51 @@ warn:
 ////// END OF CUT AND PASTES FROM  sql_table.cc ////////
 ////////////////////////////////////////////////////////
 
+#endif // DRIZZLED
 #endif // LOCK_OPEN_HACK_REQUIRED
 
 //------------------------------
 int xt_create_table_frm(handlerton *hton, THD* thd, const char *db, const char *name, DT_FIELD_INFO *info, DT_KEY_INFO *XT_UNUSED(keys), xtBool skip_existing)
 {
 #ifdef DRIZZLED
-    drizzled::message::Table table_proto;
+#define MYLEX_CREATE_INFO create_info
+#else
+#define MYLEX_CREATE_INFO mylex.create_info 
+#endif
+
+#ifdef DRIZZLED
+	drizzled::statement::AlterTable *stmt = new drizzled::statement::AlterTable(thd);
+	HA_CREATE_INFO create_info;
+	//AlterInfo alter_info;
+	drizzled::message::Table table_proto;
 
 	static const char *ext = ".dfe";
 	static const int ext_len = 4;
+
+	table_proto.mutable_engine()->mutable_name()->assign("PBXT");
 #else
 	static const char *ext = ".frm";
 	static const int ext_len = 4;
 #endif
 	int err = 1;
-	//HA_CREATE_INFO create_info = {0};
-	//Alter_info alter_info;
 	char field_length_buffer[12], *field_length_ptr;
 	LEX  *save_lex= thd->lex, mylex;
-	
-	memset(&mylex.create_info, 0, sizeof(HA_CREATE_INFO));
+
+	memset(&MYLEX_CREATE_INFO, 0, sizeof(HA_CREATE_INFO));
 
 	thd->lex = &mylex;
-    lex_start(thd);
+	lex_start(thd);
+#ifdef DRIZZLED
+        mylex.statement = stmt;
+#endif
 	
 	/* setup the create info */
-	mylex.create_info.db_type = hton;
+	MYLEX_CREATE_INFO.db_type = hton;
+
 #ifndef DRIZZLED 
 	mylex.create_info.frm_only = 1;
 #endif
- 	mylex.create_info.default_table_charset = system_charset_info;
+ 	MYLEX_CREATE_INFO.default_table_charset = system_charset_info;
 	
 	/* setup the column info. */
 	while (info->field_name) {		
@@ -1335,7 +1617,7 @@ int xt_create_table_frm(handlerton *hton
 #else
 		if (add_field_to_list(thd, &field_name, info->field_type, field_length_ptr, info->field_decimal_length,
 			info->field_flags,
-#if MYSQL_VERSION_ID > 60005
+#if MYSQL_VERSION_ID >= 50404
 				HA_SM_DISK,
 				COLUMN_FORMAT_TYPE_FIXED,
 #endif
@@ -1369,7 +1651,7 @@ int xt_create_table_frm(handlerton *hton
     table_proto.set_name(name);
     table_proto.set_type(drizzled::message::Table::STANDARD);
 
-	if (mysql_create_table_no_lock(thd, db, name, &mylex.create_info, &table_proto, &mylex.alter_info, 1, 0, false)) 
+	if (mysql_create_table_no_lock(thd, db, name, &create_info, &table_proto, &stmt->alter_info, 1, 0)) 
 		goto error;
 #else
 	if (mysql_create_table_no_lock(thd, db, name, &mylex.create_info, &mylex.alter_info, 1, 0)) 

=== modified file 'storage/pbxt/src/filesys_xt.cc'
--- a/storage/pbxt/src/filesys_xt.cc	2009-09-03 06:15:03 +0000
+++ b/storage/pbxt/src/filesys_xt.cc	2009-11-24 10:55:06 +0000
@@ -56,6 +56,8 @@
 //#define DEBUG_TRACE_FILES
 //#define INJECT_WRITE_REMAP_ERROR
 /* This is required to make testing on the Mac faster: */
+/* It turns off full file sync. */
+#define DEBUG_FAST_MAC
 #endif
 
 #ifdef DEBUG_TRACE_FILES
@@ -63,10 +65,6 @@
 #define PRINTF		xt_trace
 #endif
 
-#if defined(XT_MAC) && defined(F_FULLFSYNC)
-#undef F_FULLFSYNC
-#endif
-
 #ifdef INJECT_WRITE_REMAP_ERROR
 #define INJECT_REMAP_FILE_SIZE			1000000
 #define INJECT_REMAP_FILE_TYPE			"xtd"
@@ -883,7 +881,7 @@ xtPublic xtBool xt_flush_file(XTOpenFile
 	 * fsync didn't really flush index pages to disk. fcntl(F_FULLFSYNC) is considered more effective 
 	 * in such case.
 	 */
-#ifdef F_FULLFSYNC
+#if defined(F_FULLFSYNC) && !defined(DEBUG_FAST_MAC)
 	if (fcntl(of->of_filedes, F_FULLFSYNC, 0) == -1) {
 		xt_register_ferrno(XT_REG_CONTEXT, errno, xt_file_path(of));
 		goto failed;

=== modified file 'storage/pbxt/src/filesys_xt.h'
--- a/storage/pbxt/src/filesys_xt.h	2009-08-17 11:12:36 +0000
+++ b/storage/pbxt/src/filesys_xt.h	2009-11-24 10:55:06 +0000
@@ -102,7 +102,7 @@ xtBool			xt_fs_rename(struct XTThread *s
 #define FILE_MAP_UNLOCK(i, o)			xt_xsmutex_unlock(i, o)
 #elif defined(FILE_MAP_USE_PTHREAD_RW)
 #define FILE_MAP_LOCK_TYPE				xt_rwlock_type
-#define FILE_MAP_INIT_LOCK(s, i)		xt_init_rwlock(s, i)
+#define FILE_MAP_INIT_LOCK(s, i)		xt_init_rwlock_with_autoname(s, i)
 #define FILE_MAP_FREE_LOCK(s, i)		xt_free_rwlock(i)	
 #define FILE_MAP_READ_LOCK(i, o)		xt_slock_rwlock_ns(i)
 #define FILE_MAP_WRITE_LOCK(i, o)		xt_xlock_rwlock_ns(i)

=== modified file 'storage/pbxt/src/ha_pbxt.cc'
--- a/storage/pbxt/src/ha_pbxt.cc	2009-09-03 06:15:03 +0000
+++ b/storage/pbxt/src/ha_pbxt.cc	2009-11-27 15:37:02 +0000
@@ -35,6 +35,7 @@
 
 #include <stdlib.h>
 #include <time.h>
+#include <ctype.h>
 
 #ifdef DRIZZLED
 #include <drizzled/common.h>
@@ -42,14 +43,21 @@
 #include <mysys/my_alloc.h>
 #include <mysys/hash.h>
 #include <drizzled/field.h>
-#include <drizzled/current_session.h>
+#include <drizzled/session.h>
 #include <drizzled/data_home.h>
 #include <drizzled/error.h>
 #include <drizzled/table.h>
 #include <drizzled/field/timestamp.h>
 #include <drizzled/server_includes.h>
+#include <drizzled/plugin/info_schema_table.h>
 extern "C" char **session_query(Session *session);
 #define my_strdup(a,b) strdup(a)
+
+using drizzled::plugin::Registry;
+using drizzled::plugin::ColumnInfo;
+using drizzled::plugin::InfoSchemaTable;
+using drizzled::plugin::InfoSchemaMethods;
+
 #else
 #include "mysql_priv.h"
 #include <mysql/plugin.h>
@@ -71,7 +79,7 @@ extern "C" char **session_query(Session 
 #include "tabcache_xt.h"
 #include "systab_xt.h"
 #include "xaction_xt.h"
-#include "restart_xt.h"
+#include "backup_xt.h"
 
 #ifdef DEBUG
 //#define XT_USE_SYS_PAR_DEBUG_SIZES
@@ -101,6 +109,10 @@ static void		pbxt_drop_database(handlert
 static int		pbxt_close_connection(handlerton *hton, THD* thd);
 static int		pbxt_commit(handlerton *hton, THD *thd, bool all);
 static int		pbxt_rollback(handlerton *hton, THD *thd, bool all);
+static int		pbxt_prepare(handlerton *hton, THD *thd, bool all);
+static int		pbxt_recover(handlerton *hton, XID *xid_list, uint len);
+static int		pbxt_commit_by_xid(handlerton *hton, XID *xid);
+static int		pbxt_rollback_by_xid(handlerton *hton, XID *xid);
 #endif
 static void		ha_aquire_exclusive_use(XTThreadPtr self, XTSharePtr share, ha_pbxt *mine);
 static void		ha_release_exclusive_use(XTThreadPtr self, XTSharePtr share);
@@ -123,7 +135,9 @@ static void		ha_close_open_tables(XTThre
 #ifdef PBXT_HANDLER_TRACE
 #define PBXT_ALLOW_PRINTING
 
-#define XT_TRACE_CALL()				do { XTThreadPtr s = xt_get_self(); printf("%s %s\n", s ? s->t_name : "-unknown-", __FUNC__); } while (0)
+#define XT_TRACE_CALL()				ha_trace_function(__FUNC__, NULL)
+#define XT_TRACE_METHOD()			ha_trace_function(__FUNC__, pb_share->sh_table_path->ps_path)
+
 #ifdef PBXT_TRACE_RETURN
 #define XT_RETURN(x)				do { printf("%d\n", (int) (x)); return (x); } while (0)
 #define XT_RETURN_VOID				do { printf("out\n"); return; } while (0)
@@ -135,6 +149,7 @@ static void		ha_close_open_tables(XTThre
 #else
 
 #define XT_TRACE_CALL()
+#define XT_TRACE_METHOD()
 #define XT_RETURN(x)				return (x)
 #define XT_RETURN_VOID				return
 
@@ -165,10 +180,10 @@ xtBool					pbxt_crash_debug = TRUE;
 xtBool					pbxt_crash_debug = FALSE;
 #endif
 
+
 /* Variables for pbxt share methods */
 static xt_mutex_type	pbxt_database_mutex;		// Prevent a database from being opened while it is being dropped
 static XTHashTabPtr		pbxt_share_tables;			// Hash used to track open tables
-XTDatabaseHPtr			pbxt_database = NULL;		// The global open database
 static char				*pbxt_index_cache_size;
 static char				*pbxt_record_cache_size;
 static char				*pbxt_log_cache_size;
@@ -180,6 +195,12 @@ static char				*pbxt_data_log_threshold;
 static char				*pbxt_data_file_grow_size;
 static char				*pbxt_row_file_grow_size;
 static int				pbxt_max_threads;
+static my_bool			pbxt_support_xa;
+
+#ifndef DRIZZLED
+// drizzle complains it's not used
+static XTXactEnumXARec	pbxt_xa_enum;
+#endif
 
 #ifdef DEBUG
 #define XT_SHARE_LOCK_WAIT		5000
@@ -259,6 +280,33 @@ static HAVarParamsRec vp_row_file_grow_s
 //#define XT_AUTO_INCREMENT_DEF			1
 #endif
 
+#ifdef PBXT_HANDLER_TRACE
+static void ha_trace_function(const char *function, char *table)
+{
+	char		func_buf[50], *ptr;
+	XTThreadPtr	thread = xt_get_self(); 
+
+	if ((ptr = strchr(function, '('))) {
+		ptr--;
+		while (ptr > function) {
+			if (!(isalnum(*ptr) || *ptr == '_'))
+				break;
+			ptr--;
+		}
+		ptr++;
+		xt_strcpy(50, func_buf, ptr);
+		if ((ptr = strchr(func_buf, '(')))
+			*ptr = 0;
+	}
+	else
+		xt_strcpy(50, func_buf, function);
+	if (table)
+		printf("%s %s (%s)\n", thread ? thread->t_name : "-unknown-", func_buf, table);
+	else
+		printf("%s %s\n", thread ? thread->t_name : "-unknown-", func_buf);
+}
+#endif
+
 /*
  * -----------------------------------------------------------------------
  * SHARED TABLE DATA
@@ -584,6 +632,9 @@ xtPublic XTThreadPtr xt_ha_thd_to_self(T
 /* The first bit is 1. */
 static u_int ha_get_max_bit(MX_BITMAP *map)
 {
+#ifdef DRIZZLED
+	return map->getFirstSet();
+#else
 	my_bitmap_map	*data_ptr = map->bitmap;
 	my_bitmap_map	*end_ptr = map->last_word_ptr;
 	my_bitmap_map	b;
@@ -612,6 +663,7 @@ static u_int ha_get_max_bit(MX_BITMAP *m
 			cnt -= 32;
 	}
 	return 0;
+#endif
 }
 
 /*
@@ -684,9 +736,10 @@ xtPublic int xt_ha_pbxt_to_mysql_error(i
 	return(-1);			// Unknown error
 }
 
-xtPublic int xt_ha_pbxt_thread_error_for_mysql(THD *XT_UNUSED(thd), const XTThreadPtr self, int ignore_dup_key)
+xtPublic int xt_ha_pbxt_thread_error_for_mysql(THD *thd, const XTThreadPtr self, int ignore_dup_key)
 {
-	int xt_err = self->t_exception.e_xt_err;
+	int		xt_err = self->t_exception.e_xt_err;
+	xtBool	dup_key = FALSE;
 
 	XT_PRINT2(self, "xt_ha_pbxt_thread_error_for_mysql xt_err=%d auto commit=%d\n", (int) xt_err, (int) self->st_auto_commit);
 	switch (xt_err) {
@@ -725,6 +778,7 @@ xtPublic int xt_ha_pbxt_thread_error_for
 			/* If we are in auto-commit mode (and we are not ignoring
 			 * duplicate keys) then rollback the transaction automatically.
 			 */
+			dup_key = TRUE;
 			if (!ignore_dup_key && self->st_auto_commit)
 				goto abort_transaction;
 			break;
@@ -790,26 +844,20 @@ xtPublic int xt_ha_pbxt_thread_error_for
 					/* Locks are held on tables.
 					 * Only rollback after locks are released.
 					 */
-					self->st_auto_commit = TRUE;
+					/* I do not think this is required, because
+					 * I tell mysql to rollback below, 
+					 * besides it is a hack!
+					 self->st_auto_commit = TRUE;
+					 */
 					self->st_abort_trans = TRUE;
 				}
-#ifdef xxxx
-/* DBUG_ASSERT(thd->transaction.stmt.ha_list == NULL ||
-              trans == &thd->transaction.stmt); in handler.cc now
- * fails, and I don't know if this function can be called anymore! */
-				/* Cause any other DBs to do a rollback as well... */
-				if (thd) {
-					/*
-					 * GOTCHA:
-					 * This is a BUG in MySQL. I cannot rollback a transaction if
-					 * pb_mysql_thd->in_sub_stmt! But I must....?!
-					 */
-#ifdef MYSQL_SERVER
-					if (!thd->in_sub_stmt)
-						ha_rollback(thd);
-#endif
+				/* Only tell MySQL to rollback if we automatically rollback.
+				 * Note: calling this with (thd, FALSE) causes sp.test to fail.
+				 */
+				if (!dup_key) {
+					if (thd)
+						thd_mark_transaction_to_rollback(thd, TRUE);
 				}
-#endif
 			}
 			break;
 	}
@@ -908,7 +956,11 @@ static void pbxt_call_init(XTThreadPtr s
 	xt_db_data_file_grow_size = (size_t) data_file_grow_size;
 	xt_db_row_file_grow_size = (size_t) row_file_grow_size;
 
+#ifdef DRIZZLED
+	pbxt_ignore_case = TRUE;
+#else
 	pbxt_ignore_case = lower_case_table_names != 0;
+#endif
 	if (pbxt_ignore_case)
 		pbxt_share_tables = xt_new_hashtable(self, ha_hash_comp_ci, ha_hash_ci, ha_hash_free, TRUE, FALSE);
 	else
@@ -968,7 +1020,7 @@ static void pbxt_call_exit(XTThreadPtr s
  */
 static void ha_exit(XTThreadPtr self)
 {
-	xt_xres_wait_for_recovery(self);
+	xt_xres_terminate_recovery(self);
 
 	/* Wrap things up... */
 	xt_unuse_database(self, self);	/* Just in case the main thread has a database in use (for testing)? */
@@ -1024,7 +1076,7 @@ static bool pbxt_show_status(handlerton 
 	cont_(a);
 
 	if (!not_ok) {
-		if (stat_print(thd, "PBXT", 4, "", 0, strbuf.sb_cstring, strbuf.sb_len))
+		if (stat_print(thd, "PBXT", 4, "", 0, strbuf.sb_cstring, (uint) strbuf.sb_len))
 			not_ok = TRUE;
 	}
 	xt_sb_set_size(self, &strbuf, 0);
@@ -1038,14 +1090,14 @@ static bool pbxt_show_status(handlerton 
  * return 1 on error, else 0.
  */
 #ifdef DRIZZLED
-static int pbxt_init(PluginRegistry &registry)
+static int pbxt_init(Registry &registry)
 #else
 static int pbxt_init(void *p)
 #endif
 {
 	int init_err = 0;
 
-	XT_TRACE_CALL();
+	XT_PRINT0(NULL, "pbxt_init\n");
 
 	if (sizeof(xtWordPS) != sizeof(void *)) {
 		printf("PBXT: This won't work, I require that sizeof(xtWordPS) == sizeof(void *)!\n");
@@ -1076,11 +1128,27 @@ static int pbxt_init(void *p)
 		pbxt_hton->close_connection = pbxt_close_connection; /* close_connection, cleanup thread related data. */
 		pbxt_hton->commit = pbxt_commit; /* commit */
 		pbxt_hton->rollback = pbxt_rollback; /* rollback */
+		if (pbxt_support_xa) {
+			pbxt_hton->prepare = pbxt_prepare;
+			pbxt_hton->recover = pbxt_recover;
+			pbxt_hton->commit_by_xid = pbxt_commit_by_xid;
+			pbxt_hton->rollback_by_xid = pbxt_rollback_by_xid;
+		}
+		else {
+			pbxt_hton->prepare = NULL;
+			pbxt_hton->recover = NULL;
+			pbxt_hton->commit_by_xid = NULL;
+			pbxt_hton->rollback_by_xid = NULL;
+		}
 		pbxt_hton->create = pbxt_create_handler; /* Create a new handler */
 		pbxt_hton->drop_database = pbxt_drop_database; /* Drop a database */
 		pbxt_hton->panic = pbxt_panic; /* Panic call */
 		pbxt_hton->show_status = pbxt_show_status;
 		pbxt_hton->flags = HTON_NO_FLAGS; /* HTON_CAN_RECREATE - Without this flags TRUNCATE uses delete_all_rows() */
+		pbxt_hton->slot = (uint)-1; /* assign an invalid value, so we can tell later whether it has been initialized */
+#if defined(MYSQL_SUPPORTS_BACKUP) && defined(XT_ENABLE_ONLINE_BACKUP)
+		pbxt_hton->get_backup_engine = pbxt_backup_engine;
+#endif
 #endif
 		if (!xt_init_logging())					/* Initialize logging */
 			goto error_1;
@@ -1160,8 +1228,10 @@ static int pbxt_init(void *p)
 				 * Only real problem, 2 threads try to load the same
 				 * plugin at the same time.
 				 */
+#if MYSQL_VERSION_ID < 60014
 				myxt_mutex_unlock(&LOCK_plugin);
 #endif
+#endif
 
 				/* Can't do this here yet, because I need a THD! */
 				try_(b) {
@@ -1195,8 +1265,10 @@ static int pbxt_init(void *p)
 				if (thd)
 					myxt_destroy_thread(thd, FALSE);
 #ifndef DRIZZLED
+#if MYSQL_VERSION_ID < 60014
 				myxt_mutex_lock(&LOCK_plugin);
 #endif
+#endif
 			}
 #endif
 		}
@@ -1262,7 +1334,7 @@ static int pbxt_init(void *p)
 }
 
 #ifdef DRIZZLED
-static int pbxt_end(PluginRegistry &registry)
+static int pbxt_end(Registry &registry)
 #else
 static int pbxt_end(void *)
 #endif
@@ -1378,7 +1450,7 @@ static int pbxt_commit(handlerton *hton,
 			 * transaction (!all && !self->st_auto_commit).
 			 */
 			if (all || self->st_auto_commit) {
-				XT_PRINT0(self, "xt_xn_commit\n");
+				XT_PRINT0(self, "xt_xn_commit in pbxt_commit\n");
 
 				if (!xt_xn_commit(self))
 					err = xt_ha_pbxt_thread_error_for_mysql(thd, self, FALSE);
@@ -1402,7 +1474,7 @@ static int pbxt_rollback(handlerton *hto
 	XTThreadPtr	self;
 
 	if ((self = (XTThreadPtr) *thd_ha_data(thd, hton))) {
-		XT_PRINT1(self, "pbxt_rollback all=%d\n", all);
+		XT_PRINT1(self, "pbxt_rollback all=%d in pbxt_commit\n", all);
 
 		if (self->st_xact_data) {
 			/* There are no table locks, rollback immediately in all cases
@@ -1431,7 +1503,7 @@ static int pbxt_rollback(handlerton *hto
 }
 
 #ifdef DRIZZLED
-handler *PBXTStorageEngine::create(TABLE_SHARE *table, MEM_ROOT *mem_root)
+Cursor *PBXTStorageEngine::create(TABLE_SHARE *table, MEM_ROOT *mem_root)
 {
 	PBXTStorageEngine * const hton = this;
 #else
@@ -1446,6 +1518,236 @@ static handler *pbxt_create_handler(hand

 /*
  * -----------------------------------------------------------------------
+ * 2-PHASE COMMIT
+ *
+ */
+
+#ifndef DRIZZLED
+
+/*
+ * Prepare phase of 2-phase commit (handlerton::prepare).
+ *
+ * Only a full-transaction prepare (all == TRUE) does any work: the
+ * THD's XID is fetched with thd_get_xid() and passed, together with
+ * its length, to xt_xn_prepare(). Otherwise (statement-level call, no
+ * PBXT thread for this THD, or no open transaction) nothing is done.
+ *
+ * Returns 0, or the MySQL error code mapped from the PBXT exception.
+ */
+static int pbxt_prepare(handlerton *hton, THD *thd, bool all)
+{
+	int			err = 0;
+	XTThreadPtr	self;
+
+	XT_TRACE_CALL();
+	if ((self = (XTThreadPtr) *thd_ha_data(thd, hton))) {
+		XT_PRINT1(self, "pbxt_prepare all=%d\n", all);
+
+		if (self->st_xact_data) {
+			/* Only the whole transaction is prepared; a statement
+			 * prepare (!all) is a no-op.
+			 * NOTE(review): pbxt_commit() also commits when
+			 * !all && self->st_auto_commit. Confirm that an auto-commit
+			 * statement never needs an XA prepare record here.
+			 */
+			if (all) {
+				XID xid;
+
+				XT_PRINT0(self, "xt_xn_prepare in pbxt_prepare\n");
+				thd_get_xid(thd, (MYSQL_XID*) &xid);
+
+				if (!xt_xn_prepare(xid.length(), (xtWord1 *) &xid, self))
+					err = xt_ha_pbxt_thread_error_for_mysql(thd, self, FALSE);
+			}
+		}
+	}
+	return err;
+}
+
+/*
+ * Return a PBXT thread that has the global database open, for use by
+ * the XA callbacks below.
+ *
+ * The PBXT thread of current_thd is used if there is one; otherwise a
+ * temporary thread named thread_name is created and bit 1 is set in
+ * *temp_thread. Bit 2 (temporary THD) is currently never set, because
+ * the THD creation in the else-branch below is commented out.
+ *
+ * On success *ret_thd receives current_thd (possibly NULL); release the
+ * result with ha_temp_close_database(). On failure NULL is returned,
+ * *err holds a MySQL error code and any temporary thread is freed.
+ */
+static XTThreadPtr ha_temp_open_global_database(handlerton *hton, THD **ret_thd, int *temp_thread, char *thread_name, int *err)
+{
+	THD			*thd;
+	XTThreadPtr	self = NULL;
+
+	*temp_thread = 0;
+	if ((thd = current_thd))
+		self = (XTThreadPtr) *thd_ha_data(thd, hton);
+	else {
+		//thd = (THD *) myxt_create_thread();
+		//*temp_thread |= 2;
+	}
+
+	if (!self) {
+		XTExceptionRec e;
+
+		if (!(self = xt_create_thread(thread_name, FALSE, TRUE, &e))) {
+			*err = xt_ha_pbxt_to_mysql_error(e.e_xt_err);
+			xt_log_exception(NULL, &e, XT_LOG_DEFAULT);
+			return NULL;
+		}
+		*temp_thread |= 1;
+	}
+
+	xt_xres_wait_for_recovery(self, XT_RECOVER_DONE);
+
+	try_(a) {
+		xt_open_database(self, mysql_real_data_home, TRUE);
+	}
+	catch_(a) {
+		*err = xt_ha_pbxt_thread_error_for_mysql(thd, self, FALSE);
+		if ((*temp_thread & 1))
+			xt_free_thread(self);
+		if (*temp_thread & 2)
+			myxt_destroy_thread(thd, FALSE);
+		self = NULL;
+	}
+	cont_(a);
+
+	*ret_thd = thd;
+	return self;
+}
+
+/*
+ * Undo ha_temp_open_global_database(): stop using the database and
+ * free the thread (bit 1) and/or THD (bit 2) recorded in temp_thread.
+ */
+static void ha_temp_close_database(XTThreadPtr self, THD *thd, int temp_thread)
+{
+	xt_unuse_database(self, self);
+	if (temp_thread & 1)
+		xt_free_thread(self);
+	if (temp_thread & 2)
+		myxt_destroy_thread(thd, TRUE);
+}
+
+/* Return the XIDs of prepared transactions found during recovery
+ * (handlerton::recover).
+ *
+ * Up to len XIDs are copied into xid_list and the number copied is
+ * returned (0 if the database cannot be opened). The enumeration
+ * position is kept in pbxt_xa_enum, so if len is returned the
+ * function will be called again to fetch the next batch.
+ */
+static int pbxt_recover(handlerton *hton, XID *xid_list, uint len)
+{
+	int					temp_thread;
+	XTThreadPtr			self;
+	XTDatabaseHPtr		db;
+	uint				count = 0;
+	XTXactPreparePtr	xap;
+	int					err;
+	THD					*thd;
+
+	if (!(self = ha_temp_open_global_database(hton, &thd, &temp_thread, "TempForRecover", &err)))
+		return 0;
+
+	db = self->st_database;
+
+	for (count=0; count<len; count++) {
+		xap = xt_xn_enum_xa_data(db, &pbxt_xa_enum);
+		if (!xap)
+			break;
+		/* Presumably the xid.length() bytes stored by pbxt_prepare(). */
+		memcpy(&xid_list[count], xap->xp_xa_data, xap->xp_data_len);
+	}
+
+	ha_temp_close_database(self, thd, temp_thread);
+	return (int) count;
+}
+
+/*
+ * Commit the prepared transaction identified by xid
+ * (handlerton::commit_by_xid).
+ *
+ * If XA data for xid is found, the transaction (if it still exists) is
+ * committed and the XA data is deleted; otherwise nothing is done.
+ * Returns 0, or a MySQL error code if the database could not be opened
+ * or the commit failed.
+ */
+static int pbxt_commit_by_xid(handlerton *hton, XID *xid)
+{
+	int					temp_thread;
+	XTThreadPtr			self;
+	XTDatabaseHPtr		db;
+	int					err = 0;
+	XTXactPreparePtr	xap;
+	THD					*thd;
+
+	XT_TRACE_CALL();
+
+	if (!(self = ha_temp_open_global_database(hton, &thd, &temp_thread, "TempForCommitXA", &err)))
+		return err;
+	db = self->st_database;
+
+	if ((xap = xt_xn_find_xa_data(db, xid->length(), (xtWord1 *) xid, TRUE, self))) {
+		if ((self->st_xact_data = xt_xn_get_xact(db, xap->xp_xact_id, self))) {
+			self->st_xact_data->xd_flags &= ~XT_XN_XAC_PREPARED;  // Prepared transactions cannot be swept!
+			if (!xt_xn_commit(self))
+				err = xt_ha_pbxt_thread_error_for_mysql(thd, self, FALSE);
+		}
+		xt_xn_delete_xa_data(db, xap, TRUE, self);
+	}
+
+	ha_temp_close_database(self, thd, temp_thread);
+	/* Report a failed commit to the caller rather than dropping err. */
+	return err;
+}
+
+/*
+ * Roll back the prepared transaction identified by xid
+ * (handlerton::rollback_by_xid).
+ *
+ * If XA data for xid is found, the transaction (if it still exists) is
+ * rolled back and the XA data is deleted; otherwise nothing is done.
+ * Returns 0, or a MySQL error code if the database could not be opened
+ * or the rollback failed.
+ */
+static int pbxt_rollback_by_xid(handlerton *hton, XID *xid)
+{
+	int					temp_thread;
+	XTThreadPtr			self;
+	XTDatabaseHPtr		db;
+	int					err = 0;
+	XTXactPreparePtr	xap;
+	THD					*thd;
+
+	XT_TRACE_CALL();
+
+	if (!(self = ha_temp_open_global_database(hton, &thd, &temp_thread, "TempForRollbackXA", &err)))
+		return err;
+	db = self->st_database;
+
+	if ((xap = xt_xn_find_xa_data(db, xid->length(), (xtWord1 *) xid, TRUE, self))) {
+		if ((self->st_xact_data = xt_xn_get_xact(db, xap->xp_xact_id, self))) {
+			self->st_xact_data->xd_flags &= ~XT_XN_XAC_PREPARED;  // Prepared transactions cannot be swept!
+			if (!xt_xn_rollback(self))
+				err = xt_ha_pbxt_thread_error_for_mysql(thd, self, FALSE);
+		}
+		xt_xn_delete_xa_data(db, xap, TRUE, self);
+	}
+
+	ha_temp_close_database(self, thd, temp_thread);
+	/* Report a failed rollback to the caller rather than dropping err. */
+	return err;
+}
+
+#endif
+
+/*
+ * -----------------------------------------------------------------------
  * HANDLER LOCKING FUNCTIONS
  *
  * These functions are used get a lock on all handles of a particular table.
@@ -1497,7 +1745,7 @@ static void ha_aquire_exclusive_use(XTTh
 	ha_pbxt	*handler;
 	time_t	end_time = time(NULL) + XT_SHARE_LOCK_TIMEOUT / 1000;
 
-	XT_PRINT1(self, "ha_aquire_exclusive_use %s PBXT X lock\n", share->sh_table_path->ps_path);
+	XT_PRINT1(self, "ha_aquire_exclusive_use (%s) PBXT X lock\n", share->sh_table_path->ps_path);
 	/* GOTCHA: It is possible to hang here, if you hold
 	 * onto the sh_ex_mutex lock, before we really
 	 * have the exclusive lock (i.e. before all
@@ -1578,7 +1826,7 @@ static void ha_release_exclusive_use(XTT
 static void ha_release_exclusive_use(XTThreadPtr XT_UNUSED(self), XTSharePtr share)
 #endif
 {
-	XT_PRINT1(self, "ha_release_exclusive_use %s PBXT X UNLOCK\n", share->sh_table_path->ps_path);
+	XT_PRINT1(self, "ha_release_exclusive_use (%s) PBXT X UNLOCK\n", share->sh_table_path->ps_path);
 	xt_lock_mutex_ns((xt_mutex_type *) share->sh_ex_mutex);
 	share->sh_table_lock = FALSE;
 	xt_broadcast_cond_ns((xt_cond_type *) share->sh_ex_cond);
@@ -1589,7 +1837,7 @@ static xtBool ha_wait_for_shared_use(ha_
 {
 	time_t	end_time = time(NULL) + XT_SHARE_LOCK_TIMEOUT / 1000;
 
-	XT_PRINT1(xt_get_self(), "ha_wait_for_shared_use %s share lock wait...\n", share->sh_table_path->ps_path);
+	XT_PRINT1(xt_get_self(), "ha_wait_for_shared_use (%s) share lock wait...\n", share->sh_table_path->ps_path);
 	mine->pb_ex_in_use = 0;
 	xt_lock_mutex_ns((xt_mutex_type *) share->sh_ex_mutex);
 	while (share->sh_table_lock) {
@@ -1667,14 +1915,38 @@ xtPublic int ha_pbxt::reopen()
  *
  */
 
-int pbxt_statistics_fill_table(THD *thd, TABLE_LIST *tables, COND *cond)
+static int pbxt_statistics_fill_table(THD *thd, TABLE_LIST *tables, COND *cond)
 {
-	XTThreadPtr		self;	
+	XTThreadPtr		self = NULL;	
 	int				err = 0;
 
+	if (!pbxt_hton) {
+		/* Can't do if PBXT is not loaded! */
+		XTExceptionRec	e;
+
+		xt_exception_xterr(&e, XT_CONTEXT, XT_ERR_PBXT_NOT_INSTALLED);
+		xt_log_exception(NULL, &e, XT_LOG_DEFAULT);
+		/* Just return an empty set: */
+		return 0;
+	}
+
 	if (!(self = ha_set_current_thread(thd, &err)))
 		return xt_ha_pbxt_to_mysql_error(err);
+
+
 	try_(a) {
+		/* If the thread has no open database, and the global
+		 * database is already open, then open
+		 * the database. Otherwise the statement will be
+		 * executed without an open database, which means
+		 * that the related statistics will be missing.
+		 *
+		 * This includes all background threads.
+		 */
+		if (!self->st_database && pbxt_database) {
+			xt_ha_open_database_of_table(self, (XTPathStrPtr) NULL);
+		}
+
 		err = myxt_statistics_fill_table(self, thd, tables, cond, system_charset_info);
 	}
 	catch_(a) {
@@ -1684,6 +1956,24 @@ int pbxt_statistics_fill_table(THD *thd,
 	return err;
 }
 
+#ifdef DRIZZLED
+ColumnInfo pbxt_statistics_fields_info[]=
+{
+	ColumnInfo("ID", 4, MYSQL_TYPE_LONG,  0, 0, "The ID of the statistic", SKIP_OPEN_TABLE),
+        ColumnInfo("Name", 40, MYSQL_TYPE_STRING, 0, 0, "The name of the statistic", SKIP_OPEN_TABLE),
+        ColumnInfo("Value", 8, MYSQL_TYPE_LONGLONG, 0, 0, "The accumulated value", SKIP_OPEN_TABLE),
+	ColumnInfo()
+};
+
+class PBXTStatisticsMethods : public InfoSchemaMethods
+{
+public:
+  int fillTable(Session *session, TableList *tables, COND *cond)
+  {
+        return pbxt_statistics_fill_table(session, tables, cond);
+  }
+};
+#else
 ST_FIELD_INFO pbxt_statistics_fields_info[]=
 {
 	{ "ID",		4,	MYSQL_TYPE_LONG,		0, 0, "The ID of the statistic", SKIP_OPEN_TABLE},
@@ -1691,24 +1981,28 @@ ST_FIELD_INFO pbxt_statistics_fields_inf
 	{ "Value",	8,	MYSQL_TYPE_LONGLONG,	0, 0, "The accumulated value", SKIP_OPEN_TABLE},
 	{ 0,		0,	MYSQL_TYPE_STRING,		0, 0, 0, SKIP_OPEN_TABLE}
 };
+#endif
 
 #ifdef DRIZZLED
 static InfoSchemaTable	*pbxt_statistics_table;
-
-int pbxt_init_statitics(PluginRegistry &registry)
+static PBXTStatisticsMethods pbxt_statistics_methods;
+static int pbxt_init_statistics(Registry &registry)
 #else
-int pbxt_init_statitics(void *p)
+static int pbxt_init_statistics(void *p)
 #endif
 {
 #ifdef DRIZZLED
-	pbxt_statistics_table = (InfoSchemaTable *)xt_calloc_ns(sizeof(InfoSchemaTable));
-	pbxt_statistics_table->table_name= "PBXT_STATISTICS";
+	//pbxt_statistics_table = (InfoSchemaTable *)xt_calloc_ns(sizeof(InfoSchemaTable));
+	//pbxt_statistics_table->table_name= "PBXT_STATISTICS";
+	pbxt_statistics_table = new InfoSchemaTable("PBXT_STATISTICS");
+	pbxt_statistics_table->setColumnInfo(pbxt_statistics_fields_info);
+	pbxt_statistics_table->setInfoSchemaMethods(&pbxt_statistics_methods);
 	registry.add(pbxt_statistics_table);
 #else
 	ST_SCHEMA_TABLE *pbxt_statistics_table = (ST_SCHEMA_TABLE *) p;
-#endif
 	pbxt_statistics_table->fields_info = pbxt_statistics_fields_info;
 	pbxt_statistics_table->fill_table = pbxt_statistics_fill_table;
+#endif
 
 #if defined(XT_WIN) && defined(XT_COREDUMP)
 	void register_crash_filter();
@@ -1721,14 +2015,14 @@ int pbxt_init_statitics(void *p)
 }
 
 #ifdef DRIZZLED
-int pbxt_exit_statitics(PluginRegistry &registry)
+static int pbxt_exit_statistics(Registry &registry)
 #else
-int pbxt_exit_statitics(void *XT_UNUSED(p))
+static int pbxt_exit_statistics(void *XT_UNUSED(p))
 #endif
 {
 #ifdef DRIZZLED
 	registry.remove(pbxt_statistics_table);
-	xt_free_ns(pbxt_statistics_table);
+	delete pbxt_statistics_table;
 #endif
 	return(0);
 }
@@ -1758,7 +2052,11 @@ ha_pbxt::ha_pbxt(handlerton *hton, TABLE
  * exist for the storage engine. This is also used by the default rename_table and
  * delete_table method in handler.cc.
  */
+#ifdef DRIZZLED
+const char **PBXTStorageEngine::bas_ext() const
+#else
 const char **ha_pbxt::bas_ext() const
+#endif
 {
 	return pbxt_extensions;
 }
@@ -1800,11 +2098,13 @@ MX_TABLE_TYPES_T ha_pbxt::table_flags() 
 		 * purposes!
 		HA_NOT_EXACT_COUNT |
 		 */
+#ifndef DRIZZLED
 		/*
 		 * This basically means we have a file with the name of
 		 * database table (which we do).
 		 */
 		HA_FILE_BASED |
+#endif
 		/*
 		 * Not sure what this does (but MyISAM and InnoDB have it)?!
 		 * Could it mean that we support the handler functions.
@@ -1971,7 +2271,7 @@ int ha_pbxt::open(const char *table_path
 	if (!(self = ha_set_current_thread(thd, &err)))
 		return xt_ha_pbxt_to_mysql_error(err);
 
-	XT_PRINT1(self, "ha_pbxt::open %s\n", table_path);
+	XT_PRINT1(self, "open (%s)\n", table_path);
 
 	pb_ex_in_use = 1;
 	try_(a) {
@@ -2049,7 +2349,7 @@ int ha_pbxt::close(void)
 		}
 	}
 
-	XT_PRINT1(self, "ha_pbxt::close %s\n", pb_share && pb_share->sh_table_path->ps_path ? pb_share->sh_table_path->ps_path : "unknown");
+	XT_PRINT1(self, "close (%s)\n", pb_share && pb_share->sh_table_path->ps_path ? pb_share->sh_table_path->ps_path : "unknown");
 
 	if (self) {
 		try_(a) {
@@ -2125,9 +2425,10 @@ void ha_pbxt::init_auto_increment(xtWord
 		if (!TS(table)->next_number_key_offset) {
 			// Autoincrement at key-start
 			err = index_last(table->record[1]);
-			if (!err)
+			if (!err && !table->next_number_field->is_null(TS(table)->rec_buff_length)) {
 				/* {PRE-INC} */
 				nr = (xtWord8) table->next_number_field->val_int_offset(TS(table)->rec_buff_length);
+			}
 		}
 		else {
 			/* Do an index scan to find the largest value! */
@@ -2180,8 +2481,10 @@ void ha_pbxt::init_auto_increment(xtWord
 		table->next_number_field = tmp_fie;
 		table->in_use = tmp_thd;
 
-		if (xn_started)
+		if (xn_started) {
+			XT_PRINT0(self, "xt_xn_commit in init_auto_increment\n");
 			xt_xn_commit(self);
+		}
 	}
 	xt_spinlock_unlock(&tab->tab_ainc_lock);
 }
@@ -2228,13 +2531,13 @@ void ha_pbxt::get_auto_increment(MX_ULON
  * insert into t1 values (-1);
  * insert into t1 values (NULL);
  */
-void ha_pbxt::set_auto_increment(Field *nr)
+xtPublic void ha_set_auto_increment(XTOpenTablePtr ot, Field *nr)
 {
 	register XTTableHPtr	tab;
 	MX_ULONGLONG_T			nr_int_val;
 	
 	nr_int_val = nr->val_int();
-	tab = pb_open_tab->ot_table;
+	tab = ot->ot_table;
 
 	if (nr->cmp((const unsigned char *)&tab->tab_auto_inc) > 0) {
 		xt_spinlock_lock(&tab->tab_ainc_lock);
@@ -2260,9 +2563,9 @@ void ha_pbxt::set_auto_increment(Field *
 #else
 			tab->tab_dic.dic_min_auto_inc = nr_int_val + 100;
 #endif
-			pb_open_tab->ot_thread = xt_get_self();
-			if (!xt_tab_write_min_auto_inc(pb_open_tab))
-				xt_log_and_clear_exception(pb_open_tab->ot_thread);
+			ot->ot_thread = xt_get_self();
+			if (!xt_tab_write_min_auto_inc(ot))
+				xt_log_and_clear_exception(ot->ot_thread);
 		}
 	}
 }
@@ -2305,7 +2608,7 @@ int ha_pbxt::write_row(byte *buf)
 
 	ASSERT_NS(pb_ex_in_use);
 
-	XT_PRINT1(pb_open_tab->ot_thread, "ha_pbxt::write_row %s\n", pb_share->sh_table_path->ps_path);
+	XT_PRINT1(pb_open_tab->ot_thread, "write_row (%s)\n", pb_share->sh_table_path->ps_path);
 	XT_DISABLED_TRACE(("INSERT tx=%d val=%d\n", (int) pb_open_tab->ot_thread->st_xact_data->xd_start_xn_id, (int) XT_GET_DISK_4(&buf[1])));
 	//statistic_increment(ha_write_count,&LOCK_status);
 #ifdef PBMS_ENABLED
@@ -2350,7 +2653,7 @@ int ha_pbxt::write_row(byte *buf)
 			err = update_err;
 			goto done;
 		}
-		set_auto_increment(table->next_number_field);
+		ha_set_auto_increment(pb_open_tab, table->next_number_field);
 	}
 
 	if (!xt_tab_new_record(pb_open_tab, (xtWord1 *) buf)) {
@@ -2423,7 +2726,7 @@ int ha_pbxt::update_row(const byte * old
 
 	ASSERT_NS(pb_ex_in_use);
 
-	XT_PRINT1(self, "ha_pbxt::update_row %s\n", pb_share->sh_table_path->ps_path);
+	XT_PRINT1(self, "update_row (%s)\n", pb_share->sh_table_path->ps_path);
 	XT_DISABLED_TRACE(("UPDATE tx=%d val=%d\n", (int) self->st_xact_data->xd_start_xn_id, (int) XT_GET_DISK_4(&new_data[1])));
 	//statistic_increment(ha_update_count,&LOCK_status);
 
@@ -2472,7 +2775,7 @@ int ha_pbxt::update_row(const byte * old
 
 		old_map = mx_tmp_use_all_columns(table, table->read_set);
 		nr = table->found_next_number_field->val_int();
-		set_auto_increment(table->found_next_number_field);
+		ha_set_auto_increment(pb_open_tab, table->found_next_number_field);
 		mx_tmp_restore_column_map(table, old_map);
 	}
 
@@ -2504,7 +2807,7 @@ int ha_pbxt::delete_row(const byte * buf
 
 	ASSERT_NS(pb_ex_in_use);
 
-	XT_PRINT1(pb_open_tab->ot_thread, "ha_pbxt::delete_row %s\n", pb_share->sh_table_path->ps_path);
+	XT_PRINT1(pb_open_tab->ot_thread, "delete_row (%s)\n", pb_share->sh_table_path->ps_path);
 	XT_DISABLED_TRACE(("DELETE tx=%d val=%d\n", (int) pb_open_tab->ot_thread->st_xact_data->xd_start_xn_id, (int) XT_GET_DISK_4(&buf[1])));
 	//statistic_increment(ha_delete_count,&LOCK_status);
 
@@ -2829,7 +3132,8 @@ int ha_pbxt::xt_index_prev_read(XTOpenTa
 
 int ha_pbxt::index_init(uint idx, bool XT_UNUSED(sorted))
 {
-	XTIndexPtr ind;
+	XTIndexPtr	ind;
+	XTThreadPtr	thread = pb_open_tab->ot_thread;
 
 	/* select count(*) from smalltab_PBXT;
 	 * ignores the error below, and continues to
@@ -2851,6 +3155,15 @@ int ha_pbxt::index_init(uint idx, bool X
 
 		printf("index_init %s index %d cols req=%d/%d read_bits=%X write_bits=%X index_bits=%X\n", pb_open_tab->ot_table->tab_name->ps_path, (int) idx, pb_open_tab->ot_cols_req, pb_open_tab->ot_cols_req, (int) *table->read_set->bitmap, (int) *table->write_set->bitmap, (int) *ind->mi_col_map.bitmap);
 #endif
+		/* Register the statement transaction with MySQL as soon as
+		 * a read is done for a modify type statement (registering it
+		 * later was too late). st_stat_trans prevents re-registering.
+		 */
+		if (!thread->st_stat_trans) {
+			trans_register_ha(pb_mysql_thd, FALSE, pbxt_hton);
+			XT_PRINT0(thread, "ha_pbxt::index_init trans_register_ha all=FALSE\n");
+			thread->st_stat_trans = TRUE;
+		}
 	}
 	else {
 		pb_open_tab->ot_cols_req = ha_get_max_bit(table->read_set);
@@ -2901,7 +3214,7 @@ int ha_pbxt::index_init(uint idx, bool X
 #endif
 	}
 	
-	xt_xlog_check_long_writer(pb_open_tab->ot_thread);
+	xt_xlog_check_long_writer(thread);
 
 	pb_open_tab->ot_thread->st_statistics.st_scan_index++;
 	return 0;
@@ -2911,7 +3224,7 @@ int ha_pbxt::index_end()
 {
 	int err = 0;
 
-	XT_TRACE_CALL();
+	XT_TRACE_METHOD();
 
 	XTThreadPtr thread = pb_open_tab->ot_thread;
 
@@ -2991,7 +3304,7 @@ int ha_pbxt::index_read_xt(byte * buf, u
 
 	ASSERT_NS(pb_ex_in_use);
 
-	XT_PRINT1(pb_open_tab->ot_thread, "ha_pbxt::index_read_xt %s\n", pb_share->sh_table_path->ps_path);
+	XT_PRINT1(pb_open_tab->ot_thread, "index_read_xt (%s)\n", pb_share->sh_table_path->ps_path);
 	XT_DISABLED_TRACE(("search tx=%d val=%d update=%d\n", (int) pb_open_tab->ot_thread->st_xact_data->xd_start_xn_id, (int) XT_GET_DISK_4(key), pb_modified));
 	ind = (XTIndexPtr) pb_share->sh_dic_keys[idx];
 
@@ -3072,7 +3385,7 @@ int ha_pbxt::index_next(byte * buf)
 	int			err = 0;
 	XTIndexPtr	ind;
 
-	XT_TRACE_CALL();
+	XT_TRACE_METHOD();
 	//statistic_increment(ha_read_next_count,&LOCK_status);
 	ASSERT_NS(pb_ex_in_use);
 
@@ -3114,7 +3427,7 @@ int ha_pbxt::index_next_same(byte * buf,
 	XTIndexPtr			ind;
 	XTIdxSearchKeyRec	search_key;
 
-	XT_TRACE_CALL();
+	XT_TRACE_METHOD();
 	//statistic_increment(ha_read_next_count,&LOCK_status);
 	ASSERT_NS(pb_ex_in_use);
 
@@ -3154,7 +3467,7 @@ int ha_pbxt::index_prev(byte * buf)
 	int			err = 0;
 	XTIndexPtr	ind;
 
-	XT_TRACE_CALL();
+	XT_TRACE_METHOD();
 	//statistic_increment(ha_read_prev_count,&LOCK_status);
 	ASSERT_NS(pb_ex_in_use);
 
@@ -3188,7 +3501,7 @@ int ha_pbxt::index_first(byte * buf)
 	XTIndexPtr			ind;
 	XTIdxSearchKeyRec	search_key;
 
-	XT_TRACE_CALL();
+	XT_TRACE_METHOD();
 	//statistic_increment(ha_read_first_count,&LOCK_status);
 	ASSERT_NS(pb_ex_in_use);
 
@@ -3228,7 +3541,7 @@ int ha_pbxt::index_last(byte * buf)
 	XTIndexPtr			ind;
 	XTIdxSearchKeyRec	search_key;
 
-	XT_TRACE_CALL();
+	XT_TRACE_METHOD();
 	//statistic_increment(ha_read_last_count,&LOCK_status);
 	ASSERT_NS(pb_ex_in_use);
 
@@ -3275,10 +3588,11 @@ int ha_pbxt::index_last(byte * buf)
  */
 int ha_pbxt::rnd_init(bool scan)
 {
-	int err = 0;
+	int			err = 0;
+	XTThreadPtr	thread = pb_open_tab->ot_thread;
 
-	XT_PRINT1(pb_open_tab->ot_thread, "ha_pbxt::rnd_init %s\n", pb_share->sh_table_path->ps_path);
-	XT_DISABLED_TRACE(("seq scan tx=%d\n", (int) pb_open_tab->ot_thread->st_xact_data->xd_start_xn_id));
+	XT_PRINT1(thread, "rnd_init (%s)\n", pb_share->sh_table_path->ps_path);
+	XT_DISABLED_TRACE(("seq scan tx=%d\n", (int) thread->st_xact_data->xd_start_xn_id));
 
 	/* Call xt_tab_seq_exit() to make sure the resources used by the previous
 	 * scan are freed. In particular make sure cache page ref count is decremented.
@@ -3296,8 +3610,18 @@ int ha_pbxt::rnd_init(bool scan)
 	xt_tab_seq_exit(pb_open_tab);
 
 	/* The number of columns required: */
-	if (pb_open_tab->ot_is_modify)
+	if (pb_open_tab->ot_is_modify) {
 		pb_open_tab->ot_cols_req = table->read_set->MX_BIT_SIZE();
+		/* Register the statement transaction with MySQL as soon as
+		 * a read is done for a modify type statement (registering it
+		 * later was too late). st_stat_trans prevents re-registering.
+		 */
+		if (!thread->st_stat_trans) {
+			trans_register_ha(pb_mysql_thd, FALSE, pbxt_hton);
+			XT_PRINT0(thread, "ha_pbxt::rnd_init trans_register_ha all=FALSE\n");
+			thread->st_stat_trans = TRUE;
+		}
+	}
 	else {
 		pb_open_tab->ot_cols_req = ha_get_max_bit(table->read_set);
 
@@ -3322,14 +3646,14 @@ int ha_pbxt::rnd_init(bool scan)
 	else
 		xt_tab_seq_reset(pb_open_tab);
 
-	xt_xlog_check_long_writer(pb_open_tab->ot_thread);
+	xt_xlog_check_long_writer(thread);
 
 	return err;
 }
 
 int ha_pbxt::rnd_end()
 {
-	XT_TRACE_CALL();
+	XT_TRACE_METHOD();
 
 	/*
 	 * make permanent the lock for the last scanned row
@@ -3358,7 +3682,7 @@ int ha_pbxt::rnd_next(byte *buf)
 	int		err = 0;
 	xtBool	eof;
 
-	XT_TRACE_CALL();
+	XT_TRACE_METHOD();
 	ASSERT_NS(pb_ex_in_use);
 	//statistic_increment(ha_read_rnd_next_count, &LOCK_status);
 	xt_xlog_check_long_writer(pb_open_tab->ot_thread);
@@ -3393,7 +3717,7 @@ int ha_pbxt::rnd_next(byte *buf)
  */
 void ha_pbxt::position(const byte *XT_UNUSED(record))
 {
-	XT_TRACE_CALL();
+	XT_TRACE_METHOD();
 	ASSERT_NS(pb_ex_in_use);
 	/*
 	 * I changed this from using little endian to big endian.
@@ -3429,10 +3753,10 @@ int ha_pbxt::rnd_pos(byte * buf, byte *p
 {
 	int err = 0;
 
-	XT_TRACE_CALL();
+	XT_TRACE_METHOD();
 	ASSERT_NS(pb_ex_in_use);
 	//statistic_increment(ha_read_rnd_count, &LOCK_status);
-	XT_PRINT1(pb_open_tab->ot_thread, "ha_pbxt::rnd_pos %s\n", pb_share->sh_table_path->ps_path);
+	XT_PRINT1(pb_open_tab->ot_thread, "rnd_pos (%s)\n", pb_share->sh_table_path->ps_path);
 
 	pb_open_tab->ot_curr_rec_id = mi_uint4korr((xtWord1 *) pos);
 	switch (xt_tab_dirty_read_record(pb_open_tab, (xtWord1 *) buf)) {
@@ -3509,7 +3833,7 @@ int ha_pbxt::info(uint flag)
 	XTOpenTablePtr	ot;
 	int				in_use;
 
-	XT_TRACE_CALL();
+	XT_TRACE_METHOD();
 	
 	if (!(in_use = pb_ex_in_use)) {
 		pb_ex_in_use = 1;
@@ -3535,7 +3859,7 @@ int ha_pbxt::info(uint flag)
 			stats.index_file_length = xt_ind_node_to_offset(ot->ot_table, ot->ot_table->tab_ind_eof);
 			stats.delete_length = ot->ot_table->tab_rec_fnum * ot->ot_rec_size;
 			//check_time = info.check_time;
-			stats.mean_rec_length = ot->ot_rec_size;
+			stats.mean_rec_length = (ulong) ot->ot_rec_size;
 		}
 
 		if (flag & HA_STATUS_CONST) {
@@ -3551,7 +3875,9 @@ int ha_pbxt::info(uint flag)
 			stats.block_size = XT_INDEX_PAGE_SIZE;
 
 			if (share->tmp_table == NO_TMP_TABLE)
-#if MYSQL_VERSION_ID > 60005
+#ifdef DRIZZLED
+#define WHICH_MUTEX			mutex
+#elif MYSQL_VERSION_ID >= 50404
 #define WHICH_MUTEX			LOCK_ha_data
 #else
 #define WHICH_MUTEX			mutex
@@ -3559,19 +3885,15 @@ int ha_pbxt::info(uint flag)
 
 #ifdef SAFE_MUTEX
 
-#if MYSQL_VERSION_ID < 60000
+#if MYSQL_VERSION_ID < 50404
 #if MYSQL_VERSION_ID < 50123
 				safe_mutex_lock(&share->mutex,__FILE__,__LINE__);
 #else
 				safe_mutex_lock(&share->mutex,0,__FILE__,__LINE__);
 #endif
 #else
-#if MYSQL_VERSION_ID < 60004
-				safe_mutex_lock(&share->mutex,__FILE__,__LINE__);
-#else
 				safe_mutex_lock(&share->WHICH_MUTEX,0,__FILE__,__LINE__);
 #endif
-#endif
 
 #else // SAFE_MUTEX
 
@@ -3675,7 +3997,7 @@ int ha_pbxt::extra(enum ha_extra_functio
 {
 	int err = 0;
 
-	XT_PRINT2(xt_get_self(), "ha_pbxt::extra %s  operation=%d\n", pb_share->sh_table_path->ps_path, operation);
+	XT_PRINT2(xt_get_self(), "ha_pbxt::extra (%s) operation=%d\n", pb_share->sh_table_path->ps_path, operation);
 
 	switch (operation) {
 		case HA_EXTRA_RESET_STATE:
@@ -3771,14 +4093,14 @@ int ha_pbxt::extra(enum ha_extra_functio
  */
 int ha_pbxt::reset(void)
 {
-	XT_TRACE_CALL();
+	XT_TRACE_METHOD();
 	extra(HA_EXTRA_RESET_STATE);
 	XT_RETURN(0);
 }
 
 void ha_pbxt::unlock_row()
 {
-	XT_TRACE_CALL();
+	XT_TRACE_METHOD();
 	if (pb_open_tab)
 		pb_open_tab->ot_table->tab_locks.xt_remove_temp_lock(pb_open_tab, FALSE);
 }
@@ -3802,7 +4124,7 @@ int ha_pbxt::delete_all_rows()
 	XTDDTable		*tab_def = NULL;
 	char			path[PATH_MAX];
 
-	XT_TRACE_CALL();
+	XT_TRACE_METHOD();
 
 	if (thd_sql_command(thd) != SQLCOM_TRUNCATE) {
 		/* Just like InnoDB we only handle TRUNCATE TABLE
@@ -3906,7 +4228,7 @@ int ha_pbxt::analyze(THD *thd, HA_CHECK_
 	xtXactID		clean_xn_id = 0;
 	uint			cnt = 10;
 
-	XT_TRACE_CALL();
+	XT_TRACE_METHOD();
 
 	if (!pb_open_tab) {
 		if ((err = reopen()))
@@ -4056,7 +4378,7 @@ xtPublic int ha_pbxt::external_lock(THD 
 		ASSERT_NS(pb_ex_in_use);
 		*/
 
-		XT_PRINT1(self, "ha_pbxt::EXTERNAL_LOCK %s lock_type=UNLOCK\n", pb_share->sh_table_path->ps_path);
+		XT_PRINT1(self, "EXTERNAL_LOCK (%s) lock_type=UNLOCK\n", pb_share->sh_table_path->ps_path);
 
 		/* Make any temporary locks on this table permanent.
 		 *
@@ -4189,10 +4511,9 @@ xtPublic int ha_pbxt::external_lock(THD 
 			xt_broadcast_cond_ns((xt_cond_type *) pb_share->sh_ex_cond);
 	}
 	else {
-		XT_PRINT2(self, "ha_pbxt::EXTERNAL_LOCK %s lock_type=%d\n", pb_share->sh_table_path->ps_path, lock_type);
+		XT_PRINT2(self, "ha_pbxt::EXTERNAL_LOCK (%s) lock_type=%d\n", pb_share->sh_table_path->ps_path, lock_type);
 		
 		if (pb_lock_table) {
-
 			pb_ex_in_use = 1;
 			try_(a) {
 				if (!pb_table_locked)
@@ -4243,14 +4564,18 @@ xtPublic int ha_pbxt::external_lock(THD 
 			if ((pb_open_tab->ot_for_update = (lock_type == F_WRLCK))) {
 				switch ((int) thd_sql_command(thd)) {
 					case SQLCOM_DELETE:
+#ifndef DRIZZLED
 					case SQLCOM_DELETE_MULTI:
+#endif
 						/* turn DELETE IGNORE into normal DELETE. The IGNORE option causes problems because 
 						 * when a record is deleted we add an xlog record which we cannot "rollback" later
 						 * when we find that an FK-constraint has failed. 
 						 */
 						thd->lex->ignore = false;
 					case SQLCOM_UPDATE:
+#ifndef DRIZZLED
 					case SQLCOM_UPDATE_MULTI:
+#endif
 					case SQLCOM_REPLACE:
 					case SQLCOM_REPLACE_SELECT:
 					case SQLCOM_INSERT:
@@ -4265,7 +4590,9 @@ xtPublic int ha_pbxt::external_lock(THD 
 					case SQLCOM_DROP_TABLE:
 					case SQLCOM_DROP_INDEX:
 					case SQLCOM_LOAD:
+#ifndef DRIZZLED
 					case SQLCOM_REPAIR:
+#endif
 					case SQLCOM_OPTIMIZE:
 						self->st_stat_modify = TRUE;
 						break;
@@ -4316,11 +4643,16 @@ xtPublic int ha_pbxt::external_lock(THD 
 			self->st_xact_mode = thd_tx_isolation(thd) <= ISO_READ_COMMITTED ? XT_XACT_COMMITTED_READ : XT_XACT_REPEATABLE_READ;
 			self->st_ignore_fkeys = (thd_test_options(thd,OPTION_NO_FOREIGN_KEY_CHECKS)) != 0;
 			self->st_auto_commit = (thd_test_options(thd, (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))) == 0;
+#ifdef DRIZZLED
+			self->st_table_trans = FALSE;
+#else
 			self->st_table_trans = thd_sql_command(thd) == SQLCOM_LOCK_TABLES;
+#endif
 			self->st_abort_trans = FALSE;
 			self->st_stat_ended = FALSE;
 			self->st_stat_trans = FALSE;
 			XT_PRINT0(self, "xt_xn_begin\n");
+			xt_xres_wait_for_recovery(self, XT_RECOVER_SWEPT);
 			if (!xt_xn_begin(self)) {
 				err = xt_ha_pbxt_thread_error_for_mysql(thd, self, pb_ignore_dup_key);
 				pb_ex_in_use = 0;
@@ -4404,7 +4736,7 @@ int ha_pbxt::start_stmt(THD *thd, thr_lo
 	if (!(self = ha_set_current_thread(thd, &err)))
 		return xt_ha_pbxt_to_mysql_error(err);
 
-	XT_PRINT2(self, "ha_pbxt::start_stmt %s lock_type=%d\n", pb_share->sh_table_path->ps_path, (int) lock_type);
+	XT_PRINT2(self, "ha_pbxt::start_stmt (%s) lock_type=%d\n", pb_share->sh_table_path->ps_path, (int) lock_type);
 
 	if (!pb_open_tab) {
 		if ((err = reopen()))
@@ -4430,12 +4762,12 @@ int ha_pbxt::start_stmt(THD *thd, thr_lo
 		/* This section handles "auto-commit"... */
 		if (self->st_xact_data && self->st_auto_commit && self->st_table_trans) {
 			if (self->st_abort_trans) {
-				XT_PRINT0(self, "xt_xn_rollback\n");
+				XT_PRINT0(self, "xt_xn_rollback in start_stmt\n");
 				if (!xt_xn_rollback(self))
 					err = xt_ha_pbxt_thread_error_for_mysql(pb_mysql_thd, self, pb_ignore_dup_key);
 			}
 			else {
-				XT_PRINT0(self, "xt_xn_commit\n");
+				XT_PRINT0(self, "xt_xn_commit in start_stmt\n");
 				if (!xt_xn_commit(self))
 					err = xt_ha_pbxt_thread_error_for_mysql(pb_mysql_thd, self, pb_ignore_dup_key);
 			}
@@ -4466,9 +4798,11 @@ int ha_pbxt::start_stmt(THD *thd, thr_lo
 	if (pb_open_tab->ot_for_update) {
 		switch ((int) thd_sql_command(thd)) {
 			case SQLCOM_UPDATE:
-			case SQLCOM_UPDATE_MULTI:
 			case SQLCOM_DELETE:
+#ifndef DRIZZLED
+			case SQLCOM_UPDATE_MULTI:
 			case SQLCOM_DELETE_MULTI:
+#endif
 			case SQLCOM_REPLACE:
 			case SQLCOM_REPLACE_SELECT:
 			case SQLCOM_INSERT:
@@ -4483,14 +4817,15 @@ int ha_pbxt::start_stmt(THD *thd, thr_lo
 			case SQLCOM_DROP_TABLE:
 			case SQLCOM_DROP_INDEX:
 			case SQLCOM_LOAD:
+#ifndef DRIZZLED
 			case SQLCOM_REPAIR:
+#endif
 			case SQLCOM_OPTIMIZE:
 				self->st_stat_modify = TRUE;
 				break;
 		}
 	}
 
-
 	/* (***) This is required at this level!
 	 * No matter how often it is called, it is still the start of a
 	 * statement. We need to make sure statements that are NOT mistaken
@@ -4516,6 +4851,7 @@ int ha_pbxt::start_stmt(THD *thd, thr_lo
 		self->st_stat_ended = FALSE;
 		self->st_stat_trans = FALSE;
 		XT_PRINT0(self, "xt_xn_begin\n");
+		xt_xres_wait_for_recovery(self, XT_RECOVER_SWEPT);
 		if (!xt_xn_begin(self)) {
 			err = xt_ha_pbxt_thread_error_for_mysql(thd, self, pb_ignore_dup_key);
 			goto complete;
@@ -4673,7 +5009,9 @@ THR_LOCK_DATA **ha_pbxt::store_lock(THD 
 		 * 
 		 */
 		if ((lock_type >= TL_WRITE_CONCURRENT_INSERT && lock_type <= TL_WRITE) && 
+#ifndef DRIZZLED
 			!(thd_in_lock_tables(thd) && thd_sql_command(thd) == SQLCOM_LOCK_TABLES) &&
+#endif
 			!thd_tablespace_op(thd) &&
 			thd_sql_command(thd) != SQLCOM_TRUNCATE &&
 			thd_sql_command(thd) != SQLCOM_OPTIMIZE &&
@@ -4691,22 +5029,23 @@ THR_LOCK_DATA **ha_pbxt::store_lock(THD 
 
                  * Stewart: removed SQLCOM_CALL, not sure of implications.
 		 */
-		if (lock_type == TL_READ_NO_INSERT &&
-			(!thd_in_lock_tables(thd)
+		if (lock_type == TL_READ_NO_INSERT
 #ifndef DRIZZLED
+			&& (!thd_in_lock_tables(thd)
 			 || thd_sql_command(thd) == SQLCOM_CALL
+			)
 #endif
-			))
+			)
 		{
 			lock_type = TL_READ;
 		}
 
-		XT_PRINT3(xt_get_self(), "ha_pbxt::store_lock %s %d->%d\n", pb_share->sh_table_path->ps_path, pb_lock.type, lock_type);
+		XT_PRINT3(xt_get_self(), "store_lock (%s) %d->%d\n", pb_share->sh_table_path->ps_path, pb_lock.type, lock_type);
 		pb_lock.type = lock_type;
 	}
 #ifdef PBXT_HANDLER_TRACE
 	else {
-		XT_PRINT3(xt_get_self(), "ha_pbxt::store_lock %s %d->%d (ignore/unlock)\n", pb_share->sh_table_path->ps_path, lock_type, lock_type);
+		XT_PRINT3(xt_get_self(), "store_lock (%s) %d->%d (ignore/unlock)\n", pb_share->sh_table_path->ps_path, lock_type, lock_type);
 	}
 #endif
 	*to++= &pb_lock;
@@ -4723,15 +5062,23 @@ THR_LOCK_DATA **ha_pbxt::store_lock(THD 
  * during create if the table_flag HA_DROP_BEFORE_CREATE was specified for
  * the storage engine.
 */
+#ifdef DRIZZLED
+int PBXTStorageEngine::doDropTable(Session &, std::string table_path_str)
+#else
 int ha_pbxt::delete_table(const char *table_path)
+#endif
 {
 	THD				*thd = current_thd;
 	int				err = 0;
 	XTThreadPtr		self = NULL;
 	XTSharePtr		share;
 
+#ifdef DRIZZLED
+	const char *table_path = table_path_str.c_str();
+#endif
+
 	STAT_TRACE(self, *thd_query(thd));
-	XT_PRINT1(self, "ha_pbxt::delete_table %s\n", table_path);
+	XT_PRINT1(self, "delete_table (%s)\n", table_path);
 
 	if (XTSystemTableShare::isSystemTable(table_path))
 		return delete_system_table(table_path);
@@ -4795,7 +5142,7 @@ int ha_pbxt::delete_table(const char *ta
 #endif
 	}
 	catch_(a) {
-		err = xt_ha_pbxt_thread_error_for_mysql(thd, self, pb_ignore_dup_key);
+		err = xt_ha_pbxt_thread_error_for_mysql(thd, self, FALSE);
 #ifdef DRIZZLED
 		if (err == HA_ERR_NO_SUCH_TABLE)
 			err = ENOENT;
@@ -4819,7 +5166,11 @@ int ha_pbxt::delete_table(const char *ta
 	return err;
 }
 
+#ifdef DRIZZLED
+int PBXTStorageEngine::delete_system_table(const char *table_path)
+#else
 int ha_pbxt::delete_system_table(const char *table_path)
+#endif
 {
 	THD				*thd = current_thd;
 	XTExceptionRec	e;
@@ -4857,7 +5208,13 @@ int ha_pbxt::delete_system_table(const c
  * This function can be used to move a table from one database to
  * another.
  */
+#ifdef DRIZZLED
+int PBXTStorageEngine::doRenameTable(Session *,
+                                     const char *from,
+                                     const char *to)
+#else
 int ha_pbxt::rename_table(const char *from, const char *to)
+#endif
 {
 	THD				*thd = current_thd;
 	int				err = 0;
@@ -4865,15 +5222,13 @@ int ha_pbxt::rename_table(const char *fr
 	XTSharePtr		share;
 	XTDatabaseHPtr	to_db;
 
-	XT_TRACE_CALL();
-
 	if (XTSystemTableShare::isSystemTable(from))
 		return rename_system_table(from, to);
 
 	if (!(self = ha_set_current_thread(thd, &err)))
 		return xt_ha_pbxt_to_mysql_error(err);
 
-	XT_PRINT2(self, "ha_pbxt::rename_table %s -> %s\n", from, to);
+	XT_PRINT2(self, "rename_table (%s -> %s)\n", from, to);
 
 #ifdef PBMS_ENABLED
 	PBMSResultRec result;
@@ -4929,7 +5284,7 @@ int ha_pbxt::rename_table(const char *fr
 #endif
 	}
 	catch_(a) {
-		err = xt_ha_pbxt_thread_error_for_mysql(thd, self, pb_ignore_dup_key);
+		err = xt_ha_pbxt_thread_error_for_mysql(thd, self, FALSE);
 	}
 	cont_(a);
 	
@@ -4940,7 +5295,11 @@ int ha_pbxt::rename_table(const char *fr
 	XT_RETURN(err);
 }
 
+#ifdef DRIZZLED
+int PBXTStorageEngine::rename_system_table(const char *XT_UNUSED(from), const char *XT_UNUSED(to))
+#else
 int ha_pbxt::rename_system_table(const char *XT_UNUSED(from), const char *XT_UNUSED(to))
+#endif
 {
 	return ER_NOT_SUPPORTED_YET;
 }
@@ -5029,7 +5388,15 @@ ha_rows ha_pbxt::records_in_range(uint i
 
  * Called from handle.cc by ha_create_table().
 */
+#ifdef DRIZZLED
+int PBXTStorageEngine::doCreateTable(Session *, 
+                                     const char *table_path, 
+                                     Table &table_arg, 
+                                     HA_CREATE_INFO &create_info, 
+                                     drizzled::message::Table &XT_UNUSED(proto))
+#else
 int ha_pbxt::create(const char *table_path, TABLE *table_arg, HA_CREATE_INFO *create_info)
+#endif
 {
 	THD				*thd = current_thd;
 	int				err = 0;
@@ -5037,37 +5404,61 @@ int ha_pbxt::create(const char *table_pa
 	XTDDTable		*tab_def = NULL;
 	XTDictionaryRec	dic;
 
-	memset(&dic, 0, sizeof(dic));
+	if ((strcmp(table_path, "./pbxt/location") == 0) || (strcmp(table_path, "./pbxt/statistics") == 0))
+		return 0;
 
-	XT_TRACE_CALL();
+	memset(&dic, 0, sizeof(dic));
 
 	if (!(self = ha_set_current_thread(thd, &err)))
 		return xt_ha_pbxt_to_mysql_error(err);
+#ifdef DRIZZLED
+	XT_PRINT2(self, "create (%s) %s\n", table_path, (create_info.options & HA_LEX_CREATE_TMP_TABLE) ? "temporary" : "");
+#else
+	XT_PRINT2(self, "create (%s) %s\n", table_path, (create_info->options & HA_LEX_CREATE_TMP_TABLE) ? "temporary" : "");
+#endif
 
 	STAT_TRACE(self, *thd_query(thd));
-	XT_PRINT1(self, "ha_pbxt::create %s\n", table_path);
 
 	try_(a) {
 		xt_ha_open_database_of_table(self, (XTPathStrPtr) table_path);
 
+#ifdef DRIZZLED
+		for (uint i=0; i<TS(&table_arg)->keys; i++) {
+			if (table_arg.key_info[i].key_length > XT_INDEX_MAX_KEY_SIZE)
+				xt_throw_sulxterr(XT_CONTEXT, XT_ERR_KEY_TOO_LARGE, table_arg.key_info[i].name, (u_long) XT_INDEX_MAX_KEY_SIZE);
+		}
+#else
 		for (uint i=0; i<TS(table_arg)->keys; i++) {
 			if (table_arg->key_info[i].key_length > XT_INDEX_MAX_KEY_SIZE)
 				xt_throw_sulxterr(XT_CONTEXT, XT_ERR_KEY_TOO_LARGE, table_arg->key_info[i].name, (u_long) XT_INDEX_MAX_KEY_SIZE);
 		}
+#endif
 
 		/* ($) auto_increment_value will be zero if 
 		 * AUTO_INCREMENT is not used. Otherwise
 		 * Query was ALTER TABLE ... AUTO_INCREMENT = x; or 
 		 * CREATE TABLE ... AUTO_INCREMENT = x;
 		 */
+#ifdef DRIZZLED
+		tab_def = xt_ri_create_table(self, true, (XTPathStrPtr) table_path, *thd_query(thd), myxt_create_table_from_table(self, &table_arg));
+		tab_def->checkForeignKeys(self, create_info.options & HA_LEX_CREATE_TMP_TABLE);
+#else
 		tab_def = xt_ri_create_table(self, true, (XTPathStrPtr) table_path, *thd_query(thd), myxt_create_table_from_table(self, table_arg));
 		tab_def->checkForeignKeys(self, create_info->options & HA_LEX_CREATE_TMP_TABLE);
+#endif
 
 		dic.dic_table = tab_def;
+#ifdef DRIZZLED
+		dic.dic_my_table = &table_arg;
+		dic.dic_tab_flags = (create_info.options & HA_LEX_CREATE_TMP_TABLE) ? XT_TAB_FLAGS_TEMP_TAB : 0;
+		dic.dic_min_auto_inc = (xtWord8) create_info.auto_increment_value; /* ($) */
+		dic.dic_def_ave_row_size = table_arg.s->getAvgRowLength();
+#else
 		dic.dic_my_table = table_arg;
 		dic.dic_tab_flags = (create_info->options & HA_LEX_CREATE_TMP_TABLE) ? XT_TAB_FLAGS_TEMP_TAB : 0;
 		dic.dic_min_auto_inc = (xtWord8) create_info->auto_increment_value; /* ($) */
 		dic.dic_def_ave_row_size = (xtWord8) table_arg->s->avg_row_length;
+#endif
 		myxt_setup_dictionary(self, &dic);
 
 		/*
@@ -5089,7 +5480,7 @@ int ha_pbxt::create(const char *table_pa
 		if (tab_def)
 			tab_def->finalize(self);
 		dic.dic_table = NULL;
-		err = xt_ha_pbxt_thread_error_for_mysql(thd, self, pb_ignore_dup_key);
+		err = xt_ha_pbxt_thread_error_for_mysql(thd, self, FALSE);
 	}
 	cont_(a);
 
@@ -5161,7 +5552,7 @@ bool ha_pbxt::get_error_message(int XT_U
 	if (!self->t_exception.e_xt_err)
 		return FALSE;
 
-	buf->copy(self->t_exception.e_err_msg, strlen(self->t_exception.e_err_msg), system_charset_info);
+	buf->copy(self->t_exception.e_err_msg, (uint32) strlen(self->t_exception.e_err_msg), system_charset_info);
 	return TRUE;
 }
 
@@ -5421,16 +5812,31 @@ static MYSQL_SYSVAR_INT(sweeper_priority
 
 #ifdef DRIZZLED
 static MYSQL_SYSVAR_INT(max_threads, pbxt_max_threads,
-	PLUGIN_VAR_OPCMDARG,
+	PLUGIN_VAR_OPCMDARG | PLUGIN_VAR_READONLY,
 	"The maximum number of threads used by PBXT",
 	NULL, NULL, 500, 20, 20000, 1);
 #else
 static MYSQL_SYSVAR_INT(max_threads, pbxt_max_threads,
-	PLUGIN_VAR_OPCMDARG,
+	PLUGIN_VAR_OPCMDARG | PLUGIN_VAR_READONLY,
 	"The maximum number of threads used by PBXT, 0 = set according to MySQL max_connections.",
 	NULL, NULL, 0, 0, 20000, 1);
 #endif
 
+#ifndef DEBUG
+static MYSQL_SYSVAR_BOOL(support_xa, pbxt_support_xa,
+	PLUGIN_VAR_OPCMDARG,
+	"Enable PBXT support for the XA two-phase commit, default is enabled",
+	NULL, NULL, TRUE);
+#else
+static MYSQL_SYSVAR_BOOL(support_xa, pbxt_support_xa,
+	PLUGIN_VAR_OPCMDARG,
+	"Enable PBXT support for the XA two-phase commit, default is disabled (due to assertion failure in MySQL)",
+	/* The problem is, in MySQL an assertion fails in debug mode: 
+	 * Assertion failed: (total_ha_2pc == (ulong) opt_bin_log+1), function ha_recover, file handler.cc, line 1557.
+     */
+	NULL, NULL, FALSE);
+#endif
+
 static struct st_mysql_sys_var* pbxt_system_variables[] = {
   MYSQL_SYSVAR(index_cache_size),
   MYSQL_SYSVAR(record_cache_size),
@@ -5448,6 +5854,7 @@ static struct st_mysql_sys_var* pbxt_sys
   MYSQL_SYSVAR(offline_log_function),
   MYSQL_SYSVAR(sweeper_priority),
   MYSQL_SYSVAR(max_threads),
+  MYSQL_SYSVAR(support_xa),
   NULL
 };
 #endif
@@ -5494,8 +5901,8 @@ mysql_declare_plugin(pbxt)
 	"Paul McCullagh, PrimeBase Technologies GmbH",
 	"PBXT internal system statitics",
 	PLUGIN_LICENSE_GPL,
-	pbxt_init_statitics,						/* plugin init */
-	pbxt_exit_statitics,						/* plugin deinit */
+	pbxt_init_statistics,						/* plugin init */
+	pbxt_exit_statistics,						/* plugin deinit */
 #ifndef DRIZZLED
 	0x0005,
 #endif

=== modified file 'storage/pbxt/src/ha_pbxt.h'
--- a/storage/pbxt/src/ha_pbxt.h	2009-08-17 11:12:36 +0000
+++ b/storage/pbxt/src/ha_pbxt.h	2009-11-24 10:55:06 +0000
@@ -27,9 +27,9 @@
 
 #ifdef DRIZZLED
 #include <drizzled/common.h>
-#include <drizzled/handler.h>
-#include <drizzled/plugin/storage_engine.h>
 #include <mysys/thr_lock.h>
+#include <drizzled/cursor.h>
+
 #else
 #include "mysql_priv.h"
 #endif
@@ -53,17 +53,31 @@ class ha_pbxt;
 
 #ifdef DRIZZLED
 
-class PBXTStorageEngine : public StorageEngine {
+class PBXTStorageEngine : public drizzled::plugin::StorageEngine 
+{
+
+	int delete_system_table(const char *table_path);
+	int rename_system_table(const char * from, const char * to);
+
 public:
 	PBXTStorageEngine(std::string name_arg)
-	: StorageEngine(name_arg, HTON_NO_FLAGS) {}
+	: drizzled::plugin::StorageEngine(name_arg, HTON_NO_FLAGS) {}
+
+	void operator delete(void *) {}
+	void operator delete[] (void *) {}
 
 	/* override */ int close_connection(Session *);
 	/* override */ int commit(Session *, bool);
 	/* override */ int rollback(Session *, bool);
-	/* override */ handler *create(TABLE_SHARE *, MEM_ROOT *);
+	/* override */ Cursor *create(TABLE_SHARE *, MEM_ROOT *);
 	/* override */ void drop_database(char *);
 	/* override */ bool show_status(Session *, stat_print_fn *, enum ha_stat_type);
+        /* override */ const char **bas_ext() const;
+	/* override */ int doCreateTable(Session *session, const char *table_name, 
+				Table &table_arg, HA_CREATE_INFO
+                                &create_info, drizzled::message::Table &proto);
+	/* override */ int doRenameTable(Session *, const char *from, const char *to);
+	/* override */ int doDropTable(Session &session, std::string table_path);
 };
 
 typedef PBXTStorageEngine handlerton;
@@ -139,9 +153,9 @@ class ha_pbxt: public handler
 	 * don't implement this method unless you really have indexes.
 	 */
 	const char *index_type(uint inx) { (void) inx; return "BTREE"; }
-
+#ifndef DRIZZLED
 	const char **bas_ext() const;
-
+#endif
 	MX_UINT8_T table_cache_type();
 
 	/*
@@ -241,11 +255,13 @@ class ha_pbxt: public handler
 	int		optimize(THD* thd, HA_CHECK_OPT* check_opt);
 	int		check(THD* thd, HA_CHECK_OPT* check_opt);
 	ha_rows	records_in_range(uint inx, key_range *min_key, key_range *max_key);
-	int		delete_table(const char *from);
+#ifndef DRIZZLED
 	int		delete_system_table(const char *table_path);
-	int		rename_table(const char * from, const char * to);
+	int		delete_table(const char *from);
 	int		rename_system_table(const char * from, const char * to);
+	int		rename_table(const char * from, const char * to);
 	int		create(const char *name, TABLE *form, HA_CREATE_INFO *create_info);				//required
+#endif
 	void	update_create_info(HA_CREATE_INFO *create_info);
 
 	THR_LOCK_DATA **store_lock(THD *thd, THR_LOCK_DATA **to, enum thr_lock_type lock_type);		 //required
@@ -277,6 +293,7 @@ struct XTThread	*xt_ha_thd_to_self(THD* 
 int				xt_ha_pbxt_to_mysql_error(int xt_err);
 int				xt_ha_pbxt_thread_error_for_mysql(THD *thd, const XTThreadPtr self, int ignore_dup_key);
 void			xt_ha_all_threads_close_database(XTThreadPtr self, XTDatabase *db);
+void			ha_set_auto_increment(XTOpenTablePtr ot, Field *nr);
 
 /*
  * These hooks are suppossed to only be used by InnoDB:

=== modified file 'storage/pbxt/src/ha_xtsys.h'
--- a/storage/pbxt/src/ha_xtsys.h	2009-08-17 11:12:36 +0000
+++ b/storage/pbxt/src/ha_xtsys.h	2009-11-24 10:55:06 +0000
@@ -30,8 +30,9 @@
 
 #ifdef DRIZZLED
 #include <drizzled/common.h>
-#include <drizzled/handler.h>
+#include <drizzled/handler_structs.h>
 #include <drizzled/current_session.h>
+#include <drizzled/cursor.h>
 #else
 #include "mysql_priv.h"
 #endif

=== modified file 'storage/pbxt/src/heap_xt.cc'
--- a/storage/pbxt/src/heap_xt.cc	2009-08-17 11:12:36 +0000
+++ b/storage/pbxt/src/heap_xt.cc	2009-11-24 10:55:06 +0000
@@ -73,7 +73,7 @@ xtPublic void xt_check_heap(XTThreadPtr 
 }
 
 #ifdef DEBUG_MEMORY
-xtPublic void xt_mm_heap_reference(XTThreadPtr self, XTHeapPtr hp, u_int line, c_char *file)
+xtPublic void xt_mm_heap_reference(XTThreadPtr XT_UNUSED(self), XTHeapPtr hp, u_int line, c_char *file)
 #else
 xtPublic void xt_heap_reference(XTThreadPtr, XTHeapPtr hp)
 #endif

=== modified file 'storage/pbxt/src/index_xt.cc'
--- a/storage/pbxt/src/index_xt.cc	2009-08-18 07:46:53 +0000
+++ b/storage/pbxt/src/index_xt.cc	2009-11-24 10:55:06 +0000
@@ -829,14 +829,25 @@ static void idx_next_branch_item(XTTable
 
 	result->sr_item.i_item_offset += result->sr_item.i_item_size + result->sr_item.i_node_ref_size;
 	bitem = branch->tb_data + result->sr_item.i_item_offset;
-	if (ind->mi_fix_key)
-		ilen = result->sr_item.i_item_size;
+	if (result->sr_item.i_item_offset < result->sr_item.i_total_size) {
+		if (ind->mi_fix_key)
+			ilen = result->sr_item.i_item_size;
+		else {
+			ilen = myxt_get_key_length(ind, bitem) + XT_RECORD_REF_SIZE;
+			result->sr_item.i_item_size = ilen;
+		}
+		xt_get_res_record_ref(bitem + ilen - XT_RECORD_REF_SIZE, result); /* (Only valid if i_item_offset < i_total_size) */
+	}
 	else {
-		ilen = myxt_get_key_length(ind, bitem) + XT_RECORD_REF_SIZE;
-		result->sr_item.i_item_size = ilen;
+		result->sr_item.i_item_size = 0;
+		result->sr_rec_id = 0;
+		result->sr_row_id = 0;
 	}
-	xt_get_res_record_ref(bitem + ilen - XT_RECORD_REF_SIZE, result); /* (Only valid if i_item_offset < i_total_size) */
-	result->sr_branch = IDX_GET_NODE_REF(tab, bitem, result->sr_item.i_node_ref_size);
+	if (result->sr_item.i_node_ref_size)
+		/* IDX_GET_NODE_REF() loads the branch reference to the LEFT of the item. */
+		result->sr_branch = IDX_GET_NODE_REF(tab, bitem, result->sr_item.i_node_ref_size);
+	else
+		result->sr_branch = 0;
 }
 
 xtPublic void xt_prev_branch_item_fix(XTTableHPtr XT_UNUSED(tab), XTIndexPtr XT_UNUSED(ind), XTIdxBranchDPtr branch, register XTIdxResultRec *result)
@@ -3987,7 +3998,7 @@ xtPublic xtBool xt_flush_indices(XTOpenT
 		 * here.
 		 */
 		if (!(tab->tab_dic.dic_tab_flags & XT_TAB_FLAGS_TEMP_TAB)) {
-			if (!xt_xlog_flush_log(ot->ot_thread))
+			if (!xt_xlog_flush_log(tab->tab_db, ot->ot_thread))
 				goto failed_2;
 			if (!il->il_flush(ot))
 				goto failed_2;

=== modified file 'storage/pbxt/src/lock_xt.cc'
--- a/storage/pbxt/src/lock_xt.cc	2009-08-17 11:12:36 +0000
+++ b/storage/pbxt/src/lock_xt.cc	2009-11-24 10:55:06 +0000
@@ -1246,7 +1246,7 @@ xtPublic void xt_spinlock_init(XTThreadP
 	(void) self;
 	spl->spl_lock = 0;
 #ifdef XT_NO_ATOMICS
-	xt_init_mutex(self, &spl->spl_mutex);
+	xt_init_mutex_with_autoname(self, &spl->spl_mutex);
 #endif
 #ifdef DEBUG
 	spl->spl_locker = 0;

=== modified file 'storage/pbxt/src/locklist_xt.h'
--- a/storage/pbxt/src/locklist_xt.h	2009-08-17 11:12:36 +0000
+++ b/storage/pbxt/src/locklist_xt.h	2009-11-27 15:37:02 +0000
@@ -24,11 +24,16 @@
 #ifndef __xt_locklist_h__
 #define __xt_locklist_h__
 
+/*
+ * XT_THREAD_LOCK_INFO and DEBUG_LOCKING code must be updated to avoid calls to xt_get_self() as it can be called before hton->slot is
+ * assigned by MySQL which is used by xt_get_self()
+ */
+
 #ifdef DEBUG
-#define XT_THREAD_LOCK_INFO
+//#define XT_THREAD_LOCK_INFO
 #ifndef XT_WIN
 /* We need DEBUG_LOCKING in order to enable pthread function wrappers */
-#define DEBUG_LOCKING
+//#define DEBUG_LOCKING
 #endif
 #endif
 

=== modified file 'storage/pbxt/src/memory_xt.cc'
--- a/storage/pbxt/src/memory_xt.cc	2009-09-03 06:15:03 +0000
+++ b/storage/pbxt/src/memory_xt.cc	2009-11-25 15:06:47 +0000
@@ -34,7 +34,7 @@
 #include "trace_xt.h"
 
 #ifdef DEBUG
-//#define RECORD_MM
+#define RECORD_MM
 #endif
 
 #ifdef DEBUG
@@ -367,9 +367,8 @@ static long mm_find_pointer(void *ptr)
 	return(-1);
 }
 
-static long mm_add_pointer(void *ptr, u_int id)
+static long mm_add_pointer(void *ptr, u_int XT_UNUSED(id))
 {
-#pragma unused(id)
 	register int	i, n, guess;
 
 	if (mm_nr_in_use == mm_total_allocated) {

=== modified file 'storage/pbxt/src/myxt_xt.cc'
--- a/storage/pbxt/src/myxt_xt.cc	2009-09-03 06:15:03 +0000
+++ b/storage/pbxt/src/myxt_xt.cc	2009-12-01 09:50:46 +0000
@@ -36,7 +36,7 @@
 #include <drizzled/current_session.h>
 #include <drizzled/sql_lex.h>
 #include <drizzled/session.h>
-extern "C" struct charset_info_st *session_charset(Session *session);
+//extern "C" struct charset_info_st *session_charset(Session *session);
 extern pthread_key_t THR_Session;
 #else
 #include "mysql_priv.h"
@@ -171,7 +171,9 @@ xtPublic u_int myxt_create_key_from_key(
 
 	for (u_int i=0; i<ind->mi_seg_count && (int) k_length > 0; i++, old += keyseg->length, keyseg++)
 	{
+#ifndef DRIZZLED
 		enum ha_base_keytype	type = (enum ha_base_keytype) keyseg->type;
+#endif
 		u_int					length = keyseg->length < k_length ? keyseg->length : k_length;
 		u_int					char_length;
 		xtWord1					*pos;
@@ -192,14 +194,18 @@ xtPublic u_int myxt_create_key_from_key(
 		pos = old;
 		if (keyseg->flag & HA_SPACE_PACK) {
 			uchar *end = pos + length;
+#ifndef DRIZZLED
 			if (type != HA_KEYTYPE_NUM) {
+#endif
 				while (end > pos && end[-1] == ' ')
 					end--;
+#ifndef DRIZZLED
 			}
 			else {
 				while (pos < end && pos[0] == ' ')
 					pos++;
 			}
+#endif
 			k_length -= length;
 			length = (u_int) (end-pos);
 			FIX_LENGTH(cs, pos, length, char_length);
@@ -276,6 +282,7 @@ xtPublic u_int myxt_create_key_from_row(
 		char_length= ((cs && cs->mbmaxlen > 1) ? length/cs->mbmaxlen : length);
 
 		pos = record + keyseg->start;
+#ifndef DRIZZLED
 		if (type == HA_KEYTYPE_BIT)
 		{
 			if (keyseg->bit_length)
@@ -289,17 +296,22 @@ xtPublic u_int myxt_create_key_from_row(
 			key+= length;
 			continue;
 		}
+#endif
 		if (keyseg->flag & HA_SPACE_PACK)
 		{
 			end = pos + length;
+#ifndef DRIZZLED
 			if (type != HA_KEYTYPE_NUM) {
+#endif
 				while (end > pos && end[-1] == ' ')
 					end--;
+#ifndef DRIZZLED
 			}
 			else {
 				while (pos < end && pos[0] == ' ')
 					pos++;
 			}
+#endif
 			length = (u_int) (end-pos);
 			FIX_LENGTH(cs, pos, length, char_length);
 			store_key_length_inc(key,char_length);
@@ -333,6 +345,7 @@ xtPublic u_int myxt_create_key_from_row(
 		if (keyseg->flag & HA_SWAP_KEY)
 		{						/* Numerical column */
 #ifdef HAVE_ISNAN
+#ifndef DRIZZLED
 			if (type == HA_KEYTYPE_FLOAT)
 			{
 				float nr;
@@ -345,7 +358,9 @@ xtPublic u_int myxt_create_key_from_row(
 					continue;
 				}
 			}
-			else if (type == HA_KEYTYPE_DOUBLE) {
+			else 
+#endif			
+			if (type == HA_KEYTYPE_DOUBLE) {
 				double nr;
 
 				float8get(nr,pos);
@@ -414,6 +429,7 @@ xtPublic u_int myxt_create_foreign_key_f
 		char_length= ((cs && cs->mbmaxlen > 1) ? length/cs->mbmaxlen : length);
 
 		pos = record + keyseg->start;
+#ifndef DRIZZLED
 		if (type == HA_KEYTYPE_BIT)
 		{
 			if (keyseg->bit_length)
@@ -427,17 +443,22 @@ xtPublic u_int myxt_create_foreign_key_f
 			key+= length;
 			continue;
 		}
+#endif
 		if (keyseg->flag & HA_SPACE_PACK)
 		{
 			end = pos + length;
+#ifndef DRIZZLED
 			if (type != HA_KEYTYPE_NUM) {
+#endif
 				while (end > pos && end[-1] == ' ')
 					end--;
+#ifndef DRIZZLED
 			}
 			else {
 				while (pos < end && pos[0] == ' ')
 					pos++;
 			}
+#endif
 			length = (u_int) (end-pos);
 			FIX_LENGTH(cs, pos, length, char_length);
 			store_key_length_inc(key,char_length);
@@ -471,6 +492,7 @@ xtPublic u_int myxt_create_foreign_key_f
 		if (keyseg->flag & HA_SWAP_KEY)
 		{						/* Numerical column */
 #ifdef HAVE_ISNAN
+#ifndef DRIZZLED
 			if (type == HA_KEYTYPE_FLOAT)
 			{
 				float nr;
@@ -483,7 +505,9 @@ xtPublic u_int myxt_create_foreign_key_f
 					continue;
 				}
 			}
-			else if (type == HA_KEYTYPE_DOUBLE) {
+			else 
+#endif
+			if (type == HA_KEYTYPE_DOUBLE) {
 				double nr;
 
 				float8get(nr,pos);
@@ -622,7 +646,6 @@ static char *mx_get_length_and_data(Fiel
 		case MYSQL_TYPE_SET:
 		case MYSQL_TYPE_GEOMETRY:
 #else
-		case DRIZZLE_TYPE_TINY:
 		case DRIZZLE_TYPE_LONG:
 		case DRIZZLE_TYPE_DOUBLE:
 		case DRIZZLE_TYPE_NULL:
@@ -740,7 +763,6 @@ static void mx_set_length_and_data(Field
 		case MYSQL_TYPE_SET:
 		case MYSQL_TYPE_GEOMETRY:
 #else
-		case DRIZZLE_TYPE_TINY:
 		case DRIZZLE_TYPE_LONG:
 		case DRIZZLE_TYPE_DOUBLE:
 		case DRIZZLE_TYPE_NULL:
@@ -825,6 +847,7 @@ xtPublic xtBool myxt_create_row_from_key
 			}
 			record[keyseg->null_pos] &= ~keyseg->null_bit;
 		}
+#ifndef DRIZZLED
 		if (keyseg->type == HA_KEYTYPE_BIT)
 		{
 			uint length = keyseg->length;
@@ -845,6 +868,7 @@ xtPublic xtBool myxt_create_row_from_key
 			key+= length;
 			continue;
 		}
+#endif
 		if (keyseg->flag & HA_SPACE_PACK)
 		{
 			uint length;
@@ -854,16 +878,20 @@ xtPublic xtBool myxt_create_row_from_key
 				goto err;
 #endif
 			pos = record+keyseg->start;
+#ifndef DRIZZLED
 			if (keyseg->type != (int) HA_KEYTYPE_NUM)
 			{
+#endif
 				memcpy(pos,key,(size_t) length);
 				bfill(pos+length,keyseg->length-length,' ');
+#ifndef DRIZZLED
 			}
 			else
 			{
 				bfill(pos,keyseg->length-length,' ');
 				memcpy(pos+keyseg->length-length,key,(size_t) length);
 			}
+#endif
 			key+=length;
 			continue;
 		}
@@ -945,7 +973,7 @@ xtPublic xtBool myxt_create_row_from_key
 static int my_compare_bin(uchar *a, uint a_length, uchar *b, uint b_length,
 											 my_bool part_key, my_bool skip_end_space)
 {
-	uint length= min(a_length,b_length);
+	uint length= a_length < b_length ? a_length : b_length;
 	uchar *end= a+ length;
 	int flag;
 
@@ -1023,6 +1051,7 @@ xtPublic u_int myxt_get_key_length(XTInd
 				get_key_pack_length(seg_len, pack_len, key_data);
 				key_data += seg_len;
 				break;
+#ifndef DRIZZLED
 			case HA_KEYTYPE_NUM: {
 				/* Numeric key */
 				if (keyseg->flag & HA_SPACE_PACK)
@@ -1035,15 +1064,16 @@ xtPublic u_int myxt_get_key_length(XTInd
 			case HA_KEYTYPE_INT8:
 			case HA_KEYTYPE_SHORT_INT:
 			case HA_KEYTYPE_USHORT_INT:
+			case HA_KEYTYPE_INT24:
+			case HA_KEYTYPE_FLOAT:
+			case HA_KEYTYPE_BIT:
+#endif
 			case HA_KEYTYPE_LONG_INT:
 			case HA_KEYTYPE_ULONG_INT:
-			case HA_KEYTYPE_INT24:
 			case HA_KEYTYPE_UINT24:
-			case HA_KEYTYPE_FLOAT:
 			case HA_KEYTYPE_DOUBLE:
 			case HA_KEYTYPE_LONGLONG:
 			case HA_KEYTYPE_ULONGLONG:
-			case HA_KEYTYPE_BIT:
 				key_data += keyseg->length;
 				break;
 			case HA_KEYTYPE_END:
@@ -1190,6 +1220,7 @@ xtPublic int myxt_compare_key(XTIndexPtr
 				b += b_length;
 				break;
 			}
+#ifndef DRIZZLED
 			case HA_KEYTYPE_INT8:
 			{
 				int i_1 = (int) *((signed char *) a);
@@ -1218,6 +1249,7 @@ xtPublic int myxt_compare_key(XTIndexPtr
 				b += keyseg->length;
 				break;
 			}
+#endif
 			case HA_KEYTYPE_LONG_INT: {
 				int32 l_1 = sint4korr(a);
 				int32 l_2 = sint4korr(b);
@@ -1236,6 +1268,7 @@ xtPublic int myxt_compare_key(XTIndexPtr
 				b += keyseg->length;
 				break;
 			}
+#ifndef DRIZZLED
 			case HA_KEYTYPE_INT24: {
 				int32 l_1 = sint3korr(a);
 				int32 l_2 = sint3korr(b);
@@ -1245,6 +1278,7 @@ xtPublic int myxt_compare_key(XTIndexPtr
 				b += keyseg->length;
 				break;
 			}
+#endif
 			case HA_KEYTYPE_UINT24: {
 				int32 l_1 = uint3korr(a);
 				int32 l_2 = uint3korr(b);
@@ -1254,6 +1288,7 @@ xtPublic int myxt_compare_key(XTIndexPtr
 				b += keyseg->length;
 				break;
 			}
+#ifndef DRIZZLED
 			case HA_KEYTYPE_FLOAT: {
 				float f_1, f_2;
 
@@ -1270,6 +1305,7 @@ xtPublic int myxt_compare_key(XTIndexPtr
 				b += keyseg->length;
 				break;
 			}
+#endif
 			case HA_KEYTYPE_DOUBLE: {
 				double d_1, d_2;
 
@@ -1286,6 +1322,7 @@ xtPublic int myxt_compare_key(XTIndexPtr
 				b += keyseg->length;
 				break;
 			}
+#ifndef DRIZZLED
 			case HA_KEYTYPE_NUM: {
 				/* Numeric key */
 				if (keyseg->flag & HA_SPACE_PACK) {
@@ -1339,6 +1376,7 @@ xtPublic int myxt_compare_key(XTIndexPtr
 				b += b_length;
 				break;
 			}
+#endif
 #ifdef HAVE_LONG_LONG
 			case HA_KEYTYPE_LONGLONG: {
 				longlong ll_a = sint8korr(a);
@@ -1359,9 +1397,11 @@ xtPublic int myxt_compare_key(XTIndexPtr
 				break;
 			}
 #endif
+#ifndef DRIZZLED
 			case HA_KEYTYPE_BIT:
 				/* TODO: What here? */
 				break;
+#endif
 			case HA_KEYTYPE_END:												/* Ready */
 				goto end;
 		}
@@ -1410,16 +1450,19 @@ xtPublic u_int myxt_key_seg_length(XTInd
 			key_length = has_null + a_length + pack_len;
 			break;
 		}
+#ifndef DRIZZLED
 		case HA_KEYTYPE_INT8:
 		case HA_KEYTYPE_SHORT_INT:
 		case HA_KEYTYPE_USHORT_INT:
+		case HA_KEYTYPE_INT24:
+		case HA_KEYTYPE_FLOAT:
+#endif		
 		case HA_KEYTYPE_LONG_INT:
 		case HA_KEYTYPE_ULONG_INT:
-		case HA_KEYTYPE_INT24:
 		case HA_KEYTYPE_UINT24:
-		case HA_KEYTYPE_FLOAT:
 		case HA_KEYTYPE_DOUBLE:
 			break;
+#ifndef DRIZZLED
 		case HA_KEYTYPE_NUM: {
 			/* Numeric key */
 			if (keyseg->flag & HA_SPACE_PACK) {
@@ -1428,14 +1471,17 @@ xtPublic u_int myxt_key_seg_length(XTInd
 			}
 			break;
 		}
+#endif
 #ifdef HAVE_LONG_LONG
 		case HA_KEYTYPE_LONGLONG:
 		case HA_KEYTYPE_ULONGLONG:
 			break;
 #endif
+#ifndef DRIZZLED
 		case HA_KEYTYPE_BIT:
 			/* TODO: What here? */
 			break;
+#endif
 		case HA_KEYTYPE_END:												/* Ready */
 			break;
 	}
@@ -1486,7 +1532,7 @@ xtPublic xtWord4 myxt_store_row_length(X
 	return row_size;
 }
 
-static xtWord4 mx_store_row(XTOpenTablePtr ot, xtWord4 row_size, char *rec_buff)
+xtPublic xtWord4 myxt_store_row_data(XTOpenTablePtr ot, xtWord4 row_size, char *rec_buff)
 {
 	TABLE	*table = ot->ot_table->tab_dic.dic_my_table;
 	char	*sdata;
@@ -1614,8 +1660,9 @@ xtPublic size_t myxt_load_row_length(XTO
 }
 
 /* Unload from PBXT variable length format to the MySQL row format. */
-xtPublic xtBool myxt_load_row(XTOpenTablePtr ot, xtWord1 *source_buf, xtWord1 *dest_buff, u_int col_cnt)
+xtPublic xtWord4 myxt_load_row_data(XTOpenTablePtr ot, xtWord1 *source_buf, xtWord1 *dest_buff, u_int col_cnt)
 {
+	xtWord1 *input_buf = source_buf;
 	TABLE	*table;
 	xtWord4	len;
 	Field	*curr_field;
@@ -1624,7 +1671,7 @@ xtPublic xtBool myxt_load_row(XTOpenTabl
 
 	if (!(table = ot->ot_table->tab_dic.dic_my_table)) {
 		xt_register_taberr(XT_REG_CONTEXT, XT_ERR_NO_DICTIONARY, ot->ot_table->tab_name);
-		return FAILED;
+		return 0;
 	}
 
 	/* According to the InnoDB implementation:
@@ -1657,7 +1704,7 @@ xtPublic xtBool myxt_load_row(XTOpenTabl
 			default: // Length byte
 				if (*source_buf > 240) {
 					xt_register_xterr(XT_REG_CONTEXT, XT_ERR_BAD_RECORD_FORMAT);
-					return FAILED;
+					return 0;
 				}
 				len = *source_buf;
 				source_buf++;
@@ -1671,7 +1718,12 @@ xtPublic xtBool myxt_load_row(XTOpenTabl
 
 		source_buf += len;
  	}
-	return OK;
+	return (xtWord4) (source_buf - input_buf);
+}
+
+xtPublic xtBool myxt_load_row(XTOpenTablePtr ot, xtWord1 *source_buf, xtWord1 *dest_buff, u_int col_cnt)
+{
+	return myxt_load_row_data(ot, source_buf, dest_buff, col_cnt) != 0 || (col_cnt == 0 && ot->ot_table->tab_dic.dic_my_table != NULL); /* a 0 byte count is not an error when no columns were requested, unless the dictionary is missing */
 }
 
 xtPublic xtBool myxt_find_column(XTOpenTablePtr ot, u_int *col_idx, const char *col_name)
@@ -1784,7 +1836,7 @@ xtPublic xtBool myxt_store_row(XTOpenTab
 	else {
 		xtWord4 row_size;
 
-		if (!(row_size = mx_store_row(ot, XT_REC_EXT_HEADER_SIZE, rec_buff)))
+		if (!(row_size = myxt_store_row_data(ot, XT_REC_EXT_HEADER_SIZE, rec_buff)))
 			return FAILED;
 		if (row_size - XT_REC_FIX_EXT_HEADER_DIFF <= ot->ot_rec_size) {	
 			rec_info->ri_fix_rec_buf = (XTTabRecFixDPtr) &ot->ot_row_wbuffer[XT_REC_FIX_EXT_HEADER_DIFF];
@@ -1951,7 +2003,7 @@ static TABLE *my_open_table(XTThreadPtr 
 
 #ifdef DRIZZLED
 	share->init(db_name, 0, name, path);
-	if ((error = open_table_def(thd, share)) ||
+	if ((error = open_table_def(*thd, share)) ||
 		(error = open_table_from_share(thd, share, "", 0, (uint32_t) READ_ALL, 0, table, OTM_OPEN)))
 	{
 		xt_free(self, table);
@@ -1995,7 +2047,7 @@ static TABLE *my_open_table(XTThreadPtr 
 		return NULL;
 	}
 
-#if MYSQL_VERSION_ID >= 60003
+#if MYSQL_VERSION_ID >= 50404
 	if ((error = open_table_from_share(thd, share, "", 0, (uint) READ_ALL, 0, table, OTM_OPEN)))
 #else
 	if ((error = open_table_from_share(thd, share, "", 0, (uint) READ_ALL, 0, table, FALSE)))
@@ -2145,7 +2197,10 @@ static XTIndexPtr my_create_index(XTThre
 		if (options & HA_OPTION_PACK_KEYS ||
 			(index->flags & (HA_PACK_KEY | HA_BINARY_PACK_KEY | HA_SPACE_PACK_USED)))
 		{
-			if (key_part->length > 8 && (type == HA_KEYTYPE_TEXT || type == HA_KEYTYPE_NUM ||
+			if (key_part->length > 8 && (type == HA_KEYTYPE_TEXT || 
+#ifndef DRIZZLED
+				type == HA_KEYTYPE_NUM ||
+#endif
 				(type == HA_KEYTYPE_BINARY && !field->zero_pack())))
 			{
 				/* No blobs here */
@@ -2213,8 +2268,12 @@ static XTIndexPtr my_create_index(XTThre
 		else if (field->type() == MYSQL_TYPE_ENUM) {
 			switch (seg->length) {
 				case 2: 
+#ifdef DRIZZLED
+					ASSERT_NS(FALSE);
+#else
 					seg->type = HA_KEYTYPE_USHORT_INT;
 					break;
+#endif
 				case 3:
 					seg->type = HA_KEYTYPE_UINT24;
 					break;
@@ -2675,7 +2734,11 @@ xtPublic xtBool myxt_load_dictionary(XTT
 	if (!(my_tab = my_open_table(self, db, tab_path)))
 		return FAILED;
 	dic->dic_my_table = my_tab;
+#ifdef DRIZZLED
+	dic->dic_def_ave_row_size = (xtWord8) my_tab->s->getAvgRowLength();
+#else
 	dic->dic_def_ave_row_size = (xtWord8) my_tab->s->avg_row_length;
+#endif
 	myxt_setup_dictionary(self, dic);
 	dic->dic_keys = (XTIndexPtr *) xt_calloc(self, sizeof(XTIndexPtr) * TS(my_tab)->keys);
 	for (uint i=0; i<TS(my_tab)->keys; i++)
@@ -2805,8 +2868,10 @@ static void ha_create_dd_index(XTThreadP
 
 static char *my_type_to_string(XTThreadPtr self, Field *field, TABLE *XT_UNUSED(my_tab))
 {
-	char		buffer[MAX_FIELD_WIDTH + 400], *ptr;
+	char		buffer[MAX_FIELD_WIDTH + 400];
+	const char 	*ptr;
 	String		type((char *) buffer, sizeof(buffer), system_charset_info);
+	xtWord4		len;
 
 	/* GOTCHA:
 	 * - Above sets the string length to the same as the buffer,
@@ -2817,10 +2882,17 @@ static char *my_type_to_string(XTThreadP
 	 */
 	type.length(0);
 	field->sql_type(type);
-	ptr = type.c_ptr();
+	ptr = type.ptr();
+	len = type.length();
+
+	if (len >= sizeof(buffer))
+		len = sizeof(buffer)-1;
+
 	if (ptr != buffer)
-		xt_strcpy(sizeof(buffer), buffer, ptr);			
+		memcpy(buffer, ptr, len);	/* ptr() need not be NUL-terminated: copy exactly len bytes */

+	buffer[len] = 0;
+
 	if (field->has_charset()) {
 		/* Always include the charset so that we can compare types
 		 * for FK/PK releations.
@@ -2877,6 +2949,10 @@ xtPublic XTDDTable *myxt_create_table_fr

 xtPublic void myxt_static_convert_identifier(XTThreadPtr XT_UNUSED(self), MX_CHARSET_INFO *cs, char *from, char *to, size_t to_len)
 {
+#ifdef DRIZZLED
+	(void) cs;	/* cs is only needed by the MySQL strconvert() path */
+	xt_strcpy(to_len, to, from);
+#else
 	uint errors;

 	/*
@@ -2888,11 +2964,16 @@ xtPublic void myxt_static_convert_identi
 		xt_strcpy(to_len, to, from);
 	else
 		strconvert(cs, from, &my_charset_utf8_general_ci, to, to_len, &errors);
+#endif
 }
 
 // cs == current_thd->charset()
 xtPublic char *myxt_convert_identifier(XTThreadPtr self, MX_CHARSET_INFO *cs, char *from)
 {
+#ifdef DRIZZLED
+	char *to = xt_dup_string(self, from);
+	(void) cs;	/* Drizzle: the identifier is duplicated as-is, cs is unused */
+#else
 	uint	errors;
 	u_int	len;
 	char	*to;
@@ -2904,6 +2985,7 @@ xtPublic char *myxt_convert_identifier(X
 		to = (char *) xt_malloc(self, len);
 		strconvert(cs, from, &my_charset_utf8_general_ci, to, len, &errors);
 	}
+#endif
 	return to;
 }
 
@@ -2954,11 +3036,19 @@ xtPublic MX_CHARSET_INFO *myxt_getcharse
 		THD *thd = current_thd;
 
 		if (thd)
-			return thd_charset(thd);
+			return (MX_CHARSET_INFO *)thd_charset(thd);
 	}
-	return &my_charset_utf8_general_ci;
+	return (MX_CHARSET_INFO *)&my_charset_utf8_general_ci;
 }
 
+#ifdef DBUG_OFF
+//typedef struct st_plugin_int *plugin_ref;
+#define REF_MYSQL_PLUGIN(x)		(x)
+#else
+//typedef struct st_plugin_int **plugin_ref;
+#define REF_MYSQL_PLUGIN(x)		(*(x))
+#endif
+
 xtPublic void *myxt_create_thread()
 {
 #ifdef DRIZZLED
@@ -3011,12 +3101,22 @@ xtPublic void *myxt_create_thread()
 		return NULL;
 	}
 
-	if (!(new_thd = new THD())) {
+	if (!(new_thd = new THD)) {
 		my_thread_end();
 		xt_register_error(XT_REG_CONTEXT, XT_ERR_MYSQL_ERROR, 0, "Unable to create MySQL thread (THD)");
 		return NULL;
 	}
 
+	/*
+	 * If PBXT is the default storage engine, creating a THD adds a reference to the PBXT
+	 * plugin, which would block normal plugin shutdown and force the server to shut it down.
+	 * Drop that reference here; myxt_destroy_thread() adds it back before deleting the THD.
+	 * As in myxt_destroy_thread(), do nothing if the THD has no table_plugin.
+	 */
+	LEX_STRING *plugin_name = new_thd->variables.table_plugin ? &REF_MYSQL_PLUGIN(new_thd->variables.table_plugin)->name : NULL;
+	if (plugin_name && (plugin_name->length == 4) && (strncmp(plugin_name->str, "PBXT", plugin_name->length) == 0)) {
+		REF_MYSQL_PLUGIN(new_thd->variables.table_plugin)->ref_count--;
+	}
 	new_thd->thread_stack = (char *) &new_thd;
 	new_thd->store_globals();
 	lex_start(new_thd);
@@ -3052,6 +3152,17 @@ xtPublic void myxt_destroy_thread(void *
 	close_thread_tables(thd);
 #endif
 
+	/*
+	 * In myxt_create_thread we decremented plugin ref-count to avoid dead-locking.
+	 * Here we need to increment ref-count to avoid assertion failures.
+	 */
+	if (thd->variables.table_plugin) {
+		LEX_STRING& plugin_name = REF_MYSQL_PLUGIN(thd->variables.table_plugin)->name;
+		if ((plugin_name.length == 4) && (strncmp(plugin_name.str, "PBXT", plugin_name.length) == 0)) {
+			REF_MYSQL_PLUGIN(thd->variables.table_plugin)->ref_count++;
+		}
+	}
+	
 	delete thd;
 
 	/* Remember that we don't have a THD */
@@ -3128,11 +3239,12 @@ xtPublic int myxt_statistics_fill_table(
 	const char		*stat_name;
 	u_llong			stat_value;
 	XTStatisticsRec	statistics;
+	XTDatabaseHPtr	db = self->st_database;	// read once; passed to xt_get_statistic() for every statistic
 
 	xt_gather_statistics(&statistics);
 	for (u_int rec_id=0; !err && rec_id<XT_STAT_CURRENT_MAX; rec_id++) {
 		stat_name = xt_get_stat_meta_data(rec_id)->sm_name;
-		stat_value = xt_get_statistic(&statistics, self->st_database, rec_id);
+		stat_value = xt_get_statistic(&statistics, db, rec_id);
 
 		col=0;
 		mx_put_u_llong(table, col++, rec_id+1);
@@ -3213,19 +3325,31 @@ static void myxt_bitmap_init(XTThreadPtr
 	my_bitmap_map	*buf;
     uint			size_in_bytes = (((n_bits) + 31) / 32) * 4;
 
-    buf = (my_bitmap_map *) xt_malloc(self, size_in_bytes);
+	buf = (my_bitmap_map *) xt_malloc(self, size_in_bytes);
+
+#ifdef DRIZZLED	// under Drizzle, MX_BITMAP is set up and accessed through init()/getBitmap()/setBitmap()
+	map->init(buf, n_bits);
+#else
 	map->bitmap= buf;
 	map->n_bits= n_bits;
 	create_last_word_mask(map);
 	bitmap_clear_all(map);
+#endif
 }
 
 static void myxt_bitmap_free(XTThreadPtr self, MX_BITMAP *map)
 {
+#ifdef DRIZZLED	// release the buffer allocated in myxt_bitmap_init() and clear the pointer
+	my_bitmap_map *buf = map->getBitmap();
+	if (buf)
+		xt_free(self, buf);
+	map->setBitmap(NULL);
+#else
 	if (map->bitmap) {
 		xt_free(self, map->bitmap);
 		map->bitmap = NULL;
 	}
+#endif
 }
 
 /*
@@ -3269,3 +3393,29 @@ XTDDColumn *XTDDColumnFactory::createFro
 	return col;
 }
 
+/*
+ * -----------------------------------------------------------------------
+ * utilities
+ */
+
+/*
+ * MySQL (and possibly Drizzle) calls hton->init before it assigns the plugin the
+ * slot used by xt_get_self(). pbxt_init() starts daemon threads that could use the
+ * slot before it is assigned, so they call this function to wait until it is set
+ * or until the thread is told to quit (self->t_quit). hton->slot cannot be tested
+ * against 0: in some MySQL versions it is 0 before init, which is a valid value.
+ */
+extern ulong total_ha;
+
+xtPublic void myxt_wait_pbxt_plugin_slot_assigned(XTThread *self)
+{
+#ifdef DRIZZLED
+	static LEX_STRING plugin_name = { C_STRING_WITH_LEN("PBXT") };
+
+	while (!self->t_quit && !Registry::singleton().find(&plugin_name))	// poll until the registry finds the "PBXT" plugin
+		xt_sleep_milli_second(1);
+#else
+	while(!self->t_quit && (pbxt_hton->slot >= total_ha))	// NOTE(review): relies on an unassigned slot being >= total_ha
+		xt_sleep_milli_second(1);
+#endif
+}

=== modified file 'storage/pbxt/src/myxt_xt.h'
--- a/storage/pbxt/src/myxt_xt.h	2009-09-03 06:15:03 +0000
+++ b/storage/pbxt/src/myxt_xt.h	2009-11-27 15:37:02 +0000
@@ -52,8 +52,10 @@ void		myxt_set_default_row_from_key(XTOp
 void		myxt_print_key(XTIndexPtr ind, xtWord1 *key_value);
 
 xtWord4		myxt_store_row_length(XTOpenTablePtr ot, char *rec_buff);
+xtWord4		myxt_store_row_data(XTOpenTablePtr ot, xtWord4 row_size, char *rec_buff);
 xtBool		myxt_store_row(XTOpenTablePtr ot, XTTabRecInfoPtr rec_info, char *rec_buff);
 size_t		myxt_load_row_length(XTOpenTablePtr ot, size_t buffer_size, xtWord1 *source_buf, u_int *ret_col_cnt);
+xtWord4		myxt_load_row_data(XTOpenTablePtr ot, xtWord1 *source_buf, xtWord1 *dest_buff, u_int col_cnt);
 xtBool		myxt_load_row(XTOpenTablePtr ot, xtWord1 *source_buf, xtWord1 *dest_buff, u_int col_cnt);
 xtBool		myxt_find_column(XTOpenTablePtr ot, u_int *col_idx, const char *col_name);
 void		myxt_get_column_name(XTOpenTablePtr ot, u_int col_idx, u_int len, char *col_name);
@@ -93,4 +95,6 @@ public:
 	static XTDDColumn *createFromMySQLField(XTThread *self, STRUCT_TABLE *, Field *);
 };
 
+void myxt_wait_pbxt_plugin_slot_assigned(XTThread *self);	// blocks until PBXT's plugin slot is assigned or self->t_quit is set
+
 #endif

=== modified file 'storage/pbxt/src/pbms.h'
--- a/storage/pbxt/src/pbms.h	2009-08-18 07:46:53 +0000
+++ b/storage/pbxt/src/pbms.h	2009-11-24 10:55:06 +0000
@@ -344,16 +344,16 @@ public:
 	int couldBeURL(char *blob_url, int size)
 	{
 		if (blob_url && (size < PBMS_BLOB_URL_SIZE)) {
-			char			buffer[PBMS_BLOB_URL_SIZE+1];
-			u_int32_t		db_id = 0;
-			u_int32_t		tab_id = 0;
-			u_int64_t		blob_id = 0;
-			u_int64_t		blob_ref_id = 0;
-			u_int64_t		blob_size = 0;
-			u_int32_t		auth_code = 0;
-			u_int32_t		server_id = 0;
-			char		type, junk[5];
-			int			scanned;
+			char				buffer[PBMS_BLOB_URL_SIZE+1];
+			unsigned long		db_id = 0;
+			unsigned long		tab_id = 0;
+			unsigned long long	blob_id = 0;
+			unsigned long long	blob_ref_id = 0;
+			unsigned long long	blob_size = 0;
+			unsigned long		auth_code = 0;
+			unsigned long		server_id = 0;
+			char				type, junk[5] = {0};	// all zero: the "Bad URL" printf reads junk[0..3] even when sscanf does not fill junk
+			int					scanned;
 
 			junk[0] = 0;
 			if (blob_url[size]) { // There is no guarantee that the URL will be null terminated.
@@ -364,12 +364,12 @@ public:
 			
 			scanned = sscanf(blob_url, URL_FMT"%4s", &db_id, &type, &tab_id, &blob_id, &auth_code, &server_id, &blob_ref_id, &blob_size, junk);
 			if (scanned != 8) {// If junk is found at the end this will also result in an invalid URL. 
-		printf("Bad URL \"%s\": scanned = %d, junk: %d, %d, %d, %d\n", blob_url, scanned, junk[0], junk[1], junk[2], junk[3]); 
+				printf("Bad URL \"%s\": scanned = %d, junk: %d, %d, %d, %d\n", blob_url, scanned, junk[0], junk[1], junk[2], junk[3]); 
 				return 0;
 			}
 			
 			if (junk[0] || (type != '~' && type != '_')) {
-		printf("Bad URL \"%s\": scanned = %d, junk: %d, %d, %d, %d\n", blob_url, scanned, junk[0], junk[1], junk[2], junk[3]); 
+				printf("Bad URL \"%s\": scanned = %d, junk: %d, %d, %d, %d\n", blob_url, scanned, junk[0], junk[1], junk[2], junk[3]); 
 				return 0;
 			}
 		

=== modified file 'storage/pbxt/src/pbms_enabled.cc'
--- a/storage/pbxt/src/pbms_enabled.cc	2009-10-06 15:16:01 +0000
+++ b/storage/pbxt/src/pbms_enabled.cc	2009-11-24 10:55:06 +0000
@@ -29,15 +29,10 @@
  *
  */
 
-/*
-  The following two lines backported by psergey. Remove them when we merge from PBXT again.
-*/
 #include "xt_config.h"
-#ifdef PBMS_ENABLED
 
-#define PBMS_API	pbms_enabled_api
+#ifdef PBMS_ENABLED
 
-#include "pbms_enabled.h"
 #ifdef DRIZZLED
 #include <sys/stat.h>
 #include <drizzled/common_includes.h>
@@ -47,11 +42,15 @@
 #include <mysql/plugin.h>
 #define session_alloc(sess, size) thd_alloc(sess, size);
 #define current_session current_thd
-#endif 
+#endif // DRIZZLED
 
-#define GET_BLOB_FIELD(t, i) (Field_blob *)(t->field[t->s->blob_field[i]])
-#define DB_NAME(f) (f->table->s->db.str)
-#define TAB_NAME(f) (*(f->table_name))
+#define GET_BLOB_FIELD(t, i)	((Field_blob *)((t)->field[(t)->s->blob_field[i]]))	// fully parenthesised: the cast applies before any trailing ->member
+#define DB_NAME(f)				((f)->table->s->db.str)
+#define TAB_NAME(f)				(*((f)->table_name))
+
+#define PBMS_API	pbms_enabled_api	// NOTE(review): presumably must be defined before pbms_enabled.h is included
+
+#include "pbms_enabled.h"
 
 static PBMS_API pbms_api;
 
@@ -242,4 +241,4 @@ void pbms_completed(TABLE *table, bool o
 	 return ;
 }
 
-#endif
\ No newline at end of file
+#endif // PBMS_ENABLED

=== modified file 'storage/pbxt/src/pbms_enabled.h'
--- a/storage/pbxt/src/pbms_enabled.h	2009-08-18 07:46:53 +0000
+++ b/storage/pbxt/src/pbms_enabled.h	2009-11-24 10:55:06 +0000
@@ -35,13 +35,6 @@
 
 #include "pbms.h"
 
-#ifdef DRIZZLED
-#include <drizzled/server_includes.h>
-#define TABLE Table
-#else
-#include <mysql_priv.h>
-#endif
-
 /*
  * pbms_initialize() should be called from the engines plugIn's 'init()' function.
  * The engine_name is the name of your engine, "PBXT" or "InnoDB" for example.

=== modified file 'storage/pbxt/src/pthread_xt.cc'
--- a/storage/pbxt/src/pthread_xt.cc	2009-08-18 07:46:53 +0000
+++ b/storage/pbxt/src/pthread_xt.cc	2009-11-24 10:55:06 +0000
@@ -578,8 +578,8 @@ xtPublic int xt_p_mutex_unlock(xt_mutex_
 
 xtPublic int xt_p_mutex_destroy(xt_mutex_type *mutex)
 {
-	ASSERT_NS(mutex->mu_init == 12345);
-	mutex->mu_init = 89898;
+	//ASSERT_NS(mutex->mu_init == 12345);	NOTE(review): this check was disabled by this patch; the reason is not recorded here
+	mutex->mu_init = 11111;	// marker written on destroy (12345 presumably marks an initialised mutex)
 #ifdef XT_THREAD_LOCK_INFO
 	xt_thread_lock_info_free(&mutex->mu_lock_info);
 #endif

=== modified file 'storage/pbxt/src/restart_xt.cc'
--- a/storage/pbxt/src/restart_xt.cc	2009-09-03 06:15:03 +0000
+++ b/storage/pbxt/src/restart_xt.cc	2009-11-27 15:37:02 +0000
@@ -34,12 +34,16 @@
 
 #include "ha_pbxt.h"
 
+#ifdef DRIZZLED
+#include <drizzled/data_home.h>
+using drizzled::plugin::Registry;
+#endif
+
 #include "xactlog_xt.h"
 #include "database_xt.h"
 #include "util_xt.h"
 #include "strutil_xt.h"
 #include "filesys_xt.h"
-#include "restart_xt.h"
 #include "myxt_xt.h"
 #include "trace_xt.h"
 
@@ -57,13 +61,27 @@
 //#define PRINTF		xt_ftracef
 //#define PRINTF		xt_trace
 
-void xt_print_bytes(xtWord1 *buf, u_int len)
+/*
+ * -----------------------------------------------------------------------
+ * GLOBALS
+ */
+
+xtPublic int				pbxt_recovery_state;	// XT_RECOVER_PENDING/DONE/SWEPT (restart_xt.h); set on recovery start and by the recovery thread
+
+/*
+ * -----------------------------------------------------------------------
+ * UTILITIES
+ */
+
+#ifdef TRACE_RECORD_DATA	// xt_print_bytes() is only compiled when TRACE_RECORD_DATA is defined
+static void xt_print_bytes(xtWord1 *buf, u_int len)
 {
 	for (u_int i=0; i<len; i++) {
 		PRINTF("%02x ", (u_int) *buf);
 		buf++;
 	}
 }
+#endif
 
 void xt_print_log_record(xtLogID log, xtLogOffset offset, XTXactLogBufferDPtr record)
 {
@@ -252,7 +270,7 @@ void xt_print_log_record(xtLogID log, xt
 			rec_type = "DELETE";
 			break;
 		case XT_LOG_ENT_DELETE_FL:
-			rec_type = "DELETE-FL-BG";
+			rec_type = "DELETE-FL";
 			break;
 		case XT_LOG_ENT_UPDATE_BG:
 			rec_type = "UPDATE-BG";
@@ -320,6 +338,11 @@ void xt_print_log_record(xtLogID log, xt
 		case XT_LOG_ENT_END_OF_LOG:
 			rec_type = "END OF LOG";
 			break;
+		case XT_LOG_ENT_PREPARE:	// XA prepare record: also records its transaction ID (xn_id/xn_set)
+			rec_type = "PREPARE";
+			xn_id = XT_GET_DISK_4(record->xp.xp_xact_id_4);
+			xn_set = TRUE;
+			break;
 	}
 
 	if (log)
@@ -327,6 +350,8 @@ void xt_print_log_record(xtLogID log, xt
 	PRINTF("%s ", rec_type);
 	if (type)
 		PRINTF("op=%lu tab=%lu %s=%lu ", (u_long) op_no, (u_long) tab_id, type, (u_long) rec_id);
+	else if (tab_id)	// no operation details, but still show the table
+		PRINTF("tab=%lu ", (u_long) tab_id);
 	if (row_id)
 		PRINTF("row=%lu ", (u_long) row_id);
 	if (log_id)
@@ -459,6 +484,7 @@ static xtBool xres_open_table(XTThreadPt
 		}
 		return OK;
 	}
+
 	ws->ws_tab_gone = tab_id;
 	return FAILED;
 }
@@ -629,6 +655,7 @@ static void xres_apply_change(XTThreadPt
 	xtWord1				*rec_data = NULL;
 	XTTabRecFreeDPtr	free_data;
 
+	ASSERT(ot->ot_thread == self);	// the open table must belong to the calling thread
 	if (tab->tab_dic.dic_key_count == 0)
 		check_index = FALSE;
 
@@ -1300,7 +1327,7 @@ static void xres_apply_operations(XTThre
  * These operations are applied even though operations
  * in sequence are missing.
  */
-xtBool xres_sync_operations(XTThreadPtr self, XTDatabaseHPtr db, XTWriterStatePtr ws)
+static xtBool xres_sync_operations(XTThreadPtr self, XTDatabaseHPtr db, XTWriterStatePtr ws)
 {
 	u_int			edx;
 	XTTableEntryPtr	te_ptr;
@@ -1881,15 +1908,6 @@ xtBool XTXactRestart::xres_check_checksu
 void XTXactRestart::xres_recover_progress(XTThreadPtr self, XTOpenFilePtr *of, int perc)
 {
 #ifdef XT_USE_GLOBAL_DB
-	if (!perc) {
-		char file_path[PATH_MAX];
-
-		xt_strcpy(PATH_MAX, file_path, xres_db->db_main_path);
-		xt_add_pbxt_file(PATH_MAX, file_path, "recovery-progress");
-		*of = xt_open_file(self, file_path, XT_FS_CREATE | XT_FS_MAKE_PATH);
-		xt_set_eof_file(self, *of, 0);
-	}
-
 	if (perc > 100) {
 		char file_path[PATH_MAX];
 
@@ -1905,6 +1923,15 @@ void XTXactRestart::xres_recover_progres
 	else {
 		char number[40];
 
+		if (!*of) {	// first progress report: create the recovery-progress file now and set its EOF to 0
+			char file_path[PATH_MAX];
+
+			xt_strcpy(PATH_MAX, file_path, xres_db->db_main_path);
+			xt_add_pbxt_file(PATH_MAX, file_path, "recovery-progress");
+			*of = xt_open_file(self, file_path, XT_FS_CREATE | XT_FS_MAKE_PATH);
+			xt_set_eof_file(self, *of, 0);
+		}
+
 		sprintf(number, "%d", perc);
 		if (!xt_pwrite_file(*of, 0, strlen(number), number, &self->st_statistics.st_x, self))
 			xt_throw(self);
@@ -1927,10 +1954,11 @@ xtBool XTXactRestart::xres_restart(XTThr
 	off_t					bytes_to_read;
 	volatile xtBool			print_progress = FALSE;
 	volatile off_t			perc_size = 0, next_goal = 0;
-	int						perc_complete = 1;
+	int						perc_complete = 1, perc_to_write = 1;
 	XTOpenFilePtr			progress_file = NULL;
 	xtBool					min_ram_xn_id_set = FALSE;
 	u_int					log_count;
+	time_t					start_time;
 
 	memset(&ws, 0, sizeof(ws));
 
@@ -1955,12 +1983,11 @@ xtBool XTXactRestart::xres_restart(XTThr
 	/* Don't print anything about recovering an empty database: */
 	if (bytes_to_read != 0)
 		xt_logf(XT_NT_INFO, "PBXT: Recovering from %lu-%llu, bytes to read: %llu\n", (u_long) xres_cp_log_id, (u_llong) xres_cp_log_offset, (u_llong) bytes_to_read);
-	if (bytes_to_read >= 10*1024*1024) {
-		print_progress = TRUE;
-		perc_size = bytes_to_read / 100;
-		next_goal = perc_size;
-		xres_recover_progress(self, &progress_file, 0);
-	}
+	/* "/ 100" alone is 0 for fewer than 100 bytes and the progress loop would then never advance next_goal (endless loop); "+ 1" avoids that and keeps percentages <= 100 while bytes_read <= bytes_to_read. */
+	print_progress = FALSE;
+	start_time = time(NULL);
+	perc_size = bytes_to_read / 100 + 1;
+	next_goal = perc_size;
 
 	if (!db->db_xlog.xlog_seq_start(&ws.ws_seqread, xres_cp_log_id, xres_cp_log_offset, FALSE)) {
 		ok = FALSE;
@@ -1983,17 +2010,28 @@ xtBool XTXactRestart::xres_restart(XTThr
 #ifdef PRINT_LOG_ON_RECOVERY
 			xt_print_log_record(ws.ws_seqread.xseq_rec_log_id, ws.ws_seqread.xseq_rec_log_offset, record);
 #endif
-			if (print_progress && bytes_read > next_goal) {
-				if (((perc_complete - 1) % 25) == 0)
-					xt_logf(XT_NT_INFO, "PBXT: ");
-				if ((perc_complete % 25) == 0)
-					xt_logf(XT_NT_INFO, "%2d\n", (int) perc_complete);
-				else
-					xt_logf(XT_NT_INFO, "%2d ", (int) perc_complete);
-				xt_log_flush(self);
-				xres_recover_progress(self, &progress_file, perc_complete);
-				next_goal += perc_size;
-				perc_complete++;
+			if (bytes_read >= next_goal) {
+				while (bytes_read >= next_goal) {	// count every perc_size step crossed so far
+					next_goal += perc_size;
+					perc_complete++;
+				}
+				if (!print_progress) {	// only start reporting once recovery has run for more than 2 seconds
+					if (time(NULL) - start_time > 2)
+						print_progress = TRUE;
+				}
+				if (print_progress) {
+					while (perc_to_write < perc_complete) {	// report every percentage not yet written
+						if (((perc_to_write - 1) % 25) == 0)
+							xt_logf(XT_NT_INFO, "PBXT: ");
+						if ((perc_to_write % 25) == 0)
+							xt_logf(XT_NT_INFO, "%2d\n", (int) perc_to_write);
+						else
+							xt_logf(XT_NT_INFO, "%2d ", (int) perc_to_write);
+						xt_log_flush(self);
+						xres_recover_progress(self, &progress_file, perc_to_write);
+						perc_to_write++;
+					}
+				}
 			}
 			switch (record->xl.xl_status_1) {
 				case XT_LOG_ENT_HEADER:
@@ -2053,8 +2091,11 @@ xtBool XTXactRestart::xres_restart(XTThr
 						xact->xd_end_xn_id = xn_id;
 						xact->xd_flags |= XT_XN_XAC_ENDED | XT_XN_XAC_SWEEP;
 						xact->xd_flags &= ~XT_XN_XAC_RECOVERED; // We can expect an end record on cleanup!
+						xact->xd_flags &= ~XT_XN_XAC_PREPARED;  // Prepared transactions cannot be swept!
 						if (record->xl.xl_status_1 == XT_LOG_ENT_COMMIT)
 							xact->xd_flags |= XT_XN_XAC_COMMITTED;
+						if (xt_sl_get_size(db->db_xn_xa_list) > 0)	// transaction has ended: presumably drops any XA data kept for it
+							xt_xn_delete_xa_data_by_xact(db, xn_id, self);
 					}
 					break;
 				case XT_LOG_ENT_CLEANUP:
@@ -2071,6 +2112,14 @@ xtBool XTXactRestart::xres_restart(XTThr
 					rec_log_id = XT_GET_DISK_4(record->xl.xl_log_id_4);
 					xt_dl_set_to_delete(self, db, rec_log_id);
 					break;
+				case XT_LOG_ENT_PREPARE:	// XA PREPARE: mark the transaction prepared and store its XA data
+					xn_id = XT_GET_DISK_4(record->xp.xp_xact_id_4);
+					if ((xact = xt_xn_get_xact(db, xn_id, self))) {
+						xact->xd_flags |= XT_XN_XAC_PREPARED;
+						if (!xt_xn_store_xa_data(db, xn_id, record->xp.xp_xa_len_1, record->xp.xp_xa_data, self))
+							xt_throw(self);
+					}
+					break;
 				default:
 					xt_xres_apply_in_order(self, &ws, ws.ws_seqread.xseq_rec_log_id, ws.ws_seqread.xseq_rec_log_offset, record);
 					break;
@@ -2534,7 +2583,8 @@ static void xres_cp_main(XTThreadPtr sel
 				/* This condition means we could checkpoint: */
 				if (!(xt_sl_get_size(db->db_datalogs.dlc_to_delete) == 0 &&
 					xt_sl_get_size(db->db_datalogs.dlc_deleted) == 0 &&
-					xt_comp_log_pos(log_id, log_offset, db->db_restart.xres_cp_log_id, db->db_restart.xres_cp_log_offset) <= 0))
+					xt_comp_log_pos(log_id, log_offset, db->db_restart.xres_cp_log_id, db->db_restart.xres_cp_log_offset) <= 0) &&
+					xt_sl_get_size(db->db_xn_xa_list) == 0)	// ...and only while no XA transactions are outstanding (xt_end_checkpoint() refuses otherwise)
 					break;
 
 				xres_cp_wait_for_log_writer(self, db, 400);
@@ -2654,7 +2704,7 @@ xtPublic xtBool xt_begin_checkpoint(XTDa
 	 * until they are flushed.
 	 */
 	/* This is an alternative to the above.
-	if (!xt_xlog_flush_log(self))
+	if (!xt_xlog_flush_log(db, self))
 		xt_throw(self);
 	*/
 	xt_lock_mutex_ns(&db->db_wr_lock);
@@ -2776,6 +2826,14 @@ xtPublic xtBool xt_end_checkpoint(XTData
 	size_t					chk_size = 0; 
 	u_int					no_of_logs = 0; 
 
+	/* As long as we have outstanding XA transactions, we may not checkpoint; report success without checkpointing. */
+	if (xt_sl_get_size(db->db_xn_xa_list) > 0) {
+#ifdef DEBUG
+		printf("Checkpoint must wait\n");
+#endif
+		return OK;
+	}
+
 #ifdef NEVER_CHECKPOINT
 	return OK;
 #endif
@@ -3183,7 +3241,7 @@ xtPublic void xt_dump_xlogs(XTDatabaseHP
  * D A T A B A S E   R E C O V E R Y   T H R E A D
  */
 
-extern XTDatabaseHPtr	pbxt_database;
+
 static XTThreadPtr		xres_recovery_thread;
 
 static void *xn_xres_run_recovery_thread(XTThreadPtr self)
@@ -3193,18 +3251,18 @@ static void *xn_xres_run_recovery_thread
 	if (!(mysql_thread = (THD *) myxt_create_thread()))
 		xt_throw(self);
 
-	while (!xres_recovery_thread->t_quit && !ha_resolve_by_legacy_type(mysql_thread, DB_TYPE_PBXT))
-		xt_sleep_milli_second(1);
+	myxt_wait_pbxt_plugin_slot_assigned(self);
 
 	if (!xres_recovery_thread->t_quit) {
-		/* {GLOBAL-DB}
-		 * It can happen that something will just get in before this
-		 * thread and open/recover the database!
-		 */
-		if (!pbxt_database) {
-			try_(a) {
+		try_(a) {
+			/* {GLOBAL-DB}
+			 * It can happen that something will just get in before this
+			 * thread and open/recover the database!
+			 */
+			if (!pbxt_database) {
 				xt_open_database(self, mysql_real_data_home, TRUE);
-				/* This can be done at the same time by a foreground thread,
+				/* {GLOBAL-DB}
+				 * A foreground thread may be doing this at the same time,
 				 * strictly speaking I need a lock.
 				 */
 				if (!pbxt_database) {
@@ -3212,11 +3270,22 @@ static void *xn_xres_run_recovery_thread
 					xt_heap_reference(self, pbxt_database);
 				}
 			}
-			catch_(a) {
-				xt_log_and_clear_exception(self);
-			}
-			cont_(a);
+			else
+				xt_use_database(self, pbxt_database, XT_FOR_USER);
+
+			pbxt_recovery_state = XT_RECOVER_DONE;
+
+			/* {WAIT-FOR-SW-AFTER-RECOV}
+			 * The wait for the sweeper after recovery was moved to here.
+			 */
+			xt_wait_for_sweeper(self, self->st_database, 0);
+
+			pbxt_recovery_state = XT_RECOVER_SWEPT;
 		}
+		catch_(a) {	// NOTE(review): on failure (or on t_quit above) pbxt_recovery_state never reaches XT_RECOVER_SWEPT, so xt_xres_wait_for_recovery() callers poll forever
+			xt_log_and_clear_exception(self);
+		}
+		cont_(a);
 	}
 
    /*
@@ -3261,11 +3330,12 @@ xtPublic void xt_xres_start_database_rec
 	sprintf(name, "DB-RECOVERY-%s", xt_last_directory_of_path(mysql_real_data_home));
 	xt_remove_dir_char(name);
 
+	pbxt_recovery_state = XT_RECOVER_PENDING;	// set before the recovery thread starts
 	xres_recovery_thread = xt_create_daemon(self, name);
 	xt_run_thread(self, xres_recovery_thread, xn_xres_run_recovery_thread);
 }
 
-xtPublic void xt_xres_wait_for_recovery(XTThreadPtr self)
+xtPublic void xt_xres_terminate_recovery(XTThreadPtr self)
 {
 	XTThreadPtr thr_rec;
 

=== modified file 'storage/pbxt/src/restart_xt.h'
--- a/storage/pbxt/src/restart_xt.h	2009-09-03 06:15:03 +0000
+++ b/storage/pbxt/src/restart_xt.h	2009-11-24 10:55:06 +0000
@@ -37,6 +37,8 @@ struct XTOpenTable;
 struct XTDatabase;
 struct XTTable;
 
+extern int				pbxt_recovery_state;	// one of the XT_RECOVER_* states defined below
+
 typedef struct XTWriterState {
 	struct XTDatabase		*ws_db;
 	xtBool					ws_in_recover;
@@ -132,6 +134,16 @@ void xt_print_log_record(xtLogID log, of
 void xt_dump_xlogs(struct XTDatabase *db, xtLogID start_log);
 
 void xt_xres_start_database_recovery(XTThreadPtr self);
-void xt_xres_wait_for_recovery(XTThreadPtr self);
+void xt_xres_terminate_recovery(XTThreadPtr self);
+
+#define XT_RECOVER_PENDING			0	// set before the recovery thread is started
+#define XT_RECOVER_DONE				1	// database opened (and recovered) by the recovery thread
+#define XT_RECOVER_SWEPT			2	// set after xt_wait_for_sweeper() returns following recovery
+
+inline void xt_xres_wait_for_recovery(XTThreadPtr XT_UNUSED(self), int state)	// polls every 100 ms until the state is reached; NOTE(review): no timeout or quit check
+{
+	while (pbxt_recovery_state < state)
+		xt_sleep_milli_second(100);
+}
 
 #endif

=== modified file 'storage/pbxt/src/strutil_xt.cc'
--- a/storage/pbxt/src/strutil_xt.cc	2009-10-26 11:35:42 +0000
+++ b/storage/pbxt/src/strutil_xt.cc	2009-11-24 10:55:06 +0000
@@ -21,8 +21,10 @@
  * H&G2JCtL
  */
 
-#include "mysql_priv.h"
 #include "xt_config.h"
+
+#include <stdio.h>
+#include <string.h>
 #include <ctype.h>
 
 #include "strutil_xt.h"
@@ -107,13 +109,17 @@ xtPublic void xt_2nd_last_name_of_path(s
 		*dest = 0;
 		return;
 	}
-        /* If temporary file */
-        if (!is_prefix(path, mysql_data_home) &&
+
+	/* {INVALID-OLD-TABLE-FIX}
+	 * The temporary-file check below is disabled; the problem it worked around is
+	 * now handled in tab_make_table_name() in table_xt.cc (see {INVALID-OLD-TABLE-FIX}).
+       if (!is_prefix(path, mysql_data_home) &&
             !is_prefix(path, mysql_real_data_home))
         {
           *dest= 0;
           return;
         }
+	 */
 
 	ptr = path + len - 1;
 	while (ptr != path && !XT_IS_DIR_CHAR(*ptr))
@@ -374,7 +380,7 @@ xtPublic void xt_int8_to_byte_size(xtInt
 /* Version number must also be set in configure.in! */
 xtPublic c_char *xt_get_version(void)
 {
-	return "1.0.08d RC";
+	return "1.0.09f RC";
 }
 
 /* Copy and URL decode! */

=== modified file 'storage/pbxt/src/systab_xt.cc'
--- a/storage/pbxt/src/systab_xt.cc	2009-08-17 11:12:36 +0000
+++ b/storage/pbxt/src/systab_xt.cc	2009-11-24 10:55:06 +0000
@@ -130,7 +130,7 @@ static int pbms_discover_handler(handler
  * MYSQL UTILITIES
  */
 
-void xt_my_set_notnull_in_record(Field *field, char *record)
+static void xt_my_set_notnull_in_record(Field *field, char *record)	// clears field's NULL bit in record (only when field->null_ptr is set)
 {
 	if (field->null_ptr)
 		record[(uint) (field->null_ptr - (uchar *) field->table->record[0])] &= (uchar) ~field->null_bit;
@@ -518,7 +518,7 @@ bool XTStatisticsTable::seqScanRead(xtWo
  * SYSTEM TABLE SHARES
  */
 
-void st_path_to_table_name(size_t size, char *buffer, const char *path)
+static void st_path_to_table_name(size_t size, char *buffer, const char *path)
 {
 	char *str;
 

=== modified file 'storage/pbxt/src/tabcache_xt.cc'
--- a/storage/pbxt/src/tabcache_xt.cc	2009-09-03 06:15:03 +0000
+++ b/storage/pbxt/src/tabcache_xt.cc	2009-11-27 15:37:02 +0000
@@ -590,7 +590,7 @@ xtBool XTTabCache::tc_fetch(XT_ROW_REC_F
 		 * So there could be a deadlock if I don't flush the log!
 		 */
 		if ((self = xt_get_self())) {
-			if (!xt_xlog_flush_log(self))
+			if (!xt_xlog_flush_log(tci_table->tab_db, self))
 				goto failed;
 		}
 
@@ -1150,6 +1150,8 @@ static void *tabc_fr_run_thread(XTThread
 	int		count;
 	void	*mysql_thread;
 
+	myxt_wait_pbxt_plugin_slot_assigned(self);	// wait for PBXT's plugin slot before creating the MySQL thread
+
 	mysql_thread = myxt_create_thread();
 
 	while (!self->t_quit) {

=== modified file 'storage/pbxt/src/tabcache_xt.h'
--- a/storage/pbxt/src/tabcache_xt.h	2009-08-17 11:12:36 +0000
+++ b/storage/pbxt/src/tabcache_xt.h	2009-11-24 10:55:06 +0000
@@ -168,7 +168,7 @@ typedef struct XTTableSeq {
 #define TAB_CAC_UNLOCK(i, o)			xt_xsmutex_unlock(i, o)
 #elif defined(TAB_CAC_USE_PTHREAD_RW)
 #define TAB_CAC_LOCK_TYPE				xt_rwlock_type
-#define TAB_CAC_INIT_LOCK(s, i)			xt_init_rwlock(s, i)
+#define TAB_CAC_INIT_LOCK(s, i)			xt_init_rwlock_with_autoname(s, i)
 #define TAB_CAC_FREE_LOCK(s, i)			xt_free_rwlock(i)	
 #define TAB_CAC_READ_LOCK(i, o)			xt_slock_rwlock_ns(i)
 #define TAB_CAC_WRITE_LOCK(i, o)		xt_xlock_rwlock_ns(i)

=== modified file 'storage/pbxt/src/table_xt.cc'
--- a/storage/pbxt/src/table_xt.cc	2009-08-18 07:46:53 +0000
+++ b/storage/pbxt/src/table_xt.cc	2009-11-25 15:40:51 +0000
@@ -35,7 +35,6 @@
 #include <drizzled/common.h>
 #include <mysys/thr_lock.h>
 #include <drizzled/dtcollation.h>
-#include <drizzled/plugin/storage_engine.h>
 #else
 #include "mysql_priv.h"
 #endif
@@ -48,7 +47,6 @@
 #include "cache_xt.h"
 #include "trace_xt.h"
 #include "index_xt.h"
-#include "restart_xt.h"
 #include "systab_xt.h"
 
 #ifdef DEBUG
@@ -2347,7 +2345,7 @@ xtPublic void xt_flush_table(XTThreadPtr
 
 }
 
-xtPublic XTOpenTablePtr tab_open_table(XTTableHPtr tab)
+static XTOpenTablePtr tab_open_table(XTTableHPtr tab)
 {
 	volatile XTOpenTablePtr	ot;
 	XTThreadPtr				self;
@@ -2588,7 +2586,7 @@ xtPublic xtBool xt_tab_put_log_op_rec_da
 			return FAILED;
 	}
 
-	return xt_xlog_modify_table(ot, status, op_seq, free_rec_id, rec_id, size, buffer);
+	return xt_xlog_modify_table(tab->tab_id, status, op_seq, free_rec_id, rec_id, size, buffer, ot->ot_thread);	// the log call now takes the table ID and thread instead of the open table
 }
 
 xtPublic xtBool xt_tab_put_log_rec_data(XTOpenTablePtr ot, u_int status, xtRecordID free_rec_id, xtRecordID rec_id, size_t size, xtWord1 *buffer, xtOpSeqNo *op_seq)
@@ -2606,7 +2604,7 @@ xtPublic xtBool xt_tab_put_log_rec_data(
 			return FAILED;
 	}
 
-	return xt_xlog_modify_table(ot, status, *op_seq, free_rec_id, rec_id, size, buffer);
+	return xt_xlog_modify_table(tab->tab_id, status, *op_seq, free_rec_id, rec_id, size, buffer, ot->ot_thread);
 }
 
 xtPublic xtBool xt_tab_get_rec_data(XTOpenTablePtr ot, xtRecordID rec_id, size_t size, xtWord1 *buffer)
@@ -3541,7 +3539,7 @@ xtPublic xtBool xt_tab_free_row(XTOpenTa
 	tab->tab_row_fnum++;
 	xt_unlock_mutex_ns(&tab->tab_row_lock);
 
-	if (!xt_xlog_modify_table(ot, XT_LOG_ENT_ROW_FREED, op_seq, 0, row_id, sizeof(XTTabRowRefDRec), (xtWord1 *) &free_row))
+	if (!xt_xlog_modify_table(tab->tab_id, XT_LOG_ENT_ROW_FREED, op_seq, 0, row_id, sizeof(XTTabRowRefDRec), (xtWord1 *) &free_row, ot->ot_thread))
 		return FAILED;
 
 	return OK;
@@ -3791,7 +3789,7 @@ xtPublic int xt_tab_remove_record(XTOpen
 	xt_unlock_mutex_ns(&tab->tab_rec_lock);
 
 	free_rec->rf_rec_type_1 = old_rec_type;
-	return xt_xlog_modify_table(ot, XT_LOG_ENT_REC_REMOVED_BI, op_seq, (xtRecordID) new_rec_type, rec_id, rec_size, ot->ot_row_rbuffer);
+	return xt_xlog_modify_table(tab->tab_id, XT_LOG_ENT_REC_REMOVED_BI, op_seq, (xtRecordID) new_rec_type, rec_id, rec_size, ot->ot_row_rbuffer, ot->ot_thread);
 }
 
 static xtRowID tab_new_row(XTOpenTablePtr ot, XTTableHPtr tab)
@@ -3837,7 +3835,7 @@ static xtRowID tab_new_row(XTOpenTablePt
 	op_seq = tab->tab_seq.ts_get_op_seq();
 	xt_unlock_mutex_ns(&tab->tab_row_lock);
 
-	if (!xt_xlog_modify_table(ot, status, op_seq, next_row_id, row_id, 0, NULL))
+	if (!xt_xlog_modify_table(tab->tab_id, status, op_seq, next_row_id, row_id, 0, NULL, ot->ot_thread))
 		return 0;
 
 	XT_DISABLED_TRACE(("new row tx=%d row=%d\n", (int) ot->ot_thread->st_xact_data->xd_start_xn_id, (int) row_id));
@@ -3868,7 +3866,7 @@ xtPublic xtBool xt_tab_set_row(XTOpenTab
 	if (!tab->tab_rows.xt_tc_write(ot->ot_row_file, row_id, 0, sizeof(XTTabRowRefDRec), (xtWord1 *) &row_buf, &op_seq, TRUE, ot->ot_thread))
 		return FAILED;
 
-	return xt_xlog_modify_table(ot, status, op_seq, 0, row_id, sizeof(XTTabRowRefDRec), (xtWord1 *) &row_buf);
+	return xt_xlog_modify_table(tab->tab_id, status, op_seq, 0, row_id, sizeof(XTTabRowRefDRec), (xtWord1 *) &row_buf, ot->ot_thread);
 }
 
 xtPublic xtBool xt_tab_free_record(XTOpenTablePtr ot, u_int status, xtRecordID rec_id, xtBool clean_delete)
@@ -3937,7 +3935,7 @@ xtPublic xtBool xt_tab_free_record(XTOpe
 		tab->tab_rec_fnum++;
 		xt_unlock_mutex_ns(&tab->tab_rec_lock);
 
-		if (!xt_xlog_modify_table(ot, status, op_seq, rec_id, rec_id, sizeof(XTactFreeRecEntryDRec) - offsetof(XTactFreeRecEntryDRec, fr_stat_id_1), &free_rec.fr_stat_id_1))
+		if (!xt_xlog_modify_table(tab->tab_id, status, op_seq, rec_id, rec_id, sizeof(XTactFreeRecEntryDRec) - offsetof(XTactFreeRecEntryDRec, fr_stat_id_1), &free_rec.fr_stat_id_1, ot->ot_thread))
 			return FAILED;
 	}
 	return OK;
@@ -4016,7 +4014,7 @@ static xtBool tab_add_record(XTOpenTable
 	}
 	xt_unlock_mutex_ns(&tab->tab_rec_lock);
 
-	if (!xt_xlog_modify_table(ot, status, op_seq, next_rec_id, rec_id,  rec_info->ri_rec_buf_size, (xtWord1 *) rec_info->ri_fix_rec_buf))
+	if (!xt_xlog_modify_table(tab->tab_id, status, op_seq, next_rec_id, rec_id,  rec_info->ri_rec_buf_size, (xtWord1 *) rec_info->ri_fix_rec_buf, ot->ot_thread))
 		return FAILED;
 
 	if (rec_info->ri_ext_rec) {
@@ -4932,6 +4930,12 @@ xtPublic void xt_tab_seq_exit(XTOpenTabl
 #endif
 #endif
 
+xtPublic void xt_tab_seq_repeat(XTOpenTablePtr ot)	// steps the sequential scan position back by one record; NOTE(review): no check against the start of the scan
+{
+	ot->ot_seq_rec_id--;
+	ot->ot_seq_offset -= ot->ot_table->tab_dic.dic_rec_size;
+}
+
 xtPublic xtBool xt_tab_seq_next(XTOpenTablePtr ot, xtWord1 *buffer, xtBool *eof)
 {
 	register XTTableHPtr	tab = ot->ot_table;
@@ -5094,7 +5098,7 @@ static xtBool tab_exec_repair_pending(XT
 			return FALSE;
 	}
 	else {
-		if (!xt_open_file_ns(&of, file_path, XT_FS_DEFAULT))
+		if (!xt_open_file_ns(&of, file_path, XT_FS_DEFAULT | XT_FS_MISSING_OK))	// presumably leaves of NULL when the file is missing, which the !of check below handles
 			return FALSE;
 	}
 	if (!of)
@@ -5190,15 +5194,76 @@ static xtBool tab_exec_repair_pending(XT
 	return FALSE;
 }
 
-xtPublic void tab_make_table_name(XTTableHPtr tab, char *table_name, size_t size)
+static void tab_make_table_name(XTTableHPtr tab, char *table_name, size_t size)
 {
-	char	name_buf[XT_IDENTIFIER_NAME_SIZE*3+3];
+	char	*nptr;
 
-	xt_2nd_last_name_of_path(sizeof(name_buf), name_buf, tab->tab_name->ps_path);
-	myxt_static_convert_file_name(name_buf, table_name, size);
-	xt_strcat(size, table_name, ".");
-	myxt_static_convert_file_name(xt_last_name_of_path(tab->tab_name->ps_path), name_buf, sizeof(name_buf));
-	xt_strcat(size, table_name, name_buf);
+	nptr = xt_last_name_of_path(tab->tab_name->ps_path);
+	if (xt_starts_with(nptr, "#sql")) {
+		/* {INVALID-OLD-TABLE-FIX}
+		 * Temporary files can have strange paths, for example
+		 * ..../var/tmp/mysqld.1/#sqldaec_1_6
+		 * This occurs, for example, when the temp_table.test is
+		 * run using the PBXT suite in MariaDB:
+		 * ./mtr --suite=pbxt --do-test=temp_table
+		 *
+		 * Calling myxt_static_convert_file_name with a '.' in the name
+		 * causes the error:
+		 * [ERROR] Invalid (old?) table or database name 'mysqld.1'
+		 * To prevent this, we do not convert the temporary
+		 * table names using the mysql functions.
+		 *
+		 * This bug was found by Monty and first fixed in xt_2nd_last_name_of_path();
+		 * that fix is now commented out there (see {INVALID-OLD-TABLE-FIX}).
+		 *
+		 */
+		xt_2nd_last_name_of_path(size, table_name, tab->tab_name->ps_path);
+		xt_strcat(size, table_name, ".");
+		xt_strcat(size, table_name, nptr);
+	}
+	else {
+		char	name_buf[XT_TABLE_NAME_SIZE*3+3];
+		char	*part_ptr;
+		size_t	len;
+
+		xt_2nd_last_name_of_path(sizeof(name_buf), name_buf, tab->tab_name->ps_path);
+		myxt_static_convert_file_name(name_buf, table_name, size);
+		xt_strcat(size, table_name, ".");
+		
+		/* Handle partition extensions to table names (<table>#P#<partition>[#SP#<subpartition>]): */
+		if ((part_ptr = strstr(nptr, "#P#")))
+			xt_strncpy(sizeof(name_buf), name_buf, nptr, part_ptr - nptr);
+		else
+			xt_strcpy(sizeof(name_buf), name_buf, nptr);
+
+		len = strlen(table_name);
+		myxt_static_convert_file_name(name_buf, table_name + len, size - len);
+
+		if (part_ptr) {
+			/* Add the partition extension (which is relevant to the engine) as " (<partition>[ - <subpartition>])". */
+			char	*sub_part_ptr;
+
+			part_ptr += 3;
+			if ((sub_part_ptr = strstr(part_ptr, "#SP#")))
+				xt_strncpy(sizeof(name_buf), name_buf, part_ptr, sub_part_ptr - part_ptr);
+			else
+				xt_strcpy(sizeof(name_buf), name_buf, part_ptr);
+			
+			xt_strcat(size, table_name, " (");
+			len = strlen(table_name);
+			myxt_static_convert_file_name(name_buf, table_name + len, size - len);
+			
+			if (sub_part_ptr) {
+			
+				sub_part_ptr += 4;
+				xt_strcat(size, table_name, " - ");
+				len = strlen(table_name);
+				myxt_static_convert_file_name(sub_part_ptr, table_name + len, size - len);
+			}
+
+			xt_strcat(size, table_name, ")");
+		}
+	}
 }
 
 xtPublic xtBool xt_tab_is_table_repair_pending(XTTableHPtr tab)

=== modified file 'storage/pbxt/src/table_xt.h'
--- a/storage/pbxt/src/table_xt.h	2009-08-18 07:46:53 +0000
+++ b/storage/pbxt/src/table_xt.h	2009-11-24 10:55:06 +0000
@@ -127,7 +127,7 @@ struct XTTablePath;
 #define XT_TAB_ROW_UNLOCK(i, s)			xt_xsmutex_unlock(i, (s)->t_id)
 #elif defined(XT_TAB_ROW_USE_PTHREAD_RW)
 #define XT_TAB_ROW_LOCK_TYPE			xt_rwlock_type
-#define XT_TAB_ROW_INIT_LOCK(s, i)		xt_init_rwlock(s, i)
+#define XT_TAB_ROW_INIT_LOCK(s, i)		xt_init_rwlock_with_autoname(s, i)
 #define XT_TAB_ROW_FREE_LOCK(s, i)		xt_free_rwlock(i)	
 #define XT_TAB_ROW_READ_LOCK(i, s)		xt_slock_rwlock_ns(i)
 #define XT_TAB_ROW_WRITE_LOCK(i, s)		xt_xlock_rwlock_ns(i)
@@ -528,13 +528,14 @@ xtBool				xt_table_exists(struct XTDatab
 void				xt_enum_tables_init(u_int *edx);
 XTTableEntryPtr		xt_enum_tables_next(struct XTThread *self, struct XTDatabase *db, u_int *edx);
 
-void				xt_enum_files_of_tables_init(struct XTDatabase *db, char *tab_name, xtTableID tab_id, XTFilesOfTablePtr ft);
+void				xt_enum_files_of_tables_init(XTPathStrPtr tab_name, xtTableID tab_id, XTFilesOfTablePtr ft);
 xtBool				xt_enum_files_of_tables_next(XTFilesOfTablePtr ft);
 
 xtBool				xt_tab_seq_init(XTOpenTablePtr ot);
 void				xt_tab_seq_reset(XTOpenTablePtr ot);
 void				xt_tab_seq_exit(XTOpenTablePtr ot);
 xtBool				xt_tab_seq_next(XTOpenTablePtr ot, xtWord1 *buffer, xtBool *eof);
+void				xt_tab_seq_repeat(XTOpenTablePtr ot);	// moves the sequential scan back one record (presumably so the next xt_tab_seq_next() returns it again)
 
 xtBool				xt_tab_new_record(XTOpenTablePtr ot, xtWord1 *buffer);
 xtBool				xt_tab_delete_record(XTOpenTablePtr ot, xtWord1 *buffer);

=== modified file 'storage/pbxt/src/thread_xt.cc'
--- a/storage/pbxt/src/thread_xt.cc	2009-10-06 15:16:01 +0000
+++ b/storage/pbxt/src/thread_xt.cc	2009-11-24 10:55:06 +0000
@@ -75,6 +75,13 @@ static xt_mutex_type	thr_array_lock;
 /* Global accumulated statistics: */
 static XTStatisticsRec	thr_statistics;
 
+#ifdef DEBUG
+static void break_in_assertion(c_char *expr, c_char *func, c_char *file, u_int line)	/* Called by xt_assert() in DEBUG builds. NOTE(review): presumably a separate function so a debugger breakpoint can be set on failed assertions. */
+{
+	printf("%s(%s:%d) %s\n", func, file, (int) line, expr);	/* Same "func(file:line) expr" output xt_assert() previously printed inline. */
+}
+#endif
+
 /*
  * -----------------------------------------------------------------------
  * Error logging
@@ -658,6 +665,9 @@ static c_char *thr_get_err_string(int xt
 		case XT_ERR_FK_REF_TEMP_TABLE:		str = "Foreign key may not reference temporary table"; break;
 		case XT_ERR_MYSQL_SHUTDOWN:			str = "Cannot open table, MySQL has shutdown"; break;
 		case XT_ERR_MYSQL_NO_THREAD:		str = "Cannot create thread, MySQL has shutdown"; break;
+		case XT_ERR_BUFFER_TOO_SMALL:		str = "System backup buffer too small"; break;
+		case XT_ERR_BAD_BACKUP_FORMAT:		str = "Unknown or corrupt backup format, restore aborted"; break;
+		case XT_ERR_PBXT_NOT_INSTALLED:		str = "PBXT plugin is not installed"; break;
 		default:							str = "Unknown XT error"; break;
 	}
 	return str;
@@ -862,6 +872,11 @@ xtPublic xtBool xt_exception_errno(XTExc
 	return FAILED;
 }
 
+xtPublic void xt_exception_xterr(XTExceptionPtr e, XTThreadPtr self, c_char *func, c_char *file, u_int line, int xt_err)
+{
+	xt_exception_error(e, self, func, file, line, xt_err, 0, thr_get_err_string(xt_err));	/* sys_err = 0; message is the standard text for xt_err from thr_get_err_string(). */
+}
+
 /*
  * -----------------------------------------------------------------------
  * LOG ERRORS
@@ -887,7 +902,7 @@ xtPublic xtBool xt_assert(XTThreadPtr se
 #ifdef DEBUG
 	//xt_set_fflush(TRUE);
 	//xt_dump_trace();
-	printf("%s(%s:%d) %s\n", func, file, (int) line, expr);
+	break_in_assertion(expr, func, file, line);
 #ifdef CRASH_ON_ASSERT
 	abort();
 #endif

=== modified file 'storage/pbxt/src/thread_xt.h'
--- a/storage/pbxt/src/thread_xt.h	2009-08-17 11:12:36 +0000
+++ b/storage/pbxt/src/thread_xt.h	2009-11-24 10:55:06 +0000
@@ -536,6 +536,8 @@ extern struct XTThread	**xt_thr_array;
  * Function prototypes
  */
 
+extern "C" void *thr_main(void *data);
+
 void			xt_get_now(char *buffer, size_t len);
 xtBool			xt_init_logging(void);
 void			xt_exit_logging(void);
@@ -583,6 +585,7 @@ void			xt_register_xterr(c_char *func, c
 void			xt_exceptionf(XTExceptionPtr e, XTThreadPtr self, c_char *func, c_char *file, u_int line, int xt_err, int sys_err, c_char *fmt, ...);
 void			xt_exception_error(XTExceptionPtr e, XTThreadPtr self, c_char *func, c_char *file, u_int line, int xt_err, int sys_err, c_char *msg);
 xtBool			xt_exception_errno(XTExceptionPtr e, XTThreadPtr self, c_char *func, c_char *file, u_int line, int err);
+void			xt_exception_xterr(XTExceptionPtr e, XTThreadPtr self, c_char *func, c_char *file, u_int line, int xt_err);
 
 void			xt_log_errno(XTThreadPtr self, c_char *func, c_char *file, u_int line, int err);
 
@@ -610,7 +613,7 @@ void			xt_critical_wait(void);
 void			xt_yield(void);
 void			xt_sleep_milli_second(u_int t);
 xtBool 			xt_suspend(XTThreadPtr self);
-xtBool 			xt_unsuspend(XTThreadPtr self, XTThreadPtr target);
+xtBool			xt_unsuspend(XTThreadPtr target);
 void			xt_lock_thread(XTThreadPtr thread);
 void			xt_unlock_thread(XTThreadPtr thread);
 xtBool			xt_wait_thread(XTThreadPtr thread);

=== modified file 'storage/pbxt/src/util_xt.cc'
--- a/storage/pbxt/src/util_xt.cc	2009-08-17 11:12:36 +0000
+++ b/storage/pbxt/src/util_xt.cc	2009-11-24 10:55:06 +0000
@@ -150,6 +150,34 @@ xtPublic xtWord1 xt_get_checksum1(xtWord
 	return (xtWord1) (sum ^ (sum >> 24) ^ (sum >> 16) ^ (sum >> 8));
 }

+/*
+ * Return a 4-byte hash of the len bytes at data, using the same
+ * shift-and-fold step as the ELF (PJW) hash.
+ * The bytes are processed from data[len - 1] down to data[1].
+ * NOTE(review): data[0] is NOT included in the hash. This is preserved
+ * so that the values match those computed by the original code --
+ * confirm whether any caller relies on that before changing it.
+ * Returns 0 when len is 0 or 1.
+ */
+xtPublic xtWord4 xt_get_checksum4(xtWord1 *data, size_t len)
+{
+	register xtWord4	sum = 0, g;
+	size_t				i;
+
+	/* Use an index rather than computing data + len - 1: for len == 0
+	 * that pointer would lie before the start of the buffer, which is
+	 * undefined behavior.
+	 */
+	for (i = len; i > 1; i--) {
+		sum = (sum << 4) + data[i - 1];
+		if ((g = sum & 0xF0000000)) {
+			sum = sum ^ (g >> 24);
+			sum = sum ^ g;
+		}
+	}
+	return sum;
+}
+
 /*
  * --------------- Data Buffer ------------------
  */

=== modified file 'storage/pbxt/src/util_xt.h'
--- a/storage/pbxt/src/util_xt.h	2009-03-26 12:18:01 +0000
+++ b/storage/pbxt/src/util_xt.h	2009-11-24 10:55:06 +0000
@@ -39,6 +39,7 @@ xtWord4	xt_file_name_to_id(char *file_na
 xtBool	xt_time_difference(register xtWord4 now, register xtWord4 then);
 xtWord2	xt_get_checksum(xtWord1 *data, size_t len, u_int interval);
 xtWord1 xt_get_checksum1(xtWord1 *data, size_t len);
+xtWord4 xt_get_checksum4(xtWord1 *data, size_t len);
 
 typedef struct XTDataBuffer {
 	size_t			db_size;

=== modified file 'storage/pbxt/src/xaction_xt.cc'
--- a/storage/pbxt/src/xaction_xt.cc	2009-09-03 06:15:03 +0000
+++ b/storage/pbxt/src/xaction_xt.cc	2009-11-24 10:55:06 +0000
@@ -1075,6 +1075,7 @@ xtPublic void xt_xn_init_db(XTThreadPtr 
 #endif
 	xt_spinlock_init_with_autoname(self, &db->db_xn_id_lock);
 	xt_spinlock_init_with_autoname(self, &db->db_xn_wait_spinlock);
+	xt_init_mutex_with_autoname(self, &db->db_xn_xa_lock);
 	//xt_init_mutex_with_autoname(self, &db->db_xn_wait_lock);
 	//xt_init_cond(self, &db->db_xn_wait_cond);
 	xt_init_mutex_with_autoname(self, &db->db_sw_lock);
@@ -1096,6 +1097,9 @@ xtPublic void xt_xn_init_db(XTThreadPtr 
 		}
 	}
 
+	/* Create a sorted list for XA transactions recovered: */
+	db->db_xn_xa_list = xt_new_sortedlist(self, sizeof(XTXactXARec), 100, 50, xt_xn_xa_compare, db, NULL, FALSE, FALSE);
+
 	/* Initialize the data logs: */
 	db->db_datalogs.dlc_init(self, db); 
 
@@ -1146,6 +1150,7 @@ xtPublic void xt_xn_exit_db(XTThreadPtr 
 	printf("=========> MAX TXs NOT CLEAN: %lu\n", not_clean_max);
 	printf("=========> MAX TXs IN RAM: %lu\n", in_ram_max);
 #endif
+	XTXactPreparePtr xap, xap_next;
 
 	xt_stop_sweeper(self, db);	// Should be done already!
 	xt_stop_writer(self, db);	// Should be done already!
@@ -1187,6 +1192,19 @@ xtPublic void xt_xn_exit_db(XTThreadPtr 
 	xt_free_mutex(&db->db_sw_lock);
 	//xt_free_cond(&db->db_xn_wait_cond);
 	//xt_free_mutex(&db->db_xn_wait_lock);
+	xt_free_mutex(&db->db_xn_xa_lock);
+	for (u_int i=0; i<XT_XA_HASH_TAB_SIZE; i++) {
+		xap = db->db_xn_xa_table[i];
+		while (xap) {
+			xap_next = xap->xp_next;
+			xt_free(self, xap);
+			xap = xap_next;
+		}
+	}
+	if (db->db_xn_xa_list) {
+		xt_free_sortedlist(self, db->db_xn_xa_list);
+		db->db_xn_xa_list = NULL;
+	}
 	xt_spinlock_free(self, &db->db_xn_wait_spinlock);
 	xt_spinlock_free(self, &db->db_xn_id_lock);
 #ifdef DEBUG_RAM_LIST
@@ -1428,7 +1446,7 @@ static xtBool xn_end_xact(XTThreadPtr th
 			wait_xn_id = thread->st_prev_xact[thread->st_last_xact];
 			thread->st_prev_xact[thread->st_last_xact] = xn_id;
 			/* This works because XT_MAX_XACT_BEHIND == 2! */
-			ASSERT_NS((thread->st_last_xact + 1) % XT_MAX_XACT_BEHIND == thread->st_last_xact ^ 1);
+			ASSERT_NS((thread->st_last_xact + 1) % XT_MAX_XACT_BEHIND == (thread->st_last_xact ^ 1));
 			thread->st_last_xact ^= 1;
 			while (xt_xn_is_before(db->db_xn_to_clean_id, wait_xn_id) && (db->db_sw_faster & XT_SW_TOO_FAR_BEHIND)) {
 				xt_critical_wait();
@@ -1548,6 +1566,211 @@ xtPublic int xt_xn_status(XTOpenTablePtr
 	return XT_XN_ABORTED;
 }

+/* ----------------------------------------------------------------------
+ * XA Functionality
+ *
+ * Prepared XA transactions are recorded in two structures, both of which
+ * are accessed under db->db_xn_xa_lock:
+ *  - db->db_xn_xa_table: a hash table with XT_XA_HASH_TAB_SIZE buckets,
+ *    indexed by xt_get_checksum4() of the XA data and chained via xp_next;
+ *  - db->db_xn_xa_list: a sorted list of XTXactXARec ordered by
+ *    transaction ID (see xt_xn_xa_compare()).
+ */
+
+/*
+ * Comparison function for the sorted list db->db_xn_xa_list.
+ * a points to the xtXactID being looked up; b points to an XTXactXARec.
+ * Returns 0 if the IDs are equal, -1 if a is before b according to
+ * xt_xn_is_before(), and 1 otherwise.
+ */
+xtPublic int xt_xn_xa_compare(XTThreadPtr XT_UNUSED(self), register const void *XT_UNUSED(thunk), register const void *a, register const void *b)
+{
+	xtXactID	*x = (xtXactID *) a;
+	XTXactXAPtr	y = (XTXactXAPtr) b;
+
+	if (*x == y->xx_xact_id)
+		return 0;
+	if (xt_xn_is_before(*x, y->xx_xact_id))
+		return -1;
+	return 1;
+}
+
+/*
+ * Write an XT_LOG_ENT_PREPARE log record containing the len bytes of XA
+ * data at xa_data for the thread's current transaction.
+ * Nothing is written if the thread has no transaction data, or if the
+ * transaction has not been logged yet (XT_XN_XAC_LOGGED not set).
+ * Returns FAILED if the log write fails, OK otherwise.
+ */
+xtPublic xtBool xt_xn_prepare(int len, xtWord1 *xa_data, XTThreadPtr thread)
+{
+	XTXactDataPtr xact;
+
+	ASSERT_NS(thread->st_xact_data);
+	if ((xact = thread->st_xact_data)) {
+		xtXactID xn_id = xact->xd_start_xn_id;
+
+		/* Only makes sense if the transaction has already been logged: */
+		if ((xact->xd_flags & XT_XN_XAC_LOGGED)) {
+			/* The transaction ID is passed in the op_seq argument; there is no table. */
+			if (!xt_xlog_modify_table(0, XT_LOG_ENT_PREPARE, xn_id, 0, 0, len, xa_data, thread))
+				return FAILED;
+		}
+	}
+	return OK;
+}
+
+/*
+ * Record a copy of the len bytes of XA data at xa_data for the prepared
+ * transaction xact_id, in both db->db_xn_xa_list and db->db_xn_xa_table.
+ * Only offsetof(XTXactPrepareRec, xp_xa_data) + len bytes are allocated.
+ * Returns FAILED if allocation or the sorted-list insert fails (nothing
+ * is recorded in that case), OK otherwise.
+ */
+xtPublic xtBool xt_xn_store_xa_data(XTDatabaseHPtr db, xtXactID xact_id, int len, xtWord1 *xa_data, XTThreadPtr XT_UNUSED(thread))
+{
+	XTXactPreparePtr	xap;
+	u_int				idx;
+	XTXactXARec			xx;
+
+	if (!(xap = (XTXactPreparePtr) xt_malloc_ns(offsetof(XTXactPrepareRec, xp_xa_data) + len)))
+		return FAILED;
+	xap->xp_xact_id = xact_id;
+	xap->xp_hash = xt_get_checksum4(xa_data, len);
+	xap->xp_data_len = len;
+	memcpy(xap->xp_xa_data, xa_data, len);
+	xx.xx_xact_id = xact_id;
+	xx.xx_xa_ptr = xap;
+
+	idx = xap->xp_hash % XT_XA_HASH_TAB_SIZE;
+	xt_lock_mutex_ns(&db->db_xn_xa_lock);
+	if (!xt_sl_insert(NULL, db->db_xn_xa_list, &xact_id, &xx)) {
+		xt_unlock_mutex_ns(&db->db_xn_xa_lock);
+		xt_free_ns(xap);
+		/* xap has been freed and the lock released: must not fall through. */
+		return FAILED;
+	}
+	xap->xp_next = db->db_xn_xa_table[idx];
+	db->db_xn_xa_table[idx] = xap;
+	xt_unlock_mutex_ns(&db->db_xn_xa_lock);
+	return OK;
+}
+
+/*
+ * Remove and free the XA data recorded for transaction xact_id, if any.
+ * Acquires db->db_xn_xa_lock, and releases it before returning on every path.
+ */
+xtPublic void xt_xn_delete_xa_data_by_xact(XTDatabaseHPtr db, xtXactID xact_id, XTThreadPtr thread)
+{
+	XTXactXAPtr xx;
+
+	xt_lock_mutex_ns(&db->db_xn_xa_lock);
+	if (!(xx = (XTXactXAPtr) xt_sl_find(NULL, db->db_xn_xa_list, &xact_id))) {
+		/* Nothing recorded for this transaction; release the lock taken above. */
+		xt_unlock_mutex_ns(&db->db_xn_xa_lock);
+		return;
+	}
+	/* unlock == TRUE: xt_xn_delete_xa_data() releases db_xn_xa_lock. */
+	xt_xn_delete_xa_data(db, xx->xx_xa_ptr, TRUE, thread);
+}
+
+/*
+ * Remove xap from db->db_xn_xa_list and from its hash-table bucket.
+ * The caller is expected to hold db->db_xn_xa_lock; if unlock is TRUE
+ * the lock is released before returning.
+ * xap is freed only if it is found in its hash bucket.
+ */
+xtPublic void xt_xn_delete_xa_data(XTDatabaseHPtr db, XTXactPreparePtr xap, xtBool unlock, XTThreadPtr XT_UNUSED(thread))
+{
+	u_int				idx;
+	XTXactPreparePtr	xap_ptr, xap_pptr = NULL;
+
+	xt_sl_delete(NULL, db->db_xn_xa_list, &xap->xp_xact_id);
+	idx = xap->xp_hash % XT_XA_HASH_TAB_SIZE;
+	xap_ptr = db->db_xn_xa_table[idx];
+	while (xap_ptr) {
+		if (xap_ptr == xap)
+			break;
+		xap_pptr = xap_ptr;
+		xap_ptr = xap_ptr->xp_next;
+	}
+	if (xap_ptr) {
+		/* Unlink from the bucket chain (head or interior). */
+		if (xap_pptr)
+			xap_pptr->xp_next = xap_ptr->xp_next;
+		else
+			db->db_xn_xa_table[idx] = xap_ptr->xp_next;
+		xt_free_ns(xap);
+	}
+	if (unlock)
+		xt_unlock_mutex_ns(&db->db_xn_xa_lock);
+}
+
+/*
+ * Find the recorded XA transaction whose XA data equals the len bytes at
+ * xa_data. Returns the record, or NULL if there is no match.
+ * If lock is TRUE, db->db_xn_xa_lock is acquired and is STILL HELD when
+ * this function returns (match or not); the caller must release it, for
+ * example with xt_unlock_mutex_ns() or xt_xn_delete_xa_data(..., TRUE, ...).
+ */
+xtPublic XTXactPreparePtr xt_xn_find_xa_data(XTDatabaseHPtr db, int len, xtWord1 *xa_data, xtBool lock, XTThreadPtr XT_UNUSED(thread))
+{
+	xtWord4				hash;
+	XTXactPreparePtr	xap;
+	u_int				idx;
+
+	if (lock)
+		xt_lock_mutex_ns(&db->db_xn_xa_lock);
+	hash = xt_get_checksum4(xa_data, len);
+	idx = hash % XT_XA_HASH_TAB_SIZE;
+	xap = db->db_xn_xa_table[idx];
+	while (xap) {
+		if (xap->xp_hash == hash &&
+			xap->xp_data_len == len &&
+			memcmp(xap->xp_xa_data, xa_data, len) == 0) {
+			break;
+		}
+		xap = xap->xp_next;
+	}
+
+	return xap;
+}
+
+/*
+ * Return the next recorded XA transaction in db->db_xn_xa_list (i.e. in
+ * transaction-ID order); exa->exa_index is the position of the next item.
+ * The first call acquires db->db_xn_xa_lock and sets exa->exa_locked; the
+ * lock stays held while records are returned and is released when the end
+ * of the list is reached, at which point NULL is returned. A caller that
+ * stops early must release the lock itself if exa->exa_locked is TRUE.
+ * NOTE(review): exa presumably must be zero-initialized before the first
+ * call -- confirm with callers.
+ */
+xtPublic XTXactPreparePtr xt_xn_enum_xa_data(XTDatabaseHPtr db, XTXactEnumXAPtr exa)
+{
+	XTXactXAPtr xx;
+
+	if (!exa->exa_locked) {
+		xt_lock_mutex_ns(&db->db_xn_xa_lock);
+		exa->exa_locked = TRUE;
+	}
+
+	if ((xx = (XTXactXAPtr) xt_sl_item_at(db->db_xn_xa_list, exa->exa_index))) {
+		exa->exa_index++;
+		return xx->xx_xa_ptr;
+	}
+
+	if (exa->exa_locked) {
+		exa->exa_locked = FALSE;
+		xt_unlock_mutex_ns(&db->db_xn_xa_lock);
+	}
+	return NULL;
+}
+
+/* ----------------------------------------------------------------------
+ * S W E E P E R    F U N C T I O N S
+ */
+
 xtPublic xtWord8 xt_xn_bytes_to_sweep(XTDatabaseHPtr db, XTThreadPtr thread)
 {
 	xtXactID				xn_id;
@@ -2047,7 +2208,7 @@ static xtBool xn_sw_cleanup_variation(XT
 				if(!tab->tab_recs.xt_tc_write_cond(self, ot->ot_rec_file, rec_id, rec_head.tr_rec_type_1, &op_seq, xn_id, row_id, stat_id, rec_type))
 					/* this means record was not updated by xt_tc_write_bor and doesn't need to */
 					break;
-				if (!xt_xlog_modify_table(ot, XT_LOG_ENT_REC_CLEANED_1, op_seq, 0, rec_id, 1, &rec_head.tr_rec_type_1))
+				if (!xt_xlog_modify_table(tab->tab_id, XT_LOG_ENT_REC_CLEANED_1, op_seq, 0, rec_id, 1, &rec_head.tr_rec_type_1, self))
 					throw_();
 				xn_sw_clean_indices(self, ot, rec_id, row_id, rec_buf, ss->ss_databuf.db_data);
 				break;
@@ -2397,8 +2558,10 @@ static void xn_sw_main(XTThreadPtr self)
 			if ((xact = xt_xn_get_xact(db, db->db_xn_to_clean_id, self))) {
 				xtXactID xn_id;
 
-				if (!(xact->xd_flags & XT_XN_XAC_SWEEP))
-					/* Transaction has not yet ending, and ready to sweep. */
+				/* The sweep flag is set when the transaction is ready for sweeping.
+				 * Prepared transactions may not be swept!
+				 */
+				if (!(xact->xd_flags & XT_XN_XAC_SWEEP) || (xact->xd_flags & XT_XN_XAC_PREPARED))
 					goto sleep;
 
 				/* Check if we can cleanup the transaction.
@@ -2493,7 +2656,7 @@ static void xn_sw_main(XTThreadPtr self)
 				 * we flush the log.
 				 */
 				if (now >= idle_start + 2) {
-					if (!xt_xlog_flush_log(self))
+					if (!xt_xlog_flush_log(db, self))
 						xt_throw(self);
 					ss->ss_flush_pending = FALSE;
 				}
@@ -2516,7 +2679,7 @@ static void xn_sw_main(XTThreadPtr self)
 	}
 
 	if (ss->ss_flush_pending) {
-		xt_xlog_flush_log(self);
+		xt_xlog_flush_log(db, self);
 		ss->ss_flush_pending = FALSE;
 	}
 

=== modified file 'storage/pbxt/src/xaction_xt.h'
--- a/storage/pbxt/src/xaction_xt.h	2009-08-17 11:12:36 +0000
+++ b/storage/pbxt/src/xaction_xt.h	2009-11-24 10:55:06 +0000
@@ -87,6 +87,7 @@ struct XTOpenTable;
 #define XT_XN_XAC_CLEANED		8					/* The transaction has been cleaned. */
 #define XT_XN_XAC_RECOVERED		16					/* This transaction was detected on recovery. */
 #define XT_XN_XAC_SWEEP			32					/* End ID has been set, OK to sweep. */
+#define XT_XN_XAC_PREPARED		64					/* The transaction was prepared (used only by recovery). */
 
 #define XT_XN_VISIBLE			0					/* The transaction is committed, and the record is visible. */
 #define XT_XN_NOT_VISIBLE		1					/* The transaction is committed, but not visible. */
@@ -95,6 +96,24 @@ struct XTOpenTable;
 #define XT_XN_OTHER_UPDATE		4					/* The record was updated by someone else. */
 #define XT_XN_REREAD			5					/* The transaction is not longer in RAM, status is unkown, retry. */
 
+typedef struct XTXactPrepare {
+	xtXactID					xp_xact_id;			/* The prepared transaction. */
+	xtWord4						xp_hash;			/* xt_get_checksum4() of the XA data; also selects the hash bucket. */
+	struct XTXactPrepare		*xp_next;			/* Next item in hash table. */
+	int							xp_data_len;		/* Number of valid bytes in xp_xa_data. */
+	xtWord1						xp_xa_data[XT_MAX_XA_DATA_SIZE];	/* NOTE: xt_xn_store_xa_data() allocates only xp_data_len bytes of this array. */
+} XTXactPrepareRec, *XTXactPreparePtr;
+
+typedef struct XTXactXA {					/* Entry of the sorted list db_xn_xa_list (ordered by xx_xact_id). */
+	xtXactID					xx_xact_id;			/* Sort key. */
+	XTXactPreparePtr			xx_xa_ptr;			/* The corresponding hash-table record. */
+} XTXactXARec, *XTXactXAPtr;
+
+typedef struct XTXactEnumXA {				/* Cursor state for xt_xn_enum_xa_data(). */
+	u_int						exa_index;			/* Index of the next item in db_xn_xa_list. */
+	xtBool						exa_locked;			/* TRUE while the enumeration holds db_xn_xa_lock. */
+} XTXactEnumXARec, *XTXactEnumXAPtr;
+
 typedef struct XTXactData {
 	xtXactID					xd_start_xn_id;			/* Note: may be zero!. */
 	xtXactID					xd_end_xn_id;			/* Note: may be zero!. */
@@ -105,6 +124,7 @@ typedef struct XTXactData {
 	int							xd_flags;
 	xtWord4						xd_end_time;
 	xtThreadID					xd_thread_id;
+	xtWord4						xd_xa_hash;				/* 0 if no XA transaction. */
 
 	/* A transaction may be indexed twice in the hash table.
 	 * Once on the start sequence number, and once on the
@@ -123,7 +143,7 @@ typedef struct XTXactData {
 
 #if defined(XT_XACT_USE_PTHREAD_RW)
 #define XT_XACT_LOCK_TYPE				xt_rwlock_type
-#define XT_XACT_INIT_LOCK(s, i)			xt_init_rwlock(s, i)
+#define XT_XACT_INIT_LOCK(s, i)			xt_init_rwlock_with_autoname(s, i)
 #define XT_XACT_FREE_LOCK(s, i)			xt_free_rwlock(i)	
 #define XT_XACT_READ_LOCK(i, s)			xt_slock_rwlock_ns(i)
 #define XT_XACT_WRITE_LOCK(i, s)		xt_xlock_rwlock_ns(i)
@@ -183,6 +203,14 @@ void			xt_xn_wakeup_thread(xtThreadID th
 xtXactID		xt_xn_get_curr_id(struct XTDatabase *db);
 xtWord8			xt_xn_bytes_to_sweep(struct XTDatabase *db, struct XTThread *thread);
 
+int				xt_xn_xa_compare(struct XTThread *self, register const void *thunk, register const void *a, register const void *b);	/* Sorted-list compare: a is an xtXactID *, b an XTXactXARec *. */
+xtBool			xt_xn_prepare(int len, xtWord1 *xa_data, struct XTThread *thread);	/* Logs an XT_LOG_ENT_PREPARE record for the thread's logged transaction. */
+xtBool			xt_xn_store_xa_data(struct XTDatabase *db, xtXactID xn_id, int len, xtWord1 *xa_data, struct XTThread *thread);	/* Records a copy of xa_data for transaction xn_id. */
+void			xt_xn_delete_xa_data_by_xact(struct XTDatabase *db, xtXactID xact_id, struct XTThread *thread);	/* Acquires db_xn_xa_lock itself. */
+void			xt_xn_delete_xa_data(struct XTDatabase *db, XTXactPreparePtr xap, xtBool unlock, struct XTThread *thread);	/* Caller holds db_xn_xa_lock; it is released if unlock is TRUE. */
+XTXactPreparePtr	xt_xn_find_xa_data(struct XTDatabase *db, int len, xtWord1 *xa_data, xtBool lock, struct XTThread *thread);	/* If lock is TRUE, returns with db_xn_xa_lock still HELD. */
+XTXactPreparePtr	xt_xn_enum_xa_data(struct XTDatabase *db, XTXactEnumXAPtr exa);	/* Holds db_xn_xa_lock from the first call until it returns NULL. */
+
 XTXactDataPtr	xt_xn_add_old_xact(struct XTDatabase *db, xtXactID xn_id, struct XTThread *thread);
 XTXactDataPtr	xt_xn_get_xact(struct XTDatabase *db, xtXactID xn_id, struct XTThread *thread);
 xtBool			xt_xn_delete_xact(struct XTDatabase *db, xtXactID xn_id, struct XTThread *thread);

=== modified file 'storage/pbxt/src/xactlog_xt.cc'
--- a/storage/pbxt/src/xactlog_xt.cc	2009-09-03 06:15:03 +0000
+++ b/storage/pbxt/src/xactlog_xt.cc	2009-11-24 10:55:06 +0000
@@ -1108,6 +1108,9 @@ xtBool XTDatabaseLog::xlog_append(XTThre
 		if ((part_size = xl_write_buf_pos % 512)) {
 			part_size = 512 - part_size;
 			xl_write_buffer[xl_write_buf_pos] = XT_LOG_ENT_END_OF_LOG;
+#ifdef HAVE_valgrind
+			memset(xl_write_buffer + xl_write_buf_pos + 1, 0x66, part_size);
+#endif
 			if (!xt_pwrite_file(xl_log_file, xl_write_log_offset, xl_write_buf_pos+part_size, xl_write_buffer, &thread->st_statistics.st_xlog, thread))
 				goto write_failed;			
 		}
@@ -1477,9 +1480,9 @@ void XTDatabaseLog::xlog_name(size_t siz
  * T H R E A D   T R A N S A C T I O N   B U F F E R
  */
 
-xtPublic xtBool xt_xlog_flush_log(XTThreadPtr thread)
+xtPublic xtBool xt_xlog_flush_log(struct XTDatabase *db, XTThreadPtr thread)	/* Flushes db's transaction log (db is now passed explicitly instead of using thread->st_database). */
 {
-	return thread->st_database->db_xlog.xlog_flush(thread);
+	return db->db_xlog.xlog_flush(thread);
 }
 
 xtPublic xtBool xt_xlog_log_data(XTThreadPtr thread, size_t size, XTXactLogBufferDPtr log_entry, xtBool commit)
@@ -1488,15 +1491,14 @@ xtPublic xtBool xt_xlog_log_data(XTThrea
 }
 
 /* Allocate a record from the free list. */
-xtPublic xtBool xt_xlog_modify_table(struct XTOpenTable *ot, u_int status, xtOpSeqNo op_seq, xtRecordID free_rec_id, xtRecordID rec_id, size_t size, xtWord1 *data)
+xtPublic xtBool xt_xlog_modify_table(xtTableID tab_id, u_int status, xtOpSeqNo op_seq, xtRecordID free_rec_id, xtRecordID rec_id, size_t size, xtWord1 *data, XTThreadPtr thread)
 {
 	XTXactLogBufferDRec	log_entry;
-	XTThreadPtr			thread = ot->ot_thread;
-	XTTableHPtr			tab = ot->ot_table;
 	size_t				len;
 	xtWord4				sum = 0;
 	int					check_size = 1;
 	XTXactDataPtr		xact = NULL;
+	xtBool				commit = FALSE;
 
 	switch (status) {
 		case XT_LOG_ENT_REC_MODIFIED:
@@ -1505,7 +1507,7 @@ xtPublic xtBool xt_xlog_modify_table(str
 		case XT_LOG_ENT_DELETE:
 			check_size = 2;
 			XT_SET_DISK_4(log_entry.xu.xu_op_seq_4, op_seq);
-			XT_SET_DISK_4(log_entry.xu.xu_tab_id_4, tab->tab_id);
+			XT_SET_DISK_4(log_entry.xu.xu_tab_id_4, tab_id);
 			XT_SET_DISK_4(log_entry.xu.xu_rec_id_4, rec_id);
 			XT_SET_DISK_2(log_entry.xu.xu_size_2, size);
 			len = offsetof(XTactUpdateEntryDRec, xu_rec_type_1);
@@ -1521,7 +1523,7 @@ xtPublic xtBool xt_xlog_modify_table(str
 		case XT_LOG_ENT_DELETE_FL:
 			check_size = 2;
 			XT_SET_DISK_4(log_entry.xf.xf_op_seq_4, op_seq);
-			XT_SET_DISK_4(log_entry.xf.xf_tab_id_4, tab->tab_id);
+			XT_SET_DISK_4(log_entry.xf.xf_tab_id_4, tab_id);
 			XT_SET_DISK_4(log_entry.xf.xf_rec_id_4, rec_id);
 			XT_SET_DISK_2(log_entry.xf.xf_size_2, size);
 			XT_SET_DISK_4(log_entry.xf.xf_free_rec_id_4, free_rec_id);
@@ -1539,14 +1541,14 @@ xtPublic xtBool xt_xlog_modify_table(str
 		case XT_LOG_ENT_REC_REMOVED_EXT:
 			ASSERT_NS(size == 1 + XT_XACT_ID_SIZE + sizeof(XTTabRecFreeDRec));
 			XT_SET_DISK_4(log_entry.fr.fr_op_seq_4, op_seq);
-			XT_SET_DISK_4(log_entry.fr.fr_tab_id_4, tab->tab_id);
+			XT_SET_DISK_4(log_entry.fr.fr_tab_id_4, tab_id);
 			XT_SET_DISK_4(log_entry.fr.fr_rec_id_4, rec_id);
 			len = offsetof(XTactFreeRecEntryDRec, fr_stat_id_1);
 			break;
 		case XT_LOG_ENT_REC_REMOVED_BI:
 			check_size = 2;
 			XT_SET_DISK_4(log_entry.rb.rb_op_seq_4, op_seq);
-			XT_SET_DISK_4(log_entry.rb.rb_tab_id_4, tab->tab_id);
+			XT_SET_DISK_4(log_entry.rb.rb_tab_id_4, tab_id);
 			XT_SET_DISK_4(log_entry.rb.rb_rec_id_4, rec_id);
 			XT_SET_DISK_2(log_entry.rb.rb_size_2, size);
 			log_entry.rb.rb_new_rec_type_1 = (xtWord1) free_rec_id;
@@ -1556,42 +1558,42 @@ xtPublic xtBool xt_xlog_modify_table(str
 		case XT_LOG_ENT_REC_MOVED:
 			ASSERT_NS(size == 8);
 			XT_SET_DISK_4(log_entry.xw.xw_op_seq_4, op_seq);
-			XT_SET_DISK_4(log_entry.xw.xw_tab_id_4, tab->tab_id);
+			XT_SET_DISK_4(log_entry.xw.xw_tab_id_4, tab_id);
 			XT_SET_DISK_4(log_entry.xw.xw_rec_id_4, rec_id);
 			len = offsetof(XTactWriteRecEntryDRec, xw_rec_type_1);
 			break;
 		case XT_LOG_ENT_REC_CLEANED:
 			ASSERT_NS(size == offsetof(XTTabRecHeadDRec, tr_prev_rec_id_4) + XT_RECORD_ID_SIZE);
 			XT_SET_DISK_4(log_entry.xw.xw_op_seq_4, op_seq);
-			XT_SET_DISK_4(log_entry.xw.xw_tab_id_4, tab->tab_id);
+			XT_SET_DISK_4(log_entry.xw.xw_tab_id_4, tab_id);
 			XT_SET_DISK_4(log_entry.xw.xw_rec_id_4, rec_id);
 			len = offsetof(XTactWriteRecEntryDRec, xw_rec_type_1);
 			break;
 		case XT_LOG_ENT_REC_CLEANED_1:
 			ASSERT_NS(size == 1);
 			XT_SET_DISK_4(log_entry.xw.xw_op_seq_4, op_seq);
-			XT_SET_DISK_4(log_entry.xw.xw_tab_id_4, tab->tab_id);
+			XT_SET_DISK_4(log_entry.xw.xw_tab_id_4, tab_id);
 			XT_SET_DISK_4(log_entry.xw.xw_rec_id_4, rec_id);
 			len = offsetof(XTactWriteRecEntryDRec, xw_rec_type_1);
 			break;
 		case XT_LOG_ENT_REC_UNLINKED:
 			ASSERT_NS(size == offsetof(XTTabRecHeadDRec, tr_prev_rec_id_4) + XT_RECORD_ID_SIZE);
 			XT_SET_DISK_4(log_entry.xw.xw_op_seq_4, op_seq);
-			XT_SET_DISK_4(log_entry.xw.xw_tab_id_4, tab->tab_id);
+			XT_SET_DISK_4(log_entry.xw.xw_tab_id_4, tab_id);
 			XT_SET_DISK_4(log_entry.xw.xw_rec_id_4, rec_id);
 			len = offsetof(XTactWriteRecEntryDRec, xw_rec_type_1);
 			break;
 		case XT_LOG_ENT_ROW_NEW:
 			ASSERT_NS(size == 0);
 			XT_SET_DISK_4(log_entry.xa.xa_op_seq_4, op_seq);
-			XT_SET_DISK_4(log_entry.xa.xa_tab_id_4, tab->tab_id);
+			XT_SET_DISK_4(log_entry.xa.xa_tab_id_4, tab_id);
 			XT_SET_DISK_4(log_entry.xa.xa_row_id_4, rec_id);
 			len = offsetof(XTactRowAddedEntryDRec, xa_row_id_4) + XT_ROW_ID_SIZE;
 			break;
 		case XT_LOG_ENT_ROW_NEW_FL:
 			ASSERT_NS(size == 0);
 			XT_SET_DISK_4(log_entry.xa.xa_op_seq_4, op_seq);
-			XT_SET_DISK_4(log_entry.xa.xa_tab_id_4, tab->tab_id);
+			XT_SET_DISK_4(log_entry.xa.xa_tab_id_4, tab_id);
 			XT_SET_DISK_4(log_entry.xa.xa_row_id_4, rec_id);
 			XT_SET_DISK_4(log_entry.xa.xa_free_list_4, free_rec_id);
 			sum ^= XT_CHECKSUM4_REC(free_rec_id);
@@ -1602,10 +1604,17 @@ xtPublic xtBool xt_xlog_modify_table(str
 		case XT_LOG_ENT_ROW_FREED:
 			ASSERT_NS(size == sizeof(XTTabRowRefDRec));
 			XT_SET_DISK_4(log_entry.wr.wr_op_seq_4, op_seq);
-			XT_SET_DISK_4(log_entry.wr.wr_tab_id_4, tab->tab_id);
+			XT_SET_DISK_4(log_entry.wr.wr_tab_id_4, tab_id);
 			XT_SET_DISK_4(log_entry.wr.wr_row_id_4, rec_id);
 			len = offsetof(XTactWriteRowEntryDRec, wr_ref_id_4);
 			break;
+		case XT_LOG_ENT_PREPARE:
+			check_size = 2;
+			XT_SET_DISK_4(log_entry.xp.xp_xact_id_4, op_seq);
+			log_entry.xp.xp_xa_len_1 = (xtWord1) size;
+			len = offsetof(XTXactPrepareEntryDRec, xp_xa_data);
+			commit = TRUE;
+			break;
 		default:
 			ASSERT_NS(FALSE);
 			len = 0;
@@ -1615,7 +1624,7 @@ xtPublic xtBool xt_xlog_modify_table(str
 	xtWord1	*dptr = data;
 	xtWord4	g;
 
-	sum ^= op_seq ^ (tab->tab_id << 8) ^ XT_CHECKSUM4_REC(rec_id);
+	sum ^= op_seq ^ (tab_id << 8) ^ XT_CHECKSUM4_REC(rec_id);
 	if ((g = sum & 0xF0000000)) {
 		sum = sum ^ (g >> 24);
 		sum = sum ^ g;
@@ -1643,9 +1652,9 @@ xtPublic xtBool xt_xlog_modify_table(str
 	xt_print_log_record(0, 0, &log_entry);
 #endif
 	if (xact)
-		return thread->st_database->db_xlog.xlog_append(thread, len, (xtWord1 *) &log_entry, size, data, FALSE, &xact->xd_begin_log, &xact->xd_begin_offset);
+		return thread->st_database->db_xlog.xlog_append(thread, len, (xtWord1 *) &log_entry, size, data, commit, &xact->xd_begin_log, &xact->xd_begin_offset);
 
-	return thread->st_database->db_xlog.xlog_append(thread, len, (xtWord1 *) &log_entry, size, data, FALSE, NULL, NULL);
+	return thread->st_database->db_xlog.xlog_append(thread, len, (xtWord1 *) &log_entry, size, data, commit, NULL, NULL);
 }
 
 /*
@@ -1905,6 +1914,7 @@ xtBool XTDatabaseLog::xlog_verify(XTXact
 	xtRecordID	rec_id, free_rec_id;
 	int			check_size = 1;
 	xtWord1		*dptr;
+	xtWord4		g;
 
 	switch (record->xh.xh_status_1) {
 		case XT_LOG_ENT_HEADER:
@@ -2019,13 +2029,19 @@ xtBool XTDatabaseLog::xlog_verify(XTXact
 			return record->xe.xe_checksum_1 == (XT_CHECKSUM_1(sum) ^ XT_CHECKSUM_1(log_id));
 		case XT_LOG_ENT_END_OF_LOG:
 			return FALSE;
+		case XT_LOG_ENT_PREPARE:
+			check_size = 2;
+			op_seq = XT_GET_DISK_4(record->xp.xp_xact_id_4);
+			tab_id = 0;
+			rec_id = 0;
+			dptr = record->xp.xp_xa_data;
+			rec_size -= offsetof(XTXactPrepareEntryDRec, xp_xa_data);
+			break;
 		default:
 			ASSERT_NS(FALSE);
 			return FALSE;
 	}
 
-	xtWord4	g;
-
 	sum ^= (xtWord4) op_seq ^ ((xtWord4) tab_id << 8) ^ XT_CHECKSUM4_REC(rec_id);
 
 	if ((g = sum & 0xF0000000)) {
@@ -2193,6 +2209,14 @@ xtBool XTDatabaseLog::xlog_seq_next(XTXa
 			}
 			goto return_empty;
 		}
+		case XT_LOG_ENT_PREPARE:
+			check_size = 2;
+			len = offsetof(XTXactPrepareEntryDRec, xp_xa_data);
+			if (len > max_rec_len)
+				/* The size is not in the buffer: */
+				goto read_more;
+			len += (size_t) record->xp.xp_xa_len_1;
+			break;
 		default:
 			/* It is possible to land here after a crash, if the
 			 * log was not completely written.
@@ -2231,7 +2255,7 @@ xtBool XTDatabaseLog::xlog_seq_next(XTXa
 		goto return_empty;
 	}
 
-	/* The record is not completely in the buffer: */
+	/* The record is now completely in the buffer: */
 	seq->xseq_record_len = len;
 	*ret_entry = (XTXactLogBufferDPtr) seq->xseq_buffer;
 	return OK;
@@ -2428,7 +2452,7 @@ static void xlog_wr_wait_for_log_flush(X
 	if (reason == XT_LOG_CACHE_FULL || reason == XT_TIME_TO_WRITE || reason == XT_CHECKPOINT_REQ) {
 		/* Make sure that we have something to write: */
 		if (db->db_xlog.xlog_bytes_to_write() < 2 * 1204 * 1024)
-			xt_xlog_flush_log(self);
+			xt_xlog_flush_log(db, self);
 	}
 
 #ifdef TRACE_WRITER_ACTIVITY
@@ -2529,6 +2553,7 @@ static void xlog_wr_main(XTThreadPtr sel
 					case XT_LOG_ENT_ABORT:
 					case XT_LOG_ENT_CLEANUP:
 					case XT_LOG_ENT_OP_SYNC:
+					case XT_LOG_ENT_PREPARE:
 						break;
 					case XT_LOG_ENT_DEL_LOG:
 						xtLogID log_id;

=== modified file 'storage/pbxt/src/xactlog_xt.h'
--- a/storage/pbxt/src/xactlog_xt.h	2009-08-17 11:12:36 +0000
+++ b/storage/pbxt/src/xactlog_xt.h	2009-11-24 10:55:06 +0000
@@ -160,6 +160,7 @@ typedef struct XTXLogCache {
 #define XT_LOG_ENT_END_OF_LOG		37					/* This is a record that indicates the end of the log, and
 														 * fills to the end of a 512 byte block.
 														 */
+#define XT_LOG_ENT_PREPARE			39					/* XA prepare log entry. */
 
 #define XT_LOG_FILE_MAGIC			0xAE88FE12
 #define XT_LOG_VERSION_NO			1
@@ -201,6 +202,14 @@ typedef struct XTXactEndEntry {
 	XTDiskValue4			xe_not_used_4;		/* Was the end sequence number (no longer used - v1.0.04+), set to zero). */
 } XTXactEndEntryDRec, *XTXactEndEntryDPtr;
 
+typedef struct XTXactPrepareEntry {
+	xtWord1					xp_status_1;		/* XT_LOG_ENT_PREPARE */
+	XTDiskValue2			xp_checksum_2;		/* 2-byte checksum (check_size = 2 when writing and verifying this record). */
+	XTDiskValue4			xp_xact_id_4;		/* The transaction. */
+	xtWord1					xp_xa_len_1;		/* The length of the XA data (one byte; XT_MAX_XA_DATA_SIZE is 140). */
+	xtWord1					xp_xa_data[XT_MAX_XA_DATA_SIZE];	/* Only xp_xa_len_1 bytes of this are present in the log record. */
+} XTXactPrepareEntryDRec, *XTXactPrepareEntryDPtr;
+
 typedef struct XTXactCleanupEntry {
 	xtWord1					xc_status_1;		/* XT_LOG_ENT_CLEANUP */
 	xtWord1					xc_checksum_1;		
@@ -344,6 +353,7 @@ typedef union XTXactLogBuffer {
 	XTactOpSyncEntryDRec	os;
 	XTactExtRecEntryDRec	er;
 	XTactNoOpEntryDRec		no;
+	XTXactPrepareEntryDRec	xp;
 } XTXactLogBufferDRec, *XTXactLogBufferDPtr;
 
 /* ---------------------------------------- */
@@ -453,9 +463,9 @@ private:
 	xtBool					xlog_open_log(xtLogID log_id, off_t curr_eof, struct XTThread *thread);
 } XTDatabaseLogRec, *XTDatabaseLogPtr;
 
-xtBool			xt_xlog_flush_log(struct XTThread *thread);
+xtBool			xt_xlog_flush_log(struct XTDatabase *db, struct XTThread *thread);
 xtBool			xt_xlog_log_data(struct XTThread *thread, size_t len, XTXactLogBufferDPtr log_entry, xtBool commit);
-xtBool			xt_xlog_modify_table(struct XTOpenTable *ot, u_int status, xtOpSeqNo op_seq, xtRecordID free_list, xtRecordID address, size_t size, xtWord1 *data);
+xtBool			xt_xlog_modify_table(xtTableID tab_id, u_int status, xtOpSeqNo op_seq, xtRecordID free_list, xtRecordID address, size_t size, xtWord1 *data, struct XTThread *thread);
 
 void			xt_xlog_init(struct XTThread *self, size_t cache_size);
 void			xt_xlog_exit(struct XTThread *self);

=== modified file 'storage/pbxt/src/xt_config.h'
--- a/storage/pbxt/src/xt_config.h	2009-08-31 11:07:44 +0000
+++ b/storage/pbxt/src/xt_config.h	2009-11-24 10:55:06 +0000
@@ -50,7 +50,9 @@ const int max_connections = 500;
 /*
  * Make sure we use the thread safe version of the library.
  */
+#ifndef _THREAD_SAFE // Seems to be defined by some Drizzle header
 #define _THREAD_SAFE
+#endif
 
 /*
  * This causes things to be defined like stuff in inttypes.h
@@ -72,12 +74,12 @@ const int max_connections = 500;
 #define XT_MAC
 #endif
 
-#if defined(MSDOS) || defined(__WIN__)
+#if defined(MSDOS) || defined(__WIN__) || defined(_WIN64)
 #define XT_WIN
 #endif
 
 #ifdef XT_WIN
-#ifdef _DEBUG
+#if defined(_DEBUG) && !defined(DEBUG)
 #define DEBUG
 #endif // _DEBUG
 #else
@@ -101,8 +103,13 @@ const int max_connections = 500;
  * Definition of which atomic operations to use:
  */
 #ifdef XT_WIN
+#ifdef _WIN64
+/* 64-bit Windows atomic ops are not yet supported: */
+#define XT_NO_ATOMICS
+#else
 /* MS Studio style embedded assembler for x86 */
 #define XT_ATOMIC_WIN32_X86
+#endif
 #elif defined(__GNUC__) && (defined(__x86_64__) || defined(__i386__))
 /* Use GNU style embedded assembler for x86 */
 #define XT_ATOMIC_GNUC_X86
@@ -115,4 +122,10 @@ const int max_connections = 500;
 #define XT_NO_ATOMICS
 #endif
 
+#ifndef DRIZZLED
+#if MYSQL_VERSION_ID >= 50404
+#define MYSQL_SUPPORTS_BACKUP
+#endif
+#endif
+
 #endif

=== modified file 'storage/pbxt/src/xt_defs.h'
--- a/storage/pbxt/src/xt_defs.h	2009-08-17 11:12:36 +0000
+++ b/storage/pbxt/src/xt_defs.h	2009-11-24 10:55:06 +0000
@@ -379,6 +379,19 @@ typedef struct XTPathStr {
  */
 //#define XT_IMPLEMENT_NO_ACTION
 
+/* Define this value if online-backup should be supported.
+ * Note that, online backup is currently only supported
+ * by MySQL 6.0.9 or later
+ */
+#define XT_ENABLE_ONLINE_BACKUP
+
+/* Define this switch if you don't want to use atomic
+ * synchronisation.
+ */
+#ifndef XT_NO_ATOMICS
+//#define XT_NO_ATOMICS
+#endif
+
 /* ----------------------------------------------------------------------
  * GLOBAL CONSTANTS
  */
@@ -406,6 +419,8 @@ typedef struct XTPathStr {
 
 #define XT_ADD_PTR(p, l)				((void *) ((char *) (p) + (l)))
 
+#define XT_MAX_XA_DATA_SIZE				(3*4 + 128)			/* Corresponds to the maximum size of struct xid_t in handler.h. NOTE(review): 3*4 assumes 4-byte header fields, but xid_t declares them as long (8 bytes on LP64) -- confirm how callers serialize the XID. */
+
 /* ----------------------------------------------------------------------
  * DEFINES DEPENDENT ON  CONSTANTS
  */
@@ -744,6 +759,7 @@ extern xtBool				pbxt_crash_debug;
 #define MYSQL_PLUGIN_VAR_HEADER				DRIZZLE_PLUGIN_VAR_HEADER
 #define MYSQL_SYSVAR_STR					DRIZZLE_SYSVAR_STR
 #define MYSQL_SYSVAR_INT					DRIZZLE_SYSVAR_INT
+#define MYSQL_SYSVAR_BOOL					DRIZZLE_SYSVAR_BOOL
 #define MYSQL_SYSVAR						DRIZZLE_SYSVAR
 #define MYSQL_STORAGE_ENGINE_PLUGIN			DRIZZLE_STORAGE_ENGINE_PLUGIN
 #define MYSQL_INFORMATION_SCHEMA_PLUGIN		DRIZZLE_INFORMATION_SCHEMA_PLUGIN
@@ -752,9 +768,8 @@ extern xtBool				pbxt_crash_debug;
 
 #define mx_tmp_use_all_columns(x, y)		(x)->use_all_columns(y)
 #define mx_tmp_restore_column_map(x, y)		(x)->restore_column_map(y)
-#define MX_BIT_FAST_TEST_AND_SET(x, y)		bitmap_test_and_set(x, y)
 
-#define MX_TABLE_TYPES_T					handler::Table_flags
+#define MX_TABLE_TYPES_T					Cursor::Table_flags
 #define MX_UINT8_T							uint8_t
 #define MX_ULONG_T							uint32_t
 #define MX_ULONGLONG_T						uint64_t
@@ -762,6 +777,10 @@ extern xtBool				pbxt_crash_debug;
 #define MX_CHARSET_INFO						struct charset_info_st
 #define MX_CONST_CHARSET_INFO				const struct charset_info_st			
 #define MX_CONST							const
+#define MX_BITMAP							MyBitmap
+#define MX_BIT_SIZE()						numOfBitsInMap()
+#define MX_BIT_SET(x, y)					(x)->setBit(y)
+#define MX_BIT_FAST_TEST_AND_SET(x, y)				(x)->testAndSet(y)
 
 #define my_bool								bool
 #define int16								int16_t
@@ -771,6 +790,7 @@ extern xtBool				pbxt_crash_debug;
 #define uchar								unsigned char
 #define longlong							int64_t
 #define ulonglong							uint64_t
+#define handler								Cursor
 
 #define HAVE_LONG_LONG
 
@@ -823,10 +843,13 @@ extern xtBool				pbxt_crash_debug;
 
 class PBXTStorageEngine;
 typedef PBXTStorageEngine handlerton;
+class Session;
+
+extern "C" void session_mark_transaction_to_rollback(Session *session, bool all);
 
 #else // DRIZZLED
 /* The MySQL case: */
-#if MYSQL_VERSION_ID >= 60008
+#if MYSQL_VERSION_ID >= 50404
 #define STRUCT_TABLE						struct TABLE
 #else
 #define STRUCT_TABLE						struct st_table
@@ -844,13 +867,13 @@ typedef PBXTStorageEngine handlerton;
 #define MX_CHARSET_INFO						CHARSET_INFO
 #define MX_CONST_CHARSET_INFO				struct charset_info_st			
 #define MX_CONST							
+#define MX_BITMAP							MY_BITMAP
+#define MX_BIT_SIZE()						n_bits
+#define MX_BIT_SET(x, y)					bitmap_set_bit(x, y)
 
 #endif // DRIZZLED
 
-#define MX_BITMAP							MY_BITMAP
-#define MX_BIT_SIZE()						n_bits
 #define MX_BIT_IS_SUBSET(x, y)				bitmap_is_subset(x, y)
-#define MX_BIT_SET(x, y)					bitmap_set_bit(x, y)
 
 #ifndef XT_SCAN_CORE_DEFINED
 #define XT_SCAN_CORE_DEFINED

=== modified file 'storage/pbxt/src/xt_errno.h'
--- a/storage/pbxt/src/xt_errno.h	2009-09-03 06:15:03 +0000
+++ b/storage/pbxt/src/xt_errno.h	2009-11-24 10:55:06 +0000
@@ -119,6 +119,9 @@
 #define XT_ERR_FK_REF_TEMP_TABLE	-95
 #define XT_ERR_MYSQL_SHUTDOWN		-98
 #define XT_ERR_MYSQL_NO_THREAD		-99
+#define XT_ERR_BUFFER_TOO_SMALL		-100
+#define XT_ERR_BAD_BACKUP_FORMAT	-101
+#define XT_ERR_PBXT_NOT_INSTALLED	-102
 
 #ifdef XT_WIN
 #define XT_ENOMEM					ERROR_NOT_ENOUGH_MEMORY