In [27]:
! pip install faknow sentence-transformers chromadb




In [28]:
import pandas as pd
import os
import chromadb
from chromadb.utils import embedding_functions
import math





def create_domain_identification_database(vdb_path: str,collection_name:str , df: pd.DataFrame) -> None:
 """This function processes the dataframe into the required format, and then creates the following collections in a ChromaDB instance
 1. domain_identification_collection - Contains input text embeddings, and the metadata the other columns

 Args:
 collection_name (str) : name of database collection
 vdb_path (str): Relative path of the location of the ChromaDB instance.
 df (pd.DataFrame): task scheduling dataset.

 """

 #identify the saving location of the ChromaDB
 chroma_client = chromadb.PersistentClient(path=vdb_path)

 #extract the embedding from hugging face
 embedding_function = embedding_functions.SentenceTransformerEmbeddingFunction(model_name="sentence-transformers/LaBSE")

 #creating the collection
 domain_identification_collection = chroma_client.create_collection(
 name=collection_name,
 embedding_function=embedding_function,
 )


 # the main text "query" that will be embedded
 domain_identification_documents = [row.query for row in df.itertuples()]

 # the metadata
 domain_identification_metadata = [
 {"domain": row.domain , "label": row.label}
 for row in df.itertuples()
 ]

 #index
 domain_ids = ["domain_id " + str(row.Index) for row in df.itertuples()]


 length = len(df)
 num_iteration = length / 166
 num_iteration = math.ceil(num_iteration)

 start = 0
 # start adding the the vectors
 for i in range(num_iteration):
 if i == num_iteration - 1 :
 domain_identification_collection.add(documents=domain_identification_documents[start:], metadatas=domain_identification_metadata[start:], ids=domain_ids[start:])
 else:
 end = start + 166
 domain_identification_collection.add(documents=domain_identification_documents[start:end], metadatas=domain_identification_metadata[start:end], ids=domain_ids[start:end])
 start = end
 return None



def delete_collection_from_vector_db(vdb_path: str, collection_name: str) -> None:
 """Deletes a particular collection from the persistent ChromaDB instance.

 Args:
 vdb_path (str): Path of the persistent ChromaDB instance.
 collection_name (str): Name of the collection to be deleted.
 """
 chroma_client = chromadb.PersistentClient(path=vdb_path)
 chroma_client.delete_collection(collection_name)
 return None


def list_collections_from_vector_db(vdb_path: str) -> None:
 """Lists all the available collections from the persistent ChromaDB instance.

 Args:
 vdb_path (str): Path of the persistent ChromaDB instance.
 """
 chroma_client = chromadb.PersistentClient(path=vdb_path)
 print(chroma_client.list_collections())


def get_collection_from_vector_db(
 vdb_path: str, collection_name: str
) -> chromadb.Collection:
 """Fetches a particular ChromaDB collection object from the persistent ChromaDB instance.

 Args:
 vdb_path (str): Path of the persistent ChromaDB instance.
 collection_name (str): Name of the collection which needs to be retrieved.
 """
 chroma_client = chromadb.PersistentClient(path=vdb_path)

 huggingface_ef = embedding_functions.SentenceTransformerEmbeddingFunction(model_name="sentence-transformers/LaBSE")




 collection = chroma_client.get_collection(
 name=collection_name, embedding_function=huggingface_ef
 )

 return collection


def retrieval( input_text : str,
 num_results : int,
 collection: chromadb.Collection ):

 """fetches the domain name from the collection based on the semantic similarity

 args:
 input_text : the received text which can be news , posts , or tweets
 num_results : number of fetched examples from the collection
 collection : the extracted collection from the database that we will fetch examples from

 """


 fetched_domain = collection.query(
 query_texts = [input_text],
 n_results = num_results,
 )

 #extracting domain name and label from the featched domains

 domain = fetched_domain["metadatas"][0][0]["domain"]
 label = fetched_domain["metadatas"][0][0]["label"]
 distance = fetched_domain["distances"][0][0]

 return domain , label , distance

In [29]:
from transformers import pipeline






