← Back to team overview

maria-developers team mailing list archive

Re: Review request for MDEV-6020, MDEV-5914, MDEV-5941, and MDEV-5262

 

Hi, Kristian!

On Jun 10, Kristian Nielsen wrote:
> Hi Serg,
> 
> We discussed on IRC a review of the $subj bugs related to deadlocks in
> parallel replication and handling of automatic retry of transactions in case
> of deadlock.

Here it is, below.

General comments:

* MySQL has a variable binlog_order_commits.
  We can introduce it too - it allows cutting lots of corners
  in the binlogging code. And most users don't need strict binlog ordering
  most of the time.

* I tried to ignore background killing code, because we're discussing
  whether it'll stay or not. And the discussion of the API is happening
  in another thread. So I was mostly looking at the general logic and at
  the binlog re-reading code that we've already discussed on IRC.

> === modified file 'sql/sql_admin.cc'
> --- sql/sql_admin.cc	2014-03-27 20:32:53 +0000
> +++ sql/sql_admin.cc	2014-06-10 17:48:32 +0000
> @@ -912,7 +912,7 @@ static bool mysql_admin_table(THD* thd,
>        protocol->store(operator_name, system_charset_info);
>        if (result_code) // either mysql_recreate_table or analyze failed
>        {
> -        DBUG_ASSERT(thd->is_error() || thd->killed);
> +        DBUG_ASSERT(thd->is_error());

why?

>          if (thd->is_error())
>          {
>            const char *err_msg= thd->get_stmt_da()->message();
> 
> === modified file 'storage/heap/hp_write.c'
> --- storage/heap/hp_write.c	2014-03-26 21:25:38 +0000
> +++ storage/heap/hp_write.c	2014-06-10 17:48:32 +0000
> @@ -153,10 +153,10 @@ static uchar *next_free_record_pos(HP_SH
>          (info->data_length + info->index_length >= info->max_table_size))
>      {
>        DBUG_PRINT("error",
> -                 ("record file full. records: %u  max_records: %lu  "
> +                 ("record file full. records: %lu  max_records: %lu  "
>                    "data_length: %llu  index_length: %llu  "
>                    "max_table_size: %llu",
> -                  info->records, info->max_records,
> +                  (unsigned long)info->records, info->max_records,

Eh? Why did you add a cast from ulong to ulong?

>                    info->data_length, info->index_length,
>                    info->max_table_size));
>        my_errno=HA_ERR_RECORD_FILE_FULL;
> 
> === modified file 'sql/sql_class.h'
> --- sql/sql_class.h	2014-04-25 10:58:31 +0000
> +++ sql/sql_class.h	2014-06-10 17:48:32 +0000
> @@ -1357,7 +1357,8 @@ enum enum_thread_type
>    SYSTEM_THREAD_NDBCLUSTER_BINLOG= 8,
>    SYSTEM_THREAD_EVENT_SCHEDULER= 16,
>    SYSTEM_THREAD_EVENT_WORKER= 32,
> -  SYSTEM_THREAD_BINLOG_BACKGROUND= 64
> +  SYSTEM_THREAD_BINLOG_BACKGROUND= 64,
> +  SYSTEM_THREAD_SLAVE_BACKGROUND= 128,

is it your background killer thread?

>  };
>  
>  inline char const *
> === modified file 'storage/innobase/lock/lock0lock.cc'
> --- storage/innobase/lock/lock0lock.cc	2014-02-26 18:22:48 +0000
> +++ storage/innobase/lock/lock0lock.cc	2014-06-10 17:48:32 +0000
> @@ -1020,6 +1021,28 @@ lock_rec_has_to_wait(
>  			return(FALSE);
>  		}
>  
> +		if ((type_mode & LOCK_GAP || lock_rec_get_gap(lock2)) &&
> +		    !thd_need_ordering_with(trx->mysql_thd,
> +					    lock2->trx->mysql_thd)) {
> +			/* If the upper server layer has already decided on the
> +			commit order between the transaction requesting the
> +			lock and the transaction owning the lock, we do not
> +			need to wait for gap locks. Such ordeering by the upper
> +			server layer happens in parallel replication, where the
> +			commit order is fixed to match the original order on the
> +			master.
> +
> +			Such gap locks are mainly needed to get serialisability
> +			between transactions so that they will be binlogged in
> +			the correct order so that statement-based replication
> +			will give the correct results. Since the right order
> +			was already determined on the master, we do not need
> +			to enforce it again here (and doing so could lead to
> +			occasional deadlocks). */

Please clarify this comment. It is not clear whether this if() is required
for correctness (i.e. for the code to work at all), or whether it's merely an
optimization and these occasional deadlocks will only cause the transaction to
be re-executed, with the end result still being the same.

I believe, you meant the latter.

> +
> +			return (FALSE);
> +		}
> +
>  		return(TRUE);
>  	}
>  
> === modified file 'sql/sql_class.cc'
> --- sql/sql_class.cc	2014-03-26 21:32:20 +0000
> +++ sql/sql_class.cc	2014-06-10 17:48:32 +0000
> @@ -4211,12 +4211,121 @@ extern "C" int thd_slave_thread(const MY
>    return(thd->slave_thread);
>  }
>  
> -/* Returns true for a worker thread in parallel replication. */
> -extern "C" int thd_rpl_is_parallel(const MYSQL_THD thd)
> +extern "C" int
> +thd_need_wait_for(const MYSQL_THD thd)
>  {
> -  return thd->rgi_slave && thd->rgi_slave->is_parallel_exec;
> +  rpl_group_info *rgi;
> +
> +  if (!thd)
> +    return false;
> +  rgi= thd->rgi_slave;
> +  if (!rgi)
> +    return false;
> +  return rgi->is_parallel_exec;
> +}
> +
> +extern "C" void
> +thd_report_wait_for(const MYSQL_THD thd, MYSQL_THD other_thd)
> +{
> +  rpl_group_info *rgi;
> +  rpl_group_info *other_rgi;
> +
> +  if (!thd || !other_thd)
> +    return;
> +  rgi= thd->rgi_slave;
> +  other_rgi= other_thd->rgi_slave;
> +  if (!rgi || !other_rgi)
> +    return;
> +  if (!rgi->is_parallel_exec)
> +    return;
> +  if (rgi->rli != other_rgi->rli)
> +    return;
> +  if (!rgi->gtid_sub_id || !other_rgi->gtid_sub_id)
> +    return;
> +  if (rgi->current_gtid.domain_id != other_rgi->current_gtid.domain_id)
> +    return;
> +  if (rgi->gtid_sub_id > other_rgi->gtid_sub_id)
> +    return;
> +  /*
> +    This transaction is about to wait for another transaction that is required
> +    by replication binlog order to commit after. This would cause a deadlock.
> +
> +    So send a kill to the other transaction, with a temporary error; this will
> +    cause replication to rollback (and later re-try) the other transaction,
> +    releasing the lock for this transaction so replication can proceed.
> +  */
> +
> +#ifdef HAVE_REPLICATION
> +  slave_background_kill_request(other_thd);
> +#endif
> +}
> +
> +extern "C" int
> +thd_need_ordering_with(const MYSQL_THD thd, const MYSQL_THD other_thd)
> +{
> +  rpl_group_info *rgi, *other_rgi;
> +
> +  if (!thd || !other_thd)
> +    return 1;
> +  rgi= thd->rgi_slave;
> +  other_rgi= other_thd->rgi_slave;
> +  if (!rgi || !other_rgi)
> +    return 1;
> +  if (!rgi->is_parallel_exec)
> +    return 1;
> +  if (rgi->rli != other_rgi->rli)
> +    return 1;
> +  if (rgi->current_gtid.domain_id != other_rgi->current_gtid.domain_id)
> +    return 1;
> +  /*
> +    These two threads are doing parallel replication within the same
> +    replication domain. Their commit order is already fixed, so we do not need
> +    gap locks or similar to otherwise enforce ordering (and in fact such locks
> +    could lead to unnecessary deadlocks and transaction retry).

Oh, right. That's what I meant when I commented in
innobase/lock/lock0lock.cc.

> +  */
> +  return 0;
>  }
>  
> +
> +extern "C" int
> +thd_deadlock_victim_preference(const MYSQL_THD thd1, const MYSQL_THD thd2)
> +{
> +  rpl_group_info *rgi1, *rgi2;
> +  bool nontrans1, nontrans2;
> +
> +  if (!thd1 || !thd2)
> +    return 0;
> +
> +  /*
> +    If the transactions are participating in the same replication domain in
> +    parallel replication, then request to select the one that will commit
> +    later (in the fixed commit order from the master) as the deadlock victim.
> +  */
> +  rgi1= thd1->rgi_slave;
> +  rgi2= thd2->rgi_slave;
> +  if (rgi1 && rgi2 &&
> +      rgi1->is_parallel_exec &&
> +      rgi1->rli == rgi2->rli &&
> +      rgi1->current_gtid.domain_id == rgi2->current_gtid.domain_id)
> +    return rgi1->gtid_sub_id < rgi2->gtid_sub_id ? 1 : -1;
> +
> +  /*
> +    If one transaction has modified non-transactional tables (so that it
> +    cannot be safely rolled back), and the other has not, then prefer to
> +    select the purely transactional one as the victim.
> +  */
> +  nontrans1= thd1->transaction.all.modified_non_trans_table;
> +  nontrans2= thd2->transaction.all.modified_non_trans_table;
> +  if (nontrans1 && !nontrans2)
> +    return 1;
> +  else if (!nontrans1 && nontrans2)
> +    return -1;
> +
> +  /* No preferences, let the storage engine decide. */
> +  return 0;
> +}
> +
> +
>  extern "C" int thd_non_transactional_update(const MYSQL_THD thd)
>  {
>    return(thd->transaction.all.modified_non_trans_table);
> === modified file 'sql/log.cc'
> --- sql/log.cc	2014-03-23 19:09:38 +0000
> +++ sql/log.cc	2014-06-10 17:48:32 +0000
> @@ -4067,7 +4068,30 @@ int MYSQL_BIN_LOG::purge_first_log(Relay
>    DBUG_ASSERT(!strcmp(rli->linfo.log_file_name,rli->event_relay_log_name));
>  
>    mysql_mutex_lock(&LOCK_index);
> -  to_purge_if_included= my_strdup(rli->group_relay_log_name, MYF(0));
> +
> +  ir= rli->inuse_relaylog_list;
> +  while (ir)
> +  {
> +    inuse_relaylog *next= ir->next;

are relay logs ordered in this list?
That is, all newer relay logs are after the current one (ir),
all older relay logs are before?

> +    if (!ir->completed || ir->dequeued_count < ir->queued_count)
> +    {
> +      included= false;
> +      break;
> +    }
> +    if (!included && 0 == strcmp(ir->name, rli->group_relay_log_name))

do we ever use this reverse style in the code (const == expr)?
I don't remember that

> +      break;
> +    if (!next)
> +    {
> +      rli->last_inuse_relaylog= NULL;
> +      included= 1;
> +      to_purge_if_included= my_strdup(ir->name, MYF(0));
> +    }
> +    my_free(ir);
> +    ir= next;
> +  }
> +  rli->inuse_relaylog_list= ir;
> +  if (ir)
> +    to_purge_if_included= my_strdup(ir->name, MYF(0));
>  
>    /*
>      Read the next log file name from the index file and pass it back to
> === modified file 'sql/rpl_rli.h'
> --- sql/rpl_rli.h	2014-04-25 10:58:31 +0000
> +++ sql/rpl_rli.h	2014-06-10 17:48:32 +0000
> @@ -158,6 +159,14 @@ class Relay_log_info : public Slave_repo
>    Master_info *mi;
>  
>    /*
> +    List of active relay log files.
> +    (This can be more than one in case of parallel replication).
> +  */
> +  inuse_relaylog *inuse_relaylog_list;
> +  inuse_relaylog *last_inuse_relaylog;
> +  my_atomic_rwlock_t inuse_relaylog_atomic_lock;

A comment would be nice here, explaining what this inuse_relaylog_atomic_lock
protects (dequeued_count, as far as I can see)

> +
> +  /*
>      Needed to deal properly with cur_log getting closed and re-opened with
>      a different log under our feet
>    */
> === modified file 'sql/rpl_rli.cc'
> --- sql/rpl_rli.cc	2014-04-25 10:58:31 +0000
> +++ sql/rpl_rli.cc	2014-06-10 17:48:32 +0000
> @@ -1279,6 +1291,32 @@ void Relay_log_info::stmt_done(my_off_t
>    DBUG_VOID_RETURN;
>  }
>  
> +
> +int
> +Relay_log_info::alloc_inuse_relaylog(const char *name)
> +{
> +  inuse_relaylog *ir;
> +
> +  if (!(ir= (inuse_relaylog *)my_malloc(sizeof(*ir), MYF(MY_WME|MY_ZEROFILL))))
> +  {
> +    my_error(ER_OUTOFMEMORY, MYF(0), (int)sizeof(*ir));
> +    return 1;
> +  }
> +  strcpy(ir->name, name);

No check whether the name fits?
Even if it does, I would've been more comfortable seeing strmake here.
We have a convenience macro for this:

 strmake_buf(ir->name, name);

> +
> +  if (!inuse_relaylog_list)
> +    inuse_relaylog_list= ir;
> +  else
> +  {
> +    last_inuse_relaylog->completed= true;
> +    last_inuse_relaylog->next= ir;
> +  }
> +  last_inuse_relaylog= ir;
> +
> +  return 0;
> +}
> +
> +
>  #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
>  int
>  rpl_load_gtid_slave_state(THD *thd)
> === modified file 'sql/log_event.cc'
> --- sql/log_event.cc	2014-04-25 10:58:31 +0000
> +++ sql/log_event.cc	2014-06-10 17:48:32 +0000
> @@ -4384,18 +4420,21 @@ Default database: '%s'. Query: '%s'",
>      {
>        DBUG_PRINT("info",("error ignored"));
>        clear_all_errors(thd, const_cast<Relay_log_info*>(rli));
> -      thd->reset_killed();
> +      if (actual_error == ER_QUERY_INTERRUPTED ||
> +          actual_error == ER_CONNECTION_KILLED)
> +        thd->reset_killed();

Why?

>      }
>      /*
>        Other cases: mostly we expected no error and get one.
>      */
>      else if (thd->is_slave_error || thd->is_fatal_error)
>      {
> -      rli->report(ERROR_LEVEL, actual_error,
> -                      "Error '%s' on query. Default database: '%s'. Query: '%s'",
> -                      (actual_error ? thd->get_stmt_da()->message() :
> -                       "unexpected success or fatal error"),
> -                      print_slave_db_safe(thd->db), query_arg);
> +      if (!is_parallel_retry_error(rgi, actual_error))
> +        rli->report(ERROR_LEVEL, actual_error,
> +                    "Error '%s' on query. Default database: '%s'. Query: '%s'",
> +                    (actual_error ? thd->get_stmt_da()->message() :
> +                     "unexpected success or fatal error"),
> +                    print_slave_db_safe(thd->db), query_arg);
>        thd->is_slave_error= 1;
>      }
>  
> @@ -7284,28 +7321,34 @@ int Xid_log_event::do_apply_event(rpl_gr
>    bool res;
>    int err;
>    rpl_gtid gtid;
> -  uint64 sub_id;
> +  uint64 sub_id= 0;
>    Relay_log_info const *rli= rgi->rli;
>  
> +  mysql_reset_thd_for_next_command(thd);

hmm. mysql_reset_thd_for_next_command() is called before the new
sql statement, not at the end of the old one.
So, you end up calling it twice.
It is not a function to use when you only need to reset the error status.
The same, I suppose, applies to the previous chunk, Gtid_log_event::do_apply_event

>    /*
>      Record any GTID in the same transaction, so slave state is transactionally
>      consistent.
>    */
> -  if ((sub_id= rgi->gtid_sub_id))
> +  if (rgi->gtid_pending)
>    {
> -    /* Clear the GTID from the RLI so we don't accidentally reuse it. */
> -    rgi->gtid_sub_id= 0;
> +    sub_id= rgi->gtid_sub_id;
> +    rgi->gtid_pending= false;
>  
>      gtid= rgi->current_gtid;
>      err= rpl_global_gtid_slave_state.record_gtid(thd, &gtid, sub_id, true, false);
>      if (err)
>      {
> -      rli->report(ERROR_LEVEL, ER_CANNOT_UPDATE_GTID_STATE,
> -                  "Error during XID COMMIT: failed to update GTID state in "
> -                  "%s.%s: %d: %s",
> -                  "mysql", rpl_gtid_slave_state_table_name.str,
> -                  thd->get_stmt_da()->sql_errno(),
> -                  thd->get_stmt_da()->message());
> +      int ec= thd->get_stmt_da()->sql_errno();
> +      /*
> +        Do not report an error if this is really a kill due to a deadlock.
> +        In this case, the transaction will be re-tried instead.
> +      */
> +      if (!is_parallel_retry_error(rgi, ec))
> +        rli->report(ERROR_LEVEL, ER_CANNOT_UPDATE_GTID_STATE,
> +                    "Error during XID COMMIT: failed to update GTID state in "
> +                    "%s.%s: %d: %s",
> +                    "mysql", rpl_gtid_slave_state_table_name.str, ec,
> +                    thd->get_stmt_da()->message());
>        trans_rollback(thd);
>        thd->is_slave_error= 1;
>        return err;
> === modified file 'sql/rpl_parallel.cc'
> --- sql/rpl_parallel.cc	2014-04-25 10:58:31 +0000
> +++ sql/rpl_parallel.cc	2014-06-10 17:48:32 +0000
> @@ -197,6 +187,279 @@ unlock_or_exit_cond(THD *thd, mysql_mute
>  }
>  
>  
> +static void
> +register_wait_for_prior_event_group_commit(rpl_group_info *rgi,
> +                                           rpl_parallel_entry *entry)
> +{
> +  mysql_mutex_assert_owner(&entry->LOCK_parallel_entry);
> +  if (rgi->wait_commit_sub_id > entry->last_committed_sub_id)
> +  {
> +    /*
> +      Register that the commit of this event group must wait for the
> +      commit of the previous event group to complete before it may
> +      complete itself, so that we preserve commit order.
> +    */
> +    wait_for_commit *waitee=
> +      &rgi->wait_commit_group_info->commit_orderer;
> +    rgi->commit_orderer.register_wait_for_prior_commit(waitee);
> +  }
> +}
> +
> +
> +#ifndef DBUG_OFF
> +static int
> +dbug_simulate_tmp_error(rpl_group_info *rgi, THD *thd)
> +{
> +  if (rgi->current_gtid.domain_id == 0 && rgi->current_gtid.seq_no == 100 &&
> +      rgi->retry_event_count == 4)
> +  {
> +    thd->clear_error();
> +    thd->get_stmt_da()->reset_diagnostics_area();
> +    my_error(ER_LOCK_DEADLOCK, MYF(0));
> +    return 1;
> +  }
> +  return 0;
> +}
> +#endif
> +
> +
> +/*
> +  If we detect a deadlock due to eg. storage engine locks that conflict with
> +  the fixed commit order, then the later transaction will be killed
> +  asynchroneously to allow the former to complete its commit.
> +
> +  In this case, we convert the 'killed' error into a deadlock error, and retry
> +  the later transaction.  */
> +static void
> +convert_kill_to_deadlock_error(rpl_group_info *rgi)
> +{
> +  THD *thd= rgi->thd;
> +  int err_code= thd->get_stmt_da()->sql_errno();
> +
> +  if ((err_code == ER_QUERY_INTERRUPTED || err_code == ER_CONNECTION_KILLED) &&
> +      rgi->killed_for_retry)
> +  {
> +    thd->clear_error();
> +    thd->get_stmt_da()->reset_diagnostics_area();

Why do you always do both clear_error() and reset_diagnostics_area()?
The former normally includes the latter.

> +    my_error(ER_LOCK_DEADLOCK, MYF(0));
> +    rgi->killed_for_retry= false;
> +    thd->reset_killed();
> +  }
> +}
> +
> +
> +static bool
> +is_group_ending(Log_event *ev, Log_event_type event_type)
> +{
> +  return event_type == XID_EVENT ||
> +         (event_type == QUERY_EVENT &&
> +          (((Query_log_event *)ev)->is_commit() ||
> +           ((Query_log_event *)ev)->is_rollback()));
> +}
> +
> +
> +static int
> +retry_event_group(rpl_group_info *rgi, rpl_parallel_thread *rpt,
> +                  rpl_parallel_thread::queued_event *orig_qev)
> +{
> +  IO_CACHE rlog;
> +  LOG_INFO linfo;
> +  File fd= (File)-1;
> +  const char *errmsg= NULL;
> +  inuse_relaylog *ir= rgi->relay_log;
> +  uint64 event_count;
> +  uint64 events_to_execute= rgi->retry_event_count;
> +  Relay_log_info *rli= rgi->rli;
> +  int err;
> +  ulonglong cur_offset, old_offset;
> +  char log_name[FN_REFLEN];
> +  THD *thd= rgi->thd;
> +  rpl_parallel_entry *entry= rgi->parallel_entry;
> +  ulong retries= 0;
> +
> +do_retry:
> +  event_count= 0;
> +  err= 0;
> +
> +  /*
> +    If we already started committing before getting the deadlock (or other
> +    error) that caused us to need to retry, we have already signalled
> +    subsequent transactions that we have started committing. This is
> +    potentially a problem, as now we will rollback, and if subsequent
> +    transactions would start to execute now, they could see an unexpected
> +    state of the database and get eg. key not found or duplicate key error.
> +
> +    However, to get a deadlock in the first place, there must have been
> +    another earlier transaction that is waiting for us. Thus that other
> +    transaction has _not_ yet started to commit, and any subsequent
> +    transactions will still be waiting at this point.
> +
> +    So here, we decrement back the count of transactions that started
> +    committing (if we already incremented it), undoing the effect of an
> +    earlier mark_start_commit(). Then later, when the retry succeeds and we
> +    commit again, we can do a new mark_start_commit() and eventually wake up
> +    subsequent transactions at the proper time.
> +
> +    We need to do the unmark before the rollback, to be sure that the
> +    transaction we deadlocked with will not signal that it started to commit
> +    until after the unmark.
> +  */
> +  rgi->unmark_start_commit();
> +
> +  /*
> +    We might get the deadlock error that causes the retry during commit, while
> +    sitting in wait_for_prior_commit(). If this happens, we will have a
> +    pending error in the wait_for_commit object. So clear this by
> +    unregistering (and later re-registering) the wait.
> +  */
> +  if(thd->wait_for_commit_ptr)
> +    thd->wait_for_commit_ptr->unregister_wait_for_prior_commit();
> +  rgi->cleanup_context(thd, 1);
> +
> +  mysql_mutex_lock(&rli->data_lock);
> +  ++rli->retried_trans;
> +  statistic_increment(slave_retried_transactions, LOCK_status);
> +  mysql_mutex_unlock(&rli->data_lock);
> +
> +  mysql_mutex_lock(&entry->LOCK_parallel_entry);
> +  register_wait_for_prior_event_group_commit(rgi, entry);
> +  mysql_mutex_unlock(&entry->LOCK_parallel_entry);
> +
> +  strcpy(log_name, ir->name);

Use strmake_buf, please. Even if strcpy is safe here,
if you use strmake_buf, a reviewer (not just me, but any developer who'll
see this code) won't need to stop and think about whether this line is safe.

> +  if ((fd= open_binlog(&rlog, log_name, &errmsg)) <0)
> +  {
> +    err= 1;
> +    goto err;
> +  }
> +  cur_offset= rgi->retry_start_offset;
> +  my_b_seek(&rlog, cur_offset);
> +
> +  do
> +  {
> +    Log_event_type event_type;
> +    Log_event *ev;
> +    rpl_parallel_thread::queued_event *qev;
> +
> +    /* The loop is here so we can try again the next relay log file on EOF. */
> +    for (;;)
> +    {
> +      old_offset= cur_offset;
> +      ev= Log_event::read_log_event(&rlog, 0,
> +                                    rli->relay_log.description_event_for_exec /* ToDo: this needs fixing */,
> +                                    opt_slave_sql_verify_checksum);

I don't particularly like that there's a second binlog-reading loop here.
It would be better to abstract the existing one, somehow, and reuse it here.
But I suppose it would've been too difficult to do :(

> +      cur_offset= my_b_tell(&rlog);
> +
> +      if (ev)
> +        break;
> +      if (rlog.error < 0)
> +      {
> +        errmsg= "slave SQL thread aborted because of I/O error";
> +        err= 1;
> +        goto err;
> +      }
> +      if (rlog.error > 0)
> +      {
> +        sql_print_error("Slave SQL thread: I/O error reading "
> +                        "event(errno: %d  cur_log->error: %d)",
> +                        my_errno, rlog.error);
> +        errmsg= "Aborting slave SQL thread because of partial event read";
> +        err= 1;
> +        goto err;
> +      }
> +      /* EOF. Move to the next relay log. */
> +      end_io_cache(&rlog);
> +      mysql_file_close(fd, MYF(MY_WME));
> +      fd= (File)-1;
> +
> +      /* Find the next relay log file. */
> +      if((err= rli->relay_log.find_log_pos(&linfo, log_name, 1)) ||
> +         (err= rli->relay_log.find_next_log(&linfo, 1)))
> +      {
> +        char buff[22];
> +        sql_print_error("next log error: %d  offset: %s  log: %s",
> +                        err,
> +                        llstr(linfo.index_file_offset, buff),
> +                        log_name);
> +        goto err;
> +      }
> +      strmake_buf(log_name ,linfo.log_file_name);
> +
> +      if ((fd= open_binlog(&rlog, log_name, &errmsg)) <0)
> +      {
> +        err= 1;
> +        goto err;
> +      }
> +      /* Loop to try again on the new log file. */
> +    }
> +
> +    event_type= ev->get_type_code();
> +    if (!Log_event::is_group_event(event_type))
> +    {
> +      delete ev;
> +      continue;
> +    }
> +    ev->thd= thd;
> +
> +    mysql_mutex_lock(&rpt->LOCK_rpl_thread);
> +    qev= rpt->retry_get_qev(ev, orig_qev, log_name, cur_offset,
> +                            cur_offset - old_offset);
> +    mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
> +    if (!qev)
> +    {
> +      delete ev;
> +      my_error(ER_OUT_OF_RESOURCES, MYF(0));
> +      err= 1;
> +      goto err;
> +    }
> +    if (is_group_ending(ev, event_type))
> +      rgi->mark_start_commit();
> +
> +    err= rpt_handle_event(qev, rpt);
> +    ++event_count;
> +    mysql_mutex_lock(&rpt->LOCK_rpl_thread);
> +    rpt->free_qev(qev);
> +    mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
> +
> +    delete_or_keep_event_post_apply(rgi, event_type, ev);
> +    DBUG_EXECUTE_IF("rpl_parallel_simulate_double_temp_err_gtid_0_x_100",
> +                    if (retries == 0) err= dbug_simulate_tmp_error(rgi, thd););
> +    DBUG_EXECUTE_IF("rpl_parallel_simulate_infinite_temp_err_gtid_0_x_100",
> +                    err= dbug_simulate_tmp_error(rgi, thd););
> +    if (err)
> +    {
> +      convert_kill_to_deadlock_error(rgi);
> +      if (has_temporary_error(thd))
> +      {
> +        ++retries;
> +        if (retries < slave_trans_retries)
> +        {
> +          end_io_cache(&rlog);
> +          mysql_file_close(fd, MYF(MY_WME));
> +          fd= (File)-1;
> +          goto do_retry;
> +        }
> +        sql_print_error("Slave worker thread retried transaction %lu time(s) "
> +                        "in vain, giving up. Consider raising the value of "
> +                        "the slave_transaction_retries variable.",
> +                        slave_trans_retries);
> +      }
> +      goto err;
> +    }
> +  } while (event_count < events_to_execute);
> +
> +err:
> +
> +  if (fd >= 0)
> +  {
> +    end_io_cache(&rlog);
> +    mysql_file_close(fd, MYF(MY_WME));
> +  }
> +  if (errmsg)
> +    sql_print_error("Error reading relay log event: %s", errmsg);
> +  return err;
> +}
> +
> +
>  pthread_handler_t
>  handle_rpl_parallel_thread(void *arg)
>  {

Regards,
Sergei


Follow ups

References