# Reinforcement Learning from Basics to Advanced -- Cases and Practice [7.1]: The Deep Deterministic Policy Gradient (DDPG) and Twin Delayed Deep Deterministic Policy Gradient (TD3) Algorithms, Explained with a Hands-On Project

The project link is at the end of the article; fork it to run it directly.

## 1. Defining the Algorithm

### 1.1 Defining the Model

```python
!pip uninstall -y parl
!pip install parl

import parl
import paddle
import paddle.nn as nn
import paddle.nn.functional as F


class Actor(parl.Model):
    def __init__(self, n_states, n_actions):
        super(Actor, self).__init__()
        self.l1 = nn.Linear(n_states, 400)
        self.l2 = nn.Linear(400, 300)
        self.l3 = nn.Linear(300, n_actions)

    def forward(self, state):
        x = F.relu(self.l1(state))
        x = F.relu(self.l2(x))
        # tanh squashes the action into [-1, 1]; the environment wrapper rescales it later
        return paddle.tanh(self.l3(x))


class Critic(parl.Model):
    def __init__(self, n_states, n_actions):
        super(Critic, self).__init__()
        self.l1 = nn.Linear(n_states, 400)
        self.l2 = nn.Linear(400 + n_actions, 300)
        self.l3 = nn.Linear(300, 1)

    def forward(self, state, action):
        x = F.relu(self.l1(state))
        # concatenate the action with the state features before the second layer
        x = F.relu(self.l2(paddle.concat([x, action], 1)))
        return self.l3(x)


class ActorCritic(parl.Model):
    def __init__(self, n_states, n_actions):
        super(ActorCritic, self).__init__()
        self.actor_model = Actor(n_states, n_actions)
        self.critic_model = Critic(n_states, n_actions)

    def policy(self, state):
        return self.actor_model(state)

    def value(self, state, action):
        return self.critic_model(state, action)

    def get_actor_params(self):
        return self.actor_model.parameters()

    def get_critic_params(self):
        return self.critic_model.parameters()
```

### 1.2 Defining the Replay Buffer

```python
from collections import deque
import random


class ReplayBuffer:
    def __init__(self, capacity: int) -> None:
        self.capacity = capacity
        self.buffer = deque(maxlen=self.capacity)

    def push(self, transitions):
        '''Append one transition tuple (state, action, reward, next_state, done).'''
        self.buffer.append(transitions)

    def sample(self, batch_size: int, sequential: bool = False):
        if batch_size > len(self.buffer):
            batch_size = len(self.buffer)
        if sequential:  # sample a contiguous slice starting at a random index
            rand = random.randint(0, len(self.buffer) - batch_size)
            batch = [self.buffer[i] for i in range(rand, rand + batch_size)]
            return zip(*batch)
        else:  # uniform random sampling
            batch = random.sample(self.buffer, batch_size)
            return zip(*batch)

    def clear(self):
        self.buffer.clear()

    def __len__(self):
        return len(self.buffer)
```

### 1.3 Defining the Agent

```python
import parl
import paddle
import numpy as np


class DDPGAgent(parl.Agent):
    def __init__(self, algorithm, memory, cfg):
        super(DDPGAgent, self).__init__(algorithm)
        self.n_actions = cfg['n_actions']
        self.expl_noise = cfg['expl_noise']
        self.batch_size = cfg['batch_size']
        self.memory = memory
        self.alg.sync_target(decay=0)  # copy the online networks into the target networks

    def sample_action(self, state):
        '''Action used during training: policy output plus Gaussian exploration noise.'''
        action_numpy = self.predict_action(state)
        action_noise = np.random.normal(0, self.expl_noise, size=self.n_actions)
        action = (action_numpy + action_noise).clip(-1, 1)
        return action

    def predict_action(self, state):
        '''Deterministic action used during evaluation.'''
        state = paddle.to_tensor(state.reshape(1, -1), dtype='float32')
        action = self.alg.predict(state)
        action_numpy = action.cpu().numpy()[0]
        return action_numpy

    def update(self):
        if len(self.memory) < self.batch_size:  # wait until there is enough data
            return
        state_batch, action_batch, reward_batch, next_state_batch, done_batch = self.memory.sample(
            self.batch_size)
        done_batch = np.expand_dims(done_batch, -1)
        reward_batch = np.expand_dims(reward_batch, -1)
        state_batch = paddle.to_tensor(state_batch, dtype='float32')
        action_batch = paddle.to_tensor(action_batch, dtype='float32')
        reward_batch = paddle.to_tensor(reward_batch, dtype='float32')
        next_state_batch = paddle.to_tensor(next_state_batch, dtype='float32')
        done_batch = paddle.to_tensor(done_batch, dtype='float32')
        critic_loss, actor_loss = self.alg.learn(state_batch, action_batch, reward_batch,
                                                 next_state_batch, done_batch)
```
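The agent hands the actual gradient computation to `parl.algorithms.DDPG` through `self.alg.learn(...)`. For readers who want to see what that step does conceptually, below is a minimal sketch that assumes the `ActorCritic` model from section 1.1; the helper names `soft_update` and `ddpg_learn_step` (and the example wiring in the trailing comments) are illustrative only and not part of the PARL API, whose internal implementation may differ in detail.

```python
import copy
import paddle
import paddle.nn.functional as F


def soft_update(target, source, tau):
    """Polyak averaging: target <- (1 - tau) * target + tau * source."""
    with paddle.no_grad():
        for t, s in zip(target.parameters(), source.parameters()):
            t.set_value((1.0 - tau) * t + tau * s)


def ddpg_learn_step(model, target_model, actor_opt, critic_opt,
                    state, action, reward, next_state, done,
                    gamma=0.99, tau=0.01):
    # 1) Critic: regress Q(s, a) toward r + gamma * (1 - done) * Q'(s', pi'(s'))
    with paddle.no_grad():
        next_action = target_model.policy(next_state)
        target_q = reward + gamma * (1.0 - done) * target_model.value(next_state, next_action)
    critic_loss = F.mse_loss(model.value(state, action), target_q)
    critic_opt.clear_grad()
    critic_loss.backward()
    critic_opt.step()

    # 2) Actor: maximize Q(s, pi(s)) by minimizing its negative
    actor_loss = -model.value(state, model.policy(state)).mean()
    actor_opt.clear_grad()
    actor_loss.backward()
    actor_opt.step()

    # 3) Let the target networks slowly track the online networks
    soft_update(target_model, model, tau)
    return float(critic_loss), float(actor_loss)


# Example wiring (hypothetical; PARL constructs the equivalent objects internally):
# model = ActorCritic(n_states, n_actions)
# target_model = copy.deepcopy(model)
# actor_opt = paddle.optimizer.Adam(learning_rate=1e-4, parameters=model.get_actor_params())
# critic_opt = paddle.optimizer.Adam(learning_rate=1e-3, parameters=model.get_critic_params())
```

The `sync_target(decay=0)` call in the agent's constructor corresponds to copying the online weights into the target networks once at the start, after which only soft updates controlled by `tau` are applied.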
## 2. Defining Training and Testing

```python
def train(cfg, env, agent):
    ''' Training loop '''
    print("Start training!")
    rewards = []  # record the reward of every episode
    for i_ep in range(cfg['train_eps']):
        ep_reward = 0
        state = env.reset()
        for i_step in range(cfg['max_steps']):
            action = agent.sample_action(state)  # sample an action with exploration noise
            next_state, reward, done, _ = env.step(action)
            agent.memory.push((state, action, reward, next_state, done))
            state = next_state
            agent.update()
            ep_reward += reward
            if done:
                break
        rewards.append(ep_reward)
        if (i_ep + 1) % 10 == 0:
            print(f"Episode: {i_ep+1}/{cfg['train_eps']}, Reward: {ep_reward:.2f}")
    print("Finished training!")
    env.close()
    res_dic = {'episodes': range(len(rewards)), 'rewards': rewards}
    return res_dic


def test(cfg, env, agent):
    print("Start testing!")
    rewards = []  # record the reward of every episode
    for i_ep in range(cfg['test_eps']):
        ep_reward = 0
        state = env.reset()
        for i_step in range(cfg['max_steps']):
            action = agent.predict_action(state)  # deterministic action, no exploration noise
            next_state, reward, done, _ = env.step(action)
            state = next_state
            ep_reward += reward
            if done:
                break
        rewards.append(ep_reward)
        print(f"Episode: {i_ep+1}/{cfg['test_eps']}, Reward: {ep_reward:.2f}")
    print("Finished testing!")
    env.close()
    return {'episodes': range(len(rewards)), 'rewards': rewards}
```

## 3. Defining the Environment

OpenAI Gym ships with plenty of reinforcement-learning environments, which is more than enough for learning purposes, although applied work often requires building your own; the most important parts of any environment interface are `reset` and `step`. In this project we use Gym's `Pendulum-v0` directly and only add a wrapper that rescales the policy's actions from [-1, 1] to the environment's action range.

```python
import gym
import os
import paddle
import numpy as np
import random
from parl.algorithms import DDPG


class NormalizedActions(gym.ActionWrapper):
    ''' Rescale actions from [-1, 1] to the environment's action range. '''
    def action(self, action):
        low_bound = self.action_space.low
        upper_bound = self.action_space.high
        action = low_bound + (action + 1.0) * 0.5 * (upper_bound - low_bound)
        action = np.clip(action, low_bound, upper_bound)
        return action

    def reverse_action(self, action):
        low_bound = self.action_space.low
        upper_bound = self.action_space.high
        action = 2 * (action - low_bound) / (upper_bound - low_bound) - 1
        action = np.clip(action, low_bound, upper_bound)
        return action


def all_seed(env, seed=1):
    ''' Seed every source of randomness. '''
    env.seed(seed)  # environment
    np.random.seed(seed)
    random.seed(seed)
    paddle.seed(seed)


def env_agent_config(cfg):
    env = NormalizedActions(gym.make(cfg['env_name']))  # wrap the env to rescale actions
    if cfg['seed'] != 0:
        all_seed(env, seed=cfg['seed'])
    n_states = env.observation_space.shape[0]
    n_actions = env.action_space.shape[0]
    print(f"State dim: {n_states}, Action dim: {n_actions}")
    cfg.update({"n_states": n_states, "n_actions": n_actions})  # record the dimensions in cfg
    memory = ReplayBuffer(cfg['memory_capacity'])
    model = ActorCritic(n_states, n_actions)
    algorithm = DDPG(model, gamma=cfg['gamma'], tau=cfg['tau'],
                     actor_lr=cfg['actor_lr'], critic_lr=cfg['critic_lr'])
    agent = DDPGAgent(algorithm, memory, cfg)
    return env, agent
```
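To make the wrapper's effect concrete, here is a small standalone check (not part of the notebook) of the rescaling formula, written in plain NumPy. For `Pendulum-v0`, whose torque range is [-2, 2], an input of 0.5 should map to a torque of 1.0.

```python
# Standalone sanity check of the mapping used by NormalizedActions.action().
import numpy as np


def rescale(action, low, high):
    """Map an action from [-1, 1] onto [low, high], then clip to the bounds."""
    action = low + (action + 1.0) * 0.5 * (high - low)
    return np.clip(action, low, high)


low, high = np.array([-2.0]), np.array([2.0])  # Pendulum-v0 torque bounds
for a in (-1.0, 0.0, 0.5, 1.0):
    print(a, "->", rescale(np.array([a]), low, high))
# Expected: -1.0 -> [-2.], 0.0 -> [0.], 0.5 -> [1.], 1.0 -> [2.]
```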
critic") parser.add_argument('--actor_lr',default=1e-4,type=float,help="learning rate of actor") parser.add_argument('--memory_capacity',default=80000,type=int,help="memory capacity") parser.add_argument('--expl_noise',default=0.1,type=float) parser.add_argument('--batch_size',default=128,type=int) parser.add_argument('--target_update',default=2,type=int) parser.add_argument('--tau',default=1e-2,type=float) parser.add_argument('--critic_hidden_dim',default=256,type=int) parser.add_argument('--actor_hidden_dim',default=256,type=int) parser.add_argument('--device',default='cpu',type=str,help="cpu or cuda") parser.add_argument('--seed',default=1,type=int,help="random seed") args = parser.parse_args([]) args = { **vars(args)} # 将args转换为字典 # 打印参数 print("训练参数如下:") print(''.join(['=']*80)) tplt = "{:^20}\t{:^20}\t{:^20}" print(tplt.format("参数名","参数值","参数类型")) for k,v in args.items(): print(tplt.format(k,v,str(type(v)))) print(''.join(['=']*80)) return args def smooth(data, weight=0.9): '''用于平滑曲线,类似于Tensorboard中的smooth Args: data (List):输入数据 weight (Float): 平滑权重,处于0-1之间,数值越高说明越平滑,一般取0.9 Returns: smoothed (List): 平滑后的数据 ''' last = data[0] # First value in the plot (first timestep) smoothed = list() for point in data: smoothed_val = last * weight + (1 - weight) * point # 计算平滑值 smoothed.append(smoothed_val) last = smoothed_val return smoothed def plot_rewards(rewards,cfg,path=None,tag='train'): sns.set() plt.figure() # 创建一个图形实例,方便同时多画几个图 plt.title(f"{tag}ing curve on {cfg['device']} of {cfg['algo_name']} for {cfg['env_name']}") plt.xlabel('epsiodes') plt.plot(rewards, label='rewards') plt.plot(smooth(rewards), label='smoothed') plt.legend() 5、训练 # 获取参数 cfg = get_args() # 训练 env, agent = env_agent_config(cfg) res_dic = train(cfg, env, agent) plot_rewards(res_dic['rewards'], cfg, tag="train") # 测试 res_dic = test(cfg, env, agent) plot_rewards(res_dic['rewards'], cfg, tag="test") # 画出结果 训练参数如下: ================================================================================ 参数名 参数值 参数类型 algo_name DDPG env_name Pendulum-v0 train_eps 200 test_eps 20 max_steps 100000 gamma 0.99 critic_lr 0.001 actor_lr 0.0001 memory_capacity 80000 expl_noise 0.1 batch_size 128 target_update 2 tau 0.01 critic_hidden_dim 256 actor_hidden_dim 256 device cpu seed 1 ================================================================================ 状态维度:3,动作维度:1 开始训练! 回合:10/200,奖励:-922.80 回合:20/200,奖励:-390.80 回合:30/200,奖励:-125.50 回合:40/200,奖励:-822.66 回合:50/200,奖励:-384.92 回合:60/200,奖励:-132.26 回合:70/200,奖励:-240.20 回合:80/200,奖励:-242.37 回合:90/200,奖励:-127.13 回合:100/200,奖励:-365.29 回合:110/200,奖励:-126.27 回合:120/200,奖励:-231.47 回合:130/200,奖励:-1.98 回合:140/200,奖励:-223.84 回合:150/200,奖励:-123.29 回合:160/200,奖励:-362.06 回合:170/200,奖励:-126.93 回合:180/200,奖励:-119.77 回合:190/200,奖励:-114.72 回合:200/200,奖励:-116.01 完成训练! 开始测试! 回合:1/20,奖励:-125.61 回合:2/20,奖励:-0.97 回合:3/20,奖励:-130.02 回合:4/20,奖励:-117.46 回合:5/20,奖励:-128.45 回合:6/20,奖励:-124.48 回合:7/20,奖励:-118.31 回合:8/20,奖励:-127.18 回合:9/20,奖励:-118.09 回合:10/20,奖励:-0.55 回合:11/20,奖励:-117.72 回合:12/20,奖励:-1.08 回合:13/20,奖励:-124.74 回合:14/20,奖励:-133.55 回合:15/20,奖励:-234.81 回合:16/20,奖励:-126.93 回合:17/20,奖励:-128.20 回合:18/20,奖励:-124.76 回合:19/20,奖励:-119.91 回合:20/20,奖励:-287.89 完成测试! # 项目链接fork一下直接运行 https://www.heywhale.com/mw/project/649afec770567260f8b79b26 更多优质内容请关注公号:汀丶人工智能
