Files
2026-03-04 12:17:52 +08:00

247 lines
8.7 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from fastapi import APIRouter, HTTPException, status
import logging
from pydantic import BaseModel
from database_manager import db_manager
from schemas import (
CreateTableRequest, AlterTableRequest, CommentRequest, ApiResponse
)
logger = logging.getLogger(__name__)
# 创建路由器
router = APIRouter()
# 表结构管理接口
@router.post("/tables/create", response_model=ApiResponse, summary="创建表")
async def create_table(request: CreateTableRequest):
"""创建表"""
try:
# 构建创建表的SQL
column_definitions = []
for col in request.columns:
col_def = f"{col['name']} {col['type']}"
if col.get('not_null', False):
col_def += " NOT NULL"
if col.get('primary_key', False):
col_def += " PRIMARY KEY"
if col.get('default'):
col_def += f" DEFAULT {col['default']}"
if col.get('comment'):
col_def += f" COMMENT '{col['comment']}'"
column_definitions.append(col_def)
sql = f"CREATE TABLE {request.table_name} ({', '.join(column_definitions)})"
affected_rows = db_manager.execute_non_query(
connection_id=request.connection_id,
sql=sql
)
return ApiResponse(
success=True,
message="表创建成功",
data={"table_name": request.table_name, "affected_rows": affected_rows}
)
except Exception as e:
logger.error(f"创建表失败: {str(e)}")
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"创建表失败: {str(e)}"
)
class DropTableRequest(BaseModel):
"""删除表请求体模型
Attributes:
connection_id: 连接ID
table_name: 需要删除的表名
"""
connection_id: str
table_name: str
@router.post("/tables/delete", response_model=ApiResponse, summary="删除表POSTJSON传参")
async def drop_table(request: DropTableRequest):
"""删除表POST使用JSON Body传参
请求示例:
{"connection_id": "mysql_localhost_3306_test_db", "table_name": "users"}
"""
try:
sql = f"DROP TABLE {request.table_name}"
affected_rows = db_manager.execute_non_query(
connection_id=request.connection_id,
sql=sql
)
return ApiResponse(
success=True,
message="表删除成功",
data={"table_name": request.table_name, "affected_rows": affected_rows}
)
except Exception as e:
logger.error(f"删除表失败: {str(e)}")
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"删除表失败: {str(e)}"
)
@router.post("/tables/alter", response_model=ApiResponse, summary="修改表结构改为POST")
async def alter_table(request: AlterTableRequest):
"""修改表结构HTTP方法改为POST"""
try:
sql = ""
if request.operation.upper() == "ADD":
# 添加列
col_def = request.column_definition
column_sql = f"{col_def['name']} {col_def['type']}"
if col_def.get('not_null', False):
column_sql += " NOT NULL"
if col_def.get('default'):
column_sql += f" DEFAULT {col_def['default']}"
sql = f"ALTER TABLE {request.table_name} ADD COLUMN {column_sql}"
elif request.operation.upper() == "DROP":
# 删除列
column_name = request.column_definition['name']
sql = f"ALTER TABLE {request.table_name} DROP COLUMN {column_name}"
elif request.operation.upper() == "MODIFY":
# 修改列
col_def = request.column_definition
column_sql = f"{col_def['name']} {col_def['type']}"
if col_def.get('not_null', False):
column_sql += " NOT NULL"
sql = f"ALTER TABLE {request.table_name} MODIFY COLUMN {column_sql}"
else:
raise ValueError(f"不支持的操作类型: {request.operation}")
affected_rows = db_manager.execute_non_query(
connection_id=request.connection_id,
sql=sql
)
return ApiResponse(
success=True,
message="表结构修改成功",
data={"table_name": request.table_name, "operation": request.operation, "affected_rows": affected_rows}
)
except Exception as e:
logger.error(f"修改表结构失败: {str(e)}")
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"修改表结构失败: {str(e)}"
)
# 备注管理接口
@router.post("/tables/comment", response_model=ApiResponse, summary="修改表或字段备注改为POST")
async def update_comment(request: CommentRequest):
"""修改表或字段备注HTTP方法改为POST"""
try:
if request.column_name:
# 修改字段备注
sql = f"ALTER TABLE {request.table_name} MODIFY COLUMN {request.column_name} COMMENT '{request.comment}'"
else:
# 修改表备注
sql = f"ALTER TABLE {request.table_name} COMMENT '{request.comment}'"
affected_rows = db_manager.execute_non_query(
connection_id=request.connection_id,
sql=sql
)
return ApiResponse(
success=True,
message="备注修改成功",
data={
"table_name": request.table_name,
"column_name": request.column_name,
"comment": request.comment,
"affected_rows": affected_rows
}
)
except Exception as e:
logger.error(f"修改备注失败: {str(e)}")
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"修改备注失败: {str(e)}"
)
@router.get("/tables/columns", response_model=ApiResponse, summary="获取表的所有字段信息使用query参数")
async def get_table_columns(connection_id: str, table_name: str):
"""获取表的所有字段信息通过URL query参数
Params:
- connection_id: 通过URL query传入例如 `?connection_id=xxx`
- table_name: 通过URL query传入例如 `?table_name=users`
"""
try:
columns = db_manager.get_table_columns(connection_id, table_name)
return ApiResponse(
success=True,
message="获取字段信息成功",
data={
"table_name": table_name,
"columns": columns
}
)
except Exception as e:
logger.error(f"获取字段信息失败: {str(e)}")
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"获取字段信息失败: {str(e)}"
)
@router.get("/tables", response_model=ApiResponse, summary="获取数据库中的所有表使用query参数")
async def get_all_tables(connection_id: str):
"""获取数据库中的所有表通过URL query参数
Params:
- connection_id: 通过URL query传入例如 `?connection_id=xxx`
"""
try:
db_info = db_manager.get_database_info(connection_id)
return ApiResponse(
success=True,
message="获取表列表成功",
data={
"database_name": db_info["database_name"],
"tables": db_info["tables"],
"table_count": db_info["table_count"]
}
)
except Exception as e:
logger.error(f"获取表列表失败: {str(e)}")
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"获取表列表失败: {str(e)}"
)
@router.get("/tables/details", response_model=ApiResponse, summary="获取所有表及其备注信息使用query参数")
async def get_tables_with_comments(connection_id: str):
"""获取所有表及其备注信息通过URL query参数
Params:
- connection_id: 通过URL query传入例如 `?connection_id=xxx`
"""
try:
tables = db_manager.get_tables_with_comments(connection_id)
return ApiResponse(
success=True,
message="获取表及备注信息成功",
data={
"tables": tables,
"table_count": len(tables)
}
)
except Exception as e:
logger.error(f"获取表备注信息失败: {str(e)}")
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"获取表备注信息失败: {str(e)}"
)