← Back to team overview

maria-developers team mailing list archive

Re: [Maria-discuss] Pipeline-stype slave parallel applier


Kristian, salute!

> Thanks for the interesting idea and detailed description, Andrei! I have
> written some initial comments inline, below:

It was a great piece of input, few notes are taken. I stumbled at one
point though where I struggled to find out what made you apparently
misled (see your example of a strayed e^2_2 event). Made my best to
amend it, but your turn is needed.

>> entering the binlog group (ordered) commit module. They can end up
>> into binlog as A,B or B,A, and regardless of that (Gtid actually) order
>> also be acknowledged to the user in either way.
>> The latter fact infers that slave does not have to commit the group
>> sub-transaction/branches in the GTID order. It could be done in any
>> order including no order at all like to do so as one epoch commit.
> However, MariaDB GTID requires that transactions be binlogged in the same
> order on the slave as on the master. Otherwise it is not possible to move a
> lower-level GTID slave from one master to the other, as the slave position
> only includes a single GTID in MariaDB.

And that's one of two technical obstacles that the proposed method
needs to address. However we shall face it.
Assume out of order commit of the grouped original transaction,
GTID gaps would exist some little time but as long as there no crashes
slave conservative (the group boundaries respective that is) execution
would fill them;
and in case of crash recovery Slave could specify
the gaps at reconnecting (unless those transactions are the relay-log)
for retransmitting.
They would be logged out-of-order in the log-bin case, and may appearing
as set (mathematically). However it's a range as the union
of gtids, and it - the master range -  can be
preserved as described along replication chain.

(I've not called here for doing that. We continue to treat grids as
gap-free ranges for now)

>> conservative scheduler offers a decent parallelization method still suffering from 
>> the aforementioned uneven branch's sizes and inter-thread
>> communications caused by ordering. The optimistic scheduler remedies to some level
>> but not fully (an optimistically executed group may be unbalanced too)
>> and even conceding some risk of performance degradation (rollback is costly).
> Jean François Gagné's comprehensive benchmarking of parallel replication
> actually has some very interesting data on this. He ran a real Booking.com
> workload with literally thousands of worker threads and had a huge rollback
> ratio (I remember it as up around 40%), and still saw speedup and good
> scalabitily. Apparently rollback isn't necessarily so costly; maybe because
> replication servers often have spare cores idling anyway, and because the
> aborted transaction acts as a pre-fetcher, improving I/O parallelism.
> But the fact that thousands of threads still improved speed suggests exactly
> that there was a problem of uneven transaction size - a few large
> transactions spread far apart, so lots of transaction lookahead was needed.
> I do not know if those transactions were single large queries or contained
> many individual statements, but in the latter case this is exactly what this
> idea could help with!

Right, those 40% of rollbacks we are going to trade with extra
productivity :-).

