# threat-extraction / threat_extraction.py
# smellslikeml
# initial commit
# 5d2d256
# raw
# history blame
# 3.45 kB
import json
import openai
import requests
from os import environ as env
class FirewallManager:
    """Stand-in for a real firewall integration."""

    def update_rule(self, ip: str, action: str):
        """Report the rule change that would be applied for ``ip``.

        Placeholder only: nothing is sent to a firewall; the requested
        change is just printed. A real implementation would talk to the
        actual firewall here.
        """
        message = f"Updated firewall rule for IP {ip} with action {action}"
        print(message)
class PacketFilter:
    """Stand-in for a real packet-filter integration."""

    def drop_packet(self, ip: str):
        """Report that a packet from ``ip`` would be dropped.

        Placeholder only: no packet filter is contacted; the action is
        just printed. A real implementation would talk to the actual
        packet filter here.
        """
        message = f"Dropped packet from IP {ip}"
        print(message)
class ThreatIntelExtractorTool(Tool):
    """Tool that turns a threat-intel feed into firewall / packet-filter actions.

    Calling the tool downloads the feed, asks the OpenAI completion API to
    extract the threats it mentions, blocks every extracted IP through the
    injected ``FirewallManager`` and ``PacketFilter``, and returns the
    extracted threats as a JSON string.

    NOTE(review): ``Tool`` is not imported in the visible part of this file;
    presumably it comes from the agent framework in use -- confirm and add
    the import.
    NOTE(review): both the feed text and the model output are untrusted, yet
    the extracted IPs are applied as-is. Validate / allow-list them before
    wiring this to a real firewall.
    """

    name = "threat_intel_extractor_tool"
    description = (
        "\nThis tool scrapes a hypothetical threat intelligence feed, uses OpenAI API to extract structured information, and takes defensive actions based on the information.\n"
        "Input is a URL of threat intel feed. Output is a structured response as a string with threat information.\n"
    )
    inputs = ["text"]
    outputs = ["text"]

    def __init__(
        self,
        firewall_manager: FirewallManager,
        packet_filter: PacketFilter,
        engine: str = "text-davinci-003",
        max_tokens: int = 100,
        request_timeout: float = 10.0,
    ):
        """Store collaborators and configure the OpenAI client.

        Args:
            firewall_manager: Receives ``update_rule(ip, 'block')`` calls.
            packet_filter: Receives ``drop_packet(ip)`` calls.
            engine: Completion engine passed to ``openai.Completion.create``.
            max_tokens: Completion length limit; raise it if the model's
                answer is being cut off.
            request_timeout: Seconds to wait for the feed before giving up.
        """
        self.openai_api_key = env.get("OPENAI_API_KEY")
        # Only install the key when one is present, so a missing variable
        # does not wipe a key that was configured on ``openai`` elsewhere.
        if self.openai_api_key:
            openai.api_key = self.openai_api_key
        self.firewall_manager = firewall_manager
        self.packet_filter = packet_filter
        self.engine = engine
        self.max_tokens = max_tokens
        self.request_timeout = request_timeout

    def __call__(self, threat_intel_feed_url: str):
        """Fetch the feed, extract threats, act on them, return them as JSON.

        Args:
            threat_intel_feed_url: URL of the threat intelligence feed.

        Returns:
            JSON string of the form ``{"Threats": [{"Threat", "IP",
            "Description"}, ...]}``.

        Raises:
            requests.RequestException: If the feed cannot be fetched, times
                out, or answers with an HTTP error status.
        """
        # Scrape threat intelligence feed. Fail fast on HTTP errors so an
        # error page is never handed to the model as "threat intel".
        response = requests.get(threat_intel_feed_url, timeout=self.request_timeout)
        response.raise_for_status()
        threat_info_raw = response.text

        # Send data to OpenAI API for text extraction
        example_json = {
            "Threats": [
                {"Threat": "Threat 1", "IP": "192.0.2.0", "Description": "This is a hypothetical threat."},
                {"Threat": "Threat 2", "IP": "192.0.2.1", "Description": "This is another hypothetical threat."},
            ]
        }
        prompt = (
            "Extract structured information from the following threat intelligence:\n"
            f"{threat_info_raw}\n"
            "Example of the expected format:\n"
            f"{json.dumps(example_json, indent=2)}"
        )
        extraction_response = openai.Completion.create(
            engine=self.engine,
            prompt=prompt,
            max_tokens=self.max_tokens,
        )

        # Format extracted information into a structured response
        extracted_info = self.format_extraction(extraction_response.choices[0].text.strip())

        # Take defensive actions based on the extracted information
        self.take_defensive_actions(extracted_info)
        return extracted_info

    def format_extraction(self, extraction: str) -> str:
        """Normalise the model's answer into a ``{"Threats": [...]}`` JSON string.

        The prompt shows the model a JSON example, so a JSON answer is tried
        first. If the text is not JSON of that shape, it is parsed as lines
        formatted ``Threat: <threat>, IP: <ip>, Description: <description>``.
        Blank or malformed lines are skipped instead of raising.

        Args:
            extraction: Raw completion text.

        Returns:
            JSON string with a single ``"Threats"`` list (possibly empty).
        """
        threats = self._parse_json_threats(extraction)
        if threats is None:
            threats = self._parse_line_threats(extraction)
        return json.dumps({'Threats': threats})

    @staticmethod
    def _parse_json_threats(extraction: str):
        """Return the threat dicts from a JSON answer, or None if it is not one."""
        try:
            payload = json.loads(extraction)
        except json.JSONDecodeError:
            return None
        if not isinstance(payload, dict):
            return None
        threats = payload.get('Threats')
        if not isinstance(threats, list):
            return None
        # Entries without an IP cannot be acted on, so they are dropped here.
        return [entry for entry in threats if isinstance(entry, dict) and 'IP' in entry]

    @staticmethod
    def _parse_line_threats(extraction: str) -> list:
        """Parse ``Threat: .., IP: .., Description: ..`` lines into threat dicts."""
        keys = ('Threat', 'IP', 'Description')
        threats = []
        for line in extraction.splitlines():
            # Split into at most three fields so commas inside the
            # description are kept.
            fields = line.split(',', 2)
            if len(fields) != len(keys):
                continue
            values = []
            for field in fields:
                # partition keeps everything after the first colon, so IPv6
                # addresses and descriptions containing ':' are not truncated.
                _, separator, value = field.partition(':')
                if not separator:
                    break
                values.append(value.strip())
            else:
                threats.append(dict(zip(keys, values)))
        return threats

    def take_defensive_actions(self, threat_info: dict):
        """Block and drop traffic for every IP listed in ``threat_info``.

        Args:
            threat_info: Mapping with a ``'Threats'`` list whose items carry an
                ``'IP'`` key. The JSON string produced by
                ``format_extraction`` is accepted as well and decoded first.
        """
        if isinstance(threat_info, str):
            threat_info = json.loads(threat_info)
        for threat in threat_info['Threats']:
            ip = threat['IP']
            # For the purposes of this example, block all traffic from the
            # IP and drop any incoming packets.
            self.firewall_manager.update_rule(ip, 'block')
            self.packet_filter.drop_packet(ip)