Started working on fuse mount for CP/M disk

This commit is contained in:
Dreaded_X 2021-01-14 03:07:32 +01:00
commit 2510b8fb50

269
src/main.py Executable file
View File

@ -0,0 +1,269 @@
#!/usr/bin/env python3
import stat
import errno
import fuse
fuse.fuse_python_api = (0, 2)
# @todo Not hardcode this
# out = open("../tools/disk.img", "rb+", buffering=0)
out = open("/dev/sdb", "rb+", buffering=0)
# Constants
sectorsPerTrack = 256
pageSize = 128
blockSize = 16384
maxDirs = 128
diskSize = 511 # @todo Do we need to actualy use size-1 or size?
pagesPerBlock = 128
def seekBlock(blockNumber):
out.seek(pageSize*sectorsPerTrack + blockNumber*blockSize, 0)
# @todo Take in the disk number
def readEntries():
# Block 0 contains all the entries
seekBlock(0)
files = {}
for i in range(maxDirs):
seekBlock(0)
out.seek(32*i, 1)
offset = out.tell()
user = out.read(1)[0]
if user != 0xe5:
n = out.read(8)
t = out.read(3)
filename = str(user) + ":" + n.decode('ascii').split(' ')[0] + '.' + t.decode('ascii').split(' ')[0];
extends = int.from_bytes(out.read(1), byteorder='little')
out.seek(2, 1)
records = int.from_bytes(out.read(1), byteorder='little')
records += extends*128
blocks = []
for j in range(8):
block = int.from_bytes(out.read(2), byteorder='little')
if block != 0:
blocks.append(block)
# Print all the info we need on the file
files["/A/" + filename] = [filename, user, records*pageSize, blocks, offset]
return files
class MyStat(fuse.Stat):
def __init__(self):
self.st_mode = stat.S_IFDIR | 0o755
self.st_ino = 0
self.st_dev = 0
self.st_nlink = 2
self.st_uid = 1000
self.st_gid = 1000
self.st_size = 128
self.st_atime = 0
self.st_mtime = 0
self.st_ctime = 0
class CPMFS(fuse.Fuse):
def __init__(self, *args, **kw):
fuse.Fuse.__init__(self, *args, **kw)
self.drives = ["A"]
# @todo We need to actually fill the drives with all the files that are contained in the disk image
# Get the file/directory attributes
def getattr(self, path):
path = path.upper()
st = MyStat()
files = readEntries()
if path == '/':
pass
elif len(path) == 2 and path[0] == "/" and path[1] in self.drives:
pass
elif path == "/BOOT.BIN":
st.st_mode = stat.S_IFREG | 0o666
st.st_nlink = 1
st.st_size = 128
elif path in files:
st.st_mode = stat.S_IFREG | 0o666
st.st_nlink = 1
st.st_size = files[path][2]
else:
return -errno.ENOENT
return st
# Return the contents of a directory
def readdir(self, path, offset):
path = path.upper()
print(len(path))
dirents = ['.', ".."]
if path == '/':
dirents.extend(self.drives)
dirents.append("boot.bin")
else:
files = readEntries()
dirents.extend(f[0] for f in files.values())
for r in dirents:
yield fuse.Direntry(r)
# @todo How do we handle failing to create a file?
# def mknod(self, path, mode, dev):
# pe = path.split('/')[1:]
#
# n, t = pe[1].split('.')
#
# if len(n) > 8:
# raise RuntimeError("Filename cannot be longer than 8")
#
# if len(t) > 3:
# raise RuntimeError("Filetype cannot be longer than 3")
#
# seekBlock(0)
# for i in range(maxDirs):
# user = out.read(1)[0]
# if user == 0xe5:
# # @todo Assume user 0 for now
# out.seek(-1, 1)
# out.write(bytearray(1))
# out.write(n.upper().encode("ascii")[0:8])
# if len(n) != 8:
# out.write((" " * (8-len(n))).encode("ascii"))
# out.write(t.upper().encode("ascii")[0:3])
# if len(t) != 3:
# out.write((" " * (3-len(t))).encode("ascii"))
#
# out.write(bytearray(20))
#
# break
# else:
# out.seek(31, 1)
#
# return 0
# def unlink(self, path):
# path = path.upper()
# files = readEntries()
#
# if path in files:
# out.seek(files[path][4])
# # We do not have to remove any data we can just set user to 0xe5 to mark it as unused
# out.write(b'\xe5')
#
# return 0
# def write(self, path, buf, offset):
# print("WRITE!!!!!!")
# if offset != 0:
# # @todo Implement handling offsets
# raise RuntimeError("OFFSET IS NOT IMPLEMENTED!")
#
# pe = path.split('/')[1:]
# files = readEntries()
#
# freeBlocks = list(range(1, diskSize+1))
#
# for _,f in files.items():
# for block in f[2]:
# if block != 0:
# freeBlocks.remove(block)
#
# f = files[pe[1].upper()]
# usedBlocks = []
#
# print("Length:", len(buf))
# total_records = math.ceil(len(buf)/pageSize)
# records = (total_records - 1) % pagesPerBlock + 1
# extends = math.ceil(total_records/pagesPerBlock)
#
# out.seek(f[3] + 12)
# out.write((extends-1).to_bytes(1, byteorder='little'))
# out.seek(f[3] + 15)
# out.write(records.to_bytes(1, byteorder='little'))
#
# print(f[2])
# print(freeBlocks)
# for i in range(extends):
# block = 0
# if len(f[2]):
# block = f[2][0]
# f[2].remove(block)
# else:
# block = freeBlocks[0]
# freeBlocks.remove(block)
#
# usedBlocks.append(block)
# seekBlock(block)
# written = out.write(buf[i*128*128:(i+1)*128*128])
# if written < 128*128:
# out.write(b'\x1a')
#
# out.seek(f[3] + 16)
# for i in range(8):
# if i < len(usedBlocks):
# out.write(usedBlocks[i].to_bytes(2, byteorder='little'))
# else:
# out.write((0).to_bytes(2, byteorder='little'))
#
#
# print(f[2])
# print(freeBlocks)
# print(usedBlocks)
#
# print("$$$$$$$$$RECORDS:", records, extends)
#
# return len(buf)
def read(self, path, size, offset):
path = path.upper()
if (path == "/BOOT.BIN"):
out.seek(0, 0)
data = out.read(128)
return data[offset:offset+size]
f = readEntries()[path]
remaining = f[2]
if remaining == 0:
return b''
data = b''
for block in f[3]:
if block == 0:
continue
seekBlock(block)
if remaining > blockSize:
data += out.read(blockSize)
remaining -= blockSize
else:
data += out.read(remaining)
break
diff = f[2] - len(data)
# Pad the file if for some reason in end up to short
# This only happends when working with a disk image that is not a multiple of blockSize
if diff:
data += b'\0' * diff
return data[offset:offset+size]
def main():
usage="""
CP/M Disk mount utility
""" + fuse.Fuse.fusage
server = CPMFS(version="%prog " + fuse.__version__, usage=usage, dash_s_do='setsingle')
server.parse(errex=1)
server.main()
if __name__ == "__main__":
main()