Redes Neuronales para Lenguaje Natural, 2024

---
# POS-tagging con redes LSTM

En este notebook construiremos un etiquetador de categorías gramaticales (POS-tagger) usando una arquitectura LSTM en Pytorch. Los datos que utilizaremos son una versión reducida del corpus AnCora en español, etiquetada según el estándar Universal Dependencies.


---



In [None]:
!pip install torchmetrics

In [None]:
import string
import random
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torchmetrics as tm

BATCH_SIZE = 64

Descargar y abrir los archivos de datos

In [None]:
! wget https://eva.fing.edu.uy/mod/resource/view.php?id=195738 -O ancora_mini.zip
! unzip ancora_mini.zip

In [None]:
def load_data_file(filename):
    with open(filename) as f:
      tokens = []
      sent = []
      for l in f:
        if l.startswith('#'):
          continue
        toks = l.strip().split('\t')
        if (len(toks) >= 3) and ('-' not in toks[0]) and ('.' not in toks[0]):
          sent.append((toks[1],toks[3]))
        elif len(toks) <= 1:
          tokens.append(sent)
          sent = []

    if len(sent) > 0:
      tokens.append(sent)

    return tokens

train = load_data_file('ancora_mini_train.txt')
dev = load_data_file('ancora_mini_dev.txt')
test = load_data_file('ancora_mini_test.txt')
print(len(train),len(dev),len(test))

Imprimimos un par de ejemplos para ver el formato con el que estamos trabajando.

In [None]:
print(train[0])
print(train[20])

Descargar una colección de embeddings.

En este ejercicio se pueden utilizar diferentes colecciones de embeddings, que darán distintos resultados. Proveemos tres alternativas, cada una con sus pros y sus contras.

1.   Embeddings del Spanish Billion Word Corpus (SBWCE): tamaño 300, están entrenados con más de mil millones de palabras en español y se cargan con gensim.
2.   Embeddings de Wikipedia en HuggingFace: tamaño 100, están entrenados con el texto de la Wikipedia en español (mucho menos texto) y se cargan con gensim.
3.   Embeddings tipo FastText de TorchNLP: tamaño 300, son otro tipo de vector distinto a word2vec, se cargan con PyTorch-NLP.


In [None]:
# Alternativa 1

# Fromas de bajar los embeddings de SBWC:

# Sitio oficial de la Universidad Nacional de Córdoba
# ! wget https://cs.famaf.unc.edu.ar/~ccardellino/SBWCE/SBW-vectors-300-min5.bin.gz

# Mirror en el Owncloud de Fing
# ! wget https://www.fing.edu.uy/owncloud/index.php/s/ryHF9xWwox3NrFe/download/SBW-vectors-300-min5.bin.gz

# Si los enlaces son demasiado lentos: bajarlo una sola vez y copiarlo a google drive
# ! cp /content/drive/MyDrive/SBW-vectors-300-min5.bin.gz .

# El archivo viene en un gzip, lo descomprimimos
! gzip -d SBW-vectors-300-min5.bin.gz

# Abrirlos con la biblioteca de embeddings gensim
from gensim.models import KeyedVectors
wv = KeyedVectors.load_word2vec_format("./SBW-vectors-300-min5.bin", binary=True)
print(wv.vectors.shape)
embeddings_size = wv.vectors.shape[1]

In [None]:
# Alternativa 2

# Bajar y abrir los embeddings de Wikipedia de HuggingFace

from gensim.models import KeyedVectors
from huggingface_hub import hf_hub_download
wv = KeyedVectors.load_word2vec_format(hf_hub_download(repo_id="Word2vec/wikipedia2vec_eswiki_20180420_100d", filename="eswiki_20180420_100d.txt"))
print(wv.vectors.shape)
embeddings_size = wv.vectors.shape[1]


In [None]:
# Alternativa 3

# Para utilizar los embeddings tipo FastText de TorchNLP, primero hay que instalar la biblioteca
! pip install pytorch-nlp

import torchnlp
from torchnlp.word_to_vector import FastText

# Cargar los embeddings FastText en español
wv = FastText(language='es')
print(wv.vectors.shape)
embeddings_size = wv.vectors.shape[1]

# Cuidado! Estos embeddings no están cargados con la biblioteca gensim, por lo que no podemos utilizar sus funcionalidades habituales

Haremos un pequeño análisis de cobertura de la colección de embeddings sobre los tokens utilizados en el corpus.

Consideraremos cuatro clases de tokens: palabras conocidas (están en la colección de embeddings), números, símbolos de puntuación, y palabras desconocidas.

La heurística para determinar si un token es número o símbolo de puntuación será si comienza con un dígito o símbolo de puntuación.

