An Implementation of the Microsoft Agent Governance Toolkit for Safe AI Agent Tool Use with Policies, Approvals, Audit Logs, and Risk Controls

An Implementation of the Microsoft Agent Governance Toolkit for Safe AI Agent Tool Use with Policies, Approvals, Audit Logs, and Risk Controls



# End-to-end demo scenarios. Each entry carries a human-readable name, the
# tool callable to invoke ("tool"), and the keyword arguments passed to that
# callable ("kwargs"). The "type" and "sensitivity" kwargs are forwarded
# verbatim to the tool; "type" shadows the builtin only as a keyword name
# dictated by the tools' existing interface.
scenarios = [
    {
        "name": "Safe database read",
        "tool": research_db,
        "kwargs": {
            "table": "customers",
            "operation": "select",
            "type": "select",
            "sensitivity": "medium",
        },
    },
    {
        "name": "Blocked destructive database action",
        "tool": research_db,
        "kwargs": {
            "table": "customers",
            "operation": "drop",
            "type": "drop_table",
            "sensitivity": "critical",
        },
    },
    {
        "name": "External email requiring approval",
        "tool": research_email,
        "kwargs": {
            # NOTE(review): the source text carried the scraper artifact
            # "[email protected]" here; a placeholder on the declared
            # recipient_domain is used instead -- confirm the intended address.
            "to": "recipient@example.com",
            "recipient_domain": "example.com",
            "subject": "Quarterly update",
            "body": "Sharing a non-confidential quarterly update.",
            "type": "send_email",
            "sensitivity": "medium",
        },
    },
    {
        "name": "External email denied due to approval rejection",
        "tool": research_email,
        "kwargs": {
            # NOTE(review): same scraper artifact as above -- confirm address.
            "to": "recipient@example.com",
            "recipient_domain": "example.com",
            "subject": "Confidential strategy",
            "body": "This contains confidential strategy.",
            "type": "send_email",
            "sensitivity": "critical",
        },
    },
    {
        "name": "Safe sandbox shell command",
        "tool": ops_shell,
        "kwargs": {
            "command": "echo Agent governance is active",
            "type": "shell_exec",
            "sensitivity": "low",
        },
    },
    {
        "name": "Dangerous shell command blocked",
        "tool": ops_shell,
        "kwargs": {
            "command": "rm -rf /content/something",
            "type": "shell_exec",
            "sensitivity": "critical",
        },
    },
    {
        "name": "Low-trust agent blocked from sensitive data",
        "tool": shadow_db,
        "kwargs": {
            "table": "executive_compensation",
            "operation": "select",
            "type": "select",
            "sensitivity": "critical",
        },
    },
    {
        "name": "Financial transfer requiring approval",
        "tool": finance_transfer,
        "kwargs": {
            "amount": 2500,
            "destination": "vendor-123",
            "type": "transfer_money",
            "sensitivity": "high",
        },
    },
    {
        "name": "Large financial transfer rejected",
        "tool": finance_transfer,
        "kwargs": {
            "amount": 15000,
            "destination": "vendor-999",
            "type": "transfer_money",
            "sensitivity": "critical",
        },
    },
]
# Invoke every scenario's tool with its kwargs and record the outcome.
# A call that returns is stored as "executed" together with its output; a
# call that raises is stored as "blocked_or_pending" with the error text.
# (Presumably the governed tools raise when a call is denied or is waiting
# for approval -- verify against the tool wrappers.)
results = []
for scenario in scenarios:
    try:
        # Keep the try body to the one call that is expected to raise.
        output = scenario["tool"](**scenario["kwargs"])
    except Exception as exc:  # demo boundary: any failure is recorded, not raised
        results.append({
            "scenario": scenario["name"],
            "status": "blocked_or_pending",
            "error": str(exc),
        })
    else:
        results.append({
            "scenario": scenario["name"],
            "status": "executed",
            "output": output,
        })
# Snapshot the audit log as a DataFrame and show only the columns that
# matter for reviewing governance decisions.
audit_df = audit_log.to_dataframe()
display_cols = [
    "timestamp",
    "agent_name",
    "tool_name",
    "decision",
    "matched_rule",
    "severity",
    "reason",
    "record_hash",
]
display(audit_df[display_cols])
# Policy regression cases. Each case supplies the identity, tool name and
# action dict handed to engine.evaluate(), plus the decision string the
# policy is expected to return ("expected").
test_cases = [
    {
        "name": "drop_table must be denied",
        "identity": research_agent,
        "tool_name": "query_database",
        "action": {"type": "drop_table", "sensitivity": "critical", "autonomous": True},
        "expected": "deny",
    },
    {
        "name": "safe select should be allowed",
        "identity": research_agent,
        "tool_name": "query_database",
        "action": {"type": "select", "sensitivity": "low", "autonomous": True},
        "expected": "allow",
    },
    {
        "name": "external email should require approval",
        "identity": research_agent,
        "tool_name": "send_email",
        "action": {
            "type": "send_email",
            "recipient_domain": "example.com",
            "sensitivity": "medium",
            "autonomous": True,
        },
        "expected": "require_approval",
    },
    {
        "name": "low trust sensitive access denied",
        "identity": unknown_agent,
        "tool_name": "query_database",
        "action": {"type": "select", "sensitivity": "critical", "autonomous": True},
        "expected": "deny",
    },
    {
        "name": "shell command should enter sandbox",
        "identity": ops_agent,
        "tool_name": "shell_exec",
        "action": {
            "type": "shell_exec",
            "command": "echo hello",
            "sensitivity": "low",
            "autonomous": True,
        },
        "expected": "sandbox",
    },
]
# Evaluate each policy test case directly against the engine (no tool is
# executed) and tabulate expected vs. actual decisions.
test_results = []
for test in test_cases:
    decision = engine.evaluate(
        identity=test["identity"],
        tool_name=test["tool_name"],
        action=test["action"],
    )
    # A case passes when the engine's decision string equals the expectation.
    passed = decision.decision == test["expected"]
    test_results.append({
        "test": test["name"],
        "expected": test["expected"],
        "actual": decision.decision,
        "passed": passed,
        "matched_rule": decision.matched_rule,
    })
