← Back to team overview

maria-developers team mailing list archive

Rev 3195: MWL#36: Add a mysqlbinlog option to change the used database in file:///home/psergey/dev/mysql-5.1-mwl36/

 

At file:///home/psergey/dev/mysql-5.1-mwl36/

------------------------------------------------------------
revno: 3195
revision-id: psergey@xxxxxxxxxxxx-20091121155306-33ze26zq4i0o3l71
parent: build@xxxxxxxxx-20091105202217-biphfh9cz8m1y82k
committer: Sergey Petrunya <psergey@xxxxxxxxxxxx>
branch nick: mysql-5.1-mwl36
timestamp: Sat 2009-11-21 17:53:06 +0200
message:
  MWL#36: Add a mysqlbinlog option to change the used database
  - Port the patch to MySQL
=== modified file 'client/Makefile.am'
--- a/client/Makefile.am	2009-09-03 12:29:25 +0000
+++ b/client/Makefile.am	2009-11-21 15:53:06 +0000
@@ -107,7 +107,8 @@
 	rpl_utility.h rpl_tblmap.h rpl_tblmap.cc \
 	log_event.cc my_decimal.h my_decimal.cc \
 	log_event_old.h log_event_old.cc \
-	rpl_record_old.h rpl_record_old.cc
+	rpl_record_old.h rpl_record_old.cc \
+        sql_list.h rpl_filter.h sql_list.cc rpl_filter.cc
 strings_src=decimal.c
 
 link_sources:

=== modified file 'client/client_priv.h'
--- a/client/client_priv.h	2008-01-31 16:46:50 +0000
+++ b/client/client_priv.h	2009-11-21 15:53:06 +0000
@@ -80,5 +80,6 @@
   OPT_FIX_TABLE_NAMES, OPT_FIX_DB_NAMES, OPT_SSL_VERIFY_SERVER_CERT,
   OPT_DEBUG_INFO, OPT_DEBUG_CHECK, OPT_COLUMN_TYPES, OPT_ERROR_LOG_FILE,
   OPT_WRITE_BINLOG, OPT_DUMP_DATE,
+  OPT_REWRITE_DB,
   OPT_MAX_CLIENT_OPTION
 };

=== modified file 'client/mysqlbinlog.cc'
--- a/client/mysqlbinlog.cc	2009-11-03 00:52:57 +0000
+++ b/client/mysqlbinlog.cc	2009-11-21 15:53:06 +0000
@@ -35,6 +35,15 @@
 #include "log_event.h"
 #include "sql_common.h"
 
+/* Needed for Rpl_filter */
+CHARSET_INFO* system_charset_info= &my_charset_utf8_general_ci;
+
+#include "sql_string.h"   // needed for Rpl_filter
+#include "sql_list.h"     // needed for Rpl_filter
+#include "rpl_filter.h"
+
+Rpl_filter *binlog_filter;
+
 #define BIN_LOG_HEADER_SIZE	4
 #define PROBE_HEADER_LEN	(EVENT_LEN_OFFSET+4)
 
@@ -619,6 +628,42 @@
 
 
 /**
+  Print "use <db>" statement when current db is to be changed.
+
+  We have to control emitting USE statements according to rewrite-db options.
+  We have to do it here (see process_event() below) and suppress the
+  USE statements produced by the corresponding log event print-functions.
+*/
+
+void print_use_stmt(PRINT_EVENT_INFO* pinfo, const char* db, size_t db_len)
+{
+  /* Nothing to do without a db, or when it already is the current one. */
+  if (db == NULL)
+    return;
+  const size_t db_size= db_len + 1;           // name plus terminating '\0'
+  if (memcmp(pinfo->db, db, db_size) == 0)
+    return;
+
+  /*
+    The db changes: look up its rewrite rule (neither side of a rule can
+    be empty, so a zero target length means "no rule").
+  */
+  size_t target_len= 0;
+  const char *target_db= binlog_filter->get_rewrite_db(db, &target_len);
+
+  if (target_len != 0)
+  {
+    /* Emit USE for the rewritten name ourselves ... */
+    fprintf(result_file, "use %s%s\n", target_db, pinfo->delimiter);
+
+    /* ... and remember the *original* name so that the log event
+       print-functions consider the db unchanged and stay silent. */
+    memcpy(pinfo->db, db, db_size);
+  }
+}
+
+
+/**
   Prints the given event in base64 format.
 
   The header is printed to the head cache and the body is printed to
@@ -729,11 +774,14 @@
 
     switch (ev_type) {
     case QUERY_EVENT:
+    {
+      Query_log_event *qe= (Query_log_event*)ev;
       if (strncmp(((Query_log_event*)ev)->query, "BEGIN", 5) && 
           strncmp(((Query_log_event*)ev)->query, "COMMIT", 6) && 
           strncmp(((Query_log_event*)ev)->query, "ROLLBACK", 8) &&  
-          shall_skip_database(((Query_log_event*)ev)->db))
+          shall_skip_database(qe->db))
         goto end;
+      print_use_stmt(print_event_info, qe->db, qe->db_len);
       if (opt_base64_output_mode == BASE64_OUTPUT_ALWAYS)
       {
         if ((retval= write_event_header_and_base64(ev, result_file,
@@ -744,6 +792,7 @@
       else
         ev->print(result_file, print_event_info);
       break;
+    }
 
     case CREATE_FILE_EVENT:
     {
@@ -861,6 +910,7 @@
 
       if (!shall_skip_database(exlq->db))
       {
+        print_use_stmt(print_event_info, exlq->db, exlq->db_len);
         if (fname)
         {
           convert_path_to_forward_slashes(fname);
@@ -884,6 +934,13 @@
         destroy_evt= FALSE;
         goto end;
       }
+      size_t len_to= 0;
+      const char* db_to= binlog_filter->get_rewrite_db(map->get_db_name(), &len_to);
+      if (len_to && map->rewrite_db(db_to, len_to, glob_description_event))
+      {
+        error("Could not rewrite database name");
+        goto err;
+      }
     }
     case WRITE_ROWS_EVENT:
     case DELETE_ROWS_EVENT:
@@ -968,14 +1025,16 @@
   retval= ERROR_STOP;
 end:
   rec_count++;
+  
   /*
-    Destroy the log_event object. If reading from a remote host,
-    set the temp_buf to NULL so that memory isn't freed twice.
+    Destroy the log_event object. 
+    MariaDB MWL#36: mainline does this:
+      If reading from a remote host,
+      set the temp_buf to NULL so that memory isn't freed twice.
+    We no longer do that, we use Log_event::event_owns_temp_buf instead.
   */
   if (ev)
   {
-    if (remote_opt)
-      ev->temp_buf= 0;
     if (destroy_evt) /* destroy it later if not set (ignored table map) */
       delete ev;
   }
@@ -1142,6 +1201,10 @@
    "Used to reserve file descriptors for usage by this program",
    (uchar**) &open_files_limit, (uchar**) &open_files_limit, 0, GET_ULONG,
    REQUIRED_ARG, MY_NFILE, 8, OS_FILE_LIMIT, 0, 1, 0},
+  {"rewrite-db", OPT_REWRITE_DB,
+   "Updates to a database with a different name than the original. \
+Example: rewrite-db='from->to'.",
+   0, 0, 0, GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0},
   {0, 0, 0, 0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0}
 };
 
@@ -1333,6 +1396,53 @@
         (find_type_or_exit(argument, &base64_output_mode_typelib, opt->name)-1);
     }
     break;
+  case OPT_REWRITE_DB:    // argument has the form "db_from->db_to"
+  {
+    /* See also handling of OPT_REPLICATE_REWRITE_DB in sql/mysqld.cc */
+    char* ptr;
+    char* key= argument;  // db-from
+    char* val;            // db-to
+
+    // Where key begins: skip leading blanks
+    while (*key && my_isspace(&my_charset_latin1, *key))
+      key++;
+
+    // Where val begins: right after "->", skipping leading blanks
+    if (!(ptr= strstr(argument, "->")))
+    {
+      sql_print_error("Bad syntax in rewrite-db: missing '->'!\n");
+      return 1;
+    }
+    val= ptr + 2;
+    while (*val && my_isspace(&my_charset_latin1, *val))
+      val++;
+
+    // Cut key at "->" and trim its trailing blanks; never look before key
+    *ptr= 0;
+    while (ptr > key && my_isspace(&my_charset_latin1, ptr[-1]))
+      *--ptr= 0;
+
+    if (!*key)
+    {
+      sql_print_error("Bad syntax in rewrite-db: empty db-from!\n");
+      return 1;
+    }
+
+    // Cut val at its first blank (anything after that blank is dropped)
+    ptr= val;
+    while (*ptr && !my_isspace(&my_charset_latin1, *ptr))
+      ptr++;
+    *ptr= 0;
+
+    if (!*val)
+    {
+      sql_print_error("Bad syntax in rewrite-db: empty db-to!\n");
+      return 1;
+    }
+
+    binlog_filter->add_db_rewrite(key, val);
+    break;
+  }
   case 'v':
     if (argument == disabled_my_option)
       verbose= 0;
@@ -1603,7 +1713,7 @@
       If reading from a remote host, ensure the temp_buf for the
       Log_event class is pointing to the incoming stream.
     */
-    ev->register_temp_buf((char *) net->read_pos + 1);
+    ev->register_temp_buf((char *) net->read_pos + 1, FALSE);
 
     Log_event_type type= ev->get_type_code();
     if (glob_description_event->binlog_version >= 3 ||
@@ -2003,6 +2113,8 @@
   return retval;
 }
 
+/* Used in sql_alloc(). Inited and freed in main() */
+MEM_ROOT s_mem_root;
 
 int main(int argc, char** argv)
 {
@@ -2015,6 +2127,13 @@
 
   my_init_time(); // for time functions
 
+  init_alloc_root(&s_mem_root, 16384, 0);
+  if (!(binlog_filter= new Rpl_filter))
+  {
+    error("Failed to create Rpl_filter");
+    exit(1);
+  }
+
   parse_args(&argc, (char***)&argv);
   defaults_argv=argv;
 
@@ -2101,6 +2220,8 @@
   if (result_file != stdout)
     my_fclose(result_file, MYF(0));
   cleanup();
+  delete binlog_filter;
+  free_root(&s_mem_root, MYF(0));
   free_defaults(defaults_argv);
   my_free_open_file_info();
   load_processor.destroy();
@@ -2112,6 +2233,12 @@
   DBUG_RETURN(retval == ERROR_STOP ? 1 : 0);
 }
 
+/* Client (mysqlbinlog) sql_alloc(): carve the block out of s_mem_root. */
+void *sql_alloc(size_t size)
+{
+  return alloc_root(&s_mem_root, size);
+}
+
 /*
   We must include this here as it's compiled with different options for
   the server
@@ -2122,4 +2249,7 @@
 #include "my_decimal.cc"
 #include "log_event.cc"
 #include "log_event_old.cc"
+#include "sql_string.cc"
+#include "sql_list.cc"
+#include "rpl_filter.cc"
 

=== modified file 'client/sql_string.cc'
--- a/client/sql_string.cc	2009-03-24 13:58:52 +0000
+++ b/client/sql_string.cc	2009-11-21 15:53:06 +0000
@@ -26,15 +26,6 @@
 #ifdef HAVE_FCONVERT
 #include <floatingpoint.h>
 #endif
-
-/*
-  The following extern declarations are ok as these are interface functions
-  required by the string function
-*/
-
-extern void sql_alloc(size_t size);
-extern void sql_element_free(void *ptr);
-
 #include "sql_string.h"
 
 /*****************************************************************************

=== modified file 'client/sql_string.h'
--- a/client/sql_string.h	2007-03-08 03:27:41 +0000
+++ b/client/sql_string.h	2009-11-21 15:53:06 +0000
@@ -15,6 +15,9 @@
 
 /* This file is originally from the mysql distribution. Coded by monty */
 
+#ifndef CLIENT_SQL_STRING_H
+#define CLIENT_SQL_STRING_H
+
 #ifdef USE_PRAGMA_INTERFACE
 #pragma interface			/* gcc class implementation */
 #endif
@@ -353,3 +356,5 @@
     return (s->alloced && Ptr >= s->Ptr && Ptr < s->Ptr + s->str_length);
   }
 };
