admin管理员组

文章数量:1030810

vLLM源码学习

vLLM v1

vLLM从v0.6.0开始,为了解决功能碎片化、模块之间耦合严重、技术债等问题,并行开发了v1。v1不仅解决了上述问题,还提升了推理性能,让CPU调度开销更小。下图是v1的类图,展示了从用户输入到模型推理的过程。

vLLM类图

主进程:负责对prompt做预处理和后处理,以及启动engine进程。

engine进程:请求调度,kv cache管理,单机单卡、单机多卡、多机多卡并行Executor启动。

worker进程:单GPU上模型执行,通信库执行。

执行流程图

执行流程图

LLM初始化会完成模型加载、KV Cache管理器、Scheduler调度器、分布式通信库等,值得注意的是,为了计算KV Cache可用空间,会用fake假数据完成一次推理。

generate接口会完成prompt预处理、任务添加、detokenizer,因为worker进程是独立的,新请求需要通过zmq队列传到worker进程里。

插件系统

通过Worker里的load_general_plugins函数加载用户的插件,插件分为通用插件和平台插件。

通用插件可以提交新的模型,使用方式如下:

代码语言:txt复制
# inside `setup.py` file
from setuptools import setup
 
setup(name='vllm_add_dummy_model',
      version='0.1',
      packages=['vllm_add_dummy_model'],
      entry_points={
          'vllm.general_plugins':
          ["register_dummy_model = vllm_add_dummy_model:register"]
      })
 
# inside `vllm_add_dummy_model.py` file
def register():
    from vllm import ModelRegistry
 
    if "MyLlava" not in ModelRegistry.get_supported_archs():
        ModelRegistry.register_model("MyLlava",
                                        "vllm_add_dummy_model.my_llava:MyLlava")

平台插件可以支持一个新的硬件,使用方式如下:

vLLM平台插件

分布式并行

单机多卡

单机多卡使用python的多进程实现,vllm/vllm/v1/executor/multiproc_executor.py at main · vllm-project/vllm,3D并行(张量并行、流水线并行、专家并行)的每一个分片是一个进程。

多个进程间使用torch.distributed通信,在多进程worker间启动通信组、感知每个进程的编号、当前进程在分布式环境的位置、执行通信算子等。

pytorch通信代码详见:.py

多机多卡

多机多卡使用Ray进行通信,vllm/vllm/v1/executor/ray_distributed_executor.py at main · vllm-project/vllm。

ray是2016年伯克利开展的一个课堂项目,论文《Ray: A Distributed Framework for Emerging AI Applications》,用于拓展分布式神经网络训练。

OpenAI用于大模型训练的底层框架,国内大厂都在用,它解决了分布式 ML 中的三个关键挑战。

  • 消除计算约束:远程访问几乎无限的计算
  • 容错:自动将失败的任务重新路由到集群中的其他机器
  • 状态管理:在任务之间共享数据并跨数据进行协调
Ray的应用

Ray的简单使用

主节点启动:

代码语言:txt复制
ray start --head --num-gpus=1
# num-gpus用于指定使用主节点上几张卡

启动后看输出日志,记录下来主节点的ip和port,从节点连接的时候需要。

从节点启动:

代码语言:txt复制
ray start --address='主节点ip:主节点端口' --num-gpus=1
# num-gpus用于指定使用从节点上几张卡

可以随意启动多个从节点

在集群内任意节点都可以查看集群状态,命令`ray status`

在主节点上运行python程序,Ray会自动把任务分到多台机器上执行。下面的代码是llama多机推理的简单例子:

代码语言:txt复制
import ray
from transformers import AutoModelForCausalLM, AutoTokenizer
import torch

@ray.remote(num_gpus=1)  # 每个Actor分配1块GPU
class PipelineStage:
    def __init__(self, model_path: int, max_length: int):
        self.device = "cuda:0"

        # 加载模型
        self.model = AutoModelForCausalLM.from_pretrained(model_path, torch_dtype="auto", device_map="auto")
        self.max_length = max_length

    def forward(self, inputs: dict):
        # 将输入数据移动到当前设备
        inputs = {
            "input_ids": inputs["input_ids"].to(self.device),
            "attention_mask": inputs["attention_mask"].to(self.device)
        }

        # 执行当前阶段计算
        generated_ids = self.model.generate(**inputs, max_length=self.max_length)

        return generated_ids

master_node = "master ip"
slave_node1 = "slave1 ip"
slave_node2 = "slave2 ip"
prompt = "Explain the theory of relativity in simple terms."
model_path = "./Llama-3.2-3B-Instruct/"

