Files
dababase-etl-python/models.py
2026-03-04 12:17:52 +08:00

120 lines
5.2 KiB
Python

from pydantic import BaseModel, Field
from typing import Dict, List, Any, Optional
from enum import Enum
class DatabaseType(str, Enum):
"""支持的数据库类型"""
MYSQL = "mysql"
ORACLE = "oracle"
SQLSERVER = "sqlserver"
POSTGRESQL = "postgresql"
class DatabaseConnection(BaseModel):
"""数据库连接配置"""
db_type: DatabaseType = Field(..., description="数据库类型")
host: str = Field(..., description="数据库主机地址")
port: int = Field(..., description="数据库端口")
username: str = Field(..., description="用户名")
password: str = Field(..., description="密码")
database: Optional[str] = Field(None, description="数据库名称")
# Oracle特定参数
mode: Optional[str] = Field(None, description="Oracle连接模式")
threaded: Optional[bool] = Field(None, description="Oracle是否启用线程模式")
# 其他连接参数
extra_params: Optional[Dict[str, Any]] = Field(None, description="额外的连接参数")
class QueryRequest(BaseModel):
"""查询请求"""
connection_id: str = Field(..., description="连接ID")
sql: str = Field(..., description="SQL语句")
params: Optional[Dict[str, Any]] = Field(None, description="SQL参数")
class ExecuteRequest(BaseModel):
"""执行请求"""
connection_id: str = Field(..., description="连接ID")
sql: str = Field(..., description="SQL语句")
params: Optional[Dict[str, Any]] = Field(None, description="SQL参数")
class TableDataRequest(BaseModel):
"""表数据请求"""
connection_id: str = Field(..., description="连接ID")
table_name: str = Field(..., description="表名")
page: int = Field(1, description="页码")
page_size: int = Field(10, description="每页大小")
where_clause: Optional[str] = Field(None, description="WHERE条件")
order_by: Optional[str] = Field(None, description="排序字段")
class InsertDataRequest(BaseModel):
"""插入数据请求"""
connection_id: str = Field(..., description="连接ID")
table_name: str = Field(..., description="表名")
data: Dict[str, Any] = Field(..., description="要插入的数据")
class UpdateDataRequest(BaseModel):
"""更新数据请求"""
connection_id: str = Field(..., description="连接ID")
table_name: str = Field(..., description="表名")
data: Dict[str, Any] = Field(..., description="要更新的数据")
where_clause: str = Field(..., description="WHERE条件")
class DeleteDataRequest(BaseModel):
"""删除数据请求"""
connection_id: str = Field(..., description="连接ID")
table_name: str = Field(..., description="表名")
where_clause: str = Field(..., description="WHERE条件")
class CreateTableRequest(BaseModel):
"""创建表请求"""
connection_id: str = Field(..., description="连接ID")
table_name: str = Field(..., description="表名")
columns: List[Dict[str, Any]] = Field(..., description="列定义")
class AlterTableRequest(BaseModel):
"""修改表请求"""
connection_id: str = Field(..., description="连接ID")
table_name: str = Field(..., description="表名")
operation: str = Field(..., description="操作类型: ADD, DROP, MODIFY")
column_definition: Optional[Dict[str, Any]] = Field(None, description="列定义")
class CommentRequest(BaseModel):
"""备注请求"""
connection_id: str = Field(..., description="连接ID")
table_name: str = Field(..., description="表名")
column_name: Optional[str] = Field(None, description="列名(为空则修改表备注)")
comment: str = Field(..., description="备注内容")
class ApiResponse(BaseModel):
"""API响应格式"""
success: bool = Field(..., description="是否成功")
message: str = Field(..., description="响应消息")
data: Optional[Any] = Field(None, description="响应数据")
error: Optional[str] = Field(None, description="错误信息")
class ConnectionResponse(BaseModel):
"""连接响应"""
connection_id: str = Field(..., description="连接ID")
db_type: str = Field(..., description="数据库类型")
host: str = Field(..., description="主机地址")
port: int = Field(..., description="端口")
database: Optional[str] = Field(None, description="数据库名称")
class DatabaseInfo(BaseModel):
"""数据库信息"""
database_name: str = Field(..., description="数据库名称")
tables: List[str] = Field(..., description="表列表")
table_count: int = Field(..., description="表数量")
class TableInfo(BaseModel):
"""表信息"""
table_name: str = Field(..., description="表名")
columns: List[Dict[str, Any]] = Field(..., description="列信息")
primary_keys: Dict[str, Any] = Field(..., description="主键信息")
foreign_keys: List[Dict[str, Any]] = Field(..., description="外键信息")
indexes: List[Dict[str, Any]] = Field(..., description="索引信息")
class QueryResult(BaseModel):
"""查询结果"""
data: List[Dict[str, Any]] = Field(..., description="查询数据")
total: int = Field(..., description="总记录数")
page: int = Field(..., description="当前页码")
page_size: int = Field(..., description="每页大小")