Skip to content

第十四章 · DeerFlow 企业级应用案例

本章目标

  1. 掌握企业级 DeerFlow 的架构设计与多租户隔离
  2. 理解 RBAC、审计日志与企业知识库集成方案
  3. 了解多模态内容生成平台与 Kubernetes 部署实践

本章目标

  1. 掌握企业级 DeerFlow 的架构设计与多租户隔离
  2. 理解 RBAC、审计日志与企业知识库集成方案
  3. 了解多模态内容生成平台与 Kubernetes 部署实践

14.1 案例背景:企业级应用需求分析

基于企业级应用场景的需求分析,本章聚焦 DeerFlow 二次开发的具体实现路径。

核心需求映射:

企业级模块二次开发重点
Agent TeamsSub-Agent 体系扩展 + 自定义 Agent 类型
Memory企业知识库集成 + 项目级记忆
Sandbox企业级沙箱(审计、配额)
Human-in-loop审批中间件 + 审计日志
Context Engineering企业知识注入 + 合规过滤
多租户租户隔离 + RBAC

14.2 整体架构设计

┌──────────────────────────────────────────────────────────────────────┐
│                      Enterprise Architecture                            │
├──────────────────────────────────────────────────────────────────────┤
│                                                                       │
│  ┌──────────────────────────────────────────────────────────────┐   │
│  │                    Frontend (Next.js)                          │   │
│  │              Agent Chat │ Project Dashboard │ Admin           │   │
│  └─────────────────────────────────┬────────────────────────────┘   │
│                                    │                                   │
│                                    ▼                                   │
│  ┌──────────────────────────────────────────────────────────────┐   │
│  │                    Nginx (Port 2026)                           │   │
│  │                  统一入口 + 认证 + 限流                        │   │
│  └──────────┬─────────────────────┬──────────────────────────────┘   │
│             │                     │                                   │
│             ▼                     ▼                                   │
│  ┌─────────────────┐   ┌─────────────────────────────────────┐     │
│  │  Gateway API    │   │        LangGraph Server              │     │
│  │  (8001)         │   │        (2024)                        │     │
│  │                 │   │                                       │     │
│  │  - Auth         │   │  ┌─────────────────────────────────┐ │     │
│  │  - RBAC         │   │  │   Enterprise Agent              │ │     │
│  │  - Tenant Mgmt  │   │  │                                 │ │     │
│  │  - Audit        │   │  │  - Middleware Chain (定制)      │ │     │
│  └─────────────────┘   │  │  - Agent Teams                  │ │     │
│                        │  │  - Skills (企业)                 │ │     │
│                        │  └─────────────────────────────────┘ │     │
│                        └────────────────────┬────────────────────┘     │
│                                             │                          │
│         ┌───────────────────────────────────┼───────────────────┐    │
│         │                                   │                    │    │
│         ▼                                   ▼                    ▼    │
│  ┌─────────────┐                ┌──────────────┐      ┌──────────┐ │
│  │  Corporate   │                │   Sandbox     │      │ Memory   │ │
│  │  Knowledge   │                │   (K8s)      │      │ System   │ │
│  │  Base        │                │              │      │          │ │
│  └─────────────┘                └──────────────┘      └──────────┘ │
│                                                                       │
└───────────────────────────────────────────────────────────────────────┘

14.3 多租户隔离实现

💡 最佳实践:多租户的 isolation_mode 默认 strict 最安全,但在内部工具场景下,relaxed 模式配合审计日志可以在安全与效率间取得更好平衡。

14.3.1 租户上下文

python
# enterprise/tenant/context.py

class TenantContext:
    """
    租户上下文 - ThreadLocal 存储
    """
    
    _local = threading.local()
    
    @classmethod
    def set(cls, tenant_id: str, user_id: str, role: str):
        cls._local.tenant_id = tenant_id
        cls._local.user_id = user_id
        cls._local.role = role
    
    @classmethod
    def get(cls) -> "TenantInfo":
        return TenantInfo(
            tenant_id=getattr(cls._local, 'tenant_id', None),
            user_id=getattr(cls._local, 'user_id', None),
            role=getattr(cls._local, 'role', None)
        )
    
    @classmethod
    def clear(cls):
        cls._local.tenant_id = None
        cls._local.user_id = None
        cls._local.role = None
```python

### 14.3.2 租户中间件

```python
# enterprise/tenant/middleware.py

