首页 分享 超大文件中在有限的内存里找到单词频率 top 100

超大文件中在有限的内存里找到单词频率 top 100

来源:花匠小妙招 时间:2025-10-10 07:44

TOP 100 freq words

问题

问题:有一个 1G 大小的文件,里面每一行是一个词,每个词的大小不超过 16 字节,内存限制大小是 1M。返回出现频率最高的 100 个单词

思路

问题分析

有一个 1GB 的文件,每行是一个词,每个词不超过 16 字节。 内存限制为 1MB,因此无法一次性加载整个文件到内存中处理。 需要返回出现频率最高的 100 个词,要求精确结果(非近似)。 核心挑战:内存小,文件大,需高效处理大数据,避免内存溢出。

解决思路:分治法 + 外部排序 + 堆维护 Top 100

使用分治法将大文件分解为小文件块,每个块在内存中处理,然后多级合并部分结果。在合并过程中,维护一个最小堆来跟踪全局频率最高的 100 个词。思路简单直接,步骤如下:

分块(Divide): 将大文件分割成多个小文件块,每个块大小约 500KB(约 32,000 行),确保块内不同词的数量不超过内存限制(1MB)。 为什么 500KB?内存 1MB,每个词最大 16 字节,哈希表条目(词 + 频率)约 20-32 字节(包括开销),32,000 个不同词需约 640KB,小于 1MB。 分割方法:顺序读取大文件,逐行写入块文件,直到块大小达到约 500KB。重复直到处理完整个文件。 块数量:文件 1GB ≈ 1,000,000,000 字节,每行平均约 10 字节(词 + 换行符),总行数约 100,000,000 行。每个块 32,000 行,块数约 3,125(100,000,000 / 32,000 ≈ 3,125)。 块内处理(Process Chunks): 对每个块文件: 读入内存(约 500KB)。 使用哈希表统计词频:逐行读取,以词为键,累加频率。 块处理完后,对哈希表中的词按键(词)排序。 输出到临时文件:每行格式为 “词 频率”,并按词排序(便于后续合并)。 内存使用:哈希表存储词(最大 16 字节)和频率(4 字节整数),条目上限 32,000,总内存约 640KB,满足 1MB 限制。 输出:每个块生成一个排序后的部分频率文件。 第一级合并(Merge Partially): 部分文件数量多(约 3,125 个),需分组合并,避免同时打开太多文件(文件描述符限制,假设一次最多打开 256 个文件)。 分组:将约 3,125 个部分文件分成 13 组(每组 240 文件)和 1 组(剩余文件),确保每组文件数 ≤ 256。 对每组执行多路合并: 使用优先队列(最小堆)按键(词)合并:堆中每个条目存储(词、频率、文件索引),大小约 30 字节。 合并逻辑:弹出最小词,检查所有输入文件是否有相同词,累加频率,输出全局(词,聚合频率)到新临时文件(按词排序)。 内存使用:优先队列(256 条目 × 30 字节 ≈ 7.7KB),输入缓冲区(每个文件 512 字节缓冲区 × 256 文件 ≈ 128KB),输出缓冲区(4KB),总内存约 140KB < 1MB。 输出:每组生成一个更大的排序频率文件(约 13 个文件)。 第二级合并与 Top 100 提取(Final Merge and Heap Update): 第一级合并后,文件数少(约 13 个),可一次性合并。 执行多路合并(13 路): 使用优先队列按键(词)合并:堆中每个条目约 30 字节。 合并逻辑:弹出最小词,累加所有输入文件中的频率,得到全局(词,全局频率)。 同时维护一个最小堆(大小 100)用于 Top 100:以频率为键,存储(词,频率)。 对于每个全局(词,频率),插入堆中。 如果堆大小 > 100,弹出最小频率条目,确保堆始终保留最大频率的 100 个词。 内存使用:优先队列(13 条目 × 30 字节 ≈ 0.4KB),输入缓冲区(13 文件 × 4KB = 52KB),堆(100 条目 × 30 字节 ≈ 3KB),总内存约 55KB < 1MB。 注意:无需输出完整全局文件,只更新堆,节省内存和磁盘。 处理完所有词后,堆中即为频率最高的 100 个词。 输出结果: 堆中元素按频率最小堆组织,但 Top 100 已确定。 将堆中元素按频率降序排序(例如,通过逐个弹出并反转),返回结果。

