Monte Carlo Learning

Page content

강화 학습(Reinforcement Learning)에서 사용하는 Monte Carlo에 대해서 알아 보고 Gym에서 제공하는 문제를 해결하기 위한 알고리듬을 만들어 보자.

실제로 돌려 보고 싶으면 구글 코랩으로 ~

Open In Colab

문제 (Problem)

👤 보스

인공지능 게임 프로젝트를 시작한다.
게임을 만들려면 ‘강화학습’이라는 것을 알아야 한다는데…
아래 체육관(Gym)에 가서
‘얼음 호수’ 문제를 한번 풀어보게

https://gym.openai.com/envs/FrozenLake-v0/

⚙️ 엔지니어

오~ 게임이라
딥러닝에 찌든 머리가 갑자기 맑아진다~
새로운 마음으로 문제를 해결해 보자!

문제 분석 (Problem Anaysis)

일단 Gym을 설치하고 돌려 보자!

%pip install gym
Requirement already satisfied: gym in /home/dataman/anaconda3/lib/python3.7/site-packages (0.14.0)
Requirement already satisfied: pyglet<=1.3.2,>=1.2.0 in /home/dataman/anaconda3/lib/python3.7/site-packages (from gym) (1.3.2)
Requirement already satisfied: numpy>=1.10.4 in /home/dataman/anaconda3/lib/python3.7/site-packages (from gym) (1.16.4)
Requirement already satisfied: six in /home/dataman/anaconda3/lib/python3.7/site-packages (from gym) (1.12.0)
Requirement already satisfied: scipy in /home/dataman/anaconda3/lib/python3.7/site-packages (from gym) (1.3.1)
Requirement already satisfied: cloudpickle~=1.2.0 in /home/dataman/anaconda3/lib/python3.7/site-packages (from gym) (1.2.1)
Requirement already satisfied: future in /home/dataman/anaconda3/lib/python3.7/site-packages (from pyglet<=1.3.2,>=1.2.0->gym) (0.17.1)
Note: you may need to restart the kernel to use updated packages.
import gym
import numpy as np
from time import sleep
from IPython.display import display, clear_output, Pretty


def get_optimal_value(state, action, reward):
    return None

def get_optimal_action():
    return env.action_space.sample()
#
# Environment
#
env = gym.make('FrozenLake-v0', is_slippery=False) # 얼음위에서 미끄러지지 않도록 설정
state = env.reset()

# Initial world display
world = env.render(mode='ansi')
display(Pretty(world))
sleep(0.5)

#
# Agent
#
for step in range(100):
    action = get_optimal_action()
    next_state, reward, done, info = env.step(action)    
    get_optimal_value(state, action, reward)
    state = next_state
    
    # updated world display
    world = env.render(mode='ansi')
    clear_output(wait=True)
    display(Pretty(world))
    sleep(0.5)
    
    if done: # an episode finished
        print("Episode finished after {} timesteps".format(step+1))
        break
  (Right)
SFFF
FHFH
FFFH
HFFG



Episode finished after 5 timesteps

4X4 매트릭스에서 S(Start)에서 시작해서 H(Hole)를 피해서 G(Goal)까지 도착하는 최적의 길을 찾는 게임이다. 코드를 보면 …

  1. 환경(Environemnt)과 에이젼트(Agent)로 나누어져 있고,
  2. 에이젼트는 환경으로 부터 상태(state)를 얻어 오고
  3. 최적의 액션(action)을 결정하고
  4. 액션을 실행하고 그 결과로 다음 상태, 보상(Reward)을 얻어 오고
  5. 상태, 액션, 보상을 이용해서 최적의 가치(Value)를 계산하고
  6. 3,4,5를 반복하는

… 구조로 되어 있다.

⚙️ 엔지니어

이거…
딥러닝하고는 완전 다른 문제다…
딥러닝은 데이터를 주고 사람보다 뛰어난 분류와 예측을 하도록 모델링 을 하는 것이다.
강화학습은 환경과 에이젼트를 주고 사람보다 뛰어난 행동을 하도록 알고리듬 을 만드는 것이다.

우선 환경(Environment)에 대해서 알아 보자

환경 (Environment)

‘얼음호수’의 환경은 16개의 상태(State)와 4개의 액션(Action)으로 구성 되어 있다.

액션 (Action)

# Action Space
env.action_space
Discrete(4)

‘얼음호수’에서는 4개의 액션이 존재한다. 그리고 각각의 액션이 번호로 지정 되어 있다.

\(A = \{0, 1, 2, 3\}\)

Num Action
0 왼쪽으로 이동 (Move Left)
1 아래로 이동 (Move Down)
2 오른쪽으로 이동 (Move Right)
3 위로 이동 (Move Up)

상태 (State)

# State Space
env.observation_space
Discrete(16)

‘얼음호수’의 상태(State) \(S\)는 다음과 같이 각 상태가 0과 15까지 번호로 지정되어 있다.

\(S = \{0, 1, \cdots , 15\}\)

\(\begin{vmatrix} 0 & 1 & 2 & 3 \\ 4 & 5 & 6 & 7 \\ 8 & 9 & 10 & 11 \\ 12 & 13 & 14 & 15 \end{vmatrix}\)

그리고 각 상태마다 액션(action), 확률(probability), 다음 상태(next state), 보상(reward), 종료(done)가 {action: [(probability, nextstate, reward, done)]} 형식으로 정의 되어 있다.

6번 상태를 까보자

env.P[6]
{0: [(1.0, 5, 0.0, True)],
 1: [(1.0, 10, 0.0, False)],
 2: [(1.0, 7, 0.0, True)],
 3: [(1.0, 2, 0.0, False)]}

해석을 해 보면 다음과 같다.

6번 상태에서
왼쪽으로 이동하면 100%의 확률로 5번 상태로 변환되고, 보상은 0.0, 종료한다.
아래로 이동하면 100%의 확률로 10번 상태로 변환되고, 보상은 0.0, 계속한다.
오른쪽으로 이동하면 100%의 확률로 7번 상태로 변환되고 보상은 0.0, 종료한다.
위로 이동하면 100%의 확률로 2번 상태로 변환되고 보상은 0.0, 계속한다.

보상 (Reward)

14번의 상태도 까보자

env.P[14]
{0: [(1.0, 13, 0.0, False)],
 1: [(1.0, 14, 0.0, False)],
 2: [(1.0, 15, 1.0, True)],
 3: [(1.0, 10, 0.0, False)]}

14번 상태에서 오른쪽으로 이동하면 100% 확률로 15번 상태로 변환되고 1.0을 보상 받고 종료한다!

기대 보상값은 아래와 같이 정의한다.

\(\mathcal R_s^a = \mathbb E [\mathcal R_{t+1} | S_t = s, A_t = a ]\)

예를 들어서 14번 상태에서, 2번 액션에서의 기대 보상값은 아래와 같다.

\(\mathcal R_{14}^{2} = 1.0\)

Return (미래 보상의 합)

에이전트는 바로 다음의 보상만을 보고 액션을 하면 안된다. 먼 미래의 최종 보상의 합을 예상해서 액션을 해야 한다.
‘2보 전진을 위한 1보 후퇴’ 전략을 구사할 수 있어야 한다.

에이전트가 원하는 최종 골(Goal)은 다음의 식과 같이 다음, 다음, 다음…의 미래의 보상들의 합을 돌려 받는(Return) 것이다.

\(G_t = R_{t+1} + \gamma R_{t+2} + \cdots + = \sum_{k=0}^{\infty} \gamma^{k} R_{t+k+1}\)

그러나 에이전트도 욕심이 있어서 바로 다음 보상이 커보인다. 이것을 만족시켜 주는 것이 디스카운트(discount factor) \(\gamma\) 값이다. \(\gamma\) 는 0과 1사이의 값을 가진다. 이렇게 하면 미래로 갈 수록 보상은 지수적으로 작아 보이게 된다.

\(\gamma \in [0,1]\)

⚙️ 엔지니어

에이전트는
보상을 받기 위해
구멍에 빠지지 않고 15번 상태로 이동하기 위해
최적의 액션을 해야 한다.

우리는
에이전트가 최적의 액션을 하도록
알고리듬을 만들어야 한다.
어떻게 ?

에이전트 (Agent)

에이전트는 환경에서 부터 주어진 상태(\(S\)), 액션(\(A\)), 보상(\(R\))을 가지고 가치(Q-value)를 계산하고 정책(Policy)을 수립해서 최적의 액션을 수행해야 한다.

정책 (Policy)

에이전트가 주어진 상태에서 액션을 선택하는 확률이다.

\(\pi(s,a) = \pi (a|s) = \mathbb P[A_t = a | S_t = s]\)

예를 들어서 \(\pi (1 | 5) = 0.25\) 라는 것은 5번 상태에서 아래로 이동하는 액션(1) 을 수행할 확률을 25%로 정한다는 뜻이다.

상태 가치 (State Value)

상태 가치는 상태 s 에서 기대되는 미래 보상의 합이다.

\(v_{\pi}(s) = \mathbb E_{\pi} [G_t | S_t = s]\)

Q-value (State Action Value)

Q-value는 상태 s 에서 액션 a 를 수행할 경우에 기대되는 미래 보상의 합이다.

\(q_{\pi}(s,a) = \mathbb E_{\pi} [G_t | S_t = s, A_t = a]\)

최적 가치 (Optimal Value)

상태 s 에서 가장 큰 Q-value이다.

\(q_*(s,a) = \max_{\pi} q_{\pi}(s,a)\)

최적 정책 (Optimal Policy)

