본문 바로가기
자연어 처리(NLP)/모델(Model)

Transformer를 이해하고 구현해보자! (2)

by Kaya_Alpha 2022. 1. 21.

이전글 : Transformer를 이해하고 구현해보자! (1)

다음글 : Transformer를 이용한 번역모델 구축


저번 포스팅에서는 Transformer의 구성요소인 Positional Encoding, Multi-Head Attention, LayerNorm, Feed-Forward에 대해 알아보고 구현해보았습니다.

 

이번 포스팅에서는 저번 포스팅에 이어 Encoder와 Decoder를 구현한 뒤, 전체 모델인 Transformer를 구현해보겠습니다. 사실 구현에 필요한 핵심적인 부분은 모두 저번 포스트에서 다루었기 때문에 이번 포스트에서는 전에 구현한 클래스들을 이어붙이는 작업이 주를 이룹니다.

 

+) 다음 포스팅에서는 이번에 구현한 Transformer 모델을 이용하여 한-영 번역기 모델도 학습을 진행해서 결과도 한번 보도록 하겠습니다!

1. Encoder 구현하기

Encoder Layer

위 그림에서 빨간색 박스로 친 부분이 Encoder를 나타냅니다.

 

그림에서 볼 수 있듯이, Encoder Layer는 Multi-Head Attention -> Add&Norm -> Feed-Forward -> Add&Norm 순으로 네트워크가 이어지는 모습을 보여줍니다.

 

또한 하나의 Encoder Layer를 사용하지 않고 그림에서는 N개의 Encoder Layer를 사용합니다.

 

따라서 구현시에는 하나의 Encoder Layer를 구현한 뒤 N개의 Encoder가 쌓여있는 Encoder를 구현하는 방식으로 구현할 수 있습니다.

 

각 코드의 라인별 주석을 보시면 구조를 이해하는데 도움이 될 것 같습니다.

 

1.1 Encoder Layer 구현 코드

 

아래 코드는 위의 그림을 기반으로 Encoder Layer를 구현한 코드입니다.

class EncoderLayer(nn.Module):
    
    def __init__(self,d_model,ffn_hidden,n_head,drop_prob):
        super(EncoderLayer,self).__init__()
        
        #Multi-Head Attention
        self.attention = MultiHeadAttention(d_model,n_head)
        
        #Layer Normalization(Multi-Head Attention ->)
        self.norm1 = LayerNorm(d_model = d_model)
        self.dropout1 = nn.Dropout(p = drop_prob)
        
        #Feed-Forward
        self.ffn = PositionwiseFeedForward(d_model = d_model,hidden = ffn_hidden,drop_prob = drop_prob)
        
        #Layer Normalization(FFN ->)
        self.norm2= LayerNorm(d_model = d_model)
        self.dropout2 = nn.Dropout(p=drop_prob)
    
    def forward(self,x,src_mask):
        _x = x
        
        #1. Compute Multi-Head Attention
        x = self.attention(q= x,k= x,v= x,mask = src_mask)
        
        #2. Compute add & norm
        x = self.norm1(x + _x)
        x = self.dropout1(x)
        
        # 3. Compute Feed-Forward Network
        _x = x
        x = self.ffn(x)
        
        # 4. Compute add & norm
        x = self.norm2(x + _x)
        x = self.dropout2(x)
        
        return x

1.2 Encoder 구현 코드 

 

이렇게 하나의 Encoder Layer를 구현하였으면, 다음으로는 N개의 Encoder Layer를 이어붙일 차례입니다.

여기서 N은 사람마다 다르게 구성할 수 있으므로 n_layers 라는 변수를 이용하여 구현합니다.

 

또한, Encoder에 입력으로 들어가는 문장은 Embedding을 거치고 Positional Encoding과 합쳐서 들어가기 때문에, 이 부분도 코드에 추가해줍니다.

class Encoder(nn.Module):
    
    def __init__(self,enc_voc_size,max_len,d_model,ffn_hidden,n_head,n_layers,
                drop_prob,device):
        super().__init__()
        
        #Embedding
        self.embed = nn.Embedding(num_embeddings = len(kor_text.vocab),embedding_dim = d_model,padding_idx = 1)
        
        #Positional Encoding
        self.pe = PositionalEncoding(max_len = max_len,d_model = d_model,device = device)
        
        #Add Multi layers
        self.layers = nn.ModuleList([EncoderLayer(d_model = d_model,
                                                 ffn_hidden = ffn_hidden,
                                                 n_head = n_head,
                                                 drop_prob = drop_prob)
                                    for _ in range(n_layers)])
        
    def forward(self,x,src_mask):
    	#Compute Embedding
        x = self.emb(x) #sentence -> vector
        
        #Get Positional Encoding
        x_pe = self.pe(x)
        
        #Embedding + Positional Encoding
        x = x + x_pe
        
        #Compute Encoder layers
        for layer in self.layers:
            x = layer(x,src_mask)
        
        #Return encoder output
        return x

