This source file includes following definitions.
- workq_init
- workq_init_syswq
- workq_create
- workq_destroy
- workq_add_task
- workq_create_thread
- workq_thread
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 #include <sys/param.h>
21 #include <sys/systm.h>
22 #include <sys/malloc.h>
23 #include <sys/pool.h>
24 #include <sys/queue.h>
25 #include <sys/kthread.h>
26 #include <sys/workq.h>
27
28 struct workq_task {
29 int wqt_flags;
30 workq_fn wqt_func;
31 void *wqt_arg1;
32 void *wqt_arg2;
33
34 SIMPLEQ_ENTRY(workq_task) wqt_entry;
35 };
36
37 struct workq {
38 int wq_flags;
39 #define WQ_F_RUNNING (1<<0)
40 int wq_running;
41 int wq_busy;
42 int wq_max;
43 const char *wq_name;
44
45 SIMPLEQ_HEAD(, workq_task) wq_tasklist;
46 };
47
48 struct pool workq_task_pool;
49 struct workq workq_syswq = {
50 WQ_F_RUNNING,
51 0,
52 0,
53 1,
54 "syswq"
55 };
56
57 void workq_init(void);
58 void workq_init_syswq(void *);
59 void workq_create_thread(void *);
60 void workq_thread(void *);
61
62 void
63 workq_init(void)
64 {
65 pool_init(&workq_task_pool, sizeof(struct workq_task), 0, 0,
66 0, "wqtasks", NULL);
67
68 SIMPLEQ_INIT(&workq_syswq.wq_tasklist);
69 kthread_create_deferred(workq_init_syswq, NULL);
70 }
71
72 void
73 workq_init_syswq(void *arg)
74 {
75 if (kthread_create(workq_thread, &workq_syswq, NULL, "%s",
76 workq_syswq.wq_name) != 0)
77 panic("unable to create system work queue thread");
78
79 workq_syswq.wq_running++;
80 }
81
82 struct workq *
83 workq_create(const char *name, int maxqs)
84 {
85 struct workq *wq;
86
87 wq = malloc(sizeof(*wq), M_DEVBUF, M_NOWAIT);
88 if (wq == NULL)
89 return (NULL);
90
91 wq->wq_flags = WQ_F_RUNNING;
92 wq->wq_running = 0;
93 wq->wq_busy = 0;
94 wq->wq_max = maxqs;
95 wq->wq_name = name;
96 SIMPLEQ_INIT(&wq->wq_tasklist);
97
98
99 kthread_create_deferred(workq_create_thread, wq);
100
101 return (wq);
102 }
103
104 void
105 workq_destroy(struct workq *wq)
106 {
107 int s;
108
109 s = splhigh();
110
111 wq->wq_flags &= ~WQ_F_RUNNING;
112 while (wq->wq_running != 0) {
113 wakeup(wq);
114 tsleep(&wq->wq_running, PWAIT, "wqdestroy", 0);
115 }
116
117 splx(s);
118
119 free(wq, M_DEVBUF);
120 }
121
122 int
123 workq_add_task(struct workq *wq, int flags, workq_fn func,
124 void *a1, void *a2)
125 {
126 struct workq_task *wqt;
127 int wake = 1;
128 int s;
129
130 if (wq == NULL)
131 wq = &workq_syswq;
132
133 s = splhigh();
134 wqt = pool_get(&workq_task_pool, (flags & WQ_WAITOK) ?
135 PR_WAITOK : PR_NOWAIT);
136 splx(s);
137 if (!wqt)
138 return (ENOMEM);
139
140 wqt->wqt_flags = flags;
141 wqt->wqt_func = func;
142 wqt->wqt_arg1 = a1;
143 wqt->wqt_arg2 = a2;
144
145 s = splhigh();
146 SIMPLEQ_INSERT_TAIL(&wq->wq_tasklist, wqt, wqt_entry);
147 if ((wq->wq_running < wq->wq_max) && (wq->wq_running == wq->wq_busy)) {
148 kthread_create_deferred(workq_create_thread, wq);
149 wake = 0;
150 }
151 splx(s);
152
153 if (wake)
154 wakeup_one(wq);
155
156 return (0);
157 }
158
159 void
160 workq_create_thread(void *arg)
161 {
162 struct workq *wq = arg;
163 int rv;
164
165 rv = kthread_create(workq_thread, wq, NULL, "%s", wq->wq_name);
166 if (rv != 0) {
167 printf("unable to create \"%s\" workq thread\n", wq->wq_name);
168 return;
169 }
170
171 wq->wq_running++;
172 }
173
174 void
175 workq_thread(void *arg)
176 {
177 struct workq *wq = arg;
178 struct workq_task *wqt;
179 int s;
180
181 s = splhigh();
182 while (wq->wq_flags & WQ_F_RUNNING) {
183 while ((wqt = SIMPLEQ_FIRST(&wq->wq_tasklist)) != NULL) {
184 SIMPLEQ_REMOVE_HEAD(&wq->wq_tasklist, wqt_entry);
185 wq->wq_busy++;
186 splx(s);
187
188 if (wqt->wqt_flags & WQ_MPSAFE)
189 ;
190 wqt->wqt_func(wqt->wqt_arg1, wqt->wqt_arg2);
191 if (wqt->wqt_flags & WQ_MPSAFE)
192 ;
193
194 s = splhigh();
195 wq->wq_busy--;
196 pool_put(&workq_task_pool, wqt);
197 }
198 tsleep(wq, PWAIT, "bored", 0);
199 }
200 wq->wq_running--;
201 splx(s);
202 wakeup(&wq->wq_running);
203
204 kthread_exit(0);
205 }