maria-developers team mailing list archive

Re: [Maria-discuss] Pipeline-style slave parallel applier

 

Let me expand on a couple of points in the last reply.

> Kristian,
>
>> andrei.elkin@xxxxxxxxxx writes:
>>
>>> I am yet to check closer thd_rpl_deadlock_check(), perhaps that's why
>>
>> Oh.
>>
>> I definitely think you should get well acquainted with the existing parallel
>> replication in MariaDB to undertake a project as ambitious as this one.
>> There are a number of central concepts and algorithms that work in
>> synergy, including (off the top of my head):
>>
>>  - binlog group commit, which uses the commit_ordered mechanism to
>>    efficiently perform commits for multiple threads and coordinate with
>>    storage engines.
>>
>>  - wait_for_prior_commit, which works with the binlog group commit to
>>    scalably handle dependencies between replication workers.
>>
>>  - thd_rpl_deadlock_check(), which works with supporting storage engines to
>>    detect conflicts between transactions replicated in parallel.
>>
>>  - rpl_parallel_entry and group_commit_orderer, which efficiently
>>    coordinate scheduling between worker threads (Jean François' benchmarks
>>    show this scaling to 5000 threads).
>>
>>  - And behind it all the strict ordering of commits, which underlies many
>>    assumptions around parallel replication as well as GTID and allows things
>>    like optimistic/aggressive parallel replication in a way that is
>>    completely transparent to applications.
>
> Thanks for summing them all together!
> Apparently one has to understand how these various parts interact.
>
> And when it comes to thd_rpl_deadlock_check(), let me learn along the
> way.
> While the feature, as you put it in the comments, is largely about the
> optimistic schedulers, it still applies to some corner cases in the conservative one.
> You mention 'different optimizer decision', which hints at SBR specifics.
> Am I correct?
>
> However I am aware of conflicts in InnoDB caused merely by different timing
> of lock acquisition between two parallel branches.
> So regardless of the format, that clearly puts deadlock/lock-waiting on the current agenda.
> And yet I am not sure *how* thd_rpl_deadlock_check() could be reused.
>
> We can notice that in the pipeline method any attempt to start waiting for a lock
> granted to another branch needs interception and erroring out.
> We are bound to this because the granted lock is to be released only after 2PC.
>
> Once the statement errors out, it will be reassigned
> to the lock grantee's worker. If along this way the group still cannot
> be finished, we could roll back all branches and fall back to the conservative
> scheduler, and eventually to the sequential one.
>
> I say more about thd_rpl_deadlock_check() below, to follow up on your
> optimistic idea of scheduling conflicting statements that way.
>
>>
>> One thing about pipeline-style slave parallel applier is still not clear to
>> me. How will conflicts between individual statements be handled? Eg. suppose
>> we have this group commit (epoch) on the master:
>>
>>   BEGIN;  /* T1 */
>>   INSERT INTO t1 VALUES  (...);  /* S1 */
>>   COMMIT;
>>   BEGIN;  /* T2 */
>>   UPDATE t1 SET b=10 WHERE a=1;  /* S2 */
>>   UPDATE t1 SET b=20 WHERE a=1;  /* S3 */
>>   COMMIT;
>>
>> So the idea is to schedule these two transactions (T1 T2) to three different
>> worker threads, right? Each thread gets one of (S1 S2 S3).
>> But we must not commit S3 before S2, obviously, or we end up with incorrect
>> result. So how will this dependency be handled?
>
> This comes down to dependency-tracking specifics.
> In the RBR case (the whole group is row-format) S2 -> S3 can currently only be
> pessimistically estimated by reading the table name from the Table_map event.
> In the SBR case that you meant, Query-log-events would have to carry table names.
> We could refine the estimate with the help of enriched logging on the master
> to collect PK values into the event.
>
> So, having learned of S2 -> S3 at assignment time, in the first approximation I
> suggest we schedule T2 as one transaction.

Naturally the first approximation admits unfairness when the input is
dependent. Yet we can salvage those statistically rare corner cases,
either (though it's not binary after all)

  by running optimistically as you've suggested, or

  by reinforcing master logging to track *intra*-transaction
  dependencies. Practically it means recording a set of 'db.table.record_id'
  arrays, one array per 'db.table'.
  Each array will be sorted by 'record_id' as part of a replication
  optimizer, to build up non-dependent branches of as equal sizes as
  possible (see the sketch below).
  Also, just the number of noticed dependencies could be checked by the
  slave to switch its execution to conservative mode.
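To make the second option more concrete, here is a minimal standalone sketch
(my own invented names and types, not server code) of what the master could
maintain per transaction: one array of record ids per modified 'db.table',
sorted at the end, with the duplicate count serving as the number of noticed
intra-transaction dependencies a slave could compare against a threshold:

  #include <algorithm>
  #include <cstddef>
  #include <map>
  #include <string>
  #include <vector>

  // Hypothetical per-transaction tracker on the master side.
  struct intra_trx_tracker
  {
    // "db.table" -> record ids modified by the transaction's statements.
    std::map<std::string, std::vector<unsigned long long>> touched;

    void record(const std::string &db_table, unsigned long long record_id)
    {
      touched[db_table].push_back(record_id);
    }

    // Sort each array per record_id (as a "replication optimizer" would) and
    // count duplicates: every duplicate is an intra-transaction dependency
    // between two statements of the same transaction.
    std::size_t sort_and_count_dependencies()
    {
      std::size_t deps = 0;
      for (auto &kv : touched)
      {
        std::vector<unsigned long long> &ids = kv.second;
        std::sort(ids.begin(), ids.end());
        for (std::size_t i = 1; i < ids.size(); i++)
          if (ids[i] == ids[i - 1])
            deps++;                    // same record touched twice => dependency
      }
      return deps;
    }
  };

A slave could then, for instance, refuse to split a transaction whose
dependency count exceeds some threshold and run it conservatively instead.
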


>
>>
>> I was assuming that this would be using the same mechanism as optimistic
>> parallel replication - in fact, I thought the point was that this is
>> precisely a clever generalisation of optimistic parallel replication,
>
> You have a very nice point, and the dumb pessimistic method is refined by
> optimistic execution of *one* statement (whose rollback cost is
> insignificant)!
>
> [Why should it not be the only option...]
>
> More than that, somewhat similarly to waiting for the prior transaction to
> commit, we would now wait for prior statements if they (may) depend. When an
> earlier-ordered statement finds a locked row it should eliminate the
> lower-ranked current owner (or it could let the owner reach the
> wait-for-prior-stmt commit). Otherwise it waits.

This obviously requires cross-group event enumeration and awareness of
potential dependency on an earlier scheduled event (of a smaller sequence number).
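To sketch that rule (purely illustrative, not the actual
thd_rpl_deadlock_check() interface; the type and field names are invented):

  #include <cstdint>

  // Cross-group enumeration of scheduled events/sub-statements.
  struct sub_stmt
  {
    uint64_t sub_seq_no;   // smaller number == scheduled earlier
  };

  // Decide what happens when 'waiter' blocks on a row lock held by 'holder'.
  // The earlier-ordered statement has priority: a later-ordered lock owner is
  // eliminated (rolled back and retried); otherwise the waiter simply waits
  // for the prior statement to commit.
  inline bool must_kill_holder(const sub_stmt &waiter, const sub_stmt &holder)
  {
    return waiter.sub_seq_no < holder.sub_seq_no;
  }
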

>
>> realising that transaction boundaries need not be respected. And since epoch
>> does not help with the dependency between S2 and S3, I was confused of the
>> point of using it to detect lack of dependency between T1 and T2...
>>
>
>> But if you say that you had not considered using the thd_rpl_deadlock_check
>> mechanism, then I am confused as to how to detect that S3 needs to wait for
>> S2?
>
> Here is how it would go the simple way, which must make sense because just
> statically S2 -> S3 can't be a common pattern.
>
> On dependency tracking specifically: each event is associated with a
> descriptor of the data it modifies (e.g. a db.table pair right now, db.table.record_id later).
>
> When e_1 gets assigned it initiates a slave branch. Each
> branch remembers the descriptors of all its events, so e_1 contributes to that.
> When e_2 comes in next, the first action is to check whether its data is
> handled in e_1's branch. If not, a second branch is forked out. The process
> goes on up to a maximum number of branches. When that is reached
> the following events get assigned to the branches by first checking the
> data descriptors and then "round robin" if needed. Round-robin is
> just for the concept; the actual plan is to serve an event to the first
> available worker, or almost the first available in the common-case sense
> (meaning my *conservative* handling of S2->S3, but I'm switching to
> start off with your optimization at once).
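In code terms the assignment check could look roughly like this (a sketch
under the assumption of a plain 'db.table' string descriptor; all names here
are made up, and a real implementation would have to merge branches when one
event links two of them):

  #include <cstddef>
  #include <set>
  #include <string>
  #include <vector>

  using descriptor = std::string;   // "db.table" now, "db.table.record_id" later

  struct branch
  {
    std::set<descriptor> handled;   // data descriptors of all events in the branch
  };

  // Route an incoming event: prefer the branch that already handles its data,
  // fork a new branch while below the limit, otherwise hand the event to the
  // first available worker (the "round robin" of the conceptual description).
  int route_event(std::vector<branch> &branches, const descriptor &d,
                  std::size_t max_branches, int first_available_branch)
  {
    for (std::size_t i = 0; i < branches.size(); i++)
      if (branches[i].handled.count(d))
        return (int) i;                       // dependency: keep it in that branch
    if (branches.size() < max_branches)
    {
      branches.push_back(branch{{d}});        // fork a new branch
      return (int) branches.size() - 1;
    }
    branches[first_available_branch].handled.insert(d);
    return first_available_branch;
  }
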
>
>>
>>> Assume out-of-order commit of the grouped original transactions:
>>> GTID gaps would exist for some little time, but as long as there are no crashes
>>> the slave's conservative execution (that is, respecting the group boundaries)
>>> would fill them;
>>> and in case of crash recovery the slave could specify
>>> the gaps at reconnecting (unless those transactions are in the relay-log)
>>> for retransmission.
>>> They would be logged out-of-order in the log-bin case, and may appear
>>> as a set (mathematically). However it's a range as the union
>>> of gtids, and it - the master range - can be
>>> preserved as described along the replication chain.
>>
>> I don't understand how MariaDB GTID can work unless the slave binlogs the
>> GTIDs in the exact same order as the master. At any time, a lower-level
>> slave may connect, and you have to find the right point at which to start
>> sending it events. There is only a single GTID to start from.
>
> This is off-topic to the main discussion, but I could not resist
> the urge to elaborate (but I indent the reply).
>
>    Consider the MySQL method then. Roughly speaking, a slave supplies a sort
>    of low watermark of gtids that it has without any gaps, and a sequence
>    of gtids on top of that which are responsible for the gaps. That's the
>    general case; normally, when the slave was stopped gracefully, there won't
>    be any gaps.
>
>    My statement is that the master group as a range is an invariant which
>    holds (as long as it does not break up) along the replication chain.
>    When the master group consists of 'k'
>    sub-transactions, and assuming no interruption of its replication,
>    including the applying, its image on a slave is the same 'k' number
>    of the same sub-transactions:
>
>    [T_1, T_2, ... T_k] ->  ... replicate ...  -> [T_l1, T_l2, ..., T_lk]
>
>    here 1 <= l_i <= k, and the internal indexes l_i are distinct but unordered.
>
>    The master group can break, but the fragments inherit the property
>    of sub-transaction commit in any order.
>
>    So in the out-of-order case a slave that was gracefully stopped
>    (and thus has applied the last master group fully)
>    requests to resume replication from a state like
>
>    [T_l1, T_l2, ..., T_lk]
>
>    which can be described by a single maximum gtid (max(l_i)).
>    In the general case of gaps, as said above, a slave state description like
>
>    T_1, T_2, ... T_l^(wm), gap_l+1, T_l+2 ...
>
>    requires T_l^(wm) - the watermark (describing the left-hand side, the
>    already committed sub-transactions of the last master group) - and a
>    sequence of T_l+2 ... that are the rhs.
>    When the gaps are delivered and filled by execution, the slave gtid state
>    is described by the single max(l_i), which is the end of the master
>    group commit recovery.
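For illustration only, here is a tiny sketch of the slave state bookkeeping
this implies (the names and the flat uint64 sequence numbers are my
simplification of a gtid within one domain):

  #include <cstdint>
  #include <set>

  struct slave_gtid_state
  {
    uint64_t watermark = 0;      // T_l^(wm): everything <= watermark is applied
    std::set<uint64_t> above;    // out-of-order applied seq_no's (the rhs)

    void applied(uint64_t seq_no)
    {
      if (seq_no <= watermark)
        return;                  // already covered by the watermark
      above.insert(seq_no);
      // Filling a gap advances the watermark; once 'above' drains, the whole
      // state is again described by the single max(l_i).
      while (!above.empty() && *above.begin() == watermark + 1)
      {
        watermark++;
        above.erase(above.begin());
      }
    }

    bool has_gaps() const { return !above.empty(); }
  };
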
>
>>
>> Remember, it is not guaranteed that the GTIDs are in strict sequence number
>> ordering from the master! Things like master-master ring or transactions
>> executed directly on the slave can break ordering when gtid_strict_mode is
>> not set.
>
>    As you can see from my description, there is some strictness implied.
>    Namely, if we enumerate the master groups, their sequence numbers grow
>    monotonically (abstracting from the group breakup for simplicity).
>
>> Suppose you have GTIDS 0-1-100,0-1-99 in the log and a lower-level
>> slave connects at GTID 0-1-100. How will you know whether you need to send
>> 0-1-99 to the slave (because of out-of-order master) or if you need to skip
>> it (because we did parallel execution out-of-order)?
>
>    If I got your point right,
>    in my case the slave would come with its watermark gtid, which should be
>    less than or equal to that of the master in order for the master to accept the slave.
>    So the slave requesting to start feeding from 0-1-100 (the watermark) would mean
>    it has 0-1-99 applied.
>
>>
>> Or if binlogging is changed so that it uses the slave relay logs instead
>> (identical to master binlogs), but replicated transactions are committed to
>> storage engines out of order. Then how do we remember which parts are
>> committed in storage engines, so we can recover from a crash?
>
> This is also a practical question for the main agenda!
> In the out-of-order case that preserves the original sub-transactions,
> the group's sub-transactions are logged into the binlog as a permutable range.
>
> There should be a method to locate the first and last sub-transactions
> in there, regardless of the ordering that occurred.
> In MySQL such a method is based on the low watermark and tracking its
> file:pos.
> In the pipeline it would be natural to maintain the last group's end-of-group
> file:pos, so that the watermark is always at the beginning of the current group.
>
> I gave some thought to pipeline recovery and concluded positively about the
> feasibility of all the necessary steps. I shall describe that in a separate
> document.
>
>
>>
>> In current MariaDB parallel replication, the enforced commit ordering
>> ensures that a single GTID position is enough to recover the slave from a
>> crash, as well as to connect a lower-level slave. It also guarantees to
>> users that they can enable parallel replication without introducing any new
>> read-inconsistencies.
>>
>>> Right, those 40% of rollbacks we are going to trade with extra
>>> productivity :-).
>>
>> But the point is - those 40% rollbacks were beneficial. The main cost of the
>> transaction was the I/O to read affected pages from disk - so running
>> future transactions in parallel is a win even if we know that they need to
>> be rolled back and retried later.
>
> I got it now. We still can have this effect when running the dependent
> statements optimistically.
>
>> I/O latency is perhaps the most important
>> slave bottleneck to address with parallel replication, and speculative apply
>> can work as a pre-fetcher.
>>
>>> Very true. On one hand we can go "speculatively" blind, ready to
>>> pay the rollback price; on the other hand the master already detects (some) conflicts, and
>>> the actual conflict-free range of transactions is wider than a mere single
>>> binlog group (mind the reference to the logical clock that can "measure" dependencies).
>>
>> Can you btw. point me to a detailed description of how logical clock
>> works?
>
> No detailed description that I am aware of (partly through my personal
> guilt: my blog was ready, but somehow it never saw the light of day).
> But it is not that difficult to grasp from the following observation.
>
> When a transaction is prepared it is assigned a sequence number
> as a logical timestamp from a shared logical clock (stepping the clock).
> Also at this point another logical timestamp is acquired which describes
> the low watermark of the last committed transaction. The latter clock is
> *updated* rather than stepped monotonically, by the value of the transaction
> being committed (if they were to commit in order, this commit clock would also step).
> The two timestamps comprise a parallelization range, whose meaning is that
> the transaction at the lhs is parallelizable with any in the range up to
> the rhs; it's something like this in the binlog:
>
> #190207 00:08:31 ... last_committed = 1201  sequence_number = 1203
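For illustration, the pair of timestamps admits a check like the following
sketch (my field names; real worker scheduling involves more than this one
comparison):

  #include <cstdint>

  struct trx_timestamps
  {
    uint64_t last_committed;    // commit low watermark at prepare time
    uint64_t sequence_number;   // stepped logical clock at prepare time
  };

  // 'earlier' can be applied in parallel with 'later' when it falls inside
  // the later transaction's parallelization range, i.e. it had not yet
  // committed on the master when 'later' was prepared.
  inline bool parallelizable(const trx_timestamps &earlier,
                             const trx_timestamps &later)
  {
    return earlier.sequence_number > later.last_committed;
  }

With the binlog line above, last_committed = 1201 and sequence_number = 1203
mean the transaction can run in parallel with anything in the range
(1201, 1203], e.g. transaction 1202.
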
>
>
>> My impression is that this still has limited scope for parallelisation if
>> the master has fast commits, but not knowing the details I may be missing
>> something...
>
> I agree with your intuition. The group size must be proportional to
> the ratio of cost(prepare+commit) / cost(body).
>
>>
>>> Although the assignment role of the producer is clearly better off be
>>> taken by the workers. What I meant in the original mail is
>>> that the assignment mechanism can be improved to avoid any chance of
>>> worker idling, but more importantly to facilitate fairness.
>>> The workers go to pick up pieces of work as soon as they become
>>> available. The optimistic scheduler's assignment is close to that, but
>>> it still assigns only round robin, that is, speculatively.
>>
>> I don't understand "the assignment role of the producer is clearly better
>> off be taken by the workers". How would this reduce workers idling? I do not
>> think the current parallel replication scheduler leaves workers idle?
>
> The main difference between pushing round-robin and pulling from a shared
> queue is that the former is speculative (hoping the worker will
> pick the work up soon).
> With an unfair load SHOW PROCESSLIST proves that wrong with a common
>
>  "Waiting for work from SQL thread"

which currently happens with the conservative scheduler (because of
unequal group (sub-)transaction sizes).

In the pipeline case the driver thread would remain in that speculative
mood, whereas the workers' self-assignment is always precise.

Cheers,

Andrei


>
> On the other hand, the pulling is voluntary, and assuming that the queue
> always has something, the worker stays busy non-stop at least until prepare
> (perhaps we could involve it in that as well).
> An optimistic version of this method would be to leave the prepared modified
> branch in the hands of the leader and switch to the next group's events.
>
>>
>> On the other hand, if workers are to individually pull work from a shared
>> queue, then they will all be contending for access to the queue (imagine
>> 5000 threads doing this at once...). The idea with the existing scheduler is
>> that this does not happen - each worker has its own queue, so contention is
>> reduced.
>
> Notice that the contention time lasts only for as "long" as it takes to *mark*
> a queue item as taken. So it can't be much of an issue.
> There must be a purging mechanism, but it
> does not overlap with access to the hot part of the queue.
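To sketch why the contention window is that narrow (a toy illustration under
invented names, not a proposed implementation):

  #include <atomic>
  #include <cstddef>
  #include <vector>

  struct queue_slot
  {
    std::atomic<bool> taken{false};
    // ... the event payload would live here ...
  };

  // A worker claims the first unclaimed slot.  The only shared write is the
  // single atomic exchange that *marks* the item as taken; purging consumed
  // slots can be done separately, away from this hot path.
  int try_claim(std::vector<queue_slot> &queue, std::size_t tail_hint)
  {
    for (std::size_t i = tail_hint; i < queue.size(); i++)
      if (!queue[i].taken.exchange(true))
        return (int) i;
    return -1;                  // nothing available right now
  }
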
>
>
>>
>> (Just to be clear, I'm trying to understand here, the current MariaDB
>> scheduler is obviously not perfect).
>
> I read that too, very distinctly (and I meant the first part mostly :-)).
>
>>
>>>> In fact, if the existing mechanism can be slightly extended, maybe it can
>>>> already do much of what is needed to ensure consistency. Eg. suppose we have
>>>> 3 original transactions each with two statements:
>>>>
>>>>   e^1_1 e^1_2  e^2_1 e^2_2  e^3_1 e^3_2
>>>>
>>>> Suppose the first 3 of these are ready to commit at some point:
>>>>
>>>>   e^1_1 e^1_2 e^2_1
>>>>
>>>> The leader can check that it doesn't try to commit only part of an original
>>>> transaction.
>>>
>>> Probably I will correct your initial perception here (otherwise I am
>>> lost as to how e^2_1 got separated).
>>> In this case you mean that three (original) T:s (E) are mapped into a modified
>>>
>>> T'_1 = {e^1_1 e^1_2 e^2_1}
>>>
>>> and some more, e.g. just one T'_2, whose union is
>>>
>>>  T'_2 \cup T'_1 =  T_1 \cup T_2 \cup T_3  =  E
>>>
>>> and the leader can't initiate the 2p-commit without T'_2.
>>
>> To illustrate what I mean, take my earlier example, with two transactions T1
>> and T2 containing individual statements S1, S2, and S3.
>>
>> My understanding is that on the slave, we will start 3 transactions for S1,
>> S2, and S3, each running in its own worker threads. Right?
>>
>> So suppose S1 and S2 finish and are ready to complete, but S3 is still
>> running. Now, we cannot just commit S1 and S2, as that would leave us in an
>> inconsistent state (in case of crash for example). So we should either
>> commit just S1 at this point. Or we could wait for S3 and then group commit
>> all of S1, S2, and S3 together. Right?
>
> True. My preference is to wait for S3, which preserves the master group
> with all the bonuses of doing so.
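A small sketch of the bookkeeping that waiting implies (invented names;
counters only, no error paths):

  #include <atomic>
  #include <cstddef>

  // One original master group on the slave side.  The leader may start the
  // group commit (2PC) only when every sub-statement of every original
  // transaction in the group has reached its prepared state, so that S1, S2
  // and S3 from the example commit together.
  struct group_progress
  {
    std::size_t total_sub_statements = 0;   // known when the group is scheduled
    std::atomic<std::size_t> prepared{0};   // bumped by workers as they finish

    bool ready_for_group_commit() const
    {
      return prepared.load() == total_sub_statements;
    }
  };
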
>
>>
>> Or is there some mechanism by which two different threads could run S2 and
>> S3 in parallel, but still within a single transactions (inside InnoDB eg?) -
>> I imagine this is not the case.
>
> We are certainly going in that direction. Let me shy away from detailing it
> in an already day-long mail :-); anyway it's
> better to show a patch that templates the idea.
>
>>
>>> Sorry, previously I omitted one important detail. A worker
>>> pulls its tail statement from the shared queue not only for itself.
>>> When the pulled result is a dependent statement whose parent has been or is
>>> being processed by another worker, that statement will be redirected to that
>>> worker.
>>
>> Right - but how will you determine whether there is such a dependency? It
>> seems very hard to do in the general case at pull time (where we haven't
>> even parsed the statement yet)?
>
> Not really, if we are content with pessimistic estimates.
> But I have already admitted your optimistic "encouragement"!
>
>>
>>> With some effort we can make use of the relay-log both for recovery and
>>> as a binlog for further replication. Above I hinted at the latter.
>>> So why wouldn't the slave worker "appropriate" the master binlog images? :-)
>>>
>>> As long as the master and slave server versions are the same, this optimization
>>> must be feasible.
>>
>> Aha, I see. This is something like a built-in binlog server.
>>
>> I agree this seems feasible. Surely it is a separate project from this (of
>> course one may benefit from the other).
>>
>> In a sense, this is the "natural" way to do things. Why go to all the
>> trouble of trying to re-produce the original binlog from executing
>> individual statements, when we already have an (almost) copy directly from
>> the master?
>
> It's a shame it was not done already a long time ago :-)
>
>>
>> Still, this may not be as easy as that... for example, how do we interleave
>> the multiple binlogs from multi-source replication in a way that is
>> consistent with the actual execution order? And what about extra
>> transactions executed directly on the slave?
>
> Binlog groups from multiple sources, including the slave-local ones,
> must remain as the original groups at binary logging.
> This can fail only through a crash.
> Recovery has not been scrutinized here much; it needs a description
> that I owe.
>
> "Lastly", I need to detail any possible group split and its consequences.
>
> Many thanks for this round of discussion and I am all ears to hear more!
>
> Andrei
>
>
>>
>> Thanks,
>>
>>  - Kristian.
>

