Spaces:
Running
Running
# Copyright (C) 2008, 2009 Michael Trier (mtrier@gmail.com) and contributors | |
# | |
# This module is part of GitPython and is released under the | |
# 3-Clause BSD License: https://opensource.org/license/bsd-3-clause/ | |
__all__ = ["Commit"] | |
from collections import defaultdict | |
import datetime | |
from io import BytesIO | |
import logging | |
import os | |
import re | |
from subprocess import Popen, PIPE | |
import sys | |
from time import altzone, daylight, localtime, time, timezone | |
import warnings | |
from gitdb import IStream | |
from git.cmd import Git | |
from git.diff import Diffable | |
from git.util import Actor, Stats, finalize_process, hex_to_bin | |
from . import base | |
from .tree import Tree | |
from .util import ( | |
Serializable, | |
TraversableIterableObj, | |
altz_to_utctz_str, | |
from_timestamp, | |
parse_actor_and_date, | |
parse_date, | |
) | |
# typing ------------------------------------------------------------------ | |
from typing import ( | |
Any, | |
Dict, | |
IO, | |
Iterator, | |
List, | |
Sequence, | |
Tuple, | |
TYPE_CHECKING, | |
Union, | |
cast, | |
) | |
if sys.version_info >= (3, 8): | |
from typing import Literal | |
else: | |
from typing_extensions import Literal | |
from git.types import PathLike | |
if TYPE_CHECKING: | |
from git.refs import SymbolicReference | |
from git.repo import Repo | |
# ------------------------------------------------------------------------ | |
_logger = logging.getLogger(__name__) | |
class Commit(base.Object, TraversableIterableObj, Diffable, Serializable):
    """Wraps a git commit object.

    See :manpage:`gitglossary(7)` on "commit object":
    https://git-scm.com/docs/gitglossary#def_commit_object

    :note:
        This class will act lazily on some of its attributes and will query the value on
        demand only if it involves calling the git binary.
    """

    # ENVIRONMENT VARIABLES
    # Read when creating new commits (see create_from_tree): when set, they supply
    # the author/committer dates unless explicit dates are passed in.
    env_author_date = "GIT_AUTHOR_DATE"
    env_committer_date = "GIT_COMMITTER_DATE"

    # CONFIGURATION KEYS
    # "<section>.<option>" key; create_from_tree splits it on "." and looks the
    # value up in the repository configuration.
    conf_encoding = "i18n.commitencoding"

    # INVARIANTS
    # Encoding assumed when a commit does not declare one; _serialize only writes
    # an "encoding" header when the commit's encoding differs from this value.
    default_encoding = "UTF-8"

    # Object type name, passed to the object database when storing a commit.
    type: Literal["commit"] = "commit"

    # Attributes parsed from the serialized commit. _set_cache_ deserializes the
    # whole object the first time any one of these is requested.
    __slots__ = (
        "tree",
        "author",
        "authored_date",
        "author_tz_offset",
        "committer",
        "committed_date",
        "committer_tz_offset",
        "message",
        "parents",
        "encoding",
        "gpgsig",
    )

    # Name of the attribute that identifies an instance.
    # NOTE(review): consumed by a base class outside this file - confirm semantics.
    _id_attribute_ = "hexsha"

    # Parent commits; stored as a tuple after deserialization.
    parents: Sequence["Commit"]
