← Back to team overview

maria-developers team mailing list archive

Re: Incorrect format description event skipping in queue_event()

 

Pavel Ivanov <pivanof@xxxxxxxxxx> writes:

> Why not change Format_description_log_event::write()? It seems
> actually that it will be more correct if we change it. I made the
> following change (line numbers may be off) and it worked:

Agreed, that sounds like a good idea, thanks!

>> If you are interested in helping with testing this, I can write a patch later,
>> but I need a way to test it without having to spend unreasonable amounts of
>> time on it.
>
> Sure, I'll be happy to test.

Ok, great.

Attached is a new patch for this. It makes mysql_binlog_send() and
gtid_state_from_pos() construct a Format_description_log_event from the
binlog's format description event. They pass it to Gtid_log_event::peek()
and Gtid_list_log_event::peek(), so that the correct (I hope)
common_header_len is used.

The patch also includes my original patch for outputting the master's
Format_description event during slave reconnect, fixed with our suggested
changes.

 - Kristian.

=== modified file 'sql/log.cc'
--- sql/log.cc	2013-08-23 12:02:13 +0000
+++ sql/log.cc	2013-09-03 10:08:51 +0000
@@ -4806,12 +4806,23 @@ int MYSQL_BIN_LOG::new_file_impl(bool ne
 }
 
 
