maria-developers team mailing list archive
Message #11575
Re: d282f5c5560: MDEV-10963 Fragmented BINLOG query
Sergei, hello.
> Hi, Andrei!
>
> Looks better!
There's never a limit to improvements though :-).
Thanks for checking and insightful comments as usual!
I am publishing an updated patch to cover all of them.
I reply to your notes and comment on a few of your questions below.
Could you also please clarify one point, which is:
>> -my_b_copy_to_file(IO_CACHE *cache, FILE *file)
>> +my_b_copy_to_file(IO_CACHE *cache, FILE *file,
>> + my_bool do_reinit,
>> + size_t count)
>> {
>> - size_t bytes_in_cache;
>> + size_t curr_write, bytes_in_cache;
>> DBUG_ENTER("my_b_copy_to_file");
>>
>> /* Reinit the cache to read from the beginning of the cache */
>> - if (reinit_io_cache(cache, READ_CACHE, 0L, FALSE, FALSE))
>> + if (do_reinit && reinit_io_cache(cache, READ_CACHE, 0L, FALSE, FALSE))
>
> generally, when there's a function that is always called with a
> constant (compile-time) argument, I prefer to split the code
> compile-time too, if it isn't too much trouble. In this case it would
> mean a new function like
>
> int my_b_copy_all_to_file(IO_CACHE *cache, FILE *file)
> {
> if (reinit_io_cache(cache, READ_CACHE, 0L, FALSE, FALSE))
> return 1;
> return my_b_copy_to_file(cache, file, SIZE_T_MAX);
> }
>
Why this preference, specifically?
I looked around to find something like
https://en.wikipedia.org/wiki/Compile_time_function_execution
... but I somewhat doubt you meant that. Or did you really?
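For what it's worth, my reading of the suggestion is plain source-level splitting, not compile-time function evaluation: since the flag argument is always a compile-time constant at every call site, each constant value gets its own entry point and the branch disappears from the core. A toy sketch of the pattern (the types and names here are invented for illustration and are not MariaDB code):

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Toy stand-in for the IO_CACHE machinery, invented for illustration. */
struct toy_cache { const char *data; size_t pos, len; };

/* Core copier: copies up to count bytes from the current position.
   The runtime do_reinit flag is gone from this signature. */
int toy_copy_to_buf(struct toy_cache *c, char *out, size_t count)
{
  size_t avail = c->len - c->pos;
  size_t n = count < avail ? count : avail;
  memcpy(out, c->data + c->pos, n);
  c->pos += n;
  return 0;
}

/* The "split": copying everything from the beginning is a separate
   entry point, so no branch on a constant argument survives into the
   core at run time. */
int toy_copy_all_to_buf(struct toy_cache *c, char *out)
{
  c->pos = 0;                 /* the reinit step lives only here */
  return toy_copy_to_buf(c, out, SIZE_MAX);
}
```

Old callers would switch to the `_all_` variant; new fragment-writing code would call the core directly with a byte count.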
Now to my acknowledgments and comments...
>
> There are no major problems, but see comments below. There are a few
> suggestions on how to simplify the code.
>
> On Nov 05, Andrei Elkin wrote:
>> revision-id: d282f5c55609469cd74d7390f70c7d922c778711 (mariadb-10.1.35-93-gd282f5c5560)
>> parent(s): 2a576f71c5d3c7aacef564e5b1251f83bde48f51
>> author: Andrei Elkin <andrei.elkin@xxxxxxxxxxx>
>> committer: Andrei Elkin <andrei.elkin@xxxxxxxxxxx>
>> timestamp: 2018-10-21 23:42:00 +0300
>> message:
>>
>> MDEV-10963 Fragmented BINLOG query
>>
>> diff --git
>> a/mysql-test/suite/binlog/t/binlog_mysqlbinlog_row_frag.test
>> b/mysql-test/suite/binlog/t/binlog_mysqlbinlog_row_frag.test
>> new file mode 100644
>> index 00000000000..bdf41c94c76
>> --- /dev/null
>> +++ b/mysql-test/suite/binlog/t/binlog_mysqlbinlog_row_frag.test
>> @@ -0,0 +1,50 @@
>> +--source include/have_debug.inc
>> +--source include/have_log_bin.inc
>> +--source include/have_binlog_format_row.inc
>
> you don't need to include have_log_bin, if you include
> have_binlog_format_row.
ack
>
>> +
>> +--let $MYSQLD_DATADIR= `select @@datadir`
>> +--let $max_size=1024
>> +
>> +CREATE TABLE t (a TEXT);
>> +# events of interest are guaranteed to stay in 000001 log
>> +RESET MASTER;
>> +--eval INSERT INTO t SET a=repeat('a', $max_size)
>
> eh? why did you do it with let/eval instead of a simple sql statement?
> you don't use $max_size anywhere else.
That was left over inadvertently from a previous patch.
>
>> +SELECT a from t into @a;
>> +FLUSH LOGS;
>> +DELETE FROM t;
>> +
>> +--exec $MYSQL_BINLOG --debug-binlog-row-event-max-encoded-size=256
>> $MYSQLD_DATADIR/master-bin.000001 >
>> $MYSQLTEST_VARDIR/tmp/mysqlbinlog.sql
>> +
>> +--let $assert_text= BINLOG is fragmented
>> +--let $assert_select= BINLOG @binlog_fragment_0, @binlog_fragment_1
>> +--let $assert_count= 1
>> +--let $assert_file= $MYSQLTEST_VARDIR/tmp/mysqlbinlog.sql
>> +--source include/assert_grep.inc
>
> no, please, use search_pattern_in_file.inc instead.
ack
>
>> +
>> +--exec $MYSQL test < $MYSQLTEST_VARDIR/tmp/mysqlbinlog.sql
>> +
>> +SELECT a LIKE @a as 'true' FROM t;
>> +SELECT @binlog_fragment_0, @binlog_fragment_1 as 'NULL';
>
> that makes no sense, @binlog_fragment_0 and _1 were set in a separate
> client session. You cannot test whether they were cleared or not there,
> by looking at the values here
You caught me here! :-).
In the new patch I relocated the check to the other test file.
>
>> +
>> +# improper syntax error
>> +--echo BINLOG number-of-fragments must be exactly two
>> +--error ER_PARSE_ERROR
>> +BINLOG @binlog_fragment;
>> +--error ER_PARSE_ERROR
>> +BINLOG @binlog_fragment, @binlog_fragment, @binlog_fragment;
>> +
>> +# corrupted fragments error check (to the expected error code notice,
>> +# the same error code occurs in a similar unfragmented case)
>> +SET @binlog_fragment_0='012345';
>> +SET @binlog_fragment_1='012345';
>> +--error ER_SYNTAX_ERROR
>> +BINLOG @binlog_fragment_0, @binlog_fragment_1;
>> +
>> +# Not existing fragment is not allowed
>> +SET @binlog_fragment_0='012345';
>> +--error ER_WRONG_TYPE_FOR_VAR
>> +BINLOG @binlog_fragment_0, @binlog_fragment_not_exist;
>> +
>> +--echo # Cleanup
>> +--remove_file $MYSQLTEST_VARDIR/tmp/mysqlbinlog.sql
>> +DROP TABLE t;
>> diff --git a/mysys/mf_iocache2.c b/mysys/mf_iocache2.c
>> --- a/mysys/mf_iocache2.c
>> +++ b/mysys/mf_iocache2.c
>> @@ -22,52 +22,53 @@
>> #include <stdarg.h>
>> #include <m_ctype.h>
>>
>> -/*
>> - Copy contents of an IO_CACHE to a file.
>> -
>> - SYNOPSIS
>> - my_b_copy_to_file()
>> - cache IO_CACHE to copy from
>> - file File to copy to
>> -
>> - DESCRIPTION
>> - Copy the contents of the cache to the file. The cache will be
>> - re-inited to a read cache and will read from the beginning of the
>> - cache.
>> -
>> - If a failure to write fully occurs, the cache is only copied
>> - partially.
>> +/**
>> + Copy the cache to the file. Copying can be constrained to @c count
>> + number of bytes when the parameter is less than SIZE_T_MAX. The
>> + cache will be optionally re-inited to a read cache and will read
>> + from the beginning of the cache. If a failure to write fully
>> + occurs, the cache is only copied partially.
>>
>> TODO
>> - Make this function solid by handling partial reads from the cache
>> - in a correct manner: it should be atomic.
>> -
>> - RETURN VALUE
>> - 0 All OK
>> - 1 An error occurred
>> + Make this function solid by handling partial reads from the cache
>> + in a correct manner: it should be atomic.
>> +
>> + @param cache IO_CACHE to copy from
>> + @param file File to copy to
>> + @param do_reinit whether to turn the cache to read mode
>> + @param count the copied size or the max of the type
>> + when the whole cache is to be copied.
>> + @return
>> + 0 All OK
>> + 1 An error occurred
>> */
>> int
>> -my_b_copy_to_file(IO_CACHE *cache, FILE *file)
>> +my_b_copy_to_file(IO_CACHE *cache, FILE *file,
>> + my_bool do_reinit,
>> + size_t count)
>> {
>> - size_t bytes_in_cache;
>> + size_t curr_write, bytes_in_cache;
>> DBUG_ENTER("my_b_copy_to_file");
>>
>> /* Reinit the cache to read from the beginning of the cache */
>> - if (reinit_io_cache(cache, READ_CACHE, 0L, FALSE, FALSE))
>> + if (do_reinit && reinit_io_cache(cache, READ_CACHE, 0L, FALSE, FALSE))
>
> generally, when there's a function that is always called with a
> constant (compile-time) argument, I prefer to split the code
> compile-time too, if it isn't too much trouble. In this case it would
> mean a new function like
>
> int my_b_copy_all_to_file(IO_CACHE *cache, FILE *file)
> {
> if (reinit_io_cache(cache, READ_CACHE, 0L, FALSE, FALSE))
> return 1;
> return my_b_copy_to_file(cache, file, SIZE_T_MAX);
> }
>
> and all old code will be changed to use my_b_copy_all_to_file().
> Old my_b_copy_to_file() won't need to do reinit_io_cache() anymore and
> your code will use it directly.
>
>> DBUG_RETURN(1);
>> bytes_in_cache= my_b_bytes_in_cache(cache);
>> do
>> {
>> - if (my_fwrite(file, cache->read_pos, bytes_in_cache,
>> + curr_write= MY_MIN(bytes_in_cache, count);
>> + if (my_fwrite(file, cache->read_pos, curr_write,
>> MYF(MY_WME | MY_NABP)) == (size_t) -1)
>> DBUG_RETURN(1);
>> - } while ((bytes_in_cache= my_b_fill(cache)));
>> +
>> + cache->read_pos += curr_write;
>> + count -= curr_write;
>> + } while (count && (bytes_in_cache= my_b_fill(cache)));
>> if(cache->error == -1)
>> DBUG_RETURN(1);
>> DBUG_RETURN(0);
>> }
>>
>> -
>> my_off_t my_b_append_tell(IO_CACHE* info)
>> {
>> /*
>> diff --git a/sql/log_event.cc b/sql/log_event.cc
>> index e07b7002398..aeca794f0cd 100644
>> --- a/sql/log_event.cc
>> +++ b/sql/log_event.cc
>> @@ -10474,12 +10488,151 @@ void Rows_log_event::pack_info(Protocol *protocol)
>> #endif
>>
>> #ifdef MYSQL_CLIENT
>> +/**
>> + Print an event "body" cache to @c file, possibly in multiple fragments.
>> + Each fragment is optionally wrapped, per @c do_wrap, to produce an SQL statement.
>> +
>> + @param file a file to print to
>> + @param body the "body" IO_CACHE of event
>> + @param do_wrap whether to wrap base64-encoded strings with
>> + SQL cover.
>> + The function signals on any error through setting @c body->error to -1.
>> +*/
>> +void copy_cache_to_file_wrapped(FILE *file,
>> + IO_CACHE *body,
>> + bool do_wrap,
>> + const char *delimiter)
>> +{
>> + uint n_frag= 1;
>> + const char* before_frag= NULL;
>> + char* after_frag= NULL;
>> + char* after_last= NULL;
>> + /*
>> + 2 fragments can always represent a near-1GB row-based
>> + base64-encoded event as two strings, each of size less than
>> + max(max_allowed_packet). A greater number of fragments does not
>> + save us from the potential need to tweak (increase)
>> + @@max_allowed_packet before processing the fragments. So 2 is
>> + safe and enough.
>> + */
>> + const char fmt_last_frag2[]=
>> + "\nBINLOG @binlog_fragment_0, @binlog_fragment_1%s\n";
>> + const char fmt_before_frag[]= "\nSET /* ONE_SHOT */ @binlog_fragment_%d ='\n";
>
> this ONE_SHOT is confusing, even if in a comment. Better not to do it :)
ack
>
>> + /*
>> + Buffer to hold computed formatted strings according to specifiers.
>> + The sizes may depend on the number of decimal digits in the actual
>> + fragment index, so a maximum is estimated (imprecisely yet safely) below.
>> + */
>> + char buf[sizeof(fmt_before_frag) + sizeof(fmt_last_frag2)
>> + + ((sizeof(n_frag) * 8)/3 + 1) // max of decimal index
>> + + sizeof(PRINT_EVENT_INFO::max_delimiter_len) + 3]; // delim, \n and 0
>
> sizeof(max_delimiter_len) ? it's sizeof(uint), right? Did you mean
>
> sizeof(PRINT_EVENT_INFO::delimiter)
>
> or simply
>
> PRINT_EVENT_INFO::max_delimiter_len
>
> without sizeof?
Indeed!
>
>> +
>> + if (do_wrap)
>> + {
>> + after_frag= (char*) my_malloc(sizeof(buf), MYF(MY_WME));
>> + sprintf(after_frag, "'%s\n", delimiter);
>> + if (my_b_tell(body) > opt_binlog_rows_event_max_encoded_size)
>> + n_frag= 2;
>> + if (n_frag > 1)
>> + {
>> + before_frag= fmt_before_frag;
>> + after_last= (char*) my_malloc(sizeof(buf), MYF(MY_WME));
>> + sprintf(after_last, fmt_last_frag2, (char*) delimiter);
>> + }
>> + else
>> + {
>> + before_frag= "\nBINLOG '\n"; // single "fragment"
>> + }
>> + }
>> +
>> + size_t total_size= my_b_tell(body), total_written= 0;
>> + size_t frag_size= total_size / n_frag + 1, curr_size;
>> +
>> + if (reinit_io_cache(body, READ_CACHE, 0L, FALSE, FALSE))
>> + {
>> + body->error= -1;
>> + goto err;
>> + }
>> +
>> + for (uint i= 0; i < n_frag; i++, total_written += curr_size)
>> + {
>> + curr_size= i < n_frag - 1 ? frag_size : total_size - total_written;
>> +
>> + DBUG_ASSERT(i < n_frag - 1 || curr_size <= frag_size);
>> +
>> + if (before_frag)
>> + {
>> + sprintf(buf, before_frag, i);
>> + my_fwrite(file, (uchar*) buf, strlen(buf), MYF(MY_WME | MY_NABP));
>> + }
>> + if (my_b_copy_to_file(body, file, FALSE, curr_size))
>> + {
>> + body->error= -1;
>> + goto err;
>> + }
>> + if (after_frag)
>> + {
>> + sprintf(buf, after_frag, NULL);
>> + my_fwrite(file, (uchar*) buf, strlen(buf), MYF(MY_WME | MY_NABP));
>> + }
>> + }
>
> Hmm, dunno. I suspect you can do it three times shorter and five times
> easier to read if you wouldn't try to generalize it for a arbitrary
> number of fragments with arbitrary prefixes and suffixes. Just
>
> if (my_b_tell(body) < opt_binlog_rows_event_max_encoded_size - margin)
> {
> my_fprintf(file, "BINLOG '");
> my_b_copy_to_file(body, file);
> my_fprintf(file, "'%s\n", delimiter);
> }
> else
> {
> my_fprintf(file, "SET @binlog_fragment_0='");
> my_b_copy_to_file(body, file, opt_binlog_rows_event_max_encoded_size);
> my_fprintf(file, "'%s\nSET @binlog_fragment_1='", delimiter);
> my_b_copy_to_file(body, file, SIZE_T_MAX);
> my_fprintf(file, "'%s\nBINLOG @binlog_fragment_0, @binlog_fragment_1%s\n",
> delimiter, delimiter);
> }
>
> See?
Very true. I did not consider this simple method, just "historically",
having started it all with the idea of an arbitrary number of fragments.
As a result, there is no loop in the new patch.
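As a side note, the two-fragment rationale from the patch comment is easy to double-check with the standard base64 size formula, assuming the documented 1GB ceiling of max_allowed_packet and a near-1GB event:

```c
#include <assert.h>

/* Standard base64 growth: every 3 input bytes become 4 output chars
   (padding details aside). */
unsigned long long b64_encoded_len(unsigned long long n)
{
  return 4 * ((n + 2) / 3);
}
```

A 1GB event encodes to roughly 1.37GB, which no single user variable could hold under a 1GB max_allowed_packet, while either of two halves stays comfortably under the cap; hence two fragments are always enough.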
>
>> +
>> + if (after_last)
>> + {
>> + sprintf(buf, after_last, n_frag);
>> + my_fwrite(file, (uchar*) buf, strlen(buf), MYF(MY_WME | MY_NABP));
>> + }
>> + reinit_io_cache(body, WRITE_CACHE, 0, FALSE, TRUE);
>> +
>> +err:
>> + my_free(after_frag);
>> + my_free(after_last);
>> +}
>> +
>> +/**
>> + The function invokes base64 encoder to run on the current
>> + event string and store the result into two caches.
>> + When the event ends the current statement, the caches are copied into
>> + the argument file.
>> + Copying is also concerned with how to wrap the event, specifically to
>> + produce valid SQL syntax.
>> + When the encoded data size is within max(MAX_ALLOWED_PACKET),
>> + a regular BINLOG query is composed. Otherwise it is built as fragmented
>> +
>> + SET @binlog_fragment_0='...';
>> + SET @binlog_fragment_1='...';
>> + BINLOG DEFRAGMENT(@binlog_fragment_0, @binlog_fragment_1);
>> +
>> + where fragments are represented by a sequence of "indexed" user
>> + variables.
>> + Two more statements are composed as well
>> +
>> + SET @binlog_fragment_0=NULL;
>> + SET @binlog_fragment_1=NULL;
>> +
>> + to promptly release memory.
>
> No, they aren't
ack
>
>> +
>> + NOTE.
>
> @note
>
>> + If any changes are made, don't forget to duplicate them to
>> + Old_rows_log_event as long as it's supported.
>> +
>> + @param file pointer to IO_CACHE
>> + @param print_event_info pointer to print_event_info specializing
>> + what to print out of the event and how
>> + @param name the name of a table that the event operates on
>> +
>> + The function signals on any error of cache access through setting
>> + that cache's @c error to -1.
>> +*/
>> void Rows_log_event::print_helper(FILE *file,
>> PRINT_EVENT_INFO *print_event_info,
>> char const *const name)
>> {
>> IO_CACHE *const head= &print_event_info->head_cache;
>> IO_CACHE *const body= &print_event_info->body_cache;
>> + bool do_print_encoded=
>> + print_event_info->base64_output_mode != BASE64_OUTPUT_DECODE_ROWS &&
>> + !print_event_info->short_form;
>> +
>> if (!print_event_info->short_form)
>> {
>> bool const last_stmt_event= get_flags(STMT_END_F);
>> diff --git a/sql/log_event.h b/sql/log_event.h
>> index 90900f63533..28277e659d2 100644
>> --- a/sql/log_event.h
>> +++ b/sql/log_event.h
>> @@ -749,6 +749,7 @@ typedef struct st_print_event_info
>> that was printed. We cache these so that we don't have to print
>> them if they are unchanged.
>> */
>> + static const uint max_delimiter_len= 16;
>
> why did you introduce this max_delimiter_len, if all you use
> is sizeof(delimiter) anyway? (and even that is not needed)
I use the newly introduced constant in the new patch; it is renamed,
though, to hint at its 'size' rather than 'length' semantics.
>
>> // TODO: have the last catalog here ??
>> char db[FN_REFLEN+1]; // TODO: make this a LEX_STRING when thd->db is
>> bool flags2_inited;
>> @@ -798,7 +799,7 @@ typedef struct st_print_event_info
>> bool printed_fd_event;
>> my_off_t hexdump_from;
>> uint8 common_header_len;
>> - char delimiter[16];
>> + char delimiter[max_delimiter_len];
>>
>> uint verbose;
>> table_mapping m_table_map;
>> diff --git a/sql/sql_binlog.cc b/sql/sql_binlog.cc
>> index 91cf038907e..b4e3342d8f3 100644
>> --- a/sql/sql_binlog.cc
>> +++ b/sql/sql_binlog.cc
>> @@ -28,6 +28,70 @@
>> // START_EVENT_V3,
>> // Log_event_type,
>> // Log_event
>> +
>> +/**
>> + Copy fragments into the standard placeholder thd->lex->comment.str.
>> +
>> + Compute the size of the (still) encoded total,
>> + allocate and then copy fragments one after another.
>> + The size can exceed max(max_allowed_packet) which is not a
>> + problem as no String instance is created off this char array.
>> +
>> + @param thd THD handle
>> + @return
>> + 0 at success,
>> + -1 otherwise.
>> +*/
>> +int binlog_defragment(THD *thd)
>> +{
>> + user_var_entry *entry[2];
>> + LEX_STRING name[2]= { thd->lex->comment, thd->lex->ident };
>> +
>> + /* compute the total size */
>> + thd->lex->comment.str= NULL;
>> + thd->lex->comment.length= 0;
>> + for (uint k= 0; k < 2; k++)
>> + {
>> + entry[k]=
>> + (user_var_entry*) my_hash_search(&thd->user_vars, (uchar*) name[k].str,
>> + name[k].length);
>> + if (!entry[k] || entry[k]->type != STRING_RESULT)
>> + {
>> + my_error(ER_WRONG_TYPE_FOR_VAR, MYF(0), name[k].str);
>> + return -1;
>> + }
>> + thd->lex->comment.length += entry[k]->length;
>> + }
>> +
>> + thd->lex->comment.str= // to be freed by the caller
>> + (char *) my_malloc(thd->lex->comment.length, MYF(MY_WME));
>> + if (!thd->lex->comment.str)
>> + {
>> + my_error(ER_OUTOFMEMORY, MYF(ME_FATALERROR), 1);
>> + return -1;
>> + }
>> +
>> + /* fragments are merged into the allocated buf while the user vars get reset */
>> + size_t gathered_length= 0;
>> + for (uint k=0; k < 2; k++)
>> + {
>> + memcpy(thd->lex->comment.str + gathered_length, entry[k]->value, entry[k]->length);
>> + gathered_length += entry[k]->length;
>> + if (update_hash(entry[k], true, NULL, 0, STRING_RESULT, &my_charset_bin, 0))
>> + {
>> + my_printf_error(ER_WRONG_TYPE_FOR_VAR,
>> + "%s: BINLOG fragment user "
>> + "variable '%s' could not be unset", MYF(0),
>> + ER_THD(thd, ER_WRONG_TYPE_FOR_VAR), entry[k]->value);
>> + }
>
> I don't see how update_hash(entry[k], true, ...) can ever fail, so
> there's no need to pretend that it can.
Right. I did not look into its branches. The 'true' one does not error
out.
>
>> + }
>> +
>> + DBUG_ASSERT(gathered_length == thd->lex->comment.length);
>> +
>> + return 0;
>> +}
>> +
>> +
>> /**
>> Execute a BINLOG statement.
>>
>> @@ -119,6 +175,23 @@ void mysql_client_binlog_statement(THD* thd)
>> rli->sql_driver_thd= thd;
>> rli->no_storage= TRUE;
>>
>> + if (unlikely(is_fragmented= thd->lex->comment.str && thd->lex->ident.str))
>> + if (binlog_defragment(thd))
>> + goto end;
>> +
>> + if (!(coded_len= thd->lex->comment.length))
>> + {
>> + my_error(ER_SYNTAX_ERROR, MYF(0));
>> + goto end;
>> + }
>> +
>> + decoded_len= base64_needed_decoded_length(coded_len);
>> + if (!(buf= (char *) my_malloc(decoded_len, MYF(MY_WME))))
>> + {
>> + my_error(ER_OUTOFMEMORY, MYF(ME_FATALERROR), 1);
>> + goto end;
>> + }
>> +
>
> Technically, it should be possible to decode base64 in-place and avoid
> allocating a second 3GB buffer. But let's not do it in this MDEV :)
In my defense, I can only repeat the no-limit argument :-).
Huge bunch of thanks!
Andrei
>
>> for (char const *strptr= thd->lex->comment.str ;
>> strptr < thd->lex->comment.str + thd->lex->comment.length ; )
>> {
>
> Regards,
> Sergei
> Chief Architect MariaDB
> and security@xxxxxxxxxxx
>