def __init__(
    self,
    repo: "Repo",
    binsha: bytes,
    tree: Union[Tree, None] = None,
    author: Union[Actor, None] = None,
    authored_date: Union[int, None] = None,
    author_tz_offset: Union[None, float] = None,
    committer: Union[Actor, None] = None,
    committed_date: Union[int, None] = None,
    committer_tz_offset: Union[None, float] = None,
    message: Union[str, bytes, None] = None,
    parents: Union[Sequence["Commit"], None] = None,
    encoding: Union[str, None] = None,
    gpgsig: Union[str, None] = None,
) -> None:
    """Instantiate a new :class:`Commit`.

    Every keyword argument left at ``None`` is not stored here; the corresponding
    attribute is filled in implicitly the first time it is queried.

    :param binsha:
        20 byte sha1.

    :param tree:
        A :class:`~git.objects.tree.Tree` object.

    :param author:
        The author :class:`~git.util.Actor` object.

    :param authored_date: int_seconds_since_epoch
        The authored DateTime - use :func:`time.gmtime` to convert it into a
        different format.

    :param author_tz_offset: int_seconds_west_of_utc
        The timezone that the `authored_date` is in.

    :param committer:
        The committer string, as an :class:`~git.util.Actor` object.

    :param committed_date: int_seconds_since_epoch
        The committed DateTime - use :func:`time.gmtime` to convert it into a
        different format.

    :param committer_tz_offset: int_seconds_west_of_utc
        The timezone that the `committed_date` is in.

    :param message: string
        The commit message.

    :param encoding: string
        Encoding of the message, defaults to UTF-8.

    :param parents:
        List or tuple of :class:`Commit` objects which are our parent(s) in the
        commit dependency graph.

    :param gpgsig:
        Signature text of the commit, if any.

    :return:
        :class:`Commit`

    :note:
        Timezone information is in the same format and in the same sign as what
        :func:`time.altzone` returns. The sign is inverted compared to git's UTC
        timezone.
    """
    super().__init__(repo, binsha)
    self.binsha = binsha

    if tree is not None:
        assert isinstance(tree, Tree), "Tree needs to be a Tree instance, was %s" % type(tree)

    # Only explicitly supplied values are stored; anything left out stays unset
    # so that it can be resolved lazily later. Order matches the slot order.
    supplied = (
        ("tree", tree),
        ("author", author),
        ("authored_date", authored_date),
        ("author_tz_offset", author_tz_offset),
        ("committer", committer),
        ("committed_date", committed_date),
        ("committer_tz_offset", committer_tz_offset),
        ("message", message),
        ("parents", parents),
        ("encoding", encoding),
        ("gpgsig", gpgsig),
    )
    for slot_name, value in supplied:
        if value is not None:
            setattr(self, slot_name, value)
def _get_intermediate_items(cls, commit: "Commit") -> Tuple["Commit", ...]: | |
return tuple(commit.parents) | |
@classmethod
def _calculate_sha_(cls, repo: "Repo", commit: "Commit") -> bytes:
    """Calculate the sha of a commit.

    The commit is serialized into an in-memory buffer which is then stored in the
    repository's object database; the binary sha reported for the stored object
    is returned.

    :param repo:
        :class:`~git.repo.base.Repo` object the commit should be part of.

    :param commit:
        :class:`Commit` object for which to generate the sha.

    :return:
        Binary sha of the stored commit object.
    """
    stream = BytesIO()
    commit._serialize(stream)
    # After serialization the write position equals the payload size.
    streamlen = stream.tell()
    # Rewind so the object database reads the payload from the start.
    stream.seek(0)

    istream = repo.odb.store(IStream(cls.type, streamlen, stream))
    return istream.binsha
def replace(self, **kwargs: Any) -> "Commit":
    """Create new commit object from an existing commit object.

    Every slot attribute of this commit is copied; values given as keyword
    arguments override the copied ones. The sha of the resulting commit is
    recomputed via ``_calculate_sha_``.

    :param kwargs:
        Replacement values keyed by attribute name; each key must be one of
        ``__slots__``.

    :return:
        The newly created commit.

    :raise ValueError:
        If a keyword is not a known attribute name.
    """
    slot_names = self.__slots__

    current = {}
    for slot_name in slot_names:
        current[slot_name] = getattr(self, slot_name)

    if any(requested not in slot_names for requested in kwargs):
        raise ValueError("invalid attribute name")

    merged = {**current, **kwargs}
    clone = self.__class__(self.repo, self.NULL_BIN_SHA, **merged)
    clone.binsha = self._calculate_sha_(self.repo, clone)
    return clone
