vLLM 源码阅读 - Block Manager 与核心调度逻辑 (part2)

说明:基于 vLLM v0.7.3,commit id: ed6e9075d31e32c8548b480a47d1ffb77da1f54c (HEAD, tag: v0.7.3)

vLLM 调度逻辑

介绍具体的调度逻辑前,需要梳理一下 block manager 的实现。调度逻辑中,block manager 会作为具体的判断条件。

block manager subsystem 的设计可以参考官方的 PR

现总结如下:

1
2
3
4
5
6
7
8
9
10
BlockSpaceManager
将 `SequenceGroup` 的操作映射为 lower-level 的操作
BlockTable
将单个 `Sequence` 的 KV Cache 映射为 physical 的分配,处理 sliding_window / lookahead 的分配
DeviceAwareBlockAllocator
将 block 分配映射到 device,处理 swapping
BlockAllocator
分配和释放 block,引用计数(refcounting)、copy-on-write 等
Block
将 token ids 存入

block manager

BlockSpaceManager 的初始化:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
# --------------------
# BlockSpaceManager: maps `SequenceGroup`-level operations to lower-level ones
# --------------------
# vllm/core/block_manager.py
class SelfAttnBlockSpaceManager(BlockSpaceManager):
def __init__(
self,
block_size: int,
num_gpu_blocks: int,
num_cpu_blocks: int,
watermark: float = 0.01,
sliding_window: Optional[int] = None,
enable_caching: bool = False,
) -> None:
self.block_size = block_size
self.num_total_gpu_blocks = num_gpu_blocks
self.num_total_cpu_blocks = num_cpu_blocks
# ---------------------------------------
# Watermark (fraction of GPU blocks) used for memory swapping
# decisions; defaults to 0.01
# ---------------------------------------
self.watermark = watermark
self.watermark_blocks = int(watermark * num_gpu_blocks)
# ---------------------------------------
# CpuGpuBlockAllocator in vllm/core/block/cpu_gpu_block_allocator.py
# CpuGpuBlockAllocator implements the DeviceAwareBlockAllocator interface

# The `block_allocator` attribute is a CpuGpuBlockAllocator instance
# ---------------------------------------
self.block_allocator = CpuGpuBlockAllocator.create(
allocator_type="prefix_caching" if enable_caching else "naive",
num_gpu_blocks=num_gpu_blocks,
num_cpu_blocks=num_cpu_blocks,
block_size=block_size,
)
# ---------------------------------------
# BlockTable in vllm/core/block/block_table.py
# Maps a single `Sequence`'s KV cache to physical allocations,
# handling sliding_window / lookahead allocation
# ---------------------------------------
self.block_tables: Dict[SeqId, BlockTable] = {}
self.cross_block_tables: Dict[EncoderSeqId, BlockTable] = {}

CpuGpuBlockAllocator.create 如下,创建 CpuGpuBlockAllocator 实例。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
# ------------------------
# DeviceAwareBlockAllocator: maps block allocation onto a device, handles swapping
# ------------------------
# vllm/core/block/cpu_gpu_block_allocator.py
class CpuGpuBlockAllocator(DeviceAwareBlockAllocator):
@staticmethod
def create(
allocator_type: str,
num_gpu_blocks: int,
num_cpu_blocks: int,
block_size: int,
) -> DeviceAwareBlockAllocator:
reserved_blocks = 1 if current_platform.is_hpu() else 0
# ---------------------------------
# On non-HPU devices no block is reserved, so from the
# pre-computed gpu/cpu block counts the ids are
# [0, 1, ..., num_gpu_blocks + num_cpu_blocks - 1]
# ---------------------------------
block_ids = list(
range(reserved_blocks, num_gpu_blocks + num_cpu_blocks))
num_gpu_blocks -= reserved_blocks
gpu_block_ids = block_ids[:num_gpu_blocks]
cpu_block_ids = block_ids[num_gpu_blocks:]
...
if allocator_type == "naive":
gpu_allocator: BlockAllocator = NaiveBlockAllocator(
create_block=NaiveBlock, # type: ignore
num_blocks=num_gpu_blocks,
block_size=block_size,
block_ids=gpu_block_ids,
)