2. Decoder 구현하기

Encoder와는 다르게 decoder에서는 총 2번의 attention 계산이 이루어집니다.

 

Decoder의 첫번째 attention계산에서는 Decoder 입력만으로 attention이 계산됩니다(Encoder와 동일)

 

하지만, 두번째 attention에서는 첫번째 attention 계산과는 다르게 첫번째 attention 결과 값이 두번째 attention 계산의 Query로 들어가며, Key와 Value값은 마지막 Encoder의 출력값이 들어갑니다.

 

나머지 add&norm 부분과 Feed-forward는 encoder에서 수행한 것과 동일한 메커니즘으로 수행됩니다

 

2.1 Decoder Layer 구현 코드

 

이 부분을 코드로 구현하면 아래와 같습니다.

class DecoderLayer(nn.Module):
    
    def __init__(self,d_model,ffn_hidden,n_head,drop_prob):
        super(DecoderLayer,self).__init__()
        
        #self attention(only Decoder input)
        self.self_attention = MultiHeadAttention(d_model = d_model,n_head = n_head)
        
        #layer normalization(first)
        self.norm1 = LayerNorm(d_model = d_model)
        #dropout(first)
        self.dropout1 = nn.Dropout(p=drop_prob)
        
        #attention(encoder + decoder)
        self.enc_dec_attention = MultiHeadAttention(d_model = d_model,n_head = n_head)
        
        #layer normalization(second)
        self.norm2 = LayerNorm(d_model = d_model)
        #dropout(second)
        self.dropout2 = nn.Dropout(p=drop_prob)
        
        #Feed-Forward
        self.ffn = PositionwiseFeedForward(d_model = d_model,hidden = ffn_hidden,
                                           drop_prob = drop_prob)
        #Layer normalization(third)
        self.norm3 = LayerNorm(d_model = d_model)
        #dropout(third)
        self.dropout3 = nn.Dropout(p= drop_prob)
        
    def forward(self,dec,enc,trg_mask,src_mask):
        
        _x = dec
        #Compute self-attention
        x = self.self_attention(q = dec,k = dec,v = dec,mask = trg_mask)
        
        #Compute add & norm
        x = self.norm1(x + _x)
        x=  self.dropout1(x)
        
        if enc is not None:  #encoder의 출력값이 있다면 (없으면 FFN으로 넘어감)
            _x = x
            
            #Compute encoder - decoder attention
            #Query(q) : decoder attention output
            #Key(k) : Encoder output
            #Value(v) : Encoder output
            x = self.enc_dec_attention(q = x,k = enc,v = enc,mask = src_mask)
            
            #Compute add & norm
            x = self.norm2(x + _x)
            x = self.dropout2(x)
            
        _x = x
        
        #Compute FFN
        x = self.ffn(x)
        
        #Compute add & norm
        x = self.norm3(x + _x)
        x = self.dropout3(x)
        
        return x

2.2 Decoder 구현 코드

 

다음으로 Encoder를 구현한것과 같은 방식으로, 위에서 구현한 하나의 Decoder layer를 여러개로 이어서 하나의 Decoder 객체로 만들어줍니다.

또한 Encoder를 구현할 때와 마찬가지로, Embedding 부분과 Positional Encoding 부분도 추가해줍니다.

class Decoder(nn.Module):
    def __init__(self,dec_voc_size,max_len,d_model,ffn_hidden,n_head,n_layers,
                drop_prob,device):
        super().__init__()
        
        #Embedding
        self.embed = nn.Embedding(num_embeddings = len(eng_text.vocab),embedding_dim = d_model,padding_idx = 1)
        
        #Positional Encoding
        self.pe = PositionalEncoding(max_len = 50,d_model = d_model,device = 'cuda')
        
        #Add decoder layers
        self.layers = nn.ModuleList([DecoderLayer(d_model = d_model,
                                                 ffn_hidden = ffn_hidden,
                                                 n_head = n_head,
                                                 drop_prob = drop_prob)
                                    for _ in range(n_layers)])
        
        #Linear
        self.linear = nn.Linear(d_model,dec_voc_size)
    
    def forward(self,trg,src,trg_mask,src_mask):
        
        #Compute Embedding
        trg = self.embed(trg)
        
        #Get Positional Encoding
        trg_pe = self.pe(trg)
        
        #Embedding + Positional Encoding
        trg = trg + trg_pe
        
        #Compute Decoder layers
        for layer in self.layers:
            trg = layer(trg,src,trg_mask,src_mask)
        
        #pass to LM head
        output = self.linear(trg)
        
        return output

