Skip to content

vllm.v1.kv_offload.cpu.shared_offload_region

SharedOffloadRegion

Single mmap-backed memory region shared across all workers for a vLLM instance. Workers coordinate via the filesystem: the first worker to open the file with O_EXCL becomes the creator and calls ftruncate; the rest open the existing file and wait until it reaches the expected size. Each worker then mmap()s the full file.

File path: /dev/shm/vllm_offload_{instance_id}.mmap

Source code in vllm/v1/kv_offload/cpu/shared_offload_region.py
class SharedOffloadRegion:
    """
    Single mmap-backed memory region shared across all workers for a
    vLLM instance.  Workers coordinate via the filesystem: the first worker
    to open the file with O_EXCL becomes the creator and calls ftruncate;
    the rest open the existing file and wait until it reaches the expected
    size.  Each worker then mmap()s the full file.

    File path: /dev/shm/vllm_offload_{instance_id}.mmap
    """

    def __init__(
        self,
        instance_id: str,
        total_size_bytes: int,
        num_blocks: int,
        rank: int | None,
        num_workers: int,
        cpu_page_size: int,
    ) -> None:
        self.page_size = mmap.PAGESIZE

        self.total_size_bytes = total_size_bytes
        self.mmap_path = f"/dev/shm/vllm_offload_{instance_id}.mmap"
        self._creator = False  # set True only if this worker creates the file
        self.num_blocks = num_blocks
        self.rank = rank
        # interleaved-layout stride: one row = all workers' data for one block
        self._row_stride = cpu_page_size * num_workers
        if rank is not None:
            # byte offset to this worker's first slot within each block row
            self._worker_offset = rank * cpu_page_size
            # exclusive upper bound for this worker's area within each row
            self._worker_area_end = (rank + 1) * cpu_page_size
        try:
            # Exclusive create — only one worker succeeds
            self.fd: int | None = os.open(
                self.mmap_path, os.O_CREAT | os.O_EXCL | os.O_RDWR, 0o600
            )
            os.ftruncate(self.fd, self.total_size_bytes)
            self._creator = True
            logger.info(
                "Created mmap file %s (%.2f GB)",
                self.mmap_path,
                self.total_size_bytes / 1e9,
            )
        except FileExistsError:
            self.fd = os.open(self.mmap_path, os.O_RDWR)
            _wait_for_file_size(self.fd, self.total_size_bytes)
            logger.info("Opened existing mmap file %s", self.mmap_path)

        self.mmap_obj: mmap.mmap | None = mmap.mmap(
            self.fd,
            self.total_size_bytes,
            flags=mmap.MAP_SHARED,
            prot=mmap.PROT_READ | mmap.PROT_WRITE,
        )

        # MADV_POPULATE_WRITE was added in Linux 5.14 (value 23).
        _MADV_POPULATE_WRITE = getattr(mmap, "MADV_POPULATE_WRITE", 23)
        if rank is not None:
            # Populate only this worker's pages (one slot per block row).
            worker_offset = rank * cpu_page_size
            _t0 = time.perf_counter()
            page_size = self.page_size
            for block in range(num_blocks):
                raw_offset = block * self._row_stride + worker_offset
                aligned_offset = (raw_offset // page_size) * page_size
                end = raw_offset + cpu_page_size
                aligned_length = end - aligned_offset
                self.mmap_obj.madvise(
                    _MADV_POPULATE_WRITE, aligned_offset, aligned_length
                )
            logger.debug(
                "MADV_POPULATE_WRITE loop: %d blocks in %.3f s",
                num_blocks,
                time.perf_counter() - _t0,
            )
        else:
            # No rank — populate the entire shared region in one call.
            _t0 = time.perf_counter()
            self.mmap_obj.madvise(_MADV_POPULATE_WRITE, 0, self.total_size_bytes)
            logger.debug(
                "MADV_POPULATE_WRITE entire region: %.3f s", time.perf_counter() - _t0
            )

        self._base = torch.frombuffer(memoryview(self.mmap_obj), dtype=torch.int8)
        self._views: list[torch.Tensor] = []
        self.is_pinned: bool = False

    def create_next_view(self, tensor_page_size: int) -> torch.Tensor:
        """Allocate a strided int8 view for this worker, one canonical tensor.

        Must be called once per canonical tensor. The full mmap layout is:

            worker0_block0 | worker1_block0 | ... | worker{M-1}_block0
            worker0_block1 | worker1_block1 | ... | worker{M-1}_block1
            ...

        Each worker_block cell is cpu_page_size bytes and holds all canonical
        tensors for that worker and block concatenated:
            [ tensor0_data | tensor1_data | ... | tensor{L-1}_data ]

        Consecutive rows are separated by row_stride = cpu_page_size * M.

        Returns an int8 tensor of shape (num_blocks, tensor_page_size) with stride
        (row_stride, 1).  Using int8 keeps stride == bytes, so swap_blocks
        address arithmetic works without any dtype conversion.

        Args:
            tensor_page_size: Bytes per block for this  tensor.
        """
        assert self.rank is not None
        new_offset = self._worker_offset + tensor_page_size
        assert new_offset <= self._worker_area_end, (
            f"Worker offset {new_offset} exceeds worker area end "
            f"{self._worker_area_end} (overflowed by "
            f"{new_offset - self._worker_area_end} bytes)"
        )
        worker_layer_view = torch.as_strided(
            self._base,
            size=(self.num_blocks, tensor_page_size),
            stride=(self._row_stride, 1),
            storage_offset=self._worker_offset,
        )
        self._worker_offset = new_offset
        self._views.append(worker_layer_view)
        return worker_layer_view

    def cleanup(self) -> None:
        if self.is_pinned and self._base is not None:
            base_ptr = self._base.data_ptr()
            result = torch.cuda.cudart().cudaHostUnregister(base_ptr)
            if result.value != 0:
                logger.warning(
                    "cudaHostUnregister failed for rank=%d (code=%d)", self.rank, result
                )
            self.is_pinned = False
        # Release views before _base: each view holds a _base reference and a
        # direct StorageImpl reference.  Freeing views first lets both refcounts
        # drop so the storage (which holds the mmap_obj buffer export) is freed
        # before mmap_obj.close() is called below.
        if self._views is not None:
            self._views.clear()
        self._base = None
        if self.mmap_obj:
            try:
                self.mmap_obj.close()
            except Exception:
                logger.warning("Failed to close mmap_obj", exc_info=True)
            self.mmap_obj = None
        if self.fd is not None:
            try:
                os.close(self.fd)
            except Exception:
                logger.warning("Failed to close fd %s", self.fd, exc_info=True)
            self.fd = None
        if self._creator and getattr(self, "mmap_path", None):
            try:
                os.unlink(self.mmap_path)
                logger.info("Removed mmap file %s", self.mmap_path)
            except Exception:
                logger.warning(
                    "Failed to unlink path %s", self.mmap_path, exc_info=True
                )
            self._creator = False

create_next_view

create_next_view(tensor_page_size: int) -> Tensor

Allocate a strided int8 view, for this worker, covering one canonical tensor.

Must be called once per canonical tensor. The full mmap layout is:

worker0_block0 | worker1_block0 | ... | worker{M-1}_block0
worker0_block1 | worker1_block1 | ... | worker{M-1}_block1
...

Each worker_block cell is cpu_page_size bytes and holds all canonical tensors for that worker and block concatenated: [ tensor0_data | tensor1_data | ... | tensor{L-1}_data ]

Consecutive rows are separated by row_stride = cpu_page_size * M.

Returns an int8 tensor of shape (num_blocks, tensor_page_size) with stride (row_stride, 1). Using int8 keeps stride == bytes, so swap_blocks address arithmetic works without any dtype conversion.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| tensor_page_size | int | Bytes per block for this tensor. | required |
Source code in vllm/v1/kv_offload/cpu/shared_offload_region.py
def create_next_view(self, tensor_page_size: int) -> torch.Tensor:
    """Carve the next per-tensor int8 view out of this worker's slots.

    Call exactly once for each canonical tensor.  The mapped file is laid
    out as one row per block, each row holding every worker's cell:

        worker0_block0 | worker1_block0 | ... | worker{M-1}_block0
        worker0_block1 | worker1_block1 | ... | worker{M-1}_block1
        ...

    A cell is cpu_page_size bytes and stores that worker's canonical
    tensors for the block back to back:
        [ tensor0_data | tensor1_data | ... | tensor{L-1}_data ]

    Rows are row_stride = cpu_page_size * M bytes apart.

    The result is an int8 tensor shaped (num_blocks, tensor_page_size) with
    stride (row_stride, 1).  Because the element type is int8, strides are
    byte counts, so swap_blocks can do address arithmetic without any dtype
    conversion.

    Args:
        tensor_page_size: Bytes per block for this tensor.
    """
    assert self.rank is not None
    start = self._worker_offset
    stop = start + tensor_page_size
    # The view must stay inside this worker's cell within each row.
    assert stop <= self._worker_area_end, (
        f"Worker offset {stop} exceeds worker area end "
        f"{self._worker_area_end} (overflowed by "
        f"{stop - self._worker_area_end} bytes)"
    )
    shape = (self.num_blocks, tensor_page_size)
    strides = (self._row_stride, 1)
    view = torch.as_strided(
        self._base, size=shape, stride=strides, storage_offset=start
    )
    # Advance the cursor so the next tensor lands right after this one.
    self._worker_offset = stop
    self._views.append(view)
    return view

_wait_for_file_size

_wait_for_file_size(
    fd: int, expected_size: int, timeout: float = 30.0
) -> None

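Poll, sleeping 5 ms between checks, until the file reaches expected_size (i.e. the creator has truncated it). Raises TimeoutError if that has not happened once timeout seconds have elapsed.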

Source code in vllm/v1/kv_offload/cpu/shared_offload_region.py
def _wait_for_file_size(fd: int, expected_size: int, timeout: float = 30.0) -> None:
    """Spin-wait until the file reaches expected_size (creator truncated it)."""
    deadline = time.monotonic() + timeout
    while True:
        if os.fstat(fd).st_size >= expected_size:
            return
        if time.monotonic() > deadline:
            raise TimeoutError(
                f"Timed out waiting for mmap file to reach {expected_size} bytes"
            )
        time.sleep(0.005)