forked from facebookresearch/chameleon
-
Notifications
You must be signed in to change notification settings - Fork 0
/
loader.py
71 lines (50 loc) · 2.04 KB
/
loader.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
# Copyright (c) Meta Platforms, Inc. and affiliates.
#
# This source code is licensed under the Chameleon License found in the
# LICENSE file in the root directory of this source tree.
import glob
import inspect
import json
from pathlib import Path
import torch
from chameleon.inference.transformer import ModelArgs, Transformer
def _convert(model_args: ModelArgs, consolidated_path: Path) -> Transformer:
old_default_dtype = torch.get_default_dtype()
torch.set_default_dtype(torch.bfloat16)
model = Transformer(model_args)
transfer_results = model.load_state_dict(
torch.load(str(consolidated_path)),
strict=False,
)
# TODO: More generally, assert missing or unexpected keys are buffers.
assert transfer_results.missing_keys == []
assert transfer_results.unexpected_keys == ["rope.freqs"]
model.eval()
torch.set_default_dtype(old_default_dtype)
return model
def _get_checkpoint_path(src_dir: Path, rank: int | None) -> Path:
base_path = src_dir / "consolidated.pth"
if not rank and base_path.exists():
return base_path
alt_path = src_dir / f"consolidated.{rank:02}.pth"
if alt_path.exists():
return alt_path
raise ValueError("Consolidated checkpoint not found.")
def load_model(path: str, rank: int | None = None) -> Transformer:
src_dir = Path(path)
with open(src_dir / "params.json", "r") as f:
params = json.loads(f.read())
with open(src_dir / "consolidate_params.json", "r") as f:
consolidate_params = json.loads(f.read())
params = {**params, **params["model"], **consolidate_params}
known_param = inspect.signature(ModelArgs.__init__).parameters
filtered_params = {k: v for k, v in params.items() if k in known_param}
return _convert(
ModelArgs(**filtered_params),
_get_checkpoint_path(src_dir, rank),
)
def detect_shard_count(path: str) -> int:
src_dir = Path(path)
if (src_dir / "consolidated.pth").exists():
return 1
return len(glob.glob(str(src_dir / "consolidated.*.pth")))