rts-parallel.c

     
   1  //! @file rts-parallel.c
   2  //! @author J. Marcel van der Veer
   3  //!
   4  //! @section Copyright
   5  //!
   6  //! This file is part of Algol68G - an Algol 68 compiler-interpreter.
   7  //! Copyright 2001-2023 J. Marcel van der Veer [algol68g@xs4all.nl].
   8  //!
   9  //! @section License
  10  //!
  11  //! This program is free software; you can redistribute it and/or modify it 
  12  //! under the terms of the GNU General Public License as published by the 
  13  //! Free Software Foundation; either version 3 of the License, or 
  14  //! (at your option) any later version.
  15  //!
  16  //! This program is distributed in the hope that it will be useful, but 
  17  //! WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY 
  18  //! or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for 
  19  //! more details. You should have received a copy of the GNU General Public 
  20  //! License along with this program. If not, see [http://www.gnu.org/licenses/].
  21  
  22  //! @section Synopsis 
  23  //!
  24  //! Parallel clause implementation.
  25  
  26  #include "a68g.h"
  27  #include "a68g-genie.h"
  28  #include "a68g-frames.h"
  29  #include "a68g-prelude.h"
  30  #include "a68g-mp.h"
  31  #include "a68g-double.h"
  32  #include "a68g-parser.h"
  33  #include "a68g-transput.h"
  34  
  35  // This code implements a parallel clause for Algol68G. 
  36  // The parallel clause has been included for educational purposes; 
  37  // this implementation is not the most efficient one.
  38  // 
  39  // POSIX threads are used to have separate registers and stack for each concurrent
  40  // unit. Algol68G parallel units behave as POSIX threads - they have private 
