python-parallel-system-tools-pp4e

Parallel system tools in Programming Python, 4th Edition cover forks, threads, pipes, signals, sockets, and other launching techniques.

Forking Processes

1.fork

In [1]:
import os
In [2]:
def child():
    # runs in the forked copy: report this child's pid, then terminate
    # the process immediately so control never returns to the parent loop
    print "hello from child", os.getpid()
    os._exit(0) # else goes back to parent loop
In [3]:
def parent():
    # fork one new child per iteration until the user types 'q'
    while True:
        # os.fork generates a copy of the calling program;
        # it returns a different value in each copy:
        # zero in the child process and
        # the process ID of the new child in the parent.
        newpid = os.fork() 
        if newpid == 0:
            child()                 # child branch: prints pid and exits
        else:
            print "hello from parent", os.getpid(), newpid
        if raw_input() == 'q': break    # only the parent reaches here
In [8]:
parent()
hello from parent 8118 8360

hello from parent 8118 8366
hello from child 8360

hello from parent 8118 8373
hello from child 8366
q
hello from child 8373

2.fork-count

In [15]:
import os,time
In [27]:
def counter(count):
    # print `count` progress lines, pausing 3 seconds between each,
    # tagging every line with the running process's pid
    for i in range(count):
        time.sleep(3)
        print "[%s] => %s" % (os.getpid(), i)
In [30]:
# spawn 5 child processes; each runs counter(5) concurrently and then
# exits, while the parent prints one message per spawn and exits at once
for i in range(5):
    pid = os.fork()
    if pid != 0:
        print "Process %d spawned" % pid    # parent: fork returned child pid
    else:
        counter(5)                          # child: count, then terminate
        os._exit(0)
print "Main process exiting."               # parent does not wait for children
Process 9008 spawned
Process 9009 spawned
Process 9010 spawned
Process 9011 spawned
Process 9012 spawned
Main process exiting.
[8979] => 0
[8978] => 0
[8980] => 0
[8981] => 0
[8982] => 0
[8979] => 1[8978] => 1[8980] => 1[8981] => 1[8982] => 1




3.The fork/exec Combination

In [32]:
import os
In [34]:
# fork/exec combo: each forked child overlays itself with a new python
# program (child.py), passing an incrementing counter as a cmdline arg
parm = 0
while True:
    parm += 1
    pid = os.fork()
    if pid == 0:
        # replace this child process with child.py; execlp searches PATH
        os.execlp('python','python','child.py',str(parm))
        assert False, 'error starting program'  # reached only if exec failed
    else:
        print "Child is", pid
        if raw_input() == 'q': break            # parent: stop on 'q'
Child is 9654

Child is 9655

Child is 9656

Child is 9657
q

Threads

1.The thread Module

1.1Basic usage

In [36]:
import thread
In [37]:
def child(tid):
    # runs in a spawned thread: just report the id passed by the spawner
    print 'Hello from thread', tid
In [38]:
def parent():
    # spawn one child thread per iteration until the user types 'q';
    # each thread gets a unique integer id via the args tuple
    i = 0
    while True:
        i += 1
        thread.start_new_thread(child, (i,))
        if raw_input() == 'q': break
In [39]:
parent()
Hello from thread 1

Hello from thread 2
q
Hello from thread 3

1.2 Other ways to code threads with thread

In [40]:
import thread
In [42]:
def action(i):
    print i**32
In [43]:
class Power:
    # state holder: remembers i so the bound method obj.action can be
    # passed to thread.start_new_thread with an empty args tuple
    def __init__(self,i):
        self.i = i
    def action(self):
        print self.i**32
In [51]:
thread.start_new_thread(action,(2,))
Out[51]:
140621501368064
In [52]:
thread.start_new_thread(lambda: action(2), ())
4294967296
Out[52]:
140621501368064
In [56]:
obj = Power(2)
In [57]:
thread.start_new_thread(obj.action,())
Out[57]:
140621501368064

1.3 Running multiple threads

In [59]:
import thread,time
In [63]:
def counter(myId, count):
    for i in range(count):
        time.sleep(0.1)
        print '[%s] => %s' % (myId, i)
In [48]:
for i in range(5):
    thread.start_new_thread(counter,(i,3))
In [49]:
time.sleep(2)
print 'Main threading exit.'
[2] => 0
[4] => 0
[1] => 0
[0] => 0
[3] => 0
[2] => 1
[4] => 1
[1] => 1
[0] => 1
[3] => 1
[2] => 2
[4] => 2
[1] => 2
[0] => 2
[3] => 2
Main threading exit.

1.4 Synchronizing access to shared objects and names

In [32]:
import thread,time
In [33]:
def counter(myId, count):
    # print `count` lines for thread `myId`; the global mutex makes each
    # print atomic so concurrent threads' output is never interleaved
    for i in range(count):
        time.sleep(0.1)
        mutex.acquire()
        print '[%s] => %s' % (myId, i)
        mutex.release()
In [45]:
mutex = thread.allocate_lock()
for i in range(5):
    thread.start_new_thread(counter,(i,3))
In [46]:
time.sleep(2)
print 'Main threading exit.'
[0] => 0
[1] => 0
[2] => 0
[3] => 0
[4] => 0
[0] => 1
[1] => 1
[2] => 1
[3] => 1
[4] => 1
[0] => 2
[1] => 2
[2] => 2
[3] => 2
[4] => 2
Main threading exit.

1.5 Waiting for spawned thread exits

