Un semplice ThreadPool in Ruby

Qualche giorno fa, studiando una soluzione per parallelizzare l'iterazione degli elementi di un'istanza Enumerable in Ruby, ci siamo scontrati con l'esigenza di limitare la continua creazione/distruzione dei thread sfruttati per processare i singoli elementi al fine di limitare l'impatto prestazionale ed evitare uno spreco di risorse inutile. Una soluzione, come proposto, potrebbe essere quella di utilizzare un pool di thread, vediamo quindi come implementarne uno:

1 class ThreadPool 2 require 'thread' 3 4 def initialize(max_number_of_threads = 5) 5 @shutdown = false 6 @max_threads = max_number_of_threads 7 @thread_pool = [] 8 @queued_blocks = [] 9 @mutex = Mutex.new 10 @condition = ConditionVariable.new 11 12 1.upto @max_threads do 13 @thread_pool << Thread.new do 14 thread_loop 15 end 16 end 17 end 18 19 def enqueue(&block) 20 raise ArgumentError, 'Block not given' unless block_given? 21 22 @mutex.synchronize do 23 @queued_blocks << block 24 @condition.signal 25 end 26 end 27 28 def next_block_in_queue 29 @mutex.synchronize do 30 Thread.current.exit if @shutdown and @queued_blocks.size == 0 31 32 @condition.wait @mutex if @queued_blocks.empty? 33 @queued_blocks.shift 34 end 35 end 36 37 def thread_loop 38 loop do 39 block = next_block_in_queue 40 block.call unless block.nil? 41 end 42 end 43 44 def shutdown(wait = true) 45 @shutdown = true 46 @thread_pool.each { |thread| thread.join if thread.alive? } if wait 47 end 48 49 attr_reader :max_threads 50 51 private :thread_loop, :next_block_in_queue 52 end

