maria-developers team mailing list archive

Re: bzr commit into Mariadb 5.2, with Maria 2.0:maria/5.2 branch (sanja:2725)

 

Hi!

>>>>> "sanja" == sanja  <sanja@xxxxxxxxxxxx> writes:

sanja> #At lp:maria/5.2
sanja>  2725 sanja@xxxxxxxxxxxx	2009-07-07
sanja>       Group commit and small optimisation of flush log (in case of page without CRC and sector protection) added. (for review)

<cut>

sanja> === modified file 'storage/maria/ha_maria.cc'

<cut>

sanja> +static void update_maria_group_commit(MYSQL_THD thd,
sanja> +                                      struct st_mysql_sys_var *var,
sanja> +                                      void *var_ptr, const void *save)
sanja> +{
sanja> +  ulong value= (ulong)*((long *)var_ptr);
sanja> +  DBUG_ENTER("update_maria_group_commit");
sanja> +  DBUG_PRINT("enter", ("old value: %lu  new value %lu  rate %lu",
sanja> +                       value, (ulong)(*(long *)save),
sanja> +                       maria_group_commit_rate));
sanja> +  /* old value */
sanja> +  switch (value) {
sanja> +  case TRANSLOG_GCOMMIT_NONE:
sanja> +    break;
sanja> +  case TRANSLOG_GCOMMIT_HARD:
sanja> +    translog_hard_group_commit(FALSE);
sanja> +    break;
sanja> +  case TRANSLOG_GCOMMIT_SOFT:
sanja> +    translog_soft_sync(FALSE);
sanja> +    if (maria_group_commit_rate)
sanja> +      translog_soft_sync_end();
sanja> +    break;
sanja> +  default:
sanja> +    DBUG_ASSERT(0); /* impossible */
sanja> +  }
sanja> +  value= *(ulong *)var_ptr= (ulong)(*(long *)save);
sanja> +  translog_sync();
sanja> +  /* new value */
sanja> +  switch (value) {
sanja> +  case TRANSLOG_GCOMMIT_NONE:
sanja> +    break;
sanja> +  case TRANSLOG_GCOMMIT_HARD:
sanja> +    translog_hard_group_commit(TRUE);
sanja> +    break;
sanja> +  case TRANSLOG_GCOMMIT_SOFT:
sanja> +    translog_soft_sync(TRUE);
sanja> +    /* variable change made under global lock so we can just read it */
sanja> +    if (maria_group_commit_rate)
sanja> +      translog_soft_sync_start();
sanja> +    break;
sanja> +  default:
sanja> +    DBUG_ASSERT(0); /* impossible */
sanja> +  }
sanja> +  DBUG_VOID_RETURN;
sanja> +}

A small optimization: wouldn't it be good to read the new value at the
start and just return if the value didn't change?
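
A minimal sketch of that early return, not the actual handler: the enum,
switch_mode() and mode_switches are hypothetical stand-ins for the
TRANSLOG_GCOMMIT_* values and the "stop old mode, sync, start new mode"
sequence in the patch.

```c
#include <assert.h>

/* Hypothetical stand-ins for the TRANSLOG_GCOMMIT_* modes in the patch. */
enum gcommit_mode { GCOMMIT_NONE, GCOMMIT_HARD, GCOMMIT_SOFT };

/* Counts how often the (expensive) mode switch actually ran. */
static int mode_switches= 0;

/* Stand-in for "stop old mode, translog_sync(), start new mode". */
static void switch_mode(unsigned long old_mode, unsigned long new_mode)
{
  (void) old_mode;
  (void) new_mode;
  mode_switches++;
}

/* Update handler with the early return: nothing to do if unchanged. */
static void update_group_commit(unsigned long *var_ptr,
                                const unsigned long *save)
{
  unsigned long old_value= *var_ptr;
  unsigned long new_value= *save;
  if (old_value == new_value)
    return;                          /* same mode requested: skip the work */
  *var_ptr= new_value;
  switch_mode(old_value, new_value);
}
```

Setting the variable to the value it already has then costs only two
reads and a compare, and no sync is forced.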

sanja> +/**
sanja> +   @brief Updates group commit rate
sanja> +*/
sanja> +
sanja> +static void update_maria_group_commit_rate(MYSQL_THD thd,
sanja> +                                           struct st_mysql_sys_var *var,
sanja> +                                           void *var_ptr, const void *save)
sanja> +{
sanja> +  ulong new_value= (ulong)*((long *)save);
sanja> +  ulong *value_ptr= (ulong*) var_ptr;
sanja> +  DBUG_ENTER("update_maria_group_commit_rate");
sanja> +  DBUG_PRINT("enter", ("old value: %lu  new value %lu  group commit %lu",
sanja> +                        *value_ptr, new_value, maria_group_commit));
sanja> +  if (new_value &&
sanja> +      ((TRANSLOG_RATE_BASE * 1000000000ULL / new_value +
sanja> +        TRANSLOG_RATE_BASE / 2) /
sanja> +       TRANSLOG_RATE_BASE) == 0)
sanja> +    new_value= 0; /* protection against too small value */

Note that the ULL suffix is not portable to older Windows compilers.
You have to use ULL(1000000000) instead.

The formula is also a bit complex to understand.

It's also the same as:

  ((TRANSLOG_RATE_BASE * 1000000000ULL / new_value +
    TRANSLOG_RATE_BASE / 2) /
   TRANSLOG_RATE_BASE) == 0
->
  (TRANSLOG_RATE_BASE * 1000000000ULL / new_value +
   TRANSLOG_RATE_BASE / 2) < TRANSLOG_RATE_BASE
->
  TRANSLOG_RATE_BASE * 1000000000ULL / new_value <
  TRANSLOG_RATE_BASE / 2
->
  1000000000ULL / new_value < 1/2
->
  new_value/2 > 1000000000ULL
->
  new_value > 2000000000ULL

Which doesn't look correct as this doesn't have TRANSLOG_RATE_BASE anywhere.
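
A quick way to check where the expression becomes zero. This is only a
sketch: RATE_BASE is an assumed value (the real TRANSLOG_RATE_BASE may
differ), and rounded_wait() is a hypothetical helper wrapping the
expression from the patch. With exact division,
1000000000 / new_value < 1/2 means new_value > 2000000000, and for an
even base the integer version agrees.

```c
#include <assert.h>

/* Assumed value, for illustration only. */
#define RATE_BASE 100ULL

/*
  The expression from the patch, unchanged apart from the names.
  (Plain ULL suffixes are used to keep the sketch self-contained;
  in the tree this would be ULL(1000000000).)
*/
static unsigned long long rounded_wait(unsigned long long rate)
{
  return (RATE_BASE * 1000000000ULL / rate + RATE_BASE / 2) / RATE_BASE;
}
```

With this base, rounded_wait(2000000000ULL) is still 1 and
rounded_wait(2000000001ULL) is 0, so the cut-off does not depend on the
base beyond rounding.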

The idea (as discussed on IRC) is to ensure that:

wait_time= ((TRANSLOG_RATE_BASE * 1000000000ULL / rate +
           TRANSLOG_RATE_BASE / 2) /
           TRANSLOG_RATE_BASE);

and that wait_time * TRANSLOG_RATE_BASE fits in a uint32.

After some discussion on IRC, Sanja and I decided to change the code
and instead use microseconds to wait directly.
(We have already spent way too much time trying to understand the
above and similar code.)

That means that in this case we should limit the number of
microseconds to 1 minute (syncing less often than once a minute would
be stupid). You probably don't have to check this limit here, as it
would be checked automatically by getopt.
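
A minimal sketch of what storing the interval directly could look like.
The names (MAX_GROUP_COMMIT_INTERVAL_USEC, group_commit_wait_usec,
set_group_commit_interval) are hypothetical, and the explicit clamp is
only there to make the one-minute bound visible; in the server the
maximum would normally be declared with the variable and enforced by
getopt.

```c
#include <assert.h>

/* One minute in microseconds: the upper bound suggested above. */
#define MAX_GROUP_COMMIT_INTERVAL_USEC 60000000UL

/* Hypothetical variable: the wait time, already in microseconds. */
static unsigned long group_commit_wait_usec= 0;

/* Store the interval as given; no rate-to-time conversion is needed. */
static void set_group_commit_interval(unsigned long interval_usec)
{
  if (interval_usec > MAX_GROUP_COMMIT_INTERVAL_USEC)
    interval_usec= MAX_GROUP_COMMIT_INTERVAL_USEC;
  group_commit_wait_usec= interval_usec;
}
```

The value 0 keeps its meaning of "no waiting", and there is no division
that could round a valid setting down to that special value.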

<cut>

sanja> +++ b/storage/maria/ma_init.c	2009-07-07 00:37:23 +0000
sanja> @@ -82,6 +82,11 @@ void maria_end(void)
sanja>      maria_inited= maria_multi_threaded= FALSE;
sanja>      ft_free_stopwords();
sanja>      ma_checkpoint_end();
sanja> +    if (translog_status == TRANSLOG_OK)
sanja> +    {
sanja> +      translog_soft_sync_end();
sanja> +      translog_sync();
sanja> +    }

Why do we have to call translog_sync() when we end usage of Maria?

Note that in maria_init() you should set soft_sync and the other
variables, as in case of an internal mysqld restart (like for the
embedded server) you can't trust the initial values of the variables.

sanja> +++ b/storage/maria/ma_loghandler.c	2009-07-07 00:37:23 +0000
sanja> @@ -18,6 +18,7 @@
sanja>  #include "ma_blockrec.h" /* for some constants and in-write hooks */
sanja>  #include "ma_key_recover.h" /* For some in-write hooks */
sanja>  #include "ma_checkpoint.h"
sanja> +#include "ma_servicethread.h"
 
Please call the file 'ma_service_thread.h'

<cut>
sanja> +  /*
sanja> +    How much data was skipped during moving page from previous buffer
sanja> +    to this one (it is optimisation of forcing buffer to finish
sanja> +  */
sanja> +  uint skipped_data;

Change comment to:

/*
  When moving from one log buffer to another, we write the last of the
  previous buffer to file and then move to start using the new log buffer.
  In the case of a partly filled last page, this page is not moved to the
  start of the new buffer but instead we set the 'skip_data' variable
  to tell us how much data at the beginning of the buffer is not relevant.
*/

Change also variable to 'skiped_data' or 'not_initialized_data'
(Over time we should change all the old skipp variables to skip...)

sanja> @@ -1069,6 +1121,7 @@ static my_bool translog_write_file_heade
sanja>  static my_bool translog_max_lsn_to_header(File file, LSN lsn)
sanja>  {
sanja>    uchar lsn_buff[LSN_STORE_SIZE];
sanja> +  my_bool rc;
sanja>    DBUG_ENTER("translog_max_lsn_to_header");
sanja>    DBUG_PRINT("enter", ("File descriptor: %ld  "
sanja>                         "lsn: (%lu,0x%lx)",
sanja> @@ -1077,11 +1130,13 @@ static my_bool translog_max_lsn_to_heade
 
sanja>    lsn_store(lsn_buff, lsn);
 
sanja> -  DBUG_RETURN(my_pwrite(file, lsn_buff,
sanja> -                        LSN_STORE_SIZE,
sanja> -                        (LOG_HEADER_DATA_SIZE - LSN_STORE_SIZE),
sanja> -                        log_write_flags) != 0 ||
sanja> -              my_sync(file, MYF(MY_WME)) != 0);
sanja> +  if (!(rc= (my_pwrite(file, lsn_buff,
sanja> +                       LSN_STORE_SIZE,
sanja> +                       (LOG_HEADER_DATA_SIZE - LSN_STORE_SIZE),
sanja> +                       log_write_flags) != 0 ||
sanja> +             my_sync(file, MYF(MY_WME)) != 0)))
sanja> +    translog_syncs++;
sanja> +   DBUG_RETURN(rc);
sanja>  }

Good to not count syncs in case of errors, but as this is such an
unlikely occurrence it would have been ok to always increment the
variable and thus avoid one 'if' and branch.
(This is after all a statistical variable)
No need to change (if you don't want to :)

sanja> @@ -2557,6 +2621,8 @@ static my_bool translog_buffer_flush(str
sanja>      disk
sanja>    */
sanja>    file= buffer->file;
sanja> +  skipped_data= buffer->skipped_data;
sanja> +  DBUG_ASSERT(skipped_data < TRANSLOG_PAGE_SIZE);
sanja>    for (i= 0, pg= LSN_OFFSET(buffer->offset) / TRANSLOG_PAGE_SIZE;
sanja>         i < buffer->size;
sanja>         i+= TRANSLOG_PAGE_SIZE, pg++)
sanja> @@ -2573,13 +2639,16 @@ static my_bool translog_buffer_flush(str
sanja>      DBUG_ASSERT(i + TRANSLOG_PAGE_SIZE <= buffer->size);
sanja>      if (translog_status != TRANSLOG_OK && translog_status != TRANSLOG_SHUTDOWN)
sanja>        DBUG_RETURN(1);
sanja> -    if (pagecache_inject(log_descriptor.pagecache,
sanja> +    if (pagecache_write_part(log_descriptor.pagecache,
sanja>                          &file->handler, pg, 3,
sanja>                          buffer->buffer + i,
sanja>                          PAGECACHE_PLAIN_PAGE,
sanja>                          PAGECACHE_LOCK_LEFT_UNLOCKED,
sanja> -                        PAGECACHE_PIN_LEFT_UNPINNED, 0,
sanja> -                        LSN_IMPOSSIBLE))
sanja> +                        PAGECACHE_PIN_LEFT_UNPINNED,
sanja> +                        PAGECACHE_WRITE_DONE, 0,
sanja> +                        LSN_IMPOSSIBLE,
sanja> +                        skipped_data,
sanja> +                        TRANSLOG_PAGE_SIZE - skipped_data))
sanja>      {
sanja>        DBUG_PRINT("error",
sanja>                   ("Can't write page (%lu,0x%lx) to pagecache, error: %d",
sanja> @@ -2589,10 +2658,12 @@ static my_bool translog_buffer_flush(str
sanja>        translog_stop_writing();
sanja>        DBUG_RETURN(1);
sanja>      }
sanja> +    skipped_data= 0;
sanja>    }
sanja>    file->is_sync= 0;
sanja> -  if (my_pwrite(file->handler.file, buffer->buffer,
sanja> -                buffer->size, LSN_OFFSET(buffer->offset),
sanja> +  if (my_pwrite(file->handler.file, buffer->buffer + buffer->skipped_data,
sanja> +                buffer->size - buffer->skipped_data,
sanja> +                LSN_OFFSET(buffer->offset) + buffer->skipped_data,
sanja>                  log_write_flags))
sanja>    {
sanja>      DBUG_PRINT("error", ("Can't write buffer (%lu,0x%lx) size %lu "
sanja> @@ -2985,6 +3056,7 @@ restart:
sanja>            uchar *from, *table= NULL;
sanja>            int is_last_unfinished_page;
sanja>            uint last_protected_sector= 0;
sanja> +          uint skipped_data= curr_buffer->skipped_data;
sanja>            TRANSLOG_FILE file_copy;
sanja>            uint8 ver= curr_buffer->ver;
sanja>            translog_wait_for_writers(curr_buffer);
sanja> @@ -2997,7 +3069,25 @@ restart:
sanja>            }
sanja>            DBUG_ASSERT(LSN_FILE_NO(addr) ==  LSN_FILE_NO(curr_buffer->offset));
sanja>            from= curr_buffer->buffer + (addr - curr_buffer->offset);
sanja> -          memcpy(buffer, from, TRANSLOG_PAGE_SIZE);
sanja> +          if (skipped_data > (addr - curr_buffer->offset))

Change test to:

/* If first page in buffer and skipped_data != 0 */
   if (skipped_data && addr == curr_buffer->offset)

As this is easier to understand.
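
The two tests should be equivalent as long as addr is page aligned, not
below the buffer offset, and skipped_data is smaller than a page (the
patch asserts the last one in translog_buffer_flush()). A small
brute-force check of that claim, with an assumed page size and
hypothetical helper names:

```c
#include <assert.h>

#define LOG_PAGE_SIZE 8192u   /* assumed page size for the illustration */

/* Test as written in the patch. */
static int needs_file_read_old(unsigned skipped, unsigned addr,
                               unsigned offset)
{
  return skipped > (addr - offset);
}

/* Test as suggested in the review. */
static int needs_file_read_new(unsigned skipped, unsigned addr,
                               unsigned offset)
{
  return skipped && addr == offset;
}

/*
  Returns 1 if both tests agree for every page-aligned addr in the first
  'pages' pages of the buffer and every skipped < LOG_PAGE_SIZE.
*/
static int tests_agree(unsigned pages)
{
  unsigned offset= 3 * LOG_PAGE_SIZE;
  unsigned page, skipped;
  for (page= 0; page < pages; page++)
    for (skipped= 0; skipped < LOG_PAGE_SIZE; skipped++)
    {
      unsigned addr= offset + page * LOG_PAGE_SIZE;
      if (needs_file_read_old(skipped, addr, offset) !=
          needs_file_read_new(skipped, addr, offset))
        return 0;
    }
  return 1;
}
```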

sanja> +          {
sanja> +            /*
sanja> +              We read page part of which is not present in buffer,
sanja> +              so we should read absent part from file (page cache actually)
sanja> +            */
sanja> +            file= get_logfile_by_number(file_no);
sanja> +            DBUG_ASSERT(file != NULL);
sanja> +            buffer= pagecache_read(log_descriptor.pagecache, &file->handler,
sanja> +                                   LSN_OFFSET(addr) / TRANSLOG_PAGE_SIZE,
sanja> +                                   3, buffer,
sanja> +                                   PAGECACHE_PLAIN_PAGE,
sanja> +                                   PAGECACHE_LOCK_LEFT_UNLOCKED,
sanja> +                                   NULL);

I assume it's ok to not lock the page because:
- The log handler has its own page cache.
- There is only one thread that can access the log cache at a time?

If this is correct, then please add this as a comment so that we know
that we have to fix this if the above changes.

sanja> +          }
sanja> +          else
sanja> +            skipped_data= 0;  /* Read after skipped in buffer data */
sanja> +          memcpy(buffer + skipped_data, from + skipped_data,
sanja> +                 TRANSLOG_PAGE_SIZE - skipped_data);

Took me some time to understand why we set skipped_data above.

It could help if we add a comment after the pagecache_read:

/*
  Now we have correct data in buffer up to 'skipped_data'. The
  following memcpy() will move the data from the internal buffer that was not
  yet on disk.
*/
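
The idea in isolation, as a sketch with plain arrays: on_disk stands in
for what pagecache_read() returns and in_memory for the log buffer. The
page size and the function name are made up for the illustration.

```c
#include <assert.h>
#include <string.h>

#define LOG_PAGE_SIZE 16u   /* tiny page size, for illustration only */

/*
  Assemble one page: the first 'skipped' bytes come from the copy that
  was already written out, the rest from the in-memory log buffer, whose
  first 'skipped' bytes are not valid.
*/
static void assemble_page(unsigned char *dst,
                          const unsigned char *on_disk,
                          const unsigned char *in_memory,
                          unsigned skipped)
{
  /* Whole page as last written; correct up to 'skipped'. */
  memcpy(dst, on_disk, LOG_PAGE_SIZE);
  /* Overwrite the tail with the newer data from the buffer. */
  memcpy(dst + skipped, in_memory + skipped, LOG_PAGE_SIZE - skipped);
}
```

With skipped == 0 the second memcpy() covers the whole page, which is
why the patch can reset skipped_data to 0 in the else branch and share
the same memcpy() call.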

sanja> +static my_bool translog_sync_files(uint32 min, uint32 max,

<cut>

sanja> +  flush_interval= group_commit_wait;
sanja> +  if (flush_interval)
sanja> +    flush_start= my_micro_time();
sanja> +  for (fn= min; fn <= max; fn++)
sanja>    {
sanja> +    TRANSLOG_FILE *file= get_logfile_by_number(fn);
sanja> +    DBUG_ASSERT(file != NULL);

Actually, there is no reason to have the assert here, as the following
if will crash anyway if file is NULL.

sanja> +    if (!file->is_sync)

<cut>

sanja> +my_bool translog_flush(TRANSLOG_ADDRESS lsn)
sanja> +{

<cut>

sanja> +  if (cmp_translog_addr(log_descriptor.flushed, lsn) >= 0)
sanja> +
sanja> +

Remove the above two empty lines.

sanja> +  {
sanja> +    pthread_mutex_unlock(&log_descriptor.log_flush_lock);
sanja> +    DBUG_RETURN(0);
sanja>    }

<cut>

sanja> +  for (;;)
sanja> +  {
sanja> +    /* Following function flushes buffers and makes translog_unlock() */
sanja> +    translog_flush_buffers(&lsn, &sent_to_disk, &flush_horizon);
sanja> +
sanja> +    if (!hgroup_commit_at_start)
sanja> +      break;  /* flush pass is ended */
sanja> +
sanja> +retest:
sanja> +    if (flush_interval != 0 &&
sanja> +        (my_micro_time() - flush_start) >= flush_interval)
sanja> +      break;  /* flush pass is ended */
sanja> +
sanja> +    pthread_mutex_lock(&log_descriptor.log_flush_lock);
sanja> +    if (log_descriptor.next_pass_max_lsn != LSN_IMPOSSIBLE)
sanja> +    {
sanja> +      /* take next goal */
sanja> +      lsn= log_descriptor.next_pass_max_lsn;
sanja> +      log_descriptor.next_pass_max_lsn= LSN_IMPOSSIBLE;
sanja> +      /* prevent other thread from continue */
sanja> +      log_descriptor.max_lsn_requester= pthread_self();
sanja> +      DBUG_PRINT("info", ("flush took next goal: (%lu,0x%lx)",
sanja> +                          LSN_IN_PARTS(lsn)));
sanja> +    }
sanja> +    else
sanja>      {
sanja> +      if (flush_interval == 0 ||
sanja> +          (time_spent= (my_micro_time() - flush_start)) >= flush_interval)
sanja>        {
sanja> +        pthread_mutex_unlock(&log_descriptor.log_flush_lock);
sanja> +        break;
sanja>        }
sanja> +      DBUG_PRINT("info", ("flush waits: %llu  interval: %llu  spent: %llu",
sanja> +                          flush_interval - time_spent,
sanja> +                          flush_interval, time_spent));
sanja> +      /* wait time or next goal */
sanja> +      set_timespec_nsec(abstime, flush_interval - time_spent);
sanja> +      pthread_cond_timedwait(&log_descriptor.new_goal_cond,
sanja> +                             &log_descriptor.log_flush_lock,
sanja> +                             &abstime);
sanja> +      pthread_mutex_unlock(&log_descriptor.log_flush_lock);
sanja> +      DBUG_PRINT("info", ("retest conditions"));
sanja> +      goto retest;
sanja>      }

If you invert the above if to:

 if (log_descriptor.next_pass_max_lsn == LSN_IMPOSSIBLE)

and move the code from the else part up inside the if, the code will be
clearer and you can remove the else part.

Another optimization: assuming that log_descriptor.log_flush_lock is
never held over a slow operation, you could move the setting of
time_spent to the first call to my_micro_time() and then just have the
second test as:

 if (flush_interval == 0)
   break;

It's not likely that, in the few cases where we don't get the
mutex_lock() at once, there will be a significant difference in
the microtime value.  (The extra call to microtime() may make things
slower than what we win by trying to do the calculations 100% exact.)
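
One way to read the two suggestions together, reduced to the decision
logic only. This is a sketch, not the real loop: the mutex, the
condition wait and the thread bookkeeping are left out, and all names
(flush_state, decide, NO_LSN, the STEP_* values) are hypothetical.
time_spent is assumed to be computed once by the caller, from the first
my_micro_time() call.

```c
#include <assert.h>

#define NO_LSN 0ULL   /* plays the role of LSN_IMPOSSIBLE in this sketch */

enum next_step { STEP_FINISH, STEP_TAKE_GOAL, STEP_WAIT };

struct flush_state
{
  unsigned long long next_pass_max_lsn;
};

/* Decide what the flush loop does after one pass. */
static enum next_step decide(struct flush_state *st,
                             unsigned long long flush_interval,
                             unsigned long long time_spent,
                             unsigned long long *lsn)
{
  if (flush_interval != 0 && time_spent >= flush_interval)
    return STEP_FINISH;              /* flush pass is ended */

  /* The real code would take log_flush_lock here. */
  if (st->next_pass_max_lsn == NO_LSN)
  {
    if (flush_interval == 0)
      return STEP_FINISH;            /* nothing queued and no waiting */
    /* Caller waits flush_interval - time_spent, then retests. */
    return STEP_WAIT;
  }

  /* Take the next goal; no else branch is needed. */
  *lsn= st->next_pass_max_lsn;
  st->next_pass_max_lsn= NO_LSN;
  return STEP_TAKE_GOAL;
}
```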

sanja> +    pthread_mutex_unlock(&log_descriptor.log_flush_lock);
sanja> +
sanja> +    /* next flush pass */
sanja> +    DBUG_PRINT("info", ("next flush pass"));
sanja> +    translog_lock();
sanja>    }
 
sanja> +  /*
sanja> +    sync() files from previous flush till current one
sanja> +  */
sanja> +  if (!soft_sync || hgroup_commit_at_start)
sanja> +  {
sanja> +    if ((rc=
sanja> +         translog_sync_files(LSN_FILE_NO(log_descriptor.flushed),
sanja> +                             LSN_FILE_NO(lsn),
sanja> +                             sync_log_dir >= TRANSLOG_SYNC_DIR_ALWAYS &&
sanja> +                             (LSN_FILE_NO(log_descriptor.
sanja> +                                          previous_flush_horizon) !=
sanja> +                              LSN_FILE_NO(flush_horizon) ||
sanja> +                              (LSN_OFFSET(log_descriptor.
sanja> +                                          previous_flush_horizon) /
sanja> +                               TRANSLOG_PAGE_SIZE) !=
sanja> +                              (LSN_OFFSET(flush_horizon) /
sanja> +                               TRANSLOG_PAGE_SIZE)))))
sanja> +    {
sanja> +      sent_to_disk= LSN_IMPOSSIBLE;
sanja> +      pthread_mutex_lock(&log_descriptor.log_flush_lock);
sanja> +      goto out;
sanja> +    }
sanja> +    /* keep values for soft sync() and forced sync() actual */
sanja> +    {
sanja> +      uint32 fileno= LSN_FILE_NO(lsn);
sanja> +      my_atomic_rwlock_wrlock(&soft_sync_rwl);
sanja> +      my_atomic_store32(&soft_sync_min, fileno);
sanja> +      my_atomic_store32(&soft_sync_max, fileno);
sanja> +      my_atomic_rwlock_wrunlock(&soft_sync_rwl);

I don't understand why my_atomic_rwlock_wrlock is enough protection here.
Assuming we are only using atomic operations, doesn't the above code
give a chance that someone could read soft_sync_min and soft_sync_max
values that don't belong together?
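
To illustrate the concern: two separate atomic stores can be observed
half done. One possible way to make the pair indivisible is to keep both
32-bit values in a single 64-bit atomic. The sketch below uses C11
<stdatomic.h>, which is a swapped-in technique and not the my_atomic API
used in the patch; store_range() and load_range() are hypothetical
names.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdint.h>

/* min in the high 32 bits, max in the low 32 bits. */
static _Atomic uint64_t soft_sync_range;

/* Publish both values in one atomic store. */
static void store_range(uint32_t min, uint32_t max)
{
  atomic_store(&soft_sync_range, ((uint64_t) min << 32) | max);
}

/* Read both values from one atomic load, so they always belong together. */
static void load_range(uint32_t *min, uint32_t *max)
{
  uint64_t both= atomic_load(&soft_sync_range);
  *min= (uint32_t) (both >> 32);
  *max= (uint32_t) both;
}
```

A plain mutex around both variables would give the same guarantee; the
point is only that the reader must not be able to see one new and one
old value.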

sanja> +    }
sanja> +  }
sanja> +  else
sanja> +  {
sanja> +    my_atomic_rwlock_wrlock(&soft_sync_rwl);
sanja> +    my_atomic_store32(&soft_sync_max, LSN_FILE_NO(lsn));
sanja> +    my_atomic_rwlock_wrunlock(&soft_sync_rwl);

Do we really need a lock to store one variable?
What is the problem with just doing:

soft_sync_max= LSN_FILE_NO(lsn);

After all, only one thread can be here at once.

sanja> @@ -8113,6 +8428,8 @@ LSN translog_first_theoretical_lsn()
sanja>  my_bool translog_purge(TRANSLOG_ADDRESS low)
sanja>  {
sanja>    uint32 last_need_file= LSN_FILE_NO(low);
sanja> +  uint32 min_unsync;
sanja> +  int soft;
sanja>    TRANSLOG_ADDRESS horizon= translog_get_horizon();
sanja>    int rc= 0;
sanja>    DBUG_ENTER("translog_purge");
sanja> @@ -8120,12 +8437,23 @@ my_bool translog_purge(TRANSLOG_ADDRESS 
sanja>    DBUG_ASSERT(translog_status == TRANSLOG_OK ||
sanja>                translog_status == TRANSLOG_READONLY);
 
sanja> +  soft= soft_sync;
sanja> +  DBUG_PRINT("info", ("min_unsync: %lu", (ulong) min_unsync));
sanja> +  if (soft && min_unsync < last_need_file)
sanja> +  {
sanja> +    last_need_file= min_unsync;
sanja> +    DBUG_PRINT("info", ("last_need_file set to %lu", (ulong)last_need_file));
sanja> +  }

The above must be a bug as you are never giving min_unsync a value.

<cut>

sanja> +void translog_sync()
sanja> +{
sanja> +  uint32 max= get_current_logfile()->number;
sanja> +  uint32 min;
sanja> +  DBUG_ENTER("ma_translog_sync");
sanja> +
sanja> +  my_atomic_rwlock_rdlock(&soft_sync_rwl);
sanja> +  min= my_atomic_load32(&soft_sync_min);
sanja> +  my_atomic_rwlock_rdunlock(&soft_sync_rwl);

I don't understand why you need to do atomic operations above:
- You are only reading one value.
- get_current_logfile() is read without a mutex, so the values are
  already independent.

<cut>

sanja> +void translog_set_group_commit_rate(uint32 rate)
sanja> +{
sanja> +  DBUG_ENTER("translog_set_group_commit_rate");
sanja> +  ulonglong wait_time;
sanja> +  if (rate)
sanja> +  {
sanja> +    wait_time= ((TRANSLOG_RATE_BASE * 1000000000ULL / rate +
sanja> +                 TRANSLOG_RATE_BASE / 2) /
sanja> +                TRANSLOG_RATE_BASE);
sanja> +    if (wait_time == 0)
sanja> +      wait_time= 1; /* protection from getting special value */
sanja> +  }
sanja> +  else
sanja> +    wait_time= 0;
sanja> +  group_commit_wait= wait_time;
sanja> +  DBUG_PRINT("info", ("rate: %lu  wait: %llu",
sanja> +                      (ulong)rate, (ulonglong)wait_time));
sanja> +  DBUG_VOID_RETURN;
sanja> +}

Change to use microseconds instead of rate.

sanja> +
sanja> +/**
sanja> +  @brief syncing service thread
sanja> +*/
sanja> +
sanja> +static pthread_handler_t
sanja> +ma_soft_sync_background( void *arg __attribute__((unused)))
sanja> +{
sanja> +
sanja> +  my_thread_init();
sanja> +  {
sanja> +    DBUG_ENTER("ma_soft_sync_background");
sanja> +    for(;;)
sanja> +    {
sanja> +      ulonglong prev_loop= my_micro_time();
sanja> +      ulonglong time, sleep;
sanja> +      uint32 min, max;
sanja> +      my_atomic_rwlock_rdlock(&soft_sync_rwl);
sanja> +      min= my_atomic_load32(&soft_sync_min);
sanja> +      max= my_atomic_load32(&soft_sync_max);
sanja> +      my_atomic_store32(&soft_sync_min, max);
sanja> +      my_atomic_rwlock_rdunlock(&soft_sync_rwl);

I still don't understand what ensures that the above operations are safe
from other threads, as my_atomic_rwlock_rdlock() and
my_atomic_rwlock_rdunlock() may be dummy operations.

Please check the code and logic with Serg (if you haven't already done
that).

sanja> +
sanja> +      sleep= group_commit_wait * TRANSLOG_RATE_BASE;
sanja> +      translog_sync_files(min, max, FALSE);

It would be good to have a test above that if there is nothing to
sync, we don't call translog_sync_files().

sanja> +      time= my_micro_time() - prev_loop;
sanja> +      if (time > sleep)
sanja> +        sleep= 0;
sanja> +      else
sanja> +        sleep-= time;
sanja> +      if (my_service_thread_sleep(&soft_sync_control, sleep))
sanja> +        break;
sanja> +    }
sanja> +    my_service_thread_signal_end(&soft_sync_control);
sanja> +    my_thread_end();
sanja> +    DBUG_RETURN(0);
sanja> +  }
sanja> +}

<cut>

Regards,
Monty


