|
from collections.abc import Callable |
|
from functools import wraps |
|
from typing import Optional, Union |
|
|
|
from controllers.console.app.error import AppNotFoundError |
|
from extensions.ext_database import db |
|
from libs.login import current_user |
|
from models import App |
|
from models.model import AppMode |
|
|
|
|
|
def get_app_model(view: Optional[Callable] = None, *, mode: Union[AppMode, list[AppMode]] = None): |
|
def decorator(view_func): |
|
@wraps(view_func) |
|
def decorated_view(*args, **kwargs): |
|
if not kwargs.get("app_id"): |
|
raise ValueError("missing app_id in path parameters") |
|
|
|
app_id = kwargs.get("app_id") |
|
app_id = str(app_id) |
|
|
|
del kwargs["app_id"] |
|
|
|
app_model = ( |
|
db.session.query(App) |
|
.filter(App.id == app_id, App.tenant_id == current_user.current_tenant_id, App.status == "normal") |
|
.first() |
|
) |
|
|
|
if not app_model: |
|
raise AppNotFoundError() |
|
|
|
app_mode = AppMode.value_of(app_model.mode) |
|
if app_mode == AppMode.CHANNEL: |
|
raise AppNotFoundError() |
|
|
|
if mode is not None: |
|
if isinstance(mode, list): |
|
modes = mode |
|
else: |
|
modes = [mode] |
|
|
|
if app_mode not in modes: |
|
mode_values = {m.value for m in modes} |
|
raise AppNotFoundError(f"App mode is not in the supported list: {mode_values}") |
|
|
|
kwargs["app_model"] = app_model |
|
|
|
return view_func(*args, **kwargs) |
|
|
|
return decorated_view |
|
|
|
if view is None: |
|
return decorator |
|
else: |
|
return decorator(view) |
|
|