为什么满足内存限制?

分块大小(500KB)确保块内处理在 1MB 内。 合并阶段使用小缓冲区(每个文件 512 字节-4KB)和紧凑数据结构(优先队列条目 30 字节),总内存始终 < 1MB。 堆维护 Top 100 仅需约 3KB,不影响内存。

优点

简单直接:分治减少内存压力,外部排序处理大数据,堆高效维护 Top K。 精确结果:通过全局频率累加,确保正确性。 可扩展:适用于更大文件或更小内存。

此思路可在标准编程语言(如 Python、Java)中实现,注意文件 I/O 和缓冲区管理以优化性能。

解法一

第一步:将 1 G 拆分成小文件,每个小文件的大小不超过 1 MB ,其实应该留空间用于统计一个小文件中单词出现的频率,所以为了保险起见,每个文件的大小不超过 512 kb,那么得到 1 GB / 512 KB = 2048 个小文件

这里不用 hash 的方法来切分小文件,直接顺序读取然后写到小文件,小文件大小多到了 512kb 就关闭,写下一个小文件

第二步:分别对每个文件做下面的事情:
① 将小文件中的单词加载进内存
② 使用 HashMap 进行单词统计
③ 将 HashMap 中词频数据写到另一个新的小文件中,我们称为词频小文件
这里再写的时候,对单词进行 hash (word) % 2048 写到对应的文件中
这样做的目的是为了相同的单词放到同一个文件中。

第三步:初始化一个 100 个节点的小顶堆,用于保存 100 个出现频率最多的单词
分别对每个词频小文件做下面的事情:
① 将小文件中的单词及其频率加载进内存
② 使用 HashMap 进行单词统计
③ 遍历 HashMap 中单词词频,如果词频大于小顶堆中堆顶词频的话,则放入小顶堆,否则不放

最终,小顶堆中的 100 个单词就是出现频率最多的单词了

该方案在第二步的第 ③ 点的时候有点问题:就是当词频小文件大于 1 M 了,该怎么处理呢?
或者说极端情况下,每个单词都只出现一次,并且每个单词的 hash (word) % 2048 值都是相同的话,那词频小文件的大小都会超过 1 G 了

解法二

第一步:使用多路归并排序对大文件进行排序,这样相同的单词肯定是挨着的

第二步:
① 初始化一个 100 个节点的小顶堆,用于保存 100 个出现频率最多的单词
② 遍历整个文件,一个单词一个单词的从文件中取出来,并计数
③ 等到遍历的单词和上一个单词不同的话,那么上一个单词及其频率如果大于堆顶的词的频率,那么放在堆中,否则不放

最终,小顶堆中就是出现频率前 100 的单词了

多路归并排序对大文件进行排序的步骤如下:
① 将文件按照顺序切分成大小不超过 512KB 的小文件,总共 2048 个小文件
② 使用 1MB 内存分别对 2048 个小文件中的单词进行排序
③ 使用一个大小为 2048 大小的堆,对 2048 个小文件进行多路排序,结果写到一个大文件中

代码实现

数据准备和工具类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44

public class FileIOUtils {


public static BufferedReader getReader(String name) {
try {
FileInputStream inputStream = new FileInputStream(name);
BufferedReader br = new BufferedReader(new InputStreamReader(inputStream));
return br;
} catch (IOException e) {
throw new RuntimeException("IOException", e);
}
}


public static BufferedWriter getWriter(String name) {
try {
FileOutputStream outputStream = new FileOutputStream(name);
BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(outputStream));
return bw;
} catch (IOException e) {
throw new RuntimeException("IOException", e);
}
}

