其实早在2013年,DeepMind就发表了第一篇用深度学习来解决Q学习问题的论文,只是那个时候DeepMind还不够火。和一般的Q学习不同的是,由于2012年Alex(AlexNet)率先用CNN提取图像中的高层(high-level)语义,DeepMind也采用了CNN来直接对图像进行特征提取,而非传统的手工特征提取。
# -*-coding:utf-8-*-
"""DQN on CartPole-v0, learning directly from rendered screen images.

The state fed to the network is the difference between two consecutive
screen crops centred on the cart. A policy network is trained with the
Huber loss against targets produced by a periodically synchronised target
network. Every tensor is moved with ``.cuda()``, so a CUDA device is
required to run this script.
"""
import gym
import math
import random
import numpy as np
import matplotlib
import matplotlib.pyplot as plt
from collections import namedtuple
from itertools import count
from PIL import Image

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torchvision.transforms as T

env = gym.make('CartPole-v0').unwrapped

# set up matplotlib
is_ipython = 'inline' in matplotlib.get_backend()
if is_ipython:
    from IPython import display

plt.ion()

# if gpu is to be used
# device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Named tuple called "Transition"; its fields are accessed by name,
# much like the keys of a dict.
Transition = namedtuple('Transition',
                        ('state', 'action', 'next_state', 'reward'))


class ReplayMemory(object):
    """Fixed-capacity ring buffer of ``Transition`` tuples."""

    def __init__(self, capacity):
        self.capacity = capacity
        self.memory = []
        self.position = 0  # index of the slot the next push() writes to

    def push(self, *args):
        """Saves a transition, overwriting the oldest one once full."""
        if len(self.memory) < self.capacity:
            # Grow the list until it reaches capacity.
            self.memory.append(None)
        self.memory[self.position] = Transition(*args)
        # Wrap around so old entries are overwritten in insertion order.
        self.position = (self.position + 1) % self.capacity

    def sample(self, batch_size):
        """Return ``batch_size`` transitions drawn without replacement."""
        return random.sample(self.memory, batch_size)

    def __len__(self):
        # Lets callers use len(memory).
        return len(self.memory)


class DQN(nn.Module):
    """Three conv+batch-norm layers followed by a linear head with 2 outputs."""

    def __init__(self):
        super(DQN, self).__init__()
        self.conv1 = nn.Conv2d(3, 16, kernel_size=5, stride=2)
        self.bn1 = nn.BatchNorm2d(16)
        self.conv2 = nn.Conv2d(16, 32, kernel_size=5, stride=2)
        self.bn2 = nn.BatchNorm2d(32)
        self.conv3 = nn.Conv2d(32, 32, kernel_size=5, stride=2)
        self.bn3 = nn.BatchNorm2d(32)
        # 448 = 32 * 2 * 7, the flattened conv output for a 3x40x80 input
        # (assumes the gym render is 600 px wide and at least 320 px tall
        # -- TODO confirm against the installed gym version).
        self.head = nn.Linear(448, 2)

    def forward(self, x):
        """Return one value per action (2 columns) for each batch element."""
        x = F.relu(self.bn1(self.conv1(x)))
        x = F.relu(self.bn2(self.conv2(x)))
        x = F.relu(self.bn3(self.conv3(x)))
        # Flatten everything except the batch dimension.
        return self.head(x.view(x.size(0), -1))


resize = T.Compose([T.ToPILImage(),
                    T.Resize(40, interpolation=Image.CUBIC),
                    T.ToTensor()])

# This is based on the code from gym.
screen_width = 600


def get_cart_location():
    """Return the horizontal pixel position of the middle of the cart."""
    world_width = env.x_threshold * 2
    scale = screen_width / world_width
    return int(env.state[0] * scale + screen_width / 2.0)  # MIDDLE OF CART


def get_screen():
    """Render the environment and return a cropped, resized CUDA tensor.

    The returned tensor has a leading batch dimension (BCHW).
    """
    screen = env.render(mode='rgb_array').transpose(
        (2, 0, 1))  # transpose into torch order (CHW)
    # Strip off the top and bottom of the screen
    screen = screen[:, 160:320]
    view_width = 320
    cart_location = get_cart_location()
    if cart_location < view_width // 2:
        # Cart is near the left edge: take the leftmost window.
        slice_range = slice(view_width)
    elif cart_location > (screen_width - view_width // 2):
        # Cart is near the right edge: take the rightmost window.
        slice_range = slice(-view_width, None)
    else:
        slice_range = slice(cart_location - view_width // 2,
                            cart_location + view_width // 2)
    # Strip off the edges, so that we have an image centered on the cart
    screen = screen[:, :, slice_range]
    # Convert to float, rescale, convert to torch tensor
    # (this doesn't require a copy)
    screen = np.ascontiguousarray(screen, dtype=np.float32) / 255
    screen = torch.from_numpy(screen)
    # Resize, and add a batch dimension (BCHW)
    return resize(screen).unsqueeze(0).cuda()


env.reset()
# Optional preview of one extracted screen:
# plt.figure()
# plt.imshow(get_screen().cpu().squeeze(0).permute(1, 2, 0).numpy(),
#            interpolation='none')
# plt.title('Example extracted screen')
# plt.show()

BATCH_SIZE = 128
GAMMA = 0.999
EPS_START = 0.9
EPS_END = 0.05
EPS_DECAY = 200
TARGET_UPDATE = 10

policy_net = DQN().cuda()
target_net = DQN().cuda()
target_net.load_state_dict(policy_net.state_dict())
target_net.eval()

optimizer = optim.RMSprop(policy_net.parameters())
memory = ReplayMemory(10000)

steps_done = 0


def select_action(state):
    """Pick an action epsilon-greedily; returns a 1x1 long tensor.

    The exploration threshold decays exponentially from EPS_START towards
    EPS_END as ``steps_done`` grows.
    """
    global steps_done
    sample = random.random()
    eps_threshold = EPS_END + (EPS_START - EPS_END) * \
        math.exp(-1. * steps_done / EPS_DECAY)
    steps_done += 1
    if sample > eps_threshold:
        with torch.no_grad():
            # Exploit: index of the largest output of the policy network.
            return policy_net(state).max(1)[1].view(1, 1)
    else:
        # Explore: pick one of the two actions uniformly at random.
        return torch.tensor([[random.randrange(2)]], dtype=torch.long).cuda()


episode_durations = []


def plot_durations():
    """Plot episode durations and, from 100 episodes on, their running mean."""
    plt.figure(2)
    plt.clf()
    durations_t = torch.tensor(episode_durations, dtype=torch.float)
    plt.title('Training...')
    plt.xlabel('Episode')
    plt.ylabel('Duration')
    plt.plot(durations_t.numpy())
    # Take 100 episode averages and plot them too
    if len(durations_t) >= 100:
        means = durations_t.unfold(0, 100, 1).mean(1).view(-1)
        # Pad with zeros so the curve lines up with the episode axis.
        means = torch.cat((torch.zeros(99), means))
        plt.plot(means.numpy())

    plt.pause(0.001)  # pause a bit so that plots are updated
    if is_ipython:
        display.clear_output(wait=True)
        display.display(plt.gcf())


def optimize_model():
    """Run one gradient step on ``policy_net`` from a sampled replay batch.

    Does nothing until the replay memory holds at least BATCH_SIZE
    transitions.
    """
    if len(memory) < BATCH_SIZE:
        return
    # Sampled at random, so the order the transitions were stored in
    # does not matter.
    transitions = memory.sample(BATCH_SIZE)
    # Transpose the batch (see http://stackoverflow.com/a/19343/3343043 for
    # detailed explanation).
    batch = Transition(*zip(*transitions))

    # Compute a mask of non-final states and concatenate the batch elements.
    # The lambda yields one flag per transition: next_state is not None.
    # NOTE(review): newer PyTorch releases prefer torch.bool masks for
    # indexing -- confirm against the installed version.
    non_final_mask = torch.tensor(tuple(map(lambda s: s is not None,
                                            batch.next_state)),
                                  dtype=torch.uint8).cuda()
    # None entries are skipped, so this may hold fewer than BATCH_SIZE rows.
    non_final_next_states = torch.cat([s for s in batch.next_state
                                       if s is not None])
    state_batch = torch.cat(batch.state)
    action_batch = torch.cat(batch.action)
    reward_batch = torch.cat(batch.reward)

    # Compute Q(s_t, a) - the model computes Q(s_t), then we select the
    # columns of actions taken. gather picks, along dim 1, the entry at the
    # index given by each action; this is the Q(s, a) term of the loss.
    state_action_values = policy_net(state_batch).gather(1, action_batch)

    # Compute V(s_{t+1}) for all next states.
    next_state_values = torch.zeros(BATCH_SIZE).cuda()
    # Only positions where the mask is set are assigned; final states stay 0.
    next_state_values[non_final_mask] = \
        target_net(non_final_next_states).max(1)[0].detach()
    # Compute the expected Q values
    expected_state_action_values = (next_state_values * GAMMA) + reward_batch

    # Compute Huber loss
    loss = F.smooth_l1_loss(state_action_values,
                            expected_state_action_values.unsqueeze(1))

    # compare the parameters of 2 networks
    print(policy_net.state_dict()['head.bias'])
    print("!@#$%^&*")
    print(target_net.state_dict()['head.bias'])

    # Optimize the model
    optimizer.zero_grad()
    loss.backward()
    for param in policy_net.parameters():
        # Clamp each gradient element to [-1, 1] before the update.
        param.grad.data.clamp_(-1, 1)
    optimizer.step()


num_episodes = 50
for i_episode in range(num_episodes):
    # Initialize the environment and state
    env.reset()
    last_screen = get_screen()
    current_screen = get_screen()  # a single image, not a batch of images
    # The state is the difference between two consecutive frames.
    state = current_screen - last_screen
    for t in count():  # endless iterator; t keeps increasing until break
        # Select and perform an action
        action = select_action(state)
        # done tells whether the episode has ended; the reward is decided
        # by gym; stepping with the action advances the environment.
        _, reward, done, _ = env.step(action.item())
        reward = torch.tensor([reward]).cuda()

        # Observe new state
        last_screen = current_screen
        current_screen = get_screen()
        if not done:
            next_state = current_screen - last_screen
        else:
            next_state = None

        # Store the transition (state, action, next_state, reward) in memory
        memory.push(state, action, next_state, reward)

        # Move to the next state
        state = next_state

        # Perform one step of the optimization (on the policy network)
        optimize_model()
        if done:
            episode_durations.append(t + 1)
            plot_durations()
            break
    # Update the target network: it is only synchronised with the policy
    # network every TARGET_UPDATE episodes.
    if i_episode % TARGET_UPDATE == 0:
        target_net.load_state_dict(policy_net.state_dict())

print('Complete')
env.render()
env.close()
plt.ioff()
plt.show()
env.close()
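loss如上,实际上就是求取两个Q函数之间的差值:$\delta = Q(s, a) - \bigl(r + \gamma \max_{a'} Q(s', a')\bigr)$,代码中对这个差值使用Huber loss(smooth L1)。前一项$Q(s, a)$的自变量描述的是当前的状态$s$以及对应的行为$a$;后一项$r + \gamma \max_{a'} Q(s', a')$描述的是当前的reward $r$,加上在下一个state $s'$采取能够让Q最大的下一步行动时所得到的、乘以折扣因子$\gamma$的Q值。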
优化的目标是policy net;target网络则是定期对policy net参数的copy,如下:
policy net输入state batch,输出每个action对应的Q值(形状为batch_size×2),再用gather取出实际执行的action所对应的那一列;action非0即1,所以最终得到的是一个batch_size×1的列向量。
