v / vlib / sync
Raw file | 328 loc (292 sloc) | 5.49 KB | Latest commit hash bf0c8a0d9
1// Copyright (c) 2019-2023 Alexander Medvednikov. All rights reserved.
2// Use of this source code is governed by an MIT license
3// that can be found in the LICENSE file.
4module sync
5
6import time
7
8#flag -lpthread
9#include <semaphore.h>
10#include <sys/errno.h>
11
// Declarations of the C pthread functions called from this file.
// Every pointer argument is declared as an untyped `voidptr`; each function
// returns the C `int` status code.
// NOTE(review): the `[trusted]` attribute directly precedes only
// `pthread_mutex_init` - confirm whether it is meant to cover more declarations.
[trusted]
fn C.pthread_mutex_init(voidptr, voidptr) int
// Mutex operations.
fn C.pthread_mutex_lock(voidptr) int
fn C.pthread_mutex_unlock(voidptr) int
fn C.pthread_mutex_destroy(voidptr) int
// Read/write lock attribute setup.
fn C.pthread_rwlockattr_init(voidptr) int
fn C.pthread_rwlockattr_setkind_np(voidptr, int) int
fn C.pthread_rwlockattr_setpshared(voidptr, int) int
// Read/write lock operations.
fn C.pthread_rwlock_init(voidptr, voidptr) int
fn C.pthread_rwlock_rdlock(voidptr) int
fn C.pthread_rwlock_wrlock(voidptr) int
fn C.pthread_rwlock_unlock(voidptr) int
fn C.pthread_rwlock_destroy(voidptr) int
// Condition variable attribute setup and teardown.
fn C.pthread_condattr_init(voidptr) int
fn C.pthread_condattr_setpshared(voidptr, int) int
fn C.pthread_condattr_destroy(voidptr) int
// Condition variable operations.
fn C.pthread_cond_init(voidptr, voidptr) int
fn C.pthread_cond_signal(voidptr) int
fn C.pthread_cond_wait(voidptr, voidptr) int
fn C.pthread_cond_timedwait(voidptr, voidptr, voidptr) int
fn C.pthread_cond_destroy(voidptr) int
33
// Opaque declarations of the C types embedded in the structs below.
// Their bodies are empty on the V side: no C field is accessed from this file.

// C mutex type, used by `Mutex` and `Semaphore`.
[typedef]
struct C.pthread_mutex_t {}

// C condition variable type, used by `Semaphore`.
[typedef]
struct C.pthread_cond_t {}

// C read/write lock type, used by `RwMutex`.
[typedef]
struct C.pthread_rwlock_t {}

// C read/write lock attribute type, used by `RwMutexAttr`.
[typedef]
struct C.pthread_rwlockattr_t {}

