ubik-pause-collapsing-20020624
[openafs.git] / src / ubik / remote.c
1 /*
2  * Copyright 2000, International Business Machines Corporation and others.
3  * All Rights Reserved.
4  * 
5  * This software has been released under the terms of the IBM Public
6  * License.  For details, see the LICENSE file in the top-level source
7  * directory or online at http://www.openafs.org/dl/license10.html
8  */
9
10 #include <afsconfig.h>
11 #include <afs/param.h>
12
13 RCSID("$Header$");
14
15 #include <sys/types.h>
16 #ifdef AFS_NT40_ENV
17 #include <winsock2.h>
18 #else
19 #include <sys/file.h>
20 #include <netinet/in.h>
21 #endif
22 #ifdef HAVE_STRING_H
23 #include <string.h>
24 #else
25 #ifdef HAVE_STRINGS_H
26 #include <strings.h>
27 #endif
28 #endif
29 #include <lock.h>
30 #include <rx/xdr.h>
31 #include <rx/rx.h>
32
33 #define UBIK_INTERNALS
34 #include "ubik.h"
35 #include "ubik_int.h"
36 int (*ubik_CheckRXSecurityProc)();
37 char *ubik_CheckRXSecurityRock;
38 void printServerInfo();
39
40 /* routines for handling requests remotely-submitted by the sync site.  These are
41     only write transactions (we don't propagate read trans), and there is at most one
42     write transaction extant at any one time.
43 */
44
45 struct ubik_trans *ubik_currentTrans = 0;
46
47
48 ubik_CheckAuth(acall)
49 register struct rx_call *acall; {
50     register afs_int32 code;
51     if (ubik_CheckRXSecurityProc) {
52         code = (*ubik_CheckRXSecurityProc)(ubik_CheckRXSecurityRock, acall);
53         return code;
54     }
55     else return 0;
56 }
57
58 /* the rest of these guys handle remote execution of write
59  * transactions: this is the code executed on the other servers when a
60  * sync site is executing a write transaction.
61  */
62 afs_int32 SDISK_Begin(rxcall, atid)
63     register struct rx_call *rxcall;
64     struct ubik_tid *atid;
65 {
66     register afs_int32 code;
67
68     if ((code = ubik_CheckAuth(rxcall))) {
69       return code;
70     }
71     DBHOLD(ubik_dbase);
72     urecovery_CheckTid(atid);
73     if (ubik_currentTrans) {
74         /* If the thread is not waiting for lock - ok to end it */
75 #if !defined(UBIK_PAUSE)
76         if (ubik_currentTrans->locktype != LOCKWAIT) {
77 #endif /* UBIK_PAUSE */
78            udisk_end(ubik_currentTrans);
79 #if !defined(UBIK_PAUSE)
80         }
81 #endif /* UBIK_PAUSE */
82         ubik_currentTrans = (struct ubik_trans *) 0;
83     }
84     code = udisk_begin(ubik_dbase, UBIK_WRITETRANS, &ubik_currentTrans);
85     if (!code && ubik_currentTrans) {
86         /* label this trans with the right trans id */
87         ubik_currentTrans->tid.epoch = atid->epoch;
88         ubik_currentTrans->tid.counter = atid->counter;
89     }
90     DBRELE(ubik_dbase);
91     return code;}
92
93
94 afs_int32 SDISK_Commit(rxcall, atid)
95     register struct rx_call *rxcall;
96     struct ubik_tid *atid;
97 {
98     register afs_int32 code;
99     register struct ubik_dbase *dbase;
100     
101     if ((code = ubik_CheckAuth(rxcall))) {
102       return code;
103     }
104
105     if (!ubik_currentTrans) {
106       return USYNC;
107     }
108     /*
109      * sanity check to make sure only write trans appear here
110      */
111     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
112       return UBADTYPE;
113     }
114
115     dbase = ubik_currentTrans->dbase;
116     DBHOLD(dbase);
117     urecovery_CheckTid(atid);
118     if (!ubik_currentTrans) {
119         DBRELE(dbase);
120         return USYNC;
121     }
122
123     code = udisk_commit(ubik_currentTrans);
124     if (code == 0) {
125         /* sync site should now match */
126         ubik_dbVersion = ubik_dbase->version;
127     }
128     DBRELE(dbase);
129     return code;
130 }
131
132 afs_int32 SDISK_ReleaseLocks(rxcall, atid)
133     register struct rx_call *rxcall;
134     struct ubik_tid *atid;
135 {
136     register struct ubik_dbase *dbase;
137     register afs_int32 code;
138
139     if ((code = ubik_CheckAuth(rxcall))) {
140       return code;
141     }
142
143     if (!ubik_currentTrans) {
144       return USYNC;
145     }
146     /* sanity check to make sure only write trans appear here */
147     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
148       return UBADTYPE;
149     }
150
151     dbase = ubik_currentTrans->dbase;
152     DBHOLD(dbase);
153     urecovery_CheckTid(atid);
154     if (!ubik_currentTrans) {
155         DBRELE(dbase);
156         return USYNC;
157     }
158
159     /* If the thread is not waiting for lock - ok to end it */
160 #if !defined(UBIK_PAUSE)
161     if (ubik_currentTrans->locktype != LOCKWAIT) {
162 #endif /* UBIK_PAUSE */
163         udisk_end(ubik_currentTrans);
164 #if !defined(UBIK_PAUSE)
165     }    
166 #endif /* UBIK_PAUSE */
167     ubik_currentTrans = (struct ubik_trans *) 0;
168     DBRELE(dbase);
169     return 0;
170 }
171
172 afs_int32 SDISK_Abort(rxcall, atid)
173     register struct rx_call *rxcall;
174     struct ubik_tid *atid;
175 {
176     register afs_int32 code;
177     register struct ubik_dbase *dbase;
178     
179     if ((code = ubik_CheckAuth(rxcall))) {
180       return code;
181     }
182
183     if (!ubik_currentTrans) {
184       return USYNC;
185     }
186     /* sanity check to make sure only write trans appear here  */
187     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
188       return UBADTYPE;
189     }
190
191     dbase = ubik_currentTrans->dbase;
192     DBHOLD(dbase);
193     urecovery_CheckTid(atid);
194     if (!ubik_currentTrans) {
195         DBRELE(dbase);
196         return USYNC;
197     }
198
199     code = udisk_abort(ubik_currentTrans);
200     /* If the thread is not waiting for lock - ok to end it */
201 #if !defined(UBIK_PAUSE)
202     if (ubik_currentTrans->locktype != LOCKWAIT) {
203 #endif /* UBIK_PAUSE */
204         udisk_end(ubik_currentTrans);
205 #if !defined(UBIK_PAUSE)
206      }
207 #endif /* UBIK_PAUSE */
208     ubik_currentTrans = (struct ubik_trans *) 0;
209     DBRELE(dbase);
210     return code;
211 }
212
213 afs_int32 SDISK_Lock(rxcall, atid, afile, apos, alen, atype)
214     register struct rx_call *rxcall;
215     struct ubik_tid *atid;
216     afs_int32 afile, apos, alen, atype;   /* apos and alen are not used */
217 {
218     register afs_int32 code;
219     register struct ubik_dbase *dbase;
220     struct ubik_trans *ubik_thisTrans;
221
222     if ((code = ubik_CheckAuth(rxcall))) {
223       return code;
224     }
225     if (!ubik_currentTrans) {
226       return USYNC;
227     }
228     /* sanity check to make sure only write trans appear here */
229     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
230       return UBADTYPE;
231     }
232     if (alen != 1) {
233       return UBADLOCK;
234     }
235     dbase = ubik_currentTrans->dbase;
236     DBHOLD(dbase);
237     urecovery_CheckTid(atid);
238     if (!ubik_currentTrans) {
239         DBRELE(dbase);
240         return USYNC;
241     }
242
243     ubik_thisTrans = ubik_currentTrans;
244     code = ulock_getLock(ubik_currentTrans, atype, 1);
245
246     /* While waiting, the transaction may have been ended/
247      * aborted from under us (urecovery_CheckTid). In that
248      * case, end the transaction here.
249      */
250     if (!code && (ubik_currentTrans != ubik_thisTrans)) {
251        udisk_end(ubik_thisTrans);
252        code = USYNC;
253     }
254
255     DBRELE(dbase);
256     return code;
257 }
258
259 /* Write a vector of data */
260 afs_int32 SDISK_WriteV(rxcall, atid, io_vector, io_buffer)
261     register struct rx_call *rxcall;
262     struct ubik_tid *atid;
263     iovec_wrt *io_vector;
264     iovec_buf *io_buffer;
265 {
266     afs_int32 code, i, offset;
267     struct ubik_dbase *dbase;
268     struct ubik_iovec *iovec;
269     char              *iobuf;
270
271     if ((code = ubik_CheckAuth(rxcall))) {
272       return code;
273     }
274     if (!ubik_currentTrans) {
275       return USYNC;
276     }
277     /* sanity check to make sure only write trans appear here */
278     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
279       return UBADTYPE;
280     }
281
282     dbase = ubik_currentTrans->dbase;
283     DBHOLD(dbase);
284     urecovery_CheckTid(atid);
285     if (!ubik_currentTrans) {
286         DBRELE(dbase);
287         return USYNC;
288     }
289
290     iovec = (struct ubik_iovec *)io_vector->iovec_wrt_val;
291     iobuf = (char *)io_buffer->iovec_buf_val;
292     for (i=0, offset=0; i<io_vector->iovec_wrt_len; i++) {
293        /* Sanity check for going off end of buffer */
294        if ((offset + iovec[i].length) > io_buffer->iovec_buf_len) {
295           code = UINTERNAL;
296        } else {
297           code = udisk_write(ubik_currentTrans, iovec[i].file,     &iobuf[offset],
298                                                 iovec[i].position, iovec[i].length);
299        }
300        if (code) break;
301
302        offset += iovec[i].length;
303     }
304
305     DBRELE(dbase);
306     return code;
307 }
308
309 afs_int32 SDISK_Write(rxcall, atid, afile, apos, adata)
310     register struct rx_call *rxcall;
311     struct ubik_tid *atid;
312     afs_int32 afile, apos;
313     register bulkdata *adata; 
314 {
315     register afs_int32 code;
316     register struct ubik_dbase *dbase;
317
318     if ((code = ubik_CheckAuth(rxcall))) {
319       return code;
320     }
321     if (!ubik_currentTrans) {
322       return USYNC;
323     }
324     /* sanity check to make sure only write trans appear here */
325     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
326       return UBADTYPE;
327     }
328
329     dbase = ubik_currentTrans->dbase;
330     DBHOLD(dbase);
331     urecovery_CheckTid(atid);
332     if (!ubik_currentTrans) {
333         DBRELE(dbase);
334         return USYNC;
335     }
336     code = udisk_write(ubik_currentTrans, afile, adata->bulkdata_val, apos, adata->bulkdata_len);
337     DBRELE(dbase);
338     return code;
339 }
340
341 afs_int32 SDISK_Truncate(rxcall, atid, afile, alen)
342     register struct rx_call *rxcall;
343     struct ubik_tid *atid;
344     afs_int32 afile;
345     afs_int32 alen; 
346 {
347     register afs_int32 code;
348     register struct ubik_dbase *dbase;
349
350     if ((code = ubik_CheckAuth(rxcall))) {
351       return code;
352     }
353     if (!ubik_currentTrans) { 
354       return USYNC;
355     }
356     /* sanity check to make sure only write trans appear here */
357     if (ubik_currentTrans->type != UBIK_WRITETRANS) {
358       return UBADTYPE;
359     }
360
361     dbase = ubik_currentTrans->dbase;
362     DBHOLD(dbase);
363     urecovery_CheckTid(atid);
364     if (!ubik_currentTrans) {
365         DBRELE(dbase);
366         return USYNC;
367     }
368     code = udisk_truncate(ubik_currentTrans, afile, alen);
369     DBRELE(dbase);
370     return code;
371 }
372
373 afs_int32 SDISK_GetVersion(rxcall, aversion)
374     register struct rx_call *rxcall;
375     register struct ubik_version *aversion; 
376 {
377     register afs_int32 code;
378
379     if ((code = ubik_CheckAuth(rxcall))) {
380       return code;
381     }
382
383     /*
384      * If we are the sync site, recovery shouldn't be running on any
385      * other site. We shouldn't be getting this RPC as long as we are
386      * the sync site.  To prevent any unforseen activity, we should
387      * reject this RPC until we have recognized that we are not the
388      * sync site anymore, and/or if we have any pending WRITE
389      * transactions that have to complete. This way we can be assured
390      * that this RPC would not block any pending transactions that
391      * should either fail or pass. If we have recognized the fact that
392      * we are not the sync site any more, all write transactions would
393      * fail with UNOQUORUM anyway.
394      */
395     if (ubeacon_AmSyncSite()) {
396       return UDEADLOCK;
397     }
398
399     DBHOLD(ubik_dbase);
400     code = (*ubik_dbase->getlabel) (ubik_dbase, 0, aversion);
401     DBRELE(ubik_dbase);
402     if (code) {
403         /* tell other side there's no dbase */
404         aversion->epoch = 0;
405         aversion->counter = 0;
406     }
407     return 0;
408 }
409
410 afs_int32 SDISK_GetFile(rxcall, file, version)
411     register struct rx_call *rxcall;
412     register afs_int32 file;
413     struct ubik_version *version;
414 {
415     register afs_int32 code;
416     register struct ubik_dbase *dbase;
417     register afs_int32 offset;
418     struct ubik_stat ubikstat;
419     char tbuffer[256];
420     afs_int32 tlen;
421     afs_int32 length;
422     
423     if ((code = ubik_CheckAuth(rxcall))) {
424       return code;
425     }
426 /* temporarily disabled because it causes problems for migration tool.  Hey, it's just
427  * a sanity check, anyway. 
428     if (ubeacon_AmSyncSite()) {
429       return UDEADLOCK;
430     }
431 */
432     dbase = ubik_dbase;
433     DBHOLD(dbase);
434     code = (*dbase->stat) (dbase, file, &ubikstat);
435     if (code < 0) {
436         DBRELE(dbase);
437         return code;
438     }
439     length = ubikstat.size;
440     tlen = htonl(length);
441     code = rx_Write(rxcall, &tlen, sizeof(afs_int32));
442     if (code != sizeof(afs_int32)) {
443         DBRELE(dbase);
444         return BULK_ERROR;
445     }
446     offset = 0;
447     while (length > 0) {
448         tlen = (length > sizeof(tbuffer) ? sizeof(tbuffer) : length);
449         code = (*dbase->read)(dbase, file, tbuffer, offset, tlen);
450         if (code != tlen) {
451             DBRELE(dbase);
452             return UIOERROR;
453         }
454         code = rx_Write(rxcall, tbuffer, tlen);
455         if (code != tlen) {
456             DBRELE(dbase);
457             return BULK_ERROR;
458         }
459         length -= tlen;
460         offset += tlen;
461     }
462     code = (*dbase->getlabel)(dbase, file, version);    /* return the dbase, too */
463     DBRELE(dbase);
464     return code;
465 }
466
467 afs_int32 SDISK_SendFile(rxcall, file, length, avers)
468     register struct rx_call *rxcall;
469     afs_int32 file;
470     afs_int32 length;
471     struct ubik_version *avers;
472 {
473     register afs_int32 code;
474     register struct ubik_dbase *dbase;
475     char tbuffer[256];
476     afs_int32 offset;
477     struct ubik_version tversion;
478     register int tlen;
479     struct rx_peer *tpeer;
480     struct rx_connection *tconn;
481     afs_uint32  otherHost;
482
483     /* send the file back to the requester */
484     
485     if ((code = ubik_CheckAuth(rxcall))) {
486       goto failed;
487     }
488
489     /* next, we do a sanity check to see if the guy sending us the database is
490      * the guy we think is the sync site.  It turns out that we might not have
491      * decided yet that someone's the sync site, but they could have enough
492      * votes from others to be sync site anyway, and could send us the database
493      * in advance of getting our votes.  This is fine, what we're really trying
494      * to check is that some authenticated bogon isn't sending a random database
495      * into another configuration.  This could happen on a bad configuration
496      * screwup.  Thus, we only object if we're sure we know who the sync site
497      * is, and it ain't the guy talking to us.
498      */
499     offset = uvote_GetSyncSite();
500     tconn = rx_ConnectionOf(rxcall);
501     tpeer = rx_PeerOf(tconn);
502     otherHost = ubikGetPrimaryInterfaceAddr(rx_HostOf(tpeer));
503     if (offset && offset != otherHost ) {
504                         /* we *know* this is the wrong guy */
505         code = USYNC;
506         goto failed;
507     }
508
509     dbase = ubik_dbase;
510     DBHOLD(dbase);
511
512     /* abort any active trans that may scribble over the database */
513     urecovery_AbortAll(dbase);
514
515     ubik_print("Ubik: Synchronize database with server %s\n",
516                afs_inet_ntoa(otherHost));
517
518     offset = 0;
519     (*dbase->truncate) (dbase, file, 0);                /* truncate first */
520     tversion.epoch = 0; /* start off by labelling in-transit db as invalid */
521     tversion.counter = 0;
522     (*dbase->setlabel) (dbase, file, &tversion);        /* setlabel does sync */
523     while (length > 0) {
524         tlen = (length > sizeof(tbuffer) ? sizeof(tbuffer) : length);
525         code = rx_Read(rxcall, tbuffer, tlen);
526         if (code != tlen) {
527             DBRELE(dbase);
528             code = BULK_ERROR;
529             goto failed;
530         }
531         code = (*dbase->write)(dbase, file, tbuffer, offset, tlen);
532         if (code != tlen) {
533             DBRELE(dbase);
534             code = UIOERROR;
535             goto failed;
536         }
537         offset += tlen;
538         length -= tlen;
539     }
540
541     /* sync data first, then write label and resync (resync done by setlabel call).
542        This way, good label is only on good database. */
543     (*ubik_dbase->sync)(dbase, file);
544     code = (*ubik_dbase->setlabel)(dbase, file, avers);
545     memcpy(&ubik_dbase->version, avers, sizeof(struct ubik_version));
546     udisk_Invalidate(dbase, file); /* new dbase, flush disk buffers */
547     LWP_NoYieldSignal(&dbase->version);
548     DBRELE(dbase);
549 failed:
550     if (code) {
551        ubik_print("Ubik: Synchronize database with server %s failed (error = %d)\n", 
552                   afs_inet_ntoa(otherHost), code);
553     } else {
554        ubik_print("Ubik: Synchronize database completed\n");
555     }
556     return code;
557 }
558
559
560 afs_int32 SDISK_Probe(rxcall)
561     register struct rx_call *rxcall;
562 {
563     return 0;
564 }
565
566 /*
567 * Update remote machines addresses in my server list
568 * Send back my addresses to caller of this RPC
569 * Returns zero on success, else 1.
570 */
571 afs_int32 SDISK_UpdateInterfaceAddr(rxcall, inAddr, outAddr)
572 register struct rx_call *rxcall;
573 UbikInterfaceAddr       *inAddr, *outAddr;
574 {
575     struct ubik_server *ts, *tmp;
576     afs_uint32 remoteAddr;              /* in net byte order */
577     int    i, j, found=0, probableMatch=0;
578
579     /* copy the output parameters */
580     for ( i=0; i < UBIK_MAX_INTERFACE_ADDR; i++)
581         outAddr->hostAddr[i] = ntohl(ubik_host[i]);
582
583     remoteAddr = htonl(inAddr->hostAddr[0]);
584     for(ts = ubik_servers; ts; ts=ts->next)
585         if ( ts->addr[0] == remoteAddr ) /* both in net byte order */
586         {
587             probableMatch = 1;
588             break;
589         }
590
591     if ( probableMatch )                
592     {
593         /* verify that all addresses in the incoming RPC are
594         ** not part of other server entries in my CellServDB
595         */
596         for ( i=0; !found && (i<UBIK_MAX_INTERFACE_ADDR) 
597                                 && inAddr->hostAddr[i]; i++)
598         {
599             remoteAddr = htonl(inAddr->hostAddr[i]);
600             for(tmp = ubik_servers; (!found && tmp); tmp=tmp->next)
601             {
602                 if ( ts == tmp )        /* this is my server */ 
603                         continue;
604                 for ( j=0; (j<UBIK_MAX_INTERFACE_ADDR) && tmp->addr[j]; j++)
605                     if ( remoteAddr == tmp->addr[j] )
606                     {
607                         found = 1;
608                         break;
609                     }
610             }
611         }
612     }           /* if (probableMatch) */
613     
614     /* inconsistent addresses in CellServDB */
615     if ( !probableMatch || found )      
616     {
617         ubik_print("Inconsistent Cell Info from server: ");
618         for ( i=0; i < UBIK_MAX_INTERFACE_ADDR && inAddr->hostAddr[i]; i++)
619             ubik_print("%s ", afs_inet_ntoa(htonl(inAddr->hostAddr[i])));
620         ubik_print("\n");
621         fflush(stdout); fflush(stderr);
622         printServerInfo();
623         return UBADHOST;
624     }
625
626     /* update our data structures */
627     for ( i=1; i < UBIK_MAX_INTERFACE_ADDR; i++)
628             ts->addr[i] = htonl(inAddr->hostAddr[i]);
629     
630     ubik_print("ubik: A Remote Server has addresses: ");
631     for ( i=0; i < UBIK_MAX_INTERFACE_ADDR && ts->addr[i]; i++)
632         ubik_print("%s ", afs_inet_ntoa(ts->addr[i]));
633     ubik_print("\n");
634
635     return 0;
636 }
637
638 void
639 printServerInfo()
640 {
641     struct ubik_server *ts;
642     int         i,j=1;
643
644     ubik_print("Local CellServDB:");
645     for ( ts=ubik_servers; ts; ts= ts->next, j++)
646     {
647         ubik_print("Server %d: ", j);
648         for ( i=0; (i<UBIK_MAX_INTERFACE_ADDR) && ts->addr[i]; i++)
649             ubik_print("%s ", afs_inet_ntoa(ts->addr[i]));
650     }
651     ubik_print("\n");
652 }
653
654 afs_int32 SDISK_SetVersion(rxcall, atid, oldversionp, newversionp)
655   struct rx_call      *rxcall;
656   struct ubik_tid     *atid;
657   struct ubik_version *oldversionp; 
658   struct ubik_version *newversionp; 
659 {
660   afs_int32               code=0;
661   struct ubik_dbase   *dbase;
662
663   if ((code = ubik_CheckAuth(rxcall))) {
664      return(code);
665   }
666
667   if (!ubik_currentTrans) {
668      return USYNC;
669   }
670   /* sanity check to make sure only write trans appear here */
671   if (ubik_currentTrans->type != UBIK_WRITETRANS) {
672      return UBADTYPE;
673   }
674
675   /* Should not get this for the sync site */
676   if (ubeacon_AmSyncSite()) {
677      return UDEADLOCK;
678   }
679
680   dbase = ubik_currentTrans->dbase;
681   DBHOLD(dbase);
682   urecovery_CheckTid(atid);
683   if (!ubik_currentTrans) {
684      DBRELE(dbase);
685      return USYNC;
686   }
687
688   /* Set the label if its version matches the sync-site's */
689   if ((oldversionp->epoch   == ubik_dbVersion.epoch) &&
690       (oldversionp->counter == ubik_dbVersion.counter)) {
691      code = (*dbase->setlabel) (ubik_dbase, 0, newversionp);
692      if (!code) {
693         ubik_dbase->version = *newversionp;
694         ubik_dbVersion      = *newversionp;
695      }
696   } else {
697      code = USYNC;
698   }
699
700   DBRELE(dbase);
701   return code;
702 }