204 lines
8.7 KiB
Python
204 lines
8.7 KiB
Python
![]() |
#!/usr/bin/env python3
|
||
|
"""
|
||
|
Book of Owners Management Script
|
||
|
Handles license purchases, upgrades, and validation
|
||
|
"""
|
||
|
|
||
|
import csv
|
||
|
import yaml
|
||
|
from datetime import datetime
|
||
|
import os
|
||
|
import uuid
|
||
|
|
||
|
class OwnershipManager:
|
||
|
def __init__(self):
|
||
|
self.base_path = os.path.join(os.path.dirname(__file__), '..')
|
||
|
self.load_policies()
|
||
|
|
||
|
def load_policies(self):
|
||
|
"""Load license policies"""
|
||
|
with open(os.path.join(self.base_path, 'policies', 'license-policies.yml'), 'r') as f:
|
||
|
self.policies = yaml.safe_load(f)
|
||
|
|
||
|
def generate_owner_id(self):
|
||
|
"""Generate unique owner ID"""
|
||
|
# Simple 3-digit incremental ID (could be more sophisticated)
|
||
|
book_file = os.path.join(self.base_path, 'ownership', 'book-of-owners.csv')
|
||
|
try:
|
||
|
with open(book_file, 'r') as f:
|
||
|
reader = csv.DictReader(f)
|
||
|
existing_ids = [int(row['owner_id']) for row in reader if row['owner_id'].isdigit()]
|
||
|
next_id = max(existing_ids) + 1 if existing_ids else 1
|
||
|
return f"{next_id:03d}"
|
||
|
except (FileNotFoundError, ValueError):
|
||
|
return "001"
|
||
|
|
||
|
def add_new_owner(self, real_name, email, display_name, license_type, payment_eur=0, privacy_level="alias_only"):
|
||
|
"""Add new owner to the system"""
|
||
|
owner_id = self.generate_owner_id()
|
||
|
timestamp = datetime.now().isoformat() + 'Z'
|
||
|
|
||
|
# Add to identity mapping (private)
|
||
|
identity_file = os.path.join(self.base_path, 'ownership', 'identity-mapping.csv')
|
||
|
with open(identity_file, 'a', newline='') as f:
|
||
|
writer = csv.writer(f)
|
||
|
writer.writerow([owner_id, real_name, email, display_name, privacy_level, timestamp])
|
||
|
|
||
|
# Add to license transactions
|
||
|
self.record_license_transaction(owner_id, "PURCHASE", "", license_type, payment_eur, "initial_purchase")
|
||
|
|
||
|
# Update book of owners
|
||
|
self.update_book_of_owners(owner_id, display_name, license_type)
|
||
|
|
||
|
print(f"✅ New owner added: {display_name} ({owner_id}) - {license_type} license")
|
||
|
return owner_id
|
||
|
|
||
|
def record_license_transaction(self, owner_id, transaction_type, from_license, to_license, payment_eur, reason, reference="manual", approver="system"):
|
||
|
"""Record license change in transaction log"""
|
||
|
timestamp = datetime.now().isoformat() + 'Z'
|
||
|
|
||
|
transactions_file = os.path.join(self.base_path, 'ownership', 'license-transactions.csv')
|
||
|
with open(transactions_file, 'a', newline='') as f:
|
||
|
writer = csv.writer(f)
|
||
|
writer.writerow([timestamp, owner_id, transaction_type, from_license, to_license, payment_eur, reason, reference, approver])
|
||
|
|
||
|
def update_book_of_owners(self, owner_id, display_name, license_type, current_sk=0, status="active"):
|
||
|
"""Update current book of owners record"""
|
||
|
timestamp = datetime.now().isoformat() + 'Z'
|
||
|
voting_weight = min(1.5, 1 + (current_sk / 1000))
|
||
|
|
||
|
book_file = os.path.join(self.base_path, 'ownership', 'book-of-owners.csv')
|
||
|
|
||
|
# Read existing records
|
||
|
records = []
|
||
|
try:
|
||
|
with open(book_file, 'r') as f:
|
||
|
reader = csv.DictReader(f)
|
||
|
records = [row for row in reader if row['owner_id'] != owner_id]
|
||
|
except FileNotFoundError:
|
||
|
pass
|
||
|
|
||
|
# Add/update record
|
||
|
records.append({
|
||
|
'owner_id': owner_id,
|
||
|
'display_name': display_name,
|
||
|
'license_type': license_type,
|
||
|
'license_date': timestamp,
|
||
|
'current_sk': current_sk,
|
||
|
'voting_weight': f"{voting_weight:.2f}",
|
||
|
'status': status,
|
||
|
'last_updated': timestamp
|
||
|
})
|
||
|
|
||
|
# Write back
|
||
|
with open(book_file, 'w', newline='') as f:
|
||
|
fieldnames = ['owner_id', 'display_name', 'license_type', 'license_date', 'current_sk', 'voting_weight', 'status', 'last_updated']
|
||
|
writer = csv.DictWriter(f, fieldnames=fieldnames)
|
||
|
writer.writeheader()
|
||
|
writer.writerows(records)
|
||
|
|
||
|
def check_sk_upgrades(self):
|
||
|
"""Check for automatic SK-based license upgrades"""
|
||
|
print("🔍 Checking for automatic license upgrades...")
|
||
|
|
||
|
# Load current SK balances from karma ledger
|
||
|
sk_balances = self.get_current_sk_balances()
|
||
|
|
||
|
# Check each owner for upgrade eligibility
|
||
|
book_file = os.path.join(self.base_path, 'ownership', 'book-of-owners.csv')
|
||
|
with open(book_file, 'r') as f:
|
||
|
reader = csv.DictReader(f)
|
||
|
for owner in reader:
|
||
|
owner_id = owner['owner_id']
|
||
|
current_license = owner['license_type']
|
||
|
display_name = owner['display_name']
|
||
|
current_sk = sk_balances.get(display_name, 0)
|
||
|
|
||
|
# Check campaign -> watch upgrade
|
||
|
if current_license == 'campaign' and current_sk >= 100:
|
||
|
print(f"🎉 Auto-upgrade: {display_name} (Campaign → Watch) - {current_sk} SK earned!")
|
||
|
self.record_license_transaction(owner_id, "UPGRADE_SK", "campaign", "watch", 0, "sk_threshold_100")
|
||
|
self.update_book_of_owners(owner_id, display_name, "watch", current_sk)
|
||
|
|
||
|
def get_current_sk_balances(self):
|
||
|
"""Get current SK balances from karma ledger"""
|
||
|
balances = {}
|
||
|
karma_file = os.path.join(self.base_path, 'ledger', 'social-karma', 'transactions.csv')
|
||
|
|
||
|
try:
|
||
|
with open(karma_file, 'r') as f:
|
||
|
reader = csv.DictReader(f)
|
||
|
for row in reader:
|
||
|
if row['type'] == 'KARMA':
|
||
|
balances[row['to']] = balances.get(row['to'], 0) + int(row['amount'])
|
||
|
elif row['type'] == 'DECAY':
|
||
|
balances[row['from']] = balances.get(row['from'], 0) - int(row['amount'])
|
||
|
except FileNotFoundError:
|
||
|
pass
|
||
|
|
||
|
return balances
|
||
|
|
||
|
def validate_currency_award(self, username, currency_type, amount):
|
||
|
"""Validate if user can receive SC or SK award"""
|
||
|
# Get user's license type
|
||
|
book_file = os.path.join(self.base_path, 'ownership', 'book-of-owners.csv')
|
||
|
|
||
|
try:
|
||
|
with open(book_file, 'r') as f:
|
||
|
reader = csv.DictReader(f)
|
||
|
for owner in reader:
|
||
|
if owner['display_name'] == username:
|
||
|
license_type = owner['license_type']
|
||
|
|
||
|
# Check SC eligibility
|
||
|
if currency_type == 'SC':
|
||
|
can_earn = self.policies['license_capabilities'][license_type]['can_earn_sc']
|
||
|
if not can_earn:
|
||
|
return False, f"{username} has {license_type} license - cannot earn SC (needs work/organizational)"
|
||
|
|
||
|
# Check SK eligibility
|
||
|
elif currency_type == 'SK':
|
||
|
can_earn = self.policies['license_capabilities'][license_type]['can_earn_sk']
|
||
|
if not can_earn:
|
||
|
return False, f"{username} has {license_type} license - cannot earn SK"
|
||
|
|
||
|
return True, f"✅ {username} eligible for {amount} {currency_type}"
|
||
|
|
||
|
return False, f"❌ {username} not found in Book of Owners"
|
||
|
|
||
|
except FileNotFoundError:
|
||
|
return False, "❌ Book of Owners not found"
|
||
|
|
||
|
def main():
|
||
|
"""Main owner management interface"""
|
||
|
manager = OwnershipManager()
|
||
|
|
||
|
print("📖 BOOK OF OWNERS MANAGEMENT")
|
||
|
print("=" * 40)
|
||
|
print("1. Add new owner")
|
||
|
print("2. Check SK upgrade eligibility")
|
||
|
print("3. Validate currency award")
|
||
|
print("4. Show current owners")
|
||
|
|
||
|
# For now, just show current owners
|
||
|
book_file = os.path.join(manager.base_path, 'ownership', 'book-of-owners.csv')
|
||
|
try:
|
||
|
with open(book_file, 'r') as f:
|
||
|
reader = csv.DictReader(f)
|
||
|
owners = list(reader)
|
||
|
|
||
|
if owners:
|
||
|
print(f"\n📊 Current Owners: {len([o for o in owners if o['owner_id']])}")
|
||
|
print("-" * 60)
|
||
|
for owner in owners:
|
||
|
if owner['owner_id']: # Skip header comments
|
||
|
print(f"{owner['owner_id']}: {owner['display_name']} ({owner['license_type']}) - {owner['current_sk']} SK")
|
||
|
else:
|
||
|
print("📝 No owners registered yet")
|
||
|
|
||
|
except FileNotFoundError:
|
||
|
print("📝 Book of Owners not created yet")
|
||
|
|
||
|
if __name__ == "__main__":
|
||
|
main()
|