← Back to team overview

maria-developers team mailing list archive

Review requested for MDEV-7818

 

I need someone to review my fix for MDEV-7818, included below.

This involves the FLUSH TABLES WITH READ LOCK command and associated
metadata locking, something that I know little about. Monty suggested the
basic semantics - that all currently running parallel replication
transactions would be allowed to complete before granting the global read
lock. But it really needs a review of the actual implementation, by someone
who has an understanding of the FTWRL semantics and implementation.

Thanks,

 - Kristian.

Kristian Nielsen <knielsen@xxxxxxxxxxxxxxx> writes:

> revision-id: 0e609daea4796fafca94f7adf796addfc506168b
> parent(s): 22b6c297dc0de6b176238ae820bddb5e70f2c8ab
> committer: Kristian Nielsen
> branch nick: mariadb
> timestamp: 2015-06-09 16:01:53 +0200
> message:
>
> MDEV-7818: Deadlock occurring with parallel replication and FTWRL
>
> Problem is that FLUSH TABLES WITH READ LOCK first blocks threads from
> starting new commits, then waits for running commits to complete. But
> in-order parallel replication needs commits to happen in a particular
> order, so this can easily deadlock.
>
> To fix this problem, this patch introduces a way to temporarily pause
> the parallel replication worker threads. Before starting FTWRL, we let
> all worker threads complete in-progress transactions, and then
> wait. Then we proceed to take the global read lock. Once the lock is
> obtained, we unpause the worker threads. Now commits are blocked from
> starting by the global read lock, so the deadlock will no longer occur.
>
> ---
>  sql/mysqld.cc       |   3 +
>  sql/mysqld.h        |   3 +
>  sql/rpl_parallel.cc | 230 ++++++++++++++++++++++++++++++++++++++++++++++++++--
>  sql/rpl_parallel.h  |  27 ++++--
>  sql/sql_parse.cc    |  13 +++
>  5 files changed, 262 insertions(+), 14 deletions(-)
>
> diff --git a/sql/mysqld.cc b/sql/mysqld.cc
> index e05c0b6..af8ae2c 100644
> --- a/sql/mysqld.cc
> +++ b/sql/mysqld.cc
> @@ -9514,6 +9514,9 @@ PSI_stage_info stage_waiting_for_prior_transaction_to_commit= { 0, "Waiting for
>  PSI_stage_info stage_waiting_for_prior_transaction_to_start_commit= { 0, "Waiting for prior transaction to start commit before starting next transaction", 0};
>  PSI_stage_info stage_waiting_for_room_in_worker_thread= { 0, "Waiting for room in worker thread event queue", 0};
>  PSI_stage_info stage_waiting_for_workers_idle= { 0, "Waiting for worker threads to be idle", 0};
> +PSI_stage_info stage_waiting_for_ftwrl= { 0, "Waiting due to global read lock", 0};
> +PSI_stage_info stage_waiting_for_ftwrl_threads_to_pause= { 0, "Waiting for worker threads to pause for global read lock", 0};
> +PSI_stage_info stage_waiting_for_rpl_thread_pool= { 0, "Waiting while replication worker thread pool is busy", 0};
>  PSI_stage_info stage_master_gtid_wait_primary= { 0, "Waiting in MASTER_GTID_WAIT() (primary waiter)", 0};
>  PSI_stage_info stage_master_gtid_wait= { 0, "Waiting in MASTER_GTID_WAIT()", 0};
>  PSI_stage_info stage_gtid_wait_other_connection= { 0, "Waiting for other master connection to process GTID received on multiple master connections", 0};
> diff --git a/sql/mysqld.h b/sql/mysqld.h
> index 156e7f9..8c953c1 100644
> --- a/sql/mysqld.h
> +++ b/sql/mysqld.h
> @@ -454,6 +454,9 @@ extern PSI_stage_info stage_waiting_for_prior_transaction_to_commit;
>  extern PSI_stage_info stage_waiting_for_prior_transaction_to_start_commit;
>  extern PSI_stage_info stage_waiting_for_room_in_worker_thread;
>  extern PSI_stage_info stage_waiting_for_workers_idle;
> +extern PSI_stage_info stage_waiting_for_ftwrl;
> +extern PSI_stage_info stage_waiting_for_ftwrl_threads_to_pause;
> +extern PSI_stage_info stage_waiting_for_rpl_thread_pool;
>  extern PSI_stage_info stage_master_gtid_wait_primary;
>  extern PSI_stage_info stage_master_gtid_wait;
>  extern PSI_stage_info stage_gtid_wait_other_connection;
> diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc
> index 738d0e5..d90507a 100644
> --- a/sql/rpl_parallel.cc
> +++ b/sql/rpl_parallel.cc
> @@ -280,6 +280,8 @@ do_gco_wait(rpl_group_info *rgi, group_commit_orderer *gco,
>    rpl_parallel_entry *entry= rgi->parallel_entry;
>    uint64 wait_count;
>  
> +  mysql_mutex_assert_owner(&entry->LOCK_parallel_entry);
> +
>    if (!gco->installed)
>    {
>      group_commit_orderer *prev_gco= gco->prev_gco;
> @@ -336,6 +338,159 @@ do_gco_wait(rpl_group_info *rgi, group_commit_orderer *gco,
>  }
>  
>  
> +static void
> +do_ftwrl_wait(rpl_group_info *rgi,
> +              bool *did_enter_cond, PSI_stage_info *old_stage)
> +{
> +  THD *thd= rgi->thd;
> +  rpl_parallel_entry *entry= rgi->parallel_entry;
> +  uint64 sub_id= rgi->gtid_sub_id;
> +
> +  mysql_mutex_assert_owner(&entry->LOCK_parallel_entry);
> +
> +  if (unlikely(entry->pause_sub_id > 0) && sub_id > entry->pause_sub_id)
> +  {
> +    thd->ENTER_COND(&entry->COND_parallel_entry, &entry->LOCK_parallel_entry,
> +                    &stage_waiting_for_ftwrl, old_stage);
> +    *did_enter_cond= true;
> +    do
> +    {
> +      if (entry->force_abort || rgi->worker_error)
> +        break;
> +      if (thd->check_killed())
> +      {
> +        thd->send_kill_message();
> +        slave_output_error_info(rgi, thd);
> +        signal_error_to_sql_driver_thread(thd, rgi, 1);
> +        break;
> +      }
> +      mysql_cond_wait(&entry->COND_parallel_entry, &entry->LOCK_parallel_entry);
> +    } while (entry->pause_sub_id > 0 && sub_id > entry->pause_sub_id);
> +  }
> +
> +  if (sub_id > entry->largest_started_sub_id)
> +    entry->largest_started_sub_id= sub_id;
> +}
> +
> +
> +static void
> +pool_mark_busy(rpl_parallel_thread_pool *pool)
> +{
> +  mysql_mutex_assert_owner(&pool->LOCK_rpl_thread_pool);
> +  DBUG_ASSERT(!pool->busy);
> +  pool->busy= true;
> +}
> +
> +
> +static void
> +pool_mark_not_busy(rpl_parallel_thread_pool *pool)
> +{
> +  mysql_mutex_assert_owner(&pool->LOCK_rpl_thread_pool);
> +  DBUG_ASSERT(pool->busy);
> +  pool->busy= false;
> +  mysql_cond_broadcast(&pool->COND_rpl_thread_pool);
> +}
> +
> +
> +void
> +rpl_unpause_for_ftwrl(THD *thd)
> +{
> +  uint32 i;
> +  rpl_parallel_thread_pool *pool= &global_rpl_thread_pool;
> +
> +  DBUG_ASSERT(pool->busy);
> +
> +  for (i= 0; i < pool->count; ++i)
> +  {
> +    rpl_parallel_entry *e;
> +    rpl_parallel_thread *rpt= pool->threads[i];
> +
> +    mysql_mutex_lock(&rpt->LOCK_rpl_thread);
> +    if (!rpt->current_owner)
> +    {
> +      mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
> +      continue;
> +    }
> +    e= rpt->current_entry;
> +    mysql_mutex_lock(&e->LOCK_parallel_entry);
> +    mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
> +    e->pause_sub_id= 0;
> +    mysql_cond_broadcast(&e->COND_parallel_entry);
> +  }
> +
> +  mysql_mutex_lock(&pool->LOCK_rpl_thread_pool);
> +  pool_mark_not_busy(pool);
> +  mysql_cond_broadcast(&pool->COND_rpl_thread_pool);
> +  mysql_mutex_unlock(&pool->LOCK_rpl_thread_pool);
> +}
> +
> +
> +/*
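> +  Pause parallel replication worker threads before FLUSH TABLES WITH READ
> +  LOCK: in-progress transactions are allowed to complete, new ones wait.
> +  Note: in case of error return, rpl_unpause_for_ftwrl() must _not_ be called.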
> +*/
> +int
> +rpl_pause_for_ftwrl(THD *thd)
> +{
> +  uint32 i;
> +  rpl_parallel_thread_pool *pool= &global_rpl_thread_pool;
> +  int err= 0;
> +
> +  /*
> +    While the pool is marked busy (pool->busy), it cannot be shut down or
> +    resized, so threads are guaranteed to not disappear.
> +
> +    This is required to safely be able to access the individual threads below.
> +    (We cannot lock an individual thread while holding LOCK_rpl_thread_pool,
> +    as this can deadlock against release_thread()).
> +  */
> +  mysql_mutex_lock(&pool->LOCK_rpl_thread_pool);
> +  pool_mark_busy(pool);
> +  mysql_mutex_unlock(&pool->LOCK_rpl_thread_pool);
> +
> +  for (i= 0; i < pool->count; ++i)
> +  {
> +    PSI_stage_info old_stage;
> +    rpl_parallel_entry *e;
> +    rpl_parallel_thread *rpt= pool->threads[i];
> +
> +    mysql_mutex_lock(&rpt->LOCK_rpl_thread);
> +    if (!rpt->current_owner)
> +    {
> +      mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
> +      continue;
> +    }
> +    e= rpt->current_entry;
> +    mysql_mutex_lock(&e->LOCK_parallel_entry);
> +    mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
> +    ++e->need_sub_id_signal;
> +    if (!e->pause_sub_id)
> +      e->pause_sub_id= e->largest_started_sub_id;
> +    thd->ENTER_COND(&e->COND_parallel_entry, &e->LOCK_parallel_entry,
> +                    &stage_waiting_for_ftwrl_threads_to_pause, &old_stage);
> +    while (e->last_committed_sub_id < e->pause_sub_id && !err)
> +    {
> +      if (thd->check_killed())
> +      {
> +        thd->send_kill_message();
> +        err= 1;
> +        break;
> +      }
> +      mysql_cond_wait(&e->COND_parallel_entry, &e->LOCK_parallel_entry);
> +    };
> +    --e->need_sub_id_signal;
> +    thd->EXIT_COND(&old_stage);
> +    if (err)
> +      break;
> +  }
> +
> +  if (err)
> +    rpl_unpause_for_ftwrl(thd);
> +  return err;
> +}
> +
> +
>  #ifndef DBUG_OFF
>  static int
>  dbug_simulate_tmp_error(rpl_group_info *rgi, THD *thd)
> @@ -856,6 +1011,9 @@ handle_rpl_parallel_thread(void *arg)
>  
>          if (unlikely(entry->stop_on_error_sub_id <= rgi->wait_commit_sub_id))
>            skip_event_group= true;
> +        if (likely(!skip_event_group))
> +          do_ftwrl_wait(rgi, &did_enter_cond, &old_stage);
> +
>          register_wait_for_prior_event_group_commit(rgi, entry);
>  
>          unlock_or_exit_cond(thd, &entry->LOCK_parallel_entry,
> @@ -1058,6 +1216,47 @@ rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool,
>    rpl_parallel_thread **new_list= NULL;
>    rpl_parallel_thread *new_free_list= NULL;
>    rpl_parallel_thread *rpt_array= NULL;
> +  THD *thd;
> +  PSI_stage_info old_stage;
> +  int res;
> +
> +  /*
> +    Wait here while the queue is busy. This is done to make FLUSH TABLES WITH
> +    READ LOCK work correctly, without incurring extra locking penalties in
> +    normal operation. FLUSH TABLES WITH READ LOCK needs to lock threads in the
> +    thread pool, and for this we need to make sure the pool will not go away
> +    during the operation. The LOCK_rpl_thread_pool is not suitable for
> +    this. It is taken by release_thread() while holding LOCK_rpl_thread; so it
> +    must be released before locking any LOCK_rpl_thread lock, or a deadlock
> +    can occur.
> +
> +    So we protect the infrequent operations of FLUSH TABLES WITH READ LOCK and
> +    pool size changes with this condition wait.
> +  */
> +  thd= current_thd;
> +  res= 0;
> +  mysql_mutex_lock(&pool->LOCK_rpl_thread_pool);
> +  if (thd)
> +    thd->ENTER_COND(&pool->COND_rpl_thread_pool, &pool->LOCK_rpl_thread_pool,
> +                    &stage_waiting_for_rpl_thread_pool, &old_stage);
> +  while (pool->busy)
> +  {
> +    if (thd->check_killed())
> +    {
> +      thd->send_kill_message();
> +      res= 1;
> +      break;
> +    }
> +    mysql_cond_wait(&pool->COND_rpl_thread_pool, &pool->LOCK_rpl_thread_pool);
> +  }
> +  if (!res)
> +    pool_mark_busy(pool);
> +  if (thd)
> +    thd->EXIT_COND(&old_stage);
> +  else
> +    mysql_mutex_unlock(&pool->LOCK_rpl_thread_pool);
> +  if (res)
> +    return res;
>  
>    /*
>      Allocate the new list of threads up-front.
> @@ -1106,7 +1305,14 @@ rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool,
>    */
>    for (i= 0; i < pool->count; ++i)
>    {
> -    rpl_parallel_thread *rpt= pool->get_thread(NULL, NULL);
> +    rpl_parallel_thread *rpt;
> +
> +    mysql_mutex_lock(&pool->LOCK_rpl_thread_pool);
> +    while ((rpt= pool->free_list) == NULL)
> +      mysql_cond_wait(&pool->COND_rpl_thread_pool, &pool->LOCK_rpl_thread_pool);
> +    pool->free_list= rpt->next;
> +    mysql_mutex_unlock(&pool->LOCK_rpl_thread_pool);
> +    mysql_mutex_lock(&rpt->LOCK_rpl_thread);
>      rpt->stop= true;
>      mysql_cond_signal(&rpt->COND_rpl_thread);
>      mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
> @@ -1157,7 +1363,7 @@ rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool,
>    }
>  
>    mysql_mutex_lock(&pool->LOCK_rpl_thread_pool);
> -  mysql_cond_broadcast(&pool->COND_rpl_thread_pool);
> +  pool_mark_not_busy(pool);
>    mysql_mutex_unlock(&pool->LOCK_rpl_thread_pool);
>  
>    return 0;
> @@ -1182,6 +1388,9 @@ rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool,
>      }
>      my_free(new_list);
>    }
> +  mysql_mutex_lock(&pool->LOCK_rpl_thread_pool);
> +  pool_mark_not_busy(pool);
> +  mysql_mutex_unlock(&pool->LOCK_rpl_thread_pool);
>    return 1;
>  }
>  
> @@ -1445,7 +1654,7 @@ rpl_parallel_thread::loc_free_gco(group_commit_orderer *gco)
>  
>  
>  rpl_parallel_thread_pool::rpl_parallel_thread_pool()
> -  : count(0), threads(0), free_list(0), inited(false)
> +  : threads(0), free_list(0), count(0), inited(false), busy(false)
>  {
>  }
>  
> @@ -1453,9 +1662,10 @@ rpl_parallel_thread_pool::rpl_parallel_thread_pool()
>  int
>  rpl_parallel_thread_pool::init(uint32 size)
>  {
> -  count= 0;
>    threads= NULL;
>    free_list= NULL;
> +  count= 0;
> +  busy= false;
>  
>    mysql_mutex_init(key_LOCK_rpl_thread_pool, &LOCK_rpl_thread_pool,
>                     MY_MUTEX_INIT_SLOW);
> @@ -1496,8 +1706,14 @@ rpl_parallel_thread_pool::get_thread(rpl_parallel_thread **owner,
>    rpl_parallel_thread *rpt;
>  
>    mysql_mutex_lock(&LOCK_rpl_thread_pool);
> -  while ((rpt= free_list) == NULL)
> +  for (;;)
> +  {
> +    while (unlikely(busy))
> +      mysql_cond_wait(&COND_rpl_thread_pool, &LOCK_rpl_thread_pool);
> +    if ((rpt= free_list) != NULL)
> +      break;
>      mysql_cond_wait(&COND_rpl_thread_pool, &LOCK_rpl_thread_pool);
> +  }
>    free_list= rpt->next;
>    mysql_mutex_unlock(&LOCK_rpl_thread_pool);
>    mysql_mutex_lock(&rpt->LOCK_rpl_thread);
> @@ -1908,7 +2124,7 @@ rpl_parallel::wait_for_workers_idle(THD *thd)
>  
>      e= (struct rpl_parallel_entry *)my_hash_element(&domain_hash, i);
>      mysql_mutex_lock(&e->LOCK_parallel_entry);
> -    e->need_sub_id_signal= true;
> +    ++e->need_sub_id_signal;
>      thd->ENTER_COND(&e->COND_parallel_entry, &e->LOCK_parallel_entry,
>                      &stage_waiting_for_workers_idle, &old_stage);
>      while (e->current_sub_id > e->last_committed_sub_id)
> @@ -1921,7 +2137,7 @@ rpl_parallel::wait_for_workers_idle(THD *thd)
>        }
>        mysql_cond_wait(&e->COND_parallel_entry, &e->LOCK_parallel_entry);
>      }
> -    e->need_sub_id_signal= false;
> +    --e->need_sub_id_signal;
>      thd->EXIT_COND(&old_stage);
>      if (err)
>        return err;
> diff --git a/sql/rpl_parallel.h b/sql/rpl_parallel.h
> index 04f8706..77a4005 100644
> --- a/sql/rpl_parallel.h
> +++ b/sql/rpl_parallel.h
> @@ -199,12 +199,18 @@ struct rpl_parallel_thread {
>  
>  
>  struct rpl_parallel_thread_pool {
> -  uint32 count;
>    struct rpl_parallel_thread **threads;
>    struct rpl_parallel_thread *free_list;
>    mysql_mutex_t LOCK_rpl_thread_pool;
>    mysql_cond_t COND_rpl_thread_pool;
> +  uint32 count;
>    bool inited;
> +  /*
> +    Set while FTWRL or a pool size change runs, to make SQL thread or
> +    STOP/START slave not try to start new activity while that operation
> +    is in progress.
> +  */
> +  bool busy;
>  
>    rpl_parallel_thread_pool();
>    int init(uint32 size);
> @@ -219,6 +225,12 @@ struct rpl_parallel_entry {
>    mysql_mutex_t LOCK_parallel_entry;
>    mysql_cond_t COND_parallel_entry;
>    uint32 domain_id;
> +  /*
> +    Incremented by wait_for_workers_idle() and rpl_pause_for_ftwrl() to show
> +    that they are waiting, so that finish_event_group knows to signal them
> +    when last_committed_sub_id is increased.
> +  */
> +  uint32 need_sub_id_signal;
>    uint64 last_commit_id;
>    bool active;
>    /*
> @@ -228,12 +240,6 @@ struct rpl_parallel_entry {
>    */
>    bool force_abort;
>    /*
> -    Set in wait_for_workers_idle() to show that it is waiting, so that
> -    finish_event_group knows to signal it when last_committed_sub_id is
> -    increased.
> -  */
> -  bool need_sub_id_signal;
> -  /*
>     At STOP SLAVE (force_abort=true), we do not want to process all events in
>     the queue (which could unnecessarily delay stop, if a lot of events happen
>     to be queued). The stop_count provides a safe point at which to stop, so
> @@ -291,6 +297,11 @@ struct rpl_parallel_entry {
>      The value is ULONGLONG_MAX when no error occured.
>    */
>    uint64 stop_on_error_sub_id;
> +  /*
> +    During FLUSH TABLES WITH READ LOCK, transactions with sub_id larger than
> +    this value must not start, but wait until the global read lock is released.
> +  */
> +  uint64 pause_sub_id;
>    /* Total count of event groups queued so far. */
>    uint64 count_queued_event_groups;
>    /*
> @@ -331,5 +342,7 @@ extern struct rpl_parallel_thread_pool global_rpl_thread_pool;
>  extern int rpl_parallel_activate_pool(rpl_parallel_thread_pool *pool);
>  extern int rpl_parallel_inactivate_pool(rpl_parallel_thread_pool *pool);
>  extern bool process_gtid_for_restart_pos(Relay_log_info *rli, rpl_gtid *gtid);
> +extern int rpl_pause_for_ftwrl(THD *thd);
> +extern void rpl_unpause_for_ftwrl(THD *thd);
>  
>  #endif  /* RPL_PARALLEL_H */
> diff --git a/sql/sql_parse.cc b/sql/sql_parse.cc
> index 5635e9a..ca6b11e 100644
> --- a/sql/sql_parse.cc
> +++ b/sql/sql_parse.cc
> @@ -4271,6 +4271,17 @@ case SQLCOM_PREPARE:
>        break;
>      }
>  
> +    if (lex->type & REFRESH_READ_LOCK)
> +    {
> +      /*
> +        We need to pause any parallel replication slave workers during FLUSH
> +        TABLES WITH READ LOCK. Otherwise we might cause a deadlock, as
> +        worker threads can run in arbitrary order but need to commit in a
> +        specific given order.
> +      */
> +      if (rpl_pause_for_ftwrl(thd))
> +        goto error;
> +    }
>      /*
>        reload_acl_and_cache() will tell us if we are allowed to write to the
>        binlog or not.
> @@ -4301,6 +4312,8 @@ case SQLCOM_PREPARE:
>        if (!res)
>          my_ok(thd);
>      } 
> +    if (lex->type & REFRESH_READ_LOCK)
> +      rpl_unpause_for_ftwrl(thd);
>      
>      break;
>    }
> _______________________________________________
> commits mailing list
> commits@xxxxxxxxxxx
> https://lists.askmonty.org/cgi-bin/mailman/listinfo/commits