# 百闻不如一码！手把手教你用Python搭一个Transformer

Feed Forward层

class Embedder(nn.Module):
    """Map integer token ids to dense ``d_model``-dimensional vectors."""

    def __init__(self, vocab_size, d_model):
        """Create the embedding lookup table.

        Args:
            vocab_size: Number of distinct token ids (rows of the table).
            d_model: Width of each embedding vector.
        """
        super().__init__()
        self.embed = nn.Embedding(vocab_size, d_model)

    def forward(self, x):
        """Return the embedding vectors for the token ids in ``x``."""
        return self.embed(x)

Vaswani用下面的函数创建位置特异性常量来解决这类问题：

class PositionalEncoder(nn.Module):
    """Add fixed sinusoidal position information to token embeddings."""

    def __init__(self, d_model, max_seq_len = 80):
        """Precompute the constant positional-encoding table.

        Args:
            d_model: Width of the embedding vectors.
            max_seq_len: Longest sequence length the table covers.
        """
        super().__init__()
        self.d_model = d_model

        # create constant 'pe' matrix with values dependent on pos and i.
        # Columns (i, i + 1) for even i share the frequency
        # 1 / 10000 ** (i / d_model): the even column holds the sine and
        # the odd column the cosine.
        position = torch.arange(max_seq_len, dtype=torch.float).unsqueeze(1)
        even_idx = torch.arange(0, d_model, 2, dtype=torch.float)
        inv_freq = torch.exp(even_idx * (-math.log(10000.0) / d_model))
        angles = position * inv_freq

        pe = torch.zeros(max_seq_len, d_model)
        pe[:, 0::2] = torch.sin(angles)
        # an odd d_model has one fewer cosine column than sine columns
        pe[:, 1::2] = torch.cos(angles[:, : d_model // 2])

        # leading batch dimension so the table broadcasts over the batch
        pe = pe.unsqueeze(0)
        # a buffer is saved with the module and moved by .to() / .cuda(),
        # but is not a trainable parameter
        self.register_buffer('pe', pe)

    def forward(self, x):
        """Scale ``x`` by sqrt(d_model) and add the positional encoding.

        ``x.size(1)`` is used as the sequence length; it must not exceed
        ``max_seq_len``.
        """
        # make embeddings relatively larger
        x = x * math.sqrt(self.d_model)
        seq_len = x.size(1)
        x = x + self.pe[:, :seq_len]
        return x

# Take one batch and put the batch dimension first.
batch = next(iter(train_iter))
input_seq = batch.English.transpose(0,1)
input_pad = EN_TEXT.vocab.stoi['<pad>']

# creates mask with 0s wherever there is padding in the input
# (the extra dimension lets the mask broadcast over query positions)
input_msk = (input_seq != input_pad).unsqueeze(1)

# create mask as before
target_seq = batch.French.transpose(0,1)
target_pad = FR_TEXT.vocab.stoi['<pad>']
target_msk = (target_seq != target_pad).unsqueeze(1)

size = target_seq.size(1) # get seq_len for matrix

# lower-triangular "no peek" mask: position t may only attend to
# positions <= t, so the decoder cannot look at future words
nopeak_mask = torch.tril(
    torch.ones(1, size, size, dtype=torch.bool, device=target_seq.device)
)
target_msk = target_msk & nopeak_mask

V、K和Q分别代表“value”、“key”和“query”，这些是注意力函数的相关术语，但我不觉得解释这些术语会对理解这个模型有任何帮助。

class MultiHeadAttention(nn.Module):
    """Multi-head attention: project q/k/v, attend per head, then merge."""

    def __init__(self, heads, d_model, dropout = 0.1):
        """Create the projection layers.

        Args:
            heads: Number of attention heads.
            d_model: Model width; must be divisible by ``heads``.
            dropout: Dropout probability handed to the attention function.

        Raises:
            ValueError: If ``d_model`` is not divisible by ``heads``.
        """
        super().__init__()
        if d_model % heads != 0:
            raise ValueError("d_model must be divisible by heads")

        self.d_model = d_model
        # forward() reshapes the projections into h heads of width d_k
        self.d_k = d_model // heads
        self.h = heads

        self.q_linear = nn.Linear(d_model, d_model)
        self.v_linear = nn.Linear(d_model, d_model)
        self.k_linear = nn.Linear(d_model, d_model)
        self.dropout = nn.Dropout(dropout)
        self.out = nn.Linear(d_model, d_model)

    def forward(self, q, k, v, mask=None):
        """Apply multi-head attention and return the merged projection.

        ``attention`` is a module-level function that is not part of this
        class; it receives the per-head q, k, v, the head width, the mask
        and the dropout layer.
        """
        bs = q.size(0)

        # perform linear operation and split into h heads
        k = self.k_linear(k).view(bs, -1, self.h, self.d_k)
        q = self.q_linear(q).view(bs, -1, self.h, self.d_k)
        v = self.v_linear(v).view(bs, -1, self.h, self.d_k)

        # transpose to get dimensions bs * h * sl * d_k
        k = k.transpose(1,2)
        q = q.transpose(1,2)
        v = v.transpose(1,2)

        # calculate attention using function we will define next
        scores = attention(q, k, v, self.d_k, mask, self.dropout)

        # concatenate heads and put through final linear layer
        concat = scores.transpose(1,2).contiguous()\
            .view(bs, -1, self.d_model)
        output = self.out(concat)
        return output

class Norm(nn.Module):
    """Normalise the last dimension, then apply a learnable scale and shift."""

    def __init__(self, d_model, eps = 1e-6):
        """Create the scale and shift parameters.

        Args:
            d_model: Size of the last dimension of the inputs.
            eps: Small constant added to the standard deviation so the
                division is safe when the deviation is zero.
        """
        super().__init__()

        self.size = d_model
        # create two learnable parameters to calibrate normalisation
        self.alpha = nn.Parameter(torch.ones(self.size))
        self.bias = nn.Parameter(torch.zeros(self.size))
        self.eps = eps

    def forward(self, x):
        """Return ``alpha * (x - mean) / (std + eps) + bias`` over the last dim."""
        norm = self.alpha * (x - x.mean(dim=-1, keepdim=True)) \
            / (x.std(dim=-1, keepdim=True) + self.eps) + self.bias
        return norm

# build an encoder layer with one multi-head attention layer and one
# feed-forward layer

class EncoderLayer(nn.Module):
    """Pre-norm encoder block: self-attention, then feed-forward."""

    def __init__(self, d_model, heads, dropout = 0.1):
        """Create the sub-layers.

        Args:
            d_model: Model width.
            heads: Number of attention heads.
            dropout: Dropout probability applied to each sub-layer output.
        """
        super().__init__()
        self.norm_1 = Norm(d_model)
        self.norm_2 = Norm(d_model)
        self.attn = MultiHeadAttention(heads, d_model)
        self.ff = FeedForward(d_model)
        self.dropout_1 = nn.Dropout(dropout)
        self.dropout_2 = nn.Dropout(dropout)

    def forward(self, x, mask):
        """Run ``x`` through both sub-layers, each with a residual connection."""
        # self-attention over the normalised input
        x2 = self.norm_1(x)
        x = x + self.dropout_1(self.attn(x2, x2, x2, mask))
        # feed-forward network
        x2 = self.norm_2(x)
        x = x + self.dropout_2(self.ff(x2))
        return x

# build a decoder layer with two multi-head attention layers and
# one feed-forward layer

class DecoderLayer(nn.Module):
    """Pre-norm decoder block: self-attention, encoder attention, feed-forward."""

    def __init__(self, d_model, heads, dropout = 0.1):
        """Create the sub-layers.

        Args:
            d_model: Model width.
            heads: Number of attention heads.
            dropout: Dropout probability applied to each sub-layer output.
        """
        super().__init__()
        self.norm_1 = Norm(d_model)
        self.norm_2 = Norm(d_model)
        self.norm_3 = Norm(d_model)

        self.dropout_1 = nn.Dropout(dropout)
        self.dropout_2 = nn.Dropout(dropout)
        self.dropout_3 = nn.Dropout(dropout)

        self.attn_1 = MultiHeadAttention(heads, d_model)
        self.attn_2 = MultiHeadAttention(heads, d_model)
        # no hard-coded .cuda(): the sub-module follows the device of the
        # parent model, like every other layer here
        self.ff = FeedForward(d_model)

    def forward(self, x, e_outputs, src_mask, trg_mask):
        """Run the three sub-layers, each with a residual connection.

        Args:
            x: Decoder input.
            e_outputs: Encoder output used as key and value in the second
                attention layer.
            src_mask: Mask passed to the second (encoder) attention layer.
            trg_mask: Mask passed to the first (self) attention layer.
        """
        # self-attention over the target sequence
        x2 = self.norm_1(x)
        x = x + self.dropout_1(self.attn_1(x2, x2, x2, trg_mask))
        # attention over the encoder outputs
        x2 = self.norm_2(x)
        x = x + self.dropout_2(self.attn_2(x2, e_outputs, e_outputs,
                                           src_mask))
        # feed-forward network
        x2 = self.norm_3(x)
        x = x + self.dropout_3(self.ff(x2))
        return x

# We can then build a convenient cloning function that can generate multiple layers:

def get_clones(module, N):
    """Return an ``nn.ModuleList`` of ``N`` independent deep copies of ``module``.

    Each copy gets its own parameters, so the layers do not share weights.
    """
    return nn.ModuleList([copy.deepcopy(module) for _ in range(N)])

class Encoder(nn.Module):
    """Embedding + positional encoding followed by N encoder layers."""

    def __init__(self, vocab_size, d_model, N, heads):
        """Create the encoder stack.

        Args:
            vocab_size: Size of the source vocabulary.
            d_model: Model width.
            N: Number of stacked encoder layers.
            heads: Number of attention heads per layer.
        """
        super().__init__()
        self.N = N
        self.embed = Embedder(vocab_size, d_model)
        self.pe = PositionalEncoder(d_model)
        self.layers = get_clones(EncoderLayer(d_model, heads), N)
        self.norm = Norm(d_model)

    def forward(self, src, mask):
        """Encode the token ids ``src``; ``mask`` is passed to every layer."""
        x = self.embed(src)
        x = self.pe(x)
        # use self.N rather than a module-level N that may not exist
        for i in range(self.N):
            x = self.layers[i](x, mask)
        return self.norm(x)

class Decoder(nn.Module):
    """Embedding + positional encoding followed by N decoder layers."""

    def __init__(self, vocab_size, d_model, N, heads):
        """Create the decoder stack.

        Args:
            vocab_size: Size of the target vocabulary.
            d_model: Model width.
            N: Number of stacked decoder layers.
            heads: Number of attention heads per layer.
        """
        super().__init__()
        self.N = N
        self.embed = Embedder(vocab_size, d_model)
        self.pe = PositionalEncoder(d_model)
        self.layers = get_clones(DecoderLayer(d_model, heads), N)
        self.norm = Norm(d_model)

    def forward(self, trg, e_outputs, src_mask, trg_mask):
        """Decode the token ids ``trg`` given the encoder outputs.

        ``e_outputs``, ``src_mask`` and ``trg_mask`` are passed unchanged to
        every decoder layer.
        """
        x = self.embed(trg)
        x = self.pe(x)
        for i in range(self.N):
            x = self.layers[i](x, e_outputs, src_mask, trg_mask)
        return self.norm(x)

Transformer模型构建完毕！

class Transformer(nn.Module):
    """Encoder-decoder model with a final projection onto the target vocabulary."""

    def __init__(self, src_vocab, trg_vocab, d_model, N, heads):
        """Create the encoder, the decoder and the output projection.

        Args:
            src_vocab: Size of the source vocabulary.
            trg_vocab: Size of the target vocabulary.
            d_model: Model width.
            N: Number of layers in each of the encoder and decoder.
            heads: Number of attention heads per layer.
        """
        super().__init__()
        self.encoder = Encoder(src_vocab, d_model, N, heads)
        self.decoder = Decoder(trg_vocab, d_model, N, heads)
        self.out = nn.Linear(d_model, trg_vocab)

    def forward(self, src, trg, src_mask, trg_mask):
        """Return unnormalised scores over the target vocabulary."""
        e_outputs = self.encoder(src, src_mask)
        d_output = self.decoder(trg, e_outputs, src_mask, trg_mask)
        # we don't perform softmax on the output as this will be handled
        # automatically by our loss function
        output = self.out(d_output)
        return output

# Hyper-parameters: model width, attention heads, layers per stack.
d_model = 512
heads = 8
N = 6
src_vocab = len(EN_TEXT.vocab)
trg_vocab = len(FR_TEXT.vocab)

model = Transformer(src_vocab, trg_vocab, d_model, N, heads)

# this code is very important! It initialises the parameters with a
# range of values that stops the signal fading or getting too big.
# Only tensors with more than one dimension (weight matrices) are
# re-initialised.
for p in model.parameters():
    if p.dim() > 1:
        nn.init.xavier_uniform_(p)

optim = torch.optim.Adam(model.parameters(), lr=0.0001, betas=(0.9, 0.98), eps=1e-9)

def train_model(epochs, print_every=100):
    """Train the module-level ``model`` on ``train_iter`` using ``optim``.

    Args:
        epochs: Number of passes over ``train_iter``.
        print_every: Print the average loss every this many iterations.
    """
    model.train()

    start = time.time()
    temp = start

    total_loss = 0

    # padding indices are loop-invariant, so look them up once
    src_pad = EN_TEXT.vocab.stoi['<pad>']
    trg_pad = FR_TEXT.vocab.stoi['<pad>']

    for epoch in range(epochs):

        for i, batch in enumerate(train_iter):

            src = batch.English.transpose(0,1)
            trg = batch.French.transpose(0,1)

            # the French sentence we input has all words except
            # the last, as it is using each word to predict the next
            trg_input = trg[:, :-1]

            # the words we are trying to predict
            targets = trg[:, 1:].contiguous().view(-1)

            # padding masks, plus a lower-triangular mask so that a target
            # position cannot attend to later positions
            src_mask = (src != src_pad).unsqueeze(-2)
            seq_len = trg_input.size(1)
            no_peek = torch.tril(torch.ones(
                1, seq_len, seq_len,
                dtype=torch.bool, device=trg_input.device))
            trg_mask = (trg_input != trg_pad).unsqueeze(-2) & no_peek

            preds = model(src, trg_input, src_mask, trg_mask)

            # clear gradients left over from the previous step
            optim.zero_grad()

            loss = F.cross_entropy(preds.view(-1, preds.size(-1)),
                                   targets, ignore_index=trg_pad)

            loss.backward()
            optim.step()

            # .item() extracts the Python number from the 0-dim loss tensor
            total_loss += loss.item()
            if (i + 1) % print_every == 0:
                loss_avg = total_loss / print_every
                print("time = %dm, epoch %d, iter = %d, loss = %.3f, "
                      "%ds per %d iters" % ((time.time() - start) // 60,
                      epoch + 1, i + 1, loss_avg, time.time() - temp,
                      print_every))
                total_loss = 0
                temp = time.time()

def translate(model, src, max_len = 80, custom_string=False):
    """Greedily decode a French translation of ``src``.

    Args:
        model: Trained Transformer exposing ``encoder``, ``decoder``, ``out``.
        src: Tensor of English token ids with a leading batch dimension of
            one, or a raw English string when ``custom_string`` is true.
        max_len: Maximum number of target tokens, including ``<sos>``.
        custom_string: When true, ``src`` is tokenised with ``tokenize_en``
            and converted to ids with the English vocabulary first.

    Returns:
        The decoded tokens (starting with ``<sos>``) joined by spaces.
    """
    model.eval()

    if custom_string:
        tokens = tokenize_en(src)
        src = torch.LongTensor([[EN_TEXT.vocab.stoi[tok] for tok in tokens]])
        # put the input on the same device as the model's parameters
        src = src.to(next(model.parameters()).device)

    src_mask = (src != EN_TEXT.vocab.stoi['<pad>']).unsqueeze(-2)
    sos_idx = FR_TEXT.vocab.stoi['<sos>']
    eos_idx = FR_TEXT.vocab.stoi['<eos>']

    outputs = torch.zeros(max_len).type_as(src.data)
    outputs[0] = sos_idx

    # inference only: no gradients are needed
    with torch.no_grad():
        # the encoder output does not change while decoding, so compute it once
        e_outputs = model.encoder(src, src_mask)

        for i in range(1, max_len):

            # lower-triangular mask over the i tokens produced so far
            trg_mask = torch.tril(torch.ones(
                1, i, i, dtype=torch.bool, device=src.device))

            out = model.out(model.decoder(outputs[:i].unsqueeze(0),
                                          e_outputs, src_mask, trg_mask))
            out = F.softmax(out, dim=-1)
            # most probable next token at the last decoded position
            val, ix = out[:, -1].data.topk(1)

            outputs[i] = ix[0][0]
            if ix[0][0] == eos_idx:
                break

    return ' '.join(
        [FR_TEXT.vocab.itos[ix] for ix in outputs[:i].tolist()]
    )

Transformer模型的构建过程大致就是这样。想要获取完整代码，可以进入下面这个Github页面：

https://github.com/SamLynnEvans/Transformer

639 篇文章35 人订阅

0 条评论