sink点:输入的入口和执行敏感操作的点
在代码审计中,Sink点(污染汇聚点) 是指程序中接收外部输入并可能导致安全漏洞的关键位置。这些位置如果没有严格的输入验证或安全防护,可能成为攻击入口(如SQL注入、XSS、命令注入等)。以下是常见Sink点分类及审计方法:
一、常见Sink点分类
1. 数据库操作
- 危险代码模式:
# 直接拼接SQL语句(SQL注入)
query = f"SELECT * FROM users WHERE username = '{user_input}'"
cursor.execute(query)
- 审计重点:
- 所有使用字符串拼接的SQL语句。
- ORM框架中未使用参数化查询的方法(如
execute()直接拼接)。
2. 文件操作
- 危险代码模式:
# 直接使用用户输入构造文件路径(路径遍历)
with open(f"/data/{filename}", "r") as f:
content = f.read()
- 审计重点:
- 文件读写操作中的动态路径构造。
- 解压缩文件时未校验文件名(Zip Slip漏洞)。
3. 命令执行
- 危险代码模式:
# 直接执行用户输入的命令(命令注入)
import os
os.system(f"ping {user_input}")
- 审计重点:
os.system、subprocess.run、eval等动态执行函数。
4. 反序列化
- 危险代码模式:
# 反序列化未受信数据(反序列化漏洞)
import pickle
data = pickle.loads(user_input)
- 审计重点:
- 使用
pickle、marshal等模块的反序列化操作。
5. Web输出
- 危险代码模式:
# 直接输出未转义的用户输入(XSS)
from flask import render_template_string
@app.route('/unsafe')
def unsafe():
return render_template_string(f'<div>{request.args.get("content")}</div>')
- 审计重点:
- 动态渲染HTML、XML时未转义用户输入(如Jinja2的
|safe过滤器滥用)。
6. HTTP头部操作
- 危险代码模式:
# 直接设置HTTP头(响应头注入)
from flask import Response
@app.route('/redirect')
def redirect():
url = request.args.get('url')
return Response("", headers={"Location": url})
- 审计重点:
- 动态设置
Location、Set-Cookie等HTTP头。
二、审计方法论
1. 静态分析
- 工具辅助:
- Python:
Bandit、Semgrep - Java:
FindSecBugs - 通用:
SonarQube - 人工检查:
- 跟踪用户输入(从
request、input()等入口点开始)。 - 检查敏感函数(如
eval、os.system)的调用链。
2. 动态测试
- 输入模糊测试:
- 使用
sqlmap测试SQL注入。 - 使用
XSStrike测试XSS漏洞。 - 流量拦截:
- 通过Burp Suite修改请求参数,观察异常行为。
3. 数据流分析
- 识别“Source(输入源) → Propagation(传播路径) → Sink(污染汇聚点)”链条。
- 示例流程:
用户输入 → 未过滤 → 拼接SQL查询 → 执行数据库操作(Sink)
三、修复建议
| 漏洞类型 | 修复方案 |
| SQL注入 | 使用参数化查询(如SQLAlchemy的text() + 参数绑定) |
| XSS | 输出时转义(如Jinja2的`{{ content |
| 命令注入 | 使用白名单校验输入,避免直接拼接命令(或改用API替代命令行操作) |
| 路径遍历 | 规范化路径并校验是否在允许的目录内(如os.path.abspath + 前缀检查) |
| 反序列化漏洞 | 避免反序列化不可信数据,改用JSON等安全格式 |
四、实战案例
漏洞代码(Flask应用):
@app.route('/search')
def search():
keyword = request.args.get('keyword')
query = f"SELECT * FROM posts WHERE content LIKE '%{keyword}%'"
result = db.engine.execute(query) # SQL注入Sink点
return render_template('results.html', data=result)
修复代码:
from sqlalchemy import text
@app.route('/search')
def search():
keyword = request.args.get('keyword')
query = text("SELECT * FROM posts WHERE content LIKE :keyword")
result = db.session.execute(query, {"keyword": f"%{keyword}%"}) # 参数化查询
return render_template('results.html', data=result)

Comments NOTHING