Gateston Johns
first real commit
9041389
raw
history blame contribute delete
No virus
4.01 kB
import logging
import os
from typing import Optional
import neo4j
from domain.entity_d import (
EntityD,
EntityKnowledgeGraphD,
EntityRelationshipD,
RelationshipD,
)
class Neo4jError(Exception):
...
class Neo4jDomainDAO:
""" To be used with a context manager to ensure the connection is closed after use. """
def __enter__(self):
uri = os.environ.get("NEO4J_URI", "")
user = os.environ.get("NEO4J_USER", "")
password = os.environ.get("NEO4J_PASSWORD", "")
if not uri:
raise ValueError("NEO4J_URI environment variable not set")
if not user:
raise ValueError("NEO4J_USER environment variable not set")
if not password:
raise ValueError("NEO4J_PASSWORD environment variable not set")
try:
self.driver = neo4j.GraphDatabase.driver(uri, auth=(user, password))
self.driver.verify_connectivity()
except Exception as e:
logging.error(f"Failed to connect to Neo4j: {e}")
raise Neo4jError("Failed to connect to Neo4j")
return self
def insert(self, knowledge_graph: EntityKnowledgeGraphD, pdf_file: str = ""):
for entity_relationship in knowledge_graph.entity_relationships:
create_cmds = entity_relationship.neo4j_create_cmds
create_cmds_args = entity_relationship.neo4j_create_args
for create_cmd, args in zip(create_cmds, create_cmds_args):
args['pdf_file'] = pdf_file
try:
self.driver.execute_query(
create_cmd, # type: ignore
parameters_=args, # type: ignore
database_='neo4j') # type: ignore
except Exception as e:
logging.warning(
f"Failed to insert entity relationship: {entity_relationship} due to {e}")
def query(self, query, query_args):
return self.driver.execute_query(query, parameters_=query_args,
database_='neo4j') # type: ignore
def get_knowledge_graph(self) -> Optional[EntityKnowledgeGraphD]:
records = [] #list[dict[str, Neo4jDict]]
try:
records, _, _ = self.driver.execute_query("MATCH (from:Entity) -[r:Relationship]-> (to:Entity) RETURN from, properties(r), to", database_='neo4j') # type: ignore
except Exception as e:
logging.exception(e)
return None
entity_relationships = []
for record in records:
er_dict = record.data()
from_args = er_dict['from']
from_entity = EntityD(entity_id='', entity_name=from_args['name'])
to_args = er_dict['to']
to_entity = EntityD(entity_id='', entity_name=to_args['name'])
relationship_args = er_dict['properties(r)']
relationship = RelationshipD(relationship_id='',
start_date=relationship_args['start_date'],
end_date=relationship_args['end_date'],
source_text=relationship_args['source_text'],
predicted_movement=RelationshipD.from_string(
relationship_args['predicted_movement']))
entity_relationships.append(
EntityRelationshipD(from_entity=from_entity,
relationship=relationship,
to_entity=to_entity))
return EntityKnowledgeGraphD(entity_relationships=entity_relationships)
def __exit__(self, exception_type, exception_value, traceback):
if traceback:
logging.error("Neo4jDomainDAO error: %s | %s | %s",
exception_type,
exception_value,
traceback)
self.driver.close()