← Back to team overview

maria-developers team mailing list archive

Re: MDEV-8112: PATCH: no slave left behind

 

Hi Jonas,

Thanks for the patch!

I read through the patch. I am not that familiar with the semi-sync plugin
code, but it looks reasonable.

But the patch causes a test failure in sys_vars.all_vars. This is because
not all newly introduced variables have corresponding test cases in
mysql-test/suite/sys_vars/t/*_basic.test.

Also, do you have some documentation for the feature? (There is a bit in the
Jira entry, but, for example, the new variable
rpl_semi_sync_master_slave_lag_heartbeat_frequency_us does not seem to be
described at all.)

If you can fix the above two points (and if Buildbot does not point out more
test suite issues), then I will push it into the MariaDB tree.

Thanks,

 - Kristian.

> The patch has been ported to 10.1.
> You can use it under the new (also called "3-clause") BSD license.
> I have done internal benchmarks, but you are most welcome to run some too.
> 
> DESCRIPTION:
> no slave left behind
> 
> This patch implements master throttling based on slave lag,
> aka "no slave left behind". The core feature works as follows:
> 1) the semi-sync reply is amended to also report back the SQL-thread
> position (aka exec position);
> 2) transactions are not removed from the "active-transaction-list"
> in the semi-sync master plugin until at least one slave has reported
> that it has executed the transaction. The slave lag can then
> be estimated by calculating how long the oldest transaction has been
> lingering in the active-transaction-list;
> 3) client threads are forced to wait before commit until the slave lag
> has decreased to an acceptable value.
> 
> the following variables are introduced on master:
> 
>     rpl_semi_sync_master_max_slave_lag (global)
>     rpl_semi_sync_master_slave_lag_wait_timeout (session)
> 
> the following status variables are introduced on master:
> 
>     rpl_semi_sync_master_slave_lag_wait_sessions
>     rpl_semi_sync_master_estimated_slave_lag
>     rpl_semi_sync_master_trx_slave_lag_wait_time
>     rpl_semi_sync_master_trx_slave_lag_wait_num
>     rpl_semi_sync_master_avg_trx_slave_lag_wait_time
> 
> the following variables are introduced on slave:
> 
>     rpl_semi_sync_slave_lag_enabled (global)
> 
> In addition to this, two optimizations that decrease the overhead of
> semi-sync are introduced:
> 1) When the master is about to send a transaction to a slave, it checks
> whether the transaction should be semi-synced. Rather than semi-syncing
> every transaction (which is what is done currently), the code skips
> semi-syncing a transaction if newer transactions have already been
> committed. Since this could delay semi-syncing indefinitely, a cap is
> set using 2 new master variables:
> 
>     rpl_semi_sync_master_max_unacked_event_bytes (global)
>     rpl_semi_sync_master_max_unacked_event_count (global)
> 
> 2) rpl_semi_sync_master_group_commit, which makes the semi-sync
> plugin only semi-sync the last transaction in a group commit.


> diff --git a/mysql-test/suite/rpl/r/rpl_semi_sync_slave_lag.result b/mysql-test/suite/rpl/r/rpl_semi_sync_slave_lag.result
> new file mode 100644
> index 0000000..f3545a4
> --- /dev/null
> +++ b/mysql-test/suite/rpl/r/rpl_semi_sync_slave_lag.result
> @@ -0,0 +1,150 @@
> +include/master-slave.inc
> +[connection master]
> +set global rpl_semi_sync_master_enabled = 1;
> +set global rpl_semi_sync_master_max_slave_lag = 10;
> +show variables like 'rpl_semi_sync_master_%slave_lag%';
> +Variable_name	Value
> +rpl_semi_sync_master_max_slave_lag	10
> +rpl_semi_sync_master_slave_lag_heartbeat_frequency_us	500000
> +rpl_semi_sync_master_slave_lag_wait_timeout	50
> +include/stop_slave.inc
> +set global rpl_semi_sync_slave_enabled = 1;
> +set global rpl_semi_sync_slave_lag_enabled = 1;
> +include/start_slave.inc
> +# create non-root user for testing READ_ONLY
> +grant SELECT, INSERT on *.* to test@localhost;
> +CREATE TABLE t1 (i INT NOT NULL AUTO_INCREMENT PRIMARY KEY, f varchar(8))
> +ENGINE=innodb;
> +#
> +# Check basic behaviour
> +#
> +INSERT INTO t1 (f) VALUES ('1'),('2'),('3');
> +# Now wait for slave lag to decrease to 0
> +# [ on slave ]
> +STOP SLAVE SQL_THREAD;
> +# [ on master ]
> +INSERT INTO t1 (f) VALUES ('4'),('5'),('6');
> +# Now wait for slave lag to increase to > 0
> +# [ on slave ]
> +START SLAVE SQL_THREAD;
> +# [ on master ]
> +# Now wait for slave lag to decrease to 0
> +# [ on slave ]
> +STOP SLAVE SQL_THREAD;
> +# [ on master ]
> +set session rpl_semi_sync_master_slave_lag_wait_timeout = 5;
> +# First transaction should succeed. slave_lag is zero when it commits
> +INSERT INTO t1 (f) VALUES ('7'),('8'),('9');
> +# Now wait for slave lag to increase to > 10s
> +# Check that estimated_slave_lag is > 10s
> +SELECT VARIABLE_VALUE > 10000000 as should_be_1
> +FROM INFORMATION_SCHEMA.GLOBAL_STATUS
> +WHERE VARIABLE_NAME = 'rpl_semi_sync_master_estimated_slave_lag';
> +should_be_1
> +1
> +# Second transaction should now fail. slave_lag is >10s when it commits
> +INSERT INTO t1 (f) VALUES ('a'),('b'),('c');
> +ERROR HY000: Slave-lag timeout
> +# [ on slave ]
> +START SLAVE SQL_THREAD;
> +# [ on master ]
> +# Now wait for slave lag to decrease to < 10s
> +# And now it should succeed again
> +INSERT INTO t1 (f) VALUES ('d'),('e'),('f');
> +SELECT *
> +FROM t1
> +ORDER BY 1;
> +i	f
> +1	1
> +2	2
> +3	3
> +4	4
> +5	5
> +6	6
> +7	7
> +8	8
> +9	9
> +13	d
> +14	e
> +15	f
> +# [ on slave ]
> +SELECT *
> +FROM t1
> +ORDER BY 1;
> +i	f
> +1	1
> +2	2
> +3	3
> +4	4
> +5	5
> +6	6
> +7	7
> +8	8
> +9	9
> +13	d
> +14	e
> +15	f
> +#
> +# Test interaction with READ_ONLY
> +#
> +# [ on slave ]
> +STOP SLAVE SQL_THREAD;
> +# [ on master ]
> +INSERT INTO t1 (f) VALUES ('g'),('h'),('i');
> +# Now wait for slave lag to increase to > 10s
> +# [ on con1 ]
> +BEGIN;
> +INSERT INTO t1 (f) VALUES ('g'),('h'),('i');
> +# [ on master ]
> +set global read_only = 1;
> +# [ on con1 ]
> +set session rpl_semi_sync_master_slave_lag_wait_timeout = 5;
> +# read-only is check *before* slave lag
> +COMMIT;
> +ERROR HY000: The MariaDB server is running with the --read-only option so it cannot execute this statement
> +# [ on slave ]
> +START SLAVE SQL_THREAD;
> +# [ on master ]
> +# Now wait for slave lag to decrease to 0
> +set global read_only = 0;
> +#
> +# check slave_lag > 0 but less than rpl_semi_sync_master_max_slave_lag
> +#
> +# [ on slave ]
> +STOP SLAVE SQL_THREAD;
> +# [ on master ]
> +INSERT INTO t1 (f) VALUES ('j'),('k'),('l');
> +# Now wait for slave lag to increase to > 0
> +# Capture rpl_semi_sync_master_tx_slave_lag_waits before transaction
> +select @count_before := VARIABLE_VALUE
> +FROM INFORMATION_SCHEMA.GLOBAL_STATUS
> +WHERE VARIABLE_NAME = 'rpl_semi_sync_master_tx_slave_lag_waits';
> +# Set maximum allowed slave lag to 24h
> +set global rpl_semi_sync_master_max_slave_lag = 86400;
> +INSERT INTO t1 (f) VALUES ('m'),('n'),('o');
> +# Capture rpl_semi_sync_master_tx_slave_lag_waits after transaction
> +select @count_after := VARIABLE_VALUE
> +FROM INFORMATION_SCHEMA.GLOBAL_STATUS
> +WHERE VARIABLE_NAME = 'rpl_semi_sync_master_tx_slave_lag_waits';
> +# There should have been no wait, since maximum allowed is very high
> +select @count_before = @count_after as should_be_1;
> +should_be_1
> +1
> +# [ on slave ]
> +START SLAVE SQL_THREAD;
> +# [ on master ]
> +# Now wait for slave lag to decrease to 0
> +#
> +# Clean up
> +#
> +# [ on master ]
> +set global rpl_semi_sync_master_max_slave_lag = default;
> +set session rpl_semi_sync_master_slave_lag_wait_timeout = default;
> +DROP USER test@localhost;
> +include/stop_slave.inc
> +set global rpl_semi_sync_slave_enabled = 0;
> +set global rpl_semi_sync_slave_lag_enabled = default;
> +set global rpl_semi_sync_master_enabled = 0;
> +include/start_slave.inc
> +DROP TABLE t1;
> +include/rpl_end.inc
> diff --git a/mysql-test/suite/rpl/t/rpl_semi_sync_slave_lag.test b/mysql-test/suite/rpl/t/rpl_semi_sync_slave_lag.test
> new file mode 100644
> index 0000000..f9bdbd9
> --- /dev/null
> +++ b/mysql-test/suite/rpl/t/rpl_semi_sync_slave_lag.test
> @@ -0,0 +1,252 @@
> +source include/have_semisync.inc;
> +source include/not_embedded.inc;
> +source include/have_innodb.inc;
> +source include/master-slave.inc;
> +
> +connection master;
> +set global rpl_semi_sync_master_enabled = 1;
> +set global rpl_semi_sync_master_max_slave_lag = 10;
> +
> +show variables like 'rpl_semi_sync_master_%slave_lag%';
> +
> +connection slave;
> +source include/stop_slave.inc;
> +set global rpl_semi_sync_slave_enabled = 1;
> +set global rpl_semi_sync_slave_lag_enabled = 1;
> +
> +source include/start_slave.inc;
> +
> +connection master;
> +
> +--echo # create non-root user for testing READ_ONLY
> +grant SELECT, INSERT on *.* to test@localhost;
> +connect (con1,localhost,test,,test);
> +
> +CREATE TABLE t1 (i INT NOT NULL AUTO_INCREMENT PRIMARY KEY, f varchar(8))
> +ENGINE=innodb;
> +
> +--echo #
> +--echo # Check basic behaviour
> +--echo #
> +INSERT INTO t1 (f) VALUES ('1'),('2'),('3');
> +
> +--echo # Now wait for slave lag to decrease to 0
> +let $wait_condition= SELECT VARIABLE_VALUE = 0
> +FROM INFORMATION_SCHEMA.GLOBAL_STATUS
> +WHERE VARIABLE_NAME = 'rpl_semi_sync_master_estimated_slave_lag';
> +--source include/wait_condition.inc
> +
> +--echo # [ on slave ]
> +connection slave;
> +STOP SLAVE SQL_THREAD;
> +
> +--echo # [ on master ]
> +connection master;
> +INSERT INTO t1 (f) VALUES ('4'),('5'),('6');
> +
> +--echo # Now wait for slave lag to increase to > 0
> +let $wait_condition= SELECT VARIABLE_VALUE > 0
> +FROM INFORMATION_SCHEMA.GLOBAL_STATUS
> +WHERE VARIABLE_NAME = 'rpl_semi_sync_master_estimated_slave_lag';
> +--source include/wait_condition.inc
> +
> +--echo # [ on slave ]
> +connection slave;
> +START SLAVE SQL_THREAD;
> +
> +--echo # [ on master ]
> +connection master;
> +
> +--echo # Now wait for slave lag to decrease to 0
> +let $wait_condition= SELECT VARIABLE_VALUE = 0
> +FROM INFORMATION_SCHEMA.GLOBAL_STATUS
> +WHERE VARIABLE_NAME = 'rpl_semi_sync_master_estimated_slave_lag';
> +--source include/wait_condition.inc
> +
> +--echo # [ on slave ]
> +connection slave;
> +STOP SLAVE SQL_THREAD;
> +
> +--echo # [ on master ]
> +connection master;
> +
> +set session rpl_semi_sync_master_slave_lag_wait_timeout = 5;
> +
> +--echo # First transaction should succeed. slave_lag is zero when it commits
> +INSERT INTO t1 (f) VALUES ('7'),('8'),('9');
> +
> +--echo # Now wait for slave lag to increase to > 10s
> +let $wait_condition= SELECT VARIABLE_VALUE > 10000000
> +FROM INFORMATION_SCHEMA.GLOBAL_STATUS
> +WHERE VARIABLE_NAME = 'rpl_semi_sync_master_estimated_slave_lag';
> +--source include/wait_condition.inc
> +
> +--echo # Check that estimated_slave_lag is > 10s
> +SELECT VARIABLE_VALUE > 10000000 as should_be_1
> +FROM INFORMATION_SCHEMA.GLOBAL_STATUS
> +WHERE VARIABLE_NAME = 'rpl_semi_sync_master_estimated_slave_lag';
> +
> +--echo # Second transaction should now fail. slave_lag is >10s when it commits
> +--error ER_ERROR_DURING_COMMIT
> +INSERT INTO t1 (f) VALUES ('a'),('b'),('c');
> +
> +--echo # [ on slave ]
> +connection slave;
> +START SLAVE SQL_THREAD;
> +
> +--echo # [ on master ]
> +connection master;
> +
> +--echo # Now wait for slave lag to decrease to < 10s
> +let $wait_condition= SELECT VARIABLE_VALUE < 10000000
> +FROM INFORMATION_SCHEMA.GLOBAL_STATUS
> +WHERE VARIABLE_NAME = 'rpl_semi_sync_master_estimated_slave_lag';
> +--source include/wait_condition.inc
> +
> +--echo # And now it should succeed again
> +INSERT INTO t1 (f) VALUES ('d'),('e'),('f');
> +
> +SELECT *
> +FROM t1
> +ORDER BY 1;
> +
> +sync_slave_with_master;
> +
> +--echo # [ on slave ]
> +connection slave;
> +
> +SELECT *
> +FROM t1
> +ORDER BY 1;
> +
> +--echo #
> +--echo # Test interaction with READ_ONLY
> +--echo #
> +
> +--echo # [ on slave ]
> +connection slave;
> +STOP SLAVE SQL_THREAD;
> +
> +--echo # [ on master ]
> +connection master;
> +
> +INSERT INTO t1 (f) VALUES ('g'),('h'),('i');
> +
> +--echo # Now wait for slave lag to increase to > 10s
> +let $wait_condition= SELECT VARIABLE_VALUE > 10000000
> +FROM INFORMATION_SCHEMA.GLOBAL_STATUS
> +WHERE VARIABLE_NAME = 'rpl_semi_sync_master_estimated_slave_lag';
> +--source include/wait_condition.inc
> +
> +connection con1;
> +--echo # [ on con1 ]
> +BEGIN;
> +INSERT INTO t1 (f) VALUES ('g'),('h'),('i');
> +
> +connection master;
> +--echo # [ on master ]
> +set global read_only = 1;
> +
> +connection con1;
> +--echo # [ on con1 ]
> +set session rpl_semi_sync_master_slave_lag_wait_timeout = 5;
> +
> +--echo # read-only is check *before* slave lag
> +--error ER_OPTION_PREVENTS_STATEMENT
> +COMMIT;
> +
> +disconnect con1;
> +
> +--echo # [ on slave ]
> +connection slave;
> +START SLAVE SQL_THREAD;
> +
> +connection master;
> +--echo # [ on master ]
> +
> +--echo # Now wait for slave lag to decrease to 0
> +let $wait_condition= SELECT VARIABLE_VALUE = 0
> +FROM INFORMATION_SCHEMA.GLOBAL_STATUS
> +WHERE VARIABLE_NAME = 'rpl_semi_sync_master_estimated_slave_lag';
> +--source include/wait_condition.inc
> +
> +set global read_only = 0;
> +
> +--echo #
> +--echo # check slave_lag > 0 but less than rpl_semi_sync_master_max_slave_lag
> +--echo #
> +
> +--echo # [ on slave ]
> +connection slave;
> +STOP SLAVE SQL_THREAD;
> +
> +connection master;
> +--echo # [ on master ]
> +INSERT INTO t1 (f) VALUES ('j'),('k'),('l');
> +
> +--echo # Now wait for slave lag to increase to > 0
> +let $wait_condition= SELECT VARIABLE_VALUE > 0
> +FROM INFORMATION_SCHEMA.GLOBAL_STATUS
> +WHERE VARIABLE_NAME = 'rpl_semi_sync_master_estimated_slave_lag';
> +--source include/wait_condition.inc
> +
> +--echo # Capture rpl_semi_sync_master_tx_slave_lag_waits before transaction
> +--disable_result_log
> +select @count_before := VARIABLE_VALUE
> +FROM INFORMATION_SCHEMA.GLOBAL_STATUS
> +WHERE VARIABLE_NAME = 'rpl_semi_sync_master_tx_slave_lag_waits';
> +--enable_result_log
> +
> +--echo # Set maximum allowed slave lag to 24h
> +set global rpl_semi_sync_master_max_slave_lag = 86400; # 24h
> +
> +INSERT INTO t1 (f) VALUES ('m'),('n'),('o');
> +
> +--echo # Capture rpl_semi_sync_master_tx_slave_lag_waits after transaction
> +--disable_result_log
> +select @count_after := VARIABLE_VALUE
> +FROM INFORMATION_SCHEMA.GLOBAL_STATUS
> +WHERE VARIABLE_NAME = 'rpl_semi_sync_master_tx_slave_lag_waits';
> +--enable_result_log
> +
> +--echo # There should have been no wait, since maximum allowed is very high
> +select @count_before = @count_after as should_be_1;
> +
> +--echo # [ on slave ]
> +connection slave;
> +START SLAVE SQL_THREAD;
> +
> +connection master;
> +--echo # [ on master ]
> +
> +--echo # Now wait for slave lag to decrease to 0
> +let $wait_condition= SELECT VARIABLE_VALUE = 0
> +FROM INFORMATION_SCHEMA.GLOBAL_STATUS
> +WHERE VARIABLE_NAME = 'rpl_semi_sync_master_estimated_slave_lag';
> +--source include/wait_condition.inc
> +
> +--echo #
> +--echo # Clean up
> +--echo #
> +connection master;
> +--echo # [ on master ]
> +set global rpl_semi_sync_master_max_slave_lag = default;
> +set session rpl_semi_sync_master_slave_lag_wait_timeout = default;
> +DROP USER test@localhost;
> +
> +connection slave;
> +source include/stop_slave.inc;
> +set global rpl_semi_sync_slave_enabled = 0;
> +set global rpl_semi_sync_slave_lag_enabled = default;
> +
> +connection master;
> +set global rpl_semi_sync_master_enabled = 0;
> +
> +connection slave;
> +source include/start_slave.inc;
> +
> +connection master;
> +
> +DROP TABLE t1;
> +sync_slave_with_master;
> +--source include/rpl_end.inc
> diff --git a/plugin/semisync/semisync.cc b/plugin/semisync/semisync.cc
> index 4a80360..fc02b9d 100644
> --- a/plugin/semisync/semisync.cc
> +++ b/plugin/semisync/semisync.cc
> @@ -20,6 +20,7 @@
>  
>  const unsigned char ReplSemiSyncBase::kPacketMagicNum = 0xef;
>  const unsigned char ReplSemiSyncBase::kPacketFlagSync = 0x01;
> +const unsigned char ReplSemiSyncBase::kPacketFlagSyncAndReport = 0x02;
>  
>  
>  const unsigned long Trace::kTraceGeneral  = 0x0001;
> @@ -29,3 +30,6 @@ const unsigned long Trace::kTraceFunction = 0x0040;
>  
>  const unsigned char  ReplSemiSyncBase::kSyncHeader[2] =
>    {ReplSemiSyncBase::kPacketMagicNum, 0};
> +
> +const char* const ReplSemiSyncBase::kRplSemiSyncSlaveReportExec =
> +    "rpl_semi_sync_slave_report_exec";
> diff --git a/plugin/semisync/semisync.h b/plugin/semisync/semisync.h
> index 2857729..78faba9 100644
> --- a/plugin/semisync/semisync.h
> +++ b/plugin/semisync/semisync.h
> @@ -75,13 +75,23 @@ class ReplSemiSyncBase
>  
>    /* Constants in network packet header. */
>    static const unsigned char kPacketMagicNum;
> +  /* this event should be semisync acked */
>    static const unsigned char kPacketFlagSync;
> +  /* this event should be semisync acked including the current SQL position */
> +  static const unsigned char kPacketFlagSyncAndReport;
> +
> +  /* user variable for enabling exec-pos reporting */
> +  static const char* const kRplSemiSyncSlaveReportExec;
>  };
>  
>  /* The layout of a semisync slave reply packet:
>     1 byte for the magic num
>     8 bytes for the binlog positon
> -   n bytes for the binlog filename, terminated with a '\0'
> +   n bytes for the binlog filename, NOT terminated with a '\0'
> +   [ optionally ]
> +   1 byte == 0
> +   8 bytes for the sql-thread position
> +   n bytes for the sql-thread filename, terminated with a '\0'
>  */
>  #define REPLY_MAGIC_NUM_LEN 1
>  #define REPLY_BINLOG_POS_LEN 8
> diff --git a/plugin/semisync/semisync_master.cc b/plugin/semisync/semisync_master.cc
> index c88c162..3ee6e26 100644
> --- a/plugin/semisync/semisync_master.cc
> +++ b/plugin/semisync/semisync_master.cc
> @@ -22,6 +22,9 @@
>  #define TIME_MILLION  1000000
>  #define TIME_BILLION  1000000000
>  
> +/* thd_key for per slave thread state */
> +static MYSQL_THD_KEY_T thd_key;
> +
>  /* This indicates whether semi-synchronous replication is enabled. */
>  char rpl_semi_sync_master_enabled;
>  unsigned long rpl_semi_sync_master_wait_point       =
> @@ -45,6 +48,18 @@ unsigned long long rpl_semi_sync_master_net_wait_time = 0;
>  unsigned long long rpl_semi_sync_master_trx_wait_time = 0;
>  char rpl_semi_sync_master_wait_no_slave = 1;
>  
> +unsigned long rpl_semi_sync_master_max_unacked_event_count = 0;
> +unsigned long rpl_semi_sync_master_max_unacked_event_bytes = 4096;
> +
> +unsigned long rpl_semi_sync_master_slave_lag_clients = 0;
> +unsigned long long rpl_semi_sync_master_estimated_slave_lag = 0;
> +unsigned long rpl_semi_sync_master_slave_lag_heartbeat_frequency_us = 500000;
> +unsigned long rpl_semi_sync_master_max_slave_lag = 0;
> +unsigned long rpl_semi_sync_master_slave_lag_wait_sessions = 0;
> +
> +unsigned long rpl_semi_sync_master_avg_trx_slave_lag_wait_time = 0;
> +unsigned long long rpl_semi_sync_master_trx_slave_lag_wait_num = 0;
> +unsigned long long rpl_semi_sync_master_trx_slave_lag_wait_time = 0;
>  
>  static int getWaitTime(const struct timespec& start_ts);
>  
> @@ -150,6 +165,15 @@ int ActiveTranx::insert_tranx_node(const char *log_file_name,
>    ins_node->log_name_[FN_REFLEN-1] = 0; /* make sure it ends properly */
>    ins_node->log_pos_ = log_file_pos;
>  
> +  {
> +    /**
> +     * set trans commit time
> +     *   this is called when writing into binlog, which is not
> +     *   exactly right, but close enough for our purposes
> +     */
> +    ins_node->tranx_commit_time_us = my_hrtime().val;
> +  }
> +
>    if (!trx_front_)
>    {
>      /* The list is empty. */
> @@ -193,12 +217,11 @@ int ActiveTranx::insert_tranx_node(const char *log_file_name,
>    return function_exit(kWho, result);
>  }
>  
> -bool ActiveTranx::is_tranx_end_pos(const char *log_file_name,
> -				   my_off_t    log_file_pos)
> +TranxNode* ActiveTranx::lookup_tranx_end_pos(const char *log_file_name,
> +                                             my_off_t log_file_pos)
>  {
> -  const char *kWho = "ActiveTranx::is_tranx_end_pos";
> +  const char *kWho = "ActiveTranx::lookup_tranx_end_pos";
>    function_enter(kWho);
> -
>    unsigned int hash_val = get_hash_value(log_file_name, log_file_pos);
>    TranxNode *entry = trx_htb_[hash_val];
>  
> @@ -211,38 +234,24 @@ bool ActiveTranx::is_tranx_end_pos(const char *log_file_name,
>    }
>  
>    if (trace_level_ & kTraceDetail)
> -    sql_print_information("%s: probe (%s, %lu) in entry(%u)", kWho,
> -                          log_file_name, (unsigned long)log_file_pos, hash_val);
> +    sql_print_information("%s: probe (%s, %lu)", kWho,
> +                          log_file_name, (unsigned long)log_file_pos);
>  
>    function_exit(kWho, (entry != NULL));
> -  return (entry != NULL);
> +  return entry;
>  }
>  
> -int ActiveTranx::clear_active_tranx_nodes(const char *log_file_name,
> -					  my_off_t log_file_pos)
> +int ActiveTranx::clear_active_tranx_nodes()
>  {
> -  const char *kWho = "ActiveTranx::::clear_active_tranx_nodes";
> -  TranxNode *new_front;
> +  set_new_front(NULL);
> +  return 0;
> +}
>  
> +void ActiveTranx::set_new_front(TranxNode *new_front)
> +{
> +  const char *kWho = "ActiveTranx::set_new_front";
>    function_enter(kWho);
>  
> -  if (log_file_name != NULL)
> -  {
> -    new_front = trx_front_;
> -
> -    while (new_front)
> -    {
> -      if (compare(new_front, log_file_name, log_file_pos) > 0)
> -        break;
> -      new_front = new_front->next_;
> -    }
> -  }
> -  else
> -  {
> -    /* If log_file_name is NULL, clear everything. */
> -    new_front = NULL;
> -  }
> -
>    if (new_front == NULL)
>    {
>      /* No active transaction nodes after the call. */
> @@ -257,7 +266,6 @@ int ActiveTranx::clear_active_tranx_nodes(const char *log_file_name,
>        trx_front_ = NULL;
>        trx_rear_  = NULL;
>      }
> -
>      if (trace_level_ & kTraceDetail)
>        sql_print_information("%s: cleared all nodes", kWho);
>    }
> @@ -291,14 +299,40 @@ int ActiveTranx::clear_active_tranx_nodes(const char *log_file_name,
>  
>      trx_front_ = new_front;
>      allocator_.free_nodes_before(trx_front_);
> -
>      if (trace_level_ & kTraceDetail)
>        sql_print_information("%s: cleared %d nodes back until pos (%s, %lu)",
>                              kWho, n_frees,
>                              trx_front_->log_name_, (unsigned long)trx_front_->log_pos_);
>    }
> +  function_exit(kWho, 0);
> +}
>  
> -  return function_exit(kWho, 0);
> +bool ActiveTranx::prune_active_tranx_nodes(
> +    LogPosPtr pos,
> +    ulonglong *oldest_tranx_commit_time_us)
> +{
> +  TranxNode *old_front = trx_front_;
> +  TranxNode *new_front;
> +
> +  new_front = trx_front_;
> +  while (new_front)
> +  {
> +    if (compare(new_front, pos.file_name, pos.file_pos) > 0)
> +      break;
> +    new_front = new_front->next_;
> +  }
> +
> +  set_new_front(new_front);
> +
> +  if (oldest_tranx_commit_time_us)
> +  {
> +    if (trx_front_ == NULL)
> +      *oldest_tranx_commit_time_us = 0;
> +    else
> +      *oldest_tranx_commit_time_us = trx_front_->tranx_commit_time_us;
> +  }
> +
> +  return ! (old_front == trx_front_);
>  }
>  
>  
> @@ -334,7 +368,8 @@ ReplSemiSyncMaster::ReplSemiSyncMaster()
>      wait_file_pos_(0),
>      master_enabled_(false),
>      wait_timeout_(0L),
> -    state_(0)
> +    state_(0),
> +    oldest_unapplied_tranx_commit_time_us_(0)
>  {
>    strcpy(reply_file_name_, "");
>    strcpy(wait_file_name_, "");
> @@ -362,11 +397,19 @@ int ReplSemiSyncMaster::initObject()
>    mysql_cond_init(key_ss_cond_COND_binlog_send_,
>                    &COND_binlog_send_, NULL);
>  
> +  /* Mutex initialization can only be done after MY_INIT(). */
> +  mysql_mutex_init(key_ss_mutex_LOCK_slave_lag_,
> +                   &LOCK_slave_lag_, MY_MUTEX_INIT_FAST);
> +  mysql_cond_init(key_ss_cond_COND_slave_lag_,
> +                  &COND_slave_lag_, NULL);
> +
>    if (rpl_semi_sync_master_enabled)
>      result = enableMaster();
>    else
>      result = disableMaster();
>  
> +  thd_key_create(&thd_key);
> +
>    return result;
>  }
>  
> @@ -437,6 +480,8 @@ void ReplSemiSyncMaster::cleanup()
>    {
>      mysql_mutex_destroy(&LOCK_binlog_);
>      mysql_cond_destroy(&COND_binlog_send_);
> +    mysql_mutex_destroy(&LOCK_slave_lag_);
> +    mysql_cond_destroy(&COND_slave_lag_);
>      init_done_= 0;
>    }
>  
> @@ -473,7 +518,34 @@ void ReplSemiSyncMaster::add_slave()
>  {
>    lock();
>    rpl_semi_sync_master_clients++;
> +  if (has_semi_sync_slave_lag())
> +    rpl_semi_sync_master_slave_lag_clients++;
>    unlock();
> +
> +  if (has_semi_sync_slave_lag())
> +  {
> +    int null_val = 0;
> +    longlong new_val =
> +        rpl_semi_sync_master_slave_lag_heartbeat_frequency_us * 1000;
> +    longlong old_val = new_val + 1;
> +
> +    get_user_var_int("master_heartbeat_period", &old_val, &null_val);
> +    if (old_val > new_val || null_val)
> +    {
> +      /* if there no old value or it's bigger than what we want */
> +      int res = set_user_var_int("master_heartbeat_period",new_val, &old_val);
> +      if (res == -1)
> +      {
> +        sql_print_error(
> +            "Repl_semi_sync::failed to set master_heartbeat_period");
> +      }
> +    }
> +  }
> +
> +  /**
> +   * create per slave-state and store it in thread-local-storage */
> +  ReplSemiSyncMasterPerSlaveState *state = new ReplSemiSyncMasterPerSlaveState;
> +  thd_setspecific(current_thd, thd_key, state);
>  }
>  
>  void ReplSemiSyncMaster::remove_slave()
> @@ -492,7 +564,31 @@ void ReplSemiSyncMaster::remove_slave()
>          rpl_semi_sync_master_clients == 0)
>        switch_off();
>    }
> +
> +  bool no_slave_lag_clients = false;
> +  if (has_semi_sync_slave_lag())
> +  {
> +    if (--rpl_semi_sync_master_slave_lag_clients == 0)
> +    {
> +      no_slave_lag_clients = true;
> +    }
> +  }
> +
>    unlock();
> +
> +  ReplSemiSyncMasterPerSlaveState *state =
> +      (ReplSemiSyncMasterPerSlaveState*)thd_getspecific(current_thd, thd_key);
> +  thd_setspecific(current_thd, thd_key, NULL);
> +
> +  if (state != NULL)
> +  {
> +    delete state;
> +  }
> +
> +  if (no_slave_lag_clients)
> +  {
> +    wake_slave_lag_waiters(0);
> +  }
>  }
>  
>  bool ReplSemiSyncMaster::is_semi_sync_slave()
> @@ -503,14 +599,115 @@ bool ReplSemiSyncMaster::is_semi_sync_slave()
>    return val;
>  }
>  
> +bool ReplSemiSyncMaster::has_semi_sync_slave_lag()
> +{
> +  int null_value;
> +  long long val= 0;
> +  get_user_var_int(kRplSemiSyncSlaveReportExec, &val, &null_value);
> +  return val;
> +}
> +
> +int ReplSemiSyncMaster::checkSyncReq(const LogPosPtr *log_pos)
> +{
> +  if (log_pos == NULL)
> +  {
> +    /* heartbeat events does not have logpos (since they are not actually
> +     * stored in the binlog).
> +     */
> +    if (!has_semi_sync_slave_lag())
> +    {
> +      /* don't semi-sync them if we haven't enabled slave-lag handling */
> +      return 0;
> +    }
> +    else
> +    {
> +      /* else ask for both IO and exec position */
> +      return 2;
> +    }
> +  }
> +
> +  /**
> +   * check if this log-pos is a candidate for semi-syncing event
> +   */
> +  TranxNode *entry = active_tranxs_->lookup_tranx_end_pos(log_pos->file_name,
> +                                                          log_pos->file_pos);
> +
> +  if (entry == NULL)
> +    return 0;
> +
> +  ReplSemiSyncMasterPerSlaveState *state =
> +      (ReplSemiSyncMasterPerSlaveState*)thd_getspecific(current_thd,
> +                                                        thd_key);
> +  do
> +  {
> +    state->unacked_event_count_++;
> +
> +    if (active_tranxs_->is_rear(entry))
> +    {
> +      /* always ask for ack on last event in tranx list */
> +      break;
> +    }
> +
> +    if (state->unacked_event_count_ >=
> +        rpl_semi_sync_master_max_unacked_event_count)
> +    {
> +      /* enough events passed that it's time for another ack */
> +      break;
> +    }
> +
> +    if (!state->sync_req_pos_.IsInited())
> +    {
> +      /* first event => time for ack */
> +      break;
> +    }
> +
> +    if (strcmp(log_pos->file_name, state->sync_req_pos_.file_name) != 0)
> +    {
> +      /* new file => time for ack */
> +      break;
> +    }
> +
> +    if (log_pos->file_pos >= (state->sync_req_pos_.file_pos +
> +                              rpl_semi_sync_master_max_unacked_event_bytes))
> +    {
> +      /* enough bytes => time for ack */
> +      break;
> +    }
> +
> +    /* we skip asking for semi-sync ack on this event */
> +    return 0;
> +
> +  } while (0);
> +
> +  /* keep track on when we last asked for semi-sync-ack */
> +  state->unacked_event_count_ = 0;
> +  state->sync_req_pos_.Assign(log_pos);
> +
> +  /**
> +   * check if this slave can report back exec position
> +   */
> +  if (!has_semi_sync_slave_lag())
> +  {
> +    /* slave can't report back SQL position */
> +    return 1;
> +  }
> +
> +  /* ask for both IO and SQL position */
> +  return 2;
> +}
> +
>  int ReplSemiSyncMaster::reportReplyBinlog(uint32 server_id,
>  					  const char *log_file_name,
> -					  my_off_t log_file_pos)
> +					  my_off_t log_file_pos,
> +                                          const LogPos *exec_pos)
>  {
>    const char *kWho = "ReplSemiSyncMaster::reportReplyBinlog";
>    int   cmp;
>    bool  can_release_threads = false;
>    bool  need_copy_send_pos = true;
> +  bool  pruned_trx_list = false;
> +  ulonglong oldest_tranx_commit_time_us = 0;
> +
>  
>    if (!(getMasterEnabled()))
>      return 0;
> @@ -559,15 +756,29 @@ int ReplSemiSyncMaster::reportReplyBinlog(uint32 server_id,
>      reply_file_pos_ = log_file_pos;
>      reply_file_name_inited_ = true;
>  
> -    /* Remove all active transaction nodes before this point. */
> -    assert(active_tranxs_ != NULL);
> -    active_tranxs_->clear_active_tranx_nodes(log_file_name, log_file_pos);
> -
>      if (trace_level_ & kTraceDetail)
>        sql_print_information("%s: Got reply at (%s, %lu)", kWho,
>                              log_file_name, (unsigned long)log_file_pos);
>    }
>  
> +  assert(active_tranxs_ != NULL);
> +  if (exec_pos != NULL)
> +  {
> +    /* prune using exec_pos */
> +    LogPosPtr ptr(*exec_pos);
> +    pruned_trx_list = active_tranxs_->prune_active_tranx_nodes(
> +        ptr, &oldest_tranx_commit_time_us);
> +  }
> +  else if (rpl_semi_sync_master_slave_lag_clients == 0 && need_copy_send_pos)
> +  {
> +    /**
> +     * if we don't have any slaves that can do exec_pos reporting,
> +     * prune by IO position as "plain old semi sync"
> +     */
> +    LogPosPtr ptr(log_file_name, log_file_pos);
> +    active_tranxs_->prune_active_tranx_nodes(ptr, NULL);
> +  }
> +
>    if (rpl_semi_sync_master_wait_sessions > 0)
>    {
>      /* Let us check if some of the waiting threads doing a trx
> @@ -596,6 +807,15 @@ int ReplSemiSyncMaster::reportReplyBinlog(uint32 server_id,
>      cond_broadcast();
>    }
>  
> +  if (pruned_trx_list)
> +  {
> +    /**
> +     * if we did prune trx list, it might be that we should wake up
> +     * threads waiting for slave-lag to decrease
> +     */
> +    wake_slave_lag_waiters(oldest_tranx_commit_time_us);
> +  }
> +
>    return function_exit(kWho, 0);
>  }
>  
> @@ -743,14 +963,6 @@ int ReplSemiSyncMaster::commitTrx(const char* trx_wait_binlog_name,
>        }
>      }
>  
> -    /*
> -      At this point, the binlog file and position of this transaction
> -      must have been removed from ActiveTranx.
> -    */
> -    assert(thd_killed(NULL) ||
> -           !active_tranxs_->is_tranx_end_pos(trx_wait_binlog_name,
> -                                             trx_wait_binlog_pos));
> -    
>    l_end:
>      /* Update the status counter. */
>      if (is_on())
> @@ -794,7 +1006,7 @@ int ReplSemiSyncMaster::switch_off()
>  
>    /* Clear the active transaction list. */
>    assert(active_tranxs_ != NULL);
> -  result = active_tranxs_->clear_active_tranx_nodes(NULL, 0);
> +  result = active_tranxs_->clear_active_tranx_nodes();
>  
>    rpl_semi_sync_master_off_times++;
>    wait_file_name_inited_   = false;
> @@ -884,7 +1096,7 @@ int ReplSemiSyncMaster::updateSyncHeader(unsigned char *packet,
>  {
>    const char *kWho = "ReplSemiSyncMaster::updateSyncHeader";
>    int  cmp = 0;
> -  bool sync = false;
> +  int sync = 0;
>  
>    /* If the semi-sync master is not enabled, or the slave is not a semi-sync
>     * target, do not request replies from the slave.
> @@ -905,6 +1117,13 @@ int ReplSemiSyncMaster::updateSyncHeader(unsigned char *packet,
>      /* semi-sync is ON */
>      /* sync= false; No sync unless a transaction is involved. */
>  
> +    if (log_file_name == NULL)
> +    {
> +      /* this is heartbeat, request io_pos and exec_pos */
> +      sync = checkSyncReq(0);
> +      goto l_end;
> +    }
> +
>      if (reply_file_name_inited_)
>      {
>        cmp = ActiveTranx::compare(log_file_name, log_file_pos,
> @@ -933,12 +1152,12 @@ int ReplSemiSyncMaster::updateSyncHeader(unsigned char *packet,
>       */
>      if (cmp >= 0)
>      {
> -      /* 
> +      /*
>         * We only wait if the event is a transaction's ending event.
>         */
>        assert(active_tranxs_ != NULL);
> -      sync = active_tranxs_->is_tranx_end_pos(log_file_name,
> -                                               log_file_pos);
> +      LogPosPtr pos(log_file_name, log_file_pos);
> +      sync = checkSyncReq(&pos);
>      }
>    }
>    else
> @@ -951,7 +1170,7 @@ int ReplSemiSyncMaster::updateSyncHeader(unsigned char *packet,
>      }
>      else
>      {
> -      sync = true;
> +      sync = 1;
>      }
>    }
>  
> @@ -966,10 +1185,14 @@ int ReplSemiSyncMaster::updateSyncHeader(unsigned char *packet,
>    /* We do not need to clear sync flag because we set it to 0 when we
>     * reserve the packet header.
>     */
> -  if (sync)
> +  if (sync == 1)
>    {
>      (packet)[2] = kPacketFlagSync;
>    }
> +  else if (sync == 2)
> +  {
> +    (packet)[2] = kPacketFlagSyncAndReport;
> +  }
>  
>    return function_exit(kWho, 0);
>  }
> @@ -1018,7 +1241,8 @@ int ReplSemiSyncMaster::writeTranxInBinlog(const char* log_file_name,
>    if (is_on())
>    {
>      assert(active_tranxs_ != NULL);
> -    if(active_tranxs_->insert_tranx_node(log_file_name, log_file_pos))
> +    bool empty = active_tranxs_->is_empty();
> +    if (active_tranxs_->insert_tranx_node(log_file_name, log_file_pos))
>      {
>        /*
>          if insert tranx_node failed, print a warning message
> @@ -1028,6 +1252,14 @@ int ReplSemiSyncMaster::writeTranxInBinlog(const char* log_file_name,
>                          log_file_name, (ulong)log_file_pos);
>        switch_off();
>      }
> +    else if (empty && rpl_semi_sync_master_slave_lag_clients > 0)
> +    {
> +      /* if the list of transactions was empty,
> +       * we need to init the oldest_tranx_commit_time_us
> +       */
> +      oldest_unapplied_tranx_commit_time_us_ =
> +          active_tranxs_->get_oldest_tranx_commit_time_us();
> +    }
>    }
>  
>   l_end:
> @@ -1037,10 +1269,10 @@ int ReplSemiSyncMaster::writeTranxInBinlog(const char* log_file_name,
>  }
>  
>  int ReplSemiSyncMaster::readSlaveReply(NET *net, uint32 server_id,
> -                                       const char *event_buf)
> +                                       const char *event_buf_)
>  {
>    const char *kWho = "ReplSemiSyncMaster::readSlaveReply";
> -  const unsigned char *packet;
> +  const unsigned char *packet, *packet_start;
>    char     log_file_name[FN_REFLEN];
>    my_off_t log_file_pos;
>    ulong    log_file_len = 0;
> @@ -1048,12 +1280,15 @@ int ReplSemiSyncMaster::readSlaveReply(NET *net, uint32 server_id,
>    int      result = -1;
>    struct timespec start_ts;
>    ulong trc_level = trace_level_;
> +  const unsigned char *event_buf = (const unsigned char*)event_buf_;
> +  bool exec_pos_present = false; // is SQL exec pos present in reply
> +  LogPos   exec_pos;             // position of SQL thread
>    LINT_INIT_STRUCT(start_ts);
>  
>    function_enter(kWho);
>  
> -  assert((unsigned char)event_buf[1] == kPacketMagicNum);
> -  if ((unsigned char)event_buf[2] != kPacketFlagSync)
> +  assert(event_buf[1] == kPacketMagicNum);
> +  if ((event_buf[2] & (kPacketFlagSync | kPacketFlagSyncAndReport)) == 0)
>    {
>      /* current event does not require reply */
>      result = 0;
> @@ -1111,28 +1346,60 @@ int ReplSemiSyncMaster::readSlaveReply(NET *net, uint32 server_id,
>      goto l_end;
>    }
>  
> -  packet = net->read_pos;
> +  packet_start = packet = net->read_pos;
>    if (packet[REPLY_MAGIC_NUM_OFFSET] != ReplSemiSyncMaster::kPacketMagicNum)
>    {
>      sql_print_error("Read semi-sync reply magic number error");
>      goto l_end;
>    }
>  
> +  /* we determine if this semisync ack contains a sql-thread exec-pos
> +   * by checking if last byte == 0, since the packet then contains
> +   * \0-terminated filenames */
> +  exec_pos_present = packet[packet_len - 1] == 0;
> +
>    log_file_pos = uint8korr(packet + REPLY_BINLOG_POS_OFFSET);
> -  log_file_len = packet_len - REPLY_BINLOG_NAME_OFFSET;
> +  if (exec_pos_present == false)
> +  {
> +    log_file_len = packet_len - REPLY_BINLOG_NAME_OFFSET;
> +  }
> +  else
> +  {
> +    log_file_len = strnlen((char*)packet + REPLY_BINLOG_NAME_OFFSET,
> +                           MY_MIN((ulong)FN_REFLEN,
> +                                  packet_len - REPLY_BINLOG_NAME_OFFSET));
> +  }
>    if (log_file_len >= FN_REFLEN)
>    {
>      sql_print_error("Read semi-sync reply binlog file length too large");
>      goto l_end;
>    }
> -  strncpy(log_file_name, (const char*)packet + REPLY_BINLOG_NAME_OFFSET, log_file_len);
> +  packet+= REPLY_BINLOG_NAME_OFFSET;
> +
> +  strncpy(log_file_name, (const char*)packet, log_file_len);
>    log_file_name[log_file_len] = 0;
>  
> +  if (exec_pos_present)
> +  {
> +    packet += log_file_len + 1;
> +    if (packet + 8 + 1 >= (packet_start + packet_len))
> +    {
> +      sql_print_error("Read semi-sync reply binlog. "
> +                      "Packet to short to contain exec-position!");
> +      goto l_end;
> +    }
> +    exec_pos.file_pos = uint8korr(packet);
> +    packet += 8;
> +    strncpy(exec_pos.file_name, (char*)packet,
> +            (packet_start + packet_len) - packet);
> +  }
> +
>    if (trc_level & kTraceDetail)
>      sql_print_information("%s: Got reply (%s, %lu)",
>                            kWho, log_file_name, (ulong)log_file_pos);
>  
> -  result = reportReplyBinlog(server_id, log_file_name, log_file_pos);
> +  result = reportReplyBinlog(server_id, log_file_name, log_file_pos,
> +                             exec_pos_present ? &exec_pos : NULL);
>  
>   l_end:
>    return function_exit(kWho, result);
> @@ -1154,6 +1421,16 @@ int ReplSemiSyncMaster::resetMaster()
>    wait_file_name_inited_   = false;
>    reply_file_name_inited_  = false;
>    commit_file_name_inited_ = false;
> +  if (active_tranxs_ != NULL)
> +  {
> +    /**
> +     * make sure to empty transaction hash/list;
> +     * with slave-lag reporting, this container does
> +     * not have to be empty even if no transaction is
> +     * currently running
> +     */
> +    active_tranxs_->clear_active_tranx_nodes();
> +  }
>  
>    rpl_semi_sync_master_yes_transactions = 0;
>    rpl_semi_sync_master_no_transactions = 0;
> @@ -1168,6 +1445,13 @@ int ReplSemiSyncMaster::resetMaster()
>  
>    unlock();
>  
> +  mysql_mutex_lock(&LOCK_slave_lag_);
> +  rpl_semi_sync_master_slave_lag_wait_sessions = 0;
> +  oldest_unapplied_tranx_commit_time_us_ = 0;
> +  rpl_semi_sync_master_trx_slave_lag_wait_num = 0;
> +  rpl_semi_sync_master_trx_slave_lag_wait_time = 0;
> +  mysql_mutex_unlock(&LOCK_slave_lag_);
> +
>    return function_exit(kWho, result);
>  }
>  
> @@ -1186,6 +1470,29 @@ void ReplSemiSyncMaster::setExportStats()
>                       ((double)rpl_semi_sync_master_net_wait_num)) : 0);
>  
>    unlock();
> +
> +  if (oldest_unapplied_tranx_commit_time_us_ != 0)
> +  {
> +    rpl_semi_sync_master_estimated_slave_lag = my_hrtime().val -
> +        oldest_unapplied_tranx_commit_time_us_;
> +  }
> +  else
> +  {
> +    rpl_semi_sync_master_estimated_slave_lag = 0;
> +  }
> +
> +  mysql_mutex_lock(&LOCK_slave_lag_);
> +  if (rpl_semi_sync_master_trx_slave_lag_wait_num)
> +  {
> +    rpl_semi_sync_master_avg_trx_slave_lag_wait_time =
> +        (unsigned long)((double)rpl_semi_sync_master_trx_slave_lag_wait_time /
> +                        (double)rpl_semi_sync_master_trx_slave_lag_wait_num);
> +  }
> +  else
> +  {
> +    rpl_semi_sync_master_avg_trx_slave_lag_wait_time = 0;
> +  }
> +  mysql_mutex_unlock(&LOCK_slave_lag_);
>  }
>  
>  /* Get the waiting time given the wait's staring time.
> @@ -1213,3 +1520,117 @@ static int getWaitTime(const struct timespec& start_ts)
>  
>    return (int)(end_usecs - start_usecs);
>  }
> +
> +void ReplSemiSyncMaster::wake_slave_lag_waiters(
> +    ulonglong oldest_unapplied_tranx_commit_time_us)
> +{
> +  mysql_mutex_lock(&LOCK_slave_lag_);
> +  oldest_unapplied_tranx_commit_time_us_ =
> +      oldest_unapplied_tranx_commit_time_us;
> +
> +  if (rpl_semi_sync_master_slave_lag_wait_sessions > 0)
> +  {
> +    mysql_cond_broadcast(&COND_slave_lag_);
> +  }
> +  mysql_mutex_unlock(&LOCK_slave_lag_);
> +}
> +
> +int ReplSemiSyncMaster::wait_slave_lag(ulong timeout_sec)
> +{
> +  int error = 0;
> +  PSI_stage_info old_stage;
> +
> +  /* slave lag waiting not enabled, return directly */
> +  if (rpl_semi_sync_master_max_slave_lag == 0)
> +    return 0;
> +
> +  /* there is no slave that can report slave lag, return directly */
> +  if (rpl_semi_sync_master_slave_lag_clients == 0)
> +    return 0;
> +
> +  /* compute start_time and end_time */
> +  struct timespec end_time;
> +  set_timespec(end_time, 0);
> +  ulonglong start_time_us = timespec_to_usec(&end_time);
> +  end_time.tv_sec += timeout_sec;
> +
> +  mysql_mutex_lock(&LOCK_slave_lag_);
> +
> +  if (oldest_unapplied_tranx_commit_time_us_ == 0)
> +  {
> +    /* no slave lag, at least one slave is up to date */
> +    mysql_mutex_unlock(&LOCK_slave_lag_);
> +    return 0;
> +  }
> +
> +  if (rpl_semi_sync_master_max_slave_lag == 0)
> +  {
> +    /* slave lag waiting not enabled */
> +    mysql_mutex_unlock(&LOCK_slave_lag_);
> +    return 0;
> +  }
> +
> +  /* This must be called after acquiring the lock */
> +  THD_ENTER_COND(NULL, &COND_slave_lag_, &LOCK_slave_lag_,
> +                 &stage_waiting_for_semi_sync_slave_lag,
> +                 &old_stage);
> +
> +  bool waited = false;
> +  ulonglong lag = 0;
> +  ulonglong max_lag = 0;
> +  while (oldest_unapplied_tranx_commit_time_us_ != 0)
> +  {
> +    /* check kill_level after THD_ENTER_COND but *before* cond_wait
> +     * to avoid missing kills */
> +    if (! (getMasterEnabled() && is_on() &&
> +           thd_kill_level(NULL) == THD_IS_NOT_KILLED))
> +      break;
> +
> +    lag = start_time_us - oldest_unapplied_tranx_commit_time_us_;
> +    max_lag = 1000000 * rpl_semi_sync_master_max_slave_lag;
> +    if (lag <= max_lag)
> +      break;
> +
> +    waited = true;
> +    rpl_semi_sync_master_slave_lag_wait_sessions++;
> +    int wait_result = mysql_cond_timedwait(&COND_slave_lag_, &LOCK_slave_lag_,
> +                                           &end_time);
> +    rpl_semi_sync_master_slave_lag_wait_sessions--;
> +
> +    bool thd_was_killed = thd_kill_level(NULL) != THD_IS_NOT_KILLED;
> +    if (wait_result != 0 || thd_was_killed)
> +    {
> +      break;
> +    }
> +  }
> +
> +  if (thd_kill_level(NULL) != THD_IS_NOT_KILLED)
> +  {
> +    /* Return error to client. */
> +    error = 1;
> +    my_printf_error(ER_ERROR_DURING_COMMIT,
> +                    "Killed while waiting for replication semi-sync slave-lag.",
> +                    MYF(0));
> +  }
> +  else if (lag > max_lag)
> +  {
> +    /* Return error to client. */
> +    error = 1;
> +    my_printf_error(ER_ERROR_DURING_COMMIT,
> +                    "Slave-lag timeout",
> +                    MYF(0));
> +  }
> +
> +  if (waited)
> +  {
> +    rpl_semi_sync_master_trx_slave_lag_wait_num++;
> +    rpl_semi_sync_master_trx_slave_lag_wait_time +=
> +        (my_hrtime().val - start_time_us);
> +  }
> +
> +  /* The lock held will be released by thd_exit_cond, so no need to
> +     call unlock() here */
> +  THD_EXIT_COND(NULL, & old_stage);
> +
> +  return error;
> +}
> diff --git a/plugin/semisync/semisync_master.h b/plugin/semisync/semisync_master.h
> index d9dc4ce..e5c0de6 100644
> --- a/plugin/semisync/semisync_master.h
> +++ b/plugin/semisync/semisync_master.h
> @@ -24,17 +24,101 @@
>  #ifdef HAVE_PSI_INTERFACE
>  extern PSI_mutex_key key_ss_mutex_LOCK_binlog_;
>  extern PSI_cond_key key_ss_cond_COND_binlog_send_;
> +
> +extern PSI_mutex_key key_ss_mutex_LOCK_slave_lag_;
> +extern PSI_cond_key key_ss_cond_COND_slave_lag_;
>  #endif
>  
>  extern PSI_stage_info stage_waiting_for_semi_sync_ack_from_slave;
> +extern PSI_stage_info stage_waiting_for_semi_sync_slave_lag;
>  
>  struct TranxNode {
>    char             log_name_[FN_REFLEN];
> -  my_off_t          log_pos_;
> +  my_off_t         log_pos_;
> +  ulonglong        tranx_commit_time_us;
>    struct TranxNode *next_;            /* the next node in the sorted list */
>    struct TranxNode *hash_next_;    /* the next node during hash collision */
>  };
>  
> +struct LogPos;
> +
> +/* This represents a log position */
> +struct LogPosPtr {
> +  LogPosPtr() { Uninit();}
> +  LogPosPtr(const char *name, my_off_t pos) : file_name(name), file_pos(pos){}
> +  explicit LogPosPtr(const LogPos& pos) { Assign(&pos); }
> +
> +  const char *file_name;
> +  my_off_t    file_pos;
> +
> +  LogPosPtr& Assign(const LogPosPtr *src) {
> +    file_name = src->file_name;
> +    file_pos = src->file_pos;
> +    return *this;
> +  }
> +
> +  LogPosPtr& Assign(const LogPos *src);
> +
> +  void Uninit() { file_name = NULL;}
> +  bool IsInited() const { return file_name != NULL; }
> +};
> +
> +struct LogPos {
> +  char     file_name[FN_REFLEN];
> +  my_off_t file_pos;
> +
> +  LogPos() { Uninit(); }
> +
> +  LogPosPtr ToLogPosPtr() const {
> +    if (IsInited()){
> +      LogPosPtr p(file_name, file_pos);
> +      return p;
> +    } else {
> +      LogPosPtr p;
> +      return p;
> +    }
> +  }
> +
> +  LogPos& Assign(const LogPosPtr *src) {
> +    if (src->IsInited()) {
> +      strcpy(file_name, src->file_name);
> +      file_pos = src->file_pos;
> +    } else {
> +      Uninit();
> +    }
> +    return *this;
> +  }
> +
> +  LogPos& Assign(const LogPos* src) {
> +    LogPosPtr p = src->ToLogPosPtr();
> +    Assign(&p);
> +    return *this;
> +  }
> +
> +  void Uninit() { file_name[0] = 0; }
> +  bool IsInited() const { return file_name[0] != 0; }
> +};
> +
> +inline LogPosPtr& LogPosPtr::Assign(const LogPos* src) {
> +  LogPosPtr p = src->ToLogPosPtr();
> +  Assign(&p);
> +  return *this;
> +}
> +
> +inline int CompareLogPos(const LogPosPtr *pos1, const LogPosPtr *pos2) {
> +  int cmp = strcmp(pos1->file_name, pos2->file_name);
> +
> +  if (cmp != 0)
> +    return cmp;
> +
> +  if (pos1->file_pos > pos2->file_pos)
> +    return 1;
> +  else if (pos1->file_pos < pos2->file_pos)
> +    return -1;
> +  else
> +    return 0;
> +}
> +
>  /**
>    @class TranxNodeAllocator
>  
> @@ -329,10 +413,14 @@ class ActiveTranx
>                     node2->log_name_, node2->log_pos_);
>    }
>  
> +  void set_new_front(TranxNode* new_front);
> +
>  public:
>    ActiveTranx(mysql_mutex_t *lock, unsigned long trace_level);
>    ~ActiveTranx();
>  
> +  bool is_empty() const { return trx_front_ == NULL; }
> +
>    /* Insert an active transaction node with the specified position.
>     *
>     * Return:
> @@ -340,21 +428,42 @@ class ActiveTranx
>     */
>    int insert_tranx_node(const char *log_file_name, my_off_t log_file_pos);
>  
> -  /* Clear the active transaction nodes until(inclusive) the specified
> -   * position.
> -   * If log_file_name is NULL, everything will be cleared: the sorted
> +  /* Clear the active transaction
> +   * Everything will be cleared: the sorted
>     * list and the hash table will be reset to empty.
> -   * 
> +   *
>     * Return:
> -   *  0: success;  non-zero: error
> +   *   0 success; non-zero: error
> +   */
> +  int clear_active_tranx_nodes();
> +
> +  /* Prune the active transaction nodes until the specified
> +   * position (inclusive).
> +   *
> +   * Return:
> +   *   true  if any transaction was removed
> +   *   false if list was left unchanged
>     */
> -  int clear_active_tranx_nodes(const char *log_file_name,
> -                               my_off_t    log_file_pos);
> +  bool prune_active_tranx_nodes(LogPosPtr logpos,
> +                                ulonglong *oldest_tranx_commit_time_us);
>  
> -  /* Given a position, check to see whether the position is an active
> -   * transaction's ending position by probing the hash table.
> +  /* Lookup a transaction's ending position by probing the hash table.
> +   *
> +   * return  entry if found or NULL otherwise
>     */
> -  bool is_tranx_end_pos(const char *log_file_name, my_off_t log_file_pos);
> +  TranxNode* lookup_tranx_end_pos(const char *log_file_name,
> +                                  my_off_t log_file_pos);
> +
> +
> +  /* Check if an entry is rear (i.e. last) */
> +  bool is_rear(TranxNode* entry) const { return entry == trx_rear_; }
> +
> +  /**
> +   * return timestamp of oldest transaction in list
> +   */
> +  ulonglong get_oldest_tranx_commit_time_us() const {
> +    return trx_front_->tranx_commit_time_us;
> +  }
>  
>    /* Given two binlog positions, compare which one is bigger based on
>     * (file_name, file_position).
> @@ -365,6 +474,24 @@ class ActiveTranx
>  };
>  
>  /**
> + * State that semisync master keeps per slave
> + */
> +struct ReplSemiSyncMasterPerSlaveState
> +{
> +  ReplSemiSyncMasterPerSlaveState() : unacked_event_count_(0) {}
> +
> +  /**
> +   * Number of events that have not been semi-sync acked
> +   */
> +  unsigned unacked_event_count_;
> +
> +  /**
> +   * Position of last event that was semisync'ed
> +   */
> +  LogPos sync_req_pos_;
> +};
> +
> +/**
>     The extension class for the master of semi-synchronous replication
>  */
>  class ReplSemiSyncMaster
> @@ -432,6 +559,16 @@ class ReplSemiSyncMaster
>  
>    bool            state_;                    /* whether semi-sync is switched */
>  
> +  /* This cond variable is signaled when slave lag has decreased */
> +  mysql_cond_t COND_slave_lag_;
> +
> +  /* Mutex that protects oldest_unapplied_tranx_commit_time_us_ and the slave-lag wait counters */
> +  mysql_mutex_t LOCK_slave_lag_;
> +
> +  /* This is the commit time of the oldest transaction that has not been
> +   * applied on any slave */
> +  ulonglong oldest_unapplied_tranx_commit_time_us_;
> +
>    void lock();
>    void unlock();
>    void cond_broadcast();
> @@ -493,6 +630,9 @@ class ReplSemiSyncMaster
>    /* Is the slave servered by the thread requested semi-sync */
>    bool is_semi_sync_slave();
>  
> +  /* Does this slave have slave lag reporting capabilities */
> +  bool has_semi_sync_slave_lag();
> +
>    /* In semi-sync replication, reports up to which binlog position we have
>     * received replies from the slave indicating that it already get the events.
>     *
> @@ -501,13 +641,15 @@ class ReplSemiSyncMaster
>     *  log_file_name - (IN)  binlog file name
>     *  end_offset    - (IN)  the offset in the binlog file up to which we have
>     *                        the replies from the slave
> +   *  exec_position - (IN)  position of SQL thread or NULL if not present
>     *
>     * Return:
>     *  0: success;  non-zero: error
>     */
>    int reportReplyBinlog(uint32 server_id,
>                          const char* log_file_name,
> -                        my_off_t end_offset);
> +                        my_off_t end_offset,
> +                        const LogPos *exec_position);
>  
>    /* Commit a transaction in the final step.  This function is called from
>     * InnoDB before returning from the low commit.  If semi-sync is switch on,
> @@ -540,6 +682,16 @@ class ReplSemiSyncMaster
>     */
>    int reserveSyncHeader(unsigned char *header, unsigned long size);
>  
> +  /*
> +   * check if an event should be semi synced and optionally
> +   * if it should report back position of SQL thread on slave
> +   *
> +   * return 0 - no semi sync
> +   *        1 - semi sync
> +   *        2 - semi sync and report exec position
> +   */
> +  int checkSyncReq(const LogPosPtr *log_pos);
> +
>    /* Update the sync bit in the packet header to indicate to the slave whether
>     * the master will wait for the reply of the event.  If semi-sync is switched
>     * off and we detect that the slave is catching up, we switch semi-sync on.
> @@ -592,6 +744,21 @@ class ReplSemiSyncMaster
>     * go off for that.
>     */
>    int resetMaster();
> +
> +  /**
> +   * wake potential slave-lag waiters
> +   *   called by binlog dump-thread(s)
> +   */
> +  void wake_slave_lag_waiters(ulonglong oldest_unapplied_tranx_commit_time_us);
> +
> +  /**
> +   * wait for slave lag to get below threshold
> +   *   called by user-thread(s)
> +   *
> +   * return 0 - success
> +   *        1 - timeout or killed (error already reported to client)
> +   */
> +  int wait_slave_lag(ulong max_wait_time_sec);
>  };
>  
>  enum rpl_semi_sync_master_wait_point_t {
> @@ -621,6 +788,17 @@ extern unsigned long long rpl_semi_sync_master_trx_wait_num;
>  extern unsigned long long rpl_semi_sync_master_net_wait_time;
>  extern unsigned long long rpl_semi_sync_master_trx_wait_time;
>  
> +extern unsigned long rpl_semi_sync_master_max_unacked_event_count;
> +extern unsigned long rpl_semi_sync_master_max_unacked_event_bytes;
> +extern unsigned long rpl_semi_sync_master_max_slave_lag;
> +extern unsigned long rpl_semi_sync_master_slave_lag_heartbeat_frequency_us;
> +extern unsigned long rpl_semi_sync_master_slave_lag_wait_sessions;
> +extern unsigned long long rpl_semi_sync_master_estimated_slave_lag;
> +
> +extern unsigned long rpl_semi_sync_master_avg_trx_slave_lag_wait_time;
> +extern unsigned long long rpl_semi_sync_master_trx_slave_lag_wait_num;
> +extern unsigned long long rpl_semi_sync_master_trx_slave_lag_wait_time;
> +
>  /*
>    This indicates whether we should keep waiting if no semi-sync slave
>    is available.
> diff --git a/plugin/semisync/semisync_master_plugin.cc b/plugin/semisync/semisync_master_plugin.cc
> index 7bb0eea..3282158 100644
> --- a/plugin/semisync/semisync_master_plugin.cc
> +++ b/plugin/semisync/semisync_master_plugin.cc
> @@ -21,6 +21,11 @@
>  
>  static ReplSemiSyncMaster repl_semisync;
>  
> +// forward declaration
> +static inline ulong get_slave_lag_wait_timeout(THD* thd);
> +
> +static char rpl_semi_sync_master_group_commit = 0;
> +
>  C_MODE_START
>  
>  int repl_semi_report_binlog_update(Binlog_storage_param *param,
> @@ -31,6 +36,13 @@ int repl_semi_report_binlog_update(Binlog_storage_param *param,
>  
>    if (repl_semisync.getMasterEnabled())
>    {
> +    if (rpl_semi_sync_master_group_commit &&
> +        ((flags & BINLOG_GROUP_COMMIT_TRAILER) == 0))
> +    {
> +      /** there are more transactions coming... */
> +      return 0;
> +    }
> +
>      /*
>        Let us store the binlog file name and the position, so that
>        we know how long to wait for the binlog to the replicated to
> @@ -43,8 +55,11 @@ int repl_semi_report_binlog_update(Binlog_storage_param *param,
>    return error;
>  }
>  
> -int repl_semi_request_commit(Trans_param *param)
> +int repl_semi_before_commit(Trans_param *param, int *error)
>  {
> +  *error = repl_semisync.wait_slave_lag(
> +      get_slave_lag_wait_timeout(current_thd));
> +
>    return 0;
>  }
>  
> @@ -53,6 +68,14 @@ int repl_semi_report_binlog_sync(Binlog_storage_param *param,
>                                   my_off_t log_pos, uint32 flags)
>  {
>    int error= 0;
> +
> +  if (rpl_semi_sync_master_group_commit &&
> +      ((flags & BINLOG_GROUP_COMMIT_TRAILER) == 0))
> +  {
> +    /** there are more transactions coming... */
> +    return 0;
> +  }
> +
>    if (rpl_semi_sync_master_wait_point ==
>        SEMI_SYNC_MASTER_WAIT_POINT_AFTER_BINLOG_SYNC)
>    {
> @@ -100,7 +123,7 @@ int repl_semi_binlog_dump_start(Binlog_transmit_param *param,
>        Let's assume this semi-sync slave has already received all
>        binlog events before the filename and position it requests.
>      */
> -    repl_semisync.reportReplyBinlog(param->server_id, log_file, log_pos);
> +    repl_semisync.reportReplyBinlog(param->server_id, log_file, log_pos, NULL);
>    }
>    sql_print_information("Start %s binlog_dump to slave (server_id: %d), pos(%s, %lu)",
>  			semi_sync_slave ? "semi-sync" : "asynchronous",
> @@ -242,15 +265,72 @@ static MYSQL_SYSVAR_ULONG(trace_level, rpl_semi_sync_master_trace_level,
>    &fix_rpl_semi_sync_master_trace_level, // update
>    32, 0, ~0UL, 1);
>  
> +static MYSQL_SYSVAR_ULONG(max_unacked_event_count,
> +  rpl_semi_sync_master_max_unacked_event_count,
> +  PLUGIN_VAR_OPCMDARG,
> +  "Maximum unacked replication events",
> +  NULL, // check
> +  NULL, // update
> +  rpl_semi_sync_master_max_unacked_event_count, 0, ~0UL, 1);
> +
> +static MYSQL_SYSVAR_ULONG(max_unacked_event_bytes,
> +  rpl_semi_sync_master_max_unacked_event_bytes,
> +  PLUGIN_VAR_OPCMDARG,
> +  "Maximum unacked replication bytes",
> +  NULL, // check
> +  NULL, // update
> +  rpl_semi_sync_master_max_unacked_event_bytes, 0, ~0UL, 1);
> +
> +static MYSQL_SYSVAR_ULONG(max_slave_lag, rpl_semi_sync_master_max_slave_lag,
> +  PLUGIN_VAR_OPCMDARG,
> +  "Maximum allowed lag of fastest semi-sync slave (in seconds), "
> +  "checked before commit.",
> +  NULL, // check
> +  NULL, // update
> +  rpl_semi_sync_master_max_slave_lag, 0, ~0UL, 1);
> +
> +static MYSQL_THDVAR_ULONG(slave_lag_wait_timeout,
> +  PLUGIN_VAR_RQCMDARG,
> +  "Timeout in seconds a rw-transaction may wait for max slave lag before "
> +  "being rolled back.",
> +  NULL, NULL, 50, 1, 1024 * 1024 * 1024, 0);
> +
> +static MYSQL_SYSVAR_ULONG(
> +    slave_lag_heartbeat_frequency_us,
> +    rpl_semi_sync_master_slave_lag_heartbeat_frequency_us,
> +    PLUGIN_VAR_RQCMDARG,
> +    "Heartbeat frequency when slave-lag is enabled (in microseconds).",
> +    NULL, // check
> +    NULL, // update
> +    500000, /* 500 ms */
> +    1, ~0UL, 1);
> +
> +static MYSQL_SYSVAR_BOOL(group_commit, rpl_semi_sync_master_group_commit,
> +  PLUGIN_VAR_OPCMDARG,
> + "Group commit for semi sync",
> +  NULL, 			// check
> +  NULL,
> +  0);
> +
>  static SYS_VAR* semi_sync_master_system_vars[]= {
>    MYSQL_SYSVAR(enabled),
>    MYSQL_SYSVAR(wait_point),
>    MYSQL_SYSVAR(timeout),
>    MYSQL_SYSVAR(wait_no_slave),
>    MYSQL_SYSVAR(trace_level),
> +  MYSQL_SYSVAR(max_unacked_event_count),
> +  MYSQL_SYSVAR(max_unacked_event_bytes),
> +  MYSQL_SYSVAR(max_slave_lag),
> +  MYSQL_SYSVAR(slave_lag_wait_timeout),
> +  MYSQL_SYSVAR(slave_lag_heartbeat_frequency_us),
> +  MYSQL_SYSVAR(group_commit),
>    NULL,
>  };
>  
> +static inline ulong get_slave_lag_wait_timeout(THD* thd)
> +{
> +  return THDVAR(thd, slave_lag_wait_timeout);
> +}
>  
>  static void fix_rpl_semi_sync_master_timeout(MYSQL_THD thd,
>  				      SYS_VAR *var,
> @@ -297,6 +377,7 @@ Trans_observer trans_observer = {
>  
>    repl_semi_report_commit,	// after_commit
>    repl_semi_report_rollback,	// after_rollback
> +  repl_semi_before_commit,	// before commit
>  };
>  
>  Binlog_storage_observer storage_observer = {
> @@ -339,7 +420,11 @@ DEF_SHOW_FUNC(net_wait_time, SHOW_LONGLONG)
>  DEF_SHOW_FUNC(net_wait_num, SHOW_LONGLONG)
>  DEF_SHOW_FUNC(avg_net_wait_time, SHOW_LONG)
>  DEF_SHOW_FUNC(avg_trx_wait_time, SHOW_LONG)
> -
> +DEF_SHOW_FUNC(slave_lag_wait_sessions, SHOW_LONG)
> +DEF_SHOW_FUNC(estimated_slave_lag, SHOW_LONGLONG)
> +DEF_SHOW_FUNC(trx_slave_lag_wait_time, SHOW_LONGLONG)
> +DEF_SHOW_FUNC(trx_slave_lag_wait_num, SHOW_LONGLONG)
> +DEF_SHOW_FUNC(avg_trx_slave_lag_wait_time, SHOW_LONG)
>  
>  /* plugin status variables */
>  static SHOW_VAR semi_sync_master_status_vars[]= {
> @@ -385,32 +470,55 @@ static SHOW_VAR semi_sync_master_status_vars[]= {
>    {"Rpl_semi_sync_master_net_avg_wait_time",
>     (char*) &SHOW_FNAME(avg_net_wait_time),
>     SHOW_SIMPLE_FUNC},
> +  {"Rpl_semi_sync_master_slave_lag_wait_sessions",
> +   (char*) &SHOW_FNAME(slave_lag_wait_sessions),
> +   SHOW_SIMPLE_FUNC},
> +  {"Rpl_semi_sync_master_estimated_slave_lag",
> +   (char*) &SHOW_FNAME(estimated_slave_lag),
> +   SHOW_SIMPLE_FUNC},
> +  {"Rpl_semi_sync_master_tx_slave_lag_wait_time",
> +   (char*) &SHOW_FNAME(trx_slave_lag_wait_time),
> +   SHOW_SIMPLE_FUNC},
> +  {"Rpl_semi_sync_master_tx_slave_lag_waits",
> +   (char*) &SHOW_FNAME(trx_slave_lag_wait_num),
> +   SHOW_SIMPLE_FUNC},
> +  {"Rpl_semi_sync_master_tx_avg_slave_lag_wait_time",
> +   (char*) &SHOW_FNAME(avg_trx_slave_lag_wait_time),
> +   SHOW_SIMPLE_FUNC},
>    {NULL, NULL, SHOW_LONG},
>  };
>  
>  #ifdef HAVE_PSI_INTERFACE
>  PSI_mutex_key key_ss_mutex_LOCK_binlog_;
> +PSI_mutex_key key_ss_mutex_LOCK_slave_lag_;
>  
>  static PSI_mutex_info all_semisync_mutexes[]=
>  {
> -  { &key_ss_mutex_LOCK_binlog_, "LOCK_binlog_", 0}
> +  { &key_ss_mutex_LOCK_binlog_, "LOCK_binlog_", 0 },
> +  { &key_ss_mutex_LOCK_slave_lag_, "LOCK_slave_lag_", 0 }
>  };
>  
>  PSI_cond_key key_ss_cond_COND_binlog_send_;
> +PSI_cond_key key_ss_cond_COND_slave_lag_;
>  
>  static PSI_cond_info all_semisync_conds[]=
>  {
> -  { &key_ss_cond_COND_binlog_send_, "COND_binlog_send_", 0}
> +  { &key_ss_cond_COND_binlog_send_, "COND_binlog_send_", 0 },
> +  { &key_ss_cond_COND_slave_lag_, "COND_slave_lag_", 0 }
>  };
>  #endif /* HAVE_PSI_INTERFACE */
>  
>  PSI_stage_info stage_waiting_for_semi_sync_ack_from_slave=
>  { 0, "Waiting for semi-sync ACK from slave", 0};
>  
> +PSI_stage_info stage_waiting_for_semi_sync_slave_lag=
> +{ 0, "Waiting for semi-sync slave lag", 0};
> +
>  #ifdef HAVE_PSI_INTERFACE
>  PSI_stage_info *all_semisync_stages[]=
>  {
> -  & stage_waiting_for_semi_sync_ack_from_slave
> +  & stage_waiting_for_semi_sync_ack_from_slave,
> +  & stage_waiting_for_semi_sync_slave_lag
>  };
>  
>  static void init_semisync_psi_keys(void)
> @@ -492,4 +600,3 @@ maria_declare_plugin(semisync_master)
>    MariaDB_PLUGIN_MATURITY_GAMMA
>  }
>  maria_declare_plugin_end;
> -
> diff --git a/plugin/semisync/semisync_slave.cc b/plugin/semisync/semisync_slave.cc
> index 5f98472..839e0cc 100644
> --- a/plugin/semisync/semisync_slave.cc
> +++ b/plugin/semisync/semisync_slave.cc
> @@ -20,6 +20,7 @@
>  char rpl_semi_sync_slave_enabled;
>  char rpl_semi_sync_slave_status= 0;
>  unsigned long rpl_semi_sync_slave_trace_level;
> +char rpl_semi_sync_slave_lag_enabled= 0;
>  
>  int ReplSemiSyncSlave::initObject()
>  {
> @@ -42,7 +43,7 @@ int ReplSemiSyncSlave::initObject()
>  
>  int ReplSemiSyncSlave::slaveReadSyncHeader(const char *header,
>                                        unsigned long total_len,
> -                                      bool  *need_reply,
> +                                      unsigned char *need_reply,
>                                        const char **payload,
>                                        unsigned long *payload_len)
>  {
> @@ -52,7 +53,7 @@ int ReplSemiSyncSlave::slaveReadSyncHeader(const char *header,
>  
>    if ((unsigned char)(header[0]) == kPacketMagicNum)
>    {
> -    *need_reply  = (header[1] & kPacketFlagSync);
> +    *need_reply  = (header[1] & (kPacketFlagSync | kPacketFlagSyncAndReport));
>      *payload_len = total_len - 2;
>      *payload     = header + 2;
>  
> @@ -95,16 +96,20 @@ int ReplSemiSyncSlave::slaveStop(Binlog_relay_IO_param *param)
>    return 0;
>  }
>  
> -int ReplSemiSyncSlave::slaveReply(MYSQL *mysql,
> -                                 const char *binlog_filename,
> -                                 my_off_t binlog_filepos)
> +int ReplSemiSyncSlave::slaveReply(unsigned char header_byte,
> +                                  MYSQL *mysql,
> +                                  const char *binlog_filename,
> +                                  my_off_t binlog_filepos,
> +                                  Master_info * mi)
>  {
>    const char *kWho = "ReplSemiSyncSlave::slaveReply";
>    NET *net= &mysql->net;
> -  uchar reply_buffer[REPLY_MAGIC_NUM_LEN
> -                     + REPLY_BINLOG_POS_LEN
> -                     + REPLY_BINLOG_NAME_LEN];
> +  uchar reply_buffer[REPLY_MAGIC_NUM_LEN +
> +                     2 * ( REPLY_BINLOG_POS_LEN +
> +                           REPLY_BINLOG_NAME_LEN +
> +                           /* '\0' */ 1) ];
>    int  reply_res, name_len = strlen(binlog_filename);
> +  int  msg_len = name_len + REPLY_BINLOG_NAME_OFFSET;
>  
>    function_enter(kWho);
>  
> @@ -119,10 +124,29 @@ int ReplSemiSyncSlave::slaveReply(MYSQL *mysql,
>      sql_print_information("%s: reply (%s, %lu)", kWho,
>                            binlog_filename, (ulong)binlog_filepos);
>  
> +  if (header_byte & kPacketFlagSyncAndReport)
> +  {
> +    /**
> +     * master requests that we also report back SQL-thread position
> +     */
> +
> +    // where to store sql filename/position
> +    char *bufptr = (char*)reply_buffer + msg_len;
> +    bufptr[0] = 0; // '\0' terminate previous filename
> +    bufptr++;
> +
> +    my_off_t sql_file_pos;
> +    // get file/position and store the filename directly into bufptr+8
> +    size_t name_len2 = get_master_log_pos(mi, bufptr + 8, &sql_file_pos);
> +    int8store(bufptr, sql_file_pos); // store position
> +
> +    msg_len += /* '\0' */ 1 + /* position */ 8 + name_len2 + /* '\0' */ 1;
> +  }
> +
>    net_clear(net, 0);
>    /* Send the reply. */
> -  reply_res = my_net_write(net, reply_buffer,
> -                           name_len + REPLY_BINLOG_NAME_OFFSET);
> +  reply_res = my_net_write(net, reply_buffer, msg_len);
> +
>    if (!reply_res)
>    {
>      reply_res = net_flush(net);
> diff --git a/plugin/semisync/semisync_slave.h b/plugin/semisync/semisync_slave.h
> index 1bf8cf3..c91847d 100644
> --- a/plugin/semisync/semisync_slave.h
> +++ b/plugin/semisync/semisync_slave.h
> @@ -60,23 +60,30 @@ class ReplSemiSyncSlave
>     * Return:
>     *  0: success;  non-zero: error
>     */
> -  int slaveReadSyncHeader(const char *header, unsigned long total_len, bool *need_reply,
> +  int slaveReadSyncHeader(const char *header, unsigned long total_len,
> +                          unsigned char *need_reply_byte,
>                            const char **payload, unsigned long *payload_len);
>  
>    /* A slave replies to the master indicating its replication process.  It
>     * indicates that the slave has received all events before the specified
>     * binlog position.
> -   * 
> +   *
>     * Input:
> +   *  need_reply_byte  - (IN)  the header byte
>     *  mysql            - (IN)  the mysql network connection
>     *  binlog_filename  - (IN)  the reply point's binlog file name
>     *  binlog_filepos   - (IN)  the reply point's binlog file offset
> +   *  master_info      - (IN)  the master info struct so that we can get more
> +   *                           info if needed
>     *
>     * Return:
>     *  0: success;  non-zero: error
>     */
> -  int slaveReply(MYSQL *mysql, const char *binlog_filename,
> -                 my_off_t binlog_filepos);
> +  int slaveReply(unsigned char need_reply_byte,
> +                 MYSQL *mysql,
> +                 const char *binlog_filename,
> +                 my_off_t binlog_filepos,
> +                 Master_info* master_info);
>  
>    int slaveStart(Binlog_relay_IO_param *param);
>    int slaveStop(Binlog_relay_IO_param *param);
> @@ -93,5 +100,6 @@ class ReplSemiSyncSlave
>  extern char rpl_semi_sync_slave_enabled;
>  extern unsigned long rpl_semi_sync_slave_trace_level;
>  extern char rpl_semi_sync_slave_status;
> +extern char rpl_semi_sync_slave_lag_enabled;
>  
>  #endif /* SEMISYNC_SLAVE_H */
> diff --git a/plugin/semisync/semisync_slave_plugin.cc b/plugin/semisync/semisync_slave_plugin.cc
> index 572ead2..0bf03be 100644
> --- a/plugin/semisync/semisync_slave_plugin.cc
> +++ b/plugin/semisync/semisync_slave_plugin.cc
> @@ -28,7 +28,7 @@ static ReplSemiSyncSlave repl_semisync;
>    event read is the last event of a transaction. And the value is
>    checked in repl_semi_slave_queue_event.
>  */
> -bool semi_sync_need_reply= false;
> +unsigned char semi_sync_need_reply= 0;
>  
>  C_MODE_START
>  
> @@ -81,6 +81,23 @@ int repl_semi_slave_request_dump(Binlog_relay_IO_param *param,
>      return 1;
>    }
>    mysql_free_result(mysql_store_result(mysql));
> +
> +  if (rpl_semi_sync_slave_lag_enabled)
> +  {
> +    char buf[100];
> +    /*
> +      Tell master that we can do exec-position reporting
> +    */
> +    snprintf(buf, sizeof(buf), "SET @%s= 1",
> +             ReplSemiSyncBase::kRplSemiSyncSlaveReportExec);
> +    if (mysql_real_query(mysql, buf, strlen(buf)))
> +    {
> +      sql_print_error("query: %s on master failed", buf);
> +      return 1;
> +    }
> +    mysql_free_result(mysql_store_result(mysql));
> +  }
> +
>    rpl_semi_sync_slave_status= 1;
>    return 0;
>  }
> @@ -110,9 +127,11 @@ int repl_semi_slave_queue_event(Binlog_relay_IO_param *param,
>        should not cause the slave IO thread to stop, and the error
>        messages are already reported.
>      */
> -    (void) repl_semisync.slaveReply(param->mysql,
> +    (void) repl_semisync.slaveReply(semi_sync_need_reply,
> +                                    param->mysql,
>                                      param->master_log_name,
> -                                    param->master_log_pos);
> +                                    param->master_log_pos,
> +                                    param->mi);
>    }
>    return 0;
>  }
> @@ -164,9 +183,17 @@ static MYSQL_SYSVAR_ULONG(trace_level, rpl_semi_sync_slave_trace_level,
>    &fix_rpl_semi_sync_trace_level, // update
>    32, 0, ~0UL, 1);
>  
> +static MYSQL_SYSVAR_BOOL(lag_enabled, rpl_semi_sync_slave_lag_enabled,
> +  PLUGIN_VAR_OPCMDARG,
> +  "Enable semi-synchronous replication slave lag reporting. ",
> +  NULL, // check
> +  NULL, // update
> +  0);
> +
>  static SYS_VAR* semi_sync_slave_system_vars[]= {
>    MYSQL_SYSVAR(enabled),
>    MYSQL_SYSVAR(trace_level),
> +  MYSQL_SYSVAR(lag_enabled),
>    NULL,
>  };
>  
> @@ -230,4 +257,3 @@ maria_declare_plugin(semisync_slave)
>    MariaDB_PLUGIN_MATURITY_GAMMA
>  }
>  maria_declare_plugin_end;
> -
> diff --git a/sql/handler.cc b/sql/handler.cc
> index 3ca9ec3..3e6cd65 100644
> --- a/sql/handler.cc
> +++ b/sql/handler.cc
> @@ -1364,12 +1364,30 @@ int ha_commit_trans(THD *thd, bool all)
>    uint rw_ha_count= ha_check_and_coalesce_trx_read_only(thd, ha_info, all);
>    /* rw_trans is TRUE when we in a transaction changing data */
>    bool rw_trans= is_real_trans && (rw_ha_count > 0);
> +  bool mdl_request_initialized= false;
>    MDL_request mdl_request;
>    DBUG_PRINT("info", ("is_real_trans: %d  rw_trans:  %d  rw_ha_count: %d",
>                        is_real_trans, rw_trans, rw_ha_count));
>  
>    if (rw_trans)
>    {
> +    /* Check read-only mode before running the before_commit hook, so that
> +     * threads are less likely to wait for slave lag only to be aborted
> +     * afterwards because the server is read-only.
> +     */
> +    if (opt_readonly &&
> +        !(thd->security_ctx->master_access & SUPER_ACL) &&
> +        !thd->slave_thread)
> +    {
> +      my_error(ER_OPTION_PREVENTS_STATEMENT, MYF(0), "--read-only");
> +      goto err;
> +    }
> +
> +    if (RUN_HOOK(transaction, before_commit, (thd)))
> +    {
> +      goto err;
> +    }
> +
>      /*
>        Acquire a metadata lock which will ensure that COMMIT is blocked
>        by an active FLUSH TABLES WITH READ LOCK (and vice versa:
> @@ -1378,6 +1396,7 @@ int ha_commit_trans(THD *thd, bool all)
>        We allow the owner of FTWRL to COMMIT; we assume that it knows
>        what it does.
>      */
> +    mdl_request_initialized= true;
>      mdl_request.init(MDL_key::COMMIT, "", "", MDL_INTENTION_EXCLUSIVE,
>                       MDL_EXPLICIT);
>  
> @@ -1486,7 +1505,7 @@ int ha_commit_trans(THD *thd, bool all)
>      ha_rollback_trans(thd, all);
>  
>  end:
> -  if (rw_trans && mdl_request.ticket)
> +  if (rw_trans && mdl_request_initialized && mdl_request.ticket)
>    {
>      /*
>        We do not always immediately release transactional locks
> diff --git a/sql/replication.h b/sql/replication.h
> index 4731c22..309bdb4 100644
> --- a/sql/replication.h
> +++ b/sql/replication.h
> @@ -110,6 +110,21 @@ typedef struct Trans_observer {
>       @retval 1 Failure
>    */
>    int (*after_rollback)(Trans_param *param);
> +
> +  /**
> +     This callback is called before a transaction is committed.
> +     If the callback sets *error to a non-zero value, the transaction is
> +     not committed and that error code is returned to the client.
> +
> +     @note Setting *error != 0 and returning 0 is how a plugin signals
> +     that the transaction should be aborted.
> +     If the callback returns non-zero, the transaction is also aborted and
> +     an error is printed to the error log.
> +
> +     @retval 0 Success
> +     @retval non-zero error
> +  */
> +  int (*before_commit)(Trans_param *param, int *error);
>  } Trans_observer;
>  
>  /**
> @@ -294,6 +309,8 @@ enum Binlog_relay_IO_flags {
>  };
>  
>  
> +class Master_info;
> +
>  /**
>    Replication binlog relay IO observer parameter
>  */
> @@ -309,8 +326,20 @@ typedef struct Binlog_relay_IO_param {
>    my_off_t master_log_pos;
>  
>    MYSQL *mysql;                        /* the connection to master */
> +
> +  Master_info * mi;                    /* master info handle */
>  } Binlog_relay_IO_param;
>  
> +
> +/* Get the master binlog file name/position up to which the slave SQL
> + * thread has executed (for the given Master_info) and store it in
> + * filename_buf/filepos. Returns the length of the file name (excluding '\0').
> + *
> + * Note: filename_buf must be at least FN_REFLEN bytes long.
> + */
> +size_t get_master_log_pos(const Master_info *mi,
> +                          char *filename_buf, my_off_t *filepos);
> +
>  /**
>     Observes and extends the service of slave IO thread.
>  */
> @@ -561,7 +590,21 @@ int get_user_var_str(const char *name,
>                       char *value, unsigned long len,
>                       unsigned int precision, int *null_value);
>  
> -  
> +
> +/**
> +   Set or replace the value of a user variable as a long long integer
> +
> +   @param name      user variable name
> +   @param value     the value
> +   @param old_value pointer to where old value will be stored (or NULL)
> +
> +   @retval  0 Success, no prior value found
> +   @retval  1 Success, old_value populated
> +   @retval -1 Fail
> +*/
> +int set_user_var_int(const char *name,
> +                     long long int value,
> +                     long long int *old_value);
>  
>  #ifdef __cplusplus
>  }
> diff --git a/sql/rpl_handler.cc b/sql/rpl_handler.cc
> index 3962600..209a809 100644
> --- a/sql/rpl_handler.cc
> +++ b/sql/rpl_handler.cc
> @@ -23,6 +23,7 @@
>  #include "rpl_filter.h"
>  #include <my_dir.h>
>  #include "rpl_handler.h"
> +#include "sql_prepare.h"
>  
>  Trans_delegate *transaction_delegate;
>  Binlog_storage_delegate *binlog_storage_delegate;
> @@ -88,6 +89,42 @@ int get_user_var_str(const char *name, char *value,
>    return 0;
>  }
>  
> +int set_user_var_int(const char *name,
> +                     long long int value,
> +                     long long int *old_value)
> +{
> +  THD* thd= current_thd;
> +  bool null_val;
> +  user_var_entry *entry=
> +      (user_var_entry*) my_hash_search(&thd->user_vars,
> +                                       (uchar*) name, strlen(name));
> +  if (entry != NULL)
> +  {
> +    if (old_value != NULL)
> +      *old_value= entry->val_int(&null_val);
> +  }
> +
> +  Ed_connection con(thd);
> +
> +  char buf[256];
> +  int res= snprintf(buf, sizeof(buf), "SET @%s=%lld", name, value);
> +  if (/* error */ res < 0 ||
> +      /* truncated */ res >= sizeof(buf))
> +  {
> +    return -1;
> +  }
> +
> +  LEX_STRING str;
> +  lex_string_set(&str, buf);
> +
> +  if (con.execute_direct(str))
> +  {
> +    return -1;
> +  }
> +
> +  return entry == NULL ? 0 : 1;
> +}
> +
>  int delegates_init()
>  {
>    static my_aligned_storage<sizeof(Trans_delegate), MY_ALIGNOF(long)> trans_mem;
> @@ -249,6 +286,17 @@ int Trans_delegate::after_rollback(THD *thd, bool all)
>    return ret;
>  }
>  
> +int Trans_delegate::before_commit(THD *thd)
> +{
> +  int ret= 0, error= 0;
> +  Trans_param param;
> +  param.flags= 0;
> +  param.log_file= 0;
> +  param.log_pos= 0;
> +  FOREACH_OBSERVER(ret, before_commit, thd, (&param, &error));
> +  return error;
> +}
> +
>  int Binlog_storage_delegate::after_flush(THD *thd,
>                                           const char *log_file,
>                                           my_off_t log_pos,
> @@ -374,17 +422,19 @@ int Binlog_transmit_delegate::reserve_header(THD *thd, ushort flags,
>  
>  int Binlog_transmit_delegate::before_send_event(THD *thd, ushort flags,
>                                                  String *packet,
> -                                                const char *log_file,
> +                                                const char *log_file_path,
>                                                  my_off_t log_pos)
>  {
>    Binlog_transmit_param param;
>    param.flags= flags;
>  
>    int ret= 0;
> +  const char* log_file_name= log_file_path != NULL ?
> +      log_file_path + dirname_length(log_file_path) : NULL;
>    FOREACH_OBSERVER(ret, before_send_event, false,
>                     (&param, (uchar *)packet->c_ptr(),
>                      packet->length(),
> -                    log_file+dirname_length(log_file), log_pos));
> +                    log_file_name, log_pos));
>    return ret;
>  }
>  
> @@ -414,6 +464,7 @@ int Binlog_transmit_delegate::after_reset_master(THD *thd, ushort flags)
>  void Binlog_relay_IO_delegate::init_param(Binlog_relay_IO_param *param,
>                                            Master_info *mi)
>  {
> +  param->mi = mi;
>    param->mysql= mi->mysql;
>    param->user= mi->user;
>    param->host= mi->host;
> @@ -540,6 +591,20 @@ int unregister_binlog_relay_io_observer(Binlog_relay_IO_observer *observer, void
>  {
>    return binlog_relay_io_delegate->remove_observer(observer, (st_plugin_int *)p);
>  }
> +
> +/* get master log pos for a Master_info struct */
> +size_t get_master_log_pos(const Master_info* mi,
> +                          char *filename_buf, my_off_t *filepos)
> +{
> +  mysql_mutex_t *mutex= &mi->rli.data_lock;
> +
> +  mysql_mutex_lock(mutex);
> +  *filepos= mi->rli.group_master_log_pos;
> +  strncpy(filename_buf, mi->rli.group_master_log_name, FN_REFLEN);
> +  mysql_mutex_unlock(mutex);
> +  return strnlen(filename_buf, FN_REFLEN);
> +}
> +
>  #else
>  int register_binlog_transmit_observer(Binlog_transmit_observer *observer, void *p)
>  {
> @@ -560,4 +625,13 @@ int unregister_binlog_relay_io_observer(Binlog_relay_IO_observer *observer, void
>  {
>    return 0;
>  }
> +
> +size_t get_master_log_pos(const Master_info* mi,
> +                          char *filename_buf, my_off_t *filepos)
> +{
> +  *filepos= 0;
> +  filename_buf[0]= 0;
> +  return 0;
> +}
> +
>  #endif /* HAVE_REPLICATION */
> diff --git a/sql/rpl_handler.h b/sql/rpl_handler.h
> index afcfd9d..5119ee4 100644
> --- a/sql/rpl_handler.h
> +++ b/sql/rpl_handler.h
> @@ -142,7 +142,7 @@ class Trans_delegate
>    :public Delegate {
>  public:
>    typedef Trans_observer Observer;
> -  int before_commit(THD *thd, bool all);
> +  int before_commit(THD *thd);
>    int before_rollback(THD *thd, bool all);
>    int after_commit(THD *thd, bool all);
>    int after_rollback(THD *thd, bool all);
> diff --git a/sql/rpl_rli.h b/sql/rpl_rli.h
> index 56300c6..d60a122 100644
> --- a/sql/rpl_rli.h
> +++ b/sql/rpl_rli.h
> @@ -154,7 +154,9 @@ class Relay_log_info : public Slave_reporting_capability
>      standard lock acquisition order to avoid deadlocks:
>      run_lock, data_lock, relay_log.LOCK_log, relay_log.LOCK_index
>    */
> -  mysql_mutex_t data_lock, run_lock;
> +  mutable mysql_mutex_t data_lock;
> +  mysql_mutex_t run_lock;
> +
>    /*
>      start_cond is broadcast when SQL thread is started
>      stop_cond - when stopped
> diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc
> index d9ae6ca..9662058 100644
> --- a/sql/sql_repl.cc
> +++ b/sql/sql_repl.cc
> @@ -818,6 +818,13 @@ static int send_heartbeat_event(binlog_send_info *info,
>      packet->append(b, sizeof(b));
>    }
>  
> +  if (RUN_HOOK(binlog_transmit, before_send_event,
> +               (info->thd, info->flags, packet, 0, 0)))
> +  {
> +    info->error= ER_UNKNOWN_ERROR;
> +    DBUG_RETURN(-1);
> +  }
> +
>    if (my_net_write(net, (uchar*) packet->ptr(), packet->length()) ||
>        net_flush(net))
>    {
> @@ -825,6 +832,13 @@ static int send_heartbeat_event(binlog_send_info *info,
>      DBUG_RETURN(-1);
>    }
>  
> +  if (RUN_HOOK(binlog_transmit, after_send_event,
> +               (info->thd, info->flags, packet)))
> +  {
> +    info->error= ER_UNKNOWN_ERROR;
> +    DBUG_RETURN(-1);
> +  }
> +
>    DBUG_RETURN(0);
>  }
>