Solving Maze using Reinforcement Learning
1. DP using greedy
from pyamaze import maze, agent
import numpy as np
# Load the Maze
size = 5
m=maze(size,size)
m.CreateMaze(loadMaze="maze.csv")
# create the environment model
states = list(m.maze_map.keys())
actions = ['E','N', 'W', 'S']
# define how an action changes a state
def step(state, action):
    x, y = state
    if action == 'E':
        y += 1
    elif action == 'W':
        y -= 1
    elif action == 'N':
        x -= 1
    elif action == 'S':
        x += 1
    new_state = (x, y)
    return new_state
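# For example, step((3, 2), 'E') returns (3, 3), since moving East increases the
# column index y, and step((3, 2), 'N') returns (2, 2), since North decreases the row index x.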
# create the transition model
# trans[s, a] = s' (index of the next state; -1 if the move is blocked by a wall)
trans = np.ones(shape=[len(states), len(actions)], dtype=np.int32)*(-1)
for s, state in enumerate(states):
    for a, action in enumerate(actions):
        # check if the action is possible
        if m.maze_map[state][action] > 0:
            next_state = step(state, action)
            s_next = states.index(next_state)
            trans[s, a] = s_next
    if s == 0:
        trans[s, :] = 0  # the goal state (index 0) is absorbing

# rewards: -1 per step, 0 once the goal is reached
rewards = np.ones(shape=[len(states), len(actions)])*(-1.0)
rewards[0, :] = 0.0
v = np.zeros(shape=[len(states)])
pi = np.ones(shape=[len(states), len(actions)])*(0.25)
def show_v():
    print(np.reshape(v, [size, size]).T)

def show_pi():
    action_arrows = ["\u2192", "\u2191", "\u2190", "\u2193"]
    pi_disp = np.array(['' for _ in range(size*size)], dtype="<U4")
    for s in range(len(states)):
        for a in range(len(actions)):
            if pi[s, a] > 0.0:
                pi_disp[s] = pi_disp[s] + action_arrows[a]
            else:
                pi_disp[s] = pi_disp[s] + " "
    print(np.reshape(pi_disp, [size, size]).T)
def run_episode(s=24):
    h = [states[s]]  # trajectory, starting from state index s
    while s != 0:
        a = np.random.choice(np.arange(0, len(actions)), p=pi[s])
        s = trans[s][a]
        print(s)
        if s == -1 or len(h) > 100:
            print("failed! total_steps=", len(h)-1)
            break
        h.append(states[s])
    if s == 0:
        print("succeeded! total_steps=", len(h)-1)
    # replay the trajectory in the pyamaze window
    m.CreateMaze(loadMaze="maze.csv")
    a = agent(m, footprints=True, x=h[0][0], y=h[0][1])
    for pos in h:
        a.position = pos
    m.run()
    return h
def greedy(v):
    # initialize all pi[s,a] = 0.0
    pi = np.zeros(shape=[len(states), len(actions)])
    for s in range(len(states)):
        v_max = -np.inf
        a_max = []
        for a in range(len(actions)):
            s_next = trans[s, a]
            if s_next > -1:
                # next_state_value = v[s_next]
                if v[s_next] > v_max:
                    v_max = v[s_next]
                    a_max = [a]
                elif v[s_next] == v_max:
                    a_max.append(a)
        # assign pi(s, a_max)
        n_max = len(a_max)
        prb = 1.0/n_max
        for action in a_max:
            pi[s, action] = prb
    return pi
gamma = 1.0
epsilon = 0.1

def update_value(v):
    v_old = v.copy()
    for s, state in enumerate(states):
        # v[s] = sum_a pi(s, a) * (r + gamma*v_old[s'])
        v_sum = 0.0
        for a, action in enumerate(actions):
            r = rewards[s, a]     # reward for taking action a in state s
            s_next = trans[s, a]  # index of the next state
            if s_next > -1:
                r_gamma = r + gamma*v_old[s_next]
                v_sum = v_sum + pi[s, a]*r_gamma
        v[s] = v_sum
    dv = np.sum(np.abs(v - v_old))
    return v, dv
# define what happens in a single iteration
def epoch(v, pi):
    for _ in range(100):
        v, dv = update_value(v)
        print(dv)
        if dv < epsilon:
            break
    pi = greedy(v)
    return v, pi

# run the iteration
for iter in range(100):
    print("iter:", iter)
    v, pi = epoch(v, pi)

run_episode()
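Taken together, update_value and greedy implement one round of policy iteration: the evaluation step is the Bellman expectation backup, and the improvement step spreads equal probability over every action whose successor state has the highest value. Writing s' = trans[s, a] and r(s, a) = rewards[s, a] to mirror the arrays above:

$$v_{k+1}(s) \;=\; \sum_{a}\pi(a\mid s)\,\bigl[r(s,a)+\gamma\,v_k(s')\bigr],\qquad \pi'(s)\;\in\;\arg\max_{a}\, v_{k+1}(s').$$

Actions blocked by a wall (trans[s, a] = -1) are simply skipped in both steps.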
2. Value Iteration
from pyamaze import maze, agent
import numpy as np
# Load the Maze
size = 5
m=maze(size,size)
m.CreateMaze(loadMaze="maze.csv")
# create the environment model
states = list(m.maze_map.keys())
actions = ['E','N', 'W', 'S']
# define how an action changes a state
def step(state, action):
    x, y = state
    if action == 'E':
        y += 1
    elif action == 'W':
        y -= 1
    elif action == 'N':
        x -= 1
    elif action == 'S':
        x += 1
    new_state = (x, y)
    return new_state
# create the transition model
# trans[s, a] = s' (index of the next state; -1 if the move is blocked by a wall)
trans = np.ones(shape=[len(states), len(actions)], dtype=np.int32)*(-1)
for s, state in enumerate(states):
    for a, action in enumerate(actions):
        # check if the action is possible
        if m.maze_map[state][action] > 0:
            next_state = step(state, action)
            s_next = states.index(next_state)
            trans[s, a] = s_next
    if s == 0:
        trans[s, :] = 0  # the goal state (index 0) is absorbing

# rewards: -1 per step, 0 once the goal is reached
rewards = np.ones(shape=[len(states), len(actions)])*(-1.0)
rewards[0, :] = 0.0
v = np.zeros(shape=[len(states)])
def show_v():
    print(np.reshape(v, [size, size]).T)

def run_episode(s=24):
    h = [states[s]]  # trajectory, starting from state index s
    while s != 0:
        # act greedily with a one-step lookahead on the current values
        a = np.argmax([rewards[s, a] + gamma*v[trans[s][a]] if trans[s][a] > -1 else -np.inf
                       for a in range(len(actions))])
        s = trans[s][a]
        print(s)
        if s == -1 or len(h) > 100:
            print("failed! total_steps=", len(h)-1)
            break
        h.append(states[s])
    if s == 0:
        print("succeeded! total_steps=", len(h)-1)
    # replay the trajectory in the pyamaze window
    m.CreateMaze(loadMaze="maze.csv")
    a = agent(m, footprints=True, x=h[0][0], y=h[0][1])
    for pos in h:
        a.position = pos
    m.run()
    return h
gamma = 1.0
epsilon = 0.1

def update_value(v):
    v_old = v.copy()
    for s, state in enumerate(states):
        # v[s] = max_a (r + gamma*v_old[s'])
        v_list = []
        for a, action in enumerate(actions):
            r = rewards[s, a]     # reward for taking action a in state s
            s_next = trans[s, a]  # index of the next state
            if s_next > -1:
                r_gamma = r + gamma*v_old[s_next]
                v_list.append(r_gamma)
        v[s] = max(v_list)
    dv = np.sum(np.abs(v - v_old))
    return v, dv
# define what happens in a single iteration
def epoch(v):
    for _ in range(100):
        v, dv = update_value(v)
        print(dv)
        if dv < epsilon:
            break
    return v

# run the iteration
for iter in range(100):
    print("iter:", iter)
    v = epoch(v)

run_episode()
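Compared with the previous section, value iteration drops the explicit policy and applies the Bellman optimality backup directly inside update_value; run_episode then acts greedily with a one-step lookahead on the (near-)converged values:

$$v_{k+1}(s) \;=\; \max_{a}\bigl[r(s,a)+\gamma\,v_k(s')\bigr],\qquad a^{*}(s)\;=\;\arg\max_{a}\bigl[r(s,a)+\gamma\,v(s')\bigr].$$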
3. Model Free Prediction using TD
from pyamaze import maze, agent
import numpy as np
""" Perform model-free prediction using TD
- Reuse the maze environment (maze.csv) from the previous class
- Assume that the environment model is unknown (i.e. no transition matrix)
- Compute value v[s] for the uniform random policy, pi[s,a]=0.25
- Compare this with the true value (true_value.npy)
"""
"""
MazeEnvironment Class:
- reset(): reset to a random state, returns: state_index
- step(a): apply action a, returns: reward, next state, done [done=True if episode terminates]
"""
class MazeEnvironment:
    def __init__(self):
        self.size = size = 5
        self.m = maze(size, size)
        self.m.CreateMaze(loadMaze="maze.csv")
        self.states = list(self.m.maze_map.keys())
        self.actions = ['E', 'N', 'W', 'S']

    def reset(self):
        self.s = np.random.choice(np.arange(0, 25))  # random start among the 5*5 cells
        self.state = self.states[self.s]
        return self.s

    def step(self, a):
        action = self.actions[a]
        state = self.state
        if self.m.maze_map[state][action] == 0:
            # the move is blocked by a wall: terminate the episode
            reward = -1
            done = True
            new_state = None
            return reward, new_state, done
        if state == (1, 1):
            # goal state: no cost, episode terminates
            reward = 0
            done = True
            new_state = 0
            return reward, new_state, done
        x, y = state
        if action == 'E':
            y += 1
        elif action == 'W':
            y -= 1
        elif action == 'N':
            x -= 1
        elif action == 'S':
            x += 1
        self.state = new_state = (x, y)
        self.s = s_new = self.states.index(self.state)
        reward = -1
        done = False
        return reward, self.s, done
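# Illustrative use of the interface (with `env = MazeEnvironment()` as created below):
#   s0 = env.reset()           # a random state index in 0..24
#   r, s1, done = env.step(0)  # try moving East ('E'); depending on the current cell:
#                              #   move blocked by a wall -> r = -1, s1 = None, done = True
#                              #   current cell is the goal (1,1) and the move is open
#                              #                          -> r = 0,  s1 = 0,    done = True
#                              #   ordinary open move     -> r = -1, s1 = next state index, done = False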
""" Initialize the environment """
env = MazeEnvironment()
size = env.size
states = env.states
actions = env.actions
""" Initialize the value and policy """
v = np.zeros(shape=[len(states)]) # shape: [S]
pi = np.ones(shape=[len(states), len(actions)])*0.25 # shape: [S, A]
""" visualizer functions """
def show_v():
    print(np.reshape(v, [size, size]).T)

def show_pi():
    action_arrows = ["\u2192", "\u2191", "\u2190", "\u2193"]
    pi_disp = np.array(['' for _ in range(size*size)], dtype="<U4")
    for s in range(len(states)):
        for a in range(len(actions)):
            if pi[s, a] > 0.0:
                pi_disp[s] = pi_disp[s] + action_arrows[a]
            else:
                pi_disp[s] = pi_disp[s] + " "
    print(np.reshape(pi_disp, [size, size]).T)
""" define run_episode function:
- runs an episode, utilizing env.reset() and env.step functions
- terminates when done=true or total_steps > 100
- returns:
history of states: S (s_0, ..., s_T+1), # initial state가 필요하기 때문에 A나 R보다 1개가 더 많음
history of actions: A (a_0, ..., a_T),
history of rewards: R (r_0. .., r_T)
"""
def run_episode():
    s = env.reset()  # s0
    S = [s]  # history of states
    A = []   # history of actions
    R = []   # history of rewards
    done = False  # episode is still running
    while not done:  # take steps until the episode terminates
        a_prob = pi[s, :]  # probabilities of the 4 actions
        a = np.random.choice([0, 1, 2, 3], p=a_prob)  # sample the action
        r, s, done = env.step(a)
        S.append(s)
        A.append(a)
        R.append(r)
        if not done and len(A) > 100:  # safety cap on the episode length
            done = True
    return S, A, R
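# Note: S always has one more entry than A and R; its final entry is 0 when the goal
# was reached, None when a move hit a wall, or an ordinary state index if the
# 100-step cap ended the episode.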
gamma = 0.99

def td_prediction(alpha=0.1, episodes=100):
    for _ in range(episodes):
        S, A, R = run_episode()  # gather (S, A, R) from an episode
        T = len(A)               # set T = episode length
        for i in range(T):
            s = S[i]         # current state
            r = R[i]         # obtained reward
            s_next = S[i+1]  # next state
            if s_next is None:
                v_new = r    # the move hit a wall: no bootstrapping
            elif s_next == 0:
                v_new = 0    # the next state is the goal (index 0), whose value is 0
            else:
                v_new = r + gamma*v[s_next]     # the new experienced value (i.e. TD target)
            v[s] = v[s] + alpha*(v_new - v[s])  # update towards this new value
    return v
def epoch(v, epsilon=0.1):
    for _ in range(10000):
        v_old = v.copy()
        v = td_prediction()
        dv = np.mean(np.abs(v_old - v))
        if dv < epsilon:
            break
    return v

show_v()  # value estimate before training (all zeros)
v = epoch(v)
print("TD-value:")
show_v()

# compare with the pre-computed true values
vt = np.load("true_value.npy")
v = vt  # show_v() reads the global v, so point it at the true values
print("true-value:")
show_v()
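The update inside td_prediction is the standard tabular TD(0) rule, applied once per transition of each sampled episode:

$$v(S_t)\;\leftarrow\;v(S_t)+\alpha\,\bigl[R_{t+1}+\gamma\,v(S_{t+1})-v(S_t)\bigr].$$

Transitions that end an episode use R_{t+1} alone (a wall hit) or 0 (the goal) as the target instead of bootstrapping, matching the two special branches in the code.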
4. Q-Learning
from pyamaze import maze, agent
import numpy as np
""" Perform model-free prediction using TD
- Reuse the maze environment (maze.csv) from previous class
- Assume that the environment model is unknown (i.e. no transition matrix)
- Perform Q-learning
"""
"""
MazeEnvironment Class:
- reset(): reset to a random state, returns: state_index
- step(a): apply action a, returns: reward, next state, done [done=True if episode terminates]
"""
class MazeEnvironment:
    def __init__(self):
        self.size = size = 5
        self.m = maze(size, size)
        self.m.CreateMaze(loadMaze="maze.csv")
        self.states = list(self.m.maze_map.keys())
        self.actions = ['E', 'N', 'W', 'S']

    def reset(self):
        self.s = np.random.choice(np.arange(0, 25))  # choose one of the 5*5 states randomly
        self.state = self.states[self.s]
        return self.s

    def step(self, a):
        action = self.actions[a]
        state = self.state
        if state == (1, 1):  # goal
            reward = 0
            done = True
            new_state = 0
            return reward, new_state, done
        elif self.m.maze_map[state][action] == 0:  # the move is blocked by a wall
            reward = -10
            done = True
            new_state = None
            return reward, new_state, done
        x, y = state
        if action == 'E':
            y += 1
        elif action == 'W':
            y -= 1
        elif action == 'N':
            x -= 1
        elif action == 'S':
            x += 1
        self.state = new_state = (x, y)
        self.s = s_new = self.states.index(self.state)
        reward = -1  # each step costs -1
        done = False
        return reward, self.s, done
""" Initialize the environment """
env = MazeEnvironment()
size = env.size
states = env.states
actions = env.actions
""" Initialize the value and policy """
q = np.zeros(shape=[len(states), len(actions)]) # shape: [S, A]
pi = np.ones(shape=[len(states), len(actions)])*0.25  # shape: [S, A]; epsilon-greedy behaviour policy
""" visualizer functions """
def show_pi():
    action_arrows = ["\u2192", "\u2191", "\u2190", "\u2193"]
    pi_disp = np.array(['' for _ in range(size*size)], dtype="<U4")
    for s in range(len(states)):
        for a in range(len(actions)):
            if pi[s, a] > 0.0:
                pi_disp[s] = pi_disp[s] + action_arrows[a]
            else:
                pi_disp[s] = pi_disp[s] + " "
    print(np.reshape(pi_disp, [size, size]).T)
""" define run_episode function:
- runs an episode, utilizing env.reset() and env.step functions
- terminates when done=true or total_steps > 100
- returns:
history of states: S (s_0, ..., s_T+1),
history of actions: A (a_0, ..., a_T),
history of rewards: R (r_0,. .., r_T)
"""
def run_episode():
    s = env.reset()
    done = False
    S, A, R = [s], [], []
    while not done and len(A) <= 100:  # stop when the episode ends or after 100 steps
        a = np.random.choice([0, 1, 2, 3], p=pi[s, :])  # sample the action
        r, s, done = env.step(a)                        # apply the action
        # add to the record
        S.append(s)
        A.append(a)
        R.append(r)
    return S, A, R
def sim_episode():
    S, A, R = run_episode()
    env.m.CreateMaze(loadMaze="maze.csv")
    s = S[0]
    pos = states[s]
    a = agent(env.m, footprints=True, x=pos[0], y=pos[1])
    for s in S:
        if s is None:
            break
        pos = states[s]
        a.position = pos
    env.m.run()
gamma = 0.99
"""
m = number of actions
set pi[s,a] = epsilon/m for all a
set pi[s,a] = epsilon/m + (1-epsilon) IF a = argmax_a q(s,a)
"""
def epsilon_greedy(q, epsilon=0.3):
    m = len(actions)
    pi[:, :] = epsilon/m
    for s in range(len(states)):
        a_max = np.argmax(q[s, :])
        pi[s, a_max] += (1-epsilon)
    return pi
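# Worked example: with m = 4 actions and epsilon = 0.3, every action first gets
# 0.3/4 = 0.075, and the argmax action additionally gets 1 - 0.3 = 0.7, i.e. 0.775;
# the probabilities sum to 3*0.075 + 0.775 = 1.0.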
"""
q_new = r + gamma*max_a'(q(s',a'))
q(s,a) = q(s,a) + alpha*(q_new - q(s,a))
"""
def q_learning(alpha=0.1, episodes=100):
    for _ in range(episodes):
        # gather/sample transitions
        S, A, R = run_episode()
        for i in range(len(A)):
            s = S[i]
            a = A[i]
            r = R[i]
            s_next = S[i+1]
            if s_next is None:
                q[s, a] = -100000  # hitting a wall: assign a large negative value
            else:
                q_new = r + gamma*np.max(q[s_next, :])    # TD target
                q[s, a] = q[s, a] + alpha*(q_new - q[s, a])
dq_min = 0.01

def train():
    global pi
    for _ in range(1000):
        q_old = q.copy()
        q_learning()
        dq = np.mean(np.abs(q_old - q))
        if dq < dq_min:
            break
    pi = epsilon_greedy(q)  # epsilon-greedy policy from the learned q
    show_pi()

train()
pi = epsilon_greedy(q, 0.0)  # epsilon = 0: pure greedy policy
show_pi()
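To watch the learned behaviour, the sim_episode() helper defined above can be called once the final greedy policy (epsilon = 0.0) is in place. This is just a usage sketch: the rollout starts from whatever random cell env.reset() happens to pick, and it assumes the q-table has converged well enough for the greedy policy to reach the goal.

# Replay one rollout of the final greedy policy in the pyamaze window.
sim_episode()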