\(q_*(s,a)\)가 결정 되면 \(\pi (s, a) = 1\) 이 된다. 즉 상태 s일때는 100% 액션 a를 수행 한다.

\( \pi_*(s, a) = \begin{cases} 1 & \text{if } a= \text{argmax}_{a \in A} q_\star(s,a) \\ 0 & \text{otherwise} \end{cases} \)

⚙️ 엔지니어

에이전트는
각 상태(state)의 액션(action)마다
Q-value를 계산한다.

에이전트가 최적의 액션을 하기 위해서는
각 상태에서 Q-value의 최고값을 가진
액션을 수행하면 된다!

그런데
Q-value를 어떻게 찾지?

메모리 (Memory)

랜덤 액션에서 얻은 데이터 (\(s_t, a_t\)) 들을 몬테 카를로에서 사용하기 위해서 메모리(memory)에 저장한다.

\(\text{memory} = [(s_0, a_0), (s_1, a_1), \cdots]\)

Monte Carlo (몬테 카를로)

‘얼음호수’ 에서 S(Start)에서 시작해서 H(Hole)에 빠지거나 G(Goal)에 도착하면 게임을 끝이난다. 이 것을 에피소드(episode) 라고 한다. 하나의 에피소드가 끝나면 에피소드에서 지나왔던 상태(state)에 대해서 Q-value를 구한다. 그리고 계속해서 에피소드를 실행 시키면서 지나온 상태(state)에 대한 Q-value를 업데이트 하면 결국 최적의 Q-value를 구할 수 있다. 이것이 몬테 카를로 방법(Monte Carlo method)이다. 이때 액션은 랜덤으로 결정한다.

\(\pi(0|\cdot) = \pi(1|\cdot) = \pi(2|\cdot) = \pi(3|\cdot) = 0.25\)

하나의 에피소드가 끝나면 아래와 같이 Q-value를 구한다.

\(k\): Sample k번째 에피소드

\(\begin{align} N(S_t, A_t) & \leftarrow N(S_t, A_t) + 1 \\ Q(S_t, A_t) & \leftarrow Q(S_t, A_t) + \dfrac{1}{N(S_t, A_t)} \left( G_t - Q(S_t, A_t) \right)
\end{align}\)

Target

목표로 하는 값이다. 여기서 타겟은 \(G_t\)이다.

Error (델타)

목표값과 현재값과의 차이를 \(\delta\) 라고 한다.
\(\delta_t = G_t - Q(S_t, A_t) \)

계속해서 에피소드를 실행 시키면서 \(Q(S_t, A_t)\)에다가 \(\delta_t\) 의 평균을 업데이트 하면 결국 \(Q(s, a) \rightarrow q_*(s,a)\)가 된다.

⚙️ 엔지니어

아하! 최적의 Q-value를 찾아 가는 것이 강화 학습(Reinfocement Learning)하는 방법이구나!

학습 (Learning)

몬테 카를로를 이용해서 최적의 Q-value를 찾아보자!

from tqdm import tqdm

num_state = env.observation_space.n
num_action = env.action_space.n
num_episode = 5000

# Q_table
Q_table = np.zeros((num_state, num_action))
# N_table
N_table = np.zeros((num_state, num_action))
# R_table
R_table = np.zeros((num_state, num_action))

for episode in tqdm(range(num_episode)):
    memory = []    
    state = env.reset()
    
    for step in range(100):
        action = env.action_space.sample()        
        memory.append((state, action)) # trajectory
        next_state, reward, done, info = env.step(action)
        R_table[state][action] = reward
        state = next_state
        
        if done: # an episode finished  
            #
            # Monte Carlo policy evaluation
            #
            for i in range(len(memory)):
                G_t = 0
                gamma = 0.6 # discount factor
                for j in range( i, len(memory) ):
                    S_t = memory[j][0]
                    A_t = memory[j][1]
                    if i==j:
                        G_t += R_table[S_t][A_t]
                    else:
                        G_t += gamma * R_table[S_t][A_t]
                        gamma = gamma * gamma
                S_t = memory[i][0]
                A_t = memory[i][1]
                N_table[S_t][A_t] += 1
                Q_table[S_t][A_t] += (G_t - Q_table[S_t][A_t]) / N_table[S_t][A_t]
            break
100%|██████████| 5000/5000 [00:00<00:00, 8161.28it/s]

해결 (Solution)

state = env.reset()
done = False

# Initial world display
world = env.render(mode='ansi')
display(Pretty(world))
sleep(0.5)

while not done:
    action = np.argmax(Q_table[state]) # Optimal Policy
    state, reward, done, info = env.step(action)
    
    # updated world display
    world = env.render(mode='ansi')
    clear_output(wait=True)
    display(Pretty(world))
    sleep(0.5)
    
    if done and state == 15:
        print('\n 🎉👍 성공! 🍺🥇')
  (Right)
SFFF
FHFH
FFFH
HFFG




 🎉👍 성공! 🍺🥇