def _set_cache_(self, attr: str) -> None:
    """Populate `attr` on first access.

    Attributes that are not commit slots are delegated to the parent class. For
    commit slots the object is read from the object database and deserialized,
    which fills in all slot attributes at once and also sets ``size``.

    :param attr:
        Name of the attribute that was requested.
    """
    if attr not in Commit.__slots__:
        super()._set_cache_(attr)
        return

    # Read the data in a chunk, it's faster - then provide a file wrapper.
    _binsha, _typename, self.size, stream = self.repo.odb.stream(self.binsha)
    self._deserialize(BytesIO(stream.read()))
# NOTE(review): upstream GitPython exposes this as a read-only property; no
# decorator is present in this copy - confirm whether one was lost.
def authored_datetime(self) -> datetime.datetime:
    """:return: ``authored_date`` combined with ``author_tz_offset``, converted by
    ``from_timestamp`` into a :class:`datetime.datetime`."""
    seconds = self.authored_date
    tz_offset = self.author_tz_offset
    return from_timestamp(seconds, tz_offset)
# NOTE(review): upstream GitPython exposes this as a read-only property; no
# decorator is present in this copy - confirm whether one was lost.
def committed_datetime(self) -> datetime.datetime:
    """:return: ``committed_date`` combined with ``committer_tz_offset``, converted
    by ``from_timestamp`` into a :class:`datetime.datetime`."""
    seconds = self.committed_date
    tz_offset = self.committer_tz_offset
    return from_timestamp(seconds, tz_offset)
# NOTE(review): upstream GitPython exposes this as a read-only property; no
# decorator is present in this copy - confirm whether one was lost.
def summary(self) -> Union[str, bytes]:
    """:return: First line of the commit message, of the same type (``str`` or
    ``bytes``) as the message itself."""
    text = self.message
    # The separator has to match the message type.
    newline = "\n" if isinstance(text, str) else b"\n"
    first_line, _sep, _rest = text.partition(newline)
    return first_line
def count(self, paths: Union[PathLike, Sequence[PathLike]] = "", **kwargs: Any) -> int:
    """Count the number of commits reachable from this commit.

    :param paths:
        An optional path or a list of paths restricting the return value to commits
        actually containing the paths.

    :param kwargs:
        Additional options to be passed to :manpage:`git-rev-list(1)`. They must not
        alter the output style of the command, or parsing will yield incorrect
        results.

    :return:
        An int defining the number of reachable commits
    """
    rev_list_args = [self.hexsha]
    # Yes, it makes a difference whether empty paths are given or not in our case as
    # the empty paths version will ignore merge commits for some reason. Hence the
    # "--" separator and the paths are only added when paths were actually given.
    if paths:
        rev_list_args += ["--", paths]

    listing = self.repo.git.rev_list(*rev_list_args, **kwargs)
    # rev-list prints one commit per line.
    return len(listing.splitlines())
# NOTE(review): upstream GitPython exposes this as a read-only property; no
# decorator is present in this copy - confirm whether one was lost.
def name_rev(self) -> str:
    """
    :return:
        String describing the commits hex sha based on the closest
        `~git.refs.reference.Reference`, as produced by running ``name_rev`` on
        this commit through the repository's git command wrapper.

    :note:
        Mostly useful for UI purposes.
    """
    git_cmd = self.repo.git
    return git_cmd.name_rev(self)