In [19]:
# uses mutexes to know when threads are done in parent/main thread,
# instead of time.sleep; lock stdout to avoid comingled prints;
In [20]:
import thread
In [21]:
stdoutmutex = thread.allocate_lock()
In [22]:
# exitmutex = [thread.allocate_lock() for i in range(5)]
exitmutex = [False] * 5
In [23]:
def counter(myId, count):
    # print `count` lines under stdoutmutex so output isn't comingled,
    # then set this thread's slot in exitmutex to signal completion
    for i in range(count):
        time.sleep(0.1)
        stdoutmutex.acquire()
        print '[%s] => %s' % (myId, i)
        stdoutmutex.release()
#     exitmutex[myId].acquire() # signal main thread
    exitmutex[myId] = True
In [41]:
for i in range(5):
    thread.start_new_thread(counter,(i,3))
In [42]:
for mutex in exitmutex:
#     while not mutex.locked():pass
    while not True:pass
print 'Main threading exit.'
[0] => 0
[1] => 0
[2] => 0
[3] => 0
[4] => 0
[0] => 1
[1] => 1
[2] => 1
[3] => 1
[4] => 1
[0] => 2
[1] => 2
[2] => 2
[4] => 2
[3] => 2
Main threading exit.

2.The threading Module

2.1 thread-classes

In [57]:
import threading
In [66]:
class mythread(threading.Thread):          # subclass Thread object
    # thread subclass: per-thread state lives on the instance and the
    # work is coded in an overridden run(), invoked by start()
    def __init__(self, myId, count):
        self.myId = myId
        self.count = count
        threading.Thread.__init__(self)    # superclass init is required
    def run(self):
        # print count lines, one at a time, under the shared stdout lock
        for i in range(self.count):
            stdoutmutex.acquire()
            print '[%s] => %s' % (self.myId, i)
            stdoutmutex.release()
In [67]:
stdoutmutex = threading.Lock() # same as thread.allocate_lock( )
In [68]:
# start 5 thread objects, then join each one so the main thread does
# not exit until every child has finished (no sleep guessing needed)
threads = []
for i in range(5):
    thread = mythread(i, 3)
    thread.start()                  # run() executes in a new thread
    threads.append(thread)
for thread in threads:
    thread.join()                   # block until that thread terminates
print 'Main threading exit.'
[0] => 0
[0] => 1
[0] => 2
[1] => 0
[1] => 1
[1] => 2
[2] => 0
[2] => 1
[2] => 2
[3] => 0
[3] => 1
[3] => 2
[4] => 0
[4] => 1
[4] => 2
Main threading exit.

2.2 Other ways to code threads with threading

In [94]:
import threading
# subclass with state
class mythread(threading.Thread):
    # the instance attribute i carries per-thread state into run()
    def __init__(self, i):
        self.i = i
        threading.Thread.__init__(self)
    def run(self):
        print self.i**32
mythread(2).start( )
4294967296
4294967296
In [95]:
def action(i):
    print i**32
In [96]:
# pass action in
thread = threading.Thread(target=(lambda: action(i)))
thread.start( )
In [98]:
# same but no lambda wrapper for state
threading.Thread(target=action, args=(2,)).start( )

# basic thread module
import thread
thread.start_new_thread(action, (2,))
Out[98]:
140349501798144
In [99]:
# a non-thread class with state, OOP
class Power:
    def __init__(self, i):
        self.i = i
    def action(self):
        print(self.i ** 32)
obj = Power(2)
threading.Thread(target=obj.action).start() # thread runs bound method
4294967296
4294967296
In [101]:
# nested scope to retain state
def action(i):
    # power() closes over i, so action(2) returns a zero-argument
    # callable suitable as a thread target
    def power():
        print(i ** 32)
    return power
threading.Thread(target=action(2)).start() # thread runs returned function
# both with basic thread module
thread.start_new_thread(obj.action, ())
thread.start_new_thread(action(2), ()) # thread runs a callable object
Out[101]:
140349510190848

2.3 Synchronizing access to shared objects and names revisited

In [128]:
import threading
count = 0
def adder():
    # two unsynchronized += on a shared global: a data race in principle,
    # though CPython's GIL usually lets this come out as 200 anyway
    global count
    count += 1
#     time.sleep(0.5)
    count += 1
threads = []
for i in range(100):
    thread = threading.Thread(target=adder, args=())
    thread.start()
    threads.append(thread)
for thread in threads: thread.join()
print count
200
In [130]:
#prints 200 each time, because shared resource access synchronized
count = 0
def adder(addlock):
    # each += on the shared global runs under the passed-in lock, so the
    # read-modify-write cannot interleave across threads
    global count
    with addlock:
        count += 1
#     time.sleep(0.5)
    with addlock:
        count += 1
addlock = threading.Lock()
threads = []
for i in range(100):
    thread = threading.Thread(target=adder, args=(addlock,))
    thread.start()
    threads.append(thread)
for thread in threads: thread.join()
print count
200

The queue Module

In [131]:
#producer and consumer threads communicating with a shared queue
In [2]:
numconsumers = 2 # how many consumers to start
numproducers = 4  # how many producers to start
nummessages = 4  # messages per producer to put
In [3]:
import thread, Queue, time
safeprint = thread.allocate_lock()  # else print may overlap
dataQueue = Queue.Queue()        # shared global. infinite size

def producer(idnum):
    # put nummessages labeled strings on the shared queue, sleeping
    # idnum seconds between puts so producers finish at different times
    for msgnum in range(nummessages):
        time.sleep(idnum)
        dataQueue.put('[producer id=%d, count=%d]' % (idnum, msgnum))
def consumer(idnum):
    # poll the shared queue forever; get() is non-blocking, so an empty
    # queue just skips this cycle instead of blocking the thread
    while True:
        time.sleep(0.1)
        try:
            data = dataQueue.get(block=False)
        except Queue.Empty:
            pass
        else:
            safeprint.acquire( )
            print 'consumer', idnum, 'got =>', data
            safeprint.release( )

if __name__ == '__main__':
    for i in range(numconsumers):
        thread.start_new_thread(consumer, (i,))
    for i in range(numproducers):
        thread.start_new_thread(producer, (i,))
    # sleep long enough for the slowest producer to finish; threads
    # started with the thread module die when the main thread exits
    time.sleep(((numproducers-1) * nummessages) + 1)
    print('Main thread exit.')
consumer 0 got => [producer id=0, count=0]
consumer 1 got => [producer id=0, count=1]
consumer 0 got => [producer id=0, count=2]
consumer 1 got => [producer id=0, count=3]
consumer 0 got => [producer id=1, count=0]
consumer 0 got => [producer id=2, count=0]
consumer 1 got => [producer id=1, count=1]
consumer 0 got => [producer id=3, count=0]
consumer 1 got => [producer id=1, count=2]
consumer 0 got => [producer id=2, count=1]
consumer 1 got => [producer id=1, count=3]
consumer 0 got => [producer id=2, count=2]
consumer 1 got => [producer id=3, count=1]
consumer 0 got => [producer id=2, count=3]
consumer 0 got => [producer id=3, count=2]
consumer 0 got => [producer id=3, count=3]
Main thread exit.
In [10]:
#same as queuetest.py, by queue object pass in as argument, not global
numconsumers = 2                  # how many consumers to start
numproducers = 4                  # how many producers to start
nummessages  = 4                  # messages per producer to put

import thread, Queue, time
safeprint = thread.allocate_lock()    # else prints may overlap
dataQueue = Queue.Queue()             # shared global, infinite size

def producer(idnum, dataqueue):
    # put nummessages labeled strings on the queue passed in as an arg
    for msgnum in range(nummessages):
        time.sleep(idnum)
        dataqueue.put('[producer id=%d, count=%d]' % (idnum, msgnum))

def consumer(idnum, dataqueue):
    # poll the passed-in queue; print items under the shared print lock
    while True:
        time.sleep(0.1)
        try:
            data = dataqueue.get(block=False)
        except Queue.Empty:
            pass
        else:
            with safeprint:
                print 'consumer', idnum, 'got =>', data

if __name__ == '__main__':
    for i in range(numconsumers):
        thread.start_new_thread(consumer, (i, dataQueue))
    for i in range(numproducers):
        thread.start_new_thread(producer, (i, dataQueue))
    time.sleep(((numproducers-1) * nummessages) + 1)
    print 'Main thread exit.'
consumer 1 got => [producer id=0, count=0]
consumer 0 got => [producer id=0, count=1]
consumer 1 got => [producer id=0, count=2]
consumer 0 got => [producer id=0, count=3]
consumer 0 got => [producer id=1, count=0]
consumer 1 got => [producer id=2, count=0]
consumer 0 got => [producer id=1, count=1]
consumer 1 got => [producer id=3, count=0]
consumer 0 got => [producer id=1, count=2]
consumer 1 got => [producer id=2, count=1]
consumer 1 got => [producer id=1, count=3]
consumer 1 got => [producer id=3, count=1]
consumer 1 got => [producer id=2, count=2]
consumer 1 got => [producer id=2, count=3]
consumer 1 got => [producer id=3, count=2]
consumer 1 got => [producer id=3, count=3]
Main thread exit.
In [12]:
#same as queuetest2.py, but uses threading instead of the basic thread module
numconsumers = 2                  # how many consumers to start
numproducers = 4                  # how many producers to start
nummessages  = 4                  # messages per producer to put

import threading, Queue, time
safeprint = threading.Lock()          # else prints may overlap
dataQueue = Queue.Queue()             # shared global, infinite size

def producer(idnum, dataqueue):
    # put nummessages labeled strings on the queue passed in as an arg
    for msgnum in range(nummessages):
        time.sleep(idnum)
        dataqueue.put('[producer id=%d, count=%d]' % (idnum, msgnum))

def consumer(idnum, dataqueue):
    # poll the queue forever; this loop never returns, hence the
    # consumers are made daemon threads below
    while True:
        time.sleep(0.1)
        try:
            data = dataqueue.get(block=False)
        except Queue.Empty:
            pass
        else:
            with safeprint:
                print 'consumer', idnum, 'got =>', data

if __name__ == '__main__':
    for i in range(numconsumers):
        thread = threading.Thread(target=consumer, args=(i, dataQueue))
        thread.daemon = True  # else cannot exit!
        thread.start()

    waitfor = []
    for i in range(numproducers):
        thread = threading.Thread(target=producer, args=(i, dataQueue))
        waitfor.append(thread)
        thread.start()

    for thread in waitfor: thread.join()    # or time.sleep() long enough here
    print 'Main thread exit.'
consumer 0 got => [producer id=3, count=3]
consumer 1 got => [producer id=0, count=0]
consumer 0 got => [producer id=0, count=1]
consumer 0 got => [producer id=0, count=2]
consumer 1 got => [producer id=0, count=3]
consumer 1 got => [producer id=1, count=0]
consumer 0 got => [producer id=1, count=1]
consumer 1 got => [producer id=2, count=0]
consumer 0 got => [producer id=1, count=2]
consumer 1 got => [producer id=3, count=0]
consumer 0 got => [producer id=1, count=3]
consumer 0 got => [producer id=2, count=1]
consumer 0 got => [producer id=2, count=2]
consumer 0 got => [producer id=3, count=1]
consumer 0 got => [producer id=2, count=3]
consumer 0 got => [producer id=3, count=2]
Main thread exit.

Program Exits

1.Process Exit Status and Shared State

In [2]:
import os
In [3]:
exitstat = 0
In [7]:
def child():
    # bump the (per-process copy of the) global, then exit with it as
    # the status code; the parent retrieves it via os.wait (status >> 8)
    global exitstat
    exitstat += 1
    print 'Hello from child', os.getpid(), exitstat
    os._exit(exitstat)
    print 'never reached'       # os._exit never returns
In [12]:
def parent():
    # fork children one at a time; os.wait blocks until a child exits
    # and returns (pid, status) with the exit code in the high byte
    while True:
        newpid = os.fork()
        if newpid == 0:
            child()
        else:
            pid, status = os.wait()
            print 'Parent got', pid, status, (status >> 8)
            if raw_input() == 'q': break
In [13]:
parent()
Parent got 4367 256 1
Hello from child 4367 1

Parent got 4373 256 1
Hello from child 4373 1

Parent got 4379 256 1
Hello from child 4379 1

Parent got 4385 256 1
Hello from child 4385 1

Parent got 4391 256 1
Hello from child 4391 1
q

2. Thread Exits and Shared State

In [20]:
import thread
In [21]:
exitstat = 0
In [22]:
def child():
    # threads share one global: exitstat keeps growing across children,
    # unlike the independent per-process copies seen with os.fork
    global exitstat
    exitstat += 1
    threadid = thread.get_ident()
    print 'Hello from child', threadid, exitstat
    thread.exit()               # raises SystemExit in this thread only
    print 'never reached'
def parent():
    # spawn one child thread per iteration until 'q' is entered
    while True:
        thread.start_new_thread(child, ())
        if raw_input() == 'q': break
if __name__ == '__main__': parent()
Hello from child 140613521495808 1

Hello from child 140613521495808 2
q
Hello from child 140613521495808 3
In [1]:
# As we’ve learned, a thread normally exits silently when the function it runs returns,
# and the function return value is ignored. Optionally, the _thread.exit function can be
# called to terminate the calling thread explicitly and silently. This call works almost
# exactly like sys.exit (but takes no return status argument), and it works by raising a
# SystemExit exception in the calling thread. Because of that, a thread can also prema-
# turely end by calling sys.exit or by directly raising SystemExit . Be sure not to call
# os._exit within a thread function, though—doing so can have odd results (the last time
# I tried, it hung the entire process on my Linux system and killed every thread in the
# process on Windows!).
In [2]:
# keep in mind that threads and processes have default lifespan models,
# which we explored earlier. By way of review, when child threads are still running, the
# two thread modules’ behavior differs—programs on most platforms exit when the pa-
# rent thread does under _thread , but not normally under threading unless children are
# made daemons. When using processes, children normally outlive their parent. This
# different process behavior makes sense if you remember that threads are in-process
# function calls, but processes are more independent and autonomous.
# When used well, exit status can be used to implement error detection and simple com-
# munication protocols in systems composed of command-line scripts. But having said
# that, I should underscore that most scripts do simply fall off the end of the source to
# exit, and most thread functions simply return; explicit exit calls are generally employed
# for exceptional conditions and in limited contexts only.

Interprocess Communication

1. Anonymous pipe

1.1 Anonymous pipe basics

In [28]:
import os, time
In [29]:
def child(pipeout):
    # writer: send one message into the pipe every zzz seconds
    zzz = 0
    while True:
        time.sleep(zzz)                     # make parent wait
        msg = ('Spam %03d' % zzz).encode()  # pipes are binary bytes
        os.write(pipeout, msg)              # send to parent
        zzz = (zzz+1) % 5                   # go back to 0 after 4
In [32]:
def parent():
    pipein,pipeout = os.pipe()
    if os.fork == 0:
        child(pipeout)
    else:
        while True:
            line = os.read(pipein, 32)
            # blocks until data sent
            print 'Parent %d got [%s] at %s' % (os.getpid(), line, time.time())
In [34]:
parent()

1.2 Wrapping pipe descriptors in file objects

In [39]:
# same as pipe1.py, but wrap pipe input in stdio file object
# to read by line, and close unused pipe fds in both processes
import os, time
def child(pipeout):
    # writer: send one newline-terminated message every zzz seconds
    zzz = 0
    while True:
        time.sleep(zzz)
        msg = ('Spam %03d\n' % zzz).encode()    # '\n' delimits messages
        os.write(pipeout, msg)
        zzz = (zzz+1) % 5
def parent():
    pipein, pipeout = os.pipe()
    if os.fork() == 0:
        os.close(pipein)        # child only writes: close its read end
        child(pipeout)
    else:
        os.close(pipeout)       # parent only reads: close its write end
        pipein = os.fdopen(pipein)      # wrap fd in file object for readline
        while True:
            line = pipein.readline()[:-1]   # blocks until a full line arrives
            print('Parent %d got [%s] at %s' % (os.getpid(), line, time.time()))
In [38]:
parent()

1.3 Anonymous pipes and threads

In [1]:
import os, time, threading
In [2]:
def child(pipeout):
    # writer thread: the same pipe fd works across threads in one process
    zzz = 0
    while True:
        time.sleep(zzz)
        msg = ("Spam %03d" % zzz).encode()
        os.write(pipeout, msg)
        zzz = (zzz +1) % 5
In [3]:
def parent(pipein):
    # read up to 32 bytes per call; messages have no delimiter, so
    # several may arrive batched in one read (visible in output below)
    while True:
        line = os.read(pipein, 32)
        print 'Parent %d got [%s] at %s' % (os.getpid(), line, time.time())
In [4]:
pipein ,pipeout = os.pipe()
In [5]:
threading.Thread(target=child, args=(pipeout,)).start()
In [7]:
parent(pipein)
Parent 3257 got [Spam 003Spam 004Spam 000Spam 001] at 1400549465.95
Parent 3257 got [Spam 002Spam 003Spam 004Spam 000] at 1400549465.95
Parent 3257 got [Spam 001Spam 002Spam 003] at 1400549465.95
Parent 3257 got [Spam 004] at 1400549468.75
Parent 3257 got [Spam 000] at 1400549468.76
Parent 3257 got [Spam 001] at 1400549469.76
Parent 3257 got [Spam 002] at 1400549471.76
Parent 3257 got [Spam 003] at 1400549474.76
---------------------------------------------------------------------------
KeyboardInterrupt                         Traceback (most recent call last)
<ipython-input-7-a47c722744b8> in <module>()
----> 1 parent(pipein)

<ipython-input-3-218f40ee8ea0> in parent(pipein)
      1 def parent(pipein):
      2     while True:
----> 3         line = os.read(pipein, 32)
      4         print 'Parent %d got [%s] at %s' % (os.getpid(), line, time.time())

KeyboardInterrupt: 

1.4 Bidirectional IPC with anonymous pipes

In [9]:
# spawn a child process/program, connect my stdin/stdout to child process's
# stdout/stdin--my reads and writes map to output and input streams of the
# spawned program; much like tying together streams with subprocess module;
In [17]:
import sys, os
In [19]:
def spawn(prog, *args):                       # pass progname, cmdline args
    # Fork a child running `prog`, with two anonymous pipes wired so the
    # parent's stdin/stdout are cross-connected to the child's
    # stdout/stdin; stderr stays untouched in both processes.
    stdinFd  = sys.stdin.fileno()             # get descriptors for streams
    stdoutFd = sys.stdout.fileno()            # normally stdin=0, stdout=1

    parentStdin, childStdout  = os.pipe()     # make two IPC pipe channels
    childStdin,  parentStdout = os.pipe()     # pipe returns (inputfd, outputfd)
    pid = os.fork()                           # make a copy of this process
    if pid:
        os.close(childStdout)                 # in parent process after fork:
        os.close(childStdin)                  # close child ends in parent
        os.dup2(parentStdin,  stdinFd)        # my sys.stdin copy  = pipe1[0]
        os.dup2(parentStdout, stdoutFd)       # my sys.stdout copy = pipe2[1]
    else:
        os.close(parentStdin)                 # in child process after fork:
        os.close(parentStdout)                # close parent ends in child
        os.dup2(childStdin,  stdinFd)         # my sys.stdin copy  = pipe2[0]
        os.dup2(childStdout, stdoutFd)        # my sys.stdout copy = pipe1[1]
        args = (prog,) + args
        os.execvp(prog, args)                 # new program in this process
        assert False, 'execvp failed!'        # os.exec call never returns here

if __name__ == '__main__':
    mypid = os.getpid()
    spawn('python', '-u', 'pipes-testchild.py', 'spam')     # fork child program

    print('Hello 1 from parent', mypid)               # to child's stdin
    sys.stdout.flush()                                # subvert stdio buffering
    reply = input()                                   # from child's stdout
    sys.stderr.write('Parent got: "%s"\n' % reply)    # stderr not tied to pipe!

    print('Hello 2 from parent', mypid)
    sys.stdout.flush()
    reply = sys.stdin.readline()
    sys.stderr.write('Parent got: "%s"\n' % reply[:-1])
---------------------------------------------------------------------------
UnsupportedOperation                      Traceback (most recent call last)
<ipython-input-19-8b7123926819> in <module>()
     22 if __name__ == '__main__':
     23     mypid = os.getpid()
---> 24     spawn('python', '-u', 'pipes-testchild.py', 'spam')     # fork child program
     25 
     26     print('Hello 1 from parent', mypid)               # to child's stdin

<ipython-input-19-8b7123926819> in spawn(prog, *args)
      1 def spawn(prog, *args):                       # pass progname, cmdline args
      2     stdinFd  = sys.stdin.fileno()             # get descriptors for streams
----> 3     stdoutFd = sys.stdout.fileno()            # normally stdin=0, stdout=1
      4 
      5     parentStdin, childStdout  = os.pipe()     # make two IPC pipe channels

/usr/local/lib/python2.7/dist-packages/IPython/kernel/zmq/iostream.pyc in fileno(self)
    185 
    186     def fileno(self):
--> 187         raise UnsupportedOperation("IOStream has no fileno.")
    188 
    189     def write(self, string):

UnsupportedOperation: IOStream has no fileno.
In [14]:
import os, time, sys
mypid     = os.getpid()
parentpid = os.getppid()
# stderr is not connected to the pipes, so this shows on the console
sys.stderr.write('Child %d of %d got arg: "%s"\n' % (mypid, parentpid, sys.argv[1]))
for i in range(2):
    time.sleep(3)              # make parent process wait by sleeping here
    recv = raw_input()             # stdin tied to pipe: comes from parent's stdout
    time.sleep(3)
    send = 'Child %d got: [%s]' % (mypid, recv)
    print send                # stdout tied to pipe: goes to parent's stdin
    sys.stdout.flush()         # make sure it's sent now or else process blocks
Child 3257 of 2729 got arg: "-f"
ss
Child 3257 got: [ss]
rt
Child 3257 got: [rt]

1.5 Output stream buffering revisited: Deadlocks and flushes

In [20]:
# Deadlock in general, though, is a bigger problem than we have space to address fully
# here. On the other hand, if you know enough that you want to do IPC in Python, you’re
# probably already a veteran of the deadlock wars.

2 Named Pipes (Fifos)

In [23]:
# named pipes; os.mkfifo is not available on Windows (without Cygwin); 
# there is no reason to fork here, since fifo file pipes are external 
# to processes--shared fds in parent/child processes are irrelevant;
In [24]:
import os, time, sys
fifoname = '/tmp/pipefifo'                       # must open same name

def child():
    # writer process: opening the fifo write-only blocks until a reader
    # opens it, then send a message every zzz seconds
    pipeout = os.open(fifoname, os.O_WRONLY)     # open fifo pipe file as fd
    zzz = 0
    while True:
        time.sleep(zzz)
        msg = ('Spam %03d\n' % zzz).encode()     # binary as opened here
        os.write(pipeout, msg)
        zzz = (zzz+1) % 5

def parent():
    # reader process: open the fifo as a text file and read line by line
    pipein = open(fifoname, 'r')                 # open fifo as text file object
    while True:
        line = pipein.readline()[:-1]            # blocks until data sent
        print('Parent %d got "%s" at %s' % (os.getpid(), line, time.time()))
In [26]:
if __name__ == '__main__':
    # make the fifo file once, then pick a role from the command line:
    # no arguments => reader (parent); any argument => writer (child)
    if not os.path.exists(fifoname):
        os.mkfifo(fifoname)                      # create a named pipe file
    if len(sys.argv) == 1:
        parent()                                 # run as parent if no args
    else:                                        # else run as child process
        child()

3 Sockets

In [ ]:
# sockets for cross-task communication: start threads to communicate over sockets;
# independent programs can too, because sockets are system-wide, much like fifos;
# see the GUI and Internet parts of the book for more realistic socket use cases;
# some socket servers may also need to talk to clients in threads or processes;
# sockets pass byte strings, but can be pickled objects or encoded Unicode text;
# caveat: prints in threads may need to be synchronized if their output overlaps;
In [27]:
from socket import socket, AF_INET, SOCK_STREAM     # portable socket api

port = 50008                 # port number identifies socket on machine
host = 'localhost'           # server and client run on same local machine here

def server():
    # accept clients forever; echo each client's message back in a reply
    sock = socket(AF_INET, SOCK_STREAM)         # ip addresses tcp connection
    sock.bind(('', port))                       # bind to port on this machine
    sock.listen(5)                              # allow up to 5 pending clients
    while True:
        conn, addr = sock.accept()              # wait for client to connect
        data = conn.recv(1024)                  # read bytes data from this client
        reply = 'server got: [%s]' % data       # conn is a new connected socket
        conn.send(reply.encode())               # send bytes reply back to client

def client(name):
    # one-shot client: send name, print the server's echoed reply
    sock = socket(AF_INET, SOCK_STREAM)
    sock.connect((host, port))                  # connect to a socket port
    sock.send(name.encode())                    # send bytes data to listener
    reply = sock.recv(1024)                     # receive bytes data from listener
    sock.close()                                # up to 1024 bytes in message
    print('client got: [%s]' % reply)
In [29]:
if __name__ == '__main__':
    # run the server in a daemon thread so the process can exit even
    # though server() loops forever; clients run in ordinary threads
    from threading import Thread
    sthread = Thread(target=server)
    sthread.daemon = True                       # don't wait for server thread
    sthread.start()                             # do wait for children to exit
    for i in range(5): 
         Thread(target=client, args=('client%s' % i,)).start()
client got: [server got: [client4]]

4 Signals

In [31]:
# # catch signals in Python; pass signal number N as a command-line arg,
# # use a "kill -N pid" shell command to send this process a signal;  most
# # signal handlers restored by Python after caught (see network scripting
# # chapter for SIGCHLD details); on Windows, signal module is available,
# # but it defines only a few signal types there, and os.kill is missing;

import sys, signal, time
def now(): return time.ctime(time.time())        # current time string

def onSignal(signum, stackframe):                # python signal handler
    print('Got signal', signum, 'at', now())     # most handlers stay in effect

signum = int(sys.argv[1])
signal.signal(signum, onSignal)                  # install signal handler
while True: signal.pause()                       # wait for signals (or: pass)
In [32]:
# set and catch alarm timeout signals in Python; time.sleep doesn't play
# well with alarm (or signal in general in my Linux PC), so we call
# signal.pause here to do nothing until a signal is received;

import sys, signal, time
def now(): return time.asctime()

def onSignal(signum, stackframe):                 # python signal handler
    print('Got alarm', signum, 'at', now())       # most handlers stay in effect

while True:
    print('Setting at', now())
    signal.signal(signal.SIGALRM, onSignal)       # install signal handler
    signal.alarm(5)                               # do signal in 5 seconds
    signal.pause()                                # wait for signals
('Setting at', 'Tue May 20 15:58:01 2014')
('Got alarm', 14, 'at', 'Tue May 20 15:58:06 2014')
('Setting at', 'Tue May 20 15:58:06 2014')
('Got alarm', 14, 'at', 'Tue May 20 15:58:11 2014')
('Setting at', 'Tue May 20 15:58:11 2014')
('Got alarm', 14, 'at', 'Tue May 20 15:58:16 2014')
('Setting at', 'Tue May 20 15:58:16 2014')
('Got alarm', 14, 'at', 'Tue May 20 15:58:21 2014')
('Setting at', 'Tue May 20 15:58:21 2014')
('Got alarm', 14, 'at', 'Tue May 20 15:58:26 2014')
('Setting at', 'Tue May 20 15:58:26 2014')
('Got alarm', 14, 'at', 'Tue May 20 15:58:31 2014')
('Setting at', 'Tue May 20 15:58:31 2014')
('Got alarm', 14, 'at', 'Tue May 20 15:58:36 2014')
('Setting at', 'Tue May 20 15:58:36 2014')
('Got alarm', 14, 'at', 'Tue May 20 15:58:41 2014')
('Setting at', 'Tue May 20 15:58:41 2014')
('Got alarm', 14, 'at', 'Tue May 20 15:58:46 2014')
('Setting at', 'Tue May 20 15:58:46 2014')
('Got alarm', 14, 'at', 'Tue May 20 15:58:51 2014')
('Setting at', 'Tue May 20 15:58:51 2014')
('Got alarm', 14, 'at', 'Tue May 20 15:58:56 2014')
('Setting at', 'Tue May 20 15:58:56 2014')
('Got alarm', 14, 'at', 'Tue May 20 15:59:01 2014')
('Setting at', 'Tue May 20 15:59:01 2014')
---------------------------------------------------------------------------
KeyboardInterrupt                         Traceback (most recent call last)
<ipython-input-32-120c29997daa> in <module>()
     13     signal.signal(signal.SIGALRM, onSignal)       # install signal handler
     14     signal.alarm(5)                               # do signal in 5 seconds
---> 15     signal.pause()                                # wait for signals

KeyboardInterrupt: 

The multiprocessing Module

1 The Basics: Processes and Locks

In [33]:
"""
multiprocess basics: Process works like threading.Thread, but 
runs function call in parallel in a process instead of a thread;
locks can be used to synchronize, e.g. prints on some platforms;
starts new interpreter on windows, forks a new process on unix;
"""

import os
from multiprocessing import Process, Lock

def whoami(label, lock):
    # report which process is running this call; the lock keeps the
    # print atomic across the spawned processes
    msg = '%s: name:%s, pid:%s'
    with lock:
        print(msg % (label, __name__, os.getpid()))

if __name__ == '__main__':
    lock = Lock()
    whoami('function call', lock)       # plain call in the parent process

    p = Process(target=whoami, args=('spawned child', lock))
    p.start()
    p.join()                            # wait for this child to finish

    # fire-and-forget: these five children are not joined here
    for i in range(5):
        Process(target=whoami, args=(('run process %s' % i), lock)).start()

    with lock:
        print('Main process exit.')
('Got alarm', 14, 'at', 'Tue May 20 15:59:06 2014')
function call: name:__main__, pid:3257
spawned child: name:__main__, pid:5196
Main process exit.
run process 0: name:__main__, pid:5204
run process 1: name:__main__, pid:5205
run process 2: name:__main__, pid:5206

2 IPC Tools: Pipes, Shared Memory, and Queues

2.1 multiprocessing pipes

In [ ]:
# Use multiprocess anonymous pipes to communicate. Returns 2 connection
# object representing ends of the pipe: objects are sent on one end and
# received on the other, though pipes are bidirectional by default

from multiprocessing import Process, Pipe

def sender(pipe):
    pipe.send(['spam'] + [42, 'eggs'])
    pipe.close()

def talker(pipe):
    pipe.send(dict(name='bob', spam=42))
    reply = pipe.recv()
    print 'talk got:', reply

if __name__ == '__main__':
    parentEnd, childEnd = Pipe()
    Process(target=sender,args=(childEnd,)).start()
    print 'parent got:', parentEnd.recv()
    parentEnd.close()

    parentEnd, childEnd = Pipe()
    child = Process(target=talker,args=(parentEnd,))
    child.start()
    print 'parent got:', parentEnd.recv()
    parentEnd.send([x *2 for x in 'spam'])
    child.join()
    print 'parent exit'

2.2 Shared memory and globals

In [37]:
"""
Use multiprocess shared memory objects to communicate.
Passed objects are shared, but globals are not on Windows.
Last test here reflects common use case: distributing work.
"""

import os
from multiprocessing import Process, Value, Array

procs = 3    # number of child processes to spawn
count = 0    # module-level global: every process has its own private copy

def showdata(label, val, arr):
    """
    Print this process's view of the per-process global counter and of
    the shared Value/Array objects passed in.
    """
    fields = (label, os.getpid(), count, val.value, list(arr))
    print('%-12s: pid:%4s, global:%s, value:%s, array:%s' % fields)

def updater(val, arr):
    """
    Bump the global counter (private to the calling process) and the
    shared-memory objects (visible to every process holding them).
    """
    global count
    count += 1                   # not shared: change stays in this process
    val.value += 1               # shared scalar: update visible to all holders
    for slot in (0, 1, 2):       # bump each shared array slot
        arr[slot] += 1

