1 /* $OpenBSD: rf_engine.c,v 1.15 2003/04/27 11:22:54 ho Exp $ */
2 /* $NetBSD: rf_engine.c,v 1.10 2000/08/20 16:51:03 thorpej Exp $ */
3
4 /*
5 * Copyright (c) 1995 Carnegie-Mellon University.
6 * All rights reserved.
7 *
8 * Author: William V. Courtright II, Mark Holland, Rachad Youssef
9 *
10 * Permission to use, copy, modify and distribute this software and
11 * its documentation is hereby granted, provided that both the copyright
12 * notice and this permission notice appear in all copies of the
13 * software, derivative works or modified versions, and any portions
14 * thereof, and that both notices appear in supporting documentation.
15 *
16 * CARNEGIE MELLON ALLOWS FREE USE OF THIS SOFTWARE IN ITS "AS IS"
17 * CONDITION. CARNEGIE MELLON DISCLAIMS ANY LIABILITY OF ANY KIND
18 * FOR ANY DAMAGES WHATSOEVER RESULTING FROM THE USE OF THIS SOFTWARE.
19 *
20 * Carnegie Mellon requests users of this software to return to
21 *
22 * Software Distribution Coordinator or Software.Distribution@CS.CMU.EDU
23 * School of Computer Science
24 * Carnegie Mellon University
25 * Pittsburgh PA 15213-3890
26 *
27 * any improvements or extensions that they make and grant Carnegie the
28 * rights to redistribute these changes.
29 */
30
31 /****************************************************************************
32 * *
33 * engine.c -- Code for DAG execution engine. *
34 * *
35 * Modified to work as follows (holland): *
36 * A user-thread calls into DispatchDAG, which fires off the nodes that *
37 * are direct successors to the header node. DispatchDAG then returns, *
38 * and the rest of the I/O continues asynchronously. As each node *
39 * completes, the node execution function calls FinishNode(). FinishNode *
40 * scans the list of successors to the node and increments the antecedent *
41 * counts. Each node that becomes enabled is placed on a central node *
42 * queue. A dedicated dag-execution thread grabs nodes off of this *
43 * queue and fires them. *
44 * *
45 * NULL nodes are never fired. *
46 * *
47 * Terminator nodes are never fired, but rather cause the callback *
48 * associated with the DAG to be invoked. *
49 * *
50 * If a node fails, the dag either rolls forward to the completion or *
51 * rolls back, undoing previously-completed nodes and fails atomically. *
52 * The direction of recovery is determined by the location of the failed *
53 * node in the graph. If the failure occurred before the commit node in *
54 * the graph, backward recovery is used. Otherwise, forward recovery is *
55 * used. *
56 * *
57 ****************************************************************************/
58
59 #include "rf_threadstuff.h"
60
61 #include <sys/errno.h>
62
63 #include "rf_dag.h"
64 #include "rf_engine.h"
65 #include "rf_etimer.h"
66 #include "rf_general.h"
67 #include "rf_dagutils.h"
68 #include "rf_shutdown.h"
69 #include "rf_raid.h"
70
/*
 * Prototypes for the DAG-engine helpers that are defined further down in
 * this file.
 */
int rf_BranchDone(RF_DagNode_t *);
int rf_NodeReady(RF_DagNode_t *);
void rf_FireNode(RF_DagNode_t *);
void rf_FireNodeArray(int, RF_DagNode_t **);
void rf_FireNodeList(RF_DagNode_t *);
void rf_PropagateResults(RF_DagNode_t *, int);
void rf_ProcessNode(RF_DagNode_t *, int);

/* Body of the per-array engine thread. */
void rf_DAGExecutionThread(RF_ThreadArg_t);
#ifdef RAID_AUTOCONFIG
/*
 * Base value used by rf_DAGExecutionThread_pre() when it temporarily
 * overrides lastpid (RF_ENGINE_PID + raidid - 1) around thread creation.
 */
#define RF_ENGINE_PID 10
void rf_DAGExecutionThread_pre(RF_ThreadArg_t);
extern pid_t lastpid;
#endif /* RAID_AUTOCONFIG */
/*
 * Startup/shutdown hook cookies, indexed by raidid.  The array (numraid
 * entries) is allocated lazily by rf_ConfigureEngine().
 */
