LTP GCOV extension - code coverage report
Current view: directory - storage/maria - ma_recovery.c
Test: mtr_and_unit.info
Date: 2009-03-05 Instrumented lines: 1427
Code covered: 60.5 % Executed lines: 863

       1                 : /* Copyright (C) 2006, 2007 MySQL AB
       2                 : 
       3                 :    This program is free software; you can redistribute it and/or modify
       4                 :    it under the terms of the GNU General Public License as published by
       5                 :    the Free Software Foundation; version 2 of the License.
       6                 : 
       7                 :    This program is distributed in the hope that it will be useful,
       8                 :    but WITHOUT ANY WARRANTY; without even the implied warranty of
       9                 :    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
      10                 :    GNU General Public License for more details.
      11                 : 
      12                 :    You should have received a copy of the GNU General Public License
      13                 :    along with this program; if not, write to the Free Software
      14                 :    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
      15                 : 
      16                 : /*
      17                 :   WL#3072 Maria recovery
      18                 :   First version written by Guilhem Bichot on 2006-04-27.
      19                 : */
      20                 : 
      21                 : /* Here is the implementation of this module */
      22                 : 
      23                 : #include "maria_def.h"
      24                 : #include "ma_recovery.h"
      25                 : #include "ma_blockrec.h"
      26                 : #include "ma_checkpoint.h"
      27                 : #include "trnman.h"
      28                 : #include "ma_key_recover.h"
      29                 : #include "ma_recovery_util.h"
      30                 : 
      31                 : struct st_trn_for_recovery /* per-transaction state, used only in the REDO phase; all_active_trans[] has one entry per short transaction id */
      32                 : {
      33                 :   LSN group_start_lsn, undo_lsn, first_undo_lsn; /* group_start_lsn is asserted to be LSN_IMPOSSIBLE when a new transaction is registered for the slot */
      34                 :   TrID long_trid; /* long transaction id; 0 is tested as "no transaction in this slot" */
      35                 : };
      36                 : struct st_table_for_recovery /* one slot of all_tables[]; used in the REDO and UNDO phases */
      37                 : {
      38                 :   MARIA_HA *info; /* table handle for this slot; presumably NULL while no table is mapped to the id -- TODO confirm */
      39                 : };
      40                 : /* State shared by all functions of this module. No locking: recovery is single-threaded */
      41                 : static struct st_trn_for_recovery *all_active_trans; /**< SHORT_TRID_MAX + 1 zero-filled entries indexed by short trid; allocated and freed by maria_apply_log() */
      42                 : static struct st_table_for_recovery *all_tables; /**< SHARE_ID_MAX + 1 zero-filled entries; allocated and freed by maria_apply_log() */
      43                 : static struct st_dirty_page *dirty_pages_pool; /**< freed and reset to NULL at the end of maria_apply_log() */
      44                 : static LSN current_group_end_lsn; /**< presumably the end LSN of the record group being executed -- TODO confirm */
      45                 : #ifndef DBUG_OFF
      46                 : /** Current group of REDOs is about this table and only this one */
      47                 : static MARIA_HA *current_group_table;
      48                 : #endif
      49                 : static TrID max_long_trid= 0; /**< max long trid seen by REDO phase */
      50                 : static my_bool skip_DDLs; /**< if REDO phase should skip DDL records */
      51                 : /** @brief to avoid writing a checkpoint if recovery did nothing. */
      52                 : static my_bool checkpoint_useful;
      53                 : static my_bool in_redo_phase; /**< set TRUE before run_redo_phase(), reset to FALSE once end_of_redo_phase() has succeeded */
      54                 : static my_bool trns_created; /**< reset by maria_apply_log(); when set, its error path calls delete_all_transactions() */
      55                 : static ulong skipped_undo_phase; /**< count of UNDO records skipped in the UNDO phase; reported as a warning */
      56                 : static ulonglong now; /**< for tracking execution time of phases */
      57                 : static int (*save_error_handler_hook)(uint, const char *,myf); /**< error_handler_hook in effect before recovery; restored when maria_apply_log() ends */
      58                 : static uint recovery_warnings; /**< count of warnings */
      59                 : 
      60                 : #define prototype_redo_exec_hook(R)                                          \
      61                 :   static int exec_REDO_LOGREC_ ## R(const TRANSLOG_HEADER_BUFFER *rec) /* header of the REDO-phase hook for record type R */
      62                 : 
      63                 : #define prototype_redo_exec_hook_dummy(R)                                    \
      64                 :   static int exec_REDO_LOGREC_ ## R(const TRANSLOG_HEADER_BUFFER *rec        \
      65                 :                                __attribute__ ((unused))) /* same header, for hooks that do not use rec */
      66                 : 
      67                 : #define prototype_undo_exec_hook(R)                                          \
      68                 :   static int exec_UNDO_LOGREC_ ## R(const TRANSLOG_HEADER_BUFFER *rec, TRN *trn) /* header of the UNDO-phase hook for record type R */
      69                 : 
      70                 : prototype_redo_exec_hook(LONG_TRANSACTION_ID); /* hooks run in the REDO phase, one per log record type */
      71                 : prototype_redo_exec_hook_dummy(CHECKPOINT);
      72                 : prototype_redo_exec_hook(REDO_CREATE_TABLE);
      73                 : prototype_redo_exec_hook(REDO_RENAME_TABLE);
      74                 : prototype_redo_exec_hook(REDO_REPAIR_TABLE);
      75                 : prototype_redo_exec_hook(REDO_DROP_TABLE);
      76                 : prototype_redo_exec_hook(FILE_ID);
      77                 : prototype_redo_exec_hook(INCOMPLETE_LOG);
      78                 : prototype_redo_exec_hook_dummy(INCOMPLETE_GROUP);
      79                 : prototype_redo_exec_hook(UNDO_BULK_INSERT);
      80                 : prototype_redo_exec_hook(IMPORTED_TABLE);
      81                 : prototype_redo_exec_hook(REDO_INSERT_ROW_HEAD);
      82                 : prototype_redo_exec_hook(REDO_INSERT_ROW_TAIL);
      83                 : prototype_redo_exec_hook(REDO_INSERT_ROW_HEAD); /* NOTE(review): repeats the prototype declared two lines above; redundant -- check whether another record type was intended */
      84                 : prototype_redo_exec_hook(REDO_PURGE_ROW_HEAD);
      85                 : prototype_redo_exec_hook(REDO_PURGE_ROW_TAIL);
      86                 : prototype_redo_exec_hook(REDO_FREE_HEAD_OR_TAIL);
      87                 : prototype_redo_exec_hook(REDO_FREE_BLOCKS);
      88                 : prototype_redo_exec_hook(REDO_DELETE_ALL);
      89                 : prototype_redo_exec_hook(REDO_INDEX);
      90                 : prototype_redo_exec_hook(REDO_INDEX_NEW_PAGE);
      91                 : prototype_redo_exec_hook(REDO_INDEX_FREE_PAGE);
      92                 : prototype_redo_exec_hook(REDO_BITMAP_NEW_PAGE);
      93                 : prototype_redo_exec_hook(UNDO_ROW_INSERT);
      94                 : prototype_redo_exec_hook(UNDO_ROW_DELETE);
      95                 : prototype_redo_exec_hook(UNDO_ROW_UPDATE);
      96                 : prototype_redo_exec_hook(UNDO_KEY_INSERT);
      97                 : prototype_redo_exec_hook(UNDO_KEY_DELETE);
      98                 : prototype_redo_exec_hook(UNDO_KEY_DELETE_WITH_ROOT);
      99                 : prototype_redo_exec_hook(COMMIT);
     100                 : prototype_redo_exec_hook(CLR_END);
     101                 : prototype_redo_exec_hook(DEBUG_INFO);
     102                 : prototype_undo_exec_hook(UNDO_ROW_INSERT); /* hooks run in the UNDO phase; these also receive the transaction */
     103                 : prototype_undo_exec_hook(UNDO_ROW_DELETE);
     104                 : prototype_undo_exec_hook(UNDO_ROW_UPDATE);
     105                 : prototype_undo_exec_hook(UNDO_KEY_INSERT);
     106                 : prototype_undo_exec_hook(UNDO_KEY_DELETE);
     107                 : prototype_undo_exec_hook(UNDO_KEY_DELETE_WITH_ROOT);
     108                 : prototype_undo_exec_hook(UNDO_BULK_INSERT);
     109                 : /* Internal helpers of the recovery module */
     110                 : static int run_redo_phase(LSN lsn, enum maria_apply_log_way apply);
     111                 : static uint end_of_redo_phase(my_bool prepare_for_undo_phase);
     112                 : static int run_undo_phase(uint uncommitted);
     113                 : static void display_record_position(const LOG_DESC *log_desc,
     114                 :                                     const TRANSLOG_HEADER_BUFFER *rec,
     115                 :                                     uint number);
     116                 : static int display_and_apply_record(const LOG_DESC *log_desc,
     117                 :                                     const TRANSLOG_HEADER_BUFFER *rec);
     118                 : static MARIA_HA *get_MARIA_HA_from_REDO_record(const
     119                 :                                                TRANSLOG_HEADER_BUFFER *rec);
     120                 : static MARIA_HA *get_MARIA_HA_from_UNDO_record(const
     121                 :                                                TRANSLOG_HEADER_BUFFER *rec);
     122                 : static void prepare_table_for_close(MARIA_HA *info, TRANSLOG_ADDRESS horizon);
     123                 : static LSN parse_checkpoint_record(LSN lsn);
     124                 : static void new_transaction(uint16 sid, TrID long_id, LSN undo_lsn,
     125                 :                             LSN first_undo_lsn);
     126                 : static int new_table(uint16 sid, const char *name, LSN lsn_of_file_id);
     127                 : static int new_page(uint32 fileid, pgcache_page_no_t pageid, LSN rec_lsn,
     128                 :                     struct st_dirty_page *dirty_page);
     129                 : static int close_all_tables(void);
     130                 : static my_bool close_one_table(const char *name, TRANSLOG_ADDRESS addr);
     131                 : static void print_redo_phase_progress(TRANSLOG_ADDRESS addr);
     132                 : static void delete_all_transactions(); /* NOTE(review): empty parentheses leave the parameters unspecified in C; (void) would make this a real prototype */
     133                 : 
     134                 : /** @brief global [out] buffer for translog_read_record(); only grows while in use, released at the end of maria_apply_log() */
     135                 : static struct
     136                 : {
     137                 :   /*
     138                 :     uchar* suits the users of this buffer better (fewer casts) than char*,
     139                 :     which is why LEX_STRING is not used here.
     140                 :   */
     141                 :   uchar *str; /* start of the buffer, NULL when nothing is allocated */
     142                 :   size_t length; /* size recorded for str, in bytes */
     143                 : } log_record_buffer;
     144                 : static void enlarge_buffer(const TRANSLOG_HEADER_BUFFER *rec) /* makes log_record_buffer at least rec->record_length bytes long */
     145         2781719 : {
     146         2781719 :   if (log_record_buffer.length < rec->record_length) /* only ever grow; a big enough buffer is left untouched */
     147                 :   {
     148            2548 :     log_record_buffer.length= rec->record_length; /* NOTE(review): size is recorded before the reallocation is known to have succeeded */
     149            2548 :     log_record_buffer.str= my_realloc(log_record_buffer.str,
     150                 :                                       rec->record_length,
     151                 :                                       MYF(MY_WME | MY_ALLOW_ZERO_PTR)); /* NOTE(review): result overwrites str directly; if my_realloc() returns NULL on failure the old block becomes unreachable and length is stale -- confirm callers test str */
     152                 :   }
     153                 : }
     154                 : /** @brief Tells what kind of progress message was printed to the error log */
     155                 : static enum recovery_message_type
     156                 : {
     157                 :   REC_MSG_NONE= 0, REC_MSG_REDO, REC_MSG_UNDO, REC_MSG_FLUSH /* NONE: nothing printed; the others name the phase whose message was printed */
     158                 : } recovery_message_printed; /* reset to REC_MSG_NONE by maria_apply_log(), which also tests it after each phase to append the phase duration */
     159                 : 
     160                 : 
     161                 : /* Error hook installed while recovery runs: ends a pending progress line on stderr, then forwards the error to the previously installed hook */
     162                 : 
     163                 : int maria_recover_error_handler_hook(uint error, const char *str,
     164                 :                                      myf flags)
     165               7 : {
     166               7 :   if (procent_printed) /* progress output was written to stderr and its line is still open */
     167                 :   {
     168               0 :     procent_printed= 0;
     169               0 :     fputc('\n', stderr); /* so that the error message starts on a line of its own */
     170               0 :     fflush(stderr);
     171                 :   }
     172               7 :   return (*save_error_handler_hook)(error, str, flags); /* arguments are passed through unchanged; the saved hook's result is returned */
     173                 : }
     174                 : 
     175                 : /* Expands to nothing by default; give it a body if you want gdb to break in some interesting situations */
     176                 : #define ALERT_USER()
     177                 : 
     178                 : static void print_preamble() /* announces the start of recovery */
     179               7 : {
     180               7 :   ma_message_no_user(ME_JUST_INFO, "starting recovery"); /* informational message, not an error */
     181                 : }
     182                 : 
     183                 : 
     184                 : /**
     185                 :    @brief Recovers from the last checkpoint.
     186                 : 
     187                 :    Runs the REDO phase using special structures, then sets up the playground
     188                 :    of runtime: recreates transactions inside trnman, open tables with their
     189                 :    two-byte-id mapping; runs the UNDO phase, closes all tables and finally
     190                 :    takes a checkpoint if recovery did something.
     191                 : 
     192                 :    @return Operation status
     193                 :      @retval 0      OK
     194                 :      @retval !=0    Error
     195                 : */
     196                 : 
     197                 : int maria_recovery_from_log(void)
     198               3 : {
     199               3 :   int res= 1;
     200                 :   FILE *trace_file;
     201                 :   uint warnings_count;
     202               3 :   DBUG_ENTER("maria_recovery_from_log");
     203                 : 
     204               3 :   DBUG_ASSERT(!maria_in_recovery); /* recovery must not be entered twice */
     205               3 :   maria_in_recovery= TRUE;
     206                 : 
     207                 : #ifdef EXTRA_DEBUG
     208               3 :   trace_file= fopen("maria_recovery.trace", "a+"); /* appended to, so earlier traces are kept; may be NULL if the open fails */
     209                 : #else
     210                 :   trace_file= NULL; /* no trace file for being fast */
     211                 : #endif
     212               3 :   tprint(trace_file, "TRACE of the last MARIA recovery from mysqld\n"); /* tprint() is also called with a NULL file in non-EXTRA_DEBUG builds */
     213               3 :   DBUG_ASSERT(maria_pagecache->inited);
     214               3 :   res= maria_apply_log(LSN_IMPOSSIBLE, MARIA_LOG_APPLY, trace_file,
     215                 :                        TRUE, TRUE, TRUE, &warnings_count); /* start from the last checkpoint; the three TRUEs are: run UNDO phase, skip DDLs, take checkpoints */
     216               3 :   if (!res)
     217                 :   {
     218               3 :     if (warnings_count == 0)
     219               3 :       tprint(trace_file, "SUCCESS\n");
     220                 :     else
     221               0 :       tprint(trace_file, "DOUBTFUL (%u warnings, check previous output)\n",
     222                 :              warnings_count);
     223                 :   }
     224               3 :   if (trace_file)
     225               3 :     fclose(trace_file);
     226               3 :   maria_in_recovery= FALSE; /* cleared on success and on failure alike */
     227               3 :   DBUG_RETURN(res);
     228                 : }
     229                 : 
     230                 : 
     231                 : /**
     232                 :    @brief Displays and/or applies the log
     233                 : 
     234                 :    @param  from_lsn        LSN from which log reading/applying should start;
     235                 :                            LSN_IMPOSSIBLE means "use last checkpoint" (or the first LSN in the log if there is none)
     236                 :    @param  apply           how log records should be applied or not
     237                 :    @param  trace_file      trace file where progress/debug messages will go
     238                 :    @param  should_run_undo_phase If the UNDO phase should run after the REDO phase; requires apply == MARIA_LOG_APPLY
     239                 :    @param  skip_DDLs_arg   Should DDL records (CREATE/RENAME/DROP/REPAIR) be skipped by the REDO phase or not
     240                 :    @param  take_checkpoints Should we take checkpoints or not; requires should_run_undo_phase
     241                 :    @param[out] warnings_count Count of warnings will be put there
     242                 : 
     243                 :    @todo This trace_file thing is primitive; soon we will make it similar to
     244                 :    ma_check_print_warning() etc, and a successful recovery does not need to
     245                 :    create a trace file. But for debugging now it is useful.
     246                 : 
     247                 :    @return Operation status
     248                 :      @retval 0      OK
     249                 :      @retval !=0    Error
     250                 : */
     251                 : 
     252                 : int maria_apply_log(LSN from_lsn, enum maria_apply_log_way apply,
     253                 :                     FILE *trace_file,
     254                 :                     my_bool should_run_undo_phase, my_bool skip_DDLs_arg,
     255                 :                     my_bool take_checkpoints, uint *warnings_count)
     256             310 : {
     257             310 :   int error= 0;
     258                 :   uint uncommitted_trans;
     259                 :   ulonglong old_now;
     260             310 :   DBUG_ENTER("maria_apply_log");
     261                 : 
     262             310 :   DBUG_ASSERT(apply == MARIA_LOG_APPLY || !should_run_undo_phase); /* an UNDO phase is only allowed when records are really applied */
     263             310 :   DBUG_ASSERT(!maria_multi_threaded); /* the module's static state is not protected by any lock */
     264             310 :   recovery_warnings= 0;
     265                 :   /* checkpoints can happen only if TRNs have been built */
     266             310 :   DBUG_ASSERT(should_run_undo_phase || !take_checkpoints);
     267             310 :   all_active_trans= (struct st_trn_for_recovery *)
     268                 :     my_malloc((SHORT_TRID_MAX + 1) * sizeof(struct st_trn_for_recovery),
     269                 :               MYF(MY_ZEROFILL)); /* one zero-filled slot per possible short transaction id */
     270             310 :   all_tables= (struct st_table_for_recovery *)
     271                 :     my_malloc((SHARE_ID_MAX + 1) * sizeof(struct st_table_for_recovery),
     272                 :               MYF(MY_ZEROFILL)); /* one zero-filled slot per possible table id */
     273                 : 
     274             310 :   save_error_handler_hook= error_handler_hook; /* remembered so that end: can put it back */
     275             310 :   error_handler_hook= maria_recover_error_handler_hook;
     276                 : 
     277             310 :   if (!all_active_trans || !all_tables) /* NOTE(review): this goto err and the next one run before tracef and recovery_message_printed are set below, so err:/end: then use values left by a previous call */
     278                 :     goto err;
     279                 : 
     280             310 :   if (take_checkpoints && ma_checkpoint_init(0))
     281             310 :     goto err;
     282                 : 
     283             310 :   recovery_message_printed= REC_MSG_NONE;
     284             310 :   checkpoint_useful= trns_created= FALSE;
     285             310 :   tracef= trace_file;
     286                 : #ifdef INSTANT_FLUSH_OF_MESSAGES
     287                 :   /* enable this for instant flush of messages to trace file */
     288                 :   setbuf(tracef, NULL);
     289                 : #endif
     290             310 :   skip_DDLs= skip_DDLs_arg;
     291             310 :   skipped_undo_phase= 0;
     292                 : 
     293             310 :   if (from_lsn == LSN_IMPOSSIBLE) /* caller did not give a start point: work one out */
     294                 :   {
     295               3 :     if (last_checkpoint_lsn == LSN_IMPOSSIBLE) /* no checkpoint known: start at the first LSN of the log */
     296                 :     {
     297               3 :       from_lsn= translog_first_lsn_in_log();
     298               3 :       if (unlikely(from_lsn == LSN_ERROR))
     299                 :         goto err;
     300                 :     }
     301                 :     else
     302                 :     {
     303               0 :       from_lsn= parse_checkpoint_record(last_checkpoint_lsn); /* start point comes from the checkpoint record */
     304               0 :       if (from_lsn == LSN_ERROR)
     305             310 :         goto err;
     306                 :     }
     307                 :   }
     308                 : 
     309             310 :   now= my_getsystime(); /* start of the REDO phase, for the duration printed below */
     310             310 :   in_redo_phase= TRUE;
     311             310 :   if (run_redo_phase(from_lsn, apply))
     312                 :   {
     313               0 :     ma_message_no_user(0, "Redo phase failed");
     314               0 :     goto err;
     315                 :   }
     316                 : 
     317             310 :   if ((uncommitted_trans=
     318                 :        end_of_redo_phase(should_run_undo_phase)) == (uint)-1) /* (uint)-1 is treated as the failure value */
     319                 :   {
     320               0 :     ma_message_no_user(0, "End of redo phase failed");
     321               0 :     goto err;
     322                 :   }
     323             310 :   in_redo_phase= FALSE;
     324                 : 
     325             310 :   old_now= now;
     326             310 :   now= my_getsystime();
     327             310 :   if (recovery_message_printed == REC_MSG_REDO) /* a REDO progress message was printed: append how long the phase took */
     328                 :   {
     329               7 :     double phase_took= (now - old_now)/10000000.0; /* presumably my_getsystime() counts 100 ns units, hence this divisor -- TODO confirm */
     330                 :     /*
     331                 :       Detailed progress info goes to stderr, because ma_message_no_user()
     332                 :       cannot put several messages on one line.
     333                 :     */
     334               7 :     procent_printed= 1;
     335               7 :     fprintf(stderr, " (%.1f seconds); ", phase_took);
     336               7 :     fflush(stderr);
     337                 :   }
     338                 : 
     339                 :   /**
     340                 :      REDO phase does not fill blocks' rec_lsn, so a checkpoint now would be
     341                 :      wrong: if a future recovery used it, the REDO phase would always
     342                 :      start from the checkpoint and never from before, wrongly skipping REDOs
     343                 :      (tested). Another problem is that the REDO phase uses
     344                 :      PAGECACHE_PLAIN_PAGE, while Checkpoint only collects PAGECACHE_LSN_PAGE.
     345                 : 
     346                 :      @todo fix this. pagecache_write() now can have a rec_lsn argument. And we
     347                 :      could make a function which goes through pages at end of REDO phase and
     348                 :      changes their type.
     349                 :   */
     350                 : #ifdef FIX_AND_ENABLE_LATER
     351                 :   if (take_checkpoints && checkpoint_useful)
     352                 :   {
     353                 :     /*
     354                 :       We take a checkpoint as it can save future recovery work if we crash
     355                 :       during the UNDO phase. But we don't flush pages, as UNDOs will change
     356                 :       them again probably.
     357                 :       If we wanted to take checkpoints in the middle of the REDO phase, at a
     358                 :       moment when we haven't reached the end of log so don't have exact data
     359                 :       about transactions, we could write a special checkpoint: containing only
     360                 :       the list of dirty pages, otherwise to be treated as if it was at the
     361                 :       same LSN as the last checkpoint.
     362                 :     */
     363                 :     if (ma_checkpoint_execute(CHECKPOINT_INDIRECT, FALSE))
     364                 :       goto err;
     365                 :   }
     366                 : #endif
     367                 : 
     368             310 :   if (should_run_undo_phase)
     369                 :   {
     370             310 :     if (run_undo_phase(uncommitted_trans))
     371                 :     {
     372               0 :       ma_message_no_user(0, "Undo phase failed");
     373               0 :       goto err;
     374                 :     }
     375                 :   }
     376               0 :   else if (uncommitted_trans > 0) /* no UNDO phase requested: uncommitted work stays in the tables, so warn */
     377                 :   {
     378               0 :     eprint(tracef, "***WARNING: %u uncommitted transactions; some tables may"
     379                 :            " be left inconsistent!***", uncommitted_trans);
     380               0 :     recovery_warnings++;
     381                 :   }
     382                 : 
     383             310 :   if (skipped_undo_phase)
     384                 :   {
     385                 :     /*
     386                 :       We could want to print a list of tables for which UNDOs were skipped,
     387                 :       but not one line per skipped UNDO.
     388                 :     */
     389               0 :     eprint(tracef, "***WARNING: %lu UNDO records skipped in UNDO phase; some"
     390                 :            " tables may be left inconsistent!***", skipped_undo_phase);
     391               0 :     recovery_warnings++;
     392                 :   }
     393                 : 
     394             310 :   old_now= now;
     395             310 :   now= my_getsystime();
     396             310 :   if (recovery_message_printed == REC_MSG_UNDO) /* same duration report, for the UNDO phase */
     397                 :   {
     398               3 :     double phase_took= (now - old_now)/10000000.0;
     399               3 :     procent_printed= 1;
     400               3 :     fprintf(stderr, " (%.1f seconds); ", phase_took);
     401               3 :     fflush(stderr);
     402                 :   }
     403                 : 
     404                 :   /*
     405                 :     we don't use maria_panic() because it would maria_end(), and Recovery does
     406                 :     not want that (we want to keep some modules initialized for runtime).
     407                 :   */
     408             310 :   if (close_all_tables())
     409                 :   {
     410               0 :     ma_message_no_user(0, "closing of tables failed");
     411               0 :     goto err;
     412                 :   }
     413                 : 
     414             310 :   old_now= now;
     415             310 :   now= my_getsystime();
     416             310 :   if (recovery_message_printed == REC_MSG_FLUSH) /* same duration report, for the closing/flushing of tables */
     417                 :   {
     418               7 :     double phase_took= (now - old_now)/10000000.0;
     419               7 :     procent_printed= 1;
     420               7 :     fprintf(stderr, " (%.1f seconds); ", phase_took);
     421               7 :     fflush(stderr);
     422                 :   }
     423                 : 
     424             310 :   if (take_checkpoints && checkpoint_useful)
     425                 :   {
     426                 :     /* No dirty pages, all tables are closed, no active transactions, save: */
     427               0 :     if (ma_checkpoint_execute(CHECKPOINT_FULL, FALSE))
     428                 :       goto err;
     429                 :   }
     430                 : 
     431                 :   goto end; /* success: skip the error-only statements */
     432               0 : err:
     433               0 :   error= 1;
     434               0 :   tprint(tracef, "\nRecovery of tables with transaction logs FAILED\n");
     435               0 :   if (trns_created)
     436               0 :     delete_all_transactions();
     437             310 : end: /* common cleanup, executed on success and on error */
     438             310 :   error_handler_hook= save_error_handler_hook;
     439             310 :   hash_free(&all_dirty_pages);
     440             310 :   bzero(&all_dirty_pages, sizeof(all_dirty_pages));
     441             310 :   my_free(dirty_pages_pool, MYF(MY_ALLOW_ZERO_PTR));
     442             310 :   dirty_pages_pool= NULL;
     443             310 :   my_free(all_tables, MYF(MY_ALLOW_ZERO_PTR));
     444             310 :   all_tables= NULL;
     445             310 :   my_free(all_active_trans, MYF(MY_ALLOW_ZERO_PTR));
     446             310 :   all_active_trans= NULL;
     447             310 :   my_free(log_record_buffer.str, MYF(MY_ALLOW_ZERO_PTR));
     448             310 :   log_record_buffer.str= NULL;
     449             310 :   log_record_buffer.length= 0;
     450             310 :   ma_checkpoint_end(); /* called whether or not ma_checkpoint_init() ran above */
     451             310 :   *warnings_count= recovery_warnings; /* written on the error path too */
     452             310 :   if (recovery_message_printed != REC_MSG_NONE)
     453                 :   {
     454               7 :     if (procent_printed) /* close the progress line still open on stderr */
     455                 :     {
     456               7 :       procent_printed= 0;
     457               7 :       fprintf(stderr, "\n");
     458               7 :       fflush(stderr);
     459                 :     }
     460               7 :     if (!error)
     461               7 :       ma_message_no_user(ME_JUST_INFO, "recovery done");
     462                 :   }
     463             310 :   if (error)
     464               0 :     my_message(HA_ERR_INITIALIZATION,
     465                 :                "Maria recovery failed. Please run maria_chk -r on all maria "
     466                 :                "tables and delete all maria_log.######## files", MYF(0));
     467             310 :   procent_printed= 0;
     468                 :   /*
     469                 :     We don't cleanly close tables if we hit some error (may corrupt them by
     470                 :     flushing some wrong blocks made from wrong REDOs). It also leaves their
     471                 :     open_count>0, which ensures that --maria-recover, if used, will try to
     472                 :     repair them.
     473                 :   */
     474             310 :   DBUG_RETURN(error);
     475                 : }
     476                 : 
     477                 : 
     478                 : /* Prints very basic info about the record's header to the trace file */
     479                 : static void display_record_position(const LOG_DESC *log_desc,
     480                 :                                     const TRANSLOG_HEADER_BUFFER *rec,
     481                 :                                     uint number)
     482         7821432 : {
     483                 :   /*
     484                 :     if number==0, we're going over records which we had already seen and which
     485                 :     form a group, so we indent below the group's end record
     486                 :   */
     487         7821432 :   tprint(tracef,
     488                 :          "%sRec#%u LSN (%lu,0x%lx) short_trid %u %s(num_type:%u) len %lu\n",
     489                 :          number ? "" : "   ", number, LSN_IN_PARTS(rec->lsn),
     490                 :          rec->short_trid, log_desc->name, rec->type,
     491                 :          (ulong)rec->record_length);
     492         7821432 :   if (rec->type == LOGREC_DEBUG_INFO) /* for this record type the REDO hook is run here, while displaying */
     493                 :   {
     494                 :     /* Print some extra information */
     495               0 :     (*log_desc->record_execute_in_redo_phase)(rec); /* the hook's return value is ignored */
     496                 :   }
     497                 : }
     498                 : 
     499                 : 
     500                 : static int display_and_apply_record(const LOG_DESC *log_desc, /* runs the record type's REDO hook on rec; returns the hook's result, or 1 if the type has no hook */
     501                 :                                     const TRANSLOG_HEADER_BUFFER *rec)
     502         3741798 : {
     503                 :   int error;
     504         3741798 :   if (log_desc->record_execute_in_redo_phase == NULL)
     505                 :   {
     506                 :     /* die on all not-yet-handled records :) */
     507               0 :     DBUG_ASSERT("one more hook" == "to write");
     508               0 :     return 1; /* reached when the assertion above is not fatal */
     509                 :   }
     510         3741798 :   if ((error= (*log_desc->record_execute_in_redo_phase)(rec)))
     511               0 :     eprint(tracef, "Got error %d when executing record %s",
     512                 :            my_errno, log_desc->name); /* NOTE(review): reports my_errno, not the hook's return value held in error */
     513         3741798 :   return error;
     514                 : }
     515                 : 
     516                 : 
     517                 : prototype_redo_exec_hook(LONG_TRANSACTION_ID) /* REDO hook: registers the transaction announced by rec under its short id; returns 0 on success, 1 on conflict */
     518            1006 : {
     519            1006 :   uint16 sid= rec->short_trid;
     520            1006 :   TrID long_trid= all_active_trans[sid].long_trid; /* long id currently registered under this short id, 0 if none */
     521                 :   /*
     522                 :     Any incomplete group should be of an old crash which already had a
     523                 :     recovery and thus has logged INCOMPLETE_GROUP which we must have seen.
     524                 :   */
     525            1006 :   DBUG_ASSERT(all_active_trans[sid].group_start_lsn == LSN_IMPOSSIBLE);
     526            1006 :   if (long_trid != 0) /* the short id is already taken by a known transaction */
     527                 :   {
     528               0 :     LSN ulsn= all_active_trans[sid].undo_lsn;
     529                 :     /*
     530                 :       If the first record of that transaction is after 'rec', it's probably
     531                 :       because that transaction was found in the checkpoint record, and then
     532                 :       it's ok, we can forget about that transaction (we'll meet it later
     533                 :       again in the REDO phase) and replace it with the one in 'rec'.
     534                 :     */
     535               0 :     if ((ulsn != LSN_IMPOSSIBLE) &&
     536                 :         (cmp_translog_addr(ulsn, rec->lsn) < 0)) /* the old transaction has an UNDO located before this record: refuse to replace it */
     537                 :     {
     538                 :       char llbuf[22];
     539               0 :       llstr(long_trid, llbuf);
     540               0 :       eprint(tracef, "Found an old transaction long_trid %s short_trid %u"
     541                 :              " with same short id as this new transaction, and has neither"
     542                 :              " committed nor rollback (undo_lsn: (%lu,0x%lx))",
     543                 :              llbuf, sid, LSN_IN_PARTS(ulsn));
     544               0 :       goto err;
     545                 :     }
     546                 :   }
     547            1006 :   long_trid= uint6korr(rec->header); /* the new long id is read from the start of the record header -- presumably 6 bytes, going by the macro name */
     548            1006 :   new_transaction(sid, long_trid, LSN_IMPOSSIBLE, LSN_IMPOSSIBLE); /* no UNDO known yet for the new transaction */
     549            1006 :   goto end;
     550               0 : err:
     551                 :   ALERT_USER();
     552               0 :   return 1;
     553            1006 : end:
     554            1006 :   return 0;
     555                 : }
     556                 : 
     557                 : 
     558                 : static void new_transaction(uint16 sid, TrID long_id, LSN undo_lsn,
     559                 :                             LSN first_undo_lsn)
     560            1006 : {
                             /*
                               Fills slot 'sid' of all_active_trans[] with the given long id and
                               undo LSNs, traces the start of the transaction, and raises
                               max_long_trid to 'long_id' if that is bigger.
                             */
     561                 :   char llbuf[22];
     562            1006 :   all_active_trans[sid].long_trid= long_id;
     563            1006 :   llstr(long_id, llbuf);
     564            1006 :   tprint(tracef, "Transaction long_trid %s short_trid %u starts,"
     565                 :          " undo_lsn (%lu,0x%lx) first_undo_lsn (%lu,0x%lx)\n",
     566                 :          llbuf, sid, LSN_IN_PARTS(undo_lsn), LSN_IN_PARTS(first_undo_lsn));
     567            1006 :   all_active_trans[sid].undo_lsn= undo_lsn;
     568            1006 :   all_active_trans[sid].first_undo_lsn= first_undo_lsn;
     569            1006 :   set_if_bigger(max_long_trid, long_id);
     570                 : }
     571                 : 
     572                 : 
     573                 : prototype_redo_exec_hook_dummy(CHECKPOINT)
     574             144 : {
                             /* No-op hook: checkpoint records met in the log always succeed (0). */
     575                 :   /* the only checkpoint we care about was found via control file, ignore */
     576             144 :   return 0;
     577                 : }
     578                 : 
     579                 : 
     580                 : prototype_redo_exec_hook_dummy(INCOMPLETE_GROUP)
     581               0 : {
                             /* No-op hook: nothing is replayed for this record, always returns 0. */
     582                 :   /* abortion was already made */
     583               0 :   return 0;
     584                 : }
     585                 : 
     586                 : 
     587                 : prototype_redo_exec_hook(INCOMPLETE_LOG)
     588               0 : {
                             /*
                               Warns that the log lacks records for some inserted data. Always
                               returns 0: nothing is done when DDLs are skipped or when no table
                               is obtained for the record; otherwise a warning is traced and
                               recovery_warnings is incremented.
                             */
     589                 :   MARIA_HA *info;
     590               0 :   if (skip_DDLs)
     591                 :   {
     592               0 :     tprint(tracef, "we skip DDLs\n");
     593               0 :     return 0;
     594                 :   }
     595               0 :   if ((info= get_MARIA_HA_from_REDO_record(rec)) == NULL)
     596                 :   {
     597                 :     /* no such table, don't need to warn */
     598               0 :     return 0;
     599                 :   }
     600                 :   /*
     601                 :     Example of what can go wrong when replaying DDLs:
     602                 :     CREATE TABLE t (logged); INSERT INTO t VALUES(1) (logged);
     603                 :     ALTER TABLE t ... which does
     604                 :     CREATE a temporary table #sql... (logged)
     605                 :     INSERT data from t into #sql... (not logged)
     606                 :     RENAME #sql TO t (logged)
     607                 :     Removing tables by hand and replaying the log will leave in the
     608                 :     end an empty table "t": missing records. If after the RENAME an INSERT
     609                 :     into t was done, that row had number 1 in its page, executing the
     610                 :     REDO_INSERT_ROW_HEAD on the recreated empty t will fail (assertion
     611                 :     failure in _ma_apply_redo_insert_row_head_or_tail(): new data page is
     612                 :     created whereas rownr is not 0).
     613                 :     So when the server disables logging for ALTER TABLE or CREATE SELECT, it
     614                 :     logs LOGREC_INCOMPLETE_LOG to warn maria_read_log and then the user.
     615                 : 
     616                 :     Another issue is that replaying of DDLs is not correct enough to work if
     617                 :     there was a crash during a DDL (see comment in execution of
     618                 :     REDO_RENAME_TABLE ).
     619                 :   */
     620               0 :   tprint(tracef, "***WARNING: MySQL server currently logs no records"
     621                 :          " about insertion of data by ALTER TABLE and CREATE SELECT,"
     622                 :          " as they are not necessary for recovery;"
     623                 :          " present applying of log records may well not work.***\n");
                             /* counted as a warning only; the hook itself still reports success */
     624               0 :   recovery_warnings++;
     625               0 :   return 0;
     626                 : }
     627                 : 
     628                 : 
     629                 : prototype_redo_exec_hook(REDO_CREATE_TABLE)
     630             307 : {
                             /*
                               Replays a table creation from the log record 'rec'.

                               Record body as parsed below: table name (NUL-terminated), one flag
                               byte (non-zero means only the index file is touched), 2 bytes
                               kfile_size_before_extension, 2 bytes keystart, the index file header
                               image, then the data file name and index file name (both
                               NUL-terminated; non-empty ones are rejected).

                               Returns 0 when the table was recreated or the record was
                               deliberately ignored, non-zero on failure.
                             */
     631             307 :   File dfile= -1, kfile= -1;
     632                 :   char *linkname_ptr, filename[FN_REFLEN], *name, *ptr, *ptr2,
     633                 :     *data_file_name, *index_file_name;
     634                 :   uchar *kfile_header;
     635                 :   myf create_flag;
     636                 :   uint flags;
     637             307 :   int error= 1, create_mode= O_RDWR | O_TRUNC, i;
     638             307 :   MARIA_HA *info= NULL;
     639                 :   uint kfile_size_before_extension, keystart;
     640                 : 
     641             307 :   if (skip_DDLs)
     642                 :   {
     643               0 :     tprint(tracef, "we skip DDLs\n");
     644               0 :     return 0;
     645                 :   }
     646             307 :   enlarge_buffer(rec);
     647             307 :   if (log_record_buffer.str == NULL ||
     648                 :       translog_read_record(rec->lsn, 0, rec->record_length,
     649                 :                            log_record_buffer.str, NULL) !=
     650                 :       rec->record_length)
     651                 :   {
     652               0 :     eprint(tracef, "Failed to read record");
     653               0 :     goto end;
     654                 :   }
     655             307 :   name= (char *)log_record_buffer.str;
     656                 :   /*
     657                 :     TRUNCATE TABLE and REPAIR USE_FRM call maria_create(), so below we can
     658                 :     find a REDO_CREATE_TABLE for a table which we have open, that's why we
     659                 :     need to look for any open instances and close them first.
     660                 :   */
     661             307 :   if (close_one_table(name, rec->lsn))
     662                 :   {
     663               0 :     eprint(tracef, "Table '%s' got error %d on close", name, my_errno);
     664                 :     ALERT_USER();
     665               0 :     goto end;
     666                 :   }
     667                 :   /* we try hard to get create_rename_lsn, to avoid mistakes if possible */
     668             307 :   info= maria_open(name, O_RDONLY, HA_OPEN_FOR_REPAIR);
     669             307 :   if (info)
     670                 :   {
     671             202 :     MARIA_SHARE *share= info->s;
     672                 :     /* check that we're not already using it */
     673             202 :     if (share->reopen != 1)
     674                 :     {
     675               0 :       eprint(tracef, "Table '%s' is already open (reopen=%u)",
     676                 :              name, share->reopen);
     677                 :       ALERT_USER();
     678               0 :       goto end;
     679                 :     }
     680             202 :     DBUG_ASSERT(share->now_transactional == share->base.born_transactional);
     681             202 :     if (!share->base.born_transactional)
     682                 :     {
     683                 :       /*
     684                 :         could be that transactional table was later dropped, and a non-trans
     685                 :         one was renamed to its name, thus create_rename_lsn is 0 and should
     686                 :         not be trusted.
     687                 :       */
     688               0 :       tprint(tracef, "Table '%s' is not transactional, ignoring creation\n",
     689                 :              name);
     690                 :       ALERT_USER();
     691               0 :       error= 0;
     692               0 :       goto end;
     693                 :     }
                               /* existing table is at least as recent as the record: keep it */
     694             202 :     if (cmp_translog_addr(share->state.create_rename_lsn, rec->lsn) >= 0)
     695                 :     {
     696             202 :       tprint(tracef, "Table '%s' has create_rename_lsn (%lu,0x%lx) more "
     697                 :              "recent than record, ignoring creation",
     698                 :              name, LSN_IN_PARTS(share->state.create_rename_lsn));
     699             202 :       error= 0;
     700             202 :       goto end;
     701                 :     }
     702               0 :     if (maria_is_crashed(info))
     703                 :     {
     704               0 :       eprint(tracef, "Table '%s' is crashed, can't recreate it", name);
     705                 :       ALERT_USER();
     706               0 :       goto end;
     707                 :     }
     708               0 :     maria_close(info);
     709               0 :     info= NULL;
     710                 :   }
     711                 :   else /* one or two files absent, or header corrupted... */
     712             105 :     tprint(tracef, "Table '%s' can't be opened, probably does not exist\n",
     713                 :            name);
     714                 :   /* if does not exist, or is older, overwrite it */
                             /* step over the table name and decode the fixed-size fields after it */
     715             105 :   ptr= name + strlen(name) + 1;
     716             105 :   if ((flags= ptr[0] ? HA_DONT_TOUCH_DATA : 0))
     717               0 :     tprint(tracef, ", we will only touch index file");
     718             105 :   ptr++;
     719             105 :   kfile_size_before_extension= uint2korr(ptr);
     720             105 :   ptr+= 2;
     721             105 :   keystart= uint2korr(ptr);
     722             105 :   ptr+= 2;
     723             105 :   kfile_header= (uchar *)ptr;
     724             105 :   ptr+= kfile_size_before_extension;
     725                 :   /* set header lsns */
                             /* three consecutive LSN slots in the header image all receive rec->lsn */
     726             105 :   ptr2= (char *) kfile_header + sizeof(info->s->state.header) +
     727                 :     MARIA_FILE_CREATE_RENAME_LSN_OFFSET;
     728             420 :   for (i= 0; i<3; i++)
     729                 :   {
     730             315 :     lsn_store(ptr2, rec->lsn);
     731             315 :     ptr2+= LSN_STORE_SIZE;
     732                 :   }
     733             105 :   data_file_name= ptr;
     734             105 :   ptr+= strlen(data_file_name) + 1;
     735             105 :   index_file_name= ptr;
     736             105 :   ptr+= strlen(index_file_name) + 1;
     737                 :   /** @todo handle symlinks */
     738             105 :   if (data_file_name[0] || index_file_name[0])
     739                 :   {
     740               0 :     eprint(tracef, "Table '%s' DATA|INDEX DIRECTORY clauses are not handled",
     741                 :            name);
     742               0 :     goto end;
     743                 :   }
                             /*
                               The conditional must be parenthesised: '|' binds tighter than '?:',
                               so without the inner parentheses MY_UNPACK_FILENAME became part of
                               the condition (always true) and was never passed as a flag, while
                               MY_RETURN_REAL_PATH was always passed.
                             */
     744             105 :   fn_format(filename, name, "", MARIA_NAME_IEXT,
     745                 :             (MY_UNPACK_FILENAME |
     746                 :              ((flags & HA_DONT_TOUCH_DATA) ? MY_RETURN_REAL_PATH : 0)) |
     747                 :             MY_APPEND_EXT);
     748             105 :   linkname_ptr= NULL;
     749             105 :   create_flag= MY_DELETE_OLD;
     750             105 :   tprint(tracef, "Table '%s' creating as '%s'\n", name, filename);
     751             105 :   if ((kfile= my_create_with_symlink(linkname_ptr, filename, 0, create_mode,
     752                 :                                      MYF(MY_WME|create_flag))) < 0)
     753                 :   {
     754               0 :     eprint(tracef, "Failed to create index file");
     755               0 :     goto end;
     756                 :   }
                             /* write the header image at offset 0, then size the file to keystart */
     757             105 :   if (my_pwrite(kfile, kfile_header,
     758                 :                 kfile_size_before_extension, 0, MYF(MY_NABP|MY_WME)) ||
     759                 :       my_chsize(kfile, keystart, 0, MYF(MY_WME)))
     760                 :   {
     761               0 :     eprint(tracef, "Failed to write to index file");
     762               0 :     goto end;
     763                 :   }
     764             105 :   if (!(flags & HA_DONT_TOUCH_DATA))
     765                 :   {
     766             105 :     fn_format(filename,name,"", MARIA_NAME_DEXT,
     767                 :               MY_UNPACK_FILENAME | MY_APPEND_EXT);
     768             105 :     linkname_ptr= NULL;
     769             105 :     create_flag=MY_DELETE_OLD;
     770             105 :     if (((dfile=
     771                 :           my_create_with_symlink(linkname_ptr, filename, 0, create_mode,
     772                 :                                  MYF(MY_WME | create_flag))) < 0) ||
     773                 :         my_close(dfile, MYF(MY_WME)))
     774                 :     {
     775               0 :       eprint(tracef, "Failed to create data file");
     776               0 :       goto end;
     777                 :     }
     778                 :     /*
     779                 :       we now have an empty data file. To be able to
     780                 :       _ma_initialize_data_file() we need some pieces of the share to be
     781                 :       correctly filled. So we just open the table (fortunately, an empty
     782                 :       data file does not preclude this).
     783                 :     */
     784             105 :     if (((info= maria_open(name, O_RDONLY, 0)) == NULL) ||
     785                 :         _ma_initialize_data_file(info->s, info->dfile.file))
     786                 :     {
     787               0 :       eprint(tracef, "Failed to open new table or write to data file");
     788               0 :       goto end;
     789                 :     }
     790                 :   }
     791             105 :   error= 0;
     792             307 : end:
                             /* common exit: release the index file handle and any table still open */
     793             307 :   if (kfile >= 0)
     794             105 :     error|= my_close(kfile, MYF(MY_WME));
     795             307 :   if (info != NULL)
     796             307 :     error|= maria_close(info);
     797             307 :   return error;
     798                 : }
     799                 : 
     800                 : 
     801                 : prototype_redo_exec_hook(REDO_RENAME_TABLE)
     802               0 : {
                             /*
                               Replays a table rename. The record body holds the old name followed
                               by the new name, both NUL-terminated. Depending on the state of the
                               two tables the hook renames old to new, only drops the old table,
                               or does nothing. Returns 0 on success or when the record is
                               ignored, non-zero on failure.
                             */
     803                 :   char *old_name, *new_name;
     804               0 :   int error= 1;
     805               0 :   MARIA_HA *info= NULL;
     806               0 :   if (skip_DDLs)
     807                 :   {
     808               0 :     tprint(tracef, "we skip DDLs\n");
     809               0 :     return 0;
     810                 :   }
     811               0 :   enlarge_buffer(rec);
     812               0 :   if (log_record_buffer.str == NULL ||
     813                 :       translog_read_record(rec->lsn, 0, rec->record_length,
     814                 :                            log_record_buffer.str, NULL) !=
     815                 :       rec->record_length)
     816                 :   {
     817               0 :     eprint(tracef, "Failed to read record");
     818               0 :     goto end;
     819                 :   }
     820               0 :   old_name= (char *)log_record_buffer.str;
     821               0 :   new_name= old_name + strlen(old_name) + 1;
     822               0 :   tprint(tracef, "Table '%s' to rename to '%s'; old-name table ", old_name,
     823                 :          new_name);
     824                 :   /*
     825                 :     Here is why we skip CREATE/DROP/RENAME when doing a recovery from
     826                 :     ha_maria (whereas we do when called from maria_read_log). Consider:
     827                 :     CREATE TABLE t;
     828                 :     RENAME TABLE t to u;
     829                 :     DROP TABLE u;
     830                 :     RENAME TABLE v to u; # crash between index rename and data rename.
     831                 :     And do a Recovery (not removing tables beforehand).
     832                 :     Recovery replays CREATE, then RENAME: the maria_open("t") works,
     833                 :     maria_open("u") does not (no data file) so table "u" is considered
     834                 :     inexistent and so maria_rename() is done which overwrites u's index file,
     835                 :     which is lost. Ok, the data file (v.MAD) is still available, but only a
     836                 :     REPAIR USE_FRM can rebuild the index, which is unsafe and downtime.
     837                 :     So it is preferable to not execute RENAME, and leave the "mess" of files,
     838                 :     rather than possibly destroy a file. DBA will manually rename files.
     839                 :     A safe recovery method would probably require checking the existence of
     840                 :     the index file and of the data file separately (not via maria_open()), and
     841                 :     maybe also to store a create_rename_lsn in the data file too
     842                 :     For now, all we risk is to leave the mess (half-renamed files) left by the
     843                 :     crash. We however sync files and directories at each file rename. The SQL
     844                 :     layer is anyway not crash-safe for DDLs (except the repartitioning-related
     845                 :     ones).
     846                 :     We replay DDLs in maria_read_log to be able to recreate tables from
     847                 :     scratch. It means that "maria_read_log -a" should not be used on a
     848                 :     database which just crashed during a DDL. And also ALTER TABLE does not
     849                 :     log insertions of records into the temporary table, so replaying may
     850                 :     fail (grep for INCOMPLETE_LOG in files).
     851                 :   */
                             /* step 1: examine the old-name table */
     852               0 :   info= maria_open(old_name, O_RDONLY, HA_OPEN_FOR_REPAIR);
     853               0 :   if (info)
     854                 :   {
     855               0 :     MARIA_SHARE *share= info->s;
     856               0 :     if (!share->base.born_transactional)
     857                 :     {
     858               0 :       tprint(tracef, ", is not transactional, ignoring renaming\n");
     859                 :       ALERT_USER();
     860               0 :       error= 0;
     861               0 :       goto end;
     862                 :     }
     863               0 :     if (cmp_translog_addr(share->state.create_rename_lsn, rec->lsn) >= 0)
     864                 :     {
     865               0 :       tprint(tracef, ", has create_rename_lsn (%lu,0x%lx) more recent than"
     866                 :              " record, ignoring renaming",
     867                 :              LSN_IN_PARTS(share->state.create_rename_lsn));
     868               0 :       error= 0;
     869               0 :       goto end;
     870                 :     }
     871               0 :     if (maria_is_crashed(info))
     872                 :     {
     873               0 :       tprint(tracef, ", is crashed, can't rename it");
     874                 :       ALERT_USER();
     875               0 :       goto end;
     876                 :     }
     877               0 :     if (close_one_table(info->s->open_file_name.str, rec->lsn) ||
     878                 :         maria_close(info))
     879                 :       goto end;
     880               0 :     info= NULL;
     881               0 :     tprint(tracef, ", is ok for renaming; new-name table ");
     882                 :   }
     883                 :   else /* one or two files absent, or header corrupted... */
     884                 :   {
     885               0 :     tprint(tracef, ", can't be opened, probably does not exist");
     886               0 :     error= 0;
     887               0 :     goto end;
     888                 :   }
     889                 :   /*
     890                 :     We must also check the create_rename_lsn of the 'new_name' table if it
     891                 :     exists: otherwise we may, with our rename which overwrites, destroy
     892                 :     another table. For example:
     893                 :     CREATE TABLE t;
     894                 :     RENAME t to u;
     895                 :     DROP TABLE u;
     896                 :     RENAME v to u; # v is an old table, its creation/insertions not in log
     897                 :     And start executing the log (without removing tables beforehand): creates
     898                 :     t, renames it to u (if not testing create_rename_lsn) thus overwriting
     899                 :     old-named v, drops u, and we are stuck, we have lost data.
     900                 :   */
                             /* step 2: examine the new-name table */
     901               0 :   info= maria_open(new_name, O_RDONLY, HA_OPEN_FOR_REPAIR);
     902               0 :   if (info)
     903                 :   {
     904               0 :     MARIA_SHARE *share= info->s;
     905                 :     /* We should not have open instances on this table. */
     906               0 :     if (share->reopen != 1)
     907                 :     {
     908               0 :       tprint(tracef, ", is already open (reopen=%u)\n", share->reopen);
     909                 :       ALERT_USER();
     910               0 :       goto end;
     911                 :     }
     912               0 :     if (!share->base.born_transactional)
     913                 :     {
     914               0 :       tprint(tracef, ", is not transactional, ignoring renaming\n");
     915                 :       ALERT_USER();
                                 /* the rename is skipped, but the old-name table is still dropped */
     916               0 :       goto drop;
     917                 :     }
     918               0 :     if (cmp_translog_addr(share->state.create_rename_lsn, rec->lsn) >= 0)
     919                 :     {
     920               0 :       tprint(tracef, ", has create_rename_lsn (%lu,0x%lx) more recent than"
     921                 :              " record, ignoring renaming",
     922                 :              LSN_IN_PARTS(share->state.create_rename_lsn));
     923                 :       /*
     924                 :         We have to drop the old_name table. Consider:
     925                 :         CREATE TABLE t;
     926                 :         CREATE TABLE v;
     927                 :         RENAME TABLE t to u;
     928                 :         DROP TABLE u;
     929                 :         RENAME TABLE v to u;
     930                 :         and apply the log without removing tables beforehand. t will be
     931                 :         created, v too; in REDO_RENAME u will be more recent, but we still
     932                 :         have to drop t otherwise it stays.
     933                 :       */
     934               0 :       goto drop;
     935                 :     }
     936               0 :     if (maria_is_crashed(info))
     937                 :     {
     938               0 :       tprint(tracef, ", is crashed, can't rename it");
     939                 :       ALERT_USER();
     940               0 :       goto end;
     941                 :     }
     942               0 :     if (maria_close(info))
     943               0 :       goto end;
     944               0 :     info= NULL;
     945                 :     /* abnormal situation */
                               /* 'error' is still 1 here, so this path reports failure */
     946               0 :     tprint(tracef, ", exists but is older than record, can't rename it");
     947               0 :     goto end;
     948                 :   }
     949                 :   else /* one or two files absent, or header corrupted... */
     950               0 :     tprint(tracef, ", can't be opened, probably does not exist");
                             /* step 3: rename, then stamp the renamed table with the record's LSN */
     951               0 :   tprint(tracef, ", renaming '%s'", old_name);
     952               0 :   if (maria_rename(old_name, new_name))
     953                 :   {
     954               0 :     eprint(tracef, "Failed to rename table");
     955               0 :     goto end;
     956                 :   }
     957               0 :   info= maria_open(new_name, O_RDONLY, 0);
     958               0 :   if (info == NULL)
     959                 :   {
     960               0 :     eprint(tracef, "Failed to open renamed table");
     961               0 :     goto end;
     962                 :   }
     963               0 :   if (_ma_update_state_lsns(info->s, rec->lsn, info->s->state.create_trid,
     964                 :                             TRUE, TRUE))
     965               0 :     goto end;
     966               0 :   if (maria_close(info))
     967               0 :     goto end;
     968               0 :   info= NULL;
     969               0 :   error= 0;
     970               0 :   goto end;
     971               0 : drop:
                             /* 'info' (the new-name table) is still open here; it is closed at 'end' */
     972               0 :   tprint(tracef, ", only dropping '%s'", old_name);
     973               0 :   if (maria_delete_table(old_name))
     974                 :   {
     975               0 :     eprint(tracef, "Failed to drop table");
     976               0 :     goto end;
     977                 :   }
     978               0 :   error= 0;
     979                 :   goto end;
     980               0 : end:
                             /* terminates the trace line built up piecewise by the tprint() calls above */
     981               0 :   tprint(tracef, "\n");
     982               0 :   if (info != NULL)
     983               0 :     error|= maria_close(info);
     984               0 :   return error;
     985                 : }
     986                 : 
     987                 : 
     988                 : /*
     989                 :   The record may come from REPAIR, ALTER TABLE ENABLE KEYS, OPTIMIZE.
     990                 : */
     991                 : prototype_redo_exec_hook(REDO_REPAIR_TABLE)
     992               0 : {
                             /*
                               Re-runs a repair on the table designated by 'rec'. The repair flags
                               and the key map are read from the record header, right after the
                               file id. Returns 0 when DDLs are skipped, when no table is obtained
                               for the record, or when repair and the state LSN update succeed;
                               returns 1 otherwise. 'info' is not closed by this function.
                             */
     993               0 :   int error= 1;
     994                 :   MARIA_HA *info;
     995                 :   HA_CHECK param;
     996                 :   char *name;
     997                 :   my_bool quick_repair;
     998               0 :   DBUG_ENTER("exec_REDO_LOGREC_REDO_REPAIR_TABLE");
     999                 : 
    1000               0 :   if (skip_DDLs)
    1001                 :   {
    1002                 :     /*
    1003                 :       REPAIR is not exactly a DDL, but it manipulates files without logging
    1004                 :       insertions into them.
    1005                 :     */
    1006               0 :     tprint(tracef, "we skip DDLs\n");
    1007               0 :     DBUG_RETURN(0);
    1008                 :   }
    1009               0 :   if ((info= get_MARIA_HA_from_REDO_record(rec)) == NULL)
    1010               0 :     DBUG_RETURN(0);
    1011                 : 
    1012                 :   /*
    1013                 :     Otherwise, the mapping is newer than the table, and our record is newer
    1014                 :     than the mapping, so we can repair.
    1015                 :   */
    1016               0 :   tprint(tracef, "   repairing...\n");
    1017                 : 
    1018               0 :   maria_chk_init(&param);
    1019               0 :   param.isam_file_name= name= info->s->open_file_name.str;
                             /* repair flags are stored right after the file id in the header */
    1020               0 :   param.testflag= uint8korr(rec->header + FILEID_STORE_SIZE);
    1021               0 :   param.tmpdir= maria_tmpdir;
    1022               0 :   DBUG_ASSERT(maria_tmpdir);
    1023                 : 
                             /* the key map follows the flags, 8 bytes further into the header */
    1024               0 :   info->s->state.key_map= uint8korr(rec->header + FILEID_STORE_SIZE + 8);
    1025               0 :   quick_repair= test(param.testflag & T_QUICK);
    1026                 : 
                             /* choose the repair routine from the logged flags */
    1027               0 :   if (param.testflag & T_REP_PARALLEL)
    1028                 :   {
    1029               0 :     if (maria_repair_parallel(&param, info, name, quick_repair))
    1030                 :       goto end;
    1031                 :   }
    1032               0 :   else if (param.testflag & T_REP_BY_SORT)
    1033                 :   {
    1034               0 :     if (maria_repair_by_sort(&param, info, name, quick_repair))
    1035                 :       goto end;
    1036                 :   }
    1037               0 :   else if (maria_repair(&param, info, name, quick_repair))
    1038               0 :     goto end;
    1039                 : 
                             /* the last argument is false when T_NO_CREATE_RENAME_LSN was logged */
    1040               0 :   if (_ma_update_state_lsns(info->s, rec->lsn, trnman_get_min_safe_trid(),
    1041                 :                             TRUE, !(param.testflag & T_NO_CREATE_RENAME_LSN)))
    1042               0 :     goto end;
    1043               0 :   error= 0;
    1044                 : 
    1045               0 : end:
    1046               0 :   DBUG_RETURN(error);
    1047                 : }
    1048                 : 
    1049                 : 
    1050                 : prototype_redo_exec_hook(REDO_DROP_TABLE)
                           /*
                             Replays a REDO_DROP_TABLE record.

                             The record payload starts with the name of the table to drop. Nothing
                             is done when skip_DDLs is set. Otherwise the table is opened and is
                             deleted only if it was born transactional, its create_rename_lsn is
                             older than this record and it is not marked crashed. A table which
                             cannot be opened is assumed to be already gone.

                             Returns 0 on success (table dropped, or record deliberately ignored)
                             and non-zero on failure.
                           */
    1051               0 : {
    1052                 :   char *name;
    1053               0 :   int error= 1;
    1054                 :   MARIA_HA *info;
    1055               0 :   if (skip_DDLs)
    1056                 :   {
    1057               0 :     tprint(tracef, "we skip DDLs\n");
    1058               0 :     return 0;
    1059                 :   }
    1060               0 :   enlarge_buffer(rec);
    1061               0 :   if (log_record_buffer.str == NULL ||
    1062                 :       translog_read_record(rec->lsn, 0, rec->record_length,
    1063                 :                            log_record_buffer.str, NULL) !=
    1064                 :       rec->record_length)
    1065                 :   {
    1066               0 :     eprint(tracef, "Failed to read record");
    1067               0 :     return 1;
    1068                 :   }
                             /* the table name is the first thing stored in the record */
    1069               0 :   name= (char *)log_record_buffer.str;
    1070               0 :   tprint(tracef, "Table '%s'", name);
    1071               0 :   info= maria_open(name, O_RDONLY, HA_OPEN_FOR_REPAIR);
    1072               0 :   if (info)
    1073                 :   {
    1074               0 :     MARIA_SHARE *share= info->s;
                               int close_failed;
    1075               0 :     if (!share->base.born_transactional)
    1076                 :     {
    1077               0 :       tprint(tracef, ", is not transactional, ignoring removal\n");
    1078                 :       ALERT_USER();
    1079               0 :       error= 0;
    1080               0 :       goto end;
    1081                 :     }
    1082               0 :     if (cmp_translog_addr(share->state.create_rename_lsn, rec->lsn) >= 0)
    1083                 :     {
    1084               0 :       tprint(tracef, ", has create_rename_lsn (%lu,0x%lx) more recent than"
    1085                 :              " record, ignoring removal",
    1086                 :              LSN_IN_PARTS(share->state.create_rename_lsn));
    1087               0 :       error= 0;
    1088               0 :       goto end;
    1089                 :     }
    1090               0 :     if (maria_is_crashed(info))
    1091                 :     {
    1092               0 :       tprint(tracef, ", is crashed, can't drop it");
    1093                 :       ALERT_USER();
    1094               0 :       goto end;
    1095                 :     }
                               /*
                                 If close_one_table() fails, our own handle is still open and is
                                 closed at the 'end' label.
                               */
    1096               0 :     if (close_one_table(info->s->open_file_name.str, rec->lsn))
    1097                 :       goto end;
                               /*
                                 Close our handle exactly once. 'info' is cleared whatever
                                 maria_close() returns, so that the 'end' label does not call
                                 maria_close() a second time on the same handle.
                                 NOTE(review): this relies on maria_close() releasing the handle even
                                 when it reports an error - confirm against ma_close.c.
                               */
    1098                 :     close_failed= maria_close(info);
    1099               0 :     info= NULL;
                               if (close_failed)
                                 goto end;
    1100                 :     /* if it is older, or its header is corrupted, drop it */
    1101               0 :     tprint(tracef, ", dropping '%s'", name);
    1102               0 :     if (maria_delete_table(name))
    1103                 :     {
    1104               0 :       eprint(tracef, "Failed to drop table");
    1105               0 :       goto end;
    1106                 :     }
    1107                 :   }
    1108                 :   else /* one or two files absent, or header corrupted... */
    1109               0 :     tprint(tracef,", can't be opened, probably does not exist");
    1110               0 :   error= 0;
    1111               0 : end:
    1112               0 :   tprint(tracef, "\n");
                             /* only reached with an open handle on the skip and early-error paths */
    1113               0 :   if (info != NULL)
    1114               0 :     error|= maria_close(info);
    1115               0 :   return error;
    1116                 : }
    1117                 : 
    1118                 : 
    1119                 : prototype_redo_exec_hook(FILE_ID)
                           /*
                             Replays a LOGREC_FILE_ID record, which associates a short table id
                             with a table name.

                             Records older than checkpoint_start are ignored. Otherwise the table
                             currently registered under that id in all_tables[] (if any) is closed,
                             and the table named in the record is opened through new_table().

                             Returns 0 on success (or ignored record), 1 on failure.
                           */
    1120             383 : {
    1121                 :   uint16 sid;
    1122             383 :   int error= 1;
    1123                 :   const char *name;
    1124                 :   MARIA_HA *info;
    1125             383 :   DBUG_ENTER("exec_REDO_LOGREC_FILE_ID");
    1126                 : 
    1127             383 :   if (cmp_translog_addr(rec->lsn, checkpoint_start) < 0)
    1128                 :   {
    1129                 :     /*
    1130                 :       If that mapping was still true at checkpoint time, it was found in
    1131                 :       the checkpoint record, no need to recreate it. If that mapping had
    1132                 :       ended at checkpoint time (table was closed or repaired), a flush and
    1133                 :       force happened and so the mapping is not needed.
    1134                 :     */
    1135               0 :     tprint(tracef, "ignoring because before checkpoint\n");
    1136               0 :     DBUG_RETURN(0);
    1137                 :   }
    1138                 : 
    1139             383 :   enlarge_buffer(rec);
    1140             383 :   if (log_record_buffer.str == NULL ||
    1141                 :       translog_read_record(rec->lsn, 0, rec->record_length,
    1142                 :                            log_record_buffer.str, NULL) !=
    1143                 :        rec->record_length)
    1144                 :   {
    1145               0 :     eprint(tracef, "Failed to read record");
    1146               0 :     goto end;
    1147                 :   }
                             /* the record starts with the short id, the table name follows it */
    1148             383 :   sid= fileid_korr(log_record_buffer.str);
    1149             383 :   info= all_tables[sid].info;
    1150             383 :   if (info != NULL)
    1151                 :   {
                               /* the id is being re-used: close the table it designated so far */
    1152              76 :     tprint(tracef, "   Closing table '%s'\n", info->s->open_file_name.str);
    1153              76 :     prepare_table_for_close(info, rec->lsn);
    1154              76 :     if (maria_close(info))
    1155                 :     {
    1156               0 :       eprint(tracef, "Failed to close table");
    1157               0 :       goto end;
    1158                 :     }
    1159              76 :     all_tables[sid].info= NULL;
    1160                 :   }
    1161             383 :   name= (char *)log_record_buffer.str + FILEID_STORE_SIZE;
    1162             383 :   if (new_table(sid, name, rec->lsn))
    1163             383 :     goto end;
    1164             383 :   error= 0;
    1165             383 : end:
    1166             383 :   DBUG_RETURN(error);
    1167                 : }
    1168                 : 
    1169                 : 
    1170                 : static int new_table(uint16 sid, const char *name, LSN lsn_of_file_id)
                           /*
                             Opens table 'name' for the REDO phase and registers it in all_tables[]
                             under short id 'sid'.

                             sid             short table id found in the log
                             name            name of the table to open
                             lsn_of_file_id  LSN of the record which established the id->name
                                             mapping; stored in the share as lsn_of_file_id

                             The table is skipped (closed again, not registered) when it cannot be
                             opened, was not born transactional, has a create_rename_lsn at least as
                             recent as lsn_of_file_id, or is marked crashed.

                             Returns 0 if the table was opened or skipped, 1 on error.
                           */
    1171             383 : {
                             /* Meaning of 'error' when control reaches the 'end' label: */
    1172                 :   /*
    1173                 :     -1 (skip table): close table and return 0;
    1174                 :     1 (error): close table and return 1;
    1175                 :     0 (success): leave table open and return 0.
    1176                 :   */
    1177             383 :   int error= 1;
    1178                 :   MARIA_HA *info;
    1179                 :   MARIA_SHARE *share;
    1180                 :   my_off_t dfile_len, kfile_len;
    1181                 : 
    1182             383 :   checkpoint_useful= TRUE;
    1183             383 :   if ((name == NULL) || (name[0] == 0))
    1184                 :   {
    1185                 :     /*
    1186                 :       we didn't use DBUG_ASSERT() because such record corruption could
    1187                 :       silently pass in the "info == NULL" test below.
    1188                 :     */
    1189               0 :     tprint(tracef, ", record is corrupted");
    1190               0 :     info= NULL;
    1191               0 :     goto end;
    1192                 :   }
    1193             383 :   tprint(tracef, "Table '%s', id %u", name, sid);
    1194             383 :   info= maria_open(name, O_RDWR, HA_OPEN_FOR_REPAIR);
    1195             383 :   if (info == NULL)
    1196                 :   {
    1197               0 :     tprint(tracef, ", is absent (must have been dropped later?)"
    1198                 :            " or its header is so corrupted that we cannot open it;"
    1199                 :            " we skip it");
                               /* nothing was opened, so plain 0 is used here instead of -1 */
    1200               0 :     error= 0;
    1201               0 :     goto end;
    1202                 :   }
    1203             383 :   share= info->s;
    1204                 :   /* check that we're not already using it */
    1205             383 :   if (share->reopen != 1)
    1206                 :   {
    1207               0 :     tprint(tracef, ", is already open (reopen=%u)\n", share->reopen);
    1208                 :     /*
    1209                 :       It could be that we have in the log
    1210                 :       FILE_ID(t1,10) ... (t1 was flushed) ... FILE_ID(t1,12);
    1211                 :     */
    1212               0 :     if (close_one_table(share->open_file_name.str, lsn_of_file_id))
    1213             383 :       goto end;
    1214                 :   }
    1215             383 :   if (!share->base.born_transactional)
    1216                 :   {
    1217                 :     /*
    1218                 :       This can happen if one converts a transactional table to a
    1219                 :       not transactional table
    1220                 :     */
    1221               0 :     tprint(tracef, ", is not transactional.  Ignoring open request");
    1222               0 :     error= -1;
    1223               0 :     goto end;
    1224                 :   }
    1225             383 :   if (cmp_translog_addr(lsn_of_file_id, share->state.create_rename_lsn) <= 0)
    1226                 :   {
    1227               0 :     tprint(tracef, ", has create_rename_lsn (%lu,0x%lx) more recent than"
    1228                 :            " LOGREC_FILE_ID's LSN (%lu,0x%lx), ignoring open request",
    1229                 :            LSN_IN_PARTS(share->state.create_rename_lsn),
    1230                 :            LSN_IN_PARTS(lsn_of_file_id));
    1231               0 :     error= -1;
    1232               0 :     goto end;
    1233                 :     /*
    1234                 :       Note that we tested that before testing corruption; a recent corrupted
    1235                 :       table is not a blocker for the present log record.
    1236                 :     */
    1237                 :   }
    1238             383 :   if (maria_is_crashed(info))
    1239                 :   {
    1240               0 :     eprint(tracef, "Table '%s' is crashed, skipping it. Please repair it with"
    1241                 :            " maria_chk -r", share->open_file_name.str);
    1242               0 :     error= -1; /* not fatal, try with other tables */
    1243               0 :     goto end;
    1244                 :     /*
    1245                 :       Note that if a first recovery fails to apply a REDO, it marks the table
    1246                 :       corrupted and stops the entire recovery. A second recovery will find the
    1247                 :       table is marked corrupted and skip it (and thus possibly handle other
    1248                 :       tables).
    1249                 :     */
    1250                 :   }
    1251                 :   /* don't log any records for this work */
    1252             383 :   _ma_tmp_disable_logging_for_table(info, FALSE);
    1253                 :   /* execution of some REDO records relies on data_file_length */
    1254             383 :   dfile_len= my_seek(info->dfile.file, 0, SEEK_END, MYF(MY_WME));
    1255             383 :   kfile_len= my_seek(info->s->kfile.file, 0, SEEK_END, MYF(MY_WME));
    1256             383 :   if ((dfile_len == MY_FILEPOS_ERROR) ||
    1257                 :       (kfile_len == MY_FILEPOS_ERROR))
    1258                 :   {
    1259               0 :     tprint(tracef, ", length unknown\n");
    1260               0 :     goto end;
    1261                 :   }
                             /* trust the real file sizes over the lengths stored in the state */
    1262             383 :   if (share->state.state.data_file_length != dfile_len)
    1263                 :   {
    1264              51 :     tprint(tracef, ", has wrong state.data_file_length (fixing it)");
    1265              51 :     share->state.state.data_file_length= dfile_len;
    1266                 :   }
    1267             383 :   if (share->state.state.key_file_length != kfile_len)
    1268                 :   {
    1269              51 :     tprint(tracef, ", has wrong state.key_file_length (fixing it)");
    1270              51 :     share->state.state.key_file_length= kfile_len;
    1271                 :   }
    1272             383 :   if ((dfile_len % share->block_size) || (kfile_len % share->block_size))
    1273                 :   {
    1274               0 :     tprint(tracef, ", has too short last page\n");
    1275                 :     /* Recovery will fix this, no error */
    1276                 :     ALERT_USER();
    1277                 :   }
    1278                 :   /*
    1279                 :     This LSN serves in this situation; assume log is:
    1280                 :     FILE_ID(6->"t2") REDO_INSERT(6) FILE_ID(6->"t1") CHECKPOINT(6->"t1")
    1281                 :     then crash, checkpoint record is parsed and opens "t1" with id 6; assume
    1282                 :     REDO phase starts from the REDO_INSERT above: it will wrongly try to
    1283                 :     update a page of "t1". With this LSN below, REDO_INSERT can realize the
    1284                 :     mapping is newer than itself, and not execute.
    1285                 :     Same example is possible with UNDO_INSERT (update of the state).
    1286                 :   */
    1287             383 :   info->s->lsn_of_file_id= lsn_of_file_id;
    1288             383 :   all_tables[sid].info= info;
    1289                 :   /*
    1290                 :     We don't set info->s->id, it would be useless (no logging in REDO phase);
    1291                 :     if you change that, know that some records in REDO phase call
    1292                 :     _ma_update_state_lsns() which resets info->s->id.
    1293                 :   */
    1294             383 :   tprint(tracef, ", opened");
    1295             383 :   error= 0;
    1296             383 : end:
    1297             383 :   tprint(tracef, "\n");
    1298             383 :   if (error)
    1299                 :   {
                               /*
                                 Skip (-1) and failure (1) both close the table; only a failure is
                                 reported to the caller.
                                 NOTE(review): the result of maria_close() is ignored here.
                               */
    1300               0 :     if (info != NULL)
    1301               0 :       maria_close(info);
    1302               0 :     if (error == -1)
    1303               0 :       error= 0;
    1304                 :   }
    1305             383 :   return error;
    1306                 : }
    1307                 : 
    1308                 : /*
    1309                 :   NOTE
    1310                 :   This is called for REDO_INSERT_ROW_HEAD and REDO_NEW_ROW_HEAD
    1311                 : */
    1312                 : 
    1313                 : prototype_redo_exec_hook(REDO_INSERT_ROW_HEAD)
                           /*
                             Replays the insertion of a row head on a HEAD_PAGE. The same hook
                             serves records of type LOGREC_REDO_NEW_ROW_HEAD; rec->type tells the
                             two apart.

                             The whole record is read into log_record_buffer. The part following
                             the file id is passed as header, and the part following file id, page
                             number and directory position is passed as row data.

                             Returns 0 on success or when the record does not need to be applied,
                             1 on failure.
                           */
    1314          155659 : {
    1315          155659 :   int error= 1;
    1316          155659 :   uchar *buff= NULL;
    1317          155659 :   MARIA_HA *info= get_MARIA_HA_from_REDO_record(rec);
    1318          155659 :   if (info == NULL)
    1319                 :   {
    1320                 :     /*
    1321                 :       Table was skipped at open time (because later dropped/renamed, not
    1322                 :       transactional, or create_rename_lsn newer than LOGREC_FILE_ID), or
    1323                 :       record was skipped due to skip_redo_lsn; it is not an error.
    1324                 :     */
    1325               0 :     return 0;
    1326                 :   }
    1327                 :   /*
    1328                 :     Note that REDO is per page, we still consider it if its transaction
    1329                 :     committed long ago and is unknown.
    1330                 :   */
    1331                 :   /*
    1332                 :     If REDO's LSN is > page's LSN (read from disk), we are going to modify the
    1333                 :     page and change its LSN. The normal runtime code stores the UNDO's LSN
    1334                 :     into the page. Here storing the REDO's LSN (rec->lsn) would work
    1335                 :     (we are not writing to the log here, so don't have to "flush up to UNDO's
    1336                 :     LSN"). But in a test scenario where we do updates at runtime, then remove
    1337                 :     tables, apply the log and check that this results in the same table as at
    1338                 :     runtime, putting the same LSN as runtime had done will decrease
    1339                 :     differences. So we use the UNDO's LSN which is current_group_end_lsn.
    1340                 :   */
    1341          155659 :   enlarge_buffer(rec);
    1342          155659 :   if (log_record_buffer.str == NULL)
    1343                 :   {
                               /* enlarge_buffer() left no buffer: nothing has been read yet */
    1344               0 :     eprint(tracef, "Failed to allocate buffer for record");
    1345               0 :     goto end;
    1346                 :   }
    1347          155659 :   if (translog_read_record(rec->lsn, 0, rec->record_length,
    1348                 :                            log_record_buffer.str, NULL) !=
    1349                 :       rec->record_length)
    1350                 :   {
    1351               0 :     eprint(tracef, "Failed to read record");
    1352               0 :     goto end;
    1353                 :   }
    1354          155659 :   buff= log_record_buffer.str;
    1355          155659 :   if (_ma_apply_redo_insert_row_head_or_tail(info, current_group_end_lsn,
    1356                 :                                              HEAD_PAGE,
    1357                 :                                              (rec->type ==
    1358                 :                                               LOGREC_REDO_NEW_ROW_HEAD),
    1359                 :                                              buff + FILEID_STORE_SIZE,
    1360                 :                                              buff +
    1361                 :                                              FILEID_STORE_SIZE +
    1362                 :                                              PAGE_STORE_SIZE +
    1363                 :                                              DIRPOS_STORE_SIZE,
    1364                 :                                              rec->record_length -
    1365                 :                                              (FILEID_STORE_SIZE +
    1366                 :                                               PAGE_STORE_SIZE +
    1367                 :                                               DIRPOS_STORE_SIZE)))
    1368          155659 :     goto end;
    1369          155659 :   error= 0;
    1370          155659 : end:
    1371          155659 :   return error;
    1372                 : }
    1373                 : 
    1374                 : /*
    1375                 :   NOTE
    1376                 :   This is called for REDO_INSERT_ROW_TAIL and REDO_NEW_ROW_TAIL
    1377                 : */
    1378                 : 
    1379                 : prototype_redo_exec_hook(REDO_INSERT_ROW_TAIL)
                           /*
                             Replays the insertion of a row tail on a TAIL_PAGE. The same hook
                             serves records of type LOGREC_REDO_NEW_ROW_TAIL; rec->type tells the
                             two apart.

                             Returns 0 on success or when no table handle is available for the
                             record, 1 on failure.
                           */
    1380            5132 : {
    1381            5132 :   int error= 1;
    1382                 :   uchar *buff;
    1383            5132 :   MARIA_HA *info= get_MARIA_HA_from_REDO_record(rec);
                             /* no handle for this record: nothing to apply, not an error */
    1384            5132 :   if (info == NULL)
    1385               0 :     return 0;
    1386            5132 :   enlarge_buffer(rec);
    1387            5132 :   if (log_record_buffer.str == NULL ||
    1388                 :       translog_read_record(rec->lsn, 0, rec->record_length,
    1389                 :                            log_record_buffer.str, NULL) !=
    1390                 :        rec->record_length)
    1391                 :   {
    1392               0 :     eprint(tracef, "Failed to read record");
    1393               0 :     goto end;
    1394                 :   }
    1395            5132 :   buff= log_record_buffer.str;
                             /*
                               Header starts after the file id; row data starts after file id, page
                               number and directory position.
                             */
    1396            5132 :   if (_ma_apply_redo_insert_row_head_or_tail(info, current_group_end_lsn,
    1397                 :                                              TAIL_PAGE,
    1398                 :                                              (rec->type ==
    1399                 :                                               LOGREC_REDO_NEW_ROW_TAIL),
    1400                 :                                              buff + FILEID_STORE_SIZE,
    1401                 :                                              buff +
    1402                 :                                              FILEID_STORE_SIZE +
    1403                 :                                              PAGE_STORE_SIZE +
    1404                 :                                              DIRPOS_STORE_SIZE,
    1405                 :                                              rec->record_length -
    1406                 :                                              (FILEID_STORE_SIZE +
    1407                 :                                               PAGE_STORE_SIZE +
    1408                 :                                               DIRPOS_STORE_SIZE)))
    1409            5132 :     goto end;
    1410            5132 :   error= 0;
    1411                 : 
    1412            5132 : end:
    1413            5132 :   return error;
    1414                 : }
    1415                 : 
    1416                 : 
    1417                 : prototype_redo_exec_hook(REDO_INSERT_ROW_BLOBS)
                           /*
                             Replays a REDO_INSERT_ROW_BLOBS record by handing the whole record to
                             _ma_apply_redo_insert_row_blobs(), then traces the number of blobs and
                             ranges and the first and last page reported by that call.

                             Returns 0 on success or when no table handle is available for the
                             record, 1 on failure.
                           */
    1418            6269 : {
    1419            6269 :   int error= 1;
    1420                 :   uchar *buff;
    1421                 :   uint number_of_blobs, number_of_ranges;
    1422                 :   pgcache_page_no_t first_page, last_page;
                             /* text buffers filled by llstr() with the page numbers, for the trace */
    1423                 :   char llbuf1[22], llbuf2[22];
    1424            6269 :   MARIA_HA *info= get_MARIA_HA_from_REDO_record(rec);
    1425            6269 :   if (info == NULL)
    1426               0 :     return 0;
    1427            6269 :   enlarge_buffer(rec);
    1428            6269 :   if (log_record_buffer.str == NULL ||
    1429                 :       translog_read_record(rec->lsn, 0, rec->record_length,
    1430                 :                            log_record_buffer.str, NULL) !=
    1431                 :        rec->record_length)
    1432                 :   {
    1433               0 :     eprint(tracef, "Failed to read record");
    1434               0 :     goto end;
    1435                 :   }
    1436            6269 :   buff= log_record_buffer.str;
    1437            6269 :   if (_ma_apply_redo_insert_row_blobs(info, current_group_end_lsn,
    1438                 :                                       buff, rec->lsn, &number_of_blobs,
    1439                 :                                       &number_of_ranges,
    1440                 :                                       &first_page, &last_page))
    1441            6269 :     goto end;
    1442            6269 :   llstr(first_page, llbuf1);
    1443            6269 :   llstr(last_page, llbuf2);
    1444            6269 :   tprint(tracef, " %u blobs %u ranges, first page %s last %s",
    1445                 :          number_of_blobs, number_of_ranges, llbuf1, llbuf2);
    1446                 : 
    1447            6269 :   error= 0;
    1448                 : 
    1449            6269 : end:
                             /* terminates the trace line on both the success and the failure path */
    1450            6269 :   tprint(tracef, " \n");
    1451            6269 :   return error;
    1452                 : }
    1453                 : 
    1454                 : 
    1455                 : prototype_redo_exec_hook(REDO_PURGE_ROW_HEAD)
                           /*
                             Replays the purge of a row head on a HEAD_PAGE. Only the record
                             header (rec->header) is used; the full record is not read.

                             Returns 0 on success or when no table handle is available for the
                             record, 1 on failure.
                           */
    1456          116727 : {
    1457          116727 :   int error= 1;
    1458          116727 :   MARIA_HA *info= get_MARIA_HA_from_REDO_record(rec);
    1459          116727 :   if (info == NULL)
    1460               0 :     return 0;
    1461          116727 :   if (_ma_apply_redo_purge_row_head_or_tail(info, current_group_end_lsn,
    1462                 :                                             HEAD_PAGE,
    1463                 :                                             rec->header + FILEID_STORE_SIZE))
    1464          116727 :     goto end;
    1465          116727 :   error= 0;
    1466          116727 : end:
    1467          116727 :   return error;
    1468                 : }
    1469                 : 
    1470                 : 
    1471                 : prototype_redo_exec_hook(REDO_PURGE_ROW_TAIL)
                           /*
                             Replays the purge of a row tail on a TAIL_PAGE. Only the record
                             header (rec->header) is used; the full record is not read.

                             Returns 0 on success or when no table handle is available for the
                             record, 1 on failure.
                           */
    1472            2167 : {
    1473            2167 :   int error= 1;
    1474            2167 :   MARIA_HA *info= get_MARIA_HA_from_REDO_record(rec);
    1475            2167 :   if (info == NULL)
    1476               0 :     return 0;
    1477            2167 :   if (_ma_apply_redo_purge_row_head_or_tail(info, current_group_end_lsn,
    1478                 :                                             TAIL_PAGE,
    1479                 :                                             rec->header + FILEID_STORE_SIZE))
    1480            2167 :     goto end;
    1481            2167 :   error= 0;
    1482            2167 : end:
    1483            2167 :   return error;
    1484                 : }
    1485                 : 
    1486                 : 
    1487                 : prototype_redo_exec_hook(REDO_FREE_BLOCKS)
                           /*
                             Replays a REDO_FREE_BLOCKS record: reads the whole record and passes
                             everything after the file id to _ma_apply_redo_free_blocks().

                             Returns 0 on success or when no table handle is available for the
                             record, 1 on failure.
                           */
    1488            5544 : {
    1489            5544 :   int error= 1;
    1490                 :   uchar *buff;
    1491            5544 :   MARIA_HA *info= get_MARIA_HA_from_REDO_record(rec);
    1492            5544 :   if (info == NULL)
    1493               0 :     return 0;
    1494            5544 :   enlarge_buffer(rec);
    1495                 : 
    1496            5544 :   if (log_record_buffer.str == NULL ||
    1497                 :       translog_read_record(rec->lsn, 0, rec->record_length,
    1498                 :                            log_record_buffer.str, NULL) !=
    1499                 :        rec->record_length)
    1500                 :   {
    1501               0 :     eprint(tracef, "Failed to read record");
    1502               0 :     goto end;
    1503                 :   }
    1504                 : 
    1505            5544 :   buff= log_record_buffer.str;
    1506            5544 :   if (_ma_apply_redo_free_blocks(info, current_group_end_lsn,
    1507                 :                                  buff + FILEID_STORE_SIZE))
    1508            5544 :     goto end;
    1509            5544 :   error= 0;
    1510            5544 : end:
    1511            5544 :   return error;
    1512                 : }
    1513                 : 
    1514                 : 
    1515                 : prototype_redo_exec_hook(REDO_FREE_HEAD_OR_TAIL)
                           /*
                             Replays a REDO_FREE_HEAD_OR_TAIL record. Only the record header
                             (rec->header), past the file id, is passed to
                             _ma_apply_redo_free_head_or_tail(); the full record is not read.

                             Returns 0 on success or when no table handle is available for the
                             record, 1 on failure.
                           */
    1516            4624 : {
    1517            4624 :   int error= 1;
    1518            4624 :   MARIA_HA *info= get_MARIA_HA_from_REDO_record(rec);
    1519            4624 :   if (info == NULL)
    1520               0 :     return 0;
    1521                 : 
    1522            4624 :   if (_ma_apply_redo_free_head_or_tail(info, current_group_end_lsn,
    1523                 :                                        rec->header + FILEID_STORE_SIZE))
    1524            4624 :     goto end;
    1525            4624 :   error= 0;
    1526            4624 : end:
    1527            4624 :   return error;
    1528                 : }
    1529                 : 
    1530                 : 
    1531                 : prototype_redo_exec_hook(REDO_DELETE_ALL)
                           /*
                             Replays a REDO_DELETE_ALL record by calling maria_delete_all_rows() on
                             the table, after tracing the row count currently stored in its state.

                             Returns 0 on success or when no table handle is available for the
                             record, 1 on failure.
                           */
    1532               0 : {
    1533               0 :   int error= 1;
    1534               0 :   MARIA_HA *info= get_MARIA_HA_from_REDO_record(rec);
    1535               0 :   if (info == NULL)
    1536               0 :     return 0;
    1537               0 :   tprint(tracef, "   deleting all %lu rows\n",
    1538                 :          (ulong)info->s->state.state.records);
    1539               0 :   if (maria_delete_all_rows(info))
    1540               0 :     goto end;
    1541               0 :   error= 0;
    1542               0 : end:
    1543               0 :   return error;
    1544                 : }
    1545                 : 
    1546                 : 
    1547                 : prototype_redo_exec_hook(REDO_INDEX)
                           /*
                             Replays a REDO_INDEX record: reads the whole record and passes the
                             part after the file id, with its length, to _ma_apply_redo_index().

                             Returns 0 on success or when no table handle is available for the
                             record, 1 on failure.
                           */
    1548         1586696 : {
    1549         1586696 :   int error= 1;
    1550         1586696 :   MARIA_HA *info= get_MARIA_HA_from_REDO_record(rec);
    1551         1586696 :   if (info == NULL)
    1552               0 :     return 0;
    1553         1586696 :   enlarge_buffer(rec);
    1554                 : 
    1555         1586696 :   if (log_record_buffer.str == NULL ||
    1556                 :       translog_read_record(rec->lsn, 0, rec->record_length,
    1557                 :                            log_record_buffer.str, NULL) !=
    1558                 :        rec->record_length)
    1559                 :   {
    1560               0 :     eprint(tracef, "Failed to read record");
    1561               0 :     goto end;
    1562                 :   }
    1563                 : 
    1564         1586696 :   if (_ma_apply_redo_index(info, current_group_end_lsn,
    1565                 :                            log_record_buffer.str + FILEID_STORE_SIZE,
    1566                 :                            rec->record_length - FILEID_STORE_SIZE))
    1567         1586696 :     goto end;
    1568         1586696 :   error= 0;
    1569         1586696 : end:
    1570         1586696 :   return error;
    1571                 : }
    1572                 : 
    1573                 : prototype_redo_exec_hook(REDO_INDEX_NEW_PAGE)
                           /*
                             Replays a REDO_INDEX_NEW_PAGE record: reads the whole record and
                             passes the part after the file id, with its length, to
                             _ma_apply_redo_index_new_page().

                             Returns 0 on success or when no table handle is available for the
                             record, 1 on failure.
                           */
    1574            4190 : {
    1575            4190 :   int error= 1;
    1576            4190 :   MARIA_HA *info= get_MARIA_HA_from_REDO_record(rec);
    1577            4190 :   if (info == NULL)
    1578               0 :     return 0;
    1579            4190 :   enlarge_buffer(rec);
    1580                 : 
    1581            4190 :   if (log_record_buffer.str == NULL ||
    1582                 :       translog_read_record(rec->lsn, 0, rec->record_length,
    1583                 :                            log_record_buffer.str, NULL) !=
    1584                 :        rec->record_length)
    1585                 :   {
    1586               0 :     eprint(tracef, "Failed to read record");
    1587               0 :     goto end;
    1588                 :   }
    1589                 : 
    1590            4190 :   if (_ma_apply_redo_index_new_page(info, current_group_end_lsn,
    1591                 :                                     log_record_buffer.str + FILEID_STORE_SIZE,
    1592                 :                                     rec->record_length - FILEID_STORE_SIZE))
    1593            4190 :     goto end;
    1594            4190 :   error= 0;
    1595            4190 : end:
    1596            4190 :   return error;
    1597                 : }
    1598                 : 
    1599                 : 
    1600                 : prototype_redo_exec_hook(REDO_INDEX_FREE_PAGE)
                           /*
                             Replays a REDO_INDEX_FREE_PAGE record. Only the record header
                             (rec->header), past the file id, is passed to
                             _ma_apply_redo_index_free_page(); the full record is not read.

                             Returns 0 on success or when no table handle is available for the
                             record, 1 on failure.
                           */
    1601            3310 : {
    1602            3310 :   int error= 1;
    1603            3310 :   MARIA_HA *info= get_MARIA_HA_from_REDO_record(rec);
    1604            3310 :   if (info == NULL)
    1605               0 :     return 0;
    1606                 : 
    1607            3310 :   if (_ma_apply_redo_index_free_page(info, current_group_end_lsn,
    1608                 :                                      rec->header + FILEID_STORE_SIZE))
    1609            3310 :     goto end;
    1610            3310 :   error= 0;
    1611            3310 : end:
    1612            3310 :   return error;
    1613                 : }
    1614                 : 
    1615                 : 
    1616                 : prototype_redo_exec_hook(REDO_BITMAP_NEW_PAGE)
                           /*
                             Replays a REDO_BITMAP_NEW_PAGE record, but only when the record is not
                             older than checkpoint_start; an older record is read and then ignored.

                             Returns 0 on success, when the record is ignored, or when no table
                             handle is available for it; 1 on failure.
                           */
    1617               0 : {
    1618               0 :   int error= 1;
    1619               0 :   MARIA_HA *info= get_MARIA_HA_from_REDO_record(rec);
    1620               0 :   if (info == NULL)
    1621               0 :     return 0;
    1622               0 :   enlarge_buffer(rec);
    1623                 : 
    1624               0 :   if (log_record_buffer.str == NULL ||
    1625                 :       translog_read_record(rec->lsn, 0, rec->record_length,
    1626                 :                            log_record_buffer.str, NULL) !=
    1627                 :        rec->record_length)
    1628                 :   {
    1629               0 :     eprint(tracef, "Failed to read record");
    1630               0 :     goto end;
    1631                 :   }
    1632                 : 
    1633               0 :   if (cmp_translog_addr(rec->lsn, checkpoint_start) >= 0)
    1634                 :   {
    1635                 :     /*
    1636                 :       Record is potentially after the bitmap flush made by Checkpoint, so has
    1637                 :       to be replayed. It may overwrite a more recent state but that will be
    1638                 :       corrected by all upcoming REDOs for data pages.
    1639                 :       If the condition is false, we must not apply the record: it is unneeded
    1640                 :       and harmful (may not be corrected as REDOs can be skipped due to
    1641                 :       dirty-pages list).
    1642                 :     */
    1643               0 :     if (_ma_apply_redo_bitmap_new_page(info, current_group_end_lsn,
    1644                 :                                        log_record_buffer.str +
    1645                 :                                        FILEID_STORE_SIZE))
    1646               0 :       goto end;
    1647                 :   }
    1648               0 :   error= 0;
    1649               0 : end:
    1650               0 :   return error;
    1651                 : }
    1652                 : 
    1653                 : 
    1654                 : static inline void set_undo_lsn_for_active_trans(uint16 short_trid, LSN lsn) /* record lsn as the current UNDO LSN of the transaction in slot short_trid of all_active_trans */
    1655         1849188 : {
    1656         1849188 :   if (all_active_trans[short_trid].long_trid == 0) /* a zero long_trid marks a slot with no known transaction */
    1657                 :   {
    1658                 :     /* transaction unknown, so has committed or fully rolled back long ago */
    1659         1849188 :     return;
    1660                 :   }
    1661         1849188 :   all_active_trans[short_trid].undo_lsn= lsn;
    1662         1849188 :   if (all_active_trans[short_trid].first_undo_lsn == LSN_IMPOSSIBLE) /* first_undo_lsn is set only once, by the first UNDO seen */
    1663             699 :     all_active_trans[short_trid].first_undo_lsn= lsn;
    1664                 : }
    1665                 : 
    1666                 : 
    1667                 : prototype_redo_exec_hook(UNDO_ROW_INSERT) /* REDO-phase hook for an UNDO_ROW_INSERT record: tracks the UNDO LSN and, if the table state is not newer than the record, bumps row count and checksum */
    1668          116056 : { /* returns 0 normally, 1 if the checksum bytes cannot be read from the log */
    1669          116056 :   MARIA_HA *info= get_MARIA_HA_from_UNDO_record(rec);
    1670                 :   MARIA_SHARE *share;
    1671                 : 
    1672          116056 :   set_undo_lsn_for_active_trans(rec->short_trid, rec->lsn);
    1673          116056 :   if (info == NULL)
    1674                 :   {
    1675                 :     /*
    1676                 :       Note that we set undo_lsn anyway. So that if the transaction is later
    1677                 :       rolled back, this UNDO is tried for execution and we get a warning (as
    1678                 :       it would then be abnormal that info==NULL).
    1679                 :     */
    1680               0 :     return 0;
    1681                 :   }
    1682          116056 :   share= info->s;
    1683          116056 :   if (cmp_translog_addr(rec->lsn, share->state.is_of_horizon) >= 0) /* state is updated only when the record is at or after share->state.is_of_horizon */
    1684                 :   {
    1685           76074 :     tprint(tracef, "   state has LSN (%lu,0x%lx) older than record, updating"
    1686                 :            " rows' count\n", LSN_IN_PARTS(share->state.is_of_horizon));
    1687           76074 :     share->state.state.records++;
    1688           76074 :     if (share->calc_checksum)
    1689                 :     {
    1690                 :       uchar buff[HA_CHECKSUM_STORE_SIZE];
    1691           76074 :       if (translog_read_record(rec->lsn, LSN_STORE_SIZE + FILEID_STORE_SIZE + /* read only the checksum field: offset skips LSN, file id, page and dirpos */
    1692                 :                                PAGE_STORE_SIZE + DIRPOS_STORE_SIZE,
    1693                 :                                HA_CHECKSUM_STORE_SIZE, buff, NULL) !=
    1694                 :           HA_CHECKSUM_STORE_SIZE)
    1695                 :       {
    1696               0 :         eprint(tracef, "Failed to read record");
    1697               0 :         return 1;
    1698                 :       }
    1699           76074 :       share->state.state.checksum+= ha_checksum_korr(buff);
    1700                 :     }
    1701           76074 :     info->s->state.changed|= (STATE_CHANGED | STATE_NOT_ANALYZED | /* info->s is the same object as share (assigned above) */
    1702                 :                               STATE_NOT_ZEROFILLED | STATE_NOT_MOVABLE);
    1703                 :   }
    1704          116056 :   tprint(tracef, "   rows' count %lu\n", (ulong)info->s->state.state.records);
    1705                 :   /* Unpin all pages, stamp them with UNDO's LSN */
    1706          116056 :   _ma_unpin_all_pages(info, rec->lsn);
    1707          116056 :   return 0;
    1708                 : }
    1709                 : 
    1710                 : 
    1711                 : prototype_redo_exec_hook(UNDO_ROW_DELETE) /* REDO-phase hook for an UNDO_ROW_DELETE record: tracks the UNDO LSN and, if the table state is not newer than the record, decrements row count and adjusts checksum */
    1712           47212 : { /* returns 0 normally, 1 if the checksum bytes cannot be read from the log */
    1713           47212 :   MARIA_HA *info= get_MARIA_HA_from_UNDO_record(rec);
    1714                 :   MARIA_SHARE *share;
    1715                 : 
    1716           47212 :   set_undo_lsn_for_active_trans(rec->short_trid, rec->lsn); /* done before the NULL check, so the UNDO LSN is tracked even without a table handle */
    1717           47212 :   if (info == NULL)
    1718               0 :     return 0;
    1719           47212 :   share= info->s;
    1720           47212 :   if (cmp_translog_addr(rec->lsn, share->state.is_of_horizon) >= 0) /* state is updated only when the record is at or after share->state.is_of_horizon */
    1721                 :   {
    1722           30563 :     tprint(tracef, "   state older than record\n");
    1723           30563 :     share->state.state.records--;
    1724           30563 :     if (share->calc_checksum)
    1725                 :     {
    1726                 :       uchar buff[HA_CHECKSUM_STORE_SIZE];
    1727           30563 :       if (translog_read_record(rec->lsn, LSN_STORE_SIZE + FILEID_STORE_SIZE + /* read only the checksum field of the record */
    1728                 :                                PAGE_STORE_SIZE + DIRPOS_STORE_SIZE + 2 + /* NOTE(review): the field behind the literal 2 is not visible here - confirm against the record layout */
    1729                 :                                PAGERANGE_STORE_SIZE,
    1730                 :                                HA_CHECKSUM_STORE_SIZE, buff, NULL) !=
    1731                 :           HA_CHECKSUM_STORE_SIZE)
    1732                 :       {
    1733               0 :         eprint(tracef, "Failed to read record");
    1734               0 :         return 1;
    1735                 :       }
    1736           30563 :       share->state.state.checksum+= ha_checksum_korr(buff); /* stored value is added even for a delete; presumably it is a delta - TODO confirm */
    1737                 :     }
    1738           30563 :     share->state.changed|= (STATE_CHANGED | STATE_NOT_ANALYZED |
    1739                 :                             STATE_NOT_OPTIMIZED_ROWS | STATE_NOT_ZEROFILLED |
    1740                 :                             STATE_NOT_MOVABLE);
    1741                 :   }
    1742           47212 :   tprint(tracef, "   rows' count %lu\n", (ulong)share->state.state.records);
    1743           47212 :   _ma_unpin_all_pages(info, rec->lsn); /* called with the LSN of this UNDO record, as in the UNDO_ROW_INSERT hook */
    1744           47212 :   return 0;
    1745                 : }
    1746                 : 
    1747                 : 
    1748                 : prototype_redo_exec_hook(UNDO_ROW_UPDATE) /* REDO-phase hook for an UNDO_ROW_UPDATE record: tracks the UNDO LSN and, if the table state is not newer than the record, adjusts the checksum (row count is untouched) */
    1749            8433 : { /* returns 0 normally, 1 if the checksum bytes cannot be read from the log */
    1750            8433 :   MARIA_HA *info= get_MARIA_HA_from_UNDO_record(rec);
    1751                 :   MARIA_SHARE *share;
    1752                 : 
    1753            8433 :   set_undo_lsn_for_active_trans(rec->short_trid, rec->lsn); /* done before the NULL check, so the UNDO LSN is tracked even without a table handle */
    1754            8433 :   if (info == NULL)
    1755               0 :     return 0;
    1756            8433 :   share= info->s;
    1757            8433 :   if (cmp_translog_addr(rec->lsn, share->state.is_of_horizon) >= 0) /* state is updated only when the record is at or after share->state.is_of_horizon */
    1758                 :   {
    1759            4604 :     if (share->calc_checksum)
    1760                 :     {
    1761                 :       uchar buff[HA_CHECKSUM_STORE_SIZE];
    1762            4604 :       if (translog_read_record(rec->lsn, LSN_STORE_SIZE + FILEID_STORE_SIZE + /* read only the checksum field: offset skips LSN, file id, page and dirpos */
    1763                 :                                PAGE_STORE_SIZE + DIRPOS_STORE_SIZE,
    1764                 :                                HA_CHECKSUM_STORE_SIZE, buff, NULL) !=
    1765                 :           HA_CHECKSUM_STORE_SIZE)
    1766                 :       {
    1767               0 :         eprint(tracef, "Failed to read record");
    1768               0 :         return 1;
    1769                 :       }
    1770            4604 :       share->state.state.checksum+= ha_checksum_korr(buff);
    1771                 :     }
    1772            4604 :     share->state.changed|= (STATE_CHANGED | STATE_NOT_ANALYZED |
    1773                 :                             STATE_NOT_ZEROFILLED | STATE_NOT_MOVABLE);
    1774                 :   }
    1775            8433 :   _ma_unpin_all_pages(info, rec->lsn); /* called with the LSN of this UNDO record */
    1776            8433 :   return 0;
    1777                 : }
    1778                 : 
    1779                 : 
    1780                 : prototype_redo_exec_hook(UNDO_KEY_INSERT) /* REDO-phase hook for an UNDO_KEY_INSERT record: tracks the UNDO LSN and, for the auto-increment key, raises share->state.auto_increment to the logged key value */
    1781          665596 : { /* returns 0 normally, 1 if the record cannot be read from the log */
    1782                 :   MARIA_HA *info;
    1783                 :   MARIA_SHARE *share;
    1784                 : 
    1785          665596 :   set_undo_lsn_for_active_trans(rec->short_trid, rec->lsn); /* done before the handle lookup, so the UNDO LSN is tracked even without a table handle */
    1786          665596 :   if (!(info= get_MARIA_HA_from_UNDO_record(rec)))
    1787               0 :     return 0;
    1788          665596 :   share= info->s;
    1789          665596 :   if (cmp_translog_addr(rec->lsn, share->state.is_of_horizon) >= 0) /* state is updated only when the record is at or after share->state.is_of_horizon */
    1790                 :   {
    1791          441300 :     const uchar *ptr= rec->header + LSN_STORE_SIZE + FILEID_STORE_SIZE;
    1792          441300 :     uint keynr= key_nr_korr(ptr);
    1793          441300 :     if (share->base.auto_key == (keynr + 1)) /* it's auto-increment */
    1794                 :     {
    1795               0 :       const HA_KEYSEG *keyseg= info->s->keyinfo[keynr].seg;
    1796                 :       ulonglong value;
    1797                 :       char llbuf[22];
    1798                 :       uchar *to, reversed[MARIA_MAX_KEY_BUFF]; /* reversed lives in this block because to may still point into it at ma_retrieve_auto_increment() below */
    1799               0 :       tprint(tracef, "   state older than record\n");
    1800                 :       /* we read the record to find the auto_increment value */
    1801               0 :       enlarge_buffer(rec);
    1802               0 :       if (log_record_buffer.str == NULL ||
    1803                 :           translog_read_record(rec->lsn, 0, rec->record_length,
    1804                 :                                log_record_buffer.str, NULL) !=
    1805                 :           rec->record_length)
    1806                 :       {
    1807               0 :         eprint(tracef, "Failed to read record");
    1808               0 :         return 1;
    1809                 :       }
    1810               0 :       to= log_record_buffer.str + LSN_STORE_SIZE + FILEID_STORE_SIZE + /* key bytes follow the LSN, file id and key number */
    1811                 :         KEY_NR_STORE_SIZE;
    1812               0 :       if (keyseg->flag & HA_SWAP_KEY)
    1813                 :       {
    1814                 :         /* We put key from log record to "data record" packing format... */
    1815                 :         /* (the reversed[] scratch buffer is declared in the enclosing block) */
    1816               0 :         uchar *key_ptr= to;
    1817               0 :         uchar *key_end= key_ptr + keyseg->length;
    1818               0 :         to= reversed + keyseg->length; /* fill reversed[] backwards; the loop leaves to at reversed[0] */
    1819                 :         do
    1820                 :         {
    1821               0 :           *--to= *key_ptr++;
    1822               0 :         } while (key_ptr != key_end);
    1823                 :         /* ... so that we can read it with: */
    1824                 :       }
    1825               0 :       value= ma_retrieve_auto_increment(to, keyseg->type);
    1826               0 :       set_if_bigger(share->state.auto_increment, value);
    1827               0 :       llstr(share->state.auto_increment, llbuf);
    1828               0 :       tprint(tracef, "   auto-inc %s\n", llbuf);
    1829                 :     }
    1830                 :   }
    1831          665596 :   _ma_unpin_all_pages(info, rec->lsn);
    1832          665596 :   return 0;
    1833                 : }
    1834                 : 
    1835                 : 
    1836                 : prototype_redo_exec_hook(UNDO_KEY_DELETE) /* REDO-phase hook for an UNDO_KEY_DELETE record: only tracks the UNDO LSN and unpins pages; no table state is changed here */
    1837          334034 : { /* always returns 0 */
    1838                 :   MARIA_HA *info;
    1839                 : 
    1840          334034 :   set_undo_lsn_for_active_trans(rec->short_trid, rec->lsn); /* done before the handle lookup, so the UNDO LSN is tracked even without a table handle */
    1841          334034 :   if (!(info= get_MARIA_HA_from_UNDO_record(rec)))
    1842               0 :     return 0;
    1843          334034 :   _ma_unpin_all_pages(info, rec->lsn); /* called with the LSN of this UNDO record */
    1844          334034 :   return 0;
    1845                 : }
    1846                 : 
    1847                 : 
    1848                 : prototype_redo_exec_hook(UNDO_KEY_DELETE_WITH_ROOT) /* REDO-phase hook for an UNDO_KEY_DELETE_WITH_ROOT record: tracks the UNDO LSN and, if the table state is not newer than the record, sets the key root from the record header */
    1849             446 : { /* always returns 0 */
    1850             446 :   MARIA_HA *info= get_MARIA_HA_from_UNDO_record(rec);
    1851                 :   MARIA_SHARE *share;
    1852                 : 
    1853             446 :   set_undo_lsn_for_active_trans(rec->short_trid, rec->lsn); /* done before the NULL check, so the UNDO LSN is tracked even without a table handle */
    1854             446 :   if (info == NULL)
    1855               0 :     return 0;
    1856             446 :   share= info->s;
    1857             446 :   if (cmp_translog_addr(rec->lsn, share->state.is_of_horizon) >= 0) /* state is updated only when the record is at or after share->state.is_of_horizon */
    1858                 :   {
    1859                 :     uint key_nr;
    1860                 :     my_off_t page;
    1861             292 :     key_nr= key_nr_korr(rec->header + LSN_STORE_SIZE + FILEID_STORE_SIZE); /* key number follows the LSN and file id in the header */
    1862             292 :     page=  page_korr(rec->header +  LSN_STORE_SIZE + FILEID_STORE_SIZE +
    1863                 :                      KEY_NR_STORE_SIZE); /* root page number follows the key number */
    1864             292 :     share->state.key_root[key_nr]= (page == IMPOSSIBLE_PAGE_NO ? /* IMPOSSIBLE_PAGE_NO maps to HA_OFFSET_ERROR; otherwise page number is converted to a byte offset */
    1865                 :                                     HA_OFFSET_ERROR :
    1866                 :                                     page * share->block_size);
    1867                 :   }
    1868             446 :   _ma_unpin_all_pages(info, rec->lsn);
    1869             446 :   return 0;
    1870                 : }
    1871                 : 
    1872                 : 
    1873                 : prototype_redo_exec_hook(UNDO_BULK_INSERT) /* REDO-phase hook for an UNDO_BULK_INSERT record: only tracks the UNDO LSN of the transaction */
    1874               0 : { /* always returns 0 */
    1875                 :   /*
    1876                 :     If the repair finished, it wrote and synced the state. If it didn't
    1877                 :     finish, we are going to empty the table and that will fix the state.
    1878                 :   */
    1879               0 :   set_undo_lsn_for_active_trans(rec->short_trid, rec->lsn);
    1880               0 :   return 0;
    1881                 : }
    1882                 : 
    1883                 : 
    1884                 : prototype_redo_exec_hook(IMPORTED_TABLE) /* REDO-phase hook for an IMPORTED_TABLE record: reads the whole record and traces the table name it carries */
    1885               0 : { /* returns 0 on success, 1 if the record cannot be read */
    1886                 :   char *name;
    1887               0 :   enlarge_buffer(rec); /* presumably sizes log_record_buffer for rec->record_length; a NULL str is caught below - TODO confirm */
    1888               0 :   if (log_record_buffer.str == NULL ||
    1889                 :       translog_read_record(rec->lsn, 0, rec->record_length,
    1890                 :                            log_record_buffer.str, NULL) !=
    1891                 :       rec->record_length)
    1892                 :   {
    1893               0 :     eprint(tracef, "Failed to read record");
    1894               0 :     return 1;
    1895                 :   }
    1896               0 :   name= (char *)log_record_buffer.str; /* NOTE(review): printed with %s below, so this relies on the logged name being NUL-terminated - confirm against the writer */
    1897               0 :   tprint(tracef, "Table '%s' was imported (auto-zerofilled) in this Maria instance\n", name);
    1898               0 :   return 0;
    1899                 : }
    1900                 : 
    1901                 : 
    1902                 : prototype_redo_exec_hook(COMMIT) /* REDO-phase hook for a COMMIT record: traces the commit and clears the all_active_trans slot of the transaction */
    1903             452 : { /* always returns 0 */
    1904             452 :   uint16 sid= rec->short_trid;
    1905             452 :   TrID long_trid= all_active_trans[sid].long_trid;
    1906                 :   char llbuf[22];
    1907             452 :   if (long_trid == 0) /* a zero long_trid marks a slot with no known transaction */
    1908                 :   {
    1909               0 :     tprint(tracef, "We don't know about transaction with short_trid %u;"
    1910                 :            "it probably committed long ago, forget it\n", sid);
    1911               0 :     bzero(&all_active_trans[sid], sizeof(all_active_trans[sid])); /* slot is cleared on this path too */
    1912               0 :     return 0;
    1913                 :   }
    1914             452 :   llstr(long_trid, llbuf);
    1915             452 :   tprint(tracef, "Transaction long_trid %s short_trid %u committed\n",
    1916                 :          llbuf, sid);
    1917             452 :   bzero(&all_active_trans[sid], sizeof(all_active_trans[sid])); /* zeroes the whole slot, including long_trid */
    1918                 : #ifdef MARIA_VERSIONING
    1919                 :   /*
    1920                 :     if real recovery:
    1921                 :     transaction was committed, move it to some separate list for later
    1922                 :     purging (but don't purge now! purging may have been started before, we
    1923                 :     may find REDO_PURGE records soon).
    1924                 :   */
    1925                 : #endif
    1926             452 :   return 0;
    1927                 : }
    1928                 : 
    1929                 : prototype_redo_exec_hook(CLR_END) /* REDO-phase hook for a CLR_END record: moves the transaction UNDO LSN back to the one stored in the record and, if the table state is not newer than the record, reverses the state effect of the undone record */
    1930          677411 : { /* returns 0 normally, 1 if the record cannot be read; every exit after DBUG_ENTER goes through DBUG_RETURN */
    1931          677411 :   MARIA_HA *info= get_MARIA_HA_from_UNDO_record(rec);
    1932                 :   MARIA_SHARE *share;
    1933                 :   LSN previous_undo_lsn;
    1934                 :   enum translog_record_type undone_record_type;
    1935                 :   const LOG_DESC *log_desc;
    1936          677411 :   my_bool row_entry= 0; /* set when the undone record was a row operation, i.e. a checksum follows in the record */
    1937                 :   uchar *logpos;
    1938          677411 :   DBUG_ENTER("exec_REDO_LOGREC_CLR_END");
    1939                 : 
    1940          677411 :   previous_undo_lsn= lsn_korr(rec->header);
    1941          677411 :   undone_record_type=
    1942                 :     clr_type_korr(rec->header + LSN_STORE_SIZE + FILEID_STORE_SIZE);
    1943          677411 :   log_desc= &log_record_type_descriptor[undone_record_type];
    1944                 : 
    1945          677411 :   set_undo_lsn_for_active_trans(rec->short_trid, previous_undo_lsn); /* uses the LSN stored in the record, not rec->lsn */
    1946          677411 :   if (info == NULL)
    1947               0 :     DBUG_RETURN(0);
    1948          677411 :   share= info->s;
    1949          677411 :   tprint(tracef, "   CLR_END was about %s, undo_lsn now LSN (%lu,0x%lx)\n",
    1950                 :          log_desc->name, LSN_IN_PARTS(previous_undo_lsn));
    1951                 : 
    1952          677411 :   enlarge_buffer(rec);
    1953          677411 :   if (log_record_buffer.str == NULL ||
    1954                 :       translog_read_record(rec->lsn, 0, rec->record_length,
    1955                 :                            log_record_buffer.str, NULL) !=
    1956                 :       rec->record_length)
    1957                 :   {
    1958               0 :     eprint(tracef, "Failed to read record");
    1959               0 :     DBUG_RETURN(1); /* was a plain return, which skipped the DBUG_RETURN matching DBUG_ENTER above */
    1960                 :   }
    1961          677411 :   logpos= (log_record_buffer.str + LSN_STORE_SIZE + FILEID_STORE_SIZE +
    1962                 :            CLR_TYPE_STORE_SIZE); /* payload after LSN, file id and undone-record type */
    1963                 : 
    1964          677411 :   if (cmp_translog_addr(rec->lsn, share->state.is_of_horizon) >= 0) /* state is updated only when the record is at or after share->state.is_of_horizon */
    1965                 :   {
    1966          346764 :     tprint(tracef, "   state older than record\n");
    1967          346764 :     switch (undone_record_type) {
    1968                 :     case LOGREC_UNDO_ROW_DELETE:
    1969           13914 :       row_entry= 1;
    1970           13914 :       share->state.state.records++; /* undoing a delete brings the row back */
    1971           13914 :       break;
    1972                 :     case LOGREC_UNDO_ROW_INSERT:
    1973           38455 :       share->state.state.records--; /* undoing an insert removes the row */
    1974           38455 :       share->state.changed|= STATE_NOT_OPTIMIZED_ROWS;
    1975           38455 :       row_entry= 1;
    1976           38455 :       break;
    1977                 :     case LOGREC_UNDO_ROW_UPDATE:
    1978            1671 :       row_entry= 1;
    1979            1671 :       break;
    1980                 :     case LOGREC_UNDO_KEY_INSERT:
    1981                 :     case LOGREC_UNDO_KEY_DELETE:
    1982                 :       break;
    1983                 :     case LOGREC_UNDO_KEY_INSERT_WITH_ROOT:
    1984                 :     case LOGREC_UNDO_KEY_DELETE_WITH_ROOT:
    1985                 :     {
    1986                 :       uint key_nr;
    1987                 :       my_off_t page;
    1988             507 :       key_nr= key_nr_korr(logpos);
    1989             507 :       page=  page_korr(logpos + KEY_NR_STORE_SIZE);
    1990             507 :       share->state.key_root[key_nr]= (page == IMPOSSIBLE_PAGE_NO ? /* IMPOSSIBLE_PAGE_NO maps to HA_OFFSET_ERROR; otherwise page number is converted to a byte offset */
    1991                 :                                       HA_OFFSET_ERROR :
    1992                 :                                       page * share->block_size);
    1993             507 :       break;
    1994                 :     }
    1995                 :     case LOGREC_UNDO_BULK_INSERT:
    1996                 :       break;
    1997                 :     default:
    1998               0 :       DBUG_ASSERT(0);
    1999                 :     }
    2000          346764 :     if (row_entry && share->calc_checksum)
    2001           54040 :       share->state.state.checksum+= ha_checksum_korr(logpos); /* for row records the checksum is read from the start of the payload */
    2002          346764 :     share->state.changed|= (STATE_CHANGED | STATE_NOT_ANALYZED |
    2003                 :                             STATE_NOT_ZEROFILLED | STATE_NOT_MOVABLE);
    2004                 :   }
    2005          677411 :   if (row_entry)
    2006           54040 :     tprint(tracef, "   rows' count %lu\n", (ulong)share->state.state.records);
    2007          677411 :   _ma_unpin_all_pages(info, rec->lsn);
    2008          677411 :   DBUG_RETURN(0);
    2009                 : }
    2010                 : 
    2011                 : 
    2012                 : /**
    2013                 :    Hook to print debug information (like MySQL query)
    2014                 : */
    2015                 : 
    2016                 : prototype_redo_exec_hook(DEBUG_INFO) /* REDO-phase hook for a DEBUG_INFO record: reads the record and traces its payload (currently only query text) */
    2017               0 : { /* returns 0 on success, 1 if the record cannot be read */
    2018                 :   uchar *data;
    2019                 :   enum translog_debug_info_type debug_info;
    2020                 : 
    2021               0 :   enlarge_buffer(rec); /* presumably sizes log_record_buffer for rec->record_length; a NULL str is caught below - TODO confirm */
    2022               0 :   if (log_record_buffer.str == NULL ||
    2023                 :       translog_read_record(rec->lsn, 0, rec->record_length,
    2024                 :                            log_record_buffer.str, NULL) !=
    2025                 :       rec->record_length)
    2026                 :   {
    2027               0 :     eprint(tracef, "Failed to read record debug record");
    2028               0 :     return 1;
    2029                 :   }
    2030               0 :   debug_info= (enum translog_debug_info_type) log_record_buffer.str[0]; /* first byte is the debug-info type, the rest is its data */
    2031               0 :   data= log_record_buffer.str + 1;
    2032               0 :   switch (debug_info) {
    2033                 :   case LOGREC_DEBUG_INFO_QUERY:
    2034               0 :     tprint(tracef, "Query: %.*s\n", (int) (rec->record_length - 1), (char*) data); /* bounded by the payload length: nothing here NUL-terminates the buffer; assumes record_length >= 1 (the type byte) */
    2035                 :     break;
    2036                 :   default:
    2037               0 :     DBUG_ASSERT(0);
    2038                 :   }
    2039               0 :   return 0;
    2040                 : }
    2041                 : 
    2042                 : 
    2043                 : /**
    2044                 :   In some cases we have to skip execution of an UNDO record during the UNDO
    2045                 :   phase.
    2046                 : */
    2047                 : 
    2048                 : static void skip_undo_record(LSN previous_undo_lsn, TRN *trn) /* step trn past one UNDO record without executing it and count the skip in skipped_undo_phase */
    2049               0 : {
    2050               0 :   trn->undo_lsn= previous_undo_lsn; /* continue the rollback from the record before the skipped one */
    2051               0 :   if (previous_undo_lsn == LSN_IMPOSSIBLE) /* has fully rolled back */
    2052               0 :     trn->first_undo_lsn= LSN_WITH_FLAGS_TO_FLAGS(trn->first_undo_lsn); /* presumably keeps only the flag bits and drops the LSN part - TODO confirm macro */
    2053               0 :   skipped_undo_phase++;
    2054                 : }
    2055                 : 
    2056                 : 
    2057                 : prototype_undo_exec_hook(UNDO_ROW_INSERT) /* UNDO-phase hook for an UNDO_ROW_INSERT record: rolls the insert back via _ma_apply_undo_row_insert(); rec and trn are supplied by the macro (definition not visible here) */
    2058           31819 : { /* returns 0 when the record is skipped, 1 if the record cannot be read, else the result of _ma_apply_undo_row_insert() */
    2059                 :   my_bool error;
    2060           31819 :   MARIA_HA *info= get_MARIA_HA_from_UNDO_record(rec);
    2061           31819 :   LSN previous_undo_lsn= lsn_korr(rec->header);
    2062                 :   MARIA_SHARE *share;
    2063                 :   const uchar *record_ptr;
    2064                 : 
    2065           31819 :   if (info == NULL)
    2066                 :   {
    2067                 :     /*
    2068                 :       Unlike for REDOs, if the table was skipped it is abnormal; we have a
    2069                 :       transaction to rollback which used this table, as it is not rolled back
    2070                 :       it was supposed to hold this table and so the table should still be
    2071                 :       there. Skip it (user may have repaired the table with maria_chk because
    2072                 :       it was so badly corrupted that a previous recovery failed) but warn.
    2073                 :     */
    2074               0 :     skip_undo_record(previous_undo_lsn, trn);
    2075               0 :     return 0;
    2076                 :   }
    2077           31819 :   share= info->s;
    2078           31819 :   share->state.changed|= (STATE_CHANGED | STATE_NOT_ANALYZED |
    2079                 :                           STATE_NOT_OPTIMIZED_ROWS | STATE_NOT_ZEROFILLED |
    2080                 :                           STATE_NOT_MOVABLE);
    2081           31819 :   record_ptr= rec->header; /* default source; replaced by the full record below when a checksum is needed */
    2082           31819 :   if (share->calc_checksum)
    2083                 :   {
    2084                 :     /*
    2085                 :       We need to read more of the record to put the checksum into the record
    2086                 :       buffer used by _ma_apply_undo_row_insert().
    2087                 :       If the table has no live checksum, rec->header will be enough.
    2088                 :     */
    2089           31819 :     enlarge_buffer(rec);
    2090           31819 :     if (log_record_buffer.str == NULL ||
    2091                 :         translog_read_record(rec->lsn, 0, rec->record_length,
    2092                 :                              log_record_buffer.str, NULL) !=
    2093                 :         rec->record_length)
    2094                 :     {
    2095               0 :       eprint(tracef, "Failed to read record");
    2096               0 :       return 1;
    2097                 :     }
    2098           31819 :     record_ptr= log_record_buffer.str;
    2099                 :   }
    2100                 : 
    2101           31819 :   info->trn= trn; /* attach the transaction to the handle only for the duration of the apply call */
    2102           31819 :   error= _ma_apply_undo_row_insert(info, previous_undo_lsn,
    2103                 :                                    record_ptr + LSN_STORE_SIZE +
    2104                 :                                    FILEID_STORE_SIZE);
    2105           31819 :   info->trn= 0;
    2106                 :   /* trn->undo_lsn is updated in an inwrite_hook when writing the CLR_END */
    2107           31819 :   tprint(tracef, "   rows' count %lu\n", (ulong)info->s->state.state.records);
    2108           31819 :   tprint(tracef, "   undo_lsn now LSN (%lu,0x%lx)\n",
    2109                 :          LSN_IN_PARTS(trn->undo_lsn));
    2110           31819 :   return error;
    2111                 : }
    2112                 : 
    2113                 : 
    2114                 : prototype_undo_exec_hook(UNDO_ROW_DELETE) /* UNDO-phase hook for an UNDO_ROW_DELETE record: rolls the delete back via _ma_apply_undo_row_delete(); rec and trn are supplied by the macro (definition not visible here) */
    2115           13914 : { /* returns 0 when the record is skipped, 1 if the record cannot be read, else the result of _ma_apply_undo_row_delete() */
    2116                 :   my_bool error;
    2117           13914 :   MARIA_HA *info= get_MARIA_HA_from_UNDO_record(rec);
    2118           13914 :   LSN previous_undo_lsn= lsn_korr(rec->header);
    2119                 :   MARIA_SHARE *share;
    2120                 : 
    2121           13914 :   if (info == NULL)
    2122                 :   {
    2123               0 :     skip_undo_record(previous_undo_lsn, trn); /* no table handle: step the transaction past this record instead of executing it */
    2124               0 :     return 0;
    2125                 :   }
    2126                 : 
    2127           13914 :   share= info->s;
    2128           13914 :   share->state.changed|= (STATE_CHANGED | STATE_NOT_ANALYZED |
    2129                 :                           STATE_NOT_ZEROFILLED | STATE_NOT_MOVABLE);
    2130           13914 :   enlarge_buffer(rec); /* the whole record is always read here, unlike the UNDO_ROW_INSERT undo hook */
    2131           13914 :   if (log_record_buffer.str == NULL ||
    2132                 :       translog_read_record(rec->lsn, 0, rec->record_length,
    2133                 :                            log_record_buffer.str, NULL) !=
    2134                 :        rec->record_length)
    2135                 :   {
    2136               0 :     eprint(tracef, "Failed to read record");
    2137               0 :     return 1;
    2138                 :   }
    2139                 : 
    2140           13914 :   info->trn= trn; /* attach the transaction to the handle only for the duration of the apply call */
    2141           13914 :   error= _ma_apply_undo_row_delete(info, previous_undo_lsn,
    2142                 :                                    log_record_buffer.str + LSN_STORE_SIZE +
    2143                 :                                    FILEID_STORE_SIZE,
    2144                 :                                    rec->record_length -
    2145                 :                                    (LSN_STORE_SIZE + FILEID_STORE_SIZE)); /* pointer and length both exclude the leading LSN and file id */
    2146           13914 :   info->trn= 0;
    2147           13914 :   tprint(tracef, "   rows' count %lu\n   undo_lsn now LSN (%lu,0x%lx)\n",
    2148                 :          (ulong)share->state.state.records, LSN_IN_PARTS(trn->undo_lsn));
    2149           13914 :   return error;
    2150                 : }
    2151                 : 
    2152                 : 
    2153                 : prototype_undo_exec_hook(UNDO_ROW_UPDATE) /* UNDO-phase hook for an UNDO_ROW_UPDATE record: rolls the update back via _ma_apply_undo_row_update(); rec and trn are supplied by the macro (definition not visible here) */
    2154            1671 : { /* returns 0 when the record is skipped, 1 if the record cannot be read, else the result of _ma_apply_undo_row_update() */
    2155                 :   my_bool error;
    2156            1671 :   MARIA_HA *info= get_MARIA_HA_from_UNDO_record(rec);
    2157            1671 :   LSN previous_undo_lsn= lsn_korr(rec->header);
    2158                 :   MARIA_SHARE *share;
    2159                 : 
    2160            1671 :   if (info == NULL)
    2161                 :   {
    2162               0 :     skip_undo_record(previous_undo_lsn, trn); /* no table handle: step the transaction past this record instead of executing it */
    2163               0 :     return 0;
    2164                 :   }
    2165                 : 
    2166            1671 :   share= info->s;
    2167            1671 :   share->state.changed|= (STATE_CHANGED | STATE_NOT_ANALYZED |
    2168                 :                           STATE_NOT_ZEROFILLED | STATE_NOT_MOVABLE);
    2169            1671 :   enlarge_buffer(rec); /* the whole record is always read here */
    2170            1671 :   if (log_record_buffer.str == NULL ||
    2171                 :       translog_read_record(rec->lsn, 0, rec->record_length,
    2172                 :                            log_record_buffer.str, NULL) !=
    2173                 :        rec->record_length)
    2174                 :   {
    2175               0 :     eprint(tracef, "Failed to read record");
    2176               0 :     return 1;
    2177                 :   }
    2178                 : 
    2179            1671 :   info->trn= trn; /* attach the transaction to the handle only for the duration of the apply call */
    2180            1671 :   error= _ma_apply_undo_row_update(info, previous_undo_lsn,
    2181                 :                                    log_record_buffer.str + LSN_STORE_SIZE +
    2182                 :                                    FILEID_STORE_SIZE,
    2183                 :                                    rec->record_length -
    2184                 :                                    (LSN_STORE_SIZE + FILEID_STORE_SIZE)); /* pointer and length both exclude the leading LSN and file id */
    2185            1671 :   info->trn= 0;
    2186            1671 :   tprint(tracef, "   undo_lsn now LSN (%lu,0x%lx)\n",
    2187                 :          LSN_IN_PARTS(trn->undo_lsn));
    2188            1671 :   return error;
    2189                 : }
    2190                 : /* Undo-phase executor for an UNDO_KEY_INSERT record: reads the record body and hands its payload to _ma_apply_undo_key_insert(). */
    2191                 : /* Returns 0 when no table handler is found for the record, 1 when the record cannot be read, otherwise the apply call's result. */
    2192                 : prototype_undo_exec_hook(UNDO_KEY_INSERT) /* rec and trn are not declared in this body; presumably parameters supplied by the macro - confirm */
    2193          200674 : {
    2194                 :   my_bool error;
    2195          200674 :   MARIA_HA *info= get_MARIA_HA_from_UNDO_record(rec);
    2196          200674 :   LSN previous_undo_lsn= lsn_korr(rec->header); /* LSN decoded from the start of the record header */
    2197                 :   MARIA_SHARE *share;
    2198                 : 
    2199          200674 :   if (info == NULL)
    2200                 :   {
    2201               0 :     skip_undo_record(previous_undo_lsn, trn); /* nothing is applied for this record */
    2202               0 :     return 0;
    2203                 :   }
    2204                 : 
    2205          200674 :   share= info->s;
    2206          200674 :   share->state.changed|= (STATE_CHANGED | STATE_NOT_ANALYZED | /* flag the table state before modifying it */
    2207                 :                           STATE_NOT_ZEROFILLED | STATE_NOT_MOVABLE);
    2208                 : 
    2209          200674 :   enlarge_buffer(rec); /* log_record_buffer.str is tested for NULL just below, so this presumably can fail to allocate - confirm */
    2210          200674 :   if (log_record_buffer.str == NULL ||
    2211                 :       translog_read_record(rec->lsn, 0, rec->record_length, /* read the whole record, from offset 0 */
    2212                 :                            log_record_buffer.str, NULL) !=
    2213                 :         rec->record_length)
    2214                 :   {
    2215               0 :     eprint(tracef, "Failed to read record");
    2216               0 :     return 1;
    2217                 :   }
    2218                 : 
    2219          200674 :   info->trn= trn; /* handler points at trn only for the duration of the apply call */
    2220          200674 :   error= _ma_apply_undo_key_insert(info, previous_undo_lsn,
    2221                 :                                    log_record_buffer.str + LSN_STORE_SIZE + /* skip LSN_STORE_SIZE + FILEID_STORE_SIZE leading bytes */
    2222                 :                                    FILEID_STORE_SIZE,
    2223                 :                                    rec->record_length - LSN_STORE_SIZE - /* length is reduced by the same amount */
    2224                 :                                    FILEID_STORE_SIZE);
    2225          200674 :   info->trn= 0; /* detach trn again */
    2226                 :   /* trn->undo_lsn is updated in an inwrite_hook when writing the CLR_END */
    2227          200674 :   tprint(tracef, "   undo_lsn now LSN (%lu,0x%lx)\n",
    2228                 :          LSN_IN_PARTS(trn->undo_lsn));
    2229          200674 :   return error;
    2230                 : }
    2231                 : /* Undo-phase executor for an UNDO_KEY_DELETE record: reads the record body and hands its payload to _ma_apply_undo_key_delete(). */
    2232                 : /* Returns 0 when no table handler is found for the record, 1 when the record cannot be read, otherwise the apply call's result. */
    2233                 : prototype_undo_exec_hook(UNDO_KEY_DELETE) /* rec and trn are not declared in this body; presumably parameters supplied by the macro - confirm */
    2234           91912 : {
    2235                 :   my_bool error;
    2236           91912 :   MARIA_HA *info= get_MARIA_HA_from_UNDO_record(rec);
    2237           91912 :   LSN previous_undo_lsn= lsn_korr(rec->header); /* LSN decoded from the start of the record header */
    2238                 :   MARIA_SHARE *share;
    2239                 : 
    2240           91912 :   if (info == NULL)
    2241                 :   {
    2242               0 :     skip_undo_record(previous_undo_lsn, trn); /* nothing is applied for this record */
    2243               0 :     return 0;
    2244                 :   }
    2245                 : 
    2246           91912 :   share= info->s;
    2247           91912 :   share->state.changed|= (STATE_CHANGED | STATE_NOT_ANALYZED | /* flag the table state before modifying it */
    2248                 :                           STATE_NOT_ZEROFILLED | STATE_NOT_MOVABLE);
    2249                 : 
    2250           91912 :   enlarge_buffer(rec); /* log_record_buffer.str is tested for NULL just below, so this presumably can fail to allocate - confirm */
    2251           91912 :   if (log_record_buffer.str == NULL ||
    2252                 :       translog_read_record(rec->lsn, 0, rec->record_length, /* read the whole record, from offset 0 */
    2253                 :                            log_record_buffer.str, NULL) !=
    2254                 :         rec->record_length)
    2255                 :   {
    2256               0 :     eprint(tracef, "Failed to read record");
    2257               0 :     return 1;
    2258                 :   }
    2259                 : 
    2260           91912 :   info->trn= trn; /* handler points at trn only for the duration of the apply call */
    2261           91912 :   error= _ma_apply_undo_key_delete(info, previous_undo_lsn,
    2262                 :                                    log_record_buffer.str + LSN_STORE_SIZE + /* skip LSN_STORE_SIZE + FILEID_STORE_SIZE leading bytes */
    2263                 :                                    FILEID_STORE_SIZE,
    2264                 :                                    rec->record_length - LSN_STORE_SIZE -
    2265                 :                                    FILEID_STORE_SIZE, FALSE); /* the UNDO_KEY_DELETE_WITH_ROOT hook passes TRUE here; presumably a with-root flag - confirm */
    2266           91912 :   info->trn= 0; /* detach trn again */
    2267                 :   /* trn->undo_lsn is updated in an inwrite_hook when writing the CLR_END */
    2268           91912 :   tprint(tracef, "   undo_lsn now LSN (%lu,0x%lx)\n",
    2269                 :          LSN_IN_PARTS(trn->undo_lsn));
    2270           91912 :   return error;
    2271                 : }
    2272                 : /* Undo-phase executor for an UNDO_KEY_DELETE_WITH_ROOT record: same steps as the UNDO_KEY_DELETE hook, but calls _ma_apply_undo_key_delete() with TRUE as last argument. */
    2273                 : /* Returns 0 when no table handler is found for the record, 1 when the record cannot be read, otherwise the apply call's result. */
    2274                 : prototype_undo_exec_hook(UNDO_KEY_DELETE_WITH_ROOT) /* rec and trn are not declared in this body; presumably parameters supplied by the macro - confirm */
    2275             138 : {
    2276                 :   my_bool error;
    2277             138 :   MARIA_HA *info= get_MARIA_HA_from_UNDO_record(rec);
    2278             138 :   LSN previous_undo_lsn= lsn_korr(rec->header); /* LSN decoded from the start of the record header */
    2279                 :   MARIA_SHARE *share;
    2280                 : 
    2281             138 :   if (info == NULL)
    2282                 :   {
    2283               0 :     skip_undo_record(previous_undo_lsn, trn); /* nothing is applied for this record */
    2284               0 :     return 0;
    2285                 :   }
    2286                 : 
    2287             138 :   share= info->s;
    2288             138 :   share->state.changed|= (STATE_CHANGED | STATE_NOT_ANALYZED | /* flag the table state before modifying it */
    2289                 :                           STATE_NOT_ZEROFILLED | STATE_NOT_MOVABLE);
    2290                 : 
    2291             138 :   enlarge_buffer(rec); /* log_record_buffer.str is tested for NULL just below, so this presumably can fail to allocate - confirm */
    2292             138 :   if (log_record_buffer.str == NULL ||
    2293                 :       translog_read_record(rec->lsn, 0, rec->record_length, /* read the whole record, from offset 0 */
    2294                 :                            log_record_buffer.str, NULL) !=
    2295                 :         rec->record_length)
    2296                 :   {
    2297               0 :     eprint(tracef, "Failed to read record");
    2298               0 :     return 1;
    2299                 :   }
    2300                 : 
    2301             138 :   info->trn= trn; /* handler points at trn only for the duration of the apply call */
    2302             138 :   error= _ma_apply_undo_key_delete(info, previous_undo_lsn,
    2303                 :                                    log_record_buffer.str + LSN_STORE_SIZE + /* skip LSN_STORE_SIZE + FILEID_STORE_SIZE leading bytes */
    2304                 :                                    FILEID_STORE_SIZE,
    2305                 :                                    rec->record_length - LSN_STORE_SIZE -
    2306                 :                                    FILEID_STORE_SIZE, TRUE); /* the plain UNDO_KEY_DELETE hook passes FALSE here; presumably a with-root flag - confirm */
    2307             138 :   info->trn= 0; /* detach trn again */
    2308                 :   /* trn->undo_lsn is updated in an inwrite_hook when writing the CLR_END */
    2309             138 :   tprint(tracef, "   undo_lsn now LSN (%lu,0x%lx)\n",
    2310                 :          LSN_IN_PARTS(trn->undo_lsn));
    2311             138 :   return error;
    2312                 : }
    2313                 : /* Undo-phase executor for an UNDO_BULK_INSERT record: calls _ma_apply_undo_bulk_insert() without reading the record body. */
    2314                 : /* Returns 0 when no table handler is found for the record, otherwise the apply call's result. */
    2315                 : prototype_undo_exec_hook(UNDO_BULK_INSERT) /* rec and trn are not declared in this body; presumably parameters supplied by the macro - confirm */
    2316               0 : {
    2317                 :   my_bool error;
    2318               0 :   MARIA_HA *info= get_MARIA_HA_from_UNDO_record(rec);
    2319               0 :   LSN previous_undo_lsn= lsn_korr(rec->header); /* LSN decoded from the start of the record header */
    2320                 :   MARIA_SHARE *share;
    2321                 : 
    2322               0 :   if (info == NULL)
    2323                 :   {
    2324               0 :     skip_undo_record(previous_undo_lsn, trn); /* nothing is applied for this record */
    2325               0 :     return 0;
    2326                 :   }
    2327                 : 
    2328               0 :   share= info->s;
    2329               0 :   share->state.changed|= (STATE_CHANGED | STATE_NOT_ANALYZED | /* flag the table state before modifying it */
    2330                 :                           STATE_NOT_ZEROFILLED | STATE_NOT_MOVABLE);
    2331                 : 
    2332               0 :   info->trn= trn; /* handler points at trn only for the duration of the apply call */
    2333               0 :   error= _ma_apply_undo_bulk_insert(info, previous_undo_lsn); /* only the handler and the previous undo LSN are passed */
    2334               0 :   info->trn= 0; /* detach trn again */
    2335                 :   /* trn->undo_lsn is updated in an inwrite_hook when writing the CLR_END */
    2336               0 :   tprint(tracef, "   undo_lsn now LSN (%lu,0x%lx)\n",
    2337                 :          LSN_IN_PARTS(trn->undo_lsn));
    2338               0 :   return error;
    2339                 : }
    2340                 : /* REDO phase: installs the per-record-type hooks, then scans the log forward from lsn. When a record ends a group, the group is re-read from its start and, if apply == MARIA_LOG_APPLY, executed; with MARIA_LOG_CHECK the record bodies are only read. */
    2341                 : /* Returns 0 on success (including "nothing to do"), 1 on any read, scanner-init or apply error. */
    2342                 : static int run_redo_phase(LSN lsn, enum maria_apply_log_way apply)
    2343             310 : {
    2344                 :   TRANSLOG_HEADER_BUFFER rec;
    2345                 :   struct st_translog_scanner_data scanner;
    2346                 :   int len;
    2347                 :   uint i;
    2348                 : 
    2349                 :   /* install hooks for execution */
    2350                 : #define install_redo_exec_hook(R)                                        \
    2351                 :   log_record_type_descriptor[LOGREC_ ## R].record_execute_in_redo_phase= \
    2352                 :     exec_REDO_LOGREC_ ## R;
    2353                 : #define install_redo_exec_hook_shared(R,S)                               \
    2354                 :   log_record_type_descriptor[LOGREC_ ## R].record_execute_in_redo_phase= \
    2355                 :     exec_REDO_LOGREC_ ## S;
    2356                 : #define install_undo_exec_hook(R)                                        \
    2357                 :   log_record_type_descriptor[LOGREC_ ## R].record_execute_in_undo_phase= \
    2358                 :     exec_UNDO_LOGREC_ ## R;
    2359             310 :   install_redo_exec_hook(LONG_TRANSACTION_ID);
    2360             310 :   install_redo_exec_hook(CHECKPOINT);
    2361             310 :   install_redo_exec_hook(REDO_CREATE_TABLE);
    2362             310 :   install_redo_exec_hook(REDO_RENAME_TABLE);
    2363             310 :   install_redo_exec_hook(REDO_REPAIR_TABLE);
    2364             310 :   install_redo_exec_hook(REDO_DROP_TABLE);
    2365             310 :   install_redo_exec_hook(FILE_ID);
    2366             310 :   install_redo_exec_hook(INCOMPLETE_LOG);
    2367             310 :   install_redo_exec_hook(INCOMPLETE_GROUP);
    2368             310 :   install_redo_exec_hook(REDO_INSERT_ROW_HEAD);
    2369             310 :   install_redo_exec_hook(REDO_INSERT_ROW_TAIL);
    2370             310 :   install_redo_exec_hook(REDO_INSERT_ROW_BLOBS);
    2371             310 :   install_redo_exec_hook(REDO_PURGE_ROW_HEAD);
    2372             310 :   install_redo_exec_hook(REDO_PURGE_ROW_TAIL);
    2373             310 :   install_redo_exec_hook(REDO_FREE_HEAD_OR_TAIL);
    2374             310 :   install_redo_exec_hook(REDO_FREE_BLOCKS);
    2375             310 :   install_redo_exec_hook(REDO_DELETE_ALL);
    2376             310 :   install_redo_exec_hook(REDO_INDEX);
    2377             310 :   install_redo_exec_hook(REDO_INDEX_NEW_PAGE);
    2378             310 :   install_redo_exec_hook(REDO_INDEX_FREE_PAGE);
    2379             310 :   install_redo_exec_hook(REDO_BITMAP_NEW_PAGE);
    2380             310 :   install_redo_exec_hook(UNDO_ROW_INSERT); /* UNDO_* record types get a redo-phase hook here and an undo-phase hook below */
    2381             310 :   install_redo_exec_hook(UNDO_ROW_DELETE);
    2382             310 :   install_redo_exec_hook(UNDO_ROW_UPDATE);
    2383             310 :   install_redo_exec_hook(UNDO_KEY_INSERT);
    2384             310 :   install_redo_exec_hook(UNDO_KEY_DELETE);
    2385             310 :   install_redo_exec_hook(UNDO_KEY_DELETE_WITH_ROOT);
    2386             310 :   install_redo_exec_hook(COMMIT);
    2387             310 :   install_redo_exec_hook(CLR_END);
    2388             310 :   install_undo_exec_hook(UNDO_ROW_INSERT);
    2389             310 :   install_undo_exec_hook(UNDO_ROW_DELETE);
    2390             310 :   install_undo_exec_hook(UNDO_ROW_UPDATE);
    2391             310 :   install_undo_exec_hook(UNDO_KEY_INSERT);
    2392             310 :   install_undo_exec_hook(UNDO_KEY_DELETE);
    2393             310 :   install_undo_exec_hook(UNDO_KEY_DELETE_WITH_ROOT);
    2394                 :   /* REDO_NEW_ROW_HEAD shares entry with REDO_INSERT_ROW_HEAD */
    2395             310 :   install_redo_exec_hook_shared(REDO_NEW_ROW_HEAD, REDO_INSERT_ROW_HEAD);
    2396                 :   /* REDO_NEW_ROW_TAIL shares entry with REDO_INSERT_ROW_TAIL */
    2397             310 :   install_redo_exec_hook_shared(REDO_NEW_ROW_TAIL, REDO_INSERT_ROW_TAIL);
    2398             310 :   install_redo_exec_hook(UNDO_BULK_INSERT);
    2399             310 :   install_undo_exec_hook(UNDO_BULK_INSERT);
    2400             310 :   install_redo_exec_hook(IMPORTED_TABLE);
    2401             310 :   install_redo_exec_hook(DEBUG_INFO);
    2402                 : 
    2403             310 :   current_group_end_lsn= LSN_IMPOSSIBLE; /* no group is being executed yet */
    2404                 : #ifndef DBUG_OFF
    2405             310 :   current_group_table= NULL;
    2406                 : #endif
    2407                 : 
    2408             310 :   if (unlikely(lsn == LSN_IMPOSSIBLE || lsn == translog_get_horizon())) /* no start point, or start point equals the current log horizon */
    2409                 :   {
    2410               3 :     tprint(tracef, "checkpoint address refers to the log end log or " /* NOTE(review): "log end log" looks like a typo in the message */
    2411                 :            "log is empty, nothing to do.\n");
    2412               3 :     return 0;
    2413                 :   }
    2414                 : 
    2415             307 :   len= translog_read_record_header(lsn, &rec); /* header of the first record to process */
    2416                 : 
    2417             307 :   if (len == RECHEADER_READ_ERROR)
    2418                 :   {
    2419               0 :     eprint(tracef, "Failed to read header of the first record.");
    2420               0 :     return 1;
    2421                 :   }
    2422             307 :   if (translog_scanner_init(lsn, 1, &scanner, 1))
    2423                 :   {
    2424               0 :     tprint(tracef, "Scanner init failed\n");
    2425               0 :     return 1; /* NOTE(review): unlike the err: path, this return does not call translog_free_record_header(&rec) - confirm whether the header read above needs freeing */
    2426                 :   }
    2427         3741798 :   for (i= 1;;i++) /* i is only passed to display_record_position(); the loop ends via break (EOF) or goto err */
    2428                 :   {
    2429         3741798 :     uint16 sid= rec.short_trid; /* index into all_active_trans */
    2430         3741798 :     const LOG_DESC *log_desc= &log_record_type_descriptor[rec.type];
    2431         3741798 :     display_record_position(log_desc, &rec, i);
    2432                 :     /*
    2433                 :       A complete group is a set of log records with an "end mark" record
    2434                 :       (e.g. a set of REDOs for an operation, terminated by an UNDO for this
    2435                 :       operation); if there is no "end mark" record the group is incomplete and
    2436                 :       won't be executed.
    2437                 :     */
    2438         5593278 :     if ((log_desc->record_in_group == LOGREC_IS_GROUP_ITSELF) ||
    2439                 :         (log_desc->record_in_group == LOGREC_LAST_IN_GROUP))
    2440                 :     {
    2441         1851480 :       if (all_active_trans[sid].group_start_lsn != LSN_IMPOSSIBLE) /* this short trid has earlier, not yet executed records */
    2442                 :       {
    2443         1849188 :         if (log_desc->record_in_group == LOGREC_IS_GROUP_ITSELF)
    2444                 :         {
    2445                 :           /*
    2446                 :             Can happen if the transaction got a table write error, then
    2447                 :             unlocked tables thus wrote a COMMIT record. Or can be an
    2448                 :             INCOMPLETE_GROUP record written by a previous recovery.
    2449                 :           */
    2450               0 :           tprint(tracef, "\nDiscarding incomplete group before this record\n");
    2451               0 :           all_active_trans[sid].group_start_lsn= LSN_IMPOSSIBLE;
    2452                 :         }
    2453                 :         else
    2454                 :         {
    2455                 :           struct st_translog_scanner_data scanner2;
    2456                 :           TRANSLOG_HEADER_BUFFER rec2;
    2457                 :           /*
    2458                 :             There is a complete group for this transaction, containing more
    2459                 :             than this event.
    2460                 :           */
    2461         1849188 :           tprint(tracef, "   ends a group:\n");
    2462         1849188 :           len=
    2463                 :             translog_read_record_header(all_active_trans[sid].group_start_lsn, /* go back to the first record of the group */
    2464                 :                                         &rec2);
    2465         1849188 :           if (len < 0) /* EOF or error */
    2466                 :           {
    2467               0 :             tprint(tracef, "Cannot find record where it should be\n");
    2468               0 :             goto err;
    2469                 :           }
    2470         1849188 :           if (translog_scanner_init(rec2.lsn, 1, &scanner2, 1))
    2471                 :           {
    2472               0 :             tprint(tracef, "Scanner2 init failed\n");
    2473               0 :             goto err; /* NOTE(review): rec2's header is not freed on this path, while the later error paths free it - confirm whether this leaks */
    2474                 :           }
    2475         1849188 :           current_group_end_lsn= rec.lsn; /* the group being executed ends at the current record */
    2476                 :           do
    2477                 :           {
    2478         1890318 :             if (rec2.short_trid == sid) /* it's in our group */
    2479                 :             {
    2480         1890318 :               const LOG_DESC *log_desc2= &log_record_type_descriptor[rec2.type];
    2481         1890318 :               display_record_position(log_desc2, &rec2, 0);
    2482         1890318 :               if (apply == MARIA_LOG_CHECK) /* check mode: only verify that the record body can be read */
    2483                 :               {
    2484                 :                 translog_size_t read_len;
    2485               0 :                 enlarge_buffer(&rec2); /* NOTE(review): log_record_buffer.str is not tested for NULL here, unlike in the undo hooks of this file - confirm */
    2486               0 :                 read_len=
    2487                 :                   translog_read_record(rec2.lsn, 0, rec2.record_length,
    2488                 :                                        log_record_buffer.str, NULL);
    2489               0 :                 if (read_len != rec2.record_length)
    2490                 :                 {
    2491               0 :                   tprint(tracef, "Cannot read record's body: read %u of"
    2492                 :                          " %u bytes\n", read_len, rec2.record_length);
    2493               0 :                   translog_destroy_scanner(&scanner2);
    2494               0 :                   translog_free_record_header(&rec2);
    2495               0 :                   goto err;
    2496                 :                 }
    2497                 :               }
    2498         1890318 :               if (apply == MARIA_LOG_APPLY && /* apply mode: execute the record; non-zero result is an error */
    2499                 :                   display_and_apply_record(log_desc2, &rec2))
    2500                 :               {
    2501               0 :                 translog_destroy_scanner(&scanner2);
    2502               0 :                 translog_free_record_header(&rec2);
    2503               0 :                 goto err;
    2504                 :               }
    2505                 :             }
    2506         1890318 :             translog_free_record_header(&rec2);
    2507         1890318 :             len= translog_read_next_record_header(&scanner2, &rec2); /* advance the inner scan */
    2508         1890318 :             if (len < 0) /* EOF or error */
    2509                 :             {
    2510               0 :               tprint(tracef, "Cannot find record where it should be\n");
    2511               0 :               translog_destroy_scanner(&scanner2);
    2512               0 :               translog_free_record_header(&rec2);
    2513               0 :               goto err;
    2514                 :             }
    2515                 :           }
    2516         1890318 :           while (rec2.lsn < rec.lsn); /* stop once the inner scan reaches the group-ending record */
    2517                 :           /* group finished */
    2518         1849188 :           all_active_trans[sid].group_start_lsn= LSN_IMPOSSIBLE;
    2519         1849188 :           current_group_end_lsn= LSN_IMPOSSIBLE; /* for debugging */
    2520         1849188 :           display_record_position(log_desc, &rec, 0);
    2521         1849188 :           translog_destroy_scanner(&scanner2);
    2522         1849188 :           translog_free_record_header(&rec2);
    2523                 :         }
    2524                 :       }
    2525         1851480 :       if (apply == MARIA_LOG_APPLY && /* now execute the group-ending (or standalone) record itself */
    2526                 :           display_and_apply_record(log_desc, &rec))
    2527         1851480 :         goto err;
    2528                 : #ifndef DBUG_OFF
    2529         1851480 :       current_group_table= NULL;
    2530                 : #endif
    2531                 :     }
    2532                 :     else /* record does not end group */
    2533                 :     {
    2534                 :       /* just record the fact, can't know if can execute yet */
    2535         1890318 :       if (all_active_trans[sid].group_start_lsn == LSN_IMPOSSIBLE)
    2536                 :       {
    2537                 :         /* group not yet started */
    2538         1849188 :         all_active_trans[sid].group_start_lsn= rec.lsn; /* remember where this transaction's group begins */
    2539                 :       }
    2540                 :     }
    2541         3741798 :     translog_free_record_header(&rec);
    2542         3741798 :     len= translog_read_next_record_header(&scanner, &rec); /* advance the main scan */
    2543         3741798 :     if (len < 0)
    2544                 :     {
    2545             307 :       switch (len) /* no default case: any other negative value also reaches the break after the switch */
    2546                 :       {
    2547                 :       case RECHEADER_READ_EOF:
    2548             307 :         tprint(tracef, "EOF on the log\n");
    2549             307 :         break;
    2550                 :       case RECHEADER_READ_ERROR:
    2551               0 :         tprint(tracef, "Error reading log\n");
    2552               0 :         goto err;
    2553                 :       }
    2554                 :       break; /* leaves the for loop */
    2555                 :     }
    2556         3741491 :   }
    2557             307 :   translog_destroy_scanner(&scanner);
    2558             307 :   translog_free_record_header(&rec);
    2559             307 :   if (recovery_message_printed == REC_MSG_REDO) /* a REDO progress message was started on stderr: finish it */
    2560                 :   {
    2561               7 :     fprintf(stderr, " 100%%");
    2562               7 :     fflush(stderr);
    2563               7 :     procent_printed= 1;
    2564                 :   }
    2565             307 :   return 0;
    2566                 : 
    2567               0 : err: /* common failure exit: release the main scanner and the current record header */
    2568               0 :   translog_destroy_scanner(&scanner);
    2569               0 :   translog_free_record_header(&rec);
    2570               0 :   return 1;
    2571                 : }
    2572                 : 
    2573                 : 
    2574                 : /**
    2575                 :    @brief Informs about any aborted groups or uncommitted transactions,
    2576                 :    prepares for the UNDO phase if needed.
    2577                 :    @param prepare_for_undo_phase  if true: init trnman, recreate a TRN for each uncommitted transaction, and assign ids to the open tables' shares
    2578                 :    @note Observe that it may init trnman.
    2579                 :    @return number of uncommitted transactions found; the error paths return -1, which becomes (uint)-1 in this uint function */
    2580                 : static uint end_of_redo_phase(my_bool prepare_for_undo_phase)
    2581             310 : {
    2582             310 :   uint sid, uncommitted= 0;
    2583                 :   char llbuf[22];
    2584                 :   LSN addr;
    2585                 : 
    2586             310 :   hash_free(&all_dirty_pages); /* the dirty pages hash and its pool are released here */
    2587                 :   /*
    2588                 :     hash_free() can be called multiple times probably, but be safe if that
    2589                 :     changes
    2590                 :   */
    2591             310 :   bzero(&all_dirty_pages, sizeof(all_dirty_pages));
    2592             310 :   my_free(dirty_pages_pool, MYF(MY_ALLOW_ZERO_PTR));
    2593             310 :   dirty_pages_pool= NULL;
    2594                 : 
    2595             310 :   llstr(max_long_trid, llbuf);
    2596             310 :   tprint(tracef, "Maximum transaction long id seen: %s\n", llbuf);
    2597             310 :   llstr(max_trid_in_control_file, llbuf);
    2598             310 :   tprint(tracef, "Maximum transaction long id seen in control file: %s\n",
    2599                 :          llbuf);
    2600                 :   /*
    2601                 :     If logs were deleted, or lost, trid in control file is needed to set
    2602                 :     trnman's generator:
    2603                 :   */
    2604             310 :   set_if_bigger(max_long_trid, max_trid_in_control_file);
    2605             310 :   if (prepare_for_undo_phase && trnman_init(max_long_trid))
    2606               0 :     return -1; /* NOTE(review): callers presumably compare the result against (uint)-1 - confirm */
    2607                 : 
    2608             310 :   trns_created= TRUE;
    2609                 : 
    2610        20316470 :   for (sid= 0; sid <= SHORT_TRID_MAX; sid++) /* visit every possible short transaction id */
    2611                 :   {
    2612        20316160 :     TrID long_trid= all_active_trans[sid].long_trid;
    2613        20316160 :     LSN gslsn= all_active_trans[sid].group_start_lsn; /* copy taken before the field is reset below; tested again further down */
    2614                 :     TRN *trn;
    2615        20316160 :     if (gslsn != LSN_IMPOSSIBLE)
    2616                 :     {
    2617               0 :       tprint(tracef, "Group at LSN (%lu,0x%lx) short_trid %u incomplete\n",
    2618                 :              LSN_IN_PARTS(gslsn), sid);
    2619               0 :       all_active_trans[sid].group_start_lsn= LSN_IMPOSSIBLE;
    2620                 :     }
    2621        20316160 :     if (all_active_trans[sid].undo_lsn != LSN_IMPOSSIBLE) /* an undo LSN is recorded: reported and counted as uncommitted */
    2622                 :     {
    2623              83 :       llstr(long_trid, llbuf);
    2624              83 :       tprint(tracef, "Transaction long_trid %s short_trid %u uncommitted\n",
    2625                 :              llbuf, sid);
    2626                 :       /*
    2627                 :         dummy_transaction_object serves only for DDLs, where there is never a
    2628                 :         rollback or incomplete group. And unknown transactions (which have
    2629                 :         long_trid==0) should have undo_lsn==LSN_IMPOSSIBLE.
    2630                 :       */
    2631              83 :       if (long_trid ==0)
    2632                 :       {
    2633               0 :         eprint(tracef, "Transaction with long_trid 0 should not roll back");
    2634                 :         ALERT_USER();
    2635               0 :         return -1;
    2636                 :       }
    2637              83 :       if (prepare_for_undo_phase)
    2638                 :       {
    2639              83 :         if ((trn= trnman_recreate_trn_from_recovery(sid, long_trid)) == NULL)
    2640               0 :           return -1;
    2641              83 :         trn->undo_lsn= all_active_trans[sid].undo_lsn; /* hand the undo chain position over to the recreated TRN */
    2642              83 :         trn->first_undo_lsn= all_active_trans[sid].first_undo_lsn |
    2643                 :           TRANSACTION_LOGGED_LONG_ID; /* because trn is known in log */
    2644              83 :         if (gslsn != LSN_IMPOSSIBLE) /* the group was incomplete (uses the copy taken before the reset above) */
    2645                 :         {
    2646                 :           /*
    2647                 :             UNDO phase will log some records. So, a future recovery may see:
    2648                 :             REDO(from incomplete group) - REDO(from rollback) - CLR_END
    2649                 :             and thus execute the first REDO (finding it in "a complete
    2650                 :             group"). To prevent that:
    2651                 :           */
    2652                 :           LEX_CUSTRING log_array[TRANSLOG_INTERNAL_PARTS];
    2653                 :           LSN lsn;
    2654               0 :           if (translog_write_record(&lsn, LOGREC_INCOMPLETE_GROUP, /* written with length 0 and only the internal parts of log_array */
    2655                 :                                     trn, NULL, 0,
    2656                 :                                     TRANSLOG_INTERNAL_PARTS, log_array,
    2657                 :                                     NULL, NULL))
    2658               0 :             return -1;
    2659                 :         }
    2660                 :       }
    2661              83 :       uncommitted++;
    2662                 :     }
    2663                 : #ifdef MARIA_VERSIONING
    2664                 :     /*
    2665                 :       If real recovery: if transaction was committed, move it to some separate
    2666                 :       list for soon purging.
    2667                 :     */
    2668                 : #endif
    2669                 :   }
    2670                 : 
    2671             310 :   my_free(all_active_trans, MYF(MY_ALLOW_ZERO_PTR)); /* only reached when no error return was taken above */
    2672             310 :   all_active_trans= NULL;
    2673                 : 
    2674                 :   /*
    2675                 :     The UNDO phase uses some normal run-time code of ROLLBACK: generates log
    2676                 :     records, etc; prepare tables for that
    2677                 :   */
    2678             310 :   addr= translog_get_horizon();
    2679        20316470 :   for (sid= 0; sid <= SHARE_ID_MAX; sid++) /* sid is reused here as an index into all_tables */
    2680                 :   {
    2681        20316160 :     MARIA_HA *info= all_tables[sid].info;
    2682        20316160 :     if (info != NULL)
    2683                 :     {
    2684             307 :       prepare_table_for_close(info, addr);
    2685                 :       /*
    2686                 :         But we don't close it; we leave it available for the UNDO phase;
    2687                 :         it's likely that the UNDO phase will need it.
    2688                 :       */
    2689             307 :       if (prepare_for_undo_phase)
    2690             307 :         translog_assign_id_to_share_from_recovery(info->s, sid);
    2691                 :     }
    2692                 :   }
    2693             310 :   return uncommitted;
    2694                 : }
    2695                 : 
    2696                 : /* Rolls back the remaining uncommitted transactions one at a time: executes every undo record of each transaction, then calls trnman_rollback_trn(). Returns 0 on success, 1 on the first error. */
    2697                 : static int run_undo_phase(uint uncommitted)
    2698             310 : {
    2699                 :   LSN last_undo;
    2700             310 :   DBUG_ENTER("run_undo_phase");
    2701                 : 
    2702             310 :   if (uncommitted > 0)
    2703                 :   {
    2704              83 :     checkpoint_useful= TRUE;
    2705              83 :     if (tracef != stdout)
    2706                 :     {
    2707               3 :       if (recovery_message_printed == REC_MSG_NONE)
    2708               0 :         print_preamble();
    2709               3 :       fprintf(stderr, "transactions to roll back:");
    2710               3 :       recovery_message_printed= REC_MSG_UNDO;
    2711                 :     }
    2712              83 :     tprint(tracef, "%u transactions will be rolled back\n", uncommitted);
    2713              83 :     procent_printed= 1;
    2714                 :     for( ; ; )
    2715                 :     {
    2716                 :       char llbuf[22];
    2717                 :       TRN *trn;
    2718             166 :       if (recovery_message_printed == REC_MSG_UNDO)
    2719                 :       {
    2720               6 :         fprintf(stderr, " %u", uncommitted);
    2721               6 :         fflush(stderr);
    2722                 :       }
    2723             166 :       if ((uncommitted--) == 0)
    2724              83 :         break;
    2725              83 :       trn= trnman_get_any_trn();
    2726              83 :       DBUG_ASSERT(trn != NULL);
    2727              83 :       llstr(trn->trid, llbuf);
    2728              83 :       tprint(tracef, "Rolling back transaction of long id %s\n", llbuf);
    2729              83 :       last_undo= trn->undo_lsn + 1;
    2730                 : 
    2731                 :       /* Execute all undo entries: loop until trn->undo_lsn is 0; the assertion below checks that undo_lsn strictly decreases at every iteration */
    2732          340294 :       while (trn->undo_lsn)
    2733                 :       {
    2734                 :         TRANSLOG_HEADER_BUFFER rec;
    2735                 :         LOG_DESC *log_desc;
    2736          340128 :         DBUG_ASSERT(trn->undo_lsn < last_undo);
    2737          340128 :         last_undo= trn->undo_lsn;
    2738                 : 
    2739          340128 :         if (translog_read_record_header(trn->undo_lsn, &rec) ==
    2740                 :             RECHEADER_READ_ERROR)
    2741               0 :           DBUG_RETURN(1);
    2742          340128 :         log_desc= &log_record_type_descriptor[rec.type];
    2743          340128 :         display_record_position(log_desc, &rec, 0);
    2744          340128 :         if (log_desc->record_execute_in_undo_phase(&rec, trn))
    2745                 :         {
    2746               0 :           eprint(tracef, "Got error %d when executing undo %s", my_errno,
    2747                 :                  log_desc->name);
    2748               0 :           translog_free_record_header(&rec);
    2749               0 :           DBUG_RETURN(1);
    2750                 :         }
    2751          340128 :         translog_free_record_header(&rec);
    2752                 :       }
    2753                 : 
    2754              83 :       if (trnman_rollback_trn(trn))
    2755               0 :         DBUG_RETURN(1);
    2756                 :       /* We may want to spawn a few threads (4?) instead of using only 1 */
    2757                 :       /* In the future, we want to have this phase *online* */
    2758                 :     }
    2759                 :   }
    2760             310 :   procent_printed= 0;
    2761             310 :   DBUG_RETURN(0);
    2762                 : }
    2763                 : 
    2764                 : 
    2765                 : /**
    2766                 :   In case of error in recovery, deletes all transactions from the transaction
    2767                 :   manager so that this module does not assert. Loops on trnman_get_any_trn()
    2768                 :   until it returns NULL; each TRN has its undo LSNs reset before rollback.
    2769                 :   @note no checkpoint should be taken as those transactions matter for the
    2770                 :   next recovery (they still haven't been properly dealt with).
    2771                 : */
    2772                 : /* NOTE(review): declared with an empty parameter list; (void) would make it a prototype in C */
    2773                 : static void delete_all_transactions()
    2774               0 : {
    2775                 :   for( ; ; )
    2776                 :   {
    2777               0 :     TRN *trn= trnman_get_any_trn();
    2778               0 :     if (trn == NULL)
    2779               0 :       break;
    2780               0 :     trn->undo_lsn= trn->first_undo_lsn= LSN_IMPOSSIBLE;
    2781               0 :     trnman_rollback_trn(trn); /* best effort: return value deliberately ignored */
    2782               0 :   }
    2783                 : }
    2784                 : 
    2785                 : 
    2786                 : /**
    2787                 :    @brief Re-enables transactionality of a table about to be closed; raises state.is_of_horizon to horizon (and writes the state) if it is older
    2788                 : 
    2789                 :    @param  info                table
    2790                 :    @param  horizon             address to set is_of_horizon to
    2791                 : */
    2792                 : 
    2793                 : static void prepare_table_for_close(MARIA_HA *info, TRANSLOG_ADDRESS horizon)
    2794             690 : {
    2795             690 :   MARIA_SHARE *share= info->s;
    2796                 :   /*
    2797                 :     In a fully-forward REDO phase (no checkpoint record),
    2798                 :     state is now at least as new as the LSN of the current record. It may be
    2799                 :     newer, in case we are seeing a LOGREC_FILE_ID which tells us to close a
    2800                 :     table, but that table was later modified further in the log.
    2801                 :     But if we parsed a checkpoint record, it may be this way in the log:
    2802                 :     FILE_ID(6->t2)... FILE_ID(6->t1)... CHECKPOINT(6->t1)
    2803                 :     Checkpoint parsing opened t1 with id 6; the first FILE_ID above is going to
    2804                 :     make t1 close; the first condition below is however false (when the checkpoint
    2805                 :     was taken it increased is_of_horizon) and so it works. For safety we
    2806                 :     add the second condition (lsn_of_file_id must also be older than horizon).
    2807                 :   */
    2808             690 :   if (cmp_translog_addr(share->state.is_of_horizon, horizon) < 0 &&
    2809                 :       cmp_translog_addr(share->lsn_of_file_id, horizon) < 0)
    2810                 :   {
    2811             329 :     share->state.is_of_horizon= horizon;
    2812             329 :     _ma_state_info_write_sub(share->kfile.file, &share->state,
    2813                 :                              MA_STATE_INFO_WRITE_DONT_MOVE_OFFSET);
    2814                 :   }
    2815                 : 
    2816                 :   /*
    2817                 :    Ensure that info->state is up to date, as
    2818                 :    _ma_reenable_logging_for_table() depends on this
    2819                 :   */
    2820             690 :   *info->state= info->s->state.state;
    2821                 : 
    2822                 :   /*
    2823                 :     This leaves PAGECACHE_PLAIN_PAGE pages in the cache, while the table is
    2824                 :     going to switch back to transactional. So the table will be a mix of
    2825                 :     pages, which is ok as long as we don't take any checkpoints until all
    2826                 :     tables get closed at the end of the UNDO phase.
    2827                 :   */
    2828             690 :   _ma_reenable_logging_for_table(info, FALSE);
    2829             690 :   info->trn= NULL; /* safety */
    2830                 : }
    2831                 : 
    2832                 : /* Looks up in all_tables the table a REDO record applies to. Returns its MARIA_HA, or NULL when the record must be skipped (table not open, record not newer than the LOGREC_FILE_ID or skip_redo_lsn of the table, or page listed as not needing REDO). */
    2833                 : static MARIA_HA *get_MARIA_HA_from_REDO_record(const
    2834                 :                                                TRANSLOG_HEADER_BUFFER *rec)
    2835         1890318 : {
    2836                 :   uint16 sid;
    2837                 :   pgcache_page_no_t page;
    2838                 :   MARIA_HA *info;
    2839                 :   MARIA_SHARE *share;
    2840                 :   char llbuf[22];
    2841         1890318 :   my_bool index_page_redo_entry= FALSE, page_redo_entry= FALSE;
    2842         1890318 :   LINT_INIT(page);
    2843                 : 
    2844         1890318 :   print_redo_phase_progress(rec->lsn);
    2845         1890318 :   sid= fileid_korr(rec->header);
    2846         1890318 :   switch (rec->type) {
    2847                 :     /* not all REDO records have a page; for the types below it is read from the header right after the file id: */
    2848                 :   case LOGREC_REDO_INDEX_NEW_PAGE:
    2849                 :   case LOGREC_REDO_INDEX:
    2850                 :   case LOGREC_REDO_INDEX_FREE_PAGE:
    2851         1594196 :     index_page_redo_entry= 1;
    2852                 :     /* Fall through */
    2853                 :   case LOGREC_REDO_INSERT_ROW_HEAD:
    2854                 :   case LOGREC_REDO_INSERT_ROW_TAIL:
    2855                 :   case LOGREC_REDO_PURGE_ROW_HEAD:
    2856                 :   case LOGREC_REDO_PURGE_ROW_TAIL:
    2857                 :   case LOGREC_REDO_NEW_ROW_HEAD:
    2858                 :   case LOGREC_REDO_NEW_ROW_TAIL:
    2859                 :   case LOGREC_REDO_FREE_HEAD_OR_TAIL:
    2860         1878505 :     page_redo_entry= TRUE;
    2861         1878505 :     page= page_korr(rec->header + FILEID_STORE_SIZE);
    2862         1878505 :     llstr(page, llbuf);
    2863                 :     break;
    2864                 :     /*
    2865                 :       For REDO_FREE_BLOCKS, no need to look at the dirty pages list: it does not
    2866                 :       read data pages, only reads/modifies bitmap page(s), which is cheap.
    2867                 :     */
    2868                 :   default:
    2869                 :     break;
    2870                 :   }
    2871         1890318 :   tprint(tracef, "   For table of short id %u", sid);
    2872         1890318 :   info= all_tables[sid].info;
    2873                 : #ifndef DBUG_OFF
    2874         1890318 :   DBUG_ASSERT(current_group_table == NULL || current_group_table == info);
    2875         1890318 :   current_group_table= info;
    2876                 : #endif
    2877         1890318 :   if (info == NULL)
    2878                 :   {
    2879               0 :     tprint(tracef, ", table skipped, so skipping record\n");
    2880               0 :     return NULL;
    2881                 :   }
    2882         1890318 :   share= info->s;
    2883         1890318 :   tprint(tracef, ", '%s'", share->open_file_name.str);
    2884         1890318 :   DBUG_ASSERT(in_redo_phase);
    2885         1890318 :   if (cmp_translog_addr(rec->lsn, share->lsn_of_file_id) <= 0)
    2886                 :   {
    2887                 :     /*
    2888                 :       This can happen only if processing a record before the checkpoint
    2889                 :       record (asserted below).
    2890                 :       id->name mapping is newer than REDO record: for sure the table subject
    2891                 :       of the REDO has been flushed and forced (id re-assignment implies this);
    2892                 :       REDO can be ignored (and must be, as we don't know what this subject
    2893                 :       table was). NOTE(review): unlike the other skip traces, this one has no trailing newline - confirm.
    2894                 :     */
    2895               0 :     DBUG_ASSERT(cmp_translog_addr(rec->lsn, checkpoint_start) < 0);
    2896               0 :     tprint(tracef, ", table's LOGREC_FILE_ID has LSN (%lu,0x%lx) more recent"
    2897                 :            " than record, skipping record",
    2898                 :            LSN_IN_PARTS(share->lsn_of_file_id));
    2899               0 :     return NULL;
    2900                 :   }
    2901         1890318 :   if (cmp_translog_addr(rec->lsn, share->state.skip_redo_lsn) <= 0)
    2902                 :   {
    2903                 :     /* probably a bulk insert repair */
    2904               0 :     tprint(tracef, ", has skip_redo_lsn (%lu,0x%lx) more recent than"
    2905                 :            " record, skipping record\n",
    2906                 :            LSN_IN_PARTS(share->state.skip_redo_lsn));
    2907               0 :     return NULL;
    2908                 :   }
    2909                 :   /* detect an open instance of a dropped table (internal bug) */
    2910         1890318 :   DBUG_ASSERT(share->last_version != 0);
    2911         1890318 :   if (page_redo_entry)
    2912                 :   {
    2913                 :     /*
    2914                 :       Consult the dirty pages list.
    2915                 :       REDO_INSERT_ROW_BLOBS will consult the list by itself, as it covers several
    2916                 :       pages.
    2917                 :     */
    2918         1878505 :     tprint(tracef, " page %s", llbuf);
    2919         1878505 :     if (_ma_redo_not_needed_for_page(sid, rec->lsn, page,
    2920                 :                                      index_page_redo_entry))
    2921               0 :       return NULL;
    2922                 :   }
    2923                 :   /*
    2924                 :     So we are going to read the page, and if its LSN is older than the
    2925                 :     record's we will modify the page
    2926                 :   */
    2927         1890318 :   tprint(tracef, ", applying record\n");
    2928         1890318 :   _ma_writeinfo(info, WRITEINFO_UPDATE_KEYFILE); /* to flush state on close */
    2929         1890318 :   return info;
    2930                 : }
    2931                 : 
    2932                 : /* Looks up in all_tables the table an UNDO record applies to. Returns its MARIA_HA, or NULL when the record must be skipped. NOTE(review): the LOGREC_FILE_ID skip trace below has no trailing newline, unlike the other skip traces - confirm. */
    2933                 : static MARIA_HA *get_MARIA_HA_from_UNDO_record(const
    2934                 :                                                TRANSLOG_HEADER_BUFFER *rec)
    2935         2189316 : {
    2936                 :   uint16 sid;
    2937                 :   MARIA_HA *info;
    2938                 :   MARIA_SHARE *share;
    2939                 :   /* the short table id is read LSN_STORE_SIZE bytes into the record header */
    2940         2189316 :   sid= fileid_korr(rec->header + LSN_STORE_SIZE);
    2941         2189316 :   tprint(tracef, "   For table of short id %u", sid);
    2942         2189316 :   info= all_tables[sid].info;
    2943                 : #ifndef DBUG_OFF
    2944         2189316 :   DBUG_ASSERT(!in_redo_phase ||
    2945                 :               current_group_table == NULL || current_group_table == info);
    2946         2189316 :   current_group_table= info;
    2947                 : #endif
    2948         2189316 :   if (info == NULL)
    2949                 :   {
    2950               0 :     tprint(tracef, ", table skipped, so skipping record\n");
    2951               0 :     return NULL;
    2952                 :   }
    2953         2189316 :   share= info->s;
    2954         2189316 :   tprint(tracef, ", '%s'", share->open_file_name.str);
    2955         2189316 :   if (cmp_translog_addr(rec->lsn, share->lsn_of_file_id) <= 0)
    2956                 :   {
    2957               0 :     tprint(tracef, ", table's LOGREC_FILE_ID has LSN (%lu,0x%lx) more recent"
    2958                 :            " than record, skipping record",
    2959                 :            LSN_IN_PARTS(share->lsn_of_file_id));
    2960               0 :     return NULL;
    2961                 :   }
    2962         2189316 :   if (in_redo_phase &&
    2963                 :       cmp_translog_addr(rec->lsn, share->state.skip_redo_lsn) <= 0)
    2964                 :   {
    2965                 :     /* probably a bulk insert repair; skip_redo_lsn is only consulted during the REDO phase */
    2966               0 :     tprint(tracef, ", has skip_redo_lsn (%lu,0x%lx) more recent than"
    2967                 :            " record, skipping record\n",
    2968                 :            LSN_IN_PARTS(share->state.skip_redo_lsn));
    2969               0 :     return NULL;
    2970                 :   }
    2971         2189316 :   DBUG_ASSERT(share->last_version != 0);
    2972         2189316 :   _ma_writeinfo(info, WRITEINFO_UPDATE_KEYFILE); /* to flush state on close */
    2973         2189316 :   tprint(tracef, ", applying record\n");
    2974         2189316 :   return info;
    2975                 : }
    2976                 : 
    2977                 : 
    2978                 : /**
    2979                 :    @brief Parses checkpoint record.
    2980                 : 
    2981                 :    Builds from it the dirty_pages list (a hash), opens tables and maps them to
    2982                 :    their 2-byte IDs, recreates transactions (not real TRNs though).
    2983                 :    @param  lsn  LSN of the checkpoint record to read
    2984                 :    @return LSN from where in the log the REDO phase should start
    2985                 :      @retval LSN_ERROR error
    2986                 :      @retval other     ok
    2987                 : */
    2988                 : 
    2989                 : static LSN parse_checkpoint_record(LSN lsn)
    2990               0 : {
    2991                 :   ulong i;
    2992                 :   ulonglong nb_dirty_pages;
    2993                 :   TRANSLOG_HEADER_BUFFER rec;
    2994                 :   TRANSLOG_ADDRESS start_address;
    2995                 :   int len;
    2996                 :   uint nb_active_transactions, nb_committed_transactions, nb_tables;
    2997                 :   uchar *ptr;
    2998                 :   LSN minimum_rec_lsn_of_active_transactions, minimum_rec_lsn_of_dirty_pages;
    2999                 :   struct st_dirty_page *next_dirty_page_in_pool;
    3000                 : 
    3001               0 :   tprint(tracef, "Loading data from checkpoint record at LSN (%lu,0x%lx)\n",
    3002                 :          LSN_IN_PARTS(lsn));
    3003               0 :   if ((len= translog_read_record_header(lsn, &rec)) == RECHEADER_READ_ERROR)
    3004                 :   {
    3005               0 :     tprint(tracef, "Cannot find checkpoint record where it should be\n");
    3006               0 :     return LSN_ERROR;
    3007                 :   }
    3008                 : 
    3009               0 :   enlarge_buffer(&rec);
    3010               0 :   if (log_record_buffer.str == NULL ||
    3011                 :       translog_read_record(rec.lsn, 0, rec.record_length,
    3012                 :                            log_record_buffer.str, NULL) !=
    3013                 :       rec.record_length)
    3014                 :   {
    3015               0 :     eprint(tracef, "Failed to read record");
    3016               0 :     return LSN_ERROR;
    3017                 :   }
    3018                 : 
    3019               0 :   ptr= log_record_buffer.str;
    3020               0 :   start_address= lsn_korr(ptr);
    3021               0 :   ptr+= LSN_STORE_SIZE;
    3022               0 :   tprint(tracef, "Checkpoint record has start_horizon at (%lu,0x%lx)\n",
    3023                 :          LSN_IN_PARTS(start_address));
    3024                 : 
    3025                 :   /* transactions: 2-byte count, minimum rec_lsn, max long trid, then one entry per active transaction */
    3026               0 :   nb_active_transactions= uint2korr(ptr);
    3027               0 :   ptr+= 2;
    3028               0 :   tprint(tracef, "%u active transactions\n", nb_active_transactions);
    3029               0 :   minimum_rec_lsn_of_active_transactions= lsn_korr(ptr);
    3030               0 :   ptr+= LSN_STORE_SIZE;
    3031               0 :   max_long_trid= transid_korr(ptr);
    3032               0 :   ptr+= TRANSID_SIZE;
    3033                 : 
    3034                 :   /*
    3035                 :     A lot of thought and discussion went into the set_if_smaller() call
    3036                 :     below. It may make start_address slightly decrease (only by the time it
    3037                 :     takes to write one or a few rows, roughly).
    3038                 :   */
    3039               0 :   tprint(tracef, "Checkpoint record has min_rec_lsn of active transactions"
    3040                 :          " at (%lu,0x%lx)\n",
    3041                 :          LSN_IN_PARTS(minimum_rec_lsn_of_active_transactions));
    3042               0 :   set_if_smaller(start_address, minimum_rec_lsn_of_active_transactions);
    3043                 : 
    3044               0 :   for (i= 0; i < nb_active_transactions; i++)
    3045                 :   {
    3046               0 :     uint16 sid= uint2korr(ptr);
    3047                 :     TrID long_id;
    3048                 :     LSN undo_lsn, first_undo_lsn;
    3049               0 :     ptr+= 2;
    3050               0 :     long_id= uint6korr(ptr);
    3051               0 :     ptr+= 6;
    3052               0 :     DBUG_ASSERT(sid > 0 && long_id > 0);
    3053               0 :     undo_lsn= lsn_korr(ptr);
    3054               0 :     ptr+= LSN_STORE_SIZE;
    3055               0 :     first_undo_lsn= lsn_korr(ptr);
    3056               0 :     ptr+= LSN_STORE_SIZE;
    3057               0 :     new_transaction(sid, long_id, undo_lsn, first_undo_lsn);
    3058                 :   }
    3059               0 :   nb_committed_transactions= uint4korr(ptr);
    3060               0 :   ptr+= 4;
    3061               0 :   tprint(tracef, "%lu committed transactions\n",
    3062                 :          (ulong)nb_committed_transactions);
    3063                 :   /* no purging => committed transactions are not important: their entries are skipped without being read */
    3064               0 :   ptr+= (6 + LSN_STORE_SIZE) * nb_committed_transactions;
    3065                 : 
    3066                 :   /* tables: 4-byte count, then per table a 2-byte id, an LSN and a NUL-terminated name */
    3067               0 :   nb_tables= uint4korr(ptr);
    3068               0 :   ptr+= 4;
    3069               0 :   tprint(tracef, "%u open tables\n", nb_tables);
    3070               0 :   for (i= 0; i< nb_tables; i++)
    3071                 :   {
    3072                 :     char name[FN_REFLEN];
    3073                 :     LSN first_log_write_lsn;
    3074                 :     uint name_len;
    3075               0 :     uint16 sid= uint2korr(ptr);
    3076               0 :     ptr+= 2;
    3077               0 :     DBUG_ASSERT(sid > 0);
    3078               0 :     first_log_write_lsn= lsn_korr(ptr);
    3079               0 :     ptr+= LSN_STORE_SIZE;
    3080               0 :     name_len= strlen((char *)ptr) + 1;
    3081               0 :     strmake(name, (char *)ptr, sizeof(name)-1);
    3082               0 :     ptr+= name_len;
    3083               0 :     if (new_table(sid, name, first_log_write_lsn))
    3084               0 :       return LSN_ERROR;
    3085                 :   }
    3086                 : 
    3087                 :   /* dirty pages. NOTE(review): the LSN_ERROR returns below leave all_dirty_pages and dirty_pages_pool allocated; presumably released by the caller - confirm */
    3088               0 :   nb_dirty_pages= uint8korr(ptr);
    3089                 : 
    3090                 :   /* Ensure casts later will not lose significant bits (checked in debug builds only). */
    3091               0 :   DBUG_ASSERT((nb_dirty_pages <= SIZE_T_MAX/sizeof(struct st_dirty_page)) &&
    3092                 :               (nb_dirty_pages <= ULONG_MAX));
    3093                 : 
    3094               0 :   ptr+= 8;
    3095               0 :   tprint(tracef, "%lu dirty pages\n", (ulong) nb_dirty_pages);
    3096               0 :   if (hash_init(&all_dirty_pages, &my_charset_bin, (ulong)nb_dirty_pages,
    3097                 :                 offsetof(struct st_dirty_page, file_and_page_id),
    3098                 :                 sizeof(((struct st_dirty_page *)NULL)->file_and_page_id),
    3099                 :                 NULL, NULL, 0))
    3100               0 :     return LSN_ERROR;
    3101               0 :   dirty_pages_pool=
    3102                 :     (struct st_dirty_page *)my_malloc((size_t)nb_dirty_pages *
    3103                 :                                       sizeof(struct st_dirty_page),
    3104                 :                                       MYF(MY_WME));
    3105               0 :   if (unlikely(dirty_pages_pool == NULL))
    3106               0 :     return LSN_ERROR;
    3107               0 :   next_dirty_page_in_pool= dirty_pages_pool;
    3108               0 :   minimum_rec_lsn_of_dirty_pages= LSN_MAX;
    3109               0 :   for (i= 0; i < nb_dirty_pages ; i++)
    3110                 :   {
    3111                 :     pgcache_page_no_t page_id;
    3112                 :     LSN rec_lsn;
    3113                 :     uint32 is_index;
    3114               0 :     uint16 table_id= uint2korr(ptr);
    3115               0 :     ptr+= 2;
    3116               0 :     is_index= ptr[0];
    3117               0 :     ptr++;
    3118               0 :     page_id= page_korr(ptr);
    3119               0 :     ptr+= PAGE_STORE_SIZE;
    3120               0 :     rec_lsn= lsn_korr(ptr);
    3121               0 :     ptr+= LSN_STORE_SIZE;
    3122               0 :     if (new_page((is_index << 16) | table_id,
    3123                 :                  page_id, rec_lsn, next_dirty_page_in_pool++))
    3124               0 :       return LSN_ERROR;
    3125               0 :     set_if_smaller(minimum_rec_lsn_of_dirty_pages, rec_lsn);
    3126                 :   }
    3127                 :   /* after that, there will be no insert/delete into the hash */
    3128                 :   /*
    3129                 :     sanity check on record (did we screw up with all those "ptr+=", did the
    3130                 :     checkpoint write code and checkpoint read code go out of sync?). NOTE(review): compares with log_record_buffer.length, not rec.record_length - confirm they match after enlarge_buffer().
    3131                 :   */
    3132               0 :   if (ptr != (log_record_buffer.str + log_record_buffer.length))
    3133                 :   {
    3134               0 :     eprint(tracef, "checkpoint record corrupted\n");
    3135               0 :     return LSN_ERROR;
    3136                 :   }
    3137                 : 
    3138                 :   /*
    3139                 :     start_address is now the address from which the dirty pages list can be
    3140                 :     ignored. Find an LSN higher than or equal to this TRANSLOG_ADDRESS, suitable
    3141                 :     for translog_read_record() functions.
    3142                 :   */
    3143               0 :   start_address= checkpoint_start=
    3144                 :     translog_next_LSN(start_address, LSN_IMPOSSIBLE);
    3145               0 :   tprint(tracef, "Checkpoint record start_horizon now adjusted to"
    3146                 :          " LSN (%lu,0x%lx)\n", LSN_IN_PARTS(start_address));
    3147               0 :   if (checkpoint_start == LSN_IMPOSSIBLE)
    3148                 :   {
    3149                 :     /*
    3150                 :       There must be a problem, as our checkpoint record exists and is >= the
    3151                 :       address which is stored in its first bytes, which is >= start_address.
    3152                 :     */
    3153               0 :     return LSN_ERROR;
    3154                 :   }
    3155                 :   /* now, where the REDO phase should start reading the log: the smaller of checkpoint_start and the minimum rec_lsn of dirty pages */
    3156               0 :   tprint(tracef, "Checkpoint has min_rec_lsn of dirty pages at"
    3157                 :          " LSN (%lu,0x%lx)\n", LSN_IN_PARTS(minimum_rec_lsn_of_dirty_pages));
    3158               0 :   set_if_smaller(start_address, minimum_rec_lsn_of_dirty_pages);
    3159               0 :   DBUG_PRINT("info",
    3160                 :              ("checkpoint_start: (%lu,0x%lx) start_address: (%lu,0x%lx)",
    3161                 :               LSN_IN_PARTS(checkpoint_start), LSN_IN_PARTS(start_address)));
    3162               0 :   return start_address;
    3163                 : }
    3164                 : 
    3165                 : /* Fills dirty_page with its hash key and rec_lsn, then inserts it into all_dirty_pages; returns the result of my_hash_insert(). */
    3166                 : static int new_page(uint32 fileid, pgcache_page_no_t pageid, LSN rec_lsn,
    3167                 :                     struct st_dirty_page *dirty_page)
    3168               0 : {
    3169                 :   /* serves as hash key: fileid shifted left by 40 bits, OR-ed with pageid (assumes pageid fits in 40 bits; not checked here - TODO confirm) */
    3170               0 :   dirty_page->file_and_page_id= (((uint64)fileid) << 40) | pageid;
    3171               0 :   dirty_page->rec_lsn= rec_lsn;
    3172               0 :   return my_hash_insert(&all_dirty_pages, (uchar *)dirty_page);
    3173                 : }
    3174                 : 
    3175                 : /* Closes every table of maria_open_list: resets open_count, calls prepare_table_for_close() with the current log horizon, then maria_close(). Returns the OR of all maria_close() results (0 if none failed). */
    3176                 : static int close_all_tables(void)
    3177             310 : {
    3178             310 :   int error= 0;
    3179             310 :   uint count= 0;
    3180                 :   LIST *list_element, *next_open;
    3181                 :   MARIA_HA *info;
    3182                 :   TRANSLOG_ADDRESS addr;
    3183             310 :   DBUG_ENTER("close_all_tables");
    3184                 : 
    3185             310 :   pthread_mutex_lock(&THR_LOCK_maria);
    3186             310 :   if (maria_open_list == NULL)
    3187             307 :     goto end;
    3188             307 :   tprint(tracef, "Closing all tables\n");
    3189             307 :   if (tracef != stdout)
    3190                 :   {
    3191               7 :     if (recovery_message_printed == REC_MSG_NONE)
    3192               0 :       print_preamble();
    3193               7 :     for (count= 0, list_element= maria_open_list ;
    3194               7 :          list_element ; count++, (list_element= list_element->next))
    3195                 :       ;
    3196               7 :     fprintf(stderr, "tables to flush:");
    3197               7 :     recovery_message_printed= REC_MSG_FLUSH;
    3198                 :   }
    3199                 :   /*
    3200                 :     Since the end of end_of_redo_phase(), we may have written new records
    3201                 :     (if the UNDO phase ran) and thus the state is newer than at
    3202                 :     end_of_redo_phase(); we need to bump is_of_horizon again.
    3203                 :   */
    3204             307 :   addr= translog_get_horizon();
    3205             614 :   for (list_element= maria_open_list ; ; list_element= next_open)
    3206                 :   {
    3207             614 :     if (recovery_message_printed == REC_MSG_FLUSH)
    3208                 :     {
    3209              14 :       fprintf(stderr, " %u", count--);
    3210              14 :       fflush(stderr);
    3211                 :     }
    3212             614 :     if (list_element == NULL)
    3213             307 :       break;
    3214             307 :     next_open= list_element->next;
    3215             307 :     info= (MARIA_HA*)list_element->data;
    3216             307 :     pthread_mutex_unlock(&THR_LOCK_maria); /* ok, UNDO phase not online yet; mutex is re-taken after maria_close() below */
    3217                 :     /*
    3218                 :       Tables which we see here are exactly those which were open at time of
    3219                 :       crash. They might have open_count>0 as Checkpoint may have flushed their
    3220                 :       state while they were used. As Recovery corrected them, don't alarm the
    3221                 :       user, don't ask for a table check:
    3222                 :     */
    3223             307 :     info->s->state.open_count= 0;
    3224             307 :     prepare_table_for_close(info, addr);
    3225             307 :     error|= maria_close(info);
    3226             307 :     pthread_mutex_lock(&THR_LOCK_maria);
    3227             307 :   }
    3228             310 : end:
    3229             310 :   pthread_mutex_unlock(&THR_LOCK_maria);
    3230             310 :   DBUG_RETURN(error);
    3231                 : }
    3232                 : 
    3233                 : 
    3234                 : /**
    3235                 :    @brief Close all table instances with a certain name which are present in
    3236                 :    all_tables, and reset their all_tables slots to NULL.
    3237                 :    @param  name                Name of table (compared with open_file_name.str)
    3238                 :    @param  addr                Log address passed to prepare_table_for_close()
    3239                 :    @return 0 if every maria_close() succeeded, 1 if at least one failed
    3240                 : */
    3241                 : 
    3242                 : static my_bool close_one_table(const char *name, TRANSLOG_ADDRESS addr)
    3243             307 : {
    3244             307 :   my_bool res= 0;
    3245                 :   /* There are no other threads using the tables, so we don't need any locks; every slot of all_tables (SHARE_ID_MAX + 1 entries) is scanned */
    3246                 :   struct st_table_for_recovery *internal_table, *end;
    3247             307 :   for (internal_table= all_tables, end= internal_table + SHARE_ID_MAX + 1;
    3248        20120166 :        internal_table < end ;
    3249        20119552 :        internal_table++)
    3250                 :   {
    3251        20119552 :     MARIA_HA *info= internal_table->info;
    3252        20119552 :     if ((info != NULL) && !strcmp(info->s->open_file_name.str, name))
    3253                 :     {
    3254               0 :       prepare_table_for_close(info, addr);
    3255               0 :       if (maria_close(info))
    3256               0 :         res= 1;
    3257               0 :       internal_table->info= NULL;
    3258                 :     }
    3259                 :   }
    3260             307 :   return res;
    3261                 : }
    3262                 : 
    3263                 : 
    3264                 : /**
    3265                 :    Temporarily disables logging for this table.
    3266                 : 
    3267                 :    If that makes the log incomplete, writes a LOGREC_INCOMPLETE_LOG to the log
    3268                 :    to warn log readers.
    3269                 : 
    3270                 :    @param  info            table
    3271                 :    @param  log_incomplete  if that disabling makes the log incomplete
    3272                 : 
    3273                 :    @note for example in the REDO phase we disable logging but that does not
    3274                 :    make the log incomplete.
    3275                 : */
    3276                 : 
    3277                 : void _ma_tmp_disable_logging_for_table(MARIA_HA *info,
    3278                 :                                        my_bool log_incomplete)
    3279             647 : {
    3280             647 :   MARIA_SHARE *share= info->s;
    3281             647 :   DBUG_ENTER("_ma_tmp_disable_logging_for_table");
    3282             647 :   if (log_incomplete)
    3283                 :   {
    3284                 :     uchar log_data[FILEID_STORE_SIZE];
    3285                 :     LEX_CUSTRING log_array[TRANSLOG_INTERNAL_PARTS + 1];
    3286                 :     LSN lsn;
    3287               0 :     log_array[TRANSLOG_INTERNAL_PARTS + 0].str=    log_data;
    3288               0 :     log_array[TRANSLOG_INTERNAL_PARTS + 0].length= sizeof(log_data);
    3289               0 :     translog_write_record(&lsn, LOGREC_INCOMPLETE_LOG,
    3290                 :                           &dummy_transaction_object, info,
    3291                 :                           (translog_size_t) sizeof(log_data),
    3292                 :                           TRANSLOG_INTERNAL_PARTS + 1, log_array,
    3293                 :                           log_data, NULL);
    3294                 :   }
    3295                 : 
    3296                 :   /* if we disabled before writing the record, record wouldn't reach log */
    3297             647 :   share->now_transactional= FALSE;
    3298                 : 
    3299                 :   /*
    3300                 :     Reset state pointers. This is needed as in ALTER table we may do
    3301                 :     commit followed by _ma_reenable_logging_for_table and then
    3302                 :     info->state may point to a state that was deleted by
    3303                 :     _ma_trnman_end_trans_hook()
    3304                 :    */
    3305             647 :   share->state.common= *info->state;
    3306             647 :   info->state= &share->state.common;
    3307             647 :   info->switched_transactional= TRUE;
    3308                 : 
    3309                 :   /*
    3310                 :     Some code in ma_blockrec.c assumes a trn even if !now_transactional but in
    3311                 :     this case it only reads trn->rec_lsn, which has to be LSN_IMPOSSIBLE and
    3312                 :     should be now. info->trn may be NULL in maria_chk.
    3313                 :   */
    3314             647 :   if (info->trn == NULL)
    3315             575 :     info->trn= &dummy_transaction_object;
    3316             647 :   DBUG_ASSERT(info->trn->rec_lsn == LSN_IMPOSSIBLE);
    3317             647 :   share->page_type= PAGECACHE_PLAIN_PAGE;
    3318                 :   /* Functions below will pick up now_transactional and change callbacks */
    3319             647 :   _ma_set_data_pagecache_callbacks(&info->dfile, share);
    3320             647 :   _ma_set_index_pagecache_callbacks(&share->kfile, share);
    3321             647 :   _ma_bitmap_set_pagecache_callbacks(&share->bitmap.file, share);
    3322             647 :   DBUG_VOID_RETURN;
    3323                 : }
    3324                 : 
    3325                 : 
    3326                 : /**
    3327                 :    Re-enables logging for a table which had it temporarily disabled.
    3328                 : 
    3329                 :    Only the thread which disabled logging is allowed to reenable it. Indeed,
    3330                 :    re-enabling logging affects all open instances, one must have exclusive
    3331                 :    access to the table to do that. In practice, the one which disables has
    3332                 :    such access.
    3333                 : 
    3334                 :    @param  info            table
    3335                 :    @param  flush_pages     if function needs to flush pages first
    3336                 : */
    3337                 : 
    3338                 : my_bool _ma_reenable_logging_for_table(MARIA_HA *info, my_bool flush_pages)
    3339            1629 : {
    3340            1629 :   MARIA_SHARE *share= info->s;
    3341            1629 :   DBUG_ENTER("_ma_reenable_logging_for_table");
    3342                 : 
    3343            1629 :   if (share->now_transactional == share->base.born_transactional ||
    3344                 :       !info->switched_transactional)
    3345            1054 :     DBUG_RETURN(0);
    3346             575 :   info->switched_transactional= FALSE;
    3347                 : 
    3348             575 :   if ((share->now_transactional= share->base.born_transactional))
    3349                 :   {
    3350             575 :     share->page_type= PAGECACHE_LSN_PAGE;
    3351                 : 
    3352                 :     /*
    3353                 :       Copy state information that was updated while the table was used
    3354                 :       in non-transactional mode
    3355                 :     */
    3356             575 :     _ma_copy_nontrans_state_information(info);
    3357             575 :     _ma_reset_history(info->s);
    3358                 : 
    3359             575 :     if (flush_pages)
    3360                 :     {
    3361                 :       /*
    3362                 :         We are going to change callbacks; if a page is flushed at this moment
    3363                 :         this can cause race conditions, that's one reason to flush pages
    3364                 :         now. Other reasons: a checkpoint could be running and miss pages; the
    3365                 :         pages have type PAGECACHE_PLAIN_PAGE which should not remain. As
    3366                 :         there are no REDOs for these pages, they, the bitmaps and the state also have to
    3367                 :         be flushed and synced.
    3368                 :       */
    3369               0 :       if (_ma_flush_table_files(info, MARIA_FLUSH_DATA | MARIA_FLUSH_INDEX,
    3370                 :                                 FLUSH_RELEASE, FLUSH_RELEASE) ||
    3371                 :           _ma_state_info_write(share,
    3372                 :                                MA_STATE_INFO_WRITE_DONT_MOVE_OFFSET |
    3373                 :                                MA_STATE_INFO_WRITE_LOCK) ||
    3374                 :           _ma_sync_table_files(info))
    3375               0 :         DBUG_RETURN(1);
    3376                 :     }
    3377             575 :     else if (!maria_in_recovery)
    3378                 :     {
    3379                 :       /*
    3380                 :         Except in Recovery, we mustn't leave dirty pages (see comments above).
    3381                 :         Note that this does not verify that the state was flushed, but hey.
    3382                 :       */
    3383             192 :       pagecache_file_no_dirty_page(share->pagecache, &info->dfile);
    3384             192 :       pagecache_file_no_dirty_page(share->pagecache, &share->kfile);
    3385                 :     }
    3386             575 :     _ma_set_data_pagecache_callbacks(&info->dfile, share);
    3387             575 :     _ma_set_index_pagecache_callbacks(&share->kfile, share);
    3388             575 :     _ma_bitmap_set_pagecache_callbacks(&share->bitmap.file, share);
    3389                 :     /*
    3390                 :       info->trn was not changed in the disable/enable combo, so that it's
    3391                 :       still usable in this kind of combination:
    3392                 :       external_lock;
    3393                 :       start_bulk_insert; # table is empty, disables logging
    3394                 :       end_bulk_insert;   # enables logging
    3395                 :       start_bulk_insert; # table is not empty, logging stays
    3396                 :                          # so rows insertion needs the real trn.
    3397                 :       as happens during row-based replication on the slave.
    3398                 :     */
    3399                 :   }
    3400             575 :   DBUG_RETURN(0);
    3401                 : }
    3402                 : 
    3403                 : 
    3404                 : static void print_redo_phase_progress(TRANSLOG_ADDRESS addr)
    3405         1890318 : {
    3406                 :   static uint end_logno= FILENO_IMPOSSIBLE, percentage_printed= 0;
    3407                 :   static ulong end_offset;
    3408                 :   static ulonglong initial_remainder= ~(ulonglong) 0;
    3409                 : 
    3410                 :   uint cur_logno;
    3411                 :   ulong cur_offset;
    3412                 :   ulonglong local_remainder;
    3413                 :   uint percentage_done;
    3414                 : 
    3415         1890318 :   if (tracef == stdout)
    3416           85410 :     return;
    3417           85410 :   if (recovery_message_printed == REC_MSG_NONE)
    3418                 :   {
    3419               7 :     print_preamble();
    3420               7 :     fprintf(stderr, "recovered pages: 0%%");
    3421               7 :     fflush(stderr);
    3422               7 :     procent_printed= 1;
    3423               7 :     recovery_message_printed= REC_MSG_REDO;
    3424                 :   }
    3425           85410 :   if (end_logno == FILENO_IMPOSSIBLE)
    3426                 :   {
    3427               7 :     LSN end_addr= translog_get_horizon();
    3428               7 :     end_logno= LSN_FILE_NO(end_addr);
    3429               7 :     end_offset= LSN_OFFSET(end_addr);
    3430                 :   }
    3431           85410 :   cur_logno= LSN_FILE_NO(addr);
    3432           85410 :   cur_offset= LSN_OFFSET(addr);
    3433           85410 :   local_remainder= (cur_logno == end_logno) ? (end_offset - cur_offset) :
    3434                 :     (((longlong)log_file_size) - cur_offset +
    3435                 :      max(end_logno - cur_logno - 1, 0) * ((longlong)log_file_size) +
    3436                 :      end_offset);
    3437           85410 :   if (initial_remainder == (ulonglong)(-1))
    3438               7 :     initial_remainder= local_remainder;
    3439           85410 :   percentage_done= (uint) ((initial_remainder - local_remainder) * ULL(100) /
    3440                 :                            initial_remainder);
    3441           85410 :   if ((percentage_done - percentage_printed) >= 10)
    3442                 :   {
    3443              63 :     percentage_printed= percentage_done;
    3444              63 :     fprintf(stderr, " %u%%", percentage_done);
    3445              63 :     fflush(stderr);
    3446              63 :     procent_printed= 1;
    3447                 :   }
    3448                 : }
    3449                 : 
    3450                 : 
    3451                 : #ifdef MARIA_EXTERNAL_LOCKING
    3452                 : #error Marias Checkpoint and Recovery are really not ready for it
    3453                 : #endif
    3454                 : 
    3455                 : /*
    3456                 : Recovery of the state :  how it works
    3457                 : =====================================
    3458                 : 
    3459                 : Here we ignore Checkpoints for a start.
    3460                 : 
    3461                 : The state (MARIA_HA::MARIA_SHARE::MARIA_STATE_INFO) is updated in
    3462                 : memory frequently (at least at every row write/update/delete) but goes
    3463                 : to disk at few moments: maria_close() when closing the last open
    3464                 : instance, and a few rare places like CHECK/REPAIR/ALTER
    3465                 : (non-transactional tables also do it at maria_lock_database() but we
    3466                 : needn't cover them here).
    3467                 : 
    3468                 : In case of crash, state on disk is likely to be older than what it was
    3469                 : in memory, the REDO phase needs to recreate the state as it was in
    3470                 : memory at the time of crash. When we say Recovery here we will always
    3471                 : mean "REDO phase".
    3472                 : 
    3473                 : For example MARIA_STATUS_INFO::records (count of records). It is updated at
    3474                 : the end of every row write/update/delete/delete_all. When Recovery sees the
    3475                 : sign of such row operation (UNDO or REDO), it may need to update the records'
    3476                 : count if that count does not reflect that operation (is older). How to know
    3477                 : the age of the state compared to the log record: every time the state
    3478                 : goes to disk at runtime, its member "is_of_horizon" is updated to the
    3479                 : current end-of-log horizon. So Recovery just needs to compare is_of_horizon
    3480                 : and the record's LSN to know if it should modify "records".
    3481                 : 
    3482                 : Other operations like ALTER TABLE DISABLE KEYS update the state but
    3483                 : don't write log records, thus the REDO phase cannot repeat their
    3484                 : effect on the state in case of crash. But we make them sync the state
    3485                 : as soon as they have finished. This reduces the window for a problem.
    3486                 : 
    3487                 : It looks like only one thread at a time updates the state in memory or
    3488                 : on disk. We assume that the upper level (normally MySQL) has protection
    3489                 : against issuing HA_EXTRA_(FORCE_REOPEN|PREPARE_FOR_RENAME) so that these
    3490                 : are not issued while there are any running transactions on the given table.
    3491                 : If this is not done, we may write a corrupted state to disk.
    3492                 : 
    3493                 : With checkpoints
    3494                 : ================
    3495                 : 
    3496                 : Checkpoint module needs to read the state in memory and write it to
    3497                 : disk. This may happen while some other thread is modifying the state
    3498                 : in memory or on disk. Checkpoint thus may be reading changing data, it
    3499                 : needs a mutex to not have it corrupted, and concurrent modifiers of
    3500                 : the state need that mutex too for the same reason.
    3501                 : "records" is modified for every row write/update/delete, we don't want
    3502                 : to add a mutex lock/unlock there. So we re-use the mutex lock/unlock
    3503                 : which is already present in these moments, namely the log's mutex which is
    3504                 : taken when UNDO_ROW_INSERT|UPDATE|DELETE is written: we update "records" in
    3505                 : under-log-mutex hooks when writing these records (thus "records" is
    3506                 : not updated at the end of maria_write/update/delete() anymore).
    3507                 : Thus Checkpoint takes the log's lock and can read "records" from
    3508                 : memory and write it to disk and release log's lock.
    3509                 : We however want to avoid having the disk write under the log's
    3510                 : lock. So it has to be under another mutex, natural choice is
    3511                 : intern_lock (as Checkpoint needs it anyway to read MARIA_SHARE::kfile,
    3512                 : and as maria_close() takes it too). All state writes to disk are
    3513                 : changed to be protected with intern_lock.
    3514                 : So Checkpoint takes intern_lock, log's lock, reads "records" from
    3515                 : memory, releases log's lock, updates is_of_horizon and writes "records" to
    3516                 : disk, releases intern_lock.
    3517                 : In practice, not only "records" needs to be written but the full
    3518                 : state. So, Checkpoint reads the full state from memory. Some other
    3519                 : thread may at this moment be modifying in memory some pieces of the
    3520                 : state which are not protected by the log's lock (see ma_extra.c
    3521                 : HA_EXTRA_NO_KEYS), and Checkpoint would be reading a corrupted state
    3522                 : from memory; to guard against that we extend the intern_lock-zone to
    3523                 : changes done to the state in memory by HA_EXTRA_NO_KEYS et al, and
    3524                 : also any change made in memory to create_rename_lsn/state_is_of_horizon.
    3525                 : Last, we don't want in Checkpoint to do
    3526                 :  log lock; read state from memory; release log lock;
    3527                 : for each table, it may hold the log's lock too much in total.
    3528                 : So, we instead do
    3529                 :  log lock; read N states from memory; release log lock;
    3530                 : Thus, the sequence above happens outside of any intern_lock.
    3531                 : But this re-introduces the problem that some other thread may be changing the
    3532                 : state in memory and on disk under intern_lock, without log's lock, like
    3533                 : HA_EXTRA_NO_KEYS, while we read the N states. However, when Checkpoint later
    3534                 : comes to handling the table under intern_lock, which is serialized with
    3535                 : HA_EXTRA_NO_KEYS, it can see that is_of_horizon is higher than when the state
    3536                 : was read from memory under log's lock, and thus can decide to not flush the
    3537                 : obsolete state it has, knowing that the other thread flushed a more recent
    3538                 : state already. If on the other hand is_of_horizon is not higher, the read
    3539                 : state is current and can be flushed. So we have a per-table sequence:
    3540                 :  lock intern_lock; test if is_of_horizon is higher than when we read the state
    3541                 :  under log's lock; if no then flush the read state to disk.
    3542                 : */
    3543                 : 
    3544                 : /* some comments and pseudo-code which we keep for later */
    3545                 : #if 0
    3546                 :   /*
    3547                 :     MikaelR suggests: support checkpoints during REDO phase too: do checkpoint
    3548                 :     after a certain amount of log records have been executed. This helps
    3549                 :     against repeated crashes. Those checkpoints could not be user-requested
    3550                 :     (as engine is not communicating during the REDO phase), so they would be
    3551                 :     automatic: this changes the original assumption that we don't write to the
    3552                 :     log while in the REDO phase, but why not. How often should we checkpoint?
    3553                 :   */
    3554                 : 
    3555                 :   /*
    3556                 :     We want to have two steps:
    3557                 :     engine->recover_with_max_memory();
    3558                 :     next_engine->recover_with_max_memory();
    3559                 :     engine->init_with_normal_memory();
    3560                 :     next_engine->init_with_normal_memory();
    3561                 :     So: in recover_with_max_memory() allocate a giant page cache, do REDO
    3562                 :     phase, then all page cache is flushed and emptied and freed (only retain
    3563                 :     small structures like TM): take full checkpoint, which is useful if
    3564                 :     next engine crashes in its recovery the next second.
    3565                 :     Destroy all shares (maria_close()), then at init_with_normal_memory() we
    3566                 :     do this:
    3567                 :   */
    3568                 : 
    3569                 :   /**** UNDO PHASE *****/
    3570                 : 
    3571                 :   /*
    3572                 :     Launch one or more threads to do the background rollback. Don't wait for
    3573                 :     them to complete their rollback (background rollback; for debugging, we
    3574                 :     can have an option which waits). Set a counter (total_of_rollback_threads)
    3575                 :     to the number of threads to launch.
    3576                 : 
    3577                 :     Note that InnoDB's rollback-in-background works as long as InnoDB is the
    3578                 :     last engine to recover, otherwise MySQL will refuse new connections until
    3579                 :     the last engine has recovered so it's not "background" from the user's
    3580                 :     point of view. InnoDB is near top of sys_table_types so all others
    3581                 :     (e.g. BDB) recover after it... So it's really "online rollback" only if
    3582                 :     InnoDB is the only engine.
    3583                 :   */
    3584                 : 
    3585                 :   /* wake up delete/update handler */
    3586                 :   /* tell the TM that it can now accept new transactions */
    3587                 : 
    3588                 :   /*
    3589                 :     mark that checkpoint requests are now allowed.
    3590                 :   */
    3591                 : #endif

Generated by: LTP GCOV extension version 1.4