def main():
    # 初始化Ray集群
    ray.init(
        address="auto",
        runtime_env={"env_vars": {"RAY_ENABLE_WINDOWS_OR_OSX_CLUSTER": "1"}},
        _node_ip_address=master_node
    )

    # 在3台机器上各启动一个Actor
    stage1 = PipelineStage.options(
        resources={f"node:{master_node}": 0.01},  # 绑定到master node
        num_gpus=1
    ).remote(model_path=model_path, max_length=20)

    stage2 = PipelineStage.options(
        resources={f"node:{slave_node1}": 0.01},  # 绑定到slave node
        num_gpus=1
    ).remote(model_path=model_path, max_length=30)
    
    stage3 = PipelineStage.options(
        resources={f"node:{slave_node2}": 0.01},  # 绑定到slave node
        num_gpus=1
    ).remote(model_path=model_path, max_length=40)
    
    # 准备输入数据
    tokenizer = AutoTokenizer.from_pretrained(model_path)
    inputs = tokenizer(prompt, return_tensors="pt")

    # 执行pipeline推理
    generated_ids_1 = ray.get(stage1.forward.remote(inputs))
    inputs = {
        "input_ids": generated_ids_1,
        "attention_mask": torch.ones_like(generated_ids_1)
    }
    generated_ids_2 = ray.get(stage2.forward.remote(inputs))
    
    inputs = {
        "input_ids": generated_ids_2,
        "attention_mask": torch.ones_like(generated_ids_2)
    }
    generated_ids_3 = ray.get(stage3.forward.remote(inputs))
    
    # 解码输出
    print(tokenizer.batch_decode(generated_ids_1, skip_special_tokens=True))
    print(tokenizer.batch_decode(generated_ids_2, skip_special_tokens=True))
    print(tokenizer.batch_decode(generated_ids_3, skip_special_tokens=True))

    ray.shutdown()


if __name__ == "__main__":
    main()

参考文献

CPU进程间通信库:zeromq/pyzmq: PyZMQ: Python bindings for zeromq

vllm-ascend:vllm-project/vllm-ascend: Community maintained hardware plugin for vLLM on Ascend

昇腾vllm插件文档:Quickstart — vllm-ascend

ray仓库:ray/python/ray/_private/accelerators/npu.py at master · ray-project/ray

vLLM源码学习

vLLM v1

vLLM从v0.6.0开始,为了解决功能碎片化、模块之间耦合严重、技术债等问题,并行开发了v1。v1不仅解决了上述问题,还提升了推理性能,让CPU调度开销更小。下图是v1的类图,展示了从用户输入到模型推理的过程。

vLLM类图

主进程:负责对prompt做预处理和后处理,以及启动engine进程。

engine进程:请求调度,kv cache管理,单机单卡、单机多卡、多机多卡并行Executor启动。

worker进程:单GPU上模型执行,通信库执行。

执行流程图

执行流程图

LLM初始化会完成模型加载、KV Cache管理器、Scheduler调度器、分布式通信库等,值得注意的是,为了计算KV Cache可用空间,会用fake假数据完成一次推理。

generate接口会完成prompt预处理、任务添加、detokenizer,因为worker进程是独立的,新请求需要通过zmq队列传到worker进程里。

插件系统

通过Worker里的load_general_plugins函数加载用户的插件,插件分为通用插件和平台插件。

通用插件可以提交新的模型,使用方式如下:

代码语言:txt复制
# inside `setup.py` file
from setuptools import setup
 
setup(name='vllm_add_dummy_model',
      version='0.1',
      packages=['vllm_add_dummy_model'],
      entry_points={
          'vllm.general_plugins':
          ["register_dummy_model = vllm_add_dummy_model:register"]
      })
 
# inside `vllm_add_dummy_model.py` file
def register():
    from vllm import ModelRegistry
 
    if "MyLlava" not in ModelRegistry.get_supported_archs():
        ModelRegistry.register_model("MyLlava",
                                        "vllm_add_dummy_model.my_llava:MyLlava")

平台插件可以支持一个新的硬件,使用方式如下:

vLLM平台插件

分布式并行

单机多卡

单机多卡使用python的多进程实现,vllm/vllm/v1/executor/multiproc_executor.py at main · vllm-project/vllm,3D并行(张量并行、流水线并行、专家并行)的每一个分片是一个进程。

多个进程间使用torch.distributed通信,在多进程worker间启动通信组、感知每个进程的编号、当前进程在分布式环境的位置、执行通信算子等。

pytorch通信代码详见:.py

多机多卡