+
+#endif 

=== modified file 'sql/log_event.cc'
--- a/sql/log_event.cc	2009-10-23 03:13:42 +0000
+++ b/sql/log_event.cc	2009-11-21 15:53:06 +0000
@@ -1122,7 +1122,7 @@
     goto err;
   }
   if ((res= read_log_event(buf, data_len, &error, description_event)))
-    res->register_temp_buf(buf);
+    res->register_temp_buf(buf, TRUE);
 
 err:
   UNLOCK_MUTEX;
@@ -8041,6 +8041,111 @@
   my_free(m_memory, MYF(MY_ALLOW_ZERO_PTR));
 }
 
+
+#ifdef MYSQL_CLIENT
+
+/*
+  Rewrite database name for the event to name specified by new_db
+  SYNOPSIS
+    new_db   Database name to change to
+    new_len  Length
+    desc     Event describing binlog that we're writing to.
+
+  DESCRIPTION
+    Reset db name. This function assumes that temp_buf member contains event
+    representation taken from a binary log. It resets m_dbnam and m_dblen and
+    rewrites temp_buf with new db name.
+
+  RETURN 
+    0     - Success
+    other - Error
+*/
+
+int Table_map_log_event::rewrite_db(const char* new_db, size_t new_len,
+                                    const Format_description_log_event* desc)
+{
+  DBUG_ENTER("Table_map_log_event::rewrite_db");
+  DBUG_ASSERT(temp_buf);
+
+  uint header_len= min(desc->common_header_len,
+                       LOG_EVENT_MINIMAL_HEADER_LEN) + TABLE_MAP_HEADER_LEN;
+  // Same length: overwrite the name (and its '\0') in place, nothing moves.
+  if (new_len == m_dblen)
+  {
+    memcpy((void*) (temp_buf + header_len + 1), new_db, m_dblen + 1);
+    memcpy((void*) m_dbnam, new_db, m_dblen + 1);
+    DBUG_RETURN(0);
+  }
+
+  // The db name length is stored in a single byte of the event.
+  if (new_len > 255)
+  {
+    sql_print_error("Table_map_log_event::rewrite_db: "
+                    "new db name is too long (%lu bytes)", (ulong) new_len);
+    DBUG_RETURN(-1);
+  }
+
+  // Allocate both buffers first: a failure then leaves the event intact.
+  ulong event_cur_len= uint4korr(temp_buf + EVENT_LEN_OFFSET);
+  ulong event_new_len= event_cur_len + new_len - m_dblen;
+  char* new_temp_buf= (char*) my_malloc(event_new_len, MYF(MY_WME));
+  if (!new_temp_buf)
+  {
+    sql_print_error("Table_map_log_event::rewrite_db: "
+                    "failed to allocate new temp_buf (%lu bytes required)",
+                    event_new_len);
+    DBUG_RETURN(-1);
+  }
+
+  // m_dbnam resides in m_memory together with m_tblnam and m_coltype
+  char *new_dbnam, *new_tblnam;
+  uchar *new_coltype;
+  uchar *new_memory= (uchar*) my_multi_malloc(MYF(MY_WME),
+                                              &new_dbnam, (uint) new_len + 1,
+                                              &new_tblnam, (uint) m_tbllen + 1,
+                                              &new_coltype, (uint) m_colcnt,
+                                              NullS);
+  if (!new_memory)
+  {
+    sql_print_error("Table_map_log_event::rewrite_db: "
+                    "failed to allocate new m_memory (%lu + %lu + %lu bytes required)",
+                    (ulong) new_len + 1, (ulong) m_tbllen + 1, (ulong) m_colcnt);
+    my_free(new_temp_buf, MYF(0));
+    DBUG_RETURN(-1);
+  }
+
+  // Build the new event image: header (with the new total length) ...
+  memcpy(new_temp_buf, temp_buf, header_len);
+  int4store(new_temp_buf + EVENT_LEN_OFFSET, event_new_len);
+  char* ptr= new_temp_buf + header_len;
+
+  // ... one length byte, then the new name with its terminating '\0' ...
+  *ptr++ = (char) new_len;
+  memcpy(ptr, new_db, new_len + 1);
+
+  // ... and whatever followed the old <length, name, '\0'>, unchanged.
+  ulong cnt= header_len + m_dblen + 2;
+  memcpy(ptr + new_len + 1, temp_buf + cnt, event_cur_len - cnt);
+
+  // Copy the names and column types while the old m_memory is still alive
+  memcpy(new_dbnam, new_db, new_len + 1);
+  memcpy(new_tblnam, m_tblnam, m_tbllen + 1);
+  memcpy(new_coltype, m_coltype, m_colcnt);
+
+  // Swap in the new buffers and release the old ones
+  free_temp_buf();
+  register_temp_buf(new_temp_buf, TRUE);
+  my_free(m_memory, MYF(MY_WME));
+  m_memory= new_memory;
+  m_dbnam= new_dbnam;
+  m_tblnam= new_tblnam;
+  m_coltype= new_coltype;
+  m_dblen= new_len;
+  DBUG_RETURN(0);
+}
+#endif /* MYSQL_CLIENT */
+
+
 /*
   Return value is an error code, one of:
 

=== modified file 'sql/log_event.h'
--- a/sql/log_event.h	2009-09-28 12:41:10 +0000
+++ b/sql/log_event.h	2009-11-21 15:53:06 +0000
@@ -873,6 +873,13 @@
      event's type, and its content is distributed in the event-specific fields.
   */
   char *temp_buf;
