MC(Monte Carlo)/TD(temporal difference) Control 구현¶
- 간략화된 정책 이터레이션 사용함
- 얕은 정책 평가 1번, 정책 개선 1번 번갈아 진행
- 순서
- 한 에피소드의 경험을 쌓음
- 경험한 데이터로 q(s, a) 테이블의 값을 업데이트 하고(정책 평가)
- 업데이트된 q(s, a) 테이블을 이용하여 $\epsilon-greedy$ 정책을 만들기(정책 개선)
In [1]:
import random
import numpy as np
In [2]:
class GridWorld():
def __init__(self):
self.x=0
self.y=0
def step(self, a):
# 0번 액션: 왼쪽, 1번 액션: 위, 2번 액션: 오른쪽, 3번 액션: 아래쪽
if a==0:
self.move_left()
elif a==1:
self.move_up()
elif a==2:
self.move_right()
elif a==3:
self.move_down()
reward = -1 # 보상은 항상 -1로 고정
done = self.is_done()
return (self.x, self.y), reward, done
def move_left(self):
if self.y==0:
pass
elif self.y==3 and self.x in [0,1,2]:
pass
elif self.y==5 and self.x in [2,3,4]:
pass
else:
self.y -= 1
def move_right(self):
if self.y==1 and self.x in [0,1,2]:
pass
elif self.y==3 and self.x in [2,3,4]:
pass
elif self.y==6:
pass
else:
self.y += 1
def move_up(self):
if self.x==0:
pass
elif self.x==3 and self.y==2:
pass
else:
self.x -= 1
def move_down(self):
if self.x==4:
pass
elif self.x==1 and self.y==4:
pass
else:
self.x+=1
def is_done(self):
if self.x==4 and self.y==6: # 목표 지점인 (4,6)에 도달하면 끝난다
return True
else:
return False
def reset(self):
self.x = 0
self.y = 0
return (self.x, self.y)
MC(Monte Carlo) 방식¶
In [3]:
class QAgent():
def __init__(self):
self.q_table = np.zeros((5, 7, 4)) # q벨류를 저장하는 변수. 모두 0으로 초기화.
self.eps = 0.9
self.alpha = 0.01
def select_action(self, s):
# eps-greedy로 액션을 선택
x, y = s
coin = random.random()
if coin < self.eps:
action = random.randint(0,3)
else:
action_val = self.q_table[x,y,:]
action = np.argmax(action_val)
return action
def update_table(self, history):
# 한 에피소드에 해당하는 history를 입력으로 받아 q 테이블의 값을 업데이트 한다
cum_reward = 0
for transition in history[::-1]:
s, a, r, s_prime = transition
x,y = s
# 몬테 카를로 방식을 이용하여 업데이트.
self.q_table[x,y,a] = self.q_table[x,y,a] + self.alpha * (cum_reward - self.q_table[x,y,a])
cum_reward = cum_reward + r
def anneal_eps(self):
self.eps -= 0.03
self.eps = max(self.eps, 0.1)
def show_table(self):
# 학습이 각 위치에서 어느 액션의 q 값이 가장 높았는지 보여주는 함수
q_lst = self.q_table.tolist()
data = np.zeros((5,7))
for row_idx in range(len(q_lst)):
row = q_lst[row_idx]
for col_idx in range(len(row)):
col = row[col_idx]
action = np.argmax(col)
data[row_idx, col_idx] = action
print(data)
In [7]:
test = []
def main():
env = GridWorld()
agent = QAgent()
for n_epi in range(5000): # 총 1,000 에피소드 동안 학습
done = False
history = []
s = env.reset()
while not done: # 한 에피소드가 끝날 때 까지
a = agent.select_action(s)
s_prime, r, done = env.step(a)
history.append((s, a, r, s_prime))
test.append((s, a, r, s_prime)) # 테스트용
s = s_prime
agent.update_table(history) # 히스토리를 이용하여 에이전트를 업데이트
agent.anneal_eps()
agent.show_table() # 학습이 끝난 결과를 출력
In [8]:
main()
[[2. 3. 0. 2. 3. 2. 3.] [2. 3. 0. 1. 2. 3. 3.] [2. 3. 0. 1. 0. 3. 3.] [0. 2. 2. 1. 0. 3. 3.] [2. 1. 0. 3. 0. 2. 0.]]
TD(temporal difference) 방식- SARSA¶
In [14]:
class QAgent():
def __init__(self):
self.q_table = np.zeros((5, 7, 4)) # 마찬가지로 Q 테이블을 0으로 초기화
self.eps = 0.9
def select_action(self, s):
# eps-greedy로 액션을 선택해준다
x, y = s
coin = random.random()
if coin < self.eps:
action = random.randint(0,3)
else:
action_val = self.q_table[x,y,:]
action = np.argmax(action_val)
return action
def update_table(self, transition):
s, a, r, s_prime = transition
x,y = s
next_x, next_y = s_prime
a_prime = self.select_action(s_prime) # S'에서 선택할 액션 (실제로 취한 액션이 아님)
# SARSA 업데이트 식을 이용
self.q_table[x,y,a] = self.q_table[x,y,a] + 0.1 * (r + self.q_table[next_x,next_y,a_prime] - self.q_table[x,y,a])
def anneal_eps(self):
self.eps -= 0.03
self.eps = max(self.eps, 0.1)
def show_table(self):
q_lst = self.q_table.tolist()
data = np.zeros((5,7))
for row_idx in range(len(q_lst)):
row = q_lst[row_idx]
for col_idx in range(len(row)):
col = row[col_idx]
action = np.argmax(col)
data[row_idx, col_idx] = action
print(data)
In [15]:
def main():
env = GridWorld()
agent = QAgent()
for n_epi in range(1000):
done = False
s = env.reset()
while not done:
a = agent.select_action(s)
s_prime, r, done = env.step(a)
agent.update_table((s,a,r,s_prime))
s = s_prime
agent.anneal_eps()
agent.show_table()
In [16]:
main()
[[2. 3. 0. 1. 3. 3. 3.] [3. 3. 0. 2. 2. 3. 3.] [3. 3. 0. 1. 0. 3. 3.] [2. 2. 2. 1. 0. 2. 3.] [2. 1. 1. 1. 0. 2. 0.]]
Q-러닝 구현¶
In [17]:
class QAgent():
def __init__(self):
self.q_table = np.zeros((5, 7, 4)) # 마찬가지로 Q 테이블을 0으로 초기화
self.eps = 0.9
def select_action(self, s):
# eps-greedy로 액션을 선택해준다
x, y = s
coin = random.random()
if coin < self.eps:
action = random.randint(0,3)
else:
action_val = self.q_table[x,y,:]
action = np.argmax(action_val)
return action
def update_table(self, transition):
s, a, r, s_prime = transition
x,y = s
next_x, next_y = s_prime
a_prime = self.select_action(s_prime) # S'에서 선택할 액션 (실제로 취한 액션이 아님)
# Q러닝 업데이트 식을 이용
self.q_table[x,y,a] = self.q_table[x,y,a] + 0.1 * (r + np.amax(self.q_table[next_x,next_y,:]) - self.q_table[x,y,a])
# SARSA 업데이트 식을 이용
# self.q_table[x,y,a] = self.q_table[x,y,a] + 0.1 * (r + self.q_table[next_x,next_y,a_prime] - self.q_table[x,y,a])
def anneal_eps(self):
self.eps -= 0.01 # Q러닝에선 epsilon 이 좀더 천천히 줄어 들도록 함.
self.eps = max(self.eps, 0.2)
def show_table(self):
q_lst = self.q_table.tolist()
data = np.zeros((5,7))
for row_idx in range(len(q_lst)):
row = q_lst[row_idx]
for col_idx in range(len(row)):
col = row[col_idx]
action = np.argmax(col)
data[row_idx, col_idx] = action
print(data)
In [18]:
main()
[[2. 3. 0. 1. 2. 2. 3.] [3. 3. 0. 2. 2. 3. 3.] [2. 3. 0. 1. 0. 3. 3.] [2. 2. 2. 1. 0. 2. 3.] [1. 2. 2. 1. 0. 2. 0.]]
In [ ]: