#!/usr/bin/env python3
"""
Assess a work session's effort and recommend an SC payout.

Converts human collaboration into fair compensation through a structured
evaluation of attacker effort, defender learning, and collaboration quality.

Version: 0.1 (Prototype for 6_0_3_0)
"""

import argparse
import csv
from datetime import datetime
from pathlib import Path

import yaml
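
# Example invocation (the script filename is hypothetical; the flags match the
# argparse definitions in main(), and the task ID, times, and scores are
# illustrative only):
#
#   python3 scripts/task-management/assess_session.py \
#       --task-id 6_1_3_0 \
#       --mission-leader team-captain \
#       --start-time 2025-01-15T09:00:00 --end-time 2025-01-15T11:30:00 \
#       --attacker-effort-score 8 --defender-learning-score 7 \
#       --collaboration-score 9 \
#       --session-notes "Paired on ledger validation; strong knowledge transfer"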


class SessionAssessor:
    def __init__(self):
        # Resolve the repo root: three levels up from scripts/task-management/
        # to the currency-ledger/ root
        self.base_path = Path(__file__).parent.parent.parent
        self.policies_path = self.base_path / "policies" / "task-management"
        self.ledger_path = self.base_path / "ledger"

        print(f"Debug: Base path = {self.base_path}")
        print(f"Debug: Policies path = {self.policies_path}")
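
    # Expected repository layout (inferred from the paths this script uses;
    # verify against the actual currency-ledger repo):
    #   currency-ledger/
    #   ├── policies/task-management/effort-assessment-template.yml
    #   ├── ledger/task-management/task-budgets.csv
    #   ├── ledger/task-management/session-logs.csv
    #   ├── ledger/pending-sc/transactions.csv
    #   └── scripts/task-management/<this script>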

    def load_assessment_template(self):
        """Load effort assessment scoring criteria"""
        template_file = self.policies_path / "effort-assessment-template.yml"
        print(f"Debug: Looking for template at {template_file}")

        with open(template_file, 'r') as f:
            return yaml.safe_load(f)
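
    # The template is assumed (not verified here) to contain at least a
    # 'payout_calculation' mapping, roughly like:
    #
    #   payout_calculation:
    #     excellent_session: 95
    #     good_session: 85
    #     adequate_session: 70
    #     poor_session: 40
    #     failed_session: 10
    #
    # Note that calculate_payout_percentage() below currently hardcodes these
    # bands rather than reading them from the template.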

    def get_task_data(self, task_id):
        """Retrieve task budget information"""
        budget_file = self.ledger_path / "task-management" / "task-budgets.csv"

        with open(budget_file, 'r') as f:
            reader = csv.DictReader(f)
            for row in reader:
                if row['task_id'] == task_id:
                    return row

        raise ValueError(f"Task {task_id} not found in task budgets")
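
    # Columns task-budgets.csv must provide (inferred from the fields this
    # script reads; the file may contain more):
    #   task_id, title, attacker_alias, defender_alias,
    #   attacker_sc, defender_sc, forgejo_issue_url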

    def calculate_payout_percentage(self, attacker_score, defender_score, collaboration_score, template):
        """Calculate SC payout percentage based on effort scores"""
        total_score = attacker_score + defender_score + collaboration_score
        # Maximum possible score: 3 categories × 10 points each = 30

        # NOTE: the bands below are hardcoded for the prototype; the ranges in
        # template['payout_calculation'] are not yet consulted.
        if total_score >= 27:    # 90%+ of max
            return 95  # excellent_session range
        elif total_score >= 24:  # 80%+ of max
            return 85  # good_session range
        elif total_score >= 18:  # 60%+ of max
            return 70  # adequate_session range
        elif total_score >= 6:   # 20%+ of max
            return 40  # poor_session range
        else:
            return 10  # failed_session range
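
    # Worked example: scores of 9 (attacker) + 8 (defender) + 9 (collaboration)
    # total 26/30, which falls in the 24-26 band, so the recommended payout
    # is 85% (good_session).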

    def create_session_log(self, session_data):
        """Log session details for transparency"""
        session_file = self.ledger_path / "task-management" / "session-logs.csv"

        fieldnames = [
            'session_id', 'task_id', 'start_time', 'end_time',
            'attacker', 'defender', 'mission_leader',
            'attacker_effort_score', 'defender_learning_score', 'collaboration_score',
            'total_score', 'recommended_payout_percent', 'session_notes'
        ]

        # Write the header only when the log file is first created
        write_header = not session_file.exists()

        with open(session_file, 'a', newline='') as f:
            writer = csv.DictWriter(f, fieldnames=fieldnames)
            if write_header:
                writer.writeheader()
            writer.writerow({key: session_data[key] for key in fieldnames})

    def create_pending_sc_entries(self, task_data, session_data):
        """Create entries in pending-sc for democratic validation"""
        pending_file = self.ledger_path / "pending-sc" / "transactions.csv"

        # Convert the payout percentage into concrete SC amounts per role
        payout_percent = session_data['recommended_payout_percent'] / 100
        attacker_sc = int(int(task_data['attacker_sc']) * payout_percent)
        defender_sc = int(int(task_data['defender_sc']) * payout_percent)

        # Build one pending entry per role; a shared timestamp keeps the pair
        # easy to correlate in the ledger
        timestamp = datetime.now().isoformat()
        entries = []
        for role, amount, alias in (
            ('attacker', attacker_sc, task_data['attacker_alias']),
            ('defender', defender_sc, task_data['defender_alias']),
        ):
            entries.append({
                'timestamp': timestamp,
                'type': 'PENDING_SC',
                'amount': amount,
                'from': '',
                'to': alias,
                'reference': f"task-{task_data['task_id']}-{role}",
                'description': (
                    f"Task {task_data['task_id']}: {task_data['title']} - "
                    f"{role.capitalize()} role "
                    f"({session_data['recommended_payout_percent']}% effort payout)"
                ),
                'validator_required': 'team-captain',
                'phase': 'validation',
                'evidence_link': task_data['forgejo_issue_url'],
                'status': 'PENDING_VALIDATION',
                'vesting_tranche': ''
            })

        # Append to the existing pending-sc file; the column order must match
        # the existing transactions.csv schema
        fieldnames = [
            'timestamp', 'type', 'amount', 'from', 'to', 'reference',
            'description', 'validator_required', 'phase', 'evidence_link',
            'status', 'vesting_tranche'
        ]
        with open(pending_file, 'a', newline='') as f:
            writer = csv.DictWriter(f, fieldnames=fieldnames)
            writer.writerows(entries)

        return attacker_sc, defender_sc
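
# Illustrative arithmetic: for a task budgeted at 100 SC (attacker) and 50 SC
# (defender) with an 85% payout, the pending entries would carry 85 SC and
# 42 SC respectively (int() truncates 42.5). Budget figures are hypothetical.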


def main():
    parser = argparse.ArgumentParser(
        description="Assess work session and recommend SC payout"
    )
    parser.add_argument('--task-id', required=True,
                        help='Task ID (e.g., 6_1_3_0)')
    parser.add_argument('--mission-leader', required=True,
                        help='Mission leader conducting assessment')
    parser.add_argument('--start-time', required=True,
                        help='Session start time (ISO format)')
    parser.add_argument('--end-time', required=True,
                        help='Session end time (ISO format)')
    parser.add_argument('--attacker-effort-score', type=int, required=True,
                        choices=range(1, 11), help='Attacker effort score (1-10)')
    parser.add_argument('--defender-learning-score', type=int, required=True,
                        choices=range(1, 11), help='Defender learning score (1-10)')
    parser.add_argument('--collaboration-score', type=int, required=True,
                        choices=range(1, 11), help='Collaboration quality score (1-10)')
    parser.add_argument('--session-notes', required=True,
                        help='Detailed assessment notes')
    parser.add_argument('--override-payout', type=int,
                        help='Override calculated payout percentage (60-100)')

    args = parser.parse_args()

    # Initialize the assessor and load policy/ledger inputs
    assessor = SessionAssessor()
    template = assessor.load_assessment_template()
    task_data = assessor.get_task_data(args.task_id)

    # Use the override payout if given, otherwise calculate from the scores.
    # Check against None explicitly so a (bogus) value of 0 is rejected by the
    # range check rather than silently ignored.
    if args.override_payout is not None:
        if not 60 <= args.override_payout <= 100:
            print("❌ Override payout must be between 60-100%")
            return
        payout_percent = args.override_payout
        print(f"⚠️ Using override payout: {payout_percent}%")
    else:
        payout_percent = assessor.calculate_payout_percentage(
            args.attacker_effort_score,
            args.defender_learning_score,
            args.collaboration_score,
            template
        )

    # Generate session ID
    session_id = f"sess_{args.task_id}_{datetime.now().strftime('%Y%m%d_%H%M')}"

    # Prepare session data
    session_data = {
        'session_id': session_id,
        'task_id': args.task_id,
        'start_time': args.start_time,
        'end_time': args.end_time,
        'attacker': task_data['attacker_alias'],
        'defender': task_data['defender_alias'],
        'mission_leader': args.mission_leader,
        'attacker_effort_score': args.attacker_effort_score,
        'defender_learning_score': args.defender_learning_score,
        'collaboration_score': args.collaboration_score,
        'total_score': (args.attacker_effort_score
                        + args.defender_learning_score
                        + args.collaboration_score),
        'recommended_payout_percent': payout_percent,
        'session_notes': args.session_notes
    }

    # Log session
    assessor.create_session_log(session_data)

    # Create pending SC entries
    attacker_sc, defender_sc = assessor.create_pending_sc_entries(task_data, session_data)

    # Output results
    print(f"✅ Session Assessed: {session_id}")
    print(f"📊 Total Score: {session_data['total_score']}/30")
    print(f"💰 Recommended Payout: {payout_percent}%")
    print(f"⚔️ Attacker SC: {attacker_sc} SC ({task_data['attacker_alias']})")
    print(f"🛡️ Defender SC: {defender_sc} SC ({task_data['defender_alias']})")
    print("📝 Session logged in: ledger/task-management/session-logs.csv")
    print("⏳ Pending SC created in: ledger/pending-sc/transactions.csv")
    print()
    print("🔧 Next Steps:")
    print("1. Team Captain reviews pending SC entries")
    print("2. Community lazy consensus period (48h)")
    print("3. SC awards processed via existing validation system")
    print("4. Check status: python3 scripts/validate-pending-sc.py")


if __name__ == "__main__":
    main()