Introduzione ad Hadoop

Posted on 9 October 2011 by Paolo Bernardi

Introduzione

Hadoop è un sistema di calcolo MapReduce distribuito per processi di tipo batch estremamente scalabile e in grado di maneggiare terabyte o petabyte di dati senza colpo ferire.

Il modello di calcolo MapReduce deve il suo nome a due celebri funzioni della programmazione funzionale, map e reduce, delle quali rispecchia in un certo senso il comportamento. In una computazione MapReduce infatti i dati iniziali sono una serie di record che vengono trattati singolarmente da processi chiamati Mapper e successivamente aggregati da processi chiamati Reducer.

Questo modello di calcolo si presta ottimamente alla parallelizzazione anche spinta, tuttavia viene da chiedersi quanti problemi reali sia possibile risolvere usandolo. In realtà sono molti più di quelli che si potrebbe pensare: attualmente MapReduce viene utilizzato nelle elaborazioni dei dati generati da enormi applicazioni web (es. Google, Facebook), ma anche per studi di genetica e di molti altri campi.

Un sistema Hadoop è formato da diversi componenti che implementano rispettivamente un filesystem distribuito ed il sistema di calcolo MapReduce. Il filesystem distribuito, chiamato HDFS, non è conforme alle specifiche POSIX perché consente di creare, cancellare, spostare file ma non di modificarli. Questo compromesso ha consentito di ottenere ottime prestazioni senza avere i problemi di implementazione dei filesystem distribuiti consueti. In HDFS i file sono suddivisi in blocchi (generalmente da 64Mb l’uno… sono commisurati alla mole di dati per cui è stato pensato Hadoop!) distribuiti tra più nodi del cluster, anche replicati, per garantire maggiore sicurezza. All’atto del calcolo MapReduce Hadoop cerca di fare eseguire a ciascun nodo i calcoli sui blocchi che ha sul proprio disco: in questo modo si ottiene una data locality altissima ed un traffico di rete molto basso.

I processi Mapper elaborano i dati di input, che nel modello di calcolo di Hadoop sono logicamente suddivisi in record (ad esempio un record può essere una linea di un file di testo o qualcosa del genere) e producono una coppia chiave-valore per ciascun record. Queste coppie chiave-valore (di norma ci sono più coppie aventi la stessa chiave) prodotte dai Mapper sono poi passate ai processi Reducer che aggregano le coppie corrispondenti alla stessa chiave; il risultato dei Reducer è un insieme di coppie chiave-valore aventi una sola coppia per chiave e viene generalmente scritto su un file di output nello stesso HDFS.

Installazione di Hadoop

Per le nostre prove installeremo Hadoop (versione 0.21.0 o successive) su un singolo computer; sebbene i processi verranno al massimo distribuiti tra i core disponibili (dipende dal vostro processore) è una configurazione piuttosto semplice che consente di avere un sistema funzionante senza particolari complicazioni.

Innanzitutto sul computer deve esserci Java, preferibilmente la versione 6. Personalmente ho preferito creare un utente separato per l’installazione e l’uso di Hadoop: l’ho chiamato hduser e mi riferirò ad esso per il resto del tutorial.

# adduser --ingroup hadoop hduser
# useradd --groups hadoop hduser
# su - adduser
$ passwd adduser

Sebbene stiamo configurando un cluster fittizio per un comodo uso di Hadoop è necessario che questi possa effettuare una connessione SSH al nostro utente hduser nella nostra macchina senza dover richiedere la password all’utente. Generiamo le chiavi crittografiche per questo scopo:

$ cd /home/hduser/
$ ssh-keygen -t rsa -P ''
$ cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys
$ chmod 600 ~/.ssh/authorized_keys

Ora scarichiamo Hadoop ed installiamolo nella home directory del nostro utente:

$ curl -O [http://apache.fis.uniroma2.it//hadoop/core/hadoop-0.21.0/hadoop-0.21.0.tar.gz](http://apache.fis.uniroma2.it//hadoop/core/hadoop-0.21.0/    hadoop-0.21.0.tar.gz)
$ curl -O [http://apache.fis.uniroma2.it//hadoop/core/hadoop-0.21.0/hadoop-0.21.0.tar.gz.asc](http://apache.fis.uniroma2.it//hadoop/core/hadoop-0.21.0/    hadoop-0.21.0.tar.gz.asc)
$ curl http://apache.fis.uniroma2.it//hadoop/core/KEYS | gpg --import -
$ gpg --verify hadoop-0.21.0.tar.gz.asc

Come buona abitudine abbiamo anche verificato l’integrità dell’archivio in questione. Ora decomprimiamolo e facciamo un collegamento simbolico alla directory che verrà creata:

$ tar -xzf hadoop-0.21.0.tar.gz
$ ln -s hadoop-0.21.0 hadoop

Adesso configuriamo l’ambiente di esecuzione di Hadoop aggiungendo queste righe al .bashrc del nostro utente hduser:

export HADOOP_HOME="$HOME/hadoop"
export JAVA_HOME='/usr/lib/jvm/java-openjdk'
export PATH="$PATH:$HADOOP_HOME/bin"

La variabile JAVA_HOME è necessaria e varia da distribuzione a distribuzione; il valore nell’esempio è valido per la mia Fedora 15 (nella mia Arch invece sarebbe /usr/lib/jvm/java-6-openjdk). Una volta modificato .bashrc è importante rendere attive le modifiche nella shell che stiamo usando:

$ . .bashrc

Procediamo ora con la configurazione di Hadoop vera e propria, a partire dal file hadoop/conf/hadoop-env.sh nel quale dovremmo specificare la variabile JAVA_HOME (proprio come in .bashrc) aggiungendo la seguente riga:

export JAVA_HOME='/usr/lib/jvm/java-openjdk'

Il file hadoop/conf/core-site.xml invece dovrà avere il seguente contenuto:

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
  <property>
    <name>hadoop.tmp.dir</name>
    <value>/home/hduser/tmp</value>
    <description>A base for other temporary directories.</description>
  </property>
  <property>
    <name>fs.default.name</name>
    <value>hdfs://localhost:54310</value>
    <description>The name of the default file system. A URI whose
    scheme and authority determine the FileSystem implementation. The
    uri's scheme determines the config property (fs.SCHEME.impl) naming
    the FileSystem implementation class. The uri's authority is used to
    determine the host, port, etc. for a filesystem.</description>
  </property>
</configuration>

Dal momento che facciamo riferimento ad una directory temporanea è opportuno crearla:

$ mkdir /home/hduser/tmp
$ chmod 750 /home/hduser/tmp

Il file hadoop/conf/mapred-site.xml deve avere un contenuto simile a questo:

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
  <property>
    <name>mapred.job.tracker</name>
    <value>localhost:54311</value>
    <description>The host and port that the MapReduce job tracker runs
    at. If "local", then jobs are run in-process as a single map
    and reduce task.
    </description>
  </property>
</configuration>

Ed infine ecco il contenuto di hadoop/conf/hdfs-site.xml:

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
  <property>
    <name>dfs.replication</name>
    <value>1</value>
    <description>Default block replication.
    The actual number of replications can be specified when the file is created.
    The default is used if replication is not specified in create time.
    </description>
  </property>
</configuration>

Per completare l’opera inizializziamo il filesystem distribuito di Hadoop:

$ hadoop namenode -format

Esecuzione di un job con Hadoop

Ora che Hadoop è (si spera!) configurato a dovere, testiamolo usando un’applicazione di esempio. L’esempio classico per i sistemi MapReduce è “WordCount”, un programma che conta la frequenza delle parole in un insieme di testi forniti in input. In questo caso però useremo FantaHadoop, un programmino che, a partire dai risultati del fantacalcio per una serie di giornate, restituisce per ciascun giocatore il totale dei suoi voti, le partite che ha giocato ed il voto medio.

Innanzitutto avviamo il filesystem distribuito HDFS ed il sistema MapReduce di Hadoop:

$ start-dfs.sh
$ start-mapred.sh

Se l’operazione è andata a buon fine potremo visitare questi URL con un browser:

  • http://localhost:50030, dove è possibile visualizzare i job in esecuzione e quelli passati
  • http://localhost:50060, dove è possibile visualizzare la lista dei task in esecuzione (un singolo job è composto di più task di tipo Mapper e Reducer)
  • http://localhost:50070, dove è possibile visualizzare il contenuto del filesystem di Hadoop e i log

Scarichiamo FantaHadoop (già compilato sotto forma di archivio JAR) e dei dati di input di esempio:

$ curl -O https://www.bernardi.cloud/wp-content/uploads/2011/10/software/FantaHadoop.jar
$ curl https://www.bernardi.cloud/wp-content/uploads/2011/10/fantacalcio-2010.tar.bz2 | tar xjf -

I dati di input, che si trovano nella directory fantacalcio-2010, vanno poi caricati sul filesystem distribuito di Hadoop, in una directory creata per l’occasione:

$ hadoop fs -mkdir /home/hduser/fantacalcio/input
$ hadoop fs -put fantacalcio-2010/*.csv /home/hduser/fantacalcio/input

I due comandi agiscono sul filesystem distribuito di Hadoop creando una directory (hadoop fs -mkdir funziona come mkdir -p in locale, all’occorrenza crea anche le directory intermedie) e copiandoci i file da usare come input per il nostro job (vengono presi da una directory locale, fantacalcio-2010, poi copiati su HDFS).

Ora non ci rimane che eseguire il job! Facciamolo con questo comando:

$ hadoop jar FantaHadoop.jar pb.fantahadoop.Main /home/hduser/fantacalcio/input /home/hduser/fantacalcio/output

In questa riga di comando abbiamo specificato il file JAR e la classe del nostro job nonché le directory di input e di output. È importante che la directory di output non esista, altrimenti il job andrà in errore. Per cancellare eventuali residui di computazioni precedenti basta usare questo comando:

$ hadoop fs -rmr /home/hduser/fantacalcio/output

Il comportamento di questo comando è simile a quello di rm -r sul filesystem locale.

Tornando all’esecuzione del nostro job, potremo renderci conto se ci sono stati errori oppure del suo progresso (rispettivamente della fase di map e di quella di reduce) semplicemente guardando l’output sul terminale oppure tramite l’interfaccia web all’indirizzo http://localhost:50030.

Se l’esecuzione termina correttamente avremo il nostro output in un file su HDFS che dovremo copiare sul disco locale per poterlo usare.

$ hadoop fs -get /home/hduser/fantacalcio/output/part-00000 statistiche.csv

Il file CSV, che potete tranquillamente aprire con LibreOffice (o OpenOffice) specificando la virgola (comma, in inglese) come separatore di campo, contiene una riga per ciascun giocatore con nome, ruolo (P, D, C o A), totale dei voti, presenze e media voto. Già che ci siamo creiamo anche una versione ordinata per numero di punti totalizzati da ciascun giocatore, in ordine decrescente:

$ sort -rn -t, -k 3 statistiche.csv > statistiche-ordinate.csv

Il codice sorgente di FantaHadoop

Per concludere, diamo una rapida occhiata al codice sorgente di FantaHadoop. Intanto scarichiamolo da GitHub:

$ git clone https://[email protected]/bernarpa/FantaHadoop.git

Si tratta di codice Java scritto con NetBeans, ma può essere tranquillamente modificato tramite qualsiasi altro IDE o editor di testo.

Il nocciolo del programma è la classe Main, che contiene:

  1. una classe interna Map, che implementa il processo Mapper;
  2. una classe interna Reduce, che implementa il processo Reducer;
  3. il metodo main, che configura il job Hadoop e lo esegue.

La classe CsvRow invece serve ad effettuare il parsing delle righe dei file CSV con i dati di ciascun giocatore per ciascuna giornate e a calcolare i voti del fantacalcio a partire da questi ultimi. La classe Map infatti la usa proprio per questi due scopi: legge una riga dei file CSV, ne calcola il voto e restituisce una coppia in cui la chiave è il nome del giocatore ed il valore un oggetto di tipo MyWritable contenente il numero di presenze (a questo livello, ancora 1 per coppia) e la somma dei voti ottenuti dal giocatore durante quelle presenze. La classe MyWritable implementa l’interfaccia Writable; le sue istanze possono così essere inviate attraverso un cluster Hadoop dai Mapper ai Reducer. Hadoop fornisce una serie di Writable per i tipi di dato più comuni (es. IntWritable, DoubleWritable etc…).

I Reducer (classe Reduce) ricevono in input le coppie chiave-valore inviate dai Mapper raggruppate per chiave, ovvero per singolo giocatore. ed effettuano la somma delle presenze e dei voti emettendo infine una coppia chiave-valore che contiene questi due dati aggregati per il giocatore considerato di volta in volta.

Il formato con cui Hadoop scrive i risultati inviati dai Reducer su disco è controllato dalla classe MyOutputFormat, che scrive dei file CSV consistenti in righe i cui campi sono separati dalla virgola.

Get in touch

Thank you for contacting me, I will be in touch with you as soon as possible.
There was an error while trying to send the comment, please try again later.