#!/usr/bin/env python3 # This file is part of DarkFi (https://dark.fi) # # Copyright (C) 2020-2026 Dyne.org foundation # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as # published by the Free Software Foundation, either version 3 of the # License, or (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . import asyncio from datetime import datetime, timezone from quart import Quart, render_template, abort, request, redirect, url_for, Response from rpc_client import JsonRpcPool, JsonRpcError, RpcUnavailableError app = Quart(__name__) app.config.update( RPC_HOST="127.0.0.1", RPC_PORT="22222", RPC_MIN_CONNECTIONS=5, RPC_MAX_CONNECTIONS=50, RPC_RECONNECT_INTERVAL=5.0, NETWORK="Testnet", ) # Global pool rpc: JsonRpcPool = None @app.before_serving async def startup(): global rpc rpc = JsonRpcPool( host=app.config["RPC_HOST"], port=app.config["RPC_PORT"], min_connections=app.config["RPC_MIN_CONNECTIONS"], max_connections=app.config["RPC_MAX_CONNECTIONS"], reconnect_interval=app.config["RPC_RECONNECT_INTERVAL"], ) await rpc.start() app.logger.info(f"RPC pool initialized for {app.config['RPC_HOST']}:{app.config['RPC_PORT']}") @app.after_serving async def shutdown(): await rpc.close() app.logger.info("RPC pool closed") @app.errorhandler(JsonRpcError) async def handle_rpc_error(error: JsonRpcError): app.logger.error(f"RPC Error: {error.code} - {error.message}") if error.code == -32600: return await render_template( "error.html", network=app.config["NETWORK"], error_code="404", error="The requested resource was not found" ), 404 return await render_template( "error.html", network=app.config["NETWORK"], error_code="500", error=error.message ), 500 @app.errorhandler(RpcUnavailableError) async def handle_rpc_unavailable(error: RpcUnavailableError): app.logger.error(f"RPC Unavailable: {error}") return await render_template( "error.html", network=app.config["NETWORK"], error_code="503", error="Blockchain node is currently unavailable. Please try again later." ), 503 @app.errorhandler(ConnectionError) async def handle_connection_error(error): app.logger.error(f"Connection Error: {error}") return await render_template( "error.html", network=app.config["NETWORK"], error_code="503", error="Service temporarily unavailable" ), 503 @app.errorhandler(404) async def handle_not_found(error): return await render_template( "error.html", network=app.config["NETWORK"], error_code="404", error="Page not found" ), 404 def format_hashrate(hashrate: float) -> str: """Format hashrate with appropriate unit.""" if hashrate >= 1e12: return f"{hashrate / 1e12:.2f} TH/s" elif hashrate >= 1e9: return f"{hashrate / 1e9:.2f} GH/s" elif hashrate >= 1e6: return f"{hashrate / 1e6:.2f} MH/s" elif hashrate >= 1e3: return f"{hashrate / 1e3:.2f} KH/s" else: return f"{hashrate:.2f} H/s" def format_bytes(size: int) -> str: """Format byte size with appropriate unit.""" if size >= 1024 * 1024: return f"{size / (1024 * 1024):.2f} MB" elif size >= 1024: return f"{size / 1024:.2f} KB" else: return f"{size} bytes" @app.route("/") async def index(): current_difficulty = await rpc.call("current_difficulty", params=[]) current_height = await rpc.call("current_height", params=[]) latest_blocks = await rpc.call("latest_blocks", params=[20]) hashrate = await rpc.call("get_hashrate", params=[]) for block in latest_blocks: dt = datetime.fromtimestamp(block["timestamp"], tz=timezone.utc) block["timestamp"] = dt.strftime("%H:%M UTC %d %b %Y") return await render_template( "index.html", network=app.config["NETWORK"], current_difficulty=current_difficulty[0], current_height=current_height, hashrate=format_hashrate(hashrate), latest_blocks=latest_blocks, ) @app.route("/blocks") async def list_blocks(): """List all blocks with pagination""" BLOCKS_PER_PAGE = 100 # Get page number from query param (1-indexed for users) page = request.args.get("page", 1, type=int) if page < 1: page = 1 current_difficulty = await rpc.call("current_difficulty", params=[]) current_height = await rpc.call("current_height", params=[]) hashrate = await rpc.call("get_hashrate", params=[]) total = current_height + 1 # blocks 0 to current_height # Calculate range: newest first offset = (page - 1) * BLOCKS_PER_PAGE start_height = current_height - offset end_height = max(0, start_height - BLOCKS_PER_PAGE + 1) # Build list of heights to fetch heights = [h for h in range(start_height, end_height - 1, -1) if h >= 0] async def fetch_block(height): try: block = await rpc.call("get_block", params=[height]) dt = datetime.fromtimestamp(block["timestamp"], tz=timezone.utc) return { "height": int(block["height"]), "size": int(block["size"]), "n_txs": len(block["txs"]), "timestamp": dt.strftime("%H:%M UTC %d %b %Y"), "powtype": block["powtype"], "hash": block["hash"], } except JsonRpcError: return None # Fetch all blocks in parallel results = await asyncio.gather(*[fetch_block(h) for h in heights]) blocks = [b for b in results if b is not None] # Calculate pagination info total_pages = (total + BLOCKS_PER_PAGE - 1) // BLOCKS_PER_PAGE has_prev = page > 1 has_next = page < total_pages return await render_template( "blocks.html", network=app.config["NETWORK"], current_difficulty=current_difficulty[0], current_height=current_height, hashrate=format_hashrate(hashrate), blocks=blocks, total=total, page=page, total_pages=total_pages, has_prev=has_prev, has_next=has_next, blocks_per_page=BLOCKS_PER_PAGE, ) @app.route("/block/") async def get_block_by_height(block_height: int): if block_height < 0: abort(404) current_difficulty = await rpc.call("current_difficulty", params=[]) current_height = await rpc.call("current_height", params=[]) hashrate = await rpc.call("get_hashrate", params=[]) block = await rpc.call("get_block", params=[block_height]) dt = datetime.fromtimestamp(block["timestamp"], tz=timezone.utc) block["timestamp"] = dt.strftime("%H:%M UTC %d %b %Y") block["n_txs"] = len(block["txs"]) return await render_template( "block.html", network=app.config["NETWORK"], current_difficulty=current_difficulty[0], current_height=current_height, hashrate=format_hashrate(hashrate), block=block, monero_hash=block["monero_hash"], ) @app.route("/tx/") async def get_tx_by_hash(tx_hash: str): # Validate hex string if not all(c in '0123456789abcdefABCDEF' for c in tx_hash): abort(404) current_difficulty = await rpc.call("current_difficulty", params=[]) current_height = await rpc.call("current_height", params=[]) hashrate = await rpc.call("get_hashrate", params=[]) tx = await rpc.call("get_tx", params=[tx_hash]) return await render_template( "tx.html", network=app.config["NETWORK"], current_difficulty=current_difficulty[0], current_height=current_height, hashrate=format_hashrate(hashrate), tx=tx, ) @app.route("/search") async def search(): """Search for blocks by height/hash or transactions by hash.""" query = request.args.get("q", "").strip() if not query: return redirect(url_for("index")) # Try to interpret as block height (integer) if query.isdigit(): return redirect(url_for("get_block_by_height", block_height=int(query))) # Check if it looks like a hex hash if all(c in '0123456789abcdefABCDEF' for c in query): # Use the search RPC to determine if it's a block or tx hash try: result = await rpc.call("search", params=[query]) if result["type"] == "block": return redirect(f"/block/{result['height']}") elif result["type"] == "tx": return redirect(url_for("get_tx_by_hash", tx_hash=query)) except JsonRpcError: pass # Try as contract ID (base58) try: contract = await rpc.call("get_contract", params=[query]) return redirect(url_for("get_contract", contract_id=query)) except JsonRpcError: pass # Nothing found return await render_template( "error.html", network=app.config["NETWORK"], error_code="Not Found", error=f"No block, transaction, or contract found for: {query}" ), 404 @app.route("/contract/") async def get_contract(contract_id: str): current_difficulty = await rpc.call("current_difficulty", params=[]) current_height = await rpc.call("current_height", params=[]) hashrate = await rpc.call("get_hashrate", params=[]) contract = await rpc.call("get_contract", params=[contract_id]) contract["wasm_size_formatted"] = format_bytes(int(contract["wasm_size"])) return await render_template( "contract.html", network=app.config["NETWORK"], current_difficulty=current_difficulty[0], current_height=current_height, hashrate=format_hashrate(hashrate), contract=contract, ) @app.route("/contracts") async def list_contracts(): current_difficulty = await rpc.call("current_difficulty", params=[]) current_height = await rpc.call("current_height", params=[]) hashrate = await rpc.call("get_hashrate", params=[]) contracts = await rpc.call("list_contracts", params=[]) contract_count = await rpc.call("contract_count", params=[]) for contract in contracts: contract["wasm_size_formatted"] = format_bytes(int(contract["wasm_size"])) return await render_template( "contracts.html", network=app.config["NETWORK"], current_difficulty=current_difficulty[0], current_height=current_height, hashrate=format_hashrate(hashrate), contracts=contracts, contract_count=contract_count, ) @app.route("/stats") async def stats(): current_difficulty = await rpc.call("current_difficulty", params=[]) current_height = await rpc.call("current_height", params=[]) hashrate = await rpc.call("get_hashrate", params=[]) stats_data = await rpc.call("get_stats", params=[]) return await render_template( "stats.html", network=app.config["NETWORK"], current_difficulty=current_difficulty[0], current_height=current_height, hashrate=format_hashrate(hashrate), stats=stats_data, ) @app.route("/stats/daily_tx_chart.png") async def daily_tx_chart(): """Generate daily average transactions chart as PNG using matplotlib.""" import io import matplotlib matplotlib.use('Agg') # Non-interactive backend import matplotlib.pyplot as plt import matplotlib.dates as mdates stats_data = await rpc.call("get_stats", params=[]) daily_stats = stats_data.get("daily_stats", []) # Filter to last 90 days if daily_stats: max_day = max(d["day"] for d in daily_stats) daily_stats = [d for d in daily_stats if d["day"] >= max_day - 90] # Create figure with dark theme plt.style.use('dark_background') fig, ax = plt.subplots(figsize=(12, 4), dpi=100) fig.patch.set_facecolor('#0d1117') ax.set_facecolor('#0d1117') if daily_stats: # Convert day numbers to dates dates = [datetime.fromtimestamp(d["day"] * 86400, tz=timezone.utc) for d in daily_stats] values = [d["avg_tx"] for d in daily_stats] ax.fill_between(dates, values, alpha=0.3, color='#6366f1') ax.plot(dates, values, color='#6366f1', linewidth=2) # Format x-axis ax.xaxis.set_major_formatter(mdates.DateFormatter('%b %d')) ax.xaxis.set_major_locator(mdates.DayLocator(interval=7)) plt.xticks(rotation=45, ha='right') ax.set_ylabel('Avg TX per Block', color='#9ca3af') ax.tick_params(colors='#9ca3af') ax.spines['bottom'].set_color('#30363d') ax.spines['left'].set_color('#30363d') ax.spines['top'].set_visible(False) ax.spines['right'].set_visible(False) ax.grid(True, alpha=0.2, color='#30363d') else: ax.text(0.5, 0.5, 'No data available', ha='center', va='center', transform=ax.transAxes, color='#9ca3af', fontsize=14) ax.set_xlim(0, 1) ax.set_ylim(0, 1) plt.tight_layout() # Save to bytes buffer buf = io.BytesIO() plt.savefig(buf, format='png', facecolor='#0d1117', edgecolor='none') plt.close(fig) buf.seek(0) return Response(buf.getvalue(), mimetype='image/png') if __name__ == "__main__": app.run(host="127.0.0.1", port=5000, debug=True)