Line data Source code
1 : /*
2 : * A type which wraps a semaphore
3 : *
4 : * semaphore.c
5 : *
6 : * Copyright (c) 2006-2008, R Oudkerk
7 : * Licensed to PSF under a Contributor Agreement.
8 : */
9 :
10 : #include "multiprocessing.h"
11 :
12 : enum { RECURSIVE_MUTEX, SEMAPHORE };
13 :
14 : typedef struct {
15 : PyObject_HEAD
16 : SEM_HANDLE handle;
17 : long last_tid;
18 : int count;
19 : int maxvalue;
20 : int kind;
21 : } SemLockObject;
22 :
23 : #define ISMINE(o) (o->count > 0 && PyThread_get_thread_ident() == o->last_tid)
24 :
25 :
26 : #ifdef MS_WINDOWS
27 :
28 : /*
29 : * Windows definitions
30 : */
31 :
32 : #define SEM_FAILED NULL
33 :
34 : #define SEM_CLEAR_ERROR() SetLastError(0)
35 : #define SEM_GET_LAST_ERROR() GetLastError()
36 : #define SEM_CREATE(name, val, max) CreateSemaphore(NULL, val, max, NULL)
37 : #define SEM_CLOSE(sem) (CloseHandle(sem) ? 0 : -1)
38 : #define SEM_GETVALUE(sem, pval) _GetSemaphoreValue(sem, pval)
39 : #define SEM_UNLINK(name) 0
40 :
41 : static int
42 : _GetSemaphoreValue(HANDLE handle, long *value)
43 : {
44 : long previous;
45 :
46 : switch (WaitForSingleObject(handle, 0)) {
47 : case WAIT_OBJECT_0:
48 : if (!ReleaseSemaphore(handle, 1, &previous))
49 : return MP_STANDARD_ERROR;
50 : *value = previous + 1;
51 : return 0;
52 : case WAIT_TIMEOUT:
53 : *value = 0;
54 : return 0;
55 : default:
56 : return MP_STANDARD_ERROR;
57 : }
58 : }
59 :
60 : static PyObject *
61 : semlock_acquire(SemLockObject *self, PyObject *args, PyObject *kwds)
62 : {
63 : int blocking = 1;
64 : double timeout;
65 : PyObject *timeout_obj = Py_None;
66 : DWORD res, full_msecs, nhandles;
67 : HANDLE handles[2], sigint_event;
68 :
69 : static char *kwlist[] = {"block", "timeout", NULL};
70 :
71 : if (!PyArg_ParseTupleAndKeywords(args, kwds, "|iO", kwlist,
72 : &blocking, &timeout_obj))
73 : return NULL;
74 :
75 : /* calculate timeout */
76 : if (!blocking) {
77 : full_msecs = 0;
78 : } else if (timeout_obj == Py_None) {
79 : full_msecs = INFINITE;
80 : } else {
81 : timeout = PyFloat_AsDouble(timeout_obj);
82 : if (PyErr_Occurred())
83 : return NULL;
84 : timeout *= 1000.0; /* convert to millisecs */
85 : if (timeout < 0.0) {
86 : timeout = 0.0;
87 : } else if (timeout >= 0.5 * INFINITE) { /* 25 days */
88 : PyErr_SetString(PyExc_OverflowError,
89 : "timeout is too large");
90 : return NULL;
91 : }
92 : full_msecs = (DWORD)(timeout + 0.5);
93 : }
94 :
95 : /* check whether we already own the lock */
96 : if (self->kind == RECURSIVE_MUTEX && ISMINE(self)) {
97 : ++self->count;
98 : Py_RETURN_TRUE;
99 : }
100 :
101 : /* check whether we can acquire without releasing the GIL and blocking */
102 : if (WaitForSingleObject(self->handle, 0) == WAIT_OBJECT_0) {
103 : self->last_tid = GetCurrentThreadId();
104 : ++self->count;
105 : Py_RETURN_TRUE;
106 : }
107 :
108 : /* prepare list of handles */
109 : nhandles = 0;
110 : handles[nhandles++] = self->handle;
111 : if (_PyOS_IsMainThread()) {
112 : sigint_event = _PyOS_SigintEvent();
113 : assert(sigint_event != NULL);
114 : handles[nhandles++] = sigint_event;
115 : }
116 :
117 : /* do the wait */
118 : Py_BEGIN_ALLOW_THREADS
119 : if (sigint_event != NULL)
120 : ResetEvent(sigint_event);
121 : res = WaitForMultipleObjects(nhandles, handles, FALSE, full_msecs);
122 : Py_END_ALLOW_THREADS
123 :
124 : /* handle result */
125 : switch (res) {
126 : case WAIT_TIMEOUT:
127 : Py_RETURN_FALSE;
128 : case WAIT_OBJECT_0 + 0:
129 : self->last_tid = GetCurrentThreadId();
130 : ++self->count;
131 : Py_RETURN_TRUE;
132 : case WAIT_OBJECT_0 + 1:
133 : errno = EINTR;
134 : return PyErr_SetFromErrno(PyExc_IOError);
135 : case WAIT_FAILED:
136 : return PyErr_SetFromWindowsErr(0);
137 : default:
138 : PyErr_Format(PyExc_RuntimeError, "WaitForSingleObject() or "
139 : "WaitForMultipleObjects() gave unrecognized "
140 : "value %d", res);
141 : return NULL;
142 : }
143 : }
144 :
145 : static PyObject *
146 : semlock_release(SemLockObject *self, PyObject *args)
147 : {
148 : if (self->kind == RECURSIVE_MUTEX) {
149 : if (!ISMINE(self)) {
150 : PyErr_SetString(PyExc_AssertionError, "attempt to "
151 : "release recursive lock not owned "
152 : "by thread");
153 : return NULL;
154 : }
155 : if (self->count > 1) {
156 : --self->count;
157 : Py_RETURN_NONE;
158 : }
159 : assert(self->count == 1);
160 : }
161 :
162 : if (!ReleaseSemaphore(self->handle, 1, NULL)) {
163 : if (GetLastError() == ERROR_TOO_MANY_POSTS) {
164 : PyErr_SetString(PyExc_ValueError, "semaphore or lock "
165 : "released too many times");
166 : return NULL;
167 : } else {
168 : return PyErr_SetFromWindowsErr(0);
169 : }
170 : }
171 :
172 : --self->count;
173 : Py_RETURN_NONE;
174 : }
175 :
176 : #else /* !MS_WINDOWS */
177 :
178 : /*
179 : * Unix definitions
180 : */
181 :
182 : #define SEM_CLEAR_ERROR()
183 : #define SEM_GET_LAST_ERROR() 0
184 : #define SEM_CREATE(name, val, max) sem_open(name, O_CREAT | O_EXCL, 0600, val)
185 : #define SEM_CLOSE(sem) sem_close(sem)
186 : #define SEM_GETVALUE(sem, pval) sem_getvalue(sem, pval)
187 : #define SEM_UNLINK(name) sem_unlink(name)
188 :
189 : #ifndef HAVE_SEM_UNLINK
190 : # define sem_unlink(name) 0
191 : #endif
192 :
193 : #ifndef HAVE_SEM_TIMEDWAIT
194 : # define sem_timedwait(sem,deadline) sem_timedwait_save(sem,deadline,_save)
195 :
196 : int
197 : sem_timedwait_save(sem_t *sem, struct timespec *deadline, PyThreadState *_save)
198 : {
199 : int res;
200 : unsigned long delay, difference;
201 : struct timeval now, tvdeadline, tvdelay;
202 :
203 : errno = 0;
204 : tvdeadline.tv_sec = deadline->tv_sec;
205 : tvdeadline.tv_usec = deadline->tv_nsec / 1000;
206 :
207 : for (delay = 0 ; ; delay += 1000) {
208 : /* poll */
209 : if (sem_trywait(sem) == 0)
210 : return 0;
211 : else if (errno != EAGAIN)
212 : return MP_STANDARD_ERROR;
213 :
214 : /* get current time */
215 : if (gettimeofday(&now, NULL) < 0)
216 : return MP_STANDARD_ERROR;
217 :
218 : /* check for timeout */
219 : if (tvdeadline.tv_sec < now.tv_sec ||
220 : (tvdeadline.tv_sec == now.tv_sec &&
221 : tvdeadline.tv_usec <= now.tv_usec)) {
222 : errno = ETIMEDOUT;
223 : return MP_STANDARD_ERROR;
224 : }
225 :
226 : /* calculate how much time is left */
227 : difference = (tvdeadline.tv_sec - now.tv_sec) * 1000000 +
228 : (tvdeadline.tv_usec - now.tv_usec);
229 :
230 : /* check delay not too long -- maximum is 20 msecs */
231 : if (delay > 20000)
232 : delay = 20000;
233 : if (delay > difference)
234 : delay = difference;
235 :
236 : /* sleep */
237 : tvdelay.tv_sec = delay / 1000000;
238 : tvdelay.tv_usec = delay % 1000000;
239 : if (select(0, NULL, NULL, NULL, &tvdelay) < 0)
240 : return MP_STANDARD_ERROR;
241 :
242 : /* check for signals */
243 : Py_BLOCK_THREADS
244 : res = PyErr_CheckSignals();
245 : Py_UNBLOCK_THREADS
246 :
247 : if (res) {
248 : errno = EINTR;
249 : return MP_EXCEPTION_HAS_BEEN_SET;
250 : }
251 : }
252 : }
253 :
254 : #endif /* !HAVE_SEM_TIMEDWAIT */
255 :
256 : static PyObject *
257 0 : semlock_acquire(SemLockObject *self, PyObject *args, PyObject *kwds)
258 : {
259 0 : int blocking = 1, res, err = 0;
260 : double timeout;
261 0 : PyObject *timeout_obj = Py_None;
262 0 : struct timespec deadline = {0};
263 : struct timeval now;
264 : long sec, nsec;
265 :
266 : static char *kwlist[] = {"block", "timeout", NULL};
267 :
268 0 : if (!PyArg_ParseTupleAndKeywords(args, kwds, "|iO", kwlist,
269 : &blocking, &timeout_obj))
270 0 : return NULL;
271 :
272 0 : if (self->kind == RECURSIVE_MUTEX && ISMINE(self)) {
273 0 : ++self->count;
274 0 : Py_RETURN_TRUE;
275 : }
276 :
277 0 : if (timeout_obj != Py_None) {
278 0 : timeout = PyFloat_AsDouble(timeout_obj);
279 0 : if (PyErr_Occurred())
280 0 : return NULL;
281 0 : if (timeout < 0.0)
282 0 : timeout = 0.0;
283 :
284 0 : if (gettimeofday(&now, NULL) < 0) {
285 0 : PyErr_SetFromErrno(PyExc_OSError);
286 0 : return NULL;
287 : }
288 0 : sec = (long) timeout;
289 0 : nsec = (long) (1e9 * (timeout - sec) + 0.5);
290 0 : deadline.tv_sec = now.tv_sec + sec;
291 0 : deadline.tv_nsec = now.tv_usec * 1000 + nsec;
292 0 : deadline.tv_sec += (deadline.tv_nsec / 1000000000);
293 0 : deadline.tv_nsec %= 1000000000;
294 : }
295 :
296 : do {
297 0 : Py_BEGIN_ALLOW_THREADS
298 0 : if (blocking && timeout_obj == Py_None)
299 0 : res = sem_wait(self->handle);
300 0 : else if (!blocking)
301 0 : res = sem_trywait(self->handle);
302 : else
303 0 : res = sem_timedwait(self->handle, &deadline);
304 0 : Py_END_ALLOW_THREADS
305 0 : err = errno;
306 0 : if (res == MP_EXCEPTION_HAS_BEEN_SET)
307 0 : break;
308 0 : } while (res < 0 && errno == EINTR && !PyErr_CheckSignals());
309 :
310 0 : if (res < 0) {
311 0 : errno = err;
312 0 : if (errno == EAGAIN || errno == ETIMEDOUT)
313 0 : Py_RETURN_FALSE;
314 0 : else if (errno == EINTR)
315 0 : return NULL;
316 : else
317 0 : return PyErr_SetFromErrno(PyExc_OSError);
318 : }
319 :
320 0 : ++self->count;
321 0 : self->last_tid = PyThread_get_thread_ident();
322 :
323 0 : Py_RETURN_TRUE;
324 : }
325 :
326 : static PyObject *
327 0 : semlock_release(SemLockObject *self, PyObject *args)
328 : {
329 0 : if (self->kind == RECURSIVE_MUTEX) {
330 0 : if (!ISMINE(self)) {
331 0 : PyErr_SetString(PyExc_AssertionError, "attempt to "
332 : "release recursive lock not owned "
333 : "by thread");
334 0 : return NULL;
335 : }
336 0 : if (self->count > 1) {
337 0 : --self->count;
338 0 : Py_RETURN_NONE;
339 : }
340 : assert(self->count == 1);
341 : } else {
342 : #ifdef HAVE_BROKEN_SEM_GETVALUE
343 : /* We will only check properly the maxvalue == 1 case */
344 : if (self->maxvalue == 1) {
345 : /* make sure that already locked */
346 : if (sem_trywait(self->handle) < 0) {
347 : if (errno != EAGAIN) {
348 : PyErr_SetFromErrno(PyExc_OSError);
349 : return NULL;
350 : }
351 : /* it is already locked as expected */
352 : } else {
353 : /* it was not locked so undo wait and raise */
354 : if (sem_post(self->handle) < 0) {
355 : PyErr_SetFromErrno(PyExc_OSError);
356 : return NULL;
357 : }
358 : PyErr_SetString(PyExc_ValueError, "semaphore "
359 : "or lock released too many "
360 : "times");
361 : return NULL;
362 : }
363 : }
364 : #else
365 : int sval;
366 :
367 : /* This check is not an absolute guarantee that the semaphore
368 : does not rise above maxvalue. */
369 0 : if (sem_getvalue(self->handle, &sval) < 0) {
370 0 : return PyErr_SetFromErrno(PyExc_OSError);
371 0 : } else if (sval >= self->maxvalue) {
372 0 : PyErr_SetString(PyExc_ValueError, "semaphore or lock "
373 : "released too many times");
374 0 : return NULL;
375 : }
376 : #endif
377 : }
378 :
379 0 : if (sem_post(self->handle) < 0)
380 0 : return PyErr_SetFromErrno(PyExc_OSError);
381 :
382 0 : --self->count;
383 0 : Py_RETURN_NONE;
384 : }
385 :
386 : #endif /* !MS_WINDOWS */
387 :
388 : /*
389 : * All platforms
390 : */
391 :
392 : static PyObject *
393 0 : newsemlockobject(PyTypeObject *type, SEM_HANDLE handle, int kind, int maxvalue)
394 : {
395 : SemLockObject *self;
396 :
397 0 : self = PyObject_New(SemLockObject, type);
398 0 : if (!self)
399 0 : return NULL;
400 0 : self->handle = handle;
401 0 : self->kind = kind;
402 0 : self->count = 0;
403 0 : self->last_tid = 0;
404 0 : self->maxvalue = maxvalue;
405 0 : return (PyObject*)self;
406 : }
407 :
408 : static PyObject *
409 0 : semlock_new(PyTypeObject *type, PyObject *args, PyObject *kwds)
410 : {
411 : char buffer[256];
412 0 : SEM_HANDLE handle = SEM_FAILED;
413 : int kind, maxvalue, value;
414 : PyObject *result;
415 : static char *kwlist[] = {"kind", "value", "maxvalue", NULL};
416 : static int counter = 0;
417 :
418 0 : if (!PyArg_ParseTupleAndKeywords(args, kwds, "iii", kwlist,
419 : &kind, &value, &maxvalue))
420 0 : return NULL;
421 :
422 0 : if (kind != RECURSIVE_MUTEX && kind != SEMAPHORE) {
423 0 : PyErr_SetString(PyExc_ValueError, "unrecognized kind");
424 0 : return NULL;
425 : }
426 :
427 0 : PyOS_snprintf(buffer, sizeof(buffer), "/mp%ld-%d", (long)getpid(), counter++);
428 :
429 : SEM_CLEAR_ERROR();
430 0 : handle = SEM_CREATE(buffer, value, maxvalue);
431 : /* On Windows we should fail if GetLastError()==ERROR_ALREADY_EXISTS */
432 0 : if (handle == SEM_FAILED || SEM_GET_LAST_ERROR() != 0)
433 0 : goto failure;
434 :
435 0 : if (SEM_UNLINK(buffer) < 0)
436 0 : goto failure;
437 :
438 0 : result = newsemlockobject(type, handle, kind, maxvalue);
439 0 : if (!result)
440 0 : goto failure;
441 :
442 0 : return result;
443 :
444 : failure:
445 0 : if (handle != SEM_FAILED)
446 0 : SEM_CLOSE(handle);
447 0 : mp_SetError(NULL, MP_STANDARD_ERROR);
448 0 : return NULL;
449 : }
450 :
451 : static PyObject *
452 0 : semlock_rebuild(PyTypeObject *type, PyObject *args)
453 : {
454 : SEM_HANDLE handle;
455 : int kind, maxvalue;
456 :
457 0 : if (!PyArg_ParseTuple(args, F_SEM_HANDLE "ii",
458 : &handle, &kind, &maxvalue))
459 0 : return NULL;
460 :
461 0 : return newsemlockobject(type, handle, kind, maxvalue);
462 : }
463 :
464 : static void
465 0 : semlock_dealloc(SemLockObject* self)
466 : {
467 0 : if (self->handle != SEM_FAILED)
468 0 : SEM_CLOSE(self->handle);
469 0 : PyObject_Del(self);
470 0 : }
471 :
472 : static PyObject *
473 0 : semlock_count(SemLockObject *self)
474 : {
475 0 : return PyLong_FromLong((long)self->count);
476 : }
477 :
478 : static PyObject *
479 0 : semlock_ismine(SemLockObject *self)
480 : {
481 : /* only makes sense for a lock */
482 0 : return PyBool_FromLong(ISMINE(self));
483 : }
484 :
485 : static PyObject *
486 0 : semlock_getvalue(SemLockObject *self)
487 : {
488 : #ifdef HAVE_BROKEN_SEM_GETVALUE
489 : PyErr_SetNone(PyExc_NotImplementedError);
490 : return NULL;
491 : #else
492 : int sval;
493 0 : if (SEM_GETVALUE(self->handle, &sval) < 0)
494 0 : return mp_SetError(NULL, MP_STANDARD_ERROR);
495 : /* some posix implementations use negative numbers to indicate
496 : the number of waiting threads */
497 0 : if (sval < 0)
498 0 : sval = 0;
499 0 : return PyLong_FromLong((long)sval);
500 : #endif
501 : }
502 :
503 : static PyObject *
504 0 : semlock_iszero(SemLockObject *self)
505 : {
506 : #ifdef HAVE_BROKEN_SEM_GETVALUE
507 : if (sem_trywait(self->handle) < 0) {
508 : if (errno == EAGAIN)
509 : Py_RETURN_TRUE;
510 : return mp_SetError(NULL, MP_STANDARD_ERROR);
511 : } else {
512 : if (sem_post(self->handle) < 0)
513 : return mp_SetError(NULL, MP_STANDARD_ERROR);
514 : Py_RETURN_FALSE;
515 : }
516 : #else
517 : int sval;
518 0 : if (SEM_GETVALUE(self->handle, &sval) < 0)
519 0 : return mp_SetError(NULL, MP_STANDARD_ERROR);
520 0 : return PyBool_FromLong((long)sval == 0);
521 : #endif
522 : }
523 :
524 : static PyObject *
525 0 : semlock_afterfork(SemLockObject *self)
526 : {
527 0 : self->count = 0;
528 0 : Py_RETURN_NONE;
529 : }
530 :
531 : /*
532 : * Semaphore methods
533 : */
534 :
535 : static PyMethodDef semlock_methods[] = {
536 : {"acquire", (PyCFunction)semlock_acquire, METH_VARARGS | METH_KEYWORDS,
537 : "acquire the semaphore/lock"},
538 : {"release", (PyCFunction)semlock_release, METH_NOARGS,
539 : "release the semaphore/lock"},
540 : {"__enter__", (PyCFunction)semlock_acquire, METH_VARARGS | METH_KEYWORDS,
541 : "enter the semaphore/lock"},
542 : {"__exit__", (PyCFunction)semlock_release, METH_VARARGS,
543 : "exit the semaphore/lock"},
544 : {"_count", (PyCFunction)semlock_count, METH_NOARGS,
545 : "num of `acquire()`s minus num of `release()`s for this process"},
546 : {"_is_mine", (PyCFunction)semlock_ismine, METH_NOARGS,
547 : "whether the lock is owned by this thread"},
548 : {"_get_value", (PyCFunction)semlock_getvalue, METH_NOARGS,
549 : "get the value of the semaphore"},
550 : {"_is_zero", (PyCFunction)semlock_iszero, METH_NOARGS,
551 : "returns whether semaphore has value zero"},
552 : {"_rebuild", (PyCFunction)semlock_rebuild, METH_VARARGS | METH_CLASS,
553 : ""},
554 : {"_after_fork", (PyCFunction)semlock_afterfork, METH_NOARGS,
555 : "rezero the net acquisition count after fork()"},
556 : {NULL}
557 : };
558 :
559 : /*
560 : * Member table
561 : */
562 :
563 : static PyMemberDef semlock_members[] = {
564 : {"handle", T_SEM_HANDLE, offsetof(SemLockObject, handle), READONLY,
565 : ""},
566 : {"kind", T_INT, offsetof(SemLockObject, kind), READONLY,
567 : ""},
568 : {"maxvalue", T_INT, offsetof(SemLockObject, maxvalue), READONLY,
569 : ""},
570 : {NULL}
571 : };
572 :
573 : /*
574 : * Semaphore type
575 : */
576 :
577 : PyTypeObject SemLockType = {
578 : PyVarObject_HEAD_INIT(NULL, 0)
579 : /* tp_name */ "_multiprocessing.SemLock",
580 : /* tp_basicsize */ sizeof(SemLockObject),
581 : /* tp_itemsize */ 0,
582 : /* tp_dealloc */ (destructor)semlock_dealloc,
583 : /* tp_print */ 0,
584 : /* tp_getattr */ 0,
585 : /* tp_setattr */ 0,
586 : /* tp_reserved */ 0,
587 : /* tp_repr */ 0,
588 : /* tp_as_number */ 0,
589 : /* tp_as_sequence */ 0,
590 : /* tp_as_mapping */ 0,
591 : /* tp_hash */ 0,
592 : /* tp_call */ 0,
593 : /* tp_str */ 0,
594 : /* tp_getattro */ 0,
595 : /* tp_setattro */ 0,
596 : /* tp_as_buffer */ 0,
597 : /* tp_flags */ Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE,
598 : /* tp_doc */ "Semaphore/Mutex type",
599 : /* tp_traverse */ 0,
600 : /* tp_clear */ 0,
601 : /* tp_richcompare */ 0,
602 : /* tp_weaklistoffset */ 0,
603 : /* tp_iter */ 0,
604 : /* tp_iternext */ 0,
605 : /* tp_methods */ semlock_methods,
606 : /* tp_members */ semlock_members,
607 : /* tp_getset */ 0,
608 : /* tp_base */ 0,
609 : /* tp_dict */ 0,
610 : /* tp_descr_get */ 0,
611 : /* tp_descr_set */ 0,
612 : /* tp_dictoffset */ 0,
613 : /* tp_init */ 0,
614 : /* tp_alloc */ 0,
615 : /* tp_new */ semlock_new,
616 : };
|