1 : /* Copyright (C) 2007 Michael Widenius
2 :
3 : This program is free software; you can redistribute it and/or modify
4 : it under the terms of the GNU General Public License as published by
5 : the Free Software Foundation; version 2 of the License.
6 :
7 : This program is distributed in the hope that it will be useful,
8 : but WITHOUT ANY WARRANTY; without even the implied warranty of
9 : MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 : GNU General Public License for more details.
11 :
12 : You should have received a copy of the GNU General Public License
13 : along with this program; if not, write to the Free Software
14 : Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
15 :
16 : /* Redo of index */
17 :
18 : #include "maria_def.h"
19 : #include "ma_blockrec.h"
20 : #include "trnman.h"
21 : #include "ma_key_recover.h"
22 : #include "ma_rt_index.h"
23 :
24 : /****************************************************************************
25 : Some helper functions used both by key page logging and block page logging
26 : ****************************************************************************/
27 :
28 : /**
29 : @brief Unpin all pinned pages
30 :
31 : @fn _ma_unpin_all_pages()
32 : @param info Maria handler
33 : @param undo_lsn LSN for undo pages. LSN_IMPOSSIBLE if we shouldn't write
34 : undo (like on duplicate key errors)
35 :
36 : info->pinned_pages is the list of pages to unpin. Each member of the list
37 : must have its 'changed' member set to tell if the page was changed or not.
38 :
39 : @note
40 : We unpin pages in the reverse order as they were pinned; This is not
41 : necessary now, but may simplify things in the future.
42 :
43 : @return
44 : Nothing; the function is declared void, so no status is reported to
45 : the caller.
46 : */
47 :
48 : void _ma_unpin_all_pages(MARIA_HA *info, LSN undo_lsn)
49 5010033 : {
50 : MARIA_PINNED_PAGE *page_link= ((MARIA_PINNED_PAGE*)
51 5010033 : dynamic_array_ptr(&info->pinned_pages, 0));
52 5010033 : MARIA_PINNED_PAGE *pinned_page= page_link + info->pinned_pages.elements; /* one past the last element */
53 5010033 : DBUG_ENTER("_ma_unpin_all_pages");
54 5010033 : DBUG_PRINT("info", ("undo_lsn: %lu", (ulong) undo_lsn));
55 : /* For a non transactional table an undo LSN is only accepted during recovery */
56 5010033 : if (!info->s->now_transactional)
57 3260750 : DBUG_ASSERT(undo_lsn == LSN_IMPOSSIBLE || maria_in_recovery);
58 :
59 10746777 : while (pinned_page-- != page_link) /* last element of the list first */
60 : {
61 : /*
62 : Note this assert fails if we got a disk error or the record file
63 : is corrupted, which means we should have this enabled only in debug
64 : builds.
65 : */
66 : #ifdef EXTRA_DEBUG
67 5736744 : DBUG_ASSERT(!pinned_page->changed ||
68 : undo_lsn != LSN_IMPOSSIBLE || !info->s->now_transactional);
69 : #endif
70 5736744 : pagecache_unlock_by_link(info->s->pagecache, pinned_page->link,
71 : pinned_page->unlock, PAGECACHE_UNPIN,
72 : info->trn->rec_lsn, undo_lsn,
73 : pinned_page->changed, FALSE);
74 : }
75 : /* All elements have been handed to pagecache_unlock_by_link(); empty the list */
76 5010033 : info->pinned_pages.elements= 0;
77 5010033 : DBUG_VOID_RETURN;
78 : }
79 :
80 :
81 : my_bool _ma_write_clr(MARIA_HA *info, LSN undo_lsn,
82 : enum translog_record_type undo_type,
83 : my_bool store_checksum, ha_checksum checksum,
84 : LSN *res_lsn, void *extra_msg)
85 379769 : { /* Writes a LOGREC_CLR_END record for an undone record of type undo_type */
86 : uchar log_data[LSN_STORE_SIZE + FILEID_STORE_SIZE + CLR_TYPE_STORE_SIZE +
87 : HA_CHECKSUM_STORE_SIZE+ KEY_NR_STORE_SIZE + PAGE_STORE_SIZE];
88 : uchar *log_pos;
89 : LEX_CUSTRING log_array[TRANSLOG_INTERNAL_PARTS + 1];
90 : struct st_msg_to_write_hook_for_clr_end msg;
91 : my_bool res;
92 379769 : DBUG_ENTER("_ma_write_clr");
93 :
94 : /* undo_lsn must be first for compression to work */
95 379769 : lsn_store(log_data, undo_lsn);
96 379769 : clr_type_store(log_data + LSN_STORE_SIZE + FILEID_STORE_SIZE, undo_type);
97 379769 : log_pos= log_data + LSN_STORE_SIZE + FILEID_STORE_SIZE + CLR_TYPE_STORE_SIZE;
98 : /* The FILEID_STORE_SIZE bytes after the LSN are not written by this function */
99 : /* Extra_msg is handled in write_hook_for_clr_end() */
100 379769 : msg.undone_record_type= undo_type;
101 379769 : msg.previous_undo_lsn= undo_lsn;
102 379769 : msg.extra_msg= extra_msg;
103 379769 : msg.checksum_delta= 0;
104 : /* At most one optional part follows: a checksum, or a key number + key root page */
105 379769 : if (store_checksum)
106 : {
107 52049 : msg.checksum_delta= checksum;
108 52049 : ha_checksum_store(log_pos, checksum);
109 52049 : log_pos+= HA_CHECKSUM_STORE_SIZE;
110 : }
111 327720 : else if (undo_type == LOGREC_UNDO_KEY_INSERT_WITH_ROOT ||
112 : undo_type == LOGREC_UNDO_KEY_DELETE_WITH_ROOT)
113 : {
114 : /* Key root changed. Store new key root (as a page number, or IMPOSSIBLE_PAGE_NO for HA_OFFSET_ERROR) */
115 507 : struct st_msg_to_write_hook_for_undo_key *undo_msg= extra_msg;
116 : pgcache_page_no_t page;
117 507 : key_nr_store(log_pos, undo_msg->keynr);
118 507 : page= (undo_msg->value == HA_OFFSET_ERROR ? IMPOSSIBLE_PAGE_NO :
119 : undo_msg->value / info->s->block_size);
120 507 : page_store(log_pos + KEY_NR_STORE_SIZE, page);
121 507 : log_pos+= KEY_NR_STORE_SIZE + PAGE_STORE_SIZE;
122 : }
123 379769 : log_array[TRANSLOG_INTERNAL_PARTS + 0].str= log_data;
124 379769 : log_array[TRANSLOG_INTERNAL_PARTS + 0].length= (uint) (log_pos - log_data);
125 : /* Only the part of log_data that was filled in above (up to log_pos) is logged */
126 :
127 : /*
128 : We need intern_lock mutex for calling _ma_state_info_write in the trigger.
129 : We do it here to have the same sequence of mutexes locking everywhere
130 : (first intern_lock then transactional log buffer lock)
131 : */
132 379769 : if (undo_type == LOGREC_UNDO_BULK_INSERT)
133 0 : pthread_mutex_lock(&info->s->intern_lock);
134 :
135 379769 : res= translog_write_record(res_lsn, LOGREC_CLR_END,
136 : info->trn, info,
137 : (translog_size_t)
138 : log_array[TRANSLOG_INTERNAL_PARTS + 0].length,
139 : TRANSLOG_INTERNAL_PARTS + 1, log_array,
140 : log_data + LSN_STORE_SIZE, &msg);
141 379769 : if (undo_type == LOGREC_UNDO_BULK_INSERT)
142 0 : pthread_mutex_unlock(&info->s->intern_lock);
143 379769 : DBUG_RETURN(res); /* result of translog_write_record() */
144 : }
145 :
146 :
147 : /**
148 : @brief Sets transaction's undo_lsn, first_undo_lsn if needed
149 : and updates records/checksum or the key root depending on the undone type
150 : @return Operation status. FALSE, except for LOGREC_UNDO_BULK_INSERT where it
151 : is the result of maria_enable_indexes() || _ma_state_info_write() */
152 :
153 : my_bool write_hook_for_clr_end(enum translog_record_type type
154 : __attribute__ ((unused)),
155 : TRN *trn, MARIA_HA *tbl_info,
156 : LSN *lsn __attribute__ ((unused)),
157 : void *hook_arg)
158 379769 : {
159 379769 : MARIA_SHARE *share= tbl_info->s;
160 : struct st_msg_to_write_hook_for_clr_end *msg=
161 379769 : (struct st_msg_to_write_hook_for_clr_end *)hook_arg;
162 379769 : my_bool error= FALSE;
163 379769 : DBUG_ENTER("write_hook_for_clr_end");
164 379769 : DBUG_ASSERT(trn->trid != 0);
165 379769 : trn->undo_lsn= msg->previous_undo_lsn;
166 : /* Adjust the share according to the type of the record that was undone */
167 379769 : switch (msg->undone_record_type) {
168 : case LOGREC_UNDO_ROW_DELETE:
169 13914 : share->state.state.records++;
170 13914 : share->state.state.checksum+= msg->checksum_delta;
171 13914 : break;
172 : case LOGREC_UNDO_ROW_INSERT:
173 71460 : share->state.state.records--;
174 71460 : share->state.state.checksum+= msg->checksum_delta;
175 71460 : break;
176 : case LOGREC_UNDO_ROW_UPDATE:
177 1671 : share->state.state.checksum+= msg->checksum_delta;
178 1671 : break;
179 : case LOGREC_UNDO_KEY_INSERT_WITH_ROOT:
180 : case LOGREC_UNDO_KEY_DELETE_WITH_ROOT:
181 : {
182 : /* Update key root */
183 : struct st_msg_to_write_hook_for_undo_key *extra_msg=
184 507 : (struct st_msg_to_write_hook_for_undo_key *) msg->extra_msg;
185 507 : *extra_msg->root= extra_msg->value;
186 507 : break;
187 : }
188 : case LOGREC_UNDO_KEY_INSERT:
189 : case LOGREC_UNDO_KEY_DELETE:
190 : break;
191 : case LOGREC_UNDO_BULK_INSERT:
192 0 : safe_mutex_assert_owner(&share->intern_lock);
193 0 : error= (maria_enable_indexes(tbl_info) ||
194 : /* we enabled indices, need '2' below */
195 : _ma_state_info_write(share,
196 : MA_STATE_INFO_WRITE_DONT_MOVE_OFFSET |
197 : MA_STATE_INFO_WRITE_FULL_INFO));
198 : /* no need for _ma_reset_status(): REDO_DELETE_ALL is just before us */
199 0 : break;
200 : default:
201 0 : DBUG_ASSERT(0);
202 : }
203 379769 : if (trn->undo_lsn == LSN_IMPOSSIBLE) /* has fully rolled back */
204 83 : trn->first_undo_lsn= LSN_WITH_FLAGS_TO_FLAGS(trn->first_undo_lsn);
205 379769 : DBUG_RETURN(error);
206 : }
207 :
208 :
209 : /**
210 : @brief write hook for undo key: stores msg->value in *msg->root, calls
211 : _ma_fast_unlock_key_del() and returns the result of write_hook_for_undo() */
212 :
213 : my_bool write_hook_for_undo_key(enum translog_record_type type,
214 : TRN *trn, MARIA_HA *tbl_info,
215 : LSN *lsn, void *hook_arg)
216 1128048 : {
217 : struct st_msg_to_write_hook_for_undo_key *msg=
218 1128048 : (struct st_msg_to_write_hook_for_undo_key *) hook_arg;
219 :
220 1128048 : *msg->root= msg->value;
221 1128048 : _ma_fast_unlock_key_del(tbl_info);
222 1128048 : return write_hook_for_undo(type, trn, tbl_info, lsn, 0); /* hook_arg is not passed on */
223 : }
224 :
225 :
226 : /**
227 : Updates "auto_increment" and calls the generic UNDO_KEY hook
228 :
229 : @return Operation status: the result of write_hook_for_undo_key()
230 : */
231 :
232 : my_bool write_hook_for_undo_key_insert(enum translog_record_type type,
233 : TRN *trn, MARIA_HA *tbl_info,
234 : LSN *lsn, void *hook_arg)
235 640300 : {
236 : struct st_msg_to_write_hook_for_undo_key *msg=
237 640300 : (struct st_msg_to_write_hook_for_undo_key *) hook_arg;
238 640300 : MARIA_SHARE *share= tbl_info->s;
239 640300 : if (msg->auto_increment > 0)
240 : {
241 : /*
242 : Only reason to set it here is to have a mutex protect from checkpoint
243 : reading at the same time (would see a corrupted value).
244 :
245 : The purpose of the following code is to set auto_increment if the row
246 : has an auto_increment value higher than the current one. We also
247 : want to be able to restore the old value, in case of rollback,
248 : if no one else has tried to set the value.
249 :
250 : The logic used is that we only restore the auto_increment value if
251 : tbl_info->last_auto_increment == share->last_auto_increment
252 : when it's time to do the rollback.
253 : */
254 0 : DBUG_PRINT("info",("auto_inc: %lu new auto_inc: %lu",
255 : (ulong)share->state.auto_increment,
256 : (ulong)msg->auto_increment));
257 0 : if (share->state.auto_increment < msg->auto_increment)
258 : {
259 : /* Remember the original value, in case of rollback */
260 0 : tbl_info->last_auto_increment= share->last_auto_increment=
261 : share->state.auto_increment;
262 0 : share->state.auto_increment= msg->auto_increment;
263 : }
264 : else
265 : {
266 : /*
267 : If the current value would have affected the original auto_increment
268 : value, set it to an impossible value so that it's not restored on
269 : rollback
270 : */
271 0 : if (msg->auto_increment > share->last_auto_increment)
272 0 : share->last_auto_increment= ~(ulonglong) 0;
273 : }
274 : }
275 640300 : return write_hook_for_undo_key(type, trn, tbl_info, lsn, hook_arg);
276 : }
277 :
278 :
279 : /**
280 : @brief Updates "share->auto_increment" in case of abort and calls
281 : generic UNDO_KEY hook
282 :
283 : @return Operation status: the result of write_hook_for_undo_key()
284 : */
285 :
286 : my_bool write_hook_for_undo_key_delete(enum translog_record_type type,
287 : TRN *trn, MARIA_HA *tbl_info,
288 : LSN *lsn, void *hook_arg)
289 487748 : {
290 : struct st_msg_to_write_hook_for_undo_key *msg=
291 487748 : (struct st_msg_to_write_hook_for_undo_key *) hook_arg;
292 487748 : MARIA_SHARE *share= tbl_info->s;
293 487748 : if (msg->auto_increment > 0) /* If auto increment key */
294 : {
295 : /* Restore auto increment if no one has changed it in between (~0 marks "do not restore") */
296 0 : if (share->last_auto_increment == tbl_info->last_auto_increment &&
297 : tbl_info->last_auto_increment != ~(ulonglong) 0)
298 0 : share->state.auto_increment= tbl_info->last_auto_increment;
299 : }
300 487748 : return write_hook_for_undo_key(type, trn, tbl_info, lsn, hook_arg);
301 : }
302 :
303 :
304 : /*****************************************************************************
305 : Functions for logging of key page changes
306 : *****************************************************************************/
307 :
308 : /**
309 : @brief
310 : Write log entry for page that has got data added or deleted at start of page
311 : (move_length < 0 logs KEY_OP_DEL_PREFIX, otherwise KEY_OP_ADD_PREFIX; changed_length bytes after the key page header are logged) */
312 :
313 : my_bool _ma_log_prefix(MARIA_PAGE *ma_page, uint changed_length,
314 : int move_length)
315 279 : {
316 : uint translog_parts;
317 : LSN lsn;
318 : uchar log_data[FILEID_STORE_SIZE + PAGE_STORE_SIZE + 7 + 7 + 2], *log_pos;
319 279 : uchar *buff= ma_page->buff;
320 : LEX_CUSTRING log_array[TRANSLOG_INTERNAL_PARTS + 3];
321 : pgcache_page_no_t page;
322 279 : MARIA_HA *info= ma_page->info;
323 279 : DBUG_ENTER("_ma_log_prefix");
324 279 : DBUG_PRINT("enter", ("page: %lu changed_length: %u move_length: %d",
325 : (ulong) ma_page->pos, changed_length, move_length));
326 : /* The page number is the page position divided by the block size */
327 279 : page= ma_page->pos / info->s->block_size;
328 279 : log_pos= log_data + FILEID_STORE_SIZE;
329 279 : page_store(log_pos, page);
330 279 : log_pos+= PAGE_STORE_SIZE;
331 :
332 : /* Store keypage_flag */
333 279 : *log_pos++= KEY_OP_SET_PAGEFLAG;
334 279 : *log_pos++= buff[KEYPAGE_TRANSFLAG_OFFSET];
335 :
336 279 : if (move_length < 0)
337 : {
338 : /* Delete prefix */
339 112 : log_pos[0]= KEY_OP_DEL_PREFIX;
340 112 : int2store(log_pos+1, -move_length);
341 112 : log_pos+= 3;
342 112 : if (changed_length)
343 : {
344 : /*
345 : We don't need a KEY_OP_OFFSET as KEY_OP_DEL_PREFIX has an implicit
346 : offset
347 : */
348 112 : log_pos[0]= KEY_OP_CHANGE;
349 112 : int2store(log_pos+1, changed_length);
350 112 : log_pos+= 3;
351 : }
352 : }
353 : else
354 : {
355 : /* Add prefix */
356 167 : DBUG_ASSERT(changed_length >0 && (int) changed_length >= move_length);
357 167 : log_pos[0]= KEY_OP_ADD_PREFIX;
358 167 : int2store(log_pos+1, move_length);
359 167 : int2store(log_pos+3, changed_length);
360 167 : log_pos+= 5;
361 : }
362 : /* Part 0 is the header built above; part 1 (only if changed_length != 0) is the page data */
363 279 : translog_parts= 1;
364 279 : log_array[TRANSLOG_INTERNAL_PARTS + 0].str= log_data;
365 279 : log_array[TRANSLOG_INTERNAL_PARTS + 0].length= (uint) (log_pos -
366 : log_data);
367 279 : if (changed_length)
368 : {
369 279 : log_array[TRANSLOG_INTERNAL_PARTS + 1].str= (buff +
370 : info->s->keypage_header);
371 279 : log_array[TRANSLOG_INTERNAL_PARTS + 1].length= changed_length;
372 279 : translog_parts= 2;
373 : }
374 : /* Optional extra part: KEY_OP_CHECK with the page length and a checksum of the page after the LSN */
375 : #ifdef EXTRA_DEBUG_KEY_CHANGES
376 : {
377 279 : int page_length= ma_page->size;
378 : ha_checksum crc;
379 279 : crc= my_checksum(0, buff + LSN_STORE_SIZE, page_length - LSN_STORE_SIZE);
380 279 : log_pos[0]= KEY_OP_CHECK;
381 279 : int2store(log_pos+1, page_length);
382 279 : int4store(log_pos+3, crc);
383 :
384 279 : log_array[TRANSLOG_INTERNAL_PARTS + translog_parts].str= log_pos;
385 279 : log_array[TRANSLOG_INTERNAL_PARTS + translog_parts].length= 7;
386 279 : changed_length+= 7; /* so that the total length passed below covers the check part */
387 279 : translog_parts++;
388 : }
389 : #endif
390 : /* Total record length = length of part 0 + changed_length */
391 279 : DBUG_RETURN(translog_write_record(&lsn, LOGREC_REDO_INDEX,
392 : info->trn, info,
393 : (translog_size_t)
394 : log_array[TRANSLOG_INTERNAL_PARTS +
395 : 0].length + changed_length,
396 : TRANSLOG_INTERNAL_PARTS + translog_parts,
397 : log_array, log_data, NULL));
398 : }
399 :
400 :
401 : /**
402 : @brief
403 : Write log entry for page that has got data added or deleted at end of page
404 : (new_length < org_length logs KEY_OP_DEL_SUFFIX, otherwise KEY_OP_ADD_SUFFIX followed by the bytes from buff + org_length) */
405 :
406 : my_bool _ma_log_suffix(MARIA_PAGE *ma_page, uint org_length, uint new_length)
407 4662 : {
408 : LSN lsn;
409 : LEX_CUSTRING log_array[TRANSLOG_INTERNAL_PARTS + 3];
410 : uchar log_data[FILEID_STORE_SIZE + PAGE_STORE_SIZE + 10 + 7 + 2], *log_pos;
411 4662 : uchar *buff= ma_page->buff;
412 : int diff;
413 : uint translog_parts, extra_length;
414 4662 : MARIA_HA *info= ma_page->info;
415 : pgcache_page_no_t page;
416 4662 : DBUG_ENTER("_ma_log_suffix");
417 4662 : DBUG_PRINT("enter", ("page: %lu org_length: %u new_length: %u",
418 : (ulong) ma_page->pos, org_length, new_length));
419 : /* The page number is the page position divided by the block size */
420 4662 : page= ma_page->pos / info->s->block_size;
421 :
422 4662 : log_pos= log_data + FILEID_STORE_SIZE;
423 4662 : page_store(log_pos, page);
424 4662 : log_pos+= PAGE_STORE_SIZE;
425 :
426 : /* Store keypage_flag */
427 4662 : *log_pos++= KEY_OP_SET_PAGEFLAG;
428 4662 : *log_pos++= buff[KEYPAGE_TRANSFLAG_OFFSET];
429 : /* diff < 0: page shrank, only the number of removed bytes is logged */
430 4662 : if ((diff= (int) (new_length - org_length)) < 0)
431 : {
432 2763 : log_pos[0]= KEY_OP_DEL_SUFFIX;
433 2763 : int2store(log_pos+1, -diff);
434 2763 : log_pos+= 3;
435 2763 : translog_parts= 1;
436 2763 : extra_length= 0;
437 : }
438 : else
439 : {
440 1899 : log_pos[0]= KEY_OP_ADD_SUFFIX;
441 1899 : int2store(log_pos+1, diff);
442 1899 : log_pos+= 3;
443 1899 : log_array[TRANSLOG_INTERNAL_PARTS + 1].str= buff + org_length;
444 1899 : log_array[TRANSLOG_INTERNAL_PARTS + 1].length= (uint) diff;
445 1899 : translog_parts= 2;
446 1899 : extra_length= (uint) diff;
447 : }
448 : /* Part 0 is the header built above */
449 4662 : log_array[TRANSLOG_INTERNAL_PARTS + 0].str= log_data;
450 4662 : log_array[TRANSLOG_INTERNAL_PARTS + 0].length= (uint) (log_pos -
451 : log_data);
452 : /* Optional extra part: KEY_OP_CHECK with new_length and a checksum of the page after the LSN */
453 : #ifdef EXTRA_DEBUG_KEY_CHANGES
454 : {
455 : ha_checksum crc;
456 4662 : crc= my_checksum(0, buff + LSN_STORE_SIZE, new_length - LSN_STORE_SIZE);
457 4662 : log_pos[0]= KEY_OP_CHECK;
458 4662 : int2store(log_pos+1, new_length);
459 4662 : int4store(log_pos+3, crc);
460 :
461 4662 : log_array[TRANSLOG_INTERNAL_PARTS + translog_parts].str= log_pos;
462 4662 : log_array[TRANSLOG_INTERNAL_PARTS + translog_parts].length= 7;
463 4662 : extra_length+= 7;
464 4662 : translog_parts++;
465 : }
466 : #endif
467 : /* Total record length = length of part 0 + extra_length */
468 4662 : DBUG_RETURN(translog_write_record(&lsn, LOGREC_REDO_INDEX,
469 : info->trn, info,
470 : (translog_size_t)
471 : log_array[TRANSLOG_INTERNAL_PARTS +
472 : 0].length + extra_length,
473 : TRANSLOG_INTERNAL_PARTS + translog_parts,
474 : log_array, log_data, NULL));
475 : }
476 :
477 :
478 : /**
479 : @brief Log that a key was added to the page
480 :
481 : @param ma_page Changed page
482 : @param org_page_length Length of data in page before key was added
483 : @param key_pos Start of the changed data in the page buffer; changed_length bytes from here are logged
484 : @note
485 : If handle_overflow is set, then we have to protect against
486 : logging changes that are outside of the page.
487 : This may happen during underflow() handling where the buffer
488 : in memory temporarily contains more data than block_size
489 : @return 0 if translog_write_record() succeeded, otherwise the value -1 converted to my_bool */
490 :
491 : my_bool _ma_log_add(MARIA_PAGE *ma_page,
492 : uint org_page_length, uchar *key_pos,
493 : uint changed_length, int move_length,
494 : my_bool handle_overflow __attribute__ ((unused)))
495 734112 : {
496 : LSN lsn;
497 : uchar log_data[FILEID_STORE_SIZE + PAGE_STORE_SIZE + 3 + 3 + 3 + 3 + 7 + 2];
498 : uchar *log_pos;
499 734112 : uchar *buff= ma_page->buff;
500 : LEX_CUSTRING log_array[TRANSLOG_INTERNAL_PARTS + 3];
501 734112 : MARIA_HA *info= ma_page->info;
502 734112 : uint offset= (uint) (key_pos - buff);
503 734112 : uint page_length= info->s->block_size - KEYPAGE_CHECKSUM_SIZE;
504 : uint translog_parts;
505 : pgcache_page_no_t page_pos;
506 734112 : DBUG_ENTER("_ma_log_add");
507 734112 : DBUG_PRINT("enter", ("page: %lu org_page_length: %u changed_length: %u "
508 : "move_length: %d",
509 : (ulong) ma_page->pos, org_page_length, changed_length,
510 : move_length));
511 734112 : DBUG_ASSERT(info->s->now_transactional);
512 :
513 : /*
514 : Write REDO entry that contains the logical operations we need
515 : to do the page
516 : */
517 734112 : log_pos= log_data + FILEID_STORE_SIZE;
518 734112 : page_pos= ma_page->pos / info->s->block_size;
519 734112 : page_store(log_pos, page_pos);
520 734112 : log_pos+= PAGE_STORE_SIZE;
521 :
522 : /* Store keypage_flag */
523 734112 : *log_pos++= KEY_OP_SET_PAGEFLAG;
524 734112 : *log_pos++= buff[KEYPAGE_TRANSFLAG_OFFSET];
525 : /* page_length here is block_size - KEYPAGE_CHECKSUM_SIZE, not the used length of the page */
526 734112 : if (org_page_length + move_length > page_length)
527 : {
528 : /*
529 : Overflow. Cut either key or data from page end so that key fits
530 : The code that splits the too big page will ignore logging any
531 : data over org_page_length
532 : */
533 0 : DBUG_ASSERT(handle_overflow);
534 0 : if (offset + changed_length > page_length)
535 : {
536 0 : changed_length= page_length - offset;
537 0 : move_length= 0;
538 : }
539 : else
540 : {
541 0 : uint diff= org_page_length + move_length - page_length;
542 0 : log_pos[0]= KEY_OP_DEL_SUFFIX;
543 0 : int2store(log_pos+1, diff);
544 0 : log_pos+= 3;
545 0 : org_page_length= page_length - move_length;
546 : }
547 : }
548 : /* Data starting exactly at the old page end is logged as a suffix, anything else as offset [+ shift] + change */
549 734112 : if (offset == org_page_length)
550 14255 : log_pos[0]= KEY_OP_ADD_SUFFIX;
551 : else
552 : {
553 719857 : log_pos[0]= KEY_OP_OFFSET;
554 719857 : int2store(log_pos+1, offset);
555 719857 : log_pos+= 3;
556 719857 : if (move_length)
557 : {
558 719659 : log_pos[0]= KEY_OP_SHIFT;
559 719659 : int2store(log_pos+1, move_length);
560 719659 : log_pos+= 3;
561 : }
562 719857 : log_pos[0]= KEY_OP_CHANGE;
563 : }
564 734112 : int2store(log_pos+1, changed_length); /* length argument of KEY_OP_ADD_SUFFIX or KEY_OP_CHANGE */
565 734112 : log_pos+= 3;
566 734112 : translog_parts= 2;
567 :
568 734112 : log_array[TRANSLOG_INTERNAL_PARTS + 0].str= log_data;
569 734112 : log_array[TRANSLOG_INTERNAL_PARTS + 0].length= (uint) (log_pos -
570 : log_data);
571 734112 : log_array[TRANSLOG_INTERNAL_PARTS + 1].str= key_pos;
572 734112 : log_array[TRANSLOG_INTERNAL_PARTS + 1].length= changed_length;
573 : /* Debug part: new_length is stored as page used while the checksum is computed, then the saved value is restored */
574 : #ifdef EXTRA_DEBUG_KEY_CHANGES
575 : {
576 734112 : MARIA_SHARE *share= info->s;
577 : ha_checksum crc;
578 734112 : uint save_page_length= ma_page->size;
579 734112 : uint new_length= org_page_length + move_length;
580 734112 : _ma_store_page_used(share, buff, new_length);
581 734112 : crc= my_checksum(0, buff + LSN_STORE_SIZE, new_length - LSN_STORE_SIZE);
582 734112 : log_pos[0]= KEY_OP_CHECK;
583 734112 : int2store(log_pos+1, new_length);
584 734112 : int4store(log_pos+3, crc);
585 :
586 734112 : log_array[TRANSLOG_INTERNAL_PARTS + translog_parts].str= log_pos;
587 734112 : log_array[TRANSLOG_INTERNAL_PARTS + translog_parts].length= 7;
588 734112 : changed_length+= 7;
589 734112 : translog_parts++;
590 734112 : _ma_store_page_used(share, buff, save_page_length);
591 : }
592 : #endif
593 : /* NOTE(review): -1 is returned from a my_bool function; callers presumably only test for non-zero -- confirm */
594 734112 : if (translog_write_record(&lsn, LOGREC_REDO_INDEX,
595 : info->trn, info,
596 : (translog_size_t)
597 : log_array[TRANSLOG_INTERNAL_PARTS + 0].length +
598 : changed_length,
599 : TRANSLOG_INTERNAL_PARTS + translog_parts,
600 : log_array, log_data, NULL))
601 0 : DBUG_RETURN(-1);
602 734112 : DBUG_RETURN(0);
603 : }
604 :
605 :
606 : /****************************************************************************
607 : Redo of key pages
608 : ****************************************************************************/
609 :
610 : /**
611 : @brief Apply LOGREC_REDO_INDEX_NEW_PAGE
612 :
613 : @param info Maria handler
614 : @param header Header (without FILEID)
615 : @param lsn LSN compared with share->state.is_of_horizon and with the LSN stored in the page; length is the length of header in bytes
616 : @return Operation status
617 : @retval 0 OK
618 : @retval 1 Error
619 : */
620 :
621 : uint _ma_apply_redo_index_new_page(MARIA_HA *info, LSN lsn,
622 : const uchar *header, uint length)
623 4190 : {
624 4190 : pgcache_page_no_t root_page= page_korr(header);
625 4190 : pgcache_page_no_t free_page= page_korr(header + PAGE_STORE_SIZE);
626 4190 : uint key_nr= key_nr_korr(header + PAGE_STORE_SIZE * 2);
627 4190 : my_bool page_type_flag= header[PAGE_STORE_SIZE * 2 + KEY_NR_STORE_SIZE];
628 : enum pagecache_page_lock unlock_method;
629 : enum pagecache_page_pin unpin_method;
630 : MARIA_PINNED_PAGE page_link;
631 : my_off_t file_size;
632 : uchar *buff;
633 : uint result;
634 4190 : MARIA_SHARE *share= info->s;
635 4190 : DBUG_ENTER("_ma_apply_redo_index_new_page");
636 4190 : DBUG_PRINT("enter", ("root_page: %lu free_page: %lu",
637 : (ulong) root_page, (ulong) free_page));
638 :
639 : /* Mark the table as changed; header is set to point at the key data further down */
640 :
641 4190 : share->state.changed|= (STATE_CHANGED | STATE_NOT_OPTIMIZED_KEYS |
642 : STATE_NOT_SORTED_PAGES | STATE_NOT_ZEROFILLED |
643 : STATE_NOT_MOVABLE);
644 : /* Skip the two page numbers, the key number and the page type flag read above */
645 4190 : header+= PAGE_STORE_SIZE * 2 + KEY_NR_STORE_SIZE + 1;
646 4190 : length-= PAGE_STORE_SIZE * 2 + KEY_NR_STORE_SIZE + 1;
647 : /* file_size is the offset of the first byte after root_page */
648 4190 : file_size= (my_off_t) (root_page + 1) * share->block_size;
649 4190 : if (cmp_translog_addr(lsn, share->state.is_of_horizon) >= 0)
650 : {
651 : /* free_page is 0 if we shouldn't set key_del */
652 2610 : if (free_page)
653 : {
654 406 : if (free_page != IMPOSSIBLE_PAGE_NO)
655 392 : share->state.key_del= (my_off_t) free_page * share->block_size;
656 : else
657 14 : share->state.key_del= HA_OFFSET_ERROR;
658 : }
659 2610 : if (page_type_flag) /* root page */
660 997 : share->state.key_root[key_nr]= file_size - share->block_size;
661 : }
662 : /* Page beyond the recorded key file length: it is built in keyread_buff and written with pagecache_write() below */
663 4190 : if (file_size > share->state.state.key_file_length)
664 : {
665 1276 : share->state.state.key_file_length= file_size;
666 1276 : buff= info->keyread_buff;
667 1276 : info->keyread_buff_used= 1;
668 1276 : unlock_method= PAGECACHE_LOCK_WRITE;
669 1276 : unpin_method= PAGECACHE_PIN;
670 : }
671 : else
672 : {
673 2914 : if (!(buff= pagecache_read(share->pagecache, &share->kfile,
674 : root_page, 0, 0,
675 : PAGECACHE_PLAIN_PAGE, PAGECACHE_LOCK_WRITE,
676 : &page_link.link)))
677 : {
678 120 : if (my_errno != HA_ERR_FILE_TOO_SHORT &&
679 : my_errno != HA_ERR_WRONG_CRC)
680 : {
681 0 : result= 1;
682 0 : goto err;
683 : }
684 120 : buff= pagecache_block_link_to_buffer(page_link.link); /* these two errors are tolerated: continue with the buffer of page_link.link */
685 : }
686 2794 : else if (lsn_korr(buff) >= lsn)
687 : {
688 : /* Already applied */
689 2388 : DBUG_PRINT("info", ("Page is up to date, skipping redo"));
690 2388 : result= 0;
691 2388 : goto err;
692 : }
693 526 : unlock_method= PAGECACHE_LOCK_LEFT_WRITELOCKED;
694 526 : unpin_method= PAGECACHE_PIN_LEFT_PINNED;
695 : }
696 :
697 : /* Write modified page */
698 1802 : bzero(buff, LSN_STORE_SIZE);
699 1802 : memcpy(buff + LSN_STORE_SIZE, header, length);
700 1802 : bzero(buff + LSN_STORE_SIZE + length,
701 : share->block_size - LSN_STORE_SIZE - KEYPAGE_CHECKSUM_SIZE - length);
702 1802 : bfill(buff + share->block_size - KEYPAGE_CHECKSUM_SIZE,
703 : KEYPAGE_CHECKSUM_SIZE, (uchar) 255);
704 : /* pagecache_write() is only called for the keyread_buff case (unlock_method == PAGECACHE_LOCK_WRITE) */
705 1802 : result= 0;
706 1802 : if (unlock_method == PAGECACHE_LOCK_WRITE &&
707 : pagecache_write(share->pagecache,
708 : &share->kfile, root_page, 0,
709 : buff, PAGECACHE_PLAIN_PAGE,
710 : unlock_method, unpin_method,
711 : PAGECACHE_WRITE_DELAY, &page_link.link,
712 : LSN_IMPOSSIBLE))
713 0 : result= 1;
714 : /* NOTE(review): the page is queued below even when pagecache_write() failed -- confirm this is intended */
715 : /* Mark page to be unlocked and written at _ma_unpin_all_pages() */
716 1802 : page_link.unlock= PAGECACHE_LOCK_WRITE_UNLOCK;
717 1802 : page_link.changed= 1;
718 1802 : push_dynamic(&info->pinned_pages, (void*) &page_link);
719 1802 : DBUG_RETURN(result);
720 : /* Also reached with result == 0 when the page was already up to date */
721 2388 : err:
722 2388 : pagecache_unlock_by_link(share->pagecache, page_link.link,
723 : PAGECACHE_LOCK_WRITE_UNLOCK,
724 : PAGECACHE_UNPIN, LSN_IMPOSSIBLE,
725 : LSN_IMPOSSIBLE, 0, FALSE);
726 2388 : DBUG_RETURN(result);
727 : }
728 :
729 :
730 : /**
731 : @brief Apply LOGREC_REDO_INDEX_FREE_PAGE
732 :
733 : @param info Maria handler
734 : @param header Header (without FILEID)
735 : @param lsn LSN compared with share->state.is_of_horizon and with the LSN stored in the page
736 : @return Operation status
737 : @retval 0 OK
738 : @retval != 0 Error (my_errno as left by the failed pagecache_read())
739 : */
740 :
741 : uint _ma_apply_redo_index_free_page(MARIA_HA *info,
742 : LSN lsn,
743 : const uchar *header)
744 3310 : {
745 3310 : pgcache_page_no_t page= page_korr(header);
746 3310 : pgcache_page_no_t free_page= page_korr(header + PAGE_STORE_SIZE);
747 : my_off_t old_link;
748 : MARIA_PINNED_PAGE page_link;
749 3310 : MARIA_SHARE *share= info->s;
750 : uchar *buff;
751 : int result;
752 3310 : DBUG_ENTER("_ma_apply_redo_index_free_page");
753 3310 : DBUG_PRINT("enter", ("page: %lu free_page: %lu",
754 : (ulong) page, (ulong) free_page));
755 :
756 3310 : share->state.changed|= (STATE_CHANGED | STATE_NOT_OPTIMIZED_KEYS |
757 : STATE_NOT_SORTED_PAGES | STATE_NOT_ZEROFILLED |
758 : STATE_NOT_MOVABLE);
759 : /* key_del is only pointed at the freed page if lsn is not older than is_of_horizon */
760 3310 : if (cmp_translog_addr(lsn, share->state.is_of_horizon) >= 0)
761 1888 : share->state.key_del= (my_off_t) page * share->block_size;
762 : /* old_link is the file offset of free_page, or HA_OFFSET_ERROR for IMPOSSIBLE_PAGE_NO */
763 3310 : old_link= ((free_page != IMPOSSIBLE_PAGE_NO) ?
764 : (my_off_t) free_page * share->block_size :
765 : HA_OFFSET_ERROR);
766 3310 : if (!(buff= pagecache_read(share->pagecache, &share->kfile,
767 : page, 0, 0,
768 : PAGECACHE_PLAIN_PAGE, PAGECACHE_LOCK_WRITE,
769 : &page_link.link)))
770 : {
771 0 : result= (uint) my_errno;
772 0 : goto err;
773 : }
774 3310 : if (lsn_korr(buff) >= lsn)
775 : {
776 : /* Already applied */
777 1686 : result= 0;
778 1686 : goto err;
779 : }
780 : /* Free page: clear the header after the LSN, store MARIA_DELETE_KEY_NR as key number and old_link after the header */
781 1624 : bzero(buff + LSN_STORE_SIZE, share->keypage_header - LSN_STORE_SIZE);
782 1624 : _ma_store_keynr(share, buff, (uchar) MARIA_DELETE_KEY_NR);
783 1624 : _ma_store_page_used(share, buff, share->keypage_header + 8);
784 1624 : mi_sizestore(buff + share->keypage_header, old_link);
785 :
786 : #ifdef IDENTICAL_PAGES_AFTER_RECOVERY
787 : {
788 : bzero(buff + share->keypage_header + 8,
789 : share->block_size - share->keypage_header - 8 -
790 : KEYPAGE_CHECKSUM_SIZE);
791 : }
792 : #endif
793 :
794 : /* Mark page to be unlocked and written at _ma_unpin_all_pages() */
795 1624 : page_link.unlock= PAGECACHE_LOCK_WRITE_UNLOCK;
796 1624 : page_link.changed= 1;
797 1624 : push_dynamic(&info->pinned_pages, (void*) &page_link);
798 1624 : DBUG_RETURN(0);
799 : /* Reached on read error and, with result == 0, when the page was already up to date */
800 1686 : err:
801 1686 : pagecache_unlock_by_link(share->pagecache, page_link.link,
802 : PAGECACHE_LOCK_WRITE_UNLOCK,
803 : PAGECACHE_UNPIN, LSN_IMPOSSIBLE,
804 : LSN_IMPOSSIBLE, 0, FALSE);
805 1686 : DBUG_RETURN(result);
806 : }
807 :
808 :
809 : /**
810 : @brief Apply LOGREC_REDO_INDEX
811 :
812 : @fn ma_apply_redo_index()
813 : @param info Maria handler
814 : @param header Header (without FILEID)
815 :
816 : @notes
817 : Data for this part is a set of logical instructions of how to
818 : construct the key page.
819 :
820 : Information of the layout of the components for REDO_INDEX:
821 :
822 : Name Parameters (in byte) Information
823 : KEY_OP_OFFSET 2 Set position for next operations
824 : KEY_OP_SHIFT 2 (signed int) How much to shift down or up
825 : KEY_OP_CHANGE 2 length, data Data to replace at 'pos'
826 : KEY_OP_ADD_PREFIX 2 move-length How much data should be moved up
827 : 2 change-length Data to be replaced at page start
828 : KEY_OP_DEL_PREFIX 2 length Bytes to be deleted at page start
829 : KEY_OP_ADD_SUFFIX 2 length, data Add data to end of page
830 : KEY_OP_DEL_SUFFIX 2 length Reduce page length with this
831 : Sets position to start of page
832 : KEY_OP_CHECK 6 page_length[2],CRC Used only when debugging
833 : KEY_OP_COMPACT_PAGE 6 transid
834 : KEY_OP_SET_PAGEFLAG 1 flag for page
835 :
836 : @return Operation status
837 : @retval 0 OK
838 : @retval 1 Error
839 : */
840 :
841 : long my_counter= 0; /* NOTE(review): not referenced in the visible part of this file; looks like a debugging aid -- confirm before removing */
842 :
843 : uint _ma_apply_redo_index(MARIA_HA *info,
844 : LSN lsn, const uchar *header, uint head_length)
845 1586696 : {
846 1586696 : MARIA_SHARE *share= info->s;
847 1586696 : pgcache_page_no_t page_pos= page_korr(header);
848 : MARIA_PINNED_PAGE page_link;
849 : uchar *buff;
850 1586696 : const uchar *header_end= header + head_length;
851 1586696 : uint page_offset= 0, org_page_length;
852 : uint nod_flag, page_length, keypage_header, keynr;
853 : int result;
854 : MARIA_PAGE page;
855 1586696 : DBUG_ENTER("_ma_apply_redo_index");
856 1586696 : DBUG_PRINT("enter", ("page: %lu", (ulong) page_pos));
857 :
858 : /* Set header to point at key data */
859 1586696 : header+= PAGE_STORE_SIZE;
860 :
861 1586696 : if (!(buff= pagecache_read(share->pagecache, &share->kfile,
862 : page_pos, 0, 0,
863 : PAGECACHE_PLAIN_PAGE, PAGECACHE_LOCK_WRITE,
864 : &page_link.link)))
865 : {
866 0 : result= 1;
867 0 : goto err;
868 : }
869 1586696 : if (lsn_korr(buff) >= lsn)
870 : {
871 : /* Already applied */
872 830651 : DBUG_PRINT("info", ("Page is up to date, skipping redo"));
873 830651 : result= 0;
874 830651 : goto err;
875 : }
876 :
877 756045 : keynr= _ma_get_keynr(share, buff);
878 756045 : _ma_page_setup(&page, info, share->keyinfo + keynr, page_pos, buff);
879 756045 : nod_flag= page.node;
880 756045 : org_page_length= page_length= page.size;
881 :
882 756045 : keypage_header= share->keypage_header;
883 756045 : DBUG_PRINT("redo", ("page_length: %u", page_length));
884 :
885 : /* Apply modifications to page */
886 : do
887 : {
888 3030197 : switch ((enum en_key_op) (*header++)) {
889 : case KEY_OP_OFFSET: /* 1 */
890 744838 : page_offset= uint2korr(header);
891 744838 : header+= 2;
892 744838 : DBUG_PRINT("redo", ("key_op_offset: %u", page_offset));
893 744838 : DBUG_ASSERT(page_offset >= keypage_header && page_offset <= page_length);
894 : break;
895 : case KEY_OP_SHIFT: /* 2 */
896 : {
897 744681 : int length= sint2korr(header);
898 744681 : header+= 2;
899 744681 : DBUG_PRINT("redo", ("key_op_shift: %d", length));
900 744681 : DBUG_ASSERT(page_offset != 0 && page_offset <= page_length &&
901 : page_length + length < share->block_size);
902 :
903 744681 : if (length < 0)
904 365839 : bmove(buff + page_offset, buff + page_offset - length,
905 : page_length - page_offset + length);
906 : else
907 378842 : bmove_upp(buff + page_length + length, buff + page_length,
908 : page_length - page_offset);
909 744681 : page_length+= length;
910 744681 : break;
911 : }
912 : case KEY_OP_CHANGE: /* 3 */
913 : {
914 384844 : uint length= uint2korr(header);
915 384844 : DBUG_PRINT("redo", ("key_op_change: %u", length));
916 384844 : DBUG_ASSERT(page_offset != 0 && page_offset + length <= page_length);
917 :
918 384844 : memcpy(buff + page_offset, header + 2 , length);
919 384844 : header+= 2 + length;
920 384844 : break;
921 : }
922 : case KEY_OP_ADD_PREFIX: /* 4 */
923 : {
924 94 : uint insert_length= uint2korr(header);
925 94 : uint changed_length= uint2korr(header+2);
926 94 : DBUG_PRINT("redo", ("key_op_add_prefix: %u %u",
927 : insert_length, changed_length));
928 :
929 94 : DBUG_ASSERT(insert_length <= changed_length &&
930 : page_length + changed_length <= share->block_size);
931 :
932 94 : bmove_upp(buff + page_length + insert_length, buff + page_length,
933 : page_length - keypage_header);
934 94 : memcpy(buff + keypage_header, header + 4 , changed_length);
935 94 : header+= 4 + changed_length;
936 94 : page_length+= insert_length;
937 94 : break;
938 : }
939 : case KEY_OP_DEL_PREFIX: /* 5 */
940 : {
941 53 : uint length= uint2korr(header);
942 53 : header+= 2;
943 53 : DBUG_PRINT("redo", ("key_op_del_prefix: %u", length));
944 53 : DBUG_ASSERT(length <= page_length - keypage_header);
945 :
946 53 : bmove(buff + keypage_header, buff + keypage_header +
947 : length, page_length - keypage_header - length);
948 53 : page_length-= length;
949 :
950 53 : page_offset= keypage_header; /* Prepare for change */
951 53 : break;
952 : }
953 : case KEY_OP_ADD_SUFFIX: /* 6 */
954 : {
955 8918 : uint insert_length= uint2korr(header);
956 8918 : DBUG_PRINT("redo", ("key_op_add_prefix: %u", insert_length));
957 8918 : DBUG_ASSERT(page_length + insert_length <= share->block_size);
958 8918 : memcpy(buff + page_length, header+2, insert_length);
959 :
960 8918 : page_length+= insert_length;
961 8918 : header+= 2 + insert_length;
962 8918 : break;
963 : }
964 : case KEY_OP_DEL_SUFFIX: /* 7 */
965 : {
966 2610 : uint del_length= uint2korr(header);
967 2610 : header+= 2;
968 2610 : DBUG_PRINT("redo", ("key_op_del_suffix: %u", del_length));
969 2610 : DBUG_ASSERT(page_length - del_length >= keypage_header);
970 2610 : page_length-= del_length;
971 2610 : break;
972 : }
973 : case KEY_OP_CHECK: /* 8 */
974 : {
975 : #ifdef EXTRA_DEBUG_KEY_CHANGES
976 : uint check_page_length;
977 : ha_checksum crc;
978 754999 : check_page_length= uint2korr(header);
979 754999 : crc= uint4korr(header+2);
980 754999 : _ma_store_page_used(share, buff, page_length);
981 754999 : DBUG_ASSERT(check_page_length == page_length);
982 754999 : if (crc != (uint32) my_checksum(0, buff + LSN_STORE_SIZE,
983 : page_length - LSN_STORE_SIZE))
984 : {
985 0 : DBUG_PRINT("error", ("page_length %u",page_length));
986 0 : DBUG_DUMP("KEY_OP_CHECK bad page", buff, share->block_size);
987 0 : DBUG_ASSERT("crc" == "failure in REDO_INDEX");
988 : }
989 : #endif
990 754999 : DBUG_PRINT("redo", ("key_op_check"));
991 754999 : header+= 6;
992 754999 : break;
993 : }
994 : case KEY_OP_MULTI_COPY: /* 9 */
995 : {
996 : /*
997 : List of fixed-len memcpy() operations with their source located inside
998 : the page. The log record's piece looks like:
999 : first the length 'full_length' to be used by memcpy()
1000 : then the number of bytes used by the list of (to,from) pairs
1001 : then the (to,from) pairs, so we do:
1002 : for (t,f) in [list of (to,from) pairs]:
1003 : memcpy(t, f, full_length).
1004 : */
1005 : uint full_length, log_memcpy_length;
1006 : const uchar *log_memcpy_end;
1007 :
1008 0 : DBUG_PRINT("redo", ("key_op_multi_copy"));
1009 0 : full_length= uint2korr(header);
1010 0 : header+= 2;
1011 0 : log_memcpy_length= uint2korr(header);
1012 0 : header+= 2;
1013 0 : log_memcpy_end= header + log_memcpy_length;
1014 0 : DBUG_ASSERT(full_length < share->block_size);
1015 0 : while (header < log_memcpy_end)
1016 : {
1017 : uint to, from;
1018 0 : to= uint2korr(header);
1019 0 : header+= 2;
1020 0 : from= uint2korr(header);
1021 0 : header+= 2;
1022 : /* "from" is a place in the existing page */
1023 0 : DBUG_ASSERT(max(from, to) < share->block_size);
1024 0 : memcpy(buff + to, buff + from, full_length);
1025 : }
1026 : break;
1027 : }
1028 : case KEY_OP_SET_PAGEFLAG:
1029 389160 : DBUG_PRINT("redo", ("key_op_set_pageflag"));
1030 389160 : buff[KEYPAGE_TRANSFLAG_OFFSET]= *header++;
1031 389160 : break;
1032 : case KEY_OP_COMPACT_PAGE:
1033 : {
1034 0 : TrID transid= transid_korr(header);
1035 :
1036 0 : DBUG_PRINT("redo", ("key_op_compact_page"));
1037 0 : header+= TRANSID_SIZE;
1038 0 : if (_ma_compact_keypage(&page, transid))
1039 : {
1040 0 : result= 1;
1041 0 : goto err;
1042 : }
1043 0 : page_length= page.size;
1044 : }
1045 : case KEY_OP_NONE:
1046 : default:
1047 0 : DBUG_ASSERT(0);
1048 : result= 1;
1049 : goto err;
1050 : }
1051 3030197 : } while (header < header_end);
1052 756045 : DBUG_ASSERT(header == header_end);
1053 :
1054 : /* Write modified page */
1055 756045 : page.size= page_length;
1056 756045 : _ma_store_page_used(share, buff, page_length);
1057 :
1058 : /*
1059 : Clean old stuff up. Gives us better compression of we archive things
1060 : and makes things easer to debug
1061 : */
1062 756045 : if (page_length < org_page_length)
1063 368502 : bzero(buff + page_length, org_page_length-page_length);
1064 :
1065 : /* Mark page to be unlocked and written at _ma_unpin_all_pages() */
1066 756045 : page_link.unlock= PAGECACHE_LOCK_WRITE_UNLOCK;
1067 756045 : page_link.changed= 1;
1068 756045 : push_dynamic(&info->pinned_pages, (void*) &page_link);
1069 756045 : DBUG_RETURN(0);
1070 :
1071 830651 : err:
1072 830651 : pagecache_unlock_by_link(share->pagecache, page_link.link,
1073 : PAGECACHE_LOCK_WRITE_UNLOCK,
1074 : PAGECACHE_UNPIN, LSN_IMPOSSIBLE,
1075 : LSN_IMPOSSIBLE, 0, FALSE);
1076 830651 : if (result)
1077 0 : _ma_mark_file_crashed(share);
1078 830651 : DBUG_RETURN(result);
1079 : }
1080 :
1081 :
1082 : /****************************************************************************
1083 : Undo of key block changes
1084 : ****************************************************************************/
1085 :
1086 : /**
1087 : @brief Undo of insert of key (ie, delete the inserted key)
1088 : */
1089 :
1090 : my_bool _ma_apply_undo_key_insert(MARIA_HA *info, LSN undo_lsn,
1091 : const uchar *header, uint length)
1092 200674 : {
1093 : LSN lsn;
1094 : my_bool res;
1095 : uint keynr;
1096 : uchar key_buff[MARIA_MAX_KEY_BUFF];
1097 200674 : MARIA_SHARE *share= info->s;
1098 : MARIA_KEY key;
1099 : my_off_t new_root;
1100 : struct st_msg_to_write_hook_for_undo_key msg;
1101 200674 : DBUG_ENTER("_ma_apply_undo_key_insert");
1102 :
1103 200674 : share->state.changed|= (STATE_CHANGED | STATE_NOT_OPTIMIZED_KEYS |
1104 : STATE_NOT_SORTED_PAGES | STATE_NOT_ZEROFILLED |
1105 : STATE_NOT_MOVABLE);
1106 200674 : keynr= key_nr_korr(header);
1107 200674 : length-= KEY_NR_STORE_SIZE;
1108 :
1109 : /* We have to copy key as _ma_ck_real_delete() may change it */
1110 200674 : memcpy(key_buff, header + KEY_NR_STORE_SIZE, length);
1111 200674 : DBUG_DUMP("key_buff", key_buff, length);
1112 :
1113 200674 : new_root= share->state.key_root[keynr];
1114 : /*
1115 : Change the key to an internal structure.
1116 : It's safe to have SEARCH_USER_KEY_HAS_TRANSID even if there isn't
1117 : a transaction id, as ha_key_cmp() will stop comparison when key length
1118 : is reached.
1119 : For index with transid flag, the ref_length of the key is not correct.
1120 : This should however be safe as long as this key is only used for
1121 : comparsion against other keys (not for packing or for read-next etc as
1122 : in this case we use data_length + ref_length, which is correct.
1123 : */
1124 200674 : key.keyinfo= share->keyinfo + keynr;
1125 200674 : key.data= key_buff;
1126 200674 : key.data_length= length - share->rec_reflength;
1127 200674 : key.ref_length= share->rec_reflength;
1128 200674 : key.flag= SEARCH_USER_KEY_HAS_TRANSID;
1129 :
1130 200674 : res= ((share->keyinfo[keynr].key_alg == HA_KEY_ALG_RTREE) ?
1131 : maria_rtree_real_delete(info, &key, &new_root) :
1132 : _ma_ck_real_delete(info, &key, &new_root));
1133 200674 : if (res)
1134 0 : _ma_mark_file_crashed(share);
1135 200674 : msg.root= &share->state.key_root[keynr];
1136 200674 : msg.value= new_root;
1137 200674 : msg.keynr= keynr;
1138 :
1139 200674 : if (_ma_write_clr(info, undo_lsn, *msg.root == msg.value ?
1140 : LOGREC_UNDO_KEY_INSERT : LOGREC_UNDO_KEY_INSERT_WITH_ROOT,
1141 : 0, 0, &lsn, (void*) &msg))
1142 0 : res= 1;
1143 :
1144 200674 : _ma_fast_unlock_key_del(info);
1145 200674 : _ma_unpin_all_pages_and_finalize_row(info, lsn);
1146 200674 : DBUG_RETURN(res);
1147 : }
1148 :
1149 :
1150 : /**
1151 : @brief Undo of delete of key (ie, insert the deleted key)
1152 :
1153 : @param with_root If the UNDO is UNDO_KEY_DELETE_WITH_ROOT
1154 : */
1155 :
1156 : my_bool _ma_apply_undo_key_delete(MARIA_HA *info, LSN undo_lsn,
1157 : const uchar *header, uint length,
1158 : my_bool with_root)
1159 92050 : {
1160 : LSN lsn;
1161 : my_bool res;
1162 : uint keynr, skip_bytes;
1163 : uchar key_buff[MARIA_MAX_KEY_BUFF];
1164 92050 : MARIA_SHARE *share= info->s;
1165 : my_off_t new_root;
1166 : struct st_msg_to_write_hook_for_undo_key msg;
1167 : MARIA_KEY key;
1168 92050 : DBUG_ENTER("_ma_apply_undo_key_delete");
1169 :
1170 92050 : share->state.changed|= (STATE_CHANGED | STATE_NOT_OPTIMIZED_KEYS |
1171 : STATE_NOT_SORTED_PAGES | STATE_NOT_ZEROFILLED |
1172 : STATE_NOT_MOVABLE);
1173 92050 : keynr= key_nr_korr(header);
1174 92050 : skip_bytes= KEY_NR_STORE_SIZE + (with_root ? PAGE_STORE_SIZE : 0);
1175 92050 : header+= skip_bytes;
1176 92050 : length-= skip_bytes;
1177 :
1178 : /* We have to copy key as _ma_ck_real_write_btree() may change it */
1179 92050 : memcpy(key_buff, header, length);
1180 92050 : DBUG_DUMP("key", key_buff, length);
1181 :
1182 92050 : key.keyinfo= share->keyinfo + keynr;
1183 92050 : key.data= key_buff;
1184 92050 : key.data_length= length - share->rec_reflength;
1185 92050 : key.ref_length= share->rec_reflength;
1186 92050 : key.flag= SEARCH_USER_KEY_HAS_TRANSID;
1187 :
1188 92050 : new_root= share->state.key_root[keynr];
1189 92050 : res= (share->keyinfo[keynr].key_alg == HA_KEY_ALG_RTREE) ?
1190 : maria_rtree_insert_level(info, &key, -1, &new_root) :
1191 : _ma_ck_real_write_btree(info, &key, &new_root,
1192 : share->keyinfo[keynr].write_comp_flag |
1193 : key.flag);
1194 92050 : if (res)
1195 0 : _ma_mark_file_crashed(share);
1196 :
1197 92050 : msg.root= &share->state.key_root[keynr];
1198 92050 : msg.value= new_root;
1199 92050 : msg.keynr= keynr;
1200 92050 : if (_ma_write_clr(info, undo_lsn,
1201 : *msg.root == msg.value ?
1202 : LOGREC_UNDO_KEY_DELETE : LOGREC_UNDO_KEY_DELETE_WITH_ROOT,
1203 : 0, 0, &lsn,
1204 : (void*) &msg))
1205 0 : res= 1;
1206 :
1207 92050 : _ma_fast_unlock_key_del(info);
1208 92050 : _ma_unpin_all_pages_and_finalize_row(info, lsn);
1209 92050 : DBUG_RETURN(res);
1210 : }
1211 :
1212 :
1213 : /****************************************************************************
1214 : Handle some local variables
1215 : ****************************************************************************/
1216 :
1217 : /**
1218 : @brief lock key_del for other threads usage
1219 :
1220 : @fn _ma_lock_key_del()
1221 : @param info Maria handler
1222 : @param insert_at_end Set to 1 if we are doing an insert
1223 :
1224 : @note
1225 : To allow higher concurrency in the common case where we do inserts
1226 : and we don't have any linked blocks we do the following:
1227 : - Mark in info->key_del_used that we are not using key_del
1228 : - Return at once (without marking key_del as used)
1229 :
1230 : This is safe as we in this case don't write key_del_current into
1231 : the redo log and during recover we are not updating key_del.
1232 :
1233 : @retval 1 Use page at end of file
1234 : @retval 0 Use page at share->key_del_current
1235 : */
1236 :
1237 : my_bool _ma_lock_key_del(MARIA_HA *info, my_bool insert_at_end)
1238 9441 : {
1239 9441 : MARIA_SHARE *share= info->s;
1240 :
1241 : /*
1242 : info->key_del_used is 0 initially.
1243 : If the caller needs a block (_ma_new()), we look at the free list:
1244 : - looks empty? then caller will create a new block at end of file and
1245 : remember (through info->key_del_used==2) that it will not change
1246 : state.key_del and does not need to wake up waiters as nobody will wait for
1247 : it.
1248 : - non-empty? then we wait for other users of the state.key_del list to
1249 : have finished, then we lock this list (through share->key_del_used==1)
1250 : because we need to prevent some other thread to also read state.key_del
1251 : and use the same page as ours. We remember through info->key_del_used==1
1252 : that we will have to set state.key_del at unlock time and wake up
1253 : waiters.
1254 : If the caller wants to free a block (_ma_dispose()), "empty" and
1255 : "non-empty" are treated as "non-empty" is treated above.
1256 : When we are ready to unlock, we copy share->key_del_current into
1257 : state.key_del. Unlocking happens when writing the UNDO log record, that
1258 : can make a long lock time.
1259 : Why we wrote "*looks* empty": because we are looking at state.key_del
1260 : which may be slightly old (share->key_del_current may be more recent and
1261 : exact): when we want a new page, we tolerate to treat "there was no free
1262 : page 1 millisecond ago" as "there is no free page". It's ok to non-pop
1263 : (_ma_new(), page will be found later anyway) but it's not ok to non-push
1264 : (_ma_dispose(), page would be lost).
1265 : When we leave this function, info->key_del_used is always 1 or 2.
1266 : */
1267 9441 : if (info->key_del_used != 1)
1268 : {
1269 8881 : pthread_mutex_lock(&share->key_del_lock);
1270 8881 : if (share->state.key_del == HA_OFFSET_ERROR && insert_at_end)
1271 : {
1272 4964 : pthread_mutex_unlock(&share->key_del_lock);
1273 4964 : info->key_del_used= 2; /* insert-with-append */
1274 4964 : return 1;
1275 : }
1276 : #ifdef THREAD
1277 3917 : while (share->key_del_used)
1278 0 : pthread_cond_wait(&share->key_del_cond, &share->key_del_lock);
1279 : #endif
1280 3917 : info->key_del_used= 1;
1281 3917 : share->key_del_used= 1;
1282 3917 : share->key_del_current= share->state.key_del;
1283 3917 : pthread_mutex_unlock(&share->key_del_lock);
1284 : }
1285 4477 : return share->key_del_current == HA_OFFSET_ERROR;
1286 : }
1287 :
1288 :
1289 : /**
1290 : @brief copy changes to key_del and unlock it
1291 :
1292 : @notes
1293 : In case of many threads using the maria table, we always have a lock
1294 : on the translog when comming here.
1295 : */
1296 :
1297 : void _ma_unlock_key_del(MARIA_HA *info)
1298 8293 : {
1299 8293 : DBUG_ASSERT(info->key_del_used);
1300 8293 : if (info->key_del_used == 1) /* Ignore insert-with-append */
1301 : {
1302 3917 : MARIA_SHARE *share= info->s;
1303 3917 : pthread_mutex_lock(&share->key_del_lock);
1304 3917 : share->key_del_used= 0;
1305 3917 : share->state.key_del= share->key_del_current;
1306 3917 : pthread_mutex_unlock(&share->key_del_lock);
1307 3917 : pthread_cond_signal(&share->key_del_cond);
1308 : }
1309 8293 : info->key_del_used= 0;
1310 : }
|