若是给你一个包罗一亿行数据的超大文件,让你将数据转化导入消费数据库,你会若何操做?
上面的问题是接到的一个实在的营业需求,将一个老系统汗青数据通过线下文件的体例迁徙到新的消费系统。
因为时间紧,而数据量又超大,所以设想的过程想到一下处理法子:
拆分文件多线程导入拆分文件起首我们能够写个小法式,或者利用拆分号令 split 将那个超大文件拆分一个个小文件。
-- 将一个大文件拆分红若干个小文件,每个文件 100000 行split -l 100000 largeFile.txt -d -a 4 smallFile_那里之所以选择先将大文件拆分,次要考虑到两个原因:
第一若是法式间接读取那个大文件,假设读取一半的时候,法式突然宕机,如许就会间接丧失文件读取的进度,又需要从头开头读取。
而文件拆分之后,一旦小文件读取完毕,我们能够将小文件挪动一个指定文件夹。
如许即便应用法式宕机重启,我们从头读取时,只需要读取剩余的文件。
第二,一个文件,只能被一个应用法式读取,如许就限造了导入的速度。
而文件拆分之后,我们能够接纳多节点摆设的体例,程度扩展。每个节点读取一部门文件,如许就能够成倍的加快导入速度。
多线程导入当我们拆分完文件,接着我们就需要读取文件内容,停止导入。
之前拆分的时候,设置每个小文件包罗 10w 行的数据。因为担忧一会儿将 10w 数据读取应用中,招致堆内存占用过高,引起频繁的 「Full GC」,所以下面接纳流式读取的体例,一行一行的读取数据。
当然了,若是拆分之后文件很小,或者说应用的堆内存设置很大,我们能够间接将文件加载到应用内存中处置。如许相对来说简单一点。
逐行读取的代码如下:
File file = ...try (LineIterator iterator = IOUtils.lineIterator(new FileInputStream(file), "UTF-8")) { while (iterator.hasNext()) { String line=iterator.nextLine(); convertToDB(line); }}上面代码利用 commons-io 中的 LineIterator类,那个类底层利用了 BufferedReader 读取文件内容。它将其封拆成迭代器形式,如许我们能够很便利的迭代读取。
若是当前利用 JDK1.8 ,那么上述操做愈加简单,我们能够间接利用 JDK 原生的类 Files将文件转成 Stream 体例读取,代码如下:
Files.lines(Paths.get("文件途径"), Charset.defaultCharset()).forEach(line -> { convertToDB(line);});其实认真看下 Files#lines底层源码,其实原理跟上面的 LineIterator类似,同样也是封拆成迭代器形式。
多线程的引入存在的问题上述读取的代码写起来不难,但是存在效率问题,次要是因为只要单线程在导入,上一行数据导入完成之后,才气继续操做下一行。
为了加快导入速度,那我们就多来几个线程,并发导入。
多线程我们天然将会利用线程池的体例,相关代码革新如下:
File file = ...;ExecutorService executorService = new ThreadPoolExecutor( 5, 10, 60, TimeUnit.MINUTES, // 文件数量,假设文件包罗 10W 行 new ArrayBlockingQueue<>(10*10000), // guava 供给 new ThreadFactoryBuilder().setNameFormat("test-%d").build());try (LineIterator iterator = IOUtils.lineIterator(new FileInputStream(file), "UTF-8")) { while (iterator.hasNext()) { String line = iterator.nextLine(); executorService.submit(() -> { convertToDB(line); }); }}上述代码中,每读取到一行内容,就会间接交给线程池来施行。
我们晓得线程池原理如下:
若是核心线程数未满,将会间接创建线程施行使命。若是核心线程数已满,将会把使命放入到队列中。若是队列已满,将会再创建线程施行使命。若是更大线程数已满,队列也已满,那么将会施行回绝战略。
线程池施行流程图因为我们上述线程池设置的核心线程数为 5,很快就抵达了更大核心线程数,后续使命只能被参加队列。
为了后续使命不被线程池回绝,我们能够接纳如下计划:
将队列容量设置成很大,包罗整个文件所有行数将更大线程数设置成很大,数量大于整个文件所有行数以上两种计划都存在同样的问题,第一种是相当于将文件所有内容加载到内存,将会占用过多内存。
而第二种创建过多的线程,同样也会占用过多内存。
一旦内存占用过多,GC 无法清理,就可能会引起频繁的 「Full GC」,以至招致 「OOM」,招致法式导入速渡过慢。
当然了,我们还能够第三种计划,综合前两种,设置适宜队列长度,以及适宜更大线程数。不外呢,「适宜」那个度实欠好掌握,别的也仍是有 「OOM」 问题。
所认为领会决那个问题,研究出两个处理计划:
CountDownLatch 批量施行扩展线程池CountDownLatch 批量施行JDK 供给的 CountDownLatch,能够让主线程期待子线程都施行完成之后,再继续往下施行。
操纵那个特征,我们能够革新多线程导入的代码,主体逻辑如下:
try (LineIterator iterator = IOUtils.lineIterator(new FileInputStream(file), "UTF-8")) { // 存储每个使命施行的行数 List lines = Lists.newArrayList(); // 存储异步使命 List tasks = Lists.newArrayList(); while (iterator.hasNext()) { String line = iterator.nextLine(); lines.add(line); // 设置每个线程施行的行数 if (lines.size() == 1000) { // 新建异步使命,留意那里需要创建一个 List tasks.add(new ConvertTask(Lists.newArrayList(lines))); lines.clear(); } if (tasks.size() == 10) { asyncBatchExecuteTask(tasks); } } // 文件读取完毕,但是可能还存在未被内容 tasks.add(new ConvertTask(Lists.newArrayList(lines))); // 最初再施行一次 asyncBatchExecuteTask(tasks);}那段代码中,每个异步使命将会导入 1000 行数据,等积累了 10 个异步使命,然后将会挪用 asyncBatchExecuteTask 利用线程池异步施行。
private static void asyncBatchExecuteTask(List tasks) throws InterruptedException { CountDownLatch countDownLatch = new CountDownLatch(tasks.size()); for (ConvertTask task : tasks) { task.setCountDownLatch(countDownLatch); executorService.submit(task); } // 主线程期待异步线程 countDownLatch 施行完毕 countDownLatch.await(); // 清空,从头添加使命 tasks.clear();}asyncBatchExecuteTask 办法内将会创建 CountDownLatch,然后主线程内挪用 await办法期待所有异步线程施行完毕。
ConvertTask 异步使命逻辑如下:
private static class ConvertTask implements Runnable { private CountDownLatch countDownLatch; private List lines; public ConvertTask(List lines) { this.lines = lines; } public void setCountDownLatch(CountDownLatch countDownLatch) { this.countDownLatch = countDownLatch; } @Override public void run() { try { for (String line : lines) { convertToDB(line); } } finally { countDownLatch.countDown(); } }}ConvertTask使命类逻辑就十分简单,遍历所有行,将其导入到数据库中。所有数据导入完毕,挪用 countDownLatch#countDown。
一旦所有异步线程施行完毕,挪用 countDownLatch#countDown,主线程将会被唤醒,继续施行文件读取。
固然那种体例处理上述问题,但是那种体例,每次都需要积累必然使命数才气起头异步施行所有使命。
别的每次都需要期待所有使命施行完毕之后,才气起头下一批使命,批量施行消耗的时间等于最慢的异步使命消耗的时间。
那种体例线程池中线程存在必然的闲置时间,那有没有法子不断压榨线程池,让它不断在干活呢?
扩展线程池回到最起头的问题,文件读取导入,其实就是一个「消费者-消费者」消费模子。
主线程做为消费者不竭读取文件,然后将其放置到队列中。
异步线程做为消费者不竭从队列中读取内容,导入到数据库中。
「一旦队列满载,消费者应该阻塞,曲到消费者消费使命。」
其实我们利用线程池的也是一个「消费者-消费者」消费模子,其也利用阻塞队列。
那为什么线程池在队列满载的时候,不发作阻塞?
那是因为线程池内部利用 offer 办法,那个办法在队列满载的时候「不会发作阻塞」,而是间接返回 。

那我们有没有法子在线程池队列满载的时候,阻塞主线程添加使命?
其实是能够的,我们自定义线程池回绝战略,当队列满时改为挪用 BlockingQueue.put 来实现消费者的阻塞。
RejectedExecutionHandler rejectedExecutionHandler = new RejectedExecutionHandler() { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { if (!executor.isShutdown()) { try { executor.getQueue().put(r); } catch (InterruptedException e) { // should not be interrupted } } }};如许一旦线程池满载,主线程将会被阻塞。
利用那种体例之后,我们能够间接利用上面提到的多线程导入的代码。
ExecutorService executorService = new ThreadPoolExecutor( 5, 10, 60, TimeUnit.MINUTES, new ArrayBlockingQueue<>(100), new ThreadFactoryBuilder().setNameFormat("test-%d").build(), (r, executor) -> { if (!executor.isShutdown()) { try { // 主线程将会被阻塞 executor.getQueue().put(r); } catch (InterruptedException e) { // should not be interrupted } } });File file = new File("文件途径");try (LineIterator iterator = IOUtils.lineIterator(new FileInputStream(file), "UTF-8")) { while (iterator.hasNext()) { String line = iterator.nextLine(); executorService.submit(() -> convertToDB(line)); }} 小 结一个超大的文件,我们能够接纳拆分文件的体例,将其拆分红多份文件,然后摆设多个应用法式进步读取速度。
别的读取过程我们还能够利用多线程的体例并发导入,不外我们需要留意线程池满载之后,将会回绝后续使命。
我们能够通过扩展线程池,自定义回绝战略,使读取主线程阻塞。
好了,今天文章内容就到那里,不晓得列位有没有其他更好的处理法子,欢送留言讨论。






还没有评论,来说两句吧...