This source file includes following definitions.
- rf_CreateNonRedundantWriteDAG
- rf_CreateRAID0WriteDAG
- rf_CreateSmallWriteDAG
- rf_CreateLargeWriteDAG
- rf_CommonCreateLargeWriteDAG
- rf_CommonCreateSmallWriteDAG
- rf_CreateRaidOneWriteDAG
- rf_CommonCreateLargeWriteDAGFwd
- rf_CommonCreateSmallWriteDAGFwd
- rf_CreateRaidOneWriteDAGFwd
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38 #include "rf_types.h"
39 #include "rf_raid.h"
40 #include "rf_dag.h"
41 #include "rf_dagutils.h"
42 #include "rf_dagfuncs.h"
43 #include "rf_debugMem.h"
44 #include "rf_dagffrd.h"
45 #include "rf_memchunk.h"
46 #include "rf_general.h"
47 #include "rf_dagffwr.h"
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78 void
79 rf_CreateNonRedundantWriteDAG(RF_Raid_t *raidPtr, RF_AccessStripeMap_t *asmap,
80 RF_DagHeader_t *dag_h, void *bp, RF_RaidAccessFlags_t flags,
81 RF_AllocListElem_t *allocList, RF_IoType_t type)
82 {
83 rf_CreateNonredundantDAG(raidPtr, asmap, dag_h, bp, flags, allocList,
84 RF_IO_TYPE_WRITE);
85 }
86
87 void
88 rf_CreateRAID0WriteDAG(RF_Raid_t *raidPtr, RF_AccessStripeMap_t *asmap,
89 RF_DagHeader_t *dag_h, void *bp, RF_RaidAccessFlags_t flags,
90 RF_AllocListElem_t *allocList, RF_IoType_t type)
91 {
92 rf_CreateNonredundantDAG(raidPtr, asmap, dag_h, bp, flags, allocList,
93 RF_IO_TYPE_WRITE);
94 }
95
96 void
97 rf_CreateSmallWriteDAG(RF_Raid_t *raidPtr, RF_AccessStripeMap_t *asmap,
98 RF_DagHeader_t *dag_h, void *bp, RF_RaidAccessFlags_t flags,
99 RF_AllocListElem_t *allocList)
100 {
101
102 rf_CommonCreateSmallWriteDAG(raidPtr, asmap, dag_h, bp, flags,
103 allocList, &rf_xorFuncs, NULL);
104 }
105
106 void
107 rf_CreateLargeWriteDAG(RF_Raid_t *raidPtr, RF_AccessStripeMap_t *asmap,
108 RF_DagHeader_t *dag_h, void *bp, RF_RaidAccessFlags_t flags,
109 RF_AllocListElem_t *allocList)
110 {
111
112 rf_CommonCreateLargeWriteDAG(raidPtr, asmap, dag_h, bp, flags,
113 allocList, 1, rf_RegularXorFunc, RF_TRUE);
114 }
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155 void
156 rf_CommonCreateLargeWriteDAG(RF_Raid_t *raidPtr, RF_AccessStripeMap_t *asmap,
157 RF_DagHeader_t *dag_h, void *bp, RF_RaidAccessFlags_t flags,
158 RF_AllocListElem_t *allocList, int nfaults, int (*redFunc) (RF_DagNode_t *),
159 int allowBufferRecycle)
160 {
161 RF_DagNode_t *nodes, *wndNodes, *rodNodes, *xorNode, *wnpNode;
162 RF_DagNode_t *wnqNode, *blockNode, *commitNode, *termNode;
163 int nWndNodes, nRodNodes, i, nodeNum, asmNum;
164 RF_AccessStripeMapHeader_t *new_asm_h[2];
165 RF_StripeNum_t parityStripeID;
166 char *sosBuffer, *eosBuffer;
167 RF_ReconUnitNum_t which_ru;
168 RF_RaidLayout_t *layoutPtr;
169 RF_PhysDiskAddr_t *pda;
170
171 layoutPtr = &(raidPtr->Layout);
172 parityStripeID = rf_RaidAddressToParityStripeID(layoutPtr,
173 asmap->raidAddress, &which_ru);
174
175 if (rf_dagDebug) {
176 printf("[Creating large-write DAG]\n");
177 }
178 dag_h->creator = "LargeWriteDAG";
179
180 dag_h->numCommitNodes = 1;
181 dag_h->numCommits = 0;
182 dag_h->numSuccedents = 1;
183
184
185 nWndNodes = asmap->numStripeUnitsAccessed;
186 RF_CallocAndAdd(nodes, nWndNodes + 4 + nfaults, sizeof(RF_DagNode_t),
187 (RF_DagNode_t *), allocList);
188 i = 0;
189 wndNodes = &nodes[i];
190 i += nWndNodes;
191 xorNode = &nodes[i];
192 i += 1;
193 wnpNode = &nodes[i];
194 i += 1;
195 blockNode = &nodes[i];
196 i += 1;
197 commitNode = &nodes[i];
198 i += 1;
199 termNode = &nodes[i];
200 i += 1;
201 if (nfaults == 2) {
202 wnqNode = &nodes[i];
203 i += 1;
204 } else {
205 wnqNode = NULL;
206 }
207 rf_MapUnaccessedPortionOfStripe(raidPtr, layoutPtr, asmap, dag_h,
208 new_asm_h, &nRodNodes, &sosBuffer, &eosBuffer, allocList);
209 if (nRodNodes > 0) {
210 RF_CallocAndAdd(rodNodes, nRodNodes, sizeof(RF_DagNode_t),
211 (RF_DagNode_t *), allocList);
212 } else {
213 rodNodes = NULL;
214 }
215
216
217 if (nRodNodes > 0) {
218 rf_InitNode(blockNode, rf_wait, RF_FALSE, rf_NullNodeFunc,
219 rf_NullNodeUndoFunc, NULL, nRodNodes, 0, 0, 0, dag_h,
220 "Nil", allocList);
221 } else {
222 rf_InitNode(blockNode, rf_wait, RF_FALSE, rf_NullNodeFunc,
223 rf_NullNodeUndoFunc, NULL, 1, 0, 0, 0, dag_h, "Nil",
224 allocList);
225 }
226
227 rf_InitNode(commitNode, rf_wait, RF_TRUE, rf_NullNodeFunc,
228 rf_NullNodeUndoFunc, NULL, nWndNodes + nfaults, 1, 0, 0,
229 dag_h, "Cmt", allocList);
230 rf_InitNode(termNode, rf_wait, RF_FALSE, rf_TerminateFunc,
231 rf_TerminateUndoFunc, NULL, 0, nWndNodes + nfaults, 0, 0,
232 dag_h, "Trm", allocList);
233
234
235 for (nodeNum = asmNum = 0; asmNum < 2; asmNum++) {
236 if (new_asm_h[asmNum]) {
237 pda = new_asm_h[asmNum]->stripeMap->physInfo;
238 while (pda) {
239 rf_InitNode(&rodNodes[nodeNum], rf_wait,
240 RF_FALSE, rf_DiskReadFunc,
241 rf_DiskReadUndoFunc, rf_GenericWakeupFunc,
242 1, 1, 4, 0, dag_h, "Rod", allocList);
243 rodNodes[nodeNum].params[0].p = pda;
244 rodNodes[nodeNum].params[1].p = pda->bufPtr;
245 rodNodes[nodeNum].params[2].v = parityStripeID;
246 rodNodes[nodeNum].params[3].v =
247 RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
248 0, 0, which_ru);
249 nodeNum++;
250 pda = pda->next;
251 }
252 }
253 }
254 RF_ASSERT(nodeNum == nRodNodes);
255
256
257 pda = asmap->physInfo;
258 for (i = 0; i < nWndNodes; i++) {
259 rf_InitNode(&wndNodes[i], rf_wait, RF_FALSE, rf_DiskWriteFunc,
260 rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0,
261 dag_h, "Wnd", allocList);
262 RF_ASSERT(pda != NULL);
263 wndNodes[i].params[0].p = pda;
264 wndNodes[i].params[1].p = pda->bufPtr;
265 wndNodes[i].params[2].v = parityStripeID;
266 wndNodes[i].params[3].v =
267 RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
268 pda = pda->next;
269 }
270
271
272 if (nRodNodes > 0) {
273 rf_InitNode(xorNode, rf_wait, RF_FALSE, redFunc,
274 rf_NullNodeUndoFunc, NULL, 1, nRodNodes,
275 2 * (nWndNodes + nRodNodes) + 1, nfaults, dag_h,
276 "Xr ", allocList);
277 } else {
278 rf_InitNode(xorNode, rf_wait, RF_FALSE, redFunc,
279 rf_NullNodeUndoFunc, NULL, 1, 1,
280 2 * (nWndNodes + nRodNodes) + 1, nfaults, dag_h,
281 "Xr ", allocList);
282 }
283 xorNode->flags |= RF_DAGNODE_FLAG_YIELD;
284 for (i = 0; i < nWndNodes; i++) {
285 xorNode->params[2 * i + 0] =
286 wndNodes[i].params[0];
287 xorNode->params[2 * i + 1] =
288 wndNodes[i].params[1];
289 }
290 for (i = 0; i < nRodNodes; i++) {
291 xorNode->params[2 * (nWndNodes + i) + 0] =
292 rodNodes[i].params[0];
293 xorNode->params[2 * (nWndNodes + i) + 1] =
294 rodNodes[i].params[1];
295 }
296
297 xorNode->params[2 * (nWndNodes + nRodNodes)].p = raidPtr;
298
299
300
301
302
303
304
305 if (allowBufferRecycle) {
306 for (i = 0; i < nRodNodes; i++) {
307 if (((RF_PhysDiskAddr_t *) rodNodes[i].params[0].p)
308 ->numSector == raidPtr->Layout.sectorsPerStripeUnit)
309 break;
310 }
311 }
312 if ((!allowBufferRecycle) || (i == nRodNodes)) {
313 RF_CallocAndAdd(xorNode->results[0], 1,
314 rf_RaidAddressToByte(raidPtr,
315 raidPtr->Layout.sectorsPerStripeUnit),
316 (void *), allocList);
317 } else {
318 xorNode->results[0] = rodNodes[i].params[1].p;
319 }
320
321
322 rf_InitNode(wnpNode, rf_wait, RF_FALSE, rf_DiskWriteFunc,
323 rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0,
324 dag_h, "Wnp", allocList);
325 wnpNode->params[0].p = asmap->parityInfo;
326 wnpNode->params[1].p = xorNode->results[0];
327 wnpNode->params[2].v = parityStripeID;
328 wnpNode->params[3].v =
329 RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
330
331 RF_ASSERT(asmap->parityInfo->next == NULL);
332
333 if (nfaults == 2) {
334
335
336
337
338
339
340 RF_CallocAndAdd(xorNode->results[1], 1,
341 rf_RaidAddressToByte(raidPtr,
342 raidPtr->Layout.sectorsPerStripeUnit),
343 (void *), allocList);
344 rf_InitNode(wnqNode, rf_wait, RF_FALSE, rf_DiskWriteFunc,
345 rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0,
346 dag_h, "Wnq", allocList);
347 wnqNode->params[0].p = asmap->qInfo;
348 wnqNode->params[1].p = xorNode->results[1];
349 wnqNode->params[2].v = parityStripeID;
350 wnqNode->params[3].v =
351 RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
352
353 RF_ASSERT(asmap->parityInfo->next == NULL);
354 }
355
356
357
358
359
360 RF_ASSERT(blockNode->numAntecedents == 0);
361 dag_h->succedents[0] = blockNode;
362
363 if (nRodNodes > 0) {
364
365 RF_ASSERT(blockNode->numSuccedents == nRodNodes);
366 RF_ASSERT(xorNode->numAntecedents == nRodNodes);
367 for (i = 0; i < nRodNodes; i++) {
368 RF_ASSERT(rodNodes[i].numAntecedents == 1);
369 blockNode->succedents[i] = &rodNodes[i];
370 rodNodes[i].antecedents[0] = blockNode;
371 rodNodes[i].antType[0] = rf_control;
372
373
374 RF_ASSERT(rodNodes[i].numSuccedents == 1);
375 rodNodes[i].succedents[0] = xorNode;
376 xorNode->antecedents[i] = &rodNodes[i];
377 xorNode->antType[i] = rf_trueData;
378 }
379 } else {
380
381 RF_ASSERT(blockNode->numSuccedents == 1);
382 RF_ASSERT(xorNode->numAntecedents == 1);
383 blockNode->succedents[0] = xorNode;
384 xorNode->antecedents[0] = blockNode;
385 xorNode->antType[0] = rf_control;
386 }
387
388
389 RF_ASSERT(xorNode->numSuccedents == 1);
390 RF_ASSERT(commitNode->numAntecedents == 1);
391 xorNode->succedents[0] = commitNode;
392 commitNode->antecedents[0] = xorNode;
393 commitNode->antType[0] = rf_control;
394
395
396 RF_ASSERT(commitNode->numSuccedents == nWndNodes + nfaults);
397 for (i = 0; i < nWndNodes; i++) {
398 RF_ASSERT(wndNodes->numAntecedents == 1);
399 commitNode->succedents[i] = &wndNodes[i];
400 wndNodes[i].antecedents[0] = commitNode;
401 wndNodes[i].antType[0] = rf_control;
402 }
403 RF_ASSERT(wnpNode->numAntecedents == 1);
404 commitNode->succedents[nWndNodes] = wnpNode;
405 wnpNode->antecedents[0] = commitNode;
406 wnpNode->antType[0] = rf_trueData;
407 if (nfaults == 2) {
408 RF_ASSERT(wnqNode->numAntecedents == 1);
409 commitNode->succedents[nWndNodes + 1] = wnqNode;
410 wnqNode->antecedents[0] = commitNode;
411 wnqNode->antType[0] = rf_trueData;
412 }
413
414 RF_ASSERT(termNode->numAntecedents == nWndNodes + nfaults);
415 RF_ASSERT(termNode->numSuccedents == 0);
416 for (i = 0; i < nWndNodes; i++) {
417 RF_ASSERT(wndNodes->numSuccedents == 1);
418 wndNodes[i].succedents[0] = termNode;
419 termNode->antecedents[i] = &wndNodes[i];
420 termNode->antType[i] = rf_control;
421 }
422 RF_ASSERT(wnpNode->numSuccedents == 1);
423 wnpNode->succedents[0] = termNode;
424 termNode->antecedents[nWndNodes] = wnpNode;
425 termNode->antType[nWndNodes] = rf_control;
426 if (nfaults == 2) {
427 RF_ASSERT(wnqNode->numSuccedents == 1);
428 wnqNode->succedents[0] = termNode;
429 termNode->antecedents[nWndNodes + 1] = wnqNode;
430 termNode->antType[nWndNodes + 1] = rf_control;
431 }
432 }
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466 void
467 rf_CommonCreateSmallWriteDAG(RF_Raid_t *raidPtr, RF_AccessStripeMap_t *asmap,
468 RF_DagHeader_t *dag_h, void *bp, RF_RaidAccessFlags_t flags,
469 RF_AllocListElem_t *allocList, RF_RedFuncs_t *pfuncs, RF_RedFuncs_t *qfuncs)
470 {
471 RF_DagNode_t *readDataNodes, *readParityNodes, *readQNodes, *termNode;
472 RF_DagNode_t *unlockDataNodes, *unlockParityNodes, *unlockQNodes;
473 RF_DagNode_t *xorNodes, *qNodes, *blockNode, *commitNode, *nodes;
474 RF_DagNode_t *writeDataNodes, *writeParityNodes, *writeQNodes;
475 int i, j, nNodes, totalNumNodes, lu_flag;
476 RF_ReconUnitNum_t which_ru;
477 int (*func) (RF_DagNode_t *);
478 int (*undoFunc) (RF_DagNode_t *);
479 int (*qfunc) (RF_DagNode_t *);
480 int numDataNodes, numParityNodes;
481 RF_StripeNum_t parityStripeID;
482 RF_PhysDiskAddr_t *pda;
483 char *name, *qname;
484 long nfaults;
485
486 nfaults = qfuncs ? 2 : 1;
487 lu_flag = (rf_enableAtomicRMW) ? 1 : 0;
488
489 parityStripeID = rf_RaidAddressToParityStripeID(&(raidPtr->Layout),
490 asmap->raidAddress, &which_ru);
491 pda = asmap->physInfo;
492 numDataNodes = asmap->numStripeUnitsAccessed;
493 numParityNodes = (asmap->parityInfo->next) ? 2 : 1;
494
495 if (rf_dagDebug) {
496 printf("[Creating small-write DAG]\n");
497 }
498 RF_ASSERT(numDataNodes > 0);
499 dag_h->creator = "SmallWriteDAG";
500
501 dag_h->numCommitNodes = 1;
502 dag_h->numCommits = 0;
503 dag_h->numSuccedents = 1;
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524 totalNumNodes = (2 * numDataNodes) + (nfaults * numParityNodes)
525 + (nfaults * 2 * numParityNodes) + 3;
526 if (lu_flag) {
527 totalNumNodes += (numDataNodes + (nfaults * numParityNodes));
528 }
529
530
531
532 RF_CallocAndAdd(nodes, totalNumNodes, sizeof(RF_DagNode_t),
533 (RF_DagNode_t *), allocList);
534 i = 0;
535 blockNode = &nodes[i];
536 i += 1;
537 commitNode = &nodes[i];
538 i += 1;
539 readDataNodes = &nodes[i];
540 i += numDataNodes;
541 readParityNodes = &nodes[i];
542 i += numParityNodes;
543 writeDataNodes = &nodes[i];
544 i += numDataNodes;
545 writeParityNodes = &nodes[i];
546 i += numParityNodes;
547 xorNodes = &nodes[i];
548 i += numParityNodes;
549 termNode = &nodes[i];
550 i += 1;
551 if (lu_flag) {
552 unlockDataNodes = &nodes[i];
553 i += numDataNodes;
554 unlockParityNodes = &nodes[i];
555 i += numParityNodes;
556 } else {
557 unlockDataNodes = unlockParityNodes = NULL;
558 }
559 if (nfaults == 2) {
560 readQNodes = &nodes[i];
561 i += numParityNodes;
562 writeQNodes = &nodes[i];
563 i += numParityNodes;
564 qNodes = &nodes[i];
565 i += numParityNodes;
566 if (lu_flag) {
567 unlockQNodes = &nodes[i];
568 i += numParityNodes;
569 } else {
570 unlockQNodes = NULL;
571 }
572 } else {
573 readQNodes = writeQNodes = qNodes = unlockQNodes = NULL;
574 }
575 RF_ASSERT(i == totalNumNodes);
576
577
578
579
580
581 nNodes = numDataNodes + (nfaults * numParityNodes);
582 rf_InitNode(blockNode, rf_wait, RF_FALSE, rf_NullNodeFunc,
583 rf_NullNodeUndoFunc, NULL, nNodes, 0, 0, 0, dag_h,
584 "Nil", allocList);
585
586
587 rf_InitNode(commitNode, rf_wait, RF_TRUE, rf_NullNodeFunc,
588 rf_NullNodeUndoFunc, NULL, nNodes, (nfaults * numParityNodes),
589 0, 0, dag_h, "Cmt", allocList);
590
591
592 rf_InitNode(termNode, rf_wait, RF_FALSE, rf_TerminateFunc,
593 rf_TerminateUndoFunc, NULL, 0, nNodes, 0, 0, dag_h,
594 "Trm", allocList);
595
596
597 for (i = 0; i < numDataNodes; i++) {
598 rf_InitNode(&readDataNodes[i], rf_wait, RF_FALSE,
599 rf_DiskReadFunc, rf_DiskReadUndoFunc, rf_GenericWakeupFunc,
600 (nfaults * numParityNodes), 1, 4, 0, dag_h, "Rod",
601 allocList);
602 RF_ASSERT(pda != NULL);
603
604 readDataNodes[i].params[0].p = pda;
605
606 readDataNodes[i].params[1].p = rf_AllocBuffer(raidPtr,
607 dag_h, pda, allocList);
608 readDataNodes[i].params[2].v = parityStripeID;
609 readDataNodes[i].params[3].v =
610 RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
611 lu_flag, 0, which_ru);
612 pda = pda->next;
613 for (j = 0; j < readDataNodes[i].numSuccedents; j++) {
614 readDataNodes[i].propList[j] = NULL;
615 }
616 }
617
618
619 pda = asmap->parityInfo;
620 i = 0;
621 for (i = 0; i < numParityNodes; i++) {
622 RF_ASSERT(pda != NULL);
623 rf_InitNode(&readParityNodes[i], rf_wait, RF_FALSE,
624 rf_DiskReadFunc, rf_DiskReadUndoFunc, rf_GenericWakeupFunc,
625 numParityNodes, 1, 4, 0, dag_h, "Rop", allocList);
626 readParityNodes[i].params[0].p = pda;
627
628 readParityNodes[i].params[1].p = rf_AllocBuffer(raidPtr,
629 dag_h, pda, allocList);
630 readParityNodes[i].params[2].v = parityStripeID;
631 readParityNodes[i].params[3].v =
632 RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
633 lu_flag, 0, which_ru);
634 pda = pda->next;
635 for (j = 0; j < readParityNodes[i].numSuccedents; j++) {
636 readParityNodes[i].propList[0] = NULL;
637 }
638 }
639
640
641 if (nfaults == 2) {
642 pda = asmap->qInfo;
643 for (i = 0; i < numParityNodes; i++) {
644 RF_ASSERT(pda != NULL);
645 rf_InitNode(&readQNodes[i], rf_wait, RF_FALSE,
646 rf_DiskReadFunc, rf_DiskReadUndoFunc,
647 rf_GenericWakeupFunc, numParityNodes,
648 1, 4, 0, dag_h, "Roq", allocList);
649 readQNodes[i].params[0].p = pda;
650
651 readQNodes[i].params[1].p = rf_AllocBuffer(raidPtr,
652 dag_h, pda, allocList);
653 readQNodes[i].params[2].v = parityStripeID;
654 readQNodes[i].params[3].v =
655 RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
656 lu_flag, 0, which_ru);
657 pda = pda->next;
658 for (j = 0; j < readQNodes[i].numSuccedents; j++) {
659 readQNodes[i].propList[0] = NULL;
660 }
661 }
662 }
663
664 pda = asmap->physInfo;
665 for (i = 0; i < numDataNodes; i++) {
666 RF_ASSERT(pda != NULL);
667 rf_InitNode(&writeDataNodes[i], rf_wait, RF_FALSE,
668 rf_DiskWriteFunc, rf_DiskWriteUndoFunc,
669 rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h,
670 "Wnd", allocList);
671
672 writeDataNodes[i].params[0].p = pda;
673
674 writeDataNodes[i].params[1].p = pda->bufPtr;
675 writeDataNodes[i].params[2].v = parityStripeID;
676 writeDataNodes[i].params[3].v =
677 RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
678 if (lu_flag) {
679
680 rf_InitNode(&unlockDataNodes[i], rf_wait, RF_FALSE,
681 rf_DiskUnlockFunc, rf_DiskUnlockUndoFunc,
682 rf_GenericWakeupFunc, 1, 1, 2, 0, dag_h,
683 "Und", allocList);
684
685 unlockDataNodes[i].params[0].p = pda;
686 unlockDataNodes[i].params[1].v =
687 RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
688 0, lu_flag, which_ru);
689 }
690 pda = pda->next;
691 }
692
693
694
695
696
697
698
699
700
701
702
703
704
705 if ((numParityNodes == 2) || ((numDataNodes == 1) &&
706 (asmap->totalSectorsAccessed <
707 raidPtr->Layout.sectorsPerStripeUnit))) {
708 func = pfuncs->simple;
709 undoFunc = rf_NullNodeUndoFunc;
710 name = pfuncs->SimpleName;
711 if (qfuncs) {
712 qfunc = qfuncs->simple;
713 qname = qfuncs->SimpleName;
714 } else {
715 qfunc = NULL;
716 qname = NULL;
717 }
718 } else {
719 func = pfuncs->regular;
720 undoFunc = rf_NullNodeUndoFunc;
721 name = pfuncs->RegularName;
722 if (qfuncs) {
723 qfunc = qfuncs->regular;
724 qname = qfuncs->RegularName;
725 } else {
726 qfunc = NULL;
727 qname = NULL;
728 }
729 }
730
731
732
733
734 if (numParityNodes == 2) {
735
736 for (i = 0; i < numParityNodes; i++) {
737
738 rf_InitNode(&xorNodes[i], rf_wait, RF_FALSE, func,
739 undoFunc, NULL, 1, (numDataNodes + numParityNodes),
740 7, 1, dag_h, name, allocList);
741 xorNodes[i].flags |= RF_DAGNODE_FLAG_YIELD;
742 xorNodes[i].params[0] = readDataNodes[i].params[0];
743 xorNodes[i].params[1] = readDataNodes[i].params[1];
744 xorNodes[i].params[2] = readParityNodes[i].params[0];
745 xorNodes[i].params[3] = readParityNodes[i].params[1];
746 xorNodes[i].params[4] = writeDataNodes[i].params[0];
747 xorNodes[i].params[5] = writeDataNodes[i].params[1];
748 xorNodes[i].params[6].p = raidPtr;
749
750 xorNodes[i].results[0] = readParityNodes[i].params[1].p;
751 if (nfaults == 2) {
752
753 rf_InitNode(&qNodes[i], rf_wait, RF_FALSE,
754 qfunc, undoFunc, NULL, 1,
755 (numDataNodes + numParityNodes), 7, 1,
756 dag_h, qname, allocList);
757 qNodes[i].params[0] =
758 readDataNodes[i].params[0];
759 qNodes[i].params[1] =
760 readDataNodes[i].params[1];
761 qNodes[i].params[2] = readQNodes[i].params[0];
762 qNodes[i].params[3] = readQNodes[i].params[1];
763 qNodes[i].params[4] =
764 writeDataNodes[i].params[0];
765 qNodes[i].params[5] =
766 writeDataNodes[i].params[1];
767 qNodes[i].params[6].p = raidPtr;
768
769 qNodes[i].results[0] =
770 readQNodes[i].params[1].p;
771 }
772 }
773 } else {
774
775 rf_InitNode(&xorNodes[0], rf_wait, RF_FALSE, func, undoFunc,
776 NULL, 1, (numDataNodes + numParityNodes),
777 (2 * (numDataNodes + numDataNodes + 1) + 1), 1,
778 dag_h, name, allocList);
779 xorNodes[0].flags |= RF_DAGNODE_FLAG_YIELD;
780 for (i = 0; i < numDataNodes + 1; i++) {
781
782 xorNodes[0].params[2 * i + 0] =
783 readDataNodes[i].params[0];
784 xorNodes[0].params[2 * i + 1] =
785 readDataNodes[i].params[1];
786 }
787 for (i = 0; i < numDataNodes; i++) {
788
789 xorNodes[0].params[2 * (numDataNodes + 1 + i) + 0] =
790 writeDataNodes[i].params[0];
791 xorNodes[0].params[2 * (numDataNodes + 1 + i) + 1] =
792 writeDataNodes[i].params[1];
793 }
794
795 xorNodes[0].params[2 * (numDataNodes + numDataNodes + 1)].p =
796 raidPtr;
797 xorNodes[0].results[0] = readParityNodes[0].params[1].p;
798 if (nfaults == 2) {
799 rf_InitNode(&qNodes[0], rf_wait, RF_FALSE, qfunc,
800 undoFunc, NULL, 1, (numDataNodes + numParityNodes),
801 (2 * (numDataNodes + numDataNodes + 1) + 1), 1,
802 dag_h, qname, allocList);
803 for (i = 0; i < numDataNodes; i++) {
804
805 qNodes[0].params[2 * i + 0] =
806 readDataNodes[i].params[0];
807 qNodes[0].params[2 * i + 1] =
808 readDataNodes[i].params[1];
809 }
810
811 qNodes[0].params[2 * numDataNodes + 0] =
812 readQNodes[0].params[0];
813 qNodes[0].params[2 * numDataNodes + 1] =
814 readQNodes[0].params[1];
815 for (i = 0; i < numDataNodes; i++) {
816
817 qNodes[0].params
818 [2 * (numDataNodes + 1 + i) + 0] =
819
820 writeDataNodes[i].params[0];
821 qNodes[0].params
822 [2 * (numDataNodes + 1 + i) + 1] =
823
824 writeDataNodes[i].params[1];
825 }
826
827 qNodes[0].params
828 [2 * (numDataNodes + numDataNodes + 1)].p = raidPtr;
829 qNodes[0].results[0] = readQNodes[0].params[1].p;
830 }
831 }
832
833
834 pda = asmap->parityInfo;
835 for (i = 0; i < numParityNodes; i++) {
836 rf_InitNode(&writeParityNodes[i], rf_wait, RF_FALSE,
837 rf_DiskWriteFunc, rf_DiskWriteUndoFunc,
838 rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h,
839 "Wnp", allocList);
840 RF_ASSERT(pda != NULL);
841
842 writeParityNodes[i].params[0].p = pda;
843
844 writeParityNodes[i].params[1].p = xorNodes[i].results[0];
845 writeParityNodes[i].params[2].v = parityStripeID;
846 writeParityNodes[i].params[3].v =
847 RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
848 if (lu_flag) {
849
850 rf_InitNode(&unlockParityNodes[i], rf_wait, RF_FALSE,
851 rf_DiskUnlockFunc, rf_DiskUnlockUndoFunc,
852 rf_GenericWakeupFunc, 1, 1, 2, 0, dag_h,
853 "Unp", allocList);
854
855 unlockParityNodes[i].params[0].p = pda;
856 unlockParityNodes[i].params[1].v =
857 RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
858 0, lu_flag, which_ru);
859 }
860 pda = pda->next;
861 }
862
863
864 if (nfaults == 2) {
865 pda = asmap->qInfo;
866 for (i = 0; i < numParityNodes; i++) {
867 rf_InitNode(&writeQNodes[i], rf_wait, RF_FALSE,
868 rf_DiskWriteFunc, rf_DiskWriteUndoFunc,
869 rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h,
870 "Wnq", allocList);
871 RF_ASSERT(pda != NULL);
872
873 writeQNodes[i].params[0].p = pda;
874 writeQNodes[i].params[1].p = qNodes[i].results[0];
875
876 writeQNodes[i].params[2].v = parityStripeID;
877 writeQNodes[i].params[3].v =
878 RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
879 0, 0, which_ru);
880 if (lu_flag) {
881
882 rf_InitNode(&unlockQNodes[i], rf_wait,
883 RF_FALSE, rf_DiskUnlockFunc,
884 rf_DiskUnlockUndoFunc,
885 rf_GenericWakeupFunc, 1, 1, 2, 0,
886 dag_h, "Unq", allocList);
887
888 unlockQNodes[i].params[0].p = pda;
889 unlockQNodes[i].params[1].v =
890 RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
891 0, lu_flag, which_ru);
892 }
893 pda = pda->next;
894 }
895 }
896
897
898
899
900
901 dag_h->succedents[0] = blockNode;
902
903
904 RF_ASSERT(blockNode->numSuccedents ==
905 (numDataNodes + (numParityNodes * nfaults)));
906 for (i = 0; i < numDataNodes; i++) {
907 blockNode->succedents[i] = &readDataNodes[i];
908 RF_ASSERT(readDataNodes[i].numAntecedents == 1);
909 readDataNodes[i].antecedents[0] = blockNode;
910 readDataNodes[i].antType[0] = rf_control;
911 }
912
913
914 for (i = 0; i < numParityNodes; i++) {
915 blockNode->succedents[numDataNodes + i] = &readParityNodes[i];
916 RF_ASSERT(readParityNodes[i].numAntecedents == 1);
917 readParityNodes[i].antecedents[0] = blockNode;
918 readParityNodes[i].antType[0] = rf_control;
919 }
920
921
922 if (nfaults == 2) {
923 for (i = 0; i < numParityNodes; i++) {
924 blockNode->succedents[numDataNodes + numParityNodes + i]
925 = &readQNodes[i];
926 RF_ASSERT(readQNodes[i].numAntecedents == 1);
927 readQNodes[i].antecedents[0] = blockNode;
928 readQNodes[i].antType[0] = rf_control;
929 }
930 }
931
932 for (i = 0; i < numDataNodes; i++) {
933 RF_ASSERT(readDataNodes[i].numSuccedents ==
934 (nfaults * numParityNodes));
935 for (j = 0; j < numParityNodes; j++) {
936 RF_ASSERT(xorNodes[j].numAntecedents ==
937 numDataNodes + numParityNodes);
938 readDataNodes[i].succedents[j] = &xorNodes[j];
939 xorNodes[j].antecedents[i] = &readDataNodes[i];
940 xorNodes[j].antType[i] = rf_trueData;
941 }
942 }
943
944
945 if (nfaults == 2) {
946 for (i = 0; i < numDataNodes; i++) {
947 for (j = 0; j < numParityNodes; j++) {
948 RF_ASSERT(qNodes[j].numAntecedents ==
949 numDataNodes + numParityNodes);
950 readDataNodes[i].succedents[numParityNodes + j]
951 = &qNodes[j];
952 qNodes[j].antecedents[i] = &readDataNodes[i];
953 qNodes[j].antType[i] = rf_trueData;
954 }
955 }
956 }
957
958 for (i = 0; i < numParityNodes; i++) {
959 RF_ASSERT(readParityNodes[i].numSuccedents == numParityNodes);
960 for (j = 0; j < numParityNodes; j++) {
961 readParityNodes[i].succedents[j] = &xorNodes[j];
962 xorNodes[j].antecedents[numDataNodes + i] =
963 &readParityNodes[i];
964 xorNodes[j].antType[numDataNodes + i] = rf_trueData;
965 }
966 }
967
968
969 if (nfaults == 2) {
970 for (i = 0; i < numParityNodes; i++) {
971 RF_ASSERT(readParityNodes[i].numSuccedents ==
972 numParityNodes);
973 for (j = 0; j < numParityNodes; j++) {
974 readQNodes[i].succedents[j] = &qNodes[j];
975 qNodes[j].antecedents[numDataNodes + i] =
976 &readQNodes[i];
977 qNodes[j].antType[numDataNodes + i] =
978 rf_trueData;
979 }
980 }
981 }
982
983 RF_ASSERT(commitNode->numAntecedents == (nfaults * numParityNodes));
984 for (i = 0; i < numParityNodes; i++) {
985 RF_ASSERT(xorNodes[i].numSuccedents == 1);
986 xorNodes[i].succedents[0] = commitNode;
987 commitNode->antecedents[i] = &xorNodes[i];
988 commitNode->antType[i] = rf_control;
989 }
990
991
992 if (nfaults == 2) {
993 for (i = 0; i < numParityNodes; i++) {
994 RF_ASSERT(qNodes[i].numSuccedents == 1);
995 qNodes[i].succedents[0] = commitNode;
996 commitNode->antecedents[i + numParityNodes] =
997 &qNodes[i];
998 commitNode->antType[i + numParityNodes] = rf_control;
999 }
1000 }
1001
1002 RF_ASSERT(commitNode->numSuccedents ==
1003 (numDataNodes + (nfaults * numParityNodes)));
1004 for (i = 0; i < numDataNodes; i++) {
1005 RF_ASSERT(writeDataNodes[i].numAntecedents == 1);
1006 commitNode->succedents[i] = &writeDataNodes[i];
1007 writeDataNodes[i].antecedents[0] = commitNode;
1008 writeDataNodes[i].antType[0] = rf_trueData;
1009 }
1010 for (i = 0; i < numParityNodes; i++) {
1011 RF_ASSERT(writeParityNodes[i].numAntecedents == 1);
1012 commitNode->succedents[i + numDataNodes] = &writeParityNodes[i];
1013 writeParityNodes[i].antecedents[0] = commitNode;
1014 writeParityNodes[i].antType[0] = rf_trueData;
1015 }
1016 if (nfaults == 2) {
1017 for (i = 0; i < numParityNodes; i++) {
1018 RF_ASSERT(writeQNodes[i].numAntecedents == 1);
1019 commitNode->succedents
1020 [i + numDataNodes + numParityNodes] =
1021 &writeQNodes[i];
1022 writeQNodes[i].antecedents[0] = commitNode;
1023 writeQNodes[i].antType[0] = rf_trueData;
1024 }
1025 }
1026 RF_ASSERT(termNode->numAntecedents ==
1027 (numDataNodes + (nfaults * numParityNodes)));
1028 RF_ASSERT(termNode->numSuccedents == 0);
1029 for (i = 0; i < numDataNodes; i++) {
1030 if (lu_flag) {
1031
1032 RF_ASSERT(writeDataNodes[i].numSuccedents == 1);
1033 RF_ASSERT(unlockDataNodes[i].numAntecedents == 1);
1034 writeDataNodes[i].succedents[0] = &unlockDataNodes[i];
1035 unlockDataNodes[i].antecedents[0] = &writeDataNodes[i];
1036 unlockDataNodes[i].antType[0] = rf_control;
1037
1038
1039 RF_ASSERT(unlockDataNodes[i].numSuccedents == 1);
1040 unlockDataNodes[i].succedents[0] = termNode;
1041 termNode->antecedents[i] = &unlockDataNodes[i];
1042 termNode->antType[i] = rf_control;
1043 } else {
1044
1045 RF_ASSERT(writeDataNodes[i].numSuccedents == 1);
1046 RF_ASSERT(termNode->numAntecedents ==
1047 (numDataNodes + (nfaults * numParityNodes)));
1048 writeDataNodes[i].succedents[0] = termNode;
1049 termNode->antecedents[i] = &writeDataNodes[i];
1050 termNode->antType[i] = rf_control;
1051 }
1052 }
1053
1054 for (i = 0; i < numParityNodes; i++) {
1055 if (lu_flag) {
1056
1057 RF_ASSERT(writeParityNodes[i].numSuccedents == 1);
1058 RF_ASSERT(unlockParityNodes[i].numAntecedents == 1);
1059 writeParityNodes[i].succedents[0] =
1060 &unlockParityNodes[i];
1061 unlockParityNodes[i].antecedents[0] =
1062 &writeParityNodes[i];
1063 unlockParityNodes[i].antType[0] = rf_control;
1064
1065
1066 RF_ASSERT(unlockParityNodes[i].numSuccedents == 1);
1067 unlockParityNodes[i].succedents[0] = termNode;
1068 termNode->antecedents[numDataNodes + i] =
1069 &unlockParityNodes[i];
1070 termNode->antType[numDataNodes + i] = rf_control;
1071 } else {
1072 RF_ASSERT(writeParityNodes[i].numSuccedents == 1);
1073 writeParityNodes[i].succedents[0] = termNode;
1074 termNode->antecedents[numDataNodes + i] =
1075 &writeParityNodes[i];
1076 termNode->antType[numDataNodes + i] = rf_control;
1077 }
1078 }
1079
1080 if (nfaults == 2) {
1081 for (i = 0; i < numParityNodes; i++) {
1082 if (lu_flag) {
1083
1084 RF_ASSERT(writeQNodes[i].numSuccedents == 1);
1085 RF_ASSERT(unlockQNodes[i].numAntecedents == 1);
1086 writeQNodes[i].succedents[0] = &unlockQNodes[i];
1087 unlockQNodes[i].antecedents[0] =
1088 &writeQNodes[i];
1089 unlockQNodes[i].antType[0] = rf_control;
1090
1091
1092 RF_ASSERT(unlockQNodes[i].numSuccedents == 1);
1093 unlockQNodes[i].succedents[0] = termNode;
1094 termNode->antecedents
1095 [numDataNodes + numParityNodes + i] =
1096 &unlockQNodes[i];
1097 termNode->antType
1098 [numDataNodes + numParityNodes + i] =
1099 rf_control;
1100 } else {
1101 RF_ASSERT(writeQNodes[i].numSuccedents == 1);
1102 writeQNodes[i].succedents[0] = termNode;
1103 termNode->antecedents
1104 [numDataNodes + numParityNodes + i] =
1105 &writeQNodes[i];
1106 termNode->antType
1107 [numDataNodes + numParityNodes + i] =
1108 rf_control;
1109 }
1110 }
1111 }
1112 }
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131 void
1132 rf_CreateRaidOneWriteDAG(RF_Raid_t *raidPtr, RF_AccessStripeMap_t *asmap,
1133 RF_DagHeader_t *dag_h, void *bp, RF_RaidAccessFlags_t flags,
1134 RF_AllocListElem_t *allocList)
1135 {
1136 RF_DagNode_t *unblockNode, *termNode, *commitNode;
1137 RF_DagNode_t *nodes, *wndNode, *wmirNode;
1138 int nWndNodes, nWmirNodes, i;
1139 RF_ReconUnitNum_t which_ru;
1140 RF_PhysDiskAddr_t *pda, *pdaP;
1141 RF_StripeNum_t parityStripeID;
1142
1143 parityStripeID = rf_RaidAddressToParityStripeID(&(raidPtr->Layout),
1144 asmap->raidAddress, &which_ru);
1145 if (rf_dagDebug) {
1146 printf("[Creating RAID level 1 write DAG]\n");
1147 }
1148 dag_h->creator = "RaidOneWriteDAG";
1149
1150
1151 nWmirNodes = (asmap->parityInfo->next) ? 2 : 1;
1152 nWndNodes = (asmap->physInfo->next) ? 2 : 1;
1153
1154
1155 if (asmap->numDataFailed == 1)
1156 nWndNodes--;
1157 if (asmap->numParityFailed == 1)
1158 nWmirNodes--;
1159
1160
1161
1162
1163
1164 RF_CallocAndAdd(nodes, nWndNodes + nWmirNodes + 3, sizeof(RF_DagNode_t),
1165 (RF_DagNode_t *), allocList);
1166 i = 0;
1167 wndNode = &nodes[i];
1168 i += nWndNodes;
1169 wmirNode = &nodes[i];
1170 i += nWmirNodes;
1171 commitNode = &nodes[i];
1172 i += 1;
1173 unblockNode = &nodes[i];
1174 i += 1;
1175 termNode = &nodes[i];
1176 i += 1;
1177 RF_ASSERT(i == (nWndNodes + nWmirNodes + 3));
1178
1179
1180 dag_h->numCommitNodes = 1;
1181 dag_h->numCommits = 0;
1182 dag_h->numSuccedents = 1;
1183
1184
1185 rf_InitNode(commitNode, rf_wait, RF_TRUE, rf_NullNodeFunc,
1186 rf_NullNodeUndoFunc, NULL, (nWndNodes + nWmirNodes), 0, 0, 0,
1187 dag_h, "Cmt", allocList);
1188 rf_InitNode(unblockNode, rf_wait, RF_FALSE, rf_NullNodeFunc,
1189 rf_NullNodeUndoFunc, NULL, 1, (nWndNodes + nWmirNodes), 0, 0,
1190 dag_h, "Nil", allocList);
1191 rf_InitNode(termNode, rf_wait, RF_FALSE, rf_TerminateFunc,
1192 rf_TerminateUndoFunc, NULL, 0, 1, 0, 0, dag_h, "Trm", allocList);
1193
1194
1195 if (nWndNodes > 0) {
1196 pda = asmap->physInfo;
1197 for (i = 0; i < nWndNodes; i++) {
1198 rf_InitNode(&wndNode[i], rf_wait, RF_FALSE,
1199 rf_DiskWriteFunc, rf_DiskWriteUndoFunc,
1200 rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h,
1201 "Wpd", allocList);
1202 RF_ASSERT(pda != NULL);
1203 wndNode[i].params[0].p = pda;
1204 wndNode[i].params[1].p = pda->bufPtr;
1205 wndNode[i].params[2].v = parityStripeID;
1206 wndNode[i].params[3].v =
1207 RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
1208 0, 0, which_ru);
1209 pda = pda->next;
1210 }
1211 RF_ASSERT(pda == NULL);
1212 }
1213
1214 if (nWmirNodes > 0) {
1215 pda = asmap->physInfo;
1216 pdaP = asmap->parityInfo;
1217 for (i = 0; i < nWmirNodes; i++) {
1218 rf_InitNode(&wmirNode[i], rf_wait, RF_FALSE,
1219 rf_DiskWriteFunc, rf_DiskWriteUndoFunc,
1220 rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h,
1221 "Wsd", allocList);
1222 RF_ASSERT(pda != NULL);
1223 wmirNode[i].params[0].p = pdaP;
1224 wmirNode[i].params[1].p = pda->bufPtr;
1225 wmirNode[i].params[2].v = parityStripeID;
1226 wmirNode[i].params[3].v =
1227 RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
1228 0, 0, which_ru);
1229 pda = pda->next;
1230 pdaP = pdaP->next;
1231 }
1232 RF_ASSERT(pda == NULL);
1233 RF_ASSERT(pdaP == NULL);
1234 }
1235
1236 RF_ASSERT(dag_h->numSuccedents == 1);
1237 RF_ASSERT(commitNode->numAntecedents == 0);
1238 dag_h->succedents[0] = commitNode;
1239
1240
1241 RF_ASSERT(commitNode->numSuccedents == (nWndNodes + nWmirNodes));
1242 for (i = 0; i < nWndNodes; i++) {
1243 RF_ASSERT(wndNode[i].numAntecedents == 1);
1244 commitNode->succedents[i] = &wndNode[i];
1245 wndNode[i].antecedents[0] = commitNode;
1246 wndNode[i].antType[0] = rf_control;
1247 }
1248 for (i = 0; i < nWmirNodes; i++) {
1249 RF_ASSERT(wmirNode[i].numAntecedents == 1);
1250 commitNode->succedents[i + nWndNodes] = &wmirNode[i];
1251 wmirNode[i].antecedents[0] = commitNode;
1252 wmirNode[i].antType[0] = rf_control;
1253 }
1254
1255
1256 RF_ASSERT(unblockNode->numAntecedents == (nWndNodes + nWmirNodes));
1257 for (i = 0; i < nWndNodes; i++) {
1258 RF_ASSERT(wndNode[i].numSuccedents == 1);
1259 wndNode[i].succedents[0] = unblockNode;
1260 unblockNode->antecedents[i] = &wndNode[i];
1261 unblockNode->antType[i] = rf_control;
1262 }
1263 for (i = 0; i < nWmirNodes; i++) {
1264 RF_ASSERT(wmirNode[i].numSuccedents == 1);
1265 wmirNode[i].succedents[0] = unblockNode;
1266 unblockNode->antecedents[i + nWndNodes] = &wmirNode[i];
1267 unblockNode->antType[i + nWndNodes] = rf_control;
1268 }
1269
1270
1271 RF_ASSERT(unblockNode->numSuccedents == 1);
1272 RF_ASSERT(termNode->numAntecedents == 1);
1273 RF_ASSERT(termNode->numSuccedents == 0);
1274 unblockNode->succedents[0] = termNode;
1275 termNode->antecedents[0] = unblockNode;
1276 termNode->antType[0] = rf_control;
1277 }
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291 void
1292 rf_CommonCreateLargeWriteDAGFwd(RF_Raid_t *raidPtr, RF_AccessStripeMap_t *asmap,
1293 RF_DagHeader_t *dag_h, void *bp, RF_RaidAccessFlags_t flags,
1294 RF_AllocListElem_t *allocList, int nfaults, int (*redFunc) (RF_DagNode_t *),
1295 int allowBufferRecycle)
1296 {
1297 RF_DagNode_t *nodes, *wndNodes, *rodNodes, *xorNode, *wnpNode;
1298 RF_DagNode_t *wnqNode, *blockNode, *syncNode, *termNode;
1299 int nWndNodes, nRodNodes, i, nodeNum, asmNum;
1300 RF_AccessStripeMapHeader_t *new_asm_h[2];
1301 RF_StripeNum_t parityStripeID;
1302 char *sosBuffer, *eosBuffer;
1303 RF_ReconUnitNum_t which_ru;
1304 RF_RaidLayout_t *layoutPtr;
1305 RF_PhysDiskAddr_t *pda;
1306
1307 layoutPtr = &(raidPtr->Layout);
1308 parityStripeID = rf_RaidAddressToParityStripeID(&(raidPtr->Layout),
1309 asmap->raidAddress, &which_ru);
1310
1311 if (rf_dagDebug)
1312 printf("[Creating large-write DAG]\n");
1313 dag_h->creator = "LargeWriteDAGFwd";
1314
1315 dag_h->numCommitNodes = 0;
1316 dag_h->numCommits = 0;
1317 dag_h->numSuccedents = 1;
1318
1319
1320 nWndNodes = asmap->numStripeUnitsAccessed;
1321 RF_CallocAndAdd(nodes, nWndNodes + 4 + nfaults, sizeof(RF_DagNode_t),
1322 (RF_DagNode_t *), allocList);
1323 i = 0;
1324 wndNodes = &nodes[i];
1325 i += nWndNodes;
1326 xorNode = &nodes[i];
1327 i += 1;
1328 wnpNode = &nodes[i];
1329 i += 1;
1330 blockNode = &nodes[i];
1331 i += 1;
1332 syncNode = &nodes[i];
1333 i += 1;
1334 termNode = &nodes[i];
1335 i += 1;
1336 if (nfaults == 2) {
1337 wnqNode = &nodes[i];
1338 i += 1;
1339 } else {
1340 wnqNode = NULL;
1341 }
1342 rf_MapUnaccessedPortionOfStripe(raidPtr, layoutPtr, asmap, dag_h,
1343 new_asm_h, &nRodNodes, &sosBuffer, &eosBuffer, allocList);
1344 if (nRodNodes > 0) {
1345 RF_CallocAndAdd(rodNodes, nRodNodes, sizeof(RF_DagNode_t),
1346 (RF_DagNode_t *), allocList);
1347 } else {
1348 rodNodes = NULL;
1349 }
1350
1351
1352 if (nRodNodes > 0) {
1353 rf_InitNode(blockNode, rf_wait, RF_FALSE, rf_NullNodeFunc,
1354 rf_NullNodeUndoFunc, NULL, nRodNodes, 0, 0, 0, dag_h,
1355 "Nil", allocList);
1356 rf_InitNode(syncNode, rf_wait, RF_FALSE, rf_NullNodeFunc,
1357 rf_NullNodeUndoFunc, NULL, nWndNodes + 1, nRodNodes, 0, 0,
1358 dag_h, "Nil", allocList);
1359 } else {
1360 rf_InitNode(blockNode, rf_wait, RF_FALSE, rf_NullNodeFunc,
1361 rf_NullNodeUndoFunc, NULL, 1, 0, 0, 0, dag_h, "Nil",
1362 allocList);
1363 rf_InitNode(syncNode, rf_wait, RF_FALSE, rf_NullNodeFunc,
1364 rf_NullNodeUndoFunc, NULL, nWndNodes + 1, 1, 0, 0, dag_h,
1365 "Nil", allocList);
1366 }
1367
1368 rf_InitNode(termNode, rf_wait, RF_FALSE, rf_TerminateFunc,
1369 rf_TerminateUndoFunc, NULL, 0, nWndNodes + nfaults, 0, 0,
1370 dag_h, "Trm", allocList);
1371
1372
1373 for (nodeNum = asmNum = 0; asmNum < 2; asmNum++) {
1374 if (new_asm_h[asmNum]) {
1375 pda = new_asm_h[asmNum]->stripeMap->physInfo;
1376 while (pda) {
1377 rf_InitNode(&rodNodes[nodeNum], rf_wait,
1378 RF_FALSE, rf_DiskReadFunc,
1379 rf_DiskReadUndoFunc, rf_GenericWakeupFunc,
1380 1, 1, 4, 0, dag_h, "Rod", allocList);
1381 rodNodes[nodeNum].params[0].p = pda;
1382 rodNodes[nodeNum].params[1].p = pda->bufPtr;
1383 rodNodes[nodeNum].params[2].v = parityStripeID;
1384 rodNodes[nodeNum].params[3].v =
1385 RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
1386 0, 0, which_ru);
1387 nodeNum++;
1388 pda = pda->next;
1389 }
1390 }
1391 }
1392 RF_ASSERT(nodeNum == nRodNodes);
1393
1394
1395 pda = asmap->physInfo;
1396 for (i = 0; i < nWndNodes; i++) {
1397 rf_InitNode(&wndNodes[i], rf_wait, RF_FALSE, rf_DiskWriteFunc,
1398 rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0,
1399 dag_h, "Wnd", allocList);
1400 RF_ASSERT(pda != NULL);
1401 wndNodes[i].params[0].p = pda;
1402 wndNodes[i].params[1].p = pda->bufPtr;
1403 wndNodes[i].params[2].v = parityStripeID;
1404 wndNodes[i].params[3].v =
1405 RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
1406 pda = pda->next;
1407 }
1408
1409
1410 rf_InitNode(xorNode, rf_wait, RF_FALSE, redFunc, rf_NullNodeUndoFunc,
1411 NULL, 1, nfaults, 2 * (nWndNodes + nRodNodes) + 1, nfaults, dag_h,
1412 "Xr ", allocList);
1413 xorNode->flags |= RF_DAGNODE_FLAG_YIELD;
1414 for (i = 0; i < nWndNodes; i++) {
1415 xorNode->params[2 * i + 0] =
1416 wndNodes[i].params[0];
1417 xorNode->params[2 * i + 1] =
1418 wndNodes[i].params[1];
1419 }
1420 for (i = 0; i < nRodNodes; i++) {
1421 xorNode->params[2 * (nWndNodes + i) + 0] =
1422 rodNodes[i].params[0];
1423 xorNode->params[2 * (nWndNodes + i) + 1] =
1424 rodNodes[i].params[1];
1425 }
1426
1427 xorNode->params[2 * (nWndNodes + nRodNodes)].p = raidPtr;
1428
1429
1430
1431
1432
1433
1434
1435 if (allowBufferRecycle) {
1436 for (i = 0; i < nRodNodes; i++)
1437 if (((RF_PhysDiskAddr_t *) rodNodes[i].params[0].p)
1438 ->numSector == raidPtr->Layout.sectorsPerStripeUnit)
1439 break;
1440 }
1441 if ((!allowBufferRecycle) || (i == nRodNodes)) {
1442 RF_CallocAndAdd(xorNode->results[0], 1,
1443 rf_RaidAddressToByte(raidPtr,
1444 raidPtr->Layout.sectorsPerStripeUnit),
1445 (void *), allocList);
1446 } else
1447 xorNode->results[0] = rodNodes[i].params[1].p;
1448
1449
1450 rf_InitNode(wnpNode, rf_wait, RF_FALSE, rf_DiskWriteFunc,
1451 rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0,
1452 dag_h, "Wnp", allocList);
1453 wnpNode->params[0].p = asmap->parityInfo;
1454 wnpNode->params[1].p = xorNode->results[0];
1455 wnpNode->params[2].v = parityStripeID;
1456 wnpNode->params[3].v =
1457 RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
1458
1459 RF_ASSERT(asmap->parityInfo->next == NULL);
1460
1461 if (nfaults == 2) {
1462
1463
1464
1465
1466
1467
1468 RF_CallocAndAdd(xorNode->results[1], 1,
1469 rf_RaidAddressToByte(raidPtr,
1470 raidPtr->Layout.sectorsPerStripeUnit),
1471 (void *), allocList);
1472 rf_InitNode(wnqNode, rf_wait, RF_FALSE, rf_DiskWriteFunc,
1473 rf_DiskWriteUndoFunc, rf_GenericWakeupFunc, 1, 1, 4, 0,
1474 dag_h, "Wnq", allocList);
1475 wnqNode->params[0].p = asmap->qInfo;
1476 wnqNode->params[1].p = xorNode->results[1];
1477 wnqNode->params[2].v = parityStripeID;
1478 wnqNode->params[3].v =
1479 RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
1480
1481 RF_ASSERT(asmap->parityInfo->next == NULL);
1482 }
1483
1484
1485
1486
1487 RF_ASSERT(blockNode->numAntecedents == 0);
1488 dag_h->succedents[0] = blockNode;
1489
1490 if (nRodNodes > 0) {
1491
1492 RF_ASSERT(blockNode->numSuccedents == nRodNodes);
1493 RF_ASSERT(syncNode->numAntecedents == nRodNodes);
1494 for (i = 0; i < nRodNodes; i++) {
1495 RF_ASSERT(rodNodes[i].numAntecedents == 1);
1496 blockNode->succedents[i] = &rodNodes[i];
1497 rodNodes[i].antecedents[0] = blockNode;
1498 rodNodes[i].antType[0] = rf_control;
1499
1500
1501 RF_ASSERT(rodNodes[i].numSuccedents == 1);
1502 rodNodes[i].succedents[0] = syncNode;
1503 syncNode->antecedents[i] = &rodNodes[i];
1504 syncNode->antType[i] = rf_trueData;
1505 }
1506 } else {
1507
1508 RF_ASSERT(blockNode->numSuccedents == 1);
1509 RF_ASSERT(syncNode->numAntecedents == 1);
1510 blockNode->succedents[0] = syncNode;
1511 syncNode->antecedents[0] = blockNode;
1512 syncNode->antType[0] = rf_control;
1513 }
1514
1515
1516 RF_ASSERT(syncNode->numSuccedents == (1 + nWndNodes));
1517 for (i = 0; i < nWndNodes; i++) {
1518 RF_ASSERT(wndNodes->numAntecedents == 1);
1519 syncNode->succedents[i] = &wndNodes[i];
1520 wndNodes[i].antecedents[0] = syncNode;
1521 wndNodes[i].antType[0] = rf_control;
1522 }
1523
1524
1525 RF_ASSERT(xorNode->numAntecedents == 1);
1526 syncNode->succedents[nWndNodes] = xorNode;
1527 xorNode->antecedents[0] = syncNode;
1528 xorNode->antType[0] = rf_control;
1529
1530
1531 RF_ASSERT(xorNode->numSuccedents == nfaults);
1532 RF_ASSERT(wnpNode->numAntecedents == 1);
1533 xorNode->succedents[0] = wnpNode;
1534 wnpNode->antecedents[0] = xorNode;
1535 wnpNode->antType[0] = rf_trueData;
1536 if (nfaults == 2) {
1537 RF_ASSERT(wnqNode->numAntecedents == 1);
1538 xorNode->succedents[1] = wnqNode;
1539 wnqNode->antecedents[0] = xorNode;
1540 wnqNode->antType[0] = rf_trueData;
1541 }
1542
1543 RF_ASSERT(termNode->numAntecedents == nWndNodes + nfaults);
1544 RF_ASSERT(termNode->numSuccedents == 0);
1545 for (i = 0; i < nWndNodes; i++) {
1546 RF_ASSERT(wndNodes->numSuccedents == 1);
1547 wndNodes[i].succedents[0] = termNode;
1548 termNode->antecedents[i] = &wndNodes[i];
1549 termNode->antType[i] = rf_control;
1550 }
1551 RF_ASSERT(wnpNode->numSuccedents == 1);
1552 wnpNode->succedents[0] = termNode;
1553 termNode->antecedents[nWndNodes] = wnpNode;
1554 termNode->antType[nWndNodes] = rf_control;
1555 if (nfaults == 2) {
1556 RF_ASSERT(wnqNode->numSuccedents == 1);
1557 wnqNode->succedents[0] = termNode;
1558 termNode->antecedents[nWndNodes + 1] = wnqNode;
1559 termNode->antType[nWndNodes + 1] = rf_control;
1560 }
1561 }
1562
1563
1564
1565
1566
1567
1568
1569
1570
1571
1572
1573
1574
1575
1576
1577
1578
1579
1580
1581
1582
1583
1584
1585
1586
1587
1588
1589
1590
1591
1592
1593
1594
1595
1596
1597 void
1598 rf_CommonCreateSmallWriteDAGFwd(RF_Raid_t *raidPtr, RF_AccessStripeMap_t *asmap,
1599 RF_DagHeader_t *dag_h, void *bp, RF_RaidAccessFlags_t flags,
1600 RF_AllocListElem_t *allocList, RF_RedFuncs_t *pfuncs, RF_RedFuncs_t *qfuncs)
1601 {
1602 RF_DagNode_t *readDataNodes, *readParityNodes, *readQNodes, *termNode;
1603 RF_DagNode_t *unlockDataNodes, *unlockParityNodes, *unlockQNodes;
1604 RF_DagNode_t *xorNodes, *qNodes, *blockNode, *nodes;
1605 RF_DagNode_t *writeDataNodes, *writeParityNodes, *writeQNodes;
1606 int i, j, nNodes, totalNumNodes, lu_flag;
1607 RF_ReconUnitNum_t which_ru;
1608 int (*func) (RF_DagNode_t *);
1609 int (*undoFunc) (RF_DagNode_t *);
1610 int (*qfunc) (RF_DagNode_t *);
1611 int numDataNodes, numParityNodes;
1612 RF_StripeNum_t parityStripeID;
1613 RF_PhysDiskAddr_t *pda;
1614 char *name, *qname;
1615 long nfaults;
1616
1617 nfaults = qfuncs ? 2 : 1;
1618 lu_flag = (rf_enableAtomicRMW) ? 1 : 0;
1619
1620 parityStripeID = rf_RaidAddressToParityStripeID(&(raidPtr->Layout),
1621 asmap->raidAddress, &which_ru);
1622 pda = asmap->physInfo;
1623 numDataNodes = asmap->numStripeUnitsAccessed;
1624 numParityNodes = (asmap->parityInfo->next) ? 2 : 1;
1625
1626 if (rf_dagDebug)
1627 printf("[Creating small-write DAG]\n");
1628 RF_ASSERT(numDataNodes > 0);
1629 dag_h->creator = "SmallWriteDAGFwd";
1630
1631 dag_h->numCommitNodes = 0;
1632 dag_h->numCommits = 0;
1633 dag_h->numSuccedents = 1;
1634
1635 qfunc = NULL;
1636 qname = NULL;
1637
1638
1639
1640
1641
1642
1643
1644
1645
1646
1647
1648
1649
1650
1651
1652
1653
1654 totalNumNodes = (2 * numDataNodes) + (nfaults * numParityNodes)
1655 + (nfaults * 2 * numParityNodes) + 2;
1656 if (lu_flag)
1657 totalNumNodes += (numDataNodes + (nfaults * numParityNodes));
1658
1659
1660
1661 RF_CallocAndAdd(nodes, totalNumNodes, sizeof(RF_DagNode_t),
1662 (RF_DagNode_t *), allocList);
1663 i = 0;
1664 blockNode = &nodes[i];
1665 i += 1;
1666 readDataNodes = &nodes[i];
1667 i += numDataNodes;
1668 readParityNodes = &nodes[i];
1669 i += numParityNodes;
1670 writeDataNodes = &nodes[i];
1671 i += numDataNodes;
1672 writeParityNodes = &nodes[i];
1673 i += numParityNodes;
1674 xorNodes = &nodes[i];
1675 i += numParityNodes;
1676 termNode = &nodes[i];
1677 i += 1;
1678 if (lu_flag) {
1679 unlockDataNodes = &nodes[i];
1680 i += numDataNodes;
1681 unlockParityNodes = &nodes[i];
1682 i += numParityNodes;
1683 } else {
1684 unlockDataNodes = unlockParityNodes = NULL;
1685 }
1686 if (nfaults == 2) {
1687 readQNodes = &nodes[i];
1688 i += numParityNodes;
1689 writeQNodes = &nodes[i];
1690 i += numParityNodes;
1691 qNodes = &nodes[i];
1692 i += numParityNodes;
1693 if (lu_flag) {
1694 unlockQNodes = &nodes[i];
1695 i += numParityNodes;
1696 } else {
1697 unlockQNodes = NULL;
1698 }
1699 } else {
1700 readQNodes = writeQNodes = qNodes = unlockQNodes = NULL;
1701 }
1702 RF_ASSERT(i == totalNumNodes);
1703
1704
1705
1706 nNodes = numDataNodes + (nfaults * numParityNodes);
1707 rf_InitNode(blockNode, rf_wait, RF_FALSE, rf_NullNodeFunc,
1708 rf_NullNodeUndoFunc, NULL, nNodes, 0, 0, 0, dag_h,
1709 "Nil", allocList);
1710
1711
1712 rf_InitNode(termNode, rf_wait, RF_FALSE, rf_TerminateFunc,
1713 rf_TerminateUndoFunc, NULL, 0, nNodes, 0, 0, dag_h,
1714 "Trm", allocList);
1715
1716
1717 for (i = 0; i < numDataNodes; i++) {
1718 rf_InitNode(&readDataNodes[i], rf_wait, RF_FALSE,
1719 rf_DiskReadFunc, rf_DiskReadUndoFunc, rf_GenericWakeupFunc,
1720 (numParityNodes * nfaults) + 1, 1, 4, 0, dag_h,
1721 "Rod", allocList);
1722 RF_ASSERT(pda != NULL);
1723
1724 readDataNodes[i].params[0].p = pda;
1725
1726 readDataNodes[i].params[1].p = rf_AllocBuffer(raidPtr, dag_h,
1727 pda, allocList);
1728 readDataNodes[i].params[2].v = parityStripeID;
1729 readDataNodes[i].params[3].v =
1730 RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
1731 lu_flag, 0, which_ru);
1732 pda = pda->next;
1733 for (j = 0; j < readDataNodes[i].numSuccedents; j++)
1734 readDataNodes[i].propList[j] = NULL;
1735 }
1736
1737
1738 pda = asmap->parityInfo;
1739 i = 0;
1740 for (i = 0; i < numParityNodes; i++) {
1741 RF_ASSERT(pda != NULL);
1742 rf_InitNode(&readParityNodes[i], rf_wait, RF_FALSE,
1743 rf_DiskReadFunc, rf_DiskReadUndoFunc, rf_GenericWakeupFunc,
1744 numParityNodes, 1, 4, 0, dag_h, "Rop", allocList);
1745 readParityNodes[i].params[0].p = pda;
1746
1747 readParityNodes[i].params[1].p = rf_AllocBuffer(raidPtr,
1748 dag_h, pda, allocList);
1749 readParityNodes[i].params[2].v = parityStripeID;
1750 readParityNodes[i].params[3].v =
1751 RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
1752 lu_flag, 0, which_ru);
1753 for (j = 0; j < readParityNodes[i].numSuccedents; j++)
1754 readParityNodes[i].propList[0] = NULL;
1755 pda = pda->next;
1756 }
1757
1758
1759 if (nfaults == 2) {
1760 pda = asmap->qInfo;
1761 for (i = 0; i < numParityNodes; i++) {
1762 RF_ASSERT(pda != NULL);
1763 rf_InitNode(&readQNodes[i], rf_wait, RF_FALSE,
1764 rf_DiskReadFunc, rf_DiskReadUndoFunc,
1765 rf_GenericWakeupFunc, numParityNodes, 1, 4, 0,
1766 dag_h, "Roq", allocList);
1767 readQNodes[i].params[0].p = pda;
1768
1769 readQNodes[i].params[1].p = rf_AllocBuffer(raidPtr,
1770 dag_h, pda, allocList);
1771 readQNodes[i].params[2].v = parityStripeID;
1772 readQNodes[i].params[3].v =
1773 RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
1774 lu_flag, 0, which_ru);
1775 for (j = 0; j < readQNodes[i].numSuccedents; j++)
1776 readQNodes[i].propList[0] = NULL;
1777 pda = pda->next;
1778 }
1779 }
1780
1781 pda = asmap->physInfo;
1782 for (i = 0; i < numDataNodes; i++) {
1783 RF_ASSERT(pda != NULL);
1784 rf_InitNode(&writeDataNodes[i], rf_wait, RF_FALSE,
1785 rf_DiskWriteFunc, rf_DiskWriteUndoFunc,
1786 rf_GenericWakeupFunc, 1, 1, 4, 0,
1787 dag_h, "Wnd", allocList);
1788
1789 writeDataNodes[i].params[0].p = pda;
1790
1791 writeDataNodes[i].params[1].p = pda->bufPtr;
1792 writeDataNodes[i].params[2].v = parityStripeID;
1793 writeDataNodes[i].params[3].v =
1794 RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
1795
1796 if (lu_flag) {
1797
1798 rf_InitNode(&unlockDataNodes[i], rf_wait, RF_FALSE,
1799 rf_DiskUnlockFunc, rf_DiskUnlockUndoFunc,
1800 rf_GenericWakeupFunc, 1, 1, 2, 0, dag_h,
1801 "Und", allocList);
1802
1803 unlockDataNodes[i].params[0].p = pda;
1804 unlockDataNodes[i].params[1].v =
1805 RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
1806 0, lu_flag, which_ru);
1807 }
1808 pda = pda->next;
1809 }
1810
1811
1812
1813
1814
1815
1816
1817
1818
1819
1820
1821
1822 if ((numParityNodes == 2) || ((numDataNodes == 1) &&
1823 (asmap->totalSectorsAccessed <
1824 raidPtr->Layout.sectorsPerStripeUnit))) {
1825 func = pfuncs->simple;
1826 undoFunc = rf_NullNodeUndoFunc;
1827 name = pfuncs->SimpleName;
1828 if (qfuncs) {
1829 qfunc = qfuncs->simple;
1830 qname = qfuncs->SimpleName;
1831 }
1832 } else {
1833 func = pfuncs->regular;
1834 undoFunc = rf_NullNodeUndoFunc;
1835 name = pfuncs->RegularName;
1836 if (qfuncs) {
1837 qfunc = qfuncs->regular;
1838 qname = qfuncs->RegularName;
1839 }
1840 }
1841
1842
1843
1844
1845 if (numParityNodes == 2) {
1846 for (i = 0; i < numParityNodes; i++) {
1847
1848 rf_InitNode(&xorNodes[i], rf_wait, RF_FALSE, func,
1849 undoFunc, NULL, numParityNodes, numParityNodes +
1850 numDataNodes, 7, 1, dag_h, name, allocList);
1851 xorNodes[i].flags |= RF_DAGNODE_FLAG_YIELD;
1852 xorNodes[i].params[0] = readDataNodes[i].params[0];
1853 xorNodes[i].params[1] = readDataNodes[i].params[1];
1854 xorNodes[i].params[2] = readParityNodes[i].params[0];
1855 xorNodes[i].params[3] = readParityNodes[i].params[1];
1856 xorNodes[i].params[4] = writeDataNodes[i].params[0];
1857 xorNodes[i].params[5] = writeDataNodes[i].params[1];
1858 xorNodes[i].params[6].p = raidPtr;
1859
1860 xorNodes[i].results[0] = readParityNodes[i].params[1].p;
1861 if (nfaults == 2) {
1862
1863 rf_InitNode(&qNodes[i], rf_wait, RF_FALSE,
1864 qfunc, undoFunc, NULL, numParityNodes,
1865 numParityNodes + numDataNodes, 7, 1,
1866 dag_h, qname, allocList);
1867 qNodes[i].params[0] =
1868 readDataNodes[i].params[0];
1869 qNodes[i].params[1] =
1870 readDataNodes[i].params[1];
1871 qNodes[i].params[2] = readQNodes[i].params[0];
1872 qNodes[i].params[3] = readQNodes[i].params[1];
1873 qNodes[i].params[4] =
1874 writeDataNodes[i].params[0];
1875 qNodes[i].params[5] =
1876 writeDataNodes[i].params[1];
1877 qNodes[i].params[6].p = raidPtr;
1878
1879 qNodes[i].results[0] =
1880 readQNodes[i].params[1].p;
1881 }
1882 }
1883 } else {
1884
1885 rf_InitNode(&xorNodes[0], rf_wait, RF_FALSE, func, undoFunc,
1886 NULL, numParityNodes, numParityNodes + numDataNodes,
1887 (2 * (numDataNodes + numDataNodes + 1) + 1), 1, dag_h,
1888 name, allocList);
1889 xorNodes[0].flags |= RF_DAGNODE_FLAG_YIELD;
1890 for (i = 0; i < numDataNodes + 1; i++) {
1891
1892 xorNodes[0].params[2 * i + 0] =
1893 readDataNodes[i].params[0];
1894 xorNodes[0].params[2 * i + 1] =
1895 readDataNodes[i].params[1];
1896 }
1897 for (i = 0; i < numDataNodes; i++) {
1898
1899 xorNodes[0].params[2 * (numDataNodes + 1 + i) + 0] =
1900 writeDataNodes[i].params[0];
1901 xorNodes[0].params[2 * (numDataNodes + 1 + i) + 1] =
1902 writeDataNodes[i].params[1];
1903 }
1904 xorNodes[0].params[2 * (numDataNodes + numDataNodes + 1)].p =
1905 raidPtr;
1906 xorNodes[0].results[0] = readParityNodes[0].params[1].p;
1907 if (nfaults == 2) {
1908 rf_InitNode(&qNodes[0], rf_wait, RF_FALSE, qfunc,
1909 undoFunc, NULL, numParityNodes,
1910 numParityNodes + numDataNodes,
1911 (2 * (numDataNodes + numDataNodes + 1) + 1),
1912 1, dag_h, qname, allocList);
1913 for (i = 0; i < numDataNodes; i++) {
1914
1915
1916 qNodes[0].params[2 * i + 0] =
1917 readDataNodes[i].params[0];
1918
1919 qNodes[0].params[2 * i + 1] =
1920 readDataNodes[i].params[1];
1921 }
1922
1923 qNodes[0].params[2 * numDataNodes + 0] =
1924 readQNodes[0].params[0];
1925 qNodes[0].params[2 * numDataNodes + 1] =
1926 readQNodes[0].params[1];
1927 for (i = 0; i < numDataNodes; i++) {
1928
1929
1930 qNodes[0].params
1931 [2 * (numDataNodes + 1 + i) + 0] =
1932 writeDataNodes[i].params[0];
1933
1934 qNodes[0].params
1935 [2 * (numDataNodes + 1 + i) + 1] =
1936 writeDataNodes[i].params[1];
1937 }
1938
1939 qNodes[0].params
1940 [2 * (numDataNodes + numDataNodes + 1)].p =
1941 raidPtr;
1942 qNodes[0].results[0] = readQNodes[0].params[1].p;
1943 }
1944 }
1945
1946
1947 pda = asmap->parityInfo;
1948 for (i = 0; i < numParityNodes; i++) {
1949 rf_InitNode(&writeParityNodes[i], rf_wait, RF_FALSE,
1950 rf_DiskWriteFunc, rf_DiskWriteUndoFunc,
1951 rf_GenericWakeupFunc, 1, numParityNodes,
1952 4, 0, dag_h, "Wnp", allocList);
1953 RF_ASSERT(pda != NULL);
1954
1955 writeParityNodes[i].params[0].p = pda;
1956
1957 writeParityNodes[i].params[1].p = xorNodes[i].results[0];
1958 writeParityNodes[i].params[2].v = parityStripeID;
1959 writeParityNodes[i].params[3].v =
1960 RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY, 0, 0, which_ru);
1961
1962 if (lu_flag) {
1963
1964 rf_InitNode(&unlockParityNodes[i], rf_wait, RF_FALSE,
1965 rf_DiskUnlockFunc, rf_DiskUnlockUndoFunc,
1966 rf_GenericWakeupFunc, 1, 1, 2, 0, dag_h,
1967 "Unp", allocList);
1968 unlockParityNodes[i].params[0].p =
1969 pda;
1970 unlockParityNodes[i].params[1].v =
1971 RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
1972 0, lu_flag, which_ru);
1973 }
1974 pda = pda->next;
1975 }
1976
1977
1978 if (nfaults == 2) {
1979 pda = asmap->qInfo;
1980 for (i = 0; i < numParityNodes; i++) {
1981 rf_InitNode(&writeQNodes[i], rf_wait, RF_FALSE,
1982 rf_DiskWriteFunc, rf_DiskWriteUndoFunc,
1983 rf_GenericWakeupFunc, 1, numParityNodes,
1984 4, 0, dag_h, "Wnq", allocList);
1985 RF_ASSERT(pda != NULL);
1986
1987 writeQNodes[i].params[0].p = pda;
1988
1989 writeQNodes[i].params[1].p = qNodes[i].results[0];
1990 writeQNodes[i].params[2].v = parityStripeID;
1991 writeQNodes[i].params[3].v =
1992 RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
1993 0, 0, which_ru);
1994
1995 if (lu_flag) {
1996
1997 rf_InitNode(&unlockQNodes[i], rf_wait,
1998 RF_FALSE, rf_DiskUnlockFunc,
1999 rf_DiskUnlockUndoFunc,
2000 rf_GenericWakeupFunc, 1, 1, 2, 0,
2001 dag_h, "Unq", allocList);
2002
2003 unlockQNodes[i].params[0].p = pda;
2004 unlockQNodes[i].params[1].v =
2005 RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
2006 0, lu_flag, which_ru);
2007 }
2008 pda = pda->next;
2009 }
2010 }
2011
2012
2013
2014 dag_h->succedents[0] = blockNode;
2015
2016
2017 RF_ASSERT(blockNode->numSuccedents ==
2018 (numDataNodes + (numParityNodes * nfaults)));
2019 for (i = 0; i < numDataNodes; i++) {
2020 blockNode->succedents[i] = &readDataNodes[i];
2021 RF_ASSERT(readDataNodes[i].numAntecedents == 1);
2022 readDataNodes[i].antecedents[0] = blockNode;
2023 readDataNodes[i].antType[0] = rf_control;
2024 }
2025
2026
2027 for (i = 0; i < numParityNodes; i++) {
2028 blockNode->succedents[numDataNodes + i] = &readParityNodes[i];
2029 RF_ASSERT(readParityNodes[i].numAntecedents == 1);
2030 readParityNodes[i].antecedents[0] = blockNode;
2031 readParityNodes[i].antType[0] = rf_control;
2032 }
2033
2034
2035 if (nfaults == 2)
2036 for (i = 0; i < numParityNodes; i++) {
2037 blockNode->succedents[numDataNodes +
2038 numParityNodes + i] = &readQNodes[i];
2039 RF_ASSERT(readQNodes[i].numAntecedents == 1);
2040 readQNodes[i].antecedents[0] = blockNode;
2041 readQNodes[i].antType[0] = rf_control;
2042 }
2043
2044
2045 for (i = 0; i < numDataNodes; i++) {
2046 RF_ASSERT(readDataNodes[i].numSuccedents ==
2047 ((nfaults * numParityNodes) + 1));
2048 RF_ASSERT(writeDataNodes[i].numAntecedents == 1);
2049 readDataNodes[i].succedents[0] = &writeDataNodes[i];
2050 writeDataNodes[i].antecedents[0] = &readDataNodes[i];
2051 writeDataNodes[i].antType[0] = rf_antiData;
2052 }
2053
2054
2055 for (i = 0; i < numDataNodes; i++) {
2056 for (j = 0; j < numParityNodes; j++) {
2057 RF_ASSERT(xorNodes[j].numAntecedents ==
2058 numDataNodes + numParityNodes);
2059 readDataNodes[i].succedents[1 + j] = &xorNodes[j];
2060 xorNodes[j].antecedents[i] = &readDataNodes[i];
2061 xorNodes[j].antType[i] = rf_trueData;
2062 }
2063 }
2064
2065
2066 if (nfaults == 2)
2067 for (i = 0; i < numDataNodes; i++)
2068 for (j = 0; j < numParityNodes; j++) {
2069 RF_ASSERT(qNodes[j].numAntecedents ==
2070 numDataNodes + numParityNodes);
2071 readDataNodes[i].succedents
2072 [1 + numParityNodes + j] = &qNodes[j];
2073 qNodes[j].antecedents[i] = &readDataNodes[i];
2074 qNodes[j].antType[i] = rf_trueData;
2075 }
2076
2077
2078 for (i = 0; i < numParityNodes; i++) {
2079 for (j = 0; j < numParityNodes; j++) {
2080 RF_ASSERT(readParityNodes[i].numSuccedents ==
2081 numParityNodes);
2082 readParityNodes[i].succedents[j] = &xorNodes[j];
2083 xorNodes[j].antecedents[numDataNodes + i] =
2084 &readParityNodes[i];
2085 xorNodes[j].antType[numDataNodes + i] = rf_trueData;
2086 }
2087 }
2088
2089
2090 if (nfaults == 2)
2091 for (i = 0; i < numParityNodes; i++) {
2092 for (j = 0; j < numParityNodes; j++) {
2093 RF_ASSERT(readQNodes[i].numSuccedents ==
2094 numParityNodes);
2095 readQNodes[i].succedents[j] = &qNodes[j];
2096 qNodes[j].antecedents[numDataNodes + i] =
2097 &readQNodes[i];
2098 qNodes[j].antType[numDataNodes + i] =
2099 rf_trueData;
2100 }
2101 }
2102
2103
2104 for (i = 0; i < numParityNodes; i++) {
2105 RF_ASSERT(writeParityNodes[i].numAntecedents == numParityNodes);
2106 for (j = 0; j < numParityNodes; j++) {
2107 RF_ASSERT(xorNodes[j].numSuccedents == numParityNodes);
2108 xorNodes[i].succedents[j] = &writeParityNodes[j];
2109 writeParityNodes[j].antecedents[i] = &xorNodes[i];
2110 writeParityNodes[j].antType[i] = rf_trueData;
2111 }
2112 }
2113
2114
2115 if (nfaults == 2)
2116 for (i = 0; i < numParityNodes; i++) {
2117 RF_ASSERT(writeQNodes[i].numAntecedents ==
2118 numParityNodes);
2119 for (j = 0; j < numParityNodes; j++) {
2120 RF_ASSERT(qNodes[j].numSuccedents == 1);
2121 qNodes[i].succedents[j] = &writeQNodes[j];
2122 writeQNodes[j].antecedents[i] = &qNodes[i];
2123 writeQNodes[j].antType[i] = rf_trueData;
2124 }
2125 }
2126
2127 RF_ASSERT(termNode->numAntecedents ==
2128 (numDataNodes + (nfaults * numParityNodes)));
2129 RF_ASSERT(termNode->numSuccedents == 0);
2130 for (i = 0; i < numDataNodes; i++) {
2131 if (lu_flag) {
2132
2133 RF_ASSERT(writeDataNodes[i].numSuccedents == 1);
2134 RF_ASSERT(unlockDataNodes[i].numAntecedents == 1);
2135 writeDataNodes[i].succedents[0] = &unlockDataNodes[i];
2136 unlockDataNodes[i].antecedents[0] = &writeDataNodes[i];
2137 unlockDataNodes[i].antType[0] = rf_control;
2138
2139
2140 RF_ASSERT(unlockDataNodes[i].numSuccedents == 1);
2141 unlockDataNodes[i].succedents[0] = termNode;
2142 termNode->antecedents[i] = &unlockDataNodes[i];
2143 termNode->antType[i] = rf_control;
2144 } else {
2145
2146 RF_ASSERT(writeDataNodes[i].numSuccedents == 1);
2147 RF_ASSERT(termNode->numAntecedents ==
2148 (numDataNodes + (nfaults * numParityNodes)));
2149 writeDataNodes[i].succedents[0] = termNode;
2150 termNode->antecedents[i] = &writeDataNodes[i];
2151 termNode->antType[i] = rf_control;
2152 }
2153 }
2154
2155 for (i = 0; i < numParityNodes; i++) {
2156 if (lu_flag) {
2157
2158 RF_ASSERT(writeParityNodes[i].numSuccedents == 1);
2159 RF_ASSERT(unlockParityNodes[i].numAntecedents == 1);
2160 writeParityNodes[i].succedents[0] =
2161 &unlockParityNodes[i];
2162 unlockParityNodes[i].antecedents[0] =
2163 &writeParityNodes[i];
2164 unlockParityNodes[i].antType[0] = rf_control;
2165
2166
2167 RF_ASSERT(unlockParityNodes[i].numSuccedents == 1);
2168 unlockParityNodes[i].succedents[0] = termNode;
2169 termNode->antecedents[numDataNodes + i] =
2170 &unlockParityNodes[i];
2171 termNode->antType[numDataNodes + i] = rf_control;
2172 } else {
2173 RF_ASSERT(writeParityNodes[i].numSuccedents == 1);
2174 writeParityNodes[i].succedents[0] = termNode;
2175 termNode->antecedents[numDataNodes + i] =
2176 &writeParityNodes[i];
2177 termNode->antType[numDataNodes + i] = rf_control;
2178 }
2179 }
2180
2181 if (nfaults == 2)
2182 for (i = 0; i < numParityNodes; i++) {
2183 if (lu_flag) {
2184
2185 RF_ASSERT(writeQNodes[i].numSuccedents == 1);
2186 RF_ASSERT(unlockQNodes[i].numAntecedents == 1);
2187 writeQNodes[i].succedents[0] = &unlockQNodes[i];
2188 unlockQNodes[i].antecedents[0] =
2189 &writeQNodes[i];
2190 unlockQNodes[i].antType[0] = rf_control;
2191
2192
2193 RF_ASSERT(unlockQNodes[i].numSuccedents == 1);
2194 unlockQNodes[i].succedents[0] = termNode;
2195 termNode->antecedents[numDataNodes +
2196 numParityNodes + i] = &unlockQNodes[i];
2197 termNode->antType[numDataNodes +
2198 numParityNodes + i] = rf_control;
2199 } else {
2200 RF_ASSERT(writeQNodes[i].numSuccedents == 1);
2201 writeQNodes[i].succedents[0] = termNode;
2202 termNode->antecedents[numDataNodes +
2203 numParityNodes + i] = &writeQNodes[i];
2204 termNode->antType[numDataNodes +
2205 numParityNodes + i] = rf_control;
2206 }
2207 }
2208 }
2209
2210
2211
2212
2213
2214
2215
2216
2217
2218
2219
2220
2221
2222
2223
2224
2225
2226
2227
2228 void
2229 rf_CreateRaidOneWriteDAGFwd(RF_Raid_t *raidPtr, RF_AccessStripeMap_t *asmap,
2230 RF_DagHeader_t *dag_h, void *bp, RF_RaidAccessFlags_t flags,
2231 RF_AllocListElem_t *allocList)
2232 {
2233 RF_DagNode_t *blockNode, *unblockNode, *termNode;
2234 RF_DagNode_t *nodes, *wndNode, *wmirNode;
2235 int nWndNodes, nWmirNodes, i;
2236 RF_ReconUnitNum_t which_ru;
2237 RF_PhysDiskAddr_t *pda, *pdaP;
2238 RF_StripeNum_t parityStripeID;
2239
2240 parityStripeID = rf_RaidAddressToParityStripeID(&(raidPtr->Layout),
2241 asmap->raidAddress, &which_ru);
2242 if (rf_dagDebug) {
2243 printf("[Creating RAID level 1 write DAG]\n");
2244 }
2245
2246 nWmirNodes = (asmap->parityInfo->next) ? 2 : 1;
2247 nWndNodes = (asmap->physInfo->next) ? 2 : 1;
2248
2249
2250 if (asmap->numDataFailed == 1)
2251 nWndNodes--;
2252 if (asmap->numParityFailed == 1)
2253 nWmirNodes--;
2254
2255
2256
2257
2258
2259 RF_CallocAndAdd(nodes, nWndNodes + nWmirNodes + 3,
2260 sizeof(RF_DagNode_t), (RF_DagNode_t *), allocList);
2261 i = 0;
2262 wndNode = &nodes[i];
2263 i += nWndNodes;
2264 wmirNode = &nodes[i];
2265 i += nWmirNodes;
2266 blockNode = &nodes[i];
2267 i += 1;
2268 unblockNode = &nodes[i];
2269 i += 1;
2270 termNode = &nodes[i];
2271 i += 1;
2272 RF_ASSERT(i == (nWndNodes + nWmirNodes + 3));
2273
2274
2275 dag_h->numCommitNodes = 0;
2276 dag_h->numCommits = 0;
2277 dag_h->numSuccedents = 1;
2278
2279
2280 rf_InitNode(blockNode, rf_wait, RF_FALSE, rf_NullNodeFunc,
2281 rf_NullNodeUndoFunc, NULL, (nWndNodes + nWmirNodes),
2282 0, 0, 0, dag_h, "Nil", allocList);
2283 rf_InitNode(unblockNode, rf_wait, RF_FALSE, rf_NullNodeFunc,
2284 rf_NullNodeUndoFunc, NULL, 1, (nWndNodes + nWmirNodes),
2285 0, 0, dag_h, "Nil", allocList);
2286 rf_InitNode(termNode, rf_wait, RF_FALSE, rf_TerminateFunc,
2287 rf_TerminateUndoFunc, NULL, 0, 1, 0, 0, dag_h, "Trm", allocList);
2288
2289
2290 if (nWndNodes > 0) {
2291 pda = asmap->physInfo;
2292 for (i = 0; i < nWndNodes; i++) {
2293 rf_InitNode(&wndNode[i], rf_wait, RF_FALSE,
2294 rf_DiskWriteFunc, rf_DiskWriteUndoFunc,
2295 rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h,
2296 "Wpd", allocList);
2297 RF_ASSERT(pda != NULL);
2298 wndNode[i].params[0].p = pda;
2299 wndNode[i].params[1].p = pda->bufPtr;
2300 wndNode[i].params[2].v = parityStripeID;
2301 wndNode[i].params[3].v =
2302 RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
2303 0, 0, which_ru);
2304 pda = pda->next;
2305 }
2306 RF_ASSERT(pda == NULL);
2307 }
2308
2309 if (nWmirNodes > 0) {
2310 pda = asmap->physInfo;
2311 pdaP = asmap->parityInfo;
2312 for (i = 0; i < nWmirNodes; i++) {
2313 rf_InitNode(&wmirNode[i], rf_wait, RF_FALSE,
2314 rf_DiskWriteFunc, rf_DiskWriteUndoFunc,
2315 rf_GenericWakeupFunc, 1, 1, 4, 0, dag_h,
2316 "Wsd", allocList);
2317 RF_ASSERT(pda != NULL);
2318 wmirNode[i].params[0].p = pdaP;
2319 wmirNode[i].params[1].p = pda->bufPtr;
2320 wmirNode[i].params[2].v = parityStripeID;
2321 wmirNode[i].params[3].v =
2322 RF_CREATE_PARAM3(RF_IO_NORMAL_PRIORITY,
2323 0, 0, which_ru);
2324 pda = pda->next;
2325 pdaP = pdaP->next;
2326 }
2327 RF_ASSERT(pda == NULL);
2328 RF_ASSERT(pdaP == NULL);
2329 }
2330
2331 RF_ASSERT(dag_h->numSuccedents == 1);
2332 RF_ASSERT(blockNode->numAntecedents == 0);
2333 dag_h->succedents[0] = blockNode;
2334
2335
2336 RF_ASSERT(blockNode->numSuccedents == (nWndNodes + nWmirNodes));
2337 for (i = 0; i < nWndNodes; i++) {
2338 RF_ASSERT(wndNode[i].numAntecedents == 1);
2339 blockNode->succedents[i] = &wndNode[i];
2340 wndNode[i].antecedents[0] = blockNode;
2341 wndNode[i].antType[0] = rf_control;
2342 }
2343 for (i = 0; i < nWmirNodes; i++) {
2344 RF_ASSERT(wmirNode[i].numAntecedents == 1);
2345 blockNode->succedents[i + nWndNodes] = &wmirNode[i];
2346 wmirNode[i].antecedents[0] = blockNode;
2347 wmirNode[i].antType[0] = rf_control;
2348 }
2349
2350
2351 RF_ASSERT(unblockNode->numAntecedents == (nWndNodes + nWmirNodes));
2352 for (i = 0; i < nWndNodes; i++) {
2353 RF_ASSERT(wndNode[i].numSuccedents == 1);
2354 wndNode[i].succedents[0] = unblockNode;
2355 unblockNode->antecedents[i] = &wndNode[i];
2356 unblockNode->antType[i] = rf_control;
2357 }
2358 for (i = 0; i < nWmirNodes; i++) {
2359 RF_ASSERT(wmirNode[i].numSuccedents == 1);
2360 wmirNode[i].succedents[0] = unblockNode;
2361 unblockNode->antecedents[i + nWndNodes] = &wmirNode[i];
2362 unblockNode->antType[i + nWndNodes] = rf_control;
2363 }
2364
2365
2366 RF_ASSERT(unblockNode->numSuccedents == 1);
2367 RF_ASSERT(termNode->numAntecedents == 1);
2368 RF_ASSERT(termNode->numSuccedents == 0);
2369 unblockNode->succedents[0] = termNode;
2370 termNode->antecedents[0] = unblockNode;
2371 termNode->antType[0] = rf_control;
2372
2373 return;
2374 }