def english_information_extraction(text: str):




 zeroshot_classifier = pipeline("zero-shot-classification", model="MoritzLaurer/deberta-v3-large-zeroshot-v1.1-all-33")

 hypothesis_template_domain = "This text is about {}"
 domain_classes = ["women" , "muslims" , "tamil" , "sinhala" , "other"]
 domains_output= zeroshot_classifier(text, domain_classes , hypothesis_template=hypothesis_template_domain, multi_label=False)

 sentiment_discrimination_prompt = f"the content of this text about {domains_output['labels'][0]} "
 hypothesis_template_sentiment = "is {} sentiment"
 hypothesis_template_sentiment = sentiment_discrimination_prompt + hypothesis_template_sentiment

 sentiment_classes = ["positive" ,"neutral", "negative"]
 sentiment_output= zeroshot_classifier(text, sentiment_classes , hypothesis_template=hypothesis_template_sentiment, multi_label=False)

 hypothesis_template_discrimination = "is {}"
 hypothesis_template_discrimination = sentiment_discrimination_prompt + hypothesis_template_discrimination

 discrimination_classes = ["hateful" , "not hateful"]

 discrimination_output= zeroshot_classifier(text, discrimination_classes , hypothesis_template=hypothesis_template_discrimination, multi_label=False)

 domain_label , domain_score = domains_output["labels"][0] , domains_output["scores"][0]
 sentiment_label , sentiment_score = sentiment_output["labels"][0] , sentiment_output["scores"][0]
 discrimination_label , discrimination_score = discrimination_output["labels"][0] , discrimination_output["scores"][0]

 return {"domain_label" : domain_label,
 "domain_score" : domain_score,
 "sentiment_label" : sentiment_label,
 "sentiment_score" : sentiment_score,
 "discrimination_label" : discrimination_label,
 "discrimination_score": discrimination_score}





In [51]:
#the model
from typing import List, Optional, Tuple

import torch
from torch import Tensor
from torch import nn
from transformers import RobertaModel

from faknow.model.layers.layer import TextCNNLayer
from faknow.model.model import AbstractModel
from faknow.data.process.text_process import TokenizerFromPreTrained
import pandas as pd
import gdown
import os

class _MLP(nn.Module):
 def __init__(self,
 input_dim: int,
 embed_dims: List[int],
 dropout_rate: float,
 output_layer=True):
 super().__init__()
 layers = list()
 for embed_dim in embed_dims:
 layers.append(nn.Linear(input_dim, embed_dim))
 layers.append(nn.BatchNorm1d(embed_dim))
 layers.append(nn.ReLU())
 layers.append(nn.Dropout(p=dropout_rate))
 input_dim = embed_dim
 if output_layer:
 layers.append(torch.nn.Linear(input_dim, 1))
 self.mlp = torch.nn.Sequential(*layers)

 def forward(self, x: Tensor) -> Tensor:
 """

 Args:
 x (Tensor): shared feature from domain and text, shape=(batch_size, embed_dim)

 """
 return self.mlp(x)


class _MaskAttentionLayer(torch.nn.Module):
 """
 Compute attention layer
 """
 def __init__(self, input_size: int):
 super(_MaskAttentionLayer, self).__init__()
 self.attention_layer = torch.nn.Linear(input_size, 1)

 def forward(self,
 inputs: Tensor,
 mask: Optional[Tensor] = None) -> Tuple[Tensor, Tensor]:
 weights = self.attention_layer(inputs).view(-1, inputs.size(1))
 if mask is not None:
 weights = weights.masked_fill(mask == 0, float("-inf"))
 weights = torch.softmax(weights, dim=-1).unsqueeze(1)
 outputs = torch.matmul(weights, inputs).squeeze(1)
 return outputs, weights


