LTP GCOV extension - code coverage report
Current view: directory - storage/maria - ma_key_recover.c
Test: maria-mtr.html
Date: 2009-03-04 Instrumented lines: 497
Code covered: 2.4 % Executed lines: 12

       1                 : /* Copyright (C) 2007 Michael Widenius
       2                 : 
       3                 :    This program is free software; you can redistribute it and/or modify
       4                 :    it under the terms of the GNU General Public License as published by
       5                 :    the Free Software Foundation; version 2 of the License.
       6                 : 
       7                 :    This program is distributed in the hope that it will be useful,
       8                 :    but WITHOUT ANY WARRANTY; without even the implied warranty of
       9                 :    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
      10                 :    GNU General Public License for more details.
      11                 : 
      12                 :    You should have received a copy of the GNU General Public License
      13                 :    along with this program; if not, write to the Free Software
      14                 :    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
      15                 : 
      16                 : /* Redo of index */
      17                 : 
      18                 : #include "maria_def.h"
      19                 : #include "ma_blockrec.h"
      20                 : #include "trnman.h"
      21                 : #include "ma_key_recover.h"
      22                 : #include "ma_rt_index.h"
      23                 : 
      24                 : /****************************************************************************
      25                 :   Some helper functions used both by key page logging and block page logging
      26                 : ****************************************************************************/
      27                 : 
      28                 : /**
      29                 :   @brief Unpin all pinned pages
      30                 : 
      31                 :   @fn _ma_unpin_all_pages()
      32                 :   @param info      Maria handler
      33                 :   @param undo_lsn  LSN for undo pages. LSN_IMPOSSIBLE if we shouldn't write
      34                 :                    undo (like on duplicate key errors)
      35                 : 
      36                 :   info->pinned_pages is the list of pages to unpin. Each member of the list
      37                 :   must have its 'changed' flag set to say whether the page was changed.
      38                 : 
      39                 :   @note
      40                 :     We unpin pages in the reverse order of that in which they were pinned.
      41                 :     This is not necessary now, but may simplify things in the future.
      42                 : 
      43                 :   @return
      44                 :     Nothing (the function is void). On exit info->pinned_pages.elements
      45                 :     has been reset to 0, i.e. the list of pinned pages is empty.
      46                 : */
      47                 : 
      48                 : void _ma_unpin_all_pages(MARIA_HA *info, LSN undo_lsn)
      49            1491 : {
      50                 :   MARIA_PINNED_PAGE *page_link= ((MARIA_PINNED_PAGE*)
      51            1491 :                                  dynamic_array_ptr(&info->pinned_pages, 0));
      52            1491 :   MARIA_PINNED_PAGE *pinned_page= page_link + info->pinned_pages.elements;
      53            1491 :   DBUG_ENTER("_ma_unpin_all_pages");
      54            1491 :   DBUG_PRINT("info", ("undo_lsn: %lu", (ulong) undo_lsn));
      55                 : 
      56            1491 :   if (!info->s->now_transactional)
      57            1491 :     DBUG_ASSERT(undo_lsn == LSN_IMPOSSIBLE || maria_in_recovery);
      58                 : /* Walk the array backwards: from the last pinned element down to the first */
      59            2982 :   while (pinned_page-- != page_link)
      60                 :   {
      61                 :     /*
      62                 :       Note: this assert fails if we got a disk error or the record file
      63                 :       is corrupted, which is why it is compiled in only for EXTRA_DEBUG
      64                 :       builds.
      65                 :     */
      66                 : #ifdef EXTRA_DEBUG
      67            1491 :     DBUG_ASSERT(!pinned_page->changed ||
      68                 :                 undo_lsn != LSN_IMPOSSIBLE || !info->s->now_transactional);
      69                 : #endif
      70            1491 :     pagecache_unlock_by_link(info->s->pagecache, pinned_page->link,
      71                 :                              pinned_page->unlock, PAGECACHE_UNPIN,
      72                 :                              info->trn->rec_lsn, undo_lsn,
      73                 :                              pinned_page->changed, FALSE);
      74                 :   }
      75                 : /* Every element has been handed to pagecache_unlock_by_link(): empty the list */
      76            1491 :   info->pinned_pages.elements= 0;
      77            1491 :   DBUG_VOID_RETURN;
      78                 : }
      79                 : /* Write a LOGREC_CLR_END record for the undone operation 'undo_type'. */
      80                 : /* Returns what translog_write_record() returns; res_lsn is passed on to it. */
      81                 : my_bool _ma_write_clr(MARIA_HA *info, LSN undo_lsn,
      82                 :                       enum translog_record_type undo_type,
      83                 :                       my_bool store_checksum, ha_checksum checksum,
      84                 :                       LSN *res_lsn, void *extra_msg)
      85               0 : {
      86                 :   uchar log_data[LSN_STORE_SIZE + FILEID_STORE_SIZE + CLR_TYPE_STORE_SIZE +
      87                 :                  HA_CHECKSUM_STORE_SIZE+ KEY_NR_STORE_SIZE + PAGE_STORE_SIZE];
      88                 :   uchar *log_pos;
      89                 :   LEX_CUSTRING log_array[TRANSLOG_INTERNAL_PARTS + 1];
      90                 :   struct st_msg_to_write_hook_for_clr_end msg;
      91                 :   my_bool res;
      92               0 :   DBUG_ENTER("_ma_write_clr");
      93                 : 
      94                 :   /* undo_lsn must be first for compression to work */
      95               0 :   lsn_store(log_data, undo_lsn);
      96               0 :   clr_type_store(log_data + LSN_STORE_SIZE + FILEID_STORE_SIZE, undo_type);
      97               0 :   log_pos= log_data + LSN_STORE_SIZE + FILEID_STORE_SIZE + CLR_TYPE_STORE_SIZE;
      98                 : 
      99                 :   /* Extra_msg is handled in write_hook_for_clr_end() */
     100               0 :   msg.undone_record_type= undo_type;
     101               0 :   msg.previous_undo_lsn=  undo_lsn;
     102               0 :   msg.extra_msg= extra_msg;
     103               0 :   msg.checksum_delta= 0;
     104                 : /* Optional tail: a checksum, or else key number + new root page (WITH_ROOT) */
     105               0 :   if (store_checksum)
     106                 :   {
     107               0 :     msg.checksum_delta= checksum;
     108               0 :     ha_checksum_store(log_pos, checksum);
     109               0 :     log_pos+= HA_CHECKSUM_STORE_SIZE;
     110                 :   }
     111               0 :   else if (undo_type == LOGREC_UNDO_KEY_INSERT_WITH_ROOT ||
     112                 :            undo_type == LOGREC_UNDO_KEY_DELETE_WITH_ROOT)
     113                 :   {
     114                 :     /* Key root changed. Store new key root */
     115               0 :     struct st_msg_to_write_hook_for_undo_key *undo_msg= extra_msg;
     116                 :     pgcache_page_no_t page;
     117               0 :     key_nr_store(log_pos, undo_msg->keynr);
     118               0 :     page= (undo_msg->value == HA_OFFSET_ERROR ? IMPOSSIBLE_PAGE_NO :
     119                 :            undo_msg->value / info->s->block_size);
     120               0 :     page_store(log_pos + KEY_NR_STORE_SIZE, page);
     121               0 :     log_pos+= KEY_NR_STORE_SIZE + PAGE_STORE_SIZE;
     122                 :   }
     123               0 :   log_array[TRANSLOG_INTERNAL_PARTS + 0].str=    log_data;
     124               0 :   log_array[TRANSLOG_INTERNAL_PARTS + 0].length= (uint) (log_pos - log_data);
     125                 : 
     126                 : 
     127                 :   /*
     128                 :     We need intern_lock mutex for calling _ma_state_info_write in the trigger.
     129                 :     We do it here to have the same sequence of mutexes locking everywhere
     130                 :     (first intern_lock then transactional log buffer lock)
     131                 :   */
     132               0 :   if (undo_type == LOGREC_UNDO_BULK_INSERT)
     133               0 :     pthread_mutex_lock(&info->s->intern_lock);
     134                 : 
     135               0 :   res= translog_write_record(res_lsn, LOGREC_CLR_END,
     136                 :                              info->trn, info,
     137                 :                              (translog_size_t)
     138                 :                              log_array[TRANSLOG_INTERNAL_PARTS + 0].length,
     139                 :                              TRANSLOG_INTERNAL_PARTS + 1, log_array,
     140                 :                              log_data + LSN_STORE_SIZE, &msg);
     141               0 :   if (undo_type == LOGREC_UNDO_BULK_INSERT)
     142               0 :     pthread_mutex_unlock(&info->s->intern_lock);
     143               0 :   DBUG_RETURN(res);
     144                 : }
     145                 : 
     146                 : 
     147                 : /**
     148                 :    @brief Sets transaction's undo_lsn, first_undo_lsn if needed
     149                 :    Also adjusts share state (records, checksum, key root) per undone record
     150                 :    @return 0 on success; non-zero only if the LOGREC_UNDO_BULK_INSERT case fails
     151                 : */
     152                 : 
     153                 : my_bool write_hook_for_clr_end(enum translog_record_type type
     154                 :                                __attribute__ ((unused)),
     155                 :                                TRN *trn, MARIA_HA *tbl_info,
     156                 :                                LSN *lsn __attribute__ ((unused)),
     157                 :                                void *hook_arg)
     158               0 : {
     159               0 :   MARIA_SHARE *share= tbl_info->s;
     160                 :   struct st_msg_to_write_hook_for_clr_end *msg=
     161               0 :     (struct st_msg_to_write_hook_for_clr_end *)hook_arg;
     162               0 :   my_bool error= FALSE;
     163               0 :   DBUG_ENTER("write_hook_for_clr_end");
     164               0 :   DBUG_ASSERT(trn->trid != 0);
     165               0 :   trn->undo_lsn= msg->previous_undo_lsn;
     166                 : /* Apply the effect of the undone record type to the shared table state */
     167               0 :   switch (msg->undone_record_type) {
     168                 :   case LOGREC_UNDO_ROW_DELETE:
     169               0 :     share->state.state.records++;
     170               0 :     share->state.state.checksum+= msg->checksum_delta;
     171               0 :     break;
     172                 :   case LOGREC_UNDO_ROW_INSERT:
     173               0 :     share->state.state.records--;
     174               0 :     share->state.state.checksum+= msg->checksum_delta;
     175               0 :     break;
     176                 :   case LOGREC_UNDO_ROW_UPDATE:
     177               0 :     share->state.state.checksum+= msg->checksum_delta;
     178               0 :     break;
     179                 :   case LOGREC_UNDO_KEY_INSERT_WITH_ROOT:
     180                 :   case LOGREC_UNDO_KEY_DELETE_WITH_ROOT:
     181                 :   {
     182                 :     /* Update key root */
     183                 :     struct st_msg_to_write_hook_for_undo_key *extra_msg=
     184               0 :       (struct st_msg_to_write_hook_for_undo_key *) msg->extra_msg;
     185               0 :     *extra_msg->root= extra_msg->value;
     186               0 :     break;
     187                 :   }
     188                 :   case LOGREC_UNDO_KEY_INSERT:
     189                 :   case LOGREC_UNDO_KEY_DELETE:
     190                 :     break;
     191                 :   case LOGREC_UNDO_BULK_INSERT:
     192               0 :     safe_mutex_assert_owner(&share->intern_lock);
     193               0 :     error= (maria_enable_indexes(tbl_info) ||
     194                 :             /* we enabled indices, need '2' below */
     195                 :             _ma_state_info_write(share,
     196                 :                                  MA_STATE_INFO_WRITE_DONT_MOVE_OFFSET |
     197                 :                                  MA_STATE_INFO_WRITE_FULL_INFO));
     198                 :     /* no need for _ma_reset_status(): REDO_DELETE_ALL is just before us */
     199               0 :     break;
     200                 :   default:
     201               0 :     DBUG_ASSERT(0);
     202                 :   }
     203               0 :   if (trn->undo_lsn == LSN_IMPOSSIBLE) /* has fully rolled back */
     204               0 :     trn->first_undo_lsn= LSN_WITH_FLAGS_TO_FLAGS(trn->first_undo_lsn);
     205               0 :   DBUG_RETURN(error);
     206                 : }
     207                 : 
     208                 : 
     209                 : /**
     210                 :   @brief Write hook for undo key: sets the key root, then calls write_hook_for_undo()
     211                 : */
     212                 : /* Note: hook_arg is consumed here; write_hook_for_undo() is called with 0 instead */
     213                 : my_bool write_hook_for_undo_key(enum translog_record_type type,
     214                 :                                 TRN *trn, MARIA_HA *tbl_info,
     215                 :                                 LSN *lsn, void *hook_arg)
     216               0 : {
     217                 :   struct st_msg_to_write_hook_for_undo_key *msg=
     218               0 :     (struct st_msg_to_write_hook_for_undo_key *) hook_arg;
     219                 : /* Store the new value through the root pointer carried in the message */
     220               0 :   *msg->root= msg->value;
     221               0 :   _ma_fast_unlock_key_del(tbl_info);
     222               0 :   return write_hook_for_undo(type, trn, tbl_info, lsn, 0);
     223                 : }
     224                 : 
     225                 : 
     226                 : /**
     227                 :    Updates "auto_increment" and calls the generic UNDO_KEY hook
     228                 : 
     229                 :    @return Result of write_hook_for_undo_key()
     230                 : */
     231                 : 
     232                 : my_bool write_hook_for_undo_key_insert(enum translog_record_type type,
     233                 :                                        TRN *trn, MARIA_HA *tbl_info,
     234                 :                                        LSN *lsn, void *hook_arg)
     235               0 : {
     236                 :   struct st_msg_to_write_hook_for_undo_key *msg=
     237               0 :     (struct st_msg_to_write_hook_for_undo_key *) hook_arg;
     238               0 :   MARIA_SHARE *share= tbl_info->s;
     239               0 :   if (msg->auto_increment > 0)
     240                 :   {
     241                 :     /*
     242                 :       Only reason to set it here is to have a mutex protect from checkpoint
     243                 :       reading at the same time (would see a corrupted value).
     244                 : 
     245                 :       The purpose of the following code is to set auto_increment if the row
     246                 :       has an auto_increment value higher than the current one. We also
     247                 :       want to be able to restore the old value, in case of rollback,
     248                 :       if no one else has tried to set the value.
     249                 : 
     250                 :       The logic used is that we only restore the auto_increment value if
     251                 :       tbl_info->last_auto_increment == share->last_auto_increment
     252                 :       when it's time to do the rollback.
     253                 :     */
     254               0 :     DBUG_PRINT("info",("auto_inc: %lu new auto_inc: %lu",
     255                 :                        (ulong)share->state.auto_increment,
     256                 :                        (ulong)msg->auto_increment));
     257               0 :     if (share->state.auto_increment < msg->auto_increment)
     258                 :     {
     259                 :       /* Remember the original value, in case of rollback */
     260               0 :       tbl_info->last_auto_increment= share->last_auto_increment=
     261                 :         share->state.auto_increment;
     262               0 :       share->state.auto_increment= msg->auto_increment;
     263                 :     }
     264                 :     else
     265                 :     {
     266                 :       /*
     267                 :         If the current value would have affected the original auto_increment
     268                 :         value, set it to an impossible value so that it's not restored on
     269                 :         rollback
     270                 :       */
     271               0 :       if (msg->auto_increment > share->last_auto_increment)
     272               0 :         share->last_auto_increment= ~(ulonglong) 0;
     273                 :     }
     274                 :   }
     275               0 :   return write_hook_for_undo_key(type, trn, tbl_info, lsn, hook_arg);
     276                 : }
     277                 : 
     278                 : 
     279                 : /**
     280                 :    @brief Restores "share->state.auto_increment" in case of abort and calls
     281                 :    generic UNDO_KEY hook
     282                 : 
     283                 :    @return Result of write_hook_for_undo_key()
     284                 : */
     285                 : 
     286                 : my_bool write_hook_for_undo_key_delete(enum translog_record_type type,
     287                 :                                        TRN *trn, MARIA_HA *tbl_info,
     288                 :                                        LSN *lsn, void *hook_arg)
     289               0 : {
     290                 :   struct st_msg_to_write_hook_for_undo_key *msg=
     291               0 :     (struct st_msg_to_write_hook_for_undo_key *) hook_arg;
     292               0 :   MARIA_SHARE *share= tbl_info->s;
     293               0 :   if (msg->auto_increment > 0)                  /* If auto increment key */
     294                 :   {
     295                 :     /* Restore auto increment if no one has changed it in between */
     296               0 :     if (share->last_auto_increment == tbl_info->last_auto_increment &&
     297                 :         tbl_info->last_auto_increment != ~(ulonglong) 0)
     298               0 :       share->state.auto_increment= tbl_info->last_auto_increment;
     299                 :   }
     300               0 :   return write_hook_for_undo_key(type, trn, tbl_info, lsn, hook_arg);
     301                 : }
     302                 : 
     303                 : 
     304                 : /*****************************************************************************
     305                 :   Functions for logging of key page changes
     306                 : *****************************************************************************/
     307                 : 
     308                 : /**
     309                 :    @brief Write log entry for page that has got data added or deleted at start of page
     310                 :    @return Result of translog_write_record() for the LOGREC_REDO_INDEX record
     311                 : */
     312                 : /* move_length < 0 logs KEY_OP_DEL_PREFIX, otherwise KEY_OP_ADD_PREFIX */
     313                 : my_bool _ma_log_prefix(MARIA_PAGE *ma_page, uint changed_length,
     314                 :                        int move_length)
     315               0 : {
     316                 :   uint translog_parts;
     317                 :   LSN lsn;
     318                 :   uchar log_data[FILEID_STORE_SIZE + PAGE_STORE_SIZE + 7 + 7 + 2], *log_pos;
     319               0 :   uchar *buff= ma_page->buff;
     320                 :   LEX_CUSTRING log_array[TRANSLOG_INTERNAL_PARTS + 3];
     321                 :   pgcache_page_no_t page;
     322               0 :   MARIA_HA *info= ma_page->info;
     323               0 :   DBUG_ENTER("_ma_log_prefix");
     324               0 :   DBUG_PRINT("enter", ("page: %lu  changed_length: %u  move_length: %d",
     325                 :                         (ulong) ma_page->pos, changed_length, move_length));
     326                 : /* Page number is stored after the room left for the file id */
     327               0 :   page= ma_page->pos / info->s->block_size;
     328               0 :   log_pos= log_data + FILEID_STORE_SIZE;
     329               0 :   page_store(log_pos, page);
     330               0 :   log_pos+= PAGE_STORE_SIZE;
     331                 : 
     332                 :   /* Store keypage_flag */
     333               0 :   *log_pos++= KEY_OP_SET_PAGEFLAG;
     334               0 :   *log_pos++= buff[KEYPAGE_TRANSFLAG_OFFSET];
     335                 : 
     336               0 :   if (move_length < 0)
     337                 :   {
     338                 :     /* Delete prefix */
     339               0 :     log_pos[0]= KEY_OP_DEL_PREFIX;
     340               0 :     int2store(log_pos+1, -move_length);
     341               0 :     log_pos+= 3;
     342               0 :     if (changed_length)
     343                 :     {
     344                 :       /*
     345                 :         We don't need a KEY_OP_OFFSET as KEY_OP_DEL_PREFIX has an implicit
     346                 :         offset
     347                 :       */
     348               0 :       log_pos[0]= KEY_OP_CHANGE;
     349               0 :       int2store(log_pos+1, changed_length);
     350               0 :       log_pos+= 3;
     351                 :     }
     352                 :   }
     353                 :   else
     354                 :   {
     355                 :     /* Add prefix */
     356               0 :     DBUG_ASSERT(changed_length >0 && (int) changed_length >= move_length);
     357               0 :     log_pos[0]= KEY_OP_ADD_PREFIX;
     358               0 :     int2store(log_pos+1, move_length);
     359               0 :     int2store(log_pos+3, changed_length);
     360               0 :     log_pos+= 5;
     361                 :   }
     362                 : /* Part 0 is the header built above; the changed bytes, if any, form part 1 */
     363               0 :   translog_parts= 1;
     364               0 :   log_array[TRANSLOG_INTERNAL_PARTS + 0].str=    log_data;
     365               0 :   log_array[TRANSLOG_INTERNAL_PARTS + 0].length= (uint) (log_pos -
     366                 :                                                          log_data);
     367               0 :   if (changed_length)
     368                 :   {
     369               0 :     log_array[TRANSLOG_INTERNAL_PARTS + 1].str=    (buff +
     370                 :                                                     info->s->keypage_header);
     371               0 :     log_array[TRANSLOG_INTERNAL_PARTS + 1].length= changed_length;
     372               0 :     translog_parts= 2;
     373                 :   }
     374                 : /* Debug builds append a KEY_OP_CHECK part: page length + checksum of the page */
     375                 : #ifdef EXTRA_DEBUG_KEY_CHANGES
     376                 :   {
     377               0 :     int page_length= ma_page->size;
     378                 :     ha_checksum crc;
     379               0 :     crc= my_checksum(0, buff + LSN_STORE_SIZE, page_length - LSN_STORE_SIZE);
     380               0 :     log_pos[0]= KEY_OP_CHECK;
     381               0 :     int2store(log_pos+1, page_length);
     382               0 :     int4store(log_pos+3, crc);
     383                 : 
     384               0 :     log_array[TRANSLOG_INTERNAL_PARTS + translog_parts].str= log_pos;
     385               0 :     log_array[TRANSLOG_INTERNAL_PARTS + translog_parts].length= 7;
     386               0 :     changed_length+= 7;
     387               0 :     translog_parts++;
     388                 :   }
     389                 : #endif
     390                 : /* Total record length = header part + changed_length (incl. debug part) */
     391               0 :   DBUG_RETURN(translog_write_record(&lsn, LOGREC_REDO_INDEX,
     392                 :                                     info->trn, info,
     393                 :                                     (translog_size_t)
     394                 :                                     log_array[TRANSLOG_INTERNAL_PARTS +
     395                 :                                               0].length + changed_length,
     396                 :                                     TRANSLOG_INTERNAL_PARTS + translog_parts,
     397                 :                                     log_array, log_data, NULL));
     398                 : }
     399                 : 
     400                 : 
     401                 : /**
     402                 :    @brief Write log entry for page that has got data added or deleted at end of page
     403                 :    @return Result of translog_write_record() for the LOGREC_REDO_INDEX record
     404                 : */
     405                 : /* new_length < org_length logs KEY_OP_DEL_SUFFIX, otherwise KEY_OP_ADD_SUFFIX */
     406                 : my_bool _ma_log_suffix(MARIA_PAGE *ma_page, uint org_length, uint new_length)
     407               0 : {
     408                 :   LSN lsn;
     409                 :   LEX_CUSTRING log_array[TRANSLOG_INTERNAL_PARTS + 3];
     410                 :   uchar log_data[FILEID_STORE_SIZE + PAGE_STORE_SIZE + 10 + 7 + 2], *log_pos;
     411               0 :   uchar *buff= ma_page->buff;
     412                 :   int diff;
     413                 :   uint translog_parts, extra_length;
     414               0 :   MARIA_HA *info= ma_page->info;
     415                 :   pgcache_page_no_t page;
     416               0 :   DBUG_ENTER("_ma_log_suffix");
     417               0 :   DBUG_PRINT("enter", ("page: %lu  org_length: %u  new_length: %u",
     418                 :                        (ulong) ma_page->pos, org_length, new_length));
     419                 : 
     420               0 :   page= ma_page->pos / info->s->block_size;
     421                 : /* Page number is stored after the room left for the file id */
     422               0 :   log_pos= log_data + FILEID_STORE_SIZE;
     423               0 :   page_store(log_pos, page);
     424               0 :   log_pos+= PAGE_STORE_SIZE;
     425                 : 
     426                 :   /* Store keypage_flag */
     427               0 :   *log_pos++= KEY_OP_SET_PAGEFLAG;
     428               0 :   *log_pos++= buff[KEYPAGE_TRANSFLAG_OFFSET];
     429                 : /* Shrinking logs only the length; growing also logs the bytes from org_length */
     430               0 :   if ((diff= (int) (new_length - org_length)) < 0)
     431                 :   {
     432               0 :     log_pos[0]= KEY_OP_DEL_SUFFIX;
     433               0 :     int2store(log_pos+1, -diff);
     434               0 :     log_pos+= 3;
     435               0 :     translog_parts= 1;
     436               0 :     extra_length= 0;
     437                 :   }
     438                 :   else
     439                 :   {
     440               0 :     log_pos[0]= KEY_OP_ADD_SUFFIX;
     441               0 :     int2store(log_pos+1, diff);
     442               0 :     log_pos+= 3;
     443               0 :     log_array[TRANSLOG_INTERNAL_PARTS + 1].str=    buff + org_length;
     444               0 :     log_array[TRANSLOG_INTERNAL_PARTS + 1].length= (uint) diff;
     445               0 :     translog_parts= 2;
     446               0 :     extra_length= (uint) diff;
     447                 :   }
     448                 : 
     449               0 :   log_array[TRANSLOG_INTERNAL_PARTS + 0].str=    log_data;
     450               0 :   log_array[TRANSLOG_INTERNAL_PARTS + 0].length= (uint) (log_pos -
     451                 :                                                          log_data);
     452                 : /* Debug builds append a KEY_OP_CHECK part: new_length + checksum of the page */
     453                 : #ifdef EXTRA_DEBUG_KEY_CHANGES
     454                 :   {
     455                 :     ha_checksum crc;
     456               0 :     crc= my_checksum(0, buff + LSN_STORE_SIZE, new_length - LSN_STORE_SIZE);
     457               0 :     log_pos[0]= KEY_OP_CHECK;
     458               0 :     int2store(log_pos+1, new_length);
     459               0 :     int4store(log_pos+3, crc);
     460                 : 
     461               0 :     log_array[TRANSLOG_INTERNAL_PARTS + translog_parts].str= log_pos;
     462               0 :     log_array[TRANSLOG_INTERNAL_PARTS + translog_parts].length= 7;
     463               0 :     extra_length+= 7;
     464               0 :     translog_parts++;
     465                 :   }
     466                 : #endif
     467                 : /* Total record length = header part + extra_length (added bytes, debug part) */
     468               0 :   DBUG_RETURN(translog_write_record(&lsn, LOGREC_REDO_INDEX,
     469                 :                                     info->trn, info,
     470                 :                                     (translog_size_t)
     471                 :                                     log_array[TRANSLOG_INTERNAL_PARTS +
     472                 :                                               0].length + extra_length,
     473                 :                                     TRANSLOG_INTERNAL_PARTS + translog_parts,
     474                 :                                     log_array, log_data, NULL));
     475                 : }
     476                 : 
     477                 : 
     478                 : /**
     479                 :    @brief Log that a key was added to the page
     480                 : 
     481                 :    @param ma_page          Changed page
     482                 :    @param org_page_length  Length of data in page before key was added
     483                 : 
     484                 :    @note
     485                 :      If handle_overflow is set, then we have to protect against
     486                 :      logging changes that are outside of the page.
     487                 :      This may happen during underflow() handling where the buffer
     488                 :      in memory temporarily contains more data than block_size
     489                 : */
     490                 : 
     491                 : my_bool _ma_log_add(MARIA_PAGE *ma_page,
     492                 :                     uint org_page_length, uchar *key_pos,
     493                 :                     uint changed_length, int move_length,
     494                 :                     my_bool handle_overflow __attribute__ ((unused))) /* only read by DBUG_ASSERT() below */
     495               0 : {
     496                 :   LSN lsn;
     497                 :   uchar log_data[FILEID_STORE_SIZE + PAGE_STORE_SIZE + 3 + 3 + 3 + 3 + 7 + 2];
     498                 :   uchar *log_pos;
     499               0 :   uchar *buff= ma_page->buff;
     500                 :   LEX_CUSTRING log_array[TRANSLOG_INTERNAL_PARTS + 3];
     501               0 :   MARIA_HA *info= ma_page->info;
     502               0 :   uint offset= (uint) (key_pos - buff);         /* position of key_pos inside the page buffer */
     503               0 :   uint page_length= info->s->block_size - KEYPAGE_CHECKSUM_SIZE; /* largest page length that is logged */
     504                 :   uint translog_parts;
     505                 :   pgcache_page_no_t page_pos;
     506               0 :   DBUG_ENTER("_ma_log_add");
     507               0 :   DBUG_PRINT("enter", ("page: %lu  org_page_length: %u  changed_length: %u  "
     508                 :                        "move_length: %d",
     509                 :                        (ulong) ma_page->pos, org_page_length, changed_length,
     510                 :                        move_length));
     511               0 :   DBUG_ASSERT(info->s->now_transactional);
     512                 : 
     513                 :   /*
     514                 :     Write REDO entry that contains the logical operations we need
     515                 :     to do on the page.
                         : 
                         :     The fixed part built in log_data is: the page number,
                         :     KEY_OP_SET_PAGEFLAG with the page's transflag byte, an optional
                         :     KEY_OP_DEL_SUFFIX, and then either KEY_OP_ADD_SUFFIX or
                         :     KEY_OP_OFFSET [+ KEY_OP_SHIFT] + KEY_OP_CHANGE, each followed by a
                         :     2 byte argument. The changed bytes at key_pos are logged as a
                         :     separate part after it.
     516                 :   */
     517               0 :   log_pos= log_data + FILEID_STORE_SIZE;        /* first FILEID_STORE_SIZE bytes are not written here */
     518               0 :   page_pos= ma_page->pos / info->s->block_size; /* byte position -> page number */
     519               0 :   page_store(log_pos, page_pos);
     520               0 :   log_pos+= PAGE_STORE_SIZE;
     521                 : 
     522                 :   /* Store keypage_flag */
     523               0 :   *log_pos++= KEY_OP_SET_PAGEFLAG;
     524               0 :   *log_pos++= buff[KEYPAGE_TRANSFLAG_OFFSET];
     525                 : 
     526               0 :   if (org_page_length + move_length > page_length)
     527                 :   {
     528                 :     /*
     529                 :       Overflow. Cut either key or data from page end so that key fits.
     530                 :       The code that splits the too big page will ignore logging any
     531                 :       data over org_page_length
     532                 :     */
     533               0 :     DBUG_ASSERT(handle_overflow);
     534               0 :     if (offset + changed_length > page_length)
     535                 :     {
     536               0 :       changed_length= page_length - offset;     /* log only the part of the change that fits */
     537               0 :       move_length= 0;                           /* and log no shift */
     538                 :     }
     539                 :     else
     540                 :     {
     541               0 :       uint diff= org_page_length + move_length - page_length; /* bytes above page_length */
     542               0 :       log_pos[0]= KEY_OP_DEL_SUFFIX;
     543               0 :       int2store(log_pos+1, diff);
     544               0 :       log_pos+= 3;
     545               0 :       org_page_length= page_length - move_length; /* so that org_page_length + move_length == page_length */
     546                 :     }
     547                 :   }
     548                 : 
     549               0 :   if (offset == org_page_length)                /* change starts exactly at the old page end */
     550               0 :     log_pos[0]= KEY_OP_ADD_SUFFIX;
     551                 :   else
     552                 :   {
     553               0 :     log_pos[0]= KEY_OP_OFFSET;
     554               0 :     int2store(log_pos+1, offset);
     555               0 :     log_pos+= 3;
     556               0 :     if (move_length)
     557                 :     {
     558               0 :       log_pos[0]= KEY_OP_SHIFT;
     559               0 :       int2store(log_pos+1, move_length);
     560               0 :       log_pos+= 3;
     561                 :     }
     562               0 :     log_pos[0]= KEY_OP_CHANGE;
     563                 :   }
     564               0 :   int2store(log_pos+1, changed_length);         /* length argument of ADD_SUFFIX / CHANGE */
     565               0 :   log_pos+= 3;
     566               0 :   translog_parts= 2;                            /* log_data + changed key data */
     567                 : 
     568               0 :   log_array[TRANSLOG_INTERNAL_PARTS + 0].str=    log_data;
     569               0 :   log_array[TRANSLOG_INTERNAL_PARTS + 0].length= (uint) (log_pos -
     570                 :                                                          log_data);
     571               0 :   log_array[TRANSLOG_INTERNAL_PARTS + 1].str=    key_pos;
     572               0 :   log_array[TRANSLOG_INTERNAL_PARTS + 1].length= changed_length;
     573                 : 
     574                 : #ifdef EXTRA_DEBUG_KEY_CHANGES
     575                 :   {
     576               0 :     MARIA_SHARE *share= info->s;
     577                 :     ha_checksum crc;
     578               0 :     uint save_page_length= ma_page->size;
     579               0 :     uint new_length= org_page_length + move_length;
     580               0 :     _ma_store_page_used(share, buff, new_length); /* temporarily store the length the page gets after redo */
     581               0 :     crc= my_checksum(0, buff + LSN_STORE_SIZE, new_length - LSN_STORE_SIZE);
     582               0 :     log_pos[0]= KEY_OP_CHECK;
     583               0 :     int2store(log_pos+1, new_length);
     584               0 :     int4store(log_pos+3, crc);
     585                 : 
     586               0 :     log_array[TRANSLOG_INTERNAL_PARTS + translog_parts].str= log_pos;
     587               0 :     log_array[TRANSLOG_INTERNAL_PARTS + translog_parts].length= 7;
     588               0 :     changed_length+= 7;                         /* include the KEY_OP_CHECK part in the total length below */
     589               0 :     translog_parts++;
     590               0 :     _ma_store_page_used(share, buff, save_page_length); /* put back the saved length */
     591                 :   }
     592                 : #endif
     593                 : 
     594               0 :   if (translog_write_record(&lsn, LOGREC_REDO_INDEX,
     595                 :                             info->trn, info,
     596                 :                             (translog_size_t)
     597                 :                             log_array[TRANSLOG_INTERNAL_PARTS + 0].length +
     598                 :                             changed_length,
     599                 :                             TRANSLOG_INTERNAL_PARTS + translog_parts,
     600                 :                             log_array, log_data, NULL))
     601               0 :     DBUG_RETURN(-1);                            /* writing the log record failed */
     602               0 :   DBUG_RETURN(0);
     603                 : }
     604                 : 
     605                 : 
     606                 : /****************************************************************************
     607                 :   Redo of key pages
     608                 : ****************************************************************************/
     609                 : 
     610                 : /**
     611                 :    @brief Apply LOGREC_REDO_INDEX_NEW_PAGE
     612                 : 
                         :    Rebuilds a whole key page from the log record. If the record is not
                         :    older than share->state.is_of_horizon, share->state.key_del and (for
                         :    a root page) share->state.key_root[] are updated too.
                         : 
     613                 :    @param  info            Maria handler
                         :    @param  lsn             LSN of the record; a page read from the page
                         :                            cache is skipped if its stored LSN is >= lsn
     614                 :    @param  header          Header (without FILEID)
                         :                            Layout: page number, free page number, key
                         :                            number, 1 byte page type flag, followed by the
                         :                            page content to store after the LSN field
                         :    @param  length          Length of header in bytes
     615                 : 
     616                 :    @return Operation status
     617                 :      @retval 0      OK
     618                 :      @retval 1      Error
     619                 : */
     620                 : 
     621                 : uint _ma_apply_redo_index_new_page(MARIA_HA *info, LSN lsn,
     622                 :                                    const uchar *header, uint length)
     623               0 : {
     624               0 :   pgcache_page_no_t root_page= page_korr(header);
     625               0 :   pgcache_page_no_t free_page= page_korr(header + PAGE_STORE_SIZE);
     626               0 :   uint      key_nr=    key_nr_korr(header + PAGE_STORE_SIZE * 2);
     627               0 :   my_bool   page_type_flag= header[PAGE_STORE_SIZE * 2 + KEY_NR_STORE_SIZE];
     628                 :   enum pagecache_page_lock unlock_method;
     629                 :   enum pagecache_page_pin unpin_method;
     630                 :   MARIA_PINNED_PAGE page_link;
     631                 :   my_off_t file_size;
     632                 :   uchar *buff;
     633                 :   uint result;
     634               0 :   MARIA_SHARE *share= info->s;
     635               0 :   DBUG_ENTER("_ma_apply_redo_index_new_page");
     636               0 :   DBUG_PRINT("enter", ("root_page: %lu  free_page: %lu",
     637                 :                        (ulong) root_page, (ulong) free_page));
     638                 : 
     639                 :   /* Flag the share state as changed by this redo */
     640                 : 
     641               0 :   share->state.changed|= (STATE_CHANGED | STATE_NOT_OPTIMIZED_KEYS |
     642                 :                           STATE_NOT_SORTED_PAGES | STATE_NOT_ZEROFILLED |
     643                 :                           STATE_NOT_MOVABLE);
     644                 : 
     645               0 :   header+= PAGE_STORE_SIZE * 2 + KEY_NR_STORE_SIZE + 1; /* Set header to point at key data */
     646               0 :   length-= PAGE_STORE_SIZE * 2 + KEY_NR_STORE_SIZE + 1;
     647                 : 
     648               0 :   file_size= (my_off_t) (root_page + 1) * share->block_size; /* file length needed to contain root_page */
     649               0 :   if (cmp_translog_addr(lsn, share->state.is_of_horizon) >= 0)
     650                 :   {
     651                 :     /* free_page is 0 if we shouldn't set key_del */
     652               0 :     if (free_page)
     653                 :     {
     654               0 :       if (free_page != IMPOSSIBLE_PAGE_NO)
     655               0 :         share->state.key_del= (my_off_t) free_page * share->block_size;
     656                 :       else
     657               0 :         share->state.key_del= HA_OFFSET_ERROR;
     658                 :     }
     659               0 :     if (page_type_flag)     /* root page */
     660               0 :       share->state.key_root[key_nr]= file_size - share->block_size; /* == byte position of root_page */
     661                 :   }
     662                 : 
     663               0 :   if (file_size > share->state.state.key_file_length)
     664                 :   {
                         :     /* Page is beyond the known key file length: extend the length and build the page in keyread_buff */
     665               0 :     share->state.state.key_file_length= file_size;
     666               0 :     buff= info->keyread_buff;
     667               0 :     info->keyread_buff_used= 1;
     668               0 :     unlock_method= PAGECACHE_LOCK_WRITE;
     669               0 :     unpin_method=  PAGECACHE_PIN;
     670                 :   }
     671                 :   else
     672                 :   {
     673               0 :     if (!(buff= pagecache_read(share->pagecache, &share->kfile,
     674                 :                                root_page, 0, 0,
     675                 :                                PAGECACHE_PLAIN_PAGE, PAGECACHE_LOCK_WRITE,
     676                 :                                &page_link.link)))
     677                 :     {
     678               0 :       if (my_errno != HA_ERR_FILE_TOO_SHORT &&
     679                 :           my_errno != HA_ERR_WRONG_CRC)
     680                 :       {
     681               0 :         result= 1;
     682               0 :         goto err;
     683                 :       }
     684               0 :       buff= pagecache_block_link_to_buffer(page_link.link); /* short file / bad CRC: continue, the page is overwritten below */
     685                 :     }
     686               0 :     else if (lsn_korr(buff) >= lsn)
     687                 :     {
     688                 :       /* Already applied */
     689               0 :       DBUG_PRINT("info", ("Page is up to date, skipping redo"));
     690               0 :       result= 0;
     691               0 :       goto err;
     692                 :     }
     693               0 :     unlock_method= PAGECACHE_LOCK_LEFT_WRITELOCKED;
     694               0 :     unpin_method=  PAGECACHE_PIN_LEFT_PINNED;
     695                 :   }
     696                 : 
     697                 :   /* Write modified page */
     698               0 :   bzero(buff, LSN_STORE_SIZE);                  /* clear the LSN field */
     699               0 :   memcpy(buff + LSN_STORE_SIZE, header, length);
     700               0 :   bzero(buff + LSN_STORE_SIZE + length,         /* zero everything between the data and the checksum */
     701                 :         share->block_size - LSN_STORE_SIZE - KEYPAGE_CHECKSUM_SIZE - length);
     702               0 :   bfill(buff + share->block_size - KEYPAGE_CHECKSUM_SIZE, /* checksum bytes are all set to 255 */
     703                 :         KEYPAGE_CHECKSUM_SIZE, (uchar) 255);
     704                 : 
     705               0 :   result= 0;
     706               0 :   if (unlock_method == PAGECACHE_LOCK_WRITE &&  /* only true for the page built in keyread_buff above */
     707                 :       pagecache_write(share->pagecache,
     708                 :                       &share->kfile, root_page, 0,
     709                 :                       buff, PAGECACHE_PLAIN_PAGE,
     710                 :                       unlock_method, unpin_method,
     711                 :                       PAGECACHE_WRITE_DELAY, &page_link.link,
     712                 :                       LSN_IMPOSSIBLE))
     713               0 :     result= 1;                                  /* NOTE(review): page_link is still pushed below after a failed write - confirm this is intended */
     714                 : 
     715                 :   /* Mark page to be unlocked and written at _ma_unpin_all_pages() */
     716               0 :   page_link.unlock= PAGECACHE_LOCK_WRITE_UNLOCK;
     717               0 :   page_link.changed= 1;
     718               0 :   push_dynamic(&info->pinned_pages, (void*) &page_link);
     719               0 :   DBUG_RETURN(result);
     720                 : 
     721               0 : err:                                            /* reached only from the pagecache_read() branch: release the page unchanged */
     722               0 :   pagecache_unlock_by_link(share->pagecache, page_link.link,
     723                 :                            PAGECACHE_LOCK_WRITE_UNLOCK,
     724                 :                            PAGECACHE_UNPIN, LSN_IMPOSSIBLE,
     725                 :                            LSN_IMPOSSIBLE, 0, FALSE);
     726               0 :   DBUG_RETURN(result);
     727                 : }
     728                 : 
     729                 : 
     730                 : /**
     731                 :    @brief Apply LOGREC_REDO_INDEX_FREE_PAGE
     732                 : 
                         :    Turns the given key page into a deleted page: the page header is
                         :    cleared, the key number is set to MARIA_DELETE_KEY_NR and the position
                         :    of the previously free page is stored right after the header.
                         : 
     733                 :    @param  info            Maria handler
                         :    @param  lsn             LSN of the record; the page is skipped if its
                         :                            stored LSN is already >= lsn
     734                 :    @param  header          Header (without FILEID)
                         :                            Layout: page number, then free page number
                         :                            (IMPOSSIBLE_PAGE_NO means no link)
     735                 : 
     736                 :    @return Operation status
     737                 :      @retval 0      OK
     738                 :      @retval != 0   Error (my_errno as left by the failed pagecache_read())
     739                 : */
     740                 : 
     741                 : uint _ma_apply_redo_index_free_page(MARIA_HA *info,
     742                 :                                     LSN lsn,
     743                 :                                     const uchar *header)
     744               0 : {
     745               0 :   pgcache_page_no_t page= page_korr(header);
     746               0 :   pgcache_page_no_t free_page= page_korr(header + PAGE_STORE_SIZE);
     747                 :   my_off_t old_link;
     748                 :   MARIA_PINNED_PAGE page_link;
     749               0 :   MARIA_SHARE *share= info->s;
     750                 :   uchar *buff;
     751                 :   int result;
     752               0 :   DBUG_ENTER("_ma_apply_redo_index_free_page");
     753               0 :   DBUG_PRINT("enter", ("page: %lu  free_page: %lu",
     754                 :                        (ulong) page, (ulong) free_page));
     755                 : 
     756               0 :   share->state.changed|= (STATE_CHANGED | STATE_NOT_OPTIMIZED_KEYS |
     757                 :                           STATE_NOT_SORTED_PAGES | STATE_NOT_ZEROFILLED |
     758                 :                           STATE_NOT_MOVABLE);
     759                 : 
     760               0 :   if (cmp_translog_addr(lsn, share->state.is_of_horizon) >= 0)
     761               0 :     share->state.key_del= (my_off_t) page * share->block_size; /* key_del now refers to the page being freed */
     762                 : 
     763               0 :   old_link=  ((free_page != IMPOSSIBLE_PAGE_NO) ?
     764                 :               (my_off_t) free_page * share->block_size :
     765                 :               HA_OFFSET_ERROR);
     766               0 :   if (!(buff= pagecache_read(share->pagecache, &share->kfile,
     767                 :                              page, 0, 0,
     768                 :                              PAGECACHE_PLAIN_PAGE, PAGECACHE_LOCK_WRITE,
     769                 :                              &page_link.link)))
     770                 :   {
     771               0 :     result= (uint) my_errno;                    /* error code is my_errno here, not 1 */
     772               0 :     goto err;
     773                 :   }
     774               0 :   if (lsn_korr(buff) >= lsn)
     775                 :   {
     776                 :     /* Already applied */
     777               0 :     result= 0;
     778               0 :     goto err;
     779                 :   }
     780                 :   /* Free page */
     781               0 :   bzero(buff + LSN_STORE_SIZE, share->keypage_header - LSN_STORE_SIZE); /* clear header except the LSN */
     782               0 :   _ma_store_keynr(share, buff, (uchar) MARIA_DELETE_KEY_NR);
     783               0 :   _ma_store_page_used(share, buff, share->keypage_header + 8); /* used part = header + the link stored below */
     784               0 :   mi_sizestore(buff + share->keypage_header, old_link); /* position of previous free page, or HA_OFFSET_ERROR */
     785                 : 
     786                 : #ifdef IDENTICAL_PAGES_AFTER_RECOVERY
     787                 :   {
     788                 :     bzero(buff + share->keypage_header + 8,     /* zero the rest of the page up to the checksum */
     789                 :           share->block_size - share->keypage_header - 8 -
     790                 :           KEYPAGE_CHECKSUM_SIZE);
     791                 :   }
     792                 : #endif
     793                 : 
     794                 :   /* Mark page to be unlocked and written at _ma_unpin_all_pages() */
     795               0 :   page_link.unlock= PAGECACHE_LOCK_WRITE_UNLOCK;
     796               0 :   page_link.changed= 1;
     797               0 :   push_dynamic(&info->pinned_pages, (void*) &page_link);
     798               0 :   DBUG_RETURN(0);
     799                 : 
     800               0 : err:                                            /* read failed or redo already applied: release the page unchanged */
     801               0 :   pagecache_unlock_by_link(share->pagecache, page_link.link,
     802                 :                            PAGECACHE_LOCK_WRITE_UNLOCK,
     803                 :                            PAGECACHE_UNPIN, LSN_IMPOSSIBLE,
     804                 :                            LSN_IMPOSSIBLE, 0, FALSE);
     805               0 :   DBUG_RETURN(result);
     806                 : }
     807                 : 
     808                 : 
     809                 : /**
     810                 :    @brief Apply LOGREC_REDO_INDEX
     811                 : 
     812                 :    @fn ma_apply_redo_index()
     813                 :    @param  info            Maria handler
     814                 :    @param  header          Header (without FILEID)
     815                 : 
     816                 :    @notes
     817                 :      Data for this part is a set of logical instructions of how to
     818                 :      construct the key page.
     819                 : 
     820                 :    Information of the layout of the components for REDO_INDEX:
     821                 : 
     822                 :    Name              Parameters (in byte) Information
     823                 :    KEY_OP_OFFSET     2                    Set position for next operations
     824                 :    KEY_OP_SHIFT      2 (signed int)       How much to shift down or up
     825                 :    KEY_OP_CHANGE     2 length,  data      Data to replace at 'pos'
     826                 :    KEY_OP_ADD_PREFIX 2 move-length        How much data should be moved up
     827                 :                      2 change-length      Data to be replaced at page start
     828                 :    KEY_OP_DEL_PREFIX 2 length             Bytes to be deleted at page start
     829                 :    KEY_OP_ADD_SUFFIX 2 length, data       Add data to end of page
     830                 :    KEY_OP_DEL_SUFFIX 2 length             Reduce page length with this
     831                 :                                           Sets position to start of page
     832                 :    KEY_OP_CHECK      6 page_length[2},CRC Used only when debugging
     833                 :    KEY_OP_COMPACT_PAGE  6 transid
     834                 :    KEY_OP_SET_PAGEFLAG  1 flag for page
     835                 : 
     836                 :    @return Operation status
     837                 :      @retval 0      OK
     838                 :      @retval 1      Error
     839                 : */
     840                 : 
     841                 : long my_counter= 0;
     842                 : 
     843                 : uint _ma_apply_redo_index(MARIA_HA *info,
     844                 :                           LSN lsn, const uchar *header, uint head_length)
     845               0 : {
     846               0 :   MARIA_SHARE *share= info->s;
     847               0 :   pgcache_page_no_t page_pos= page_korr(header);
     848                 :   MARIA_PINNED_PAGE page_link;
     849                 :   uchar *buff;
     850               0 :   const uchar *header_end= header + head_length;
     851               0 :   uint page_offset= 0, org_page_length;
     852                 :   uint nod_flag, page_length, keypage_header, keynr;
     853                 :   int result;
     854                 :   MARIA_PAGE page;
     855               0 :   DBUG_ENTER("_ma_apply_redo_index");
     856               0 :   DBUG_PRINT("enter", ("page: %lu", (ulong) page_pos));
     857                 : 
     858                 :   /* Set header to point at key data */
     859               0 :   header+= PAGE_STORE_SIZE;
     860                 : 
     861               0 :   if (!(buff= pagecache_read(share->pagecache, &share->kfile,
     862                 :                              page_pos, 0, 0,
     863                 :                              PAGECACHE_PLAIN_PAGE, PAGECACHE_LOCK_WRITE,
     864                 :                              &page_link.link)))
     865                 :   {
     866               0 :     result= 1;
     867               0 :     goto err;
     868                 :   }
     869               0 :   if (lsn_korr(buff) >= lsn)
     870                 :   {
     871                 :     /* Already applied */
     872               0 :     DBUG_PRINT("info", ("Page is up to date, skipping redo"));
     873               0 :     result= 0;
     874               0 :     goto err;
     875                 :   }
     876                 : 
     877               0 :   keynr= _ma_get_keynr(share, buff);
     878               0 :   _ma_page_setup(&page, info, share->keyinfo + keynr, page_pos, buff);
     879               0 :   nod_flag=    page.node;
     880               0 :   org_page_length= page_length= page.size;
     881                 : 
     882               0 :   keypage_header= share->keypage_header;
     883               0 :   DBUG_PRINT("redo", ("page_length: %u", page_length));
     884                 : 
     885                 :   /* Apply modifications to page */
     886                 :   do
     887                 :   {
     888               0 :     switch ((enum en_key_op) (*header++)) {
     889                 :     case KEY_OP_OFFSET:                         /* 1 */
     890               0 :       page_offset= uint2korr(header);
     891               0 :       header+= 2;
     892               0 :       DBUG_PRINT("redo", ("key_op_offset: %u", page_offset));
     893               0 :       DBUG_ASSERT(page_offset >= keypage_header && page_offset <= page_length);
     894                 :       break;
     895                 :     case KEY_OP_SHIFT:                          /* 2 */
     896                 :     {
     897               0 :       int length= sint2korr(header);
     898               0 :       header+= 2;
     899               0 :       DBUG_PRINT("redo", ("key_op_shift: %d", length));
     900               0 :       DBUG_ASSERT(page_offset != 0 && page_offset <= page_length &&
     901                 :                   page_length + length < share->block_size);
     902                 : 
     903               0 :       if (length < 0)
     904               0 :         bmove(buff + page_offset, buff + page_offset - length,
     905                 :               page_length - page_offset + length);
     906                 :       else
     907               0 :         bmove_upp(buff + page_length + length, buff + page_length,
     908                 :                   page_length - page_offset);
     909               0 :       page_length+= length;
     910               0 :       break;
     911                 :     }
     912                 :     case KEY_OP_CHANGE:                         /* 3 */
     913                 :     {
     914               0 :       uint length= uint2korr(header);
     915               0 :       DBUG_PRINT("redo", ("key_op_change: %u", length));
     916               0 :       DBUG_ASSERT(page_offset != 0 && page_offset + length <= page_length);
     917                 : 
     918               0 :       memcpy(buff + page_offset, header + 2 , length);
     919               0 :       header+= 2 + length;
     920               0 :       break;
     921                 :     }
     922                 :     case KEY_OP_ADD_PREFIX:                     /* 4 */
     923                 :     {
     924               0 :       uint insert_length= uint2korr(header);
     925               0 :       uint changed_length= uint2korr(header+2);
     926               0 :       DBUG_PRINT("redo", ("key_op_add_prefix: %u  %u",
     927                 :                           insert_length, changed_length));
     928                 : 
     929               0 :       DBUG_ASSERT(insert_length <= changed_length &&
     930                 :                   page_length + changed_length <= share->block_size);
     931                 : 
     932               0 :       bmove_upp(buff + page_length + insert_length, buff + page_length,
     933                 :                 page_length - keypage_header);
     934               0 :       memcpy(buff + keypage_header, header + 4 , changed_length);
     935               0 :       header+= 4 + changed_length;
     936               0 :       page_length+= insert_length;
     937               0 :       break;
     938                 :     }
     939                 :     case KEY_OP_DEL_PREFIX:                     /* 5 */
     940                 :     {
     941               0 :       uint length= uint2korr(header);
     942               0 :       header+= 2;
     943               0 :       DBUG_PRINT("redo", ("key_op_del_prefix: %u", length));
     944               0 :       DBUG_ASSERT(length <= page_length - keypage_header);
     945                 : 
     946               0 :       bmove(buff + keypage_header, buff + keypage_header +
     947                 :             length, page_length - keypage_header - length);
     948               0 :       page_length-= length;
     949                 : 
     950               0 :       page_offset= keypage_header;              /* Prepare for change */
     951               0 :       break;
     952                 :     }
     953                 :     case KEY_OP_ADD_SUFFIX:                     /* 6 */
     954                 :     {
     955               0 :       uint insert_length= uint2korr(header);
     956               0 :       DBUG_PRINT("redo", ("key_op_add_prefix: %u", insert_length));
     957               0 :       DBUG_ASSERT(page_length + insert_length <= share->block_size);
     958               0 :       memcpy(buff + page_length, header+2, insert_length);
     959                 : 
     960               0 :       page_length+= insert_length;
     961               0 :       header+= 2 + insert_length;
     962               0 :       break;
     963                 :     }
     964                 :     case KEY_OP_DEL_SUFFIX:                     /* 7 */
     965                 :     {
     966               0 :       uint del_length= uint2korr(header);
     967               0 :       header+= 2;
     968               0 :       DBUG_PRINT("redo", ("key_op_del_suffix: %u", del_length));
     969               0 :       DBUG_ASSERT(page_length - del_length >= keypage_header);
     970               0 :       page_length-= del_length;
     971               0 :       break;
     972                 :     }
     973                 :     case KEY_OP_CHECK:                          /* 8 */
     974                 :     {
     975                 : #ifdef EXTRA_DEBUG_KEY_CHANGES
     976                 :       uint check_page_length;
     977                 :       ha_checksum crc;
     978               0 :       check_page_length= uint2korr(header);
     979               0 :       crc=               uint4korr(header+2);
     980               0 :       _ma_store_page_used(share, buff, page_length);
     981               0 :       DBUG_ASSERT(check_page_length == page_length);
     982               0 :       if (crc != (uint32) my_checksum(0, buff + LSN_STORE_SIZE,
     983                 :                                       page_length - LSN_STORE_SIZE))
     984                 :       {
     985               0 :         DBUG_PRINT("error", ("page_length %u",page_length));
     986               0 :         DBUG_DUMP("KEY_OP_CHECK bad page", buff, share->block_size);
     987               0 :         DBUG_ASSERT("crc" == "failure in REDO_INDEX");
     988                 :       }
     989                 : #endif
     990               0 :       DBUG_PRINT("redo", ("key_op_check"));
     991               0 :       header+= 6;
     992               0 :       break;
     993                 :     }
     994                 :     case KEY_OP_MULTI_COPY:                     /* 9 */
     995                 :     {
     996                 :       /*
     997                 :         List of fixed-len memcpy() operations with their source located inside
     998                 :         the page. The log record's piece looks like:
     999                 :         first the length 'full_length' to be used by memcpy()
    1000                 :         then the number of bytes used by the list of (to,from) pairs
    1001                 :         then the (to,from) pairs, so we do:
    1002                 :         for (t,f) in [list of (to,from) pairs]:
    1003                 :             memcpy(t, f, full_length).
    1004                 :       */
    1005                 :       uint full_length, log_memcpy_length;
    1006                 :       const uchar *log_memcpy_end;
    1007                 : 
    1008               0 :       DBUG_PRINT("redo", ("key_op_multi_copy"));
    1009               0 :       full_length= uint2korr(header);
    1010               0 :       header+= 2;
    1011               0 :       log_memcpy_length= uint2korr(header);
    1012               0 :       header+= 2;
    1013               0 :       log_memcpy_end= header + log_memcpy_length;
    1014               0 :       DBUG_ASSERT(full_length < share->block_size);
    1015               0 :       while (header < log_memcpy_end)
    1016                 :       {
    1017                 :         uint to, from;
    1018               0 :         to= uint2korr(header);
    1019               0 :         header+= 2;
    1020               0 :         from= uint2korr(header);
    1021               0 :         header+= 2;
    1022                 :         /* "from" is a place in the existing page */
    1023               0 :         DBUG_ASSERT(max(from, to) < share->block_size);
    1024               0 :         memcpy(buff + to, buff + from, full_length);
    1025                 :       }
    1026                 :       break;
    1027                 :     }
    1028                 :     case KEY_OP_SET_PAGEFLAG:
    1029               0 :       DBUG_PRINT("redo", ("key_op_set_pageflag"));
    1030               0 :       buff[KEYPAGE_TRANSFLAG_OFFSET]= *header++;
    1031               0 :       break;
    1032                 :     case KEY_OP_COMPACT_PAGE:
    1033                 :     {
    1034               0 :       TrID transid= transid_korr(header);
    1035                 : 
    1036               0 :       DBUG_PRINT("redo", ("key_op_compact_page"));
    1037               0 :       header+= TRANSID_SIZE;
    1038               0 :       if (_ma_compact_keypage(&page, transid))
    1039                 :       {
    1040               0 :         result= 1;
    1041               0 :         goto err;
    1042                 :       }
    1043               0 :       page_length= page.size;
    1044                 :     }
    1045                 :     case KEY_OP_NONE:
    1046                 :     default:
    1047               0 :       DBUG_ASSERT(0);
    1048                 :       result= 1;
    1049                 :       goto err;
    1050                 :     }
    1051               0 :   } while (header < header_end);
    1052               0 :   DBUG_ASSERT(header == header_end);
    1053                 : 
    1054                 :   /* Write modified page */
    1055               0 :   page.size= page_length;
    1056               0 :   _ma_store_page_used(share, buff, page_length);
    1057                 : 
    1058                 :   /*
    1059                 :     Clean old stuff up. Gives us better compression if we archive things
    1060                 :     and makes things easier to debug
    1061                 :   */
    1062               0 :   if (page_length < org_page_length)
    1063               0 :     bzero(buff + page_length, org_page_length-page_length);
    1064                 : 
    1065                 :   /* Mark page to be unlocked and written at _ma_unpin_all_pages() */
    1066               0 :   page_link.unlock= PAGECACHE_LOCK_WRITE_UNLOCK;
    1067               0 :   page_link.changed= 1;
    1068               0 :   push_dynamic(&info->pinned_pages, (void*) &page_link);
    1069               0 :   DBUG_RETURN(0);
    1070                 : 
    1071               0 : err:
    1072               0 :   pagecache_unlock_by_link(share->pagecache, page_link.link,
    1073                 :                            PAGECACHE_LOCK_WRITE_UNLOCK,
    1074                 :                            PAGECACHE_UNPIN, LSN_IMPOSSIBLE,
    1075                 :                            LSN_IMPOSSIBLE, 0, FALSE);
    1076               0 :   if (result)
    1077               0 :     _ma_mark_file_crashed(share);
    1078               0 :   DBUG_RETURN(result);
    1079                 : }
    1080                 : 
    1081                 : 
    1082                 : /****************************************************************************
    1083                 :   Undo of key block changes
    1084                 : ****************************************************************************/
    1085                 : 
    1086                 : /**
    1087                 :    @brief Undo of insert of key (ie, delete the inserted key)
                         :
                         :    @param  info      Maria handler
                         :    @param  undo_lsn  LSN handed to _ma_write_clr() for the CLR record
                         :    @param  header    Key number (KEY_NR_STORE_SIZE bytes) followed by
                         :                      the key that was inserted
                         :    @param  length    Length of header, key number included
                         :
                         :    @note
                         :      The key is removed with maria_rtree_real_delete() for RTREE indexes
                         :      and with _ma_ck_real_delete() otherwise. The delete works on a local
                         :      copy of the root (new_root); share->state.key_root[] is not assigned
                         :      here but passed to _ma_write_clr() together with the new value
                         :      (presumably the log write hook stores it; verify against
                         :      _ma_write_clr()).
                         :
                         :    @note
                         :      NOTE(review): 'lsn' is only set through _ma_write_clr(). If that call
                         :      can fail before storing an LSN, an indeterminate value is passed to
                         :      _ma_unpin_all_pages_and_finalize_row() - TODO confirm.
                         :
                         :    @return 0 on success, non-zero if the key delete or the write of the
                         :            CLR failed. If the delete failed the table is also marked as
                         :            crashed.
    1088                 : */
    1089                 : 
    1090                 : my_bool _ma_apply_undo_key_insert(MARIA_HA *info, LSN undo_lsn,
    1091                 :                                   const uchar *header, uint length)
    1092               0 : {
    1093                 :   LSN lsn;
    1094                 :   my_bool res;
    1095                 :   uint keynr;
    1096                 :   uchar key_buff[MARIA_MAX_KEY_BUFF];
    1097               0 :   MARIA_SHARE *share= info->s;
    1098                 :   MARIA_KEY key;
    1099                 :   my_off_t new_root;
    1100                 :   struct st_msg_to_write_hook_for_undo_key msg;
    1101               0 :   DBUG_ENTER("_ma_apply_undo_key_insert");
    1102                 : 
    1103               0 :   share->state.changed|= (STATE_CHANGED | STATE_NOT_OPTIMIZED_KEYS |
    1104                 :                           STATE_NOT_SORTED_PAGES | STATE_NOT_ZEROFILLED |
    1105                 :                           STATE_NOT_MOVABLE);
    1106               0 :   keynr= key_nr_korr(header);
    1107               0 :   length-= KEY_NR_STORE_SIZE;               /* Length of the key only */
    1108                 : 
    1109                 :   /*
                         :     We have to copy key as _ma_ck_real_delete() may change it.
                         :     NOTE(review): length is taken from the caller as is and is not
                         :     checked against sizeof(key_buff) here - confirm callers guarantee
                         :     that it fits.
                         :   */
    1110               0 :   memcpy(key_buff, header + KEY_NR_STORE_SIZE, length);
    1111               0 :   DBUG_DUMP("key_buff", key_buff, length);
    1112                 : 
    1113               0 :   new_root= share->state.key_root[keynr];
    1114                 :   /*
    1115                 :     Change the key to an internal structure.
    1116                 :     It's safe to have SEARCH_USER_KEY_HAS_TRANSID even if there isn't
    1117                 :     a transaction id, as ha_key_cmp() will stop comparison when key length
    1118                 :     is reached.
    1119                 :     For index with transid flag, the ref_length of the key is not correct.
    1120                 :     This should however be safe as long as this key is only used for
    1121                 :     comparison against other keys (not for packing or for read-next etc, as
    1122                 :     in this case we use data_length + ref_length, which is correct).
    1123                 :   */
    1124               0 :   key.keyinfo=     share->keyinfo + keynr;
    1125               0 :   key.data=        key_buff;
    1126               0 :   key.data_length= length - share->rec_reflength;
    1127               0 :   key.ref_length=  share->rec_reflength;
    1128               0 :   key.flag=        SEARCH_USER_KEY_HAS_TRANSID;
    1129                 : 
    1130               0 :   res= ((share->keyinfo[keynr].key_alg == HA_KEY_ALG_RTREE) ?
    1131                 :         maria_rtree_real_delete(info, &key, &new_root) :
    1132                 :         _ma_ck_real_delete(info, &key, &new_root));
    1133               0 :   if (res)
    1134               0 :     _ma_mark_file_crashed(share);
    1135               0 :   msg.root= &share->state.key_root[keynr];  /* Where the root is stored */
    1136               0 :   msg.value= new_root;                      /* Root after the delete */
    1137               0 :   msg.keynr= keynr;
    1138                 : 
    1139               0 :   if (_ma_write_clr(info, undo_lsn, *msg.root == msg.value ?
    1140                 :                     LOGREC_UNDO_KEY_INSERT : LOGREC_UNDO_KEY_INSERT_WITH_ROOT,
    1141                 :                     0, 0, &lsn, (void*) &msg))
    1142               0 :     res= 1;
    1143                 : 
    1144               0 :   _ma_fast_unlock_key_del(info);
    1145               0 :   _ma_unpin_all_pages_and_finalize_row(info, lsn);
    1146               0 :   DBUG_RETURN(res);
    1147                 : }
    1148                 : 
    1149                 : 
    1150                 : /**
    1151                 :    @brief Undo of delete of key (ie, insert the deleted key)
    1152                 : 
                         :    @param  info            Maria handler
                         :    @param  undo_lsn        LSN handed to _ma_write_clr() for the CLR record
                         :    @param  header          Key number (KEY_NR_STORE_SIZE bytes), then
                         :                            PAGE_STORE_SIZE bytes that are skipped if
                         :                            with_root is set, then the deleted key
                         :    @param  length          Length of header, all of the above included
    1153                 :    @param  with_root       If the UNDO is UNDO_KEY_DELETE_WITH_ROOT
                         :
                         :    @note
                         :      The key is re-inserted with maria_rtree_insert_level() for RTREE
                         :      indexes and with _ma_ck_real_write_btree() otherwise. The insert works
                         :      on a local copy of the root (new_root); share->state.key_root[] is not
                         :      assigned here but passed to _ma_write_clr() together with the new
                         :      value (presumably the log write hook stores it; verify against
                         :      _ma_write_clr()).
                         :
                         :    @note
                         :      NOTE(review): 'lsn' is only set through _ma_write_clr(). If that call
                         :      can fail before storing an LSN, an indeterminate value is passed to
                         :      _ma_unpin_all_pages_and_finalize_row() - TODO confirm.
                         :
                         :    @return 0 on success, non-zero if the key insert or the write of the
                         :            CLR failed. If the insert failed the table is also marked as
                         :            crashed.
    1154                 : */
    1155                 : 
    1156                 : my_bool _ma_apply_undo_key_delete(MARIA_HA *info, LSN undo_lsn,
    1157                 :                                   const uchar *header, uint length,
    1158                 :                                   my_bool with_root)
    1159               0 : {
    1160                 :   LSN lsn;
    1161                 :   my_bool res;
    1162                 :   uint keynr, skip_bytes;
    1163                 :   uchar key_buff[MARIA_MAX_KEY_BUFF];
    1164               0 :   MARIA_SHARE *share= info->s;
    1165                 :   my_off_t new_root;
    1166                 :   struct st_msg_to_write_hook_for_undo_key msg;
    1167                 :   MARIA_KEY key;
    1168               0 :   DBUG_ENTER("_ma_apply_undo_key_delete");
    1169                 : 
    1170               0 :   share->state.changed|= (STATE_CHANGED | STATE_NOT_OPTIMIZED_KEYS |
    1171                 :                           STATE_NOT_SORTED_PAGES | STATE_NOT_ZEROFILLED |
    1172                 :                           STATE_NOT_MOVABLE);
    1173               0 :   keynr= key_nr_korr(header);
    1174               0 :   skip_bytes= KEY_NR_STORE_SIZE + (with_root ? PAGE_STORE_SIZE : 0);
    1175               0 :   header+= skip_bytes;                      /* header now points at the key */
    1176               0 :   length-= skip_bytes;                      /* Length of the key only */
    1177                 : 
    1178                 :   /*
                         :     We have to copy key as _ma_ck_real_write_btree() may change it.
                         :     NOTE(review): length is taken from the caller as is and is not
                         :     checked against sizeof(key_buff) here - confirm callers guarantee
                         :     that it fits.
                         :   */
    1179               0 :   memcpy(key_buff, header, length);
    1180               0 :   DBUG_DUMP("key", key_buff, length);
    1181                 : 
    1182               0 :   key.keyinfo=     share->keyinfo + keynr;
    1183               0 :   key.data=        key_buff;
    1184               0 :   key.data_length= length - share->rec_reflength;
    1185               0 :   key.ref_length=  share->rec_reflength;
    1186               0 :   key.flag=        SEARCH_USER_KEY_HAS_TRANSID;
    1187                 : 
    1188               0 :   new_root= share->state.key_root[keynr];
    1189               0 :   res= (share->keyinfo[keynr].key_alg == HA_KEY_ALG_RTREE) ?
    1190                 :     maria_rtree_insert_level(info, &key, -1, &new_root) :
    1191                 :     _ma_ck_real_write_btree(info, &key, &new_root,
    1192                 :                             share->keyinfo[keynr].write_comp_flag |
    1193                 :                             key.flag);
    1194               0 :   if (res)
    1195               0 :     _ma_mark_file_crashed(share);
    1196                 : 
    1197               0 :   msg.root= &share->state.key_root[keynr];  /* Where the root is stored */
    1198               0 :   msg.value= new_root;                      /* Root after the insert */
    1199               0 :   msg.keynr= keynr;
    1200               0 :   if (_ma_write_clr(info, undo_lsn,
    1201                 :                     *msg.root == msg.value ?
    1202                 :                     LOGREC_UNDO_KEY_DELETE : LOGREC_UNDO_KEY_DELETE_WITH_ROOT,
    1203                 :                     0, 0, &lsn,
    1204                 :                     (void*) &msg))
    1205               0 :     res= 1;
    1206                 : 
    1207               0 :   _ma_fast_unlock_key_del(info);
    1208               0 :   _ma_unpin_all_pages_and_finalize_row(info, lsn);
    1209               0 :   DBUG_RETURN(res);
    1210                 : }
    1211                 : 
    1212                 : 
    1213                 : /****************************************************************************
    1214                 :   Handle some local variables
    1215                 : ****************************************************************************/
    1216                 : 
    1217                 : /**
    1218                 :   @brief lock key_del for other threads usage
    1219                 : 
    1220                 :   @fn     _ma_lock_key_del()
    1221                 :   @param  info            Maria handler
    1222                 :   @param  insert_at_end   Set to 1 if we are doing an insert
    1223                 : 
    1224                 :   @note
    1225                 :     To allow higher concurrency in the common case where we do inserts
    1226                 :     and we don't have any linked blocks we do the following:
    1227                 :     - Mark in info->key_del_used that we are not using key_del
                         :       (info->key_del_used is set to 2)
    1228                 :     - Return at once (without marking key_del as used)
    1229                 : 
    1230                 :     This is safe as we in this case don't write key_del_current into
    1231                 :     the redo log and during recovery we are not updating key_del.
    1232                 : 
    1233                 :   @retval 1  Use page at end of file (insert-with-append, or the locked
                         :              list is empty: share->key_del_current == HA_OFFSET_ERROR)
    1234                 :   @retval 0  Use page at share->key_del_current
    1235                 : */
    1236                 : 
    1237                 : my_bool _ma_lock_key_del(MARIA_HA *info, my_bool insert_at_end)
    1238               0 : {
    1239               0 :   MARIA_SHARE *share= info->s;
    1240                 : 
    1241                 :   /*
    1242                 :     info->key_del_used is 0 initially.
    1243                 :     If the caller needs a block (_ma_new()), we look at the free list:
    1244                 :     - looks empty? then caller will create a new block at end of file and
    1245                 :     remember (through info->key_del_used==2) that it will not change
    1246                 :     state.key_del and does not need to wake up waiters as nobody will wait for
    1247                 :     it.
    1248                 :     - non-empty? then we wait for other users of the state.key_del list to
    1249                 :     have finished, then we lock this list (through share->key_del_used==1)
    1250                 :     because we need to prevent some other thread from also reading
    1251                 :     state.key_del and using the same page as ours. We remember through
    1252                 :     info->key_del_used==1 that we will have to set state.key_del at unlock
    1253                 :     time and wake up waiters.
    1254                 :     If the caller wants to free a block (_ma_dispose()), "empty" and
    1255                 :     "non-empty" are both treated as "non-empty" is treated above.
    1256                 :     When we are ready to unlock, we copy share->key_del_current into
    1257                 :     state.key_del. Unlocking happens when writing the UNDO log record, which
    1258                 :     can make the lock time long.
    1259                 :     Why we wrote "*looks* empty": because we are looking at state.key_del
    1260                 :     which may be slightly old (share->key_del_current may be more recent and
    1261                 :     exact): when we want a new page, we tolerate treating "there was no free
    1262                 :     page 1 millisecond ago" as "there is no free page". It's ok to not pop
    1263                 :     (_ma_new(), page will be found later anyway) but it's not ok to not push
    1264                 :     (_ma_dispose(), page would be lost).
    1265                 :     When we leave this function, info->key_del_used is always 1 or 2.
    1266                 :   */
    1267               0 :   if (info->key_del_used != 1)              /* List not already locked by us */
    1268                 :   {
    1269               0 :     pthread_mutex_lock(&share->key_del_lock);
    1270               0 :     if (share->state.key_del == HA_OFFSET_ERROR && insert_at_end)
    1271                 :     {
    1272               0 :       pthread_mutex_unlock(&share->key_del_lock);
    1273               0 :       info->key_del_used= 2;                  /* insert-with-append */
    1274               0 :       return 1;
    1275                 :     }
    1276                 : #ifdef THREAD
    1277               0 :     while (share->key_del_used)             /* Cleared in _ma_unlock_key_del() */
    1278               0 :       pthread_cond_wait(&share->key_del_cond, &share->key_del_lock);
    1279                 : #endif
    1280               0 :     info->key_del_used= 1;
    1281               0 :     share->key_del_used= 1;
    1282               0 :     share->key_del_current= share->state.key_del;
    1283               0 :     pthread_mutex_unlock(&share->key_del_lock);
    1284                 :   }
    1285               0 :   return share->key_del_current == HA_OFFSET_ERROR; /* 1 if list is empty */
    1286                 : }
    1287                 : 
    1288                 : 
    1289                 : /**
    1290                 :   @brief copy changes to key_del and unlock it
                         :
                         :   @param  info  Maria handler; info->key_del_used must be non-zero
                         :                 (asserted)
                         :
                         :   If info->key_del_used is 1, share->key_del_current is copied to
                         :   share->state.key_del and share->key_del_used is cleared while holding
                         :   share->key_del_lock; share->key_del_cond is signalled after the mutex
                         :   is released. If it is 2 (insert-with-append) the shared state is left
                         :   untouched. In both cases info->key_del_used is reset to 0.
    1291                 : 
    1292                 :   @note
    1293                 :   In case of many threads using the maria table, we always have a lock
    1294                 :   on the translog when coming here.
    1295                 : */
    1296                 : 
    1297                 : void _ma_unlock_key_del(MARIA_HA *info)
    1298               0 : {
    1299               0 :   DBUG_ASSERT(info->key_del_used);
    1300               0 :   if (info->key_del_used == 1)                  /* Ignore insert-with-append */
    1301                 :   {
    1302               0 :     MARIA_SHARE *share= info->s;
    1303               0 :     pthread_mutex_lock(&share->key_del_lock);
    1304               0 :     share->key_del_used= 0;
    1305               0 :     share->state.key_del= share->key_del_current;
    1306               0 :     pthread_mutex_unlock(&share->key_del_lock);
    1307               0 :     pthread_cond_signal(&share->key_del_cond);  /* Wake up one waiter */
    1308                 :   }
    1309               0 :   info->key_del_used= 0;
    1310                 : }

Generated by: LTP GCOV extension version 1.4