← Back to team overview

maria-developers team mailing list archive

Rev 2741: Group commit for maria engine. in file:///Users/bell/maria/bzr/work-maria-5.2-groupcommit2/

 

At file:///Users/bell/maria/bzr/work-maria-5.2-groupcommit2/

------------------------------------------------------------
revno: 2741
revision-id: sanja@xxxxxxxxxxxx-20100212131228-bgxli0wfybhjkvg9
parent: sergii@xxxxxxxxx-20100212084731-b5jst7oxhzp251pg
committer: sanja@xxxxxxxxxxxx
branch nick: work-maria-5.2-groupcommit2
timestamp: Fri 2010-02-12 15:12:28 +0200
message:
  Group commit for maria engine.
=== added file 'mysql-test/suite/maria/r/group_commit.result'
--- a/mysql-test/suite/maria/r/group_commit.result	1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/maria/r/group_commit.result	2010-02-12 13:12:28 +0000
@@ -0,0 +1,17 @@
+drop table if exists t1;
+create table t1 (a int);
+SET GLOBAL maria_group_commit="NONE";
+SET GLOBAL maria_group_commit_interval= 0;
+SET GLOBAL maria_group_commit="NONE";
+SET GLOBAL maria_group_commit_interval= 100;
+SET GLOBAL maria_group_commit="HARD";
+SET GLOBAL maria_group_commit_interval= 0;
+SET GLOBAL maria_group_commit="HARD";
+SET GLOBAL maria_group_commit_interval= 100;
+SET GLOBAL maria_group_commit="SOFT";
+SET GLOBAL maria_group_commit_interval= 0;
+SET GLOBAL maria_group_commit="SOFT";
+SET GLOBAL maria_group_commit_interval= 100;
+SET GLOBAL maria_group_commit="NONE";
+SET GLOBAL maria_group_commit_interval= 0;
+drop table t1;

=== modified file 'mysql-test/suite/maria/r/maria3.result'
--- a/mysql-test/suite/maria/r/maria3.result	2009-09-18 01:04:43 +0000
+++ b/mysql-test/suite/maria/r/maria3.result	2010-02-12 13:12:28 +0000
@@ -306,6 +306,8 @@
 maria_block_size	8192
 maria_checkpoint_interval	30
 maria_force_start_after_recovery_failures	0
+maria_group_commit	none
+maria_group_commit_interval	0
 maria_log_file_size	4294959104
 maria_log_purge_type	immediate
 maria_max_sort_file_size	9223372036853727232
@@ -328,6 +330,7 @@
 Maria_pagecache_reads	#
 Maria_pagecache_write_requests	#
 Maria_pagecache_writes	#
+Maria_transaction_log_syncs	#
 create table t1 (b char(0));
 insert into t1 values(NULL),("");
 select length(b) from t1;

=== added file 'mysql-test/suite/maria/t/group_commit.test'
--- a/mysql-test/suite/maria/t/group_commit.test	1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/maria/t/group_commit.test	2010-02-12 13:12:28 +0000
@@ -0,0 +1,71 @@
+# Test different ways of syncing (mostly syntax)
+
+--disable_warnings
+drop table if exists t1;
+--enable_warnings
+
+create table t1 (a int);
+
+SET GLOBAL maria_group_commit="NONE";
+SET GLOBAL maria_group_commit_interval= 0;
+--disable_query_log
+let $num = 5000;
+while ($num)
+{
+  insert into t1 values (1);
+  dec $num;
+}
+--enable_query_log
+SET GLOBAL maria_group_commit="NONE";
+SET GLOBAL maria_group_commit_interval= 100;
+--disable_query_log
+let $num = 5000;
+while ($num)
+{
+  insert into t1 values (1);
+  dec $num;
+}
+--enable_query_log
+SET GLOBAL maria_group_commit="HARD";
+SET GLOBAL maria_group_commit_interval= 0;
+--disable_query_log
+let $num = 5000;
+while ($num)
+{
+  insert into t1 values (1);
+  dec $num;
+}
+--enable_query_log
+SET GLOBAL maria_group_commit="HARD";
+SET GLOBAL maria_group_commit_interval= 100;
+--disable_query_log
+let $num = 5000;
+while ($num)
+{
+  insert into t1 values (1);
+  dec $num;
+}
+--enable_query_log
+SET GLOBAL maria_group_commit="SOFT";
+SET GLOBAL maria_group_commit_interval= 0;
+--disable_query_log
+let $num = 5000;
+while ($num)
+{
+  insert into t1 values (1);
+  dec $num;
+}
+--enable_query_log
+SET GLOBAL maria_group_commit="SOFT";
+SET GLOBAL maria_group_commit_interval= 100;
+--disable_query_log
+let $num = 5000;
+while ($num)
+{
+  insert into t1 values (1);
+  dec $num;
+}
+--enable_query_log
+SET GLOBAL maria_group_commit="NONE";
+SET GLOBAL maria_group_commit_interval= 0;
+drop table t1;

=== added directory 'randgen'
=== added directory 'randgen/conf'
=== added file 'randgen/conf/maria_group_commit.yy'
--- a/randgen/conf/maria_group_commit.yy	1970-01-01 00:00:00 +0000
+++ b/randgen/conf/maria_group_commit.yy	2010-02-12 13:12:28 +0000
@@ -0,0 +1,181 @@
+# test of group commit switching
+
+query:
+  select | insert | update| delete |
+  select | insert | update| delete |
+  select | insert | update| delete |
+  select | insert | update| delete |
+  select | insert | update| delete |
+  select | insert | update| delete |
+  select | insert | update| delete |
+  select | insert | update| delete |
+  select | insert | update| delete |
+  select | insert | update| delete |
+  select | insert | update| delete |
+  select | insert | update| delete |
+  select | insert | update| delete |
+  select | insert | update| delete |
+  select | insert | update| delete |
+  select | insert | update| delete |
+  select | insert | update| delete |
+  select | insert | update| delete |
+  select | insert | update| delete |
+  select | insert | update| delete |
+  select | insert | update| delete |
+  select | insert | update| delete |
+  select | insert | update| delete |
+  select | insert | update| delete |
+  select | insert | update| delete |
+  select | insert | update| delete |
+  select | insert | update| delete |
+  select | insert | update| delete |
+  select | insert | update| delete |
+  select | insert | update| delete |
+  select | insert | update| delete |
+  select | insert | update| delete |
+  select | insert | update| delete |
+  select | insert | update| delete |
+  select | insert | update| delete |
+  select | insert | update| delete |
+  select | insert | update| delete |
+  select | insert | update| delete |
+  select | insert | update| delete |
+  select | insert | update| delete |
+  select | insert | update| delete |
+  select | insert | update| delete |
+  select | insert | update| delete |
+  select | insert | update| delete |
+  select | insert | update| delete |
+  select | insert | update| delete |
+  select | insert | update| delete |
+  select | insert | update| delete |
+  select | insert | update| delete |
+  select | insert | update| delete |
+  select | insert | update| delete |
+  select | insert | update| delete |
+  select | insert | update| delete |
+  select | insert | update| delete |
+  select | insert | update| delete |
+  select | insert | update| delete |
+  select | insert | update| delete |
+  select | insert | update| delete |
+  select | insert | update| delete |
+  select | insert | update| delete |
+  select | insert | update| delete |
+  select | insert | update| delete |
+  select | insert | update| delete |
+  select | insert | update| delete |
+  select | insert | update| delete |
+  select | insert | update| delete |
+  select | insert | update| delete |
+  select | insert | update| delete |
+  select | insert | update| delete |
+  select | insert | update| delete |
+  select | insert | update| delete |
+  select | insert | update| delete |
+  select | insert | update| delete |
+  select | insert | update| delete |
+  select | insert | update| delete |
+  select | insert | update| delete |
+  select | insert | update| delete |
+  select | insert | update| delete |
+  select | insert | update| delete |
+  select | insert | update| delete |
+  select | insert | update| delete |
+  select | insert | update| delete |
+  select | insert | update| delete |
+  select | insert | update| delete |
+  select | insert | update| delete |
+  select | insert | update| delete |
+  select | insert | update| delete |
+  select | insert | update| delete |
+  select | insert | update| delete |
+  select | insert | update| delete |
+  select | insert | update| delete |
+  select | insert | update| delete |
+  select | insert | update| delete |
+  select | insert | update| delete |
+  select | insert | update| delete |
+  select | insert | update| delete |
+  select | insert | update| delete |
+  select | insert | update| delete |
+  select | insert | update| delete |
+  select | insert | update| delete |
+  select | insert | update| delete |
+  select | insert | update| delete |
+  select | insert | update| delete |
+  select | insert | update| delete |
+  select | insert | update| delete |
+  select | insert | update| delete |
+  select | insert | update| delete |
+  select | insert | update| delete |
+  select | insert | update| delete |
+  select | insert | update| delete |
+  select | insert | update| delete |
+  select | insert | update| delete |
+  select | insert | update| delete |
+  select | insert | update| delete |
+  select | insert | update| delete |
+  select | insert | update| delete |
+  select | insert | update| delete |
+  select | insert | update| delete |
+  select | insert | update| delete |
+  select | insert | update| delete |
+  change_group_commit | change_interval;
+
+
+select:
+	SELECT select_item FROM join where order_by limit;
+
+select_item:
+	* | X . _field ;
+
+join:
+	_table AS X |
+	_table AS X LEFT JOIN _table AS Y ON ( X . _field = Y . _field ) ;
+
+where:
+	|
+	WHERE X . _field < value |
+	WHERE X . _field > value |
+	WHERE X . _field = value ;
+
+where_delete:
+	|
+	WHERE _field < value |
+	WHERE _field > value |
+	WHERE _field = value ;
+
+order_by:
+	| ORDER BY X . _field ;
+
+limit:
+	| LIMIT _digit ;
+
+insert:
+	INSERT INTO _table ( _field , _field ) VALUES ( value , value ) ;
+
+update:
+	UPDATE _table AS X SET _field = value where order_by limit ;
+
+delete:
+	DELETE FROM _table where_delete LIMIT _digit ;
+
+value:
+	' _letter ' | _digit | _date | _datetime | _time | _english ;
+
+change_group_commit:
+        SET GLOBAL MARIA_GROUP_COMMIT=none_soft_hard;
+
+none_soft_hard:
+        NONE | SOFT | HARD;
+
+change_interval:
+     set_interval | set_interval | set_interval | set_interval |
+     drop_interval;
+
+set_interval:
+        SET GLOBAL MARIA_GROUP_COMMIT_INTERVAL=_tinyint_unsigned;
+
+drop_interval:
+        SET GLOBAL MARIA_GROUP_COMMIT_INTERVAL=0;