>> There is an idea reminiscent to pipelining to handle the epoch (the
>> master binlog group) as a single transaction but in parallel. That is
>> to execute its statements by multiple workers, this time scheduling
>> them fairly, and "atomically" commit their works.
> So this is a very interesting idea. Once the transactions are recorded in
> the binlog, we have a serial stream of statements that will reproduce the
> original data on the master. And they will do this regardless of if we
> remove some of the COMMITs/BEGINs - or if we add some. (I wonder if there
> are any exceptions to this, I can't think of any off the top of my head).
> So instead of trying to parallelise transactions, just do all the individual
> statements in parallel, disregarding transaction boundaries (with some
> restrictions to ensure consistency, as you describe below).
> I wonder how much this will benefit real-life workloads. There seems to be a
> lot of potential. For some workloads it will not help - if most transactions
> are single-statement, or if the multi-statement transactions have
> dependencies between the statements. But that doesn't seem likely to be the
> typical scenario.

My lovely example is a stream of epochs of 1:1000 ratio between the smallest and largest
transactions in the group, feasible with 1000 master connections. The
Amdahl's law asymptotic estimates  1 / (1 - 0.999) speedup.

I wish we'll have it done ... and gained even more exactly in this case.

> But why tie this to the master binlog group? It seems to fit perfectly to
> the optimistic parallel replication mode, where different transactions are
> run in parallel speculatively, and any conflicts are detected and resolved.
> In fact, it seems the exact same mechanism will work to detect conflicts
> between the more fine-grained per-statement parallelisation.

Very true. On one hand we can go blind "speculatively" direct ready to
pay rollback price, on the other already Master does detect (some) conflicts, and
the actual conflict-free range of transactions is wider than a mere one
binlog group (mind a reference to the logical clock that can "measure" dependencies).

> The conservative parallle replication I consider obsolete. It causes a lot
> of hassle for the user to try to get good-sized group commits on the master
> to ensure parallel replication opportunities on the slave. And it doesn't
> even ensure consistency; there are corner cases (in InnoDB and elsewhere)
> that can cause conflicts to appear anyway on the slave, so the
> rollback+retry mechanism is needed anyway for conservative, just like for
> optimistic.

Indeed, asymmetry of lock conflicts in Innodb was the last time I saw
it in the mysql parallel replication though. Such known threats can be
evaded through reordering of vulnerable statements.

>> To the fairness part, workers get assigned by a statement at a time
>> from the epoch transaction(sub-transaction BEGIN, COMMIT removed).
>> Say 'E' an epoch consists of 's' of sub-transaction
>>   E := { T_1, T_2, ... T_s }
>> For each m from 1 to s sub-transactions T_m gets scheduled to some worker
>>          W_1   W_2 ... W_k
>>           |     |       |
>>           V     V       V
>> T_m :=  { e_1, e_2, ... e_l }
>>   Hereinafter I use the TeX math notation, '^' - a superscript attached to
>>   the event to designate its parent transaction sequence number (Gtid
>>   seq_no), '_' - a subscript to enumerate an object within its
>>   compound object.
>> e_i stands for T-ransation's statements, W_t:s are workers.
>> Effectively the epoch breaks into modified branches executed by
>> workers on one to one basic:
>>    E := { T'_1, T'_2, ... T'_k }
>> here 'k' is the number of workers engaged, T' indicates the modified
>> transaction.
>> The branches are executed until they are ready to prepare which event
>> is triggered by scheduling of the last statement of the last original
>> sub-transaction.
>> It's clear that sizes of the modified branches are even this time.
> Well... each worker has exactly one statement at a time, but each statement
> can be very different in execution cost. But more even than
> transaction-based scheduling, certainly.

Let me elaborate, the claim is still not far stretched.
The row-based events can be further split apart, so the
difference in prepare readiness is
at most one ha_{write,update,delete}_row() which still
were scheduled ahead and are executed in parallel
with the very last statement of updating `mysql.gtid_slave_pos`
by the leader.

This ultimate row-level granularity chase may be deferred though.

>> The last statement worker coordinates 2pc.
>> Thinking more technically workers can consume/pull from
>> the current epoch presented as a queue which is pushed into
>> by statement producer.
>> Here is a possible state of the queue when m:th statement of T_n is
>> about to be pushed:
>> e^n_k ->
>>  [ e^n_m-1, ..., e^n_2, e^n_1;    ...;       e^1_l1, ... e^1_2, e^1_1 ]
>>    ...----- T_n ------------| T_n-1,...T_2   |---------- T_1 ---------| 
>> A pull by consumer at this point would return e^1_1.
> How do you imagine integrating this in the current parallel replication
> scheduling algorithm?
> In the current algorithm, the producer (the SQL thread) assigns work to
> worker threads in a round-robin way. There is a lot of care taken to
> minimise contention on a single global queue-lock or similar, and I think
> that is an important reason why Jean François' tests were able to scale so
> well to many thousand worker threads.
> I wonder if the existing scheduling could not be used directly for this idea
> also - but it needs to be extended of course to schedule individual
> statements rather than whole transactions, with some way to let worker
> threads coordinate transaction start/end between them.
> What are your thoughts on this?

I thought to reuse from the conservative scheduler as much as possible.
We could (and the plan is do so for initial comparative benchmarking)
leave the driver thread as the assigner.

Although the assignment role of the producer is clearly better off be
taken by the workers. What I meant in the original mail
that the assignment mechanism can be improved to avoid any chance of
worker idling but more importantly to facilitate fairness.
The workers go to pick pieces of work at once they become
available. The optimistic scheduler assignment is close to that but
it still assigns only round robin that is speculatively.

I've been considering a shared circular buffer as communication channel
between a producer (actually could be the IO thread, read below) and
workers (consumers).
It must feature lock-free potentially or most of the time actually (Why won't it be
Boost.Circular Buffer..).

The push by producer (the driver as of current, but the IO can be as
well, I expand that below) and pull by worker to
the buffer should *not* be computationally more
costly than the current push/pull to/from the private worker
queues. The cost just must be less because of the fairness of model.
Conservative, optimistic and "fair" schedulers all can use it.

>> Very likely that by the epoch prepare event *all* the branches have been already
>> ready for prepare().
> Hm, any particular reason you think so? I would imagine the opposite, that
> individual statements will be scheduled quickly, and it will be quite random
> in which order they happen to become ready in?

Like I said above, I meant the Row format statement which we technically
can refine further to sub-statements each of cost of one handler call.

>> This pipelining parallelization method can work for the single "large"
>> transaction in the binlog group and also could drag into the
>> parallelizable input transactions from later groups if we additionally
>> create master side dependency tracking (think of mysql logical
>> timestamp method).
>> Also notice that the optimisitic scheduler is orthogonal to this
>> method so the two can be combined.
>> Consistency concern
>> -------------------
>> The epoch parallel execution (scheduling) must respect
>> intra-transaction statement dependencies (e.g FIFO execution order of
>> operations over the same object [record, table]). There is no
>> inter-transaction dependencies, at least in theory.
> In fact, there do exist such inter-transaction dependencies in a number of
> corner cases,


> at least if we want to enforce the same commit order on slave
> as on master, which is required for MariaDB GTID. But that is already
> handled in current code, by detecting the dependency inside the storage
> engine (the thd_rpl_deadlock_check() mechanism).

so we shall continue with this method, and at the same time to keep a "repository"
of corner cases in order to consult it at assignment decision
(to schedule suspicious to hidden dependency events to one worker).

> I wonder if the exact same thd_rpl_deadlock_check() mechanism would not work
> as well for this per-statement scheduling idea? It would be interesting to
> see a proof-of-concept based on this...

So the question is how we would recover from a deadlock now involving
modified transactions, correct?

I am yet to check closer thd_rpl_deadlock_check(), perhaps that's why
I can't see what could require extra to the existing retry mechanisms
which is about to retry the whole group.
But it certainly simpler (as no binlogging action can take place yet)
than crash-recovery which I highlighted to some details.

>> The notion of a leader, the two-phase-commits leader, remains roughly
>> the same as in the existing conservative scheduler. This time more
>> than initiating commits of the epoch branches it also takes care to
>> advance the slave gtid execution status accordingly.
>> It could be implemented as the very last statement of its own T' branch.
> Hm, right, interesting, agree that this seems a good fit.

> In fact, if the existing mechanism can be slightly extended, maybe it can
> already do much of what is needed to ensure consistency. Eg. suppose we have
> 3 original transactions each with two statements:
>   e^1_1 e^1_2  e^2_1 e^2_2  e^3_1 e^3_2
> Suppose the first 3 of these are ready to commit at some point:
>   e^1_1 e^1_2 e^2_1
> The leader can check that it doesn't try to commit only part of an original
> transaction.

Probably I will correct your initial perception here (otherwise I am
lost in how e^2_1 got separated).
In this case you mean three (original) T:s (E) are mapped into a modified

T'_1 = {e^1_1 e^1_2 e^2_1}

and some more e.g just one T'_2 unions of

 T'_2 \cup T'_1 =  T_1 \cup T_2 \cup T_3  =  E

As the leader can't initiate 2p-commit without T'_2.

> In this case it sees that e^2_1 is missing e_2_2, and it can
> commit only the first part and leave the partial transaction for a following
> group commit:
>   e^2_1
> This way, we already ensure that a crash will not leave us with an
> inconsistent storage engine state on the slave. Because we never commit part
> of an original master transaction - all parts of it are always
> group-committed together. Seems promising, though that code is also heavily
> optimised for scalability, so will require some case.

Or you're trying to extend the method to collapse few master groups..
(But in this case the leader would also initiate commit at the
boundary of that "big" epoch.)

> Hm, one case needs to be handled though - when there are more statements in
> one original master transaction, than there are worker threads - otherwise
> we end up with a deadlock.

Sorry, previously I omitted one important detail. A Worker
pulls from the share queue its tail statement not only for itself.
When the pull result is a dependent statement whose parent has been or is in
processing by another worker, that statement will be redirected to that

Does this dismiss your deadlock point?

>> It's clear that causal dependencies of an user write and a read (to
>> find the effect of that write) can be tracked by WAIT_FOR_GTID() as
>> currently.
>> A plain two-phase commit does admit possibility to catch on slave combinations
>> of the old (pre-epoch) and new version (post-epoch) or some objects of
>> the same transaction if the user hits with non-locking SELECT in a
>> "tiny" time window between commit():s of two branches
>> like in the following example.
>> Let t1,t2 some tables and the binlog group consists of just one
>> transaction
>>      T_1 := { update(t1), update(t2) }
>> Let on slave it's modified into two parallel ones:
>>    T'_1 := { update(t1) } || T'2 := { update(t2) }
>> After both respond OK to 2pc prepare() and then the leader initiates
>> commit() where could be 4 possible result sets to the contemporary SELECT
>>    SET @@session.tx_isolation='REPEATABLE-READ';
>>    SELECT * from t1 as t_1, t2 as t_2;
>>    => ...
> Well, due to the enforced commit order, it will not be possible to see the
> case where update(t2) is committed but update(t1) is not.
> In fact, if the group commit is extended as suggested above (to only
> group-commit the statements of an original transaction together),

(always meant!)

> then I
> think it is possible for the user to avoid this problem entirely using
> In MariaDB, this ensures a consistent read view against 2pc transactions
> (reader will always see all of the 2pc commit or none of it).

I missed to involve this what was recently a novelty (to myself)!

> And IIRC, it
> works by holding the same lock (LOCK_commit_ordered) that is used by the
> group commit leader. So if the user has some special case where this minor
> read-inconsistency is sensitive, the START TRANSACTION WITH CONSISTENT
> SNAPSHOT mechanism can be used to avoid it completely.

Just underscores how good the SNAPSHOT feature is then :-)!

