Pregunta ¿Cómo usar threading en Python?


Estoy tratando de entender el enhebrado en Python. He visto la documentación y ejemplos, pero francamente, muchos ejemplos son demasiado sofisticados y tengo problemas para entenderlos.

¿Cómo se muestran claramente las tareas divididas para multihilo?


928
2018-05-17 04:24


origen


Respuestas:


Desde que se hizo esta pregunta en 2010, ha habido una simplificación real en la forma de hacer multihilo simple con python con mapa y piscina.

El siguiente código proviene de una publicación de artículo / blog que definitivamente debes verificar (sin afiliación) - Paralelismo en una línea: Un mejor modelo para las tareas de enhebrado de día a día. Resumiré a continuación: termina siendo solo unas pocas líneas de código:

from multiprocessing.dummy import Pool as ThreadPool 
pool = ThreadPool(4) 
results = pool.map(my_function, my_array)

¿Cuál es la versión multiproceso de:

results = []
for item in my_array:
    results.append(my_function(item))

Descripción

El mapa es una pequeña función genial y la clave para inyectar fácilmente el paralelismo en tu código de Python. Para los que no están familiarizados, el mapa es algo extraído de los lenguajes funcionales como Lisp. Es una función que mapea otra función sobre una secuencia.

Map maneja la iteración sobre la secuencia para nosotros, aplica la función y almacena todos los resultados en una lista práctica al final.

enter image description here


Implementación

Las versiones paralelas de la función de mapa son proporcionadas por dos bibliotecas: multiprocesamiento, y también su paso poco conocido, pero igualmente fantástico: multiprocesamiento.dummy.

multiprocessing.dummy es exactamente lo mismo que el módulo de multiprocesamiento, pero usa hilos en su lugar (una distinción importante - utilizar procesos múltiples para tareas intensivas de CPU; hilos para (y durante) IO)

multiprocesamiento.dummy replica la API de multiprocesamiento pero no es más que un contenedor alrededor del módulo de subprocesamiento.

import urllib2 
from multiprocessing.dummy import Pool as ThreadPool 

urls = [
  'http://www.python.org', 
  'http://www.python.org/about/',
  'http://www.onlamp.com/pub/a/python/2003/04/17/metaclasses.html',
  'http://www.python.org/doc/',
  'http://www.python.org/download/',
  'http://www.python.org/getit/',
  'http://www.python.org/community/',
  'https://wiki.python.org/moin/',
]

# make the Pool of workers
pool = ThreadPool(4) 

# open the urls in their own threads
# and return the results
results = pool.map(urllib2.urlopen, urls)

# close the pool and wait for the work to finish 
pool.close() 
pool.join() 

Y los resultados de tiempo:

Single thread:   14.4 seconds
       4 Pool:   3.1 seconds
       8 Pool:   1.4 seconds
      13 Pool:   1.3 seconds

Pasando múltiples argumentos (funciona así solo en Python 3.3 y posterior)

Para pasar múltiples matrices:

results = pool.starmap(function, zip(list_a, list_b))

o pasar una constante y una matriz:

results = pool.starmap(function, zip(itertools.repeat(constant), list_a))

Si está utilizando una versión anterior de Python, puede pasar múltiples argumentos a través de esta solución.

(Gracias a usuario136036 para el comentario útil)


1008
2018-02-11 19:53



Aquí hay un ejemplo simple: necesita probar algunas URL alternativas y devolver el contenido de la primera para responder.

import Queue
import threading
import urllib2

# called by each thread
def get_url(q, url):
    q.put(urllib2.urlopen(url).read())

theurls = ["http://google.com", "http://yahoo.com"]

q = Queue.Queue()

for u in theurls:
    t = threading.Thread(target=get_url, args = (q,u))
    t.daemon = True
    t.start()

s = q.get()
print s

Este es un caso en el que el subprocesamiento se usa como una optimización simple: cada subproceso está esperando que se resuelva y responda una URL, para poner su contenido en la cola; cada hilo es un daemon (no mantendrá el proceso si termina el hilo principal, eso es más común que no); el hilo principal comienza todos los subprocesos, hace un get en la cola para esperar hasta que uno de ellos haya hecho una put, luego emite los resultados y finaliza (lo que elimina cualquier subproceso que todavía esté en ejecución, ya que son hilos de daemon).

El uso adecuado de subprocesos en Python está invariablemente conectado a operaciones de E / S (ya que CPython no usa múltiples núcleos para ejecutar tareas vinculadas a la CPU de todos modos, la única razón para enhebrar no es bloquear el proceso mientras hay espera para algunas E / S ) Las colas son casi invariablemente la mejor forma de distribuir trabajo a hilos y / o recolectar los resultados de la obra, por cierto, y son intrínsecamente seguras en cuanto a los hilos, por lo que le evitan preocuparse por bloqueos, condiciones, eventos, semáforos y otras interconexiones. conceptos de coordinación / comunicación de hilos.


672
2018-05-17 04:36



NOTA: Para la paralelización real en Python, debe usar multiproceso módulo para bifurcar múltiples procesos que se ejecutan en paralelo (debido al bloqueo de intérprete global, los subprocesos de Python proporcionan entrelazado pero de hecho se ejecutan en serie, no en paralelo, y solo son útiles al intercalar operaciones de E / S).

