maria-developers team mailing list archive
-
maria-developers team
-
Mailing list archive
-
Message #07196
Re: a Query_log_event charset bug in parallel replication
nanyi607rao,
Kristian Nielsen <knielsen@xxxxxxxxxxxxxxx> writes:
> Yes, you are absolutely right, the patch I pushed for this is completely
> wrong, just as you explained :-(
>
> Sorry about this, I will try to come up with a better fix.
What do you think about the below patch?
It puts the cached_charset into the THD, which seems to be a better place,
since it describes which charset is currently active in the THD.
(I also introduced THD::system_thread_info, as I did not want to add even more
data to the THD, which is already far too big; that's why the patch is a bit
bigger).
- Kristian.
=== modified file 'sql/log_event.cc'
--- sql/log_event.cc 2014-04-23 14:06:06 +0000
+++ sql/log_event.cc 2014-04-24 11:00:52 +0000
@@ -4153,7 +4153,8 @@ int Query_log_event::do_apply_event(rpl_
(sql_mode & ~(ulong) MODE_NO_DIR_IN_CREATE));
if (charset_inited)
{
- if (rgi->cached_charset_compare(charset))
+ rpl_sql_thread_info *sql_info= thd->system_thread_info.rpl_sql_info;
+ if (sql_info->cached_charset_compare(charset))
{
/* Verify that we support the charsets found in the event. */
if (!(thd->variables.character_set_client=
=== modified file 'sql/rpl_mi.h'
--- sql/rpl_mi.h 2013-11-20 11:05:39 +0000
+++ sql/rpl_mi.h 2014-04-24 11:01:57 +0000
@@ -216,6 +216,16 @@ class Master_info_index
bool stop_all_slaves(THD *thd);
};
+
+/*
+ The class rpl_io_thread_info is the THD::system_thread_info for the IO thread.
+*/
+class rpl_io_thread_info
+{
+public:
+};
+
+
bool check_master_connection_name(LEX_STRING *name);
void create_logfile_name_with_suffix(char *res_file_name, size_t length,
const char *info_file,
=== modified file 'sql/rpl_parallel.cc'
--- sql/rpl_parallel.cc 2014-04-09 12:42:46 +0000
+++ sql/rpl_parallel.cc 2014-04-24 11:40:29 +0000
@@ -33,7 +33,7 @@ rpt_handle_event(rpl_parallel_thread::qu
THD *thd= rgi->thd;
thd->rgi_slave= rgi;
- thd->rpl_filter = rli->mi->rpl_filter;
+ thd->system_thread_info.rpl_sql_info->rpl_filter = rli->mi->rpl_filter;
/* ToDo: Access to thd, and what about rli, split out a parallel part? */
mysql_mutex_lock(&rli->data_lock);
@@ -212,6 +212,7 @@ handle_rpl_parallel_thread(void *arg)
rpl_parallel_thread::queued_event *qevs_to_free;
rpl_group_info *rgis_to_free;
group_commit_orderer *gcos_to_free;
+ rpl_sql_thread_info sql_info(NULL);
size_t total_event_size;
int err;
@@ -242,6 +243,7 @@ handle_rpl_parallel_thread(void *arg)
thd_proc_info(thd, "Waiting for work from main SQL threads");
thd->set_time();
thd->variables.lock_wait_timeout= LONG_TIMEOUT;
+ thd->system_thread_info.rpl_sql_info= &sql_info;
/*
For now, we need to run the replication parallel worker threads in
READ COMMITTED. This is needed because gap locks are not symmetric.
=== modified file 'sql/rpl_rli.cc'
--- sql/rpl_rli.cc 2014-04-23 14:06:06 +0000
+++ sql/rpl_rli.cc 2014-04-24 12:28:41 +0000
@@ -1479,7 +1479,6 @@ rpl_group_info::rpl_group_info(Relay_log
deferred_events(NULL), m_annotate_event(0), is_parallel_exec(false)
{
reinit(rli);
- cached_charset_invalidate();
bzero(¤t_gtid, sizeof(current_gtid));
mysql_mutex_init(key_rpl_group_info_sleep_lock, &sleep_lock,
MY_MUTEX_INIT_FAST);
@@ -1562,29 +1561,6 @@ delete_or_keep_event_post_apply(rpl_grou
}
-void rpl_group_info::cached_charset_invalidate()
-{
- DBUG_ENTER("rpl_group_info::cached_charset_invalidate");
-
- /* Full of zeroes means uninitialized. */
- bzero(cached_charset, sizeof(cached_charset));
- DBUG_VOID_RETURN;
-}
-
-
-bool rpl_group_info::cached_charset_compare(char *charset) const
-{
- DBUG_ENTER("rpl_group_info::cached_charset_compare");
-
- if (memcmp(cached_charset, charset, sizeof(cached_charset)))
- {
- memcpy(const_cast<char*>(cached_charset), charset, sizeof(cached_charset));
- DBUG_RETURN(1);
- }
- DBUG_RETURN(0);
-}
-
-
void rpl_group_info::cleanup_context(THD *thd, bool error)
{
DBUG_ENTER("Relay_log_info::cleanup_context");
@@ -1769,4 +1745,33 @@ rpl_group_info::mark_start_commit()
}
+rpl_sql_thread_info::rpl_sql_thread_info(Rpl_filter *filter)
+ : rpl_filter(filter)
+{
+ cached_charset_invalidate();
+}
+
+
+void rpl_sql_thread_info::cached_charset_invalidate()
+{
+ DBUG_ENTER("rpl_group_info::cached_charset_invalidate");
+
+ /* Full of zeroes means uninitialized. */
+ bzero(cached_charset, sizeof(cached_charset));
+ DBUG_VOID_RETURN;
+}
+
+
+bool rpl_sql_thread_info::cached_charset_compare(char *charset) const
+{
+ DBUG_ENTER("rpl_group_info::cached_charset_compare");
+
+ if (memcmp(cached_charset, charset, sizeof(cached_charset)))
+ {
+ memcpy(const_cast<char*>(cached_charset), charset, sizeof(cached_charset));
+ DBUG_RETURN(1);
+ }
+ DBUG_RETURN(0);
+}
+
#endif
=== modified file 'sql/rpl_rli.h'
--- sql/rpl_rli.h 2014-04-23 14:06:06 +0000
+++ sql/rpl_rli.h 2014-04-24 12:28:48 +0000
@@ -26,6 +26,7 @@
struct RPL_TABLE_LIST;
class Master_info;
+class Rpl_filter;
/****************************************************************************
@@ -536,8 +537,6 @@ struct rpl_group_info
mysql_mutex_t sleep_lock;
mysql_cond_t sleep_cond;
- char cached_charset[6];
-
/*
trans_retries varies between 0 to slave_transaction_retries and counts how
many times the slave has retried the present transaction; gets reset to 0
@@ -671,15 +670,6 @@ struct rpl_group_info
return false;
}
- /*
- Last charset (6 bytes) seen by slave SQL thread is cached here; it helps
- the thread save 3 get_charset() per Query_log_event if the charset is not
- changing from event to event (common situation).
- When the 6 bytes are equal to 0 is used to mean "cache is invalidated".
- */
- void cached_charset_invalidate();
- bool cached_charset_compare(char *charset) const;
-
void clear_tables_to_lock();
void cleanup_context(THD *, bool);
void slave_close_thread_tables(THD *);
@@ -727,6 +717,30 @@ struct rpl_group_info
};
+/*
+ The class rpl_sql_thread_info is the THD::system_thread_info for an SQL
+ thread; this is either the driver SQL thread or a worker thread for parallel
+ replication.
+*/
+class rpl_sql_thread_info
+{
+public:
+ char cached_charset[6];
+ Rpl_filter* rpl_filter;
+
+ rpl_sql_thread_info(Rpl_filter *filter);
+
+ /*
+ Last charset (6 bytes) seen by slave SQL thread is cached here; it helps
+ the thread save 3 get_charset() per Query_log_event if the charset is not
+ changing from event to event (common situation).
+ When the 6 bytes are equal to 0 is used to mean "cache is invalidated".
+ */
+ void cached_charset_invalidate();
+ bool cached_charset_compare(char *charset) const;
+};
+
+
// Defined in rpl_rli.cc
int init_relay_log_info(Relay_log_info* rli, const char* info_fname);
=== modified file 'sql/slave.cc'
--- sql/slave.cc 2014-04-23 14:06:06 +0000
+++ sql/slave.cc 2014-04-24 11:39:30 +0000
@@ -2891,7 +2891,7 @@ void set_slave_thread_default_charset(TH
global_system_variables.collation_server;
thd->update_charset();
- rgi->cached_charset_invalidate();
+ thd->system_thread_info.rpl_sql_info->cached_charset_invalidate();
DBUG_VOID_RETURN;
}
@@ -3768,6 +3768,7 @@ pthread_handler_t handle_slave_io(void *
uint retry_count;
bool suppress_warnings;
int ret;
+ rpl_io_thread_info io_info;
#ifndef DBUG_OFF
uint retry_count_reg= 0, retry_count_dump= 0, retry_count_event= 0;
#endif
@@ -3801,6 +3802,7 @@ pthread_handler_t handle_slave_io(void *
sql_print_error("Failed during slave I/O thread initialization");
goto err_during_init;
}
+ thd->system_thread_info.rpl_io_info= &io_info;
mysql_mutex_lock(&LOCK_thread_count);
threads.append(thd);
mysql_mutex_unlock(&LOCK_thread_count);
@@ -4367,6 +4369,7 @@ pthread_handler_t handle_slave_sql(void
Relay_log_info* rli = &mi->rli;
const char *errmsg;
rpl_group_info *serial_rgi;
+ rpl_sql_thread_info sql_info(mi->rpl_filter);
// needs to call my_thread_init(), otherwise we get a coredump in DBUG_ stuff
my_thread_init();
@@ -4378,7 +4381,7 @@ pthread_handler_t handle_slave_sql(void
serial_rgi= new rpl_group_info(rli);
thd = new THD; // note that contructor of THD uses DBUG_ !
thd->thread_stack = (char*)&thd; // remember where our stack is
- thd->rpl_filter = mi->rpl_filter;
+ thd->system_thread_info.rpl_sql_info= &sql_info;
DBUG_ASSERT(rli->inited);
DBUG_ASSERT(rli->mi == mi);
@@ -4676,7 +4679,7 @@ log '%s' at position %s, relay log '%s'
mysql_cond_broadcast(&rli->data_cond);
rli->ignore_log_space_limit= 0; /* don't need any lock */
/* we die so won't remember charset - re-update them on next thread start */
- serial_rgi->cached_charset_invalidate();
+ thd->system_thread_info.rpl_sql_info->cached_charset_invalidate();
/*
TODO: see if we can do this conditionally in next_event() instead
=== modified file 'sql/sql_acl.cc'
--- sql/sql_acl.cc 2014-03-29 10:33:25 +0000
+++ sql/sql_acl.cc 2014-04-24 12:02:04 +0000
@@ -38,6 +38,7 @@
#include "records.h" // READ_RECORD, read_record_info,
// init_read_record, end_read_record
#include "rpl_filter.h" // rpl_filter
+#include "rpl_rli.h"
#include <m_ctype.h>
#include <stdarg.h>
#include "sp_head.h"
@@ -2558,7 +2559,7 @@ bool change_password(THD *thd, const cha
{
TABLE_LIST tables;
TABLE *table;
- Rpl_filter *rpl_filter= thd->rpl_filter;
+ Rpl_filter *rpl_filter;
/* Buffer should be extended when password length is extended. */
char buff[512];
ulong query_length;
@@ -2580,7 +2581,8 @@ bool change_password(THD *thd, const cha
GRANT and REVOKE are applied the slave in/exclusion rules as they are
some kind of updates to the mysql.% tables.
*/
- if (thd->slave_thread && rpl_filter->is_on())
+ if (thd->slave_thread &&
+ (rpl_filter= thd->system_thread_info.rpl_sql_info->rpl_filter)->is_on())
{
/*
The tables must be marked "updating" so that tables_ok() takes them into
@@ -5393,7 +5395,7 @@ int mysql_table_grant(THD *thd, TABLE_LI
TABLE_LIST tables[3];
bool create_new_users=0;
char *db_name, *table_name;
- Rpl_filter *rpl_filter= thd->rpl_filter;
+ Rpl_filter *rpl_filter;
DBUG_ENTER("mysql_table_grant");
if (!initialized)
@@ -5483,7 +5485,8 @@ int mysql_table_grant(THD *thd, TABLE_LI
GRANT and REVOKE are applied the slave in/exclusion rules as they are
some kind of updates to the mysql.% tables.
*/
- if (thd->slave_thread && rpl_filter->is_on())
+ if (thd->slave_thread &&
+ (rpl_filter= thd->system_thread_info.rpl_sql_info->rpl_filter)->is_on())
{
/*
The tables must be marked "updating" so that tables_ok() takes them into
@@ -5670,7 +5673,7 @@ bool mysql_routine_grant(THD *thd, TABLE
TABLE_LIST tables[2];
bool create_new_users=0, result=0;
char *db_name, *table_name;
- Rpl_filter *rpl_filter= thd->rpl_filter;
+ Rpl_filter *rpl_filter;
DBUG_ENTER("mysql_routine_grant");
if (!initialized)
@@ -5705,7 +5708,8 @@ bool mysql_routine_grant(THD *thd, TABLE
GRANT and REVOKE are applied the slave in/exclusion rules as they are
some kind of updates to the mysql.% tables.
*/
- if (thd->slave_thread && rpl_filter->is_on())
+ if (thd->slave_thread &&
+ (rpl_filter= thd->system_thread_info.rpl_sql_info->rpl_filter)->is_on())
{
/*
The tables must be marked "updating" so that tables_ok() takes them into
@@ -6141,7 +6145,7 @@ bool mysql_grant(THD *thd, const char *d
char tmp_db[SAFE_NAME_LEN+1];
bool create_new_users=0;
TABLE_LIST tables[2];
- Rpl_filter *rpl_filter= thd->rpl_filter;
+ Rpl_filter *rpl_filter;
DBUG_ENTER("mysql_grant");
if (!initialized)
@@ -6190,7 +6194,8 @@ bool mysql_grant(THD *thd, const char *d
GRANT and REVOKE are applied the slave in/exclusion rules as they are
some kind of updates to the mysql.% tables.
*/
- if (thd->slave_thread && rpl_filter->is_on())
+ if (thd->slave_thread &&
+ (rpl_filter= thd->system_thread_info.rpl_sql_info->rpl_filter)->is_on())
{
/*
The tables must be marked "updating" so that tables_ok() takes them into
@@ -8223,7 +8228,7 @@ void get_mqh(const char *user, const cha
#define GRANT_TABLES 7
static int open_grant_tables(THD *thd, TABLE_LIST *tables)
{
- Rpl_filter *rpl_filter= thd->rpl_filter;
+ Rpl_filter *rpl_filter;
DBUG_ENTER("open_grant_tables");
if (!initialized)
@@ -8267,7 +8272,8 @@ static int open_grant_tables(THD *thd, T
GRANT and REVOKE are applied the slave in/exclusion rules as they are
some kind of updates to the mysql.% tables.
*/
- if (thd->slave_thread && rpl_filter->is_on())
+ if (thd->slave_thread &&
+ (rpl_filter= thd->system_thread_info.rpl_sql_info->rpl_filter)->is_on())
{
/*
The tables must be marked "updating" so that tables_ok() takes them into
=== modified file 'sql/sql_class.h'
--- sql/sql_class.h 2014-04-23 06:57:25 +0000
+++ sql/sql_class.h 2014-04-24 12:28:29 +0000
@@ -72,6 +72,8 @@ class Parser_state;
class Rows_log_event;
class Sroutine_hash_entry;
class user_var_entry;
+class rpl_io_thread_info;
+class rpl_sql_thread_info;
enum enum_ha_read_modes { RFIRST, RNEXT, RPREV, RLAST, RKEY, RNEXT_SAME };
enum enum_duplicates { DUP_ERROR, DUP_REPLACE, DUP_UPDATE };
@@ -1810,8 +1812,10 @@ class THD :public Statement,
/* Slave applier execution context */
rpl_group_info* rgi_slave;
- /* Used to SLAVE SQL thread */
- Rpl_filter* rpl_filter;
+ union {
+ rpl_io_thread_info *rpl_io_info;
+ rpl_sql_thread_info *rpl_sql_info;
+ } system_thread_info;
void reset_for_next_command();
/*
=== modified file 'sql/sql_parse.cc'
--- sql/sql_parse.cc 2014-03-29 10:33:25 +0000
+++ sql/sql_parse.cc 2014-04-24 11:54:18 +0000
@@ -171,8 +171,9 @@ const char *xa_state_names[]={
*/
inline bool all_tables_not_ok(THD *thd, TABLE_LIST *tables)
{
- return thd->rpl_filter->is_on() && tables && !thd->spcont &&
- !thd->rpl_filter->tables_ok(thd->db, tables);
+ Rpl_filter *rpl_filter= thd->system_thread_info.rpl_sql_info->rpl_filter;
+ return rpl_filter->is_on() && tables && !thd->spcont &&
+ !rpl_filter->tables_ok(thd->db, tables);
}
#endif
@@ -2233,7 +2234,7 @@ mysql_execute_command(THD *thd)
/* have table map for update for multi-update statement (BUG#37051) */
bool have_table_map_for_update= FALSE;
/* */
- Rpl_filter *rpl_filter= thd->rpl_filter;
+ Rpl_filter *rpl_filter;
#endif
DBUG_ENTER("mysql_execute_command");
@@ -3885,12 +3886,15 @@ mysql_execute_command(THD *thd)
above was not called. So we have to check rules again here.
*/
#ifdef HAVE_REPLICATION
- if (thd->slave_thread &&
- (!rpl_filter->db_ok(lex->name.str) ||
- !rpl_filter->db_ok_with_wild_table(lex->name.str)))
+ if (thd->slave_thread)
{
- my_message(ER_SLAVE_IGNORED_TABLE, ER(ER_SLAVE_IGNORED_TABLE), MYF(0));
- break;
+ rpl_filter= thd->system_thread_info.rpl_sql_info->rpl_filter;
+ if (!rpl_filter->db_ok(lex->name.str) ||
+ !rpl_filter->db_ok_with_wild_table(lex->name.str))
+ {
+ my_message(ER_SLAVE_IGNORED_TABLE, ER(ER_SLAVE_IGNORED_TABLE), MYF(0));
+ break;
+ }
}
#endif
if (check_access(thd, CREATE_ACL, lex->name.str, NULL, NULL, 1, 0))
@@ -3913,12 +3917,15 @@ mysql_execute_command(THD *thd)
above was not called. So we have to check rules again here.
*/
#ifdef HAVE_REPLICATION
- if (thd->slave_thread &&
- (!rpl_filter->db_ok(lex->name.str) ||
- !rpl_filter->db_ok_with_wild_table(lex->name.str)))
+ if (thd->slave_thread)
{
- my_message(ER_SLAVE_IGNORED_TABLE, ER(ER_SLAVE_IGNORED_TABLE), MYF(0));
- break;
+ rpl_filter= thd->system_thread_info.rpl_sql_info->rpl_filter;
+ if (!rpl_filter->db_ok(lex->name.str) ||
+ !rpl_filter->db_ok_with_wild_table(lex->name.str))
+ {
+ my_message(ER_SLAVE_IGNORED_TABLE, ER(ER_SLAVE_IGNORED_TABLE), MYF(0));
+ break;
+ }
}
#endif
if (check_access(thd, DROP_ACL, lex->name.str, NULL, NULL, 1, 0))
@@ -3930,13 +3937,16 @@ mysql_execute_command(THD *thd)
{
LEX_STRING *db= & lex->name;
#ifdef HAVE_REPLICATION
- if (thd->slave_thread &&
- (!rpl_filter->db_ok(db->str) ||
- !rpl_filter->db_ok_with_wild_table(db->str)))
+ if (thd->slave_thread)
{
- res= 1;
- my_message(ER_SLAVE_IGNORED_TABLE, ER(ER_SLAVE_IGNORED_TABLE), MYF(0));
- break;
+ rpl_filter= thd->system_thread_info.rpl_sql_info->rpl_filter;
+ if (!rpl_filter->db_ok(db->str) ||
+ !rpl_filter->db_ok_with_wild_table(db->str))
+ {
+ res= 1;
+ my_message(ER_SLAVE_IGNORED_TABLE, ER(ER_SLAVE_IGNORED_TABLE), MYF(0));
+ break;
+ }
}
#endif
if (check_db_name(db))
@@ -3973,12 +3983,15 @@ mysql_execute_command(THD *thd)
above was not called. So we have to check rules again here.
*/
#ifdef HAVE_REPLICATION
- if (thd->slave_thread &&
- (!rpl_filter->db_ok(db->str) ||
- !rpl_filter->db_ok_with_wild_table(db->str)))
+ if (thd->slave_thread)
{
- my_message(ER_SLAVE_IGNORED_TABLE, ER(ER_SLAVE_IGNORED_TABLE), MYF(0));
- break;
+ rpl_filter= thd->system_thread_info.rpl_sql_info->rpl_filter;
+ if (!rpl_filter->db_ok(db->str) ||
+ !rpl_filter->db_ok_with_wild_table(db->str))
+ {
+ my_message(ER_SLAVE_IGNORED_TABLE, ER(ER_SLAVE_IGNORED_TABLE), MYF(0));
+ break;
+ }
}
#endif
if (check_access(thd, ALTER_ACL, db->str, NULL, NULL, 1, 0))
Follow ups
References