>> While this is perhaps more than imperfect, I believe it may be
>> relevant only to remote corner cases, but more importantly, we still can
>> refine that doing some *adjustments* in the Engine studied briefly
>> myself of what would be a sub-transaction in Innodb,
>> "ideologically" endorsed  and guided by Marko Makela.
>> The idea is to hold the branch's locks even after the branch commits,
>> and delegate their release to the leader.
>> Recoverability
>> --------------
>> When slave is configured to log in the binary log
>> we must make sure to rollback any prepared
>> epoch branches at the server recovery unless the last branch (of the
>> leader) is also there, prepared.
>> When it does not binlog the problem is that the modified branch's binlog
>> images can't be just flushed one after another especially
>> if the user specifies to log on the slave uniformly with the master
>> that is to preserve the original sub-transaction structures and their
>> boundaries.
> "if the user specified" - but this is always a requirement in MariaDB,
> right? Otherwise GTID order is broken?

It meant a possibility of gaps that I advertise above where the gaps
are property of execution, but the integrity of master groups is
never violated regardless of how long is their replication chain travel.

>> In such case there are several multiple option, here are some:
>> - use relay-logged image of original sub-transactions (may be limited
>>   to the  master and slave version combination though;)
> I did not follow what this means - can you elaborate?

With some efforts we can make use of relay-log for both recovery and
as binlog for further replication. In above I hinted at the latter.
So why won't the slave worker "appropriate" the master binlog images? :-)