cpu_allocator: BlockAllocator = NaiveBlockAllocator(
create_block=NaiveBlock, # type: ignore
num_blocks=num_cpu_blocks,
block_size=block_size,
block_ids=cpu_block_ids,
)
elif allocator_type == "prefix_caching":
gpu_allocator = PrefixCachingBlockAllocator(
num_blocks=num_gpu_blocks,
block_size=block_size,
block_ids=gpu_block_ids,
)

cpu_allocator = PrefixCachingBlockAllocator(
num_blocks=num_cpu_blocks,
block_size=block_size,
block_ids=cpu_block_ids,
)
...

def __init__(self, cpu_block_allocator: BlockAllocator,
gpu_block_allocator: BlockAllocator):
...
self._allocators = {
# ---------------------------------
# A NaiveBlockAllocator (or PrefixCachingBlockAllocator) instance;
# see how cpu_allocator is created in `create`
# ---------------------------------
Device.CPU: cpu_block_allocator,
# ---------------------------------
# A NaiveBlockAllocator (or PrefixCachingBlockAllocator) instance;
# see how gpu_allocator is created in `create`
# ---------------------------------
Device.GPU: gpu_block_allocator,
}

self._swap_mapping: Dict[int, int] = {}
self._null_block: Optional[Block] = None

self._block_ids_to_allocator: Dict[int, BlockAllocator] = {}
for _, allocator in self._allocators.items():
for block_id in allocator.all_block_ids:
self._block_ids_to_allocator[block_id] = allocator

# ---------------------------------
# A few important APIs
# ---------------------------------

# ---------------------------------
# Allocate a block that is not yet full, i.e. still has empty slots
# ---------------------------------
def allocate_mutable_block(self,
prev_block: Optional[Block],
device: Device,
extra_hash: Optional[int] = None) -> Block:
# ---------------------------------
# Delegates to the per-device gpu_allocator or cpu_allocator
# (e.g. NaiveBlockAllocator)
# ---------------------------------
return self._allocators[device].allocate_mutable_block(
prev_block, extra_hash=extra_hash)

def allocate_immutable_blocks(
self,
prev_block: Optional[Block],
block_token_ids: List[List[int]],
device: Device,
extra_hash: Optional[int] = None) -> List[Block]:
# ---------------------------------
# Delegates to the per-device gpu_allocator or cpu_allocator
# (e.g. NaiveBlockAllocator)
# ---------------------------------
return self._allocators[device].allocate_immutable_blocks(
prev_block, block_token_ids, extra_hash=extra_hash)

def allocate_immutable_block(self,
prev_block: Optional[Block],
token_ids: List[int],
device: Device,
extra_hash: Optional[int] = None) -> Block:
# ---------------------------------
# Delegates to the per-device gpu_allocator or cpu_allocator
# (e.g. NaiveBlockAllocator)
# ---------------------------------
return self._allocators[device].allocate_immutable_block(
prev_block, token_ids, extra_hash=extra_hash)

现在详细看看 BlockAllocator 的初始化和核心 API。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
# ---------------------
# BlockAllocator: allocates and frees blocks, handles refcounting,
# copy-on-write, etc.
# ---------------------
# vllm/core/block/naive_block.py
class NaiveBlockAllocator(BlockAllocator):
def __init__(
self,
create_block: Block.Factory,
num_blocks: int,
block_size: int,
block_ids: Optional[Iterable[int]] = None,
block_pool: Optional[BlockPool] = None,
):
if block_ids is None:
block_ids = range(num_blocks)

self._free_block_indices: Deque[BlockId] = deque(block_ids)
self._all_block_indices = frozenset(block_ids)
...
# ---------------------------------
# Reference counter for block ids, a Dict[int, int]
# ---------------------------------
self._refcounter = RefCounter(
all_block_indices=self._free_block_indices)
self._block_size = block_size

self._cow_tracker = CopyOnWriteTracker(
refcounter=self._refcounter.as_readonly())

