DIY_GIT_in_Python/ugit/base.py

129 lines
3.4 KiB
Python

import itertools
import operator
import os
from collections import namedtuple
from pathlib import Path, PurePath
from . import data
def write_tree(directory="."):
entries = []
with Path.iterdir(directory) as it:
for entry in it:
full = f"{directory}/{entry.name}"
if is_ignored(full):
continue
if entry.is_file(follow_symlinks=False):
type_ = "blob"
with open(full, "rb") as f:
oid = data.hash_object(f.read())
elif entry.is_dir(follow_symlinks=False):
type_ = "tree"
oid = write_tree(full)
entries.append((entry.name, oid, type_))
tree = "".join(f"{type_} {oid} {name}\n" for name, oid, type_ in sorted(entries))
return data.hash_object(tree.encode(), "tree")
def _iter_tree_entries(oid):
if not oid:
return
tree = data.get_object(oid, "tree")
for entry in tree.decode().splitlines():
type_, oid, name = entry.split(" ", 2)
yield type_, oid, name
def get_tree(oid, base_path=""):
result = {}
for type_, oid, name in _iter_tree_entries(oid):
assert "/" not in name
assert name not in ("..", ".")
path = base_path + name
if type_ == "blob":
result[path] = oid
elif type_ == "tree":
result.update(get_tree(oid, f"{path}/"))
else:
assert False, f"Unknown tree entry {type_}"
return result
def _empty_current_directory():
for root, dirnames, filenames in os.walk(".", topdown=False):
for filename in filenames:
path = PurePath.relative_to(f"{root}/{filename}")
if is_ignored(path) or not Path.is_file(path):
continue
Path.unlink(path)
for dirname in dirnames:
path = PurePath.relative_to(f"{root}/{dirname}")
if is_ignored(path):
continue
try:
Path.rmdir(path)
except (FileNotFoundError, OSError):
# Deletion might fail if the directory contains ignored files,
# so it's OK
pass
def read_tree(tree_oid):
_empty_current_directory()
for path, oid in get_tree(tree_oid, base_path="./").items():
Path.mkdir(PurePath.parent(path), exist_ok=True)
with open(path, "wb") as f:
f.write(data.get_object(oid))
def commit(message):
commit = f"tree {write_tree()}\n"
HEAD = data.get_ref("HEAD")
if HEAD:
commit += f"parent {HEAD}\n"
commit += "\n"
commit += f"{message}\n"
oid = data.hash_object(commit.encode(), "commit")
data.update_ref("HEAD", oid)
return oid
def checkout(oid):
commit = get_commit(oid)
read_tree(commit.tree)
data.update_ref("HEAD", oid)
Commit = namedtuple("Commit", ["tree", "parent", "message"])
def get_commit(oid):
parent = None
commit = data.get_object(oid, "commit").decode()
lines = iter(commit.splitlines())
for line in itertools.takewhile(operator.truth, lines):
key, value = line.split(" ", 1)
if key == "tree":
tree = value
elif key == "parent":
parent = value
else:
assert False, f"Unknown field {key}"
message = "\n".join(lines)
return Commit(tree=tree, parent=parent, message=message)
def is_ignored(path):
return ".ugit" in path.split("/")