新闻中心

值分布强化学习 —— C51

2025-07-21
浏览次数:
返回列表
值分布强化学习是基于价值的强化学习算法,不同于传统方法仅建模累积回报期望值,它对整个分布Z(s,a)建模以保留分布信息。C51是其代表算法,将分布离散为51个支点,输出支点概率,通过投影贝尔曼更新处理分布范围问题,损失函数用KL散度,框架与DQN类似但输出和更新方式不同。

☞☞☞AI 智能聊天, 问答助手, AI 智能搜索, 免费无限量使用 DeepSeek R1 模型☜☜☜

值分布强化学习 —— c51 -

值分布强化学习简介

首先需要声明的是,值分布强化学习(Distributional Reinforcement Learning)是一类基于价值的强化学习算法(value-based Reinforcement Learning)

经典的基于价值的强化学习方法使用期望值对累积回报进行建模,表示为价值函数 V(s) 或动作价值函数 Q(s,a)

而在这个建模过程中,完整的分布信息在很大程度上被丢失了

提出值分布强化学习就是想要解决分布信息丢失这个问题,对累积回报随机变量的整个分布 Z(s,a) 进行建模,而非只建模其期望

值分布强化学习 —— C51 -        

如果用公式表示:

Q(st,at)=EZ(st,at)=E[i=1γt+iR(st+i,at+i)]Q(st,at)=EZ(st,at)=E[i=1∑∞γt+iR(st+i,at+i)]

值分布强化学习——C51算法

C51 简介

C51 算法来自 DeepMind 的 A Distributional Perspective on Reinforcement Learning 一文。在这篇文章中,作者首先说明传统 DQN 算法希望学习的 Q 是一个数值,其含义是未来奖励和的期望。而在值分布强化学习系列算法中,目标则由数值变为一个分布。在值分布强化学习中,目标也由数值 Q 变为随机变量 Z,这种改变可以使学到的内容是除了数值以外的更多信息,即整个分布。而模型返回的损失也转变为两个分布之间相似度的度量(metric)。

值分布强化学习 —— C51 -        

算法关键

参数化分布

简而言之,若分布取值范围为VminVmin到VmaxVmax,并均分为离散的N个点,每个等分支集为