In [None]:
words = []
puncts = []
nums = []
unk = []
labels = set()
for sent in train+dev+test:
  for (t,p) in sent:
    if t in wv:
      words.append(t)
    elif t[0] in string.punctuation:
      puncts.append(t)
    elif t[0] in ['0','1','2','3','4','5','6','7','8','9']:
      nums.append(t)
    else:
      unk.append(t)
    labels.add(p)

print("Total: w %s p %s n %s u %s" % (len(words),len(puncts),len(nums),len(unk)))
words = set(words)
puncts = set(puncts)
nums = set(nums)
unk = set(unk)
print("Únicas: w %s p %s n %s u %s" % (len(words),len(puncts),len(nums),len(unk)))


Por lo que vemos del análisis utilizando SBWC, tenemos vectores para todas las palabras conocidas y para los dígitos, y solamente hay un 0.05% de palabras que no están en la colección de embeddings. Utilizaremos un embedding aleatorio para representar estas palabras desconocidas.

A esto hay que agregarle los símbolos de puntuación: SBWC no contiene embeddings para estos símbolos. Como forma de paliar esta situación, también nos crearemos un embedding aleatorio.

Pytorch trabajará con secuencias de números enteros, y no con las palabras en sí, por lo que construiremos una forma de pasar cada posible token y cada posible label a una representación numérica.

Primero comenzamos creando los diccionarios de palabras y labels que utilizaremos. Estos diccionarios contendrán todos los posibles tokens y labels representables en nuestro corpus.

A los tokens presentes en el corpus, les agregaremos cuatro tokens especiales:
- PAD: para realizar padding de las secuencias (se verá más adelante)
- UNK: representa las palabras desconocidas
- PUNCT: representa símbolos de puntuación
- NUM: representa números

A los labels les agregaremos solo un label especial NONE que se corresponderá con los tokens de PAD.

In [None]:
words = ['<PAD>','<UNK>','<PUNCT>','<NUM>'] + sorted(words)
wordict = { w:i for (i,w) in enumerate(words) }
labels = ['NONE'] + sorted(labels)
labeldict = { l:i for (i,l) in enumerate(labels) }

def get_index(word,wordict):
    if word in wordict:
      return wordict[word]
    elif word[0] in string.punctuation:
      return wordict['<PUNCT>']
    elif word[0] in [0,1,2,3,4,5,6,7,8,9]:
      return wordict['<NUM>']
    else:
      return wordict['<UNK>']

print("words",words[:10],"...")
print("wordict",[(w,wordict[w]) for w in words[:10]],"...")
print("labels",labels)
print("labeldict",labeldict)

A continuación nos crearemos una matriz más pequeña que tenga solo los embeddings con los que vamos a trabajar, de forma de que ocupen la menor cantidad de memoria posible. Esta matriz servirá como pesos de inicialización de la capa de embeddings de la red.

Crear una matriz de embeddings que sea del tamaño len(words) * 300 conteniendo los embeddings que representan todos los tokens en orden.

El embedding correspondiente a los números en SBWC es "DIGITO" (usar "digito" con los embeddings de Wikipedia), el embedding correspondiente a "PAD" será un vector de ceros, y será necesario crearse dos embeddings aleatorios (distintos) del tamaño correcto para representar símbolos de puntuación y palabras desconocidas.

In [None]:
# su código aquí...
pad_vector = ...
punct_vector = ...
num_vector = ...
unk_vector = ...
embeddings = ...

print("Matriz de embeddings:",embeddings.shape)
vocab_size = embeddings.shape[0]
embeddings_dim = embeddings.shape[1]


A continuación crearemos la clase `POSDataset` que hereda de `torch.utils.data.Dataset`, para manejar nuestros conjuntos de datos.

Notar que hasta ahora nuestros ejemplos tienen este formato:

`[ (token1,label1), (token2,label2), (token3,label3), ....]`

Necesitamos que `POSDataset` tenga los atributos `X` e `y`, donde `X` sea una lista de tensores  de tipo long con los identificadores de los tokens de cada ejemplo: `[num_token1, num_token2, num_token3,...]` e `y` sea una lista de tensores  de tipo long con los indentificadores de las labels de cada ejemplo: `[num_label1, num_label2, num_label3,...]`.

Completar la función `transform_data` para transformar las particiones al formato necesario. Esta función se utiliza luego en la definición de `POSDataset`.

In [None]:
from torch.utils.data import Dataset

def transform_data(partition):
  # Su código aquí
  ...
  return X, y

class POSDataset(Dataset):
    def __init__(self, partition):
      self.X, self.y = transform_data(partition)

      # Ordenamos las oraciones por largo
      self.X.sort(key = lambda x:len(x))
      self.y.sort(key = lambda x:len(x))

    def __len__(self):
        return len(self.X)

    def __getitem__(self, idx):
        return self.X[idx], self.y[idx]

