
maria-developers team mailing list archive

Review of MDEV-4506, parallel replication, part 1

 

Hi!

Here is the first part of the review.

I have half of one file (rpl_parallel.cc) left to review, but I wanted
you to have a chance to read what I have done so far.

I plan to finish the review tomorrow and start working on the code
during the weekend.

>>>>> "Kristian" == Kristian Nielsen <knielsen@xxxxxxxxxxxxxxx> writes:

Kristian> Hi Monty,
Kristian> Here is a copy of the status mail I sent earlier. The status is mostly still
Kristian> current.

Kristian> The code is in this tree:

Kristian>     lp:~maria-captains/maria/10.0-knielsen

Kristian> In that tree, you can extract the parallel replication code as a single patch
Kristian> with a command like this:

Kristian>     bzr diff -rrevid:knielsen@xxxxxxxxxxxxxxx-20130608103621-g91eaielv4n22aha

Kristian> In the forwarded mail, I describe getting a 4.5 times speedup on a quad-core
Kristian> laptop, so that's quite promising.


=== modified file 'mysql-test/r/mysqld--help.result'
--- mysql-test/r/mysqld--help.result	2013-05-28 11:28:31 +0000
+++ mysql-test/r/mysqld--help.result	2013-08-29 08:18:22 +0000
<cut>
@@ -783,6 +794,11 @@
  --slave-net-timeout=# 
  Number of seconds to wait for more data from any
  master/slave connection before aborting the read
+ --slave-parallel-threads=# 
+ If non-zero, number of threads to spawn to apply in
+ parallel events on the slave that were group-committed on
+ the master or were logged with GTID in different
+ replication domains.

What happens if --slave-parallel-threads is 0 ?
Does this mean that there will be no parallel threads?
Is this the same as 1 ?

If same as 1, why not have the option to take values starting from 1 ?

IRC:

K> yes, if --slave-parallel-threads is 0, then the feature is disabled. The old code is used, and the SQL thread applies the events itself
K> if --slave-parallel-threads is > 0, then a pool of threads are spawned. The SQL threads do not apply events themselves, they just put it in queues for the parallel threads to apply
M> do we ever need to do this with only one thread?
K> (we might want to give an error if user tries to do this, as it does not make much sense)

M> So why not have the value from 1 to N
M> where 1 is old code and > 1 is using new code ?

K> That might make sense
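Here is a minimal sketch of the range proposed above, assuming the minimum becomes 1. A value of 1 keeps the old code path, and anything larger hands events to the worker pool. The enum and function names are hypothetical and do not come from the patch.

```cpp
#include <cassert>

// Hypothetical names, for illustration only.
enum class Apply_mode { SQL_THREAD_APPLIES, PARALLEL_POOL };

// Proposed semantics: --slave-parallel-threads takes values 1..N.
//   1  -> old code: the SQL thread applies events itself.
//   >1 -> the SQL thread only queues events for a pool of worker threads.
Apply_mode apply_mode_for(unsigned long slave_parallel_threads)
{
  return slave_parallel_threads > 1 ? Apply_mode::PARALLEL_POOL
                                    : Apply_mode::SQL_THREAD_APPLIES;
}
```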

=== modified file 'mysql-test/suite/perfschema/r/dml_setup_instruments.result'
--- mysql-test/suite/perfschema/r/dml_setup_instruments.result	2013-03-26 09:35:34 +0000
+++ mysql-test/suite/perfschema/r/dml_setup_instruments.result	2013-08-29 08:18:22 +0000
@@ -38,14 +38,14 @@ order by name limit 10;
 NAME	ENABLED	TIMED
 wait/synch/cond/sql/COND_flush_thread_cache	YES	YES
 wait/synch/cond/sql/COND_manager	YES	YES
+wait/synch/cond/sql/COND_parallel_entry	YES	YES
+wait/synch/cond/sql/COND_prepare_ordered	YES	YES
 wait/synch/cond/sql/COND_queue_state	YES	YES
 wait/synch/cond/sql/COND_rpl_status	YES	YES
+wait/synch/cond/sql/COND_rpl_thread	YES	YES
+wait/synch/cond/sql/COND_rpl_thread_pool	YES	YES
 wait/synch/cond/sql/COND_server_started	YES	YES
 wait/synch/cond/sql/COND_thread_cache	YES	YES
-wait/synch/cond/sql/COND_thread_count	YES	YES
-wait/synch/cond/sql/Delayed_insert::cond	YES	YES
-wait/synch/cond/sql/Delayed_insert::cond_client	YES	YES
-wait/synch/cond/sql/Event_scheduler::COND_state	YES	YES

Any idea why the above lines were deleted?
I don't see anything in your patch that could affect that.

<cut>

=== added file 'mysql-test/suite/rpl/t/rpl_parallel.test'
--- mysql-test/suite/rpl/t/rpl_parallel.test	1970-01-01 00:00:00 +0000
+++ mysql-test/suite/rpl/t/rpl_parallel.test	2013-08-29 08:18:22 +0000

I didn't find any test results for rpl_parallel.test or rpl_parallel2 in
your tree. Did you forget to commit these?

I was also missing comments in these files explaining what they test.

=== modified file 'sql/log.cc'
--- sql/log.cc	2013-06-06 15:51:28 +0000
+++ sql/log.cc	2013-08-29 08:18:22 +0000

@@ -6541,44 +6543,201 @@ MYSQL_BIN_LOG::write_transaction_to_binl
 }
 
Could you please document in detail the arguments and return value
for the following function.
- I spent 3 hours trying to understand this function in detail, and
  a full description of the arguments would have helped a bit.

 bool
-MYSQL_BIN_LOG::write_transaction_to_binlog_events(group_commit_entry *entry)
+MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *entry,
+                                      wait_for_commit *wfc)
 {
+  group_commit_entry *orig_queue;
+  wait_for_commit *list, *cur, *last;
+
   /*
     To facilitate group commit for the binlog, we first queue up ourselves in
     the group commit queue. Then the first thread to enter the queue waits for
     the LOCK_log mutex, and commits for everyone in the queue once it gets the
     lock. Any other threads in the queue just wait for the first one to finish
     the commit and wake them up.

Do you still use LOCK_log for this?
It's a bit confusing to mention LOCK_log here when you take the
LOCK_prepare_ordered in the function.

If LOCK_log is required, shouldn't there be an assert for it in this function?

+
+    To support in-order parallel replication with group commit, after we add
+    some transaction to the queue, we check if there were other transactions
+    already prepared to commit but just waiting for the first one to commit.
+    If so, we add those to the queue as well, transitively for all waiters.
   */
 
   entry->thd->clear_wakeup_ready();
   mysql_mutex_lock(&LOCK_prepare_ordered);

<cut>

+  orig_queue= group_commit_queue;
+
+  /*
+    Iteratively process everything added to the queue, looking for waiters,
+    and their waiters, and so on. If a waiter is ready to commit, we
+    immediately add it to the queue; if not we just wake it up.
+
+    This would be natural to do with recursion, but we want to avoid
+    potentially unbounded recursion blowing the C stack, so we use the list
+    approach instead.

Add here:

  cur->next_subsequent_commit is at this point an empty list. 'last' will
  always point to the last element of the list.  We use this list
  to simulate recursion.
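The list-instead-of-recursion technique can be illustrated with a minimal sketch on a hypothetical simplified node type, not the patch's wait_for_commit. It visits a start node and, transitively, everyone waiting for it. `last` always points at the tail of the worklist, so newly found waiters are appended there and picked up by later iterations of the same loop. For clarity the sketch keeps the waiter link and the worklist link in separate fields.

```cpp
#include <cassert>
#include <vector>

// Hypothetical simplified node; not the patch's struct.
struct Node
{
  int id;
  Node *waiters= nullptr;       // head of the list of nodes waiting for us
  Node *next_waiter= nullptr;   // link inside a waitee's 'waiters' list
  Node *next_in_work= nullptr;  // link inside the iteration worklist
  explicit Node(int i) : id(i) {}
};

// Visit 'start' and all transitive waiters without recursion.
std::vector<int> visit_transitively(Node *start)
{
  std::vector<int> order;
  Node *cur= start;
  Node *last= start;            // tail of the worklist
  for (;;)
  {
    order.push_back(cur->id);   // "process" cur
    for (Node *w= cur->waiters; w; w= w->next_waiter)
    {
      last->next_in_work= w;    // append instead of recursing
      last= w;
    }
    if (cur == last)
      break;                    // nothing left in the worklist
    cur= cur->next_in_work;
  }
  return order;
}
```

For a node with waiters b and c, where b in turn has waiter d, the visit order is breadth-first: the start node, then b, c, d.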

I found it confusing that here you treat next_subsequent_commit as
a way to avoid recursion, but you use this list for other things in
register_wait_for_prior_commit().

Suggestion:
- If you want to reuse memory for the list, please use a union over
  next_subsequent_commit with two names, so that the code doesn't
  use the same name for two different lists.
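A sketch of the union suggestion, using hypothetical names: one pointer slot with two names, one per list. The helper moves waiters from a waitee's list onto the tail of a worklist. It must read the old name (`next_subsequent_commit`) before writing the new one, because both share the same storage; this mirrors the `next` temporary the patch already saves in its inner loop.

```cpp
#include <cassert>

// Hypothetical sketch; not the patch's struct.
struct wfc_sketch
{
  int id;
  union
  {
    wfc_sketch *next_subsequent_commit;      // link in a waitee's waiter list
    wfc_sketch *next_in_group_commit_queue;  // link in the group commit worklist
  };
};

// Append every waiter in 'waiters' after 'last'; returns the new tail.
wfc_sketch *append_waiters(wfc_sketch *waiters, wfc_sketch *last)
{
  while (waiters)
  {
    wfc_sketch *next= waiters->next_subsequent_commit;  // read old name first
    last->next_in_group_commit_queue= waiters;          // then reuse the slot
    last= waiters;
    waiters= next;
  }
  return last;
}
```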

+  */
+  list= wfc;
+  cur= list;
+  last= list;
+  for (;;)
+  {
+    /* Add the entry to the group commit queue. */
+    entry->next= group_commit_queue;
+    group_commit_queue= entry;
+
+    if (entry->cache_mngr->using_xa)
+    {
+      DEBUG_SYNC(entry->thd, "commit_before_prepare_ordered");
+      run_prepare_ordered(entry->thd, entry->all);
+      DEBUG_SYNC(entry->thd, "commit_after_prepare_ordered");
+    }
+
+    if (!cur)
+      break;             // Can happen if initial entry has no wait_for_commit
+

Add a comment describing what the following if is testing
(as there are several lists involved, the code is not that easy to understand).

+    if (cur->subsequent_commits_list)
+    {
+      bool have_lock;
+      wait_for_commit *waiter;
+
+      mysql_mutex_lock(&cur->LOCK_wait_commit);
+      have_lock= true;

+      waiter= cur->subsequent_commits_list;
+      /* Check again, now safely under lock. */
+      if (waiter)
+      {
+        /* Grab the list of waiters and process it. */
+        cur->subsequent_commits_list= NULL;
+        do
+        {
+          wait_for_commit *next= waiter->next_subsequent_commit;
+          group_commit_entry *entry2=
+            (group_commit_entry *)waiter->opaque_pointer;
+          if (entry2)
+          {
+            /*
+              This is another transaction ready to be written to the binary
+              log. We can put it into the queue directly, without needing a
+              separate context switch to the other thread. We just set a flag
+              so that the other thread will know when it wakes up that it was
+              already processed.
+
+              So put it at the end of the list to be processed in a subsequent
+              iteration of the outer loop.
+            */
+            entry2->queued_by_other= true;
+            last->next_subsequent_commit= waiter;
+            last= waiter;
+            /*
+              As a small optimisation, we do not actually need to set
+              waiter->next_subsequent_commit to NULL, as we can use the
+              pointer `last' to check for end-of-list.
+            */
+          }
+          else
+          {
+            /*
+              Wake up the waiting transaction.
+
+              For this, we need to set the "wakeup running" flag and release
+              the waitee lock to avoid a deadlock, see comments on
+              THD::wakeup_subsequent_commits2() for details.
+            */
+            if (have_lock)
+            {
+              cur->wakeup_subsequent_commits_running= true;
+              mysql_mutex_unlock(&cur->LOCK_wait_commit);
+              have_lock= false;

Move have_lock= false to just after the 'if (have_lock)':
- It is easier to see that you are really resetting the variable.
- It is faster, as there is no need to reload the variable from the stack
  after the mysql_mutex_unlock() call.
- Nowadays I always try to reset flag variables in my code just after
  testing them, to achieve the above.
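A minimal sketch of the suggested style on a hypothetical struct, with std::mutex standing in for mysql_mutex_t: the flag is cleared immediately after it is tested, before the unlock call.

```cpp
#include <cassert>
#include <mutex>

// Hypothetical stand-in for the waitee side of wait_for_commit.
struct Waitee_sketch
{
  std::mutex LOCK_wait_commit;
  bool wakeup_subsequent_commits_running= false;
};

// Called before waking a waiter. Releases the waitee's lock only the first
// time; returns true if it released it.
bool release_before_wakeup(Waitee_sketch &cur, bool &have_lock)
{
  if (have_lock)
  {
    have_lock= false;                            // reset right after the test
    cur.wakeup_subsequent_commits_running= true;
    cur.LOCK_wait_commit.unlock();
    return true;
  }
  return false;
}
```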

+            }
+            waiter->wakeup();
+          }
+          waiter= next;
+        } while (waiter);
+      }

You can optimize away one line, one 'if' level and one 'if' by doing this:

Move:  'cur->subsequent_commits_list= NULL;' before the if.
Change 'if (waiter)' to 'while (waiter)'
Remove 'do' and the corresponding 'while (waiter);'

+      if (have_lock)
+        mysql_mutex_unlock(&cur->LOCK_wait_commit);
+    }

+    if (cur == last)
+      break;

Add a comment here of what is in next_subsequent_commit list.

+    cur= cur->next_subsequent_commit;
+    entry= (group_commit_entry *)cur->opaque_pointer;
+    DBUG_ASSERT(entry != NULL);
   }
+
+  /* Now we need to clear the wakeup_subsequent_commits_running flags. */
+  if (list)
+  {
+    for (;;)
+    {
+      if (list->wakeup_subsequent_commits_running)
+      {
+        mysql_mutex_lock(&list->LOCK_wait_commit);
+        list->wakeup_subsequent_commits_running= false;
+        mysql_mutex_unlock(&list->LOCK_wait_commit);

Why do we need the mutex above?
The only place where we test this variable, as far as I can find, is in
wait_for_commit::register_wait_for_prior_commit()

This thread doesn't know whether the other thread is just before
mysql_mutex_lock(&waitee->LOCK_wait_commit), inside the lock or after
the lock, so it should not matter when we reset the variable.

But somehow this sounds strange. What happens if the above variable is
reset just before mysql_mutex_lock(&waitee->LOCK_wait_commit) in
wait_for_commit::register_wait_for_prior_commit()?
Isn't the waiter then added to the list when it shouldn't be?

+      }
+      if (list == last)
+        break;
+      list= list->next_subsequent_commit;
+    }
+  }
+
+  if (opt_binlog_commit_wait_count > 0)
+    mysql_cond_signal(&COND_prepare_ordered);
   mysql_mutex_unlock(&LOCK_prepare_ordered);
   DEBUG_SYNC(entry->thd, "commit_after_release_LOCK_prepare_ordered");
 
+  return orig_queue == NULL;
+}
+
+bool
+MYSQL_BIN_LOG::write_transaction_to_binlog_events(group_commit_entry *entry)
+{
+  wait_for_commit *wfc;
+  bool is_leader;
+
+  wfc= entry->thd->wait_for_commit_ptr;
+  entry->queued_by_other= false;
+  if (wfc && wfc->waiting_for_commit)
+  {

Add comment, something like:

/* We have to wait for the threads in wfc->waiting_for_commit */

+    mysql_mutex_lock(&wfc->LOCK_wait_commit);
+    /* Do an extra check here, this time safely under lock. */
+    if (wfc->waiting_for_commit)
+    {
+      wfc->opaque_pointer= entry;
+      do
+      {
+        mysql_cond_wait(&wfc->COND_wait_commit, &wfc->LOCK_wait_commit);
+      } while (wfc->waiting_for_commit);
+      wfc->opaque_pointer= NULL;
+    }
+    mysql_mutex_unlock(&wfc->LOCK_wait_commit);
+  }

<cut>

@@ -6597,7 +6756,10 @@ MYSQL_BIN_LOG::write_transaction_to_binl
 
     if (next)
     {
-      next->thd->signal_wakeup_ready();
+      if (next->queued_by_other)
+        next->thd->wait_for_commit_ptr->wakeup();

What does the above code mean? (Please add a comment.)
Why do we wake up the thread we were waiting for instead of ourselves?

+      else
+        next->thd->signal_wakeup_ready();
     }
     else
     {

<cut>

@@ -6968,6 +7140,48 @@ MYSQL_BIN_LOG::write_transaction_or_stmt
   return 0;
 }
 
+
+void
+MYSQL_BIN_LOG::wait_for_sufficient_commits()
+{
+  size_t count;
+  group_commit_entry *e;
+  group_commit_entry *last_head;
+  struct timespec wait_until;
+
+  mysql_mutex_assert_owner(&LOCK_log);
+  mysql_mutex_assert_owner(&LOCK_prepare_ordered);
+
+  count= 0;
+  for (e= last_head= group_commit_queue; e; e= e->next)
+    ++count;
+  if (count >= opt_binlog_commit_wait_count)
+    return;

Assuming that the queue is often bigger than opt_binlog_commit_wait_count,
it may be better to stop counting as soon as the limit is reached:

for (e= last_head= group_commit_queue, count=0; e ; e= e->next)
  if (++count >= opt_binlog_commit_wait_count)
     return;
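A self-contained version of the early-exit count, on a hypothetical entry type. One detail, if this function can be reached with a threshold of 0: the loop above never returns for an empty queue, while the original code returns at once. The explicit zero check in this sketch keeps the original behaviour.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical stand-in for group_commit_entry.
struct Entry_sketch
{
  Entry_sketch *next;
};

// True as soon as the queue holds at least 'wanted' entries, without
// walking the rest of a long queue.
bool queue_has_at_least(const Entry_sketch *head, std::size_t wanted)
{
  if (wanted == 0)
    return true;                 // same as original code on an empty queue
  std::size_t count= 0;
  for (const Entry_sketch *e= head; e; e= e->next)
    if (++count >= wanted)
      return true;
  return false;
}
```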

+  mysql_mutex_unlock(&LOCK_log);

Here you are going to take LOCK_log and LOCK_prepare_ordered in the
wrong order.  safe_mutex() should have noticed this.

In other code you are first taking LOCK_log and then
LOCK_prepare_ordered.  If someone else is calling
trx_group_commit_leader() while you are in this code you would get a
mutex deadlock.
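One way to avoid the inverted order, sketched with standard C++ primitives in place of the mysql_mutex/mysql_cond wrappers (std::condition_variable::wait_for plays the role of mysql_cond_timedwait here). The idea is to wait with only the prepare lock held, then release it and re-take both locks in the usual order, LOCK_log first. The struct and names are hypothetical.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <mutex>

// Hypothetical stand-ins; global lock order is log_lock before prepare_lock.
struct Group_commit_sketch
{
  std::mutex log_lock;        // stand-in for LOCK_log
  std::mutex prepare_lock;    // stand-in for LOCK_prepare_ordered
  std::condition_variable cond;
  std::size_t queued= 0;      // entries in the group commit queue
};

// Called with both locks held (log_lock taken first). Returns with both
// locks held again, re-acquired in the canonical order.
void wait_for_entries(Group_commit_sketch &g, std::size_t wanted,
                      std::chrono::microseconds timeout)
{
  g.log_lock.unlock();
  {
    std::unique_lock<std::mutex> pl(g.prepare_lock, std::adopt_lock);
    g.cond.wait_for(pl, timeout, [&g, wanted] { return g.queued >= wanted; });
  }                           // pl's destructor releases prepare_lock
  g.log_lock.lock();          // canonical order: log_lock ...
  g.prepare_lock.lock();      // ... then prepare_lock
}
```

Because the prepare lock is briefly dropped, the queue may change before it is re-taken, so a caller that relies on its contents should re-read it afterwards.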

+  set_timespec_nsec(wait_until, (ulonglong)1000*opt_binlog_commit_wait_usec);
+
+  for (;;)
+  {
+    int err;
+    group_commit_entry *head;
+
+    err= mysql_cond_timedwait(&COND_prepare_ordered, &LOCK_prepare_ordered,
+                              &wait_until);
+    if (err == ETIMEDOUT)
+      break;
+    head= group_commit_queue;
+    for (e= head; e && e != last_head; e= e->next)
+      ++count;
+    if (count >= opt_binlog_commit_wait_count)
+      break;
+    last_head= head;
+  }
+
+  mysql_mutex_lock(&LOCK_log);
+}

<cut>

=== modified file 'sql/log_event.cc'
--- sql/log_event.cc	2013-06-06 15:51:28 +0000
+++ sql/log_event.cc	2013-08-29 08:18:22 +0000
<cut>
@@ -6101,6 +6110,18 @@ Gtid_log_event::Gtid_log_event(const cha
   domain_id= uint4korr(buf);
   buf+= 4;
   flags2= *buf;
+  if (flags2 & FL_GROUP_COMMIT_ID)
+  {
+    if (event_len < (uint)header_size + GTID_HEADER_LEN + 2)
+    {
+      seq_no= 0;                                // So is_valid() returns false
+      return;
+    }
+    ++buf;
+    commit_id= uint8korr(buf);
+  }
+  else
+    commit_id= 0;
 }
 
Setting commit_id= 0 first, instead of in an else branch, will generate
smaller and faster code (setting a variable is faster than a jump).
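A sketch of the suggested shape on a hypothetical, simplified buffer layout: a flags byte optionally followed by an 8-byte little-endian commit id. The flag value and layout are assumptions for illustration, not the real GTID event format. The output is set to 0 up front, so there is no else branch.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

// Assumed flag bit, for illustration only.
const unsigned char FL_GROUP_COMMIT_ID_SKETCH= 2;

// Read 8 bytes little-endian (what uint8korr() does in the server).
static uint64_t read_le64(const unsigned char *p)
{
  uint64_t v= 0;
  for (int i= 7; i >= 0; --i)
    v= (v << 8) | p[i];
  return v;
}

// Returns false if the buffer is too short for what the flags announce.
bool decode_commit_id(const unsigned char *buf, std::size_t len,
                      uint64_t *commit_id)
{
  *commit_id= 0;                      // default first: no else branch needed
  if (len < 1)
    return false;
  unsigned char flags2= buf[0];
  if (flags2 & FL_GROUP_COMMIT_ID_SKETCH)
  {
    if (len < 1 + 8)
      return false;
    *commit_id= read_le64(buf + 1);
  }
  return true;
}
```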

=== modified file 'sql/log_event.h'
--- sql/log_event.h	2013-06-06 15:51:28 +0000
+++ sql/log_event.h	2013-08-29 08:18:22 +0000
@@ -1317,9 +1317,9 @@ class Log_event
 
      @see do_apply_event
    */
-  int apply_event(Relay_log_info const *rli)
+  int apply_event(struct rpl_group_info *rgi)

What is the simple rule to know when to use Relay_log_info and when to
use rpl_group_info ?

If I understand things correctly, Relay_log_info should only be used for
direct manipulation of data in the relay log.

By the way, you should probably change 'struct rpl_group_info' to just
rpl_group_info; now you are mixing both versions in the code.


=== modified file 'sql/log_event_old.cc'
--- sql/log_event_old.cc	2013-02-19 10:45:29 +0000
+++ sql/log_event_old.cc	2013-08-29 08:18:22 +0000
@@ -67,7 +68,7 @@ Old_rows_log_event::do_apply_event(Old_r
     do_apply_event(). We still check here to prevent future coding
     errors.
   */
-  DBUG_ASSERT(rli->sql_thd == ev_thd);
+  DBUG_ASSERT(rgi->thd == ev_thd);

Should we change all tests of rli->sql_thd to rgi->thd?
You have at least this test in log_event.cc and slave.cc.

I assume that rli should not have a sql_thd member anymore ?

<cut> 

=== added file 'sql/rpl_parallel.cc'
--- sql/rpl_parallel.cc	1970-01-01 00:00:00 +0000
+++ sql/rpl_parallel.cc	2013-08-29 08:18:22 +0000
@@ -0,0 +1,699 @@
+#include "my_global.h"
+#include "rpl_parallel.h"
+#include "slave.h"
+#include "rpl_mi.h"
+
+
+/*
+  Code for optional parallel execution of replicated events on the slave.
+
+  ToDo list:
+
+   - Review every field in Relay_log_info, and all code that accesses it.
+     Split out the necessary parts into rpl_group_info, to avoid conflicts
+     between parallel execution of events. (Such as deferred events ...)
+
+   - Error handling. If we fail in one of multiple parallel executions, we
+     need to make a best effort to complete prior transactions and roll back
+     following transactions, so slave binlog position will be correct.
+     And all the retry logic for temporary errors like deadlock.
+
+   - Stopping the slave needs to handle stopping all parallel executions. And
+     the logic in sql_slave_killed() that waits for current event group to
+     complete needs to be extended appropriately...
+
+   - Audit the use of Relay_log_info::data_lock. Make sure it is held
+     correctly in all needed places also when using parallel replication.
+
+   - We need some user-configurable limit on how far ahead the SQL thread will
+     fetch and queue events for parallel execution (otherwise if slave gets
+     behind we will fill up memory with pending malloc()'ed events).
+
+   - Fix update of relay-log.info and master.info. In non-GTID replication,
+     they must be serialised to preserve correctness. In GTID replication, we
+     should not update them at all except at slave thread stop.
+
+   - All the waits (eg. in struct wait_for_commit and in
+     rpl_parallel_thread_pool::get_thread()) need to be killable. And on kill,
+     everything needs to be correctly rolled back and stopped in all threads,
+     to ensure a consistent slave replication state.

I can add kill checking to all functions that wait on a mutex.
I will do it by adding enter_cond()/exit_cond() to all functions
that call mysql_cond_wait().

+
+   - Handle the case of a partial event group. This occurs when the master
+     crashes in the middle of writing the event group to the binlog. The
+     slave rolls back the transaction; parallel execution needs to be able
+     to deal with this wrt. commit_orderer and such.
+
+   - Relay_log_info::is_in_group(). This needs to be handled correctly in all
+     callers. I think it needs to be split into two, one version in
+     Relay_log_info to be used from next_event() in slave.cc, one to be used in
+     per-transaction stuff.
+
+   - We should fail if we connect to the master with opt_slave_parallel_threads
+     greater than zero and master does not support GTID. Just to avoid a bunch
+     of potential problems, we won't be able to do any parallel replication
+     in this case anyway.

If the master doesn't support GTID, I think that instead of stopping
we should just issue a warning in the log and continue with normal
replication.

Otherwise we would have problems with multi-source replication
if a single master doesn't support GTID.

+*/
+
+struct rpl_parallel_thread_pool global_rpl_thread_pool;
+
+
+static void
+rpt_handle_event(rpl_parallel_thread::queued_event *qev,
+                 struct rpl_parallel_thread *rpt)
+{
+  int err;
+  struct rpl_group_info *rgi= qev->rgi;
+  Relay_log_info *rli= rgi->rli;
+  THD *thd= rgi->thd;
+
+  thd->rli_slave= rli;
+  thd->rpl_filter = rli->mi->rpl_filter;
+  /* ToDo: Get rid of rli->group_info, it is not thread safe. */
+  rli->group_info= rgi;
+
+  /* ToDo: Access to thd, and what about rli, split out a parallel part? */
+  mysql_mutex_lock(&rli->data_lock);
+  err= apply_event_and_update_pos(qev->ev, thd, rgi, rpt);

We should also get rid of the thd argument and change the function to
return bool, so that we can return an error.

+  /* ToDo: error handling. */
+}
+
+
+pthread_handler_t
+handle_rpl_parallel_thread(void *arg)
+{

<cut>

+ mysql_mutex_lock(&rpt->LOCK_rpl_thread);
+ rpt->thd= thd;
+
+  while (rpt->delay_start)
+    mysql_cond_wait(&rpt->COND_rpl_thread, &rpt->LOCK_rpl_thread);

A much easier way to achieve the wait is to have the caller lock
rpt->LOCK_rpl_thread with mysql_mutex_lock() and then just release it
when it's time for this thread to continue.
- No need for signaling or checking if the thread got the signal
- No need to have a cond wait here
- No need to have a delay_start flag
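A minimal sketch of the start-up handshake suggested above, with std::mutex and std::thread standing in for the mysql_* wrappers and hypothetical names. The creator holds the lock while spawning the thread and finishing setup, and the new thread's first action is to take that same lock, so it simply blocks until the creator releases it. The creator both locks and unlocks the mutex, as a mutex must be unlocked by the thread that owns it.

```cpp
#include <cassert>
#include <mutex>
#include <thread>

// Hypothetical stand-in for rpl_parallel_thread start-up.
struct Worker_sketch
{
  std::mutex start_lock;
  int setup_value= 0;      // written by the creator before release
  int observed= -1;        // what the worker saw once allowed to start
  std::thread th;

  void start(int value)
  {
    std::unique_lock<std::mutex> guard(start_lock);  // hold before spawning
    th= std::thread([this] {
      std::lock_guard<std::mutex> g(start_lock);     // blocks until released
      observed= setup_value;
    });
    setup_value= value;    // creator finishes setup while the worker waits
  }                        // guard releases start_lock: worker may proceed

  void join() { th.join(); }
};
```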

+  rpt->running= true;
+
+  while (!rpt->stop && !thd->killed)
+  {
+    rpl_parallel_thread *list;
+
+    old_msg= thd->proc_info;
+    thd->enter_cond(&rpt->COND_rpl_thread, &rpt->LOCK_rpl_thread,
+                    "Waiting for work from SQL thread");
+    while (!rpt->stop && !thd->killed && !(events= rpt->event_queue))
+      mysql_cond_wait(&rpt->COND_rpl_thread, &rpt->LOCK_rpl_thread);

Add comment:  /* Mark that this thread is now executing */

+    rpt->free= false;
+    rpt->event_queue= rpt->last_in_queue= NULL;
+    thd->exit_cond(old_msg);
+

Note that events may be undefined here if thd->killed was set above
(in the first iteration of the loop).

It would be good to check for thd->killed or rpt->stop here and then abort
the loop.

Another question: do we really need rpt->stop?
Isn't it enough to use thd->killed for this?

Otherwise we would need to check for both rpt->stop and thd->killed in
every loop that may be aborted.
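A sketch of the suggested check, on a hypothetical queue type with a single `killed` flag standing in for thd->killed, so there is no separate stop flag. The caller learns whether it was woken by a kill before it touches any events.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <utility>

// Hypothetical stand-in for the worker's event queue.
struct Work_queue_sketch
{
  std::mutex lock;
  std::condition_variable cond;
  std::deque<int> events;    // stand-in for queued Log_event pointers
  bool killed= false;        // stand-in for thd->killed

  // Returns false if woken by a kill (do not touch 'out' then);
  // otherwise moves all queued events into 'out' and returns true.
  bool wait_for_work(std::deque<int> &out)
  {
    std::unique_lock<std::mutex> g(lock);
    cond.wait(g, [this] { return killed || !events.empty(); });
    if (killed)
      return false;          // abort before using any events
    out= std::move(events);
    events.clear();          // leave the moved-from queue empty
    return true;
  }

  void kill()
  {
    {
      std::lock_guard<std::mutex> g(lock);
      killed= true;
    }
    cond.notify_all();
  }
};
```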

+  more_events:
+    while (events)
+    {
+      struct rpl_parallel_thread::queued_event *next= events->next;
+      Log_event_type event_type= events->ev->get_type_code();
+      rpl_group_info *rgi= events->rgi;
+      rpl_parallel_entry *entry= rgi->parallel_entry;
+      uint64 wait_for_sub_id;
+      uint64 wait_start_sub_id;
+      bool end_of_group;
+

Add a comment explaining why this is handled here and not in rpt_handle_event()

+      if (event_type == GTID_EVENT)
+      {
+        in_event_group= true;

Add a comment what the following variable stands for

+        group_standalone=
+          (0 != (static_cast<Gtid_log_event *>(events->ev)->flags2 &
+                 Gtid_log_event::FL_STANDALONE));
+

Rest of this file will be reviewed tomorrow.

<cut>

=== modified file 'sql/rpl_rli.cc'
--- sql/rpl_rli.cc	2013-06-06 15:51:28 +0000
+++ sql/rpl_rli.cc	2013-08-29 08:18:22 +0000
@@ -1194,13 +1194,15 @@ bool Relay_log_info::cached_charset_comp
 
 
 void Relay_log_info::stmt_done(my_off_t event_master_log_pos,
-                               time_t event_creation_time, THD *thd)
+                               time_t event_creation_time, THD *thd,
+                               struct rpl_group_info *rgi)

Do we still need thd, as it is also available as rgi->thd?

If they are not the same, please add a comment about this. Otherwise it's
not clear when to use rgi->thd and when to use thd, as this function uses both.

@@ -1265,7 +1267,7 @@ void Relay_log_info::cleanup_context(THD
 {
   DBUG_ENTER("Relay_log_info::cleanup_context");
 
-  DBUG_ASSERT(sql_thd == thd);
+  DBUG_ASSERT(opt_slave_parallel_threads > 0 || sql_thd == thd);

Add a comment about which THD is passed to the function.

<cut>

+
+int
+event_group_new_gtid(rpl_group_info *rgi, Gtid_log_event *gev)
+{
+  uint64 sub_id= rpl_global_gtid_slave_state.next_subid(gev->domain_id);
+  if (!sub_id)
+  {

Add a comment that this is an out-of-memory error.

<cut>
+
+void
+delete_or_keep_event_post_apply(Relay_log_info *rli,
+                                Log_event_type typ, Log_event *ev)
+{
+  /*
+    ToDo: This needs to work on rpl_group_info, not Relay_log_info, to be
+    thread-safe for parallel replication.
+  */
+  switch (typ) {
+  case FORMAT_DESCRIPTION_EVENT:
+    /*
+      Format_description_log_event should not be deleted because it
+      will be used to read info about the relay log's format;
+      it will be deleted when the SQL thread does not need it,
+      i.e. when this thread terminates.
+    */
+    break;
+  case ANNOTATE_ROWS_EVENT:
+    /*
+      Annotate_rows event should not be deleted because after it has
+      been applied, thd->query points to the string inside this event.
+      The thd->query will be used to generate new Annotate_rows event
+      during applying the subsequent Rows events.
+    */
+    rli->set_annotate_event((Annotate_rows_log_event*) ev);

I assume we should create rpl_group_info->set_annotate_event() instead of
the above.

+    break;
+  case DELETE_ROWS_EVENT:
+  case UPDATE_ROWS_EVENT:
+  case WRITE_ROWS_EVENT:
+    /*
+      After the last Rows event has been applied, the saved Annotate_rows
+      event (if any) is not needed anymore and can be deleted.
+    */
+    if (((Rows_log_event*)ev)->get_flags(Rows_log_event::STMT_END_F))
+      rli->free_annotate_event();
+    /* fall through */
+  default:
+    DBUG_PRINT("info", ("Deleting the event after it has been executed"));
+    if (!rli->is_deferred_event(ev))
+      delete ev;

I assume that is_deferred_event should also be moved to rpl_group_info.

+    break;
+  }
+}
+
 #endif

=== modified file 'sql/rpl_rli.h'
--- sql/rpl_rli.h	2013-06-06 15:51:28 +0000
+++ sql/rpl_rli.h	2013-08-29 08:18:22 +0000
@@ -22,6 +22,7 @@
 #include "log.h"                         /* LOG_INFO, MYSQL_BIN_LOG */
 #include "sql_class.h"                   /* THD */
 #include "log_event.h"
+#include "rpl_parallel.h"
 
 struct RPL_TABLE_LIST;
 class Master_info;
@@ -52,6 +53,8 @@ class Master_info;
 
 *****************************************************************************/
 
+struct rpl_group_info;
+
 class Relay_log_info : public Slave_reporting_capability
 {
 public:
@@ -311,13 +314,9 @@ class Relay_log_info : public Slave_repo
   char slave_patternload_file[FN_REFLEN]; 
   size_t slave_patternload_file_size;  
 
-  /*
-    Current GTID being processed.
-    The sub_id gives the binlog order within one domain_id. A zero sub_id
-    means that there is no active GTID.
-  */
-  uint64 gtid_sub_id;
-  rpl_gtid current_gtid;
+  /* ToDo: We need to remove this, always use the per-transaction one to work with parallel replication. */
+  struct rpl_group_info *group_info;
+  rpl_parallel parallel;
 
   Relay_log_info(bool is_slave_recovery);
   ~Relay_log_info();
@@ -459,7 +458,8 @@ class Relay_log_info : public Slave_repo
     the <code>Seconds_behind_master</code> field.
   */
   void stmt_done(my_off_t event_log_pos,
-                 time_t event_creation_time, THD *thd);
+                 time_t event_creation_time, THD *thd,
+                 struct rpl_group_info *rgi);
 
 
   /**
@@ -594,6 +594,60 @@ class Relay_log_info : public Slave_repo
 };
 
 
+/*
+  This is data for various state needed to be kept for the processing of
+  one event group in the SQL thread.
+
+  For single-threaded replication it is linked from the RLI, for parallel
+  replication it is linked into each event group being executed in parallel.
+*/

Could you please rewrite the above to make it clearer?
After discussing it with you, I think the following could be added:

/*
  In single-threaded replication, there will be one global rpl_group_info and
  one global RLI per master connection.  They will be linked together.

  In parallel replication, there will be one rpl_group_info object for
  each running THD. All rpl_group_info objects will share the same RLI.
*/
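A simplified model of the two cases in the proposed comment, with hypothetical member names. In single-threaded replication the RLI links to its one group info; in parallel replication each worker gets its own group info, all pointing back at the same RLI.

```cpp
#include <cassert>
#include <vector>

// Hypothetical sketch of the ownership, not the real classes.
struct Relay_log_info_sketch;

// Per-event-group state, private to the thread applying that group.
struct rpl_group_info_sketch
{
  Relay_log_info_sketch *rli;  // shared, per master connection
  int worker_id;               // stand-in for the THD applying this group
};

// Per-connection state shared by all groups (relay log position etc.).
struct Relay_log_info_sketch
{
  // Single-threaded case: exactly one group info, linked from the RLI.
  rpl_group_info_sketch *group_info= nullptr;
};

// Parallel case: one group info per worker, all sharing the same RLI.
std::vector<rpl_group_info_sketch>
make_parallel_groups(Relay_log_info_sketch *rli, int workers)
{
  std::vector<rpl_group_info_sketch> groups;
  for (int i= 0; i < workers; ++i)
    groups.push_back(rpl_group_info_sketch{rli, i});
  return groups;
}
```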

+struct rpl_group_info
+{
+  Relay_log_info *rli;
+  THD *thd;

<cut>

=== modified file 'sql/sql_class.cc'
--- sql/sql_class.cc	2013-06-06 15:51:28 +0000
+++ sql/sql_class.cc	2013-08-29 08:18:22 +0000

+void
+wait_for_commit::wakeup()
+{
+  /*
+    We signal each waiter on their own condition and mutex (rather than using
+    pthread_cond_broadcast() or something like that).
+
+    Otherwise we would need to somehow ensure that they were done
+    waking up before we could allow this THD to be destroyed, which would
+    be annoying and unnecessary.
+  */
+  mysql_mutex_lock(&LOCK_wait_commit);
+  waiting_for_commit= false;
+  mysql_cond_signal(&COND_wait_commit);
+  mysql_mutex_unlock(&LOCK_wait_commit);
+}

In this particular case it looks safe to move the cond signal out of
the mutex. I don't see how anyone could miss the signal.
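A sketch of wakeup() with the signal moved after the unlock, using std::condition_variable in place of mysql_cond_t. The flag is still changed under the mutex, so a waiter that tests it under the same mutex cannot miss the wakeup. The assumption this relies on is that the object outlives the notify_one() call. If the waiter could wake up (for example spuriously), see the flag cleared and destroy the object before the waker signals, the signal would touch freed memory; signalling while holding the mutex rules that out.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical stand-in for wait_for_commit.
struct Wait_for_commit_sketch
{
  std::mutex lock;
  std::condition_variable cond;
  bool waiting_for_commit= true;

  void wakeup()
  {
    {
      std::lock_guard<std::mutex> g(lock);
      waiting_for_commit= false;   // state change still under the mutex
    }
    cond.notify_one();             // signal after the unlock
  }

  void wait()
  {
    std::unique_lock<std::mutex> g(lock);
    cond.wait(g, [this] { return !waiting_for_commit; });
  }
};
```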

+
+
+/*
+  Register that the next commit of this THD should wait to complete until
+  commit in another THD (the waitee) has completed.
+
+  The wait may occur explicitly, with the waiter sitting in
+  wait_for_prior_commit() until the waitee calls wakeup_subsequent_commits().
+
+  Alternatively, the TC (eg. binlog) may do the commits of both waitee and
+  waiter at once during group commit, resolving both of them in the right
+  order.
+
+  Only one waitee can be registered for a waiter; it must be removed by
+  wait_for_prior_commit() or unregister_wait_for_prior_commit() before a new
+  one is registered. But it is ok for several waiters to register a wait for
+  the same waitee. It is also permissible for one THD to be both a waiter and
+  a waitee at the same time.
+*/
+void
+wait_for_commit::register_wait_for_prior_commit(wait_for_commit *waitee)
+{
+  waiting_for_commit= true;
+  DBUG_ASSERT(!this->waitee /* No prior registration allowed */);
+  this->waitee= waitee;
+
+  mysql_mutex_lock(&waitee->LOCK_wait_commit);
+  /*
+    If waitee is in the middle of wakeup, then there is nothing to wait for,
+    so we need not register. This is necessary to avoid a race in unregister,
+    see comments on wakeup_subsequent_commits2() for details.
+  */
+  if (waitee->wakeup_subsequent_commits_running)
+    waiting_for_commit= false;
+  else
+  {

Please add a comment here describing what the following code is doing.
As two different lists are involved, it's not trivial code.

You also seem to use the list next_subsequent_commit for something
different in the queue_for_group_commit() code.

+    this->next_subsequent_commit= waitee->subsequent_commits_list;
+    waitee->subsequent_commits_list= this;
+  }
+  mysql_mutex_unlock(&waitee->LOCK_wait_commit);
+}
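A simplified model of what register_wait_for_prior_commit() does, which may help when writing the requested comment. The struct is a hypothetical sketch: field names mirror the patch, and std::mutex replaces mysql_mutex_t. Each object can be a waitee, owning `subsequent_commits_list`, and a waiter, linked into one waitee's list through `next_subsequent_commit`. Registration pushes the waiter onto the front of the waitee's list, unless the waitee is already running its wakeup.

```cpp
#include <cassert>
#include <mutex>

// Hypothetical sketch of wait_for_commit registration.
struct Wfc_sketch
{
  std::mutex LOCK_wait_commit;
  Wfc_sketch *subsequent_commits_list= nullptr; // as waitee: who waits for us
  Wfc_sketch *next_subsequent_commit= nullptr;  // as waiter: link in waitee's list
  Wfc_sketch *waitee= nullptr;
  bool waiting_for_commit= false;
  bool wakeup_subsequent_commits_running= false;

  void register_wait_for_prior_commit(Wfc_sketch *w)
  {
    waiting_for_commit= true;
    waitee= w;
    std::lock_guard<std::mutex> g(w->LOCK_wait_commit);
    if (w->wakeup_subsequent_commits_running)
      waiting_for_commit= false;   // waitee is already waking waiters
    else
    {
      // Push ourselves onto the front of the waitee's list of waiters.
      next_subsequent_commit= w->subsequent_commits_list;
      w->subsequent_commits_list= this;
    }
  }
};
```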

<cut>

+/*
+  Wakeup anyone waiting for us to have committed.
+
+  Note about locking:
+
+  We have a potential race or deadlock between wakeup_subsequent_commits() in
+  the waitee and unregister_wait_for_prior_commit() in the waiter.
+
+  Both waiter and waitee needs to take their own lock before it is safe to take
+  a lock on the other party - else the other party might disappear and invalid
+  memory data could be accessed. But if we take the two locks in different
+  order, we may end up in a deadlock.
+
+  The waiter needs to lock the waitee to delete itself from the list in
+  unregister_wait_for_prior_commit(). Thus wakeup_subsequent_commits() can not
+  hold its own lock while locking waiters, lest we deadlock.

lest ?

+  So we need to prevent unregister_wait_for_prior_commit() running while wakeup
+  is in progress - otherwise the unregister could complete before the wakeup,
+  leading to incorrect spurious wakeup or accessing invalid memory.
+
<cut>

+  Then also register_wait_for_prior_commit() needs to check if
+  wakeup_subsequent_commits() is running, and skip the registration if
+  so. This is needed in case a new waiter manages to register itself and
+  immediately try to unregister while wakeup_subsequent_commits() is
+  running. Else the new waiter would also wait rather than unregister, but it
+  would not be woken up until next wakeup, which could be potentially much
+  later than necessary.
+*/

Add an empty line here

+void
+wait_for_commit::wakeup_subsequent_commits2()
+{

<cut>

=== modified file 'sql/sql_class.h'
--- sql/sql_class.h	2013-06-06 15:51:28 +0000
+++ sql/sql_class.h	2013-08-29 08:18:22 +0000
@@ -1553,6 +1553,116 @@ class Global_read_lock
 };
 
 
+/*
+  Class to facilitate the commit of one transactions waiting for the commit of
+  another transaction to complete first.
+
+  This is used during (parallel) replication, to allow different transactions
+  to be applied in parallel, but still commit in order.
+
+  The transaction that wants to wait for a prior commit must first register
+  to wait with register_wait_for_prior_commit(waitee). Such registration
+  must be done holding the waitee->LOCK_wait_commit, to prevent the other
+  THD from disappearing during the registration.
+
+  Then during commit, if a THD is registered to wait, it will call
+  wait_for_prior_commit() as part of ha_commit_trans(). If no wait is
+  registered, or if the waitee for has already completed commit, then
+  wait_for_prior_commit() returns immediately.
+
+  And when a THD that may be waited for has completed commit (more precisely
+  commit_ordered()), then it must call wakeup_subsequent_commits() to wake
+  up any waiters. Note that this must be done at a point that is guaranteed
+  to be later than any waiters registering themselves. It is safe to call
+  wakeup_subsequent_commits() multiple times, as waiters are removed from
+  registration as part of the wakeup.
+
+  The reason for separate register and wait calls is that this allows to
+  register the wait early, at a point where the waited-for THD is known to
+  exist. And then the actual wait can be done much later, where the
+  waited-for THD may have been long gone. By registering early, the waitee
+  can signal before disappearing.
+*/

Instead of taking locks to ensure that the THD will not disappear, why not
keep all THD objects around until all transactions in a group have been
executed?


+struct wait_for_commit
+{

<cut>

+  bool waiting_for_commit;
+  /*
+    Flag set when wakeup_subsequent_commits_running() is active, see commonts

commonts -> comments

=== modified file 'sql/sys_vars.cc'
--- sql/sys_vars.cc	2013-06-07 07:31:11 +0000
+++ sql/sys_vars.cc	2013-08-29 08:18:22 +0000

<cut>


+static bool
+check_slave_parallel_threads(sys_var *self, THD *thd, set_var *var)
+{
+  bool running;
+
+  mysql_mutex_lock(&LOCK_active_mi);
+  running= master_info_index->give_error_if_slave_running();
+  mysql_mutex_unlock(&LOCK_active_mi);
+  if (running)
+    return true;
+
+  return false;
+}
+
+static bool
+fix_slave_parallel_threads(sys_var *self, THD *thd, enum_var_type type)
+{
+  bool running;
+  bool err= false;
+
+  mysql_mutex_unlock(&LOCK_global_system_variables);
+  mysql_mutex_lock(&LOCK_active_mi);
+  running= master_info_index->give_error_if_slave_running();
+  mysql_mutex_unlock(&LOCK_active_mi);

You can call check_slave_parallel_threads() above to reuse code.

+  if (running || rpl_parallel_change_thread_count(&global_rpl_thread_pool,
+                                                  opt_slave_parallel_threads))
+    err= true;
+  mysql_mutex_lock(&LOCK_global_system_variables);
+
+  return err;
+}
+
+
+static Sys_var_ulong Sys_slave_parallel_threads(
+       "slave_parallel_threads",
+       "If non-zero, number of threads to spawn to apply in parallel events "
+       "on the slave that were group-committed on the master or were logged "
+       "with GTID in different replication domains.",
+       GLOBAL_VAR(opt_slave_parallel_threads), CMD_LINE(REQUIRED_ARG),
+       VALID_RANGE(0,16383), DEFAULT(0), BLOCK_SIZE(1), NO_MUTEX_GUARD,
+       NOT_IN_BINLOG, ON_CHECK(check_slave_parallel_threads),
+       ON_UPDATE(fix_slave_parallel_threads));
 #endif

We have talked about making 1 the minimum, which would give the
old behavior.

<cut>

Overall things look very good, and it should not be too much work to
add the things you have listed in the ToDo list.

Regards
Monty

