rts-parallel.c

     
   1  //! @file rts-parallel.c
   2  //! @author J. Marcel van der Veer
   3  
   4  //! @section Copyright
   5  //!
   6  //! This file is part of Algol68G - an Algol 68 compiler-interpreter.
   7  //! Copyright 2001-2024 J. Marcel van der Veer [algol68g@xs4all.nl].
   8  
   9  //! @section License
  10  //!
  11  //! This program is free software; you can redistribute it and/or modify it 
  12  //! under the terms of the GNU General Public License as published by the 
  13  //! Free Software Foundation; either version 3 of the License, or 
  14  //! (at your option) any later version.
  15  //!
  16  //! This program is distributed in the hope that it will be useful, but 
  17  //! WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY 
  18  //! or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for 
  19  //! more details. You should have received a copy of the GNU General Public 
  20  //! License along with this program. If not, see [http://www.gnu.org/licenses/].
  21  
  22  //! @section Synopsis 
  23  //!
  24  //! Parallel clause implementation.
  25  
  26  #include "a68g.h"
  27  #include "a68g-genie.h"
  28  #include "a68g-frames.h"
  29  #include "a68g-prelude.h"
  30  
  31  // This code implements a parallel clause for Algol68G. 
  32  // The parallel clause has been included for educational purposes; 
  33  // this implementation is not the most efficient one.
  34  // 
  35  // POSIX threads are used to have separate registers and stack for each concurrent
  36  // unit. Algol68G parallel units behave as POSIX threads - they have private 
  37  // stacks. Hence an assignation to an object in another thread, does not change 
  38  // that object in that other thread. Also jumps between threads are forbidden.
  39  
  40  #if defined (BUILD_PARALLEL_CLAUSE)
  41  
  42  // static pthread_mutex_t unit_sema = PTHREAD_MUTEX_INITIALIZER;
  43  
  44  void save_stacks (pthread_t);
  45  void restore_stacks (pthread_t);
  46  
  47  #define SAVE_STACK(stk, st, si) {\
  48    A68_STACK_DESCRIPTOR *s = (stk);\
  49    BYTE_T *start = (st);\
  50    int size = (si);\
  51    if (size > 0) {\
  52      if (!((s != NULL) && (BYTES (s) > 0) && (size <= BYTES (s)))) {\
  53        a68_free (SWAP (s));\
  54        SWAP (s) = (BYTE_T *) get_heap_space ((size_t) size);\
  55        ABEND (SWAP (s) == NULL, ERROR_OUT_OF_CORE, __func__);\
  56      }\
  57      START (s) = start;\
  58      BYTES (s) = size;\
  59      COPY (SWAP (s), start, size);\
  60    } else {\
  61      START (s) = start;\
  62      BYTES (s) = 0;\
  63      a68_free (SWAP (s));\
  64      SWAP (s) = NO_BYTE;\
  65    }}
  66  
  67  #define RESTORE_STACK(stk) {\
  68    A68_STACK_DESCRIPTOR *s = (stk);\
  69    if (s != NULL && BYTES (s) > 0) {\
  70      COPY (START (s), SWAP (s), BYTES (s));\
  71    }}
  72  
  73  #define GET_THREAD_INDEX(z, ptid) {\
  74    int _k_;\
  75    pthread_t _tid_ = (ptid);\
  76    (z) = -1;\
  77    for (_k_ = 0; _k_ < A68_PAR (context_index) && (z) == -1; _k_++) {\
  78      if (SAME_THREAD (_tid_, ID (&(A68_PAR (context)[_k_])))) {\
  79        (z) = _k_;\
  80      }\
  81    }\
  82    ABEND ((z) == -1, ERROR_INTERNAL_CONSISTENCY, __func__);\
  83    }
  84  
  85  #define ERROR_THREAD_FAULT "thread fault"
  86  
  87  #define LOCK_THREAD {\
  88    ABEND (pthread_mutex_lock (&A68_PAR (unit_sema)) != 0, ERROR_THREAD_FAULT, __func__);\
  89    }
  90  
  91  #define UNLOCK_THREAD {\
  92    ABEND (pthread_mutex_unlock (&A68_PAR (unit_sema)) != 0, ERROR_THREAD_FAULT, __func__);\
  93    }
  94  
  95  //! @brief Does system stack grow up or down?.
  96  
  97  static inline int stack_direction (BYTE_T * lwb)
  98  {
  99    BYTE_T upb;
 100    if (&upb > lwb) {
 101      return (int) sizeof (BYTE_T);
 102    } else if (&upb < lwb) {
 103      return - (int) sizeof (BYTE_T);
 104    } else {
 105      ASSERT (A68_FALSE);
 106      return 0; // Pro forma
 107    }
 108  }
 109  
 110  //! @brief Whether we are in the main thread.
 111  
 112  BOOL_T is_main_thread (void)
 113  {
 114    return SAME_THREAD (A68_PAR (main_thread_id), pthread_self ());
 115  }
 116  
 117  //! @brief End a thread, beit normally or not.
 118  
 119  void genie_abend_thread (void)
 120  {
 121    int k;
 122    GET_THREAD_INDEX (k, pthread_self ());
 123    ACTIVE (&(A68_PAR (context)[k])) = A68_FALSE;
 124    UNLOCK_THREAD;
 125    pthread_exit (NULL);
 126  }
 127  
 128  //! @brief When we end execution in a parallel clause we zap all threads.
 129  
 130  void genie_set_exit_from_threads (int ret)
 131  {
 132    A68_PAR (abend_all_threads) = A68_TRUE;
 133    A68_PAR (exit_from_threads) = A68_TRUE;
 134    A68_PAR (par_return_code) = ret;
 135    genie_abend_thread ();
 136  }
 137  
 138  //! @brief When we jump out of a parallel clause we zap all threads.
 139  
 140  void genie_abend_all_threads (NODE_T * p, jmp_buf * jump_stat, NODE_T * label)
 141  {
 142    (void) p;
 143    A68_PAR (abend_all_threads) = A68_TRUE;
 144    A68_PAR (exit_from_threads) = A68_FALSE;
 145    A68_PAR (jump_buffer) = jump_stat;
 146    A68_PAR (jump_label) = label;
 147    if (!is_main_thread ()) {
 148      genie_abend_thread ();
 149    }
 150  }
 151  
 152  //! @brief Save this thread and try to start another.
 153  
 154  void try_change_thread (NODE_T * p)
 155  {
 156    if (is_main_thread ()) {
 157      diagnostic (A68_RUNTIME_ERROR, p, ERROR_PARALLEL_OUTSIDE);
 158      exit_genie (p, A68_RUNTIME_ERROR);
 159    } else {
 160  // Release the unit_sema so another thread can take it up ...
 161      save_stacks (pthread_self ());
 162      UNLOCK_THREAD;
 163  // ... and take it up again!.
 164      LOCK_THREAD;
 165      restore_stacks (pthread_self ());
 166    }
 167  }
 168  
 169  //! @brief Store the stacks of threads.
 170  
 171  void save_stacks (pthread_t t)
 172  {
 173    int k;
 174    GET_THREAD_INDEX (k, t);
 175  // Store stack pointers.
 176    CUR_PTR (&FRAME (&(A68_PAR (context)[k]))) = A68_FP;
 177    CUR_PTR (&STACK (&(A68_PAR (context)[k]))) = A68_SP;
 178  // Swap out evaluation stack.
 179    ADDR_T p = A68_SP;
 180    ADDR_T q = INI_PTR (&STACK (&(A68_PAR (context)[k])));
 181    SAVE_STACK (&(STACK (&(A68_PAR (context)[k]))), STACK_ADDRESS (q), p - q);
 182  // Swap out frame stack.
 183    p = A68_FP;
 184    q = INI_PTR (&FRAME (&(A68_PAR (context)[k])));
 185    ADDR_T u = p + FRAME_SIZE (p);
 186    ADDR_T v = q + FRAME_SIZE (q);
 187  // Consider the embedding thread.
 188    SAVE_STACK (&(FRAME (&(A68_PAR (context)[k]))), FRAME_ADDRESS (v), u - v);
 189  }
 190  
 191  //! @brief Restore stacks of thread.
 192  
 193  void restore_stacks (pthread_t t)
 194  {
 195    if (ERROR_COUNT (&A68_JOB) > 0 || A68_PAR (abend_all_threads)) {
 196      genie_abend_thread ();
 197    } else {
 198      int k;
 199      GET_THREAD_INDEX (k, t);
 200  // Restore stack pointers.
 201      get_stack_size ();
 202      A68 (system_stack_offset) = THREAD_STACK_OFFSET (&(A68_PAR (context)[k]));
 203      A68_FP = CUR_PTR (&FRAME (&(A68_PAR (context)[k])));
 204      A68_SP = CUR_PTR (&STACK (&(A68_PAR (context)[k])));
 205  // Restore stacks.
 206      RESTORE_STACK (&(STACK (&(A68_PAR (context)[k]))));
 207      RESTORE_STACK (&(FRAME (&(A68_PAR (context)[k]))));
 208    }
 209  }
 210  
 211  //! @brief Check whether parallel units have terminated.
 212  
 213  void check_parallel_units (BOOL_T * active, pthread_t parent)
 214  {
 215    for (int k = 0; k < A68_PAR (context_index); k++) {
 216      if (parent == PARENT (&(A68_PAR (context)[k]))) {
 217        (*active) |= ACTIVE (&(A68_PAR (context)[k]));
 218      }
 219    }
 220  }
 221  
 222  //! @brief Execute one unit from a PAR clause.
 223  
 224  void *start_unit (void *arg)
 225  {
 226    BYTE_T stack_offset;
 227    (void) arg;
 228    LOCK_THREAD;
 229    pthread_t t = pthread_self ();
 230    int k;
 231    GET_THREAD_INDEX (k, t);
 232    THREAD_STACK_OFFSET (&(A68_PAR (context)[k])) = (BYTE_T *) (&stack_offset - stack_direction (&stack_offset) * STACK_USED (&A68_PAR (context)[k]));
 233    restore_stacks (t);
 234    NODE_T *p = (NODE_T *) (UNIT (&(A68_PAR (context)[k])));
 235    GENIE_UNIT_TRACE (p);
 236    genie_abend_thread ();
 237    return (void *) NULL;
 238  }
 239  
 240  //! @brief Execute parallel units.
 241  
 242  void start_parallel_units (NODE_T * p, pthread_t parent)
 243  {
 244    for (; p != NO_NODE; FORWARD (p)) {
 245      if (IS (p, UNIT)) {
 246        pthread_t new_id;
 247        pthread_attr_t new_at;
 248        size_t ss;
 249        BYTE_T stack_offset;
 250        A68_THREAD_CONTEXT *u;
 251  // Set up a thread for this unit.
 252        if (A68_PAR (context_index) >= THREAD_MAX) {
 253          static BUFFER msg;
 254          a68_bufprt (msg, SNPRINTF_SIZE, "platform supports %d parallel units", THREAD_MAX);
 255          diagnostic (A68_RUNTIME_ERROR, p, ERROR_PARALLEL_OVERFLOW, msg);
 256          exit_genie (p, A68_RUNTIME_ERROR);
 257        }
 258  // Fill out a context for this thread.
 259        u = &((A68_PAR (context)[A68_PAR (context_index)]));
 260        UNIT (u) = p;
 261        STACK_USED (u) = SYSTEM_STACK_USED;
 262        THREAD_STACK_OFFSET (u) = NO_BYTE;
 263        CUR_PTR (&STACK (u)) = A68_SP;
 264        CUR_PTR (&FRAME (u)) = A68_FP;
 265        INI_PTR (&STACK (u)) = A68_PAR (sp0);
 266        INI_PTR (&FRAME (u)) = A68_PAR (fp0);
 267        SWAP (&STACK (u)) = NO_BYTE;
 268        SWAP (&FRAME (u)) = NO_BYTE;
 269        START (&STACK (u)) = NO_BYTE;
 270        START (&FRAME (u)) = NO_BYTE;
 271        BYTES (&STACK (u)) = 0;
 272        BYTES (&FRAME (u)) = 0;
 273        ACTIVE (u) = A68_TRUE;
 274  // Create the thread.
 275        errno = 0;
 276        if (pthread_attr_init (&new_at) != 0) {
 277          diagnostic (A68_RUNTIME_ERROR, p, ERROR_THREAD_FAULT);
 278          exit_genie (p, A68_RUNTIME_ERROR);
 279        }
 280        if (pthread_attr_setstacksize (&new_at, (size_t) A68 (stack_size)) != 0) {
 281          diagnostic (A68_RUNTIME_ERROR, p, ERROR_THREAD_FAULT);
 282          exit_genie (p, A68_RUNTIME_ERROR);
 283        }
 284        if (pthread_attr_getstacksize (&new_at, &ss) != 0) {
 285          diagnostic (A68_RUNTIME_ERROR, p, ERROR_THREAD_FAULT);
 286          exit_genie (p, A68_RUNTIME_ERROR);
 287        }
 288        ABEND ((size_t) ss != (size_t) A68 (stack_size), ERROR_ACTION, __func__);
 289        if (pthread_create (&new_id, &new_at, start_unit, NULL) != 0) {
 290          diagnostic (A68_RUNTIME_ERROR, p, ERROR_PARALLEL_CANNOT_CREATE);
 291          exit_genie (p, A68_RUNTIME_ERROR);
 292        }
 293        PARENT (u) = parent;
 294        ID (u) = new_id;
 295        A68_PAR (context_index)++;
 296        save_stacks (new_id);
 297      } else {
 298        start_parallel_units (SUB (p), parent);
 299      }
 300    }
 301  }
 302  
 303  //! @brief Execute one unit from a PAR clause.
 304  
 305  void *start_genie_parallel (void *arg)
 306  {
 307    BYTE_T stack_offset;
 308    (void) arg;
 309    LOCK_THREAD;
 310    pthread_t t = pthread_self ();
 311    int k;
 312    GET_THREAD_INDEX (k, t);
 313    THREAD_STACK_OFFSET (&(A68_PAR (context)[k])) = (BYTE_T *) (&stack_offset - stack_direction (&stack_offset) * STACK_USED (&(A68_PAR (context)[k])));
 314    restore_stacks (t);
 315    NODE_T *p = (NODE_T *) (UNIT (&(A68_PAR (context)[k])));
 316  // This is the thread spawned by the main thread, we spawn parallel units and await their completion.
 317    start_parallel_units (SUB (p), t);
 318    BOOL_T units_active;
 319    do {
 320      units_active = A68_FALSE;
 321      check_parallel_units (&units_active, pthread_self ());
 322      if (units_active) {
 323        try_change_thread (p);
 324      }
 325    } while (units_active);
 326    genie_abend_thread ();
 327    return (void *) NULL;
 328  }
 329  
 330  //! @brief Execute parallel clause.
 331  
 332  PROP_T genie_parallel (NODE_T * p)
 333  {
 334    ADDR_T stack_s = 0, frame_s = 0;
 335    BYTE_T *system_stack_offset_s = NO_BYTE;
 336    if (is_main_thread ()) {
 337  // Spawn first thread and await its completion.
 338      pthread_attr_t new_at;
 339      size_t ss;
 340      BYTE_T stack_offset;
 341      A68_THREAD_CONTEXT *u;
 342      LOCK_THREAD;
 343      A68_PAR (abend_all_threads) = A68_FALSE;
 344      A68_PAR (exit_from_threads) = A68_FALSE;
 345      A68_PAR (par_return_code) = 0;
 346      A68_PAR (sp0) = stack_s = A68_SP;
 347      A68_PAR (fp0) = frame_s = A68_FP;
 348      system_stack_offset_s = A68 (system_stack_offset);
 349      A68_PAR (context_index) = 0;
 350  // Set up a thread for this unit.
 351      u = &(A68_PAR (context)[A68_PAR (context_index)]);
 352      UNIT (u) = p;
 353      STACK_USED (u) = SYSTEM_STACK_USED;
 354      THREAD_STACK_OFFSET (u) = NO_BYTE;
 355      CUR_PTR (&STACK (u)) = A68_SP;
 356      CUR_PTR (&FRAME (u)) = A68_FP;
 357      INI_PTR (&STACK (u)) = A68_PAR (sp0);
 358      INI_PTR (&FRAME (u)) = A68_PAR (fp0);
 359      SWAP (&STACK (u)) = NO_BYTE;
 360      SWAP (&FRAME (u)) = NO_BYTE;
 361      START (&STACK (u)) = NO_BYTE;
 362      START (&FRAME (u)) = NO_BYTE;
 363      BYTES (&STACK (u)) = 0;
 364      BYTES (&FRAME (u)) = 0;
 365      ACTIVE (u) = A68_TRUE;
 366  // Spawn the first thread and join it to await its completion.
 367      errno = 0;
 368      if (pthread_attr_init (&new_at) != 0) {
 369        diagnostic (A68_RUNTIME_ERROR, p, ERROR_THREAD_FAULT);
 370        exit_genie (p, A68_RUNTIME_ERROR);
 371      }
 372      if (pthread_attr_setstacksize (&new_at, (size_t) A68 (stack_size)) != 0) {
 373        diagnostic (A68_RUNTIME_ERROR, p, ERROR_THREAD_FAULT);
 374        exit_genie (p, A68_RUNTIME_ERROR);
 375      }
 376      if (pthread_attr_getstacksize (&new_at, &ss) != 0) {
 377        diagnostic (A68_RUNTIME_ERROR, p, ERROR_THREAD_FAULT);
 378        exit_genie (p, A68_RUNTIME_ERROR);
 379      }
 380      ABEND ((size_t) ss != (size_t) A68 (stack_size), ERROR_ACTION, __func__);
 381      if (pthread_create (&A68_PAR (parent_thread_id), &new_at, start_genie_parallel, NULL) != 0) {
 382        diagnostic (A68_RUNTIME_ERROR, p, ERROR_PARALLEL_CANNOT_CREATE);
 383        exit_genie (p, A68_RUNTIME_ERROR);
 384      }
 385      if (errno != 0) {
 386        diagnostic (A68_RUNTIME_ERROR, p, ERROR_THREAD_FAULT);
 387        exit_genie (p, A68_RUNTIME_ERROR);
 388      }
 389      PARENT (u) = A68_PAR (main_thread_id);
 390      ID (u) = A68_PAR (parent_thread_id);
 391      A68_PAR (context_index)++;
 392      save_stacks (A68_PAR (parent_thread_id));
 393      UNLOCK_THREAD;
 394      if (pthread_join (A68_PAR (parent_thread_id), NULL) != 0) {
 395        diagnostic (A68_RUNTIME_ERROR, p, ERROR_THREAD_FAULT);
 396        exit_genie (p, A68_RUNTIME_ERROR);
 397      }
 398  // The first spawned thread has completed, now clean up.
 399      for (int j = 0; j < A68_PAR (context_index); j++) {
 400        if (ACTIVE (&(A68_PAR (context)[j])) && OTHER_THREAD (ID (&(A68_PAR (context)[j])), A68_PAR (main_thread_id)) && OTHER_THREAD (ID (&(A68_PAR (context)[j])), A68_PAR (parent_thread_id))) {
 401  // If threads are zapped it is possible that some are active at this point!.
 402          if (pthread_join (ID (&(A68_PAR (context)[j])), NULL) != 0) {
 403            diagnostic (A68_RUNTIME_ERROR, p, ERROR_THREAD_FAULT);
 404            exit_genie (p, A68_RUNTIME_ERROR);
 405          }
 406        }
 407        a68_free (SWAP (&STACK (&(A68_PAR (context)[j]))));
 408        SWAP (&STACK (&(A68_PAR (context)[j]))) = NO_BYTE;
 409      }
 410  // Now every thread should have ended.
 411      A68_PAR (context_index) = 0;
 412      A68_SP = stack_s;
 413      A68_FP = frame_s;
 414      get_stack_size ();
 415      A68 (system_stack_offset) = system_stack_offset_s;
 416  // See if we ended execution in parallel clause.
 417      if (is_main_thread () && A68_PAR (exit_from_threads)) {
 418        exit_genie (p, A68_PAR (par_return_code));
 419      }
 420      if (is_main_thread () && ERROR_COUNT (&A68_JOB) > 0) {
 421        exit_genie (p, A68_RUNTIME_ERROR);
 422      }
 423  // See if we jumped out of the parallel clause(s).
 424      if (is_main_thread () && A68_PAR (abend_all_threads)) {
 425        JUMP_TO (TABLE (TAX (A68_PAR (jump_label)))) = UNIT (TAX (A68_PAR (jump_label)));
 426        longjmp (*(A68_PAR (jump_buffer)), 1);
 427      }
 428    } else {
 429  // Not in the main thread, spawn parallel units and await completion.
 430      BOOL_T units_active;
 431      pthread_t t = pthread_self ();
 432  // Spawn parallel units.
 433      start_parallel_units (SUB (p), t);
 434      do {
 435        units_active = A68_FALSE;
 436        check_parallel_units (&units_active, t);
 437        if (units_active) {
 438          try_change_thread (p);
 439        }
 440      } while (units_active);
 441    }
 442    return GPROP (p);
 443  }
 444  
 445  //! @brief OP LEVEL = (INT) SEMA
 446  
 447  void genie_level_sema_int (NODE_T * p)
 448  {
 449    A68_INT k;
 450    POP_OBJECT (p, &k, A68_INT);
 451    A68_REF s = heap_generator (p, M_INT, SIZE (M_INT));
 452    *DEREF (A68_INT, &s) = k;
 453    PUSH_REF (p, s);
 454  }
 455  
 456  //! @brief OP LEVEL = (SEMA) INT
 457  
 458  void genie_level_int_sema (NODE_T * p)
 459  {
 460    A68_REF s;
 461    POP_REF (p, &s);
 462    CHECK_INIT (p, INITIALISED (&s), M_SEMA);
 463    PUSH_VALUE (p, VALUE (DEREF (A68_INT, &s)), A68_INT);
 464  }
 465  
 466  //! @brief OP UP = (SEMA) VOID
 467  
 468  void genie_up_sema (NODE_T * p)
 469  {
 470    if (is_main_thread ()) {
 471      diagnostic (A68_RUNTIME_ERROR, p, ERROR_PARALLEL_OUTSIDE);
 472      exit_genie (p, A68_RUNTIME_ERROR);
 473    }
 474    A68_REF s;
 475    POP_REF (p, &s);
 476    CHECK_INIT (p, INITIALISED (&s), M_SEMA);
 477    VALUE (DEREF (A68_INT, &s))++;
 478  }
 479  
 480  //! @brief OP DOWN = (SEMA) VOID
 481  
 482  void genie_down_sema (NODE_T * p)
 483  {
 484    if (is_main_thread ()) {
 485      diagnostic (A68_RUNTIME_ERROR, p, ERROR_PARALLEL_OUTSIDE);
 486      exit_genie (p, A68_RUNTIME_ERROR);
 487    }
 488    A68_REF s;
 489    POP_REF (p, &s);
 490    CHECK_INIT (p, INITIALISED (&s), M_SEMA);
 491    BOOL_T cont = A68_TRUE;
 492    while (cont) {
 493      A68_INT *k = DEREF (A68_INT, &s);
 494      if (VALUE (k) <= 0) {
 495        save_stacks (pthread_self ());
 496        while (VALUE (k) <= 0) {
 497          if (ERROR_COUNT (&A68_JOB) > 0 || A68_PAR (abend_all_threads)) {
 498            genie_abend_thread ();
 499          }
 500          UNLOCK_THREAD;
 501  // Waiting a bit relaxes overhead.
 502          int ret = a68_usleep (10);
 503          ASSERT (ret == 0 || errno == EINTR);
 504          LOCK_THREAD;
 505  // Garbage may be collected, so recalculate 'k'.
 506          k = DEREF (A68_INT, &s);
 507        }
 508        restore_stacks (pthread_self ());
 509        cont = A68_TRUE;
 510      } else {
 511        VALUE (k)--;
 512        cont = A68_FALSE;
 513      }
 514    }
 515  }
 516  
 517  #endif
     


© 2002-2024 J.M. van der Veer (jmvdveer@xs4all.nl)