While the master and slave server version are the same this optimization
must be feasible.

>> - reconstruct original sub-transaction's binlog images. That is
>>   binlog events "belonging" to an original sub-transaction will be
>>   cached cooperatively by workers that took share of the
>>   original sub-transaction execution.
>>   Optionally at least the caching could be set to allow out
>>   of order while the statements are parallelizable, like in the above
>>   T1(t1,t2) example. Otherwise the ordering will be preserved through
>>   implementing the shared cache as as a queue with an interface
>>   to insert an item into the middle.
> Isn't it possible to do the binlog caching locally, each worker caching just
> the binlogging for its own statement? If we make sure they group-commit
> together, they will be written in sequence into the binlog, so they can just
> write each their part one after the other?

Well, the aimed assignment policy is 'to the first available
worker'. Doing that way T':s - the modified branches - are determined
dynamically so you can see that logging them as they are would
only satisfy the epoch boundaries. Internally the original T:s would
be interleaved.

However all input events in the epoch are totally enumerated, and each input
event maps to an output one that will inherit the enumeration
then the leader - that flushes to binlog file - would be able to
simulate a compatible slave binlog epoch.

>> - exotics like to create the slave side "merge" GTID which would
>>   embed the original "commit". (The quotes here to allude to the Git's
>>   way).
>> While it's a serious subject I am leaving out mariabackup for this moment.
>> I believe it will be feasible to adjust its logics to
>> account as prepared only such transactions that that
>> are prepared in all their branches.
>> More about pipeline style optimization
>> --------------------------------------
>> As mentioned workers can be made self-serving to assign events for
>> execution at once when become available. That must turn to think of
>> producer-consumer model, potentially lock-free queues, dynamical # of
>> workers to stay in balance with the receiver's pace etc.
> So are you planning to implement a completely new parallel replication
> scheduler, rather than build on the existing one?

