Python栈帧逃逸:

1、yield 语与生成器函数:

​ python中的yield语句用于定义生成器函数,它能让函数在运行过程中暂停并保存当前状态,后续再恢复执行。

​ 生成器 是 Pythonn中一种特殊的迭代器,它可以在迭代过程中动态生成值,而不需要一次性将所有值存0储在内存中。

​ 举个生成器的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
def func():
print("生成器函数第一次执行")
yield 1
print("生成器函数第二次执行")
yield 2
print("生成器函数第三次执行")
yield 3


gen = func()
print(next(gen))
print(next(gen))
print(next(gen))
# frame = [x for x in gen]
# print(frame)







#执行结果
#生成器函数第一次执行
#1
#生成器函数第二次执行
#2
#生成器函数第三次执行
#3

​ 或者:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
def f():
a = 1
while True:
yield a
a += 1

f = f()
print(next(f))
print(next(f))


#执行结果
#1
#2

​ 也可以 遍历获取所有的自增值:

1
2
3
4
5
6
7
8
9
10
11
12
def f():
a = 1
for i in range(1,20):
yield a
a += 1

f = f()
for value in f:
print(value)


#会正常输出1到19

2、生成器表达式:

​ 生成器表达式是一种在Python中创建生成器的紧凑形式。类似于列表推导式,生成器表达式允许你使用简洁的语法来定义生成器,而不必显示地编写一个函数。生成器表达式的 语法与列表推导式类似,但是使用圆括号而不是方括号。生成器表达式会 逐个生成值,而不是一次性生成 整个序列。。这有利于 提高内存的额 利用率:

1
2
3
4
5
6
7
f = (i+1 for i in range(10))

#可以用next一步步 获取值
# print(next(f))
#也可以遍历获取所有值
for value in f:
print(value)

3、生成器属性:

1
2
3
4
5
gi_code:生成器对应的code对象。
gi_frame:生成器对应的frame(栈帧)对象
gi_running:生成器函数是否在执行。生成器函数在yield以后、执行yield的下一行代码前处于frozen状态,此时这个属性为0.
gi_yeildfrom:如果生成器正在从另一个生成器中yield值,则为该生成器对象的引;否则为None
gi_frame.f_locals:一个字典,,包含生成器当前栈的本地变量

4、gi_frame的使用:

​ gi_frame 是一个与生成器和协程相关的属性。它指向生成器或协程当前 执行的帧对象,如果这个生成器或协程正在执行的话。帧对象 表示代码 执行的当前上下文,包含了局部变量、执行的字节码指令等信息。

1
2
3
4
5
6
7
8
9
10
11
12
def f():
yield 1
yield 2
yield 3

f = f()
frame = f.gi_frame

print("局部变量(Local Variables):",frame.f_locals)
print("全局变量(Gloobal Variables):",frame.f_globals)
print("Code对象:",frame.f_code)
print("Insstruction Pointer:",frame.f_lasti)

​ 获取代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
def my_generator():
yield 1
yield 2
yield 3

gen = my_generator()

# 获取生成器的当前代码信息
code = gen.gi_code

# 输出生成器的当前代码信息
print( code.co_name)
print(code.co_code)
print( code.co_consts)
print(code.co_filename)



# my_generator
# b'K\x00\x01\x00\x97\x00d\x01V\x00\x97\x01\x01\x00d\x02V\x00\x97\x01\x01\x00d\x03V\x00\x97\x01\x01\x00d\x00S\x00'
# (None, 1, 2, 3)
# C:\Users\20820\PycharmProjects\shizhan\bb.py

5、栈帧 (Frame)介绍:

​ 在 Python 中,栈帧(stack frame),也称为帧(frame),是用于执行代码的数据结构。每当 Python 解释器执行一个函数或方法时,都会创建一个新的栈帧,用于存储该函数或方法的局部变量、参数、返回地址以及其他执行相关的信息。这些栈帧会按照调用顺序被组织成一个栈,称为调用栈。

​ 栈帧包含了以下几个重要的属性:f_locals: 一个字典,包含了函数或方法的局部变量。键是变量名。f_globals: 一个字典,包含了函数或方法所在模块的全局变量。f_code: 一个代码对象(code object),包含了函数或方法的字节码指令、常量、变量名等信息。f_lasti: 整数,表示最后执行的字节码指令的索引。f_back: 指向上一级调用栈帧的引用,用于构建调用栈。

