04月23日
一、今日完成情况
- PPT通信网络基础的处理,有待整合还有视频制作
- 完成锚点间断点保存,并且两个数据库实现互通,总算知道我们的数据是从哪个数据库当中来的了,早就该问了。
- 体验助教,讲台上嘎嘣住了。
二、今日感悟
- 核心业务数据:
- 实验室项目进展还不错
- 三体看完了第二步还不错,状态非常差居然有这个成果。
- 今日工作总结:
- 无,对了,导师的句句话暗含杀气呀。
- 明日工作计划:
- 无
- 今日学习成长:
- 无
三、备注
- 明天我需要把数据批量处理打标的任务给完成。
- 每天需要完成。后天要讲的老板的课的 PPT,还有相关的内容的理解。
- 明天晚上的话要去费老师的课程收作业,可以思考一下课程可以做一些什么事情。
- 明天还需要整合一下通信网络基础的 PPT。最好分一下工,对后续的 PPT 演讲视频。明天还需要完成通讯网络基础,我这一部分的图的演示,并且整合成一个4页的 PPT。
四、数据抽取任务–线程、批次、任务数
这是我当前的并发配置,还有每次运行多少个任务,最后从环境变量当中获取任务的批次。
任务的批次我们可以不考虑,因为每一次任务的运行都是一批批的,可以原子分割,那么我们就把问题简化成了一个批次当中,并发的数量 和 任务的数量不同的时候,效率是怎么样的。
1、情况一:并发数 > 任务数(数据)
就是当前我的配置的情况,相当于AI有八个工人给我打工,可是我当前一个批次只有5个货物,而且每个货物都是无法拆分的(数据抽取),不可能把一个数据抽取任务拆开让AI完成,因此此情况下,有3个线程冗余,一直处于空闲状态,对于资源的利用效率不高
2、情况二:并发数 < 任务数(数据)
假如说,10条数据要处理,我当前并发数是3。
这个就可以理解了,一个并发线程处理一条数据,处理完了之后继续处理没完成的,有点像银行柜台处理业务。
所以对于线程并发的利用率是比较到位的。
比较合适的,并发数 = 任务数,这样的话每个线程一开始都是同时执行,然后最慢的那个线程处理完成之后,这个批次就结束了,开始下一个批次。
3、原则:
- 批次是老大:一轮必须等所有任务做完才结束
- 线程是工人:多出来的工人只能闲着,不能提前干下一轮的活
- 线程数 > 任务数:不影响批次结束时间,只是资源小冗余
并发编程的核心本质:
核心答案:
✅ 总处理数据条数 只和 批次 × 每批数据 有关,和线程数无关
✅ 线程数 只决定处理速度,不决定总数据量
✅ 这完全是线程池的核心思想!
1、线程数 = 每批数据量 → 等最慢的那个完成,才开始下一批
2、每批数据量 > 线程数 → 线程池会自动复用空闲线程,干完一个任务的线程,立刻接手下一个未处理的任务,不用等所有线程都干完,直到这一批所有数据全部处理完,才开始下一批。
| 概念 | 比喻 | 核心规则 |
|---|---|---|
| 线程池(MAX_WORKERS=8) | 8 个固定工人 | 人不变,干完一个接一个 |
| 每批数据(FETCH_BATCH_SIZE=N) | 每轮搬来 N 件货物 | 一组货物,必须全干完,才能搬下一组 |
| 批次(RUN_BATCHES) | 总共搬几轮货物 | 总货物数 = 轮数 × 每轮货物 |
| 你的任务是 IO 密集型(99% 时间在等 AI API 返回,不是本地计算),最优配置只有 2 条: |
- 线程数固定(MAX_WORKERS=8,不要改,受 AI API 限速限制)
- 每批数据量 ≫ 线程数(比如 20/30/50,越大越好)
→ 让 8 个工人一直有活干,不空闲 - 轮次固定 → 总数据量最大,总耗时最短
- 每批数据越大 → 轮次越少 → 减少「取数、等待、存档」的间隙时间
- 线程池复用 → 8 个线程不停干活,无空闲
- 总数据 = 轮次 × 每批数 → 每批数越大,总数越多
确实是这样,这样效率最高,我悟了!
五、面对任务断点的可扩展.env参数配置
.env配置
ds_api_key=sk-f127ef4564174b1eaf3b1cd63a0c3635
# ===================== 2. MongoDB 基础配置 =====================
MONGO_HOST=121.48.163.69
MONGO_PORT=27018
MONGO_USERNAME=kipley
MONGO_PASSWORD=188390
MONGO_DB_NAME=task-2025-10-20
# ===================== 3. 并发配置(保持不变) =====================
MAX_WORKERS=8
FETCH_BATCH_SIZE=25
RUN_BATCHES=1
# ===================== 🔥 4. 任务核心配置(多Collection/断点 核心!) =====================
# 唯一任务键(改这个=新任务/重跑,格式:源集合_版本)
TASK_KEY=google_news_copy_v1
# 源集合(你要读取的集合,切换数据只改这里)
SOURCE_COLLECTION=google_news_copy
# 目标集合(处理结果写入)
TARGET_COLLECTION=ldh_processed_news_events_test
# 运行模式:CONTINUE=断点续跑 | RESTART=重新开始
RUN_MODE=CONTINUE
# 手动指定起始ID(优先级最高,断点续跑/重跑都可覆盖,为空则不启用)
MANUAL_START_ID=
优先级:手动指定ID > 断点续跑 > 重新开始
if MANUAL_START_ID:
last_id = ObjectId(MANUAL_START_ID)
print(f"✅ 启用手动断点:从 {last_id} 开始处理")
elif RUN_MODE == "CONTINUE" and checkpoint["last_processed_id"]:
last_id = checkpoint["last_processed_id"]
print(f"✅ 断点续跑:从上次结束位置 {last_id} 开始")
elif RUN_MODE == "RESTART":
last_id = None
print("✅ 重新开始:从头处理所有数据")
从这里可以看出来,最高的就是手动指定ID,下面是规则:
任务说明书:
你 .env 里写的 TASK_KEY
代码逻辑(工业级标准):
- 读取
.env中的TASK_KEY - 去
etl_checkpoints表中只查这一条记录 - 读取这条记录里的
last_processed_id(断点) - 从这个断点继续跑
- 完全不碰其他 Collection 的断点
1. TASK_KEY
官方名称:任务唯一键
- 作用:标记你当前要跑哪个集合的哪个版本进度
- 格式:
源集合名称_版本 - 规则:
- 同一个集合,不同 TASK_KEY = 不同进度
- 同一个集合,相同 TASK_KEY = 读取同一个断点
2. SOURCE_COLLECTION
官方名称:源数据集合
- 作用:你要读取哪一张表的数据
- 必须和
TASK_KEY里的集合名一致
3. TARGET_COLLECTION
官方名称:结果写入集合
- 作用:处理后的结果写到哪里去
4. RUN_MODE
官方名称:运行模式
只有 2 种取值,无第三种:
CONTINUE:断点续跑 → 从上次停下的地方继续跑RESTART:重新开始 → 从头跑,覆盖当前任务的断点
5. MANUAL_START_ID
官方名称:手动指定起始 ID
- 作用:强制从某条
_id后面开始跑 - 优先级:最高!
填了这个值 → 无视断点、无视模式,直接从这条 ID 开始 - 不使用时:必须留空,不能写任何字符
所以,我第五个部分一般情况是不需要自己指定的,因为我没有必要指定从一个确定的地方开始,如果确实要这样的话,那就是拿一个确定的数据做测试的时候需要用得到,批量处理事用不到的。
转载请注明来源,欢迎对文章中的引用来源进行考证,欢迎指出任何有错误或不够清晰的表达。可以在下面评论区评论,也可以邮件至 kipleyarch@gmail.com