The actual agenda is of multiple activities that could be prioritized
(if at all) for different phases/times. It consists of:

-  a new type of scheduler that features 2pc rather that the orderly committing.
   It requires *a* to change granularity of input to workers and *b*
   handle statement dependencies. The SQL thread may remain
   as the current scheduler,

and a number of optimization potentially useful by the current schedulers 
too, including
- changing the assigning to be self-direct by Workers;
  effectively that leaves the SQL thread a role of relay-log reader,
  or it could be turned into relay-logger altogether (to relax the IO
  thread from that burden, why is below);
  also the relay-logging could be turned into an user option in which
  case or regardless of it;

- the IO thread to turn from the logger into a mere event producer
  that queues into a memory buffer from where the consumer such as
  the relay-logger (SQL thread) and worker threads pull.
  This apparently streamlines event execution and in combination with
  the scheduler's fairness should give us I dare to say a maximum that
  replication is capable of. When always limited buffer size
  comes to play the IO thread will yield its producer role e.g to the
  logger thread (the SQL) which takes care to queue into the shared
  buffer now through reading into it from the relay-log.

>> I think *eventually* we will need to assign the producer role to
>> the IO (receiver) thread, and convert the current SQL (driver) thread
>> into (relay-) logger (we still may have to have such logger for
>> the semisync replication, the relay logger can be opted out
>> otherwise).
>> Here is a picture
>> IO_thread.push()
>>    |                                  +---- worker.pull()
>>    V                                  V
>>   e^s_i -> [  e^s_i-1, ... e^s_1 }, T_s-1, ... ]
>>                                       ^
>>                                       |
>>                                       SQL.pull()
>> where on the right hand side the two consumers handle
>> a transaction and when both of them have done with it
>> (a worker has read its last event and the logger written
>> it to the relay-log) the query element becomes
>> garbage-collectable.
> So overall, this was an interesting idea to see! I never thought before of
> doing parallelisation beyond the original transaction boundaries from the
> master. But now that you have described it, it actually seems quite a
> natural extension of the optimistic parallel replication, and it fits very
> well with the fundamental principle in MariaDB replication of preserving
> strict GTID ordering, which might work to handle much of the consistency
> requirements.
> It would be quite interesting to see this in practice on real workloads and
> with benchmarks - I could imagine very large improvements for some
> workloads.
> Though I suppose you shouldn't underestimate the work required to
> complete something like this in production quality (the devil is in the
> detail, once you get to nasty things like temporary tables and
> non-transactional engines and other horrors that are part of the
> MySQL/MariaDB ecosystem).

You're absolutely right. We should handle that, first of all - together,
secondly with proper prioritization and finally
there's a modest hope from my side that these ideas represent a natural
evolution out of bloody :-) experience with the best of two (actually more !:-)
parallel replication realms.



Follow ups