Sin embargo, si simplemente está buscando entrelazado (o está haciendo operaciones de E / S que pueden ser paralelizadas a pesar del bloqueo de intérprete global), entonces el enhebrado módulo es el lugar para comenzar. Como un ejemplo realmente simple, consideremos el problema de sumar un gran rango sumando subrangos en paralelo:

import threading

class SummingThread(threading.Thread):
     def __init__(self,low,high):
         super(SummingThread, self).__init__()
         self.low=low
         self.high=high
         self.total=0

     def run(self):
         for i in range(self.low,self.high):
             self.total+=i


thread1 = SummingThread(0,500000)
thread2 = SummingThread(500000,1000000)
thread1.start() # This actually causes the thread to run
thread2.start()
thread1.join()  # This waits until the thread has completed
thread2.join()  
# At this point, both threads have completed
result = thread1.total + thread2.total
print result

Tenga en cuenta que lo anterior es un ejemplo muy estúpido, ya que no tiene absolutamente ninguna E / S y se ejecutará en serie aunque intercalada (con la sobrecarga adicional de cambio de contexto) en CPython debido al bloqueo de intérprete global.


226
2018-05-17 04:35



Como otros mencionados, CPython puede usar hilos solo para I \ O espera debido a GIL. Si desea beneficiarse de múltiples núcleos para tareas vinculadas a la CPU, use multiproceso:

from multiprocessing import Process

def f(name):
    print 'hello', name

if __name__ == '__main__':
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()

86
2018-03-08 22:22



Solo una nota, Queue no es necesario para enhebrar.

Este es el ejemplo más simple que podría imaginar que muestra 10 procesos que se ejecutan simultáneamente.

import threading
from random import randint
from time import sleep


def print_number(number):
    # Sleeps a random 1 to 10 seconds
    rand_int_var = randint(1, 10)
    sleep(rand_int_var)
    print "Thread " + str(number) + " slept for " + str(rand_int_var) + " seconds"

thread_list = []

for i in range(1, 10):
    # Instantiates the thread
    # (i) does not make a sequence, so (i,)
    t = threading.Thread(target=print_number, args=(i,))
    # Sticks the thread in a list so that it remains accessible
    thread_list.append(t)

# Starts threads
for thread in thread_list:
    thread.start()

# This blocks the calling thread until the thread whose join() method is called is terminated.
# From http://docs.python.org/2/library/threading.html#thread-objects
for thread in thread_list:
    thread.join()

# Demonstrates that the main process waited for threads to complete
print "Done"

84
2017-09-23 16:07



La respuesta de Alex Martelli me ayudó, sin embargo, aquí está la versión modificada que pensé que era más útil (al menos para mí).

import Queue
import threading
import urllib2

worker_data = ['http://google.com', 'http://yahoo.com', 'http://bing.com']

#load up a queue with your data, this will handle locking
q = Queue.Queue()
for url in worker_data:
    q.put(url)

#define a worker function
def worker(queue):
    queue_full = True
    while queue_full:
        try:
            #get your data off the queue, and do some work
            url= queue.get(False)
            data = urllib2.urlopen(url).read()
            print len(data)

        except Queue.Empty:
            queue_full = False

#create as many threads as you want
thread_count = 5
for i in range(thread_count):
    t = threading.Thread(target=worker, args = (q,))
    t.start()

38
2017-10-01 15:50



Encontré esto muy útil: crear tantos hilos como núcleos y dejarlos ejecutar un número (grande) de tareas (en este caso, llamar a un programa shell):

import Queue
import threading
import multiprocessing
import subprocess

q = Queue.Queue()
for i in range(30): #put 30 tasks in the queue
    q.put(i)

def worker():
    while True:
        item = q.get()
        #execute a task: call a shell program and wait until it completes
        subprocess.call("echo "+str(item), shell=True) 
        q.task_done()

cpus=multiprocessing.cpu_count() #detect number of cores
print("Creating %d threads" % cpus)
for i in range(cpus):
     t = threading.Thread(target=worker)
     t.daemon = True
     t.start()

q.join() #block until all tasks are done

19
2018-06-06 23:51



Para mí, el ejemplo perfecto para Threading es monitorear eventos asincrónicos. Mira este código.

# thread_test.py
import threading
import time 

class Monitor(threading.Thread):
    def __init__(self, mon):
        threading.Thread.__init__(self)
        self.mon = mon

    def run(self):
        while True:
            if self.mon[0] == 2:
                print "Mon = 2"
                self.mon[0] = 3;

Puede jugar con este código abriendo una sesión de IPython y haciendo algo como:

>>>from thread_test import Monitor
>>>a = [0]
>>>mon = Monitor(a)
>>>mon.start()
>>>a[0] = 2
Mon = 2
>>>a[0] = 2
Mon = 2

Espera unos minutos

>>>a[0] = 2
Mon = 2

15
2018-04-14 04:18



Dada una función, f, enhebrarlo así:

import threading
threading.Thread(target=f).start()

Para pasar argumentos a f

threading.Thread(target=f, args=(a,b,c)).start()

15
2018-03-16 16:07