第十四章 · DeerFlow 企业级应用案例
本章目标:
- 掌握企业级 DeerFlow 的架构设计与多租户隔离
- 理解 RBAC、审计日志与企业知识库集成方案
- 了解多模态内容生成平台与 Kubernetes 部署实践
本章目标:
- 掌握企业级 DeerFlow 的架构设计与多租户隔离
- 理解 RBAC、审计日志与企业知识库集成方案
- 了解多模态内容生成平台与 Kubernetes 部署实践
14.1 案例背景:企业级应用需求分析
基于企业级应用场景的需求分析,本章聚焦 DeerFlow 二次开发的具体实现路径。
核心需求映射:
| 企业级模块 | 二次开发重点 |
|---|---|
| Agent Teams | Sub-Agent 体系扩展 + 自定义 Agent 类型 |
| Memory | 企业知识库集成 + 项目级记忆 |
| Sandbox | 企业级沙箱(审计、配额) |
| Human-in-loop | 审批中间件 + 审计日志 |
| Context Engineering | 企业知识注入 + 合规过滤 |
| 多租户 | 租户隔离 + RBAC |
14.2 整体架构设计
┌──────────────────────────────────────────────────────────────────────┐
│ Enterprise Architecture │
├──────────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────────────────────────────────────────────────────────┐ │
│ │ Frontend (Next.js) │ │
│ │ Agent Chat │ Project Dashboard │ Admin │ │
│ └─────────────────────────────────┬────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌──────────────────────────────────────────────────────────────┐ │
│ │ Nginx (Port 2026) │ │
│ │ 统一入口 + 认证 + 限流 │ │
│ └──────────┬─────────────────────┬──────────────────────────────┘ │
│ │ │ │
│ ▼ ▼ │
│ ┌─────────────────┐ ┌─────────────────────────────────────┐ │
│ │ Gateway API │ │ LangGraph Server │ │
│ │ (8001) │ │ (2024) │ │
│ │ │ │ │ │
│ │ - Auth │ │ ┌─────────────────────────────────┐ │ │
│ │ - RBAC │ │ │ Enterprise Agent │ │ │
│ │ - Tenant Mgmt │ │ │ │ │ │
│ │ - Audit │ │ │ - Middleware Chain (定制) │ │ │
│ └─────────────────┘ │ │ - Agent Teams │ │ │
│ │ │ - Skills (企业) │ │ │
│ │ └─────────────────────────────────┘ │ │
│ └────────────────────┬────────────────────┘ │
│ │ │
│ ┌───────────────────────────────────┼───────────────────┐ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌─────────────┐ ┌──────────────┐ ┌──────────┐ │
│ │ Corporate │ │ Sandbox │ │ Memory │ │
│ │ Knowledge │ │ (K8s) │ │ System │ │
│ │ Base │ │ │ │ │ │
│ └─────────────┘ └──────────────┘ └──────────┘ │
│ │
└───────────────────────────────────────────────────────────────────────┘14.3 多租户隔离实现
💡 最佳实践:多租户的
isolation_mode默认strict最安全,但在内部工具场景下,relaxed模式配合审计日志可以在安全与效率间取得更好平衡。
14.3.1 租户上下文
python
# enterprise/tenant/context.py
class TenantContext:
"""
租户上下文 - ThreadLocal 存储
"""
_local = threading.local()
@classmethod
def set(cls, tenant_id: str, user_id: str, role: str):
cls._local.tenant_id = tenant_id
cls._local.user_id = user_id
cls._local.role = role
@classmethod
def get(cls) -> "TenantInfo":
return TenantInfo(
tenant_id=getattr(cls._local, 'tenant_id', None),
user_id=getattr(cls._local, 'user_id', None),
role=getattr(cls._local, 'role', None)
)
@classmethod
def clear(cls):
cls._local.tenant_id = None
cls._local.user_id = None
cls._local.role = None
```python
### 14.3.2 租户中间件
```python
# enterprise/tenant/middleware.py
class TenantMiddleware:
"""
租户隔离中间件
"""
async def process(self, state: ThreadState) -> MiddlewareResult:
# 1. 从请求中获取租户信息
tenant_info = TenantContext.get()
if not tenant_info.tenant_id:
raise UnauthorizedError("Tenant ID required")
# 2. 注入到 ThreadState
state["tenant_id"] = tenant_info.tenant_id
state["user_id"] = tenant_info.user_id
state["user_role"] = tenant_info.role
# 3. 设置资源配额
quota = await self._get_quota(tenant_info)
state["resource_quota"] = quota
return MiddlewareResult(should_continue=True)
async def _get_quota(self, tenant: TenantInfo) -> ResourceQuota:
"""获取租户资源配额"""
plan = await self.tenant_service.get_plan(tenant.tenant_id)
return ResourceQuota(
max_concurrent_agents=plan.agent_limit,
max_sandbox_cpu=plan.cpu_limit,
max_sandbox_memory=plan.memory_limit,
max_storage_gb=plan.storage_limit,
max_api_calls_per_day=plan.api_rate_limit
)14.3.3 数据隔离策略
python
class TenantDataIsolation:
"""
租户数据隔离策略
"""
# 每个租户独立的数据存储前缀
@staticmethod
def get_table_name(table: str, tenant_id: str) -> str:
"""逻辑隔离:表名加租户前缀"""
return f"tenant_{tenant_id}_{table}"
@staticmethod
def get_file_path(base: str, tenant_id: str, *parts) -> str:
"""文件系统隔离:租户独立目录"""
return os.path.join(base, f"tenants/{tenant_id}", *parts)
@staticmethod
def get_vector_namespace(tenant_id: str) -> str:
"""向量存储隔离:租户独立 namespace"""
return f"tenant_{tenant_id}"
```python
## 14.4 RBAC 权限控制
### 14.4.1 权限模型
```python
# enterprise/auth/rbac.py
class Role(str, Enum):
TENANT_ADMIN = "tenant_admin" # 租户管理员
PROJECT_MANAGER = "project_manager" # 项目经理
MEMBER = "member" # 普通成员
VIEWER = "viewer" # 只读成员
EXTERNAL = "external" # 外部访客
class Permission(str, Enum):
# Agent 操作
AGENT_CREATE = "agent:create"
AGENT_DELETE = "agent:delete"
AGENT_VIEW = "agent:view"
# 项目操作
PROJECT_CREATE = "project:create"
PROJECT_MANAGE = "project:manage"
PROJECT_DELETE = "project:delete"
# 审批操作
APPROVAL_CREATE = "approval:create"
APPROVAL_GRANT = "approval:grant"
APPROVAL_VIEW_SENSITIVE = "approval:view_sensitive"
# 数据操作
DATA_READ = "data:read"
DATA_WRITE = "data:write"
DATA_DELETE = "data:delete"
DATA_EXPORT = "data:export"
# 管理操作
USER_MANAGE = "user:manage"
TENANT_SETTINGS = "tenant:settings"
# 角色-权限映射
ROLE_PERMISSIONS = {
Role.TENANT_ADMIN: [
Permission.AGENT_CREATE, Permission.AGENT_DELETE, Permission.AGENT_VIEW,
Permission.PROJECT_CREATE, Permission.PROJECT_MANAGE, Permission.PROJECT_DELETE,
Permission.APPROVAL_CREATE, Permission.APPROVAL_GRANT, Permission.APPROVAL_VIEW_SENSITIVE,
Permission.DATA_READ, Permission.DATA_WRITE, Permission.DATA_DELETE, Permission.DATA_EXPORT,
Permission.USER_MANAGE, Permission.TENANT_SETTINGS,
],
Role.PROJECT_MANAGER: [
Permission.AGENT_CREATE, Permission.AGENT_VIEW,
Permission.PROJECT_CREATE, Permission.PROJECT_MANAGE,
Permission.APPROVAL_CREATE, Permission.APPROVAL_GRANT,
Permission.DATA_READ, Permission.DATA_WRITE,
],
Role.MEMBER: [
Permission.AGENT_VIEW,
Permission.PROJECT_MANAGE,
Permission.DATA_READ, Permission.DATA_WRITE,
],
Role.VIEWER: [
Permission.AGENT_VIEW,
Permission.PROJECT_MANAGE,
Permission.DATA_READ,
],
Role.EXTERNAL: [
Permission.AGENT_VIEW,
Permission.DATA_READ,
],
}14.4.2 权限检查装饰器
python
# enterprise/auth/decorators.py
def require_permission(*permissions: Permission):
"""
权限检查装饰器
"""
def decorator(func):
@wraps(func)
async def wrapper(*args, **kwargs):
user = TenantContext.get()
if not user:
raise UnauthorizedError()
user_permissions = ROLE_PERMISSIONS.get(user.role, [])
# 检查是否有所需权限
for permission in permissions:
if permission not in user_permissions:
raise ForbiddenError(
f"Permission denied: {permission}"
)
return await func(*args, **kwargs)
return wrapper
return decorator
# 使用示例
@router.post("/projects/{project_id}/approve")
@require_permission(Permission.APPROVAL_GRANT)
async def approve_action(project_id: str, action_type: str, approver: str):
...
```python
## 14.5 审计日志系统
### 14.5.1 审计事件定义
```python
# enterprise/audit/events.py
class AuditEventType(str, Enum):
# 认证事件
AUTH_LOGIN = "auth:login"
AUTH_LOGOUT = "auth:logout"
AUTH_FAILED = "auth:failed"
# Agent 事件
AGENT_CREATED = "agent:created"
AGENT_TASK_STARTED = "agent:task_started"
AGENT_TASK_COMPLETED = "agent:task_completed"
AGENT_ERROR = "agent:error"
# 数据事件
DATA_READ = "data:read"
DATA_WRITTEN = "data:written"
DATA_DELETED = "data:deleted"
DATA_EXPORTED = "data:exported"
# 审批事件
APPROVAL_REQUESTED = "approval:requested"
APPROVAL_GRANTED = "approval:granted"
APPROVAL_REJECTED = "approval:rejected"
# Sandbox 事件
SANDBOX_ACQUIRED = "sandbox:acquired"
SANDBOX_RELEASED = "sandbox:released"
SANDBOX_COMMAND_EXECUTED = "sandbox:command_executed"
# 合规事件
SENSITIVE_DATA_ACCESSED = "compliance:sensitive_data_accessed"
POLICY_VIOLATION = "compliance:policy_violation"
@dataclass
class AuditEvent:
"""审计事件"""
event_type: AuditEventType
tenant_id: str
user_id: str
timestamp: datetime
# 上下文
project_id: Optional[str] = None
agent_id: Optional[str] = None
# 详情
resource_type: Optional[str] = None
resource_id: Optional[str] = None
action: Optional[str] = None
result: Optional[str] = None
# 元数据
ip_address: Optional[str] = None
user_agent: Optional[str] = None
request_id: Optional[str] = None
# 防篡改
signature: Optional[str] = None14.5.2 审计日志存储
python
# enterprise/audit/storage.py
class AuditLogStorage:
"""
审计日志存储 - 只追加,防篡改
"""
def __init__(self, storage_backend: StorageBackend):
self.storage = storage_backend
async def append(self, event: AuditEvent):
"""追加审计事件"""
# 生成签名
event.signature = self._sign(event)
# 序列化为 JSONL
line = json.dumps(asdict(event), default=str)
# 写入追加存储
await self.storage.append(f"audit/{event.tenant_id}", line)
def _sign(self, event: AuditEvent) -> str:
"""防篡改签名"""
content = json.dumps(
asdict(event),
sort_keys=True,
exclude={"signature"}
)
return hmac.new(
self.secret_key,
content.encode(),
hashlib.sha256
).hexdigest()
async def verify(self, tenant_id: str) -> bool:
"""验证审计日志完整性"""
async for line in self.storage.read(f"audit/{tenant_id}"):
event = json.loads(line)
if not self._verify_signature(event):
return False
return True
def _verify_signature(self, event: dict) -> bool:
"""验证单个事件签名"""
stored_sig = event.pop("signature", None)
if not stored_sig:
return False
computed_sig = self._sign_from_dict(event)
return hmac.compare_digest(stored_sig, computed_sig)
```python
### 14.5.3 审计中间件
```python
# enterprise/audit/middleware.py
class AuditMiddleware:
"""
审计中间件 - 自动记录关键操作
"""
async def process(
self,
state: ThreadState,
event_type: AuditEventType
) -> MiddlewareResult:
"""记录审计事件"""
tenant = TenantContext.get()
event = AuditEvent(
event_type=event_type,
tenant_id=tenant.tenant_id,
user_id=tenant.user_id,
timestamp=datetime.now(),
project_id=state.get("project_id"),
agent_id=state.get("agent_id"),
ip_address=get_client_ip(),
user_agent=get_user_agent(),
request_id=get_request_id()
)
# 异步写入,不阻塞主流程
asyncio.create_task(self.audit_storage.append(event))
return MiddlewareResult(should_continue=True)14.6 企业知识库集成
14.6.1 知识库客户端
python
# enterprise/knowledge/base.py
class CorporateKnowledgeBase(ABC):
"""
企业知识库抽象接口
"""
@abstractmethod
async def search(
self,
query: str,
filters: KnowledgeFilters,
top_k: int
) -> List[KnowledgeEntry]:
pass
@abstractmethod
async def get_document(self, doc_id: str) -> Document:
pass
@abstractmethod
async def list_categories(self) -> List[str]:
pass
class KnowledgeEntry(BaseModel):
id: str
title: str
content: str
source: str
category: str
department: Optional[str]
sensitivity: SensitivityLevel
created_at: datetime
updated_at: datetime
metadata: Dict[str, Any]
class KnowledgeFilters(BaseModel):
categories: Optional[List[str]] = None
departments: Optional[List[str]] = None
sensitivity: Optional[List[SensitivityLevel]] = None
date_from: Optional[datetime] = None
date_to: Optional[datetime] = None
project_id: Optional[str] = None
```python
### 14.6.2 知识检索中间件
```python
# enterprise/knowledge/middleware.py
class KnowledgeRetrievalMiddleware:
"""
企业知识检索中间件
"""
def __init__(
self,
knowledge_base: CorporateKnowledgeBase,
retriever: HybridRetriever
):
self.kb = knowledge_base
self.retriever = retriever
async def process(self, state: ThreadState) -> MiddlewareResult:
tenant = TenantContext.get()
# 1. 提取查询
query = self._extract_query(state)
if not query:
return MiddlewareResult(should_continue=True)
# 2. 构建过滤器(基于租户权限)
filters = KnowledgeFilters(
departments=[tenant.department] if tenant.department else None,
sensitivity=[SensitivityLevel.PUBLIC, SensitivityLevel.INTERNAL],
project_id=state.get("project_id")
)
# 3. 检索知识
results = await self.kb.search(query, filters, top_k=10)
# 4. 注入上下文
if results:
context = self._build_context(results)
state["knowledge_context"] = context
return MiddlewareResult(should_continue=True)
def _build_context(self, entries: List[KnowledgeEntry]) -> str:
parts = ["## 企业知识库参考\n"]
for entry in entries[:5]:
parts.append(f"""
### {entry.title}
**来源**: {entry.source}
**分类**: {entry.category}
{entry.content}
---
""")
return "\n".join(parts)14.7 Human-in-the-Loop 审批实现
14.7.1 审批规则引擎
python
# enterprise/approval/engine.py
class ApprovalRuleEngine:
"""
审批规则引擎
"""
def __init__(self, rules: List[ApprovalRule]):
self.rules = rules
async def check_requires_approval(
self,
action: AgentAction,
context: ActionContext
) -> ApprovalRequirement:
"""
检查动作是否需要审批
"""
for rule in self.rules:
if await rule.matches(action, context):
return ApprovalRequirement(
required=True,
rule=rule,
approvers=await rule.get_approvers(action, context),
urgency=rule.get_urgency(action, context)
)
return ApprovalRequirement(required=False)
async def execute_approval_flow(
self,
action: AgentAction,
requirement: ApprovalRequirement
) -> ApprovalResult:
"""
执行审批流程
"""
# 1. 创建审批请求
approval = await self.approval_store.create(
action=action,
approvers=requirement.approvers,
urgency=requirement.urgency
)
# 2. 通知审批人
await self.notifier.notify(approval)
# 3. 暂停 Agent,等待审批
return ApprovalResult(
status=ApprovalStatus.PENDING,
approval_id=approval.id
)
class ActionContext(BaseModel):
tenant_id: str
user_id: str
project_id: Optional[str]
role: Role
sandbox_id: Optional[str]
# 内置审批规则
class FinancialApprovalRule(ApprovalRule):
"""财务操作审批规则"""
async def matches(self, action: AgentAction, ctx: ActionContext) -> bool:
return (
action.type == "financial_operation" and
action.amount >= ctx.approval_threshold
)
async def get_approvers(self, action, ctx) -> List[User]:
if action.amount >= 100000:
return await self.user_service.get_directors()
else:
return await self.user_service.get_managers()
class SensitiveDataApprovalRule(ApprovalRule):
"""敏感数据访问审批规则"""
async def matches(self, action: AgentAction, ctx: ActionContext) -> bool:
return (
action.type == "data_access" and
action.data_sensitivity == SensitivityLevel.CONFIDENTIAL
)
```python
### 14.7.2 飞书审批卡片
```python
# enterprise/approval/feishu.py
class FeishuApprovalNotifier:
"""
飞书审批通知
"""
def __init__(self, feishu_client: FeishuClient):
self.client = feishu_client
async def notify(self, approval: Approval):
"""发送飞书审批卡片"""
for approver in approval.approvers:
card = self._build_approval_card(approval)
await self.client.send_interactive_card(
open_id=approver.feishu_open_id,
card=card
)
def _build_approval_card(self, approval: Approval) -> dict:
"""构建审批卡片"""
return {
"msg_type": "interactive",
"card": {
"header": {
"title": {
"tag": "plain_text",
"content": f"🔔 待审批: {approval.action.title}"
},
"template": "orange" if approval.urgency == "urgent" else "blue"
},
"elements": [
{
"tag": "div",
"text": {
"tag": "lark_md",
"content": self._format_action_details(approval)
}
},
{
"tag": "hr"
},
{
"tag": "note",
"elements": [
{
"tag": "plain_text",
"content": f"申请人: {approval.applicant.name}"
},
{
"tag": "plain_text",
"content": f"截止: {approval.deadline.strftime('%Y-%m-%d %H:%M')}"
}
]
},
{
"tag": "action",
"actions": [
{
"tag": "button",
"text": {"tag": "plain_text", "content": "✅ 批准"},
"type": "primary",
"value": {
"approval_id": approval.id,
"action": "approve"
}
},
{
"tag": "button",
"text": {"tag": "plain_text", "content": "❌ 拒绝"},
"type": "danger",
"value": {
"approval_id": approval.id,
"action": "reject"
}
}
]
}
]
}
}14.8 项目级长程任务管理
14.8.1 项目状态机
python
# enterprise/project/state_machine.py
class ProjectState(str, Enum):
PLANNING = "planning"
ACTIVE = "active"
PAUSED = "paused"
PENDING_APPROVAL = "pending_approval"
COMPLETED = "completed"
ARCHIVED = "archived"
class ProjectStateMachine:
"""
项目状态机
"""
def __init__(self):
self.transitions = {
ProjectState.PLANNING: [ProjectState.ACTIVE],
ProjectState.ACTIVE: [
ProjectState.PAUSED,
ProjectState.PENDING_APPROVAL,
ProjectState.COMPLETED
],
ProjectState.PAUSED: [ProjectState.ACTIVE],
ProjectState.PENDING_APPROVAL: [
ProjectState.ACTIVE,
ProjectState.PAUSED
],
ProjectState.COMPLETED: [ProjectState.ARCHIVED],
}
def can_transition(self, from_state: ProjectState, to_state: ProjectState) -> bool:
return to_state in self.transitions.get(from_state, [])
async def transition(
self,
project: Project,
to_state: ProjectState,
user: User,
reason: str = None
) -> Project:
"""执行状态转换"""
if not self.can_transition(project.state, to_state):
raise InvalidTransitionError(
f"Cannot transition from {project.state} to {to_state}"
)
# 记录转换
await self.audit_log.log(
event_type=AuditEventType.PROJECT_STATE_CHANGED,
resource_id=project.id,
action=f"{project.state} -> {to_state}",
reason=reason,
user_id=user.id
)
# 更新状态
project.state = to_state
project.updated_at = datetime.now()
project.state_history.append(StateTransition(
from_state=project.previous_state,
to_state=to_state,
changed_by=user.id,
reason=reason,
timestamp=datetime.now()
))
await self.project_store.save(project)
# 触发后续逻辑
await self._on_transition(project, to_state)
return project
```python
### 14.8.2 多 Agent 任务分解
```python
# enterprise/project/task_decomposition.py
class TaskDecomposer:
"""
任务分解器 - LLM 驱动
"""
def __init__(self, llm: ChatModel):
self.llm = llm
async def decompose(
self,
goal: str,
project: Project,
available_agents: List[AgentType]
) -> List[SubTask]:
"""
将项目目标分解为可执行的子任务
"""
prompt = f"""
项目: {project.name}
目标: {goal}
可用的 Agent 类型:
{', '.join([a.name for a in available_agents])}
请将目标分解为具体的子任务,每个子任务应该:
1. 有明确的交付物
2. 可由单个 Agent 完成
3. 有清晰的验收标准
返回 JSON 格式:
{{
"tasks": [
{{
"title": "任务标题",
"description": "任务描述",
"agent_type": "需要的 Agent 类型",
"dependencies": ["前置任务ID"],
"estimated_duration": "预计时长",
"checkpoints": ["检查点1", "检查点2"]
}}
]
}}
"""
response = await self.llm.ainvoke([
HumanMessage(content=prompt)
])
# 解析响应
result = json.loads(response.content)
tasks = [SubTask(**t) for t in result["tasks"]]
# 设置依赖关系
for i, task in enumerate(tasks):
task.index = i
task.project_id = project.id
return tasks14.9 多模态内容生成平台
企业级应用中,内容生成不再是单一的文本输出。DeerFlow 的多模态输出能力使企业能够自动化生成 PPT、培训视频、数据看板、播客等内容,大幅提升内部沟通效率和外部营销能力。
14.9.1 平台架构
┌──────────────────────────────────────────────────────────────────────┐
│ Enterprise Multimodal Content Platform │
├──────────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ 内容源 │ │ AI 处理层 │ │ 输出格式 │ │ 分发渠道 │ │
│ ├─────────────┤ ├─────────────┤ ├─────────────┤ ├─────────────┤ │
│ │ • 文档 │ │ • 语义理解 │ │ • PPT │ │ • 企业微信 │ │
│ │ • 数据库 │ │ • 内容规划 │ │ • 视频 │ │ • 飞书 │ │
│ │ • API │ │ • 生成编排 │ │ • 播客 │ │ • 邮件 │ │
│ │ • 人工输入 │ │ • 质量审核 │ │ • 看板 │ │ • 内网门户 │ │
│ │ • 会议记录 │ │ │ │ • 信息图 │ │ • 知识库 │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘ │
│ │
│ ┌──────────────────────────────────────────────────────────────┐ │
│ │ 企业级特性层 │ │
│ │ • 模板管理(品牌规范) • 审批工作流 • 版本控制 • 版权管理 │ │
│ └──────────────────────────────────────────────────────────────┘ │
│ │
└──────────────────────────────────────────────────────────────────────┘14.9.2 典型应用场景
| 场景 | 输入 | 输出 | 价值 |
|---|---|---|---|
| 季度汇报 | 财务数据 + 项目总结 | PPT(品牌模板) | 节省 2-3 天制作时间 |
| 新员工培训 | 产品文档 + 流程说明 | 视频课程 | 标准化培训内容 |
| 技术播客 | 技术博客文章 | 音频播客 | 扩大内容受众 |
| 项目进度 | 任务数据(Jira/飞书) | 可视化看板 | 实时同步团队状态 |
| 营销物料 | 产品卖点 + 视觉素材 | 宣传视频 | 快速响应市场活动 |
14.9.3 品牌合规控制
企业级多模态输出必须遵循品牌规范:
python
# enterprise/multimodal/brand_controller.py
@dataclass
class BrandGuidelines:
"""品牌规范"""
primary_color: str = "#1E3A5F"
secondary_color: str = "#4A90E2"
font_family: str = "PingFang SC, Microsoft YaHei"
logo_url: str = "https://cdn.company.com/logo.png"
watermark: str = "CONFIDENTIAL"
# PPT 模板
ppt_template: str = "templates/enterprise_ppt.pptx"
# 视频片头片尾
video_intro: str = "assets/intro.mp4"
video_outro: str = "assets/outro.mp4"
class BrandController:
"""品牌控制器 — 确保所有输出符合企业规范"""
def __init__(self, guidelines: BrandGuidelines):
self.guidelines = guidelines
def apply_to_ppt(self, ppt_path: str) -> str:
"""应用品牌规范到 PPT"""
from pptx import Presentation
prs = Presentation(ppt_path)
# 应用主题色
for slide in prs.slides:
for shape in slide.shapes:
if shape.has_text_frame:
for paragraph in shape.text_frame.paragraphs:
paragraph.font.color.rgb = RgbColor.from_string(
self.guidelines.primary_color
)
# 添加 Logo
for slide in prs.slides:
left = Inches(9.5)
top = Inches(0.2)
slide.shapes.add_picture(
self.guidelines.logo_url,
left, top,
width=Inches(0.8)
)
# 保存
output_path = ppt_path.replace(".pptx", "_branded.pptx")
prs.save(output_path)
return output_path
def apply_to_video(self, video_path: str) -> str:
"""应用品牌规范到视频"""
from moviepy.editor import (
VideoFileClip, concatenate_videoclips
)
# 添加片头片尾
intro = VideoFileClip(self.guidelines.video_intro)
main = VideoFileClip(video_path)
outro = VideoFileClip(self.guidelines.video_outro)
final = concatenate_videoclips([intro, main, outro])
output_path = video_path.replace(".mp4", "_branded.mp4")
final.write_videofile(output_path)
return output_path
```python
### 14.9.4 审批工作流
多模态内容发布前需要审批:
```python
# enterprise/multimodal/approval_workflow.py
class ContentApprovalWorkflow:
"""内容审批工作流"""
def __init__(self):
self.stages = [
"auto_review", # 自动合规检查
"department_lead", # 部门负责人
"brand_team", # 品牌团队
"legal_review", # 法务审核(敏感内容)
"final_publish" # 发布
]
async def submit_for_approval(
self,
content: MultimodalOutput,
submitter: User,
department: str
) -> str:
"""提交内容审批"""
# 1. 自动检查
auto_result = await self._auto_review(content)
if not auto_result.passed:
return f"REJECTED: {auto_result.reason}"
# 2. 创建审批工单
ticket = ApprovalTicket(
content_id=content.file_path,
submitter=submitter.id,
department=department,
stages=self.stages,
current_stage="department_lead"
)
# 3. 通知部门负责人
await self._notify_approver(
ticket,
role="department_lead",
department=department
)
return ticket.id
async def _auto_review(self, content: MultimodalOutput) -> ReviewResult:
"""自动合规审查"""
checks = []
# 检查敏感词
if content.type in ["ppt", "video", "podcast"]:
text_content = await self._extract_text(content)
sensitive_words = self._check_sensitive_words(text_content)
checks.append(len(sensitive_words) == 0)
# 检查品牌元素
if content.type == "ppt":
has_brand_elements = self._check_brand_elements(content.file_path)
checks.append(has_brand_elements)
# 检查文件大小
checks.append(content.size_bytes < 100 * 1024 * 1024) # 100MB
return ReviewResult(
passed=all(checks),
reason="Auto review passed" if all(checks) else "Failed auto checks"
)14.9.5 与现有系统的集成
python
# enterprise/multimodal/integrations.py
class EnterpriseIntegrationHub:
"""企业集成中心"""
async def sync_to_feishu(
self,
content: MultimodalOutput,
chat_id: str
):
"""同步内容到飞书群"""
# 使用飞书机器人 API 发送文件
pass
async def sync_to_wecom(
self,
content: MultimodalOutput,
room_id: str
):
"""同步内容到企业微信群"""
# 使用企业微信 API 发送文件
pass
async def publish_to_knowledge_base(
self,
content: MultimodalOutput,
category: str
):
"""发布到企业知识库"""
# 上传到内部 Wiki/Confluence/飞书文档
pass
async def send_email_digest(
self,
contents: list[MultimodalOutput],
recipients: list[str]
):
"""发送邮件摘要"""
# 生成邮件,附带内容链接
pass
```python
### 14.9.6 成本与配额管理
企业级多模态生成涉及 API 调用成本,需要配额管理:
```python
# enterprise/multimodal/quota_manager.py
@dataclass
class ModelCost:
"""模型成本配置"""
model: str
input_price_per_1k: float # 美元
output_price_per_1k: float # 美元
image_price_per_image: float
audio_price_per_minute: float
class QuotaManager:
"""配额管理器"""
COST_TABLE = {
"dall-e-3": ModelCost("dall-e-3", 0, 0, 0.04, 0),
"gpt-4o": ModelCost("gpt-4o", 0.005, 0.015, 0, 0),
"tts-1": ModelCost("tts-1", 0, 0, 0, 0.015),
}
def __init__(self, department_budgets: dict):
self.budgets = department_budgets
self.usage = defaultdict(lambda: defaultdict(float))
def check_quota(
self,
department: str,
model: str,
estimated_cost: float
) -> bool:
"""检查配额是否充足"""
budget = self.budgets.get(department, 1000.0) # 默认 $1000
used = sum(self.usage[department].values())
return (used + estimated_cost) <= budget
def record_usage(
self,
department: str,
model: str,
tokens_in: int = 0,
tokens_out: int = 0,
images: int = 0,
audio_minutes: float = 0
):
"""记录使用量"""
cost_config = self.COST_TABLE.get(model)
if not cost_config:
return
cost = (
tokens_in / 1000 * cost_config.input_price_per_1k +
tokens_out / 1000 * cost_config.output_price_per_1k +
images * cost_config.image_price_per_image +
audio_minutes * cost_config.audio_price_per_minute
)
self.usage[department][model] += cost14.10 部署架构
🏢 企业级建议:K8s 部署的
replicas数量应基于实际负载设置,而不是固定值。建议配置 HPA(Horizontal Pod Autoscaler)根据 CPU/内存/QPS 自动伸缩。
14.10.1 Kubernetes 部署
yaml
# k8s/deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: enterprise-gateway
spec:
replicas: 3
selector:
matchLabels:
app: enterprise-gateway
template:
spec:
containers:
- name: gateway
image: enterprise/gateway:latest
ports:
- containerPort: 8001
env:
- name: TENANT_DB_URL
valueFrom:
secretKeyRef:
name: enterprise-secrets
key: tenant-db-url
- name: AUDIT_STORAGE_KEY
valueFrom:
secretKeyRef:
name: enterprise-secrets
key: audit-signing-key
resources:
requests:
cpu: "500m"
memory: "1Gi"
limits:
cpu: "2000m"
memory: "4Gi"yaml
# k8s/langgraph-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: enterprise-langgraph
spec:
replicas: 5
selector:
matchLabels:
app: enterprise-langgraph
template:
spec:
containers:
- name: langgraph
image: enterprise/langgraph:latest
ports:
- containerPort: 2024
env:
- name: SANDBOX_MODE
value: "provisioner"
- name: PROVISIONER_URL
value: "http://enterprise-provisioner:8002"14.10.2 配置管理
yaml
# config/enterprise-config.yaml
enterprise:
# 多租户配置
multi_tenant:
enabled: true
isolation_mode: "strict" # strict | relaxed
# RBAC 配置
rbac:
default_role: member
role_hierarchy:
- tenant_admin
- project_manager
- member
- viewer
- external
# 审计配置
audit:
enabled: true
storage:
type: s3
bucket: enterprise-audit-logs
retention_days: 2555 # 7年,合规要求
# 知识库配置
knowledge:
providers:
- type: vector
endpoint: http://knowledge-search:8080
- type: graph
endpoint: http://knowledge-graph:8080
default_top_k: 10
# Sandbox 配置
sandbox:
mode: provisioner
provisioner:
url: http://enterprise-provisioner:8002
kubeconfig_path: /etc/kube/config
defaults:
cpu_limit: "2"
memory_limit: "4Gi"
timeout_seconds: 360014.11 小结
本章展示了基于 DeerFlow 构建企业级应用的完整开发路径:
| 模块 | 实现方式 |
|---|---|
| 多租户隔离 | ThreadLocal + 中间件 + 数据层隔离 |
| RBAC | 角色-权限映射 + 装饰器检查 |
| 审计日志 | 事件驱动 + 防篡改签名 |
| 企业知识库 | 抽象接口 + 检索中间件 |
| Human-in-loop | 规则引擎 + 飞书卡片通知 |
| 长程任务 | 状态机 + LLM 驱动的任务分解 |
核心原则:
- 复用 DeerFlow — Sub-Agent、Skills、Sandbox 能力直接复用
- 中间件扩展 — 通过中间件链注入企业逻辑
- 独立服务 — 租户、RBAC、审计作为独立模块
- 配置驱动 — 通过 config.yaml 管理企业参数
这样既能享受 DeerFlow 快速迭代的红利,又能满足企业级安全与合规要求。
本章小结
本章通过企业级案例展示了 DeerFlow 的二次开发与部署实践:
- 多租户架构:基于 Gateway API 实现租户隔离,通过 RBAC 控制用户权限,确保数据与资源的安全边界。
- 企业知识库:集成向量数据库(如 Milvus/Pinecone)与私有文档,实现项目级记忆的持久化与召回。
- 审计与合规:全链路审计日志记录 Agent 行为、人工干预、沙箱执行,支持合规报告与事后追溯。
- Kubernetes 部署:通过 Deployment + Service + ConfigMap 实现高可用部署,HPA 自动伸缩应对流量波动。
🏢 企业级建议:在生产环境上线前,务必完成渗透测试(尤其关注沙箱逃逸风险)与性能基准测试(并发 Agent 数、P95 延迟)。
下一步:参考附录 A(配置参考)与附录 B(贡献指南),开始你的 DeerFlow 二次开发之旅。