← Back to team overview

maria-developers team mailing list archive

bzr commit into MariaDB 5.1, with Maria 1.5:maria branch (knielsen:2849)

 

#At lp:maria

 2849 knielsen@xxxxxxxxxxxxxxx	2010-05-26
      Preliminary commit of group commit proof-of-concept.
      modified:
        sql/handler.cc
        sql/handler.h
        sql/log.cc
        sql/log.h
        sql/log_event.h
        sql/sql_class.cc
        sql/sql_class.h
        sql/sql_load.cc
        sql/table.cc
        sql/table.h
        storage/xtradb/handler/ha_innodb.cc

=== modified file 'sql/handler.cc'
--- a/sql/handler.cc	2010-04-06 22:47:08 +0000
+++ b/sql/handler.cc	2010-05-26 08:13:32 +0000
@@ -76,6 +76,7 @@ TYPELIB tx_isolation_typelib= {array_ele
 static TYPELIB known_extensions= {0,"known_exts", NULL, NULL};
 uint known_extensions_id= 0;
 
+static int commit_one_phase_2(THD *thd, bool all, bool do_commit_ordered);
 
 
 static plugin_ref ha_default_plugin(THD *thd)
@@ -544,6 +545,26 @@ err:
   DBUG_RETURN(1);
 }
 
+/*
+  Queue of THDs waiting to be group committed with tc_log->group_log_xid()
+  (or sequenced for commit_ordered()); new entries are pushed at the head.
+*/
+static THD *group_commit_queue;
+/*
+  This mutex protects the group_commit_queue on platforms without native
+  atomic operations.
+ */
+static pthread_mutex_t LOCK_group_commit_queue;
+/* This mutex is used to serialize calls to handler prepare_ordered methods. */
+static pthread_mutex_t LOCK_prepare_ordered;
+/* This mutex is used to serialize calls to handler commit_ordered methods. */
+static pthread_mutex_t LOCK_commit_ordered;
+/* Serializes calls to group_log_xid(); also protects group_commit_queue_busy. */
+static pthread_mutex_t LOCK_group_commit;
+static pthread_cond_t COND_group_commit; /* Signalled when the queue goes idle. */
+
+static bool mutexes_inited= FALSE; /* TRUE between ha_init() and ha_end(). */
+
 int ha_init()
 {
   int error= 0;
@@ -557,6 +578,19 @@ int ha_init()
   */
   opt_using_transactions= total_ha>(ulong)opt_bin_log;
   savepoint_alloc_size+= sizeof(SAVEPOINT);
+  /* Group commit queue and its locks; destroyed again in ha_end(). */
+  group_commit_queue= NULL;
+  my_pthread_mutex_init(&LOCK_group_commit_queue, MY_MUTEX_INIT_FAST,
+                        "LOCK_group_commit_queue", MYF(0));
+  my_pthread_mutex_init(&LOCK_prepare_ordered, MY_MUTEX_INIT_SLOW,
+                        "LOCK_prepare_ordered", MYF(0));
+  my_pthread_mutex_init(&LOCK_commit_ordered, MY_MUTEX_INIT_SLOW,
+                        "LOCK_commit_ordered", MYF(0));
+  my_pthread_mutex_init(&LOCK_group_commit, MY_MUTEX_INIT_SLOW,
+                        "LOCK_group_commit", MYF(0));
+  pthread_cond_init(&COND_group_commit, 0);
+  mutexes_inited= TRUE;
+
   DBUG_RETURN(error);
 }
 
@@ -574,6 +608,15 @@ int ha_end()
   if (ha_finish_errors())
     error= 1;
 
+  if (mutexes_inited)
+  {
+    pthread_mutex_destroy(&LOCK_group_commit_queue);
+    pthread_mutex_destroy(&LOCK_prepare_ordered);
+    pthread_mutex_destroy(&LOCK_commit_ordered);
+    pthread_mutex_destroy(&LOCK_group_commit);
+    pthread_cond_destroy(&COND_group_commit);
+    mutexes_inited= FALSE;
+  }
   DBUG_RETURN(error);
 }
 
@@ -1053,6 +1096,108 @@ ha_check_and_coalesce_trx_read_only(THD 
   return rw_ha_count;
 }
 
+/*
+  Atomically enqueue a THD at the head of the queue of threads waiting to
+  group commit, and return the previous head of the queue.
+*/
+static THD *
+enqueue_atomic(THD *thd)
+{
+  my_atomic_rwlock_wrlock(&LOCK_group_commit_queue);
+  thd->next_commit_ordered= group_commit_queue;
+  while (!my_atomic_casptr((void **)(&group_commit_queue),
+                           (void **)(&thd->next_commit_ordered),
+                           thd))
+    ;
+  my_atomic_rwlock_wrunlock(&LOCK_group_commit_queue);
+  return thd->next_commit_ordered;
+}
+/* Atomically detach the whole queue and return it in enqueue (commit) order. */
+static THD *
+atomic_grab_reverse_queue()
+{
+  my_atomic_rwlock_wrlock(&LOCK_group_commit_queue);
+  THD *queue= group_commit_queue;
+  while (!my_atomic_casptr((void **)(&group_commit_queue),
+                           (void **)(&queue),
+                           NULL))
+    ;
+  my_atomic_rwlock_wrunlock(&LOCK_group_commit_queue);
+
+  /*
+    Since we enqueue at the head, the queue is actually in reverse order.
+    So reverse it back into correct commit order before returning.
+  */
+  THD *prev= NULL;
+  while (queue)
+  {
+    THD *next= queue->next_commit_ordered;
+    queue->next_commit_ordered= prev;
+    prev= queue;
+    queue= next;
+  }
+
+  return prev;
+}
+/* Call commit_ordered() for every handler in the ha_info list that has one. */
+static void
+call_commit_ordered(Ha_trx_info *ha_info, THD *thd, bool all)
+{
+  for (; ha_info; ha_info= ha_info->next())
+  {
+    handlerton *ht= ha_info->ht();
+    if (!ht->commit_ordered)
+      continue;
+    ht->commit_ordered(ht, thd, all);
+  }
+}
+/* Block until another thread sets thd->group_commit_ready and signals us. */
+static void
+group_commit_wait_for_wakeup(THD *thd)
+{
+  pthread_mutex_lock(&thd->LOCK_commit_ordered);
+  while (!thd->group_commit_ready)
+    pthread_cond_wait(&thd->COND_commit_ordered,
+                      &thd->LOCK_commit_ordered);
+  pthread_mutex_unlock(&thd->LOCK_commit_ordered);
+}
+/* Set other_thd->group_commit_ready and wake it up (pairs with the above). */
+static void
+group_commit_wakeup_other(THD *other_thd)
+{
+  pthread_mutex_lock(&other_thd->LOCK_commit_ordered);
+  other_thd->group_commit_ready= TRUE;
+  pthread_cond_signal(&other_thd->COND_commit_ordered);
+  pthread_mutex_unlock(&other_thd->LOCK_commit_ordered);
+}
+/* TRUE while a grabbed queue is still being processed; under LOCK_group_commit. */
+static bool group_commit_queue_busy= 0;
+/* Clear the busy flag and wake a thread in group_commit_wait_queue_idle(). */
+static void
+group_commit_mark_queue_idle()
+{
+  pthread_mutex_lock(&LOCK_group_commit);
+  group_commit_queue_busy= FALSE;
+  pthread_cond_signal(&COND_group_commit);
+  pthread_mutex_unlock(&LOCK_group_commit);
+}
+/* Set the busy flag; the caller must hold LOCK_group_commit. */
+static void
+group_commit_mark_queue_busy()
+{
+  safe_mutex_assert_owner(&LOCK_group_commit);
+  group_commit_queue_busy= TRUE;
+}
+/* Wait until no queue run is in progress; caller must hold LOCK_group_commit. */
+static void
+group_commit_wait_queue_idle()
+{
+  /* Wait for any existing queue run to finish. */
+  safe_mutex_assert_owner(&LOCK_group_commit);
+  while (group_commit_queue_busy)
+    pthread_cond_wait(&COND_group_commit, &LOCK_group_commit);
+}
+
 
 /**
   @retval
@@ -1070,7 +1215,7 @@ ha_check_and_coalesce_trx_read_only(THD 
 */
 int ha_commit_trans(THD *thd, bool all)
 {
-  int error= 0, cookie= 0;
+  int error= 0;
   /*
     'all' means that this is either an explicit commit issued by
     user, or an implicit commit issued by a DDL.
@@ -1085,7 +1230,10 @@ int ha_commit_trans(THD *thd, bool all)
   */
   bool is_real_trans= all || thd->transaction.all.ha_list == 0;
   Ha_trx_info *ha_info= trans->ha_list;
-  my_xid xid= thd->transaction.xid_state.xid.get_my_xid();
+  bool need_prepare_ordered, need_commit_ordered;
+  bool need_enqueue;
+  bool is_group_commit_leader= FALSE;
+  my_xid xid;
   DBUG_ENTER("ha_commit_trans");
 
   /*
@@ -1118,85 +1266,277 @@ int ha_commit_trans(THD *thd, bool all)
     DBUG_RETURN(2);
   }
 #ifdef USING_TRANSACTIONS
-  if (ha_info)
+  if (!ha_info)
   {
-    uint rw_ha_count;
-    bool rw_trans;
+    /* Free resources and perform other cleanup even for 'empty' transactions. */
+    if (is_real_trans)
+      thd->transaction.cleanup();
+    DBUG_RETURN(0);
+  }
 
-    DBUG_EXECUTE_IF("crash_commit_before", abort(););
+  DBUG_EXECUTE_IF("crash_commit_before", abort(););
 
-    /* Close all cursors that can not survive COMMIT */
-    if (is_real_trans)                          /* not a statement commit */
-      thd->stmt_map.close_transient_cursors();
+  /* Close all cursors that can not survive COMMIT */
+  if (is_real_trans)                          /* not a statement commit */
+    thd->stmt_map.close_transient_cursors();
 
-    rw_ha_count= ha_check_and_coalesce_trx_read_only(thd, ha_info, all);
-    /* rw_trans is TRUE when we in a transaction changing data */
-    rw_trans= is_real_trans && (rw_ha_count > 0);
+  uint rw_ha_count= ha_check_and_coalesce_trx_read_only(thd, ha_info, all);
+  /* rw_trans is TRUE when we are in a transaction changing data */
+  bool rw_trans= is_real_trans && (rw_ha_count > 0);
 
-    if (rw_trans &&
-        wait_if_global_read_lock(thd, 0, 0))
-    {
-      ha_rollback_trans(thd, all);
-      DBUG_RETURN(1);
-    }
+  if (rw_trans &&
+      wait_if_global_read_lock(thd, 0, 0))
+  {
+    ha_rollback_trans(thd, all);
+    DBUG_RETURN(1);
+  }
+
+  if (rw_trans &&
+      opt_readonly &&
+      !(thd->security_ctx->master_access & SUPER_ACL) &&
+      !thd->slave_thread)
+  {
+    my_error(ER_OPTION_PREVENTS_STATEMENT, MYF(0), "--read-only");
+    goto err;
+  }
 
