|
import os |
|
import openai |
|
import json |
|
import rdflib |
|
|
|
|
|
class ExampleGenerator: |
|
def __init__(self): |
|
self.ontologies = {} |
|
self.ontology_files = [] |
|
self.rules = {} |
|
def add_ontology(self, onto): |
|
if onto in self.ontology_files: |
|
raise ValueError("Ontology file already exists.") |
|
else: |
|
onto_data = self.get_ontology_file(onto) |
|
if onto_data: |
|
self.ontology_files.append(onto) |
|
self.ontologies[onto] = self.get_ontology_file(onto) |
|
self.rules[onto] = self.generate_rules(onto) |
|
else: |
|
raise ValueError("Ontology file error.") |
|
def get_ontology_file(self,filename): |
|
text = "" |
|
if os.path.isfile(filename): |
|
with open(filename,'r') as f: |
|
text = f.read() |
|
f.close() |
|
return text |
|
else: |
|
raise ValueError("Invalid filename.") |
|
def ChatGPTTextSplitter(self,text): |
|
"""Splits text in smaller subblocks to feed to the LLM""" |
|
prompt = f"""The total length of content that I want to send you is too large to send in only one piece. |
|
|
|
For sending you that content, I will follow this rule: |
|
|
|
[START PART 1/10] |
|
this is the content of the part 1 out of 10 in total |
|
[END PART 1/10] |
|
|
|
Then you just answer: "Instructions Sent." |
|
|
|
And when I tell you "ALL PARTS SENT", then you can continue processing the data and answering my requests. |
|
""" |
|
if type(text) == str: |
|
textsize = 12000 |
|
blocksize = int(len(text) / textsize) |
|
if blocksize > 0: |
|
yield prompt |
|
|
|
for b in range(1,blocksize+1): |
|
if b < blocksize+1: |
|
prompt = f"""Do not answer yet. This is just another part of the text I want to send you. Just receive and acknowledge as "Part {b}/{blocksize} received" and wait for the next part. |
|
[START PART {b}/{blocksize}] |
|
{text[(b-1)*textsize:b*textsize]} |
|
[END PART {b}/{blocksize}] |
|
Remember not answering yet. Just acknowledge you received this part with the message "Part {b}/{blocksize} received" and wait for the next part. |
|
""" |
|
yield prompt |
|
else: |
|
prompt = f""" |
|
[START PART {b}/{blocksize}] |
|
{text[(b-1)*textsize:b*textsize]} |
|
[END PART {b}/{blocksize}] |
|
ALL PARTS SENT. Now you can continue processing the request. |
|
""" |
|
yield prompt |
|
else: |
|
yield text |
|
elif type(text) == list: |
|
yield prompt |
|
|
|
for n,block in enumerate(text): |
|
if n+1 < len(text): |
|
prompt = f"""Do not answer yet. This is just another part of the text I want to send you. Just receive and acknowledge as "Part {n+1}/{len(text)} received" and wait for the next part. |
|
[START PART {n+1}/{len(text)}] |
|
{text[n]} |
|
[END PART {n+1}/{len(text)}] |
|
Remember not answering yet. Just acknowledge you received this part with the message "Part {n+1}/{len(text)} received" and wait for the next part. |
|
""" |
|
yield prompt |
|
else: |
|
prompt = f""" |
|
[START PART {n+1}/{len(text)}] |
|
{text[n]} |
|
[END PART {n+1}/{len(text)}] |
|
ALL PARTS SENT. Now you can continue processing the request. |
|
""" |
|
yield prompt |
|
|
|
def send_ontology(self): |
|
ontology = "" |
|
if len(self.ontologies) > 0: |
|
for k,v in self.ontologies.items(): |
|
ontology+=v+"\n" |
|
print("Sending Ontology in Parts") |
|
for i in self.ChatGPTTextSplitter(ontology): |
|
print(self.llm_api(i)) |
|
else: |
|
raise ValueError("No loaded ontology to send.") |
|
def llm_api(self,prompt,model="gpt-3.5-turbo"): |
|
messages = [{ |
|
"role":"user", |
|
"content":prompt |
|
}] |
|
res = openai.ChatCompletion.create(model=model,messages=messages,temperature=0) |
|
return res.choices[0].message['content'] |
|
|
|
def generate_rules(self,onto=None): |
|
engagement_actions = ['engagement:Access', |
|
'engagement:Alert', |
|
'engagement:Beacon', |
|
'engagement:Deploy', |
|
'engagement:Obfuscate', |
|
'engagement:Respond' |
|
] |
|
engagement_objects = [ |
|
'engagement:Honeypot', |
|
'engagement:Honeytoken', |
|
'engagement:Breadcrumb', |
|
'engagement:BreadcrumbTrail', |
|
'engagement:LureObject', |
|
'engagement:HoneyObject', |
|
'engagement:Decoy', |
|
'engagement:DataSource' |
|
] |
|
engagement_objectives = [ |
|
'objective:CommandAndControl', |
|
'objective:CredentialAccess', |
|
'objective:DevelopResource', |
|
'objective:Discover', |
|
'objective:EscalatePrivilege', |
|
'objective:Evade', |
|
'objective:Execute', |
|
'objective:Exfilitrate', |
|
'objective:GainInitialAccess', |
|
'objective:Impact', |
|
'objective:MoveLaterally', |
|
'objective:Persist', |
|
'objective:Reconnaissance', |
|
'objective:Affect', |
|
'objective:Collect', |
|
'objective:Detect', |
|
'objective:Direct', |
|
'objective:Disrupt', |
|
'objective:Elicit', |
|
'objective:Expose', |
|
'objective:Motivate', |
|
'objective:Plan', |
|
'objective:Prepare', |
|
'objective:Prevent', |
|
'objective:Reassure', |
|
'objective:Analyze', |
|
'objective:Deny', |
|
'objective:ElicitBehavior', |
|
'objective:Lure', |
|
'objective:TimeSink', |
|
'objective:Track', |
|
'objective:Trap' |
|
] |
|
prefix_ns = {"engagement": "https://ontology.adversaryengagement.org/ae/engagement#", |
|
"objective":"https://ontology.adversaryengagement.org/ae/objective#", |
|
"role":"https://ontology.adversaryengagement.org/ae/role#", |
|
"identity":"https://ontology.adversaryengagement.org/ae/identity#", |
|
"uco-core prefix":"https://ontology.unifiedcyberontology.org/uco/core#", |
|
"uco-types":"https://ontology.unifiedcyberontology.org/uco/types#", |
|
"uco-role":"https://ontology.unifiedcyberontology.org/uco/role#" |
|
} |
|
ns_str ="" |
|
for k,v in prefix_ns.items(): |
|
ns_str+=f"If namespace {k} prefix is used then {v}\n" |
|
lookup = {"1":{"0":['Each'], |
|
"1":['connects to'] |
|
} |
|
} |
|
v = """Remember make a json-ld format example that only uses classes and properties terms from Adversary Engagement Ontology, Unified Cyber Ontology.""" |
|
|
|
structure = {'engagement:Narrative':{'engagement:hasStoryline':{"1":'engagement:Storyline'} |
|
}, |
|
'engagement:Storyline':{'engagement:hasEvent':{"1":'uco-types:Thread'} |
|
}, |
|
'uco-types:Thread':{'co:element':'contains all engagement:PlannedEvents', |
|
'co:item':{"0":'uco-types:ThreadItem one each for each engagement:PlannedEvent'}, |
|
'co:size':"", |
|
'uco-types:threadOriginItem':"is the uco-types:ThreadItem for the first engagement:PlannedEvent", |
|
'uco-types:threadTerminalItem':"is the uco-types:ThreadItem for the last engagement:PlannedEvent" |
|
}, |
|
'co:size':{'@type':'is xsd:nonNegativeInteger', |
|
'@value':"which is the number of uco-types:ThreadItem" |
|
}, |
|
'uco-types:ThreadItem':{'co:itemContent':'is the engagement:PlannedEvent', |
|
'optional uco-types:threadNextItem':"is the next uco-types:ThreadItem for the next engagement:PlannedEvent if there is one", |
|
'optional uco-types:threadPreviousItem':'is the previous uco-types:ThreadItem for the previous' |
|
}, |
|
'engagement:PlannedEvent':{'engagement:eventContext':"connects to one of the following engagement actions:"+"\n\t\t"+"\n\t\t".join(engagement_actions) |
|
}, |
|
'engagement action':{'uco-core:performer':"",'uco-core:object': 'connects to one of the following engagement deception objects'+"\n\t\t"+"\n\t\t".join(engagement_objects) |
|
}, |
|
'engagement deception object':{'engagement:hasCharacterization':{'1':'uco-core:UcoObject'}, |
|
'objective:hasObjective':'with @type objective:Objective and @id with one of the following instances:'+"\n\t\t"+"\n\t\t".join(engagement_objectives), |
|
'uco-core:name':'is the objective' |
|
}, |
|
'person':{'@type':'is uco-identity:Person', |
|
'uco-core:hasFacet':{"1":{'connects to uco-identity:SimpleNameFacet':{'uco-identity:familyName':"",'uco-identity:givenName':""} |
|
} |
|
} |
|
}, |
|
'uco-core:Role':{'@id':"is the role",'uco-core:name': 'is the role' |
|
}, |
|
'uco-core:Role there is a uco-core:Relationship':{'uco-core:kindofRelationship':'is "has_Role"', |
|
'uco-core:source':{"1":"the person who has the role"}, |
|
"uco-core:target":{"1":"uco-core:Role"} |
|
}, |
|
'engagement:BreadcrumbTrail':{'engagement:hasBreadcrumb':{"1":{'uco-types:Thread':{'co:element':"contains all engagement:Breadcrumb that belong to this engagement:BreadcrumbTrail","co:size":"","co:item":"contains all uco-types:ThreadItem one each for each engagement:Breadcrumb","uco-types:threadOriginItem":"is the uco-types:ThreadItem for the first engagement:Breadcrumb belonging to this engagement:BreadcrumbTrail","uco-types:threadTerminalItem":"is the uco-types:ThreadItem for the last engagement:Breadcrumb belonging to this engagement:BreadcrumbTrail"}} |
|
}}, |
|
'engagement:Breadcrumb':{'engagement:hasCharacterization':{"1":{'which connects to a uco-core:UcoObject which':{'uco-core:description':'which describes the object characterizing the breadcrumb'}},'uco-core:name': 'is the role' |
|
}}, |
|
"class":{'@type': 'which is the class', |
|
'@id': 'which is a unique identifier'}, |
|
"ns":ns_str |
|
|
|
} |
|
|
|
def get_list(struct,limiter="\n\t",skippre=False): |
|
all_stat = [] |
|
for k,v in struct.items(): |
|
if k == "ns": |
|
all_stat.append(v) |
|
elif type(v)==dict: |
|
look = "1" |
|
if len(v) > 1: |
|
plural = "has properties:" |
|
else: |
|
plural = "has property:" |
|
if type(v)==dict: |
|
statement = [" ".join([lookup[look]["0"][0],k,plural])] |
|
if skippre: |
|
statement = [" ".join([k,plural])] |
|
for vk, vv in v.items(): |
|
statement.append(limiter) |
|
statement.append(vk) |
|
if type(vv)==dict: |
|
for i in list(lookup.keys()): |
|
if i in vv: |
|
val = v[vk][i] |
|
if type(val)==dict: |
|
statement.append( get_list(val,limiter+"\t",skippre=True) ) |
|
else: |
|
if not skippre: |
|
statement.append(lookup[look]["1"][0]) |
|
statement.append(v[vk][i]) |
|
elif type(vv)==str: |
|
statement.append(v[vk]) |
|
val = " ".join(statement) |
|
all_stat.append(val) |
|
return "\n".join(all_stat) |
|
|
|
v = get_list(structure) |
|
return v |
|
def generate_rule(self,onto=None): |
|
"""Raw rule string of AEO.""" |
|
v = """Remember make a json-ld format example that only uses classes and properties terms from Adversary Engagement Ontology, Unified Cyber Ontology. |
|
|
|
Each engagement:Narrative has property: |
|
engagement:hasStoryline connects to an engagement:Storyline |
|
Each engagement:Storyline has property: |
|
engagement:hasEvent connects to a uco-types:Thread |
|
Each uco-types:Thread has properties: |
|
co:element contains all engagement:PlannedEvents |
|
co:item contains all uco-types:ThreadItem one each for each engagement:PlannedEvent. |
|
co:size |
|
uco-types:threadOriginItem is the uco-types:ThreadItem for the first engagement:PlannedEvent |
|
uco-types:threadTerminalItem is the uco-types:ThreadItem for the last engagement:PlannedEvent |
|
Each co:size has properties: |
|
@type as xsd:nonNegativeInteger |
|
@value which is the number of uco-types:ThreadItem |
|
Each uco-types:ThreadItem has property: |
|
co:itemContent is the engagement:PlannedEvent |
|
optional uco-types:threadNextItem is the next uco-types:ThreadItem for the next engagement:PlannedEvent if there is one, |
|
optional uco-types:threadPreviousItem is the previous uco-types:ThreadItem for the previous engagement:PlannedEvent if there is one |
|
Each engagement:PlannedEvent has property: |
|
engagement:eventContext connects to one of the following engagement actions: |
|
engagement:Access |
|
engagement:Alert |
|
engagement:Beacon |
|
engagement:Deploy |
|
engagement:Obfuscate |
|
engagement:Respond |
|
Each engagement action has properties: |
|
uco-core:performer |
|
uco-core:object which is the object which the action is applied to |
|
Each engagement action has property: |
|
uco-core:object connects to one of the following engagement deception objects: |
|
engagement:Honeypot |
|
engagement:Honeytoken |
|
engagement:Breadcrumb |
|
engagement:BreadcrumbTrail |
|
engagement:LureObject |
|
engagement:HoneyObject |
|
engagement:Decoy |
|
engagement:DataSource |
|
Each engagement deception object has properties: |
|
engagement:hasCharacterization connects to a uco-core:UcoObject |
|
objective:hasObjective with @type objective:Objective and @id with one of the following instances: |
|
objective:CommandAndControl |
|
objective:CredentialAccess |
|
objective:DevelopResource |
|
objective:Discover |
|
objective:EscalatePrivilege |
|
objective:Evade |
|
objective:Execute |
|
objective:Exfilitrate |
|
objective:GainInitialAccess |
|
objective:Impact |
|
objective:MoveLaterally |
|
objective:Persist |
|
objective:Reconnaissance |
|
objective:Affect |
|
objective:Collect |
|
objective:Detect |
|
objective:Direct |
|
objective:Disrupt |
|
objective:Elicit |
|
objective:Expose |
|
objective:Motivate |
|
objective:Plan |
|
objective:Prepare |
|
objective:Prevent |
|
objective:Reassure |
|
objective:Analyze |
|
objective:Deny |
|
objective:ElicitBehavior |
|
objective:Lure |
|
objective:TimeSink |
|
objective:Track |
|
objective:Trap |
|
uco-core:name is the objective |
|
All people have property: |
|
@type is uco-identity:Person |
|
uco-core:hasFacet that connects to one of the following: |
|
uco-identity:SimpleNameFacet which has the property: |
|
uco-identity:familyName |
|
uco-identity:givenName |
|
Each uco-core:Role has properties: |
|
@id is the role |
|
uco-core:name is the role |
|
Each uco-core:Role there is a uco-core:Relationship with properties: |
|
uco-core:kindofRelationship is "has_Role" |
|
uco-core:source connects to the person who has the role |
|
uco-core:target connects to uco-core:Role |
|
Each engagement:BreadcrumbTrail has property: |
|
engagement:hasBreadcrumb connects to uco-types:Thread |
|
This uco-types:Thread has property: |
|
co:element contains all engagement:Breadcrumb that belong to this engagement:BreadcrumbTrail |
|
co:item contains all uco-types:ThreadItem one each for each engagement:Breadcrumb |
|
co:size |
|
uco-types:threadOriginItem is the uco-types:ThreadItem for the first engagement:Breadcrumb belonging to this engagement:BreadcrumbTrail |
|
uco-types:threadTerminalItem is the uco-types:ThreadItem for the last engagement:Breadcrumb belonging to this engagement:BreadcrumbTrail |
|
Each engagement:Breadcrumb has the properties: |
|
engagement:hasCharacterization which connects to a uco-core:UcoObject with the property: |
|
uco-core:description which describes the object characterizing the breadcrumb |
|
All classes must include property: |
|
@type is the class |
|
@id is a unique identifier |
|
|
|
If namespace engagement prefix is used then https://ontology.adversaryengagement.org/ae/engagement# |
|
If namespace objective prefix is used then https://ontology.adversaryengagement.org/ae/objective# |
|
If namespace role prefix is used then https://ontology.adversaryengagement.org/ae/role# |
|
If namespace identity prefix is used then https://ontology.adversaryengagement.org/ae/identity# |
|
If namespace uco-core prefix is used then https://ontology.unifiedcyberontology.org/uco/core# |
|
If namespace uco-types prefix is used then https://ontology.unifiedcyberontology.org/uco/types# |
|
If namespace uco-role prefix is used then https://ontology.unifiedcyberontology.org/uco/role# |
|
""" |
|
return v |
|
|
|
def generate_continue(self): |
|
v = """ |
|
continue |
|
""" |
|
return v |
|
|
|
def raw_prompt(self,description): |
|
|
|
def run(val): |
|
prompt = f"""Give me a full json-ld format example for the following scenario: |
|
{description} |
|
|
|
{"".join(val)} |
|
""" |
|
for i in self.ChatGPTTextSplitter(prompt): |
|
res = self.llm_api(i) |
|
return res |
|
|
|
res_val = run(self.generate_rules()) |
|
try: |
|
val = json.loads(res_val) |
|
return val |
|
except: |
|
|
|
data = [] |
|
data.append(res_val) |
|
while True: |
|
res = self.llm_api(self.generate_continue()) |
|
data.append(res) |
|
try: |
|
full = "".join(data) |
|
return json.loads(full) |
|
except: |
|
pass |
|
|
|
return None |
|
|
|
def check_for_nested(self,jsonObject): |
|
try: |
|
for k,v in jsonObject.items(): |
|
if type(v) == dict: |
|
return True |
|
except: |
|
pass |
|
return False |
|
|
|
def recursive_typelist(self,obj,ls): |
|
if type(obj)!=dict: |
|
return ls |
|
obj_type = obj['@type'].split(":")[0] |
|
if obj_type not in ls: |
|
ls.append(obj_type) |
|
for k,v in obj.items(): |
|
if self.check_for_nested(v): |
|
return_ls = self.recursive_typelist(v,ls) |
|
for re in return_ls: |
|
if re not in ls: |
|
ls.append(re) |
|
return ls |
|
|
|
|
|
def prompt(self,description): |
|
res = self.raw_prompt(description) |
|
|
|
|
|
type_list = [] |
|
try: |
|
for k in res['@graph']: |
|
ns = k['@type'].split(":")[0] |
|
if ns not in type_list: |
|
type_list.append(ns) |
|
type_list = self.recursive_typelist(k,type_list) |
|
new_prefixes = {} |
|
for k,v in res['@context'].items(): |
|
if k in type_list: |
|
new_prefixes[k] = v |
|
res['@context'] = new_prefixes |
|
except: |
|
return res |
|
return res |
|
|