major update to the git server

This commit is contained in:
David Soria Parra
2024-11-20 12:03:38 +00:00
parent 56d2d1b1b4
commit bf3f3fdb0e
5 changed files with 197 additions and 48 deletions

View File

@@ -3,7 +3,7 @@ name = "mcp-git"
version = "0.1.0"
description = "A Model Context Protocol server providing tools to read, search, and manipulate Git repositories programmatically via LLMs"
readme = "README.md"
requires-python = ">=3.11"
requires-python = ">=3.10"
authors = [{ name = "Anthropic, PBC." }]
maintainers = [{ name = "David Soria Parra", email = "davidsp@anthropic.com" }]
keywords = ["git", "mcp", "llm", "automation"]
@@ -13,7 +13,7 @@ classifiers = [
"Intended Audience :: Developers",
"License :: OSI Approved :: MIT License",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.10",
]
dependencies = [
"click>=8.1.7",
@@ -30,5 +30,4 @@ requires = ["hatchling"]
build-backend = "hatchling.build"
[tool.uv]
index-strategy = "unsafe-best-match"
dev-dependencies = ["ruff>=0.7.3"]

View File

@@ -1,4 +1,6 @@
import logging
import json
import sys
import click
import anyio
import anyio.lowlevel
@@ -7,7 +9,15 @@ from git.types import Sequence
from mcp.server import Server
from mcp.server.session import ServerSession
from mcp.server.stdio import stdio_server
from mcp.types import TextContent, Tool, EmbeddedResource, ImageContent, ListRootsResult
from mcp.types import (
ClientCapabilities,
TextContent,
Tool,
EmbeddedResource,
ImageContent,
ListRootsResult,
RootsCapability,
)
from enum import StrEnum
import git
from git.objects import Blob, Tree
@@ -66,6 +76,20 @@ class ListReposInput(BaseModel):
pass
class GitLogInput(BaseModel):
repo_path: str
max_count: int = 10
ref: str = "HEAD"
class ListBranchesInput(BaseModel):
repo_path: str
class ListTagsInput(BaseModel):
repo_path: str
class GitTools(StrEnum):
READ_FILE = "git_read_file"
LIST_FILES = "git_list_files"
@@ -75,12 +99,19 @@ class GitTools(StrEnum):
GET_DIFF = "git_get_diff"
GET_REPO_STRUCTURE = "git_get_repo_structure"
LIST_REPOS = "git_list_repos"
GIT_LOG = "git_log"
LIST_BRANCHES = "git_list_branches"
LIST_TAGS = "git_list_tags"
def git_read_file(repo: git.Repo, file_path: str, ref: str = "HEAD") -> str:
tree = repo.commit(ref).tree
blob = tree / file_path
return blob.data_stream.read().decode("utf-8", errors="replace")
try:
return blob.data_stream.read().decode("utf-8", errors="replace")
except UnicodeDecodeError:
# If it's a binary file, return a message indicating that
return "[Binary file content not shown]"
def git_list_files(repo: git.Repo, path: str = "", ref: str = "HEAD") -> Sequence[str]:
@@ -122,10 +153,14 @@ def git_search_code(
tree = repo.commit(ref).tree
for blob in tree.traverse():
if isinstance(blob, Blob) and Path(blob.path).match(file_pattern):
content = blob.data_stream.read().decode("utf-8")
for i, line in enumerate(content.splitlines()):
if query in line:
results.append(f"{blob.path}:{i+1}: {line}")
try:
content = blob.data_stream.read().decode("utf-8", errors="replace")
for i, line in enumerate(content.splitlines()):
if query in line:
results.append(f"{blob.path}:{i+1}: {line}")
except UnicodeDecodeError:
# Skip binary files
continue
return results
@@ -153,14 +188,35 @@ def git_get_repo_structure(repo: git.Repo, ref: str = "HEAD") -> str:
return str(structure)
def git_log(repo: git.Repo, max_count: int = 10, ref: str = "HEAD") -> list[str]:
commits = list(repo.iter_commits(ref, max_count=max_count))
log = []
for commit in commits:
log.append(
f"Commit: {commit.hexsha}\n"
f"Author: {commit.author}\n"
f"Date: {commit.authored_datetime}\n"
f"Message: {commit.message}\n"
)
return log
def git_list_branches(repo: git.Repo) -> list[str]:
return [str(branch) for branch in repo.branches]
def git_list_tags(repo: git.Repo) -> list[str]:
return [str(tag) for tag in repo.tags]
async def serve(repository: Path | None) -> None:
# Set up logging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)
if repository is not None:
try:
git.Repo(repository)
logger.info(f"Using repository at {repository}")
except git.InvalidGitRepositoryError:
logger.error(f"{repository} is not a valid Git repository")
return
@@ -234,6 +290,28 @@ async def serve(repository: Path | None) -> None:
"accessible to the current session.",
inputSchema=ListReposInput.schema(),
),
Tool(
name=GitTools.GIT_LOG,
description="Retrieves the commit log for the repository, showing the "
"history of commits including commit hashes, authors, dates, and "
"commit messages. This tool provides an overview of the project's "
"development history.",
inputSchema=GitLogInput.schema(),
),
Tool(
name=GitTools.LIST_BRANCHES,
description="Lists all branches in the Git repository. This tool "
"provides an overview of the different lines of development in the "
"project.",
inputSchema=ListBranchesInput.schema(),
),
Tool(
name=GitTools.LIST_TAGS,
description="Lists all tags in the Git repository. This tool "
"provides an overview of the tagged versions or releases in the "
"project.",
inputSchema=ListTagsInput.schema(),
),
]
async def list_repos() -> Sequence[str]:
@@ -243,7 +321,14 @@ async def serve(repository: Path | None) -> None:
"server.request_context.session must be a ServerSession"
)
roots_result: ListRootsResult = await server.request_context.session.list_roots()
if not server.request_context.session.check_client_capability(
ClientCapabilities(roots=RootsCapability())
):
return []
roots_result: ListRootsResult = (
await server.request_context.session.list_roots()
)
logger.debug(f"Roots result: {roots_result}")
repo_paths = []
for root in roots_result.roots:
@@ -269,71 +354,123 @@ async def serve(repository: Path | None) -> None:
) -> list[TextContent | ImageContent | EmbeddedResource]:
if name == GitTools.LIST_REPOS:
result = await list_repos()
return [TextContent(type="text", text=str(r)) for r in result]
logging.debug(f"repos={result}")
return [
TextContent(
type="text",
text=f"Here is some JSON that contains a list of git repositories: {json.dumps(result)}",
)
]
repo_path = Path(arguments["repo_path"])
repo = git.Repo(repo_path)
match name:
case GitTools.READ_FILE:
content = git_read_file(
repo, arguments["file_path"], arguments.get("ref", "HEAD")
)
return [
TextContent(
type="text",
text=git_read_file(
repo, arguments["file_path"], arguments.get("ref", "HEAD")
)
text=f"Here is some JSON that contains the contents of a file: {json.dumps({'content': content})}",
)
]
case GitTools.LIST_FILES:
files = git_list_files(
repo, arguments.get("path", ""), arguments.get("ref", "HEAD")
)
return [
TextContent(type="text", text=str(f))
for f in git_list_files(
repo, arguments.get("path", ""), arguments.get("ref", "HEAD")
TextContent(
type="text",
text=f"Here is some JSON that contains a list of files: {json.dumps({'files': list(files)})}",
)
]
case GitTools.FILE_HISTORY:
history = git_file_history(
repo, arguments["file_path"], arguments.get("max_entries", 10)
)
return [
TextContent(type="text", text=entry)
for entry in git_file_history(
repo, arguments["file_path"], arguments.get("max_entries", 10)
TextContent(
type="text",
text=f"Here is some JSON that contains a file's history: {json.dumps({'history': list(history)})}",
)
]
case GitTools.COMMIT:
result = git_commit(repo, arguments["message"], arguments.get("files"))
return [TextContent(type="text", text=result)]
return [
TextContent(
type="text",
text=f"Here is some JSON that contains the commit result: {json.dumps({'result': result})}",
)
]
case GitTools.SEARCH_CODE:
results = git_search_code(
repo,
arguments["query"],
arguments.get("file_pattern", "*"),
arguments.get("ref", "HEAD"),
)
return [
TextContent(type="text", text=result)
for result in git_search_code(
repo,
arguments["query"],
arguments.get("file_pattern", "*"),
arguments.get("ref", "HEAD"),
TextContent(
type="text",
text=f"Here is some JSON that contains code search matches: {json.dumps({'matches': results})}",
)
]
case GitTools.GET_DIFF:
diff = git_get_diff(
repo,
arguments["ref1"],
arguments["ref2"],
arguments.get("file_path"),
)
return [
TextContent(
type="text",
text=git_get_diff(
repo,
arguments["ref1"],
arguments["ref2"],
arguments.get("file_path"),
)
text=f"Here is some JSON that contains a diff: {json.dumps({'diff': diff})}",
)
]
case GitTools.GET_REPO_STRUCTURE:
structure = git_get_repo_structure(repo, arguments.get("ref", "HEAD"))
return [
TextContent(
type="text",
text=git_get_repo_structure(repo, arguments.get("ref", "HEAD"))
text=f"Here is some JSON that contains the repository structure: {json.dumps({'structure': structure})}",
)
]
case GitTools.GIT_LOG:
log = git_log(
repo, arguments.get("max_count", 10), arguments.get("ref", "HEAD")
)
return [
TextContent(
type="text",
text=f"Here is some JSON that contains the git log: {json.dumps({'log': log})}",
)
]
case GitTools.LIST_BRANCHES:
branches = git_list_branches(repo)
return [
TextContent(
type="text",
text=f"Here is some JSON that contains a list of branches: {json.dumps({'branches': branches})}",
)
]
case GitTools.LIST_TAGS:
tags = git_list_tags(repo)
return [
TextContent(
type="text",
text=f"Here is some JSON that contains a list of tags: {json.dumps({'tags': tags})}",
)
]
@@ -348,7 +485,14 @@ async def serve(repository: Path | None) -> None:
@click.command()
@click.option("-r", "--repository", type=click.Path(path_type=Path, dir_okay=True))
def main(repository: Path | None):
@click.option("-v", "--verbose", count=True)
def main(repository: Path | None, verbose: int):
logging_level = logging.WARN
if verbose == 1:
logging_level = logging.INFO
elif verbose >= 2:
logging_level = logging.DEBUG
logging.basicConfig(level=logging_level, stream=sys.stderr)
anyio.run(serve, repository)

6
src/git/uv.lock generated
View File

@@ -139,7 +139,7 @@ wheels = [
[[package]]
name = "mcp"
version = "0.9.0"
version = "0.9.1"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "anyio" },
@@ -149,9 +149,9 @@ dependencies = [
{ name = "sse-starlette" },
{ name = "starlette" },
]
sdist = { url = "https://files.pythonhosted.org/packages/cd/bb/fd56a5c331a6c95a4f2ec907683db3382d30b99b808ef6f46fa4f08a4b74/mcp-0.9.0.tar.gz", hash = "sha256:1d7e3f8d78bf5b37c98a233fce8cebbb86c57d8964d2c3b03cf08cdebd103d9a", size = 78343 }
sdist = { url = "https://files.pythonhosted.org/packages/e7/1c/932818470ffd49c33509110c835101a8dc4c9cdd06028b9f647fb3dde237/mcp-0.9.1.tar.gz", hash = "sha256:e8509a37c2ab546095788ed170e0fb4d7ce0cf5a3ee56b6449c78af27321a425", size = 78218 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/6f/07/077116e6a23dd0546391f5caa81b4f52938d8a81f2449c55c0b50c0215bf/mcp-0.9.0-py3-none-any.whl", hash = "sha256:e09aca08eadaf0552541aaa71271b44f99a6a5d16e5b1b03c421366f72b51753", size = 31691 },
{ url = "https://files.pythonhosted.org/packages/b3/a0/2ee813d456b57a726d583868417d1ad900fbe12ee3c8cd866e3e804ca486/mcp-0.9.1-py3-none-any.whl", hash = "sha256:7f640fcfb0be486aa510594df309920ae1d375cdca1f8aff21db3a96d837f303", size = 31562 },
]
[[package]]

View File

@@ -1,9 +1,11 @@
from . import server
import asyncio
def main():
"""Main entry point for the package."""
asyncio.run(server.main())
# Optionally expose other important items at package level
__all__ = ['main', 'server']
__all__ = ["main", "server"]

View File

@@ -28,7 +28,7 @@ class McpServer(Server):
if cursor.fetchone()[0] == 0:
cursor.execute(
"INSERT INTO notes (name, content) VALUES (?, ?)",
("example", "This is an example note.")
("example", "This is an example note."),
)
conn.commit()
@@ -55,13 +55,13 @@ class McpServer(Server):
with closing(conn.cursor()) as cursor:
cursor.execute(
"INSERT OR REPLACE INTO notes (name, content) VALUES (?, ?)",
(name, content)
(name, content),
)
conn.commit()
def __init__(self):
super().__init__("sqlite")
# Initialize SQLite database
self.db_path = "notes.db"
self._init_database()
@@ -118,10 +118,14 @@ class McpServer(Server):
"""Generate a prompt using notes from the database"""
if name != "summarize-notes":
raise ValueError(f"Unknown prompt: {name}")
notes = "<notes>\n" + "\n".join(
f"<note name='{name}'>\n{content}\n</note>"
for name, content in self._get_notes().items()
) + "\n</notes>"
notes = (
"<notes>\n"
+ "\n".join(
f"<note name='{name}'>\n{content}\n</note>"
for name, content in self._get_notes().items()
)
+ "\n</notes>"
)
style = (arguments or {}).get("style", "simple")
prompt = """
Your task is to provide a summary of the notes provided below.
@@ -207,4 +211,4 @@ async def main():
experimental_capabilities={},
),
),
)
)