class TenantMiddleware:
    """
    租户隔离中间件
    """
    
    async def process(self, state: ThreadState) -> MiddlewareResult:
        # 1. 从请求中获取租户信息
        tenant_info = TenantContext.get()
        
        if not tenant_info.tenant_id:
            raise UnauthorizedError("Tenant ID required")
        
        # 2. 注入到 ThreadState
        state["tenant_id"] = tenant_info.tenant_id
        state["user_id"] = tenant_info.user_id
        state["user_role"] = tenant_info.role
        
        # 3. 设置资源配额
        quota = await self._get_quota(tenant_info)
        state["resource_quota"] = quota
        
        return MiddlewareResult(should_continue=True)
    
    async def _get_quota(self, tenant: TenantInfo) -> ResourceQuota:
        """获取租户资源配额"""
        plan = await self.tenant_service.get_plan(tenant.tenant_id)
        
        return ResourceQuota(
            max_concurrent_agents=plan.agent_limit,
            max_sandbox_cpu=plan.cpu_limit,
            max_sandbox_memory=plan.memory_limit,
            max_storage_gb=plan.storage_limit,
            max_api_calls_per_day=plan.api_rate_limit
        )

14.3.3 数据隔离策略

python
class TenantDataIsolation:
    """
    租户数据隔离策略
    """
    
    # 每个租户独立的数据存储前缀
    @staticmethod
    def get_table_name(table: str, tenant_id: str) -> str:
        """逻辑隔离:表名加租户前缀"""
        return f"tenant_{tenant_id}_{table}"
    
    @staticmethod
    def get_file_path(base: str, tenant_id: str, *parts) -> str:
        """文件系统隔离:租户独立目录"""
        return os.path.join(base, f"tenants/{tenant_id}", *parts)
    
    @staticmethod
    def get_vector_namespace(tenant_id: str) -> str:
        """向量存储隔离:租户独立 namespace"""
        return f"tenant_{tenant_id}"
```python

## 14.4 RBAC 权限控制

### 14.4.1 权限模型

```python
# enterprise/auth/rbac.py

class Role(str, Enum):
    TENANT_ADMIN = "tenant_admin"      # 租户管理员
    PROJECT_MANAGER = "project_manager"  # 项目经理
    MEMBER = "member"                  # 普通成员
    VIEWER = "viewer"                  # 只读成员
    EXTERNAL = "external"              # 外部访客

class Permission(str, Enum):
    # Agent 操作
    AGENT_CREATE = "agent:create"
    AGENT_DELETE = "agent:delete"
    AGENT_VIEW = "agent:view"
    
    # 项目操作
    PROJECT_CREATE = "project:create"
    PROJECT_MANAGE = "project:manage"
    PROJECT_DELETE = "project:delete"
    
    # 审批操作
    APPROVAL_CREATE = "approval:create"
    APPROVAL_GRANT = "approval:grant"
    APPROVAL_VIEW_SENSITIVE = "approval:view_sensitive"
    
    # 数据操作
    DATA_READ = "data:read"
    DATA_WRITE = "data:write"
    DATA_DELETE = "data:delete"
    DATA_EXPORT = "data:export"
    
    # 管理操作
    USER_MANAGE = "user:manage"
    TENANT_SETTINGS = "tenant:settings"

# 角色-权限映射
ROLE_PERMISSIONS = {
    Role.TENANT_ADMIN: [
        Permission.AGENT_CREATE, Permission.AGENT_DELETE, Permission.AGENT_VIEW,
        Permission.PROJECT_CREATE, Permission.PROJECT_MANAGE, Permission.PROJECT_DELETE,
        Permission.APPROVAL_CREATE, Permission.APPROVAL_GRANT, Permission.APPROVAL_VIEW_SENSITIVE,
        Permission.DATA_READ, Permission.DATA_WRITE, Permission.DATA_DELETE, Permission.DATA_EXPORT,
        Permission.USER_MANAGE, Permission.TENANT_SETTINGS,
    ],
    Role.PROJECT_MANAGER: [
        Permission.AGENT_CREATE, Permission.AGENT_VIEW,
        Permission.PROJECT_CREATE, Permission.PROJECT_MANAGE,
        Permission.APPROVAL_CREATE, Permission.APPROVAL_GRANT,
        Permission.DATA_READ, Permission.DATA_WRITE,
    ],
    Role.MEMBER: [
        Permission.AGENT_VIEW,
        Permission.PROJECT_MANAGE,
        Permission.DATA_READ, Permission.DATA_WRITE,
    ],
    Role.VIEWER: [
        Permission.AGENT_VIEW,
        Permission.PROJECT_MANAGE,
        Permission.DATA_READ,
    ],
    Role.EXTERNAL: [
        Permission.AGENT_VIEW,
        Permission.DATA_READ,
    ],
}

14.4.2 权限检查装饰器

python
# enterprise/auth/decorators.py

def require_permission(*permissions: Permission):
    """
    权限检查装饰器
    """
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            user = TenantContext.get()
            
            if not user:
                raise UnauthorizedError()
            
            user_permissions = ROLE_PERMISSIONS.get(user.role, [])
            
            # 检查是否有所需权限
            for permission in permissions:
                if permission not in user_permissions:
                    raise ForbiddenError(
                        f"Permission denied: {permission}"
                    )
            
            return await func(*args, **kwargs)
        
        return wrapper
    return decorator

