← Back to team overview

maria-developers team mailing list archive

Review of MDEV-4506, parallel replication, part 2

 

Hi!

Part 2 of the review of parallel replication.

=== added file 'sql/rpl_parallel.cc'

<cut>

+pthread_handler_t
+handle_rpl_parallel_thread(void *arg)
+{

<cut>

+      if (end_of_group)
+      {
+        in_event_group= false;
+

Add a comment:

/*
  All events for this group have now been executed and logged (but not
  necessarily synced).
  Inform the other event groups that are waiting for this thread that
  we are done.
*/

+        rgi->commit_orderer.unregister_wait_for_prior_commit();
+        thd->wait_for_commit_ptr= NULL;
+
+        /*
+          Record that we have finished, so other event groups will no
+          longer attempt to wait for us to commit.
+
+          We can race here with the next transactions, but that is fine, as
+          long as we check that we do not decrease last_committed_sub_id. If
+          this commit is done, then any prior commits will also have been
+          done and also no longer need waiting for.
+        */
+        mysql_mutex_lock(&entry->LOCK_parallel_entry);
+        if (entry->last_committed_sub_id < event_gtid_sub_id)
+        {
+          entry->last_committed_sub_id= event_gtid_sub_id;
+          mysql_cond_broadcast(&entry->COND_parallel_entry);
+        }
+        mysql_mutex_unlock(&entry->LOCK_parallel_entry);
+

Add:

  /*
    Tell storage engines that they can stop waiting.
    See the comments in sql_class.cc::wakeup_subsequent_commits2() for why we
    can't do this part of the wakeup in unregister_wait_for_prior_commit().
  */

+        rgi->commit_orderer.wakeup_subsequent_commits();
+        delete rgi;
+      }
+
+      events= next;
+    }
+
+    mysql_mutex_lock(&rpt->LOCK_rpl_thread);
+    if ((events= rpt->event_queue) != NULL)
+    {
+      rpt->event_queue= rpt->last_in_queue= NULL;
+      mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
+      goto more_events;
+    }
+
+    if (!in_event_group)
+    {
+      rpt->current_entry= NULL;

As far as I can see, rpt->free is always 0 here.
- It's set to 0 at the top of this loop and only set to 1 here.

Can we remove the rpt->free variable?

+      if (!rpt->stop && !rpt->free)
+      {

Add a comment:

/*
   Take the next group of events from the replication pool.
   This is faster than having to wake up the pool manager thread to give us
   a new event.
*/

+        mysql_mutex_lock(&rpt->pool->LOCK_rpl_thread_pool);
+        list= rpt->pool->free_list;
+        rpt->next= list;
+        rpt->pool->free_list= rpt;
+        if (!list)
+          mysql_cond_broadcast(&rpt->pool->COND_rpl_thread_pool);
+        mysql_mutex_unlock(&rpt->pool->LOCK_rpl_thread_pool);
+        rpt->free= true;
+      }
+    }
+  }

<cut>

+int
+rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool,
+                                 uint32 new_count, bool skip_check)
+{
+  uint32 i;
+  rpl_parallel_thread **new_list= NULL;
+  rpl_parallel_thread *new_free_list= NULL;
+
+  /*
+    Allocate the new list of threads up-front.
+    That way, if we fail half-way, we only need to free whatever we managed
+    to allocate, and will not be left with a half-functional thread pool.
+  */
+  if (new_count &&
+      !(new_list= (rpl_parallel_thread **)my_malloc(new_count*sizeof(*new_list),
+                                                    MYF(MY_WME))))
+  {
+    my_error(ER_OUTOFMEMORY, MYF(0), (int(new_count*sizeof(*new_list))));
+    goto err;;
+  }

Why not allocate the new_list[] elements at the same time?
It is faster, uses less memory, leaves fewer allocations to check, and
freeing is also easier.

if (!my_multi_malloc(MYF(MY_WME),
                     &new_list, (uint) new_count*sizeof(*new_list),
                     &list_part, new_count*sizeof(**new_list),
                     NULL))
...

+
+  for (i= 0; i < new_count; ++i)
+  {
+    pthread_t th;
+
+    if (!(new_list[i]= (rpl_parallel_thread *)my_malloc(sizeof(*(new_list[i])),
+                                                        MYF(MY_WME))))
+    {
+      my_error(ER_OUTOFMEMORY, MYF(0), sizeof(*(new_list[i])));
+      goto err;
+    }

The above would then simply be:

new_list[i]= list_part++;

Instead of setting everything to false and NULL, why not simply
bzero() the structure?  That way we can be more sure that we don't
forget to initialize any new element in the future.

+    new_list[i]->delay_start= true;
+    new_list[i]->running= false;
+    new_list[i]->stop= false;
+    new_list[i]->free= false;
+    mysql_mutex_init(key_LOCK_rpl_thread, &new_list[i]->LOCK_rpl_thread,
+                     MY_MUTEX_INIT_SLOW);
+    mysql_cond_init(key_COND_rpl_thread, &new_list[i]->COND_rpl_thread, NULL);
+    new_list[i]->pool= pool;
+    new_list[i]->current_entry= NULL;
+    new_list[i]->event_queue= NULL;
+    new_list[i]->last_in_queue= NULL;

<cut>

Add:

/*
  The following loops wait until each thread has finished its work for its
  current event group, and then stop the thread once it has nothing to do.

  Note that this may take some time, as rpl_parallel::do_event() may request
  threads at the same time.
*/

To make this faster, and to ensure we don't start new requests,
wouldn't it be better to go through pool->threads and signal each of them?

As long as rpt->stop is defined as 'stop when you are done processing one
group', this is fine (which appears to be how it is defined now).

+  for (i= 0; i < pool->count; ++i)
+  {
+    rpl_parallel_thread *rpt= pool->get_thread(NULL);
+    rpt->stop= true;
+    mysql_cond_signal(&rpt->COND_rpl_thread);
+    mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
+  }
+
+  for (i= 0; i < pool->count; ++i)
+  {
+    rpl_parallel_thread *rpt= pool->threads[i];
+    mysql_mutex_lock(&rpt->LOCK_rpl_thread);
+    while (rpt->running)
+      mysql_cond_wait(&rpt->COND_rpl_thread, &rpt->LOCK_rpl_thread);
+    mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
+    mysql_mutex_destroy(&rpt->LOCK_rpl_thread);
+    mysql_cond_destroy(&rpt->COND_rpl_thread);
+    my_free(rpt);

By using my_multi_malloc() you can remove the above free.

+  }
+
+  my_free(pool->threads);
+  pool->threads= new_list;
+  pool->free_list= new_free_list;
+  pool->count= new_count;
+  for (i= 0; i < pool->count; ++i)
+  {
+    mysql_mutex_lock(&pool->threads[i]->LOCK_rpl_thread);
+    pool->threads[i]->delay_start= false;
+    mysql_cond_signal(&pool->threads[i]->COND_rpl_thread);
+    mysql_mutex_unlock(&pool->threads[i]->LOCK_rpl_thread);
+  }
+
+  if (!skip_check)
+  {
+    mysql_mutex_lock(&LOCK_active_mi);
+    pool->changing= false;
+    mysql_mutex_unlock(&LOCK_active_mi);
+  }
+  return 0;
+
+err:
+  if (new_list)
+  {
+    while (new_free_list)
+    {
+      rpl_parallel_thread *next= new_free_list->next;
+      mysql_mutex_lock(&new_free_list->LOCK_rpl_thread);
+      new_free_list->delay_start= false;
+      new_free_list->stop= true;
+      mysql_cond_signal(&new_free_list->COND_rpl_thread);
+      while (!new_free_list->running)
+        mysql_cond_wait(&new_free_list->COND_rpl_thread,
+                        &new_free_list->LOCK_rpl_thread);
+      while (new_free_list->running)
+        mysql_cond_wait(&new_free_list->COND_rpl_thread,
+                        &new_free_list->LOCK_rpl_thread);
+      mysql_mutex_unlock(&new_free_list->LOCK_rpl_thread);
+      my_free(new_free_list);

By using my_multi_malloc() you can remove the above free, and even the 'next'
variable.

+      new_free_list= next;
+    }
+    my_free(new_list);
+  }

<cut>

<cut>

+void
+rpl_parallel::wait_for_done()
+{
+  struct rpl_parallel_entry *e;
+  uint32 i;
+
+  for (i= 0; i < domain_hash.records; ++i)
+  {
+    e= (struct rpl_parallel_entry *)my_hash_element(&domain_hash, i);
+    mysql_mutex_lock(&e->LOCK_parallel_entry);
+    while (e->current_sub_id > e->last_committed_sub_id)
+      mysql_cond_wait(&e->COND_parallel_entry, &e->LOCK_parallel_entry);
+    mysql_mutex_unlock(&e->LOCK_parallel_entry);
+  }
+}

How should errors be handled in the above case?
For example, when we get an error and have to abort things?

I assume we should also abort on:

e->rpl_thread->thd->killed()

but this is hard, as thd and even rpl_thread may already have been deleted.

One way to fix this would be to add a 'killed' member to rpl_parallel_entry
that we can set. However, it will be hard to keep this up to date.

The other way would be, on error, to set last_committed_sub_id back to
its last value.

We could also have some kind of shutdown flag that we check, to avoid
hanging on shutdown.

What do you suggest?
I assume setting the value back would be the easiest solution.

+  for (i= 0; i < domain_hash.records; ++i)
+  {
+    e= (struct rpl_parallel_entry *)my_hash_element(&domain_hash, i);
+    mysql_mutex_lock(&e->LOCK_parallel_entry);
+    while (e->current_sub_id > e->last_committed_sub_id)
+      mysql_cond_wait(&e->COND_parallel_entry, &e->LOCK_parallel_entry);
+    mysql_mutex_unlock(&e->LOCK_parallel_entry);
+  }


+bool
+rpl_parallel::do_event(struct rpl_group_info *serial_rgi, Log_event *ev)
+{
+  rpl_parallel_entry *e;
+  rpl_parallel_thread *cur_thread;
+  rpl_parallel_thread::queued_event *qev;
+  struct rpl_group_info *rgi;
+  Relay_log_info *rli= serial_rgi->rli;
+  enum Log_event_type typ;
+
+  /* ToDo: what to do with this lock?!? */
+  mysql_mutex_unlock(&rli->data_lock);
+
+  if (!(qev= (rpl_parallel_thread::queued_event *)my_malloc(sizeof(*qev),
+                                                            MYF(0))))
+  {
+    my_error(ER_OUT_OF_RESOURCES, MYF(0));
+    return true;
+  }
+  qev->ev= ev;
+  qev->next= NULL;
+
+  if ((typ= ev->get_type_code()) == GTID_EVENT)
+  {
+    Gtid_log_event *gtid_ev= static_cast<Gtid_log_event *>(ev);
+
+    if (!(e= find(gtid_ev->domain_id)) ||
+        !(rgi= new rpl_group_info(rli)) ||
+        event_group_new_gtid(rgi, gtid_ev))
+    {

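Here, add a delete of rgi (otherwise it leaks when event_group_new_gtid()
fails). Also initialize rgi to 0 beforehand, so the delete is safe when
find() fails before rgi has been assigned.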

+      my_error(ER_OUT_OF_RESOURCES, MYF(MY_WME));
+      return true;
+    }
+
+    if ((gtid_ev->flags2 & Gtid_log_event::FL_GROUP_COMMIT_ID) &&
+        e->last_commit_id == gtid_ev->commit_id)
+    {
+      /*
+        We are already executing something else in this domain. But the two
+        event groups were committed together in the same group commit on the
+        master, so we can still do them in parallel here on the slave.
+
+        However, the commit of this event must wait for the commit of the prior
+        event, to preserve binlog commit order and visibility across all
+        servers in the replication hierarchy.
+      */
+      rpl_parallel_thread *rpt= global_rpl_thread_pool.get_thread(e);

Here we could add a test for get_thread() returning 0, which could happen on
shutdown or when a replication thread is killed.

I think it's relatively safe to use get_thread() as an end-synchronization
point, as we should never have anything half-done when we call
get_thread().  As get_thread() can take a very long time and is not
called in many places, it also sounds like a reasonable thing to do.

+      rgi->wait_commit_sub_id= e->current_sub_id;
+      rgi->wait_commit_group_info= e->current_group_info;
+      rgi->wait_start_sub_id= e->prev_groupcommit_sub_id;
+      e->rpl_thread= cur_thread= rpt;
+      /* get_thread() returns with the LOCK_rpl_thread locked. */

Also add the above comment where get_thread() is defined.

+    }
+    else
+    {
+      /* Check if we already have a worker thread for this entry. */

I had a hard time understanding what the above meant.
Is this suggested new comment correct?


/* Get the last or current thread for the domain */
+      cur_thread= e->rpl_thread;
+      if (cur_thread)

+      {
        /*
           An active thread exists. The thread may still be executing queries
           for this domain, or it may have been rescheduled to work for another
           domain.
        */

+        mysql_mutex_lock(&cur_thread->LOCK_rpl_thread);
+        if (cur_thread->current_entry != e)
+        {
+          /* Not ours anymore, we need to grab a new one. */

Suggested new comment:

/*
   The thread has been reused for another domain.
   (e->rpl_thread was not reset, or was changed during the mutex_lock.)
   As there are no threads working on this domain, it's safe to
   assign a new thread to work on it.
*/

+          mysql_mutex_unlock(&cur_thread->LOCK_rpl_thread);
+          e->rpl_thread= cur_thread= NULL;
+        }
+      }

<cut>

Add here:

/*
  'current' is not set when the master did not use GTID,
  for example a MariaDB 5.5 or MySQL master.
*/

+  else if (!Log_event::is_group_event(typ) || !current)
+  {
+    /*
+      Events like ROTATE and FORMAT_DESCRIPTION. Do not run in worker thread.
+      Same for events not preceeded by GTID (we should not see those normally,
+      but they might be from an old master).
+    */
+    qev->rgi= serial_rgi;
+    rpt_handle_event(qev, NULL);
+    delete_or_keep_event_post_apply(rli, typ, qev->ev);

Why is delete_or_keep_event_post_apply() not part of rpt_handle_event()?
You always call these two together.

+
+    return false;
+  }
+  else
+  {

The following code is identical (except for one line) to the code earlier
in this function.  It would be better to create a function for this, both to
reuse code and to clarify the differences.

+    cur_thread= current->rpl_thread;
+    if (cur_thread)
+    {
+      mysql_mutex_lock(&cur_thread->LOCK_rpl_thread);
+      if (cur_thread->current_entry != current)
+      {
+        /* Not ours anymore, we need to grab a new one. */
+        mysql_mutex_unlock(&cur_thread->LOCK_rpl_thread);
+        cur_thread= NULL;
+      }
+    }
+    if (!cur_thread)
+    {
+      cur_thread= current->rpl_thread=
+        global_rpl_thread_pool.get_thread(current);
+    }
+  }
+  qev->rgi= current->current_group_info;
+
+  /*
+    Queue the event for processing.
+  */
+  if (cur_thread->last_in_queue)
+    cur_thread->last_in_queue->next= qev;
+  else
+    cur_thread->event_queue= qev;
+  cur_thread->last_in_queue= qev;
+  mysql_cond_signal(&cur_thread->COND_rpl_thread);
+  mysql_mutex_unlock(&cur_thread->LOCK_rpl_thread);

In this case it's safe to move the cond_signal after the mutex_unlock.
(This should make things notably faster, as we avoid a sleep/wakeup in the
other thread.)

<cut>

=== added file 'sql/rpl_parallel.h'
--- sql/rpl_parallel.h	1970-01-01 00:00:00 +0000
+++ sql/rpl_parallel.h	2013-08-29 08:18:22 +0000
@@ -0,0 +1,92 @@
+#ifndef RPL_PARALLEL_H
+#define RPL_PARALLEL_H
+
+#include "log_event.h"
+
+
+struct rpl_parallel;
+struct rpl_parallel_entry;
+struct rpl_parallel_thread_pool;
+
+class Relay_log_info;
+struct rpl_parallel_thread {
+  bool delay_start;

I think we can remove delay_start. See the comments from the previous review.

+  bool running;
+  bool stop;

We need to define 'stop' properly,
for example as 'stop after we have executed the current event group'
(so that we don't have to do a rollback).

+  bool free;

I think we can remove free. See the comments above.

Otherwise, it would be good to have a proper description of each struct
in this file.

Will now start reviewing your new commits to the parallel
replication tree.

Regards,
Monty