add-missing-return-values-20031207
[openafs.git] / src / ubik / disk.c
1 /*
2  * Copyright 2000, International Business Machines Corporation and others.
3  * All Rights Reserved.
4  * 
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 #include <afsconfig.h>
11 #include <afs/param.h>
12
13 RCSID
14     ("$Header$");
15
16 #include <sys/types.h>
17 #ifdef AFS_NT40_ENV
18 #include <winsock2.h>
19 #else
20 #include <sys/file.h>
21 #include <netinet/in.h>
22 #endif
23 #include <errno.h>
24 #ifdef HAVE_STRING_H
25 #include <string.h>
26 #else
27 #ifdef HAVE_STRINGS_H
28 #include <strings.h>
29 #endif
30 #endif
31 #include <lock.h>
32 #include <rx/xdr.h>
33
34
35
36 #define UBIK_INTERNALS
37 #include "ubik.h"
38 #include "ubik_int.h"
39
40 #define PHSIZE  128
41 static struct buffer {
42     struct ubik_dbase *dbase;   /* dbase within which the buffer resides */
43     afs_int32 file;             /* Unique cache key */
44     afs_int32 page;             /* page number */
45     struct buffer *lru_next;
46     struct buffer *lru_prev;
47     struct buffer *hashNext;    /* next dude in hash table */
48     char *data;                 /* ptr to the data */
49     char lockers;               /* usage ref count */
50     char dirty;                 /* is buffer modified */
51     char hashIndex;             /* back ptr to hash table */
52 } *Buffers;
53
54 #define pHash(page) ((page) & (PHSIZE-1))
55
56 afs_int32 ubik_nBuffers = NBUFFERS;
57 static struct buffer *phTable[PHSIZE];  /* page hash table */
58 static struct buffer *LruBuffer;
59 static int nbuffers;
60 static int calls = 0, ios = 0, lastb = 0;
61 static char *BufferData;
62 static struct buffer *newslot(struct ubik_dbase *adbase, afs_int32 afid,
63                               afs_int32 apage);
64 static initd = 0;
65 #define BADFID      0xffffffff
66
67 static DTrunc(struct ubik_dbase *dbase, afs_int32 fid, afs_int32 length);
68
69 static struct ubik_trunc *freeTruncList = 0;
70
71 /* remove a transaction from the database's active transaction list.  Don't free it */
72 static int
73 unthread(struct ubik_trans *atrans)
74 {
75     struct ubik_trans **lt, *tt;
76     lt = &atrans->dbase->activeTrans;
77     for (tt = *lt; tt; lt = &tt->next, tt = *lt) {
78         if (tt == atrans) {
79             /* found it */
80             *lt = tt->next;
81             return 0;
82         }
83     }
84     return 2;                   /* no entry */
85 }
86
87 /* some debugging assistance */
88 int
89 udisk_Debug(struct ubik_debug *aparm)
90 {
91     struct buffer *tb;
92     int i;
93
94     memcpy(&aparm->localVersion, &ubik_dbase->version,
95            sizeof(struct ubik_version));
96     aparm->lockedPages = 0;
97     aparm->writeLockedPages = 0;
98     tb = Buffers;
99     for (i = 0; i < nbuffers; i++, tb++) {
100         if (tb->lockers) {
101             aparm->lockedPages++;
102             if (tb->dirty)
103                 aparm->writeLockedPages++;
104         }
105     }
106     return 0;
107 }
108
109 /* log format is defined here, and implicitly in recovery.c
110  *
111  * 4 byte opcode, followed by parameters, each 4 bytes long.  All integers
112  * are in logged in network standard byte order, in case we want to move logs
113  * from machine-to-machine someday.
114  *
115  * Begin transaction: opcode
116  * Commit transaction: opcode, version (8 bytes)
117  * Truncate file: opcode, file number, length
118  * Abort transaction: opcode
119  * Write data: opcode, file, position, length, <length> data bytes
120  */
121
122 /* write an opcode to the log */
123 int
124 udisk_LogOpcode(struct ubik_dbase *adbase, afs_int32 aopcode, int async)
125 {
126     struct ubik_stat ustat;
127     afs_int32 code;
128
129     /* figure out where to write */
130     code = (*adbase->stat) (adbase, LOGFILE, &ustat);
131     if (code < 0)
132         return code;
133
134     /* setup data and do write */
135     aopcode = htonl(aopcode);
136     code =
137         (*adbase->write) (adbase, LOGFILE, &aopcode, ustat.size,
138                           sizeof(afs_int32));
139     if (code != sizeof(afs_int32))
140         return UIOERROR;
141
142     /* optionally sync data */
143     if (async)
144         code = (*adbase->sync) (adbase, LOGFILE);
145     else
146         code = 0;
147     return code;
148 }
149
150 /* log a commit, never syncing */
151 int
152 udisk_LogEnd(struct ubik_dbase *adbase, struct ubik_version *aversion)
153 {
154     afs_int32 code;
155     afs_int32 data[3];
156     struct ubik_stat ustat;
157
158     /* figure out where to write */
159     code = (*adbase->stat) (adbase, LOGFILE, &ustat);
160     if (code)
161         return code;
162
163     /* setup data */
164     data[0] = htonl(LOGEND);
165     data[1] = htonl(aversion->epoch);
166     data[2] = htonl(aversion->counter);
167
168     /* do write */
169     code =
170         (*adbase->write) (adbase, LOGFILE, data, ustat.size,
171                           3 * sizeof(afs_int32));
172     if (code != 3 * sizeof(afs_int32))
173         return UIOERROR;
174
175     /* finally sync the log */
176     code = (*adbase->sync) (adbase, LOGFILE);
177     return code;
178 }
179
180 /* log a truncate operation, never syncing */
181 int
182 udisk_LogTruncate(struct ubik_dbase *adbase, afs_int32 afile,
183                   afs_int32 alength)
184 {
185     afs_int32 code;
186     afs_int32 data[3];
187     struct ubik_stat ustat;
188
189     /* figure out where to write */
190     code = (*adbase->stat) (adbase, LOGFILE, &ustat);
191     if (code < 0)
192         return code;
193
194     /* setup data */
195     data[0] = htonl(LOGTRUNCATE);
196     data[1] = htonl(afile);
197     data[2] = htonl(alength);
198
199     /* do write */
200     code =
201         (*adbase->write) (adbase, LOGFILE, data, ustat.size,
202                           3 * sizeof(afs_int32));
203     if (code != 3 * sizeof(afs_int32))
204         return UIOERROR;
205     return 0;
206 }
207
208 /* write some data to the log, never syncing */
209 int
210 udisk_LogWriteData(struct ubik_dbase *adbase, afs_int32 afile, char *abuffer,
211                    afs_int32 apos, afs_int32 alen)
212 {
213     struct ubik_stat ustat;
214     afs_int32 code;
215     afs_int32 data[4];
216     afs_int32 lpos;
217
218     /* find end of log */
219     code = (*adbase->stat) (adbase, LOGFILE, &ustat);
220     lpos = ustat.size;
221     if (code < 0)
222         return code;
223
224     /* setup header */
225     data[0] = htonl(LOGDATA);
226     data[1] = htonl(afile);
227     data[2] = htonl(apos);
228     data[3] = htonl(alen);
229
230     /* write header */
231     code =
232         (*adbase->write) (adbase, LOGFILE, data, lpos, 4 * sizeof(afs_int32));
233     if (code != 4 * sizeof(afs_int32))
234         return UIOERROR;
235     lpos += 4 * sizeof(afs_int32);
236
237     /* write data */
238     code = (*adbase->write) (adbase, LOGFILE, abuffer, lpos, alen);
239     if (code != alen)
240         return UIOERROR;
241     return 0;
242 }
243
244 static int
245 DInit(int abuffers)
246 {
247     /* Initialize the venus buffer system. */
248     int i;
249     struct buffer *tb;
250     Buffers = (struct buffer *)malloc(abuffers * sizeof(struct buffer));
251     memset(Buffers, 0, abuffers * sizeof(struct buffer));
252     BufferData = (char *)malloc(abuffers * UBIK_PAGESIZE);
253     nbuffers = abuffers;
254     for (i = 0; i < PHSIZE; i++)
255         phTable[i] = 0;
256     for (i = 0; i < abuffers; i++) {
257         /* Fill in each buffer with an empty indication. */
258         tb = &Buffers[i];
259         tb->lru_next = &(Buffers[i + 1]);
260         tb->lru_prev = &(Buffers[i - 1]);
261         tb->data = &BufferData[UBIK_PAGESIZE * i];
262         tb->file = BADFID;
263     }
264     Buffers[0].lru_prev = &(Buffers[abuffers - 1]);
265     Buffers[abuffers - 1].lru_next = &(Buffers[0]);
266     LruBuffer = &(Buffers[0]);
267     return 0;
268 }
269
270 /* Take a buffer and mark it as the least recently used buffer */
271 static void
272 Dlru(struct buffer *abuf)
273 {
274     if (LruBuffer == abuf)
275         return;
276
277     /* Unthread from where it is in the list */
278     abuf->lru_next->lru_prev = abuf->lru_prev;
279     abuf->lru_prev->lru_next = abuf->lru_next;
280
281     /* Thread onto beginning of LRU list */
282     abuf->lru_next = LruBuffer;
283     abuf->lru_prev = LruBuffer->lru_prev;
284
285     LruBuffer->lru_prev->lru_next = abuf;
286     LruBuffer->lru_prev = abuf;
287     LruBuffer = abuf;
288 }
289
290 /* Take a buffer and mark it as the most recently used buffer */
291 static void
292 Dmru(struct buffer *abuf)
293 {
294     if (LruBuffer == abuf) {
295         LruBuffer = LruBuffer->lru_next;
296         return;
297     }
298
299     /* Unthread from where it is in the list */
300     abuf->lru_next->lru_prev = abuf->lru_prev;
301     abuf->lru_prev->lru_next = abuf->lru_next;
302
303     /* Thread onto end of LRU list - making it the MRU buffer */
304     abuf->lru_next = LruBuffer;
305     abuf->lru_prev = LruBuffer->lru_prev;
306     LruBuffer->lru_prev->lru_next = abuf;
307     LruBuffer->lru_prev = abuf;
308 }
309
310 /* get a pointer to a particular buffer */
311 static char *
312 DRead(struct ubik_dbase *dbase, afs_int32 fid, int page)
313 {
314     /* Read a page from the disk. */
315     struct buffer *tb, *lastbuffer;
316     afs_int32 code;
317
318     calls++;
319     lastbuffer = LruBuffer->lru_prev;
320
321     if ((lastbuffer->page == page) && (lastbuffer->file == fid)
322         && (lastbuffer->dbase == dbase)) {
323         tb = lastbuffer;
324         tb->lockers++;
325         lastb++;
326         return tb->data;
327     }
328     for (tb = phTable[pHash(page)]; tb; tb = tb->hashNext) {
329         if (tb->page == page && tb->file == fid && tb->dbase == dbase) {
330             Dmru(tb);
331             tb->lockers++;
332             return tb->data;
333         }
334     }
335     /* can't find it */
336     tb = newslot(dbase, fid, page);
337     if (!tb)
338         return 0;
339     memset(tb->data, 0, UBIK_PAGESIZE);
340
341     tb->lockers++;
342     code =
343         (*dbase->read) (dbase, fid, tb->data, page * UBIK_PAGESIZE,
344                         UBIK_PAGESIZE);
345     if (code < 0) {
346         tb->file = BADFID;
347         Dlru(tb);
348         tb->lockers--;
349         ubik_print("Ubik: Error reading database file: errno=%d\n", errno);
350         return 0;
351     }
352     ios++;
353
354     /* Note that findslot sets the page field in the buffer equal to
355      * what it is searching for.
356      */
357     return tb->data;
358 }
359
360 /* zap truncated pages */
361 static int
362 DTrunc(struct ubik_dbase *dbase, afs_int32 fid, afs_int32 length)
363 {
364     afs_int32 maxPage;
365     struct buffer *tb;
366     int i;
367
368     maxPage = (length + UBIK_PAGESIZE - 1) >> UBIK_LOGPAGESIZE; /* first invalid page now in file */
369     for (i = 0, tb = Buffers; i < nbuffers; i++, tb++) {
370         if (tb->page >= maxPage && tb->file == fid && tb->dbase == dbase) {
371             tb->file = BADFID;
372             Dlru(tb);
373         }
374     }
375     return 0;
376 }
377
378 /* allocate a truncation entry.  We allocate special entries representing truncations, rather than
379     performing them immediately, so that we can abort a transaction easily by simply purging
380     the in-core memory buffers and discarding these truncation entries.
381 */
382 static struct ubik_trunc *
383 GetTrunc(void)
384 {
385     struct ubik_trunc *tt;
386     if (!freeTruncList) {
387         freeTruncList =
388             (struct ubik_trunc *)malloc(sizeof(struct ubik_trunc));
389         freeTruncList->next = (struct ubik_trunc *)0;
390     }
391     tt = freeTruncList;
392     freeTruncList = tt->next;
393     return tt;
394 }
395
396 /* free a truncation entry */
397 static int
398 PutTrunc(struct ubik_trunc *at)
399 {
400     at->next = freeTruncList;
401     freeTruncList = at;
402     return 0;
403 }
404
405 /* find a truncation entry for a file, if any */
406 static struct ubik_trunc *
407 FindTrunc(struct ubik_trans *atrans, afs_int32 afile)
408 {
409     struct ubik_trunc *tt;
410     for (tt = atrans->activeTruncs; tt; tt = tt->next) {
411         if (tt->file == afile)
412             return tt;
413     }
414     return (struct ubik_trunc *)0;
415 }
416
417 /* do truncates associated with trans, and free them */
418 static int
419 DoTruncs(struct ubik_trans *atrans)
420 {
421     struct ubik_trunc *tt, *nt;
422     int (*tproc) ();
423     afs_int32 rcode = 0, code;
424
425     tproc = atrans->dbase->truncate;
426     for (tt = atrans->activeTruncs; tt; tt = nt) {
427         nt = tt->next;
428         DTrunc(atrans->dbase, tt->file, tt->length);    /* zap pages from buffer cache */
429         code = (*tproc) (atrans->dbase, tt->file, tt->length);
430         if (code)
431             rcode = code;
432         PutTrunc(tt);
433     }
434     /* don't unthread, because we do the entire list's worth here */
435     atrans->activeTruncs = (struct ubik_trunc *)0;
436     return (rcode);
437 }
438
439 /* mark a fid as invalid */
440 int
441 udisk_Invalidate(struct ubik_dbase *adbase, afs_int32 afid)
442 {
443     struct buffer *tb;
444     int i;
445
446     for (i = 0, tb = Buffers; i < nbuffers; i++, tb++) {
447         if (tb->file == afid) {
448             tb->file = BADFID;
449             Dlru(tb);
450         }
451     }
452     return 0;
453 }
454
455 /* move this page into the correct hash bucket */
456 static int
457 FixupBucket(struct buffer *ap)
458 {
459     struct buffer **lp, *tp;
460     int i;
461     /* first try to get it out of its current hash bucket, in which it might not be */
462     i = ap->hashIndex;
463     lp = &phTable[i];
464     for (tp = *lp; tp; tp = tp->hashNext) {
465         if (tp == ap) {
466             *lp = tp->hashNext;
467             break;
468         }
469         lp = &tp->hashNext;
470     }
471     /* now figure the new hash bucket */
472     i = pHash(ap->page);
473     ap->hashIndex = i;          /* remember where we are for deletion */
474     ap->hashNext = phTable[i];  /* add us to the list */
475     phTable[i] = ap;
476     return 0;
477 }
478
479 /* create a new slot for a particular dbase page */
480 static struct buffer *
481 newslot(struct ubik_dbase *adbase, afs_int32 afid, afs_int32 apage)
482 {
483     /* Find a usable buffer slot */
484     afs_int32 i;
485     struct buffer *pp, *tp;
486
487     pp = 0;                     /* last pure */
488     for (i = 0, tp = LruBuffer; i < nbuffers; i++, tp = tp->lru_next) {
489         if (!tp->lockers && !tp->dirty) {
490             pp = tp;
491             break;
492         }
493     }
494
495     if (pp == 0) {
496         /* There are no unlocked buffers that don't need to be written to the disk. */
497         ubik_print
498             ("Ubik: Internal Error: Unable to find free buffer in ubik cache\n");
499         return NULL;
500     }
501
502     /* Now fill in the header. */
503     pp->dbase = adbase;
504     pp->file = afid;
505     pp->page = apage;
506
507     FixupBucket(pp);            /* move to the right hash bucket */
508     Dmru(pp);
509     return pp;
510 }
511
512 /* Release a buffer, specifying whether or not the buffer has been modified by the locker. */
513 static void
514 DRelease(char *ap, int flag)
515 {
516     int index;
517     struct buffer *bp;
518
519     if (!ap)
520         return;
521     index = (ap - (char *)BufferData) >> UBIK_LOGPAGESIZE;
522     bp = &(Buffers[index]);
523     bp->lockers--;
524     if (flag)
525         bp->dirty = 1;
526     return;
527 }
528
529 /* flush all modified buffers, leaves dirty bits set (they're cleared
530  * by DSync).  Note interaction with DSync: you call this thing first,
531  * writing the buffers to the disk.  Then you call DSync to sync all the
532  * files that were written, and to clear the dirty bits.  You should
533  * always call DFlush/DSync as a pair.
534  */
535 static int
536 DFlush(struct ubik_dbase *adbase)
537 {
538     int i;
539     afs_int32 code;
540     struct buffer *tb;
541
542     tb = Buffers;
543     for (i = 0; i < nbuffers; i++, tb++) {
544         if (tb->dirty) {
545             code = tb->page * UBIK_PAGESIZE;    /* offset within file */
546             code =
547                 (*adbase->write) (adbase, tb->file, tb->data, code,
548                                   UBIK_PAGESIZE);
549             if (code != UBIK_PAGESIZE)
550                 return UIOERROR;
551         }
552     }
553     return 0;
554 }
555
556 /* flush all modified buffers */
557 static int
558 DAbort(struct ubik_dbase *adbase)
559 {
560     int i;
561     struct buffer *tb;
562
563     tb = Buffers;
564     for (i = 0; i < nbuffers; i++, tb++) {
565         if (tb->dirty) {
566             tb->dirty = 0;
567             tb->file = BADFID;
568             Dlru(tb);
569         }
570     }
571     return 0;
572 }
573
574 /* must only be called after DFlush, due to its interpretation of dirty flag */
575 static int
576 DSync(struct ubik_dbase *adbase)
577 {
578     int i;
579     afs_int32 code;
580     struct buffer *tb;
581     afs_int32 file;
582     afs_int32 rCode;
583
584     rCode = 0;
585     while (1) {
586         file = BADFID;
587         for (i = 0, tb = Buffers; i < nbuffers; i++, tb++) {
588             if (tb->dirty == 1) {
589                 if (file == BADFID)
590                     file = tb->file;
591                 if (file != BADFID && tb->file == file)
592                     tb->dirty = 0;
593             }
594         }
595         if (file == BADFID)
596             break;
597         /* otherwise we have a file to sync */
598         code = (*adbase->sync) (adbase, file);
599         if (code)
600             rCode = code;
601     }
602     return rCode;
603 }
604
605 /* Same as read, only do not even try to read the page */
606 static char *
607 DNew(struct ubik_dbase *dbase, afs_int32 fid, int page)
608 {
609     struct buffer *tb;
610
611     if ((tb = newslot(dbase, fid, page)) == 0)
612         return NULL;
613     tb->lockers++;
614     memset(tb->data, 0, UBIK_PAGESIZE);
615     return tb->data;
616 }
617
618 /* read data from database */
619 int
620 udisk_read(struct ubik_trans *atrans, afs_int32 afile, char *abuffer,
621            afs_int32 apos, afs_int32 alen)
622 {
623     char *bp;
624     afs_int32 offset, len, totalLen;
625     struct ubik_dbase *dbase;
626
627     if (atrans->flags & TRDONE)
628         return UDONE;
629     totalLen = 0;
630     dbase = atrans->dbase;
631     while (alen > 0) {
632         bp = DRead(dbase, afile, apos >> UBIK_LOGPAGESIZE);
633         if (!bp)
634             return UEOF;
635         /* otherwise, min of remaining bytes and end of buffer to user mode */
636         offset = apos & (UBIK_PAGESIZE - 1);
637         len = UBIK_PAGESIZE - offset;
638         if (len > alen)
639             len = alen;
640         memcpy(abuffer, bp + offset, len);
641         abuffer += len;
642         apos += len;
643         alen -= len;
644         totalLen += len;
645         DRelease(bp, 0);
646     }
647     return 0;
648 }
649
650 /* truncate file */
651 int
652 udisk_truncate(struct ubik_trans *atrans, afs_int32 afile, afs_int32 alength)
653 {
654     afs_int32 code;
655     struct ubik_trunc *tt;
656
657     if (atrans->flags & TRDONE)
658         return UDONE;
659     if (atrans->type != UBIK_WRITETRANS)
660         return UBADTYPE;
661
662     /* write a truncate log record */
663     code = udisk_LogTruncate(atrans->dbase, afile, alength);
664
665     /* don't truncate until commit time */
666     tt = FindTrunc(atrans, afile);
667     if (!tt) {
668         /* this file not truncated yet */
669         tt = GetTrunc();
670         tt->next = atrans->activeTruncs;
671         atrans->activeTruncs = tt;
672         tt->file = afile;
673         tt->length = alength;
674     } else {
675         /* already truncated to a certain length */
676         if (tt->length > alength)
677             tt->length = alength;
678     }
679     return code;
680 }
681
682 /* write data to database, using logs */
683 int
684 udisk_write(struct ubik_trans *atrans, afs_int32 afile, char *abuffer,
685             afs_int32 apos, afs_int32 alen)
686 {
687     char *bp;
688     afs_int32 offset, len, totalLen;
689     struct ubik_dbase *dbase;
690     struct ubik_trunc *tt;
691     afs_int32 code;
692
693     if (atrans->flags & TRDONE)
694         return UDONE;
695     if (atrans->type != UBIK_WRITETRANS)
696         return UBADTYPE;
697
698     dbase = atrans->dbase;
699     /* first write the data to the log */
700     code = udisk_LogWriteData(dbase, afile, abuffer, apos, alen);
701     if (code)
702         return code;
703
704     /* expand any truncations of this file */
705     tt = FindTrunc(atrans, afile);
706     if (tt) {
707         if (tt->length < apos + alen) {
708             tt->length = apos + alen;
709         }
710     }
711
712     /* now update vm */
713     totalLen = 0;
714     while (alen > 0) {
715         bp = DRead(dbase, afile, apos >> UBIK_LOGPAGESIZE);
716         if (!bp) {
717             bp = DNew(dbase, afile, apos >> UBIK_LOGPAGESIZE);
718             if (!bp)
719                 return UIOERROR;
720             memset(bp, 0, UBIK_PAGESIZE);
721         }
722         /* otherwise, min of remaining bytes and end of buffer to user mode */
723         offset = apos & (UBIK_PAGESIZE - 1);
724         len = UBIK_PAGESIZE - offset;
725         if (len > alen)
726             len = alen;
727         memcpy(bp + offset, abuffer, len);
728         abuffer += len;
729         apos += len;
730         alen -= len;
731         totalLen += len;
732         DRelease(bp, 1);        /* buffer modified */
733     }
734     return 0;
735 }
736
737 /* begin a new local transaction */
738 int
739 udisk_begin(struct ubik_dbase *adbase, int atype, struct ubik_trans **atrans)
740 {
741     afs_int32 code;
742     struct ubik_trans *tt;
743
744     *atrans = (struct ubik_trans *)NULL;
745     /* Make sure system is initialized before doing anything */
746     if (!initd) {
747         initd = 1;
748         DInit(ubik_nBuffers);
749     }
750     if (atype == UBIK_WRITETRANS) {
751         if (adbase->flags & DBWRITING)
752             return USYNC;
753         code = udisk_LogOpcode(adbase, LOGNEW, 0);
754         if (code)
755             return code;
756     }
757     tt = (struct ubik_trans *)malloc(sizeof(struct ubik_trans));
758     memset(tt, 0, sizeof(struct ubik_trans));
759     tt->dbase = adbase;
760     tt->next = adbase->activeTrans;
761     adbase->activeTrans = tt;
762     tt->type = atype;
763     if (atype == UBIK_READTRANS)
764         adbase->readers++;
765     else if (atype == UBIK_WRITETRANS)
766         adbase->flags |= DBWRITING;
767     *atrans = tt;
768     return 0;
769 }
770
771 /* commit transaction */
772 int
773 udisk_commit(struct ubik_trans *atrans)
774 {
775     struct ubik_dbase *dbase;
776     afs_int32 code = 0;
777     struct ubik_version oldversion, newversion;
778
779     if (atrans->flags & TRDONE)
780         return (UTWOENDS);
781
782     if (atrans->type == UBIK_WRITETRANS) {
783         dbase = atrans->dbase;
784
785         /* On the first write to the database. We update the versions */
786         if (ubeacon_AmSyncSite() && !(urecovery_state & UBIK_RECLABELDB)) {
787             oldversion = dbase->version;
788             newversion.epoch = FT_ApproxTime();;
789             newversion.counter = 1;
790
791             code = (*dbase->setlabel) (dbase, 0, &newversion);
792             if (code)
793                 return (code);
794             ubik_epochTime = newversion.epoch;
795             dbase->version = newversion;
796
797             /* Ignore the error here. If the call fails, the site is
798              * marked down and when we detect it is up again, we will 
799              * send the entire database to it.
800              */
801             ContactQuorum(DISK_SetVersion, atrans, 1 /*CStampVersion */ ,
802                           &oldversion, &newversion);
803             urecovery_state |= UBIK_RECLABELDB;
804         }
805
806         dbase->version.counter++;       /* bump commit count */
807         LWP_NoYieldSignal(&dbase->version);
808
809         code = udisk_LogEnd(dbase, &dbase->version);
810         if (code) {
811             dbase->version.counter--;
812             return (code);
813         }
814
815         /* If we fail anytime after this, then panic and let the
816          * recovery replay the log. 
817          */
818         code = DFlush(dbase);   /* write dirty pages to respective files */
819         if (code)
820             panic("Writing Ubik DB modifications\n");
821         code = DSync(dbase);    /* sync the files and mark pages not dirty */
822         if (code)
823             panic("Synchronizing Ubik DB modifications\n");
824
825         code = DoTruncs(atrans);        /* Perform requested truncations */
826         if (code)
827             panic("Truncating Ubik DB\n");
828
829         /* label the committed dbase */
830         code = (*dbase->setlabel) (dbase, 0, &dbase->version);
831         if (code)
832             panic("Truncating Ubik DB\n");
833
834         code = (*dbase->truncate) (dbase, LOGFILE, 0);  /* discard log (optional) */
835         if (code)
836             panic("Truncating Ubik logfile\n");
837
838     }
839
840     /* When the transaction is marked done, it also means the logfile
841      * has been truncated.
842      */
843     atrans->flags |= TRDONE;
844     return code;
845 }
846
847 /* abort transaction */
848 int
849 udisk_abort(struct ubik_trans *atrans)
850 {
851     struct ubik_dbase *dbase;
852     afs_int32 code;
853
854     if (atrans->flags & TRDONE)
855         return UTWOENDS;
856
857     /* Check if we are the write trans before logging abort, lest we
858      * abort a good write trans in progress. 
859      * We don't really care if the LOGABORT gets to the log because we 
860      * truncate the log next. If the truncate fails, we panic; for 
861      * otherwise, the log entries remain. On restart, replay of the log
862      * will do nothing because the abort is there or no LogEnd opcode.
863      */
864     dbase = atrans->dbase;
865     if (atrans->type == UBIK_WRITETRANS && dbase->flags & DBWRITING) {
866         udisk_LogOpcode(dbase, LOGABORT, 1);
867         code = (*dbase->truncate) (dbase, LOGFILE, 0);
868         if (code)
869             panic("Truncating Ubik logfile during an abort\n");
870         DAbort(dbase);          /* remove all dirty pages */
871     }
872
873     /* When the transaction is marked done, it also means the logfile
874      * has been truncated.
875      */
876     atrans->flags |= (TRABORT | TRDONE);
877     return 0;
878 }
879
880 /* destroy a transaction after it has been committed or aborted.  if
881  * it hasn't committed before you call this routine, we'll abort the
882  * transaction for you.
883  */
884 int
885 udisk_end(struct ubik_trans *atrans)
886 {
887     struct ubik_dbase *dbase;
888
889 #if defined(UBIK_PAUSE)
890     /* Another thread is trying to lock this transaction.
891      * That can only be an RPC doing SDISK_Lock.
892      * Unlock the transaction, 'cause otherwise the other
893      * thread will never wake up.  Don't free it because
894      * the caller will do that already.
895      */
896     if (atrans->flags & TRSETLOCK) {
897         atrans->flags |= TRSTALE;
898         ulock_relLock(atrans);
899         return;
900     }
901 #endif /* UBIK_PAUSE */
902     if (!(atrans->flags & TRDONE))
903         udisk_abort(atrans);
904     dbase = atrans->dbase;
905
906     ulock_relLock(atrans);
907     unthread(atrans);
908
909     /* check if we are the write trans before unsetting the DBWRITING bit, else
910      * we could be unsetting someone else's bit.
911      */
912     if (atrans->type == UBIK_WRITETRANS && dbase->flags & DBWRITING) {
913         dbase->flags &= ~DBWRITING;
914     } else {
915         dbase->readers--;
916     }
917     if (atrans->iovec_info.iovec_wrt_val)
918         free(atrans->iovec_info.iovec_wrt_val);
919     if (atrans->iovec_data.iovec_buf_val)
920         free(atrans->iovec_data.iovec_buf_val);
921     free(atrans);
922
923     /* Wakeup any writers waiting in BeginTrans() */
924     LWP_NoYieldSignal(&dbase->flags);
925     return 0;
926 }