if block_pool is None:
extra_factor = 4
# Pre-allocate "num_blocks * extra_factor" block objects.
# The "* extra_factor" is a buffer to allow more block objects
# than physical blocks
# ------------------------------
# By default create a block pool whose size is 4x num_blocks.
# The pool holds num_blocks * extra_factor NaiveBlock(Block)
# instances, i.e. blocks.
#
# Each block is initialized as:
# NaiveBlock(
# prev_block=None,
# token_ids=[],
# block_size=self._block_size, # the self._block_size arg passed to BlockPool
# allocator=self._allocator, # i.e. the self arg passed to BlockPool
# block_id=None,
# extra_hash=None
# )
# ------------------------------
self._block_pool = BlockPool(self._block_size, create_block, self,
num_blocks * extra_factor)

def _allocate_block_id(self) -> BlockId:
if not self._free_block_indices:
raise BlockAllocator.NoFreeBlocksError()

block_id = self._free_block_indices.popleft()
self._refcounter.incr(block_id)
return block_id

# ----------------------------
# Allocate a new block and link it to the previous block.
# The new block's slots are empty, waiting for token ids to be appended.
# ----------------------------
def allocate_mutable_block(self,
prev_block: Optional[Block],
extra_hash: Optional[int] = None,
device: Optional[Device] = None) -> Block:
assert device is None
block_id = self._allocate_block_id()
# ----------------------------
# Take a Block from the block pool and call the Block's init function
# ----------------------------
block = self._block_pool.init_block(prev_block=prev_block,
token_ids=[],
block_size=self._block_size,
physical_block_id=block_id)
return block

# ----------------------------
# Allocate an immutable block, link it to the previous block,
# and append the token ids to the block's _token_ids attribute.
# ----------------------------
def allocate_immutable_block(self,
prev_block: Optional[Block],
token_ids: List[int],
extra_hash: Optional[int] = None,
device: Optional[Device] = None) -> Block:
assert device is None
block = self.allocate_mutable_block(prev_block=prev_block)
block.append_token_ids(token_ids)
return block

def swap_out(self, blocks: List[Block]) -> None:
...

def swap_in(self, blocks: List[Block]) -> None:
...

BlockPool 中预先创建的 block 如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
# vllm/core/block/naive_block.py
class NaiveBlock(Block):
def __init__(self,
prev_block: Optional[Block],
token_ids: List[int],
block_size: int,
allocator: BlockAllocator,
block_id: Optional[int] = None,
_cow_target: Optional[Block] = None,
extra_hash: Optional[int] = None):
self._token_ids: List[int] = []
self._block_size = block_size
self._prev_block = prev_block
self._block_id = block_id
self._allocator = allocator
self._cow_target = _cow_target if _cow_target is not None else self

self._append_token_ids_no_cow(token_ids)

def append_token_ids(self, token_ids: List[int]) -> None:
self._append_token_ids_no_cow(token_ids)

# After appending, let the allocator perform copy-on-write if the
# block is shared; the (possibly new) physical id is stored back.
if self._block_id is not None:
self._block_id = (self._allocator.cow_block_if_not_appendable(
self._cow_target))

def _append_token_ids_no_cow(self, token_ids: List[int]) -> None:
if len(token_ids) == 0:
return

assert len(token_ids) <= self.num_empty_slots

self._token_ids.extend(token_ids)

# ----------------------
# Whether all slots in the block are filled
# ----------------------
@property
def is_full(self) -> bool:
return self.num_empty_slots == 0
# ----------------------
# Number of remaining empty slots in the block
# ----------------------
@property
def num_empty_slots(self) -> int:
return self._block_size - len(self.token_ids)

最外层的调用,从 block manager 调用分配的函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
class SelfAttnBlockSpaceManager(BlockSpaceManager):
def can_allocate(self,
seq_group: SequenceGroup,
num_lookahead_slots: int = 0) -> AllocStatus:
seq = seq_group.get_seqs(status=SequenceStatus.WAITING)[0]
# ---------------------------------------
# Roughly: seq token count / block_size (plus lookahead slots)
# ---------------------------------------
num_required_blocks = BlockTable.get_num_required_blocks(
seq.get_token_ids(),
block_size=self.block_size,
num_lookahead_slots=num_lookahead_slots,
)
# ---------------------------------------
# Actually calls get_num_free_blocks of the underlying GPU
# allocator (e.g. NaiveBlockAllocator)
# ---------------------------------------
num_free_gpu_blocks = self.block_allocator.get_num_free_blocks(
device=Device.GPU)

