
maria-developers team mailing list archive

Re: d282f5c5560: MDEV-10963 Fragmented BINLOG query

 

Sergei, hello.

> Hi, Andrei!
>
> Looks better!

There is never a limit to improvements, though :-).
Thanks for checking, and for the insightful comments as usual!

I am publishing an updated patch that covers all of them.
Below I reply to your notes and comment on a few of your questions.

Could you also please clarify one point, namely

   >> -my_b_copy_to_file(IO_CACHE *cache, FILE *file)
   >> +my_b_copy_to_file(IO_CACHE *cache, FILE *file,
   >> +                  my_bool do_reinit,
   >> +                  size_t count)
   >>  {
   >> -  size_t bytes_in_cache;
   >> +  size_t curr_write, bytes_in_cache;
   >>    DBUG_ENTER("my_b_copy_to_file");
   >>  
   >>    /* Reinit the cache to read from the beginning of the cache */
   >> -  if (reinit_io_cache(cache, READ_CACHE, 0L, FALSE, FALSE))
   >> +  if (do_reinit && reinit_io_cache(cache, READ_CACHE, 0L, FALSE, FALSE))
   >
   > generally, when there's a function that is always called with a
   > constant (compile-time) argument, I prefer to split the code
   > compile-time too, if it isn't too much trouble. In this case it would
   > mean a new function like
   >
   >   int my_b_copy_all_to_file(IO_CACHE *cache, FILE *file)
   >   {
   >     if (reinit_io_cache(cache, READ_CACHE, 0L, FALSE, FALSE))
   >       return 1;
   >     return my_b_copy_to_file(cache, file, SIZE_T_MAX);
   >   }
   >

Why this preference, specifically?
I looked around and found something like
 https://en.wikipedia.org/wiki/Compile_time_function_execution
... but I somewhat doubt you meant that. Or did you?
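
To make sure we mean the same thing, here is how I read the suggestion, as a minimal sketch on a toy in-memory cache rather than on IO_CACHE. All names in it (toy_cache, toy_rewind, toy_copy, toy_copy_all) are hypothetical and exist only for illustration: the core function takes no flag and never rewinds, and a thin wrapper serves the callers that always want "rewind, then copy everything".

```c
#include <assert.h>
#include <stddef.h>
#include <stdio.h>

/* Toy stand-in for IO_CACHE: a buffer plus a read position (hypothetical). */
typedef struct {
  const char *buf;
  size_t len;
  size_t read_pos;
} toy_cache;

/* Stand-in for reinit_io_cache(): rewind to the beginning of the cache. */
static int toy_rewind(toy_cache *c)
{
  c->read_pos = 0;
  return 0;
}

/* Core copy: at most count bytes from the current position, no rewind. */
static int toy_copy(toy_cache *c, FILE *file, size_t count)
{
  size_t left, n;
  assert(c->read_pos <= c->len);
  left = c->len - c->read_pos;
  n = count < left ? count : left;
  if (fwrite(c->buf + c->read_pos, 1, n, file) != n)
    return 1;
  c->read_pos += n;
  return 0;
}

/* Wrapper for callers that always rewind and copy the whole cache. */
static int toy_copy_all(toy_cache *c, FILE *file)
{
  if (toy_rewind(c))
    return 1;
  return toy_copy(c, file, (size_t) -1);
}
```

With this shape the "reinit or not" decision is made by choosing which function to call, instead of by passing a constant flag that is tested at run time.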


Now to my acknowledgments and comments...

>
> There are no major problems, but see comments below. There're few
> suggestions how to simplify the code.
>
> On Nov 05, Andrei Elkin wrote:
>> revision-id: d282f5c55609469cd74d7390f70c7d922c778711 (mariadb-10.1.35-93-gd282f5c5560)
>> parent(s): 2a576f71c5d3c7aacef564e5b1251f83bde48f51
>> author: Andrei Elkin <andrei.elkin@xxxxxxxxxxx>
>> committer: Andrei Elkin <andrei.elkin@xxxxxxxxxxx>
>> timestamp: 2018-10-21 23:42:00 +0300
>> message:
>> 
>> MDEV-10963 Fragmented BINLOG query
>> 
>> diff --git a/mysql-test/suite/binlog/t/binlog_mysqlbinlog_row_frag.test b/mysql-test/suite/binlog/t/binlog_mysqlbinlog_row_frag.test
>> new file mode 100644
>> index 00000000000..bdf41c94c76
>> --- /dev/null
>> +++ b/mysql-test/suite/binlog/t/binlog_mysqlbinlog_row_frag.test
>> @@ -0,0 +1,50 @@
>> +--source include/have_debug.inc
>> +--source include/have_log_bin.inc
>> +--source include/have_binlog_format_row.inc
>
> you don't need to include have_log_bin, if you include
> have_binlog_format_row.

ack

>
>> +
>> +--let $MYSQLD_DATADIR= `select @@datadir`
>> +--let $max_size=1024
>> +
>> +CREATE TABLE t (a TEXT);
>> +# events of interest are guaranteed to stay in 000001 log
>> +RESET MASTER;
>> +--eval INSERT INTO t SET a=repeat('a', $max_size)
>
> eh? why did you do it with let/eval instead of a simple sql statement?
> you don't use $max_size anywhere else.

Left over inadvertently from a previous patch.

>
>> +SELECT a from t into @a;
>> +FLUSH LOGS;
>> +DELETE FROM t;
>> +
>> +--exec $MYSQL_BINLOG --debug-binlog-row-event-max-encoded-size=256 $MYSQLD_DATADIR/master-bin.000001 > $MYSQLTEST_VARDIR/tmp/mysqlbinlog.sql
>> +
>> +--let $assert_text= BINLOG is fragmented
>> +--let $assert_select= BINLOG @binlog_fragment_0, @binlog_fragment_1
>> +--let $assert_count= 1
>> +--let $assert_file= $MYSQLTEST_VARDIR/tmp/mysqlbinlog.sql
>> +--source include/assert_grep.inc
>
> no, please, use search_pattern_in_file.inc instead.

ack

>
>> +
>> +--exec $MYSQL test < $MYSQLTEST_VARDIR/tmp/mysqlbinlog.sql
>> +
>> +SELECT a LIKE @a as 'true' FROM t;
>> +SELECT @binlog_fragment_0, @binlog_fragment_1 as 'NULL';
>
> that makes no sense, @binlog_fragment_0 and _1 were set in a separate
> client session. You cannot test whether they were cleared or not there,
> by looking at the values here

You caught me here! :-).
In the new patch I move the check to the other test file.

>
>> +
>> +# improper syntax error
>> +--echo BINLOG number-of-fragments must be exactly two
>> +--error ER_PARSE_ERROR
>> +BINLOG @binlog_fragment;
>> +--error ER_PARSE_ERROR
>> +BINLOG @binlog_fragment, @binlog_fragment, @binlog_fragment;
>> +
>> +# corrupted fragments error check (to the expected error code notice,
>> +# the same error code occurs in a similar unfragmented case)
>> +SET @binlog_fragment_0='012345';
>> +SET @binlog_fragment_1='012345';
>> +--error ER_SYNTAX_ERROR
>> +BINLOG @binlog_fragment_0, @binlog_fragment_1;
>> +
>> +# Not existing fragment is not allowed
>> +SET @binlog_fragment_0='012345';
>> +--error ER_WRONG_TYPE_FOR_VAR
>> +BINLOG @binlog_fragment_0, @binlog_fragment_not_exist;
>> +
>> +--echo # Cleanup
>> +--remove_file $MYSQLTEST_VARDIR/tmp/mysqlbinlog.sql
>> +DROP TABLE t;
>> diff --git a/mysys/mf_iocache2.c b/mysys/mf_iocache2.c
>> --- a/mysys/mf_iocache2.c
>> +++ b/mysys/mf_iocache2.c
>> @@ -22,52 +22,53 @@
>>  #include <stdarg.h>
>>  #include <m_ctype.h>
>>  
>> -/*
>> -  Copy contents of an IO_CACHE to a file.
>> -
>> -  SYNOPSIS
>> -    my_b_copy_to_file()
>> -    cache  IO_CACHE to copy from
>> -    file   File to copy to
>> -
>> -  DESCRIPTION
>> -    Copy the contents of the cache to the file. The cache will be
>> -    re-inited to a read cache and will read from the beginning of the
>> -    cache.
>> -
>> -    If a failure to write fully occurs, the cache is only copied
>> -    partially.
>> +/**
>> +  Copy the cache to the file. Copying can be constrained to @c count
>> +  number of bytes when the parameter is less than SIZE_T_MAX.  The
>> +  cache will be optionally re-inited to a read cache and will read
>> +  from the beginning of the cache.  If a failure to write fully
>> +  occurs, the cache is only copied partially.
>>  
>>    TODO
>> -    Make this function solid by handling partial reads from the cache
>> -    in a correct manner: it should be atomic.
>> -
>> -  RETURN VALUE
>> -    0  All OK
>> -    1  An error occurred
>> +  Make this function solid by handling partial reads from the cache
>> +  in a correct manner: it should be atomic.
>> +
>> +  @param cache      IO_CACHE to copy from
>> +  @param file       File to copy to
>> +  @param do_reinit  whether to turn the cache to read mode
>> +  @param count      the copied size or the max of the type
>> +                    when the whole cache is to be copied.
>> +  @return
>> +         0          All OK
>> +         1          An error occurred
>>  */
>>  int
>> -my_b_copy_to_file(IO_CACHE *cache, FILE *file)
>> +my_b_copy_to_file(IO_CACHE *cache, FILE *file,
>> +                  my_bool do_reinit,
>> +                  size_t count)
>>  {
>> -  size_t bytes_in_cache;
>> +  size_t curr_write, bytes_in_cache;
>>    DBUG_ENTER("my_b_copy_to_file");
>>  
>>    /* Reinit the cache to read from the beginning of the cache */
>> -  if (reinit_io_cache(cache, READ_CACHE, 0L, FALSE, FALSE))
>> +  if (do_reinit && reinit_io_cache(cache, READ_CACHE, 0L, FALSE, FALSE))
>
> generally, when there's a function that is always called with a
> constant (compile-time) argument, I prefer to split the code
> compile-time too, if it isn't too much trouble. In this case it would
> mean a new function like
>
>   int my_b_copy_all_to_file(IO_CACHE *cache, FILE *file)
>   {
>     if (reinit_io_cache(cache, READ_CACHE, 0L, FALSE, FALSE))
>       return 1;
>     return my_b_copy_to_file(cache, file, SIZE_T_MAX);
>   }
>
> and all old code will be changed to use my_b_copy_all_to_file().
> Old my_b_copy_to_file() won't need to do reinit_io_cache() anymore and
> your code will use it directly.
>
>>      DBUG_RETURN(1);
>>    bytes_in_cache= my_b_bytes_in_cache(cache);
>>    do
>>    {
>> -    if (my_fwrite(file, cache->read_pos, bytes_in_cache,
>> +    curr_write= MY_MIN(bytes_in_cache, count);
>> +    if (my_fwrite(file, cache->read_pos, curr_write,
>>                    MYF(MY_WME | MY_NABP)) == (size_t) -1)
>>        DBUG_RETURN(1);
>> -  } while ((bytes_in_cache= my_b_fill(cache)));
>> +
>> +    cache->read_pos += curr_write;
>> +    count -= curr_write;
>> +  } while (count && (bytes_in_cache= my_b_fill(cache)));
>>    if(cache->error == -1)
>>      DBUG_RETURN(1);
>>    DBUG_RETURN(0);
>>  }
>>  
>> -
>>  my_off_t my_b_append_tell(IO_CACHE* info)
>>  {
>>    /*
>> diff --git a/sql/log_event.cc b/sql/log_event.cc
>> index e07b7002398..aeca794f0cd 100644
>> --- a/sql/log_event.cc
>> +++ b/sql/log_event.cc
>> @@ -10474,12 +10488,151 @@ void Rows_log_event::pack_info(Protocol *protocol)
>>  #endif
>>  
>>  #ifdef MYSQL_CLIENT
>> +/**
>> +  Print an event "body" cache to @c file possibly in multiple fragements.
>> +  Each fragement is optionally per @c do_wrap to procude an SQL statement.
>> +
>> +  @param file      a file to print to
>> +  @param body      the "body" IO_CACHE of event
>> +  @param do_wrap   whether to wrap base64-encoded strings with
>> +                   SQL cover.
>> +  The function signals on any error through setting @c body->error to -1.
>> +*/
>> +void copy_cache_to_file_wrapped(FILE *file,
>> +                                IO_CACHE *body,
>> +                                bool do_wrap,
>> +                                const char *delimiter)
>> +{
>> +  uint n_frag= 1;
>> +  const char* before_frag= NULL;
>> +  char* after_frag= NULL;
>> +  char* after_last= NULL;
>> +  /*
>> +    2 fragments can always represent near 1GB row-based
>> +    base64-encoded event as two strings each of size less than
>> +    max(max_allowed_packet).  Greater number of fragments does not
>> +    save from potential need to tweak (increase) @@max_allowed_packet
>> +    before to process the fragments. So 2 is safe and enough.
>> +  */
>> +  const char fmt_last_frag2[]=
>> +    "\nBINLOG @binlog_fragment_0, @binlog_fragment_1%s\n";
>> +  const char fmt_before_frag[]= "\nSET /* ONE_SHOT */ @binlog_fragment_%d ='\n";
>
> this ONE_SHOT is confusing, even if in a comment. Better not to do it :)

ack

>
>> +  /*
>> +    Buffer to hold computed formatted strings according to specifiers.
>> +    The sizes may depend on an actual fragment number size in terms of decimal
>> +    signs so its maximum is estimated (not precisely yet safely) below.
>> +  */
>> +  char buf[sizeof(fmt_before_frag) + sizeof(fmt_last_frag2)
>> +           + ((sizeof(n_frag) * 8)/3 + 1)                // max of decimal index
>> +           + sizeof(PRINT_EVENT_INFO::max_delimiter_len) + 3]; // delim, \n and 0
>
> sizeof(max_delimiter_len) ? it's sizeof(uint), right? Did you mean
>
>   sizeof(PRINT_EVENT_INFO::delimiter)
>
> or simply
>
>   PRINT_EVENT_INFO::max_delimiter_len
>
> without sizeof?

Indeed!
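
The difference is easy to check in isolation. The sketch below uses hypothetical file-scope stand-ins for the two PRINT_EVENT_INFO members (it is plain C, so an enum supplies the array bound): sizeof applied to the constant yields the size of its type, while sizeof applied to the array yields the buffer size that was actually intended.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-ins for the members discussed above. */
enum { MAX_DELIMITER_LEN = 16 };
static const unsigned int max_delimiter_len = MAX_DELIMITER_LEN;
static char delimiter[MAX_DELIMITER_LEN];

/* Size of the constant's type, i.e. sizeof(unsigned int), not 16. */
static size_t size_of_constant(void)
{
  assert(max_delimiter_len == MAX_DELIMITER_LEN);
  return sizeof(max_delimiter_len);
}

/* Size of the delimiter buffer itself: 16 bytes. */
static size_t size_of_array(void)
{
  return sizeof(delimiter);
}
```

So the buffer estimate has to use either sizeof(delimiter) or the constant's value directly.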

>
>> +
>> +  if (do_wrap)
>> +  {
>> +    after_frag= (char*) my_malloc(sizeof(buf), MYF(MY_WME));
>> +    sprintf(after_frag, "'%s\n", delimiter);
>> +    if (my_b_tell(body) > opt_binlog_rows_event_max_encoded_size)
>> +      n_frag= 2;
>> +    if (n_frag > 1)
>> +    {
>> +      before_frag= fmt_before_frag;
>> +      after_last= (char*) my_malloc(sizeof(buf), MYF(MY_WME));
>> +      sprintf(after_last, fmt_last_frag2, (char*) delimiter);
>> +    }
>> +    else
>> +    {
>> +      before_frag= "\nBINLOG '\n"; // single "fragment"
>> +    }
>> +  }
>> +
>> +  size_t total_size= my_b_tell(body), total_written= 0;
>> +  size_t frag_size= total_size / n_frag + 1, curr_size;
>> +
>> +  if (reinit_io_cache(body, READ_CACHE, 0L, FALSE, FALSE))
>> +  {
>> +    body->error= -1;
>> +    goto err;
>> +  }
>> +
>> +  for (uint i= 0; i < n_frag; i++, total_written += curr_size)
>> +  {
>> +    curr_size= i < n_frag - 1 ? frag_size : total_size - total_written;
>> +
>> +    DBUG_ASSERT(i < n_frag - 1 || curr_size <= frag_size);
>> +
>> +    if (before_frag)
>> +    {
>> +      sprintf(buf, before_frag, i);
>> +      my_fwrite(file, (uchar*) buf, strlen(buf), MYF(MY_WME | MY_NABP));
>> +    }
>> +    if (my_b_copy_to_file(body, file, FALSE, curr_size))
>> +    {
>> +      body->error= -1;
>> +      goto err;
>> +    }
>> +    if (after_frag)
>> +    {
>> +      sprintf(buf, after_frag, NULL);
>> +      my_fwrite(file, (uchar*) buf, strlen(buf), MYF(MY_WME | MY_NABP));
>> +    }
>> +  }
>
> Hmm, dunno. I suspect you can do it three times shorter and five times
> easier to read if you wouldn't try to generalize it for a arbitrary
> number of fragments with arbitrary prefixes and suffixes. Just
>
>   if (my_b_tell(body) < opt_binlog_rows_event_max_encoded_size - margin)
>   {
>     my_fprintf(file, "BINLOG '");
>     my_b_copy_to_file(body, file);
>     my_fprintf(file, "'%s\n", delimiter);
>   }
>   else
>   {
>     my_fprintf(file, "SET @binlog_fragment_0='");
>     my_b_copy_to_file(body, file, opt_binlog_rows_event_max_encoded_size);
>     my_fprintf(file, "'%s\nSET @binlog_fragment_1='", delimiter);
>     my_b_copy_to_file(body, file, SIZE_T_MAX);
>     my_fprintf(file, "'%s\nBINLOG @binlog_fragment_0, @binlog_fragment_1%s\n",
>                delimiter, delimiter);
>   }
>
> See?

Very true. I did not consider this simple method for purely "historical"
reasons: it all started with the idea of an arbitrary number of fragments.

As a result, there is no loop in the new patch.
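
A minimal sketch of the loop-free shape, assuming plain stdio instead of IO_CACHE and my_fwrite, is given below. The function name write_binlog_stmt and its parameters are hypothetical, max_size plays the role of opt_binlog_rows_event_max_encoded_size, and the split point (the middle of the body) is my own choice for the illustration rather than what the patch does.

```c
#include <assert.h>
#include <stddef.h>
#include <stdio.h>
#include <string.h>

/*
  Write an already base64-encoded, NUL-terminated body either as a single
  BINLOG statement or as two user-variable fragments followed by a BINLOG
  statement that joins them. Returns 0 on success, 1 on a write error.
*/
static int write_binlog_stmt(FILE *file, const char *body,
                             size_t max_size, const char *delimiter)
{
  size_t len, first;
  assert(file && body && delimiter);
  len = strlen(body);

  if (len <= max_size)
  {
    /* Small enough: the usual unfragmented form. */
    if (fprintf(file, "\nBINLOG '\n") < 0 ||
        fwrite(body, 1, len, file) != len ||
        fprintf(file, "'%s\n", delimiter) < 0)
      return 1;
    return 0;
  }

  /* Too big: split roughly in the middle into exactly two fragments. */
  first = len - len / 2;
  if (fprintf(file, "\nSET @binlog_fragment_0='\n") < 0 ||
      fwrite(body, 1, first, file) != first ||
      fprintf(file, "'%s\nSET @binlog_fragment_1='\n", delimiter) < 0 ||
      fwrite(body + first, 1, len - first, file) != len - first ||
      fprintf(file, "'%s\nBINLOG @binlog_fragment_0, @binlog_fragment_1%s\n",
              delimiter, delimiter) < 0)
    return 1;
  return 0;
}
```

Two fixed branches replace the prefix/suffix bookkeeping and the temporary format buffers of the looped version.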

>
>> +
>> +  if (after_last)
>> +  {
>> +    sprintf(buf, after_last, n_frag);
>> +    my_fwrite(file, (uchar*) buf, strlen(buf), MYF(MY_WME | MY_NABP));
>> +  }
>> +  reinit_io_cache(body, WRITE_CACHE, 0, FALSE, TRUE);
>> +
>> +err:
>> +  my_free(after_frag);
>> +  my_free(after_last);
>> +}
>> +
>> +/**
>> +  The function invokes base64 encoder to run on the current
>> +  event string and store the result into two caches.
>> +  When the event ends the current statement the caches are is copied into
>> +  the argument file.
>> +  Copying is also concerned how to wrap the event, specifically to produce
>> +  a valid SQL syntax.
>> +  When the encoded data size is within max(MAX_ALLOWED_PACKET)
>> +  a regular BINLOG query is composed. Otherwise it is build as fragmented
>> +
>> +    SET @binlog_fragment_0='...';
>> +    SET @binlog_fragment_1='...';
>> +    BINLOG DEFRAGMENT(@binlog_fragment_0, @binlog_fragment_1);
>> +
>> +  where fragments are represented by a sequence of "indexed" user
>> +  variables.
>> +  Two more statements are composed as well
>> +
>> +    SET @binlog_fragment_0=NULL;
>> +    SET @binlog_fragment_1=NULL;
>> +
>> +  to promptly release memory.
>
> No, they aren't

ack

>
>> +
>> +  NOTE.
>
> @note
>
>> +  If any changes made don't forget to duplicate them to
>> +  Old_rows_log_event as long as it's supported.
>> +
>> +  @param file               pointer to IO_CACHE
>> +  @param print_event_info   pointer to print_event_info specializing
>> +                            what out of and how to print the event
>> +  @param name               the name of a table that the event operates on
>> +
>> +  The function signals on any error of cache access through setting
>> +  that cache's @c error to -1.
>> +*/
>>  void Rows_log_event::print_helper(FILE *file,
>>                                    PRINT_EVENT_INFO *print_event_info,
>>                                    char const *const name)
>>  {
>>    IO_CACHE *const head= &print_event_info->head_cache;
>>    IO_CACHE *const body= &print_event_info->body_cache;
>> +  bool do_print_encoded=
>> +    print_event_info->base64_output_mode != BASE64_OUTPUT_DECODE_ROWS &&
>> +    !print_event_info->short_form;
>> +
>>    if (!print_event_info->short_form)
>>    {
>>      bool const last_stmt_event= get_flags(STMT_END_F);
>> diff --git a/sql/log_event.h b/sql/log_event.h
>> index 90900f63533..28277e659d2 100644
>> --- a/sql/log_event.h
>> +++ b/sql/log_event.h
>> @@ -749,6 +749,7 @@ typedef struct st_print_event_info
>>      that was printed.  We cache these so that we don't have to print
>>      them if they are unchanged.
>>    */
>> +  static const uint max_delimiter_len= 16;
>
> why did you introduce this max_delimiter_len, if all you use
> is sizeof(delimiter) anyway? (and even that is not needed)

I do use the newly introduced constant in the new patch, though it is
renamed to hint at its 'size' rather than 'length' semantics.

>
>>    // TODO: have the last catalog here ??
>>    char db[FN_REFLEN+1]; // TODO: make this a LEX_STRING when thd->db is
>>    bool flags2_inited;
>> @@ -798,7 +799,7 @@ typedef struct st_print_event_info
>>    bool printed_fd_event;
>>    my_off_t hexdump_from;
>>    uint8 common_header_len;
>> -  char delimiter[16];
>> +  char delimiter[max_delimiter_len];
>>  
>>    uint verbose;
>>    table_mapping m_table_map;
>> diff --git a/sql/sql_binlog.cc b/sql/sql_binlog.cc
>> index 91cf038907e..b4e3342d8f3 100644
>> --- a/sql/sql_binlog.cc
>> +++ b/sql/sql_binlog.cc
>> @@ -28,6 +28,70 @@
>>                                                  // START_EVENT_V3,
>>                                                  // Log_event_type,
>>                                                  // Log_event
>> +
>> +/**
>> +  Copy fragments into the standard placeholder thd->lex->comment.str.
>> +
>> +  Compute the size of the (still) encoded total,
>> +  allocate and then copy fragments one after another.
>> +  The size can exceed max(max_allowed_packet) which is not a
>> +  problem as no String instance is created off this char array.
>> +
>> +  @param thd  THD handle
>> +  @return
>> +     0        at success,
>> +    -1        otherwise.
>> +*/
>> +int binlog_defragment(THD *thd)
>> +{
>> +  user_var_entry *entry[2];
>> +  LEX_STRING name[2]= { thd->lex->comment, thd->lex->ident };
>> +
>> +  /* compute the total size */
>> +  thd->lex->comment.str= NULL;
>> +  thd->lex->comment.length= 0;
>> +  for (uint k= 0; k < 2; k++)
>> +  {
>> +    entry[k]=
>> +      (user_var_entry*) my_hash_search(&thd->user_vars, (uchar*) name[k].str,
>> +                                       name[k].length);
>> +    if (!entry[k] || entry[k]->type != STRING_RESULT)
>> +    {
>> +      my_error(ER_WRONG_TYPE_FOR_VAR, MYF(0), name[k].str);
>> +      return -1;
>> +    }
>> +    thd->lex->comment.length += entry[k]->length;
>> +  }
>> +
>> +  thd->lex->comment.str=                            // to be freed by the caller
>> +    (char *) my_malloc(thd->lex->comment.length, MYF(MY_WME));
>> +  if (!thd->lex->comment.str)
>> +  {
>> +    my_error(ER_OUTOFMEMORY, MYF(ME_FATALERROR), 1);
>> +    return -1;
>> +  }
>> +
>> +  /* fragments are merged into allocated buf while the user var:s get reset */
>> +  size_t gathered_length= 0;
>> +  for (uint k=0; k < 2; k++)
>> +  {
>> +    memcpy(thd->lex->comment.str + gathered_length, entry[k]->value, entry[k]->length);
>> +    gathered_length += entry[k]->length;
>> +    if (update_hash(entry[k], true, NULL, 0, STRING_RESULT, &my_charset_bin, 0))
>> +    {
>> +      my_printf_error(ER_WRONG_TYPE_FOR_VAR,
>> +                      "%s: BINLOG fragment user "
>> +                      "variable '%s' could not be unset", MYF(0),
>> +                      ER_THD(thd, ER_WRONG_TYPE_FOR_VAR), entry[k]->value);
>> +    }
>
> I don't see how update_hash(entry[k], true, ...) can ever fail, so
> there's no need to pretend that it can.

Right. I did not look into its branches. The 'true' one does not error
out.

>
>> +  }
>> +
>> +  DBUG_ASSERT(gathered_length == thd->lex->comment.length);
>> +
>> +  return 0;
>> +}
>> +
>> +
>>  /**
>>    Execute a BINLOG statement.
>>  
>> @@ -119,6 +175,23 @@ void mysql_client_binlog_statement(THD* thd)
>>    rli->sql_driver_thd= thd;
>>    rli->no_storage= TRUE;
>>  
>> +  if (unlikely(is_fragmented= thd->lex->comment.str && thd->lex->ident.str))
>> +    if (binlog_defragment(thd))
>> +      goto end;
>> +
>> +  if (!(coded_len= thd->lex->comment.length))
>> +  {
>> +    my_error(ER_SYNTAX_ERROR, MYF(0));
>> +    goto end;
>> +  }
>> +
>> +  decoded_len= base64_needed_decoded_length(coded_len);
>> +  if (!(buf= (char *) my_malloc(decoded_len, MYF(MY_WME))))
>> +  {
>> +    my_error(ER_OUTOFMEMORY, MYF(ME_FATALERROR), 1);
>> +    goto end;
>> +  }
>> +
>
> Technically, it should be possible to decode base64 in-place and avoid
> allocating a second 3GB buffer. But let's not do it in this MDEV :)

In my defense I can only repeat the no-limit argument :-).
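
For the record, the in-place idea can be sketched as below. This is a standalone illustration, not the server's base64 code: b64_value and b64_decode_in_place are hypothetical names, and the handling of whitespace and '=' is a simplifying assumption. It works because every 4 encoded characters yield at most 3 bytes, so the write index never overtakes the read index.

```c
#include <assert.h>
#include <stddef.h>

/* Map one base64 character to its 6-bit value, or -1 if it is not one. */
static int b64_value(unsigned char c)
{
  if (c >= 'A' && c <= 'Z') return c - 'A';
  if (c >= 'a' && c <= 'z') return c - 'a' + 26;
  if (c >= '0' && c <= '9') return c - '0' + 52;
  if (c == '+') return 62;
  if (c == '/') return 63;
  return -1;
}

/*
  Decode base64 text in buf[0..len) into the same buffer.
  Whitespace is skipped and the first '=' ends the input.
  Returns the decoded length, or (size_t) -1 on a malformed character.
*/
static size_t b64_decode_in_place(unsigned char *buf, size_t len)
{
  size_t in, out = 0;
  unsigned int acc = 0;  /* bit accumulator */
  int bits = 0;          /* number of valid bits held in acc */

  for (in = 0; in < len; in++)
  {
    unsigned char c = buf[in];
    int v;
    if (c == '=')
      break;
    if (c == '\n' || c == '\r' || c == ' ' || c == '\t')
      continue;
    if ((v = b64_value(c)) < 0)
      return (size_t) -1;
    acc = (acc << 6) | (unsigned int) v;
    bits += 6;
    if (bits >= 8)
    {
      bits -= 8;
      assert(out <= in);                 /* buf[in] is already consumed */
      buf[out++] = (unsigned char) (acc >> bits);
      acc &= (1u << bits) - 1;           /* keep only the leftover bits */
    }
  }
  return out;
}
```

With something along these lines the second buffer sized by base64_needed_decoded_length() would not be needed, at the cost of destroying the encoded input.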

Huge bunch of thanks!

Andrei

>
>>    for (char const *strptr= thd->lex->comment.str ;
>>         strptr < thd->lex->comment.str + thd->lex->comment.length ; )
>>    {
>
> Regards,
> Sergei
> Chief Architect MariaDB
> and security@xxxxxxxxxxx

