This source file includes the following definitions.
- rf_ShutdownEngine
- rf_ConfigureEngine
- rf_BranchDone
- rf_NodeReady
- rf_FireNode
- rf_FireNodeArray
- rf_FireNodeList
- rf_PropagateResults
- rf_ProcessNode
- rf_FinishNode
- rf_DispatchDAG
- rf_DAGExecutionThread_pre
- rf_DAGExecutionThread
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59 #include "rf_threadstuff.h"
60
61 #include <sys/errno.h>
62
63 #include "rf_dag.h"
64 #include "rf_engine.h"
65 #include "rf_etimer.h"
66 #include "rf_general.h"
67 #include "rf_dagutils.h"
68 #include "rf_shutdown.h"
69 #include "rf_raid.h"
70
71 int rf_BranchDone(RF_DagNode_t *);
72 int rf_NodeReady(RF_DagNode_t *);
73 void rf_FireNode(RF_DagNode_t *);
74 void rf_FireNodeArray(int, RF_DagNode_t **);
75 void rf_FireNodeList(RF_DagNode_t *);
76 void rf_PropagateResults(RF_DagNode_t *, int);
77 void rf_ProcessNode(RF_DagNode_t *, int);
78
79 void rf_DAGExecutionThread(RF_ThreadArg_t);
80 #ifdef RAID_AUTOCONFIG
81 #define RF_ENGINE_PID 10
82 void rf_DAGExecutionThread_pre(RF_ThreadArg_t);
83 extern pid_t lastpid;
84 #endif
85 void **rf_hook_cookies;
86 extern int numraid;
87
/*
 * Create the managed mutex and condition protecting the node queue of
 * array _r_, registering both on shutdown list _l_.
 * NOTE: expands to code that `return's the error code from the enclosing
 * function on failure, so it may only be used in a function returning int.
 */
#define	DO_INIT(_l_,_r_)						\
do {									\
	int _rc = rf_create_managed_mutex(_l_,				\
	    &(_r_)->node_queue_mutex);					\
	if (_rc != 0)							\
		return (_rc);						\
	_rc = rf_create_managed_cond(_l_, &(_r_)->node_queue_cond);	\
	if (_rc != 0)							\
		return (_rc);						\
} while (0)
100
101
102
103
104
105
106
107
108
/*
 * Node-queue synchronization helpers.
 *
 * DO_LOCK and DO_UNLOCK save/restore the interrupt priority level in a
 * variable named `ks', which the calling function must declare as an int.
 */
#define	DO_LOCK(_r_)							\
do {									\
	ks = splbio();							\
	RF_LOCK_MUTEX((_r_)->node_queue_mutex);				\
} while (0)

#define	DO_UNLOCK(_r_)							\
do {									\
	RF_UNLOCK_MUTEX((_r_)->node_queue_mutex);			\
	splx(ks);							\
} while (0)

/* Wait on the node queue; callers re-check their condition in a loop. */
#define	DO_WAIT(_r_)	RF_WAIT_COND((_r_)->node_queue, (_r_)->node_queue_mutex)