# Use watermark to avoid frequent cache eviction.
if (self.num_total_gpu_blocks - num_required_blocks
< self.watermark_blocks):
return AllocStatus.NEVER
if num_free_gpu_blocks - num_required_blocks >= self.watermark_blocks:
return AllocStatus.OK
else:
return AllocStatus.LATER

def allocate(self, seq_group: SequenceGroup) -> None:
# Allocate self-attention block tables for decoder sequences
waiting_seqs = seq_group.get_seqs(status=SequenceStatus.WAITING)
assert not (set(seq.seq_id for seq in waiting_seqs)
& self.block_tables.keys()), "block table already exists"

# NOTE: Here we assume that all sequences in the group have the same
# prompt.
seq = waiting_seqs[0]
block_table: BlockTable = self._allocate_sequence(seq)
self.block_tables[seq.seq_id] = block_table

block manager 中涉及的几个重要类的关系如下:

总结上述几个类的调用链和关系如下:

  1. SelfAttnBlockSpaceManager(BlockSpaceManager) 调用分配(allocate)的接口,将 seq_group 传入;
  2. 执行 BlockTable 的创建,创建 BlockTable 时传入的参数:CpuGpuBlockAllocator(DeviceAwareBlockAllocator) 的实例和 seq;SelfAttnBlockSpaceManager(BlockSpaceManager) 中管理着 seq_id 到 BlockTable 的映射;
  3. BlockTable 调用分配接口,根据 seq 的 token ids 数量调用 CpuGpuBlockAllocator(DeviceAwareBlockAllocator) 的 allocate_immutable_blocks 和 allocate_mutable_block;
  4. CpuGpuBlockAllocator(DeviceAwareBlockAllocator) 的 allocate_immutable_blocks 和 allocate_mutable_block 调用实际底层是 NaiveBlockAllocator(BlockAllocator) 的分配调用,从而建立 seq <-> block table <-> block 之间的联系。

下图所示:红色箭头为初始化过程;蓝色箭头为调用 allocate 的逻辑。

schedule

总的调度逻辑如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
# core/scheduler.py
class Scheduler:
def schedule(
self
) -> Tuple[List[SequenceGroupMetadata], SchedulerOutputs, bool]:
# Schedule sequence groups.
# This function call changes the internal states of the scheduler
# such as self.running, self.swapped, and self.waiting.
# ---------------------------------------
# Entry point of the scheduling logic
# ---------------------------------------
scheduler_outputs: SchedulerOutputs = self._schedule()

# ---------------------------------------
# Path taken when chunked_prefill is NOT enabled
# ---------------------------------------
def _schedule_default(self) -> SchedulerOutputs:
...
# If any requests are swapped, prioritized swapped requests.
# ---------------------------------------
# The swapped queue takes priority: prefills are only scheduled
# when the swapped queue is empty, even if the waiting queue
# is non-empty
# ---------------------------------------
if not self.swapped:
prefills = self._schedule_prefills(budget,
curr_loras,
enable_chunking=False)
# ---------------------------------------
# If no seq_groups were scheduled for prefill and the scheduling
# policy is 'priority', run priority-based preemption scheduling
# ---------------------------------------
if len(prefills.seq_groups
) == 0 and self.scheduler_config.policy == "priority":
self._schedule_priority_preemption(budget)

# Don't schedule decodes if prefills are scheduled.
# NOTE: If `_schedule_prefills` doesn't enable chunking, self.running
# only contains decode requests, not chunked prefills.
# ---------------------------------------
# Only when no prefill seq_groups were scheduled do we schedule
# seqs from the running or swapped queues, i.e. prefill and
# decode are scheduled separately.
# ---------------------------------------
if len(prefills.seq_groups) == 0:
running_scheduled = self._schedule_running(budget,
curr_loras,
enable_chunking=False)