// C semaphore type. It is declared here but not referenced by any other
// definition visible in this file.
[typedef]
struct C.sem_t {}
48
// Mutex wraps a `C.pthread_mutex_t`. The wrapped C mutex is set up by `init()`,
// which `new_mutex()` calls on a freshly allocated instance.
// [init_with=new_mutex] // TODO: implement support for this struct attribute, and disallow Mutex{} from outside the sync.new_mutex() function.
[heap]
pub struct Mutex {
	mutex C.pthread_mutex_t
}
54
// RwMutex wraps a `C.pthread_rwlock_t`. The wrapped C lock is set up by `init()`,
// which `new_rwmutex()` calls on a freshly allocated instance.
[heap]
pub struct RwMutex {
	mutex C.pthread_rwlock_t
}
59
// RwMutexAttr wraps the C attribute object that `RwMutex.init()` configures
// and passes to `pthread_rwlock_init`.
struct RwMutexAttr {
	attr C.pthread_rwlockattr_t
}
63
// CondAttr wraps the C attribute object that `Semaphore.init()` configures
// and passes to `pthread_cond_init`.
struct CondAttr {
	attr C.pthread_condattr_t
}
67
/*
MacOSX has no unnamed semaphores and no `timed_wait()` at all,
so the behaviour is emulated here with other primitives:
a mutex, a condition variable and a counter.
*/
[heap]
pub struct Semaphore {
	// mtx is locked around the blocking paths of post()/wait()/timed_wait().
	mtx C.pthread_mutex_t
	// cond is the condition variable that waiters sleep on while `count` is 0.
	cond C.pthread_cond_t
mut:
	// count is the semaphore counter; the methods in this file read and
	// modify it through the C atomic_* helpers.
	count u32
}
79
// new_mutex allocates a Mutex, initialises it with `init()`,
// and returns a reference to it, ready to be locked.
pub fn new_mutex() &Mutex {
	mut mtx := &Mutex{}
	mtx.init()
	return mtx
}
86
// init sets up the underlying pthread mutex with the default attributes
// (a NULL attribute pointer is passed). Call it once, before the first
// @lock()/unlock(). The status code of `pthread_mutex_init` is not checked.
[inline]
pub fn (mut m Mutex) init() {
	C.pthread_mutex_init(&m.mutex, C.NULL)
}
93
// new_rwmutex allocates a RwMutex, initialises it with `init()`,
// and returns a reference to it.
pub fn new_rwmutex() &RwMutex {
	mut rw := &RwMutex{}
	rw.init()
	return rw
}
100
// Binding used by RwMutex.init() to release the attribute object once the
// lock has been created from it.
fn C.pthread_rwlockattr_destroy(voidptr) int

