2026-04-23

04月23日

一、今日完成情况

  • PPT通信网络基础的处理,有待整合还有视频制作
  • 完成锚点间断点保存,并且两个数据库实现互通,总算知道我们的数据是从哪个数据库当中来的了,早就该问了。
  • 体验助教,讲台上嘎嘣住了。

二、今日感悟

  • 核心业务数据​:
    • 实验室项目进展还不错
    • 三体看完了第二步还不错,状态非常差居然有这个成果。
  • ​今日工作总结:​
    • 无,对了,导师的句句话暗含杀气呀。
  • ​明日工作计划:
  • ​今日学习成长:​

三、备注

  • 明天我需要把数据批量处理打标的任务给完成。
  • 每天需要完成。后天要讲的老板的课的 PPT,还有相关的内容的理解。
  • 明天晚上的话要去费老师的课程收作业,可以思考一下课程可以做一些什么事情。
  • 明天还需要整合一下通信网络基础的 PPT。最好分一下工,对后续的 PPT 演讲视频。明天还需要完成通讯网络基础,我这一部分的图的演示,并且整合成一个4页的 PPT。

四、数据抽取任务–线程、批次、任务数

CleanShot 2026-04-23 at 17.05.09@2x

这是我当前的并发配置,还有每次运行多少个任务,最后从环境变量当中获取任务的批次。

任务的批次我们可以不考虑,因为每一次任务的运行都是一批批的,可以原子分割,那么我们就把问题简化成了一个批次当中,并发的数量 和 任务的数量不同的时候,效率是怎么样的。

1、情况一:并发数 > 任务数(数据)

就是当前我的配置的情况,相当于AI有八个工人给我打工,可是我当前一个批次只有5个货物,而且每个货物都是无法拆分的(数据抽取),不可能把一个数据抽取任务拆开让AI完成,因此此情况下,有3个线程冗余,一直处于空闲状态,对于资源的利用效率不高

2、情况二:并发数 < 任务数(数据)

假如说,10条数据要处理,我当前并发数是3。

这个就可以理解了,一个并发线程处理一条数据,处理完了之后继续处理没完成的,有点像银行柜台处理业务。

所以对于线程并发的利用率是比较到位的。

比较合适的,并发数 = 任务数,这样的话每个线程一开始都是同时执行,然后最慢的那个线程处理完成之后,这个批次就结束了,开始下一个批次。

3、原则:

  • 批次是老大:一轮必须等所有任务做完才结束
  • 线程是工人:多出来的工人只能闲着,不能提前干下一轮的活
  • 线程数 > 任务数:不影响批次结束时间,只是资源小冗余

并发编程的核心本质:

核心答案

✅ 总处理数据条数 只和 批次 × 每批数据 有关,和线程数无关
✅ 线程数 只决定处理速度,不决定总数据量
✅ 这完全是线程池的核心思想

1、线程数 = 每批数据量 → 等最慢的那个完成,才开始下一批
2、每批数据量 > 线程数线程池会自动复用空闲线程,干完一个任务的线程,立刻接手下一个未处理的任务,不用等所有线程都干完,直到这一批所有数据全部处理完,才开始下一批。

概念 比喻 核心规则
线程池(MAX_WORKERS=8) 8 个固定工人 人不变,干完一个接一个
每批数据(FETCH_BATCH_SIZE=N) 每轮搬来 N 件货物 一组货物,必须全干完,才能搬下一组
批次(RUN_BATCHES) 总共搬几轮货物 总货物数 = 轮数 × 每轮货物
你的任务是 IO 密集型(99% 时间在等 AI API 返回,不是本地计算),最优配置只有 2 条
  1. 线程数固定(MAX_WORKERS=8,不要改,受 AI API 限速限制)
  2. 每批数据量 ≫ 线程数(比如 20/30/50,越大越好)
    → 让 8 个工人一直有活干,不空闲
  3. 轮次固定 → 总数据量最大,总耗时最短
  • 每批数据越大 → 轮次越少 → 减少「取数、等待、存档」的间隙时间
  • 线程池复用 → 8 个线程不停干活,无空闲
  • 总数据 = 轮次 × 每批数 → 每批数越大,总数越多

