maria-developers team mailing list archive
-
maria-developers team
-
Mailing list archive
-
Message #11500
Re: d282f5c5560: MDEV-10963 Fragmented BINLOG query
Hi, Andrei!
Looks better!
There are no major problems, but see comments below. There are a few
suggestions on how to simplify the code.
On Nov 05, Andrei Elkin wrote:
> revision-id: d282f5c55609469cd74d7390f70c7d922c778711 (mariadb-10.1.35-93-gd282f5c5560)
> parent(s): 2a576f71c5d3c7aacef564e5b1251f83bde48f51
> author: Andrei Elkin <andrei.elkin@xxxxxxxxxxx>
> committer: Andrei Elkin <andrei.elkin@xxxxxxxxxxx>
> timestamp: 2018-10-21 23:42:00 +0300
> message:
>
> MDEV-10963 Fragmented BINLOG query
>
> diff --git a/mysql-test/suite/binlog/t/binlog_mysqlbinlog_row_frag.test b/mysql-test/suite/binlog/t/binlog_mysqlbinlog_row_frag.test
> new file mode 100644
> index 00000000000..bdf41c94c76
> --- /dev/null
> +++ b/mysql-test/suite/binlog/t/binlog_mysqlbinlog_row_frag.test
> @@ -0,0 +1,50 @@
> +--source include/have_debug.inc
> +--source include/have_log_bin.inc
> +--source include/have_binlog_format_row.inc
you don't need to include have_log_bin, if you include
have_binlog_format_row.
> +
> +--let $MYSQLD_DATADIR= `select @@datadir`
> +--let $max_size=1024
> +
> +CREATE TABLE t (a TEXT);
> +# events of interest are guaranteed to stay in 000001 log
> +RESET MASTER;
> +--eval INSERT INTO t SET a=repeat('a', $max_size)
eh? why did you do it with let/eval instead of a simple sql statement?
you don't use $max_size anywhere else.
> +SELECT a from t into @a;
> +FLUSH LOGS;
> +DELETE FROM t;
> +
> +--exec $MYSQL_BINLOG --debug-binlog-row-event-max-encoded-size=256 $MYSQLD_DATADIR/master-bin.000001 > $MYSQLTEST_VARDIR/tmp/mysqlbinlog.sql
> +
> +--let $assert_text= BINLOG is fragmented
> +--let $assert_select= BINLOG @binlog_fragment_0, @binlog_fragment_1
> +--let $assert_count= 1
> +--let $assert_file= $MYSQLTEST_VARDIR/tmp/mysqlbinlog.sql
> +--source include/assert_grep.inc
no, please, use search_pattern_in_file.inc instead.
> +
> +--exec $MYSQL test < $MYSQLTEST_VARDIR/tmp/mysqlbinlog.sql
> +
> +SELECT a LIKE @a as 'true' FROM t;
> +SELECT @binlog_fragment_0, @binlog_fragment_1 as 'NULL';
that makes no sense: @binlog_fragment_0 and _1 were set in a separate
client session. You cannot test whether they were cleared there
by looking at their values here.
> +
> +# improper syntax error
> +--echo BINLOG number-of-fragments must be exactly two
> +--error ER_PARSE_ERROR
> +BINLOG @binlog_fragment;
> +--error ER_PARSE_ERROR
> +BINLOG @binlog_fragment, @binlog_fragment, @binlog_fragment;
> +
> +# corrupted fragments error check (to the expected error code notice,
> +# the same error code occurs in a similar unfragmented case)
> +SET @binlog_fragment_0='012345';
> +SET @binlog_fragment_1='012345';
> +--error ER_SYNTAX_ERROR
> +BINLOG @binlog_fragment_0, @binlog_fragment_1;
> +
> +# Not existing fragment is not allowed
> +SET @binlog_fragment_0='012345';
> +--error ER_WRONG_TYPE_FOR_VAR
> +BINLOG @binlog_fragment_0, @binlog_fragment_not_exist;
> +
> +--echo # Cleanup
> +--remove_file $MYSQLTEST_VARDIR/tmp/mysqlbinlog.sql
> +DROP TABLE t;
> diff --git a/mysys/mf_iocache2.c b/mysys/mf_iocache2.c
> --- a/mysys/mf_iocache2.c
> +++ b/mysys/mf_iocache2.c
> @@ -22,52 +22,53 @@
> #include <stdarg.h>
> #include <m_ctype.h>
>
> -/*
> - Copy contents of an IO_CACHE to a file.
> -
> - SYNOPSIS
> - my_b_copy_to_file()
> - cache IO_CACHE to copy from
> - file File to copy to
> -
> - DESCRIPTION
> - Copy the contents of the cache to the file. The cache will be
> - re-inited to a read cache and will read from the beginning of the
> - cache.
> -
> - If a failure to write fully occurs, the cache is only copied
> - partially.
> +/**
> + Copy the cache to the file. Copying can be constrained to @c count
> + number of bytes when the parameter is less than SIZE_T_MAX. The
> + cache will be optionally re-inited to a read cache and will read
> + from the beginning of the cache. If a failure to write fully
> + occurs, the cache is only copied partially.
>
> TODO
> - Make this function solid by handling partial reads from the cache
> - in a correct manner: it should be atomic.
> -
> - RETURN VALUE
> - 0 All OK
> - 1 An error occurred
> + Make this function solid by handling partial reads from the cache
> + in a correct manner: it should be atomic.
> +
> + @param cache IO_CACHE to copy from
> + @param file File to copy to
> + @param do_reinit whether to turn the cache to read mode
> + @param count the copied size or the max of the type
> + when the whole cache is to be copied.
> + @return
> + 0 All OK
> + 1 An error occurred
> */
> int
> -my_b_copy_to_file(IO_CACHE *cache, FILE *file)
> +my_b_copy_to_file(IO_CACHE *cache, FILE *file,
> + my_bool do_reinit,
> + size_t count)
> {
> - size_t bytes_in_cache;
> + size_t curr_write, bytes_in_cache;
> DBUG_ENTER("my_b_copy_to_file");
>
> /* Reinit the cache to read from the beginning of the cache */
> - if (reinit_io_cache(cache, READ_CACHE, 0L, FALSE, FALSE))
> + if (do_reinit && reinit_io_cache(cache, READ_CACHE, 0L, FALSE, FALSE))
generally, when there's a function that is always called with a
constant (compile-time) argument, I prefer to split the code
compile-time too, if it isn't too much trouble. In this case it would
mean a new function like
int my_b_copy_all_to_file(IO_CACHE *cache, FILE *file)
{
if (reinit_io_cache(cache, READ_CACHE, 0L, FALSE, FALSE))
return 1;
return my_b_copy_to_file(cache, file, SIZE_T_MAX);
}
and all old code will be changed to use my_b_copy_all_to_file().
Old my_b_copy_to_file() won't need to do reinit_io_cache() anymore and
your code will use it directly.
> DBUG_RETURN(1);
> bytes_in_cache= my_b_bytes_in_cache(cache);
> do
> {
> - if (my_fwrite(file, cache->read_pos, bytes_in_cache,
> + curr_write= MY_MIN(bytes_in_cache, count);
> + if (my_fwrite(file, cache->read_pos, curr_write,
> MYF(MY_WME | MY_NABP)) == (size_t) -1)
> DBUG_RETURN(1);
> - } while ((bytes_in_cache= my_b_fill(cache)));
> +
> + cache->read_pos += curr_write;
> + count -= curr_write;
> + } while (count && (bytes_in_cache= my_b_fill(cache)));
> if(cache->error == -1)
> DBUG_RETURN(1);
> DBUG_RETURN(0);
> }
>
> -
> my_off_t my_b_append_tell(IO_CACHE* info)
> {
> /*
> diff --git a/sql/log_event.cc b/sql/log_event.cc
> index e07b7002398..aeca794f0cd 100644
> --- a/sql/log_event.cc
> +++ b/sql/log_event.cc
> @@ -10474,12 +10488,151 @@ void Rows_log_event::pack_info(Protocol *protocol)
> #endif
>
> #ifdef MYSQL_CLIENT
> +/**
> + Print an event "body" cache to @c file possibly in multiple fragements.
> + Each fragement is optionally per @c do_wrap to procude an SQL statement.
> +
> + @param file a file to print to
> + @param body the "body" IO_CACHE of event
> + @param do_wrap whether to wrap base64-encoded strings with
> + SQL cover.
> + The function signals on any error through setting @c body->error to -1.
> +*/
> +void copy_cache_to_file_wrapped(FILE *file,
> + IO_CACHE *body,
> + bool do_wrap,
> + const char *delimiter)
> +{
> + uint n_frag= 1;
> + const char* before_frag= NULL;
> + char* after_frag= NULL;
> + char* after_last= NULL;
> + /*
> + 2 fragments can always represent near 1GB row-based
> + base64-encoded event as two strings each of size less than
> + max(max_allowed_packet). Greater number of fragments does not
> + save from potential need to tweak (increase) @@max_allowed_packet
> + before to process the fragments. So 2 is safe and enough.
> + */
> + const char fmt_last_frag2[]=
> + "\nBINLOG @binlog_fragment_0, @binlog_fragment_1%s\n";
> + const char fmt_before_frag[]= "\nSET /* ONE_SHOT */ @binlog_fragment_%d ='\n";
this ONE_SHOT is confusing, even if in a comment. Better not to do it :)
> + /*
> + Buffer to hold computed formatted strings according to specifiers.
> + The sizes may depend on an actual fragment number size in terms of decimal
> + signs so its maximum is estimated (not precisely yet safely) below.
> + */
> + char buf[sizeof(fmt_before_frag) + sizeof(fmt_last_frag2)
> + + ((sizeof(n_frag) * 8)/3 + 1) // max of decimal index
> + + sizeof(PRINT_EVENT_INFO::max_delimiter_len) + 3]; // delim, \n and 0
sizeof(max_delimiter_len) ? it's sizeof(uint), right? Did you mean
sizeof(PRINT_EVENT_INFO::delimiter)
or simply
PRINT_EVENT_INFO::max_delimiter_len
without sizeof?
> +
> + if (do_wrap)
> + {
> + after_frag= (char*) my_malloc(sizeof(buf), MYF(MY_WME));
> + sprintf(after_frag, "'%s\n", delimiter);
> + if (my_b_tell(body) > opt_binlog_rows_event_max_encoded_size)
> + n_frag= 2;
> + if (n_frag > 1)
> + {
> + before_frag= fmt_before_frag;
> + after_last= (char*) my_malloc(sizeof(buf), MYF(MY_WME));
> + sprintf(after_last, fmt_last_frag2, (char*) delimiter);
> + }
> + else
> + {
> + before_frag= "\nBINLOG '\n"; // single "fragment"
> + }
> + }
> +
> + size_t total_size= my_b_tell(body), total_written= 0;
> + size_t frag_size= total_size / n_frag + 1, curr_size;
> +
> + if (reinit_io_cache(body, READ_CACHE, 0L, FALSE, FALSE))
> + {
> + body->error= -1;
> + goto err;
> + }
> +
> + for (uint i= 0; i < n_frag; i++, total_written += curr_size)
> + {
> + curr_size= i < n_frag - 1 ? frag_size : total_size - total_written;
> +
> + DBUG_ASSERT(i < n_frag - 1 || curr_size <= frag_size);
> +
> + if (before_frag)
> + {
> + sprintf(buf, before_frag, i);
> + my_fwrite(file, (uchar*) buf, strlen(buf), MYF(MY_WME | MY_NABP));
> + }
> + if (my_b_copy_to_file(body, file, FALSE, curr_size))
> + {
> + body->error= -1;
> + goto err;
> + }
> + if (after_frag)
> + {
> + sprintf(buf, after_frag, NULL);
> + my_fwrite(file, (uchar*) buf, strlen(buf), MYF(MY_WME | MY_NABP));
> + }
> + }
Hmm, dunno. I suspect you could make it three times shorter and five times
easier to read if you didn't try to generalize it for an arbitrary
number of fragments with arbitrary prefixes and suffixes. Just
if (my_b_tell(body) < opt_binlog_rows_event_max_encoded_size - margin)
{
my_fprintf(file, "BINLOG '");
my_b_copy_to_file(body, file);
my_fprintf(file, "'%s\n", delimiter);
}
else
{
my_fprintf(file, "SET @binlog_fragment_0='");
my_b_copy_to_file(body, file, opt_binlog_rows_event_max_encoded_size);
my_fprintf(file, "'%s\nSET @binlog_fragment_1='", delimiter);
my_b_copy_to_file(body, file, SIZE_T_MAX);
my_fprintf(file, "'%s\nBINLOG @binlog_fragment_0, @binlog_fragment_1%s\n",
delimiter, delimiter);
}
See?
> +
> + if (after_last)
> + {
> + sprintf(buf, after_last, n_frag);
> + my_fwrite(file, (uchar*) buf, strlen(buf), MYF(MY_WME | MY_NABP));
> + }
> + reinit_io_cache(body, WRITE_CACHE, 0, FALSE, TRUE);
> +
> +err:
> + my_free(after_frag);
> + my_free(after_last);
> +}
> +
> +/**
> + The function invokes base64 encoder to run on the current
> + event string and store the result into two caches.
> + When the event ends the current statement the caches are is copied into
> + the argument file.
> + Copying is also concerned how to wrap the event, specifically to produce
> + a valid SQL syntax.
> + When the encoded data size is within max(MAX_ALLOWED_PACKET)
> + a regular BINLOG query is composed. Otherwise it is build as fragmented
> +
> + SET @binlog_fragment_0='...';
> + SET @binlog_fragment_1='...';
> + BINLOG DEFRAGMENT(@binlog_fragment_0, @binlog_fragment_1);
> +
> + where fragments are represented by a sequence of "indexed" user
> + variables.
> + Two more statements are composed as well
> +
> + SET @binlog_fragment_0=NULL;
> + SET @binlog_fragment_1=NULL;
> +
> + to promptly release memory.
No, they aren't
> +
> + NOTE.
@note
> + If any changes made don't forget to duplicate them to
> + Old_rows_log_event as long as it's supported.
> +
> + @param file pointer to IO_CACHE
> + @param print_event_info pointer to print_event_info specializing
> + what out of and how to print the event
> + @param name the name of a table that the event operates on
> +
> + The function signals on any error of cache access through setting
> + that cache's @c error to -1.
> +*/
> void Rows_log_event::print_helper(FILE *file,
> PRINT_EVENT_INFO *print_event_info,
> char const *const name)
> {
> IO_CACHE *const head= &print_event_info->head_cache;
> IO_CACHE *const body= &print_event_info->body_cache;
> + bool do_print_encoded=
> + print_event_info->base64_output_mode != BASE64_OUTPUT_DECODE_ROWS &&
> + !print_event_info->short_form;
> +
> if (!print_event_info->short_form)
> {
> bool const last_stmt_event= get_flags(STMT_END_F);
> diff --git a/sql/log_event.h b/sql/log_event.h
> index 90900f63533..28277e659d2 100644
> --- a/sql/log_event.h
> +++ b/sql/log_event.h
> @@ -749,6 +749,7 @@ typedef struct st_print_event_info
> that was printed. We cache these so that we don't have to print
> them if they are unchanged.
> */
> + static const uint max_delimiter_len= 16;
why did you introduce this max_delimiter_len, if all you use
is sizeof(delimiter) anyway? (and even that is not needed)
> // TODO: have the last catalog here ??
> char db[FN_REFLEN+1]; // TODO: make this a LEX_STRING when thd->db is
> bool flags2_inited;
> @@ -798,7 +799,7 @@ typedef struct st_print_event_info
> bool printed_fd_event;
> my_off_t hexdump_from;
> uint8 common_header_len;
> - char delimiter[16];
> + char delimiter[max_delimiter_len];
>
> uint verbose;
> table_mapping m_table_map;
> diff --git a/sql/sql_binlog.cc b/sql/sql_binlog.cc
> index 91cf038907e..b4e3342d8f3 100644
> --- a/sql/sql_binlog.cc
> +++ b/sql/sql_binlog.cc
> @@ -28,6 +28,70 @@
> // START_EVENT_V3,
> // Log_event_type,
> // Log_event
> +
> +/**
> + Copy fragments into the standard placeholder thd->lex->comment.str.
> +
> + Compute the size of the (still) encoded total,
> + allocate and then copy fragments one after another.
> + The size can exceed max(max_allowed_packet) which is not a
> + problem as no String instance is created off this char array.
> +
> + @param thd THD handle
> + @return
> + 0 at success,
> + -1 otherwise.
> +*/
> +int binlog_defragment(THD *thd)
> +{
> + user_var_entry *entry[2];
> + LEX_STRING name[2]= { thd->lex->comment, thd->lex->ident };
> +
> + /* compute the total size */
> + thd->lex->comment.str= NULL;
> + thd->lex->comment.length= 0;
> + for (uint k= 0; k < 2; k++)
> + {
> + entry[k]=
> + (user_var_entry*) my_hash_search(&thd->user_vars, (uchar*) name[k].str,
> + name[k].length);
> + if (!entry[k] || entry[k]->type != STRING_RESULT)
> + {
> + my_error(ER_WRONG_TYPE_FOR_VAR, MYF(0), name[k].str);
> + return -1;
> + }
> + thd->lex->comment.length += entry[k]->length;
> + }
> +
> + thd->lex->comment.str= // to be freed by the caller
> + (char *) my_malloc(thd->lex->comment.length, MYF(MY_WME));
> + if (!thd->lex->comment.str)
> + {
> + my_error(ER_OUTOFMEMORY, MYF(ME_FATALERROR), 1);
> + return -1;
> + }
> +
> + /* fragments are merged into allocated buf while the user var:s get reset */
> + size_t gathered_length= 0;
> + for (uint k=0; k < 2; k++)
> + {
> + memcpy(thd->lex->comment.str + gathered_length, entry[k]->value, entry[k]->length);
> + gathered_length += entry[k]->length;
> + if (update_hash(entry[k], true, NULL, 0, STRING_RESULT, &my_charset_bin, 0))
> + {
> + my_printf_error(ER_WRONG_TYPE_FOR_VAR,
> + "%s: BINLOG fragment user "
> + "variable '%s' could not be unset", MYF(0),
> + ER_THD(thd, ER_WRONG_TYPE_FOR_VAR), entry[k]->value);
> + }
I don't see how update_hash(entry[k], true, ...) can ever fail, so
there's no need to pretend that it can.
> + }
> +
> + DBUG_ASSERT(gathered_length == thd->lex->comment.length);
> +
> + return 0;
> +}
> +
> +
> /**
> Execute a BINLOG statement.
>
> @@ -119,6 +175,23 @@ void mysql_client_binlog_statement(THD* thd)
> rli->sql_driver_thd= thd;
> rli->no_storage= TRUE;
>
> + if (unlikely(is_fragmented= thd->lex->comment.str && thd->lex->ident.str))
> + if (binlog_defragment(thd))
> + goto end;
> +
> + if (!(coded_len= thd->lex->comment.length))
> + {
> + my_error(ER_SYNTAX_ERROR, MYF(0));
> + goto end;
> + }
> +
> + decoded_len= base64_needed_decoded_length(coded_len);
> + if (!(buf= (char *) my_malloc(decoded_len, MYF(MY_WME))))
> + {
> + my_error(ER_OUTOFMEMORY, MYF(ME_FATALERROR), 1);
> + goto end;
> + }
> +
Technically, it should be possible to decode base64 in-place and avoid
allocating a second 3GB buffer. But let's not do it in this MDEV :)
> for (char const *strptr= thd->lex->comment.str ;
> strptr < thd->lex->comment.str + thd->lex->comment.length ; )
> {
Regards,
Sergei
Chief Architect MariaDB
and security@xxxxxxxxxxx
Follow ups