← Back to team overview

maria-developers team mailing list archive

Fwd: [andrei.elkin@xxxxxxxxxxx] 30a3ee1b8ce: MDEV-21469: Implement crash-safe logging of the user XA

 

Kristian,

Fyi, here is the XA replication event recovery part.
It's largely coded by Sujatha Sivakumar. The latest patch resides in bb-10.5-MDEV_21469.

Cheers,

Andrei

revision-id: 30a3ee1b8ce98ea33dfc0595207bf467cdb91def (mariadb-10.5.0-285-g30a3ee1b8ce)
parent(s): 77f4a1938f2a069174e07b3453fa0f99ba171a2e
author: Sujatha
committer: Andrei Elkin
timestamp: 2020-03-04 14:45:01 +0200
message:

MDEV-21469: Implement crash-safe logging of the user XA

Description: Make XA PREPARE, XA COMMIT and XA ROLLBACK statements crash-safe.

Implementation:
In order to ensure consistent replication, XA statements like XA PREPARE, XA
COMMIT and XA ROLLBACK are first written to the binary log and then to the
storage engine. If the server crashes after writing to the binary log but
before writing to the storage engine, the two are left in an inconsistent state.

In order to make the binary log and the engine consistent, crash recovery
needs to be initiated. During crash recovery the binary log is parsed to
identify the transactions which are present only in the binary log and not
in the engine. These transactions are resubmitted to the engine to make it
consistent with the binary log.

---
 .../r/binlog_xa_multi_binlog_crash_recovery.result |  40 +++
 .../t/binlog_xa_multi_binlog_crash_recovery.test   |  85 +++++
 .../suite/rpl/r/rpl_xa_commit_crash_safe.result    |  50 +++
 .../suite/rpl/r/rpl_xa_event_apply_failure.result  |  60 ++++
 .../rpl/r/rpl_xa_prepare_commit_prepare.result     |  48 +++
 .../suite/rpl/r/rpl_xa_prepare_crash_safe.result   |  62 ++++
 .../rpl/r/rpl_xa_rollback_commit_crash_safe.result |  47 +++
 .../suite/rpl/t/rpl_xa_commit_crash_safe.test      |  98 ++++++
 .../suite/rpl/t/rpl_xa_event_apply_failure.test    | 119 +++++++
 .../suite/rpl/t/rpl_xa_prepare_commit_prepare.test |  95 ++++++
 .../suite/rpl/t/rpl_xa_prepare_crash_safe.test     | 117 +++++++
 .../rpl/t/rpl_xa_rollback_commit_crash_safe.test   |  97 ++++++
 sql/handler.cc                                     |  29 +-
 sql/handler.h                                      |  12 +-
 sql/log.cc                                         | 379 +++++++++++++++++++--
 sql/log.h                                          |   8 +-
 sql/log_event.cc                                   |  18 +-
 sql/log_event.h                                    |  15 +-
 sql/mysqld.cc                                      |   4 +-
 sql/xa.cc                                          |   4 +
 20 files changed, 1353 insertions(+), 34 deletions(-)

