Come creare una RNN da zero con PyTorch
Obiettivo
Proviamo a creare una Recurrent Neural Network (RNN) che, dato un carattere tra a,b,c,d, lo continui secondo il pattern: 'aaaaabbbbbccccacdddddaaaa'
Importiamo le librerie:
import torch import torch.nn as nn import torch.optim as optim sequence = 'aaaaabbbbbccccacdddddaaaa'
torch
: il core di PyTorch, per operazioni tensoriali e autograd.torch.nn
: modulo per costruire reti neurali in PyTorch.torch.optim
: modulo per gli ottimizzatori, usato per aggiornare i pesi della rete durante l'addestramento.
Mappatura Caratteri-indici:
# Funzioni di utilità per convertire da carattere a intero e viceversa chars = list(set(sequence)) char_to_ix = {ch: i for i, ch in enumerate(chars)} # {'b': 0, 's': 1, 'c': 2, 'd': 3, 'a': 4} ix_to_char = {i: ch for i, ch in enumerate(chars)} # {0: 'b', 1: 's', 2: 'c', 3: 'd', 4: 'a'}
Creazione di due dizionari per convertire i caratteri in indici e viceversa.
Questo è utile per manipolare i caratteri in forma numerica, facilitando le operazioni matematiche sui dati. char_to_ix
mappa ogni carattere unico a un indice intero univoco, e ix_to_char
fa l'opposto.
Creiamo dei Tensori per codificare i caratteri sia di input che di addestramento:
def char_to_onehot(char): tensor = torch.zeros(len(chars)) tensor[char_to_ix[char]] = 1 return tensor
- Creiamo una lista di M (numero di CAMPIONI di training) x N (Numero di token nel vocabolario one-hot)
inputs = torch.stack([char_to_onehot(ch) for ch in sequence[:-1]])
- tensor([[0., 0., 0., 0., 1.],
- [0., 0., 0., 0., 1.],
- [0., 0., 0., 0., 1.],
- Quando si usa CrossEntropyLoss di PyTorch, l'output atteso è l'indice
- della classe target, non la rappresentazione one-hot. CrossEntropyLoss
- combina LogSoftmax e NLLLoss (Negative Log Likelihood Loss) in un'unica
- classe. Questa funzione si aspetta logits non normalizzati come input
- (cioè, l'output grezzo del modello prima dell'applicazione di una
- funzione di attivazione come Softmax) e l'indice della classe come target.
- PyTorch gestisce internamente la conversione degli indici in una
- rappresentazione one-hot e l'applicazione della Softmax per calcolare la
- perdita, il che rende l'uso di CrossEntropyLoss più efficiente dal punto di
- vista computazionale rispetto al calcolo manuale di Softmax seguito da una
- funzione di perdita come NLLLoss su output one-hot. Vedere le wiki per i concetti di
- logit, softmax, CrossEntropyLoss, etc
targets = torch.tensor([char_to_ix[ch] for ch in sequence[1:]], dtype=torch.long)
- tensor([4, 4, 4, 4, 0, 0, 0, 0, 0, 2, 2, 2, 2, 2, 3, 3, 3, 3, 3, 1])
class RNN(nn.Module):
# INIZIALIZZAZIONE DELL'ARCHITETTURA def __init__(self, input_size, hidden_size, output_size): super(RNN, self).__init__() self.hidden_size = hidden_size
# La funzione è: # torch.nn.RNN(self, input_size, hidden_size, num_layers=1, # nonlinearity='tanh', bias=True, batch_first=False, dropout=0.0, # bidirectional=False, device=None, dtype=None) # # input size è la dimensione dell'input # hidden size la dimensione del layer "nascosto". Piu grande è più c'è # spazio di apprendimento. # # batch first true se diamo l'input e outpur nella forma: (seq_len, batch_size, features) # batch first false se abbiamo l'input e output come (batch_size, seq_len, features) self.rnn = nn.RNN(input_size, hidden_size, batch_first=True)
# Layer totalmente connesso self.fc = nn.Linear(hidden_size, output_size)
def forward(self, x, hidden): # hidden lo aspettiamo sempre come (batch_size, seq_len, features) che in questo caso "unario" # è [1,1,hidden_size] # # X ce lo aspettiamo egualmente come (batch_size, seq_len, input_size) che diventa [1,1,one-hot-size] # nel caso di batch 1 e seq 1 out, hidden = self.rnn(x, hidden)
# passiamo direttamente a un layer totalmente connesso che darà in output i LOGIT # siccome per ogni sequenza (che qui è da "uno") vogliamo sempre prendere l'ultimo step # facciamo quest'operazione di slicing. # ricordando che out ritorna il formato (batch_size, seq_len, output_size (onr-hot size) # out[:, -1, :] significa: # - : per ogni esempio nel batch (qui è da uno) # - prendi l'ultimo output emesso dalla RNN (-1) # - in tutte le sue componenti/faetures (: output vector size - one hot size) out = self.fc(out[:, -1, :])
# Out contiene ora i LOGIT in formato: (batch_size, output_size) # ad esempio -0.3382, 0.2435, -0.4452, 0.4239, 0.1288 return out, hidden
def init_hidden(self): # hidden ha formato (batch_size, seq_len, dimensione hidden), qui (1,1,hidden) return torch.zeros(1, 1, self.hidden_size)
- Model instantiation
n_hidden = 128 model = RNN(len(chars), n_hidden, len(chars))
- Loss and optimizer
criterion = nn.CrossEntropyLoss() optimizer = optim.Adam(model.parameters(), lr=0.005)
- Addestramento
In PyTorch, per ogni epoch (o mini batch) durante la fase di addestramento, vogliamo azzerare esplicitamente i gradienti prima di iniziare a fare la backpropagation (cioè, aggiornare i pesi e i bias) perché PyTorch accumula i gradienti su passaggi backward successivi.
__l'azione predefinita è stata impostata per accumulare (cioè sommare) i gradienti ad ogni chiamata di loss.backward()__.
Per questa ragione, quando inizi il tuo ciclo di addestramento, dovresti __azzerare i gradienti__ in modo che si possa fare aggiornare dei parametri correttamente. Altrimenti il gradiente sarebbe una combinazione del vecchio gradiente, che hai già utilizzato per aggiornare i parametri del tuo modello, e del nuovo gradiente calcolato. Di conseguenza, questo punterebbe in una direzione diversa da quella intesa verso il minimo (o massimo, in caso di obiettivi di massimizzazione).
- Training the model
for epoch in range(1000):
# torch.Size([1, 1, 128]) hidden = model.init_hidden()
# model.zero_grad()
# inizializza la loss per questa epoch loss = 0
# inputs.size(0) è il numero di caratteri nella stringa di input: 20 for i in range(inputs.size(0)): # inputs[i] è un tensore tensor([0., 0., 0., 0., 1.]), quindi torch.Size([5]) # Siccome la rete si aspetta (1,1,5) # Dobbiamo usare unsqueeze due volte che aggiunge le dimensioni "batch size" e "seq length" # da [input_size] a [1, 1, input_size] input_tensor = inputs[i].unsqueeze(0).unsqueeze(0) output, hidden = model(input_tensor, hidden)
# Anche per il calcolo della loss dobbiamo usare unsqueeze una volte per aggiungere la dimensione "batch size" # da [1, one-hot size] a [1, 1, one-hot size] loss += criterion(output, targets[i].unsqueeze(0))
loss.backward() optimizer.step()
if epoch % 100 == 0: print(f'Epoch: {epoch}, Loss: {loss.item()/inputs.size(0)}')
- Funzione per generare una sequenza in maniera autoregressiva
def generate_seq(start_char='a', length=20):
hidden = model.init_hidden() input = char_to_onehot(start_char)
# questa è la lista di caratteri in output output_seq = start_char
for i in range(length-1): # anche qui unsqueeze input_tensor = input.unsqueeze(0).unsqueeze(0) output, hidden = model(input_tensor, hidden)
# prendiamo direttamente la posizione col valore massimo dei logit predicted_char_idx = torch.argmax(output).item() predicted_char = ix_to_char[predicted_char_idx]
# accodiamo il carattere alla sequenza output_seq += predicted_char
# usiamo l'ultimo carattere predetto come input al prossimo giro input = char_to_onehot(predicted_char)
return output_seq
- Generating a sequence
print(generate_seq('a', 25))