// stacks. Hence an assignation to an object in another thread does not change
// that object in that other thread. Also, jumps between threads are forbidden.
  43  
  44  #if defined (BUILD_PARALLEL_CLAUSE)
  45  
  46  // static pthread_mutex_t unit_sema = PTHREAD_MUTEX_INITIALIZER;
  47  
  48  void save_stacks (pthread_t);
  49  void restore_stacks (pthread_t);
  50  
  51  #define SAVE_STACK(stk, st, si) {\
  52    A68_STACK_DESCRIPTOR *s = (stk);\
  53    BYTE_T *start = (st);\
  54    int size = (si);\
  55    if (size > 0) {\
  56      if (!((s != NULL) && (BYTES (s) > 0) && (size <= BYTES (s)))) {\
  57        if (SWAP (s) != NO_BYTE) {\
  58          a68_free (SWAP (s));\
  59        }\
  60        SWAP (s) = (BYTE_T *) get_heap_space ((size_t) size);\
  61        ABEND (SWAP (s) == NULL, ERROR_OUT_OF_CORE, __func__);\
  62      }\
  63      START (s) = start;\
  64      BYTES (s) = size;\
  65      COPY (SWAP (s), start, size);\
  66    } else {\
  67      START (s) = start;\
  68      BYTES (s) = 0;\
  69      if (SWAP (s) != NO_BYTE) {\
  70        a68_free (SWAP (s));\
  71      }\
  72      SWAP (s) = NO_BYTE;\
  73    }}
  74  
  75  #define RESTORE_STACK(stk) {\
  76    A68_STACK_DESCRIPTOR *s = (stk);\
  77    if (s != NULL && BYTES (s) > 0) {\
  78      COPY (START (s), SWAP (s), BYTES (s));\
  79    }}
  80  
  81  #define GET_THREAD_INDEX(z, ptid) {\
  82    int _k_;\
  83    pthread_t _tid_ = (ptid);\
  84    (z) = -1;\
  85    for (_k_ = 0; _k_ < A68_PAR (context_index) && (z) == -1; _k_++) {\
  86      if (SAME_THREAD (_tid_, ID (&(A68_PAR (context)[_k_])))) {\
  87        (z) = _k_;\
  88      }\
  89    }\
  90    ABEND ((z) == -1, ERROR_INTERNAL_CONSISTENCY, __func__);\
  91    }
  92  
  93  #define ERROR_THREAD_FAULT "thread fault"
  94  
  95  #define LOCK_THREAD {\
  96    ABEND (pthread_mutex_lock (&A68_PAR (unit_sema)) != 0, ERROR_THREAD_FAULT, __func__);\
  97    }
  98  
  99  #define UNLOCK_THREAD {\
 100    ABEND (pthread_mutex_unlock (&A68_PAR (unit_sema)) != 0, ERROR_THREAD_FAULT, __func__);\
 101    }
 102  
 103  //! @brief Does system stack grow up or down?.
 104  
 105  static inline int stack_direction (BYTE_T * lwb)
 106  {
 107    BYTE_T upb;
 108    if (&upb > lwb) {
 109      return (int) sizeof (BYTE_T);
 110    } else if (&upb < lwb) {
 111      return - (int) sizeof (BYTE_T);
 112    } else {
 113      ASSERT (A68_FALSE);
 114      return 0; // Pro forma
 115    }
 116  }
 117  
 118  //! @brief Whether we are in the main thread.
 119  
 120  BOOL_T is_main_thread (void)
 121  {
 122    return SAME_THREAD (A68_PAR (main_thread_id), pthread_self ());
 123  }
 124  
 125  //! @brief End a thread, beit normally or not.
 126  
 127  void genie_abend_thread (void)
 128  {
 129    int k;
 130    GET_THREAD_INDEX (k, pthread_self ());
 131    ACTIVE (&(A68_PAR (context)[k])) = A68_FALSE;
 132    UNLOCK_THREAD;
 133    pthread_exit (NULL);
 134  }
 135  
 136  //! @brief When we end execution in a parallel clause we zap all threads.
 137  
 138  void genie_set_exit_from_threads (int ret)
 139  {
 140    A68_PAR (abend_all_threads) = A68_TRUE;
 141    A68_PAR (exit_from_threads) = A68_TRUE;
 142    A68_PAR (par_return_code) = ret;
 143    genie_abend_thread ();
 144  }
 145  
 146  //! @brief When we jump out of a parallel clause we zap all threads.
 147  
 148  void genie_abend_all_threads (NODE_T * p, jmp_buf * jump_stat, NODE_T * label)
 149  {
 150    (void) p;
 151    A68_PAR (abend_all_threads) = A68_TRUE;
 152    A68_PAR (exit_from_threads) = A68_FALSE;
 153    A68_PAR (jump_buffer) = jump_stat;
 154    A68_PAR (jump_label) = label;
 155    if (!is_main_thread ()) {
 156      genie_abend_thread ();
 157    }
 158  }
 159  
 160  //! @brief Save this thread and try to start another.
 161  
 162  void try_change_thread (NODE_T * p)
 163  {
 164    if (is_main_thread ()) {
 165      diagnostic (A68_RUNTIME_ERROR, p, ERROR_PARALLEL_OUTSIDE);
 166      exit_genie (p, A68_RUNTIME_ERROR);
 167    } else {
 168  // Release the unit_sema so another thread can take it up ...
 169      save_stacks (pthread_self ());
 170      UNLOCK_THREAD;
 171  // ... and take it up again!.
 172      LOCK_THREAD;
 173      restore_stacks (pthread_self ());
 174    }
 175  }
 176  
 177  //! @brief Store the stacks of threads.
 178  
 179  void save_stacks (pthread_t t)
 180  {
 181    ADDR_T p, q, u, v;
 182    int k;
 183    GET_THREAD_INDEX (k, t);
 184  // Store stack pointers.
 185    CUR_PTR (&FRAME (&(A68_PAR (context)[k]))) = A68_FP;
 186    CUR_PTR (&STACK (&(A68_PAR (context)[k]))) = A68_SP;
 187  // Swap out evaluation stack.
 188    p = A68_SP;
 189    q = INI_PTR (&STACK (&(A68_PAR (context)[k])));
 190    SAVE_STACK (&(STACK (&(A68_PAR (context)[k]))), STACK_ADDRESS (q), p - q);
 191  // Swap out frame stack.
 192    p = A68_FP;
 193    q = INI_PTR (&FRAME (&(A68_PAR (context)[k])));
 194    u = p + FRAME_SIZE (p);
 195    v = q + FRAME_SIZE (q);
 196  // Consider the embedding thread.
 197    SAVE_STACK (&(FRAME (&(A68_PAR (context)[k]))), FRAME_ADDRESS (v), u - v);
 198  }
 199  
 200  //! @brief Restore stacks of thread.
 201  
 202  void restore_stacks (pthread_t t)
 203  {
 204    if (ERROR_COUNT (&A68_JOB) > 0 || A68_PAR (abend_all_threads)) {
 205      genie_abend_thread ();
 206    } else {
 207      int k;
 208      GET_THREAD_INDEX (k, t);
 209  // Restore stack pointers.
 210      get_stack_size ();
 211      A68 (system_stack_offset) = THREAD_STACK_OFFSET (&(A68_PAR (context)[k]));
 212      A68_FP = CUR_PTR (&FRAME (&(A68_PAR (context)[k])));
 213      A68_SP = CUR_PTR (&STACK (&(A68_PAR (context)[k])));
 214  // Restore stacks.
 215      RESTORE_STACK (&(STACK (&(A68_PAR (context)[k]))));
 216      RESTORE_STACK (&(FRAME (&(A68_PAR (context)[k]))));
 217    }
 218  }
 219  
 220  //! @brief Check whether parallel units have terminated.
 221  
 222  void check_parallel_units (BOOL_T * active, pthread_t parent)
 223  {
 224    int k;
 225    for (k = 0; k < A68_PAR (context_index); k++) {
 226      if (parent == PARENT (&(A68_PAR (context)[k]))) {
 227        (*active) |= ACTIVE (&(A68_PAR (context)[k]));
 228      }
 229    }
 230  }
 231  
 232  //! @brief Execute one unit from a PAR clause.
 233  
 234  void *start_unit (void *arg)
 235  {
 236    pthread_t t;
 237    int k;
 238    BYTE_T stack_offset;
 239    NODE_T *p;
 240    (void) arg;
 241    LOCK_THREAD;
 242    t = pthread_self ();
 243    GET_THREAD_INDEX (k, t);
 244    THREAD_STACK_OFFSET (&(A68_PAR (context)[k])) = (BYTE_T *) (&stack_offset - stack_direction (&stack_offset) * STACK_USED (&A68_PAR (context)[k]));
 245    restore_stacks (t);
 246    p = (NODE_T *) (UNIT (&(A68_PAR (context)[k])));
 247    EXECUTE_UNIT_TRACE (p);
 248    genie_abend_thread ();
 249    return (void *) NULL;
 250  }
 251  
 252  //! @brief Execute parallel units.
 253  
 254  void start_parallel_units (NODE_T * p, pthread_t parent)
 255  {
 256    for (; p != NO_NODE; FORWARD (p)) {
 257      if (IS (p, UNIT)) {
 258        pthread_t new_id;
 259        pthread_attr_t new_at;
 260        size_t ss;
 261        BYTE_T stack_offset;
 262        A68_THREAD_CONTEXT *u;
 263  // Set up a thread for this unit.
 264        if (A68_PAR (context_index) >= THREAD_MAX) {
 265          static BUFFER msg;
 266          snprintf (msg, SNPRINTF_SIZE, "platform supports %d parallel units", THREAD_MAX);
 267          diagnostic (A68_RUNTIME_ERROR, p, ERROR_PARALLEL_OVERFLOW, msg);
 268          exit_genie (p, A68_RUNTIME_ERROR);
 269        }
 270  // Fill out a context for this thread.
 271        u = &((A68_PAR (context)[A68_PAR (context_index)]));
 272        UNIT (u) = p;
 273        STACK_USED (u) = SYSTEM_STACK_USED;
 274        THREAD_STACK_OFFSET (u) = NO_BYTE;
 275        CUR_PTR (&STACK (u)) = A68_SP;
 276        CUR_PTR (&FRAME (u)) = A68_FP;
 277        INI_PTR (&STACK (u)) = A68_PAR (sp0);
 278        INI_PTR (&FRAME (u)) = A68_PAR (fp0);
 279        SWAP (&STACK (u)) = NO_BYTE;
 280        SWAP (&FRAME (u)) = NO_BYTE;
 281        START (&STACK (u)) = NO_BYTE;
 282        START (&FRAME (u)) = NO_BYTE;
 283        BYTES (&STACK (u)) = 0;
 284        BYTES (&FRAME (u)) = 0;
 285        ACTIVE (u) = A68_TRUE;
 286  // Create the thread.
 287        errno = 0;
 288        if (pthread_attr_init (&new_at) != 0) {
 289          diagnostic (A68_RUNTIME_ERROR, p, ERROR_THREAD_FAULT);
 290          exit_genie (p, A68_RUNTIME_ERROR);
 291        }
 292        if (pthread_attr_setstacksize (&new_at, (size_t) A68 (stack_size)) != 0) {
 293          diagnostic (A68_RUNTIME_ERROR, p, ERROR_THREAD_FAULT);
 294          exit_genie (p, A68_RUNTIME_ERROR);
 295        }
 296        if (pthread_attr_getstacksize (&new_at, &ss) != 0) {
 297          diagnostic (A68_RUNTIME_ERROR, p, ERROR_THREAD_FAULT);
 298          exit_genie (p, A68_RUNTIME_ERROR);
 299        }
 300        ABEND ((size_t) ss != (size_t) A68 (stack_size), ERROR_ACTION, __func__);
 301        if (pthread_create (&new_id, &new_at, start_unit, NULL) != 0) {
 302          diagnostic (A68_RUNTIME_ERROR, p, ERROR_PARALLEL_CANNOT_CREATE);
 303          exit_genie (p, A68_RUNTIME_ERROR);
 304        }
 305        PARENT (u) = parent;
 306        ID (u) = new_id;
 307        A68_PAR (context_index)++;
 308        save_stacks (new_id);
 309      } else {
 310        start_parallel_units (SUB (p), parent);
 311      }
 312    }
 313  }
 314  
 315  //! @brief Execute one unit from a PAR clause.
 316  
 317  void *start_genie_parallel (void *arg)
 318  {
 319    pthread_t t;
 320    int k;
 321    BYTE_T stack_offset;
 322    NODE_T *p;
 323    BOOL_T units_active;
 324    (void) arg;
 325    LOCK_THREAD;
 326    t = pthread_self ();
 327    GET_THREAD_INDEX (k, t);
 328    THREAD_STACK_OFFSET (&(A68_PAR (context)[k])) = (BYTE_T *) (&stack_offset - stack_direction (&stack_offset) * STACK_USED (&(A68_PAR (context)[k])));
 329    restore_stacks (t);
 330    p = (NODE_T *) (UNIT (&(A68_PAR (context)[k])));
 331  // This is the thread spawned by the main thread, we spawn parallel units and await their completion.
 332    start_parallel_units (SUB (p), t);
 333    do {
 334      units_active = A68_FALSE;
 335      check_parallel_units (&units_active, pthread_self ());
 336      if (units_active) {
 337        try_change_thread (p);
 338      }
 339    } while (units_active);
 340    genie_abend_thread ();
 341    return (void *) NULL;
 342  }
 343  
 344  //! @brief Execute parallel clause.
 345  
 346  PROP_T genie_parallel (NODE_T * p)
 347  {
 348    int j;
 349    ADDR_T stack_s = 0, frame_s = 0;
 350    BYTE_T *system_stack_offset_s = NO_BYTE;
 351    if (is_main_thread ()) {
 352  // Spawn first thread and await its completion.
 353      pthread_attr_t new_at;
 354      size_t ss;
 355      BYTE_T stack_offset;
 356      A68_THREAD_CONTEXT *u;
 357      LOCK_THREAD;
 358      A68_PAR (abend_all_threads) = A68_FALSE;
 359      A68_PAR (exit_from_threads) = A68_FALSE;
 360      A68_PAR (par_return_code) = 0;
 361      A68_PAR (sp0) = stack_s = A68_SP;
 362      A68_PAR (fp0) = frame_s = A68_FP;
 363      system_stack_offset_s = A68 (system_stack_offset);
 364      A68_PAR (context_index) = 0;
 365  // Set up a thread for this unit.
 366      u = &(A68_PAR (context)[A68_PAR (context_index)]);
 367      UNIT (u) = p;
 368      STACK_USED (u) = SYSTEM_STACK_USED;
 369      THREAD_STACK_OFFSET (u) = NO_BYTE;
 370      CUR_PTR (&STACK (u)) = A68_SP;
 371      CUR_PTR (&FRAME (u)) = A68_FP;
 372      INI_PTR (&STACK (u)) = A68_PAR (sp0);
 373      INI_PTR (&FRAME (u)) = A68_PAR (fp0);
 374      SWAP (&STACK (u)) = NO_BYTE;
 375      SWAP (&FRAME (u)) = NO_BYTE;
 376      START (&STACK (u)) = NO_BYTE;
 377      START (&FRAME (u)) = NO_BYTE;
 378      BYTES (&STACK (u)) = 0;
 379      BYTES (&FRAME (u)) = 0;
 380      ACTIVE (u) = A68_TRUE;
 381  // Spawn the first thread and join it to await its completion.
 382      errno = 0;
 383      if (pthread_attr_init (&new_at) != 0) {
 384        diagnostic (A68_RUNTIME_ERROR, p, ERROR_THREAD_FAULT);
 385        exit_genie (p, A68_RUNTIME_ERROR);
 386      }
 387      if (pthread_attr_setstacksize (&new_at, (size_t) A68 (stack_size)) != 0) {
 388        diagnostic (A68_RUNTIME_ERROR, p, ERROR_THREAD_FAULT);
 389        exit_genie (p, A68_RUNTIME_ERROR);
 390      }
 391      if (pthread_attr_getstacksize (&new_at, &ss) != 0) {
 392        diagnostic (A68_RUNTIME_ERROR, p, ERROR_THREAD_FAULT);
 393        exit_genie (p, A68_RUNTIME_ERROR);
 394      }
 395      ABEND ((size_t) ss != (size_t) A68 (stack_size), ERROR_ACTION, __func__);
 396      if (pthread_create (&A68_PAR (parent_thread_id), &new_at, start_genie_parallel, NULL) != 0) {
 397        diagnostic (A68_RUNTIME_ERROR, p, ERROR_PARALLEL_CANNOT_CREATE);
 398        exit_genie (p, A68_RUNTIME_ERROR);
 399      }
 400      if (errno != 0) {
 401        diagnostic (A68_RUNTIME_ERROR, p, ERROR_THREAD_FAULT);
 402        exit_genie (p, A68_RUNTIME_ERROR);
 403      }
 404      PARENT (u) = A68_PAR (main_thread_id);
 405      ID (u) = A68_PAR (parent_thread_id);
 406      A68_PAR (context_index)++;
 407      save_stacks (A68_PAR (parent_thread_id));
 408      UNLOCK_THREAD;
 409      if (pthread_join (A68_PAR (parent_thread_id), NULL) != 0) {
 410        diagnostic (A68_RUNTIME_ERROR, p, ERROR_THREAD_FAULT);
 411        exit_genie (p, A68_RUNTIME_ERROR);
 412      }
 413  // The first spawned thread has completed, now clean up.
 414      for (j = 0; j < A68_PAR (context_index); j++) {
 415        if (ACTIVE (&(A68_PAR (context)[j])) && OTHER_THREAD (ID (&(A68_PAR (context)[j])), A68_PAR (main_thread_id)) && OTHER_THREAD (ID (&(A68_PAR (context)[j])), A68_PAR (parent_thread_id))) {
 416  // If threads are zapped it is possible that some are active at this point!.
 417          if (pthread_join (ID (&(A68_PAR (context)[j])), NULL) != 0) {
 418            diagnostic (A68_RUNTIME_ERROR, p, ERROR_THREAD_FAULT);
 419            exit_genie (p, A68_RUNTIME_ERROR);
 420          }
 421        }
 422        if (SWAP (&STACK (&(A68_PAR (context)[j]))) != NO_BYTE) {
 423          a68_free (SWAP (&STACK (&(A68_PAR (context)[j]))));
 424          SWAP (&STACK (&(A68_PAR (context)[j]))) = NO_BYTE;
 425        }
 426        if (SWAP (&STACK (&(A68_PAR (context)[j]))) != NO_BYTE) {
 427          a68_free (SWAP (&STACK (&(A68_PAR (context)[j]))));
 428          SWAP (&STACK (&(A68_PAR (context)[j]))) = NO_BYTE;
 429        }
 430      }
 431  // Now every thread should have ended.
 432      A68_PAR (context_index) = 0;
 433      A68_SP = stack_s;
 434      A68_FP = frame_s;
 435      get_stack_size ();
 436      A68 (system_stack_offset) = system_stack_offset_s;
 437  // See if we ended execution in parallel clause.
 438      if (is_main_thread () && A68_PAR (exit_from_threads)) {
 439        exit_genie (p, A68_PAR (par_return_code));
 440      }
 441      if (is_main_thread () && ERROR_COUNT (&A68_JOB) > 0) {
 442        exit_genie (p, A68_RUNTIME_ERROR);
 443      }
 444  // See if we jumped out of the parallel clause(s).
 445      if (is_main_thread () && A68_PAR (abend_all_threads)) {
 446        JUMP_TO (TABLE (TAX (A68_PAR (jump_label)))) = UNIT (TAX (A68_PAR (jump_label)));
 447        longjmp (*(A68_PAR (jump_buffer)), 1);
 448      }
 449    } else {
 450  // Not in the main thread, spawn parallel units and await completion.
 451      BOOL_T units_active;
 452      pthread_t t = pthread_self ();
 453  // Spawn parallel units.
 454      start_parallel_units (SUB (p), t);
 455      do {
 456        units_active = A68_FALSE;
 457        check_parallel_units (&units_active, t);
 458        if (units_active) {
 459          try_change_thread (p);
 460        }
 461      } while (units_active);
 462    }
 463    return GPROP (p);
 464  }
 465  
 466  //! @brief OP LEVEL = (INT) SEMA
 467  
 468  void genie_level_sema_int (NODE_T * p)
 469  {
 470    A68_INT k;
 471    A68_REF s;
 472    POP_OBJECT (p, &k, A68_INT);
 473    s = heap_generator (p, M_INT, SIZE (M_INT));
 474    *DEREF (A68_INT, &s) = k;
 475    PUSH_REF (p, s);
 476  }
 477  
 478  //! @brief OP LEVEL = (SEMA) INT
 479  
 480  void genie_level_int_sema (NODE_T * p)
 481  {
 482    A68_REF s;
 483    POP_REF (p, &s);
 484    CHECK_INIT (p, INITIALISED (&s), M_SEMA);
 485    PUSH_VALUE (p, VALUE (DEREF (A68_INT, &s)), A68_INT);
 486  }
 487  
 488  //! @brief OP UP = (SEMA) VOID
 489  
 490  void genie_up_sema (NODE_T * p)
 491  {
 492    A68_REF s;
 493    if (is_main_thread ()) {
 494      diagnostic (A68_RUNTIME_ERROR, p, ERROR_PARALLEL_OUTSIDE);
 495      exit_genie (p, A68_RUNTIME_ERROR);
 496    }
 497    POP_REF (p, &s);
 498    CHECK_INIT (p, INITIALISED (&s), M_SEMA);
 499    VALUE (DEREF (A68_INT, &s))++;
 500  }
 501  
 502  //! @brief OP DOWN = (SEMA) VOID
 503  
 504  void genie_down_sema (NODE_T * p)
 505  {
 506    A68_REF s;
 507    A68_INT *k;
 508    BOOL_T cont = A68_TRUE;
 509    if (is_main_thread ()) {
 510      diagnostic (A68_RUNTIME_ERROR, p, ERROR_PARALLEL_OUTSIDE);
 511      exit_genie (p, A68_RUNTIME_ERROR);
 512    }
 513    POP_REF (p, &s);
 514    CHECK_INIT (p, INITIALISED (&s), M_SEMA);
 515    while (cont) {
 516      k = DEREF (A68_INT, &s);
 517      if (VALUE (k) <= 0) {
 518        save_stacks (pthread_self ());
 519        while (VALUE (k) <= 0) {
 520          if (ERROR_COUNT (&A68_JOB) > 0 || A68_PAR (abend_all_threads)) {
 521            genie_abend_thread ();
 522          }
 523          UNLOCK_THREAD;
 524  // Waiting a bit relaxes overhead.
 525          int rc = usleep (10);
 526          ASSERT (rc == 0 || errno == EINTR);
 527          LOCK_THREAD;
 528  // Garbage may be collected, so recalculate 'k'.
 529          k = DEREF (A68_INT, &s);
 530        }
 531        restore_stacks (pthread_self ());
 532        cont = A68_TRUE;
 533      } else {
 534        VALUE (k)--;
 535        cont = A68_FALSE;
 536      }
 537    }
 538  }
 539  
 540  #endif