@classmethod
def iter_items(
    cls,
    repo: "Repo",
    rev: Union[str, "Commit", "SymbolicReference"],
    paths: Union[PathLike, Sequence[PathLike]] = "",
    **kwargs: Any,
) -> Iterator["Commit"]:
    R"""Find all commits matching the given criteria.

    Declared as a classmethod: callers pass the repository explicitly (e.g.
    ``self.iter_items(self.repo, ...)``), so the first parameter is the class.

    :param repo:
        The :class:`~git.repo.base.Repo`.

    :param rev:
        Revision specifier. See :manpage:`git-rev-parse(1)` for viable options.

    :param paths:
        An optional path or list of paths. If set only :class:`Commit`\s that
        include the path or paths will be considered.

    :param kwargs:
        Optional keyword arguments to :manpage:`git-rev-list(1)` where:

        * ``max_count`` is the maximum number of commits to fetch.
        * ``skip`` is the number of commits to skip.
        * ``since`` selects all commits since some date, e.g. ``"1970-01-01"``.

    :return:
        Iterator yielding :class:`Commit` items.

    :raise ValueError:
        If ``pretty`` is passed, since the output is parsed as bare shas.
    """
    if "pretty" in kwargs:
        raise ValueError("--pretty cannot be used as parsing expects single sha's only")
    # END handle pretty

    # Use -- in all cases, to prevent possibility of ambiguous arguments.
    # See https://github.com/gitpython-developers/GitPython/issues/264.
    args_list: List[PathLike] = ["--"]

    if paths:
        paths_tup: Tuple[PathLike, ...]
        # A single path (str or path-like) must not be iterated character-wise.
        if isinstance(paths, (str, os.PathLike)):
            paths_tup = (paths,)
        else:
            paths_tup = tuple(paths)

        args_list.extend(paths_tup)
    # END if paths

    proc = repo.git.rev_list(rev, args_list, as_process=True, **kwargs)
    return cls._iter_from_process_or_stream(repo, proc)
def iter_parents(self, paths: Union[PathLike, Sequence[PathLike]] = "", **kwargs: Any) -> Iterator["Commit"]:
    R"""Iterate _all_ parents of this commit.

    :param paths:
        Optional path or list of paths limiting the :class:`Commit`\s to those that
        contain at least one of the paths.

    :param kwargs:
        All arguments allowed by :manpage:`git-rev-list(1)`.

    :return:
        Iterator yielding :class:`Commit` objects which are parents of ``self``
    """
    # The listing starts at this very commit, so at least one entry has to be
    # skipped; a missing or zero ``skip`` becomes 1, any other value is kept.
    requested_skip = kwargs.get("skip", 1)
    kwargs["skip"] = 1 if requested_skip == 0 else requested_skip

    return self.iter_items(self.repo, self, paths, **kwargs)
# NOTE(review): upstream GitPython exposes this as a read-only property; no
# decorator is present in this copy - confirm whether one was lost.
def stats(self) -> Stats:
    """Create a git stat from changes between this commit and its first parent
    or from all changes done if this is the very first commit.

    :return:
        :class:`Stats`
    """
    if self.parents:
        numstat = self.repo.git.diff(self.parents[0].hexsha, self.hexsha, "--", numstat=True, no_renames=True)
    else:
        raw = self.repo.git.diff_tree(self.hexsha, "--", numstat=True, no_renames=True, root=True)
        rows = []
        # The first output line is skipped - presumably the commit id that
        # diff-tree prints ahead of the numstat rows; confirm.
        for entry in raw.splitlines()[1:]:
            # Unpacking enforces exactly three tab-separated fields per row.
            insertions, deletions, filename = entry.split("\t")
            rows.append("%s\t%s\t%s\n" % (insertions, deletions, filename))
        numstat = "".join(rows)

    return Stats._list_from_string(self.repo, numstat)
@property
def trailers(self) -> Dict[str, str]:
    """Deprecated. Get the trailers of the message as a dictionary.

    :note:
        This property is deprecated, please use either :attr:`trailers_list` or
        :attr:`trailers_dict`.

    :return:
        Dictionary containing whitespace stripped trailer information.
        For a key that occurs more than once, only the first value listed in
        :attr:`trailers_dict` is kept.
    """
    warnings.warn(
        "Commit.trailers is deprecated, use Commit.trailers_list or Commit.trailers_dict instead",
        DeprecationWarning,
        stacklevel=2,
    )
    # NOTE(review): this docstring used to promise the *latest* instance of each
    # key, while the code has been returning the first one. The behaviour is left
    # untouched here - confirm which of the two is intended.
    return {k: v[0] for k, v in self.trailers_dict.items()}