3. Transformer 구현하기

 

이제 마지막으로 위에서 구현한 Encoder와 Decoder를 하나의 모델로 합쳐 주면 됩니다!

+) 추가로 encoder와 decoder에 들어갈 mask도 만들어서 입력에 넣도록 합니다.

 

class Transformer(nn.Module):
    
    def __init__(self,src_pad_idx,trg_pad_idx,trg_sos_idx,enc_voc_size,dec_voc_size,d_model,n_head,max_len,
                ffn_hidden,n_layers,drop_prob,device):
        super().__init__()
        #Get <PAD> idx
        self.src_pad_idx = src_pad_idx
        self.trg_pad_idx = trg_pad_idx
        self.trg_sos_idx = trg_sos_idx
        
        #Encoder
        self.encoder = Encoder(enc_voc_size = enc_voc_size,
                              max_len = max_len,
                              d_model = d_model,
                              ffn_hidden = ffn_hidden,
                              n_head = n_head,
                              n_layers = n_layers,
                              drop_prob = drop_prob,
                              device = device)
        
        #Decoder
        self.decoder = Decoder(dec_voc_size = dec_voc_size,
                              max_len = max_len,
                              d_model = d_model,
                              ffn_hidden = ffn_hidden,
                              n_head = n_head,
                              n_layers = n_layers,
                              drop_prob = drop_prob,
                              device = device)
        self.device = device
    
    def make_pad_mask(self,q,k):
    
    	#Padding부분은 attention연산에서 제외해야하므로 mask를 씌워줘서 계산이 되지 않도록 한다.
        
        len_q,len_k = q.size(1),k.size(1)
        print(len_k)
        #batch_size x 1 x 1 x len_k
        k = k.ne(self.src_pad_idx).unsqueeze(1).unsqueeze(2)
        print(k.shape)
        # batch_size x1 x len_1 x len_k
        k = k.repeat(1,1,len_q,1)
        
        #batch_size x 1 x len_q x 1
        q = q.ne(self.src_pad_idx).unsqueeze(1).unsqueeze(3)
        #batch_size x 1 x len_q x len_k
        q = q.repeat(1,1,1,len_k)
        
        mask = k & q
        
        return mask
    
    def make_no_peak_mask(self,q,k):
    	
        #Decoder 부분에서 t번째 단어를 예측하기 위해 입력으로 t-1번째 단어까지 넣어야 하므로 나머지 부분을 masking처리 한다.
        #만약 t번째 단어를 예측하는데 이미 decoder에 t번째 단어가 들어간다면?? => 답을 이미 알고 있는 상황..
        #따라서 Seq2Seq 모델에서 처럼 t번째 단어를 예측하기 위해서 t-1번째 단어까지만 입력될 필요가 있음
        #(나머지 t,t+1,...,max_len)까지 단어는 t번째 단어를 예측하는데 전혀 필요하지 않음 => Masking!!
        len_q,len_k = q.size(1),k.size(1)
        
        #len_q x len_k (torch.tril = 하삼각행렬)
        mask = torch.tril(torch.ones(len_q,len_k)).type(torch.BoolTensor).to(self.device)
        
        return mask
    
    def forward(self,src,trg):
    	
        #Get Mask
        src_mask = self.make_pad_mask(src,src)
        src_trg_mask = self.make_pad_mask(trg,src)
        trg_mask = self.make_pad_mask(trg,trg) * self.make_no_peak_mask(trg,trg)
        
        #Compute Encoder
        enc_src = self.encoder(src,src_mask)
        
        #Compute Decoder
        output = self.decoder(trg,enc_src,trg_mask,src_trg_mask)
        
        return output

 

자 이제 이렇게 Transformer 모델을 만들어보았습니다.

 

다음 포스팅에서는 이렇게 만든 transformer 모델을 이용하여 한-영 번역기를 만들어서 실험해보고 성능이 어느정도 나오지는지 확인해 보겠습니다!

 

*잘못된 부분에 대한 지적 및 질문은 언제나 환영입니다!

 

이전 글 : Transformer를 이해하고 구현해보자! (1)

다음 글 : Transformer를 이용한 번역모델 구축