Oforth Tutorial : Parallelism

I - Tasks and task console

In order to run parallel code, Oforth uses tasks.

A runnable (a method, a function, a block, ...) is used to create a task and, when the task is scheduled, the runnable will run concurrently.

The virtual machine schedules tasks on workers (OS threads) and keeps track of all running or waiting tasks. #.w lists all workers and scheduled tasks. If #.w is run at start-up, you should see :

>.w
WORKR STAT SCHEDULE TASK WAIT
1 RUN default interpreter
ok


This list shows that there is currently one task, #interpreter, running on one worker (worker number 1). This task waits for console input and executes the commands entered. Here, we see that this task is running because it is currently performing the command we have just entered : ".w". This list also shows that this task is running on the default scheduler. We will talk about schedulers later.

Now let's create some tasks : the simplest way to do this is to use #& on a runnable. This creates a new task that will run the runnable in parallel on the default scheduler.
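
#& works on any runnable, not only on blocks. A minimal sketch, with a hypothetical #myJob function (method references work the same way, as #.w & will show later in this tutorial) :

: myJob   "Hello from a task" println ;

Once defined, #myJob & creates and schedules a task that prints the string and exits.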

Let's create a task that sleeps for 10 seconds, then prints "Done" and exits. While the task is sleeping, we use #.w to see how it appears :

>#[ System sleep(10000) "Done" println ] &
ok
>.w
WORKR STAT SCHEDULE TASK WAIT
1 WAIT default #[ System sleep ( 10000 " println ] _sleep
1 RUN default interpreter
ok


Now two tasks are running on worker 1 : the task we have just created and the interpreter. The first task is in WAIT state, which means that it is waiting for something. The WAIT column shows that this task is sleeping. A waiting task does not consume any CPU : the worker will not run it before an event occurs that puts the task back into RUN state.

When the task wakes up, "Done" is printed and the task ends. If #.w is used now, only the interpreter task remains.
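
Since a waiting task consumes no CPU, we can launch many of them at once. A minimal sketch, reusing only words already shown :

: manySleepers(n)
| i |
   n loop: i [ #[ System sleep(10000) ] & ] ;

After manySleepers(100), #.w should list 100 tasks in WAIT state on #_sleep.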

Let's find out on which function the interpreter is waiting when there is no input : for this, we will launch a task that will perform #.w

>#.w &
ok
>WORKR STAT SCHEDULE TASK WAIT
1 RUN default .w
1 WAIT default interpreter _consoleReceiveKey
ok


#.w is running on a separate task (but on the same worker) and we see that the interpreter is waiting on the #_consoleReceiveKey function : this function waits until a key is pressed on the keyboard.

II - Communication between tasks

Oforth does not provide objects like locks, mutexes, semaphores, ... to synchronise tasks, and Oforth does not expose workers (OS threads).

The Oforth parallelism model is :

  • Tasks run in parallel.
  • Tasks share all words created in the dictionary (methods, classes, constants, ...) : these words are immutable objects.
  • Each task has its own data stack and can create mutable or immutable objects. These objects are not visible to other tasks.
  • A task can share only immutable objects with other tasks : as these objects cannot be updated, there is no need to synchronise access to them.
  • A task can use resources to send or receive immutable objects.
  • If a resource is not ready, the task will enter the WAIT state until the resource is ready (or a timeout occurs).

Resources are shareable objects. They allow synchronisation between tasks : a task can wait on a resource until the resource is ready for the operation the task is performing.

Oforth resources inherit from the Resource class (see lang/Parallel/Resource.of file). Among resources, we find :

  • Channels : they are FIFO queues; tasks can send immutable objects into a channel or receive an object from a channel (see the sketch after this list).
  • Sockets : a task can send a buffer to a socket or receive a buffer from a socket.
  • Consoles : a task can receive input keys from a console or send a string to the console.
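
As a first taste (channels are detailed in the next section), here is a minimal sketch of two tasks synchronising through a resource, using only words shown so far :

: hello(ch)
   "Hello from a task" ch send drop ;

: demo
| ch |
   Channel new ->ch
   #[ hello(ch) ] &
   ch receive println ;

#demo launches a task that sends an immutable string into the channel; the calling task blocks on #receive until the string arrives, then prints it.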

III - Channels

A channel is a FIFO queue shareable between tasks. One or many tasks can send immutable objects into a channel using #send, and one or many tasks can receive those objects using #receive.

  • If a task uses #receive to receive an object from a channel and the channel is empty, the task will enter the WAIT state until the channel is no longer empty.
  • If a task uses #send to send an object into a channel and the channel is full, the task will enter the WAIT state until the channel is no longer full.

With those notions, we can already create a little ping-pong between 2 tasks :

: pong(n, pingCh, pongCh)
| i |
   n loop: i [ pingCh receive pongCh send drop ] ;

: pingpong(n)
| pingCh pongCh i |
   Channel new ->pingCh
   Channel new ->pongCh
   #[ pong(n, pingCh, pongCh) ] &
   n loop: i [ i pingCh send drop pongCh receive println ] ;


The #pong function takes n (the number of ping-pongs to do) and the two channels as parameters. n times, it receives an object on the ping channel and resends the same object on the pong channel. A drop is needed because #send returns a boolean (true if the send is ok, false otherwise).
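
Instead of dropping this flag, a caller could test it. A small sketch, assuming the standard #ifFalse: conditional :

pingCh receive pongCh send ifFalse: [ "send failed" println ]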

#pingpong takes n as the number of ping-pongs to execute. It creates the two channels, then launches a new task that runs #pong. After that, #pingpong loops n times : it sends index i on the ping channel and waits for the return on the pong channel. The object returned on the pong channel is printed on the console.

>pingpong(10)
1
2
3
4
5
6
7
8
9
10
ok


In order to watch those tasks on the task console, we can replace the #println by simply counting the number of received messages. Now only the total will be printed on the console and we will be able to check the task console :

: pingpong(n)
| pingCh pongCh i |
   Channel new ->pingCh
   Channel new ->pongCh
   #[ pong(n, pingCh, pongCh) ] &
   0 n loop: i [ i pingCh send drop pongCh receive drop ++ ] println ;
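
Here the counter starts at 0 on the stack; #++ increments it for each received message, and #println prints the final total once the loop ends.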


We also have to choose a big parameter in order to have time to use #.w before #pingpong finishes. Let's launch a ping-pong with 10000000 as parameter, which means :

  • 10000000 objects sent by #pingpong on the ping channel
  • 10000000 objects received by #pong on the ping channel
  • 10000000 objects resent by #pong on the pong channel
  • and 10000000 objects received by #pingpong on the pong channel

and bench it :

>#[ pingpong(10000000) ] bench
10000000
4504982
ok


Here, it took 4.5 seconds to exchange those messages between the two tasks, so we have time to use #.w : we will launch #pingpong into a separate task and type #.w before the task ends.

>#[ #[ pingpong(10000000) ] bench ] &
ok
>.w
WORKR STAT SCHEDULE TASK WAIT
1 WAIT default #[ pong ( pongCh pingCh n ] _channelReceiveTimeout
1 WAIT default #[ #[ pingpong ( 10000000 ] bench ] _channelReceiveTimeout
1 RUN default interpreter
ok


There are now 3 tasks, all running on the same worker. While #interpreter runs, #pong and #pingpong are waiting on #receive on a channel (pingCh for #pong and pongCh for #pingpong).

Oforth can handle a lot of tasks running in parallel, and creating tasks is far less expensive than creating threads.

Let's check the cost of Oforth's task creation mechanism :

: tconsume(channel, n) | i | 0 n loop: i [ channel receive + ] println ;

: tcreate(n)
| i channel |
   Channel new ->channel
   #[ #[ tconsume(channel, n) ] bench ] &
   n loop: i [ #[ channel send(2) drop ] & ] ;


With this example, function #tcreate will :

  • Create a channel
  • Create n tasks that each just send the integer 2 on this channel and finish.
  • Launch a task that runs (and benches) the #tconsume function : this function receives all objects sent into the channel, sums them and prints the result

So the bench result is the elapsed time until the last integer has been received on the channel and #tconsume stops.

>tcreate(1000000)
ok
>2000000
522570


2000000 is the sum of all received objects (1000000 tasks each sending the integer 2), and the last one was received after about half a second.

Now a little exercise : find a way to optimize #tcreate to save 20% of this elapsed time. If you have an idea, post it on the forum... :)

IV - All waiting functions

Here is a short description of all functions that can put a task into WAIT state :

Task :

  • Task : _suspendTask : the task is suspended and will wait until #resume is called on it.
  • Task : _yieldTask : the task yields (it stays scheduled but allows other tasks on the same worker to run).
  • Task : _sleep : the task is sleeping and will wait for the sleep delay.

Console :

  • Console : _consoleReceiveKey : the task is waiting for input on a console and will wait until a new input is available.

Channel :

  • Channel : _channelReceiveTimeout : the task will wait if the channel is empty (or until a timeout occurs)
  • Channel : _channelSendTimeout : the task will wait if the channel is full (or until a timeout occurs)

Socket :

  • Socket : _tcpSockReceiveTimeout : the task will wait if the socket is not ready for recv (or until a timeout occurs)
  • Socket : _tcpSockSendTimeout : the task will wait if the socket is not ready for send (or until a timeout occurs)
  • Socket : _tcpSockAcceptTimeout : only if a timeout is provided.
  • Socket : _tcpSockConnect : if the connect would block, the task waits until the connect succeeds (or fails).

As with channels, a task can wait on socket operations.

Sockets created by Oforth are always non-blocking sockets. When a recv or a send would block, the task enters the WAIT state, waiting for the socket to be ready for the operation.

Socket operations are fully compatible with Oforth tasks and scheduling. For instance, there is no problem creating one task per client connection to handle keep-alive on the server side : waiting for a keep-alive request costs no CPU and, even if the task is blocked, the worker itself is not blocked at all and runs other tasks while this one waits for a new request.

The only exception is #accept : if no timeout is given, #accept will truly block on the accept() call in order to retrieve new connections as quickly as possible and avoid losing connections because the backlog is full. Because of this, it is highly recommended to run the task that accepts connections on a separate scheduler (see the Scheduler chapter).

V - Emitters

Emitters are objects that can emit events and launch listeners on those events (see lang/parallel/Emitter.of).

Basically, an emitter is an object with a dedicated channel (created during emitter initialization) and a dedicated task (launched during emitter initialization) that waits for events on the channel and launches the listeners registered for those events.
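
To make the idea concrete, here is a simplified, hypothetical sketch built only from channels and tasks; it assumes #perform runs a runnable taken from the stack (the real implementation is in lang/parallel/Emitter.of) :

: miniEmitter(ch, n)
| i |
   n loop: i [ ch receive perform ] ;

A task running #miniEmitter waits on its channel and executes, one by one, the n runnables sent to it. The real Emitter class adds event names, listener registration and the special events described below.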

An event is a list whose first object is the event name and whose other objects are the event parameters.

For instance, the $addListener event (used to add a listener for an eventName) is : [ $addListener, eventName, runnable ]

The default emitter defines 5 events :

  • $addListener : adds a listener to an event. When the event is emitted, all its listeners are launched.
  • $addParallelListener : same as $addListener, but listeners are launched into a separate task.
  • $close : closes the emitter. No more events can be emitted, but all events remaining in the channel will be handled.
  • $timeout : a timeout can be defined on the event channel. If no new event is emitted during the delay, this event is emitted.
  • $context : allows updating the emitter context.

Methods defined on the Emitter class are (see lang/parallel/Emitter.of) :

  • #emit(aEvent) : emits the event --> all listeners registered for this event are launched.
  • #onEvent(aEventName, aRunnable) : adds aRunnable as a listener for aEventName
  • #onEventParallel(aEventName, aRunnable) : same, but, on emit, the listener will run in a dedicated task.
  • #close : closes the emitter.

Let's write a small emitter :

: myemitter
| e |
    Emitter newDefault ->e
    e onEvent($foo, #[ "foo" println ])
    e emit($foo)
    e onEvent($foo, #[ "bar" println ])
    e emit($foo)
    e close ;
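
Running #myemitter should print "foo" after the first emit, then "foo" and "bar" after the second one, since both listeners are registered at that point.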


And another one that launches listeners into parallel tasks :

: myemitter2
| e |
   Emitter newDefault ->e
   e onEventParallel($foo, #[ System sleep(3000) "foo" println ])
   e emit($foo)
   e onEventParallel($foo, #[ System sleep(3000) "bar" println ])
   e emit($foo)
   e close ;
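
This time each listener runs in its own task : both emits return immediately and, about 3 seconds later, "foo", "foo" and "bar" should all have been printed, instead of one 3 second wait per listener with plain #onEvent.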


And another one with a timeout delay of 1 second :

: myemitter3
| e |
   Emitter new(System.Scheduler, 1000000) ->e
   e onEventParallel($timeout, #[ "timeout occured" println ])
   e onEventParallel($foo, #[ System sleep(3000) "foo" println ])
   e emit($foo)
   e onEventParallel($foo, #[ System sleep(3000) "bar" println ])
   e emit($foo)
   System sleep(10000)
   "Closing emitter" println
   e close ;
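
Here the event channel has a timeout of 1 second (1000000 µs) : while #myemitter3 sleeps for 10 seconds without emitting anything, each expired delay should emit $timeout and print "timeout occured", until the emitter is closed.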


TcpServer and NodeServer (see corresponding packages) are subclasses of Emitter class.

To be continued...

Franck Bensusan
