rts-parallel.c
1 //! @file rts-parallel.c
2 //! @author J. Marcel van der Veer
3
4 //! @section Copyright
5 //!
6 //! This file is part of Algol68G - an Algol 68 compiler-interpreter.
7 //! Copyright 2001-2024 J. Marcel van der Veer [algol68g@xs4all.nl].
8
9 //! @section License
10 //!
11 //! This program is free software; you can redistribute it and/or modify it
12 //! under the terms of the GNU General Public License as published by the
13 //! Free Software Foundation; either version 3 of the License, or
14 //! (at your option) any later version.
15 //!
16 //! This program is distributed in the hope that it will be useful, but
17 //! WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
18 //! or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
19 //! more details. You should have received a copy of the GNU General Public
20 //! License along with this program. If not, see [http://www.gnu.org/licenses/].
21
22 //! @section Synopsis
23 //!
24 //! Parallel clause implementation.
25
26 #include "a68g.h"
27 #include "a68g-genie.h"
28 #include "a68g-frames.h"
29 #include "a68g-prelude.h"
30
31 // This code implements a parallel clause for Algol68G.
32 // The parallel clause has been included for educational purposes;
33 // this implementation is not the most efficient one.
34 //
35 // POSIX threads are used to have separate registers and stack for each concurrent
36 // unit. Algol68G parallel units behave as POSIX threads - they have private
37 // stacks. Hence an assignation to an object in another thread, does not change
38 // that object in that other thread. Also jumps between threads are forbidden.
39
40 #if defined (BUILD_PARALLEL_CLAUSE)
41
42 // static pthread_mutex_t unit_sema = PTHREAD_MUTEX_INITIALIZER;
43
// Forward declarations: both routines are called before their definitions
// further down in this file. Each takes the id of the thread whose context
// is to be saved or restored.
void save_stacks (pthread_t t);
void restore_stacks (pthread_t t);
46
// SAVE_STACK (stk, st, si): copy 'si' bytes starting at address 'st' into the
// swap buffer of stack descriptor 'stk', and record start and size in it.
// The existing swap buffer is reused when its recorded size BYTES is positive
// and at least 'si'; otherwise it is freed and a new one is obtained from
// get_heap_space, aborting through ABEND when that yields NULL.
// When 'si' is not positive the swap buffer is released and BYTES is set to 0.
// Each argument is evaluated exactly once. Locals carry underscores so they
// cannot capture identifiers that appear in the argument expressions, and the
// body is wrapped in do { } while (0) so the macro behaves as one statement.
#define SAVE_STACK(stk, st, si) do {\
  A68_STACK_DESCRIPTOR *_stk_ = (stk);\
  BYTE_T *_start_ = (st);\
  int _size_ = (si);\
  if (_size_ > 0) {\
    if (!((_stk_ != NULL) && (BYTES (_stk_) > 0) && (_size_ <= BYTES (_stk_)))) {\
      a68_free (SWAP (_stk_));\
      SWAP (_stk_) = (BYTE_T *) get_heap_space ((size_t) _size_);\
      ABEND (SWAP (_stk_) == NULL, ERROR_OUT_OF_CORE, __func__);\
    }\
    START (_stk_) = _start_;\
    BYTES (_stk_) = _size_;\
    COPY (SWAP (_stk_), _start_, _size_);\
  } else {\
    START (_stk_) = _start_;\
    BYTES (_stk_) = 0;\
    a68_free (SWAP (_stk_));\
    SWAP (_stk_) = NO_BYTE;\
  }\
} while (0)
66
// RESTORE_STACK (stk): copy the BYTES saved in the swap buffer of stack
// descriptor 'stk' back to the recorded START address. Nothing is copied when
// the descriptor is NULL or holds no saved bytes.
// The local carries underscores so it cannot capture an identifier used in the
// argument, and the body is wrapped in do { } while (0) so the macro behaves
// as one statement.
#define RESTORE_STACK(stk) do {\
  A68_STACK_DESCRIPTOR *_stk_ = (stk);\
  if (_stk_ != NULL && BYTES (_stk_) > 0) {\
    COPY (START (_stk_), SWAP (_stk_), BYTES (_stk_));\
  }\
} while (0)
72
// GET_THREAD_INDEX (z, ptid): store in lvalue 'z' the index of the first
// context whose thread id matches 'ptid'. Only the first context_index
// entries are searched. Finding no match is treated as an internal
// consistency error (ABEND).
// 'ptid' is evaluated once; 'z' is evaluated several times and must be a plain
// lvalue. The body is wrapped in do { } while (0) so the macro behaves as one
// statement.
#define GET_THREAD_INDEX(z, ptid) do {\
  pthread_t _tid_ = (ptid);\
  (z) = -1;\
  for (int _k_ = 0; _k_ < A68_PAR (context_index) && (z) == -1; _k_++) {\
    if (SAME_THREAD (_tid_, ID (&(A68_PAR (context)[_k_])))) {\
      (z) = _k_;\
    }\
  }\
  ABEND ((z) == -1, ERROR_INTERNAL_CONSISTENCY, __func__);\
} while (0)
84
// Message used when a pthread primitive reports failure.
#define ERROR_THREAD_FAULT "thread fault"

