# DIY_GIT_in_Python/ugit/base.py
#
# 172 lines
# 4.2 KiB
# Python

import itertools
import operator
import os
import string
from collections import namedtuple
from pathlib import Path, PurePath
from . import data
def write_tree(directory="."):
    """Record *directory* as a tree object and return that tree's OID.

    Every non-ignored regular file is read and passed to
    ``data.hash_object``; every non-ignored sub-directory is recorded
    recursively through ``write_tree``.  The tree body is one
    ``"<type> <oid> <name>"`` line per entry, ordered by ``sorted`` on the
    ``(name, oid, type)`` tuples.

    Args:
        directory: Directory to snapshot; defaults to the current directory.

    Returns:
        Whatever ``data.hash_object`` returns for the encoded tree body.
    """
    entries = []
    # os.scandir is a context manager and yields os.DirEntry objects, whose
    # is_file()/is_dir() accept follow_symlinks (unlike a pathlib iterator).
    with os.scandir(directory) as it:
        for entry in it:
            full = f"{directory}/{entry.name}"
            if is_ignored(full):
                continue
            if entry.is_file(follow_symlinks=False):
                type_ = "blob"
                with open(full, "rb") as f:
                    oid = data.hash_object(f.read())
            elif entry.is_dir(follow_symlinks=False):
                type_ = "tree"
                oid = write_tree(full)
            else:
                # Symlinks and special files have no representation in the
                # tree format; skip them instead of recording a stale or
                # unbound type_/oid from a previous iteration.
                continue
            entries.append((entry.name, oid, type_))
    tree = "".join(
        f"{type_} {oid} {name}\n" for name, oid, type_ in sorted(entries)
    )
    return data.hash_object(tree.encode(), "tree")
def _iter_tree_entries(oid):
if not oid:
return
tree = data.get_object(oid, "tree")
for entry in tree.decode().splitlines():
type_, oid, name = entry.split(" ", 2)
yield type_, oid, name
def get_tree(oid, base_path=""):
    """Flatten the tree *oid* into a ``{path: blob_oid}`` dictionary.

    Sub-trees are expanded recursively, with their entries prefixed by
    ``"<path>/"``.

    Args:
        oid: OID of the tree to read; a falsy value yields an empty dict.
        base_path: Prefix prepended to every entry name.

    Returns:
        dict mapping each blob's path (``base_path`` + name) to its OID.

    Raises:
        AssertionError: If an entry name contains ``/`` or is ``.``/``..``,
            or if an entry has a type other than ``blob`` or ``tree``.
    """
    result = {}
    for type_, oid, name in _iter_tree_entries(oid):
        # These names would let a tree write outside its own directory.
        # Raise explicitly: a bare ``assert`` is removed under ``python -O``.
        if "/" in name:
            raise AssertionError(f"Invalid tree entry name {name!r}")
        if name in ("..", "."):
            raise AssertionError(f"Invalid tree entry name {name!r}")
        path = base_path + name
        if type_ == "blob":
            result[path] = oid
        elif type_ == "tree":
            result.update(get_tree(oid, f"{path}/"))
        else:
            raise AssertionError(f"Unknown tree entry {type_}")
    return result
def _empty_current_directory():
    """Delete every non-ignored file and directory below the current directory.

    Paths for which ``is_ignored`` is true are left alone, and so is any
    directory that cannot be removed (for example because it still holds
    ignored files).
    """
    # Walk bottom-up so a directory's contents are removed before the
    # directory itself is attempted.
    for root, dirnames, filenames in os.walk(".", topdown=False):
        for filename in filenames:
            # Path() collapses the leading "./" that os.walk(".") produces,
            # giving a path relative to the current directory.
            path = Path(root, filename)
            # is_ignored splits on "/", so hand it the POSIX spelling.
            if is_ignored(path.as_posix()) or not path.is_file():
                continue
            path.unlink()
        for dirname in dirnames:
            path = Path(root, dirname)
            if is_ignored(path.as_posix()):
                continue
            try:
                path.rmdir()
            except OSError:
                # Deletion might fail if the directory contains ignored files,
                # so it's OK.  (FileNotFoundError is a subclass of OSError.)
                pass
def read_tree(tree_oid):
    """Replace the working directory's contents with the tree *tree_oid*.

    The current directory is first emptied with
    ``_empty_current_directory``; then every path returned by ``get_tree``
    is written with the bytes that ``data.get_object`` returns for its OID.

    Args:
        tree_oid: OID of the tree to materialise.
    """
    _empty_current_directory()
    for path, oid in get_tree(tree_oid, base_path="./").items():
        # ``parent`` is a property of a path instance, and nested entries
        # need every missing ancestor created, hence parents=True.
        Path(path).parent.mkdir(parents=True, exist_ok=True)
        with open(path, "wb") as f:
            f.write(data.get_object(oid))