@property
def trailers_list(self) -> List[Tuple[str, str]]:
    """Get the trailers of the message as a list.

    Git messages can contain trailer information that are similar to :rfc:`822`
    e-mail headers. See :manpage:`git-interpret-trailers(1)`.

    This function calls ``git interpret-trailers --parse`` onto the message to
    extract the trailer information, returns the raw trailer data as a list.

    Valid message with trailer::

        Subject line

        some body information

        another information

        key1: value1.1
        key1: value1.2
        key2 :    value 2 with inner spaces

    Returned list will look like this::

        [
            ("key1", "value1.1"),
            ("key1", "value1.2"),
            ("key2", "value 2 with inner spaces"),
        ]

    :return:
        List containing key-value tuples of whitespace stripped trailer information.
    """
    message = self.message
    # A bytes message is handed to git unchanged; calling str() on it would send
    # its ``b'...'`` repr instead of the message text.
    if isinstance(message, bytes):
        payload = message
    else:
        payload = str(message).encode()

    cmd = ["git", "interpret-trailers", "--parse"]
    proc: Git.AutoInterrupt = self.repo.git.execute(  # type: ignore[call-overload]
        cmd,
        as_process=True,
        istream=PIPE,
    )
    trailer: str = proc.communicate(payload)[0].decode("utf8")
    trailer = trailer.strip()

    if not trailer:
        return []

    trailer_list = []
    for t in trailer.split("\n"):
        # Split on the first colon only, so values may themselves contain colons.
        key, val = t.split(":", 1)
        trailer_list.append((key.strip(), val.strip()))

    return trailer_list
@property
def trailers_dict(self) -> Dict[str, List[str]]:
    """Get the trailers of the message as a dictionary.

    Git messages can contain trailer information that are similar to :rfc:`822`
    e-mail headers. See :manpage:`git-interpret-trailers(1)`.

    This function calls ``git interpret-trailers --parse`` onto the message to
    extract the trailer information. The key value pairs are stripped of leading and
    trailing whitespaces before they get saved into a dictionary.

    Valid message with trailer::

        Subject line

        some body information

        another information

        key1: value1.1
        key1: value1.2
        key2 :    value 2 with inner spaces

    Returned dictionary will look like this::

        {
            "key1": ["value1.1", "value1.2"],
            "key2": ["value 2 with inner spaces"],
        }

    :return:
        Dictionary containing whitespace stripped trailer information, mapping
        trailer keys to a list of their corresponding values.
    """
    # Group values by key, keeping the order in which they appear in the list.
    d = defaultdict(list)
    for key, val in self.trailers_list:
        d[key].append(val)
    # Hand back a plain dict so that lookups of missing keys raise KeyError.
    return dict(d)
