File size: 3,642 Bytes
a8b3f00 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 |
from typing import Optional, Union
from core.app.entities.app_invoke_entities import InvokeFrom
from extensions.ext_database import db
from libs.infinite_scroll_pagination import InfiniteScrollPagination
from models.account import Account
from models.model import App, EndUser
from models.web import PinnedConversation
from services.conversation_service import ConversationService
class WebConversationService:
@classmethod
def pagination_by_last_id(
cls,
app_model: App,
user: Optional[Union[Account, EndUser]],
last_id: Optional[str],
limit: int,
invoke_from: InvokeFrom,
pinned: Optional[bool] = None,
sort_by="-updated_at",
) -> InfiniteScrollPagination:
include_ids = None
exclude_ids = None
if pinned is not None:
pinned_conversations = (
db.session.query(PinnedConversation)
.filter(
PinnedConversation.app_id == app_model.id,
PinnedConversation.created_by_role == ("account" if isinstance(user, Account) else "end_user"),
PinnedConversation.created_by == user.id,
)
.order_by(PinnedConversation.created_at.desc())
.all()
)
pinned_conversation_ids = [pc.conversation_id for pc in pinned_conversations]
if pinned:
include_ids = pinned_conversation_ids
else:
exclude_ids = pinned_conversation_ids
return ConversationService.pagination_by_last_id(
app_model=app_model,
user=user,
last_id=last_id,
limit=limit,
invoke_from=invoke_from,
include_ids=include_ids,
exclude_ids=exclude_ids,
sort_by=sort_by,
)
@classmethod
def pin(cls, app_model: App, conversation_id: str, user: Optional[Union[Account, EndUser]]):
pinned_conversation = (
db.session.query(PinnedConversation)
.filter(
PinnedConversation.app_id == app_model.id,
PinnedConversation.conversation_id == conversation_id,
PinnedConversation.created_by_role == ("account" if isinstance(user, Account) else "end_user"),
PinnedConversation.created_by == user.id,
)
.first()
)
if pinned_conversation:
return
conversation = ConversationService.get_conversation(
app_model=app_model, conversation_id=conversation_id, user=user
)
pinned_conversation = PinnedConversation(
app_id=app_model.id,
conversation_id=conversation.id,
created_by_role="account" if isinstance(user, Account) else "end_user",
created_by=user.id,
)
db.session.add(pinned_conversation)
db.session.commit()
@classmethod
def unpin(cls, app_model: App, conversation_id: str, user: Optional[Union[Account, EndUser]]):
pinned_conversation = (
db.session.query(PinnedConversation)
.filter(
PinnedConversation.app_id == app_model.id,
PinnedConversation.conversation_id == conversation_id,
PinnedConversation.created_by_role == ("account" if isinstance(user, Account) else "end_user"),
PinnedConversation.created_by == user.id,
)
.first()
)
if not pinned_conversation:
return
db.session.delete(pinned_conversation)
db.session.commit()
|