class MDFEND(AbstractModel):
 r"""
 MDFEND: Multi-domain Fake News Detection, CIKM 2021
 paper: https://dl.acm.org/doi/10.1145/3459637.3482139
 code: https://github.com/kennqiang/MDFEND-Weibo21
 """
 def __init__(self,
 pre_trained_bert_name: str,
 domain_num: int,
 mlp_dims: Optional[List[int]] = None,
 dropout_rate=0.2,
 expert_num=5):
 """

 Args:
 pre_trained_bert_name (str): the name or local path of pre-trained bert model
 domain_num (int): total number of all domains
 mlp_dims (List[int]): a list of the dimensions in MLP layer, if None, [384] will be taken as default, default=384
 dropout_rate (float): rate of Dropout layer, default=0.2
 expert_num (int): number of experts also called TextCNNLayer, default=5
 """
 super(MDFEND, self).__init__()
 self.domain_num = domain_num
 self.expert_num = expert_num
 self.bert = RobertaModel.from_pretrained(
 pre_trained_bert_name).requires_grad_(False)
 self.embedding_size = self.bert.config.hidden_size
 self.loss_func = nn.BCELoss()
 if mlp_dims is None:
 mlp_dims = [384]

 filter_num = 64
 filter_sizes = [1, 2, 3, 5, 10]
 experts = [
 TextCNNLayer(self.embedding_size, filter_num, filter_sizes)
 for _ in range(self.expert_num)
 ]
 self.experts = nn.ModuleList(experts)

 self.gate = nn.Sequential(
 nn.Linear(self.embedding_size * 2, mlp_dims[-1]), nn.ReLU(),
 nn.Linear(mlp_dims[-1], self.expert_num), nn.Softmax(dim=1))

 self.attention = _MaskAttentionLayer(self.embedding_size)

 self.domain_embedder = nn.Embedding(num_embeddings=self.domain_num,
 embedding_dim=self.embedding_size)
 self.classifier = _MLP(320, mlp_dims, dropout_rate)

 def forward(self, token_id: Tensor, mask: Tensor,
 domain: Tensor) -> Tensor:
 """

 Args:
 token_id (Tensor): token ids from bert tokenizer, shape=(batch_size, max_len)
 mask (Tensor): mask from bert tokenizer, shape=(batch_size, max_len)
 domain (Tensor): domain id, shape=(batch_size,)

 Returns:
 FloatTensor: the prediction of being fake, shape=(batch_size,)
 """
 text_embedding = self.bert(token_id,
 attention_mask=mask).last_hidden_state
 attention_feature, _ = self.attention(text_embedding, mask)

 domain_embedding = self.domain_embedder(domain.view(-1, 1)).squeeze(1)

 gate_input = torch.cat([domain_embedding, attention_feature], dim=-1)
 gate_output = self.gate(gate_input)

 shared_feature = 0
 for i in range(self.expert_num):
 expert_feature = self.experts[i](text_embedding)
 shared_feature += (expert_feature * gate_output[:, i].unsqueeze(1))

 label_pred = self.classifier(shared_feature)

 return torch.sigmoid(label_pred.squeeze(1))

 def calculate_loss(self, data) -> Tensor:
 """
 calculate loss via BCELoss

 Args:
 data (dict): batch data dict

 Returns:
 loss (Tensor): loss value
 """

 token_ids = data['text']['token_id']
 masks = data['text']['mask']
 domains = data['domain']
 labels = data['label']
 output = self.forward(token_ids, masks, domains)
 return self.loss_func(output, labels.float())

 def predict(self, data_without_label) -> Tensor:
 """
 predict the probability of being fake news

 Args:
 data_without_label (Dict[str, Any]): batch data dict

 Returns:
 Tensor: one-hot probability, shape=(batch_size, 2)
 """

 token_ids = data_without_label['text']['token_id']
 masks = data_without_label['text']['mask']
 domains = data_without_label['domain']

 # shape=(n,), data = 1 or 0
 round_pred = torch.round(self.forward(token_ids, masks,
 domains)).long()
 # after one hot: shape=(n,2), data = [0,1] or [1,0]
 one_hot_pred = torch.nn.functional.one_hot(round_pred, num_classes=2)
 return one_hot_pred


def download_from_gdrive(file_id, output_path):
 output = os.path.join(output_path)

 # Check if the file already exists
 if not os.path.exists(output):
 gdown.download(id=file_id, output=output, quiet=False)


 return output



def loading_model_and_tokenizer():
 max_len, bert = 160, 'FacebookAI/xlm-roberta-base'
 #https://drive.google.com/file/d/1--6GB3Ff81sILwtuvVTuAW3shGW_5VWC/view

 file_id = "1--6GB3Ff81sILwtuvVTuAW3shGW_5VWC"

 model_path = '/content/drive/MyDrive/models/last-epoch-model-2024-03-17-01_00_32_1.pth'

 MODEL_SAVE_PATH = download_from_gdrive(file_id, model_path)
 domain_num = 4



 tokenizer = TokenizerFromPreTrained(max_len, bert)

 model = MDFEND(bert, domain_num , expert_num=12 , mlp_dims = [3010, 2024 ,1012 ,606 , 400])

 model.load_state_dict(torch.load(f=MODEL_SAVE_PATH , map_location=torch.device('cpu')))

 model.requires_grad_(False)

 return tokenizer , model

In [31]:
import pandas as pd
import torch
def preparing_data(text:str , domain: int):
 """



 Args:
 text (_str_): input text from the user
 domain (_int_): output domain from domain identification pipeline

 Returns:
 _DataFrame_: dataframe contains texts and domain
 """
 # Let's assume you have the following dictionary
 # the model can't do inference with only one example so this dummy example must be put
 dict_data = {
 'text': ['hello world' ] ,
 'domain': [0] ,
 }

 dict_data["text"].append(text)
 dict_data["domain"].append(domain)
 # Convert the dictionary to a DataFrame
 df = pd.DataFrame(dict_data)

 # return the dataframe
 return df


