a7b46182491458439a9d18a3b1bf71e0e8f0fd6f
[openafs.git] / src / ubik / disk.c
1 /*
2  * Copyright 2000, International Business Machines Corporation and others.
3  * All Rights Reserved.
4  * 
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 #include <afsconfig.h>
11 #include <afs/param.h>
12
13 RCSID("$Header$");
14
15 #include <sys/types.h>
16 #ifdef AFS_NT40_ENV
17 #include <winsock2.h>
18 #else
19 #include <sys/file.h>
20 #include <netinet/in.h>
21 #endif
22 #include <errno.h>
23 #ifdef HAVE_STRING_H
24 #include <string.h>
25 #else
26 #ifdef HAVE_STRINGS_H
27 #include <strings.h>
28 #endif
29 #endif
30 #include <lock.h>
31 #include <rx/xdr.h>
32
33
34
35 #define UBIK_INTERNALS
36 #include "ubik.h"
37 #include "ubik_int.h"
38
39 #define PHSIZE  128
40 struct buffer  {
41     struct ubik_dbase *dbase;       /* dbase within which the buffer resides */
42     afs_int32 file;                         /* Unique cache key */
43     afs_int32 page;                         /* page number */
44     struct buffer *lru_next;
45     struct buffer *lru_prev;
46     struct buffer *hashNext;        /* next dude in hash table */
47     char *data;                     /* ptr to the data */
48     char lockers;                   /* usage ref count */
49     char dirty;                     /* is buffer modified */
50     char hashIndex;                 /* back ptr to hash table */
51 } *Buffers;
52
53 #define pHash(page) ((page) & (PHSIZE-1))
54
55 afs_int32 ubik_nBuffers = NBUFFERS;
56 static struct buffer *phTable[PHSIZE];  /* page hash table */
57 static struct buffer *LruBuffer;
58 static int nbuffers;
59 static int calls=0, ios=0, lastb=0;
60 static char *BufferData;
61 static struct buffer *newslot();
62 static initd = 0;
63 #define BADFID      0xffffffff
64
65 static DTrunc();
66
67 static struct ubik_trunc *freeTruncList=0;
68
69 /* remove a transaction from the database's active transaction list.  Don't free it */
70 static unthread(atrans)
71     struct ubik_trans *atrans; {
72     struct ubik_trans **lt, *tt;
73     lt = &atrans->dbase->activeTrans;
74     for(tt = *lt; tt; lt = &tt->next, tt = *lt) {
75         if (tt == atrans) {
76             /* found it */
77             *lt = tt->next;
78             return 0;
79         }
80     }
81     return 2;   /* no entry */
82 }
83
84 /* some debugging assistance */
85 udisk_Debug(aparm)
86     struct ubik_debug *aparm; {
87     struct buffer *tb;
88     int i;
89
90     memcpy(&aparm->localVersion, &ubik_dbase->version, sizeof(struct ubik_version));
91     aparm->lockedPages = 0;
92     aparm->writeLockedPages = 0;
93     tb = Buffers;
94     for(i=0;i<nbuffers;i++, tb++) {
95         if (tb->lockers) {
96             aparm->lockedPages++;
97             if (tb->dirty) aparm->writeLockedPages++;
98         }
99     }
100 }
101
102 /* log format is defined here, and implicitly in recovery.c
103  *
104  * 4 byte opcode, followed by parameters, each 4 bytes long.  All integers
105  * are in logged in network standard byte order, in case we want to move logs
106  * from machine-to-machine someday.
107  *
108  * Begin transaction: opcode
109  * Commit transaction: opcode, version (8 bytes)
110  * Truncate file: opcode, file number, length
111  * Abort transaction: opcode
112  * Write data: opcode, file, position, length, <length> data bytes
113  */
114
115 /* write an opcode to the log */
116 udisk_LogOpcode(adbase, aopcode, async)
117     struct ubik_dbase *adbase;
118     afs_int32 aopcode;
119     int async; {
120     struct ubik_stat ustat;
121     afs_int32 code;
122     
123     /* figure out where to write */
124     code = (*adbase->stat)(adbase, LOGFILE, &ustat);
125     if (code < 0) return code;
126
127     /* setup data and do write */
128     aopcode = htonl(aopcode);
129     code = (*adbase->write)(adbase, LOGFILE, &aopcode, ustat.size, sizeof(afs_int32));
130     if (code != sizeof(afs_int32)) return UIOERROR;
131
132     /* optionally sync data */
133     if (async) code = (*adbase->sync)(adbase, LOGFILE);
134     else code = 0;
135     return code;
136 }
137
138 /* log a commit, never syncing */
139 udisk_LogEnd(adbase, aversion)
140     struct ubik_dbase *adbase;
141     struct ubik_version *aversion; {
142     afs_int32 code;
143     afs_int32 data[3];
144     struct ubik_stat ustat;
145     
146     /* figure out where to write */
147     code = (*adbase->stat)(adbase, LOGFILE, &ustat);
148     if (code) return code;
149
150     /* setup data */
151     data[0] = htonl(LOGEND);
152     data[1] = htonl(aversion->epoch);
153     data[2] = htonl(aversion->counter);
154     
155     /* do write */
156     code = (*adbase->write)(adbase, LOGFILE, data, ustat.size, 3*sizeof(afs_int32));
157     if (code != 3*sizeof(afs_int32)) return UIOERROR;
158
159     /* finally sync the log */
160     code = (*adbase->sync)(adbase, LOGFILE);
161     return code;
162 }
163     
164 /* log a truncate operation, never syncing */
165 udisk_LogTruncate(adbase, afile, alength)
166     struct ubik_dbase *adbase;
167     afs_int32 afile, alength; {
168     afs_int32 code;
169     afs_int32 data[3];
170     struct ubik_stat ustat;
171     
172     /* figure out where to write */
173     code = (*adbase->stat)(adbase, LOGFILE, &ustat);
174     if (code < 0) return code;
175
176     /* setup data */
177     data[0] = htonl(LOGTRUNCATE);
178     data[1] = htonl(afile);
179     data[2] = htonl(alength);
180     
181     /* do write */
182     code = (*adbase->write)(adbase, LOGFILE, data, ustat.size, 3*sizeof(afs_int32));
183     if (code != 3*sizeof(afs_int32)) return UIOERROR;
184     return 0;
185 }
186     
187 /* write some data to the log, never syncing */
188 udisk_LogWriteData(adbase, afile, abuffer, apos, alen)
189     struct ubik_dbase *adbase;
190     char *abuffer;
191     afs_int32 afile;
192     afs_int32 apos;
193     afs_int32 alen; {
194     struct ubik_stat ustat;
195     afs_int32 code;
196     afs_int32 data[4];
197     afs_int32 lpos;
198     
199     /* find end of log */
200     code = (*adbase->stat)(adbase, LOGFILE, &ustat);
201     lpos = ustat.size;
202     if (code < 0) return code;
203
204     /* setup header */
205     data[0] = htonl(LOGDATA);
206     data[1] = htonl(afile);
207     data[2] = htonl(apos);
208     data[3] = htonl(alen);
209
210     /* write header */
211     code = (*adbase->write)(adbase, LOGFILE, data, lpos, 4*sizeof(afs_int32));
212     if (code != 4*sizeof(afs_int32)) return UIOERROR;
213     lpos += 4*sizeof(afs_int32);
214     
215     /* write data */
216     code = (*adbase->write)(adbase, LOGFILE, abuffer, lpos, alen);
217     if (code != alen) return UIOERROR;
218     return 0;
219 }
220
221 static int DInit (abuffers)
222     int abuffers; {
223     /* Initialize the venus buffer system. */
224     int i;
225     struct buffer *tb;
226     Buffers = (struct buffer *) malloc(abuffers * sizeof(struct buffer));
227     memset(Buffers, 0, abuffers * sizeof(struct buffer));
228     BufferData = (char *) malloc(abuffers * UBIK_PAGESIZE);
229     nbuffers = abuffers;
230     for(i=0;i<PHSIZE;i++) phTable[i] = 0;
231     for (i=0;i<abuffers;i++) {
232         /* Fill in each buffer with an empty indication. */
233         tb = &Buffers[i];
234         tb->lru_next = &(Buffers[i+1]);
235         tb->lru_prev = &(Buffers[i-1]);
236         tb->data = &BufferData[UBIK_PAGESIZE*i];
237         tb->file = BADFID;
238     }
239     Buffers[0].lru_prev = &(Buffers[abuffers-1]);
240     Buffers[abuffers-1].lru_next = &(Buffers[0]);
241     LruBuffer  = &(Buffers[0]);
242     return 0;
243 }
244
245 /* Take a buffer and mark it as the least recently used buffer */
246 static void Dlru(abuf)
247   struct buffer *abuf;
248 {
249   if (LruBuffer == abuf)
250      return;
251
252   /* Unthread from where it is in the list */
253   abuf->lru_next->lru_prev = abuf->lru_prev;
254   abuf->lru_prev->lru_next = abuf->lru_next;
255
256   /* Thread onto beginning of LRU list */
257   abuf->lru_next = LruBuffer;
258   abuf->lru_prev = LruBuffer->lru_prev;
259
260   LruBuffer->lru_prev->lru_next = abuf;
261   LruBuffer->lru_prev = abuf;
262   LruBuffer = abuf;
263 }
264
265 /* Take a buffer and mark it as the most recently used buffer */
266 static void Dmru(abuf)
267      struct buffer *abuf;
268 {
269   if (LruBuffer == abuf) {
270      LruBuffer = LruBuffer->lru_next;
271      return;
272   }
273
274   /* Unthread from where it is in the list */
275   abuf->lru_next->lru_prev = abuf->lru_prev;
276   abuf->lru_prev->lru_next = abuf->lru_next;
277
278   /* Thread onto end of LRU list - making it the MRU buffer */
279   abuf->lru_next = LruBuffer;
280   abuf->lru_prev = LruBuffer->lru_prev;
281   LruBuffer->lru_prev->lru_next = abuf;
282   LruBuffer->lru_prev = abuf;
283 }
284
285 /* get a pointer to a particular buffer */
286 static char *DRead(dbase, fid, page)
287     struct ubik_dbase *dbase;
288     afs_int32 fid;
289     int page; {
290     /* Read a page from the disk. */
291     struct buffer *tb, *lastbuffer;
292     afs_int32 code;
293
294     calls++;
295     lastbuffer = LruBuffer->lru_prev;
296
297     if ((lastbuffer->page  == page ) && 
298         (lastbuffer->file  == fid  ) && 
299         (lastbuffer->dbase == dbase)) {
300         tb = lastbuffer;
301         tb->lockers++;
302         lastb++;
303         return tb->data;
304     }
305     for(tb=phTable[pHash(page)]; tb; tb=tb->hashNext) {
306         if (tb->page == page && tb->file == fid && tb->dbase == dbase) {
307             Dmru(tb);
308             tb->lockers++;
309             return tb->data;
310         }
311     }
312     /* can't find it */
313     tb = newslot(dbase, fid, page);
314     if (!tb) return 0;
315     memset(tb->data, 0, UBIK_PAGESIZE);
316
317     tb->lockers++;
318     code = (*dbase->read)(dbase, fid, tb->data, page*UBIK_PAGESIZE, UBIK_PAGESIZE);
319     if (code < 0) {
320        tb->file = BADFID;
321        Dlru(tb);
322        tb->lockers--;
323        ubik_print("Ubik: Error reading database file: errno=%d\n", errno);
324        return 0;
325     }
326     ios++;
327
328     /* Note that findslot sets the page field in the buffer equal to
329      * what it is searching for.
330      */
331     return tb->data;
332 }
333
334 /* zap truncated pages */
335 static DTrunc(dbase, fid, length)
336     struct ubik_dbase *dbase;
337     afs_int32 fid;
338     afs_int32 length; {
339     afs_int32 maxPage;
340     struct buffer *tb;
341     int   i;
342     
343     maxPage = (length+UBIK_PAGESIZE-1)>>UBIK_LOGPAGESIZE;       /* first invalid page now in file */
344     for (i=0,tb=Buffers; i<nbuffers; i++,tb++) {
345         if (tb->page >= maxPage && tb->file == fid && tb->dbase == dbase) {
346             tb->file = BADFID;
347             Dlru(tb);
348         }
349     }
350     return 0;
351 }
352
353 /* allocate a truncation entry.  We allocate special entries representing truncations, rather than
354     performing them immediately, so that we can abort a transaction easily by simply purging
355     the in-core memory buffers and discarding these truncation entries.
356 */
357 static struct ubik_trunc *GetTrunc() {
358     struct ubik_trunc *tt;
359     if (!freeTruncList) {
360         freeTruncList = (struct ubik_trunc *) malloc(sizeof(struct ubik_trunc));
361         freeTruncList->next = (struct ubik_trunc *) 0;
362     }
363     tt = freeTruncList;
364     freeTruncList = tt->next;
365     return tt;
366 }
367
368 /* free a truncation entry */
369 static PutTrunc(at)
370     struct ubik_trunc *at; {
371     at->next = freeTruncList;
372     freeTruncList = at;
373     return 0;
374 }
375
376 /* find a truncation entry for a file, if any */
377 static struct ubik_trunc *FindTrunc(atrans, afile)
378     struct ubik_trans *atrans;
379     afs_int32 afile; {
380     struct ubik_trunc *tt;
381     for(tt=atrans->activeTruncs; tt; tt=tt->next) {
382         if (tt->file == afile) return tt;
383     }
384     return (struct ubik_trunc *) 0;
385 }
386
387 /* do truncates associated with trans, and free them */
388 static DoTruncs(atrans)
389     struct ubik_trans *atrans; {
390     struct ubik_trunc *tt, *nt;
391     int (*tproc)();
392     afs_int32 rcode=0, code;
393
394     tproc = atrans->dbase->truncate;
395     for(tt = atrans->activeTruncs; tt; tt=nt) {
396         nt = tt->next;
397         DTrunc(atrans->dbase, tt->file, tt->length);    /* zap pages from buffer cache */
398         code = (*tproc)(atrans->dbase, tt->file, tt->length);
399         if (code) rcode = code;
400         PutTrunc(tt);
401     }
402     /* don't unthread, because we do the entire list's worth here */
403     atrans->activeTruncs = (struct ubik_trunc *) 0;
404     return(rcode);
405 }
406
407 /* mark a fid as invalid */
408 udisk_Invalidate(adbase, afid)
409 struct ubik_dbase *adbase;
410 afs_int32 afid; {
411     struct buffer *tb;
412     int    i;
413
414     for (i=0,tb=Buffers; i<nbuffers; i++,tb++) {
415         if (tb->file == afid) {
416             tb->file = BADFID;
417             Dlru(tb);
418         }
419     }
420     return 0;
421 }
422
423 /* move this page into the correct hash bucket */
424 static FixupBucket(ap)
425     struct buffer *ap; {
426     struct buffer **lp, *tp;
427     int i;
428     /* first try to get it out of its current hash bucket, in which it might not be */
429     i = ap->hashIndex;
430     lp = &phTable[i];
431     for(tp = *lp; tp; tp=tp->hashNext) {
432         if (tp == ap) {
433             *lp = tp->hashNext;
434             break;
435         }
436         lp = &tp->hashNext;
437     }
438     /* now figure the new hash bucket */
439     i = pHash(ap->page);
440     ap->hashIndex = i;          /* remember where we are for deletion */
441     ap->hashNext = phTable[i];  /* add us to the list */
442     phTable[i] = ap;
443 }
444
445 /* create a new slot for a particular dbase page */
446 static struct buffer *newslot (adbase, afid, apage)
447     struct ubik_dbase *adbase;
448     afs_int32 afid, apage; {
449     /* Find a usable buffer slot */
450     afs_int32 i;
451     struct buffer *pp, *tp;
452
453     pp = 0;             /* last pure */
454     for (i=0,tp=LruBuffer; i<nbuffers; i++,tp=tp->lru_next) {
455        if (!tp->lockers && !tp->dirty) {
456           pp = tp;
457           break;
458        }
459     }
460
461     if (pp == 0) {
462         /* There are no unlocked buffers that don't need to be written to the disk. */
463         ubik_print("Ubik: Internal Error: Unable to find free buffer in ubik cache\n");
464         return NULL;
465     }
466
467     /* Now fill in the header. */
468     pp->dbase = adbase;
469     pp->file = afid;
470     pp->page = apage;
471
472     FixupBucket(pp);            /* move to the right hash bucket */
473     Dmru(pp);
474     return pp;
475 }
476
477 /* Release a buffer, specifying whether or not the buffer has been modified by the locker. */
478 static void DRelease (ap,flag)
479     char *ap;
480     int flag; {
481     int index;
482     struct buffer *bp;
483
484     if (!ap) return;
485     index = (ap - (char *)BufferData) >> UBIK_LOGPAGESIZE;
486     bp = &(Buffers[index]);
487     bp->lockers--;
488     if (flag) bp->dirty=1;
489     return;
490 }
491
492 /* flush all modified buffers, leaves dirty bits set (they're cleared
493  * by DSync).  Note interaction with DSync: you call this thing first,
494  * writing the buffers to the disk.  Then you call DSync to sync all the
495  * files that were written, and to clear the dirty bits.  You should
496  * always call DFlush/DSync as a pair.
497  */
498 static DFlush (adbase)
499     struct ubik_dbase *adbase; {
500     int i;
501     afs_int32 code;
502     struct buffer *tb;
503
504     tb = Buffers;
505     for(i=0;i<nbuffers;i++,tb++) {
506         if (tb->dirty) {
507             code = tb->page * UBIK_PAGESIZE;    /* offset within file */
508             code = (*adbase->write)(adbase, tb->file, tb->data, code, UBIK_PAGESIZE);
509             if (code != UBIK_PAGESIZE) return UIOERROR;
510         }
511     }
512     return 0;
513 }
514
515 /* flush all modified buffers */
516 static DAbort (adbase)
517     struct ubik_dbase *adbase; {
518     int i;
519     struct buffer *tb;
520
521     tb = Buffers;
522     for(i=0;i<nbuffers;i++,tb++) {
523         if (tb->dirty) {
524             tb->dirty = 0;
525             tb->file = BADFID;
526             Dlru(tb);
527         }
528     }
529     return 0;
530 }
531
532 /* must only be called after DFlush, due to its interpretation of dirty flag */
533 static DSync(adbase)
534     struct ubik_dbase *adbase; {
535     int i;
536     afs_int32 code;
537     struct buffer *tb;
538     afs_int32 file;
539     afs_int32 rCode;
540
541     rCode = 0;
542     while (1) {
543         file = BADFID;
544         for(i=0,tb = Buffers; i<nbuffers; i++,tb++) {
545             if (tb->dirty == 1) {
546                 if (file == BADFID) file = tb->file;
547                 if (file != BADFID && tb->file == file) tb->dirty = 0;
548             }
549         }
550         if (file == BADFID) break;
551         /* otherwise we have a file to sync */
552         code = (*adbase->sync)(adbase, file);
553         if (code) rCode = code;
554     }
555     return rCode;
556 }
557
558 /* Same as read, only do not even try to read the page */
559 static char *DNew (dbase, fid, page)
560     struct ubik_dbase *dbase;
561     int page;
562     afs_int32 fid; {
563     struct buffer *tb;
564
565     if ((tb = newslot(dbase, fid, page)) == 0) return NULL;
566     tb->lockers++;
567     memset(tb->data, 0, UBIK_PAGESIZE);
568     return tb->data;
569 }
570
571 /* read data from database */
572 udisk_read(atrans, afile, abuffer, apos, alen)
573     afs_int32 afile;
574     char *abuffer;
575     afs_int32 apos, alen;
576     struct ubik_trans *atrans; {
577     char *bp;
578     afs_int32 offset, len, totalLen;
579     struct ubik_dbase *dbase;
580
581     if (atrans->flags & TRDONE) return UDONE;
582     totalLen = 0;
583     dbase = atrans->dbase;
584     while (alen > 0) {
585         bp = DRead(dbase, afile, apos>>UBIK_LOGPAGESIZE);
586         if (!bp) return UEOF;
587         /* otherwise, min of remaining bytes and end of buffer to user mode */
588         offset = apos & (UBIK_PAGESIZE-1);
589         len = UBIK_PAGESIZE - offset;
590         if (len > alen) len = alen;
591         memcpy(abuffer, bp+offset, len);
592         abuffer += len;
593         apos += len;
594         alen -= len;
595         totalLen += len;
596         DRelease(bp, 0);
597     }
598     return 0;
599 }
600
601 /* truncate file */
602 udisk_truncate(atrans, afile, alength)
603     struct ubik_trans *atrans;
604     afs_int32 afile;
605     afs_int32 alength; {
606     afs_int32 code;
607     struct ubik_trunc *tt;
608
609     if (atrans->flags & TRDONE) return UDONE;
610     if (atrans->type != UBIK_WRITETRANS) return UBADTYPE;
611
612     /* write a truncate log record */
613     code = udisk_LogTruncate(atrans->dbase, afile, alength);
614
615     /* don't truncate until commit time */
616     tt = FindTrunc(atrans, afile);
617     if (!tt) {
618         /* this file not truncated yet */
619         tt=GetTrunc();
620         tt->next = atrans->activeTruncs;
621         atrans->activeTruncs = tt;
622         tt->file = afile;
623         tt->length = alength;
624     }
625     else {
626         /* already truncated to a certain length */
627         if (tt->length > alength) tt->length = alength;
628     }
629     return code;
630 }
631
632 /* write data to database, using logs */
633 udisk_write(atrans, afile, abuffer, apos, alen)
634     afs_int32 afile;
635     char *abuffer;
636     afs_int32 apos, alen;
637     struct ubik_trans *atrans; {
638     char *bp;
639     afs_int32 offset, len, totalLen;
640     struct ubik_dbase *dbase;
641     struct ubik_trunc *tt;
642     afs_int32 code;
643
644     if (atrans->flags & TRDONE) return UDONE;
645     if (atrans->type != UBIK_WRITETRANS) return UBADTYPE;
646
647     dbase = atrans->dbase;
648     /* first write the data to the log */
649     code = udisk_LogWriteData(dbase, afile, abuffer, apos, alen);
650     if (code) return code;
651
652     /* expand any truncations of this file */
653     tt = FindTrunc(atrans, afile);
654     if (tt) {
655         if (tt->length < apos + alen) {
656             tt->length = apos + alen;
657         }
658     }
659
660     /* now update vm */
661     totalLen = 0;
662     while (alen > 0) {
663         bp = DRead(dbase, afile, apos>>UBIK_LOGPAGESIZE);
664         if (!bp) {
665             bp = DNew(dbase, afile, apos>>UBIK_LOGPAGESIZE);
666             if (!bp) return UIOERROR;
667             memset(bp, 0, UBIK_PAGESIZE);
668         }
669         /* otherwise, min of remaining bytes and end of buffer to user mode */
670         offset = apos & (UBIK_PAGESIZE-1);
671         len = UBIK_PAGESIZE-offset;
672         if (len > alen) len = alen;
673         memcpy(bp+offset, abuffer, len);
674         abuffer += len;
675         apos += len;
676         alen -= len;
677         totalLen += len;
678         DRelease(bp, 1);    /* buffer modified */
679     }
680     return 0;
681 }
682
683 /* begin a new local transaction */
684 udisk_begin(adbase, atype, atrans)
685     struct ubik_trans **atrans;
686     int atype;
687     struct ubik_dbase *adbase; {
688     afs_int32 code;
689     struct ubik_trans *tt;
690
691     *atrans = (struct ubik_trans *)NULL;
692     /* Make sure system is initialized before doing anything */
693     if (!initd) {
694         initd = 1;
695         DInit(ubik_nBuffers);
696     }
697     if (atype == UBIK_WRITETRANS) {
698         if (adbase->flags & DBWRITING) return USYNC;
699         code = udisk_LogOpcode(adbase, LOGNEW, 0);
700         if (code) return code;
701     }
702     tt = (struct ubik_trans *) malloc(sizeof(struct ubik_trans));
703     memset(tt, 0, sizeof(struct ubik_trans));
704     tt->dbase = adbase;
705     tt->next = adbase->activeTrans;
706     adbase->activeTrans = tt;
707     tt->type = atype;
708     if (atype == UBIK_READTRANS) adbase->readers++;
709     else if (atype == UBIK_WRITETRANS) adbase->flags |= DBWRITING;
710     *atrans = tt;
711     return 0;
712 }
713
714 /* commit transaction */
715 udisk_commit(atrans)
716     struct ubik_trans *atrans; {
717     struct ubik_dbase *dbase;
718     afs_int32 code=0;
719     struct ubik_version oldversion, newversion;
720
721     if (atrans->flags & TRDONE)
722        return(UTWOENDS);
723
724     if (atrans->type == UBIK_WRITETRANS) {
725         dbase = atrans->dbase;
726
727         /* On the first write to the database. We update the versions */
728         if (ubeacon_AmSyncSite() && !(urecovery_state & UBIK_RECLABELDB)) {
729            oldversion = dbase->version;
730            newversion.epoch   = FT_ApproxTime();;
731            newversion.counter = 1;
732           
733            code = (*dbase->setlabel)(dbase, 0, &newversion);
734            if (code) return(code);
735            ubik_epochTime = newversion.epoch;
736            dbase->version = newversion;
737
738            /* Ignore the error here. If the call fails, the site is
739             * marked down and when we detect it is up again, we will 
740             * send the entire database to it.
741             */
742            ContactQuorum(DISK_SetVersion, atrans, 1/*CStampVersion*/,
743                          &oldversion, &newversion);
744            urecovery_state |= UBIK_RECLABELDB;
745         }
746
747         dbase->version.counter++;       /* bump commit count */
748         LWP_NoYieldSignal(&dbase->version);
749
750         code = udisk_LogEnd(dbase, &dbase->version);
751         if (code) {
752            dbase->version.counter--;
753            return(code);
754         }
755
756         /* If we fail anytime after this, then panic and let the
757          * recovery replay the log. 
758          */
759         code = DFlush(dbase);   /* write dirty pages to respective files */
760         if (code) panic("Writing Ubik DB modifications\n");
761         code = DSync(dbase);    /* sync the files and mark pages not dirty */
762         if (code) panic("Synchronizing Ubik DB modifications\n");
763
764         code = DoTruncs(atrans); /* Perform requested truncations */
765         if (code) panic("Truncating Ubik DB\n");
766
767         /* label the committed dbase */
768         code = (*dbase->setlabel)(dbase, 0, &dbase->version);
769         if (code) panic("Truncating Ubik DB\n");
770
771         code = (*dbase->truncate)(dbase, LOGFILE, 0);   /* discard log (optional) */
772         if (code) panic("Truncating Ubik logfile\n");
773
774     }
775
776     /* When the transaction is marked done, it also means the logfile
777      * has been truncated.
778      */
779     atrans->flags |= TRDONE;
780     return code;
781 }
782
783 /* abort transaction */
784 udisk_abort(atrans)
785     struct ubik_trans *atrans;
786 {
787     struct ubik_dbase *dbase;
788     afs_int32 code;
789     
790     if (atrans->flags & TRDONE)
791        return UTWOENDS;
792
793     /* Check if we are the write trans before logging abort, lest we
794      * abort a good write trans in progress. 
795      * We don't really care if the LOGABORT gets to the log because we 
796      * truncate the log next. If the truncate fails, we panic; for 
797      * otherwise, the log entries remain. On restart, replay of the log
798      * will do nothing because the abort is there or no LogEnd opcode.
799      */
800     dbase = atrans->dbase;
801     if (atrans->type == UBIK_WRITETRANS && dbase->flags & DBWRITING) {
802         udisk_LogOpcode(dbase, LOGABORT, 1);
803         code = (*dbase->truncate)(dbase, LOGFILE, 0);
804         if (code) panic("Truncating Ubik logfile during an abort\n");
805         DAbort(dbase);         /* remove all dirty pages */
806     }
807
808     /* When the transaction is marked done, it also means the logfile
809      * has been truncated.
810      */
811     atrans->flags |= (TRABORT | TRDONE);
812     return 0;
813 }
814
815 /* destroy a transaction after it has been committed or aborted.  if
816  * it hasn't committed before you call this routine, we'll abort the
817  * transaction for you.
818  */
819 udisk_end(atrans)
820     struct ubik_trans *atrans; {
821     struct ubik_dbase *dbase;
822
823 #if defined(UBIK_PAUSE)
824        /* Another thread is trying to lock this transaction.
825         * That can only be an RPC doing SDISK_Lock.
826         * Unlock the transaction, 'cause otherwise the other
827         * thread will never wake up.  Don't free it because
828         * the caller will do that already.
829         */
830     if (atrans->flags & TRSETLOCK) {
831         atrans->flags |= TRSTALE;
832         ulock_relLock(atrans);
833         return;
834     }
835 #endif /* UBIK_PAUSE */
836     if (!(atrans->flags & TRDONE)) udisk_abort(atrans);
837     dbase = atrans->dbase;
838
839     ulock_relLock(atrans);
840     unthread(atrans);
841
842     /* check if we are the write trans before unsetting the DBWRITING bit, else
843      * we could be unsetting someone else's bit.
844      */
845     if (atrans->type == UBIK_WRITETRANS && dbase->flags & DBWRITING) {
846        dbase->flags &= ~DBWRITING;
847     } else {
848        dbase->readers--;
849     }
850     if (atrans->iovec_info.iovec_wrt_val) free(atrans->iovec_info.iovec_wrt_val);
851     if (atrans->iovec_data.iovec_buf_val) free(atrans->iovec_data.iovec_buf_val);
852     free(atrans);
853
854     /* Wakeup any writers waiting in BeginTrans() */
855     LWP_NoYieldSignal(&dbase->flags);
856     return 0;
857 }