def _iter_from_process_or_stream(cls, repo: "Repo", proc_or_stream: Union[Popen, IO]) -> Iterator["Commit"]: | |
"""Parse out commit information into a list of :class:`Commit` objects. | |
We expect one line per commit, and parse the actual commit information directly | |
from our lighting fast object database. | |
:param proc: | |
:manpage:`git-rev-list(1)` process instance - one sha per line. | |
:return: | |
Iterator supplying :class:`Commit` objects | |
""" | |
# def is_proc(inp) -> TypeGuard[Popen]: | |
# return hasattr(proc_or_stream, 'wait') and not hasattr(proc_or_stream, 'readline') | |
# def is_stream(inp) -> TypeGuard[IO]: | |
# return hasattr(proc_or_stream, 'readline') | |
if hasattr(proc_or_stream, "wait"): | |
proc_or_stream = cast(Popen, proc_or_stream) | |
if proc_or_stream.stdout is not None: | |
stream = proc_or_stream.stdout | |
elif hasattr(proc_or_stream, "readline"): | |
proc_or_stream = cast(IO, proc_or_stream) # type: ignore[redundant-cast] | |
stream = proc_or_stream | |
readline = stream.readline | |
while True: | |
line = readline() | |
if not line: | |
break | |
hexsha = line.strip() | |
if len(hexsha) > 40: | |
# Split additional information, as returned by bisect for instance. | |
hexsha, _ = line.split(None, 1) | |
# END handle extra info | |
assert len(hexsha) == 40, "Invalid line: %s" % hexsha | |
yield cls(repo, hex_to_bin(hexsha)) | |
# END for each line in stream | |
# TODO: Review this - it seems process handling got a bit out of control due to | |
# many developers trying to fix the open file handles issue. | |
if hasattr(proc_or_stream, "wait"): | |
proc_or_stream = cast(Popen, proc_or_stream) | |
finalize_process(proc_or_stream) | |
@classmethod
def create_from_tree(
    cls,
    repo: "Repo",
    tree: Union[Tree, str],
    message: str,
    parent_commits: Union[None, List["Commit"]] = None,
    head: bool = False,
    author: Union[None, Actor] = None,
    committer: Union[None, Actor] = None,
    author_date: Union[None, str, datetime.datetime] = None,
    commit_date: Union[None, str, datetime.datetime] = None,
) -> "Commit":
    """Commit the given tree, creating a :class:`Commit` object.

    Declared as a classmethod: the first parameter is the class, which is used to
    build the new commit.

    :param repo:
        :class:`~git.repo.base.Repo` object the commit should be part of.

    :param tree:
        :class:`~git.objects.tree.Tree` object or hex or bin sha.
        The tree of the new commit. A ``str`` is resolved through ``repo.tree``.
        NOTE(review): only ``str`` values are converted here - confirm how a
        binary sha is expected to be handled.

    :param message:
        Commit message. It may be an empty string if no message is provided. It will
        be converted to a string, in any case.

    :param parent_commits:
        Optional :class:`Commit` objects to use as parents for the new commit. If
        empty list, the commit will have no parents at all and become a root commit.
        If ``None``, the current head commit will be the parent of the new commit
        object.

    :param head:
        If ``True``, the HEAD will be advanced to the new commit automatically.
        Otherwise the HEAD will remain pointing on the previous commit. This could
        lead to undesired results when diffing files.

    :param author:
        The name of the author, optional.
        If unset, the repository configuration is used to obtain this value.

    :param committer:
        The name of the committer, optional.
        If unset, the repository configuration is used to obtain this value.

    :param author_date:
        The timestamp for the author field.

    :param commit_date:
        The timestamp for the committer field.

    :return:
        :class:`Commit` object representing the new commit.

    :raise ValueError:
        If an entry of `parent_commits` is not an instance of this class.

    :raise TypeError:
        If the configured commit encoding is not a string.

    :note:
        Additional information about the committer and author are taken from the
        environment or from the git configuration. See :manpage:`git-commit-tree(1)`
        for more information.
    """
    if parent_commits is None:
        try:
            parent_commits = [repo.head.commit]
        except ValueError:
            # Empty repositories have no head commit.
            parent_commits = []
        # END handle parent commits
    else:
        for p in parent_commits:
            if not isinstance(p, cls):
                raise ValueError(f"Parent commit '{p!r}' must be of type {cls}")
        # END check parent commit types
    # END if parent commits are unset

    # Retrieve all additional information, create a commit object, and serialize it.
    # Generally:
    # * Environment variables override configuration values.
    # * Sensible defaults are set according to the git documentation.

    # COMMITTER AND AUTHOR INFO
    cr = repo.config_reader()
    env = os.environ

    committer = committer or Actor.committer(cr)
    author = author or Actor.author(cr)

    # PARSE THE DATES
    unix_time = int(time())
    # Use the DST-adjusted offset only while daylight saving time is in effect.
    is_dst = daylight and localtime().tm_isdst > 0
    offset = altzone if is_dst else timezone

    # Precedence for each date: explicit argument, then environment variable,
    # then the current time with the local timezone offset.
    author_date_str = env.get(cls.env_author_date, "")
    if author_date:
        author_time, author_offset = parse_date(author_date)
    elif author_date_str:
        author_time, author_offset = parse_date(author_date_str)
    else:
        author_time, author_offset = unix_time, offset
    # END set author time

    committer_date_str = env.get(cls.env_committer_date, "")
    if commit_date:
        committer_time, committer_offset = parse_date(commit_date)
    elif committer_date_str:
        committer_time, committer_offset = parse_date(committer_date_str)
    else:
        committer_time, committer_offset = unix_time, offset
    # END set committer time

    # Assume UTF-8 encoding.
    enc_section, enc_option = cls.conf_encoding.split(".")
    conf_encoding = cr.get_value(enc_section, enc_option, cls.default_encoding)
    if not isinstance(conf_encoding, str):
        raise TypeError("conf_encoding could not be coerced to str")

    # If the tree is no object, make sure we create one - otherwise the created
    # commit object is invalid.
    if isinstance(tree, str):
        tree = repo.tree(tree)
    # END tree conversion

    # CREATE NEW COMMIT
    new_commit = cls(
        repo,
        cls.NULL_BIN_SHA,
        tree,
        author,
        author_time,
        author_offset,
        committer,
        committer_time,
        committer_offset,
        message,
        parent_commits,
        conf_encoding,
    )

    # The real sha is only known once the commit has been serialized and stored.
    new_commit.binsha = cls._calculate_sha_(repo, new_commit)

    if head:
        # Need late import here, importing git at the very beginning throws as
        # well...
        import git.refs

        try:
            repo.head.set_commit(new_commit, logmsg=message)
        except ValueError:
            # head is not yet set to the ref our HEAD points to.
            # Happens on first commit.
            master = git.refs.Head.create(
                repo,
                repo.head.ref,
                new_commit,
                logmsg="commit (initial): %s" % message,
            )
            repo.head.set_reference(master, logmsg="commit: Switching to %s" % master)
        # END handle empty repositories
    # END advance head handling

    return new_commit