public static void closeReader(Reader reader) {
try {
if (reader != null) {
reader.close();
}
} catch (IOException e) {
throw new RuntimeException("IOException", e);
}
}

public static void closeWriter(Writer writer) {
try {
if (writer != null) {
writer.close();
}
} catch (IOException e) {
throw new RuntimeException("IOException", e);
}
}
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27





public class _0_WordsGenerator {
private static Random r = new Random();

public static void main(String[] args) throws IOException {

BufferedWriter writer = FileIOUtils.getWriter("data\top100\words.txt");

char[] chars = {'a', 'b', 'c', 'd', 'e', 'f', 'g'};
int m = chars.length;

for (int i = 0; i < 10000; i++) {
StringBuilder line = new StringBuilder();
for (int j = 0; j < r.nextInt(16); j++) {
line.append(chars[r.nextInt(m)]);
}
if (line.length() == 0) continue;
writer.write(line.toString());
writer.newLine();
}

FileIOUtils.closeWriter(writer);
}
}

将一个大文件切割成多个小文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47






public class _1_FileSplit {

public void splitFile(String fileName) throws IOException {
int fileNum = 0;
String fileSuffix = "data\top100\raw_data\";
String littleFileName = fileSuffix + fileNum;


long totalSize = 0;

BufferedWriter bw = FileIOUtils.getWriter(littleFileName);

BufferedReader br = FileIOUtils.getReader(fileName);
String line = null;
while ((line = br.readLine()) != null) {

if (totalSize >= 512 * 1024) {

FileIOUtils.closeWriter(bw);


fileNum++;
littleFileName = fileSuffix + fileNum;
bw = FileIOUtils.getWriter(littleFileName);
totalSize = 0;
}

totalSize += line.length();

bw.write(line);
bw.newLine();
}

FileIOUtils.closeReader(br);
}

public static void main(String[] args) throws IOException {
String fileName = "data\top100\words.txt";
new _1_FileSplit().splitFile(fileName);
}
}

每个小文件中的单词进行排序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44





