maria-developers team mailing list archive

bzr commit into MariaDB 5.1, with Maria 1.5:maria branch (knielsen:2850)

 

#At lp:maria

 2850 knielsen@xxxxxxxxxxxxxxx	2010-06-09
      MWL#116: Fix a couple of races in group commit.
      modified:
        include/atomic/gcc_builtins.h
        include/atomic/x86-gcc.h
        sql/handler.cc

=== modified file 'include/atomic/gcc_builtins.h'
--- a/include/atomic/gcc_builtins.h	2008-02-06 16:55:04 +0000
+++ b/include/atomic/gcc_builtins.h	2010-06-09 11:17:39 +0000
@@ -19,8 +19,9 @@
   v= __sync_lock_test_and_set(a, v);
 #define make_atomic_cas_body(S)                     \
   int ## S sav;                                     \
-  sav= __sync_val_compare_and_swap(a, *cmp, set);   \
-  if (!(ret= (sav == *cmp))) *cmp= sav;
+  int ## S cmp_val= *cmp;                           \
+  sav= __sync_val_compare_and_swap(a, cmp_val, set);\
+  if (!(ret= (sav == cmp_val))) *cmp= sav
 
 #ifdef MY_ATOMIC_MODE_DUMMY
 #define make_atomic_load_body(S)   ret= *a
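
The hunk above fixes a race in the GCC-builtin CAS wrapper: the old macro read *cmp twice, once as the expected value passed to __sync_val_compare_and_swap() and again when checking the result, so a concurrent update of the memory that cmp points at could make the success/failure report inconsistent. A minimal sketch of the fixed shape, written as a standalone function rather than the macro (my_cas32() and its parameter names are illustrative only, not part of the patch):

#include <stdint.h>

/* Sketch only: the expected value is read exactly once, into cmp_val. */
static inline int
my_cas32(volatile int32_t *a, int32_t *cmp, int32_t set)
{
  int32_t cmp_val= *cmp;     /* single read of the expected value          */
  int32_t sav= __sync_val_compare_and_swap(a, cmp_val, set);
  int ret= (sav == cmp_val); /* compare against the saved copy             */
  if (!ret)
    *cmp= sav;               /* report the value actually found in *a      */
  return ret;
}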

=== modified file 'include/atomic/x86-gcc.h'
--- a/include/atomic/x86-gcc.h	2007-02-28 16:50:51 +0000
+++ b/include/atomic/x86-gcc.h	2010-06-09 11:17:39 +0000
@@ -38,15 +38,33 @@
 #define asm __asm__
 #endif
 
+/*
+  The atomic operations imply a memory barrier for the CPU, to ensure that all
+  prior writes are flushed from cache, and all subsequent reads reloaded into
+  cache.
+
+  We need to imply a similar memory barrier for the compiler, so that all
+  pending stores (to memory that may be aliased in other parts of the code)
+  will be flushed to memory before the operation, and all reads from such
+  memory be re-loaded. This is achieved by adding the "memory" pseudo-register
+  to the clobber list, see GCC documentation for more explanation.
+
+  The compiler and CPU memory barriers are needed to make sure changes in one
+  thread are made visible in another by the atomic operation.
+*/
 #ifndef MY_ATOMIC_NO_XADD
 #define make_atomic_add_body(S)					\
-  asm volatile (LOCK_prefix "; xadd %0, %1;" : "+r" (v) , "+m" (*a))
+  asm volatile (LOCK_prefix "; xadd %0, %1;" : "+r" (v) , "+m" (*a): : "memory")
 #endif
 #define make_atomic_fas_body(S)				\
-  asm volatile ("xchg %0, %1;" : "+r" (v) , "+m" (*a))
+  asm volatile ("xchg %0, %1;" : "+r" (v) , "+m" (*a) : : "memory")
 #define make_atomic_cas_body(S)					\
+  int ## S sav;                                     \
   asm volatile (LOCK_prefix "; cmpxchg %3, %0; setz %2;"	\
-               : "+m" (*a), "+a" (*cmp), "=q" (ret): "r" (set))
+               : "+m" (*a), "=a" (sav), "=q" (ret)              \
+               : "r" (set), "a" (*cmp) : "memory");             \
+  if (!ret)                                                     \
+    *cmp= sav
 
 #ifdef MY_ATOMIC_MODE_DUMMY
 #define make_atomic_load_body(S)   ret=*a
@@ -59,9 +77,9 @@
 #define make_atomic_load_body(S)				\
   ret=0;							\
   asm volatile (LOCK_prefix "; cmpxchg %2, %0"			\
-               : "+m" (*a), "+a" (ret): "r" (ret))
+               : "+m" (*a), "+a" (ret) : "r" (ret) : "memory")
 #define make_atomic_store_body(S)				\
-  asm volatile ("; xchg %0, %1;" : "+m" (*a), "+r" (v))
+  asm volatile ("; xchg %0, %1;" : "+m" (*a), "+r" (v) : : "memory")
 #endif
 
 /* TODO test on intel whether the below helps. on AMD it makes no difference */
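
The comment added above distinguishes the CPU-level barrier (provided by the lock-prefixed instruction itself) from the compiler-level barrier (the "memory" clobber). Without the clobber, GCC is free to keep shared values cached in registers or to move ordinary loads and stores across the asm statement. A minimal illustration of the effect, not taken from the patch (flag, data and publish() are made-up names):

#include <stdint.h>

int  data;                   /* ordinary shared variable                   */
volatile int32_t flag;       /* "message ready" flag                       */

void publish(int value)
{
  int32_t one= 1;
  data= value;               /* must reach memory before the flag is set   */
  /*
    The "memory" clobber tells GCC to complete the pending store to data
    before the xchg and to discard any register-cached reads afterwards;
    the xchg itself (implicitly locked) is the CPU-side barrier.
  */
  __asm__ __volatile__ ("xchg %0, %1"
                        : "+r" (one), "+m" (flag)
                        : : "memory");
}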

=== modified file 'sql/handler.cc'
--- a/sql/handler.cc	2010-05-26 08:16:18 +0000
+++ b/sql/handler.cc	2010-06-09 11:17:39 +0000
@@ -1103,14 +1103,30 @@ ha_check_and_coalesce_trx_read_only(THD 
 static THD *
 enqueue_atomic(THD *thd)
 {
-  my_atomic_rwlock_wrlock(&LOCK_group_commit_queue);
+  THD *orig_queue;
+
   thd->next_commit_ordered= group_commit_queue;
+
+  my_atomic_rwlock_wrlock(&LOCK_group_commit_queue);
+  do
+  {
+    /*
+      Save the read value of group_commit_queue in each iteration of the loop.
+      When my_atomic_casptr() returns TRUE, we know that orig_queue is equal
+      to the value of group_commit_queue when we enqueued.
+
+      However, as soon as we enqueue, thd->next_commit_ordered may be
+      invalidated by another thread (the group commit leader). So we need to
+      save the old queue value in a local variable orig_queue like this.
+    */
+    orig_queue= thd->next_commit_ordered;
+  }
   while (!my_atomic_casptr((void **)(&group_commit_queue),
                            (void **)(&thd->next_commit_ordered),
-                           thd))
-    ;
+                           thd));
   my_atomic_rwlock_wrunlock(&LOCK_group_commit_queue);
-  return thd->next_commit_ordered;
+
+  return orig_queue;
 }
 
 static THD *
@@ -1399,6 +1415,9 @@ int ha_commit_trans(THD *thd, bool all)
   int cookie;
   if (tc_log->use_group_log_xid)
   {
+    // ToDo: if xid==NULL here, we may use is_group_commit_leader uninitialised.
+    // ToDo: Same for cookie below when xid==NULL.
+    // Seems we generally need to check the case xid==NULL.
     if (is_group_commit_leader)
     {
       pthread_mutex_lock(&LOCK_group_commit);
@@ -1434,9 +1453,18 @@ int ha_commit_trans(THD *thd, bool all)
       }
       pthread_mutex_unlock(&LOCK_group_commit);
 
-        /* Wake up everyone except ourself. */
-      while ((queue= queue->next_commit_ordered)  != NULL)
-        group_commit_wakeup_other(queue);
+      /* Wake up everyone except ourself. */
+      THD *current= queue->next_commit_ordered;
+      while (current != NULL)
+      {
+        /*
+          Careful not to access current->next_commit_ordered after waking up
+          the other thread! As it may change immediately after wakeup.
+        */
+        THD *next= current->next_commit_ordered;
+        group_commit_wakeup_other(current);
+        current= next;
+      }
     }
     else
     {
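
The comment added to enqueue_atomic() above is the heart of the first handler.cc fix: once the CAS has published thd into group_commit_queue, the group commit leader may already be walking the list, so thd->next_commit_ordered is no longer safe to read and the previous head has to be captured in a local (orig_queue) inside the retry loop. A minimal standalone sketch of that pattern, using a made-up node type and plain GCC builtins in place of the my_atomic_* wrappers:

struct node { struct node *next; };

static struct node *queue_head;      /* shared LIFO queue head             */

/* Returns the previous head; NULL means this thread became the leader. */
static struct node *
enqueue(struct node *self)
{
  struct node *orig_head;
  do
  {
    orig_head= queue_head;           /* snapshot the current head          */
    self->next= orig_head;           /* link ourselves in front of it      */
  } while (!__sync_bool_compare_and_swap(&queue_head, orig_head, self));
  /*
    From this point on the leader may dequeue the list and overwrite
    self->next at any time; only the local orig_head may be used.
  */
  return orig_head;
}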
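
The second handler.cc change follows the same rule on the wakeup side: the leader must read a thread's next_commit_ordered pointer before signalling it, because the woken thread immediately regains ownership of its entry and may change or discard the link. Continuing the sketch above (wake_up() stands in for group_commit_wakeup_other()):

static void
wake_followers(struct node *leader, void (*wake_up)(struct node *))
{
  struct node *current= leader->next;
  while (current != NULL)
  {
    struct node *next= current->next; /* read before waking the owner      */
    wake_up(current);                 /* *current may change from here on  */
    current= next;
  }
}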