# 使用示例
@router.post("/projects/{project_id}/approve")
@require_permission(Permission.APPROVAL_GRANT)
async def approve_action(project_id: str, action_type: str, approver: str):
    ...
```python

## 14.5 审计日志系统

### 14.5.1 审计事件定义

```python
# enterprise/audit/events.py

class AuditEventType(str, Enum):
    # 认证事件
    AUTH_LOGIN = "auth:login"
    AUTH_LOGOUT = "auth:logout"
    AUTH_FAILED = "auth:failed"
    
    # Agent 事件
    AGENT_CREATED = "agent:created"
    AGENT_TASK_STARTED = "agent:task_started"
    AGENT_TASK_COMPLETED = "agent:task_completed"
    AGENT_ERROR = "agent:error"
    
    # 数据事件
    DATA_READ = "data:read"
    DATA_WRITTEN = "data:written"
    DATA_DELETED = "data:deleted"
    DATA_EXPORTED = "data:exported"
    
    # 审批事件
    APPROVAL_REQUESTED = "approval:requested"
    APPROVAL_GRANTED = "approval:granted"
    APPROVAL_REJECTED = "approval:rejected"
    
    # Sandbox 事件
    SANDBOX_ACQUIRED = "sandbox:acquired"
    SANDBOX_RELEASED = "sandbox:released"
    SANDBOX_COMMAND_EXECUTED = "sandbox:command_executed"
    
    # 合规事件
    SENSITIVE_DATA_ACCESSED = "compliance:sensitive_data_accessed"
    POLICY_VIOLATION = "compliance:policy_violation"

@dataclass
class AuditEvent:
    """审计事件"""
    event_type: AuditEventType
    tenant_id: str
    user_id: str
    timestamp: datetime
    
    # 上下文
    project_id: Optional[str] = None
    agent_id: Optional[str] = None
    
    # 详情
    resource_type: Optional[str] = None
    resource_id: Optional[str] = None
    action: Optional[str] = None
    result: Optional[str] = None
    
    # 元数据
    ip_address: Optional[str] = None
    user_agent: Optional[str] = None
    request_id: Optional[str] = None
    
    # 防篡改
    signature: Optional[str] = None

14.5.2 审计日志存储

python
# enterprise/audit/storage.py

class AuditLogStorage:
    """
    审计日志存储 - 只追加,防篡改
    """
    
    def __init__(self, storage_backend: StorageBackend):
        self.storage = storage_backend
    
    async def append(self, event: AuditEvent):
        """追加审计事件"""
        # 生成签名
        event.signature = self._sign(event)
        
        # 序列化为 JSONL
        line = json.dumps(asdict(event), default=str)
        
        # 写入追加存储
        await self.storage.append(f"audit/{event.tenant_id}", line)
    
    def _sign(self, event: AuditEvent) -> str:
        """防篡改签名"""
        content = json.dumps(
            asdict(event),
            sort_keys=True,
            exclude={"signature"}
        )
        return hmac.new(
            self.secret_key,
            content.encode(),
            hashlib.sha256
        ).hexdigest()
    
    async def verify(self, tenant_id: str) -> bool:
        """验证审计日志完整性"""
        async for line in self.storage.read(f"audit/{tenant_id}"):
            event = json.loads(line)
            
            if not self._verify_signature(event):
                return False
        
        return True
    
    def _verify_signature(self, event: dict) -> bool:
        """验证单个事件签名"""
        stored_sig = event.pop("signature", None)
        if not stored_sig:
            return False
        
        computed_sig = self._sign_from_dict(event)
        return hmac.compare_digest(stored_sig, computed_sig)
```python

### 14.5.3 审计中间件

```python
# enterprise/audit/middleware.py

class AuditMiddleware:
    """
    审计中间件 - 自动记录关键操作
    """
    
    async def process(
        self,
        state: ThreadState,
        event_type: AuditEventType
    ) -> MiddlewareResult:
        """记录审计事件"""
        tenant = TenantContext.get()
        
        event = AuditEvent(
            event_type=event_type,
            tenant_id=tenant.tenant_id,
            user_id=tenant.user_id,
            timestamp=datetime.now(),
            project_id=state.get("project_id"),
            agent_id=state.get("agent_id"),
            ip_address=get_client_ip(),
            user_agent=get_user_agent(),
            request_id=get_request_id()
        )
        
        # 异步写入,不阻塞主流程
        asyncio.create_task(self.audit_storage.append(event))
        
        return MiddlewareResult(should_continue=True)

14.6 企业知识库集成

14.6.1 知识库客户端

python
# enterprise/knowledge/base.py

