← Back to team overview

maria-developers team mailing list archive

Re: a Query_log_event charset bug in parallel replication

 

nanyi607rao,

Kristian Nielsen <knielsen@xxxxxxxxxxxxxxx> writes:

> Yes, you are absolutely right, the patch I pushed for this is completely
> wrong, just as you explained :-(
>
> Sorry about this, I will try to come up with a better fix.

What do you think about the below patch?
It puts the cached_charset into the THD, which seems to be a better place,
since it describes which charset is currently active in the THD.

(I also introduced THD::system_thread_info, as I did not want to add even more
data to the THD, which is already far too big; that's why the patch is a bit
bigger).

 - Kristian.

=== modified file 'sql/log_event.cc'
--- sql/log_event.cc	2014-04-23 14:06:06 +0000
+++ sql/log_event.cc	2014-04-24 11:00:52 +0000
@@ -4153,7 +4153,8 @@ int Query_log_event::do_apply_event(rpl_
                    (sql_mode & ~(ulong) MODE_NO_DIR_IN_CREATE));
       if (charset_inited)
       {
-        if (rgi->cached_charset_compare(charset))
+        rpl_sql_thread_info *sql_info= thd->system_thread_info.rpl_sql_info;
+        if (sql_info->cached_charset_compare(charset))
         {
           /* Verify that we support the charsets found in the event. */
           if (!(thd->variables.character_set_client=

=== modified file 'sql/rpl_mi.h'
--- sql/rpl_mi.h	2013-11-20 11:05:39 +0000
+++ sql/rpl_mi.h	2014-04-24 11:01:57 +0000
@@ -216,6 +216,16 @@ class Master_info_index
   bool stop_all_slaves(THD *thd);
 };
 
+
+/*
+  The class rpl_io_thread_info is the THD::system_thread_info for the IO thread.
+*/
+class rpl_io_thread_info
+{
+public:
+};
+
+
 bool check_master_connection_name(LEX_STRING *name);
 void create_logfile_name_with_suffix(char *res_file_name, size_t length,
                              const char *info_file, 

=== modified file 'sql/rpl_parallel.cc'
--- sql/rpl_parallel.cc	2014-04-09 12:42:46 +0000
+++ sql/rpl_parallel.cc	2014-04-24 11:40:29 +0000
@@ -33,7 +33,7 @@ rpt_handle_event(rpl_parallel_thread::qu
   THD *thd= rgi->thd;
 
   thd->rgi_slave= rgi;
-  thd->rpl_filter = rli->mi->rpl_filter;
+  thd->system_thread_info.rpl_sql_info->rpl_filter = rli->mi->rpl_filter;
 
   /* ToDo: Access to thd, and what about rli, split out a parallel part? */
   mysql_mutex_lock(&rli->data_lock);
@@ -212,6 +212,7 @@ handle_rpl_parallel_thread(void *arg)
   rpl_parallel_thread::queued_event *qevs_to_free;
   rpl_group_info *rgis_to_free;
   group_commit_orderer *gcos_to_free;
+  rpl_sql_thread_info sql_info(NULL);
   size_t total_event_size;
   int err;
 
@@ -242,6 +243,7 @@ handle_rpl_parallel_thread(void *arg)
   thd_proc_info(thd, "Waiting for work from main SQL threads");
   thd->set_time();
   thd->variables.lock_wait_timeout= LONG_TIMEOUT;
+  thd->system_thread_info.rpl_sql_info= &sql_info;
   /*
     For now, we need to run the replication parallel worker threads in
     READ COMMITTED. This is needed because gap locks are not symmetric.

=== modified file 'sql/rpl_rli.cc'
--- sql/rpl_rli.cc	2014-04-23 14:06:06 +0000
+++ sql/rpl_rli.cc	2014-04-24 12:28:41 +0000
@@ -1479,7 +1479,6 @@ rpl_group_info::rpl_group_info(Relay_log
     deferred_events(NULL), m_annotate_event(0), is_parallel_exec(false)
 {
   reinit(rli);
-  cached_charset_invalidate();
   bzero(&current_gtid, sizeof(current_gtid));
   mysql_mutex_init(key_rpl_group_info_sleep_lock, &sleep_lock,
                    MY_MUTEX_INIT_FAST);
@@ -1562,29 +1561,6 @@ delete_or_keep_event_post_apply(rpl_grou
 }
 
 
-void rpl_group_info::cached_charset_invalidate()
-{
-  DBUG_ENTER("rpl_group_info::cached_charset_invalidate");
-
-  /* Full of zeroes means uninitialized. */
-  bzero(cached_charset, sizeof(cached_charset));
-  DBUG_VOID_RETURN;
-}
-
-
-bool rpl_group_info::cached_charset_compare(char *charset) const
-{
-  DBUG_ENTER("rpl_group_info::cached_charset_compare");
-
-  if (memcmp(cached_charset, charset, sizeof(cached_charset)))
-  {
-    memcpy(const_cast<char*>(cached_charset), charset, sizeof(cached_charset));
-    DBUG_RETURN(1);
-  }
-  DBUG_RETURN(0);
-}
-
-
 void rpl_group_info::cleanup_context(THD *thd, bool error)
 {
   DBUG_ENTER("Relay_log_info::cleanup_context");
@@ -1769,4 +1745,33 @@ rpl_group_info::mark_start_commit()
 }
 
 
+rpl_sql_thread_info::rpl_sql_thread_info(Rpl_filter *filter)
+  : rpl_filter(filter)
+{
+  cached_charset_invalidate();
+}
+
+
+void rpl_sql_thread_info::cached_charset_invalidate()
+{
+  DBUG_ENTER("rpl_group_info::cached_charset_invalidate");
+
+  /* Full of zeroes means uninitialized. */
+  bzero(cached_charset, sizeof(cached_charset));
+  DBUG_VOID_RETURN;
+}
+
+
+bool rpl_sql_thread_info::cached_charset_compare(char *charset) const
+{
+  DBUG_ENTER("rpl_group_info::cached_charset_compare");
+
+  if (memcmp(cached_charset, charset, sizeof(cached_charset)))
+  {
+    memcpy(const_cast<char*>(cached_charset), charset, sizeof(cached_charset));
+    DBUG_RETURN(1);
+  }
+  DBUG_RETURN(0);
+}
+
 #endif

=== modified file 'sql/rpl_rli.h'
--- sql/rpl_rli.h	2014-04-23 14:06:06 +0000
+++ sql/rpl_rli.h	2014-04-24 12:28:48 +0000
@@ -26,6 +26,7 @@
 
 struct RPL_TABLE_LIST;
 class Master_info;
+class Rpl_filter;
 
 /****************************************************************************
 
@@ -536,8 +537,6 @@ struct rpl_group_info
   mysql_mutex_t sleep_lock;
   mysql_cond_t sleep_cond;
 
-  char cached_charset[6];
-
   /*
     trans_retries varies between 0 to slave_transaction_retries and counts how
     many times the slave has retried the present transaction; gets reset to 0
@@ -671,15 +670,6 @@ struct rpl_group_info
     return false;
   }
 
-  /*
-    Last charset (6 bytes) seen by slave SQL thread is cached here; it helps
-    the thread save 3 get_charset() per Query_log_event if the charset is not
-    changing from event to event (common situation).
-    When the 6 bytes are equal to 0 is used to mean "cache is invalidated".
-  */
-  void cached_charset_invalidate();
-  bool cached_charset_compare(char *charset) const;
-
   void clear_tables_to_lock();
   void cleanup_context(THD *, bool);
   void slave_close_thread_tables(THD *);
@@ -727,6 +717,30 @@ struct rpl_group_info
 };
 
 
+/*
+  The class rpl_sql_thread_info is the THD::system_thread_info for an SQL
+  thread; this is either the driver SQL thread or a worker thread for parallel
+  replication.
+*/
+class rpl_sql_thread_info
+{
+public:
+  char cached_charset[6];
+  Rpl_filter* rpl_filter;
+
+  rpl_sql_thread_info(Rpl_filter *filter);
+
+  /*
+    Last charset (6 bytes) seen by slave SQL thread is cached here; it helps
+    the thread save 3 get_charset() per Query_log_event if the charset is not
+    changing from event to event (common situation).
+    When the 6 bytes are equal to 0 is used to mean "cache is invalidated".
+  */
+  void cached_charset_invalidate();
+  bool cached_charset_compare(char *charset) const;
+};
+
+
 // Defined in rpl_rli.cc
 int init_relay_log_info(Relay_log_info* rli, const char* info_fname);
 

=== modified file 'sql/slave.cc'
--- sql/slave.cc	2014-04-23 14:06:06 +0000
+++ sql/slave.cc	2014-04-24 11:39:30 +0000
@@ -2891,7 +2891,7 @@ void set_slave_thread_default_charset(TH
     global_system_variables.collation_server;
   thd->update_charset();
 
-  rgi->cached_charset_invalidate();
+  thd->system_thread_info.rpl_sql_info->cached_charset_invalidate();
   DBUG_VOID_RETURN;
 }
 
@@ -3768,6 +3768,7 @@ pthread_handler_t handle_slave_io(void *
   uint retry_count;
   bool suppress_warnings;
   int ret;
+  rpl_io_thread_info io_info;
 #ifndef DBUG_OFF
   uint retry_count_reg= 0, retry_count_dump= 0, retry_count_event= 0;
 #endif
@@ -3801,6 +3802,7 @@ pthread_handler_t handle_slave_io(void *
     sql_print_error("Failed during slave I/O thread initialization");
     goto err_during_init;
   }
+  thd->system_thread_info.rpl_io_info= &io_info;
   mysql_mutex_lock(&LOCK_thread_count);
   threads.append(thd);
   mysql_mutex_unlock(&LOCK_thread_count);
@@ -4367,6 +4369,7 @@ pthread_handler_t handle_slave_sql(void
   Relay_log_info* rli = &mi->rli;
   const char *errmsg;
   rpl_group_info *serial_rgi;
+  rpl_sql_thread_info sql_info(mi->rpl_filter);
 
   // needs to call my_thread_init(), otherwise we get a coredump in DBUG_ stuff
   my_thread_init();
@@ -4378,7 +4381,7 @@ pthread_handler_t handle_slave_sql(void
   serial_rgi= new rpl_group_info(rli);
   thd = new THD; // note that contructor of THD uses DBUG_ !
   thd->thread_stack = (char*)&thd; // remember where our stack is
-  thd->rpl_filter = mi->rpl_filter;
+  thd->system_thread_info.rpl_sql_info= &sql_info;
 
   DBUG_ASSERT(rli->inited);
   DBUG_ASSERT(rli->mi == mi);
@@ -4676,7 +4679,7 @@ log '%s' at position %s, relay log '%s'
   mysql_cond_broadcast(&rli->data_cond);
   rli->ignore_log_space_limit= 0; /* don't need any lock */
   /* we die so won't remember charset - re-update them on next thread start */
-  serial_rgi->cached_charset_invalidate();
+  thd->system_thread_info.rpl_sql_info->cached_charset_invalidate();
 
   /*
     TODO: see if we can do this conditionally in next_event() instead

=== modified file 'sql/sql_acl.cc'
--- sql/sql_acl.cc	2014-03-29 10:33:25 +0000
+++ sql/sql_acl.cc	2014-04-24 12:02:04 +0000
@@ -38,6 +38,7 @@
 #include "records.h"              // READ_RECORD, read_record_info,
                                   // init_read_record, end_read_record
 #include "rpl_filter.h"           // rpl_filter
+#include "rpl_rli.h"
 #include <m_ctype.h>
 #include <stdarg.h>
 #include "sp_head.h"
@@ -2558,7 +2559,7 @@ bool change_password(THD *thd, const cha
 {
   TABLE_LIST tables;
   TABLE *table;
-  Rpl_filter *rpl_filter= thd->rpl_filter;
+  Rpl_filter *rpl_filter;
   /* Buffer should be extended when password length is extended. */
   char buff[512];
   ulong query_length;
@@ -2580,7 +2581,8 @@ bool change_password(THD *thd, const cha
     GRANT and REVOKE are applied the slave in/exclusion rules as they are
     some kind of updates to the mysql.% tables.
   */
-  if (thd->slave_thread && rpl_filter->is_on())
+  if (thd->slave_thread &&
+      (rpl_filter= thd->system_thread_info.rpl_sql_info->rpl_filter)->is_on())
   {
     /*
       The tables must be marked "updating" so that tables_ok() takes them into
@@ -5393,7 +5395,7 @@ int mysql_table_grant(THD *thd, TABLE_LI
   TABLE_LIST tables[3];
   bool create_new_users=0;
   char *db_name, *table_name;
-  Rpl_filter *rpl_filter= thd->rpl_filter;
+  Rpl_filter *rpl_filter;
   DBUG_ENTER("mysql_table_grant");
 
   if (!initialized)
@@ -5483,7 +5485,8 @@ int mysql_table_grant(THD *thd, TABLE_LI
     GRANT and REVOKE are applied the slave in/exclusion rules as they are
     some kind of updates to the mysql.% tables.
   */
-  if (thd->slave_thread && rpl_filter->is_on())
+  if (thd->slave_thread &&
+      (rpl_filter= thd->system_thread_info.rpl_sql_info->rpl_filter)->is_on())
   {
     /*
       The tables must be marked "updating" so that tables_ok() takes them into
@@ -5670,7 +5673,7 @@ bool mysql_routine_grant(THD *thd, TABLE
   TABLE_LIST tables[2];
   bool create_new_users=0, result=0;
   char *db_name, *table_name;
-  Rpl_filter *rpl_filter= thd->rpl_filter;
+  Rpl_filter *rpl_filter;
   DBUG_ENTER("mysql_routine_grant");
 
   if (!initialized)
@@ -5705,7 +5708,8 @@ bool mysql_routine_grant(THD *thd, TABLE
     GRANT and REVOKE are applied the slave in/exclusion rules as they are
     some kind of updates to the mysql.% tables.
   */
-  if (thd->slave_thread && rpl_filter->is_on())
+  if (thd->slave_thread &&
+      (rpl_filter= thd->system_thread_info.rpl_sql_info->rpl_filter)->is_on())
   {
     /*
       The tables must be marked "updating" so that tables_ok() takes them into
@@ -6141,7 +6145,7 @@ bool mysql_grant(THD *thd, const char *d
   char tmp_db[SAFE_NAME_LEN+1];
   bool create_new_users=0;
   TABLE_LIST tables[2];
-  Rpl_filter *rpl_filter= thd->rpl_filter;
+  Rpl_filter *rpl_filter;
   DBUG_ENTER("mysql_grant");
 
   if (!initialized)
@@ -6190,7 +6194,8 @@ bool mysql_grant(THD *thd, const char *d
     GRANT and REVOKE are applied the slave in/exclusion rules as they are
     some kind of updates to the mysql.% tables.
   */
-  if (thd->slave_thread && rpl_filter->is_on())
+  if (thd->slave_thread &&
+      (rpl_filter= thd->system_thread_info.rpl_sql_info->rpl_filter)->is_on())
   {
     /*
       The tables must be marked "updating" so that tables_ok() takes them into
@@ -8223,7 +8228,7 @@ void get_mqh(const char *user, const cha
 #define GRANT_TABLES 7
 static int open_grant_tables(THD *thd, TABLE_LIST *tables)
 {
-  Rpl_filter *rpl_filter= thd->rpl_filter;
+  Rpl_filter *rpl_filter;
   DBUG_ENTER("open_grant_tables");
 
   if (!initialized)
@@ -8267,7 +8272,8 @@ static int open_grant_tables(THD *thd, T
     GRANT and REVOKE are applied the slave in/exclusion rules as they are
     some kind of updates to the mysql.% tables.
   */
-  if (thd->slave_thread && rpl_filter->is_on())
+  if (thd->slave_thread &&
+      (rpl_filter= thd->system_thread_info.rpl_sql_info->rpl_filter)->is_on())
   {
     /*
       The tables must be marked "updating" so that tables_ok() takes them into

=== modified file 'sql/sql_class.h'
--- sql/sql_class.h	2014-04-23 06:57:25 +0000
+++ sql/sql_class.h	2014-04-24 12:28:29 +0000
@@ -72,6 +72,8 @@ class Parser_state;
 class Rows_log_event;
 class Sroutine_hash_entry;
 class user_var_entry;
+class rpl_io_thread_info;
+class rpl_sql_thread_info;
 
 enum enum_ha_read_modes { RFIRST, RNEXT, RPREV, RLAST, RKEY, RNEXT_SAME };
 enum enum_duplicates { DUP_ERROR, DUP_REPLACE, DUP_UPDATE };
@@ -1810,8 +1812,10 @@ class THD :public Statement,
   /* Slave applier execution context */
   rpl_group_info* rgi_slave;
 
-  /* Used to SLAVE SQL thread */
-  Rpl_filter* rpl_filter;
+  union {
+    rpl_io_thread_info *rpl_io_info;
+    rpl_sql_thread_info *rpl_sql_info;
+  } system_thread_info;
 
   void reset_for_next_command();
   /*

=== modified file 'sql/sql_parse.cc'
--- sql/sql_parse.cc	2014-03-29 10:33:25 +0000
+++ sql/sql_parse.cc	2014-04-24 11:54:18 +0000
@@ -171,8 +171,9 @@ const char *xa_state_names[]={
 */
 inline bool all_tables_not_ok(THD *thd, TABLE_LIST *tables)
 {
-  return thd->rpl_filter->is_on() && tables && !thd->spcont &&
-         !thd->rpl_filter->tables_ok(thd->db, tables);
+  Rpl_filter *rpl_filter= thd->system_thread_info.rpl_sql_info->rpl_filter;
+  return rpl_filter->is_on() && tables && !thd->spcont &&
+         !rpl_filter->tables_ok(thd->db, tables);
 }
 #endif
 
@@ -2233,7 +2234,7 @@ mysql_execute_command(THD *thd)
   /* have table map for update for multi-update statement (BUG#37051) */
   bool have_table_map_for_update= FALSE;
   /* */
-  Rpl_filter *rpl_filter= thd->rpl_filter;
+  Rpl_filter *rpl_filter;
 #endif
   DBUG_ENTER("mysql_execute_command");
 
@@ -3885,12 +3886,15 @@ mysql_execute_command(THD *thd)
       above was not called. So we have to check rules again here.
     */
 #ifdef HAVE_REPLICATION
-    if (thd->slave_thread && 
-	(!rpl_filter->db_ok(lex->name.str) ||
-	 !rpl_filter->db_ok_with_wild_table(lex->name.str)))
+    if (thd->slave_thread)
     {
-      my_message(ER_SLAVE_IGNORED_TABLE, ER(ER_SLAVE_IGNORED_TABLE), MYF(0));
-      break;
+      rpl_filter= thd->system_thread_info.rpl_sql_info->rpl_filter;
+      if (!rpl_filter->db_ok(lex->name.str) ||
+          !rpl_filter->db_ok_with_wild_table(lex->name.str))
+      {
+        my_message(ER_SLAVE_IGNORED_TABLE, ER(ER_SLAVE_IGNORED_TABLE), MYF(0));
+        break;
+      }
     }
 #endif
     if (check_access(thd, CREATE_ACL, lex->name.str, NULL, NULL, 1, 0))
@@ -3913,12 +3917,15 @@ mysql_execute_command(THD *thd)
       above was not called. So we have to check rules again here.
     */
 #ifdef HAVE_REPLICATION
-    if (thd->slave_thread && 
-	(!rpl_filter->db_ok(lex->name.str) ||
-	 !rpl_filter->db_ok_with_wild_table(lex->name.str)))
+    if (thd->slave_thread)
     {
-      my_message(ER_SLAVE_IGNORED_TABLE, ER(ER_SLAVE_IGNORED_TABLE), MYF(0));
-      break;
+      rpl_filter= thd->system_thread_info.rpl_sql_info->rpl_filter;
+      if (!rpl_filter->db_ok(lex->name.str) ||
+          !rpl_filter->db_ok_with_wild_table(lex->name.str))
+      {
+        my_message(ER_SLAVE_IGNORED_TABLE, ER(ER_SLAVE_IGNORED_TABLE), MYF(0));
+        break;
+      }
     }
 #endif
     if (check_access(thd, DROP_ACL, lex->name.str, NULL, NULL, 1, 0))
@@ -3930,13 +3937,16 @@ mysql_execute_command(THD *thd)
   {
     LEX_STRING *db= & lex->name;
 #ifdef HAVE_REPLICATION
-    if (thd->slave_thread && 
-       (!rpl_filter->db_ok(db->str) ||
-        !rpl_filter->db_ok_with_wild_table(db->str)))
+    if (thd->slave_thread)
     {
-      res= 1;
-      my_message(ER_SLAVE_IGNORED_TABLE, ER(ER_SLAVE_IGNORED_TABLE), MYF(0));
-      break;
+      rpl_filter= thd->system_thread_info.rpl_sql_info->rpl_filter;
+      if (!rpl_filter->db_ok(db->str) ||
+          !rpl_filter->db_ok_with_wild_table(db->str))
+      {
+        res= 1;
+        my_message(ER_SLAVE_IGNORED_TABLE, ER(ER_SLAVE_IGNORED_TABLE), MYF(0));
+        break;
+      }
     }
 #endif
     if (check_db_name(db))
@@ -3973,12 +3983,15 @@ mysql_execute_command(THD *thd)
       above was not called. So we have to check rules again here.
     */
 #ifdef HAVE_REPLICATION
-    if (thd->slave_thread &&
-	(!rpl_filter->db_ok(db->str) ||
-	 !rpl_filter->db_ok_with_wild_table(db->str)))
+    if (thd->slave_thread)
     {
-      my_message(ER_SLAVE_IGNORED_TABLE, ER(ER_SLAVE_IGNORED_TABLE), MYF(0));
-      break;
+      rpl_filter= thd->system_thread_info.rpl_sql_info->rpl_filter;
+      if (!rpl_filter->db_ok(db->str) ||
+          !rpl_filter->db_ok_with_wild_table(db->str))
+      {
+        my_message(ER_SLAVE_IGNORED_TABLE, ER(ER_SLAVE_IGNORED_TABLE), MYF(0));
+        break;
+      }
     }
 #endif
     if (check_access(thd, ALTER_ACL, db->str, NULL, NULL, 1, 0))



Follow ups

References