​ 栈帧包含了以下几个重要的属性:

f_locals: 一个字典,包含了函数或方法的局部变量。键是变量名,值是变量的值。
f_globals: 一个字典,包含了函数或方法所在模块的全局变量。键是全局变量名,值是变量的值。
f_code: 一个代码对象(code object),包含了函数或方法的字节码指令、常量、变量名等信息。
f_lasti: 整数,表示最后执行的字节码指令的索引。
f_back: 指向上一级调用栈帧的引用,用于构建调用栈。

6、栈帧逃逸:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
s3cret="this is flag:flag{hello_World_hello_Python}"

def waff():
def f():
yield g.gi_frame.f_back

g = f() #生成器
frame = next(g) #获取到生成器的栈帧对象
b = frame.f_globals['s3cret'] #返回并获取前一级栈帧的globals
# b = frame.f_back.f_globals['s3cret'] # 返回并获取前一级栈帧的globals
print(b)
b=waff()



#this is flag:flag{hello_World_hello_Python}

​ 在看看上面的f_globals: 一个字典,包含了函数或方法所在模块的全局变量。键是全局变量名,值是变量的值。

不难看出这里函数和模块本就同在一个全局,所以都有属性s3cret,怎么看到没到全局?直接看file就能看出。

​ 在给个例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
s3cret="this is flag"

codes='''
def waff():
def f():
yield g.gi_frame.f_back

g = f() #生成器
frame = next(g) #获取到生成器的栈帧对象
print(frame)
print(frame.f_back)
print(frame.f_back.f_back)
b = frame.f_back.f_back.f_globals['s3cret'] #返回并获取前一级栈帧的globals
return b
b=waff()
'''
locals={}
code = compile(codes, "test", "exec")
exec(code,locals)
print(locals["b"])


# <frame at 0x0000017C309E7C40, file 'test', line 8, code waff>
# <frame at 0x0000017C520AF560, file 'test', line 13, code <module>>
# <frame at 0x0000017C521A0040, file 'C:\\Users\\20820\\PycharmProjects\\shizhan\\bb.py', line 19, code <module>>
# this is flag

​ 首先,生成器的栈帧是 g ,经过一次回溯 之后到了waff的栈帧,再回溯一次之后到了exec的栈帧,这个时候访问f_globals就到了exec再上一层 的作用域,也就是全局变量 。

​ 另一种情况:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
s3cret="this is flag"

codes='''
def waff():
def f():
yield g.gi_frame.f_back

q = (q.gi_frame.f_back.f_back.f_back.f_globals for _ in [1])
b = [*q][0]["s3cret"]
return b
b=waff()
'''
locals={}
code = compile(codes, "test", "exec")
exec(code,locals)
print(locals["b"])

​ 这样也能输出flag,和上面的相比,是进行了三次回溯,先来一个回溯结构示意:

1
2
3
4
5
6
7
8
9
主程序全局帧 (包含 s3cret)

exec 执行环境帧

waff() 函数调用帧

列表推导式帧 ([*q] 迭代触发)

生成器 q 的帧 (q.gi_frame)

​ 首先,是生成器q的帧,回溯一次之后是[*q]这一行 的帧,之后才是waff和exec的帧到了exec的帧之后获取globals就可以获得到flag了。

​ 为什么用[*q]运行生成器,却不用next(q)?主要是因为,大部分的沙箱题目中,builtins基本上都被杨了,不能使用next来进行获取,所以只能使用这种办法。

来个例题(CISCN2024 mossfern):

​ main.py的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
import os
import subprocess
from flask import Flask, request, jsonify
from uuid import uuid1

app = Flask(__name__)

runner = open("/app/runner.py", "r", encoding="UTF-8").read()
flag = open("/flag", "r", encoding="UTF-8").readline().strip()


