此操作系统非彼操作系统...

【OS】Python模拟简单的操作系统

Background

学习操作系统的时候,关注的对象是:

  • 应用程序
  • 系统调用
  • 操作系统的内部实现

通常的学习思路:

  • 真实的操作系统
  • 编译内核环境
  • qemu模拟

但是,换个角度想一下,把上述的思路抽象,可以使用如下的思路学习:

  • 应用程序 == Python Code + Syscall
  • 操作系统 == Python Code + 假想的IO设备

比如,我想要实现操作系统中的打印,可以在Python中定义sys_write就是通过系统调用进行输出内容的函数:

def main():
    sys_write("Hello Python OS!")

Core

一个最基本的OS,不可能只有一个write的api吧,想一想一个最基本的OS需要什么api呢?

  • write(输出字符)
  • spawn(创建一个状态)
  • choose(随机选择一个状态)
  • sched(随机切换到某个状态并且执行)

如何使用Python实现上述的api呢?

部分api还是非常显而易见的:

def sys_write(msg):
    print(msg)

def sys_choose(x):
    return randmo.choice(x)

def sys_spawn(x):
    running.append(x)   # 状态队列

但是sched这种如何实现呢?得切换到running队列中的某个状态中并且执行!同时需要注意如下几点:

  • 保存当前状态
  • 恢复目的状态

其实这种操作在软件层经常用到,就比如函数调用链,就需要保存当前状态,同时去执行下一个函数。

那么如何在Python Code中实现这样的操作呢?可以使用生成器模式,在函数中使用yield关键字让其成为一个生成器。

Python的生成器支持send方法,可以让生成器变为双向通道。send可以把参数发送给生成器,同时让这个参数成为上一条yield表达式的求值结果,并且将生成器推进到下一条yield表达式,然后把yield右边的值返回给send方法的调用者。 首次调用send方法的时候只能传入None

Code

结合上述的分析,我们可以用面向对象的思想写出如下的代码

import sys, random


class Thread:
    def __init__(self, func, *args) -> None:
        self.func = func(*args)
        self.ret = None

    def step(self):
        syscall, args, *_ = self.func.send(self.ret)
        self.ret = None
        return syscall, args


class OS:
    SYSCALLS = ("choose", "write", "spawn", "sched")

    def __init__(self, src) -> None:
        vars = {}
        exec(src, vars)
        self.main = vars["main"]

    def run(self):
        running = [Thread(self.main)]
        while running:
            t: Thread = None
            try:
                match (t := running[0]).step():
                    case "choose", x:  # choose this and exec
                        t.ret = random.choice(x)
                    case "write", msg:
                        print(msg, end="")
                    case "spawn", (fn, args):
                        running.append(Thread(fn, args))
                    case "sched", _:
                        random.shuffle(running)
            except StopIteration:
                running.remove(t)
                random.shuffle(running)


if __name__ == "__main__":
    if sys.argv.__len__() < 2:
        print("python3 os_model.py [src]")
        sys.exit(0)

    src = open(sys.argv[1], "r", encoding="u8").read()
    for syscall in OS.SYSCALLS:
        src = src.replace(f"sys_{syscall}", f"yield '{syscall}', ")
    OS(src).run()

稍微解释一下上述的代码:

  • 需要传入一个参数,指定需要执行的python文件
  • 读取这个python文件的代码,将sys_write这类的syscall函数转为yield构造的生成器函数
  • 使用OS类进行执行,OS在__init__的时候使用exec载入了python文件的信息
  • 执行run函数,在每个线程的step函数执行后,执行不同的syscall函数
  • 维护running线程队列
  • 每次执行sys_spawn的时候会将这个函数加入到running队列
  • 一个线程执行结束抛出StopIteration的异常,同时从running队列删除,在打乱线程队列

那么如何使用上述的代码呢?

python3 os_model.py test.py

test.py如下:

count = 0

def Tprint(name):
    global count
    for i in range(3):
        count += 1
        sys_write(f'#{count:02} Hello from {name}{i+1}\n')
        sys_sched()

def main():
    n = sys_choose([3, 4, 5])
    sys_write(f'#Thread = {n}\n')
    for name in 'ABCDE'[:n]:
        sys_spawn(Tprint, name)
    sys_sched()

上述代码执行后的效果,可以正常执行

image-20230526131004022

其实上述的模型有挺多不合理的地方:

  • sys_sched只能由用户手动调用,而真实OS会被动调用
  • 只有一个终端 ,同时没有stdin的输入
  • 没有实现fork等较常用的syscall的操作

但是在GitHub上有个开源的项目,使用python模拟运行一个操作系统,并且只有简短的500行

codebox/mosaic: Python script for creating photomosaic images (github.com)

Idea

我们可以用 “简化” 的方式把操作系统的概念用可执行模型的方式呈现出来:

  • 程序被建模为高级语言 (Python) 的执行和系统调用
  • 系统调用的实现未必一定需要基于真实或模拟的计算机硬件
  • 操作系统的 “行为模型” 更容易理解