rts-parallel.c
1 //! @file rts-parallel.c
2 //! @author J. Marcel van der Veer
3 //!
4 //! @section Copyright
5 //!
6 //! This file is part of Algol68G - an Algol 68 compiler-interpreter.
7 //! Copyright 2001-2023 J. Marcel van der Veer [algol68g@xs4all.nl].
8 //!
9 //! @section License
10 //!
11 //! This program is free software; you can redistribute it and/or modify it
12 //! under the terms of the GNU General Public License as published by the
13 //! Free Software Foundation; either version 3 of the License, or
14 //! (at your option) any later version.
15 //!
16 //! This program is distributed in the hope that it will be useful, but
17 //! WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
18 //! or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
19 //! more details. You should have received a copy of the GNU General Public
20 //! License along with this program. If not, see [http://www.gnu.org/licenses/].
21
22 //! @section Synopsis
23 //!
24 //! Parallel clause implementation.
25
26 #include "a68g.h"
27 #include "a68g-genie.h"
28 #include "a68g-frames.h"
29 #include "a68g-prelude.h"
30 #include "a68g-mp.h"
31 #include "a68g-double.h"
32 #include "a68g-parser.h"
33 #include "a68g-transput.h"
34
35 // This code implements a parallel clause for Algol68G.
36 // The parallel clause has been included for educational purposes;
37 // this implementation is not the most efficient one.
38 //
// POSIX threads are used to have separate registers and stack for each concurrent
// unit. Algol68G parallel units behave as POSIX threads - they have private
// stacks. Hence an assignation to an object in another thread does not change
// that object in that other thread. Also, jumps between threads are forbidden.
43
44 #if defined (BUILD_PARALLEL_CLAUSE)
45
46 // static pthread_mutex_t unit_sema = PTHREAD_MUTEX_INITIALIZER;
47
// Forward declarations; try_change_thread calls both before they are defined.
void save_stacks (pthread_t t);
void restore_stacks (pthread_t t);
50
// Copy 'si' bytes starting at 'st' into the swap buffer of descriptor 'stk'.
// The current swap buffer is reused when the descriptor records a positive
// size that is at least the requested one; otherwise it is released and a new
// buffer is taken from get_heap_space. A non-positive size only records the
// start address and releases any swap buffer.
#define SAVE_STACK(stk, st, si) do {\
  A68_STACK_DESCRIPTOR *_dsc_ = (stk);\
  BYTE_T *_from_ = (st);\
  int _len_ = (si);\
  if (_len_ <= 0) {\
    START (_dsc_) = _from_;\
    BYTES (_dsc_) = 0;\
    if (SWAP (_dsc_) != NO_BYTE) {\
      a68_free (SWAP (_dsc_));\
    }\
    SWAP (_dsc_) = NO_BYTE;\
  } else {\
    if ((_dsc_ == NULL) || (BYTES (_dsc_) <= 0) || (_len_ > BYTES (_dsc_))) {\
      if (SWAP (_dsc_) != NO_BYTE) {\
        a68_free (SWAP (_dsc_));\
      }\
      SWAP (_dsc_) = (BYTE_T *) get_heap_space ((size_t) _len_);\
      ABEND (SWAP (_dsc_) == NULL, ERROR_OUT_OF_CORE, __func__);\
    }\
    START (_dsc_) = _from_;\
    BYTES (_dsc_) = _len_;\
    COPY (SWAP (_dsc_), _from_, _len_);\
  }\
} while (0)
74
// Copy the swap buffer of descriptor 'stk' back to the address recorded in it.
// Nothing is copied for a NULL descriptor or one that records no bytes.
#define RESTORE_STACK(stk) do {\
  A68_STACK_DESCRIPTOR *_dsc_ = (stk);\
  if (!(_dsc_ == NULL || BYTES (_dsc_) <= 0)) {\
    COPY (START (_dsc_), SWAP (_dsc_), BYTES (_dsc_));\
  }\
} while (0)
80
// Set 'z' to the index of the first thread context whose id equals 'ptid'.
// Abends with ERROR_INTERNAL_CONSISTENCY when no context matches.
#define GET_THREAD_INDEX(z, ptid) do {\
  pthread_t _wanted_ = (ptid);\
  int _slot_ = 0;\
  (z) = -1;\
  while ((z) == -1 && _slot_ < A68_PAR (context_index)) {\
    if (SAME_THREAD (_wanted_, ID (&(A68_PAR (context)[_slot_])))) {\
      (z) = _slot_;\
    }\
    _slot_++;\
  }\
  ABEND ((z) == -1, ERROR_INTERNAL_CONSISTENCY, __func__);\
} while (0)
92
// Message reported when a pthread call fails.
#define ERROR_THREAD_FAULT "thread fault"

// Acquire the unit_sema mutex; abend when pthread_mutex_lock reports failure.
#define LOCK_THREAD do {\
  ABEND (pthread_mutex_lock (&A68_PAR (unit_sema)) != 0, ERROR_THREAD_FAULT, __func__);\
} while (0)

// Release the unit_sema mutex; abend when pthread_mutex_unlock reports failure.
#define UNLOCK_THREAD do {\
  ABEND (pthread_mutex_unlock (&A68_PAR (unit_sema)) != 0, ERROR_THREAD_FAULT, __func__);\
} while (0)
102
103 //! @brief Does system stack grow up or down?.
104
105 static inline int stack_direction (BYTE_T * lwb)
106 {
107 BYTE_T upb;
108 if (&upb > lwb) {
109 return (int) sizeof (BYTE_T);
110 } else if (&upb < lwb) {
111 return - (int) sizeof (BYTE_T);
112 } else {
113 ASSERT (A68_FALSE);
114 return 0; // Pro forma
115 }
116 }
117
118 //! @brief Whether we are in the main thread.
119
120 BOOL_T is_main_thread (void)
121 {
122 return SAME_THREAD (A68_PAR (main_thread_id), pthread_self ());
123 }
124
125 //! @brief End a thread, beit normally or not.
126
127 void genie_abend_thread (void)
128 {
129 int k;
130 GET_THREAD_INDEX (k, pthread_self ());
131 ACTIVE (&(A68_PAR (context)[k])) = A68_FALSE;
132 UNLOCK_THREAD;
133 pthread_exit (NULL);
134 }
135
136 //! @brief When we end execution in a parallel clause we zap all threads.
137
138 void genie_set_exit_from_threads (int ret)
139 {
140 A68_PAR (abend_all_threads) = A68_TRUE;
141 A68_PAR (exit_from_threads) = A68_TRUE;
142 A68_PAR (par_return_code) = ret;
143 genie_abend_thread ();
144 }
145
146 //! @brief When we jump out of a parallel clause we zap all threads.
147
148 void genie_abend_all_threads (NODE_T * p, jmp_buf * jump_stat, NODE_T * label)
149 {
150 (void) p;
151 A68_PAR (abend_all_threads) = A68_TRUE;
152 A68_PAR (exit_from_threads) = A68_FALSE;
153 A68_PAR (jump_buffer) = jump_stat;
154 A68_PAR (jump_label) = label;
155 if (!is_main_thread ()) {
156 genie_abend_thread ();
157 }
158 }
159
160 //! @brief Save this thread and try to start another.
161
162 void try_change_thread (NODE_T * p)
163 {
164 if (is_main_thread ()) {
165 diagnostic (A68_RUNTIME_ERROR, p, ERROR_PARALLEL_OUTSIDE);
166 exit_genie (p, A68_RUNTIME_ERROR);
167 } else {
168 // Release the unit_sema so another thread can take it up ...
169 save_stacks (pthread_self ());
170 UNLOCK_THREAD;
171 // ... and take it up again!.
172 LOCK_THREAD;
173 restore_stacks (pthread_self ());
174 }
175 }
176
177 //! @brief Store the stacks of threads.
178
179 void save_stacks (pthread_t t)
180 {
181 ADDR_T p, q, u, v;
182 int k;
183 GET_THREAD_INDEX (k, t);
184 // Store stack pointers.
185 CUR_PTR (&FRAME (&(A68_PAR (context)[k]))) = A68_FP;
186 CUR_PTR (&STACK (&(A68_PAR (context)[k]))) = A68_SP;
187 // Swap out evaluation stack.
188 p = A68_SP;
189 q = INI_PTR (&STACK (&(A68_PAR (context)[k])));
190 SAVE_STACK (&(STACK (&(A68_PAR (context)[k]))), STACK_ADDRESS (q), p - q);
191 // Swap out frame stack.
192 p = A68_FP;
193 q = INI_PTR (&FRAME (&(A68_PAR (context)[k])));
194 u = p + FRAME_SIZE (p);
195 v = q + FRAME_SIZE (q);
196 // Consider the embedding thread.
197 SAVE_STACK (&(FRAME (&(A68_PAR (context)[k]))), FRAME_ADDRESS (v), u - v);
198 }
199
200 //! @brief Restore stacks of thread.
201
202 void restore_stacks (pthread_t t)
203 {
204 if (ERROR_COUNT (&A68_JOB) > 0 || A68_PAR (abend_all_threads)) {
205 genie_abend_thread ();
206 } else {
207 int k;
208 GET_THREAD_INDEX (k, t);
209 // Restore stack pointers.
210 get_stack_size ();
211 A68 (system_stack_offset) = THREAD_STACK_OFFSET (&(A68_PAR (context)[k]));
212 A68_FP = CUR_PTR (&FRAME (&(A68_PAR (context)[k])));
213 A68_SP = CUR_PTR (&STACK (&(A68_PAR (context)[k])));
214 // Restore stacks.
215 RESTORE_STACK (&(STACK (&(A68_PAR (context)[k]))));
216 RESTORE_STACK (&(FRAME (&(A68_PAR (context)[k]))));
217 }
218 }
219
220 //! @brief Check whether parallel units have terminated.
221
222 void check_parallel_units (BOOL_T * active, pthread_t parent)
223 {
224 int k;
225 for (k = 0; k < A68_PAR (context_index); k++) {
226 if (parent == PARENT (&(A68_PAR (context)[k]))) {
227 (*active) |= ACTIVE (&(A68_PAR (context)[k]));
228 }
229 }
230 }
231
232 //! @brief Execute one unit from a PAR clause.
233
234 void *start_unit (void *arg)
235 {
236 pthread_t t;
237 int k;
238 BYTE_T stack_offset;
239 NODE_T *p;
240 (void) arg;
241 LOCK_THREAD;
242 t = pthread_self ();
243 GET_THREAD_INDEX (k, t);
244 THREAD_STACK_OFFSET (&(A68_PAR (context)[k])) = (BYTE_T *) (&stack_offset - stack_direction (&stack_offset) * STACK_USED (&A68_PAR (context)[k]));
245 restore_stacks (t);
246 p = (NODE_T *) (UNIT (&(A68_PAR (context)[k])));
247 EXECUTE_UNIT_TRACE (p);
248 genie_abend_thread ();
249 return (void *) NULL;
250 }
251
252 //! @brief Execute parallel units.
253
254 void start_parallel_units (NODE_T * p, pthread_t parent)
255 {
256 for (; p != NO_NODE; FORWARD (p)) {
257 if (IS (p, UNIT)) {
258 pthread_t new_id;
259 pthread_attr_t new_at;
260 size_t ss;
261 BYTE_T stack_offset;
262 A68_THREAD_CONTEXT *u;
263 // Set up a thread for this unit.
264 if (A68_PAR (context_index) >= THREAD_MAX) {
265 static BUFFER msg;
266 snprintf (msg, SNPRINTF_SIZE, "platform supports %d parallel units", THREAD_MAX);
267 diagnostic (A68_RUNTIME_ERROR, p, ERROR_PARALLEL_OVERFLOW, msg);
268 exit_genie (p, A68_RUNTIME_ERROR);
269 }
270 // Fill out a context for this thread.
271 u = &((A68_PAR (context)[A68_PAR (context_index)]));
272 UNIT (u) = p;
273 STACK_USED (u) = SYSTEM_STACK_USED;
274 THREAD_STACK_OFFSET (u) = NO_BYTE;
275 CUR_PTR (&STACK (u)) = A68_SP;
276 CUR_PTR (&FRAME (u)) = A68_FP;
277 INI_PTR (&STACK (u)) = A68_PAR (sp0);
278 INI_PTR (&FRAME (u)) = A68_PAR (fp0);
279 SWAP (&STACK (u)) = NO_BYTE;
280 SWAP (&FRAME (u)) = NO_BYTE;
281 START (&STACK (u)) = NO_BYTE;
282 START (&FRAME (u)) = NO_BYTE;
283 BYTES (&STACK (u)) = 0;
284 BYTES (&FRAME (u)) = 0;
285 ACTIVE (u) = A68_TRUE;
286 // Create the thread.
287 errno = 0;
288 if (pthread_attr_init (&new_at) != 0) {
289 diagnostic (A68_RUNTIME_ERROR, p, ERROR_THREAD_FAULT);
290 exit_genie (p, A68_RUNTIME_ERROR);
291 }
292 if (pthread_attr_setstacksize (&new_at, (size_t) A68 (stack_size)) != 0) {
293 diagnostic (A68_RUNTIME_ERROR, p, ERROR_THREAD_FAULT);
294 exit_genie (p, A68_RUNTIME_ERROR);
295 }
296 if (pthread_attr_getstacksize (&new_at, &ss) != 0) {
297 diagnostic (A68_RUNTIME_ERROR, p, ERROR_THREAD_FAULT);
298 exit_genie (p, A68_RUNTIME_ERROR);
299 }
300 ABEND ((size_t) ss != (size_t) A68 (stack_size), ERROR_ACTION, __func__);
301 if (pthread_create (&new_id, &new_at, start_unit, NULL) != 0) {
302 diagnostic (A68_RUNTIME_ERROR, p, ERROR_PARALLEL_CANNOT_CREATE);
303 exit_genie (p, A68_RUNTIME_ERROR);
304 }
305 PARENT (u) = parent;
306 ID (u) = new_id;
307 A68_PAR (context_index)++;
308 save_stacks (new_id);
309 } else {
310 start_parallel_units (SUB (p), parent);
311 }
312 }
313 }
314
315 //! @brief Execute one unit from a PAR clause.
316
317 void *start_genie_parallel (void *arg)
318 {
319 pthread_t t;
320 int k;
321 BYTE_T stack_offset;
322 NODE_T *p;
323 BOOL_T units_active;
324 (void) arg;
325 LOCK_THREAD;
326 t = pthread_self ();
327 GET_THREAD_INDEX (k, t);
328 THREAD_STACK_OFFSET (&(A68_PAR (context)[k])) = (BYTE_T *) (&stack_offset - stack_direction (&stack_offset) * STACK_USED (&(A68_PAR (context)[k])));
329 restore_stacks (t);
330 p = (NODE_T *) (UNIT (&(A68_PAR (context)[k])));
331 // This is the thread spawned by the main thread, we spawn parallel units and await their completion.
332 start_parallel_units (SUB (p), t);
333 do {
334 units_active = A68_FALSE;
335 check_parallel_units (&units_active, pthread_self ());
336 if (units_active) {
337 try_change_thread (p);
338 }
339 } while (units_active);
340 genie_abend_thread ();
341 return (void *) NULL;
342 }
343
344 //! @brief Execute parallel clause.
345
346 PROP_T genie_parallel (NODE_T * p)
347 {
348 int j;
349 ADDR_T stack_s = 0, frame_s = 0;
350 BYTE_T *system_stack_offset_s = NO_BYTE;
351 if (is_main_thread ()) {
352 // Spawn first thread and await its completion.
353 pthread_attr_t new_at;
354 size_t ss;
355 BYTE_T stack_offset;
356 A68_THREAD_CONTEXT *u;
357 LOCK_THREAD;
358 A68_PAR (abend_all_threads) = A68_FALSE;
359 A68_PAR (exit_from_threads) = A68_FALSE;
360 A68_PAR (par_return_code) = 0;
361 A68_PAR (sp0) = stack_s = A68_SP;
362 A68_PAR (fp0) = frame_s = A68_FP;
363 system_stack_offset_s = A68 (system_stack_offset);
364 A68_PAR (context_index) = 0;
365 // Set up a thread for this unit.
366 u = &(A68_PAR (context)[A68_PAR (context_index)]);
367 UNIT (u) = p;
368 STACK_USED (u) = SYSTEM_STACK_USED;
369 THREAD_STACK_OFFSET (u) = NO_BYTE;
370 CUR_PTR (&STACK (u)) = A68_SP;
371 CUR_PTR (&FRAME (u)) = A68_FP;
372 INI_PTR (&STACK (u)) = A68_PAR (sp0);
373 INI_PTR (&FRAME (u)) = A68_PAR (fp0);
374 SWAP (&STACK (u)) = NO_BYTE;
375 SWAP (&FRAME (u)) = NO_BYTE;
376 START (&STACK (u)) = NO_BYTE;
377 START (&FRAME (u)) = NO_BYTE;
378 BYTES (&STACK (u)) = 0;
379 BYTES (&FRAME (u)) = 0;
380 ACTIVE (u) = A68_TRUE;
381 // Spawn the first thread and join it to await its completion.
382 errno = 0;
383 if (pthread_attr_init (&new_at) != 0) {
384 diagnostic (A68_RUNTIME_ERROR, p, ERROR_THREAD_FAULT);
385 exit_genie (p, A68_RUNTIME_ERROR);
386 }
387 if (pthread_attr_setstacksize (&new_at, (size_t) A68 (stack_size)) != 0) {
388 diagnostic (A68_RUNTIME_ERROR, p, ERROR_THREAD_FAULT);
389 exit_genie (p, A68_RUNTIME_ERROR);
390 }
391 if (pthread_attr_getstacksize (&new_at, &ss) != 0) {
392 diagnostic (A68_RUNTIME_ERROR, p, ERROR_THREAD_FAULT);
393 exit_genie (p, A68_RUNTIME_ERROR);
394 }
395 ABEND ((size_t) ss != (size_t) A68 (stack_size), ERROR_ACTION, __func__);
396 if (pthread_create (&A68_PAR (parent_thread_id), &new_at, start_genie_parallel, NULL) != 0) {
397 diagnostic (A68_RUNTIME_ERROR, p, ERROR_PARALLEL_CANNOT_CREATE);
398 exit_genie (p, A68_RUNTIME_ERROR);
399 }
400 if (errno != 0) {
401 diagnostic (A68_RUNTIME_ERROR, p, ERROR_THREAD_FAULT);
402 exit_genie (p, A68_RUNTIME_ERROR);
403 }
404 PARENT (u) = A68_PAR (main_thread_id);
405 ID (u) = A68_PAR (parent_thread_id);
406 A68_PAR (context_index)++;
407 save_stacks (A68_PAR (parent_thread_id));
408 UNLOCK_THREAD;
409 if (pthread_join (A68_PAR (parent_thread_id), NULL) != 0) {
410 diagnostic (A68_RUNTIME_ERROR, p, ERROR_THREAD_FAULT);
411 exit_genie (p, A68_RUNTIME_ERROR);
412 }
413 // The first spawned thread has completed, now clean up.
414 for (j = 0; j < A68_PAR (context_index); j++) {
415 if (ACTIVE (&(A68_PAR (context)[j])) && OTHER_THREAD (ID (&(A68_PAR (context)[j])), A68_PAR (main_thread_id)) && OTHER_THREAD (ID (&(A68_PAR (context)[j])), A68_PAR (parent_thread_id))) {
416 // If threads are zapped it is possible that some are active at this point!.
417 if (pthread_join (ID (&(A68_PAR (context)[j])), NULL) != 0) {
418 diagnostic (A68_RUNTIME_ERROR, p, ERROR_THREAD_FAULT);
419 exit_genie (p, A68_RUNTIME_ERROR);
420 }
421 }
422 if (SWAP (&STACK (&(A68_PAR (context)[j]))) != NO_BYTE) {
423 a68_free (SWAP (&STACK (&(A68_PAR (context)[j]))));
424 SWAP (&STACK (&(A68_PAR (context)[j]))) = NO_BYTE;
425 }
426 if (SWAP (&STACK (&(A68_PAR (context)[j]))) != NO_BYTE) {
427 a68_free (SWAP (&STACK (&(A68_PAR (context)[j]))));
428 SWAP (&STACK (&(A68_PAR (context)[j]))) = NO_BYTE;
429 }
430 }
431 // Now every thread should have ended.
432 A68_PAR (context_index) = 0;
433 A68_SP = stack_s;
434 A68_FP = frame_s;
435 get_stack_size ();
436 A68 (system_stack_offset) = system_stack_offset_s;
437 // See if we ended execution in parallel clause.
438 if (is_main_thread () && A68_PAR (exit_from_threads)) {
439 exit_genie (p, A68_PAR (par_return_code));
440 }
441 if (is_main_thread () && ERROR_COUNT (&A68_JOB) > 0) {
442 exit_genie (p, A68_RUNTIME_ERROR);
443 }
444 // See if we jumped out of the parallel clause(s).
445 if (is_main_thread () && A68_PAR (abend_all_threads)) {
446 JUMP_TO (TABLE (TAX (A68_PAR (jump_label)))) = UNIT (TAX (A68_PAR (jump_label)));
447 longjmp (*(A68_PAR (jump_buffer)), 1);
448 }
449 } else {
450 // Not in the main thread, spawn parallel units and await completion.
451 BOOL_T units_active;
452 pthread_t t = pthread_self ();
453 // Spawn parallel units.
454 start_parallel_units (SUB (p), t);
455 do {
456 units_active = A68_FALSE;
457 check_parallel_units (&units_active, t);
458 if (units_active) {
459 try_change_thread (p);
460 }
461 } while (units_active);
462 }
463 return GPROP (p);
464 }
465
466 //! @brief OP LEVEL = (INT) SEMA
467
468 void genie_level_sema_int (NODE_T * p)
469 {
470 A68_INT k;
471 A68_REF s;
472 POP_OBJECT (p, &k, A68_INT);
473 s = heap_generator (p, M_INT, SIZE (M_INT));
474 *DEREF (A68_INT, &s) = k;
475 PUSH_REF (p, s);
476 }
477
478 //! @brief OP LEVEL = (SEMA) INT
479
480 void genie_level_int_sema (NODE_T * p)
481 {
482 A68_REF s;
483 POP_REF (p, &s);
484 CHECK_INIT (p, INITIALISED (&s), M_SEMA);
485 PUSH_VALUE (p, VALUE (DEREF (A68_INT, &s)), A68_INT);
486 }
487
488 //! @brief OP UP = (SEMA) VOID
489
490 void genie_up_sema (NODE_T * p)
491 {
492 A68_REF s;
493 if (is_main_thread ()) {
494 diagnostic (A68_RUNTIME_ERROR, p, ERROR_PARALLEL_OUTSIDE);
495 exit_genie (p, A68_RUNTIME_ERROR);
496 }
497 POP_REF (p, &s);
498 CHECK_INIT (p, INITIALISED (&s), M_SEMA);
499 VALUE (DEREF (A68_INT, &s))++;
500 }
501
502 //! @brief OP DOWN = (SEMA) VOID
503
504 void genie_down_sema (NODE_T * p)
505 {
506 A68_REF s;
507 A68_INT *k;
508 BOOL_T cont = A68_TRUE;
509 if (is_main_thread ()) {
510 diagnostic (A68_RUNTIME_ERROR, p, ERROR_PARALLEL_OUTSIDE);
511 exit_genie (p, A68_RUNTIME_ERROR);
512 }
513 POP_REF (p, &s);
514 CHECK_INIT (p, INITIALISED (&s), M_SEMA);
515 while (cont) {
516 k = DEREF (A68_INT, &s);
517 if (VALUE (k) <= 0) {
518 save_stacks (pthread_self ());
519 while (VALUE (k) <= 0) {
520 if (ERROR_COUNT (&A68_JOB) > 0 || A68_PAR (abend_all_threads)) {
521 genie_abend_thread ();
522 }
523 UNLOCK_THREAD;
524 // Waiting a bit relaxes overhead.
525 int rc = usleep (10);
526 ASSERT (rc == 0 || errno == EINTR);
527 LOCK_THREAD;
528 // Garbage may be collected, so recalculate 'k'.
529 k = DEREF (A68_INT, &s);
530 }
531 restore_stacks (pthread_self ());
532 cont = A68_TRUE;
533 } else {
534 VALUE (k)--;
535 cont = A68_FALSE;
536 }
537 }
538 }
539
540 #endif