多机多卡使用Ray进行通信,vllm/vllm/v1/executor/ray_distributed_executor.py at main · vllm-project/vllm。

ray是2016年伯克利开展的一个课堂项目,论文《Ray: A Distributed Framework for Emerging AI Applications》,用于拓展分布式神经网络训练。

OpenAI用于大模型训练的底层框架,国内大厂都在用,它解决了分布式 ML 中的三个关键挑战。

  • 消除计算约束:远程访问几乎无限的计算
  • 容错:自动将失败的任务重新路由到集群中的其他机器
  • 状态管理:在任务之间共享数据并跨数据进行协调
Ray的应用

Ray的简单使用

主节点启动:

代码语言:txt复制
ray start --head --num-gpus=1
# num-gpus用于指定使用主节点上几张卡

启动后看输出日志,记录下来主节点的ip和port,从节点连接的时候需要。

从节点启动:

代码语言:txt复制
ray start --address='主节点ip:主节点端口' --num-gpus=1
# num-gpus用于指定使用从节点上几张卡

可以随意启动多个从节点

在集群内任意节点都可以查看集群状态,命令`ray status`

在主节点上运行python程序,Ray会自动把任务分到多台机器上执行。下面的代码是llama多机推理的简单例子:

代码语言:txt复制
import ray
from transformers import AutoModelForCausalLM, AutoTokenizer
import torch

@ray.remote(num_gpus=1)  # 每个Actor分配1块GPU
class PipelineStage:
    def __init__(self, model_path: int, max_length: int):
        self.device = "cuda:0"

        # 加载模型
        self.model = AutoModelForCausalLM.from_pretrained(model_path, torch_dtype="auto", device_map="auto")
        self.max_length = max_length

    def forward(self, inputs: dict):
        # 将输入数据移动到当前设备
        inputs = {
            "input_ids": inputs["input_ids"].to(self.device),
            "attention_mask": inputs["attention_mask"].to(self.device)
        }

        # 执行当前阶段计算
        generated_ids = self.model.generate(**inputs, max_length=self.max_length)

        return generated_ids

master_node = "master ip"
slave_node1 = "slave1 ip"
slave_node2 = "slave2 ip"
prompt = "Explain the theory of relativity in simple terms."
model_path = "./Llama-3.2-3B-Instruct/"

def main():
    # 初始化Ray集群
    ray.init(
        address="auto",
        runtime_env={"env_vars": {"RAY_ENABLE_WINDOWS_OR_OSX_CLUSTER": "1"}},
        _node_ip_address=master_node
    )

    # 在3台机器上各启动一个Actor
    stage1 = PipelineStage.options(
        resources={f"node:{master_node}": 0.01},  # 绑定到master node
        num_gpus=1
    ).remote(model_path=model_path, max_length=20)

    stage2 = PipelineStage.options(
        resources={f"node:{slave_node1}": 0.01},  # 绑定到slave node
        num_gpus=1
    ).remote(model_path=model_path, max_length=30)
    
    stage3 = PipelineStage.options(
        resources={f"node:{slave_node2}": 0.01},  # 绑定到slave node
        num_gpus=1
    ).remote(model_path=model_path, max_length=40)
    
    # 准备输入数据
    tokenizer = AutoTokenizer.from_pretrained(model_path)
    inputs = tokenizer(prompt, return_tensors="pt")

    # 执行pipeline推理
    generated_ids_1 = ray.get(stage1.forward.remote(inputs))
    inputs = {
        "input_ids": generated_ids_1,
        "attention_mask": torch.ones_like(generated_ids_1)
    }
    generated_ids_2 = ray.get(stage2.forward.remote(inputs))
    
    inputs = {
        "input_ids": generated_ids_2,
        "attention_mask": torch.ones_like(generated_ids_2)
    }
    generated_ids_3 = ray.get(stage3.forward.remote(inputs))
    
    # 解码输出
    print(tokenizer.batch_decode(generated_ids_1, skip_special_tokens=True))
    print(tokenizer.batch_decode(generated_ids_2, skip_special_tokens=True))
    print(tokenizer.batch_decode(generated_ids_3, skip_special_tokens=True))

    ray.shutdown()


if __name__ == "__main__":
    main()

参考文献

CPU进程间通信库:zeromq/pyzmq: PyZMQ: Python bindings for zeromq

vllm-ascend:vllm-project/vllm-ascend: Community maintained hardware plugin for vLLM on Ascend

昇腾vllm插件文档:Quickstart — vllm-ascend

ray仓库:ray/python/ray/_private/accelerators/npu.py at master · ray-project/ray

本文标签: vLLM源码学习