-bool MYSQL_BIN_LOG::append(Log_event* ev)
+bool
+MYSQL_BIN_LOG::append(Log_event *ev)
 {
-  bool error = 0;
+  bool res;
   mysql_mutex_lock(&LOCK_log);
+  res= append_no_lock(ev);
+  mysql_mutex_unlock(&LOCK_log);
+  return res;
+}
+
+
+bool MYSQL_BIN_LOG::append_no_lock(Log_event* ev)
+{
+  bool error = 0;
   DBUG_ENTER("MYSQL_BIN_LOG::append");
 
+  mysql_mutex_assert_owner(&LOCK_log);
   DBUG_ASSERT(log_file.type == SEQ_READ_APPEND);
   /*
     Log_event::write() is smart enough to use my_b_write() or
@@ -4829,7 +4840,6 @@ bool MYSQL_BIN_LOG::append(Log_event* ev
   if (my_b_append_tell(&log_file) > max_size)
     error= new_file_without_locking();
 err:
-  mysql_mutex_unlock(&LOCK_log);
   signal_update();				// Safe as we don't call close
   DBUG_RETURN(error);
 }

=== modified file 'sql/log.h'
--- sql/log.h	2013-08-23 12:02:13 +0000
+++ sql/log.h	2013-09-03 10:08:51 +0000
@@ -712,6 +712,7 @@ class MYSQL_BIN_LOG: public TC_LOG, priv
   */
   bool appendv(const char* buf,uint len,...);
   bool append(Log_event* ev);
+  bool append_no_lock(Log_event* ev);
 
   void mark_xids_active(ulong cookie, uint xid_count);
   void mark_xid_done(ulong cookie, bool write_checkpoint);

=== modified file 'sql/log_event.cc'
--- sql/log_event.cc	2013-08-23 08:16:43 +0000
+++ sql/log_event.cc	2013-09-03 10:08:51 +0000
@@ -4754,16 +4754,15 @@ bool Format_description_log_event::write
     We don't call Start_log_event_v3::write() because this would make 2
     my_b_safe_write().
   */
-  uchar buff[FORMAT_DESCRIPTION_HEADER_LEN + BINLOG_CHECKSUM_ALG_DESC_LEN];
-  size_t rec_size= sizeof(buff);
+  uchar buff[START_V3_HEADER_LEN+1];
+  size_t rec_size= sizeof(buff) + BINLOG_CHECKSUM_ALG_DESC_LEN +
+                   number_of_event_types;
   int2store(buff + ST_BINLOG_VER_OFFSET,binlog_version);
   memcpy((char*) buff + ST_SERVER_VER_OFFSET,server_version,ST_SERVER_VER_LEN);
   if (!dont_set_created)
     created= get_time();
   int4store(buff + ST_CREATED_OFFSET,created);
   buff[ST_COMMON_HEADER_LEN_OFFSET]= LOG_EVENT_HEADER_LEN;
-  memcpy((char*) buff+ST_COMMON_HEADER_LEN_OFFSET + 1, (uchar*) post_header_len,
-         LOG_EVENT_TYPES);
   /*
     if checksum is requested
     record the checksum-algorithm descriptor next to
@@ -4776,7 +4775,7 @@ bool Format_description_log_event::write
 #ifndef DBUG_OFF
   data_written= 0; // to prepare for need_checksum assert
 #endif
-  buff[FORMAT_DESCRIPTION_HEADER_LEN]= need_checksum() ?
+  uchar checksum_byte= need_checksum() ?
     checksum_alg : (uint8) BINLOG_CHECKSUM_ALG_OFF;
   /* 
      FD of checksum-aware server is always checksum-equipped, (V) is in,
@@ -4796,7 +4795,10 @@ bool Format_description_log_event::write
     checksum_alg= BINLOG_CHECKSUM_ALG_CRC32;  // Forcing (V) room to fill anyway
   }
   ret= (write_header(file, rec_size) ||
-        wrapper_my_b_safe_write(file, buff, rec_size) ||
+        wrapper_my_b_safe_write(file, buff, sizeof(buff)) ||
+        wrapper_my_b_safe_write(file, (uchar*)post_header_len,
+                                number_of_event_types) ||
+        wrapper_my_b_safe_write(file, &checksum_byte, sizeof(checksum_byte)) ||
         write_footer(file));
   if (no_checksum)
     checksum_alg= BINLOG_CHECKSUM_ALG_OFF;
@@ -6125,7 +6127,7 @@ bool
 Gtid_log_event::peek(const char *event_start, size_t event_len,
                      uint8 checksum_alg,
                      uint32 *domain_id, uint32 *server_id, uint64 *seq_no,
-                     uchar *flags2)
+                     uchar *flags2, const Format_description_log_event *fdev)
 {
   const char *p;
 
@@ -6140,10 +6142,10 @@ Gtid_log_event::peek(const char *event_s
     DBUG_ASSERT(checksum_alg == BINLOG_CHECKSUM_ALG_UNDEF ||
                 checksum_alg == BINLOG_CHECKSUM_ALG_OFF);
 
-  if (event_len < LOG_EVENT_HEADER_LEN + GTID_HEADER_LEN)
+  if (event_len < (uint32)fdev->common_header_len + GTID_HEADER_LEN)
     return true;
   *server_id= uint4korr(event_start + SERVER_ID_OFFSET);
-  p= event_start + LOG_EVENT_HEADER_LEN;
+  p= event_start + fdev->common_header_len;
   *seq_no= uint8korr(p);
   p+= 8;
   *domain_id= uint4korr(p);
@@ -6581,7 +6583,8 @@ Gtid_list_log_event::print(FILE *file, P
 bool
 Gtid_list_log_event::peek(const char *event_start, uint32 event_len,
                           uint8 checksum_alg,
-                          rpl_gtid **out_gtid_list, uint32 *out_list_len)
+                          rpl_gtid **out_gtid_list, uint32 *out_list_len,
+                          const Format_description_log_event *fdev)
 {
   const char *p;
   uint32 count_field, count;
@@ -6598,13 +6601,13 @@ Gtid_list_log_event::peek(const char *ev
     DBUG_ASSERT(checksum_alg == BINLOG_CHECKSUM_ALG_UNDEF ||
                 checksum_alg == BINLOG_CHECKSUM_ALG_OFF);
 
-  if (event_len < LOG_EVENT_HEADER_LEN + GTID_LIST_HEADER_LEN)
+  if (event_len < (uint32)fdev->common_header_len + GTID_LIST_HEADER_LEN)
     return true;
-  p= event_start + LOG_EVENT_HEADER_LEN;
+  p= event_start + fdev->common_header_len;
   count_field= uint4korr(p);
   p+= 4;
   count= count_field & ((1<<28)-1);
-  if (event_len < LOG_EVENT_HEADER_LEN + GTID_LIST_HEADER_LEN +
+  if (event_len < (uint32)fdev->common_header_len + GTID_LIST_HEADER_LEN +
       16 * count)
     return true;
   if (!(gtid_list= (rpl_gtid *)my_malloc(sizeof(rpl_gtid)*count + (count == 0),

=== modified file 'sql/log_event.h'
--- sql/log_event.h	2013-08-22 10:36:42 +0000
+++ sql/log_event.h	2013-09-03 09:55:57 +0000
@@ -3118,7 +3118,7 @@ class Gtid_log_event: public Log_event
   static bool peek(const char *event_start, size_t event_len,
                    uint8 checksum_alg,
                    uint32 *domain_id, uint32 *server_id, uint64 *seq_no,
-                   uchar *flags2);
+                   uchar *flags2, const Format_description_log_event *fdev);
 #endif
 };
 
@@ -3232,7 +3232,8 @@ class Gtid_list_log_event: public Log_ev
 #endif
   static bool peek(const char *event_start, uint32 event_len,
                    uint8 checksum_alg,
-                   rpl_gtid **out_gtid_list, uint32 *out_list_len);
+                   rpl_gtid **out_gtid_list, uint32 *out_list_len,
+                   const Format_description_log_event *fdev);
 };
 
 

=== modified file 'sql/slave.cc'
--- sql/slave.cc	2013-08-23 08:16:43 +0000
+++ sql/slave.cc	2013-09-03 10:08:51 +0000
@@ -5182,7 +5182,8 @@ static int queue_event(Master_info* mi,c
 
     if (Gtid_log_event::peek(buf, event_len, checksum_alg,
                              &event_gtid.domain_id, &event_gtid.server_id,
-                             &event_gtid.seq_no, &dummy_flag))
+                             &event_gtid.seq_no, &dummy_flag,
+                             rli->relay_log.description_event_for_queue))
     {
       error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE;
       goto err;
@@ -5243,7 +5244,8 @@ static int queue_event(Master_info* mi,c
     if (Gtid_log_event::peek(buf, event_len, checksum_alg,
                              &mi->last_queued_gtid.domain_id,
                              &mi->last_queued_gtid.server_id,
-                             &mi->last_queued_gtid.seq_no, &dummy_flag))
+                             &mi->last_queued_gtid.seq_no, &dummy_flag,
+                             rli->relay_log.description_event_for_queue))
     {
       error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE;
       goto err;
@@ -5308,6 +5310,26 @@ static int queue_event(Master_info* mi,c
   if (unlikely(gtid_skip_enqueue))
   {
     mi->master_log_pos+= inc_pos;
+    if ((uchar)buf[EVENT_TYPE_OFFSET] == FORMAT_DESCRIPTION_EVENT &&
+        s_id == mi->master_id)
+    {
+      /*
+        If we write this master's description event in the middle of an event
+        group due to GTID reconnect, SQL thread will think that master crashed
+        in the middle of the group and roll back the first half, so we must not.
+
+        But we still have to write an artificial copy of the masters description
+        event, to override the initial slave-version description event so that
+        SQL thread has the right information for parsing the events it reads.
+      */
+      rli->relay_log.description_event_for_queue->created= 0;
+      rli->relay_log.description_event_for_queue->set_artificial_event();
+      if (rli->relay_log.append_no_lock
+          (rli->relay_log.description_event_for_queue))
+        error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE;
+      else
+        rli->relay_log.harvest_bytes_written(&rli->log_space_total);
+    }
   }
   else
   if ((s_id == global_system_variables.server_id &&

=== modified file 'sql/sql_repl.cc'
--- sql/sql_repl.cc	2013-08-23 12:02:13 +0000
+++ sql/sql_repl.cc	2013-09-03 09:59:34 +0000
@@ -1269,6 +1269,7 @@ gtid_state_from_pos(const char *name, ui
   uint8 current_checksum_alg= BINLOG_CHECKSUM_ALG_UNDEF;
   int err;
   String packet;
+  Format_description_log_event *fdev= NULL;
 
   if (gtid_state->load((const rpl_gtid *)NULL, 0))
   {
@@ -1280,6 +1281,13 @@ gtid_state_from_pos(const char *name, ui
   if ((file= open_binlog(&cache, name, &errormsg)) == (File)-1)
     return errormsg;
 
+  if (!(fdev= new Format_description_log_event(3)))
+  {
+    errormsg= "Out of memory initializing format_description event "
+      "while scanning binlog to find start position";
+    goto end;
+  }
+
   /*
     First we need to find the initial GTID_LIST_EVENT. We need this even
     if the offset is at the very start of the binlog file.
@@ -1315,6 +1323,8 @@ gtid_state_from_pos(const char *name, ui
     typ= (Log_event_type)(uchar)packet[EVENT_TYPE_OFFSET];
     if (typ == FORMAT_DESCRIPTION_EVENT)
     {
+      Format_description_log_event *tmp;
+
       if (found_format_description_event)
       {
         errormsg= "Duplicate format description log event found while "
@@ -1324,6 +1334,15 @@ gtid_state_from_pos(const char *name, ui
 
       current_checksum_alg= get_checksum_alg(packet.ptr(), packet.length());
       found_format_description_event= true;
+      if (!(tmp= new Format_description_log_event(packet.ptr(), packet.length(),
+                                                  fdev)))
+      {
+        errormsg= "Corrupt Format_description event found or out-of-memory "
+          "while searching for old-style position in binlog";
+        goto end;
+      }
+      delete fdev;
+      fdev= tmp;
     }
     else if (typ != FORMAT_DESCRIPTION_EVENT && !found_format_description_event)
     {
@@ -1348,7 +1367,7 @@ gtid_state_from_pos(const char *name, ui
       }
       status= Gtid_list_log_event::peek(packet.ptr(), packet.length(),
                                         current_checksum_alg,
-                                        &gtid_list, &list_len);
+                                        &gtid_list, &list_len, fdev);
       if (status)
       {
         errormsg= "Error reading Gtid_list_log_event while searching "
@@ -1376,7 +1395,7 @@ gtid_state_from_pos(const char *name, ui
       uchar flags2;
       if (Gtid_log_event::peek(packet.ptr(), packet.length(),
                                current_checksum_alg, &gtid.domain_id,
-                               &gtid.server_id, &gtid.seq_no, &flags2))
+                               &gtid.server_id, &gtid.seq_no, &flags2, fdev))
       {
         errormsg= "Corrupt gtid_log_event found while scanning binlog to find "
           "initial slave position";
@@ -1399,6 +1418,7 @@ gtid_state_from_pos(const char *name, ui
   }
 
 end:
+  delete fdev;
   end_io_cache(&cache);
   mysql_file_close(file, MYF(MY_WME));
 
@@ -1502,7 +1522,8 @@ send_event_to_slave(THD *thd, NET *net,
                     enum_gtid_until_state *gtid_until_group,
                     rpl_binlog_state *until_binlog_state,
                     bool slave_gtid_strict_mode, rpl_gtid *error_gtid,
-                    bool *send_fake_gtid_list)
+                    bool *send_fake_gtid_list,
+                    Format_description_log_event *fdev)
 {
   my_off_t pos;
   size_t len= packet->length();
@@ -1516,7 +1537,7 @@ send_event_to_slave(THD *thd, NET *net,
     if (ev_offset > len ||
         Gtid_list_log_event::peek(packet->ptr()+ev_offset, len - ev_offset,
                                   current_checksum_alg,
-                                  &gtid_list, &list_len))
+                                  &gtid_list, &list_len, fdev))
     {
       my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
       return "Failed to read Gtid_list_log_event: corrupt binlog";
@@ -1545,7 +1566,7 @@ send_event_to_slave(THD *thd, NET *net,
           Gtid_log_event::peek(packet->ptr()+ev_offset, len - ev_offset,
                                current_checksum_alg,
                                &event_gtid.domain_id, &event_gtid.server_id,
-                               &event_gtid.seq_no, &flags2))
+                               &event_gtid.seq_no, &flags2, fdev))
       {
         my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
         return "Failed to read Gtid_log_event: corrupt binlog";
@@ -1881,6 +1902,7 @@ void mysql_binlog_send(THD* thd, char* l
 
   uint8 current_checksum_alg= BINLOG_CHECKSUM_ALG_UNDEF;
   int old_max_allowed_packet= thd->variables.max_allowed_packet;
+  Format_description_log_event *fdev= NULL;
 
 #ifndef DBUG_OFF
   int left_events = max_binlog_dump_events;
@@ -1956,6 +1978,13 @@ void mysql_binlog_send(THD* thd, char* l
   }
 #endif
 
+  if (!(fdev= new Format_description_log_event(3)))
+  {
+    errmsg= "Out of memory initializing format_description event";
+    my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
+    goto err;
+  }
+
   if (!mysql_bin_log.is_open())
   {
     errmsg = "Binary log is not open";
@@ -2119,6 +2148,8 @@ impossible position";
                    (*packet)[EVENT_TYPE_OFFSET+ev_offset]));
        if ((*packet)[EVENT_TYPE_OFFSET+ev_offset] == FORMAT_DESCRIPTION_EVENT)
        {
+         Format_description_log_event *tmp;
+
          current_checksum_alg= get_checksum_alg(packet->ptr() + ev_offset,
                                                 packet->length() - ev_offset);
          DBUG_ASSERT(current_checksum_alg == BINLOG_CHECKSUM_ALG_OFF ||
@@ -2136,6 +2167,18 @@ impossible position";
                              "slaves that cannot process them");
            goto err;
          }
+
+         if (!(tmp= new Format_description_log_event(packet->ptr()+ev_offset,
+                                                     packet->length()-ev_offset,
+                                                     fdev)))
+         {
+           my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
+           errmsg= "Corrupt Format_description event found or out-of-memory";
+           goto err;
+         }
+         delete fdev;
+         fdev= tmp;
+
          (*packet)[FLAGS_OFFSET+ev_offset] &= ~LOG_EVENT_BINLOG_IN_USE_F;
          /*
            mark that this event with "log_pos=0", so the slave
@@ -2253,6 +2296,8 @@ impossible position";
 #endif
       if (event_type == FORMAT_DESCRIPTION_EVENT)
       {
+        Format_description_log_event *tmp;
+
         current_checksum_alg= get_checksum_alg(packet->ptr() + ev_offset,
                                                packet->length() - ev_offset);
         DBUG_ASSERT(current_checksum_alg == BINLOG_CHECKSUM_ALG_OFF ||
@@ -2271,6 +2316,17 @@ impossible position";
           goto err;
         }
 
+        if (!(tmp= new Format_description_log_event(packet->ptr()+ev_offset,
+                                                    packet->length()-ev_offset,
+                                                    fdev)))
+        {
+          my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
+          errmsg= "Corrupt Format_description event found or out-of-memory";
+          goto err;
+        }
+        delete fdev;
+        fdev= tmp;
+
         (*packet)[FLAGS_OFFSET+ev_offset] &= ~LOG_EVENT_BINLOG_IN_USE_F;
       }
 
@@ -2295,7 +2351,7 @@ impossible position";
                                         until_gtid_state, &gtid_until_group,
                                         &until_binlog_state,
                                         slave_gtid_strict_mode, &error_gtid,
-                                        &send_fake_gtid_list)))
+                                        &send_fake_gtid_list, fdev)))
       {
         errmsg= tmp_msg;
         goto err;
@@ -2501,7 +2557,7 @@ impossible position";
                                             &gtid_skip_group, until_gtid_state,
                                             &gtid_until_group, &until_binlog_state,
                                             slave_gtid_strict_mode, &error_gtid,
-                                            &send_fake_gtid_list)))
+                                            &send_fake_gtid_list, fdev)))
           {
             errmsg= tmp_msg;
             goto err;
@@ -2599,6 +2655,7 @@ impossible position";
   thd->current_linfo = 0;
   mysql_mutex_unlock(&LOCK_thread_count);
   thd->variables.max_allowed_packet= old_max_allowed_packet;
+  delete fdev;
   DBUG_VOID_RETURN;
 
 err:
@@ -2674,6 +2731,7 @@ impossible position";
   if (file >= 0)
     mysql_file_close(file, MYF(MY_WME));
   thd->variables.max_allowed_packet= old_max_allowed_packet;
+  delete fdev;
 
   my_message(my_errno, error_text, MYF(0));
   DBUG_VOID_RETURN;


Follow ups

References