# { Serializable Implementation | |
def _serialize(self, stream: BytesIO) -> "Commit":
    """Write this commit to `stream` in git's commit object format.

    Headers are written in this order: tree, one line per parent, author,
    committer, an optional encoding line and an optional signature; a blank line
    and the message follow.

    :param stream:
        Writable binary stream receiving the serialized commit.

    :return:
        ``self``
    """
    emit = stream.write

    emit(("tree %s\n" % self.tree).encode("ascii"))
    for parent in self.parents:
        emit(("parent %s\n" % parent).encode("ascii"))

    author = self.author
    author_name = author.name
    committer = self.committer
    identity_fmt = "%s %s <%s> %s %s\n"

    author_header = identity_fmt % (
        "author",
        author_name,
        author.email,
        self.authored_date,
        altz_to_utctz_str(self.author_tz_offset),
    )
    emit(author_header.encode(self.encoding))

    # Encode committer.
    committer_header = identity_fmt % (
        "committer",
        committer.name,
        committer.email,
        self.committed_date,
        altz_to_utctz_str(self.committer_tz_offset),
    )
    emit(committer_header.encode(self.encoding))

    # The encoding header is only needed when it deviates from the default.
    if self.encoding != self.default_encoding:
        emit(("encoding %s\n" % self.encoding).encode("ascii"))

    try:
        # __getattribute__ is called directly so that a signature that was never
        # set surfaces as AttributeError and is simply left out.
        if self.__getattribute__("gpgsig"):
            emit(b"gpgsig")
            # Every signature line is prefixed with a single space.
            for sig_line in self.gpgsig.rstrip("\n").split("\n"):
                emit((" " + sig_line + "\n").encode("ascii"))
    except AttributeError:
        pass

    emit(b"\n")

    # Write plain bytes, be sure it's encoded according to our encoding.
    body = self.message
    emit(body.encode(self.encoding) if isinstance(body, str) else body)
    # END handle encoding
    return self
