Print this page
XXXX update sendmail to 8.14.9
| Split |
Close |
| Expand all |
| Collapse all |
--- old/usr/src/cmd/sendmail/libmilter/worker.c
+++ new/usr/src/cmd/sendmail/libmilter/worker.c
1 1 /*
2 - * Copyright (c) 2003-2004, 2007, 2009 Sendmail, Inc. and its suppliers.
2 + * Copyright (c) 2003-2004, 2007, 2009-2012 Proofpoint, Inc. and its suppliers.
3 3 * All rights reserved.
4 4 *
5 5 * By using this file, you agree to the terms and conditions set
6 6 * forth in the LICENSE file which can be found at the top level of
7 7 * the sendmail distribution.
8 8 *
9 9 * Contributed by Jose Marcio Martins da Cruz - Ecole des Mines de Paris
10 10 * Jose-Marcio.Martins@ensmp.fr
11 11 */
12 12
13 13 #include <sm/gen.h>
14 -SM_RCSID("@(#)$Id: worker.c,v 8.17 2009/06/15 15:34:54 ca Exp $")
14 +SM_RCSID("@(#)$Id: worker.c,v 8.25 2013-11-22 20:51:37 ca Exp $")
15 15
16 16 #include "libmilter.h"
17 17
18 18 #if _FFR_WORKERS_POOL
19 19
20 20 typedef struct taskmgr_S taskmgr_T;
21 21
22 22 #define TM_SIGNATURE 0x23021957
23 23
24 24 struct taskmgr_S
25 25 {
26 26 long tm_signature; /* has the controller been initialized */
27 27 sthread_t tm_tid; /* thread id of controller */
28 28 smfi_hd_T tm_ctx_head; /* head of the linked list of contexts */
29 29
30 30 int tm_nb_workers; /* number of workers in the pool */
31 31 int tm_nb_idle; /* number of workers waiting */
32 32
33 33 int tm_p[2]; /* poll control pipe */
34 34
35 35 smutex_t tm_w_mutex; /* linked list access mutex */
36 36 scond_t tm_w_cond; /* */
37 37 };
38 38
39 39 static taskmgr_T Tskmgr = {0};
40 40
41 41 #define WRK_CTX_HEAD Tskmgr.tm_ctx_head
42 42
43 43 #define RD_PIPE (Tskmgr.tm_p[0])
44 44 #define WR_PIPE (Tskmgr.tm_p[1])
45 45
46 46 #define PIPE_SEND_SIGNAL() \
47 47 do \
48 48 { \
49 49 char evt = 0x5a; \
50 50 int fd = WR_PIPE; \
51 51 if (write(fd, &evt, sizeof(evt)) != sizeof(evt)) \
52 52 smi_log(SMI_LOG_ERR, \
53 53 "Error writing to event pipe: %s", \
54 54 sm_errstring(errno)); \
55 55 } while (0)
56 56
57 57 #ifndef USE_PIPE_WAKE_POLL
58 58 # define USE_PIPE_WAKE_POLL 1
59 59 #endif /* USE_PIPE_WAKE_POLL */
60 60
61 61 /* poll check periodicity (default 10000 - 10 s) */
62 62 #define POLL_TIMEOUT 10000
63 63
64 64 /* worker conditional wait timeout (default 10 s) */
65 65 #define COND_TIMEOUT 10
66 66
67 67 /* functions */
68 68 static int mi_close_session __P((SMFICTX_PTR));
69 69
70 70 static void *mi_worker __P((void *));
71 71 static void *mi_pool_controller __P((void *));
72 72
73 73 static int mi_list_add_ctx __P((SMFICTX_PTR));
74 74 static int mi_list_del_ctx __P((SMFICTX_PTR));
75 75
76 76 /*
77 77 ** periodicity of cleaning up old sessions (timedout)
78 78 ** sessions list will be checked to find old inactive
79 79 ** sessions each DT_CHECK_OLD_SESSIONS sec
80 80 */
81 81
82 82 #define DT_CHECK_OLD_SESSIONS 600
83 83
84 84 #ifndef OLD_SESSION_TIMEOUT
85 85 # define OLD_SESSION_TIMEOUT ctx->ctx_timeout
86 86 #endif /* OLD_SESSION_TIMEOUT */
87 87
88 88 /* session states - with respect to the pool of workers */
89 89 #define WKST_INIT 0 /* initial state */
90 90 #define WKST_READY_TO_RUN 1 /* command ready do be read */
91 91 #define WKST_RUNNING 2 /* session running on a worker */
92 92 #define WKST_READY_TO_WAIT 3 /* session just finished by a worker */
93 93 #define WKST_WAITING 4 /* waiting for new command */
94 94 #define WKST_CLOSING 5 /* session finished */
95 95
96 96 #ifndef MIN_WORKERS
97 97 # define MIN_WORKERS 2 /* minimum number of threads to keep around */
98 98 #endif
99 99
100 100 #define MIN_IDLE 1 /* minimum number of idle threads */
101 101
102 102
103 103 /*
104 104 ** Macros for threads and mutex management
105 105 */
106 106
107 107 #define TASKMGR_LOCK() \
108 108 do \
109 109 { \
110 110 if (!smutex_lock(&Tskmgr.tm_w_mutex)) \
111 111 smi_log(SMI_LOG_ERR, "TASKMGR_LOCK error"); \
112 112 } while (0)
113 113
114 114 #define TASKMGR_UNLOCK() \
115 115 do \
116 116 { \
117 117 if (!smutex_unlock(&Tskmgr.tm_w_mutex)) \
118 118 smi_log(SMI_LOG_ERR, "TASKMGR_UNLOCK error"); \
119 119 } while (0)
120 120
121 121 #define TASKMGR_COND_WAIT() \
122 122 scond_timedwait(&Tskmgr.tm_w_cond, &Tskmgr.tm_w_mutex, COND_TIMEOUT)
123 123
124 124 #define TASKMGR_COND_SIGNAL() \
125 125 do \
126 126 { \
127 127 if (scond_signal(&Tskmgr.tm_w_cond) != 0) \
128 128 smi_log(SMI_LOG_ERR, "TASKMGR_COND_SIGNAL error"); \
129 129 } while (0)
130 130
131 131 #define LAUNCH_WORKER(ctx) \
132 132 do \
133 133 { \
|
↓ open down ↓ |
109 lines elided |
↑ open up ↑ |
134 134 int r; \
135 135 sthread_t tid; \
136 136 \
137 137 if ((r = thread_create(&tid, mi_worker, ctx)) != 0) \
138 138 smi_log(SMI_LOG_ERR, "LAUNCH_WORKER error: %s",\
139 139 sm_errstring(r)); \
140 140 } while (0)
141 141
142 142 #if POOL_DEBUG
143 143 # define POOL_LEV_DPRINTF(lev, x) \
144 - do { \
144 + do \
145 + { \
145 146 if ((lev) < ctx->ctx_dbg) \
146 147 sm_dprintf x; \
147 148 } while (0)
148 149 #else /* POOL_DEBUG */
149 150 # define POOL_LEV_DPRINTF(lev, x)
150 151 #endif /* POOL_DEBUG */
151 152
152 153 /*
153 154 ** MI_START_SESSION -- Start a session in the pool of workers
154 155 **
155 156 ** Parameters:
156 157 ** ctx -- context structure
157 158 **
|
↓ open down ↓ |
3 lines elided |
↑ open up ↑ |
158 159 ** Returns:
159 160 ** MI_SUCCESS/MI_FAILURE
160 161 */
161 162
162 163 int
163 164 mi_start_session(ctx)
164 165 SMFICTX_PTR ctx;
165 166 {
166 167 static long id = 0;
167 168
168 - SM_ASSERT(Tskmgr.tm_signature == TM_SIGNATURE);
169 + /* this can happen if the milter is shutting down */
170 + if (Tskmgr.tm_signature != TM_SIGNATURE)
171 + return MI_FAILURE;
169 172 SM_ASSERT(ctx != NULL);
170 173 POOL_LEV_DPRINTF(4, ("PIPE r=[%d] w=[%d]", RD_PIPE, WR_PIPE));
171 174 TASKMGR_LOCK();
172 175
173 176 if (mi_list_add_ctx(ctx) != MI_SUCCESS)
174 177 {
175 178 TASKMGR_UNLOCK();
176 179 return MI_FAILURE;
177 180 }
178 181
179 182 ctx->ctx_sid = id++;
180 183
181 184 /* if there is an idle worker, signal it, otherwise start new worker */
182 185 if (Tskmgr.tm_nb_idle > 0)
183 186 {
184 187 ctx->ctx_wstate = WKST_READY_TO_RUN;
185 188 TASKMGR_COND_SIGNAL();
186 189 }
187 190 else
188 191 {
189 192 ctx->ctx_wstate = WKST_RUNNING;
190 193 LAUNCH_WORKER(ctx);
191 194 }
192 195 TASKMGR_UNLOCK();
193 196 return MI_SUCCESS;
194 197 }
195 198
196 199 /*
197 200 ** MI_CLOSE_SESSION -- Close a session and clean up data structures
198 201 **
199 202 ** Parameters:
200 203 ** ctx -- context structure
201 204 **
202 205 ** Returns:
203 206 ** MI_SUCCESS/MI_FAILURE
204 207 */
205 208
206 209 static int
207 210 mi_close_session(ctx)
208 211 SMFICTX_PTR ctx;
|
↓ open down ↓ |
30 lines elided |
↑ open up ↑ |
209 212 {
210 213 SM_ASSERT(ctx != NULL);
211 214
212 215 (void) mi_list_del_ctx(ctx);
213 216 mi_clr_ctx(ctx);
214 217
215 218 return MI_SUCCESS;
216 219 }
217 220
218 221 /*
222 +** NONBLOCKING -- set nonblocking mode for a file descriptor.
223 +**
224 +** Parameters:
225 +** fd -- file descriptor
226 +** name -- name for (error) logging
227 +**
228 +** Returns:
229 +** MI_SUCCESS/MI_FAILURE
230 +*/
231 +
232 +static int
233 +nonblocking(int fd, const char *name)
234 +{
235 + int r;
236 +
237 + errno = 0;
238 + r = fcntl(fd, F_GETFL, 0);
239 + if (r == -1)
240 + {
241 + smi_log(SMI_LOG_ERR, "fcntl(%s, F_GETFL)=%s",
242 + name, sm_errstring(errno));
243 + return MI_FAILURE;
244 + }
245 + errno = 0;
246 + r = fcntl(fd, F_SETFL, r | O_NONBLOCK);
247 + if (r == -1)
248 + {
249 + smi_log(SMI_LOG_ERR, "fcntl(%s, F_SETFL, O_NONBLOCK)=%s",
250 + name, sm_errstring(errno));
251 + return MI_FAILURE;
252 + }
253 + return MI_SUCCESS;
254 +}
255 +
256 +/*
219 257 ** MI_POOL_CONTROLER_INIT -- Launch the worker pool controller
220 258 ** Must be called before starting sessions.
221 259 **
222 260 ** Parameters:
223 261 ** none
224 262 **
225 263 ** Returns:
226 264 ** MI_SUCCESS/MI_FAILURE
227 265 */
228 266
229 267 int
230 268 mi_pool_controller_init()
231 269 {
232 270 sthread_t tid;
233 271 int r, i;
234 272
235 273 if (Tskmgr.tm_signature == TM_SIGNATURE)
236 274 return MI_SUCCESS;
237 275
238 276 SM_TAILQ_INIT(&WRK_CTX_HEAD);
|
↓ open down ↓ |
10 lines elided |
↑ open up ↑ |
239 277 Tskmgr.tm_tid = (sthread_t) -1;
240 278 Tskmgr.tm_nb_workers = 0;
241 279 Tskmgr.tm_nb_idle = 0;
242 280
243 281 if (pipe(Tskmgr.tm_p) != 0)
244 282 {
245 283 smi_log(SMI_LOG_ERR, "can't create event pipe: %s",
246 284 sm_errstring(errno));
247 285 return MI_FAILURE;
248 286 }
287 + r = nonblocking(WR_PIPE, "WR_PIPE");
288 + if (r != MI_SUCCESS)
289 + return r;
290 + r = nonblocking(RD_PIPE, "RD_PIPE");
291 + if (r != MI_SUCCESS)
292 + return r;
249 293
250 294 (void) smutex_init(&Tskmgr.tm_w_mutex);
251 295 (void) scond_init(&Tskmgr.tm_w_cond);
252 296
253 297 /* Launch the pool controller */
254 298 if ((r = thread_create(&tid, mi_pool_controller, (void *) NULL)) != 0)
255 299 {
256 300 smi_log(SMI_LOG_ERR, "can't create controller thread: %s",
257 301 sm_errstring(r));
258 302 return MI_FAILURE;
259 303 }
260 304 Tskmgr.tm_tid = tid;
261 305 Tskmgr.tm_signature = TM_SIGNATURE;
262 306
263 307 /* Create the pool of workers */
264 308 for (i = 0; i < MIN_WORKERS; i++)
265 309 {
266 310 if ((r = thread_create(&tid, mi_worker, (void *) NULL)) != 0)
267 311 {
268 312 smi_log(SMI_LOG_ERR, "can't create workers crew: %s",
269 313 sm_errstring(r));
270 314 return MI_FAILURE;
271 315 }
272 316 }
273 317
274 318 return MI_SUCCESS;
275 319 }
276 320
277 321 /*
278 322 ** MI_POOL_CONTROLLER -- manage the pool of workers
279 323 ** This thread must be running when listener begins
280 324 ** starting sessions
281 325 **
282 326 ** Parameters:
283 327 ** arg -- unused
284 328 **
285 329 ** Returns:
286 330 ** NULL
287 331 **
288 332 ** Control flow:
289 333 ** for (;;)
290 334 ** Look for timed out sessions
291 335 ** Select sessions to wait for sendmail command
292 336 ** Poll set of file descriptors
293 337 ** if timeout
294 338 ** continue
295 339 ** For each file descriptor ready
296 340 ** launch new thread if no worker available
297 341 ** else
298 342 ** signal waiting worker
299 343 */
300 344
301 345 /* Poll structure array (pollfd) size step */
302 346 #define PFD_STEP 256
303 347
304 348 #define WAIT_FD(i) (pfd[i].fd)
305 349 #define WAITFN "POLL"
306 350
307 351 static void *
308 352 mi_pool_controller(arg)
309 353 void *arg;
310 354 {
311 355 struct pollfd *pfd = NULL;
312 356 int dim_pfd = 0;
313 357 bool rebuild_set = true;
314 358 int pcnt = 0; /* error count for poll() failures */
315 359 time_t lastcheck;
316 360
317 361 Tskmgr.tm_tid = sthread_get_id();
318 362 if (pthread_detach(Tskmgr.tm_tid) != 0)
319 363 {
320 364 smi_log(SMI_LOG_ERR, "Failed to detach pool controller thread");
321 365 return NULL;
322 366 }
323 367
324 368 pfd = (struct pollfd *) malloc(PFD_STEP * sizeof(struct pollfd));
325 369 if (pfd == NULL)
326 370 {
|
↓ open down ↓ |
68 lines elided |
↑ open up ↑ |
327 371 smi_log(SMI_LOG_ERR, "Failed to malloc pollfd array: %s",
328 372 sm_errstring(errno));
329 373 return NULL;
330 374 }
331 375 dim_pfd = PFD_STEP;
332 376
333 377 lastcheck = time(NULL);
334 378 for (;;)
335 379 {
336 380 SMFICTX_PTR ctx;
337 - int nfd, rfd, i;
381 + int nfd, r, i;
338 382 time_t now;
339 383
340 384 POOL_LEV_DPRINTF(4, ("Let's %s again...", WAITFN));
341 385
342 386 if (mi_stop() != MILTER_CONT)
343 387 break;
344 388
345 389 TASKMGR_LOCK();
346 390
347 391 now = time(NULL);
348 392
349 393 /* check for timed out sessions? */
350 394 if (lastcheck + DT_CHECK_OLD_SESSIONS < now)
351 395 {
352 396 ctx = SM_TAILQ_FIRST(&WRK_CTX_HEAD);
353 397 while (ctx != SM_TAILQ_END(&WRK_CTX_HEAD))
354 398 {
355 399 SMFICTX_PTR ctx_nxt;
356 400
357 401 ctx_nxt = SM_TAILQ_NEXT(ctx, ctx_link);
358 402 if (ctx->ctx_wstate == WKST_WAITING)
359 403 {
360 404 if (ctx->ctx_wait == 0)
361 405 ctx->ctx_wait = now;
362 406 else if (ctx->ctx_wait + OLD_SESSION_TIMEOUT
363 407 < now)
364 408 {
365 409 /* if session timed out, close it */
366 410 sfsistat (*fi_close) __P((SMFICTX *));
367 411
368 412 POOL_LEV_DPRINTF(4,
369 413 ("Closing old connection: sd=%d id=%d",
370 414 ctx->ctx_sd,
371 415 ctx->ctx_sid));
372 416
373 417 if ((fi_close = ctx->ctx_smfi->xxfi_close) != NULL)
374 418 (void) (*fi_close)(ctx);
375 419
376 420 mi_close_session(ctx);
377 421 }
378 422 }
379 423 ctx = ctx_nxt;
380 424 }
381 425 lastcheck = now;
382 426 }
383 427
384 428 if (rebuild_set)
385 429 {
386 430 /*
387 431 ** Initialize poll set.
388 432 ** Insert into the poll set the file descriptors of
389 433 ** all sessions waiting for a command from sendmail.
390 434 */
391 435
392 436 nfd = 0;
393 437
394 438 /* begin with worker pipe */
395 439 pfd[nfd].fd = RD_PIPE;
396 440 pfd[nfd].events = MI_POLL_RD_FLAGS;
397 441 pfd[nfd].revents = 0;
398 442 nfd++;
399 443
400 444 SM_TAILQ_FOREACH(ctx, &WRK_CTX_HEAD, ctx_link)
401 445 {
402 446 /*
403 447 ** update ctx_wait - start of wait moment -
404 448 ** for timeout
405 449 */
406 450
407 451 if (ctx->ctx_wstate == WKST_READY_TO_WAIT)
408 452 ctx->ctx_wait = now;
409 453
410 454 /* add the session to the pollfd array? */
411 455 if ((ctx->ctx_wstate == WKST_READY_TO_WAIT) ||
412 456 (ctx->ctx_wstate == WKST_WAITING))
413 457 {
414 458 /*
415 459 ** Resize the pollfd array if it
416 460 ** isn't large enough.
417 461 */
418 462
419 463 if (nfd >= dim_pfd)
420 464 {
421 465 struct pollfd *tpfd;
422 466 size_t new;
423 467
424 468 new = (dim_pfd + PFD_STEP) *
425 469 sizeof(*tpfd);
426 470 tpfd = (struct pollfd *)
427 471 realloc(pfd, new);
428 472 if (tpfd != NULL)
429 473 {
430 474 pfd = tpfd;
431 475 dim_pfd += PFD_STEP;
432 476 }
433 477 else
434 478 {
435 479 smi_log(SMI_LOG_ERR,
436 480 "Failed to realloc pollfd array:%s",
437 481 sm_errstring(errno));
438 482 }
439 483 }
440 484
441 485 /* add the session to pollfd array */
442 486 if (nfd < dim_pfd)
443 487 {
444 488 ctx->ctx_wstate = WKST_WAITING;
445 489 pfd[nfd].fd = ctx->ctx_sd;
446 490 pfd[nfd].events = MI_POLL_RD_FLAGS;
447 491 pfd[nfd].revents = 0;
|
↓ open down ↓ |
100 lines elided |
↑ open up ↑ |
448 492 nfd++;
449 493 }
450 494 }
451 495 }
452 496 rebuild_set = false;
453 497 }
454 498
455 499 TASKMGR_UNLOCK();
456 500
457 501 /* Everything is ready, let's wait for an event */
458 - rfd = poll(pfd, nfd, POLL_TIMEOUT);
502 + r = poll(pfd, nfd, POLL_TIMEOUT);
459 503
460 504 POOL_LEV_DPRINTF(4, ("%s returned: at epoch %d value %d",
461 505 WAITFN, now, nfd));
462 506
463 507 /* timeout */
464 - if (rfd == 0)
508 + if (r == 0)
465 509 continue;
466 510
467 511 rebuild_set = true;
468 512
469 513 /* error */
470 - if (rfd < 0)
514 + if (r < 0)
471 515 {
472 516 if (errno == EINTR)
473 517 continue;
474 518 pcnt++;
475 519 smi_log(SMI_LOG_ERR,
476 520 "%s() failed (%s), %s",
477 521 WAITFN, sm_errstring(errno),
478 522 pcnt >= MAX_FAILS_S ? "abort" : "try again");
479 523
480 524 if (pcnt >= MAX_FAILS_S)
481 525 goto err;
526 + continue;
482 527 }
483 528 pcnt = 0;
484 529
485 530 /* something happened */
486 531 for (i = 0; i < nfd; i++)
487 532 {
488 533 if (pfd[i].revents == 0)
489 534 continue;
490 535
491 536 POOL_LEV_DPRINTF(4, ("%s event on pfd[%d/%d]=%d ",
492 537 WAITFN, i, nfd,
493 538 WAIT_FD(i)));
494 539
495 - /* has a worker signaled an end of task ? */
540 + /* has a worker signaled an end of task? */
496 541 if (WAIT_FD(i) == RD_PIPE)
497 542 {
498 - char evt = 0;
499 - int r = 0;
543 + char evts[256];
544 + ssize_t r;
500 545
501 546 POOL_LEV_DPRINTF(4,
502 547 ("PIPE WILL READ evt = %08X %08X",
503 548 pfd[i].events, pfd[i].revents));
504 549
505 - if ((pfd[i].revents & MI_POLL_RD_FLAGS) != 0)
550 + r = 1;
551 + while ((pfd[i].revents & MI_POLL_RD_FLAGS) != 0
552 + && r != -1)
506 553 {
507 - r = read(RD_PIPE, &evt, sizeof(evt));
508 - if (r == sizeof(evt))
509 - {
510 - /* Do nothing */
511 - }
554 + r = read(RD_PIPE, evts, sizeof(evts));
512 555 }
513 556
514 557 POOL_LEV_DPRINTF(4,
515 558 ("PIPE DONE READ i=[%d] fd=[%d] r=[%d] evt=[%d]",
516 - i, RD_PIPE, r, evt));
559 + i, RD_PIPE, (int) r, evts[0]));
517 560
518 561 if ((pfd[i].revents & ~MI_POLL_RD_FLAGS) != 0)
519 562 {
520 563 /* Exception handling */
521 564 }
522 565 continue;
523 566 }
524 567
525 - /* no ! sendmail wants to send a command */
568 + /*
569 + ** Not the pipe for workers waking us,
570 + ** so must be something on an MTA connection.
571 + */
572 +
573 + TASKMGR_LOCK();
526 574 SM_TAILQ_FOREACH(ctx, &WRK_CTX_HEAD, ctx_link)
527 575 {
528 576 if (ctx->ctx_wstate != WKST_WAITING)
529 577 continue;
530 578
531 579 POOL_LEV_DPRINTF(4,
532 580 ("Checking context sd=%d - fd=%d ",
533 581 ctx->ctx_sd , WAIT_FD(i)));
534 582
535 583 if (ctx->ctx_sd == pfd[i].fd)
536 584 {
537 - TASKMGR_LOCK();
538 585
539 586 POOL_LEV_DPRINTF(4,
540 587 ("TASK: found %d for fd[%d]=%d",
541 588 ctx->ctx_sid, i, WAIT_FD(i)));
542 589
543 590 if (Tskmgr.tm_nb_idle > 0)
544 591 {
545 592 ctx->ctx_wstate = WKST_READY_TO_RUN;
546 593 TASKMGR_COND_SIGNAL();
547 594 }
548 595 else
549 596 {
550 597 ctx->ctx_wstate = WKST_RUNNING;
551 598 LAUNCH_WORKER(ctx);
552 599 }
553 - TASKMGR_UNLOCK();
554 600 break;
555 601 }
556 602 }
603 + TASKMGR_UNLOCK();
557 604
558 605 POOL_LEV_DPRINTF(4,
559 606 ("TASK %s FOUND - Checking PIPE for fd[%d]",
560 607 ctx != NULL ? "" : "NOT", WAIT_FD(i)));
561 608 }
562 609 }
563 610
564 611 err:
565 612 if (pfd != NULL)
566 613 free(pfd);
567 614
568 615 Tskmgr.tm_signature = 0;
616 +#if 0
617 + /*
618 + ** Do not clean up ctx -- it can cause double-free()s.
619 + ** The program is shutting down anyway, so it's not worth the trouble.
620 + ** There is a more complex solution that prevents race conditions
621 + ** while accessing ctx, but that's maybe for a later version.
622 + */
623 +
569 624 for (;;)
570 625 {
571 626 SMFICTX_PTR ctx;
572 627
573 628 ctx = SM_TAILQ_FIRST(&WRK_CTX_HEAD);
574 629 if (ctx == NULL)
575 630 break;
576 631 mi_close_session(ctx);
577 632 }
633 +#endif
578 634
579 635 (void) smutex_destroy(&Tskmgr.tm_w_mutex);
580 636 (void) scond_destroy(&Tskmgr.tm_w_cond);
581 637
582 638 return NULL;
583 639 }
584 640
585 641 /*
586 642 ** Look for a task ready to run.
587 643 ** Value of ctx is NULL or a pointer to a task ready to run.
588 644 */
589 645
590 646 #define GET_TASK_READY_TO_RUN() \
591 647 SM_TAILQ_FOREACH(ctx, &WRK_CTX_HEAD, ctx_link) \
592 648 { \
593 649 if (ctx->ctx_wstate == WKST_READY_TO_RUN) \
594 650 { \
595 651 ctx->ctx_wstate = WKST_RUNNING; \
596 652 break; \
597 653 } \
598 654 }
599 655
600 656 /*
601 657 ** MI_WORKER -- worker thread
602 658 ** executes tasks distributed by the mi_pool_controller
603 659 ** or by mi_start_session
604 660 **
605 661 ** Parameters:
606 662 ** arg -- pointer to context structure
607 663 **
608 664 ** Returns:
609 665 ** NULL pointer
610 666 */
611 667
612 668 static void *
613 669 mi_worker(arg)
614 670 void *arg;
615 671 {
616 672 SMFICTX_PTR ctx;
617 673 bool done;
618 674 sthread_t t_id;
619 675 int r;
620 676
621 677 ctx = (SMFICTX_PTR) arg;
622 678 done = false;
623 679 if (ctx != NULL)
624 680 ctx->ctx_wstate = WKST_RUNNING;
625 681
626 682 t_id = sthread_get_id();
627 683 if (pthread_detach(t_id) != 0)
628 684 {
629 685 smi_log(SMI_LOG_ERR, "Failed to detach worker thread");
630 686 if (ctx != NULL)
631 687 ctx->ctx_wstate = WKST_READY_TO_RUN;
632 688 return NULL;
633 689 }
634 690
635 691 TASKMGR_LOCK();
636 692 Tskmgr.tm_nb_workers++;
637 693 TASKMGR_UNLOCK();
638 694
639 695 while (!done)
640 696 {
641 697 if (mi_stop() != MILTER_CONT)
642 698 break;
643 699
644 700 /* let's handle next task... */
645 701 if (ctx != NULL)
646 702 {
647 703 int res;
648 704
649 705 POOL_LEV_DPRINTF(4,
650 706 ("worker %d: new task -> let's handle it",
651 707 t_id));
652 708 res = mi_engine(ctx);
653 709 POOL_LEV_DPRINTF(4,
654 710 ("worker %d: mi_engine returned %d", t_id, res));
655 711
656 712 TASKMGR_LOCK();
657 713 if (res != MI_CONTINUE)
658 714 {
659 715 ctx->ctx_wstate = WKST_CLOSING;
660 716
661 717 /*
662 718 ** Delete context from linked list of
663 719 ** sessions and close session.
664 720 */
665 721
666 722 mi_close_session(ctx);
667 723 }
668 724 else
669 725 {
670 726 ctx->ctx_wstate = WKST_READY_TO_WAIT;
671 727
672 728 POOL_LEV_DPRINTF(4,
673 729 ("writing to event pipe..."));
674 730
675 731 /*
676 732 ** Signal task controller to add new session
677 733 ** to poll set.
678 734 */
679 735
680 736 PIPE_SEND_SIGNAL();
681 737 }
682 738 TASKMGR_UNLOCK();
683 739 ctx = NULL;
684 740
685 741 }
686 742
687 743 /* check if there is any task waiting to be served */
688 744 TASKMGR_LOCK();
689 745
690 746 GET_TASK_READY_TO_RUN();
691 747
692 748 /* Got a task? */
693 749 if (ctx != NULL)
694 750 {
695 751 TASKMGR_UNLOCK();
696 752 continue;
697 753 }
698 754
699 755 /*
700 756 ** if not, let's check if there is enough idle workers
701 757 ** if yes: quit
702 758 */
703 759
704 760 if (Tskmgr.tm_nb_workers > MIN_WORKERS &&
705 761 Tskmgr.tm_nb_idle > MIN_IDLE)
706 762 done = true;
707 763
708 764 POOL_LEV_DPRINTF(4, ("worker %d: checking ... %d %d", t_id,
709 765 Tskmgr.tm_nb_workers, Tskmgr.tm_nb_idle + 1));
710 766
711 767 if (done)
712 768 {
713 769 POOL_LEV_DPRINTF(4, ("worker %d: quitting... ", t_id));
714 770 Tskmgr.tm_nb_workers--;
715 771 TASKMGR_UNLOCK();
716 772 continue;
717 773 }
718 774
719 775 /*
720 776 ** if no task ready to run, wait for another one
721 777 */
722 778
723 779 Tskmgr.tm_nb_idle++;
724 780 TASKMGR_COND_WAIT();
725 781 Tskmgr.tm_nb_idle--;
726 782
727 783 /* look for a task */
728 784 GET_TASK_READY_TO_RUN();
729 785
730 786 TASKMGR_UNLOCK();
731 787 }
732 788 return NULL;
733 789 }
734 790
735 791 /*
736 792 ** MI_LIST_ADD_CTX -- add new session to linked list
737 793 **
738 794 ** Parameters:
739 795 ** ctx -- context structure
740 796 **
741 797 ** Returns:
742 798 ** MI_FAILURE/MI_SUCCESS
743 799 */
744 800
745 801 static int
746 802 mi_list_add_ctx(ctx)
747 803 SMFICTX_PTR ctx;
748 804 {
749 805 SM_ASSERT(ctx != NULL);
750 806 SM_TAILQ_INSERT_TAIL(&WRK_CTX_HEAD, ctx, ctx_link);
751 807 return MI_SUCCESS;
752 808 }
753 809
754 810 /*
755 811 ** MI_LIST_DEL_CTX -- remove session from linked list when finished
756 812 **
757 813 ** Parameters:
758 814 ** ctx -- context structure
759 815 **
760 816 ** Returns:
761 817 ** MI_FAILURE/MI_SUCCESS
762 818 */
763 819
764 820 static int
765 821 mi_list_del_ctx(ctx)
766 822 SMFICTX_PTR ctx;
767 823 {
768 824 SM_ASSERT(ctx != NULL);
769 825 if (SM_TAILQ_EMPTY(&WRK_CTX_HEAD))
770 826 return MI_FAILURE;
771 827
772 828 SM_TAILQ_REMOVE(&WRK_CTX_HEAD, ctx, ctx_link);
773 829 return MI_SUCCESS;
774 830 }
775 831 #endif /* _FFR_WORKERS_POOL */
|
↓ open down ↓ |
188 lines elided |
↑ open up ↑ |
XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX