Threads in Python

Allgemeine Definition eines Thread

Threads
Ein Thread wird häufig auch als leichtgewichtiger Prozess bezeichnet. Allgemein wird durch einen Thread in der Informatik ein Ausführungsstrang oder eine Ausführungsreihenfolge in der Abarbeitung eines Programmes bezeichnet.

Es werden zwei Arten von Threads unerschieden. Kernelthreads laufen als Teil des Betriebssystemes ab, während User-Threads nicht im Kern des Betriebssystems implementiert sind.

Im gewissen Sinne kann man User Threads auch als eine Erweiterung des Funktions-Konzeptes einer Programmiersprache auffassen. Man kann einen Thread somit wie einen Funktionsaufruf oder Prozeduraufruf sehen. Ein User Thread entspricht in dieser Sichtweise einer Prozedur, die von anderer Stelle aufgerufen wird (über das explizite Scheduling genau dieses User Threads). Insbesondere in Ihrem Rückkehrverhalten unterscheiden sie sich aber deutlich von normalen Funktionen oder Prozeduren.

Threads und globale Variablen Standardmäßig besitzt jeder Prozess mindestens einen Thread, gewissermaßen den Prozess selbst. Ein Prozess kann mehrere Threads starten. Vom Betriebssystem werden diese wie Prozesse scheinbar gleichzeitig ausgeführt.

Der Vorteil von Threads gegenüber Prozessen besteht darin, dass sich die Threads eines Prozesses denselben Speicherbereich für globale Variablen teilen. Verändert ein Thread eine globale Variable, ist der neue Wert auch in dieser Variablen sofort für alle anderen Threads des Prozesses sichtbar. Ein Thread hat aber auch eigene lokale Variablen.

Die Verwaltung von Threads ist für das Betriebssystem einfacher, weshalb Threads auch als Leichtgewichtprozesse bezeichnet werden.

Threads in Python

Zwei Module unterstützen in Python die Benutzung von Threads: Das Modul thread betrachtet einen Thread als Funktion, während threading objektorientiert implementiert ist, und jeder Thread einem eigenen Objekt entspricht.

Das thread-Modul

Mit dem Modul thread kann man einzelne Funktionen in einem separaten Thread ausführen lassen. Dazu gibt es die Funktion thread.start_new_thread:

thread.start_new_thread(function, args[, kwargs])

function ist eine Referenz auf die auszuführende Funktion args ist ein Tupel mit den Parametern für die Funktion function. Der optionale Parameter kwargs kann ein Dictionary mit zusätzlichen Schlüsselwertparametern enthalten. Der Rückgabewert von start_new_thread() ist eine den Thread eindeutig identifizierende Zahl. Nachdem function verlassen wurde, wird der Thread automatisch gelöscht.

Beispiel für einen Thread in Python:
from thread import start_new_thread

def heron(a):
    """Berechnet die Wurzel von a"""
    eps = 0.0000001
    old = 1
    new = 1
    while True:
        old,new = new, (new + a/new) / 2.0
        print old, new
        if abs(new - old) < eps:
            break
    return new

start_new_thread(heron,(99,))
start_new_thread(heron,(999,))
start_new_thread(heron,(1733,))

c = raw_input("Eingabe.")
raw_input() im vorigen Beispiel ist notwendig, da alle Threads sofort abgebrochen werden, wenn das Hauptprogramm beendet ist. raw_input() bewirkt ein Warten.

Wir erweitern das vorige Beispiel mit einem Zähler für die Threads.
from thread import start_new_thread

num_threads = 0
def heron(a):
    global num_threads
    num_threads += 1
    
    # code ausgelassen
    num_threads -= 1
    return new

start_new_thread(heron,(99,))
start_new_thread(heron,(999,))
start_new_thread(heron,(1733,))
start_new_thread(heron,(17334,))

while num_threads > 0:
    pass
Aber das Skript funktioniert nicht so, wie wir es vielleicht erwarten. Was ist falsch?
Die abschließende While-Schleife wird bereits erreicht bevor einer der Threads gestartet werden konnte. Dies geschieht deshalb, weil dadurch num_threads nicht erhöht werden kann bevor die while-Schleife erreicht wird.

Aber es gibt noch ein weiteres schwerwiegenderes Problem:
Das Problem liegt in den Zuweisungen
num_threads += 1
und
num_threads -= 1
die nicht atomar sind. Sie bestehen im Prinzip aus drei Aktionen: Lesen des Wertes von num_threads, dann wird eine neue Instanz mit dem um eins erhöhten oder verminderten Wert gebildet der neue Wert muss wieder der globalen Variablen num_threads zugewiesen werden.

Fehler können wie folgt passieren:
Der erste Thread liest Variable num_threads ein, die noch den Wert 0 hat, dann geht er "schlafen". Dann liest auch der zweite Thread die Variable num_threads ein, die immer noch den Wert 0 hat, da der erste Thread sie nicht mehr erhöhen konnte. Nun liest auch der dritte Thread die Variable num_threads ein, die immer noch den Wert 0 hat, da der erste und der zweite Thread sie nicht mehr erhöhen konnten. Die drei Threads speichern anschließend jeweils eine 1 ab, falls nicht zwischenzeitlich einer der anderen Threads die Variable bereits durch die Anweisung num_threads -= 1 erniedrigt hatte.

Lösung

Probleme der vorherigen Art kann man dadurch lösen, indem man "kritische Abschnitte" (Critical Sections) mit Lock-Objekten markiert. Sie werden dadurch atomar, d.h. sie können nicht aufgesplittet werden und müssen als Ganzes ausgeführt werden, bevor ein anderer Thread weiterarbeiten darf. Mit der Funktion thread.allocate_lock kann ein neues Lock-Objekt erzeugt werden:

lock_objekt = thread.allocate_lock()

Der Beginn einer "critical Section" wird mit lock_object.acquire() markiert und das Ende mit lock_object.release().
Die Lösung mit Locks sieht nun wie folgt aus:
from thread import start_new_thread, allocate_lock
num_threads = 0
thread_started = False
lock = allocate_lock()
def heron(a):
    global num_threads, thread_started
    lock.acquire()
    num_threads += 1
    thread_started = True
    lock.release()
    ...
    lock.acquire()
    num_threads -= 1
    lock.release()
    return new

start_new_thread(heron,(99,))
start_new_thread(heron,(999,))
start_new_thread(heron,(1733,))

while not thread_started:
    pass
while num_threads > 0:
    pass

threading-Modul

Das Modul threading wollen wir mit einem Beispiel einführen. Der dort implementierte Thread tut recht wenig, d.h. er schäft für 5 Sekunden und gibt entsprechende Meldungen aus:
import time
from threading import Thread

def sleeper(i):
    print "thread %d sleeps for 5 seconds" % i
    time.sleep(5)
    print "thread %d woke up" % i

for i in range(10):
    t = Thread(target=sleeper, args=(i,))
    t.start()
Zur Erklärung der Arbeitsweise der threding.Thread-Klasse: Die Klasse threading.Thread hat eine Methode start(), die einen Thread startet. Sie stößt die Methode run() an, die man überladen muss. Die join()-Methode stellt sicher, dass das Hauptprogramm wartet, bis alle Threads terminiert haben.

Die Ausgabe des vorigen Skriptes sieht wie folgt aus:
thread 0 sleeps for 5 seconds
thread 1 sleeps for 5 seconds
thread 2 sleeps for 5 seconds
thread 3 sleeps for 5 seconds
thread 4 sleeps for 5 seconds
thread 5 sleeps for 5 seconds
thread 6 sleeps for 5 seconds
thread 7 sleeps for 5 seconds
thread 8 sleeps for 5 seconds
thread 9 sleeps for 5 seconds
thread 1 woke up
thread 0 woke up
thread 3 woke up
thread 2 woke up
thread 5 woke up
thread 9 woke up
thread 8 woke up
thread 7 woke up
thread 6 woke up
thread 4 woke up
Das nächste Beispiel zeigt einen Thread, der ermittelt ob eine Zahl eine Primzahl ist. Der Thread wird über das threading-Modul definiert.
import threading 
 
class PrimeNumber(threading.Thread): 
  def __init__(self, number): 
    threading.Thread.__init__(self) 
    self.Number = number
 
  def run(self): 
    counter = 2 
    while counter*counter < self.Number: 
      if self.Number % counter == 0: 
        print "%d ist keine Primzahl, da %d = %d * %d" % ( self.Number, self.Number, counter, self.Number / counter) 
                return 
            counter += 1 
        print "%d ist eine Primzahl" % self.Number
threads = [] 
while True: 
    input = long(raw_input("number: ")) 
    if input < 1: 
        break 
 
    thread = PrimeNumber(input) 
    threads += [thread] 
    thread.start() 
 
for x in threads: 
    x.join()
Mit Locks sollte es so aussehen:
class PrimeNumber(threading.Thread):
    prime_numbers = {} 
    lock = threading.Lock()
    
    def __init__(self, number): 
        threading.Thread.__init__(self) 
        self.Number = number
        PrimeNumber.lock.acquire() 
        PrimeNumber.prime_numbers[number] = "None" 
        PrimeNumber.lock.release() 
 
    def run(self): 
        counter = 2
        res = True
        while counter*counter < self.Number and res: 
            if self.Number % counter == 0: 
               res = False 
            counter += 1 
        PrimeNumber.lock.acquire() 
        PrimeNumber.prime_numbers[self.Number] = res 
        PrimeNumber.lock.release() 
threads = [] 
while True: 
    input = long(raw_input("number: ")) 
    if input < 1: 
        break 
 
    thread = PrimeNumber(input) 
    threads += [thread] 
    thread.start() 
 
for x in threads: 
    x.join()

Thread, Beispiel aus der Praxis

Ping im Netzwerk Die bisherigen Beispiele waren im Prinzip nur von didaktischen Interesse, hatten aber wenig Praxisbezug. Das folgende Beispiel zeigt nun eine interessante Anwendung, die sich sehr gut in der Praxis einsetzen lässt. Man möchte in einem bestehenden lokalen Netzwerk feststellen, welche IP-Adressen vergeben sind bzw. welche Computer gerade aktive sind. Manuell würden wir für ein Netzwerk 192.168.178.x wie folgt vorgehen: Wir würden für die Adressen 192.168.178.0, 192.168.178.1, 192.168.178.3 usw. bis 192.168.178.255 ein ping absetzen und auf das Ergebnis warten. Dies kann man mittels einer for-Schleife über den Adressbereich und einem os.popen("ping -q -c2 "+ip,"r") in Python umsetzen.

Eine Lösung ohne Threads, wie die folgende zeigt sich als wenig effizient. Da auf jedes Ping separat gewartet werden muss.

Lösung ohne Threads:
import os, re

received_packages = re.compile(r"(\d) received")
status = ("no response","alive but losses","alive")

for suffix in range(20,30):
   ip = "192.168.178."+str(suffix)
   ping_out = os.popen("ping -q -c2 "+ip,"r")
   print "... pinging ",ip
   while True:
      line = ping_out.readline()
      if not line: break
      n_received = received_packages.findall(line)
      if n_received:
         print ip + ": " + status[int(n_received[0])]
Um das Skript zu verstehen, sollte man sich die Ergebnisse eines ping-Aufrufes in der Shell anschauen:
$ ping -q -c2 192.168.178.26
PING 192.168.178.26 (192.168.178.26) 56(84) bytes of data.

--- 192.168.178.26 ping statistics ---
2 packets transmitted, 2 received, 0% packet loss, time 999ms
rtt min/avg/max/mdev = 0.022/0.032/0.042/0.010 ms
Falls ein Ping nicht zum Erfolg führt, gibt es folgende Ausgabe:
$ ping -q -c2 192.168.178.23
PING 192.168.178.23 (192.168.178.23) 56(84) bytes of data.

--- 192.168.178.23 ping statistics ---
2 packets transmitted, 0 received, +2 errors, 100% packet loss, time 1006ms

Und nun eine deutliche schnellere Lösung mit Threads:
import os, re, threading

class ip_check(threading.Thread):
   def __init__ (self,ip):
      threading.Thread.__init__(self)
      self.ip = ip
      self.__successful_pings = -1
   def run(self):
      ping_out = os.popen("ping -q -c2 "+self.ip,"r")
      while True:
        line = ping_out.readline()
        if not line: break
        n_received = re.findall(received_packages,line)
        if n_received:
           self.__successful_pings = int(n_received[0])
   def status(self):
      if self.__successful_pings == 0:
         return "no response"
      elif self.__successful_pings == 1:
         return "alive, but 50 % package loss"
      elif self.__successful_pings == 2:
         return "alive"
      else:
         return "shouldn't occur"
received_packages = re.compile(r"(\d) received")

check_results = []
for suffix in range(20,70):
   ip = "192.168.178."+str(suffix)
   current = ip_check(ip)
   check_results.append(current)
   current.start()

for el in check_results:
   el.join()
   print "Status from ", el.ip,"is",el.status()