void **rf_hook_cookies;
extern int numraid;
87
/*
 * DO_INIT(_l_, _r_): create the managed mutex and the managed condition
 * variable stored in (_r_)->node_queue_mutex and (_r_)->node_queue_cond,
 * handing _l_ to both creation routines.
 *
 * NOTE: this macro contains `return' statements.  If either creation call
 * yields a non-zero code, the *calling function* returns that code
 * immediately, so it may only be used inside functions returning int.
 */
#define	DO_INIT(_l_,_r_)						\
do {									\
	int _rc;							\
	_rc = rf_create_managed_mutex(_l_, &(_r_)->node_queue_mutex);	\
	if (_rc) {							\
		return(_rc);						\
	}								\
	_rc = rf_create_managed_cond(_l_, &(_r_)->node_queue_cond);	\
	if (_rc) {							\
		return(_rc);						\
	}								\
} while (0)
100
/*
 * Synchronization primitives for this file.  DO_WAIT should be enclosed
 * in a while loop that re-tests the awaited condition.
 *
 * DO_LOCK and DO_UNLOCK store and restore the value returned by splbio()
 * in a variable named `ks'; every function that uses them must declare
 * a local `int ks'.
 */

/*
 * XXX Is this spl-ing really necessary ?
 */
/* Raise to splbio (saving the old level in ks), then take the queue mutex. */
#define	DO_LOCK(_r_)							\
do {									\
	ks = splbio();							\
	RF_LOCK_MUTEX((_r_)->node_queue_mutex);				\
} while (0)

/* Release the queue mutex, then restore the level saved by DO_LOCK. */
#define	DO_UNLOCK(_r_)							\
do {									\
	RF_UNLOCK_MUTEX((_r_)->node_queue_mutex);			\
	splx(ks);							\
} while (0)

/*
 * Wait for / wake up the node queue.
 * NOTE(review): both macros pass (_r_)->node_queue rather than the
 * node_queue_cond object created by DO_INIT -- presumably the underlying
 * RF_*_COND macros only use the argument as a sleep channel; confirm
 * against rf_threadstuff.h.
 */
#define	DO_WAIT(_r_)							\
	RF_WAIT_COND((_r_)->node_queue, (_r_)->node_queue_mutex)

/* XXX RF_SIGNAL_COND? */
#define	DO_SIGNAL(_r_)							\
	RF_BROADCAST_COND((_r_)->node_queue)
127
128 void rf_ShutdownEngine(void *);
129
130 void
131 rf_ShutdownEngine(void *arg)
132 {
133 RF_Raid_t *raidPtr;
134
135 raidPtr = (RF_Raid_t *) arg;
136 raidPtr->shutdown_engine = 1;
137 DO_SIGNAL(raidPtr);
138 }
139
140 int
141 rf_ConfigureEngine(RF_ShutdownList_t **listp, RF_Raid_t *raidPtr,
142 RF_Config_t *cfgPtr)
143 {
144 int rc;
145 char raidname[16];
146
147 DO_INIT(listp, raidPtr);
148
149 raidPtr->node_queue = NULL;
150 raidPtr->dags_in_flight = 0;
151
152 rc = rf_init_managed_threadgroup(listp, &raidPtr->engine_tg);
153 if (rc)
154 return (rc);
155
156 /*
157 * We create the execution thread only once per system boot. No need
158 * to check return code b/c the kernel panics if it can't create the
159 * thread.
160 */
161 if (rf_engineDebug) {
162 printf("raid%d: %s engine thread\n", raidPtr->raidid,
163 (initproc)?"Starting":"Creating");
164 }
165 if (rf_hook_cookies == NULL) {
166 rf_hook_cookies =
167 malloc(numraid * sizeof(void *),
168 M_RAIDFRAME, M_NOWAIT);
169 if (rf_hook_cookies == NULL)
170 return (ENOMEM);
171 bzero(rf_hook_cookies, numraid * sizeof(void *));
172 }
173 #ifdef RAID_AUTOCONFIG
174 if (initproc == NULL) {
175 rf_hook_cookies[raidPtr->raidid] =
176 startuphook_establish(rf_DAGExecutionThread_pre,
177 raidPtr);
178 } else {
179 #endif /* RAID_AUTOCONFIG */
180 snprintf(&raidname[0], 16, "raid%d", raidPtr->raidid);
181 if (RF_CREATE_THREAD(raidPtr->engine_thread,
182 rf_DAGExecutionThread, raidPtr, &raidname[0])) {
183 RF_ERRORMSG("RAIDFRAME: Unable to start engine"
184 " thread\n");
185 return (ENOMEM);
186 }
187 if (rf_engineDebug) {
188 printf("raid%d: Engine thread started\n",
189 raidPtr->raidid);
190 }
191 RF_THREADGROUP_STARTED(&raidPtr->engine_tg);
192 #ifdef RAID_AUTOCONFIG
193 }
194 #endif
195 /* XXX Something is missing here... */
196 #ifdef debug
197 printf("Skipping the WAIT_START !!!\n");
198 #endif
199 /* Engine thread is now running and waiting for work. */
200 if (rf_engineDebug) {
201 printf("raid%d: Engine thread running and waiting for events\n",
202 raidPtr->raidid);
203 }
204 rc = rf_ShutdownCreate(listp, rf_ShutdownEngine, raidPtr);
205 if (rc) {
206 RF_ERRORMSG3("Unable to add to shutdown list file %s line %d"
207 " rc=%d\n", __FILE__, __LINE__, rc);
208 rf_ShutdownEngine(NULL);
209 }
210 return (rc);
211 }
212
213 int
214 rf_BranchDone(RF_DagNode_t *node)
215 {
216 int i;
217
218 /*
219 * Return true if forward execution is completed for a node and it's
220 * succedents.
221 */
222 switch (node->status) {
223 case rf_wait:
224 /* Should never be called in this state. */
225 RF_PANIC();
226 break;
227 case rf_fired:
228 /* Node is currently executing, so we're not done. */
229 return (RF_FALSE);
230 case rf_good:
231 /* For each succedent. */
232 for (i = 0; i < node->numSuccedents; i++)
233 /* Recursively check branch. */
234 if (!rf_BranchDone(node->succedents[i]))
235 return RF_FALSE;
236
237 return RF_TRUE; /*
238 * Node and all succedent branches aren't in
239 * fired state.
240 */
241 break;
242 case rf_bad:
243 /* Succedents can't fire. */
244 return (RF_TRUE);
245 case rf_recover:
246 /* Should never be called in this state. */
247 RF_PANIC();
248 break;
249 case rf_undone:
250 case rf_panic:
251 /* XXX Need to fix this case. */
252 /* For now, assume that we're done. */
253 return (RF_TRUE);
254 break;
255 default:
256 /* Illegal node status. */
257 RF_PANIC();
258 break;
259 }
260 }
261
262 int
263 rf_NodeReady(RF_DagNode_t *node)
264 {
265 int ready;
266
267 switch (node->dagHdr->status) {
268 case rf_enable:
269 case rf_rollForward:
270 if ((node->status == rf_wait) &&
271 (node->numAntecedents == node->numAntDone))
272 ready = RF_TRUE;
273 else
274 ready = RF_FALSE;
275 break;
276 case rf_rollBackward:
277 RF_ASSERT(node->numSuccDone <= node->numSuccedents);
278 RF_ASSERT(node->numSuccFired <= node->numSuccedents);
279 RF_ASSERT(node->numSuccFired <= node->numSuccDone);
280 if ((node->status == rf_good) &&
281 (node->numSuccDone == node->numSuccedents))
282 ready = RF_TRUE;
283 else
284 ready = RF_FALSE;
285 break;
286 default:
287 printf("Execution engine found illegal DAG status"
288 " in rf_NodeReady\n");
289 RF_PANIC();
290 break;
291 }
292
293 return (ready);
294 }
295
296
297 /*
298 * User context and dag-exec-thread context:
299 * Fire a node. The node's status field determines which function, do or undo,
300 * to be fired.
301 * This routine assumes that the node's status field has alread been set to
302 * "fired" or "recover" to indicate the direction of execution.
303 */
304 void
305 rf_FireNode(RF_DagNode_t *node)
306 {
307 switch (node->status) {
308 case rf_fired:
309 /* Fire the do function of a node. */
310 if (rf_engineDebug>1) {
311 printf("raid%d: Firing node 0x%lx (%s)\n",
312 node->dagHdr->raidPtr->raidid,
313 (unsigned long) node, node->name);
314 }
315 if (node->flags & RF_DAGNODE_FLAG_YIELD) {
316 #if (defined(__NetBSD__) || defined(__OpenBSD__)) && defined(_KERNEL)
317 /* thread_block(); */
318 /* printf("Need to block the thread here...\n"); */
319 /*
320 * XXX thread_block is actually mentioned in
321 * /usr/include/vm/vm_extern.h
322 */
323 #else
324 thread_block();
325 #endif
326 }
327 (*(node->doFunc)) (node);
328 break;
329 case rf_recover:
330 /* Fire the undo function of a node. */
331 if (rf_engineDebug>1) {
332 printf("raid%d: Firing (undo) node 0x%lx (%s)\n",
333 node->dagHdr->raidPtr->raidid,
334 (unsigned long) node, node->name);
335 }
336 if (node->flags & RF_DAGNODE_FLAG_YIELD) {
337 #if (defined(__NetBSD__) || defined(__OpenBSD__)) && defined(_KERNEL)
338 /* thread_block(); */
339 /* printf("Need to block the thread here...\n"); */
340 /*
341 * XXX thread_block is actually mentioned in
342 * /usr/include/vm/vm_extern.h
343 */
344 #else
345 thread_block();
346 #endif
347 }
348 (*(node->undoFunc)) (node);
349 break;
350 default:
351 RF_PANIC();
352 break;
353 }
354 }
355
356
357 /*
358 * User context:
359 * Attempt to fire each node in a linear array.
360 * The entire list is fired atomically.
361 */
362 void
363 rf_FireNodeArray(int numNodes, RF_DagNode_t **nodeList)
364 {
365 RF_DagStatus_t dstat;
366 RF_DagNode_t *node;
367 int i, j;
368
369 /* First, mark all nodes which are ready to be fired. */
370 for (i = 0; i < numNodes; i++) {
371 node = nodeList[i];
372 dstat = node->dagHdr->status;
373 RF_ASSERT((node->status == rf_wait) ||
374 (node->status == rf_good));
375 if (rf_NodeReady(node)) {
376 if ((dstat == rf_enable) || (dstat == rf_rollForward)) {
377 RF_ASSERT(node->status == rf_wait);
378 if (node->commitNode)
379 node->dagHdr->numCommits++;
380 node->status = rf_fired;
381 for (j = 0; j < node->numAntecedents; j++)
382 node->antecedents[j]->numSuccFired++;
383 } else {
384 RF_ASSERT(dstat == rf_rollBackward);
385 RF_ASSERT(node->status == rf_good);
386 /* Only one commit node per graph. */
387 RF_ASSERT(node->commitNode == RF_FALSE);
388 node->status = rf_recover;
389 }
390 }
391 }
392 /* Now, fire the nodes. */
393 for (i = 0; i < numNodes; i++) {
394 if ((nodeList[i]->status == rf_fired) ||
395 (nodeList[i]->status == rf_recover))
396 rf_FireNode(nodeList[i]);
397 }
398 }
399
400
401 /*
402 * User context:
403 * Attempt to fire each node in a linked list.
404 * The entire list is fired atomically.
405 */
406 void
407 rf_FireNodeList(RF_DagNode_t *nodeList)
408 {
409 RF_DagNode_t *node, *next;
410 RF_DagStatus_t dstat;
411 int j;
412
413 if (nodeList) {
414 /* First, mark all nodes which are ready to be fired. */
415 for (node = nodeList; node; node = next) {
416 next = node->next;
417 dstat = node->dagHdr->status;
418 RF_ASSERT((node->status == rf_wait) ||
419 (node->status == rf_good));
420 if (rf_NodeReady(node)) {
421 if ((dstat == rf_enable) ||
422 (dstat == rf_rollForward)) {
423 RF_ASSERT(node->status == rf_wait);
424 if (node->commitNode)
425 node->dagHdr->numCommits++;
426 node->status = rf_fired;
427 for (j = 0; j < node->numAntecedents;
428 j++)
429 node->antecedents[j]
430 ->numSuccFired++;
431 } else {
432 RF_ASSERT(dstat == rf_rollBackward);
433 RF_ASSERT(node->status == rf_good);
434 /* Only one commit node per graph. */
435 RF_ASSERT(node->commitNode == RF_FALSE);
436 node->status = rf_recover;
437 }
438 }
439 }
440 /* Now, fire the nodes. */
441 for (node = nodeList; node; node = next) {
442 next = node->next;
443 if ((node->status == rf_fired) ||
444 (node->status == rf_recover))
445 rf_FireNode(node);
446 }
447 }
448 }
449
450
/*
 * Interrupt context:
 * For each succedent,
 *    propagate required results from node to succedent.
 *    increment succedent's numAntDone.
 *    place newly-enabled nodes on node queue for firing.
 * (When the DAG is rolling backward, the antecedents are walked instead.)
 *
 * node		- the node that has just completed.
 * context	- RF_INTR_CONTEXT makes newly-enabled nodes go onto
 *		  raidPtr->node_queue for the engine thread; any other value
 *		  makes this function fire them itself after unlocking.
 *
 * To save context switches, we don't place NIL nodes on the node queue,
 * but rather just process them as if they had fired.  Note that NIL nodes
 * that are the direct successors of the header will actually get fired by
 * DispatchDAG, which is fine because no context switches are involved.
 *
 * Important: when running at user level, this can be called by any
 * disk thread, and so the increment and check of the antecedent count
 * must be locked.  I used the node queue mutex and locked down the
 * entire function, but this is certainly overkill.
 *
 * All temporary lists built here (finishlist, skiplist, firelist and the
 * q/qh chain) are threaded through the nodes' own `next' field.
 */