class CorporateKnowledgeBase(ABC):
    """
    企业知识库抽象接口
    """
    
    @abstractmethod
    async def search(
        self,
        query: str,
        filters: KnowledgeFilters,
        top_k: int
    ) -> List[KnowledgeEntry]:
        pass
    
    @abstractmethod
    async def get_document(self, doc_id: str) -> Document:
        pass
    
    @abstractmethod
    async def list_categories(self) -> List[str]:
        pass


class KnowledgeEntry(BaseModel):
    id: str
    title: str
    content: str
    source: str
    category: str
    department: Optional[str]
    sensitivity: SensitivityLevel
    created_at: datetime
    updated_at: datetime
    metadata: Dict[str, Any]


class KnowledgeFilters(BaseModel):
    categories: Optional[List[str]] = None
    departments: Optional[List[str]] = None
    sensitivity: Optional[List[SensitivityLevel]] = None
    date_from: Optional[datetime] = None
    date_to: Optional[datetime] = None
    project_id: Optional[str] = None
```python

### 14.6.2 知识检索中间件

```python
# enterprise/knowledge/middleware.py

class KnowledgeRetrievalMiddleware:
    """
    企业知识检索中间件
    """
    
    def __init__(
        self,
        knowledge_base: CorporateKnowledgeBase,
        retriever: HybridRetriever
    ):
        self.kb = knowledge_base
        self.retriever = retriever
    
    async def process(self, state: ThreadState) -> MiddlewareResult:
        tenant = TenantContext.get()
        
        # 1. 提取查询
        query = self._extract_query(state)
        if not query:
            return MiddlewareResult(should_continue=True)
        
        # 2. 构建过滤器(基于租户权限)
        filters = KnowledgeFilters(
            departments=[tenant.department] if tenant.department else None,
            sensitivity=[SensitivityLevel.PUBLIC, SensitivityLevel.INTERNAL],
            project_id=state.get("project_id")
        )
        
        # 3. 检索知识
        results = await self.kb.search(query, filters, top_k=10)
        
        # 4. 注入上下文
        if results:
            context = self._build_context(results)
            state["knowledge_context"] = context
        
        return MiddlewareResult(should_continue=True)
    
    def _build_context(self, entries: List[KnowledgeEntry]) -> str:
        parts = ["## 企业知识库参考\n"]
        
        for entry in entries[:5]:
            parts.append(f"""
### {entry.title}
**来源**: {entry.source}
**分类**: {entry.category}

{entry.content}

---
""")
        
        return "\n".join(parts)

14.7 Human-in-the-Loop 审批实现

14.7.1 审批规则引擎

python
# enterprise/approval/engine.py

class ApprovalRuleEngine:
    """
    审批规则引擎
    """
    
    def __init__(self, rules: List[ApprovalRule]):
        self.rules = rules
    
    async def check_requires_approval(
        self,
        action: AgentAction,
        context: ActionContext
    ) -> ApprovalRequirement:
        """
        检查动作是否需要审批
        """
        for rule in self.rules:
            if await rule.matches(action, context):
                return ApprovalRequirement(
                    required=True,
                    rule=rule,
                    approvers=await rule.get_approvers(action, context),
                    urgency=rule.get_urgency(action, context)
                )
        
        return ApprovalRequirement(required=False)
    
    async def execute_approval_flow(
        self,
        action: AgentAction,
        requirement: ApprovalRequirement
    ) -> ApprovalResult:
        """
        执行审批流程
        """
        # 1. 创建审批请求
        approval = await self.approval_store.create(
            action=action,
            approvers=requirement.approvers,
            urgency=requirement.urgency
        )
        
        # 2. 通知审批人
        await self.notifier.notify(approval)
        
        # 3. 暂停 Agent,等待审批
        return ApprovalResult(
            status=ApprovalStatus.PENDING,
            approval_id=approval.id
        )


class ActionContext(BaseModel):
    tenant_id: str
    user_id: str
    project_id: Optional[str]
    role: Role
    sandbox_id: Optional[str]


# 内置审批规则
class FinancialApprovalRule(ApprovalRule):
    """财务操作审批规则"""
    
    async def matches(self, action: AgentAction, ctx: ActionContext) -> bool:
        return (
            action.type == "financial_operation" and
            action.amount >= ctx.approval_threshold
        )
    
    async def get_approvers(self, action, ctx) -> List[User]:
        if action.amount >= 100000:
            return await self.user_service.get_directors()
        else:
            return await self.user_service.get_managers()


class SensitiveDataApprovalRule(ApprovalRule):
    """敏感数据访问审批规则"""
    
    async def matches(self, action: AgentAction, ctx: ActionContext) -> bool:
        return (
            action.type == "data_access" and
            action.data_sensitivity == SensitivityLevel.CONFIDENTIAL
        )
```python

### 14.7.2 飞书审批卡片

```python
# enterprise/approval/feishu.py

