Cooperative threads are what the Coro module gives you. Obviously, you have to "use" it first:
use Coro;
To create a thread, you can use the "async" function that automatically gets exported from that module:
async {
print "hello\n";
};
"async" expects a code block as its first argument (its prototype lets you pass a bare block without writing
"sub"). You can actually pass it extra arguments, and these will end up in @_ when the code block executes,
but since the block is a closure, you can also just refer to any lexical variables that are currently visible.
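Here is a minimal sketch showing both ways of getting data into a thread. The extra argument "world" ends up in @_, while $greeting is simply captured by the closure. The variable names are made up for illustration.

```perl
use strict;
use warnings;
use Coro;

my $greeting = "hello";   # a lexical, visible to the closure below
my @log;                  # collects what the thread produced

async {
   my ($name) = @_;       # extra arguments given to async end up in @_
   push @log, "$greeting, $name";
} "world";

cede;                     # let the new thread run (why this is needed is explained next)
print "$log[0]\n";        # prints "hello, world"
```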
The above lines create a thread, but if you save them in a file and execute it as a perl program, you
will not get any output.
The reason is that, although you created a thread and the thread is ready to execute (because "async"
puts it into the so-called ready queue), it never gets any CPU time to actually execute: the main
program, which is also a thread almost like any other, never gives up the CPU but instead exits the
whole program by running off the end of the file. Since Coro threads are cooperative, the main thread
has to cooperate and give up the CPU.
To explicitly give up the CPU, use the "cede" function (which is often called "yield" in other thread
implementations):
use Coro;
async {
print "hello\n";
};
cede;
Running the above prints "hello" and exits.
Now, this is not very interesting, so let's try a slightly more interesting program:
use Coro;
async {
print "async 1\n";
cede;
print "async 2\n";
};
print "main 1\n";
cede;
print "main 2\n";
cede;
Running this program prints:
main 1
async 1
main 2
async 2
This nicely illustrates the non-local jump ability: the main program prints the first line and then
yields the CPU to whatever other threads there are. There is one other thread, which runs, prints "async
1" and itself yields the CPU. Since the only other thread available is the main program, that one
continues running, and so on.
Let's look at the example in more detail: "async" first creates a new thread. All new threads start in a
suspended state. To make them run, they need to be put into the ready queue, which is the second thing
that "async" does. Each time a thread gives up the CPU, Coro runs a so-called scheduler. The scheduler
selects the next thread from the ready queue, removes it from the queue, and runs it.
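To see that the ready queue really behaves like a queue, here is a small sketch. It assumes all threads have the same (default) priority. Three threads are readied in creation order, and when main cedes, it goes to the back of the queue behind them. The array @ran is a made-up name used only to record the order.

```perl
use strict;
use warnings;
use Coro;

my @ran;   # records the order in which threads got the CPU

for my $id (1 .. 3) {
   # async readies each new thread, appending it to the ready queue
   async { push @ran, "thread $id" };
}

push @ran, "main";
cede;      # main is appended behind threads 1..3, which run to completion first

print join(", ", @ran), "\n";   # main, thread 1, thread 2, thread 3
```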
"cede" also does two things: first it puts the running thread into the ready queue, and then it jumps
into the scheduler. This has the effect of giving up the CPU, but also ensures that, eventually, the
thread gets run again.
In fact, "cede" could be implemented like this:
sub my_cede {
$Coro::current->ready;
schedule;
}
This works because $Coro::current always contains the currently running thread, and the scheduler itself
can be called directly via "Coro::schedule".
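If this hand-rolled "my_cede" is plugged into the earlier two-thread program, it should produce the same interleaving as the real "cede". The sketch below records the order in an array (a hypothetical @order) instead of printing it, so the result is easy to check.

```perl
use strict;
use warnings;
use Coro;

sub my_cede {
   $Coro::current->ready;   # put the running thread at the end of the ready queue
   schedule;                # let the scheduler pick the next ready thread
}

my @order;

async {
   push @order, "async 1";
   my_cede;
   push @order, "async 2";
};

push @order, "main 1";
my_cede;
push @order, "main 2";
my_cede;

print join(", ", @order), "\n";   # main 1, async 1, main 2, async 2
```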
What is the effect of just calling "schedule" without putting the current thread into the ready queue
first? Simple: the scheduler selects the next ready thread and runs it. And the current thread, as it
hasn't been put into the ready queue, will go to sleep until something wakes it up. If. Ever.
The following example remembers the current thread in a variable, creates a thread and then puts the main
program to sleep.
The newly created thread uses rand to wake up the main thread by calling its "ready" method - or not.
use Coro;
my $wakeme = $Coro::current;
async {
$wakeme->ready if 0.5 > rand;
};
schedule;
Now, when you run it, one of two things happens: either the "async" thread wakes up the main thread again,
in which case the program silently exits, or it doesn't, in which case you get something like this:
FATAL: deadlock detected.
PID SC RSS USES Description Where
31976480 -C 19k 0 [main::] [program:9]
32223768 UC 12k 1 [Coro.pm:691]
32225088 -- 2068 1 [coro manager] [Coro.pm:691]
32225184 N- 216 0 [unblock_sub scheduler] -
Why is that? Well, when the "async" thread runs into the end of its block, it will be terminated (via a
call to "Coro::terminate") and the scheduler is called again. Since the "async" thread hasn't woken up
the main thread, and there aren't any other threads, there is nothing to wake up, and the program cannot
continue. Since there are threads that could be running (main) but none are ready to do so, Coro signals
a deadlock - no progress is possible. Usually you also get a listing of all threads, which might help you
track down the problem.
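For contrast, here is a sketch of the same pattern in which the helper thread always wakes the main thread, so no deadlock can occur. The main thread re-checks a condition after every wake-up instead of assuming that a single return from "schedule" means the awaited event has happened, because in general something else could also call "ready" on it. The names $wakeme and $answer are illustrative.

```perl
use strict;
use warnings;
use Coro;

my $wakeme = $Coro::current;   # remember the main thread
my $answer;                    # set by the helper thread

async {
   $answer = 42;               # produce a result...
   $wakeme->ready;             # ...and always wake the main thread
};

# sleep until the result is there; loop in case of an unrelated wake-up
schedule while !defined $answer;

print "got $answer\n";         # got 42
```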
However, there is an important case where progress is, in fact, possible, despite no threads being ready
- namely in an event-based program. In such a program, some threads could wait for external events, such
as a timeout, or some data to arrive on a socket.
Since a deadlock in such a case would not be very useful, there is a module named Coro::AnyEvent that
integrates threads into an event loop. It configures Coro so that, instead of "die"ing with an error
message, it runs the event loop in the hope of receiving an event that will wake up some thread.
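Here is a minimal sketch of this behaviour. It assumes Coro::AnyEvent (part of the Coro distribution) and AnyEvent are installed. The helper thread waits on a timer via Coro::AnyEvent::sleep, so for a moment no thread is ready at all. Instead of a deadlock report, the event loop runs until the timer fires.

```perl
use strict;
use warnings;
use Coro;
use Coro::AnyEvent;   # run the event loop instead of reporting a deadlock

my $wakeme = $Coro::current;
my $done;

async {
   Coro::AnyEvent::sleep(0.1);   # blocks only this thread until the timer fires
   $done = 1;
   $wakeme->ready;
};

schedule while !$done;           # no thread is ready while the timer is pending
print "woken up after the timer fired\n";
```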
Semaphores and other locks
Using only "ready", "cede" and "schedule" to synchronise threads is difficult, especially if many threads
are ready at the same time. Coro supports a number of primitives that make synchronising threads easier.
The first such primitive is Coro::Semaphore, which implements counting semaphores (binary semaphores are
available as Coro::Signal, and there are Coro::SemaphoreSet and Coro::RWLock primitives as well).
Counting semaphores, in a sense, store a count of resources. You can remove/allocate/reserve a resource
by calling the "->down" method, which decrements the counter, and you can add or free a resource by
calling the "->up" method, which increments the counter. If the counter is 0, then "->down" cannot
decrement the semaphore - it is locked - and the thread will wait until a count becomes available again.
Here is an example:
use Coro;
my $sem = new Coro::Semaphore 0; # a locked semaphore
async {
print "unlocking semaphore\n";
$sem->up;
};
print "trying to lock semaphore\n";
$sem->down;
print "we got it!\n";
This program creates a locked semaphore (a semaphore with count 0) and tries to lock it (by trying to
decrement its counter in the "down" method). Since the semaphore count is already exhausted, this will
block the main thread until the semaphore becomes available.
This yields the CPU to the only other ready thread in the process, the one created with "async", which
unlocks the semaphore (and instantly terminates itself by returning).
Since the semaphore is now available, the main program locks it and continues: "we got it!".
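Here is a small sketch of the counter at work, using a semaphore that starts with two resources. Besides "down" and "up", it uses two further Coro::Semaphore methods: "count", which returns the current counter, and "try", a "down" that returns false instead of blocking when the count is 0.

```perl
use strict;
use warnings;
use Coro;
use Coro::Semaphore;

my $sem = Coro::Semaphore->new(2);   # two resources available

$sem->down;                          # 2 -> 1
$sem->down;                          # 1 -> 0: the semaphore is now locked
print "count: ", $sem->count, "\n";  # count: 0

# try decrements only if that is possible without blocking
print "try: ", ($sem->try ? "got it" : "would block"), "\n";   # try: would block

$sem->up;                            # 0 -> 1: one resource freed
print "try: ", ($sem->try ? "got it" : "would block"), "\n";   # try: got it
```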
Counting semaphores are most often used to lock resources, or to exclude other threads from accessing or
using a resource. For example, consider a very costly function (that temporarily allocates a lot of RAM,
for example). You wouldn't want to have many threads calling this function at the same time, so you use a
semaphore:
my $lock = new Coro::Semaphore; # unlocked initially - default is 1
sub costly_function {
$lock->down; # acquire semaphore
# do costly operation that blocks
$lock->up; # unlock it
}
No matter how many threads call "costly_function", only one will run the body of it, all others will wait
in the "down" call. If you want to limit the number of concurrent executions to five, you could create
the semaphore with an initial count of 5.
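To watch the limit being enforced, here is a sketch with a count of 2 and five threads. A "cede" stands in for the blocking operation, and the hypothetical counters $inside and $max_inside track how many threads are in the function body at the same time.

```perl
use strict;
use warnings;
use Coro;
use Coro::Semaphore;

my $lock = Coro::Semaphore->new(2);   # at most two threads in the body at once
my ($inside, $max_inside) = (0, 0);

sub costly_function {
   $lock->down;                       # wait for one of the two slots
   $inside++;
   $max_inside = $inside if $inside > $max_inside;
   cede;                              # stand-in for an operation that blocks
   $inside--;
   $lock->up;                         # release the slot
}

my @workers;
for (1 .. 5) {
   push @workers, async { costly_function() };
}

$_->join for @workers;                # wait until all five threads are done
print "max concurrent: $max_inside\n";   # max concurrent: 2
```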
Why does the comment mention an "operation that blocks"? Again, that's because Coro's threads are
cooperative: unless "costly_function" willingly gives up the CPU, other threads of control will simply
not run. This makes locking superfluous in cases where the function itself never gives up the CPU, but
when dealing with the outside world, this is rare.
Now consider what happens when the code "die"s after executing "down", but before "up". This will leave
the semaphore in a locked state, which often isn't what you want - imagine the caller expecting a failure
and wrapping the call into an "eval {}".
So normally you would want to free the lock again if execution somehow leaves the function, whether
"normally" or via an exception. Here the "guard" method proves useful:
my $lock = new Coro::Semaphore; # unlocked initially
sub costly_function {
my $guard = $lock->guard; # acquire guard
# do costly operation that blocks
}
The "guard" method "down"s the semaphore and returns a so-called guard object. Nothing happens as long as
there are references to it (i.e. it is still in scope somehow), but when all references are gone, for
example when "costly_function" returns or throws an exception, it automatically calls "up" on the
semaphore, so there is no way to forget it. The guard object even ensures that the lock is freed when the
thread gets "cancel"ed by another thread.
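The following sketch checks the exception case. The function dies while holding the guard, and once "eval" has caught the exception, the semaphore count is back at 1. The error message is made up for the example.

```perl
use strict;
use warnings;
use Coro;
use Coro::Semaphore;

my $lock = Coro::Semaphore->new;    # count 1, i.e. unlocked

sub costly_function {
   my $guard = $lock->guard;        # "down" now, "up" when $guard goes away
   die "something went wrong\n";    # leave the function via an exception
}

eval { costly_function() };
print "caught: $@";                              # caught: something went wrong
print "count after: ", $lock->count, "\n";       # count after: 1
```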
This concludes this introduction to semaphores and locks. Apart from Coro::Semaphore and Coro::Signal,
there is also a reader-writer lock (Coro::RWLock) and a semaphore set (Coro::SemaphoreSet). All of these
come with their own manpage.
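As a taste of Coro::Signal, here is a minimal sketch of its "wait" and "send" methods. One thread blocks in "wait" until the main thread calls "send". The @events array is only there to record what happened, in order.

```perl
use strict;
use warnings;
use Coro;
use Coro::Signal;

my $signal = Coro::Signal->new;
my @events;

async {
   push @events, "waiting";
   $signal->wait;              # block until somebody sends the signal
   push @events, "signalled";
};

cede;                          # let the thread start and block in wait
push @events, "sending";
$signal->send;                 # wake up the waiting thread
cede;                          # give it a chance to run

print "@events\n";             # waiting sending signalled
```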
Channels
Semaphores are fine, but usually you want to communicate by exchanging data as well. Of course, you can
just use some locks and an array of sorts to communicate, but there is a useful abstraction for
communication between threads: Coro::Channel. Channels are the Coro equivalent of a Unix pipe (and
very similar to AmigaOS message ports :) - you can put stuff into one end and read it from the other.
Here is a simple example that creates a thread and sends numbers to it. The thread calculates the square
of each number and puts that into another channel, which the main thread reads the result from:
use Coro;
my $calculate = new Coro::Channel;
my $result = new Coro::Channel;
async {
# endless loop
while () {
my $num = $calculate->get; # read a number
$num **= 2; # square it
$result->put ($num); # put the result into the result queue
}
};
for (1, 2, 5, 10, 77) {
$calculate->put ($_);
print "$_ ** 2 = ", $result->get, "\n";
}
Gives:
1 ** 2 = 1
2 ** 2 = 4
5 ** 2 = 25
10 ** 2 = 100
77 ** 2 = 5929
Both "get" and "put" methods can block the current thread: "get" first checks whether there is some data
available, and if not, it blocks the current thread until some data arrives. "put" can also block, as each
Channel has a "maximum item capacity", i.e. you cannot store more than a specific number of items, which
can be configured when the Channel gets created.
In the above example, "put" never blocks, as the default capacity of a Channel is very high. So the for
loop first puts data into the channel, then tries to "get" the result. Since the async thread hasn't put
anything in there yet (on the first iteration it hasn't even run yet), the result Channel is still empty,
so the main thread blocks.
Since the only other runnable/ready thread at this point is the squaring thread, it will be woken up,
will "get" the number, square it and put it into the result channel, waking up the main thread again. It
will still continue to run, as waking up other threads just puts them into the ready queue, nothing less,
nothing more.
Only when the async thread tries to "get" the next number from the calculate channel will it block
(because nothing is there yet) and the main thread will continue running. And so on.
This illustrates a general principle used by Coro: a thread will only ever block when it has to. Neither
the Coro module itself nor any of its submodules will ever give up the CPU unless they have to, because
they are waiting for some event to happen.
Be careful, however: when multiple threads put numbers into $calculate and read from $result, they won't
know which result is theirs. The solution for this is to either use a semaphore, or send not just the
number, but also your own private result channel.
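Here is a sketch of the second approach. Each request is an array reference holding the number and the requester's own reply channel, so every client thread gets back exactly its own result. This request format is an assumption made for the example, not something Coro::Channel prescribes.

```perl
use strict;
use warnings;
use Coro;
use Coro::Channel;

my $calculate = Coro::Channel->new;

# the squaring server: each request is [number, private reply channel]
async {
   while () {
      my ($num, $reply) = @{ $calculate->get };
      $reply->put($num ** 2);           # answer only the thread that asked
   }
};

my %results;
my @clients = map {
   my $n = $_;
   async {
      my $reply = Coro::Channel->new;   # this client's private result channel
      $calculate->put([$n, $reply]);
      $results{$n} = $reply->get;
   };
} 1, 2, 5;

$_->join for @clients;                  # wait for all clients to finish
print "$_ ** 2 = $results{$_}\n" for sort { $a <=> $b } keys %results;
```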
What is mine, what is ours?
What, exactly, constitutes a thread? Obviously it contains the current point of execution. Less
obviously, it also has to include all lexical variables; that is, every thread has its own set of
lexical variables.
To see why this is necessary, consider this program:
use Coro;
sub printit {
my ($string) = @_;
cede;
print $string;
}
async { printit "Hello, " };
async { printit "World!\n" };
cede; cede; # do it
The above prints "Hello, World!\n". If "printit" didn't have its own per-thread $string variable, it
would probably print "World!\nWorld!\n", which is rather unexpected and would make it very difficult to
make good use of threads.
To make things run smoothly, there are quite a number of other things that are per-thread:
$_, @_, $@ and the regex result vars, $&, %+, $1, $2, ...
$_ is used much like a local variable, so it gets localised per-thread. The same is true for regex
results ($1, $2 and so on).
@_ contains the arguments, so like lexicals, it also must be per-thread.
$@ is not obviously required to be per-thread, but it is quite useful.
$/ and the default output file handle
Threads most often block when doing I/O. Since $/ is used when reading lines, it would be very
inconvenient if it were a shared variable, so it is per-thread.
The default output handle (see "select") is a difficult case: sometimes being global is preferable,
sometimes per-thread is preferable. Since per-thread seems to be more common, it is per-thread.
$SIG{__DIE__} and $SIG{__WARN__}
If these weren't per-thread, then common constructs such as:
eval {
local $SIG{__DIE__} = sub { ... };
...
};
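would not allow thread switching inside the "eval", as the localised handler would then also be
active in every other thread that runs in the meantime. Since exception handling is per-thread, those
variables should be per-thread as well.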
Lots of other esoteric stuff
For example, $^H is per-thread. Most of the additional per-thread state is not directly visible to
Perl, but required to make the interpreter work. You won't normally notice these.
Everything else is shared between all threads. For example, the globals $a and $b are shared. When does
that matter? When using "sort", these variables become special, and therefore, switching threads when
sorting might have surprising results.
Other examples are $! (errno), $. (the current input line number), $,, $\, $" and many other special
variables.
While in some cases a good argument could be made for localising them to the thread, they are rarely
used, and sometimes hard to localise.
Future versions of Coro might include more per-thread state when it becomes a problem.
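Here is a small sketch of the difference between per-thread and shared state, using a made-up package variable $shared. The thread assigns to both $_ and $shared, but afterwards only the change to the shared global is visible in the main thread.

```perl
use strict;
use warnings;
use Coro;

our $shared = "main";     # package globals are shared by all threads
$_ = "main";              # $_ is per-thread

async {
   $_      = "async";     # changes only this thread's $_
   $shared = "async";     # changes the one and only $shared
};

cede;                     # let the thread run to completion

print "\$_ is '$_', \$shared is '$shared'\n";   # $_ is 'main', $shared is 'async'
```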
Debugging
Sometimes it can be useful to find out what each thread is doing (or which threads exist in the first
place). The Coro::Debug module has (among other goodies) a function that allows you to print a "ps"-like
listing; you have seen it in action earlier when Coro detected a deadlock.
You use it like this:
use Coro::Debug;
Coro::Debug::command "ps";
Remember the example with the two channels and a worker thread that squared numbers? Running "ps" just
after "$calculate->get" outputs something similar to this:
PID SC RSS USES Description Where
8917312 -C 22k 0 [main::] [introscript:20]
8964448 N- 152 0 [coro manager] -
8964520 N- 152 0 [unblock_sub scheduler] -
8591752 UC 152 1 [introscript:12]
11546944 N- 152 0 [EV idle process] -
Interesting - there is more going on in the background than one would expect. Ignoring the extra threads,
the main thread has pid 8917312, and the one started by "async" has pid 8591752.
The latter is also the only thread that doesn't have a description, simply because we haven't set one.
Setting one is easy: just put it into "$Coro::current->{desc}":
async {
$Coro::current->{desc} = "cruncher";
...
};
This can be rather useful when debugging a program, or when using the interactive debug shell of
Coro::Debug.
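Here is a sketch that combines the two: it names a worker thread and then prints the "ps" listing. The exact columns and values vary between systems and Coro versions, so no output is shown. The worker deliberately calls "schedule" without being readied again, so it sleeps and stays in the listing.

```perl
use strict;
use warnings;
use Coro;
use Coro::Debug;

my $worker = async {
   $Coro::current->{desc} = "cruncher";   # appears in the Description column
   schedule;                              # sleep, so the thread stays listed
};

cede;                                     # let the worker run and name itself

Coro::Debug::command "ps";                # the worker now shows up as "cruncher"
```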