=== modified file 'storage/maria/ha_maria.cc'
--- a/storage/maria/ha_maria.cc	2010-02-10 19:06:24 +0000
+++ b/storage/maria/ha_maria.cc	2010-02-12 13:12:28 +0000
@@ -102,22 +102,40 @@
   array_elements(maria_translog_purge_type_names) - 1, "",
   maria_translog_purge_type_names, NULL
 };
+
+/* transactional log directory sync */
 const char *maria_sync_log_dir_names[]=
 {
   "NEVER", "NEWFILE", "ALWAYS", NullS
 };
-
 TYPELIB maria_sync_log_dir_typelib=
 {
   array_elements(maria_sync_log_dir_names) - 1, "",
   maria_sync_log_dir_names, NULL
 };
 
+/* transactional log group commit */
+const char *maria_group_commit_names[]=
+{
+  "none", "hard", "soft", NullS
+};
+TYPELIB maria_group_commit_typelib=
+{
+  array_elements(maria_group_commit_names) - 1, "",
+  maria_group_commit_names, NULL
+};
+
 /** Interval between background checkpoints in seconds */
 static ulong checkpoint_interval;
 static void update_checkpoint_interval(MYSQL_THD thd,
                                        struct st_mysql_sys_var *var,
                                        void *var_ptr, const void *save);
+static void update_maria_group_commit(MYSQL_THD thd,
+                                      struct st_mysql_sys_var *var,
+                                      void *var_ptr, const void *save);
+static void update_maria_group_commit_interval(MYSQL_THD thd,
+                                           struct st_mysql_sys_var *var,
+                                           void *var_ptr, const void *save);
 /** After that many consecutive recovery failures, remove logs */
 static ulong force_start_after_recovery_failures;
 static void update_log_file_size(MYSQL_THD thd,
@@ -164,6 +182,24 @@
        NULL, update_log_file_size, TRANSLOG_FILE_SIZE,
        TRANSLOG_MIN_FILE_SIZE, 0xffffffffL, TRANSLOG_PAGE_SIZE);
 
+static MYSQL_SYSVAR_ENUM(group_commit, maria_group_commit,
+       PLUGIN_VAR_RQCMDARG,
+       "Specifies maria group commit mode. "
+       "Possible values are \"none\" (no group commit), "
+       "\"hard\" (wait for the actual commit), "
+       "\"soft\" (do not wait for the commit (DANGEROUS!!!))",
+       NULL, update_maria_group_commit,
+       TRANSLOG_GCOMMIT_NONE, &maria_group_commit_typelib);
+
+static MYSQL_SYSVAR_ULONG(group_commit_interval, maria_group_commit_interval,
+       PLUGIN_VAR_RQCMDARG,
+       "Interval between commits in microseconds (1/1000000 sec)."
+       " 0 stands for no waiting"
+       " for other threads to come and do a commit in \"hard\" mode and no"
+       " sync()/commit at all in \"soft\" mode.  The option has an effect"
+       " only if maria_group_commit is used",
+       NULL, update_maria_group_commit_interval, 0, 0, UINT_MAX, 1);
+
 static MYSQL_SYSVAR_ENUM(log_purge_type, log_purge_type,
        PLUGIN_VAR_RQCMDARG,
        "Specifies how maria transactional log will be purged. "
@@ -3278,6 +3314,8 @@
   MYSQL_SYSVAR(block_size),
   MYSQL_SYSVAR(checkpoint_interval),
   MYSQL_SYSVAR(force_start_after_recovery_failures),
+  MYSQL_SYSVAR(group_commit),
+  MYSQL_SYSVAR(group_commit_interval),
   MYSQL_SYSVAR(page_checksum),
   MYSQL_SYSVAR(log_dir_path),
   MYSQL_SYSVAR(log_file_size),
@@ -3309,6 +3347,92 @@
 }
 
 /**
+   @brief Updates group commit mode
+*/
+
+static void update_maria_group_commit(MYSQL_THD thd,
+                                      struct st_mysql_sys_var *var,
+                                      void *var_ptr, const void *save)
+{
+  ulong value= *(ulong *) var_ptr;                 /* mode being left */
+  DBUG_ENTER("update_maria_group_commit");
+  DBUG_PRINT("enter", ("old value: %lu  new value %lu  interval %lu",
+                       value, *(const ulong *) save,
+                       maria_group_commit_interval));
+  /* leave the old mode */
+  switch (value) {
+  case TRANSLOG_GCOMMIT_NONE:
+    break;
+  case TRANSLOG_GCOMMIT_HARD:
+    translog_hard_group_commit(FALSE);
+    break;
+  case TRANSLOG_GCOMMIT_SOFT:
+    translog_soft_sync(FALSE);
+    if (maria_group_commit_interval) /* soft sync was started only then */
+      translog_soft_sync_end();
+    break;
+  default:
+    DBUG_ASSERT(0); /* impossible */
+  }
+  value= *(ulong *) var_ptr= *(const ulong *) save;
+  translog_sync();
+  /* enter the new mode */
+  switch (value) {
+  case TRANSLOG_GCOMMIT_NONE:
+    break;
+  case TRANSLOG_GCOMMIT_HARD:
+    translog_hard_group_commit(TRUE);
+    break;
+  case TRANSLOG_GCOMMIT_SOFT:
+    translog_soft_sync(TRUE);
+    /* variable change made under global lock so we can just read it */
+    if (maria_group_commit_interval)
+      translog_soft_sync_start();
+    break;
+  default:
+    DBUG_ASSERT(0); /* impossible */
+  }
+  DBUG_VOID_RETURN;
+}
+
+/**
+   @brief Updates group commit interval
+*/
+
+static void update_maria_group_commit_interval(MYSQL_THD thd,
+                                               struct st_mysql_sys_var *var,
+                                               void *var_ptr, const void *save)
+{
+  ulong new_value= *(const ulong *) save;
+  ulong *value_ptr= (ulong*) var_ptr;
+  DBUG_ENTER("update_maria_group_commit_interval");
+  DBUG_PRINT("enter", ("old value: %lu  new value %lu  group commit %lu",
+                        *value_ptr, new_value, maria_group_commit));
+
+  /* variable change made under global lock so we can just read it */
+  switch (maria_group_commit) {
+    case TRANSLOG_GCOMMIT_NONE:
+    case TRANSLOG_GCOMMIT_HARD:
+      /* no soft sync to restart: just store and propagate the value */
+      *value_ptr= new_value;
+      translog_set_group_commit_interval(new_value);
+      break;
+    case TRANSLOG_GCOMMIT_SOFT:
+      /* soft sync runs only with a non-zero interval, so stop it for */
+      /* the old value and restart it if the new value is non-zero    */
+      if (*value_ptr)
+        translog_soft_sync_end();
+      translog_set_group_commit_interval(new_value);
+      if ((*value_ptr= new_value))
+        translog_soft_sync_start();
+      break;
+    default:
+      DBUG_ASSERT(0); /* impossible */
+  }
+  DBUG_VOID_RETURN;
+}
+
+/**
    @brief Updates the transaction log file limit.
 */
 
