1 | // Copyright (c) 2019-2023 Alexander Medvednikov. All rights reserved. |
2 | // Use of this source code is governed by an MIT license |
3 | // that can be found in the LICENSE file. |
4 | module sync |
5 | |
6 | import time |
7 | |
8 | #flag -lpthread |
9 | #include <semaphore.h> |
10 | #include <sys/errno.h> |
11 | |
// Thin declarations of the POSIX pthread primitives used below.
// The implementations come from libpthread (see `#flag -lpthread` above).
// Each function returns 0 on success, or an errno-style error code.
[trusted]
fn C.pthread_mutex_init(voidptr, voidptr) int
fn C.pthread_mutex_lock(voidptr) int
fn C.pthread_mutex_unlock(voidptr) int
fn C.pthread_mutex_destroy(voidptr) int
// read/write lock attributes and operations (used by RwMutex)
fn C.pthread_rwlockattr_init(voidptr) int
fn C.pthread_rwlockattr_setkind_np(voidptr, int) int
fn C.pthread_rwlockattr_setpshared(voidptr, int) int
fn C.pthread_rwlock_init(voidptr, voidptr) int
fn C.pthread_rwlock_rdlock(voidptr) int
fn C.pthread_rwlock_wrlock(voidptr) int
fn C.pthread_rwlock_unlock(voidptr) int
fn C.pthread_rwlock_destroy(voidptr) int
// condition variable attributes and operations (used by Semaphore)
fn C.pthread_condattr_init(voidptr) int
fn C.pthread_condattr_setpshared(voidptr, int) int
fn C.pthread_condattr_destroy(voidptr) int
fn C.pthread_cond_init(voidptr, voidptr) int
fn C.pthread_cond_signal(voidptr) int
fn C.pthread_cond_wait(voidptr, voidptr) int
fn C.pthread_cond_timedwait(voidptr, voidptr, voidptr) int
fn C.pthread_cond_destroy(voidptr) int
33 | |
// Opaque wrappers for the C handle types, so V structs can embed them
// by value. `[typedef]` tells the C backend that these are existing C
// typedefs, not new struct definitions to be emitted.
[typedef]
struct C.pthread_mutex_t {}

[typedef]
struct C.pthread_cond_t {}

// Declared for consistency with the other pthread handle types:
// `CondAttr` below embeds a C.pthread_condattr_t field.
[typedef]
struct C.pthread_condattr_t {}

[typedef]
struct C.pthread_rwlock_t {}

[typedef]
struct C.pthread_rwlockattr_t {}