diff --git a/mysql-test/suite/binlog/r/binlog_xa_multi_binlog_crash_recovery.result b/mysql-test/suite/binlog/r/binlog_xa_multi_binlog_crash_recovery.result
new file mode 100644
index 00000000000..a09472aa94c
--- /dev/null
+++ b/mysql-test/suite/binlog/r/binlog_xa_multi_binlog_crash_recovery.result
@@ -0,0 +1,40 @@
+RESET MASTER;
+CREATE TABLE t1 (a INT PRIMARY KEY, b MEDIUMTEXT) ENGINE=Innodb;
+CALL mtr.add_suppression("Found 1 prepared XA transactions");
+connect con1,localhost,root,,;
+SET DEBUG_SYNC= "simulate_hang_after_binlog_prepare SIGNAL con1_ready WAIT_FOR con1_go";
+SET GLOBAL DEBUG_DBUG="d,simulate_crash_after_binlog_prepare";
+XA START 'xa1';
+INSERT INTO t1 SET a=1;
+XA END 'xa1';
+XA PREPARE 'xa1';;
+connection default;
+SET DEBUG_SYNC= "now WAIT_FOR con1_ready";
+FLUSH LOGS;
+FLUSH LOGS;
+FLUSH LOGS;
+show binary logs;
+Log_name	File_size
+master-bin.000001	#
+master-bin.000002	#
+master-bin.000003	#
+master-bin.000004	#
+include/show_binlog_events.inc
+Log_name	Pos	Event_type	Server_id	End_log_pos	Info
+master-bin.000004	#	Format_desc	#	#	SERVER_VERSION, BINLOG_VERSION
+master-bin.000004	#	Gtid_list	#	#	[#-#-#]
+master-bin.000004	#	Binlog_checkpoint	#	#	master-bin.000001
+SET DEBUG_SYNC= "now SIGNAL con1_go";
+connection con1;
+ERROR HY000: Lost connection to MySQL server during query
+connection default;
+XA RECOVER;
+formatID	gtrid_length	bqual_length	data
+1	3	0	xa1
+XA COMMIT 'xa1';
+SELECT * FROM t1;
+a	b
+1	NULL
+connection default;
+DROP TABLE t1;
+SET debug_sync = 'reset';
diff --git a/mysql-test/suite/binlog/t/binlog_xa_multi_binlog_crash_recovery.test b/mysql-test/suite/binlog/t/binlog_xa_multi_binlog_crash_recovery.test
new file mode 100644
index 00000000000..aa8d3d04fc7
--- /dev/null
+++ b/mysql-test/suite/binlog/t/binlog_xa_multi_binlog_crash_recovery.test
@@ -0,0 +1,85 @@
+# ==== Purpose ====
+#
+# Test verifies that XA crash recovery works fine across multiple binary logs.
+#
+# ==== Implementation ====
+#
+# Steps:
+#    0 - Generate an explicit XA transaction. Using debug simulation hold the
+#        execution of XA PREPARE statement after the XA PREPARE is written to
+#        the binary log. With this the prepare will not be done in engine.
+#    1 - By executing FLUSH LOGS generate multiple binary logs.
+#    2 - Now make the server disappear (crash) at this point.
+#    3 - Restart the server. During recovery the XA PREPARE from the binary
+#        log will be read. It is cross checked with engine. Since it is not
+#        present in engine it will be executed once again.
+#    4 - When server is up execute XA RECOVER to check that the XA is
+#        prepared in engine as well.
+#    5 - XA COMMIT the transaction and check the validity of the data.
+#
+# ==== References ====
+#
+# MDEV-21469: Implement crash-safe logging of the user XA
+#
+
+--source include/have_innodb.inc
+--source include/have_debug.inc
+--source include/have_debug_sync.inc
+--source include/have_log_bin.inc
+
+RESET MASTER;
+
+CREATE TABLE t1 (a INT PRIMARY KEY, b MEDIUMTEXT) ENGINE=Innodb;
+CALL mtr.add_suppression("Found 1 prepared XA transactions");
+
+connect(con1,localhost,root,,);
+SET DEBUG_SYNC= "simulate_hang_after_binlog_prepare SIGNAL con1_ready WAIT_FOR con1_go";
+SET GLOBAL DEBUG_DBUG="d,simulate_crash_after_binlog_prepare";
+XA START 'xa1';
+INSERT INTO t1 SET a=1;
+XA END 'xa1';
+--send XA PREPARE 'xa1';
+
+connection default;
+SET DEBUG_SYNC= "now WAIT_FOR con1_ready";
+FLUSH LOGS;
+FLUSH LOGS;
+FLUSH LOGS;
+
+--source include/show_binary_logs.inc
+--let $binlog_file= master-bin.000004
+--let $binlog_start= 4
+--source include/show_binlog_events.inc
+
+--write_file $MYSQLTEST_VARDIR/tmp/mysqld.1.expect
+wait
+EOF
+
+SET DEBUG_SYNC= "now SIGNAL con1_go";
+--source include/wait_until_disconnected.inc
+
+--connection con1
+--error 2013
+--reap
+--source include/wait_until_disconnected.inc
+
+#
+# Server restart
+#
+--append_file $MYSQLTEST_VARDIR/tmp/mysqld.1.expect
+restart
+EOF
+
+connection default;
+--enable_reconnect
+--source include/wait_until_connected_again.inc
+
+XA RECOVER;
+XA COMMIT 'xa1';
+
+SELECT * FROM t1;
+
+# Clean up.
+connection default;
+DROP TABLE t1;
+SET debug_sync = 'reset';
diff --git a/mysql-test/suite/rpl/r/rpl_xa_commit_crash_safe.result b/mysql-test/suite/rpl/r/rpl_xa_commit_crash_safe.result
new file mode 100644
index 00000000000..27d043270ea
--- /dev/null
+++ b/mysql-test/suite/rpl/r/rpl_xa_commit_crash_safe.result
@@ -0,0 +1,50 @@
+include/master-slave.inc
+[connection master]
+connect  master2,localhost,root,,;
+connection master;
+CALL mtr.add_suppression("Found 1 prepared XA transactions");
+CREATE TABLE t ( f INT ) ENGINE=INNODB;
+XA START 'xa1';
+INSERT INTO t VALUES (20);
+XA END 'xa1';
+XA PREPARE 'xa1';
+XA COMMIT 'xa1';
+connection slave;
+include/stop_slave.inc
+connection master1;
+XA START 'xa2';
+INSERT INTO t VALUES (40);
+XA END 'xa2';
+XA PREPARE 'xa2';
+SET GLOBAL DEBUG_DBUG="d,simulate_crash_after_binlog_commit";
+XA COMMIT 'xa2';
+ERROR HY000: Lost connection to MySQL server during query
+connection master1;
+connection master;
+connection default;
+connection server_1;
+connection master;
+connection slave;
+include/start_slave.inc
+connection master;
+SELECT * FROM t;
+f
+20
+40
+XA RECOVER;
+formatID	gtrid_length	bqual_length	data
+XA COMMIT 'xa2';
+ERROR XAE04: XAER_NOTA: Unknown XID
+SELECT * FROM t;
+f
+20
+40
+connection slave;
+SELECT * FROM t;
+f
+20
+40
+connection master;
+DROP TABLE t;
+connection slave;
+include/rpl_end.inc
diff --git a/mysql-test/suite/rpl/r/rpl_xa_event_apply_failure.result b/mysql-test/suite/rpl/r/rpl_xa_event_apply_failure.result
new file mode 100644
index 00000000000..547a0aae9a6
--- /dev/null
+++ b/mysql-test/suite/rpl/r/rpl_xa_event_apply_failure.result
@@ -0,0 +1,60 @@
+include/master-slave.inc
+[connection master]
+connect  master2,localhost,root,,;
+connection master;
+CALL mtr.add_suppression("Found 1 prepared XA transactions");
+CALL mtr.add_suppression("Failed to execute binlog query event");
+CALL mtr.add_suppression("Recovery: Error .Out of memory..");
+CALL mtr.add_suppression("Crash recovery failed.");
+CALL mtr.add_suppression("Can.t init tc log");
+CALL mtr.add_suppression("Aborting");
+CREATE TABLE t ( f INT ) ENGINE=INNODB;
+XA START 'xa1';
+INSERT INTO t VALUES (20);
+XA END 'xa1';
+XA PREPARE 'xa1';
+XA COMMIT 'xa1';
+connection slave;
+include/stop_slave.inc
+connection master1;
+XA START 'xa2';
+INSERT INTO t VALUES (40);
+XA END 'xa2';
+XA PREPARE 'xa2';
+SET GLOBAL DEBUG_DBUG="d,simulate_crash_after_binlog_commit";
+XA COMMIT 'xa2';
+ERROR HY000: Lost connection to MySQL server during query
+connection master1;
+connection master;
+connection default;
+connection default;
+connection master;
+*** must be no 'xa2' commit seen, as it's still prepared:
+SELECT * FROM t;
+f
+20
+XA RECOVER;
+formatID	gtrid_length	bqual_length	data
+1	3	0	xa2
+SET GLOBAL DEBUG_DBUG="";
+SET SQL_LOG_BIN=0;
+XA COMMIT 'xa2';
+SET SQL_LOG_BIN=1;
+connection server_1;
+connection master;
+connection slave;
+include/start_slave.inc
+connection master;
+SELECT * FROM t;
+f
+20
+40
+connection slave;
+SELECT * FROM t;
+f
+20
+40
+connection master;
+DROP TABLE t;
+connection slave;
+include/rpl_end.inc
diff --git a/mysql-test/suite/rpl/r/rpl_xa_prepare_commit_prepare.result b/mysql-test/suite/rpl/r/rpl_xa_prepare_commit_prepare.result
new file mode 100644
index 00000000000..9ba24716639
--- /dev/null
+++ b/mysql-test/suite/rpl/r/rpl_xa_prepare_commit_prepare.result
@@ -0,0 +1,48 @@
+include/master-slave.inc
+[connection master]
+connect  master2,localhost,root,,;
+connection master;
+CALL mtr.add_suppression("Found 1 prepared XA transactions");
+CREATE TABLE t ( f INT ) ENGINE=INNODB;
+XA START 'xa1';
+INSERT INTO t VALUES (20);
+XA END 'xa1';
+XA PREPARE 'xa1';
+XA COMMIT 'xa1';
+connection slave;
+include/stop_slave.inc
+connection master1;
+XA START 'xa2';
+INSERT INTO t VALUES (40);
+XA END 'xa2';
+SET GLOBAL DEBUG_DBUG="d,simulate_crash_after_binlog_prepare";
+XA PREPARE 'xa2';
+ERROR HY000: Lost connection to MySQL server during query
+connection master1;
+connection master;
+connection default;
+connection server_1;
+connection master;
+connection slave;
+include/start_slave.inc
+connection master;
+SELECT * FROM t;
+f
+20
+XA RECOVER;
+formatID	gtrid_length	bqual_length	data
+1	3	0	xa2
+XA COMMIT 'xa2';
+SELECT * FROM t;
+f
+20
+40
+connection slave;
+SELECT * FROM t;
+f
+20
+40
+connection master;
+DROP TABLE t;
+connection slave;
+include/rpl_end.inc
diff --git a/mysql-test/suite/rpl/r/rpl_xa_prepare_crash_safe.result b/mysql-test/suite/rpl/r/rpl_xa_prepare_crash_safe.result
new file mode 100644
index 00000000000..99baf59a3c1
--- /dev/null
+++ b/mysql-test/suite/rpl/r/rpl_xa_prepare_crash_safe.result
@@ -0,0 +1,62 @@
+include/master-slave.inc
+[connection master]
+connect  master2,localhost,root,,;
+connection master;
+CALL mtr.add_suppression("Found 1 prepared XA transactions");
+CALL mtr.add_suppression("Found 2 prepared XA transactions");
+CALL mtr.add_suppression("Found 3 prepared XA transactions");
+CREATE TABLE t ( f INT ) ENGINE=INNODB;
+XA START 'xa1';
+INSERT INTO t VALUES (20);
+XA END 'xa1';
+XA PREPARE 'xa1';
+connection slave;
+include/stop_slave.inc
+connection master1;
+use test;
+xa start 'xa2';
+insert into t values (30);
+xa end 'xa2';
+SET DEBUG_SYNC="simulate_hang_after_binlog_prepare SIGNAL reached WAIT_FOR go";
+xa prepare 'xa2';
+connection master2;
+XA START 'xa3';
+INSERT INTO t VALUES (40);
+XA END 'xa3';
+SET GLOBAL DEBUG_DBUG="d,simulate_crash_after_binlog_prepare";
+XA PREPARE 'xa3';
+ERROR HY000: Lost connection to MySQL server during query
+connection master1;
+ERROR HY000: Lost connection to MySQL server during query
+connection master;
+connection default;
+connection server_1;
+connection master;
+connection slave;
+include/start_slave.inc
+connection master;
+SELECT * FROM t;
+f
+XA RECOVER;
+formatID	gtrid_length	bqual_length	data
+1	3	0	xa3
+1	3	0	xa1
+1	3	0	xa2
+XA COMMIT 'xa1';
+XA COMMIT 'xa2';
+XA COMMIT 'xa3';
+SELECT * FROM t;
+f
+20
+30
+40
+connection slave;
+SELECT * FROM t;
+f
+20
+30
+40
+connection master;
+DROP TABLE t;
+connection slave;
+include/rpl_end.inc
diff --git a/mysql-test/suite/rpl/r/rpl_xa_rollback_commit_crash_safe.result b/mysql-test/suite/rpl/r/rpl_xa_rollback_commit_crash_safe.result
new file mode 100644
index 00000000000..bc48c84e1c7
--- /dev/null
+++ b/mysql-test/suite/rpl/r/rpl_xa_rollback_commit_crash_safe.result
@@ -0,0 +1,47 @@
+include/master-slave.inc
+[connection master]
+connect  master2,localhost,root,,;
+connection master;
+CALL mtr.add_suppression("Found 1 prepared XA transactions");
+CREATE TABLE t ( f INT ) ENGINE=INNODB;
+XA START 'xa1';
+INSERT INTO t VALUES (20);
+XA END 'xa1';
+XA PREPARE 'xa1';
+XA COMMIT 'xa1';
+connection slave;
+include/stop_slave.inc
+connection master1;
+XA START 'xa2';
+INSERT INTO t VALUES (40);
+XA END 'xa2';
+XA PREPARE 'xa2';
+SET GLOBAL DEBUG_DBUG="d,simulate_crash_after_binlog_rollback";
+XA ROLLBACK 'xa2';
+ERROR HY000: Lost connection to MySQL server during query
+connection master1;
+connection master;
+connection default;
+connection server_1;
+connection master;
+connection slave;
+include/start_slave.inc
+connection master;
+SELECT * FROM t;
+f
+20
+XA RECOVER;
+formatID	gtrid_length	bqual_length	data
+XA ROLLBACK 'xa2';
+ERROR XAE04: XAER_NOTA: Unknown XID
+SELECT * FROM t;
+f
+20
+connection slave;
+SELECT * FROM t;
+f
+20
+connection master;
+DROP TABLE t;
+connection slave;
+include/rpl_end.inc
diff --git a/mysql-test/suite/rpl/t/rpl_xa_commit_crash_safe.test b/mysql-test/suite/rpl/t/rpl_xa_commit_crash_safe.test
new file mode 100644
index 00000000000..b9e3b0d3d0d
--- /dev/null
+++ b/mysql-test/suite/rpl/t/rpl_xa_commit_crash_safe.test
@@ -0,0 +1,98 @@
+# ==== Purpose ====
+#
+# Test verifies that XA COMMIT statements are crash safe.
+#
+# ==== Implementation ====
+#
+# Steps:
+#    0 - Generate 2 explicit XA transactions. 'xa1' and 'xa2'.
+#        'xa1' will be prepared and committed.
+#    1 - For 'xa2' let the XA COMMIT be done in binary log and crash the
+#        server so that it is not committed in engine.
+#    2 - Restart the server. The recovery code should successfully recover
+#        'xa2'. The COMMIT should be executed during recovery.
+#    3 - Check the data in table. Both rows should be present in table.
+#    4 - Trying to commit 'xa2' should report unknown 'XA' error as COMMIT is
+#        already complete during recovery.
+#
+# ==== References ====
+#
+# MDEV-21469: Implement crash-safe logging of the user XA
+
+
+--source include/have_innodb.inc
+--source include/master-slave.inc
+--source include/have_debug.inc
+
+connect (master2,localhost,root,,);
+--connection master
+CALL mtr.add_suppression("Found 1 prepared XA transactions");
+
+CREATE TABLE t ( f INT ) ENGINE=INNODB;
+XA START 'xa1';
+INSERT INTO t VALUES (20);
+XA END 'xa1';
+XA PREPARE 'xa1';
+XA COMMIT 'xa1';
+--sync_slave_with_master
+--source include/stop_slave.inc
+
+--connection master1
+XA START 'xa2';
+INSERT INTO t VALUES (40);
+XA END 'xa2';
+XA PREPARE 'xa2';
+
+--write_file $MYSQLTEST_VARDIR/tmp/mysqld.1.expect
+wait
+EOF
+
+SET GLOBAL DEBUG_DBUG="d,simulate_crash_after_binlog_commit";
+--error 2013 # CR_SERVER_LOST
+XA COMMIT 'xa2';
+--source include/wait_until_disconnected.inc
+
+--connection master1
+--source include/wait_until_disconnected.inc
+
+--connection master
+--source include/wait_until_disconnected.inc
+
+#
+# Server restart
+#
+--append_file $MYSQLTEST_VARDIR/tmp/mysqld.1.expect
+restart
+EOF
+
+connection default;
+--enable_reconnect
+--source include/wait_until_connected_again.inc
+
+# rpl_end.inc needs to use the connection server_1
+connection server_1;
+--enable_reconnect
+--source include/wait_until_connected_again.inc
+
+--connection master
+--enable_reconnect
+--source include/wait_until_connected_again.inc
+
+--connection slave
+--source include/start_slave.inc
+--sync_with_master
+
+--connection master
+SELECT * FROM t;
+XA RECOVER;
+--error 1397 # ER_XAER_NOTA
+XA COMMIT 'xa2';
+SELECT * FROM t;
+--sync_slave_with_master
+
+SELECT * FROM t;
+
+--connection master
+DROP TABLE t;
+--sync_slave_with_master
+--source include/rpl_end.inc
diff --git a/mysql-test/suite/rpl/t/rpl_xa_event_apply_failure.test b/mysql-test/suite/rpl/t/rpl_xa_event_apply_failure.test
new file mode 100644
index 00000000000..71d0de0fc56
--- /dev/null
+++ b/mysql-test/suite/rpl/t/rpl_xa_event_apply_failure.test
@@ -0,0 +1,119 @@
+# ==== Purpose ====
+#
+# Test verifies that if for some reason an event cannot be applied during
+# recovery, appropriate error is reported.
+#
+# ==== Implementation ====
+#
+# Steps:
+#    0 - Generate 2 explicit XA transactions. 'xa1' and 'xa2'.
+#        'xa1' will be prepared and committed.
+#    1 - For 'xa2' let the XA COMMIT be done in binary log and crash the
+#        server so that it is not committed in engine.
+#    2 - Restart the server. Using a debug simulation point, make the
+#        execution of XA COMMIT 'xa2' fail. The server will resume anyway,
+#        leaving the error in the errlog (see "Recovery: Error..").
+#    3 - Work around the simulated failure with Commit once again
+#        from a connection that turns OFF binlogging.
+#        Slave must catch up with the master.
+#
+# ==== References ====
+#
+# MDEV-21469: Implement crash-safe logging of the user XA
+
+
+--source include/have_innodb.inc
+--source include/master-slave.inc
+--source include/have_debug.inc
+
+connect (master2,localhost,root,,);
+--connection master
+CALL mtr.add_suppression("Found 1 prepared XA transactions");
+CALL mtr.add_suppression("Failed to execute binlog query event");
+CALL mtr.add_suppression("Recovery: Error .Out of memory..");
+CALL mtr.add_suppression("Crash recovery failed.");
+CALL mtr.add_suppression("Can.t init tc log");
+CALL mtr.add_suppression("Aborting");
+
+CREATE TABLE t ( f INT ) ENGINE=INNODB;
+XA START 'xa1';
+INSERT INTO t VALUES (20);
+XA END 'xa1';
+XA PREPARE 'xa1';
+XA COMMIT 'xa1';
+--sync_slave_with_master
+--source include/stop_slave.inc
+
+--connection master1
+XA START 'xa2';
+INSERT INTO t VALUES (40);
+XA END 'xa2';
+XA PREPARE 'xa2';
+
+--write_file $MYSQLTEST_VARDIR/tmp/mysqld.1.expect
+wait
+EOF
+
+SET GLOBAL DEBUG_DBUG="d,simulate_crash_after_binlog_commit";
+--error 2013 # CR_SERVER_LOST
+XA COMMIT 'xa2';
+--source include/wait_until_disconnected.inc
+
+--connection master1
+--source include/wait_until_disconnected.inc
+
+--connection master
+--source include/wait_until_disconnected.inc
+
+#
+# Server restart
+#
+--append_file $MYSQLTEST_VARDIR/tmp/mysqld.1.expect
+restart: --debug-dbug=d,trans_xa_commit_fail
+EOF
+
+connection default;
+--source include/wait_until_disconnected.inc
+
+connection default;
+--enable_reconnect
+--source include/wait_until_connected_again.inc
+
+--connection master
+--enable_reconnect
+--echo *** must be no 'xa2' commit seen, as it's still prepared:
+SELECT * FROM t;
+XA RECOVER;
+
+# Commit it manually now to work around the extra binlog record
+# by turning binlogging OFF by the connection.
+
+SET GLOBAL DEBUG_DBUG="";
+SET SQL_LOG_BIN=0;
+--error 0
+XA COMMIT 'xa2';
+SET SQL_LOG_BIN=1;
+
+
+# rpl_end.inc needs to use the connection server_1
+connection server_1;
+--enable_reconnect
+--source include/wait_until_connected_again.inc
+
+--connection master
+--source include/wait_until_connected_again.inc
+
+--connection slave
+--source include/start_slave.inc
+--sync_with_master
+
+--connection master
+SELECT * FROM t;
+
+--sync_slave_with_master
+SELECT * FROM t;
+
+--connection master
+DROP TABLE t;
+--sync_slave_with_master
+--source include/rpl_end.inc
diff --git a/mysql-test/suite/rpl/t/rpl_xa_prepare_commit_prepare.test b/mysql-test/suite/rpl/t/rpl_xa_prepare_commit_prepare.test
new file mode 100644
index 00000000000..7b987c7f29b
--- /dev/null
+++ b/mysql-test/suite/rpl/t/rpl_xa_prepare_commit_prepare.test
@@ -0,0 +1,95 @@
+# ==== Purpose ====
+#
+# Test verifies that XA PREPARE transactions are crash safe.
+#
+# ==== Implementation ====
+#
+# Steps:
+#    0 - Generate 2 explicit XA transactions. 'xa1' and 'xa2'.
+#        'xa1' will be prepared and committed.
+#    1 - For 'xa2' let the XA PREPARE be done in binary log and crash the
+#        server so that it is not prepared in engine.
+#    2 - Restart the server. The recovery code should successfully recover
+#        'xa2'.
+#    3 - When server is up, execute XA RECOVER and verify that 'xa2' is
+#        present.
+#    4 - Commit the XA transaction and verify its correctness.
+#
+# ==== References ====
+#
+# MDEV-21469: Implement crash-safe logging of the user XA
+
+--source include/have_innodb.inc
+--source include/master-slave.inc
+--source include/have_debug.inc
+
+connect (master2,localhost,root,,);
+--connection master
+CALL mtr.add_suppression("Found 1 prepared XA transactions");
+
+CREATE TABLE t ( f INT ) ENGINE=INNODB;
+XA START 'xa1';
+INSERT INTO t VALUES (20);
+XA END 'xa1';
+XA PREPARE 'xa1';
+XA COMMIT 'xa1';
+--sync_slave_with_master
+--source include/stop_slave.inc
+
+--connection master1
+XA START 'xa2';
+INSERT INTO t VALUES (40);
+XA END 'xa2';
+
+--write_file $MYSQLTEST_VARDIR/tmp/mysqld.1.expect
+wait
+EOF
+
+SET GLOBAL DEBUG_DBUG="d,simulate_crash_after_binlog_prepare";
+--error 2013 # CR_SERVER_LOST
+XA PREPARE 'xa2';
+--source include/wait_until_disconnected.inc
+
+--connection master1
+--source include/wait_until_disconnected.inc
+
+--connection master
+--source include/wait_until_disconnected.inc
+
+#
+# Server restart
+#
+--append_file $MYSQLTEST_VARDIR/tmp/mysqld.1.expect
+restart
+EOF
+
+connection default;
+--enable_reconnect
+--source include/wait_until_connected_again.inc
+
+# rpl_end.inc needs to use the connection server_1
+connection server_1;
+--enable_reconnect
+--source include/wait_until_connected_again.inc
+
+--connection master
+--enable_reconnect
+--source include/wait_until_connected_again.inc
+
+--connection slave
+--source include/start_slave.inc
+--sync_with_master
+
+--connection master
+SELECT * FROM t;
+XA RECOVER;
+XA COMMIT 'xa2';
+SELECT * FROM t;
+--sync_slave_with_master
+
+SELECT * FROM t;
+
+--connection master
+DROP TABLE t;
+--sync_slave_with_master
+--source include/rpl_end.inc
diff --git a/mysql-test/suite/rpl/t/rpl_xa_prepare_crash_safe.test b/mysql-test/suite/rpl/t/rpl_xa_prepare_crash_safe.test
new file mode 100644
index 00000000000..9d2c5cce528
--- /dev/null
+++ b/mysql-test/suite/rpl/t/rpl_xa_prepare_crash_safe.test
@@ -0,0 +1,117 @@
+# ==== Purpose ====
+#
+# Test verifies that XA PREPARE transactions are crash safe.
+#
+# ==== Implementation ====
+#
+# Steps:
+#    0 - Generate 3 explicit XA transactions. 'xa1', 'xa2' and 'xa3'.
+#        Using debug simulation hold the execution of second XA PREPARE
+#        statement after the XA PREPARE is written to the binary log.
+#        With this the prepare will not be done in engine.
+#    1 - For 'xa3' allow the PREPARE statement to be written to binary log and
+#        simulate server crash.
+#    2 - Restart the server. The recovery code should successfully recover
+#        'xa2' and 'xa3'.
+#    3 - When server is up, execute XA RECOVER and verify that 'xa2' and 'xa3'
+#        are present along with 'xa1'.
+#    4 - Commit all the XA transactions and verify their correctness.
+#
+# ==== References ====
+#
+# MDEV-21469: Implement crash-safe logging of the user XA
+
+
+--source include/have_innodb.inc
+--source include/master-slave.inc
+--source include/have_debug.inc
+
+connect (master2,localhost,root,,);
+--connection master
+CALL mtr.add_suppression("Found 1 prepared XA transactions");
+CALL mtr.add_suppression("Found 2 prepared XA transactions");
+CALL mtr.add_suppression("Found 3 prepared XA transactions");
+
+CREATE TABLE t ( f INT ) ENGINE=INNODB;
+XA START 'xa1';
+INSERT INTO t VALUES (20);
+XA END 'xa1';
+XA PREPARE 'xa1';
+--sync_slave_with_master
+--source include/stop_slave.inc
+
+--connection master1
+use test;
+xa start 'xa2';
+insert into t values (30);
+xa end 'xa2';
+SET DEBUG_SYNC="simulate_hang_after_binlog_prepare SIGNAL reached WAIT_FOR go";
+send xa prepare 'xa2';
+
+--connection master2
+let $wait_condition=
+  SELECT COUNT(*) = 1 FROM INFORMATION_SCHEMA.PROCESSLIST
+    WHERE STATE like "debug sync point: simulate_hang_after_binlog_prepare%";
+--source include/wait_condition.inc
+
+XA START 'xa3';
+INSERT INTO t VALUES (40);
+XA END 'xa3';
+
+--write_file $MYSQLTEST_VARDIR/tmp/mysqld.1.expect
+wait
+EOF
+
+SET GLOBAL DEBUG_DBUG="d,simulate_crash_after_binlog_prepare";
+--error 2013 # CR_SERVER_LOST
+XA PREPARE 'xa3';
+--source include/wait_until_disconnected.inc
+
+--connection master1
+--error 2013
+--reap
+--source include/wait_until_disconnected.inc
+
+--connection master
+--source include/wait_until_disconnected.inc
+
+#
+# Server restart
+#
+--append_file $MYSQLTEST_VARDIR/tmp/mysqld.1.expect
+restart
+EOF
+
+connection default;
+--enable_reconnect
+--source include/wait_until_connected_again.inc
+
+# rpl_end.inc needs to use the connection server_1
+connection server_1;
+--enable_reconnect
+--source include/wait_until_connected_again.inc
+
+--connection master
+--enable_reconnect
+--source include/wait_until_connected_again.inc
+
+
+--connection slave
+--source include/start_slave.inc
+--sync_with_master
+
+--connection master
+SELECT * FROM t;
+XA RECOVER;
+XA COMMIT 'xa1';
+XA COMMIT 'xa2';
+XA COMMIT 'xa3';
+SELECT * FROM t;
+--sync_slave_with_master
+
+SELECT * FROM t;
+
+--connection master
+DROP TABLE t;
+--sync_slave_with_master
+--source include/rpl_end.inc
diff --git a/mysql-test/suite/rpl/t/rpl_xa_rollback_commit_crash_safe.test b/mysql-test/suite/rpl/t/rpl_xa_rollback_commit_crash_safe.test
new file mode 100644
index 00000000000..6416602da5e
--- /dev/null
+++ b/mysql-test/suite/rpl/t/rpl_xa_rollback_commit_crash_safe.test
@@ -0,0 +1,97 @@
+# ==== Purpose ====
+#
+# Test verifies that XA ROLLBACK statements are crash safe.
+#
+# ==== Implementation ====
+#
+# Steps:
+#    0 - Generate 2 explicit XA transactions. 'xa1' and 'xa2'.
+#        'xa1' will be prepared and committed.
+#    1 - For 'xa2' let the XA ROLLBACK be done in binary log and crash the
+#        server so that it is not rolled back in engine.
+#    2 - Restart the server. The recovery code should successfully recover
+#        'xa2'. The ROLLBACK should be executed during recovery.
+#    3 - Check the data in table. Only one row should be present in table.
+#    4 - Trying to rollback 'xa2' should report unknown 'XA' error as rollback
+#        is already complete during recovery.
+#
+# ==== References ====
+#
+# MDEV-21469: Implement crash-safe logging of the user XA
+
+--source include/have_innodb.inc
+--source include/master-slave.inc
+--source include/have_debug.inc
+
+connect (master2,localhost,root,,);
+--connection master
+CALL mtr.add_suppression("Found 1 prepared XA transactions");
+
+CREATE TABLE t ( f INT ) ENGINE=INNODB;
+XA START 'xa1';
+INSERT INTO t VALUES (20);
+XA END 'xa1';
+XA PREPARE 'xa1';
+XA COMMIT 'xa1';
+--sync_slave_with_master
+--source include/stop_slave.inc
+
+--connection master1
+XA START 'xa2';
+INSERT INTO t VALUES (40);
+XA END 'xa2';
+XA PREPARE 'xa2';
+
+--write_file $MYSQLTEST_VARDIR/tmp/mysqld.1.expect
+wait
+EOF
+
+SET GLOBAL DEBUG_DBUG="d,simulate_crash_after_binlog_rollback";
+--error 2013 # CR_SERVER_LOST
+XA ROLLBACK 'xa2';
+--source include/wait_until_disconnected.inc
+
+--connection master1
+--source include/wait_until_disconnected.inc
+
+--connection master
+--source include/wait_until_disconnected.inc
+
+#
+# Server restart
+#
+--append_file $MYSQLTEST_VARDIR/tmp/mysqld.1.expect
+restart
+EOF
+
+connection default;
+--enable_reconnect
+--source include/wait_until_connected_again.inc
+
+# rpl_end.inc needs to use the connection server_1
+connection server_1;
+--enable_reconnect
+--source include/wait_until_connected_again.inc
+
+--connection master
+--enable_reconnect
+--source include/wait_until_connected_again.inc
+
+--connection slave
+--source include/start_slave.inc
+--sync_with_master
+
+--connection master
+SELECT * FROM t;
+XA RECOVER;
+--error 1397 # ER_XAER_NOTA
+XA ROLLBACK 'xa2';
+SELECT * FROM t;
+--sync_slave_with_master
+
+SELECT * FROM t;
+
+--connection master
+DROP TABLE t;
+--sync_slave_with_master
+--source include/rpl_end.inc
diff --git a/sql/handler.cc b/sql/handler.cc
index a1719f9b922..c997c52e602 100644
--- a/sql/handler.cc
+++ b/sql/handler.cc
@@ -1290,6 +1290,9 @@ int ha_prepare(THD *thd)
           error=1;
           break;
         }
