[Code Review] Transformer Code Reivew


Attention is All you need 실습

데이터 전처리

image

-pytorch 자연어 처리 모델은 다양한 라이브러리를 포함하고 있다 ex) 토큰화 모듈인 spacy

image

  • spacy모듈의 토크나이저를 사용하여 문장 시퀀스를 토큰화 하였다 이후에 start token 과 end token을 추가해줄 것이다.

image

Field 라이브러리를 사용하여 시퀀스 데이터셋을 어떻게 전처리할 것인지 정의해준다. 이를 torchtext의 Multi30k데이터셋에 적용시킨다.

image

최소 2번 이상 나온 단어들만 추려 index화 시킨다 총 단어 갯수를 SRC와 TRG에서 구할 수 있다.

image

위에 예시처럼 각 해당하는 단어가 어떤 인덱스 인지 vocab을 활용해 알 수 있다.

image

image


모델정의

  • MultiheadAttentionLayer ``` import torch.nn as nn

class MultiHeadAttentionLayer(nn.Module): def init(self, hidden_dim, n_heads, dropout_ratio, device): super().init()

    assert hidden_dim % n_heads == 0 # q,k,v의 차원은 hidden_dim / n_head

    self.hidden_dim = hidden_dim # 임베딩 차원
    self.n_heads = n_heads # 헤드(head)의 개수: 서로 다른 어텐션(attention) 컨셉의 수
    self.head_dim = hidden_dim // n_heads # 각 헤드(head)에서의 임베딩 차원

    self.fc_q = nn.Linear(hidden_dim, hidden_dim) # Query 값에 적용될 FC 레이어
    self.fc_k = nn.Linear(hidden_dim, hidden_dim) # Key 값에 적용될 FC 레이어
    self.fc_v = nn.Linear(hidden_dim, hidden_dim) # Value 값에 적용될 FC 레이어

    self.fc_o = nn.Linear(hidden_dim, hidden_dim)

    self.dropout = nn.Dropout(dropout_ratio)

    self.scale = torch.sqrt(torch.FloatTensor([self.head_dim])).to(device)
    #모델에서 사용하는 input Tensor들은 input = input.to(device)를 호출해야한다.

def forward(self, query, key, value, mask = None):

    batch_size = query.shape[0]

    # query: [batch_size, query_len, hidden_dim]
    # key: [batch_size, key_len, hidden_dim]
    # value: [batch_size, value_len, hidden_dim]
 
    Q = self.fc_q(query)
    K = self.fc_k(key)
    V = self.fc_v(value)  

    # Q: [batch_size, query_len, hidden_dim]
    # K: [batch_size, key_len, hidden_dim]
    # V: [batch_size, value_len, hidden_dim]

    # hidden_dim → n_heads X head_dim 형태로 변형
    # n_heads(h)개의 서로 다른 어텐션(attention) 컨셉을 학습하도록 유도
    Q = Q.view(batch_size, -1, self.n_heads, self.head_dim).permute(0, 2, 1, 3)
    K = K.view(batch_size, -1, self.n_heads, self.head_dim).permute(0, 2, 1, 3)
    V = V.view(batch_size, -1, self.n_heads, self.head_dim).permute(0, 2, 1, 3)
    #permute함수는 차원의 순서를 바꿔 주는 역할 
    # 여기서는 hidden_dim 값을 head_dim * head로 바꿔줬는데 이는 아까 각각의 linear
    #를 여러번 거치고 들어가는 것보다 효율적이다

    # Q: [batch_size, n_heads, query_len, head_dim]
    # K: [batch_size, n_heads, key_len, head_dim]
    # V: [batch_size, n_heads, value_len, head_dim]

    # Attention Energy 계산
    energy = torch.matmul(Q, K.permute(0, 1, 3, 2)) / self.scale

    # energy: [batch_size, n_heads, query_len, key_len]

    # 마스크(mask)를 사용하는 경우
    if mask is not None:
        # 마스크(mask) 값이 0인 부분을 -1e10으로 채우기
        #시퀀스를 읽는데 필요없는 부분은 소프트맥스하기전에 0에 가까운 값으로 만들어줌
        energy = energy.masked_fill(mask==0, -1e10)

    # 어텐션(attention) 스코어 계산: 각 단어에 대한 확률 값
    attention = torch.softmax(energy, dim=-1)

    # attention: [batch_size, n_heads, query_len, key_len]

    # 여기에서 Scaled Dot-Product Attention을 계산
    x = torch.matmul(self.dropout(attention), V)

    # x: [batch_size, n_heads, query_len, head_dim]

    x = x.permute(0, 2, 1, 3).contiguous()

    # x: [batch_size, query_len, n_heads, head_dim]

    x = x.view(batch_size, -1, self.hidden_dim) #다시 원래의 차원인 hidden_dim으로 바꿔서 
    #일자로 늘린다.

    # x: [batch_size, query_len, hidden_dim]

    x = self.fc_o(x)

    # x: [batch_size, query_len, hidden_dim]

    return x, attention ```
  • FeedforwardLayer

MultiheadAttention이후 학습이 원할하게 진행되기 위해 만들어준 레이어이다. 이때 입력과 출력의 값이 같도록하여 이러한 형태의 블록을 계속 적용할 수 있게 한다.

class PositionwiseFeedforwardLayer(nn.Module): #하나의 레이어를 구성하는데 있어 필요한것 
    def __init__(self, hidden_dim, pf_dim, dropout_ratio):
        super().__init__()

        self.fc_1 = nn.Linear(hidden_dim, pf_dim)
        self.fc_2 = nn.Linear(pf_dim, hidden_dim) #아까와 같이 입력과 출력의 값이 같게 해준다

        self.dropout = nn.Dropout(dropout_ratio)

    def forward(self, x):

        # x: [batch_size, seq_len, hidden_dim]

        x = self.dropout(torch.relu(self.fc_1(x)))

        # x: [batch_size, seq_len, pf_dim]

        x = self.fc_2(x)

        # x: [batch_size, seq_len, hidden_dim]

        return x
  • Encoder Layer

하나의 인코더 레이어에 대해 정의한다. 입력과 출력의 차원이 같으므로 이러한 특징을 이용해 트랜스포머의 인코더는 인코더 레이어를 여러번 중첩해 사용할 수 있다.

class EncoderLayer(nn.Module):
    def __init__(self, hidden_dim, n_heads, pf_dim, dropout_ratio, device):
        super().__init__()

        self.self_attn_layer_norm = nn.LayerNorm(hidden_dim)
        self.ff_layer_norm = nn.LayerNorm(hidden_dim)
        self.self_attention = MultiHeadAttentionLayer(hidden_dim, n_heads, dropout_ratio, device)
        self.positionwise_feedforward = PositionwiseFeedforwardLayer(hidden_dim, pf_dim, dropout_ratio)
        #이러한 레이어들이 Residual Connect되어 있는 구조이다 
        self.dropout = nn.Dropout(dropout_ratio)

    # 하나의 임베딩이 복제되어 Query, Key, Value로 입력되는 방식
    def forward(self, src, src_mask):

        # src: [batch_size, src_len, hidden_dim]
        # src_mask: [batch_size, src_len]

        # self attention
        # 필요한 경우 마스크(mask) 행렬을 이용하여 어텐션(attention)할 단어를 조절 가능
        _src, _ = self.self_attention(src, src, src, src_mask)

        # dropout, residual connection and layer norm
        src = self.self_attn_layer_norm(src + self.dropout(_src)) #더해줌으로써 residual connection을 수행한다

        # src: [batch_size, src_len, hidden_dim]

        # position-wise feedforward
        _src = self.positionwise_feedforward(src)

        # dropout, residual and layer norm
        src = self.ff_layer_norm(src + self.dropout(_src))#더해줌으로써 residual connection을 수행한다

        # src: [batch_size, src_len, hidden_dim]

        return src
  • Decoder Layer

    하나의 디코더 레이어에 대해 입력과 출력의 차원이 같고 소스 문장의 토큰에 대하여 마스크 값을 0으로 설정해준다 -> pad토큰은 문장길이를 채워줄 아무의미 없는 단어이므로

    타겟 문장에서는 각 단어는 다음 단어가 무엇인지 알 수 없도록 마스크를 사용한다.

    디코더는 한 블록안에 어텐션이 두번 들어가는데 하나는 self-attention이고 또 하나는 encoder-decoder attention이다.

    class DecoderLayer(nn.Module):
      def __init__(self, hidden_dim, n_heads, pf_dim, dropout_ratio, device):
          super().__init__()
    
          self.self_attn_layer_norm = nn.LayerNorm(hidden_dim)
          self.enc_attn_layer_norm = nn.LayerNorm(hidden_dim)
          self.ff_layer_norm = nn.LayerNorm(hidden_dim)
            
          self.self_attention = MultiHeadAttentionLayer(hidden_dim, n_heads, dropout_ratio, device)
          self.encoder_attention = MultiHeadAttentionLayer(hidden_dim, n_heads, dropout_ratio, device)
          self.positionwise_feedforward = PositionwiseFeedforwardLayer(hidden_dim, pf_dim, dropout_ratio)
          self.dropout = nn.Dropout(dropout_ratio)
    
      # 인코더의 출력 값(enc_src)을 어텐션(attention)하는 구조
      def forward(self, trg, enc_src, trg_mask, src_mask):
    
          # trg: [batch_size, trg_len, hidden_dim]
          # enc_src: [batch_size, src_len, hidden_dim]
          # trg_mask: [batch_size, trg_len]
          # src_mask: [batch_size, src_len]
    
          # self attention
          # 자기 자신에 대하여 어텐션(attention)
          _trg, _ = self.self_attention(trg, trg, trg, trg_mask)
    
          # dropout, residual connection and layer norm
          trg = self.self_attn_layer_norm(trg + self.dropout(_trg))
    
          # trg: [batch_size, trg_len, hidden_dim]
    
          # encoder attention
          # 디코더의 쿼리(Query)를 이용해 인코더를 어텐션(attention)
          _trg, attention = self.encoder_attention(trg, enc_src, enc_src, src_mask)
    
          # dropout, residual connection and layer norm
          trg = self.enc_attn_layer_norm(trg + self.dropout(_trg))
    
          # trg: [batch_size, trg_len, hidden_dim]
    
          # positionwise feedforward
          _trg = self.positionwise_feedforward(trg)
    
          # dropout, residual and layer norm
          trg = self.ff_layer_norm(trg + self.dropout(_trg))
    
          # trg: [batch_size, trg_len, hidden_dim]
          # attention: [batch_size, n_heads, trg_len, src_len]
    
          return trg, attention
      
    
  • Decoder Architecture

    전체 디코더 아키텍쳐를 정의한다

    class Decoder(nn.Module):
      def __init__(self, output_dim, hidden_dim, n_layers, n_heads, pf_dim, dropout_ratio, device, max_length=100):
          super().__init__()
    
          self.device = device
    
          self.tok_embedding = nn.Embedding(output_dim, hidden_dim)
          self.pos_embedding = nn.Embedding(max_length, hidden_dim)
    
          self.layers = nn.ModuleList([DecoderLayer(hidden_dim, n_heads, pf_dim, dropout_ratio, device) for _ in range(n_layers)])
    
          self.fc_out = nn.Linear(hidden_dim, output_dim)
    
          self.dropout = nn.Dropout(dropout_ratio)
    
          self.scale = torch.sqrt(torch.FloatTensor([hidden_dim])).to(device)
    
      def forward(self, trg, enc_src, trg_mask, src_mask):
    
          # trg: [batch_size, trg_len]
          # enc_src: [batch_size, src_len, hidden_dim]
          # trg_mask: [batch_size, trg_len]
          # src_mask: [batch_size, src_len]
    
          batch_size = trg.shape[0]
          trg_len = trg.shape[1]
    
          pos = torch.arange(0, trg_len).unsqueeze(0).repeat(batch_size, 1).to(self.device)
    
          # pos: [batch_size, trg_len]
    
          trg = self.dropout((self.tok_embedding(trg) * self.scale) + self.pos_embedding(pos))
    
          # trg: [batch_size, trg_len, hidden_dim]
    
          for layer in self.layers:
              # 소스 마스크와 타겟 마스크 모두 사용
              trg, attention = layer(trg, enc_src, trg_mask, src_mask)
    
          # trg: [batch_size, trg_len, hidden_dim]
          # attention: [batch_size, n_heads, trg_len, src_len]
    
          output = self.fc_out(trg)
    
          # output: [batch_size, trg_len, output_dim]
    
          return output, attention
    
  • Transformer 아키텍쳐

  class Transformer(nn.Module):
    def __init__(self, encoder, decoder, src_pad_idx, trg_pad_idx, device):
        super().__init__()

        self.encoder = encoder
        self.decoder = decoder
        self.src_pad_idx = src_pad_idx
        self.trg_pad_idx = trg_pad_idx
        self.device = device

    # 소스 문장의 <pad> 토큰에 대하여 마스크(mask) 값을 0으로 설정
    def make_src_mask(self, src):

        # src: [batch_size, src_len]

        src_mask = (src != self.src_pad_idx).unsqueeze(1).unsqueeze(2)

        # src_mask: [batch_size, 1, 1, src_len]

        return src_mask

    # 타겟 문장에서 각 단어는 다음 단어가 무엇인지 알 수 없도록(이전 단어만 보도록) 만들기 위해 마스크를 사용
    def make_trg_mask(self, trg):

        # trg: [batch_size, trg_len]

        """ (마스크 예시)
        1 0 0 0 0
        1 1 0 0 0
        1 1 1 0 0
        1 1 1 0 0
        1 1 1 0 0
        """
        trg_pad_mask = (trg != self.trg_pad_idx).unsqueeze(1).unsqueeze(2)

        # trg_pad_mask: [batch_size, 1, 1, trg_len]

        trg_len = trg.shape[1]

        """ (마스크 예시)
        1 0 0 0 0
        1 1 0 0 0
        1 1 1 0 0
        1 1 1 1 0
        1 1 1 1 1
        """
        trg_sub_mask = torch.tril(torch.ones((trg_len, trg_len), device = self.device)).bool()

        # trg_sub_mask: [trg_len, trg_len]

        trg_mask = trg_pad_mask & trg_sub_mask

        # trg_mask: [batch_size, 1, trg_len, trg_len]

        return trg_mask

    def forward(self, src, trg):

        # src: [batch_size, src_len]
        # trg: [batch_size, trg_len]

        src_mask = self.make_src_mask(src)
        trg_mask = self.make_trg_mask(trg)

        # src_mask: [batch_size, 1, 1, src_len]
        # trg_mask: [batch_size, 1, trg_len, trg_len]

        enc_src = self.encoder(src, src_mask)

        # enc_src: [batch_size, src_len, hidden_dim]

        output, attention = self.decoder(trg, enc_src, trg_mask, src_mask)

        # output: [batch_size, trg_len, output_dim]
        # attention: [batch_size, n_heads, trg_len, src_len]

        return output, attention
  

학습

파라미터 설정

INPUT_DIM = len(SRC.vocab)
OUTPUT_DIM = len(TRG.vocab)
HIDDEN_DIM = 256
ENC_LAYERS = 3
DEC_LAYERS = 3
ENC_HEADS = 8
DEC_HEADS = 8
ENC_PF_DIM = 512
DEC_PF_DIM = 512
ENC_DROPOUT = 0.1
DEC_DROPOUT = 0.1
  SRC_PAD_IDX = SRC.vocab.stoi[SRC.pad_token]
TRG_PAD_IDX = TRG.vocab.stoi[TRG.pad_token]

# 인코더(encoder)와 디코더(decoder) 객체 선언
enc = Encoder(INPUT_DIM, HIDDEN_DIM, ENC_LAYERS, ENC_HEADS, ENC_PF_DIM, ENC_DROPOUT, device)
dec = Decoder(OUTPUT_DIM, HIDDEN_DIM, DEC_LAYERS, DEC_HEADS, DEC_PF_DIM, DEC_DROPOUT, device)

# Transformer 객체 선언
model = Transformer(enc, dec, SRC_PAD_IDX, TRG_PAD_IDX, device).to(device)

모델의 가중치 파라미터 초기화

모델의 가중치 초기화 방법인 xavier_uniform을 사용한다.

  def initialize_weights(m):
    if hasattr(m, 'weight') and m.weight.dim() > 1:
      nn.init.xavier_uniform_(m.weight.data)
  model.apply(initialize_weights)
  