test_df = pd.DataFrame(test_results)
display(test_df)
# Kill-switch demonstration: engage the switch, attempt an otherwise
# low-sensitivity read, then disengage it again.
engine.activate_kill_switch()
try:
    research_db(
        table="customers",
        operation="select",
        type="select",
        sensitivity="low",
    )
except Exception as exc:  # demo boundary: report the refusal instead of hiding it
    # Presumably the governed call raises while the kill switch is active;
    # surface the message so the outcome is visible in the notebook.
    print(f"Call raised while kill switch was active: {exc}")
finally:
    # Always disengage, so later cells are never left running with the
    # kill switch still on.
    engine.deactivate_kill_switch()
# Refresh the audit snapshot so it includes every call made since the last
# export, then build two count tables from it.
audit_df = audit_log.to_dataframe()

# Event counts per (decision, severity); dropna=False keeps rows whose
# decision or severity is missing as their own group.
summary = (
    audit_df
    .groupby(["decision", "severity"], dropna=False)
    .size()
    .reset_index(name="count")
    .sort_values("count", ascending=False)
)
display(summary)

# Event counts per (agent, decision), ordered by agent name and then by
# descending count within each agent.
agent_summary = (
    audit_df
    .groupby(["agent_name", "decision"])
    .size()
    .reset_index(name="count")
    .sort_values(["agent_name", "count"], ascending=[True, False])
)
display(agent_summary)
# Bar charts of audit events, first by decision and then by severity.
decision_counts = audit_df["decision"].value_counts()
# Missing severities are bucketed under the label "none" so they are counted.
severity_counts = audit_df["severity"].fillna("none").value_counts()

# Both charts share the same layout; only the data, title and x label differ.
for counts, title, xlabel in (
    (decision_counts, "Governance Decisions Across Agent Actions", "Decision"),
    (severity_counts, "Governance Events by Severity", "Severity"),
):
    plt.figure(figsize=(8, 5))
    counts.plot(kind="bar")
    plt.title(title)
    plt.xlabel(xlabel)
    plt.ylabel("Count")
    plt.xticks(rotation=30)
    plt.tight_layout()
    plt.show()
# Build a directed graph from the audit log. Every audit row contributes the
# chain  agent -> tool -> decision -> rule; nodes are keyed by their label,
# so repeated agents/tools/decisions/rules collapse into a single node.
G = nx.DiGraph()
for _, row in audit_df.iterrows():
    agent_node = f"Agent: {row['agent_name']}"
    tool_node = f"Tool: {row['tool_name']}"
    decision_node = f"Decision: {row['decision']}"
    # Rows without a matched rule are attached to a shared "default" rule node.
    if pd.notna(row["matched_rule"]):
        rule_node = f"Rule: {row['matched_rule']}"
    else:
        rule_node = "Rule: default"

    G.add_node(agent_node, node_type="agent")
    G.add_node(tool_node, node_type="tool")
    G.add_node(decision_node, node_type="decision")
    G.add_node(rule_node, node_type="rule")

    G.add_edge(agent_node, tool_node, relation="calls")
    G.add_edge(tool_node, decision_node, relation="produces")
    G.add_edge(decision_node, rule_node, relation="matched")

plt.figure(figsize=(14, 9))
# Fixed seed keeps the layout reproducible between runs.
pos = nx.spring_layout(G, seed=42, k=0.8)
nx.draw_networkx_nodes(G, pos, node_size=1800)
nx.draw_networkx_edges(G, pos, arrows=True, arrowstyle="->", arrowsize=15)
nx.draw_networkx_labels(G, pos, font_size=8)
plt.title("Agent Governance Graph: Agents, Tools, Decisions, and Policy Rules")
plt.axis("off")
plt.tight_layout()
plt.show()
# Export the tutorial artifacts: the raw audit records (JSON), the audit
# table (CSV), the policy test results (CSV) and a copy of the policy file.
EXPORT_DIR = "/content/agt_tutorial_outputs"
os.makedirs(EXPORT_DIR, exist_ok=True)

audit_json_path = os.path.join(EXPORT_DIR, "tamper_evident_audit_log.json")
audit_csv_path = os.path.join(EXPORT_DIR, "governance_audit_log.csv")
policy_copy_path = os.path.join(EXPORT_DIR, "advanced_agent_policy.yaml")
test_results_path = os.path.join(EXPORT_DIR, "policy_test_results.csv")

# Explicit encoding so the output does not depend on the platform default.
with open(audit_json_path, "w", encoding="utf-8") as f:
    # default=str stringifies any field json cannot serialize natively.
    json.dump([asdict(r) for r in audit_log.records], f, indent=2, default=str)

audit_df.to_csv(audit_csv_path, index=False)
test_df.to_csv(test_results_path, index=False)
shutil.copy(POLICY_PATH, policy_copy_path)



Content Curated Originally From Here