Dark Web Intelligence Collection Techniques#
Introduction#
Dark web intelligence collection means gathering and analyzing publicly available information on the dark web using lawful OSINT (open-source intelligence) techniques. It is used mainly for security research, threat intelligence, and defensive network security, and must always be conducted within the applicable legal framework.
Key Capabilities#
- Tor network access
- Dark web market monitoring
- Threat intelligence collection
- Leaked-data analysis
- Forum data collection
- Security research
- Risk assessment
- Report generation
Applicable Scenarios#
- Threat intelligence collection
- Security research
- Network security defense
- Compliance checks
- Risk assessment
- Security monitoring
Getting Started#
Tor Network Basics#
Installing Tor Browser#
# Download Tor Browser
# Visit https://www.torproject.org/download/
# Linux
# (the version and filename below change with each release; check the download page for the current one)
wget https://www.torproject.org/dist/torbrowser/13.0.8/tor-browser-linux64-13.0.8_en-US.tar.xz
tar -xf tor-browser-linux64-13.0.8_en-US.tar.xz
cd tor-browser_en-US
./start-tor-browser.desktop
# Windows
# Download and run the installer
# macOS
# Download the DMG file and install
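Whichever platform you install on, it is good practice to verify the release signature before running anything. A minimal sketch, assuming you have already imported the Tor Browser Developers signing key into GPG (the exact filenames track the release you downloaded):
# Fetch the detached signature published next to the tarball, then verify
wget https://www.torproject.org/dist/torbrowser/13.0.8/tor-browser-linux64-13.0.8_en-US.tar.xz.asc
gpg --verify tor-browser-linux64-13.0.8_en-US.tar.xz.asc tor-browser-linux64-13.0.8_en-US.tar.xz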
Configure the Tor Network#
# Install the Tor service
sudo apt-get install tor
# Edit the Tor configuration
sudo nano /etc/tor/torrc
# Add the following settings
SocksPort 9050
ControlPort 9051
DataDirectory /var/lib/tor
# Optional: pin exits to a country (note that narrowing exit nodes can reduce anonymity)
ExitNodes {us}
# Restart the Tor service
sudo systemctl restart tor
# Verify Tor status
sudo systemctl status tor
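Before relying on the proxy, confirm that traffic really leaves through Tor. One way is the Tor Project's own exit-check endpoint; note --socks5-hostname, which makes curl hand name resolution to the proxy:
curl --socks5-hostname 127.0.0.1:9050 https://check.torproject.org/api/ip
# A response containing "IsTor":true means the circuit is working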
Basic Access#
Accessing .onion Sites#
# Access via Tor Browser
# Enter the .onion address in the Tor Browser address bar
# Access via curl through the Tor proxy
# (--socks5-hostname is required so the .onion name is resolved by Tor, not locally)
curl --socks5-hostname 127.0.0.1:9050 http://example.onion
# Access via torsocks
sudo apt-get install torsocks
torsocks curl http://example.onion
DNS Queries#
# DNS queries through Tor
# (.onion names are not in public DNS; lookups only succeed when Tor itself handles the resolution)
torsocks nslookup example.onion
# Using dig
torsocks dig example.onion
# Connecting from Python (requires the PySocks package)
import socks
# rdns=True makes the proxy, not the local resolver, handle the hostname --
# essential for .onion names, which public DNS cannot resolve
# (note: monkeypatching socket.socket does not fix socket.gethostbyname,
# so connect with the hostname and let Tor resolve it)
s = socks.socksocket()
s.set_proxy(socks.SOCKS5, "127.0.0.1", 9050, rdns=True)
s.connect(("example.onion", 80))
print("connected to example.onion via Tor")
s.close()
Beginner Usage#
Dark Web Directories#
Common Directory Sites#
# Visiting dark web directories
# Note: these addresses change frequently
# The Hidden Wiki
# http://thehiddenwiki.org/
# Onion Link Directory
# http://onionlinks.net/
# Deep Web Directory
# http://deepweblinks.com/
# Note: exercise caution when visiting these sites
# Do not download suspicious files
# Do not provide any personal information
Directory Usage Tips#
# 1. Verify a site's reputation
#    - Check user reviews
#    - Check how long the site has existed
#    - Avoid brand-new sites
# 2. Use a virtual machine
#    - Browse from an isolated environment
#    - Combine a VPN with Tor
#    - Take regular snapshots
# 3. Disable JavaScript
#    - In the Tor Browser settings
#    - Security Level: Safest
# 4. Use a dedicated profile
#    - Create a separate Tor Browser profile
#    - Do not save browsing history
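Before any scripted collection, it is also worth confirming from Python that requests actually go out through Tor. A minimal sketch, assuming the requests package is installed with SOCKS support (requests[socks]); note the socks5h:// scheme, which delegates DNS to the proxy:
#!/usr/bin/env python3
import requests
proxies = {
    'http': 'socks5h://127.0.0.1:9050',
    'https': 'socks5h://127.0.0.1:9050'
}
# check.torproject.org reports whether the request arrived from a Tor exit
r = requests.get('https://check.torproject.org/api/ip', proxies=proxies, timeout=30)
print(r.json())  # expect {"IsTor": true, ...}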
Information Gathering Basics#
Forum Data Collection#
#!/usr/bin/env python3
import requests
from bs4 import BeautifulSoup
def scrape_forum(forum_url):
    """Collect forum information."""
    # Route requests through the Tor proxy; the socks5h:// scheme hands
    # hostname resolution to Tor (plain socks5:// cannot resolve .onion names)
    proxies = {
        'http': 'socks5h://127.0.0.1:9050',
        'https': 'socks5h://127.0.0.1:9050'
    }
    try:
        response = requests.get(forum_url, proxies=proxies, timeout=30)
        soup = BeautifulSoup(response.content, 'html.parser')
        # Extract post titles
        titles = soup.find_all('h2', class_='post-title')
        for title in titles:
            print(title.text.strip())
        # Extract post timestamps
        times = soup.find_all('span', class_='post-time')
        for posted_at in times:
            print(posted_at.text.strip())
    except Exception as e:
        print(f"Error: {e}")
# Usage example
scrape_forum('http://example-forum.onion')
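Hidden services drop connections far more often than clearnet sites, so a small retry wrapper pays off. A sketch of one way to do it; the helper name and backoff constants are illustrative, not part of any library:
import time
import requests
def fetch_with_retries(url, proxies, attempts=3, backoff=10):
    """Retry a Tor-proxied GET, backing off between attempts."""
    for attempt in range(1, attempts + 1):
        try:
            return requests.get(url, proxies=proxies, timeout=30)
        except requests.RequestException as e:
            print(f"attempt {attempt} failed: {e}")
            if attempt < attempts:
                time.sleep(backoff * attempt)
    return None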
Market Data Collection#
#!/usr/bin/env python3
import requests
import json
def collect_market_data(market_url):
    """Collect market data."""
    proxies = {
        'http': 'socks5h://127.0.0.1:9050',
        'https': 'socks5h://127.0.0.1:9050'
    }
    try:
        response = requests.get(market_url, proxies=proxies, timeout=30)
        # Parse the JSON payload
        data = response.json()
        # Extract product entries
        products = data.get('products', [])
        for product in products:
            print(f"Product: {product['name']}")
            print(f"Price: {product['price']}")
            print(f"Seller: {product['seller']}")
            print()
    except Exception as e:
        print(f"Error: {e}")
# Usage example
collect_market_data('http://example-market.onion/api/products')
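When iterating over several sources, you may want a fresh Tor circuit between targets. A sketch using the stem library and the ControlPort configured earlier; it assumes stem is installed and the control port uses cookie authentication or no password:
from stem import Signal
from stem.control import Controller
with Controller.from_port(port=9051) as controller:
    controller.authenticate()
    controller.signal(Signal.NEWNYM)  # ask Tor to build new circuits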
Intermediate Usage#
Automated Collection#
Periodic Monitoring Script#
#!/bin/bash
# Periodic dark web monitoring script
TARGET_URLS=(
    "http://example-forum.onion"
    "http://example-market.onion"
)
OUTPUT_DIR="./darkweb_monitoring"
LOG_FILE="monitoring.log"
# Create the output directory
mkdir -p "$OUTPUT_DIR"
# Monitoring function
monitor_url() {
    local url=$1
    local timestamp=$(date +%Y%m%d_%H%M%S)
    # Derive a filesystem-safe name from the URL once, instead of repeating the sed pipeline
    local base="$OUTPUT_DIR/$(echo "$url" | sed 's|http://||' | sed 's|/||g')"
    local output_file="${base}_$timestamp.html"
    echo "$(date): monitoring $url" >> "$LOG_FILE"
    # Download the page through Tor (--socks5-hostname lets Tor resolve the .onion name)
    curl --socks5-hostname 127.0.0.1:9050 -o "$output_file" "$url" >> "$LOG_FILE" 2>&1
    # Check for changes against the previous snapshot
    if [ -f "${base}_previous.html" ]; then
        diff "${base}_previous.html" "$output_file" > "$OUTPUT_DIR/changes_$timestamp.txt"
    fi
    # Keep the current version as the new baseline
    cp "$output_file" "${base}_previous.html"
}
# Monitor every URL
for url in "${TARGET_URLS[@]}"; do
    monitor_url "$url"
done
echo "$(date): monitoring complete" >> "$LOG_FILE"
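To actually run this on a schedule, a cron entry is the simplest driver. A sketch, assuming the script above is saved as /opt/darkweb/monitor.sh and made executable (the path and cadence are illustrative):
# Run every 6 hours, appending output to a log (add via: crontab -e)
0 */6 * * * /opt/darkweb/monitor.sh >> /var/log/darkweb_monitor.log 2>&1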
Data Extraction Script#
#!/usr/bin/env python3
import requests
from bs4 import BeautifulSoup
import json
from datetime import datetime
def extract_data(url, output_file):
    """Extract data from a page and save it as JSON."""
    proxies = {
        'http': 'socks5h://127.0.0.1:9050',
        'https': 'socks5h://127.0.0.1:9050'
    }
    try:
        response = requests.get(url, proxies=proxies, timeout=30)
        soup = BeautifulSoup(response.content, 'html.parser')
        # Collect the fields of interest
        data = {
            'url': url,
            'timestamp': datetime.now().isoformat(),
            'titles': [t.text.strip() for t in soup.find_all('h2')],
            'links': [a['href'] for a in soup.find_all('a', href=True)]
        }
        # Persist to disk
        with open(output_file, 'w') as f:
            json.dump(data, f, indent=2)
        print(f"Data saved to {output_file}")
    except Exception as e:
        print(f"Error: {e}")
# Usage example
extract_data('http://example-forum.onion', 'forum_data.json')
Data Analysis#
Text Analysis#
#!/usr/bin/env python3
import json
from collections import Counter
import re
def analyze_text(data_file):
    """Analyze collected text data."""
    with open(data_file) as f:
        data = json.load(f)
    # Join all titles into one text blob
    text = ' '.join(data['titles'])
    # Word frequency
    words = re.findall(r'\b\w+\b', text.lower())
    word_freq = Counter(words)
    print("=== Word Frequency ===")
    for word, count in word_freq.most_common(20):
        print(f"{word}: {count}")
    # URL statistics
    urls = data['links']
    print("\n=== URL Statistics ===")
    print(f"Total URLs: {len(urls)}")
    print(f"Unique URLs: {len(set(urls))}")
    # Domain statistics
    domains = [url.split('/')[2] for url in urls if '://' in url]
    domain_freq = Counter(domains)
    print("\n=== Domain Statistics ===")
    for domain, count in domain_freq.most_common(10):
        print(f"{domain}: {count}")
# Usage example
analyze_text('forum_data.json')
Relationship Analysis#
#!/usr/bin/env python3
import json
import networkx as nx
import matplotlib.pyplot as plt
def build_relationship_graph(data_file):
    """Build a co-occurrence graph from post titles."""
    with open(data_file) as f:
        data = json.load(f)
    G = nx.Graph()
    # Add nodes
    for title in data['titles']:
        for word in title.split():
            if len(word) > 3:  # skip short words
                G.add_node(word)
    # Add edges (co-occurrence within the same title)
    for title in data['titles']:
        words = [w for w in title.split() if len(w) > 3]
        for i in range(len(words)):
            for j in range(i + 1, len(words)):
                G.add_edge(words[i], words[j])
    return G
def visualize_graph(G, output_file='relationship_graph.png'):
    """Render the relationship graph to an image."""
    plt.figure(figsize=(12, 8))
    pos = nx.spring_layout(G)
    nx.draw(G, pos, with_labels=True, node_size=3000,
            node_color='lightblue', font_size=8)
    plt.savefig(output_file, dpi=300, bbox_inches='tight')
    plt.close()
    print(f"Relationship graph saved to {output_file}")
# Usage example
G = build_relationship_graph('forum_data.json')
visualize_graph(G)
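Once the graph exists, simple centrality measures surface the most connected terms. A brief sketch using networkx's built-in degree centrality on the G built above:
# Rank nodes by degree centrality (fraction of other nodes each term connects to)
centrality = nx.degree_centrality(G)
for node, score in sorted(centrality.items(), key=lambda x: x[1], reverse=True)[:10]:
    print(f"{node}: {score:.3f}")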
Upper-Intermediate Usage#
Advanced Monitoring#
Real-Time Monitoring#
#!/usr/bin/env python3
import requests
import time
from datetime import datetime
import hashlib
def monitor_changes(url, interval=300):
    """Poll a page and report content changes."""
    proxies = {
        'http': 'socks5h://127.0.0.1:9050',
        'https': 'socks5h://127.0.0.1:9050'
    }
    last_hash = None
    while True:
        try:
            print(f"{datetime.now()}: checking {url}")
            response = requests.get(url, proxies=proxies, timeout=30)
            content = response.content
            # Hash the content (used only as a change fingerprint, not for security)
            current_hash = hashlib.md5(content).hexdigest()
            # Compare against the previous poll
            if last_hash and current_hash != last_hash:
                print(f"{datetime.now()}: change detected!")
                # Save the changed snapshot
                with open(f"change_{datetime.now().strftime('%Y%m%d_%H%M%S')}.html", 'wb') as f:
                    f.write(content)
            last_hash = current_hash
            # Wait until the next poll
            time.sleep(interval)
        except Exception as e:
            print(f"Error: {e}")
            time.sleep(60)
# Usage example
monitor_changes('http://example-forum.onion', interval=300)
Multithreaded Monitoring#
#!/usr/bin/env python3
import requests
import threading
from queue import Queue
def monitor_url(url, result_queue):
    """Monitor a single URL."""
    proxies = {
        'http': 'socks5h://127.0.0.1:9050',
        'https': 'socks5h://127.0.0.1:9050'
    }
    try:
        response = requests.get(url, proxies=proxies, timeout=30)
        result_queue.put((url, response.status_code, len(response.content)))
    except Exception as e:
        result_queue.put((url, str(e), 0))
def multi_thread_monitor(urls, num_threads=5):
    """Monitor URLs in batches of num_threads concurrent requests."""
    result_queue = Queue()
    threads = []
    # Launch threads, joining each full batch before starting the next
    for url in urls:
        thread = threading.Thread(target=monitor_url, args=(url, result_queue))
        thread.start()
        threads.append(thread)
        # Cap the number of concurrent threads
        if len(threads) >= num_threads:
            for t in threads:
                t.join()
            threads = []
    # Wait for any remaining threads
    for t in threads:
        t.join()
    # Collect the results
    while not result_queue.empty():
        url, status, size = result_queue.get()
        print(f"{url}: {status}, {size} bytes")
# Usage example
urls = [
    'http://example1.onion',
    'http://example2.onion',
    'http://example3.onion'
]
multi_thread_monitor(urls, num_threads=3)
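The same batching falls out for free from the standard library's thread pool, which is usually the more idiomatic choice. A sketch under the same assumptions (the urls list and Tor proxy settings from above):
from concurrent.futures import ThreadPoolExecutor
import requests
def fetch(url):
    proxies = {'http': 'socks5h://127.0.0.1:9050',
               'https': 'socks5h://127.0.0.1:9050'}
    try:
        r = requests.get(url, proxies=proxies, timeout=30)
        return url, r.status_code, len(r.content)
    except Exception as e:
        return url, str(e), 0
# max_workers bounds concurrency; map preserves input order
with ThreadPoolExecutor(max_workers=3) as pool:
    for url, status, size in pool.map(fetch, urls):
        print(f"{url}: {status}, {size} bytes")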
Data Storage#
Database Storage#
#!/usr/bin/env python3
import sqlite3
def create_database(db_file):
    """Create the database and its schema."""
    conn = sqlite3.connect(db_file)
    cursor = conn.cursor()
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS darkweb_data (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            url TEXT NOT NULL,
            title TEXT,
            content TEXT,
            timestamp DATETIME DEFAULT CURRENT_TIMESTAMP
        )
    ''')
    conn.commit()
    return conn
def insert_data(conn, url, title, content):
    """Insert one record."""
    cursor = conn.cursor()
    cursor.execute('''
        INSERT INTO darkweb_data (url, title, content)
        VALUES (?, ?, ?)
    ''', (url, title, content))
    conn.commit()
def query_data(conn, url_pattern=None):
    """Query records, optionally filtered by URL substring."""
    cursor = conn.cursor()
    if url_pattern:
        cursor.execute('''
            SELECT * FROM darkweb_data
            WHERE url LIKE ?
            ORDER BY timestamp DESC
        ''', (f'%{url_pattern}%',))
    else:
        cursor.execute('''
            SELECT * FROM darkweb_data
            ORDER BY timestamp DESC
        ''')
    return cursor.fetchall()
# Usage example
conn = create_database('darkweb_data.db')
# Insert a record
insert_data(conn, 'http://example.onion', 'Example Title', 'Example Content')
# Query records
results = query_data(conn, 'example')
for row in results:
    print(f"URL: {row[1]}")
    print(f"Title: {row[2]}")
    print(f"Timestamp: {row[4]}")
    print()
conn.close()
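The queries above always sort by timestamp, so once the table grows an index on that column is worth adding; note that the leading-wildcard LIKE filter itself cannot use a B-tree index, so the url match still scans. A one-line addition, run while the connection is still open (e.g. right after create_database):
conn.execute('CREATE INDEX IF NOT EXISTS idx_darkweb_ts ON darkweb_data(timestamp)')
conn.commit()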
Advanced Usage#
Threat Intelligence Analysis#
Extracting Indicators of Compromise#
#!/usr/bin/env python3
import requests
import re
import json
def extract_iocs(content):
    """Extract indicators of compromise (IOCs) from text."""
    iocs = {
        'domains': [],
        'ips': [],
        'urls': [],
        'emails': [],
        'hashes': []
    }
    # Domains (one or more labels followed by a TLD)
    iocs['domains'] = re.findall(r'\b(?:[a-zA-Z0-9-]+\.)+[a-zA-Z]{2,}\b', content)
    # IPv4 addresses
    iocs['ips'] = re.findall(r'\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b', content)
    # URLs
    iocs['urls'] = re.findall(r'https?://[^\s<>"{}|\\^`\[\]]+', content)
    # Email addresses
    iocs['emails'] = re.findall(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b', content)
    # Hashes (hex strings in the MD5-to-SHA-256 length range)
    iocs['hashes'] = re.findall(r'\b[a-fA-F0-9]{32,64}\b', content)
    return iocs
def analyze_threat_intel(url):
    """Fetch a page via Tor and extract IOCs from it."""
    proxies = {
        'http': 'socks5h://127.0.0.1:9050',
        'https': 'socks5h://127.0.0.1:9050'
    }
    try:
        response = requests.get(url, proxies=proxies, timeout=30)
        iocs = extract_iocs(response.text)
        # Save the IOCs
        with open('threat_iocs.json', 'w') as f:
            json.dump(iocs, f, indent=2)
        print("IOC extraction complete:")
        print(f"Domains: {len(iocs['domains'])}")
        print(f"IP addresses: {len(iocs['ips'])}")
        print(f"URLs: {len(iocs['urls'])}")
        print(f"Emails: {len(iocs['emails'])}")
        print(f"Hashes: {len(iocs['hashes'])}")
    except Exception as e:
        print(f"Error: {e}")
# Usage example
analyze_threat_intel('http://example-forum.onion')
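Posts often "defang" indicators (hxxp://, example[.]com) so they are not clickable, and the regexes above will miss those. A small refang pass before extraction helps; the replacements below cover the most common conventions, and the helper name is illustrative:
def refang(text):
    """Normalize common defanged notations back to plain indicators."""
    return (text.replace('hxxp://', 'http://')
                .replace('hxxps://', 'https://')
                .replace('[.]', '.')
                .replace('(.)', '.'))
# e.g. call extract_iocs(refang(response.text)) instead of extract_iocs(response.text)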
Trend Analysis#
#!/usr/bin/env python3
import json
import matplotlib.pyplot as plt
from datetime import datetime
from collections import defaultdict
def analyze_trends(data_files):
    """Aggregate daily IOC counts across data files.

    Assumes each file carries a 'timestamp' field alongside its indicator lists.
    """
    trends = defaultdict(lambda: defaultdict(int))
    for file in data_files:
        with open(file) as f:
            data = json.load(f)
        # fromisoformat handles isoformat() output, including fractional seconds
        timestamp = datetime.fromisoformat(data['timestamp'])
        date_key = timestamp.strftime('%Y-%m-%d')
        # Tally each indicator type per day
        trends['domains'][date_key] += len(data.get('domains', []))
        trends['ips'][date_key] += len(data.get('ips', []))
        trends['urls'][date_key] += len(data.get('urls', []))
    return trends
def plot_trends(trends, output_file='trend_analysis.png'):
    """Plot daily counts for each indicator type."""
    fig, axes = plt.subplots(3, 1, figsize=(12, 12))
    # Domain trend
    dates = sorted(trends['domains'].keys())
    domain_counts = [trends['domains'][d] for d in dates]
    axes[0].plot(dates, domain_counts, color='blue')
    axes[0].set_title('Domain Trend')
    axes[0].set_ylabel('Count')
    axes[0].tick_params(axis='x', rotation=45)
    # IP trend
    ip_counts = [trends['ips'][d] for d in dates]
    axes[1].plot(dates, ip_counts, color='red')
    axes[1].set_title('IP Address Trend')
    axes[1].set_ylabel('Count')
    axes[1].tick_params(axis='x', rotation=45)
    # URL trend
    url_counts = [trends['urls'][d] for d in dates]
    axes[2].plot(dates, url_counts, color='green')
    axes[2].set_title('URL Trend')
    axes[2].set_ylabel('Count')
    axes[2].tick_params(axis='x', rotation=45)
    plt.tight_layout()
    plt.savefig(output_file, dpi=300)
    plt.close()
    print(f"Trend chart saved to {output_file}")
# Usage example
data_files = ['threat_iocs_1.json', 'threat_iocs_2.json', 'threat_iocs_3.json']
trends = analyze_trends(data_files)
plot_trends(trends)
Master-Level Usage#
Automated Systems#
Distributed Monitoring#
#!/usr/bin/env python3
import requests
import multiprocessing
import time
import json
from datetime import datetime
def monitor_node(url):
    """Monitor a single node and return the result."""
    proxies = {
        'http': 'socks5h://127.0.0.1:9050',
        'https': 'socks5h://127.0.0.1:9050'
    }
    try:
        start_time = time.time()
        response = requests.get(url, proxies=proxies, timeout=30)
        end_time = time.time()
        return {
            'url': url,
            'status_code': response.status_code,
            'response_time': end_time - start_time,
            'content_length': len(response.content),
            'timestamp': datetime.now().isoformat()
        }
    except Exception as e:
        return {
            'url': url,
            'error': str(e),
            'timestamp': datetime.now().isoformat()
        }
def distributed_monitor(urls, num_workers=4):
    """Fan the URL list out over a pool of worker processes."""
    # Workers return their results directly, so no shared queue is needed
    # (a plain multiprocessing.Queue cannot be passed to Pool workers anyway)
    with multiprocessing.Pool(processes=num_workers) as pool:
        return pool.map(monitor_node, urls)
# Usage example (the __main__ guard is required for multiprocessing on some platforms)
if __name__ == '__main__':
    urls = [
        'http://example1.onion',
        'http://example2.onion',
        'http://example3.onion',
        'http://example4.onion'
    ]
    results = distributed_monitor(urls, num_workers=4)
    for result in results:
        print(json.dumps(result, indent=2))
Intelligent Analysis#
#!/usr/bin/env python3
import json
from sklearn.cluster import KMeans
from sklearn.feature_extraction.text import TfidfVectorizer
def cluster_texts(texts, num_clusters=3):
    """Cluster texts by their TF-IDF features."""
    # Feature extraction
    vectorizer = TfidfVectorizer(max_features=1000)
    X = vectorizer.fit_transform(texts)
    # K-means clustering
    kmeans = KMeans(n_clusters=num_clusters, random_state=42)
    kmeans.fit(X)
    return kmeans.labels_
def analyze_patterns(data_file):
    """Cluster collected titles and print each cluster."""
    with open(data_file) as f:
        data = json.load(f)
    # Extract the texts
    texts = data.get('titles', [])
    # Cluster them
    labels = cluster_texts(texts, num_clusters=3)
    # Group texts by cluster label
    clusters = {}
    for i, label in enumerate(labels):
        clusters.setdefault(label, []).append(texts[i])
    # Report each cluster
    for cluster_id, members in clusters.items():
        print(f"\n=== Cluster {cluster_id} ===")
        print(f"Size: {len(members)}")
        print("Sample texts:")
        for text in members[:5]:
            print(f" - {text}")
# Usage example
analyze_patterns('forum_data.json')
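To label the clusters with something human-readable, the highest-weighted TF-IDF terms per centroid work well. A sketch that assumes cluster_texts is modified to also return its kmeans and vectorizer objects:
import numpy as np
def top_terms_per_cluster(kmeans, vectorizer, n_terms=8):
    """Print the strongest centroid terms for each cluster."""
    terms = vectorizer.get_feature_names_out()
    # Sort each centroid row descending; take the highest-weight term indices
    order = np.argsort(kmeans.cluster_centers_, axis=1)[:, ::-1]
    for cluster_id, row in enumerate(order):
        top = [terms[i] for i in row[:n_terms]]
        print(f"Cluster {cluster_id}: {', '.join(top)}")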
Report Generation#
Automated Reports#
#!/usr/bin/env python3
import json
from datetime import datetime
from jinja2 import Template
def generate_report(data_files, template_file, output_file):
    """Render an HTML report from collected IOC files."""
    # Load all data files
    all_data = []
    for file in data_files:
        with open(file) as f:
            all_data.append(json.load(f))
    # Deduplicated totals across all files
    total_domains = len({d for data in all_data for d in data.get('domains', [])})
    total_ips = len({d for data in all_data for d in data.get('ips', [])})
    total_urls = len({d for data in all_data for d in data.get('urls', [])})
    # Render the report
    with open(template_file) as f:
        template = Template(f.read())
    report = template.render(
        timestamp=datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
        total_data=len(all_data),
        total_domains=total_domains,
        total_ips=total_ips,
        total_urls=total_urls,
        data=all_data
    )
    with open(output_file, 'w') as f:
        f.write(report)
    print(f"Report generated: {output_file}")
# Usage example
generate_report(
    ['threat_iocs_1.json', 'threat_iocs_2.json'],
    'report_template.html',
    'darkweb_report.html'
)
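The code assumes report_template.html already exists; a minimal Jinja2 template matching the variables passed to render above might look like this (purely illustrative):
<!DOCTYPE html>
<html>
<head><title>Dark Web Monitoring Report</title></head>
<body>
  <h1>Dark Web Monitoring Report</h1>
  <p>Generated: {{ timestamp }}</p>
  <ul>
    <li>Sources: {{ total_data }}</li>
    <li>Unique domains: {{ total_domains }}</li>
    <li>Unique IPs: {{ total_ips }}</li>
    <li>Unique URLs: {{ total_urls }}</li>
  </ul>
</body>
</html>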
Case Studies#
Case 1: Threat Intelligence Monitoring#
Scenario#
Monitor dark web forums and markets to collect threat intelligence and indicators of compromise.
Implementation Steps#
#!/usr/bin/env python3
import requests
import json
from datetime import datetime
import time
def monitor_threat_intel(targets, output_file):
    """Collect pages from a list of intelligence sources."""
    proxies = {
        'http': 'socks5h://127.0.0.1:9050',
        'https': 'socks5h://127.0.0.1:9050'
    }
    all_data = []
    for target in targets:
        print(f"Monitoring: {target['name']}")
        try:
            response = requests.get(target['url'], proxies=proxies, timeout=30)
            # Record the result
            data = {
                'source': target['name'],
                'url': target['url'],
                'timestamp': datetime.now().isoformat(),
                'content': response.text[:1000]  # cap the stored content length
            }
            all_data.append(data)
            # Save progress after each target
            with open(output_file, 'w') as f:
                json.dump(all_data, f, indent=2)
            print(f"  Collected: {len(response.content)} bytes")
        except Exception as e:
            print(f"  Error: {e}")
        # Pause between targets
        time.sleep(60)
    print("Monitoring complete")
# Usage example
targets = [
    {'name': 'Forum A', 'url': 'http://example-forum.onion'},
    {'name': 'Market B', 'url': 'http://example-market.onion'},
    {'name': 'Leak Site C', 'url': 'http://example-leak.onion'}
]
monitor_threat_intel(targets, 'threat_intel.json')
Case 2: Data Leak Monitoring#
Scenario#
Monitor dark web leak sites to detect exposures of your organization's data as early as possible.
Implementation Steps#
#!/usr/bin/env python3
import requests
import json
from datetime import datetime
def monitor_data_leaks(leak_sites, keywords, output_file):
    """Scan leak sites for the given keywords."""
    proxies = {
        'http': 'socks5h://127.0.0.1:9050',
        'https': 'socks5h://127.0.0.1:9050'
    }
    alerts = []
    for site in leak_sites:
        print(f"Checking: {site['name']}")
        try:
            response = requests.get(site['url'], proxies=proxies, timeout=30)
            content = response.text.lower()
            # Search for each keyword (case-insensitive: both sides lowercased)
            for keyword in keywords:
                idx = content.find(keyword.lower())
                if idx != -1:
                    alert = {
                        'site': site['name'],
                        'url': site['url'],
                        'keyword': keyword,
                        'timestamp': datetime.now().isoformat(),
                        'snippet': content[max(0, idx - 50):idx + 50]
                    }
                    alerts.append(alert)
                    print(f"  Alert: found keyword '{keyword}'")
        except Exception as e:
            print(f"  Error: {e}")
    # Save the alerts
    if alerts:
        with open(output_file, 'w') as f:
            json.dump(alerts, f, indent=2)
        print(f"\nFound {len(alerts)} alert(s)")
    else:
        print("\nNo alerts found")
# Usage example
leak_sites = [
    {'name': 'Leak Site A', 'url': 'http://leak-site1.onion'},
    {'name': 'Leak Site B', 'url': 'http://leak-site2.onion'}
]
keywords = [
    'your-company-name',
    'your-product-name',
    'your-employee-name'
]
monitor_data_leaks(leak_sites, keywords, 'data_leak_alerts.json')
Summary#
Dark web intelligence collection is a specialized security research discipline and must always be conducted within the applicable legal framework.
Core Strengths#
- Threat intelligence: collect threat intelligence and indicators of compromise
- Early warning: detect emerging threats before they escalate
- Security research: support security research and analysis
- Risk assessment: evaluate the risks an organization faces
Application Scenarios#
- Threat intelligence collection
- Security research
- Network security defense
- Compliance checks
- Risk assessment
Best Practices#
- Lawful use: stay within the applicable legal framework
- Ethics: follow professional and ethical codes of conduct
- Data protection: safeguard all collected data
- Verification: validate collected information before acting on it
- Reporting: establish clear reporting and notification procedures
Cautions#
- Use only with proper authorization
- Comply with all applicable laws and regulations
- Respect privacy rights
- Do not participate in illegal activity
- Protect sensitive information
- Keep security measures up to date
Legal Notice#
This tutorial is provided for learning and research purposes only. Any use of the techniques described here must comply with local laws and regulations. Unauthorized access to dark web resources or unauthorized data collection may be illegal. Users bear full legal responsibility for their actions.