+  
+  /*
+    TRUE <=> this event 'owns' temp_buf and should call my_free() when done
+    with it
+  */
+  bool event_owns_temp_buf;
+
   /*
     Timestamp on the master(for debugging and replication of
     NOW()/TIMESTAMP).  It is important for queries and LOAD DATA
@@ -1014,12 +1021,17 @@
   Log_event(const char* buf, const Format_description_log_event
             *description_event);
   virtual ~Log_event() { free_temp_buf();}
-  void register_temp_buf(char* buf) { temp_buf = buf; }
+  void register_temp_buf(char* buf, bool must_free) 
+  { 
+    temp_buf= buf; 
+    event_owns_temp_buf= must_free;
+  }
   void free_temp_buf()
   {
     if (temp_buf)
     {
-      my_free(temp_buf, MYF(0));
+      if (event_owns_temp_buf)
+        my_free(temp_buf, MYF(0));
       temp_buf = 0;
     }
   }
@@ -3310,6 +3322,8 @@
   ulong get_table_id() const        { return m_table_id; }
   const char *get_table_name() const { return m_tblnam; }
   const char *get_db_name() const    { return m_dbnam; }
+  int rewrite_db(const char* new_name, size_t new_name_len,
+                 const Format_description_log_event*);
 #endif
 
   virtual Log_event_type get_type_code() { return TABLE_MAP_EVENT; }

=== modified file 'sql/mysql_priv.h'
--- a/sql/mysql_priv.h	2009-10-27 13:20:34 +0000
+++ b/sql/mysql_priv.h	2009-11-21 15:53:06 +0000
@@ -91,12 +91,16 @@
 #include "unireg.h"
 
 void init_sql_alloc(MEM_ROOT *root, uint block_size, uint pre_alloc_size);
+#endif // MYSQL_CLIENT
+
 void *sql_alloc(size_t);
 void *sql_calloc(size_t);
 char *sql_strdup(const char *str);
 char *sql_strmake(const char *str, size_t len);
 void *sql_memdup(const void * ptr, size_t size);
 void sql_element_free(void *ptr);
+
+#ifndef MYSQL_CLIENT
 char *sql_strmake_with_convert(const char *str, size_t arg_length,
 			       CHARSET_INFO *from_cs,
 			       size_t max_res_length,

=== modified file 'sql/mysqld.cc'
--- a/sql/mysqld.cc	2009-11-03 00:52:57 +0000
+++ b/sql/mysqld.cc	2009-11-21 15:53:06 +0000
@@ -7970,6 +7970,7 @@
   }
   case (int)OPT_REPLICATE_REWRITE_DB:
   {
+    /* See also OPT_REWRITE_DB handling in client/mysqlbinlog.cc */
     char* key = argument,*p, *val;
 
     if (!(p= strstr(argument, "->")))

