超大文件中在有限的内存里找到单词频率 top 100
TOP 100 freq words
问题
问题:有一个 1G 大小的文件,里面每一行是一个词,每个词的大小不超过 16 字节,内存限制大小是 1M。返回出现频率最高的 100 个单词
思路
问题分析
有一个 1GB 的文件,每行是一个词,每个词不超过 16 字节。 内存限制为 1MB,因此无法一次性加载整个文件到内存中处理。 需要返回出现频率最高的 100 个词,要求精确结果(非近似)。 核心挑战:内存小,文件大,需高效处理大数据,避免内存溢出。解决思路:分治法 + 外部排序 + 堆维护 Top 100
使用分治法将大文件分解为小文件块,每个块在内存中处理,然后多级合并部分结果。在合并过程中,维护一个最小堆来跟踪全局频率最高的 100 个词。思路简单直接,步骤如下:
分块(Divide): 将大文件分割成多个小文件块,每个块大小约 500KB(约 32,000 行),确保块内不同词的数量不超过内存限制(1MB)。 为什么 500KB?内存 1MB,每个词最大 16 字节,哈希表条目(词 + 频率)约 20-32 字节(包括开销),32,000 个不同词需约 640KB,小于 1MB。 分割方法:顺序读取大文件,逐行写入块文件,直到块大小达到约 500KB。重复直到处理完整个文件。 块数量:文件 1GB ≈ 1,000,000,000 字节,每行平均约 10 字节(词 + 换行符),总行数约 100,000,000 行。每个块 32,000 行,块数约 3,125(100,000,000 / 32,000 ≈ 3,125)。 块内处理(Process Chunks): 对每个块文件: 读入内存(约 500KB)。 使用哈希表统计词频:逐行读取,以词为键,累加频率。 块处理完后,对哈希表中的词按键(词)排序。 输出到临时文件:每行格式为 “词 频率”,并按词排序(便于后续合并)。 内存使用:哈希表存储词(最大 16 字节)和频率(4 字节整数),条目上限 32,000,总内存约 640KB,满足 1MB 限制。 输出:每个块生成一个排序后的部分频率文件。 第一级合并(Merge Partially): 部分文件数量多(约 3,125 个),需分组合并,避免同时打开太多文件(文件描述符限制,假设一次最多打开 256 个文件)。 分组:将约 3,125 个部分文件分成 13 组(每组 240 文件)和 1 组(剩余文件),确保每组文件数 ≤ 256。 对每组执行多路合并: 使用优先队列(最小堆)按键(词)合并:堆中每个条目存储(词、频率、文件索引),大小约 30 字节。 合并逻辑:弹出最小词,检查所有输入文件是否有相同词,累加频率,输出全局(词,聚合频率)到新临时文件(按词排序)。 内存使用:优先队列(256 条目 × 30 字节 ≈ 7.7KB),输入缓冲区(每个文件 512 字节缓冲区 × 256 文件 ≈ 128KB),输出缓冲区(4KB),总内存约 140KB < 1MB。 输出:每组生成一个更大的排序频率文件(约 13 个文件)。 第二级合并与 Top 100 提取(Final Merge and Heap Update): 第一级合并后,文件数少(约 13 个),可一次性合并。 执行多路合并(13 路): 使用优先队列按键(词)合并:堆中每个条目约 30 字节。 合并逻辑:弹出最小词,累加所有输入文件中的频率,得到全局(词,全局频率)。 同时维护一个最小堆(大小 100)用于 Top 100:以频率为键,存储(词,频率)。 对于每个全局(词,频率),插入堆中。 如果堆大小 > 100,弹出最小频率条目,确保堆始终保留最大频率的 100 个词。 内存使用:优先队列(13 条目 × 30 字节 ≈ 0.4KB),输入缓冲区(13 文件 × 4KB = 52KB),堆(100 条目 × 30 字节 ≈ 3KB),总内存约 55KB < 1MB。 注意:无需输出完整全局文件,只更新堆,节省内存和磁盘。 处理完所有词后,堆中即为频率最高的 100 个词。 输出结果: 堆中元素按频率最小堆组织,但 Top 100 已确定。 将堆中元素按频率降序排序(例如,通过逐个弹出并反转),返回结果。为什么满足内存限制?
分块大小(500KB)确保块内处理在 1MB 内。 合并阶段使用小缓冲区(每个文件 512 字节-4KB)和紧凑数据结构(优先队列条目 30 字节),总内存始终 < 1MB。 堆维护 Top 100 仅需约 3KB,不影响内存。优点
简单直接:分治减少内存压力,外部排序处理大数据,堆高效维护 Top K。 精确结果:通过全局频率累加,确保正确性。 可扩展:适用于更大文件或更小内存。此思路可在标准编程语言(如 Python、Java)中实现,注意文件 I/O 和缓冲区管理以优化性能。
解法一
第一步:将 1 G 拆分成小文件,每个小文件的大小不超过 1 MB ,其实应该留空间用于统计一个小文件中单词出现的频率,所以为了保险起见,每个文件的大小不超过 512 kb,那么得到 1 GB / 512 KB = 2048 个小文件
这里不用 hash 的方法来切分小文件,直接顺序读取然后写到小文件,小文件大小多到了 512kb 就关闭,写下一个小文件
第二步:分别对每个文件做下面的事情:
① 将小文件中的单词加载进内存
② 使用 HashMap 进行单词统计
③ 将 HashMap 中词频数据写到另一个新的小文件中,我们称为词频小文件
这里再写的时候,对单词进行 hash (word) % 2048 写到对应的文件中
这样做的目的是为了相同的单词放到同一个文件中。
第三步:初始化一个 100 个节点的小顶堆,用于保存 100 个出现频率最多的单词
分别对每个词频小文件做下面的事情:
① 将小文件中的单词及其频率加载进内存
② 使用 HashMap 进行单词统计
③ 遍历 HashMap 中单词词频,如果词频大于小顶堆中堆顶词频的话,则放入小顶堆,否则不放
最终,小顶堆中的 100 个单词就是出现频率最多的单词了
该方案在第二步的第 ③ 点的时候有点问题:就是当词频小文件大于 1 M 了,该怎么处理呢?
或者说极端情况下,每个单词都只出现一次,并且每个单词的 hash (word) % 2048 值都是相同的话,那词频小文件的大小都会超过 1 G 了
解法二
第一步:使用多路归并排序对大文件进行排序,这样相同的单词肯定是挨着的
第二步:
① 初始化一个 100 个节点的小顶堆,用于保存 100 个出现频率最多的单词
② 遍历整个文件,一个单词一个单词的从文件中取出来,并计数
③ 等到遍历的单词和上一个单词不同的话,那么上一个单词及其频率如果大于堆顶的词的频率,那么放在堆中,否则不放
最终,小顶堆中就是出现频率前 100 的单词了
多路归并排序对大文件进行排序的步骤如下:
① 将文件按照顺序切分成大小不超过 512KB 的小文件,总共 2048 个小文件
② 使用 1MB 内存分别对 2048 个小文件中的单词进行排序
③ 使用一个大小为 2048 大小的堆,对 2048 个小文件进行多路排序,结果写到一个大文件中
代码实现
数据准备和工具类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
public class FileIOUtils {
public static BufferedReader getReader(String name) {
try {
FileInputStream inputStream = new FileInputStream(name);
BufferedReader br = new BufferedReader(new InputStreamReader(inputStream));
return br;
} catch (IOException e) {
throw new RuntimeException("IOException", e);
}
}
public static BufferedWriter getWriter(String name) {
try {
FileOutputStream outputStream = new FileOutputStream(name);
BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(outputStream));
return bw;
} catch (IOException e) {
throw new RuntimeException("IOException", e);
}
}
public static void closeReader(Reader reader) {
try {
if (reader != null) {
reader.close();
}
} catch (IOException e) {
throw new RuntimeException("IOException", e);
}
}
public static void closeWriter(Writer writer) {
try {
if (writer != null) {
writer.close();
}
} catch (IOException e) {
throw new RuntimeException("IOException", e);
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class _0_WordsGenerator {
private static Random r = new Random();
public static void main(String[] args) throws IOException {
BufferedWriter writer = FileIOUtils.getWriter("data\top100\words.txt");
char[] chars = {'a', 'b', 'c', 'd', 'e', 'f', 'g'};
int m = chars.length;
for (int i = 0; i < 10000; i++) {
StringBuilder line = new StringBuilder();
for (int j = 0; j < r.nextInt(16); j++) {
line.append(chars[r.nextInt(m)]);
}
if (line.length() == 0) continue;
writer.write(line.toString());
writer.newLine();
}
FileIOUtils.closeWriter(writer);
}
}
将一个大文件切割成多个小文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
public class _1_FileSplit {
public void splitFile(String fileName) throws IOException {
int fileNum = 0;
String fileSuffix = "data\top100\raw_data\";
String littleFileName = fileSuffix + fileNum;
long totalSize = 0;
BufferedWriter bw = FileIOUtils.getWriter(littleFileName);
BufferedReader br = FileIOUtils.getReader(fileName);
String line = null;
while ((line = br.readLine()) != null) {
if (totalSize >= 512 * 1024) {
FileIOUtils.closeWriter(bw);
fileNum++;
littleFileName = fileSuffix + fileNum;
bw = FileIOUtils.getWriter(littleFileName);
totalSize = 0;
}
totalSize += line.length();
bw.write(line);
bw.newLine();
}
FileIOUtils.closeReader(br);
}
public static void main(String[] args) throws IOException {
String fileName = "data\top100\words.txt";
new _1_FileSplit().splitFile(fileName);
}
}
每个小文件中的单词进行排序
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
public class _2_LittleFileSorter {
public void sortEachFile(String dirName) throws IOException {
File dir = new File(dirName);
File[] littleFiles = dir.listFiles();
for (int i = 0; i < littleFiles.length; i++) {
BufferedReader br = FileIOUtils.getReader(littleFiles[i].getName());
List<String> words = new ArrayList<>();
String line = null;
while ((line = br.readLine()) != null) {
words.add(line);
}
FileIOUtils.closeReader(br);
Collections.sort(words);
BufferedWriter bw = FileIOUtils.getWriter("data\top100\sorted_data\" + i);
for (String word : words) {
bw.write(word);
bw.newLine();
}
FileIOUtils.closeWriter(bw);
}
}
public static void main(String[] args) throws IOException {
String dir = "data\top100\raw_data";
new _2_LittleFileSorter().sortEachFile(dir);
}
}
对有序的小文件进行外部排序
我们先写一个 BufferedIterator 类,如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class BufferedIterator {
private BufferedReader reader;
private String head;
BufferedIterator(BufferedReader reader) {
this.reader = reader;
}
public boolean hasNext() {
try {
head = this.reader.readLine();
} catch (IOException e) {
e.printStackTrace();
head = null;
}
return head != null;
}
public String next() {
return head;
}
public void close() throws Exception {
this.reader.close();
}
}
然后使用多路归并排序来实现文件的外部排序,如下代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
public class _3_ExternalSorter {
public void mergeSort(String dirName) throws Exception {
File dir = new File(dirName);
File[] children = dir.listFiles();
PriorityQueue<BufferedIterator> minHeap = new PriorityQueue<>(children.length, new Comparator<BufferedIterator>() {
@Override
public int compare(BufferedIterator o1, BufferedIterator o2) {
return o1.next().compareTo(o2.next());
}
});
for (File file : children) {
BufferedReader br = FileIOUtils.getReader(file.getName());
BufferedIterator buf = new BufferedIterator(br);
if (buf.hasNext()) {
minHeap.add(buf);
} else {
buf.close();
}
}
BufferedWriter bw = FileIOUtils.getWriter("data\top100\sorted_words.txt");
while (!minHeap.isEmpty()) {
BufferedIterator firstBuf = minHeap.poll();
bw.write(firstBuf.next());
bw.newLine();
if (firstBuf.hasNext()) {
minHeap.add(firstBuf);
} else {
firstBuf.close();
}
}
FileIOUtils.closeWriter(bw);
}
public static void main(String[] args) throws Exception {
String dirName = "data\top100\sorted_data\";
new _3_ExternalSorter().mergeSort(dirName);
}
}
利用 100 大小的小顶堆得到出现频率 top 100 的单词
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
public class _4_Top_100_Words {
class Pair {
String word;
int cnt;
Pair(String word, int cnt) {
this.word = word;
this.cnt = cnt;
}
}
public String[] top_100 (String fileName) throws Exception {
PriorityQueue<Pair> minHeap = new PriorityQueue<>(100, new Comparator<Pair>() {
@Override
public int compare(Pair o1, Pair o2) {
return o1.cnt - o2.cnt;
}
});
String prevWord = null;
int prevCnt = 0;
BufferedReader br = FileIOUtils.getReader(fileName);
String currWord = null;
while ((currWord = br.readLine()) != null) {
if (!currWord.equals(prevWord)) {
if (minHeap.size() < 100) {
minHeap.add(new Pair(prevWord, prevCnt));
} else if (prevCnt > minHeap.peek().cnt) {
minHeap.remove();
minHeap.add(new Pair(prevWord, prevCnt));
}
prevWord = currWord;
prevCnt = 0;
}
prevCnt++;
}
String[] res = new String[100];
int index = 0;
while (!minHeap.isEmpty()) {
res[index++] = minHeap.poll().word;
}
return res;
}
public static void main(String[] args) throws Exception {
String fileName = "data\top100\sorted_words.txt";
String[] res = new _4_Top_100_Words().top_100(fileName);
System.out.println(Arrays.toString(res));
}
}
MapReduce 框架 Hadoop Spark
MapReduce 思路解决大文件词频统计与 Top 100 问题
核心思想MapReduce 是分布式计算模型,天然适合处理海量数据。我们将问题分解为两个 MapReduce 作业:
词频统计作业:精确计算每个词的全局频率 Top 100 提取作业:从全局词频中筛选最高频词 系统架构1
2
3
4
5
6
7
8
9
输入文件 (1GB)
│
├─ MapReduce Job 1 (词频统计)
│ ├─ Mapper 局部词频统计
│ └─ Reducer 全局词频合并
│
└─ MapReduce Job 2 (Top 100 提取)
├─ Mapper 局部 Top 100 筛选
└─ Reducer 全局 Top 100 合并
Job 1: 全局词频统计
目标:精确计算每个词的出现频率
Map 阶段 输入:文件分块(每块 ≈500KB)1
2
3
4
5
6
7
8
def mapper(block):
word_count = {}
for word in block:
word_count[word] = word_count.get(word, 0) + 1
for word, count in word_count.items():
yield (word, count)
内存控制:每个 Mapper 处理单块(500KB),哈希表内存 ≈700KB < 1MB 输出示例:(apple, 15), (banana, 23), … Shuffle 阶段 按单词哈希分区(如 hash(word) % R) 相同单词的计数发送到同一 Reducer Reduce 阶段 输入:相同单词的所有局部计数 [ (word, [count1, count2,...]) ]
1
2
3
def reducer(word, counts):
total = sum(counts)
yield (word, total)
内存控制: 设置 Reducer 数量 R = 4000 每个 Reducer 处理约 1亿/R ≈ 25,000 个单词 内存占用:25,000 词 × 30 字节 ≈ 750KB < 1MB 输出:全局词频文件(R 个分区文件)
Job 2: Top 100 提取
目标:从全局词频中筛选频率最高的 100 个词
Map 阶段 输入:Job 1 的输出文件(每个文件 ≈250KB)1
2
3
4
5
6
7
8
9
10
11
12
def mapper(file):
heap = MinHeap(100)
for (word, count) in file:
if heap.size < 100 or count > heap.min():
heap.push((count, word))
if heap.size > 100:
heap.pop()
for count, word in heap.items:
yield (None, (word, count))
内存控制:100 个词 × 30 字节 ≈ 3KB,远低于 1MB 输出示例:(None, ("apple", 1580)), (None, ("banana", 1423))… Reduce 阶段(单 Reducer) 输入:所有 Mapper 的 Top 100 结果(共 4000 × 100 = 400,000 条)
1
2
3
4
5
6
7
8
9
10
11
def reducer(_, values):
heap = MinHeap(100)
for (word, count) in values:
if heap.size < 100 or count > heap.min():
heap.push((count, word))
if heap.size > 100:
heap.pop()
return sorted(heap.items, reverse=True)
内存控制: 流式处理:逐条读取记录,不一次性加载 堆大小固定 100,内存 ≈3KB 文件缓冲区:4KB 总计 < 10KB << 1MB
关键优化与内存保障
动态 Reducer 数量: Job 1 中 R = 4000 确保每个 Reducer 处理 ≤25,000 词 公式:R > (总词数 × 30 字节) / 1MB 流式处理: Job 2 的 Reducer 逐条处理记录,避免加载全部 400,000 条数据 堆排序优势: 时间复杂度 O(n log k)(k=100 为常数级) 空间复杂度 O(k) 严格受限 分布式扩展性: 文件更大时:增加 Mapper 数量 词量激增时:线性增加 Job 1 的 Reducer 数量执行示例
1
2
3
4
5
6
7
8
9
flowchart TB
A[1GB 输入文件] --> B{Job1 Mappers}
B -->|局部词频| C[Shuffle]
C --> D{Job1 Reducers}
D -->|全局词频| E[4000 个分区文件]
E --> F{Job2 Mappers}
F -->|局部 Top100| G[Shuffle]
G --> H{Job2 Reducer}
H --> I[全局 Top100]
局部词频
全局词频
局部 Top100
1GB 输入文件
Job1 Mappers
Shuffle
Job1 Reducers
4000 个分区文件
Job2 Mappers
Shuffle
Job2 Reducer
全局 Top100
此方案严格满足 1MB 内存限制,充分利用 MapReduce 的分布式能力,同时保证结果精确性。在 Hadoop/Spark 等框架可直接实现。
相关知识
怎么看自己的内存频率
Kingston FURY叛逆者DDR5 CUDIMM内存评测 问鼎8800MT/s
eclipse 增加内存的方法、修改配置文件 内存优化
Gensim库的使用——Word2vec模型(二)训练自己的模型与训练参数
提升网站性能与内存管理:打造高效的前端体验
js上传文件带参数,并且,返回给前台文件路径,解析上传的xml文件,存储到数据库中
订花圈攻略:如何在有限预算内找到最合适的花圈
图欧学习资源网创站以来的更新日志(截止至2022.5.6)不完全统计
内存小而且不受任何防沉迷的游戏 steam内存小又好玩的游戏
矢量栀子花雕刻文件
网址: 超大文件中在有限的内存里找到单词频率 top 100 https://www.huajiangbk.com/newsview2396543.html
上一篇: 在Windows上对一个非常大的 |
下一篇: 海量数据场景面试题:出现频率最高 |
推荐分享

- 1君子兰什么品种最名贵 十大名 4012
- 2世界上最名贵的10种兰花图片 3364
- 3花圈挽联怎么写? 3286
- 4迷信说家里不能放假花 家里摆 1878
- 5香山红叶什么时候红 1493
- 6花的意思,花的解释,花的拼音 1210
- 7教师节送什么花最合适 1167
- 8勿忘我花图片 1103
- 9橄榄枝的象征意义 1093
- 10洛阳的市花 1039