+        DEBUG_SYNC(thd, "simulate_hang_after_binlog_prepare");
+        DBUG_EXECUTE_IF("simulate_crash_after_binlog_prepare",
+            DBUG_SUICIDE(););
       }
       else
       {
@@ -1795,6 +1798,8 @@ commit_one_phase_2(THD *thd, bool all, THD_TRANS *trans, bool is_real_trans)
         ++count;
       ha_info_next= ha_info->next();
       ha_info->reset(); /* keep it conveniently zero-filled */
+      DBUG_EXECUTE_IF("simulate_crash_after_binlog_commit",
+          DBUG_SUICIDE(););
     }
     trans->ha_list= 0;
     trans->no_2pc=0;
@@ -1908,6 +1913,8 @@ int ha_rollback_trans(THD *thd, bool all)
       status_var_increment(thd->status_var.ha_rollback_count);
       ha_info_next= ha_info->next();
       ha_info->reset(); /* keep it conveniently zero-filled */
+      DBUG_EXECUTE_IF("simulate_crash_after_binlog_rollback",
+          DBUG_SUICIDE(););
     }
     trans->ha_list= 0;
     trans->no_2pc=0;
@@ -2107,6 +2114,7 @@ struct xarecover_st
   int len, found_foreign_xids, found_my_xids;
   XID *list;
   HASH *commit_list;