class FeishuApprovalNotifier:
    """
    飞书审批通知
    """
    
    def __init__(self, feishu_client: FeishuClient):
        self.client = feishu_client
    
    async def notify(self, approval: Approval):
        """发送飞书审批卡片"""
        for approver in approval.approvers:
            card = self._build_approval_card(approval)
            
            await self.client.send_interactive_card(
                open_id=approver.feishu_open_id,
                card=card
            )
    
    def _build_approval_card(self, approval: Approval) -> dict:
        """构建审批卡片"""
        return {
            "msg_type": "interactive",
            "card": {
                "header": {
                    "title": {
                        "tag": "plain_text",
                        "content": f"🔔 待审批: {approval.action.title}"
                    },
                    "template": "orange" if approval.urgency == "urgent" else "blue"
                },
                "elements": [
                    {
                        "tag": "div",
                        "text": {
                            "tag": "lark_md",
                            "content": self._format_action_details(approval)
                        }
                    },
                    {
                        "tag": "hr"
                    },
                    {
                        "tag": "note",
                        "elements": [
                            {
                                "tag": "plain_text",
                                "content": f"申请人: {approval.applicant.name}"
                            },
                            {
                                "tag": "plain_text",
                                "content": f"截止: {approval.deadline.strftime('%Y-%m-%d %H:%M')}"
                            }
                        ]
                    },
                    {
                        "tag": "action",
                        "actions": [
                            {
                                "tag": "button",
                                "text": {"tag": "plain_text", "content": "✅ 批准"},
                                "type": "primary",
                                "value": {
                                    "approval_id": approval.id,
                                    "action": "approve"
                                }
                            },
                            {
                                "tag": "button",
                                "text": {"tag": "plain_text", "content": "❌ 拒绝"},
                                "type": "danger",
                                "value": {
                                    "approval_id": approval.id,
                                    "action": "reject"
                                }
                            }
                        ]
                    }
                ]
            }
        }

14.8 项目级长程任务管理

14.8.1 项目状态机

python
# enterprise/project/state_machine.py

class ProjectState(str, Enum):
    PLANNING = "planning"
    ACTIVE = "active"
    PAUSED = "paused"
    PENDING_APPROVAL = "pending_approval"
    COMPLETED = "completed"
    ARCHIVED = "archived"


class ProjectStateMachine:
    """
    项目状态机
    """
    
    def __init__(self):
        self.transitions = {
            ProjectState.PLANNING: [ProjectState.ACTIVE],
            ProjectState.ACTIVE: [
                ProjectState.PAUSED,
                ProjectState.PENDING_APPROVAL,
                ProjectState.COMPLETED
            ],
            ProjectState.PAUSED: [ProjectState.ACTIVE],
            ProjectState.PENDING_APPROVAL: [
                ProjectState.ACTIVE,
                ProjectState.PAUSED
            ],
            ProjectState.COMPLETED: [ProjectState.ARCHIVED],
        }
    
    def can_transition(self, from_state: ProjectState, to_state: ProjectState) -> bool:
        return to_state in self.transitions.get(from_state, [])
    
    async def transition(
        self,
        project: Project,
        to_state: ProjectState,
        user: User,
        reason: str = None
    ) -> Project:
        """执行状态转换"""
        if not self.can_transition(project.state, to_state):
            raise InvalidTransitionError(
                f"Cannot transition from {project.state} to {to_state}"
            )
        
        # 记录转换
        await self.audit_log.log(
            event_type=AuditEventType.PROJECT_STATE_CHANGED,
            resource_id=project.id,
            action=f"{project.state} -> {to_state}",
            reason=reason,
            user_id=user.id
        )
        
        # 更新状态
        project.state = to_state
        project.updated_at = datetime.now()
        project.state_history.append(StateTransition(
            from_state=project.previous_state,
            to_state=to_state,
            changed_by=user.id,
            reason=reason,
            timestamp=datetime.now()
        ))
        
        await self.project_store.save(project)
        
        # 触发后续逻辑
        await self._on_transition(project, to_state)
        
        return project
```python

### 14.8.2 多 Agent 任务分解

```python
# enterprise/project/task_decomposition.py