In [38]:
if __name__ == '__main__':
    scalar = Value('i', 0)             # shared memory: process/thread safe
    vector = Array('d', procs)         # type codes from ctypes: int, double

    # show start values in the parent process
    showdata('parent start', scalar, vector)

    # spawn one child, passing in the shared-memory objects
    p = Process(target=showdata, args=('child ', scalar, vector))
    p.start(); p.join()

    # update shared memory in the parent, waiting for each child to finish;
    # each child sees the parent's updates so far for the passed-in args
    # (but not for the global, which is per-process)

    print('\nloop1 (updates in parent, serial children)...')
    for i in range(procs):
        count += 1
        scalar.value += 1
        vector[i] += 1
        p = Process(target=showdata, args=(('process %s' % i), scalar, vector))
        p.start(); p.join()

    # same as prior, but let the children run in parallel;
    # all see the last iteration's result because all share the objects

    print('\nloop2 (updates in parent, parallel children)...')
    ps = []
    for i in range(procs):
        count += 1
        scalar.value += 1
        vector[i] += 1
        p = Process(target=showdata, args=(('process %s' % i), scalar, vector))
        p.start()
        ps.append(p)
    for p in ps: p.join()

    # shared memory updated in the spawned children this time; wait for each

    print('\nloop3 (updates in serial children)...')
    for i in range(procs):
        p = Process(target=updater, args=(scalar, vector))
        p.start()
        p.join()
    showdata('parent temp', scalar, vector)

    # same, but let the children update in parallel

    ps = []
    print('\nloop4 (updates in parallel children)...')
    for i in range(procs):
        p = Process(target=updater, args=(scalar, vector))
        p.start()
        ps.append(p)
    for p in ps: p.join()

    # final tallies: the global count reflects the 6 parent bumps only;
    # scalar = 12 (+6 in parent loops, +6 across the 6 updater children);
    # each array slot = 8 (+2 in parent loops, +6 across the 6 children)
    showdata('parent end', scalar, vector)
parent start: pid:3257, global:0, value:0, array:[0.0, 0.0, 0.0]

loop1 (updates in parent, serial children)...
child       : pid:5361, global:0, value:0, array:[0.0, 0.0, 0.0]
process 0   : pid:5369, global:1, value:1, array:[1.0, 0.0, 0.0]
process 1   : pid:5377, global:2, value:2, array:[1.0, 1.0, 0.0]

loop2 (updates in parent, parallel children)...
process 2   : pid:5385, global:3, value:3, array:[1.0, 1.0, 1.0]

loop3 (updates in serial children)...
process 0   : pid:5393, global:4, value:6, array:[2.0, 2.0, 2.0]
process 2   : pid:5397, global:6, value:6, array:[2.0, 2.0, 2.0]
process 1   : pid:5394, global:5, value:6, array:[2.0, 2.0, 2.0]
parent temp : pid:3257, global:6, value:9, array:[5.0, 5.0, 5.0]

loop4 (updates in parallel children)...
parent end  : pid:3257, global:6, value:12, array:[8.0, 8.0, 8.0]

2.3 Queues and subclassing

In [46]:
import os, time, Queue
from multiprocessing import Process, Queue as queue
class Counter(Process):
    """docstring for Counter"""
    label = ' @'
    def __init__(self, start, Queue):
        self.state = start
        self.post  = Queue
        Process.__init__(self)
    def run(self):
        for i in range(3):
            time.sleep(3)
            self.state += 1
            print self.label, self.pid, self.state
            self.post.put(self.pid, self.state)
        print self.label, self.pid, '-'

if __name__ == '__main__':
    print 'start', os.getpid()
    expected = 9                                # 3 children x 3 posts each
    post = queue()                              # shared multiprocessing queue
    p = Counter(0,post)                         # three counters, distinct
    q = Counter(100,post)                       # start values so their
    r = Counter(1000,post)                      # output can be told apart
    p.start(); q.start(); r.start()

    while expected:                             # parent consumes data on queue
        time.sleep(0.5)                         # this is essentially like a GUI,
        try:                                    # though GUIs often use threads
            data = post.get(block=False)        # poll; don't block when empty
        except Queue.Empty:
            print 'no data...'
        else:
            print 'posted:', data
            expected -= 1

    p.join(); q.join(); r.join()                # must get before join putter
    print 'finish', os.getpid(), r.exitcode   # exitcode is child exit status
start 3257
no data...
no data...
no data...
no data...
no data...
no data... @ 5847 1

posted: @ 5848 101
 @ 5849 1001
 5848
posted: 5847
posted: 5849
no data...
no data...
posted: @ @ 5847 2
 5848 102
 5847
posted: @ 5849 1002
 5848
posted: 5849
no data...
no data...
no data...
posted: @ @ @ 5847 3
 5848 103
 5849 1003
 @ 5847 -
 @ 5848 -
 @ 5849 -
 5847
posted: 5848
posted: 5849
finish 3257 0

3 Starting Independent Programs

In [57]:
"Use multiprocessing to start independent programs, os.fork or not"
import os
from multiprocessing import Process

def runp(arg):
    # Overlay the spawned process with an independent Python program;
    # on success execlp never returns to this code.
    os.execlp('python', 'python', 'child.py', str(arg))

if __name__ == '__main__':
    for i in range(5):
        Process(target=runp, args=(i,)).start()   # spawn, then exec child.py
    print('parent exit')                          # no join: children run on independently
parent exit

4 process pools

In [61]:
from multiprocessing import Pool

def powers(x):
    """Return 2 raised to the power *x* (worker function for the Pool demo)."""
    return pow(2, x)

if __name__ == '__main__':
    workers = Pool(processes=5)        # pool of 5 reusable worker processes

    # 100 tasks with the same input, spread across the 5 workers
    results = workers.map(powers,[2]*100)
    print results[:16]
    print results[-2:]

    # map returns results in input order, regardless of which worker ran each
    results = workers.map(powers,range(10))
    print results[:16]
    print results[-2:]
[4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4]
[4, 4]
[1, 2, 4, 8, 16, 32, 64, 128, 256, 512]
[256, 512]