optimizer설정

import torch.optim as optim

LEARNING_RATE = 0.0005
optimizer = torch.optim.Adam(model.parameters() , lr = LEARNING_RATE)

criterion = nn.CrossEntropyLoss(ignore_index = TRG_PAD_IDX)

학습 함수

 
# 모델 학습(train) 함수
def train(model, iterator, optimizer, criterion, clip):
    model.train() # 학습 모드
    epoch_loss = 0

    # 전체 학습 데이터를 확인하며
    for i, batch in enumerate(iterator):
        src = batch.src
        trg = batch.trg

        optimizer.zero_grad()

        # 출력 단어의 마지막 인덱스(<eos>)는 제외
        # 입력을 할 때는 <sos>부터 시작하도록 처리
        output, _ = model(src, trg[:,:-1])

        # output: [배치 크기, trg_len - 1, output_dim]
        # trg: [배치 크기, trg_len]

        output_dim = output.shape[-1]

        output = output.contiguous().view(-1, output_dim)
        # 출력 단어의 인덱스 0(<sos>)은 제외
        trg = trg[:,1:].contiguous().view(-1)

        # output: [배치 크기 * trg_len - 1, output_dim]
        # trg: [배치 크기 * trg len - 1]

        # 모델의 출력 결과와 타겟 문장을 비교하여 손실 계산
        loss = criterion(output, trg)
        loss.backward() # 기울기(gradient) 계산

        # 기울기(gradient) clipping 진행
        torch.nn.utils.clip_grad_norm_(model.parameters(), clip)

        # 파라미터 업데이트
        optimizer.step()

        # 전체 손실 값 계산
        epoch_loss += loss.item()

    return epoch_loss / len(iterator)
  
  

평가 함수

  # 모델 평가(evaluate) 함수
def evaluate(model, iterator, criterion):
    model.eval() # 평가 모드
    epoch_loss = 0

    with torch.no_grad():
        # 전체 평가 데이터를 확인하며 학습을 멈춰둠
        for i, batch in enumerate(iterator):
            src = batch.src
            trg = batch.trg

            # 출력 단어의 마지막 인덱스(<eos>)는 제외
            # 입력을 할 때는 <sos>부터 시작하도록 처리
            output, _ = model(src, trg[:,:-1])

            # output: [배치 크기, trg_len - 1, output_dim]
            # trg: [배치 크기, trg_len]

            output_dim = output.shape[-1]

            output = output.contiguous().view(-1, output_dim)
            # 출력 단어의 인덱스 0(<sos>)은 제외
            trg = trg[:,1:].contiguous().view(-1)

            # output: [배치 크기 * trg_len - 1, output_dim]
            # trg: [배치 크기 * trg len - 1]

            # 모델의 출력 결과와 타겟 문장을 비교하여 손실 계산
            loss = criterion(output, trg)

            # 전체 손실 값 계산
            epoch_loss += loss.item()

    return epoch_loss / len(iterator)
  

학습

  import math
import time

def epoch_time(start_time, end_time):
    elapsed_time = end_time - start_time
    elapsed_mins = int(elapsed_time / 60)
    elapsed_secs = int(elapsed_time - (elapsed_mins * 60))
    return elapsed_mins, elapsed_secs
  import time
import math
import random

N_EPOCHS = 10
CLIP = 1
best_valid_loss = float('inf')

for epoch in range(N_EPOCHS):
    start_time = time.time() # 시작 시간 기록

    train_loss = train(model, train_iterator, optimizer, criterion, CLIP)
    valid_loss = evaluate(model, valid_iterator, criterion)

    end_time = time.time() # 종료 시간 기록
    epoch_mins, epoch_secs = epoch_time(start_time, end_time)

    if valid_loss < best_valid_loss:
        best_valid_loss = valid_loss
        torch.save(model.state_dict(), 'transformer_german_to_english.pt')

    print(f'Epoch: {epoch + 1:02} | Time: {epoch_mins}m {epoch_secs}s')
    print(f'\tTrain Loss: {train_loss:.3f} | Train PPL: {math.exp(train_loss):.3f}')
    print(f'\tValidation Loss: {valid_loss:.3f} | Validation PPL: {math.exp(valid_loss):.3f}')
                                    

학습 결과

    Epoch: 01 | Time: 0m 41s
	Train Loss: 4.244 | Train PPL: 69.668
	Validation Loss: 3.050 | Validation PPL: 21.120
Epoch: 02 | Time: 0m 40s
	Train Loss: 2.838 | Train PPL: 17.086
	Validation Loss: 2.329 | Validation PPL: 10.269
Epoch: 03 | Time: 0m 41s
	Train Loss: 2.254 | Train PPL: 9.526
	Validation Loss: 1.990 | Validation PPL: 7.315
Epoch: 04 | Time: 0m 40s
	Train Loss: 1.897 | Train PPL: 6.667
	Validation Loss: 1.824 | Validation PPL: 6.194
Epoch: 05 | Time: 0m 41s
	Train Loss: 1.642 | Train PPL: 5.166
	Validation Loss: 1.713 | Validation PPL: 5.544
Epoch: 06 | Time: 0m 40s
	Train Loss: 1.453 | Train PPL: 4.275
	Validation Loss: 1.651 | Validation PPL: 5.214
Epoch: 07 | Time: 0m 40s
	Train Loss: 1.297 | Train PPL: 3.657
	Validation Loss: 1.628 | Validation PPL: 5.093
Epoch: 08 | Time: 0m 40s
	Train Loss: 1.169 | Train PPL: 3.219
	Validation Loss: 1.623 | Validation PPL: 5.069
Epoch: 09 | Time: 0m 40s
	Train Loss: 1.060 | Train PPL: 2.885
	Validation Loss: 1.617 | Validation PPL: 5.037
Epoch: 10 | Time: 0m 40s
	Train Loss: 0.962 | Train PPL: 2.617
	Validation Loss: 1.637 | Validation PPL: 5.140                                

학습된 모델저장

# 학습된 모델 저장
from google.colab import files

files.download('transformer_german_to_english.pt')
                                   

저장된 모델 평가

model.load_state_dict(torch.load('transformer_german_to_english.pt'))

test_loss = evaluate(model, test_iterator, criterion)

print(f'Test Loss: {test_loss:.3f} | Test PPL: {math.exp(test_loss):.3f}')