[typedef]
struct C.sem_t {}
48 | |
// [init_with=new_mutex] // TODO: implement support for this struct attribute, and disallow Mutex{} from outside the sync.new_mutex() function.
// Mutex is a mutual exclusion lock, wrapping a POSIX pthread mutex.
// It is `[heap]` allocated so pointers to it stay valid when shared between threads.
[heap]
pub struct Mutex {
	mutex C.pthread_mutex_t // the underlying POSIX mutex handle
}
54 | |
// RwMutex is a read/write lock, wrapping a POSIX pthread rwlock:
// it allows many concurrent readers, or a single writer.
[heap]
pub struct RwMutex {
	mutex C.pthread_rwlock_t // the underlying POSIX rwlock handle
}
59 | |
// RwMutexAttr wraps the rwlock attribute object, used only while
// initialising an RwMutex (see RwMutex.init).
struct RwMutexAttr {
	attr C.pthread_rwlockattr_t
}
63 | |
// CondAttr wraps the condition variable attribute object, used only while
// initialising a Semaphore (see Semaphore.init).
struct CondAttr {
	attr C.pthread_condattr_t
}
67 | |
68 | /* |
69 | MacOSX has no unnamed semaphores and no `timed_wait()` at all |
70 | so we emulate the behaviour with other devices |
71 | */ |
72 | [heap] |
73 | pub struct Semaphore { |
74 | mtx C.pthread_mutex_t |
75 | cond C.pthread_cond_t |
76 | mut: |
77 | count u32 |
78 | } |
79 | |
// new_mutex creates and initialises a new mutex instance on the heap, then returns a pointer to it.
pub fn new_mutex() &Mutex {
	mut mtx := &Mutex{}
	mtx.init()
	return mtx
}
86 | |
// init initialises the mutex. It should be called once before the mutex is used,
// since it creates the associated resources needed for the mutex to work properly.
[inline]
pub fn (mut m Mutex) init() {
	// NULL attributes -> default mutex (the C return code is ignored here)
	C.pthread_mutex_init(&m.mutex, C.NULL)
}
93 | |
// new_rwmutex creates a new read/write mutex instance on the heap, and returns a pointer to it.
pub fn new_rwmutex() &RwMutex {
	mut rwm := &RwMutex{}
	rwm.init()
	return rwm
}
100 | |
// init initialises the RwMutex instance. It should be called once before the rw mutex is used,
// since it creates the associated resources needed for the mutex to work properly.
pub fn (mut m RwMutex) init() {
	attr := RwMutexAttr{}
	C.pthread_rwlockattr_init(&attr.attr)
	// Give writer priority over readers
	C.pthread_rwlockattr_setkind_np(&attr.attr, C.PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP)
	// the lock is used only within this process
	C.pthread_rwlockattr_setpshared(&attr.attr, C.PTHREAD_PROCESS_PRIVATE)
	C.pthread_rwlock_init(&m.mutex, &attr.attr)
	// NOTE(review): the attribute object is not destroyed afterwards;
	// pthread_rwlockattr_destroy is not declared in this file
}
111 | |
// @lock locks the mutex instance (`lock` is a keyword).
// If the mutex was already locked, it will block, till it is unlocked.
[inline]
pub fn (mut m Mutex) @lock() {
	// the C return code is ignored; lock failures are not reported
	C.pthread_mutex_lock(&m.mutex)
}
118 | |
// unlock unlocks the mutex instance. The mutex is released, and one of
// the other threads, that were blocked, because they called @lock can continue.
[inline]
pub fn (mut m Mutex) unlock() {
	// the C return code is ignored; unlocking a mutex you do not hold is undefined
	C.pthread_mutex_unlock(&m.mutex)
}
125 | |
// destroy frees the resources associated with the mutex instance.
// Note: the mutex itself is not freed.
// Panics (via cpanic) with the errno code if the C call fails,
// e.g. when the mutex is still locked.
[inline]
pub fn (mut m Mutex) destroy() {
	status := C.pthread_mutex_destroy(&m.mutex)
	if status != 0 {
		cpanic(status)
	}
}
135 | |
// @rlock locks the given RwMutex instance for reading.
// If the mutex was already locked, it will block, and will try to get the lock,
// once the lock is released by another thread calling unlock.
// Once it succeeds, it returns.
// Note: there may be several threads that are waiting for the same lock.
// Note: RwMutex has separate read and write locks.
[inline]
pub fn (mut m RwMutex) @rlock() {
	C.pthread_rwlock_rdlock(&m.mutex)
}
146 | |
// @lock locks the given RwMutex instance for writing.
// If the mutex was already locked, it will block, till it is unlocked,
// then it will try to get the lock, and if it can, it will return, otherwise
// it will continue waiting for the mutex to become unlocked.
// Note: there may be several threads that are waiting for the same lock.
// Note: RwMutex has separate read and write locks.
[inline]
pub fn (mut m RwMutex) @lock() {
	C.pthread_rwlock_wrlock(&m.mutex)
}
157 | |
// destroy frees the resources associated with the rwmutex instance.
// Note: the mutex itself is not freed.
// Panics (via cpanic) with the errno code if the C call fails,
// e.g. when the rwlock is still held.
[inline]
pub fn (mut m RwMutex) destroy() {
	status := C.pthread_rwlock_destroy(&m.mutex)
	if status != 0 {
		cpanic(status)
	}
}
167 | |
// runlock unlocks the RwMutex instance, locked for reading.
// Note: Windows SRWLocks have different function to unlocking.
// To have a common portable API, there are two methods for
// unlocking here as well, even though that they do the same
// on !windows platforms.
[inline]
pub fn (mut m RwMutex) runlock() {
	// pthread rwlocks use a single unlock call for both read and write locks
	C.pthread_rwlock_unlock(&m.mutex)
}
177 | |
// unlock unlocks the RwMutex instance, locked for writing.
// Note: Windows SRWLocks have different function to unlocking.
// To have a common portable API, there are two methods for
// unlocking here as well, even though that they do the same
// on !windows platforms.
[inline]
pub fn (mut m RwMutex) unlock() {
	// pthread rwlocks use a single unlock call for both read and write locks
	C.pthread_rwlock_unlock(&m.mutex)
}
187 | |
// new_semaphore creates a new initialised Semaphore instance on the heap, and returns a pointer to it.
// The initial counter value of the semaphore is 0.
[inline]
pub fn new_semaphore() &Semaphore {
	// a 0 count means the first .wait() will block until another thread calls .post()
	return new_semaphore_init(0)
}
194 | |
// new_semaphore_init creates a new initialised Semaphore instance on the heap, and returns a pointer to it.
// The `n` parameter can be used to set the initial counter value of the semaphore.
pub fn new_semaphore_init(n u32) &Semaphore {
	mut s := &Semaphore{}
	s.init(n)
	return s
}
202 | |
// init initialises the Semaphore instance with `n` as its initial counter value.
// It should be called once before the semaphore is used, since it creates the associated
// resources needed for the semaphore to work properly.
pub fn (mut sem Semaphore) init(n u32) {
	C.atomic_store_u32(&sem.count, n)
	C.pthread_mutex_init(&sem.mtx, C.NULL)
	// the condition variable is process-private; the attribute object
	// can be destroyed again as soon as the condvar is initialised
	cattr := CondAttr{}
	C.pthread_condattr_init(&cattr.attr)
	C.pthread_condattr_setpshared(&cattr.attr, C.PTHREAD_PROCESS_PRIVATE)
	C.pthread_cond_init(&sem.cond, &cattr.attr)
	C.pthread_condattr_destroy(&cattr.attr)
}
215 | |
// post increases the counter of the semaphore by 1.
// If the resulting counter value is > 0, and if there is a thread waiting
// on the semaphore, the waiting thread will decrement the counter by 1, and
// then will continue running. See also .wait() .
pub fn (mut sem Semaphore) post() {
	// fast path: while the count is already > 1, no thread can be sleeping
	// in .wait() (waiters only sleep when the count is 0), so the counter
	// can be incremented lock-free. The `c > 1` bound is intentional:
	// at count 0 or 1 a waiter may exist, so the signalling slow path is taken.
	mut c := C.atomic_load_u32(&sem.count)
	for c > 1 {
		if C.atomic_compare_exchange_weak_u32(&sem.count, &c, c + 1) {
			return
		}
		// on CAS failure `c` was reloaded with the current count; retry
	}

	// slow path: increment under the mutex, and if the count went 0 -> 1,
	// wake one thread that may be blocked on the condition variable
	C.pthread_mutex_lock(&sem.mtx)
	c = C.atomic_fetch_add_u32(&sem.count, 1)
	if c == 0 {
		C.pthread_cond_signal(&sem.cond)
	}
	C.pthread_mutex_unlock(&sem.mtx)
}
235 | |
// wait will just decrement the semaphore count, if it was positive.
// If it was not positive, it will wait for the semaphore count to reach a positive number.
// When that happens, it will decrease the semaphore count, and will return.
// In effect, it allows you to block threads, until the semaphore, is posted by another thread.
// See also .post() .
pub fn (mut sem Semaphore) wait() {
	// fast path: try to decrement a positive count lock-free
	mut c := C.atomic_load_u32(&sem.count)
	for c > 0 {
		if C.atomic_compare_exchange_weak_u32(&sem.count, &c, c - 1) {
			return
		}
		// on CAS failure `c` was reloaded with the current count; retry
	}

	// slow path: sleep on the condition variable until the count becomes positive
	C.pthread_mutex_lock(&sem.mtx)
	c = C.atomic_load_u32(&sem.count)
	outer: for {
		if c == 0 {
			// releases the mutex while sleeping; re-acquires it before returning
			C.pthread_cond_wait(&sem.cond, &sem.mtx)
			c = C.atomic_load_u32(&sem.count)
		}
		for c > 0 {
			if C.atomic_compare_exchange_weak_u32(&sem.count, &c, c - 1) {
				if c > 1 {
					// count is still positive after our decrement:
					// wake another waiter so posts are not lost
					C.pthread_cond_signal(&sem.cond)
				}
				break outer
			}
		}
	}
	C.pthread_mutex_unlock(&sem.mtx)
}
267 | |
// try_wait tries to decrease the semaphore count by 1, if it was positive.
// If it succeeds in that, it returns true, otherwise it returns false.
// Note: try_wait should return as fast as possible so error handling is only
// done when debugging
pub fn (mut sem Semaphore) try_wait() bool {
	mut c := C.atomic_load_u32(&sem.count)
	for c > 0 {
		if C.atomic_compare_exchange_weak_u32(&sem.count, &c, c - 1) {
			return true
		}
		// on CAS failure `c` was reloaded with the current count;
		// the loop exits (returning false) once the observed count is 0
	}
	return false
}
281 | |
// timed_wait is similar to .wait(), but it also accepts a timeout duration,
// thus it can return false early, if the timeout passed before the semaphore was posted.
pub fn (mut sem Semaphore) timed_wait(timeout time.Duration) bool {
	// fast path: try to decrement a positive count lock-free
	mut c := C.atomic_load_u32(&sem.count)
	for c > 0 {
		if C.atomic_compare_exchange_weak_u32(&sem.count, &c, c - 1) {
			return true
		}
	}
	// slow path: sleep on the condition variable with a deadline
	C.pthread_mutex_lock(&sem.mtx)
	// NOTE(review): pthread_cond_timedwait expects an *absolute* time;
	// this assumes time.Duration.timespec() converts the relative timeout
	// into an absolute timespec — confirm against the time module
	t_spec := timeout.timespec()
	mut res := 0
	c = C.atomic_load_u32(&sem.count)

	outer: for {
		if c == 0 {
			res = C.pthread_cond_timedwait(&sem.cond, &sem.mtx, &t_spec)
			if res == C.ETIMEDOUT {
				// deadline passed without a post; give up (returns false below)
				break outer
			}
			c = C.atomic_load_u32(&sem.count)
		}
		for c > 0 {
			if C.atomic_compare_exchange_weak_u32(&sem.count, &c, c - 1) {
				if c > 1 {
					// count is still positive: wake another waiter
					C.pthread_cond_signal(&sem.cond)
				}
				break outer
			}
		}
	}
	C.pthread_mutex_unlock(&sem.mtx)
	// res is still 0 when the count was successfully decremented
	return res == 0
}
316 | |
// destroy frees the resources associated with the Semaphore instance.
// Note: the semaphore instance itself is not freed.
// Panics (via cpanic) with the errno code if either C call fails.
pub fn (mut sem Semaphore) destroy() {
	cond_status := C.pthread_cond_destroy(&sem.cond)
	if cond_status != 0 {
		cpanic(cond_status)
	}
	mtx_status := C.pthread_mutex_destroy(&sem.mtx)
	if mtx_status != 0 {
		cpanic(mtx_status)
	}
}