train_dataset = POSDataset(train)
dev_dataset = POSDataset(dev)
test_dataset = POSDataset(test)

print("train",len(train_dataset))
print("dev",len(dev_dataset))
print("test",len(test_dataset))

Notar que en el constructor de `POSDataset` ordenamos las oraciones por largo. Esto es para que dentro de un mismo batch queden oraciones de tamaños lo más similares posible.

Usaremos la clase `torch.utils.data.Dataloader` para crear los minibatches.

Notar que para poder crear un tensor a partir de un minibatch, todas las oraciones del minibatch deben tener el mismo largo. Para eso utilizaremos el token especial PAD y el label especial NONE que definimos antes.

Para hacer la transformación necesaria a cada minibatch, el constructor de `Dataloader` recibe un parámetro `collate_fn` que nos permite crear cada minibatch manualmente, dados los ejemplos que contendrá el minibatch.

Escriba la función `collate_fn` que toma como entrada los ejemplos de un minibatch (una lista de pares de tensores `(num_tokens, num_labels)`), y retorne los tensores `X` e `y` (de rango 2, matrices), donde `X` tenga los identificadores de los tokens (completando con el token PAD), e `y` tenga los identificadores de las labels (completando con el label NONE).

Puede utilizar el método `nn.utils.rnn.pad_sequence` para realizar el padding de los ejemplos (https://pytorch.org/docs/stable/generated/torch.nn.utils.rnn.pad_sequence.html).

In [None]:
def collate_fn(batch):
  # Su código aquí
  ...
  return X, y

In [None]:
from torch.utils.data import DataLoader

train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, collate_fn=collate_fn)
dev_loader = DataLoader(dev_dataset, batch_size=BATCH_SIZE, collate_fn=collate_fn)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, collate_fn=collate_fn)

print("train", next(iter(train_loader))[0].shape, next(iter(train_loader))[0].dtype)
print("dev", next(iter(dev_loader))[0].shape, next(iter(dev_loader))[0].dtype)
print("test", next(iter(test_loader))[0].shape, next(iter(test_loader))[0].dtype)

Llegó la hora de implementar la red LSTM. Esta red debe tener:
- Una capa de embeddings (torch.nn.Embeddings), la que inicializaremos con la matriz de embeddings que definimos más arriba (poner requires_grad en False para que el entrenamiento sea más rápido, no queremos modificar los pesos de los embeddings)
- Una capa LSTM (torch.nn.LSTM), su entrada es la dimensión de los embeddings, y el tamaño de la capa oculta debe quedar como parámetro
- Una capa lineal (torch.nn.Linear) que va de la capa oculta de la LSTM a la salida final, que tiene tamaño len(labels)


In [None]:
class LSTMTagger(nn.Module):

    def __init__(self, ....):
        super(LSTMTagger, self).__init__()
        # su código aquí

    def forward(self, batch):
        # su código aquí
        return tag_scores


Escribir una función para evaluar la red. Debe tomar el dataloader a evaluar, ejecutar la red y calcular el accuracy.

Puede utilizar la biblioteca `torchmetrics`, usando la clase Accuracy de la siguiente manera:

`acc = tm.Accuracy(task="multiclass",num_classes=len(labels),ignore_index=labeldict['NONE'])`

In [None]:
def evaluate(net,dataloader):
    # su código aquí
    acc = ...
    print("accuracy",acc)
    return acc


Instanciar y entrenar la red. Utilizaremos la función de pérdida CrossEntropyLoss, notar que para eso es necesario que lo que devuelva la red sea la salida de la última capa, sin aplicar la función de activación Softmax.

In [None]:
loss_function = nn.CrossEntropyLoss(ignore_index=labeldict['NONE'])

lstmnet = LSTMTagger(...) # los parámetros apropiados
evaluate(lstmnet,dev_loader) # seguro va a dar muy cerca de 0 al principio!

optimizer = optim.SGD(lstmnet.parameters(), lr=0.1)

for epoch in range(...): # elegir
    for word_batch,label_batch in train_loader:
        # su código aquí, se debe:
        # - inicializar los gradientes
        # - ejecutar la red sobre el batch (paso forward)
        # - usar la loss function para comparar la salida de la red con los labels esperados
        # - calcular los gradientes (paso backward)
        # - realizar el descenso por gradiente

    # ya que estamos, imprimimos la performance sobre dev luego de completada una epoch
    evaluate(lstmnet,dev_loader)



Finalmente, evaluar el resultado de la mejor red encontrada sobre el corpus de test.

In [None]:
evaluate(lstmnet,test_loader)