mirror of
https://github.com/darkrenaissance/darkfi.git
synced 2026-04-28 03:00:18 -04:00
260 lines
8.7 KiB
Python
260 lines
8.7 KiB
Python
import asyncio
|
|
import json
|
|
import logging
|
|
from typing import Any, Optional
|
|
from contextlib import asynccontextmanager
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class JsonRpcError(Exception):
|
|
"""Error returned by the RPC server."""
|
|
def __init__(self, code: int, message: str, data: Any = None):
|
|
self.code = code
|
|
self.message = message
|
|
self.data = data
|
|
super().__init__(f"RPC Error {code}: {message}")
|
|
|
|
|
|
class RpcUnavailableError(Exception):
|
|
"""Raised when RPC endpoint is not reachable."""
|
|
pass
|
|
|
|
|
|
class JsonRpcConnection:
|
|
"""Single JSON-RPC connection over TCP."""
|
|
|
|
def __init__(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
|
|
self.reader = reader
|
|
self.writer = writer
|
|
self.request_id = 0
|
|
self._lock = asyncio.Lock()
|
|
self._closed = False
|
|
|
|
@property
|
|
def is_closed(self) -> bool:
|
|
return self._closed or self.writer.is_closing()
|
|
|
|
async def call(self, method: str, params: Any = None, timeout: float = 30.0) -> Any:
|
|
if self.is_closed:
|
|
raise ConnectionError("Connection is closed")
|
|
|
|
async with self._lock:
|
|
self.request_id += 1
|
|
request = {
|
|
"jsonrpc": "2.0",
|
|
"method": method,
|
|
"id": self.request_id,
|
|
"params": params,
|
|
}
|
|
|
|
message = json.dumps(request) + "\n"
|
|
|
|
try:
|
|
self.writer.write(message.encode("utf-8"))
|
|
await self.writer.drain()
|
|
|
|
response_line = await asyncio.wait_for(
|
|
self.reader.readline(),
|
|
timeout=timeout
|
|
)
|
|
except asyncio.TimeoutError:
|
|
self._closed = True
|
|
raise TimeoutError(f"RPC call '{method}' timed out after {timeout}s")
|
|
except (ConnectionError, OSError) as e:
|
|
self._closed = True
|
|
raise ConnectionError(f"Connection lost: {e}")
|
|
|
|
if not response_line:
|
|
self._closed = True
|
|
raise ConnectionError("Connection closed by server")
|
|
|
|
response = json.loads(response_line.decode("utf-8"))
|
|
|
|
if "error" in response and response["error"]:
|
|
err = response["error"]
|
|
raise JsonRpcError(
|
|
err.get("code", -1),
|
|
err.get("message", "Unknown error"),
|
|
err.get("data")
|
|
)
|
|
|
|
return response.get("result")
|
|
|
|
async def close(self):
|
|
if not self._closed:
|
|
self._closed = True
|
|
self.writer.close()
|
|
try:
|
|
await self.writer.wait_closed()
|
|
except Exception:
|
|
pass
|
|
|
|
|
|
class JsonRpcPool:
|
|
"""
|
|
Connection pool with background reconnection.
|
|
|
|
- If RPC is unavailable, immediately returns error (no waiting)
|
|
- Background task keeps trying to reconnect every N seconds
|
|
- Once connected, requests work again
|
|
"""
|
|
|
|
def __init__(
|
|
self,
|
|
host: str,
|
|
port: int,
|
|
min_connections: int = 5,
|
|
max_connections: int = 20,
|
|
reconnect_interval: float = 5.0,
|
|
connect_timeout: float = 5.0,
|
|
):
|
|
self.host = host
|
|
self.port = port
|
|
self.min_connections = min_connections
|
|
self.max_connections = max_connections
|
|
self.reconnect_interval = reconnect_interval
|
|
self.connect_timeout = connect_timeout
|
|
|
|
self._pool: asyncio.Queue[JsonRpcConnection] = None
|
|
self._semaphore: asyncio.Semaphore = None
|
|
self._connection_count = 0
|
|
self._lock = asyncio.Lock()
|
|
self._closed = False
|
|
self._available = False
|
|
self._reconnect_task: Optional[asyncio.Task] = None
|
|
|
|
async def start(self):
|
|
"""Initialize the pool"""
|
|
self._pool = asyncio.Queue()
|
|
self._semaphore = asyncio.Semaphore(self.max_connections)
|
|
self._connection_count = 0
|
|
self._closed = False
|
|
|
|
# Try to create initial connections
|
|
success_count = 0
|
|
for _ in range(self.min_connections):
|
|
conn = await self._create_connection()
|
|
if conn:
|
|
await self._pool.put(conn)
|
|
success_count += 1
|
|
|
|
if success_count > 0:
|
|
self._available = True
|
|
logger.info(f"RPC pool started with {success_count} connections")
|
|
else:
|
|
self._available = False
|
|
logger.warning(f"RPC {self.host}:{self.port} unavailable, will retry in background")
|
|
self._start_reconnect_task()
|
|
|
|
def _start_reconnect_task(self):
|
|
"""Start background reconnection task if not already running."""
|
|
if self._reconnect_task is None or self._reconnect_task.done():
|
|
self._reconnect_task = asyncio.create_task(self._reconnect_loop())
|
|
|
|
async def _reconnect_loop(self):
|
|
"""Background task that keeps trying to reconnect."""
|
|
while not self._closed and not self._available:
|
|
await asyncio.sleep(self.reconnect_interval)
|
|
|
|
if self._closed:
|
|
break
|
|
|
|
conn = await self._create_connection()
|
|
if conn:
|
|
await self._pool.put(conn)
|
|
self._available = True
|
|
logger.info(f"RPC {self.host}:{self.port} reconnected")
|
|
break
|
|
else:
|
|
logger.debug(f"RPC {self.host}:{self.port} still unavailable, retrying...")
|
|
|
|
async def _create_connection(self) -> Optional[JsonRpcConnection]:
|
|
"""Create a new connection. Returns None if connection fails."""
|
|
try:
|
|
reader, writer = await asyncio.wait_for(
|
|
asyncio.open_connection(self.host, self.port, limit=16*1024*1024),
|
|
timeout=self.connect_timeout
|
|
)
|
|
async with self._lock:
|
|
self._connection_count += 1
|
|
return JsonRpcConnection(reader, writer)
|
|
except (asyncio.TimeoutError, OSError) as e:
|
|
logger.debug(f"Connection failed: {e}")
|
|
return None
|
|
|
|
async def _destroy_connection(self, conn: JsonRpcConnection):
|
|
"""Close and clean up a connection"""
|
|
await conn.close()
|
|
async with self._lock:
|
|
self._connection_count = max(0, self._connection_count - 1)
|
|
|
|
async def call(self, method: str, params: Any = None, timeout: float = 30.0) -> Any:
|
|
"""Make an RPC call. Raises RpcUnavailableError immediately if not connected."""
|
|
if self._closed:
|
|
raise RuntimeError("Pool's closed")
|
|
|
|
if not self._available:
|
|
raise RpcUnavailableError(f"RPC {self.host}:{self.port} is unavailable")
|
|
|
|
async with self._semaphore:
|
|
# Get or create connection
|
|
conn = None
|
|
while not self._pool.empty():
|
|
try:
|
|
conn = self._pool.get_nowait()
|
|
if not conn.is_closed:
|
|
break
|
|
await self._destroy_connection(conn)
|
|
conn = None
|
|
except asyncio.QueueEmpty:
|
|
break
|
|
|
|
if conn is None:
|
|
conn = await self._create_connection()
|
|
if conn is None:
|
|
self._available = False
|
|
self._start_reconnect_task()
|
|
raise RpcUnavailableError(f"RPC {self.host}:{self.port} is unavailable")
|
|
|
|
# Make the call
|
|
try:
|
|
result = await conn.call(method, params, timeout)
|
|
await self._pool.put(conn)
|
|
return result
|
|
except JsonRpcError:
|
|
# Server error - connection is still good
|
|
await self._pool.put(conn)
|
|
raise
|
|
except (ConnectionError, TimeoutError) as e:
|
|
# Connection failed
|
|
await self._destroy_connection(conn)
|
|
self._available = False
|
|
self._start_reconnect_task()
|
|
raise RpcUnavailableError(f"RPC {self.host}:{self.port} is unavailable: {e}")
|
|
|
|
@property
|
|
def is_available(self) -> bool:
|
|
"""Check if RPC is currently available."""
|
|
return self._available
|
|
|
|
async def close(self):
|
|
"""Close all connections"""
|
|
self._closed = True
|
|
|
|
if self._reconnect_task and not self._reconnect_task.done():
|
|
self._reconnect_task.cancel()
|
|
try:
|
|
await self._reconnect_task
|
|
except asyncio.CancelledError:
|
|
pass
|
|
|
|
while not self._pool.empty():
|
|
try:
|
|
conn = self._pool.get_nowait()
|
|
await self._destroy_connection(conn)
|
|
except asyncio.QueueEmpty:
|
|
break
|
|
|
|
logger.info("RPC pool closed")
|