GList - double linked abstract list implementation
[tedtools.git] / tbtree.c
1 /*
2  * Copyright (c) 2005 Teodor Sigaev <teodor@sigaev.ru>
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions
7  * are met:
8  * 1. Redistributions of source code must retain the above copyright
9  *      notice, this list of conditions and the following disclaimer.
10  * 2. Redistributions in binary form must reproduce the above copyright
11  *      notice, this list of conditions and the following disclaimer in the
12  *      documentation and/or other materials provided with the distribution.
13  * 3. Neither the name of the author nor the names of any co-contributors
14  *      may be used to endorse or promote products derived from this software
15  *      without specific prior written permission.
16  *
17  * THIS SOFTWARE IS PROVIDED BY CONTRIBUTORS ``AS IS'' AND ANY EXPRESS
18  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20  * ARE DISCLAIMED. IN NO EVENT SHALL CONTRIBUTORS BE LIABLE FOR ANY
21  * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
22  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE
23  * GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER
25  * IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
26  * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN
27  * IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
28  */
29 #include <stdio.h>
30 #include <stdlib.h>
31 #include <fcntl.h>
32 #include <unistd.h>
33 #include <string.h>
34 #include <errno.h>
35 #include <sys/types.h>
36 #include <sys/stat.h>
37 #include <sys/file.h>
38
39 #include "tmalloc.h"
40 #include "tlog.h"
41
42 #include "tbtree.h"
43
44 #define NOTFOUND        0
45 #define FOUND           1
46
47 static int TBTReadPage(TBTree *db, u_int32_t pagenumber, TBTMemPage **page) ;
48 static int TBTWritePage(TBTree *db, TBTMemPage *page) ;
49 static int TBTNewPage(TBTree *db, TBTMemPage **page) ;
50
51
52
53 int 
54 TBTOpen(TBTree *db, char *file) {
55         off_t offset;
56
57         if ( db->readonly ) {
58                 db->fd = open(file, O_RDONLY);
59                 if ( db->fd < 0 ) {
60                         tlog(TL_CRIT,"TBTOpen: open failed: %s",strerror(errno));
61                         return TBT_ERROR;
62                 }
63                 if ( flock( db->fd, LOCK_SH ) < 0 ) {
64                         tlog(TL_CRIT,"TBTOpen: flock failed: %s",strerror(errno));
65                         close(db->fd);
66                         return TBT_ERROR;
67                 }
68         } else {
69                 db->fd = open(file, O_CREAT | O_RDWR, 0666);
70                 if ( db->fd < 0 ) {
71                         tlog(TL_CRIT,"TBTOpen: open failed: %s",strerror(errno));
72                         return TBT_ERROR;
73                 }
74                 if ( flock( db->fd, LOCK_EX ) < 0 ) {
75                         tlog(TL_CRIT,"TBTOpen: flock failed: %s",strerror(errno));
76                         close(db->fd);
77                         return TBT_ERROR;
78                 }
79         }
80
81         if ( db->npage ) {
82                 db->Cache=(TBTMemPage**)t0malloc( sizeof(TBTMemPage*)*HASHSIZE(db->npage) );
83                 db->curpage=0;
84         }
85
86         db->cachehits = db->pageread = db->pagewrite = 0;
87
88         if ( (offset=lseek(db->fd, 0, SEEK_END)) < 0 ) {
89                 tlog(TL_CRIT,"TBTOpen: lseek failed: %s",strerror(errno));
90                 flock( db->fd, LOCK_UN );
91                 close(db->fd);
92                 return TBT_ERROR;
93         }
94
95         if ( offset==0 ) {
96                 if ( db->readonly ) {
97                         tlog(TL_CRIT,"TBTOpen: file is void");
98                         flock( db->fd, LOCK_UN );
99                         close(db->fd);
100                         return TBT_ERROR;
101                 } else {
102                         TBTMemPage *page;
103                         if ( TBTNewPage(db, &page) != TBT_OK ) {
104                                 flock( db->fd, LOCK_UN );
105                                 close(db->fd);
106                                 return TBT_ERROR;
107                         }
108                         page->page.isleaf=1;
109                         if ( TBTWritePage(db, page)!=TBT_OK ){
110                                 flock( db->fd, LOCK_UN );
111                                 close(db->fd);
112                                 return TBT_ERROR;
113                         }
114                 }
115         } 
116
117         db->pointersize = TBTPOINTERSIZE(db);
118         db->lastpagenumber=0;                            
119         return TBT_OK; 
120 }
121
122 int
123 TBTClose(TBTree *db) {
124         int rc=0;
125
126         if ( !db->readonly ) 
127                 rc=TBTSync(db);
128
129         if ( db->npage ) {
130                 u_int32_t i;
131                 TBTMemPage *ptr,*tmp;
132
133                 for(i=0; i<HASHSIZE(db->npage); i++) {
134                         ptr = db->Cache[i];
135                         while(ptr) {
136                                 tmp = ptr->link;
137                                 tfree(ptr);
138                                 ptr=tmp;
139                         }
140                 }
141
142                 tfree( db->Cache );
143         }
144         
145         flock( db->fd, LOCK_UN );
146         close(db->fd);  
147         
148         return (rc) ? TBT_ERROR : TBT_OK;
149 }
150
151 static int
152 cmpNPage(const void *a, const void *b) {
153         tassert( (*(TBTMemPage**)a)->pagenumber != (*(TBTMemPage**)b)->pagenumber );
154         return ( (*(TBTMemPage**)a)->pagenumber > (*(TBTMemPage**)b)->pagenumber ) ? 1 : -1;
155 }
156
157
158 int
159 TBTSync(TBTree *db) {
160         int rc=0;
161
162         if ( db->readonly || db->npage==0 )
163                 return TBT_OK;
164
165         if ( db->npage ) {
166                 u_int32_t i, total=0;
167                 TBTMemPage *ptr, **data = (TBTMemPage**)tmalloc(db->npage*sizeof(TBTMemPage*)), **pptr;
168                 pptr=data;
169                 for(i=0; i<HASHSIZE(db->npage); i++) {
170                         ptr = db->Cache[i];
171                         while(ptr) {
172                                 if ( !ptr->issynced ) {
173                                         *pptr = ptr;
174                                         pptr++;
175                                 } 
176                                 ptr = ptr->link;
177                         }
178                 }
179                 
180                 total=pptr-data;
181                 if ( total > 0 ) {
182                         if ( total>1 )
183                                 qsort( data, total, sizeof(TBTMemPage*), cmpNPage);
184
185                         pptr=data;
186                         while( pptr-data<total) {
187                                 rc |= TBTWritePage(db, *pptr);
188                                 pptr++;
189                         }
190                 }
191                 tfree(data);
192         }
193         
194         if ( fsync(db->fd) ) {
195                 tlog(TL_CRIT,"TBTSync: fsync failed: %s",strerror(errno));
196                 rc=TBT_ERROR;
197         }
198
199         return (rc) ? TBT_ERROR : TBT_OK;
200 }
201
202 static TBTMemPage *
203 findInCache(TBTree *db, u_int32_t pagenumber) {
204         TBTMemPage *ptr;
205
206         ptr = db->Cache[ pagenumber % HASHSIZE(db->npage) ];
207
208         while(ptr && ptr->pagenumber != pagenumber)
209                 ptr = ptr->link; 
210
211         return ptr;
212 }
213
214 static void
215 removeFromCache(TBTree *db, u_int32_t pagenumber) {
216         TBTMemPage *ptr, *prev=NULL;
217         int pos = pagenumber % HASHSIZE(db->npage);
218
219         ptr = db->Cache[ pos ];
220
221         while(ptr) {
222                 if ( ptr->pagenumber == pagenumber ) {
223                         if ( prev )
224                                 prev->link = ptr->link;
225                         else
226                                 db->Cache[ pos ] = ptr->link;
227                         break;
228                 }
229                 prev=ptr;
230                 ptr = ptr->link;
231         }
232 }
233
234 static void
235 insertInCache(TBTree *db, TBTMemPage *page) {
236         int pos = page->pagenumber % HASHSIZE(db->npage);
237
238         page->link = db->Cache[ pos ];
239         db->Cache[ pos ] = page;
240 }
241
242 static TBTMemPage*
243 PageAlloc(TBTree *db) {
244         TBTMemPage *page;
245
246         if ( db->curpage < db->npage ) {
247                 page = (TBTMemPage*)tmalloc(sizeof(TBTMemPage));
248                 memset( page, 0, TBTMEMPAGEHDRSZ );
249                 page->iscached=1;
250                 if ( db->curpage == 0 ) {
251                         db->TimeCache = db->TimeCacheLast = page;       
252                 } else {
253                         db->TimeCacheLast->next = page;
254                         page->prev = db->TimeCacheLast;
255                         db->TimeCacheLast = page;
256                 }
257                 db->curpage++;
258         } else if ( db->npage == 0 ) {
259                 page = (TBTMemPage*)tmalloc(sizeof(TBTMemPage));
260                 memset( page, 0, TBTMEMPAGEHDRSZ );
261         } else {
262                 page = db->TimeCache;
263                 while( page->next && page->islocked ) 
264                         page = page->next;
265                 if ( page==db->TimeCacheLast && page->islocked ) { /*all pages locked*/
266                         /* tlog(TL_WARN,"TBTReadPage: all pages in cache locked, increase number of cached pages"); */ 
267                         page = (TBTMemPage*)tmalloc(sizeof(TBTMemPage));
268                         memset( page, 0, TBTMEMPAGEHDRSZ );
269                 } else {
270                         if ( !page->issynced )
271                                 if ( TBTWritePage(db, page) )
272                                         return NULL;
273                         removeFromCache(db, page->pagenumber);
274                         if ( page != db->TimeCacheLast ) {
275                                 if ( page == db->TimeCache ) {
276                                         db->TimeCache = page->next;
277                                         db->TimeCache->prev = NULL;
278                                 } else { 
279                                         page->prev->next = page->next;
280                                         page->next->prev = page->prev;
281                                 }
282                                 memset( page, 0, TBTMEMPAGEHDRSZ );
283                                 db->TimeCacheLast->next = page;
284                                 page->prev = db->TimeCacheLast;
285                                 db->TimeCacheLast = page;
286                         } else 
287                                 memset( page, 0, TBTMEMPAGEHDRSZ );
288                         page->iscached=1;
289                 }
290         }
291
292         return page;
293 }
294
295 static int
296 TBTReadPage(TBTree *db, u_int32_t pagenumber, TBTMemPage **page) {
297         int rc;
298
299         if ( db->npage && (*page=findInCache(db, pagenumber))!=NULL ) {
300                 db->cachehits++;
301                 if ( *page != db->TimeCacheLast ) {
302                         if ( (*page) == db->TimeCache ) {
303                                 db->TimeCache = (*page)->next;
304                                 db->TimeCache->prev=NULL;
305                         } else { 
306                                 (*page)->prev->next = (*page)->next;
307                                 (*page)->next->prev = (*page)->prev;
308                         }
309                         db->TimeCacheLast->next = (*page);
310                         (*page)->prev = db->TimeCacheLast;
311                         db->TimeCacheLast = *page;
312                         (*page)->next = NULL;
313                 }
314         } else {
315                 off_t  offset = (off_t)pagenumber * (off_t)TBTREEPAGESIZE;
316         
317                 if( (*page = PageAlloc(db))==NULL )
318                         return TBT_ERROR;
319
320                 if ( lseek(db->fd, offset, SEEK_SET) != offset ) {
321                         tlog(TL_CRIT,"TBTReadPage: lseek failed: %s",strerror(errno));
322                         return TBT_ERROR;
323                 }
324                 (*page)->issynced=1;
325                 (*page)->pagenumber=pagenumber;
326                 if ( (rc=read(db->fd, &( (*page)->page ), TBTREEPAGESIZE)) != TBTREEPAGESIZE ) {
327                         tlog(TL_CRIT,"TBTReadPage: read failed: %s %d %d",strerror(errno), pagenumber, offset); 
328                         return TBT_ERROR;
329                 }
330                 if ( (*page)->iscached ) 
331                         insertInCache(db, *page);
332         }
333
334         db->pageread++;
335         return TBT_OK;   
336 }
337
338 static int
339 TBTWritePage(TBTree *db, TBTMemPage *page) {
340         off_t  offset = (off_t)(page->pagenumber) * (off_t)TBTREEPAGESIZE;
341         off_t  size;
342
343         if ( page->issynced )
344                 return TBT_OK;
345
346         if ( lseek(db->fd, offset, SEEK_SET) != offset ) {
347                 tlog(TL_CRIT,"TBTWritePage: lseek failed: %s",strerror(errno));
348                 return TBT_ERROR;
349         }
350
351         if ( (size=write(db->fd, &( page->page ), TBTREEPAGESIZE)) != TBTREEPAGESIZE ) {
352                 tlog(TL_CRIT,"TBTWritePage: write failed: %s (writed only %d bytes)",strerror(errno), size);
353                 return TBT_ERROR;
354         }
355
356         page->issynced=1;
357         db->pagewrite++;
358
359         return TBT_OK;
360 }
361
362 static int
363 TBTNewPage(TBTree *db, TBTMemPage **page) {
364         if ( db->lastpagenumber==0 ) {
365                 off_t  offset;
366                 if ( (offset=lseek(db->fd, 0, SEEK_END)) < 0 ) {
367                         tlog(TL_CRIT,"TBTNewPage: lseek failed: %s",strerror(errno));
368                         return TBT_ERROR;
369                 }
370
371                 if ( offset % TBTREEPAGESIZE != 0 ) {
372                         tlog(TL_CRIT,"TBTNewPage: file corrupted");
373                         return TBT_ERROR;
374                 }
375
376                 db->lastpagenumber = offset/TBTREEPAGESIZE;
377         }
378
379         if( (*page = PageAlloc(db))==NULL )
380                 return TBT_ERROR;
381
382         (*page)->pagenumber = db->lastpagenumber;
383         (*page)->issynced=0;
384         (*page)->islocked=1;
385
386         db->lastpagenumber++;
387
388         memset( &((*page)->page), 0, TBTMEMPAGEHDRSZ);
389         (*page)->page.freespace = TBTREEPAGESIZE-TBTPAGEHDRSZ;
390
391         if ( (*page)->iscached ) 
392                 insertInCache(db, *page);
393         
394         return TBT_OK;
395 }
396
397 static void
398 printValue(u_int32_t len, char *ptr) {
399         putchar('\'');
400         while(len>0) {
401                 putchar(*ptr);
402                 ptr++; len--;
403         }
404         putchar('\'');
405 }
406
407 static void
408 dumpPage(TBTree *db, TBTPage *page, int follow) {
409         u_int32_t i,j;
410
411         for(j=0;j<follow;j++)
412                 putchar(' ');
413         printf("Page: f:%d l:%d r:%d n:%d\n", 
414                 page->freespace,
415                 page->isleaf,
416                 page->rightlink,
417                 page->npointer
418         );      
419         for(i=0;i<page->npointer;i++) {
420                 TBTPointer *ptr= (TBTPointer*)(page->data+db->pointersize*i);
421                 for(j=0;j<follow+2;j++)
422                         putchar(' ');
423                 printf("i:%d(%d) kl:%d ko:%d ",
424                         i, (int)ptr, 
425                         (db->keylen) ? db->keylen : ptr->key.varlen.length,
426                         (db->keylen) ? 0 : ptr->key.varlen.offset
427                 );
428                 if ( db->keylen==0 ) 
429                         printValue(ptr->key.varlen.length, page->data + ptr->key.varlen.offset);
430                 else
431                         printf("'%d'", *(int*)(ptr->key.fixed.key));
432                 if ( page->isleaf ) {
433                         printf(" vl:%d vo:%d ", 
434                                 ptr->pointer.leaf.length,
435                                 ptr->pointer.leaf.offset
436                         );
437                         printValue(ptr->pointer.leaf.length,  page->data + ptr->pointer.leaf.offset );
438                         putchar('\n');
439                 } else {
440                         printf(" page:%d\n", ptr->pointer.internal.link);
441                         if ( follow>=0 )
442                                 dumpTree(db, ptr->pointer.internal.link, follow+4); 
443                 }
444         }
445 }
446
447 void
448 dumpTree(TBTree *db, u_int32_t pagenumber, int follow) {
449         TBTMemPage  *page;
450
451         if ( TBTReadPage(db, pagenumber, &page) ) {
452                 tlog(TL_CRIT,"BEDA %d\n", pagenumber);
453                 return;
454         }
455
456         page->islocked=1;
457         dumpPage(db, &(page->page), follow);
458         page->islocked=0;
459         if ( !page->iscached )
460                 tfree(page);
461 }
462
463 static int
464 findInPage(TBTree *db, TBTPage *page, TBTValue *key, TBTPointer **res) {
465         TBTPointer      *StopLow = (TBTPointer*)(page->data);
466         TBTPointer      *StopHigh = (TBTPointer*)(page->data + db->pointersize*page->npointer);
467         TBTPointer      *StopMiddle;
468         int diff ;
469         TBTValue  A;
470
471         A.length =  db->keylen;
472         /* Loop invariant: StopLow <= StopMiddle < StopHigh */
473         while (StopLow < StopHigh) {
474                 StopMiddle = (TBTPointer*)(((char*)StopLow) + db->pointersize*((((char*)StopHigh - (char*)StopLow)/db->pointersize) >> 1));
475                 if ( db->keylen == 0 ) {
476                         A.length = StopMiddle->key.varlen.length;
477                         A.value = page->data + StopMiddle->key.varlen.offset;
478                 } else 
479                         A.value = StopMiddle->key.fixed.key;
480                 
481         
482                 if ( ISINFPOINTER(db, page, StopMiddle) )
483                         diff = 1;
484                 else 
485                         diff = db->cmpkey( &A, key );
486                 if ( diff == 0 ) {
487                         *res = StopMiddle;
488                         return FOUND;
489                 } else if ( diff < 0 )
490                         StopLow = (TBTPointer*)((char*)StopMiddle + db->pointersize);
491                 else
492                         StopHigh = StopMiddle;
493         }
494
495         *res = StopHigh;
496         return NOTFOUND;
497 }
498
499 static int
500 findPage(TBTree *db, TBTValue *key, TBTMemPage **page) {
501         u_int32_t pagenumber = TBTPAGEROOT;
502         int rc = TBT_OK;
503
504         *page=NULL;
505         while(1) {
506                 TBTPointer *ptr;
507
508                 if ( (rc=TBTReadPage(db, pagenumber, page))!=TBT_OK )
509                         return rc;
510
511                 if ( (*page)->page.isleaf ) 
512                         return TBT_OK;
513
514                 findInPage(db, &((*page)->page), key, &ptr );
515                 pagenumber = ptr->pointer.internal.link;
516                 if ( !(*page)->iscached )
517                         tfree(*page);
518         }
519
520         return TBT_OK;
521 }
522
523 int
524 TBTFind(TBTree *db, TBTValue *key, TBTValue *value) {
525         TBTMemPage      *page;
526         TBTPointer *ptr;
527         int rc;
528
529         memset(value, 0, sizeof(TBTValue));
530
531         if ( (rc=findPage(db, key, &page))!=TBT_OK )
532                 return rc; 
533                 
534         if ( page == NULL )
535                 return TBT_OK;
536
537         if ( findInPage(db, &(page->page), key, &ptr) != FOUND ) {
538                 if ( !(page)->iscached )
539                         tfree(page);
540                 return TBT_OK;
541         } 
542
543         value->length = ptr->pointer.leaf.length;
544         value->value = tmalloc(value->length);
545         memcpy( value->value, page->page.data + ptr->pointer.leaf.offset, value->length );
546
547         if ( !(page)->iscached )
548                 tfree(page);
549
550         return TBT_OK;  
551 }
552
553 static void
554 packLeafKV(TBTree *db, TBTPage *page, TBTPointer *ptr, TBTValue *key, TBTValue *value) {
555         if ( ptr ) {
556                 if ( (char*)ptr - page->data  != page->npointer * db->pointersize ) 
557                         memmove((char*)ptr+db->pointersize, ptr, page->npointer * db->pointersize - ( (char*)ptr - page->data ));
558         } else
559                 ptr = (TBTPointer*)(page->data + page->npointer * db->pointersize);
560
561         page->npointer++;
562         page->freespace -= db->pointersize + PTRALIGN(value->length);
563         if ( db->keylen ) {
564                 memcpy( ptr->key.fixed.key, key->value, db->keylen );
565                 ptr->pointer.leaf.offset = page->npointer * db->pointersize + page->freespace;
566         } else {
567                 page->freespace -= PTRALIGN(key->length);
568                 ptr->key.varlen.length = key->length;
569                 ptr->key.varlen.offset = page->npointer * db->pointersize + page->freespace;
570                 memcpy( page->data + ptr->key.varlen.offset, key->value, key->length);
571                 ptr->pointer.leaf.offset = page->npointer * db->pointersize + page->freespace + PTRALIGN(key->length);
572         } 
573         ptr->pointer.leaf.length = value->length;
574         memcpy( page->data + ptr->pointer.leaf.offset, value->value, value->length );
575 }
576
577 static void
578 packInternalKV(TBTree *db, TBTPage *page, TBTPointer *ptr, TBTValue *key, u_int32_t pagenumber) {
579         if ( ptr ) {
580                 if ( (char*)ptr - page->data  != page->npointer * db->pointersize ) 
581                         memmove((char*)ptr+db->pointersize, ptr, page->npointer * db->pointersize - ( (char*)ptr - page->data ));
582         } else
583                 ptr = (TBTPointer*)(page->data + page->npointer * db->pointersize);
584
585         page->npointer++;
586         page->freespace -= db->pointersize;
587
588         if ( db->keylen ) {
589                 if ( key->value )
590                         memcpy( ptr->key.fixed.key, key->value, db->keylen );
591         } else {
592                 page->freespace -= PTRALIGN(key->length);
593                 ptr->key.varlen.length = key->length;
594                 ptr->key.varlen.offset = page->npointer * db->pointersize + page->freespace;
595                 if ( key->length ) 
596                         memcpy( page->data + ptr->key.varlen.offset, key->value, key->length);
597         }
598         ptr->pointer.internal.link = pagenumber;
599 }
600
601 static void
602 deleteKV(TBTree *db, TBTPage *page, TBTPointer **ptrs, u_int32_t num) {
603         TBTPointer *ptr = (TBTPointer*)page->data, *tmp, **ptrptr = ptrs;
604         u_int32_t i;
605
606         /* suppose ptrs are ordered! */
607         tmp = ptr;
608         while( ptrptr - ptrs < num && (char*)ptr - page->data < db->pointersize*page->npointer ) {
609                 if ( *ptrptr < ptr ) {
610                         tlog(TL_CRIT,"deleteKV: Something wrong: unexistent *ptrs"); 
611                         ptrptr++;
612                 } else if ( *ptrptr > ptr ) {
613                         if ( tmp != ptr )
614                                 memcpy(tmp, ptr, db->pointersize);
615                         ptr = (TBTPointer*)( (char*)ptr + db->pointersize );
616                         tmp = (TBTPointer*)( (char*)tmp + db->pointersize );
617                 } else {
618                         page->freespace += db->pointersize + 
619                                 ( ( page->isleaf ) ? PTRALIGN(ptr->pointer.leaf.length) : 0 ) +
620                                 ( ( db->keylen ) ? 0 : PTRALIGN(ptr->key.varlen.length) );
621                         ptrptr++;
622                         ptr = (TBTPointer*)( (char*)ptr + db->pointersize );
623                 }
624         }
625
626         if ( (char*)ptr - page->data < db->pointersize*page->npointer && tmp!=ptr )
627                 memmove( tmp, ptr, page->npointer * db->pointersize - ((char*)ptr - page->data) );
628
629         page->npointer-=num;
630
631         if ( page->isleaf || db->keylen == 0 ) {
632                 static char buf[TBTREEPAGESIZE];
633                 char  *bufptr = buf;
634                 u_int32_t start = page->npointer * db->pointersize + page->freespace;
635
636                 tmp = (TBTPointer*)page->data;
637                 for(i=0;i<page->npointer;i++) {
638                         if ( page->isleaf ) {
639                                 memcpy( bufptr, page->data + tmp->pointer.leaf.offset, tmp->pointer.leaf.length );
640                                 tmp->pointer.leaf.offset = start + (bufptr-buf);
641                                 bufptr+=PTRALIGN(tmp->pointer.leaf.length);
642                         }
643         
644                         if ( db->keylen == 0 ) {
645                                 if ( tmp->key.varlen.length )
646                                         memcpy( bufptr, page->data + tmp->key.varlen.offset, tmp->key.varlen.length );
647                                 tmp->key.varlen.offset = start + (bufptr-buf);
648                                 bufptr+=PTRALIGN(tmp->key.varlen.length);
649                         }
650                         tmp = (TBTPointer*)( (char*)tmp + db->pointersize );
651                 }
652
653                 memcpy( page->data + start, buf, bufptr-buf );
654         }
655 }
656
657 static u_int32_t
658 findDelimiter(TBTree *db, TBTPage *page, TBTPointer *ptr, int size, u_int32_t start, int *where) {
659         int  *sizes = (int*) tmalloc( sizeof(int) * (page->npointer+1) );
660         int i;
661         TBTPointer *tomove;
662         int lfree=TBTREEPAGESIZE-TBTPAGEHDRSZ, rfree=TBTREEPAGESIZE-TBTPAGEHDRSZ;
663         int nptr = ( (char*)ptr - page->data ) / db->pointersize; 
664         int was=0;
665
666         for(i=0; i<page->npointer;i++) {
667                 tomove = (TBTPointer*)(page->data + i*db->pointersize);
668                 if ( was==0 && i==nptr ) {
669                         sizes[i] = size;
670                         was=1;
671                 } 
672                 sizes[i+was] = db->pointersize +
673                         ( (page->isleaf) ? PTRALIGN(tomove->pointer.leaf.length) : 0 ) +
674                         ( ( db->keylen ) ? 0 : PTRALIGN(tomove->key.varlen.length) );
675         
676         }
677         
678         if ( was==0 ) {
679                 sizes[page->npointer]=size;
680         }
681
682         for(i=0;i<page->npointer+1;i++) 
683                 if ( i< start )
684                         lfree-=sizes[i];
685                 else 
686                         rfree-=sizes[i];
687
688         while( 1 ) {
689                 if ( lfree<0 ) {
690                         start--;
691                         lfree+=sizes[start];
692                         rfree-=sizes[start];
693                 } else if ( rfree < 0 ) { 
694                         lfree-=sizes[start];
695                         rfree+=sizes[start];
696                         start++;
697                 } else
698                         break;
699         }
700
701         *where = (nptr < start) ? -1 : 1;
702         if ( start>nptr )
703                 start--;
704
705         tfree(sizes);
706         return start;
707 }
708
709 static void
710 populatePages(TBTree *db, TBTPage *srcpage, TBTPage* newpage, u_int32_t start) {
711         u_int32_t i;
712         TBTPointer *tomove, **todelete = tmalloc( sizeof(TBTPointer*)* srcpage->npointer );
713         u_int32_t numtodelete=0;
714         TBTValue k, v;
715
716         for(i=start;i<srcpage->npointer;i++) {
717                 tomove = (TBTPointer*)(srcpage->data + i*db->pointersize);
718
719                 todelete[numtodelete] = tomove;
720                 numtodelete++;
721
722                 if ( db->keylen ) 
723                         k.value = tomove->key.fixed.key;
724                 else {
725                         k.length = tomove->key.varlen.length;
726                         k.value = srcpage->data + tomove->key.varlen.offset;
727                 } 
728                                 
729                 if ( srcpage->isleaf ) {
730                         v.length = tomove->pointer.leaf.length;
731                         v.value = srcpage->data + tomove->pointer.leaf.offset;
732                         packLeafKV(db, newpage, NULL, &k, &v);
733                 } else 
734                         packInternalKV(db, newpage, NULL, &k, tomove->pointer.internal.link);
735         }
736
737         deleteKV(db, srcpage, todelete, numtodelete);   
738         tfree(todelete);
739 }
740
741 static int
742 splitPage(TBTree *db, TBTMemPage *srcpage, TBTMemPage** newpage, TBTPointer **ptr, int size, TBTValue *key, TBTValue *value, u_int32_t pagenumber) {
743         TBTMemPage *tmp;
744         int start=0, where=0;
745         int nptr = ( (char*)(*ptr) - srcpage->page.data ) / db->pointersize;
746
747         if ( TBTNewPage(db, newpage) )
748                 return TBT_ERROR;
749
750         tmp = *newpage;
751         srcpage->issynced=tmp->issynced=0;
752         tmp->islocked=1;
753         tmp->page.rightlink = srcpage->page.rightlink;
754         srcpage->page.rightlink = tmp->pagenumber;
755         tmp->page.isleaf = srcpage->page.isleaf;
756
757         switch(db->strategy) {
758                 case TBT_STATEGY_HALF:
759                         start = findDelimiter(db, &(srcpage->page), *ptr, size, (srcpage->page.npointer+1)/2, &where);  
760                         break;
761                 case TBT_STATEGY_LEFTFILL:
762                         start = findDelimiter(db, &(srcpage->page), *ptr, size, srcpage->page.npointer, &where);        
763                         break;
764                 case TBT_STATEGY_RIGHTFILL:
765                         start = findDelimiter(db, &(srcpage->page), *ptr, size, 0, &where);     
766                         break;
767                 default:
768                         tlog(TL_CRIT,"splitPage: unknown strategy: %d", db->strategy);
769                         return TBT_ERROR;
770         }
771
772         populatePages(db, &(srcpage->page), &(tmp->page), start);
773         if ( where<0 )
774                 tmp=srcpage;
775         else
776                 nptr-=start;
777         *ptr = (TBTPointer*)(tmp->page.data + nptr*db->pointersize);
778
779         if ( tmp->page.isleaf ) 
780                 packLeafKV(db, &(tmp->page), *ptr, key, value);
781         else
782                 packInternalKV(db, &(tmp->page), *ptr, key, pagenumber);
783         return TBT_OK;
784 }
785
786 static int
787 savePage(TBTree *db, TBTMemPage *page) {
788         int rc;
789         page->islocked=0;
790
791         if ( !page->iscached ) {
792                 if (  page->issynced==0 && (rc=TBTWritePage(db, page)) != TBT_OK )
793                         return rc;
794                 tfree(page);
795         }
796         return TBT_OK;
797 }
798
799 static int
800 layerInsert(TBTree *db, u_int32_t pagenumber, TBTMemPage **left, TBTMemPage **right, TBTValue *key, TBTValue *value) {
801         int rc,fnd;
802         TBTPointer      *ptr=NULL;
803         TBTMemPage      *page=NULL, *newright=NULL;
804                         
805         if ( (rc=TBTReadPage(db, pagenumber, &page))!=TBT_OK )
806                 return rc;
807
808         page->islocked=1;
809
810         fnd = findInPage(db, &(page->page), key, &ptr );
811
812         *left=*right=NULL;
813         if ( page->page.isleaf ) {
814                 u_int32_t size = db->pointersize +
815                         PTRALIGN(value->length) +
816                         ( ( db->keylen ) ? 0 : PTRALIGN(key->length) );
817                 
818                 if ( size>(TBTREEPAGESIZE-TBTPAGEHDRSZ)/2 ) {
819                         tlog(TL_ALARM,"Huge size: %d bytes", size);
820                         return TBT_HUGE;
821                 }
822
823                 if (fnd==FOUND) {
824                         deleteKV(db, &(page->page), &ptr, 1);
825                         page->issynced=0;
826                 }
827
828                 if ( size <= page->page.freespace ) {
829                         u_int32_t       oldsize=page->page.freespace;
830                         packLeafKV(db, &(page->page), ptr, key, value);
831                         tassert( oldsize == page->page.freespace + size );
832                         page->issynced=0;
833                 } else {
834                         rc = splitPage(db, page, &newright, &ptr, size, key, value, 0);
835                         if ( rc != TBT_OK ) return rc;
836
837                         page->issynced = newright->issynced = 0;
838                         page->islocked = newright->islocked = 1;
839                 }
840         } else {
841                 if ( (rc=layerInsert(db, ptr->pointer.internal.link, left, right, key, value))!=TBT_OK ) {
842                         page->islocked=0;
843                         return rc;
844                 }
845
846                 if ( *left ) { /* child page was splited */
847                         u_int32_t size;
848                         TBTPointer *rightptr = (TBTPointer*)( (*left)->page.data + ((*left)->page.npointer-1) * db->pointersize );
849                         TBTValue K;
850
851                         if ( ISINFPOINTER(db, &(page->page), ptr) ) {
852                                 deleteKV(db, &(page->page), &ptr, 1);
853                                 K.length = 0; K.value = NULL;
854                                 packInternalKV(db, &(page->page), ptr, &K, (*right)->pagenumber);
855                         } else 
856                                 ptr->pointer.internal.link=(*right)->pagenumber;
857
858                         K.length = ( db->keylen ) ? db->keylen : rightptr->key.varlen.length;
859                         K.value  = ( db->keylen ) ? rightptr->key.fixed.key : (*left)->page.data + rightptr->key.varlen.offset;
860
861                         size = db->pointersize + ( ( db->keylen ) ? 0 : PTRALIGN(K.length) );
862
863                         if ( size <= page->page.freespace ) {
864                                 packInternalKV(db, &(page->page), ptr, &K, (*left)->pagenumber);
865                                 page->issynced=0;
866                         } else {
867                                 rc = splitPage(db, page, &newright, &ptr, size, &K, NULL, (*left)->pagenumber);
868                                 if ( rc != TBT_OK ) return rc;
869
870                                 page->issynced = newright->issynced = 0;
871                                 page->islocked = newright->islocked = 1;
872                         }
873                 }
874         }
875
876         if ( *left ) {
877                 if ( (rc=savePage(db, *left))!=TBT_OK )
878                         return rc;
879                 if ( (rc=savePage(db, *right))!=TBT_OK )
880                         return rc;
881                 *left=*right=NULL;
882         }
883
884         if ( newright ) {
885                 if ( page->pagenumber==TBTPAGEROOT ) { /* root was splited */
886                         TBTMemPage  *newleft;
887                         TBTValue K;
888                 
889
890                         if  ( (rc=TBTNewPage(db,&newleft))!=TBT_OK )
891                                 return rc;
892                         memcpy( &(newleft->page) , &(page->page), TBTREEPAGESIZE);
893                         memset( &(page->page), 0, TBTREEPAGESIZE);
894                         page->page.freespace=TBTREEPAGESIZE-TBTPAGEHDRSZ;
895         
896                         ptr = (TBTPointer*)( newleft->page.data + (newleft->page.npointer-1) * db->pointersize ); 
897                         K.length = ( db->keylen ) ? db->keylen : ptr->key.varlen.length;
898                         K.value  = ( db->keylen ) ? ptr->key.fixed.key : newleft->page.data + ptr->key.varlen.offset;
899                         packInternalKV(db, &(page->page), NULL, &K, newleft->pagenumber);
900                 
901                         ptr = (TBTPointer*)( newright->page.data + (newright->page.npointer-1) * db->pointersize ); 
902                         K.length = 0;
903                         K.value  = NULL;
904                         packInternalKV(db, &(page->page), NULL, &K, newright->pagenumber);
905
906                         if ( (rc=savePage(db, newright))!=TBT_OK )
907                                 return rc;
908                         if ( (rc=savePage(db, newleft))!=TBT_OK )
909                                 return rc;
910                         if ( (rc=savePage(db, page))!=TBT_OK )
911                                 return rc;
912                 } else {
913                         *left = page;
914                         *right = newright;
915                 }
916         } else if ( (rc=savePage(db, page))!=TBT_OK )
917                 return rc;
918         
919
920         return TBT_OK;
921
922
923 int
924 TBTInsert(TBTree *db, TBTValue *key, TBTValue *value) {
925         int rc;
926         TBTMemPage *lpage=NULL, *rpage=NULL;
927
928         rc = layerInsert(db, TBTPAGEROOT, &lpage, &rpage, key, value);
929
930         tassert( lpage==NULL && rpage==NULL );
931
932         return rc;
933 }
934
935 int
936 TBTDelete(TBTree *db, TBTValue *key) {
937         TBTMemPage      *page;
938         TBTPointer *ptr;
939         int rc;
940
941         if ( (rc=findPage(db, key, &page))!=TBT_OK )
942                 return rc; 
943                 
944         if ( page == NULL )
945                 return TBT_OK;
946
947         if ( findInPage(db, &(page->page), key, &ptr) != FOUND ) {
948                 if ( !(page)->iscached )
949                         tfree(page);
950                 return TBT_OK;
951         } 
952
953         deleteKV(db, &(page->page), &ptr, 1);
954         page->issynced=0;
955
956         rc=savePage(db, page);
957
958         return rc;      
959 }
960
961 int 
962 TBTInitIterator(TBTree *db, TBTIterator *iterator) {
963         TBTMemPage      *page=NULL;
964         TBTPointer *ptr;
965         int rc;
966         u_int32_t pagenum=TBTPAGEROOT;
967
968         memset(iterator, 0, sizeof(TBTIterator));
969
970         do {
971                 if ( page && !page->iscached )
972                         tfree(page);
973                 if ( (rc=TBTReadPage(db, pagenum, &page))!=TBT_OK )
974                         return rc;
975
976                 ptr = (TBTPointer*)( page->page.data );
977                 pagenum = ptr->pointer.internal.link;
978         } while( !page->page.isleaf );
979
980         page->islocked=1;
981         iterator->page = page;  
982         iterator->ptr = ptr;    
983
984         return TBT_OK;
985 }
986
987 void 
988 TBTFreeIterator(TBTree *db, TBTIterator *iterator) {
989         
990         if ( iterator->page ) {
991                 iterator->page->islocked=0;
992                 if ( !iterator->page->iscached )
993                         tfree(iterator->page);
994         }
995
996         memset(iterator, 0, sizeof(TBTIterator));
997 }
998
999 int 
1000 TBTIterate(TBTree *db, TBTIterator *iterator, TBTValue *key, TBTValue *value ) {
1001         int rc;
1002
1003         if (  iterator->page==NULL ) {
1004                 memset(key, 0, sizeof(TBTValue));
1005                 memset(value, 0, sizeof(TBTValue));
1006                 return TBT_OK;
1007         }
1008
1009         if ( (char*)(iterator->ptr) - iterator->page->page.data >= db->pointersize * iterator->page->page.npointer ) {
1010                 do {
1011                         u_int32_t pagenumber = iterator->page->page.rightlink;
1012                         iterator->page->islocked=0;
1013                         if ( !iterator->page->iscached )
1014                                 tfree(iterator->page);
1015                         iterator->page=NULL;
1016
1017                         if ( pagenumber==TBTPAGEROOT ) {
1018                                 memset(key, 0, sizeof(TBTValue));
1019                                 memset(value, 0, sizeof(TBTValue));
1020                                 return TBT_OK;
1021                         }
1022                         if ( (rc=TBTReadPage(db, pagenumber, &(iterator->page)))!=TBT_OK )
1023                                 return rc;
1024                 } while ( iterator->page->page.npointer == 0 );
1025
1026                 iterator->page->islocked=1;
1027                 iterator->ptr = (TBTPointer*)( iterator->page->page.data );
1028         }
1029
1030         if ( db->keylen ) {
1031                 key->length = db->keylen;
1032                 key->value = iterator->ptr->key.fixed.key;
1033         } else {
1034                 key->length = iterator->ptr->key.varlen.length; 
1035                 key->value = iterator->page->page.data + iterator->ptr->key.varlen.offset;
1036         }
1037
1038         value->length = iterator->ptr->pointer.leaf.length; 
1039         value->value = iterator->page->page.data + iterator->ptr->pointer.leaf.offset;
1040
1041         iterator->ptr = ( TBTPointer* ) (  (char*)(iterator->ptr) + db->pointersize );
1042
1043         return TBT_OK;
1044 }
1045
1046 int 
1047 TBTGetFirst(TBTree *db, TBTValue *key, TBTValue *value) {
1048         TBTIterator     iterator;
1049         int rc;
1050
1051         if ( (rc=TBTInitIterator(db, &iterator))!=TBT_OK )
1052                 return rc;
1053
1054         rc=TBTIterate(db, &iterator, key, value);
1055
1056         if ( key->value ) {
1057                 char * ptr;
1058
1059                 ptr = tmalloc( key->length );
1060                 memcpy( ptr, key->value, key->length );
1061                 key->value = ptr;
1062  
1063                 ptr = tmalloc( value->length );
1064                 memcpy( ptr, value->value, value->length );
1065                 value->value = ptr; 
1066         }
1067
1068         TBTFreeIterator(db, &iterator);
1069
1070         return rc;
1071 }
1072
1073 static int
1074 findLast(TBTree *db, TBTValue *key, TBTValue *value, u_int32_t pagenumber) {
1075         TBTMemPage      *page;
1076         TBTPointer      *ptr;
1077         int rc=TBT_OK,i;
1078
1079         if ( (rc=TBTReadPage(db, pagenumber, &page)) != TBT_OK )
1080                 return rc;
1081
1082         if ( page->page.npointer==0 ) {
1083                 if ( !page->iscached )
1084                         tfree(page);
1085                 return TBT_OK;
1086         }
1087
1088         page->islocked=1;
1089         if ( page->page.isleaf ) {
1090                 ptr = (TBTPointer*)(page->page.data + db->pointersize * (page->page.npointer-1));
1091
1092                 key->length = ( db->keylen ) ? db->keylen : ptr->key.varlen.length;
1093                 key->value = tmalloc( key->length );
1094                 memcpy( key->value, ( db->keylen ) ? ptr->key.fixed.key : page->page.data + ptr->key.varlen.offset, key->length ); 
1095
1096                 value->length = ptr->pointer.leaf.length;
1097                 value->value = tmalloc( value->length );
1098                 memcpy( value->value, page->page.data + ptr->pointer.leaf.offset, value->length ); 
1099         } else {
1100                 for(i=page->page.npointer-1; i>=0; i--) {
1101                         ptr = (TBTPointer*)( page->page.data + db->pointersize * i );
1102                         rc = findLast(db, key, value, ptr->pointer.internal.link);
1103                         if ( rc!=TBT_OK || key->value )
1104                                 break;
1105                 } 
1106         }
1107
1108         page->islocked=0;
1109         if ( !page->iscached )
1110                 tfree(page);
1111
1112         return rc;
1113 }
1114
1115 int 
1116 TBTGetLast(TBTree *db, TBTValue *key, TBTValue *value) {
1117         memset(key, 0, sizeof(TBTValue));
1118         memset(value, 0, sizeof(TBTValue));
1119         return  findLast(db, key, value, TBTPAGEROOT);
1120 }
1121