def _deserialize(self, stream: BytesIO) -> "Commit":
    """Populate this commit from its serialized form.

    Reads the tree, parents, author and committer lines, skips mergetag blocks,
    parses the optional ``encoding`` and ``gpgsig`` headers and finally takes the
    remainder of the stream as the message.

    :param stream:
        Binary stream positioned at the start of the serialized commit.

    :return:
        ``self``
    """
    readline = stream.readline
    self.tree = Tree(self.repo, hex_to_bin(readline().split()[1]), Tree.tree_id << 12, "")

    self.parents = []
    next_line = None
    while True:
        parent_line = readline()
        if not parent_line.startswith(b"parent"):
            next_line = parent_line
            break
        # END abort reading parents
        self.parents.append(type(self)(self.repo, hex_to_bin(parent_line.split()[-1].decode("ascii"))))
    # END for each parent line
    self.parents = tuple(self.parents)

    # We don't know actual author encoding before we have parsed it, so keep the
    # lines around.
    author_line = next_line
    committer_line = readline()

    # We might run into one or more mergetag blocks, skip those for now.
    next_line = readline()
    while next_line.startswith(b"mergetag "):
        next_line = readline()
        while next_line.startswith(b" "):
            next_line = readline()
    # END skip mergetags

    # Now we can have the encoding line, or an empty line followed by the optional
    # message.
    self.encoding = self.default_encoding
    self.gpgsig = ""

    def decode(raw: bytes, errors: str) -> str:
        # Decode with the commit's encoding; an encoding name Python does not
        # know must not make the commit unreadable, so fall back to the default.
        try:
            return raw.decode(self.encoding, errors)
        except LookupError:
            _logger.warning(
                "Unknown commit encoding '%s', decoding with %s instead",
                self.encoding,
                self.default_encoding,
            )
            return raw.decode(self.default_encoding, errors)

    # Read headers.
    enc = next_line
    buf = enc.strip()
    while buf:
        # The previous test compared a 10-byte slice against the 9-byte literal
        # b"encoding " and therefore never matched; startswith cannot get the
        # length wrong.
        if buf.startswith(b"encoding "):
            self.encoding = decode(buf[buf.find(b" ") + 1 :], "ignore")
        elif buf.startswith(b"gpgsig "):
            sig = buf[buf.find(b" ") + 1 :] + b"\n"
            is_next_header = False
            while True:
                sigbuf = readline()
                if not sigbuf:
                    break
                # Continuation lines of the signature start with a space;
                # anything else is already the next header.
                if sigbuf[0:1] != b" ":
                    buf = sigbuf.strip()
                    is_next_header = True
                    break
                sig += sigbuf[1:]
            # END read all signature
            self.gpgsig = decode(sig.rstrip(b"\n"), "ignore")
            if is_next_header:
                continue
        buf = readline().strip()

    # Decode the author's name.
    try:
        (
            self.author,
            self.authored_date,
            self.author_tz_offset,
        ) = parse_actor_and_date(decode(author_line, "replace"))
    except UnicodeDecodeError:
        _logger.error(
            "Failed to decode author line '%s' using encoding %s",
            author_line,
            self.encoding,
            exc_info=True,
        )

    try:
        (
            self.committer,
            self.committed_date,
            self.committer_tz_offset,
        ) = parse_actor_and_date(decode(committer_line, "replace"))
    except UnicodeDecodeError:
        _logger.error(
            "Failed to decode committer line '%s' using encoding %s",
            committer_line,
            self.encoding,
            exc_info=True,
        )
    # END handle author's encoding

    # Whatever is left in the stream is the plain message; it is kept as read.
    # If decoding fails, the message stays available as bytes.
    self.message = stream.read()
    try:
        self.message = decode(self.message, "replace")
    except UnicodeDecodeError:
        _logger.error(
            "Failed to decode message '%s' using encoding %s",
            self.message,
            self.encoding,
            exc_info=True,
        )
    # END exception handling

    return self
# } END serializable implementation | |
def co_authors(self) -> List[Actor]: | |
"""Search the commit message for any co-authors of this commit. | |
Details on co-authors: | |
https://github.blog/2018-01-29-commit-together-with-co-authors/ | |
:return: | |
List of co-authors for this commit (as :class:`~git.util.Actor` objects). | |
""" | |
co_authors = [] | |
if self.message: | |
results = re.findall( | |
r"^Co-authored-by: (.*) <(.*?)>$", | |
self.message, | |
re.MULTILINE, | |
) | |
for author in results: | |
co_authors.append(Actor(*author)) | |
return co_authors | |