← Back to team overview

maria-discuss team mailing list archive

Pipeline-stype slave parallel applier

 

Hello, everybody.

Let me offer a bunch of ideas for further improvements in replication,
and the parallel one specifically.

Your comments, thoughts and critical notes are most welcome!
Thank you for your time.

Andrei



Slave balanced parallel applier
================================

There are a few motivations for this project which include the
following reflections:
- the slower binlog group transactions applying rate by slave
  the more transaction sizes differ (must've been few bugs reported):
  the speed of a caravan is determined by the pace of the slowest
  camel, or more formally as specified by Amdahl's law (KrinstianN pointed that
  out to me)
- while ordered committing of grouped transactions on slave
  presents some virtues (like guaranteed consistency with master,
  monotonic changes to slave execution status as measured by GTID)
  it still must be taxing on performance.

Observe that co-prepared transactions of a master binlog group represent a
sort of a transaction on its own. Call it epoch [which must revive
memories of NDB cluster], and the original transactions as its
sub-transactions or branches.
Notice that there is no meaning to sub-transaction internal ordering within the
epoch. They can be logged to the binary log in any mutual order,
still without exposing any inconsistencies to the
user. Specifically, say there are two prepared transactions

  BEGIN /* A */    BEGIN /* B */
    INSERT...	     INSERT...  
    UPDATE...	     UPDATE...  
    DELETE...	     DELETE...  
  COMMIT       	   COMMIT       

entering the binlog group (ordered) commit module. They can end up
into binlog as A,B or B,A, and regardless of that (Gtid actually) order
also be acknowledged to the user in either way.

The latter fact infers that slave does not have to commit the group
sub-transaction/branches in the GTID order. It could be done in any
order including no order at all like to do so as one epoch commit.

Naturally the latter option is impractical to replication. The current
conservative scheduler offers a decent parallelization method still suffering from 
the aforementioned uneven branch's sizes and inter-thread
communications caused by ordering. The optimistic scheduler remedies to some level
but not fully (an optimistically executed group may be unbalanced too)
and even conceding some risk of performance degradation (rollback is costly).

There is an idea reminiscent to pipelining to handle the epoch (the
master binlog group) as a single transaction but in parallel. That is
to execute its statements by multiple workers, this time scheduling
them fairly, and "atomically" commit their works.

To the fairness part, workers get assigned by a statement at a time
from the epoch transaction(sub-transaction BEGIN, COMMIT removed).
Say 'E' an epoch consists of 's' of sub-transaction

  E := { T_1, T_2, ... T_s }

For each m from 1 to s sub-transactions T_m gets scheduled to some worker

         W_1   W_2 ... W_k

          |     |       |
          V     V       V
      
T_m :=  { e_1, e_2, ... e_l }

  Hereinafter I use the TeX math notation, '^' - a superscript attached to
  the event to designate its parent transaction sequence number (Gtid
  seq_no), '_' - a subscript to enumerate an object within its
  compound object.

e_i stands for T-ransation's statements, W_t:s are workers.
Effectively the epoch breaks into modified branches executed by
workers on one to one basic:

   E := { T'_1, T'_2, ... T'_k }

here 'k' is the number of workers engaged, T' indicates the modified
transaction.
The branches are executed until they are ready to prepare which event
is triggered by scheduling of the last statement of the last original
sub-transaction.

It's clear that sizes of the modified branches are even this time.
The last statement worker coordinates 2pc.

Thinking more technically workers can consume/pull from
the current epoch presented as a queue which is pushed into
by statement producer.

Here is a possible state of the queue when m:th statement of T_n is
about to be pushed:

e^n_k ->


 [ e^n_m-1, ..., e^n_2, e^n_1;    ...;       e^1_l1, ... e^1_2, e^1_1 ]

   ...----- T_n ------------| T_n-1,...T_2   |---------- T_1 ---------| 

A pull by consumer at this point would return e^1_1.

Very likely that by the epoch prepare event *all* the branches have been already
ready for prepare().

This pipelining parallelization method can work for the single "large"
transaction in the binlog group and also could drag into the
parallelizable input transactions from later groups if we additionally
create master side dependency tracking (think of mysql logical
timestamp method).
Also notice that the optimisitic scheduler is orthogonal to this
method so the two can be combined.

Consistency concern
-------------------

The epoch parallel execution (scheduling) must respect
intra-transaction statement dependencies (e.g FIFO execution order of
operations over the same object [record, table]). There is no
inter-transaction dependencies, at least in theory.

The notion of a leader, the two-phase-commits leader, remains roughly
the same as in the existing conservative scheduler. This time more
than initiating commits of the epoch branches it also takes care to
advance the slave gtid execution status accordingly.
It could be implemented as the very last statement of its own T' branch.

It's clear that causal dependencies of an user write and a read (to
find the effect of that write) can be tracked by WAIT_FOR_GTID() as
currently.