def loading_data(tokenizer , df: pd.DataFrame ):
 ids = []
 masks = []
 domain_list = []

 texts = df["text"]
 domains= df["domain"]


 for i in range(len(df)):
 text = texts[i]
 token = tokenizer(text)
 ids.append(token["token_id"])
 masks.append(token["mask"])
 domain_list.append(domains[i])

 input_ids = torch.cat(ids , dim=0)
 input_masks = torch.cat(masks ,dim = 0)
 input_domains = torch.tensor(domain_list)


 return input_ids , input_masks , input_domains

In [32]:
import torch
from transformers import AutoModelForSequenceClassification, AutoTokenizer

def language_identification(texts):
 text = [
 texts,

 ]

 model_ckpt = "papluca/xlm-roberta-base-language-detection"
 tokenizer = AutoTokenizer.from_pretrained(model_ckpt)
 model = AutoModelForSequenceClassification.from_pretrained(model_ckpt)

 inputs = tokenizer(text, padding=True, truncation=True, return_tensors="pt")

 with torch.no_grad():
 logits = model(**inputs).logits

 preds = torch.softmax(logits, dim=-1)

 # Map raw predictions to languages
 id2lang = model.config.id2label
 vals, idxs = torch.max(preds, dim=1)
 lang_dict = {id2lang[k.item()]: v.item() for k, v in zip(idxs, vals)}
 return lang_dict

In [33]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [34]:

def run_pipeline(input_text:str):

 language_dict = language_identification(input_text)
 language_code = next(iter(language_dict))

 if language_code == "en":

 output_english = english_information_extraction(input_text)

 return output_english

 else:


 num_results = 1
 path = "/content/drive/MyDrive/general_domains/vector_database"
 collection_name = "general_domains"


 collection = get_collection_from_vector_db(path , collection_name)

 domain , label_domain , distance = retrieval(input_text , num_results , collection )

 if distance >1.45:
 domain = "undetermined"

 tokenizer , model = loading_model_and_tokenizer()

 df = preparing_data(input_text , label_domain)

 input_ids , input_masks , input_domains = loading_data(tokenizer , df )

 labels = []
 outputs = []
 with torch.no_grad():

 pred = model.forward(input_ids, input_masks , input_domains)
 labels.append([])

 for output in pred:
 number = output.item()
 label = int(1) if number >= 0.5 else int(0)
 labels[-1].append(label)
 outputs.append(pred)

 discrimination_class = ["discriminative" if i == int(1) else "not discriminative" for i in labels[0]]


 return { "domain_label" :domain ,
 "domain_score":distance ,
 "discrimination_label" : discrimination_class[-1],
 "discrimination_score" : outputs[0][1:].item(),
 }
















In [35]:
input_text_1 = input("input text:")

output_1 = run_pipeline( input_text_1)

input text:muslims loves their prophet muhammed


In [36]:
output_1

{'domain_label': 'muslims',
 'domain_score': 0.9989225268363953,
 'sentiment_label': 'positive',
 'sentiment_score': 0.9239600300788879,
 'discrimination_label': 'not hateful',
 'discrimination_score': 0.9917498826980591}

In [54]:
input_text_2 = input("input text:")

output_2 = run_pipeline( input_text_2)

input text:මුස්ලිම්වරු ඔවුන්ගේ අනාගතවක්තෘ මුහම්මද්ට ආදරෙයි


Downloading...
From (original): https://drive.google.com/uc?id=1--6GB3Ff81sILwtuvVTuAW3shGW_5VWC
From (redirected): https://drive.google.com/uc?id=1--6GB3Ff81sILwtuvVTuAW3shGW_5VWC&confirm=t&uuid=4bc00ac8-29e3-458b-a64d-c0f583a18df7
To: /content/drive/MyDrive/models/last-epoch-model-2024-03-17-01_00_32_1.pth
100%|██████████| 1.20G/1.20G [00:17<00:00, 69.1MB/s]
You are using a model of type xlm-roberta to instantiate a model of type roberta. This is not supported for all configurations of models and can yield errors.


In [55]:
output_2

{'domain_label': 'muslims',
 'domain_score': 0.9477148933517974,
 'discrimination_label': 'not discriminative',
 'discrimination_score': 0.016480498015880585}

In [56]:
input_text_3 = input("input text:")

output_3 = run_pipeline( input_text_3)

input text:முஸ்லீம்கள் தங்கள் தீர்க்கதரிசியை நேசிக்கிறார்கள்


You are using a model of type xlm-roberta to instantiate a model of type roberta. This is not supported for all configurations of models and can yield errors.


In [57]:
output_3

{'domain_label': 'muslims',
 'domain_score': 0.9295339941122466,
 'discrimination_label': 'not discriminative',
 'discrimination_score': 0.011930261738598347}