确实是这样,这样效率最高,我悟了!

五、面对任务断点的可扩展.env参数配置

.env配置

ds_api_key=sk-f127ef4564174b1eaf3b1cd63a0c3635


# ===================== 2. MongoDB 基础配置 =====================
MONGO_HOST=121.48.163.69
MONGO_PORT=27018
MONGO_USERNAME=kipley
MONGO_PASSWORD=188390
MONGO_DB_NAME=task-2025-10-20

# ===================== 3. 并发配置(保持不变) =====================
MAX_WORKERS=8
FETCH_BATCH_SIZE=25
RUN_BATCHES=1

# ===================== 🔥 4. 任务核心配置(多Collection/断点 核心!) =====================
# 唯一任务键(改这个=新任务/重跑,格式:源集合_版本)
TASK_KEY=google_news_copy_v1
# 源集合(你要读取的集合,切换数据只改这里)
SOURCE_COLLECTION=google_news_copy
# 目标集合(处理结果写入)
TARGET_COLLECTION=ldh_processed_news_events_test
# 运行模式:CONTINUE=断点续跑 | RESTART=重新开始
RUN_MODE=CONTINUE
# 手动指定起始ID(优先级最高,断点续跑/重跑都可覆盖,为空则不启用)
MANUAL_START_ID=

优先级:手动指定ID > 断点续跑 > 重新开始

if MANUAL_START_ID:
	last_id = ObjectId(MANUAL_START_ID)
	print(f"✅ 启用手动断点:从 {last_id} 开始处理")
elif RUN_MODE == "CONTINUE" and checkpoint["last_processed_id"]:
	last_id = checkpoint["last_processed_id"]
	print(f"✅ 断点续跑:从上次结束位置 {last_id} 开始")
elif RUN_MODE == "RESTART":
	last_id = None
	print("✅ 重新开始:从头处理所有数据")

从这里可以看出来,最高的就是手动指定ID,下面是规则:

任务说明书:

你 .env 里写的 TASK_KEY

代码逻辑(工业级标准):

  1. 读取 .env 中的 TASK_KEY
  2. etl_checkpoints 表中只查这一条记录
  3. 读取这条记录里的 last_processed_id(断点)
  4. 从这个断点继续跑
  5. 完全不碰其他 Collection 的断点

1. TASK_KEY

官方名称:任务唯一键

  • 作用:标记你当前要跑哪个集合的哪个版本进度
  • 格式:源集合名称_版本
  • 规则:
    • 同一个集合,不同 TASK_KEY = 不同进度
    • 同一个集合,相同 TASK_KEY = 读取同一个断点

2. SOURCE_COLLECTION

官方名称:源数据集合

  • 作用:你要读取哪一张表的数据
  • 必须和 TASK_KEY 里的集合名一致

3. TARGET_COLLECTION

官方名称:结果写入集合

  • 作用:处理后的结果写到哪里去

4. RUN_MODE

官方名称:运行模式
只有 2 种取值,无第三种:

  • CONTINUE断点续跑 → 从上次停下的地方继续跑
  • RESTART重新开始 → 从头跑,覆盖当前任务的断点

5. MANUAL_START_ID

官方名称:手动指定起始 ID

  • 作用:强制从某条 _id 后面开始跑
  • 优先级:最高!
    填了这个值 → 无视断点、无视模式,直接从这条 ID 开始
  • 不使用时:必须留空,不能写任何字符

所以,我第五个部分一般情况是不需要自己指定的,因为我没有必要指定从一个确定的地方开始,如果确实要这样的话,那就是拿一个确定的数据做测试的时候需要用得到,批量处理事用不到的。


转载请注明来源,欢迎对文章中的引用来源进行考证,欢迎指出任何有错误或不够清晰的表达。可以在下面评论区评论,也可以邮件至 kipleyarch@gmail.com
Archive PDF预览 PPTX Obsidian