Re: c850799967c: MDEV-10963 Fragmented BINLOG query

Hi, Andrei!

First, when I ran it, I got:

Warning: 112 bytes lost at 0x7f212f26a670, allocated by T@0 at sql/sql_binlog.cc:68, sql/sql_binlog.cc:181, sql/sql_parse.cc:5625, sql/sql_parse.cc:7465, sql/sql_parse.cc:1497, sql/sql_parse.cc:1124, sql/sql_connect.cc:1330, sql/sql_connect.cc:1243

so you have a memory leak with your thd->lex->comment.str

Second, BINLOG CONCAT() is totally possible; inventing a new keyword
defeats the whole point of using a familiar and intuitive syntax.
I've attached a simple patch that implements it.

On the other hand, I just thought that the CONCAT() syntax might not be
so intuitive after all, as - strictly speaking - a result of CONCAT
cannot be larger than max_allowed_packet, so it might be confusing to
use CONCAT here. Maybe just use your idea of

  BINLOG @a, @b;

with no CONCAT or anything else at all?

Also, I'm thinking whether BINLOG (in this last form) could
automatically clear its variables @a and @b, without an explicit SET
@a=NULL. It's a bit weird to have such a side effect, but it'll help to
save memory - the event size is over 3GB, right? So you'll have it
split in two variables. Then you create a concatenated copy (that's over
3GB more), then you decode it (already more than 9GB in memory), and
then it's executed, where copies of huge blob values are created again,
maybe more than once. Freeing @a and @b and the concatenated copy
as soon as possible will save over 6GB of RAM.

Note, we cannot auto-reset variables in the syntax with CONCAT(),
DEFRAGMENT() or whatever, because functions cannot do it to their
arguments.
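
If BINLOG did the cleanup itself, it could drop the two user variables
right after merging them, roughly like this (an untested sketch inside
binlog_defragment(); "name[i]" stands for however the two fragment names
end up being stored, and whether deleting user variables mid-statement
is safe needs checking):

  /* after the fragments have been copied into thd->lex->comment */
  for (uint i= 0; i < 2; i++)
  {
    user_var_entry *entry=
      (user_var_entry*) my_hash_search(&thd->user_vars,
                                       (uchar*) name[i].str, name[i].length);
    if (entry)
      my_hash_delete(&thd->user_vars, (uchar*) entry); // the hash's free
                                                       // function releases
                                                       // the value
  }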

On Oct 18, Andrei Elkin wrote:
> revision-id: c850799967c561d08242987269d94af6ae4c7c5e (mariadb-10.1.35-59-gc850799967c)
> parent(s): d3a8b5aa9cee24a6397662e30df2e915f45460e0
> author: Andrei Elkin <andrei.elkin@xxxxxxxxxxx>
> committer: Andrei Elkin <andrei.elkin@xxxxxxxxxxx>
> timestamp: 2018-09-21 18:48:06 +0300
> message:
> 
> MDEV-10963 Fragmented BINLOG query
> 
> The problem was originally stated in
>   http://bugs.mysql.com/bug.php?id=82212
> The size of an base64-encoded Rows_log_event exceeds its
> vanilla byte representation in 4/3 times.
> When a binlogged event size is about 1GB mysqlbinlog generates
> a BINLOG query that can't be send out due to its size.
> 
> It is fixed with fragmenting the BINLOG argument C-string into
> (approximate) halves when the base64 encoded event is over 1GB size.
> The mysqlbinlog in such case puts out
> 
>     SET @binlog_fragment_0='base64-encoded-fragment_0';
>     SET @binlog_fragment_1='base64-encoded-fragment_1';
>     BINLOG DEFRAGMENT(@binlog_fragment_0, @binlog_fragment_1);
> 
> to represent a big BINLOG 'base64-encoded-"total"'.
> Two more statements are composed to promptly release memory
>     SET @binlog_fragment_0=NULL;
>     SET @binlog_fragment_1=NULL;
> 
> The 2 fragments are enough, though the client and server still may
> need to tweak their @@max_allowed_packet to satisfy to the fragment
> size (which they would have to do anyway with greater number of
> fragments, should that be desired).
> 
> On the lower level the following changes are made:
> 
> Log_event::print_base64()
>   remains to call encoder and store the encoded data into a cache but
>   now *without* doing any formatting. The latter is left for time
>   when the cache is copied to an output file (e.g mysqlbinlog output).
>   No formatting behavior is also reflected by the change in the meaning
>   of the last argument which specifies whether to cache the encoded data.
> 
> my_b_copy_to_file()
>   is turned into my_b_copy_to_file_frag()
>   which accepts format specifier arguments to build a proper syntax BINLOG
>   query in both the fragmented (n_frag > 1) and non-fragmented (n_frag == 1)
>   cases.
> 
> Rows_log_event::print_helper()
>   Takes decision whether to fragment, prepares respective format specifiers
>   and invokes the cache-to-file copying function, which is now
> 
> copy_cache_frag_to_file_and_reinit()
>   replaces original copy_event_cache_to_file_and_reinit() to pass
>   extra arguments to my_b_copy_to_file() successor of
> 
> my_b_copy_to_file_frag()
>   replaces the former pure copier not to also conduct wrapping the encoded
>   data per format specifiers.
>   With its 'n_frag' argument as 1
>   and the rest or args NULL works as the original function.
> 
> diff --git a/mysql-test/suite/binlog/t/binlog_mysqlbinlog_row_frag.test b/mysql-test/suite/binlog/t/binlog_mysqlbinlog_row_frag.test
> new file mode 100644
> index 00000000000..5318f8acec6
> --- /dev/null
> +++ b/mysql-test/suite/binlog/t/binlog_mysqlbinlog_row_frag.test
> @@ -0,0 +1,43 @@
> +--source include/have_debug.inc
> +--source include/have_log_bin.inc
> +--source include/have_binlog_format_row.inc
> +
> +--let $MYSQLD_DATADIR= `select @@datadir`
> +--let $max_size=1024
> +
> +CREATE TABLE t (a TEXT);
> +# events of interest are guaranteed to stay in 000001 log
> +RESET MASTER;
> +--eval INSERT INTO t SET a=repeat('a', $max_size)
> +SELECT a from t into @a;
> +FLUSH LOGS;
> +DELETE FROM t;
> +
> +--exec $MYSQL_BINLOG --debug-binlog-row-event-max-encoded-size=256 $MYSQLD_DATADIR/master-bin.000001 > $MYSQLTEST_VARDIR/tmp/mysqlbinlog.sql
> +
> +--exec $MYSQL test < $MYSQLTEST_VARDIR/tmp/mysqlbinlog.sql

how do you know the event was split, if you aren't looking into the
mysqlbinlog.sql file? Maybe your code doesn't work at all, or it was
removed by some incorrect merge - the test will pass anyway...

> +
> +SELECT a LIKE @a as 'true' FROM t;
> +SELECT @binlog_fragment_0, @binlog_fragment_1 as 'NULL';
> +
> +# improper syntax error
> +--echo BINLOG number-of-fragments must be exactly two
> +--error ER_PARSE_ERROR
> +BINLOG DEFRAGMENT(@binlog_fragment);
> +--error ER_PARSE_ERROR
> +BINLOG DEFRAGMENT(@binlog_fragment, @binlog_fragment, @binlog_fragment);
> +
> +# corrupted fragments error check (to the expected error code notice,
> +# the same error code occurs in a similar unfragmented case)
> +SET @binlog_fragment_0='012345';
> +SET @binlog_fragment_1='012345';
> +--error ER_SYNTAX_ERROR
> +BINLOG DEFRAGMENT(@binlog_fragment_0, @binlog_fragment_1);
> +
> +# Not existing fragment is not allowed
> +SET @binlog_fragment_0='012345';
> +--error ER_BASE64_DECODE_ERROR
> +BINLOG DEFRAGMENT(@binlog_fragment_0, @binlog_fragment_not_exist);
> +
> +--echo # Cleanup
> +DROP TABLE t;
> diff --git a/mysys/mf_iocache2.c b/mysys/mf_iocache2.c
> index 2499094037d..6b7ff8a7568 100644
> --- a/mysys/mf_iocache2.c
> +++ b/mysys/mf_iocache2.c
> @@ -22,13 +22,23 @@
>  #include <stdarg.h>
>  #include <m_ctype.h>
>  
> -/*
> +/**
>    Copy contents of an IO_CACHE to a file.
>  
>    SYNOPSIS
> -    my_b_copy_to_file()
> -    cache  IO_CACHE to copy from
> -    file   File to copy to
> +    my_b_copy_to_file_frag
> +
> +    cache                    IO_CACHE to copy from
> +    file                     File to copy to
> +    n_frag                   # of fragments
> +
> +    Other arguments represent format strings to enable wrapping
> +    of the fragments and their total, including
> +
> +    before_frag              before a fragment
> +    after_frag               after a fragment
> +    after_last_frag          after all the fragments
> +    final_per_frag           in the end per each fragment

if you edit a comment, change it to Doxygen syntax. Especially since
you start it with Doxygen's /**, it is simply invalid to have SYNOPSIS
and DESCRIPTION inside.
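
For example, something like this (only illustrating the Doxygen style,
with the parameters taken from your version):

  /**
    Copy the contents of an IO_CACHE to a file, splitting the copy into
    n_frag fragments wrapped with the given format strings.

    @param cache            IO_CACHE to copy from
    @param file             file to copy to
    @param n_frag           number of fragments
    @param before_frag      format string written before each fragment
    @param after_frag       format string written after each fragment
    @param after_last       format string written once after all fragments
    @param final_per_frag   format string written per fragment at the end
    @param buf              scratch buffer for the formatted strings

    @retval 0  all OK
    @retval 1  an error occurred
  */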

>  
>    DESCRIPTION
>      Copy the contents of the cache to the file. The cache will be
> @@ -38,33 +48,120 @@
>      If a failure to write fully occurs, the cache is only copied
>      partially.
>  
> -  TODO
> -    Make this function solid by handling partial reads from the cache
> -    in a correct manner: it should be atomic.
> +    The copying is made in so many steps as the number of fragments as
> +    specified by the parameter 'n_frag'.  Each step is wrapped with
> +    writing to the file 'before_frag' and 'after_frag' formated
> +    strings, unless the parameters are NULL. In the end, optionally,
> +    first 'after_last_frag' string is appended to 'file' followed by
> +    'final_per_frag' per each fragment.
> +    final item.
>  
>    RETURN VALUE
>      0  All OK
>      1  An error occurred
> +
> +  TODO
> +    Make this function solid by handling partial reads from the cache
> +    in a correct manner: it should be atomic.
>  */
>  int
> -my_b_copy_to_file(IO_CACHE *cache, FILE *file)
> +my_b_copy_to_file_frag(IO_CACHE *cache, FILE *file,
> +                       uint n_frag,
> +                       const char* before_frag,
> +                       const char* after_frag,
> +                       const char* after_last,
> +                       const char* final_per_frag,
> +                       char* buf)
>  {
> -  size_t bytes_in_cache;
> -  DBUG_ENTER("my_b_copy_to_file");
> +  size_t bytes_in_cache;         // block, may have short size in the last one
> +  size_t written_off_last_block; // consumed part of the block by last fragment
> +  size_t total_size= my_b_tell(cache);
> +  size_t frag_size= total_size / n_frag + 1;
> +  size_t total_written= 0;
> +  size_t frag_written;           // bytes collected in the current fragment
> +  uint i;
> +
> +  DBUG_ENTER("my_b_copy_to_file_frag");
> +
> +  DBUG_ASSERT(cache->type == WRITE_CACHE);
>  
>    /* Reinit the cache to read from the beginning of the cache */
>    if (reinit_io_cache(cache, READ_CACHE, 0L, FALSE, FALSE))
>      DBUG_RETURN(1);
> -  bytes_in_cache= my_b_bytes_in_cache(cache);
> -  do
> -  {
> -    if (my_fwrite(file, cache->read_pos, bytes_in_cache,
> -                  MYF(MY_WME | MY_NABP)) == (size_t) -1)
> -      DBUG_RETURN(1);
> -  } while ((bytes_in_cache= my_b_fill(cache)));
> -  if(cache->error == -1)
> -    DBUG_RETURN(1);
> -  DBUG_RETURN(0);
> +
> +  for (i= 0, written_off_last_block= 0, bytes_in_cache= my_b_bytes_in_cache(cache);
> +       i < n_frag;
> +       i++, total_written += frag_written)
> +  {
> +       frag_written= 0;
> +       if (before_frag)
> +       {
> +            sprintf(buf, before_frag, i);
> +            my_fwrite(file, (uchar*) buf, strlen(buf), MYF(MY_WME | MY_NABP));
> +       }
> +       do
> +       {
> +            /*
> +              Either the current block is the last (L) in making the
> +              current fragment (and possibly has some extra not to fit (LG) into
> +              the fragment), or (I) the current (whole then) block is
> +              intermediate.
> +            */
> +            size_t block_to_write= (frag_written + bytes_in_cache >= frag_size) ?
> +                 frag_size - frag_written : bytes_in_cache;
> +
> +            DBUG_ASSERT(n_frag != 1 ||
> +                        (block_to_write == bytes_in_cache &&
> +                         written_off_last_block == 0));
> +
> +            if (my_fwrite(file, cache->read_pos + written_off_last_block,
> +                          block_to_write,
> +                          MYF(MY_WME | MY_NABP)) == (size_t) -1)
> +                 /* no cache->error is set here */
> +                 DBUG_RETURN(1);
> +
> +            frag_written += block_to_write;
> +            if (frag_written == frag_size)                 // (L)
> +            {
> +                 DBUG_ASSERT(block_to_write <= bytes_in_cache);
> +                 written_off_last_block= block_to_write;
> +                 bytes_in_cache -= written_off_last_block; // (LG) when bytes>0
> +                 /*
> +                   Nothing should be left in cache at the end of the
> +                   last fragment construction.
> +                 */
> +                 DBUG_ASSERT(i != n_frag - 1 || bytes_in_cache == 0);
> +
> +                 break;
> +            }
> +            else
> +            {
> +                 written_off_last_block= 0; // (I)
> +            }
> +       } while ((bytes_in_cache= my_b_fill(cache)));
> +
> +       if (after_frag)
> +       {
> +            sprintf(buf, after_frag, NULL);
> +            my_fwrite(file, (uchar*) buf, strlen(buf), MYF(MY_WME | MY_NABP));
> +       }
> +  }
> +
> +  DBUG_ASSERT(total_written == total_size); // output == input
> +
> +  if (after_last)
> +  {
> +       sprintf(buf, after_last, n_frag);
> +       my_fwrite(file, (uchar*) buf, strlen(buf), MYF(MY_WME | MY_NABP));
> +  }
> +
> +  for (i= 0; final_per_frag && i < n_frag ; i++)
> +  {
> +       sprintf(buf, final_per_frag, i);
> +       my_fwrite(file, (uchar*) buf, strlen(buf), MYF(MY_WME | MY_NABP));
> +  }
> +
> +  DBUG_RETURN(cache->error == -1);

this is a big and very specialized function with non-standard
indentation. Please, move it from mysys to mysqlbinlog.cc and fix the
indentation.

>  }
>  
>  
> diff --git a/sql/log_event.cc b/sql/log_event.cc
> index e1912ad4620..0c39200148f 100644
> --- a/sql/log_event.cc
> +++ b/sql/log_event.cc
> @@ -10474,12 +10482,108 @@ void Rows_log_event::pack_info(Protocol *protocol)
>  #endif
>  
>  #ifdef MYSQL_CLIENT
> +void copy_cache_to_file_wrapped(FILE *file,
> +                                PRINT_EVENT_INFO *print_event_info,
> +                                IO_CACHE *body,
> +                                bool do_print_encoded)
> +{
> +  uint n_frag= 1;
> +  const char* before_frag= NULL;
> +  char* after_frag= NULL;
> +  char* after_last= NULL;
> +  char* final_per_frag= NULL;
> +  /*
> +    2 fragments can always represent near 1GB row-based
> +    base64-encoded event as two strings each of size less than
> +    max(max_allowed_packet).  Greater number of fragments does not
> +    save from potential need to tweak (increase) @@max_allowed_packet
> +    before to process the fragments. So 2 is safe and enough.
> +  */
> +  const char fmt_last_frag2[]=
> +    "\nBINLOG DEFRAGMENT(@binlog_fragment_0, @binlog_fragment_1)%s\n";
> +  const char fmt_last_per_frag[]= "\nSET @binlog_fragment_%%d = NULL%s\n";

this is a completely unnecessary "generalization", print simply

  "SET @binlog_fragment_0=NULL, @binlog_fragment_1=NULL%s\n"

in particular, as your "BINLOG DEFRAGMENT" statement is not generalized
either (or move the cleanup inside BINLOG, as discussed above).

> +  const char fmt_before_frag[]= "\nSET @binlog_fragment_%d ='\n";
> +  /*
> +    Buffer to pass to copy_cache_frag_to_file_and_reinit to
> +    compute formatted strings according to specifiers.
> +    The sizes may depend on an actual fragment number size in terms of decimal
> +    signs so its maximum is estimated (not precisely yet safely) below.
> +  */
> +  char buf[(sizeof(fmt_last_frag2) + sizeof(fmt_last_per_frag))
> +           + ((sizeof(n_frag) * 8)/3 + 1)                // decimal index
> +           + sizeof(print_event_info->delimiter) + 3];   // delim, \n and 0.
> +
> +  if (do_print_encoded)
> +  {
> +    after_frag= (char*) my_malloc(sizeof(buf), MYF(MY_WME));
> +    sprintf(after_frag, "'%s\n", print_event_info->delimiter);
> +    if (my_b_tell(body) > opt_binlog_rows_event_max_encoded_size)
> +      n_frag= 2;
> +    if (n_frag > 1)
> +    {
> +      before_frag= fmt_before_frag;
> +      after_last= (char*) my_malloc(sizeof(buf), MYF(MY_WME));
> +      sprintf(after_last, fmt_last_frag2, (char*) print_event_info->delimiter);
> +      final_per_frag= (char*) my_malloc(sizeof(buf), MYF(MY_WME));
> +      sprintf(final_per_frag, fmt_last_per_frag,
> +              (char*) print_event_info->delimiter);
> +    }
> +    else
> +    {
> +      before_frag= "\nBINLOG '\n";
> +    }
> +  }
> +  if (copy_cache_frag_to_file_and_reinit(body, file, n_frag,
> +                                         before_frag, after_frag,
> +                                         after_last, final_per_frag, buf))

1. get rid of this copy_cache_frag_to_file_and_reinit function, it's not
   helping.

2. instead of pushing all this complex logic of printing fragments with
   before/after/in-between/start/final strings, it'd be *much* simpler
   to do it here and just extend my_b_copy_to_file() to print at most
   N bytes - see the sketch below.
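
Roughly (an untested sketch; the caller reinits the cache once, prints
the SET/BINLOG wrapping itself, and calls this once per fragment so the
copy resumes where the previous call stopped):

  int my_b_copy_to_file(IO_CACHE *cache, FILE *file, size_t count)
  {
    size_t curr_write, bytes_in_cache;
    DBUG_ENTER("my_b_copy_to_file");

    bytes_in_cache= my_b_bytes_in_cache(cache);
    do
    {
      curr_write= MY_MIN(bytes_in_cache, count);
      if (my_fwrite(file, cache->read_pos, curr_write,
                    MYF(MY_WME | MY_NABP)) == (size_t) -1)
        DBUG_RETURN(1);
      cache->read_pos+= curr_write;      /* consume only what was written */
      count-= curr_write;
    } while (count && (bytes_in_cache= my_b_fill(cache)));

    DBUG_RETURN(cache->error == -1);
  }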

> +  {
> +    body->error= -1;
> +    goto err;
> +  }
> +
> +err:
> +  my_free(after_frag);
> +  my_free(after_last);
> +  my_free(final_per_frag);
> +}
> +
> +/*
> +  The function invokes base64 encoder to run on the current
> +  event string and store the result into two caches.
> +  When the event ends the current statement the caches are is copied into
> +  the argument file.
> +  Copying is also concerned how to wrap the event, specifically to produce
> +  a valid SQL syntax.
> +  When the encoded data size is within max(MAX_ALLOWED_PACKET)
> +  a regular BINLOG query is composed. Otherwise it is build as fragmented
> +
> +    SET @binlog_fragment_0='...';
> +    SET @binlog_fragment_1='...';
> +    BINLOG DEFRAGMENT(@binlog_fragment_0, @binlog_fragment_1);
> +
> +  where fragments are represented by a sequence of "indexed" user
> +  variables.
> +  Two more statements are composed as well
> +
> +    SET @binlog_fragment_0=NULL;
> +    SET @binlog_fragment_1=NULL;
> +
> +  to promptly release memory.
> +
> +  NOTE.
> +  If any changes made don't forget to duplicate them to
> +  Old_rows_log_event as long as it's supported.
> +*/
>  void Rows_log_event::print_helper(FILE *file,
>                                    PRINT_EVENT_INFO *print_event_info,
>                                    char const *const name)
>  {
>    IO_CACHE *const head= &print_event_info->head_cache;
>    IO_CACHE *const body= &print_event_info->body_cache;
> +  bool do_print_encoded=
> +    print_event_info->base64_output_mode != BASE64_OUTPUT_DECODE_ROWS &&
> +    !print_event_info->short_form;
> +
>    if (!print_event_info->short_form)
>    {
>      bool const last_stmt_event= get_flags(STMT_END_F);
> diff --git a/sql/sql_binlog.cc b/sql/sql_binlog.cc
> index 91cf038907e..b15d72e036a 100644
> --- a/sql/sql_binlog.cc
> +++ b/sql/sql_binlog.cc
> @@ -28,6 +28,73 @@
>                                                  // START_EVENT_V3,
>                                                  // Log_event_type,
>                                                  // Log_event
> +
> +/*
> +  Copy fragments into the standard placeholder thd->lex->comment.str.
> +
> +  Compute the size of the (still) encoded total,
> +  allocate and then copy fragments one after another.
> +  The size can exceed max(max_allowed_packet) which is not a
> +  problem as no String instance is created off this char array.
> +
> +  Return 0 at success, -1 otherwise.
> +*/
> +int binlog_defragment(THD *thd)
> +{
> +  LEX_STRING *curr_frag_name;
> +
> +  thd->lex->comment.length= 0;
> +  thd->lex->comment.str= NULL;
> +  /* compute the total size */
> +  for (uint i= 0; i < thd->lex->fragmented_binlog_event.n_frag; i++)
> +  {
> +    user_var_entry *entry;
> +
> +    curr_frag_name= &thd->lex->fragmented_binlog_event.frag_name[i];
> +    entry=
> +      (user_var_entry*) my_hash_search(&thd->user_vars,
> +                                       (uchar*) curr_frag_name->str,
> +                                       curr_frag_name->length);
> +    if (!entry || entry->type != STRING_RESULT)
> +    {
> +      my_printf_error(ER_BASE64_DECODE_ERROR,
> +                      "%s: BINLOG fragment user "
> +                      "variable '%s' has unexpectedly no value", MYF(0),
> +                      ER_THD(thd, ER_BASE64_DECODE_ERROR), curr_frag_name->str);
> +      return -1;
> +    }
> +    thd->lex->comment.length += entry->length;
> +  }
> +  thd->lex->comment.str=                            // to be freed by the caller
> +    (char *) my_malloc(thd->lex->comment.length, MYF(MY_WME));
> +  if (!thd->lex->comment.str)
> +  {
> +    my_error(ER_OUTOFMEMORY, MYF(ME_FATALERROR), 1);
> +    return -1;
> +  }
> +
> +  /* fragments are merged into allocated buf */
> +  size_t gathered_length= 0;
> +  for (uint i= 0; i < thd->lex->fragmented_binlog_event.n_frag; i++)
> +  {
> +    user_var_entry *entry;
> +
> +    curr_frag_name= &thd->lex->fragmented_binlog_event.frag_name[i];
> +    entry=
> +      (user_var_entry*) my_hash_search(&thd->user_vars,
> +                                       (uchar*) curr_frag_name->str,
> +                                       curr_frag_name->length);
> +    memcpy(thd->lex->comment.str + gathered_length,
> +           entry->value, entry->length);
> +
> +    gathered_length += entry->length;
> +  }
> +  DBUG_ASSERT(gathered_length == thd->lex->comment.length);

ok.

1. BINLOG uses LEX::comment to avoid increasing sizeof(LEX) for a rarely
   used statement. So don't increase it for a much-more-rarely used
   event overflow case. That is, remove fragmented_binlog_event. You can
   use a pair of LEX_STRING's in LEX (e.g. comment/ident) or (less
   hackish) a List<LEX_STRING>, of which there are plenty in LEX.

2. Don't look up the variables twice, remember them in the first loop.
   You can unroll it too, there are only two variables anyway. E.g. the
   second loop can be replaced by just

   memcpy(thd->lex->comment.str, var[0]->value, var[0]->length);
   memcpy(thd->lex->comment.str + var[0]->length, var[1]->value, var[1]->length);

   or even shorter, if you start your function with
   
   LEX_STRING *out= &thd->lex->comment;

3. Not "unexpectedly no value". ER_WRONG_TYPE_FOR_VAR looks more
   appropriate there. (A sketch putting 1-3 together follows below.)
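
Putting the three points together, binlog_defragment() could look
roughly like this (an untested sketch; using comment/ident as the
storage for the two fragment names is only an assumption, per point 1):

  int binlog_defragment(THD *thd)
  {
    user_var_entry *entry[2];
    LEX_STRING name[2]= { thd->lex->comment, thd->lex->ident };
    LEX_STRING *out= &thd->lex->comment;

    out->str= NULL;
    out->length= 0;
    for (uint i= 0; i < 2; i++)
    {
      entry[i]=
        (user_var_entry*) my_hash_search(&thd->user_vars,
                                         (uchar*) name[i].str,
                                         name[i].length);
      if (!entry[i] || entry[i]->type != STRING_RESULT)
      {
        my_error(ER_WRONG_TYPE_FOR_VAR, MYF(0), name[i].str);
        return -1;
      }
      out->length+= entry[i]->length;
    }

    if (!(out->str=                          // to be freed by the caller
          (char *) my_malloc(out->length, MYF(MY_WME))))
    {
      my_error(ER_OUTOFMEMORY, MYF(ME_FATALERROR), 1);
      return -1;
    }
    memcpy(out->str, entry[0]->value, entry[0]->length);
    memcpy(out->str + entry[0]->length, entry[1]->value, entry[1]->length);
    return 0;
  }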

> +
> +  return 0;
> +}
> +
> +
>  /**
>    Execute a BINLOG statement.

Regards,
Sergei
Chief Architect MariaDB
and security@xxxxxxxxxxx
diff --git a/mysql-test/suite/binlog/r/binlog_base64_flag.result b/mysql-test/suite/binlog/r/binlog_base64_flag.result
index a1d39f9ef7b..1522604060b 100644
--- a/mysql-test/suite/binlog/r/binlog_base64_flag.result
+++ b/mysql-test/suite/binlog/r/binlog_base64_flag.result
@@ -38,7 +38,7 @@ TFtYRxMBAAAAKQAAAH8BAAAAABAAAAAAAAAABHRlc3QAAnQxAAEDAAE=
 TFtYRxcBAAAAIgAAAKEBAAAQABAAAAAAAAEAAf/+AwAAAA==
 ';
 SET @binlog_fragment_1='';
-BINLOG DEFRAGMENT(@binlog_fragment_0, @binlog_fragment_1);
+BINLOG CONCAT(@binlog_fragment_0, @binlog_fragment_1);
 select * from t1;
 a
 1
diff --git a/mysql-test/suite/binlog/t/binlog_base64_flag.test b/mysql-test/suite/binlog/t/binlog_base64_flag.test
index 19e8ccc1905..5f45251392a 100644
--- a/mysql-test/suite/binlog/t/binlog_base64_flag.test
+++ b/mysql-test/suite/binlog/t/binlog_base64_flag.test
@@ -86,7 +86,7 @@ TFtYRxMBAAAAKQAAAH8BAAAAABAAAAAAAAAABHRlc3QAAnQxAAEDAAE=
 TFtYRxcBAAAAIgAAAKEBAAAQABAAAAAAAAEAAf/+AwAAAA==
 ';
 SET @binlog_fragment_1='';
-BINLOG DEFRAGMENT(@binlog_fragment_0, @binlog_fragment_1);
+BINLOG CONCAT(@binlog_fragment_0, @binlog_fragment_1);
 # The above line should succeed and 3 should be in the table
 select * from t1;
 
diff --git a/sql/item_create.cc b/sql/item_create.cc
index 82f6bbd3173..8ced934143e 100644
--- a/sql/item_create.cc
+++ b/sql/item_create.cc
@@ -631,19 +631,6 @@ class Create_func_compress : public Create_func_arg1
 };
 
 
-class Create_func_concat : public Create_native_func
-{
-public:
-  virtual Item *create_native(THD *thd, LEX_STRING name, List<Item> *item_list);
-
-  static Create_func_concat s_singleton;
-
-protected:
-  Create_func_concat() {}
-  virtual ~Create_func_concat() {}
-};
-
-
 class Create_func_decode_histogram : public Create_func_arg2
 {
 public:
@@ -3475,26 +3462,6 @@ Create_func_dyncol_json::create_1_arg(THD *thd, Item *arg1)
   return new (thd->mem_root) Item_func_dyncol_json(thd, arg1);
 }
 
-Create_func_concat Create_func_concat::s_singleton;
-
-Item*
-Create_func_concat::create_native(THD *thd, LEX_STRING name,
-                                  List<Item> *item_list)
-{
-  int arg_count= 0;
-
-  if (item_list != NULL)
-    arg_count= item_list->elements;
-
-  if (arg_count < 1)
-  {
-    my_error(ER_WRONG_PARAMCOUNT_TO_NATIVE_FCT, MYF(0), name.str);
-    return NULL;
-  }
-
-  return new (thd->mem_root) Item_func_concat(thd, *item_list);
-}
-
 Create_func_decode_histogram Create_func_decode_histogram::s_singleton;
 
 Item *
@@ -5778,7 +5745,6 @@ static Native_func_registry func_array[] =
   { { C_STRING_WITH_LEN("COLUMN_LIST") }, BUILDER(Create_func_dyncol_list)},
   { { C_STRING_WITH_LEN("COLUMN_JSON") }, BUILDER(Create_func_dyncol_json)},
   { { C_STRING_WITH_LEN("COMPRESS") }, BUILDER(Create_func_compress)},
-  { { C_STRING_WITH_LEN("CONCAT") }, BUILDER(Create_func_concat)},
   { { C_STRING_WITH_LEN("CONCAT_WS") }, BUILDER(Create_func_concat_ws)},
   { { C_STRING_WITH_LEN("CONNECTION_ID") }, BUILDER(Create_func_connection_id)},
   { { C_STRING_WITH_LEN("CONV") }, BUILDER(Create_func_conv)},
diff --git a/sql/lex.h b/sql/lex.h
index cd3f3803b80..322c26bdc2c 100644
--- a/sql/lex.h
+++ b/sql/lex.h
@@ -133,7 +133,6 @@ static SYMBOL symbols[] = {
   { "COMPACT",		SYM(COMPACT_SYM)},
   { "COMPLETION",	SYM(COMPLETION_SYM)},
   { "COMPRESSED",	SYM(COMPRESSED_SYM)},
-  { "DEFRAGMENT",       SYM(DEFRAGMENT_SYM)},
   { "CONCURRENT",	SYM(CONCURRENT)},
   { "CONDITION",        SYM(CONDITION_SYM)},
   { "CONNECTION",       SYM(CONNECTION_SYM)},
@@ -679,6 +678,7 @@ static SYMBOL sql_functions[] = {
   { "BIT_OR",		SYM(BIT_OR)},
   { "BIT_XOR",		SYM(BIT_XOR)},
   { "CAST",		SYM(CAST_SYM)},
+  { "CONCAT",           SYM(CONCAT_SYM)},
   { "COUNT",		SYM(COUNT_SYM)},
   { "CURDATE",		SYM(CURDATE)},
   { "CURTIME",		SYM(CURTIME)},
diff --git a/sql/sql_yacc.yy b/sql/sql_yacc.yy
index c15a3e98110..dff093ec9ba 100644
--- a/sql/sql_yacc.yy
+++ b/sql/sql_yacc.yy
@@ -1134,6 +1134,7 @@ bool my_yyoverflow(short **a, YYSTYPE **b, ulong *yystacksize);
 %token  COMPACT_SYM
 %token  COMPLETION_SYM
 %token  COMPRESSED_SYM
+%token  CONCAT_SYM
 %token  CONCURRENT
 %token  CONDITION_SYM                 /* SQL-2003-R, SQL-2008-R */
 %token  CONNECTION_SYM
@@ -1179,7 +1180,6 @@ bool my_yyoverflow(short **a, YYSTYPE **b, ulong *yystacksize);
 %token  DECLARE_SYM                   /* SQL-2003-R */
 %token  DEFAULT                       /* SQL-2003-R */
 %token  DEFINER_SYM
-%token  DEFRAGMENT_SYM                /* MYSQL */
 %token  DELAYED_SYM
 %token  DELAY_KEY_WRITE_SYM
 %token  DELETE_SYM                    /* SQL-2003-R */
@@ -7953,7 +7953,7 @@ binlog_base64_event:
             Lex->comment= $2;
           }
           |
-          BINLOG_SYM DEFRAGMENT_SYM '(' '@' ident_or_text ',' '@' ident_or_text ')'
+          BINLOG_SYM CONCAT_SYM '(' '@' ident_or_text ',' '@' ident_or_text ')'
           {
             Lex->sql_command = SQLCOM_BINLOG_BASE64_EVENT;
             Lex->fragmented_binlog_event.n_frag= 2;
@@ -9645,6 +9645,12 @@ function_call_conflict:
             if ($$ == NULL)
               MYSQL_YYABORT;
           }
+        | CONCAT_SYM '(' expr_list ')'
+          {
+            $$= new (thd->mem_root) Item_func_concat(thd, *$3);
+            if ($$ == NULL)
+              MYSQL_YYABORT;
+          }
         | DATABASE '(' ')'
           {
             $$= new (thd->mem_root) Item_func_database(thd);
@@ -13932,6 +13938,7 @@ keyword:
         | COLUMN_GET_SYM        {}
         | COMMENT_SYM           {}
         | COMMIT_SYM            {}
+        | CONCAT_SYM            {}
         | CONTAINS_SYM          {}
         | DEALLOCATE_SYM        {}
         | DO_SYM                {}
@@ -14047,7 +14054,6 @@ keyword_sp:
         | DATE_SYM                 {}
         | DAY_SYM                  {}
         | DEFINER_SYM              {}
-        | DEFRAGMENT_SYM           {}
         | DELAY_KEY_WRITE_SYM      {}
         | DES_KEY_FILE             {}
         | DIAGNOSTICS_SYM          {}