=== modified file 'sql/rpl_filter.cc'
--- a/sql/rpl_filter.cc	2009-09-10 07:40:57 +0000
+++ b/sql/rpl_filter.cc	2009-11-21 15:53:06 +0000
@@ -45,6 +45,7 @@
 }
 
 
+#ifndef MYSQL_CLIENT
 /*
   Returns true if table should be logged/replicated 
 
@@ -129,6 +130,7 @@
               !do_table_inited && !wild_do_table_inited);
 }
 
+#endif
 
 /*
   Checks whether a db matches some do_db and ignore_db rules
@@ -514,6 +516,13 @@
 }
 
 
+bool
+Rpl_filter::rewrite_db_is_empty()
+{
+  return rewrite_db.is_empty();  /* presumably "no db rewrite rules"; confirm */
+}
+
+
 const char*
 Rpl_filter::get_rewrite_db(const char* db, size_t *new_len)
 {

=== modified file 'sql/rpl_filter.h'
--- a/sql/rpl_filter.h	2007-05-10 09:59:39 +0000
+++ b/sql/rpl_filter.h	2009-11-21 15:53:06 +0000
@@ -42,7 +42,9 @@
  
   /* Checks - returns true if ok to replicate/log */
 
-  bool tables_ok(const char* db, TABLE_LIST* tables);
+#ifndef MYSQL_CLIENT
+  bool tables_ok(const char* db, TABLE_LIST *tables);
+#endif 
   bool db_ok(const char* db);
   bool db_ok_with_wild_table(const char *db);
 
@@ -69,6 +71,7 @@
   void get_wild_do_table(String* str);
   void get_wild_ignore_table(String* str);
 
+  bool rewrite_db_is_empty();
   const char* get_rewrite_db(const char* db, size_t *new_len);
 
   I_List<i_string>* get_do_db();

=== modified file 'sql/sql_string.cc'
--- a/sql/sql_string.cc	2009-07-31 17:14:52 +0000
+++ b/sql/sql_string.cc	2009-11-21 15:53:06 +0000
@@ -37,6 +37,9 @@
 
 #include "sql_string.h"
 
+#ifdef MYSQL_CLIENT
+#error Attempt to use server-side sql_string on client. Use client/sql_string.cc
+#endif 
 /*****************************************************************************
 ** String functions
 *****************************************************************************/

=== modified file 'sql/sql_string.h'
--- a/sql/sql_string.h	2009-07-31 17:14:52 +0000
+++ b/sql/sql_string.h	2009-11-21 15:53:06 +0000
@@ -1,3 +1,5 @@
+#ifndef MYSQL_SQL_STRING_H_INCLUDED
+#define MYSQL_SQL_STRING_H_INCLUDED
 /* Copyright (C) 2000 MySQL AB
 
    This program is free software; you can redistribute it and/or modify
@@ -23,6 +25,10 @@
 #define NOT_FIXED_DEC			31
 #endif
 
+#ifdef MYSQL_CLIENT
+#error Attempt to use server-side sql_string on client. Use client/sql_string.h
+#endif 
+
 class String;
 int sortcmp(const String *a,const String *b, CHARSET_INFO *cs);
 String *copy_if_not_alloced(String *a,String *b,uint32 arg_length);
@@ -389,3 +395,5 @@
 {
   return str+ cs->cset->scan(cs, str, end, MY_SEQ_SPACES) == end;
 }
+
+#endif // MYSQL_SQL_STRING_H_INCLUDED

=== modified file 'sql/thr_malloc.cc'
--- a/sql/thr_malloc.cc	2009-06-29 14:00:47 +0000
+++ b/sql/thr_malloc.cc	2009-11-21 15:53:06 +0000
@@ -59,11 +59,13 @@
 }
 
 
+#ifndef MYSQL_CLIENT
 void *sql_alloc(size_t Size)
 {
   MEM_ROOT *root= *my_pthread_getspecific_ptr(MEM_ROOT**,THR_MALLOC);
   return alloc_root(root,Size);
 }
+#endif
 
 
 void *sql_calloc(size_t size)