1 : /* Copyright (C) 2006 MySQL AB & MySQL Finland AB & TCX DataKonsult AB
2 :
3 : This program is free software; you can redistribute it and/or modify
4 : it under the terms of the GNU General Public License as published by
5 : the Free Software Foundation; version 2 of the License.
6 :
7 : This program is distributed in the hope that it will be useful,
8 : but WITHOUT ANY WARRANTY; without even the implied warranty of
9 : MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 : GNU General Public License for more details.
11 :
12 : You should have received a copy of the GNU General Public License
13 : along with this program; if not, write to the Free Software
14 : Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
15 :
16 : /*
17 : Creates a index for a database by reading keys, sorting them and outputing
18 : them in sorted order through MARIA_SORT_INFO functions.
19 : */
20 :
21 : #include "ma_fulltext.h"
22 : #if defined(MSDOS) || defined(__WIN__)
23 : #include <fcntl.h>
24 : #else
25 : #include <stddef.h>
26 : #endif
27 : #include <queues.h>
28 :
29 : /* static variables */
30 :
31 : #undef MIN_SORT_MEMORY
32 : #undef MYF_RW
33 : #undef DISK_BUFFER_SIZE
34 :
35 : #define MERGEBUFF 15
36 : #define MERGEBUFF2 31
37 : #define MIN_SORT_MEMORY (4096-MALLOC_OVERHEAD)
38 : #define MYF_RW MYF(MY_NABP | MY_WME | MY_WAIT_IF_FULL)
39 : #define DISK_BUFFER_SIZE (IO_SIZE*16)
40 :
41 :
42 : /*
43 : Pointers of functions for store and read keys from temp file
44 : */
45 :
46 : extern void print_error _VARARGS((const char *fmt,...));
47 :
48 : /* Functions defined in this file */
49 :
50 : static ha_rows find_all_keys(MARIA_SORT_PARAM *info,uint keys,
51 : uchar **sort_keys,
52 : DYNAMIC_ARRAY *buffpek,int *maxbuffer,
53 : IO_CACHE *tempfile,
54 : IO_CACHE *tempfile_for_exceptions);
55 : static int write_keys(MARIA_SORT_PARAM *info, uchar **sort_keys,
56 : uint count, BUFFPEK *buffpek,IO_CACHE *tempfile);
57 : static int write_key(MARIA_SORT_PARAM *info, uchar *key,
58 : IO_CACHE *tempfile);
59 : static int write_index(MARIA_SORT_PARAM *info, uchar **sort_keys,
60 : uint count);
61 : static int merge_many_buff(MARIA_SORT_PARAM *info,uint keys,
62 : uchar **sort_keys,
63 : BUFFPEK *buffpek,int *maxbuffer,
64 : IO_CACHE *t_file);
65 : static uint read_to_buffer(IO_CACHE *fromfile,BUFFPEK *buffpek,
66 : uint sort_length);
67 : static int merge_buffers(MARIA_SORT_PARAM *info,uint keys,
68 : IO_CACHE *from_file, IO_CACHE *to_file,
69 : uchar **sort_keys, BUFFPEK *lastbuff,
70 : BUFFPEK *Fb, BUFFPEK *Tb);
71 : static int merge_index(MARIA_SORT_PARAM *,uint, uchar **,BUFFPEK *, int,
72 : IO_CACHE *);
73 : static int flush_maria_ft_buf(MARIA_SORT_PARAM *info);
74 :
75 : static int write_keys_varlen(MARIA_SORT_PARAM *info, uchar **sort_keys,
76 : uint count, BUFFPEK *buffpek,
77 : IO_CACHE *tempfile);
78 : static uint read_to_buffer_varlen(IO_CACHE *fromfile,BUFFPEK *buffpek,
79 : uint sort_length);
80 : static int write_merge_key(MARIA_SORT_PARAM *info, IO_CACHE *to_file,
81 : uchar *key, uint sort_length, uint count);
82 : static int write_merge_key_varlen(MARIA_SORT_PARAM *info,
83 : IO_CACHE *to_file, uchar *key,
84 : uint sort_length, uint count);
85 : static inline int
86 : my_var_write(MARIA_SORT_PARAM *info, IO_CACHE *to_file, uchar *bufs);
87 :
88 : /*
89 : Creates a index of sorted keys
90 :
91 : SYNOPSIS
92 : _ma_create_index_by_sort()
93 : info Sort parameters
94 : no_messages Set to 1 if no output
95 : sortbuff_size Size of sortbuffer to allocate
96 :
97 : RESULT
98 : 0 ok
99 : <> 0 Error
100 : */
101 :
102 : int _ma_create_index_by_sort(MARIA_SORT_PARAM *info, my_bool no_messages,
103 : size_t sortbuff_size)
104 175 : {
105 : int error,maxbuffer,skr;
106 : size_t memavl,old_memavl;
107 : uint keys,sort_length;
108 : DYNAMIC_ARRAY buffpek;
109 : ha_rows records;
110 : uchar **sort_keys;
111 : IO_CACHE tempfile, tempfile_for_exceptions;
112 175 : DBUG_ENTER("_ma_create_index_by_sort");
113 175 : DBUG_PRINT("enter",("sort_buff_size: %lu sort_length: %d max_records: %lu",
114 : (ulong) sortbuff_size, info->key_length,
115 : (ulong) info->sort_info->max_records));
116 :
117 175 : if (info->keyinfo->flag & HA_VAR_LENGTH_KEY)
118 : {
119 80 : info->write_keys= write_keys_varlen;
120 80 : info->read_to_buffer=read_to_buffer_varlen;
121 80 : info->write_key=write_merge_key_varlen;
122 : }
123 : else
124 : {
125 95 : info->write_keys= write_keys;
126 95 : info->read_to_buffer=read_to_buffer;
127 95 : info->write_key=write_merge_key;
128 : }
129 :
130 175 : my_b_clear(&tempfile);
131 175 : my_b_clear(&tempfile_for_exceptions);
132 175 : bzero((char*) &buffpek,sizeof(buffpek));
133 175 : sort_keys= (uchar **) NULL; error= 1;
134 175 : maxbuffer=1;
135 :
136 175 : memavl=max(sortbuff_size,MIN_SORT_MEMORY);
137 175 : records= info->sort_info->max_records;
138 175 : sort_length= info->key_length;
139 175 : LINT_INIT(keys);
140 :
141 350 : while (memavl >= MIN_SORT_MEMORY)
142 : {
143 350 : if ((records < UINT_MAX32) &&
144 : ((my_off_t) (records + 1) *
145 : (sort_length + sizeof(char*)) <= (my_off_t) memavl))
146 175 : keys= (uint)records+1;
147 : else
148 : do
149 : {
150 0 : skr=maxbuffer;
151 0 : if (memavl < sizeof(BUFFPEK)*(uint) maxbuffer ||
152 : (keys=(memavl-sizeof(BUFFPEK)*(uint) maxbuffer)/
153 : (sort_length+sizeof(char*))) <= 1 ||
154 : keys < (uint) maxbuffer)
155 : {
156 0 : _ma_check_print_error(info->sort_info->param,
157 : "maria_sort_buffer_size is too small");
158 0 : goto err;
159 : }
160 : }
161 0 : while ((maxbuffer= (int) (records/(keys-1)+1)) != skr);
162 :
163 175 : if ((sort_keys=(uchar**) my_malloc(keys*(sort_length+sizeof(char*))+
164 : HA_FT_MAXBYTELEN, MYF(0))))
165 : {
166 175 : if (my_init_dynamic_array(&buffpek, sizeof(BUFFPEK), maxbuffer,
167 : maxbuffer/2))
168 : {
169 0 : my_free(sort_keys,MYF(0));
170 0 : sort_keys= 0;
171 : }
172 : else
173 0 : break;
174 : }
175 0 : old_memavl=memavl;
176 0 : if ((memavl=memavl/4*3) < MIN_SORT_MEMORY && old_memavl > MIN_SORT_MEMORY)
177 0 : memavl=MIN_SORT_MEMORY;
178 : }
179 175 : if (memavl < MIN_SORT_MEMORY)
180 : {
181 0 : _ma_check_print_error(info->sort_info->param, "Maria sort buffer"
182 : " too small"); /* purecov: tested */
183 0 : goto err; /* purecov: tested */
184 : }
185 175 : (*info->lock_in_memory)(info->sort_info->param);/* Everything is allocated */
186 :
187 175 : if (!no_messages)
188 0 : printf(" - Searching for keys, allocating buffer for %d keys\n",keys);
189 :
190 175 : if ((records=find_all_keys(info,keys,sort_keys,&buffpek,&maxbuffer,
191 : &tempfile,&tempfile_for_exceptions))
192 : == HA_POS_ERROR)
193 175 : goto err; /* purecov: tested */
194 175 : if (maxbuffer == 0)
195 : {
196 175 : if (!no_messages)
197 0 : printf(" - Dumping %lu keys\n", (ulong) records);
198 175 : if (write_index(info,sort_keys, (uint) records))
199 : goto err; /* purecov: inspected */
200 : }
201 : else
202 : {
203 0 : keys=(keys*(sort_length+sizeof(char*)))/sort_length;
204 0 : if (maxbuffer >= MERGEBUFF2)
205 : {
206 0 : if (!no_messages)
207 0 : printf(" - Merging %lu keys\n", (ulong) records); /* purecov: tested */
208 0 : if (merge_many_buff(info,keys,sort_keys,
209 : dynamic_element(&buffpek,0,BUFFPEK *),&maxbuffer,&tempfile))
210 0 : goto err; /* purecov: inspected */
211 : }
212 0 : if (flush_io_cache(&tempfile) ||
213 : reinit_io_cache(&tempfile,READ_CACHE,0L,0,0))
214 : goto err; /* purecov: inspected */
215 0 : if (!no_messages)
216 0 : printf(" - Last merge and dumping keys\n"); /* purecov: tested */
217 0 : if (merge_index(info,keys,sort_keys,dynamic_element(&buffpek,0,BUFFPEK *),
218 : maxbuffer,&tempfile))
219 175 : goto err; /* purecov: inspected */
220 : }
221 :
222 175 : if (flush_maria_ft_buf(info) || _ma_flush_pending_blocks(info))
223 : goto err;
224 :
225 175 : if (my_b_inited(&tempfile_for_exceptions))
226 : {
227 0 : MARIA_HA *idx=info->sort_info->info;
228 : uint16 key_length;
229 : MARIA_KEY key;
230 0 : key.keyinfo= idx->s->keyinfo + info->key;
231 :
232 0 : if (!no_messages)
233 0 : printf(" - Adding exceptions\n"); /* purecov: tested */
234 0 : if (flush_io_cache(&tempfile_for_exceptions) ||
235 : reinit_io_cache(&tempfile_for_exceptions,READ_CACHE,0L,0,0))
236 : goto err;
237 :
238 0 : while (!my_b_read(&tempfile_for_exceptions,(uchar*)&key_length,
239 : sizeof(key_length))
240 : && !my_b_read(&tempfile_for_exceptions,(uchar*)sort_keys,
241 : (uint) key_length))
242 : {
243 0 : key.data= (uchar*) sort_keys;
244 0 : key.ref_length= idx->s->rec_reflength;
245 0 : key.data_length= key_length - key.ref_length;
246 0 : key.flag= 0;
247 0 : if (_ma_ck_write(idx, &key))
248 0 : goto err;
249 : }
250 : }
251 :
252 175 : error =0;
253 :
254 175 : err:
255 175 : my_free(sort_keys, MYF(MY_ALLOW_ZERO_PTR));
256 175 : delete_dynamic(&buffpek);
257 175 : close_cached_file(&tempfile);
258 175 : close_cached_file(&tempfile_for_exceptions);
259 :
260 175 : DBUG_RETURN(error ? -1 : 0);
261 : } /* _ma_create_index_by_sort */
262 :
263 :
264 : /* Search after all keys and place them in a temp. file */
265 :
266 : static ha_rows find_all_keys(MARIA_SORT_PARAM *info, uint keys,
267 : uchar **sort_keys, DYNAMIC_ARRAY *buffpek,
268 : int *maxbuffer, IO_CACHE *tempfile,
269 : IO_CACHE *tempfile_for_exceptions)
270 175 : {
271 : int error;
272 : uint idx;
273 175 : DBUG_ENTER("find_all_keys");
274 :
275 175 : idx=error=0;
276 175 : sort_keys[0]= (uchar*) (sort_keys+keys);
277 :
278 83075 : while (!(error=(*info->key_read)(info,sort_keys[idx])))
279 : {
280 82725 : if (info->real_key_length > info->key_length)
281 : {
282 0 : if (write_key(info,sort_keys[idx],tempfile_for_exceptions))
283 0 : DBUG_RETURN(HA_POS_ERROR); /* purecov: inspected */
284 : continue;
285 : }
286 :
287 82725 : if (++idx == keys)
288 : {
289 0 : if (info->write_keys(info,sort_keys,idx-1,
290 : (BUFFPEK *)alloc_dynamic(buffpek),
291 : tempfile))
292 0 : DBUG_RETURN(HA_POS_ERROR); /* purecov: inspected */
293 :
294 0 : sort_keys[0]=(uchar*) (sort_keys+keys);
295 0 : memcpy(sort_keys[0],sort_keys[idx-1],(size_t) info->key_length);
296 0 : idx=1;
297 : }
298 82725 : sort_keys[idx]=sort_keys[idx-1]+info->key_length;
299 : }
300 175 : if (error > 0)
301 0 : DBUG_RETURN(HA_POS_ERROR); /* Aborted by get_key */ /* purecov: inspected */
302 175 : if (buffpek->elements)
303 : {
304 0 : if (info->write_keys(info,sort_keys,idx,(BUFFPEK *)alloc_dynamic(buffpek),
305 : tempfile))
306 0 : DBUG_RETURN(HA_POS_ERROR); /* purecov: inspected */
307 0 : *maxbuffer=buffpek->elements-1;
308 : }
309 : else
310 175 : *maxbuffer=0;
311 :
312 175 : DBUG_RETURN((*maxbuffer)*(keys-1)+idx);
313 : } /* find_all_keys */
314 :
315 :
316 : #ifdef THREAD
317 : /* Search after all keys and place them in a temp. file */
318 :
319 : pthread_handler_t _ma_thr_find_all_keys(void *arg)
320 60 : {
321 60 : MARIA_SORT_PARAM *sort_param= (MARIA_SORT_PARAM*) arg;
322 : int error;
323 : size_t memavl,old_memavl;
324 : uint sort_length;
325 : ulong idx, maxbuffer, keys;
326 60 : uchar **sort_keys=0;
327 :
328 60 : LINT_INIT(keys);
329 :
330 60 : error=1;
331 :
332 60 : if (my_thread_init())
333 60 : goto err;
334 :
335 : { /* Add extra block since DBUG_ENTER declare variables */
336 60 : DBUG_ENTER("_ma_thr_find_all_keys");
337 60 : DBUG_PRINT("enter", ("master: %d", sort_param->master));
338 60 : if (sort_param->sort_info->got_error)
339 60 : goto err;
340 :
341 60 : if (sort_param->keyinfo->flag & HA_VAR_LENGTH_KEY)
342 : {
343 40 : sort_param->write_keys= write_keys_varlen;
344 40 : sort_param->read_to_buffer= read_to_buffer_varlen;
345 40 : sort_param->write_key= write_merge_key_varlen;
346 : }
347 : else
348 : {
349 20 : sort_param->write_keys= write_keys;
350 20 : sort_param->read_to_buffer= read_to_buffer;
351 20 : sort_param->write_key= write_merge_key;
352 : }
353 :
354 60 : my_b_clear(&sort_param->tempfile);
355 60 : my_b_clear(&sort_param->tempfile_for_exceptions);
356 60 : bzero((char*) &sort_param->buffpek,sizeof(sort_param->buffpek));
357 60 : bzero((char*) &sort_param->unique, sizeof(sort_param->unique));
358 :
359 60 : memavl= max(sort_param->sortbuff_size, MIN_SORT_MEMORY);
360 60 : idx= (uint)sort_param->sort_info->max_records;
361 60 : sort_length= sort_param->key_length;
362 60 : maxbuffer= 1;
363 :
364 120 : while (memavl >= MIN_SORT_MEMORY)
365 : {
366 60 : if ((my_off_t) (idx+1)*(sort_length+sizeof(char*)) <= (my_off_t) memavl)
367 60 : keys= idx+1;
368 : else
369 : {
370 : ulong skr;
371 : do
372 : {
373 0 : skr= maxbuffer;
374 0 : if (memavl < sizeof(BUFFPEK)*maxbuffer ||
375 : (keys=(memavl-sizeof(BUFFPEK)*maxbuffer)/
376 : (sort_length+sizeof(char*))) <= 1 ||
377 : keys < maxbuffer)
378 : {
379 0 : _ma_check_print_error(sort_param->sort_info->param,
380 : "maria_sort_buffer_size is too small");
381 0 : goto err;
382 : }
383 : }
384 0 : while ((maxbuffer= (int) (idx/(keys-1)+1)) != skr);
385 : }
386 60 : if ((sort_keys= (uchar **)
387 : my_malloc(keys*(sort_length+sizeof(char*))+
388 : ((sort_param->keyinfo->flag & HA_FULLTEXT) ?
389 : HA_FT_MAXBYTELEN : 0), MYF(0))))
390 : {
391 60 : if (my_init_dynamic_array(&sort_param->buffpek, sizeof(BUFFPEK),
392 : maxbuffer, maxbuffer/2))
393 : {
394 0 : my_free(sort_keys, MYF(0));
395 0 : sort_keys= (uchar **) NULL; /* for err: label */
396 : }
397 : else
398 0 : break;
399 : }
400 0 : old_memavl= memavl;
401 0 : if ((memavl= memavl/4*3) < MIN_SORT_MEMORY &&
402 : old_memavl > MIN_SORT_MEMORY)
403 0 : memavl= MIN_SORT_MEMORY;
404 : }
405 60 : if (memavl < MIN_SORT_MEMORY)
406 : {
407 0 : _ma_check_print_error(sort_param->sort_info->param,
408 : "Maria sort buffer too small");
409 0 : goto err; /* purecov: tested */
410 : }
411 :
412 60 : if (sort_param->sort_info->param->testflag & T_VERBOSE)
413 0 : printf("Key %d - Allocating buffer for %lu keys\n",
414 : sort_param->key+1, (ulong) keys);
415 60 : sort_param->sort_keys= sort_keys;
416 :
417 60 : idx= error= 0;
418 60 : sort_keys[0]= (uchar*) (sort_keys+keys);
419 :
420 60 : DBUG_PRINT("info", ("reading keys"));
421 54715 : while (!(error= sort_param->sort_info->got_error) &&
422 : !(error= (*sort_param->key_read)(sort_param, sort_keys[idx])))
423 : {
424 54595 : if (sort_param->real_key_length > sort_param->key_length)
425 : {
426 0 : if (write_key(sort_param,sort_keys[idx],
427 : &sort_param->tempfile_for_exceptions))
428 : goto err;
429 : continue;
430 : }
431 :
432 54595 : if (++idx == keys)
433 : {
434 0 : if (sort_param->write_keys(sort_param, sort_keys, idx - 1,
435 : (BUFFPEK *)alloc_dynamic(&sort_param->
436 : buffpek),
437 : &sort_param->tempfile))
438 0 : goto err;
439 0 : sort_keys[0]= (uchar*) (sort_keys+keys);
440 0 : memcpy(sort_keys[0], sort_keys[idx - 1],
441 : (size_t) sort_param->key_length);
442 0 : idx= 1;
443 : }
444 54595 : sort_keys[idx]=sort_keys[idx - 1] + sort_param->key_length;
445 : }
446 56 : if (error > 0)
447 60 : goto err;
448 60 : if (sort_param->buffpek.elements)
449 : {
450 0 : if (sort_param->write_keys(sort_param,sort_keys, idx,
451 : (BUFFPEK *) alloc_dynamic(&sort_param->
452 : buffpek),
453 : &sort_param->tempfile))
454 0 : goto err;
455 0 : sort_param->keys= (sort_param->buffpek.elements - 1) * (keys - 1) + idx;
456 : }
457 : else
458 60 : sort_param->keys= idx;
459 :
460 60 : sort_param->sort_keys_length= keys;
461 60 : goto ok;
462 :
463 0 : err:
464 0 : DBUG_PRINT("error", ("got some error"));
465 0 : sort_param->sort_info->got_error= 1; /* no need to protect with a mutex */
466 0 : my_free(sort_keys,MYF(MY_ALLOW_ZERO_PTR));
467 0 : sort_param->sort_keys=0;
468 0 : delete_dynamic(& sort_param->buffpek);
469 0 : close_cached_file(&sort_param->tempfile);
470 0 : close_cached_file(&sort_param->tempfile_for_exceptions);
471 :
472 60 : ok:
473 60 : free_root(&sort_param->wordroot, MYF(0));
474 : /*
475 : Detach from the share if the writer is involved. Avoid others to
476 : be blocked. This includes a flush of the write buffer. This will
477 : also indicate EOF to the readers.
478 : */
479 60 : if (sort_param->sort_info->info->rec_cache.share)
480 8 : remove_io_thread(&sort_param->sort_info->info->rec_cache);
481 :
482 : /* Readers detach from the share if any. Avoid others to be blocked. */
483 60 : if (sort_param->read_cache.share)
484 52 : remove_io_thread(&sort_param->read_cache);
485 :
486 60 : pthread_mutex_lock(&sort_param->sort_info->mutex);
487 60 : if (!--sort_param->sort_info->threads_running)
488 10 : pthread_cond_signal(&sort_param->sort_info->cond);
489 60 : pthread_mutex_unlock(&sort_param->sort_info->mutex);
490 60 : DBUG_PRINT("exit", ("======== ending thread ========"));
491 : }
492 60 : my_thread_end();
493 60 : return NULL;
494 : }
495 :
496 :
497 : int _ma_thr_write_keys(MARIA_SORT_PARAM *sort_param)
498 10 : {
499 10 : MARIA_SORT_INFO *sort_info=sort_param->sort_info;
500 10 : HA_CHECK *param=sort_info->param;
501 : ulong length, keys;
502 10 : double *rec_per_key_part= param->new_rec_per_key_part;
503 10 : int got_error=sort_info->got_error;
504 : uint i;
505 10 : MARIA_HA *info=sort_info->info;
506 10 : MARIA_SHARE *share= info->s;
507 : MARIA_SORT_PARAM *sinfo;
508 10 : uchar *mergebuf=0;
509 10 : DBUG_ENTER("_ma_thr_write_keys");
510 10 : LINT_INIT(length);
511 :
512 10 : for (i= 0, sinfo= sort_param ;
513 80 : i < sort_info->total_keys ;
514 60 : i++, rec_per_key_part+=sinfo->keyinfo->keysegs, sinfo++)
515 : {
516 60 : if (!sinfo->sort_keys)
517 : {
518 0 : got_error=1;
519 0 : my_free(sinfo->rec_buff, MYF(MY_ALLOW_ZERO_PTR));
520 0 : continue;
521 : }
522 60 : if (!got_error)
523 : {
524 60 : maria_set_key_active(share->state.key_map, sinfo->key);
525 :
526 60 : if (!sinfo->buffpek.elements)
527 : {
528 60 : if (param->testflag & T_VERBOSE)
529 : {
530 0 : printf("Key %d - Dumping %u keys\n",sinfo->key+1, sinfo->keys);
531 0 : fflush(stdout);
532 : }
533 60 : if (write_index(sinfo, sinfo->sort_keys, sinfo->keys) ||
534 : flush_maria_ft_buf(sinfo) || _ma_flush_pending_blocks(sinfo))
535 0 : got_error=1;
536 : }
537 60 : if (!got_error && param->testflag & T_STATISTICS)
538 0 : maria_update_key_parts(sinfo->keyinfo, rec_per_key_part, sinfo->unique,
539 : param->stats_method ==
540 : MI_STATS_METHOD_IGNORE_NULLS ?
541 : sinfo->notnull : NULL,
542 : (ulonglong) share->state.state.records);
543 : }
544 60 : my_free(sinfo->sort_keys,MYF(0));
545 60 : my_free(sinfo->rec_buff, MYF(MY_ALLOW_ZERO_PTR));
546 60 : sinfo->sort_keys=0;
547 : }
548 :
549 10 : for (i= 0, sinfo= sort_param ;
550 80 : i < sort_info->total_keys ;
551 : i++,
552 : delete_dynamic(&sinfo->buffpek),
553 : close_cached_file(&sinfo->tempfile),
554 : close_cached_file(&sinfo->tempfile_for_exceptions),
555 60 : sinfo++)
556 : {
557 60 : if (got_error)
558 60 : continue;
559 60 : if (sinfo->keyinfo->flag & HA_VAR_LENGTH_KEY)
560 : {
561 40 : sinfo->write_keys=write_keys_varlen;
562 40 : sinfo->read_to_buffer=read_to_buffer_varlen;
563 40 : sinfo->write_key=write_merge_key_varlen;
564 : }
565 : else
566 : {
567 20 : sinfo->write_keys=write_keys;
568 20 : sinfo->read_to_buffer=read_to_buffer;
569 20 : sinfo->write_key=write_merge_key;
570 : }
571 60 : if (sinfo->buffpek.elements)
572 : {
573 0 : uint maxbuffer=sinfo->buffpek.elements-1;
574 0 : if (!mergebuf)
575 : {
576 0 : length=param->sort_buffer_length;
577 0 : while (length >= MIN_SORT_MEMORY)
578 : {
579 0 : if ((mergebuf= my_malloc(length, MYF(0))))
580 0 : break;
581 0 : length=length*3/4;
582 : }
583 0 : if (!mergebuf)
584 : {
585 0 : got_error=1;
586 0 : continue;
587 : }
588 : }
589 0 : keys=length/sinfo->key_length;
590 0 : if (maxbuffer >= MERGEBUFF2)
591 : {
592 0 : if (param->testflag & T_VERBOSE)
593 0 : printf("Key %d - Merging %u keys\n",sinfo->key+1, sinfo->keys);
594 0 : if (merge_many_buff(sinfo, keys, (uchar **) mergebuf,
595 : dynamic_element(&sinfo->buffpek, 0, BUFFPEK *),
596 : (int*) &maxbuffer, &sinfo->tempfile))
597 : {
598 0 : got_error=1;
599 0 : continue;
600 : }
601 : }
602 0 : if (flush_io_cache(&sinfo->tempfile) ||
603 : reinit_io_cache(&sinfo->tempfile,READ_CACHE,0L,0,0))
604 : {
605 0 : got_error=1;
606 0 : continue;
607 : }
608 0 : if (param->testflag & T_VERBOSE)
609 0 : printf("Key %d - Last merge and dumping keys\n", sinfo->key+1);
610 0 : if (merge_index(sinfo, keys, (uchar**) mergebuf,
611 : dynamic_element(&sinfo->buffpek,0,BUFFPEK *),
612 : maxbuffer,&sinfo->tempfile) ||
613 : flush_maria_ft_buf(sinfo) ||
614 : _ma_flush_pending_blocks(sinfo))
615 : {
616 0 : got_error=1;
617 0 : continue;
618 : }
619 : }
620 60 : if (my_b_inited(&sinfo->tempfile_for_exceptions))
621 : {
622 : uint16 key_length;
623 :
624 0 : if (param->testflag & T_VERBOSE)
625 0 : printf("Key %d - Dumping 'long' keys\n", sinfo->key+1);
626 :
627 0 : if (flush_io_cache(&sinfo->tempfile_for_exceptions) ||
628 : reinit_io_cache(&sinfo->tempfile_for_exceptions,READ_CACHE,0L,0,0))
629 : {
630 0 : got_error=1;
631 0 : continue;
632 : }
633 :
634 0 : while (!got_error &&
635 : !my_b_read(&sinfo->tempfile_for_exceptions,(uchar*)&key_length,
636 : sizeof(key_length)))
637 : {
638 : uchar maria_ft_buf[HA_FT_MAXBYTELEN + HA_FT_WLEN + 10];
639 0 : if (key_length > sizeof(maria_ft_buf) ||
640 : my_b_read(&sinfo->tempfile_for_exceptions, (uchar*)maria_ft_buf,
641 : (uint) key_length))
642 0 : got_error= 1;
643 : else
644 : {
645 : MARIA_KEY tmp_key;
646 0 : tmp_key.keyinfo= info->s->keyinfo + sinfo->key;
647 0 : tmp_key.data= maria_ft_buf;
648 0 : tmp_key.ref_length= info->s->rec_reflength;
649 0 : tmp_key.data_length= key_length - info->s->rec_reflength;
650 0 : tmp_key.flag= 0;
651 0 : if (_ma_ck_write(info, &tmp_key))
652 0 : got_error=1;
653 : }
654 : }
655 : }
656 : }
657 10 : my_free(mergebuf,MYF(MY_ALLOW_ZERO_PTR));
658 10 : DBUG_RETURN(got_error);
659 : }
660 : #endif /* THREAD */
661 :
662 :
663 : /* Write all keys in memory to file for later merge */
664 :
665 : static int write_keys(MARIA_SORT_PARAM *info, register uchar **sort_keys,
666 : uint count, BUFFPEK *buffpek, IO_CACHE *tempfile)
667 0 : {
668 : uchar **end;
669 0 : uint sort_length=info->key_length;
670 0 : DBUG_ENTER("write_keys");
671 :
672 0 : my_qsort2((uchar*) sort_keys,count,sizeof(uchar*),(qsort2_cmp) info->key_cmp,
673 : info);
674 0 : if (!my_b_inited(tempfile) &&
675 : open_cached_file(tempfile, my_tmpdir(info->tmpdir), "ST",
676 : DISK_BUFFER_SIZE, info->sort_info->param->myf_rw))
677 0 : DBUG_RETURN(1); /* purecov: inspected */
678 :
679 0 : buffpek->file_pos=my_b_tell(tempfile);
680 0 : buffpek->count=count;
681 :
682 0 : for (end=sort_keys+count ; sort_keys != end ; sort_keys++)
683 : {
684 0 : if (my_b_write(tempfile, *sort_keys, (uint) sort_length))
685 0 : DBUG_RETURN(1); /* purecov: inspected */
686 : }
687 0 : DBUG_RETURN(0);
688 : } /* write_keys */
689 :
690 :
691 : static inline int
692 : my_var_write(MARIA_SORT_PARAM *info, IO_CACHE *to_file, uchar *bufs)
693 0 : {
694 : int err;
695 0 : uint16 len= _ma_keylength(info->keyinfo, bufs);
696 :
697 : /* The following is safe as this is a local file */
698 0 : if ((err= my_b_write(to_file, (uchar*)&len, sizeof(len))))
699 0 : return (err);
700 0 : if ((err= my_b_write(to_file,bufs, (uint) len)))
701 0 : return (err);
702 0 : return (0);
703 : }
704 :
705 :
706 : static int write_keys_varlen(MARIA_SORT_PARAM *info,
707 : register uchar **sort_keys,
708 : uint count, BUFFPEK *buffpek,
709 : IO_CACHE *tempfile)
710 0 : {
711 : uchar **end;
712 : int err;
713 0 : DBUG_ENTER("write_keys_varlen");
714 :
715 0 : my_qsort2((uchar*) sort_keys,count,sizeof(uchar*),(qsort2_cmp) info->key_cmp,
716 : info);
717 0 : if (!my_b_inited(tempfile) &&
718 : open_cached_file(tempfile, my_tmpdir(info->tmpdir), "ST",
719 : DISK_BUFFER_SIZE, info->sort_info->param->myf_rw))
720 0 : DBUG_RETURN(1); /* purecov: inspected */
721 :
722 0 : buffpek->file_pos=my_b_tell(tempfile);
723 0 : buffpek->count=count;
724 0 : for (end=sort_keys+count ; sort_keys != end ; sort_keys++)
725 : {
726 0 : if ((err= my_var_write(info,tempfile, *sort_keys)))
727 0 : DBUG_RETURN(err);
728 : }
729 0 : DBUG_RETURN(0);
730 : } /* write_keys_varlen */
731 :
732 :
733 : static int write_key(MARIA_SORT_PARAM *info, uchar *key,
734 : IO_CACHE *tempfile)
735 0 : {
736 0 : uint16 key_length=info->real_key_length;
737 0 : DBUG_ENTER("write_key");
738 :
739 0 : if (!my_b_inited(tempfile) &&
740 : open_cached_file(tempfile, my_tmpdir(info->tmpdir), "ST",
741 : DISK_BUFFER_SIZE, info->sort_info->param->myf_rw))
742 0 : DBUG_RETURN(1);
743 :
744 0 : if (my_b_write(tempfile, (uchar*)&key_length,sizeof(key_length)) ||
745 : my_b_write(tempfile, key, (uint) key_length))
746 0 : DBUG_RETURN(1);
747 0 : DBUG_RETURN(0);
748 : } /* write_key */
749 :
750 :
751 : /* Write index */
752 :
753 : static int write_index(MARIA_SORT_PARAM *info,
754 : register uchar **sort_keys,
755 : register uint count)
756 235 : {
757 235 : DBUG_ENTER("write_index");
758 :
759 235 : my_qsort2((uchar*) sort_keys,(size_t) count,sizeof(uchar*),
760 : (qsort2_cmp) info->key_cmp,info);
761 137795 : while (count--)
762 : {
763 137325 : if ((*info->key_write)(info, *sort_keys++))
764 0 : DBUG_RETURN(-1); /* purecov: inspected */
765 : }
766 235 : DBUG_RETURN(0);
767 : } /* write_index */
768 :
769 :
770 : /* Merge buffers to make < MERGEBUFF2 buffers */
771 :
772 : static int merge_many_buff(MARIA_SORT_PARAM *info, uint keys,
773 : uchar **sort_keys, BUFFPEK *buffpek,
774 : int *maxbuffer, IO_CACHE *t_file)
775 0 : {
776 : register int i;
777 : IO_CACHE t_file2, *from_file, *to_file, *temp;
778 : BUFFPEK *lastbuff;
779 0 : DBUG_ENTER("merge_many_buff");
780 :
781 0 : if (*maxbuffer < MERGEBUFF2)
782 0 : DBUG_RETURN(0); /* purecov: inspected */
783 0 : if (flush_io_cache(t_file) ||
784 : open_cached_file(&t_file2,my_tmpdir(info->tmpdir),"ST",
785 : DISK_BUFFER_SIZE, info->sort_info->param->myf_rw))
786 0 : DBUG_RETURN(1); /* purecov: inspected */
787 :
788 0 : from_file= t_file ; to_file= &t_file2;
789 0 : while (*maxbuffer >= MERGEBUFF2)
790 : {
791 0 : reinit_io_cache(from_file,READ_CACHE,0L,0,0);
792 0 : reinit_io_cache(to_file,WRITE_CACHE,0L,0,0);
793 0 : lastbuff=buffpek;
794 0 : for (i=0 ; i <= *maxbuffer-MERGEBUFF*3/2 ; i+=MERGEBUFF)
795 : {
796 0 : if (merge_buffers(info,keys,from_file,to_file,sort_keys,lastbuff++,
797 : buffpek+i,buffpek+i+MERGEBUFF-1))
798 0 : goto cleanup;
799 : }
800 0 : if (merge_buffers(info,keys,from_file,to_file,sort_keys,lastbuff++,
801 : buffpek+i,buffpek+ *maxbuffer))
802 0 : break; /* purecov: inspected */
803 0 : if (flush_io_cache(to_file))
804 0 : break; /* purecov: inspected */
805 0 : temp=from_file; from_file=to_file; to_file=temp;
806 0 : *maxbuffer= (int) (lastbuff-buffpek)-1;
807 : }
808 0 : cleanup:
809 0 : close_cached_file(to_file); /* This holds old result */
810 0 : if (to_file == t_file)
811 0 : *t_file=t_file2; /* Copy result file */
812 :
813 0 : DBUG_RETURN(*maxbuffer >= MERGEBUFF2); /* Return 1 if interrupted */
814 : } /* merge_many_buff */
815 :
816 :
817 : /*
818 : Read data to buffer
819 :
820 : SYNOPSIS
821 : read_to_buffer()
822 : fromfile File to read from
823 : buffpek Where to read from
824 : sort_length max length to read
825 : RESULT
826 : > 0 Ammount of bytes read
827 : -1 Error
828 : */
829 :
830 : static uint read_to_buffer(IO_CACHE *fromfile, BUFFPEK *buffpek,
831 : uint sort_length)
832 0 : {
833 : register uint count;
834 : uint length;
835 :
836 0 : if ((count=(uint) min((ha_rows) buffpek->max_keys,buffpek->count)))
837 : {
838 0 : if (my_pread(fromfile->file, buffpek->base,
839 : (length= sort_length*count),buffpek->file_pos,MYF_RW))
840 0 : return((uint) -1); /* purecov: inspected */
841 0 : buffpek->key=buffpek->base;
842 0 : buffpek->file_pos+= length; /* New filepos */
843 0 : buffpek->count-= count;
844 0 : buffpek->mem_count= count;
845 : }
846 0 : return (count*sort_length);
847 : } /* read_to_buffer */
848 :
849 : static uint read_to_buffer_varlen(IO_CACHE *fromfile, BUFFPEK *buffpek,
850 : uint sort_length)
851 0 : {
852 : register uint count;
853 : uint idx;
854 : uchar *buffp;
855 :
856 0 : if ((count=(uint) min((ha_rows) buffpek->max_keys,buffpek->count)))
857 : {
858 0 : buffp= buffpek->base;
859 :
860 0 : for (idx=1;idx<=count;idx++)
861 : {
862 : uint16 length_of_key;
863 0 : if (my_pread(fromfile->file,(uchar*)&length_of_key,sizeof(length_of_key),
864 : buffpek->file_pos,MYF_RW))
865 0 : return((uint) -1);
866 0 : buffpek->file_pos+=sizeof(length_of_key);
867 0 : if (my_pread(fromfile->file, buffp, length_of_key,
868 : buffpek->file_pos,MYF_RW))
869 0 : return((uint) -1);
870 0 : buffpek->file_pos+=length_of_key;
871 0 : buffp = buffp + sort_length;
872 : }
873 0 : buffpek->key=buffpek->base;
874 0 : buffpek->count-= count;
875 0 : buffpek->mem_count= count;
876 : }
877 0 : return (count*sort_length);
878 : } /* read_to_buffer_varlen */
879 :
880 :
881 : static int write_merge_key_varlen(MARIA_SORT_PARAM *info,
882 : IO_CACHE *to_file, uchar* key,
883 : uint sort_length, uint count)
884 0 : {
885 : uint idx;
886 0 : uchar *bufs = key;
887 :
888 0 : for (idx=1;idx<=count;idx++)
889 : {
890 : int err;
891 0 : if ((err= my_var_write(info, to_file, bufs)))
892 0 : return (err);
893 0 : bufs=bufs+sort_length;
894 : }
895 0 : return(0);
896 : }
897 :
898 :
899 : static int write_merge_key(MARIA_SORT_PARAM *info __attribute__((unused)),
900 : IO_CACHE *to_file, uchar *key,
901 : uint sort_length, uint count)
902 0 : {
903 0 : return my_b_write(to_file, key, (size_t) sort_length*count);
904 : }
905 :
906 : /*
907 : Merge buffers to one buffer
908 : If to_file == 0 then use info->key_write
909 : */
910 :
911 : static int NEAR_F
912 : merge_buffers(MARIA_SORT_PARAM *info, uint keys, IO_CACHE *from_file,
913 : IO_CACHE *to_file, uchar **sort_keys, BUFFPEK *lastbuff,
914 : BUFFPEK *Fb, BUFFPEK *Tb)
915 0 : {
916 : int error;
917 : uint sort_length,maxcount;
918 : ha_rows count;
919 : my_off_t to_start_filepos;
920 : uchar *strpos;
921 : BUFFPEK *buffpek,**refpek;
922 : QUEUE queue;
923 0 : volatile int *killed= _ma_killed_ptr(info->sort_info->param);
924 0 : DBUG_ENTER("merge_buffers");
925 :
926 0 : count=error=0;
927 0 : maxcount=keys/((uint) (Tb-Fb) +1);
928 0 : DBUG_ASSERT(maxcount > 0);
929 0 : LINT_INIT(to_start_filepos);
930 0 : if (to_file)
931 0 : to_start_filepos=my_b_tell(to_file);
932 0 : strpos= (uchar*) sort_keys;
933 0 : sort_length=info->key_length;
934 :
935 0 : if (init_queue(&queue,(uint) (Tb-Fb)+1,offsetof(BUFFPEK,key),0,
936 : (int (*)(void*, uchar *,uchar*)) info->key_cmp,
937 : (void*) info))
938 0 : DBUG_RETURN(1); /* purecov: inspected */
939 :
940 0 : for (buffpek= Fb ; buffpek <= Tb ; buffpek++)
941 : {
942 0 : count+= buffpek->count;
943 0 : buffpek->base= strpos;
944 0 : buffpek->max_keys=maxcount;
945 0 : strpos+= (uint) (error=(int) info->read_to_buffer(from_file,buffpek,
946 : sort_length));
947 0 : if (error == -1)
948 0 : goto err; /* purecov: inspected */
949 0 : queue_insert(&queue,(uchar*) buffpek);
950 : }
951 :
952 0 : while (queue.elements > 1)
953 : {
954 : for (;;)
955 : {
956 0 : if (*killed)
957 : {
958 0 : error=1; goto err;
959 : }
960 0 : buffpek=(BUFFPEK*) queue_top(&queue);
961 0 : if (to_file)
962 : {
963 0 : if (info->write_key(info,to_file, buffpek->key,
964 : (uint) sort_length,1))
965 : {
966 0 : error=1; goto err; /* purecov: inspected */
967 : }
968 : }
969 : else
970 : {
971 0 : if ((*info->key_write)(info,(void*) buffpek->key))
972 : {
973 0 : error=1; goto err; /* purecov: inspected */
974 : }
975 : }
976 0 : buffpek->key+=sort_length;
977 0 : if (! --buffpek->mem_count)
978 : {
979 0 : if (!(error=(int) info->read_to_buffer(from_file,buffpek,sort_length)))
980 : {
981 0 : uchar *base= buffpek->base;
982 0 : uint max_keys=buffpek->max_keys;
983 :
984 0 : VOID(queue_remove(&queue,0));
985 :
986 : /* Put room used by buffer to use in other buffer */
987 0 : for (refpek= (BUFFPEK**) &queue_top(&queue);
988 0 : refpek <= (BUFFPEK**) &queue_end(&queue);
989 0 : refpek++)
990 : {
991 0 : buffpek= *refpek;
992 0 : if (buffpek->base+buffpek->max_keys*sort_length == base)
993 : {
994 0 : buffpek->max_keys+=max_keys;
995 0 : break;
996 : }
997 0 : else if (base+max_keys*sort_length == buffpek->base)
998 : {
999 0 : buffpek->base=base;
1000 0 : buffpek->max_keys+=max_keys;
1001 0 : break;
1002 : }
1003 : }
1004 : break; /* One buffer have been removed */
1005 : }
1006 : }
1007 0 : else if (error == -1)
1008 0 : goto err; /* purecov: inspected */
1009 0 : queue_replaced(&queue); /* Top element has been replaced */
1010 0 : }
1011 : }
1012 0 : buffpek=(BUFFPEK*) queue_top(&queue);
1013 0 : buffpek->base= (uchar*) sort_keys;
1014 0 : buffpek->max_keys=keys;
1015 : do
1016 : {
1017 0 : if (to_file)
1018 : {
1019 0 : if (info->write_key(info, to_file, buffpek->key,
1020 : sort_length,buffpek->mem_count))
1021 : {
1022 0 : error=1; goto err; /* purecov: inspected */
1023 : }
1024 : }
1025 : else
1026 : {
1027 : register uchar *end;
1028 0 : strpos= buffpek->key;
1029 0 : for (end= strpos+buffpek->mem_count*sort_length;
1030 0 : strpos != end ;
1031 0 : strpos+=sort_length)
1032 : {
1033 0 : if ((*info->key_write)(info, strpos))
1034 : {
1035 0 : error=1; goto err; /* purecov: inspected */
1036 : }
1037 : }
1038 : }
1039 : }
1040 : while ((error=(int) info->read_to_buffer(from_file,buffpek,sort_length)) !=
1041 0 : -1 && error != 0);
1042 :
1043 0 : lastbuff->count=count;
1044 0 : if (to_file)
1045 0 : lastbuff->file_pos=to_start_filepos;
1046 0 : err:
1047 0 : delete_queue(&queue);
1048 0 : DBUG_RETURN(error);
1049 : } /* merge_buffers */
1050 :
1051 :
1052 : /* Do a merge to output-file (save only positions) */
1053 :
1054 : static int NEAR_F
1055 : merge_index(MARIA_SORT_PARAM *info, uint keys, uchar **sort_keys,
1056 : BUFFPEK *buffpek, int maxbuffer, IO_CACHE *tempfile)
1057 0 : {
1058 0 : DBUG_ENTER("merge_index");
1059 0 : if (merge_buffers(info,keys,tempfile,(IO_CACHE*) 0,sort_keys,buffpek,buffpek,
1060 : buffpek+maxbuffer))
1061 0 : DBUG_RETURN(1); /* purecov: inspected */
1062 0 : DBUG_RETURN(0);
1063 : } /* merge_index */
1064 :
1065 :
1066 : static int flush_maria_ft_buf(MARIA_SORT_PARAM *info)
1067 235 : {
1068 235 : int err=0;
1069 235 : if (info->sort_info->ft_buf)
1070 : {
1071 0 : err=_ma_sort_ft_buf_flush(info);
1072 0 : my_free(info->sort_info->ft_buf, MYF(0));
1073 0 : info->sort_info->ft_buf=0;
1074 : }
1075 235 : return err;
1076 : }
|