@app.post("/run")
def run():
id = str(uuid1())
try:
data = request.json
open(f"/app/uploads/{id}.py", "w", encoding="UTF-8").write(
runner.replace("THIS_IS_SEED", flag).replace("THIS_IS_TASK_RANDOM_ID", id))
open(f"/app/uploads/{id}.txt", "w", encoding="UTF-8").write(data.get("code", ""))
run = subprocess.run(
['python', f"/app/uploads/{id}.py"],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
timeout=3
)
result = run.stdout.decode("utf-8")
error = run.stderr.decode("utf-8")
print(result, error)


if os.path.exists(f"/app/uploads/{id}.py"):
os.remove(f"/app/uploads/{id}.py")
if os.path.exists(f"/app/uploads/{id}.txt"):
os.remove(f"/app/uploads/{id}.txt")
return jsonify({
"result": f"{result}\n{error}"
})
except:
if os.path.exists(f"/app/uploads/{id}.py"):
os.remove(f"/app/uploads/{id}.py")
if os.path.exists(f"/app/uploads/{id}.txt"):
os.remove(f"/app/uploads/{id}.txt")
return jsonify({
"result": "None"
})


if __name__ == "__main__":
app.run("0.0.0.0", 5000)

​ 没啥太大得到用处,大概就是每一次访问/run路由就生成一个uuid.py,将 flag替换进文件占位符,之后,再生成一个uuid.txt文件,将用户上传的代码写入,之后运行代码,并结果返回。

​ 之后看看runner.py:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
def source_simple_check(source):

"""
Check the source with pure string in string, prevent dangerous strings
:param source: source code
:return: None
"""

from sys import exit
from builtins import print

try:
source.encode("ascii")
except UnicodeEncodeError:
print("non-ascii is not permitted")
exit()

for i in ["__", "getattr", "exit"]:
if i in source.lower():
print(i)
exit()


def block_wrapper():
"""
Check the run process with sys.audithook, no dangerous operations should be conduct
:return: None
"""

def audit(event, args):

from builtins import str, print
import os

for i in ["marshal", "__new__", "process", "os", "sys", "interpreter", "cpython", "open", "compile", "gc"]:
if i in (event + "".join(str(s) for s in args)).lower():
print(i)
os._exit(1)
return audit


def source_opcode_checker(code):
"""
Check the source in the bytecode aspect, no methods and globals should be load
:param code: source code
:return: None
"""

from dis import dis
from builtins import str
from io import StringIO
from sys import exit

opcodeIO = StringIO()
dis(code, file=opcodeIO)
opcode = opcodeIO.getvalue().split("\n")
opcodeIO.close()
for line in opcode:
if any(x in str(line) for x in ["LOAD_GLOBAL", "IMPORT_NAME", "LOAD_METHOD"]):
if any(x in str(line) for x in ["randint", "randrange", "print", "seed"]):
break
print("".join([x for x in ["LOAD_GLOBAL", "IMPORT_NAME", "LOAD_METHOD"] if x in str(line)]))
exit()


if __name__ == "__main__":

from builtins import open
from sys import addaudithook
from contextlib import redirect_stdout
from random import randint, randrange, seed
from io import StringIO
from random import seed
from time import time

source = open(f"/app/uploads/THIS_IS_TASK_RANDOM_ID.txt", "r").read()
source_simple_check(source)
source_opcode_checker(source)
code = compile(source, "<sandbox>", "exec")
addaudithook(block_wrapper())
outputIO = StringIO()
with redirect_stdout(outputIO):
seed(str(time()) + "THIS_IS_SEED" + str(time()))
exec(code, {
"__builtins__": None,
"randint": randint,
"randrange": randrange,
"seed": seed,
"print": print
}, None)
output = outputIO.getvalue()

if "THIS_IS_SEED" in output:
print("这 runtime 你就嘎嘎写吧, 一写一个不吱声啊,点儿都没拦住!")
print("bad code-operation why still happened ah?")
else:
print(output)

​ 首先是source_simple_check()函数,这个函数简单来说就是给 所有全角字符都ban了,无法使用,以及三个关键字,也给ban了。

​ 之后是block_wrapper(),检测代码执行过程中事件和参数是否含有那些关键字 。

​ 之后是source_opcode_checker(),目的为检测用户程序的操作码中是否含有访问全局变量、导入模块以及方法。

更新于

请我喝[茶]~( ̄▽ ̄)~*

g01den 微信支付

微信支付

g01den 支付宝

支付宝

g01den 贝宝

贝宝