Il codice è abbastanza autoesplicativo visto che ho cercato di dare dei nomi alle variabili e ai metodi per ottenere una lettura più naturale possibile del listato, comunque analizziamo passo per passo il funzionamento di questa classe.

  • Durante la creazione di un'istanza della classe ThreadPool (ThreadPool.new) vengono creati tanti thread quanti sono quelli specificati nell'unico argomento della funzione di inizializzazione.
  • A tutti i thread creati viene iniettato il metodo thread_loop come parte di codice da eseguire.
  • All'interno di thread_loop viene invocato il metodo next_block_in_queue che attende la disponibilità di un blocco di codice da far eseguire al primo thread del pool disponibile. Per lasciare i thread in attesa viene utilizzato il metodo wait di una ConditionVariable su un Mutex il quale a sua volta, attraverso il metodo synchronize, garantisce l'accesso da parte di un unico thread alla parte di codice protetta.
  • Il metodo enqueue accetta come parametro un blocco che dovrà essere eseguito da uno dei thread appartenenti al nostro pool. Nel momento in cui viene messo in coda un blocco e memorizzato all'interno di un array (@queued_blocks), la ConditionVariable viene allertata tramite il suo metodo signal per sbloccare l'attesa innestatasi all'interno del thread_loop. Il primo thread libero recupera il primo blocco disponibile e lo esegue (block.call) e, una volta terminata la sua esecuzione, ritorna al punto precedente invocando next_block_in_queue.
  • Nel caso tutti i thread del pool siano già impegnati, i blocchi da eseguire vengono accumulati in @queued_blocks e prelevati da next_block_in_queue appena i thread si liberano.
  • Attraverso il metodo shutdown è possibile disattivare il pool di thread. Di default la chiamata a questo metodo blocca il thread in cui essa viene effettuata (per esempio potrebbe essere il thread principale dell'applicazione), altrimenti specificando wait = false è possibile far sì che le operazioni del pool terminino in maniera asincrona.

Ecco infine un esempio pratico per testare la nostra classe ThreadPool:

1 require 'open-uri' 2 require 'threadpool.rb' 3 4 url_list = [ 5 'http://www.codeplex.com/Phalanger/Project/ProjectRss.aspx', 6 'http://www.codeplex.com/IronPython/Project/ProjectRss.aspx', 7 'http://www.codeplex.com/RORIIS/Project/ProjectRss.aspx', 8 'http://www.codeplex.com/AkismetApi/Project/ProjectRss.aspx', 9 'http://www.codeplex.com/AjaxPro/Project/ProjectRss.aspx', 10 'http://www.codeplex.com/IndySockets/Project/ProjectRss.aspx', 11 'http://www.codeplex.com/32feet/Project/ProjectRss.aspx', 12 ] 13 14 pool = ThreadPool.new 4 15 16 pool.enqueue do 17 1.upto 50 do |n| 18 puts n 19 sleep 0.2 20 end 21 end 22 23 url_list.each do |url| 24 pool.enqueue do 25 puts "Processing #{url}" 26 byte_read = open(url).read.length 27 puts "Processed #{url} [#{byte_read} bytes read]" 28 end 29 end 30 31 pool.shutdown 32 puts "DONE!"

Questa implementazione di ThreadPool è piuttosto semplificata, non prevede per esempio una capacità minima e massima del pool con il conseguente ridimensionamento dinamico dello stesso, comunque dovrebbe già bastare per la maggior parte dei casi. Prossimamente integreremo questo ThreadPool nel nostro metodo Enumerable#parallelize per limitare i thread utilizzati durante l'iterazione parallela.

Sorgenti allegati: threadpool.zip - threadpool.tar.gz


Puoi scrivere un commento oppure inviare un trackback dal tuo sito.

7 commenti a “Un semplice ThreadPool in Ruby”

  1. ok, quando cerco di pensare alla concorrenza il cervello mi si scioglie, ed in più oggi sono pure malato.. ma non potresti semplificare un po' usando una SizedQueue?

  2. SizedQueue blocca il thread chiamante quando viene invocato il relativo metodo push e lo spazio nella coda si esaurisce, questo perché lo scopo della suddetta classe è quello di assicurare che non vi siano mai più di n elementi all'interno della coda stessa. Quello che interessa invece nel threadpool è che non vi siano più di n thread attivi nell'esecuzione dei blocchi di codice mettendo in coda (non necessariamente una sized, anzi) tutti gli altri blocchi per un'esecuzione successiva riutilizzando gli stessi thread mano a mano che si liberano, il tutto senza bloccare il thread che invoca il metodo enqueue.

    Detto ciò, se come penso con "semplificare un po' usando una SizedQueue" intendi dire di usarla internamente a ThreadPool allora sinceramente non mi vengono proprio idee di come poterla sfruttare in modo da semplificare effettivamente il codice, inoltre così a occhio il problema del blocco del thread principale costringerebbe come minimo a una ridefinizione dei suoi metodi push e pop e già qui penso che non ne varrebbe più la pena.

    Se invece intendi dire di utilizzare direttamente SizedQueue anzichè ThreadPool... beh le due classi gestiscono due problematiche differenti e, al lato pratico, hanno proprio comportamenti differenti alla base.

  3. Gravatar
    Angelo ha detto:

    Salve,
    ho provato ad utilizzare il codice e provarlo con la classe scrubyt ma va in tilt....

    l'errore è il seguente...

    /usr/lib/ruby/gems/1.8/gems/scrubyt-0.2.8/lib/scrubyt/core/shared/extractor.rb:109:in `method_missing': Only one root pattern allowed (RuntimeError)
    from ./threadpool.rb:46:in `join'
    from ./threadpool.rb:46:in `shutdown'
    from ./threadpool.rb:46:in `each'
    from ./threadpool.rb:46:in `shutdown'
    from test2.rb:55

    eppure se la provo senza la vostra classe il tutto funziona... sapete darmi una mano?

    Grazie

  4. Ciao Angelo,
    purtroppo non conosco scrubyt se non di nome ma il primo sospetto che mi viene è che la sua libreria non sia implicitamente thread safe e questo potrebbe creare problemi di race conditions, infatti a giudicare dallo stack trace che hai incollato e andando a verificare nel codice di scrubyt, l'eccezione viene generata "volontariamente" dalla libreria dopo un controllo effettuato su una variabile di classe senza effettuare alcun lock della stessa.

    Se tu potessi riportare un esempio di codice (insieme alle versioni di scrubyt e di ruby che stai utilizzando) in modo tale da permettermi di riprodurre lo stesso problema che hai descritto, magari riesco a verificare meglio la mia ipotesi e darti una risposta più precisa.

  5. Gravatar
    Angelo ha detto:

    come versioni sono sia l'ultima di scrubyt e l'ultima di ruby.....

  6. Gravatar
    Angelo ha detto:

    ahh.... ok scusatemi non avevo capito.... posto il codice....


    require 'rubygems'
    require 'scrubyt'
    require 'threadpool.rb'

    url_list = [
    'prova',
    'prova1', 'prova2', 'prova3', 'prova4']

    pool = ThreadPool.new 4

    url_list.each do |url|
    pool.enqueue do

    google_data = Scrubyt::Extractor.define do

    fetch 'http://www.google.com/ncr'
    fill_textfield 'q', "#{url}"
    submit

    link "Ruby Programming Language" do
    url "href", :type => :attribute
    end
    next_page "Next", :limit => 2
    end

    end
    end

    pool.shutdown
    puts "DONE!"

  7. Ok appena ho un attimo di tempo vedo di sistemarmi l'ambiente ruby su linux (non l'ho ancora fatto, dopo una reinstallata) e di fare qualche prova con lo script. Penso che il problema sia proprio quello che ti ho descritto, ma prima voglio verificare.

Lascia un commento

Puoi utilizzare i seguenti tag XHTML: <a href="" title=""> <abbr title=""> <acronym title=""> <b> <blockquote cite=""> <cite> <code> <del datetime=""> <em> <i> <q cite=""> <s> <strike> <strong>