File size: 6,728 Bytes
1364cbe
5a87d26
ccc9d98
58a0ecb
1364cbe
 
 
 
1b48b29
 
8fbbd7f
d088d6c
1364cbe
 
 
 
 
 
 
 
 
 
 
 
 
 
d088d6c
1364cbe
 
 
 
 
8fbbd7f
5a87d26
 
 
 
 
 
 
2af96bc
1364cbe
2af96bc
 
1364cbe
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
d088d6c
 
1364cbe
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
d088d6c
1364cbe
 
 
 
 
d088d6c
1364cbe
d088d6c
 
1364cbe
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
2af96bc
ccc9d98
2af96bc
 
 
 
811a3ef
1364cbe
 
 
 
 
 
 
 
018304a
1364cbe
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
d088d6c
1364cbe
 
 
d088d6c
1364cbe
 
 
d088d6c
 
1364cbe
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
import logging
import os
import subprocess
import sys
from dataclasses import dataclass
from pathlib import Path
from typing import Optional, Tuple
from urllib.request import urlopen, urlretrieve

import streamlit as st
from huggingface_hub import HfApi, whoami

# Configure the root logger at INFO level and create this module's logger,
# which the rest of the file uses for warnings and error reporting.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class Config:
    """Application configuration.

    Attributes:
        hf_token: Hugging Face token used to authenticate Hub API calls.
        hf_username: Hub namespace under which converted models are published.
        transformers_version: Tag or branch name of transformers.js to download.
        hf_base_url: Base URL used to build links to Hub repositories.
        transformers_base_url: Base URL of the transformers.js archive refs
            (``/tags/<version>.tar.gz`` or ``/heads/<version>.tar.gz`` is
            appended to it).
        repo_path: Local directory where the transformers.js sources live.
    """

    hf_token: str
    hf_username: str
    transformers_version: str = "3.0.0"
    hf_base_url: str = "https://huggingface.co"
    transformers_base_url: str = (
        "https://github.com/xenova/transformers.js/archive/refs"
    )
    repo_path: Path = Path("./transformers.js")

    @classmethod
    def from_env(cls) -> "Config":
        """Create config from environment variables and secrets.

        A token typed by the user (``user_hf_token`` in the Streamlit session
        state) takes precedence over the ``HF_TOKEN`` secret. With a user
        token the username is looked up via ``whoami``; otherwise it is taken
        from ``SPACE_AUTHOR_NAME`` or, failing that, from ``whoami`` with the
        system token.

        Returns:
            A ``Config`` populated with the resolved token and username.

        Raises:
            ValueError: If neither a user token nor ``HF_TOKEN`` is available.
        """
        system_token = st.secrets.get("HF_TOKEN")
        user_token = st.session_state.get("user_hf_token")
        hf_token = user_token or system_token

        # Validate before any Hub call: querying ``whoami`` without a token
        # would fail with an unrelated error and hide the real problem.
        if not hf_token:
            raise ValueError("HF_TOKEN must be set")

        if user_token:
            hf_username = whoami(token=user_token)["name"]
        else:
            hf_username = (
                os.getenv("SPACE_AUTHOR_NAME") or whoami(token=system_token)["name"]
            )

        return cls(hf_token=hf_token, hf_username=hf_username)


