Initial commit

2025-10-14 14:17:21 +08:00
commit ac715a8b88
35011 changed files with 3834178 additions and 0 deletions


@@ -0,0 +1,42 @@
import datetime
import time

import click
from sqlalchemy import text
from werkzeug.exceptions import NotFound

import app
from configs import dify_config
from extensions.ext_database import db
from models.dataset import Embedding


@app.celery.task(queue="dataset")
def clean_embedding_cache_task():
click.echo(click.style("Start clean embedding cache.", fg="green"))
clean_days = int(dify_config.PLAN_SANDBOX_CLEAN_DAY_SETTING)
start_at = time.perf_counter()
    # cutoff: rows created more than PLAN_SANDBOX_CLEAN_DAY_SETTING days ago (despite the variable name)
    thirty_days_ago = datetime.datetime.now() - datetime.timedelta(days=clean_days)
    # delete matching embedding cache rows in batches of 100 until none remain
    while True:
        try:
embedding_ids = (
db.session.query(Embedding.id)
.filter(Embedding.created_at < thirty_days_ago)
.order_by(Embedding.created_at.desc())
.limit(100)
.all()
)
embedding_ids = [embedding_id[0] for embedding_id in embedding_ids]
except NotFound:
break
if embedding_ids:
for embedding_id in embedding_ids:
db.session.execute(
text("DELETE FROM embeddings WHERE id = :embedding_id"), {"embedding_id": embedding_id}
)
db.session.commit()
else:
break
end_at = time.perf_counter()
click.echo(click.style("Cleaned embedding cache from db success latency: {}".format(end_at - start_at), fg="green"))


@@ -0,0 +1,91 @@
import datetime
import logging
import time

import click
from werkzeug.exceptions import NotFound

import app
from configs import dify_config
from extensions.ext_database import db
from extensions.ext_redis import redis_client
from models.model import (
    App,
    Message,
    MessageAgentThought,
    MessageAnnotation,
    MessageChain,
    MessageFeedback,
    MessageFile,
)
from models.web import SavedMessage
from services.feature_service import FeatureService

_logger = logging.getLogger(__name__)


@app.celery.task(queue="dataset")
def clean_messages():
click.echo(click.style("Start clean messages.", fg="green"))
start_at = time.perf_counter()
plan_sandbox_clean_message_day = datetime.datetime.now() - datetime.timedelta(
days=dify_config.PLAN_SANDBOX_CLEAN_MESSAGE_DAY_SETTING
)
while True:
try:
            # Fetch the next batch of 100 messages older than the cutoff
            # FIXME: the type ignore below works around a mypy "no paginate method" error
messages = (
db.session.query(Message) # type: ignore
.filter(Message.created_at < plan_sandbox_clean_message_day)
.order_by(Message.created_at.desc())
.limit(100)
.all()
)
except NotFound:
break
if not messages:
break
        for message in messages:
            # advance the cutoff cursor so messages that are kept (non-sandbox plans)
            # are not re-fetched on the next iteration
            plan_sandbox_clean_message_day = message.created_at
app = db.session.query(App).filter_by(id=message.app_id).first()
if not app:
_logger.warning(
"Expected App record to exist, but none was found, app_id=%s, message_id=%s",
message.app_id,
message.id,
)
continue
features_cache_key = f"features:{app.tenant_id}"
plan_cache = redis_client.get(features_cache_key)
if plan_cache is None:
features = FeatureService.get_features(app.tenant_id)
redis_client.setex(features_cache_key, 600, features.billing.subscription.plan)
plan = features.billing.subscription.plan
else:
plan = plan_cache.decode()
if plan == "sandbox":
# clean related message
db.session.query(MessageFeedback).filter(MessageFeedback.message_id == message.id).delete(
synchronize_session=False
)
db.session.query(MessageAnnotation).filter(MessageAnnotation.message_id == message.id).delete(
synchronize_session=False
)
db.session.query(MessageChain).filter(MessageChain.message_id == message.id).delete(
synchronize_session=False
)
db.session.query(MessageAgentThought).filter(MessageAgentThought.message_id == message.id).delete(
synchronize_session=False
)
db.session.query(MessageFile).filter(MessageFile.message_id == message.id).delete(
synchronize_session=False
)
db.session.query(SavedMessage).filter(SavedMessage.message_id == message.id).delete(
synchronize_session=False
)
db.session.query(Message).filter(Message.id == message.id).delete()
db.session.commit()
end_at = time.perf_counter()
click.echo(click.style("Cleaned messages from db success latency: {}".format(end_at - start_at), fg="green"))


@@ -0,0 +1,191 @@
import datetime
import time

import click
from sqlalchemy import func, select
from werkzeug.exceptions import NotFound

import app
from configs import dify_config
from core.rag.index_processor.index_processor_factory import IndexProcessorFactory
from extensions.ext_database import db
from extensions.ext_redis import redis_client
from models.dataset import Dataset, DatasetAutoDisableLog, DatasetQuery, Document
from services.feature_service import FeatureService


@app.celery.task(queue="dataset")
def clean_unused_datasets_task():
click.echo(click.style("Start clean unused datasets indexes.", fg="green"))
plan_sandbox_clean_day_setting = dify_config.PLAN_SANDBOX_CLEAN_DAY_SETTING
plan_pro_clean_day_setting = dify_config.PLAN_PRO_CLEAN_DAY_SETTING
start_at = time.perf_counter()
plan_sandbox_clean_day = datetime.datetime.now() - datetime.timedelta(days=plan_sandbox_clean_day_setting)
plan_pro_clean_day = datetime.datetime.now() - datetime.timedelta(days=plan_pro_clean_day_setting)
    # First pass: datasets idle past the sandbox retention window are logged,
    # their indexes removed, and their documents disabled
    while True:
try:
# Subquery for counting new documents
document_subquery_new = (
db.session.query(Document.dataset_id, func.count(Document.id).label("document_count"))
.filter(
Document.indexing_status == "completed",
Document.enabled == True,
Document.archived == False,
Document.updated_at > plan_sandbox_clean_day,
)
.group_by(Document.dataset_id)
.subquery()
)
# Subquery for counting old documents
document_subquery_old = (
db.session.query(Document.dataset_id, func.count(Document.id).label("document_count"))
.filter(
Document.indexing_status == "completed",
Document.enabled == True,
Document.archived == False,
Document.updated_at < plan_sandbox_clean_day,
)
.group_by(Document.dataset_id)
.subquery()
)
            # Candidate datasets: created before the cutoff, with at least one enabled,
            # completed document but none updated since the cutoff
            stmt = (
select(Dataset)
.outerjoin(document_subquery_new, Dataset.id == document_subquery_new.c.dataset_id)
.outerjoin(document_subquery_old, Dataset.id == document_subquery_old.c.dataset_id)
.filter(
Dataset.created_at < plan_sandbox_clean_day,
func.coalesce(document_subquery_new.c.document_count, 0) == 0,
func.coalesce(document_subquery_old.c.document_count, 0) > 0,
)
.order_by(Dataset.created_at.desc())
)
datasets = db.paginate(stmt, page=1, per_page=50)
except NotFound:
break
if datasets.items is None or len(datasets.items) == 0:
break
for dataset in datasets:
dataset_query = (
db.session.query(DatasetQuery)
.filter(DatasetQuery.created_at > plan_sandbox_clean_day, DatasetQuery.dataset_id == dataset.id)
.all()
)
if not dataset_query or len(dataset_query) == 0:
try:
# add auto disable log
documents = (
db.session.query(Document)
.filter(
Document.dataset_id == dataset.id,
Document.enabled == True,
Document.archived == False,
)
.all()
)
for document in documents:
dataset_auto_disable_log = DatasetAutoDisableLog(
tenant_id=dataset.tenant_id,
dataset_id=dataset.id,
document_id=document.id,
)
db.session.add(dataset_auto_disable_log)
# remove index
index_processor = IndexProcessorFactory(dataset.doc_form).init_index_processor()
index_processor.clean(dataset, None)
# update document
update_params = {Document.enabled: False}
db.session.query(Document).filter_by(dataset_id=dataset.id).update(update_params)
db.session.commit()
click.echo(click.style("Cleaned unused dataset {} from db success!".format(dataset.id), fg="green"))
except Exception as e:
click.echo(
click.style("clean dataset index error: {} {}".format(e.__class__.__name__, str(e)), fg="red")
)
    # Second pass: same candidate query with the pro/team retention window,
    # cleaning only datasets whose tenant is on the sandbox plan
    while True:
try:
# Subquery for counting new documents
document_subquery_new = (
db.session.query(Document.dataset_id, func.count(Document.id).label("document_count"))
.filter(
Document.indexing_status == "completed",
Document.enabled == True,
Document.archived == False,
Document.updated_at > plan_pro_clean_day,
)
.group_by(Document.dataset_id)
.subquery()
)
# Subquery for counting old documents
document_subquery_old = (
db.session.query(Document.dataset_id, func.count(Document.id).label("document_count"))
.filter(
Document.indexing_status == "completed",
Document.enabled == True,
Document.archived == False,
Document.updated_at < plan_pro_clean_day,
)
.group_by(Document.dataset_id)
.subquery()
)
# Main query with join and filter
stmt = (
select(Dataset)
.outerjoin(document_subquery_new, Dataset.id == document_subquery_new.c.dataset_id)
.outerjoin(document_subquery_old, Dataset.id == document_subquery_old.c.dataset_id)
.filter(
Dataset.created_at < plan_pro_clean_day,
func.coalesce(document_subquery_new.c.document_count, 0) == 0,
func.coalesce(document_subquery_old.c.document_count, 0) > 0,
)
.order_by(Dataset.created_at.desc())
)
datasets = db.paginate(stmt, page=1, per_page=50)
except NotFound:
break
if datasets.items is None or len(datasets.items) == 0:
break
for dataset in datasets:
dataset_query = (
db.session.query(DatasetQuery)
.filter(DatasetQuery.created_at > plan_pro_clean_day, DatasetQuery.dataset_id == dataset.id)
.all()
)
if not dataset_query or len(dataset_query) == 0:
try:
features_cache_key = f"features:{dataset.tenant_id}"
plan_cache = redis_client.get(features_cache_key)
if plan_cache is None:
features = FeatureService.get_features(dataset.tenant_id)
redis_client.setex(features_cache_key, 600, features.billing.subscription.plan)
plan = features.billing.subscription.plan
else:
plan = plan_cache.decode()
if plan == "sandbox":
# remove index
index_processor = IndexProcessorFactory(dataset.doc_form).init_index_processor()
index_processor.clean(dataset, None)
# update document
update_params = {Document.enabled: False}
db.session.query(Document).filter_by(dataset_id=dataset.id).update(update_params)
db.session.commit()
click.echo(
click.style("Cleaned unused dataset {} from db success!".format(dataset.id), fg="green")
)
except Exception as e:
click.echo(
click.style("clean dataset index error: {} {}".format(e.__class__.__name__, str(e)), fg="red")
)
end_at = time.perf_counter()
click.echo(click.style("Cleaned unused dataset from db success latency: {}".format(end_at - start_at), fg="green"))


@@ -0,0 +1,61 @@
import time

import click

import app
from configs import dify_config
from core.rag.datasource.vdb.tidb_on_qdrant.tidb_service import TidbService
from extensions.ext_database import db
from models.dataset import TidbAuthBinding


@app.celery.task(queue="dataset")
def create_tidb_serverless_task():
click.echo(click.style("Start create tidb serverless task.", fg="green"))
if not dify_config.CREATE_TIDB_SERVICE_JOB_ENABLED:
return
tidb_serverless_number = dify_config.TIDB_SERVERLESS_NUMBER
start_at = time.perf_counter()
    # Top up the pool: provision clusters in batches until the number of idle
    # (inactive) TidbAuthBinding rows reaches TIDB_SERVERLESS_NUMBER
    while True:
        try:
            # check the number of idle tidb serverless clusters
idle_tidb_serverless_number = (
db.session.query(TidbAuthBinding).filter(TidbAuthBinding.active == False).count()
)
if idle_tidb_serverless_number >= tidb_serverless_number:
break
            # create a batch of tidb serverless clusters; despite its name,
            # iterations_per_thread is simply the batch size handed to create_clusters
            iterations_per_thread = 20
            create_clusters(iterations_per_thread)
except Exception as e:
click.echo(click.style(f"Error: {e}", fg="red"))
break
end_at = time.perf_counter()
click.echo(click.style("Create tidb serverless task success latency: {}".format(end_at - start_at), fg="green"))
def create_clusters(batch_size):
try:
# TODO: maybe we can set the default value for the following parameters in the config file
new_clusters = TidbService.batch_create_tidb_serverless_cluster(
batch_size=batch_size,
project_id=dify_config.TIDB_PROJECT_ID or "",
api_url=dify_config.TIDB_API_URL or "",
iam_url=dify_config.TIDB_IAM_API_URL or "",
public_key=dify_config.TIDB_PUBLIC_KEY or "",
private_key=dify_config.TIDB_PRIVATE_KEY or "",
region=dify_config.TIDB_REGION or "",
)
for new_cluster in new_clusters:
tidb_auth_binding = TidbAuthBinding(
cluster_id=new_cluster["cluster_id"],
cluster_name=new_cluster["cluster_name"],
account=new_cluster["account"],
password=new_cluster["password"],
)
db.session.add(tidb_auth_binding)
db.session.commit()
except Exception as e:
click.echo(click.style(f"Error: {e}", fg="red"))


@@ -0,0 +1,94 @@
import logging
import time
from collections import defaultdict

import click
from flask import render_template  # type: ignore

import app
from configs import dify_config
from extensions.ext_database import db
from extensions.ext_mail import mail
from models.account import Account, Tenant, TenantAccountJoin
from models.dataset import Dataset, DatasetAutoDisableLog
from services.feature_service import FeatureService


@app.celery.task(queue="dataset")
def mail_clean_document_notify_task():
"""
Async Send document clean notify mail
Usage: mail_clean_document_notify_task.delay()
"""
if not mail.is_inited():
return
logging.info(click.style("Start send document clean notify mail", fg="green"))
start_at = time.perf_counter()
# send document clean notify mail
try:
dataset_auto_disable_logs = (
db.session.query(DatasetAutoDisableLog).filter(DatasetAutoDisableLog.notified == False).all()
)
# group by tenant_id
dataset_auto_disable_logs_map: dict[str, list[DatasetAutoDisableLog]] = defaultdict(list)
for dataset_auto_disable_log in dataset_auto_disable_logs:
if dataset_auto_disable_log.tenant_id not in dataset_auto_disable_logs_map:
dataset_auto_disable_logs_map[dataset_auto_disable_log.tenant_id] = []
dataset_auto_disable_logs_map[dataset_auto_disable_log.tenant_id].append(dataset_auto_disable_log)
url = f"{dify_config.CONSOLE_WEB_URL}/datasets"
for tenant_id, tenant_dataset_auto_disable_logs in dataset_auto_disable_logs_map.items():
features = FeatureService.get_features(tenant_id)
plan = features.billing.subscription.plan
if plan != "sandbox":
knowledge_details = []
# check tenant
tenant = db.session.query(Tenant).filter(Tenant.id == tenant_id).first()
if not tenant:
continue
# check current owner
current_owner_join = (
db.session.query(TenantAccountJoin).filter_by(tenant_id=tenant.id, role="owner").first()
)
if not current_owner_join:
continue
account = db.session.query(Account).filter(Account.id == current_owner_join.account_id).first()
if not account:
continue
dataset_auto_dataset_map = {} # type: ignore
for dataset_auto_disable_log in tenant_dataset_auto_disable_logs:
if dataset_auto_disable_log.dataset_id not in dataset_auto_dataset_map:
dataset_auto_dataset_map[dataset_auto_disable_log.dataset_id] = []
dataset_auto_dataset_map[dataset_auto_disable_log.dataset_id].append(
dataset_auto_disable_log.document_id
)
for dataset_id, document_ids in dataset_auto_dataset_map.items():
dataset = db.session.query(Dataset).filter(Dataset.id == dataset_id).first()
if dataset:
document_count = len(document_ids)
knowledge_details.append(rf"Knowledge base {dataset.name}: {document_count} documents")
if knowledge_details:
html_content = render_template(
"clean_document_job_mail_template-US.html",
userName=account.email,
knowledge_details=knowledge_details,
url=url,
)
mail.send(
to=account.email, subject="Dify Knowledge base auto disable notification", html=html_content
)
# update notified to True
for dataset_auto_disable_log in tenant_dataset_auto_disable_logs:
dataset_auto_disable_log.notified = True
db.session.commit()
end_at = time.perf_counter()
logging.info(
click.style("Send document clean notify mail succeeded: latency: {}".format(end_at - start_at), fg="green")
)
except Exception:
logging.exception("Send document clean notify mail failed")
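
As the docstring notes, this task is meant to be enqueued rather than called inline; a minimal usage sketch, assuming the Celery worker and mail extension are already configured:

# enqueue the notification job on a worker; the task returns early if the mail extension is not initialized
mail_clean_document_notify_task.delay()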


@@ -0,0 +1,52 @@
import time

import click

import app
from configs import dify_config
from core.rag.datasource.vdb.tidb_on_qdrant.tidb_service import TidbService
from extensions.ext_database import db
from models.dataset import TidbAuthBinding


@app.celery.task(queue="dataset")
def update_tidb_serverless_status_task():
click.echo(click.style("Update tidb serverless status task.", fg="green"))
start_at = time.perf_counter()
try:
# check the number of idle tidb serverless
tidb_serverless_list = (
db.session.query(TidbAuthBinding)
.filter(TidbAuthBinding.active == False, TidbAuthBinding.status == "CREATING")
.all()
)
if len(tidb_serverless_list) == 0:
return
# update tidb serverless status
update_clusters(tidb_serverless_list)
except Exception as e:
click.echo(click.style(f"Error: {e}", fg="red"))
end_at = time.perf_counter()
click.echo(
click.style("Update tidb serverless status task success latency: {}".format(end_at - start_at), fg="green")
    )


def update_clusters(tidb_serverless_list: list[TidbAuthBinding]):
try:
        # update cluster status in batches of 20
        for i in range(0, len(tidb_serverless_list), 20):
            items = tidb_serverless_list[i : i + 20]
# TODO: maybe we can set the default value for the following parameters in the config file
TidbService.batch_update_tidb_serverless_cluster_status(
tidb_serverless_list=items,
project_id=dify_config.TIDB_PROJECT_ID or "",
api_url=dify_config.TIDB_API_URL or "",
iam_url=dify_config.TIDB_IAM_API_URL or "",
public_key=dify_config.TIDB_PUBLIC_KEY or "",
private_key=dify_config.TIDB_PRIVATE_KEY or "",
)
except Exception as e:
click.echo(click.style(f"Error: {e}", fg="red"))