+  HASH *xa_prepared_list;
   bool dry_run;
 };
 
@@ -2155,7 +2163,23 @@ static my_bool xarecover_handlerton(THD *unused, plugin_ref plugin,
             _db_doprnt_("ignore xid %s", xid_to_str(buf, info->list+i));
             });
           xid_cache_insert(info->list + i, true);
+          XID *foreign_xid= info->list + i;
           info->found_foreign_xids++;
+
+           /*
+             For each foreign xid prepared in engine, check if it is present in
+             xa_prepared_list sent by binlog.
+           */
+            if (info->xa_prepared_list)
+            {
+              struct xa_recovery_member *member= NULL;
+              if ((member= (xa_recovery_member *)
+                   my_hash_search(info->xa_prepared_list, foreign_xid->key(),
+                                  foreign_xid->key_length())))
+              {
+                member->in_engine_prepare= true;
+              }
+            }
           continue;
         }
         if (IF_WSREP(!(wsrep_emulate_bin_log &&
@@ -2202,12 +2226,13 @@ static my_bool xarecover_handlerton(THD *unused, plugin_ref plugin,
   return FALSE;
 }
 
-int ha_recover(HASH *commit_list)
+int ha_recover(HASH *commit_list, HASH *xa_prepared_list)
 {
   struct xarecover_st info;
   DBUG_ENTER("ha_recover");
   info.found_foreign_xids= info.found_my_xids= 0;
   info.commit_list= commit_list;
+  info.xa_prepared_list= xa_prepared_list;
   info.dry_run= (info.commit_list==0 && tc_heuristic_recover==0);
   info.list= NULL;
 
@@ -2254,7 +2279,7 @@ int ha_recover(HASH *commit_list)
                     info.found_my_xids, opt_tc_log_file);
     DBUG_RETURN(1);
   }
-  if (info.commit_list)
+  if (info.commit_list && !info.found_foreign_xids)
     sql_print_information("Crash recovery finished.");
   DBUG_RETURN(0);
 }
diff --git a/sql/handler.h b/sql/handler.h
index 92c2a61ed0e..4ff08d7bd08 100644
--- a/sql/handler.h
+++ b/sql/handler.h
@@ -521,6 +521,8 @@ enum legacy_db_type
   DB_TYPE_FIRST_DYNAMIC=45,
   DB_TYPE_DEFAULT=127 // Must be last
 };
