```python
# prompts_completions has been padded, shaped like (x = padding, o = real tokens):
#        prompts      |    responses
# x x x x x x o o o o|o o o o x x x x
# x x x o o o o o o o|o o o o o x x x
# x x x x x x o o o o|o o o x x x x x
# the corresponding attention_mask:
# 0 0 0 0 0 0 1 1 1 1|1 1 1 1 0 0 0 0
# 0 0 0 1 1 1 1 1 1 1|1 1 1 1 1 0 0 0
# 0 0 0 0 0 0 1 1 1 1|1 1 1 0 0 0 0 0
```
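Such a mask can be derived directly from the padded token ids. A minimal sketch (the `pad_token_id` and the tiny `input_ids` tensor below are hypothetical, for illustration only):

```python
import torch

pad_token_id = 0  # hypothetical pad id for this illustration

# two rows, each a left-padded prompt followed by a right-padded response
input_ids = torch.tensor([
    [0, 0, 3, 5, 7, 9, 4, 0],
    [0, 4, 6, 8, 2, 5, 0, 0],
])

# 1 for real tokens, 0 for padding -- matching the layout sketched above
attention_mask = (input_ids != pad_token_id).long()
```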
```python
import torch
from tensordict import TensorDict

import ray
from verl import DataProto
from verl.single_controller import Worker
from verl.single_controller.base.decorator import Dispatch, Execute, register
from verl.single_controller.ray import RayClassWithInitArgs, RayResourcePool, RayWorkerGroup
from verl.utils.ray_utils import parallel_put
```
```python
@register(dispatch_mode=Dispatch.DP_COMPUTE, execute_mode=Execute.ALL, blocking=False)
def dummy_compute(self, data):
    for key in data.batch.keys():
        data.batch[key] += self.rank
    return data
```
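To see how such a decorated method is driven from the controller side, here is a minimal usage sketch (assumptions: a `GPUWorker` class subclassing `Worker` that defines `dummy_compute` as above, and 4 available GPUs; this is illustrative, not verbatim verl test code):

```python
ray.init()

# one node with 4 GPU workers
resource_pool = RayResourcePool(process_on_nodes=[4], use_gpu=True)
cls_with_init = RayClassWithInitArgs(cls=ray.remote(GPUWorker))
worker_group = RayWorkerGroup(resource_pool, cls_with_init)

# DP_COMPUTE expects one input per worker; blocking=False returns futures
data_list = [DataProto.from_dict({"x": torch.zeros(2, dtype=torch.long)}) for _ in range(4)]
futures = worker_group.dummy_compute(data_list)
outputs = ray.get(futures)  # worker i has added its rank i to its chunk
```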
```python
# verl/protocol.py
@dataclass
class DataProto:
    """
    A DataProto is a data structure that aims to provide a standard protocol for data exchange
    between functions. It contains a batch (TensorDict) and a meta_info (Dict). The batch is a
    TensorDict https://pytorch.org/tensordict/. TensorDict allows you to manipulate a dictionary
    of Tensors like a single Tensor. Ideally, the tensors with the same batch size should be put
    inside batch.
    """
```
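A quick illustration of that contract (a hedged sketch; the field values are made up):

```python
import torch
from verl import DataProto

data = DataProto.from_dict(
    tensors={
        "input_ids": torch.randint(0, 100, (8, 16)),
        "attention_mask": torch.ones(8, 16, dtype=torch.long),
    },
    meta_info={"pad_token_id": 0},  # hypothetical metadata carried alongside the batch
)

chunks = data.chunk(4)  # split along the shared batch dimension, e.g. one chunk per DP rank
```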
```python
# verl/single_controller/ray/base.py
class RayClassWithInitArgs(ClassWithInitArgs):
    ...

    def __call__(
        self,
        placement_group,
        placement_group_bundle_idx,
        use_gpu: bool = True,
        num_gpus=1,
        sharing_with=None,
        device_name="cuda",
    ) -> Any:
        """Create and return a Ray actor with the configured options.

        Args:
            placement_group: Ray placement group for scheduling
            placement_group_bundle_idx: Index of the bundle in the placement group
            use_gpu: Whether to use GPU resources
            num_gpus: Number of GPUs to allocate
            sharing_with: Actor to share resources with
            device_name: Device for training

        Returns:
            A Ray actor handle with the configured options
        """
        ...
        # resource allocation and initialization of the Ray actor
        return self.cls.options(**options).remote(*self.args, **self.kwargs)
```
GPUWorker is a Ray actor class, but constructing a RayClassWithInitArgs instance as above does not immediately instantiate the Ray actor (i.e. GPUWorker). The actor is only created when the RayClassWithInitArgs instance itself is called; at that point, the call-time arguments can bind the GPUWorker (Ray actor) to specific CPU and GPU resources. A code example follows:
```python
# per node: loop at the node level
for pg_idx, pg in enumerate(sort_placement_group_by_node_ip(pgs)):
    assert local_world_size <= pg.bundle_count, f"when generating for {self.name_prefix}, for the "
    # per device: loop at the GPU level
    for local_rank in range(local_world_size):
        rank += 1
```
```python
        # we pass in environment variable at option so that Worker can use environment variable to set
        # ---------------------------
        # environment variables for the Ray actor; some of them later serve as
        # initialization parameters for the distributed process group
        # (torch.distributed.init_process_group)
        # ---------------------------
        env_vars = {
            "WORLD_SIZE": str(world_size),
            "RANK": str(rank),
            "WG_PREFIX": self.name_prefix,
            "WG_BACKEND": "ray",
            "RAY_LOCAL_WORLD_SIZE": str(local_world_size),
            "RAY_LOCAL_RANK": str(local_rank),
        }
        if rank != 0:
            env_vars["MASTER_ADDR"] = self._master_addr
            env_vars["MASTER_PORT"] = self._master_port
```
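On the worker side, these variables can then feed torch.distributed directly. A minimal sketch of typical usage (an assumption for illustration, not verl's exact worker code):

```python
import os

import torch.distributed as dist

def init_worker_process_group():
    # MASTER_ADDR / MASTER_PORT are read implicitly via the default env:// init method;
    # the backend choice (nccl here) is an assumption for GPU workers
    dist.init_process_group(
        backend="nccl",
        rank=int(os.environ["RANK"]),
        world_size=int(os.environ["WORLD_SIZE"]),
    )
```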
import re
```python
cia_name = type(ray_cls_with_init.cls).__name__
match = re.search(r"ActorClass\(([^)]+)\)", cia_name)  # ray.remote(Obj) -> "ActorClass(Obj)"
cia_name = match.group(1) if match else cia_name  # "ActorClass(Obj)" -> "Obj"
name = f"{self.name_prefix}{cia_name}_{pg_idx}:{local_rank}"  # e.g. Worker_2:5
```
```python
# verl/single_controller/base/worker_group.py
class WorkerGroup:
    ...

    def _bind_worker_method(self, user_defined_cls, func_generator):
        method_names = []
        for method_name in dir(user_defined_cls):
            try:
                method = getattr(user_defined_cls, method_name)
                assert callable(method), f"{method_name} in {user_defined_cls} is not callable"
            except Exception:
                # if it is a property, it will fail because Class doesn't have instance property
                continue
```
```python
            # --------------------
            # functions decorated with @register carry the {MAGIC_ATTR} attribute
            # --------------------
            if hasattr(method, MAGIC_ATTR):
                # this method is decorated by register
                # --------------------
                # {"dispatch_mode": ..., "execute_mode": ..., "blocking": ..., "materialize_futures": ...}
                # --------------------
                attribute = getattr(method, MAGIC_ATTR)
                assert isinstance(attribute, Dict), f"attribute must be a dictionary. Got {type(attribute)}"
                assert "dispatch_mode" in attribute, "attribute must contain dispatch_mode in its key"
```
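As a brief aside before continuing the walkthrough, a simplified sketch of how `@register` could set this attribute (an approximation for illustration; verl's actual implementation also handles materializing futures):

```python
from functools import wraps

def register(dispatch_mode=Dispatch.ALL_TO_ALL, execute_mode=Execute.ALL,
             blocking=True, materialize_futures=True):
    def decorator(func):
        @wraps(func)
        def inner(*args, **kwargs):
            return func(*args, **kwargs)

        # stash the dispatch metadata on the function itself
        setattr(inner, MAGIC_ATTR, {
            "dispatch_mode": dispatch_mode,
            "execute_mode": execute_mode,
            "blocking": blocking,
            "materialize_futures": materialize_futures,
        })
        return inner

    return decorator
```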
```python
                # get execute_fn from string
                try:
                    execute_fn = getattr(self, wg_execute_fn_name)
                    assert callable(execute_fn), "execute_fn must be callable"
                except Exception:
                    print(f"execute_fn {wg_execute_fn_name} is invalid")
                    raise
```
```python
                # bind a new method to the RayWorkerGroup
                # ----------------------
                # inject the dispatch_fn / collect_fn / execute_fn logic into the
                # @register-decorated function; see the explanation below
                # ----------------------
                func = func_generator(
                    self,
                    method_name,
                    dispatch_fn=dispatch_fn,
                    collect_fn=collect_fn,
                    execute_fn=execute_fn,
                    blocking=blocking,
                )
```
```python
                try:
                    # -------------------------
                    # bind the @register-decorated functions of the Worker onto the
                    # WorkerGroup; in the example above, `add` and `dummy_compute`
                    # of `GPUWorker` are bound onto `RayWorkerGroup`
                    # -------------------------
                    setattr(self, method_name, func)
                    method_names.append(method_name)
                except Exception as e:
                    raise ValueError(f"Fail to set method_name {method_name}") from e
```
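The generated method follows a dispatch → execute → collect pipeline. A condensed sketch of what func_generator produces (close in spirit to verl's version, but simplified here):

```python
import ray

def func_generator(self, method_name, dispatch_fn, collect_fn, execute_fn, blocking):
    def func(*args, **kwargs):
        # split the inputs across workers according to the dispatch mode
        args, kwargs = dispatch_fn(self, *args, **kwargs)
        # invoke the remote method on the workers (returns futures)
        output = execute_fn(method_name, *args, **kwargs)
        if blocking:
            output = ray.get(output)  # wait for the futures
        # merge the per-worker outputs back together
        return collect_fn(self, output)

    return func
```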
```python
# verl/single_controller/ray/base.py
def create_colocated_worker_cls(class_dict: dict[str, RayClassWithInitArgs]):
    ...
    # TODO: create a class with customizable name
    class WorkerDict(worker_cls):
        def __init__(self):
            super().__init__()
            self.worker_dict = {}
            for key, user_defined_cls in cls_dict.items():
                # ---------------------
                # `key` is the role name of each model, i.e. `actor_rollout`,
                # `critic`, `ref`, `rm`.
                #
                # note that user_defined_cls here is a plain class, not a Ray
                # actor class; e.g. for actor_rollout it is ActorRolloutRefWorker
                # ---------------------
                user_defined_cls = _unwrap_ray_remote(user_defined_cls)
                # directly instantiate the class without remote
                # in worker class, e.g. <verl.single_controller.base.worker.Worker>
                # when DISABLE_WORKER_INIT == 1 it will return immediately
                with patch.dict(os.environ, {"DISABLE_WORKER_INIT": "1"}):
                    self.worker_dict[key] = user_defined_cls(
                        *init_args_dict[key].get("args", ()), **init_args_dict[key].get("kwargs", {})
                    )
```
```python
    # now monkey-patch the methods from inner class to WorkerDict
    for key, user_defined_cls in cls_dict.items():
        user_defined_cls = _unwrap_ray_remote(user_defined_cls)
        _bind_workers_method_to_parent(WorkerDict, key, user_defined_cls)
```
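A rough sketch of what that monkey-patching amounts to (an approximation, not verl's exact code): each inner worker's @register-ed method is re-exposed on WorkerDict under a `{key}_` prefix, e.g. `actor_rollout_generate_sequences`, delegating to the corresponding colocated instance.

```python
def _bind_workers_method_to_parent(cls, key, user_defined_cls):
    for method_name in dir(user_defined_cls):
        try:
            method = getattr(user_defined_cls, method_name)
            assert callable(method)
        except Exception:
            continue
        if not hasattr(method, MAGIC_ATTR):
            continue

        def generate_function(name):
            def func(self, *args, **kwargs):
                # delegate to the colocated inner worker of this role
                return getattr(self.worker_dict[key], name)(*args, **kwargs)
            return func

        func = generate_function(method_name)
        # keep the dispatch metadata so _bind_worker_method still works later
        setattr(func, MAGIC_ATTR, getattr(method, MAGIC_ATTR))
        setattr(cls, f"{key}_{method_name}", func)
```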
```python
class RayWorkerGroup(WorkerGroup):
    ...

    def spawn(self, prefix_set):
        """Spawn to a dictionary of worker groups, each with a subset of method with prefix.

        Args:
            prefix_set: Set of prefixes to create worker groups for

        Returns:
            Dictionary of worker groups keyed by prefix
        """
        if self.fused_worker_used:
            return self.spawn_fused(prefix_set)
```
```python
        def _rebind_actor_methods(worker_group, actor_name):
            prefix: str = actor_name + "_"
            for method_name in dir(worker_group):
                if method_name.startswith(prefix):
                    # only valid when Python >= 3.9
                    original_method_name = method_name.removeprefix(prefix)
                    method = getattr(worker_group, method_name)
                    setattr(worker_group, original_method_name, method)
```
```python
        new_worker_group_dict = {}
        for prefix in prefix_set:
            new_worker_group = self.from_detached(
                name_prefix=self.name_prefix,
                worker_names=self._worker_names,
                worker_handles=self._workers,
                ray_cls_with_init=self.ray_cls_with_init,
                profile_steps=self.profile_steps,
                worker_nsight_options=self.worker_nsight_options,
            )
```
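Putting spawn to use (a hypothetical usage sketch; the prefixes follow the role names above): each spawned group shares the same underlying Ray actors, but after `_rebind_actor_methods` its unprefixed methods resolve to the role-prefixed ones.

```python
# split a colocated WorkerDict group into per-role worker groups
spawned = worker_group.spawn(prefix_set={"actor_rollout", "ref"})

actor_rollout_wg = spawned["actor_rollout"]
# calling e.g. actor_rollout_wg.generate_sequences(...) now dispatches to
# actor_rollout_generate_sequences on the shared workers
```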