613efbf4bc79713ac68b21389d9a042f130c4daf
[openafs.git] / src / mcas / replay.c
1 /******************************************************************************
2  * replay.c
3  *
4  * Replay the log output of search-structure runs.
5  * Must build set_harness.c with DO_WRITE_LOG defined.
6  *
7  * Copyright (c) 2002-2003, K A Fraser
8  *
9
10 Redistribution and use in source and binary forms, with or without
11 modification, are permitted provided that the following conditions are
12 met:
13
14     * Redistributions of source code must retain the above copyright
15     * notice, this list of conditions and the following disclaimer.
16     * Redistributions in binary form must reproduce the above
17     * copyright notice, this list of conditions and the following
18     * disclaimer in the documentation and/or other materials provided
19     * with the distribution.  Neither the name of the Keir Fraser
20     * nor the names of its contributors may be used to endorse or
21     * promote products derived from this software without specific
22     * prior written permission.
23
24 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
25 "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
26 LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
27 A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
28 OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
29 SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
30 LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
31 DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
32 THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
33 (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
34 OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
35  */
36
37 #include <stdio.h>
38 #include <stdlib.h>
39 #include <limits.h>
40 #include <errno.h>
41 #include <stdlib.h>
42 #include <assert.h>
43 #include <sys/types.h>
44 #include <sys/wait.h>
45 #include <sys/stat.h>
46 #include <fcntl.h>
47 #include <unistd.h>
48
49 #include "portable_defns.h"
50
51 #define RMAX_THREADS 256
52 #define VERIFY_ORDERINGS
53
54 #define LOG_REPLAYED     (1<<26)
55 #define LOG_KEY_MASK     0xffffff
56
57 typedef struct log_st
58 {
59     interval_t   start, end;
60     unsigned int data;       /* key, and replay flag */
61     void *val, *old_val;     /* op changed mapping from old_val to val */
62 } log_t;
63
64 #define REPLAYED(_l) ((_l)->data & LOG_REPLAYED)
65
66 static log_t *global_log;
67 static int nr_threads, nr_updates, nr_keys;
68 static int *key_offsets;
69 static int *success;
70 static unsigned int next_key = 0;
71 static pthread_mutex_t key_lock;
72
73
74 /*
75  * GLOBAL LOGS SORTED ON:
76  *  1. Key value
77  *  2. Start time
78  *
79  * Replayer deals with each key value in turn.
80  */
81 static int compare(const void *t1, const void *t2)
82 {
83     const log_t *l1 = t1;
84     const log_t *l2 = t2;
85     const int k1 = l1->data & LOG_KEY_MASK;
86     const int k2 = l2->data & LOG_KEY_MASK;
87
88     if ( k1 < k2 ) return(-1);
89     if ( k1 > k2 ) return(+1);
90
91     if ( l1->start < l2->start ) return(-1);
92
93     return(+1);
94 }
95
96
97 static int do_op(log_t *log, void **key_state)
98 {
99     if ( REPLAYED(log) || (log->old_val != *key_state) ) return(0);
100     *key_state = log->val;
101     log->data |= LOG_REPLAYED;
102     return(1);
103 }
104
105
106 static void undo_op(log_t *log, void **key_state)
107 {
108     assert(REPLAYED(log));
109     log->data &= ~LOG_REPLAYED;
110     *key_state = log->old_val;
111 }
112
113
114 /* Sink down element @pos of @heap. */
115 static void down_heap(log_t **heap, int *heap_offsets, log_t *log, int pos)
116 {
117     int sz = (int)heap[0], nxt;
118     log_t *tmp;
119     while ( (nxt = (pos << 1)) <= sz )
120     {
121         if ( ((nxt+1) <= sz) && (heap[nxt+1]->end < heap[nxt]->end) ) nxt++;
122         if ( heap[nxt]->end > heap[pos]->end ) break;
123         heap_offsets[heap[pos] - log] = nxt;
124         heap_offsets[heap[nxt] - log] = pos;
125         tmp = heap[pos];
126         heap[pos] = heap[nxt];
127         heap[nxt] = tmp;
128         pos = nxt;
129     }
130 }
131
132 /* Float element @pos up @heap. */
133 static void up_heap(log_t **heap, int *heap_offsets, log_t *log, int pos)
134 {
135     log_t *tmp;
136     while ( pos > 1 )
137     {
138         if ( heap[pos]->end > heap[pos>>1]->end ) break;
139         heap_offsets[heap[pos]    - log] = pos >> 1;
140         heap_offsets[heap[pos>>1] - log] = pos;
141         tmp = heap[pos];
142         heap[pos]    = heap[pos>>1];
143         heap[pos>>1] = tmp;
144         pos >>= 1;
145     }
146 }
147
148
149 /* Delete @entry from @heap. */
150 static void remove_entry(log_t **heap, int *heap_offsets,
151                          log_t *log, log_t *entry)
152 {
153     int sz = (int)heap[0];
154     int pos = heap_offsets[entry - log];
155     heap_offsets[heap[sz] - log] = pos;
156     heap[pos] = heap[sz];
157     heap[0] = (void *)(--sz);
158     if ( (pos > 1) && (heap[pos]->end < heap[pos>>1]->end) )
159     {
160         up_heap(heap, heap_offsets, log, pos);
161     }
162     else
163     {
164         down_heap(heap, heap_offsets, log, pos);
165     }
166 }
167
168
169 /* Add new entry @new to @heap. */
170 static void add_entry(log_t **heap, int *heap_offsets, log_t *log, log_t *new)
171 {
172     int sz = (int)heap[0];
173     heap[0] = (void *)(++sz);
174     heap_offsets[new - log] = sz;
175     heap[sz] = new;
176     up_heap(heap, heap_offsets, log, sz);
177 }
178
179
180 /*
181  * This linearisation algorithm is a depth-first search of all feasible
182  * orderings. At each step, the next available operation is selected.
183  * The set of "available" operations is those which:
184  *  (1) have not already been selected on this search path
185  *  (2) are operations whose results are correct given current state
186  *      (eg. a failed delete couldn't be selected if the key is in the set!)
187  *  (3) have start times <= the earliest end time in the set.
188  * (1) ensures that each operation happens only once. (2) ensures that
189  * abstract state is consistent between operations. (3) ensures that time
190  * ordering is conserved.
191  */
192 static int linearise_ops_for_key(
193     log_t *log, int nr_items, log_t **stack,
194     log_t **cutoff_heap, int *heap_offsets, void **key_state)
195 {
196     int i;
197     log_t **sp = stack;
198     interval_t cutoff;
199
200     /* Construct cutoff heap. */
201     cutoff_heap[0] = (void *)nr_items;
202     for ( i = 0; i < nr_items; i++ )
203     {
204         cutoff_heap[i+1] = log + i;
205         heap_offsets[i]  = i+1;
206     }
207     for ( i = nr_items>>1; i > 0; i-- )
208     {
209         down_heap(cutoff_heap, heap_offsets, log, i);
210     }
211
212     cutoff = cutoff_heap[1]->end;
213
214     for ( i = 0; ; )
215     {
216         while ( (i < nr_items) && (log[i].start <= cutoff) )
217         {
218             if ( !do_op(&log[i], key_state) ) { i++; continue; }
219
220             *sp++ = &log[i];
221
222             /* Done? */
223             if ( (sp - stack) == nr_items ) goto success;
224
225             remove_entry(cutoff_heap, heap_offsets, log, &log[i]);
226             cutoff = cutoff_heap[1]->end;
227             i = 0;
228         }
229
230         /* Failure? */
231         if ( (sp - stack) == 0 )
232         {
233             for ( i = -3; i < nr_items + 3; i++ )
234             {
235 #if 1
236                 printf("%08x -> %08x -- %d: %08x -> %08x\n",
237                        (unsigned int)log[i].start,
238                        (unsigned int)log[i].end,
239                        log[i].data & LOG_KEY_MASK,
240                        (unsigned int)log[i].old_val,
241                        (unsigned int)log[i].val);
242 #endif
243             }
244             return(0);
245         }
246
247         i = *--sp - log;
248         undo_op(&log[i], key_state);
249         add_entry(cutoff_heap, heap_offsets, log, &log[i]);
250         cutoff = cutoff_heap[1]->end;
251         i++;
252     }
253
254  success:
255     return(1);
256 }
257
258
259 static void *thread_start(void *arg)
260 {
261     unsigned long tid = (unsigned long)arg;
262     unsigned int our_key;
263     int ch_start, ch_end, start, end, nr_items, *heap_offsets;
264     log_t **stack;
265     log_t **cutoff_heap;
266     interval_t cutoff;
267     void *key_state;
268 #ifdef VERIFY_ORDERINGS
269     int i;
270 #endif
271
272     stack        = malloc((nr_threads*nr_updates+1)*sizeof(log_t*));
273     cutoff_heap  = malloc((nr_threads*nr_updates+1)*sizeof(*cutoff_heap));
274     heap_offsets = malloc((nr_threads*nr_updates+1)*sizeof(*heap_offsets));
275     if ( !stack || !cutoff_heap || !heap_offsets )
276     {
277         fprintf(stderr, "Error allocating space for stacks\n");
278         return(NULL);
279     }
280
281  again:
282     pthread_mutex_lock(&key_lock);
283     our_key = next_key++;
284     pthread_mutex_unlock(&key_lock);
285     if ( our_key >= nr_keys ) goto out;
286
287     start    = key_offsets[our_key];
288     end      = key_offsets[our_key+1];
289     nr_items = end - start;
290
291     printf("[Thread %lu] ++ Linearising key %d (%d events)\n",
292            tid, our_key, nr_items);
293
294 #if 0
295     {
296         int i;
297         for ( i = start; i < end; i++ )
298         {
299             printf("%04d/%04d -- %08x -> %08x -- %d: %08x -> %08x\n",
300                    our_key, i - start,
301                    (unsigned int)global_log[i].start,
302                    (unsigned int)global_log[i].end,
303                    global_log[i].data & LOG_KEY_MASK,
304                    (unsigned int)global_log[i].old_val,
305                    (unsigned int)global_log[i].val);
306         }
307     }
308 #endif
309
310     /*
311      * We divide operations into independent chunks. A chunk is a maximal
312      * sequence of operations, ordered on start time, that does not
313      * overlap with any operation in any other chunk. Clearly, finding
314      * a linearisation for each chunk produces a total schedule.
315      */
316     success[our_key] = 1;
317     key_state        = 0;
318     for ( ch_start = start; ch_start < end; ch_start = ch_end )
319     {
320         cutoff = global_log[ch_start].end;
321         for ( ch_end = ch_start; ch_end < end; ch_end++ )
322         {
323             if ( global_log[ch_end].start > cutoff ) break;
324             if ( global_log[ch_end].end > cutoff )
325                 cutoff = global_log[ch_end].end;
326         }
327
328         /* Linearise chunk ch_start -> ch_end. */
329         success[our_key] = linearise_ops_for_key(
330             &global_log[ch_start],
331             ch_end - ch_start,
332             &stack[ch_start - start],
333             cutoff_heap,
334             heap_offsets,
335             &key_state);
336
337         if ( !success[our_key] )
338         {
339             printf("[Thread %lu] -- Linearisation FAILED for key %d\n",
340                    tid, our_key);
341             goto again;
342         }
343     }
344
345     printf("[Thread %lu] -- Linearisation %s for key %d\n",
346            tid, (success[our_key] ? "found" : "FAILED"), our_key);
347
348 #ifdef VERIFY_ORDERINGS
349     printf("[Thread %lu] ++ Verifying key %d\n", tid, our_key);
350     cutoff    = 0;
351     key_state = 0;
352     for ( i = 0; i < nr_items; i++ )
353     {
354         stack[i]->data &= ~LOG_REPLAYED; /* stop valid_op() from choking */
355         if ( !do_op(stack[i], &key_state) || (stack[i]->end < cutoff) )
356         {
357             int j;
358             fprintf(stderr, "\t*** INTERNAL ERROR: "
359                     "Assigned ordering is invalid!\n");
360             for ( j = (i < 2) ? 0 : (i-2); j < i+6; j++ )
361             {
362                 printf("%08x -> %08x -- %d: %08x -> %08x\n",
363                        (unsigned int)stack[j]->start,
364                        (unsigned int)stack[j]->end,
365                        stack[j]->data & LOG_KEY_MASK,
366                        (unsigned int)stack[j]->old_val,
367                        (unsigned int)stack[j]->val);
368             }
369             exit(-1);
370         }
371         if ( stack[i]->start > cutoff ) cutoff = stack[i]->start;
372     }
373     printf("[Thread %lu] -- Verified key %d\n", tid, our_key);
374 #endif
375
376     goto again;
377
378  out:
379     return(NULL);
380 }
381
382
383 int main(int argc, char **argv)
384 {
385     pthread_t thread[RMAX_THREADS];
386     int fd, i, j, failed = 0, nr_cpus;
387     unsigned long log_header[3];
388
389     if ( argc != 2 )
390     {
391         fprintf(stderr, "%s <log name>\n", argv[0]);
392         exit(1);
393     }
394
395     nr_cpus = (int)sysconf(_SC_NPROCESSORS_ONLN);
396     if ( nr_cpus > RMAX_THREADS ) nr_cpus = RMAX_THREADS;
397
398     if ( (fd = open(argv[1], O_RDONLY, 0)) == -1 )
399     {
400         fprintf(stderr, "Error opening log\n");
401         exit(-1);
402     }
403
404     /* Grok the log header. */
405     read(fd, log_header, sizeof(log_header));
406     nr_threads = log_header[0];
407     nr_updates = log_header[1];
408     nr_keys    = log_header[2];
409     printf("Read log header: nr_updates=%d, nr_threads=%d, nr_keys=%d\n",
410            nr_updates, nr_threads, nr_keys);
411
412     /* Allocate state for processing log entries. */
413     global_log  = malloc((nr_threads*nr_updates+1)*sizeof(log_t));
414     key_offsets = malloc((nr_keys+1)*sizeof(*key_offsets));
415     success     = malloc(nr_keys*sizeof(*success));
416     if ( !global_log || !key_offsets || !success )
417     {
418         fprintf(stderr, "Error allocating space for log\n");
419         exit(-1);
420     }
421
422     /* Read log entries, and sort into key and timestamp order. */
423     read(fd, global_log, nr_threads*nr_updates*sizeof(log_t));
424     global_log[nr_threads*nr_updates].data = LOG_KEY_MASK; /* sentinel */
425
426     printf("Sorting logs..."); fflush(stdout);
427     qsort(global_log, nr_threads*nr_updates, sizeof(log_t), compare);
428     printf(" done\n");
429
430     /* Find offsets of key regions in global table. */
431     key_offsets[0] = 0;
432     nr_keys        = 0;
433     for ( i = 0; i < (nr_threads * nr_updates); i = j )
434     {
435         j = i+1;
436         while ( (global_log[j].data & LOG_KEY_MASK) ==
437                 (global_log[i].data & LOG_KEY_MASK) ) j++;
438         key_offsets[++nr_keys] = j;
439     }
440
441     /* Set up a bunch of worker threads.... */
442     pthread_mutex_init(&key_lock, NULL);
443     for ( i = 0; i < nr_cpus; i++ )
444     {
445         if ( pthread_create(&thread[i], NULL, thread_start, (void *)i) )
446         {
447             fprintf(stderr, "Error creating thread %d (%d)\n", i, errno);
448             exit(1);
449         }
450     }
451
452     /* ...and wait for them all to complete. */
453     for ( i = 0; i < nr_cpus; i++ )
454     {
455         pthread_join(thread[i], NULL);
456     }
457
458     /* Summarise results from worker threads. */
459     for ( i = 0; i < nr_keys; i++ )
460     {
461         if ( success[i] ) continue;
462         printf("FAILED on key %d\n", i);
463         failed++;
464     }
465
466     if ( failed )
467     {
468         printf("Failed on %d keys\n", failed);
469         return(1);
470     }
471
472     printf("All assigned orderings are valid\n");
473     return(0);
474 }