maria-developers team mailing list archive
-
maria-developers team
-
Mailing list archive
-
Message #00489
bzr commit into Mariadb 5.2, with Maria 2.0:maria/5.2 branch (sanja:2725)
#At lp:maria/5.2
2725 sanja@xxxxxxxxxxxx 2009-07-07
Group commit and small optimisation of flush log (in case of page without CRC and sector protection) added. (for review)
modified:
mysql-test/suite/maria/r/maria3.result
storage/maria/ha_maria.cc
storage/maria/ma_init.c
storage/maria/ma_loghandler.c
storage/maria/ma_loghandler.h
per-file messages:
mysql-test/suite/maria/r/maria3.result
New variables added
storage/maria/ha_maria.cc
New variables for group commit and routines to control them added.
storage/maria/ma_init.c
Service thread stop (if it is present)
storage/maria/ma_loghandler.c
2 types of group commit added.
If the page is not protected by CRC or sector protection (i.e. we do not change the header when adding data to the page), we do not copy the beginning of the page when forcing to close the buffer in which that page is the last one.
storage/maria/ma_loghandler.h
Routines to control group commit added.
=== modified file 'mysql-test/suite/maria/r/maria3.result'
--- a/mysql-test/suite/maria/r/maria3.result 2009-02-19 09:01:25 +0000
+++ b/mysql-test/suite/maria/r/maria3.result 2009-07-07 00:37:23 +0000
@@ -264,6 +264,8 @@ Variable_name Value
maria_block_size 8192
maria_checkpoint_interval 30
maria_force_start_after_recovery_failures 0
+maria_group_commit none
+maria_group_commit_rate 800
maria_log_file_size 4294959104
maria_log_purge_type immediate
maria_max_sort_file_size 9223372036853727232
@@ -285,6 +287,7 @@ Maria_pagecache_read_requests #
Maria_pagecache_reads #
Maria_pagecache_write_requests #
Maria_pagecache_writes #
+Maria_transaction_log_syncs #
create table t1 (b char(0));
insert into t1 values(NULL),("");
select length(b) from t1;
=== modified file 'storage/maria/ha_maria.cc'
--- a/storage/maria/ha_maria.cc 2009-05-19 09:28:05 +0000
+++ b/storage/maria/ha_maria.cc 2009-07-07 00:37:23 +0000
@@ -101,22 +101,40 @@ TYPELIB maria_translog_purge_type_typeli
array_elements(maria_translog_purge_type_names) - 1, "",
maria_translog_purge_type_names, NULL
};
+
+/* transactional log directory sync */
const char *maria_sync_log_dir_names[]=
{
"NEVER", "NEWFILE", "ALWAYS", NullS
};
-
TYPELIB maria_sync_log_dir_typelib=
{
array_elements(maria_sync_log_dir_names) - 1, "",
maria_sync_log_dir_names, NULL
};
+/* transactional log group commit */
+const char *maria_group_commit_names[]=
+{
+ "none", "hard", "soft", NullS
+};
+TYPELIB maria_group_commit_typelib=
+{
+ array_elements(maria_group_commit_names) - 1, "",
+ maria_group_commit_names, NULL
+};
+
/** Interval between background checkpoints in seconds */
static ulong checkpoint_interval;
static void update_checkpoint_interval(MYSQL_THD thd,
struct st_mysql_sys_var *var,
void *var_ptr, const void *save);
+static void update_maria_group_commit(MYSQL_THD thd,
+ struct st_mysql_sys_var *var,
+ void *var_ptr, const void *save);
+static void update_maria_group_commit_rate(MYSQL_THD thd,
+ struct st_mysql_sys_var *var,
+ void *var_ptr, const void *save);
/** After that many consecutive recovery failures, remove logs */
static ulong force_start_after_recovery_failures;
static void update_log_file_size(MYSQL_THD thd,
@@ -163,6 +181,24 @@ static MYSQL_SYSVAR_ULONG(log_file_size,
NULL, update_log_file_size, TRANSLOG_FILE_SIZE,
TRANSLOG_MIN_FILE_SIZE, 0xffffffffL, TRANSLOG_PAGE_SIZE);
+static MYSQL_SYSVAR_ENUM(group_commit, maria_group_commit,
+ PLUGIN_VAR_RQCMDARG,
+ "Specifies maria group commit mode. "
+ "Possible values are \"none\" (no group commit), "
+ "\"hard\" (with waiting to actual commit), "
+ "\"soft\" (no wait for commit (DANGEROUS!!!))",
+ NULL, update_maria_group_commit,
+ TRANSLOG_GCOMMIT_NONE, &maria_group_commit_typelib);
+
+static MYSQL_SYSVAR_ULONG(group_commit_rate, maria_group_commit_rate,
+ PLUGIN_VAR_RQCMDARG,
+ "Number of commits per 100 seconds. (in other words one commit for "
+ "every 100/maria_group_commit_rate second). 0 stands for no waiting "
+ "for other threads to come and do a commit in \"hard\" mode and no"
+ " sync()/commit at all in \"soft\" mode. Option has only an effect "
+ "if maria_group_commit is used",
+ NULL, update_maria_group_commit_rate, 800, 0, UINT_MAX, 1);
+
static MYSQL_SYSVAR_ENUM(log_purge_type, log_purge_type,
PLUGIN_VAR_RQCMDARG,
"Specifies how maria transactional log will be purged. "
@@ -3254,6 +3290,8 @@ static struct st_mysql_sys_var* system_v
MYSQL_SYSVAR(block_size),
MYSQL_SYSVAR(checkpoint_interval),
MYSQL_SYSVAR(force_start_after_recovery_failures),
+ MYSQL_SYSVAR(group_commit),
+ MYSQL_SYSVAR(group_commit_rate),
MYSQL_SYSVAR(page_checksum),
MYSQL_SYSVAR(log_dir_path),
MYSQL_SYSVAR(log_file_size),
@@ -3284,6 +3322,97 @@ static void update_checkpoint_interval(M
}
/**
+ @brief Updates group commit mode
+*/
+
+static void update_maria_group_commit(MYSQL_THD thd,
+ struct st_mysql_sys_var *var,
+ void *var_ptr, const void *save)
+{
+ ulong value= (ulong)*((long *)var_ptr);
+ DBUG_ENTER("update_maria_group_commit");
+ DBUG_PRINT("enter", ("old value: %lu new value %lu rate %lu",
+ value, (ulong)(*(long *)save),
+ maria_group_commit_rate));
+ /* old value */
+ switch (value) {
+ case TRANSLOG_GCOMMIT_NONE:
+ break;
+ case TRANSLOG_GCOMMIT_HARD:
+ translog_hard_group_commit(FALSE);
+ break;
+ case TRANSLOG_GCOMMIT_SOFT:
+ translog_soft_sync(FALSE);
+ if (maria_group_commit_rate)
+ translog_soft_sync_end();
+ break;
+ default:
+ DBUG_ASSERT(0); /* impossible */
+ }
+ value= *(ulong *)var_ptr= (ulong)(*(long *)save);
+ translog_sync();
+ /* new value */
+ switch (value) {
+ case TRANSLOG_GCOMMIT_NONE:
+ break;
+ case TRANSLOG_GCOMMIT_HARD:
+ translog_hard_group_commit(TRUE);
+ break;
+ case TRANSLOG_GCOMMIT_SOFT:
+ translog_soft_sync(TRUE);
+ /* variable change made under global lock so we can just read it */
+ if (maria_group_commit_rate)
+ translog_soft_sync_start();
+ break;
+ default:
+ DBUG_ASSERT(0); /* impossible */
+ }
+ DBUG_VOID_RETURN;
+}
+
+/**
+ @brief Updates group commit rate
+*/
+
+static void update_maria_group_commit_rate(MYSQL_THD thd,
+ struct st_mysql_sys_var *var,
+ void *var_ptr, const void *save)
+{
+ ulong new_value= (ulong)*((long *)save);
+ ulong *value_ptr= (ulong*) var_ptr;
+ DBUG_ENTER("update_maria_group_commit_rate");
+ DBUG_PRINT("enter", ("old value: %lu new value %lu group commit %lu",
+ *value_ptr, new_value, maria_group_commit));
+ if (new_value &&
+ ((TRANSLOG_RATE_BASE * 1000000000ULL / new_value +
+ TRANSLOG_RATE_BASE / 2) /
+ TRANSLOG_RATE_BASE) == 0)
+ new_value= 0; /* protection against too small value */
+
+ /* variable change made under global lock so we can just read it */
+ switch (maria_group_commit) {
+ case TRANSLOG_GCOMMIT_NONE:
+ *value_ptr= new_value;
+ translog_set_group_commit_rate(new_value);
+ break;
+ case TRANSLOG_GCOMMIT_HARD:
+ *value_ptr= new_value;
+ translog_set_group_commit_rate(new_value);
+ break;
+ case TRANSLOG_GCOMMIT_SOFT:
+ if (*value_ptr)
+ translog_soft_sync_end();
+ translog_set_group_commit_rate(new_value);
+ if ((*value_ptr= new_value))
+ translog_soft_sync_start();
+ break;
+ default:
+ DBUG_ASSERT(0); /* impossible */
+ }
+ DBUG_VOID_RETURN;
+}
+
+/**
@brief Updates the transaction log file limit.
*/
@@ -3305,6 +3434,7 @@ static SHOW_VAR status_variables[]= {
{"Maria_pagecache_reads", (char*) &maria_pagecache_var.global_cache_read, SHOW_LONGLONG},
{"Maria_pagecache_write_requests", (char*) &maria_pagecache_var.global_cache_w_requests, SHOW_LONGLONG},
{"Maria_pagecache_writes", (char*) &maria_pagecache_var.global_cache_write, SHOW_LONGLONG},
+ {"Maria_transaction_log_syncs", (char*) &translog_syncs, SHOW_LONGLONG},
{NullS, NullS, SHOW_LONG}
};
=== modified file 'storage/maria/ma_init.c'
--- a/storage/maria/ma_init.c 2008-10-09 20:03:54 +0000
+++ b/storage/maria/ma_init.c 2009-07-07 00:37:23 +0000
@@ -82,6 +82,11 @@ void maria_end(void)
maria_inited= maria_multi_threaded= FALSE;
ft_free_stopwords();
ma_checkpoint_end();
+ if (translog_status == TRANSLOG_OK)
+ {
+ translog_soft_sync_end();
+ translog_sync();
+ }
if ((trid= trnman_get_max_trid()) > max_trid_in_control_file)
{
/*
=== modified file 'storage/maria/ma_loghandler.c'
--- a/storage/maria/ma_loghandler.c 2009-05-19 09:28:05 +0000
+++ b/storage/maria/ma_loghandler.c 2009-07-07 00:37:23 +0000
@@ -18,6 +18,7 @@
#include "ma_blockrec.h" /* for some constants and in-write hooks */
#include "ma_key_recover.h" /* For some in-write hooks */
#include "ma_checkpoint.h"
+#include "ma_servicethread.h"
/*
On Windows, neither my_open() nor my_sync() work for directories.
@@ -47,6 +48,15 @@
#include <m_ctype.h>
#endif
+/** @brief mutex of the soft sync service thread (see soft_sync_control) */
+static pthread_mutex_t LOCK_soft_sync;
+/** @brief condition variable of the soft sync service thread */
+static pthread_cond_t COND_soft_sync;
+/** @brief control structure for the soft sync background thread */
+static MA_SERVICE_THREAD_CONTROL soft_sync_control=
+ {THREAD_DEAD, FALSE, &LOCK_soft_sync, &COND_soft_sync};
+
+
/* transaction log file descriptor */
typedef struct st_translog_file
{
@@ -124,10 +134,20 @@ struct st_translog_buffer
/* Previous buffer offset to detect it flush finish */
TRANSLOG_ADDRESS prev_buffer_offset;
/*
+ If the buffer was forced to close, this saves the value of its horizon
+ (taken before the close); otherwise it is LSN_IMPOSSIBLE
+ */
+ TRANSLOG_ADDRESS pre_force_close_horizon;
+ /*
How much is written (or will be written when copy_to_buffer_in_progress
become 0) to this buffer
*/
translog_size_t size;
+ /*
+ How much data was skipped when moving the page from the previous buffer
+ to this one (an optimisation of forcing a buffer to finish)
+ */
+ uint skipped_data;
/* File handler for this buffer */
TRANSLOG_FILE *file;
/* Threads which are waiting for buffer filling/freeing */
@@ -304,6 +324,7 @@ struct st_translog_descriptor
*/
pthread_mutex_t log_flush_lock;
pthread_cond_t log_flush_cond;
+ pthread_cond_t new_goal_cond;
/* Protects changing of headers of finished files (max_lsn) */
pthread_mutex_t file_header_lock;
@@ -344,13 +365,39 @@ static struct st_translog_descriptor log
ulong log_purge_type= TRANSLOG_PURGE_IMMIDIATE;
ulong log_file_size= TRANSLOG_FILE_SIZE;
+/* sync() of log files directory mode */
ulong sync_log_dir= TRANSLOG_SYNC_DIR_NEWFILE;
+ulong maria_group_commit= TRANSLOG_GCOMMIT_NONE;
+ulong maria_group_commit_rate= 0;
/* Marker for end of log */
static uchar end_of_log= 0;
#define END_OF_LOG &end_of_log
+/**
+ Switch for "soft" sync (no real sync() but periodical sync by service
+ thread)
+*/
+static volatile my_bool soft_sync= FALSE;
+/**
+ Switch for "hard" group commit mode
+*/
+static volatile my_bool hard_group_commit= FALSE;
+/**
+ Interval of file numbers which have to be sync()ed
+*/
+static uint32 soft_sync_min= 0;
+static uint32 soft_sync_max= 0;
+/**
+ stores interval in nanoseconds/TRANSLOG_RATE_BASE (to
+ fit into uint32)
+*/
+static uint32 group_commit_wait= 0;
enum enum_translog_status translog_status= TRANSLOG_UNINITED;
+ulonglong translog_syncs= 0; /* Number of sync()s */
+
+/* time of last flush */
+static ulonglong flush_start= 0;
/* chunk types */
#define TRANSLOG_CHUNK_LSN 0x00 /* 0 chunk refer as LSN (head or tail */
@@ -980,12 +1027,17 @@ static TRANSLOG_FILE *get_logfile_by_num
static TRANSLOG_FILE *get_current_logfile()
{
TRANSLOG_FILE *file;
+ DBUG_ENTER("get_current_logfile");
rw_rdlock(&log_descriptor.open_files_lock);
+ DBUG_PRINT("info", ("max_file: %lu min_file: %lu open_files: %lu",
+ (ulong) log_descriptor.max_file,
+ (ulong) log_descriptor.min_file,
+ (ulong) log_descriptor.open_files.elements));
DBUG_ASSERT(log_descriptor.max_file - log_descriptor.min_file + 1 ==
log_descriptor.open_files.elements);
file= *dynamic_element(&log_descriptor.open_files, 0, TRANSLOG_FILE **);
rw_unlock(&log_descriptor.open_files_lock);
- return (file);
+ DBUG_RETURN(file);
}
uchar NEAR maria_trans_file_magic[]=
@@ -1069,6 +1121,7 @@ static my_bool translog_write_file_heade
static my_bool translog_max_lsn_to_header(File file, LSN lsn)
{
uchar lsn_buff[LSN_STORE_SIZE];
+ my_bool rc;
DBUG_ENTER("translog_max_lsn_to_header");
DBUG_PRINT("enter", ("File descriptor: %ld "
"lsn: (%lu,0x%lx)",
@@ -1077,11 +1130,13 @@ static my_bool translog_max_lsn_to_heade
lsn_store(lsn_buff, lsn);
- DBUG_RETURN(my_pwrite(file, lsn_buff,
- LSN_STORE_SIZE,
- (LOG_HEADER_DATA_SIZE - LSN_STORE_SIZE),
- log_write_flags) != 0 ||
- my_sync(file, MYF(MY_WME)) != 0);
+ if (!(rc= (my_pwrite(file, lsn_buff,
+ LSN_STORE_SIZE,
+ (LOG_HEADER_DATA_SIZE - LSN_STORE_SIZE),
+ log_write_flags) != 0 ||
+ my_sync(file, MYF(MY_WME)) != 0)))
+ translog_syncs++;
+ DBUG_RETURN(rc);
}
@@ -1423,7 +1478,9 @@ LSN translog_get_file_max_lsn_stored(uin
static my_bool translog_buffer_init(struct st_translog_buffer *buffer, int num)
{
DBUG_ENTER("translog_buffer_init");
- buffer->prev_last_lsn= buffer->last_lsn= LSN_IMPOSSIBLE;
+ buffer->pre_force_close_horizon=
+ buffer->prev_last_lsn= buffer->last_lsn=
+ LSN_IMPOSSIBLE;
DBUG_PRINT("info", ("last_lsn and prev_last_lsn set to 0 buffer: 0x%lx",
(ulong) buffer));
@@ -1435,6 +1492,7 @@ static my_bool translog_buffer_init(stru
memset(buffer->buffer, TRANSLOG_FILLER, TRANSLOG_WRITE_BUFFER);
/* Buffer size */
buffer->size= 0;
+ buffer->skipped_data= 0;
/* cond of thread which is waiting for buffer filling */
if (pthread_cond_init(&buffer->waiting_filling_buffer, 0))
DBUG_RETURN(1);
@@ -1489,7 +1547,10 @@ static my_bool translog_close_log_file(T
TODO: sync only we have changed the log
*/
if (!file->is_sync)
+ {
rc= my_sync(file->handler.file, MYF(MY_WME));
+ translog_syncs++;
+ }
rc|= my_close(file->handler.file, MYF(MY_WME));
my_free(file, MYF(0));
return test(rc);
@@ -2044,7 +2105,8 @@ static void translog_start_buffer(struct
(ulong) LSN_OFFSET(log_descriptor.horizon),
(ulong) LSN_OFFSET(log_descriptor.horizon)));
DBUG_ASSERT(buffer_no == buffer->buffer_no);
- buffer->prev_last_lsn= buffer->last_lsn= LSN_IMPOSSIBLE;
+ buffer->pre_force_close_horizon=
+ buffer->prev_last_lsn= buffer->last_lsn= LSN_IMPOSSIBLE;
DBUG_PRINT("info", ("last_lsn and prev_last_lsn set to 0 buffer: 0x%lx",
(ulong) buffer));
buffer->offset= log_descriptor.horizon;
@@ -2052,6 +2114,7 @@ static void translog_start_buffer(struct
buffer->file= get_current_logfile();
buffer->overlay= 0;
buffer->size= 0;
+ buffer->skipped_data= 0;
translog_cursor_init(cursor, buffer, buffer_no);
DBUG_PRINT("info", ("file: #%ld (%d) init cursor #%u: 0x%lx "
"chaser: %d Size: %lu (%lu)",
@@ -2523,6 +2586,7 @@ static my_bool translog_buffer_flush(str
TRANSLOG_ADDRESS offset= buffer->offset;
TRANSLOG_FILE *file= buffer->file;
uint8 ver= buffer->ver;
+ uint skipped_data;
DBUG_ENTER("translog_buffer_flush");
DBUG_PRINT("enter",
("Buffer: #%u 0x%lx file: %d offset: (%lu,0x%lx) size: %lu",
@@ -2557,6 +2621,8 @@ static my_bool translog_buffer_flush(str
disk
*/
file= buffer->file;
+ skipped_data= buffer->skipped_data;
+ DBUG_ASSERT(skipped_data < TRANSLOG_PAGE_SIZE);
for (i= 0, pg= LSN_OFFSET(buffer->offset) / TRANSLOG_PAGE_SIZE;
i < buffer->size;
i+= TRANSLOG_PAGE_SIZE, pg++)
@@ -2573,13 +2639,16 @@ static my_bool translog_buffer_flush(str
DBUG_ASSERT(i + TRANSLOG_PAGE_SIZE <= buffer->size);
if (translog_status != TRANSLOG_OK && translog_status != TRANSLOG_SHUTDOWN)
DBUG_RETURN(1);
- if (pagecache_inject(log_descriptor.pagecache,
+ if (pagecache_write_part(log_descriptor.pagecache,
&file->handler, pg, 3,
buffer->buffer + i,
PAGECACHE_PLAIN_PAGE,
PAGECACHE_LOCK_LEFT_UNLOCKED,
- PAGECACHE_PIN_LEFT_UNPINNED, 0,
- LSN_IMPOSSIBLE))
+ PAGECACHE_PIN_LEFT_UNPINNED,
+ PAGECACHE_WRITE_DONE, 0,
+ LSN_IMPOSSIBLE,
+ skipped_data,
+ TRANSLOG_PAGE_SIZE - skipped_data))
{
DBUG_PRINT("error",
("Can't write page (%lu,0x%lx) to pagecache, error: %d",
@@ -2589,10 +2658,12 @@ static my_bool translog_buffer_flush(str
translog_stop_writing();
DBUG_RETURN(1);
}
+ skipped_data= 0;
}
file->is_sync= 0;
- if (my_pwrite(file->handler.file, buffer->buffer,
- buffer->size, LSN_OFFSET(buffer->offset),
+ if (my_pwrite(file->handler.file, buffer->buffer + buffer->skipped_data,
+ buffer->size - buffer->skipped_data,
+ LSN_OFFSET(buffer->offset) + buffer->skipped_data,
log_write_flags))
{
DBUG_PRINT("error", ("Can't write buffer (%lu,0x%lx) size %lu "
@@ -2985,6 +3056,7 @@ restart:
uchar *from, *table= NULL;
int is_last_unfinished_page;
uint last_protected_sector= 0;
+ uint skipped_data= curr_buffer->skipped_data;
TRANSLOG_FILE file_copy;
uint8 ver= curr_buffer->ver;
translog_wait_for_writers(curr_buffer);
@@ -2997,7 +3069,25 @@ restart:
}
DBUG_ASSERT(LSN_FILE_NO(addr) == LSN_FILE_NO(curr_buffer->offset));
from= curr_buffer->buffer + (addr - curr_buffer->offset);
- memcpy(buffer, from, TRANSLOG_PAGE_SIZE);
+ if (skipped_data > (addr - curr_buffer->offset))
+ {
+ /*
+ We are reading a page, part of which is not present in the buffer,
+ so we have to read the absent part from the file (actually the page cache)
+ */
+ file= get_logfile_by_number(file_no);
+ DBUG_ASSERT(file != NULL);
+ buffer= pagecache_read(log_descriptor.pagecache, &file->handler,
+ LSN_OFFSET(addr) / TRANSLOG_PAGE_SIZE,
+ 3, buffer,
+ PAGECACHE_PLAIN_PAGE,
+ PAGECACHE_LOCK_LEFT_UNLOCKED,
+ NULL);
+ }
+ else
+ skipped_data= 0; /* Page lies after the skipped data, all of it is in the buffer */
+ memcpy(buffer + skipped_data, from + skipped_data,
+ TRANSLOG_PAGE_SIZE - skipped_data);
/*
We can use copy then in translog_page_validator() because it
do not put it permanently somewhere.
@@ -3291,6 +3381,7 @@ static my_bool translog_truncate_log(TRA
uint32 next_page_offset, page_rest;
uint32 i;
File fd;
+ int rc;
TRANSLOG_VALIDATOR_DATA data;
char path[FN_REFLEN];
uchar page_buff[TRANSLOG_PAGE_SIZE];
@@ -3316,14 +3407,19 @@ static my_bool translog_truncate_log(TRA
TRANSLOG_PAGE_SIZE);
page_rest= next_page_offset - LSN_OFFSET(addr);
memset(page_buff, TRANSLOG_FILLER, page_rest);
- if ((fd= open_logfile_by_number_no_cache(LSN_FILE_NO(addr))) < 0 ||
- ((my_chsize(fd, next_page_offset, TRANSLOG_FILLER, MYF(MY_WME)) ||
- (page_rest && my_pwrite(fd, page_buff, page_rest, LSN_OFFSET(addr),
- log_write_flags)) ||
- my_sync(fd, MYF(MY_WME))) |
- my_close(fd, MYF(MY_WME))) ||
- (sync_log_dir >= TRANSLOG_SYNC_DIR_ALWAYS &&
- sync_dir(log_descriptor.directory_fd, MYF(MY_WME | MY_IGNORE_BADFD))))
+ rc= ((fd= open_logfile_by_number_no_cache(LSN_FILE_NO(addr))) < 0 ||
+ ((my_chsize(fd, next_page_offset, TRANSLOG_FILLER, MYF(MY_WME)) ||
+ (page_rest && my_pwrite(fd, page_buff, page_rest, LSN_OFFSET(addr),
+ log_write_flags)) ||
+ my_sync(fd, MYF(MY_WME)))));
+ translog_syncs++;
+ rc|= (fd > 0 && my_close(fd, MYF(MY_WME)));
+ if (sync_log_dir >= TRANSLOG_SYNC_DIR_ALWAYS)
+ {
+ rc|= sync_dir(log_descriptor.directory_fd, MYF(MY_WME | MY_IGNORE_BADFD));
+ translog_syncs++;
+ }
+ if (rc)
DBUG_RETURN(1);
/* fix the horizon */
@@ -3511,6 +3607,7 @@ my_bool translog_init_with_table(const c
pthread_mutex_init(&log_descriptor.dirty_buffer_mask_lock,
MY_MUTEX_INIT_FAST) ||
pthread_cond_init(&log_descriptor.log_flush_cond, 0) ||
+ pthread_cond_init(&log_descriptor.new_goal_cond, 0) ||
my_rwlock_init(&log_descriptor.open_files_lock,
NULL) ||
my_init_dynamic_array(&log_descriptor.open_files,
@@ -3912,7 +4009,6 @@ my_bool translog_init_with_table(const c
log_descriptor.flushed= log_descriptor.horizon;
log_descriptor.in_buffers_only= log_descriptor.bc.buffer->offset;
log_descriptor.max_lsn= LSN_IMPOSSIBLE; /* set to 0 */
- log_descriptor.previous_flush_horizon= log_descriptor.horizon;
/*
Now 'flushed' is set to 'horizon' value, but 'horizon' is (potentially)
address of the next LSN and we want indicate that all LSNs that are
@@ -3995,6 +4091,10 @@ my_bool translog_init_with_table(const c
It is beginning of the log => there is no LSNs in the log =>
There is no harm in leaving it "as-is".
*/
+ log_descriptor.previous_flush_horizon= log_descriptor.horizon;
+ DBUG_PRINT("info", ("previous_flush_horizon: (%lu,0x%lx)",
+ LSN_IN_PARTS(log_descriptor.
+ previous_flush_horizon)));
DBUG_RETURN(0);
}
file_no--;
@@ -4070,6 +4170,9 @@ my_bool translog_init_with_table(const c
translog_free_record_header(&rec);
}
}
+ log_descriptor.previous_flush_horizon= log_descriptor.horizon;
+ DBUG_PRINT("info", ("previous_flush_horizon: (%lu,0x%lx)",
+ LSN_IN_PARTS(log_descriptor.previous_flush_horizon)));
DBUG_RETURN(0);
err:
ma_message_no_user(0, "log initialization failed");
@@ -4157,6 +4260,7 @@ void translog_destroy()
pthread_mutex_destroy(&log_descriptor.log_flush_lock);
pthread_mutex_destroy(&log_descriptor.dirty_buffer_mask_lock);
pthread_cond_destroy(&log_descriptor.log_flush_cond);
+ pthread_cond_destroy(&log_descriptor.new_goal_cond);
rwlock_destroy(&log_descriptor.open_files_lock);
delete_dynamic(&log_descriptor.open_files);
delete_dynamic(&log_descriptor.unfinished_files);
@@ -6885,11 +6989,11 @@ int translog_read_record_header_from_buf
{
translog_size_t res;
DBUG_ENTER("translog_read_record_header_from_buffer");
+ DBUG_PRINT("info", ("page byte: 0x%x offset: %u",
+ (uint) page[page_offset], (uint) page_offset));
DBUG_ASSERT(translog_is_LSN_chunk(page[page_offset]));
DBUG_ASSERT(translog_status == TRANSLOG_OK ||
translog_status == TRANSLOG_READONLY);
- DBUG_PRINT("info", ("page byte: 0x%x offset: %u",
- (uint) page[page_offset], (uint) page_offset));
buff->type= (page[page_offset] & TRANSLOG_REC_TYPE);
buff->short_trid= uint2korr(page + page_offset + 1);
DBUG_PRINT("info", ("Type %u, Short TrID %u, LSN (%lu,0x%lx)",
@@ -7356,27 +7460,27 @@ static void translog_force_current_buffe
"Buffer addr: (%lu,0x%lx) "
"Page addr: (%lu,0x%lx) "
"size: %lu (%lu) Pg: %u left: %u in progress %u",
- (uint) log_descriptor.bc.buffer_no,
- (ulong) log_descriptor.bc.buffer,
- LSN_IN_PARTS(log_descriptor.bc.buffer->offset),
+ (uint) old_buffer_no,
+ (ulong) old_buffer,
+ LSN_IN_PARTS(old_buffer->offset),
(ulong) LSN_FILE_NO(log_descriptor.horizon),
(ulong) (LSN_OFFSET(log_descriptor.horizon) -
log_descriptor.bc.current_page_fill),
- (ulong) log_descriptor.bc.buffer->size,
+ (ulong) old_buffer->size,
(ulong) (log_descriptor.bc.ptr -log_descriptor.bc.
buffer->buffer),
(uint) log_descriptor.bc.current_page_fill,
(uint) left,
- (uint) log_descriptor.bc.buffer->
+ (uint) old_buffer->
copy_to_buffer_in_progress));
translog_lock_assert_owner();
LINT_INIT(current_page_fill);
- new_buff_beginning= log_descriptor.bc.buffer->offset;
- new_buff_beginning+= log_descriptor.bc.buffer->size; /* increase offset */
+ new_buff_beginning= old_buffer->offset;
+ new_buff_beginning+= old_buffer->size; /* increase offset */
DBUG_ASSERT(log_descriptor.bc.ptr !=NULL);
DBUG_ASSERT(LSN_FILE_NO(log_descriptor.horizon) ==
- LSN_FILE_NO(log_descriptor.bc.buffer->offset));
+ LSN_FILE_NO(old_buffer->offset));
translog_check_cursor(&log_descriptor.bc);
DBUG_ASSERT(left < TRANSLOG_PAGE_SIZE);
if (left)
@@ -7387,18 +7491,20 @@ static void translog_force_current_buffe
*/
DBUG_PRINT("info", ("left: %u", (uint) left));
+ old_buffer->pre_force_close_horizon=
+ old_buffer->offset + old_buffer->size;
/* decrease offset */
new_buff_beginning-= log_descriptor.bc.current_page_fill;
current_page_fill= log_descriptor.bc.current_page_fill;
memset(log_descriptor.bc.ptr, TRANSLOG_FILLER, left);
- log_descriptor.bc.buffer->size+= left;
+ old_buffer->size+= left;
DBUG_PRINT("info", ("Finish Page buffer #%u: 0x%lx "
"Size: %lu",
- (uint) log_descriptor.bc.buffer->buffer_no,
- (ulong) log_descriptor.bc.buffer,
- (ulong) log_descriptor.bc.buffer->size));
- DBUG_ASSERT(log_descriptor.bc.buffer->buffer_no ==
+ (uint) old_buffer->buffer_no,
+ (ulong) old_buffer,
+ (ulong) old_buffer->size));
+ DBUG_ASSERT(old_buffer->buffer_no ==
log_descriptor.bc.buffer_no);
}
else
@@ -7509,11 +7615,21 @@ static void translog_force_current_buffe
if (left)
{
- /*
- TODO: do not copy beginning of the page if we have no CRC or sector
- checks on
- */
- memcpy(new_buffer->buffer, data, current_page_fill);
+ if (log_descriptor.flags &
+ (TRANSLOG_PAGE_CRC | TRANSLOG_SECTOR_PROTECTION))
+ memcpy(new_buffer->buffer, data, current_page_fill);
+ else
+ {
+ /*
+ This page header does not change if we add more data to the page, so
+ we can skip copying it; it will not be overwritten later
+ */
+ new_buffer->skipped_data= current_page_fill;
+#ifndef DBUG_OFF
+ memset(new_buffer->buffer, 0xa5, current_page_fill);
+#endif
+ DBUG_ASSERT(new_buffer->skipped_data < TRANSLOG_PAGE_SIZE);
+ }
}
old_buffer->next_buffer_offset= new_buffer->offset;
translog_buffer_lock(new_buffer);
@@ -7561,6 +7677,7 @@ void translog_flush_set_new_goal_and_wai
{
log_descriptor.next_pass_max_lsn= lsn;
log_descriptor.max_lsn_requester= pthread_self();
+ pthread_cond_broadcast(&log_descriptor.new_goal_cond);
}
while (flush_no == log_descriptor.flush_no)
{
@@ -7572,67 +7689,79 @@ void translog_flush_set_new_goal_and_wai
/**
- @brief Flush the log up to given LSN (included)
+ @brief sync() range of files (inclusive) and directory (by request)
- @param lsn log record serial number up to which (inclusive)
- the log has to be flushed
+ @param min min internal file number to flush
+ @param max max internal file number to flush
+ @param sync_dir need sync directory
- @return Operation status
+ @return Operation status
@retval 0 OK
@retval 1 Error
-
*/
-my_bool translog_flush(TRANSLOG_ADDRESS lsn)
+static my_bool translog_sync_files(uint32 min, uint32 max,
+ my_bool sync_dir)
{
- LSN sent_to_disk= LSN_IMPOSSIBLE;
- TRANSLOG_ADDRESS flush_horizon;
- uint fn, i;
- dirty_buffer_mask_t dirty_buffer_mask;
- uint8 last_buffer_no, start_buffer_no;
+ uint fn;
my_bool rc= 0;
- DBUG_ENTER("translog_flush");
- DBUG_PRINT("enter", ("Flush up to LSN: (%lu,0x%lx)", LSN_IN_PARTS(lsn)));
- DBUG_ASSERT(translog_status == TRANSLOG_OK ||
- translog_status == TRANSLOG_READONLY);
- LINT_INIT(sent_to_disk);
-
- pthread_mutex_lock(&log_descriptor.log_flush_lock);
- DBUG_PRINT("info", ("Everything is flushed up to (%lu,0x%lx)",
- LSN_IN_PARTS(log_descriptor.flushed)));
- if (cmp_translog_addr(log_descriptor.flushed, lsn) >= 0)
+ ulonglong flush_interval;
+ DBUG_ENTER("translog_sync_files");
+ DBUG_PRINT("info", ("min: %lu max: %lu sync dir: %d",
+ (ulong) min, (ulong) max, (int) sync_dir));
+ DBUG_ASSERT(min <= max);
+
+ flush_interval= group_commit_wait;
+ if (flush_interval)
+ flush_start= my_micro_time();
+ for (fn= min; fn <= max; fn++)
{
- pthread_mutex_unlock(&log_descriptor.log_flush_lock);
- DBUG_RETURN(0);
- }
- if (log_descriptor.flush_in_progress)
- {
- translog_flush_set_new_goal_and_wait(lsn);
- if (!pthread_equal(log_descriptor.max_lsn_requester, pthread_self()))
+ TRANSLOG_FILE *file= get_logfile_by_number(fn);
+ DBUG_ASSERT(file != NULL);
+ if (!file->is_sync)
{
- /* fix lsn if it was horizon */
- if (cmp_translog_addr(lsn, log_descriptor.bc.buffer->last_lsn) > 0)
- lsn= BUFFER_MAX_LSN(log_descriptor.bc.buffer);
- translog_flush_wait_for_end(lsn);
- pthread_mutex_unlock(&log_descriptor.log_flush_lock);
- DBUG_RETURN(0);
+ if (my_sync(file->handler.file, MYF(MY_WME)))
+ {
+ rc= 1;
+ translog_stop_writing();
+ DBUG_RETURN(rc);
+ }
+ translog_syncs++;
+ file->is_sync= 1;
}
- log_descriptor.next_pass_max_lsn= LSN_IMPOSSIBLE;
}
- log_descriptor.flush_in_progress= 1;
- flush_horizon= log_descriptor.previous_flush_horizon;
- DBUG_PRINT("info", ("flush_in_progress is set"));
- pthread_mutex_unlock(&log_descriptor.log_flush_lock);
- translog_lock();
- if (log_descriptor.is_everything_flushed)
+ if (sync_dir)
{
- DBUG_PRINT("info", ("everything is flushed"));
- rc= (translog_status == TRANSLOG_READONLY);
- translog_unlock();
- goto out;
+ if (!(rc= sync_dir(log_descriptor.directory_fd,
+ MYF(MY_WME | MY_IGNORE_BADFD))))
+ translog_syncs++;
}
+ DBUG_RETURN(rc);
+}
+
+
+/*
+ @brief Flushes buffers with LSNs in them less or equal address <lsn>
+
+ @param lsn address up to which all LSNs should be flushed,
+ can be reset to real last LSN address
+ @param sent_to_disk returns 'sent to disk' position
+ @param flush_horizon returns horizon of the flush
+
+ @note About terminology see comment to translog_flush().
+*/
+
+void translog_flush_buffers(TRANSLOG_ADDRESS *lsn,
+ TRANSLOG_ADDRESS *sent_to_disk,
+ TRANSLOG_ADDRESS *flush_horizon)
+{
+ dirty_buffer_mask_t dirty_buffer_mask;
+ uint i;
+ uint8 last_buffer_no, start_buffer_no;
+ DBUG_ENTER("translog_flush_buffers");
+
/*
We will recheck information when will lock buffers one by
one so we can use unprotected read here (this is just for
@@ -7656,15 +7785,15 @@ my_bool translog_flush(TRANSLOG_ADDRESS
/*
if LSN up to which we have to flush bigger then maximum LSN of previous
buffer and at least one LSN was saved in the current buffer (last_lsn !=
- LSN_IMPOSSIBLE) then we better finish the current buffer.
+ LSN_IMPOSSIBLE) then we have to close the current buffer.
*/
- if (cmp_translog_addr(lsn, log_descriptor.bc.buffer->prev_last_lsn) > 0 &&
+ if (cmp_translog_addr(*lsn, log_descriptor.bc.buffer->prev_last_lsn) > 0 &&
log_descriptor.bc.buffer->last_lsn != LSN_IMPOSSIBLE)
{
struct st_translog_buffer *buffer= log_descriptor.bc.buffer;
- lsn= log_descriptor.bc.buffer->last_lsn; /* fix lsn if it was horizon */
+ *lsn= log_descriptor.bc.buffer->last_lsn; /* fix lsn if it was horizon */
DBUG_PRINT("info", ("LSN to flush fixed to last lsn: (%lu,0x%lx)",
- LSN_IN_PARTS(log_descriptor.bc.buffer->last_lsn)));
+ LSN_IN_PARTS(log_descriptor.bc.buffer->last_lsn)));
last_buffer_no= log_descriptor.bc.buffer_no;
log_descriptor.is_everything_flushed= 1;
translog_force_current_buffer_to_finish();
@@ -7676,8 +7805,10 @@ my_bool translog_flush(TRANSLOG_ADDRESS
TRANSLOG_BUFFERS_NO);
translog_unlock();
}
- sent_to_disk= translog_get_sent_to_disk();
- if (cmp_translog_addr(lsn, sent_to_disk) > 0)
+
+ /* flush buffers */
+ *sent_to_disk= translog_get_sent_to_disk();
+ if (cmp_translog_addr(*lsn, *sent_to_disk) > 0)
{
DBUG_PRINT("info", ("Start buffer #: %u last buffer #: %u",
@@ -7697,53 +7828,237 @@ my_bool translog_flush(TRANSLOG_ADDRESS
LSN_IN_PARTS(buffer->last_lsn),
(buffer->file ?
"dirty" : "closed")));
- if (buffer->prev_last_lsn <= lsn &&
+ if (buffer->prev_last_lsn <= *lsn &&
buffer->file != NULL)
{
- DBUG_ASSERT(flush_horizon <= buffer->offset + buffer->size);
- flush_horizon= buffer->offset + buffer->size;
+ DBUG_ASSERT(*flush_horizon <= buffer->offset + buffer->size);
+ *flush_horizon= (buffer->pre_force_close_horizon != LSN_IMPOSSIBLE ?
+ buffer->pre_force_close_horizon :
+ buffer->offset + buffer->size);
+ /* pre_force_close_horizon is reset during new buffer start */
+ DBUG_PRINT("info", ("flush_horizon: (%lu,0x%lx)",
+ LSN_IN_PARTS(*flush_horizon)));
+ DBUG_ASSERT(*flush_horizon <= log_descriptor.horizon);
+
translog_buffer_flush(buffer);
}
translog_buffer_unlock(buffer);
i= (i + 1) % TRANSLOG_BUFFERS_NO;
} while (i != last_buffer_no);
- sent_to_disk= translog_get_sent_to_disk();
+ *sent_to_disk= translog_get_sent_to_disk();
+ }
+
+ DBUG_VOID_RETURN;
+}
+
+/**
+ @brief Flush the log up to given LSN (included)
+
+ @param lsn log record serial number up to which (inclusive)
+ the log has to be flushed
+
+ @return Operation status
+ @retval 0 OK
+ @retval 1 Error
+
+ @note
+
+ - Non group commit logic: Commits are made in passes. The thread which
+ started the flush first performs the actual flush; other threads set a new
+ goal (LSN) for the next pass (if theirs is the maximum) and wait for the
+ pass to end, or just wait for the pass to end.
+
+ - If hard group commit enabled and rate set to zero:
+ The first thread sends all changed buffers to disk. This is repeated
+ as long as there are new LSNs added. The process can not loop
+ forever because we have limited number of threads and they will wait
+ for the data to be synced.
+ Pseudo code:
+
+ do
+ send changed buffers to disk
+ while new_goal
+ sync
+
+ - If hard group commit switched ON and less than rate microseconds has
+ passed from last sync, then after buffers have been sent to disk
+ wait until rate microseconds has passed since last sync, do sync and return.
+ This ensures that if we call sync infrequently we don't do any waits.
+
+ - If soft group commit enabled everything works as with 'non group commit'
+ but the thread doesn't do any real sync(). If rate is not zero the
+ sync() will be performed by a service thread with the given rate
+ when needed (new LSN appears).
+
+ @note Terminology:
+ 'sent to disk' means written to disk but not sync()ed,
+ 'flushed' means sent to disk and sync()ed.
+*/
+
+my_bool translog_flush(TRANSLOG_ADDRESS lsn)
+{
+ struct timespec abstime;
+ ulonglong flush_interval;
+ ulonglong time_spent;
+ LSN sent_to_disk= LSN_IMPOSSIBLE;
+ TRANSLOG_ADDRESS flush_horizon;
+ my_bool rc= 0;
+ my_bool hgroup_commit_at_start;
+ DBUG_ENTER("translog_flush");
+ DBUG_PRINT("enter", ("Flush up to LSN: (%lu,0x%lx)", LSN_IN_PARTS(lsn)));
+ DBUG_ASSERT(translog_status == TRANSLOG_OK ||
+ translog_status == TRANSLOG_READONLY);
+ LINT_INIT(sent_to_disk);
+
+ pthread_mutex_lock(&log_descriptor.log_flush_lock);
+ DBUG_PRINT("info", ("Everything is flushed up to (%lu,0x%lx)",
+ LSN_IN_PARTS(log_descriptor.flushed)));
+ if (cmp_translog_addr(log_descriptor.flushed, lsn) >= 0)
+
+
+ {
+ pthread_mutex_unlock(&log_descriptor.log_flush_lock);
+ DBUG_RETURN(0);
}
+ if (log_descriptor.flush_in_progress)
+ {
+ translog_lock();
+ /* fix lsn if it was horizon */
+ if (cmp_translog_addr(lsn, log_descriptor.bc.buffer->last_lsn) > 0)
+ lsn= BUFFER_MAX_LSN(log_descriptor.bc.buffer);
+ translog_unlock();
+ translog_flush_set_new_goal_and_wait(lsn);
+ if (!pthread_equal(log_descriptor.max_lsn_requester, pthread_self()))
+ {
+ /*
+ translog_flush_wait_for_end() releases log_flush_lock while it is
+ waiting, then acquires it again
+ */
+ translog_flush_wait_for_end(lsn);
+ pthread_mutex_unlock(&log_descriptor.log_flush_lock);
+ DBUG_RETURN(0);
+ }
+ log_descriptor.next_pass_max_lsn= LSN_IMPOSSIBLE;
+ }
+ log_descriptor.flush_in_progress= 1;
+ flush_horizon= log_descriptor.previous_flush_horizon;
+ DBUG_PRINT("info", ("flush_in_progress is set, flush_horizon: (%lu,0x%lx)",
+ LSN_IN_PARTS(flush_horizon)));
+ pthread_mutex_unlock(&log_descriptor.log_flush_lock);
+
+ hgroup_commit_at_start= hard_group_commit;
+ if (hgroup_commit_at_start)
+ flush_interval= group_commit_wait * TRANSLOG_RATE_BASE;
- /* sync files from previous flush till current one */
- for (fn= LSN_FILE_NO(log_descriptor.flushed); fn <= LSN_FILE_NO(lsn); fn++)
+ translog_lock();
+ if (log_descriptor.is_everything_flushed)
{
- TRANSLOG_FILE *file= get_logfile_by_number(fn);
- DBUG_ASSERT(file != NULL);
- if (!file->is_sync)
+ DBUG_PRINT("info", ("everything is flushed"));
+ translog_unlock();
+ pthread_mutex_lock(&log_descriptor.log_flush_lock);
+ goto out;
+ }
+
+ for (;;)
+ {
+ /* The following function flushes buffers and calls translog_unlock() */
+ translog_flush_buffers(&lsn, &sent_to_disk, &flush_horizon);
+
+ if (!hgroup_commit_at_start)
+ break; /* flush pass is ended */
+
+retest:
+ if (flush_interval != 0 &&
+ (my_micro_time() - flush_start) >= flush_interval)
+ break; /* flush pass is ended */
+
+ pthread_mutex_lock(&log_descriptor.log_flush_lock);
+ if (log_descriptor.next_pass_max_lsn != LSN_IMPOSSIBLE)
+ {
+ /* take next goal */
+ lsn= log_descriptor.next_pass_max_lsn;
+ log_descriptor.next_pass_max_lsn= LSN_IMPOSSIBLE;
+ /* prevent other threads from continuing */
+ log_descriptor.max_lsn_requester= pthread_self();
+ DBUG_PRINT("info", ("flush took next goal: (%lu,0x%lx)",
+ LSN_IN_PARTS(lsn)));
+ }
+ else
{
- if (my_sync(file->handler.file, MYF(MY_WME)))
+ if (flush_interval == 0 ||
+ (time_spent= (my_micro_time() - flush_start)) >= flush_interval)
{
- rc= 1;
- translog_stop_writing();
- sent_to_disk= LSN_IMPOSSIBLE;
- goto out;
+ pthread_mutex_unlock(&log_descriptor.log_flush_lock);
+ break;
}
- file->is_sync= 1;
+ DBUG_PRINT("info", ("flush waits: %llu interval: %llu spent: %llu",
+ flush_interval - time_spent,
+ flush_interval, time_spent));
+ /* wait time or next goal */
+ set_timespec_nsec(abstime, flush_interval - time_spent);
+ pthread_cond_timedwait(&log_descriptor.new_goal_cond,
+ &log_descriptor.log_flush_lock,
+ &abstime);
+ pthread_mutex_unlock(&log_descriptor.log_flush_lock);
+ DBUG_PRINT("info", ("retest conditions"));
+ goto retest;
}
+ pthread_mutex_unlock(&log_descriptor.log_flush_lock);
+
+ /* next flush pass */
+ DBUG_PRINT("info", ("next flush pass"));
+ translog_lock();
}
- if (sync_log_dir >= TRANSLOG_SYNC_DIR_ALWAYS &&
- (LSN_FILE_NO(log_descriptor.previous_flush_horizon) !=
- LSN_FILE_NO(flush_horizon) ||
- ((LSN_OFFSET(log_descriptor.previous_flush_horizon) - 1) /
- TRANSLOG_PAGE_SIZE) !=
- ((LSN_OFFSET(flush_horizon) - 1) / TRANSLOG_PAGE_SIZE)))
- rc|= sync_dir(log_descriptor.directory_fd, MYF(MY_WME | MY_IGNORE_BADFD));
+ /*
+ sync() files from previous flush till current one
+ */
+ if (!soft_sync || hgroup_commit_at_start)
+ {
+ if ((rc=
+ translog_sync_files(LSN_FILE_NO(log_descriptor.flushed),
+ LSN_FILE_NO(lsn),
+ sync_log_dir >= TRANSLOG_SYNC_DIR_ALWAYS &&
+ (LSN_FILE_NO(log_descriptor.
+ previous_flush_horizon) !=
+ LSN_FILE_NO(flush_horizon) ||
+ (LSN_OFFSET(log_descriptor.
+ previous_flush_horizon) /
+ TRANSLOG_PAGE_SIZE) !=
+ (LSN_OFFSET(flush_horizon) /
+ TRANSLOG_PAGE_SIZE)))))
+ {
+ sent_to_disk= LSN_IMPOSSIBLE;
+ pthread_mutex_lock(&log_descriptor.log_flush_lock);
+ goto out;
+ }
+ /* keep values for soft sync() and forced sync() up to date */
+ {
+ uint32 fileno= LSN_FILE_NO(lsn);
+ my_atomic_rwlock_wrlock(&soft_sync_rwl);
+ my_atomic_store32(&soft_sync_min, fileno);
+ my_atomic_store32(&soft_sync_max, fileno);
+ my_atomic_rwlock_wrunlock(&soft_sync_rwl);
+ }
+ }
+ else
+ {
+ my_atomic_rwlock_wrlock(&soft_sync_rwl);
+ my_atomic_store32(&soft_sync_max, LSN_FILE_NO(lsn));
+ my_atomic_rwlock_wrunlock(&soft_sync_rwl);
+ }
+
+ DBUG_ASSERT(flush_horizon <= log_descriptor.horizon);
+
+ pthread_mutex_lock(&log_descriptor.log_flush_lock);
log_descriptor.previous_flush_horizon= flush_horizon;
out:
- pthread_mutex_lock(&log_descriptor.log_flush_lock);
if (sent_to_disk != LSN_IMPOSSIBLE)
log_descriptor.flushed= sent_to_disk;
log_descriptor.flush_in_progress= 0;
log_descriptor.flush_no++;
DBUG_PRINT("info", ("flush_in_progress is dropped"));
- pthread_mutex_unlock(&log_descriptor.log_flush_lock);\
+ pthread_mutex_unlock(&log_descriptor.log_flush_lock);
pthread_cond_broadcast(&log_descriptor.log_flush_cond);
DBUG_RETURN(rc);
}
@@ -8113,6 +8428,8 @@ LSN translog_first_theoretical_lsn()
my_bool translog_purge(TRANSLOG_ADDRESS low)
{
uint32 last_need_file= LSN_FILE_NO(low);
+ uint32 min_unsync;
+ int soft;
TRANSLOG_ADDRESS horizon= translog_get_horizon();
int rc= 0;
DBUG_ENTER("translog_purge");
@@ -8120,12 +8437,23 @@ my_bool translog_purge(TRANSLOG_ADDRESS
DBUG_ASSERT(translog_status == TRANSLOG_OK ||
translog_status == TRANSLOG_READONLY);
+ soft= soft_sync;
+ DBUG_PRINT("info", ("min_unsync: %lu", (ulong) min_unsync));
+ if (soft && min_unsync < last_need_file)
+ {
+ last_need_file= min_unsync;
+ DBUG_PRINT("info", ("last_need_file set to %lu", (ulong)last_need_file));
+ }
+
pthread_mutex_lock(&log_descriptor.purger_lock);
+ DBUG_PRINT("info", ("last_lsn_checked file: %lu:",
+ (ulong) log_descriptor.last_lsn_checked));
if (LSN_FILE_NO(log_descriptor.last_lsn_checked) < last_need_file)
{
uint32 i;
uint32 min_file= translog_first_file(horizon, 1);
DBUG_ASSERT(min_file != 0); /* log is already started */
+ DBUG_PRINT("info", ("min_file: %lu:",(ulong) min_file));
for(i= min_file; i < last_need_file && rc == 0; i++)
{
LSN lsn= translog_get_file_max_lsn_stored(i);
@@ -8356,6 +8684,153 @@ my_bool translog_log_debug_info(TRN *trn
}
+
+/**
+ @brief Sets soft sync mode
+
+ @param mode TRUE to switch soft sync on, FALSE to switch it off
+*/
+
+void translog_soft_sync(my_bool mode)
+{
+ soft_sync= mode; /* only the flag is stored here */
+}
+
+
+/**
+ @brief Sets hard group commit mode
+
+ @param mode TRUE to switch hard group commit on, FALSE to switch it off
+*/
+
+void translog_hard_group_commit(my_bool mode)
+{
+ hard_group_commit= mode; /* only the flag is stored here */
+}
+
+
+/**
+ @brief forced log sync (used when we are switching modes)
+
+ @note Passes to translog_sync_files() the file range starting at
+ soft_sync_min (or at the current log file if soft_sync_min is 0) and
+ ending at the current log file. The result of translog_sync_files() is
+ not checked here.
+*/
+
+void translog_sync()
+{
+ uint32 max= get_current_logfile()->number;
+ uint32 min;
+ DBUG_ENTER("ma_translog_sync");
+
+ my_atomic_rwlock_rdlock(&soft_sync_rwl);
+ min= my_atomic_load32(&soft_sync_min);
+ my_atomic_rwlock_rdunlock(&soft_sync_rwl);
+ if (!min) /* no minimum stored: fall back to the current file only */
+ min= max;
+
+ translog_sync_files(min, max, sync_log_dir >= TRANSLOG_SYNC_DIR_ALWAYS);
+
+ DBUG_VOID_RETURN;
+}
+
+
+/**
+ @brief set rate for group commit
+
+ @param rate rate to set; 0 stores a wait of 0
+
+ @note internally it stores interval in nanoseconds/TRANSLOG_RATE_BASE (to
+ fit into uint32)
+*/
+
+void translog_set_group_commit_rate(uint32 rate)
+{
+ ulonglong wait_time;
+ /* DBUG_ENTER has to follow all local declarations */
+ DBUG_ENTER("translog_set_group_commit_rate");
+ if (rate)
+ {
+ /*
+ Adding TRANSLOG_RATE_BASE / 2 before the final division rounds the
+ result to nearest instead of truncating it
+ */
+ wait_time= ((TRANSLOG_RATE_BASE * 1000000000ULL / rate +
+ TRANSLOG_RATE_BASE / 2) /
+ TRANSLOG_RATE_BASE);
+ if (wait_time == 0)
+ wait_time= 1; /* 0 is the special value stored for rate == 0 */
+ }
+ else
+ wait_time= 0;
+ group_commit_wait= wait_time;
+ DBUG_PRINT("info", ("rate: %lu wait: %llu",
+ (ulong)rate, (ulonglong)wait_time));
+ DBUG_VOID_RETURN;
+}
+
+
+/**
+ @brief syncing service thread
+
+ @note Each iteration reads the file range soft_sync_min..soft_sync_max,
+ moves soft_sync_min up to soft_sync_max, passes the range to
+ translog_sync_files() and then sleeps for what is left of
+ group_commit_wait * TRANSLOG_RATE_BASE. The loop ends when
+ my_service_thread_sleep() returns non-zero.
+*/
+
+static pthread_handler_t
+ma_soft_sync_background( void *arg __attribute__((unused)))
+{
+
+ my_thread_init();
+ {
+ DBUG_ENTER("ma_soft_sync_background");
+ for(;;)
+ {
+ ulonglong prev_loop= my_micro_time();
+ ulonglong time, sleep;
+ uint32 min, max;
+ /*
+ soft_sync_min is stored below, so the lock is taken in write mode
+ (as translog_flush() does for its stores)
+ */
+ my_atomic_rwlock_wrlock(&soft_sync_rwl);
+ min= my_atomic_load32(&soft_sync_min);
+ max= my_atomic_load32(&soft_sync_max);
+ my_atomic_store32(&soft_sync_min, max);
+ my_atomic_rwlock_wrunlock(&soft_sync_rwl);
+
+ sleep= group_commit_wait * TRANSLOG_RATE_BASE;
+ translog_sync_files(min, max, FALSE);
+ /* subtract the time already spent in this iteration from the sleep */
+ time= my_micro_time() - prev_loop;
+ if (time > sleep)
+ sleep= 0;
+ else
+ sleep-= time;
+ if (my_service_thread_sleep(&soft_sync_control, sleep))
+ break;
+ }
+ my_service_thread_signal_end(&soft_sync_control);
+ my_thread_end();
+ DBUG_RETURN(0);
+ }
+}
+
+
+/**
+ @brief Starts syncing thread
+
+ @return 0 on success, otherwise the non-zero result of
+ ma_service_thread_control_init() or pthread_create()
+*/
+
+int translog_soft_sync_start(void)
+{
+ pthread_t th;
+ int res= 0;
+ DBUG_ENTER("translog_soft_sync_start");
+ if (!(res= ma_service_thread_control_init(&soft_sync_control)))
+ if (!(res= pthread_create(&th, NULL, ma_soft_sync_background, NULL)))
+ soft_sync_control.status= THREAD_RUNNING; /* set only if both calls succeeded */
+ DBUG_RETURN(res);
+}
+
+
+/**
+ @brief Stops syncing thread
+
+ @note Does nothing if soft_sync_control.inited is not set
+*/
+
+void translog_soft_sync_end(void)
+{
+ DBUG_ENTER("translog_soft_sync_end");
+ if (soft_sync_control.inited)
+ {
+ ma_service_thread_control_end(&soft_sync_control);
+ }
+ DBUG_VOID_RETURN;
+}
+
+
#ifdef MARIA_DUMP_LOG
#include <my_getopt.h>
extern void translog_example_table_init();
=== modified file 'storage/maria/ma_loghandler.h'
--- a/storage/maria/ma_loghandler.h 2009-01-15 22:25:53 +0000
+++ b/storage/maria/ma_loghandler.h 2009-07-07 00:37:23 +0000
@@ -342,6 +342,14 @@ enum enum_translog_status
TRANSLOG_SHUTDOWN /* going to shutdown the loghandler */
};
extern enum enum_translog_status translog_status;
+extern ulonglong translog_syncs; /* Number of sync()s */
+
+void translog_soft_sync(my_bool mode);
+void translog_hard_group_commit(my_bool mode);
+int translog_soft_sync_start(void);
+void translog_soft_sync_end(void);
+void translog_sync();
+void translog_set_group_commit_rate(uint32 rate);
/*
all the rest added because of recovery; should we make
@@ -441,6 +449,18 @@ extern LOG_DESC log_record_type_descript
typedef enum
{
+ TRANSLOG_GCOMMIT_NONE,
+ TRANSLOG_GCOMMIT_HARD,
+ TRANSLOG_GCOMMIT_SOFT
+} enum_maria_group_commit;
+extern ulong maria_group_commit;
+extern ulong maria_group_commit_rate;
+/**
+ group commit interval is TRANSLOG_RATE_BASE/<rate> seconds
+*/
+#define TRANSLOG_RATE_BASE 100
+typedef enum
+{
TRANSLOG_PURGE_IMMIDIATE,
TRANSLOG_PURGE_EXTERNAL,
TRANSLOG_PURGE_ONDEMAND
Follow ups