def commit(message):
    """Create a commit object for the current directory and move HEAD to it.

    The commit body is a ``tree`` header, an optional ``parent`` header
    (present only when ``data.get_ref("HEAD")`` is truthy), a blank line and
    then *message* followed by a newline.

    Returns:
        The value ``data.hash_object`` returns for the new commit.
    """
    # write_tree() runs before HEAD is read, as in the header order below.
    parts = [f"tree {write_tree()}\n"]
    parent_oid = data.get_ref("HEAD")
    if parent_oid:
        parts.append(f"parent {parent_oid}\n")
    parts.append("\n")
    parts.append(f"{message}\n")

    payload = "".join(parts).encode()
    new_oid = data.hash_object(payload, "commit")
    data.update_ref("HEAD", new_oid)
    return new_oid
def create_tag(name, oid):
    """Set the ref ``refs/tags/<name>`` to *oid* via ``data.update_ref``."""
    tag_ref = f"refs/tags/{name}"
    data.update_ref(tag_ref, oid)
def checkout(oid):
    """Load the tree of commit *oid* into the working directory and set HEAD.

    The commit is parsed with ``get_commit``, its tree is written out with
    ``read_tree``, and only then is HEAD updated to *oid*.
    """
    tree_oid = get_commit(oid).tree
    read_tree(tree_oid)
    data.update_ref("HEAD", oid)
# Parsed commit record: the tree header value, the parent header value
# (None when absent, see get_commit) and the free-form message text.
Commit = namedtuple(
    "Commit",
    ("tree", "parent", "message"),
)
def get_commit(oid):
    """Parse the commit object *oid* into a ``Commit`` tuple.

    Header lines (``"<key> <value>"``) are read up to the first empty line;
    everything after that line is the message.

    Args:
        oid: OID passed to ``data.get_object`` with expected type ``commit``.

    Returns:
        ``Commit(tree, parent, message)``; ``parent`` is ``None`` when the
        commit has no ``parent`` header.

    Raises:
        AssertionError: On an unrecognised header key, or when the commit
            has no ``tree`` header.
    """
    tree = None
    parent = None
    commit = data.get_object(oid, "commit").decode()
    lines = iter(commit.splitlines())
    # takewhile stops at (and consumes) the first empty line, leaving the
    # shared iterator positioned at the start of the message.
    for line in itertools.takewhile(operator.truth, lines):
        key, value = line.split(" ", 1)
        if key == "tree":
            tree = value
        elif key == "parent":
            parent = value
        else:
            # Explicit raise: ``assert False`` is removed under ``python -O``.
            raise AssertionError(f"Unknown field {key}")
    if tree is None:
        # Previously surfaced as an UnboundLocalError on ``tree``.
        raise AssertionError(f"Commit {oid} has no tree field")
    message = "\n".join(lines)
    return Commit(tree=tree, parent=parent, message=message)
def iter_commits_and_parents(oids):
    """Yield each commit OID reachable from *oids* through parent links.

    Every OID is yielded at most once and falsy OIDs are skipped.  Pending
    OIDs are held in a set and taken with ``set.pop``, so the order in
    which commits are yielded is not specified.
    """
    pending = set(oids)
    seen = set()
    while pending:
        current = pending.pop()
        if current and current not in seen:
            seen.add(current)
            yield current
            # A missing parent is None; it is dropped by the check above
            # when it is popped.
            pending.add(get_commit(current).parent)
def get_oid(name):
    """Resolve *name* to an OID.

    ``"@"`` is treated as ``"HEAD"``.  The name is tried as a ref in this
    order: as given, under ``refs/``, under ``refs/tags/``, under
    ``refs/heads/``; the first truthy ``data.get_ref`` result wins.
    Otherwise a 40-character hexadecimal string is returned unchanged.

    Args:
        name: Ref name, ``"@"``, or a full 40-character hex OID.

    Returns:
        The resolved OID.

    Raises:
        AssertionError: If *name* is neither a known ref nor a 40-character
            hex string.
    """
    if name == "@":
        name = "HEAD"

    # Name is ref
    refs_to_try = [
        f"{name}",
        f"refs/{name}",
        f"refs/tags/{name}",
        f"refs/heads/{name}",
    ]
    for ref in refs_to_try:
        # Look the ref up once and reuse the result.
        oid = data.get_ref(ref)
        if oid:
            return oid

    # Name is SHA1
    is_hex = all(c in string.hexdigits for c in name)
    if len(name) == 40 and is_hex:
        return name

    # Explicit raise: ``assert False`` is removed under ``python -O``, which
    # would make this function silently return None.
    raise AssertionError(f"Unknown name {name}")
def is_ignored(path):
    """Return True when any component of *path* is exactly ``.ugit``.

    Args:
        path: A ``str`` or ``os.PathLike`` path.  Components are split with
            ``PurePath``, so the platform's native separator is honoured as
            well as ``/``.

    Returns:
        bool: True if ``.ugit`` is one of the path's components.
    """
    return ".ugit" in PurePath(path).parts