{zi=Vmin+iΔz:0iN,Δz=VmaxVminN1}{zi=Vmin+iΔz:0≤i

模型输出的每个值对应取当前支点的概率

投影贝尔曼更新

通过上面的经过值分布贝尔曼操作符的作用后,新的随机变量的取值范围可能会超出第一个支点中离散化的支集的范围,如下图所示。

多多校园交易网 多多校园交易网

v2.2 修改相关字眼,加强搜索功能,重写找回密码功能,减少文件,增加学院功能,补给相关页面,修改相关表单字段名,更新图片新闻显示功能,修正租房搜索,增加BLOG,BBS文件夹,并修改频道设置和导航布局,去除相关ID扫描漏洞·全站设计考虑校园电子商务模式,人性化的设计,独特的校园式网络交易平台。 ·功能十分强大的后台管理界面,通过IE浏览器即可管理整个网

多多校园交易网 0 查看详情 多多校园交易网

值分布强化学习 —— C51 -        

因此我们必须将贝尔曼操作符更新后的随机变量投影到离散化的支集上,即论文提到的投影贝尔曼更新。

值分布强化学习 —— C51 -        

简单来说,投影过程就是将更新后随机变量的值分配到与其相邻的支点上

小结

C51算法与DQN相同点

  1. C51算法的框架依然是DQN算法
  2. 采样过程依然使用ϵgreedyϵ−greedy策略,这里贪婪是取期望贪婪
  3. 采用单独的目标网络

C51算法与DQN不同点

  1. C51算法的卷积神经网络的输出不再是行为值函数,而是支点处的概率。

  2. C51算法的损失函数不再是均方差和而是如上所述的KL散度

最后一个问题,该算法为什么叫C51呢?

这是因为在论文中,作者将随机变量的取值分成了51个支点类。

代码部分

导入依赖

In [1]
from typing import Dict, List, Tupleimport gymfrom visualdl import LogWriterfrom tqdm import tqdm,trangeimport numpy as npimport paddleimport paddle.nn as nnimport paddle.nn.functional as Fimport paddle.optimizer as optimizer
   

需要用到的特殊算子

In [2]
def index_add_(parent, axis, idx, child):
        expend_dim = parent.shape[0]
        idx_one_hot = F.one_hot(idx.cast("int64"), expend_dim)
        child = paddle.expand_as(child.cast("float32").unsqueeze(-1), idx_one_hot)
        output = parent + (idx_one_hot.cast("float32").multiply(child)).sum(axis).squeeze()        return output
   

经验回放

In [3]
class ReplayBuffer:
    def __init__(self, obs_dim: int, size: int, batch_size: int = 32):
        self.obs_buf = np.zeros([size, obs_dim], dtype=np.float32)
        self.next_obs_buf = np.zeros([size, obs_dim], dtype=np.float32)
        self.acts_buf = np.zeros([size], dtype=np.float32)
        self.rews_buf = np.zeros([size], dtype=np.float32)
        self.done_buf = np.zeros([size], dtype=np.float32)
        self.max_size, self.batch_size = size, batch_size
        self.ptr, self.size, = 0, 0

    def store(
        self, 
        obs: np.ndarray, 
        act: np.ndarray, 
        rew: float, 
        next_obs: np.ndarray, 
        done: bool,    ):
        self.obs_buf[self.ptr] = obs
        self.next_obs_buf[self.ptr] = next_obs
        self.acts_buf[self.ptr] = act
        self.rews_buf[self.ptr] = rew
        self.done_buf[self.ptr] = done
        self.ptr = (self.ptr + 1) % self.max_size
        self.size = min(self.size + 1, self.max_size)    def sample_batch(self):
        idxs = np.random.choice(self.size, size=self.batch_size, replace=False)        return dict(obs=self.obs_buf[idxs],
                    next_obs=self.next_obs_buf[idxs],
                    acts=self.acts_buf[idxs],
                    rews=self.rews_buf[idxs],
                    done=self.done_buf[idxs])    def __len__(self):
        return self.size
   

定义网络结构

虽然我们的DQN可以学到游戏的整个分布,但在 CartPole-v0 这一环境中,我们还是选取期望作为决策的依据

与传统的 DQN 相比,我们的 C51 会在更新方式上有所不同。

In [4]
class C51DQN(nn.Layer):
    def __init__(
        self, 
        in_dim: int, 
        out_dim: int, 
        atom_size: int, 
        support    ):
        # 初始化
        super(C51DQN, self).__init__()

        self.support = support
        self.out_dim = out_dim
        self.atom_size = atom_size
        
        self.layers = nn.Sequential(
            nn.Linear(in_dim, 128), 
            nn.ReLU(),
            nn.Linear(128, 128), 
            nn.ReLU(), 
            nn.Linear(128, out_dim * atom_size)
        )    def forward(self, x):
        dist = self.dist(x)
        q = paddle.sum(dist * self.support, axis=2)        return q    
    def dist(self, x):
        q_atoms = self.layers(x).reshape([-1, self.out_dim, self.atom_size])
        dist = F.softmax(q_atoms, axis=-1)
        dist = dist.clip(min=float(1e-3))  # 避免 nan
        return dist
   

Agent 中涉及到的参数设定

Attribute:env: gym 环境memory: 经验回放池的容量batch_size: 训练批次epsilon: 随机探索参数epsilonepsilon_decay: 随机探索参数epsilon衰减的步长max_epsilon: 随机探索参数epsilon上限min_epsilon: 随机探索参数epsilon下限target_update: 目标网络更新频率gamma: 衰减因子dqn: 训练模型dqn_target: 目标模型optimizer: 优化器transition: 转移向量,包含状态值state, 动作值action, 奖励reward, 次态next_state, (判断是否)结束回合donev_min: 离散支集的上限v_max: 离散支集的下限atom_size: 支点数量support: 支集模板(用于计算分布的向量模板)
    In [5]
class C51Agent:
    def __init__(
        self, 
        env: gym.Env,
        memory_size: int,
        batch_size: int,
        target_update: int,
        epsilon_decay: float,
        max_epsilon: float = 1.0,
        min_epsilon: float = 0.1,
        gamma: float = 0.99,        # C51 算法的参数
        v_min: float = 0.0,
        v_max: float = 200.0,
        atom_size: int = 51,
        log_dir: str = "./log"
    ):
        obs_dim = env.observation_space.shape[0]
        action_dim = env.action_space.n
        
        self.env = env
        self.memory = ReplayBuffer(obs_dim, memory_size, batch_size)
        self.batch_size = batch_size
        self.epsilon = max_epsilon
        self.epsilon_decay = epsilon_decay
        self.max_epsilon = max_epsilon
        self.min_epsilon = min_epsilon
        self.target_update = target_update
        self.gamma = gamma
    
        self.v_min = v_min
        self.v_max = v_max
        self.atom_size = atom_size
        self.support = paddle.linspace(
            self.v_min, self.v_max, self.atom_size
        )        # 定义网络
        self.dqn = C51DQN(
            obs_dim, action_dim, atom_size, self.support
        )
        self.dqn_target = C51DQN(
            obs_dim, action_dim, atom_size, self.support
        )
        self.dqn_target.load_dict(self.dqn.state_dict())
        self.dqn_target.eval()
        

        self.optimizer = optimizer.Adam(parameters=self.dqn.parameters())

        self.transition = []
        
        self.is_test = False
        
        self.log_dir = log_dir

        self.log_writer = LogWriter(logdir = self.log_dir, comment= "Categorical DQN")    def select_action(self, state: np.ndarray):
        if self.epsilon > np.random.random():
            selected_action = self.env.action_space.sample()        else:
            selected_action = self.dqn(
                paddle.to_tensor(state,dtype="float32"),
            ).argmax()
            selected_action = selected_action.detach().numpy()        
        if not self.is_test:
            self.transition = [state, selected_action]        
        return selected_action    def step(self, action: np.ndarray):
        next_state, reward, done, _ = self.env.step(int(action))        if not self.is_test:
            self.transition += [reward, next_state, done]
            self.memory.store(*self.transition)    
        return next_state, reward, done    def update_model(self):
        samples = self.memory.sample_batch()

        loss = self._compute_dqn_loss(samples)

        self.optimizer.clear_grad()
        loss.backward()
        self.optimizer.step()
        loss_show = loss        return loss_show.numpy().item()        
    def train(self, num_frames: int, plotting_interval: int = 200):
        self.is_test = False
        
        state = self.env.reset()
        update_cnt = 0
        epsilons = []
        losses = []
        scores = []
        score = 0
        epsilon = 0

        for frame_idx in trange(1, num_frames + 1):
            action = self.select_action(state)
            next_state, reward, done = self.step(action)

            state = next_state
            score += reward            # 回合结束
            if done:
                epsilon += 1
                state = self.env.reset()
                self.log_writer.add_scalar("Reward", value=paddle.to_tensor(score), step=epsilon)
                scores.append(score)
                score = 0

            if len(self.memory) >= self.batch_size:
                loss = self.update_model()
                self.log_writer.add_scalar("Loss", value=paddle.to_tensor(loss), step=frame_idx)
                losses.append(loss)
                update_cnt += 1
                
                self.epsilon = max(
                    self.min_epsilon, self.epsilon - (
                        self.max_epsilon - self.min_epsilon
                    ) * self.epsilon_decay
                )
                epsilons.append(self.epsilon)                if update_cnt % self.target_update == 0:
                    self._target_hard_update()
             
        self.env.close()                
    def test(self):
        self.is_test = True
        
        state = self.env.reset()
        done = False
        score = 0
        
        frames = []        while not done:
            frames.append(self.env.render(mode="rgb_array"))
            action = self.select_action(state)
            next_state, reward, done = self.step(int(action))

            state = next_state
            score += reward        
        print("score: ", score)
        self.env.close()        
        return frames    def _compute_dqn_loss(self, samples: Dict[str, np.ndarray]):
        # 计算损失
        state = paddle.to_tensor(samples["obs"],dtype="float32")
        next_state = paddle.to_tensor(samples["next_obs"],dtype="float32")
        action = paddle.to_tensor(samples["acts"],dtype="int64")
        reward = paddle.to_tensor(samples["rews"].reshape([-1, 1]),dtype="float32")
        done = paddle.to_tensor(samples["done"].reshape([-1, 1]),dtype="float32")

        delta_z = float(self.v_max - self.v_min) / (self.atom_size - 1)        with paddle.no_grad():
            next_action = self.dqn_target(next_state).argmax(1)
            next_dist = self.dqn_target.dist(next_state)
            next_dist = next_dist[:self.batch_size,]
            _next_dist = paddle.gather(next_dist, next_action, axis=1)           
            eyes = np.eye(_next_dist.shape[0], _next_dist.shape[1]).astype("float32")
            eyes = np.repeat(eyes, _next_dist.shape[-1]).reshape(-1,_next_dist.shape[1],_next_dist.shape[-1])
            eyes = paddle.to_tensor(eyes)

            next_dist = _next_dist.multiply(eyes).sum(1)

            t_z = reward + (1 - done) * self.gamma * self.support
            t_z = t_z.clip(min=self.v_min, max=self.v_max)
            b = (t_z - self.v_min) / delta_z
            l = b.floor().cast("int64")
            u = b.ceil().cast("int64")

            offset = (
                paddle.linspace(                    0, (self.batch_size - 1) * self.atom_size, self.batch_size
                ).cast("int64")
                .unsqueeze(1)
                .expand([self.batch_size, self.atom_size])
            )

            proj_dist = paddle.zeros(next_dist.shape)
            proj_dist = index_add_(
                proj_dist.reshape([-1]),                0, 
                (l + offset).reshape([-1]), 
                (next_dist * (u.cast("float32") - b)).reshape([-1])
            )
            proj_dist = index_add_(
                proj_dist.reshape([-1]),                0, 
                (u + offset).reshape([-1]), 
                (next_dist * (b - l.cast("float32"))).reshape([-1])
            )
            proj_dist = proj_dist.reshape(next_dist.shape)

        dist = self.dqn.dist(state)
        _dist = paddle.gather(dist[:self.batch_size,], action, axis=1)
        eyes = np.eye(_dist.shape[0], _dist.shape[1]).astype("float32")
        eyes = np.repeat(eyes, _dist.shape[-1]).reshape(-1,_dist.shape[1],_dist.shape[-1])
        eyes = paddle.to_tensor(eyes)
        dist_batch = _dist.multiply(eyes).sum(1)
        log_p = paddle.log(dist_batch)

        loss = -(proj_dist * log_p).sum(1).mean()        return loss    def _target_hard_update(self):
        # 更新目标模型参数
        self.dqn_target.load_dict(self.dqn.state_dict())
   

定义环境

In [6]
env_id = "CartPole-v0"env = gym.make(env_id)
   

设定随机种子

In [7]
seed = 777np.random.seed(seed)
paddle.seed(seed)
env.seed(seed)
       
[777]
               

超参数设定与训练

In [ ]
num_frames = 20000memory_size = 1000batch_size = 32target_update = 200epsilon_decay = 1 / 2000# 训练agent = C51Agent(env, memory_size, batch_size, target_update, epsilon_decay)
agent.train(num_frames)
       
  0%|          | 32/20000 [00:00<01:38, 202.97it/s]
       

训练结果展示(使用VisualDL查看log文件夹)

值分布强化学习 —— C51 - 值分布强化学习 —— C51 -        

以上就是值分布强化学习 —— C51的详细内容,更多请关注其它相关文章!


# 科大  # 铁岭企业seo方法优化  # 河北seo网站优化电话  # seo原创文章格式  # 徐州网站建设与规划  # 佛山定制网站建设机构  # 抖音seo视频优化推广  # 保险行业营销推广途径  # 巨人导航seo综合查询  # 东莞网站建设哪家有优势  # 台州seo网站推广费用  # 的是  # ai  # 戛纳  # 开源  # 首款  # 系列产品  # 而在  # 交易网  # 贝尔  # 中文网  # type  # writer  # 为什么 


相关栏目: 【 行业资讯67740 】 【 技术百科0 】 【 网络运营39195


相关推荐: 税负是什么意思  苹果电脑如何输入命令  苹果16系统有哪些功能  春运什么时候开始抢票  单片机加热片怎么制作  选哪个折叠屏手机好用  如何体验苹果16系统  如何在昇腾Ascend 910B上运行Qwen2.5教程  npm如何声明命令  市盈率负值是什么意思  市盈率亏损是什么意思  typescript怎么理解的  域名解析后为什么要进行域名备案  市盈率tt的扣非是什么意思  爱奇艺vip会员可以同时几个人用?  linux如何使用db2命令  苹果16关闭哪些功能好  typescript多久能学完  破太岁是什么意思  linux如何调出命令行  什么是typescript  输入命令如何换行  typescript怎么写多个构造方法  苹果16系统网站有哪些  typescript中如何引入本地js  华为的nfc功能是什么意思  路由器上的power按钮是什么意思  typescript能开发什么  固态硬盘2m如何修复  固态硬盘如何测试好坏  固态硬盘坏了如何换硬盘  怎么在爱奇艺中投屏到电视最新方法  如何使用net命令  征信不好如何快速恢复 征信不好快速恢复的方法  如何去掉拍电脑的纹路详细教程  春运抢票软件哪个好  苹果16更新了哪些软件  j*a里数组怎么赋值  power在坐标轴中是什么意思  如何查看固态硬盘速度  苹果16系统有哪些改变  苹果16哪些型号好  哪些编程软件需用typescript  单片机是怎么计时的  电脑如何查看固态硬盘  联想手机如何输入命令行  苹果16适合哪些机升级  shell如何注释所有命令  企业征信不好如何恢复 企业征信不好怎么恢复步骤  固态硬盘如何区分好坏 

搜索