// init initialises the RwMutex instance. It should be called once before the rw mutex is used,
// since it creates the associated resources needed for the mutex to work properly.
// The lock is configured to prefer writers and to be private to the current process.
// The status codes of the pthread calls are not checked.
pub fn (mut m RwMutex) init() {
	a := RwMutexAttr{}
	C.pthread_rwlockattr_init(&a.attr)
	// Give writer priority over readers
	C.pthread_rwlockattr_setkind_np(&a.attr, C.PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP)
	C.pthread_rwlockattr_setpshared(&a.attr, C.PTHREAD_PROCESS_PRIVATE)
	C.pthread_rwlock_init(&m.mutex, &a.attr)
	// The attribute object is only needed while creating the lock; destroy it
	// afterwards, the same way Semaphore.init() destroys its condattr.
	C.pthread_rwlockattr_destroy(&a.attr)
}
111
// @lock acquires the mutex (`lock` is a V keyword, hence the `@` prefix).
// When another thread already holds the mutex, the call blocks until
// that thread releases it with unlock().
[inline]
pub fn (mut m Mutex) @lock() {
	C.pthread_mutex_lock(&m.mutex)
}
118
// unlock releases the mutex, so that one of the threads
// blocked in @lock() can acquire it and continue.
[inline]
pub fn (mut m Mutex) unlock() {
	C.pthread_mutex_unlock(&m.mutex)
}
125
// destroy releases the resources of the underlying pthread mutex.
// It calls cpanic with the returned code when `pthread_mutex_destroy` fails.
// Note: the memory of the Mutex instance itself is not freed.
[inline]
pub fn (mut m Mutex) destroy() {
	rc := C.pthread_mutex_destroy(&m.mutex)
	if rc != 0 {
		cpanic(rc)
	}
}
135
// @rlock acquires the RwMutex for reading.
// If the lock cannot be granted right away, the call blocks and
// returns once the read lock has been obtained.
// Note: several threads may be waiting for the same lock at once.
// Note: RwMutex has separate read and write locks; release a read lock with runlock().
[inline]
pub fn (mut m RwMutex) @rlock() {
	C.pthread_rwlock_rdlock(&m.mutex)
}
146
// @lock acquires the RwMutex for writing.
// If the lock is currently held, the call blocks and returns
// only once the write lock has been obtained.
// Note: several threads may be waiting for the same lock at once.
// Note: RwMutex has separate read and write locks; release a write lock with unlock().
[inline]
pub fn (mut m RwMutex) @lock() {
	C.pthread_rwlock_wrlock(&m.mutex)
}
157
// destroy releases the resources of the underlying pthread read/write lock.
// It calls cpanic with the returned code when `pthread_rwlock_destroy` fails.
// Note: the memory of the RwMutex instance itself is not freed.
[inline]
pub fn (mut m RwMutex) destroy() {
	rc := C.pthread_rwlock_destroy(&m.mutex)
	if rc != 0 {
		cpanic(rc)
	}
}
167
// runlock releases a RwMutex that was locked for reading with @rlock().
// Note: Windows SRWLocks use different functions for releasing read and
// write locks. To keep the API portable, this file also offers two methods,
// runlock() and unlock(), even though both make the same
// `pthread_rwlock_unlock` call here.
[inline]
pub fn (mut m RwMutex) runlock() {
	C.pthread_rwlock_unlock(&m.mutex)
}
177
// unlock releases a RwMutex that was locked for writing with @lock().
// Note: Windows SRWLocks use different functions for releasing read and
// write locks. To keep the API portable, this file also offers two methods,
// runlock() and unlock(), even though both make the same
// `pthread_rwlock_unlock` call here.
[inline]
pub fn (mut m RwMutex) unlock() {
	C.pthread_rwlock_unlock(&m.mutex)
}
187
// new_semaphore allocates an initialised Semaphore whose counter starts at 0,
// and returns a reference to it. It is shorthand for `new_semaphore_init(0)`.
[inline]
pub fn new_semaphore() &Semaphore {
	return new_semaphore_init(0)
}
194
// new_semaphore_init allocates a Semaphore, initialises it with `init(n)`,
// and returns a reference to it.
// `n` is the initial value of the semaphore counter.
pub fn new_semaphore_init(n u32) &Semaphore {
	mut s := &Semaphore{}
	s.init(n)
	return s
}
202
// init prepares the Semaphore for use, with `n` as the initial counter value.
// Call it once, before any post()/wait(): it stores the counter and sets up
// the mutex and the condition variable that the blocking paths rely on.
// The status codes of the pthread calls are not checked.
pub fn (mut sem Semaphore) init(n u32) {
	C.atomic_store_u32(&sem.count, n)
	C.pthread_mutex_init(&sem.mtx, C.NULL)
	// The condition variable is created as process-private; the temporary
	// attribute object is destroyed as soon as the condvar exists.
	cond_attr := CondAttr{}
	C.pthread_condattr_init(&cond_attr.attr)
	C.pthread_condattr_setpshared(&cond_attr.attr, C.PTHREAD_PROCESS_PRIVATE)
	C.pthread_cond_init(&sem.cond, &cond_attr.attr)
	C.pthread_condattr_destroy(&cond_attr.attr)
}
215
// post increments the semaphore counter by 1.
// If a thread is blocked in wait()/timed_wait(), it can then decrement
// the counter again and continue running. See also .wait() .
pub fn (mut sem Semaphore) post() {
	// Fast path: while the observed counter is above 1, try to bump it with a
	// lock-free compare-and-swap. A failed CAS refreshes `seen` with the
	// current value, so the loop condition is re-evaluated on fresh data.
	// NOTE(review): the threshold is `> 1`, not `> 0` - presumably so that
	// low counter values always take the locked path below; confirm.
	mut seen := C.atomic_load_u32(&sem.count)
	for seen > 1 {
		if C.atomic_compare_exchange_weak_u32(&sem.count, &seen, seen + 1) {
			return
		}
	}

	// Slow path: increment under the mutex, and signal the condition
	// variable when the counter was 0 before the increment.
	C.pthread_mutex_lock(&sem.mtx)
	seen = C.atomic_fetch_add_u32(&sem.count, 1)
	if seen == 0 {
		C.pthread_cond_signal(&sem.cond)
	}
	C.pthread_mutex_unlock(&sem.mtx)
}
235
// wait decrements the semaphore counter, if it is positive, and returns.
// If it is not positive, wait blocks until the counter becomes positive,
// then decrements it and returns.
// In effect, it lets you block a thread until the semaphore is posted by
// another thread. See also .post() .
pub fn (mut sem Semaphore) wait() {
	// Fast path: lock-free decrement while the counter is positive.
	// A failed CAS refreshes `seen` with the current counter value.
	mut seen := C.atomic_load_u32(&sem.count)
	for seen > 0 {
		if C.atomic_compare_exchange_weak_u32(&sem.count, &seen, seen - 1) {
			return
		}
	}

	// Slow path: sleep on the condition variable until a decrement succeeds.
	C.pthread_mutex_lock(&sem.mtx)
	seen = C.atomic_load_u32(&sem.count)
	acquire: for {
		if seen == 0 {
			C.pthread_cond_wait(&sem.cond, &sem.mtx)
			seen = C.atomic_load_u32(&sem.count)
		}
		for seen > 0 {
			if C.atomic_compare_exchange_weak_u32(&sem.count, &seen, seen - 1) {
				// `seen` still holds the value from before the decrement:
				// if more than one unit was available, pass the signal on.
				if seen > 1 {
					C.pthread_cond_signal(&sem.cond)
				}
				break acquire
			}
		}
	}
	C.pthread_mutex_unlock(&sem.mtx)
}
267
// try_wait attempts to decrement the semaphore counter by 1 without blocking.
// It returns true when the decrement succeeded, and false when the counter
// was observed to be 0.
// Note: it uses only atomic operations on the counter; the mutex and the
// condition variable are not touched, and no error handling is performed.
pub fn (mut sem Semaphore) try_wait() bool {
	mut seen := C.atomic_load_u32(&sem.count)
	for seen > 0 {
		// A failed CAS refreshes `seen`, so the loop ends once the counter hits 0.
		if C.atomic_compare_exchange_weak_u32(&sem.count, &seen, seen - 1) {
			return true
		}
	}
	return false
}
281
// timed_wait is similar to .wait(), but it also accepts a timeout duration.
// It returns true when the semaphore counter was decremented, and false
// when the wait ended without a decrement: either the timeout passed before
// the semaphore was posted, or `pthread_cond_timedwait` reported an error.
pub fn (mut sem Semaphore) timed_wait(timeout time.Duration) bool {
	// Fast path: lock-free decrement while the counter is positive.
	// A failed CAS refreshes `c` with the current counter value.
	mut c := C.atomic_load_u32(&sem.count)
	for c > 0 {
		if C.atomic_compare_exchange_weak_u32(&sem.count, &c, c - 1) {
			return true
		}
	}
	C.pthread_mutex_lock(&sem.mtx)
	// NOTE(review): presumably `timespec()` yields the absolute deadline that
	// pthread_cond_timedwait expects - confirm against the `time` module.
	t_spec := timeout.timespec()
	mut acquired := false
	c = C.atomic_load_u32(&sem.count)

	outer: for {
		if c == 0 {
			res := C.pthread_cond_timedwait(&sem.cond, &sem.mtx, &t_spec)
			if res != 0 {
				// ETIMEDOUT or any other failure: give up instead of retrying.
				// Retrying on a non-timeout error would spin while the
				// counter is 0, and a later successful decrement would
				// still have been reported as a failure.
				break outer
			}
			c = C.atomic_load_u32(&sem.count)
		}
		for c > 0 {
			if C.atomic_compare_exchange_weak_u32(&sem.count, &c, c - 1) {
				// `c` still holds the value from before the decrement:
				// if more than one unit was available, pass the signal on.
				if c > 1 {
					C.pthread_cond_signal(&sem.cond)
				}
				acquired = true
				break outer
			}
		}
	}
	C.pthread_mutex_unlock(&sem.mtx)
	return acquired
}
316
// destroy releases the condition variable and then the mutex of the Semaphore.
// It calls cpanic with the returned code as soon as either pthread destroy call fails.
// Note: the memory of the Semaphore instance itself is not freed.
pub fn (mut sem Semaphore) destroy() {
	cond_rc := C.pthread_cond_destroy(&sem.cond)
	if cond_rc != 0 {
		cpanic(cond_rc)
	}
	mtx_rc := C.pthread_mutex_destroy(&sem.mtx)
	if mtx_rc != 0 {
		cpanic(mtx_rc)
	}
}