Review requested for MDEV-7818
I need someone to review my fix below for MDEV-7818.
This involves the FLUSH TABLES WITH READ LOCK command and associated
metadata locking, something that I know little about. Monty suggested the
basic semantics - that all currently running parallel replication
transactions would be allowed to complete before granting the global read
lock. But it really needs a review of the actual implementation by someone
who understands the FTWRL semantics and internals.
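
To make the intended protocol easier to follow outside the server code, here
is a minimal standalone C++ sketch (my own illustration, not MariaDB code:
a single replication domain, with std::mutex/std::condition_variable standing
in for LOCK_parallel_entry/COND_parallel_entry; the names entry, worker,
pause_for_ftwrl and unpause_for_ftwrl only mirror the patch, they are not the
server's functions):

#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <cstdio>
#include <mutex>
#include <thread>
#include <vector>

/* Toy stand-in for one rpl_parallel_entry (one replication domain). */
struct entry {
  std::mutex lock;                       /* LOCK_parallel_entry */
  std::condition_variable cond;          /* COND_parallel_entry */
  uint64_t largest_started_sub_id= 0;
  uint64_t last_committed_sub_id= 0;
  uint64_t pause_sub_id= 0;              /* 0 means "no pause requested" */
};

/* A worker applies one event group (sub_id); commits must happen in order. */
static void worker(entry *e, uint64_t sub_id)
{
  std::unique_lock<std::mutex> lk(e->lock);
  /* Like do_ftwrl_wait(): groups with sub_id > pause_sub_id must not start. */
  e->cond.wait(lk, [&] {
    return e->pause_sub_id == 0 || sub_id <= e->pause_sub_id;
  });
  if (sub_id > e->largest_started_sub_id)
    e->largest_started_sub_id= sub_id;
  /* ... apply the event group ... then commit strictly in sub_id order. */
  e->cond.wait(lk, [&] { return e->last_committed_sub_id == sub_id - 1; });
  e->last_committed_sub_id= sub_id;
  std::printf("committed sub_id %llu\n", (unsigned long long) sub_id);
  e->cond.notify_all();
}

/* Like rpl_pause_for_ftwrl(): stop new groups, drain the started ones. */
static void pause_for_ftwrl(entry *e)
{
  std::unique_lock<std::mutex> lk(e->lock);
  if (e->pause_sub_id == 0)
    e->pause_sub_id= e->largest_started_sub_id;
  e->cond.wait(lk, [&] { return e->last_committed_sub_id >= e->pause_sub_id; });
}

/* Like rpl_unpause_for_ftwrl(): let the held-back groups start again. */
static void unpause_for_ftwrl(entry *e)
{
  {
    std::lock_guard<std::mutex> lk(e->lock);
    e->pause_sub_id= 0;
  }
  e->cond.notify_all();
}

int main()
{
  entry e;
  std::vector<std::thread> workers;
  for (uint64_t id= 1; id <= 4; ++id)
    workers.emplace_back(worker, &e, id);
  std::this_thread::sleep_for(std::chrono::milliseconds(10));
  pause_for_ftwrl(&e);   /* all started groups are now committed */
  std::printf("paused; this is where FTWRL takes the global read lock\n");
  unpause_for_ftwrl(&e);
  for (auto &t : workers)
    t.join();
  return 0;
}

The real patch additionally handles multiple domains, kill/error conditions
and the thread pool busy flag, but the drain-then-hold idea is the same:
record the largest started sub_id as pause_sub_id, wait until
last_committed_sub_id reaches it, then take the global read lock and clear
the pause.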
Thanks,
- Kristian.
Kristian Nielsen <knielsen@xxxxxxxxxxxxxxx> writes:
> revision-id: 0e609daea4796fafca94f7adf796addfc506168b
> parent(s): 22b6c297dc0de6b176238ae820bddb5e70f2c8ab
> committer: Kristian Nielsen
> branch nick: mariadb
> timestamp: 2015-06-09 16:01:53 +0200
> message:
>
> MDEV-7818: Deadlock occurring with parallel replication and FTWRL
>
> Problem is that FLUSH TABLES WITH READ LOCK first blocks threads from
> starting new commits, then waits for running commits to complete. But
> in-order parallel replication needs commits to happen in a particular
> order, so this can easily deadlock.
>
> To fix this problem, this patch introduces a way to temporarily pause
> the parallel replication worker threads. Before starting FTWRL, we let
> all worker threads complete in-progress transactions, and then
> wait. Then we proceed to take the global read lock. Once the lock is
> obtained, we unpause the worker threads. Now commits are blocked from
> starting by the global read lock, so the deadlock will no longer occur.
>
> ---
> sql/mysqld.cc | 3 +
> sql/mysqld.h | 3 +
> sql/rpl_parallel.cc | 230 ++++++++++++++++++++++++++++++++++++++++++++++++++--
> sql/rpl_parallel.h | 27 ++++--
> sql/sql_parse.cc | 13 +++
> 5 files changed, 262 insertions(+), 14 deletions(-)
>
> diff --git a/sql/mysqld.cc b/sql/mysqld.cc
> index e05c0b6..af8ae2c 100644
> --- a/sql/mysqld.cc
> +++ b/sql/mysqld.cc
> @@ -9514,6 +9514,9 @@ PSI_stage_info stage_waiting_for_prior_transaction_to_commit= { 0, "Waiting for
> PSI_stage_info stage_waiting_for_prior_transaction_to_start_commit= { 0, "Waiting for prior transaction to start commit before starting next transaction", 0};
> PSI_stage_info stage_waiting_for_room_in_worker_thread= { 0, "Waiting for room in worker thread event queue", 0};
> PSI_stage_info stage_waiting_for_workers_idle= { 0, "Waiting for worker threads to be idle", 0};
> +PSI_stage_info stage_waiting_for_ftwrl= { 0, "Waiting due to global read lock", 0};
> +PSI_stage_info stage_waiting_for_ftwrl_threads_to_pause= { 0, "Waiting for worker threads to pause for global read lock", 0};
> +PSI_stage_info stage_waiting_for_rpl_thread_pool= { 0, "Waiting while replication worker thread pool is busy", 0};
> PSI_stage_info stage_master_gtid_wait_primary= { 0, "Waiting in MASTER_GTID_WAIT() (primary waiter)", 0};
> PSI_stage_info stage_master_gtid_wait= { 0, "Waiting in MASTER_GTID_WAIT()", 0};
> PSI_stage_info stage_gtid_wait_other_connection= { 0, "Waiting for other master connection to process GTID received on multiple master connections", 0};
> diff --git a/sql/mysqld.h b/sql/mysqld.h
> index 156e7f9..8c953c1 100644
> --- a/sql/mysqld.h
> +++ b/sql/mysqld.h
> @@ -454,6 +454,9 @@ extern PSI_stage_info stage_waiting_for_prior_transaction_to_commit;
> extern PSI_stage_info stage_waiting_for_prior_transaction_to_start_commit;
> extern PSI_stage_info stage_waiting_for_room_in_worker_thread;
> extern PSI_stage_info stage_waiting_for_workers_idle;
> +extern PSI_stage_info stage_waiting_for_ftwrl;
> +extern PSI_stage_info stage_waiting_for_ftwrl_threads_to_pause;
> +extern PSI_stage_info stage_waiting_for_rpl_thread_pool;
> extern PSI_stage_info stage_master_gtid_wait_primary;
> extern PSI_stage_info stage_master_gtid_wait;
> extern PSI_stage_info stage_gtid_wait_other_connection;
> diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc
> index 738d0e5..d90507a 100644
> --- a/sql/rpl_parallel.cc
> +++ b/sql/rpl_parallel.cc
> @@ -280,6 +280,8 @@ do_gco_wait(rpl_group_info *rgi, group_commit_orderer *gco,
> rpl_parallel_entry *entry= rgi->parallel_entry;
> uint64 wait_count;
>
> + mysql_mutex_assert_owner(&entry->LOCK_parallel_entry);
> +
> if (!gco->installed)
> {
> group_commit_orderer *prev_gco= gco->prev_gco;
> @@ -336,6 +338,159 @@ do_gco_wait(rpl_group_info *rgi, group_commit_orderer *gco,
> }
>
>
> +static void
> +do_ftwrl_wait(rpl_group_info *rgi,
> + bool *did_enter_cond, PSI_stage_info *old_stage)
> +{
> + THD *thd= rgi->thd;
> + rpl_parallel_entry *entry= rgi->parallel_entry;
> + uint64 sub_id= rgi->gtid_sub_id;
> +
> + mysql_mutex_assert_owner(&entry->LOCK_parallel_entry);
> +
> + if (unlikely(entry->pause_sub_id > 0) && sub_id > entry->pause_sub_id)
> + {
> + thd->ENTER_COND(&entry->COND_parallel_entry, &entry->LOCK_parallel_entry,
> + &stage_waiting_for_ftwrl, old_stage);
> + *did_enter_cond= true;
> + do
> + {
> + if (entry->force_abort || rgi->worker_error)
> + break;
> + if (thd->check_killed())
> + {
> + thd->send_kill_message();
> + slave_output_error_info(rgi, thd);
> + signal_error_to_sql_driver_thread(thd, rgi, 1);
> + break;
> + }
> + mysql_cond_wait(&entry->COND_parallel_entry, &entry->LOCK_parallel_entry);
> + } while (entry->pause_sub_id > 0 && sub_id > entry->pause_sub_id);
> + }
> +
> + if (sub_id > entry->largest_started_sub_id)
> + entry->largest_started_sub_id= sub_id;
> +}
> +
> +
> +static void
> +pool_mark_busy(rpl_parallel_thread_pool *pool)
> +{
> + mysql_mutex_assert_owner(&pool->LOCK_rpl_thread_pool);
> + DBUG_ASSERT(!pool->busy);
> + pool->busy= true;
> +}
> +
> +
> +static void
> +pool_mark_not_busy(rpl_parallel_thread_pool *pool)
> +{
> + mysql_mutex_assert_owner(&pool->LOCK_rpl_thread_pool);
> + DBUG_ASSERT(pool->busy);
> + pool->busy= false;
> + mysql_cond_broadcast(&pool->COND_rpl_thread_pool);
> +}
> +
> +
> +void
> +rpl_unpause_for_ftwrl(THD *thd)
> +{
> + uint32 i;
> + rpl_parallel_thread_pool *pool= &global_rpl_thread_pool;
> +
> + DBUG_ASSERT(pool->busy);
> +
> + for (i= 0; i < pool->count; ++i)
> + {
> + rpl_parallel_entry *e;
> + rpl_parallel_thread *rpt= pool->threads[i];
> +
> + mysql_mutex_lock(&rpt->LOCK_rpl_thread);
> + if (!rpt->current_owner)
> + {
> + mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
> + continue;
> + }
> + e= rpt->current_entry;
> + mysql_mutex_lock(&e->LOCK_parallel_entry);
> + mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
> + e->pause_sub_id= 0;
> + mysql_cond_broadcast(&e->COND_parallel_entry);
> + }
> +
> + mysql_mutex_lock(&pool->LOCK_rpl_thread_pool);
> + pool_mark_not_busy(pool);
> + mysql_cond_broadcast(&pool->COND_rpl_thread_pool);
> + mysql_mutex_unlock(&pool->LOCK_rpl_thread_pool);
> +}
> +
> +
> +/*
> +  Pause the parallel replication worker threads: let each worker complete
> +  the event group it is currently applying, then make it wait before
> +  starting the next one. Used by FLUSH TABLES WITH READ LOCK, so that the
> +  global read lock can be taken without deadlocking against the in-order
> +  commit of parallel replication.
> +
> + Note: in case of error return, unpause_for_ftwrl() must _not_ be called.
> +*/
> +int
> +rpl_pause_for_ftwrl(THD *thd)
> +{
> + uint32 i;
> + rpl_parallel_thread_pool *pool= &global_rpl_thread_pool;
> + int err= 0;
> +
> + /*
> +    While the pool is marked busy, it cannot be shut down or resized, so
> +    threads are guaranteed not to disappear.
> +
> + This is required to safely be able to access the individual threads below.
> + (We cannot lock an individual thread while holding LOCK_rpl_thread_pool,
> + as this can deadlock against release_thread()).
> + */
> + mysql_mutex_lock(&pool->LOCK_rpl_thread_pool);
> + pool_mark_busy(pool);
> + mysql_mutex_unlock(&pool->LOCK_rpl_thread_pool);
> +
> + for (i= 0; i < pool->count; ++i)
> + {
> + PSI_stage_info old_stage;
> + rpl_parallel_entry *e;
> + rpl_parallel_thread *rpt= pool->threads[i];
> +
> + mysql_mutex_lock(&rpt->LOCK_rpl_thread);
> + if (!rpt->current_owner)
> + {
> + mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
> + continue;
> + }
> + e= rpt->current_entry;
> + mysql_mutex_lock(&e->LOCK_parallel_entry);
> + mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
> + ++e->need_sub_id_signal;
> + if (!e->pause_sub_id)
> + e->pause_sub_id= e->largest_started_sub_id;
> + thd->ENTER_COND(&e->COND_parallel_entry, &e->LOCK_parallel_entry,
> + &stage_waiting_for_ftwrl_threads_to_pause, &old_stage);
> + while (e->last_committed_sub_id < e->pause_sub_id && !err)
> + {
> + if (thd->check_killed())
> + {
> + thd->send_kill_message();
> + err= 1;
> + break;
> + }
> + mysql_cond_wait(&e->COND_parallel_entry, &e->LOCK_parallel_entry);
> +    }
> + --e->need_sub_id_signal;
> + thd->EXIT_COND(&old_stage);
> + if (err)
> + break;
> + }
> +
> + if (err)
> + rpl_unpause_for_ftwrl(thd);
> + return err;
> +}
> +
> +
> #ifndef DBUG_OFF
> static int
> dbug_simulate_tmp_error(rpl_group_info *rgi, THD *thd)
> @@ -856,6 +1011,9 @@ handle_rpl_parallel_thread(void *arg)
>
> if (unlikely(entry->stop_on_error_sub_id <= rgi->wait_commit_sub_id))
> skip_event_group= true;
> + if (likely(!skip_event_group))
> + do_ftwrl_wait(rgi, &did_enter_cond, &old_stage);
> +
> register_wait_for_prior_event_group_commit(rgi, entry);
>
> unlock_or_exit_cond(thd, &entry->LOCK_parallel_entry,
> @@ -1058,6 +1216,47 @@ rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool,
> rpl_parallel_thread **new_list= NULL;
> rpl_parallel_thread *new_free_list= NULL;
> rpl_parallel_thread *rpt_array= NULL;
> + THD *thd;
> + PSI_stage_info old_stage;
> + int res;
> +
> + /*
> + Wait here while the queue is busy. This is done to make FLUSH TABLES WITH
> +    READ LOCK work correctly, without incurring extra locking penalties in
> + normal operation. FLUSH TABLES WITH READ LOCK needs to lock threads in the
> + thread pool, and for this we need to make sure the pool will not go away
> + during the operation. The LOCK_rpl_thread_pool is not suitable for
> + this. It is taken by release_thread() while holding LOCK_rpl_thread; so it
> + must be released before locking any LOCK_rpl_thread lock, or a deadlock
> + can occur.
> +
> + So we protect the infrequent operations of FLUSH TABLES WITH READ LOCK and
> + pool size changes with this condition wait.
> + */
> + thd= current_thd;
> + res= 0;
> + mysql_mutex_lock(&pool->LOCK_rpl_thread_pool);
> + if (thd)
> + thd->ENTER_COND(&pool->COND_rpl_thread_pool, &pool->LOCK_rpl_thread_pool,
> + &stage_waiting_for_rpl_thread_pool, &old_stage);
> + while (pool->busy)
> + {
> +    if (thd && thd->check_killed())
> + {
> + thd->send_kill_message();
> + res= 1;
> + break;
> + }
> + mysql_cond_wait(&pool->COND_rpl_thread_pool, &pool->LOCK_rpl_thread_pool);
> + }
> + if (!res)
> + pool_mark_busy(pool);
> + if (thd)
> + thd->EXIT_COND(&old_stage);
> + else
> + mysql_mutex_unlock(&pool->LOCK_rpl_thread_pool);
> + if (res)
> + return res;
>
> /*
> Allocate the new list of threads up-front.
> @@ -1106,7 +1305,14 @@ rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool,
> */
> for (i= 0; i < pool->count; ++i)
> {
> - rpl_parallel_thread *rpt= pool->get_thread(NULL, NULL);
> + rpl_parallel_thread *rpt;
> +
> + mysql_mutex_lock(&pool->LOCK_rpl_thread_pool);
> + while ((rpt= pool->free_list) == NULL)
> + mysql_cond_wait(&pool->COND_rpl_thread_pool, &pool->LOCK_rpl_thread_pool);
> + pool->free_list= rpt->next;
> + mysql_mutex_unlock(&pool->LOCK_rpl_thread_pool);
> + mysql_mutex_lock(&rpt->LOCK_rpl_thread);
> rpt->stop= true;
> mysql_cond_signal(&rpt->COND_rpl_thread);
> mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
> @@ -1157,7 +1363,7 @@ rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool,
> }
>
> mysql_mutex_lock(&pool->LOCK_rpl_thread_pool);
> - mysql_cond_broadcast(&pool->COND_rpl_thread_pool);
> + pool_mark_not_busy(pool);
> mysql_mutex_unlock(&pool->LOCK_rpl_thread_pool);
>
> return 0;
> @@ -1182,6 +1388,9 @@ rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool,
> }
> my_free(new_list);
> }
> + mysql_mutex_lock(&pool->LOCK_rpl_thread_pool);
> + pool_mark_not_busy(pool);
> + mysql_mutex_unlock(&pool->LOCK_rpl_thread_pool);
> return 1;
> }
>
> @@ -1445,7 +1654,7 @@ rpl_parallel_thread::loc_free_gco(group_commit_orderer *gco)
>
>
> rpl_parallel_thread_pool::rpl_parallel_thread_pool()
> - : count(0), threads(0), free_list(0), inited(false)
> + : threads(0), free_list(0), count(0), inited(false), busy(false)
> {
> }
>
> @@ -1453,9 +1662,10 @@ rpl_parallel_thread_pool::rpl_parallel_thread_pool()
> int
> rpl_parallel_thread_pool::init(uint32 size)
> {
> - count= 0;
> threads= NULL;
> free_list= NULL;
> + count= 0;
> + busy= false;
>
> mysql_mutex_init(key_LOCK_rpl_thread_pool, &LOCK_rpl_thread_pool,
> MY_MUTEX_INIT_SLOW);
> @@ -1496,8 +1706,14 @@ rpl_parallel_thread_pool::get_thread(rpl_parallel_thread **owner,
> rpl_parallel_thread *rpt;
>
> mysql_mutex_lock(&LOCK_rpl_thread_pool);
> - while ((rpt= free_list) == NULL)
> + for (;;)
> + {
> + while (unlikely(busy))
> + mysql_cond_wait(&COND_rpl_thread_pool, &LOCK_rpl_thread_pool);
> + if ((rpt= free_list) != NULL)
> + break;
> mysql_cond_wait(&COND_rpl_thread_pool, &LOCK_rpl_thread_pool);
> + }
> free_list= rpt->next;
> mysql_mutex_unlock(&LOCK_rpl_thread_pool);
> mysql_mutex_lock(&rpt->LOCK_rpl_thread);
> @@ -1908,7 +2124,7 @@ rpl_parallel::wait_for_workers_idle(THD *thd)
>
> e= (struct rpl_parallel_entry *)my_hash_element(&domain_hash, i);
> mysql_mutex_lock(&e->LOCK_parallel_entry);
> - e->need_sub_id_signal= true;
> + ++e->need_sub_id_signal;
> thd->ENTER_COND(&e->COND_parallel_entry, &e->LOCK_parallel_entry,
> &stage_waiting_for_workers_idle, &old_stage);
> while (e->current_sub_id > e->last_committed_sub_id)
> @@ -1921,7 +2137,7 @@ rpl_parallel::wait_for_workers_idle(THD *thd)
> }
> mysql_cond_wait(&e->COND_parallel_entry, &e->LOCK_parallel_entry);
> }
> - e->need_sub_id_signal= false;
> + --e->need_sub_id_signal;
> thd->EXIT_COND(&old_stage);
> if (err)
> return err;
> diff --git a/sql/rpl_parallel.h b/sql/rpl_parallel.h
> index 04f8706..77a4005 100644
> --- a/sql/rpl_parallel.h
> +++ b/sql/rpl_parallel.h
> @@ -199,12 +199,18 @@ struct rpl_parallel_thread {
>
>
> struct rpl_parallel_thread_pool {
> - uint32 count;
> struct rpl_parallel_thread **threads;
> struct rpl_parallel_thread *free_list;
> mysql_mutex_t LOCK_rpl_thread_pool;
> mysql_cond_t COND_rpl_thread_pool;
> + uint32 count;
> bool inited;
> + /*
> +    While FTWRL runs, this flag is set to prevent the SQL driver thread or
> +    STOP/START SLAVE from trying to start new activity in the pool while
> +    that operation is in progress.
> + */
> + bool busy;
>
> rpl_parallel_thread_pool();
> int init(uint32 size);
> @@ -219,6 +225,12 @@ struct rpl_parallel_entry {
> mysql_mutex_t LOCK_parallel_entry;
> mysql_cond_t COND_parallel_entry;
> uint32 domain_id;
> + /*
> + Incremented by wait_for_workers_idle() and rpl_pause_for_ftwrl() to show
> + that they are waiting, so that finish_event_group knows to signal them
> + when last_committed_sub_id is increased.
> + */
> + uint32 need_sub_id_signal;
> uint64 last_commit_id;
> bool active;
> /*
> @@ -228,12 +240,6 @@ struct rpl_parallel_entry {
> */
> bool force_abort;
> /*
> - Set in wait_for_workers_idle() to show that it is waiting, so that
> - finish_event_group knows to signal it when last_committed_sub_id is
> - increased.
> - */
> - bool need_sub_id_signal;
> - /*
> At STOP SLAVE (force_abort=true), we do not want to process all events in
> the queue (which could unnecessarily delay stop, if a lot of events happen
> to be queued). The stop_count provides a safe point at which to stop, so
> @@ -291,6 +297,11 @@ struct rpl_parallel_entry {
> The value is ULONGLONG_MAX when no error occured.
> */
> uint64 stop_on_error_sub_id;
> + /*
> + During FLUSH TABLES WITH READ LOCK, transactions with sub_id larger than
> + this value must not start, but wait until the global read lock is released.
> + */
> + uint64 pause_sub_id;
> /* Total count of event groups queued so far. */
> uint64 count_queued_event_groups;
> /*
> @@ -331,5 +342,7 @@ extern struct rpl_parallel_thread_pool global_rpl_thread_pool;
> extern int rpl_parallel_activate_pool(rpl_parallel_thread_pool *pool);
> extern int rpl_parallel_inactivate_pool(rpl_parallel_thread_pool *pool);
> extern bool process_gtid_for_restart_pos(Relay_log_info *rli, rpl_gtid *gtid);
> +extern int rpl_pause_for_ftwrl(THD *thd);
> +extern void rpl_unpause_for_ftwrl(THD *thd);
>
> #endif /* RPL_PARALLEL_H */
> diff --git a/sql/sql_parse.cc b/sql/sql_parse.cc
> index 5635e9a..ca6b11e 100644
> --- a/sql/sql_parse.cc
> +++ b/sql/sql_parse.cc
> @@ -4271,6 +4271,17 @@ case SQLCOM_PREPARE:
> break;
> }
>
> + if (lex->type & REFRESH_READ_LOCK)
> + {
> + /*
> + We need to pause any parallel replication slave workers during FLUSH
> + TABLES WITH READ LOCK. Otherwise we might cause a deadlock, as
> +      worker threads can run in arbitrary order but need to commit in a
> +      specific order.
> + */
> + if (rpl_pause_for_ftwrl(thd))
> + goto error;
> + }
> /*
> reload_acl_and_cache() will tell us if we are allowed to write to the
> binlog or not.
> @@ -4301,6 +4312,8 @@ case SQLCOM_PREPARE:
> if (!res)
> my_ok(thd);
> }
> + if (lex->type & REFRESH_READ_LOCK)
> + rpl_unpause_for_ftwrl(thd);
>
> break;
> }
> _______________________________________________
> commits mailing list
> commits@xxxxxxxxxxx
> https://lists.askmonty.org/cgi-bin/mailman/listinfo/commits