class ModelConverter:
    """Handles model conversion and upload operations.

    The converter downloads the transformers.js sources once, runs their
    ``scripts.convert`` module in a subprocess to produce ONNX files, and
    uploads the resulting folder to the Hugging Face Hub.
    """

    def __init__(self, config: "Config"):
        """Store the configuration and create a Hub client from its token."""
        self.config = config
        self.api = HfApi(token=config.hf_token)

    def _get_ref_type(self) -> str:
        """Determine the reference type for the transformers repository.

        Returns:
            ``"tags"`` if an archive exists under ``/tags/`` for the configured
            version, otherwise ``"heads"`` (also used when the check fails).
        """
        url = f"{self.config.transformers_base_url}/tags/{self.config.transformers_version}.tar.gz"
        try:
            # Close the response explicitly; only the status code is needed.
            with urlopen(url) as response:
                return "tags" if response.getcode() == 200 else "heads"
        except Exception as e:
            logger.warning("Failed to check tags, defaulting to heads: %s", e)
            return "heads"

    def setup_repository(self) -> None:
        """Download and setup transformers repository if needed.

        Does nothing when ``config.repo_path`` already exists. The downloaded
        archive is always removed afterwards, whether or not setup succeeded.

        Raises:
            RuntimeError: If the download or extraction fails; the original
                exception is attached as the cause.
        """
        if self.config.repo_path.exists():
            return

        ref_type = self._get_ref_type()
        archive_url = f"{self.config.transformers_base_url}/{ref_type}/{self.config.transformers_version}.tar.gz"
        archive_path = Path(f"./transformers_{self.config.transformers_version}.tar.gz")

        try:
            urlretrieve(archive_url, archive_path)
            self._extract_archive(archive_path)
            logger.info("Repository downloaded and extracted successfully")
        except Exception as e:
            raise RuntimeError(f"Failed to setup repository: {e}") from e
        finally:
            archive_path.unlink(missing_ok=True)

    def _extract_archive(self, archive_path: Path) -> None:
        """Extract the downloaded archive into ``config.repo_path``.

        The archive is unpacked into a temporary directory and its first
        top-level entry is moved to ``config.repo_path``.

        Raises:
            RuntimeError: If the archive contains no entries.
        """
        import shutil
        import tarfile
        import tempfile

        with tempfile.TemporaryDirectory() as tmp_dir:
            with tarfile.open(archive_path, "r:gz") as tar:
                # Use the safe "data" extraction filter when this Python
                # supports it, so archive members cannot escape tmp_dir.
                if hasattr(tarfile, "data_filter"):
                    tar.extractall(tmp_dir, filter="data")
                else:
                    tar.extractall(tmp_dir)

            extracted_folder = next(Path(tmp_dir).iterdir(), None)
            if extracted_folder is None:
                raise RuntimeError(f"Archive {archive_path} is empty")

            # shutil.move falls back to copy+delete when the temporary
            # directory and the destination are on different filesystems,
            # where a plain rename would fail.
            shutil.move(str(extracted_folder), str(self.config.repo_path))

    def convert_model(self, input_model_id: str) -> Tuple[bool, Optional[str]]:
        """Convert the model to ONNX format.

        Runs ``scripts.convert --quantize --model_id <input_model_id>`` with
        the current interpreter, using ``config.repo_path`` as the working
        directory.

        Returns:
            ``(True, stderr)`` when the subprocess exits with code 0,
            ``(False, stderr)`` when it exits with a non-zero code, and
            ``(False, message)`` when the subprocess could not be run.
        """
        try:
            result = subprocess.run(
                [
                    sys.executable,
                    "-m",
                    "scripts.convert",
                    "--quantize",
                    "--model_id",
                    input_model_id,
                ],
                cwd=self.config.repo_path,
                capture_output=True,
                text=True,
                # The child runs with an empty environment.
                # NOTE(review): presumably so tokens in this process's
                # environment are not visible to the script — confirm.
                env={},
            )

            if result.returncode != 0:
                return False, result.stderr

            return True, result.stderr

        except Exception as e:
            return False, str(e)

    def upload_model(self, input_model_id: str, output_model_id: str) -> Optional[str]:
        """Upload the converted model to Hugging Face.

        Creates the public repository ``output_model_id`` if needed and
        uploads ``<repo_path>/models/<input_model_id>`` to it. The local
        model folder is deleted afterwards regardless of the outcome.

        Returns:
            ``None`` on success, otherwise the error message as a string.
        """
        import shutil

        # Resolve the folder before the try block so the cleanup in
        # ``finally`` can always reference it, even if repo creation fails.
        model_folder_path = self.config.repo_path / "models" / input_model_id

        try:
            self.api.create_repo(output_model_id, exist_ok=True, private=False)
            self.api.upload_folder(
                folder_path=str(model_folder_path), repo_id=output_model_id
            )
            return None
        except Exception as e:
            return str(e)
        finally:
            shutil.rmtree(model_folder_path, ignore_errors=True)


def main():
    """Render the Streamlit page and drive the convert-then-upload flow.

    The page asks for a model ID, shows where the converted model will be
    published, and after the user confirms runs the conversion followed by
    the upload. Any exception is logged and shown as an error on the page.
    """
    st.write("## Convert a Hugging Face model to ONNX")

    try:
        app_config = Config.from_env()
        model_converter = ModelConverter(app_config)
        model_converter.setup_repository()

        source_id = st.text_input(
            "Enter the Hugging Face model ID to convert. Example: `EleutherAI/pythia-14m`"
        )
        if not source_id:
            return

        # The value is stored in session state under "user_hf_token".
        st.text_input(
            "Optional: Your Hugging Face write token. Fill it if you want to upload the model under your account.",
            type="password",
            key="user_hf_token",
        )

        # Keep only the part after the last "/" (drops the owner namespace).
        short_name = source_id.rsplit("/", 1)[-1]
        target_id = f"{app_config.hf_username}/{short_name}-ONNX"
        target_url = f"{app_config.hf_base_url}/{target_id}"

        already_converted = model_converter.api.repo_exists(target_id)
        if already_converted:
            st.write("This model has already been converted! 🎉")
            st.link_button(f"Go to {target_id}", target_url, type="primary")
            return

        st.write("URL where the model will be converted and uploaded to:")
        st.code(target_url, language="plaintext")

        proceed_clicked = st.button(label="Proceed", type="primary")
        if not proceed_clicked:
            return

        with st.spinner("Converting model..."):
            converted, conversion_log = model_converter.convert_model(source_id)
            if converted:
                st.success("Conversion successful!")
                st.code(conversion_log)
            else:
                st.error(f"Conversion failed: {conversion_log}")
                return

        with st.spinner("Uploading model..."):
            upload_error = model_converter.upload_model(source_id, target_id)
            if upload_error:
                st.error(f"Upload failed: {upload_error}")
                return

            st.success("Upload successful!")
            st.write("You can now go and view the model on Hugging Face!")
            st.link_button(f"Go to {target_id}", target_url, type="primary")

    except Exception as exc:
        logger.exception("Application error")
        st.error(f"An error occurred: {exc}")


# Run the app only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()