class TaskDecomposer:
    """
    任务分解器 - LLM 驱动
    """
    
    def __init__(self, llm: ChatModel):
        self.llm = llm
    
    async def decompose(
        self,
        goal: str,
        project: Project,
        available_agents: List[AgentType]
    ) -> List[SubTask]:
        """
        将项目目标分解为可执行的子任务
        """
        prompt = f"""
        项目: {project.name}
        目标: {goal}
        
        可用的 Agent 类型:
        {', '.join([a.name for a in available_agents])}
        
        请将目标分解为具体的子任务,每个子任务应该:
        1. 有明确的交付物
        2. 可由单个 Agent 完成
        3. 有清晰的验收标准
        
        返回 JSON 格式:
        {{
            "tasks": [
                {{
                    "title": "任务标题",
                    "description": "任务描述",
                    "agent_type": "需要的 Agent 类型",
                    "dependencies": ["前置任务ID"],
                    "estimated_duration": "预计时长",
                    "checkpoints": ["检查点1", "检查点2"]
                }}
            ]
        }}
        """
        
        response = await self.llm.ainvoke([
            HumanMessage(content=prompt)
        ])
        
        # 解析响应
        result = json.loads(response.content)
        tasks = [SubTask(**t) for t in result["tasks"]]
        
        # 设置依赖关系
        for i, task in enumerate(tasks):
            task.index = i
            task.project_id = project.id
        
        return tasks

14.9 多模态内容生成平台

企业级应用中,内容生成不再是单一的文本输出。DeerFlow 的多模态输出能力使企业能够自动化生成 PPT、培训视频、数据看板、播客等内容,大幅提升内部沟通效率和外部营销能力。

14.9.1 平台架构

┌──────────────────────────────────────────────────────────────────────┐
│                Enterprise Multimodal Content Platform                 │
├──────────────────────────────────────────────────────────────────────┤
│                                                                       │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐  │
│  │   内容源    │  │  AI 处理层  │  │  输出格式   │  │  分发渠道   │  │
│  ├─────────────┤  ├─────────────┤  ├─────────────┤  ├─────────────┤  │
│  │ • 文档      │  │ • 语义理解  │  │ • PPT      │  │ • 企业微信  │  │
│  │ • 数据库    │  │ • 内容规划  │  │ • 视频     │  │ • 飞书      │  │
│  │ • API       │  │ • 生成编排  │  │ • 播客     │  │ • 邮件      │  │
│  │ • 人工输入  │  │ • 质量审核  │  │ • 看板     │  │ • 内网门户  │  │
│  │ • 会议记录  │  │             │  │ • 信息图   │  │ • 知识库    │  │
│  └─────────────┘  └─────────────┘  └─────────────┘  └─────────────┘  │
│                                                                       │
│  ┌──────────────────────────────────────────────────────────────┐   │
│  │                    企业级特性层                                │   │
│  │  • 模板管理(品牌规范)  • 审批工作流  • 版本控制  • 版权管理  │   │
│  └──────────────────────────────────────────────────────────────┘   │
│                                                                       │
└──────────────────────────────────────────────────────────────────────┘

14.9.2 典型应用场景

场景输入输出价值
季度汇报财务数据 + 项目总结PPT(品牌模板)节省 2-3 天制作时间
新员工培训产品文档 + 流程说明视频课程标准化培训内容
技术播客技术博客文章音频播客扩大内容受众
项目进度任务数据(Jira/飞书)可视化看板实时同步团队状态
营销物料产品卖点 + 视觉素材宣传视频快速响应市场活动

14.9.3 品牌合规控制

企业级多模态输出必须遵循品牌规范:

python
# enterprise/multimodal/brand_controller.py

@dataclass
class BrandGuidelines:
    """品牌规范"""
    
    primary_color: str = "#1E3A5F"
    secondary_color: str = "#4A90E2"
    font_family: str = "PingFang SC, Microsoft YaHei"
    logo_url: str = "https://cdn.company.com/logo.png"
    watermark: str = "CONFIDENTIAL"
    
    # PPT 模板
    ppt_template: str = "templates/enterprise_ppt.pptx"
    
    # 视频片头片尾
    video_intro: str = "assets/intro.mp4"
    video_outro: str = "assets/outro.mp4"

class BrandController:
    """品牌控制器 — 确保所有输出符合企业规范"""
    
    def __init__(self, guidelines: BrandGuidelines):
        self.guidelines = guidelines
    
    def apply_to_ppt(self, ppt_path: str) -> str:
        """应用品牌规范到 PPT"""
        from pptx import Presentation
        
        prs = Presentation(ppt_path)
        
        # 应用主题色
        for slide in prs.slides:
            for shape in slide.shapes:
                if shape.has_text_frame:
                    for paragraph in shape.text_frame.paragraphs:
                        paragraph.font.color.rgb = RgbColor.from_string(
                            self.guidelines.primary_color
                        )
        
        # 添加 Logo
        for slide in prs.slides:
            left = Inches(9.5)
            top = Inches(0.2)
            slide.shapes.add_picture(
                self.guidelines.logo_url,
                left, top,
                width=Inches(0.8)
            )
        
        # 保存
        output_path = ppt_path.replace(".pptx", "_branded.pptx")
        prs.save(output_path)
        return output_path
    
    def apply_to_video(self, video_path: str) -> str:
        """应用品牌规范到视频"""
        from moviepy.editor import (
            VideoFileClip, concatenate_videoclips
        )
        
        # 添加片头片尾
        intro = VideoFileClip(self.guidelines.video_intro)
        main = VideoFileClip(video_path)
        outro = VideoFileClip(self.guidelines.video_outro)
        
        final = concatenate_videoclips([intro, main, outro])
        
        output_path = video_path.replace(".mp4", "_branded.mp4")
        final.write_videofile(output_path)
        return output_path