// LOCK_THREAD / UNLOCK_THREAD: acquire and release the unit_sema mutex.
// A non-zero result from the pthread call aborts through ABEND.
// Both are wrapped in do { } while (0) so they behave as single statements.
#define LOCK_THREAD do {\
  ABEND (pthread_mutex_lock (&A68_PAR (unit_sema)) != 0, ERROR_THREAD_FAULT, __func__);\
} while (0)

#define UNLOCK_THREAD do {\
  ABEND (pthread_mutex_unlock (&A68_PAR (unit_sema)) != 0, ERROR_THREAD_FAULT, __func__);\
} while (0)
94
//! @brief Does the system stack grow up or down?
96
97 static inline int stack_direction (BYTE_T * lwb)
98 {
99 BYTE_T upb;
100 if (&upb > lwb) {
101 return (int) sizeof (BYTE_T);
102 } else if (&upb < lwb) {
103 return - (int) sizeof (BYTE_T);
104 } else {
105 ASSERT (A68_FALSE);
106 return 0; // Pro forma
107 }
108 }
109
110 //! @brief Whether we are in the main thread.
111
112 BOOL_T is_main_thread (void)
113 {
114 return SAME_THREAD (A68_PAR (main_thread_id), pthread_self ());
115 }
116
//! @brief End a thread, be it normally or not.
118
119 void genie_abend_thread (void)
120 {
121 int k;
122 GET_THREAD_INDEX (k, pthread_self ());
123 ACTIVE (&(A68_PAR (context)[k])) = A68_FALSE;
124 UNLOCK_THREAD;
125 pthread_exit (NULL);
126 }
127
128 //! @brief When we end execution in a parallel clause we zap all threads.
129
130 void genie_set_exit_from_threads (int ret)
131 {
132 A68_PAR (abend_all_threads) = A68_TRUE;
133 A68_PAR (exit_from_threads) = A68_TRUE;
134 A68_PAR (par_return_code) = ret;
135 genie_abend_thread ();
136 }
137
138 //! @brief When we jump out of a parallel clause we zap all threads.
139
140 void genie_abend_all_threads (NODE_T * p, jmp_buf * jump_stat, NODE_T * label)
141 {
142 (void) p;
143 A68_PAR (abend_all_threads) = A68_TRUE;
144 A68_PAR (exit_from_threads) = A68_FALSE;
145 A68_PAR (jump_buffer) = jump_stat;
146 A68_PAR (jump_label) = label;
147 if (!is_main_thread ()) {
148 genie_abend_thread ();
149 }
150 }
151
152 //! @brief Save this thread and try to start another.
153
154 void try_change_thread (NODE_T * p)
155 {
156 if (is_main_thread ()) {
157 diagnostic (A68_RUNTIME_ERROR, p, ERROR_PARALLEL_OUTSIDE);
158 exit_genie (p, A68_RUNTIME_ERROR);
159 } else {
160 // Release the unit_sema so another thread can take it up ...
161 save_stacks (pthread_self ());
162 UNLOCK_THREAD;
163 // ... and take it up again!.
164 LOCK_THREAD;
165 restore_stacks (pthread_self ());
166 }
167 }
168
169 //! @brief Store the stacks of threads.
170
171 void save_stacks (pthread_t t)
172 {
173 int k;
174 GET_THREAD_INDEX (k, t);
175 // Store stack pointers.
176 CUR_PTR (&FRAME (&(A68_PAR (context)[k]))) = A68_FP;
177 CUR_PTR (&STACK (&(A68_PAR (context)[k]))) = A68_SP;
178 // Swap out evaluation stack.
179 ADDR_T p = A68_SP;
180 ADDR_T q = INI_PTR (&STACK (&(A68_PAR (context)[k])));
181 SAVE_STACK (&(STACK (&(A68_PAR (context)[k]))), STACK_ADDRESS (q), p - q);
182 // Swap out frame stack.
183 p = A68_FP;
184 q = INI_PTR (&FRAME (&(A68_PAR (context)[k])));
185 ADDR_T u = p + FRAME_SIZE (p);
186 ADDR_T v = q + FRAME_SIZE (q);
187 // Consider the embedding thread.
188 SAVE_STACK (&(FRAME (&(A68_PAR (context)[k]))), FRAME_ADDRESS (v), u - v);
189 }
190
191 //! @brief Restore stacks of thread.
192
193 void restore_stacks (pthread_t t)
194 {
195 if (ERROR_COUNT (&A68_JOB) > 0 || A68_PAR (abend_all_threads)) {
196 genie_abend_thread ();
197 } else {
198 int k;
199 GET_THREAD_INDEX (k, t);
200 // Restore stack pointers.
201 get_stack_size ();
202 A68 (system_stack_offset) = THREAD_STACK_OFFSET (&(A68_PAR (context)[k]));
203 A68_FP = CUR_PTR (&FRAME (&(A68_PAR (context)[k])));
204 A68_SP = CUR_PTR (&STACK (&(A68_PAR (context)[k])));
205 // Restore stacks.
206 RESTORE_STACK (&(STACK (&(A68_PAR (context)[k]))));
207 RESTORE_STACK (&(FRAME (&(A68_PAR (context)[k]))));
208 }
209 }
210
211 //! @brief Check whether parallel units have terminated.
212
213 void check_parallel_units (BOOL_T * active, pthread_t parent)
214 {
215 for (int k = 0; k < A68_PAR (context_index); k++) {
216 if (parent == PARENT (&(A68_PAR (context)[k]))) {
217 (*active) |= ACTIVE (&(A68_PAR (context)[k]));
218 }
219 }
220 }
221
222 //! @brief Execute one unit from a PAR clause.
223
224 void *start_unit (void *arg)
225 {
226 BYTE_T stack_offset;
227 (void) arg;
228 LOCK_THREAD;
229 pthread_t t = pthread_self ();
230 int k;
231 GET_THREAD_INDEX (k, t);
232 THREAD_STACK_OFFSET (&(A68_PAR (context)[k])) = (BYTE_T *) (&stack_offset - stack_direction (&stack_offset) * STACK_USED (&A68_PAR (context)[k]));
233 restore_stacks (t);
234 NODE_T *p = (NODE_T *) (UNIT (&(A68_PAR (context)[k])));
235 GENIE_UNIT_TRACE (p);
236 genie_abend_thread ();
237 return (void *) NULL;
238 }
239
240 //! @brief Execute parallel units.
241
242 void start_parallel_units (NODE_T * p, pthread_t parent)
243 {
244 for (; p != NO_NODE; FORWARD (p)) {
245 if (IS (p, UNIT)) {
246 pthread_t new_id;
247 pthread_attr_t new_at;
248 size_t ss;
249 BYTE_T stack_offset;
250 A68_THREAD_CONTEXT *u;
251 // Set up a thread for this unit.
252 if (A68_PAR (context_index) >= THREAD_MAX) {
253 static BUFFER msg;
254 a68_bufprt (msg, SNPRINTF_SIZE, "platform supports %d parallel units", THREAD_MAX);
255 diagnostic (A68_RUNTIME_ERROR, p, ERROR_PARALLEL_OVERFLOW, msg);
256 exit_genie (p, A68_RUNTIME_ERROR);
257 }
258 // Fill out a context for this thread.
259 u = &((A68_PAR (context)[A68_PAR (context_index)]));
260 UNIT (u) = p;
261 STACK_USED (u) = SYSTEM_STACK_USED;
262 THREAD_STACK_OFFSET (u) = NO_BYTE;
263 CUR_PTR (&STACK (u)) = A68_SP;
264 CUR_PTR (&FRAME (u)) = A68_FP;
265 INI_PTR (&STACK (u)) = A68_PAR (sp0);
266 INI_PTR (&FRAME (u)) = A68_PAR (fp0);
267 SWAP (&STACK (u)) = NO_BYTE;
268 SWAP (&FRAME (u)) = NO_BYTE;
269 START (&STACK (u)) = NO_BYTE;
270 START (&FRAME (u)) = NO_BYTE;
271 BYTES (&STACK (u)) = 0;
272 BYTES (&FRAME (u)) = 0;
273 ACTIVE (u) = A68_TRUE;
274 // Create the thread.
275 errno = 0;
276 if (pthread_attr_init (&new_at) != 0) {
277 diagnostic (A68_RUNTIME_ERROR, p, ERROR_THREAD_FAULT);
278 exit_genie (p, A68_RUNTIME_ERROR);
279 }
280 if (pthread_attr_setstacksize (&new_at, (size_t) A68 (stack_size)) != 0) {
281 diagnostic (A68_RUNTIME_ERROR, p, ERROR_THREAD_FAULT);
282 exit_genie (p, A68_RUNTIME_ERROR);
283 }
284 if (pthread_attr_getstacksize (&new_at, &ss) != 0) {
285 diagnostic (A68_RUNTIME_ERROR, p, ERROR_THREAD_FAULT);
286 exit_genie (p, A68_RUNTIME_ERROR);
287 }
288 ABEND ((size_t) ss != (size_t) A68 (stack_size), ERROR_ACTION, __func__);
289 if (pthread_create (&new_id, &new_at, start_unit, NULL) != 0) {
290 diagnostic (A68_RUNTIME_ERROR, p, ERROR_PARALLEL_CANNOT_CREATE);
291 exit_genie (p, A68_RUNTIME_ERROR);
292 }
293 PARENT (u) = parent;
294 ID (u) = new_id;
295 A68_PAR (context_index)++;
296 save_stacks (new_id);
297 } else {
298 start_parallel_units (SUB (p), parent);
299 }
300 }
301 }
302
//! @brief Body of the first spawned thread: start the parallel units and await their completion.
304
305 void *start_genie_parallel (void *arg)
306 {
307 BYTE_T stack_offset;
308 (void) arg;
309 LOCK_THREAD;
310 pthread_t t = pthread_self ();
311 int k;
312 GET_THREAD_INDEX (k, t);
313 THREAD_STACK_OFFSET (&(A68_PAR (context)[k])) = (BYTE_T *) (&stack_offset - stack_direction (&stack_offset) * STACK_USED (&(A68_PAR (context)[k])));
314 restore_stacks (t);
315 NODE_T *p = (NODE_T *) (UNIT (&(A68_PAR (context)[k])));
316 // This is the thread spawned by the main thread, we spawn parallel units and await their completion.
317 start_parallel_units (SUB (p), t);
318 BOOL_T units_active;
319 do {
320 units_active = A68_FALSE;
321 check_parallel_units (&units_active, pthread_self ());
322 if (units_active) {
323 try_change_thread (p);
324 }
325 } while (units_active);
326 genie_abend_thread ();
327 return (void *) NULL;
328 }
329
330 //! @brief Execute parallel clause.
331
332 PROP_T genie_parallel (NODE_T * p)
333 {
334 ADDR_T stack_s = 0, frame_s = 0;
335 BYTE_T *system_stack_offset_s = NO_BYTE;
336 if (is_main_thread ()) {
337 // Spawn first thread and await its completion.
338 pthread_attr_t new_at;
339 size_t ss;
340 BYTE_T stack_offset;
341 A68_THREAD_CONTEXT *u;
342 LOCK_THREAD;
343 A68_PAR (abend_all_threads) = A68_FALSE;
344 A68_PAR (exit_from_threads) = A68_FALSE;
345 A68_PAR (par_return_code) = 0;
346 A68_PAR (sp0) = stack_s = A68_SP;
347 A68_PAR (fp0) = frame_s = A68_FP;
348 system_stack_offset_s = A68 (system_stack_offset);
349 A68_PAR (context_index) = 0;
350 // Set up a thread for this unit.
351 u = &(A68_PAR (context)[A68_PAR (context_index)]);
352 UNIT (u) = p;
353 STACK_USED (u) = SYSTEM_STACK_USED;
354 THREAD_STACK_OFFSET (u) = NO_BYTE;
355 CUR_PTR (&STACK (u)) = A68_SP;
356 CUR_PTR (&FRAME (u)) = A68_FP;
357 INI_PTR (&STACK (u)) = A68_PAR (sp0);
358 INI_PTR (&FRAME (u)) = A68_PAR (fp0);
359 SWAP (&STACK (u)) = NO_BYTE;
360 SWAP (&FRAME (u)) = NO_BYTE;
361 START (&STACK (u)) = NO_BYTE;
362 START (&FRAME (u)) = NO_BYTE;
363 BYTES (&STACK (u)) = 0;
364 BYTES (&FRAME (u)) = 0;
365 ACTIVE (u) = A68_TRUE;
366 // Spawn the first thread and join it to await its completion.
367 errno = 0;
368 if (pthread_attr_init (&new_at) != 0) {
369 diagnostic (A68_RUNTIME_ERROR, p, ERROR_THREAD_FAULT);
370 exit_genie (p, A68_RUNTIME_ERROR);
371 }
372 if (pthread_attr_setstacksize (&new_at, (size_t) A68 (stack_size)) != 0) {
373 diagnostic (A68_RUNTIME_ERROR, p, ERROR_THREAD_FAULT);
374 exit_genie (p, A68_RUNTIME_ERROR);
375 }
376 if (pthread_attr_getstacksize (&new_at, &ss) != 0) {
377 diagnostic (A68_RUNTIME_ERROR, p, ERROR_THREAD_FAULT);
378 exit_genie (p, A68_RUNTIME_ERROR);
379 }
380 ABEND ((size_t) ss != (size_t) A68 (stack_size), ERROR_ACTION, __func__);
381 if (pthread_create (&A68_PAR (parent_thread_id), &new_at, start_genie_parallel, NULL) != 0) {
382 diagnostic (A68_RUNTIME_ERROR, p, ERROR_PARALLEL_CANNOT_CREATE);
383 exit_genie (p, A68_RUNTIME_ERROR);
384 }
385 // Do not check errno here as a successful operation does not clear errno.
386 PARENT (u) = A68_PAR (main_thread_id);
387 ID (u) = A68_PAR (parent_thread_id);
388 A68_PAR (context_index)++;
389 save_stacks (A68_PAR (parent_thread_id));
390 UNLOCK_THREAD;
391 if (pthread_join (A68_PAR (parent_thread_id), NULL) != 0) {
392 diagnostic (A68_RUNTIME_ERROR, p, ERROR_THREAD_FAULT);
393 exit_genie (p, A68_RUNTIME_ERROR);
394 }
395 // The first spawned thread has completed, now clean up.
396 for (int j = 0; j < A68_PAR (context_index); j++) {
397 if (ACTIVE (&(A68_PAR (context)[j])) && OTHER_THREAD (ID (&(A68_PAR (context)[j])), A68_PAR (main_thread_id)) && OTHER_THREAD (ID (&(A68_PAR (context)[j])), A68_PAR (parent_thread_id))) {
398 // If threads are zapped it is possible that some are active at this point!.
399 if (pthread_join (ID (&(A68_PAR (context)[j])), NULL) != 0) {
400 diagnostic (A68_RUNTIME_ERROR, p, ERROR_THREAD_FAULT);
401 exit_genie (p, A68_RUNTIME_ERROR);
402 }
403 }
404 a68_free (SWAP (&STACK (&(A68_PAR (context)[j]))));
405 SWAP (&STACK (&(A68_PAR (context)[j]))) = NO_BYTE;
406 }
407 // Now every thread should have ended.
408 A68_PAR (context_index) = 0;
409 A68_SP = stack_s;
410 A68_FP = frame_s;
411 get_stack_size ();
412 A68 (system_stack_offset) = system_stack_offset_s;
413 // See if we ended execution in parallel clause.
414 if (is_main_thread () && A68_PAR (exit_from_threads)) {
415 exit_genie (p, A68_PAR (par_return_code));
416 }
417 if (is_main_thread () && ERROR_COUNT (&A68_JOB) > 0) {
418 exit_genie (p, A68_RUNTIME_ERROR);
419 }
420 // See if we jumped out of the parallel clause(s).
421 if (is_main_thread () && A68_PAR (abend_all_threads)) {
422 JUMP_TO (TABLE (TAX (A68_PAR (jump_label)))) = UNIT (TAX (A68_PAR (jump_label)));
423 longjmp (*(A68_PAR (jump_buffer)), 1);
424 }
425 } else {
426 // Not in the main thread, spawn parallel units and await completion.
427 BOOL_T units_active;
428 pthread_t t = pthread_self ();
429 // Spawn parallel units.
430 start_parallel_units (SUB (p), t);
431 do {
432 units_active = A68_FALSE;
433 check_parallel_units (&units_active, t);
434 if (units_active) {
435 try_change_thread (p);
436 }
437 } while (units_active);
438 }
439 return GPROP (p);
440 }
441
442 //! @brief OP LEVEL = (INT) SEMA
443
444 void genie_level_sema_int (NODE_T * p)
445 {
446 A68_INT k;
447 POP_OBJECT (p, &k, A68_INT);
448 A68_REF s = heap_generator (p, M_INT, SIZE (M_INT));
449 *DEREF (A68_INT, &s) = k;
450 PUSH_REF (p, s);
451 }
452
453 //! @brief OP LEVEL = (SEMA) INT
454
455 void genie_level_int_sema (NODE_T * p)
456 {
457 A68_REF s;
458 POP_REF (p, &s);
459 CHECK_INIT (p, INITIALISED (&s), M_SEMA);
460 PUSH_VALUE (p, VALUE (DEREF (A68_INT, &s)), A68_INT);
461 }
462
463 //! @brief OP UP = (SEMA) VOID
464
465 void genie_up_sema (NODE_T * p)
466 {
467 if (is_main_thread ()) {
468 diagnostic (A68_RUNTIME_ERROR, p, ERROR_PARALLEL_OUTSIDE);
469 exit_genie (p, A68_RUNTIME_ERROR);
470 }
471 A68_REF s;
472 POP_REF (p, &s);
473 CHECK_INIT (p, INITIALISED (&s), M_SEMA);
474 VALUE (DEREF (A68_INT, &s))++;
475 }
476
477 //! @brief OP DOWN = (SEMA) VOID
478
479 void genie_down_sema (NODE_T * p)
480 {
481 if (is_main_thread ()) {
482 diagnostic (A68_RUNTIME_ERROR, p, ERROR_PARALLEL_OUTSIDE);
483 exit_genie (p, A68_RUNTIME_ERROR);
484 }
485 A68_REF s;
486 POP_REF (p, &s);
487 CHECK_INIT (p, INITIALISED (&s), M_SEMA);
488 BOOL_T cont = A68_TRUE;
489 while (cont) {
490 A68_INT *k = DEREF (A68_INT, &s);
491 if (VALUE (k) <= 0) {
492 save_stacks (pthread_self ());
493 while (VALUE (k) <= 0) {
494 if (ERROR_COUNT (&A68_JOB) > 0 || A68_PAR (abend_all_threads)) {
495 genie_abend_thread ();
496 }
497 UNLOCK_THREAD;
498 // Waiting a bit relaxes overhead.
499 int ret = a68_usleep (10);
500 ASSERT (ret == 0 || errno == EINTR);
501 LOCK_THREAD;
502 // Garbage may be collected, so recalculate 'k'.
503 k = DEREF (A68_INT, &s);
504 }
505 restore_stacks (pthread_self ());
506 cont = A68_TRUE;
507 } else {
508 VALUE (k)--;
509 cont = A68_FALSE;
510 }
511 }
512 }
513
514 #endif
© 2002-2024 J.M. van der Veer (jmvdveer@xs4all.nl)
|