+
+enum xa_binlog_state {XA_PREPARE=0, XA_COMPLETE}; // binlog-side XA phase
 /*
   Better name for DB_TYPE_UNKNOWN. Should be used for engines that do not have
   a hard-coded type value here.
@@ -806,7 +808,6 @@ struct st_system_tablename
   const char *tablename;
 };
 
-
 typedef ulonglong my_xid; // this line is the same as in log_event.h
 #define MYSQL_XID_PREFIX "MySQLXid"
 #define MYSQL_XID_PREFIX_LEN 8 // must be a multiple of 8
@@ -898,6 +899,13 @@ struct xid_t {
 };
 typedef struct xid_t XID;
 
+struct xa_recovery_member
+{
+   XID xid;                     // keep first: xid_get_var_key() reads it
+   enum xa_binlog_state state;  // phase recorded while scanning the binlog
+   bool in_engine_prepare;      // set once an engine lists the xid as prepared
+};
+
 /* for recover() handlerton call */
 #define MIN_XID_LIST_SIZE  128
 #define MAX_XID_LIST_SIZE  (1024*128)
@@ -4996,7 +5004,7 @@ int ha_commit_one_phase(THD *thd, bool all);
 int ha_commit_trans(THD *thd, bool all);
 int ha_rollback_trans(THD *thd, bool all);
 int ha_prepare(THD *thd);
-int ha_recover(HASH *commit_list);
+int ha_recover(HASH *commit_list, HASH *xa_recover_list);
 
 /* transactions: these functions never call handlerton functions directly */
 int ha_enable_transaction(THD *thd, bool on);
diff --git a/sql/log.cc b/sql/log.cc
index e13f8fbc88f..ed6dc87b262 100644
--- a/sql/log.cc
+++ b/sql/log.cc
@@ -38,6 +38,7 @@
 #include "log_event.h"          // Query_log_event
 #include "rpl_filter.h"
 #include "rpl_rli.h"