```python

### 14.9.4 审批工作流

多模态内容发布前需要审批:

```python
# enterprise/multimodal/approval_workflow.py

class ContentApprovalWorkflow:
    """内容审批工作流"""
    
    def __init__(self):
        self.stages = [
            "auto_review",      # 自动合规检查
            "department_lead",  # 部门负责人
            "brand_team",       # 品牌团队
            "legal_review",     # 法务审核(敏感内容)
            "final_publish"     # 发布
        ]
    
    async def submit_for_approval(
        self,
        content: MultimodalOutput,
        submitter: User,
        department: str
    ) -> str:
        """提交内容审批"""
        
        # 1. 自动检查
        auto_result = await self._auto_review(content)
        if not auto_result.passed:
            return f"REJECTED: {auto_result.reason}"
        
        # 2. 创建审批工单
        ticket = ApprovalTicket(
            content_id=content.file_path,
            submitter=submitter.id,
            department=department,
            stages=self.stages,
            current_stage="department_lead"
        )
        
        # 3. 通知部门负责人
        await self._notify_approver(
            ticket,
            role="department_lead",
            department=department
        )
        
        return ticket.id
    
    async def _auto_review(self, content: MultimodalOutput) -> ReviewResult:
        """自动合规审查"""
        
        checks = []
        
        # 检查敏感词
        if content.type in ["ppt", "video", "podcast"]:
            text_content = await self._extract_text(content)
            sensitive_words = self._check_sensitive_words(text_content)
            checks.append(len(sensitive_words) == 0)
        
        # 检查品牌元素
        if content.type == "ppt":
            has_brand_elements = self._check_brand_elements(content.file_path)
            checks.append(has_brand_elements)
        
        # 检查文件大小
        checks.append(content.size_bytes < 100 * 1024 * 1024)  # 100MB
        
        return ReviewResult(
            passed=all(checks),
            reason="Auto review passed" if all(checks) else "Failed auto checks"
        )

14.9.5 与现有系统的集成

python
# enterprise/multimodal/integrations.py

class EnterpriseIntegrationHub:
    """企业集成中心"""
    
    async def sync_to_feishu(
        self,
        content: MultimodalOutput,
        chat_id: str
    ):
        """同步内容到飞书群"""
        # 使用飞书机器人 API 发送文件
        pass
    
    async def sync_to_wecom(
        self,
        content: MultimodalOutput,
        room_id: str
    ):
        """同步内容到企业微信群"""
        # 使用企业微信 API 发送文件
        pass
    
    async def publish_to_knowledge_base(
        self,
        content: MultimodalOutput,
        category: str
    ):
        """发布到企业知识库"""
        # 上传到内部 Wiki/Confluence/飞书文档
        pass
    
    async def send_email_digest(
        self,
        contents: list[MultimodalOutput],
        recipients: list[str]
    ):
        """发送邮件摘要"""
        # 生成邮件,附带内容链接
        pass
```python

### 14.9.6 成本与配额管理

企业级多模态生成涉及 API 调用成本,需要配额管理:

```python
# enterprise/multimodal/quota_manager.py

@dataclass
class ModelCost:
    """模型成本配置"""
    
    model: str
    input_price_per_1k: float   # 美元
    output_price_per_1k: float  # 美元
    image_price_per_image: float
    audio_price_per_minute: float

class QuotaManager:
    """配额管理器"""
    
    COST_TABLE = {
        "dall-e-3": ModelCost("dall-e-3", 0, 0, 0.04, 0),
        "gpt-4o": ModelCost("gpt-4o", 0.005, 0.015, 0, 0),
        "tts-1": ModelCost("tts-1", 0, 0, 0, 0.015),
    }
    
    def __init__(self, department_budgets: dict):
        self.budgets = department_budgets
        self.usage = defaultdict(lambda: defaultdict(float))
    
    def check_quota(
        self,
        department: str,
        model: str,
        estimated_cost: float
    ) -> bool:
        """检查配额是否充足"""
        
        budget = self.budgets.get(department, 1000.0)  # 默认 $1000
        used = sum(self.usage[department].values())
        
        return (used + estimated_cost) <= budget
    
    def record_usage(
        self,
        department: str,
        model: str,
        tokens_in: int = 0,
        tokens_out: int = 0,
        images: int = 0,
        audio_minutes: float = 0
    ):
        """记录使用量"""
        
        cost_config = self.COST_TABLE.get(model)
        if not cost_config:
            return
        
        cost = (
            tokens_in / 1000 * cost_config.input_price_per_1k +
            tokens_out / 1000 * cost_config.output_price_per_1k +
            images * cost_config.image_price_per_image +
            audio_minutes * cost_config.audio_price_per_minute
        )
        
        self.usage[department][model] += cost

