← Back to team overview

maria-developers team mailing list archive

Re: [Commits] b428e82: MDEV-7974 backport fix for mysql bug#12161 (XA and binlog).

 

Привет, Алексей!

Я просматриваю патч. Где-нить через 2-3 часа должны быть комментарии от
меня. Можно будет созвониться. И сразу - полагаю, ты сейчас прямо
усердно трудишься над тестами (в патче не присутствующими).

Andrei.

holyfoot@xxxxxxxxxxxx (Alexey Botchkov) writes:

> revision-id: b428e822da09b2bc82a2447332bb980f93c80262 (mariadb-10.4.1-103-gb428e82)
> parent(s): 8aae31cf494678b6253031c627566e50bc666920
> committer: Alexey Botchkov
> timestamp: 2019-02-14 02:46:57 +0400
> message:
>
> MDEV-7974 backport fix for mysql bug#12161 (XA and binlog).
>
> XA transactions now are kept persistent after prepare.
> XA_prepare_log_event implemented, and XA transactions are logged
> as XA transactions.
>
> ---
>  sql/handler.cc     |   9 ++
>  sql/handler.h      |  10 ++
>  sql/log.cc         | 115 +++++++++++++---
>  sql/log.h          |  10 ++
>  sql/log_event.cc   | 397 +++++++++++++++++++++++++++++++++++++++++++++++++++--
>  sql/log_event.h    |  81 +++++++++++
>  sql/sql_class.cc   |  18 +--
>  sql/sql_class.h    |  20 ++-
>  sql/sql_connect.cc |   1 +
>  sql/transaction.cc |  69 +++++++++-
>  sql/transaction.h  |   1 +
>  11 files changed, 691 insertions(+), 40 deletions(-)
>
> diff --git a/sql/handler.cc b/sql/handler.cc
> index 001055c..3b2a3e0 100644
> --- a/sql/handler.cc
> +++ b/sql/handler.cc
> @@ -1214,6 +1214,9 @@ static int prepare_or_error(handlerton *ht, THD *thd, bool all)
>  }
>  
>  
> +/*static inline */int
> +binlog_commit_flush_xid_caches(THD *thd, binlog_cache_mngr *cache_mngr,
> +                                   bool all, my_xid xid);
>  /**
>    @retval
>      0   ok
> @@ -1225,6 +1228,7 @@ int ha_prepare(THD *thd)
>    int error=0, all=1;
>    THD_TRANS *trans=all ? &thd->transaction.all : &thd->transaction.stmt;
>    Ha_trx_info *ha_info= trans->ha_list;
> +
>    DBUG_ENTER("ha_prepare");
>  
>    if (ha_info)
> @@ -1250,6 +1254,11 @@ int ha_prepare(THD *thd)
>  
>        }
>      }
> +    if (unlikely(tc_log->log_prepare(thd, all)))
> +    {
> +      ha_rollback_trans(thd, all);
> +      error=1;
> +    }
>    }
>  
>    DBUG_RETURN(error);
> diff --git a/sql/handler.h b/sql/handler.h
> index fc6246c..613c1c3 100644
> --- a/sql/handler.h
> +++ b/sql/handler.h
> @@ -810,6 +810,16 @@ struct xid_t {
>    long gtrid_length;
>    long bqual_length;
>    char data[XIDDATASIZE];  // not \0-terminated !
> +  /*
> +    The size of the string containing serialized Xid representation
> +    is computed as a sum of
> +    eight as the number of formatting symbols (X'',X'',)
> +    plus 2 x XIDDATASIZE (2 due to hex format),
> +    plus space for decimal digits of XID::formatID,
> +    plus one for 0x0.
> +  */
> +  static const uint ser_buf_size=
> +    8 + 2 * XIDDATASIZE + 4 * sizeof(long) + 1;
>  
>    xid_t() {}                                /* Remove gcc warning */
>    bool eq(struct xid_t *xid)
> diff --git a/sql/log.cc b/sql/log.cc
> index a56117a..316b871 100644
> --- a/sql/log.cc
> +++ b/sql/log.cc
> @@ -87,6 +87,9 @@ static bool binlog_savepoint_rollback_can_release_mdl(handlerton *hton,
>  static int binlog_commit(handlerton *hton, THD *thd, bool all);
>  static int binlog_rollback(handlerton *hton, THD *thd, bool all);
>  static int binlog_prepare(handlerton *hton, THD *thd, bool all);
> +static int binlog_xa_recover(handlerton *hton, XID *xid_list, uint len);
> +static int binlog_commit_by_xid(handlerton *hton, XID *xid);
> +static int binlog_rollback_by_xid(handlerton *hton, XID *xid);
>  static int binlog_start_consistent_snapshot(handlerton *hton, THD *thd);
>  
>  static const LEX_CSTRING write_error_msg=
> @@ -1688,6 +1691,9 @@ int binlog_init(void *p)
>    binlog_hton->commit= binlog_commit;
>    binlog_hton->rollback= binlog_rollback;
>    binlog_hton->prepare= binlog_prepare;
> +  binlog_hton->recover= binlog_xa_recover;
> +  binlog_hton->commit_by_xid = binlog_commit_by_xid;
> +  binlog_hton->rollback_by_xid = binlog_rollback_by_xid;
>    binlog_hton->start_consistent_snapshot= binlog_start_consistent_snapshot;
>    binlog_hton->flags= HTON_NOT_USER_SELECTABLE | HTON_HIDDEN;
>    return 0;
> @@ -1883,23 +1889,16 @@ static inline int
>  binlog_commit_flush_xid_caches(THD *thd, binlog_cache_mngr *cache_mngr,
>                                 bool all, my_xid xid)
>  {
> -  if (xid)
> -  {
> -    Xid_log_event end_evt(thd, xid, TRUE);
> -    return (binlog_flush_cache(thd, cache_mngr, &end_evt, all, TRUE, TRUE));
> -  }
> -  else
> +  /* Mask XA COMMIT ... ONE PHASE as plain BEGIN ... COMMIT */
> +  if (!xid)
>    {
> -    /*
> -      Empty xid occurs in XA COMMIT ... ONE PHASE.
> -      In this case, we do not have a MySQL xid for the transaction, and the
> -      external XA transaction coordinator will have to handle recovery if
> -      needed. So we end the transaction with a plain COMMIT query event.
> -    */
> -    Query_log_event end_evt(thd, STRING_WITH_LEN("COMMIT"),
> -                            TRUE, TRUE, TRUE, 0);
> -    return (binlog_flush_cache(thd, cache_mngr, &end_evt, all, TRUE, TRUE));
> +    DBUG_ASSERT(thd->transaction.xid_state.xa_state == XA_IDLE &&
> +                thd->lex->xa_opt == XA_ONE_PHASE);
> +    xid= thd->query_id;
>    }
> +
> +  Xid_log_event end_evt(thd, xid, TRUE);
> +  return (binlog_flush_cache(thd, cache_mngr, &end_evt, all, TRUE, TRUE));
>  }
>  
>  /**
> @@ -1961,11 +1960,77 @@ static int binlog_prepare(handlerton *hton, THD *thd, bool all)
>      do nothing.
>      just pretend we can do 2pc, so that MySQL won't
>      switch to 1pc.
> -    real work will be done in MYSQL_BIN_LOG::log_and_order()
> +    real work is done in MYSQL_BIN_LOG::log_and_order()
>    */
>    return 0;
>  }
>  
> +
> +static int serialize_xid(XID *xid, char *buf)
> +{
> +  size_t size;
> +  buf[0]= '\'';
> +  memcpy(buf+1, xid->data, xid->gtrid_length);
> +  size= xid->gtrid_length + 2;
> +  buf[size-1]= '\'';
> +  if (xid->bqual_length == 0 && xid->formatID == 1)
> +    return size;
> +
> +  memcpy(buf+size, ", '", 3);
> +  memcpy(buf+size+3, xid->data+xid->gtrid_length, xid->bqual_length);
> +  size+= 3 + xid->bqual_length;
> +  buf[size]= '\'';
> +  size++;
> +  if (xid->formatID != 1)
> +    size+= sprintf(buf+size, ", %ld", xid->formatID);
> +  return size;
> +}
> +
> +
> +static int binlog_xa_recover(handlerton *hton __attribute__((unused)),
> +                             XID *xid_list __attribute__((unused)),
> +                             uint len __attribute__((unused)))
> +{
> +  /* Does nothing. */
> +  return 0;
> +}
> +
> +
> +static int binlog_commit_by_xid(handlerton *hton, XID *xid)
> +{
> +  THD *thd= current_thd;
> +  const size_t xc_len= sizeof("XA COMMIT ") - 1; // do not count trailing 0
> +  char buf[xc_len + xid_t::ser_buf_size];
> +  size_t buflen;
> +  binlog_cache_mngr *const cache_mngr= thd->binlog_setup_trx_data();
> +
> +  DBUG_ASSERT(thd->lex->sql_command == SQLCOM_XA_COMMIT);
> +
> +  if (!cache_mngr)
> +    return 1;
> +
> +  memcpy(buf, "XA COMMIT ", xc_len);
> +  buflen= xc_len + serialize_xid(xid, buf+xc_len);
> +  Query_log_event qe(thd, buf, buflen, TRUE, FALSE, FALSE, 0);
> +  return binlog_flush_cache(thd, cache_mngr, &qe, TRUE, TRUE, TRUE); 
> +}
> +
> +
> +static int binlog_rollback_by_xid(handlerton *hton, XID *xid)
> +{
> +  THD *thd= current_thd;
> +  const size_t xr_len= sizeof("XA ROLLBACK ") - 1; // do not count trailing 0
> +  char buf[xr_len + xid_t::ser_buf_size];
> +  size_t buflen;
> +
> +  DBUG_ASSERT(thd->lex->sql_command == SQLCOM_XA_ROLLBACK);
> +  memcpy(buf, "XA ROLLBACK ", xr_len);
> +  buflen= xr_len + serialize_xid(xid, buf+xr_len);
> +  Query_log_event qe(thd, buf, buflen, FALSE, TRUE, TRUE, 0);
> +  return mysql_bin_log.write_event(&qe);
> +}
> +
> +
>  /*
>    We flush the cache wrapped in a beging/rollback if:
>      . aborting a single or multi-statement transaction and;
> @@ -9809,6 +9874,24 @@ int TC_LOG_BINLOG::unlog(ulong cookie, my_xid xid)
>    DBUG_RETURN(BINLOG_COOKIE_GET_ERROR_FLAG(cookie));
>  }
>  
> +
> +int TC_LOG_BINLOG::log_prepare(THD *thd, bool all)
> +{
> +  XID *xid= &thd->transaction.xid_state.xid;
> +  XA_prepare_log_event end_evt(thd, xid, FALSE);
> +  binlog_cache_mngr *cache_mngr= thd->binlog_setup_trx_data();
> +
> +  if (!cache_mngr)
> +  {
> +    WSREP_DEBUG("Skipping empty log_xid: %s", thd->query());
> +    return 0;
> +  }
> +
> +  cache_mngr->using_xa= FALSE;
> +  return (binlog_flush_cache(thd, cache_mngr, &end_evt, all, TRUE, TRUE));
> +}
> +
> +
>  void
>  TC_LOG_BINLOG::commit_checkpoint_notify(void *cookie)
>  {
> diff --git a/sql/log.h b/sql/log.h
> index 7dfdb36..92fdf95 100644
> --- a/sql/log.h
> +++ b/sql/log.h
> @@ -61,6 +61,7 @@ class TC_LOG
>                              bool need_prepare_ordered,
>                              bool need_commit_ordered) = 0;
>    virtual int unlog(ulong cookie, my_xid xid)=0;
> +  virtual int log_prepare(THD *thd, bool all)= 0;
>    virtual void commit_checkpoint_notify(void *cookie)= 0;
>  
>  protected:
> @@ -115,6 +116,10 @@ class TC_LOG_DUMMY: public TC_LOG // use it to disable the logging
>      return 1;
>    }
>    int unlog(ulong cookie, my_xid xid)  { return 0; }
> +  int log_prepare(THD *thd, bool all)
> +  {
> +    return 0;
> +  }
>    void commit_checkpoint_notify(void *cookie) { DBUG_ASSERT(0); };
>  };
>  
> @@ -198,6 +203,10 @@ class TC_LOG_MMAP: public TC_LOG
>    int log_and_order(THD *thd, my_xid xid, bool all,
>                      bool need_prepare_ordered, bool need_commit_ordered);
>    int unlog(ulong cookie, my_xid xid);
> +  int log_prepare(THD *thd, bool all)
> +  {
> +    return 0;
> +  }
>    void commit_checkpoint_notify(void *cookie);
>    int recover();
>  
> @@ -698,6 +707,7 @@ class MYSQL_BIN_LOG: public TC_LOG, private MYSQL_LOG
>    int log_and_order(THD *thd, my_xid xid, bool all,
>                      bool need_prepare_ordered, bool need_commit_ordered);
>    int unlog(ulong cookie, my_xid xid);
> +  int log_prepare(THD *thd, bool all);
>    void commit_checkpoint_notify(void *cookie);
>    int recover(LOG_INFO *linfo, const char *last_log_name, IO_CACHE *first_log,
>                Format_description_log_event *fdle, bool do_xa);
> diff --git a/sql/log_event.cc b/sql/log_event.cc
> index 7a0d0be..354c5f3 100644
> --- a/sql/log_event.cc
> +++ b/sql/log_event.cc
> @@ -2139,6 +2139,9 @@ Log_event* Log_event::read_log_event(const char* buf, uint event_len,
>      case XID_EVENT:
>        ev = new Xid_log_event(buf, fdle);
>        break;
> +    case XA_PREPARE_LOG_EVENT:
> +      ev = new XA_prepare_log_event(buf, fdle);
> +      break;
>      case RAND_EVENT:
>        ev = new Rand_log_event(buf, fdle);
>        break;
> @@ -2190,7 +2193,6 @@ Log_event* Log_event::read_log_event(const char* buf, uint event_len,
>      case PREVIOUS_GTIDS_LOG_EVENT:
>      case TRANSACTION_CONTEXT_EVENT:
>      case VIEW_CHANGE_EVENT:
> -    case XA_PREPARE_LOG_EVENT:
>        ev= new Ignorable_log_event(buf, fdle,
>                                    get_type_str((Log_event_type) event_type));
>        break;
> @@ -6222,6 +6224,7 @@ Format_description_log_event(uint8 binlog_ver, const char* server_ver)
>        post_header_len[USER_VAR_EVENT-1]= USER_VAR_HEADER_LEN;
>        post_header_len[FORMAT_DESCRIPTION_EVENT-1]= FORMAT_DESCRIPTION_HEADER_LEN;
>        post_header_len[XID_EVENT-1]= XID_HEADER_LEN;
> +      post_header_len[XA_PREPARE_LOG_EVENT-1]= XA_PREPARE_HEADER_LEN;
>        post_header_len[BEGIN_LOAD_QUERY_EVENT-1]= BEGIN_LOAD_QUERY_HEADER_LEN;
>        post_header_len[EXECUTE_LOAD_QUERY_EVENT-1]= EXECUTE_LOAD_QUERY_HEADER_LEN;
>        /*
> @@ -7874,7 +7877,7 @@ Gtid_log_event::Gtid_log_event(const char *buf, uint event_len,
>    buf+= 8;
>    domain_id= uint4korr(buf);
>    buf+= 4;
> -  flags2= *buf;
> +  flags2= *(buf++);
>    if (flags2 & FL_GROUP_COMMIT_ID)
>    {
>      if (event_len < (uint)header_size + GTID_HEADER_LEN + 2)
> @@ -7882,8 +7885,22 @@ Gtid_log_event::Gtid_log_event(const char *buf, uint event_len,
>        seq_no= 0;                                // So is_valid() returns false
>        return;
>      }
> -    ++buf;
>      commit_id= uint8korr(buf);
> +    buf+= 8;
> +  }
> +  if (flags2 & FL_XA_TRANSACTION)
> +  {
> +    xid.formatID= (long) buf[0];
> +    xid.gtrid_length= (long) buf[1];
> +    xid.bqual_length= (long) buf[2];
> +
> +    buf+= 3;
> +    if (xid.formatID >= 0)
> +    {
> +      long data_length= xid.bqual_length + xid.gtrid_length;
> +      memcpy(xid.data, buf, data_length);
> +      buf+= data_length;
> +    }
>    }
>  }
>  
> @@ -7914,6 +7931,12 @@ Gtid_log_event::Gtid_log_event(THD *thd_arg, uint64 seq_no_arg,
>    /* Preserve any DDL or WAITED flag in the slave's binlog. */
>    if (thd_arg->rgi_slave)
>      flags2|= (thd_arg->rgi_slave->gtid_ev_flags2 & (FL_DDL|FL_WAITED));
> +  if (thd->transaction.xid_state.xa_state == XA_IDLE &&
> +      thd->lex->xa_opt != XA_ONE_PHASE)
> +  {
> +    flags2|= FL_XA_TRANSACTION;
> +    xid= thd->transaction.xid_state.xid;
> +  }
>  }
>  
>  
> @@ -7956,7 +7979,7 @@ Gtid_log_event::peek(const char *event_start, size_t event_len,
>  bool
>  Gtid_log_event::write()
>  {
> -  uchar buf[GTID_HEADER_LEN+2];
> +  uchar buf[GTID_HEADER_LEN+2+sizeof(XID)];
>    size_t write_len;
>  
>    int8store(buf, seq_no);
> @@ -7968,8 +7991,25 @@ Gtid_log_event::write()
>      write_len= GTID_HEADER_LEN + 2;
>    }
>    else
> +    write_len= 13;
> +
> +  if (flags2 & FL_XA_TRANSACTION)
> +  {
> +    buf[write_len]= (uchar) ((char) xid.formatID);
> +    buf[write_len+1]= (uchar) xid.gtrid_length;
> +    buf[write_len+2]= (uchar) xid.bqual_length;
> +    write_len+= 3;
> +    if (xid.formatID >= 0)
> +    {
> +      long data_length= xid.bqual_length + xid.gtrid_length;
> +      memcpy(buf+write_len, xid.data, data_length);
> +      write_len+= data_length;
> +    }
> +  }
> +
> +  if (write_len < GTID_HEADER_LEN)
>    {
> -    bzero(buf+13, GTID_HEADER_LEN-13);
> +    bzero(buf+write_len, GTID_HEADER_LEN-write_len);
>      write_len= GTID_HEADER_LEN;
>    }
>    return write_header(write_len) ||
> @@ -8012,7 +8052,7 @@ Gtid_log_event::make_compatible_event(String *packet, bool *need_dummy_event,
>  void
>  Gtid_log_event::pack_info(Protocol *protocol)
>  {
> -  char buf[6+5+10+1+10+1+20+1+4+20+1];
> +  char buf[6+5+10+1+10+1+20+1+4+20+1+5+128];
>    char *p;
>    p = strmov(buf, (flags2 & FL_STANDALONE ? "GTID " : "BEGIN GTID "));
>    p= longlong10_to_str(domain_id, p, 10);
> @@ -8026,6 +8066,12 @@ Gtid_log_event::pack_info(Protocol *protocol)
>      p= longlong10_to_str(commit_id, p, 10);
>    }
>  
> +  if (flags2 & FL_XA_TRANSACTION)
> +  {
> +    p= strmov(p, " XID :");
> +    p= strnmov(p, xid.data, xid.bqual_length + xid.gtrid_length);
> +  }
> +
>    protocol->store(buf, p-buf, &my_charset_bin);
>  }
>  
> @@ -8079,11 +8125,25 @@ Gtid_log_event::do_apply_event(rpl_group_info *rgi)
>    thd->lex->sql_command= SQLCOM_BEGIN;
>    thd->is_slave_error= 0;
>    status_var_increment(thd->status_var.com_stat[thd->lex->sql_command]);
> -  if (trans_begin(thd, 0))
> +  if (flags2 & FL_XA_TRANSACTION)
>    {
> -    DBUG_PRINT("error", ("trans_begin() failed"));
> -    thd->is_slave_error= 1;
> +    thd->lex->xid= &xid;
> +    thd->lex->xa_opt= XA_NONE;
> +    if (trans_xa_start(thd))
> +    {
> +      DBUG_PRINT("error", ("trans_xa_start() failed"));
> +      thd->is_slave_error= 1;
> +    }
> +  }
> +  else
> +  {
> +    if (trans_begin(thd, 0))
> +    {
> +      DBUG_PRINT("error", ("trans_begin() failed"));
> +      thd->is_slave_error= 1;
> +    }
>    }
> +
>    thd->update_stats();
>  
>    if (likely(!thd->is_slave_error))
> @@ -8202,9 +8262,29 @@ Gtid_log_event::print(FILE *file, PRINT_EVENT_INFO *print_event_info)
>                        buf, print_event_info->delimiter))
>          goto err;
>    }
> -  if (!(flags2 & FL_STANDALONE))
> -    if (my_b_printf(&cache, is_flashback ? "COMMIT\n%s\n" : "BEGIN\n%s\n", print_event_info->delimiter))
> +  if ((flags2 & FL_XA_TRANSACTION) && !is_flashback)
> +  {
> +    my_b_write_string(&cache, "XA START '");
> +    my_b_write(&cache, (uchar *) xid.data, xid.gtrid_length);
> +    my_b_write_string(&cache, "'");
> +    if (xid.bqual_length > 0 || xid.formatID != 1)
> +    {
> +      my_b_write_string(&cache, ", '");
> +      my_b_write(&cache, (uchar *) xid.data+xid.gtrid_length, xid.bqual_length);
> +      my_b_write_string(&cache, "'");
> +      if (xid.formatID != 1)
> +        if (my_b_printf(&cache, ", %d", xid.formatID))
> +          goto err;
> +    }
> +    if (my_b_printf(&cache, "%s\n", print_event_info->delimiter))
> +      goto err;
> +  }
> +  else if (!(flags2 & FL_STANDALONE))
> +  {
> +    if (my_b_printf(&cache, is_flashback ? "COMMIT\n%s\n" : "BEGIN\n%s\n",
> +                    print_event_info->delimiter))
>        goto err;
> +  }
>  
>    return cache.flush_data();
>  err:
> @@ -9003,6 +9083,300 @@ Xid_log_event::do_shall_skip(rpl_group_info *rgi)
>  #endif /* !MYSQL_CLIENT */
>  
>  
> +/**
> +  Function serializes XID which is characterized by the four last arguments
> +  of the function.
> +  Serialized XID is presented in valid hex format and is returned to
> +  the caller in a buffer pointed to by the first argument.
> +  The buffer size provided by the caller must be not less than
> +  8 + 2 * XIDDATASIZE + 4 * sizeof(XID::formatID) + 1, see
> +  XID::serialize_xid() that is a caller and plugin.h for XID declaration.
> +
> +  @param buf  pointer to a buffer allocated for storing serialized data
> +
> +  @return  the value of the buffer pointer
> +*/
> +
> +char *XA_prepare_log_event::event_xid_t::serialize(char *buf) const
> +{
> +  int i;
> +  char *c= buf;
> +  /*
> +    Build a string like following pattern:
> +      X'hex11hex12...hex1m',X'hex21hex22...hex2n',11
> +    and store it into buf.
> +    Here hex1i and hex2k are hexadecimals representing XID's internal
> +    raw bytes (1 <= i <= m, 1 <= k <= n), and `m' and `n' even numbers
> +    half of which corresponding to the lengths of XID's components.
> +  */
> +  c[0]= 'X';
> +  c[1]= '\'';
> +  c+= 2;
> +  for (i= 0; i < gtrid_length; i++)
> +  {
> +    c[0]=_dig_vec_lower[((uchar*) data)[i] >> 4];
> +    c[1]=_dig_vec_lower[((uchar*) data)[i] & 0x0f];
> +    c+= 2;
> +  }
> +  c[0]= '\'';
> +  c[1]= ',';
> +  c[2]= 'X';
> +  c[3]= '\'';
> +  c+= 4;
> +
> +  for (; i < gtrid_length + bqual_length; i++)
> +  {
> +    c[0]=_dig_vec_lower[((uchar*) data)[i] >> 4];
> +    c[1]=_dig_vec_lower[((uchar*) data)[i] & 0x0f];
> +    c+= 2;
> +  }
> +  c[0]= '\'';
> +  sprintf(c+1, ",%lu", formatID);
> +
> + return buf;
> +}
> +
> +
> +/**************************************************************************
> +  XA_prepare_log_event methods
> +**************************************************************************/
> +/**
> +  @note
> +  It's ok not to use int8store here,
> +  as long as xid_t::set(ulonglong) and
> +  xid_t::get_n_xid doesn't do it either.
> +  We don't care about actual values of xids as long as
> +  identical numbers compare identically
> +*/
> +
> +XA_prepare_log_event::
> +XA_prepare_log_event(const char* buf,
> +                     const Format_description_log_event *description_event)
> +  :Log_event(buf, description_event)
> +{
> +  uint32 temp= 0;
> +  uint8 temp_byte;
> +
> +  buf+= description_event->common_header_len +
> +    description_event->post_header_len[XA_PREPARE_LOG_EVENT-1];
> +  memcpy(&temp_byte, buf, 1);
> +  one_phase= (bool) temp_byte;
> +  buf += sizeof(temp_byte);
> +  memcpy(&temp, buf, sizeof(temp));
> +  m_xid.formatID= le32toh(temp);
> +  buf += sizeof(temp);
> +  memcpy(&temp, buf, sizeof(temp));
> +  m_xid.gtrid_length= le32toh(temp);
> +  buf += sizeof(temp);
> +  memcpy(&temp, buf, sizeof(temp));
> +  m_xid.bqual_length= le32toh(temp);
> +  buf += sizeof(temp);
> +  memcpy(m_xid.data, buf, m_xid.gtrid_length + m_xid.bqual_length);
> +
> +  xid= NULL;
> +}
> +
> +
> +#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
> +void XA_prepare_log_event::pack_info(Protocol *protocol)
> +{
> +  char buf[ser_buf_size];
> +  char query[sizeof("XA COMMIT ONE PHASE") + 1 + sizeof(buf)];
> +
> +  /* RHS of the following assert is unknown to client sources */
> +  compile_time_assert(ser_buf_size == XID::ser_buf_size);
> +  m_xid.serialize(buf);
> +  sprintf(query,
> +          (one_phase ? "XA COMMIT %s ONE PHASE" :  "XA PREPARE %s"),
> +          buf);
> +
> +  protocol->store(query, strlen(query), &my_charset_bin);
> +}
> +#endif
> +
> +
> +#ifndef MYSQL_CLIENT
> +bool XA_prepare_log_event::write()
> +{
> +  uchar data[1 + 4 + 4 + 4];
> +  uint8 one_phase_byte= one_phase;
> +
> +  data[0]= one_phase;
> +  int4store(data+1, static_cast<XID*>(xid)->formatID);
> +  int4store(data+(1+4), static_cast<XID*>(xid)->gtrid_length);
> +  int4store(data+(1+4+4), static_cast<XID*>(xid)->bqual_length);
> +
> +  DBUG_ASSERT(xid_bufs_size == sizeof(data) - 1);
> +
> +  return write_header(sizeof(one_phase_byte) + xid_bufs_size +
> +                      static_cast<XID*>(xid)->gtrid_length +
> +                      static_cast<XID*>(xid)->bqual_length) ||
> +         write_data(data, sizeof(data)) ||
> +         write_data((uchar*) static_cast<XID*>(xid)->data,
> +                     static_cast<XID*>(xid)->gtrid_length +
> +                     static_cast<XID*>(xid)->bqual_length) ||
> +         write_footer();
> +}
> +#endif
> +
> +
> +#ifdef MYSQL_CLIENT
> +bool XA_prepare_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info)
> +{
> +  Write_on_release_cache cache(&print_event_info->head_cache, file,
> +                               Write_on_release_cache::FLUSH_F, this);
> +  char buf[ser_buf_size];
> +
> +  m_xid.serialize(buf);
> +
> +  if (!print_event_info->short_form)
> +  {
> +    print_header(&cache, print_event_info, FALSE);
> +    if (my_b_printf(&cache, "\tXID = %s\n", buf))
> +      goto error;
> +  }
> +
> +  if (my_b_printf(&cache, "XA END %s\n%s\n",
> +                   buf, print_event_info->delimiter) ||
> +    my_b_printf(&cache, "XA PREPARE %s\n%s\n",
> +                   buf, print_event_info->delimiter))
> +    goto error;
> +
> +  return cache.flush_data();
> +error:
> +  return TRUE;
> +}
> +#endif /* MYSQL_CLIENT */
> +
> +
> +#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
> +int XA_prepare_log_event::do_apply_event(rpl_group_info *rgi)
> +{
> +  bool res;
> +  int err;
> +  rpl_gtid gtid;
> +  uint64 sub_id= 0;
> +  Relay_log_info const *rli= rgi->rli;
> +  xid_t xid;
> +  void *hton= NULL;
> +
> +  /*
> +    XID_EVENT works like a COMMIT statement. And it also updates the
> +    mysql.gtid_slave_pos table with the GTID of the current transaction.
> +
> +    Therefore, it acts much like a normal SQL statement, so we need to do
> +    THD::reset_for_next_command() as if starting a new statement.
> +  */
> +  thd->reset_for_next_command();
> +  /*
> +    Record any GTID in the same transaction, so slave state is transactionally
> +    consistent.
> +  */
> +#ifdef WITH_WSREP
> +  thd->wsrep_affected_rows= 0;
> +#endif
> +
> +  if (rgi->gtid_pending)
> +  {
> +    xa_states c_state= thd->transaction.xid_state.xa_state;
> +    sub_id= rgi->gtid_sub_id;
> +    rgi->gtid_pending= false;
> +
> +    gtid= rgi->current_gtid;
> +
> +    thd->transaction.xid_state.xa_state= XA_ACTIVE;
> +    err= rpl_global_gtid_slave_state->record_gtid(thd, &gtid, sub_id, true,
> +                                                  false, &hton);
> +    thd->transaction.xid_state.xa_state= c_state;
> +    if (err)
> +    {
> +      int ec= thd->get_stmt_da()->sql_errno();
> +      /*
> +        Do not report an error if this is really a kill due to a deadlock.
> +        In this case, the transaction will be re-tried instead.
> +      */
> +      if (!is_parallel_retry_error(rgi, ec))
> +        rli->report(ERROR_LEVEL, ER_CANNOT_UPDATE_GTID_STATE, rgi->gtid_info(),
> +                    "Error during XID COMMIT: failed to update GTID state in "
> +                    "%s.%s: %d: %s",
> +                    "mysql", rpl_gtid_slave_state_table_name.str, ec,
> +                    thd->get_stmt_da()->message());
> +      thd->is_slave_error= 1;
> +      return err;
> +    }
> +
> +    DBUG_EXECUTE_IF("gtid_fail_after_record_gtid",
> +        { my_error(ER_ERROR_DURING_COMMIT, MYF(0), HA_ERR_WRONG_COMMAND);
> +          thd->is_slave_error= 1;
> +          return 1;
> +        });
> +  }
> +  /* For a slave XA_prepare_log_event is COMMIT */
> +  general_log_print(thd, COM_QUERY,
> +                    "COMMIT /* implicit, from Xid_log_event */");
> +  thd->variables.option_bits&= ~OPTION_GTID_BEGIN;
> +
> +  xid.set(m_xid.formatID,
> +          m_xid.data, m_xid.gtrid_length,
> +          m_xid.data + m_xid.gtrid_length, m_xid.bqual_length);
> +
> +  thd->lex->xid= &xid;
> +  if (trans_xa_end(thd))
> +    return 1;
> +
> +  if (!one_phase)
> +  {
> +    res= trans_xa_prepare(thd);
> +  }
> +  else
> +  {
> +    res= trans_xa_commit(thd);
> +    thd->mdl_context.release_transactional_locks();
> + }
> +
> +
> +  if (!res && sub_id)
> +    rpl_global_gtid_slave_state->update_state_hash(sub_id, &gtid, hton, rgi);
> +
> +  /*
> +    Increment the global status commit count variable
> +  */
> +  status_var_increment(thd->status_var.com_stat[SQLCOM_COMMIT]);
> +
> +  return res;
> +}
> +
> +
> +Log_event::enum_skip_reason
> +XA_prepare_log_event::do_shall_skip(rpl_group_info *rgi)
> +{
> +  DBUG_ENTER("Xid_log_event::do_shall_skip");
> +  if (rgi->rli->slave_skip_counter > 0)
> +  {
> +    DBUG_ASSERT(!rgi->rli->get_flag(Relay_log_info::IN_TRANSACTION));
> +    thd->variables.option_bits&= ~(OPTION_BEGIN | OPTION_GTID_BEGIN);
> +    DBUG_RETURN(Log_event::EVENT_SKIP_COUNT);
> +  }
> +#ifdef WITH_WSREP
> +  else if (wsrep_mysql_replication_bundle && WSREP_ON &&
> +           opt_slave_domain_parallel_threads == 0)
> +  {
> +    if (++thd->wsrep_mysql_replicated < (int)wsrep_mysql_replication_bundle)
> +    {
> +      WSREP_DEBUG("skipping wsrep commit %d", thd->wsrep_mysql_replicated);
> +      DBUG_RETURN(Log_event::EVENT_SKIP_IGNORE);
> +    }
> +    else
> +    {
> +      thd->wsrep_mysql_replicated = 0;
> +    }
> +  }
> +#endif
> +  DBUG_RETURN(Log_event::do_shall_skip(rgi));
> +}
> +#endif /* !MYSQL_CLIENT */
> +
> +
>  /**************************************************************************
>    User_var_log_event methods
>  **************************************************************************/
> @@ -14789,7 +15163,6 @@ bool event_that_should_be_ignored(const char *buf)
>        event_type == PREVIOUS_GTIDS_LOG_EVENT ||
>        event_type == TRANSACTION_CONTEXT_EVENT ||
>        event_type == VIEW_CHANGE_EVENT ||
> -      event_type == XA_PREPARE_LOG_EVENT ||
>        (uint2korr(buf + FLAGS_OFFSET) & LOG_EVENT_IGNORABLE_F))
>      return 1;
>    return 0;
> diff --git a/sql/log_event.h b/sql/log_event.h
> index 38a40c9..b5c48c9 100644
> --- a/sql/log_event.h
> +++ b/sql/log_event.h
> @@ -217,6 +217,7 @@ class String;
>  #define GTID_HEADER_LEN       19
>  #define GTID_LIST_HEADER_LEN   4
>  #define START_ENCRYPTION_HEADER_LEN 0
> +#define XA_PREPARE_HEADER_LEN 0
>  
>  /* 
>    Max number of possible extra bytes in a replication event compared to a
> @@ -3064,6 +3065,79 @@ class Xid_log_event: public Log_event
>  #endif
>  };
>  
> +
> +/**
> +  @class XA_prepare_log_event
> +
> +  Similar to Xid_log_event except that
> +  - it is specific to XA transaction
> +  - it carries out the prepare logics rather than the final committing
> +    when @c one_phase member is off.
> +  From the grouping perspective the event finalizes the current "prepare" group
> +  started with XA START Query-log-event.
> +  When @c one_phase is false, Commit or Rollback for the XA transaction are
> +  logged separately from the prepare-group events, thus forming groups of
> +  their own.
> +*/
> +
> +class XA_prepare_log_event: public Log_event
> +{
> +protected:
> +  /* The event_xid_t members were copied from handler.h */
> +  struct event_xid_t
> +  {
> +    long formatID;
> +    long gtrid_length;
> +    long bqual_length;
> +    char data[MYSQL_XIDDATASIZE];  // not \0-terminated !
> +    char *serialize(char *buf) const;
> +  };
> +
> +  /* size of serialization buffer is explained in $MYSQL/sql/xa.h. */
> +  static const uint ser_buf_size=
> +    8 + 2 * MYSQL_XIDDATASIZE + 4 * sizeof(long) + 1;
> +
> +  /* Total size of buffers to hold serialized members of XID struct */
> +  static const int xid_bufs_size= 12;
> +  event_xid_t m_xid;
> +  void *xid;
> +  bool one_phase;
> +
> +public:
> +#ifdef MYSQL_SERVER
> +  XA_prepare_log_event(THD* thd_arg, XID *xid_arg, bool one_phase_arg):
> +    Log_event(thd_arg, 0, TRUE), xid(xid_arg), one_phase(one_phase_arg)
> +  {
> +    cache_type= Log_event::EVENT_NO_CACHE;
> +  }
> +#ifdef HAVE_REPLICATION
> +  void pack_info(Protocol* protocol);
> +#endif /* HAVE_REPLICATION */
> +#else
> +  bool print(FILE* file, PRINT_EVENT_INFO* print_event_info);
> +#endif
> +  XA_prepare_log_event(const char* buf,
> +                       const Format_description_log_event *description_event);
> +  ~XA_prepare_log_event() {}
> +  Log_event_type get_type_code() { return XA_PREPARE_LOG_EVENT; }
> +  int get_data_size()
> +  {
> +    return xid_bufs_size + m_xid.gtrid_length + m_xid.bqual_length;
> +  }
> +
> +#ifdef MYSQL_SERVER
> +  bool write();
> +#endif
> +  bool is_valid() const { return 1; }
> +
> +private:
> +#if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION)
> +  virtual int do_apply_event(rpl_group_info *rgi);
> +  enum_skip_reason do_shall_skip(rpl_group_info *rgi);
> +#endif
> +};
> +
> +
>  /**
>    @class User_var_log_event
>  
> @@ -3376,6 +3450,11 @@ class Gtid_log_event: public Log_event
>    uint64 seq_no;
>    uint64 commit_id;
>    uint32 domain_id;
> +#ifdef MYSQL_SERVER
> +   XID xid;
> +#else
> +   struct st_mysql_xid xid;
> +#endif
>    uchar flags2;
>  
>    /* Flags2. */
> @@ -3404,6 +3483,8 @@ class Gtid_log_event: public Log_event
>    static const uchar FL_WAITED= 16;
>    /* FL_DDL is set for event group containing DDL. */
>    static const uchar FL_DDL= 32;
> +  /* FL_XA_TRANSACTION is set for XA transaction. */
> +  static const uchar FL_XA_TRANSACTION= 64;
>  
>  #ifdef MYSQL_SERVER
>    Gtid_log_event(THD *thd_arg, uint64 seq_no, uint32 domain_id, bool standalone,
> diff --git a/sql/sql_class.cc b/sql/sql_class.cc
> index fa2f866..cc75da9 100644
> --- a/sql/sql_class.cc
> +++ b/sql/sql_class.cc
> @@ -1461,12 +1461,19 @@ void THD::cleanup(void)
>    DBUG_ASSERT(cleanup_done == 0);
>  
>    set_killed(KILL_CONNECTION);
> -#ifdef ENABLE_WHEN_BINLOG_WILL_BE_ABLE_TO_PREPARE
>    if (transaction.xid_state.xa_state == XA_PREPARED)
>    {
> -#error xid_state in the cache should be replaced by the allocated value
> +    trans_detach(this);
> +    transaction.xid_state.xa_state= XA_NOTR;
> +    transaction.xid_state.rm_error= 0;
> +  }
> +  else
> +  {
> +    transaction.xid_state.xa_state= XA_NOTR;
> +    transaction.xid_state.rm_error= 0;
> +    trans_rollback(this);
> +    xid_cache_delete(this, &transaction.xid_state);
>    }
> -#endif
>  
>    mysql_ha_cleanup(this);
>    locked_tables_list.unlock_locked_tables(this);
> @@ -1474,11 +1481,6 @@ void THD::cleanup(void)
>    delete_dynamic(&user_var_events);
>    close_temporary_tables();
>  
> -  transaction.xid_state.xa_state= XA_NOTR;
> -  transaction.xid_state.rm_error= 0;
> -  trans_rollback(this);
> -  xid_cache_delete(this, &transaction.xid_state);
> -
>    DBUG_ASSERT(open_tables == NULL);
>    /*
>      If the thread was in the middle of an ongoing transaction (rolled
> diff --git a/sql/sql_class.h b/sql/sql_class.h
> index 69fabee..76befcb 100644
> --- a/sql/sql_class.h
> +++ b/sql/sql_class.h
> @@ -1255,6 +1255,18 @@ typedef struct st_xid_state {
>    /* Error reported by the Resource Manager (RM) to the Transaction Manager. */
>    uint rm_error;
>    XID_cache_element *xid_cache_element;
> +  /*
> +    Binary logging status.
> +    It is set to TRUE at XA PREPARE if the transaction was written
> +    to the binlog.
> +    Naturally FALSE means the transaction was not written to
> +    the binlog. This happens if the transaction did not modify anything
> +    or binlogging was turned off. In that case we shouldn't binlog
> +    the subsequent XA COMMIT/ROLLBACK.
> +    A transaction recovered after server restart always sets it to TRUE.
> +    That can cause inconsistencies (should be fixed?).
> +  */
> +  bool is_binlogged;
>  
>    /**
>      Check that XA transaction has an uncommitted work. Report an error
> @@ -1278,6 +1290,12 @@ typedef struct st_xid_state {
>      }
>      return false;
>    }
> +
> +  void reset()
> +  {
> +    xid.null();
> +    is_binlogged= FALSE;
> +  }
>  } XID_STATE;
>  
>  void xid_cache_init(void);
> @@ -2603,7 +2621,7 @@ class THD :public Statement,
>          then.
>        */
>        if (!xid_state.rm_error)
> -        xid_state.xid.null();
> +        xid_state.reset();
>        free_root(&mem_root,MYF(MY_KEEP_PREALLOC));
>        DBUG_VOID_RETURN;
>      }
> diff --git a/sql/sql_connect.cc b/sql/sql_connect.cc
> index b48070b..3e4a067 100644
> --- a/sql/sql_connect.cc
> +++ b/sql/sql_connect.cc
> @@ -1414,6 +1414,7 @@ void do_handle_one_connection(CONNECT *connect)
>  #endif
>  end_thread:
>      close_connection(thd);
> +    thd->get_stmt_da()->reset_diagnostics_area();
>  
>      if (thd->userstat_running)
>        update_global_user_stats(thd, create_user, time(NULL));
> diff --git a/sql/transaction.cc b/sql/transaction.cc
> index 13614d3..64533d7 100644
> --- a/sql/transaction.cc
> +++ b/sql/transaction.cc
> @@ -790,6 +790,44 @@ bool trans_release_savepoint(THD *thd, LEX_CSTRING name)
>  
>  
>  /**
> +  Detach the current XA transaction.
> +
> +  @param thd     Current thread
> +
> +  @retval FALSE  Success
> +  @retval TRUE   Failure
> +*/
> +
> +bool trans_detach(THD *thd)
> +{
> +  XID_STATE *xid_s= &thd->transaction.xid_state;
> +  Ha_trx_info *ha_info, *ha_info_next;
> +
> +  DBUG_ENTER("trans_detach");
> +
> +//  DBUG_ASSERT(xid_s->xa_state == XA_PREPARED &&
> +//              xid_cache_search(thd, &xid_s->xid));
> +
> +  xid_cache_delete(thd, xid_s);
> +  if (xid_cache_insert(&xid_s->xid, XA_PREPARED))
> +    DBUG_RETURN(TRUE);
> +
> +  for (ha_info= thd->transaction.all.ha_list;
> +       ha_info;
> +       ha_info= ha_info_next)
> +  {
> +    ha_info_next= ha_info->next();
> +    ha_info->reset(); /* keep it conveniently zero-filled */
> +  }
> +
> +  thd->transaction.all.ha_list= 0;
> +  thd->transaction.all.no_2pc= 0;
> +
> +  DBUG_RETURN(FALSE);
> +}
> +
> +
> +/**
>    Starts an XA transaction with the given xid value.
>  
>    @param thd    Current thread
> @@ -928,6 +966,12 @@ bool trans_xa_commit(THD *thd)
>      res= !xs;
>      if (res)
>        my_error(ER_XAER_NOTA, MYF(0));
> +    else if (thd->in_multi_stmt_transaction_mode())
> +    {
> +      my_error(ER_XAER_RMFAIL, MYF(0),
> +                xa_state_names[thd->transaction.xid_state.xa_state]);
> +      res= TRUE;
> +    }
>      else
>      {
>        res= xa_trans_rolled_back(xs);
> @@ -978,8 +1022,16 @@ bool trans_xa_commit(THD *thd)
>      {
>        DEBUG_SYNC(thd, "trans_xa_commit_after_acquire_commit_lock");
>  
> -      res= MY_TEST(ha_commit_one_phase(thd, 1));
> -      if (res)
> +      if(WSREP_EMULATE_BINLOG(thd) || mysql_bin_log.is_open())
> +      {
> +        res= thd->binlog_query(THD::THD::STMT_QUERY_TYPE,
> +                               thd->query(), thd->query_length(),
> +                               FALSE, FALSE, FALSE, 0);
> +      }
> +      else
> +        res= 0;
> +
> +      if (res || (res= MY_TEST(ha_commit_one_phase(thd, 1))))
>          my_error(ER_XAER_RMERR, MYF(0));
>      }
>    }
> @@ -1044,7 +1096,18 @@ bool trans_xa_rollback(THD *thd)
>      DBUG_RETURN(TRUE);
>    }
>  
> -  res= xa_trans_force_rollback(thd);
> +  if(WSREP_EMULATE_BINLOG(thd) || mysql_bin_log.is_open())
> +  {
> +    res= thd->binlog_query(THD::THD::STMT_QUERY_TYPE,
> +        thd->query(), thd->query_length(),
> +        FALSE, FALSE, FALSE, 0);
> +  }
> +  else
> +    res= 0;
> +
> +  res= res || xa_trans_force_rollback(thd);
> +  if (res || (res= MY_TEST(xa_trans_force_rollback(thd))))
> +    my_error(ER_XAER_RMERR, MYF(0));
>  
>    thd->variables.option_bits&= ~(OPTION_BEGIN | OPTION_KEEP_LOG);
>    thd->transaction.all.reset();
> diff --git a/sql/transaction.h b/sql/transaction.h
> index 7e34693..f228cc6 100644
> --- a/sql/transaction.h
> +++ b/sql/transaction.h
> @@ -29,6 +29,7 @@ bool trans_commit(THD *thd);
>  bool trans_commit_implicit(THD *thd);
>  bool trans_rollback(THD *thd);
>  bool trans_rollback_implicit(THD *thd);
> +bool trans_detach(THD *thd);
>  
>  bool trans_commit_stmt(THD *thd);
>  bool trans_rollback_stmt(THD *thd);
> _______________________________________________
> commits mailing list
> commits@xxxxxxxxxxx
> https://lists.askmonty.org/cgi-bin/mailman/listinfo/commits