# If any sequence group is preempted, do not swap in any sequence
# group. because it means there's no slot for new running requests.
# ---------------------------------------
# If preempted and swapped_out are both empty, every seq_group in
# running_scheduled got its KV cache space, so seq_groups in the
# swapped queue can be swapped in and scheduled.
# ---------------------------------------
if (len(running_scheduled.preempted) +
len(running_scheduled.swapped_out) == 0):
swapped_in = \
self._schedule_swapped(budget, curr_loras)

关于 prefill 阶段的调度:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
# core/scheduler.py
class Scheduler:
def _schedule_prefills(
self,
budget: SchedulingBudget,
curr_loras: Optional[Set[int]],
enable_chunking: bool = False,
partial_prefill_metadata: Optional[PartialPrefillMetadata] = None,
) -> SchedulerPrefillOutputs:
...
# ---------------------------------------
# seq_groups in the prefill phase all sit in the waiting queue
# ---------------------------------------
while self._passed_delay(time.time()) and waiting_queue:
# ---------------------------------------
# FCFS scheduling: first come, first scheduled
# ---------------------------------------
seq_group = waiting_queue[0]

waiting_seqs = seq_group.get_seqs(status=SequenceStatus.WAITING)
...
# ---------------------------------------
# In the prefill phase, num_new_tokens_cached is 0 and
# num_new_tokens_uncached is the prompt length.
# ---------------------------------------
num_new_tokens_uncached, num_new_tokens_cached = (
self._get_num_new_uncached_and_cached_tokens(
seq_group,
SequenceStatus.WAITING,
enable_chunking,
budget,
partial_prefill_metadata=partial_prefill_metadata,
))
# ---------------------------------------
# If num_new_tokens exceeds the scheduler's configured maximum
# (e.g. 32768), every seq in this seq_group is marked
# SequenceStatus.FINISHED_IGNORED
# ---------------------------------------
num_new_tokens = num_new_tokens_uncached + num_new_tokens_cached
...
# If the sequence group cannot be allocated, stop.
# ---------------------------------------
# Check whether KV cache (managed by the block manager) can be
# allocated for this seq_group; detailed below.
# ---------------------------------------
can_allocate = self.block_manager.can_allocate(
seq_group, num_lookahead_slots=num_lookahead_slots)
if can_allocate == AllocStatus.LATER:
break
elif can_allocate == AllocStatus.NEVER:
logger.warning(
"Input prompt (%d tokens) + lookahead slots (%d) is "
"too long and exceeds the capacity of block_manager",
num_new_tokens,
num_lookahead_slots,
)
for seq in waiting_seqs:
seq.status = SequenceStatus.FINISHED_IGNORED
ignored_seq_groups.append(seq_group)
waiting_queue.popleft()
continue
...
# ---------------------------------------
# Pop the earliest-arrived seq_group from the waiting queue,
# allocate KV cache blocks for it, and flip its seqs from
# SequenceStatus.WAITING to SequenceStatus.RUNNING.
# ---------------------------------------
waiting_queue.popleft()
self._allocate_and_set_running(seq_group)
...
seq_groups.append(
ScheduledSequenceGroup(seq_group=seq_group,
token_chunk_size=num_new_tokens))
# ---------------------------------------
# Update the budget state:
# the total batched tokens and the number of seqs handled
# ---------------------------------------
budget.add_num_batched_tokens(
seq_group.request_id,
num_batched_tokens=num_new_tokens_uncached,
num_cached_tokens=num_new_tokens_cached,
)
budget.add_num_seqs(seq_group.request_id, num_new_seqs)
...
return SchedulerPrefillOutputs(
seq_groups=seq_groups,
ignored_seq_groups=ignored_seq_groups,
num_lookahead_slots=self._get_num_lookahead_slots(
is_prefill=True, enable_chunking=enable_chunking),
)

