
Solving a Maze using Reinforcement Learning


1. DP with Greedy Policy Improvement
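The listing below is essentially policy iteration: it evaluates the current policy with the Bellman expectation backup and then improves it greedily. In the notation of the code, where trans[s,a] gives the deterministic next state s', the two steps it implements are roughly

v_{k+1}(s) = \sum_a \pi(a \mid s) \, [\, r(s,a) + \gamma \, v_k(s') \,]
\pi'(s) = \arg\max_a v(s')

update_value performs the first line and greedy the second, with ties in the argmax split uniformly.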

from pyamaze import maze, agent
import numpy as np

# Load the Maze
size = 5
m=maze(size,size)
m.CreateMaze(loadMaze="maze.csv")

# create the environment model
states = list(m.maze_map.keys())
actions = ['E','N', 'W', 'S']

# define how an action changes a state
def step(state, action):
    x, y = state
    if action=='E':
        y += 1
    elif action=='W':
        y -= 1
    elif action=='N':
        x -= 1
    elif action=='S':
        x += 1
    new_state = (x,y)
    return new_state

# create the transition model
# trans[s, a] = index of the next state s' (-1 means action a is blocked by a wall)
trans = np.ones(shape=[len(states), len(actions)], dtype=np.int32)*(-1)
for s, state in enumerate(states):
    for a, action in enumerate(actions):
        #check if the action is possible
        if m.maze_map[state][action]>0:
            next_state = step(state,action)
            s_next = states.index(next_state)
            trans[s, a] = s_next
    if s==0:
        trans[s,:] = 0


rewards = np.ones(shape=[len(states), len(actions)])*(-1.0)
rewards[0, :] = 0.0
v = np.zeros(shape=[len(states)])
pi = np.ones(shape=[len(states), len(actions)])*(0.25)

def show_v():
    print(np.reshape(v, [size, size]).T)

def show_pi():
    action_arrows = ["\u2192", "\u2191", "\u2190", "\u2193"]
    pi_disp = np.array(['' for _ in range(size*size)], dtype="<U4")
    for s in range(len(states)):
        for a in range(len(actions)):
            if pi[s,a]>0.0:
                pi_disp[s] = pi_disp[s] + action_arrows[a]
            else:
                pi_disp[s] = pi_disp[s] + " "
    print(np.reshape(pi_disp, [size, size]).T)


def run_episode(s=24):
    h = [states[s]]  # history of visited cells
    while s != 0:
        a = np.random.choice(np.arange(0, len(actions)), p=pi[s])
        s = trans[s][a]
        if s == -1 or len(h) > 100:
            print("failed! total_steps=", len(h)-1)
            break
        h.append(states[s])
    if s == 0:
        print("succeeded! total_steps=", len(h)-1)
    # replay the recorded path with a footprint agent
    m.CreateMaze(loadMaze="maze.csv")
    a = agent(m, footprints=True, x=h[0][0], y=h[0][1])
    for pos in h:
        a.position = pos
    m.run()
    return h

def greedy(v):
    # initialize all pi[s,a] = 0.0
    pi = np.zeros(shape=[len(states),len(actions)])
    for s in range(len(states)):
        v_max = -np.inf
        a_max = []
        for a in range(len(actions)):
            s_next = trans[s,a]
            if s_next > -1 :
                #next_state_value = v[s_next]
                if v[s_next] > v_max:
                    v_max = v[s_next]
                    a_max = [a]
                elif v[s_next] == v_max:
                    a_max.append(a)

        #assign pi (s,a_max)
        n_max = len(a_max)
        prb = 1.0/n_max
        for action in a_max:
            pi[s,action] = prb

    return pi


gamma = 1.0
epsilon = 0.1


def update_value(v):
    v_old = v.copy()
    for s, state in enumerate(states):
        # v[s] = sum_a pi[s, a] * (r + gamma * v_old[s'])
        v_sum = 0.0
        for a, action in enumerate(actions):
            r = rewards[s, a] # what is reward
            s_next = trans[s, a] # what is next state
            if s_next > -1:
                r_gamma = r+gamma*v_old[s_next]
                v_sum = v_sum+pi[s, a]*r_gamma
        v[s] = v_sum
    
    dv = np.sum(np.abs(v-v_old))
    
    return v, dv


# define what happens in a single iteration
def epoch(v, pi):
    for _ in range(100):
        v, dv = update_value(v)
        print(dv)
        if dv < epsilon:
            break
    pi = greedy(v)
    return v, pi


# run the iteration
for iter in range(100):
    print("iter:", iter)
    v, pi = epoch(v, pi)


run_episode()
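run_episode() at the end animates one rollout under the learned policy; the value table and the policy arrows can also be inspected directly with the helpers defined above. A minimal sketch:

show_v()   # converged state values, printed as a size x size grid
show_pi()  # arrows marking the greedy action(s) in each cell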

 

2. Value Iteration
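Here the explicit policy is dropped and update_value applies the Bellman optimality backup directly,

v_{k+1}(s) = \max_a [\, r(s,a) + \gamma \, v_k(s') \,]

so no separate improvement step is needed; run_episode recovers the greedy action on the fly with an argmax over the same expression.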

from pyamaze import maze, agent
import numpy as np

# Load the Maze
size = 5
m=maze(size,size)
m.CreateMaze(loadMaze="maze.csv")

# create the environment model
states = list(m.maze_map.keys())
actions = ['E','N', 'W', 'S']

# define how an action changes a state
def step(state, action):
    x, y = state
    if action=='E':
        y += 1
    elif action=='W':
        y -= 1
    elif action=='N':
        x -= 1
    elif action=='S':
        x += 1
    new_state = (x,y)
    return new_state

# create the transition model
# trans[s, a] = index of the next state s' (-1 means action a is blocked by a wall)
trans = np.ones(shape=[len(states), len(actions)], dtype=np.int32)*(-1)
for s, state in enumerate(states):
    for a, action in enumerate(actions):
        #check if the action is possible
        if m.maze_map[state][action] > 0:
            next_state = step(state,action)
            s_next = states.index(next_state)
            trans[s, a] = s_next
    if s == 0:
        trans[s,:] = 0

rewards = np.ones(shape=[len(states), len(actions)])*(-1.0)
rewards[0, :] = 0.0
v = np.zeros(shape=[len(states)])

def show_v():
    print(np.reshape(v, [size, size]).T)

def run_episode(s=24):
    h = [states[s]]  # history of visited cells
    while s != 0:
        # greedy action with respect to the converged values
        a = np.argmax([rewards[s, a]+gamma*v[trans[s][a]] if trans[s][a] > -1 else -np.inf for a in range(len(actions))])
        s = trans[s][a]
        if s == -1 or len(h) > 100:
            print("failed! total_steps=", len(h)-1)
            break
        h.append(states[s])
    if s == 0:
        print("succeeded! total_steps=", len(h)-1)
    # replay the recorded path with a footprint agent
    m.CreateMaze(loadMaze="maze.csv")
    a = agent(m, footprints=True, x=h[0][0], y=h[0][1])
    for pos in h:
        a.position = pos
    m.run()
    return h

gamma = 1.0
epsilon = 0.1

def update_value(v):
    v_old = v.copy()
    for s, state in enumerate(states):
        # v[s] = max_a (r + gamma * v_old[s'])
        v_list = []
        for a, action in enumerate(actions):
            r = rewards[s, a] # what is reward
            s_next = trans[s, a] # what is next state
            if s_next > -1:
                r_gamma = r+gamma*v_old[s_next]
                v_list.append(r_gamma)
        v[s] = max(v_list)
    dv = np.sum(np.abs(v-v_old))
    return v, dv

# define what happens in a single iteration
def epoch(v):
    for _ in range(100):
        v, dv = update_value(v)
        print(dv)
        if dv < epsilon:
            break
    return v

# run the iteration
for iter in range(100):
    print("iter:", iter)
    v = epoch(v)

run_episode()
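Since only v is stored, a deterministic greedy policy can be read off from the converged values when needed, using the same argmax expression as run_episode. A minimal sketch (it reuses trans, rewards, gamma, v and actions from the listing above; greedy_actions is just an illustrative name):

greedy_actions = [
    int(np.argmax([rewards[s, a] + gamma * v[trans[s, a]]
                   if trans[s, a] > -1 else -np.inf
                   for a in range(len(actions))]))
    for s in range(len(states))
]
# print one action letter per cell, laid out like show_v()
print(np.reshape([actions[a] for a in greedy_actions], [size, size]).T)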

 

3. Model-Free Prediction using TD
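The value update here is the standard TD(0) rule,

v(s_t) \leftarrow v(s_t) + \alpha \, [\, r_t + \gamma \, v(s_{t+1}) - v(s_t) \,]

where the bracketed term is the TD error. td_prediction below applies exactly this update along episodes sampled with the uniform random policy, with the two terminal cases handled explicitly.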

from pyamaze import maze, agent
import numpy as np

""" Perform model-free prediction using TD
- Reuse the maze environment (maze.csv) from previous class
- Assume that the environment model is unknown (i.e. no transition matrix)
- Compute value v[s] for the uniform random policy, pi[s,a]=0.25
- Compare this with the true value (true_value.npy)
"""

"""
MazeEnvironment Class:
- reset(): reset to a random state, returns: state_index
- step(a): apply action a, returns: reward, next state, done [done=True if episode terminates]
"""
class MazeEnvironment:
    def __init__(self):
        self.size = size = 5
        self.m = maze(size, size)
        self.m.CreateMaze(loadMaze="maze.csv")
        self.states = list(self.m.maze_map.keys())
        self.actions = ['E', 'N', 'W', 'S']

    def reset(self):
        self.s = np.random.choice(np.arange(0,25))
        self.state = self.states[self.s]
        return self.s

    def step(self, a):
        action = self.actions[a]
        state = self.state
        if self.m.maze_map[state][action] == 0:
            reward = -1
            done= True
            new_state = None
            return reward, new_state, done
        if state==(1,1):
            reward = 0
            done = True
            new_state = 0
            return reward, new_state, done

        x, y = state
        if action == 'E':
            y += 1
        elif action == 'W':
            y -= 1
        elif action == 'N':
            x -= 1
        elif action == 'S':
            x += 1
        self.state = new_state = (x, y)
        self.s = s_new = self.states.index(self.state)
        reward = -1
        done = False
        return reward, self.s, done

""" Initialize the environment """
env = MazeEnvironment()
size = env.size
states = env.states
actions = env.actions

""" Initialize the value and policy """
v = np.zeros(shape=[len(states)]) # shape: [S]
pi = np.ones(shape=[len(states), len(actions)])*0.25 # shape: [S, A]

""" visualizer functions """
def show_v():
    print(np.reshape(v, [size, size]).T)

def show_pi():
    action_arrows = ["\u2192", "\u2191", "\u2190", "\u2193"]
    pi_disp = np.array(['' for _ in range(size*size)], dtype="<U4")
    for s in range(len(states)):
        for a in range(len(actions)):
            if pi[s,a]>0.0:
                pi_disp[s] = pi_disp[s] + action_arrows[a]
            else:
                pi_disp[s] = pi_disp[s] + " "
    print(np.reshape(pi_disp, [size, size]).T)

""" define run_episode function: 
- runs an episode, utilizing env.reset() and env.step functions
- terminates when done=true or total_steps > 100
- returns:
    history of states: S (s_0, ..., s_T+1), # one longer than A or R because it includes the initial state
    history of actions: A (a_0, ..., a_T),
    history of rewards: R (r_0, ..., r_T)
"""
def run_episode():
    s = env.reset() # s0
    S = [s] # history of states
    A = [] # history of actions
    R = [] # history of rewards
    done = False # episode is still running
    
    while not done: # do steps
        a_prob = pi[s, :] # 4 values
        a = np.random.choice([0, 1, 2, 3], p=a_prob) # decide action
        r, s, done = env.step(a)
        
        S.append(s)
        A.append(a)
        R.append(r)
        
        if not done and len(A)>100:
            done = True
    
    return S, A, R



gamma = 0.99

def td_prediction(alpha=0.1, episodes=100):
    for _ in range(episodes):
        S,A,R = run_episode() # gather (S,A,R) from an episode
        T = len(A) # set T = episode_length
        for i in range(T):
            s = S[i] # current_state
            r = R[i] # obtained reward
            s_next = S[i+1] # next_state
            if s_next is None:
                v_new = r
            elif s_next == 0:
                v_new = 0
            else:
                v_new = r+gamma*v[s_next] # the new experienced value (i.e.TD target)
            v[s] = v[s]+alpha*(v_new-v[s]) # update towards this new value

    return v


def epoch(v, epsilon=0.1):
    for _ in range(10000):
        v_old = v.copy()
        v = td_prediction()
        dv = np.mean(np.abs(v_old - v))
        if dv<epsilon:
            break
    return v


show_v()
v = epoch(v)
print("TD-value:")
show_v()

vt = np.load("true_value.npy")
v = vt
print("true-value:")
show_v()
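Beyond comparing the two printed grids by eye, the gap can also be summarized with one number. A minimal sketch, assuming it is placed right after v = epoch(v) and before v is overwritten with the true values (v_td is just an illustrative name):

v_td = v.copy()
vt = np.load("true_value.npy")
print("mean |v_td - v_true| =", np.mean(np.abs(v_td - vt)))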

 

4. Q-Learning
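The control version replaces state values with action values q(s,a) and uses the off-policy Q-learning update,

q(s,a) \leftarrow q(s,a) + \alpha \, [\, r + \gamma \max_{a'} q(s',a') - q(s,a) \,]

while the behaviour is generated by an epsilon-greedy policy over the current q (the epsilon_greedy helper below). The max over a' in the target is what makes it off-policy: it evaluates the greedy policy regardless of which action the epsilon-greedy behaviour actually took.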

from pyamaze import maze, agent
import numpy as np

""" Perform model-free prediction using TD
- Reuse the maze environment (maze.csv) from previous class
- Assume that the environment model is unknown (i.e. no transition matrix)
- Perform Q-learning
"""


"""
MazeEnvironment Class:
- reset(): reset to a random state, returns: state_index
- step(a): apply action a, returns: reward, next state, done [done=True if episode terminates]

"""
class MazeEnvironment:
    def __init__(self):
        self.size = size = 5
        self.m = maze(size, size)
        self.m.CreateMaze(loadMaze="maze.csv")
        self.states = list(self.m.maze_map.keys())
        self.actions = ['E', 'N', 'W', 'S']

    def reset(self):
        self.s = np.random.choice(np.arange(0,25)) # pick one of the 5x5 states at random
        self.state = self.states[self.s]
        return self.s

    def step(self, a):
        action = self.actions[a]
        state = self.state
        if state == (1,1): # goal
            reward = 0
            done = True
            new_state = 0
            return reward, new_state, done
        elif self.m.maze_map[state][action] == 0:
            reward = -10
            done= True
            new_state = None
            return reward, new_state, done

        x, y = state
        if action == 'E':
            y += 1
        elif action == 'W':
            y -= 1
        elif action == 'N':
            x -= 1
        elif action == 'S':
            x += 1
        self.state = new_state = (x, y)
        self.s = s_new = self.states.index(self.state)
        reward = -1 # each step, get -1
        done = False
        return reward, self.s, done


""" Initialize the environment """
env = MazeEnvironment()
size = env.size
states = env.states
actions = env.actions

""" Initialize the value and policy """
q = np.zeros(shape=[len(states), len(actions)]) # shape: [S, A]
pi = np.ones(shape=[len(states), len(actions)])*0.25 # [S,A] // epsilon policy

""" visualizer functions """

def show_pi():
    action_arrows = ["\u2192", "\u2191", "\u2190", "\u2193"]
    pi_disp = np.array(['' for _ in range(size*size)], dtype="<U4")
    for s in range(len(states)):
        for a in range(len(actions)):
            if pi[s,a]>0.0:
                pi_disp[s] = pi_disp[s] + action_arrows[a]
            else:
                pi_disp[s] = pi_disp[s] + " "
    print(np.reshape(pi_disp, [size, size]).T)

""" define run_episode function: 
- runs an episode, utilizing env.reset() and env.step functions
- terminates when done=true or total_steps > 100
- returns:
    history of states: S (s_0, ..., s_T+1), 
    history of actions: A (a_0, ..., a_T),
    history of rewards: R (r_0, ..., r_T)
"""
def run_episode():
    s = env.reset()
    done = False
    S, A, R = [s], [], []
    while not done and len(A) <= 100: # stop when the episode ends or after 100 steps
        a = np.random.choice([0, 1, 2, 3], p=pi[s, :]) # sample the action
        r, s, done = env.step(a) # apply the action
        
        # add to the record
        S.append(s)
        A.append(a)
        R.append(r)
    return S, A, R


def sim_episode():
    S,A,R = run_episode()
    env.m.CreateMaze(loadMaze="maze.csv")
    s = S[0]
    pos = states[s]
    a = agent(env.m, footprints=True, x=pos[0], y=pos[1])
    for s in S:
        if s is None:
            break
        pos = states[s]
        a.position = pos
    env.m.run()


gamma = 0.99

"""
m = number of actions
set pi[s,a] = epsilon/m  for all a
set pi[s,a] = epsilon/m + (1-epsilon)  IF a = argmax_a q(s,a)
"""
def epsilon_greedy(q, epsilon=0.3):
    m = len(actions)
    pi[:, :] = epsilon/m
    for s in range(len(states)):
        a_max = np.argmax(q[s, :])
        pi[s, a_max] += (1-epsilon)
    return pi

""" 
q_new = r + gamma*max_a'(q(s',a'))
q(s,a) = q(s,a) + alpha*(q_new - q(s,a))
"""
def q_learning(alpha=0.1, episodes=100):
    for _ in range(episodes):
        # gather/sample transitions
        S, A, R = run_episode()
        for i in range(len(A)):
            s = S[i]
            a = A[i]
            r = R[i]
            s_next = S[i+1]
            if s_next is None:
                q[s, a] = -100000
            else:
                q_target = q_new = r+gamma*np.max(q[s_next, :])
                q[s, a] = q[s, a]+alpha*(q_new-q[s, a])


dq_min = 0.01


def train():
    global pi
    for _ in range(1000):
        q_old = q.copy()
        q_learning()
        dq = np.mean(np.abs(q_old - q))
        if dq<dq_min:
            break
        pi = epsilon_greedy(q)


show_pi()
train()
pi = epsilon_greedy(q, 0.0)
show_pi()
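sim_episode() is defined above but never called in the listing; with pi already set to the pure greedy policy (epsilon=0.0) just above, it can be used to animate one rollout under the learned policy in the pyamaze window. A minimal sketch:

sim_episode()  # animate a run from a random start, following the greedy policy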