maria-developers team mailing list archive
-
maria-developers team
-
Mailing list archive
-
Message #08575
Re: MDEV-8112: PATCH: no slave left behind
Hi Jonas,
Thanks for the patch!
I read through the patch. I am not that familiar with the semi-sync plugin
code, but it looks reasonable.
But the patch causes a test failure in sys_vars.all_vars. This is because
not all newly introduced variables have corresponding test cases in
mysql-test/suite/sys_vars/t/*_basic.test.
Also, do you have some documentation for the feature? (There is a bit in the
Jira entry, but, for example, the new variable
rpl_semi_sync_master_slave_lag_heartbeat_frequency_us does not seem to be
described at all.)
If you can fix the above two points (and if Buildbot does not point out more
test suite issues), then I will push it into the MariaDB tree.
Thanks,
- Kristian.
> The patch has been ported to 10.1.
> You can use it under the new (also called "3-clause") BSD license.
> I have done internal benchmarks... but you are most welcome to do some too.
>
> DESCRIPTION:
> no slave left behind
>
> This patch implements master throttling based on slave lag,
> aka "no slave left behind". The core feature works as follows:
> 1) The semi-sync reply is amended to also report back the SQL-thread
> position (aka the exec position).
> 2) Transactions are not removed from the "active-transaction-list"
> in the semi-sync master plugin until at least one slave has reported
> that it has executed the transaction. The slave lag can then
> be estimated by calculating how long the oldest transaction has been
> lingering in the active-transaction-list.
> 3) Client threads are forced to wait before commit until the slave lag
> has decreased to an acceptable value.
>
> The following variables are introduced on the master:
>
> rpl_semi_sync_master_max_slave_lag (global)
> rpl_semi_sync_master_slave_lag_wait_timeout (session)
>
> The following status variables are introduced on the master:
>
> rpl_semi_sync_master_slave_lag_wait_sessions
> rpl_semi_sync_master_estimated_slave_lag
> rpl_semi_sync_master_trx_slave_lag_wait_time
> rpl_semi_sync_master_trx_slave_lag_wait_num
> rpl_semi_sync_master_avg_trx_slave_lag_wait_time
>
> The following variables are introduced on the slave:
>
> rpl_semi_sync_slave_lag_enabled (global)
>
> In addition to this, two optimizations that decrease the overhead of
> semi-sync are introduced.
> 1) When the master is about to send a transaction to a slave, it checks
> whether that transaction should be semi-synced. Rather than semi-syncing
> every transaction (which is what is done currently), the code skips
> semi-syncing a transaction if newer transactions have already been
> committed. Since this could delay semi-syncing indefinitely, a cap is
> set using two new master variables:
>
> rpl_semi_sync_master_max_unacked_event_bytes (global)
> rpl_semi_sync_master_max_unacked_event_count (global)
> 2) rpl_semi_sync_master_group_commit, which makes the semi-sync
> plugin semi-sync only the last transaction in a group commit.
> diff --git a/mysql-test/suite/rpl/r/rpl_semi_sync_slave_lag.result b/mysql-test/suite/rpl/r/rpl_semi_sync_slave_lag.result
> new file mode 100644
> index 0000000..f3545a4
> --- /dev/null
> +++ b/mysql-test/suite/rpl/r/rpl_semi_sync_slave_lag.result
> @@ -0,0 +1,150 @@
> +include/master-slave.inc
> +[connection master]
> +set global rpl_semi_sync_master_enabled = 1;
> +set global rpl_semi_sync_master_max_slave_lag = 10;
> +show variables like 'rpl_semi_sync_master_%slave_lag%';
> +Variable_name Value
> +rpl_semi_sync_master_max_slave_lag 10
> +rpl_semi_sync_master_slave_lag_heartbeat_frequency_us 500000
> +rpl_semi_sync_master_slave_lag_wait_timeout 50
> +include/stop_slave.inc
> +set global rpl_semi_sync_slave_enabled = 1;
> +set global rpl_semi_sync_slave_lag_enabled = 1;
> +include/start_slave.inc
> +# create non-root user for testing READ_ONLY
> +grant SELECT, INSERT on *.* to test@localhost;
> +CREATE TABLE t1 (i INT NOT NULL AUTO_INCREMENT PRIMARY KEY, f varchar(8))
> +ENGINE=innodb;
> +#
> +# Check basic behaviour
> +#
> +INSERT INTO t1 (f) VALUES ('1'),('2'),('3');
> +# Now wait for slave lag to decrease to 0
> +# [ on slave ]
> +STOP SLAVE SQL_THREAD;
> +# [ on master ]
> +INSERT INTO t1 (f) VALUES ('4'),('5'),('6');
> +# Now wait for slave lag to increase to > 0
> +# [ on slave ]
> +START SLAVE SQL_THREAD;
> +# [ on master ]
> +# Now wait for slave lag to decrease to 0
> +# [ on slave ]
> +STOP SLAVE SQL_THREAD;
> +# [ on master ]
> +set session rpl_semi_sync_master_slave_lag_wait_timeout = 5;
> +# First transaction should succeed. slave_lag is zero when it commits
> +INSERT INTO t1 (f) VALUES ('7'),('8'),('9');
> +# Now wait for slave lag to increase to > 10s
> +# Check that estimated_slave_lag is > 10s
> +SELECT VARIABLE_VALUE > 10000000 as should_be_1
> +FROM INFORMATION_SCHEMA.GLOBAL_STATUS
> +WHERE VARIABLE_NAME = 'rpl_semi_sync_master_estimated_slave_lag';
> +should_be_1
> +1
> +# Second transaction should now fail. slave_lag is >10s when it commits
> +INSERT INTO t1 (f) VALUES ('a'),('b'),('c');
> +ERROR HY000: Slave-lag timeout
> +# [ on slave ]
> +START SLAVE SQL_THREAD;
> +# [ on master ]
> +# Now wait for slave lag to decrease to < 10s
> +# And now it should succeed again
> +INSERT INTO t1 (f) VALUES ('d'),('e'),('f');
> +SELECT *
> +FROM t1
> +ORDER BY 1;
> +i f
> +1 1
> +2 2
> +3 3
> +4 4
> +5 5
> +6 6
> +7 7
> +8 8
> +9 9
> +13 d
> +14 e
> +15 f
> +# [ on slave ]
> +SELECT *
> +FROM t1
> +ORDER BY 1;
> +i f
> +1 1
> +2 2
> +3 3
> +4 4
> +5 5
> +6 6
> +7 7
> +8 8
> +9 9
> +13 d
> +14 e
> +15 f
> +#
> +# Test interaction with READ_ONLY
> +#
> +# [ on slave ]
> +STOP SLAVE SQL_THREAD;
> +# [ on master ]
> +INSERT INTO t1 (f) VALUES ('g'),('h'),('i');
> +# Now wait for slave lag to increase to > 10s
> +# [ on con1 ]
> +BEGIN;
> +INSERT INTO t1 (f) VALUES ('g'),('h'),('i');
> +# [ on master ]
> +set global read_only = 1;
> +# [ on con1 ]
> +set session rpl_semi_sync_master_slave_lag_wait_timeout = 5;
> +# read-only is check *before* slave lag
> +COMMIT;
> +ERROR HY000: The MariaDB server is running with the --read-only option so it cannot execute this statement
> +# [ on slave ]
> +START SLAVE SQL_THREAD;
> +# [ on master ]
> +# Now wait for slave lag to decrease to 0
> +set global read_only = 0;
> +#
> +# check slave_lag > 0 but less than rpl_semi_sync_master_max_slave_lag
> +#
> +# [ on slave ]
> +STOP SLAVE SQL_THREAD;
> +# [ on master ]
> +INSERT INTO t1 (f) VALUES ('j'),('k'),('l');
> +# Now wait for slave lag to increase to > 0
> +# Capture rpl_semi_sync_master_tx_slave_lag_waits before transaction
> +select @count_before := VARIABLE_VALUE
> +FROM INFORMATION_SCHEMA.GLOBAL_STATUS
> +WHERE VARIABLE_NAME = 'rpl_semi_sync_master_tx_slave_lag_waits';
> +# Set maximum allowed slave lag to 24h
> +set global rpl_semi_sync_master_max_slave_lag = 86400;
> +INSERT INTO t1 (f) VALUES ('m'),('n'),('o');
> +# Capture rpl_semi_sync_master_tx_slave_lag_waits after transaction
> +select @count_after := VARIABLE_VALUE
> +FROM INFORMATION_SCHEMA.GLOBAL_STATUS
> +WHERE VARIABLE_NAME = 'rpl_semi_sync_master_tx_slave_lag_waits';
> +# There should have been no wait, since maximum allowed is very high
> +select @count_before = @count_after as should_be_1;
> +should_be_1
> +1
> +# [ on slave ]
> +START SLAVE SQL_THREAD;
> +# [ on master ]
> +# Now wait for slave lag to decrease to 0
> +#
> +# Clean up
> +#
> +# [ on master ]
> +set global rpl_semi_sync_master_max_slave_lag = default;
> +set session rpl_semi_sync_master_slave_lag_wait_timeout = default;
> +DROP USER test@localhost;
> +include/stop_slave.inc
> +set global rpl_semi_sync_slave_enabled = 0;
> +set global rpl_semi_sync_slave_lag_enabled = default;
> +set global rpl_semi_sync_master_enabled = 0;
> +include/start_slave.inc
> +DROP TABLE t1;
> +include/rpl_end.inc
> diff --git a/mysql-test/suite/rpl/t/rpl_semi_sync_slave_lag.test b/mysql-test/suite/rpl/t/rpl_semi_sync_slave_lag.test
> new file mode 100644
> index 0000000..f9bdbd9
> --- /dev/null
> +++ b/mysql-test/suite/rpl/t/rpl_semi_sync_slave_lag.test
> @@ -0,0 +1,252 @@
> +source include/have_semisync.inc;
> +source include/not_embedded.inc;
> +source include/have_innodb.inc;
> +source include/master-slave.inc;
> +
> +connection master;
> +set global rpl_semi_sync_master_enabled = 1;
> +set global rpl_semi_sync_master_max_slave_lag = 10;
> +
> +show variables like 'rpl_semi_sync_master_%slave_lag%';
> +
> +connection slave;
> +source include/stop_slave.inc;
> +set global rpl_semi_sync_slave_enabled = 1;
> +set global rpl_semi_sync_slave_lag_enabled = 1;
> +
> +source include/start_slave.inc;
> +
> +connection master;
> +
> +--echo # create non-root user for testing READ_ONLY
> +grant SELECT, INSERT on *.* to test@localhost;
> +connect (con1,localhost,test,,test);
> +
> +CREATE TABLE t1 (i INT NOT NULL AUTO_INCREMENT PRIMARY KEY, f varchar(8))
> +ENGINE=innodb;
> +
> +--echo #
> +--echo # Check basic behaviour
> +--echo #
> +INSERT INTO t1 (f) VALUES ('1'),('2'),('3');
> +
> +--echo # Now wait for slave lag to decrease to 0
> +let $wait_condition= SELECT VARIABLE_VALUE = 0
> +FROM INFORMATION_SCHEMA.GLOBAL_STATUS
> +WHERE VARIABLE_NAME = 'rpl_semi_sync_master_estimated_slave_lag';
> +--source include/wait_condition.inc
> +
> +--echo # [ on slave ]
> +connection slave;
> +STOP SLAVE SQL_THREAD;
> +
> +--echo # [ on master ]
> +connection master;
> +INSERT INTO t1 (f) VALUES ('4'),('5'),('6');
> +
> +--echo # Now wait for slave lag to increase to > 0
> +let $wait_condition= SELECT VARIABLE_VALUE > 0
> +FROM INFORMATION_SCHEMA.GLOBAL_STATUS
> +WHERE VARIABLE_NAME = 'rpl_semi_sync_master_estimated_slave_lag';
> +--source include/wait_condition.inc
> +
> +--echo # [ on slave ]
> +connection slave;
> +START SLAVE SQL_THREAD;
> +
> +--echo # [ on master ]
> +connection master;
> +
> +--echo # Now wait for slave lag to decrease to 0
> +let $wait_condition= SELECT VARIABLE_VALUE = 0
> +FROM INFORMATION_SCHEMA.GLOBAL_STATUS
> +WHERE VARIABLE_NAME = 'rpl_semi_sync_master_estimated_slave_lag';
> +--source include/wait_condition.inc
> +
> +--echo # [ on slave ]
> +connection slave;
> +STOP SLAVE SQL_THREAD;
> +
> +--echo # [ on master ]
> +connection master;
> +
> +set session rpl_semi_sync_master_slave_lag_wait_timeout = 5;
> +
> +--echo # First transaction should succeed. slave_lag is zero when it commits
> +INSERT INTO t1 (f) VALUES ('7'),('8'),('9');
> +
> +--echo # Now wait for slave lag to increase to > 10s
> +let $wait_condition= SELECT VARIABLE_VALUE > 10000000
> +FROM INFORMATION_SCHEMA.GLOBAL_STATUS
> +WHERE VARIABLE_NAME = 'rpl_semi_sync_master_estimated_slave_lag';
> +--source include/wait_condition.inc
> +
> +--echo # Check that estimated_slave_lag is > 10s
> +SELECT VARIABLE_VALUE > 10000000 as should_be_1
> +FROM INFORMATION_SCHEMA.GLOBAL_STATUS
> +WHERE VARIABLE_NAME = 'rpl_semi_sync_master_estimated_slave_lag';
> +
> +--echo # Second transaction should now fail. slave_lag is >10s when it commits
> +--error ER_ERROR_DURING_COMMIT
> +INSERT INTO t1 (f) VALUES ('a'),('b'),('c');
> +
> +--echo # [ on slave ]
> +connection slave;
> +START SLAVE SQL_THREAD;
> +
> +--echo # [ on master ]
> +connection master;
> +
> +--echo # Now wait for slave lag to decrease to < 10s
> +let $wait_condition= SELECT VARIABLE_VALUE < 10000000
> +FROM INFORMATION_SCHEMA.GLOBAL_STATUS
> +WHERE VARIABLE_NAME = 'rpl_semi_sync_master_estimated_slave_lag';
> +--source include/wait_condition.inc
> +
> +--echo # And now it should succeed again
> +INSERT INTO t1 (f) VALUES ('d'),('e'),('f');
> +
> +SELECT *
> +FROM t1
> +ORDER BY 1;
> +
> +sync_slave_with_master;
> +
> +--echo # [ on slave ]
> +connection slave;
> +
> +SELECT *
> +FROM t1
> +ORDER BY 1;
> +
> +--echo #
> +--echo # Test interaction with READ_ONLY
> +--echo #
> +
> +--echo # [ on slave ]
> +connection slave;
> +STOP SLAVE SQL_THREAD;
> +
> +--echo # [ on master ]
> +connection master;
> +
> +INSERT INTO t1 (f) VALUES ('g'),('h'),('i');
> +
> +--echo # Now wait for slave lag to increase to > 10s
> +let $wait_condition= SELECT VARIABLE_VALUE > 10000000
> +FROM INFORMATION_SCHEMA.GLOBAL_STATUS
> +WHERE VARIABLE_NAME = 'rpl_semi_sync_master_estimated_slave_lag';
> +--source include/wait_condition.inc
> +
> +connection con1;
> +--echo # [ on con1 ]
> +BEGIN;
> +INSERT INTO t1 (f) VALUES ('g'),('h'),('i');
> +
> +connection master;
> +--echo # [ on master ]
> +set global read_only = 1;
> +
> +connection con1;
> +--echo # [ on con1 ]
> +set session rpl_semi_sync_master_slave_lag_wait_timeout = 5;
> +
> +--echo # read-only is check *before* slave lag
> +--error ER_OPTION_PREVENTS_STATEMENT
> +COMMIT;
> +
> +disconnect con1;
> +
> +--echo # [ on slave ]
> +connection slave;
> +START SLAVE SQL_THREAD;
> +
> +connection master;
> +--echo # [ on master ]
> +
> +--echo # Now wait for slave lag to decrease to 0
> +let $wait_condition= SELECT VARIABLE_VALUE = 0
> +FROM INFORMATION_SCHEMA.GLOBAL_STATUS
> +WHERE VARIABLE_NAME = 'rpl_semi_sync_master_estimated_slave_lag';
> +--source include/wait_condition.inc
> +
> +set global read_only = 0;
> +
> +--echo #
> +--echo # check slave_lag > 0 but less than rpl_semi_sync_master_max_slave_lag
> +--echo #
> +
> +--echo # [ on slave ]
> +connection slave;
> +STOP SLAVE SQL_THREAD;
> +
> +connection master;
> +--echo # [ on master ]
> +INSERT INTO t1 (f) VALUES ('j'),('k'),('l');
> +
> +--echo # Now wait for slave lag to increase to > 0
> +let $wait_condition= SELECT VARIABLE_VALUE > 0
> +FROM INFORMATION_SCHEMA.GLOBAL_STATUS
> +WHERE VARIABLE_NAME = 'rpl_semi_sync_master_estimated_slave_lag';
> +--source include/wait_condition.inc
> +
> +--echo # Capture rpl_semi_sync_master_tx_slave_lag_waits before transaction
> +--disable_result_log
> +select @count_before := VARIABLE_VALUE
> +FROM INFORMATION_SCHEMA.GLOBAL_STATUS
> +WHERE VARIABLE_NAME = 'rpl_semi_sync_master_tx_slave_lag_waits';
> +--enable_result_log
> +
> +--echo # Set maximum allowed slave lag to 24h
> +set global rpl_semi_sync_master_max_slave_lag = 86400; # 24h
> +
> +INSERT INTO t1 (f) VALUES ('m'),('n'),('o');
> +
> +--echo # Capture rpl_semi_sync_master_tx_slave_lag_waits after transaction
> +--disable_result_log
> +select @count_after := VARIABLE_VALUE
> +FROM INFORMATION_SCHEMA.GLOBAL_STATUS
> +WHERE VARIABLE_NAME = 'rpl_semi_sync_master_tx_slave_lag_waits';
> +--enable_result_log
> +
> +--echo # There should have been no wait, since maximum allowed is very high
> +select @count_before = @count_after as should_be_1;
> +
> +--echo # [ on slave ]
> +connection slave;
> +START SLAVE SQL_THREAD;
> +
> +connection master;
> +--echo # [ on master ]
> +
> +--echo # Now wait for slave lag to decrease to 0
> +let $wait_condition= SELECT VARIABLE_VALUE = 0
> +FROM INFORMATION_SCHEMA.GLOBAL_STATUS
> +WHERE VARIABLE_NAME = 'rpl_semi_sync_master_estimated_slave_lag';
> +--source include/wait_condition.inc
> +
> +--echo #
> +--echo # Clean up
> +--echo #
> +connection master;
> +--echo # [ on master ]
> +set global rpl_semi_sync_master_max_slave_lag = default;
> +set session rpl_semi_sync_master_slave_lag_wait_timeout = default;
> +DROP USER test@localhost;
> +
> +connection slave;
> +source include/stop_slave.inc;
> +set global rpl_semi_sync_slave_enabled = 0;
> +set global rpl_semi_sync_slave_lag_enabled = default;
> +
> +connection master;
> +set global rpl_semi_sync_master_enabled = 0;
> +
> +connection slave;
> +source include/start_slave.inc;
> +
> +connection master;
> +
> +DROP TABLE t1;
> +sync_slave_with_master;
> +--source include/rpl_end.inc
> diff --git a/plugin/semisync/semisync.cc b/plugin/semisync/semisync.cc
> index 4a80360..fc02b9d 100644
> --- a/plugin/semisync/semisync.cc
> +++ b/plugin/semisync/semisync.cc
> @@ -20,6 +20,7 @@
>
> const unsigned char ReplSemiSyncBase::kPacketMagicNum = 0xef;
> const unsigned char ReplSemiSyncBase::kPacketFlagSync = 0x01;
> +const unsigned char ReplSemiSyncBase::kPacketFlagSyncAndReport = 0x02;
>
>
> const unsigned long Trace::kTraceGeneral = 0x0001;
> @@ -29,3 +30,6 @@ const unsigned long Trace::kTraceFunction = 0x0040;
>
> const unsigned char ReplSemiSyncBase::kSyncHeader[2] =
> {ReplSemiSyncBase::kPacketMagicNum, 0};
> +
> +const char* const ReplSemiSyncBase::kRplSemiSyncSlaveReportExec =
> + "rpl_semi_sync_slave_report_exec";
> diff --git a/plugin/semisync/semisync.h b/plugin/semisync/semisync.h
> index 2857729..78faba9 100644
> --- a/plugin/semisync/semisync.h
> +++ b/plugin/semisync/semisync.h
> @@ -75,13 +75,23 @@ class ReplSemiSyncBase
>
> /* Constants in network packet header. */
> static const unsigned char kPacketMagicNum;
> + /* this event should be semisync acked */
> static const unsigned char kPacketFlagSync;
> + /* this event should be semisync acked including the current SQL position */
> + static const unsigned char kPacketFlagSyncAndReport;
> +
> + /* user variable for enabling exec-pos reporting */
> + static const char* const kRplSemiSyncSlaveReportExec;
> };
>
> /* The layout of a semisync slave reply packet:
> 1 byte for the magic num
> 8 bytes for the binlog positon
> - n bytes for the binlog filename, terminated with a '\0'
> + n bytes for the binlog filename, NOT terminated with a '\0'
> + [ optionally ]
> + 1 byte == 0
> + 8 bytes for the sql-thread position
> + n bytes for the sql-thread filename, terminated with a '\0'
> */
> #define REPLY_MAGIC_NUM_LEN 1
> #define REPLY_BINLOG_POS_LEN 8
> diff --git a/plugin/semisync/semisync_master.cc b/plugin/semisync/semisync_master.cc
> index c88c162..3ee6e26 100644
> --- a/plugin/semisync/semisync_master.cc
> +++ b/plugin/semisync/semisync_master.cc
> @@ -22,6 +22,9 @@
> #define TIME_MILLION 1000000
> #define TIME_BILLION 1000000000
>
> +/* thd_key for per slave thread state */
> +static MYSQL_THD_KEY_T thd_key;
> +
> /* This indicates whether semi-synchronous replication is enabled. */
> char rpl_semi_sync_master_enabled;
> unsigned long rpl_semi_sync_master_wait_point =
> @@ -45,6 +48,18 @@ unsigned long long rpl_semi_sync_master_net_wait_time = 0;
> unsigned long long rpl_semi_sync_master_trx_wait_time = 0;
> char rpl_semi_sync_master_wait_no_slave = 1;
>
> +unsigned long rpl_semi_sync_master_max_unacked_event_count = 0;
> +unsigned long rpl_semi_sync_master_max_unacked_event_bytes = 4096;
> +
> +unsigned long rpl_semi_sync_master_slave_lag_clients = 0;
> +unsigned long long rpl_semi_sync_master_estimated_slave_lag = 0;
> +unsigned long rpl_semi_sync_master_slave_lag_heartbeat_frequency_us = 500000;
> +unsigned long rpl_semi_sync_master_max_slave_lag = 0;
> +unsigned long rpl_semi_sync_master_slave_lag_wait_sessions = 0;
> +
> +unsigned long rpl_semi_sync_master_avg_trx_slave_lag_wait_time = 0;
> +unsigned long long rpl_semi_sync_master_trx_slave_lag_wait_num = 0;
> +unsigned long long rpl_semi_sync_master_trx_slave_lag_wait_time = 0;
>
> static int getWaitTime(const struct timespec& start_ts);
>
> @@ -150,6 +165,15 @@ int ActiveTranx::insert_tranx_node(const char *log_file_name,
> ins_node->log_name_[FN_REFLEN-1] = 0; /* make sure it ends properly */
> ins_node->log_pos_ = log_file_pos;
>
> + {
> + /**
> + * set trans commit time
> + * this is called when writing into binlog, which is not
> + * exactly right, but close enough for our purposes
> + */
> + ins_node->tranx_commit_time_us = my_hrtime().val;
> + }
> +
> if (!trx_front_)
> {
> /* The list is empty. */
> @@ -193,12 +217,11 @@ int ActiveTranx::insert_tranx_node(const char *log_file_name,
> return function_exit(kWho, result);
> }
>
> -bool ActiveTranx::is_tranx_end_pos(const char *log_file_name,
> - my_off_t log_file_pos)
> +TranxNode* ActiveTranx::lookup_tranx_end_pos(const char *log_file_name,
> + my_off_t log_file_pos)
> {
> - const char *kWho = "ActiveTranx::is_tranx_end_pos";
> + const char *kWho = "ActiveTranx::lookup_tranx_end_pos";
> function_enter(kWho);
> -
> unsigned int hash_val = get_hash_value(log_file_name, log_file_pos);
> TranxNode *entry = trx_htb_[hash_val];
>
> @@ -211,38 +234,24 @@ bool ActiveTranx::is_tranx_end_pos(const char *log_file_name,
> }
>
> if (trace_level_ & kTraceDetail)
> - sql_print_information("%s: probe (%s, %lu) in entry(%u)", kWho,
> - log_file_name, (unsigned long)log_file_pos, hash_val);
> + sql_print_information("%s: probe (%s, %lu)", kWho,
> + log_file_name, (unsigned long)log_file_pos);
>
> function_exit(kWho, (entry != NULL));
> - return (entry != NULL);
> + return entry;
> }
>
> -int ActiveTranx::clear_active_tranx_nodes(const char *log_file_name,
> - my_off_t log_file_pos)
> +int ActiveTranx::clear_active_tranx_nodes()
> {
> - const char *kWho = "ActiveTranx::::clear_active_tranx_nodes";
> - TranxNode *new_front;
> + set_new_front(NULL);
> + return 0;
> +}
>
> +void ActiveTranx::set_new_front(TranxNode *new_front)
> +{
> + const char *kWho = "ActiveTranx::set_new_front";
> function_enter(kWho);
>
> - if (log_file_name != NULL)
> - {
> - new_front = trx_front_;
> -
> - while (new_front)
> - {
> - if (compare(new_front, log_file_name, log_file_pos) > 0)
> - break;
> - new_front = new_front->next_;
> - }
> - }
> - else
> - {
> - /* If log_file_name is NULL, clear everything. */
> - new_front = NULL;
> - }
> -
> if (new_front == NULL)
> {
> /* No active transaction nodes after the call. */
> @@ -257,7 +266,6 @@ int ActiveTranx::clear_active_tranx_nodes(const char *log_file_name,
> trx_front_ = NULL;
> trx_rear_ = NULL;
> }
> -
> if (trace_level_ & kTraceDetail)
> sql_print_information("%s: cleared all nodes", kWho);
> }
> @@ -291,14 +299,40 @@ int ActiveTranx::clear_active_tranx_nodes(const char *log_file_name,
>
> trx_front_ = new_front;
> allocator_.free_nodes_before(trx_front_);
> -
> if (trace_level_ & kTraceDetail)
> sql_print_information("%s: cleared %d nodes back until pos (%s, %lu)",
> kWho, n_frees,
> trx_front_->log_name_, (unsigned long)trx_front_->log_pos_);
> }
> + function_exit(kWho, 0);
> +}
>
> - return function_exit(kWho, 0);
> +bool ActiveTranx::prune_active_tranx_nodes(
> + LogPosPtr pos,
> + ulonglong *oldest_tranx_commit_time_us)
> +{
> + TranxNode *old_front = trx_front_;
> + TranxNode *new_front;
> +
> + new_front = trx_front_;
> + while (new_front)
> + {
> + if (compare(new_front, pos.file_name, pos.file_pos) > 0)
> + break;
> + new_front = new_front->next_;
> + }
> +
> + set_new_front(new_front);
> +
> + if (oldest_tranx_commit_time_us)
> + {
> + if (trx_front_ == NULL)
> + *oldest_tranx_commit_time_us = 0;
> + else
> + *oldest_tranx_commit_time_us = trx_front_->tranx_commit_time_us;
> + }
> +
> + return ! (old_front == trx_front_);
> }
>
>
> @@ -334,7 +368,8 @@ ReplSemiSyncMaster::ReplSemiSyncMaster()
> wait_file_pos_(0),
> master_enabled_(false),
> wait_timeout_(0L),
> - state_(0)
> + state_(0),
> + oldest_unapplied_tranx_commit_time_us_(0)
> {
> strcpy(reply_file_name_, "");
> strcpy(wait_file_name_, "");
> @@ -362,11 +397,19 @@ int ReplSemiSyncMaster::initObject()
> mysql_cond_init(key_ss_cond_COND_binlog_send_,
> &COND_binlog_send_, NULL);
>
> + /* Mutex initialization can only be done after MY_INIT(). */
> + mysql_mutex_init(key_ss_mutex_LOCK_slave_lag_,
> + &LOCK_slave_lag_, MY_MUTEX_INIT_FAST);
> + mysql_cond_init(key_ss_cond_COND_slave_lag_,
> + &COND_slave_lag_, NULL);
> +
> if (rpl_semi_sync_master_enabled)
> result = enableMaster();
> else
> result = disableMaster();
>
> + thd_key_create(&thd_key);
> +
> return result;
> }
>
> @@ -437,6 +480,8 @@ void ReplSemiSyncMaster::cleanup()
> {
> mysql_mutex_destroy(&LOCK_binlog_);
> mysql_cond_destroy(&COND_binlog_send_);
> + mysql_mutex_destroy(&LOCK_slave_lag_);
> + mysql_cond_destroy(&COND_slave_lag_);
> init_done_= 0;
> }
>
> @@ -473,7 +518,34 @@ void ReplSemiSyncMaster::add_slave()
> {
> lock();
> rpl_semi_sync_master_clients++;
> + if (has_semi_sync_slave_lag())
> + rpl_semi_sync_master_slave_lag_clients++;
> unlock();
> +
> + if (has_semi_sync_slave_lag())
> + {
> + int null_val = 0;
> + longlong new_val =
> + rpl_semi_sync_master_slave_lag_heartbeat_frequency_us * 1000;
> + longlong old_val = new_val + 1;
> +
> + get_user_var_int("master_heartbeat_period", &old_val, &null_val);
> + if (old_val > new_val || null_val)
> + {
> + /* if there no old value or it's bigger than what we want */
> + int res = set_user_var_int("master_heartbeat_period",new_val, &old_val);
> + if (res == -1)
> + {
> + sql_print_error(
> + "Repl_semi_sync::failed to set master_heartbeat_period");
> + }
> + }
> + }
> +
> + /**
> + * create per slave-state and store it in thread-local-storage */
> + ReplSemiSyncMasterPerSlaveState *state = new ReplSemiSyncMasterPerSlaveState;
> + thd_setspecific(current_thd, thd_key, state);
> }
>
> void ReplSemiSyncMaster::remove_slave()
> @@ -492,7 +564,31 @@ void ReplSemiSyncMaster::remove_slave()
> rpl_semi_sync_master_clients == 0)
> switch_off();
> }
> +
> + bool no_slave_lag_clients = false;
> + if (has_semi_sync_slave_lag())
> + {
> + if (--rpl_semi_sync_master_slave_lag_clients == 0)
> + {
> + no_slave_lag_clients = true;
> + }
> + }
> +
> unlock();
> +
> + ReplSemiSyncMasterPerSlaveState *state =
> + (ReplSemiSyncMasterPerSlaveState*)thd_getspecific(current_thd, thd_key);
> + thd_setspecific(current_thd, thd_key, NULL);
> +
> + if (state != NULL)
> + {
> + delete state;
> + }
> +
> + if (no_slave_lag_clients)
> + {
> + wake_slave_lag_waiters(0);
> + }
> }
>
> bool ReplSemiSyncMaster::is_semi_sync_slave()
> @@ -503,14 +599,115 @@ bool ReplSemiSyncMaster::is_semi_sync_slave()
> return val;
> }
>
> +bool ReplSemiSyncMaster::has_semi_sync_slave_lag()
> +{
> + int null_value;
> + long long val= 0;
> + get_user_var_int(kRplSemiSyncSlaveReportExec, &val, &null_value);
> + return val;
> +}
> +
> +int ReplSemiSyncMaster::checkSyncReq(const LogPosPtr *log_pos)
> +{
> + if (log_pos == NULL)
> + {
> + /* heartbeat events does not have logpos (since they are not actually
> + * stored in the binlog).
> + */
> + if (!has_semi_sync_slave_lag())
> + {
> + /* don't semi-sync them if we haven't enabled slave-lag handling */
> + return 0;
> + }
> + else
> + {
> + /* else ask for both IO and exec position */
> + return 2;
> + }
> + }
> +
> + /**
> + * check if this log-pos is a candidate for semi-syncing event
> + */
> + TranxNode *entry = active_tranxs_->lookup_tranx_end_pos(log_pos->file_name,
> + log_pos->file_pos);
> +
> + if (entry == NULL)
> + return 0;
> +
> + ReplSemiSyncMasterPerSlaveState *state =
> + (ReplSemiSyncMasterPerSlaveState*)thd_getspecific(current_thd,
> + thd_key);
> + do
> + {
> + state->unacked_event_count_++;
> +
> + if (active_tranxs_->is_rear(entry))
> + {
> + /* always ask for ack on last event in tranx list */
> + break;
> + }
> +
> + if (state->unacked_event_count_ >=
> + rpl_semi_sync_master_max_unacked_event_count)
> + {
> + /* enough events passed that it's time for another ack */
> + break;
> + }
> +
> + if (!state->sync_req_pos_.IsInited())
> + {
> + /* first event => time for ack */
> + break;
> + }
> +
> + if (strcmp(log_pos->file_name, state->sync_req_pos_.file_name) != 0)
> + {
> + /* new file => time for ack */
> + break;
> + }
> +
> + if (log_pos->file_pos >= (state->sync_req_pos_.file_pos +
> + rpl_semi_sync_master_max_unacked_event_bytes))
> + {
> + /* enough bytes => time for ack */
> + break;
> + }
> +
> + /* we skip asking for semi-sync ack on this event */
> + return 0;
> +
> + } while (0);
> +
> + /* keep track on when we last asked for semi-sync-ack */
> + state->unacked_event_count_ = 0;
> + state->sync_req_pos_.Assign(log_pos);
> +
> + /**
> + * check if this slave can report back exec position
> + */
> + if (!has_semi_sync_slave_lag())
> + {
> + /* slave can't report back SQL position */
> + return 1;
> + }
> +
> + /* ask for both IO and SQL position */
> + return 2;
> +}
> +
> int ReplSemiSyncMaster::reportReplyBinlog(uint32 server_id,
> const char *log_file_name,
> - my_off_t log_file_pos)
> + my_off_t log_file_pos,
> + const LogPos *exec_pos)
> {
> const char *kWho = "ReplSemiSyncMaster::reportReplyBinlog";
> int cmp;
> bool can_release_threads = false;
> bool need_copy_send_pos = true;
> + bool pruned_trx_list = false;
> + ulonglong oldest_tranx_commit_time_us = 0;
> +
>
> if (!(getMasterEnabled()))
> return 0;
> @@ -559,15 +756,29 @@ int ReplSemiSyncMaster::reportReplyBinlog(uint32 server_id,
> reply_file_pos_ = log_file_pos;
> reply_file_name_inited_ = true;
>
> - /* Remove all active transaction nodes before this point. */
> - assert(active_tranxs_ != NULL);
> - active_tranxs_->clear_active_tranx_nodes(log_file_name, log_file_pos);
> -
> if (trace_level_ & kTraceDetail)
> sql_print_information("%s: Got reply at (%s, %lu)", kWho,
> log_file_name, (unsigned long)log_file_pos);
> }
>
> + assert(active_tranxs_ != NULL);
> + if (exec_pos != NULL)
> + {
> + /* prune using exec_pos */
> + LogPosPtr ptr(*exec_pos);
> + pruned_trx_list = active_tranxs_->prune_active_tranx_nodes(
> + ptr, &oldest_tranx_commit_time_us);
> + }
> + else if (rpl_semi_sync_master_slave_lag_clients == 0 && need_copy_send_pos)
> + {
> + /**
> + * if we don't have any slaves that can do exec_pos reporting,
> + * prune by IO position as "plain old semi sync"
> + */
> + LogPosPtr ptr(log_file_name, log_file_pos);
> + active_tranxs_->prune_active_tranx_nodes(ptr, NULL);
> + }
> +
> if (rpl_semi_sync_master_wait_sessions > 0)
> {
> /* Let us check if some of the waiting threads doing a trx
> @@ -596,6 +807,15 @@ int ReplSemiSyncMaster::reportReplyBinlog(uint32 server_id,
> cond_broadcast();
> }
>
> + if (pruned_trx_list)
> + {
> + /**
> + * if we did prune trx list, it might be that we should wake up
> + * threads waiting for slave-lag to decrease
> + */
> + wake_slave_lag_waiters(oldest_tranx_commit_time_us);
> + }
> +
> return function_exit(kWho, 0);
> }
>
> @@ -743,14 +963,6 @@ int ReplSemiSyncMaster::commitTrx(const char* trx_wait_binlog_name,
> }
> }
>
> - /*
> - At this point, the binlog file and position of this transaction
> - must have been removed from ActiveTranx.
> - */
> - assert(thd_killed(NULL) ||
> - !active_tranxs_->is_tranx_end_pos(trx_wait_binlog_name,
> - trx_wait_binlog_pos));
> -
> l_end:
> /* Update the status counter. */
> if (is_on())
> @@ -794,7 +1006,7 @@ int ReplSemiSyncMaster::switch_off()
>
> /* Clear the active transaction list. */
> assert(active_tranxs_ != NULL);
> - result = active_tranxs_->clear_active_tranx_nodes(NULL, 0);
> + result = active_tranxs_->clear_active_tranx_nodes();
>
> rpl_semi_sync_master_off_times++;
> wait_file_name_inited_ = false;
> @@ -884,7 +1096,7 @@ int ReplSemiSyncMaster::updateSyncHeader(unsigned char *packet,
> {
> const char *kWho = "ReplSemiSyncMaster::updateSyncHeader";
> int cmp = 0;
> - bool sync = false;
> + int sync = 0;
>
> /* If the semi-sync master is not enabled, or the slave is not a semi-sync
> * target, do not request replies from the slave.
> @@ -905,6 +1117,13 @@ int ReplSemiSyncMaster::updateSyncHeader(unsigned char *packet,
> /* semi-sync is ON */
> /* sync= false; No sync unless a transaction is involved. */
>
> + if (log_file_name == NULL)
> + {
> + /* this is heartbeat, request io_pos and exec_pos */
> + sync = checkSyncReq(0);
> + goto l_end;
> + }
> +
> if (reply_file_name_inited_)
> {
> cmp = ActiveTranx::compare(log_file_name, log_file_pos,
> @@ -933,12 +1152,12 @@ int ReplSemiSyncMaster::updateSyncHeader(unsigned char *packet,
> */
> if (cmp >= 0)
> {
> - /*
> + /*
> * We only wait if the event is a transaction's ending event.
> */
> assert(active_tranxs_ != NULL);
> - sync = active_tranxs_->is_tranx_end_pos(log_file_name,
> - log_file_pos);
> + LogPosPtr pos(log_file_name, log_file_pos);
> + sync = checkSyncReq(&pos);
> }
> }
> else
> @@ -951,7 +1170,7 @@ int ReplSemiSyncMaster::updateSyncHeader(unsigned char *packet,
> }
> else
> {
> - sync = true;
> + sync = 1;
> }
> }
>
> @@ -966,10 +1185,14 @@ int ReplSemiSyncMaster::updateSyncHeader(unsigned char *packet,
> /* We do not need to clear sync flag because we set it to 0 when we
> * reserve the packet header.
> */
> - if (sync)
> + if (sync == 1)
> {
> (packet)[2] = kPacketFlagSync;
> }
> + else if (sync == 2)
> + {
> + (packet)[2] = kPacketFlagSyncAndReport;
> + }
>
> return function_exit(kWho, 0);
> }
> @@ -1018,7 +1241,8 @@ int ReplSemiSyncMaster::writeTranxInBinlog(const char* log_file_name,
> if (is_on())
> {
> assert(active_tranxs_ != NULL);
> - if(active_tranxs_->insert_tranx_node(log_file_name, log_file_pos))
> + bool empty = active_tranxs_->is_empty();
> + if (active_tranxs_->insert_tranx_node(log_file_name, log_file_pos))
> {
> /*
> if insert tranx_node failed, print a warning message
> @@ -1028,6 +1252,14 @@ int ReplSemiSyncMaster::writeTranxInBinlog(const char* log_file_name,
> log_file_name, (ulong)log_file_pos);
> switch_off();
> }
> + else if (empty && rpl_semi_sync_master_slave_lag_clients > 0)
> + {
> + /* if the list of transactions was empty,
> + * we need to init the oldest_tranx_commit_time_us
> + */
> + oldest_unapplied_tranx_commit_time_us_ =
> + active_tranxs_->get_oldest_tranx_commit_time_us();
> + }
> }
>
> l_end:
> @@ -1037,10 +1269,10 @@ int ReplSemiSyncMaster::writeTranxInBinlog(const char* log_file_name,
> }
>
> int ReplSemiSyncMaster::readSlaveReply(NET *net, uint32 server_id,
> - const char *event_buf)
> + const char *event_buf_)
> {
> const char *kWho = "ReplSemiSyncMaster::readSlaveReply";
> - const unsigned char *packet;
> + const unsigned char *packet, *packet_start;
> char log_file_name[FN_REFLEN];
> my_off_t log_file_pos;
> ulong log_file_len = 0;
> @@ -1048,12 +1280,15 @@ int ReplSemiSyncMaster::readSlaveReply(NET *net, uint32 server_id,
> int result = -1;
> struct timespec start_ts;
> ulong trc_level = trace_level_;
> + const unsigned char *event_buf = (const unsigned char*)event_buf_;
> + bool exec_pos_present = false; // is SQL exec pos present in reply
> + LogPos exec_pos; // position of SQL thread
> LINT_INIT_STRUCT(start_ts);
>
> function_enter(kWho);
>
> - assert((unsigned char)event_buf[1] == kPacketMagicNum);
> - if ((unsigned char)event_buf[2] != kPacketFlagSync)
> + assert(event_buf[1] == kPacketMagicNum);
> + if ((event_buf[2] & (kPacketFlagSync | kPacketFlagSyncAndReport)) == 0)
> {
> /* current event does not require reply */
> result = 0;
> @@ -1111,28 +1346,60 @@ int ReplSemiSyncMaster::readSlaveReply(NET *net, uint32 server_id,
> goto l_end;
> }
>
> - packet = net->read_pos;
> + packet_start = packet = net->read_pos;
> if (packet[REPLY_MAGIC_NUM_OFFSET] != ReplSemiSyncMaster::kPacketMagicNum)
> {
> sql_print_error("Read semi-sync reply magic number error");
> goto l_end;
> }
>
> + /* we determine if this semisync ack contains a sql-thread exec-pos
> + * by checking if last byte == 0, since the packet then contains
> + * \0-terminated filenames */
> + exec_pos_present = packet[packet_len - 1] == 0;
> +
> log_file_pos = uint8korr(packet + REPLY_BINLOG_POS_OFFSET);
> - log_file_len = packet_len - REPLY_BINLOG_NAME_OFFSET;
> + if (exec_pos_present == false)
> + {
> + log_file_len = packet_len - REPLY_BINLOG_NAME_OFFSET;
> + }
> + else
> + {
> + log_file_len = strnlen((char*)packet + REPLY_BINLOG_NAME_OFFSET,
> + MY_MIN((ulong)FN_REFLEN,
> + packet_len - REPLY_BINLOG_NAME_OFFSET));
> + }
> if (log_file_len >= FN_REFLEN)
> {
> sql_print_error("Read semi-sync reply binlog file length too large");
> goto l_end;
> }
> - strncpy(log_file_name, (const char*)packet + REPLY_BINLOG_NAME_OFFSET, log_file_len);
> + packet+= REPLY_BINLOG_NAME_OFFSET;
> +
> + strncpy(log_file_name, (const char*)packet, log_file_len);
> log_file_name[log_file_len] = 0;
>
> + if (exec_pos_present)
> + {
> + packet += log_file_len + 1;
> + if (packet + 8 + 1 >= (packet_start + packet_len))
> + {
> + sql_print_error("Read semi-sync reply binlog. "
> + "Packet to short to contain exec-position!");
> + goto l_end;
> + }
> + exec_pos.file_pos = uint8korr(packet);
> + packet += 8;
> + strncpy(exec_pos.file_name, (char*)packet,
> + (packet_start + packet_len) - packet);
> + }
> +
> if (trc_level & kTraceDetail)
> sql_print_information("%s: Got reply (%s, %lu)",
> kWho, log_file_name, (ulong)log_file_pos);
>
> - result = reportReplyBinlog(server_id, log_file_name, log_file_pos);
> + result = reportReplyBinlog(server_id, log_file_name, log_file_pos,
> + exec_pos_present ? &exec_pos : NULL);
>
> l_end:
> return function_exit(kWho, result);
> @@ -1154,6 +1421,16 @@ int ReplSemiSyncMaster::resetMaster()
> wait_file_name_inited_ = false;
> reply_file_name_inited_ = false;
> commit_file_name_inited_ = false;
> + if (active_tranxs_ != NULL)
> + {
> + /**
> + * make sure to empty transaction hash/list
> + * with slave-lag reporting this container does
> + * not have to be empty even if no transaction is
> + * currently running
> + */
> + active_tranxs_->clear_active_tranx_nodes();
> + }
>
> rpl_semi_sync_master_yes_transactions = 0;
> rpl_semi_sync_master_no_transactions = 0;
> @@ -1168,6 +1445,13 @@ int ReplSemiSyncMaster::resetMaster()
>
> unlock();
>
> + mysql_mutex_lock(&LOCK_slave_lag_);
> + rpl_semi_sync_master_slave_lag_wait_sessions = 0;
> + oldest_unapplied_tranx_commit_time_us_ = 0;
> + rpl_semi_sync_master_trx_slave_lag_wait_num = 0;
> + rpl_semi_sync_master_trx_slave_lag_wait_time = 0;
> + mysql_mutex_unlock(&LOCK_slave_lag_);
> +
> return function_exit(kWho, result);
> }
>
> @@ -1186,6 +1470,29 @@ void ReplSemiSyncMaster::setExportStats()
> ((double)rpl_semi_sync_master_net_wait_num)) : 0);
>
> unlock();
> +
> + if (oldest_unapplied_tranx_commit_time_us_ != 0)
> + {
> + rpl_semi_sync_master_estimated_slave_lag = my_hrtime().val -
> + oldest_unapplied_tranx_commit_time_us_;
> + }
> + else
> + {
> + rpl_semi_sync_master_estimated_slave_lag = 0;
> + }
> +
> + mysql_mutex_lock(&LOCK_slave_lag_);
> + if (rpl_semi_sync_master_trx_slave_lag_wait_num)
> + {
> + rpl_semi_sync_master_avg_trx_slave_lag_wait_time =
> + (unsigned long)((double)rpl_semi_sync_master_trx_slave_lag_wait_time /
> + (double)rpl_semi_sync_master_trx_slave_lag_wait_num);
> + }
> + else
> + {
> + rpl_semi_sync_master_avg_trx_slave_lag_wait_time = 0;
> + }
> + mysql_mutex_unlock(&LOCK_slave_lag_);
> }
>
> /* Get the waiting time given the wait's staring time.
> @@ -1213,3 +1520,117 @@ static int getWaitTime(const struct timespec& start_ts)
>
> return (int)(end_usecs - start_usecs);
> }
> +
> +void ReplSemiSyncMaster::wake_slave_lag_waiters(
> + ulonglong oldest_unapplied_tranx_commit_time_us)
> +{
> + /* Publish the new commit time of the oldest transaction not yet applied
> + * on any slave and, if any user thread is currently blocked in
> + * wait_slave_lag(), wake all of them so they re-evaluate the lag.
> + * Both steps are done while holding LOCK_slave_lag_. */
> + mysql_mutex_lock(&LOCK_slave_lag_);
> +
> + oldest_unapplied_tranx_commit_time_us_ = oldest_unapplied_tranx_commit_time_us;
> +
> + const bool have_waiters = rpl_semi_sync_master_slave_lag_wait_sessions > 0;
> + if (have_waiters)
> + mysql_cond_broadcast(&COND_slave_lag_);
> +
> + mysql_mutex_unlock(&LOCK_slave_lag_);
> +}
> +
> +/*
> + * Lag, in microseconds, of a transaction committed at commit_time_us as
> + * seen from the reference time now_us. A commit at or after now_us counts
> + * as no lag; this also keeps the unsigned subtraction from wrapping around.
> + */
> +static inline ulonglong semisync_lag_us(ulonglong now_us,
> + ulonglong commit_time_us)
> +{
> + return commit_time_us < now_us ? now_us - commit_time_us : 0;
> +}
> +
> +/*
> + * Throttle a committing user thread while the estimated slave lag is too
> + * large.
> + *
> + * The lag is measured from the moment this function is entered back to
> + * the commit time of the oldest transaction not yet applied on any slave
> + * (oldest_unapplied_tranx_commit_time_us_, published by
> + * wake_slave_lag_waiters()). The thread waits on COND_slave_lag_ until
> + * that lag is at most rpl_semi_sync_master_max_slave_lag seconds, until
> + * timeout_sec seconds have passed, until semi-sync is disabled or switched
> + * off, or until the thread is killed.
> + *
> + * Returns 0 on success (including "no wait needed" and "semi-sync went off
> + * while waiting"). Returns 1 if the thread was killed, or if the deadline
> + * passed while the lag was still above the limit; in both error cases an
> + * ER_ERROR_DURING_COMMIT error is raised with my_printf_error().
> + */
> +int ReplSemiSyncMaster::wait_slave_lag(ulong timeout_sec)
> +{
> + int error = 0;
> + PSI_stage_info old_stage;
> +
> + /* slave lag waiting not enabled, return directly */
> + if (rpl_semi_sync_master_max_slave_lag == 0)
> + return 0;
> +
> + /* there is no slave that can report slave lag, return directly */
> + if (rpl_semi_sync_master_slave_lag_clients == 0)
> + return 0;
> +
> + /* compute start_time and end_time (absolute deadline) */
> + struct timespec end_time;
> + set_timespec(end_time, 0);
> + ulonglong start_time_us = timespec_to_usec(&end_time);
> + end_time.tv_sec += timeout_sec;
> +
> + mysql_mutex_lock(&LOCK_slave_lag_);
> +
> + if (oldest_unapplied_tranx_commit_time_us_ == 0)
> + {
> + /* no slave lag, at least one slave is up to date */
> + mysql_mutex_unlock(&LOCK_slave_lag_);
> + return 0;
> + }
> +
> + if (rpl_semi_sync_master_max_slave_lag == 0)
> + {
> + /* slave lag waiting not enabled */
> + mysql_mutex_unlock(&LOCK_slave_lag_);
> + return 0;
> + }
> +
> + /* This must be called after acquired the lock */
> + THD_ENTER_COND(NULL, &COND_slave_lag_, &LOCK_slave_lag_,
> + &stage_waiting_for_semi_sync_slave_lag,
> + &old_stage);
> +
> + bool waited = false;
> + /* Set only when the deadline passed while the lag was still too high.
> + * Leaving the loop for any other reason (lag resolved, semi-sync
> + * disabled/off, waiting disabled) must not be reported as a timeout. */
> + bool timed_out = false;
> + while (oldest_unapplied_tranx_commit_time_us_ != 0)
> + {
> + /* check kill_level after THD_ENTER_COND but *before* cond_wait
> + * to avoid missing kills */
> + if (! (getMasterEnabled() && is_on() &&
> + thd_kill_level(NULL) == THD_IS_NOT_KILLED))
> + break;
> +
> + /* Re-read the limit every round so a change made while we slept is
> + * honoured (0 disables waiting). Widen before multiplying so that a
> + * 32-bit ulong cannot overflow. */
> + ulonglong max_lag = (ulonglong) 1000000 * rpl_semi_sync_master_max_slave_lag;
> + if (max_lag == 0)
> + break;
> +
> + if (semisync_lag_us(start_time_us,
> + oldest_unapplied_tranx_commit_time_us_) <= max_lag)
> + break;
> +
> + waited = true;
> + rpl_semi_sync_master_slave_lag_wait_sessions++;
> + int wait_result = mysql_cond_timedwait(&COND_slave_lag_, &LOCK_slave_lag_,
> + &end_time);
> + rpl_semi_sync_master_slave_lag_wait_sessions--;
> +
> + if (wait_result != 0)
> + {
> + /* Deadline passed (or the wait failed). It is only a timeout if the
> + * lag is still above the limit: the slave may have caught up just as
> + * the deadline expired. */
> + timed_out = oldest_unapplied_tranx_commit_time_us_ != 0 &&
> + semisync_lag_us(start_time_us,
> + oldest_unapplied_tranx_commit_time_us_) > max_lag;
> + break;
> + }
> + /* Signalled (or spurious wakeup): the loop re-checks kill state and lag. */
> + }
> +
> + if (thd_kill_level(NULL) != THD_IS_NOT_KILLED)
> + {
> + /* Return error to client. */
> + error = 1;
> + my_printf_error(ER_ERROR_DURING_COMMIT,
> + "Killed while waiting for replication semi-sync slave-lag.",
> + MYF(0));
> + }
> + else if (timed_out)
> + {
> + /* Return error to client. */
> + error = 1;
> + my_printf_error(ER_ERROR_DURING_COMMIT,
> + "Slave-lag timeout",
> + MYF(0));
> + }
> +
> + if (waited)
> + {
> + rpl_semi_sync_master_trx_slave_lag_wait_num++;
> + rpl_semi_sync_master_trx_slave_lag_wait_time +=
> + (my_hrtime().val - start_time_us);
> + }
> +
> + /* The lock held will be released by thd_exit_cond, so no need to
> + call unlock() here */
> + THD_EXIT_COND(NULL, & old_stage);
> +
> + return error;
> +}
> diff --git a/plugin/semisync/semisync_master.h b/plugin/semisync/semisync_master.h
> index d9dc4ce..e5c0de6 100644
> --- a/plugin/semisync/semisync_master.h
> +++ b/plugin/semisync/semisync_master.h
> @@ -24,17 +24,101 @@
> #ifdef HAVE_PSI_INTERFACE
> extern PSI_mutex_key key_ss_mutex_LOCK_binlog_;
> extern PSI_cond_key key_ss_cond_COND_binlog_send_;
> +
> +extern PSI_mutex_key key_ss_mutex_LOCK_slave_lag_;
> +extern PSI_cond_key key_ss_cond_COND_slave_lag_;
> #endif
>
> extern PSI_stage_info stage_waiting_for_semi_sync_ack_from_slave;
> +extern PSI_stage_info stage_waiting_for_semi_sync_slave_lag;
>
> struct TranxNode {
> char log_name_[FN_REFLEN];
> - my_off_t log_pos_;
> + my_off_t log_pos_;
> + ulonglong tranx_commit_time_us;
> struct TranxNode *next_; /* the next node in the sorted list */
> struct TranxNode *hash_next_; /* the next node during hash collision */
> };
>
> +struct LogPos;
> +
> +/*
> + * Non-owning view of a binlog position (file name + offset).
> + * file_name points at storage owned elsewhere and is NULL when the
> + * position is not set.
> + */
> +struct LogPosPtr {
> + LogPosPtr() { Uninit();}
> + LogPosPtr(const char *name, my_off_t pos) : file_name(name), file_pos(pos){}
> + explicit LogPosPtr(const LogPos& pos) { Assign(&pos); }
> +
> + const char *file_name;
> + my_off_t file_pos;
> +
> + LogPosPtr& Assign(const LogPosPtr *src) {
> + file_name = src->file_name;
> + file_pos = src->file_pos;
> + return *this;
> + }
> +
> + /* Borrow the name stored inside *src (or become unset if *src is unset). */
> + LogPosPtr& Assign(const LogPos *src);
> +
> + /* Mark as unset; file_pos is zeroed so it is never read uninitialized. */
> + void Uninit() { file_name = NULL; file_pos = 0; }
> + bool IsInited() const { return file_name != NULL; }
> +};
> +
> +/*
> + * Owning binlog position: the file name is copied into a fixed
> + * FN_REFLEN-sized buffer. An empty file_name means "not set".
> + */
> +struct LogPos {
> + char file_name[FN_REFLEN];
> + my_off_t file_pos;
> +
> + LogPos() { Uninit(); }
> +
> + /* Return a view of this position. The view borrows file_name, so it
> + * must not outlive this object. */
> + LogPosPtr ToLogPosPtr() const {
> + if (IsInited()){
> + LogPosPtr p(file_name, file_pos);
> + return p;
> + } else {
> + LogPosPtr p;
> + return p;
> + }
> + }
> +
> + LogPos& Assign(const LogPosPtr *src) {
> + if (src->IsInited()) {
> + /* src may already refer to our own buffer (self-assignment); copying
> + * a string onto itself is undefined behaviour and a no-op anyway. */
> + if (src->file_name != file_name)
> + {
> + /* Bounded copy: the borrowed name is not guaranteed to fit in
> + * FN_REFLEN; strmake() truncates and always NUL-terminates. */
> + strmake(file_name, src->file_name, sizeof(file_name) - 1);
> + }
> + file_pos = src->file_pos;
> + } else {
> + Uninit();
> + }
> + return *this;
> + }
> +
> + LogPos& Assign(const LogPos* src) {
> + LogPosPtr p = src->ToLogPosPtr();
> + Assign(&p);
> + return *this;
> + }
> +
> + /* Mark as unset; file_pos is zeroed so it is never read uninitialized. */
> + void Uninit() { file_name[0] = 0; file_pos = 0; }
> + bool IsInited() const { return file_name[0] != 0; }
> +};
> +
> +inline LogPosPtr& LogPosPtr::Assign(const LogPos* src) {
> + LogPosPtr p = src->ToLogPosPtr();
> + Assign(&p);
> + return *this;
> +}
> +
> +/*
> + * Order two binlog positions by (file_name, file_pos).
> + * Returns <0, 0 or >0 in the manner of strcmp().
> + * Both positions must be set (non-NULL file_name): strcmp() on NULL is
> + * undefined behaviour.
> + */
> +inline int CompareLogPos(const LogPosPtr *pos1, const LogPosPtr *pos2) {
> + int cmp = strcmp(pos1->file_name, pos2->file_name);
> +
> + if (cmp != 0)
> + return cmp;
> +
> + if (pos1->file_pos > pos2->file_pos)
> + return 1;
> + else if (pos1->file_pos < pos2->file_pos)
> + return -1;
> + else
> + return 0;
> +}
> +
> /**
> @class TranxNodeAllocator
>
> @@ -329,10 +413,14 @@ class ActiveTranx
> node2->log_name_, node2->log_pos_);
> }
>
> + void set_new_front(TranxNode* new_front);
> +
> public:
> ActiveTranx(mysql_mutex_t *lock, unsigned long trace_level);
> ~ActiveTranx();
>
> + bool is_empty() const { return trx_front_ == NULL; }
> +
> /* Insert an active transaction node with the specified position.
> *
> * Return:
> @@ -340,21 +428,42 @@ class ActiveTranx
> */
> int insert_tranx_node(const char *log_file_name, my_off_t log_file_pos);
>
> - /* Clear the active transaction nodes until(inclusive) the specified
> - * position.
> - * If log_file_name is NULL, everything will be cleared: the sorted
> + /* Clear the active transaction
> + * Everything will be cleared: the sorted
> * list and the hash table will be reset to empty.
> - *
> + *
> * Return:
> - * 0: success; non-zero: error
> + * 0 success; non-zero: error
> + */
> + int clear_active_tranx_nodes();
> +
> + /* Prune the active transaction nodes until the specified
> + * position (inclusive).
> + *
> + * Return:
> + * true if any transaction was removed
> + * false if list was left unchanged
> */
> - int clear_active_tranx_nodes(const char *log_file_name,
> - my_off_t log_file_pos);
> + bool prune_active_tranx_nodes(LogPosPtr logpos,
> + ulonglong *oldest_tranx_commit_time_us);
>
> - /* Given a position, check to see whether the position is an active
> - * transaction's ending position by probing the hash table.
> + /* Lookup a transaction's ending position by probing the hash table.
> + *
> + * return entry if found or NULL otherwise
> */
> - bool is_tranx_end_pos(const char *log_file_name, my_off_t log_file_pos);
> + TranxNode* lookup_tranx_end_pos(const char *log_file_name,
> + my_off_t log_file_pos);
> +
> +
> + /* Check if an entry is rear (i.e last) */
> + bool is_rear(TranxNode* entry) const { return entry == trx_rear_; }
> +
> + /**
> + * return timestamp of oldest transaction in list
> + */
> + ulonglong get_oldest_tranx_commit_time_us() const {
> + return trx_front_->tranx_commit_time_us;
> + }
>
> /* Given two binlog positions, compare which one is bigger based on
> * (file_name, file_position).
> @@ -365,6 +474,24 @@ class ActiveTranx
> };
>
> /**
> + * State that semisync master keeps per slave
> + */
> +struct ReplSemiSyncMasterPerSlaveState
> +{
> + /* How many events sent to this slave have not been semi-sync acked yet. */
> + unsigned unacked_event_count_;
> +
> + /* Binlog position of the most recent event that was semi-synced
> + * (presumably the last one a reply was requested for). */
> + LogPos sync_req_pos_;
> +
> + ReplSemiSyncMasterPerSlaveState()
> + : unacked_event_count_(0), sync_req_pos_()
> + {}
> +};
> +
> +/**
> The extension class for the master of semi-synchronous replication
> */
> class ReplSemiSyncMaster
> @@ -432,6 +559,16 @@ class ReplSemiSyncMaster
>
> bool state_; /* whether semi-sync is switched */
>
> + /* This cond variable is signaled when slave lag has decreased */
> + mysql_cond_t COND_slave_lag_;
> +
> + /* Mutex that protects oldest_unapplied_tranx_commit_time_us */
> + mysql_mutex_t LOCK_slave_lag_;
> +
> + /* this is commit time of oldest transaction that has not been applied
> + * on any slave */
> + ulonglong oldest_unapplied_tranx_commit_time_us_;
> +
> void lock();
> void unlock();
> void cond_broadcast();
> @@ -493,6 +630,9 @@ class ReplSemiSyncMaster
> /* Is the slave servered by the thread requested semi-sync */
> bool is_semi_sync_slave();
>
> + /* Does this slave have slave lag reporting capabilities */
> + bool has_semi_sync_slave_lag();
> +
> /* In semi-sync replication, reports up to which binlog position we have
> * received replies from the slave indicating that it already get the events.
> *
> @@ -501,13 +641,15 @@ class ReplSemiSyncMaster
> * log_file_name - (IN) binlog file name
> * end_offset - (IN) the offset in the binlog file up to which we have
> * the replies from the slave
> + * exec_position - (IN) position of SQL thread or NULL if not present
> *
> * Return:
> * 0: success; non-zero: error
> */
> int reportReplyBinlog(uint32 server_id,
> const char* log_file_name,
> - my_off_t end_offset);
> + my_off_t end_offset,
> + const LogPos *exec_position);
>
> /* Commit a transaction in the final step. This function is called from
> * InnoDB before returning from the low commit. If semi-sync is switch on,
> @@ -540,6 +682,16 @@ class ReplSemiSyncMaster
> */
> int reserveSyncHeader(unsigned char *header, unsigned long size);
>
> + /*
> + * check if an event should be semi synced and optionally
> + * if it should report back position of SQL thread on slave
> + *
> + * return 0 - no semi sync
> + * 1 - semi sync
> + * 2 - semi sync and report exec position
> + */
> + int checkSyncReq(const LogPosPtr *log_pos);
> +
> /* Update the sync bit in the packet header to indicate to the slave whether
> * the master will wait for the reply of the event. If semi-sync is switched
> * off and we detect that the slave is catching up, we switch semi-sync on.
> @@ -592,6 +744,21 @@ class ReplSemiSyncMaster
> * go off for that.
> */
> int resetMaster();
> +
> + /**
> + * wake potential slave-lag waiters
> + * called by binlog dump-thread(s)
> + */
> + void wake_slave_lag_waiters(ulonglong oldest_unapplied_tranx_commit_time_us);
> +
> + /**
> + * wait for slave lag to get below threshold
> + * called by user-thread(s)
> + *
> + * return 0 - success
> + * 1 - timeout
> + */
> + int wait_slave_lag(ulong max_wait_time_sec);
> };
>
> enum rpl_semi_sync_master_wait_point_t {
> @@ -621,6 +788,17 @@ extern unsigned long long rpl_semi_sync_master_trx_wait_num;
> extern unsigned long long rpl_semi_sync_master_net_wait_time;
> extern unsigned long long rpl_semi_sync_master_trx_wait_time;
>
> +extern unsigned long rpl_semi_sync_master_max_unacked_event_count;
> +extern unsigned long rpl_semi_sync_master_max_unacked_event_bytes;
> +extern unsigned long rpl_semi_sync_master_max_slave_lag;
> +extern unsigned long rpl_semi_sync_master_slave_lag_heartbeat_frequency_us;
> +extern unsigned long rpl_semi_sync_master_slave_lag_wait_sessions;
> +extern unsigned long long rpl_semi_sync_master_estimated_slave_lag;
> +
> +extern unsigned long rpl_semi_sync_master_avg_trx_slave_lag_wait_time;
> +extern unsigned long long rpl_semi_sync_master_trx_slave_lag_wait_num;
> +extern unsigned long long rpl_semi_sync_master_trx_slave_lag_wait_time;
> +
> /*
> This indicates whether we should keep waiting if no semi-sync slave
> is available.
> diff --git a/plugin/semisync/semisync_master_plugin.cc b/plugin/semisync/semisync_master_plugin.cc
> index 7bb0eea..3282158 100644
> --- a/plugin/semisync/semisync_master_plugin.cc
> +++ b/plugin/semisync/semisync_master_plugin.cc
> @@ -21,6 +21,11 @@
>
> static ReplSemiSyncMaster repl_semisync;
>
> +// forward declaration
> +static inline ulong get_slave_lag_wait_timeout(THD* thd);
> +
> +static char rpl_semi_sync_master_group_commit = 0;
> +
> C_MODE_START
>
> int repl_semi_report_binlog_update(Binlog_storage_param *param,
> @@ -31,6 +36,13 @@ int repl_semi_report_binlog_update(Binlog_storage_param *param,
>
> if (repl_semisync.getMasterEnabled())
> {
> + if (rpl_semi_sync_master_group_commit &&
> + ((flags & BINLOG_GROUP_COMMIT_TRAILER) == 0))
> + {
> + /** there are transactions more coming... */
> + return 0;
> + }
> +
> /*
> Let us store the binlog file name and the position, so that
> we know how long to wait for the binlog to the replicated to
> @@ -43,8 +55,11 @@ int repl_semi_report_binlog_update(Binlog_storage_param *param,
> return error;
> }
>
> -int repl_semi_request_commit(Trans_param *param)
> +int repl_semi_before_commit(Trans_param *param, int *error)
> {
> + /* Throttle this commit on slave lag (see wait_slave_lag()); its result,
> + * non-zero on kill or timeout, is handed back through *error. */
> + ulong lag_wait_timeout_sec = get_slave_lag_wait_timeout(current_thd);
> + *error = repl_semisync.wait_slave_lag(lag_wait_timeout_sec);
>
> return 0;
> }
>
> @@ -53,6 +68,14 @@ int repl_semi_report_binlog_sync(Binlog_storage_param *param,
> my_off_t log_pos, uint32 flags)
> {
> int error= 0;
> +
> + if (rpl_semi_sync_master_group_commit &&
> + ((flags & BINLOG_GROUP_COMMIT_TRAILER) == 0))
> + {
> + /** there are transactions more coming... */
> + return 0;
> + }
> +
> if (rpl_semi_sync_master_wait_point ==
> SEMI_SYNC_MASTER_WAIT_POINT_AFTER_BINLOG_SYNC)
> {
> @@ -100,7 +123,7 @@ int repl_semi_binlog_dump_start(Binlog_transmit_param *param,
> Let's assume this semi-sync slave has already received all
> binlog events before the filename and position it requests.
> */
> - repl_semisync.reportReplyBinlog(param->server_id, log_file, log_pos);
> + repl_semisync.reportReplyBinlog(param->server_id, log_file, log_pos, NULL);
> }
> sql_print_information("Start %s binlog_dump to slave (server_id: %d), pos(%s, %lu)",
> semi_sync_slave ? "semi-sync" : "asynchronous",
> @@ -242,15 +265,72 @@ static MYSQL_SYSVAR_ULONG(trace_level, rpl_semi_sync_master_trace_level,
> &fix_rpl_semi_sync_master_trace_level, // update
> 32, 0, ~0UL, 1);
>
> +/* Upper bound on replication events left without a semi-sync ack.
> + * NOTE(review): the code enforcing this limit is outside this patch excerpt
> + * (presumably compared with ReplSemiSyncMasterPerSlaveState's
> + * unacked_event_count_). The default is the variable's own initial value,
> + * not a literal. */
> +static MYSQL_SYSVAR_ULONG(max_unacked_event_count,
> + rpl_semi_sync_master_max_unacked_event_count,
> + PLUGIN_VAR_OPCMDARG,
> + "Maximum unacked replication events",
> + NULL, // check
> + NULL, // update
> + rpl_semi_sync_master_max_unacked_event_count, 0, ~0UL, 1);
> +
> +/* Upper bound on replication bytes left without a semi-sync ack.
> + * NOTE(review): enforcement is outside this patch excerpt; the default is
> + * the variable's own initial value, not a literal. */
> +static MYSQL_SYSVAR_ULONG(max_unacked_event_bytes,
> + rpl_semi_sync_master_max_unacked_event_bytes,
> + PLUGIN_VAR_OPCMDARG,
> + "Maximum unacked replication bytes",
> + NULL, // check
> + NULL, // update
> + rpl_semi_sync_master_max_unacked_event_bytes, 0, ~0UL, 1);
> +
> +/* Maximum lag, in seconds, of the fastest lag-reporting semi-sync slave
> + * that a committing rw-transaction tolerates. Above it, the commit waits
> + * in ReplSemiSyncMaster::wait_slave_lag() (called from the before_commit
> + * hook). 0 disables slave-lag throttling. */
> +static MYSQL_SYSVAR_ULONG(max_slave_lag, rpl_semi_sync_master_max_slave_lag,
> + PLUGIN_VAR_OPCMDARG,
> + "Maximum allowed lag of fastest semi-sync slave (in seconds), "
> + "checked before commit.",
> + NULL, // check
> + NULL, // update
> + rpl_semi_sync_master_max_slave_lag, 0, ~0UL, 1);
> +
> +/* Session variable: how long, in seconds, a rw-transaction may wait for the
> + * slave lag to drop below max_slave_lag. When the wait runs out, the commit
> + * fails with ER_ERROR_DURING_COMMIT ("Slave-lag timeout").
> + * Default 50, range 1 .. 1024*1024*1024. */
> +static MYSQL_THDVAR_ULONG(slave_lag_wait_timeout,
> + PLUGIN_VAR_RQCMDARG,
> + "Timeout in seconds a rw-transaction may wait for max slave lag before "
> + "being rolled back.",
> + NULL, NULL, 50, 1, 1024 * 1024 * 1024, 0);
> +
> +/* Despite the "frequency" in its name, this is an interval in microseconds
> + * (default 500000 = 500 ms, minimum 1) for the heartbeats used with
> + * slave-lag reporting. A heartbeat asks the slave to report back its
> + * IO-thread and SQL-thread (exec) positions, which keeps the lag estimate
> + * current. NOTE(review): the code that applies the interval is outside this
> + * patch excerpt. */
> +static MYSQL_SYSVAR_ULONG(
> + slave_lag_heartbeat_frequency_us,
> + rpl_semi_sync_master_slave_lag_heartbeat_frequency_us,
> + PLUGIN_VAR_RQCMDARG,
> + "Heartbeat frequency when slave-lag is enabled (in microseconds).",
> + NULL, // check
> + NULL, // update
> + 500000, /* 500 ms */
> + 1, ~0UL, 1);
> +
> +/* When ON, the binlog update and binlog sync hooks ignore every transaction
> + * except the last one of a binlog group commit (the one flagged
> + * BINLOG_GROUP_COMMIT_TRAILER). Their semi-sync handling then runs once per
> + * group instead of once per transaction. Default OFF. */
> +static MYSQL_SYSVAR_BOOL(group_commit, rpl_semi_sync_master_group_commit,
> + PLUGIN_VAR_OPCMDARG,
> + "Group commit for semi sync",
> + NULL, // check
> + NULL,
> + 0);
> +
> static SYS_VAR* semi_sync_master_system_vars[]= {
> MYSQL_SYSVAR(enabled),
> MYSQL_SYSVAR(wait_point),
> MYSQL_SYSVAR(timeout),
> MYSQL_SYSVAR(wait_no_slave),
> MYSQL_SYSVAR(trace_level),
> + MYSQL_SYSVAR(max_unacked_event_count),
> + MYSQL_SYSVAR(max_unacked_event_bytes),
> + MYSQL_SYSVAR(max_slave_lag),
> + MYSQL_SYSVAR(slave_lag_wait_timeout),
> + MYSQL_SYSVAR(slave_lag_heartbeat_frequency_us),
> + MYSQL_SYSVAR(group_commit),
> NULL,
> };
>
> +/* Session value of slave_lag_wait_timeout, in seconds, for the given THD. */
> +static inline ulong get_slave_lag_wait_timeout(THD* thd)
> +{
> + ulong timeout_sec = THDVAR(thd, slave_lag_wait_timeout);
> + return timeout_sec;
> +}
>
> static void fix_rpl_semi_sync_master_timeout(MYSQL_THD thd,
> SYS_VAR *var,
> @@ -297,6 +377,7 @@ Trans_observer trans_observer = {
>
> repl_semi_report_commit, // after_commit
> repl_semi_report_rollback, // after_rollback
> + repl_semi_before_commit, // before commit
> };
>
> Binlog_storage_observer storage_observer = {
> @@ -339,7 +420,11 @@ DEF_SHOW_FUNC(net_wait_time, SHOW_LONGLONG)
> DEF_SHOW_FUNC(net_wait_num, SHOW_LONGLONG)
> DEF_SHOW_FUNC(avg_net_wait_time, SHOW_LONG)
> DEF_SHOW_FUNC(avg_trx_wait_time, SHOW_LONG)
> -
> +DEF_SHOW_FUNC(slave_lag_wait_sessions, SHOW_LONG)
> +DEF_SHOW_FUNC(estimated_slave_lag, SHOW_LONGLONG)
> +DEF_SHOW_FUNC(trx_slave_lag_wait_time, SHOW_LONGLONG)
> +DEF_SHOW_FUNC(trx_slave_lag_wait_num, SHOW_LONGLONG)
> +DEF_SHOW_FUNC(avg_trx_slave_lag_wait_time, SHOW_LONG)
>
> /* plugin status variables */
> static SHOW_VAR semi_sync_master_status_vars[]= {
> @@ -385,32 +470,55 @@ static SHOW_VAR semi_sync_master_status_vars[]= {
> {"Rpl_semi_sync_master_net_avg_wait_time",
> (char*) &SHOW_FNAME(avg_net_wait_time),
> SHOW_SIMPLE_FUNC},
> + {"Rpl_semi_sync_master_slave_lag_wait_sessions",
> + (char*) &SHOW_FNAME(slave_lag_wait_sessions),
> + SHOW_SIMPLE_FUNC},
> + {"Rpl_semi_sync_master_estimated_slave_lag",
> + (char*) &SHOW_FNAME(estimated_slave_lag),
> + SHOW_SIMPLE_FUNC},
> + {"Rpl_semi_sync_master_tx_slave_lag_wait_time",
> + (char*) &SHOW_FNAME(trx_slave_lag_wait_time),
> + SHOW_SIMPLE_FUNC},
> + {"Rpl_semi_sync_master_tx_slave_lag_waits",
> + (char*) &SHOW_FNAME(trx_slave_lag_wait_num),
> + SHOW_SIMPLE_FUNC},
> + {"Rpl_semi_sync_master_tx_avg_slave_lag_wait_time",
> + (char*) &SHOW_FNAME(avg_trx_slave_lag_wait_time),
> + SHOW_SIMPLE_FUNC},
> {NULL, NULL, SHOW_LONG},
> };
>
> #ifdef HAVE_PSI_INTERFACE
> PSI_mutex_key key_ss_mutex_LOCK_binlog_;
> +PSI_mutex_key key_ss_mutex_LOCK_slave_lag_;
>
> static PSI_mutex_info all_semisync_mutexes[]=
> {
> - { &key_ss_mutex_LOCK_binlog_, "LOCK_binlog_", 0}
> + { &key_ss_mutex_LOCK_binlog_, "LOCK_binlog_", 0 },
> + { &key_ss_mutex_LOCK_slave_lag_, "LOCK_slave_lag_", 0 }
> };
>
> PSI_cond_key key_ss_cond_COND_binlog_send_;
> +PSI_cond_key key_ss_cond_COND_slave_lag_;
>
> static PSI_cond_info all_semisync_conds[]=
> {
> - { &key_ss_cond_COND_binlog_send_, "COND_binlog_send_", 0}
> + { &key_ss_cond_COND_binlog_send_, "COND_binlog_send_", 0 },
> + { &key_ss_cond_COND_slave_lag_, "COND_slave_lag_", 0 }
> };
> #endif /* HAVE_PSI_INTERFACE */
>
> PSI_stage_info stage_waiting_for_semi_sync_ack_from_slave=
> { 0, "Waiting for semi-sync ACK from slave", 0};
>
> +PSI_stage_info stage_waiting_for_semi_sync_slave_lag=
> +{ 0, "Waiting for semi-sync slave lag", 0};
> +
> #ifdef HAVE_PSI_INTERFACE
> PSI_stage_info *all_semisync_stages[]=
> {
> - & stage_waiting_for_semi_sync_ack_from_slave
> + & stage_waiting_for_semi_sync_ack_from_slave,
> + & stage_waiting_for_semi_sync_slave_lag
> };
>
> static void init_semisync_psi_keys(void)
> @@ -492,4 +600,3 @@ maria_declare_plugin(semisync_master)
> MariaDB_PLUGIN_MATURITY_GAMMA
> }
> maria_declare_plugin_end;
> -
> diff --git a/plugin/semisync/semisync_slave.cc b/plugin/semisync/semisync_slave.cc
> index 5f98472..839e0cc 100644
> --- a/plugin/semisync/semisync_slave.cc
> +++ b/plugin/semisync/semisync_slave.cc
> @@ -20,6 +20,7 @@
> char rpl_semi_sync_slave_enabled;
> char rpl_semi_sync_slave_status= 0;
> unsigned long rpl_semi_sync_slave_trace_level;
> +char rpl_semi_sync_slave_lag_enabled= 0;
>
> int ReplSemiSyncSlave::initObject()
> {
> @@ -42,7 +43,7 @@ int ReplSemiSyncSlave::initObject()
>
> int ReplSemiSyncSlave::slaveReadSyncHeader(const char *header,
> unsigned long total_len,
> - bool *need_reply,
> + unsigned char *need_reply,
> const char **payload,
> unsigned long *payload_len)
> {
> @@ -52,7 +53,7 @@ int ReplSemiSyncSlave::slaveReadSyncHeader(const char *header,
>
> if ((unsigned char)(header[0]) == kPacketMagicNum)
> {
> - *need_reply = (header[1] & kPacketFlagSync);
> + *need_reply = (header[1] & (kPacketFlagSync | kPacketFlagSyncAndReport));
> *payload_len = total_len - 2;
> *payload = header + 2;
>
> @@ -95,16 +96,20 @@ int ReplSemiSyncSlave::slaveStop(Binlog_relay_IO_param *param)
> return 0;
> }
>
> -int ReplSemiSyncSlave::slaveReply(MYSQL *mysql,
> - const char *binlog_filename,
> - my_off_t binlog_filepos)
> +int ReplSemiSyncSlave::slaveReply(unsigned char header_byte,
> + MYSQL *mysql,
> + const char *binlog_filename,
> + my_off_t binlog_filepos,
> + Master_info * mi)
> {
> const char *kWho = "ReplSemiSyncSlave::slaveReply";
> NET *net= &mysql->net;
> - uchar reply_buffer[REPLY_MAGIC_NUM_LEN
> - + REPLY_BINLOG_POS_LEN
> - + REPLY_BINLOG_NAME_LEN];
> + uchar reply_buffer[REPLY_MAGIC_NUM_LEN +
> + 2 * ( REPLY_BINLOG_POS_LEN +
> + REPLY_BINLOG_NAME_LEN +
> + /* '\0' */ 1) ];
> int reply_res, name_len = strlen(binlog_filename);
> + int msg_len = name_len + REPLY_BINLOG_NAME_OFFSET;
>
> function_enter(kWho);
>
> @@ -119,10 +124,29 @@ int ReplSemiSyncSlave::slaveReply(MYSQL *mysql,
> sql_print_information("%s: reply (%s, %lu)", kWho,
> binlog_filename, (ulong)binlog_filepos);
>
> + if (header_byte & kPacketFlagSyncAndReport)
> + {
> + /**
> + * master requests that we also report back SQL-thread position
> + */
> +
> + // where to store sql filename/position
> + char *bufptr = (char*)reply_buffer + msg_len;
> + bufptr[0] = 0; // '\0' terminate previous filename
> + bufptr++;
> +
> + my_off_t sql_file_pos;
> + // get file/position and store the filename directly info bufptr+8
> + size_t name_len2 = get_master_log_pos(mi, bufptr + 8, &sql_file_pos);
> + int8store(bufptr, sql_file_pos); // store position
> +
> + msg_len += /* '\0' */ 1 + /* position */ 8 + name_len2 + /* '\0' */ 1;
> + }
> +
> net_clear(net, 0);
> /* Send the reply. */
> - reply_res = my_net_write(net, reply_buffer,
> - name_len + REPLY_BINLOG_NAME_OFFSET);
> + reply_res = my_net_write(net, reply_buffer, msg_len);
> +
> if (!reply_res)
> {
> reply_res = net_flush(net);
> diff --git a/plugin/semisync/semisync_slave.h b/plugin/semisync/semisync_slave.h
> index 1bf8cf3..c91847d 100644
> --- a/plugin/semisync/semisync_slave.h
> +++ b/plugin/semisync/semisync_slave.h
> @@ -60,23 +60,30 @@ class ReplSemiSyncSlave
> * Return:
> * 0: success; non-zero: error
> */
> - int slaveReadSyncHeader(const char *header, unsigned long total_len, bool *need_reply,
> + int slaveReadSyncHeader(const char *header, unsigned long total_len,
> + unsigned char *need_reply_byte,
> const char **payload, unsigned long *payload_len);
>
> /* A slave replies to the master indicating its replication process. It
> * indicates that the slave has received all events before the specified
> * binlog position.
> - *
> + *
> * Input:
> + * need_reply_byte - (IN) the header byte
> * mysql - (IN) the mysql network connection
> * binlog_filename - (IN) the reply point's binlog file name
> * binlog_filepos - (IN) the reply point's binlog file offset
> + * master_info - (IN) the master info struct so that we can get more
> + * info if needed
> *
> * Return:
> * 0: success; non-zero: error
> */
> - int slaveReply(MYSQL *mysql, const char *binlog_filename,
> - my_off_t binlog_filepos);
> + int slaveReply(unsigned char need_reply_byte,
> + MYSQL *mysql,
> + const char *binlog_filename,
> + my_off_t binlog_filepos,
> + Master_info* master_info);
>
> int slaveStart(Binlog_relay_IO_param *param);
> int slaveStop(Binlog_relay_IO_param *param);
> @@ -93,5 +100,6 @@ class ReplSemiSyncSlave
> extern char rpl_semi_sync_slave_enabled;
> extern unsigned long rpl_semi_sync_slave_trace_level;
> extern char rpl_semi_sync_slave_status;
> +extern char rpl_semi_sync_slave_lag_enabled;
>
> #endif /* SEMISYNC_SLAVE_H */
> diff --git a/plugin/semisync/semisync_slave_plugin.cc b/plugin/semisync/semisync_slave_plugin.cc
> index 572ead2..0bf03be 100644
> --- a/plugin/semisync/semisync_slave_plugin.cc
> +++ b/plugin/semisync/semisync_slave_plugin.cc
> @@ -28,7 +28,7 @@ static ReplSemiSyncSlave repl_semisync;
> event read is the last event of a transaction. And the value is
> checked in repl_semi_slave_queue_event.
> */
> -bool semi_sync_need_reply= false;
> +unsigned char semi_sync_need_reply= 0;
>
> C_MODE_START
>
> @@ -81,6 +81,23 @@ int repl_semi_slave_request_dump(Binlog_relay_IO_param *param,
> return 1;
> }
> mysql_free_result(mysql_store_result(mysql));
> +
> + if (rpl_semi_sync_slave_lag_enabled)
> + {
> + char buf[100];
> + /*
> + Tell master that we can do exec-position reporting
> + */
> + snprintf(buf, sizeof(buf), "SET @%s= 1",
> + ReplSemiSyncBase::kRplSemiSyncSlaveReportExec);
> + if (mysql_real_query(mysql, buf, strlen(buf)))
> + {
> + sql_print_error("query: %s on master failed", buf);
> + return 1;
> + }
> + mysql_free_result(mysql_store_result(mysql));
> + }
> +
> rpl_semi_sync_slave_status= 1;
> return 0;
> }
> @@ -110,9 +127,11 @@ int repl_semi_slave_queue_event(Binlog_relay_IO_param *param,
> should not cause the slave IO thread to stop, and the error
> messages are already reported.
> */
> - (void) repl_semisync.slaveReply(param->mysql,
> + (void) repl_semisync.slaveReply(semi_sync_need_reply,
> + param->mysql,
> param->master_log_name,
> - param->master_log_pos);
> + param->master_log_pos,
> + param->mi);
> }
> return 0;
> }
> @@ -164,9 +183,17 @@ static MYSQL_SYSVAR_ULONG(trace_level, rpl_semi_sync_slave_trace_level,
> &fix_rpl_semi_sync_trace_level, // update
> 32, 0, ~0UL, 1);
>
> +static MYSQL_SYSVAR_BOOL(lag_enabled, rpl_semi_sync_slave_lag_enabled,
> + PLUGIN_VAR_OPCMDARG,
> + "Enable semi-synchronous replication slave lag reporting. ",
> + NULL, // check
> + NULL, // update
> + 0);
> +
> static SYS_VAR* semi_sync_slave_system_vars[]= {
> MYSQL_SYSVAR(enabled),
> MYSQL_SYSVAR(trace_level),
> + MYSQL_SYSVAR(lag_enabled),
> NULL,
> };
>
> @@ -230,4 +257,3 @@ maria_declare_plugin(semisync_slave)
> MariaDB_PLUGIN_MATURITY_GAMMA
> }
> maria_declare_plugin_end;
> -
> diff --git a/sql/handler.cc b/sql/handler.cc
> index 3ca9ec3..3e6cd65 100644
> --- a/sql/handler.cc
> +++ b/sql/handler.cc
> @@ -1364,12 +1364,30 @@ int ha_commit_trans(THD *thd, bool all)
> uint rw_ha_count= ha_check_and_coalesce_trx_read_only(thd, ha_info, all);
> /* rw_trans is TRUE when we in a transaction changing data */
> bool rw_trans= is_real_trans && (rw_ha_count > 0);
> + bool mdl_request_initialized= false;
> MDL_request mdl_request;
> DBUG_PRINT("info", ("is_real_trans: %d rw_trans: %d rw_ha_count: %d",
> is_real_trans, rw_trans, rw_ha_count));
>
> if (rw_trans)
> {
> + /* check READ-ONLY just before before_commit hook to decrease likelihood
> + * of having threads hanging waiting for slave-lag only to be aborted
> + * due to read-only.
> + */
> + if (opt_readonly &&
> + !(thd->security_ctx->master_access & SUPER_ACL) &&
> + !thd->slave_thread)
> + {
> + my_error(ER_OPTION_PREVENTS_STATEMENT, MYF(0), "--read-only");
> + goto err;
> + }
> +
> + if (RUN_HOOK(transaction, before_commit, (thd)))
> + {
> + goto err;
> + }
> +
> /*
> Acquire a metadata lock which will ensure that COMMIT is blocked
> by an active FLUSH TABLES WITH READ LOCK (and vice versa:
> @@ -1378,6 +1396,7 @@ int ha_commit_trans(THD *thd, bool all)
> We allow the owner of FTWRL to COMMIT; we assume that it knows
> what it does.
> */
> + mdl_request_initialized= true;
> mdl_request.init(MDL_key::COMMIT, "", "", MDL_INTENTION_EXCLUSIVE,
> MDL_EXPLICIT);
>
> @@ -1486,7 +1505,7 @@ int ha_commit_trans(THD *thd, bool all)
> ha_rollback_trans(thd, all);
>
> end:
> - if (rw_trans && mdl_request.ticket)
> + if (rw_trans && mdl_request_initialized && mdl_request.ticket)
> {
> /*
> We do not always immediately release transactional locks
> diff --git a/sql/replication.h b/sql/replication.h
> index 4731c22..309bdb4 100644
> --- a/sql/replication.h
> +++ b/sql/replication.h
> @@ -110,6 +110,21 @@ typedef struct Trans_observer {
> @retval 1 Failure
> */
> int (*after_rollback)(Trans_param *param);
> +
> + /**
> + This callback is called before transaction commit
> + If the function sets *error to a non-zero value, the transaction will
> + not be committed and the error code will be returned to the client.
> +
> + @note *error!=0 and return code 0 shall be used by plugin to signal
> + that transaction should be aborted.
> + If returning non-zero transaction will also be aborted and an error
> + will be printed to error log.
> +
> + @retval 0 Success
> + @retval non-zero error
> + */
> + int (*before_commit)(Trans_param *param, int *error);
> } Trans_observer;
>
> /**
> @@ -294,6 +309,8 @@ enum Binlog_relay_IO_flags {
> };
>
>
> +class Master_info;
> +
> /**
> Replication binlog relay IO observer parameter
> */
> @@ -309,8 +326,20 @@ typedef struct Binlog_relay_IO_param {
> my_off_t master_log_pos;
>
> MYSQL *mysql; /* the connection to master */
> +
> + Master_info * mi; /* master info handle */
> } Binlog_relay_IO_param;
>
> +
> +/* get the master log given a Master_info
> + * and store it in filename_buf/filepos
> + * return length of filename (excluding \0)
> + *
> + * note: filename_buf should be a minimum FN_REFLEN
> + */
> +size_t get_master_log_pos(const Master_info *mi,
> + char *filename_buf, my_off_t *filepos);
> +
> /**
> Observes and extends the service of slave IO thread.
> */
> @@ -561,7 +590,21 @@ int get_user_var_str(const char *name,
> char *value, unsigned long len,
> unsigned int precision, int *null_value);
>
> -
> +
> +/**
> + Set or replace the value of a user variable as a long long integer
> +
> + @param name user variable name
> + @param value the value
> + @param old_value pointer to where old value will be stored (or NULL)
> +
> + @retval 0 Success, no prior value found
> + @retval 1 Success, old_value populated
> + @retval -1 Fail
> +*/
> +int set_user_var_int(const char *name,
> + long long int value,
> + long long int *old_value);
>
> #ifdef __cplusplus
> }
> diff --git a/sql/rpl_handler.cc b/sql/rpl_handler.cc
> index 3962600..209a809 100644
> --- a/sql/rpl_handler.cc
> +++ b/sql/rpl_handler.cc
> @@ -23,6 +23,7 @@
> #include "rpl_filter.h"
> #include <my_dir.h>
> #include "rpl_handler.h"
> +#include "sql_prepare.h"
>
> Trans_delegate *transaction_delegate;
> Binlog_storage_delegate *binlog_storage_delegate;
> @@ -88,6 +89,42 @@ int get_user_var_str(const char *name, char *value,
> return 0;
> }
>
> +int set_user_var_int(const char *name,
> + long long int value,
> + long long int *old_value)
> +{
> + THD* thd= current_thd;
> + bool null_val;
> + user_var_entry *entry=
> + (user_var_entry*) my_hash_search(&thd->user_vars,
> + (uchar*) name, strlen(name));
> + if (entry != NULL)
> + {
> + if (old_value != NULL)
> + *old_value= entry->val_int(&null_val);
> + }
> +
> + Ed_connection con(thd);
> +
> + char buf[256];
> + int res= snprintf(buf, sizeof(buf), "SET @%s=%lld", name, value);
> + if (/* error */ res < 0 ||
> + /* truncated */ res >= sizeof(buf))
> + {
> + return -1;
> + }
> +
> + LEX_STRING str;
> + lex_string_set(&str, buf);
> +
> + if (con.execute_direct(str))
> + {
> + return -1;
> + }
> +
> + return entry == NULL ? 0 : 1;
> +}
> +
> int delegates_init()
> {
> static my_aligned_storage<sizeof(Trans_delegate), MY_ALIGNOF(long)> trans_mem;
> @@ -249,6 +286,17 @@ int Trans_delegate::after_rollback(THD *thd, bool all)
> return ret;
> }
>
> +int Trans_delegate::before_commit(THD *thd)
> +{
> + int ret= 0, error= 0;
> + Trans_param param;
> + param.flags= 0;
> + param.log_file= 0;
> + param.log_pos= 0;
> + FOREACH_OBSERVER(ret, before_commit, thd, (&param, &error));
> + return error;
> +}
> +
> int Binlog_storage_delegate::after_flush(THD *thd,
> const char *log_file,
> my_off_t log_pos,
> @@ -374,17 +422,19 @@ int Binlog_transmit_delegate::reserve_header(THD *thd, ushort flags,
>
> int Binlog_transmit_delegate::before_send_event(THD *thd, ushort flags,
> String *packet,
> - const char *log_file,
> + const char *log_file_path,
> my_off_t log_pos)
> {
> Binlog_transmit_param param;
> param.flags= flags;
>
> int ret= 0;
> + const char* log_file_name= log_file_path != NULL ?
> + log_file_path + dirname_length(log_file_path) : NULL;
> FOREACH_OBSERVER(ret, before_send_event, false,
> (&param, (uchar *)packet->c_ptr(),
> packet->length(),
> - log_file+dirname_length(log_file), log_pos));
> + log_file_name, log_pos));
> return ret;
> }
>
> @@ -414,6 +464,7 @@ int Binlog_transmit_delegate::after_reset_master(THD *thd, ushort flags)
> void Binlog_relay_IO_delegate::init_param(Binlog_relay_IO_param *param,
> Master_info *mi)
> {
> + param->mi = mi;
> param->mysql= mi->mysql;
> param->user= mi->user;
> param->host= mi->host;
> @@ -540,6 +591,20 @@ int unregister_binlog_relay_io_observer(Binlog_relay_IO_observer *observer, void
> {
> return binlog_relay_io_delegate->remove_observer(observer, (st_plugin_int *)p);
> }
> +
> +/* get master log pos for a Master_info struct */
> +size_t get_master_log_pos(const Master_info* mi,
> + char *filename_buf, my_off_t *filepos)
> +{
> + mysql_mutex_t *mutex= &mi->rli.data_lock;
> +
> + mysql_mutex_lock(mutex);
> + *filepos= mi->rli.group_master_log_pos;
> + strncpy(filename_buf, mi->rli.group_master_log_name, FN_REFLEN);
> + mysql_mutex_unlock(mutex);
> + return strnlen(filename_buf, FN_REFLEN);
> +}
> +
> #else
> int register_binlog_transmit_observer(Binlog_transmit_observer *observer, void *p)
> {
> @@ -560,4 +625,13 @@ int unregister_binlog_relay_io_observer(Binlog_relay_IO_observer *observer, void
> {
> return 0;
> }
> +
> +size_t get_master_log_pos(const Master_info* mi,
> + char *filename_buf, my_off_t *filepos)
> +{
> + *filepos= 0;
> + filename_buf[0]= 0;
> + return 0;
> +}
> +
> #endif /* HAVE_REPLICATION */
> diff --git a/sql/rpl_handler.h b/sql/rpl_handler.h
> index afcfd9d..5119ee4 100644
> --- a/sql/rpl_handler.h
> +++ b/sql/rpl_handler.h
> @@ -142,7 +142,7 @@ class Trans_delegate
> :public Delegate {
> public:
> typedef Trans_observer Observer;
> - int before_commit(THD *thd, bool all);
> + int before_commit(THD *thd);
> int before_rollback(THD *thd, bool all);
> int after_commit(THD *thd, bool all);
> int after_rollback(THD *thd, bool all);
> diff --git a/sql/rpl_rli.h b/sql/rpl_rli.h
> index 56300c6..d60a122 100644
> --- a/sql/rpl_rli.h
> +++ b/sql/rpl_rli.h
> @@ -154,7 +154,9 @@ class Relay_log_info : public Slave_reporting_capability
> standard lock acquisition order to avoid deadlocks:
> run_lock, data_lock, relay_log.LOCK_log, relay_log.LOCK_index
> */
> - mysql_mutex_t data_lock, run_lock;
> + mutable mysql_mutex_t data_lock;
> + mysql_mutex_t run_lock;
> +
> /*
> start_cond is broadcast when SQL thread is started
> stop_cond - when stopped
> diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc
> index d9ae6ca..9662058 100644
> --- a/sql/sql_repl.cc
> +++ b/sql/sql_repl.cc
> @@ -818,6 +818,13 @@ static int send_heartbeat_event(binlog_send_info *info,
> packet->append(b, sizeof(b));
> }
>
> + if (RUN_HOOK(binlog_transmit, before_send_event,
> + (info->thd, info->flags, packet, 0, 0)))
> + {
> + info->error= ER_UNKNOWN_ERROR;
> + DBUG_RETURN(-1);
> + }
> +
> if (my_net_write(net, (uchar*) packet->ptr(), packet->length()) ||
> net_flush(net))
> {
> @@ -825,6 +832,13 @@ static int send_heartbeat_event(binlog_send_info *info,
> DBUG_RETURN(-1);
> }
>
> + if (RUN_HOOK(binlog_transmit, after_send_event,
> + (info->thd, info->flags, packet)))
> + {
> + info->error= ER_UNKNOWN_ERROR;
> + DBUG_RETURN(-1);
> + }
> +
> DBUG_RETURN(0);
> }
>