14.10 部署架构

🏢 企业级建议:K8s 部署的 replicas 数量应基于实际负载设置,而不是固定值。建议配置 HPA(Horizontal Pod Autoscaler)根据 CPU/内存/QPS 自动伸缩。

14.10.1 Kubernetes 部署

yaml
# k8s/deployment.yaml

apiVersion: apps/v1
kind: Deployment
metadata:
  name: enterprise-gateway
spec:
  replicas: 3
  selector:
    matchLabels:
      app: enterprise-gateway
  template:
    spec:
      containers:
        - name: gateway
          image: enterprise/gateway:latest
          ports:
            - containerPort: 8001
          env:
            - name: TENANT_DB_URL
              valueFrom:
                secretKeyRef:
                  name: enterprise-secrets
                  key: tenant-db-url
            - name: AUDIT_STORAGE_KEY
              valueFrom:
                secretKeyRef:
                  name: enterprise-secrets
                  key: audit-signing-key
          resources:
            requests:
              cpu: "500m"
              memory: "1Gi"
            limits:
              cpu: "2000m"
              memory: "4Gi"
yaml
# k8s/langgraph-deployment.yaml

apiVersion: apps/v1
kind: Deployment
metadata:
  name: enterprise-langgraph
spec:
  replicas: 5
  selector:
    matchLabels:
      app: enterprise-langgraph
  template:
    spec:
      containers:
        - name: langgraph
          image: enterprise/langgraph:latest
          ports:
            - containerPort: 2024
          env:
            - name: SANDBOX_MODE
              value: "provisioner"
            - name: PROVISIONER_URL
              value: "http://enterprise-provisioner:8002"

14.10.2 配置管理

yaml
# config/enterprise-config.yaml

enterprise:
  # 多租户配置
  multi_tenant:
    enabled: true
    isolation_mode: "strict"  # strict | relaxed
    
  # RBAC 配置
  rbac:
    default_role: member
    role_hierarchy:
      - tenant_admin
      - project_manager
      - member
      - viewer
      - external
  
  # 审计配置
  audit:
    enabled: true
    storage:
      type: s3
      bucket: enterprise-audit-logs
    retention_days: 2555  # 7年,合规要求
  
  # 知识库配置
  knowledge:
    providers:
      - type: vector
        endpoint: http://knowledge-search:8080
      - type: graph
        endpoint: http://knowledge-graph:8080
    default_top_k: 10
  
  # Sandbox 配置
  sandbox:
    mode: provisioner
    provisioner:
      url: http://enterprise-provisioner:8002
      kubeconfig_path: /etc/kube/config
    defaults:
      cpu_limit: "2"
      memory_limit: "4Gi"
      timeout_seconds: 3600

14.11 小结

本章展示了基于 DeerFlow 构建企业级应用的完整开发路径:

模块实现方式
多租户隔离ThreadLocal + 中间件 + 数据层隔离
RBAC角色-权限映射 + 装饰器检查
审计日志事件驱动 + 防篡改签名
企业知识库抽象接口 + 检索中间件
Human-in-loop规则引擎 + 飞书卡片通知
长程任务状态机 + LLM 驱动的任务分解

核心原则:

  1. 复用 DeerFlow — Sub-Agent、Skills、Sandbox 能力直接复用
  2. 中间件扩展 — 通过中间件链注入企业逻辑
  3. 独立服务 — 租户、RBAC、审计作为独立模块
  4. 配置驱动 — 通过 config.yaml 管理企业参数

这样既能享受 DeerFlow 快速迭代的红利,又能满足企业级安全与合规要求。

本章小结

本章通过企业级案例展示了 DeerFlow 的二次开发与部署实践:

  1. 多租户架构:基于 Gateway API 实现租户隔离,通过 RBAC 控制用户权限,确保数据与资源的安全边界。
  2. 企业知识库:集成向量数据库(如 Milvus/Pinecone)与私有文档,实现项目级记忆的持久化与召回。
  3. 审计与合规:全链路审计日志记录 Agent 行为、人工干预、沙箱执行,支持合规报告与事后追溯。
  4. Kubernetes 部署:通过 Deployment + Service + ConfigMap 实现高可用部署,HPA 自动伸缩应对流量波动。

🏢 企业级建议:在生产环境上线前,务必完成渗透测试(尤其关注沙箱逃逸风险)与性能基准测试(并发 Agent 数、P95 延迟)。


下一步:参考附录 A(配置参考)与附录 B(贡献指南),开始你的 DeerFlow 二次开发之旅。