-    if (rw_trans &&
-        opt_readonly &&
-        !(thd->security_ctx->master_access & SUPER_ACL) &&
-        !thd->slave_thread)
+  if (trans->no_2pc || (rw_ha_count <= 1))
+  {
+    error= ha_commit_one_phase(thd, all);
+    DBUG_EXECUTE_IF("crash_commit_after", DBUG_ABORT(););
+    goto end;
+  }
+
+  need_prepare_ordered= FALSE;
+  need_commit_ordered= FALSE;
+  xid= thd->transaction.xid_state.xid.get_my_xid();
+
+  for (Ha_trx_info *hi= ha_info; hi; hi= hi->next())
+  {
+    int err;
+    handlerton *ht= hi->ht();
+    /*
+      Do not call two-phase commit if this particular
+      transaction is read-only. This allows for simpler
+      implementation in engines that are always read-only.
+    */
+    if (! hi->is_trx_read_write())
+      continue;
+    /*
+      Sic: we know that prepare() is not NULL since otherwise
+      trans->no_2pc would have been set.
+    */
+    if ((err= ht->prepare(ht, thd, all)))
+      my_error(ER_ERROR_DURING_COMMIT, MYF(0), err);
+    status_var_increment(thd->status_var.ha_prepare_count);
+
+    if (err)
+      goto err;
+
+    if (ht->prepare_ordered)
+      need_prepare_ordered= TRUE;
+    if (ht->commit_ordered)
+      need_commit_ordered= TRUE;
+  }
+  DBUG_EXECUTE_IF("crash_commit_after_prepare", DBUG_ABORT(););
+
+  if (!is_real_trans)
+  {
+    error= commit_one_phase_2(thd, all, FALSE);
+    DBUG_EXECUTE_IF("crash_commit_after", DBUG_ABORT(););
+    goto end;
+  }
+
+  /*
+    We can optimise away some of the thread synchronisation that may not be
+    needed.
+
+    If need_prepare_ordered, then we need to take LOCK_prepare_ordered.
+
+    If (xid && use_group_log_xid), then we need to enqueue (and this must
+    be done under LOCK_prepare_ordered if we take that lock).
+
+    Similarly, if (need_prepare_ordered && need_commit_ordered), then we
+    need to enqueue under the LOCK_prepare_ordered.
+
+    If (xid && use_group_log_xid), then we need to take LOCK_group_commit.
+
+    If need_commit_ordered, then we need to take LOCK_commit_ordered.
+
+    Cases not covered by above can be skipped to optimise things a bit.
+  */
+  need_enqueue= (xid && tc_log->use_group_log_xid) ||
+    (need_prepare_ordered && need_commit_ordered);
+
+  thd->group_commit_ready= FALSE;
+  thd->group_commit_all= all;
+  if (need_prepare_ordered)
+  {
+    pthread_mutex_lock(&LOCK_prepare_ordered);
+
+    for (Ha_trx_info *hi= ha_info; hi; hi= hi->next())
     {
-      my_error(ER_OPTION_PREVENTS_STATEMENT, MYF(0), "--read-only");
-      ha_rollback_trans(thd, all);
-      error= 1;
-      goto end;
+      int err;
+      handlerton *ht= hi->ht();
+      if (! hi->is_trx_read_write())
+        continue;
+      if (ht->prepare_ordered && (err= ht->prepare_ordered(ht, thd, all)))
+      {
+        my_error(ER_ERROR_DURING_COMMIT, MYF(0), err);
+        pthread_mutex_unlock(&LOCK_prepare_ordered);
+        goto err;
+      }
     }
+  }
+  if (need_enqueue)
+  {
+    THD *previous_queue= enqueue_atomic(thd);
+    is_group_commit_leader= (previous_queue == NULL);
+  }
+  if (need_prepare_ordered)
+    pthread_mutex_unlock(&LOCK_prepare_ordered);
 
-    if (!trans->no_2pc && (rw_ha_count > 1))
+  int cookie;
+  if (tc_log->use_group_log_xid && need_enqueue)
+  {
+    if (is_group_commit_leader)
     {
-      for (; ha_info && !error; ha_info= ha_info->next())
+      pthread_mutex_lock(&LOCK_group_commit);
+      group_commit_wait_queue_idle();
+
+      THD *queue= atomic_grab_reverse_queue();
+      /* The first in the queue is the leader. */
+      DBUG_ASSERT(queue == thd);
+
+      /*
+        This will set individual error codes in each thd->xid_error, as
+        well as set thd->xid_cookie for later unlog() call.
+      */
+      tc_log->group_log_xid(queue);
+      /*
+        Call commit_ordered methods for all transactions in the queue
+        (that did not get an error in group_log_xid()).
+
+        We do this under an additional global LOCK_commit_ordered; this is
+        so that transactions that do not need 2-phase commit do not have
+        to wait for the potentially long duration of LOCK_group_commit.
+      */
+      /* Unconditional: a follower may need commit_ordered() when we do not. */
       {
-        int err;
-        handlerton *ht= ha_info->ht();
-        /*
-          Do not call two-phase commit if this particular
-          transaction is read-only. This allows for simpler
-          implementation in engines that are always read-only.
-        */
-        if (! ha_info->is_trx_read_write())
-          continue;
-        /*
-          Sic: we know that prepare() is not NULL since otherwise
-          trans->no_2pc would have been set.
-        */
-        if ((err= ht->prepare(ht, thd, all)))
+        pthread_mutex_lock(&LOCK_commit_ordered);
+        for (THD *thd2= queue; thd2 != NULL; thd2= thd2->next_commit_ordered)
         {
-          my_error(ER_ERROR_DURING_COMMIT, MYF(0), err);
-          error= 1;
+          THD_TRANS *trans2= thd2->group_commit_all ?
+            &thd2->transaction.all : &thd2->transaction.stmt;
+          if (!thd2->xid_error)
+            call_commit_ordered(trans2->ha_list, thd2, thd2->group_commit_all);
         }
-        status_var_increment(thd->status_var.ha_prepare_count);
+        pthread_mutex_unlock(&LOCK_commit_ordered);
       }
-      DBUG_EXECUTE_IF("crash_commit_after_prepare", DBUG_ABORT(););
-      if (error || (is_real_trans && xid &&
-                    (error= !(cookie= tc_log->log_xid(thd, xid)))))
-      {
-        ha_rollback_trans(thd, all);
-        error= 1;
-        goto end;
-      }
-      DBUG_EXECUTE_IF("crash_commit_after_log", DBUG_ABORT(););
+      pthread_mutex_unlock(&LOCK_group_commit);
+      /* Wake up everyone except ourselves. */
+      while ((queue= queue->next_commit_ordered)  != NULL)
+        group_commit_wakeup_other(queue);
+    }
+    else
+    {
+      /* If not leader, just wait until leader wakes us up. */
+      group_commit_wait_for_wakeup(thd);
+    }
+
+    /*
+      Now that we're back in our own thread context, do any delayed error
+      reporting.
+    */
+    if (thd->xid_error)
+    {
+      tc_log->xid_delayed_error(thd);
+      goto err;
+    }
+    cookie= thd->xid_cookie;
+    /* The cookie must be non-zero in the non-error case. */
+    DBUG_ASSERT(cookie);
+  }
+  else
+  {
+    /* Without an xid there is nothing to log; cookie 0 is then no error. */
+    cookie= xid ? tc_log->log_xid(thd, xid) : 0;
+
+    if (!need_enqueue)
+    {
+      if (xid && !cookie)
+        goto err;                   /* log_xid() failed: roll back. */
+      goto log_done;                /* Nothing to sequence: commit below. */
+    }
+
+    /*
+      We only get here to do correctly sequenced prepare_ordered and
+      commit_ordered() calls.
+
+      In this case, we need to wait for the previous in queue to finish
+      commit_ordered before us to get the correct sequence.
+    */
+    DBUG_ASSERT(need_prepare_ordered && need_commit_ordered);
+
+    if (is_group_commit_leader)
+    {
+      pthread_mutex_lock(&LOCK_group_commit);
+      group_commit_wait_queue_idle();
+      THD *queue= atomic_grab_reverse_queue();
+      /*
+        Mark the queue busy while we bounce it from one thread to the
+        next.
+      */
+      group_commit_mark_queue_busy();
+      pthread_mutex_unlock(&LOCK_group_commit);
+
+      /* The first in the queue is the leader. */
+      DBUG_ASSERT(queue == thd);
     }
-    error=ha_commit_one_phase(thd, all) ? (cookie ? 2 : 1) : 0;
-    DBUG_EXECUTE_IF("crash_commit_before_unlog", DBUG_ABORT(););
+    else
+    {
+      /* If not leader, just wait until previous thread wakes us up. */
+      group_commit_wait_for_wakeup(thd);
+    }
+
+    /* Only run commit_ordered() if log_xid() succeeded or was not needed. */
-    if (cookie)
+    if (cookie || !xid)
-      tc_log->unlog(cookie, xid);
-    DBUG_EXECUTE_IF("crash_commit_after", DBUG_ABORT(););
-end:
-    if (rw_trans)
-      start_waiting_global_read_lock(thd);
+    {
+      pthread_mutex_lock(&LOCK_commit_ordered);
+      call_commit_ordered(ha_info, thd, all);
+      pthread_mutex_unlock(&LOCK_commit_ordered);
+    }
+
+    THD *next= thd->next_commit_ordered;
+    if (next)
+      group_commit_wakeup_other(next);
+    else
+      group_commit_mark_queue_idle();
+
+    if (xid && !cookie)
+      goto err;
   }
-  /* Free resources and perform other cleanup even for 'empty' transactions. */
-  else if (is_real_trans)
-    thd->transaction.cleanup();
+log_done:
+  DBUG_EXECUTE_IF("crash_commit_after_log", DBUG_ABORT(););
+
+  error= commit_one_phase_2(thd, all, !need_enqueue) ? (cookie ? 2 : 1) : 0;
+
+  DBUG_EXECUTE_IF("crash_commit_before_unlog", DBUG_ABORT(););
+  if (cookie)
+    tc_log->unlog(cookie, xid);
+
+  DBUG_EXECUTE_IF("crash_commit_after", DBUG_ABORT(););
+  goto end;
+
+  /* Come here if error and we need to rollback. */
+err:
+  if (!error)
+    error= 1;
+  ha_rollback_trans(thd, all);
+
+end:
+  if (rw_trans)
+    start_waiting_global_read_lock(thd);
 #endif /* USING_TRANSACTIONS */
   DBUG_RETURN(error);
 }