public class _2_LittleFileSorter {

public void sortEachFile(String dirName) throws IOException {
File dir = new File(dirName);
File[] littleFiles = dir.listFiles();


for (int i = 0; i < littleFiles.length; i++) {

BufferedReader br = FileIOUtils.getReader(littleFiles[i].getName());
List<String> words = new ArrayList<>();
String line = null;
while ((line = br.readLine()) != null) {
words.add(line);
}

FileIOUtils.closeReader(br);


Collections.sort(words);


BufferedWriter bw = FileIOUtils.getWriter("data\top100\sorted_data\" + i);
for (String word : words) {
bw.write(word);
bw.newLine();
}
FileIOUtils.closeWriter(bw);
}

}

public static void main(String[] args) throws IOException {
String dir = "data\top100\raw_data";
new _2_LittleFileSorter().sortEachFile(dir);
}

}


对有序的小文件进行外部排序

我们先写一个 BufferedIterator 类,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32




public class BufferedIterator {

private BufferedReader reader;
private String head;

BufferedIterator(BufferedReader reader) {
this.reader = reader;
}

public boolean hasNext() {
try {
head = this.reader.readLine();
} catch (IOException e) {
e.printStackTrace();
head = null;
}
return head != null;
}

public String next() {
return head;
}

public void close() throws Exception {
this.reader.close();
}
}


然后使用多路归并排序来实现文件的外部排序,如下代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62





public class _3_ExternalSorter {

public void mergeSort(String dirName) throws Exception {

File dir = new File(dirName);
File[] children = dir.listFiles();






PriorityQueue<BufferedIterator> minHeap = new PriorityQueue<>(children.length, new Comparator<BufferedIterator>() {
@Override
public int compare(BufferedIterator o1, BufferedIterator o2) {


return o1.next().compareTo(o2.next());
}
});


for (File file : children) {
BufferedReader br = FileIOUtils.getReader(file.getName());
BufferedIterator buf = new BufferedIterator(br);

if (buf.hasNext()) {
minHeap.add(buf);
} else {
buf.close();
}
}

BufferedWriter bw = FileIOUtils.getWriter("data\top100\sorted_words.txt");

while (!minHeap.isEmpty()) {
BufferedIterator firstBuf = minHeap.poll();
bw.write(firstBuf.next());
bw.newLine();

if (firstBuf.hasNext()) {
minHeap.add(firstBuf);
} else {
firstBuf.close();
}
}

FileIOUtils.closeWriter(bw);
}

public static void main(String[] args) throws Exception {
String dirName = "data\top100\sorted_data\";
new _3_ExternalSorter().mergeSort(dirName);
}

}


利用 100 大小的小顶堆得到出现频率 top 100 的单词

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80






public class _4_Top_100_Words {

class Pair {
String word;
int cnt;

Pair(String word, int cnt) {
this.word = word;
this.cnt = cnt;
}
}

public String[] top_100 (String fileName) throws Exception {



PriorityQueue<Pair> minHeap = new PriorityQueue<>(100, new Comparator<Pair>() {
@Override
public int compare(Pair o1, Pair o2) {


return o1.cnt - o2.cnt;
}
});



String prevWord = null;

int prevCnt = 0;

BufferedReader br = FileIOUtils.getReader(fileName);

String currWord = null;

while ((currWord = br.readLine()) != null) {


if (!currWord.equals(prevWord)) {

if (minHeap.size() < 100) {
minHeap.add(new Pair(prevWord, prevCnt));
} else if (prevCnt > minHeap.peek().cnt) {

minHeap.remove();

minHeap.add(new Pair(prevWord, prevCnt));
}


prevWord = currWord;

prevCnt = 0;
}

prevCnt++;
}


String[] res = new String[100];
int index = 0;
while (!minHeap.isEmpty()) {
res[index++] = minHeap.poll().word;
}

return res;
}

public static void main(String[] args) throws Exception {
String fileName = "data\top100\sorted_words.txt";
String[] res = new _4_Top_100_Words().top_100(fileName);
System.out.println(Arrays.toString(res));
}

}

MapReduce 框架 Hadoop Spark

MapReduce 思路解决大文件词频统计与 Top 100 问题

核心思想

MapReduce 是分布式计算模型,天然适合处理海量数据。我们将问题分解为两个 MapReduce 作业:

词频统计作业:精确计算每个词的全局频率 Top 100 提取作业:从全局词频中筛选最高频词 系统架构

1
2
3
4
5
6
7
8
9

输入文件 (1GB)

├─ MapReduce Job 1 (词频统计)
│ ├─ Mapper 局部词频统计
│ └─ Reducer 全局词频合并

└─ MapReduce Job 2 (Top 100 提取)
├─ Mapper 局部 Top 100 筛选
└─ Reducer 全局 Top 100 合并

Job 1: 全局词频统计

目标:精确计算每个词的出现频率

Map 阶段 输入:文件分块(每块 ≈500KB)

1
2
3
4
5
6
7
8

def mapper(block):
word_count = {}
for word in block:
word_count[word] = word_count.get(word, 0) + 1


for word, count in word_count.items():
yield (word, count)
内存控制:每个 Mapper 处理单块(500KB),哈希表内存 ≈700KB < 1MB 输出示例:(apple, 15), (banana, 23), … Shuffle 阶段 按单词哈希分区(如 hash(word) % R) 相同单词的计数发送到同一 Reducer Reduce 阶段 输入:相同单词的所有局部计数 [ (word, [count1, count2,...]) ]

1
2
3

def reducer(word, counts):
total = sum(counts)
yield (word, total)
内存控制: 设置 Reducer 数量 R = 4000 每个 Reducer 处理约 1亿/R ≈ 25,000 个单词 内存占用:25,000 词 × 30 字节 ≈ 750KB < 1MB 输出:全局词频文件(R 个分区文件)

Job 2: Top 100 提取

目标:从全局词频中筛选频率最高的 100 个词

Map 阶段 输入:Job 1 的输出文件(每个文件 ≈250KB)

1
2
3
4
5
6
7
8
9
10
11
12

def mapper(file):
heap = MinHeap(100)

for (word, count) in file:
if heap.size < 100 or count > heap.min():
heap.push((count, word))
if heap.size > 100:
heap.pop()


for count, word in heap.items:
yield (None, (word, count))
内存控制:100 个词 × 30 字节 ≈ 3KB,远低于 1MB 输出示例:(None, ("apple", 1580)), (None, ("banana", 1423))… Reduce 阶段(单 Reducer) 输入:所有 Mapper 的 Top 100 结果(共 4000 × 100 = 400,000 条)

1
2
3
4
5
6
7
8
9
10
11

def reducer(_, values):
heap = MinHeap(100)

for (word, count) in values:
if heap.size < 100 or count > heap.min():
heap.push((count, word))
if heap.size > 100:
heap.pop()


return sorted(heap.items, reverse=True)
内存控制: 流式处理:逐条读取记录,不一次性加载 堆大小固定 100,内存 ≈3KB 文件缓冲区:4KB 总计 < 10KB << 1MB

关键优化与内存保障

动态 Reducer 数量: Job 1 中 R = 4000 确保每个 Reducer 处理 ≤25,000 词 公式:R > (总词数 × 30 字节) / 1MB 流式处理: Job 2 的 Reducer 逐条处理记录,避免加载全部 400,000 条数据 堆排序优势: 时间复杂度 O(n log k)(k=100 为常数级) 空间复杂度 O(k) 严格受限 分布式扩展性: 文件更大时:增加 Mapper 数量 词量激增时:线性增加 Job 1 的 Reducer 数量

执行示例

1
2
3
4
5
6
7
8
9

flowchart TB
A[1GB 输入文件] --> B{Job1 Mappers}
B -->|局部词频| C[Shuffle]
C --> D{Job1 Reducers}
D -->|全局词频| E[4000 个分区文件]
E --> F{Job2 Mappers}
F -->|局部 Top100| G[Shuffle]
G --> H{Job2 Reducer}
H --> I[全局 Top100]

局部词频

全局词频

局部 Top100

1GB 输入文件

Job1 Mappers

Shuffle

Job1 Reducers

4000 个分区文件

Job2 Mappers

Shuffle

Job2 Reducer

全局 Top100

此方案严格满足 1MB 内存限制,充分利用 MapReduce 的分布式能力,同时保证结果精确性。在 Hadoop/Spark 等框架可直接实现。

相关知识

怎么看自己的内存频率
Kingston FURY叛逆者DDR5 CUDIMM内存评测 问鼎8800MT/s
eclipse 增加内存的方法、修改配置文件 内存优化
Gensim库的使用——Word2vec模型(二)训练自己的模型与训练参数
提升网站性能与内存管理:打造高效的前端体验
js上传文件带参数,并且,返回给前台文件路径,解析上传的xml文件,存储到数据库中
订花圈攻略:如何在有限预算内找到最合适的花圈
图欧学习资源网创站以来的更新日志(截止至2022.5.6)不完全统计
内存小而且不受任何防沉迷的游戏 steam内存小又好玩的游戏
矢量栀子花雕刻文件

网址: 超大文件中在有限的内存里找到单词频率 top 100 https://www.huajiangbk.com/newsview2396543.html

所属分类:花卉
上一篇: 在Windows上对一个非常大的
下一篇: 海量数据场景面试题:出现频率最高

推荐分享