Files
dababase-etl-python/sample_data.py
2026-03-04 12:17:52 +08:00

336 lines
17 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from database_manager import DatabaseManager
from config import DatabaseConfig
import logging
logger = logging.getLogger(__name__)


class SampleDataInitializer:
    """Create demo tables and seed rows for MySQL, Oracle and SQL Server.

    Each ``init_*_sample_data`` method opens its own connection through
    ``DatabaseManager.create_connection``, runs its statements through
    ``execute_non_query`` and returns the connection id on success, or
    ``None`` after logging the failure.
    """

    def __init__(self):
        self.db_manager = DatabaseManager()

    # ------------------------------------------------------------------ helpers

    def _execute_all(self, conn_id, statements):
        """Execute every SQL statement in *statements* on *conn_id*, in order."""
        for sql in statements:
            self.db_manager.execute_non_query(conn_id, sql)

    def _execute_ignoring_errors(self, conn_id, statements):
        """Best-effort execution: run each statement on its own, log failures.

        Used for cleanup DROPs, where a failure on one object (e.g. because it
        does not exist yet) must not prevent the remaining statements from
        running.
        """
        for sql in statements:
            try:
                self.db_manager.execute_non_query(conn_id, sql)
            except Exception as exc:  # deliberate: cleanup is best effort
                logger.debug("Ignoring failed cleanup statement %r: %s", sql, exc)

    @staticmethod
    def _nvarchar_literal(text):
        """Render *text* as a T-SQL Unicode literal ``N'...'``, doubling quotes."""
        return "N'" + text.replace("'", "''") + "'"

    # -------------------------------------------------------------------- MySQL

    def init_mysql_sample_data(self, config: dict = None):
        """Create (if missing) and seed the MySQL ``users`` and ``products`` tables.

        Args:
            config: Connection settings with ``host``, ``port``, ``username``,
                ``password`` and ``database`` keys. Defaults to
                ``DatabaseConfig.get_config("mysql")``.

        Returns:
            The connection id on success, or ``None`` if any step raised (the
            error is logged).
        """
        try:
            if config is None:
                config = DatabaseConfig.get_config("mysql")
            conn_id = self.db_manager.create_connection(
                db_type="mysql",
                host=config["host"],
                port=config["port"],
                username=config["username"],
                password=config["password"],
                database=config["database"],
            )

            create_users_table = """
            CREATE TABLE IF NOT EXISTS users (
                id INT AUTO_INCREMENT PRIMARY KEY COMMENT '用户ID',
                name VARCHAR(100) NOT NULL COMMENT '用户姓名',
                email VARCHAR(150) UNIQUE NOT NULL COMMENT '邮箱地址',
                age INT COMMENT '年龄',
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间'
            ) COMMENT='用户信息表';
            """
            create_products_table = """
            CREATE TABLE IF NOT EXISTS products (
                id INT AUTO_INCREMENT PRIMARY KEY COMMENT '产品ID',
                name VARCHAR(200) NOT NULL COMMENT '产品名称',
                price DECIMAL(10,2) NOT NULL COMMENT '价格',
                category VARCHAR(100) COMMENT '分类',
                description TEXT COMMENT '产品描述',
                stock_quantity INT DEFAULT 0 COMMENT '库存数量',
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间'
            ) COMMENT='产品信息表';
            """
            # INSERT IGNORE skips rows that collide on a unique key, so the
            # unique `email` column keeps reruns from duplicating users.
            insert_users = """
            INSERT IGNORE INTO users (name, email, age) VALUES
            ('张三', 'zhangsan@example.com', 25),
            ('李四', 'lisi@example.com', 30),
            ('王五', 'wangwu@example.com', 28),
            ('赵六', 'zhaoliu@example.com', 35),
            ('钱七', 'qianqi@example.com', 22);
            """
            # NOTE(review): `products` has no unique key besides the auto id,
            # so INSERT IGNORE never skips anything here and every rerun
            # appends these five products again.
            insert_products = """
            INSERT IGNORE INTO products (name, price, category, description, stock_quantity) VALUES
            ('苹果手机', 5999.00, '电子产品', '最新款智能手机', 50),
            ('笔记本电脑', 8999.00, '电子产品', '高性能办公笔记本', 30),
            ('无线耳机', 299.00, '电子产品', '蓝牙无线耳机', 100),
            ('咖啡杯', 39.90, '生活用品', '陶瓷咖啡杯', 200),
            ('书包', 129.00, '生活用品', '学生书包', 80);
            """
            self._execute_all(
                conn_id,
                (create_users_table, create_products_table, insert_users, insert_products),
            )
            logger.info(f"MySQL示例数据初始化成功: {conn_id}")
            return conn_id
        except Exception as e:
            logger.exception(f"MySQL示例数据初始化失败: {e}")
            return None

    # ------------------------------------------------------------------- Oracle

    def init_oracle_sample_data(self, config: dict = None):
        """Recreate and seed the Oracle sample objects.

        Drops (best effort) and recreates the ``departments`` / ``employees``
        tables and the ``dept_seq`` / ``emp_seq`` sequences, inserts three
        departments and three employees, then adds table/column comments.

        Args:
            config: Connection settings with ``host``, ``port``, ``username``,
                ``password`` and ``service_name`` keys, plus optional ``mode``
                and ``threaded``. Defaults to
                ``DatabaseConfig.get_config("oracle")``.

        Returns:
            The connection id on success, or ``None`` if any step raised (the
            error is logged).
        """
        try:
            if config is None:
                config = DatabaseConfig.get_config("oracle")
            logger.info(f"开始初始化Oracle示例数据配置: host={config['host']}, port={config['port']}, service_name={config['service_name']}")
            # The Oracle-specific options (mode, threaded) are forwarded as
            # extra keyword arguments; the service name is passed as `database`.
            conn_id = self.db_manager.create_connection(
                db_type="oracle",
                host=config["host"],
                port=config["port"],
                username=config["username"],
                password=config["password"],
                database=config["service_name"],
                mode=config.get("mode"),
                threaded=config.get("threaded", True),
            )
            logger.info(f"Oracle连接创建成功连接ID: {conn_id}")

            create_employees_table = """
            CREATE TABLE employees (
                employee_id NUMBER PRIMARY KEY,
                first_name VARCHAR2(50) NOT NULL,
                last_name VARCHAR2(50) NOT NULL,
                email VARCHAR2(100) UNIQUE NOT NULL,
                phone_number VARCHAR2(20),
                hire_date DATE DEFAULT SYSDATE,
                job_id VARCHAR2(10),
                salary NUMBER(8,2),
                department_id NUMBER
            )
            """
            create_departments_table = """
            CREATE TABLE departments (
                department_id NUMBER PRIMARY KEY,
                department_name VARCHAR2(100) NOT NULL,
                manager_id NUMBER,
                location_id NUMBER
            )
            """

            # Remove leftovers from a previous run so the CREATEs below start
            # clean. Each DROP runs separately: a single missing object must not
            # skip the remaining drops (which would make the CREATEs fail).
            self._execute_ignoring_errors(conn_id, (
                "DROP TABLE employees",
                "DROP TABLE departments",
                "DROP SEQUENCE emp_seq",
                "DROP SEQUENCE dept_seq",
            ))
            self._execute_all(conn_id, (
                create_departments_table,
                create_employees_table,
                "CREATE SEQUENCE dept_seq START WITH 1 INCREMENT BY 1",
                "CREATE SEQUENCE emp_seq START WITH 1 INCREMENT BY 1",
            ))

            # dept_seq was just recreated starting at 1, so these departments get
            # ids 1, 2, 3 in this order; employees' department_id relies on it.
            self._execute_all(conn_id, (
                "INSERT INTO departments (department_id, department_name, manager_id, location_id) "
                "VALUES (dept_seq.NEXTVAL, '人力资源部', NULL, 1700)",
                "INSERT INTO departments (department_id, department_name, manager_id, location_id) "
                "VALUES (dept_seq.NEXTVAL, '技术部', NULL, 1800)",
                "INSERT INTO departments (department_id, department_name, manager_id, location_id) "
                "VALUES (dept_seq.NEXTVAL, '销售部', NULL, 1900)",
            ))

            # Oracle treats '' as NULL, so the NOT NULL name columns need real
            # (non-empty) values or the insert fails with ORA-01400.
            self._execute_all(conn_id, (
                "INSERT INTO employees (employee_id, first_name, last_name, email, phone_number, job_id, salary, department_id) "
                "VALUES (emp_seq.NEXTVAL, '三', '张', 'zhang.san@company.com', '13800138001', 'IT_PROG', 8000, 2)",
                "INSERT INTO employees (employee_id, first_name, last_name, email, phone_number, job_id, salary, department_id) "
                "VALUES (emp_seq.NEXTVAL, '四', '李', 'li.si@company.com', '13800138002', 'SA_REP', 6000, 3)",
                "INSERT INTO employees (employee_id, first_name, last_name, email, phone_number, job_id, salary, department_id) "
                "VALUES (emp_seq.NEXTVAL, '五', '王', 'wang.wu@company.com', '13800138003', 'HR_REP', 5500, 1)",
            ))

            # Table and column comments.
            self._execute_all(conn_id, (
                "COMMENT ON TABLE employees IS '员工信息表'",
                "COMMENT ON TABLE departments IS '部门信息表'",
                "COMMENT ON COLUMN employees.employee_id IS '员工ID'",
                "COMMENT ON COLUMN employees.first_name IS '名'",
                "COMMENT ON COLUMN employees.last_name IS '姓'",
                "COMMENT ON COLUMN employees.email IS '邮箱地址'",
                "COMMENT ON COLUMN employees.salary IS '薪资'",
            ))
            logger.info(f"Oracle示例数据初始化成功: {conn_id}")
            return conn_id
        except Exception as e:
            logger.exception(f"Oracle示例数据初始化失败: {e}")
            return None

    # --------------------------------------------------------------- SQL Server

    def init_sqlserver_sample_data(self, config: dict = None):
        """Create (if missing) and seed the SQL Server ``customers``/``orders`` tables.

        Inserts five customers and one order per customer. Every insert is
        guarded by ``IF NOT EXISTS`` so reruns do not add duplicates, and each
        order looks up its customer's IDENTITY id by company name instead of
        assuming the ids are 1..5.

        Args:
            config: Connection settings with ``host``, ``port``, ``username``,
                ``password`` and ``database`` keys. Defaults to
                ``DatabaseConfig.get_config("sqlserver")``.

        Returns:
            The connection id on success, or ``None`` if any step raised (the
            error is logged).
        """
        try:
            if config is None:
                config = DatabaseConfig.get_config("sqlserver")
            logger.info(f"开始初始化SQL Server示例数据配置: host={config['host']}, port={config['port']}, database={config['database']}")
            conn_id = self.db_manager.create_connection(
                db_type="sqlserver",
                host=config["host"],
                port=config["port"],
                username=config["username"],
                password=config["password"],
                database=config["database"],
            )
            logger.info(f"SQL Server连接创建成功连接ID: {conn_id}")

            create_customers_table = """
            IF NOT EXISTS (SELECT * FROM sysobjects WHERE name='customers' AND xtype='U')
            CREATE TABLE customers (
                customer_id INT IDENTITY(1,1) PRIMARY KEY,
                company_name NVARCHAR(100) NOT NULL,
                contact_name NVARCHAR(50),
                contact_title NVARCHAR(30),
                address NVARCHAR(100),
                city NVARCHAR(50),
                region NVARCHAR(50),
                postal_code NVARCHAR(20),
                country NVARCHAR(50),
                phone NVARCHAR(30),
                email NVARCHAR(100),
                created_date DATETIME DEFAULT GETDATE()
            )
            """
            create_orders_table = """
            IF NOT EXISTS (SELECT * FROM sysobjects WHERE name='orders' AND xtype='U')
            CREATE TABLE orders (
                order_id INT IDENTITY(1,1) PRIMARY KEY,
                customer_id INT,
                order_date DATETIME DEFAULT GETDATE(),
                required_date DATETIME,
                shipped_date DATETIME,
                ship_via INT,
                freight DECIMAL(10,2),
                ship_name NVARCHAR(100),
                ship_address NVARCHAR(100),
                ship_city NVARCHAR(50),
                ship_region NVARCHAR(50),
                ship_postal_code NVARCHAR(20),
                ship_country NVARCHAR(50),
                FOREIGN KEY (customer_id) REFERENCES customers(customer_id)
            )
            """
            self._execute_all(conn_id, (create_customers_table, create_orders_table))

            # All text literals use the N'' prefix: without it SQL Server
            # converts the text through the database's default code page, which
            # can store these Chinese values as '?' in the NVARCHAR columns.
            customer_columns = ", ".join((
                "company_name", "contact_name", "contact_title", "address", "city",
                "region", "postal_code", "country", "phone", "email",
            ))
            customers = [
                ("北京科技有限公司", "张三", "总经理", "北京市朝阳区建国路1号", "北京", "华北", "100001", "中国", "010-12345678", "zhangsan@bjtech.com"),
                ("上海贸易公司", "李四", "销售经理", "上海市浦东新区陆家嘴路100号", "上海", "华东", "200001", "中国", "021-87654321", "lisi@shtrade.com"),
                ("广州制造企业", "王五", "采购主管", "广州市天河区珠江路200号", "广州", "华南", "510001", "中国", "020-11223344", "wangwu@gzmfg.com"),
                ("深圳创新公司", "赵六", "技术总监", "深圳市南山区科技园300号", "深圳", "华南", "518001", "中国", "0755-99887766", "zhaoliu@szinno.com"),
                ("成都服务公司", "钱七", "客户经理", "成都市锦江区春熙路400号", "成都", "西南", "610001", "中国", "028-55443322", "qianqi@cdservice.com"),
            ]
            for row in customers:
                company = self._nvarchar_literal(row[0])
                values = ", ".join(self._nvarchar_literal(value) for value in row)
                self.db_manager.execute_non_query(
                    conn_id,
                    f"IF NOT EXISTS (SELECT 1 FROM customers WHERE company_name = {company}) "
                    f"INSERT INTO customers ({customer_columns}) VALUES ({values})",
                )

            # One order per customer: (company name, days until required, freight).
            # The ship_* fields are copied from the customer's own row, which is
            # exactly what the seed data uses; the customer id comes from the
            # table rather than being hard-coded.
            orders = [
                ("北京科技有限公司", 7, "25.50"),
                ("上海贸易公司", 10, "35.75"),
                ("广州制造企业", 5, "18.25"),
                ("深圳创新公司", 14, "42.00"),
                ("成都服务公司", 12, "28.90"),
            ]
            for company_name, lead_days, freight in orders:
                company = self._nvarchar_literal(company_name)
                self.db_manager.execute_non_query(
                    conn_id,
                    f"IF NOT EXISTS (SELECT 1 FROM orders WHERE ship_name = {company}) "
                    "INSERT INTO orders (customer_id, required_date, freight, ship_name, ship_address, "
                    "ship_city, ship_region, ship_postal_code, ship_country) "
                    f"SELECT TOP (1) customer_id, DATEADD(day, {lead_days}, GETDATE()), {freight}, "
                    "company_name, address, city, region, postal_code, country "
                    f"FROM customers WHERE company_name = {company} ORDER BY customer_id",
                )
            logger.info(f"SQL Server示例数据初始化成功: {conn_id}")
            return conn_id
        except Exception as e:
            logger.exception(f"SQL Server示例数据初始化失败: {e}")
            return None

    # ------------------------------------------------------------ orchestration

    def initialize_all_sample_data(self):
        """Run every initializer in turn (MySQL, Oracle, SQL Server).

        Returns:
            ``{"mysql": ..., "oracle": ..., "sqlserver": ...}`` mapping each
            backend to its connection id, or ``None`` when that backend failed
            or sample data is disabled via
            ``DatabaseConfig.is_sample_data_enabled()``.
        """
        if not DatabaseConfig.is_sample_data_enabled():
            logger.info("示例数据初始化已禁用")
            return {"mysql": None, "oracle": None, "sqlserver": None}
        logger.info("开始初始化示例数据...")
        initializers = (
            ("mysql", "MySQL", self.init_mysql_sample_data),
            ("oracle", "Oracle", self.init_oracle_sample_data),
            ("sqlserver", "SQL Server", self.init_sqlserver_sample_data),
        )
        results = {}
        for key, label, initializer in initializers:
            conn_id = initializer()
            if conn_id:
                logger.info(f"{label}示例数据初始化完成")
            else:
                logger.warning(f"{label}示例数据初始化失败请检查数据库连接配置")
            results[key] = conn_id
        logger.info("示例数据初始化流程完成")
        return results