/* Wake every waiter on the node queue. */
#define	DO_SIGNAL(_r_)	RF_BROADCAST_COND((_r_)->node_queue)
127
128 void rf_ShutdownEngine(void *);
129
130 void
131 rf_ShutdownEngine(void *arg)
132 {
133 RF_Raid_t *raidPtr;
134
135 raidPtr = (RF_Raid_t *) arg;
136 raidPtr->shutdown_engine = 1;
137 DO_SIGNAL(raidPtr);
138 }
139
140 int
141 rf_ConfigureEngine(RF_ShutdownList_t **listp, RF_Raid_t *raidPtr,
142 RF_Config_t *cfgPtr)
143 {
144 int rc;
145 char raidname[16];
146
147 DO_INIT(listp, raidPtr);
148
149 raidPtr->node_queue = NULL;
150 raidPtr->dags_in_flight = 0;
151
152 rc = rf_init_managed_threadgroup(listp, &raidPtr->engine_tg);
153 if (rc)
154 return (rc);
155
156
157
158
159
160
161 if (rf_engineDebug) {
162 printf("raid%d: %s engine thread\n", raidPtr->raidid,
163 (initproc)?"Starting":"Creating");
164 }
165 if (rf_hook_cookies == NULL) {
166 rf_hook_cookies =
167 malloc(numraid * sizeof(void *),
168 M_RAIDFRAME, M_NOWAIT);
169 if (rf_hook_cookies == NULL)
170 return (ENOMEM);
171 bzero(rf_hook_cookies, numraid * sizeof(void *));
172 }
173 #ifdef RAID_AUTOCONFIG
174 if (initproc == NULL) {
175 rf_hook_cookies[raidPtr->raidid] =
176 startuphook_establish(rf_DAGExecutionThread_pre,
177 raidPtr);
178 } else {
179 #endif
180 snprintf(&raidname[0], 16, "raid%d", raidPtr->raidid);
181 if (RF_CREATE_THREAD(raidPtr->engine_thread,
182 rf_DAGExecutionThread, raidPtr, &raidname[0])) {
183 RF_ERRORMSG("RAIDFRAME: Unable to start engine"
184 " thread\n");
185 return (ENOMEM);
186 }
187 if (rf_engineDebug) {
188 printf("raid%d: Engine thread started\n",
189 raidPtr->raidid);
190 }
191 RF_THREADGROUP_STARTED(&raidPtr->engine_tg);
192 #ifdef RAID_AUTOCONFIG
193 }
194 #endif
195
196 #ifdef debug
197 printf("Skipping the WAIT_START !!!\n");
198 #endif
199
200 if (rf_engineDebug) {
201 printf("raid%d: Engine thread running and waiting for events\n",
202 raidPtr->raidid);
203 }
204 rc = rf_ShutdownCreate(listp, rf_ShutdownEngine, raidPtr);
205 if (rc) {
206 RF_ERRORMSG3("Unable to add to shutdown list file %s line %d"
207 " rc=%d\n", __FILE__, __LINE__, rc);
208 rf_ShutdownEngine(NULL);
209 }
210 return (rc);
211 }
212
213 int
214 rf_BranchDone(RF_DagNode_t *node)
215 {
216 int i;
217
218
219
220
221
222 switch (node->status) {
223 case rf_wait:
224
225 RF_PANIC();
226 break;
227 case rf_fired:
228
229 return (RF_FALSE);
230 case rf_good:
231
232 for (i = 0; i < node->numSuccedents; i++)
233
234 if (!rf_BranchDone(node->succedents[i]))
235 return RF_FALSE;
236
237 return RF_TRUE;
238
239
240
241 break;
242 case rf_bad:
243
244 return (RF_TRUE);
245 case rf_recover:
246
247 RF_PANIC();
248 break;
249 case rf_undone:
250 case rf_panic:
251
252
253 return (RF_TRUE);
254 break;
255 default:
256
257 RF_PANIC();
258 break;
259 }
260 }
261
262 int
263 rf_NodeReady(RF_DagNode_t *node)
264 {
265 int ready;
266
267 switch (node->dagHdr->status) {
268 case rf_enable:
269 case rf_rollForward:
270 if ((node->status == rf_wait) &&
271 (node->numAntecedents == node->numAntDone))
272 ready = RF_TRUE;
273 else
274 ready = RF_FALSE;
275 break;
276 case rf_rollBackward:
277 RF_ASSERT(node->numSuccDone <= node->numSuccedents);
278 RF_ASSERT(node->numSuccFired <= node->numSuccedents);
279 RF_ASSERT(node->numSuccFired <= node->numSuccDone);
280 if ((node->status == rf_good) &&
281 (node->numSuccDone == node->numSuccedents))
282 ready = RF_TRUE;
283 else
284 ready = RF_FALSE;
285 break;
286 default:
287 printf("Execution engine found illegal DAG status"
288 " in rf_NodeReady\n");
289 RF_PANIC();
290 break;
291 }
292
293 return (ready);
294 }
295
296
297
298
299
300
301
302
303
304 void
305 rf_FireNode(RF_DagNode_t *node)
306 {
307 switch (node->status) {
308 case rf_fired:
309
310 if (rf_engineDebug>1) {
311 printf("raid%d: Firing node 0x%lx (%s)\n",
312 node->dagHdr->raidPtr->raidid,
313 (unsigned long) node, node->name);
314 }
315 if (node->flags & RF_DAGNODE_FLAG_YIELD) {
316 #if (defined(__NetBSD__) || defined(__OpenBSD__)) && defined(_KERNEL)
317
318
319
320
321
322
323 #else
324 thread_block();
325 #endif
326 }
327 (*(node->doFunc)) (node);
328 break;
329 case rf_recover:
330
331 if (rf_engineDebug>1) {
332 printf("raid%d: Firing (undo) node 0x%lx (%s)\n",
333 node->dagHdr->raidPtr->raidid,
334 (unsigned long) node, node->name);
335 }
336 if (node->flags & RF_DAGNODE_FLAG_YIELD) {
337 #if (defined(__NetBSD__) || defined(__OpenBSD__)) && defined(_KERNEL)
338
339
340
341
342
343
344 #else
345 thread_block();
346 #endif
347 }
348 (*(node->undoFunc)) (node);
349 break;
350 default:
351 RF_PANIC();
352 break;
353 }
354 }
355
356
357
358
359
360
361
362 void
363 rf_FireNodeArray(int numNodes, RF_DagNode_t **nodeList)
364 {
365 RF_DagStatus_t dstat;
366 RF_DagNode_t *node;
367 int i, j;
368
369
370 for (i = 0; i < numNodes; i++) {
371 node = nodeList[i];
372 dstat = node->dagHdr->status;
373 RF_ASSERT((node->status == rf_wait) ||
374 (node->status == rf_good));
375 if (rf_NodeReady(node)) {
376 if ((dstat == rf_enable) || (dstat == rf_rollForward)) {
377 RF_ASSERT(node->status == rf_wait);
378 if (node->commitNode)
379 node->dagHdr->numCommits++;
380 node->status = rf_fired;
381 for (j = 0; j < node->numAntecedents; j++)
382 node->antecedents[j]->numSuccFired++;
383 } else {
384 RF_ASSERT(dstat == rf_rollBackward);
385 RF_ASSERT(node->status == rf_good);
386
387 RF_ASSERT(node->commitNode == RF_FALSE);
388 node->status = rf_recover;
389 }
390 }
391 }
392
393 for (i = 0; i < numNodes; i++) {
394 if ((nodeList[i]->status == rf_fired) ||
395 (nodeList[i]->status == rf_recover))
396 rf_FireNode(nodeList[i]);
397 }
398 }
399
400
401
402
403
404
405
406 void
407 rf_FireNodeList(RF_DagNode_t *nodeList)
408 {
409 RF_DagNode_t *node, *next;
410 RF_DagStatus_t dstat;
411 int j;
412
413 if (nodeList) {
414
415 for (node = nodeList; node; node = next) {
416 next = node->next;
417 dstat = node->dagHdr->status;
418 RF_ASSERT((node->status == rf_wait) ||
419 (node->status == rf_good));
420 if (rf_NodeReady(node)) {
421 if ((dstat == rf_enable) ||
422 (dstat == rf_rollForward)) {
423 RF_ASSERT(node->status == rf_wait);
424 if (node->commitNode)
425 node->dagHdr->numCommits++;
426 node->status = rf_fired;
427 for (j = 0; j < node->numAntecedents;
428 j++)
429 node->antecedents[j]
430 ->numSuccFired++;
431 } else {
432 RF_ASSERT(dstat == rf_rollBackward);
433 RF_ASSERT(node->status == rf_good);
434
435 RF_ASSERT(node->commitNode == RF_FALSE);
436 node->status = rf_recover;
437 }
438 }
439 }
440
441 for (node = nodeList; node; node = next) {
442 next = node->next;
443 if ((node->status == rf_fired) ||
444 (node->status == rf_recover))
445 rf_FireNode(node);
446 }
447 }
448 }
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468 void
469 rf_PropagateResults(RF_DagNode_t *node, int context)
470 {
471 RF_DagNode_t *s, *a;
472 RF_Raid_t *raidPtr;
473 int i, ks;
474
475 RF_DagNode_t *finishlist = NULL;
476
477 RF_DagNode_t *skiplist = NULL;
478 RF_DagNode_t *firelist = NULL;
479 RF_DagNode_t *q = NULL, *qh = NULL, *next;
480 int j, skipNode;
481
482 raidPtr = node->dagHdr->raidPtr;
483
484 DO_LOCK(raidPtr);
485
486
487 for (i = 0; i < node->numAntecedents; i++) {
488 a = *(node->antecedents + i);
489 RF_ASSERT(a->numSuccFired >= a->numSuccDone);
490 RF_ASSERT(a->numSuccFired <= a->numSuccedents);
491 a->numSuccDone++;
492 }
493
494 switch (node->dagHdr->status) {
495 case rf_enable:
496 case rf_rollForward:
497 for (i = 0; i < node->numSuccedents; i++) {
498 s = *(node->succedents + i);
499 RF_ASSERT(s->status == rf_wait);
500 (s->numAntDone)++;
501 if (s->numAntDone == s->numAntecedents) {
502
503 if (s->doFunc == rf_NullNodeFunc) {
504
505
506
507
508 s->next = finishlist;
509 finishlist = s;
510 } else {
511
512
513
514
515 skipNode = RF_FALSE;
516 for (j = 0; j < s->numAntecedents; j++)
517 if ((s->antType[j] ==
518 rf_trueData) &&
519 (s->antecedents[j]->status
520 == rf_bad))
521 skipNode = RF_TRUE;
522 if (skipNode) {
523
524
525
526
527
528 s->next = skiplist;
529 skiplist = s;
530 } else {
531
532
533
534
535 if (context != RF_INTR_CONTEXT)
536 {
537
538
539
540
541
542
543
544
545
546
547 s->next = firelist;
548 firelist = s;
549 } else {
550
551
552
553
554
555 RF_ASSERT(rf_NodeReady(s));
556 if (q) {
557 q->next = s;
558 q = s;
559 } else {
560 qh = q = s;
561 qh->next = NULL;
562 }
563 }
564 }
565 }
566 }
567 }
568
569 if (q) {
570
571
572
573
574 q->next = raidPtr->node_queue;
575 raidPtr->node_queue = qh;
576 DO_SIGNAL(raidPtr);
577 }
578 DO_UNLOCK(raidPtr);
579
580 for (; skiplist; skiplist = next) {
581 next = skiplist->next;
582 skiplist->status = rf_skipped;
583 for (i = 0; i < skiplist->numAntecedents; i++) {
584 skiplist->antecedents[i]->numSuccFired++;
585 }
586 if (skiplist->commitNode) {
587 skiplist->dagHdr->numCommits++;
588 }
589 rf_FinishNode(skiplist, context);
590 }
591 for (; finishlist; finishlist = next) {
592
593 next = finishlist->next;
594 finishlist->status = rf_good;
595 for (i = 0; i < finishlist->numAntecedents; i++) {
596 finishlist->antecedents[i]->numSuccFired++;
597 }
598 if (finishlist->commitNode)
599 finishlist->dagHdr->numCommits++;
600
601
602
603
604
605
606
607
608
609
610
611 rf_FinishNode(finishlist, context);
612 }
613
614 rf_FireNodeList(firelist);
615 break;
616
617 case rf_rollBackward:
618 for (i = 0; i < node->numAntecedents; i++) {
619 a = *(node->antecedents + i);
620 RF_ASSERT(a->status == rf_good);
621 RF_ASSERT(a->numSuccDone <= a->numSuccedents);
622 RF_ASSERT(a->numSuccDone <= a->numSuccFired);
623
624 if (a->numSuccDone == a->numSuccFired) {
625 if (a->undoFunc == rf_NullNodeFunc) {
626
627
628
629
630 a->next = finishlist;
631 finishlist = a;
632 } else {
633 if (context != RF_INTR_CONTEXT) {
634
635
636
637
638
639
640
641
642
643 a->next = firelist;
644 firelist = a;
645 } else {
646
647
648
649
650
651 RF_ASSERT(rf_NodeReady(a));
652 if (q) {
653 q->next = a;
654 q = a;
655 } else {
656 qh = q = a;
657 qh->next = NULL;
658 }
659 }
660 }
661 }
662 }
663 if (q) {
664
665
666
667
668 q->next = raidPtr->node_queue;
669 raidPtr->node_queue = qh;
670 DO_SIGNAL(raidPtr);
671 }
672 DO_UNLOCK(raidPtr);
673 for (; finishlist; finishlist = next) {
674
675 next = finishlist->next;
676 finishlist->status = rf_good;
677
678
679
680
681
682
683
684
685
686
687 rf_FinishNode(finishlist, context);
688
689 }
690
691 rf_FireNodeList(firelist);
692
693 break;
694 default:
695 printf("Engine found illegal DAG status in"
696 " rf_PropagateResults()\n");
697 RF_PANIC();
698 break;
699 }
700 }
701
702
703
704
705
706 void
707 rf_ProcessNode(RF_DagNode_t *node, int context)
708 {
709 RF_Raid_t *raidPtr;
710
711 raidPtr = node->dagHdr->raidPtr;
712
713 switch (node->status) {
714 case rf_good:
715
716 break;
717 case rf_bad:
718 if ((node->dagHdr->numCommits > 0) ||
719 (node->dagHdr->numCommitNodes == 0)) {
720
721 node->dagHdr->status = rf_rollForward;
722 if (rf_engineDebug || 1) {
723 printf("raid%d: node (%s) returned fail,"
724 " rolling forward\n", raidPtr->raidid,
725 node->name);
726 }
727 } else {
728
729 node->dagHdr->status = rf_rollBackward;
730 if (rf_engineDebug || 1) {
731 printf("raid%d: node (%s) returned fail,"
732 " rolling backward\n", raidPtr->raidid,
733 node->name);
734 }
735 }
736 break;
737 case rf_undone:
738
739 break;
740 case rf_panic:
741
742 printf("UNDO of a node failed !!!/n");
743 break;
744 default:
745 printf("node finished execution with an illegal status !!!\n");
746 RF_PANIC();
747 break;
748 }
749
750
751
752
753
754 rf_PropagateResults(node, context);
755 }
756
757
758
759
760
761
762
763
764 int
765 rf_FinishNode(RF_DagNode_t *node, int context)
766 {
767
768 int retcode = RF_FALSE;
769 node->dagHdr->numNodesCompleted++;
770 rf_ProcessNode(node, context);
771
772 return (retcode);
773 }
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789 int
790 rf_DispatchDAG(RF_DagHeader_t *dag, void (*cbFunc) (void *), void *cbArg)
791 {
792 RF_Raid_t *raidPtr;
793
794 raidPtr = dag->raidPtr;
795 if (dag->tracerec) {
796 RF_ETIMER_START(dag->tracerec->timer);
797 }
798 if (rf_engineDebug || rf_validateDAGDebug) {
799 if (rf_ValidateDAG(dag))
800 RF_PANIC();
801 }
802 if (rf_engineDebug>1) {
803 printf("raid%d: Entering DispatchDAG\n", raidPtr->raidid);
804 }
805 raidPtr->dags_in_flight++;
806
807
808
809 dag->cbFunc = cbFunc;
810 dag->cbArg = cbArg;
811 dag->numNodesCompleted = 0;
812 dag->status = rf_enable;
813 rf_FireNodeArray(dag->numSuccedents, dag->succedents);
814 return (1);
815 }
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831 #ifdef RAID_AUTOCONFIG
832 void
833 rf_DAGExecutionThread_pre(RF_ThreadArg_t arg)
834 {
835 RF_Raid_t *raidPtr;
836 char raidname[16];
837 pid_t oldpid = lastpid;
838
839 raidPtr = (RF_Raid_t *) arg;
840
841 if (rf_engineDebug) {
842 printf("raid%d: Starting engine thread\n", raidPtr->raidid);
843 }
844
845 lastpid = RF_ENGINE_PID + raidPtr->raidid - 1;
846 snprintf(raidname, sizeof raidname, "raid%d", raidPtr->raidid);
847
848 if (RF_CREATE_THREAD(raidPtr->engine_thread, rf_DAGExecutionThread,
849 raidPtr, &raidname[0])) {
850 RF_ERRORMSG("RAIDFRAME: Unable to start engine thread\n");
851 return;
852 }
853
854 lastpid = oldpid;
855 if (rf_engineDebug) {
856 printf("raid%d: Engine thread started\n", raidPtr->raidid);
857 }
858 RF_THREADGROUP_STARTED(&raidPtr->engine_tg);
859 }
860 #endif
861
862 void
863 rf_DAGExecutionThread(RF_ThreadArg_t arg)
864 {
865 RF_DagNode_t *nd, *local_nq, *term_nq, *fire_nq;
866 RF_Raid_t *raidPtr;
867 int ks;
868 int s;
869
870 raidPtr = (RF_Raid_t *) arg;
871
872 while (!(&raidPtr->engine_tg)->created)
873 (void) tsleep((void *)&(&raidPtr->engine_tg)->created, PWAIT,
874 "raidinit", 0);
875
876 if (rf_engineDebug) {
877 printf("raid%d: Engine thread is running\n", raidPtr->raidid);
878 }
879
880
881 s = splbio();
882
883 RF_THREADGROUP_RUNNING(&raidPtr->engine_tg);
884
885 rf_hook_cookies[raidPtr->raidid] =
886 shutdownhook_establish(rf_shutdown_hook, (void *)raidPtr);
887
888 DO_LOCK(raidPtr);
889 while (!raidPtr->shutdown_engine) {
890
891 while (raidPtr->node_queue != NULL) {
892 local_nq = raidPtr->node_queue;
893 fire_nq = NULL;
894 term_nq = NULL;
895 raidPtr->node_queue = NULL;
896 DO_UNLOCK(raidPtr);
897
898
899 while (local_nq) {
900 nd = local_nq;
901 local_nq = local_nq->next;
902 switch (nd->dagHdr->status) {
903 case rf_enable:
904 case rf_rollForward:
905 if (nd->numSuccedents == 0) {
906
907
908
909
910 nd->next = term_nq;
911 term_nq = nd;
912 } else {
913
914
915
916
917 nd->next = fire_nq;
918 fire_nq = nd;
919 }
920 break;
921 case rf_rollBackward:
922 if (nd->numAntecedents == 0) {
923
924
925
926
927 nd->next = term_nq;
928 term_nq = nd;
929 } else {
930
931
932
933
934 nd->next = fire_nq;
935 fire_nq = nd;
936 }
937 break;
938 default:
939 RF_PANIC();
940 break;
941 }
942 }
943
944
945
946
947
948 while (term_nq) {
949 nd = term_nq;
950 term_nq = term_nq->next;
951 nd->next = NULL;
952 (nd->dagHdr->cbFunc) (nd->dagHdr->cbArg);
953 raidPtr->dags_in_flight--;
954 }
955
956
957 rf_FireNodeList(fire_nq);
958
959 DO_LOCK(raidPtr);
960 }
961 while (!raidPtr->shutdown_engine && raidPtr->node_queue == NULL)
962 DO_WAIT(raidPtr);
963 }
964 DO_UNLOCK(raidPtr);
965
966 if (rf_hook_cookies && rf_hook_cookies[raidPtr->raidid] != NULL) {
967 shutdownhook_disestablish(rf_hook_cookies[raidPtr->raidid]);
968 rf_hook_cookies[raidPtr->raidid] = NULL;
969 }
970
971 RF_THREADGROUP_DONE(&raidPtr->engine_tg);
972
973 splx(s);
974 kthread_exit(0);
975 }