A plain two-phase commit does admit possibility to catch on slave combinations
of the old (pre-epoch) and new version (post-epoch) or some objects of
the same transaction if the user hits with non-locking SELECT in a
"tiny" time window between commit():s of two branches
like in the following example.

Let t1,t2 some tables and the binlog group consists of just one
transaction

     T_1 := { update(t1), update(t2) }

Let on slave it's modified into two parallel ones:

   T'_1 := { update(t1) } || T'2 := { update(t2) }

After both respond OK to 2pc prepare() and then the leader initiates
commit() where could be 4 possible result sets to the contemporary SELECT

   SET @@session.tx_isolation='REPEATABLE-READ';
   SELECT * from t1 as t_1, t2 as t_2;
   => ...

While this is perhaps more than imperfect, I believe it may be
relevant only to remote corner cases, but more importantly, we still can
refine that doing some *adjustments* in the Engine studied briefly
myself of what would be a sub-transaction in Innodb,
"ideologically" endorsed  and guided by Marko Makela.
The idea is to hold the branch's locks even after the branch commits,
and delegate their release to the leader.

Recoverability
--------------

When slave is configured to log in the binary log
we must make sure to rollback any prepared
epoch branches at the server recovery unless the last branch (of the
leader) is also there, prepared.

When it does not binlog the problem is that the modified branch's binlog
images can't be just flushed one after another especially
if the user specifies to log on the slave uniformly with the master
that is to preserve the original sub-transaction structures and their
boundaries.
In such case there are several multiple option, here are some:

- use relay-logged image of original sub-transactions (may be limited
  to the  master and slave version combination though;)

- reconstruct original sub-transaction's binlog images. That is
  binlog events "belonging" to an original sub-transaction will be
  cached cooperatively by workers that took share of the
  original sub-transaction execution.

  Optionally at least the caching could be set to allow out
  of order while the statements are parallelizable, like in the above
  T1(t1,t2) example. Otherwise the ordering will be preserved through
  implementing the shared cache as as a queue with an interface
  to insert an item into the middle.

- exotics like to create the slave side "merge" GTID which would
  embed the original "commit". (The quotes here to allude to the Git's
  way).

While it's a serious subject I am leaving out mariabackup for this moment.
I believe it will be feasible to adjust its logics to
account as prepared only such transactions that that
are prepared in all their branches.

More about pipeline style optimization
--------------------------------------

As mentioned workers can be made self-serving to assign events for
execution at once when become available. That must turn to think of
producer-consumer model, potentially lock-free queues, dynamical # of
workers to stay in balance with the receiver's pace etc.

I think *eventually* we will need to assign the producer role to
the IO (receiver) thread, and convert the current SQL (driver) thread
into (relay-) logger (we still may have to have such logger for
the semisync replication, the relay logger can be opted out
otherwise).
Here is a picture

IO_thread.push()
   |                                  +---- worker.pull()
   V                                  V
  e^s_i -> [  e^s_i-1, ... e^s_1 }, T_s-1, ... ]
                                      ^
                                      |
                                      SQL.pull()

where on the right hand side the two consumers handle
a transaction and when both of them have done with it
(a worker has read its last event and the logger written
it to the relay-log) the query element becomes
garbage-collectable.