AEO / aeo_ex_generator /aeo_example_generator.py
ibibek's picture
Upload 26 files
720ee15
import os
import openai
import json
import rdflib
class ExampleGenerator:
def __init__(self):
self.ontologies = {}
self.ontology_files = []
self.rules = {}
def add_ontology(self, onto):
if onto in self.ontology_files:
raise ValueError("Ontology file already exists.")
else:
onto_data = self.get_ontology_file(onto)
if onto_data:
self.ontology_files.append(onto)
self.ontologies[onto] = self.get_ontology_file(onto)
self.rules[onto] = self.generate_rules(onto)
else:
raise ValueError("Ontology file error.")
def get_ontology_file(self,filename):
text = ""
if os.path.isfile(filename):
with open(filename,'r') as f:
text = f.read()
f.close()
return text
else:
raise ValueError("Invalid filename.")
def ChatGPTTextSplitter(self,text):
"""Splits text in smaller subblocks to feed to the LLM"""
prompt = f"""The total length of content that I want to send you is too large to send in only one piece.
For sending you that content, I will follow this rule:
[START PART 1/10]
this is the content of the part 1 out of 10 in total
[END PART 1/10]
Then you just answer: "Instructions Sent."
And when I tell you "ALL PARTS SENT", then you can continue processing the data and answering my requests.
"""
if type(text) == str:
textsize = 12000
blocksize = int(len(text) / textsize)
if blocksize > 0:
yield prompt
for b in range(1,blocksize+1):
if b < blocksize+1:
prompt = f"""Do not answer yet. This is just another part of the text I want to send you. Just receive and acknowledge as "Part {b}/{blocksize} received" and wait for the next part.
[START PART {b}/{blocksize}]
{text[(b-1)*textsize:b*textsize]}
[END PART {b}/{blocksize}]
Remember not answering yet. Just acknowledge you received this part with the message "Part {b}/{blocksize} received" and wait for the next part.
"""
yield prompt
else:
prompt = f"""
[START PART {b}/{blocksize}]
{text[(b-1)*textsize:b*textsize]}
[END PART {b}/{blocksize}]
ALL PARTS SENT. Now you can continue processing the request.
"""
yield prompt
else:
yield text
elif type(text) == list:
yield prompt
for n,block in enumerate(text):
if n+1 < len(text):
prompt = f"""Do not answer yet. This is just another part of the text I want to send you. Just receive and acknowledge as "Part {n+1}/{len(text)} received" and wait for the next part.
[START PART {n+1}/{len(text)}]
{text[n]}
[END PART {n+1}/{len(text)}]
Remember not answering yet. Just acknowledge you received this part with the message "Part {n+1}/{len(text)} received" and wait for the next part.
"""
yield prompt
else:
prompt = f"""
[START PART {n+1}/{len(text)}]
{text[n]}
[END PART {n+1}/{len(text)}]
ALL PARTS SENT. Now you can continue processing the request.
"""
yield prompt
def send_ontology(self):
ontology = ""
if len(self.ontologies) > 0:
for k,v in self.ontologies.items():
ontology+=v+"\n"
print("Sending Ontology in Parts")
for i in self.ChatGPTTextSplitter(ontology):
print(self.llm_api(i))
else:
raise ValueError("No loaded ontology to send.")
def llm_api(self,prompt,model="gpt-3.5-turbo"):
messages = [{
"role":"user",
"content":prompt
}]
res = openai.ChatCompletion.create(model=model,messages=messages,temperature=0)
return res.choices[0].message['content']
def generate_rules(self,onto=None):
engagement_actions = ['engagement:Access',
'engagement:Alert',
'engagement:Beacon',
'engagement:Deploy',
'engagement:Obfuscate',
'engagement:Respond'
]
engagement_objects = [
'engagement:Honeypot',
'engagement:Honeytoken',
'engagement:Breadcrumb',
'engagement:BreadcrumbTrail',
'engagement:LureObject',
'engagement:HoneyObject',
'engagement:Decoy',
'engagement:DataSource'
]
engagement_objectives = [
'objective:CommandAndControl',
'objective:CredentialAccess',
'objective:DevelopResource',
'objective:Discover',
'objective:EscalatePrivilege',
'objective:Evade',
'objective:Execute',
'objective:Exfilitrate',
'objective:GainInitialAccess',
'objective:Impact',
'objective:MoveLaterally',
'objective:Persist',
'objective:Reconnaissance',
'objective:Affect',
'objective:Collect',
'objective:Detect',
'objective:Direct',
'objective:Disrupt',
'objective:Elicit',
'objective:Expose',
'objective:Motivate',
'objective:Plan',
'objective:Prepare',
'objective:Prevent',
'objective:Reassure',
'objective:Analyze',
'objective:Deny',
'objective:ElicitBehavior',
'objective:Lure',
'objective:TimeSink',
'objective:Track',
'objective:Trap'
]
prefix_ns = {"engagement": "https://ontology.adversaryengagement.org/ae/engagement#",
"objective":"https://ontology.adversaryengagement.org/ae/objective#",
"role":"https://ontology.adversaryengagement.org/ae/role#",
"identity":"https://ontology.adversaryengagement.org/ae/identity#",
"uco-core prefix":"https://ontology.unifiedcyberontology.org/uco/core#",
"uco-types":"https://ontology.unifiedcyberontology.org/uco/types#",
"uco-role":"https://ontology.unifiedcyberontology.org/uco/role#"
}
ns_str =""
for k,v in prefix_ns.items():
ns_str+=f"If namespace {k} prefix is used then {v}\n"
lookup = {"1":{"0":['Each'],
"1":['connects to']
}
}
v = """Remember make a json-ld format example that only uses classes and properties terms from Adversary Engagement Ontology, Unified Cyber Ontology."""
structure = {'engagement:Narrative':{'engagement:hasStoryline':{"1":'engagement:Storyline'}
},
'engagement:Storyline':{'engagement:hasEvent':{"1":'uco-types:Thread'}
},
'uco-types:Thread':{'co:element':'contains all engagement:PlannedEvents',
'co:item':{"0":'uco-types:ThreadItem one each for each engagement:PlannedEvent'},
'co:size':"",
'uco-types:threadOriginItem':"is the uco-types:ThreadItem for the first engagement:PlannedEvent",
'uco-types:threadTerminalItem':"is the uco-types:ThreadItem for the last engagement:PlannedEvent"
},
'co:size':{'@type':'is xsd:nonNegativeInteger',
'@value':"which is the number of uco-types:ThreadItem"
},
'uco-types:ThreadItem':{'co:itemContent':'is the engagement:PlannedEvent',
'optional uco-types:threadNextItem':"is the next uco-types:ThreadItem for the next engagement:PlannedEvent if there is one",
'optional uco-types:threadPreviousItem':'is the previous uco-types:ThreadItem for the previous'
},
'engagement:PlannedEvent':{'engagement:eventContext':"connects to one of the following engagement actions:"+"\n\t\t"+"\n\t\t".join(engagement_actions)
},
'engagement action':{'uco-core:performer':"",'uco-core:object': 'connects to one of the following engagement deception objects'+"\n\t\t"+"\n\t\t".join(engagement_objects)
},
'engagement deception object':{'engagement:hasCharacterization':{'1':'uco-core:UcoObject'},
'objective:hasObjective':'with @type objective:Objective and @id with one of the following instances:'+"\n\t\t"+"\n\t\t".join(engagement_objectives),
'uco-core:name':'is the objective'
},
'person':{'@type':'is uco-identity:Person',
'uco-core:hasFacet':{"1":{'connects to uco-identity:SimpleNameFacet':{'uco-identity:familyName':"",'uco-identity:givenName':""}
}
}
},
'uco-core:Role':{'@id':"is the role",'uco-core:name': 'is the role'
},
'uco-core:Role there is a uco-core:Relationship':{'uco-core:kindofRelationship':'is "has_Role"',
'uco-core:source':{"1":"the person who has the role"},
"uco-core:target":{"1":"uco-core:Role"}
},
'engagement:BreadcrumbTrail':{'engagement:hasBreadcrumb':{"1":{'uco-types:Thread':{'co:element':"contains all engagement:Breadcrumb that belong to this engagement:BreadcrumbTrail","co:size":"","co:item":"contains all uco-types:ThreadItem one each for each engagement:Breadcrumb","uco-types:threadOriginItem":"is the uco-types:ThreadItem for the first engagement:Breadcrumb belonging to this engagement:BreadcrumbTrail","uco-types:threadTerminalItem":"is the uco-types:ThreadItem for the last engagement:Breadcrumb belonging to this engagement:BreadcrumbTrail"}}
}},
'engagement:Breadcrumb':{'engagement:hasCharacterization':{"1":{'which connects to a uco-core:UcoObject which':{'uco-core:description':'which describes the object characterizing the breadcrumb'}},'uco-core:name': 'is the role'
}},
"class":{'@type': 'which is the class',
'@id': 'which is a unique identifier'},
"ns":ns_str
}
def get_list(struct,limiter="\n\t",skippre=False):
all_stat = []
for k,v in struct.items():
if k == "ns":
all_stat.append(v)
elif type(v)==dict:
look = "1"
if len(v) > 1:
plural = "has properties:"
else:
plural = "has property:"
if type(v)==dict:
statement = [" ".join([lookup[look]["0"][0],k,plural])]
if skippre:
statement = [" ".join([k,plural])]
for vk, vv in v.items():
statement.append(limiter)
statement.append(vk)
if type(vv)==dict:
for i in list(lookup.keys()):
if i in vv:
val = v[vk][i]
if type(val)==dict:
statement.append( get_list(val,limiter+"\t",skippre=True) )
else:
if not skippre:
statement.append(lookup[look]["1"][0])
statement.append(v[vk][i])
elif type(vv)==str:
statement.append(v[vk])
val = " ".join(statement)
all_stat.append(val)
return "\n".join(all_stat)
v = get_list(structure)
return v
def generate_rule(self,onto=None):
"""Raw rule string of AEO."""
v = """Remember make a json-ld format example that only uses classes and properties terms from Adversary Engagement Ontology, Unified Cyber Ontology.
Each engagement:Narrative has property:
engagement:hasStoryline connects to an engagement:Storyline
Each engagement:Storyline has property:
engagement:hasEvent connects to a uco-types:Thread
Each uco-types:Thread has properties:
co:element contains all engagement:PlannedEvents
co:item contains all uco-types:ThreadItem one each for each engagement:PlannedEvent.
co:size
uco-types:threadOriginItem is the uco-types:ThreadItem for the first engagement:PlannedEvent
uco-types:threadTerminalItem is the uco-types:ThreadItem for the last engagement:PlannedEvent
Each co:size has properties:
@type as xsd:nonNegativeInteger
@value which is the number of uco-types:ThreadItem
Each uco-types:ThreadItem has property:
co:itemContent is the engagement:PlannedEvent
optional uco-types:threadNextItem is the next uco-types:ThreadItem for the next engagement:PlannedEvent if there is one,
optional uco-types:threadPreviousItem is the previous uco-types:ThreadItem for the previous engagement:PlannedEvent if there is one
Each engagement:PlannedEvent has property:
engagement:eventContext connects to one of the following engagement actions:
engagement:Access
engagement:Alert
engagement:Beacon
engagement:Deploy
engagement:Obfuscate
engagement:Respond
Each engagement action has properties:
uco-core:performer
uco-core:object which is the object which the action is applied to
Each engagement action has property:
uco-core:object connects to one of the following engagement deception objects:
engagement:Honeypot
engagement:Honeytoken
engagement:Breadcrumb
engagement:BreadcrumbTrail
engagement:LureObject
engagement:HoneyObject
engagement:Decoy
engagement:DataSource
Each engagement deception object has properties:
engagement:hasCharacterization connects to a uco-core:UcoObject
objective:hasObjective with @type objective:Objective and @id with one of the following instances:
objective:CommandAndControl
objective:CredentialAccess
objective:DevelopResource
objective:Discover
objective:EscalatePrivilege
objective:Evade
objective:Execute
objective:Exfilitrate
objective:GainInitialAccess
objective:Impact
objective:MoveLaterally
objective:Persist
objective:Reconnaissance
objective:Affect
objective:Collect
objective:Detect
objective:Direct
objective:Disrupt
objective:Elicit
objective:Expose
objective:Motivate
objective:Plan
objective:Prepare
objective:Prevent
objective:Reassure
objective:Analyze
objective:Deny
objective:ElicitBehavior
objective:Lure
objective:TimeSink
objective:Track
objective:Trap
uco-core:name is the objective
All people have property:
@type is uco-identity:Person
uco-core:hasFacet that connects to one of the following:
uco-identity:SimpleNameFacet which has the property:
uco-identity:familyName
uco-identity:givenName
Each uco-core:Role has properties:
@id is the role
uco-core:name is the role
Each uco-core:Role there is a uco-core:Relationship with properties:
uco-core:kindofRelationship is "has_Role"
uco-core:source connects to the person who has the role
uco-core:target connects to uco-core:Role
Each engagement:BreadcrumbTrail has property:
engagement:hasBreadcrumb connects to uco-types:Thread
This uco-types:Thread has property:
co:element contains all engagement:Breadcrumb that belong to this engagement:BreadcrumbTrail
co:item contains all uco-types:ThreadItem one each for each engagement:Breadcrumb
co:size
uco-types:threadOriginItem is the uco-types:ThreadItem for the first engagement:Breadcrumb belonging to this engagement:BreadcrumbTrail
uco-types:threadTerminalItem is the uco-types:ThreadItem for the last engagement:Breadcrumb belonging to this engagement:BreadcrumbTrail
Each engagement:Breadcrumb has the properties:
engagement:hasCharacterization which connects to a uco-core:UcoObject with the property:
uco-core:description which describes the object characterizing the breadcrumb
All classes must include property:
@type is the class
@id is a unique identifier
If namespace engagement prefix is used then https://ontology.adversaryengagement.org/ae/engagement#
If namespace objective prefix is used then https://ontology.adversaryengagement.org/ae/objective#
If namespace role prefix is used then https://ontology.adversaryengagement.org/ae/role#
If namespace identity prefix is used then https://ontology.adversaryengagement.org/ae/identity#
If namespace uco-core prefix is used then https://ontology.unifiedcyberontology.org/uco/core#
If namespace uco-types prefix is used then https://ontology.unifiedcyberontology.org/uco/types#
If namespace uco-role prefix is used then https://ontology.unifiedcyberontology.org/uco/role#
"""
return v
def generate_continue(self):
v = """
continue
"""
return v
def raw_prompt(self,description):
def run(val):
prompt = f"""Give me a full json-ld format example for the following scenario:
{description}
{"".join(val)}
"""
for i in self.ChatGPTTextSplitter(prompt):
res = self.llm_api(i)
return res
# return json.loads(res)
res_val = run(self.generate_rules())
try:
val = json.loads(res_val)
return val
except:
#the response was cut off, prompt for the continuation.
data = []
data.append(res_val)
while True:
res = self.llm_api(self.generate_continue())
data.append(res)
try:
full = "".join(data)
return json.loads(full)
except:
pass
return None
def check_for_nested(self,jsonObject):
try:
for k,v in jsonObject.items():
if type(v) == dict:
return True
except:
pass
return False
def recursive_typelist(self,obj,ls):
if type(obj)!=dict:
return ls
obj_type = obj['@type'].split(":")[0]
if obj_type not in ls:
ls.append(obj_type)
for k,v in obj.items():
if self.check_for_nested(v):
return_ls = self.recursive_typelist(v,ls)
for re in return_ls:
if re not in ls:
ls.append(re)
return ls
def prompt(self,description):
res = self.raw_prompt(description)
#include only relevent namespaces
type_list = []
try:
for k in res['@graph']:
ns = k['@type'].split(":")[0]
if ns not in type_list:
type_list.append(ns)
type_list = self.recursive_typelist(k,type_list)
new_prefixes = {}
for k,v in res['@context'].items():
if k in type_list:
new_prefixes[k] = v
res['@context'] = new_prefixes
except:
return res
return res