+#include "rpl_mi.h"
 #include "sql_audit.h"
 #include "mysqld.h"
 
@@ -3406,6 +3407,8 @@ MYSQL_BIN_LOG::MYSQL_BIN_LOG(uint *sync_period)
   index_file_name[0] = 0;
   bzero((char*) &index_file, sizeof(index_file));
   bzero((char*) &purge_index_file, sizeof(purge_index_file));
+  /* non-zero is a marker to conduct xa recovery and related cleanup */
+  xa_recover_list.records= 0;
 }
 
 void MYSQL_BIN_LOG::stop_background_thread()
@@ -3467,6 +3470,11 @@ void MYSQL_BIN_LOG::cleanup()
     mysql_cond_destroy(&COND_xid_list);
     mysql_cond_destroy(&COND_binlog_background_thread);
     mysql_cond_destroy(&COND_binlog_background_thread_end);
+    if (!is_relay_log && xa_recover_list.records)
+    {
+      free_root(&mem_root, MYF(0));
+      my_hash_free(&xa_recover_list);
+    }
   }
 
   /*
@@ -8028,7 +8036,7 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader)
 
     /* Now we have in queue the list of transactions to be committed in order. */
   }
-    
+
   DBUG_ASSERT(is_open());
   if (likely(is_open()))                       // Should always be true
   {
@@ -9717,7 +9725,7 @@ int TC_LOG_MMAP::recover()
         goto err2; // OOM
   }
 
-  if (ha_recover(&xids))
+  if (ha_recover(&xids, 0))
     goto err2;
 
   my_hash_free(&xids);
@@ -9758,7 +9766,7 @@ int TC_LOG::using_heuristic_recover()
     return 0;
 
   sql_print_information("Heuristic crash recovery mode");
-  if (ha_recover(0))
+  if (ha_recover(0, 0))
     sql_print_error("Heuristic crash recovery failed");
   sql_print_information("Please restart mysqld without --tc-heuristic-recover");
   return 1;
@@ -10217,14 +10225,108 @@ start_binlog_background_thread()
   return 0;
 }
 
+/**
+  Helper of ::recover(): allocates an @c xa_recovery_member on
+  @c ptr_mem_root, fills it from the arguments and adds it to @c hash_arg.
+  @return the new member, or NULL when allocation or hash insertion fails.
+*/
+static xa_recovery_member*
+xa_member_insert(HASH *hash_arg, xid_t *xid_arg, xa_binlog_state state_arg,
+              MEM_ROOT *ptr_mem_root)
+{
+  void *raw= alloc_root(ptr_mem_root, sizeof(xa_recovery_member));
+  xa_recovery_member *entry= static_cast<xa_recovery_member*>(raw);
+  if (entry == NULL)
+    return NULL;
+  entry->in_engine_prepare= false;  /* set later by xarecover_handlerton() */
+  entry->state= state_arg;
+  entry->xid.set(xid_arg);
+  if (my_hash_insert(hash_arg, (uchar*) entry))
+    return NULL;
+  return entry;
+}
 
