import gradio as gr
from typing import Optional
import os
from datetime import datetime
import urllib.parse
class GovernanceFormBuilder:
def __init__(self):
self.form_data = {
'scopes': {},
'participants': {},
'policies': {}
}
# Track added conditions for current policy being edited
self.current_policy_conditions = []
# Preview component placeholder (initialized in _create_preview_panel)
self.preview_code: Optional[gr.Textbox] = None
def create_interface(self):
"""Create the main Gradio interface"""
with gr.Blocks(
title="Governance Policy Builder",
theme=gr.themes.Soft(),
css="""
.main-container { max-width: 1200px; margin: auto; }
.form-section { padding: 20px; border: 1px solid #e0e0e0; border-radius: 8px; margin: 10px 0; }
.preview-panel { background-color: #f8f9fa; padding: 15px; border-radius: 8px; }
"""
) as ui:
gr.Markdown("# 🏛️ Governance Policy Builder")
gr.Markdown("Define governance rules for your software project using an intuitive form interface.")
# State to store form data across tabs
form_state = gr.State(self.form_data)
with gr.Row():
with gr.Column(scale=3, elem_classes=["main-container"]):
with gr.Tabs():
with gr.Tab("🎯 Scopes", id="scopes"):
scope_components = self._create_scope_forms()
with gr.Tab("👥 Participants", id="participants"):
participant_components = self._create_participant_forms()
with gr.Tab("📋 Policies", id="policies"):
policy_components = self._create_policy_forms()
with gr.Column(scale=2):
preview_components = self._create_preview_panel()
# Update handlers
self._setup_event_handlers(scope_components, participant_components, policy_components, preview_components, form_state)
return ui
def _create_scope_forms(self):
"""Create forms for defining scopes (Projects, Activities, Tasks)"""
gr.Markdown("## Define Project Scopes")
gr.Markdown("Set up the organizational structure of your project.")
with gr.Accordion("Projects", open=True):
gr.Markdown("### Define Projects")
gr.Markdown("*Define individual projects with optional platform and repository information*")
gr.Markdown("**Note:** Only Project Name is required")
with gr.Row():
project_name = gr.Textbox(
label="Project Name *",
placeholder="e.g., MyProject, K8sProject",
info="Unique name for this project (required - must be unique across all projects, activities, and tasks)"
)
project_platform = gr.Dropdown(
label="Platform (Optional)",
choices=["", "GitHub", "GitLab"],
value="",
info="Select the platform where the project is hosted"
)
with gr.Row():
project_repo_owner = gr.Textbox(
label="Repository Owner *",
placeholder="e.g., kubernetes, microsoft, myorganization",
info="Username or organization name",
visible=False
)
project_repo_name = gr.Textbox(
label="Repository Name *",
placeholder="e.g., kubernetes, vscode, myproject",
info="Repository name",
visible=False
)
with gr.Row():
add_project_btn = gr.Button("➕ Add Project", variant="secondary")
clear_projects_btn = gr.Button("🗑️ Clear All", variant="secondary")
# Display added projects
projects_display = gr.Textbox(
label="Added Projects",
lines=4,
interactive=False,
placeholder="No projects added yet. Create projects to define your organizational structure.",
info="Projects you've added will appear here"
)
# Hidden component to store projects data
projects_data = gr.State([])
with gr.Accordion("Activities", open=False):
gr.Markdown("### Define Activities")
gr.Markdown("*Define activities with optional parent project association*")
gr.Markdown("**Note:** Only Activity Name is required")
with gr.Row():
activity_name = gr.Textbox(
label="Activity Name *",
placeholder="e.g., CodeReview, TestActivity, Documentation",
info="Unique name for this activity (required - must be unique across all projects, activities, and tasks)"
)
activity_parent = gr.Dropdown(
label="Parent Project (Optional)",
choices=[], # Will be populated dynamically from added projects
value=None,
allow_custom_value=False,
info="Select a parent project or leave empty for root-level activity"
)
with gr.Row():
add_activity_btn = gr.Button("➕ Add Activity", variant="secondary")
clear_activities_btn = gr.Button("🗑️ Clear All", variant="secondary")
# Display added activities
activities_display = gr.Textbox(
label="Added Activities",
lines=4,
interactive=False,
placeholder="No activities added yet. Create activities to organize your project workflow.",
info="Activities you've added will appear here"
)
# Hidden component to store activities data
activities_data = gr.State([])
with gr.Accordion("Tasks", open=False):
gr.Markdown("### Define Tasks")
gr.Markdown("*Define specific tasks with optional parent, type, and action details*")
gr.Markdown("**Note:** Only Task Name is required")
with gr.Row():
task_name = gr.Textbox(
label="Task Name *",
placeholder="e.g., MergeTask, PRMerge, ApprovalTask",
info="Unique name for this task (required - must be unique across all projects, activities, and tasks)"
)
task_parent = gr.Dropdown(
label="Parent Activity (Optional)",
choices=[], # Will be populated dynamically from activities only
value=None,
allow_custom_value=False,
info="Select a parent activity (tasks can only belong to activities)"
)
with gr.Row():
task_type = gr.Dropdown(
label="Type (Optional)",
choices=["", "Pull request", "MemberLifecycle"],
value="",
info="Type of task"
)
task_action = gr.Dropdown(
label="Action (Optional)",
choices=[""],
value="",
info="Specific action this task performs",
visible=False
)
with gr.Row():
add_task_btn = gr.Button("➕ Add Task", variant="secondary")
clear_tasks_btn = gr.Button("🗑️ Clear All", variant="secondary")
# Display added tasks
tasks_display = gr.Textbox(
label="Added Tasks",
lines=4,
interactive=False,
placeholder="No tasks added yet. Create tasks to define specific governance actions.",
info="Tasks you've added will appear here"
)
# Hidden component to store tasks data
tasks_data = gr.State([])
return {
'project_name': project_name,
'project_platform': project_platform,
'project_repo_owner': project_repo_owner,
'project_repo_name': project_repo_name,
'add_project_btn': add_project_btn,
'clear_projects_btn': clear_projects_btn,
'projects_display': projects_display,
'projects_data': projects_data,
'activity_name': activity_name,
'activity_parent': activity_parent,
'add_activity_btn': add_activity_btn,
'clear_activities_btn': clear_activities_btn,
'activities_display': activities_display,
'activities_data': activities_data,
'task_name': task_name,
'task_parent': task_parent,
'task_type': task_type,
'task_action': task_action,
'add_task_btn': add_task_btn,
'clear_tasks_btn': clear_tasks_btn,
'tasks_display': tasks_display,
'tasks_data': tasks_data
}
def _create_participant_forms(self):
"""Create forms for defining participants (Roles, Individuals, Agents)"""
gr.Markdown("## Define Participants")
gr.Markdown("Set up the people and roles involved in governance decisions.")
with gr.Accordion("Profiles", open=True):
gr.Markdown("### Define Profiles")
gr.Markdown("*Create diversity profiles for tracking inclusive governance*")
gr.Markdown("*Both gender and race are optional - you can define profiles with just one attribute or both*")
with gr.Row():
profile_name = gr.Textbox(
label="Profile Name",
placeholder="e.g., diverse_profile, senior_dev_profile",
info="Unique name for this profile"
)
with gr.Row():
profile_gender = gr.Dropdown(
label="Gender (Optional)",
choices=["", "male", "female", "non-binary", "other"],
value="",
allow_custom_value=False,
info="Leave empty if not applicable"
)
profile_race = gr.Dropdown(
label="Race/Ethnicity (Optional)",
choices=["", "asian", "black", "hispanic", "white", "mixed", "other"],
value="",
allow_custom_value=False,
info="Leave empty if not applicable"
)
profile_language = gr.Dropdown(
label="Language (Optional)",
choices=["", "english", "spanish", "french", "german", "chinese", "japanese", "other"],
value="",
allow_custom_value=True,
info="Leave empty if not applicable"
)
with gr.Row():
add_profile_btn = gr.Button("➕ Add Profile", variant="secondary")
clear_profiles_btn = gr.Button("🗑️ Clear All", variant="secondary")
# Display added profiles
profiles_display = gr.Textbox(
label="Added Profiles",
lines=4,
interactive=False,
placeholder="No profiles added yet. Remember: profiles must have at least one attribute (gender, race, or language).",
info="Profiles you've added will appear here"
)
# Hidden component to store profiles data
profiles_data = gr.State([])
with gr.Accordion("Roles", open=True):
gr.Markdown("### Define Roles")
gr.Markdown("*Define individual roles for your project*")
gr.Markdown("**Note:** Role names must be unique across all participants (roles, individuals, agents)")
with gr.Row():
role_name = gr.Textbox(
label="Role Name *",
placeholder="e.g., maintainer, reviewer, approver, owner",
info="Unique name for this role (required - must be unique across all participants)"
)
role_vote_value = gr.Number(
label="Vote Value (Optional)",
value=1.0,
step=0.1,
info="Weight of this role's vote (must be >= 0.0, default: 1.0)"
)
with gr.Row():
add_role_btn = gr.Button("➕ Add Role", variant="secondary")
clear_roles_btn = gr.Button("🗑️ Clear All", variant="secondary")
# Display added roles
roles_display = gr.Textbox(
label="Added Roles",
lines=4,
interactive=False,
placeholder="No roles added yet. Create roles to define project responsibilities.",
info="Roles you've added will appear here"
)
# Hidden component to store roles data
roles_data = gr.State([])
with gr.Accordion("Individuals", open=True):
gr.Markdown("### Individual Participants")
gr.Markdown("*Define individual human participants with optional attributes*")
gr.Markdown("*All attributes except name are optional*")
with gr.Row():
individual_name = gr.Textbox(
label="Individual Name",
placeholder="e.g., john_doe, jane_smith",
info="Unique name for this individual"
)
with gr.Row():
individual_vote_value = gr.Number(
label="Vote Value (Optional)",
value=1.0,
step=0.1,
info="Weight of this individual's vote (must be >= 0.0, default: 1.0)"
)
individual_profile = gr.Dropdown(
label="Profile (Optional)",
choices=[], # Will be populated dynamically from added profiles
value=None,
allow_custom_value=False,
info="Diversity profile for this individual"
)
with gr.Row():
individual_role = gr.Dropdown(
label="Role (Optional)",
choices=[], # Will be populated dynamically from roles text
value=None,
allow_custom_value=False,
info="Role assignment for this individual"
)
with gr.Row():
add_individual_btn = gr.Button("➕ Add Individual", variant="secondary")
clear_individuals_btn = gr.Button("🗑️ Clear All", variant="secondary")
# Display added individuals
individuals_display = gr.Textbox(
label="Added Individuals",
lines=4,
interactive=False,
placeholder="No individuals added yet",
info="Individual participants you've added will appear here"
)
# Hidden component to store individuals data
individuals_data = gr.State([])
with gr.Accordion("Agents", open=True):
gr.Markdown("### Automated Participants")
gr.Markdown("*Define automated systems that participate in decisions*")
gr.Markdown("*All attributes except name are optional*")
with gr.Row():
agent_name = gr.Textbox(
label="Agent Name",
placeholder="e.g., k8s-ci-robot, dependabot",
info="Unique name for this automated agent"
)
agent_vote_value = gr.Number(
label="Vote Value (Optional)",
value=1.0,
step=0.1,
info="Weight of this agent's vote (must be >= 0.0, default: 1.0)"
)
with gr.Row():
agent_confidence = gr.Number(
label="Confidence (Optional)",
value=None,
minimum=0.0,
maximum=1.0,
step=0.1,
placeholder="Enter value (0.0-1.0)",
info="Agent's confidence level (0.0 to 1.0)"
)
agent_autonomy_level = gr.Number(
label="Autonomy Level (Optional)",
value=None,
minimum=0.0,
maximum=1.0,
step=0.1,
placeholder="Enter value (0.0-1.0)",
info="Agent's autonomy level (0.0 to 1.0)"
)
with gr.Row():
agent_explainability = gr.Number(
label="Explainability (Optional)",
value=None,
minimum=0.0,
maximum=1.0,
step=0.1,
placeholder="Enter value (0.0-1.0)",
info="Agent's explainability level (0.0 to 1.0)"
)
agent_role = gr.Dropdown(
label="Role (Optional)",
choices=[], # Will be populated dynamically from roles text
value=None,
allow_custom_value=False,
info="Role assignment for this agent"
)
with gr.Row():
add_agent_btn = gr.Button("➕ Add Agent", variant="secondary")
clear_agents_btn = gr.Button("🗑️ Clear All", variant="secondary")
# Display added agents
agents_display = gr.Textbox(
label="Added Agents",
lines=4,
interactive=False,
placeholder="No agents added yet",
info="Automated agents you've added will appear here"
)
# Hidden component to store agents data
agents_data = gr.State([])
return {
'profile_name': profile_name,
'profile_gender': profile_gender,
'profile_race': profile_race,
'profile_language': profile_language,
'add_profile_btn': add_profile_btn,
'clear_profiles_btn': clear_profiles_btn,
'profiles_display': profiles_display,
'profiles_data': profiles_data,
'role_name': role_name,
'role_vote_value': role_vote_value,
'add_role_btn': add_role_btn,
'clear_roles_btn': clear_roles_btn,
'roles_display': roles_display,
'roles_data': roles_data,
'individual_name': individual_name,
'individual_vote_value': individual_vote_value,
'individual_profile': individual_profile,
'individual_role': individual_role,
'add_individual_btn': add_individual_btn,
'clear_individuals_btn': clear_individuals_btn,
'individuals_display': individuals_display,
'individuals_data': individuals_data,
'agent_name': agent_name,
'agent_vote_value': agent_vote_value,
'agent_confidence': agent_confidence,
'agent_autonomy_level': agent_autonomy_level,
'agent_explainability': agent_explainability,
'agent_role': agent_role,
'add_agent_btn': add_agent_btn,
'clear_agents_btn': clear_agents_btn,
'agents_display': agents_display,
'agents_data': agents_data
}
def _create_policy_forms(self):
"""Create forms for defining policies"""
gr.Markdown("## Define Governance Policies")
gr.Markdown("Configure the decision-making rules for your project.")
with gr.Accordion("Add New Policy", open=True):
gr.Markdown("### Create a New Policy")
gr.Markdown("*Define individual policies and add them to your governance structure*")
gr.Markdown("**Note:** Fields marked with * are mandatory")
with gr.Row():
policy_name = gr.Textbox(
label="Policy Name *",
placeholder="e.g., pr_merge_policy",
info="A unique name for this policy (required)"
)
policy_type = gr.Dropdown(
label="Policy Type",
choices=[
"MajorityPolicy",
"AbsoluteMajorityPolicy",
"ConsensusPolicy",
"LazyConsensusPolicy",
"LeaderDrivenPolicy",
"VotingPolicy"
],
value="MajorityPolicy",
info="Type of decision-making process"
)
with gr.Row():
policy_scope = gr.Dropdown(
label="Policy Scope *",
choices=[], # Will be populated dynamically from defined scopes
value=None,
allow_custom_value=False,
info="Select the scope this policy applies to (required)"
)
# Participants for this policy
policy_participants = gr.Dropdown(
label="Policy Participants *",
choices=[], # Will be populated dynamically from defined participants
value=None,
allow_custom_value=False,
multiselect=True,
info="Select participants from defined roles/individuals (required)"
)
# Optional communication channel for this policy
with gr.Row():
communication_channel = gr.Textbox(
label="Communication Channel (Optional)",
placeholder="e.g., slack, email, github",
info="Medium used for decision discussion/notification"
)
with gr.Row():
# Decision type
decision_type = gr.Dropdown(
label="Decision Type",
choices=["BooleanDecision", "StringList", "ElementList"],
value="BooleanDecision"
)
# Options for StringList
decision_options = gr.Textbox(
label="Decision Options",
placeholder="e.g., option1, option2, option3 (for StringList)",
visible=False,
info="Comma-separated custom options for StringList decisions"
)
# Options for ElementList (Individual names)
decision_options_element_list = gr.Dropdown(
label="Decision Options",
choices=[],
value=None,
allow_custom_value=False,
multiselect=True,
visible=False,
interactive=True,
info="Select individual(s) for ElementList decisions"
)
# Parameters (for voting policies)
voting_ratio = gr.Slider(
label="Voting Ratio",
minimum=0.0,
maximum=1.0,
value=0.5,
step=0.1,
visible=True, # Start visible since MajorityPolicy is default
info="Required ratio of positive votes (1.0 = unanimous)"
)
with gr.Row():
# Default policy for LeaderDrivenPolicy
default_decision = gr.Dropdown(
label="Default Policy",
choices=[], # Will be populated with other defined policies
value=None,
allow_custom_value=False,
visible=False,
info="Default policy when leader doesn't participate (Showing only policies with same scope)"
)
# Fallback policy for ConsensusPolicy and LazyConsensusPolicy
fallback_policy = gr.Dropdown(
label="Fallback Policy",
choices=[], # Will be populated with other defined policies
value=None,
allow_custom_value=False,
visible=False,
info="Policy to use when consensus cannot be reached (Showing only policies with same scope)"
)
# Conditions section wrapped in a visual box
with gr.Group():
gr.Markdown("#### 📜 Policy Conditions (Optional)")
gr.Markdown("*Configure additional rules and constraints for this policy*")
# Add visual containment with padding
# We add left and right spacers to make it visual that it's contained
with gr.Row():
gr.HTML("")
with gr.Column(scale=100):
with gr.Row():
condition_type = gr.Dropdown(
label="Add Condition",
choices=[
"", # Empty option for no condition
"Deadline",
"MinDecisionTime",
"ParticipantExclusion",
"MinParticipants",
"VetoRight",
"CheckCiCd",
"MinTime",
"LabelCondition"
],
value="",
info="Select a condition type to add to this policy"
)
# Condition-specific fields (initially hidden)
with gr.Row():
veto_participants = gr.Dropdown(
label="Veto Right Participants",
choices=[], # Will be populated from participants
value=None,
multiselect=True,
allow_custom_value=False,
visible=False,
info="Participants who can veto this decision (does not allow default values yet, such as Author, RepoOwner, etc.)"
)
# ParticipantExclusion condition
excluded_participants = gr.Dropdown(
label="Excluded Participants",
choices=[], # Will be populated from participants
value=None,
multiselect=True,
allow_custom_value=False,
visible=False,
info="Participants excluded from this decision (does not allow default values yet, such as Author, RepoOwner, etc.)"
)
with gr.Row():
# MinParticipants condition
min_participants = gr.Number(
label="Minimum Participants",
value=None,
visible=False,
info="Minimum number of participants required (≥1)"
)
# Deadline condition fields
with gr.Row():
with gr.Row():
deadline_offset_value = gr.Number(
label="Deadline Offset Value",
value=None,
visible=False,
info="Time offset number (≥1, e.g., 7 for '7 days')"
)
deadline_offset_unit = gr.Dropdown(
label="Deadline Offset Unit",
choices=["days", "weeks", "months", "years"],
value="days",
visible=False,
info="Time unit for the offset"
)
deadline_date = gr.Textbox(
label="Deadline Date (DD/MM/YYYY)",
placeholder="e.g., 25/12/2024",
visible=False,
info="Specific deadline date (optional, can be used with or without offset)"
)
# MinDecisionTime condition fields
with gr.Row():
with gr.Row():
min_decision_offset_value = gr.Number(
label="Min Decision Time Offset Value",
value=None,
visible=False,
info="Minimum time offset number (≥1, e.g., 2 for '2 days')"
)
min_decision_offset_unit = gr.Dropdown(
label="Min Decision Time Offset Unit",
choices=["days", "weeks", "months", "years"],
value="days",
visible=False,
info="Time unit for the minimum decision time"
)
min_decision_date = gr.Textbox(
label="Min Decision Date (DD/MM/YYYY)",
placeholder="e.g., 01/01/2024",
visible=False,
info="Specific minimum decision date (optional, can be used with or without offset)"
)
# CheckCiCd condition fields
with gr.Row():
with gr.Row():
check_ci_cd_evaluation_mode = gr.Dropdown(
label="Evaluation Mode",
choices=["", "pre", "post", "concurrent"],
value="",
visible=False,
info="When to evaluate CI/CD checks (pre/post/concurrent)"
)
check_ci_cd_value = gr.Checkbox(
label="Check",
value=True,
visible=False,
info="Whether CI/CD checks must pass (true/false)"
)
with gr.Row():
with gr.Row():
# LabelCondition fields
label_condition_type = gr.Dropdown(
label="Evaluation Mode",
choices=["", "pre", "post", "concurrent"],
value="",
visible=False,
info="When to check labels (pre/post/concurrent)"
)
label_condition_operator = gr.Dropdown(
label="Label Operator",
choices=["", "not"],
value="",
visible=False,
info="Label condition operator (empty for required, 'not' for forbidden)"
)
label_condition_labels = gr.Textbox(
label="Labels",
placeholder="e.g., lgtm, approved",
visible=False,
info="Comma-separated list of labels"
)
# MinTime condition fields
with gr.Row():
with gr.Row():
min_time_evaluation_mode = gr.Dropdown(
label="Evaluation Mode",
choices=["", "pre", "post", "concurrent"],
value="",
visible=False,
info="When to evaluate MinTime (pre/post/concurrent, optional)"
)
min_time_activity = gr.Dropdown(
label="Activity Type",
choices=["Activity", "InActivity"],
value="Activity",
visible=False,
info="Check for activity or inactivity"
)
with gr.Row():
min_time_offset_value = gr.Number(
label="MinTime Offset Value",
value=None,
visible=False,
info="Time offset number (≥1, e.g., 7 for '7 days')"
)
min_time_offset_unit = gr.Dropdown(
label="MinTime Offset Unit",
choices=["days", "weeks", "months", "years"],
value="days",
visible=False,
info="Time unit for the MinTime offset"
)
# Condition management buttons
with gr.Row():
with gr.Row():
add_condition_btn = gr.Button("➕ Add Condition", variant="secondary")
clear_conditions_btn = gr.Button("🗑️ Clear All", variant="secondary")
# Added conditions display (adaptive height)
with gr.Row():
added_conditions_list = gr.Textbox(
label="Added Conditions",
value="",
interactive=False,
lines=2,
max_lines=10,
info="List of conditions added to this policy"
)
with gr.Row():
gr.HTML("") # Vertical spacer
gr.HTML("") # End of visual containment
with gr.Row():
add_policy_btn = gr.Button("➕ Add Policy", variant="secondary")
clear_policies_btn = gr.Button("🗑️ Clear All", variant="secondary")
# Display added policies
policies_display = gr.Textbox(
label="Added Policies",
lines=6,
interactive=False,
placeholder="No policies added yet. Create policies to define your governance rules.",
info="Policies you've added will appear here"
)
# Hidden component to store policies data
policies_data = gr.State([])
with gr.Accordion("🔄 Composed Policy (Multi-phase)", open=False):
gr.Markdown("### Create Complex Multi-Phase Governance Policies")
gr.Markdown("Composed policies orchestrate multiple existing policies in sequence or parallel.")
# Basic composed policy information
with gr.Row():
composed_policy_name = gr.Textbox(
label="Composed Policy Name *",
placeholder="e.g., TwoPhaseReview, ComplexApproval",
info="Unique name for this composed policy"
)
composed_policy_scope = gr.Dropdown(
label="Composed Policy Scope *",
choices=[], # Will be populated dynamically from defined scopes
value=None,
allow_custom_value=False,
info="Select the scope this composed policy applies to (required)"
)
# Execution configuration
gr.Markdown("**⚙️ Execution Configuration**")
with gr.Row():
execution_type = gr.Dropdown(
label="Execution Type",
choices=["sequential", "parallel"],
value="sequential",
info="Sequential: phases run one after another; Parallel: phases run simultaneously"
)
require_all = gr.Checkbox(
label="Require All Phases",
value=True,
info="Must all phases succeed for the composed policy to succeed?"
)
carry_over = gr.Checkbox(
label="Carry Over Results",
value=True,
info="Pass results between phases? (Only applicable for sequential execution)"
)
# Phase management
gr.Markdown("**📋 Phase Management**")
gr.Markdown("Define each phase as a separate policy. Phases will inherit the composed policy's scope.")
# Phase policy definition (similar to single policy form)
with gr.Row():
phase_name = gr.Textbox(
label="Phase Name *",
placeholder="e.g., ReviewPhase, ApprovalPhase",
info="Unique name for this phase"
)
phase_type = gr.Dropdown(
label="Phase Policy Type",
choices=[
"MajorityPolicy",
"LeaderDrivenPolicy",
"AbsoluteMajorityPolicy",
"ConsensusPolicy",
"LazyConsensusPolicy",
"VotingPolicy"
],
value="MajorityPolicy",
info="Type of decision-making process for this phase"
)
with gr.Row():
# Participants for this phase
phase_participants = gr.Dropdown(
label="Phase Participants *",
choices=[], # Will be populated dynamically from defined participants
value=None,
allow_custom_value=False,
multiselect=True,
info="Select participants for this phase"
)
# Decision type for the phase
phase_decision_type = gr.Dropdown(
label="Phase Decision Type",
choices=["BooleanDecision", "StringList", "ElementList"],
value="BooleanDecision"
)
# Optional communication channel for this phase
with gr.Row():
phase_communication_channel = gr.Textbox(
label="Phase Communication Channel (Optional)",
placeholder="e.g., slack, email, github",
info="Medium used for this phase's communication"
)
with gr.Row():
# Options for StringList
phase_decision_options = gr.Textbox(
label="Phase Decision Options",
placeholder="e.g., option1, option2, option3 (for StringList)",
visible=False,
info="Comma-separated custom options for StringList decisions"
)
# Options for ElementList (Individual names)
phase_decision_options_element_list = gr.Dropdown(
label="Phase Decision Options",
choices=[],
value=None,
allow_custom_value=False,
multiselect=True,
visible=False,
interactive=True,
info="Select individual(s) for ElementList decisions"
)
# Parameters (for voting policies)
phase_voting_ratio = gr.Slider(
label="Phase Voting Ratio",
minimum=0.0,
maximum=1.0,
value=0.5,
step=0.1,
visible=True, # Start visible since MajorityPolicy is default for phases too
info="Required ratio of positive votes (1.0 = unanimous)"
)
# Phase-specific policy attributes (default/fallback)
with gr.Row():
phase_default_decision = gr.Dropdown(
label="Phase Default Decision",
choices=[],
value=None,
visible=False,
info="Default policy for LeaderDrivenPolicy phases"
)
phase_fallback_policy = gr.Dropdown(
label="Phase Fallback Policy",
choices=[],
value=None,
visible=False,
info="Fallback policy for Consensus/LazyConsensus phases"
)
# Phase Conditions Section
with gr.Group():
gr.Markdown("#### 📜 Phase Conditions (Optional)")
gr.Markdown("*Configure additional rules and constraints for this phase*")
# Add visual containment with padding following same structure
with gr.Row():
gr.HTML("") # Left spacer
with gr.Column(scale=100): # Main content area
with gr.Row():
phase_condition_type = gr.Dropdown(
label="Add Condition",
choices=[
"", # Empty option for no condition
"Deadline",
"MinDecisionTime",
"ParticipantExclusion",
"MinParticipants",
"VetoRight",
"CheckCiCd",
"MinTime",
"LabelCondition"
],
value="",
info="Select a condition type to add to this phase"
)
# VetoRight condition
with gr.Row():
phase_veto_participants = gr.Dropdown(
label="Veto Participants",
choices=[], # Will be populated from participants
value=None,
multiselect=True,
allow_custom_value=False,
visible=False,
info="Participants with veto power for this phase"
)
# ParticipantExclusion condition
with gr.Row():
phase_excluded_participants = gr.Dropdown(
label="Excluded Participants",
choices=[], # Will be populated from participants
value=None,
multiselect=True,
allow_custom_value=False,
visible=False,
info="Participants excluded from this phase decision"
)
# MinParticipants condition
with gr.Row():
phase_min_participants = gr.Number(
label="Minimum Participants",
value=None,
visible=False,
info="Minimum number of participants required for this phase (≥1)"
)
# Deadline condition fields
with gr.Row():
with gr.Row():
phase_deadline_offset_value = gr.Number(
label="Deadline Offset Value",
value=None,
visible=False,
info="Time offset number (≥1, e.g., 7 for '7 days')"
)
phase_deadline_offset_unit = gr.Dropdown(
label="Deadline Offset Unit",
choices=["days", "weeks", "months", "years"],
value="days",
visible=False,
info="Time unit for the offset"
)
phase_deadline_date = gr.Textbox(
label="Deadline Date (DD/MM/YYYY)",
placeholder="e.g., 25/12/2024",
visible=False,
info="Specific deadline date (optional, can be used with or without offset)"
)
# MinDecisionTime condition fields
with gr.Row():
with gr.Row():
phase_min_decision_offset_value = gr.Number(
label="Min Decision Time Offset Value",
value=None,
visible=False,
info="Minimum time offset number (≥1, e.g., 2 for '2 days')"
)
phase_min_decision_offset_unit = gr.Dropdown(
label="Min Decision Time Offset Unit",
choices=["days", "weeks", "months", "years"],
value="days",
visible=False,
info="Time unit for the minimum decision time"
)
phase_min_decision_date = gr.Textbox(
label="Min Decision Date (DD/MM/YYYY)",
placeholder="e.g., 01/01/2024",
visible=False,
info="Specific minimum decision date (optional, can be used with or without offset)"
)
# CheckCiCd condition fields
with gr.Row():
with gr.Row():
phase_check_ci_cd_evaluation_mode = gr.Dropdown(
label="Evaluation Mode",
choices=["", "pre", "post", "concurrent"],
value="",
visible=False,
info="When to evaluate CI/CD checks (pre/post/concurrent)"
)
phase_check_ci_cd_value = gr.Checkbox(
label="Check",
value=True,
visible=False,
info="Whether CI/CD checks must pass (true/false)"
)
# LabelCondition fields
with gr.Row():
with gr.Row():
phase_label_condition_type = gr.Dropdown(
label="Evaluation Mode",
choices=["", "pre", "post", "concurrent"],
value="",
visible=False,
info="When to check labels (pre/post/concurrent)"
)
phase_label_condition_operator = gr.Dropdown(
label="Label Operator",
choices=["", "not"],
value="",
visible=False,
info="Label condition operator (empty for required, 'not' for forbidden)"
)
phase_label_condition_labels = gr.Textbox(
label="Labels",
placeholder="e.g., lgtm, approved",
visible=False,
info="Comma-separated list of labels"
)
# Phase MinTime condition fields
with gr.Row():
with gr.Row():
phase_min_time_evaluation_mode = gr.Dropdown(
label="Evaluation Mode",
choices=["", "pre", "post", "concurrent"],
value="",
visible=False,
info="When to evaluate MinTime (pre/post/concurrent, optional)"
)
phase_min_time_activity = gr.Dropdown(
label="Activity Type",
choices=["Activity", "InActivity"],
value="Activity",
visible=False,
info="Check for activity or inactivity"
)
with gr.Row():
phase_min_time_offset_value = gr.Number(
label="MinTime Offset Value",
value=None,
visible=False,
info="Time offset number (≥1, e.g., 7 for '7 days')"
)
phase_min_time_offset_unit = gr.Dropdown(
label="MinTime Offset Unit",
choices=["days", "weeks", "months", "years"],
value="days",
visible=False,
info="Time unit for the MinTime offset"
)
# Phase condition management buttons
with gr.Row():
with gr.Row():
add_phase_condition_btn = gr.Button("➕ Add Condition", variant="secondary")
clear_phase_conditions_btn = gr.Button("🗑️ Clear All", variant="secondary")
# Added phase conditions display
with gr.Row():
added_phase_conditions_list = gr.Textbox(
label="Added Conditions",
value="",
interactive=False,
lines=2,
max_lines=10,
info="List of conditions added to this phase"
)
with gr.Row():
gr.HTML("") # Vertical spacer
gr.HTML("")
# Phase management buttons
with gr.Row():
add_phase_btn = gr.Button("➕ Add Phase", variant="secondary")
clear_phases_btn = gr.Button("🗑️ Clear All Phases", variant="secondary")
# Display added phases
added_phases_list = gr.Textbox(
label="Added Phases",
value="",
interactive=False,
lines=1,
max_lines=8,
info="Phases that will be executed in this composed policy"
)
# Composed policy actions
with gr.Row():
add_composed_policy_btn = gr.Button("➕ Add Composed Policy", variant="primary")
clear_composed_policies_btn = gr.Button("🗑️ Clear All", variant="secondary")
# Display added composed policies
composed_policies_display = gr.Textbox(
label="Added Composed Policies",
lines=6,
interactive=False,
placeholder="No composed policies added yet. Create composed policies to define complex multi-phase governance.",
info="Composed policies you've added will appear here"
)
# Hidden component to store composed policies data
composed_policies_data = gr.State([])
return {
'policy_name': policy_name,
'policy_type': policy_type,
'policy_scope': policy_scope,
'policy_participants': policy_participants,
'communication_channel': communication_channel,
'decision_type': decision_type,
'decision_options': decision_options,
'decision_options_element_list': decision_options_element_list,
'voting_ratio': voting_ratio,
'default_decision': default_decision,
'fallback_policy': fallback_policy,
'condition_type': condition_type,
'veto_participants': veto_participants,
'excluded_participants': excluded_participants,
'min_participants': min_participants,
'deadline_offset_value': deadline_offset_value,
'deadline_offset_unit': deadline_offset_unit,
'deadline_date': deadline_date,
'min_decision_offset_value': min_decision_offset_value,
'min_decision_offset_unit': min_decision_offset_unit,
'min_decision_date': min_decision_date,
'check_ci_cd_evaluation_mode': check_ci_cd_evaluation_mode,
'check_ci_cd_value': check_ci_cd_value,
'label_condition_type': label_condition_type,
'label_condition_operator': label_condition_operator,
'label_condition_labels': label_condition_labels,
'min_time_evaluation_mode': min_time_evaluation_mode,
'min_time_activity': min_time_activity,
'min_time_offset_value': min_time_offset_value,
'min_time_offset_unit': min_time_offset_unit,
'add_condition_btn': add_condition_btn,
'clear_conditions_btn': clear_conditions_btn,
'added_conditions_list': added_conditions_list,
'add_policy_btn': add_policy_btn,
'clear_policies_btn': clear_policies_btn,
'policies_display': policies_display,
'policies_data': policies_data,
# Composed Policy components
'composed_policy_name': composed_policy_name,
'composed_policy_scope': composed_policy_scope,
'execution_type': execution_type,
'require_all': require_all,
'carry_over': carry_over,
# Phase definition components
'phase_name': phase_name,
'phase_type': phase_type,
'phase_participants': phase_participants,
'phase_communication_channel': phase_communication_channel,
'phase_decision_type': phase_decision_type,
'phase_decision_options': phase_decision_options,
'phase_decision_options_element_list': phase_decision_options_element_list,
'phase_voting_ratio': phase_voting_ratio,
'phase_default_decision': phase_default_decision,
'phase_fallback_policy': phase_fallback_policy,
# Phase condition components
'phase_condition_type': phase_condition_type,
'phase_veto_participants': phase_veto_participants,
'phase_excluded_participants': phase_excluded_participants,
'phase_min_participants': phase_min_participants,
'phase_deadline_offset_value': phase_deadline_offset_value,
'phase_deadline_offset_unit': phase_deadline_offset_unit,
'phase_deadline_date': phase_deadline_date,
'phase_min_decision_offset_value': phase_min_decision_offset_value,
'phase_min_decision_offset_unit': phase_min_decision_offset_unit,
'phase_min_decision_date': phase_min_decision_date,
'phase_check_ci_cd_evaluation_mode': phase_check_ci_cd_evaluation_mode,
'phase_check_ci_cd_value': phase_check_ci_cd_value,
'phase_label_condition_type': phase_label_condition_type,
'phase_label_condition_operator': phase_label_condition_operator,
'phase_label_condition_labels': phase_label_condition_labels,
'phase_min_time_evaluation_mode': phase_min_time_evaluation_mode,
'phase_min_time_activity': phase_min_time_activity,
'phase_min_time_offset_value': phase_min_time_offset_value,
'phase_min_time_offset_unit': phase_min_time_offset_unit,
'add_phase_condition_btn': add_phase_condition_btn,
'clear_phase_conditions_btn': clear_phase_conditions_btn,
'added_phase_conditions_list': added_phase_conditions_list,
'add_phase_btn': add_phase_btn,
'clear_phases_btn': clear_phases_btn,
'added_phases_list': added_phases_list,
'add_composed_policy_btn': add_composed_policy_btn,
'clear_composed_policies_btn': clear_composed_policies_btn,
'composed_policies_display': composed_policies_display,
'composed_policies_data': composed_policies_data
}
def _create_preview_panel(self):
"""Create the live preview panel"""
gr.Markdown("## 👁️ Live Preview")
gr.Markdown("Generated DSL code will appear here as you fill the form.")
self.preview_code = gr.Textbox(
label="Generated DSL",
lines=25,
max_lines=25,
interactive=False,
elem_classes=["preview-panel"],
show_copy_button=True
)
# Action buttons
with gr.Row():
generate_btn = gr.Button("🔄 Refresh Preview", variant="secondary", visible=False)
download_btn = gr.Button(
value="✅ Prepare Policy for Download",
variant="primary"
)
load_example_btn = gr.Button("📋 Load Example", variant="secondary", visible=False)
# Download file component - will output the policy file for download
download_file = gr.File(label="Download Policy", visible=True)
return {
'preview_code': self.preview_code,
'generate_btn': generate_btn,
'download_btn': download_btn,
'load_example_btn': load_example_btn,
'download_file': download_file
}
def _setup_event_handlers(self, scope_components, participant_components, policy_components, preview_components, _form_state):
"""Set up event handlers for form interactions"""
# Helper function to validate date format (DD/MM/YYYY)
def validate_date_format(date_string):
"""Validate date string is in DD/MM/YYYY format. Returns (is_valid, error_message)"""
if not date_string or not date_string.strip():
return True, "" # Date is optional
import re
date_string = date_string.strip()
# Check format DD/MM/YYYY
pattern = r'^(\d{1,2})/(\d{1,2})/(\d{4})$'
match = re.match(pattern, date_string)
if not match:
return False, "Date must be in DD/MM/YYYY format (e.g., 25/12/2024)"
day, month, year = map(int, match.groups())
# Validate day
if day < 1 or day > 31:
return False, "Day must be between 1 and 31"
# Validate month
if month < 1 or month > 12:
return False, "Month must be between 1 and 12"
# Validate year (reasonable range)
if year < 1900 or year > 2100:
return False, "Year must be between 1900 and 2100"
return True, ""
def add_profile(name, gender, race, language, current_profiles):
"""Add a new profile to the list"""
if not name.strip():
display_text = self._format_profiles_display(current_profiles)
error_message = f"❌ Error: Please enter a profile name\n\n{display_text}" if current_profiles else "❌ Error: Please enter a profile name"
return current_profiles, error_message, "", "", "", "", None
# Check if profile already exists
if any(p['name'] == name.strip() for p in current_profiles):
display_text = self._format_profiles_display(current_profiles)
error_message = f"❌ Error: Profile name already exists\n\n{display_text}" if current_profiles else "❌ Error: Profile name already exists"
return current_profiles, error_message, "", "", "", "", None
# Validate that at least one attribute is provided
if not gender and not race and not language:
display_text = self._format_profiles_display(current_profiles)
error_message = f"❌ Error: Profiles must have at least one attribute (gender, race, or language)\n\n{display_text}" if current_profiles else "❌ Error: Profiles must have at least one attribute (gender, race, or language)"
return current_profiles, error_message, "", "", "", "", None
# Create new profile
new_profile = {
'name': name.strip(),
'gender': gender if gender else None,
'race': race if race else None,
'language': language if language else None
}
updated_profiles = current_profiles + [new_profile]
# Update display
display_text = self._format_profiles_display(updated_profiles)
success_message = f"✅ Profile '{new_profile['name']}' added successfully!\n\n{display_text}"
return updated_profiles, success_message, "", "", "", "", None # Clear individual profile selection
def clear_profiles():
"""Clear all profiles"""
return [], "No profiles added yet", None
def add_role(name, vote_value, current_roles, current_individuals, current_agents):
"""Add a new role to the list"""
if not name.strip():
display_text = self._format_roles_display(current_roles)
error_message = f"❌ Error: Please enter a role name\n\n{display_text}" if current_roles else "❌ Error: Please enter a role name"
return current_roles, error_message, "", None
# Validate vote_value is not negative if provided
if vote_value is not None and vote_value < 0.0:
display_text = self._format_roles_display(current_roles)
error_message = f"❌ Error: Vote value cannot be negative\n\n{display_text}" if current_roles else "❌ Error: Vote value cannot be negative"
return current_roles, error_message, "", None
# Check for global name uniqueness across all participant types
role_name = name.strip()
# Check against existing individuals
if any(i['name'] == role_name for i in current_individuals):
display_text = self._format_roles_display(current_roles)
error_message = f"❌ Error: Name '{role_name}' already exists as an individual\n\n{display_text}" if current_roles else f"❌ Error: Name '{role_name}' already exists as an individual"
return current_roles, error_message, "", None
# Check against existing agents
if any(a['name'] == role_name for a in current_agents):
display_text = self._format_roles_display(current_roles)
error_message = f"❌ Error: Name '{role_name}' already exists as an agent\n\n{display_text}" if current_roles else f"❌ Error: Name '{role_name}' already exists as an agent"
return current_roles, error_message, "", None
# Check against existing roles
if any(r['name'] == role_name for r in current_roles):
display_text = self._format_roles_display(current_roles)
error_message = f"❌ Error: Role name already exists\n\n{display_text}" if current_roles else "❌ Error: Role name already exists"
return current_roles, error_message, "", None
# Add new role with optional vote value
new_role = {
'name': role_name,
'vote_value': vote_value if vote_value != 1.0 else None # Only store if different from default 1.0
}
updated_roles = current_roles + [new_role]
display_text = self._format_roles_display(updated_roles)
success_message = f"✅ Role '{role_name}' added successfully!\n\n{display_text}"
return updated_roles, success_message, "", None
def clear_all_roles():
"""Clear all roles"""
return [], "No roles added yet. Create roles to define project responsibilities.", "", None
def add_individual(name, vote_value, profile, role, current_individuals, current_profiles, current_roles, current_agents):
"""Add a new individual to the list"""
if not name.strip():
return current_individuals, "❌ Please enter an individual name", "", 1.0, None, None
# Validate vote_value is not negative if provided
if vote_value is not None and vote_value < 0.0:
return current_individuals, "❌ Vote value cannot be negative", "", 1.0, None, None
# Check for global name uniqueness across all participant types
individual_name = name.strip()
# Check against existing roles
if any(r['name'] == individual_name for r in current_roles):
return current_individuals, f"❌ Name '{individual_name}' already exists as a role", "", 1.0, None, None
# Check against existing agents
if any(a['name'] == individual_name for a in current_agents):
return current_individuals, f"❌ Name '{individual_name}' already exists as an agent", "", 1.0, None, None
# Check against existing individuals
if any(i['name'] == individual_name for i in current_individuals):
return current_individuals, "❌ Individual name already exists", "", 1.0, None, None
# Create new individual
new_individual = {
'name': name.strip(),
'vote_value': vote_value if vote_value not in (None, 1.0) else None, # Only store if different from default 1.0
'profile': profile if profile else None,
'role': role if role else None
}
updated_individuals = current_individuals + [new_individual]
# Update display
display_text = self._format_individuals_display(updated_individuals)
success_message = f"✅ Individual '{new_individual['name']}' added successfully!\n\n{display_text}"
# Update dropdown choices for next individual
return updated_individuals, success_message, "", 1.0, None, None
def clear_individuals():
"""Clear all individuals"""
return [], "No individuals added yet"
def update_individual_dropdowns(current_roles, current_profiles):
"""Update individual dropdown choices when roles or profiles change"""
return (
gr.Dropdown(choices=[p['name'] for p in current_profiles] if current_profiles else [], value=None),
gr.Dropdown(choices=[r['name'] for r in current_roles] if current_roles else [], value=None)
)
def add_agent(name, vote_value, confidence, autonomy_level, explainability, role, current_agents, current_roles, current_individuals):
"""Add a new agent to the list"""
if not name.strip():
return current_agents, "❌ Please enter an agent name", "", 1.0, None, None, None, None
# Validate vote_value is not negative if provided
if vote_value is not None and vote_value < 0.0:
return current_agents, "❌ Vote value cannot be negative", "", 1.0, None, None, None, None
# Check for global name uniqueness across all participant types
agent_name = name.strip()
# Check against existing roles
if any(r['name'] == agent_name for r in current_roles):
return current_agents, f"❌ Name '{agent_name}' already exists as a role", "", 1.0, None, None, None, None
# Check against existing individuals
if any(i['name'] == agent_name for i in current_individuals):
return current_agents, f"❌ Name '{agent_name}' already exists as an individual", "", 1.0, None, None, None, None
# Check against existing agents
if any(a['name'] == agent_name for a in current_agents):
return current_agents, "❌ Agent name already exists", "", 1.0, None, None, None, None
# Create new agent
new_agent = {
'name': name.strip(),
'vote_value': vote_value if vote_value != 1.0 else None, # Only store if different from default 1.0
'confidence': confidence if confidence is not None else None,
'autonomy_level': autonomy_level if autonomy_level is not None else None,
'explainability': explainability if explainability is not None else None,
'role': role if role else None
}
updated_agents = current_agents + [new_agent]
# Update display
display_text = self._format_agents_display(updated_agents)
success_message = f"✅ Agent '{new_agent['name']}' added successfully!\n\n{display_text}"
return updated_agents, success_message, "", 1.0, None, None, None, None
def clear_agents():
"""Clear all agents"""
return [], "No agents added yet"
def update_agent_role_dropdown(current_roles):
"""Update agent role dropdown when roles change"""
return gr.Dropdown(choices=[r['name'] for r in current_roles] if current_roles else [], value=None)
# Scope event handlers
def add_project(name, platform, repo_owner, repo_name, current_projects, current_activities=None, current_tasks=None):
"""Add a new project to the list"""
if not name.strip():
display_text = self._format_projects_display(current_projects)
error_message = f"❌ Error: Please enter a project name\n\n{display_text}" if current_projects else "❌ Error: Please enter a project name"
return current_projects, error_message, "", "", "", ""
# Check for global name uniqueness across projects, activities, and tasks
project_name = name.strip()
# Check against existing projects
if any(p['name'] == project_name for p in current_projects):
display_text = self._format_projects_display(current_projects)
error_message = f"❌ Error: Project name already exists\n\n{display_text}" if current_projects else "❌ Error: Project name already exists"
return current_projects, error_message, "", "", "", ""
# Check against existing activities (if provided)
if current_activities and any(a['name'] == project_name for a in current_activities):
display_text = self._format_projects_display(current_projects)
error_message = f"❌ Error: Name '{project_name}' already exists as an activity\n\n{display_text}" if current_projects else f"❌ Error: Name '{project_name}' already exists as an activity"
return current_projects, error_message, "", "", "", ""
# Check against existing tasks (if provided)
if current_tasks and any(t['name'] == project_name for t in current_tasks):
display_text = self._format_projects_display(current_projects)
error_message = f"❌ Error: Name '{project_name}' already exists as a task\n\n{display_text}" if current_projects else f"❌ Error: Name '{project_name}' already exists as a task"
return current_projects, error_message, "", "", "", ""
# Validate platform and repository relationship
if platform and platform.strip():
if not repo_owner or not repo_owner.strip():
display_text = self._format_projects_display(current_projects)
error_message = f"❌ Error: Repository owner is required when a platform is selected\n\n{display_text}" if current_projects else "❌ Error: Repository owner is required when a platform is selected"
return current_projects, error_message, "", "", "", ""
if not repo_name or not repo_name.strip():
display_text = self._format_projects_display(current_projects)
error_message = f"❌ Error: Repository name is required when a platform is selected\n\n{display_text}" if current_projects else "❌ Error: Repository name is required when a platform is selected"
return current_projects, error_message, "", "", "", ""
# Check for duplicate platform-repository combination
full_repo = f"{repo_owner.strip()}/{repo_name.strip()}"
if any(p.get('platform') == platform.strip() and p.get('repo') == full_repo for p in current_projects):
display_text = self._format_projects_display(current_projects)
error_message = f"❌ Error: Repository {full_repo} already exists for {platform}\n\n{display_text}" if current_projects else f"❌ Error: Repository {full_repo} already exists for {platform}"
return current_projects, error_message, "", "", "", ""
# Create new project
new_project = {
'name': name.strip(),
'platform': platform.strip() if platform and platform.strip() else None,
'repo': f"{repo_owner.strip()}/{repo_name.strip()}" if repo_owner and repo_owner.strip() and repo_name and repo_name.strip() else None
}
updated_projects = current_projects + [new_project]
success_message = self._format_projects_display(updated_projects)
return updated_projects, success_message, "", "", "", ""
def clear_projects():
"""Clear all projects"""
return [], "No projects added yet. Create projects to define your organizational structure."
def update_repository_visibility(platform):
"""Show/hide repository fields based on platform selection"""
if platform and platform.strip():
return gr.Textbox(visible=True), gr.Textbox(visible=True)
else:
return gr.Textbox(value="", visible=False), gr.Textbox(value="", visible=False)
def add_activity(name, parent, current_activities, current_projects):
"""Add a new activity to the list"""
if not name.strip():
display_text = self._format_activities_display(current_activities)
error_message = f"❌ Error: Please enter an activity name\n\n{display_text}" if current_activities else "❌ Error: Please enter an activity name"
return current_activities, error_message, "", None
# Check for global name uniqueness across projects and activities
activity_name = name.strip()
# Check against existing projects
if any(p['name'] == activity_name for p in current_projects):
display_text = self._format_activities_display(current_activities)
error_message = f"❌ Error: Name '{activity_name}' already exists as a project\n\n{display_text}" if current_activities else f"❌ Error: Name '{activity_name}' already exists as a project"
return current_activities, error_message, "", None
# Check against existing activities
if any(a['name'] == activity_name for a in current_activities):
display_text = self._format_activities_display(current_activities)
error_message = f"❌ Error: Activity name already exists\n\n{display_text}" if current_activities else "❌ Error: Activity name already exists"
return current_activities, error_message, "", None
# Create new activity
new_activity = {
'name': activity_name,
'parent': parent if parent else None
}
updated_activities = current_activities + [new_activity]
success_message = self._format_activities_display(updated_activities)
return updated_activities, success_message, "", None
def clear_activities():
"""Clear all activities"""
return [], "No activities added yet. Create activities to organize your project workflow."
def update_activity_parent_dropdown(current_projects):
"""Update activity parent dropdown when projects change"""
project_choices = [p['name'] for p in current_projects]
return gr.Dropdown(choices=project_choices, value=None)
def add_task(name, parent, task_type, action, current_tasks, current_projects, current_activities):
"""Add a new task to the list"""
if not name.strip():
display_text = self._format_tasks_display(current_tasks)
error_message = f"❌ Error: Please enter a task name\n\n{display_text}" if current_tasks else "❌ Error: Please enter a task name"
return current_tasks, error_message, "", None, "", ""
# Check for global name uniqueness across projects, activities, and tasks
task_name = name.strip()
# Check against existing projects
if any(p['name'] == task_name for p in current_projects):
display_text = self._format_tasks_display(current_tasks)
error_message = f"❌ Error: Name '{task_name}' already exists as a project\n\n{display_text}" if current_tasks else f"❌ Error: Name '{task_name}' already exists as a project"
return current_tasks, error_message, "", None, "", ""
# Check against existing activities
if any(a['name'] == task_name for a in current_activities):
display_text = self._format_tasks_display(current_tasks)
error_message = f"❌ Error: Name '{task_name}' already exists as an activity\n\n{display_text}" if current_tasks else f"❌ Error: Name '{task_name}' already exists as an activity"
return current_tasks, error_message, "", None, "", ""
# Check against existing tasks
if any(t['name'] == task_name for t in current_tasks):
display_text = self._format_tasks_display(current_tasks)
error_message = f"❌ Error: Task name already exists\n\n{display_text}" if current_tasks else "❌ Error: Task name already exists"
return current_tasks, error_message, "", None, "", ""
# Validate parent is an activity (if provided)
if parent and parent.strip():
if not any(a['name'] == parent for a in current_activities):
display_text = self._format_tasks_display(current_tasks)
error_message = f"❌ Error: Parent '{parent}' is not a valid activity\n\n{display_text}" if current_tasks else f"❌ Error: Parent '{parent}' is not a valid activity"
return current_tasks, error_message, "", None, "", ""
# Create new task
new_task = {
'name': task_name,
'parent': parent if parent and parent.strip() else None,
'type': task_type if task_type and task_type.strip() else None,
'action': action if action and action.strip() else None
}
updated_tasks = current_tasks + [new_task]
success_message = self._format_tasks_display(updated_tasks)
return updated_tasks, success_message, "", None, "", ""
def clear_tasks():
"""Clear all tasks"""
return [], "No tasks added yet. Create tasks to define specific governance actions."
def update_task_action_dropdown(task_type):
"""Update task action dropdown based on task type"""
if task_type == "Pull request":
action_choices = ["", "merge", "review"]
return gr.Dropdown(choices=action_choices, value="", visible=True)
elif task_type == "MemberLifecycle":
action_choices = ["", "onboard", "remove"]
return gr.Dropdown(choices=action_choices, value="", visible=True)
else:
# Empty type - hide action dropdown
return gr.Dropdown(choices=[""], value="", visible=False)
def update_task_parent_dropdown(current_activities):
"""Update task parent dropdown when activities change (tasks can only have activities as parents)"""
parent_choices = [a['name'] for a in current_activities]
return gr.Dropdown(choices=parent_choices, value=None)
def update_scope_dropdown(projects_data, activities_data, tasks_data):
"""Update policy scope dropdown when scope definitions change"""
scope_choices = []
# Extract names from structured data
for project in projects_data:
if project['name']:
scope_choices.append(project['name'])
for activity in activities_data:
if activity['name']:
scope_choices.append(activity['name'])
for task in tasks_data:
if task['name']:
scope_choices.append(task['name'])
return gr.Dropdown(choices=sorted(list(set(scope_choices))), value=None)
def update_participant_dropdown(roles_data, individuals_data, agents_data):
"""Update policy participants dropdown when participant definitions change"""
participant_choices = self._extract_participant_names(roles_data, individuals_data, agents_data)
return (
gr.Dropdown(choices=participant_choices, value=None), # policy_participants
gr.Dropdown(choices=participant_choices, value=None), # veto_participants
gr.Dropdown(choices=participant_choices, value=None), # excluded_participants
gr.Dropdown(choices=participant_choices, value=None), # phase_participants
gr.Dropdown(choices=participant_choices, value=None), # phase_veto_participants
gr.Dropdown(choices=participant_choices, value=None) # phase_excluded_participants
)
def update_phase_participant_dropdown(roles_data, individuals_data, agents_data):
"""Update phase participants dropdown when participant definitions change"""
participant_choices = self._extract_participant_names(roles_data, individuals_data, agents_data)
return gr.Dropdown(choices=participant_choices, value=None)
def update_policy_reference_dropdowns(policies_data, current_scope=None):
"""Update default and fallback policy dropdowns when policies change or scope changes"""
if not policies_data or not current_scope:
# If no policies exist OR no scope is specified, show no policies
policy_choices = []
else:
# Filter policies to only show those with the same scope
policy_choices = [p['name'] for p in policies_data if p.get('scope') == current_scope]
return (
gr.Dropdown(choices=policy_choices), # default_decision
gr.Dropdown(choices=policy_choices) # fallback_policy
)
def update_policy_reference_dropdowns_on_scope_change(policies_data, current_scope=None):
"""Update and clear default and fallback policy dropdowns when scope changes"""
if not policies_data or not current_scope:
# If no policies exist OR no scope is specified, show no policies
policy_choices = []
else:
# Filter policies to only show those with the same scope
policy_choices = [p['name'] for p in policies_data if p.get('scope') == current_scope]
return (
gr.Dropdown(choices=policy_choices, value=None), # default_decision - clear selection
gr.Dropdown(choices=policy_choices, value=None) # fallback_policy - clear selection
)
def update_policy_type_visibility(policy_type):
"""Update visibility of policy-specific attributes based on policy type"""
show_default = policy_type == "LeaderDrivenPolicy"
show_fallback = policy_type in ["ConsensusPolicy", "LazyConsensusPolicy"]
show_voting_ratio = policy_type in ["MajorityPolicy", "AbsoluteMajorityPolicy", "VotingPolicy"]
return (
gr.Dropdown(visible=show_default), # default_decision
gr.Dropdown(visible=show_fallback), # fallback_policy
gr.Slider(visible=show_voting_ratio) # voting_ratio
)
def update_decision_options_visibility(decision_type):
"""Update visibility of decision options based on decision type"""
show_string_list = decision_type == "StringList"
show_element_list = decision_type == "ElementList"
return (
gr.update(visible=show_string_list),
gr.update(visible=show_element_list)
)
def update_condition_visibility(condition_type):
"""Update visibility of condition fields based on condition type"""
show_veto = condition_type == "VetoRight"
show_exclusion = condition_type == "ParticipantExclusion"
show_min_participants = condition_type == "MinParticipants"
show_deadline = condition_type == "Deadline"
show_min_decision = condition_type == "MinDecisionTime"
show_check_ci_cd = condition_type == "CheckCiCd"
show_label = condition_type == "LabelCondition"
show_min_time = condition_type == "MinTime"
return [
gr.Dropdown(visible=show_veto), # veto_participants
gr.Dropdown(visible=show_exclusion), # excluded_participants
gr.Number(visible=show_min_participants), # min_participants
gr.Number(visible=show_deadline), # deadline_offset_value
gr.Dropdown(visible=show_deadline), # deadline_offset_unit
gr.Textbox(visible=show_deadline), # deadline_date
gr.Number(visible=show_min_decision), # min_decision_offset_value
gr.Dropdown(visible=show_min_decision), # min_decision_offset_unit
gr.Textbox(visible=show_min_decision), # min_decision_date
gr.Dropdown(visible=show_check_ci_cd), # check_ci_cd_evaluation_mode
gr.Checkbox(visible=show_check_ci_cd), # check_ci_cd_value
gr.Dropdown(visible=show_label), # label_condition_type
gr.Dropdown(visible=show_label), # label_condition_operator
gr.Textbox(visible=show_label), # label_condition_labels
gr.Dropdown(visible=show_min_time), # min_time_evaluation_mode
gr.Dropdown(visible=show_min_time), # min_time_activity
gr.Number(visible=show_min_time), # min_time_offset_value
gr.Dropdown(visible=show_min_time) # min_time_offset_unit
]
def add_policy(name, policy_type, scope, participants, decision_type, decision_options, voting_ratio,
default_decision, fallback_policy, condition_type, veto_participants, excluded_participants,
min_participants, deadline_offset_value, deadline_offset_unit, deadline_date,
min_decision_offset_value, min_decision_offset_unit, min_decision_date,
label_condition_type, label_condition_operator, label_condition_labels,
communication_channel,
added_conditions_list, current_policies):
"""Add a new policy to the list"""
# Validate policy name (mandatory)
if not name.strip():
display_text = self._format_policies_display(current_policies)
error_message = f"❌ Error: Please enter a policy name\n\n{display_text}" if current_policies else "❌ Error: Please enter a policy name"
return current_policies, error_message
# Validate policy scope (mandatory)
if not scope:
display_text = self._format_policies_display(current_policies)
error_message = f"❌ Error: Please select a policy scope\n\n{display_text}" if current_policies else "❌ Error: Please select a policy scope"
return current_policies, error_message
# Validate policy participants (mandatory)
if not participants or (isinstance(participants, list) and len(participants) == 0):
display_text = self._format_policies_display(current_policies)
error_message = f"❌ Error: Please select at least one participant\n\n{display_text}" if current_policies else "❌ Error: Please select at least one participant"
return current_policies, error_message
# Check if policy already exists
if any(p['name'] == name.strip() for p in current_policies):
display_text = self._format_policies_display(current_policies)
error_message = f"❌ Error: Policy name '{name.strip()}' already exists\n\n{display_text}"
return current_policies, error_message
# Handle decision_options: convert list to string for ElementList, keep string for StringList
final_decision_options = None
if decision_type in ["StringList", "ElementList"] and decision_options:
if decision_type == "ElementList" and isinstance(decision_options, list):
# Convert list from multiselect dropdown to comma-separated string
final_decision_options = ",".join(decision_options) if decision_options else None
else:
# StringList: use as-is (already a string from textbox)
final_decision_options = decision_options
# Create new policy
new_policy = {
'name': name.strip(),
'type': policy_type,
'scope': scope if scope else None,
'participants': participants if participants else None,
'decision_type': decision_type,
'decision_options': final_decision_options,
'voting_ratio': voting_ratio if policy_type in ["MajorityPolicy", "AbsoluteMajorityPolicy", "VotingPolicy"] else None,
'default_decision': default_decision if policy_type == "LeaderDrivenPolicy" else None,
'fallback_policy': fallback_policy if policy_type in ["ConsensusPolicy", "LazyConsensusPolicy"] else None,
'communication_channel': communication_channel.strip() if communication_channel and communication_channel.strip() else None,
# Store the dynamically added conditions
'added_conditions': added_conditions_list.strip() if added_conditions_list else None,
# Keep backward compatibility with single condition fields for now
'condition_type': condition_type if condition_type else None,
'veto_participants': veto_participants if condition_type == "VetoRight" else None,
'excluded_participants': excluded_participants if condition_type == "ParticipantExclusion" else None,
'min_participants': min_participants if condition_type == "MinParticipants" else None,
'deadline_offset_value': deadline_offset_value if condition_type == "Deadline" else None,
'deadline_offset_unit': deadline_offset_unit if condition_type == "Deadline" else None,
'deadline_date': deadline_date if condition_type == "Deadline" else None,
'min_decision_offset_value': min_decision_offset_value if condition_type == "MinDecisionTime" else None,
'min_decision_offset_unit': min_decision_offset_unit if condition_type == "MinDecisionTime" else None,
'min_decision_date': min_decision_date if condition_type == "MinDecisionTime" else None,
'label_condition_type': label_condition_type if condition_type == "LabelCondition" else None,
'label_condition_operator': label_condition_operator if condition_type == "LabelCondition" else None,
'label_condition_labels': label_condition_labels if condition_type == "LabelCondition" else None
}
updated_policies = current_policies + [new_policy]
# Update display
display_text = self._format_policies_display(updated_policies)
success_message = f"✅ Policy '{new_policy['name']}' added successfully!\n\n{display_text}"
# For successful addition, return updated policies and success message
return updated_policies, success_message
def clear_policies():
# Clear all policies
return [], "No policies added yet"
def add_condition(condition_type, veto_participants, excluded_participants, min_participants,
deadline_offset_value, deadline_offset_unit, deadline_date,
min_decision_offset_value, min_decision_offset_unit, min_decision_date,
check_ci_cd_evaluation_mode, check_ci_cd_value,
label_condition_type, label_condition_operator, label_condition_labels,
min_time_evaluation_mode, min_time_activity, min_time_offset_value, min_time_offset_unit,
current_conditions_text):
"""Add a condition to the current policy"""
# Parse existing conditions from the display text (ignore error messages and success messages)
existing_conditions = []
if current_conditions_text.strip():
for line in current_conditions_text.strip().split('\n'):
line = line.strip()
# Skip error messages, success messages, and empty lines
if line and not line.startswith('❌ Error:') and not line.startswith('✅'):
existing_conditions.append(line)
# Helper function to format the conditions display with error
def format_conditions_with_error(error_msg):
if existing_conditions:
conditions_text = '\n'.join(existing_conditions)
return f"❌ Error: {error_msg}\n\n{conditions_text}"
else:
return f"❌ Error: {error_msg}"
# Helper function to return error with all condition fields reset
def return_error(error_msg):
return (
format_conditions_with_error(error_msg), # added_conditions_list
gr.Dropdown(value=""), # condition_type
gr.Dropdown(value=None), # veto_participants
gr.Dropdown(value=None), # excluded_participants
None, # min_participants
None, # deadline_offset_value
"days", # deadline_offset_unit
"", # deadline_date
None, # min_decision_offset_value
"days", # min_decision_offset_unit
"", # min_decision_date
"", # check_ci_cd_evaluation_mode
True, # check_ci_cd_value
"", # label_condition_type
"", # label_condition_operator
"", # label_condition_labels
"", # min_time_evaluation_mode
"Activity", # min_time_activity
None, # min_time_offset_value
"days" # min_time_offset_unit
)
if not condition_type:
return return_error("Please select a condition type")
# Check for duplicates (except LabelCondition)
existing_types = []
for condition in existing_conditions:
if condition.startswith('Deadline'):
existing_types.append('Deadline')
elif condition.startswith('MinDecisionTime'):
existing_types.append('MinDecisionTime')
elif condition.startswith('ParticipantExclusion'):
existing_types.append('ParticipantExclusion')
elif condition.startswith('MinParticipants'):
existing_types.append('MinParticipants')
elif condition.startswith('VetoRight'):
existing_types.append('VetoRight')
elif condition.startswith('MinTime'):
existing_types.append('MinTime')
elif condition.startswith('CheckCiCd'):
existing_types.append('CheckCiCd')
# LabelCondition can have multiple entries, so we don't track it
if condition_type != "LabelCondition" and condition_type in existing_types:
return return_error(f"{condition_type} already exists. Only LabelCondition can be added multiple times.")
# Create the condition string
condition_str = ""
if condition_type == "VetoRight":
if not veto_participants:
return return_error("Please specify veto participants")
# veto_participants is already a list from multiselect dropdown
participants_list = [p.strip() for p in veto_participants if p.strip()] if isinstance(veto_participants, list) else [veto_participants.strip()]
condition_str = f"VetoRight: {', '.join(participants_list)}"
elif condition_type == "ParticipantExclusion":
if not excluded_participants:
return return_error("Please specify excluded participants")
# excluded_participants is already a list from multiselect dropdown
participants_list = [p.strip() for p in excluded_participants if p.strip()] if isinstance(excluded_participants, list) else [excluded_participants.strip()]
condition_str = f"ParticipantExclusion: {', '.join(participants_list)}"
elif condition_type == "MinParticipants":
if not min_participants or min_participants < 1:
return return_error("Please specify minimum participants (≥1)")
condition_str = f"MinParticipants: {min_participants}"
elif condition_type == "Deadline":
parts = []
if deadline_offset_value and deadline_offset_unit:
if deadline_offset_value < 1:
return return_error("Deadline offset value must be ≥1")
parts.append(f"{deadline_offset_value} {deadline_offset_unit}")
if deadline_date and deadline_date.strip():
is_valid, error_msg = validate_date_format(deadline_date)
if not is_valid:
return return_error(f"Deadline date error: {error_msg}")
parts.append(deadline_date.strip())
if not parts:
return return_error("Please specify deadline offset or date")
condition_str = f"Deadline: {', '.join(parts)}"
elif condition_type == "MinDecisionTime":
parts = []
if min_decision_offset_value and min_decision_offset_unit:
if min_decision_offset_value < 1:
return return_error("Minimum decision time offset value must be ≥1")
parts.append(f"{min_decision_offset_value} {min_decision_offset_unit}")
if min_decision_date and min_decision_date.strip():
is_valid, error_msg = validate_date_format(min_decision_date)
if not is_valid:
return return_error(f"Minimum decision time date error: {error_msg}")
parts.append(min_decision_date.strip())
if not parts:
return return_error("Please specify minimum decision time offset or date")
condition_str = f"MinDecisionTime: {', '.join(parts)}"
elif condition_type == "CheckCiCd":
# CheckCiCd: evaluation_mode (optional) and boolean value (true/false)
evaluation_part = f"{check_ci_cd_evaluation_mode} " if check_ci_cd_evaluation_mode and check_ci_cd_evaluation_mode.strip() else ""
check_value = "true" if check_ci_cd_value else "false"
condition_str = f"CheckCiCd {evaluation_part}: {check_value}"
elif condition_type == "MinTime":
# MinTime: evaluation_mode (optional), activity (required), offset value and unit (required)
if not min_time_activity or not min_time_activity.strip():
return return_error("Please specify activity type (Activity/InActivity)")
if not min_time_offset_value or min_time_offset_value < 1:
return return_error("Please specify MinTime offset value (≥1)")
if not min_time_offset_unit:
return return_error("Please specify MinTime offset unit")
# Build with optional evaluation mode
evaluation_part = f"{min_time_evaluation_mode} " if min_time_evaluation_mode and min_time_evaluation_mode.strip() else ""
condition_str = f"MinTime {evaluation_part}of {min_time_activity}: {min_time_offset_value} {min_time_offset_unit}".rstrip()
elif condition_type == "LabelCondition":
if not label_condition_labels:
return return_error("Please specify labels")
labels = [l.strip() for l in label_condition_labels.split(',') if l.strip()]
if not labels:
return return_error("Please specify valid labels")
# Build the condition string with optional evaluation mode
evaluation_part = f"{label_condition_type} " if label_condition_type and label_condition_type.strip() else ""
operator_part = f"{label_condition_operator} " if label_condition_operator else ""
condition_str = f"LabelCondition {evaluation_part}{operator_part}: {', '.join(labels)}".rstrip()
if condition_str:
# Add to existing conditions and format success message
updated_conditions = existing_conditions + [condition_str]
conditions_text = '\n'.join(updated_conditions)
success_message = f"✅ {condition_type} condition added successfully!\n\n{conditions_text}"
return (
success_message, # added_conditions_list
gr.Dropdown(value=""), # condition_type
gr.Dropdown(value=None), # veto_participants
gr.Dropdown(value=None), # excluded_participants
None, # min_participants
None, # deadline_offset_value
"days", # deadline_offset_unit
"", # deadline_date
None, # min_decision_offset_value
"days", # min_decision_offset_unit
"", # min_decision_date
"", # check_ci_cd_evaluation_mode
True, # check_ci_cd_value
"", # label_condition_type
"", # label_condition_operator
"", # label_condition_labels
"", # min_time_evaluation_mode
"Activity", # min_time_activity
None, # min_time_offset_value
"days" # min_time_offset_unit
)
return return_error("Unable to add condition")
def clear_conditions():
"""Clear all conditions for the current policy"""
return ""
def add_phase_condition(phase_condition_type, phase_veto_participants, phase_excluded_participants,
phase_min_participants, phase_deadline_offset_value, phase_deadline_offset_unit,
phase_deadline_date, phase_min_decision_offset_value, phase_min_decision_offset_unit,
phase_min_decision_date, phase_check_ci_cd_evaluation_mode, phase_check_ci_cd_value,
phase_label_condition_type, phase_label_condition_operator,
phase_label_condition_labels, phase_min_time_evaluation_mode, phase_min_time_activity,
phase_min_time_offset_value, phase_min_time_offset_unit, current_phase_conditions_text):
"""Add a condition to the current phase"""
# Parse existing conditions from the display text (ignore error messages and success messages)
existing_conditions = []
if current_phase_conditions_text.strip():
for line in current_phase_conditions_text.strip().split('\n'):
line = line.strip()
# Skip error messages, success messages, and empty lines
if line and not line.startswith('❌ Error:') and not line.startswith('✅'):
existing_conditions.append(line)
# Helper function to format the conditions display with error
def format_conditions_with_error(error_msg):
if existing_conditions:
conditions_text = '\n'.join(existing_conditions)
return f"❌ Error: {error_msg}\n\n{conditions_text}"
else:
return f"❌ Error: {error_msg}"
# Helper function to return error with all condition fields reset
def return_error(error_msg):
return (
format_conditions_with_error(error_msg), # added_phase_conditions_list
gr.Dropdown(value=""), # phase_condition_type
gr.Dropdown(value=None), # phase_veto_participants
gr.Dropdown(value=None), # phase_excluded_participants
None, # phase_min_participants
None, # phase_deadline_offset_value
"days", # phase_deadline_offset_unit
"", # phase_deadline_date
None, # phase_min_decision_offset_value
"days", # phase_min_decision_offset_unit
"", # phase_min_decision_date
"", # phase_check_ci_cd_evaluation_mode
True, # phase_check_ci_cd_value
"", # phase_label_condition_type
"", # phase_label_condition_operator
"", # phase_label_condition_labels
"", # phase_min_time_evaluation_mode
"Activity", # phase_min_time_activity
None, # phase_min_time_offset_value
"days" # phase_min_time_offset_unit
)
if not phase_condition_type:
return return_error("Please select a condition type")
# Check for duplicates (except LabelCondition)
existing_types = []
for condition in existing_conditions:
if condition.startswith('Deadline'):
existing_types.append('Deadline')
elif condition.startswith('MinDecisionTime'):
existing_types.append('MinDecisionTime')
elif condition.startswith('ParticipantExclusion'):
existing_types.append('ParticipantExclusion')
elif condition.startswith('MinParticipants'):
existing_types.append('MinParticipants')
elif condition.startswith('VetoRight'):
existing_types.append('VetoRight')
elif condition.startswith('MinTime'):
existing_types.append('MinTime')
elif condition.startswith('CheckCiCd'):
existing_types.append('CheckCiCd')
# LabelCondition can have multiple entries, so we don't track it
if phase_condition_type != "LabelCondition" and phase_condition_type in existing_types:
return return_error(f"{phase_condition_type} already exists. Only LabelCondition can be added multiple times.")
# Create the condition string
condition_str = ""
if phase_condition_type == "VetoRight":
if not phase_veto_participants:
return return_error("Please specify veto participants")
participants_list = [p.strip() for p in phase_veto_participants if p.strip()] if isinstance(phase_veto_participants, list) else [phase_veto_participants.strip()]
condition_str = f"VetoRight: {', '.join(participants_list)}"
elif phase_condition_type == "ParticipantExclusion":
if not phase_excluded_participants:
return return_error("Please specify excluded participants")
participants_list = [p.strip() for p in phase_excluded_participants if p.strip()] if isinstance(phase_excluded_participants, list) else [phase_excluded_participants.strip()]
condition_str = f"ParticipantExclusion: {', '.join(participants_list)}"
elif phase_condition_type == "MinParticipants":
if not phase_min_participants or phase_min_participants < 1:
return return_error("Please specify minimum participants (≥1)")
condition_str = f"MinParticipants: {phase_min_participants}"
elif phase_condition_type == "Deadline":
parts = []
if phase_deadline_offset_value and phase_deadline_offset_unit:
if phase_deadline_offset_value < 1:
return return_error("Deadline offset value must be ≥1")
parts.append(f"{phase_deadline_offset_value} {phase_deadline_offset_unit}")
if phase_deadline_date and phase_deadline_date.strip():
is_valid, error_msg = validate_date_format(phase_deadline_date)
if not is_valid:
return return_error(f"Deadline date error: {error_msg}")
parts.append(phase_deadline_date.strip())
if not parts:
return return_error("Please specify deadline offset or date")
condition_str = f"Deadline: {', '.join(parts)}"
elif phase_condition_type == "MinDecisionTime":
parts = []
if phase_min_decision_offset_value and phase_min_decision_offset_unit:
if phase_min_decision_offset_value < 1:
return return_error("Minimum decision time offset value must be ≥1")
parts.append(f"{phase_min_decision_offset_value} {phase_min_decision_offset_unit}")
if phase_min_decision_date and phase_min_decision_date.strip():
is_valid, error_msg = validate_date_format(phase_min_decision_date)
if not is_valid:
return return_error(f"Minimum decision time date error: {error_msg}")
parts.append(phase_min_decision_date.strip())
if not parts:
return return_error("Please specify minimum decision time offset or date")
condition_str = f"MinDecisionTime: {', '.join(parts)}"
elif phase_condition_type == "CheckCiCd":
# CheckCiCd: evaluation_mode (optional) and boolean value (true/false)
evaluation_part = f"{phase_check_ci_cd_evaluation_mode} " if phase_check_ci_cd_evaluation_mode and phase_check_ci_cd_evaluation_mode.strip() else ""
check_value = "true" if phase_check_ci_cd_value else "false"
condition_str = f"CheckCiCd {evaluation_part}: {check_value}"
elif phase_condition_type == "MinTime":
# MinTime: evaluation_mode (optional), activity (required), offset value and unit (required)
if not phase_min_time_activity or not phase_min_time_activity.strip():
return return_error("Please specify activity type (Activity/InActivity)")
if not phase_min_time_offset_value or phase_min_time_offset_value < 1:
return return_error("Please specify MinTime offset value (≥1)")
if not phase_min_time_offset_unit:
return return_error("Please specify MinTime offset unit")
# Build with optional evaluation mode
evaluation_part = f"{phase_min_time_evaluation_mode} " if phase_min_time_evaluation_mode and phase_min_time_evaluation_mode.strip() else ""
condition_str = f"MinTime {evaluation_part}of {phase_min_time_activity}: {phase_min_time_offset_value} {phase_min_time_offset_unit}".rstrip()
elif phase_condition_type == "LabelCondition":
if not phase_label_condition_labels:
return return_error("Please specify labels")
labels = [l.strip() for l in phase_label_condition_labels.split(',') if l.strip()]
if not labels:
return return_error("Please specify valid labels")
# Build the condition string with optional evaluation mode
evaluation_part = f"{phase_label_condition_type} " if phase_label_condition_type and phase_label_condition_type.strip() else ""
operator_part = f"{phase_label_condition_operator} " if phase_label_condition_operator else ""
condition_str = f"LabelCondition {evaluation_part}{operator_part}: {', '.join(labels)}".rstrip()
if condition_str:
# Add to existing conditions and format success message
updated_conditions = existing_conditions + [condition_str]
conditions_text = '\n'.join(updated_conditions)
success_message = f"✅ {phase_condition_type} condition added successfully!\n\n{conditions_text}"
return (
success_message, # added_phase_conditions_list
gr.Dropdown(value=""), # phase_condition_type
gr.Dropdown(value=None), # phase_veto_participants
gr.Dropdown(value=None), # phase_excluded_participants
None, # phase_min_participants
None, # phase_deadline_offset_value
"days", # phase_deadline_offset_unit
"", # phase_deadline_date
None, # phase_min_decision_offset_value
"days", # phase_min_decision_offset_unit
"", # phase_min_decision_date
"", # phase_check_ci_cd_evaluation_mode
True, # phase_check_ci_cd_value
"", # phase_label_condition_type
"", # phase_label_condition_operator
"", # phase_label_condition_labels
"", # phase_min_time_evaluation_mode
"Activity", # phase_min_time_activity
None, # phase_min_time_offset_value
"days" # phase_min_time_offset_unit
)
return return_error("Unable to add condition")
def clear_phase_conditions():
"""Clear all conditions for the current phase"""
return ""
def update_phase_condition_visibility(phase_condition_type):
"""Update visibility of phase condition fields based on condition type"""
show_veto = phase_condition_type == "VetoRight"
show_exclusion = phase_condition_type == "ParticipantExclusion"
show_min_participants = phase_condition_type == "MinParticipants"
show_deadline = phase_condition_type == "Deadline"
show_min_decision = phase_condition_type == "MinDecisionTime"
show_check_ci_cd = phase_condition_type == "CheckCiCd"
show_label = phase_condition_type == "LabelCondition"
show_min_time = phase_condition_type == "MinTime"
return [
gr.Dropdown(visible=show_veto), # phase_veto_participants
gr.Dropdown(visible=show_exclusion), # phase_excluded_participants
gr.Number(visible=show_min_participants), # phase_min_participants
gr.Number(visible=show_deadline), # phase_deadline_offset_value
gr.Dropdown(visible=show_deadline), # phase_deadline_offset_unit
gr.Textbox(visible=show_deadline), # phase_deadline_date
gr.Number(visible=show_min_decision), # phase_min_decision_offset_value
gr.Dropdown(visible=show_min_decision), # phase_min_decision_offset_unit
gr.Textbox(visible=show_min_decision), # phase_min_decision_date
gr.Dropdown(visible=show_check_ci_cd), # phase_check_ci_cd_evaluation_mode
gr.Checkbox(visible=show_check_ci_cd), # phase_check_ci_cd_value
gr.Dropdown(visible=show_label), # phase_label_condition_type
gr.Dropdown(visible=show_label), # phase_label_condition_operator
gr.Textbox(visible=show_label), # phase_label_condition_labels
gr.Dropdown(visible=show_min_time), # phase_min_time_evaluation_mode
gr.Dropdown(visible=show_min_time), # phase_min_time_activity
gr.Number(visible=show_min_time), # phase_min_time_offset_value
gr.Dropdown(visible=show_min_time) # phase_min_time_offset_unit
]
# Composed Policy Functions
def add_phase(phase_name, phase_type, phase_participants, phase_decision_type, phase_decision_options,
phase_voting_ratio, phase_default_decision, phase_fallback_policy, phase_communication_channel,
current_phases_text, added_phase_conditions_text):
"""Add a phase to the current composed policy"""
# Parse existing phases from the display text (ignore error messages and success messages)
existing_phases = []
if current_phases_text.strip():
for line in current_phases_text.strip().split('\n'):
line = line.strip()
# Skip error messages, success messages, and empty lines
if line and not line.startswith('❌ Error:') and not line.startswith('✅'):
existing_phases.append(line)
# Helper function to format the phases display with error
def format_phases_with_error(error_msg):
if existing_phases:
phases_text = '\n'.join(existing_phases)
return f"❌ Error: {error_msg}\n\n{phases_text}"
else:
return f"❌ Error: {error_msg}"
# Helper function to return error with all phase fields reset
def return_phase_error(error_msg):
return (
format_phases_with_error(error_msg), # added_phases_list
"", # phase_name
"MajorityPolicy", # phase_type
[], # phase_participants - use empty list
"BooleanDecision", # phase_decision_type
"", # phase_decision_options
1.0, # phase_voting_ratio
None, # phase_default_decision
None, # phase_fallback_policy
"", # phase_communication_channel
"" # added_phase_conditions_text - clear conditions
)
# Validate phase name (mandatory)
if not phase_name.strip():
return return_phase_error("Please enter a phase name")
# Check for duplicates
phase_names = [line.split(' (')[0] for line in existing_phases] # Extract names from display format
if phase_name.strip() in phase_names:
return return_phase_error(f"Phase '{phase_name.strip()}' already exists")
# Validate phase participants (mandatory)
if not phase_participants or (isinstance(phase_participants, list) and len(phase_participants) == 0):
return return_phase_error("Please select at least one participant for this phase")
# Create phase display string
participants_list = phase_participants if isinstance(phase_participants, list) else [phase_participants]
phase_display = f"{phase_name.strip()} ({phase_type}) → participants: {', '.join(participants_list)}"
# Add type-specific details
details = []
if phase_decision_type != "BooleanDecision":
details.append(f"decision: {phase_decision_type}")
# Show voting ratio for voting-related policies (always show, not just when != 1.0)
if phase_type in ["MajorityPolicy", "AbsoluteMajorityPolicy", "VotingPolicy"]:
details.append(f"ratio: {phase_voting_ratio}")
# Show default decision only for LeaderDrivenPolicy
if phase_type == "LeaderDrivenPolicy" and phase_default_decision:
details.append(f"default: {phase_default_decision}")
# Show fallback policy only for ConsensusPolicy and LazyConsensusPolicy
if phase_type in ["ConsensusPolicy", "LazyConsensusPolicy"] and phase_fallback_policy:
details.append(f"fallback: {phase_fallback_policy}")
# Show decision options if specified and not BooleanDecision
if phase_decision_options and phase_decision_type != "BooleanDecision":
# Handle both string (StringList) and list (ElementList) inputs
if isinstance(phase_decision_options, list):
options_clean = ",".join(phase_decision_options) if phase_decision_options else ""
else:
options_clean = phase_decision_options.strip() if isinstance(phase_decision_options, str) else str(phase_decision_options)
if options_clean: # Only add if not empty after processing
details.append(f"options: {options_clean}")
# Optional communication channel detail
if phase_communication_channel and phase_communication_channel.strip():
details.append(f"channel: {phase_communication_channel.strip()}")
if details:
phase_display += f", {', '.join(details)}"
# Append conditions to the phase display if any
if added_phase_conditions_text and added_phase_conditions_text.strip():
# Filter out error/success messages from conditions
clean_conditions = []
for line in added_phase_conditions_text.strip().split('\n'):
line = line.strip()
if line and not line.startswith('❌') and not line.startswith('✅'):
clean_conditions.append(line)
if clean_conditions:
phase_display += f" [Conditions: {', '.join(clean_conditions)}]"
# Add to existing phases and format success message
updated_phases = existing_phases + [phase_display]
phases_text = '\n'.join(updated_phases)
success_message = f"✅ Phase '{phase_name.strip()}' added successfully!\n\n{phases_text}"
return (
success_message, # added_phases_list
"", # phase_name - clear
"MajorityPolicy", # phase_type - reset
[], # phase_participants - use empty list
"BooleanDecision", # phase_decision_type - reset
"", # phase_decision_options - clear
1.0, # phase_voting_ratio - reset
None, # phase_default_decision - clear
None, # phase_fallback_policy - clear
"", # phase_communication_channel - clear
"" # added_phase_conditions_text - clear conditions
)
def clear_phases():
"""Clear all phases for the current composed policy"""
return ""
def add_composed_policy(name, scope, execution_type, require_all, carry_over, phases_text, current_composed_policies):
"""Add a new composed policy to the list"""
# Validate composed policy name (mandatory)
if not name.strip():
display_text = self._format_composed_policies_display(current_composed_policies)
error_message = f"❌ Error: Please enter a composed policy name\n\n{display_text}" if current_composed_policies else "❌ Error: Please enter a composed policy name"
return current_composed_policies, error_message
# Validate composed policy scope (mandatory)
if not scope:
display_text = self._format_composed_policies_display(current_composed_policies)
error_message = f"❌ Error: Please select a composed policy scope\n\n{display_text}" if current_composed_policies else "❌ Error: Please select a composed policy scope"
return current_composed_policies, error_message
# Validate phases (at least one phase required)
phases = []
if phases_text.strip():
for line in phases_text.strip().split('\n'):
line = line.strip()
# Skip error messages, success messages, and empty lines
if line and not line.startswith('❌ Error:') and not line.startswith('✅'):
phases.append(line)
if not phases:
display_text = self._format_composed_policies_display(current_composed_policies)
error_message = f"❌ Error: Please add at least one phase\n\n{display_text}" if current_composed_policies else "❌ Error: Please add at least one phase"
return current_composed_policies, error_message
# Check if composed policy already exists
if any(p['name'] == name.strip() for p in current_composed_policies):
display_text = self._format_composed_policies_display(current_composed_policies)
error_message = f"❌ Error: Composed policy name '{name.strip()}' already exists\n\n{display_text}"
return current_composed_policies, error_message
# Create new composed policy
new_composed_policy = {
'type': 'ComposedPolicy',
'name': name.strip(),
'scope': scope,
'execution_type': execution_type,
'require_all': require_all,
'carry_over': carry_over,
'phases': phases
}
# Add to the list
updated_composed_policies = current_composed_policies + [new_composed_policy]
display_text = self._format_composed_policies_display(updated_composed_policies)
success_message = f"✅ Composed policy '{name.strip()}' added successfully!\n\n{display_text}"
return updated_composed_policies, success_message
def clear_composed_policies():
"""Clear all composed policies"""
return [], "No composed policies added yet"
def add_composed_policy_and_clear_form(name, scope, execution_type, require_all, carry_over, phases_text, current_composed_policies):
"""Add composed policy and clear form fields only on success"""
# First try to add the composed policy
policies_result, display_result = add_composed_policy(
name, scope, execution_type, require_all, carry_over, phases_text, current_composed_policies
)
# Check if the addition was successful (no error message)
if "❌ Error:" not in display_result:
# Success: clear the form fields
return (
policies_result, display_result, # composed_policies_data, composed_policies_display
"", # composed_policy_name - clear
None, # composed_policy_scope - clear
"sequential", # execution_type - reset to default
True, # require_all - reset to default
True, # carry_over - reset to default
"" # added_phases_list - clear
)
else:
# Error: keep form as is, only update composed policies display
return (
policies_result, display_result, # composed_policies_data, composed_policies_display
name, scope, execution_type, require_all, carry_over, # Keep form values
phases_text # Keep phases
)
def add_policy_and_clear_form(name, policy_type, scope, participants, decision_type, decision_options, voting_ratio,
default_decision, fallback_policy, condition_type, veto_participants, excluded_participants,
min_participants, deadline_offset_value, deadline_offset_unit, deadline_date,
min_decision_offset_value, min_decision_offset_unit, min_decision_date,
label_condition_type, label_condition_operator, label_condition_labels,
communication_channel,
added_conditions_list, current_policies):
"""Add policy and clear form fields only on success"""
# First try to add the policy
policies_result, display_result = add_policy(
name, policy_type, scope, participants, decision_type, decision_options, voting_ratio,
default_decision, fallback_policy, condition_type, veto_participants, excluded_participants,
min_participants, deadline_offset_value, deadline_offset_unit, deadline_date,
min_decision_offset_value, min_decision_offset_unit, min_decision_date,
label_condition_type, label_condition_operator, label_condition_labels,
communication_channel,
added_conditions_list, current_policies
)
# Check if the addition was successful (no error message)
if "❌ Error:" not in display_result:
# Success: clear the form and update dropdowns
# Since we're clearing the form (scope will be None), show all policies
fallback_choices = [p['name'] for p in policies_result]
return (
policies_result, display_result, # policies_data, policies_display
"", "MajorityPolicy", None, [], "BooleanDecision", "", 1.0, # Clear form fields - use empty list for participants
gr.Dropdown(choices=fallback_choices, visible=False), # default_decision
gr.Dropdown(choices=fallback_choices, visible=False), # fallback_policy
"", [], [], None, None, "days", "", None, "days", "", "pre", "", "", "", # Clear condition fields - use empty lists for participant dropdowns
"" # communication_channel clear
)
else:
# Error: keep form as is, only update policies display
return (
policies_result, display_result, # policies_data, policies_display
name, policy_type, scope, participants, decision_type, decision_options, voting_ratio, # Keep form values
default_decision, fallback_policy, # Keep current values
condition_type, veto_participants, excluded_participants, min_participants, # Keep basic condition values
deadline_offset_value, deadline_offset_unit, deadline_date, # Keep deadline values
min_decision_offset_value, min_decision_offset_unit, min_decision_date, # Keep min decision values
label_condition_type, label_condition_operator, label_condition_labels, # Keep label condition values
added_conditions_list, # Keep added conditions list
communication_channel # keep
)
def update_preview(*args):
"""Update the preview based on current form values"""
return self._generate_dsl_from_form(*args)
# removed unused local load_example (can be reintroduced and wired when enabling example loading)
# Scope management handlers - Project-specific helpers
def add_project_and_clear_form(name, platform, repo_owner, repo_name, current_projects, current_activities, current_tasks):
"""Add project and return cleared form with proper repository visibility"""
# First add the project
updated_projects, message, clear_name, clear_platform, _, _ = add_project(
name, platform, repo_owner, repo_name, current_projects, current_activities, current_tasks
)
# After clearing, repository fields should be hidden and empty since platform is cleared
hidden_repo_owner = gr.Textbox(value="", visible=False)
hidden_repo_name = gr.Textbox(value="", visible=False)
return updated_projects, message, clear_name, clear_platform, hidden_repo_owner, hidden_repo_name
scope_components['add_project_btn'].click(
fn=add_project_and_clear_form,
inputs=[
scope_components['project_name'],
scope_components['project_platform'],
scope_components['project_repo_owner'],
scope_components['project_repo_name'],
scope_components['projects_data'],
scope_components['activities_data'],
scope_components['tasks_data']
],
outputs=[
scope_components['projects_data'],
scope_components['projects_display'],
scope_components['project_name'], # Clear name field
scope_components['project_platform'], # Clear platform field
scope_components['project_repo_owner'], # Clear and hide repo owner field
scope_components['project_repo_name'] # Clear and hide repo name field
]
)
def clear_projects_and_reset_form():
"""Clear all projects and reset form with proper repository visibility"""
projects_data, display_message = clear_projects()
hidden_repo_owner = gr.Textbox(value="", visible=False)
hidden_repo_name = gr.Textbox(value="", visible=False)
return projects_data, display_message, hidden_repo_owner, hidden_repo_name
scope_components['clear_projects_btn'].click(
fn=clear_projects_and_reset_form,
outputs=[
scope_components['projects_data'],
scope_components['projects_display'],
scope_components['project_repo_owner'], # Hide repository owner field
scope_components['project_repo_name'] # Hide repository name field
]
)
# Show/hide repository fields based on platform selection
scope_components['project_platform'].change(
fn=update_repository_visibility,
inputs=[scope_components['project_platform']],
outputs=[scope_components['project_repo_owner'], scope_components['project_repo_name']]
)
# Activity management
scope_components['add_activity_btn'].click(
fn=add_activity,
inputs=[
scope_components['activity_name'],
scope_components['activity_parent'],
scope_components['activities_data'],
scope_components['projects_data']
],
outputs=[
scope_components['activities_data'],
scope_components['activities_display'],
scope_components['activity_name'], # Clear name field
scope_components['activity_parent'] # Clear parent field
]
)
scope_components['clear_activities_btn'].click(
fn=clear_activities,
outputs=[
scope_components['activities_data'],
scope_components['activities_display']
]
)
# Task management
scope_components['add_task_btn'].click(
fn=add_task,
inputs=[
scope_components['task_name'],
scope_components['task_parent'],
scope_components['task_type'],
scope_components['task_action'],
scope_components['tasks_data'],
scope_components['projects_data'],
scope_components['activities_data']
],
outputs=[
scope_components['tasks_data'],
scope_components['tasks_display'],
scope_components['task_name'], # Clear name field
scope_components['task_parent'], # Clear parent field
scope_components['task_type'], # Clear type field
scope_components['task_action'] # Clear action field
]
)
scope_components['clear_tasks_btn'].click(
fn=clear_tasks,
outputs=[
scope_components['tasks_data'],
scope_components['tasks_display']
]
)
# Update activity parent dropdown when projects change
scope_components['projects_data'].change(
fn=update_activity_parent_dropdown,
inputs=[scope_components['projects_data']],
outputs=[scope_components['activity_parent']]
)
# Update task parent dropdown when activities change (tasks can only have activities as parents)
scope_components['activities_data'].change(
fn=update_task_parent_dropdown,
inputs=[scope_components['activities_data']],
outputs=[scope_components['task_parent']]
)
# Update task action dropdown when task type changes
scope_components['task_type'].change(
fn=update_task_action_dropdown,
inputs=[scope_components['task_type']],
outputs=[scope_components['task_action']]
)
# Profile management handlers
participant_components['add_profile_btn'].click(
fn=add_profile,
inputs=[
participant_components['profile_name'],
participant_components['profile_gender'],
participant_components['profile_race'],
participant_components['profile_language'],
participant_components['profiles_data']
],
outputs=[
participant_components['profiles_data'],
participant_components['profiles_display'],
participant_components['profile_name'], # Clear name field
participant_components['profile_gender'], # Clear gender field
participant_components['profile_race'], # Clear race field
participant_components['profile_language'], # Clear language field
participant_components['individual_profile'] # Update individual profile dropdown
]
)
participant_components['clear_profiles_btn'].click(
fn=clear_profiles,
outputs=[
participant_components['profiles_data'],
participant_components['profiles_display'],
participant_components['individual_profile'] # Also clear individual profile dropdown
]
)
# Update individual profile dropdown when profiles data changes
participant_components['profiles_data'].change(
fn=lambda profiles_data: gr.Dropdown(choices=[p['name'] for p in profiles_data] if profiles_data else [], value=None),
inputs=[participant_components['profiles_data']],
outputs=[participant_components['individual_profile']]
)
# Roles management handlers
participant_components['add_role_btn'].click(
fn=add_role,
inputs=[
participant_components['role_name'],
participant_components['role_vote_value'],
participant_components['roles_data'],
participant_components['individuals_data'],
participant_components['agents_data']
],
outputs=[
participant_components['roles_data'],
participant_components['roles_display'],
participant_components['role_name'],
participant_components['role_vote_value']
]
)
participant_components['clear_roles_btn'].click(
fn=clear_all_roles,
inputs=[],
outputs=[
participant_components['roles_data'],
participant_components['roles_display'],
participant_components['role_name'],
participant_components['role_vote_value']
]
)
# Individual management handlers
participant_components['add_individual_btn'].click(
fn=add_individual,
inputs=[
participant_components['individual_name'],
participant_components['individual_vote_value'],
participant_components['individual_profile'],
participant_components['individual_role'],
participant_components['individuals_data'],
participant_components['profiles_data'], # For profile dropdown choices
participant_components['roles_data'], # For global uniqueness check
participant_components['agents_data'] # For global uniqueness check
],
outputs=[
participant_components['individuals_data'],
participant_components['individuals_display'],
participant_components['individual_name'], # Clear name field
participant_components['individual_vote_value'], # Reset vote value
participant_components['individual_profile'], # Update profile dropdown
participant_components['individual_role'] # Update role dropdown
]
)
participant_components['clear_individuals_btn'].click(
fn=clear_individuals,
outputs=[
participant_components['individuals_data'],
participant_components['individuals_display']
]
)
# Agent management handlers
participant_components['add_agent_btn'].click(
fn=add_agent,
inputs=[
participant_components['agent_name'],
participant_components['agent_vote_value'],
participant_components['agent_confidence'],
participant_components['agent_autonomy_level'],
participant_components['agent_explainability'],
participant_components['agent_role'],
participant_components['agents_data'],
participant_components['roles_data'], # For global uniqueness check
participant_components['individuals_data'] # For global uniqueness check
],
outputs=[
participant_components['agents_data'],
participant_components['agents_display'],
participant_components['agent_name'], # Clear name field
participant_components['agent_vote_value'], # Reset vote value to default
participant_components['agent_confidence'], # Clear confidence field
participant_components['agent_autonomy_level'], # Clear autonomy level field
participant_components['agent_explainability'], # Clear explainability field
participant_components['agent_role'] # Update role dropdown
]
)
participant_components['clear_agents_btn'].click(
fn=clear_agents,
outputs=[
participant_components['agents_data'],
participant_components['agents_display']
]
)
# Update individual and agent dropdowns when roles data changes
participant_components['roles_data'].change(
fn=update_individual_dropdowns,
inputs=[
participant_components['roles_data'],
participant_components['profiles_data']
],
outputs=[
participant_components['individual_profile'],
participant_components['individual_role']
]
)
# Update agent role dropdown when roles data changes
participant_components['roles_data'].change(
fn=update_agent_role_dropdown,
inputs=[participant_components['roles_data']],
outputs=[participant_components['agent_role']]
)
# Update ElementList dropdowns when individuals change (for both single and phase policies)
def update_element_list_dropdowns(individuals_data):
"""Update ElementList dropdowns when individuals change"""
individual_names = [i['name'] for i in individuals_data] if individuals_data else []
return (
gr.Dropdown(choices=individual_names, value=None, interactive=True), # decision_options_element_list
gr.Dropdown(choices=individual_names, value=None, interactive=True) # phase_decision_options_element_list
)
participant_components['individuals_data'].change(
fn=update_element_list_dropdowns,
inputs=[participant_components['individuals_data']],
outputs=[policy_components['decision_options_element_list'], policy_components['phase_decision_options_element_list']]
)
# Update policy scope dropdown when scope definitions change
for scope_component in [scope_components['projects_data'], scope_components['activities_data'], scope_components['tasks_data']]:
scope_component.change(
fn=update_scope_dropdown,
inputs=[
scope_components['projects_data'],
scope_components['activities_data'],
scope_components['tasks_data']
],
outputs=[policy_components['policy_scope']]
)
# Update policy participants dropdown when participant definitions change
for participant_component in [participant_components['roles_data'], participant_components['individuals_data'], participant_components['agents_data']]:
participant_component.change(
fn=update_participant_dropdown,
inputs=[
participant_components['roles_data'],
participant_components['individuals_data'],
participant_components['agents_data']
],
outputs=[
policy_components['policy_participants'],
policy_components['veto_participants'],
policy_components['excluded_participants'],
policy_components['phase_participants'],
policy_components['phase_veto_participants'],
policy_components['phase_excluded_participants']
]
)
# Update policy-specific attribute visibility when policy type changes
policy_components['policy_type'].change(
fn=update_policy_type_visibility,
inputs=[policy_components['policy_type']],
outputs=[
policy_components['default_decision'],
policy_components['fallback_policy'],
policy_components['voting_ratio']
]
)
# Update decision options visibility when decision type changes
policy_components['decision_type'].change(
fn=update_decision_options_visibility,
inputs=[policy_components['decision_type']],
outputs=[policy_components['decision_options'], policy_components['decision_options_element_list']]
)
# Update condition field visibility when condition type changes
policy_components['condition_type'].change(
fn=update_condition_visibility,
inputs=[policy_components['condition_type']],
outputs=[
policy_components['veto_participants'],
policy_components['excluded_participants'],
policy_components['min_participants'],
policy_components['deadline_offset_value'],
policy_components['deadline_offset_unit'],
policy_components['deadline_date'],
policy_components['min_decision_offset_value'],
policy_components['min_decision_offset_unit'],
policy_components['min_decision_date'],
policy_components['check_ci_cd_evaluation_mode'],
policy_components['check_ci_cd_value'],
policy_components['label_condition_type'],
policy_components['label_condition_operator'],
policy_components['label_condition_labels'],
policy_components['min_time_evaluation_mode'],
policy_components['min_time_activity'],
policy_components['min_time_offset_value'],
policy_components['min_time_offset_unit']
]
)
# Policy management handlers
policy_components['add_policy_btn'].click(
fn=add_policy_and_clear_form,
inputs=[
policy_components['policy_name'],
policy_components['policy_type'],
policy_components['policy_scope'],
policy_components['policy_participants'],
policy_components['decision_type'],
policy_components['decision_options'],
policy_components['voting_ratio'],
policy_components['default_decision'],
policy_components['fallback_policy'],
policy_components['condition_type'],
policy_components['veto_participants'],
policy_components['excluded_participants'],
policy_components['min_participants'],
policy_components['deadline_offset_value'],
policy_components['deadline_offset_unit'],
policy_components['deadline_date'],
policy_components['min_decision_offset_value'],
policy_components['min_decision_offset_unit'],
policy_components['min_decision_date'],
policy_components['label_condition_type'],
policy_components['label_condition_operator'],
policy_components['label_condition_labels'],
policy_components['communication_channel'],
policy_components['added_conditions_list'],
policy_components['policies_data']
],
outputs=[
policy_components['policies_data'],
policy_components['policies_display'],
policy_components['policy_name'], # Clear/keep name field
policy_components['policy_type'], # Reset/keep policy type
policy_components['policy_scope'], # Clear/keep scope
policy_components['policy_participants'], # Clear/keep participants - use component, not new dropdown
policy_components['decision_type'], # Reset/keep decision type
policy_components['decision_options'], # Clear/keep decision options
policy_components['voting_ratio'], # Reset/keep voting ratio
policy_components['default_decision'], # Update/keep default decision
policy_components['fallback_policy'], # Update/keep fallback policy
policy_components['condition_type'], # Clear/keep condition type
policy_components['veto_participants'], # Clear/keep veto participants
policy_components['excluded_participants'], # Clear/keep excluded participants
policy_components['min_participants'], # Reset/keep min participants
policy_components['deadline_offset_value'], # Reset/keep deadline offset value
policy_components['deadline_offset_unit'], # Reset/keep deadline offset unit
policy_components['deadline_date'], # Reset/keep deadline date
policy_components['min_decision_offset_value'], # Reset/keep min decision offset value
policy_components['min_decision_offset_unit'], # Reset/keep min decision offset unit
policy_components['min_decision_date'], # Reset/keep min decision date
policy_components['label_condition_type'], # Reset/keep label condition type
policy_components['label_condition_operator'], # Reset/keep label operator
policy_components['label_condition_labels'], # Clear/keep label condition labels
policy_components['added_conditions_list'], # Clear/keep added conditions list
policy_components['communication_channel'] # Clear/keep communication channel
]
)
policy_components['clear_policies_btn'].click(
fn=clear_policies,
outputs=[
policy_components['policies_data'],
policy_components['policies_display']
]
)
# Condition management handlers
policy_components['add_condition_btn'].click(
fn=add_condition,
inputs=[
policy_components['condition_type'],
policy_components['veto_participants'],
policy_components['excluded_participants'],
policy_components['min_participants'],
policy_components['deadline_offset_value'],
policy_components['deadline_offset_unit'],
policy_components['deadline_date'],
policy_components['min_decision_offset_value'],
policy_components['min_decision_offset_unit'],
policy_components['min_decision_date'],
policy_components['check_ci_cd_evaluation_mode'],
policy_components['check_ci_cd_value'],
policy_components['label_condition_type'],
policy_components['label_condition_operator'],
policy_components['label_condition_labels'],
policy_components['min_time_evaluation_mode'],
policy_components['min_time_activity'],
policy_components['min_time_offset_value'],
policy_components['min_time_offset_unit'],
policy_components['added_conditions_list']
],
outputs=[
policy_components['added_conditions_list'], # success/error message
policy_components['condition_type'], # reset dropdown
policy_components['veto_participants'], # reset multiselect
policy_components['excluded_participants'], # reset multiselect
policy_components['min_participants'], # reset number
policy_components['deadline_offset_value'], # reset number
policy_components['deadline_offset_unit'], # reset dropdown
policy_components['deadline_date'], # reset textbox
policy_components['min_decision_offset_value'], # reset number
policy_components['min_decision_offset_unit'], # reset dropdown
policy_components['min_decision_date'], # reset textbox
policy_components['check_ci_cd_evaluation_mode'], # reset dropdown
policy_components['check_ci_cd_value'], # reset checkbox
policy_components['label_condition_type'], # reset dropdown
policy_components['label_condition_operator'], # reset textbox
policy_components['label_condition_labels'], # reset textbox
policy_components['min_time_evaluation_mode'], # reset dropdown
policy_components['min_time_activity'], # reset dropdown
policy_components['min_time_offset_value'], # reset number
policy_components['min_time_offset_unit'] # reset dropdown
]
)
policy_components['clear_conditions_btn'].click(
fn=clear_conditions,
outputs=[
policy_components['added_conditions_list']
]
)
# Phase condition event handlers
policy_components['phase_condition_type'].change(
fn=update_phase_condition_visibility,
inputs=[policy_components['phase_condition_type']],
outputs=[
policy_components['phase_veto_participants'],
policy_components['phase_excluded_participants'],
policy_components['phase_min_participants'],
policy_components['phase_deadline_offset_value'],
policy_components['phase_deadline_offset_unit'],
policy_components['phase_deadline_date'],
policy_components['phase_min_decision_offset_value'],
policy_components['phase_min_decision_offset_unit'],
policy_components['phase_min_decision_date'],
policy_components['phase_check_ci_cd_evaluation_mode'],
policy_components['phase_check_ci_cd_value'],
policy_components['phase_label_condition_type'],
policy_components['phase_label_condition_operator'],
policy_components['phase_label_condition_labels'],
policy_components['phase_min_time_evaluation_mode'],
policy_components['phase_min_time_activity'],
policy_components['phase_min_time_offset_value'],
policy_components['phase_min_time_offset_unit']
]
)
policy_components['add_phase_condition_btn'].click(
fn=add_phase_condition,
inputs=[
policy_components['phase_condition_type'],
policy_components['phase_veto_participants'],
policy_components['phase_excluded_participants'],
policy_components['phase_min_participants'],
policy_components['phase_deadline_offset_value'],
policy_components['phase_deadline_offset_unit'],
policy_components['phase_deadline_date'],
policy_components['phase_min_decision_offset_value'],
policy_components['phase_min_decision_offset_unit'],
policy_components['phase_min_decision_date'],
policy_components['phase_check_ci_cd_evaluation_mode'],
policy_components['phase_check_ci_cd_value'],
policy_components['phase_label_condition_type'],
policy_components['phase_label_condition_operator'],
policy_components['phase_label_condition_labels'],
policy_components['phase_min_time_evaluation_mode'],
policy_components['phase_min_time_activity'],
policy_components['phase_min_time_offset_value'],
policy_components['phase_min_time_offset_unit'],
policy_components['added_phase_conditions_list']
],
outputs=[
policy_components['added_phase_conditions_list'], # success/error message
policy_components['phase_condition_type'], # reset dropdown
policy_components['phase_veto_participants'], # reset multiselect
policy_components['phase_excluded_participants'], # reset multiselect
policy_components['phase_min_participants'], # reset number
policy_components['phase_deadline_offset_value'], # reset number
policy_components['phase_deadline_offset_unit'], # reset dropdown
policy_components['phase_deadline_date'], # reset textbox
policy_components['phase_min_decision_offset_value'], # reset number
policy_components['phase_min_decision_offset_unit'], # reset dropdown
policy_components['phase_min_decision_date'], # reset textbox
policy_components['phase_check_ci_cd_evaluation_mode'], # reset dropdown
policy_components['phase_check_ci_cd_value'], # reset checkbox
policy_components['phase_label_condition_type'], # reset dropdown
policy_components['phase_label_condition_operator'], # reset textbox
policy_components['phase_label_condition_labels'], # reset textbox
policy_components['phase_min_time_evaluation_mode'], # reset dropdown
policy_components['phase_min_time_activity'], # reset dropdown
policy_components['phase_min_time_offset_value'], # reset number
policy_components['phase_min_time_offset_unit'] # reset dropdown
]
)
policy_components['clear_phase_conditions_btn'].click(
fn=clear_phase_conditions,
outputs=[
policy_components['added_phase_conditions_list']
]
)
# Update policy reference dropdowns when policies change
policy_components['policies_data'].change(
fn=update_policy_reference_dropdowns,
inputs=[
policy_components['policies_data'],
policy_components['policy_scope']
],
outputs=[
policy_components['default_decision'],
policy_components['fallback_policy']
]
)
# Update policy reference dropdowns when policy scope changes (and clear current selections)
policy_components['policy_scope'].change(
fn=update_policy_reference_dropdowns_on_scope_change,
inputs=[
policy_components['policies_data'],
policy_components['policy_scope']
],
outputs=[
policy_components['default_decision'],
policy_components['fallback_policy']
]
)
# Composed Policy management handlers
policy_components['add_phase_btn'].click(
fn=add_phase,
inputs=[
policy_components['phase_name'],
policy_components['phase_type'],
policy_components['phase_participants'],
policy_components['phase_decision_type'],
policy_components['phase_decision_options'],
policy_components['phase_voting_ratio'],
policy_components['phase_default_decision'],
policy_components['phase_fallback_policy'],
policy_components['phase_communication_channel'],
policy_components['added_phases_list'],
policy_components['added_phase_conditions_list']
],
outputs=[
policy_components['added_phases_list'],
policy_components['phase_name'],
policy_components['phase_type'],
policy_components['phase_participants'],
policy_components['phase_decision_type'],
policy_components['phase_decision_options'],
policy_components['phase_voting_ratio'],
policy_components['phase_default_decision'],
policy_components['phase_fallback_policy'],
policy_components['phase_communication_channel'],
policy_components['added_phase_conditions_list']
]
)
policy_components['clear_phases_btn'].click(
fn=clear_phases,
outputs=[
policy_components['added_phases_list']
]
)
policy_components['add_composed_policy_btn'].click(
fn=add_composed_policy_and_clear_form,
inputs=[
policy_components['composed_policy_name'],
policy_components['composed_policy_scope'],
policy_components['execution_type'],
policy_components['require_all'],
policy_components['carry_over'],
policy_components['added_phases_list'],
policy_components['composed_policies_data']
],
outputs=[
policy_components['composed_policies_data'],
policy_components['composed_policies_display'],
policy_components['composed_policy_name'],
policy_components['composed_policy_scope'],
policy_components['execution_type'],
policy_components['require_all'],
policy_components['carry_over'],
policy_components['added_phases_list']
]
)
policy_components['clear_composed_policies_btn'].click(
fn=clear_composed_policies,
outputs=[
policy_components['composed_policies_data'],
policy_components['composed_policies_display']
]
)
# Update phase participant dropdown when participant definitions change
for participant_component in [participant_components['roles_data'], participant_components['individuals_data'], participant_components['agents_data']]:
participant_component.change(
fn=update_phase_participant_dropdown,
inputs=[
participant_components['roles_data'],
participant_components['individuals_data'],
participant_components['agents_data']
],
outputs=[
policy_components['phase_participants']
]
)
# Update phase policy type visibility when phase type changes
policy_components['phase_type'].change(
fn=update_policy_type_visibility,
inputs=[policy_components['phase_type']],
outputs=[
policy_components['phase_default_decision'],
policy_components['phase_fallback_policy'],
policy_components['phase_voting_ratio']
]
)
# Update phase decision options visibility when phase decision type changes
policy_components['phase_decision_type'].change(
fn=update_decision_options_visibility,
inputs=[policy_components['phase_decision_type']],
outputs=[policy_components['phase_decision_options'], policy_components['phase_decision_options_element_list']]
)
# Update composed policy scope dropdown when scope definitions change
for scope_component in [scope_components['projects_data'], scope_components['activities_data'], scope_components['tasks_data']]:
scope_component.change(
fn=update_scope_dropdown,
inputs=[
scope_components['projects_data'],
scope_components['activities_data'],
scope_components['tasks_data']
],
outputs=[policy_components['composed_policy_scope']]
)
# Set up change handlers for preview updates
preview_data_components = []
# Scope data components (projects, activities, tasks)
preview_data_components.extend([
scope_components['projects_data'],
scope_components['activities_data'],
scope_components['tasks_data']
])
# Participant components
preview_data_components.extend([
participant_components['profiles_data'],
participant_components['roles_data'],
participant_components['individuals_data'],
participant_components['agents_data']
])
# Include both single policies and composed policies data for preview
preview_data_components.append(policy_components['policies_data'])
preview_data_components.append(policy_components['composed_policies_data'])
for component in preview_data_components:
if hasattr(component, 'change'):
component.change(
fn=update_preview,
inputs=preview_data_components,
outputs=preview_components['preview_code']
)
# Load example button
if hasattr(self, '_create_preview_panel'):
# This will be connected properly when we have the preview panel components
pass
# Download DSL button handler - use gr.File for download
def download_dsl(preview_text):
"""Generate a downloadable file"""
if not preview_text or preview_text.startswith("# Fill the form"):
preview_text = "# Fill the form to generate a governance policy"
# Create filename with timestamp
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = "governance_policy_" + timestamp + ".txt"
# Write to a temporary file with the proper name
try:
import tempfile
import os
# Get temp directory
temp_dir = tempfile.gettempdir()
if not temp_dir or temp_dir == '/':
temp_dir = '/tmp'
# Create the file with the desired name
temp_path = os.path.join(temp_dir, filename)
with open(temp_path, 'w') as f:
f.write(preview_text)
# Return just the file path
return temp_path
except Exception as e:
# If file creation fails, try in current directory
try:
with open(filename, 'w') as f:
f.write(preview_text)
return filename
except:
return None
# Wire the "Download Policy" button to download the file
preview_components['download_btn'].click(
fn=download_dsl,
inputs=[preview_components['preview_code']],
outputs=preview_components['download_file']
)
def _format_float(self, value):
"""Format a number as a FLOAT (with decimal point) for DSL compliance"""
if value is None:
return None
# Convert to float and format with decimal point
float_val = float(value)
# If it's a whole number, add .0; otherwise keep as is
if float_val == int(float_val):
return f"{int(float_val)}.0"
else:
return str(float_val)
def _generate_dsl_from_form(self, *form_values):
"""Generate DSL code from form inputs"""
# Extract form values (in the order they appear in the interface)
(projects_data, activities_data, tasks_data, profiles_data, roles_data, individuals_data, agents_data,
policies_data, composed_policies_data) = form_values
dsl_parts = []
# Use structured data directly (no need to parse text anymore)
projects = projects_data if projects_data else []
activities = activities_data if activities_data else []
tasks = tasks_data if tasks_data else []
profiles = profiles_data if profiles_data else [] # profiles_data is already structured
individuals = individuals_data if individuals_data else [] # individuals_data is already structured
agents = agents_data if agents_data else [] # agents_data is already structured
# Generate Scopes section
scope_parts = []
# Get all project names (explicit + implicit from activity parents)
explicit_project_names = {p['name'] for p in projects}
implicit_project_names = {a['parent'] for a in activities if a['parent'] and a['parent'] not in explicit_project_names}
all_project_names = explicit_project_names | implicit_project_names
# Get all activity names (explicit + implicit from task parents)
explicit_activity_names = {a['name'] for a in activities}
implicit_activity_names = {t['parent'] for t in tasks if t['parent'] and t['parent'] not in explicit_activity_names and t['parent'] not in all_project_names}
# Generate Projects with their hierarchical structure
if all_project_names:
scope_parts.append(" Projects :")
for project_name in all_project_names:
# Get explicit project info if available
explicit_project = next((p for p in projects if p['name'] == project_name), None)
if explicit_project and explicit_project['platform']:
scope_parts.append(f" Project {project_name} from {explicit_project['platform']} : {explicit_project['repo']} {{")
else:
scope_parts.append(f" Project {project_name} {{")
# Find activities belonging to this project
project_activities = [a for a in activities if a['parent'] == project_name]
if project_activities:
scope_parts.append(" activities :")
for activity in project_activities:
scope_parts.append(f" {activity['name']} {{")
# Find tasks belonging to this activity
activity_tasks = [t for t in tasks if t['parent'] == activity['name']]
if activity_tasks:
scope_parts.append(" tasks :")
for task in activity_tasks:
if task['type'] and task['action']:
scope_parts.append(f" {task['name']} : {task['type']} {{")
scope_parts.append(f" Action : {task['action']}")
scope_parts.append(" }")
elif task['type']:
scope_parts.append(f" {task['name']} : {task['type']}")
else:
scope_parts.append(f" {task['name']}")
scope_parts.append(" }")
scope_parts.append(" }")
# Generate root-level Activities (explicit + implicit from task parents)
root_activities = [a for a in activities if not a['parent']]
# Add implicit activities referenced by tasks but not explicitly defined
for implicit_activity_name in implicit_activity_names:
root_activities.append({'name': implicit_activity_name, 'parent': None})
if root_activities:
scope_parts.append(" Activities :")
for activity in root_activities:
activity_name = activity['name']
scope_parts.append(f" {activity_name} {{")
# Find tasks belonging to this activity
activity_tasks = [t for t in tasks if t['parent'] == activity_name]
if activity_tasks:
scope_parts.append(" tasks :")
for task in activity_tasks:
if task['type'] and task['action']:
scope_parts.append(f" {task['name']} : {task['type']} {{")
scope_parts.append(f" Action : {task['action']}")
scope_parts.append(" }")
elif task['type']:
scope_parts.append(f" {task['name']} : {task['type']}")
else:
scope_parts.append(f" {task['name']}")
scope_parts.append(" }")
# Generate root-level Tasks (only tasks with no parent or parent that's a project)
root_tasks = [t for t in tasks if not t['parent'] or t['parent'] in all_project_names]
if root_tasks:
scope_parts.append(" Tasks :")
for task in root_tasks:
if task['type'] and task['action']:
scope_parts.append(f" {task['name']} : {task['type']} {{")
scope_parts.append(f" Action : {task['action']}")
scope_parts.append(" }")
elif task['type']:
scope_parts.append(f" {task['name']} : {task['type']}")
else:
scope_parts.append(f" {task['name']}")
# Add Scopes section if we have any scope content
if scope_parts:
dsl_parts.append("Scopes:")
dsl_parts.extend(scope_parts)
# Generate Participants section
participants_section = []
# Add Profiles section
if profiles:
participants_section.append(" Profiles:")
for profile in profiles:
participants_section.append(f" {profile['name']} {{")
if profile.get('gender'):
participants_section.append(f" gender : {profile['gender']}")
if profile.get('race'):
participants_section.append(f" race : {profile['race']}")
if profile.get('language'):
participants_section.append(f" language : {profile['language']}")
participants_section.append(" }")
# Add Roles section
if roles_data:
participants_section.append(" Roles :")
for role in roles_data:
line = f" {role['name']}"
if role.get('vote_value') is not None:
formatted_vote = self._format_float(role['vote_value'])
line += f" {{ vote value : {formatted_vote} }}"
participants_section.append(line + ",")
else:
participants_section.append(line + ",")
# Add Individuals section
if individuals or agents:
participants_section.append(" Individuals :")
# Add regular individuals
for individual in individuals:
line_parts = [f" {individual['name']}"]
if individual.get('vote_value') is not None or individual.get('profile') or individual.get('role'):
line_parts[0] += " {"
attributes = []
if individual.get('vote_value') is not None:
formatted_vote = self._format_float(individual['vote_value'])
attributes.append(f" vote value : {formatted_vote}")
if individual.get('profile'):
attributes.append(f" profile : {individual['profile']}")
if individual.get('role'):
attributes.append(f" role : {individual['role']}")
if attributes:
participants_section.extend(line_parts)
participants_section.extend(attributes)
participants_section.append(" },")
else:
participants_section.append(f" {individual['name']},")
else:
participants_section.append(f" {individual['name']},")
# Add agents
for agent in agents:
line_parts = [f" (Agent) {agent['name']}"]
has_attributes = (agent.get('vote_value') is not None or
agent.get('confidence') is not None or
agent.get('autonomy_level') is not None or
agent.get('explainability') is not None or
agent.get('role'))
if has_attributes:
line_parts[0] += " {"
attributes = []
if agent.get('vote_value') is not None:
formatted_vote = self._format_float(agent['vote_value'])
attributes.append(f" vote value : {formatted_vote}")
if agent.get('confidence'):
formatted_confidence = self._format_float(agent['confidence'])
attributes.append(f" confidence : {formatted_confidence}")
if agent.get('autonomy_level'):
formatted_autonomy = self._format_float(agent['autonomy_level'])
attributes.append(f" autonomy level: {formatted_autonomy}")
if agent.get('explainability'):
formatted_explainability = self._format_float(agent['explainability'])
attributes.append(f" explainability: {formatted_explainability}")
if agent.get('role'):
attributes.append(f" role : {agent['role']}")
if attributes:
participants_section.extend(line_parts)
participants_section.extend(attributes)
participants_section.append(" },")
else:
participants_section.append(f" (Agent) {agent['name']},")
else:
participants_section.append(f" (Agent) {agent['name']},")
if participants_section:
dsl_parts.append("Participants:")
dsl_parts.extend(participants_section)
# Generate Policies section
policies = policies_data if policies_data else []
for policy in policies:
if policy.get('name') and policy.get('type'):
policy_type = policy['type']
policy_name = policy['name']
dsl_parts.append(f"{policy_type} {policy_name} {{")
if policy.get('scope'):
dsl_parts.append(f" Scope: {policy['scope']}")
if policy.get('decision_type'):
if policy.get('decision_options') and policy['decision_type'] in ["StringList", "ElementList"]:
dsl_parts.append(f" DecisionType as {policy['decision_type']} : {policy['decision_options']}")
else:
dsl_parts.append(f" DecisionType as {policy['decision_type']}")
if policy.get('participants'):
# Handle multiselect participants
if isinstance(policy['participants'], list):
participants = [p.strip() for p in policy['participants'] if p.strip()]
else:
participants = [p.strip() for p in str(policy['participants']).split(',') if p.strip()]
if participants:
dsl_parts.append(f" Participant list : {', '.join(participants)}")
# Optional communication channel (must appear immediately after participants)
if policy.get('communication_channel'):
dsl_parts.append(f" CommunicationChannel : {policy['communication_channel']}")
# Add conditions
conditions = []
# Use the dynamically added conditions if available
if policy.get('added_conditions'):
condition_lines = [line.strip() for line in policy['added_conditions'].split('\n') if line.strip()]
for condition_line in condition_lines:
# Convert from display format to DSL format
if condition_line.startswith('VetoRight:'):
conditions.append(f" {condition_line.replace(':', ' :')}")
elif condition_line.startswith('ParticipantExclusion:'):
conditions.append(f" {condition_line.replace(':', ' :')}")
elif condition_line.startswith('MinParticipants:'):
conditions.append(f" {condition_line.replace(':', ' :')}")
elif condition_line.startswith('Deadline:'):
condition_content = condition_line.split(':', 1)[1].strip()
conditions.append(f" Deadline : {condition_content}")
elif condition_line.startswith('MinDecisionTime:'):
condition_content = condition_line.split(':', 1)[1].strip()
conditions.append(f" MinDecisionTime : {condition_content}")
elif condition_line.startswith('LabelCondition'):
# Parse LabelCondition format: "LabelCondition pre not : label1, label2"
condition_content = condition_line.split(':', 1)[1].strip()
condition_prefix = condition_line.split(':', 1)[0].strip()
conditions.append(f" {condition_prefix} : {condition_content}")
# Fallback to single condition fields for backward compatibility
elif policy.get('condition_type'):
condition_type = policy['condition_type']
if condition_type == "VetoRight" and policy.get('veto_participants'):
if isinstance(policy['veto_participants'], list):
veto_list = ', '.join(policy['veto_participants'])
else:
veto_list = str(policy['veto_participants'])
conditions.append(f" VetoRight : {veto_list}")
elif condition_type == "ParticipantExclusion" and policy.get('excluded_participants'):
if isinstance(policy['excluded_participants'], list):
exclusion_list = ', '.join(policy['excluded_participants'])
else:
exclusion_list = str(policy['excluded_participants'])
conditions.append(f" ParticipantExclusion : {exclusion_list}")
elif condition_type == "MinParticipants" and policy.get('min_participants'):
conditions.append(f" MinParticipants : {policy['min_participants']}")
elif condition_type == "Deadline":
deadline_parts = []
if policy.get('deadline_offset_value') and policy.get('deadline_offset_unit'):
deadline_parts.append(f"{policy['deadline_offset_value']} {policy['deadline_offset_unit']}")
if policy.get('deadline_date') and policy['deadline_date'].strip():
if deadline_parts:
deadline_parts.append(f", {policy['deadline_date']}")
else:
deadline_parts.append(policy['deadline_date'])
if deadline_parts:
conditions.append(f" Deadline policy_deadline : {''.join(deadline_parts)}")
elif condition_type == "MinDecisionTime":
min_decision_parts = []
if policy.get('min_decision_offset_value') and policy.get('min_decision_offset_unit'):
min_decision_parts.append(f"{policy['min_decision_offset_value']} {policy['min_decision_offset_unit']}")
if policy.get('min_decision_date') and policy['min_decision_date'].strip():
if min_decision_parts:
min_decision_parts.append(f", {policy['min_decision_date']}")
else:
min_decision_parts.append(policy['min_decision_date'])
if min_decision_parts:
conditions.append(f" MinDecisionTime min_time_id : {''.join(min_decision_parts)}")
elif condition_type == "LabelCondition" and policy.get('label_condition_labels'):
label_type = policy.get('label_condition_type', 'pre')
label_operator = policy.get('label_condition_operator', '')
labels = policy['label_condition_labels']
if label_operator:
conditions.append(f" LabelCondition {label_type} {label_operator} : {labels}")
else:
conditions.append(f" LabelCondition {label_type} : {labels}")
if conditions:
dsl_parts.append(" Conditions:")
dsl_parts.extend(conditions)
# Parameters (must come after conditions)
parameters = []
if policy_type in ["MajorityPolicy", "AbsoluteMajorityPolicy", "VotingPolicy"] and policy.get('voting_ratio'):
# Skip ratio if it's the default value of 0.5
if policy['voting_ratio'] != 0.5:
formatted_ratio = self._format_float(policy['voting_ratio'])
parameters.append(f" ratio: {formatted_ratio}")
# Add default decision for LeaderDrivenPolicy
if policy_type == "LeaderDrivenPolicy" and policy.get('default_decision'):
parameters.append(f" default: {policy['default_decision']}")
# Add fallback policy for ConsensusPolicy and LazyConsensusPolicy
if policy_type in ["ConsensusPolicy", "LazyConsensusPolicy"] and policy.get('fallback_policy'):
parameters.append(f" fallback: {policy['fallback_policy']}")
if parameters:
dsl_parts.append(" Parameters:")
dsl_parts.extend(parameters)
dsl_parts.append("}")
# Generate Composed Policies section
composed_policies = composed_policies_data if composed_policies_data else []
for composed_policy in composed_policies:
if composed_policy.get('name') and composed_policy.get('type') == 'ComposedPolicy':
policy_name = composed_policy['name']
dsl_parts.append(f"ComposedPolicy {policy_name} {{")
if composed_policy.get('scope'):
dsl_parts.append(f" Scope: {composed_policy['scope']}")
# Add execution order configuration
if composed_policy.get('execution_type') or composed_policy.get('require_all') is not None or composed_policy.get('carry_over') is not None:
dsl_parts.append(" Order:")
if composed_policy.get('execution_type'):
dsl_parts.append(f" Execution: {composed_policy['execution_type']}")
if composed_policy.get('require_all') is not None:
require_val = "true" if composed_policy['require_all'] else "false"
dsl_parts.append(f" RequireAll: {require_val}")
if composed_policy.get('carry_over') is not None:
carry_val = "true" if composed_policy['carry_over'] else "false"
dsl_parts.append(f" CarryOver: {carry_val}")
# Add phases
if composed_policy.get('phases'):
dsl_parts.append(" Phases {")
for phase_display in composed_policy['phases']:
# Parse phase from display format: "PhaseName (PolicyType) → participants: p1, p2 [Conditions: ...]"
if ' (' in phase_display and ') →' in phase_display:
# Extract phase name and type
name_part = phase_display.split(' (')[0].strip()
type_part = phase_display.split(' (')[1].split(')')[0].strip()
# Extract participants and details from display format
participants_part = ""
decision_type = "BooleanDecision" # default
ratio = None
default_decision = None
fallback_policy = None
options = None
channel = None
phase_conditions = []
# Parse the display string more carefully
if 'participants: ' in phase_display:
after_participants = phase_display.split('participants: ')[1]
# Extract conditions if present (check for [Conditions: ...])
if ' [Conditions: ' in after_participants:
base_part = after_participants.split(' [Conditions: ')[0]
conditions_part = after_participants.split(' [Conditions: ')[1].rstrip(']')
# Parse conditions: split by comma, then strip and add to list
phase_conditions = [c.strip() for c in conditions_part.split(',')]
else:
base_part = after_participants
# Extract participants (everything before the first comma with a keyword)
if ', decision:' in base_part:
participants_part = base_part.split(', decision:')[0]
elif ', ratio:' in base_part:
participants_part = base_part.split(', ratio:')[0]
elif ', default:' in base_part:
participants_part = base_part.split(', default:')[0]
elif ', fallback:' in base_part:
participants_part = base_part.split(', fallback:')[0]
elif ', options:' in base_part:
participants_part = base_part.split(', options:')[0]
else:
participants_part = base_part
# Extract parameters from the details section
if ', decision: ' in phase_display:
decision_type = phase_display.split(', decision: ')[1].split(',')[0].split(' [')[0].strip()
if ', ratio: ' in phase_display:
ratio = phase_display.split(', ratio: ')[1].split(',')[0].split(' [')[0].strip()
if ', default: ' in phase_display:
default_decision = phase_display.split(', default: ')[1].split(',')[0].split(' [')[0].strip()
if ', fallback: ' in phase_display:
fallback_policy = phase_display.split(', fallback: ')[1].split(',')[0].split(' [')[0].strip()
if ', options: ' in phase_display:
options = phase_display.split(', options: ')[1].split(' [')[0].strip()
if ', channel: ' in phase_display:
channel = phase_display.split(', channel: ')[1].split(',')[0].split(' [')[0].strip()
# Generate DSL for this phase
dsl_parts.append(f" {type_part} {name_part} {{")
dsl_parts.append(f" DecisionType as {decision_type}")
if participants_part:
participants_list = [p.strip() for p in participants_part.split(',')]
dsl_parts.append(f" Participant list: {', '.join(participants_list)}")
# Optional communication channel for phase
if channel:
dsl_parts.append(f" CommunicationChannel : {channel}")
# Add conditions if present
if phase_conditions:
dsl_parts.append(" Conditions:")
for condition in phase_conditions:
# Convert from display format to DSL format
if condition.startswith('VetoRight:'):
dsl_parts.append(f" {condition.replace(':', ' :')}")
elif condition.startswith('ParticipantExclusion:'):
dsl_parts.append(f" {condition.replace(':', ' :')}")
elif condition.startswith('MinParticipants:'):
dsl_parts.append(f" {condition.replace(':', ' :')}")
elif condition.startswith('Deadline:'):
condition_content = condition.split(':', 1)[1].strip()
dsl_parts.append(f" Deadline : {condition_content}")
elif condition.startswith('MinDecisionTime:'):
condition_content = condition.split(':', 1)[1].strip()
dsl_parts.append(f" MinDecisionTime : {condition_content}")
elif condition.startswith('LabelCondition'):
# Parse LabelCondition format: "LabelCondition pre not : label1, label2"
condition_content = condition.split(':', 1)[1].strip() if ':' in condition else ""
condition_prefix = condition.split(':')[0].strip()
if condition_content:
dsl_parts.append(f" {condition_prefix} : {condition_content}")
else:
dsl_parts.append(f" {condition_prefix}")
# Add parameters section if any parameters exist
parameters = []
if ratio:
# Skip ratio if it's the default value of 0.5
if ratio != 0.5:
formatted_ratio = self._format_float(ratio)
parameters.append(f" ratio: {formatted_ratio}")
if default_decision:
parameters.append(f" default: {default_decision}")
if fallback_policy:
parameters.append(f" fallback: {fallback_policy}")
if options:
parameters.append(f" options: {options}")
if parameters:
dsl_parts.append(" Parameters:")
dsl_parts.extend(parameters)
dsl_parts.append(" }")
dsl_parts.append(" }")
dsl_parts.append("}")
return '\n'.join(dsl_parts) if dsl_parts else "# Fill the form to generate DSL code"
def _parse_projects(self, projects_text):
"""Parse projects from multi-line text format"""
projects = []
if not projects_text.strip():
return projects
for line in projects_text.strip().split('\n'):
line = line.strip()
if not line:
continue
parts = [p.strip() for p in line.split('|')]
project = {
'name': parts[0] if parts else '',
'platform': parts[1] if len(parts) > 1 and parts[1] else None,
'repo': parts[2] if len(parts) > 2 and parts[2] else None
}
if project['name']:
projects.append(project)
return projects
def _parse_activities(self, activities_text):
"""Parse activities from multi-line text format"""
activities = []
if not activities_text.strip():
return activities
for line in activities_text.strip().split('\n'):
line = line.strip()
if not line:
continue
parts = [p.strip() for p in line.split('|')]
activity = {
'name': parts[0] if parts else '',
'parent': parts[1] if len(parts) > 1 and parts[1] else None
}
if activity['name']:
activities.append(activity)
return activities
def _parse_tasks(self, tasks_text):
"""Parse tasks from multi-line text format: TaskName | Parent | Type | Action"""
tasks = []
if not tasks_text.strip():
return tasks
for line in tasks_text.strip().split('\n'):
line = line.strip()
if not line:
continue
parts = [p.strip() for p in line.split('|')]
task = {
'name': parts[0] if parts else '',
'parent': parts[1] if len(parts) > 1 and parts[1] else None,
'type': parts[2] if len(parts) > 2 and parts[2] else None,
'action': parts[3] if len(parts) > 3 and parts[3] else None
}
if task['name']:
tasks.append(task)
return tasks
def _format_individuals_display(self, individuals):
"""Format individuals for display in the text area"""
if not individuals:
return "No individuals added yet"
display_lines = [f"👥 {len(individuals)} individual(s) defined:", ""]
for i, individual in enumerate(individuals, 1):
line = f"{i}. {individual['name']}"
attributes = []
# Always show vote value
vote_value = individual.get('vote_value')
if vote_value is None:
vote_value = 1.0 # Show default if not set
attributes.append(f"🗳️ vote: {vote_value}")
if individual.get('profile'):
attributes.append(f"📋 profile: {individual['profile']}")
if individual.get('role'):
attributes.append(f"👔 role: {individual['role']}")
if attributes:
line += f" → {', '.join(attributes)}"
display_lines.append(line)
return '\n'.join(display_lines)
def _format_agents_display(self, agents):
"""Format agents for display in the text area"""
if not agents:
return "No agents added yet"
display_lines = [f"🤖 {len(agents)} agent(s) defined:", ""]
for i, agent in enumerate(agents, 1):
line = f"{i}. {agent['name']}"
attributes = []
# Always show vote value
vote_value = agent.get('vote_value')
if vote_value is None:
vote_value = 1.0 # Show default if not set
attributes.append(f"🗳️ vote: {vote_value}")
if agent.get('confidence') is not None:
attributes.append(f"🔍 confidence: {agent['confidence']}")
if agent.get('autonomy_level') is not None:
attributes.append(f"⚡ autonomy: {agent['autonomy_level']}")
if agent.get('explainability') is not None:
attributes.append(f"💡 explainability: {agent['explainability']}")
if agent.get('role'):
attributes.append(f"👔 role: {agent['role']}")
if attributes:
line += f" → {', '.join(attributes)}"
display_lines.append(line)
return '\n'.join(display_lines)
def _parse_agents(self, agents_text):
"""Parse agents from multi-line text format: Name | confidence | autonomy_level | explainability | role"""
agents = []
if not agents_text.strip():
return agents
for line in agents_text.strip().split('\n'):
line = line.strip()
if not line:
continue
parts = [p.strip() for p in line.split('|')]
agent = {'name': parts[0]}
if len(parts) > 1 and parts[1]:
try:
agent['confidence'] = float(parts[1])
except ValueError:
pass # Skip invalid confidence values
if len(parts) > 2 and parts[2]:
try:
agent['autonomy_level'] = float(parts[2])
except ValueError:
pass # Skip invalid autonomy level values
if len(parts) > 3 and parts[3]:
try:
agent['explainability'] = float(parts[3])
except ValueError:
pass # Skip invalid explainability values
if len(parts) > 4 and parts[4]:
agent['role'] = parts[4]
agents.append(agent)
return agents
def _format_profiles_display(self, profiles):
"""Format profiles for display in the text area"""
if not profiles:
return "No profiles added yet"
display_lines = [f"📊 {len(profiles)} profile(s) defined:", ""]
for i, profile in enumerate(profiles, 1):
line = f"{i}. {profile['name']}"
attributes = []
if profile.get('gender'):
attributes.append(f"👤 gender: {profile['gender']}")
if profile.get('race'):
attributes.append(f"🌍 race: {profile['race']}")
if profile.get('language'):
attributes.append(f"🌐 language: {profile['language']}")
if attributes:
line += f" → {', '.join(attributes)}"
display_lines.append(line)
return '\n'.join(display_lines)
def _format_roles_display(self, roles):
"""Format roles for display in the text area"""
if not roles:
return "No roles added yet. Create roles to define project responsibilities."
display_lines = [f"👥 {len(roles)} role(s) defined:", ""]
for i, role in enumerate(roles, 1):
vote_value = role.get('vote_value')
if vote_value is None:
vote_value = 1.0 # Show default if not set
line = f"{i}. 🔹 **{role['name']}** (vote value: {vote_value})"
display_lines.append(line)
return '\n'.join(display_lines)
def _format_projects_display(self, projects):
"""Format projects for display in the text area"""
if not projects:
return "No projects added yet. Create projects to define your organizational structure."
display_lines = [f"🏗️ {len(projects)} project(s) defined:", ""]
for i, project in enumerate(projects, 1):
line = f"{i}. {project['name']}"
details = []
if project.get('platform'):
details.append(f"📦 platform: {project['platform']}")
if project.get('repo'):
details.append(f"📂 repo: {project['repo']}")
if details:
line += f" → {', '.join(details)}"
display_lines.append(line)
return '\n'.join(display_lines)
def _format_activities_display(self, activities):
"""Format activities for display in the text area"""
if not activities:
return "No activities added yet. Create activities to organize your project workflow."
display_lines = [f"⚡ {len(activities)} activit(ies) defined:", ""]
for i, activity in enumerate(activities, 1):
line = f"{i}. {activity['name']}"
if activity.get('parent'):
line += f" → 📋 parent: {activity['parent']}"
display_lines.append(line)
return '\n'.join(display_lines)
def _format_tasks_display(self, tasks):
"""Format tasks for display in the text area"""
if not tasks:
return "No tasks added yet. Create tasks to define specific governance actions."
display_lines = [f"✅ {len(tasks)} task(s) defined:", ""]
for i, task in enumerate(tasks, 1):
line = f"{i}. {task['name']}"
details = []
if task.get('parent'):
details.append(f"📋 parent: {task['parent']}")
if task.get('type'):
details.append(f"🏷️ type: {task['type']}")
if task.get('action'):
details.append(f"⚙️ action: {task['action']}")
if details:
line += f" → {', '.join(details)}"
display_lines.append(line)
return '\n'.join(display_lines)
def _extract_scope_names(self, projects_data, activities_data, tasks_data):
"""Extract all scope names (projects, activities, tasks) for dropdown choices"""
scope_names = []
# Extract project names
for project in projects_data if projects_data else []:
if project.get('name'):
scope_names.append(project['name'])
# Extract activity names
for activity in activities_data if activities_data else []:
if activity.get('name'):
scope_names.append(activity['name'])
# Extract task names
for task in tasks_data if tasks_data else []:
if task.get('name'):
scope_names.append(task['name'])
return sorted(list(set(scope_names))) # Remove duplicates and sort
def _extract_participant_names(self, roles_data, individuals_data, agents_data):
"""Extract all participant names (roles, individuals, agents) for dropdown choices"""
participant_names = []
# Extract role names
if roles_data:
participant_names.extend([role['name'] for role in roles_data])
# Extract individual names
if individuals_data:
for individual in individuals_data:
if individual.get('name'):
participant_names.append(individual['name'])
# Extract agent names
if agents_data:
for agent in agents_data:
if agent.get('name'):
participant_names.append(f"(Agent) {agent['name']}")
return sorted(list(set(participant_names))) # Remove duplicates and sort
def _format_policies_display(self, policies):
"""Format policies for display in the text area"""
if not policies:
return "No policies added yet"
display_lines = [f"📋 {len(policies)} policy(ies) defined:", ""]
for i, policy in enumerate(policies, 1):
line = f"{i}. {policy['name']} ({policy['type']})"
details = []
if policy.get('scope'):
details.append(f"🎯 scope: {policy['scope']}")
if policy.get('participants'):
participant_list = policy['participants'] if isinstance(policy['participants'], list) else [policy['participants']]
details.append(f"👥 participants: {', '.join(participant_list)}")
if policy.get('decision_type'):
details.append(f"🔹 decision: {policy['decision_type']}")
# Add type-specific parameters
if policy['type'] in ["MajorityPolicy", "AbsoluteMajorityPolicy", "VotingPolicy"] and policy.get('voting_ratio'):
# Skip displaying ratio if it's the default value of 0.5
if policy['voting_ratio'] != 0.5:
details.append(f"📊 ratio: {policy['voting_ratio']}")
if policy['type'] == "LeaderDrivenPolicy" and policy.get('default_decision'):
details.append(f"⚡ default: {policy['default_decision']}")
if policy['type'] == "ConsensusPolicy" and policy.get('fallback_policy'):
details.append(f"🔄 fallback: {policy['fallback_policy']}")
if details:
line += f" → {', '.join(details)}"
display_lines.append(line)
return '\n'.join(display_lines)
def _format_composed_policies_display(self, composed_policies):
"""Format composed policies for display in the text area"""
if not composed_policies:
return "No composed policies added yet"
display_lines = [f"🔄 {len(composed_policies)} composed policy(ies) defined:", ""]
for i, policy in enumerate(composed_policies, 1):
line = f"{i}. {policy['name']} (ComposedPolicy)"
details = []
if policy.get('scope'):
details.append(f"🎯 scope: {policy['scope']}")
if policy.get('execution_type'):
details.append(f"⚙️ execution: {policy['execution_type']}")
if policy.get('require_all') is not None:
details.append(f"✅ require all: {'yes' if policy['require_all'] else 'no'}")
if policy.get('carry_over') is not None:
details.append(f"📄 carry over: {'yes' if policy['carry_over'] else 'no'}")
if policy.get('phases'):
phases_count = len(policy['phases'])
phases_list = ', '.join(policy['phases'][:3]) # Show first 3 phases
if phases_count > 3:
phases_list += f" (+{phases_count - 3} more)"
details.append(f"📋 phases ({phases_count}): {phases_list}")
if details:
line += f" → {', '.join(details)}"
display_lines.append(line)
return '\n'.join(display_lines)
def main():
"""Main function to run the Gradio interface"""
builder = GovernanceFormBuilder()
app = builder.create_interface()
return app
if __name__ == "__main__":
app = main()
app.launch()