1_general_forum/currency-ledger/scripts/system-check.py

222 lines
9.5 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Comprehensive Smartup Zero System Health Check
Validates complete currency ledger and task management system
"""
import os
import csv
import yaml
from pathlib import Path
class SystemChecker:
    """Run health checks over the currency-ledger data files.

    Every ``check_*`` / ``validate_*`` method prints one report section and
    appends the problems it finds to ``self.errors`` (fail the check) or
    ``self.warnings`` (do not fail the check).  ``run_complete_check`` runs
    all of them in order and prints a summary.

    Attributes:
        base_path: Parent of the directory that contains this script; all
            ledger, ownership and policy paths are resolved against it.
        errors: Messages for problems that make the overall check fail.
        warnings: Messages for problems that are reported but tolerated.
    """

    def __init__(self):
        """Resolve ``base_path`` and start with empty result lists."""
        self.base_path = Path(__file__).parent.parent
        self.errors = []
        self.warnings = []

    @staticmethod
    def _read_data_rows(csv_path, key_column):
        """Return the data rows of a CSV file as a list of dicts.

        Rows whose ``key_column`` value is empty, missing, or starts with
        ``#`` are treated as comments/placeholders and skipped, so every
        check applies the same filtering.

        Args:
            csv_path: Path of the CSV file to read.
            key_column: Header name of the column used to detect comments.

        Returns:
            The remaining rows, in file order.

        Raises:
            OSError: The file cannot be opened.
            KeyError: A row is present but the header has no ``key_column``.
        """
        # "utf-8-sig" reads plain UTF-8 and also drops a leading BOM, which
        # would otherwise become part of the first header name.
        # newline="" is what the csv module expects for correct quoting.
        with open(csv_path, "r", encoding="utf-8-sig", newline="") as handle:
            rows = []
            for row in csv.DictReader(handle):
                marker = row[key_column]
                if not marker or marker.startswith("#"):
                    continue
                rows.append(row)
            return rows

    def check_file_structure(self):
        """Report which required files exist under ``base_path``.

        Each missing file is appended to ``self.errors``.
        """
        print("📁 CHECKING FILE STRUCTURE")
        print("=" * 30)
        required_files = [
            "ledger/smartup-credits/transactions.csv",
            "ledger/social-karma/transactions.csv",
            "ledger/pending-sc/transactions.csv",
            "ledger/treasury/balance.csv",
            "ledger/task-management/task-budgets.csv",
            "ledger/task-management/session-logs.csv",
            "ownership/book-of-owners.csv",
            "policies/credit-rates.yml",
            "policies/validation-rules.yml",
            "policies/task-management/effort-assessment-template.yml",
            "policies/task-management/mission-leader-rules.yml",
        ]
        for file_path in required_files:
            full_path = self.base_path / file_path
            # Present and missing files must be distinguishable in the
            # printed report, not only in self.errors.
            if full_path.exists():
                print(f"✅ {file_path}")
            else:
                print(f"❌ {file_path}")
                self.errors.append(f"Missing required file: {file_path}")

    def validate_csv_integrity(self):
        """Check that key CSV files parse and carry their required headers.

        Files that do not exist are skipped here.  A file that cannot be
        read, or that has no header row at all, is recorded as an error;
        a file that lacks some required headers is recorded as a warning.
        """
        print("\n📊 VALIDATING CSV INTEGRITY")
        print("=" * 30)
        csv_files = {
            "ledger/smartup-credits/transactions.csv": ["timestamp", "type", "amount", "from", "to", "reference"],
            "ledger/treasury/balance.csv": ["timestamp", "event_type", "total_eur_balance", "sc_outstanding"],
            "ledger/task-management/task-budgets.csv": ["task_id", "total_sc_budget", "attacker_alias", "defender_alias"],
            "ownership/book-of-owners.csv": ["owner_id", "display_name", "license_type"],
        }
        for file_path, required_headers in csv_files.items():
            full_path = self.base_path / file_path
            if not full_path.exists():
                continue
            try:
                with open(full_path, "r", encoding="utf-8-sig", newline="") as f:
                    # fieldnames is read lazily, so fetch it while the file
                    # is still open.
                    headers = csv.DictReader(f).fieldnames
            except (OSError, UnicodeError, csv.Error) as e:
                print(f"{file_path} - Parse error: {e}")
                self.errors.append(f"CSV parse error in {file_path}: {e}")
                continue

            if headers is None:
                # An empty file has no header row; say so explicitly instead
                # of failing on a membership test against None.
                reason = "file is empty (no header row)"
                print(f"{file_path} - Parse error: {reason}")
                self.errors.append(f"CSV parse error in {file_path}: {reason}")
                continue

            missing_headers = [h for h in required_headers if h not in headers]
            if missing_headers:
                print(f"⚠️ {file_path} - Missing headers: {missing_headers}")
                self.warnings.append(f"{file_path} missing headers: {missing_headers}")
            else:
                print(f"{file_path}")

    def validate_treasury_balance(self):
        """Cross-check the treasury balance against the SC transaction log.

        Takes the last data row of ``ledger/treasury/balance.csv`` as the
        current state, recomputes SC outstanding from
        ``ledger/smartup-credits/transactions.csv`` (``SC`` rows add,
        ``REDEEM``/``DESTROY`` rows subtract) and records an error when the
        two disagree.  Then applies the 3x rule: SC outstanding may not
        exceed three times the EUR balance.
        """
        print("\n💰 VALIDATING TREASURY BALANCE")
        print("=" * 30)
        try:
            # Get latest treasury state: later rows overwrite earlier ones.
            treasury_file = self.base_path / "ledger/treasury/balance.csv"
            latest_balance = {"sc_outstanding": 0, "total_eur_balance": 0}
            for row in self._read_data_rows(treasury_file, "timestamp"):
                latest_balance = {
                    "sc_outstanding": int(row["sc_outstanding"]),
                    "total_eur_balance": float(row["total_eur_balance"]),
                }

            # Calculate SC outstanding from transactions
            sc_file = self.base_path / "ledger/smartup-credits/transactions.csv"
            calculated_outstanding = 0
            for row in self._read_data_rows(sc_file, "timestamp"):
                if row["type"] == "SC":
                    calculated_outstanding += int(row["amount"])
                elif row["type"] in ("REDEEM", "DESTROY"):
                    calculated_outstanding -= int(row["amount"])

            print(f"Treasury SC Outstanding: {latest_balance['sc_outstanding']}")
            print(f"Calculated SC Outstanding: {calculated_outstanding}")
            print(f"EUR Treasury Balance: €{latest_balance['total_eur_balance']}")

            if latest_balance["sc_outstanding"] != calculated_outstanding:
                self.errors.append(
                    f"Treasury/SC mismatch: {latest_balance['sc_outstanding']} vs {calculated_outstanding}"
                )
                print("❌ Treasury balance mismatch!")
            else:
                print("✅ Treasury balance consistent")

            # Check 3x rule
            if latest_balance["total_eur_balance"] > 0:
                ratio = latest_balance["sc_outstanding"] / (latest_balance["total_eur_balance"] * 3)
                if ratio <= 1:
                    print(f"✅ 3x Rule compliant ({ratio:.1%})")
                else:
                    print(f"❌ 3x Rule violation ({ratio:.1%})")
                    self.errors.append(f"3x rule violation: {ratio:.1%}")
            elif latest_balance["sc_outstanding"] > 0:
                print("⚠️ 3x Rule: SC outstanding but no treasury funding")
                self.warnings.append("SC outstanding without treasury funding")
            else:
                print("✅ 3x Rule: No SC outstanding, no treasury needed")
        except Exception as e:
            # Deliberately broad: a missing file or malformed row must be
            # reported as a failed check, not abort the remaining checks.
            print(f"❌ Treasury validation failed: {e}")
            self.errors.append(f"Treasury validation error: {e}")

    def validate_task_system(self):
        """Check the task-management ledger files.

        Loads task budgets and session logs (each keyed by ``task_id``,
        skipping comment rows) and verifies for every budget that
        ``attacker_sc`` and ``defender_sc`` equal 90% and 10% of
        ``total_sc_budget``, truncated to integers.  A wrong split is a
        warning.  Session logs are only counted; they are not compared
        against the budgets.
        """
        print("\n🎯 VALIDATING TASK SYSTEM")
        print("=" * 30)
        try:
            budget_file = self.base_path / "ledger/task-management/task-budgets.csv"
            session_file = self.base_path / "ledger/task-management/session-logs.csv"

            task_budgets = {}
            if budget_file.exists():
                for row in self._read_data_rows(budget_file, "task_id"):
                    task_budgets[row["task_id"]] = row
                print(f"{len(task_budgets)} tasks in budget system")

            session_logs = {}
            if session_file.exists():
                for row in self._read_data_rows(session_file, "task_id"):
                    session_logs[row["task_id"]] = row
                print(f"{len(session_logs)} completed sessions logged")

            # Verify ADM split calculations
            for task_id, budget in task_budgets.items():
                total_budget = int(budget["total_sc_budget"])
                attacker_sc = int(budget["attacker_sc"])
                defender_sc = int(budget["defender_sc"])
                expected_attacker = int(total_budget * 0.9)
                expected_defender = int(total_budget * 0.1)
                if attacker_sc != expected_attacker or defender_sc != expected_defender:
                    print(f"⚠️ {task_id}: ADM split incorrect")
                    self.warnings.append(f"{task_id} has incorrect ADM split")
                else:
                    print(f"{task_id}: ADM split correct")
        except Exception as e:
            # Deliberately broad: report the failure and let the remaining
            # checks and the summary still run.
            print(f"❌ Task system validation failed: {e}")
            self.errors.append(f"Task system validation error: {e}")

    def run_complete_check(self):
        """Run every check, print the summary, and report the outcome.

        Returns:
            ``True`` when no errors were recorded (warnings are allowed),
            ``False`` otherwise.
        """
        print("🚀 SMARTUP ZERO COMPREHENSIVE SYSTEM CHECK")
        print("=" * 50)
        self.check_file_structure()
        self.validate_csv_integrity()
        self.validate_treasury_balance()
        self.validate_task_system()

        print("\n📊 SYSTEM CHECK RESULTS")
        print("=" * 30)
        if self.errors:
            print(f"{len(self.errors)} ERRORS FOUND:")
            for error in self.errors:
                print(f"{error}")
        if self.warnings:
            print(f"⚠️ {len(self.warnings)} WARNINGS:")
            for warning in self.warnings:
                print(f"{warning}")

        if not self.errors and not self.warnings:
            print("🎉 SYSTEM CHECK PASSED!")
            print("✅ All components operational")
            print("✅ Data integrity verified")
            print("✅ Ready for production use")
        elif not self.errors:
            print("✅ SYSTEM OPERATIONAL (with warnings)")
            print("💡 Address warnings for optimal performance")
        else:
            print("❌ SYSTEM CHECK FAILED")
            print("🔧 Fix errors before production deployment")
        return not self.errors
if __name__ == "__main__":
    # Script entry point: run every check once and print the full report.
    SystemChecker().run_complete_check()