1 : /* Copyright (C) 2006 MySQL AB & MySQL Finland AB & TCX DataKonsult AB
2 :
3 : This program is free software; you can redistribute it and/or modify
4 : it under the terms of the GNU General Public License as published by
5 : the Free Software Foundation; version 2 of the License.
6 :
7 : This program is distributed in the hope that it will be useful,
8 : but WITHOUT ANY WARRANTY; without even the implied warranty of
9 : MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 : GNU General Public License for more details.
11 :
12 : You should have received a copy of the GNU General Public License
13 : along with this program; if not, write to the Free Software
14 : Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
15 :
16 : #include "ma_fulltext.h"
17 : #include "ma_rt_index.h"
18 : #include "trnman.h"
19 : #include "ma_key_recover.h"
20 :
21 : static int d_search(MARIA_HA *info, MARIA_KEY *key, uint32 comp_flag,
22 : MARIA_PAGE *page);
23 : static int del(MARIA_HA *info, MARIA_KEY *key,
24 : MARIA_PAGE *anc_page, MARIA_PAGE *leaf_page,
25 : uchar *keypos, my_off_t next_block, uchar *ret_key_buff);
26 : static int underflow(MARIA_HA *info, MARIA_KEYDEF *keyinfo,
27 : MARIA_PAGE *anc_page, MARIA_PAGE *leaf_page,
28 : uchar *keypos);
29 : static uint remove_key(MARIA_KEYDEF *keyinfo, uint page_flag, uint nod_flag,
30 : uchar *keypos, uchar *lastkey, uchar *page_end,
31 : my_off_t *next_block, MARIA_KEY_PARAM *s_temp);
32 :
33 : /* @breif Remove a row from a MARIA table */
34 :
35 : int maria_delete(MARIA_HA *info,const uchar *record)
36 122237 : {
37 : uint i;
38 : uchar *old_key;
39 : int save_errno;
40 : char lastpos[8];
41 122237 : MARIA_SHARE *share= info->s;
42 : MARIA_KEYDEF *keyinfo;
43 122237 : DBUG_ENTER("maria_delete");
44 :
45 : /* Test if record is in datafile */
46 122237 : DBUG_EXECUTE_IF("maria_pretend_crashed_table_on_usage",
47 : maria_print_error(share, HA_ERR_CRASHED);
48 : DBUG_RETURN(my_errno= HA_ERR_CRASHED););
49 122237 : DBUG_EXECUTE_IF("my_error_test_undefined_error",
50 : maria_print_error(share, INT_MAX);
51 : DBUG_RETURN(my_errno= INT_MAX););
52 122237 : if (!(info->update & HA_STATE_AKTIV))
53 : {
54 0 : DBUG_RETURN(my_errno=HA_ERR_KEY_NOT_FOUND); /* No database read */
55 : }
56 122237 : if (share->options & HA_OPTION_READ_ONLY_DATA)
57 : {
58 0 : DBUG_RETURN(my_errno=EACCES);
59 : }
60 122237 : if (_ma_readinfo(info,F_WRLCK,1))
61 0 : DBUG_RETURN(my_errno);
62 122237 : if ((*share->compare_record)(info,record))
63 122237 : goto err; /* Error on read-check */
64 :
65 122237 : if (_ma_mark_file_changed(info))
66 122237 : goto err;
67 :
68 : /* Ensure we don't change the autoincrement value */
69 122237 : info->last_auto_increment= ~(ulonglong) 0;
70 : /* Remove all keys from the index file */
71 :
72 122237 : old_key= info->lastkey_buff2;
73 :
74 844434 : for (i=0, keyinfo= share->keyinfo ; i < share->base.keys ; i++, keyinfo++)
75 : {
76 722197 : if (maria_is_key_active(share->state.key_map, i))
77 : {
78 722197 : keyinfo->version++;
79 722197 : if (keyinfo->flag & HA_FULLTEXT)
80 : {
81 0 : if (_ma_ft_del(info, i, old_key, record, info->cur_row.lastpos))
82 : goto err;
83 : }
84 : else
85 : {
86 : MARIA_KEY key;
87 722197 : if (keyinfo->ck_delete(info,
88 : (*keyinfo->make_key)(info, &key, i, old_key,
89 : record,
90 : info->cur_row.lastpos,
91 : info->cur_row.trid)))
92 722197 : goto err;
93 : }
94 : /* The above changed info->lastkey2. Inform maria_rnext_same(). */
95 722197 : info->update&= ~HA_STATE_RNEXT_SAME;
96 : }
97 : }
98 :
99 122237 : if (share->calc_checksum)
100 : {
101 : /*
102 : We can't use the row based checksum as this doesn't have enough
103 : precision.
104 : */
105 30750 : info->cur_row.checksum= (*share->calc_checksum)(info, record);
106 : }
107 :
108 122237 : if ((*share->delete_record)(info, record))
109 122237 : goto err; /* Remove record from database */
110 :
111 122237 : info->state->checksum-= info->cur_row.checksum;
112 122237 : info->state->records--;
113 122237 : info->update= HA_STATE_CHANGED+HA_STATE_DELETED+HA_STATE_ROW_CHANGED;
114 122237 : share->state.changed|= (STATE_NOT_OPTIMIZED_ROWS | STATE_NOT_MOVABLE |
115 : STATE_NOT_ZEROFILLED);
116 122237 : info->state->changed=1;
117 :
118 122237 : mi_sizestore(lastpos, info->cur_row.lastpos);
119 122237 : VOID(_ma_writeinfo(info,WRITEINFO_UPDATE_KEYFILE));
120 : allow_break(); /* Allow SIGHUP & SIGINT */
121 122237 : if (info->invalidator != 0)
122 : {
123 0 : DBUG_PRINT("info", ("invalidator... '%s' (delete)",
124 : share->open_file_name.str));
125 0 : (*info->invalidator)(share->open_file_name.str);
126 0 : info->invalidator=0;
127 : }
128 122237 : DBUG_RETURN(0);
129 :
130 0 : err:
131 0 : save_errno= my_errno;
132 0 : DBUG_ASSERT(save_errno);
133 0 : if (!save_errno)
134 0 : save_errno= HA_ERR_INTERNAL_ERROR; /* Should never happen */
135 :
136 0 : mi_sizestore(lastpos, info->cur_row.lastpos);
137 0 : if (save_errno != HA_ERR_RECORD_CHANGED)
138 : {
139 0 : maria_print_error(share, HA_ERR_CRASHED);
140 0 : maria_mark_crashed(info); /* mark table crashed */
141 : }
142 0 : VOID(_ma_writeinfo(info,WRITEINFO_UPDATE_KEYFILE));
143 0 : info->update|=HA_STATE_WRITTEN; /* Buffer changed */
144 : allow_break(); /* Allow SIGHUP & SIGINT */
145 0 : if (save_errno == HA_ERR_KEY_NOT_FOUND)
146 : {
147 0 : maria_print_error(share, HA_ERR_CRASHED);
148 0 : my_errno=HA_ERR_CRASHED;
149 : }
150 0 : DBUG_RETURN(my_errno= save_errno);
151 : } /* maria_delete */
152 :
153 :
154 : /*
155 : Remove a key from the btree index
156 :
157 : TODO:
158 : Change ma_ck_real_delete() to use another buffer for changed keys instead
159 : of key->data. This would allows us to remove the copying of the key here.
160 : */
161 :
162 : my_bool _ma_ck_delete(MARIA_HA *info, MARIA_KEY *key)
163 1032976 : {
164 1032976 : MARIA_SHARE *share= info->s;
165 : int res;
166 1032976 : LSN lsn= LSN_IMPOSSIBLE;
167 1032976 : my_off_t new_root= share->state.key_root[key->keyinfo->key_nr];
168 : uchar key_buff[MARIA_MAX_KEY_BUFF], *save_key_data;
169 : MARIA_KEY org_key;
170 1032976 : DBUG_ENTER("_ma_ck_delete");
171 :
172 1032976 : save_key_data= key->data;
173 1032976 : if (share->now_transactional)
174 : {
175 : /* Save original value as the key may change */
176 487748 : memcpy(key_buff, key->data, key->data_length + key->ref_length);
177 487748 : org_key= *key;
178 487748 : key->data= key_buff;
179 : }
180 :
181 1032976 : res= _ma_ck_real_delete(info, key, &new_root);
182 :
183 1032976 : key->data= save_key_data;
184 1520724 : if (!res && share->now_transactional)
185 487748 : res= _ma_write_undo_key_delete(info, &org_key, new_root, &lsn);
186 : else
187 : {
188 545228 : share->state.key_root[key->keyinfo->key_nr]= new_root;
189 545228 : _ma_fast_unlock_key_del(info);
190 : }
191 1032976 : _ma_unpin_all_pages_and_finalize_row(info, lsn);
192 1032976 : DBUG_RETURN(res != 0);
193 : } /* _ma_ck_delete */
194 :
195 :
196 : my_bool _ma_ck_real_delete(register MARIA_HA *info, MARIA_KEY *key,
197 : my_off_t *root)
198 1233650 : {
199 : int error;
200 1233650 : my_bool result= 0;
201 : my_off_t old_root;
202 : uchar *root_buff;
203 1233650 : MARIA_KEYDEF *keyinfo= key->keyinfo;
204 : MARIA_PAGE page;
205 1233650 : DBUG_ENTER("_ma_ck_real_delete");
206 :
207 1233650 : if ((old_root=*root) == HA_OFFSET_ERROR)
208 : {
209 0 : my_errno=HA_ERR_CRASHED;
210 0 : DBUG_RETURN(1);
211 : }
212 1233650 : if (!(root_buff= (uchar*) my_alloca((uint) keyinfo->block_length+
213 : MARIA_MAX_KEY_BUFF*2)))
214 : {
215 0 : DBUG_PRINT("error",("Couldn't allocate memory"));
216 0 : my_errno=ENOMEM;
217 0 : DBUG_RETURN(1);
218 : }
219 1233650 : DBUG_PRINT("info",("root_page: %ld", (long) old_root));
220 1233650 : if (_ma_fetch_keypage(&page, info, keyinfo, old_root,
221 : PAGECACHE_LOCK_WRITE, DFLT_INIT_HITS, root_buff, 0))
222 : {
223 0 : result= 1;
224 0 : goto err;
225 : }
226 1233650 : if ((error= d_search(info, key, (keyinfo->flag & HA_FULLTEXT ?
227 : SEARCH_FIND | SEARCH_UPDATE | SEARCH_INSERT:
228 : SEARCH_SAME),
229 : &page)))
230 : {
231 201540 : if (error < 0)
232 0 : result= 1;
233 201540 : else if (error == 2)
234 : {
235 0 : DBUG_PRINT("test",("Enlarging of root when deleting"));
236 0 : if (_ma_enlarge_root(info, key, root))
237 0 : result= 1;
238 : }
239 : else /* error == 1 */
240 : {
241 201540 : MARIA_SHARE *share= info->s;
242 :
243 201540 : page_mark_changed(info, &page);
244 :
245 201540 : if (page.size <= page.node + share->keypage_header + 1)
246 : {
247 1174 : if (page.node)
248 496 : *root= _ma_kpos(page.node, root_buff +share->keypage_header +
249 : page.node);
250 : else
251 678 : *root=HA_OFFSET_ERROR;
252 1174 : if (_ma_dispose(info, old_root, 0))
253 0 : result= 1;
254 : }
255 200366 : else if (_ma_write_keypage(&page, PAGECACHE_LOCK_LEFT_WRITELOCKED,
256 : DFLT_INIT_HITS))
257 0 : result= 1;
258 : }
259 : }
260 1233650 : err:
261 : my_afree(root_buff);
262 1233650 : DBUG_PRINT("exit",("Return: %d",result));
263 1233650 : DBUG_RETURN(result);
264 : } /* _ma_ck_real_delete */
265 :
266 :
267 : /**
268 : @brief Remove key below key root
269 :
270 : @param key Key to delete. Will contain new key if block was enlarged
271 :
272 : @return
273 : @retval 0 ok (anc_page is not changed)
274 : @retval 1 If data on page is too small; In this case anc_buff is not saved
275 : @retval 2 If data on page is too big
276 : @retval -1 On errors
277 : */
278 :
279 : static int d_search(MARIA_HA *info, MARIA_KEY *key, uint32 comp_flag,
280 : MARIA_PAGE *anc_page)
281 2043779 : {
282 : int flag,ret_value,save_flag;
283 : uint nod_flag, page_flag;
284 : my_bool last_key;
285 : uchar *leaf_buff,*keypos;
286 : uchar lastkey[MARIA_MAX_KEY_BUFF];
287 : MARIA_KEY_PARAM s_temp;
288 2043779 : MARIA_SHARE *share= info->s;
289 2043779 : MARIA_KEYDEF *keyinfo= key->keyinfo;
290 : MARIA_PAGE leaf_page;
291 2043779 : DBUG_ENTER("d_search");
292 2043779 : DBUG_DUMP("page", anc_page->buff, anc_page->size);
293 :
294 2043779 : flag=(*keyinfo->bin_search)(key, anc_page, comp_flag, &keypos, lastkey,
295 : &last_key);
296 2043779 : if (flag == MARIA_FOUND_WRONG_KEY)
297 : {
298 0 : DBUG_PRINT("error",("Found wrong key"));
299 0 : DBUG_RETURN(-1);
300 : }
301 2043779 : page_flag= anc_page->flag;
302 2043779 : nod_flag= anc_page->node;
303 :
304 2043779 : if (!flag && (keyinfo->flag & HA_FULLTEXT))
305 : {
306 : uint off;
307 : int subkeys;
308 :
309 0 : get_key_full_length_rdonly(off, lastkey);
310 0 : subkeys=ft_sintXkorr(lastkey+off);
311 0 : DBUG_ASSERT(info->ft1_to_ft2==0 || subkeys >=0);
312 0 : comp_flag=SEARCH_SAME;
313 0 : if (subkeys >= 0)
314 : {
315 : /* normal word, one-level tree structure */
316 0 : if (info->ft1_to_ft2)
317 : {
318 : /* we're in ft1->ft2 conversion mode. Saving key data */
319 0 : insert_dynamic(info->ft1_to_ft2, (lastkey+off));
320 : }
321 : else
322 : {
323 : /* we need exact match only if not in ft1->ft2 conversion mode */
324 0 : flag=(*keyinfo->bin_search)(key, anc_page, comp_flag, &keypos,
325 : lastkey, &last_key);
326 : }
327 : /* fall through to normal delete */
328 : }
329 : else
330 : {
331 : /* popular word. two-level tree. going down */
332 : uint tmp_key_length;
333 : my_off_t root;
334 0 : uchar *kpos=keypos;
335 : MARIA_KEY tmp_key;
336 :
337 0 : tmp_key.data= lastkey;
338 0 : tmp_key.keyinfo= keyinfo;
339 :
340 0 : if (!(tmp_key_length=(*keyinfo->get_key)(&tmp_key, page_flag, nod_flag,
341 : &kpos)))
342 : {
343 0 : my_errno= HA_ERR_CRASHED;
344 0 : DBUG_RETURN(-1);
345 : }
346 0 : root= _ma_row_pos_from_key(&tmp_key);
347 0 : if (subkeys == -1)
348 : {
349 : /* the last entry in sub-tree */
350 0 : if (_ma_dispose(info, root, 1))
351 0 : DBUG_RETURN(-1);
352 : /* fall through to normal delete */
353 : }
354 : else
355 : {
356 : MARIA_KEY word_key;
357 0 : keyinfo=&share->ft2_keyinfo;
358 : /* we'll modify key entry 'in vivo' */
359 0 : kpos-=keyinfo->keylength+nod_flag;
360 0 : get_key_full_length_rdonly(off, key->data);
361 :
362 0 : word_key.data= key->data + off;
363 0 : word_key.keyinfo= &share->ft2_keyinfo;
364 0 : word_key.data_length= HA_FT_WLEN;
365 0 : word_key.ref_length= 0;
366 0 : word_key.flag= 0;
367 0 : ret_value= _ma_ck_real_delete(info, &word_key, &root);
368 0 : _ma_dpointer(share, kpos+HA_FT_WLEN, root);
369 0 : subkeys++;
370 0 : ft_intXstore(kpos, subkeys);
371 0 : if (!ret_value)
372 : {
373 0 : page_mark_changed(info, anc_page);
374 0 : ret_value= _ma_write_keypage(anc_page,
375 : PAGECACHE_LOCK_LEFT_WRITELOCKED,
376 : DFLT_INIT_HITS);
377 : }
378 0 : DBUG_PRINT("exit",("Return: %d",ret_value));
379 0 : DBUG_RETURN(ret_value);
380 : }
381 : }
382 : }
383 2043779 : leaf_buff=0;
384 2043779 : if (nod_flag)
385 : {
386 : /* Read left child page */
387 814392 : leaf_page.pos= _ma_kpos(nod_flag,keypos);
388 814392 : if (!(leaf_buff= (uchar*) my_alloca((uint) keyinfo->block_length+
389 : MARIA_MAX_KEY_BUFF*2)))
390 : {
391 0 : DBUG_PRINT("error", ("Couldn't allocate memory"));
392 0 : my_errno=ENOMEM;
393 0 : DBUG_RETURN(-1);
394 : }
395 814392 : if (_ma_fetch_keypage(&leaf_page, info,keyinfo, leaf_page.pos,
396 : PAGECACHE_LOCK_WRITE, DFLT_INIT_HITS, leaf_buff,
397 : 0))
398 2043779 : goto err;
399 : }
400 :
401 2043779 : if (flag != 0)
402 : {
403 810129 : if (!nod_flag)
404 : {
405 0 : DBUG_PRINT("error",("Didn't find key"));
406 0 : my_errno=HA_ERR_CRASHED; /* This should newer happend */
407 0 : goto err;
408 : }
409 810129 : save_flag=0;
410 810129 : ret_value= d_search(info, key, comp_flag, &leaf_page);
411 : }
412 : else
413 : { /* Found key */
414 : uint tmp;
415 1233650 : uint anc_buff_length= anc_page->size;
416 1233650 : uint anc_page_flag= anc_page->flag;
417 : my_off_t next_block;
418 :
419 1233650 : if (!(tmp= remove_key(keyinfo, anc_page_flag, nod_flag, keypos, lastkey,
420 : anc_page->buff + anc_buff_length,
421 : &next_block, &s_temp)))
422 1233650 : goto err;
423 :
424 1233650 : page_mark_changed(info, anc_page);
425 1233650 : anc_buff_length-= tmp;
426 1233650 : anc_page->size= anc_buff_length;
427 1233650 : page_store_size(share, anc_page);
428 :
429 : /*
430 : Log initial changes on pages
431 : If there is an underflow, there will be more changes logged to the
432 : page
433 : */
434 1233650 : if (share->now_transactional &&
435 : _ma_log_delete(anc_page, s_temp.key_pos,
436 : s_temp.changed_length, s_temp.move_length))
437 0 : DBUG_RETURN(-1);
438 :
439 1233650 : if (!nod_flag)
440 : { /* On leaf page */
441 1229387 : if (anc_buff_length <= (info->quick_mode ?
442 : MARIA_MIN_KEYBLOCK_LENGTH :
443 : (uint) keyinfo->underflow_block_length))
444 : {
445 : /* Page will be written by caller if we return 1 */
446 202016 : DBUG_RETURN(1);
447 : }
448 1027371 : if (_ma_write_keypage(anc_page,
449 : PAGECACHE_LOCK_LEFT_WRITELOCKED, DFLT_INIT_HITS))
450 0 : DBUG_RETURN(-1);
451 1027371 : DBUG_RETURN(0);
452 : }
453 4263 : save_flag=1; /* Mark that anc_buff is changed */
454 4263 : ret_value= del(info, key, anc_page, &leaf_page,
455 : keypos, next_block, lastkey);
456 : }
457 814392 : if (ret_value >0)
458 : {
459 3393 : save_flag=1;
460 3393 : if (ret_value == 1)
461 3393 : ret_value= underflow(info, keyinfo, anc_page, &leaf_page, keypos);
462 : else
463 : {
464 : /* This can only happen with variable length keys */
465 : MARIA_KEY last_key;
466 0 : DBUG_PRINT("test",("Enlarging of key when deleting"));
467 :
468 0 : last_key.data= lastkey;
469 0 : last_key.keyinfo= keyinfo;
470 0 : if (!_ma_get_last_key(&last_key, anc_page, keypos))
471 0 : goto err;
472 0 : ret_value= _ma_insert(info, key, anc_page, keypos,
473 : last_key.data,
474 : (MARIA_PAGE*) 0, (uchar*) 0, (my_bool) 0);
475 : }
476 : }
477 814392 : if (ret_value == 0 && anc_page->size >
478 : (uint) (keyinfo->block_length - KEYPAGE_CHECKSUM_SIZE))
479 : {
480 : /* parent buffer got too big ; We have to split the page */
481 0 : save_flag=1;
482 0 : ret_value= _ma_split_page(info, key, anc_page,
483 : (uint) (keyinfo->block_length -
484 : KEYPAGE_CHECKSUM_SIZE),
485 : (uchar*) 0, 0, 0, lastkey, 0) | 2;
486 : }
487 814392 : if (save_flag && ret_value != 1)
488 : {
489 4739 : page_mark_changed(info, anc_page);
490 4739 : if (_ma_write_keypage(anc_page, PAGECACHE_LOCK_LEFT_WRITELOCKED,
491 : DFLT_INIT_HITS))
492 0 : ret_value= -1;
493 : }
494 : else
495 : {
496 809653 : DBUG_DUMP("page", anc_page->buff, anc_page->size);
497 : }
498 : my_afree(leaf_buff);
499 814392 : DBUG_PRINT("exit",("Return: %d",ret_value));
500 814392 : DBUG_RETURN(ret_value);
501 :
502 0 : err:
503 : my_afree(leaf_buff);
504 0 : DBUG_PRINT("exit",("Error: %d",my_errno));
505 0 : DBUG_RETURN (-1);
506 : } /* d_search */
507 :
508 :
509 : /**
510 : @brief Remove a key that has a page-reference
511 :
512 : @param info Maria handler
513 : @param key Buffer for key to be inserted at upper level
514 : @param anc_page Page address for page where deleted key was
515 : @param anc_buff Page buffer (nod) where deleted key was
516 : @param leaf_page Page address for nod before the deleted key
517 : @param leaf_buff Buffer for leaf_page
518 : @param leaf_buff_link Pinned page link for leaf_buff
519 : @param keypos Pos to where deleted key was on anc_buff
520 : @param next_block Page adress for nod after deleted key
521 : @param ret_key_buff Key before keypos in anc_buff
522 :
523 : @notes
524 : leaf_page must be written to disk if retval > 0
525 : anc_page is not updated on disk. Caller should do this
526 :
527 : @return
528 : @retval < 0 Error
529 : @retval 0 OK. leaf_buff is written to disk
530 :
531 : @retval 1 key contains key to upper level (from balance page)
532 : leaf_buff has underflow
533 : @retval 2 key contains key to upper level (from split space)
534 : */
535 :
536 : static int del(MARIA_HA *info, MARIA_KEY *key,
537 : MARIA_PAGE *anc_page, MARIA_PAGE *leaf_page,
538 : uchar *keypos, my_off_t next_block, uchar *ret_key_buff)
539 4282 : {
540 : int ret_value,length;
541 : uint a_length, page_flag, nod_flag, leaf_length, new_leaf_length;
542 : uchar keybuff[MARIA_MAX_KEY_BUFF],*endpos,*next_buff,*key_start, *prev_key;
543 : uchar *anc_buff;
544 : MARIA_KEY_PARAM s_temp;
545 : MARIA_KEY tmp_key;
546 4282 : MARIA_SHARE *share= info->s;
547 4282 : MARIA_KEYDEF *keyinfo= key->keyinfo;
548 : MARIA_KEY ret_key;
549 : MARIA_PAGE next_page;
550 4282 : DBUG_ENTER("del");
551 4282 : DBUG_PRINT("enter",("leaf_page: %ld keypos: 0x%lx", (long) leaf_page,
552 : (ulong) keypos));
553 4282 : DBUG_DUMP("leaf_buff", leaf_page->buff, leaf_page->size);
554 :
555 4282 : page_flag= leaf_page->flag;
556 4282 : leaf_length= leaf_page->size;
557 4282 : nod_flag= leaf_page->node;
558 :
559 4282 : endpos= leaf_page->buff + leaf_length;
560 4282 : tmp_key.keyinfo= keyinfo;
561 4282 : tmp_key.data= keybuff;
562 :
563 4282 : if (!(key_start= _ma_get_last_key(&tmp_key, leaf_page, endpos)))
564 0 : DBUG_RETURN(-1);
565 :
566 4282 : if (nod_flag)
567 : {
568 19 : next_page.pos= _ma_kpos(nod_flag,endpos);
569 19 : if (!(next_buff= (uchar*) my_alloca((uint) keyinfo->block_length+
570 : MARIA_MAX_KEY_BUFF*2)))
571 0 : DBUG_RETURN(-1);
572 19 : if (_ma_fetch_keypage(&next_page, info, keyinfo, next_page.pos,
573 : PAGECACHE_LOCK_WRITE, DFLT_INIT_HITS, next_buff, 0))
574 0 : ret_value= -1;
575 : else
576 : {
577 19 : DBUG_DUMP("next_page", next_page.buff, next_page.size);
578 19 : if ((ret_value= del(info, key, anc_page, &next_page,
579 : keypos, next_block, ret_key_buff)) >0)
580 : {
581 : /* Get new length after key was deleted */
582 0 : endpos= leaf_page->buff+ leaf_page->size;
583 0 : if (ret_value == 1)
584 : {
585 0 : ret_value= underflow(info, keyinfo, leaf_page, &next_page,
586 : endpos);
587 0 : if (ret_value == 0 && leaf_page->size >
588 : (uint) (keyinfo->block_length - KEYPAGE_CHECKSUM_SIZE))
589 : {
590 0 : ret_value= (_ma_split_page(info, key, leaf_page,
591 : (uint) (keyinfo->block_length -
592 : KEYPAGE_CHECKSUM_SIZE),
593 : (uchar*) 0, 0, 0,
594 : ret_key_buff, 0) | 2);
595 : }
596 : }
597 : else
598 : {
599 0 : DBUG_PRINT("test",("Inserting of key when deleting"));
600 0 : if (!_ma_get_last_key(&tmp_key, leaf_page, endpos))
601 0 : goto err;
602 0 : ret_value= _ma_insert(info, key, leaf_page, endpos,
603 : tmp_key.data, (MARIA_PAGE *) 0, (uchar*) 0,
604 : 0);
605 : }
606 : }
607 19 : page_mark_changed(info, leaf_page);
608 : /*
609 : If ret_value <> 0, then leaf_page underflowed and caller will have
610 : to handle underflow and write leaf_page to disk.
611 : We can't write it here, as if leaf_page is empty we get an assert
612 : in _ma_write_keypage.
613 : */
614 19 : if (ret_value == 0 && _ma_write_keypage(leaf_page,
615 : PAGECACHE_LOCK_LEFT_WRITELOCKED,
616 : DFLT_INIT_HITS))
617 19 : goto err;
618 : }
619 : my_afree(next_buff);
620 19 : DBUG_RETURN(ret_value);
621 : }
622 :
623 : /*
624 : Remove last key from leaf page
625 : Note that leaf_page page may only have had one key (can normally only
626 : happen in quick mode), in which ase it will now temporary have 0 keys
627 : on it. This will be corrected by the caller as we will return 0.
628 : */
629 4263 : new_leaf_length= (uint) (key_start - leaf_page->buff);
630 4263 : leaf_page->size= new_leaf_length;
631 4263 : page_store_size(share, leaf_page);
632 :
633 4263 : if (share->now_transactional &&
634 : _ma_log_suffix(leaf_page, leaf_length, new_leaf_length))
635 4263 : goto err;
636 :
637 4263 : page_mark_changed(info, leaf_page); /* Safety */
638 4263 : if (new_leaf_length <= (info->quick_mode ? MARIA_MIN_KEYBLOCK_LENGTH :
639 : (uint) keyinfo->underflow_block_length))
640 : {
641 : /* Underflow, leaf_page will be written by caller */
642 81 : ret_value= 1;
643 : }
644 : else
645 : {
646 4182 : ret_value= 0;
647 4182 : if (_ma_write_keypage(leaf_page, PAGECACHE_LOCK_LEFT_WRITELOCKED,
648 : DFLT_INIT_HITS))
649 4263 : goto err;
650 : }
651 :
652 : /* Place last key in ancestor page on deleted key position */
653 4263 : a_length= anc_page->size;
654 4263 : anc_buff= anc_page->buff;
655 4263 : endpos= anc_buff + a_length;
656 :
657 4263 : ret_key.keyinfo= keyinfo;
658 4263 : ret_key.data= ret_key_buff;
659 :
660 4263 : prev_key= 0;
661 4263 : if (keypos != anc_buff+share->keypage_header + share->base.key_reflength)
662 : {
663 3496 : if (!_ma_get_last_key(&ret_key, anc_page, keypos))
664 3496 : goto err;
665 3496 : prev_key= ret_key.data;
666 : }
667 4263 : length= (*keyinfo->pack_key)(&tmp_key, share->base.key_reflength,
668 : keypos == endpos ? (uchar*) 0 : keypos,
669 : prev_key, prev_key,
670 : &s_temp);
671 4263 : if (length > 0)
672 4263 : bmove_upp(endpos+length,endpos,(uint) (endpos-keypos));
673 : else
674 0 : bmove(keypos,keypos-length, (int) (endpos-keypos)+length);
675 4263 : (*keyinfo->store_key)(keyinfo,keypos,&s_temp);
676 4263 : key_start= keypos;
677 4263 : if (tmp_key.flag & (SEARCH_USER_KEY_HAS_TRANSID |
678 : SEARCH_PAGE_KEY_HAS_TRANSID))
679 : {
680 679 : _ma_mark_page_with_transid(share, anc_page);
681 : }
682 :
683 : /* Save pointer to next leaf on parent page */
684 4263 : if (!(*keyinfo->get_key)(&ret_key, page_flag, share->base.key_reflength,
685 : &keypos))
686 4263 : goto err;
687 4263 : _ma_kpointer(info,keypos - share->base.key_reflength,next_block);
688 4263 : anc_page->size= a_length + length;
689 4263 : page_store_size(share, anc_page);
690 :
691 4263 : if (share->now_transactional &&
692 : _ma_log_add(anc_page, a_length,
693 : key_start, s_temp.changed_length, s_temp.move_length, 1))
694 4263 : goto err;
695 :
696 4263 : DBUG_RETURN(new_leaf_length <=
697 : (info->quick_mode ? MARIA_MIN_KEYBLOCK_LENGTH :
698 : (uint) keyinfo->underflow_block_length));
699 0 : err:
700 0 : DBUG_RETURN(-1);
701 : } /* del */
702 :
703 :
704 : /**
705 : @brief Balances adjacent pages if underflow occours
706 :
707 : @fn underflow()
708 : @param anc_buff Anchestor page data
709 : @param leaf_page Page number of leaf page
710 : @param leaf_buff Leaf page (page that underflowed)
711 : @param leaf_page_link Pointer to pin information about leaf page
712 : @param keypos Position after current key in anc_buff
713 :
714 : @note
715 : This function writes redo entries for all changes
716 : leaf_page is saved to disk
717 : Caller must save anc_buff
718 :
719 : @return
720 : @retval 0 ok
721 : @retval 1 ok, but anc_buff did underflow
722 : @retval -1 error
723 : */
724 :
725 : static int underflow(MARIA_HA *info, MARIA_KEYDEF *keyinfo,
726 : MARIA_PAGE *anc_page, MARIA_PAGE *leaf_page,
727 : uchar *keypos)
728 3393 : {
729 : int t_length;
730 : uint anc_length,buff_length,leaf_length,p_length,s_length,nod_flag;
731 : uint next_buff_length, new_buff_length, key_reflength;
732 : uint unchanged_leaf_length, new_leaf_length, new_anc_length;
733 : uint anc_page_flag, page_flag;
734 : uchar anc_key_buff[MARIA_MAX_KEY_BUFF], leaf_key_buff[MARIA_MAX_KEY_BUFF];
735 : uchar *endpos, *next_keypos, *anc_pos, *half_pos, *prev_key;
736 : uchar *anc_buff, *leaf_buff;
737 : uchar *after_key, *anc_end_pos;
738 : MARIA_KEY_PARAM key_deleted, key_inserted;
739 3393 : MARIA_SHARE *share= info->s;
740 : my_bool first_key;
741 : MARIA_KEY tmp_key, anc_key, leaf_key;
742 : MARIA_PAGE next_page;
743 3393 : DBUG_ENTER("underflow");
744 3393 : DBUG_PRINT("enter",("leaf_page: %ld keypos: 0x%lx",(long) leaf_page->pos,
745 : (ulong) keypos));
746 3393 : DBUG_DUMP("anc_buff", anc_page->buff, anc_page->size);
747 3393 : DBUG_DUMP("leaf_buff", leaf_page->buff, leaf_page->size);
748 :
749 3393 : anc_page_flag= anc_page->flag;
750 3393 : anc_buff= anc_page->buff;
751 3393 : leaf_buff= leaf_page->buff;
752 3393 : info->keyread_buff_used=1;
753 3393 : next_keypos=keypos;
754 3393 : nod_flag= leaf_page->node;
755 3393 : p_length= nod_flag+share->keypage_header;
756 3393 : anc_length= anc_page->size;
757 3393 : leaf_length= leaf_page->size;
758 3393 : key_reflength= share->base.key_reflength;
759 3393 : if (share->keyinfo+info->lastinx == keyinfo)
760 35 : info->page_changed=1;
761 3393 : first_key= keypos == anc_buff + share->keypage_header + key_reflength;
762 :
763 3393 : tmp_key.data= info->buff;
764 3393 : anc_key.data= anc_key_buff;
765 3393 : leaf_key.data= leaf_key_buff;
766 3393 : tmp_key.keyinfo= leaf_key.keyinfo= anc_key.keyinfo= keyinfo;
767 :
768 3393 : if ((keypos < anc_buff + anc_length && (info->state->records & 1)) ||
769 : first_key)
770 : {
771 : size_t tmp_length;
772 : uint next_page_flag;
773 : /* Use page right of anc-page */
774 1626 : DBUG_PRINT("test",("use right page"));
775 :
776 : /*
777 : Calculate position after the current key. Note that keydata itself is
778 : not used
779 : */
780 1626 : if (keyinfo->flag & HA_BINARY_PACK_KEY)
781 : {
782 63 : if (!(next_keypos= _ma_get_key(&tmp_key, anc_page, keypos)))
783 : goto err;
784 : }
785 : else
786 : {
787 : /* Avoid length error check if packed key */
788 1563 : tmp_key.data[0]= tmp_key.data[1]= 0;
789 : /* Got to end of found key */
790 1563 : if (!(*keyinfo->get_key)(&tmp_key, anc_page_flag, key_reflength,
791 : &next_keypos))
792 1626 : goto err;
793 : }
794 1626 : next_page.pos= _ma_kpos(key_reflength, next_keypos);
795 1626 : if (_ma_fetch_keypage(&next_page, info, keyinfo, next_page.pos,
796 : PAGECACHE_LOCK_WRITE, DFLT_INIT_HITS, info->buff, 0))
797 1626 : goto err;
798 1626 : next_buff_length= next_page.size;
799 1626 : next_page_flag= next_page.flag;
800 1626 : DBUG_DUMP("next", next_page.buff, next_page.size);
801 :
802 : /* find keys to make a big key-page */
803 1626 : bmove(next_keypos-key_reflength, next_page.buff + share->keypage_header,
804 : key_reflength);
805 :
806 1626 : if (!_ma_get_last_key(&anc_key, anc_page, next_keypos) ||
807 : !_ma_get_last_key(&leaf_key, leaf_page, leaf_buff+leaf_length))
808 : goto err;
809 :
810 : /* merge pages and put parting key from anc_page between */
811 1626 : prev_key= (leaf_length == p_length ? (uchar*) 0 : leaf_key.data);
812 1626 : t_length= (*keyinfo->pack_key)(&anc_key, nod_flag, next_page.buff+p_length,
813 : prev_key, prev_key, &key_inserted);
814 1626 : tmp_length= next_buff_length - p_length;
815 1626 : endpos= next_page.buff + tmp_length + leaf_length + t_length;
816 : /* next_page.buff will always be larger than before !*/
817 1626 : bmove_upp(endpos, next_page.buff + next_buff_length, tmp_length);
818 1626 : memcpy(next_page.buff, leaf_buff,(size_t) leaf_length);
819 1626 : (*keyinfo->store_key)(keyinfo, next_page.buff+leaf_length, &key_inserted);
820 1626 : buff_length= (uint) (endpos - next_page.buff);
821 :
822 : /* Set page flag from combination of both key pages and parting key */
823 1626 : page_flag= next_page_flag | leaf_page->flag;
824 1626 : if (anc_key.flag & (SEARCH_USER_KEY_HAS_TRANSID |
825 : SEARCH_PAGE_KEY_HAS_TRANSID))
826 234 : page_flag|= KEYPAGE_FLAG_HAS_TRANSID;
827 :
828 1626 : next_page.size= buff_length;
829 1626 : next_page.flag= page_flag;
830 1626 : page_store_info(share, &next_page);
831 :
832 : /* remove key from anc_page */
833 1626 : if (!(s_length=remove_key(keyinfo, anc_page_flag, key_reflength, keypos,
834 : anc_key_buff, anc_buff+anc_length,
835 : (my_off_t *) 0, &key_deleted)))
836 1626 : goto err;
837 :
838 1626 : new_anc_length= anc_length - s_length;
839 1626 : anc_page->size= new_anc_length;
840 1626 : page_store_size(share, anc_page);
841 :
842 1626 : if (buff_length <= (uint) (keyinfo->block_length - KEYPAGE_CHECKSUM_SIZE))
843 : {
844 : /* All keys fitted into one page */
845 1413 : page_mark_changed(info, &next_page);
846 1413 : if (_ma_dispose(info, next_page.pos, 0))
847 1413 : goto err;
848 :
849 1413 : memcpy(leaf_buff, next_page.buff, (size_t) buff_length);
850 1413 : leaf_page->size= next_page.size;
851 1413 : leaf_page->flag= next_page.flag;
852 :
853 1413 : if (share->now_transactional)
854 : {
855 : /* Log changes to parent page */
856 829 : if (_ma_log_delete(anc_page, key_deleted.key_pos,
857 : key_deleted.changed_length,
858 : key_deleted.move_length))
859 829 : goto err;
860 : /*
861 : Log changes to leaf page. Data for leaf page is in leaf_buff
862 : which contains original leaf_buff, parting key and next_buff
863 : */
864 829 : if (_ma_log_suffix(leaf_page, leaf_length, buff_length))
865 : goto err;
866 : }
867 : }
868 : else
869 : {
870 : /*
871 : Balancing didn't free a page, so we have to split 'buff' into two
872 : pages:
873 : - Find key in middle of buffer
874 : - Store everything before key in 'leaf_page'
875 : - Pack key into anc_page at position of deleted key
876 : Note that anc_page may overflow! (is handled by caller)
877 : - Store remaining keys in next_page (buff)
878 : */
879 : MARIA_KEY_PARAM anc_key_inserted;
880 :
881 213 : anc_end_pos= anc_buff + new_anc_length;
882 :
883 213 : DBUG_PRINT("test",("anc_buff: 0x%lx anc_end_pos: 0x%lx",
884 : (long) anc_buff, (long) anc_end_pos));
885 :
886 213 : if (!first_key && !_ma_get_last_key(&anc_key, anc_page, keypos))
887 213 : goto err;
888 213 : if (!(half_pos= _ma_find_half_pos(&leaf_key, &next_page, &after_key)))
889 213 : goto err;
890 213 : new_leaf_length= (uint) (half_pos - next_page.buff);
891 213 : memcpy(leaf_buff, next_page.buff, (size_t) new_leaf_length);
892 :
893 213 : leaf_page->size= new_leaf_length;
894 213 : leaf_page->flag= page_flag;
895 213 : page_store_info(share, leaf_page);
896 :
897 : /* Correct new keypointer to leaf_page */
898 213 : half_pos=after_key;
899 213 : _ma_kpointer(info,
900 : leaf_key.data + leaf_key.data_length + leaf_key.ref_length,
901 : next_page.pos);
902 :
903 : /* Save key in anc_page */
904 213 : prev_key= (first_key ? (uchar*) 0 : anc_key.data);
905 213 : t_length= (*keyinfo->pack_key)(&leaf_key, key_reflength,
906 : (keypos == anc_end_pos ? (uchar*) 0 :
907 : keypos),
908 : prev_key, prev_key, &anc_key_inserted);
909 213 : if (t_length >= 0)
910 213 : bmove_upp(anc_end_pos+t_length, anc_end_pos,
911 : (uint) (anc_end_pos - keypos));
912 : else
913 0 : bmove(keypos,keypos-t_length,(uint) (anc_end_pos-keypos)+t_length);
914 213 : (*keyinfo->store_key)(keyinfo,keypos, &anc_key_inserted);
915 213 : new_anc_length+= t_length;
916 213 : anc_page->size= new_anc_length;
917 213 : page_store_size(share, anc_page);
918 :
919 213 : if (leaf_key.flag & (SEARCH_USER_KEY_HAS_TRANSID |
920 : SEARCH_PAGE_KEY_HAS_TRANSID))
921 27 : _ma_mark_page_with_transid(share, anc_page);
922 :
923 : /* Store key first in new page */
924 213 : if (nod_flag)
925 0 : bmove(next_page.buff + share->keypage_header, half_pos-nod_flag,
926 : (size_t) nod_flag);
927 213 : if (!(*keyinfo->get_key)(&leaf_key, page_flag, nod_flag, &half_pos))
928 213 : goto err;
929 213 : t_length=(int) (*keyinfo->pack_key)(&leaf_key, nod_flag, (uchar*) 0,
930 : (uchar*) 0, (uchar*) 0,
931 : &key_inserted);
932 : /* t_length will always be > 0 for a new page !*/
933 213 : tmp_length= (size_t) ((next_page.buff + buff_length) - half_pos);
934 213 : bmove(next_page.buff + p_length + t_length, half_pos, tmp_length);
935 213 : (*keyinfo->store_key)(keyinfo, next_page.buff + p_length, &key_inserted);
936 213 : new_buff_length= tmp_length + t_length + p_length;
937 213 : next_page.size= new_buff_length;
938 213 : page_store_size(share, &next_page);
939 : /* keypage flag is already up to date */
940 :
941 213 : if (share->now_transactional)
942 : {
943 : /*
944 : Log changes to parent page
945 : This has one key deleted from it and one key inserted to it at
946 : keypos
947 :
948 : ma_log_add ensures that we don't log changes that is outside of
949 : key block size, as the REDO code can't handle that
950 : */
951 112 : if (_ma_log_add(anc_page, anc_length, keypos,
952 : anc_key_inserted.move_length +
953 : max(anc_key_inserted.changed_length -
954 : anc_key_inserted.move_length,
955 : key_deleted.changed_length),
956 : anc_key_inserted.move_length -
957 : key_deleted.move_length, 1))
958 112 : goto err;
959 :
960 : /*
961 : Log changes to leaf page.
962 : This contains original data with new data added at end
963 : */
964 112 : DBUG_ASSERT(leaf_length <= new_leaf_length);
965 112 : if (_ma_log_suffix(leaf_page, leaf_length, new_leaf_length))
966 112 : goto err;
967 : /*
968 : Log changes to next page
969 :
970 : This contains original data with some prefix data deleted and
971 : some compressed data at start possible extended
972 :
973 : Data in buff was originally:
974 : org_leaf_buff [leaf_length]
975 : separator_key [buff_key_inserted.move_length]
976 : next_key_changes [buff_key_inserted.changed_length -move_length]
977 : next_page_data [next_buff_length - p_length -
978 : (buff_key_inserted.changed_length -move_length)]
979 :
980 : After changes it's now:
981 : unpacked_key [key_inserted.changed_length]
982 : next_suffix [next_buff_length - key_inserted.changed_length]
983 :
984 : */
985 112 : DBUG_ASSERT(new_buff_length <= next_buff_length);
986 112 : if (_ma_log_prefix(&next_page, key_inserted.changed_length,
987 : (int) (new_buff_length - next_buff_length)))
988 213 : goto err;
989 : }
990 213 : page_mark_changed(info, &next_page);
991 213 : if (_ma_write_keypage(&next_page,
992 : PAGECACHE_LOCK_LEFT_WRITELOCKED, DFLT_INIT_HITS))
993 1626 : goto err;
994 : }
995 :
996 1626 : page_mark_changed(info, leaf_page);
997 1626 : if (_ma_write_keypage(leaf_page,
998 : PAGECACHE_LOCK_LEFT_WRITELOCKED, DFLT_INIT_HITS))
999 1626 : goto err;
1000 1626 : DBUG_RETURN(new_anc_length <=
1001 : ((info->quick_mode ? MARIA_MIN_KEYBLOCK_LENGTH :
1002 : (uint) keyinfo->underflow_block_length)));
1003 : }
1004 :
1005 1767 : DBUG_PRINT("test",("use left page"));
1006 :
1007 1767 : keypos= _ma_get_last_key(&anc_key, anc_page, keypos);
1008 1767 : if (!keypos)
1009 1767 : goto err;
1010 1767 : next_page.pos= _ma_kpos(key_reflength,keypos);
1011 1767 : if (_ma_fetch_keypage(&next_page, info, keyinfo, next_page.pos,
1012 : PAGECACHE_LOCK_WRITE, DFLT_INIT_HITS, info->buff, 0))
1013 1767 : goto err;
1014 1767 : buff_length= next_page.size;
1015 1767 : endpos= next_page.buff + buff_length;
1016 1767 : DBUG_DUMP("prev", next_page.buff, next_page.size);
1017 :
1018 : /* find keys to make a big key-page */
1019 1767 : bmove(next_keypos - key_reflength, leaf_buff + share->keypage_header,
1020 : key_reflength);
1021 1767 : next_keypos=keypos;
1022 1767 : if (!(*keyinfo->get_key)(&anc_key, anc_page_flag, key_reflength,
1023 : &next_keypos))
1024 1767 : goto err;
1025 1767 : if (!_ma_get_last_key(&leaf_key, &next_page, endpos))
1026 1767 : goto err;
1027 :
1028 : /* merge pages and put parting key from anc_page between */
1029 1767 : prev_key= (leaf_length == p_length ? (uchar*) 0 : leaf_key.data);
1030 1767 : t_length=(*keyinfo->pack_key)(&anc_key, nod_flag,
1031 : (leaf_length == p_length ?
1032 : (uchar*) 0 : leaf_buff+p_length),
1033 : prev_key, prev_key,
1034 : &key_inserted);
1035 1767 : if (t_length >= 0)
1036 1753 : bmove(endpos+t_length, leaf_buff+p_length,
1037 : (size_t) (leaf_length-p_length));
1038 : else /* We gained space */
1039 14 : bmove(endpos,leaf_buff+((int) p_length-t_length),
1040 : (size_t) (leaf_length-p_length+t_length));
1041 1767 : (*keyinfo->store_key)(keyinfo,endpos, &key_inserted);
1042 :
1043 : /* Remember for logging how many bytes of leaf_buff that are not changed */
1044 1767 : DBUG_ASSERT((int) key_inserted.changed_length >= key_inserted.move_length);
1045 1767 : unchanged_leaf_length= leaf_length - (key_inserted.changed_length -
1046 : key_inserted.move_length);
1047 :
1048 1767 : new_buff_length= buff_length + leaf_length - p_length + t_length;
1049 :
1050 1767 : page_flag= next_page.flag | leaf_page->flag;
1051 1767 : if (anc_key.flag & (SEARCH_USER_KEY_HAS_TRANSID |
1052 : SEARCH_PAGE_KEY_HAS_TRANSID))
1053 257 : page_flag|= KEYPAGE_FLAG_HAS_TRANSID;
1054 :
1055 1767 : next_page.size= new_buff_length;
1056 1767 : next_page.flag= page_flag;
1057 1767 : page_store_info(share, &next_page);
1058 :
1059 : /* remove key from anc_page */
1060 1767 : if (!(s_length= remove_key(keyinfo, anc_page_flag, key_reflength, keypos,
1061 : anc_key_buff,
1062 : anc_buff+anc_length, (my_off_t *) 0,
1063 : &key_deleted)))
1064 1767 : goto err;
1065 :
1066 1767 : new_anc_length= anc_length - s_length;
1067 1767 : anc_page->size= new_anc_length;
1068 1767 : page_store_size(share, anc_page);
1069 :
1070 1767 : if (new_buff_length <= (uint) (keyinfo->block_length -
1071 : KEYPAGE_CHECKSUM_SIZE))
1072 : {
1073 : /* All keys fitted into one page */
1074 1484 : page_mark_changed(info, leaf_page);
1075 1484 : if (_ma_dispose(info, leaf_page->pos, 0))
1076 1484 : goto err;
1077 :
1078 1484 : if (share->now_transactional)
1079 : {
1080 : /* Log changes to parent page */
1081 958 : if (_ma_log_delete(anc_page, key_deleted.key_pos,
1082 : key_deleted.changed_length, key_deleted.move_length))
1083 :
1084 958 : goto err;
1085 : /*
1086 : Log changes to next page. Data for leaf page is in buff
1087 : that contains original leaf_buff, parting key and next_buff
1088 : */
1089 958 : if (_ma_log_suffix(&next_page, buff_length, new_buff_length))
1090 : goto err;
1091 : }
1092 : }
1093 : else
1094 : {
1095 : /*
1096 : Balancing didn't free a page, so we have to split 'next_page' into two
1097 : pages
1098 : - Find key in middle of buffer (buff)
1099 : - Pack key at half_buff into anc_page at position of deleted key
1100 : Note that anc_page may overflow! (is handled by caller)
1101 : - Move everything after middlekey to 'leaf_buff'
1102 : - Shorten buff at 'endpos'
1103 : */
1104 : MARIA_KEY_PARAM anc_key_inserted;
1105 : size_t tmp_length;
1106 :
1107 283 : if (keypos == anc_buff + share->keypage_header + key_reflength)
1108 62 : anc_pos= 0; /* First key */
1109 : else
1110 : {
1111 221 : if (!_ma_get_last_key(&anc_key, anc_page, keypos))
1112 221 : goto err;
1113 221 : anc_pos= anc_key.data;
1114 : }
1115 283 : if (!(endpos= _ma_find_half_pos(&leaf_key, &next_page, &half_pos)))
1116 283 : goto err;
1117 :
1118 : /* Correct new keypointer to leaf_page */
1119 283 : _ma_kpointer(info,leaf_key.data + leaf_key.data_length +
1120 : leaf_key.ref_length, leaf_page->pos);
1121 :
1122 : /* Save key in anc_page */
1123 283 : DBUG_DUMP("anc_buff", anc_buff, new_anc_length);
1124 283 : DBUG_DUMP_KEY("key_to_anc", &leaf_key);
1125 283 : anc_end_pos= anc_buff + new_anc_length;
1126 283 : t_length=(*keyinfo->pack_key)(&leaf_key, key_reflength,
1127 : keypos == anc_end_pos ? (uchar*) 0
1128 : : keypos,
1129 : anc_pos, anc_pos,
1130 : &anc_key_inserted);
1131 283 : if (t_length >= 0)
1132 283 : bmove_upp(anc_end_pos+t_length, anc_end_pos,
1133 : (uint) (anc_end_pos-keypos));
1134 : else
1135 0 : bmove(keypos,keypos-t_length,(uint) (anc_end_pos-keypos)+t_length);
1136 283 : (*keyinfo->store_key)(keyinfo,keypos, &anc_key_inserted);
1137 283 : new_anc_length+= t_length;
1138 283 : anc_page->size= new_anc_length;
1139 283 : page_store_size(share, anc_page);
1140 :
1141 283 : if (leaf_key.flag & (SEARCH_USER_KEY_HAS_TRANSID |
1142 : SEARCH_PAGE_KEY_HAS_TRANSID))
1143 43 : _ma_mark_page_with_transid(share, anc_page);
1144 :
1145 : /* Store first key on new page */
1146 283 : if (nod_flag)
1147 0 : bmove(leaf_buff + share->keypage_header, half_pos-nod_flag,
1148 : (size_t) nod_flag);
1149 283 : if (!(*keyinfo->get_key)(&leaf_key, page_flag, nod_flag, &half_pos))
1150 283 : goto err;
1151 283 : DBUG_DUMP_KEY("key_to_leaf", &leaf_key);
1152 283 : t_length=(*keyinfo->pack_key)(&leaf_key, nod_flag, (uchar*) 0,
1153 : (uchar*) 0, (uchar*) 0, &key_inserted);
1154 : /* t_length will always be > 0 for a new page !*/
1155 283 : tmp_length= (size_t) ((next_page.buff + new_buff_length) - half_pos);
1156 283 : DBUG_PRINT("info",("t_length: %d length: %d",t_length, (int) tmp_length));
1157 283 : bmove(leaf_buff+p_length+t_length, half_pos, tmp_length);
1158 283 : (*keyinfo->store_key)(keyinfo,leaf_buff+p_length, &key_inserted);
1159 283 : new_leaf_length= tmp_length + t_length + p_length;
1160 :
1161 283 : leaf_page->size= new_leaf_length;
1162 283 : leaf_page->flag= page_flag;
1163 283 : page_store_info(share, leaf_page);
1164 :
1165 283 : new_buff_length= (uint) (endpos - next_page.buff);
1166 283 : next_page.size= new_buff_length;
1167 283 : page_store_size(share, &next_page);
1168 :
1169 283 : if (share->now_transactional)
1170 : {
1171 : /*
1172 : Log changes to parent page
1173 : This has one key deleted from it and one key inserted to it at
1174 : keypos
1175 :
1176 : ma_log_add() ensures that we don't log changes that is outside of
1177 : key block size, as the REDO code can't handle that
1178 : */
1179 167 : if (_ma_log_add(anc_page, anc_length, keypos,
1180 : anc_key_inserted.move_length +
1181 : max(anc_key_inserted.changed_length -
1182 : anc_key_inserted.move_length,
1183 : key_deleted.changed_length),
1184 : anc_key_inserted.move_length -
1185 : key_deleted.move_length, 1))
1186 167 : goto err;
1187 :
1188 : /*
1189 : Log changes to leaf page.
1190 : This contains original data with new data added first
1191 : */
1192 167 : DBUG_ASSERT(leaf_length <= new_leaf_length);
1193 167 : if (_ma_log_prefix(leaf_page, new_leaf_length - unchanged_leaf_length,
1194 : (int) (new_leaf_length - leaf_length)))
1195 167 : goto err;
1196 : /*
1197 : Log changes to next page
1198 : This contains original data with some suffix data deleted
1199 :
1200 : */
1201 167 : DBUG_ASSERT(new_buff_length <= buff_length);
1202 167 : if (_ma_log_suffix(&next_page, buff_length, new_buff_length))
1203 283 : goto err;
1204 : }
1205 :
1206 283 : page_mark_changed(info, leaf_page);
1207 283 : if (_ma_write_keypage(leaf_page,
1208 : PAGECACHE_LOCK_LEFT_WRITELOCKED, DFLT_INIT_HITS))
1209 1767 : goto err;
1210 : }
1211 1767 : page_mark_changed(info, &next_page);
1212 1767 : if (_ma_write_keypage(&next_page,
1213 : PAGECACHE_LOCK_LEFT_WRITELOCKED, DFLT_INIT_HITS))
1214 1767 : goto err;
1215 :
1216 1767 : DBUG_RETURN(new_anc_length <=
1217 : ((info->quick_mode ? MARIA_MIN_KEYBLOCK_LENGTH :
1218 : (uint) keyinfo->underflow_block_length)));
1219 :
1220 0 : err:
1221 0 : DBUG_RETURN(-1);
1222 : } /* underflow */
1223 :
1224 :
1225 : /**
1226 : @brief Remove a key from page
1227 :
1228 : @fn remove_key()
1229 : keyinfo Key handle
1230 : nod_flag Length of node ptr
1231 : keypos Where on page key starts
1232 : lastkey Buffer for storing keys to be removed
1233 : page_end Pointer to end of page
1234 : next_block If <> 0 and node-page, this is set to address of
1235 : next page
1236 : s_temp Information about what changes was done one the page:
1237 : s_temp.key_pos Start of key
1238 : s_temp.move_length Number of bytes removed at keypos
1239 : s_temp.changed_length Number of bytes changed at keypos
1240 :
1241 : @todo
1242 : The current code doesn't handle the case that the next key may be
1243 : packed better against the previous key if there is a case difference
1244 :
1245 : @return
1246 : @retval 0 error
1247 : @retval # How many chars was removed
1248 : */
1249 :
1250 : static uint remove_key(MARIA_KEYDEF *keyinfo, uint page_flag, uint nod_flag,
1251 : uchar *keypos, uchar *lastkey,
1252 : uchar *page_end, my_off_t *next_block,
1253 : MARIA_KEY_PARAM *s_temp)
1254 1237043 : {
1255 : int s_length;
1256 : uchar *start;
1257 1237043 : DBUG_ENTER("remove_key");
1258 1237043 : DBUG_PRINT("enter", ("keypos: 0x%lx page_end: 0x%lx",
1259 : (long) keypos, (long) page_end));
1260 :
1261 1237043 : start= s_temp->key_pos= keypos;
1262 1237043 : s_temp->changed_length= 0;
1263 1237043 : if (!(keyinfo->flag &
1264 : (HA_PACK_KEY | HA_SPACE_PACK_USED | HA_VAR_LENGTH_KEY |
1265 : HA_BINARY_PACK_KEY)) &&
1266 : !(page_flag & KEYPAGE_FLAG_HAS_TRANSID))
1267 : {
1268 : /* Static length key */
1269 559513 : s_length=(int) (keyinfo->keylength+nod_flag);
1270 559513 : if (next_block && nod_flag)
1271 1974 : *next_block= _ma_kpos(nod_flag,keypos+s_length);
1272 : }
1273 : else
1274 : {
1275 : /* Let keypos point at next key */
1276 : MARIA_KEY key;
1277 :
1278 : /* Calculate length of key */
1279 677530 : key.keyinfo= keyinfo;
1280 677530 : key.data= lastkey;
1281 677530 : if (!(*keyinfo->get_key)(&key, page_flag, nod_flag, &keypos))
1282 0 : DBUG_RETURN(0); /* Error */
1283 :
1284 677530 : if (next_block && nod_flag)
1285 2289 : *next_block= _ma_kpos(nod_flag,keypos);
1286 677530 : s_length=(int) (keypos-start);
1287 677530 : if (keypos != page_end)
1288 : {
1289 671065 : if (keyinfo->flag & HA_BINARY_PACK_KEY)
1290 : {
1291 124527 : uchar *old_key= start;
1292 : uint next_length,prev_length,prev_pack_length;
1293 :
1294 : /* keypos points here on start of next key */
1295 124527 : get_key_length(next_length,keypos);
1296 124527 : get_key_pack_length(prev_length,prev_pack_length,old_key);
1297 124527 : if (next_length > prev_length)
1298 : {
1299 86615 : uint diff= (next_length-prev_length);
1300 : /* We have to copy data from the current key to the next key */
1301 86615 : keypos-= diff + prev_pack_length;
1302 86615 : store_key_length(keypos, prev_length);
1303 86615 : bmove(keypos + prev_pack_length, lastkey + prev_length, diff);
1304 86615 : s_length=(int) (keypos-start);
1305 86615 : s_temp->changed_length= diff + prev_pack_length;
1306 : }
1307 : }
1308 : else
1309 : {
1310 : /* Check if a variable length first key part */
1311 546538 : if ((keyinfo->seg->flag & HA_PACK_KEY) && *keypos & 128)
1312 : {
1313 : /* Next key is packed against the current one */
1314 : uint next_length,prev_length,prev_pack_length,lastkey_length,
1315 : rest_length;
1316 346619 : if (keyinfo->seg[0].length >= 127)
1317 : {
1318 145 : if (!(prev_length=mi_uint2korr(start) & 32767))
1319 145 : goto end;
1320 145 : next_length=mi_uint2korr(keypos) & 32767;
1321 145 : keypos+=2;
1322 145 : prev_pack_length=2;
1323 : }
1324 : else
1325 : {
1326 346474 : if (!(prev_length= *start & 127))
1327 240292 : goto end; /* Same key as previous*/
1328 240292 : next_length= *keypos & 127;
1329 240292 : keypos++;
1330 240292 : prev_pack_length=1;
1331 : }
1332 240437 : if (!(*start & 128))
1333 23387 : prev_length=0; /* prev key not packed */
1334 240437 : if (keyinfo->seg[0].flag & HA_NULL_PART)
1335 105 : lastkey++; /* Skip null marker */
1336 240437 : get_key_length(lastkey_length,lastkey);
1337 240437 : if (!next_length) /* Same key after */
1338 : {
1339 125541 : next_length=lastkey_length;
1340 125541 : rest_length=0;
1341 : }
1342 : else
1343 114896 : get_key_length(rest_length,keypos);
1344 :
1345 240437 : if (next_length >= prev_length)
1346 : {
1347 : /* Next key is based on deleted key */
1348 : uint pack_length;
1349 205917 : uint diff= (next_length-prev_length);
1350 :
1351 : /* keypos points to data of next key (after key length) */
1352 205917 : bmove(keypos - diff, lastkey + prev_length, diff);
1353 205917 : rest_length+= diff;
1354 205917 : pack_length= prev_length ? get_pack_length(rest_length): 0;
1355 205917 : keypos-= diff + pack_length + prev_pack_length;
1356 205917 : s_length=(int) (keypos-start);
1357 205917 : if (prev_length) /* Pack against prev key */
1358 : {
1359 182530 : *keypos++= start[0];
1360 182530 : if (prev_pack_length == 2)
1361 75 : *keypos++= start[1];
1362 182530 : store_key_length(keypos,rest_length);
1363 : }
1364 : else
1365 : {
1366 : /* Next key is not packed anymore */
1367 23387 : if (keyinfo->seg[0].flag & HA_NULL_PART)
1368 : {
1369 30 : rest_length++; /* Mark not null */
1370 : }
1371 23387 : if (prev_pack_length == 2)
1372 : {
1373 70 : mi_int2store(keypos,rest_length);
1374 : }
1375 : else
1376 23317 : *keypos= rest_length;
1377 : }
1378 205917 : s_temp->changed_length= diff + pack_length + prev_pack_length;
1379 : }
1380 : }
1381 : }
1382 : }
1383 : }
1384 1237043 : end:
1385 1237043 : bmove(start, start+s_length, (uint) (page_end-start-s_length));
1386 1237043 : s_temp->move_length= s_length;
1387 1237043 : DBUG_RETURN((uint) s_length);
1388 : } /* remove_key */
1389 :
1390 :
1391 : /****************************************************************************
1392 : Logging of redos
1393 : ****************************************************************************/
1394 :
1395 : /**
1396 : @brief log entry where some parts are deleted and some things are changed
1397 :
1398 : @fn _ma_log_delete()
1399 : @param info Maria handler
1400 : @param page Pageaddress for changed page
1401 : @param buff Page buffer
1402 : @param key_pos Start of change area
1403 : @param changed_length How many bytes where changed at key_pos
1404 : @param move_length How many bytes where deleted at key_pos
1405 :
1406 : */
1407 :
1408 : my_bool _ma_log_delete(MARIA_PAGE *ma_page, const uchar *key_pos,
1409 : uint changed_length, uint move_length)
1410 690209 : {
1411 : LSN lsn;
1412 : uchar log_data[FILEID_STORE_SIZE + PAGE_STORE_SIZE + 9 + 7], *log_pos;
1413 : LEX_CUSTRING log_array[TRANSLOG_INTERNAL_PARTS + 3];
1414 : uint translog_parts;
1415 690209 : uint offset= (uint) (key_pos - ma_page->buff);
1416 690209 : MARIA_HA *info= ma_page->info;
1417 690209 : MARIA_SHARE *share= info->s;
1418 : my_off_t page;
1419 690209 : DBUG_ENTER("_ma_log_delete");
1420 690209 : DBUG_PRINT("enter", ("page: %lu changed_length: %u move_length: %d",
1421 : (ulong) ma_page->pos, changed_length, move_length));
1422 690209 : DBUG_ASSERT(share->now_transactional && move_length);
1423 690209 : DBUG_ASSERT(offset + changed_length <= ma_page->size);
1424 :
1425 : /* Store address of new root page */
1426 690209 : page= ma_page->pos / share->block_size;
1427 690209 : page_store(log_data + FILEID_STORE_SIZE, page);
1428 690209 : log_pos= log_data+ FILEID_STORE_SIZE + PAGE_STORE_SIZE;
1429 690209 : log_pos[0]= KEY_OP_OFFSET;
1430 690209 : int2store(log_pos+1, offset);
1431 690209 : log_pos[3]= KEY_OP_SHIFT;
1432 690209 : int2store(log_pos+4, -(int) move_length);
1433 690209 : log_pos+= 6;
1434 690209 : translog_parts= 1;
1435 690209 : if (changed_length)
1436 : {
1437 120481 : log_pos[0]= KEY_OP_CHANGE;
1438 120481 : int2store(log_pos+1, changed_length);
1439 120481 : log_pos+= 3;
1440 120481 : translog_parts= 2;
1441 120481 : log_array[TRANSLOG_INTERNAL_PARTS + 1].str= ma_page->buff + offset;
1442 120481 : log_array[TRANSLOG_INTERNAL_PARTS + 1].length= changed_length;
1443 : }
1444 :
1445 : #ifdef EXTRA_DEBUG_KEY_CHANGES
1446 : {
1447 690209 : int page_length= ma_page->size;
1448 : ha_checksum crc;
1449 690209 : crc= my_checksum(0, ma_page->buff + LSN_STORE_SIZE,
1450 : page_length - LSN_STORE_SIZE);
1451 690209 : log_pos[0]= KEY_OP_CHECK;
1452 690209 : int2store(log_pos+1, page_length);
1453 690209 : int4store(log_pos+3, crc);
1454 :
1455 690209 : log_array[TRANSLOG_INTERNAL_PARTS + translog_parts].str= log_pos;
1456 690209 : log_array[TRANSLOG_INTERNAL_PARTS + translog_parts].length= 7;
1457 690209 : changed_length+= 7;
1458 690209 : translog_parts++;
1459 : }
1460 : #endif
1461 :
1462 690209 : log_array[TRANSLOG_INTERNAL_PARTS + 0].str= log_data;
1463 690209 : log_array[TRANSLOG_INTERNAL_PARTS + 0].length= (uint) (log_pos - log_data);
1464 :
1465 690209 : if (translog_write_record(&lsn, LOGREC_REDO_INDEX,
1466 : info->trn, info,
1467 : (translog_size_t)
1468 : log_array[TRANSLOG_INTERNAL_PARTS + 0].length +
1469 : changed_length,
1470 : TRANSLOG_INTERNAL_PARTS + translog_parts,
1471 : log_array, log_data, NULL))
1472 0 : DBUG_RETURN(1);
1473 690209 : DBUG_RETURN(0);
1474 : }
1475 :
1476 :
1477 : /****************************************************************************
1478 : Logging of undos
1479 : ****************************************************************************/
1480 :
1481 : my_bool _ma_write_undo_key_delete(MARIA_HA *info, const MARIA_KEY *key,
1482 : my_off_t new_root, LSN *res_lsn)
1483 487748 : {
1484 487748 : MARIA_SHARE *share= info->s;
1485 : uchar log_data[LSN_STORE_SIZE + FILEID_STORE_SIZE +
1486 : KEY_NR_STORE_SIZE + PAGE_STORE_SIZE], *log_pos;
1487 : LEX_CUSTRING log_array[TRANSLOG_INTERNAL_PARTS + 2];
1488 : struct st_msg_to_write_hook_for_undo_key msg;
1489 487748 : enum translog_record_type log_type= LOGREC_UNDO_KEY_DELETE;
1490 487748 : uint keynr= key->keyinfo->key_nr;
1491 :
1492 487748 : lsn_store(log_data, info->trn->undo_lsn);
1493 487748 : key_nr_store(log_data + LSN_STORE_SIZE + FILEID_STORE_SIZE, keynr);
1494 487748 : log_pos= log_data + LSN_STORE_SIZE + FILEID_STORE_SIZE + KEY_NR_STORE_SIZE;
1495 :
1496 : /**
1497 : @todo BUG if we had concurrent insert/deletes, reading state's key_root
1498 : like this would be unsafe.
1499 : */
1500 487748 : if (new_root != share->state.key_root[keynr])
1501 : {
1502 : my_off_t page;
1503 432 : page= ((new_root == HA_OFFSET_ERROR) ? IMPOSSIBLE_PAGE_NO :
1504 : new_root / share->block_size);
1505 432 : page_store(log_pos, page);
1506 432 : log_pos+= PAGE_STORE_SIZE;
1507 432 : log_type= LOGREC_UNDO_KEY_DELETE_WITH_ROOT;
1508 : }
1509 :
1510 487748 : log_array[TRANSLOG_INTERNAL_PARTS + 0].str= log_data;
1511 487748 : log_array[TRANSLOG_INTERNAL_PARTS + 0].length= (uint) (log_pos - log_data);
1512 487748 : log_array[TRANSLOG_INTERNAL_PARTS + 1].str= key->data;
1513 487748 : log_array[TRANSLOG_INTERNAL_PARTS + 1].length= (key->data_length +
1514 : key->ref_length);
1515 :
1516 487748 : msg.root= &share->state.key_root[keynr];
1517 487748 : msg.value= new_root;
1518 : /*
1519 : set autoincrement to 1 if this is an auto_increment key
1520 : This is only used if we are now in a rollback of a duplicate key
1521 : */
1522 487748 : msg.auto_increment= share->base.auto_key == keynr + 1;
1523 :
1524 487748 : return translog_write_record(res_lsn, log_type,
1525 : info->trn, info,
1526 : (translog_size_t)
1527 : (log_array[TRANSLOG_INTERNAL_PARTS + 0].length +
1528 : log_array[TRANSLOG_INTERNAL_PARTS + 1].length),
1529 : TRANSLOG_INTERNAL_PARTS + 2, log_array,
1530 : log_data + LSN_STORE_SIZE, &msg) ? -1 : 0;
1531 : }
|