void
rf_PropagateResults(RF_DagNode_t *node, int context)
{
	RF_DagNode_t *s, *a;
	RF_Raid_t *raidPtr;
	int i, ks;			/* ks is used by DO_LOCK/DO_UNLOCK. */
	/* A list of NIL nodes to be finished. */
	RF_DagNode_t *finishlist = NULL;
	/* List of nodes with failed truedata antecedents. */
	RF_DagNode_t *skiplist = NULL;
	RF_DagNode_t *firelist = NULL;	/* A list of nodes to be fired. */
	/* qh/q: head and tail of the chain destined for the node queue. */
	RF_DagNode_t *q = NULL, *qh = NULL, *next;
	int j, skipNode;

	raidPtr = node->dagHdr->raidPtr;

	DO_LOCK(raidPtr);

	/*
	 * Validate fire counts (debug assertions) and record, in each
	 * antecedent, that one more of its succedents is done.
	 */
	for (i = 0; i < node->numAntecedents; i++) {
		a = *(node->antecedents + i);
		RF_ASSERT(a->numSuccFired >= a->numSuccDone);
		RF_ASSERT(a->numSuccFired <= a->numSuccedents);
		a->numSuccDone++;
	}

	switch (node->dagHdr->status) {
	case rf_enable:
	case rf_rollForward:
		/* Forward execution: sort newly-enabled succedents. */
		for (i = 0; i < node->numSuccedents; i++) {
			s = *(node->succedents + i);
			RF_ASSERT(s->status == rf_wait);
			(s->numAntDone)++;
			/* Enabled once its last antecedent has completed. */
			if (s->numAntDone == s->numAntecedents) {
				/* Look for NIL nodes. */
				if (s->doFunc == rf_NullNodeFunc) {
					/*
					 * Don't fire NIL nodes, just process
					 * them.
					 */
					s->next = finishlist;
					finishlist = s;
				} else {
					/*
					 * Look to see if the node is to be
					 * skipped.
					 */
					skipNode = RF_FALSE;
					for (j = 0; j < s->numAntecedents; j++)
						if ((s->antType[j] ==
						    rf_trueData) &&
						    (s->antecedents[j]->status
						    == rf_bad))
							skipNode = RF_TRUE;
					if (skipNode) {
						/*
						 * This node has one or more
						 * failed true data
						 * dependencies, so skip it.
						 */
						s->next = skiplist;
						skiplist = s;
					} else {
						/*
						 * Add s to list of nodes (q)
						 * to execute.
						 */
						if (context != RF_INTR_CONTEXT) {
							/*
							 * We only have to
							 * enqueue if we're at
							 * intr context; this
							 * is the non-interrupt
							 * case.
							 */
							/*
							 * Put node on a list to
							 * be fired after we
							 * unlock.
							 */
							s->next = firelist;
							firelist = s;
						} else {
							/*
							 * Enqueue the node for
							 * the dag exec thread
							 * to fire.
							 */
							RF_ASSERT(rf_NodeReady(s));
							if (q) {
								/* Append at tail. */
								q->next = s;
								q = s;
							} else {
								qh = q = s;
								qh->next = NULL;
							}
						}
					}
				}
			}
		}

		if (q) {
			/*
			 * Transfer our local list of nodes to the node
			 * queue: splice it in front of whatever is queued
			 * already, then wake the engine thread.
			 */
			q->next = raidPtr->node_queue;
			raidPtr->node_queue = qh;
			DO_SIGNAL(raidPtr);
		}
		DO_UNLOCK(raidPtr);

		/*
		 * Everything below runs without the queue lock.  `next' is
		 * always fetched before rf_FinishNode() because the node must
		 * not be referenced afterwards.
		 */
		for (; skiplist; skiplist = next) {
			next = skiplist->next;
			skiplist->status = rf_skipped;
			for (i = 0; i < skiplist->numAntecedents; i++) {
				skiplist->antecedents[i]->numSuccFired++;
			}
			if (skiplist->commitNode) {
				skiplist->dagHdr->numCommits++;
			}
			rf_FinishNode(skiplist, context);
		}
		for (; finishlist; finishlist = next) {
			/* NIL nodes: no need to fire them. */
			next = finishlist->next;
			finishlist->status = rf_good;
			for (i = 0; i < finishlist->numAntecedents; i++) {
				finishlist->antecedents[i]->numSuccFired++;
			}
			if (finishlist->commitNode)
				finishlist->dagHdr->numCommits++;
			/*
			 * Okay, here we're calling rf_FinishNode() on nodes
			 * that have the null function as their work proc.
			 * Such a node could be the terminal node in a DAG.
			 * If so, it will cause the DAG to complete, which will
			 * in turn free memory used by the DAG, which includes
			 * the node in question.
			 * Thus, we must avoid referencing the node at all
			 * after calling rf_FinishNode() on it.
			 */
			/* Recursive call. */
			rf_FinishNode(finishlist, context);
		}
		/* Fire all nodes in firelist. */
		rf_FireNodeList(firelist);
		break;

	case rf_rollBackward:
		/* Backward execution: sort the antecedents that may be undone. */
		for (i = 0; i < node->numAntecedents; i++) {
			a = *(node->antecedents + i);
			RF_ASSERT(a->status == rf_good);
			RF_ASSERT(a->numSuccDone <= a->numSuccedents);
			RF_ASSERT(a->numSuccDone <= a->numSuccFired);

			/* All succedents that were fired are now done. */
			if (a->numSuccDone == a->numSuccFired) {
				if (a->undoFunc == rf_NullNodeFunc) {
					/*
					 * Don't fire NIL nodes, just process
					 * them.
					 */
					a->next = finishlist;
					finishlist = a;
				} else {
					if (context != RF_INTR_CONTEXT) {
						/*
						 * We only have to enqueue if
						 * we're at intr context; this
						 * is the non-interrupt case.
						 */
						/*
						 * Put node on a list to
						 * be fired after we
						 * unlock.
						 */
						a->next = firelist;
						firelist = a;
					} else {
						/*
						 * Enqueue the node for
						 * the dag exec thread
						 * to fire.
						 */
						RF_ASSERT(rf_NodeReady(a));
						if (q) {
							/* Append at tail. */
							q->next = a;
							q = a;
						} else {
							qh = q = a;
							qh->next = NULL;
						}
					}
				}
			}
		}
		if (q) {
			/*
			 * Transfer our local list of nodes to the node
			 * queue.
			 */
			q->next = raidPtr->node_queue;
			raidPtr->node_queue = qh;
			DO_SIGNAL(raidPtr);
		}
		DO_UNLOCK(raidPtr);
		for (; finishlist; finishlist = next) {
			/* NIL nodes: no need to fire them. */
			next = finishlist->next;
			finishlist->status = rf_good;
			/*
			 * Okay, here we're calling rf_FinishNode() on nodes
			 * that have the null function as their work proc.
			 * Such a node could be the first node in a DAG.
			 * If so, it will cause the DAG to complete, which will
			 * in turn free memory used by the DAG, which includes
			 * the node in question.
			 * Thus, we must avoid referencing the node at all
			 * after calling rf_FinishNode() on it.
			 */
			rf_FinishNode(finishlist, context);
			/* Recursive call. */
		}
		/* Fire all nodes in firelist. */
		rf_FireNodeList(firelist);

		break;
	default:
		/*
		 * NOTE(review): this path does not execute DO_UNLOCK; that is
		 * only harmless if RF_PANIC() never returns.
		 */
		printf("Engine found illegal DAG status in"
		    " rf_PropagateResults()\n");
		RF_PANIC();
		break;
	}
}
701
702
703 /*
704 * Process a fired node which has completed.
705 */
706 void
707 rf_ProcessNode(RF_DagNode_t *node, int context)
708 {
709 RF_Raid_t *raidPtr;
710
711 raidPtr = node->dagHdr->raidPtr;
712
713 switch (node->status) {
714 case rf_good:
715 /* Normal case, don't need to do anything. */
716 break;
717 case rf_bad:
718 if ((node->dagHdr->numCommits > 0) ||
719 (node->dagHdr->numCommitNodes == 0)) {
720 /* Crossed commit barrier. */
721 node->dagHdr->status = rf_rollForward;
722 if (rf_engineDebug || 1) {
723 printf("raid%d: node (%s) returned fail,"
724 " rolling forward\n", raidPtr->raidid,
725 node->name);
726 }
727 } else {
728 /* Never reached commit barrier. */
729 node->dagHdr->status = rf_rollBackward;
730 if (rf_engineDebug || 1) {
731 printf("raid%d: node (%s) returned fail,"
732 " rolling backward\n", raidPtr->raidid,
733 node->name);
734 }
735 }
736 break;
737 case rf_undone:
738 /* Normal rollBackward case, don't need to do anything. */
739 break;
740 case rf_panic:
741 /* An undo node failed !!! */
742 printf("UNDO of a node failed !!!/n");
743 break;
744 default:
745 printf("node finished execution with an illegal status !!!\n");
746 RF_PANIC();
747 break;
748 }
749
750 /*
751 * Enqueue node's succedents (antecedents if rollBackward) for
752 * execution.
753 */
754 rf_PropagateResults(node, context);
755 }
756
757
758 /*
759 * User context or dag-exec-thread context:
760 * This is the first step in post-processing a newly-completed node.
761 * This routine is called by each node execution function to mark the node
762 * as complete and fire off any successors that have been enabled.
763 */
764 int
765 rf_FinishNode(RF_DagNode_t *node, int context)
766 {
767 /* As far as I can tell, retcode is not used -wvcii. */
768 int retcode = RF_FALSE;
769 node->dagHdr->numNodesCompleted++;
770 rf_ProcessNode(node, context);
771
772 return (retcode);
773 }
774
775
776 /*
777 * User context:
778 * Submit dag for execution, return non-zero if we have to wait for completion.
779 * If and only if we return non-zero, we'll cause cbFunc to get invoked with
780 * cbArg when the DAG has completed.
781 *
782 * For now we always return 1. If the DAG does not cause any I/O, then the
783 * callback may get invoked before DispatchDAG returns. There's code in state
784 * 5 of ContinueRaidAccess to handle this.
785 *
786 * All we do here is fire the direct successors of the header node. The DAG
787 * execution thread does the rest of the dag processing.
788 */
789 int
790 rf_DispatchDAG(RF_DagHeader_t *dag, void (*cbFunc) (void *), void *cbArg)
791 {
792 RF_Raid_t *raidPtr;
793
794 raidPtr = dag->raidPtr;
795 if (dag->tracerec) {
796 RF_ETIMER_START(dag->tracerec->timer);
797 }
798 if (rf_engineDebug || rf_validateDAGDebug) {
799 if (rf_ValidateDAG(dag))
800 RF_PANIC();
801 }
802 if (rf_engineDebug>1) {
803 printf("raid%d: Entering DispatchDAG\n", raidPtr->raidid);
804 }
805 raidPtr->dags_in_flight++; /*
806 * Debug only: blow off proper
807 * locking.
808 */
809 dag->cbFunc = cbFunc;
810 dag->cbArg = cbArg;
811 dag->numNodesCompleted = 0;
812 dag->status = rf_enable;
813 rf_FireNodeArray(dag->numSuccedents, dag->succedents);
814 return (1);
815 }
816
817
818 /*
819 * Dedicated kernel thread:
820 * The thread that handles all DAG node firing.
821 * To minimize locking and unlocking, we grab a copy of the entire node queue
822 * and then set the node queue to NULL before doing any firing of nodes.
823 * This way we only have to release the lock once. Of course, it's probably
824 * rare that there's more than one node in the queue at any one time, but it
825 * sometimes happens.
826 *
827 * In the kernel, this thread runs at spl0 and is not swappable. I copied these
828 * characteristics from the aio_completion_thread.
829 */
830
#ifdef RAID_AUTOCONFIG
/*
 * Startup-hook wrapper that creates the engine thread of an array.
 * rf_ConfigureEngine() registers it when initproc is not yet available.
 *
 * arg	- the RF_Raid_t of the array, passed as an opaque thread argument.
 *
 * The global lastpid is overridden with RF_ENGINE_PID + raidid - 1 while
 * the thread is created (presumably so the thread receives a predictable
 * pid -- confirm) and is restored afterwards on both the success and the
 * failure path.
 */
