summaryrefslogtreecommitdiff
path: root/src/personalization/models/embedding/qwen3_8b.py
blob: fb02e678065174cd945f598a7e00f32fb6576a61 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
from __future__ import annotations

from typing import List, Sequence

import torch
from transformers import AutoModel, AutoTokenizer

from personalization.config.registry import choose_dtype, choose_device_map
from personalization.config.settings import LocalModelsConfig
from .base import EmbeddingModel, _mean_pool, _maybe_normalize


class Qwen3Embedding8B(EmbeddingModel):
    def __init__(
        self,
        model_path: str,
        dtype: torch.dtype,
        device_map: str = "auto",
        trust_remote_code: bool = True,
    ) -> None:
        self.tokenizer = AutoTokenizer.from_pretrained(
            model_path, use_fast=True, trust_remote_code=trust_remote_code
        )
        
        # Handle specific device assignment (e.g., "cuda:0", "cuda:1")
        if device_map and device_map.startswith("cuda:"):
            # Load to CPU first, then move to specific GPU
            self.model = AutoModel.from_pretrained(
                model_path,
                torch_dtype=dtype,
                device_map=None,  # Don't use accelerate's device_map
                trust_remote_code=trust_remote_code,
                low_cpu_mem_usage=True,
            )
            self.model = self.model.to(device_map)
        else:
            # Use accelerate's auto device mapping
            self.model = AutoModel.from_pretrained(
                model_path,
                torch_dtype=dtype,
                device_map=device_map,
                trust_remote_code=trust_remote_code,
                low_cpu_mem_usage=True,
            )

    @classmethod
    def from_config(cls, cfg: LocalModelsConfig) -> "Qwen3Embedding8B":
        if not cfg.embedding or not cfg.embedding.qwen3:
            raise ValueError("Embedding config for qwen3 is missing")
        spec = cfg.embedding.qwen3
        dtype = choose_dtype(spec.dtype)
        device_map = choose_device_map(spec.device_map)
        return cls(
            spec.local_path,
            dtype=dtype,
            device_map=device_map,
            trust_remote_code=True,
        )

    @torch.inference_mode()
    def encode(
        self,
        texts: Sequence[str],
        batch_size: int = 8,
        max_length: int = 512,
        normalize: bool = True,
        return_tensor: bool = False,
    ) -> List[List[float]] | torch.Tensor:
        device = next(self.model.parameters()).device
        outputs: List[torch.Tensor] = []
        for i in range(0, len(texts), batch_size):
            batch = list(texts[i : i + batch_size])
            enc = self.tokenizer(
                batch,
                padding=True,
                truncation=True,
                max_length=max_length,
                return_tensors="pt",
            ).to(device)
            model_out = self.model(**enc, output_hidden_states=False, return_dict=True)
            pooled = _mean_pool(model_out.last_hidden_state, enc["attention_mask"])  # type: ignore[attr-defined]
            pooled = _maybe_normalize(pooled, normalize)
            outputs.append(pooled)
        emb = torch.cat(outputs, dim=0)
        if return_tensor:
            return emb
        return emb.cpu().to(torch.float32).tolist()