@@ -3330,6 +3454,7 @@
   {"Maria_pagecache_reads",              (char*) &maria_pagecache_var.global_cache_read, SHOW_LONGLONG},
   {"Maria_pagecache_write_requests",     (char*) &maria_pagecache_var.global_cache_w_requests, SHOW_LONGLONG},
   {"Maria_pagecache_writes",             (char*) &maria_pagecache_var.global_cache_write, SHOW_LONGLONG},
+  {"Maria_transaction_log_syncs",        (char*) &translog_syncs, SHOW_LONGLONG},
   {NullS, NullS, SHOW_LONG}
 };
 

=== modified file 'storage/maria/ma_init.c'
--- a/storage/maria/ma_init.c	2008-10-09 20:03:54 +0000
+++ b/storage/maria/ma_init.c	2010-02-12 13:12:28 +0000
@@ -82,6 +82,11 @@
     maria_inited= maria_multi_threaded= FALSE;
     ft_free_stopwords();
     ma_checkpoint_end();
+    if (translog_status == TRANSLOG_OK)
+    {
+      translog_soft_sync_end();
+      translog_sync();
+    }
     if ((trid= trnman_get_max_trid()) > max_trid_in_control_file)
     {
       /*

=== modified file 'storage/maria/ma_loghandler.c'
--- a/storage/maria/ma_loghandler.c	2010-01-06 21:27:53 +0000
+++ b/storage/maria/ma_loghandler.c	2010-02-12 13:12:28 +0000
@@ -18,6 +18,7 @@
 #include "ma_blockrec.h" /* for some constants and in-write hooks */
 #include "ma_key_recover.h" /* For some in-write hooks */
 #include "ma_checkpoint.h"
+#include "ma_servicethread.h"
 
 /*
   On Windows, neither my_open() nor my_sync() work for directories.
@@ -47,6 +48,15 @@
 #include <m_ctype.h>
 #endif
 
+/** @brief mutex of soft_sync_control (soft sync service thread) */
+static pthread_mutex_t LOCK_soft_sync;
+/** @brief condition variable of soft_sync_control (soft sync thread) */
+static pthread_cond_t  COND_soft_sync;
+/** @brief control structure for the soft sync background thread */
+static MA_SERVICE_THREAD_CONTROL soft_sync_control=
+  {THREAD_DEAD, FALSE, &LOCK_soft_sync, &COND_soft_sync};
+
+
 /* transaction log file descriptor */
 typedef struct st_translog_file
 {
@@ -124,10 +134,24 @@
   /* Previous buffer offset to detect it flush finish */
   TRANSLOG_ADDRESS prev_buffer_offset;
   /*
+    If the buffer was forcibly closed, the value of its horizon saved at
+    that moment; otherwise LSN_IMPOSSIBLE
+  */
+  TRANSLOG_ADDRESS pre_force_close_horizon;
+  /*
      How much is written (or will be written when copy_to_buffer_in_progress
      become 0) to this buffer
   */
   translog_size_t size;
+  /*
+     When moving from one log buffer to another, we write the rest of the
+     previous buffer to file and then start using the new log buffer.
+     A partially filled last page is not moved to the start of the new
+     buffer; instead 'skipped_data' tells how many bytes at the beginning
+     of the buffer are not relevant (translog_buffer_flush() does not
+     write them to the file).
+  */
+  uint skipped_data;
   /* File handler for this buffer */
   TRANSLOG_FILE *file;
   /* Threads which are waiting for buffer filling/freeing */
@@ -304,6 +328,7 @@
   */
   pthread_mutex_t log_flush_lock;
   pthread_cond_t log_flush_cond;
+  pthread_cond_t new_goal_cond;
 
   /* Protects changing of headers of finished files (max_lsn) */
   pthread_mutex_t file_header_lock;
@@ -344,13 +369,39 @@
 
 ulong log_purge_type= TRANSLOG_PURGE_IMMIDIATE;
 ulong log_file_size= TRANSLOG_FILE_SIZE;
+/* sync() of log files directory mode */
 ulong sync_log_dir= TRANSLOG_SYNC_DIR_NEWFILE;
+ulong maria_group_commit= TRANSLOG_GCOMMIT_NONE;
+ulong maria_group_commit_interval= 0;
 
 /* Marker for end of log */
 static uchar end_of_log= 0;
 #define END_OF_LOG &end_of_log
+/**
+  Switch for "soft" sync mode (no real sync(); the log is synced
+  periodically by the service thread instead)
+*/
+static volatile my_bool soft_sync= FALSE;
+/**
+  Switch for "hard" group commit mode
+*/
+static volatile my_bool hard_group_commit= FALSE;
+/**
+  Range of log file numbers which still have to be sync()ed
+*/
+static uint32 soft_sync_min= 0;
+static uint32 soft_sync_max= 0;
+static uint32 soft_need_sync= 1;
+/**
+  Group commit wait interval, in microseconds
+*/
+static uint32 group_commit_wait= 0;
 
 enum enum_translog_status translog_status= TRANSLOG_UNINITED;
+ulonglong translog_syncs= 0; /* Number of sync()s */
+
+/* time of last flush */
+static ulonglong flush_start= 0;
 
 /* chunk types */
 #define TRANSLOG_CHUNK_LSN   0x00      /* 0 chunk refer as LSN (head or tail */
@@ -980,12 +1031,17 @@
 static TRANSLOG_FILE *get_current_logfile()
 {
   TRANSLOG_FILE *file;
+  DBUG_ENTER("get_current_logfile");
   rw_rdlock(&log_descriptor.open_files_lock);
+  DBUG_PRINT("info", ("max_file: %lu  min_file: %lu  open_files: %lu",
+                      (ulong) log_descriptor.max_file,
+                      (ulong) log_descriptor.min_file,
+                      (ulong) log_descriptor.open_files.elements));
   DBUG_ASSERT(log_descriptor.max_file - log_descriptor.min_file + 1 ==
               log_descriptor.open_files.elements);
   file= *dynamic_element(&log_descriptor.open_files, 0, TRANSLOG_FILE **);
   rw_unlock(&log_descriptor.open_files_lock);
-  return (file);
+  DBUG_RETURN(file);
 }
 
 uchar	NEAR maria_trans_file_magic[]=
@@ -1069,6 +1125,7 @@
 static my_bool translog_max_lsn_to_header(File file, LSN lsn)
 {
   uchar lsn_buff[LSN_STORE_SIZE];
+  my_bool rc;
   DBUG_ENTER("translog_max_lsn_to_header");
   DBUG_PRINT("enter", ("File descriptor: %ld  "
                        "lsn: (%lu,0x%lx)",
@@ -1077,11 +1134,17 @@
 
   lsn_store(lsn_buff, lsn);
 
-  DBUG_RETURN(my_pwrite(file, lsn_buff,
-                        LSN_STORE_SIZE,
-                        (LOG_HEADER_DATA_SIZE - LSN_STORE_SIZE),
-                        log_write_flags) != 0 ||
-              my_sync(file, MYF(MY_WME)) != 0);
+  rc= (my_pwrite(file, lsn_buff,
+                 LSN_STORE_SIZE,
+                 (LOG_HEADER_DATA_SIZE - LSN_STORE_SIZE),
+                 log_write_flags) != 0 ||
+       my_sync(file, MYF(MY_WME)) != 0);
+  /*
+    We should not increase counter in case of error above, but it is so
+    unlikely that we can ignore this case
+  */
+  translog_syncs++;
+  DBUG_RETURN(rc);
 }
 
 
@@ -1423,7 +1486,9 @@
 static my_bool translog_buffer_init(struct st_translog_buffer *buffer, int num)
 {
   DBUG_ENTER("translog_buffer_init");
-  buffer->prev_last_lsn= buffer->last_lsn= LSN_IMPOSSIBLE;
+  buffer->pre_force_close_horizon=
+    buffer->prev_last_lsn= buffer->last_lsn=
+    LSN_IMPOSSIBLE;
   DBUG_PRINT("info", ("last_lsn  and prev_last_lsn set to 0  buffer: 0x%lx",
                       (ulong) buffer));
 
@@ -1435,6 +1500,7 @@
   memset(buffer->buffer, TRANSLOG_FILLER, TRANSLOG_WRITE_BUFFER);
   /* Buffer size */
   buffer->size= 0;
+  buffer->skipped_data= 0;
   /* cond of thread which is waiting for buffer filling */
   if (pthread_cond_init(&buffer->waiting_filling_buffer, 0))
     DBUG_RETURN(1);
@@ -1489,7 +1555,10 @@
     TODO: sync only we have changed the log
   */
   if (!file->is_sync)
+  {
     rc= my_sync(file->handler.file, MYF(MY_WME));
+    translog_syncs++;
+  }
   rc|= my_close(file->handler.file, MYF(MY_WME));
   my_free(file, MYF(0));
   return test(rc);
@@ -2044,7 +2113,8 @@
               (ulong) LSN_OFFSET(log_descriptor.horizon),
               (ulong) LSN_OFFSET(log_descriptor.horizon)));
   DBUG_ASSERT(buffer_no == buffer->buffer_no);
-  buffer->prev_last_lsn= buffer->last_lsn= LSN_IMPOSSIBLE;
+  buffer->pre_force_close_horizon=
+    buffer->prev_last_lsn= buffer->last_lsn= LSN_IMPOSSIBLE;
   DBUG_PRINT("info", ("last_lsn and prev_last_lsn set to 0  buffer: 0x%lx",
                       (ulong) buffer));
   buffer->offset= log_descriptor.horizon;
@@ -2052,6 +2122,7 @@
   buffer->file= get_current_logfile();
   buffer->overlay= 0;
   buffer->size= 0;
+  buffer->skipped_data= 0;
   translog_cursor_init(cursor, buffer, buffer_no);
   DBUG_PRINT("info", ("file: #%ld (%d)  init cursor #%u: 0x%lx  "
                       "chaser: %d  Size: %lu (%lu)",
@@ -2523,6 +2594,7 @@
   TRANSLOG_ADDRESS offset= buffer->offset;
   TRANSLOG_FILE *file= buffer->file;
   uint8 ver= buffer->ver;
+  uint skipped_data;
   DBUG_ENTER("translog_buffer_flush");
   DBUG_PRINT("enter",
              ("Buffer: #%u 0x%lx file: %d  offset: (%lu,0x%lx)  size: %lu",
@@ -2557,6 +2629,8 @@
     disk
   */
   file= buffer->file;
+  skipped_data= buffer->skipped_data;
+  DBUG_ASSERT(skipped_data < TRANSLOG_PAGE_SIZE);
   for (i= 0, pg= LSN_OFFSET(buffer->offset) / TRANSLOG_PAGE_SIZE;
        i < buffer->size;
        i+= TRANSLOG_PAGE_SIZE, pg++)
@@ -2573,13 +2647,16 @@
     DBUG_ASSERT(i + TRANSLOG_PAGE_SIZE <= buffer->size);
     if (translog_status != TRANSLOG_OK && translog_status != TRANSLOG_SHUTDOWN)
       DBUG_RETURN(1);
-    if (pagecache_inject(log_descriptor.pagecache,
+    if (pagecache_write_part(log_descriptor.pagecache,
                         &file->handler, pg, 3,
                         buffer->buffer + i,
                         PAGECACHE_PLAIN_PAGE,
                         PAGECACHE_LOCK_LEFT_UNLOCKED,
-                        PAGECACHE_PIN_LEFT_UNPINNED, 0,
-                        LSN_IMPOSSIBLE))
+                        PAGECACHE_PIN_LEFT_UNPINNED,
+                        PAGECACHE_WRITE_DONE, 0,
+                        LSN_IMPOSSIBLE,
+                        skipped_data,
+                        TRANSLOG_PAGE_SIZE - skipped_data))
     {
       DBUG_PRINT("error",
                  ("Can't write page (%lu,0x%lx) to pagecache, error: %d",
@@ -2589,10 +2666,12 @@
       translog_stop_writing();
       DBUG_RETURN(1);
     }
+    skipped_data= 0;
   }
   file->is_sync= 0;
-  if (my_pwrite(file->handler.file, buffer->buffer,
-                buffer->size, LSN_OFFSET(buffer->offset),
+  if (my_pwrite(file->handler.file, buffer->buffer + buffer->skipped_data,
+                buffer->size - buffer->skipped_data,
+                LSN_OFFSET(buffer->offset) + buffer->skipped_data,
                 log_write_flags))
   {
     DBUG_PRINT("error", ("Can't write buffer (%lu,0x%lx) size %lu "
@@ -2985,6 +3064,7 @@
           uchar *from, *table= NULL;
           int is_last_unfinished_page;
           uint last_protected_sector= 0;
+          uint skipped_data= curr_buffer->skipped_data;
           TRANSLOG_FILE file_copy;
           uint8 ver= curr_buffer->ver;
           translog_wait_for_writers(curr_buffer);
@@ -2997,7 +3077,38 @@
           }
           DBUG_ASSERT(LSN_FILE_NO(addr) ==  LSN_FILE_NO(curr_buffer->offset));
           from= curr_buffer->buffer + (addr - curr_buffer->offset);
-          memcpy(buffer, from, TRANSLOG_PAGE_SIZE);
+          if (skipped_data && addr == curr_buffer->offset)
+          {
+            /*
+              We read page part of which is not present in buffer,
+              so we should read absent part from file (page cache actually)
+            */
+            file= get_logfile_by_number(file_no);
+            DBUG_ASSERT(file != NULL);
+            /*
+              it's ok to not lock the page because:
+                - The log handler has it's own page cache.
+                - There is only one thread that can access the log
+                cache at a time
+            */
+            if (!(buffer= pagecache_read(log_descriptor.pagecache,
+                                         &file->handler,
+                                         LSN_OFFSET(addr) / TRANSLOG_PAGE_SIZE,
+                                         3, buffer,
+                                         PAGECACHE_PLAIN_PAGE,
+                                         PAGECACHE_LOCK_LEFT_UNLOCKED,
+                                         NULL)))
+              DBUG_RETURN(NULL);
+          }
+          else
+            skipped_data= 0;  /* Read after skipped in buffer data */
+          /*
+            Now we have correct data in buffer up to 'skipped_data'. The
+            following memcpy() will move the data from the internal buffer
+            that was not yet on disk.
+          */
+          memcpy(buffer + skipped_data, from + skipped_data,
+                 TRANSLOG_PAGE_SIZE - skipped_data);
           /*
             We can use copy then in translog_page_validator() because it
             do not put it permanently somewhere.
@@ -3291,6 +3402,7 @@
   uint32 next_page_offset, page_rest;
   uint32 i;
   File fd;
+  int rc;
   TRANSLOG_VALIDATOR_DATA data;
   char path[FN_REFLEN];
   uchar page_buff[TRANSLOG_PAGE_SIZE];
@@ -3316,14 +3428,19 @@
                      TRANSLOG_PAGE_SIZE);
   page_rest= next_page_offset - LSN_OFFSET(addr);
   memset(page_buff, TRANSLOG_FILLER, page_rest);
-  if ((fd= open_logfile_by_number_no_cache(LSN_FILE_NO(addr))) < 0 ||
-      ((my_chsize(fd, next_page_offset, TRANSLOG_FILLER, MYF(MY_WME)) ||
-        (page_rest && my_pwrite(fd, page_buff, page_rest, LSN_OFFSET(addr),
-                                log_write_flags)) ||
-        my_sync(fd, MYF(MY_WME))) |
-       my_close(fd, MYF(MY_WME))) ||
-      (sync_log_dir >= TRANSLOG_SYNC_DIR_ALWAYS &&
-       sync_dir(log_descriptor.directory_fd, MYF(MY_WME | MY_IGNORE_BADFD))))
+  rc= ((fd= open_logfile_by_number_no_cache(LSN_FILE_NO(addr))) < 0 ||
+       ((my_chsize(fd, next_page_offset, TRANSLOG_FILLER, MYF(MY_WME)) ||
+         (page_rest && my_pwrite(fd, page_buff, page_rest, LSN_OFFSET(addr),
+                                 log_write_flags)) ||
+         my_sync(fd, MYF(MY_WME)))));
+  translog_syncs++;
+  rc|= (fd >= 0 && my_close(fd, MYF(MY_WME))); /* fd 0 is valid too */
+  if (sync_log_dir >= TRANSLOG_SYNC_DIR_ALWAYS)
+  {
+    rc|= sync_dir(log_descriptor.directory_fd, MYF(MY_WME | MY_IGNORE_BADFD));
+    translog_syncs++;
+  }
+  if (rc)
     DBUG_RETURN(1);
 
   /* fix the horizon */
@@ -3483,7 +3600,10 @@
   my_bool version_changed= 0;
   DBUG_ENTER("translog_init_with_table");
 
+  translog_syncs= 0;
+  flush_start= 0;
   id_to_share= NULL;
+
   log_descriptor.directory_fd= -1;
   log_descriptor.is_everything_flushed= 1;
   log_descriptor.flush_in_progress= 0;
@@ -3511,6 +3631,7 @@
       pthread_mutex_init(&log_descriptor.dirty_buffer_mask_lock,
                          MY_MUTEX_INIT_FAST) ||
       pthread_cond_init(&log_descriptor.log_flush_cond, 0) ||
+      pthread_cond_init(&log_descriptor.new_goal_cond, 0) ||
       my_rwlock_init(&log_descriptor.open_files_lock,
                      NULL) ||
       my_init_dynamic_array(&log_descriptor.open_files,
@@ -3912,7 +4033,6 @@
     log_descriptor.flushed= log_descriptor.horizon;
   log_descriptor.in_buffers_only= log_descriptor.bc.buffer->offset;
   log_descriptor.max_lsn= LSN_IMPOSSIBLE; /* set to 0 */
-  log_descriptor.previous_flush_horizon= log_descriptor.horizon;
   /*
     Now 'flushed' is set to 'horizon' value, but 'horizon' is (potentially)
     address of the next LSN and we want indicate that all LSNs that are
@@ -3995,6 +4115,10 @@
             It is beginning of the log => there is no LSNs in the log =>
             There is no harm in leaving it "as-is".
           */
+          log_descriptor.previous_flush_horizon= log_descriptor.horizon;
+          DBUG_PRINT("info", ("previous_flush_horizon: (%lu,0x%lx)",
+                              LSN_IN_PARTS(log_descriptor.
+                                           previous_flush_horizon)));
           DBUG_RETURN(0);
         }
         file_no--;
@@ -4070,6 +4194,9 @@
       translog_free_record_header(&rec);
     }
   }
+  log_descriptor.previous_flush_horizon= log_descriptor.horizon;
+  DBUG_PRINT("info", ("previous_flush_horizon: (%lu,0x%lx)",
+                      LSN_IN_PARTS(log_descriptor.previous_flush_horizon)));
   DBUG_RETURN(0);
 err:
   ma_message_no_user(0, "log initialization failed");
@@ -4157,6 +4284,7 @@
   pthread_mutex_destroy(&log_descriptor.log_flush_lock);
   pthread_mutex_destroy(&log_descriptor.dirty_buffer_mask_lock);
   pthread_cond_destroy(&log_descriptor.log_flush_cond);
+  pthread_cond_destroy(&log_descriptor.new_goal_cond);
   rwlock_destroy(&log_descriptor.open_files_lock);
   delete_dynamic(&log_descriptor.open_files);
   delete_dynamic(&log_descriptor.unfinished_files);
@@ -6885,11 +7013,11 @@
 {
   translog_size_t res;
   DBUG_ENTER("translog_read_record_header_from_buffer");
-  DBUG_ASSERT(translog_is_LSN_chunk(page[page_offset]));
-  DBUG_ASSERT(translog_status == TRANSLOG_OK ||
-              translog_status == TRANSLOG_READONLY);
   DBUG_PRINT("info", ("page byte: 0x%x  offset: %u",
                       (uint) page[page_offset], (uint) page_offset));
+  DBUG_ASSERT(translog_is_LSN_chunk(page[page_offset]));
+  DBUG_ASSERT(translog_status == TRANSLOG_OK ||
+              translog_status == TRANSLOG_READONLY);
   buff->type= (page[page_offset] & TRANSLOG_REC_TYPE);
   buff->short_trid= uint2korr(page + page_offset + 1);
   DBUG_PRINT("info", ("Type %u, Short TrID %u, LSN (%lu,0x%lx)",
@@ -7356,27 +7484,27 @@
                        "Buffer addr: (%lu,0x%lx)  "
                        "Page addr: (%lu,0x%lx)  "
                        "size: %lu (%lu)  Pg: %u  left: %u  in progress %u",
-                       (uint) log_descriptor.bc.buffer_no,
-                       (ulong) log_descriptor.bc.buffer,
-                       LSN_IN_PARTS(log_descriptor.bc.buffer->offset),
+                       (uint) old_buffer_no,
+                       (ulong) old_buffer,
+                       LSN_IN_PARTS(old_buffer->offset),
                        (ulong) LSN_FILE_NO(log_descriptor.horizon),
                        (ulong) (LSN_OFFSET(log_descriptor.horizon) -
                                 log_descriptor.bc.current_page_fill),
-                       (ulong) log_descriptor.bc.buffer->size,
+                       (ulong) old_buffer->size,
                        (ulong) (log_descriptor.bc.ptr -log_descriptor.bc.
                                 buffer->buffer),
                        (uint) log_descriptor.bc.current_page_fill,
                        (uint) left,
-                       (uint) log_descriptor.bc.buffer->
+                       (uint) old_buffer->
                        copy_to_buffer_in_progress));
   translog_lock_assert_owner();
   LINT_INIT(current_page_fill);
-  new_buff_beginning= log_descriptor.bc.buffer->offset;
-  new_buff_beginning+= log_descriptor.bc.buffer->size; /* increase offset */
+  new_buff_beginning= old_buffer->offset;
+  new_buff_beginning+= old_buffer->size; /* increase offset */
 
   DBUG_ASSERT(log_descriptor.bc.ptr !=NULL);
   DBUG_ASSERT(LSN_FILE_NO(log_descriptor.horizon) ==
-              LSN_FILE_NO(log_descriptor.bc.buffer->offset));
+              LSN_FILE_NO(old_buffer->offset));
   translog_check_cursor(&log_descriptor.bc);
   DBUG_ASSERT(left < TRANSLOG_PAGE_SIZE);
   if (left)
@@ -7387,18 +7515,20 @@
     */
     DBUG_PRINT("info", ("left: %u", (uint) left));
 
+    old_buffer->pre_force_close_horizon=
+      old_buffer->offset + old_buffer->size;
     /* decrease offset */
     new_buff_beginning-= log_descriptor.bc.current_page_fill;
     current_page_fill= log_descriptor.bc.current_page_fill;
 
     memset(log_descriptor.bc.ptr, TRANSLOG_FILLER, left);
-    log_descriptor.bc.buffer->size+= left;
+    old_buffer->size+= left;
     DBUG_PRINT("info", ("Finish Page buffer #%u: 0x%lx  "
                         "Size: %lu",
-                        (uint) log_descriptor.bc.buffer->buffer_no,
-                        (ulong) log_descriptor.bc.buffer,
-                        (ulong) log_descriptor.bc.buffer->size));
-    DBUG_ASSERT(log_descriptor.bc.buffer->buffer_no ==
+                        (uint) old_buffer->buffer_no,
+                        (ulong) old_buffer,
+                        (ulong) old_buffer->size));
+    DBUG_ASSERT(old_buffer->buffer_no ==
                 log_descriptor.bc.buffer_no);
   }
   else
@@ -7509,11 +7639,21 @@
 
   if (left)
   {
-    /*
-      TODO: do not copy beginning of the page if we have no CRC or sector
-      checks on
-    */
-    memcpy(new_buffer->buffer, data, current_page_fill);
+    if (log_descriptor.flags &
+        (TRANSLOG_PAGE_CRC | TRANSLOG_SECTOR_PROTECTION))
+      memcpy(new_buffer->buffer, data, current_page_fill);
+    else
+    {
+      /*
+        This page header does not change if we add more data to the page so
+        we can not copy it and will not overwrite later
+      */
+      new_buffer->skipped_data= current_page_fill;
+#ifndef DBUG_OFF
+      memset(new_buffer->buffer, 0xa5, current_page_fill);
+#endif
+      DBUG_ASSERT(new_buffer->skipped_data < TRANSLOG_PAGE_SIZE);
+    }
   }
   old_buffer->next_buffer_offset= new_buffer->offset;
   translog_buffer_lock(new_buffer);
@@ -7561,6 +7701,7 @@
   {
     log_descriptor.next_pass_max_lsn= lsn;
     log_descriptor.max_lsn_requester= pthread_self();
+    pthread_cond_broadcast(&log_descriptor.new_goal_cond);
   }
   while (flush_no == log_descriptor.flush_no)
   {
@@ -7572,66 +7713,78 @@
 
 
 /**
-  @brief Flush the log up to given LSN (included)
-
-  @param  lsn            log record serial number up to which (inclusive)
-                         the log has to be flushed
-
-  @return Operation status
+  @brief sync() range of files (inclusive) and directory (by request)
+
+  @param min             min internal file number to flush
+  @param max             max internal file number to flush
+  @param sync_dir_needed TRUE if the log directory has to be synced as well
+
+  @return Operation status
     @retval 0      OK
     @retval 1      Error
-
-*/
-
-my_bool translog_flush(TRANSLOG_ADDRESS lsn)
-{
-  LSN sent_to_disk= LSN_IMPOSSIBLE;
-  TRANSLOG_ADDRESS flush_horizon;
-  uint fn, i;
+*/
+
+static my_bool translog_sync_files(uint32 min, uint32 max,
+                                   my_bool sync_dir_needed)
+{
+  uint fn;
+  my_bool rc= 0;
+  ulonglong flush_interval;
+  DBUG_ENTER("translog_sync_files");
+  DBUG_PRINT("info", ("min: %lu  max: %lu  sync dir: %d",
+                      (ulong) min, (ulong) max, (int) sync_dir_needed));
+  DBUG_ASSERT(min <= max);
+
+  flush_interval= group_commit_wait;
+  if (flush_interval)
+    flush_start= my_micro_time();
+  for (fn= min; fn <= max; fn++)
+  {
+    TRANSLOG_FILE *file= get_logfile_by_number(fn);
+    DBUG_ASSERT(file != NULL);
+    if (!file->is_sync)
+    {
+      if (my_sync(file->handler.file, MYF(MY_WME)))
+      {
+        rc= 1;
+        translog_stop_writing();
+        DBUG_RETURN(rc);
+      }
+      translog_syncs++;
+      file->is_sync= 1;
+    }
+  }
+
+  if (sync_dir_needed)
+  {
+    if (!(rc= sync_dir(log_descriptor.directory_fd,
+                       MYF(MY_WME | MY_IGNORE_BADFD))))
+      translog_syncs++;
+  }
+
+  DBUG_RETURN(rc);
+}
+
+
+/**
+  @brief Flushes buffers containing LSNs less than or equal to address <lsn>
+
+  @param lsn             address up to which all LSNs should be flushed;
+                         may be adjusted to the real last LSN address
+  @param sent_to_disk    returns 'sent to disk' position
+  @param flush_horizon   returns horizon of the flush
+
+  @note For terminology see the comment on translog_flush().
+*/
+
+void translog_flush_buffers(TRANSLOG_ADDRESS *lsn,
+                               TRANSLOG_ADDRESS *sent_to_disk,
+                               TRANSLOG_ADDRESS *flush_horizon)
+{
   dirty_buffer_mask_t dirty_buffer_mask;
+  uint i;
   uint8 last_buffer_no, start_buffer_no;
-  my_bool rc= 0;
-  DBUG_ENTER("translog_flush");
-  DBUG_PRINT("enter", ("Flush up to LSN: (%lu,0x%lx)", LSN_IN_PARTS(lsn)));
-  DBUG_ASSERT(translog_status == TRANSLOG_OK ||
-              translog_status == TRANSLOG_READONLY);
-  LINT_INIT(sent_to_disk);
-
-  pthread_mutex_lock(&log_descriptor.log_flush_lock);
-  DBUG_PRINT("info", ("Everything is flushed up to (%lu,0x%lx)",
-                      LSN_IN_PARTS(log_descriptor.flushed)));
-  if (cmp_translog_addr(log_descriptor.flushed, lsn) >= 0)
-  {
-    pthread_mutex_unlock(&log_descriptor.log_flush_lock);
-    DBUG_RETURN(0);
-  }
-  if (log_descriptor.flush_in_progress)
-  {
-    translog_flush_set_new_goal_and_wait(lsn);
-    if (!pthread_equal(log_descriptor.max_lsn_requester, pthread_self()))
-    {
-      /* fix lsn if it was horizon */
-      if (cmp_translog_addr(lsn, log_descriptor.bc.buffer->last_lsn) > 0)
-          lsn= BUFFER_MAX_LSN(log_descriptor.bc.buffer);
-      translog_flush_wait_for_end(lsn);
-      pthread_mutex_unlock(&log_descriptor.log_flush_lock);
-      DBUG_RETURN(0);
-    }
-    log_descriptor.next_pass_max_lsn= LSN_IMPOSSIBLE;
-  }
-  log_descriptor.flush_in_progress= 1;
-  flush_horizon= log_descriptor.previous_flush_horizon;
-  DBUG_PRINT("info", ("flush_in_progress is set"));
-  pthread_mutex_unlock(&log_descriptor.log_flush_lock);
-
-  translog_lock();
-  if (log_descriptor.is_everything_flushed)
-  {
-    DBUG_PRINT("info", ("everything is flushed"));
-    rc= (translog_status == TRANSLOG_READONLY);
-    translog_unlock();
-    goto out;
-  }
+  DBUG_ENTER("translog_flush_buffers");
 
   /*
     We will recheck information when will lock buffers one by
@@ -7656,15 +7809,15 @@
   /*
     if LSN up to which we have to flush bigger then maximum LSN of previous
     buffer and at least one LSN was saved in the current buffer (last_lsn !=
-    LSN_IMPOSSIBLE) then we better finish the current buffer.
+    LSN_IMPOSSIBLE) then we have to close the current buffer.
   */
-  if (cmp_translog_addr(lsn, log_descriptor.bc.buffer->prev_last_lsn) > 0 &&
+  if (cmp_translog_addr(*lsn, log_descriptor.bc.buffer->prev_last_lsn) > 0 &&
       log_descriptor.bc.buffer->last_lsn != LSN_IMPOSSIBLE)
   {
     struct st_translog_buffer *buffer= log_descriptor.bc.buffer;
-    lsn= log_descriptor.bc.buffer->last_lsn; /* fix lsn if it was horizon */
+    *lsn= log_descriptor.bc.buffer->last_lsn; /* fix lsn if it was horizon */
     DBUG_PRINT("info", ("LSN to flush fixed to last lsn: (%lu,0x%lx)",
-               LSN_IN_PARTS(log_descriptor.bc.buffer->last_lsn)));
+                        LSN_IN_PARTS(log_descriptor.bc.buffer->last_lsn)));
     last_buffer_no= log_descriptor.bc.buffer_no;
     log_descriptor.is_everything_flushed= 1;
     translog_force_current_buffer_to_finish();
@@ -7676,8 +7829,10 @@
                      TRANSLOG_BUFFERS_NO);
     translog_unlock();
   }
-  sent_to_disk= translog_get_sent_to_disk();
-  if (cmp_translog_addr(lsn, sent_to_disk) > 0)
+
+  /* flush buffers */
+  *sent_to_disk= translog_get_sent_to_disk();
+  if (cmp_translog_addr(*lsn, *sent_to_disk) > 0)
   {
 
     DBUG_PRINT("info", ("Start buffer #: %u  last buffer #: %u",
@@ -7697,53 +7852,238 @@
                           LSN_IN_PARTS(buffer->last_lsn),
                           (buffer->file ?
                            "dirty" : "closed")));
-      if (buffer->prev_last_lsn <= lsn &&
+      if (buffer->prev_last_lsn <= *lsn &&
           buffer->file != NULL)
       {
-        DBUG_ASSERT(flush_horizon <= buffer->offset + buffer->size);
-        flush_horizon= buffer->offset + buffer->size;
+        DBUG_ASSERT(*flush_horizon <= buffer->offset + buffer->size);
+        *flush_horizon= (buffer->pre_force_close_horizon != LSN_IMPOSSIBLE ?
+                         buffer->pre_force_close_horizon :
+                         buffer->offset + buffer->size);
+        /* pre_force_close_horizon is reset during new buffer start */
+        DBUG_PRINT("info", ("flush_horizon: (%lu,0x%lx)",
+                            LSN_IN_PARTS(*flush_horizon)));
+        DBUG_ASSERT(*flush_horizon <= log_descriptor.horizon);
+
         translog_buffer_flush(buffer);
       }
       translog_buffer_unlock(buffer);
       i= (i + 1) % TRANSLOG_BUFFERS_NO;
     } while (i != last_buffer_no);
-    sent_to_disk= translog_get_sent_to_disk();
-  }
-
-  /* sync files from previous flush till current one */
-  for (fn= LSN_FILE_NO(log_descriptor.flushed); fn <= LSN_FILE_NO(lsn); fn++)
-  {
-    TRANSLOG_FILE *file= get_logfile_by_number(fn);
-    DBUG_ASSERT(file != NULL);
-    if (!file->is_sync)
-    {
-      if (my_sync(file->handler.file, MYF(MY_WME)))
+    *sent_to_disk= translog_get_sent_to_disk();
+  }
+
+  DBUG_VOID_RETURN;
+}
+
+/**
+  @brief Flush the log up to given LSN (included)
+
+  @param  lsn            log record serial number up to which (inclusive)
+                         the log has to be flushed
+
+  @return Operation status
+    @retval 0      OK
+    @retval 1      Error
+
+  @note
+
+  - Non group commit logic: commits are made in passes. The thread that
+  started the flush first performs the actual flush; other threads set the
+  new goal (LSN) of the next pass (if it is the maximum) and wait for the
+  pass end, or just wait for the pass end.
+
+  - If hard group commit enabled and interval set to zero: the first
+  thread sends all changed buffers to disk, repeating while new LSNs are
+  added (it cannot loop forever: threads are limited and wait for the sync).
+  NOTE(review): with flush_interval == 0 the loop below breaks after the
+  first pass, so no repetition happens; confirm which is intended.
+  Pseudo code:
+
+   do
+     send changed buffers to disk
+   while new_goal
+   sync
+
+  - If hard group commit switched ON and less than rate microseconds has
+  passed from last sync, then after buffers have been sent to disk
+  wait until rate microseconds has passed since last sync, do sync and return.
+  This ensures that if we call sync infrequently we don't do any waits.
+
+  - If soft group commit enabled everything works as with 'non group commit'
+  but the thread doesn't do any real sync(). If rate is not zero the
+  sync() will be performed by a service thread with the given rate
+  when needed (new LSN appears).
+
+  @note Terminology:
+  'sent to disk' means written to disk but not sync()ed,
+  'flushed' means sent to disk and sync()ed.
+*/
+
+my_bool translog_flush(TRANSLOG_ADDRESS lsn)
+{
+  struct timespec abstime;
+  ulonglong flush_interval= 0;
+  ulonglong time_spent= 0;
+  LSN sent_to_disk= LSN_IMPOSSIBLE;
+  TRANSLOG_ADDRESS flush_horizon;
+  my_bool rc= 0;
+  my_bool hgroup_commit_at_start;
+  DBUG_ENTER("translog_flush");
+  DBUG_PRINT("enter", ("Flush up to LSN: (%lu,0x%lx)", LSN_IN_PARTS(lsn)));
+  DBUG_ASSERT(translog_status == TRANSLOG_OK ||
+              translog_status == TRANSLOG_READONLY);
+  LINT_INIT(sent_to_disk);
+
+  pthread_mutex_lock(&log_descriptor.log_flush_lock);
+  DBUG_PRINT("info", ("Everything is flushed up to (%lu,0x%lx)",
+                      LSN_IN_PARTS(log_descriptor.flushed)));
+  if (cmp_translog_addr(log_descriptor.flushed, lsn) >= 0)
+  {
+    pthread_mutex_unlock(&log_descriptor.log_flush_lock);
+    DBUG_RETURN(0);
+  }
+  if (log_descriptor.flush_in_progress)
+  {
+    translog_lock();
+    /* fix lsn if it was horizon */
+    if (cmp_translog_addr(lsn, log_descriptor.bc.buffer->last_lsn) > 0)
+      lsn= BUFFER_MAX_LSN(log_descriptor.bc.buffer);
+    translog_unlock();
+    translog_flush_set_new_goal_and_wait(lsn);
+    if (!pthread_equal(log_descriptor.max_lsn_requester, pthread_self()))
+    {
+      /*
+        translog_flush_wait_for_end() releases log_flush_lock while it
+        waits and re-acquires it before returning
+      */
+      translog_flush_wait_for_end(lsn);
+      pthread_mutex_unlock(&log_descriptor.log_flush_lock);
+      DBUG_RETURN(0);
+    }
+    log_descriptor.next_pass_max_lsn= LSN_IMPOSSIBLE;
+  }
+  log_descriptor.flush_in_progress= 1;
+  flush_horizon= log_descriptor.previous_flush_horizon;
+  DBUG_PRINT("info", ("flush_in_progress is set, flush_horizon: (%lu,0x%lx)",
+                      LSN_IN_PARTS(flush_horizon)));
+  pthread_mutex_unlock(&log_descriptor.log_flush_lock);
+
+  hgroup_commit_at_start= hard_group_commit;
+  if (hgroup_commit_at_start)
+    flush_interval= group_commit_wait;
+
+  translog_lock();
+  if (log_descriptor.is_everything_flushed)
+  {
+    DBUG_PRINT("info", ("everything is flushed"));
+    translog_unlock();
+    pthread_mutex_lock(&log_descriptor.log_flush_lock);
+    goto out;
+  }
+
+  for (;;)
+  {
+    /* Following function flushes buffers and makes translog_unlock() */
+    translog_flush_buffers(&lsn, &sent_to_disk, &flush_horizon);
+
+    if (!hgroup_commit_at_start)
+      break;  /* flush pass is ended */
+
+retest:
+    /*
+      We do not check time here because pthread_mutex_lock rarely takes
+      a lot of time so we can sacrifice a bit precision to performance
+      (taking into account that my_micro_time() might be expensive call).
+    */
+    if (flush_interval == 0)
+      break;  /* flush pass is ended */
+
+    pthread_mutex_lock(&log_descriptor.log_flush_lock);
+    if (log_descriptor.next_pass_max_lsn == LSN_IMPOSSIBLE)
+    {
+      if (flush_interval == 0 ||
+          (time_spent= (my_micro_time() - flush_start)) >= flush_interval)
       {
-        rc= 1;
-        translog_stop_writing();
-        sent_to_disk= LSN_IMPOSSIBLE;
-        goto out;
+        pthread_mutex_unlock(&log_descriptor.log_flush_lock);
+        break;
       }
-      file->is_sync= 1;
-    }
-  }
-
-  if (sync_log_dir >= TRANSLOG_SYNC_DIR_ALWAYS &&
-      (LSN_FILE_NO(log_descriptor.previous_flush_horizon) !=
-       LSN_FILE_NO(flush_horizon) ||
-       ((LSN_OFFSET(log_descriptor.previous_flush_horizon) - 1) /
-        TRANSLOG_PAGE_SIZE) !=
-       ((LSN_OFFSET(flush_horizon) - 1) / TRANSLOG_PAGE_SIZE)))
-    rc|= sync_dir(log_descriptor.directory_fd, MYF(MY_WME | MY_IGNORE_BADFD));
+      DBUG_PRINT("info", ("flush waits: %llu  interval: %llu  spent: %llu",
+                          flush_interval - time_spent,
+                          flush_interval, time_spent));
+      /* wait the rest of the interval (usec -> nsec) or a new goal */
+      set_timespec_nsec(abstime, (flush_interval - time_spent) * 1000);
+      pthread_cond_timedwait(&log_descriptor.new_goal_cond,
+                             &log_descriptor.log_flush_lock,
+                             &abstime);
+      pthread_mutex_unlock(&log_descriptor.log_flush_lock);
+      DBUG_PRINT("info", ("retest conditions"));
+      goto retest;
+    }
+
+    /* take next goal */
+    lsn= log_descriptor.next_pass_max_lsn;
+    log_descriptor.next_pass_max_lsn= LSN_IMPOSSIBLE;
+    /* prevent other thread from continue */
+    log_descriptor.max_lsn_requester= pthread_self();
+    DBUG_PRINT("info", ("flush took next goal: (%lu,0x%lx)",
+                        LSN_IN_PARTS(lsn)));
+    pthread_mutex_unlock(&log_descriptor.log_flush_lock);
+
+    /* next flush pass */
+    DBUG_PRINT("info", ("next flush pass"));
+    translog_lock();
+  }
+
+  /*
+    sync() files from previous flush till current one
+  */
+  if (!soft_sync || hgroup_commit_at_start)
+  {
+    if ((rc=
+         translog_sync_files(LSN_FILE_NO(log_descriptor.flushed),
+                             LSN_FILE_NO(lsn),
+                             sync_log_dir >= TRANSLOG_SYNC_DIR_ALWAYS &&
+                             (LSN_FILE_NO(log_descriptor.
+                                          previous_flush_horizon) !=
+                              LSN_FILE_NO(flush_horizon) ||
+                              (LSN_OFFSET(log_descriptor.
+                                          previous_flush_horizon) /
+                               TRANSLOG_PAGE_SIZE) !=
+                              (LSN_OFFSET(flush_horizon) /
+                               TRANSLOG_PAGE_SIZE)))))
+    {
+      sent_to_disk= LSN_IMPOSSIBLE;
+      pthread_mutex_lock(&log_descriptor.log_flush_lock);
+      goto out;
+    }
+    /* keep values for soft sync() and forced sync() actual */
+    {
+      uint32 fileno= LSN_FILE_NO(lsn);
+      my_atomic_rwlock_wrlock(&soft_sync_rwl);
+      my_atomic_store32(&soft_sync_min, fileno);
+      my_atomic_store32(&soft_sync_max, fileno);
+      my_atomic_rwlock_wrunlock(&soft_sync_rwl);
+    }
+  }
+  else
+  {
+    my_atomic_rwlock_wrlock(&soft_sync_rwl);
+    my_atomic_store32(&soft_sync_max, LSN_FILE_NO(lsn));
+    my_atomic_store32(&soft_need_sync, 1);
+    my_atomic_rwlock_wrunlock(&soft_sync_rwl);
+  }
+
+  DBUG_ASSERT(flush_horizon <= log_descriptor.horizon);
+
+  pthread_mutex_lock(&log_descriptor.log_flush_lock);
   log_descriptor.previous_flush_horizon= flush_horizon;
 out:
-  pthread_mutex_lock(&log_descriptor.log_flush_lock);
   if (sent_to_disk != LSN_IMPOSSIBLE)
     log_descriptor.flushed= sent_to_disk;
   log_descriptor.flush_in_progress= 0;
   log_descriptor.flush_no++;
   DBUG_PRINT("info", ("flush_in_progress is dropped"));
-  pthread_mutex_unlock(&log_descriptor.log_flush_lock);\
+  pthread_mutex_unlock(&log_descriptor.log_flush_lock);
   pthread_cond_broadcast(&log_descriptor.log_flush_cond);
   DBUG_RETURN(rc);
 }
@@ -8113,6 +8453,8 @@
 my_bool translog_purge(TRANSLOG_ADDRESS low)
 {
   uint32 last_need_file= LSN_FILE_NO(low);
+  uint32 min_unsync;
+  int soft;
   TRANSLOG_ADDRESS horizon= translog_get_horizon();
   int rc= 0;
   DBUG_ENTER("translog_purge");
@@ -8120,12 +8462,26 @@
   DBUG_ASSERT(translog_status == TRANSLOG_OK ||
               translog_status == TRANSLOG_READONLY);
 
+  soft= soft_sync;
+  my_atomic_rwlock_wrlock(&soft_sync_rwl);
+  min_unsync= my_atomic_load32(&soft_sync_min);
+  my_atomic_rwlock_wrunlock(&soft_sync_rwl);
+  DBUG_PRINT("info", ("min_unsync: %lu", (ulong) min_unsync));
+  if (soft && min_unsync < last_need_file)
+  {
+    last_need_file= min_unsync;
+    DBUG_PRINT("info", ("last_need_file set to %lu", (ulong)last_need_file));
+  }
+
   pthread_mutex_lock(&log_descriptor.purger_lock);
+  DBUG_PRINT("info", ("last_lsn_checked file: %lu:",
+                      (ulong) log_descriptor.last_lsn_checked));
   if (LSN_FILE_NO(log_descriptor.last_lsn_checked) < last_need_file)
   {
     uint32 i;
     uint32 min_file= translog_first_file(horizon, 1);
     DBUG_ASSERT(min_file != 0); /* log is already started */
+    DBUG_PRINT("info", ("min_file:  %lu:",(ulong) min_file));
     for(i= min_file; i < last_need_file && rc == 0; i++)
     {
       LSN lsn= translog_get_file_max_lsn_stored(i);
@@ -8356,6 +8712,159 @@
 }
 
 
+
+/**
+  @brief Sets soft sync mode: when on (and hard group commit is off),
+  translog_flush() only requests a sync() from the soft-sync thread.
+  @param mode            TRUE to switch soft sync on, FALSE to switch it off
+*/
+
+void translog_soft_sync(my_bool mode)
+{
+  soft_sync= mode;
+}
+
+
+/**
+  @brief Sets hard group commit: when on, translog_flush() may wait up to
+  the group commit interval for new goals before doing sync() itself.
+  @param mode            TRUE to switch hard group commit on, FALSE for off
+*/
+
+void translog_hard_group_commit(my_bool mode)
+{
+  hard_group_commit= mode;
+}
+
+
+/**
+  @brief Forced log sync (used when switching group commit modes)
+*/
+
+void translog_sync(void)
+{
+  uint32 max= get_current_logfile()->number;
+  uint32 min;
+  DBUG_ENTER("translog_sync");
+
+  my_atomic_rwlock_rdlock(&soft_sync_rwl);
+  min= my_atomic_load32(&soft_sync_min);
+  my_atomic_rwlock_rdunlock(&soft_sync_rwl);
+  if (!min)
+    min= max;
+  /* errors are reported (MY_WME) inside translog_sync_files() */
+  (void) translog_sync_files(min, max,
+                             sync_log_dir >= TRANSLOG_SYNC_DIR_ALWAYS);
+  DBUG_VOID_RETURN;
+}
+
+
+/**
+  @brief Sets the group commit interval (kept in group_commit_wait)
+
+  @param interval            interval to set, in microseconds (translog_flush()
+                             compares it with my_micro_time() differences)
+  @note A separate variable and setter are used because the service thread
+  has to be restarted with the new value, which cannot be done inside the
+  variable update routine (update_maria_group_commit_interval).
+*/
+
+void translog_set_group_commit_interval(uint32 interval)
+{
+  DBUG_ENTER("translog_set_group_commit_interval");
+  group_commit_wait= interval;
+  DBUG_PRINT("info", ("wait: %llu",
+                      (ulonglong)group_commit_wait));
+  DBUG_VOID_RETURN;
+}
+
+
+/**
+  @brief Soft group commit service thread: periodically sync()s log files
+*/
+
+static pthread_handler_t
+ma_soft_sync_background( void *arg __attribute__((unused)))
+{
+
+  my_thread_init();
+  {
+    DBUG_ENTER("ma_soft_sync_background");
+    for(;;)
+    {
+      ulonglong prev_loop= my_micro_time();
+      ulonglong time, sleep;
+      uint32 min, max, sync_request;
+      my_atomic_rwlock_wrlock(&soft_sync_rwl);
+      /* fetch-and-clear the request first so a concurrent one is not lost */
+      sync_request= my_atomic_fas32(&soft_need_sync, 0);
+      min= my_atomic_load32(&soft_sync_min);
+      max= my_atomic_load32(&soft_sync_max);
+      my_atomic_store32(&soft_sync_min, max);
+      my_atomic_rwlock_wrunlock(&soft_sync_rwl);
+
+      sleep= group_commit_wait;
+      if (sync_request)
+        translog_sync_files(min, max, FALSE);
+      /* sleep only for what is left of the interval (microseconds) */
+      time= my_micro_time() - prev_loop;
+      sleep= (time > sleep ? 0 : sleep - time);
+      /* NOTE(review): confirm my_service_thread_sleep() takes microseconds;
+         if it expects nanoseconds, scale the value by 1000 */
+      if (my_service_thread_sleep(&soft_sync_control, sleep))
+        break;
+    }
+    my_service_thread_signal_end(&soft_sync_control);
+    my_thread_end();
+    DBUG_RETURN(0);
+  }
+}
+
+
+/**
+  @brief Starts the soft-sync service thread; returns 0 on success
+*/
+
+int translog_soft_sync_start(void)
+{
+  pthread_t th;
+  int res= 0;
+  uint32 min, max;
+  DBUG_ENTER("translog_soft_sync_start");
+
+  /* check and init variables (stores below need the write lock) */
+  my_atomic_rwlock_wrlock(&soft_sync_rwl);
+  min= my_atomic_load32(&soft_sync_min);
+  max= my_atomic_load32(&soft_sync_max);
+  if (!max)
+    my_atomic_store32(&soft_sync_max, (max= get_current_logfile()->number));
+  if (!min)
+    my_atomic_store32(&soft_sync_min, max);
+  my_atomic_store32(&soft_need_sync, 1);
+  my_atomic_rwlock_wrunlock(&soft_sync_rwl);
+
+  if (!(res= ma_service_thread_control_init(&soft_sync_control)))
+    if (!(res= pthread_create(&th, NULL, ma_soft_sync_background, NULL)))
+      soft_sync_control.status= THREAD_RUNNING;
+  DBUG_RETURN(res);
+}
+
+
+/**
+  @brief Stops the soft-sync service thread if its control was initialized
+*/
+
+void  translog_soft_sync_end(void)
+{
+  DBUG_ENTER("translog_soft_sync_end");
+  if (soft_sync_control.inited)
+  {
+    ma_service_thread_control_end(&soft_sync_control);
+  }
+  DBUG_VOID_RETURN;
+}
+
+
 #ifdef MARIA_DUMP_LOG
 #include <my_getopt.h>
 extern void translog_example_table_init();

=== modified file 'storage/maria/ma_loghandler.h'
--- a/storage/maria/ma_loghandler.h	2009-01-15 22:25:53 +0000
+++ b/storage/maria/ma_loghandler.h	2010-02-12 13:12:28 +0000
@@ -342,6 +342,14 @@
   TRANSLOG_SHUTDOWN  /* going to shutdown the loghandler */
 };
 extern enum enum_translog_status translog_status;
+extern ulonglong translog_syncs; /* Number of sync()s */
+
+void translog_soft_sync(my_bool mode);
+void translog_hard_group_commit(my_bool mode);
+int translog_soft_sync_start(void);
+void translog_soft_sync_end(void);
+void translog_sync(void);
+void translog_set_group_commit_interval(uint32 interval);
 
 /*
   all the rest added because of recovery; should we make
@@ -441,6 +449,14 @@
 
 typedef enum
 {
+  TRANSLOG_GCOMMIT_NONE,  /* no group commit (see translog_flush() notes) */
+  TRANSLOG_GCOMMIT_HARD,  /* 'hard group commit' of translog_flush() */
+  TRANSLOG_GCOMMIT_SOFT   /* 'soft group commit' of translog_flush() */
+} enum_maria_group_commit;
+extern ulong maria_group_commit;          /* presumably an enum value above */
+extern ulong maria_group_commit_interval; /* presumably microseconds */
+typedef enum
+{
   TRANSLOG_PURGE_IMMIDIATE,
   TRANSLOG_PURGE_EXTERNAL,
   TRANSLOG_PURGE_ONDEMAND