void
rf_DAGExecutionThread_pre(RF_ThreadArg_t arg)
{
	RF_Raid_t *raidPtr;
	char raidname[16];
	pid_t oldpid = lastpid;
	int rc;

	raidPtr = (RF_Raid_t *) arg;

	if (rf_engineDebug) {
		printf("raid%d: Starting engine thread\n", raidPtr->raidid);
	}

	lastpid = RF_ENGINE_PID + raidPtr->raidid - 1;
	snprintf(raidname, sizeof raidname, "raid%d", raidPtr->raidid);

	rc = RF_CREATE_THREAD(raidPtr->engine_thread, rf_DAGExecutionThread,
	    raidPtr, &raidname[0]);

	/* Undo the override before anything else, even if creation failed. */
	lastpid = oldpid;

	if (rc) {
		RF_ERRORMSG("RAIDFRAME: Unable to start engine thread\n");
		return;
	}

	if (rf_engineDebug) {
		printf("raid%d: Engine thread started\n", raidPtr->raidid);
	}
	RF_THREADGROUP_STARTED(&raidPtr->engine_tg);
}
#endif	/* RAID_AUTOCONFIG */
861
862 void
863 rf_DAGExecutionThread(RF_ThreadArg_t arg)
864 {
865 RF_DagNode_t *nd, *local_nq, *term_nq, *fire_nq;
866 RF_Raid_t *raidPtr;
867 int ks;
868 int s;
869
870 raidPtr = (RF_Raid_t *) arg;
871
872 while (!(&raidPtr->engine_tg)->created)
873 (void) tsleep((void *)&(&raidPtr->engine_tg)->created, PWAIT,
874 "raidinit", 0);
875
876 if (rf_engineDebug) {
877 printf("raid%d: Engine thread is running\n", raidPtr->raidid);
878 }
879 /* XXX What to put here ? XXX */
880
881 s = splbio();
882
883 RF_THREADGROUP_RUNNING(&raidPtr->engine_tg);
884
885 rf_hook_cookies[raidPtr->raidid] =
886 shutdownhook_establish(rf_shutdown_hook, (void *)raidPtr);
887
888 DO_LOCK(raidPtr);
889 while (!raidPtr->shutdown_engine) {
890
891 while (raidPtr->node_queue != NULL) {
892 local_nq = raidPtr->node_queue;
893 fire_nq = NULL;
894 term_nq = NULL;
895 raidPtr->node_queue = NULL;
896 DO_UNLOCK(raidPtr);
897
898 /* First, strip out the terminal nodes. */
899 while (local_nq) {
900 nd = local_nq;
901 local_nq = local_nq->next;
902 switch (nd->dagHdr->status) {
903 case rf_enable:
904 case rf_rollForward:
905 if (nd->numSuccedents == 0) {
906 /*
907 * End of the dag, add to
908 * callback list.
909 */
910 nd->next = term_nq;
911 term_nq = nd;
912 } else {
913 /*
914 * Not the end, add to the
915 * fire queue.
916 */
917 nd->next = fire_nq;
918 fire_nq = nd;
919 }
920 break;
921 case rf_rollBackward:
922 if (nd->numAntecedents == 0) {
923 /*
924 * End of the dag, add to the
925 * callback list.
926 */
927 nd->next = term_nq;
928 term_nq = nd;
929 } else {
930 /*
931 * Not the end, add to the
932 * fire queue.
933 */
934 nd->next = fire_nq;
935 fire_nq = nd;
936 }
937 break;
938 default:
939 RF_PANIC();
940 break;
941 }
942 }
943
944 /*
945 * Execute callback of dags which have reached the
946 * terminal node.
947 */
948 while (term_nq) {
949 nd = term_nq;
950 term_nq = term_nq->next;
951 nd->next = NULL;
952 (nd->dagHdr->cbFunc) (nd->dagHdr->cbArg);
953 raidPtr->dags_in_flight--; /* Debug only. */
954 }
955
956 /* Fire remaining nodes. */
957 rf_FireNodeList(fire_nq);
958
959 DO_LOCK(raidPtr);
960 }
961 while (!raidPtr->shutdown_engine && raidPtr->node_queue == NULL)
962 DO_WAIT(raidPtr);
963 }
964 DO_UNLOCK(raidPtr);
965
966 if (rf_hook_cookies && rf_hook_cookies[raidPtr->raidid] != NULL) {
967 shutdownhook_disestablish(rf_hook_cookies[raidPtr->raidid]);
968 rf_hook_cookies[raidPtr->raidid] = NULL;
969 }
970
971 RF_THREADGROUP_DONE(&raidPtr->engine_tg);
972
973 splx(s);
974 kthread_exit(0);
975 }