@@ -1207,6 +1547,17 @@ end:
 */
 int ha_commit_one_phase(THD *thd, bool all)
 {
+  /*
+    When we come here, we did not call handler commit_ordered() methods in
+    ha_commit_trans() 2-phase commit, so we pass TRUE to do it in
+    commit_one_phase_2().
+  */
+  return commit_one_phase_2(thd, all, TRUE);
+}
+/* Handler commit; with do_commit_ordered, runs commit_ordered() first on real commits. */
+static int
+commit_one_phase_2(THD *thd, bool all, bool do_commit_ordered)
+{
   int error=0;
   THD_TRANS *trans=all ? &thd->transaction.all : &thd->transaction.stmt;
   /*
@@ -1218,10 +1569,40 @@ int ha_commit_one_phase(THD *thd, bool a
   */
   bool is_real_trans=all || thd->transaction.all.ha_list == 0;
   Ha_trx_info *ha_info= trans->ha_list, *ha_info_next;
-  DBUG_ENTER("ha_commit_one_phase");
+  DBUG_ENTER("commit_one_phase_2");
 #ifdef USING_TRANSACTIONS
   if (ha_info)
   {
+    if (is_real_trans && do_commit_ordered)
+    {
+      /*
+        If we did not do it already, call any commit_ordered() method.
+
+        No ordering with other threads is enforced here (this commit was not
+        sequenced through prepare_ordered(), though prepare() and log_xid()
+        may have run), but we still take LOCK_commit_ordered so as not to
+        run in parallel with other commit_ordered() calls.
+      */
+      /* The mutex is taken lazily, only if some handler has commit_ordered(). */
+      bool locked= FALSE;
+
+      for (Ha_trx_info *hi= ha_info; hi; hi= hi->next())
+      {
+        handlerton *ht= hi->ht();
+        if (ht->commit_ordered)
+        {
+          if (!locked)
+          {
+            pthread_mutex_lock(&LOCK_commit_ordered);
+            locked= 1;
+          }
+          ht->commit_ordered(ht, thd, all);
+        }
+      }
+      if (locked)
+        pthread_mutex_unlock(&LOCK_commit_ordered);
+    }
+
     for (; ha_info; ha_info= ha_info_next)
     {
       int err;

=== modified file 'sql/handler.h'
--- a/sql/handler.h	2010-01-14 16:51:00 +0000
+++ b/sql/handler.h	2010-05-26 08:13:32 +0000
@@ -656,9 +656,81 @@ struct handlerton
     NOTE 'all' is also false in auto-commit mode where 'end of statement'
      and 'real commit' mean the same event.
    */
-   int  (*commit)(handlerton *hton, THD *thd, bool all);
+   int (*commit)(handlerton *hton, THD *thd, bool all);
+   /*
+     The commit_ordered() method is called prior to the commit() method, after
+     the transaction manager has decided to commit (not rollback) the
+     transaction.
+
+     The calls to commit_ordered() in multiple parallel transactions are
+     guaranteed to happen in the same order in every participating
+     handler. This can be used to ensure the same commit order among multiple
+     handlers (e.g. in a table handler and the binlog). So if transaction T1
+     calls into commit_ordered() of handler A before T2, then T1 will also call
+     commit_ordered() of handler B before T2.
+
+     The intention is that commit_ordered() should do the minimal amount of
+     work that needs to happen in consistent commit order among handlers. To
+     preserve ordering, calls need to be serialised on a global mutex, so
+     doing any time-consuming or blocking operations in commit_ordered() will
+     limit scalability.
+
+     Handlers can rely on commit_ordered() calls being serialised (no two
+     calls can run in parallel, so no extra locking on the handler part is
+     required to ensure this).
+
+     Note that commit_ordered() can be called from a different thread than the
+     one handling the transaction! So it can not do anything that depends on
+     thread local storage, in particular it can not call my_error() and
+     friends (instead it can store the error code and delay the call to
+     my_error() to the commit() method).
+
+     Similarly, since commit_ordered() returns void, any return error code
+     must be saved and returned from the commit() method instead.
+
+     commit_ordered() is called only when actually committing a transaction
+     (autocommit or not), not when ending a statement in the middle of a
+     transaction.
+
+     The commit_ordered method is optional, and can be left unset if not
+     needed in a particular handler.
+   */
+   void (*commit_ordered)(handlerton *hton, THD *thd, bool all);
    int  (*rollback)(handlerton *hton, THD *thd, bool all);
    int  (*prepare)(handlerton *hton, THD *thd, bool all);
+   /*
+     The prepare_ordered method is optional. If set, it will be called after
+     successful prepare() in all handlers participating in 2-phase commit.
+
+     The calls to prepare_ordered() among multiple parallel transactions are
+     ordered consistently with calls to commit_ordered(). This means that
+     calls to prepare_ordered() effectively define the commit order, and that
+     each handler will see the same sequence of transactions calling into
+     prepare_ordered() and commit_ordered().
+
+     Thus, prepare_ordered() can be used to define commit order for handlers
+     that need to do this in the prepare step (like binlog). It can also be
+     used to release transaction locks early in an order consistent with the
+     order transactions will be eventually committed.
+
+     Like commit_ordered(), prepare_ordered() calls are serialised to maintain
+     ordering, so the intention is that they should execute fast, with only
+     the minimal amount of work needed to define commit order. Handlers can
+     rely on this serialisation, and do not need to do any extra locking to
+     avoid two prepare_ordered() calls running in parallel.
+
+     Unlike commit_ordered(), prepare_ordered() _is_ guaranteed to be called
+     in the context of the thread handling the rest of the transaction.
+
+     Note that for user-level XA SQL commands, no consistent ordering among
+     prepare_ordered() and commit_ordered() is guaranteed (as that would
+     require blocking all other commits for an indefinite time).
+
+     prepare_ordered() is called only when actually committing a transaction
+     (autocommit or not), not when ending a statement in the middle of a
+     transaction.
+   */
+   int  (*prepare_ordered)(handlerton *hton, THD *thd, bool all);
    int  (*recover)(handlerton *hton, XID *xid_list, uint len);
    int  (*commit_by_xid)(handlerton *hton, XID *xid);
    int  (*rollback_by_xid)(handlerton *hton, XID *xid);

=== modified file 'sql/log.cc'
--- a/sql/log.cc	2010-04-06 22:47:08 +0000
+++ b/sql/log.cc	2010-05-26 08:13:32 +0000
@@ -154,9 +154,12 @@ class binlog_trx_data {
 public:
   binlog_trx_data()
     : at_least_one_stmt_committed(0), incident(FALSE), m_pending(0),
-    before_stmt_pos(MY_OFF_T_UNDEF)
+    before_stmt_pos(MY_OFF_T_UNDEF), using_xa(0)
   {
     trans_log.end_of_file= max_binlog_cache_size;
+    (void) my_pthread_mutex_init(&LOCK_group_commit, MY_MUTEX_INIT_SLOW,
+                                 "LOCK_group_commit", MYF(0));
+    (void) pthread_cond_init(&COND_group_commit, 0);
   }
 
   ~binlog_trx_data()
@@ -208,11 +211,12 @@ public:
     completely.
    */
   void reset() {
-    if (!empty())
+    if (trans_log.type != WRITE_CACHE || !empty())
       truncate(0);
     before_stmt_pos= MY_OFF_T_UNDEF;
     incident= FALSE;
     trans_log.end_of_file= max_binlog_cache_size;
+    using_xa= FALSE;
     DBUG_ASSERT(empty());
   }
 
@@ -257,6 +261,41 @@ public:
     Binlog position before the start of the current statement.
   */
   my_off_t before_stmt_pos;
+
+  /* 0 or error when writing to binlog; set during group commit. */
+  int error;
+  /* If error != 0, value of errno (for my_error() reporting). */
+  int commit_errno;
+  /* Link for queueing transactions up for group commit to binlog. */
+  binlog_trx_data *next;
+  /*
+    Flag set true when group commit for this transaction is finished; used
+    with pthread_cond_wait() to wait until commit is done.
+    This flag is protected by LOCK_group_commit.
+  */
+  bool done;
+  /*
+    Flag set if this transaction is the group commit leader that will handle
+    the actual writing to the binlog.
+    This flag is protected by LOCK_group_commit.
+  */
+  bool group_commit_leader;
+  /*
+    Flag set true if this transaction is committed with log_xid() as part of
+    XA, false if not.
+  */
+  bool using_xa;
+  /*
+    Extra events (BEGIN, COMMIT/ROLLBACK/XID, and possibly INCIDENT) to be
+    written during group commit. The incident_event is only valid if
+    has_incident() is true.
+  */
+  Log_event *begin_event;
+  Log_event *end_event;
+  Log_event *incident_event;
+  /* Mutex and condition for wakeup after group commit. */
+  pthread_mutex_t LOCK_group_commit;
+  pthread_cond_t COND_group_commit;
 };
 
 handlerton *binlog_hton;
@@ -1391,117 +1430,188 @@ static int binlog_close_connection(handl
   return 0;
 }
 
+/* Helper for binlog_flush_trx_cache(): flush the pending rows event; 1 on error. */
+static int
+binlog_flush_trx_cache_prepare(THD *thd)
+{
+  if (thd->binlog_flush_pending_rows_event(TRUE))
+    return 1;
+  return 0;
+}
+/* Reset trx_data once its cache has been ended, and update cache statistics. */
+static void
+binlog_flush_trx_cache_finish(THD *thd, binlog_trx_data *trx_data)
+{
+  IO_CACHE *trans_log= &trx_data->trans_log;
+
+  trx_data->reset();
+
+  statistic_increment(binlog_cache_use, &LOCK_status);
+  if (trans_log->disk_writes != 0)
+  {
+    statistic_increment(binlog_cache_disk_use, &LOCK_status);
+    trans_log->disk_writes= 0;
+  }
+}
+
 /*
-  End a transaction.
+  End a transaction, writing events to the binary log.
 
   SYNOPSIS
-    binlog_end_trans()
+    binlog_flush_trx_cache()
 
     thd      The thread whose transaction should be ended
     trx_data Pointer to the transaction data to use
-    end_ev   The end event to use, or NULL
-    all      True if the entire transaction should be ended, false if
-             only the statement transaction should be ended.
+    end_ev   The end event to use (COMMIT, ROLLBACK, or commit XID)
 
   DESCRIPTION
 
     End the currently open transaction. The transaction can be either
-    a real transaction (if 'all' is true) or a statement transaction
-    (if 'all' is false).
+    a real transaction or a statement transaction.
 
-    If 'end_ev' is NULL, the transaction is a rollback of only
-    transactional tables, so the transaction cache will be truncated
-    to either just before the last opened statement transaction (if
-    'all' is false), or reset completely (if 'all' is true).
+    This can be to commit a transaction, with a COMMIT query event or an XA
+    commit XID event. But it can also be to rollback a transaction with a
+    ROLLBACK query event, used for rolling back transactions which also
+    contain updates to non-transactional tables.
  */
 static int
-binlog_end_trans(THD *thd, binlog_trx_data *trx_data,
-                 Log_event *end_ev, bool all)
+binlog_flush_trx_cache(THD *thd, binlog_trx_data *trx_data,
+                       Log_event *end_ev)
 {
-  DBUG_ENTER("binlog_end_trans");
-  int error=0;
-  IO_CACHE *trans_log= &trx_data->trans_log;
-  DBUG_PRINT("enter", ("transaction: %s  end_ev: 0x%lx",
-                       all ? "all" : "stmt", (long) end_ev));
+  DBUG_ENTER("binlog_flush_trx_cache");
   DBUG_PRINT("info", ("thd->options={ %s%s}",
                       FLAGSTR(thd->options, OPTION_NOT_AUTOCOMMIT),
                       FLAGSTR(thd->options, OPTION_BEGIN)));
 
+  if (binlog_flush_trx_cache_prepare(thd))
+    DBUG_RETURN(1);
+
   /*
-    NULL denotes ROLLBACK with nothing to replicate: i.e., rollback of
-    only transactional tables.  If the transaction contain changes to
-    any non-transactiona tables, we need write the transaction and log
-    a ROLLBACK last.
-  */
-  if (end_ev != NULL)
-  {
-    if (thd->binlog_flush_pending_rows_event(TRUE))
-      DBUG_RETURN(1);
-    /*
-      Doing a commit or a rollback including non-transactional tables,
-      i.e., ending a transaction where we might write the transaction
-      cache to the binary log.
-
-      We can always end the statement when ending a transaction since
-      transactions are not allowed inside stored functions.  If they
-      were, we would have to ensure that we're not ending a statement
-      inside a stored function.
-     */
-    error= mysql_bin_log.write(thd, &trx_data->trans_log, end_ev,
-                               trx_data->has_incident());
-    trx_data->reset();
+    Doing a commit or a rollback including non-transactional tables,
+    i.e., ending a transaction where we might write the transaction
+    cache to the binary log. We can always end the statement here, since
+    transactions are not allowed inside stored functions (if they were,
+    we would have to ensure that we're not ending a statement inside one).
+
+    NOTE(review): the pre-patch code stepped the table map version after
+    this write; confirm write_transaction_to_binlog() now takes care of it.
+   */
+  int error= mysql_bin_log.write_transaction_to_binlog(thd, trx_data, end_ev);
 
-    /*
-      We need to step the table map version after writing the
-      transaction cache to disk.
-    */
-    mysql_bin_log.update_table_map_version();
-    statistic_increment(binlog_cache_use, &LOCK_status);
-    if (trans_log->disk_writes != 0)
-    {
-      statistic_increment(binlog_cache_disk_use, &LOCK_status);
-      trans_log->disk_writes= 0;
-    }
-  }
-  else
-  {
-    /*
-      If rolling back an entire transaction or a single statement not
-      inside a transaction, we reset the transaction cache.
+  binlog_flush_trx_cache_finish(thd, trx_data);
 
-      If rolling back a statement in a transaction, we truncate the
-      transaction cache to remove the statement.
-     */
-    thd->binlog_remove_pending_rows_event(TRUE);
-    if (all || !(thd->options & (OPTION_BEGIN | OPTION_NOT_AUTOCOMMIT)))
-    {
-      if (trx_data->has_incident())
-        error= mysql_bin_log.write_incident(thd, TRUE);
-      trx_data->reset();
-    }
-    else                                        // ...statement
-      trx_data->truncate(trx_data->before_stmt_pos);
+  DBUG_ASSERT(thd->binlog_get_pending_rows_event() == NULL);
+  DBUG_RETURN(error);
+}
 
-    /*
-      We need to step the table map version on a rollback to ensure
-      that a new table map event is generated instead of the one that
-      was written to the thrown-away transaction cache.
-    */
-    mysql_bin_log.update_table_map_version();
+/*
+  Discard a transaction, i.e. ROLLBACK with only transactional table updates.
+
+  SYNOPSIS
+    binlog_truncate_trx_cache()
+
+    thd      The thread whose transaction should be ended
+    trx_data Pointer to the transaction data to use
+    all      True if the entire transaction should be ended, false if
+             only the statement transaction should be ended.
+
+  DESCRIPTION
+
+    Rollback (and end) a transaction that only modifies transactional
+    tables. The transaction can be either a real transaction (if 'all' is
+    true) or a statement transaction (if 'all' is false).
+
+    The transaction cache will be truncated to either just before the last
+    opened statement transaction (if 'all' is false), or reset completely (if
+    'all' is true).
+ */
+static int
+binlog_truncate_trx_cache(THD *thd, binlog_trx_data *trx_data, bool all)
+{
+  DBUG_ENTER("binlog_truncate_trx_cache");
+  int error= 0;
+  DBUG_PRINT("enter", ("transaction: %s", all ? "all" : "stmt"));
+  DBUG_PRINT("info", ("thd->options={ %s%s}",
+                      FLAGSTR(thd->options, OPTION_NOT_AUTOCOMMIT),
+                      FLAGSTR(thd->options, OPTION_BEGIN)));
+
+  /*
+    ROLLBACK with nothing to replicate: i.e., rollback of only transactional
+    tables. NOTE(review): the pre-patch code also stepped the table map
+    version here; confirm that dropping update_table_map_version() is safe.
+  */
+  /*
+    If rolling back an entire transaction or a single statement not
+    inside a transaction, we reset the transaction cache.
+
+    If rolling back a statement in a transaction, we truncate the
+    transaction cache to remove the statement.
+   */
+  thd->binlog_remove_pending_rows_event(TRUE);
+  if (all || !(thd->options & (OPTION_BEGIN | OPTION_NOT_AUTOCOMMIT)))
+  {
+    if (trx_data->has_incident())
+      error= mysql_bin_log.write_incident(thd);
+    trx_data->reset();
   }
+  else                                        // ...statement
+    trx_data->truncate(trx_data->before_stmt_pos);
 
   DBUG_ASSERT(thd->binlog_get_pending_rows_event() == NULL);
   DBUG_RETURN(error);
 }
 
+static LEX_STRING const write_error_msg=
+    { C_STRING_WITH_LEN("error writing to the binary log") };
+
 static int binlog_prepare(handlerton *hton, THD *thd, bool all)
 {
   /*
-    do nothing.
-    just pretend we can do 2pc, so that MySQL won't
-    switch to 1pc.
-    real work will be done in MYSQL_BIN_LOG::log_xid()
+    If this prepare is for a single statement in the middle of a transaction,
+    not the actual transaction commit, then we do nothing. The real work is
+    only done later, in the prepare for the commit of the whole transaction.
   */
+  if (!all && (thd->options & (OPTION_BEGIN | OPTION_NOT_AUTOCOMMIT)))
+    return 0;
+
+  binlog_trx_data *trx_data=
+    (binlog_trx_data*) thd_get_ha_data(thd, binlog_hton);
+
+  if (binlog_flush_trx_cache_prepare(thd))
+    return 1;
+
+  my_xid xid= thd->transaction.xid_state.xid.get_my_xid();
+  if (!xid)
+  {
+    /* Skip this transaction in group commit, marked by end_event == NULL. */
+    trx_data->end_event= NULL;
+    return 0;
+  }
+  /* Only xid commits are logged by log_xid(); see binlog_commit(). */
+  trx_data->using_xa= TRUE;
+
+  /*
+    Allocate the extra events that will be logged to the binlog in binlog group
+    commit. Use placement new to allocate them on the THD mem_root, as they need
+    to remain live until the group commit has written them to the binlog.
+   */
+  size_t needed_size= sizeof(Query_log_event) + sizeof(Xid_log_event);
+  if (trx_data->has_incident())
+    needed_size+= sizeof(Incident_log_event);
+  uchar *mem= (uchar *)thd->alloc(needed_size);
+  if (!mem)
+    return 1;
+
+  trx_data->begin_event= new ((void *)mem)
+    Query_log_event(thd, STRING_WITH_LEN("BEGIN"), TRUE, TRUE, 0);
+  mem+= sizeof(Query_log_event);
+
+  trx_data->end_event= new ((void *)mem) Xid_log_event(thd, xid);
+
+  if (trx_data->has_incident())
+    trx_data->incident_event= new ((void *)(mem + sizeof(Xid_log_event)))
+      Incident_log_event(thd, INCIDENT_LOST_EVENTS, write_error_msg);
+
   return 0;
 }
 
@@ -1525,11 +1635,11 @@ static int binlog_commit(handlerton *hto
   binlog_trx_data *const trx_data=
     (binlog_trx_data*) thd_get_ha_data(thd, binlog_hton);
 
-  if (trx_data->empty())
+  if (trx_data->using_xa)
   {
-    // we're here because trans_log was flushed in MYSQL_BIN_LOG::log_xid()
-    trx_data->reset();
-    DBUG_RETURN(0);
+    // using_xa: trans_log was already binlogged by log_xid()/group_log_xid()
+    binlog_flush_trx_cache_finish(thd, trx_data);
+    DBUG_RETURN(error);
   }
 
   /*
@@ -1556,8 +1666,8 @@ static int binlog_commit(handlerton *hto
        !stmt_has_updated_trans_table(thd) &&
        thd->transaction.stmt.modified_non_trans_table))
   {
-    Query_log_event qev(thd, STRING_WITH_LEN("COMMIT"), TRUE, TRUE, 0);
-    error= binlog_end_trans(thd, trx_data, &qev, all);
+    Query_log_event end_ev(thd, STRING_WITH_LEN("COMMIT"), TRUE, TRUE, 0);
+    error= binlog_flush_trx_cache(thd, trx_data, &end_ev);
   }
 
   trx_data->at_least_one_stmt_committed = my_b_tell(&trx_data->trans_log) > 0;
@@ -1621,7 +1731,7 @@ static int binlog_rollback(handlerton *h
         (thd->options & OPTION_KEEP_LOG)) &&
         mysql_bin_log.check_write_error(thd))
       trx_data->set_incident();
-    error= binlog_end_trans(thd, trx_data, 0, all);
+    error= binlog_truncate_trx_cache(thd, trx_data, all);
   }
   else
   {
@@ -1641,8 +1751,8 @@ static int binlog_rollback(handlerton *h
          thd->current_stmt_binlog_row_based) ||
         ((thd->options & OPTION_KEEP_LOG)))
     {
-      Query_log_event qev(thd, STRING_WITH_LEN("ROLLBACK"), TRUE, TRUE, 0);
-      error= binlog_end_trans(thd, trx_data, &qev, all);
+      Query_log_event end_ev(thd, STRING_WITH_LEN("ROLLBACK"), TRUE, TRUE, 0);
+      error= binlog_flush_trx_cache(thd, trx_data, &end_ev);
     }
     /*
       Otherwise, we simply truncate the cache as there is no change on
@@ -1650,7 +1760,7 @@ static int binlog_rollback(handlerton *h
     */
     else if ((all && !thd->transaction.all.modified_non_trans_table) ||
           (!all && !thd->transaction.stmt.modified_non_trans_table))
-      error= binlog_end_trans(thd, trx_data, 0, all);
+      error= binlog_truncate_trx_cache(thd, trx_data, all);
   }
   if (!all)
     trx_data->before_stmt_pos = MY_OFF_T_UNDEF; // part of the stmt rollback
@@ -2464,7 +2574,7 @@ const char *MYSQL_LOG::generate_name(con
 
 MYSQL_BIN_LOG::MYSQL_BIN_LOG()
   :bytes_written(0), prepared_xids(0), file_id(1), open_count(1),
-   need_start_event(TRUE), m_table_map_version(0),
+   need_start_event(TRUE),
    is_relay_log(0),
    description_event_for_exec(0), description_event_for_queue(0)
 {
@@ -2477,6 +2587,7 @@ MYSQL_BIN_LOG::MYSQL_BIN_LOG()
   index_file_name[0] = 0;
   bzero((char*) &index_file, sizeof(index_file));
   bzero((char*) &purge_index_file, sizeof(purge_index_file));
+  use_group_log_xid= TRUE;  // binlog implements group_log_xid()
 }
 
 /* this is called only once */
@@ -2492,6 +2603,7 @@ void MYSQL_BIN_LOG::cleanup()
     delete description_event_for_exec;
     (void) pthread_mutex_destroy(&LOCK_log);
     (void) pthread_mutex_destroy(&LOCK_index);
+    (void) pthread_mutex_destroy(&LOCK_queue);
     (void) pthread_cond_destroy(&update_cond);
   }
   DBUG_VOID_RETURN;
@@ -2520,6 +2632,8 @@ void MYSQL_BIN_LOG::init_pthread_objects
   */ 
   (void) my_pthread_mutex_init(&LOCK_index, MY_MUTEX_INIT_SLOW, "LOCK_index",
                                MYF_NO_DEADLOCK_DETECTION);
+  (void) my_pthread_mutex_init(&LOCK_queue, MY_MUTEX_INIT_FAST, "LOCK_queue",
+                               MYF(0));
   (void) pthread_cond_init(&update_cond, 0);
 }
 
@@ -4113,7 +4227,6 @@ int THD::binlog_write_table_map(TABLE *t
     DBUG_RETURN(error);
 
   binlog_table_maps++;
-  table->s->table_map_version= mysql_bin_log.table_map_version();
   DBUG_RETURN(0);
 }
 
@@ -4194,64 +4307,41 @@ MYSQL_BIN_LOG::flush_and_set_pending_row
 
   if (Rows_log_event* pending= trx_data->pending())
   {
-    IO_CACHE *file= &log_file;
-
     /*
       Decide if we should write to the log file directly or to the
       transaction log.
     */
     if (pending->get_cache_stmt() || my_b_tell(&trx_data->trans_log))
-      file= &trx_data->trans_log;
-
-    /*
-      If we are writing to the log file directly, we could avoid
-      locking the log. This does not work since we need to step the
-      m_table_map_version below, and that change has to be protected
-      by the LOCK_log mutex.
-    */
-    pthread_mutex_lock(&LOCK_log);
-
-    /*
-      Write pending event to log file or transaction cache
-    */
-    if (pending->write(file))
     {
-      pthread_mutex_unlock(&LOCK_log);
-      set_write_error(thd);
-      DBUG_RETURN(1);
+      /* Cached event or non-empty trx cache: append to the trx cache. */
+      if (pending->write(&trx_data->trans_log))
+      {
+        set_write_error(thd);
+        DBUG_RETURN(1);
+      }
     }
-
-    /*
-      We step the table map version if we are writing an event
-      representing the end of a statement.  We do this regardless of
-      wheather we write to the transaction cache or to directly to the
-      file.
-
-      In an ideal world, we could avoid stepping the table map version
-      if we were writing to a transaction cache, since we could then
-      reuse the table map that was written earlier in the transaction
-      cache.  This does not work since STMT_END_F implies closing all
-      table mappings on the slave side.
-
-      TODO: Find a solution so that table maps does not have to be
-      written several times within a transaction.
-     */
-    if (pending->get_flags(Rows_log_event::STMT_END_F))
-      ++m_table_map_version;
-
-    delete pending;
-
-    if (file == &log_file)
+    else
     {
+      /* Otherwise write directly to the binlog file, under LOCK_log. */
+      pthread_mutex_lock(&LOCK_log);
+      if (pending->write(&log_file))
+      {
+        pthread_mutex_unlock(&LOCK_log);
+        set_write_error(thd);
+        DBUG_RETURN(1);
+      }
+
       error= flush_and_sync();
       if (!error)
       {
         signal_update();
         rotate_and_purge(RP_LOCK_LOG_IS_ALREADY_LOCKED);
       }
+
+      pthread_mutex_unlock(&LOCK_log);
     }
 
-    pthread_mutex_unlock(&LOCK_log);
+    delete pending;
   }
 
   thd->binlog_set_pending_rows_event(event);
@@ -4450,9 +4540,6 @@ err:
       set_write_error(thd);
   }
 
-  if (event_info->flags & LOG_EVENT_UPDATE_TABLE_MAP_VERSION_F)
-    ++m_table_map_version;
-
   pthread_mutex_unlock(&LOCK_log);
   DBUG_RETURN(error);
 }
@@ -4575,18 +4662,14 @@ uint MYSQL_BIN_LOG::next_file_id()
   SYNOPSIS
     write_cache()
     cache    Cache to write to the binary log
-    lock_log True if the LOCK_log mutex should be aquired, false otherwise
-    sync_log True if the log should be flushed and sync:ed
 
   DESCRIPTION
     Write the contents of the cache to the binary log. The cache will
-    be reset as a READ_CACHE to be able to read the contents from it.
+    be reset as a READ_CACHE. Caller must hold LOCK_log and do any sync.
  */
 
-int MYSQL_BIN_LOG::write_cache(IO_CACHE *cache, bool lock_log, bool sync_log)
+int MYSQL_BIN_LOG::write_cache(IO_CACHE *cache)
 {
-  Mutex_sentry sentry(lock_log ? &LOCK_log : NULL);
-
   if (reinit_io_cache(cache, READ_CACHE, 0, 0, 0))
     return ER_ERROR_ON_WRITE;
   uint length= my_b_bytes_in_cache(cache), group, carry, hdr_offs;
@@ -4697,6 +4780,7 @@ int MYSQL_BIN_LOG::write_cache(IO_CACHE 
     }
 
     /* Write data to the binary log file */
+    DBUG_EXECUTE_IF("fail_binlog_write_1", return ER_ERROR_ON_WRITE;);
     if (my_b_write(&log_file, cache->read_pos, length))
       return ER_ERROR_ON_WRITE;
     cache->read_pos=cache->read_end;		// Mark buffer used up
@@ -4704,9 +4788,6 @@ int MYSQL_BIN_LOG::write_cache(IO_CACHE 
 
   DBUG_ASSERT(carry == 0);
 
-  if (sync_log)
-    flush_and_sync();
-
   return 0;                                     // All OK
 }
 
@@ -4739,26 +4820,22 @@ int query_error_code(THD *thd, bool not_
   return error;
 }
 
-bool MYSQL_BIN_LOG::write_incident(THD *thd, bool lock)
+bool MYSQL_BIN_LOG::write_incident(THD *thd)
 {
   uint error= 0;
   DBUG_ENTER("MYSQL_BIN_LOG::write_incident");
-  LEX_STRING const write_error_msg=
-    { C_STRING_WITH_LEN("error writing to the binary log") };
   Incident incident= INCIDENT_LOST_EVENTS;
   Incident_log_event ev(thd, incident, write_error_msg);
-  if (lock)
-    pthread_mutex_lock(&LOCK_log);
+
+  pthread_mutex_lock(&LOCK_log);  // always taken; caller must not hold LOCK_log
   error= ev.write(&log_file);
-  if (lock)
+  if (!error && !(error= flush_and_sync()))
   {
-    if (!error && !(error= flush_and_sync()))
-    {
-      signal_update();
-      rotate_and_purge(RP_LOCK_LOG_IS_ALREADY_LOCKED);
-    }
-    pthread_mutex_unlock(&LOCK_log);
+    signal_update();
+    rotate_and_purge(RP_LOCK_LOG_IS_ALREADY_LOCKED);
   }
+  pthread_mutex_unlock(&LOCK_log);
+
   DBUG_RETURN(error);
 }
 
@@ -4786,103 +4863,405 @@ bool MYSQL_BIN_LOG::write_incident(THD *
     'cache' needs to be reinitialized after this functions returns.
 */
 
-bool MYSQL_BIN_LOG::write(THD *thd, IO_CACHE *cache, Log_event *commit_event,
-                          bool incident)
+bool
+MYSQL_BIN_LOG::write_transaction_to_binlog(THD *thd, binlog_trx_data *trx_data,
+                                           Log_event *end_ev)
 {
-  DBUG_ENTER("MYSQL_BIN_LOG::write(THD *, IO_CACHE *, Log_event *)");
+  DBUG_ENTER("MYSQL_BIN_LOG::write_transaction_to_binlog");
+
+  /*
+    Create the necessary events here, where we have the correct THD (and
+    thread context).
+
+    Due to group commit the actual writing to binlog may happen in a different
+    thread.
+  */
+  Query_log_event qinfo(thd, STRING_WITH_LEN("BEGIN"), TRUE, TRUE, 0);
+  trx_data->begin_event= &qinfo;
+  trx_data->end_event= end_ev;
+  if (trx_data->has_incident())
+  {
+    Incident_log_event inc_ev(thd, INCIDENT_LOST_EVENTS, write_error_msg);
+    trx_data->incident_event= &inc_ev;
+    DBUG_RETURN(write_transaction_to_binlog_events(trx_data));
+  }
+  else
+  {
+    trx_data->incident_event= NULL;
+    DBUG_RETURN(write_transaction_to_binlog_events(trx_data));
+  }
+}
+
+bool
+MYSQL_BIN_LOG::write_transaction_to_binlog_events(binlog_trx_data *trx_data)
+{
+  /*
+    To facilitate group commit for the binlog, we first queue up ourselves in
+    the group commit queue. Then the first thread to enter the queue waits for
+    the LOCK_log mutex, and commits for everyone in the queue once it gets the
+    lock. Any other threads in the queue just wait for the first one to finish
+    the commit and wake them up.
+  */
+
+  pthread_mutex_lock(&trx_data->LOCK_group_commit);
+  const binlog_trx_data *orig_queue= atomic_enqueue_trx(trx_data);
+
+  if (orig_queue != NULL)
+  {
+    trx_data->group_commit_leader= FALSE;
+    trx_data->done= FALSE;
+    trx_group_commit_participant(trx_data);
+  }
+  else
+  {
+    trx_data->group_commit_leader= TRUE;
+    pthread_mutex_unlock(&trx_data->LOCK_group_commit);
+    trx_group_commit_leader(NULL);
+  }
+
+  return trx_group_commit_finish(trx_data);
+}
+
+/*
+  Participate as secondary transaction in group commit.
+
+  Another thread is already waiting to obtain the LOCK_log, and should include
+  this thread in the group commit once the log is obtained. Our caller has
+  already queued us, so here we just wait to be signalled that it is done.
+
+  Note that this function must be called with the trx_data->LOCK_group_commit
+  locked; the mutex will be released before return.
+*/
+void
+MYSQL_BIN_LOG::trx_group_commit_participant(binlog_trx_data *trx_data)
+{
+  safe_mutex_assert_owner(&trx_data->LOCK_group_commit);
+
+  /* Wait until trx_data.done == true and woken up by the leader. */
+  while (!trx_data->done)
+    pthread_cond_wait(&trx_data->COND_group_commit,
+                      &trx_data->LOCK_group_commit);
+  pthread_mutex_unlock(&trx_data->LOCK_group_commit);
+}
+
+/*
+  Report any error that the group commit leader stored in trx_data->error.
+  This must run in the thread owning the transaction, as my_error() gets the
+  THD from thread local storage. Returns 1 on error, 0 otherwise.
+*/
+bool
+MYSQL_BIN_LOG::trx_group_commit_finish(binlog_trx_data *trx_data)
+{
+  if (trx_data->error)
+  {
+    switch (trx_data->error)
+    {
+    case ER_ERROR_ON_WRITE:
+      my_error(ER_ERROR_ON_WRITE, MYF(ME_NOREFRESH), name, trx_data->commit_errno);
+      break;
+    case ER_ERROR_ON_READ:
+      my_error(ER_ERROR_ON_READ, MYF(ME_NOREFRESH),
+               trx_data->trans_log.file_name, trx_data->commit_errno);
+      break;
+    default:
+      /*
+        There are not (and should not be) any errors thrown not covered above.
+        But just in case one is added later without updating the above switch
+        statement, include a catch-all.
+      */
+      my_printf_error(trx_data->error,
+                      "Error writing transaction to binary log: %d",
+                      MYF(ME_NOREFRESH), trx_data->error);
+    }
+
+    /*
+      Since we return error, this transaction XID will not be committed, so
+      we need to mark it as not needed for recovery (unlog() is not called
+      for a transaction if log_xid() fails).
+     */
+    if (trx_data->end_event->get_type_code() == XID_EVENT)
+      mark_xid_done();
+
+    return 1;
+  }
+
+  return 0;
+}
+
+/*
+  Do binlog group commit as the lead thread.
+
+  This is called either by a thread that found group_commit_queue empty when
+  enqueueing its transaction (first_thd == NULL), or from group_log_xid() with
+  first_thd heading a list of THDs linked through next_commit_ordered, which
+  are committed first, in list order. It then waits to obtain the LOCK_log
+  mutex, and group commits all the transactions in the queue (more may have
+  entered while waiting for LOCK_log). After commit is done, all other
+  non-leader threads in the queue are signalled.
+
+ */
+void
+MYSQL_BIN_LOG::trx_group_commit_leader(THD *first_thd)
+{
+  uint xid_count= 0;
+  uint write_count= 0;
+
+  /* First, put anything from group_log_xid into the queue. */
+  binlog_trx_data *full_queue= NULL;
+  binlog_trx_data **next_ptr= &full_queue;
+  for (THD *thd= first_thd; thd; thd= thd->next_commit_ordered)
+  {
+    binlog_trx_data *const trx_data=
+      (binlog_trx_data*) thd_get_ha_data(thd, binlog_hton);
+
+    /* Reset first: group_log_xid() reads error even for skipped entries. */
+    trx_data->error= 0;
+
+    /* Skip log_xid for transactions without xid, marked by NULL end_event. */
+    if (!trx_data->end_event)
+      continue;
+
+    *next_ptr= trx_data;
+    next_ptr= &(trx_data->next);
+  }
+
+  /*
+    Next, lock the LOCK_log(), and once we get it, add any additional writes
+    that queued up while we were waiting.
+
+    Note that if some writer not going through log_xid() comes in and gets the
+    LOCK_log before us, they will not be able to include us in their group
+    commit (and they are not able to handle ensuring same commit order between
+    us and participating transactional storage engines anyway).
+
+    On the other hand, when we get the LOCK_log, we will be able to include
+    any non-transactional writes that queued up in our group commit. This
+    should hopefully not be too big of a problem, as group commit is most
+    important for the transactional case anyway when durability (fsync) is
+    enabled.
+  */
   VOID(pthread_mutex_lock(&LOCK_log));
 
-  /* NULL would represent nothing to replicate after ROLLBACK */
-  DBUG_ASSERT(commit_event != NULL);
+  /*
+    As the queue is in reverse order of entering, reverse the queue as we add
+    it to the existing one. Note that there is no ordering defined between
+    transactional and non-transactional commits.
+  */
+  binlog_trx_data *current= atomic_grab_trx_queue();
+  binlog_trx_data *xtra_queue= NULL;
+  while (current)
+  {
+    current->error= 0;
+    binlog_trx_data *next= current->next;
+    current->next= xtra_queue;
+    xtra_queue= current;
+    current= next;
+  }
+  *next_ptr= xtra_queue;
 
+  /*
+    Now we have in full_queue the list of transactions to be committed in
+    order.
+  */
   DBUG_ASSERT(is_open());
   if (likely(is_open()))                       // Should always be true
   {
     /*
-      We only bother to write to the binary log if there is anything
-      to write.
-     */
-    if (my_b_tell(cache) > 0)
+      Commit every transaction in the queue.
+
+      Note that we are doing this in a different thread than the one running
+      the transaction! So we are limited in the operations we can do. In
+      particular, we cannot call my_error() on behalf of a transaction, as
+      that obtains the THD from thread local storage. Instead, we must set
+      current->error and let the thread do the error reporting itself once
+      we wake it up.
+    */
+    for (current= full_queue; current != NULL; current= current->next)
     {
-      /*
-        Log "BEGIN" at the beginning of every transaction.  Here, a
-        transaction is either a BEGIN..COMMIT block or a single
-        statement in autocommit mode.
-      */
-      Query_log_event qinfo(thd, STRING_WITH_LEN("BEGIN"), TRUE, TRUE, 0);
+      IO_CACHE *cache= &current->trans_log;
 
       /*
-        Now this Query_log_event has artificial log_pos 0. It must be
-        adjusted to reflect the real position in the log. Not doing it
-        would confuse the slave: it would prevent this one from
-        knowing where he is in the master's binlog, which would result
-        in wrong positions being shown to the user, MASTER_POS_WAIT
-        undue waiting etc.
+        We only bother to write to the binary log if there is anything
+        to write.
       */
-      if (qinfo.write(&log_file))
-        goto err;
-
-      DBUG_EXECUTE_IF("crash_before_writing_xid",
-                      {
-                        if ((write_error= write_cache(cache, false, true)))
-                          DBUG_PRINT("info", ("error writing binlog cache: %d",
-                                               write_error));
-                        DBUG_PRINT("info", ("crashing before writing xid"));
-                        abort();
-                      });
-
-      if ((write_error= write_cache(cache, false, false)))
-        goto err;
+      if (my_b_tell(cache) > 0)
+      {
+        current->error= write_transaction(current);
+        if (current->error)
+          current->commit_errno= errno;
 
-      if (commit_event && commit_event->write(&log_file))
-        goto err;
+        write_count++;
+      }
 
-      if (incident && write_incident(thd, FALSE))
-        goto err;
+      if (current->end_event->get_type_code() == XID_EVENT)
+        xid_count++;
+    }
 
+    if (write_count > 0)
+    {
       if (flush_and_sync())
-        goto err;
-      DBUG_EXECUTE_IF("half_binlogged_transaction", DBUG_ABORT(););
-      if (cache->error)				// Error on read
       {
-        sql_print_error(ER(ER_ERROR_ON_READ), cache->file_name, errno);
-        write_error=1;				// Don't give more errors
-        goto err;
+        for (current= full_queue; current != NULL; current= current->next)
+        {
+          if (!current->error)
+          {
+            current->error= ER_ERROR_ON_WRITE;
+            current->commit_errno= errno;
+          }
+        }
+      }
+      else
+      {
+        signal_update();
       }
-      signal_update();
     }
 
     /*
-      if commit_event is Xid_log_event, increase the number of
+      if any commit_events are Xid_log_event, increase the number of
       prepared_xids (it's decreasd in ::unlog()). Binlog cannot be rotated
       if there're prepared xids in it - see the comment in new_file() for
       an explanation.
-      If the commit_event is not Xid_log_event (then it's a Query_log_event)
-      rotate binlog, if necessary.
+      If no Xid_log_events (then it's all Query_log_event) rotate binlog,
+      if necessary.
     */
-    if (commit_event && commit_event->get_type_code() == XID_EVENT)
+    if (xid_count > 0)
     {
-      pthread_mutex_lock(&LOCK_prep_xids);
-      prepared_xids++;
-      pthread_mutex_unlock(&LOCK_prep_xids);
+      mark_xids_active(xid_count);
     }
     else
       rotate_and_purge(RP_LOCK_LOG_IS_ALREADY_LOCKED);
   }
-  VOID(pthread_mutex_unlock(&LOCK_log));
 
-  DBUG_RETURN(0);
+  /*
+    Signal those that are not part of group_log_xid, and are not group leaders
+    running the queue.
 
-err:
-  if (!write_error)
+    Since a group leader runs the queue itself if a group_log_xid does not get
+    to do it first, such leader threads do not need wait or wakeup.
+
+    This is done before releasing LOCK_log: a leader in xtra_queue cannot get
+    past LOCK_log until we release it, after which it may finish and re-enqueue
+    its trx_data for a new transaction, overwriting ->next while we still walk
+    the list.
+  */
+  binlog_trx_data *next_in_queue;
+  for (current= xtra_queue; current != NULL; current= next_in_queue)
   {
-    write_error= 1;
-    sql_print_error(ER(ER_ERROR_ON_WRITE), name, errno);
+    /*
+      Note that we need to take LOCK_group_commit even in the case of a leader!
+
+      Otherwise there is a race between setting and testing the
+      group_commit_leader flag.
+    */
+    pthread_mutex_lock(&current->LOCK_group_commit);
+    /*
+      Read the link before waking the thread: once signalled, a participant
+      may return and re-enqueue its trx_data, overwriting ->next.
+    */
+    next_in_queue= current->next;
+    if (!current->group_commit_leader)
+    {
+      current->done= true;
+      pthread_cond_signal(&current->COND_group_commit);
+    }
+    pthread_mutex_unlock(&current->LOCK_group_commit);
   }
   VOID(pthread_mutex_unlock(&LOCK_log));
-  DBUG_RETURN(1);
 }
 
+/*
+  Write one transaction to log_file: the BEGIN event, the contents of the
+  trans_log cache, the end event and, if flagged, the incident event.
+  Called from trx_group_commit_leader() with LOCK_log held. Returns 0 on
+  success, else ER_ERROR_ON_WRITE or ER_ERROR_ON_READ.
+*/
+int
+MYSQL_BIN_LOG::write_transaction(binlog_trx_data *trx_data)
+{
+  IO_CACHE *cache= &trx_data->trans_log;
+  /*
+    Log "BEGIN" at the beginning of every transaction.  Here, a transaction is
+    either a BEGIN..COMMIT block or a single statement in autocommit mode. The
+    event was constructed in write_transaction_to_binlog(), in the thread
+    running the transaction.
+
+    Now this Query_log_event has artificial log_pos 0. It must be
+    adjusted to reflect the real position in the log. Not doing it
+    would confuse the slave: it would prevent this one from
+    knowing where he is in the master's binlog, which would result
+    in wrong positions being shown to the user, MASTER_POS_WAIT
+    undue waiting etc.
+  */
+  if (trx_data->begin_event->write(&log_file))
+    return ER_ERROR_ON_WRITE;
+
+  DBUG_EXECUTE_IF("crash_before_writing_xid",
+                  {
+                    if ((write_cache(cache)))
+                      DBUG_PRINT("info", ("error writing binlog cache"));
+                    else
+                      flush_and_sync();
+
+                    DBUG_PRINT("info", ("crashing before writing xid"));
+                    abort();
+                  });
+
+  if (write_cache(cache))
+    return ER_ERROR_ON_WRITE;
+
+  if (trx_data->end_event->write(&log_file))
+    return ER_ERROR_ON_WRITE;
+
+  if (trx_data->has_incident() && trx_data->incident_event->write(&log_file))
+    return ER_ERROR_ON_WRITE;
+
+  if (cache->error)				// Error on read
+    return ER_ERROR_ON_READ;
+
+  return 0;
+}
+
+/*
+  Atomically push trx_data onto the head of group_commit_queue.
+
+  Returns the previous head of the queue; NULL means the queue was empty and
+  the caller must act as group commit leader. The previous head is returned
+  from a local variable rather than re-read from trx_data->next, because as
+  soon as the CAS succeeds a leader may grab and reverse the queue, rewriting
+  trx_data->next.
+*/
+binlog_trx_data *
+MYSQL_BIN_LOG::atomic_enqueue_trx(binlog_trx_data *trx_data)
+{
+  my_atomic_rwlock_wrlock(&LOCK_queue);
+  binlog_trx_data *orig_queue= group_commit_queue;
+  do
+  {
+    /* On CAS failure, orig_queue is updated to the current head; retry. */
+    trx_data->next= orig_queue;
+  } while (!my_atomic_casptr((void **)(&group_commit_queue),
+                             (void **)(&orig_queue),
+                             trx_data));
+  my_atomic_rwlock_wrunlock(&LOCK_queue);
+  return orig_queue;
+}
+
+/*
+  Atomically detach the whole group_commit_queue, leaving it empty. The
+  returned list is in reverse order of enqueueing (newest first).
+*/
+binlog_trx_data *
+MYSQL_BIN_LOG::atomic_grab_trx_queue()
+{
+  my_atomic_rwlock_wrlock(&LOCK_queue);
+  binlog_trx_data *queue= group_commit_queue;
+  while (!my_atomic_casptr((void **)(&group_commit_queue),
+                           (void **)(&queue),
+                           NULL))
+    ;
+  my_atomic_rwlock_wrunlock(&LOCK_queue);
+  return queue;
+}
 
 /**
   Wait until we get a signal that the binary log has been updated.
@@ -5879,9 +6217,6 @@ void TC_LOG_BINLOG::close()
 }
 
 /**
-  @todo
-  group commit
-
   @retval
     0    error
   @retval
@@ -5889,19 +6224,83 @@ void TC_LOG_BINLOG::close()
 */
 int TC_LOG_BINLOG::log_xid(THD *thd, my_xid xid)
 {
-  DBUG_ENTER("TC_LOG_BINLOG::log");
-  Xid_log_event xle(thd, xid);
-  binlog_trx_data *trx_data=
-    (binlog_trx_data*) thd_get_ha_data(thd, binlog_hton);
+  int error;
+  DBUG_ENTER("TC_LOG_BINLOG::log_xid");
+
+  thd->next_commit_ordered= 0;
+  group_log_xid(thd);
+  if (thd->xid_error)
+    error= xid_delayed_error(thd);
+  else
+    error= 0;
+
   /*
-    We always commit the entire transaction when writing an XID. Also
-    note that the return value is inverted.
-   */
-  DBUG_RETURN(!binlog_end_trans(thd, trx_data, &xle, TRUE));
+    Note that the return value is inverted: zero on failure, private non-zero
+    'cookie' on success.
+  */
+  DBUG_RETURN(!error);
 }
 
-void TC_LOG_BINLOG::unlog(ulong cookie, my_xid xid)
+/*
+  Do a binlog log_xid() for a group of transactions, linked through
+  thd->next_commit_ordered; sets each thd->xid_error and thd->xid_cookie.
+*/
+void
+TC_LOG_BINLOG::group_log_xid(THD *first_thd)
+{
+  DBUG_ENTER("TC_LOG_BINLOG::group_log_xid");
+  trx_group_commit_leader(first_thd);
+  for (THD *thd= first_thd; thd; thd= thd->next_commit_ordered)
+  {
+    binlog_trx_data *const trx_data=
+      (binlog_trx_data*) thd_get_ha_data(thd, binlog_hton);
+    thd->xid_error= trx_data->error;
+    thd->xid_cookie= !trx_data->error;
+  }
+  DBUG_VOID_RETURN;
+}
+
+int
+TC_LOG_BINLOG::xid_delayed_error(THD *thd)
 {
+  binlog_trx_data *const trx_data=
+    (binlog_trx_data*) thd_get_ha_data(thd, binlog_hton);
+  return trx_group_commit_finish(trx_data);
+}
+
+/*
+  After an XID is logged, we need to hold on to the current binlog file until
+  it is fully committed in the storage engine. The reason is that crash
+  recovery only looks at the latest binlog, so we must make sure there are no
+  outstanding prepared (but not committed) transactions before rotating the
+  binlog.
+
+  To handle this, we keep a count of outstanding XIDs. This function is used
+  to increase this count when committing one or more transactions to the
+  binary log.
+*/
+void
+TC_LOG_BINLOG::mark_xids_active(uint xid_count)
+{
+  DBUG_ENTER("TC_LOG_BINLOG::mark_xids_active");
+  DBUG_PRINT("info", ("xid_count=%u", xid_count));
+  pthread_mutex_lock(&LOCK_prep_xids);
+  prepared_xids+= xid_count;
+  pthread_mutex_unlock(&LOCK_prep_xids);
+  DBUG_VOID_RETURN;
+}
+
+/*
+  Once an XID is committed, it is safe to rotate the binary log, as it can no
+  longer be needed during crash recovery.
+
+  This function is called to mark an XID this way. It decreases the count of
+  pending XIDs, and signals COND_prep_xids when the count reaches zero.
+*/
+void
+TC_LOG_BINLOG::mark_xid_done()
+{
+  DBUG_ENTER("TC_LOG_BINLOG::mark_xid_done");
   pthread_mutex_lock(&LOCK_prep_xids);
   DBUG_ASSERT(prepared_xids > 0);
   if (--prepared_xids == 0) {
@@ -5909,7 +6308,16 @@ void TC_LOG_BINLOG::unlog(ulong cookie, 
     pthread_cond_signal(&COND_prep_xids);
   }
   pthread_mutex_unlock(&LOCK_prep_xids);
-  rotate_and_purge(0);     // as ::write() did not rotate
+  DBUG_VOID_RETURN;
+}
+
+void TC_LOG_BINLOG::unlog(ulong cookie, my_xid xid)
+{
+  DBUG_ENTER("TC_LOG_BINLOG::unlog");
+  if (xid)
+    mark_xid_done();
+  rotate_and_purge(0);     // group commit does not rotate while XIDs pending
+  DBUG_VOID_RETURN;
 }
 
 int TC_LOG_BINLOG::recover(IO_CACHE *log, Format_description_log_event *fdle)

=== modified file 'sql/log.h'
--- a/sql/log.h	2009-12-04 14:40:42 +0000
+++ b/sql/log.h	2010-05-26 08:13:32 +0000
@@ -28,13 +28,49 @@ class TC_LOG
 {
   public:
   int using_heuristic_recover();
-  TC_LOG() {}
+  /* True if we should use group_log_xid(), false to use log_xid(). */
+  bool use_group_log_xid;
+
+  TC_LOG() : use_group_log_xid(0) {}
   virtual ~TC_LOG() {}
 
   virtual int open(const char *opt_name)=0;
   virtual void close()=0;
   virtual int log_xid(THD *thd, my_xid xid)=0;
   virtual void unlog(ulong cookie, my_xid xid)=0;
+  /*
+    If use_group_log_xid is true, then this method is used instead of
+    log_xid() to do logging of a group of transactions all at once.
+
+    The transactions will be linked through THD::next_commit_ordered.
+
+    Additionally, when this method is used instead of log_xid(), the order in
+    which handler->prepare_ordered() and handler->commit_ordered() are called
+    is guaranteed to be the same as the order of calls and THD list elements
+    for group_log_xid().
+
+    This can be used to efficiently implement group commit that at the same
+    time preserves the order of commits among handlers and TC (eg. to get same
+    commit order in InnoDB and binary log).
+
+    For TCs that do not need this, it can be preferable to use plain log_xid()
+    instead, as it allows threads to run log_xid() in parallel with each
+    other. In contrast, group_log_xid() runs under a global mutex, so it is
+    guaranteed that only one call into it will be active at any time.
+
+    Since this call handles multiple threads/THDs at once, my_error() (and
+    other code that relies on thread local storage) cannot be used in this
+    method. Instead, in case of error, thd->xid_error should be set to the
+    error code, and xid_delayed_error() will be called later in the correct
+    thread context to actually report the error.
+
+    In the success case, this method must set thd->xid_cookie for each thread
+    to the cookie that is normally returned from log_xid() (which must be
+    non-zero in the non-error case).
+  */
+  virtual void group_log_xid(THD *first_thd) { DBUG_ASSERT(FALSE); }
+  /* Error reporting for group_log_xid(). */
+  virtual int xid_delayed_error(THD *thd) { DBUG_ASSERT(FALSE); return 0; }
 };
 
 class TC_LOG_DUMMY: public TC_LOG // use it to disable the logging
@@ -227,12 +263,19 @@ private:
   time_t last_time;
 };
 
+class binlog_trx_data;
 class MYSQL_BIN_LOG: public TC_LOG, private MYSQL_LOG
 {
  private:
   /* LOCK_log and LOCK_index are inited by init_pthread_objects() */
   pthread_mutex_t LOCK_index;
   pthread_mutex_t LOCK_prep_xids;
+  /*
+    Mutex to protect the queue of transactions waiting to participate in group
+    commit. (Only used on platforms without native atomic operations).
+  */
+  pthread_mutex_t LOCK_queue;
+
   pthread_cond_t  COND_prep_xids;
   pthread_cond_t update_cond;
   ulonglong bytes_written;
@@ -271,8 +314,8 @@ class MYSQL_BIN_LOG: public TC_LOG, priv
     In 5.0 it's 0 for relay logs too!
   */
   bool no_auto_events;
-
-  ulonglong m_table_map_version;
+  /* Transactions waiting to take part in group commit, newest first. */
+  binlog_trx_data *group_commit_queue;
 
   int write_to_file(IO_CACHE *cache);
   /*
@@ -282,6 +325,14 @@ class MYSQL_BIN_LOG: public TC_LOG, priv
   */
   void new_file_without_locking();
   void new_file_impl(bool need_lock);
+  int write_transaction(binlog_trx_data *trx_data);
+  bool write_transaction_to_binlog_events(binlog_trx_data *trx_data);
+  void trx_group_commit_participant(binlog_trx_data *trx_data);
+  void trx_group_commit_leader(THD *first_thd);
+  binlog_trx_data *atomic_enqueue_trx(binlog_trx_data *trx_data);
+  binlog_trx_data *atomic_grab_trx_queue();
+  void mark_xid_done();
+  void mark_xids_active(uint xid_count);
 
 public:
   MYSQL_LOG::generate_name;
@@ -311,17 +362,11 @@ public:
   int open(const char *opt_name);
   void close();
   int log_xid(THD *thd, my_xid xid);
+  int xid_delayed_error(THD *thd);
+  void group_log_xid(THD *first_thd);
   void unlog(ulong cookie, my_xid xid);
   int recover(IO_CACHE *log, Format_description_log_event *fdle);
 #if !defined(MYSQL_CLIENT)
-  bool is_table_mapped(TABLE *table) const
-  {
-    return table->s->table_map_version == table_map_version();
-  }
-
-  ulonglong table_map_version() const { return m_table_map_version; }
-  void update_table_map_version() { ++m_table_map_version; }
-
   int flush_and_set_pending_rows_event(THD *thd, Rows_log_event* event);
   int remove_pending_rows_event(THD *thd);
 
@@ -362,10 +407,12 @@ public:
   void new_file();
 
   bool write(Log_event* event_info); // binary log write
-  bool write(THD *thd, IO_CACHE *cache, Log_event *commit_event, bool incident);
-  bool write_incident(THD *thd, bool lock);
+  bool write_transaction_to_binlog(THD *thd, binlog_trx_data *trx_data,
+                                   Log_event *end_ev);
+  bool trx_group_commit_finish(binlog_trx_data *trx_data);
+  bool write_incident(THD *thd);
 
-  int  write_cache(IO_CACHE *cache, bool lock_log, bool flush_and_sync);
+  int  write_cache(IO_CACHE *cache);
   void set_write_error(THD *thd);
   bool check_write_error(THD *thd);
 

=== modified file 'sql/log_event.h'
--- a/sql/log_event.h	2010-03-04 08:03:07 +0000
+++ b/sql/log_event.h	2010-05-26 08:13:32 +0000
@@ -463,10 +463,9 @@ struct sql_ex_info
 #define LOG_EVENT_SUPPRESS_USE_F    0x8
 
 /*
-  The table map version internal to the log should be increased after
-  the event has been written to the binary log.
+  This used to be LOG_EVENT_UPDATE_TABLE_MAP_VERSION_F, but is now unused.
  */
-#define LOG_EVENT_UPDATE_TABLE_MAP_VERSION_F 0x10
+#define LOG_EVENT_UNUSED1_F 0x10
 
 /**
    @def LOG_EVENT_ARTIFICIAL_F

=== modified file 'sql/sql_class.cc'
--- a/sql/sql_class.cc	2010-01-15 15:27:55 +0000
+++ b/sql/sql_class.cc	2010-05-26 08:13:32 +0000
@@ -673,6 +673,8 @@ THD::THD()
   active_vio = 0;
 #endif
   pthread_mutex_init(&LOCK_thd_data, MY_MUTEX_INIT_FAST);
+  pthread_mutex_init(&LOCK_commit_ordered, MY_MUTEX_INIT_FAST);
+  pthread_cond_init(&COND_commit_ordered, 0);
 
   /* Variables with default values */
   proc_info="login";
@@ -3773,7 +3775,6 @@ int THD::binlog_flush_pending_rows_event
     if (stmt_end)
     {
       pending->set_flags(Rows_log_event::STMT_END_F);
-      pending->flags|= LOG_EVENT_UPDATE_TABLE_MAP_VERSION_F;
       binlog_table_maps= 0;
     }
 
@@ -3901,7 +3902,6 @@ int THD::binlog_query(THD::enum_binlog_q
     {
       Query_log_event qinfo(this, query_arg, query_len, is_trans, suppress_use,
                             errcode);
-      qinfo.flags|= LOG_EVENT_UPDATE_TABLE_MAP_VERSION_F;
       /*
         Binlog table maps will be irrelevant after a Query_log_event
         (they are just removed on the slave side) so after the query

=== modified file 'sql/sql_class.h'
--- a/sql/sql_class.h	2010-03-30 12:36:49 +0000
+++ b/sql/sql_class.h	2010-05-26 08:13:32 +0000
@@ -1438,6 +1438,21 @@ public:
   /* container for handler's private per-connection data */
   Ha_data ha_data[MAX_HA];
 
+  /* Mutex and condition for waking up threads after group commit. */
+  pthread_mutex_t LOCK_commit_ordered;
+  pthread_cond_t COND_commit_ordered;
+  bool group_commit_ready;
+  /* Pointer for linking THDs into queue waiting for group commit. */
+  THD *next_commit_ordered;
+  /*
+    The "all" parameter of commit(), to communicate it to the thread that
+    calls commit_ordered().
+  */
+  bool group_commit_all;
+  /* Set by TC_LOG::group_log_xid(), to return per-thd error and cookie. */
+  int xid_error;
+  int xid_cookie;
+
 #ifndef MYSQL_CLIENT
   int binlog_setup_trx_data();
 

=== modified file 'sql/sql_load.cc'
--- a/sql/sql_load.cc	2010-03-04 08:03:07 +0000
+++ b/sql/sql_load.cc	2010-05-26 08:13:32 +0000
@@ -516,7 +516,6 @@ int mysql_load(THD *thd,sql_exchange *ex
 	  else
 	  {
 	    Delete_file_log_event d(thd, db, transactional_table);
-            d.flags|= LOG_EVENT_UPDATE_TABLE_MAP_VERSION_F;
 	    (void) mysql_bin_log.write(&d);
 	  }
 	}
@@ -698,7 +697,6 @@ static bool write_execute_load_query_log
       (duplicates == DUP_REPLACE) ? LOAD_DUP_REPLACE :
       (ignore ? LOAD_DUP_IGNORE : LOAD_DUP_ERROR),
       transactional_table, FALSE, errcode);
-  e.flags|= LOG_EVENT_UPDATE_TABLE_MAP_VERSION_F;
   return mysql_bin_log.write(&e);
 }
 

=== modified file 'sql/table.cc'
--- a/sql/table.cc	2010-03-10 10:32:14 +0000
+++ b/sql/table.cc	2010-05-26 08:13:32 +0000
@@ -297,13 +297,6 @@ TABLE_SHARE *alloc_table_share(TABLE_LIS
     share->version=       refresh_version;
 
     /*
-      This constant is used to mark that no table map version has been
-      assigned.  No arithmetic is done on the value: it will be
-      overwritten with a value taken from MYSQL_BIN_LOG.
-    */
-    share->table_map_version= ~(ulonglong)0;
-
-    /*
       Since alloc_table_share() can be called without any locking (for
       example, ha_create_table... functions), we do not assign a table
       map id here.  Instead we assign a value that is not used
@@ -367,10 +360,9 @@ void init_tmp_table_share(THD *thd, TABL
   share->frm_version= 		 FRM_VER_TRUE_VARCHAR;
 
   /*
-    Temporary tables are not replicated, but we set up these fields
+    Temporary tables are not replicated, but we set up this field
     anyway to be able to catch errors.
    */
-  share->table_map_version= ~(ulonglong)0;
   share->cached_row_logging_check= -1;
 
   /*

=== modified file 'sql/table.h'
--- a/sql/table.h	2010-02-10 19:06:24 +0000
+++ b/sql/table.h	2010-05-26 08:13:32 +0000
@@ -433,7 +433,6 @@ typedef struct st_table_share
   bool waiting_on_cond;                 /* Protection against free */
   bool deleting;                        /* going to delete this table */
   ulong table_map_id;                   /* for row-based replication */
-  ulonglong table_map_version;
 
   /*
     Cache for row-based replication table share checks that does not

=== modified file 'storage/xtradb/handler/ha_innodb.cc'
--- a/storage/xtradb/handler/ha_innodb.cc	2010-01-15 21:12:30 +0000
+++ b/storage/xtradb/handler/ha_innodb.cc	2010-05-26 08:13:32 +0000
@@ -138,8 +138,6 @@ bool check_global_access(THD *thd, ulong
 
 /** to protect innobase_open_files */
 static pthread_mutex_t innobase_share_mutex;
-/** to force correct commit order in binlog */
-static pthread_mutex_t prepare_commit_mutex;
 static ulong commit_threads = 0;
 static pthread_mutex_t commit_threads_m;
 static pthread_cond_t commit_cond;
@@ -239,6 +237,7 @@ static const char* innobase_change_buffe
 static INNOBASE_SHARE *get_share(const char *table_name);
 static void free_share(INNOBASE_SHARE *share);
 static int innobase_close_connection(handlerton *hton, THD* thd);
+static void innobase_commit_ordered(handlerton *hton, THD* thd, bool all);
 static int innobase_commit(handlerton *hton, THD* thd, bool all);
 static int innobase_rollback(handlerton *hton, THD* thd, bool all);
 static int innobase_rollback_to_savepoint(handlerton *hton, THD* thd,
@@ -1356,7 +1355,6 @@ innobase_trx_init(
 	trx_t*	trx)	/*!< in/out: InnoDB transaction handle */
 {
 	DBUG_ENTER("innobase_trx_init");
-	DBUG_ASSERT(EQ_CURRENT_THD(thd));
 	DBUG_ASSERT(thd == trx->mysql_thd);
 
 	trx->check_foreigns = !thd_test_options(
@@ -1416,8 +1414,6 @@ check_trx_exists(
 {
 	trx_t*&	trx = thd_to_trx(thd);
 
-	ut_ad(EQ_CURRENT_THD(thd));
-
 	if (trx == NULL) {
 		trx = innobase_trx_allocate(thd);
 	} else if (UNIV_UNLIKELY(trx->magic_n != TRX_MAGIC_N)) {
@@ -2024,6 +2020,7 @@ innobase_init(
         innobase_hton->savepoint_set=innobase_savepoint;
         innobase_hton->savepoint_rollback=innobase_rollback_to_savepoint;
         innobase_hton->savepoint_release=innobase_release_savepoint;
+        innobase_hton->commit_ordered=innobase_commit_ordered;
         innobase_hton->commit=innobase_commit;
         innobase_hton->rollback=innobase_rollback;
         innobase_hton->prepare=innobase_xa_prepare;
@@ -2492,7 +2489,6 @@ skip_overwrite:
 
 	innobase_open_tables = hash_create(200);
 	pthread_mutex_init(&innobase_share_mutex, MY_MUTEX_INIT_FAST);
-	pthread_mutex_init(&prepare_commit_mutex, MY_MUTEX_INIT_FAST);
 	pthread_mutex_init(&commit_threads_m, MY_MUTEX_INIT_FAST);
 	pthread_mutex_init(&commit_cond_m, MY_MUTEX_INIT_FAST);
 	pthread_mutex_init(&analyze_mutex, MY_MUTEX_INIT_FAST);
@@ -2547,7 +2543,6 @@ innobase_end(
 		my_free(internal_innobase_data_file_path,
 						MYF(MY_ALLOW_ZERO_PTR));
 		pthread_mutex_destroy(&innobase_share_mutex);
-		pthread_mutex_destroy(&prepare_commit_mutex);
 		pthread_mutex_destroy(&commit_threads_m);
 		pthread_mutex_destroy(&commit_cond_m);
 		pthread_mutex_destroy(&analyze_mutex);
@@ -2681,6 +2676,101 @@ innobase_start_trx_and_assign_read_view(
 }
 
 /*****************************************************************//**
+Perform the first, fast part of InnoDB commit.
+
+Doing it in this call ensures that we get the same commit order here
+as in binlog and any other participating transactional storage engines.
+
+Note that we want to do as little as really needed here, as we run
+under a global mutex. The expensive log write + fsync() is done later,
+in innobase_commit(), without a lock so group commit can take place.
+
+Note also that this method can be called from a different thread than
+the one handling the rest of the transaction, so thd may not be current. */
+static
+void
+innobase_commit_ordered(
+/*============*/
+	handlerton *hton, /*!< in: InnoDB handlerton */
+	THD*	thd,	/*!< in: MySQL thread handle of the user for whom
+			the transaction should be committed */
+	bool	all)	/*!< in:	TRUE - commit transaction
+				FALSE - the current SQL statement ended */
+{
+	trx_t*		trx;
+	DBUG_ENTER("innobase_commit_ordered");
+	DBUG_ASSERT(hton == innodb_hton_ptr);
+
+	trx = check_trx_exists(thd);
+
+	if (trx->active_trans == 0
+		&& trx->conc_state != TRX_NOT_STARTED) {
+		/* We don't throw an error here; instead we will catch this
+		error again in innobase_commit() and report it from there. */
+		DBUG_VOID_RETURN;
+	}
+	/* Since we will reserve the kernel mutex, we have to release
+	the search system latch first to obey the latching order. */
+
+	if (trx->has_search_latch) {
+		trx_search_latch_release_if_reserved(trx);
+	}
+
+	/* commit_ordered is only called when committing the whole transaction
+	(or an SQL statement when autocommit is on). */
+	DBUG_ASSERT(all || (!thd_test_options(thd, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)));
+
+	/* We need current binlog position for ibbackup to work.
+	Note, the position is current because commit_ordered is guaranteed
+	to be called in same sequence as writing to binlog. */
+	/* Limit concurrent commits to innobase_commit_concurrency (0 = no limit). */
+retry:
+	if (innobase_commit_concurrency > 0) {
+		pthread_mutex_lock(&commit_cond_m);
+		commit_threads++;
+
+		if (commit_threads > innobase_commit_concurrency) {
+			commit_threads--;
+			pthread_cond_wait(&commit_cond,
+					  &commit_cond_m);
+			pthread_mutex_unlock(&commit_cond_m);
+			goto retry;
+		}
+		else {
+			pthread_mutex_unlock(&commit_cond_m);
+		}
+	}
+
+	/* The following calls to read the MySQL binary log
+	   file name and the position return consistent results:
+	   1) We use commit_ordered() to get same commit order
+	   in InnoDB as in binary log (TODO: confirm for direct calls).
+	   2) A MySQL log file rotation cannot happen because
+	   MySQL protects against this by having a counter of
+	   transactions in prepared state and it only allows
+	   a rotation when the counter drops to zero. See
+	   LOCK_prep_xids and COND_prep_xids in log.cc. */
+	trx->mysql_log_file_name = mysql_bin_log_file_name();
+	trx->mysql_log_offset = (ib_int64_t) mysql_bin_log_file_pos();
+
+	/* Don't do write + flush right now. For group commit
+	   to work we want to do the flush in the innobase_commit()
+	   method, which runs without holding any locks. */
+	trx->flush_log_later = TRUE;
+	innobase_commit_low(trx);
+	trx->flush_log_later = FALSE;
+	/* Release our commit concurrency slot and wake one waiting committer. */
+	if (innobase_commit_concurrency > 0) {
+		pthread_mutex_lock(&commit_cond_m);
+		commit_threads--;
+		pthread_cond_signal(&commit_cond);
+		pthread_mutex_unlock(&commit_cond_m);
+	}
+
+	DBUG_VOID_RETURN;
+}
+
+/*****************************************************************//**
 Commits a transaction in an InnoDB database or marks an SQL statement
 ended.
 @return	0 */
@@ -2702,13 +2792,6 @@ innobase_commit(
 
 	trx = check_trx_exists(thd);
 
-	/* Since we will reserve the kernel mutex, we have to release
-	the search system latch first to obey the latching order. */
-
-	if (trx->has_search_latch) {
-		trx_search_latch_release_if_reserved(trx);
-	}
-
 	/* The flag trx->active_trans is set to 1 in
 
 	1. ::external_lock(),
@@ -2736,62 +2819,8 @@ innobase_commit(
 		/* We were instructed to commit the whole transaction, or
 		this is an SQL statement end and autocommit is on */
 
-		/* We need current binlog position for ibbackup to work.
-		Note, the position is current because of
-		prepare_commit_mutex */
-retry:
-		if (innobase_commit_concurrency > 0) {
-			pthread_mutex_lock(&commit_cond_m);
-			commit_threads++;
-
-			if (commit_threads > innobase_commit_concurrency) {
-				commit_threads--;
-				pthread_cond_wait(&commit_cond,
-					&commit_cond_m);
-				pthread_mutex_unlock(&commit_cond_m);
-				goto retry;
-			}
-			else {
-				pthread_mutex_unlock(&commit_cond_m);
-			}
-		}
-
-		/* The following calls to read the MySQL binary log
-		file name and the position return consistent results:
-		1) Other InnoDB transactions cannot intervene between
-		these calls as we are holding prepare_commit_mutex.
-		2) Binary logging of other engines is not relevant
-		to InnoDB as all InnoDB requires is that committing
-		InnoDB transactions appear in the same order in the
-		MySQL binary log as they appear in InnoDB logs.
-		3) A MySQL log file rotation cannot happen because
-		MySQL protects against this by having a counter of
-		transactions in prepared state and it only allows
-		a rotation when the counter drops to zero. See
-		LOCK_prep_xids and COND_prep_xids in log.cc. */
-		trx->mysql_log_file_name = mysql_bin_log_file_name();
-		trx->mysql_log_offset = (ib_int64_t) mysql_bin_log_file_pos();
-
-		/* Don't do write + flush right now. For group commit
-		to work we want to do the flush after releasing the
-		prepare_commit_mutex. */
-		trx->flush_log_later = TRUE;
-		innobase_commit_low(trx);
-		trx->flush_log_later = FALSE;
-
-		if (innobase_commit_concurrency > 0) {
-			pthread_mutex_lock(&commit_cond_m);
-			commit_threads--;
-			pthread_cond_signal(&commit_cond);
-			pthread_mutex_unlock(&commit_cond_m);
-		}
-
-		if (trx->active_trans == 2) {
-
-			pthread_mutex_unlock(&prepare_commit_mutex);
-		}
-
-		/* Now do a write + flush of logs. */
+		/* We did the first part already in innobase_commit_ordered(),
+		now finish by doing a write + flush of logs. */
 		trx_commit_complete_for_mysql(trx);
 		trx->active_trans = 0;
 
@@ -4621,6 +4650,7 @@ no_commit:
 			no need to re-acquire locks on it. */
 
 			/* Altering to InnoDB format */
+			innobase_commit_ordered(ht, user_thd, 1);
 			innobase_commit(ht, user_thd, 1);
 			/* Note that this transaction is still active. */
 			prebuilt->trx->active_trans = 1;
@@ -4637,6 +4667,7 @@ no_commit:
 
 			/* Commit the transaction.  This will release the table
 			locks, so they have to be acquired again. */
+			innobase_commit_ordered(ht, user_thd, 1);
 			innobase_commit(ht, user_thd, 1);
 			/* Note that this transaction is still active. */
 			prebuilt->trx->active_trans = 1;
@@ -8339,6 +8370,7 @@ ha_innobase::external_lock(
 
 		if (!thd_test_options(thd, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)) {
 			if (trx->active_trans != 0) {
+				innobase_commit_ordered(ht, thd, TRUE);
 				innobase_commit(ht, thd, TRUE);
 			}
 		} else {
@@ -9448,36 +9480,6 @@ innobase_xa_prepare(
 
 	srv_active_wake_master_thread();
 
-	if (thd_sql_command(thd) != SQLCOM_XA_PREPARE &&
-	    (all || !thd_test_options(thd, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)))
-	{
-		if (srv_enable_unsafe_group_commit && !THDVAR(thd, support_xa)) {
-			/* choose group commit rather than binlog order */
-			return(error);
-		}
-
-		/* For ibbackup to work the order of transactions in binlog
-		and InnoDB must be the same. Consider the situation
-
-		  thread1> prepare; write to binlog; ...
-			  <context switch>
-		  thread2> prepare; write to binlog; commit
-		  thread1>			     ... commit
-
-		To ensure this will not happen we're taking the mutex on
-		prepare, and releasing it on commit.
-
-		Note: only do it for normal commits, done via ha_commit_trans.
-		If 2pc protocol is executed by external transaction
-		coordinator, it will be just a regular MySQL client
-		executing XA PREPARE and XA COMMIT commands.
-		In this case we cannot know how many minutes or hours
-		will be between XA PREPARE and XA COMMIT, and we don't want
-		to block for undefined period of time. */
-		pthread_mutex_lock(&prepare_commit_mutex);
-		trx->active_trans = 2;
-	}
-
 	return(error);
 }
 
@@ -10669,11 +10671,6 @@ static MYSQL_SYSVAR_ENUM(adaptive_checkp
   "Enable/Disable flushing along modified age. (none, reflex, [estimate])",
   NULL, innodb_adaptive_checkpoint_update, 2, &adaptive_checkpoint_typelib);
 
-static MYSQL_SYSVAR_ULONG(enable_unsafe_group_commit, srv_enable_unsafe_group_commit,
-  PLUGIN_VAR_RQCMDARG,
-  "Enable/Disable unsafe group commit when support_xa=OFF and use with binlog or other XA storage engine.",
-  NULL, NULL, 0, 0, 1, 0);
-
 static MYSQL_SYSVAR_ULONG(expand_import, srv_expand_import,
   PLUGIN_VAR_RQCMDARG,
   "Enable/Disable converting automatically *.ibd files when import tablespace.",
@@ -10763,7 +10760,6 @@ static struct st_mysql_sys_var* innobase
   MYSQL_SYSVAR(flush_neighbor_pages),
   MYSQL_SYSVAR(read_ahead),
   MYSQL_SYSVAR(adaptive_checkpoint),
-  MYSQL_SYSVAR(enable_unsafe_group_commit),
   MYSQL_SYSVAR(expand_import),
   MYSQL_SYSVAR(extra_rsegments),
   MYSQL_SYSVAR(dict_size_limit),