File size: 3,961 Bytes
4d84406
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
"""
How to use:
1. Create a Space with a Persistent Storage attached. Filesystem will be available under `/data`.
2. Add `hf_oauth: true` to the Space metadata (README.md). Make sure to have Gradio>=3.41.0 configured.
3. Add `HISTORY_FOLDER` as a Space variable (e.g. `"/data/history"`).
4. Add `filelock` as dependency in `requirements.txt`.
5. Add history gallery to your Gradio app:
    a. Add imports: `from gallery_history import fetch_gallery_history, show_gallery_history`
    b. Add `history = show_gallery_history()` within `gr.Blocks` context.
    c. Add `.then(fn=fetch_gallery_history, inputs=[prompt, result], outputs=history)` on the generate event.
"""
import json
import os
import shutil
from pathlib import Path
from typing import Dict, List, Optional, Tuple
from uuid import uuid4

import gradio as gr
from filelock import FileLock

# Resolve the root folder for user history. The `HISTORY_FOLDER` environment
# variable wins when it is set; otherwise fall back to a `history` directory
# next to this module and tell the operator about it.
try:
    _folder = os.environ["HISTORY_FOLDER"]
except KeyError:
    print(
        "'HISTORY_FOLDER' environment variable not set. "
        "User history will be saved locally and will be lost "
        "when the Space instance is restarted."
    )
    _folder = Path(__file__).parent / "history"
HISTORY_FOLDER_PATH = Path(_folder)

# Copied images live in an `images` sub-folder. Creating it with
# `parents=True` also creates the history root when it is missing.
IMAGES_FOLDER_PATH = HISTORY_FOLDER_PATH.joinpath("images")
IMAGES_FOLDER_PATH.mkdir(exist_ok=True, parents=True)


def show_gallery_history():
    """Build the "Past images" section and return its gallery component.

    Adds a Markdown heading, a row with login/logout buttons and a gallery,
    then attaches `fetch_gallery_history` to the gallery through
    `attach_load_event`. The module docstring asks for this to be called
    within a `gr.Blocks` context.
    """
    gr.Markdown("## Past images\n\nYou must be logged in to activate it.")

    # Display settings for the gallery, gathered in one place.
    gallery_options = {
        "label": "Past images",
        "show_label": True,
        "elem_id": "gallery",
        "object_fit": "contain",
        "height": "auto",
        "preview": True,
        "show_share_button": True,
        "show_download_button": True,
    }

    with gr.Column():
        with gr.Row():
            gr.LoginButton()
            gr.LogoutButton()
        past_images = gr.Gallery(**gallery_options)
        past_images.attach_load_event(fetch_gallery_history, every=None)
    return past_images


def fetch_gallery_history(
    prompt: Optional[str] = None,
    result: Optional[Dict] = None,
    user: Optional[gr.OAuthProfile] = None,
):
    """Return the history of `user`, first saving new images if any.

    When both `prompt` and `result` are given, `result` is iterated and the
    `"name"` entry of each item is stored together with `prompt`. If either
    is missing, the saved history is returned unchanged.

    Returns an empty list when `user` is None. Any exception raised while
    reading or updating the history is re-raised as a `gr.Error`.
    """
    if user is None:
        return []

    # A missing prompt or result means there are no new images to record.
    nothing_to_add = prompt is None or result is None
    try:
        username = user["preferred_username"]
        if nothing_to_add:
            return _read_user_history(username)
        captioned_images = [(item["name"], prompt) for item in result]
        return _update_user_history(username, captioned_images)
    except Exception as e:
        raise gr.Error(f"Error while fetching history: {e}") from e


####################
# Internal helpers #
####################


def _read_user_history(username: str) -> List[Tuple[str, str]]:
    """Load the history stored for `username`; empty list if none exists."""
    with _user_lock(username):
        history_file = _user_history_path(username)
        if not history_file.exists():
            return []  # Nothing has been saved for this user yet
        return json.loads(history_file.read_text())


def _update_user_history(
    username: str, new_images: List[Tuple[str, str]]
) -> List[Tuple[str, str]]:
    """Append `new_images` to the history of `username` and return all of it.

    Each entry of `new_images` is a `(source path, prompt)` pair. The source
    file is copied with `_copy_image` and the copy's path is what gets stored.
    The whole read-copy-write sequence runs under the user's file lock.
    """
    with _user_lock(username):
        history_file = _user_history_path(username)

        # Start from what is already on disk, or from scratch.
        history = (
            json.loads(history_file.read_text()) if history_file.exists() else []
        )

        # Copy every new image to the persistent folder and record the copy.
        history.extend(
            (_copy_image(source), caption) for source, caption in new_images
        )

        # Persist the updated history before handing it back.
        history_file.write_text(json.dumps(history))
        return history


def _user_history_path(username: str) -> Path:
    """Return the path of the JSON file holding the history of `username`."""
    filename = "{}.json".format(username)
    return HISTORY_FOLDER_PATH.joinpath(filename)


def _user_lock(username: str) -> FileLock:
    """Return a file lock guarding the history file of `username`.

    The lock file is the history file path with `.lock` appended.
    """
    lock_file = str(_user_history_path(username)) + ".lock"
    return FileLock(lock_file)


def _copy_image(src: str) -> str:
    """Copy `src` into the images folder and return the new path as a string.

    The copy is named `<random hex>_<original file name>`, which keeps the
    original name and extension.
    """
    unique_name = "_".join((uuid4().hex, Path(src).name))
    destination = IMAGES_FOLDER_PATH / unique_name
    shutil.copyfile(src, destination)
    return str(destination)