关于 decode 阶段调度:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
# core/scheduler.py
class Scheduler:
def _schedule_running(
self,
budget: SchedulingBudget,
curr_loras: Optional[Set[int]],
enable_chunking: bool = False,
partial_prefill_metadata: Optional[PartialPrefillMetadata] = None,
) -> SchedulerRunningOutputs:
...
while running_queue:
seq_group = running_queue[0]
# We discard the cached tokens info here because we don't need it
# for running sequence:
# 1. If a sequence is running with chunked prefill, the cached
# tokens info was already used for the first prefill.
# 2. If a sequence is running with non-chunked prefill, then
# there it's a decoding sequence, and the cached tokens info is
# irrelevant.
num_uncached_new_tokens, _ = \
self._get_num_new_uncached_and_cached_tokens(
seq_group,
SequenceStatus.RUNNING,
enable_chunking,
budget,
partial_prefill_metadata,
)
running_queue.popleft()
...
while not self._can_append_slots(seq_group, enable_chunking):
budget.subtract_num_batched_tokens(seq_group.request_id,
num_running_tokens)
num_running_seqs = seq_group.get_max_num_running_seqs()
budget.subtract_num_seqs(seq_group.request_id,
num_running_seqs)

if (curr_loras is not None and seq_group.lora_int_id > 0
and seq_group.lora_int_id in curr_loras):
curr_loras.remove(seq_group.lora_int_id)

# Determine victim sequence
cont_loop = True
# ---------------------------------------
# If the running queue is non-empty but there is no room to
# allocate KV cache, pop the LAST seq_group of the running
# queue as the victim for preemption or swap-out, and loop
# until the exit condition is met.
#
# If the running queue is empty and there is still no room for
# KV cache, the current seq_group itself becomes the victim.
# ---------------------------------------
if running_queue:
# Preempt the lowest-priority sequence group.
victim_seq_group = running_queue.pop()
else:
# No other sequence group can be preempted.
# Preempt the current sequence group.
# Note: This is also where we stop this loop
# (since there is nothing else to preempt)
victim_seq_group = seq_group
cont_loop = False

# With async postprocessor, before preempting a sequence
# we need to ensure it has no pending async postprocessor
do_preempt = True
if self.use_async_output_proc:
assert self.output_proc_callback is not None
self.output_proc_callback(
request_id=victim_seq_group.request_id)

# It may be that the async pending "victim_seq_group"
# becomes finished, in which case we simply free it.
if victim_seq_group.is_finished():
self._free_finished_seq_group(victim_seq_group)
do_preempt = False

# Do preemption
if do_preempt:
# ---------------------------------------
# Decide the preemption mode: SWAP or RECOMPUTE
# ---------------------------------------
preempted_mode = self._preempt(victim_seq_group,
blocks_to_swap_out)
if preempted_mode == PreemptionMode.RECOMPUTE:
preempted.append(victim_seq_group)
else:
swapped_out.append(victim_seq_group)

if not cont_loop:
break
else:
# ---------------------------------------
# Slots can be appended: allocate them so decode can proceed
# ---------------------------------------
self._append_slots(seq_group, blocks_to_copy, enable_chunking)

swapped 的调度:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# core/scheduler.py
class Scheduler:
def _schedule_swapped(
self,
budget: SchedulingBudget,
curr_loras: Optional[Set[int]],
enable_chunking: bool = False,
) -> SchedulerSwappedInOutputs:
...
blocks_to_swap_in: List[Tuple[int, int]] = []
blocks_to_copy: List[Tuple[int, int]] = []
decode_seq_groups: List[ScheduledSequenceGroup] = []
prefill_seq_groups: List[ScheduledSequenceGroup] = []
infeasible_seq_groups: List[SequenceGroup] = []

swapped_queue = self.swapped

leftover_swapped: Deque[SequenceGroup] = deque()
# FCFS over the swapped queue: pop from the front and try to
# bring each seq_group back in.
while swapped_queue:
seq_group = swapped_queue[0]
swapped_queue.popleft()

# Swap the seq_group's blocks back in; the resulting block id
# pairs are collected in blocks_to_swap_in — presumably
# (CPU, GPU) pairs executed later by the worker; the elided
# code is not visible in this excerpt.
self._swap_in(seq_group, blocks_to_swap_in)
# Allocate new slots for the upcoming step; block id pairs to
# copy are collected in blocks_to_copy.
self._append_slots(seq_group, blocks_to_copy, enable_chunking)
...