ubik: remove unused UBIK_PAUSE code
[openafs.git] / src / ubik / disk.c
1 /*
2  * Copyright 2000, International Business Machines Corporation and others.
3  * All Rights Reserved.
4  *
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 #include <afsconfig.h>
11 #include <afs/param.h>
12
13 #include <roken.h>
14
15 #include <sys/types.h>
16 #include <string.h>
17 #include <stdarg.h>
18 #include <errno.h>
19
20 #ifdef AFS_NT40_ENV
21 #include <winsock2.h>
22 #else
23 #include <sys/file.h>
24 #include <netinet/in.h>
25 #endif
26
27 #include <lock.h>
28 #include <rx/xdr.h>
29
30
31
32 #define UBIK_INTERNALS
33 #include "ubik.h"
34 #include "ubik_int.h"
35
36 #define PHSIZE  128
37 static struct buffer {
38     struct ubik_dbase *dbase;   /*!< dbase within which the buffer resides */
39     afs_int32 file;             /*!< Unique cache key */
40     afs_int32 page;             /*!< page number */
41     struct buffer *lru_next;
42     struct buffer *lru_prev;
43     struct buffer *hashNext;    /*!< next dude in hash table */
44     char *data;                 /*!< ptr to the data */
45     char lockers;               /*!< usage ref count */
46     char dirty;                 /*!< is buffer modified */
47     char hashIndex;             /*!< back ptr to hash table */
48 } *Buffers;
49
50 #define pHash(page) ((page) & (PHSIZE-1))
51
52 afs_int32 ubik_nBuffers = NBUFFERS;
53 static struct buffer *phTable[PHSIZE];  /*!< page hash table */
54 static struct buffer *LruBuffer;
55 static int nbuffers;
56 static int calls = 0, ios = 0, lastb = 0;
57 static char *BufferData;
58 static struct buffer *newslot(struct ubik_dbase *adbase, afs_int32 afid,
59                               afs_int32 apage);
60 static int initd = 0;
61 #define BADFID      0xffffffff
62
63 static int DTrunc(struct ubik_trans *atrans, afs_int32 fid, afs_int32 length);
64
65 static struct ubik_trunc *freeTruncList = 0;
66
67 /*!
68  * \brief Remove a transaction from the database's active transaction list.  Don't free it.
69  */
70 static int
71 unthread(struct ubik_trans *atrans)
72 {
73     struct ubik_trans **lt, *tt;
74     lt = &atrans->dbase->activeTrans;
75     for (tt = *lt; tt; lt = &tt->next, tt = *lt) {
76         if (tt == atrans) {
77             /* found it */
78             *lt = tt->next;
79             return 0;
80         }
81     }
82     return 2;                   /* no entry */
83 }
84
85 /*!
86  * \brief some debugging assistance
87  */
88 void
89 udisk_Debug(struct ubik_debug *aparm)
90 {
91     struct buffer *tb;
92     int i;
93
94     memcpy(&aparm->localVersion, &ubik_dbase->version,
95            sizeof(struct ubik_version));
96     aparm->lockedPages = 0;
97     aparm->writeLockedPages = 0;
98     tb = Buffers;
99     for (i = 0; i < nbuffers; i++, tb++) {
100         if (tb->lockers) {
101             aparm->lockedPages++;
102             if (tb->dirty)
103                 aparm->writeLockedPages++;
104         }
105     }
106 }
107
108 /*!
109  * \brief Write an opcode to the log.
110  *
111  * log format is defined here, and implicitly in recovery.c
112  *
113  * 4 byte opcode, followed by parameters, each 4 bytes long.  All integers
114  * are in logged in network standard byte order, in case we want to move logs
115  * from machine-to-machine someday.
116  *
117  * Begin transaction: opcode \n
118  * Commit transaction: opcode, version (8 bytes) \n
119  * Truncate file: opcode, file number, length \n
120  * Abort transaction: opcode \n
121  * Write data: opcode, file, position, length, <length> data bytes \n
122  */
123 int
124 udisk_LogOpcode(struct ubik_dbase *adbase, afs_int32 aopcode, int async)
125 {
126     struct ubik_stat ustat;
127     afs_int32 code;
128
129     /* figure out where to write */
130     code = (*adbase->stat) (adbase, LOGFILE, &ustat);
131     if (code < 0)
132         return code;
133
134     /* setup data and do write */
135     aopcode = htonl(aopcode);
136     code =
137         (*adbase->write) (adbase, LOGFILE, (char *)&aopcode, ustat.size,
138                           sizeof(afs_int32));
139     if (code != sizeof(afs_int32))
140         return UIOERROR;
141
142     /* optionally sync data */
143     if (async)
144         code = (*adbase->sync) (adbase, LOGFILE);
145     else
146         code = 0;
147     return code;
148 }
149
150 /*!
151  * \brief Log a commit, never syncing.
152  */
153 int
154 udisk_LogEnd(struct ubik_dbase *adbase, struct ubik_version *aversion)
155 {
156     afs_int32 code;
157     afs_int32 data[3];
158     struct ubik_stat ustat;
159
160     /* figure out where to write */
161     code = (*adbase->stat) (adbase, LOGFILE, &ustat);
162     if (code)
163         return code;
164
165     /* setup data */
166     data[0] = htonl(LOGEND);
167     data[1] = htonl(aversion->epoch);
168     data[2] = htonl(aversion->counter);
169
170     /* do write */
171     code =
172         (*adbase->write) (adbase, LOGFILE, (char *)data, ustat.size,
173                           3 * sizeof(afs_int32));
174     if (code != 3 * sizeof(afs_int32))
175         return UIOERROR;
176
177     /* finally sync the log */
178     code = (*adbase->sync) (adbase, LOGFILE);
179     return code;
180 }
181
182 /*!
183  * \brief Log a truncate operation, never syncing.
184  */
185 int
186 udisk_LogTruncate(struct ubik_dbase *adbase, afs_int32 afile,
187                   afs_int32 alength)
188 {
189     afs_int32 code;
190     afs_int32 data[3];
191     struct ubik_stat ustat;
192
193     /* figure out where to write */
194     code = (*adbase->stat) (adbase, LOGFILE, &ustat);
195     if (code < 0)
196         return code;
197
198     /* setup data */
199     data[0] = htonl(LOGTRUNCATE);
200     data[1] = htonl(afile);
201     data[2] = htonl(alength);
202
203     /* do write */
204     code =
205         (*adbase->write) (adbase, LOGFILE, (char *)data, ustat.size,
206                           3 * sizeof(afs_int32));
207     if (code != 3 * sizeof(afs_int32))
208         return UIOERROR;
209     return 0;
210 }
211
212 /*!
213  * \brief Write some data to the log, never syncing.
214  */
215 int
216 udisk_LogWriteData(struct ubik_dbase *adbase, afs_int32 afile, void *abuffer,
217                    afs_int32 apos, afs_int32 alen)
218 {
219     struct ubik_stat ustat;
220     afs_int32 code;
221     afs_int32 data[4];
222     afs_int32 lpos;
223
224     /* find end of log */
225     code = (*adbase->stat) (adbase, LOGFILE, &ustat);
226     lpos = ustat.size;
227     if (code < 0)
228         return code;
229
230     /* setup header */
231     data[0] = htonl(LOGDATA);
232     data[1] = htonl(afile);
233     data[2] = htonl(apos);
234     data[3] = htonl(alen);
235
236     /* write header */
237     code =
238         (*adbase->write) (adbase, LOGFILE, (char *)data, lpos, 4 * sizeof(afs_int32));
239     if (code != 4 * sizeof(afs_int32))
240         return UIOERROR;
241     lpos += 4 * sizeof(afs_int32);
242
243     /* write data */
244     code = (*adbase->write) (adbase, LOGFILE, abuffer, lpos, alen);
245     if (code != alen)
246         return UIOERROR;
247     return 0;
248 }
249
250 static int
251 DInit(int abuffers)
252 {
253     /* Initialize the venus buffer system. */
254     int i;
255     struct buffer *tb;
256     Buffers = (struct buffer *)malloc(abuffers * sizeof(struct buffer));
257     memset(Buffers, 0, abuffers * sizeof(struct buffer));
258     BufferData = (char *)malloc(abuffers * UBIK_PAGESIZE);
259     nbuffers = abuffers;
260     for (i = 0; i < PHSIZE; i++)
261         phTable[i] = 0;
262     for (i = 0; i < abuffers; i++) {
263         /* Fill in each buffer with an empty indication. */
264         tb = &Buffers[i];
265         tb->lru_next = &(Buffers[i + 1]);
266         tb->lru_prev = &(Buffers[i - 1]);
267         tb->data = &BufferData[UBIK_PAGESIZE * i];
268         tb->file = BADFID;
269     }
270     Buffers[0].lru_prev = &(Buffers[abuffers - 1]);
271     Buffers[abuffers - 1].lru_next = &(Buffers[0]);
272     LruBuffer = &(Buffers[0]);
273     return 0;
274 }
275
276 /*!
277  * \brief Take a buffer and mark it as the least recently used buffer.
278  */
279 static void
280 Dlru(struct buffer *abuf)
281 {
282     if (LruBuffer == abuf)
283         return;
284
285     /* Unthread from where it is in the list */
286     abuf->lru_next->lru_prev = abuf->lru_prev;
287     abuf->lru_prev->lru_next = abuf->lru_next;
288
289     /* Thread onto beginning of LRU list */
290     abuf->lru_next = LruBuffer;
291     abuf->lru_prev = LruBuffer->lru_prev;
292
293     LruBuffer->lru_prev->lru_next = abuf;
294     LruBuffer->lru_prev = abuf;
295     LruBuffer = abuf;
296 }
297
298 /*!
299  * \brief Take a buffer and mark it as the most recently used buffer.
300  */
301 static void
302 Dmru(struct buffer *abuf)
303 {
304     if (LruBuffer == abuf) {
305         LruBuffer = LruBuffer->lru_next;
306         return;
307     }
308
309     /* Unthread from where it is in the list */
310     abuf->lru_next->lru_prev = abuf->lru_prev;
311     abuf->lru_prev->lru_next = abuf->lru_next;
312
313     /* Thread onto end of LRU list - making it the MRU buffer */
314     abuf->lru_next = LruBuffer;
315     abuf->lru_prev = LruBuffer->lru_prev;
316     LruBuffer->lru_prev->lru_next = abuf;
317     LruBuffer->lru_prev = abuf;
318 }
319
320 static_inline int
321 MatchBuffer(struct buffer *buf, int page, afs_int32 fid,
322             struct ubik_trans *atrans)
323 {
324     if (buf->page != page) {
325         return 0;
326     }
327     if (buf->file != fid) {
328         return 0;
329     }
330     if (atrans->type == UBIK_READTRANS && buf->dirty) {
331         /* if 'buf' is dirty, it has uncommitted changes; we do not want to
332          * see uncommitted changes if we are a read transaction, so skip over
333          * it. */
334         return 0;
335     }
336     if (buf->dbase != atrans->dbase) {
337         return 0;
338     }
339     return 1;
340 }
341
342 /*!
343  * \brief Get a pointer to a particular buffer.
344  */
345 static char *
346 DRead(struct ubik_trans *atrans, afs_int32 fid, int page)
347 {
348     /* Read a page from the disk. */
349     struct buffer *tb, *lastbuffer, *found_tb = NULL;
350     afs_int32 code;
351     struct ubik_dbase *dbase = atrans->dbase;
352
353     calls++;
354     lastbuffer = LruBuffer->lru_prev;
355
356     /* Skip for write transactions for a clean page - this may not be the right page to use */
357     if (MatchBuffer(lastbuffer, page, fid, atrans)
358                 && (atrans->type == UBIK_READTRANS || lastbuffer->dirty)) {
359         tb = lastbuffer;
360         tb->lockers++;
361         lastb++;
362         return tb->data;
363     }
364     for (tb = phTable[pHash(page)]; tb; tb = tb->hashNext) {
365         if (MatchBuffer(tb, page, fid, atrans)) {
366             if (tb->dirty || atrans->type == UBIK_READTRANS) {
367                 found_tb = tb;
368                 break;
369             }
370             /* Remember this clean page - we might use it */
371             found_tb = tb;
372         }
373     }
374     /* For a write transaction, use a matching clean page if no dirty one was found */
375     if (found_tb) {
376         Dmru(found_tb);
377         found_tb->lockers++;
378         return found_tb->data;
379     }
380
381     /* can't find it */
382     tb = newslot(dbase, fid, page);
383     if (!tb)
384         return 0;
385     memset(tb->data, 0, UBIK_PAGESIZE);
386
387     tb->lockers++;
388     code =
389         (*dbase->read) (dbase, fid, tb->data, page * UBIK_PAGESIZE,
390                         UBIK_PAGESIZE);
391     if (code < 0) {
392         tb->file = BADFID;
393         Dlru(tb);
394         tb->lockers--;
395         ubik_print("Ubik: Error reading database file: errno=%d\n", errno);
396         return 0;
397     }
398     ios++;
399
400     /* Note that findslot sets the page field in the buffer equal to
401      * what it is searching for.
402      */
403     return tb->data;
404 }
405
406 /*!
407  * \brief Zap truncated pages.
408  */
409 static int
410 DTrunc(struct ubik_trans *atrans, afs_int32 fid, afs_int32 length)
411 {
412     afs_int32 maxPage;
413     struct buffer *tb;
414     int i;
415     struct ubik_dbase *dbase = atrans->dbase;
416
417     maxPage = (length + UBIK_PAGESIZE - 1) >> UBIK_LOGPAGESIZE; /* first invalid page now in file */
418     for (i = 0, tb = Buffers; i < nbuffers; i++, tb++) {
419         if (tb->page >= maxPage && tb->file == fid && tb->dbase == dbase) {
420             tb->file = BADFID;
421             Dlru(tb);
422         }
423     }
424     return 0;
425 }
426
427 /*!
428  * \brief Allocate a truncation entry.
429  *
430  * We allocate special entries representing truncations, rather than
431  * performing them immediately, so that we can abort a transaction easily by simply purging
432  * the in-core memory buffers and discarding these truncation entries.
433  */
434 static struct ubik_trunc *
435 GetTrunc(void)
436 {
437     struct ubik_trunc *tt;
438     if (!freeTruncList) {
439         freeTruncList =
440             (struct ubik_trunc *)malloc(sizeof(struct ubik_trunc));
441         freeTruncList->next = (struct ubik_trunc *)0;
442     }
443     tt = freeTruncList;
444     freeTruncList = tt->next;
445     return tt;
446 }
447
448 /*!
449  * \brief Free a truncation entry.
450  */
451 static int
452 PutTrunc(struct ubik_trunc *at)
453 {
454     at->next = freeTruncList;
455     freeTruncList = at;
456     return 0;
457 }
458
459 /*!
460  * \brief Find a truncation entry for a file, if any.
461  */
462 static struct ubik_trunc *
463 FindTrunc(struct ubik_trans *atrans, afs_int32 afile)
464 {
465     struct ubik_trunc *tt;
466     for (tt = atrans->activeTruncs; tt; tt = tt->next) {
467         if (tt->file == afile)
468             return tt;
469     }
470     return (struct ubik_trunc *)0;
471 }
472
473 /*!
474  * \brief Do truncates associated with \p atrans, and free them.
475  */
476 static int
477 DoTruncs(struct ubik_trans *atrans)
478 {
479     struct ubik_trunc *tt, *nt;
480     int (*tproc) (struct ubik_dbase *, afs_int32, afs_int32);
481     afs_int32 rcode = 0, code;
482
483     tproc = atrans->dbase->truncate;
484     for (tt = atrans->activeTruncs; tt; tt = nt) {
485         nt = tt->next;
486         DTrunc(atrans, tt->file, tt->length);   /* zap pages from buffer cache */
487         code = (*tproc) (atrans->dbase, tt->file, tt->length);
488         if (code)
489             rcode = code;
490         PutTrunc(tt);
491     }
492     /* don't unthread, because we do the entire list's worth here */
493     atrans->activeTruncs = (struct ubik_trunc *)0;
494     return (rcode);
495 }
496
497 /*!
498  * \brief Mark an \p fid as invalid.
499  */
500 int
501 udisk_Invalidate(struct ubik_dbase *adbase, afs_int32 afid)
502 {
503     struct buffer *tb;
504     int i;
505
506     for (i = 0, tb = Buffers; i < nbuffers; i++, tb++) {
507         if (tb->file == afid) {
508             tb->file = BADFID;
509             Dlru(tb);
510         }
511     }
512     return 0;
513 }
514
515 /*!
516  * \brief Move this page into the correct hash bucket.
517  */
518 static int
519 FixupBucket(struct buffer *ap)
520 {
521     struct buffer **lp, *tp;
522     int i;
523     /* first try to get it out of its current hash bucket, in which it might not be */
524     i = ap->hashIndex;
525     lp = &phTable[i];
526     for (tp = *lp; tp; tp = tp->hashNext) {
527         if (tp == ap) {
528             *lp = tp->hashNext;
529             break;
530         }
531         lp = &tp->hashNext;
532     }
533     /* now figure the new hash bucket */
534     i = pHash(ap->page);
535     ap->hashIndex = i;          /* remember where we are for deletion */
536     ap->hashNext = phTable[i];  /* add us to the list */
537     phTable[i] = ap;
538     return 0;
539 }
540
541 /*!
542  * \brief Create a new slot for a particular dbase page.
543  */
544 static struct buffer *
545 newslot(struct ubik_dbase *adbase, afs_int32 afid, afs_int32 apage)
546 {
547     /* Find a usable buffer slot */
548     afs_int32 i;
549     struct buffer *pp, *tp;
550
551     pp = 0;                     /* last pure */
552     for (i = 0, tp = LruBuffer; i < nbuffers; i++, tp = tp->lru_next) {
553         if (!tp->lockers && !tp->dirty) {
554             pp = tp;
555             break;
556         }
557     }
558
559     if (pp == 0) {
560         /* There are no unlocked buffers that don't need to be written to the disk. */
561         ubik_print
562             ("Ubik: Internal Error: Unable to find free buffer in ubik cache\n");
563         return NULL;
564     }
565
566     /* Now fill in the header. */
567     pp->dbase = adbase;
568     pp->file = afid;
569     pp->page = apage;
570
571     FixupBucket(pp);            /* move to the right hash bucket */
572     Dmru(pp);
573     return pp;
574 }
575
576 /*!
577  * \brief Release a buffer, specifying whether or not the buffer has been modified by the locker.
578  */
579 static void
580 DRelease(char *ap, int flag)
581 {
582     int index;
583     struct buffer *bp;
584
585     if (!ap)
586         return;
587     index = (int)(ap - (char *)BufferData) >> UBIK_LOGPAGESIZE;
588     bp = &(Buffers[index]);
589     bp->lockers--;
590     if (flag)
591         bp->dirty = 1;
592     return;
593 }
594
595 /*!
596  * \brief Flush all modified buffers, leaves dirty bits set (they're cleared
597  * by DSync()).
598  *
599  * \note Note interaction with DSync(): you call this thing first,
600  * writing the buffers to the disk.  Then you call DSync() to sync all the
601  * files that were written, and to clear the dirty bits.  You should
602  * always call DFlush/DSync as a pair.
603  */
604 static int
605 DFlush(struct ubik_trans *atrans)
606 {
607     int i;
608     afs_int32 code;
609     struct buffer *tb;
610     struct ubik_dbase *adbase = atrans->dbase;
611
612     tb = Buffers;
613     for (i = 0; i < nbuffers; i++, tb++) {
614         if (tb->dirty) {
615             code = tb->page * UBIK_PAGESIZE;    /* offset within file */
616             code =
617                 (*adbase->write) (adbase, tb->file, tb->data, code,
618                                   UBIK_PAGESIZE);
619             if (code != UBIK_PAGESIZE)
620                 return UIOERROR;
621         }
622     }
623     return 0;
624 }
625
626 /*!
627  * \brief Flush all modified buffers.
628  */
629 static int
630 DAbort(struct ubik_trans *atrans)
631 {
632     int i;
633     struct buffer *tb;
634
635     tb = Buffers;
636     for (i = 0; i < nbuffers; i++, tb++) {
637         if (tb->dirty) {
638             tb->dirty = 0;
639             tb->file = BADFID;
640             Dlru(tb);
641         }
642     }
643     return 0;
644 }
645
646 /**
647  * Invalidate any buffers that are duplicates of abuf. Duplicate buffers
648  * can appear if a read transaction reads a page that is dirty, then that
649  * dirty page is synced. The read transaction will skip over the dirty page,
650  * and create a new buffer, and when the dirty page is synced, it will be
651  * identical (except for contents) to the read-transaction buffer.
652  */
653 static void
654 DedupBuffer(struct buffer *abuf)
655 {
656     struct buffer *tb;
657     for (tb = phTable[pHash(abuf->page)]; tb; tb = tb->hashNext) {
658         if (tb->page == abuf->page && tb != abuf && tb->file == abuf->file
659             && tb->dbase == abuf->dbase) {
660
661             tb->file = BADFID;
662             Dlru(tb);
663         }
664     }
665 }
666
667 /*!
668  * \attention DSync() must only be called after DFlush(), due to its interpretation of dirty flag.
669  */
670 static int
671 DSync(struct ubik_trans *atrans)
672 {
673     int i;
674     afs_int32 code;
675     struct buffer *tb;
676     afs_int32 file;
677     afs_int32 rCode;
678     struct ubik_dbase *adbase = atrans->dbase;
679
680     rCode = 0;
681     while (1) {
682         file = BADFID;
683         for (i = 0, tb = Buffers; i < nbuffers; i++, tb++) {
684             if (tb->dirty == 1) {
685                 if (file == BADFID)
686                     file = tb->file;
687                 if (file != BADFID && tb->file == file) {
688                     tb->dirty = 0;
689                     DedupBuffer(tb);
690                 }
691             }
692         }
693         if (file == BADFID)
694             break;
695         /* otherwise we have a file to sync */
696         code = (*adbase->sync) (adbase, file);
697         if (code)
698             rCode = code;
699     }
700     return rCode;
701 }
702
703 /*!
704  * \brief Same as DRead(), only do not even try to read the page.
705  */
706 static char *
707 DNew(struct ubik_trans *atrans, afs_int32 fid, int page)
708 {
709     struct buffer *tb;
710     struct ubik_dbase *dbase = atrans->dbase;
711
712     if ((tb = newslot(dbase, fid, page)) == 0)
713         return NULL;
714     tb->lockers++;
715     memset(tb->data, 0, UBIK_PAGESIZE);
716     return tb->data;
717 }
718
719 /*!
720  * \brief Read data from database.
721  */
722 int
723 udisk_read(struct ubik_trans *atrans, afs_int32 afile, void *abuffer,
724            afs_int32 apos, afs_int32 alen)
725 {
726     char *bp;
727     afs_int32 offset, len, totalLen;
728
729     if (atrans->flags & TRDONE)
730         return UDONE;
731     totalLen = 0;
732     while (alen > 0) {
733         bp = DRead(atrans, afile, apos >> UBIK_LOGPAGESIZE);
734         if (!bp)
735             return UEOF;
736         /* otherwise, min of remaining bytes and end of buffer to user mode */
737         offset = apos & (UBIK_PAGESIZE - 1);
738         len = UBIK_PAGESIZE - offset;
739         if (len > alen)
740             len = alen;
741         memcpy(abuffer, bp + offset, len);
742         abuffer = (char *)abuffer + len;
743         apos += len;
744         alen -= len;
745         totalLen += len;
746         DRelease(bp, 0);
747     }
748     return 0;
749 }
750
751 /*!
752  * \brief Truncate file.
753  */
754 int
755 udisk_truncate(struct ubik_trans *atrans, afs_int32 afile, afs_int32 alength)
756 {
757     afs_int32 code;
758     struct ubik_trunc *tt;
759
760     if (atrans->flags & TRDONE)
761         return UDONE;
762     if (atrans->type != UBIK_WRITETRANS)
763         return UBADTYPE;
764
765     /* write a truncate log record */
766     code = udisk_LogTruncate(atrans->dbase, afile, alength);
767
768     /* don't truncate until commit time */
769     tt = FindTrunc(atrans, afile);
770     if (!tt) {
771         /* this file not truncated yet */
772         tt = GetTrunc();
773         tt->next = atrans->activeTruncs;
774         atrans->activeTruncs = tt;
775         tt->file = afile;
776         tt->length = alength;
777     } else {
778         /* already truncated to a certain length */
779         if (tt->length > alength)
780             tt->length = alength;
781     }
782     return code;
783 }
784
785 /*!
786  * \brief Write data to database, using logs.
787  */
788 int
789 udisk_write(struct ubik_trans *atrans, afs_int32 afile, void *abuffer,
790             afs_int32 apos, afs_int32 alen)
791 {
792     char *bp;
793     afs_int32 offset, len, totalLen;
794     struct ubik_trunc *tt;
795     afs_int32 code;
796
797     if (atrans->flags & TRDONE)
798         return UDONE;
799     if (atrans->type != UBIK_WRITETRANS)
800         return UBADTYPE;
801
802     /* first write the data to the log */
803     code = udisk_LogWriteData(atrans->dbase, afile, abuffer, apos, alen);
804     if (code)
805         return code;
806
807     /* expand any truncations of this file */
808     tt = FindTrunc(atrans, afile);
809     if (tt) {
810         if (tt->length < apos + alen) {
811             tt->length = apos + alen;
812         }
813     }
814
815     /* now update vm */
816     totalLen = 0;
817     while (alen > 0) {
818         bp = DRead(atrans, afile, apos >> UBIK_LOGPAGESIZE);
819         if (!bp) {
820             bp = DNew(atrans, afile, apos >> UBIK_LOGPAGESIZE);
821             if (!bp)
822                 return UIOERROR;
823             memset(bp, 0, UBIK_PAGESIZE);
824         }
825         /* otherwise, min of remaining bytes and end of buffer to user mode */
826         offset = apos & (UBIK_PAGESIZE - 1);
827         len = UBIK_PAGESIZE - offset;
828         if (len > alen)
829             len = alen;
830         memcpy(bp + offset, abuffer, len);
831         abuffer = (char *)abuffer + len;
832         apos += len;
833         alen -= len;
834         totalLen += len;
835         DRelease(bp, 1);        /* buffer modified */
836     }
837     return 0;
838 }
839
840 /*!
841  * \brief Begin a new local transaction.
842  */
843 int
844 udisk_begin(struct ubik_dbase *adbase, int atype, struct ubik_trans **atrans)
845 {
846     afs_int32 code;
847     struct ubik_trans *tt;
848
849     *atrans = (struct ubik_trans *)NULL;
850     /* Make sure system is initialized before doing anything */
851     if (!initd) {
852         initd = 1;
853         DInit(ubik_nBuffers);
854     }
855     if (atype == UBIK_WRITETRANS) {
856         if (adbase->flags & DBWRITING)
857             return USYNC;
858         code = udisk_LogOpcode(adbase, LOGNEW, 0);
859         if (code)
860             return code;
861     }
862     tt = (struct ubik_trans *)malloc(sizeof(struct ubik_trans));
863     memset(tt, 0, sizeof(struct ubik_trans));
864     tt->dbase = adbase;
865     tt->next = adbase->activeTrans;
866     adbase->activeTrans = tt;
867     tt->type = atype;
868     if (atype == UBIK_READTRANS)
869         adbase->readers++;
870     else if (atype == UBIK_WRITETRANS)
871         adbase->flags |= DBWRITING;
872     *atrans = tt;
873     return 0;
874 }
875
876 /*!
877  * \brief Commit transaction.
878  */
879 int
880 udisk_commit(struct ubik_trans *atrans)
881 {
882     struct ubik_dbase *dbase;
883     afs_int32 code = 0;
884     struct ubik_version oldversion, newversion;
885
886     if (atrans->flags & TRDONE)
887         return (UTWOENDS);
888
889     if (atrans->type == UBIK_WRITETRANS) {
890         dbase = atrans->dbase;
891
892         /* On the first write to the database. We update the versions */
893         if (ubeacon_AmSyncSite() && !(urecovery_state & UBIK_RECLABELDB)) {
894             oldversion = dbase->version;
895             newversion.epoch = FT_ApproxTime();;
896             newversion.counter = 1;
897
898             code = (*dbase->setlabel) (dbase, 0, &newversion);
899             if (code)
900                 return (code);
901             ubik_epochTime = newversion.epoch;
902             dbase->version = newversion;
903
904             /* Ignore the error here. If the call fails, the site is
905              * marked down and when we detect it is up again, we will
906              * send the entire database to it.
907              */
908             ContactQuorum_DISK_SetVersion( atrans, 1 /*CStampVersion */ ,
909                                            &oldversion, &newversion);
910             urecovery_state |= UBIK_RECLABELDB;
911         }
912
913         dbase->version.counter++;       /* bump commit count */
914 #ifdef AFS_PTHREAD_ENV
915         CV_BROADCAST(&dbase->version_cond);
916 #else
917         LWP_NoYieldSignal(&dbase->version);
918 #endif
919         code = udisk_LogEnd(dbase, &dbase->version);
920         if (code) {
921             dbase->version.counter--;
922             return (code);
923         }
924
925         /* If we fail anytime after this, then panic and let the
926          * recovery replay the log.
927          */
928         code = DFlush(atrans);  /* write dirty pages to respective files */
929         if (code)
930             panic("Writing Ubik DB modifications\n");
931         code = DSync(atrans);   /* sync the files and mark pages not dirty */
932         if (code)
933             panic("Synchronizing Ubik DB modifications\n");
934
935         code = DoTruncs(atrans);        /* Perform requested truncations */
936         if (code)
937             panic("Truncating Ubik DB\n");
938
939         /* label the committed dbase */
940         code = (*dbase->setlabel) (dbase, 0, &dbase->version);
941         if (code)
942             panic("Truncating Ubik DB\n");
943
944         code = (*dbase->truncate) (dbase, LOGFILE, 0);  /* discard log (optional) */
945         if (code)
946             panic("Truncating Ubik logfile\n");
947
948     }
949
950     /* When the transaction is marked done, it also means the logfile
951      * has been truncated.
952      */
953     atrans->flags |= TRDONE;
954     return code;
955 }
956
957 /*!
958  * \brief Abort transaction.
959  */
960 int
961 udisk_abort(struct ubik_trans *atrans)
962 {
963     struct ubik_dbase *dbase;
964     afs_int32 code;
965
966     if (atrans->flags & TRDONE)
967         return UTWOENDS;
968
969     /* Check if we are the write trans before logging abort, lest we
970      * abort a good write trans in progress.
971      * We don't really care if the LOGABORT gets to the log because we
972      * truncate the log next. If the truncate fails, we panic; for
973      * otherwise, the log entries remain. On restart, replay of the log
974      * will do nothing because the abort is there or no LogEnd opcode.
975      */
976     dbase = atrans->dbase;
977     if (atrans->type == UBIK_WRITETRANS && dbase->flags & DBWRITING) {
978         udisk_LogOpcode(dbase, LOGABORT, 1);
979         code = (*dbase->truncate) (dbase, LOGFILE, 0);
980         if (code)
981             panic("Truncating Ubik logfile during an abort\n");
982         DAbort(atrans);         /* remove all dirty pages */
983     }
984
985     /* When the transaction is marked done, it also means the logfile
986      * has been truncated.
987      */
988     atrans->flags |= (TRABORT | TRDONE);
989     return 0;
990 }
991
992 /*!
993  * \brief Destroy a transaction after it has been committed or aborted.
994  *
995  * If it hasn't committed before you call this routine, we'll abort the
996  * transaction for you.
997  */
998 int
999 udisk_end(struct ubik_trans *atrans)
1000 {
1001     struct ubik_dbase *dbase;
1002
1003     if (!(atrans->flags & TRDONE))
1004         udisk_abort(atrans);
1005     dbase = atrans->dbase;
1006
1007     ulock_relLock(atrans);
1008     unthread(atrans);
1009
1010     /* check if we are the write trans before unsetting the DBWRITING bit, else
1011      * we could be unsetting someone else's bit.
1012      */
1013     if (atrans->type == UBIK_WRITETRANS && dbase->flags & DBWRITING) {
1014         dbase->flags &= ~DBWRITING;
1015     } else {
1016         dbase->readers--;
1017     }
1018     if (atrans->iovec_info.iovec_wrt_val)
1019         free(atrans->iovec_info.iovec_wrt_val);
1020     if (atrans->iovec_data.iovec_buf_val)
1021         free(atrans->iovec_data.iovec_buf_val);
1022     free(atrans);
1023
1024     /* Wakeup any writers waiting in BeginTrans() */
1025 #ifdef AFS_PTHREAD_ENV
1026     CV_BROADCAST(&dbase->flags_cond);
1027 #else
1028     LWP_NoYieldSignal(&dbase->flags);
1029 #endif
1030     return 0;
1031 }