root/dev/raidframe/rf_engine.c

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes following definitions.
  1. rf_ShutdownEngine
  2. rf_ConfigureEngine
  3. rf_BranchDone
  4. rf_NodeReady
  5. rf_FireNode
  6. rf_FireNodeArray
  7. rf_FireNodeList
  8. rf_PropagateResults
  9. rf_ProcessNode
  10. rf_FinishNode
  11. rf_DispatchDAG
  12. rf_DAGExecutionThread_pre
  13. rf_DAGExecutionThread

    1 /*      $OpenBSD: rf_engine.c,v 1.15 2003/04/27 11:22:54 ho Exp $       */
    2 /*      $NetBSD: rf_engine.c,v 1.10 2000/08/20 16:51:03 thorpej Exp $   */
    3 
    4 /*
    5  * Copyright (c) 1995 Carnegie-Mellon University.
    6  * All rights reserved.
    7  *
    8  * Author: William V. Courtright II, Mark Holland, Rachad Youssef
    9  *
   10  * Permission to use, copy, modify and distribute this software and
   11  * its documentation is hereby granted, provided that both the copyright
   12  * notice and this permission notice appear in all copies of the
   13  * software, derivative works or modified versions, and any portions
   14  * thereof, and that both notices appear in supporting documentation.
   15  *
   16  * CARNEGIE MELLON ALLOWS FREE USE OF THIS SOFTWARE IN ITS "AS IS"
   17  * CONDITION.  CARNEGIE MELLON DISCLAIMS ANY LIABILITY OF ANY KIND
   18  * FOR ANY DAMAGES WHATSOEVER RESULTING FROM THE USE OF THIS SOFTWARE.
   19  *
   20  * Carnegie Mellon requests users of this software to return to
   21  *
   22  *  Software Distribution Coordinator  or  Software.Distribution@CS.CMU.EDU
   23  *  School of Computer Science
   24  *  Carnegie Mellon University
   25  *  Pittsburgh PA 15213-3890
   26  *
   27  * any improvements or extensions that they make and grant Carnegie the
   28  * rights to redistribute these changes.
   29  */
   30 
   31 /****************************************************************************
   32  *                                                                          *
   33  * engine.c -- Code for DAG execution engine.                               *
   34  *                                                                          *
   35  * Modified to work as follows (holland):                                   *
   36  *   A user-thread calls into DispatchDAG, which fires off the nodes that   *
   37  *   are direct successors to the header node. DispatchDAG then returns,    *
   38  *   and the rest of the I/O continues asynchronously. As each node         *
   39  *   completes, the node execution function calls FinishNode(). FinishNode  *
   40  *   scans the list of successors to the node and increments the antecedent *
   41  *   counts. Each node that becomes enabled is placed on a central node     *
   42  *   queue. A dedicated dag-execution thread grabs nodes off of this        *
   43  *   queue and fires them.                                                  *
   44  *                                                                          *
   45  *   NULL nodes are never fired.                                            *
   46  *                                                                          *
   47  *   Terminator nodes are never fired, but rather cause the callback        *
   48  *   associated with the DAG to be invoked.                                 *
   49  *                                                                          *
   50  *   If a node fails, the dag either rolls forward to the completion or     *
   51  *   rolls back, undoing previously-completed nodes and fails atomically.   *
   52  *   The direction of recovery is determined by the location of the failed  *
   53  *   node in the graph. If the failure occurred before the commit node in   *
   54  *   the graph, backward recovery is used. Otherwise, forward recovery is   *
   55  *   used.                                                                  *
   56  *                                                                          *
   57  ****************************************************************************/
   58 
   59 #include "rf_threadstuff.h"
   60 
   61 #include <sys/errno.h>
   62 
   63 #include "rf_dag.h"
   64 #include "rf_engine.h"
   65 #include "rf_etimer.h"
   66 #include "rf_general.h"
   67 #include "rf_dagutils.h"
   68 #include "rf_shutdown.h"
   69 #include "rf_raid.h"
   70 
   71 int  rf_BranchDone(RF_DagNode_t *);
   72 int  rf_NodeReady(RF_DagNode_t *);
   73 void rf_FireNode(RF_DagNode_t *);
   74 void rf_FireNodeArray(int, RF_DagNode_t **);
   75 void rf_FireNodeList(RF_DagNode_t *);
   76 void rf_PropagateResults(RF_DagNode_t *, int);
   77 void rf_ProcessNode(RF_DagNode_t *, int);
   78 
   79 void rf_DAGExecutionThread(RF_ThreadArg_t);
   80 #ifdef  RAID_AUTOCONFIG
   81 #define RF_ENGINE_PID   10
   82 void rf_DAGExecutionThread_pre(RF_ThreadArg_t);
   83 extern pid_t      lastpid;
   84 #endif  /* RAID_AUTOCONFIG */
   85 void            **rf_hook_cookies;
   86 extern int        numraid;
   87 
   88 #define DO_INIT(_l_,_r_)                                                \
   89 do {                                                                    \
   90         int _rc;                                                        \
   91         _rc = rf_create_managed_mutex(_l_, &(_r_)->node_queue_mutex);   \
   92         if (_rc) {                                                      \
   93                 return(_rc);                                            \
   94         }                                                               \
   95         _rc = rf_create_managed_cond(_l_, &(_r_)->node_queue_cond);     \
   96         if (_rc) {                                                      \
   97                 return(_rc);                                            \
   98         }                                                               \
   99 } while (0)
  100 
  101 /*
  102  * Synchronization primitives for this file. DO_WAIT should be enclosed
  103  * in a while loop.
  104  */
  105 
  106 /*
  107  * XXX Is this spl-ing really necessary ?
  108  */
  109 #define DO_LOCK(_r_)                                                    \
  110 do {                                                                    \
  111         ks = splbio();                                                  \
  112         RF_LOCK_MUTEX((_r_)->node_queue_mutex);                         \
  113 } while (0)
  114 
  115 #define DO_UNLOCK(_r_)                                                  \
  116 do {                                                                    \
  117         RF_UNLOCK_MUTEX((_r_)->node_queue_mutex);                       \
  118         splx(ks);                                                       \
  119 } while (0)
  120 
  121 #define DO_WAIT(_r_)                                                    \
  122         RF_WAIT_COND((_r_)->node_queue, (_r_)->node_queue_mutex)
  123 
  124 /* XXX RF_SIGNAL_COND? */
  125 #define DO_SIGNAL(_r_)                                                  \
  126         RF_BROADCAST_COND((_r_)->node_queue)
  127 
  128 void rf_ShutdownEngine(void *);
  129 
  130 void
  131 rf_ShutdownEngine(void *arg)
  132 {
  133         RF_Raid_t *raidPtr;
  134 
  135         raidPtr = (RF_Raid_t *) arg;
  136         raidPtr->shutdown_engine = 1;
  137         DO_SIGNAL(raidPtr);
  138 }
  139 
  140 int
  141 rf_ConfigureEngine(RF_ShutdownList_t **listp, RF_Raid_t *raidPtr,
  142     RF_Config_t *cfgPtr)
  143 {
  144         int rc;
  145         char raidname[16];
  146 
  147         DO_INIT(listp, raidPtr);
  148 
  149         raidPtr->node_queue = NULL;
  150         raidPtr->dags_in_flight = 0;
  151 
  152         rc = rf_init_managed_threadgroup(listp, &raidPtr->engine_tg);
  153         if (rc)
  154                 return (rc);
  155 
  156         /*
  157          * We create the execution thread only once per system boot. No need
  158          * to check return code b/c the kernel panics if it can't create the
  159          * thread.
  160          */
  161         if (rf_engineDebug) {
  162                 printf("raid%d: %s engine thread\n", raidPtr->raidid,
  163                     (initproc)?"Starting":"Creating");
  164         }
  165         if (rf_hook_cookies == NULL) {
  166                 rf_hook_cookies =
  167                     malloc(numraid * sizeof(void *),
  168                            M_RAIDFRAME, M_NOWAIT);
  169                 if (rf_hook_cookies == NULL)
  170                         return (ENOMEM);
  171                 bzero(rf_hook_cookies, numraid * sizeof(void *));
  172         }
  173 #ifdef  RAID_AUTOCONFIG
  174         if (initproc == NULL) {
  175                 rf_hook_cookies[raidPtr->raidid] =
  176                         startuphook_establish(rf_DAGExecutionThread_pre,
  177                             raidPtr);
  178         } else {
  179 #endif  /* RAID_AUTOCONFIG */
  180                 snprintf(&raidname[0], 16, "raid%d", raidPtr->raidid);
  181                 if (RF_CREATE_THREAD(raidPtr->engine_thread,
  182                     rf_DAGExecutionThread, raidPtr, &raidname[0])) {
  183                         RF_ERRORMSG("RAIDFRAME: Unable to start engine"
  184                             " thread\n");
  185                         return (ENOMEM);
  186                 }
  187                 if (rf_engineDebug) {
  188                         printf("raid%d: Engine thread started\n",
  189                             raidPtr->raidid);
  190                 }
  191                 RF_THREADGROUP_STARTED(&raidPtr->engine_tg);
  192 #ifdef  RAID_AUTOCONFIG
  193         }
  194 #endif
  195         /* XXX Something is missing here... */
  196 #ifdef  debug
  197         printf("Skipping the WAIT_START !!!\n");
  198 #endif
  199         /* Engine thread is now running and waiting for work. */
  200         if (rf_engineDebug) {
  201                 printf("raid%d: Engine thread running and waiting for events\n",
  202                     raidPtr->raidid);
  203         }
  204         rc = rf_ShutdownCreate(listp, rf_ShutdownEngine, raidPtr);
  205         if (rc) {
  206                 RF_ERRORMSG3("Unable to add to shutdown list file %s line %d"
  207                     " rc=%d\n", __FILE__, __LINE__, rc);
  208                 rf_ShutdownEngine(NULL);
  209         }
  210         return (rc);
  211 }
  212 
  213 int
  214 rf_BranchDone(RF_DagNode_t *node)
  215 {
  216         int i;
  217 
  218         /*
  219          * Return true if forward execution is completed for a node and it's
  220          * succedents.
  221          */
  222         switch (node->status) {
  223         case rf_wait:
  224                 /* Should never be called in this state. */
  225                 RF_PANIC();
  226                 break;
  227         case rf_fired:
  228                 /* Node is currently executing, so we're not done. */
  229                 return (RF_FALSE);
  230         case rf_good:
  231                 /* For each succedent. */
  232                 for (i = 0; i < node->numSuccedents; i++)
  233                         /* Recursively check branch. */
  234                         if (!rf_BranchDone(node->succedents[i]))
  235                                 return RF_FALSE;
  236 
  237                 return RF_TRUE; /*
  238                                  * Node and all succedent branches aren't in
  239                                  * fired state.
  240                                  */
  241                 break;
  242         case rf_bad:
  243                 /* Succedents can't fire. */
  244                 return (RF_TRUE);
  245         case rf_recover:
  246                 /* Should never be called in this state. */
  247                 RF_PANIC();
  248                 break;
  249         case rf_undone:
  250         case rf_panic:
  251                 /* XXX Need to fix this case. */
  252                 /* For now, assume that we're done. */
  253                 return (RF_TRUE);
  254                 break;
  255         default:
  256                 /* Illegal node status. */
  257                 RF_PANIC();
  258                 break;
  259         }
  260 }
  261 
  262 int
  263 rf_NodeReady(RF_DagNode_t *node)
  264 {
  265         int ready;
  266 
  267         switch (node->dagHdr->status) {
  268         case rf_enable:
  269         case rf_rollForward:
  270                 if ((node->status == rf_wait) &&
  271                     (node->numAntecedents == node->numAntDone))
  272                         ready = RF_TRUE;
  273                 else
  274                         ready = RF_FALSE;
  275                 break;
  276         case rf_rollBackward:
  277                 RF_ASSERT(node->numSuccDone <= node->numSuccedents);
  278                 RF_ASSERT(node->numSuccFired <= node->numSuccedents);
  279                 RF_ASSERT(node->numSuccFired <= node->numSuccDone);
  280                 if ((node->status == rf_good) &&
  281                     (node->numSuccDone == node->numSuccedents))
  282                         ready = RF_TRUE;
  283                 else
  284                         ready = RF_FALSE;
  285                 break;
  286         default:
  287                 printf("Execution engine found illegal DAG status"
  288                     " in rf_NodeReady\n");
  289                 RF_PANIC();
  290                 break;
  291         }
  292 
  293         return (ready);
  294 }
  295 
  296 
  297 /*
  298  * User context and dag-exec-thread context:
  299  * Fire a node. The node's status field determines which function, do or undo,
  300  * to be fired.
  301  * This routine assumes that the node's status field has alread been set to
  302  * "fired" or "recover" to indicate the direction of execution.
  303  */
  304 void
  305 rf_FireNode(RF_DagNode_t *node)
  306 {
  307         switch (node->status) {
  308         case rf_fired:
  309                 /* Fire the do function of a node. */
  310                 if (rf_engineDebug>1) {
  311                         printf("raid%d: Firing node 0x%lx (%s)\n",
  312                             node->dagHdr->raidPtr->raidid,
  313                             (unsigned long) node, node->name);
  314                 }
  315                 if (node->flags & RF_DAGNODE_FLAG_YIELD) {
  316 #if (defined(__NetBSD__) || defined(__OpenBSD__)) && defined(_KERNEL)
  317                         /* thread_block(); */
  318                         /* printf("Need to block the thread here...\n");  */
  319                         /*
  320                          * XXX thread_block is actually mentioned in
  321                          * /usr/include/vm/vm_extern.h
  322                          */
  323 #else
  324                         thread_block();
  325 #endif
  326                 }
  327                 (*(node->doFunc)) (node);
  328                 break;
  329         case rf_recover:
  330                 /* Fire the undo function of a node. */
  331                 if (rf_engineDebug>1) {
  332                         printf("raid%d: Firing (undo) node 0x%lx (%s)\n",
  333                             node->dagHdr->raidPtr->raidid,
  334                             (unsigned long) node, node->name);
  335                 }
  336                 if (node->flags & RF_DAGNODE_FLAG_YIELD) {
  337 #if (defined(__NetBSD__) || defined(__OpenBSD__)) && defined(_KERNEL)
  338                         /* thread_block(); */
  339                         /* printf("Need to block the thread here...\n"); */
  340                         /*
  341                          * XXX thread_block is actually mentioned in
  342                          * /usr/include/vm/vm_extern.h
  343                          */
  344 #else
  345                         thread_block();
  346 #endif
  347                 }
  348                 (*(node->undoFunc)) (node);
  349                 break;
  350         default:
  351                 RF_PANIC();
  352                 break;
  353         }
  354 }
  355 
  356 
  357 /*
  358  * User context:
  359  * Attempt to fire each node in a linear array.
  360  * The entire list is fired atomically.
  361  */
  362 void
  363 rf_FireNodeArray(int numNodes, RF_DagNode_t **nodeList)
  364 {
  365         RF_DagStatus_t dstat;
  366         RF_DagNode_t *node;
  367         int i, j;
  368 
  369         /* First, mark all nodes which are ready to be fired. */
  370         for (i = 0; i < numNodes; i++) {
  371                 node = nodeList[i];
  372                 dstat = node->dagHdr->status;
  373                 RF_ASSERT((node->status == rf_wait) ||
  374                     (node->status == rf_good));
  375                 if (rf_NodeReady(node)) {
  376                         if ((dstat == rf_enable) || (dstat == rf_rollForward)) {
  377                                 RF_ASSERT(node->status == rf_wait);
  378                                 if (node->commitNode)
  379                                         node->dagHdr->numCommits++;
  380                                 node->status = rf_fired;
  381                                 for (j = 0; j < node->numAntecedents; j++)
  382                                         node->antecedents[j]->numSuccFired++;
  383                         } else {
  384                                 RF_ASSERT(dstat == rf_rollBackward);
  385                                 RF_ASSERT(node->status == rf_good);
  386                                 /* Only one commit node per graph. */
  387                                 RF_ASSERT(node->commitNode == RF_FALSE);
  388                                 node->status = rf_recover;
  389                         }
  390                 }
  391         }
  392         /* Now, fire the nodes. */
  393         for (i = 0; i < numNodes; i++) {
  394                 if ((nodeList[i]->status == rf_fired) ||
  395                     (nodeList[i]->status == rf_recover))
  396                         rf_FireNode(nodeList[i]);
  397         }
  398 }
  399 
  400 
  401 /*
  402  * User context:
  403  * Attempt to fire each node in a linked list.
  404  * The entire list is fired atomically.
  405  */
  406 void
  407 rf_FireNodeList(RF_DagNode_t *nodeList)
  408 {
  409         RF_DagNode_t *node, *next;
  410         RF_DagStatus_t dstat;
  411         int j;
  412 
  413         if (nodeList) {
  414                 /* First, mark all nodes which are ready to be fired. */
  415                 for (node = nodeList; node; node = next) {
  416                         next = node->next;
  417                         dstat = node->dagHdr->status;
  418                         RF_ASSERT((node->status == rf_wait) ||
  419                             (node->status == rf_good));
  420                         if (rf_NodeReady(node)) {
  421                                 if ((dstat == rf_enable) ||
  422                                     (dstat == rf_rollForward)) {
  423                                         RF_ASSERT(node->status == rf_wait);
  424                                         if (node->commitNode)
  425                                                 node->dagHdr->numCommits++;
  426                                         node->status = rf_fired;
  427                                         for (j = 0; j < node->numAntecedents;
  428                                              j++)
  429                                                 node->antecedents[j]
  430                                                     ->numSuccFired++;
  431                                 } else {
  432                                         RF_ASSERT(dstat == rf_rollBackward);
  433                                         RF_ASSERT(node->status == rf_good);
  434                                         /* Only one commit node per graph. */
  435                                         RF_ASSERT(node->commitNode == RF_FALSE);
  436                                         node->status = rf_recover;
  437                                 }
  438                         }
  439                 }
  440                 /* Now, fire the nodes. */
  441                 for (node = nodeList; node; node = next) {
  442                         next = node->next;
  443                         if ((node->status == rf_fired) ||
  444                             (node->status == rf_recover))
  445                                 rf_FireNode(node);
  446                 }
  447         }
  448 }
  449 
  450 
  451 /*
  452  * Interrupt context:
  453  * For each succedent,
  454  *    propagate required results from node to succedent.
  455  *    increment succedent's numAntDone.
  456  *    place newly-enable nodes on node queue for firing.
  457  *
  458  * To save context switches, we don't place NIL nodes on the node queue,
  459  * but rather just process them as if they had fired. Note that NIL nodes
  460  * that are the direct successors of the header will actually get fired by
  461  * DispatchDAG, which is fine because no context switches are involved.
  462  *
  463  * Important:  when running at user level, this can be called by any
  464  * disk thread, and so the increment and check of the antecedent count
  465  * must be locked. I used the node queue mutex and locked down the
  466  * entire function, but this is certainly overkill.
  467  */
  468 void
  469 rf_PropagateResults(RF_DagNode_t *node, int context)
  470 {
  471         RF_DagNode_t *s, *a;
  472         RF_Raid_t *raidPtr;
  473         int i, ks;
  474         /* A list of NIL nodes to be finished. */
  475         RF_DagNode_t *finishlist = NULL;
  476         /* List of nodes with failed truedata antecedents. */
  477         RF_DagNode_t *skiplist = NULL;
  478         RF_DagNode_t *firelist = NULL;  /* A list of nodes to be fired. */
  479         RF_DagNode_t *q = NULL, *qh = NULL, *next;
  480         int j, skipNode;
  481 
  482         raidPtr = node->dagHdr->raidPtr;
  483 
  484         DO_LOCK(raidPtr);
  485 
  486         /* Debug - validate fire counts. */
  487         for (i = 0; i < node->numAntecedents; i++) {
  488                 a = *(node->antecedents + i);
  489                 RF_ASSERT(a->numSuccFired >= a->numSuccDone);
  490                 RF_ASSERT(a->numSuccFired <= a->numSuccedents);
  491                 a->numSuccDone++;
  492         }
  493 
  494         switch (node->dagHdr->status) {
  495         case rf_enable:
  496         case rf_rollForward:
  497                 for (i = 0; i < node->numSuccedents; i++) {
  498                         s = *(node->succedents + i);
  499                         RF_ASSERT(s->status == rf_wait);
  500                         (s->numAntDone)++;
  501                         if (s->numAntDone == s->numAntecedents) {
  502                                 /* Look for NIL nodes. */
  503                                 if (s->doFunc == rf_NullNodeFunc) {
  504                                         /*
  505                                          * Don't fire NIL nodes, just process
  506                                          * them.
  507                                          */
  508                                         s->next = finishlist;
  509                                         finishlist = s;
  510                                 } else {
  511                                         /*
  512                                          * Look to see if the node is to be
  513                                          * skipped.
  514                                          */
  515                                         skipNode = RF_FALSE;
  516                                         for (j = 0; j < s->numAntecedents; j++)
  517                                                 if ((s->antType[j] ==
  518                                                      rf_trueData) &&
  519                                                     (s->antecedents[j]->status
  520                                                      == rf_bad))
  521                                                         skipNode = RF_TRUE;
  522                                         if (skipNode) {
  523                                                 /*
  524                                                  * This node has one or more
  525                                                  * failed true data
  526                                                  * dependencies, so skip it.
  527                                                  */
  528                                                 s->next = skiplist;
  529                                                 skiplist = s;
  530                                         } else {
  531                                                 /*
  532                                                  * Add s to list of nodes (q)
  533                                                  * to execute.
  534                                                  */
  535                                                 if (context != RF_INTR_CONTEXT)
  536                                                 {
  537                                                         /*
  538                                                          * We only have to
  539                                                          * enqueue if we're at
  540                                                          * intr context.
  541                                                          */
  542                                                         /*
  543                                                          * Put node on a list to
  544                                                          * be fired after we
  545                                                          * unlock.
  546                                                          */
  547                                                         s->next = firelist;
  548                                                         firelist = s;
  549                                                 } else {
  550                                                         /*
  551                                                          * Enqueue the node for
  552                                                          * the dag exec thread
  553                                                          * to fire.
  554                                                          */
  555                                                      RF_ASSERT(rf_NodeReady(s));
  556                                                         if (q) {
  557                                                                 q->next = s;
  558                                                                 q = s;
  559                                                         } else {
  560                                                                 qh = q = s;
  561                                                                 qh->next = NULL;
  562                                                         }
  563                                                 }
  564                                         }
  565                                 }
  566                         }
  567                 }
  568 
  569                 if (q) {
  570                         /*
  571                          * Transfer our local list of nodes to the node
  572                          * queue.
  573                          */
  574                         q->next = raidPtr->node_queue;
  575                         raidPtr->node_queue = qh;
  576                         DO_SIGNAL(raidPtr);
  577                 }
  578                 DO_UNLOCK(raidPtr);
  579 
  580                 for (; skiplist; skiplist = next) {
  581                         next = skiplist->next;
  582                         skiplist->status = rf_skipped;
  583                         for (i = 0; i < skiplist->numAntecedents; i++) {
  584                                 skiplist->antecedents[i]->numSuccFired++;
  585                         }
  586                         if (skiplist->commitNode) {
  587                                 skiplist->dagHdr->numCommits++;
  588                         }
  589                         rf_FinishNode(skiplist, context);
  590                 }
  591                 for (; finishlist; finishlist = next) {
  592                         /* NIL nodes: no need to fire them. */
  593                         next = finishlist->next;
  594                         finishlist->status = rf_good;
  595                         for (i = 0; i < finishlist->numAntecedents; i++) {
  596                                 finishlist->antecedents[i]->numSuccFired++;
  597                         }
  598                         if (finishlist->commitNode)
  599                                 finishlist->dagHdr->numCommits++;
  600                         /*
  601                          * Okay, here we're calling rf_FinishNode() on nodes
  602                          * that have the null function as their work proc.
  603                          * Such a node could be the terminal node in a DAG.
  604                          * If so, it will cause the DAG to complete, which will
  605                          * in turn free memory used by the DAG, which includes
  606                          * the node in question.
  607                          * Thus, we must avoid referencing the node at all
  608                          * after calling rf_FinishNode() on it.
  609                          */
  610                         /* Recursive call. */
  611                         rf_FinishNode(finishlist, context);
  612                 }
  613                 /* Fire all nodes in firelist. */
  614                 rf_FireNodeList(firelist);
  615                 break;
  616 
  617         case rf_rollBackward:
  618                 for (i = 0; i < node->numAntecedents; i++) {
  619                         a = *(node->antecedents + i);
  620                         RF_ASSERT(a->status == rf_good);
  621                         RF_ASSERT(a->numSuccDone <= a->numSuccedents);
  622                         RF_ASSERT(a->numSuccDone <= a->numSuccFired);
  623 
  624                         if (a->numSuccDone == a->numSuccFired) {
  625                                 if (a->undoFunc == rf_NullNodeFunc) {
  626                                         /*
  627                                          * Don't fire NIL nodes, just process
  628                                          * them.
  629                                          */
  630                                         a->next = finishlist;
  631                                         finishlist = a;
  632                                 } else {
  633                                         if (context != RF_INTR_CONTEXT) {
  634                                                 /*
  635                                                  * We only have to enqueue if
  636                                                  * we're at intr context.
  637                                                  */
  638                                                 /*
  639                                                  * Put node on a list to
  640                                                  * be fired after we
  641                                                  * unlock.
  642                                                  */
  643                                                 a->next = firelist;
  644                                                 firelist = a;
  645                                         } else {
  646                                                 /*
  647                                                  * Enqueue the node for
  648                                                  * the dag exec thread
  649                                                  * to fire.
  650                                                  */
  651                                                 RF_ASSERT(rf_NodeReady(a));
  652                                                 if (q) {
  653                                                         q->next = a;
  654                                                         q = a;
  655                                                 } else {
  656                                                         qh = q = a;
  657                                                         qh->next = NULL;
  658                                                 }
  659                                         }
  660                                 }
  661                         }
  662                 }
  663                 if (q) {
  664                         /*
  665                          * Transfer our local list of nodes to the node
  666                          * queue.
  667                          */
  668                         q->next = raidPtr->node_queue;
  669                         raidPtr->node_queue = qh;
  670                         DO_SIGNAL(raidPtr);
  671                 }
  672                 DO_UNLOCK(raidPtr);
  673                 for (; finishlist; finishlist = next) {
  674                         /* NIL nodes: no need to fire them. */
  675                         next = finishlist->next;
  676                         finishlist->status = rf_good;
  677                         /*
  678                          * Okay, here we're calling rf_FinishNode() on nodes
  679                          * that have the null function as their work proc.
  680                          * Such a node could be the first node in a DAG.
  681                          * If so, it will cause the DAG to complete, which will
  682                          * in turn free memory used by the DAG, which includes
  683                          * the node in question.
  684                          * Thus, we must avoid referencing the node at all
  685                          * after calling rf_FinishNode() on it.
  686                          */
  687                         rf_FinishNode(finishlist, context);
  688                         /* Recursive call. */
  689                 }
  690                 /* Fire all nodes in firelist. */
  691                 rf_FireNodeList(firelist);
  692 
  693                 break;
  694         default:
  695                 printf("Engine found illegal DAG status in"
  696                     " rf_PropagateResults()\n");
  697                 RF_PANIC();
  698                 break;
  699         }
  700 }
  701 
  702 
  703 /*
  704  * Process a fired node which has completed.
  705  */
  706 void
  707 rf_ProcessNode(RF_DagNode_t *node, int context)
  708 {
  709         RF_Raid_t *raidPtr;
  710 
  711         raidPtr = node->dagHdr->raidPtr;
  712 
  713         switch (node->status) {
  714         case rf_good:
  715                 /* Normal case, don't need to do anything. */
  716                 break;
  717         case rf_bad:
  718                 if ((node->dagHdr->numCommits > 0) ||
  719                     (node->dagHdr->numCommitNodes == 0)) {
  720                         /* Crossed commit barrier. */
  721                         node->dagHdr->status = rf_rollForward;
  722                         if (rf_engineDebug || 1) {
  723                                 printf("raid%d: node (%s) returned fail,"
  724                                     " rolling forward\n", raidPtr->raidid,
  725                                     node->name);
  726                         }
  727                 } else {
  728                         /* Never reached commit barrier. */
  729                         node->dagHdr->status = rf_rollBackward;
  730                         if (rf_engineDebug || 1) {
  731                                 printf("raid%d: node (%s) returned fail,"
  732                                     " rolling backward\n", raidPtr->raidid,
  733                                     node->name);
  734                         }
  735                 }
  736                 break;
  737         case rf_undone:
  738                 /* Normal rollBackward case, don't need to do anything. */
  739                 break;
  740         case rf_panic:
  741                 /* An undo node failed !!! */
  742                 printf("UNDO of a node failed !!!/n");
  743                 break;
  744         default:
  745                 printf("node finished execution with an illegal status !!!\n");
  746                 RF_PANIC();
  747                 break;
  748         }
  749 
  750         /*
  751          * Enqueue node's succedents (antecedents if rollBackward) for
  752          * execution.
  753          */
  754         rf_PropagateResults(node, context);
  755 }
  756 
  757 
  758 /*
  759  * User context or dag-exec-thread context:
  760  * This is the first step in post-processing a newly-completed node.
  761  * This routine is called by each node execution function to mark the node
  762  * as complete and fire off any successors that have been enabled.
  763  */
  764 int
  765 rf_FinishNode(RF_DagNode_t *node, int context)
  766 {
  767         /* As far as I can tell, retcode is not used -wvcii. */
  768         int retcode = RF_FALSE;
  769         node->dagHdr->numNodesCompleted++;
  770         rf_ProcessNode(node, context);
  771 
  772         return (retcode);
  773 }
  774 
  775 
  776 /*
  777  * User context:
  778  * Submit dag for execution, return non-zero if we have to wait for completion.
  779  * If and only if we return non-zero, we'll cause cbFunc to get invoked with
  780  * cbArg when the DAG has completed.
  781  *
  782  * For now we always return 1. If the DAG does not cause any I/O, then the
  783  * callback may get invoked before DispatchDAG returns. There's code in state
  784  * 5 of ContinueRaidAccess to handle this.
  785  *
  786  * All we do here is fire the direct successors of the header node. The DAG
  787  * execution thread does the rest of the dag processing.
  788  */
  789 int
  790 rf_DispatchDAG(RF_DagHeader_t *dag, void (*cbFunc) (void *), void *cbArg)
  791 {
  792         RF_Raid_t *raidPtr;
  793 
  794         raidPtr = dag->raidPtr;
  795         if (dag->tracerec) {
  796                 RF_ETIMER_START(dag->tracerec->timer);
  797         }
  798         if (rf_engineDebug || rf_validateDAGDebug) {
  799                 if (rf_ValidateDAG(dag))
  800                         RF_PANIC();
  801         }
  802         if (rf_engineDebug>1) {
  803                 printf("raid%d: Entering DispatchDAG\n", raidPtr->raidid);
  804         }
  805         raidPtr->dags_in_flight++;      /*
  806                                          * Debug only:  blow off proper
  807                                          * locking.
  808                                          */
  809         dag->cbFunc = cbFunc;
  810         dag->cbArg = cbArg;
  811         dag->numNodesCompleted = 0;
  812         dag->status = rf_enable;
  813         rf_FireNodeArray(dag->numSuccedents, dag->succedents);
  814         return (1);
  815 }
  816 
  817 
  818 /*
  819  * Dedicated kernel thread:
  820  * The thread that handles all DAG node firing.
  821  * To minimize locking and unlocking, we grab a copy of the entire node queue
  822  * and then set the node queue to NULL before doing any firing of nodes.
  823  * This way we only have to release the lock once. Of course, it's probably
  824  * rare that there's more than one node in the queue at any one time, but it
  825  * sometimes happens.
  826  *
  827  * In the kernel, this thread runs at spl0 and is not swappable. I copied these
  828  * characteristics from the aio_completion_thread.
  829  */
  830 
  831 #ifdef  RAID_AUTOCONFIG
  832 void
  833 rf_DAGExecutionThread_pre(RF_ThreadArg_t arg)
  834 {
  835         RF_Raid_t *raidPtr;
  836         char raidname[16];
  837         pid_t oldpid = lastpid;
  838 
  839         raidPtr = (RF_Raid_t *) arg;
  840 
  841         if (rf_engineDebug) {
  842                 printf("raid%d: Starting engine thread\n", raidPtr->raidid);
  843         }
  844 
  845         lastpid = RF_ENGINE_PID + raidPtr->raidid - 1;
  846         snprintf(raidname, sizeof raidname, "raid%d", raidPtr->raidid);
  847 
  848         if (RF_CREATE_THREAD(raidPtr->engine_thread, rf_DAGExecutionThread,
  849             raidPtr, &raidname[0])) {
  850                 RF_ERRORMSG("RAIDFRAME: Unable to start engine thread\n");
  851                 return;
  852         }
  853 
  854         lastpid = oldpid;
  855         if (rf_engineDebug) {
  856                 printf("raid%d: Engine thread started\n", raidPtr->raidid);
  857         }
  858         RF_THREADGROUP_STARTED(&raidPtr->engine_tg);
  859 }
  860 #endif  /* RAID_AUTOCONFIG */
  861 
  862 void
  863 rf_DAGExecutionThread(RF_ThreadArg_t arg)
  864 {
  865         RF_DagNode_t *nd, *local_nq, *term_nq, *fire_nq;
  866         RF_Raid_t *raidPtr;
  867         int ks;
  868         int s;
  869 
  870         raidPtr = (RF_Raid_t *) arg;
  871 
  872         while (!(&raidPtr->engine_tg)->created)
  873                 (void) tsleep((void *)&(&raidPtr->engine_tg)->created, PWAIT,
  874                                 "raidinit", 0);
  875 
  876         if (rf_engineDebug) {
  877                 printf("raid%d: Engine thread is running\n", raidPtr->raidid);
  878         }
  879         /* XXX What to put here ? XXX */
  880 
  881         s = splbio();
  882 
  883         RF_THREADGROUP_RUNNING(&raidPtr->engine_tg);
  884 
  885         rf_hook_cookies[raidPtr->raidid] =
  886                 shutdownhook_establish(rf_shutdown_hook, (void *)raidPtr);
  887 
  888         DO_LOCK(raidPtr);
  889         while (!raidPtr->shutdown_engine) {
  890 
  891                 while (raidPtr->node_queue != NULL) {
  892                         local_nq = raidPtr->node_queue;
  893                         fire_nq = NULL;
  894                         term_nq = NULL;
  895                         raidPtr->node_queue = NULL;
  896                         DO_UNLOCK(raidPtr);
  897 
  898                         /* First, strip out the terminal nodes. */
  899                         while (local_nq) {
  900                                 nd = local_nq;
  901                                 local_nq = local_nq->next;
  902                                 switch (nd->dagHdr->status) {
  903                                 case rf_enable:
  904                                 case rf_rollForward:
  905                                         if (nd->numSuccedents == 0) {
  906                                                 /*
  907                                                  * End of the dag, add to
  908                                                  * callback list.
  909                                                  */
  910                                                 nd->next = term_nq;
  911                                                 term_nq = nd;
  912                                         } else {
  913                                                 /*
  914                                                  * Not the end, add to the
  915                                                  * fire queue.
  916                                                  */
  917                                                 nd->next = fire_nq;
  918                                                 fire_nq = nd;
  919                                         }
  920                                         break;
  921                                 case rf_rollBackward:
  922                                         if (nd->numAntecedents == 0) {
  923                                                 /*
  924                                                  * End of the dag, add to the
  925                                                  * callback list.
  926                                                  */
  927                                                 nd->next = term_nq;
  928                                                 term_nq = nd;
  929                                         } else {
  930                                                 /*
  931                                                  * Not the end, add to the
  932                                                  * fire queue.
  933                                                  */
  934                                                 nd->next = fire_nq;
  935                                                 fire_nq = nd;
  936                                         }
  937                                         break;
  938                                 default:
  939                                         RF_PANIC();
  940                                         break;
  941                                 }
  942                         }
  943 
  944                         /*
  945                          * Execute callback of dags which have reached the
  946                          * terminal node.
  947                          */
  948                         while (term_nq) {
  949                                 nd = term_nq;
  950                                 term_nq = term_nq->next;
  951                                 nd->next = NULL;
  952                                 (nd->dagHdr->cbFunc) (nd->dagHdr->cbArg);
  953                                 raidPtr->dags_in_flight--; /* Debug only. */
  954                         }
  955 
  956                         /* Fire remaining nodes. */
  957                         rf_FireNodeList(fire_nq);
  958 
  959                         DO_LOCK(raidPtr);
  960                 }
  961                 while (!raidPtr->shutdown_engine && raidPtr->node_queue == NULL)
  962                         DO_WAIT(raidPtr);
  963         }
  964         DO_UNLOCK(raidPtr);
  965 
  966         if (rf_hook_cookies && rf_hook_cookies[raidPtr->raidid] != NULL) {
  967                 shutdownhook_disestablish(rf_hook_cookies[raidPtr->raidid]);
  968                 rf_hook_cookies[raidPtr->raidid] = NULL;
  969         }
  970 
  971         RF_THREADGROUP_DONE(&raidPtr->engine_tg);
  972 
  973         splx(s);
  974         kthread_exit(0);
  975 }

/* [<][>][^][v][top][bottom][index][help] */