これはU-Netで使用されているトランスモジュールを実装したもので、次のようになります。
チェックポイントを直接読み込めるように、CompVis/Stable-Diffusionからモデル定義と命名を変更していません。
19from typing import Optional
20
21import torch
22import torch.nn.functional as F
23from torch import nn
26class SpatialTransformer(nn.Module):
channels
はフィーチャマップ内のチャネル数ですn_heads
アテンション・ヘッドの数ですn_layers
変圧器層の数ですd_cond
条件付き埋め込みのサイズ31 def __init__(self, channels: int, n_heads: int, n_layers: int, d_cond: int):
38 super().__init__()
初期グループ正規化
40 self.norm = torch.nn.GroupNorm(num_groups=32, num_channels=channels, eps=1e-6, affine=True)
初期コンボリューション
42 self.proj_in = nn.Conv2d(channels, channels, kernel_size=1, stride=1, padding=0)
トランスフォーマー層
45 self.transformer_blocks = nn.ModuleList(
46 [BasicTransformerBlock(channels, n_heads, channels // n_heads, d_cond=d_cond) for _ in range(n_layers)]
47 )
最終畳み込み
50 self.proj_out = nn.Conv2d(channels, channels, kernel_size=1, stride=1, padding=0)
x
形状の特徴マップです [batch_size, channels, height, width]
cond
形状の条件付き埋め込みです [batch_size, n_cond, d_cond]
52 def forward(self, x: torch.Tensor, cond: torch.Tensor):
シェイプを取得 [batch_size, channels, height, width]
58 b, c, h, w = x.shape
残留接続用
60 x_in = x
ノーマライズ
62 x = self.norm(x)
初期コンボリューション
64 x = self.proj_in(x)
からへの転置と形状変更 [batch_size, channels, height, width]
[batch_size, height * width, channels]
67 x = x.permute(0, 2, 3, 1).view(b, h * w, c)
トランスレイヤーを適用
69 for block in self.transformer_blocks:
70 x = block(x, cond)
形状を変更してからに転置 [batch_size, height * width, channels]
[batch_size, channels, height, width]
73 x = x.view(b, h, w, c).permute(0, 3, 1, 2)
最終畳み込み
75 x = self.proj_out(x)
残差を追加
77 return x + x_in
80class BasicTransformerBlock(nn.Module):
d_model
は入力埋め込みサイズですn_heads
アテンション・ヘッドの数ですd_head
アテンションヘッドくらいの大きさですd_cond
条件付き埋め込みのサイズです85 def __init__(self, d_model: int, n_heads: int, d_head: int, d_cond: int):
92 super().__init__()
セルフアテンション層とプレノルム層
94 self.attn1 = CrossAttention(d_model, d_model, n_heads, d_head)
95 self.norm1 = nn.LayerNorm(d_model)
クロス・アテンション・レイヤーとプレ・ノルム・レイヤー
97 self.attn2 = CrossAttention(d_model, d_cond, n_heads, d_head)
98 self.norm2 = nn.LayerNorm(d_model)
フィードフォワードネットワークとプレノルム層
100 self.ff = FeedForward(d_model)
101 self.norm3 = nn.LayerNorm(d_model)
x
形状の入力埋め込みです [batch_size, height * width, d_model]
cond
形状の条件付き埋め込みです [batch_size, n_cond, d_cond]
103 def forward(self, x: torch.Tensor, cond: torch.Tensor):
セルフアテンション
109 x = self.attn1(self.norm1(x)) + x
クロス・アテンションとコンディショニング
111 x = self.attn2(self.norm2(x), cond=cond) + x
フィードフォワードネットワーク
113 x = self.ff(self.norm3(x)) + x
115 return x
118class CrossAttention(nn.Module):
125 use_flash_attention: bool = False
d_model
は入力埋め込みサイズですn_heads
アテンション・ヘッドの数ですd_head
アテンションヘッドくらいの大きさですd_cond
条件付き埋め込みのサイズですis_inplace
メモリを節約するためにアテンションソフトマックス計算をインプレースで実行するかどうかを指定します127 def __init__(self, d_model: int, d_cond: int, n_heads: int, d_head: int, is_inplace: bool = True):
136 super().__init__()
137
138 self.is_inplace = is_inplace
139 self.n_heads = n_heads
140 self.d_head = d_head
アテンションスケーリングファクター
143 self.scale = d_head ** -0.5
クエリ、キー、値のマッピング
146 d_attn = d_head * n_heads
147 self.to_q = nn.Linear(d_model, d_attn, bias=False)
148 self.to_k = nn.Linear(d_cond, d_attn, bias=False)
149 self.to_v = nn.Linear(d_cond, d_attn, bias=False)
最終線形レイヤー
152 self.to_out = nn.Sequential(nn.Linear(d_attn, d_model))
フラッシュアテンションを設定します。フラッシュアテンションは、CrossAttention.use_flash_attention
インストールされていてに設定されている場合にのみ使用されますTrue
。
157 try:
フラッシュ・アテンションをインストールするには、彼らの Github リポジトリ https://github.com/HazyResearch/flash-attention をクローンしてから実行してください python setup.py install
161 from flash_attn.flash_attention import FlashAttention
162 self.flash = FlashAttention()
ドットプロダクトの注目度を高めるためのスケールを設定します。
164 self.flash.softmax_scale = self.scale
None
インストールされていない場合はに設定
166 except ImportError:
167 self.flash = None
x
形状の入力埋め込みです [batch_size, height * width, d_model]
cond
形状の条件付き埋め込みです [batch_size, n_cond, d_cond]
169 def forward(self, x: torch.Tensor, cond: Optional[torch.Tensor] = None):
cond
None
セルフアテンションを行うなら
176 has_cond = cond is not None
177 if not has_cond:
178 cond = x
クエリ、キー、値のベクトルの取得
181 q = self.to_q(x)
182 k = self.to_k(cond)
183 v = self.to_v(cond)
フラッシュアテンションが用意されていて、頭のサイズが以下の場合は、フラッシュアテンションを使用してください 128
186 if CrossAttention.use_flash_attention and self.flash is not None and not has_cond and self.d_head <= 128:
187 return self.flash_attention(q, k, v)
それ以外の場合は、通常の対応に戻る
189 else:
190 return self.normal_attention(q, k, v)
q
頭が分割される前のクエリベクトルで、形状が合っているか [batch_size, seq, d_attn]
k
頭が分割される前のクエリベクトルで、形状が合っているか [batch_size, seq, d_attn]
v
頭が分割される前のクエリベクトルで、形状が合っているか [batch_size, seq, d_attn]
192 def flash_attention(self, q: torch.Tensor, k: torch.Tensor, v: torch.Tensor):
シーケンス軸に沿ったバッチサイズと要素数を取得 (width * height
)
202 batch_size, seq_len, _ = q.shape
q
v
フラッシュアテンション用のベクトルを積み重ねて、形状のテンソルを1つにする k
[batch_size, seq_len, 3, n_heads * d_head]
206 qkv = torch.stack((q, k, v), dim=2)
スプリット・ザ・ヘッド
208 qkv = qkv.view(batch_size, seq_len, 3, self.n_heads, self.d_head)
フラッシュアテンションは頭のサイズに合うので32
64
128
、このサイズに合うように頭をパッドする必要があります。
212 if self.d_head <= 32:
213 pad = 32 - self.d_head
214 elif self.d_head <= 64:
215 pad = 64 - self.d_head
216 elif self.d_head <= 128:
217 pad = 128 - self.d_head
218 else:
219 raise ValueError(f'Head size ${self.d_head} too large for Flash Attention')
頭をパッド・ザ・ヘッド
222 if pad:
223 qkv = torch.cat((qkv, qkv.new_zeros(batch_size, seq_len, 3, self.n_heads, pad)), dim=-1)
注意を計算:これにより形状のテンソルが得られます [batch_size, seq_len, n_heads, d_padded]
228 out, _ = self.flash(qkv)
余分な頭のサイズは切り捨ててください
230 out = out[:, :, :, :self.d_head]
形状を次の形式に変更 [batch_size, seq_len, n_heads * d_head]
232 out = out.reshape(batch_size, seq_len, self.n_heads * self.d_head)
[batch_size, height * width, d_model]
線形レイヤーでマッピング
235 return self.to_out(out)
q
頭が分割される前のクエリベクトルで、形状が合っているか [batch_size, seq, d_attn]
k
頭が分割される前のクエリベクトルで、形状が合っているか [batch_size, seq, d_attn]
v
頭が分割される前のクエリベクトルで、形状が合っているか [batch_size, seq, d_attn]
237 def normal_attention(self, q: torch.Tensor, k: torch.Tensor, v: torch.Tensor):
頭の形に分ける [batch_size, seq_len, n_heads, d_head]
247 q = q.view(*q.shape[:2], self.n_heads, -1)
248 k = k.view(*k.shape[:2], self.n_heads, -1)
249 v = v.view(*v.shape[:2], self.n_heads, -1)
注意力の計算
252 attn = torch.einsum('bihd,bjhd->bhij', q, k) * self.scale
コンピュートソフトマックス
256 if self.is_inplace:
257 half = attn.shape[0] // 2
258 attn[half:] = attn[half:].softmax(dim=-1)
259 attn[:half] = attn[:half].softmax(dim=-1)
260 else:
261 attn = attn.softmax(dim=-1)
アテンション出力を計算
265 out = torch.einsum('bhij,bjhd->bihd', attn, v)
形状を次の形式に変更 [batch_size, height * width, n_heads * d_head]
267 out = out.reshape(*out.shape[:2], -1)
[batch_size, height * width, d_model]
線形レイヤーでマッピング
269 return self.to_out(out)
272class FeedForward(nn.Module):
d_model
は入力埋め込みサイズですd_mult
隠れレイヤーサイズの乗数です277 def __init__(self, d_model: int, d_mult: int = 4):
282 super().__init__()
283 self.net = nn.Sequential(
284 GeGLU(d_model, d_model * d_mult),
285 nn.Dropout(0.),
286 nn.Linear(d_model * d_mult, d_model)
287 )
289 def forward(self, x: torch.Tensor):
290 return self.net(x)
293class GeGLU(nn.Module):
300 def __init__(self, d_in: int, d_out: int):
301 super().__init__()
線形投影法の組み合わせと
303 self.proj = nn.Linear(d_in, d_out * 2)
305 def forward(self, x: torch.Tensor):
取得して
307 x, gate = self.proj(x).chunk(2, dim=-1)
309 return x * F.gelu(gate)