9.1 Policy Gradient
먼저 정책 기반 에이전트가 왜 필요한지에 대해 설명해본다.
1. 가치 기반 에이전트가 액션을 선택하는 방식은 결정론적이다.
모든 상태 s에 대해 각 상태에서 선택하는 액션이 변하지 않는다는 뜻이다.
이 에이전트가 가위바위보를 한다면 어떨까? 계속 정해진 하나의 패만 내고 상대가 전략을 수정할 수 있다면 쉽게 간파당할 것이다. 이에 반해 정책 기반 에이전트는 확률적 정책을 취할 수 있다.
(가위, 바위, 보를 동등하게 3분의 1 확률로 선택하는 정책을 가질 수 있는 것)
2. 액션 공간이 연속적인 경우 대처 가능
또한 액션 공간이 연속적일 경우(0에서 1사이의 모든 실수 값이 액션으로 선택 가능한 상황)
이때 가치 기반 에이전트가 작동하기 위해서는 모든 a에 대해 Q(s,a)의 값을 최대로 하는 a를 찾아야 한다.
연속적 액션 공간에서는 액션이 무한이기 때문에 일일이 넣어볼 수 없고 결국 최적화 문제를 하나씩 푸는 셈
= 즉 Q(s,a) 기반 에이전트가 작동하기 힘들다.
그러나 정책 기반 에이전트는 \(\pi\)(s)가 주어져 있다면 바로 액션을 정할 수 있기에 문제가 없다.
- 이번 챕터에서 다룰 문제
- 모델 프리
- 커다란 문제여서 테이블에 값을 담을 수 없어 뉴럴넷이 필요한 상황
- 정책 네트워크를 강화하는 것이 목적
목적 함수 정하기
정책 네트워크 : \(\pi_{\theta}(s, a)\)
\(\theta\) : 정책 네트워크의 파라미터
어떻게 정책 네트워크를 강화시킬 수 있을까?
결국 뉴럴넷의 파라미터를 업데이트 하는 것이니 그라디언트 디센트 방법론을 사용할 것임을 추측할 수 있다. 이를 위해선 손실 함수 정의가 필요한데 손실 함수를 어떻게 정의할 수 있을까 (정답을 모르기 때문에)
그래서 정책 네트워크를 업데이트할 때는 손실 함수를 줄이는 방향이 아닌 정책을 평가하는 기준을 세워서 그 값을 증가시키도록 하는 방향으로 그라디언트 업데이트를 하고자 한다.
우리의 목적은 이 정책이 얼마나 좋은 정책인지 평가하는 방법을 찾는 것이다.
평가함수 : \(J(\theta)\)
정책을 어떻게 평가할까?
보상의 합이 큰 정책이 좋은 정책일 것이다. (에피소드마다 서로 다른 보상이기에 기댓값 연산자 필요)
\(J(\theta) = \mathbb{E}_{\pi_{\theta}} \left [ \sum_t r_t \right ]\)
시작하는 상태가 (\(s_0\))으로 고정되어있다면 (\(s_0\))의 가치로 볼 수 있음 = 가치 함수로 표현 가능
\(J(\theta) = \mathbb{E}_{\pi_{\theta}} \left [ \sum_t r_t \right ] = v_{\pi_{\theta}}(s_0)\)
시작하는 상태가 고정되어 있지 않고 시작 상태 의 확률 분포 가 정의되어 있다면 (매번 다른 상태에서 시작)
\(J(\theta) = \sum_{s \in S} d(s) * v_{\pi_{\theta}}(s)\)
그리디언트 기반으로 \(J(\theta)\)를 최대화
\(\theta' \gets \theta + \alpha * \nabla_{\theta} J(\theta)\)
- 우리의 목적은 \( \nabla_{\theta} J(\theta)\)\)를 구하는 것이고 이를 구하는 방법을 살펴본다.
1-Step MDP
- 1-Setp MDP: 딱 한 스텝만 진행하고 바로 에피소드가 끝나는 MDP
- 즉, 리턴이 보상과 같다.
\(\begin{equation} \begin{split} J(\theta) &= \sum_{s \in S} d(s) * v_{\pi_{\theta}}(s) \\ &= \sum_{s \in S} d(s) \sum_{a \in A} \pi_{\theta}(s, a) * R_{s, a} \\ \nabla_{\theta}J(\theta) &= \nabla_{\theta} \sum_{s \in S} d(s) \sum_{a \in A} \pi_{\theta}(s, a) * R_{s, a} \end{split} \end{equation}\)
문제점
- 모델 프리 상황으로 \(R_{s,a}\)와 \(P_{ss'}^a\)를 알 수 없다.
- \(R_{s,a}\)를 안다고 해도 s가 많은 경우 계산 불가능. (뉴럴넷을 사용하는 상태)
샘플 기반 방법론으로 이를 계산할 수 있다.
\(\begin{equation} \begin{split} \nabla_{\theta}J(\theta) &= \nabla_{\theta} \sum_{s \in S} d(s) \sum_{a \in A} \pi_{\theta}(s, a) * R_{s, a} \\ &= \sum_{s \in S} d(s) \sum_{a \in A} \nabla_{\theta} \pi_{\theta}(s, a) * R_{s, a} \\ &= \sum_{s \in S} d(s) \sum_{a \in A} {\pi_{\theta}(s, a) \over \pi_{\theta}(s, a)} \nabla_{\theta} \pi_{\theta}(s, a) * R_{s, a} \\ &= \sum_{s \in S} d(s) \sum_{a \in A} \pi_{\theta}(s, a) {\nabla_{\theta} \pi_{\theta}(s, a) \over \pi_{\theta}(s, a)} * R_{s, a} \\ &= \sum_{s \in S} d(s) \sum_{a \in A} \pi_{\theta}(s, a) \nabla_{\theta} \log \pi_{\theta}(s, a) * R_{s, a} \\ \end{split} \end{equation}\)
기댓값 연산자를 이용해 간단하게 식 변경 가능
\(\begin{equation} \begin{split} \nabla_{\theta}J(\theta) &= \sum_{s \in S} d(s) \sum_{a \in A} \pi_{\theta}(s, a) \nabla_{\theta} \log \pi_{\theta}(s, a) * R_{s, a} \\ &= \mathbb{E}_{\pi_{\theta}} \left [ \nabla_{\theta} \log \pi_{\theta}(s, a) * R_{s, a} \right ] \end{split} \end{equation}\)
\(\pi_{\theta}\)(s,a)로 움직이는 에이전트를 환경에 가져다 놓고, \(\nabla_{\theta} \log \pi_{\theta}(s, a) * R_{s, a}\)의 값을 여러 개 모으면 된다. 이 값을 평균을 내면 \(\nabla_{\theta}J(\theta)\)와 같다.
일반적 MDP에서의 Policy Gradient
- 1step MDP: \(\nabla_{\theta}J(\theta) = \mathbb{E}_{\pi_{\theta}} \left [ \nabla_{\theta} \log \pi_{\theta}(s, a) * R_{s, a} \right ]\)
- MDP : \(\nabla_{\theta}J(\theta) = \mathbb{E}_{\pi_{\theta}} \left [ \nabla_{\theta} \log \pi_{\theta}(s, a) * Q_{\pi_{\theta}} (s, a) \right ]\)
\(R_{s,a}\)가 \(Q_{\pi_{\theta}} (s, a)\)로 바뀌었다.
MDP에서는 한 스텝만 밟고 MDP가 종료되는 게 아니라 이후에 여러 스텝이 있기 때문에 이후에 받을 보상까지 포함.
9.2 REINFORCE 알고리즘
이론적 배경
\(\nabla_{\theta}J(\theta) = \mathbb{E}_{\pi_{\theta}} [ \nabla_{\theta} \log \pi_{\theta}(s, a) * G_t ]\)
\(Q_{\pi_{\theta}} (s, a)\)자리에 리턴 \(G_t\)가 들어갔다.
리턴의 샘플을 여러 개 얻어서 평균을 내면 액션-밸류인 \(Q_{\pi_{\theta}} (s, a)\)에 근사해지기 때문에 이를 이용해 \(G_t\)를 그라디언트에 사용한다.
REINFORCE pseudo code
\(\pi_{\theta}\)로 에피소드 하나에 해당하는 데이터를 얻고, 해당 데이터로 \(\theta\)를 업데이트하고 이를 이용해 다음 에피소드의 경험을 얻고 그 데이터로 또 강화한다.
\(\theta\)가 변하지 않거나 성능 개선이 일어나지 않을 때까지 반복하면 된다.
> \(\nabla_{\theta} \log \pi_{\theta} (s, a) * G_t\)에서 \(G_t\)를 무시하고 \(\nabla_{\theta} \log \pi_{\theta} (s, a)\)만 가지고 업데이트 한다고 가정
\(\theta \gets \theta + \alpha * \nabla_{\theta} \log \pi_{\theta} (s, a)\)
위 수식의 의미는 \(\log \pi_{\theta} (s, a)\)를 증가시킨다는 의미이고 로그 함수는 단조 증가 함수이므로 \(\pi_{theta}(s,a)\)를 증가시키겠다는 의미와 같다. (즉, s에서 a를 선택할 확률을 증가시키겠다는 의미)
> 만일 \(G_t\)가 +1일때와 -1일때의 경우
\(\theta \gets \theta + \alpha * \nabla_{\theta} \log \pi_{\theta} (s, a) * 1\)
\(\theta \gets \theta - \alpha * \nabla_{\theta} \log \pi_{\theta} (s, a) * 1\)
위의 식은 \( \log \pi_{\theta} (s, a)\)를 증가시키는 방향으로 업데이트 하는 방면, 다른 한쪽은 감소시키는 방향으로 업데이트.즉 리턴이 +1인 액션의 확률을 증가시키도록 업데이트하고, 리턴이 -1인 액션의 확률은 감소시키도록 업데이트한다.
> \(G_t\)가 +100일 때의 경우
\(\theta \gets \theta + \alpha * \nabla_{\theta} \log \pi_{\theta} (s, a) * 100\)
리턴이 100배 더 좋았다면, 그 액션을 100배 더 크게 업데이트 해 주겠다는 뜻이다.
REINFORCE 구현
데이터가 주어졌을 때, 이를 이용해 계산하는 식
\(\nabla_{\theta}J(\theta) \approx G_t * \nabla_{\theta} \log \pi_{\theta}(s_t, a_t)\)
그러나 텐서플로, 파이토치 같은 라이브러리를 사용할 때는 위와 같은 미분된 형태의 수식을 사용하지 않는다.
라이브러리의 optimizer는 손실 함수를 자동으로 minimize, 우린 maximize해야하므로 -를 붙인다.
\(\nabla_{\theta}J(\theta) \approx -G_t * \nabla_{\theta} \log \pi_{\theta}(s_t, a_t)\)
import gym
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.distributions import Categorical
#Hyperparameters
learning_rate = 0.0002
gamma = 0.98
class Policy(nn.Module):
def __init__(self):
super(Policy, self).__init__()
self.data = []
self.fc1 = nn.Linear(4, 128)
self.fc2 = nn.Linear(128, 2)
self.optimizer = optim.Adam(self.parameters(), lr=learning_rate)
def forward(self, x):
x = F.relu(self.fc1(x))
x = F.softmax(self.fc2(x), dim=0)
return x
def put_data(self, item):
self.data.append(item)
def train_net(self):
R = 0
self.optimizer.zero_grad()
for r, prob in self.data[::-1]:
R = r + gamma * R
loss = -torch.log(prob) * R
loss.backward()
self.optimizer.step()
self.data = []
def main():
env = gym.make('CartPole-v1')
pi = Policy()
score = 0.0
print_interval = 20
for n_epi in range(10000):
s, _ = env.reset()
done = False
while not done: # CartPole-v1 forced to terminates at 500 step.
prob = pi(torch.from_numpy(s).float())
m = Categorical(prob)
a = m.sample()
s_prime, r, done, truncated, info = env.step(a.item())
pi.put_data((r,prob[a]))
s = s_prime
score += r
pi.train_net()
if n_epi%print_interval==0 and n_epi!=0:
print("# of episode :{}, avg score : {}".format(n_epi, score/print_interval))
score = 0.0
env.close()
if __name__ == '__main__':
main()
9.3 액터크리틱
정책 네트워크와 밸류 네트워크를 함께 학습하는 액터-크리틱 방법론에 대해 배워보자.
그중 Q 액터-크리틱, 어드밴티비 액터-크리틱, TD 액터-크리틱을 설명한다.
Q 액터-크리틱
기존 policy gradient 식을 떠올려보자.
\(\nabla_{\theta}J(\theta) = \mathbb{E}_{\pi_{\theta}} \left [ \nabla_{\theta} \log \pi_{\theta}(s, a) * Q_{\pi_{\theta}} (s, a) \right ]\)
REINFORCE 알고리즘은 \(Q_{\pi_{\theta}}\) (s, a)자리에 그 샘플인 리턴 \(G_t\)를 사용한다. 그대로 \(Q_{\pi_{\theta}}\) (s, a)를 사용하면 그게 곧 Q 액터-크리틱이다.
학습: \(\theta\)로 파라미터화된 정책 네트워크 \(\pi_{\theta}\)와 w로 파라미터화 된 밸류 네트워크 \(Q_w\)
- \(\pi_{\theta}\)는 실행할 액션 a를 선택하는 액터 역할
- \(Q_w\)는 선택된 액션 a의 밸류를 평가하는 크리틱 역할
Q 액터-크리틱 pseudo code
어드밴티지 액터-크리틱
policy gradient 식을 살펴보자.
\(\nabla_{\theta}J(\theta) = \mathbb{E}_{\pi_{\theta}} \left [ \nabla_{\theta} \log \pi_{\theta}(s, a) * Q_{\pi_{\theta}} (s, a) \right ]\)
\(\nabla_{\theta} \log \pi_{\theta}(s, a)\)는 벡터이고 뒤에 곱해지는 \(Q_{\pi_{\theta}} (s, a)\)는 숫자 값이다.
= 상태 s에서 액션 a를 하고 얻게 되는 리턴의 기댓값
만약 운 좋게 밸류가 아주 높은 상태 s'에 도달했다고 가정하자. 어떤 액션을 취하든 리턴이 높다.
\(Q(s', a_0)\) = 1000, \(Q(s', a_1)\) = 1050
--> 이 둘의 차이를 학습하기 위해서는 무수히 많은 샘플이 필요할 수 있음.
대안
\(\nabla_{\theta}J(\theta) = \mathbb{E}_{\pi_{\theta}} \left [ \nabla_{\theta} \log \pi_{\theta}(s, a) * \{Q_{\pi_{\theta}} (s, a) -V_{\pi_{\theta}}(s)\} \right ]\)
위와 같이 모든 상태에서 업데이트할 때, 각 상태의 밸류인 \(V_{\pi_{\theta}}\)(s)를 빼주고자 한다.
\(Q_{\pi_{\theta}}\)(s, a) - \(V_{\pi_{\theta}}\)(s)는 상태 s에 있는 것보다 액션 a를 실행함으로써 추가로 얼마의 가치를 더 얻게 되느냐 하는 것이다.
그래서 이를 어드밴티지라고 부른다.
\(A_{\pi_{\theta}}(s, a) \equiv Q_{\pi_{\theta}(s, a) - V_{\pi_{\theta}}}(s)\)
\(V_{\pi_{\theta}}\)(s)를 빼줘도 원 수식의 기댓값이 변하지 않는지 증명해보자.
\(\begin{equation} \begin{split} \mathbb{E}_{\pi_{\theta}} \left [ \nabla_{\theta} \log \pi_{\theta}(s, a) * Q_{\pi_{\theta}} (s, a) \right ] &= \mathbb{E}_{\pi_{\theta}} \left [ \nabla_{\theta} \log \pi_{\theta}(s, a) * \{Q_{\pi_{\theta}} (s, a) -V_{\pi_{\theta}}(s)\} \right ] \\ &= \mathbb{E}_{\pi_{\theta}} \left [ \nabla_{\theta} \log \pi_{\theta}(s, a) * Q_{\pi_{\theta}} (s, a) \right ] - \mathbb{E}_{\pi_{\theta}} \left [ \nabla_{\theta} \log \pi_{\theta}(s, a) * V_{\pi_{\theta}} (s) \right ] \\ \text{즉},&\;\;\;\; \mathbb{E}_{\pi_{\theta}} \left [ \nabla_{\theta} \log \pi_{\theta}(s, a) * V_{\pi_{\theta}} (s) \right ] = 0 \\ \end{split} \end{equation}\)
상태 s에 대한 임의의 함수 B(s)에 대해 다음이 성립
\(\mathbb{E} \left [ \nabla_{\theta} \log \pi_{\theta} (s, a) * B(s) \right ] = 0\)
상태 분포(\(d_{\pi}(s)\)): 정책 \(\pi\)를 따라서 움직이는 에이전트가 각 상태에 평균적으로 머무는 비율을 나타내는 분포
\(\mathbb{E}_{\pi_{\theta}} \left [ \nabla_{\theta} \log \pi_{\theta}(s, a) * B(s) \right ] = \sum_{s \in S} d_{\pi_{\theta}}(s) \sum_{a \in A} \pi_{\theta}(s, a) \nabla_{\theta} \log \pi_{\theta} (s, a) * B(s)\)
같다는 것은 증명으로 확인 가능하다.
어드밴티지 액터-크리틱의 policy-gradient는 다음과 같다.
\(\begin{equation} \begin{split} \nabla_{\theta}J(\theta) &= \mathbb{E}_{\pi_{\theta}} \left [ \nabla_{\theta} \log \pi_{\theta}(s, a) * A_{\pi_{\theta}}(s, a) \right ] \\ A_{\pi_{\theta}}(s, a) &= Q_{\pi_{\theta}}(s, a) - V_{\pi_{\theta}}(s) \end{split} \end{equation}\)
실제 알고리즘으로 변환하려면 다음과 같은 근사가 필요하다.
\(Q_{\pi_{\theta}}\) ≈ \(Q_w\), \(V_{\pi_{\theta}}\) ≈ \(V_{\phi}(s)\)
필요한 뉴럴넷
- 정책 함수 \(\pi_{\theta}\)(s,a)의 뉴럴 넷 \(\theta\)
- 액션-가치 함수 \(Q_w\)(s,a)의 뉴럴넷 W
- 가치 함수 \(V_{\phi}\)(s)의 뉴럴넷 \(\phi\)
어드밴티지 액터-크리틱 pseudo code
TD 액터-크리틱
액터-크리틱은 3쌍의 뉴럴넷을 필요로 한다. TD 액터-크리틱은 \(Q_w\)를 필요없게 해준다.
TD 에러 =
δ는 A(s,a)의 불편 추정량
δ는 같은 상태 s에서 같은 액션 a를 선택해도 상태 전이가 어떻게 일어나느냐에 따라 매번 다른 값을 얻게 된다. 이 값을 여러개 평균내면 그 값이 A(s,a)로 수렴한다는 뜻이다.
\(Q_{\pi}(s,a)\) 대신에 \(G_t\)를 사용해 업데이트 했던 것처럼 기존 어드벤티지 액터-크리틱에서의 policy gradient 수식을 다음과 같이 변형 가능
\(\nabla_{\theta}J(\theta) = \mathbb{E}_{\pi_{\theta}} \left [ \nabla_{\theta} \log \pi_{\theta}(s, a) * \delta \right ]\)
TD 액터-크리틱 pseudo code
import gym
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.distributions import Categorical
#Hyperparameters
learning_rate = 0.0002
gamma = 0.98
n_rollout = 10
class ActorCritic(nn.Module):
def __init__(self):
super(ActorCritic, self).__init__()
self.data = []
self.fc1 = nn.Linear(4,256)
self.fc_pi = nn.Linear(256,2)
self.fc_v = nn.Linear(256,1)
self.optimizer = optim.Adam(self.parameters(), lr=learning_rate)
def pi(self, x, softmax_dim = 0):
x = F.relu(self.fc1(x))
x = self.fc_pi(x)
prob = F.softmax(x, dim=softmax_dim)
return prob
def v(self, x):
x = F.relu(self.fc1(x))
v = self.fc_v(x)
return v
def put_data(self, transition):
self.data.append(transition)
def make_batch(self):
s_lst, a_lst, r_lst, s_prime_lst, done_lst = [], [], [], [], []
for transition in self.data:
s,a,r,s_prime,done = transition
s_lst.append(s)
a_lst.append([a])
r_lst.append([r/100.0])
s_prime_lst.append(s_prime)
done_mask = 0.0 if done else 1.0
done_lst.append([done_mask])
s_batch, a_batch, r_batch, s_prime_batch, done_batch = torch.tensor(s_lst, dtype=torch.float), torch.tensor(a_lst), \
torch.tensor(r_lst, dtype=torch.float), torch.tensor(s_prime_lst, dtype=torch.float), \
torch.tensor(done_lst, dtype=torch.float)
self.data = []
return s_batch, a_batch, r_batch, s_prime_batch, done_batch
def train_net(self):
s, a, r, s_prime, done = self.make_batch()
td_target = r + gamma * self.v(s_prime) * done
delta = td_target - self.v(s)
pi = self.pi(s, softmax_dim=1)
pi_a = pi.gather(1,a)
loss = -torch.log(pi_a) * delta.detach() + F.smooth_l1_loss(self.v(s), td_target.detach())
self.optimizer.zero_grad()
loss.mean().backward()
self.optimizer.step()
def main():
env = gym.make('CartPole-v1')
model = ActorCritic()
print_interval = 20
score = 0.0
for n_epi in range(10000):
done = False
s, _ = env.reset()
while not done:
for t in range(n_rollout):
prob = model.pi(torch.from_numpy(s).float())
m = Categorical(prob)
a = m.sample().item()
s_prime, r, done, truncated, info = env.step(a)
model.put_data((s,a,r,s_prime,done))
s = s_prime
score += r
if done:
break
model.train_net()
if n_epi%print_interval==0 and n_epi!=0:
print("# of episode :{}, avg score : {:.1f}".format(n_epi, score/print_interval))
score = 0.0
env.close()
if __name__ == '__main__':
main()
'바닥부터 배우는 강화 학습' 카테고리의 다른 글
바닥부터 배우는 강화학습 1장부터 9장까지 총정리 (0) | 2024.01.15 |
---|---|
[ 바닥부터 배우는 강화 학습 ] 10. 알파고와 MCTS (1) | 2024.01.09 |
[ 바닥부터 배우는 강화 학습 ] 08. 가치 기반 에이전트 (1) | 2024.01.08 |
[ 바닥부터 배우는 강화 학습 ] 05. MDP를 모를 때 밸류 평가하기 (3) | 2024.01.02 |
[ 바닥부터 배우는 강화 학습 ] 06. MDP를 모를 때 최고의 정책 찾기 (0) | 2023.11.23 |