+/**
+  Records in @c hash_arg what the binlog scan learned about @c xid_arg.
+
+  A prepare always adds a new XA_PREPARE member. A completion flips an
+  already listed XA_PREPARE member to XA_COMPLETE or, when the xid is not
+  listed yet, adds a member that is XA_COMPLETE from the start.
+
+  @return false on success, true when a member could not be inserted.
+*/
+static bool xa_member_replace(HASH *hash_arg, xid_t *xid_arg, bool is_prepare,
+                              MEM_ROOT *ptr_mem_root)
+{
+  if (is_prepare)
+    return xa_member_insert(hash_arg, xid_arg, XA_PREPARE,
+                            ptr_mem_root) == NULL;
+
+  xa_recovery_member *known= (xa_recovery_member *)
+    my_hash_search(hash_arg, xid_arg->key(), xid_arg->key_length());
+  if (known == NULL)
+  {
+    /* Only the completion was found by the scan: list it as complete. */
+    return xa_member_insert(hash_arg, xid_arg, XA_COMPLETE,
+                            ptr_mem_root) == NULL;
+  }
+  /*
+    The listed prepare has been terminated; marking it complete means
+    there will be no replay of its XA-prepare event group.
+  */
+  if (known->state == XA_PREPARE)
+    known->state= XA_COMPLETE;
+  return false;
+}
+
+extern "C" uchar *xid_get_var_key(xid_t *entry, size_t *length,
+                              my_bool not_used __attribute__((unused)))
+{
+  *length= entry->key_length();   /* key size is taken from the entry itself */
+  return (uchar*) entry->key();   /* start of the bytes used as hash key */
+}
+
+/**
+   Performs recovery based on transaction coordinator log for 2pc. At the
+   time of crash, if the binary log was in active state, then recovery for
+   'xid's and explicit 'XA' transactions is initiated, otherwise the gtid
+   binlog state is updated. For 'xid' and 'XA' based recovery following steps
+   are performed.
+
+   Look for the latest binlog checkpoint file. It is either the same file as
+   the active binary log, or a different binary log file.
+
+   Scan the binary log from the beginning.
+   From GTID_LIST and GTID_EVENTs reconstruct the gtid binlog state.
+   Prepare a list of 'xid's for recovery.
+   Prepare a list of explicit 'XA' transactions for recovery.
+   Recover the 'xid' transactions.
+   The explicit 'XA' transaction recovery is initiated once all the server
+   components are initialized. Please check 'execute_xa_for_recovery()'.
+
+   Called from @c MYSQL_BIN_LOG::do_binlog_recovery()
+
+   @param linfo          Store here the found log file name and position to
+                         the NEXT log file name in the index file.
+
+   @param last_log_name  Name of the last active binary log at the time of
+                         crash.
+
+   @param first_log      Pointer to IO_CACHE of active binary log
+   @param fdle           Format_description_log_event of active binary log
+   @param do_xa          Is 2pc recovery needed for 'xid's and explicit XA
+                         transactions.
+   @return               indicates success or failure of recovery.
+    @retval 0 success
+    @retval 1 failure
+
+*/
 int TC_LOG_BINLOG::recover(LOG_INFO *linfo, const char *last_log_name,
                            IO_CACHE *first_log,
                            Format_description_log_event *fdle, bool do_xa)
 {
   Log_event *ev= NULL;
   HASH xids;
-  MEM_ROOT mem_root;
   char binlog_checkpoint_name[FN_REFLEN];
   bool binlog_checkpoint_found;
   bool first_round;
@@ -10237,9 +10339,17 @@ int TC_LOG_BINLOG::recover(LOG_INFO *linfo, const char *last_log_name,
   bool last_gtid_valid= false;
 #endif
 
-  if (! fdle->is_valid() ||
-      (do_xa && my_hash_init(&xids, &my_charset_bin, TC_LOG_PAGE_SIZE/3, 0,
-                             sizeof(my_xid), 0, 0, MYF(0))))
+  binlog_checkpoint_name[0]= 0;
+  if (!fdle->is_valid() ||
+      (do_xa &&
+       (my_hash_init(&xids, &my_charset_bin, TC_LOG_PAGE_SIZE/3,
+                     0,
+                     sizeof(my_xid), 0, 0, MYF(0)) ||
+        my_hash_init(&xa_recover_list,
+                     &my_charset_bin,
+                     TC_LOG_PAGE_SIZE/3,
+                     0, 0,
+                     (my_hash_get_key) xid_get_var_key,  0, MYF(0)))))
     goto err1;
 
   if (do_xa)
@@ -10313,21 +10423,29 @@ int TC_LOG_BINLOG::recover(LOG_INFO *linfo, const char *last_log_name,
 
 #ifdef HAVE_REPLICATION
       case GTID_EVENT:
-        if (first_round)
         {
           Gtid_log_event *gev= (Gtid_log_event *)ev;
-
-          /* Update the binlog state with any GTID logged after Gtid_list. */
-          last_gtid.domain_id= gev->domain_id;
-          last_gtid.server_id= gev->server_id;
-          last_gtid.seq_no= gev->seq_no;
-          last_gtid_standalone=
-            ((gev->flags2 & Gtid_log_event::FL_STANDALONE) ? true : false);
-          last_gtid_valid= true;
+          if (first_round)
+          {
+            /* Update the binlog state with any GTID logged after Gtid_list. */
+            last_gtid.domain_id= gev->domain_id;
+            last_gtid.server_id= gev->server_id;
+            last_gtid.seq_no= gev->seq_no;
+            last_gtid_standalone=
+              ((gev->flags2 & Gtid_log_event::FL_STANDALONE) ? true : false);
+            last_gtid_valid= true;
+          }
+          if (do_xa &&
+              (gev->flags2 &
+               (Gtid_log_event::FL_PREPARED_XA |
+                Gtid_log_event::FL_COMPLETED_XA)) &&
+              xa_member_replace(&xa_recover_list, &gev->xid,
+                                gev->flags2 & Gtid_log_event::FL_PREPARED_XA,
+                                &mem_root))
+              goto err2;
+          break;
         }
-        break;
 #endif
-
       case START_ENCRYPTION_EVENT:
         {
           if (fdle->start_decryption((Start_encryption_log_event*) ev))
@@ -10417,10 +10535,22 @@ int TC_LOG_BINLOG::recover(LOG_INFO *linfo, const char *last_log_name,
 
   if (do_xa)
   {
-    if (ha_recover(&xids))
+    if (ha_recover(&xids, &xa_recover_list))
       goto err2;
 
-    free_root(&mem_root, MYF(0));
+    DBUG_ASSERT(!xa_recover_list.records ||
+                (binlog_checkpoint_found && binlog_checkpoint_name[0] != 0));
+
+    if (!xa_recover_list.records)
+    {
+      free_root(&mem_root, MYF(0));
+      my_hash_free(&xa_recover_list);
+    }
+    else
+    {
+      xa_binlog_checkpoint_name= strmake_root(&mem_root, binlog_checkpoint_name,
+                                              strlen(binlog_checkpoint_name));
+    }
     my_hash_free(&xids);
   }
   return 0;
@@ -10436,6 +10566,7 @@ int TC_LOG_BINLOG::recover(LOG_INFO *linfo, const char *last_log_name,
   {
     free_root(&mem_root, MYF(0));
     my_hash_free(&xids);
+    my_hash_free(&xa_recover_list);
   }
 err1:
   sql_print_error("Crash recovery failed. Either correct the problem "
@@ -10445,6 +10576,214 @@ int TC_LOG_BINLOG::recover(LOG_INFO *linfo, const char *last_log_name,
   return 1;
 }
 
+void MYSQL_BIN_LOG::execute_xa_for_recovery()
+{
+  if (xa_recover_list.records != 0)        /* did recover() queue any XA? */
+    (void) recover_explicit_xa_prepare();  /* outcome is not propagated */
+  free_root(&mem_root, MYF(0));            /* storage of the list members */
+  my_hash_free(&xa_recover_list);          /* the lookup structure itself */
+}
+
+/**
+   Performs recovery of explicit XA transactions.
+   'xa_recover_list' contains the list of XA transactions to be recovered.
+   Their event groups are replayed from the binary log, scanning forward
+   from 'xa_binlog_checkpoint_name', with binlogging temporarily disabled.
+
+   @return        indicates success or failure of recovery.
+    @retval false success (always, when built without HAVE_REPLICATION)
+    @retval true  failure: setup failed or some listed XA was not handled
+*/
+bool MYSQL_BIN_LOG::recover_explicit_xa_prepare()
+{
+#ifndef HAVE_REPLICATION
+  /* Can't be supported without replication applier built in. */
+  return false;
+#else
+  bool err= true;                /* cleared only on complete success */
+  int error=0;
+  Relay_log_info *rli= NULL;
+  rpl_group_info *rgi= NULL;     /* err2 may run before it is created */
+  THD *thd= new THD(0);  /* Needed by start_slave_threads */
+  thd->thread_stack= (char*) &thd;
+  thd->store_globals();
+  thd->security_ctx->skip_grants();
+  IO_CACHE log;
+  const char *errmsg;
+  File        file= -1;          /* err2 closes it only when still open */
+  bool enable_apply_event= false;
+  Log_event *ev = 0;
+  LOG_INFO linfo;
+  int recover_xa_count= xa_recover_list.records;
+  xa_recovery_member *member= NULL;
+
+  /* Set up a fake applier context (rli, mi, rgi) to apply events with. */
+
+  if (!(rli= thd->rli_fake= new Relay_log_info(FALSE, "Recovery")))
+  {
+    my_error(ER_OUTOFMEMORY, MYF(ME_FATAL), 1);
+    goto err2;
+  }
+  rli->sql_driver_thd= thd;
+  static LEX_CSTRING connection_name= { STRING_WITH_LEN("Recovery") };
+  rli->mi= new Master_info(&connection_name, false);
+  if (!(rgi= thd->rgi_fake))
+    rgi= thd->rgi_fake= new rpl_group_info(rli);
+  rgi->thd= thd;
+  thd->system_thread_info.rpl_sql_info=
+    new rpl_sql_thread_info(rli->mi->rpl_filter);
+
+  if (rli && !rli->relay_log.description_event_for_exec)
+  {
+    rli->relay_log.description_event_for_exec=
+      new Format_description_log_event(4);
+  }
+  if (find_log_pos(&linfo, xa_binlog_checkpoint_name, 1))
+  {
+    sql_print_error("Binlog file '%s' not found in binlog index, needed "
+                    "for recovery. Aborting.", xa_binlog_checkpoint_name);
+    goto err2;
+  }
+
+  tmp_disable_binlog(thd);
+  thd->variables.pseudo_slave_mode= TRUE;
+  for (;;)
+  {
+    if ((file= open_binlog(&log, linfo.log_file_name, &errmsg)) < 0)
+    {
+      sql_print_error("%s", errmsg);
+      goto err1;
+    }
+    while (recover_xa_count > 0 &&
+        (ev= Log_event::read_log_event(&log,
+                                       rli->relay_log.description_event_for_exec,
+                                       opt_master_verify_checksum)))
+    {
+      if (!ev->is_valid())
+      {
+        sql_print_error("Found invalid binlog query event %s at %s:%lu",
+                        ev->get_type_str(), linfo.log_file_name,
+                        (ev->log_pos - ev->data_written));
+        delete ev;
+        goto err1;
+      }
+      enum Log_event_type typ= ev->get_type_code();
+      ev->thd= thd;
+
+      if (typ == FORMAT_DESCRIPTION_EVENT)
+        enable_apply_event= true;
+
+      if (typ == GTID_EVENT)
+      {
+        Gtid_log_event *gev= (Gtid_log_event *)ev;
+        if (gev->flags2 &
+            (Gtid_log_event::FL_PREPARED_XA | Gtid_log_event::FL_COMPLETED_XA))
+        {
+          if ((member=
+               (xa_recovery_member*) my_hash_search(&xa_recover_list,
+                                                    gev->xid.key(),
+                                                    gev->xid.key_length())))
+          {
+            /* Got XA PREPARE query in binlog but check member->state. If it is
+               marked as XA_PREPARE then this PREPARE has not seen its end
+               COMMIT/ROLLBACK. Check if it exists in engine in prepared state.
+               Apply it only when the engine does not have it prepared.
+             */
+            if (gev->flags2 & Gtid_log_event::FL_PREPARED_XA)
+            {
+              if (member->state == XA_PREPARE)
+              {
+                // XA is prepared in binlog and not present in engine then apply
+                if (member->in_engine_prepare == false)
+                  enable_apply_event= true;
+                else
+                  --recover_xa_count;
+              }
+            }
+            else if (gev->flags2 & Gtid_log_event::FL_COMPLETED_XA)
+            {
+              if (member->state == XA_COMPLETE &&
+                  member->in_engine_prepare == true)
+                enable_apply_event= true;
+              else
+                --recover_xa_count;
+            }
+          }
+        }
+      }
+
+      if (enable_apply_event)
+      {
+        if (typ == XA_PREPARE_LOG_EVENT)
+          thd->transaction.xid_state.set_binlogged();
+        if (ev->apply_event(rgi))   /* 'err' itself stays set until the end */
+        {
+            sql_print_error("Failed to execute binlog query event of type: %s,"
+                            " at %s:%lu; error %d %s", ev->get_type_str(),
+                            linfo.log_file_name,
+                            (ev->log_pos - ev->data_written),
+                            thd->get_stmt_da()->sql_errno(),
+                            thd->get_stmt_da()->message());
+            delete ev;
+            goto err1;
+        }
+        else if (typ == FORMAT_DESCRIPTION_EVENT)
+          enable_apply_event=false;
+        else if (thd->lex->sql_command == SQLCOM_XA_PREPARE ||
+                 thd->lex->sql_command == SQLCOM_XA_COMMIT  ||
+                 thd->lex->sql_command == SQLCOM_XA_ROLLBACK)
+        {
+          --recover_xa_count;
+          enable_apply_event=false;
+
+          sql_print_information("Binlog event %s at %s:%lu"
+              " successfully applied",
+              typ == XA_PREPARE_LOG_EVENT ?
+              static_cast<XA_prepare_log_event *>(ev)->get_query() :
+              static_cast<Query_log_event *>(ev)->query,
+              linfo.log_file_name, (ev->log_pos - ev->data_written));
+        }
+      }
+      if (typ != FORMAT_DESCRIPTION_EVENT)
+        delete ev;
+    }
+    end_io_cache(&log);
+    mysql_file_close(file, MYF(MY_WME));
+    file= -1;
+    if (unlikely((error= find_next_log(&linfo, 1))))
+    {
+      /* Stop on any failure, not just EOF, rather than loop on blindly. */
+      if (error != LOG_INFO_EOF)
+        sql_print_error("find_next_log() failed (error: %d)", error);
+      break;
+    }
+  }
+err1:
+  reenable_binlog(thd);
+  /*
+    There should be no more XA transactions to recover upon successful
+    completion.
+  */
+  if (recover_xa_count > 0)
+    goto err2;
+  sql_print_information("Crash recovery finished.");
+  err= false;
+err2:
+  if (file >= 0)
+  {
+    end_io_cache(&log);
+    mysql_file_close(file, MYF(MY_WME));
+  }
+  thd->variables.pseudo_slave_mode= FALSE;
+  if (rli) delete rli->mi;       /* rli is NULL when its allocation failed */
+  delete thd->system_thread_info.rpl_sql_info;
+  if (rgi) rgi->slave_close_thread_tables(thd);
+  thd->reset_globals();
+  delete thd;
+
+  return err;
+#endif  /* !HAVE_REPLICATION */
+}
 
 int
 MYSQL_BIN_LOG::do_binlog_recovery(const char *opt_name, bool do_xa_recovery)
diff --git a/sql/log.h b/sql/log.h
index 8e70d3c8f4c..9bf3248d4c9 100644
--- a/sql/log.h
+++ b/sql/log.h
@@ -63,6 +63,7 @@ class TC_LOG
   virtual int unlog(ulong cookie, my_xid xid)=0;
   virtual int unlog_xa_prepare(THD *thd, bool all)= 0;
   virtual void commit_checkpoint_notify(void *cookie)= 0;
+  virtual void execute_xa_for_recovery() {};
 
 protected:
   /*
@@ -708,6 +709,8 @@ class MYSQL_BIN_LOG: public TC_LOG, private MYSQL_LOG
   void commit_checkpoint_notify(void *cookie);
   int recover(LOG_INFO *linfo, const char *last_log_name, IO_CACHE *first_log,
               Format_description_log_event *fdle, bool do_xa);
+  bool recover_explicit_xa_prepare();
+
   int do_binlog_recovery(const char *opt_name, bool do_xa_recovery);
 #if !defined(MYSQL_CLIENT)
 
@@ -932,7 +935,7 @@ class MYSQL_BIN_LOG: public TC_LOG, private MYSQL_LOG
   mysql_mutex_t* get_binlog_end_pos_lock() { return &LOCK_binlog_end_pos; }
 
   int wait_for_update_binlog_end_pos(THD* thd, struct timespec * timeout);
-
+  void execute_xa_for_recovery();
   /*
     Binlog position of end of the binlog.
     Access to this is protected by LOCK_binlog_end_pos
@@ -945,6 +948,9 @@ class MYSQL_BIN_LOG: public TC_LOG, private MYSQL_LOG
   */
   my_off_t binlog_end_pos;
   char binlog_end_pos_file[FN_REFLEN];
+  MEM_ROOT mem_root;
+  char *xa_binlog_checkpoint_name;
+  HASH xa_recover_list;
 };
 
 class Log_event_handler
diff --git a/sql/log_event.cc b/sql/log_event.cc
index ee44f7f1da4..9adfefceb97 100644
--- a/sql/log_event.cc
+++ b/sql/log_event.cc
@@ -18,7 +18,7 @@
 
 #include "mariadb.h"
 #include "sql_priv.h"
-
+#include "handler.h"
 #ifndef MYSQL_CLIENT
 #include "unireg.h"
 #include "log_event.h"
@@ -2812,9 +2812,25 @@ XA_prepare_log_event(const char* buf,
   buf += sizeof(temp);
   memcpy(&temp, buf, sizeof(temp));
   m_xid.gtrid_length= uint4korr(&temp);
+  // Todo: validity here and elsewhere checks to be replaced by MDEV-21839 fixes
+  if (m_xid.gtrid_length < 0 || m_xid.gtrid_length > MAXGTRIDSIZE)
+  {
+    m_xid.formatID= -1;
+    return;
+  }
   buf += sizeof(temp);
   memcpy(&temp, buf, sizeof(temp));
   m_xid.bqual_length= uint4korr(&temp);
+  if (m_xid.bqual_length < 0 || m_xid.bqual_length > MAXBQUALSIZE)
+  {
+    m_xid.formatID= -1;
+    return;
+  }
+  if (m_xid.gtrid_length + m_xid.bqual_length > XIDDATASIZE)
+  {
+    m_xid.formatID= -1;
+    return;
+  }
   buf += sizeof(temp);
   memcpy(m_xid.data, buf, m_xid.gtrid_length + m_xid.bqual_length);
 
diff --git a/sql/log_event.h b/sql/log_event.h
index a6543b70eb5..595dc9f6c3c 100644
--- a/sql/log_event.h
+++ b/sql/log_event.h
@@ -3234,6 +3234,7 @@ class XA_prepare_log_event: public Xid_apply_log_event
                        const Format_description_log_event *description_event);
   ~XA_prepare_log_event() {}
   Log_event_type get_type_code() { return XA_PREPARE_LOG_EVENT; }
+  bool is_valid() const { return m_xid.formatID != -1; }
   int get_data_size()
   {
     return xid_subheader_no_data + m_xid.gtrid_length + m_xid.bqual_length;
@@ -3241,12 +3242,7 @@ class XA_prepare_log_event: public Xid_apply_log_event
 
 #ifdef MYSQL_SERVER
   bool write();
-#endif
-
-private:
-#if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION)
-  char query[sizeof("XA COMMIT ONE PHASE") + 1 + ser_buf_size];
-  int do_commit();
+#ifdef HAVE_REPLICATION
   const char* get_query()
   {
     sprintf(query,
@@ -3254,6 +3250,13 @@ class XA_prepare_log_event: public Xid_apply_log_event
             m_xid.serialize());
     return query;
   }
+#endif /* HAVE_REPLICATION */
+#endif /* MYSQL_SERVER */
+
+private:
+#if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION)
+  char query[sizeof("XA COMMIT ONE PHASE") + 1 + ser_buf_size];
+  int do_commit();
 #endif
 };
 
diff --git a/sql/mysqld.cc b/sql/mysqld.cc
index b2f8afca7a6..f669a4ca5d8 100644
--- a/sql/mysqld.cc
+++ b/sql/mysqld.cc
@@ -5189,7 +5189,7 @@ static int init_server_components()
     unireg_abort(1);
   }
 
-  if (ha_recover(0))
+  if (ha_recover(0, 0))
   {
     unireg_abort(1);
   }
@@ -5606,7 +5606,7 @@ int mysqld_main(int argc, char **argv)
   initialize_information_schema_acl();
 
   execute_ddl_log_recovery();
-
+  tc_log->execute_xa_for_recovery();
   /*
     Change EVENTS_ORIGINAL to EVENTS_OFF (the default value) as there is no
     point in using ORIGINAL during startup
diff --git a/sql/xa.cc b/sql/xa.cc
index 786d09c2b39..df7d0229157 100644
--- a/sql/xa.cc
+++ b/sql/xa.cc
@@ -581,6 +581,10 @@ bool trans_xa_commit(THD *thd)
   XID_STATE &xid_state= thd->transaction.xid_state;
 
   DBUG_ENTER("trans_xa_commit");
+  DBUG_EXECUTE_IF("trans_xa_commit_fail",
+      {my_error(ER_OUT_OF_RESOURCES, MYF(0));
+      DBUG_RETURN(TRUE);});
+
 
   if (!xid_state.is_explicit_XA() ||
       !xid_state.xid_cache_element->xid.eq(thd->lex->xid))